---
tags: System Software, 作業系統
---
# Linux ring buffer(kfifo)
## Introduction
> * [linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h)
> * [lib/kfifo.c](https://github.com/torvalds/linux/blob/master/lib/kfifo.c)
> * [kfifo-examples](https://github.com/sysprog21/kfifo-examples)
Linux 的 kernel API 提供 [Kfifo](https://www.kernel.org/doc/html/latest/core-api/kernel-api.html?highlight=kfifo#fifo-buffer),後者是一個 FIFO 的 [ring buffer](https://en.wikipedia.org/wiki/Circular_buffer),在嘗試把 `lfring` 移植到 kernel 以前,我們應該先對 kfifo 實作有一定理解,以考慮到在 kernel 中的 ring buffer 可能需考慮的議題。
關於 Kfifo 合適的使用情境,可以見到其註解:
> Note about locking: There is no locking required until only one reader and one writer is using the fifo and no `kfifo_reset()` will be called. `kfifo_reset_out()` can be safely used, until it will be only called in the reader thread.
> For multiple writer and one reader there is only a need to lock the writer.
> And vice versa for only one writer and multiple reader there is only a need
> to lock the reader.
## 實作
### 資料結構
使用 API 所需要的 `struct kfifo` 透過 macro `__STRUCT_KFIFO_PTR` 展開如下所示:
```cpp
struct kfifo {
union {
struct __kfifo kfifo;
unsigned char *type;
const unsigned char *const_type;
char (*rectype)[0];
void *ptr;
void const *ptr_const;
}
type buf[0];
};
```
:::info
`__STRUCT_KFIFO_PTR` 也用來展開 `struct kfifo_rec_ptr_1` 等結構,差異是 `rectype` 的指標陣列有額外的空間可以進行其他用途,這裡暫時不特別討論。
:::
`struct __kfifo` 則是 kfifo 的關鍵結構,其定義如下:
```cpp
struct __kfifo {
unsigned int in;
unsigned int out;
unsigned int mask;
unsigned int esize;
void *data;
};
```
* 把 `in` 對應到 `lfring` 中的 `tail` (生產者目標 slot 之 index),把 `out` 對應到 `lfring` 中的 `head` (消費者目標 slot 之 index),`kfifo` 的邏輯就很容易理解了
* `esize` 是每個元素的大小,以 `struct kfifo` 而言即 `unsigned char` 之大小 1 bytes
* `data` 是儲存資料的 buffer 實體
* `mask` 是 buffer 大小減 1,因為 buffer 大小被強制調整為 $2^n$ 而可藉此設計來以 bitwise 操作取代使用除法取餘數操作
### `kfifo_alloc`
`kfifo_alloc` 可將 kfifo 結構初始化,其使用方式範例為:
```cpp
static struct kfifo test;
kfifo_alloc(&test, FIFO_SIZE, GFP_KERNEL);
```
而其定義則是一個如下的 macro:
```cpp
#define kfifo_alloc(fifo, size, gfp_mask) \
__kfifo_int_must_check_helper( \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
__is_kfifo_ptr(__tmp) ? \
__kfifo_alloc(__kfifo, size, sizeof(*__tmp->type), gfp_mask) : \
-EINVAL; \
}) \
)
```
其作用主要是初始化 `struct __kfifo`,但其中有些技巧也值得關注:
1. `__kfifo_int_must_check_helper` 結合最後一行讓配置錯誤時得以返回 `-EINVAL`
2. `fifo + 1` 運用 compiler 的特性避免使用者誤用 API 為 `kfifo_alloc(test, FIFO_SIZE, GFP_KERNEL)`,因為後者是不能被編譯器接受的操作
```cpp
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* round up to the next power of 2, since our 'let the indices
* wrap' technique works only in this case.
*/
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize;
if (size < 2) {
fifo->data = NULL;
fifo->mask = 0;
return -EINVAL;
}
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
fifo->mask = size - 1;
return 0;
}
```
`__kfifo_alloc` 進行核心的初始化,對照 `lfring` 的實作可以發現類似的技巧,例如強制為 $2^n$(n 為整數) 大小的 buffer 以及相應的 `mask` 以使用 bitwise 操作取代取餘數的除法操作。
### `kfifo_in`
`kfifo_in` 的作用是將 `buf` 中的 object 以 byte 為單位,加入 `n` bytes 到 queue 之中。如以下的使用範例:
```cpp
kfifo_in(&test, "hello", 5);
```
```cpp
#define kfifo_in(fifo, buf, n) \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
typeof(__tmp->ptr_const) __buf = (buf); \
unsigned long __n = (n); \
const size_t __recsize = sizeof(*__tmp->rectype); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
(__recsize) ?\
__kfifo_in_r(__kfifo, __buf, __n, __recsize) : \
__kfifo_in(__kfifo, __buf, __n); \
})
```
在我們的案例中,使用的 `struct kfifo` 中的 `rectype` 之大小為 0,因此核心的插入使用到 `__kfifo_in`
```cpp
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
return (fifo->mask + 1) - (fifo->in - fifo->out);
}
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo);
if (len > l)
len = l;
kfifo_copy_in(fifo, buf, len, fifo->in);
fifo->in += len;
return len;
}
```
首先使用 `kfifo_unused` 計算 buffer 可生產的空間,可生產的空間的大小應為 `size - (in - out)`,因為 unsigned 整數的加減法之 wrapping add / wrapping subtact 特性因此其正確性恆滿足。將此空間與要求生產的數量進行相比較並調整後,接著藉由 `kfifo_copy_in` 將 `buf` 資料複製到 kfifo 的 buffer 目標位置中,並且對生產者的 index `fifo->in` 進行更新即可。
:::warning
`kfifo_in` 沒有 `kfifo_is_full` 的檢查,雖然 `kfifo_unused` 回傳 0 時也是同樣意思,但提早返回是否有好處呢?
:::
```cpp
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 1) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
* make sure that the data in the fifo is up to date before
* incrementing the fifo->in index counter
*/
smp_wmb();
}
```
`kfifo_copy_in` 也很容易理解,先計算要複製到 buffer (`fifo->data`) 中的哪個位置 (`off`),然後複製 `src` 到該位置即可。值得注意的是,我們可以看到使用了兩次 `memcpy`,這是因為 ring buffer 的邏輯是頭尾相接的特性(可參考 [Circular buffer](https://en.wikipedia.org/wiki/Circular_buffer)),因此資料可能需要繞回開頭的地方儲存。
* `smp_wmb()` 需要被安插以保證 buffer 內容的更新先於 `fifo->in` 的更新,因為前者仰賴舊的 `fifo->in`
### `kfifo_put`
```cpp
#define kfifo_put(fifo, val) \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
typeof(*__tmp->const_type) __val = (val); \
unsigned int __ret; \
size_t __recsize = sizeof(*__tmp->rectype); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
if (__recsize) \
__ret = __kfifo_in_r(__kfifo, &__val, sizeof(__val), \
__recsize); \
else { \
__ret = !kfifo_is_full(__tmp); \
if (__ret) { \
(__is_kfifo_ptr(__tmp) ? \
((typeof(__tmp->type))__kfifo->data) : \
(__tmp->buf) \
)[__kfifo->in & __tmp->kfifo.mask] = \
*(typeof(__tmp->type))&__val; \
smp_wmb(); \
__kfifo->in++; \
} \
} \
__ret; \
})
```
`kfifo_put` 整體流程其實與 `kfifo_in` 相近,只是用來處理要將單一個值加入到 buffer 的特殊使用情境。
### `kfifo_out`
```cpp
#define kfifo_out(fifo, buf, n) \
__kfifo_uint_must_check_helper( \
({ \
typeof((fifo) + 1) __tmp = (fifo); \
typeof(__tmp->ptr) __buf = (buf); \
unsigned long __n = (n); \
const size_t __recsize = sizeof(*__tmp->rectype); \
struct __kfifo *__kfifo = &__tmp->kfifo; \
(__recsize) ?\
__kfifo_out_r(__kfifo, __buf, __n, __recsize) : \
__kfifo_out(__kfifo, __buf, __n); \
}) \
)
```
與 `kfifo_in` 相對的,`kfifo_out` 的目的是從 buffer 中進行消費,核心的函數是 `__kfifo_out`
```cpp
unsigned int __kfifo_out_peek(struct __kfifo *fifo,
void *buf, unsigned int len)
{
unsigned int l;
l = fifo->in - fifo->out;
if (len > l)
len = l;
kfifo_copy_out(fifo, buf, len, fifo->out);
return len;
}
unsigned int __kfifo_out(struct __kfifo *fifo,
void *buf, unsigned int len)
{
len = __kfifo_out_peek(fifo, buf, len);
fifo->out += len;
return len;
}
```
`__kfifo_out_peek` 先進行可消費最大數量的檢查 `fifo->in - fifo->out`,將此空間與要求消費的數量進行相比較並調整後,`kfifo_copy_out` (即 `kfifo_copy_in` 之逆向操作,不贅述) 將其複製到給定的 `buf` 中,並且調整 `out` 即可。
:::warning
`kfifo_out` 沒有 `kfifo_is_empty` 的檢查,同樣的,雖然 `__kfifo_out_peek` 計算出 `l` 為 0 時也是同樣意思,但提早返回是否有好處呢?
:::
### `kfifo_get`
`kfifo_put` 的反向操作,不多贅述。
## Kfifo 的 Commit 大冒險
~~透過 Commit log 在 Linux 的歷史洪流中冒險真好玩?~~
### 關於 " `kfifo_put` "
[linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h#L11) 註解中提到要把 `kfifo_put` 換成 `kfifo_in_spinlocked`:
> Replace the use of `kfifo_put` into `kfifo_in_locked` and `kfifo_get` into `kfifo_out_locked`
一開始我誤以為是指 `kfifo_put` 可能有不安全的疑慮因此不建議使用。後來發現註解其實是針對使用舊 API 的程式碼,因為 `kfifo_put` 最早時候對應的功能是類似現在的 `kfifo_in` 之有鎖版本:
* [c1e13f](https://github.com/torvalds/linux/commit/c1e13f25674ed564948ecb7dfe5f83e578892896#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 被更名成 `kfifo_put_locked`,`kfifo_put` 這個 symbol 被刪去
* [2e956f](https://github.com/torvalds/linux/commit/2e956fb320568cc70861761483e2f0e2db75fd66#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 中 `kfifo_put` 被加入,但此時(直至今日)的作用是加入單一個 value 到 buffer 中
也因此註解也許可能產生誤解? 得注意你拿到使用 `kfifo_put` 的程式碼到底原先是跑在哪個版上 kernel 再決定是否修改為 `kfifo_in_locked` (雖然新舊的 `kfifo_put` 參數數量不同,其實編譯器就能檢查出 API 的使用錯誤,不太可能誤用就是 XD)
### Race bug
關注 [7e10505](https://github.com/torvalds/linux/commit/7e105057a34c83cea542dacc55ff0528bce67afa#diff-2ba13228f60fece53a31b0769d20f2c4673e73c865e8ce1a308baa012f4d07fe) 所進行的更動,`kfifo_out_locked` 原始的設計中做了某種 「優化」:
```cpp=
static inline void kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}
static inline __must_check unsigned int kfifo_out_locked(struct kfifo *fifo,
unsigned char *to, unsigned int n, spinlock_t *lock)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(lock, flags);
ret = kfifo_out(fifo, to, n);
/*
* optimization: if the FIFO is empty, set the indices to 0
* so we don't wrap the next time
*/
if (kfifo_is_empty(fifo))
kfifo_reset(fifo);
spin_unlock_irqrestore(lock, flags);
return ret;
}
```
第 16 到 21 行中,如果 buffer 為空的話,就把 kfifo 結構體給重新初始化(`in` 和 `out` 重設成 0)。
:::warning
我尚不清楚這個「優化」實際的幫助在哪,讓 `in` / `out` wrapping add 應該也不會造成額外的負擔(嗎)?
:::
但是這會有甚麼問題呢? 我們先考慮 `kfifo_reset` 的危險之處,kfifo 的程式註解中說道:
> Note about locking: There is no locking required until only one reader and one writer is using the fifo and no `kfifo_reset()` will be called.
因為 kfifo 被預期在單讀單寫(SPSC) 是可以 lock-less 使用的,其前提是建立於 `in` 和 `out` 在某個 thread 中只能修改其中之一(視其為生產者或消費者而定)。但假如有一個 thread 可以同時修改兩者呢? 假設 thread A 呼叫 `kfifo_reset`
1. 先將 `fifo->out` 設為 0
2. 再將 `fifo->in` 設為 0
如果第一步做完時,context switch 發生,切換到 thread B,此時 `fifo->in` 還沒被設為 0,因此透過 `fifo->out` 和 `fifo->in` 判斷 buffer 的滿或空狀態,或者計算尚有多少可用空間時都會出現錯誤,也就可能發生存取到錯誤的 slot,計算出錯誤的 buffer 可生產數量/可消費數量等非預期的行為,萬惡之源就來自這短短的一行程式碼!
不過,回到原本的 `kfifo_out_locked`,這段程式碼中看似有用 `spin_lock_irqsave` 去保護 `fifo` 結構的存取,這樣難到不足以解決 `kfifo_reset` 的問題嗎? 參照其中的另一段註解:
> For multiple writer and one reader there is only a need to lock the writer. And vice versa for only one writer and multiple reader there is only a need to lock the reader.
在單寫多讀的情境下,kfifo 預期可以只對 reader 端進行上鎖,不過若此優化仍存在的話,就必須也將 writer 端上鎖了,我認為這應該是必須移除這一段程式的關鍵原因。
:::danger
不知道是否有其他疏漏的考量,如果讀寫兩端都上鎖的話還是有 race 的可能性嗎?
:::