--- tags: linux-summer-2021 --- # 2021q3 Homework3 (lfring) contributed by < `RinHizakura` > > [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-summer-quiz4) ## 實作 ### 資料結構 ```cpp typedef uintptr_t ringidx_t; struct element { void *ptr; uintptr_t idx; }; struct lfring { ringidx_t head; ringidx_t tail ALIGNED(CACHE_LINE); uint32_t mask; uint32_t flags; struct element ring[] ALIGNED(CACHE_LINE); } ALIGNED(CACHE_LINE); ``` * `head` / `tail`: ringbuf 中消費者與生產者在 buffer 中的操作 index * `mask`: 由於 ring buffer 的大小 (`ringsz`) 被強制定義為 $2^n$ ($n$ 為整數),可以通過 `mask == ringsz - 1` 的特性用 bitwise 操作去取代除/取餘數操作,優化計算的時間 * `flags`: 標記 buffer 的類型(單/多, 生產者/消費者) * `ring`: buffer 中的元素單位,動態分配其大小 :::info 應該可以把 `uintptr_t idx` 改成 `ringidx_t idx` 以方便擴充 ::: ### `lfring_enqueue` ```cpp /* Enqueue elements at tail */ uint32_t lfring_enqueue(lfring_t *lfr, void *const *restrict elems, uint32_t n_elems) { intptr_t actual = 0; ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED); if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */ ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE); actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); if (actual <= 0) return 0; for (uint32_t i = 0; i < (uint32_t) actual; i++) { assert(lfr->ring[tail & mask].idx == tail - size); lfr->ring[tail & mask].ptr = *elems++; lfr->ring[tail & mask].idx = tail; tail++; } __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); return (uint32_t) actual; } ``` :::info [restrict type qualifier](https://en.cppreference.com/w/c/language/restrict) ::: enqueue 是生產者執行緒的核心操作。 在單生產者的情境下,寫入 buffer 的 thread 只有一個,因此可以不需對 buffer 的寫入做 CAS 操作,只要直接計算需插入的數量 `actual`(根據實際剩餘大小調整),並且更新 buffer 即可。消費者端則可以由 `lfr->tail` 的變化得知 buffer 的更新。 在多生產者的情況下,需要考慮生產者之間的同步正確性。在最單純的實作中,buffer slot 和 `lfr->tail` 的更新可能需要 atomically 的進行以達預期的正確性。思考有生產者 A 和 B 的情境,兩者分別要找到一個 availible slot 並且更新 tail,如果程式的運行時序如下: 1. 生產者 A 生產 object 到 slot index 1 2. 生產者 B 生產 object 到 slot index 2 3. 生產者 B 更新 `lfr->tail` 到 index 2 4. 生產者 A 更新 `lfr->tail` 到 index 1 可以發現這導致了 `lfr->tail` 被更新成了非預期的值。 也許可以通過同步的更新 slot 和 `lfr->tail` (Double CAS?) 以滿足正確性? 但這裡採用另外一種策略: 只要確保 slot 的更新是按照順序 (通過 `struct element` 的 `idx`),而 `lfr->tail` 的更新與否則可以由 thread 判斷是否在自己之後有其他的 thread 完成生產且預先更新 `lfr->tail`。回顧前面的情境: 1. 生產者 A 生產 object 到 slot index 1 2. 生產者 B 生產 object 到 slot index 2 3. 生產者 B 更新 `lfr->tail` 到 index 2 4. 生產者 A 想更新 `lfr->tail` 到 index 1,但生產者 A 得知存在另一生產者在自己之後生產 object,且已經更新 `lfr->tail`,則不必再更新之 關鍵是,生產者 A 通過甚麼方法得知「存在另一個生產者在自己之後生產 object」? 這裡需要理解目前 lfring 的 設計。程式的 `head` / `tail` 實際值上限並不會對應 ring buffer 的大小,而是會不斷累計,且通過取餘數 index 至正確的 buffer 位置。舉例來說,假設 buffer 的大小是 16,則 `tail == 18` 時會 index 到的位置是 18 & 0xf = 2。這有甚麼好處呢? 其一,這個方法良好確保了 ring buffer 應具有的 ring 特性;其二,`head` / `tail` 的大小關係表示對 buffer 的操作時序,可以從其數值得知 lap buffer 多少次。例如,假設 `tail == 2` 是第 1 個 lap: * `tail == 18` 時,$18 = 16 * 1 + 2$,是 index 到位置 2 的第 2 個 lap * `tail == 82` 時,$82 = 16 * 5 + 2$,是 index 到位置 2 的第 6 個 lap 藉由此設計,再搭配 `struct element` 的 `idx` member,程式得以正確同步,透過 `head` / `tail` 的大小直接判斷操作的時序先後 (也就是 `before` 函式)。 然而,為甚麼我認為這樣設計可能是有缺陷的呢? 因為此設計不允許 `lfr->tail` / `lfr->head` wrapping add 回 0,否則僅通過 `ringidx_t` 大小判斷先後的 `before` 將出錯。但這就代表可以 lap ring buffer 的次數是有極限的,如果 `ringidx_t` 可以表示的數字上限很小(如 `uint8_t`),該 ring buffer 是很難滿足實際的使用需求的。 :::warning 上述說法的正確性需要實驗驗證,不過需要實驗中 $2^{64}-1$ 以上的 enqueue + dequeue 有些耗時,如果直接更動 `ringidx` 的話需要仔細思考型別的調整 * 我感覺目前程式碼型別不夠 general? 應該要讓 `ringidx_t` typedef 如果更動成 `uintptr_t` 以外的型別時也可以符合預期 ::: 但假設 enqueue 操作次數不會超出 `ringidx_t` 的上限,或者想像 `ringidx_t` 是不存在上限的,此程式如此設計的正確性就會被滿足。 ```cpp /* else: lock-free multi-producer */ restart: while ((uint32_t) actual < n_elems && before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { union { struct element e; ptrpair_t pp; } old, neu; void *elem = elems[actual]; struct element *slot = &lfr->ring[tail & mask]; old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED); ``` 回頭閱讀程式碼。在多生產者的情境下,生產者執行緒要找到可能為空的 slot (剛初始化好或者已經被消費掉),需要從 `tail` 搜尋到 `head + size - 1`,後者是當前 `tail` 所允許的最大值,因為如果 `tail == head + size - 1`,代表 buffer 已滿。 ```cpp do { if (UNLIKELY(old.e.idx != tail - size)) { if (old.e.idx != tail) { /* We are far behind. Restart with fresh index */ tail = cond_reload(tail, &lfr->tail); goto restart; } /* slot already enqueued */ tail++; /* Try next slot */ goto restart; } /* Found slot that was used one lap back. * Try to enqueue next element. */ neu.e.ptr = elem; neu.e.idx = tail; /* Set idx on enqueue */ } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp)); /* Enqueue succeeded */ actual++; tail++; /* Continue with next slot */ } (void) cond_update(&lfr->tail, tail); return (uint32_t) actual; } ``` 在前述的 `tail` 和 `head` 的關係下,我們可以巧妙的藉由 `tail` 和 `struct element` 的 `idx` member 之關係進行同步(注意 `lfring_alloc` 時 `idx` 的初始化方式,以及每次更新 slot 時填入的 `idx`),兩者的關係分別可能為: 1. `slot.idx == tail - size`: slot 的狀態為空 * 表示位置 `idx` 的 slot 在這個 lap 還未被寫入 * 且上個 lap 在此 slot 所生產的 object 必然已經被消費掉 (只從 `tail` 搜尋到 `head + size - 1` 確保了這點) 2. `slot.idx == tail`: slot 已經被其他 thread 生產新的 object (但還位來得及更新 `tail`) * 則找尋下一個 slot (`tail++`) 3. 其他情況: thread 的 local `tail` 進度落後數個 lap(可能被其他 thread 搶佔),從 `lfr->tail` 重新載入 因此,我們可以通過判斷此關係找到合適的 slot,並且 CAS 操作嘗試更新之。 * 最後的 `cond_update` 則透過 `lfr->tail` 和 local `tail` 的大小關係,得知我們之前提到的 「是否存在另一個生產者在自己之後生產 object」,並正確的更新 `lfr->tail` ### `lfring_dequeue` ```cpp uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (UNLIKELY(actual <= 0)) { /* Ring buffer is empty, scan for new but unreleased elements */ tail = find_tail(lfr, head, tail); actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (actual <= 0) return 0; } for (uint32_t i = 0; i < (uint32_t) actual; i++) elems[i] = lfr->ring[(head + i) & mask].ptr; smp_fence(LoadStore); // Order loads only if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */ __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED); break; } /* else: lock-free multi-consumer */ } while (!__atomic_compare_exchange_n( &lfr->head, &head, /* Updated on failure */ head + actual, /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); *index = (uint32_t) head; return (uint32_t) actual; } ``` 對於消費者的操作,如果是單消費者,只要正確的取得 `lfr->tail` 就可以知道單 / 多生產者情境下可取用的 slot,消費之(放到 `elems` 中)並且更新 `lfr->head` 即可。多消費者的情況下則要 CAS 的更新 `lfr->head`,以確保只有一個消費者競爭到同一個 object。 :::info 可以發現目前的設計會一次性的嘗試消費連續 `actual` 個,而不是一個個找尋可以消費的 object 直到滿足 `actual`。如果更換成後者之實作可能有甚麼優點或缺點嗎? ::: ```cpp static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail) { if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */ return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); /* Multi-producer enqueue. * Scan ring for new elements that have been written but not released. */ ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; while (before(tail, head + size) && __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) == tail) tail++; tail = cond_update(&lfr->tail, tail); return tail; } ``` 另一個是需要深入探討的點是 `find_tail`。在多生產的情境下,我們知道會存在已經生產但尚未更新 `lfr->tail` 的狀況。但因為生產者更新 slot 時會更新 `struct element` 的 `idx` 的原因,因此可以在消費者的 thread 藉此提早更新 `lfr->tail`,即 `find_tail` 的作用。 ### `lf_compare_exchange` ```cpp union u128 { struct { uint64_t lo, hi; } s; __int128 ui; }; static inline bool lf_compare_exchange(register __int128 *var, __int128 *exp, __int128 neu) { union u128 cmp = {.ui = *exp}, with = {.ui = neu}; bool ret; __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0" : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo) : "c"(with.s.hi), "b"(with.s.lo) : "cc", "memory"); if (unlikely(!ret)) *exp = cmp.ui; return ret; } ``` 為了多生產者的使用情境,需要做到以 slots 為單位的 compare-and-swap。`lfring` 中運用 slot 的大小剛好是一個 8 bytes 指標和一個 8 bytes 整數的性質,將 `struct element` 轉型成 128 bits 的整數,再用 128 bits 的 compare-and-swap instruction (剛好 x86-64 上有支援) 來實作之。 inline assembly 可以理解為: ``` lock cmpxchg16b [var] setz [ret] ``` * `"+d"(cmp.s.hi), "+a"(cmp.s.lo)`: 對於 cmpxchg16b,要 compare 是否相同的數值,其高 64 bits 放在 rdx 且低 64 bits 放在 rax * `"c"(with.s.hi), "b"(with.s.lo)`: 要 swap 的數值,其高 64 bits 放在 rcx 而低 64 bits 放在 rbx * 如果 `var` 與要比較的數值相同, zero flag 會被設置,且 [rcx:rbx] 會被複製到 `var` 中,否則 zero flag 會被清零,並將 [rdx:rax] 的數值設為 `var` > `=` identifies an operand which is only written; `+` identifies an operand that is both read and written; all other operands are assumed to only be read. * `cc`: 標示 assembly codes 會對 flag register 產生影響 * [What happens in the assembly output when we add “cc” to clobber list](https://stackoverflow.com/questions/59656857/what-happens-in-the-assembly-output-when-we-add-cc-to-clobber-list) * `memory`: 產生 compiler barrier,詳見下一章節 > * [關於GNU Inline Assembly](http://wen00072.github.io/blog/2015/12/10/about-inline-asm/) > * [6.47 How to Use Inline Assembly Language in C Code](https://gcc.gnu.org/onlinedocs/gcc/Using-Assembly-Language-with-C.html#Using-Assembly-Language-with-C) ### Memory Barrier ```cpp #if defined(__x86_64__) static inline void smp_fence(unsigned int mask) { if ((mask & StoreLoad) == StoreLoad) { __asm__ volatile("mfence" ::: "memory"); } else if (mask != 0) { /* Any fence but StoreLoad */ __asm__ volatile("" ::: "memory"); } } #else #error "Unsupported architecture" #endif ``` `lfring` 中的 `smp_fence` 用來插入 memory barrier,可以看到被分為了兩種: ```cpp __asm__ volatile("" ::: "memory"); ``` 如上所展示為 compiler barrier,其效果簡而言之,是讓 barrier 前的程式碼不能被重排到 barrier 之後,反之亦然。實際上 compiler barrier 不產生任何的指令,只是用來約束編譯器的行為。 ```cpp __asm__ volatile("mfence" ::: "memory"); ``` 如上則是 cpu barrier,其使得 CPU 不會將 barrier 前後的指令進行重排。普遍常見的 CPU 中,對於前後數據有依賴 (dependency) 關係的指令,CPU是不會去做重排的 (Alpha 除外)。而以 x86 的 memory-ordering model 而言,只有 store 指令後接 load (StoreLoad) 可以被重排,這也是甚麼這裡只有針對 StoreLoad 時需要插入 CPU barrier,其他狀況下只要插入 compiler barrier 即可。 > * [Weak vs. Strong Memory Models](https://preshing.com/20120930/weak-vs-strong-memory-models/) > * [浅谈Memory Reordering](http://dreamrunner.org/blog/2014/06/28/qian-tan-memory-reordering/) ## Kfifo [Linux ring buffer(kfifo)](https://hackmd.io/@RinHizakura/SJra5Mfjt) ## klfring [`klfring`](https://github.com/RinHizakura/klfring) 嘗試將 `lfring` 移植到 Linux 核心,此單元記錄一些移植過程需考量的議題以及修正方案。 ### `aligned_alloc` `lfring` 通過對齊的策略試圖讓對 ring buffer 的存取可以考慮到 cache 所帶來的效益,關鍵是結合 `__attribute__((__aligned__(x)))` keyword 和 `aligned_alloc` library call 的使用。然而 kernel 中並不存在像是 `kmalloc_aligned` 的 API (儘管有可能在未來實現的相關討論)。 參照 [Alignment guarantees for kmalloc()](https://lwn.net/Articles/787740/),其中提及 `kmalloc` 對於配置空間的對齊保證: > Natural alignment for an object means that its beginning address is a multiple of each of up to three different quantities: `ARCH_KMALLOC_MINALIGN` (eight bytes, by default), the cache-line size (for larger objects), and the size of the object itself when that size is a power of two. The actual required alignment will generally be the least common multiple of the three. > Most of the time, Babka said, kmalloc() already returns naturally aligned objects for a power-of-two allocation size; that result falls out of how the slab pages are laid out. But there are exceptions: when SLUB debugging is enabled or when the SLOB allocator is used. 參閱 `kmalloc` 在文件中的敘述: > [kmalloc](https://www.kernel.org/doc/html/latest/core-api/mm-api.html#c.kmalloc): The allocated object address is aligned to at least `ARCH_KMALLOC_MINALIGN` bytes 總結而言,`kmalloc` 所分配的空間在某些情境下是可能沒對齊 cache 的,可能導致 false sharing 問題,暫時未針對此問題進行解決。 ### Atomic operation 在 Linux 中直接使用 gcc extension 的 atomic operation ([Built-in functions for memory model aware atomic operations](https://gcc.gnu.org/onlinedocs/gcc-4.7.2/gcc/_005f_005fatomic-Builtins.html)),我記得是存在相關疑慮的? > * [C11 atomic variables and the kernel](https://lwn.net/Articles/586838/): February 18, 2014 > * [Time to move to C11 atomics?](https://lwn.net/Articles/691128/): June 15, 2016 於是我嘗試用 kernel 中的 atomic API 進行改寫,這應該會是比較合適的作法。我將結構中的 `head` 和 `tail` 改成 `atomic_long_t`。並透過相關 API 去對其進行操作。比較需要注意的地方是要修正某些變數的型別 (需要把透過特殊 API 操作的 `atomic_long_t` 和實際數值的 `unsigned long` 分開)。 * `long` 長度根據 [64-bit data models](https://en.wikipedia.org/wiki/64-bit_computing#64-bit_data_models) 在 64 位元的 Linux 上應為 64 bits ```cpp typedef atomic_long_t ringidx_t; typedef unsigned long __ringidx_t; ``` 要注意到型別的變動可能會影響 `klfring` 和 `lfring` 對輸入的有效數值差異,特別注意到 kernel 文件中提到: > While atomic_t, atomic_long_t and atomic64_t use int, long and s64 respectively (for hysterical raisins), the kernel uses -fno-strict-overflow (which implies -fwrapv) and defines signed overflow to behave like 2s-complement. Therefore, an explicitly unsigned variant of the atomic ops is strictly unnecessary and we can simply cast, there is no UB. 移植到 kernel 時可以修改的地方之一是 memory barrier 的使用,可以參照 > * [linux/arch/x86/include/asm/barrier.h](https://github.com/torvalds/linux/blob/master/arch/x86/include/asm/barrier.h) > * [linux/Documentation/memory-barriers.txt](https://github.com/torvalds/linux/blob/master/Documentation/memory-barriers.txt) > The Linux kernel has an explicit compiler barrier function that prevents the compiler from moving the memory accesses either side of it to the other side: ``` barrier(); ``` 此外也要將 atomic operation 使用 kernel API 進行替換,此部份則參照: > * [Semantics and Behavior of Atomic and Bitmask Operations](https://www.kernel.org/doc/html/v4.12/core-api/atomic_ops.html) > * [linux/Documentation/atomic_t.txt](https://github.com/torvalds/linux/blob/master/Documentation/atomic_t.txt) > * [Linux-Kernel Memory Model](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0124r5.html) ### CAS for buffer slot 要支援多生產者的話,除了基本的 64 位元的 CAS,還需考慮如合對整個 buffer slot 進行 CAS 操作,目前的作法是把 `lfring` 的 [`lf_compare_exchange`](#lf_compare_exchange) 實作直接搬至 kernel 中使用: * `atomic_long_t` 為 8 bytes 長度,因此 `klfring` 和 `lfring` 的 `struct element` 有相同大小,`lf_compare_exchange` 應足以滿足需求 :::info `atomic_t` 將 int 封裝在 struct 中,因此其大小與 int 相同 ```cpp typedef struct { int counter; } atomic_t; ``` 其作用只是讓編譯器去確保對此型別的操作需使用 atomic API。 ::: ### check * [ktsan](https://github.com/google/ktsan) * [KCSAN(1)](https://github.com/google/ktsan/wiki/KCSAN) * [KCSAN(2)](https://www.kernel.org/doc/html/latest/dev-tools/kcsan.html) * for Linux 5.8 up ### TODO list 直至 commit [`b59abf6`](https://github.com/RinHizakura/klfring/commit/b59abf6f5647f6aa7bb39640cac2846c8d755cc1) 為止,完成基本將 `lfring` 移植到 kernel 的目標。 - [ ] 透過更複雜的測資驗證實作的正確性 - [ ] 需考慮是否需要額外的 memory barrier 以確保不會發生非預期的重排 - [ ] 考慮修改為更寬鬆的 memory model(relaxed / acquire-release / comsume-release) - [ ] `lfring` 的限制(lap ring buffer 的次數限制 / 只支援 x86-64 架構)在仍然存在 `klfring`,需思考是否有合適的方法可以一並解決 ## Note * [[RFC] lfring: lock-free ring buffer](https://patches.dpdk.org/project/dpdk/patch/1548678513-14348-1-git-send-email-ola.liljedahl@arm.com/) * [读写一气呵成 - Linux中的原子操作](https://zhuanlan.zhihu.com/p/89299392) * [对优化说不 - Linux中的Barrier](https://zhuanlan.zhihu.com/p/96001570)