--- title: 2025 年 Linux 核心設計課程作業 —— kxo (C) image: https://hackmd.io/_uploads/r1ob8RIaa.png description: 藉由改寫井字遊戲來熟悉數值系統和核心程式設計 tags: linux2025 --- # N04: kxo > 主講人: [jserv](https://wiki.csie.ncku.edu.tw/User/jserv) / 課程討論區: [2025 年系統軟體課程](https://www.facebook.com/groups/system.software2025/) :mega: 返回「[Linux 核心設計](https://wiki.csie.ncku.edu.tw/linux/schedule)」課程進度表 ## Linux 核心的並行處理 [simrupt](https://github.com/sysprog21/simrupt) 專案名稱由 simulate 和 interrupt 二個單字組合而來,其作用是模擬 [IRQ 事件](https://www.kernel.org/doc/html/latest/core-api/genericirq.html),並展示以下 Linux 核心機制的運用: - irq - softirq - tasklet - workqueue - kernel thread - [kfifo](https://www.kernel.org/doc/htmldocs/kernel-api/kfifo.html) - [memory barrier](https://www.kernel.org/doc/Documentation/memory-barriers.txt) ### timer_list Linux Kernel 提供兩種類型的計時器,分別是 dynamic timer 和 interval timers ,前者用在 kernel space 當中而後者用在 user space 當中。 `struct timer_list` 則是運用在 kernel space 當中的 dynamic timers ( 參見 [include/linux/timer_types.h](https://elixir.bootlin.com/linux/latest/source/include/linux/timer_types.h) )。 在 [kernel/time/timer.c](https://elixir.bootlin.com/linux/latest/source/kernel/time/timer.c) 當中定義以下函式,注意到每個 cpu 都有自己的 timer 。 ```c void __init init_timers(void) { init_timer_cpus(); posix_cputimers_init_work(); open_softirq(TIMER_SOFTIRQ, run_timer_softirq); } ``` 我們可以利用 `$ sudo cat /proc/timer_list` 來觀察 CPU timer_list 資訊。 ### Interrupt Handling [Interrupts and Interrupt Handling](https://0xax.gitbooks.io/linux-insides/content/Interrupts/index.html) 「中斷」(interrupt)是指處理器接收到來自硬體或軟體的訊號,該訊號表示某事件已發生,此類事件統稱中斷事件。所有可能的中斷事件都會被賦予一個唯一的中斷號。 當處理器接收到中斷訊號時,會根據中斷號查詢中斷向量表 (Interrupt Vector Table)找到並執行對應的程式。負責處理中斷事件的程式稱為「中斷處理程序」(interrupt handler),屬於核心層級的函式。 ![CSAPP_2016](https://hackmd.io/_uploads/S1eU-yYAye.jpg) >節錄自 CS:APP 第八章 與一般核心函式 (kernel function) 相比,中斷處理程序無法進入休眠狀態(無法排程),因此其執行時間應盡可能縮短,以避免長時間佔用處理器資源,影響系統整體效能。然而,中斷事件的處理過程往往涉及大量工作。 為有效分散負載並提升系統即時性,Linux 採用「先上車,後補票」的機制,將中斷處理流程劃分為兩個階段—「中斷處理」及「其後續」,也就是:`Top half`(上車) 與 `Bottom half`(補票) `Top Half` 包括: * 確認中斷來源:在共享 interrupt line 的情況下,檢查硬體狀態以判斷是否為本裝置觸發的中斷。 * Acknowledgement:向硬體發送確認訊號,表示中斷已被接收,防止重複觸發。 * 排程後續處理:標記需延後處理的任務。 * 其他不可延遲的關鍵任務。 這些任務通常在屏蔽其他中斷的狀態下執行,需快速完成,以減少對系統其他部分的影響。 實作後續的處理(`Bottom half`) 有以下三種機制: * softirqs * tasklets * workqueues 延伸閱讀: [Linux 核心設計: 中斷處理和現代架構考量](https://hackmd.io/@sysprog/linux-interrupt) #### softirqs 透過稱為 `ksoftirqd` 的 kernel thread 來達成,每個 CPU 都有一個這樣的 thread ,可透過以下命令觀察 ```shell $ systemd-cgls -k | grep ksoft ├─ 15 [ksoftirqd/0] ├─ 23 [ksoftirqd/1] ├─ 29 [ksoftirqd/2] ├─ 35 [ksoftirqd/3] ├─ 41 [ksoftirqd/4] ├─ 47 [ksoftirqd/5] ├─ 53 [ksoftirqd/6] ├─ 59 [ksoftirqd/7] │ │ └─6999 grep --color=auto ksoft ``` 我們可以在 [kernel/softirq.c](https://elixir.bootlin.com/linux/latest/source/kernel/softirq.c) 看到以下定義,分別有 `softirq_vec, ksoftirqd, softirq_to_name` ,每個 CPU 都有自己的 `ksoftirqd` kernel thread ,而這些 kernel thread 也有各自的 `softirq_vec` ,分別對應到 `softirq_to_name` 所對應的種類。 ```c static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp; DEFINE_PER_CPU(struct task_struct *, ksoftirqd); const char * const softirq_to_name[NR_SOFTIRQS] = { "HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "IRQ_POLL", "TASKLET", "SCHED", "HRTIMER", "RCU" }; ``` 利用以下命令觀察 ```shell $ sudo cat /proc/softirqs CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7 HI: 63 0 1 0 0 11 0 5 TIMER: 91569 81926 83934 103131 74011 127987 113794 125569 NET_TX: 4 2 0 0 0 3 0 33 NET_RX: 4400 2811 4047 3683 2501 5472 2422 152954 BLOCK: 351 282 679 225 358 15623 39399 486 IRQ_POLL: 0 0 0 0 0 0 0 0 TASKLET: 173719 0 57 524 20 42 0 1908 SCHED: 240706 199132 152716 184508 189760 380257 337690 202108 HRTIMER: 0 0 0 0 0 0 0 1 RCU: 142200 142248 117051 133957 138481 245684 225400 148630 ``` 被延遲的 interrupt 會被放到對應的欄位當中,透過 `raise_softirq` 來觸發, `wakeup_softirqd` 則是會觸發當前 CPU 的 `ksoftirqd` kernel thread 。 #### tasklets tasklets 在 Linux kernel 當中的實作位在 [/include/linux/interrupt.h](https://elixir.bootlin.com/linux/latest/source/include/linux/interrupt.h) ```c struct tasklet_struct { struct tasklet_struct *next; unsigned long state; atomic_t count; bool use_callback; union { void (*func)(unsigned long data); void (*callback)(struct tasklet_struct *t); }; unsigned long data; }; ``` 它是實作在 softirq 上,另一種延遲中斷處理的機制,它依賴以下兩種 softirqs * `TASKLET_SOFTIRQ` * `HI_SOFTIRQ` 同一種類型的 tasklets 不能同時在多個處理器上運作,從以上 tasklet_struct 的定義來看,可以剖析它的實作包括 * 指向位於 scheduling queue 當中下一個 tasklet 的指標 * tasklet 的狀態 * 該 tasklet 的 callback 函式 * callback 函式的參數 Linux 核心利用以下兩個函式來標示 tasklet 為 ready to run * `tasklet_schedule()` * `tasklet_hi_schedule()` 這兩個函式實作相近,差別在優先權,第一個函式所標註的 tasklet 優先權最低。 #### workqueues workqueue 和 tasklet 的概念類似,但依舊有差別, tasklet 透過 software interrupt context 來執行,而 workqueue 當中的 work items 則是透過 kernel process ,這代表 work item 的執行不像 tasklet 一樣是 atomic 的 (換言之,整個 tasklet 的函式只能執行在最初被分配到的 CPU 上)。 Kernel 會建立稱為 `worker threads` 的 kernel threads 來處理 work items ,我們可以透過以下命令來觀察這些 kernel threads 。 ```shell $ systemd-cgls -k | grep kworker ├─ 7 [kworker/0:0-events] ├─ 8 [kworker/0:0H-events_highpri] ├─ 25 [kworker/1:0H-events_highpri] ├─ 31 [kworker/2:0H-events_highpri] ├─ 37 [kworker/3:0H-events_highpri] ├─ 43 [kworker/4:0H-events_highpri] ├─ 55 [kworker/6:0H-events_highpri] ├─ 60 [kworker/7:0-events] ├─ 61 [kworker/7:0H-kblockd] ├─ 72 [kworker/3:1-events] ├─ 85 [kworker/3:1H-kblockd] ├─ 88 [kworker/6:1-events] ├─ 90 [kworker/1:1-mm_percpu_wq] ├─ 91 [kworker/2:1-events] ├─ 92 [kworker/5:1-mm_percpu_wq] ├─ 113 [kworker/4:1H-kblockd] ├─ 121 [kworker/u17:0-i915_flip] ├─ 128 [kworker/5:1H-kblockd] ├─ 129 [kworker/7:1H-kblockd] ├─ 148 [kworker/2:1H-kblockd] ├─ 152 [kworker/0:1H-kblockd] ├─ 153 [kworker/1:1H-kblockd] ├─ 166 [kworker/6:1H-kblockd] ├─ 181 [kworker/5:2H-kblockd] ├─ 918 [kworker/0:3-cgroup_destroy] ├─1323 [kworker/6:2-events] ├─1673 [kworker/u17:1] ├─4813 [kworker/4:0-events] ├─6021 [kworker/2:0-cgroup_destroy] ├─6031 [kworker/7:1-events] ├─6086 [kworker/4:1-cgroup_destroy] ├─6123 [kworker/3:0] ├─6140 [kworker/5:0] ├─6159 [kworker/u16:1-events_power_efficient] ├─6180 [kworker/u16:3-events_freezable_power_] ├─6923 [kworker/1:2] ├─7102 [kworker/u16:2-ext4-rsv-conversion] ├─7147 [kworker/u16:4-events_unbound] ├─7191 [kworker/2:2-events] ├─7192 [kworker/u16:0] │ │ └─7195 grep --color=auto kworker ``` `queue_work()` 函式則是可以幫我們把 work item 放置到 workqueue 當中。 ### kfifo [kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html) 是 Linux 核心裡頭 First-In-First-Out (FIFO) 的結構,在 Single Producer Single Consumer (SPSC) 情況中是 safe 的,即不需要額外的 lock 維護,在程式碼中註解中也有提及。 在此專案中有一個 kfifo 資料結構 rx_fifo,用來儲存即將傳到 userspace 的 data。 ```c /* Data are stored into a kfifo buffer before passing them to the userspace */ static DECLARE_KFIFO_PTR(rx_fifo, unsigned char); /* NOTE: the usage of kfifo is safe (no need for extra locking), until there is * only one concurrent reader and one concurrent writer. Writes are serialized * from the interrupt context, readers are serialized using this mutex. */ static DEFINE_MUTEX(read_lock); ``` 將 Data 插入到 rx_fifo 中,並檢查寫入的長度與避免過度輸出日誌而影響效能,之所以對 len 進行檢查的原因在於 kfifo_in 所回傳之值,是實際成功插入的數量。 ```c /* Insert a value into the kfifo buffer */ static void produce_data(unsigned char val) { /* Implement a kind of circular FIFO here (skip oldest element if kfifo * buffer is full). */ unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val)); if (unlikely(len < sizeof(val)) && printk_ratelimit()) pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len); pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` `kfifo_in(fifo, buf, n);` - 複製 buf 資料並放到 fifo 中,最後會回傳插入的資料大小。 - 在程式碼中,會先確認要放入的大小是否大於剩餘的大小,若超過,則會以剩餘大小為主。 ```c unsigned int __kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len) { unsigned int l; l = kfifo_unused(fifo); if (len > l) len = l; kfifo_copy_in(fifo, buf, len, fifo->in); fifo->in += len; return len; } ``` `kfifo_to_user(fifo, to, len, copied);` - 將最多 len 個 bytes 資料從 fifo 移到 userspace。 `kfifo_alloc(fifo, size, gfp_mask);` - 動態配置一個 fifo buffer,配置成功會回傳 0。若要釋放 fifo 可藉由 `kfifo_free(fifo);` 實現。 ### fast circular buffer [Circular Buffer](https://www.kernel.org/doc/Documentation/core-api/circular-buffers.rst) 是個固定大小的緩衝區,其中具有 2 個 indicies: - `head index`: the point at which the producer inserts items into the buffer. - `tail index`: the point at which the consumer finds the next item in the buffer. 當 head 和 tail 重疊時,代表目前是空的緩衝區。相反的,當 head 比 tail 少 1 時,代表緩衝區是滿的。 當有項目被添加時,head index 會增加,當有項目被移除時,tail index 會被增加,tail 不會超過 head,且當兩者都到達緩衝區的末端時,都必須被設定回 0。也可以藉由此方法清除緩衝區中的資料。 ```c { ... /* Allocate fast circular buffer */ fast_buf.buf = vmalloc(PAGE_SIZE); ... /* Clear all data from the circular buffer fast_buf */ fast_buf.head = fast_buf.tail = 0; } ``` Measuring power-of-2 buffers: 讓緩衝區大小維持 2 的冪,就可以使用 bitwise 操作去計算緩衝區空間,避免使用較慢的 modulus (divide) 操作。 - [ ] [include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h) ```c struct circ_buf { char *buf; int head; int tail; }; /* Return count in buffer. */ #define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1)) /* Return space available, 0..size-1. We always leave one free char * as a completely full buffer has head == tail, which is the same as * empty. */ #define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size)) /* Return count up to the end of the buffer. Carefully avoid * accessing head and tail more than once, so they can change * underneath us without returning inconsistent results. */ #define CIRC_CNT_TO_END(head,tail,size) \ ({int end = (size) - (tail); \ int n = ((head) + end) & ((size)-1); \ n < end ? n : end;}) /* Return space available up to the end of the buffer. */ #define CIRC_SPACE_TO_END(head,tail,size) \ ({int end = (size) - 1 - (head); \ int n = (end + (tail)) & ((size)-1); \ n <= end ? n : end+1;}) ``` `CIRC_SPACE*()` 被 producer 使用,`CIRC_CNT*()` 是 consumer 所用。 在 simrupt 中,一個「更快」的 circular buffer 被拿來儲存即將要放到 kfifo 的資料。 ```c /* We use an additional "faster" circular buffer to quickly store data from * interrupt context, before adding them to the kfifo. */ static struct circ_buf fast_buf; ``` `READ_ONCE()` 是個 relaxed-ordering 且保證 atomic 的 memory operation,可以確保在多執行緒環境中,讀取到的值是正確的,並保證讀寫操作不會被編譯器最佳化所影響。 `smp_rmb()` 是個 memory barrier,會防止記憶體讀取指令的重排,確保先讀取索引值後再讀取內容。在〈[Lockless patterns: relaxed access and partial memory barriers](https://lwn.net/Articles/846700/)〉提到 `smp_rmb()` 與 `smp_wmb()` 的 barrier 效果比 `smp_load_acquire()` 與 `smp_store_release()` 還要來的差,但是因為 load-store 之間的排序關係很少有影響,所以開發人員常以 `smp_rmb()` 和 `smp_wmb()` 作為 memory barrier。 `fast_buf_get` 扮演一個 consumer 的角色,會從緩衝區中取得資料,並更新 tail index。 ```c static int fast_buf_get(void) { struct circ_buf *ring = &fast_buf; /* prevent the compiler from merging or refetching accesses for tail */ unsigned long head = READ_ONCE(ring->head), tail = ring->tail; int ret; if (unlikely(!CIRC_CNT(head, tail, PAGE_SIZE))) return -ENOENT; /* read index before reading contents at that index */ smp_rmb(); /* extract item from the buffer */ ret = ring->buf[tail]; /* finish reading descriptor before incrementing tail */ smp_mb(); /* increment the tail pointer */ ring->tail = (tail + 1) & (PAGE_SIZE - 1); return ret; } ``` fast_buf_put 扮演一個 producer 的角色,藉由 `CIRC_SPACE()` 判斷 buffer 中是否有剩餘空間,並更新 head index。 ```c static int fast_buf_put(unsigned char val) { struct circ_buf *ring = &fast_buf; unsigned long head = ring->head; /* prevent the compiler from merging or refetching accesses for tail */ unsigned long tail = READ_ONCE(ring->tail); /* is circular buffer full? */ if (unlikely(!CIRC_SPACE(head, tail, PAGE_SIZE))) return -ENOMEM; ring->buf[ring->head] = val; /* commit the item before incrementing the head */ smp_wmb(); /* update header pointer */ ring->head = (ring->head + 1) & (PAGE_SIZE - 1); return 0; } ``` process_data 函式呼叫 `fast_buf_put(update_simrupt_data());` ,其中 `update_simrupt_data()` 會產生資料,這些資料的範圍在 `0x20` 到 `0x7E` 之間,即 ASCII 中的可顯示字元,這些資料會被放入 circular buffer 中,最後交由 `tasklet_schedule` 進行排程。 ```c static void process_data(void) { WARN_ON_ONCE(!irqs_disabled()); pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id()); fast_buf_put(update_simrupt_data()); pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id()); tasklet_schedule(&simrupt_tasklet); } ``` ### tasklet tasklet 是基於 softirq 之上建立的,但最大的差別在於 tasklet 可以動態配置且可以被用在驅動裝置上。 tasklet 可以被 workqueues, timers 或 threaded interrupts 取代,但 kernel 中尚有使用 tasklet 的情況,Linux 核心開發者已著手 API 變更,而 `DECLARE_TASKLET_OLD` 的存在是顧及相容性。 [Modernizing the tasklet API](https://lwn.net/Articles/830964/) ```c #define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L) ``` 首先會先確保函式在 interrupt context 和 softirq context 中執行,使用 queue_work 將 work 放入 workqueue 中,並記錄執行時間。 ```c /** * queue_work - queue work on a workqueue * @wq: workqueue to use * @work: work to queue */ static inline bool queue_work(struct workqueue_struct *wq, struct work_struct *work) { return queue_work_on(WORK_CPU_UNBOUND, wq, work); } ``` ```c /* Tasklet handler. * * NOTE: different tasklets can run concurrently on different processors, but * two of the same type of tasklet cannot run simultaneously. Moreover, a * tasklet always runs on the same CPU that schedules it. */ static void simrupt_tasklet_func(unsigned long __data) { ktime_t tv_start, tv_end; s64 nsecs; WARN_ON_ONCE(!in_interrupt()); WARN_ON_ONCE(!in_softirq()); tv_start = ktime_get(); queue_work(simrupt_workqueue, &work); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); } /* Tasklet for asynchronous bottom-half processing in softirq context */ static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func); ``` 藉由上述註解可以得知: - 不同 tasklet 可以在不同 CPU 同時執行 - 相同 tasklet 不能同時執行 - 一個 tasklet 只會在排程它的 CPU 上執行 | | softirq | tasklet | |:------------------------:|:-------:|:-------:| | 多個在同一個 CPU 執行? | No | No | | 相同的可在不同 CPU 執行? | Yes | No | | 會在同個 CPU 執行? | Yes | Maybe | 當 `tasklet_schedule()`被呼叫時,代表此 tasklet 被允許在 CPU 上執行,詳見 [linux/include/linux/interrupt.h](https://github.com/torvalds/linux/blob/master/include/linux/interrupt.h) ### workqueue [linux/include/linux/workqueue.h](https://elixir.bootlin.com/linux/latest/source/include/linux/workqueue.h) 定義兩個 mutex lock: `producer_lock` 和 `consumer_lock`。 ```c /* Mutex to serialize kfifo writers within the workqueue handler */ static DEFINE_MUTEX(producer_lock); /* Mutex to serialize fast_buf consumers: we can use a mutex because consumers * run in workqueue handler (kernel thread context). */ static DEFINE_MUTEX(consumer_lock); ``` `get_cpu()` 獲取目前 CPU 編號並 disable preemption,最後需要 `put_cpu()` 重新 enable preemption。 24-26行使用 `mutex_lock(&consumer_lock)` 鎖住消費者區域,防止其它的任務取得 circular buffer 的資料。 32-34行使用 `mutex_lock(&producer_lock)` 鎖住生產者區域,防止其它的任務寫入 kfifo buffer。 `wake_up_interruptible(&rx_wait)` 會喚醒 wait queue 上的行程,將其狀態設置為 TASK_RUNNING。 ```c= /* Wait queue to implement blocking I/O from userspace */ static DECLARE_WAIT_QUEUE_HEAD(rx_wait); /* Workqueue handler: executed by a kernel thread */ static void simrupt_work_func(struct work_struct *w) { int val, cpu; /* This code runs from a kernel thread, so softirqs and hard-irqs must * be enabled. */ WARN_ON_ONCE(in_softirq()); WARN_ON_ONCE(in_interrupt()); /* Pretend to simulate access to per-CPU data, disabling preemption * during the pr_info(). */ cpu = get_cpu(); pr_info("simrupt: [CPU#%d] %s\n", cpu, __func__); put_cpu(); while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); if (val < 0) break; /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } wake_up_interruptible(&rx_wait); } ``` 在 workqueue 中執行的 work,可由以下方式定義。 - `DECLARE_WORK(name, void (*func) (void *), void *data)` 會在編譯時,靜態地初始化 work。 - `INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data)` 在執行時,動態地初始化一個 work。 ```c /* Workqueue for asynchronous bottom-half processing */ static struct workqueue_struct *simrupt_workqueue; /* Work item: holds a pointer to the function that is going to be executed * asynchronously. */ static DECLARE_WORK(work, simrupt_work_func); ``` ### timer 藉由 `timer_setup()` 初始化 timer。 ```c /* Setup the timer */ timer_setup(&timer, timer_handler, 0); void timer_setup(struct timer_list * timer, void (*function)(struct timer_list *), unsigned int flags); ``` 目標是模擬 hard-irq,所以必須確保目前是在 softirq context,欲模擬在 interrupt context 中處理中斷,所以針對該 CPU disable interrupts。 ```c /* Timer to simulate a periodic IRQ */ static struct timer_list timer; static void timer_handler(struct timer_list *__timer) { ktime_t tv_start, tv_end; s64 nsecs; pr_info("simrupt: [CPU#%d] enter %s\n", smp_processor_id(), __func__); /* We are using a kernel timer to simulate a hard-irq, so we must expect * to be in softirq context here. */ WARN_ON_ONCE(!in_softirq()); /* Disable interrupts for this CPU to simulate real interrupt context */ local_irq_disable(); tv_start = ktime_get(); process_data(); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); local_irq_enable(); } ``` 使用 mod_timer 對 timer 進行排程。 [Jiffy](https://en.wikipedia.org/wiki/Jiffy_(time)) 表示不具體的非常短暫的時間段,可藉由以下公式進行轉換。 ``` jiffies_value = seconds_value * HZ; seconds_value = jiffies_value / HZ; ``` ### simrupt_init 該函式進行許多資料結構的初始化: 1. 配置給 kfifo 一個 PAGE_SIZE 大小的空間 ```c kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL) ``` 2. 配置給 circular buffer 一個 PAGE_SIZE 大小的空間 ```c fast_buf.buf = vmalloc(PAGE_SIZE); ``` 3. 為 simrupt 註冊一個設備號,並且加入到系統中 ```c ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME); ... cdev_init(&simrupt_cdev, &simrupt_fops); ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT); ``` 4. 註冊設備到 sysfs,即可允許使用者空間運用裝置檔案 `/dev/simrupt` 來存取和控制該設備 ```c device_create(simrupt_class, NULL, MKDEV(major, 0), NULL, DEV_NAME); ``` 5. 配置一個新的 workqueue ```c simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE); ``` 6. 設定 timer ```c timer_setup(&timer, timer_handler, 0); ``` ### simrupt 流程圖 ```graphviz digraph simrupt { node [shape = box] rankdir = TD timer_handler [label="timer_handler"] process_data [label = "process_data"] update_simrupt_data [label="update_simrupt_data"] fast_circular_buffer [label="fast_circular_buffer", shape=ellipse] simrupt_tasklet_func [label = "simrupt_tasklet_func"] simrupt_work_func [label = "simrupt_work_func"] kfifo [label = "kfifo", shape=ellipse] timer_handler -> process_data process_data -> update_simrupt_data update_simrupt_data -> fast_circular_buffer [label = " fast_buf_put"] process_data -> simrupt_tasklet_func [label = " tasklet_schedule"] simrupt_tasklet_func -> simrupt_work_func [label = " queue_work"] fast_circular_buffer -> simrupt_work_func [label = "fast_buf_get"] simrupt_work_func -> kfifo [label = " produce_data"] {rank=same update_simrupt_data simrupt_tasklet_func} } ``` ### simrupt 的使用 掛載核心模組。 ```shell $ sudo insmod simrupt.ko ``` 掛載後,會產生一個裝置檔案`/dev/simrupt`,藉由以下命令可見到輸出的資料。 ```shell $ sudo cat /dev/simrupt ``` 參考輸出: (可能會有異) ```shell !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~ ``` `dmesg` 顯示核心訊息,加入 `--follow` 可即時查看。 ```shell sudo dmesg --follow ``` 參考輸出: ```shell [882265.813265] simrupt: [CPU#3] enter timer_handler [882265.813297] simrupt: [CPU#3] produce data [882265.813299] simrupt: [CPU#3] scheduling tasklet [882265.813300] simrupt: [CPU#3] timer_handler in_irq: 2 usec [882265.813350] simrupt: [CPU#3] simrupt_tasklet_func in_softirq: 0 usec [882265.813383] simrupt: [CPU#5] simrupt_work_func ``` ## Linux 核心的 kfifo [kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html) 是一個 [Circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) 的資料結構,而 ring-buffer 就是參考 kfifo 所實作。 在 simrupt_init 會先配置 buffer,使其具備一個 PAGE 的空間。`fast_buf.buf = vmalloc(PAGE_SIZE);` 將 buffer 的虛擬記憶體位址存在 `fast_buf.buf` 。 ```graphviz digraph structs { rankdir = LR node[shape=plaintext] node[shape=ellipse] main [label= "main thread"] worker1 [label = "worker thread"] worker2 [label = "worker thread"] worker3 [label = "worker thread"] node[shape=box]; kfifo [label = "kfifo\n(1 PAGESIZE)"]; buffer [label = "buffer\n(1 PAGESIZE)"]; { main->buffer [label = "put data"] buffer->worker1 [label = "get data" color="blue"] worker1->kfifo [label = "put data" color="blue"] buffer->worker2 [label = "get data" color="blue"] worker2->kfifo [label = "put data" color="blue"] buffer->worker3 [label = "get data" color="blue"] worker3->kfifo [label = "put data" color="blue"] kfifo->userspace [label ="kfifo_to_user()"] } } ``` 主執行緒會將更新的字元放入 buffer 中,而每個 worker thread 則是使用函式 `fast_buf_get()` 從 buffer 取出資料後,藉由 `produce_data()` 放到 kfifo。 kfifo 適合的使用情境,可以在 [linux/kfifo.h](https://github.com/torvalds/linux/blob/master/include/linux/kfifo.h) 中看到: ```c /* * Note about locking: There is no locking required until only one reader * and one writer is using the fifo and no kfifo_reset() will be called. * kfifo_reset_out() can be safely used, until it will be only called * in the reader thread. * For multiple writer and one reader there is only a need to lock the writer. * And vice versa for only one writer and multiple reader there is only a need * to lock the reader. */ ``` 選定 [linux/samples/kfifo/](https://github.com/torvalds/linux/tree/master/samples/kfifo) 作為應用案例,並參考 [kfifo-examples](https://github.com/sysprog21/kfifo-examples) 進行實驗。 - [ ] 應用案例: `record-example.c` * 定義一個大小為 100 的 char 陣列 buf,用於臨時儲存資料。 - 定義一個 struct hello,包含一個 unsigned char buf,且該 buf 初始化為 "hello"。 * `kfifo_in(&test, &hello, sizeof(hello))` 將 struct hello 寫入 kfifo buffer,並用 `kfifo_peek_len(&test)` 印出 kfifo buffer 下一個 record 的大小。 * for 迴圈裡面,每次會使用 memset 將 buf 的前 i+1 個元素設置為 'a'+i,並用 `kfifo_in(&test, buf, i + 1)` 寫入 kfifo buffer。 * `kfifo_skip(&test)` 跳過 kfifo buffer 的第一個值,即跳過 "hello"。 * `kfifo_out_peek(&test, buf, sizeof(buf)` 會在不刪除元素情況下,印出 kfifo buffer 的第一個元素。 * `kfifo_len(&test)` 印出目前 kfifo buffer 已佔用的空間。 * while 迴圈用 `kfifo_out(&test, buf, sizeof(buf))` 逐一比對 kfifo buffer 中的元素是不是和 excepted_result 中的元素一樣。 ```c static int __init testfunc(void) { char buf[100]; unsigned int i; unsigned int ret; struct { unsigned char buf[6]; } hello = { "hello" }; printk(KERN_INFO "record fifo test start\n"); kfifo_in(&test, &hello, sizeof(hello)); /* show the size of the next record in the fifo */ printk(KERN_INFO "fifo peek len: %u\n", kfifo_peek_len(&test)); /* put in variable length data */ for (i = 0; i < 10; i++) { memset(buf, 'a' + i, i + 1); kfifo_in(&test, buf, i + 1); } /* skip first element of the fifo */ printk(KERN_INFO "skip 1st element\n"); kfifo_skip(&test); printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test)); /* show the first record without removing from the fifo */ ret = kfifo_out_peek(&test, buf, sizeof(buf)); if (ret) printk(KERN_INFO "%.*s\n", ret, buf); /* check the correctness of all values in the fifo */ i = 0; while (!kfifo_is_empty(&test)) { ret = kfifo_out(&test, buf, sizeof(buf)); buf[ret] = '\0'; printk(KERN_INFO "item = %.*s\n", ret, buf); if (strcmp(buf, expected_result[i++])) { printk(KERN_WARNING "value mismatch: test failed\n"); return -EIO; } } if (i != ARRAY_SIZE(expected_result)) { printk(KERN_WARNING "size mismatch: test failed\n"); return -EIO; } printk(KERN_INFO "test passed\n"); return 0; } ``` 掛載核心模組。 ```shell $ sudo insmod record-example.ko ``` 利用 dmesg 查看核心訊息 ```shell $ sudo dmesg [360087.628314] record fifo test start [360087.628316] fifo peek len: 6 [360087.628317] skip 1st element [360087.628317] fifo len: 65 [360087.628318] a [360087.628318] item = a [360087.628319] item = bb [360087.628319] item = ccc [360087.628319] item = dddd [360087.628319] item = eeeee [360087.628320] item = ffffff [360087.628320] item = ggggggg [360087.628320] item = hhhhhhhh [360087.628321] item = iiiiiiiii [360087.628321] item = jjjjjjjjjj [360087.628321] test passed ``` - [ ] 應用案例: `bytestream-example.c` * 分別用 `kfifo_in` 與 `kfifo_put` 將字串 "hello" 與數字 0-9 放入 kfifo buffer。 - `kfifo_in`: 可一次將 n Bytes 的 object 放到 kfifo buffer 中。 - `kfifo_put`: 與 `kfifo_in` 相似,只是用來處理要將單一個值放入 kfifo buffer 的情境,若要插入時,buffer 已滿,則會返回 0。 * `kfifo_out` 先將 kfifo buffer 前 5 個值拿出,即 "hello"。 * `kfifo_out` 將 kfifo buffer 前 2 個值 (0、1) 拿出,再用 `kfifo_in` 重新將 0、1 放入 kfifo buffer,並用 `kfifo_skip` 拿出並忽略 buffer 中第一個值。 * 值從 20 開始,逐一將大小為 32B 的 kfifo buffer 填滿。 * 並用 `kfifo_get` 逐一檢查 buffer 內的值是否與 expected_result 中的值一樣,若一樣,則 test passed。 ```c static int __init testfunc(void) { unsigned char buf[6]; unsigned char i, j; unsigned int ret; printk(KERN_INFO "byte stream fifo test start\n"); /* put string into the fifo */ kfifo_in(&test, "hello", 5); /* put values into the fifo */ for (i = 0; i != 10; i++) kfifo_put(&test, i); /* show the number of used elements */ printk(KERN_INFO "fifo len: %u\n", kfifo_len(&test)); /* get max of 5 bytes from the fifo */ i = kfifo_out(&test, buf, 5); printk(KERN_INFO "buf: %.*s\n", i, buf); /* get max of 2 elements from the fifo */ ret = kfifo_out(&test, buf, 2); printk(KERN_INFO "ret: %d\n", ret); /* and put it back to the end of the fifo */ ret = kfifo_in(&test, buf, ret); printk(KERN_INFO "ret: %d\n", ret); /* skip first element of the fifo */ printk(KERN_INFO "skip 1st element\n"); kfifo_skip(&test); /* put values into the fifo until is full */ for (i = 20; kfifo_put(&test, i); i++) ; printk(KERN_INFO "queue len: %u\n", kfifo_len(&test)); /* show the first value without removing from the fifo */ if (kfifo_peek(&test, &i)) printk(KERN_INFO "%d\n", i); /* check the correctness of all values in the fifo */ j = 0; while (kfifo_get(&test, &i)) { printk(KERN_INFO "item = %d\n", i); if (i != expected_result[j++]) { printk(KERN_WARNING "value mismatch: test failed\n"); return -EIO; } } if (j != ARRAY_SIZE(expected_result)) { printk(KERN_WARNING "size mismatch: test failed\n"); return -EIO; } printk(KERN_INFO "test passed\n"); return 0; } ``` 掛載核心模組。 ```shell $ sudo insmod bytestream-example.ko ``` 利用 dmesg 查看核心訊息 ```shell $ sudo dmesg [130567.107610] byte stream fifo test start [130567.107612] fifo len: 15 [130567.107613] buf: hello [130567.107614] ret: 2 [130567.107614] ret: 2 [130567.107614] skip 1st element [130567.107615] queue len: 32 [130567.107615] 3 [130567.107615] item = 3 [130567.107615] item = 4 [130567.107616] item = 5 [130567.107616] item = 6 [130567.107616] item = 7 [130567.107616] item = 8 [130567.107617] item = 9 [130567.107617] item = 0 [130567.107617] item = 1 [130567.107617] item = 20 [130567.107618] item = 21 [130567.107618] item = 22 [130567.107618] item = 23 [130567.107618] item = 24 [130567.107619] item = 25 [130567.107619] item = 26 [130567.107619] item = 27 [130567.107619] item = 28 [130567.107619] item = 29 [130567.107620] item = 30 [130567.107620] item = 31 [130567.107620] item = 32 [130567.107620] item = 33 [130567.107621] item = 34 [130567.107621] item = 35 [130567.107621] item = 36 [130567.107621] item = 37 [130567.107622] item = 38 [130567.107622] item = 39 [130567.107622] item = 40 [130567.107622] item = 41 [130567.107623] item = 42 [130567.107623] test passed ``` - [ ] 應用案例: 生產者與消費者 > [producer-consumer.c](https://github.com/brianlin314/kfifo-examples/blob/master/producer-consumer.c) 設計一個 kfifo 的生產者與消費者實驗,包含一個 producer 與一個 consumer,producer 函式每 1 秒會將一個值放入 kfifo 中,並從 1 遞增到 10,而consumer 函式每 2 秒會消耗一個 kfifo 的值。 ```c static int producer(void *data) { unsigned char i; for (i = 1; i <= 10; i++) { kfifo_put(&test, i); pr_info("Producer inserted value: %d\n", i); msleep(1000); } kthread_stop(producer_thread); return 0; } static int consumer(void *data) { unsigned char j; unsigned char buf[10]; unsigned int ret; for (j = 1; j <= 5; j++) { ret = kfifo_out(&test, buf, 1); if (ret) { pr_info("Consumer removed value: %d\n", j); } else { pr_info("Consumer failed to remove value from kfifo\n"); } msleep(2000); } kthread_stop(consumer_thread); return 0; } ``` 在 example_init 中,使用 `kthread_run` 建立兩個 kernel thread,分別是 `producer_thread` 與 `consumer_thread`。 ```c producer_thread = kthread_run(producer, NULL, "producer_thread"); ... consumer_thread = kthread_run(consumer, NULL, "consumer_thread"); ``` 在 `example_exit` 中,會用 `kfifo_get` 逐一檢查 kfifo 剩餘的值是否與 expected_result 相同。 ```c static void __exit example_exit(void) { unsigned char i, j; /* check the correctness of all values in the fifo */ j = 0; while (kfifo_get(&test, &i)) { pr_info("kfifo item = %d\n", i); if (i != expected_result[j++]) { pr_warn("value mismatch: test failed\n"); goto error_EIO; } } pr_info("test passed\n"); kfifo_free(&test); pr_info("kfifo Example Exit\n"); error_EIO: kfifo_free(&test); } ``` ```shell $ make check $ sudo dmesg [96656.871161] kfifo Example Init [96656.871280] Producer inserted value: 1 [96656.871364] Consumer removed value: 1 [96657.890006] Producer inserted value: 2 [96658.882042] Consumer removed value: 2 [96658.914025] Producer inserted value: 3 [96659.937999] Producer inserted value: 4 [96660.897975] Consumer removed value: 3 [96660.961976] Producer inserted value: 5 [96661.985950] Producer inserted value: 6 [96662.917915] Consumer removed value: 4 [96663.009917] Producer inserted value: 7 [96664.033895] Producer inserted value: 8 [96664.929874] Consumer removed value: 5 [96665.057866] Producer inserted value: 9 [96666.081860] Producer inserted value: 10 [96801.846529] kfifo item = 6 [96801.846536] kfifo item = 7 [96801.846539] kfifo item = 8 [96801.846540] kfifo item = 9 [96801.846542] kfifo item = 10 [96801.846544] test passed [96801.846546] kfifo Example Exit ```