--- tags: linux2022 --- # Lock-free Ringbuffer 實作 contributed by < [Eddielin0926](https://github.com/Eddielin0926) > :::info - [ ] 在探討 lfring 之前,嘗試撰寫 lock-based ring buffer,設計正確性驗證程式 - [ ] 比較 lock-based vs. lock-free: 例如 https://github.com/sysprog21/concurrent-ll ::: ## 觀察 `lfring` 因為對 `lfring` 的 enqueue 和 dequeue 的行為還不熟悉,因此我想先將 `lfring` 的內容印出來觀察。在 `lfring_enqueue` 和 `lfring_dequeue` 回傳前將 `lfring` 先印出來,可以知道在這個測資下,四種情況的 `lfring` 除了 `flag` 以外的內容不會有差異。 ```c void lfring_show(struct lfring *ring) { printf("lfring: head %" PRIuPTR ", tail %" PRIuPTR ", mask %" PRIu32 ", flags %" PRIu32 "\n", ring->head, ring->tail, ring->mask, ring->flags); for (ringidx_t i = 0; i < ring->mask + 1; i++) { printf("lfring[%" PRIuPTR "]: ptr %p, idx %" PRIuPTR "\n", i, ring->ring[i].ptr, ring->ring[i].idx); } printf("\n"); } ``` ## lfring_t `ALIGNED(CACHE_LINE)` 會展開成 `__attribute__((__aligned__(64)))`要求編譯器在分配空間時對齊 64 byte。 ```c typedef uintptr_t ringidx_t; struct element { void *ptr; uintptr_t idx; }; struct lfring { ringidx_t head; ringidx_t tail ALIGNED(CACHE_LINE); uint32_t mask; uint32_t flags; struct element ring[] ALIGNED(CACHE_LINE); } ALIGNED(CACHE_LINE); ``` ## lfring flag 在 `lfring` 有四種情況,分別是**多生產者**、**單一生產者**、**多消費者**和**單一生產者**。 第一位 bit 代表生產者,第二位 bit 代表消費者,舉例來說單一生產者單一消費者就是 `LFRING_FLAG_SP | LFRING_FLAG_SC` 以二進位來說就是 `0b11`。 ```c enum { LFRING_FLAG_MP = 0x0000 /* Multiple producer */, LFRING_FLAG_SP = 0x0001 /* Single producer */, LFRING_FLAG_MC = 0x0000 /* Multi consumer */, LFRING_FLAG_SC = 0x0002 /* Single consumer */, }; ``` ## lfring_alloc `lfring_alloc` 會分配記憶體空間給 `lfring_t`,依據傳入的 `n_elems` 決定陣列大小。`ringsz` 的值是由 `n_elems` 近似至 2 的冪次方,用來決定要分配多少記憶體空間,最小的空間為 2,最大為 2^31。 ```c lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags) { unsigned long ringsz = ROUNDUP_POW2(n_elems); if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) { assert(0 && "invalid number of elements"); return NULL; } if ((flags & ~SUPPORTED_FLAGS) != 0) { assert(0 && "invalid flags"); return NULL; } size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element); lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE); if (!lfr) return NULL; lfr->head = 0, lfr->tail = 0; lfr->mask = ringsz - 1; lfr->flags = flags; for (ringidx_t i = 0; i < ringsz; i++) { lfr->ring[i].ptr = NULL; lfr->ring[i].idx = i - ringsz; } return lfr; } ``` :::success 根據註解,`ringsz > 0x80000000` 應該要是 `ringsz >= 0x80000000` ::: 在分配記憶體空間時,使用的是 `oscal_alloc`,但實際上使用的是 `aligned_alloc`,`aligned_alloc` 會讓分配到的記憶體空間對齊 `alignment`,舉例來說,這邊的 `alignment` 是 `CACHE_LINE` 64,因此初始的記憶體位址最低 6 位都會是 0。 `ROUNDUP(a, b)` 會讓 `a` 去近似 `b` 的倍數。 ```c static inline void *osal_alloc(size_t size, size_t alignment) { return alignment > 1 ? aligned_alloc(alignment, ROUNDUP(size, alignment)) : malloc(size); } ``` ## lfring_enqueue `restrict` 關鍵字告訴編譯器所有對該位址的操作都會通過該指標,使編譯器更好的進行優化。 考慮到多執行續的情況,在讀取 `lfr->tail` 時使用了 `__atomic_load_n` 來會保證位址在讀取不會被其他執行緒打斷而造成指標的值被影響,可以參考[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#%E8%83%8C%E5%BE%8C%E4%BD%9C%E7%A5%9F%E7%9A%84-cache)和 [Atomic vs. Non-Atomic Operations](https://preshing.com/20130618/atomic-vs-non-atomic-operations/) 來理解當沒有使用 `atomic` 操作時可能會造成的結果。 以第 25 行的 `restart` 為界分為兩個部分,上半部為單一生產者,下半部為多生產者的情況。 ```c= uint32_t lfring_enqueue(lfring_t *lfr, void *const *restrict elems, uint32_t n_elems) { intptr_t actual = 0; ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED); if (lfr->flags & LFRING_FLAG_SP) { ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE); actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); if (actual <= 0) return 0; for (uint32_t i = 0; i < (uint32_t) actual; i++) { assert(lfr->ring[tail & mask].idx == tail - size); lfr->ring[tail & mask].ptr = *elems++; lfr->ring[tail & mask].idx = tail; tail++; } __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); return (uint32_t) actual; } restart: while ((uint32_t) actual < n_elems && before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { union { struct element e; ptrpair_t pp; } old, neu; void *elem = elems[actual]; struct element *slot = &lfr->ring[tail & mask]; old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED); do { if (UNLIKELY(old.e.idx != tail - size)) { if (old.e.idx != tail) { tail = cond_reload(tail, &lfr->tail); goto restart; } tail++; goto restart; } neu.e.ptr = elem; neu.e.idx = tail; } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp)); actual++; tail++; } (void) cond_update(&lfr->tail, tail); return (uint32_t) actual; } ``` ### Single producer `actual` 代表的是 `n_element` 實際上能存放到 `lfring` 的最大數量,`head + size - tail` 能夠成立的原因是因為陣列大小是 2 的冪次方,然後我們的 `head` 和 `tail` 都是不斷增加的,所以 `tail` 會一直大於 `head`,所以 `tail & mask` 就是 ring index。 ### Multiple producer `before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)` 其實就等於 `head + size - tail` 的 atomic 版本,用來檢查是否還有空間存放。 第一步會先將 tail 的 element 的內容放到 old 中,檢查 `old.e.idx` 是否等於 `tail - size` 意思就是這個位置被用掉了,就要找新的 `tail`。 ## lfring_dequeue dequeue 要做的與 enqueue 類似,首先也是要先計算實際能 dequeue 的數量,將 element 保存之後,檢查是否能成功更新 head,失敗的話就重頭再來一遍。當是單一消費者時就不用去檢查 `head` 的更新是否成功。 ```c uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (UNLIKELY(actual <= 0)) { /* Ring buffer is empty, scan for new but unreleased elements */ tail = find_tail(lfr, head, tail); actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (actual <= 0) return 0; } for (uint32_t i = 0; i < (uint32_t) actual; i++) elems[i] = lfr->ring[(head + i) & mask].ptr; smp_fence(LoadStore); if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED); break; } } while (!__atomic_compare_exchange_n( &lfr->head, &head, head + actual, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); *index = (uint32_t) head; return (uint32_t) actual; } ``` ## Configurable `CACHE_LINE` 利用 `getconf LEVEL1_DCACHE_LINESIZE` 來取得 cache line 大小,使得 `CACHE_LINE` 是可調整的。 - `common.h` ```c #ifndef CACHE_LINE #define CACHE_LINE 64 #endif ``` - `Makefile` ```shell CFLAGS += -DCACHE_LINE=$(shell getconf LEVEL1_DCACHE_LINESIZE) ```