# 2022q1 Homework5 (quiz8)
contributed by < `yaohwang99` >
## Problem 2
```c
/* Ring position counter. It is unsigned, so expressions such as
 * `tail - size` or `i - ringsz` wrap around instead of going negative. */
typedef uintptr_t ringidx_t;
/* One ring slot: the stored pointer together with the ring index at which
 * it was written. The multi-producer enqueue path casts a slot to
 * ptrpair_t and replaces both fields with a single lf_compare_exchange. */
struct element {
void *ptr; /* enqueued pointer; lfring_alloc initialises it to NULL */
uintptr_t idx; /* ring index last stored in this slot (starts one lap back) */
};
```
`head`, `tail`, and the `ring` array are each aligned to the cache-line size, so they start on separate cache lines; this keeps the layout cache friendly.
```c
/* Ring buffer header followed by the slot array. */
struct lfring {
ringidx_t head; /* consumer index; advanced by lfring_dequeue */
ringidx_t tail ALIGNED(CACHE_LINE); /* producer index; published by lfring_enqueue */
uint32_t mask; /* ring size - 1, so `index & mask` selects a slot */
uint32_t flags; /* checked for LFRING_FLAG_SP (enqueue) and LFRING_FLAG_SC (dequeue) */
struct element ring[] ALIGNED(CACHE_LINE); /* flexible array of slots; length fixed by lfring_alloc */
} ALIGNED(CACHE_LINE);
typedef struct lfring lfring_t;
```
---
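1. Allocate memory for the `lfring`.
2. `ringsz` is rounded up to a power of 2, so the modulo operation can be computed with a bit mask (`index & mask`).
3. The initial index stored in `ring[i]` is `i - ringsz`. For example, if `ringsz` is 8, the indices are -8, -7, -6, ..., -1 (stored as wrapped-around unsigned values, since `idx` is a `uintptr_t`).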
```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
```
```graphviz
digraph{
compound = true
rankdir = LR
subgraph cluster0{
style = filled
color = gray90
node0[shape = record, label = "head|tail|mask|flag|<ring>ring"]
}
subgraph cluster1{
style = filled
color = gray90
node1[shape = record, label = "{<ptr0>ptr|ptr|ptr|ptr|ptr|ptr|ptr|ptr}|{idx|idx|idx|idx|idx|idx|idx|idx}"]
}
NULL[shape = plaintext]
node0->node1[lhead = cluster1]
node1:ptr0->NULL
}
```
---
The enqueue steps can be summarized as follows:
1. Check whether the slot at `tail` is available.
2. Prepare the new slot contents in `neu`.
3. If the slot is still available, write `neu` to the ring; otherwise, move to the next slot and repeat from step 1.
4. Update `lfr->tail` if the local `tail` is the newest.
```c=
/* Enqueue up to n_elems pointers from elems[] at the tail of the ring.
 * Returns the number actually enqueued, which may be fewer than n_elems (or 0).
 * LFRING_FLAG_SP selects the plain-store path below; otherwise a CAS loop follows. */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0; /* elements enqueued so far */
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1; /* ring capacity: mask holds size - 1 */
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); /* free slots, capped at n_elems */
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size); /* a free slot still holds the index from one lap back */
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); /* publish the new tail after the slots are written */
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
```
At line 29, `before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)` checks whether there are still empty slots in the ring.
At line 39, `old.e.idx == tail - size` means the slot at `tail` is available, because it was last written one lap ago. Otherwise, `old.e.idx == tail` means another producer has just enqueued into this slot, so we try the next slot; any other value means we are far behind and need a fresh `tail`.
`cond_reload(tail, &lfr->tail)` returns the newest tail: either `tail + 1` or the current value of `lfr->tail`.
```c=27
restart: /* re-entered with a new tail whenever the current slot cannot be used */
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { /* stop once tail reaches head + size */
union {
struct element e;
ptrpair_t pp;
} old, neu; /* slot viewed as ptrpair_t so ptr and idx are swapped together */
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); /* snapshot; validated by the CAS below */
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
 * Try to enqueue next element.
 */
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
/* NOTE(review): the retry only makes sense if a failed
 * lf_compare_exchange refreshes `old` with the slot's current
 * contents, so the idx checks above see new data - confirm. */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
/* Publish our tail to lfr->tail; the result of cond_update is deliberately
 * ignored. Presumably it only moves lfr->tail forward - confirm. */
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
```
The dequeue steps can be summarized as follows:
1. Check whether the slots contain items.
2. Copy the items to `elems[]`.
3. If the slots have not been dequeued by another consumer, update `lfr->head`; otherwise, update `head` to the new `lfr->head` and repeat from step 1.
```c=
/*
 * Dequeue up to n_elems pointers from the head of the ring.
 *
 * lfr:     ring to dequeue from.
 * elems:   output array; receives the dequeued pointers, so it needs room
 *          for n_elems entries.
 * n_elems: maximum number of elements to dequeue.
 * index:   on success, receives the ring index (truncated to uint32_t) of
 *          the first dequeued element. It is not written when 0 is returned.
 *
 * Returns the number of elements dequeued, or 0 if none were found.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
/* Elements between head and tail, capped at the caller's request. */
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
/* Copy the pointers out first; they only count as dequeued once head
 * has been advanced below. */
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
/* Presumably keeps the slot loads above ordered before the store/CAS
 * on head below - confirm against smp_fence. */
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
/* No competing consumer, so a plain store is enough to advance head. */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
/* Claim [head, head + actual) by advancing lfr->head. If another
 * consumer moved it first, head is refreshed and the copy is redone;
 * note that tail is not reloaded here, only via find_tail above. */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head; /* ring index of the first element returned */
return (uint32_t) actual;
}
```