# 2025q1 Homerwork3
contributed by < [`charliechiou`](https://github.com/charliechiou?tab=repositories) >
{%hackmd NrmQUGbRQWemgwPfhzXj6g %}
## Softirq, Tasklet, Workqueue
Linux 核心中關於 Bottom half 有以下三種機制:
- softirqs
- tasklets
- workqueues
閱讀 Linux Kernel Development 以理解其機制並整理於筆記中
[Softirq, Tasklet, Workqueue 理解](https://hackmd.io/@charliechiu/bottom_half)
## Simrupt
>[Simrupt](https://github.com/sysprog21/simrupt) 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件。
參考 [Linux 核心的並行處理](https://hackmd.io/@sysprog/linux2025-kxo/%2F%40sysprog%2Flinux2025-kxo-c)及 [simrupt](https://github.com/sysprog21/simrupt) 專案以理解 Linux 中核心模組的使用及並行處理。
核心模組使用以下指令編譯
```bash
$ make -C /lib/modules/`uname -r`/build M=`pwd` modules
```
並透過程式碼中 `module_init()` 及 `module_exit()` 當作核心模組的進出口,而關於核心模組的敘述可以設定如下
```c
MODULE_LICENSE("Dual MIT/GPL");
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_DESCRIPTION("A device that simulates interrupts");
```
編譯後也可以透過 `modinfo simrupt.ko` 來查看模組相關資訊。
由於核心模組運行在 Kernel mode 下,因此我們需要先引入核心相關的標頭檔。
```c
#include <linux/cdev.h>
...
#include <linux/workqueue.h>
```
### 初始化
程式碼透過 `simrupt_init` 來初始化模組,使用 `dev_id` 來註冊裝置及 `ret` 來紀錄回傳數值。並透過 `kfifo_alloc` 來分配 PAGE_SIZE 大小的記憶體空間,且由 [alloc_chrdev_region](https://manpages.debian.org/jessie-backports/linux-manual-4.9/alloc_chrdev_region.9) 來註冊一個裝置。 alloc_chrdev_region 會將分配的裝置編號儲存在 `&dev_id` 中,並使用 MAJOR 的宏來取得裝置編號。
```c
ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME);
```
接著透過 `cdev_init` 來初始化一個 [character device](https://linux-kernel-labs.github.io/refs/heads/master/labs/device_drivers.html) 並使用 `cdev_add` 綁定。
```c
cdev_init(&simrupt_cdev, &simrupt_fops);
ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT);
```
而其中初始化過程中所設定的操作則定義在 `&simrupt_fops` 中,如下
```c
static const struct file_operations simrupt_fops = {
.read = simrupt_read,
.llseek = no_llseek,
.open = simrupt_open,
.release = simrupt_release,
.owner = THIS_MODULE,
};
```
將 character device 不同的操作對應到不同的操作函式中。由於 linux 核心中並沒有類似模擬中斷類型的裝置,因此使用 class_create 來創造一個新的類別。
```c
simrupt_class = class_create(DEV_NAME);
```
最後透過 [`device_create`](https://manpages.debian.org/jessie/linux-manual-3.16/device_create.9) 創建一個裝置節點 `/dev/simrupt` 讓使用者可以透過先前定義的 `open`, `read` 來操作我們所建立的裝置。由於我們前面在 alloc_chrdev_region 時將裝置的起始值設定為 0 ,因此這邊使用 `MKDEV(major, 0)` 將主及次裝置編號合成對應到前面設定的裝置。
```c
device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME);
```
接著再分別使用 `vmalloc` 及 `alloc_workqueue` 宣告會使用到的 buffer 及 workqueue。
```c
fast_buf.buf = vmalloc(PAGE_SIZE);
simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
```
其中 `PAGE_SIZE` 是我們要分配的 buffer 大小,而 `WQ_UNBOUND` 及 `WQ_MAX_ACTIVE` 則分別表示不綁定特定 CPU 及同時啟用的工作數量上限。
最後再使用 `timer_setup` 初始化 timer ,並用 `atomic_set` 來統計裝置被打開的次數,以確保只有在裝置打開後才會執行。
```c
timer_setup(&timer, timer_handler, 0);
atomic_set(&open_cnt, 0);
```
其中設定當 timer 觸發時所要執行的程式 `timer_handler`。其中, `simrupt_open` 會在使用者 `open` 裝置的時候觸發,並透過 `atomic_inc_return` 將 open_cnt 加 1 並且回傳,若回傳值為 1 則表示裝置是第一次被開啟,因此需要啟動 timer 。
```c
if (atomic_inc_return(&open_cnt) == 1)
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
```
其中 jiffies 在 linux 核心中表示**現在**,在經過 delay 秒後觸發 timer。
### 觸發計時器
參考 linux 核心中的 [timer.c](https://github.com/torvalds/linux/blob/master/kernel/time/timer.c#L2612) 檔案及翻閱 Linux Kernel Development 中的敘述 , Linux 在啟動的過程中 ([init/main.c](https://elixir.bootlin.com/linux/v6.13.7/source/init/main.c#L1008)) 會呼叫 `init_timers()`。
```c
void __init init_timers(void)
{
init_timer_cpus();
posix_cputimers_init_work();
open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}
```
建立 timer 的時候會先註冊對應的 softirq (i.e., `open_softirq(TIMER_SOFTIRQ, run_timer_softirq);`) 在這邊將 TIMER_SOFTIRQ 類型的 softirq 其 handler 函式設為 run_timer_softirq,其中 open_softirq 實現如下
```c
void open_softirq(int nr, void (*action)(void))
{
softirq_vec[nr].action = action;
}
/*
* This function runs timers and the timer-tq in **bottom half context.**
*/
static __latent_entropy void run_timer_softirq(void)
{
...
}
```
會把對應的 action 函式(在此為 run_timer_softirq )設定到全域陣列 softirq_vec[] 中對應 nr (在此為 TIMER_SOFTIRQ) 的位置。由此可知,timer 基於 softirq 建立,時間到觸發後會在 softirq context 中執行綁定的 `timer_handler`。
在 `timer_handler` 中,先藉由 `in_softirq` 來確認我們目前是在 sortirq 中,並且使用 `local_irq_disable` 將原始的 irq 關閉。
```c
WARN_ON_ONCE(!in_softirq());
/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();
```
接著使用 `ktime_get()` 來取得目前的時間,並在 `process_data()` 前後計算,以便分析執行時間,並以 ns 的單位計算。
```c
tv_start = ktime_get();
process_data();
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
```
最後再重新設定下一次中斷的觸發時間點,並重新開啟原始的 irq 。
```c
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
local_irq_enable();
```
### 中斷執行程式
當計時器觸發時會執行 `process_data()`,首先先確認上一段是否有正確的將原始的 irq 取消
```c
WARN_ON_ONCE(!irqs_disabled());
```
interrupts 可以分為
- interrupt handler must execute quickly
- sometimes an interrupt handler must do a large amount of work
在 simrupt 中,透過將資料寫入環形的快取緩衝區 fast_buf 中,即為呼叫 `fast_buf_put()` 將 `update_simrupt_data()` 傳入 `fast_buf` 來模擬 top half 的操作。
```c
fast_buf_put(update_simrupt_data());
```
接著, 先宣告一個 tasklet 的型態,指定其 callback 的函式為`simrupt_tasklet_func()`。
```c
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);
```
再使用 `tasklet_schedule` 將剩餘操作排程等待執行,將 fast_buffer 搬移至 kfifo 的工作排程。
```c
tasklet_schedule(&simrupt_tasklet);
```
### Fast buffer
為了增加延遲處理速度,我們會將資料先放入快取緩衝區 fast_buf 中,其中對於 fast buffer 的操作又分為 `fast_buf_put` , `fast_buf_get`, 及 `fast_buf_clear`,分別對應到寫入,讀出及移除,而其中的 fast buffer 則使用 Linux 的 [Circular Buffer](https://www.kernel.org/doc/Documentation/core-api/circular-buffers.rst) 來儲存。
>Circular Buffer 是個固定大小的緩衝區,其中具有 2 個 indicies:
>
>- head index: the point at which the producer inserts items into the buffer.
>- tail index: the point at which the consumer finds the next item in the buffer.
>
>當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。
#### fast_buf_put
寫入前先將目前 fast_buf 的地址儲存在 ring 的指標變數中,並儲存 head
```c
struct circ_buf *ring = &fast_buf;
unsigned long head = ring->head;
```
接著用核心提供的 MARCO 來確認使否還有剩餘空間
```c
if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE)))
return -ENOMEM;
```
若有空間則寫入 ring 的 head 位置
```c
ring->buf[ring->head] = val;
```
寫入完成後將 head 移動到下一個位置,由於緩衝區大小維持 2 的冪,因此可以使用 bitwise 操作來計算緩衝區空間。以 PAGE_SIZE 大小 8 為例,`PAGE_SIZE-1 = 0b0111`,遮掉超過 PAGE_SIZE 的高位相當於只留下除完後的餘數,透過位元計算快速求得模除的結果。
```c
ring->head = (ring->head + 1) & (PAGE_SIZE - 1);
```
在 `fast_buf_put` 的操作中,為了避免編譯器造成的錯誤,因此在讀取 tail 時候限制僅讀取一次
```c
unsigned long tail = READ_ONCE(ring->tail);
```
也使用 `smp_rmb()` 作為 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。避免因為 cpu 的優化造成先執行 `ring->head = ...` 再執行 `ing->buf[ring->head] = val;` 造成錯誤,此即為 memory barrier 的用途。
#### fast_buf_get
讀取 fast buffer 中的資料方式和插入資料方式類似,同樣先讀取 head 及 tail 的地址,為了避免資料被其他 CPU 改動到,因此使用 `READ_ONCE`。
```c
unsigned long head = READ_ONCE(ring->head), tail = ring->tail;
```
接著便讀取 tail 的資料並更新 tail 將資料移出,而過程中也同樣使用 memory barrier 來防止錯誤。
```c
/* read index before reading contents at that index */
smp_rmb();
/* extract item from the buffer */
ret = ring->buf[tail];
/* finish reading descriptor before incrementing tail */
smp_mb();
/* increment the tail pointer */
ring->tail = (tail + 1) & (PAGE_SIZE - 1);
```
#### fast_buf_clear
在 `fast_buf_clear` 則直接將 head 及 tail 設置為 0 來清空 fast buffer。
```c
fast_buf.head = fast_buf.tail = 0;
```
在 top half 則透過上述的 fast_buf_put 將`update_simrupt_data` 產生的資料儲存在 fast buffer 中。
```c
/* Generate new data from the simulated device */
static inline int update_simrupt_data(void)
{
simrupt_data = max((simrupt_data + 1) % 0x7f, 0x20);
return simrupt_data;
}
```
其中將 simrupt_data 限制在 0x20以內以防止小於ASCII 的空白字元 `' '`,並且透過 `% 0x7f` 讓資料循環在 `0x20~0x7f` 之間。
### Tasklet & Workqueue
指定 tasklet 的 callback 的函式為 `simrupt_tasklet_func()`。
```c
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);
```
並使用 `tasklet_schedule` 將任務排程
```c
tasklet_schedule(&simrupt_tasklet);
```
#### simrupt_tasklet_func
任務排程後,`simrupt_tasklet_func` 使用 `queue_work` 將工作項目 work(work item)加入工作佇列 simrupt_workqueue 中排程執行並計算執行時間。
```c
queue_work(simrupt_workqueue, &work);
```
其中工作佇列及每個 work 所綁定的 function 如下
```c
/* Workqueue for asynchronous bottom-half processing */
static struct workqueue_struct *simrupt_workqueue;
/* Work item: holds a pointer to the function that is going to be executed
* asynchronously.
*/
static DECLARE_WORK(work, simrupt_work_func);
```
simrupt_workqueue 是模組中的工作佇列,當 work 被排入該佇列並被 kernel thread 執行時,將會執行綁定的處理函式 simrupt_work_func()。
#### simrupt_work_func
當 simrupt_work_func 執行時,會避免目前是執行 softirq 或是 interrupt 的情況,使用到 `WARN_ON_ONCE` 來警告。
```c
/* This code runs from a kernel thread, so softirqs and hard-irqs must
* be enabled.
*/
WARN_ON_ONCE(in_softirq());
WARN_ON_ONCE(in_interrupt());
```
透過 `get_cpu()` 來取得目前 CPU 編號並 disable preemption,最後需要 put_cpu() 重新 enable preemption。
```c
/* Pretend to simulate access to per-CPU data, disabling preemption
* during the pr_info().
*/
cpu = get_cpu();
pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__);
put_cpu();
```
在接下來的迴圈中,會不斷取出 circular buffer 中的資料 (consumer) 並且放入 kfifo buffer 中 (producer)。其中,為了避免不同的任務在執行過程中同時取得 circular buffer 的資料,或不同任務同時寫入 kfifo buffer 中,因此要使用 mutex_lock 來鎖住 critical section 。
```c
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
```
其中 `fast_buf_get` 從 circular buffer 中取得資料,並使用 `produce_data` 放入 kfifo buffer 中。而 producer_lock 及 consumer_lock 則定義為 mutex lock。
```c
static DEFINE_MUTEX(producer_lock);
static DEFINE_MUTEX(consumer_lock);
```
最後的 `wake_up_interruptible` 則對應到 `simrupt_read` 中的 `wait_event_interruptible`。當資料已經被放入 kfifo buffer 中,通知 `simrupt_read` 已經可以讀取。
```c
wake_up_interruptible(&rx_wait);
```
#### produce_data
資料使用 `fast_buf_get` 取出後,藉由 `produce_data` 存入 kfifo_buffer 中
```c
unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
```
### simrupt_read
當使用指使用 `cat /dev/simrupt` 開啟裝置時,會依序使用 `open()` 開啟檔案並使用 `read()` 來讀取檔案。對應到程式碼中 `simrupt_open` 及 `simrupt_read` 。
#### simrupt_open
當程式碼第一次被開啟時,會將記數 open_cnt 加一,以開啟 timer 。
```c
if (atomic_inc_return(&open_cnt) == 1)
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
```
#### simrupt_read
先使用 `access_ok` 來確認使用者提供的指標位置是合法的,並且使用 `mutex_lock_interruptible` 來確認目前是可以讀取的狀態。
```c
if (unlikely(!access_ok(buf, count)))
return -EFAULT;
if (mutex_lock_interruptible(&read_lock))
return -ERESTARTSYS;
```
接著使用 `kfifo_to_user` 來將資料從 fifo 中搬移到使用者空間的 buf 中。如果回傳錯誤碼則退出,若成功搬移也退出。若目前 kfifo 中沒有資料則使用 `wait_event_interruptible` 睡眠直到 `wake_up_interruptible` 將其喚醒後再確認 kfifo_len(&rx_fifo) 是否大於 0 若是則回傳 0 。
```c
do {
ret = kfifo_to_user(&rx_fifo, buf, count, &read);
if (unlikely(ret < 0))
break;
if (read)
break;
if (file->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo));
} while (ret == 0);
```
最後再釋放 read_lock。
```c
mutex_unlock(&read_lock);
```
:::danger
使用 bpftool 分析
:::