Try   HackMD

2022q1 Homework5 (quiz8)

contributed by < Eddielin0926 >

第八週測驗題

測驗 1


測驗 2 lfring

lfring 是個 lock-free ring buffer 實作,並支援 multiple-producer/multiple-consumer (MPMC)

/*
 * Choose the index to continue with after a producer found its slot taken.
 *
 * Reads the shared index at `loc` with relaxed ordering. If that value is
 * after `idx`, it is returned; otherwise the slot following `idx` is
 * returned (this increment is quiz blank DDD).
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    const ringidx_t latest = __atomic_load_n(loc, __ATOMIC_RELAXED);

    return before(idx, latest) ? latest : idx + 1;
}

/*
 * Determine the tail index up to which elements are available.
 *
 * Single-producer rings simply return an acquire load of lfr->tail.
 * Multi-producer rings scan forward from `tail`, counting slots whose
 * stored idx equals the scanned index (written but not yet released), and
 * then hand the result to cond_update() for lfr->tail.
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    /* Single-producer enqueue: the published tail is authoritative. */
    if (lfr->flags & LFRING_FLAG_SP)
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue: walk the ring looking for written slots. */
    const ringidx_t mask = lfr->mask;
    const ringidx_t size = mask + 1; /* ring capacity (quiz blank KKK) */
    const ringidx_t limit = head + size;

    for (;;) {
        /* Never scan further than one full lap past head. */
        if (!before(tail, limit))
            break;
        /* Stop at the first slot that does not carry the scanned index. */
        if (__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) !=
            tail)
            break;
        tail++;
    }

    return cond_update(&lfr->tail, tail);
}

/*
 * Dequeue side of the ring. This is an excerpt: the body of the retry loop
 * is elided in this note and HHH is a quiz blank.
 *
 * Visible behaviour: takes a relaxed snapshot of lfr->head and an acquire
 * snapshot of lfr->tail, then retries a strong compare-and-swap on
 * lfr->head until it succeeds. Once it succeeds, the head value the swap
 * started from is stored to *index and `actual` is returned.
 *
 * NOTE(review): `actual` is not assigned anywhere in the visible code, and
 * elems/n_elems/mask/tail are not used; presumably the elided loop body
 * handles all of them -- confirm against the full source.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do { /* skipped */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        /* XXXXX: new head value, quiz blank */ HHH,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    /* The swap succeeded, so `head` still holds the pre-update value. */
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
Question Answer
DDD idx++;
KKK mask + 1
TTT &lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE
HHH head + actual

解釋上述程式碼運作原理

這個程式是生產者消費者問題的實作,以固定大小的環狀緩衝區 (ring buffer,底層是陣列而非鏈結串列) 作為生產者與消費者之間的共享緩衝區。
從 tests.c 的 main 開始理解,可以看到 main 是由四種測試狀況構成的,分別是 multiple producer multiple consumer (MPMC), multiple producer single consumer (MPSC), single producer multiple consumer (SPMC), single producer single consumer (SPSC)。

