# 2022q1 Homework5 (quiz8)
contributed by < `yaohwang99` >

## Problem 2

```c
typedef uintptr_t ringidx_t;
struct element {
    void *ptr;
    uintptr_t idx;
};
```

The structure is aligned to the cache-line size for cache-friendly code.

```c
struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);

typedef struct lfring lfring_t;
```

---

1. Allocate memory for the `lfring`.
2. `ringsz` is rounded up to a power of 2, so the modulo operation can be computed with a bit mask.
3. The index of each `ring[i]` is initialized to `i - ringsz`; for example, if `ringsz` is 8, the indices are -8, -7, -6, ..., -1.

```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}
```

```graphviz
digraph {
    compound = true
    rankdir = LR
    subgraph cluster0 {
        style = filled
        color = gray90
        node0 [shape = record, label = "head|tail|mask|flag|<ring>ring"]
    }
    subgraph cluster1 {
        style = filled
        color = gray90
        node1 [shape = record, label = "{<ptr0>ptr|ptr|ptr|ptr|ptr|ptr|ptr|ptr}|{idx|idx|idx|idx|idx|idx|idx|idx}"]
    }
    NULL [shape = plaintext]
    node0 -> node1 [lhead = cluster1]
    node1:ptr0 -> NULL
}
```

---

The enqueue steps can be summarized as follows:
1. Check if the slot is available.
2. Prepare the `neu` data.
3. If the slot is still available, write `neu` to the ring; otherwise, move to the next slot and repeat from step 1.
4. Update the tail if this tail is the newest.
```c=
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }

    /* else: lock-free multi-producer */
```

At line 29, `before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)` checks whether there is still an empty slot in the ring.

At line 39, `(old.e.idx == tail - size)` checks whether the slot at `tail` is available; if it is not, then `old.e.idx` should be equal to `tail` when the slot was recently used by another producer.

`cond_reload(tail, &lfr->tail)` returns the newest tail: either `tail + 1` or the freshly loaded `lfr->tail`.

```c=27
restart:
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem = elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }
            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));

        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}
```

The dequeue steps can be summarized as follows:
1. Check if the slots contain items.
2. Copy them to `elems[]`.
3. If the slots have not been dequeued by another consumer, update `lfr->head`; otherwise, update `head` to the new `lfr->head` and repeat from step 1.

```c=
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }

        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;

        smp_fence(LoadStore); // Order loads only

        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }
        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        /* XXXXX */ head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```