--- tags: System Software, 作業系統 --- # Linux ring buffer(kfifo) ## Introduction > * [linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h) > * [lib/kfifo.c](https://github.com/torvalds/linux/blob/master/lib/kfifo.c) > * [kfifo-examples](https://github.com/sysprog21/kfifo-examples) Linux 的 kernel API 提供 [Kfifo](https://www.kernel.org/doc/html/latest/core-api/kernel-api.html?highlight=kfifo#fifo-buffer),後者是一個 FIFO 的 [ring buffer](https://en.wikipedia.org/wiki/Circular_buffer),在嘗試把 `lfring` 移植到 kernel 以前,我們應該先對 kfifo 實作有一定理解,以考慮到在 kernel 中的 ring buffer 可能需考慮的議題。 關於 Kfifo 合適的使用情境,可以見到其註解: > Note about locking: There is no locking required until only one reader and one writer is using the fifo and no `kfifo_reset()` will be called. `kfifo_reset_out()` can be safely used, until it will be only called in the reader thread. > For multiple writer and one reader there is only a need to lock the writer. > And vice versa for only one writer and multiple reader there is only a need > to lock the reader. ## 實作 ### 資料結構 使用 API 所需要的 `struct kfifo` 透過 macro `__STRUCT_KFIFO_PTR` 展開如下所示: ```cpp struct kfifo { union { struct __kfifo kfifo; unsigned char *type; const unsigned char *const_type; char (*rectype)[0]; void *ptr; void const *ptr_const; } type buf[0]; }; ``` :::info `__STRUCT_KFIFO_PTR` 也用來展開 `struct kfifo_rec_ptr_1` 等結構,差異是 `rectype` 的指標陣列有額外的空間可以進行其他用途,這裡暫時不特別討論。 ::: `struct __kfifo` 則是 kfifo 的關鍵結構,其定義如下: ```cpp struct __kfifo { unsigned int in; unsigned int out; unsigned int mask; unsigned int esize; void *data; }; ``` * 把 `in` 對應到 `lfring` 中的 `tail` (生產者目標 slot 之 index),把 `out` 對應到 `lfring` 中的 `head` (消費者目標 slot 之 index),`kfifo` 的邏輯就很容易理解了 * `esize` 是每個元素的大小,以 `struct kfifo` 而言即 `unsigned char` 之大小 1 bytes * `data` 是儲存資料的 buffer 實體 * `mask` 是 buffer 大小減 1,因為 buffer 大小被強制調整為 $2^n$ 而可藉此設計來以 bitwise 操作取代使用除法取餘數操作 ### `kfifo_alloc` `kfifo_alloc` 可將 kfifo 結構初始化,其使用方式範例為: ```cpp static struct kfifo test; kfifo_alloc(&test, FIFO_SIZE, GFP_KERNEL); ``` 而其定義則是一個如下的 macro: ```cpp #define kfifo_alloc(fifo, size, gfp_mask) \ __kfifo_int_must_check_helper( \ ({ \ typeof((fifo) + 1) __tmp = (fifo); \ struct __kfifo *__kfifo = &__tmp->kfifo; \ __is_kfifo_ptr(__tmp) ? \ __kfifo_alloc(__kfifo, size, sizeof(*__tmp->type), gfp_mask) : \ -EINVAL; \ }) \ ) ``` 其作用主要是初始化 `struct __kfifo`,但其中有些技巧也值得關注: 1. `__kfifo_int_must_check_helper` 結合最後一行讓配置錯誤時得以返回 `-EINVAL` 2. `fifo + 1` 運用 compiler 的特性避免使用者誤用 API 為 `kfifo_alloc(test, FIFO_SIZE, GFP_KERNEL)`,因為後者是不能被編譯器接受的操作 ```cpp int __kfifo_alloc(struct __kfifo *fifo, unsigned int size, size_t esize, gfp_t gfp_mask) { /* * round up to the next power of 2, since our 'let the indices * wrap' technique works only in this case. */ size = roundup_pow_of_two(size); fifo->in = 0; fifo->out = 0; fifo->esize = esize; if (size < 2) { fifo->data = NULL; fifo->mask = 0; return -EINVAL; } fifo->data = kmalloc_array(esize, size, gfp_mask); if (!fifo->data) { fifo->mask = 0; return -ENOMEM; } fifo->mask = size - 1; return 0; } ``` `__kfifo_alloc` 進行核心的初始化,對照 `lfring` 的實作可以發現類似的技巧,例如強制為 $2^n$(n 為整數) 大小的 buffer 以及相應的 `mask` 以使用 bitwise 操作取代取餘數的除法操作。 ### `kfifo_in` `kfifo_in` 的作用是將 `buf` 中的 object 以 byte 為單位,加入 `n` bytes 到 queue 之中。如以下的使用範例: ```cpp kfifo_in(&test, "hello", 5); ``` ```cpp #define kfifo_in(fifo, buf, n) \ ({ \ typeof((fifo) + 1) __tmp = (fifo); \ typeof(__tmp->ptr_const) __buf = (buf); \ unsigned long __n = (n); \ const size_t __recsize = sizeof(*__tmp->rectype); \ struct __kfifo *__kfifo = &__tmp->kfifo; \ (__recsize) ?\ __kfifo_in_r(__kfifo, __buf, __n, __recsize) : \ __kfifo_in(__kfifo, __buf, __n); \ }) ``` 在我們的案例中,使用的 `struct kfifo` 中的 `rectype` 之大小為 0,因此核心的插入使用到 `__kfifo_in` ```cpp static inline unsigned int kfifo_unused(struct __kfifo *fifo) { return (fifo->mask + 1) - (fifo->in - fifo->out); } unsigned int __kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len) { unsigned int l; l = kfifo_unused(fifo); if (len > l) len = l; kfifo_copy_in(fifo, buf, len, fifo->in); fifo->in += len; return len; } ``` 首先使用 `kfifo_unused` 計算 buffer 可生產的空間,可生產的空間的大小應為 `size - (in - out)`,因為 unsigned 整數的加減法之 wrapping add / wrapping subtact 特性因此其正確性恆滿足。將此空間與要求生產的數量進行相比較並調整後,接著藉由 `kfifo_copy_in` 將 `buf` 資料複製到 kfifo 的 buffer 目標位置中,並且對生產者的 index `fifo->in` 進行更新即可。 :::warning `kfifo_in` 沒有 `kfifo_is_full` 的檢查,雖然 `kfifo_unused` 回傳 0 時也是同樣意思,但提早返回是否有好處呢? ::: ```cpp static void kfifo_copy_in(struct __kfifo *fifo, const void *src, unsigned int len, unsigned int off) { unsigned int size = fifo->mask + 1; unsigned int esize = fifo->esize; unsigned int l; off &= fifo->mask; if (esize != 1) { off *= esize; size *= esize; len *= esize; } l = min(len, size - off); memcpy(fifo->data + off, src, l); memcpy(fifo->data, src + l, len - l); /* * make sure that the data in the fifo is up to date before * incrementing the fifo->in index counter */ smp_wmb(); } ``` `kfifo_copy_in` 也很容易理解,先計算要複製到 buffer (`fifo->data`) 中的哪個位置 (`off`),然後複製 `src` 到該位置即可。值得注意的是,我們可以看到使用了兩次 `memcpy`,這是因為 ring buffer 的邏輯是頭尾相接的特性(可參考 [Circular buffer](https://en.wikipedia.org/wiki/Circular_buffer)),因此資料可能需要繞回開頭的地方儲存。 * `smp_wmb()` 需要被安插以保證 buffer 內容的更新先於 `fifo->in` 的更新,因為前者仰賴舊的 `fifo->in` ### `kfifo_put` ```cpp #define kfifo_put(fifo, val) \ ({ \ typeof((fifo) + 1) __tmp = (fifo); \ typeof(*__tmp->const_type) __val = (val); \ unsigned int __ret; \ size_t __recsize = sizeof(*__tmp->rectype); \ struct __kfifo *__kfifo = &__tmp->kfifo; \ if (__recsize) \ __ret = __kfifo_in_r(__kfifo, &__val, sizeof(__val), \ __recsize); \ else { \ __ret = !kfifo_is_full(__tmp); \ if (__ret) { \ (__is_kfifo_ptr(__tmp) ? \ ((typeof(__tmp->type))__kfifo->data) : \ (__tmp->buf) \ )[__kfifo->in & __tmp->kfifo.mask] = \ *(typeof(__tmp->type))&__val; \ smp_wmb(); \ __kfifo->in++; \ } \ } \ __ret; \ }) ``` `kfifo_put` 整體流程其實與 `kfifo_in` 相近,只是用來處理要將單一個值加入到 buffer 的特殊使用情境。 ### `kfifo_out` ```cpp #define kfifo_out(fifo, buf, n) \ __kfifo_uint_must_check_helper( \ ({ \ typeof((fifo) + 1) __tmp = (fifo); \ typeof(__tmp->ptr) __buf = (buf); \ unsigned long __n = (n); \ const size_t __recsize = sizeof(*__tmp->rectype); \ struct __kfifo *__kfifo = &__tmp->kfifo; \ (__recsize) ?\ __kfifo_out_r(__kfifo, __buf, __n, __recsize) : \ __kfifo_out(__kfifo, __buf, __n); \ }) \ ) ``` 與 `kfifo_in` 相對的,`kfifo_out` 的目的是從 buffer 中進行消費,核心的函數是 `__kfifo_out` ```cpp unsigned int __kfifo_out_peek(struct __kfifo *fifo, void *buf, unsigned int len) { unsigned int l; l = fifo->in - fifo->out; if (len > l) len = l; kfifo_copy_out(fifo, buf, len, fifo->out); return len; } unsigned int __kfifo_out(struct __kfifo *fifo, void *buf, unsigned int len) { len = __kfifo_out_peek(fifo, buf, len); fifo->out += len; return len; } ``` `__kfifo_out_peek` 先進行可消費最大數量的檢查 `fifo->in - fifo->out`,將此空間與要求消費的數量進行相比較並調整後,`kfifo_copy_out` (即 `kfifo_copy_in` 之逆向操作,不贅述) 將其複製到給定的 `buf` 中,並且調整 `out` 即可。 :::warning `kfifo_out` 沒有 `kfifo_is_empty` 的檢查,同樣的,雖然 `__kfifo_out_peek` 計算出 `l` 為 0 時也是同樣意思,但提早返回是否有好處呢? ::: ### `kfifo_get` `kfifo_put` 的反向操作,不多贅述。 ## Kfifo 的 Commit 大冒險 ~~透過 Commit log 在 Linux 的歷史洪流中冒險真好玩?~~ ### 關於 " `kfifo_put` " [linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h#L11) 註解中提到要把 `kfifo_put` 換成 `kfifo_in_spinlocked`: > Replace the use of `kfifo_put` into `kfifo_in_locked` and `kfifo_get` into `kfifo_out_locked` 一開始我誤以為是指 `kfifo_put` 可能有不安全的疑慮因此不建議使用。後來發現註解其實是針對使用舊 API 的程式碼,因為 `kfifo_put` 最早時候對應的功能是類似現在的 `kfifo_in` 之有鎖版本: * [c1e13f](https://github.com/torvalds/linux/commit/c1e13f25674ed564948ecb7dfe5f83e578892896#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 被更名成 `kfifo_put_locked`,`kfifo_put` 這個 symbol 被刪去 * [2e956f](https://github.com/torvalds/linux/commit/2e956fb320568cc70861761483e2f0e2db75fd66#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 中 `kfifo_put` 被加入,但此時(直至今日)的作用是加入單一個 value 到 buffer 中 也因此註解也許可能產生誤解? 得注意你拿到使用 `kfifo_put` 的程式碼到底原先是跑在哪個版上 kernel 再決定是否修改為 `kfifo_in_locked` (雖然新舊的 `kfifo_put` 參數數量不同,其實編譯器就能檢查出 API 的使用錯誤,不太可能誤用就是 XD) ### Race bug 關注 [7e10505](https://github.com/torvalds/linux/commit/7e105057a34c83cea542dacc55ff0528bce67afa#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 所進行的更動,`kfifo_out_locked` 原始的設計中做了某種 「優化」: ```cpp= static inline void kfifo_reset(struct kfifo *fifo) { fifo->in = fifo->out = 0; } static inline __must_check unsigned int kfifo_out_locked(struct kfifo *fifo, unsigned char *to, unsigned int n, spinlock_t *lock) { unsigned long flags; unsigned int ret; spin_lock_irqsave(lock, flags); ret = kfifo_out(fifo, to, n); /* * optimization: if the FIFO is empty, set the indices to 0 * so we don't wrap the next time */ if (kfifo_is_empty(fifo)) kfifo_reset(fifo); spin_unlock_irqrestore(lock, flags); return ret; } ``` 第 16 到 21 行中,如果 buffer 為空的話,就把 kfifo 結構體給重新初始化(`in` 和 `out` 重設成 0)。 :::warning 我尚不清楚這個「優化」實際的幫助在哪,讓 `in` / `out` wrapping add 應該也不會造成額外的負擔(嗎)? ::: 但是這會有甚麼問題呢? 我們先考慮 `kfifo_reset` 的危險之處,kfifo 的程式註解中說道: > Note about locking: There is no locking required until only one reader and one writer is using the fifo and no `kfifo_reset()` will be called. 因為 kfifo 被預期在單讀單寫(SPSC) 是可以 lock-less 使用的,其前提是建立於 `in` 和 `out` 在某個 thread 中只能修改其中之一(視其為生產者或消費者而定)。但假如有一個 thread 可以同時修改兩者呢? 假設 thread A 呼叫 `kfifo_reset` 1. 先將 `fifo->out` 設為 0 2. 再將 `fifo->in` 設為 0 如果第一步做完時,context switch 發生,切換到 thread B,此時 `fifo->in` 還沒被設為 0,因此透過 `fifo->out` 和 `fifo->in` 判斷 buffer 的滿或空狀態,或者計算尚有多少可用空間時都會出現錯誤,也就可能發生存取到錯誤的 slot,計算出錯誤的 buffer 可生產數量/可消費數量等非預期的行為,萬惡之源就來自這短短的一行程式碼! 不過,回到原本的 `kfifo_out_locked`,這段程式碼中看似有用 `spin_lock_irqsave` 去保護 `fifo` 結構的存取,這樣難到不足以解決 `kfifo_reset` 的問題嗎? 參照其中的另一段註解: > For multiple writer and one reader there is only a need to lock the writer. And vice versa for only one writer and multiple reader there is only a need to lock the reader. 在單寫多讀的情境下,kfifo 預期可以只對 reader 端進行上鎖,不過若此優化仍存在的話,就必須也將 writer 端上鎖了,我認為這應該是必須移除這一段程式的關鍵原因。 :::danger 不知道是否有其他疏漏的考量,如果讀寫兩端都上鎖的話還是有 race 的可能性嗎? :::