# 2021q3 Homework3 (lfring) contributed by < [`linD026`](https://github.com/linD026) > ###### tags: `linux2021` > [2021 年暑期 第 4 週隨堂測驗題目](https://hackmd.io/@sysprog/linux2021-summer-quiz4) --- ## lfring ( Lock-Free Ring Buffer ) ### 資料結構 ```cpp #define SUPPORTED_FLAGS \ (LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC) #define MIN(a, b) \ ({ \ __typeof__(a) tmp_a = (a); \ __typeof__(b) tmp_b = (b); \ tmp_a < tmp_b ? tmp_a : tmp_b; \ }) typedef uintptr_t ringidx_t; struct element { void *ptr; uintptr_t idx; }; struct lfring { ringidx_t head; ringidx_t tail ALIGNED(CACHE_LINE); uint32_t mask; uint32_t flags; struct element ring[] ALIGNED(CACHE_LINE); } ALIGNED(CACHE_LINE); ``` ### arch ```cpp static inline void smp_fence(unsigned int mask) { if ((mask & StoreLoad) == StoreLoad) { __asm__ volatile("mfence" ::: "memory"); } else if (mask != 0) { /* Any fence but StoreLoad */ // compiler barrer __asm__ volatile("" ::: "memory"); } } ``` ```cpp static inline bool lf_compare_exchange(register __int128 *var, __int128 *exp, __int128 neu) { union u128 cmp = {.ui = *exp}, with = {.ui = neu}; bool ret; __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0" : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo) : "c"(with.s.hi), "b"(with.s.lo) : "cc", "memory"); if (UNLIKELY(!ret)) *exp = cmp.ui; return ret; } ``` --- ```cpp /* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */ static inline bool before(ringidx_t a, ringidx_t b) { return (intptr_t)(a - b) < 0; } static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu) { ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED); do { if (before(neu, old)) /* neu < old */ return old; /* if neu > old, need to update *loc */ } while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */ neu, /* weak */ true, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); return neu; } static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc) { ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED); if (before(idx, fresh)) { /* fresh is after idx, use this instead */ idx = fresh; } else { /* Continue with next slot */ // DDD; idx += 1; idx = idx + 1; } return idx; } ``` [6.55 Built-in Functions for Memory Model Aware Atomic Operations](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html) >```cpp >bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, > bool weak, > int success_memorder, int failure_memorder) >``` > > This built-in function implements an atomic compare and exchange operation. This compares the contents of `*ptr` with the contents of `*expected`. If equal, the operation is a read-modify-write operation that writes `desired` into `*ptr`. **If they are not equal, the operation is a read and the current contents of `*ptr` are written into `*expected`.** `weak` is true for `weak compare_exchange`, which may fail spuriously, and false for the strong variation, which never fails spuriously. Many targets only offer the strong variation and ignore the parameter. When in doubt, use the strong variation. > > If desired is written into `*ptr` then true is returned and memory is affected according to the memory order specified by `success_memorder`. There are no restrictions on what memory order can be used here. > > Otherwise, false is returned and memory is affected according to `failure_memorder`. This memory order cannot be `__ATOMIC_RELEASE` nor `__ATOMIC_ACQ_REL`. It also cannot be a stronger order than that specified by `success_memorder`. ### enqueue ```cpp= /* Enqueue elements at tail */ uint32_t lfring_enqueue(lfring_t *lfr, void *const *restrict elems, uint32_t n_elems) { intptr_t actual = 0; ringidx_t mask = lfr->mask; ringidx_t size = mask + 1; ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED); if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */ ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE); actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems); if (actual <= 0) return 0; for (uint32_t i = 0; i < (uint32_t) actual; i++) { assert(lfr->ring[tail & mask].idx == tail - size); lfr->ring[tail & mask].ptr = *elems++; lfr->ring[tail & mask].idx = tail; tail++; } __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE); return (uint32_t) actual; } /* else: lock-free multi-producer */ restart: while ((uint32_t) actual < n_elems && before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) { union { struct element e; ptrpair_t pp; } old, neu; void *elem = elems[actual]; struct element *slot = &lfr->ring[tail & mask]; old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED); old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED); do { if (UNLIKELY(old.e.idx != tail - size)) { if (old.e.idx != tail) { /* We are far behind. Restart with fresh index */ tail = cond_reload(tail, &lfr->tail); goto restart; } /* slot already enqueued */ tail++; /* Try next slot */ goto restart; } /* Found slot that was used one lap back. * Try to enqueue next element. */ neu.e.ptr = elem; neu.e.idx = tail; /* Set idx on enqueue */ } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp)); /* Enqueue succeeded */ actual++; tail++; /* Continue with next slot */ } (void) cond_update(&lfr->tail, tail); return (uint32_t) actual; } ``` 其中第 18 行 `assert(lfr->ring[tail & mask].idx == tail - size);` 是利用先前設定的數值來確認: ```cpp lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags) { ... for (ringidx_t i = 0; i < ringsz; i++) { lfr->ring[i].ptr = NULL; lfr->ring[i].idx = i - ringsz; } ... } ``` 是利用以 `ringsz` 大小的間距差來得到上一次此 index 的 `tail` 數值。 例如,以 `ringsz` 為 16 、 數值範圍為零到十五 (四個 bits) ,此時初始化的 index 0 的上以次會是 12 。 ```graphviz digraph main { node [shape = record] rankdir = LR bits [label = "{<0>0|12 -\> 0|0}|{1|1}|{2|2}|{3|3}|{<4>4|0 -\> 4|0}|{5|1}|{6|2}|{7|3}|{<8>8|4 -\> 8|0}|{9|1}|{10|_2}|{11|_3}|{<12>12|8 -\> 12|_0}|{13|_1}|{14|_2}|{15|_3}"] init -> bits:0 prev -> bits:12 next -> bits:4 next_next -> bits:8 label = "index 0" } ``` 在第 40 行兩個判斷是如果都成立,代表此 `old.e.idx` 的數值不是前一次或此次 `tail` 的,因此可以判定 index 已經被在 `tail` 數值之後的另一個 `tail` 更新。 ```cpp if (UNLIKELY(old.e.idx != tail - size)) { if (old.e.idx != tail) { /* We are far behind. Restart with fresh index */ tail = cond_reload(tail, &lfr->tail); goto restart; } /* slot already enqueued */ tail++; /* Try next slot */ goto restart; } ``` 第 13 行計算未使用的 index 是以 `size + head - tail` 的形式計算,而在 kfifo 則是以 `size - (tail - head)` 先是計算已使用數量,再減去整體大小。 * [/lib/kfifo.c](https://elixir.bootlin.com/linux/latest/source/lib/kfifo.c#L16) ```cpp /* * internal helper to calculate the unused elements in a fifo */ static inline unsigned int kfifo_unused(struct __kfifo *fifo) { return (fifo->mask + 1) - (fifo->in - fifo->out); } ``` --- ### dequeue ```cpp= /* Dequeue elements from head */ uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, /*dequeue n_elems*/ uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { // check total index and n_elems actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (UNLIKELY(actual <= 0)) { /* Ring buffer is empty, scan for new but unreleased elements */ tail = find_tail(lfr, head, tail); actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (actual <= 0) return 0; } for (uint32_t i = 0; i < (uint32_t) actual; i++) elems[i] = lfr->ring[(head + i) & mask].ptr; smp_fence(LoadStore); // Order loads only if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */ __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED); break; } /* else: lock-free multi-consumer */ } while (!__atomic_compare_exchange_n( &lfr->head, &head, /* Updated on failure */ head + actual /*HHH*/, /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); // If they are not equal, the operation is a read and the // current contents of *ptr are written into *expected. *index = (uint32_t) head; return (uint32_t) actual; } ``` ### Compare CAS with arch.h and C11 ```cpp __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0" : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo) : "c"(with.s.hi), "b"(with.s.lo) : "cc", "memory"); ``` * [/tools/arch/x86/include/asm/cmpxchg.h](https://elixir.bootlin.com/linux/latest/source/tools/arch/x86/include/asm/cmpxchg.h#L86) ```cpp /* * Atomic compare and exchange. Compare OLD with MEM, if identical, * store NEW in MEM. Return the initial value in MEM. Success is * indicated by comparing RETURN with OLD. */ #define __raw_cmpxchg(ptr, old, new, size, lock) \ ({ \ __typeof__(*(ptr)) __ret; \ __typeof__(*(ptr)) __old = (old); \ __typeof__(*(ptr)) __new = (new); \ switch (size) { \ case __X86_CASE_B: { \ volatile u8 *__ptr = (volatile u8 *)(ptr); \ asm volatile(lock "cmpxchgb %2,%1" \ : "=a"(__ret), "+m"(*__ptr) \ : "q"(__new), "0"(__old) \ : "memory"); \ break; \ } \ case __X86_CASE_W: { \ volatile u16 *__ptr = (volatile u16 *)(ptr); \ asm volatile(lock "cmpxchgw %2,%1" \ : "=a"(__ret), "+m"(*__ptr) \ : "r"(__new), "0"(__old) \ : "memory"); \ break; \ } \ case __X86_CASE_L: { \ volatile u32 *__ptr = (volatile u32 *)(ptr); \ asm volatile(lock "cmpxchgl %2,%1" \ : "=a"(__ret), "+m"(*__ptr) \ : "r"(__new), "0"(__old) \ : "memory"); \ break; \ } \ case __X86_CASE_Q: { \ volatile u64 *__ptr = (volatile u64 *)(ptr); \ asm volatile(lock "cmpxchgq %2,%1" \ : "=a"(__ret), "+m"(*__ptr) \ : "r"(__new), "0"(__old) \ : "memory"); \ break; \ } \ default: \ __cmpxchg_wrong_size(); \ } \ __ret; \ }) ``` ### kfifo 研讀 Linux 核心 kfifo 文件,搭配 kfifo-examples 測試,以 git 取得 linux 程式碼工作副本,觀察 `git log include/linux/kfifo.h lib/kfifo.c` 並觀察修改記錄 留意 `spin_unlock_irqrestore` 的使用 解釋 commit 7e10505 的 race condition 成因 git show commit git blame line 為什麼改最佳化,改了之後又做了什麼 ### module