
2022q1 Homework5 (quiz8)

contributed by < yaohwang99 >

Problem 2

```c
typedef uintptr_t ringidx_t;

struct element {
    void *ptr;
    uintptr_t idx;
};
```

The structure and its tail and ring members are aligned to the cache line size, so the consumer-side field (head) and the producer-side fields (tail, ring) live on different cache lines and do not false-share.
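The handout does not define ALIGNED and CACHE_LINE; a plausible sketch (an assumption: GCC/Clang attribute syntax and a typical 64-byte cache line) is:

```c
/* Hypothetical definitions, not from the handout */
#define CACHE_LINE 64                        /* typical x86-64 line size */
#define ALIGNED(x) __attribute__((aligned(x)))
```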

```c
struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);

typedef struct lfring lfring_t;
```

  1. Allocate memory for the lfring.
  2. ringsz is n_elems rounded up to the next power of 2, so idx & mask replaces the modulo operation (see the sketch after the function).
  3. The initial index of ring[i] is i - ringsz; for example, if ringsz is 8, the indices are -8, -7, ..., -1, marking every slot as one lap behind and therefore free.
```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}
```
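ROUNDUP_POW2 is not shown in the handout; the sketch below (helper name and implementation are assumptions) demonstrates the rounding and why idx & mask can replace idx % ringsz:

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper, not from the handout: round up to the next
 * power of 2 via the count-leading-zeros builtin. */
static inline unsigned long roundup_pow2(uint32_t n)
{
    return n <= 1 ? 1 : 1UL << (64 - __builtin_clzl(n - 1));
}

int main(void)
{
    unsigned long ringsz = roundup_pow2(6); /* -> 8 */
    unsigned long mask = ringsz - 1;        /* 0b111 */

    /* Because ringsz is a power of 2, idx & mask == idx % ringsz,
     * and the masking also wraps the ever-increasing head/tail. */
    for (unsigned long idx = 5; idx < 11; idx++)
        printf("%lu %% %lu = %lu\n", idx, ringsz, idx & mask);
    return 0;
}
```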






Diagram (Graphviz): node0 shows the lfring fields (head, tail, mask, flags, ring); its ring member points to node1, an array of eight {ptr, idx} slots, with every ptr initialized to NULL.

The enqueue steps can be summarized as follows:

  1. Check whether the slot at tail is available.
  2. Prepare the new {ptr, idx} pair neu.
  3. If the slot is still available, publish neu to the ring with a double-word compare-and-swap; otherwise move to the next slot and repeat from step 1.
  4. Update lfr->tail if the local tail has advanced past it.
```c
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }
    /* else: lock-free multi-producer */
```
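As a worked example of the free-slot arithmetic: with size = 8, head = 5, and tail = 9, the ring holds 9 - 5 = 4 elements, so head + size - tail = 4 slots are free and at most four elements are accepted. The intptr_t cast keeps the subtraction signed, so a full ring (tail == head + size) gives actual <= 0 and the function returns 0.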

The loop condition before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size) checks whether there are still empty slots in the ring.
The test old.e.idx == tail - size checks that the slot at tail is free: a free slot still carries the index from one lap back. If the slot is not free, old.e.idx equals tail when another producer has just used it.
cond_reload(tail, &lfr->tail) returns a fresh index: lfr->tail if it has advanced past tail, otherwise tail + 1.
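before, cond_reload, and cond_update are not listed in this section; the sketch below is reconstructed from their call sites, so treat the exact bodies as assumptions:

```c
/* Signed comparison so ordering stays correct across index wrap-around */
static inline bool before(ringidx_t a, ringidx_t b)
{
    return (intptr_t)(a - b) < 0;
}

static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    /* Jump forward to a later published index, else just try the next slot */
    return before(idx, fresh) ? fresh : idx + 1;
}

static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
    do {
        if (before(neu, old)) /* someone else already advanced further */
            return old;
    } while (!__atomic_compare_exchange_n(loc, &old, /* updated on failure */
                                          neu, /* weak */ true,
                                          __ATOMIC_RELEASE, __ATOMIC_RELAXED));
    return neu;
}
```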

```c
restart:
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem = elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }
            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}
```
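lf_compare_exchange must compare and swap the whole {ptr, idx} pair (two machine words) atomically. A minimal sketch for x86-64, assuming ptrpair_t is an unsigned __int128 and the build enables -mcx16 (both assumptions; the real project may use per-architecture code):

```c
typedef unsigned __int128 ptrpair_t; /* assumption: two 64-bit words */

/* Sketch of a double-word CAS; with -mcx16 this can compile to
 * cmpxchg16b on x86-64 instead of a libatomic call. */
static inline bool lf_compare_exchange(ptrpair_t *loc, ptrpair_t *old,
                                       ptrpair_t neu)
{
    return __atomic_compare_exchange_n(loc, old, neu, /* weak */ false,
                                       __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}
```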

The dequeue steps can be summarized as follows:

  1. Check whether the slots between head and tail contain items; if the ring looks empty, find_tail scans for elements that were written but not yet released (see the sketch after the function).
  2. Copy the pointers into elems[].
  3. If no other consumer has dequeued these slots in the meantime, advance lfr->head; otherwise the CAS reloads head from lfr->head and we repeat from step 1.
```c
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore); /* Order loads only */
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }
        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(&lfr->head,
                                          &head, /* Updated on failure */
                                          head + actual,
                                          /* weak */ false,
                                          __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```
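find_tail is not shown here. Its job is to advance past elements that producers have already written but whose tail update has not been released yet; the sketch below is reconstructed from the call site, so treat the body as an assumption:

```c
/* Sketch: scan forward over slots whose idx a producer has already set,
 * then publish the furthest position. Reconstructed, not from the handout. */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single producer: tail is exact */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    return cond_update(&lfr->tail, tail);
}
```

Publishing the scanned position through cond_update keeps lfr->tail monotonically increasing even when several consumers scan concurrently.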