# Linux 核心專題: simrupt 研究和應用
> 執行人: pao0626
> [專題解說錄影](https://youtu.be/TxDmvSxKpZM)
### Reviewed by `yu-hsiennn`
workqueue 實驗設計的程式碼重複張貼?
> 感謝提醒,已更新。by pao0626
### Reviewed by `marvin0102`
> 我只擷取 simrupt 專案中有使用到的 HI, TASKLET 和 TASKLET 這三種 softirq 的使用數量。
TASKLET 似乎重複出現
> 感謝提醒,已更新。by pao0626
### Reviewed by `ChengChaoChun`
在 simrupt 專案中定時器設置每 100ms 產生一次中斷。我看了你的實驗數據,一次中斷處理過程(top half 加上 bottom half)的時間比定時器中斷產生一次的時間快非常多,因此不會有兩個中斷處理過程同時進行。我想請問在 simrupt_work_func 函式中 val = fast_buf_get() 和 produce_data(val) 如果不使用 mutex lock 是否會影響?
```
[255961.544677] simrupt: [CPU#4] enter timer_handler
[255961.544695] simrupt: [CPU#4] produce data
[255961.544696] simrupt: [CPU#4] scheduling tasklet
[255961.544698] simrupt: [CPU#4] timer_handler in_irq: 3 usec
[255961.544756] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[255961.544758] tasklet run 54 usec after irq
[255961.544758] simrupt: [CPU#0] simrupt_work_func
[255961.544763] simrupt_work_func execution time: 4989 ns
```
## 任務簡介
[simrupt](https://github.com/sysprog21/simrupt) 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 [IRQ 事件](https://www.kernel.org/doc/html/latest/core-api/genericirq.html),並展示以下 Linux 核心機制的運用:
- irq
- softirq
- tasklet
- workqueue
- kernel thread
- [kfifo](https://www.kernel.org/doc/htmldocs/kernel-api/kfifo.html)
- [memory barrier](https://www.kernel.org/doc/Documentation/memory-barriers.txt)
## TODO: 解釋 [simrupt](https://github.com/sysprog21/simrupt) 運作原理
> 搭配 [Linux Kernel Module Programming Guide](https://sysprog21.github.io/lkmpg/) 和課程教材,逐一解釋 [simrupt](https://github.com/sysprog21/simrupt) 所展示的核心機制,設計對應的實驗來解說。
```graphviz
digraph main {
rankdir = TD;
node [shape = box]
timer [label = "timer_handler"]
pcd [label = "process_data"]
fbp [label = "fast_buf_put"]
usd [label = "update_simrupt_data"]
fcb [label = "fast circular buffer", shape = oval]
stf [label = "simrupt_tasklet_func"]
wq [label = "workqueue|work\n (simrupt_work_func)", shape = record]
fbg [label = "fast_buf_get"]
pdd [label = "produce_data"]
k [label = "kfifo", shape = oval]
r [label = "simrupt_read"]
timer -> pcd
pcd -> fbp
fbp -> usd
fbp -> fcb [style = dotted]
pcd -> stf [label = "tasklet schedule"]
stf -> wq [label = "queue work"]
wq -> fbg
fcb -> fbg [style = dotted]
fbg -> pdd [style = dotted]
wq -> pdd
pdd -> k [label = "kfifo_in", style = dotted]
k -> r [label = "kfifo_to_user", style = dotted]
{rank=same fbp usd}
{rank=same fbg pdd}
{rank=same r timer}
}
```
上面是 simrupt 流程圖,實線表達函式執行順序,虛線表達資料的傳遞。
整體流程透過 `timer_handler` 模擬硬體中斷並呼叫 `process_data` 函式,該函式會做兩件事。
1. 透過 `fast_buf_put(update_simrupt_data())` 將產生的資料寫入 fast circular buffer。
2. 呼叫 `tasklet_schedule` 排程。
其目的是將中斷處理分為 top half 和 bottom half。 top half 快速響應了中斷呼叫,將資料放入 fast circular buffer,而需要花費大量時間將資料放入 kfifo 的過程則為 bottom half,將 bottom half 的任務交給 tasklet 排程,放入 workqueue 等待處理。當 workqueue 中的 work 被喚醒後透過 `simrupt_work_func` 處理時,會使用 `fast_buf_get` 函式取得 `fast circular buffer` 裡的資料,再透過 `produce_data` 函式將資料放入 kfifo 中。後續當使用者讀取該模組的<s>設備</s> 裝置檔案時會呼叫 `simrupt_read` 取出資料。
:::danger
注意用語:
* device 是「裝置」
> 已全部更改
:::
以下逐一解釋 simrupt 所展示的各項核心機制:
### irq
:::danger
不該用「軟中斷」,保留原文 softirq
> 已改進
:::
在 Linux 操作系統中,irq 和 softirq 是核心用於處理異步事件的兩種機制。irq 是由硬體裝置發起的信號,告知處理器需要立即處理某些事件,像是使用者敲擊鍵盤。<s>處理程序</s> 應盡可能避免執行過長時間的操作。
softirq 則用於 interrupt context 中延遲執行某些不太急迫的工作。處理那些不能在硬體中斷處理程序中立即完成的任務。可以被其他中斷 (包括其他 softirq 或 irq) 中斷。
補充:延遲執行的中斷稱為 deferred interrupts,也是上面提到的 bottom half。其中包含了 softirq, tasklet 和 workqueue 三種。
此專案中使用定時器 (timer) 模擬硬體中斷 (hard-irq)。
```c
timer_setup(&timer, timer_handler, 0);
```
:::danger
注意用語:
* macro 是「巨集」,不是「宏」
> 已改進
:::
`timer_setup` <s>宏</s> 巨集初始化一個核心定時器 `timer`。設定了定時器的到期處理函數`timer_handler`,並接收一個標誌參數。
```c
static struct timer_list timer;
static void timer_handler(struct timer_list *__timer)
{
ktime_t tv_start, tv_end;
s64 nsecs;
pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__);
/* We are using a kernel timer to simulate a hard-irq, so we must expect
* to be in softirq context here.
*/
WARN_ON_ONCE(!in_softirq());
/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();
tv_start = ktime_get();
process_data();
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
local_irq_enable();
}
```
由於是使用核心計時器模擬硬體中斷,所以必須檢查目前程式碼是否在 softirq context 中執行。使用 `local_irq_disable` 函式停用目前 CPU 的中斷來模擬硬體中斷環境。接著呼叫先前提到的 `process_data` 函式將資料放入 fast_buf,並一樣透過 `ktime_get` 計算操作時間。並透過 `mod_timer` 更新定時器(可參考 [LKMPG 第十三章](https://sysprog21.github.io/lkmpg/#replacing-print-macros))。最後使用 `local_irq_enable` 重新啟用中斷。
補充: 全域變數 `jiffies` 保存自系統啟動以來發生的 `ticks` 數。`tick` 是一個基本時間單位,通常對應於系統定時器中斷的一次觸發。而定時器中斷是以固定頻率 `HZ` 發生的。啟動時,核心將變數初始化為零,並在每次定時器中斷期間加一,用於度量時間的流逝。因為一秒內有 `HZ` 次定時器中斷,所以每秒增加 `HZ` 個 `jiffies`,也是 `jiffies_value = seconds_value * HZ` 計算式的由來。
### softirq
:::danger
注意用語:
* thread 是「執行緒」
* command 是「命令」,而非「指令」
> 了解
:::
每個 CPU 上會初始化一個 ksoftirqd 核心<s>線程</s> 執行緒,負責處理各種類型的 softirq 中斷事件。
> 閱讀許多文件後會產生疑問: softirqs 和 tasklets 是在 interrupt context 運行的,但 ksoftirqd 卻又是一個在 process context 中運行的核心執行緒,兩者產生矛盾。參考[網路上相關問答](https://stackoverflow.com/questions/26458730/ksoftirqds-bottom-halves-in-interrupt-or-process-context),ksoftirqd 是在中斷處理過載時被呼叫,也就是當 irq 結束並呼叫 do_IrQ 後卻沒有馬上處理 softirq 時,就會被放至 ksoftirqd 等待 cpu 排程處理,雖然 ksoftirqd 是在 process context 執行。但它會啟用 local interrupts enabled 和 bottom halves disabled,與 interrupt context 環境相同。
可以透過以下<s>指令</s> 命令觀察我的電腦中 16 個 cpu 分別負責的 softirq 執行緒編號。
```shell
$ systemd-cgls -k | grep softirq
├─ 15 [ksoftirqd/0]
├─ 23 [ksoftirqd/1]
├─ 29 [ksoftirqd/2]
├─ 35 [ksoftirqd/3]
├─ 41 [ksoftirqd/4]
├─ 47 [ksoftirqd/5]
├─ 53 [ksoftirqd/6]
├─ 59 [ksoftirqd/7]
├─ 65 [ksoftirqd/8]
├─ 71 [ksoftirqd/9]
├─ 77 [ksoftirqd/10]
├─ 83 [ksoftirqd/11]
├─ 89 [ksoftirqd/12]
├─ 95 [ksoftirqd/13]
├─ 101 [ksoftirqd/14]
├─ 107 [ksoftirqd/15]
```
在 [Linux](https://elixir.bootlin.com/linux/latest/source/kernel/softirq.c) 中定義了 10 種 softirq 如下:
```c
const char * const softirq_to_name[NR_SOFTIRQS] = {
"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "BLOCK_IOPOLL",
"TASKLET", "SCHED", "HRTIMER", "RCU"
};
```
在 simrupt 專案中使用了其中的 HI_SOFTIRQ 和 TASKLET_SOFTIRQ 用於 tasklet 的運作(通常不特別初始化時會使用優先權較低的後者),以及 TIMER_SOFTIRQ 用於處理前面提到的 `timer_handler`。
### tasklet
Tasklet 是一種建立在 softirq 之上的機制,其提供了更簡單的介面,並且具有動態配置的能力(可參考 [LKMPG 第十四章](https://sysprog21.github.io/lkmpg/#scheduling-tasks))。
兩者差異可粗略分成 softirq 傾向效能,tasklet 傾向易用性。原因在於同類型的 softirq 可能在不同的 CPU 上並行執行,因此要引入同步機制。但同類型的 tasklet 同時只能在其中一個 cpu 上執行。
在 simrupt 中相關程式碼如下:
```c
static struct workqueue_struct *simrupt_workqueue;
static void simrupt_tasklet_func(unsigned long __data)
{
ktime_t tv_start, tv_end;
s64 nsecs;
WARN_ON_ONCE(!in_interrupt());
WARN_ON_ONCE(!in_softirq());
tv_start = ktime_get();
queue_work(simrupt_workqueue, &work);
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
}
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);
```
:::danger
「中斷上下文」很難會意,保留原文 interrupt context
> 收到
:::
`simrupt_tasklet_func` 函式使用 `WARN_ON_ONCE` 檢查 `in_interrupt` 和 `in_softirq` (兩者皆透過檢查 preempt_count 來判斷),但卻不用檢查 `irqs_disabled`。是因為只需確保程式碼是在 interrupt context (執行環境由 ISR 控制),但仍需保持可搶佔性供優先級更高的中斷打斷。使用 `ktime_get` 來計算 `queue_work` 函式執行時間。整段程式碼透過 Tasklet 來快速響應中斷,並交由 Workqueue 在<s>進程</s> process context 中實際完成任務,以<s>優化</s> 改善整個系統的回應性和處理效率。`__func__` 會被替換成函式名稱。
:::danger
不要濫用「優化」一詞,參見: https://hackmd.io/@sysprog/it-vocabulary
process 是「行程」,而非「進程」。儘管簡體中文的「進程」寓意是「進行中的程序」,但在中華人民共和國官方的文宣中,也常見「進程」一詞,顯然語境跟作業系統無關。在中國文宣中,「進程」指英語 progress,後者可代表進步、過程、進展,甚至依詞性變化還帶有「步驟」的意思。
> 已更改
:::
不同 tasklet 可以在不同 CPU 同時執行,相同 tasklet 不能同時執行,一個 tasklet 只會在排程它的 CPU 上執行。
### workqueue
workqueue 和 softirq、tasklet 本質上的差異在於前者運行在 process context,滿足有睡眠需求的任務,後者們運行在 interrupt context。觀察以下程式碼檢查 `WARN_ON_ONCE(in_softirq())` 和 `WARN_ON_ONCE(in_interrupt())` 即可得證。
```c
static DEFINE_MUTEX(producer_lock);
static DEFINE_MUTEX(consumer_lock);
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
static void simrupt_work_func(struct work_struct *w)
{
int val, cpu;
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();
while (1) {
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
static DECLARE_WORK(work, simrupt_work_func);
```
`simrupt_work_func` 函式使用 `get_cpu` 和 `put_cpu` 來停用和重新啟用搶佔來取得整確的 cpu id,因為此函式運行在 process context 中,默認允許搶佔。使用 `consumer_lock` 來安全地從循環緩衝區中獲取資料,使用 `producer_lock` 來安全地將<s>數據</s> 資料寫入 kfifo 緩衝區。`wake_up_interruptible(&rx_wait)` 用於喚醒在 rx_wait 等待<s>隊列</s> 佇列上的任何<s>進程</s> 行程。
最後透過 `DECLARE_WORK` 靜態初始化一個工作項目,將 `simrupt_work_func` 函數綁定到這個工作項目上(如果使用 `INIT_WORK` 則是動態初始化)。在未來這個 work 會透過先前提到的 `queue_work` 來將其添加至 `simrupt_workqueue`。
:::danger
注意用語:
* data 是「資料」
* queue 是「佇列」,而非「隊列」
> 已更改
:::
### kfifo
kfifo 是 Linux 核心裡頭提供了 First-In-First-Out 的 Circular buffer 資料結構。在 Single Producer Single Consumer (SPSC) 情況中是 safe 的,即不需要額外的 lock 維護。多生產者需鎖定寫入作業,多消費者需鎖定讀取操作。
在 simrupt 專案中用來儲存即將傳到 userspace 的資料,相關程式碼如下:
```c
static DECLARE_KFIFO_PTR(rx_fifo, unsigned char);
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
```
宣告了一個 `rx_fifo` 指標指向一個 Kfifo 結構。以及一個為了防止讀取共同資料造成 race condition 的鎖 `rx_wait`。
```c
static void produce_data(unsigned char val)
{
/* Implement a kind of circular FIFO here (skip oldest element if kfifo
* buffer is full).
*/
unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
if (unlikely(len < sizeof(val)) && printk_ratelimit())
pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);
pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
kfifo_len(&rx_fifo));
}
```
在 `produce_data` 函式中透過 `kfifo_in(fifo, buf, n)` 函式,複製 buf 資料並放到 fifo 中,並回傳插入的資料大小。透過這個回傳值可以知道寫入的空間不足,打印資訊警告使用者丟棄了某些資料。
```c
static ssize_t simrupt_read(struct file *file,
char __user *buf,
size_t count,
loff_t *ppos)
{
unsigned int read;
int ret;
pr_debug("simrupt: %s(%p, %zd, %lld)\n", __func__, buf, count, *ppos);
if (unlikely(!access_ok(buf, count)))
return -EFAULT;
if (mutex_lock_interruptible(&read_lock))
return -ERESTARTSYS;
do {
ret = kfifo_to_user(&rx_fifo, buf, count, &read);
if (unlikely(ret < 0))
break;
if (read)
break;
if (file->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
} while (ret == 0);
pr_debug("simrupt: %s: out %u/%u bytes\n", __func__, read,
kfifo_len(&rx_fifo));
mutex_unlock(&read_lock);
return ret ? ret : read;
}
```
:::danger
注意用語:
* character 是「字元」,而非「字符」,繁體中文的「符」出現較少,而共製漢語則濫用該字
* call 是「呼叫」,而非「調用」
:::
在<s>字符設備</s> 字元裝置註冊後,`simrupt_read` 函式使使用者可以使用標準的系统<s>调用</s> 呼叫來操作裝置,該函式透過 `kfifo_to_user(fifo, to, len, copied)` 函式將 len 個 bytes 資料從 fifo 移到 userspace。
這段程式碼先是使用 `mutex_lock_interruptible` 嘗試取得互斥鎖 `read_lock`,在等待鎖的過程中可以被訊號打斷。接著使用 do-while 迴圈從 kfifo 嘗試讀取資料並將其複製到使用者空間,其中包含了多個中止條件判定:
- `if (unlikely(ret < 0))` 判斷 `kfifo_to_user` 返回一個負值,表示資料複製過程中發生了錯誤。
- `if (read)` 判斷 read 變數大於 0,表示已有資料被成功複製到使用者空間,這時將跳出迴圈。
- `if (file->f_flags & O_NONBLOCK)` 則是用於處理當文件是以非阻塞模式 (O_NONBLOCK) 打開,且 kfifo 中沒有足夠的資料可供立即讀取(read 為 0),此時不會進入阻塞等待,而是返回 -EAGAIN 表達現在沒有資料可讀。
:::danger
注意用語!避免看低劣的簡體中文殘句。
避免使用 ChatGPT 的中文翻譯,其品質堪憂。
> 之後會注意並改進
:::
當核心模組無法立即處理一個<s>進程</s> 行程的請求時,`wait_event_interruptible` 將<s>進程</s> 行程加入等待 <s>隊列</s>佇列。`wait_event_interruptible` 在等待事件 (在這個例子中為 kfifo_len(&rx_fifo) 大於 0) 時,允許<s>進程</s> 行程對信號 (如 Ctrl+C) 做出響應,這與 `wait_event` 不同,後者會導致<s>進程</s> 行程完全阻塞,即使收到信號也不會喚醒(這段內容皆可參考 [LKMPG 第十一章](https://sysprog21.github.io/lkmpg/#blocking-processes-and-threads))。
所以 `simrupt_read` 函式有兩種情況可能導致行程進入睡眠:沒搶到互斥鎖以及 kfifo 為空的時候,後者在 kfifo 資料更新時會透過 `wake_up_interruptible` 觸發等待行程重新檢查條件。
最後在 `simrupt_init` 函式初始化模組時透過 `kfifo_alloc(fifo, size, gfp_mask)` 配置 kfifo buffer 一塊 `PAGE_SIZE` 大小的記憶體空間,並在 `simrupt_exit` 函式中止模型時使用 `kfifo_free(fifo)` 函式釋放記憶體空間。
### memory barrier
simrupt 中在 `fast_buf` (fast circular buffer) 中使用了 memory barrier。用於快速在 interrupt context 中儲存要放到 kfifo 的資料。
Circular Buffer 是個固定大小的緩衝區,透過 2 個 indicies: `head` 紀錄 producer 寫入資料的位置, `tail` 紀錄 consumer 讀出資料的位置,因此 `tail` 不會超過 `head`。兩者初始值一致,表示空緩衝區。
緩衝區大小維持 2 的冪,才可以使用 bitwise 操作去計算緩衝區空間,取代較慢的 modulus (divide) 操作。觀察以下於 [Linux](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h#L16) 定義的巨集:
```c
#define CIRC_CNT(head, tail, size) (((head) - (tail)) & ((size)-1))
```
當 size 是 2 的冪時,可以使用 `& (size-1)` 操作取代對 size 進行模運算。因此 `CIRC_CNT` 巨集會返回緩衝區中目前的資料數量,供 consumer 所用。
```c
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
```
`CIRC_SPACE` 巨集其實是對 `CIRC_CNT` 巨集的複用,將頭尾參數順序顛倒,計算從 `head` 到 `tail` 的空間,返回緩衝區中剩餘的可用空間,供 producer 所用。
需注意的是 `(head)+1` 因為剩餘的可用空間最大只有 `size - 1`,保留一個字元來達到不用來區分滿和空的狀態。
`smp_rmb` 和 `smp_wmb` 作為 memory barrier,防止記憶體讀取指令的重排,確保先讀寫索引值後再讀取寫入內容。觀察以下程式碼:
```c
static int fast_buf_get(void)
{
struct circ_buf *ring = &fast_buf;
/* prevent the compiler from merging or refetching accesses for tail */
unsigned long head = READ_ONCE(ring->head), tail = ring->tail;
int ret;
if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE)))
return -ENOENT;
/* read index before reading contents at that index */
smp_rmb();
/* extract item from the buffer */
ret = ring->buf[tail];
/* finish reading descriptor before incrementing tail */
smp_mb();
/* increment the tail pointer */
ring->tail = (tail + 1) & (PAGE_SIZE - 1);
return ret;
}
```
這個函式 `fast_buf_get` 是供 consumer 使用,從緩衝區中取得資料,並更新 tail index。因此需要使用 `READ_ONCE` <s>宏</s> 巨集讀取由 producer 更動的 `ring->head` 變數以確保讀取最新的正確值。透過 `smp_rmb()` 設置 read memory barrier 確保在這之前程式碼已讀取 `head` 和 `tail` 索引值,才讀取該索引值的內容。接著使用 `smp_mb` 設置 full memory barrier 確保先前的讀寫操作完成後,再進行寫入操作更新 `ring->tail` 索引值。
```c
static int fast_buf_put(unsigned char val)
{
struct circ_buf *ring = &fast_buf;
unsigned long head = ring->head;
/* prevent the compiler from merging or refetching accesses for tail */
unsigned long tail = READ_ONCE(ring->tail);
/* is circular buffer full? */
if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
return -ENOMEM;
ring->buf[ring->head] = val;
/* commit the item before incrementing the head */
smp_wmb();
/* update header pointer */
ring->head = (ring->head + 1) & (PAGE_SIZE - 1);
return 0;
}
```
這個函式 `fast_buf_put` 是供 producer 使用,與上個函式相似,只是`READ_ONCE` 改成確保 `ring->tail` 索引值,以及改成使用 `CIRC_SPACE` 確保是否還有可寫入空間,最後透過 `smp_wmb` 設置 write memory barrier 確保 `val` 寫入緩衝區後才進行寫入操作更新 `ring->head` 索引值。
:::danger
注意用語!
schedule 是「排程」
> 了解
:::
以下程式碼描述在核心中處理資料和<s>調度</s> 排程任務的流程
```c
static void process_data(void)
{
WARN_ON_ONCE(!irqs_disabled());
pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
fast_buf_put(update_simrupt_data());
pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
tasklet_schedule(&simrupt_tasklet);
}
```
`WARN_ON_ONCE` 會在其參數表達式為真時發出警告。在這段程式碼中用於檢查 `irqs_disabled()` 是否為回傳 0,以確保在 interrupt context 中呼叫 `process_data` 函數。
`update_simrupt_data` 函式產生一段 ASCII 可顯示字元資料,並透過 `fast_buf_put` 函式將其放入環形緩衝區。
最後透過 `tasklet_schedule` <s>調度</s> 排程核心 `tasklet` 來非同步處理先前放入環形緩衝區的資料。
:::danger
注意用語:
* schedule 是「排程」(針對 process 的「行程」),而非「調度」。
摘自教育部新編國語詞典的「[調度](https://dict.revised.moe.edu.tw/dictView.jsp?ID=46002)」條目:
> 安排配置。如:「調度有方」。《花月痕》第一四回:「走進垂花門,見堂中正亂騰騰的擺設,謖如卻坐在炕上調度。」
「調度」一詞不足以反映作業系統核心的行為,而「排程」則更精準。
:::
### kernel module
我認為也是 simrupt 所展示的核心機制之一,simrupt 本身就是一個模組,在程式碼中透過 `simrupt_init` 和 `simrupt_exit` 進行註冊和註銷。`simrupt_init` 除了將專案中所用到的資料結構初始化,例如使用 vmalloc <s>分配</s> 配置空間給 fast_buf,vmalloc 與 kmalloc 差異在於前者保證連續的虛擬位址,後者則保證連續的實際位址,還透過 `cdev` 介面為 simrupt 註冊一個裝置號,並且加入到系統中。並使用 `class_create` 和 `device_create` 將裝置註冊到 sysfs (可參閱 [LKMPG 第六章](https://sysprog21.github.io/lkmpg/#character-device-drivers)), `class_create` 函式用於建立一個裝置類別 (device class) 的抽象概念。建立後當成參數傳入 `device_create` 函式,在該裝置類別中建立一個在 `/dev` 目錄下具體的裝置節點。
`cdev_add` 和 `device_create` 的差異在於 `cdev_add` 專注於核心如何處理對裝置的操作,確保當使用者空間的應用程式嘗試存取這個裝置時,核心能夠管理和排程這些操作。而 `device_create` 負責在檔案系統中建立一個使用者可以直接存取的節點,讓使用者能夠直接使用標準文件操作來與裝置互動。
### 實驗設計
在執行該專案時先紀錄系統目前資源開銷。
```shell
$ sudo cat /proc/softirqs
CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7 CPU8 CPU9 CPU10 CPU11 CPU12 CPU13 CPU14 CPU15
HI: 24 60395 0 1188439 0 278793 58535 1120762 0 0 0 2930 14 55972 0 5523
TIMER: 787923 572887 453747 458307 463694 621511 463080 499769 223574 109631 123459 81149 73009 74061 69896 1101358
TASKLET: 25 1654 2490 14302 2357 5019 19934 8830 1 0 69 1032 79 0 0 103
```
此命令用來查看 Linux 系統中 softirqs 的目前統計資訊,proc 檔案系統最初被設計用來方便存取有關行程的資訊,因此得名 proc。隨時間推移,它被廣泛應用於報告核心的各種資訊,(可參考 [LKMPG 第七章](https://sysprog21.github.io/lkmpg/#the-proc-file-system))
我只擷取 simrupt 專案中有使用到的 HI, TASKLET 和 TIMER 這三種 softirq 的使用數量。
開始實驗,掛載核心模組。
```shell
$ sudo insmod simrupt.ko
```
掛載後,會產生一個裝置檔案/dev/simrupt (原因可見上面[筆記](#kernel-module)),藉由以下命令可見到輸出的資料。
```shell
$ sudo cat /dev/simrupt
```
參考輸出: (可能會有異)
```shell
!"#$%&'()*+,-./0123456789:;<=>?@AB
```
dmesg 顯示核心訊息,加入 `-t | tail -12` 限制顯示數量
```shell
$ sudo dmesg -t | tail -12
simrupt: [CPU#4] enter timer_handler
simrupt: [CPU#4] produce data
simrupt: [CPU#4] scheduling tasklet
simrupt: [CPU#4] timer_handler in_irq: 4 usec
simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 2 usec
simrupt: [CPU#7] simrupt_work_func
simrupt: [CPU#4] enter timer_handler
simrupt: [CPU#4] produce data
simrupt: [CPU#4] scheduling tasklet
simrupt: [CPU#4] timer_handler in_irq: 4 usec
simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 2 usec
simrupt: [CPU#2] simrupt_work_func
```
可以觀察出 cpu4 處理了計時器的中斷和 tasklet 的排程,而後續 work 則可能在不同的 cpu 上被喚醒執行 `simrupt_work_func` 的處理。
再次觀察實驗後 Linux 系統中 softirqs 的目前統計資訊,可以明顯看出只有 cpu4 的 TASKLET 數量增加,且 HI 都維持不變,印證了我前面介紹 softirq 時的推論。而 Timer 在 Linux 核心中被廣泛使用,較難從這種方法中觀察出明顯使用痕跡。
```shell
$ sudo cat /proc/softirqs
CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7 CPU8 CPU9 CPU10 CPU11 CPU12 CPU13 CPU14 CPU15
HI: 24 60395 0 1188439 0 278793 58974 1134704 0 0 0 2930 14 55972 0 5523
TIMER: 789977 574245 454952 460083 466050 622689 464596 500847 223574 109631 123459 81149 73009 74061 69896 1101358
TASKLET: 25 1654 2490 14302 3447 5019 20181 8830 1 0 69 1032 79 0 0 103
```
我們還可以透過更改 `delay` 變數大小來控制我們模擬中斷的頻率,如同前面所說,simrupt 專案會在中斷時產生資料必依序傳入 `fast_buf` 和 kfifo,因此將 `delay` 變數改成 30000 ms 時重新執行上面的實驗即可觀察到資料輸出速度變成約 30 秒一個字元。
```c
static int delay = 30000;
```
再者,已知 simrupt 專案會定期發生中斷並產生資料,而 `cat` 命令又會不斷循環的呼叫 `read` 系統呼叫,再結合前面對於 simrupt 模組的裝置操作中 `simrupt_read` 的介紹,即可解釋為何 `cat` 會無止盡的輸出資料。因此,我們可以寫一個簡單的程式碼來改變使用系統呼叫 `read` 的方式來觀察結果。測試程式碼如下。
```c
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main()
{
char buf[1024];
int fd = open("/dev/simrupt", O_RDWR);
if (fd < 0) {
perror("Failed to open device");
return -1;
}
ssize_t bytes = read(fd, buf, sizeof(buf));
if (bytes < 0) {
perror("Failed to read data");
} else {
printf("Read %zd bytes: %s\n", bytes, buf);
}
close(fd);
return 0;
}
```
參考輸出如下(會隨時間長短有所差異)
```shell
Read 9 bytes: 23456789:
```
原因是這段測試程式碼相比 `cat`,只做了一次 `read` 系統呼叫,因此只會讀取當下 kfifo 的所有資料,如果為空則會進入睡眠直到讀取至少一個字元,使用者可以結合對 `delay` 變數的設定來控制資料產生的速度進而方便觀察。
而且由於只做了一次讀取,因此只要時間夠長或是 delay 變數設定的比較小,導致 simrupt 在模擬中斷期間產生的資料量超過 kfifo 的大小時就無法繼續寫入更新。可以透過以下命令觀察。
```shell
$ sudo dmesg
[91733.270703] simrupt: [CPU#4] enter timer_handler
[91733.270720] simrupt: [CPU#4] produce data
[91733.270723] simrupt: [CPU#4] scheduling tasklet
[91733.270726] simrupt: [CPU#4] timer_handler in_irq: 5 usec
[91733.270746] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 3 usec
[91733.270780] simrupt: [CPU#2] simrupt_work_func
[91733.270784] produce_data: 39 callbacks suppressed
[91733.270787] produce_data: 1 bytes dropped
```
其中 `produce_data: 39 callbacks suppressed` 是由於 Linux 核心日誌系統在報告大量相似訊息時保護核心免受過多的日誌記錄負載影響的一種機制導致。這個機制在核心中稱為 rate limit,用於減少日誌輸出的數量,避免因大量重複日誌而造成的效能問題或讀取困難。這則資訊說明,在先前的日誌記錄中,produce_data 函數相關的日誌輸出被抑制了39次。而抑制的輸出其實就是 `produce_data: 1 bytes dropped`,觀察 simrupt 中 `produce_data` 函式程式碼即可發現是因為 kfifo 緩衝區已滿導致無法寫入。
最後,在進行完所有實驗後記得卸載模組
```c
$ sudo rmmod simrupt
```
補充: 其實仔細觀察 simrupt 專案程式碼後,可以發現 tasklet 除了紀錄排程 workqueue 的時間以外沒有做任何事情,所以移除 tasklet 相關操作並沒有任何問題。
## TODO: 設計實驗來理解 workqueue
workqueue 可以將 work 延後執行,並交由一個 kernel thread 去執行,相對於 tasklet 要在 interrupt context 下執行,workqueue 能在 process context 下執行,且允許重新排程與 sleep,所以 workqueue 可以取代 tasklet。
參考 [LKMPG workqueue](https://sysprog21.github.io/lkmpg/#work-queues) 實驗,自行設計一個 workqueue 實驗,將 work 與 `delayed_work` 放到 workqueue 中,並記錄時間,之後,先執行 work,並再延遲 5 秒後,執行 `delayed_work`。
### 實驗設計
> 參考 [Linux doc](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html)
```c
#include <linux/init.h>
#include <linux/ktime.h>
#include <linux/module.h>
#include <linux/workqueue.h>
static struct workqueue_struct *queue = NULL;
static struct work_struct my_work;
static struct delayed_work my_delayed_work;
static ktime_t work_time, delayWork_time;
static void work_handler(struct work_struct *work)
{
work_time = ktime_get();
pr_info("Immediate work handler function at: %lld ms.\n",
ktime_to_ms(work_time));
}
static void delayed_work_handler(struct work_struct *work)
{
delayWork_time = ktime_get();
pr_info("Delayed work handler function at: %lld ms.\n",
ktime_to_ms(delayWork_time));
if (work_time) {
s64 msecs = ktime_to_ms(ktime_sub(delayWork_time, work_time));
pr_info("Time difference between work and delayed work: %lld s.\n",
msecs >> 10);
}
}
static int __init my_init(void)
{
queue = alloc_workqueue("ExampleWQ", WQ_UNBOUND, 1);
INIT_WORK(&my_work, work_handler);
INIT_DELAYED_WORK(&my_delayed_work, delayed_work_handler);
queue_work(queue, &my_work);
queue_delayed_work(queue, &my_delayed_work, msecs_to_jiffies(5000));
return 0;
}
static void __exit my_exit(void)
{
cancel_delayed_work_sync(&my_delayed_work);
destroy_workqueue(queue);
pr_info("Workqueue module exiting.\n");
}
module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Workqueue with immediate and delayed work example");
```
參考輸出如下:
```shell
$ sudo dmesg
[218579.766188] Immediate work handler function at: 218578457 ms.
[218584.929022] Delayed work handler function at: 218583620 ms.
[218584.929028] Time difference between work and delayed work: 5 s.
```
使用了[參考資料](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html)中的 `queue_delayed_work` 函式指定一個延遲時間。在延遲時間過後,工作項目將會被排程執行。
此外可以使用以下命令觀察目前核心中用來執行 workqueue 任務的 kworker 行程。在 CMWQ 架構下這些 kworker 執行序被所有 workqueue 共享。而且 CWMQ 會保留一個 idle 的 kworker 一段時間。這樣若隨即有 work 要處理則可以直接沿用,不必再重新建立。
```shell
$ systemd-cgls -k | grep kworker
├─ 8 [kworker/0:0H-events_highpri]
├─ 25 [kworker/1:0H-events_highpri]
├─ 31 [kworker/2:0H-events_highpri]
├─ 37 [kworker/3:0H-events_highpri]
...
```
表示方法為 `kworker/N:M{flag}`,其中 N 代表 CPU 的編號; M 是用來區別同一 CPU 上不同 kworker 的編號; flag 提供額外資訊關於 kworker 的角色或處理的任務類型。
最後補充介紹一下 `alloc_workqueue` 函式,這函式用於分配一個 workqueue,已取代舊的 `create_workqueue` 函式,其包含三個參數:
- @name: 工作佇列的名稱。
- @flags: 控制工作佇列行為的標誌。
- @max_active: 控制 workqueue 在每個 CPU 上可同時執行的 work 最大數量。
其中 flags 有以下幾種:
- WQ_UNBOUND: 不綁定任何特定 CPU worker。提高靈活性但效能較低,因為稍微犧牲了 locality 導致快取效率較低。
- WQ_FREEZABLE
- WQ_MEM_RECLAIM
- WQ_HIGHPRI:在特定 CPU 的高優先級 worker pool 中處理,適用於需要快速響應的任務,需注意其和位於同個 cpu 裡普通優先級 worker pool 是相互獨立的。
- WQ_CPU_INTENSIVE
CMWQ 架構圖:
![CMWQ](https://hackmd.io/_uploads/rkkZSxh80.png)
透過這張架構圖,對照上面的介紹與實驗結果,可以透過 `systemd-cgls -k | grep kworker` 命令觀察到每個 cpu 都有屬於自己的 `highpri` 高優先級 worker,以及一些 unbound 的 worker。
這個實驗與 simrupt 專案中皆使用 `WQ_UNBOUND` 與 `@max_active` 設為 1 的組合來實現嚴格的執行排序,確保在任何時候只有一個任務可以激活,從而實現與 ST 工作佇列相同的排序屬性。
(以上介紹可對照參考 [Linux doc](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html) 和 [2024 年 Linux 核心設計/實作課程作業](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-c#Concurrency-Managed-Workqueue))
## TODO: 在 Linux 核心選定 kfifo 應用案例
在 Linux 核心中,kfifo 用來管理 FIFO 環狀緩衝區。經常用於實現高效的緩衝和資料流處理,特別是在需要快速且線性地存取資料的情況下。以下是幾個常見的 kfifo 應用案例:
1. 字元裝置驅動程式,如 [/drivers/char/sonypi.c](https://elixir.bootlin.com/linux/v6.9.7/source/drivers/char/sonypi.c#L475)
2. 網路驅動程式,如 [/drivers/net/ieee802154/ca8210.c](https://elixir.bootlin.com/linux/v6.9.7/source/drivers/net/ieee802154/ca8210.c#L283)
## TODO: Memory Barrier 的使用和必要性
> 搭配實驗解說
simrupt 專案中分別在 `fast_buf_get` 和 `fast_buf_put` 中使用了 `smp_rmb`, `smp_mb` 和 `smp_wmb` 三個 memory barrier,防止記憶體讀取指令的重排。
其分別的使用場景和必要性可見我上面的[筆記](#memory-barrier)介紹
### 實驗設計
為了方便觀察資料的讀取順序是否因為少了 memory barrier 而改變,我將 simrupt 專案產生的資料範圍改成 A 到 Z 區間:
```diff
static inline int update_simrupt_data(void)
{
- simrupt_data = max((simrupt_data + 1) % 0x7f, 0x20);
+ simrupt_data = max((simrupt_data + 1) % 0x5b, 0x41);
return simrupt_data;
}
```
並將 delay 時間縮短使中斷發生的頻率增加,進而增加負載,接著移除三個 memory barrier ,並在 userspace 呼叫使用兩個 termianl 同時呼叫 read 觀察輸出結果:
在一開始只有一個 termianl 時輸出保持順序。
```shell
$ sudo cat /dev/simrupt
ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ
```
然而加入第二個 termianl 後兩者的輸出順序皆稍微亂序。
```shell
$ sudo cat /dev/simrupt
ACEHILMORUXADFIKNQTWZCFILORSUVXYABDEGHJKMNPQSTVWYZBCEFHIKLNOQRUVXY
```
由此可知雙方讀取的索引值在沒有 memory barrier 的保護下已經互相影響。
## TODO: 探討 Linux 中斷處理的流程
> 以 simrupt 作為解說對象,模仿科技公司面試對答的形式
我先簡單介紹大致概念。在 Linux 中,中斷是 CPU 對事件做出反應的機制。由於一個 cpu 在任意時刻只能處理一個任務,中斷允許 CPU 暫時停止目前行程的執行,立即去處理更緊急的任務。
中斷可以用許多角度分類,以下採用硬體還是軟體觸發來分類:
- Hardware Interrupts: 由硬體裝置產生。
- Software Interrupts: 系統呼叫。以及程式執行非法操作 (除零) 時觸發,稱為異常 (Exceptions)。
**針對硬體中斷進行討論**
中斷類型:
1. I/O interrupts: 由外部裝置(如鍵盤、鼠標、網卡等)引發的中斷,當這些裝置需要 CPU 處理某些事件時(如資料準備好、讀取完成等),它們會發送中斷信號給 CPU。我之前參與過一個 simrupt 專案就是一個模擬這種情況的驅動模組。
2. Timer interrupts: 由系統定時器定期引發的中斷,如 Round-Robin Scheduling。
3. Interprocessor interrupts: 在多處理器系統中,一個處理器向另一個處理器發送的中斷訊號,用於協調多處理器間的操作。
中斷發生流程:
1. 硬體裝置發送物理信號到中斷控制器 (PIC/APIC)。
2. 中斷控制器將物理信號轉成中斷向量交給 cpu 執行。
3. cpu 執行以下處理:
- 關閉中斷和保存狀態: 作業系統保存目前行程的狀態以便中斷處理完成後能恢復執行,與 context switch 的狀況不同,較為輕量避免佔用太多時間。
- 執行 ISR (Interrupt Service Routine): 每種中斷都有一個對應的 ISR,透過查尋 Interrupt Vector Table 取得。ISR 是位於核心特定位址的程式碼。
- 任務排程和恢復: ISR 執行完畢後,作業系統根據目前的任務優先順序調整 CPU 資源分配,恢復或開始新的行程。
然而 IRQ handler (ISR) 既要盡量縮短執行時間,又可能要處理許多事情,這存在矛盾,也因而誕生了 deferred interrupts 用於解決這種情況,包含了 softirq, tasklet, workqueue。
以我參與過的 simrupt 專案為例,模擬了網卡接收封包時觸發中斷的情況。
透過核心定時器 (timer_list, base on Timer softirq) 定期產生資料來模擬網卡接收資料,過程中使用 `local_irq_disable` 關閉中斷來達到與真實 I/O interrupts 進入 ISR 一樣的情形。在這段期間會將資料插入到快速環形緩衝區中,同時排程 tasklet 來處理後續更多的資料處理工作,達到迅速退出 ISR 避免長時間關閉中斷的效果。
接著 tasklet 會進行排程並透過 ksoftirqd 核心執行序處理後續操作,在 simrupt 這個專案中只進行了排程 workqueue 一個操作,因為 tasklet 需在 interrupt context 中執行,所以不能執行阻塞或睡眠等複雜操作。
最後 workqueue 會排程並透過 kworker 在 process context 中處理最後耗時的複雜操作,從 kfifo 中取出資料並將其提供給使用者空間。
可能遇到的提問:
1. fast_buf 和 kfifo 的差異?
Ans: fast_buf 是一個簡單的環形緩衝區,設計目的是在 interrupt context 中快速存儲資料。使用簡單的指針操作(head 和 tail)來管理資料的插入和取出。並透過維持 2 的冪大小達到用 bitwise 取代 modulus (divide) 操作。且由於在 interrupt context 所以不用考慮用鎖防止競爭問題。
kfifo 則是核心中一種通用的環形 FIFO 緩衝區,支持多讀者多寫者場景所以設計更加複雜。
2. 為何此驅動模組中選擇使用 Tasklet ,不使用 softirq?
Ans: 與我上面[筆記](#tasklet)內容重複。
3. irq 和 softirq 差異?
Ans: 兩者雖皆在 interrupt context 中執行,但後者是開放中斷的。
4. softirq 是可以 reschduling 導致可能 starve user-level processes,如何解決?
Ans: 核心可以通過限制每個 softirq 處理的次數,或是限制每次 softirq 處理所花費的時間來防止。
補充相關實驗展示如下(只產生三個字元並讀取的結果):
```shell
[255925.006593] simrupt: registered new simrupt device: 511,0
[255930.342137] openm current cnt: 1
[255961.544677] simrupt: [CPU#4] enter timer_handler
[255961.544695] simrupt: [CPU#4] produce data
[255961.544696] simrupt: [CPU#4] scheduling tasklet
[255961.544698] simrupt: [CPU#4] timer_handler in_irq: 3 usec
[255961.544756] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[255961.544758] tasklet run 54 usec after irq
[255961.544758] simrupt: [CPU#0] simrupt_work_func
[255961.544763] simrupt_work_func execution time: 4989 ns
[255992.264505] simrupt: [CPU#4] enter timer_handler
[255992.264528] simrupt: [CPU#4] produce data
[255992.264530] simrupt: [CPU#4] scheduling tasklet
[255992.264532] simrupt: [CPU#4] timer_handler in_irq: 4 usec
[255992.264608] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 1 usec
[255992.264611] tasklet run 70 usec after irq
[255992.264612] simrupt: [CPU#2] simrupt_work_func
[255992.264619] simrupt_work_func execution time: 6843 ns
[256022.984602] simrupt: [CPU#4] enter timer_handler
[256022.984619] simrupt: [CPU#4] produce data
[256022.984621] simrupt: [CPU#4] scheduling tasklet
[256022.984623] simrupt: [CPU#4] timer_handler in_irq: 4 usec
[256022.984694] simrupt: [CPU#4] simrupt_tasklet_func in_softirq: 0 usec
[256022.984697] tasklet run 65 usec after irq
[256022.984792] simrupt: [CPU#5] simrupt_work_func
[256022.984798] simrupt_work_func execution time: 6652 ns
[256027.531242] release, current cnt: 0
[256032.428760] simrupt: unloaded
```
可以觀察到 irq 和 tasklet 的執行速度都非常快,而 tasklet 排程後並不一定會馬上執行,而在 worker 中執行的任務時間相較之下確實非常久。
:::danger
使用課程指定的程式碼風格進行縮排,確保風格一致。
> 收到,本意是想精簡篇幅,現已改進
:::