--- tags: linux2022 --- # 2022q1 Homework5 (quiz8) contributed by < [Eddielin0926](https://github.com/Eddielin0926) > > [第八週測驗題](https://hackmd.io/@sysprog/linux2022-quiz8) ## 測驗 1 --- ## [測驗 2](https://hackmd.io/@sysprog/rkQMKQu7c) lfring [`lfring`](https://github.com/Eddielin0926/lfring) 是個 lock-free ring buffer 實作,並支援 multiple-producer/multiple-consumer (MPMC) ```c static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc) { ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED); if (before(idx, fresh)) { /* fresh is after idx, use this instead */ idx = fresh; } else { /* Continue with next slot */ idx++; /* XXXXX DDD; */ } return idx; } static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail) { if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */ return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); /* Multi-producer enqueue. * Scan ring for new elements that have been written but not released. */ ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; /* XXXXX KKK; */ while (before(tail, head + size) && __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) == tail) tail++; tail = cond_update(&lfr->tail, tail); return tail; } uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { /* skipped */ } while (!__atomic_compare_exchange_n( &lfr->head, &head, /* Updated on failure */ /* XXXXX */ HHH, /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); *index = (uint32_t) head; return (uint32_t) actual; } ``` | Question | Answer | |:-------- |:----------------------------------------------- | | DDD | `idx++;` | | KKK | `mask + 1` | | TTT | `&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE` | | HHH | `head + actual` | ### 解釋上述程式碼運作原理 這個程式是[生產者消費者問題](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)的實作,以有限環狀鏈結串列解決緩衝區問題。 從 `tests.c` 的 `main` 開始理解,可以看到 `main` 是由四種測試狀況構成的,分別是 multiple producer multiple consumer (MPMC), multiple producer single consumer (MPSC), single producer multiple consumer (SPMC), single producer single consumer (SPSC)。 ```c int main(void) { printf("testing MPMC lock-free ring\n"); test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_MC); printf("testing MPSC lock-free ring\n"); test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_SC); printf("testing SPMC lock-free ring\n"); test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_MC); printf("testing SPSC lock-free ring\n"); test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_SC); return 0; } ``` `test_ringbuffer` 的參數 `flag` 會決定測試的生產者消費者組合,有四種前綴是 `LFRING_FLAG_` 的 enum,用最低的兩個位元來決定情況。 ```c enum { LFRING_FLAG_MP = 0x0000 /* Multiple producer */, LFRING_FLAG_SP = 0x0001 /* Single producer */, LFRING_FLAG_MC = 0x0000 /* Multi consumer */, LFRING_FLAG_SC = 0x0002 /* Single consumer */, }; ``` 在 `test_ringbuffer` 中可以看到 `flags` 是作為 `lfring_alloc` 的參數,會存放在 `lfring` 中。`ROUNDUP_POW2` 會將 `n_elems` 近似至 2 的指數倍,用來決定要分配多少記憶體空間,最後初始化結構體中的成員。 ```c lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags) { unsigned long ringsz = ROUNDUP_POW2(n_elems); if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) { assert(0 && "invalid number of elements"); return NULL; } if ((flags & ~SUPPORTED_FLAGS) != 0) { assert(0 && "invalid flags"); return NULL; } size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element); lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE); if (!lfr) return NULL; lfr->head = 0, lfr->tail = 0; lfr->mask = ringsz - 1; lfr->flags = flags; for (ringidx_t i = 0; i < ringsz; i++) { lfr->ring[i].ptr = NULL; lfr->ring[i].idx = i - ringsz; } return lfr; } ``` 在分配記憶體空間時,使用的是 `oscal_alloc`,因為 `aligment` 是 64,因此實際上使用的是 `aligned_alloc`,`aligned_alloc` 會讓非配到的記憶體空間對齊 `alignment`,舉例來說,這邊的 `alignment` 是 64,因此初始的記憶體位址最低 6 位都會是 0。 `ROUNDUP(a, b)` 會讓 `a` 去近似 `b` 的倍數。 ```c static inline void *osal_alloc(size_t size, size_t alignment) { return alignment > 1 ? aligned_alloc(alignment, ROUNDUP(size, alignment)) : malloc(size); } ``` `ALIGNED(CACHE_LINE)` 會展開成 `__attribute__((__aligned__(64)))`要求編譯器在分配空間時對齊 64 byte。 ```c typedef uintptr_t ringidx_t; struct element { void *ptr; uintptr_t idx; }; struct lfring { ringidx_t head; ringidx_t tail ALIGNED(CACHE_LINE); uint32_t mask; uint32_t flags; struct element ring[] ALIGNED(CACHE_LINE); } ALIGNED(CACHE_LINE); ``` `restrict` 關鍵字告訴編譯器所有對該位址的操作都會通過該指標,使編譯器更好的進行優化。 考慮到多執行續的情況,在讀取 `lfr->tail` 時使用了 `__atomic_load_n` 來會保證位址在讀取不會被其他執行緒打斷而造成指標的值被影響,可以參考[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#%E8%83%8C%E5%BE%8C%E4%BD%9C%E7%A5%9F%E7%9A%84-cache)和 [Atomic vs. Non-Atomic Operations](https://preshing.com/20130618/atomic-vs-non-atomic-operations/) 來理解當沒有使用 `atomic` 操作時可能會造成的結果。 `tail & mask` 是因為 `tail` 對齊 cache line,所以與 `mask` 做 AND 可以當作 index 來使用。 ```c /* Enqueue elements at tail */ uint32_t lfring_enqueue(lfring_t *lfr, void *const *restrict elems, uint32_t n_elems) { intptr_t actual = 0; ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED); if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */ ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE); actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); if (actual <= 0) return 0; for (uint32_t i = 0; i < (uint32_t) actual; i++) { assert(lfr->ring[tail & mask].idx == tail - size); lfr->ring[tail & mask].ptr = *elems++; lfr->ring[tail & mask].idx = tail; tail++; } __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); return (uint32_t) actual; } /* else: lock-free multi-producer */ restart: while ((uint32_t) actual < n_elems && before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { union { struct element e; ptrpair_t pp; } old, neu; void *elem = elems[actual]; struct element *slot = &lfr->ring[tail & mask]; old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED); do { if (UNLIKELY(old.e.idx != tail - size)) { if (old.e.idx != tail) { /* We are far behind. Restart with fresh index */ tail = cond_reload(tail, &lfr->tail); goto restart; } /* slot already enqueued */ tail++; /* Try next slot */ goto restart; } /* Found slot that was used one lap back. * Try to enqueue next element. */ neu.e.ptr = elem; neu.e.idx = tail; /* Set idx on enqueue */ } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp)); /* Enqueue succeeded */ actual++; tail++; /* Continue with next slot */ } (void) cond_update(&lfr->tail, tail); return (uint32_t) actual; } ``` ## 測驗 3