/* Run the ring-buffer test once for each producer/consumer combination. */
int main(void)
{
    static const struct {
        const char *banner; /* line printed before the run */
        int flags;          /* LFRING_FLAG_* combination under test */
    } runs[] = {
        {"testing MPMC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_MC},
        {"testing MPSC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_SC},
        {"testing SPMC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_MC},
        {"testing SPSC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_SC},
    };

    for (size_t i = 0; i < sizeof(runs) / sizeof(runs[0]); i++) {
        fputs(runs[i].banner, stdout);
        test_ringbuffer(runs[i].flags);
    }

    return 0;
}

test_ringbuffer 的參數 flag 會決定測試的生產者消費者組合,有四種前綴是 LFRING_FLAG_ 的 enum,用最低的兩個位元來決定情況。

/*
 * Ring configuration flags.
 * Bit 0 selects the producer mode and bit 1 the consumer mode; the
 * multi-producer and multi-consumer variants set no bit at all.
 */
enum {
    /* Producer side (bit 0) */
    LFRING_FLAG_MP = 0,      /* Multiple producer */
    LFRING_FLAG_SP = 1 << 0, /* Single producer */

    /* Consumer side (bit 1) */
    LFRING_FLAG_MC = 0,      /* Multi consumer */
    LFRING_FLAG_SC = 1 << 1, /* Single consumer */
};

test_ringbuffer 中可以看到 flags 是作為 lfring_alloc 的參數,會存放在 lfring 中。ROUNDUP_POW2 會將 n_elems 向上取整至最接近的 2 的冪,用來決定要分配多少記憶體空間,最後初始化結構體中的成員。

lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}

在分配記憶體空間時,使用的是 osal_alloc。因為 alignment 是 64 (大於 1),因此實際上呼叫的是 aligned_alloc。aligned_alloc 會讓分配到的記憶體空間對齊 alignment,舉例來說,這邊的 alignment 是 64,因此回傳的記憶體位址最低 6 個位元都會是 0。
ROUNDUP(a, b) 會將 a 向上取整至 b 的倍數。

/*
 * Allocate `size` bytes, optionally aligned.
 *
 * With an alignment of 0 or 1 this is a plain malloc(). Otherwise the size
 * is passed through ROUNDUP(size, alignment) and the block comes from
 * aligned_alloc(). Returns whatever the underlying allocator returns.
 */
static inline void *osal_alloc(size_t size, size_t alignment)
{
    if (alignment <= 1)
        return malloc(size);

    return aligned_alloc(alignment, ROUNDUP(size, alignment));
}

ALIGNED(CACHE_LINE) 會展開成 __attribute__((__aligned__(64))),要求編譯器在分配空間時對齊 64 bytes。

/* Ring position counter (pointer-sized unsigned integer). */
typedef uintptr_t ringidx_t;

/* One slot of the ring: a stored pointer plus the index it was stored at. */
struct element {
    void *ptr;     /* Element pointer written by the enqueue path */
    uintptr_t idx; /* Ring index of the last enqueue into this slot;
                    * lfring_alloc seeds slot i with i - ringsz */
};

/*
 * Ring control block, immediately followed by its slots.
 *
 * lfring_alloc allocates sizeof(lfring_t) plus one struct element per slot
 * and initialises every member.
 * NOTE(review): tail and ring carry their own ALIGNED(CACHE_LINE),
 * presumably to keep head, tail and the slots on separate cache lines --
 * confirm intent.
 */
struct lfring {
    ringidx_t head;                     /* Index advanced by the dequeue CAS */
    ringidx_t tail ALIGNED(CACHE_LINE); /* Index published by enqueue */
    uint32_t mask;                      /* Ring capacity - 1; wraps indices */
    uint32_t flags;                     /* LFRING_FLAG_* passed at allocation */
    struct element ring[] ALIGNED(CACHE_LINE); /* Slots (flexible array) */
} ALIGNED(CACHE_LINE);

restrict 關鍵字告訴編譯器:在該指標的有效範圍內,它所指向的物件只會透過該指標 (或由它衍生的指標) 存取,不會有其他別名,使編譯器能更積極地進行最佳化。
考慮到多執行緒的情況,在讀取 lfr->tail 時使用了 __atomic_load_n,保證讀取是不可分割的,不會因為其他執行緒同時寫入而讀到只更新一半的值。可以參考「並行程式設計: Atomics 操作」與「Atomic vs. Non-Atomic Operations」來理解當沒有使用 atomic 操作時可能會造成的結果。
tail & mask 可以當作 index 來使用,是因為 ring 的大小 (ringsz) 是 2 的冪,而 mask = ringsz - 1,所以 tail & mask 等價於 tail % ringsz,結果一定落在 0 到 ringsz - 1 之間。

/*
 * Enqueue elements at tail.
 *
 * Tries to store up to n_elems pointers from `elems` into the ring and
 * returns how many were actually stored (possibly 0).
 *
 * Single-producer rings (LFRING_FLAG_SP) fill the free slots directly and
 * then publish the new tail with a release store. Otherwise each element
 * is claimed with a compare-and-swap on the whole (ptr, idx) slot, and the
 * shared tail is handed to cond_update() at the end.
 */
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1; /* ring capacity */
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        /* Free slots are head + size - tail; never take more than asked. */
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            /* A free slot still carries the index from one lap back. */
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        /* Publish all slots written above in one release store. */
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }

    /* else: lock-free multi-producer */
restart:
    /* Continue while elements remain and tail is less than one lap ahead
     * of a freshly loaded head.
     */
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        /* View the slot both as a struct element and as the pointer-pair
         * type that lf_compare_exchange operates on.
         */
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem = elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            /* Expected case: idx is exactly one lap behind tail. */
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }

            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
            /* NOTE(review): the retry re-checks old.e.idx, so
             * lf_compare_exchange presumably refreshes old.pp on failure
             * -- confirm against its definition.
             */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));

        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    /* Offer the local tail to the shared one; the result is not needed.
     * NOTE(review): cond_update is not shown here; presumably it only
     * moves lfr->tail forward -- confirm.
     */
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}

測驗 3