# 2025q1 Homerwork3 contributed by < [`charliechiou`](https://github.com/charliechiou?tab=repositories) > {%hackmd NrmQUGbRQWemgwPfhzXj6g %} ## Softirq, Tasklet, Workqueue Linux 核心中關於 Bottom half 有以下三種機制: - softirqs - tasklets - workqueues 閱讀 Linux Kernel Development 以理解其機制並整理於筆記中 [Softirq, Tasklet, Workqueue 理解](https://hackmd.io/@charliechiu/bottom_half) ## Simrupt >[Simrupt](https://github.com/sysprog21/simrupt) 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 IRQ 事件。 參考 [Linux 核心的並行處理](https://hackmd.io/@sysprog/linux2025-kxo/%2F%40sysprog%2Flinux2025-kxo-c)及 [simrupt](https://github.com/sysprog21/simrupt) 專案以理解 Linux 中核心模組的使用及並行處理。 核心模組使用以下指令編譯 ```bash $ make -C /lib/modules/`uname -r`/build M=`pwd` modules ``` 並透過程式碼中 `module_init()` 及 `module_exit()` 當作核心模組的進出口,而關於核心模組的敘述可以設定如下 ```c MODULE_LICENSE("Dual MIT/GPL"); MODULE_AUTHOR("National Cheng Kung University, Taiwan"); MODULE_DESCRIPTION("A device that simulates interrupts"); ``` 編譯後也可以透過 `modinfo simrupt.ko` 來查看模組相關資訊。 由於核心模組運行在 Kernel mode 下,因此我們需要先引入核心相關的標頭檔。 ```c #include <linux/cdev.h> ... #include <linux/workqueue.h> ``` ### 初始化 程式碼透過 `simrupt_init` 來初始化模組,使用 `dev_id` 來註冊裝置及 `ret` 來紀錄回傳數值。並透過 `kfifo_alloc` 來分配 PAGE_SIZE 大小的記憶體空間,且由 [alloc_chrdev_region](https://manpages.debian.org/jessie-backports/linux-manual-4.9/alloc_chrdev_region.9) 來註冊一個裝置。 alloc_chrdev_region 會將分配的裝置編號儲存在 `&dev_id` 中,並使用 MAJOR 的宏來取得裝置編號。 ```c ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME); ``` 接著透過 `cdev_init` 來初始化一個 [character device](https://linux-kernel-labs.github.io/refs/heads/master/labs/device_drivers.html) 並使用 `cdev_add` 綁定。 ```c cdev_init(&simrupt_cdev, &simrupt_fops); ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT); ``` 而其中初始化過程中所設定的操作則定義在 `&simrupt_fops` 中,如下 ```c static const struct file_operations simrupt_fops = { .read = simrupt_read, .llseek = no_llseek, .open = simrupt_open, .release = simrupt_release, .owner = THIS_MODULE, }; ``` 將 character device 不同的操作對應到不同的操作函式中。由於 linux 核心中並沒有類似模擬中斷類型的裝置,因此使用 class_create 來創造一個新的類別。 ```c simrupt_class = class_create(DEV_NAME); ``` 最後透過 [`device_create`](https://manpages.debian.org/jessie/linux-manual-3.16/device_create.9) 創建一個裝置節點 `/dev/simrupt` 讓使用者可以透過先前定義的 `open`, `read` 來操作我們所建立的裝置。由於我們前面在 alloc_chrdev_region 時將裝置的起始值設定為 0 ,因此這邊使用 `MKDEV(major, 0)` 將主及次裝置編號合成對應到前面設定的裝置。 ```c device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME); ``` 接著再分別使用 `vmalloc` 及 `alloc_workqueue` 宣告會使用到的 buffer 及 workqueue。 ```c fast_buf.buf = vmalloc(PAGE_SIZE); simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE); ``` 其中 `PAGE_SIZE` 是我們要分配的 buffer 大小,而 `WQ_UNBOUND` 及 `WQ_MAX_ACTIVE` 則分別表示不綁定特定 CPU 及同時啟用的工作數量上限。 最後再使用 `timer_setup` 初始化 timer ,並用 `atomic_set` 來統計裝置被打開的次數,以確保只有在裝置打開後才會執行。 ```c timer_setup(&timer, timer_handler, 0); atomic_set(&open_cnt, 0); ``` 其中設定當 timer 觸發時所要執行的程式 `timer_handler`。其中, `simrupt_open` 會在使用者 `open` 裝置的時候觸發,並透過 `atomic_inc_return` 將 open_cnt 加 1 並且回傳,若回傳值為 1 則表示裝置是第一次被開啟,因此需要啟動 timer 。 ```c if (atomic_inc_return(&open_cnt) == 1) mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); ``` 其中 jiffies 在 linux 核心中表示**現在**,在經過 delay 秒後觸發 timer。 ### 觸發計時器 參考 linux 核心中的 [timer.c](https://github.com/torvalds/linux/blob/master/kernel/time/timer.c#L2612) 檔案及翻閱 Linux Kernel Development 中的敘述 , Linux 在啟動的過程中 ([init/main.c](https://elixir.bootlin.com/linux/v6.13.7/source/init/main.c#L1008)) 會呼叫 `init_timers()`。 ```c void __init init_timers(void) { init_timer_cpus(); posix_cputimers_init_work(); open_softirq(TIMER_SOFTIRQ, run_timer_softirq); } ``` 建立 timer 的時候會先註冊對應的 softirq (i.e., `open_softirq(TIMER_SOFTIRQ, run_timer_softirq);`) 在這邊將 TIMER_SOFTIRQ 類型的 softirq 其 handler 函式設為 run_timer_softirq,其中 open_softirq 實現如下 ```c void open_softirq(int nr, void (*action)(void)) { softirq_vec[nr].action = action; } /* * This function runs timers and the timer-tq in **bottom half context.** */ static __latent_entropy void run_timer_softirq(void) { ... } ``` 會把對應的 action 函式(在此為 run_timer_softirq )設定到全域陣列 softirq_vec[] 中對應 nr (在此為 TIMER_SOFTIRQ) 的位置。由此可知,timer 基於 softirq 建立,時間到觸發後會在 softirq context 中執行綁定的 `timer_handler`。 在 `timer_handler` 中,先藉由 `in_softirq` 來確認我們目前是在 sortirq 中,並且使用 `local_irq_disable` 將原始的 irq 關閉。 ```c WARN_ON_ONCE(!in_softirq()); /* Disable interrupts for this CPU to simulate real interrupt context */ local_irq_disable(); ``` 接著使用 `ktime_get()` 來取得目前的時間,並在 `process_data()` 前後計算,以便分析執行時間,並以 ns 的單位計算。 ```c tv_start = ktime_get(); process_data(); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); ``` 最後再重新設定下一次中斷的觸發時間點,並重新開啟原始的 irq 。 ```c mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); local_irq_enable(); ``` ### 中斷執行程式 當計時器觸發時會執行 `process_data()`,首先先確認上一段是否有正確的將原始的 irq 取消 ```c WARN_ON_ONCE(!irqs_disabled()); ``` interrupts 可以分為 - interrupt handler must execute quickly - sometimes an interrupt handler must do a large amount of work 在 simrupt 中,透過將資料寫入環形的快取緩衝區 fast_buf 中,即為呼叫 `fast_buf_put()` 將 `update_simrupt_data()` 傳入 `fast_buf` 來模擬 top half 的操作。 ```c fast_buf_put(update_simrupt_data()); ``` 接著, 先宣告一個 tasklet 的型態,指定其 callback 的函式為`simrupt_tasklet_func()`。 ```c static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func); ``` 再使用 `tasklet_schedule` 將剩餘操作排程等待執行,將 fast_buffer 搬移至 kfifo 的工作排程。 ```c tasklet_schedule(&simrupt_tasklet); ``` ### Fast buffer 為了增加延遲處理速度,我們會將資料先放入快取緩衝區 fast_buf 中,其中對於 fast buffer 的操作又分為 `fast_buf_put` , `fast_buf_get`, 及 `fast_buf_clear`,分別對應到寫入,讀出及移除,而其中的 fast buffer 則使用 Linux 的 [Circular Buffer](https://www.kernel.org/doc/Documentation/core-api/circular-buffers.rst) 來儲存。 >Circular Buffer 是個固定大小的緩衝區,其中具有 2 個 indicies: > >- head index: the point at which the producer inserts items into the buffer. >- tail index: the point at which the consumer finds the next item in the buffer. > >當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。 #### fast_buf_put 寫入前先將目前 fast_buf 的地址儲存在 ring 的指標變數中,並儲存 head ```c struct circ_buf *ring = &fast_buf; unsigned long head = ring->head; ``` 接著用核心提供的 MARCO 來確認使否還有剩餘空間 ```c if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE))) return -ENOMEM; ``` 若有空間則寫入 ring 的 head 位置 ```c ring->buf[ring->head] = val; ``` 寫入完成後將 head 移動到下一個位置,由於緩衝區大小維持 2 的冪,因此可以使用 bitwise 操作來計算緩衝區空間。以 PAGE_SIZE 大小 8 為例,`PAGE_SIZE-1 = 0b0111`,遮掉超過 PAGE_SIZE 的高位相當於只留下除完後的餘數,透過位元計算快速求得模除的結果。 ```c ring->head = (ring->head + 1) & (PAGE_SIZE - 1); ``` 在 `fast_buf_put` 的操作中,為了避免編譯器造成的錯誤,因此在讀取 tail 時候限制僅讀取一次 ```c unsigned long tail = READ_ONCE(ring->tail); ``` 也使用 `smp_rmb()` 作為 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。避免因為 cpu 的優化造成先執行 `ring->head = ...` 再執行 `ing->buf[ring->head] = val;` 造成錯誤,此即為 memory barrier 的用途。 #### fast_buf_get 讀取 fast buffer 中的資料方式和插入資料方式類似,同樣先讀取 head 及 tail 的地址,為了避免資料被其他 CPU 改動到,因此使用 `READ_ONCE`。 ```c unsigned long head = READ_ONCE(ring->head), tail = ring->tail; ``` 接著便讀取 tail 的資料並更新 tail 將資料移出,而過程中也同樣使用 memory barrier 來防止錯誤。 ```c /* read index before reading contents at that index */ smp_rmb(); /* extract item from the buffer */ ret = ring->buf[tail]; /* finish reading descriptor before incrementing tail */ smp_mb(); /* increment the tail pointer */ ring->tail = (tail + 1) & (PAGE_SIZE - 1); ``` #### fast_buf_clear 在 `fast_buf_clear` 則直接將 head 及 tail 設置為 0 來清空 fast buffer。 ```c fast_buf.head = fast_buf.tail = 0; ``` 在 top half 則透過上述的 fast_buf_put 將`update_simrupt_data` 產生的資料儲存在 fast buffer 中。 ```c /* Generate new data from the simulated device */ static inline int update_simrupt_data(void) { simrupt_data = max((simrupt_data + 1) % 0x7f, 0x20); return simrupt_data; } ``` 其中將 simrupt_data 限制在 0x20以內以防止小於ASCII 的空白字元 `' '`,並且透過 `% 0x7f` 讓資料循環在 `0x20~0x7f` 之間。 ### Tasklet & Workqueue 指定 tasklet 的 callback 的函式為 `simrupt_tasklet_func()`。 ```c static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func); ``` 並使用 `tasklet_schedule` 將任務排程 ```c tasklet_schedule(&simrupt_tasklet); ``` #### simrupt_tasklet_func 任務排程後,`simrupt_tasklet_func` 使用 `queue_work` 將工作項目 work(work item)加入工作佇列 simrupt_workqueue 中排程執行並計算執行時間。 ```c queue_work(simrupt_workqueue, &work); ``` 其中工作佇列及每個 work 所綁定的 function 如下 ```c /* Workqueue for asynchronous bottom-half processing */ static struct workqueue_struct *simrupt_workqueue; /* Work item: holds a pointer to the function that is going to be executed * asynchronously. */ static DECLARE_WORK(work, simrupt_work_func); ``` simrupt_workqueue 是模組中的工作佇列,當 work 被排入該佇列並被 kernel thread 執行時,將會執行綁定的處理函式 simrupt_work_func()。 #### simrupt_work_func 當 simrupt_work_func 執行時,會避免目前是執行 softirq 或是 interrupt 的情況,使用到 `WARN_ON_ONCE` 來警告。 ```c /* This code runs from a kernel thread, so softirqs and hard-irqs must * be enabled. */ WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); ``` 透過 `get_cpu()` 來取得目前 CPU 編號並 disable preemption,最後需要 put_cpu() 重新 enable preemption。 ```c /* Pretend to simulate access to per-CPU data, disabling preemption * during the pr_info(). */ cpu = get_cpu(); pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__); put_cpu(); ``` 在接下來的迴圈中,會不斷取出 circular buffer 中的資料 (consumer) 並且放入 kfifo buffer 中 (producer)。其中,為了避免不同的任務在執行過程中同時取得 circular buffer 的資料,或不同任務同時寫入 kfifo buffer 中,因此要使用 mutex_lock 來鎖住 critical section 。 ```c while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); if (val < 0) break; /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } ``` 其中 `fast_buf_get` 從 circular buffer 中取得資料,並使用 `produce_data` 放入 kfifo buffer 中。而 producer_lock 及 consumer_lock 則定義為 mutex lock。 ```c static DEFINE_MUTEX(producer_lock); static DEFINE_MUTEX(consumer_lock); ``` 最後的 `wake_up_interruptible` 則對應到 `simrupt_read` 中的 `wait_event_interruptible`。當資料已經被放入 kfifo buffer 中,通知 `simrupt_read` 已經可以讀取。 ```c wake_up_interruptible(&rx_wait); ``` #### produce_data 資料使用 `fast_buf_get` 取出後,藉由 `produce_data` 存入 kfifo_buffer 中 ```c unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val)); ``` ### simrupt_read 當使用指使用 `cat /dev/simrupt` 開啟裝置時,會依序使用 `open()` 開啟檔案並使用 `read()` 來讀取檔案。對應到程式碼中 `simrupt_open` 及 `simrupt_read` 。 #### simrupt_open 當程式碼第一次被開啟時,會將記數 open_cnt 加一,以開啟 timer 。 ```c if (atomic_inc_return(&open_cnt) == 1) mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); ``` #### simrupt_read 先使用 `access_ok` 來確認使用者提供的指標位置是合法的,並且使用 `mutex_lock_interruptible` 來確認目前是可以讀取的狀態。 ```c if (unlikely(!access_ok(buf, count))) return -EFAULT; if (mutex_lock_interruptible(&read_lock)) return -ERESTARTSYS; ``` 接著使用 `kfifo_to_user` 來將資料從 fifo 中搬移到使用者空間的 buf 中。如果回傳錯誤碼則退出,若成功搬移也退出。若目前 kfifo 中沒有資料則使用 `wait_event_interruptible` 睡眠直到 `wake_up_interruptible` 將其喚醒後再確認 kfifo_len(&rx_fifo) 是否大於 0 若是則回傳 0 。 ```c do { ret = kfifo_to_user(&rx_fifo, buf, count, &read); if (unlikely(ret < 0)) break; if (read) break; if (file->f_flags & O_NONBLOCK) { ret = -EAGAIN; break; } ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo)); } while (ret == 0); ``` 最後再釋放 read_lock。 ```c mutex_unlock(&read_lock); ``` :::danger 使用 bpftool 分析 :::