---
title: 2024 年 Linux 核心設計/實作課程作業 —— integration (C)
description: 引導學員開發 Linux 核心模組,預期知悉核心模組如何掛載進入 Linux 核心、Virtual File System (VFS) 概況、character device driver 撰寫,和準備對應的測試及效能分析工具
tags: linux2024
---
# M06: integration
:::warning
:warning: 請務必詳閱作業描述 [(一)](/@sysprog/linux2024-integration-a), [(二)](/@sysprog/linux2024-integration-b), [(三)](/@sysprog/linux2024-integration-c) 及 [(四)](/@sysprog/linux2024-integration-d)
:::
> 主講人: [jserv](https://wiki.csie.ncku.edu.tw/User/jserv) / 課程討論區: [2024 年系統軟體課程](https://www.facebook.com/groups/system.software2024/)
:mega: 返回「[Linux 核心設計/實作](https://wiki.csie.ncku.edu.tw/linux/schedule)」課程進度表
## Concurrency Managed Workqueue
> [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
Linux 核心提供 workqueue 介面以達成非同步 (asynchronous) 的流程執行,使用 `struct list_head` 將要處理的每一個任務連接,使用者以一個 "work item" 來描述需被執行的 context,將其加入到所建立佇列之中,Linux 核心隨後會選出合適的 "worker" thread 來完成,處理結束者會從佇列中刪除,目的是要簡化執行緒的建立,可根據目前系統的 CPU 個數建立執行緒,使得執行緒得以並行。
在 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 之前的 workqueue 實作中,worker thread 的數量是與被建立的 workqueue 數量直接掛勾的。這意味著使用者每建立一個新的 workqueue,系統上的 worker thread 總量便會增加。實作中包含了兩種 worker thread 的建立方式:
* multi threaded (MT) workqueue: 在每個 CPU 上會有一個 worker thread,work item 可以配置不同的 CPU 上執行
* single threaded (ST) workqueue: 僅以單一的 worker thread 來完成系統上所有的 work item
在 MT workqueue 的部份,這代表系統需要建立與 CPU 數量乘上 workqueue 總量等價的 worker thread,這是很嚴重的問題。畢竟隨著 Linux 的演進,一方面 workqueue 的使用者逐漸增加,而現代電腦的 CPU 核心數量同時也不斷增加。這代表一些系統在剛啟動 (booting) 時,其 32k 的 PID 空間就會被 worker thread 的行程給佔滿。
資源限制和並行等級之間存在取捨關係,ST workqueue 是為整個系統提供唯一的 context,因此所有的 work 都是逐一排隊執行,造成並行等級不佳,雖然對資源的使用量比較低。而 MT workqueue 為每個 CPU 提供一個 context,代價是消耗大量資源。但即便如此它提供的 concurrency 效果卻不盡如人意。一個關鍵的原因是 MT workqueue 的 worker thread 的數目是與 CPU 數量嚴格綁定的,且 worker thread 的 work item 也不能互相轉移。另一方面,傳統 workqueue 也容易出現 deadlock 問題。
總結來說,使用 ST workqueue 得到的並行等級較低,但較省資源。然而使用 MT workqueue 固然有嚴重的消耗資源,能提高的並行等級卻很有限。這樣不對等的 trade off 讓過去的 workqueue 並不是那麼好用。
CMWQ 就在此背景下誕生,並專注在以下目標:
* 維持與原始 workqueue API 的相容性
* 捨棄各個 workqueue 獨立對應一組 worker-pools 的作法。取而代之,所有 workqueue 會共享 per-CPU worker pools,並按需提供靈活的並行等級,避免大量的資源耗損
* worker pool 和並行等級間的調整由系統內部處理,對使用者來說可以視為一個黑盒子
CMWQ 架構圖:

> worker pool 管理每個 worker
> workqueue 連接所有的 work 所形成的佇列
### 設計方式
在 CMWQ 中,每一項函式流程的執行被抽象為一個 work item (`work_struct`)。
```c
typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
```
work item 中包含一個 `func` 成員,是所要被執行的函式指標。當 Linux 裝置驅動程式或子系統想要非同步的方式執行某個函式,它就必須建立 `work_struct` 並設定 `func` 以指向要運行的函式,並將其加入到 workqueue 中。
worker thread 負責從佇列中逐一取出 work item 並執行其中的 `func`。若沒有 work item 在排隊,則 worker thread 就會處於 idle 狀態。在 worker thread 之上對其管理的結構稱為 worker-pools。
設計上,CMWQ 中對於每個 CPU 會有兩個 worker-pools。一個針對普通的 work,另一個則針對高優先級的 work。此外還有一些未綁定 CPU 的額外 worker-pools,這些 pools 的數量則是動態的。使用者通過 workqueue API 建立 work item 並進行排隊,並且允許設定旗標來影響部份 work item 的執行方式。例如 CPU locality、並行的限制、優先等級等。
當將 work item 排隊到 workqueue 時,會根據佇列參數和 workqueue 的屬性決定目標的 worker-pools,比如可以設定 workqueue 中的 work item 僅由指定 CPU 之普通或高優先級的 worker-pools 來執行。
對於限定 CPU 的 worker-pools,其並行管理的方式是將自身與 CPU 排程器掛勾。則每當任一 worker 被喚醒或是進入睡眠時,worker-pools 會收到通知,藉此來跟踪目前可運行的 worker 數量。一般而言,work item 不太會佔用 CPU 資源,因此設計上傾向使用最少數量的 worker thread 但避免 bandwidth 的損失。實作上,只要該 CPU 上有一個或多個 runnable worker thread,worker-pools 就暫時不會執行新的 work。一直到最後一個正在運行的 worker thread 進入睡眠狀態時,才立即排程一個新的 worker thread,這樣 CPU 就不會在仍有尚未處理的 work item 時無所事事,但也不至於過度的建立大量 worker thread。
worker-pools 類型:
1. Bound 類型:綁定特定的 CPU,使管理的 worker 執行在指定的 CPU 上執行,而每個 CPU 中會有二個 worker-pools 一個為高優先級的,另一個給普通優先級的,藉由不同的旗標影響 workqueue 的執行優先度
2. Unbound 類型:thread pool 用於處理不綁定特定 CPU,其 thread pool 是動態變化,藉由設定 workqueue 的屬性建立對應的 worker-pools
另一方面,除了 kthreads 的記憶體空間之外,保留 idle 的 worker thread 不會消耗其他資源,因此 CWMQ 會保留一個 idle 的 worker thread 一段時間。這樣若隨即有 work 要處理則可以直接沿用,不必再重新建立。
若使用不限定 CPU 的 workqueue,使用者可以藉由對應 API 來為此設定自定義的屬性,則 workqueue 將自動建立與屬性對應的 worker-pools。此種 workqueue 對並行程度的調整將由使用者定義。此外,若想讓綁定 CPU 的 workqueue 也有此特性,API 也提供相應的旗標以忽略並行的管理。
### CMWQ 優勢
1. 拆開 workqueue 與 worker-pools,可單純地將 work 放入佇列中而不必理會如何配置 worker 去執行,根據設定的旗標決定如何配置,適時的做切換,減少 worker 的 idle 情況,讓系統使用率提升
2. 若任務長時間佔用系統資源 (或有 blocking 的情況產生),CMWQ 會動態建立新的執行緒並配置給其他的 CPU 執行,避免過多的執行緒產生
3. 使不同的任務之間能被更彈性的執行 (所有的 workqueue 共享),會根據不同的優先級執行
### 將 `create_workqueue` 改以 `alloc_workqueue`
引入 `alloc_workqueue` 來自這項 2010 年的 [commit](https://github.com/torvalds/linux/commit/d320c03830b17af64e4547075003b1eeb274bc6c) 中提到 workqueue 有很多的功能存在,皆是利用參數去設定,並用旗標去指定功能(如:`WQ_UNBOUND` 和 `WQ_DFL_ACTIVE`)
> Now that workqueue is more featureful, there should be a public workqueue creation function which takes paramters to control them. Rename __create_workqueue() to alloc_workqueue() and make 0 max_active mean WQ_DFL_ACTIVE. In the long run, all create_workqueue_*() will be converted over to alloc_workqueue().
```diff
#define create_workqueue(name) \
- __create_workqueue((name), WQ_RESCUER, 1)
+ alloc_workqueue((name), WQ_RESCUER, 1)
```
可以看到在這則之前的 [commit](https://github.com/torvalds/linux/commit/0d557dc97f4bb501f086a03d0f00b99a7855d794) 建立一個 real time 的 `workqueue` 在 `__create_workqueue` 巨集中要更改到整個函式的 prototype,而使用到此巨集的函式皆要跟著改動所有的函式宣告配合巨集,在新增功能的時候會花費大量時間去改動相關的函式,舉例:
```diff
-#define __create_workqueue(name, singlethread, freezeable) \
+#define __create_workqueue(name, singlethread, freezeable, rt) \
-#define create_workqueue(name) __create_workqueue((name), 0, 0)
+#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
```
更改後相關的 `workqueue` 實作以新增巨集或是修改巨集為主要方向,在未來有更多的想法上都是藉由這樣的方式,如:
[workqueue: remove WQ_SINGLE_CPU and use WQ_UNBOUND instead](https://github.com/torvalds/linux/commit/c7fc77f78f16d138ca997ce096a62f46e2e9420a)
```diff
- WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
+ WQ_UNBOUND = 1 << 1, /* not bound to any cpu */
- WQ_UNBOUND = 1 << 6, /* not bound to any cpu */
```
## Linux 核心的並行處理
[simrupt](https://github.com/sysprog21/simrupt) 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 [IRQ 事件](https://www.kernel.org/doc/html/latest/core-api/genericirq.html),並展示以下 Linux 核心機制的運用:
- irq
- softirq
- tasklet
- workqueue
- kernel thread
- [kfifo](https://www.kernel.org/doc/htmldocs/kernel-api/kfifo.html)
- [memory barrier](https://www.kernel.org/doc/Documentation/memory-barriers.txt)
### kfifo
[kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html) 是 Linux 核心裡頭 First-In-First-Out (FIFO) 的結構,在 Single Producer Single Consumer (SPSC) 情況中是 safe 的,即不需要額外的 lock 維護,在程式碼中註解中也有提及。
在此專案中有一個 kfifo 資料結構 rx_fifo,用來儲存即將傳到 userspace 的 data。
```c
/* Data are stored into a kfifo buffer before passing them to the userspace */
static struct kfifo rx_fifo;
/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
* only one concurrent reader and one concurrent writer. Writes are serialized
* from the interrupt context, readers are serialized using this mutex.
*/
static DEFINE_MUTEX(read_lock);
```
將 Data 插入到 rx_fifo 中,並檢查寫入的長度與避免過度輸出日誌而影響效能,之所以對 len 進行檢查的原因在於 kfifo_in 所回傳之值,是實際成功插入的數量。
```c
/* Insert a value into the kfifo buffer */
static void produce_data(unsigned char val)
{
/* Implement a kind of circular FIFO here (skip oldest element if kfifo
* buffer is full).
*/
unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
if (unlikely(len < sizeof(val)) && printk_ratelimit())
pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);
pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
kfifo_len(&rx_fifo));
}
```
`kfifo_in(fifo, buf, n);`
- 複製 buf 資料並放到 fifo 中,最後會回傳插入的資料大小。
- 在程式碼中,會先確認要放入的大小是否大於剩餘的大小,若超過,則會以剩餘大小為主。
```c
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo);
if (len > l)
len = l;
kfifo_copy_in(fifo, buf, len, fifo->in);
fifo->in += len;
return len;
}
```
`kfifo_to_user(fifo, to, len, copied);`
- 將最多 len 個 bytes 資料從 fifo 移到 userspace。
`kfifo_alloc(fifo, size, gfp_mask);`
- 動態配置一個 fifo buffer,配置成功會回傳 0。若要釋放 fifo 可藉由 `kfifo_free(fifo);` 實現。
### fast circular buffer
[Circular Buffer](https://www.kernel.org/doc/Documentation/core-api/circular-buffers.rst) 是個固定大小的緩衝區,其中具有 2 個 indicies:
- `head index`: the point at which the producer inserts items into the buffer.
- `tail index`: the point at which the consumer finds the next item in the buffer.
當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。
當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。
```c
{
...
/* Allocate fast circular buffer */
fast_buf.buf = vmalloc(PAGE_SIZE);
...
/* Clear all data from the circular buffer fast_buf */
fast_buf.head = fast_buf.tail = 0;
}
```
Measuring power-of-2 buffers: 讓緩衝區大小維持 2 的冪,就可以使用 bitwise 操作去計算緩衝區空間,避免使用較慢的 modulus (divide) 操作。
- [ ] [include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h)
```c
struct circ_buf {
char *buf;
int head;
int tail;
};
/* Return count in buffer. */
#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))
/* Return space available, 0..size-1. We always leave one free char
* as a completely full buffer has head == tail, which is the same as
* empty.
*/
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
/* Return count up to the end of the buffer. Carefully avoid
* accessing head and tail more than once, so they can change
* underneath us without returning inconsistent results.
*/
#define CIRC_CNT_TO_END(head,tail,size) \
({int end = (size) - (tail); \
int n = ((head) + end) & ((size)-1); \
n < end ? n : end;})
/* Return space available up to the end of the buffer. */
#define CIRC_SPACE_TO_END(head,tail,size) \
({int end = (size) - 1 - (head); \
int n = (end + (tail)) & ((size)-1); \
n <= end ? n : end+1;})
```
`CIRC_SPACE*()` 被 producer 使用,`CIRC_CNT*()` 是 consumer 所用。
在 simrupt 中,一個「更快」的 circular buffer 被拿來儲存即將要放到 kfifo 的資料。
```c
/* We use an additional "faster" circular buffer to quickly store data from
* interrupt context, before adding them to the kfifo.
*/
static struct circ_buf fast_buf;
```
`READ_ONCE()` 是個 relaxed-ordering 且保證 atomic 的 memory operation,可以確保在多執行緒環境中,讀取到的值是正確的,並保證讀寫操作不會被編譯器最佳化所影響。
`smp_rmb()` 是個 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。在〈[Lockless patterns: relaxed access and partial memory barriers](https://lwn.net/Articles/846700/)〉提到 `smp_rmb()` 與 `smp_wmb()` 的 barrier 效果比 `smp_load_acquire()` 與 `smp_store_release()` 還要來的差,但是因為 load-store 之間的排序關係很少有影響,所以開發人員常以 `smp_rmb()` 和 `smp_wmb()` 作為 memory barrier。
`fast_buf_get` 扮演一個 consumer 的角色,會從緩衝區中取得資料,並更新 tail index。
```c
static int fast_buf_get(void)
{
struct circ_buf *ring = &fast_buf;
/* prevent the compiler from merging or refetching accesses for tail */
unsigned long head = READ_ONCE(ring->head), tail = ring->tail;
int ret;
if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
return -ENOENT;
/* read index before reading contents at that index */
smp_rmb();
/* extract item from the buffer */
ret = ring->buf[tail];
/* finish reading descriptor before incrementing tail */
smp_mb();
/* increment the tail pointer */
ring->tail = (tail + 1) & (PAGE_SIZE - 1);
return ret;
}
```
fast_buf_put 扮演一個 producer 的角色,藉由 `CIRC_SPACE()` 判斷 buffer 中是否有剩餘空間,並更新 head index。
```c
static int fast_buf_put(unsigned char val)
{
struct circ_buf *ring = &fast_buf;
unsigned long head = ring->head;
/* prevent the compiler from merging or refetching accesses for tail */
unsigned long tail = READ_ONCE(ring->tail);
/* is circular buffer full? */
if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
return -ENOMEM;
ring->buf[ring->head] = val;
/* commit the item before incrementing the head */
smp_wmb();
/* update header pointer */
ring->head = (ring->head + 1) & (PAGE_SIZE - 1);
return 0;
}
```
process_data 函式呼叫 `fast_buf_put(update_simrupt_data());` ,其中 `update_simrupt_data()` 會產生資料,這些資料的範圍在 `0x20` 到 `0x7E` 之間,即 ASCII 中的可顯示字元,這些資料會被放入 circular buffer 中,最後交由 `tasklet_schedule` 進行排程。
```c
static void process_data(void)
{
WARN_ON_ONCE(!irqs_disabled());
pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
fast_buf_put(update_simrupt_data());
pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
tasklet_schedule(&simrupt_tasklet);
}
```
### tasklet
tasklet 是基於 softirq 之上建立的,但最大的差別在於 tasklet 可以動態配置且可以被用在驅動裝置上。
tasklet 可以被 workqueues, timers 或 threaded interrupts 取代,但 kernel 中尚有使用 tasklet 的情況,Linux 核心開發者已著手 API 變更,而 `DECLARE_TASKLET_OLD` 的存在是顧及相容性。
[Modernizing the tasklet API](https://lwn.net/Articles/830964/)
```c
#define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L)
```
首先會先確保函式在 interrupt context 和 softirq context 中執行,使用 queue_work 將 work 放入 workqueue 中,並記錄執行時間。
```c
/**
* queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue */
static inline bool queue_work(struct workqueue_struct *wq,
struct work_struct *work)
{
return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
```
```c
/* Tasklet handler.
*
* NOTE: different tasklets can run concurrently on different processors, but
* two of the same type of tasklet cannot run simultaneously. Moreover, a
* tasklet always runs on the same CPU that schedules it.
*/
static void simrupt_tasklet_func(unsigned long __data)
{
ktime_t tv_start, tv_end;
s64 nsecs;
WARN_ON_ONCE(!in_interrupt());
WARN_ON_ONCE(!in_softirq());
tv_start = ktime_get();
queue_work(simrupt_workqueue, &work);
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
}
/* Tasklet for asynchronous bottom-half processing in softirq context */
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);
```
藉由上述註解可以得知:
- 不同 tasklet 可以在不同 CPU 同時執行
- 相同 tasklet 不能同時執行
- 一個 tasklet 只會在排程它的 CPU 上執行
| | softirq | tasklet |
|:------------------------:|:-------:|:-------:|
| 多個在同一個 CPU 執行? | No | No |
| 相同的可在不同 CPU 執行? | Yes | No |
| 會在同個 CPU 執行? | Yes | Maybe |
當 `tasklet_schedule()`被呼叫時,代表此 tasklet 被允許在 CPU 上執行,詳見 [linux/include/linux/interrupt.h](https://github.com/torvalds/linux/blob/master/include/linux/interrupt.h)
### workqueue
[linux/include/linux/workqueue.h](https://elixir.bootlin.com/linux/latest/source/include/linux/workqueue.h)
定義兩個 mutex lock: `producer_lock` 和 `consumer_lock`。
```c
/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);
/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
* run in workqueue handler (kernel thread context).
*/
static DEFINE_MUTEX(consumer_lock);
```
`get_cpu()` 獲取目前 CPU 編號並 disable preemption,最後需要 `put_cpu()` 重新 enable preemption。
24-26行使用 `mutex_lock(&consumer_lock)` 鎖住消費者區域,防止其它的任務取得 circular buffer 的資料。
32-34行使用 `mutex_lock(&producer_lock)` 鎖住生產者區域,防止其它的任務寫入 kfifo buffer。
`wake_up_interruptible(&rx_wait)` 會換醒 wait queue 上的行程,將其狀態設置為 TASK_RUNNING。
```c=
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
int val, cpu;
/* This code runs from a kernel thread, so softirqs and hard-irqs must
* be enabled.
*/
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
/* Pretend to simulate access to per-CPU data, disabling preemption
* during the pr_info().
*/
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
```
在 workqueue 中執行的 work,可由以下方式定義。
- `DECLARE_WORK(name, void (*func) (void *), void *data)` 會在編譯時,靜態地初始化 work。
- `INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data)` 在執行時,動態地初始化一個 work。
```c
/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;
/* Work item: holds a pointer to the function that is going to be executed
* asynchronously.
*/
static DECLARE_WORK(work, simrupt_work_func);
```
### timer
藉由 `timer_setup()` 初始化 timer。
```c
/* Setup the timer */
timer_setup(&timer, timer_handler, 0);
void timer_setup(struct timer_list * timer,
void (*function)(struct timer_list *),
unsigned int flags);
```
目標是模擬 hard-irq,所以必須確保目前是在 softirq context,欲模擬在 interrupt context 中處理中斷,所以針對該 CPU disable interrupts。
```c
/* Timer to simulate a periodic IRQ */
static struct timer_list timer;
static void timer_handler(struct timer_list *__timer)
{
ktime_t tv_start, tv_end;
s64 nsecs;
pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
/* We are using a kernel timer to simulate a hard-irq, so we must expect
* to be in softirq context here.
*/
WARN_ON_ONCE(!in_softirq());
/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();
tv_start = ktime_get();
process_data();
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
local_irq_enable();
}
```
使用 mod_timer 對 timer 進行排程。
[Jiffy](https://en.wikipedia.org/wiki/Jiffy_(time)) 表示不具體的非常短暫的時間段,可藉由以下公式進行轉換。
```
jiffies_value = seconds_value * HZ;
seconds_value = jiffies_value / HZ;
```
### simrupt_init
該函式進行許多資料結構的初始化:
1. 配置給 kfifo 一個 PAGE_SIZE 大小的空間
```c
kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL)
```
2. 配置給 circular buffer 一個 PAGE_SIZE 大小的空間
```c
fast_buf.buf = vmalloc(PAGE_SIZE);
```
3. 為 simrupt 註冊一個設備號,並且加入到系統中
```c
ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME);
...
cdev_init(&simrupt_cdev, &simrupt_fops);
ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT);
```
4. 註冊設備到 sysfs,即可允許使用者空間運用裝置檔案 `/dev/simrupt` 來存取和控制該設備
```c
device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME);
```
5. 配置一個新的 workqueue
```c
simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
```
6. 設定 timer
```c
timer_setup(&timer, timer_handler, 0);
```
### simrupt 流程圖
```graphviz
digraph simrupt {
node [shape = box]
rankdir = TD
timer_handler [label="timer_handler"]
process_data [label = "process_data"]
update_simrupt_data [label="update_simrupt_data"]
fast_circular_buffer [label="fast_circular_buffer", shape=ellipse]
simrupt_tasklet_func [label = "simrupt_tasklet_func"]
simrupt_work_func [label = "simrupt_work_func"]
kfifo [label = "kfifo", shape=ellipse]
timer_handler -> process_data
process_data -> update_simrupt_data
update_simrupt_data -> fast_circular_buffer [label = " fast_buf_put"]
process_data -> simrupt_tasklet_func [label = " tasklet_schedule"]
simrupt_tasklet_func -> simrupt_work_func [label = " queue_work"]
fast_circular_buffer -> simrupt_work_func [label = "fast_buf_get"]
simrupt_work_func -> kfifo [label = " produce_data"]
{rank=same update_simrupt_data simrupt_tasklet_func}
}
```
### simrupt 的使用
掛載核心模組。
```shell
$ sudo insmod simrupt.ko
```
掛載後,會產生一個裝置檔案`/dev/simrupt`,藉由以下命令可見到輸出的資料。
```shell
$ sudo cat /dev/simrupt
```
參考輸出: (可能會有異)
```shell
!"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~
```
`dmesg` 顯示核心訊息,加入 `--follow` 可即時查看。
```shell
sudo dmesg --follow
```
參考輸出:
```shell
[882265.813265] simrupt: [CPU#3] enter timer_handler
[882265.813297] simrupt: [CPU#3] produce data
[882265.813299] simrupt: [CPU#3] scheduling tasklet
[882265.813300] simrupt: [CPU#3] timer_handler in_irq: 2 usec
[882265.813350] simrupt: [CPU#3] simrupt_tasklet_func in_softirq: 0 usec
[882265.813383] simrupt: [CPU#5] simrupt_work_func
```
## Linux 核心的 kfifo
[kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html) 是一個 [Circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) 的資料結構,而 ring-buffer 就是參考 kfifo 所實作。
kfifo 適合的使用情境,可以在 [linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h) 中看到:
```c
/*
* Note about locking: There is no locking required until only one reader
* and one writer is using the fifo and no kfifo_reset() will be called.
* kfifo_reset_out() can be safely used, until it will be only called
* in the reader thread.
* For multiple writer and one reader there is only a need to lock the writer.
* And vice versa for only one writer and multiple reader there is only a need
* to lock the reader.
*/
```
選定 [linux/samples/kfifo/](https://github.com/torvalds/linux/tree/master/samples/kfifo) 作為應用案例,並參考 [kfifo-examples](https://github.com/sysprog21/kfifo-examples) 進行實驗。
- [ ] 應用案例: `record-example.c`
* 定義一個大小為 100 的 char 陣列 buf,用於臨時儲存資料。
- 定義一個 struct hello,包含一個 unsigned char buf,且該 buf 初始化為 "hello"。
* `kfifo_in(&test, &hello, sizeof(hello))` 將 struct hello 寫入 kfifo buffer,並用 `kfifo_peek_len(&test)` 印出 kfifo buffer 下一個 record 的大小。
* for 迴圈裡面,每次會使用 memset 將 buf 的前 i+1 個元素設置為 'a'+i,並用 `kfifo_in(&test, buf, i + 1)` 寫入 kfifo buffer。
* `kfifo_skip(&test)` 跳過 kfifo buffer 的第一個值,即跳過 "hello"。
* `kfifo_out_peek(&test, buf, sizeof(buf)` 會在不刪除元素情況下,印出 kfifo buffer 的第一個元素。
* `kfifo_len(&test)` 印出目前 kfifo buffer 已佔用的空間。
* while 迴圈用 `kfifo_out(&test, buf, sizeof(buf))` 逐一比對 kfifo buffer 中的元素是不是和 excepted_result 中的元素一樣。
```c
static int __init testfunc(void)
{
char buf[100];
unsigned int i;
unsigned int ret;
struct { unsigned char buf[6]; } hello = { "hello" };
printk(KERN_INFO "record fifo test start\n");
kfifo_in(&test, &hello, sizeof(hello));
/* show the size of the next record in the fifo */
printk(KERN_INFO "fifo peek len: %u\n", kfifo_peek_len(&test));
/* put in variable length data */
for (i = 0; i < 10; i++) {
memset(buf, 'a' + i, i + 1);
kfifo_in(&test, buf, i + 1);
}
/* skip first element of the fifo */
printk(KERN_INFO "skip 1st element\n");
kfifo_skip(&test);
printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test));
/* show the first record without removing from the fifo */
ret = kfifo_out_peek(&test, buf, sizeof(buf));
if (ret)
printk(KERN_INFO "%.*s\n", ret, buf);
/* check the correctness of all values in the fifo */
i = 0;
while (!kfifo_is_empty(&test)) {
ret = kfifo_out(&test, buf, sizeof(buf));
buf[ret] = '\0';
printk(KERN_INFO "item = %.*s\n", ret, buf);
if (strcmp(buf, expected_result[i++])) {
printk(KERN_WARNING "value mismatch: test failed\n");
return -EIO;
}
}
if (i != ARRAY_SIZE(expected_result)) {
printk(KERN_WARNING "size mismatch: test failed\n");
return -EIO;
}
printk(KERN_INFO "test passed\n");
return 0;
}
```
掛載核心模組。
```shell
$ sudo insmod record-example.ko
```
利用 dmesg 查看核心訊息
```shell
$ sudo dmesg
[360087.628314] record fifo test start
[360087.628316] fifo peek len: 6
[360087.628317] skip 1st element
[360087.628317] fifo len: 65
[360087.628318] a
[360087.628318] item = a
[360087.628319] item = bb
[360087.628319] item = ccc
[360087.628319] item = dddd
[360087.628319] item = eeeee
[360087.628320] item = ffffff
[360087.628320] item = ggggggg
[360087.628320] item = hhhhhhhh
[360087.628321] item = iiiiiiiii
[360087.628321] item = jjjjjjjjjj
[360087.628321] test passed
```
- [ ] 應用案例: `bytestream-example.c`
* 分別用 `kfifo_in` 與 `kfifo_put` 將字串 "hello" 與數字 0-9 放入 kfifo buffer。
- `kfifo_in`: 可一次將 n Bytes 的 object 放到 kfifo buffer 中。
- `kfifo_put`: 與 `kfifo_in` 相似,只是用來處理要將單一個值放入 kfifo buffer 的情境,若要插入時,buffer 已滿,則會返回 0。
* `kfifo_out` 先將 kfifo buffer 前 5 個值拿出,即 "hello"。
* `kfifo_out` 將 kfifo buffer 前 2 個值 (0、1) 拿出,再用 `kfifo_in` 重新將 0、1 放入 kfifo buffer,並用 `kfifo_skip` 拿出並忽略 buffer 中第一個值。
* 值從 20 開始,逐一將大小為 32B 的 kfifo buffer 填滿。
* 並用 `kfifo_get` 逐一檢查 buffer 內的值是否與 expected_result 中的值一樣,若一樣,則 test passed。
```c
static int __init testfunc(void)
{
unsigned char buf[6];
unsigned char i, j;
unsigned int ret;
printk(KERN_INFO "byte stream fifo test start\n");
/* put string into the fifo */
kfifo_in(&test, "hello", 5);
/* put values into the fifo */
for (i = 0; i != 10; i++)
kfifo_put(&test, i);
/* show the number of used elements */
printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test));
/* get max of 5 bytes from the fifo */
i = kfifo_out(&test, buf, 5);
printk(KERN_INFO "buf: %.*s\n", i, buf);
/* get max of 2 elements from the fifo */
ret = kfifo_out(&test, buf, 2);
printk(KERN_INFO "ret: %d\n", ret);
/* and put it back to the end of the fifo */
ret = kfifo_in(&test, buf, ret);
printk(KERN_INFO "ret: %d\n", ret);
/* skip first element of the fifo */
printk(KERN_INFO "skip 1st element\n");
kfifo_skip(&test);
/* put values into the fifo until is full */
for (i = 20; kfifo_put(&test, i); i++)
;
printk(KERN_INFO "queue len: %u\n", kfifo_len(&test));
/* show the first value without removing from the fifo */
if (kfifo_peek(&test, &i))
printk(KERN_INFO "%d\n", i);
/* check the correctness of all values in the fifo */
j = 0;
while (kfifo_get(&test, &i)) {
printk(KERN_INFO "item = %d\n", i);
if (i != expected_result[j++]) {
printk(KERN_WARNING "value mismatch: test failed\n");
return -EIO;
}
}
if (j != ARRAY_SIZE(expected_result)) {
printk(KERN_WARNING "size mismatch: test failed\n");
return -EIO;
}
printk(KERN_INFO "test passed\n");
return 0;
}
```
掛載核心模組。
```shell
$ sudo insmod bytestream-example.ko
```
利用 dmesg 查看核心訊息
```shell
$ sudo dmesg
[130567.107610] byte stream fifo test start
[130567.107612] fifo len: 15
[130567.107613] buf: hello
[130567.107614] ret: 2
[130567.107614] ret: 2
[130567.107614] skip 1st element
[130567.107615] queue len: 32
[130567.107615] 3
[130567.107615] item = 3
[130567.107615] item = 4
[130567.107616] item = 5
[130567.107616] item = 6
[130567.107616] item = 7
[130567.107616] item = 8
[130567.107617] item = 9
[130567.107617] item = 0
[130567.107617] item = 1
[130567.107617] item = 20
[130567.107618] item = 21
[130567.107618] item = 22
[130567.107618] item = 23
[130567.107618] item = 24
[130567.107619] item = 25
[130567.107619] item = 26
[130567.107619] item = 27
[130567.107619] item = 28
[130567.107619] item = 29
[130567.107620] item = 30
[130567.107620] item = 31
[130567.107620] item = 32
[130567.107620] item = 33
[130567.107621] item = 34
[130567.107621] item = 35
[130567.107621] item = 36
[130567.107621] item = 37
[130567.107622] item = 38
[130567.107622] item = 39
[130567.107622] item = 40
[130567.107622] item = 41
[130567.107623] item = 42
[130567.107623] test passed
```
- [ ] 應用案例: 生產者與消費者
> [producer-consumer.c](https://github.com/brianlin314/kfifo-examples/blob/master/producer-consumer.c)
設計一個 kfifo 的生產者與消費者實驗,包含一個 producer 與一個 consumer,producer 函式每 1 秒會將一個值放入 kfifo 中,並從 1 遞增到 10,而consumer 函式每 2 秒會消耗一個 kfifo 的值。
```c
static int producer(void *data)
{
unsigned char i;
for (i = 1; i <= 10; i++) {
kfifo_put(&test, i);
pr_info("Producer inserted value: %d\n", i);
msleep(1000);
}
kthread_stop(producer_thread);
return 0;
}
static int consumer(void *data)
{
unsigned char j;
unsigned char buf[10];
unsigned int ret;
for (j = 1; j <= 5; j++) {
ret = kfifo_out(&test, buf, 1);
if (ret) {
pr_info("Consumer removed value: %d\n", j);
} else {
pr_info("Consumer failed to remove value from kfifo\n");
}
msleep(2000);
}
kthread_stop(consumer_thread);
return 0;
}
```
在 example_init 中,使用 `kthread_run` 建立兩個 kernel thread,分別是 `producer_thread` 與 `consumer_thread`。
```c
producer_thread = kthread_run(producer, NULL, "producer_thread");
...
consumer_thread = kthread_run(consumer, NULL, "consumer_thread");
```
在 `example_exit` 中,會用 `kfifo_get` 逐一檢查 kfifo 剩餘的值是否與 expected_result 相同。
```c
static void __exit example_exit(void)
{
unsigned char i, j;
/* check the correctness of all values in the fifo */
j = 0;
while (kfifo_get(&test, &i)) {
pr_info("kfifo item = %d\n", i);
if (i != expected_result[j++]) {
pr_warn("value mismatch: test failed\n");
goto error_EIO;
}
}
pr_info("test passed\n");
kfifo_free(&test);
pr_info("kfifo Example Exit\n");
error_EIO:
kfifo_free(&test);
}
```
```shell
$ make check
$ sudo dmesg
[96656.871161] kfifo Example Init
[96656.871280] Producer inserted value: 1
[96656.871364] Consumer removed value: 1
[96657.890006] Producer inserted value: 2
[96658.882042] Consumer removed value: 2
[96658.914025] Producer inserted value: 3
[96659.937999] Producer inserted value: 4
[96660.897975] Consumer removed value: 3
[96660.961976] Producer inserted value: 5
[96661.985950] Producer inserted value: 6
[96662.917915] Consumer removed value: 4
[96663.009917] Producer inserted value: 7
[96664.033895] Producer inserted value: 8
[96664.929874] Consumer removed value: 5
[96665.057866] Producer inserted value: 9
[96666.081860] Producer inserted value: 10
[96801.846529] kfifo item = 6
[96801.846536] kfifo item = 7
[96801.846539] kfifo item = 8
[96801.846540] kfifo item = 9
[96801.846542] kfifo item = 10
[96801.846544] test passed
[96801.846546] kfifo Example Exit
```