---
tags: linux-summer-2021
---
# 2021q3 Homework3 (lfring)
contributed by < `RinHizakura` >
> [第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-summer-quiz4)
## 實作
### 資料結構
```cpp
typedef uintptr_t ringidx_t;
struct element {
void *ptr;
uintptr_t idx;
};
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```
* `head` / `tail`: ringbuf 中消費者與生產者在 buffer 中的操作 index
* `mask`: 由於 ring buffer 的大小 (`ringsz`) 被強制定義為 $2^n$ ($n$ 為整數),可以通過 `mask == ringsz - 1` 的特性用 bitwise 操作去取代除/取餘數操作,優化計算的時間
* `flags`: 標記 buffer 的類型(單/多, 生產者/消費者)
* `ring`: buffer 中的元素單位,動態分配其大小
:::info
應該可以把 `uintptr_t idx` 改成 `ringidx_t idx` 以方便擴充
:::
### `lfring_enqueue`
```cpp
/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
```
:::info
[restrict type qualifier](https://en.cppreference.com/w/c/language/restrict)
:::
enqueue 是生產者執行緒的核心操作。
在單生產者的情境下,寫入 buffer 的 thread 只有一個,因此可以不需對 buffer 的寫入做 CAS 操作,只要直接計算需插入的數量 `actual`(根據實際剩餘大小調整),並且更新 buffer 即可。消費者端則可以由 `lfr->tail` 的變化得知 buffer 的更新。
在多生產者的情況下,需要考慮生產者之間的同步正確性。在最單純的實作中,buffer slot 和 `lfr->tail` 的更新可能需要 atomically 的進行以達預期的正確性。思考有生產者 A 和 B 的情境,兩者分別要找到一個 availible slot 並且更新 tail,如果程式的運行時序如下:
1. 生產者 A 生產 object 到 slot index 1
2. 生產者 B 生產 object 到 slot index 2
3. 生產者 B 更新 `lfr->tail` 到 index 2
4. 生產者 A 更新 `lfr->tail` 到 index 1
可以發現這導致了 `lfr->tail` 被更新成了非預期的值。
也許可以通過同步的更新 slot 和 `lfr->tail` (Double CAS?) 以滿足正確性? 但這裡採用另外一種策略: 只要確保 slot 的更新是按照順序 (通過 `struct element` 的 `idx`),而 `lfr->tail` 的更新與否則可以由 thread 判斷是否在自己之後有其他的 thread 完成生產且預先更新 `lfr->tail`。回顧前面的情境:
1. 生產者 A 生產 object 到 slot index 1
2. 生產者 B 生產 object 到 slot index 2
3. 生產者 B 更新 `lfr->tail` 到 index 2
4. 生產者 A 想更新 `lfr->tail` 到 index 1,但生產者 A 得知存在另一生產者在自己之後生產 object,且已經更新 `lfr->tail`,則不必再更新之
關鍵是,生產者 A 通過甚麼方法得知「存在另一個生產者在自己之後生產 object」?
這裡需要理解目前 lfring 的 設計。程式的 `head` / `tail` 實際值上限並不會對應 ring buffer 的大小,而是會不斷累計,且通過取餘數 index 至正確的 buffer 位置。舉例來說,假設 buffer 的大小是 16,則 `tail == 18` 時會 index 到的位置是 18 & 0xf = 2。這有甚麼好處呢? 其一,這個方法良好確保了 ring buffer 應具有的 ring 特性;其二,`head` / `tail` 的大小關係表示對 buffer 的操作時序,可以從其數值得知 lap buffer 多少次。例如,假設 `tail == 2` 是第 1 個 lap:
* `tail == 18` 時,$18 = 16 * 1 + 2$,是 index 到位置 2 的第 2 個 lap
* `tail == 82` 時,$82 = 16 * 5 + 2$,是 index 到位置 2 的第 6 個 lap
藉由此設計,再搭配 `struct element` 的 `idx` member,程式得以正確同步,透過 `head` / `tail` 的大小直接判斷操作的時序先後 (也就是 `before` 函式)。
然而,為甚麼我認為這樣設計可能是有缺陷的呢? 因為此設計不允許 `lfr->tail` / `lfr->head` wrapping add 回 0,否則僅通過 `ringidx_t` 大小判斷先後的 `before` 將出錯。但這就代表可以 lap ring buffer 的次數是有極限的,如果 `ringidx_t` 可以表示的數字上限很小(如 `uint8_t`),該 ring buffer 是很難滿足實際的使用需求的。
:::warning
上述說法的正確性需要實驗驗證,不過需要實驗中 $2^{64}-1$ 以上的 enqueue + dequeue 有些耗時,如果直接更動 `ringidx` 的話需要仔細思考型別的調整
* 我感覺目前程式碼型別不夠 general? 應該要讓 `ringidx_t` typedef 如果更動成 `uintptr_t` 以外的型別時也可以符合預期
:::
但假設 enqueue 操作次數不會超出 `ringidx_t` 的上限,或者想像 `ringidx_t` 是不存在上限的,此程式如此設計的正確性就會被滿足。
```cpp
/* else: lock-free multi-producer */
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
```
回頭閱讀程式碼。在多生產者的情境下,生產者執行緒要找到可能為空的 slot (剛初始化好或者已經被消費掉),需要從 `tail` 搜尋到 `head + size - 1`,後者是當前 `tail` 所允許的最大值,因為如果 `tail == head + size - 1`,代表 buffer 已滿。
```cpp
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
```
在前述的 `tail` 和 `head` 的關係下,我們可以巧妙的藉由 `tail` 和 `struct element` 的 `idx` member 之關係進行同步(注意 `lfring_alloc` 時 `idx` 的初始化方式,以及每次更新 slot 時填入的 `idx`),兩者的關係分別可能為:
1. `slot.idx == tail - size`: slot 的狀態為空
* 表示位置 `idx` 的 slot 在這個 lap 還未被寫入
* 且上個 lap 在此 slot 所生產的 object 必然已經被消費掉 (只從 `tail` 搜尋到 `head + size - 1` 確保了這點)
2. `slot.idx == tail`: slot 已經被其他 thread 生產新的 object (但還位來得及更新 `tail`)
* 則找尋下一個 slot (`tail++`)
3. 其他情況: thread 的 local `tail` 進度落後數個 lap(可能被其他 thread 搶佔),從 `lfr->tail` 重新載入
因此,我們可以通過判斷此關係找到合適的 slot,並且 CAS 操作嘗試更新之。
* 最後的 `cond_update` 則透過 `lfr->tail` 和 local `tail` 的大小關係,得知我們之前提到的 「是否存在另一個生產者在自己之後生產 object」,並正確的更新 `lfr->tail`
### `lfring_dequeue`
```cpp
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
對於消費者的操作,如果是單消費者,只要正確的取得 `lfr->tail` 就可以知道單 / 多生產者情境下可取用的 slot,消費之(放到 `elems` 中)並且更新 `lfr->head` 即可。多消費者的情況下則要 CAS 的更新 `lfr->head`,以確保只有一個消費者競爭到同一個 object。
:::info
可以發現目前的設計會一次性的嘗試消費連續 `actual` 個,而不是一個個找尋可以消費的 object 直到滿足 `actual`。如果更換成後者之實作可能有甚麼優點或缺點嗎?
:::
```cpp
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
while (before(tail, head + size) &&
__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
tail)
tail++;
tail = cond_update(&lfr->tail, tail);
return tail;
}
```
另一個是需要深入探討的點是 `find_tail`。在多生產的情境下,我們知道會存在已經生產但尚未更新 `lfr->tail` 的狀況。但因為生產者更新 slot 時會更新 `struct element` 的 `idx` 的原因,因此可以在消費者的 thread 藉此提早更新 `lfr->tail`,即 `find_tail` 的作用。
### `lf_compare_exchange`
```cpp
union u128 {
struct {
uint64_t lo, hi;
} s;
__int128 ui;
};
static inline bool lf_compare_exchange(register __int128 *var,
__int128 *exp,
__int128 neu)
{
union u128 cmp = {.ui = *exp}, with = {.ui = neu};
bool ret;
__asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
: "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
: "c"(with.s.hi), "b"(with.s.lo)
: "cc", "memory");
if (unlikely(!ret))
*exp = cmp.ui;
return ret;
}
```
為了多生產者的使用情境,需要做到以 slots 為單位的 compare-and-swap。`lfring` 中運用 slot 的大小剛好是一個 8 bytes 指標和一個 8 bytes 整數的性質,將 `struct element` 轉型成 128 bits 的整數,再用 128 bits 的 compare-and-swap instruction (剛好 x86-64 上有支援) 來實作之。
inline assembly 可以理解為:
```
lock cmpxchg16b [var]
setz [ret]
```
* `"+d"(cmp.s.hi), "+a"(cmp.s.lo)`: 對於 cmpxchg16b,要 compare 是否相同的數值,其高 64 bits 放在 rdx 且低 64 bits 放在 rax
* `"c"(with.s.hi), "b"(with.s.lo)`: 要 swap 的數值,其高 64 bits 放在 rcx 而低 64 bits 放在 rbx
* 如果 `var` 與要比較的數值相同, zero flag 會被設置,且 [rcx:rbx] 會被複製到 `var` 中,否則 zero flag 會被清零,並將 [rdx:rax] 的數值設為 `var`
> `=` identifies an operand which is only written; `+` identifies an operand that is both read and written; all other operands are assumed to only be read.
* `cc`: 標示 assembly codes 會對 flag register 產生影響
* [What happens in the assembly output when we add “cc” to clobber list](https://stackoverflow.com/questions/59656857/what-happens-in-the-assembly-output-when-we-add-cc-to-clobber-list)
* `memory`: 產生 compiler barrier,詳見下一章節
> * [關於GNU Inline Assembly](http://wen00072.github.io/blog/2015/12/10/about-inline-asm/)
> * [6.47 How to Use Inline Assembly Language in C Code](https://gcc.gnu.org/onlinedocs/gcc/Using-Assembly-Language-with-C.html#Using-Assembly-Language-with-C)
### Memory Barrier
```cpp
#if defined(__x86_64__)
static inline void smp_fence(unsigned int mask)
{
if ((mask & StoreLoad) == StoreLoad) {
__asm__ volatile("mfence" ::: "memory");
} else if (mask != 0) {
/* Any fence but StoreLoad */
__asm__ volatile("" ::: "memory");
}
}
#else
#error "Unsupported architecture"
#endif
```
`lfring` 中的 `smp_fence` 用來插入 memory barrier,可以看到被分為了兩種:
```cpp
__asm__ volatile("" ::: "memory");
```
如上所展示為 compiler barrier,其效果簡而言之,是讓 barrier 前的程式碼不能被重排到 barrier 之後,反之亦然。實際上 compiler barrier 不產生任何的指令,只是用來約束編譯器的行為。
```cpp
__asm__ volatile("mfence" ::: "memory");
```
如上則是 cpu barrier,其使得 CPU 不會將 barrier 前後的指令進行重排。普遍常見的 CPU 中,對於前後數據有依賴 (dependency) 關係的指令,CPU是不會去做重排的 (Alpha 除外)。而以 x86 的 memory-ordering model 而言,只有 store 指令後接 load (StoreLoad) 可以被重排,這也是甚麼這裡只有針對 StoreLoad 時需要插入 CPU barrier,其他狀況下只要插入 compiler barrier 即可。
> * [Weak vs. Strong Memory Models](https://preshing.com/20120930/weak-vs-strong-memory-models/)
> * [浅谈Memory Reordering](http://dreamrunner.org/blog/2014/06/28/qian-tan-memory-reordering/)
## Kfifo
[Linux ring buffer(kfifo)](https://hackmd.io/@RinHizakura/SJra5Mfjt)
## klfring
[`klfring`](https://github.com/RinHizakura/klfring) 嘗試將 `lfring` 移植到 Linux 核心,此單元記錄一些移植過程需考量的議題以及修正方案。
### `aligned_alloc`
`lfring` 通過對齊的策略試圖讓對 ring buffer 的存取可以考慮到 cache 所帶來的效益,關鍵是結合 `__attribute__((__aligned__(x)))` keyword 和
`aligned_alloc` library call 的使用。然而 kernel 中並不存在像是 `kmalloc_aligned` 的 API (儘管有可能在未來實現的相關討論)。
參照 [Alignment guarantees for kmalloc()](https://lwn.net/Articles/787740/),其中提及 `kmalloc` 對於配置空間的對齊保證:
> Natural alignment for an object means that its beginning address is a multiple of each of up to three different quantities: `ARCH_KMALLOC_MINALIGN` (eight bytes, by default), the cache-line size (for larger objects), and the size of the object itself when that size is a power of two. The actual required alignment will generally be the least common multiple of the three.
> Most of the time, Babka said, kmalloc() already returns naturally aligned objects for a power-of-two allocation size; that result falls out of how the slab pages are laid out. But there are exceptions: when SLUB debugging is enabled or when the SLOB allocator is used.
參閱 `kmalloc` 在文件中的敘述:
> [kmalloc](https://www.kernel.org/doc/html/latest/core-api/mm-api.html#c.kmalloc): The allocated object address is aligned to at least `ARCH_KMALLOC_MINALIGN` bytes
總結而言,`kmalloc` 所分配的空間在某些情境下是可能沒對齊 cache 的,可能導致 false sharing 問題,暫時未針對此問題進行解決。
### Atomic operation
在 Linux 中直接使用 gcc extension 的 atomic operation ([Built-in functions for memory model aware atomic operations](https://gcc.gnu.org/onlinedocs/gcc-4.7.2/gcc/_005f_005fatomic-Builtins.html)),我記得是存在相關疑慮的?
> * [C11 atomic variables and the kernel](https://lwn.net/Articles/586838/): February 18, 2014
> * [Time to move to C11 atomics?](https://lwn.net/Articles/691128/): June 15, 2016
於是我嘗試用 kernel 中的 atomic API 進行改寫,這應該會是比較合適的作法。我將結構中的 `head` 和 `tail` 改成 `atomic_long_t`。並透過相關 API 去對其進行操作。比較需要注意的地方是要修正某些變數的型別 (需要把透過特殊 API 操作的 `atomic_long_t` 和實際數值的 `unsigned long` 分開)。
* `long` 長度根據 [64-bit data models](https://en.wikipedia.org/wiki/64-bit_computing#64-bit_data_models) 在 64 位元的 Linux 上應為 64 bits
```cpp
typedef atomic_long_t ringidx_t;
typedef unsigned long __ringidx_t;
```
要注意到型別的變動可能會影響 `klfring` 和 `lfring` 對輸入的有效數值差異,特別注意到 kernel 文件中提到:
> While atomic_t, atomic_long_t and atomic64_t use int, long and s64
respectively (for hysterical raisins), the kernel uses -fno-strict-overflow
(which implies -fwrapv) and defines signed overflow to behave like
2s-complement. Therefore, an explicitly unsigned variant of the atomic ops is strictly
unnecessary and we can simply cast, there is no UB.
移植到 kernel 時可以修改的地方之一是 memory barrier 的使用,可以參照
> * [linux/arch/x86/include/asm/barrier.h](https://github.com/torvalds/linux/blob/master/arch/x86/include/asm/barrier.h)
> * [linux/Documentation/memory-barriers.txt](https://github.com/torvalds/linux/blob/master/Documentation/memory-barriers.txt)
> The Linux kernel has an explicit compiler barrier function that prevents the
compiler from moving the memory accesses either side of it to the other side:
```
barrier();
```
此外也要將 atomic operation 使用 kernel API 進行替換,此部份則參照:
> * [Semantics and Behavior of Atomic and Bitmask Operations](https://www.kernel.org/doc/html/v4.12/core-api/atomic_ops.html)
> * [linux/Documentation/atomic_t.txt](https://github.com/torvalds/linux/blob/master/Documentation/atomic_t.txt)
> * [Linux-Kernel Memory Model](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0124r5.html)
### CAS for buffer slot
要支援多生產者的話,除了基本的 64 位元的 CAS,還需考慮如合對整個 buffer slot 進行 CAS 操作,目前的作法是把 `lfring` 的 [`lf_compare_exchange`](#lf_compare_exchange) 實作直接搬至 kernel 中使用:
* `atomic_long_t` 為 8 bytes 長度,因此 `klfring` 和 `lfring` 的 `struct element` 有相同大小,`lf_compare_exchange` 應足以滿足需求
:::info
`atomic_t` 將 int 封裝在 struct 中,因此其大小與 int 相同
```cpp
typedef struct {
int counter;
} atomic_t;
```
其作用只是讓編譯器去確保對此型別的操作需使用 atomic API。
:::
### check
* [ktsan](https://github.com/google/ktsan)
* [KCSAN(1)](https://github.com/google/ktsan/wiki/KCSAN)
* [KCSAN(2)](https://www.kernel.org/doc/html/latest/dev-tools/kcsan.html)
* for Linux 5.8 up
### TODO list
直至 commit [`b59abf6`](https://github.com/RinHizakura/klfring/commit/b59abf6f5647f6aa7bb39640cac2846c8d755cc1) 為止,完成基本將 `lfring` 移植到 kernel 的目標。
- [ ] 透過更複雜的測資驗證實作的正確性
- [ ] 需考慮是否需要額外的 memory barrier 以確保不會發生非預期的重排
- [ ] 考慮修改為更寬鬆的 memory model(relaxed / acquire-release / comsume-release)
- [ ] `lfring` 的限制(lap ring buffer 的次數限制 / 只支援 x86-64 架構)在仍然存在 `klfring`,需思考是否有合適的方法可以一並解決
## Note
* [[RFC] lfring: lock-free ring buffer](https://patches.dpdk.org/project/dpdk/patch/1548678513-14348-1-git-send-email-ola.liljedahl@arm.com/)
* [读写一气呵成 - Linux中的原子操作](https://zhuanlan.zhihu.com/p/89299392)
* [对优化说不 - Linux中的Barrier](https://zhuanlan.zhihu.com/p/96001570)