# 2024q1 Homework6 (integration) contributed by < [jeremy90307](https://github.com/jeremy90307) > ## 閱讀 [The Linux Kernel Module Programming Guide](https://sysprog21.github.io/lkmpg/#headers) 花了些許時間閱讀,把相關問題及重點紀錄在這 ----> [The Linux Kernel Module Programming Guide 筆記](https://hackmd.io/@jeremytsai/HW6_note) ### 解釋 [simrupt](https://github.com/sysprog21/simrupt) 程式碼裡頭的 mutex lock 的使用方式,並探討能否改寫為 [lock-free](https://hackmd.io/@sysprog/concurrency-lockfree) > 參考 : [2021 年的筆記](https://hackmd.io/@linD026/simrupt-vwifi) ### kfifo **解釋** > 參考 [kfifo(linux kernel 无锁队列)](https://zhuanlan.zhihu.com/p/500070147) > kfifo 使用 in 和 out 兩個變量作為進入和離開的索引,如圖 ![image](https://hackmd.io/_uploads/HyvH6mT-0.png) - 進入 n 筆資料時, in 變量就 `+ n` - 離開 n 筆資料時, out 變量就 `- n` - out 不允許大於 in (out = in 表示 fifo 為空) - in 不允許比 out 大(即超出 fifo 的總大小,會有液出) 在 simrupt.c 的註釋中解釋為何無須再使用額外的鎖 ``` /* NOTE: the usage of kfifo is safe (no need for extra locking), until there is * only one concurrent reader and one concurrent writer. Writes are serialized * from the interrupt context, readers are serialized using this mutex. */ ``` 使用到的 kfifo API `DECLARE_KFIFO_PTR` — macro to declare a fifo pointer object `kfifo_in` - put data into the fifo `kfifo_to_user` — copies data from the fifo into user space `kfifo_len` — returns the number of used elements in the fifo `kfifo_alloc` — dynamically allocates a new fifo buffer `kfifo_free` — frees the fifo > 更多 FIFO 的 API 參考 -> [kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html) 在 [/include/linux/kfifo.h](https://elixir.bootlin.com/linux/latest/source/include/linux/kfifo.h) 可以看到 **kfifo 的結構體** ```c struct __kfifo { unsigned int in; // 進入索引 unsigned int out; // 離開索引 unsigned int mask; // 為佇列總大小 - 1 unsigned int esize; // 單個元素大小 void *data; // 指向佇列記憶體位置 }; #define __STRUCT_KFIFO_COMMON(datatype, recsize, ptrtype) \ union { \ struct __kfifo kfifo; \ datatype *type; \ const datatype *const_type; \ char (*rectype)[recsize]; \ ptrtype *ptr; \ ptrtype const *ptr_const; \ } ``` - `kfifo_in` ```c unsigned int __kfifo_in(struct __kfifo *fifo, const void *buf, unsigned int len) { unsigned int l; l = kfifo_unused(fifo); if (len > l) len = l; kfifo_copy_in(fifo, buf, len, fifo->in); fifo->in += len; return len; } ``` `kfifo_unused(fifo)` 用作查詢剩餘大小,若進入的大小大於剩餘大小,則以剩餘的大小為主 `len = l` ,最後 `kfifo_copy_in` 複製資料至記憶體。 ```c /* * internal helper to calculate the unused elements in a fifo */ static inline unsigned int kfifo_unused(struct __kfifo *fifo) { return (fifo->mask + 1) - (fifo->in - fifo->out); } ``` `fifo->mask + 1` 為整個佇列的大小減去已用大小 `fifo->in - fifo->out` ```c static void kfifo_copy_in(struct __kfifo *fifo, const void *src, unsigned int len, unsigned int off) { unsigned int size = fifo->mask + 1; unsigned int esize = fifo->esize; unsigned int l; off &= fifo->mask; if (esize != 1) { off *= esize; size *= esize; len *= esize; } l = min(len, size - off); memcpy(fifo->data + off, src, l); memcpy(fifo->data, src + l, len - l); /* * make sure that the data in the fifo is up to date before * incrementing the fifo->in index counter */ smp_wmb(); } ``` :::info 為何使用兩次 `memcpy` ? ::: - `kfifo_alloc` 申請記憶體空間並初始化 kfifo ```c int __kfifo_alloc(struct __kfifo *fifo, unsigned int size, size_t esize, gfp_t gfp_mask) { /* * round up to the next power of 2, since our 'let the indices * wrap' technique works only in this case. */ size = roundup_pow_of_two(size); fifo->in = 0; fifo->out = 0; fifo->esize = esize; if (size < 2) { fifo->data = NULL; fifo->mask = 0; return -EINVAL; } fifo->data = kmalloc_array(esize, size, gfp_mask); if (!fifo->data) { fifo->mask = 0; return -ENOMEM; } fifo->mask = size - 1; return 0; } ``` 當中 `roundup_pow_of_two` 可以求出 size 向上最接近的 2 的冪次 程式碼取自 [/include/linux/log2.h](https://elixir.bootlin.com/linux/latest/source/include/linux/log2.h#L174) ```c /** * roundup_pow_of_two - round the given value up to nearest power of two * @n: parameter * * round the given value up to the nearest power of two * - the result is undefined when n == 0 * - this can be used to initialise global variables from constant data */ #define roundup_pow_of_two(n) \ ( \ __builtin_constant_p(n) ? ( \ ((n) == 1) ? 1 : \ (1UL << (ilog2((n) - 1) + 1)) \ ) : \ __roundup_pow_of_two(n) \ ) ``` 當中 `__builtin_constant_p(n)` 函式中的 n 若為常數回傳 1 ,反之為 0 。 > 參考 [Other Built-in Functions Provided by GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html) ### circular buffer > 參考 > -> [並行程式設計: Ring buffer](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-ringbuffer#Ring-Buffer) > -> [circular buffer](https://zh.wikipedia.org/zh-tw/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80) 先看註釋描述,看似要做出一個更快速的 circular buffer ,並結合 kfifo 。 :::info 更快速? ::: ``` /* We use an additional "faster" circular buffer to quickly store data from * interrupt context, before adding them to the kfifo. */ ``` 查找 [/include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h) 來了解 simrupt 中 circular buffer 使用到的 API **結構體** - `buf` : real pointer that contains the array - `head`,`tail` : head and tail indices ```c struct circ_buf { char *buf; int head; int tail; }; ``` `CIRC_CNT` 回傳 buffer 被佔用的大小 > 緩衝區中總是有一個儲存單元保持未使用狀態。因此緩衝區最多存入 `size - 1` 。 ```c /* Return count in buffer. */ #define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1)) ``` `CIRC_SPACE` 回傳緩衝區剩餘大小 ```c /* Return space available, 0..size-1. We always leave one free char as a completely full buffer has head == tail, which is the same as empty. */ #define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size)) ``` - `fast_buf_clear` head = tail = 0 表示均指向同個位置,因此 buffer 為空。 ```c /* Clear all data from the circular buffer fast_buf */ static void fast_buf_clear(void) { fast_buf.head = fast_buf.tail = 0; } ``` - `fast_buf_get` - `fast_buf_put` ### Bottom half > 中斷概念參考 [Linux 核心設計: 中斷處理和現代架構考量](https://hackmd.io/@sysprog/linux-interrupt#Linux-%E6%A0%B8%E5%BF%83%E8%A8%AD%E8%A8%88-%E4%B8%AD%E6%96%B7%E8%99%95%E7%90%86%E5%92%8C%E7%8F%BE%E4%BB%A3%E6%9E%B6%E6%A7%8B%E8%80%83%E9%87%8F) 為防止 interrupt 產生打架的情況,分成三種類型: - Critical : Top-half (不允許其他中斷,立即處理) ex : acknowledge interrupt - Non-critical : Top-half (允許其他中斷) ex : read key scan code, add to buffer - Non-cirtical deferrable : Bottom half, do it **later** (允許其他中斷) 可以被延遲中斷,意味著可以重新被排程 ex : copy keyboard buffer to terminal handler process 在 Linux 作業系統中,使用了三種機制去實現 Bottom Half - softirq - tasklet - workqueue softirqs 和 tasklet 執行於 interrupt context , workqueue 執行於 process context 。 > 相關問題: > [Difference between interrupt context and process context?](https://stackoverflow.com/questions/57987140/difference-between-interrupt-context-and-process-context) ![image](https://hackmd.io/_uploads/rJWUMvXMA.png) #### softirq - 是 reentrant function,同一種 IRQ 可以在多個 CPU 上並行執行。 - 使用靜態建立 - kernel thread #### tasklet - 建構在 softirq 之上,tasklet 相較 softirq 允許動態建立。 - 同一時間只執行在一個 CPU 上,且不為 reentrant function 。 - kernel thread 在 simrupt 中 定義 tasklet ```c #define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L) ``` 加入 tasklet ```c tasklet_schedule(&simrupt_tasklet); ``` 刪除 tasklet ``` tasklet_kill(&simrupt_tasklet); ``` #### Workqueue - 概念為對中斷的處理建立每個 CPU 的 kernel thread 在 simrupt 中建立 workqueue ```c static struct workqueue_struct *simrupt_workqueue; ``` 當中 `WQ_UNBOUND` 表示不綁定任何 CPU > 更多 Workqueue flags 參考 [/include/linux/workqueue.h](https://elixir.bootlin.com/linux/latest/source/include/linux/workqueue.h#L351) ```c /* Create the workqueue */ simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE); if (!simrupt_workqueue) { vfree(fast_buf.buf); device_destroy(simrupt_class, dev_id); class_destroy(simrupt_class); ret = -ENOMEM; goto error_cdev; } ``` 執行指定的 workqueue 任務,當中 `&work` 指向具體任務的指針。 ```c queue_work(simrupt_workqueue, &work); ``` 釋放 workqueue ```c destroy_workqueue(simrupt_workqueue); ``` 當 producer 將數據儲存至 kfifo buffer 及 consumer 將數據從 circular buffer 取出時使用互斥鎖。 ### Timer 使用 `timer_handler` 函式來模擬中斷的發生,在函式中 `local_irq_disable` 用於停用中斷,來避免中斷時又遇到中斷的發生,接著將延遲中斷的部份使用 `process_data` 將字元放入 circular buffer ,並啟用 tasklet 來處理中斷,最後使用 `local_irq_enable` 函式再次啟用中斷。 ### mutex lock 在 simrupt 中定義的鎖 ```c /* NOTE: the usage of kfifo is safe (no need for extra locking), until there is * only one concurrent reader and one concurrent writer. Writes are serialized * from the interrupt context, readers are serialized using this mutex. */ static DEFINE_MUTEX(read_lock); ``` 註釋指出,在特定條件下,即只有一個並行 reader 和一個並行 writer 時,對 kfifo 的使用是安全的,無需額外的鎖定機制。 考慮到並行,因寫入操作是由 interrupt context 進行(不可被搶佔、sleep),因此只有一個 writer 可以存取 kfifo,但讀取操作就必須使用 `DEFINE_MUTEX(read_lock)` 來限制只能有一位讀者存取 kfifo ,總結最後確保 kfifo 的並行是安全的,在同一時間只有一個 writer 及 reader 可以存取 kfifo 。 ```c /* Mutex to serialize kfifo writers within the workqueue handler */ static DEFINE_MUTEX(producer_lock); /* Mutex to serialize fast_buf consumers: we can use a mutex because consumers * run in workqueue handler (kernel thread context). */ static DEFINE_MUTEX(consumer_lock); ``` 藉由 kernel thread 取得 circular buffer 的資料,使用 mutex 來避免執行緒之間在取得資料時發生問題 ```c /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); ``` :::info **疑問** 前面的註釋提到寫入 kfifo 的操作在 interrupt context 已確保同時只能有一個寫著存取,為何這裡還要在上鎖? ::: ```c /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); ``` 當 circular buffer 為空時 `if (val < 0)` 喚醒等待中的佇列 ```c wake_up_interruptible(&rx_wait); ``` ```c /* Wait queue to implement blocking I/O from userspace */ static DECLARE_WAIT_QUEUE_HEAD(rx_wait); ``` ### 流程圖 > 參考[江冠霆同學](https://hackmd.io/@Henry0922/linux2024-homework6#simrupt)的作業時發現他整理了一個流程圖,可以加速認識 simrupt 的運作原理,後續回頭複習時能更快了解。 ### lock-free [Linux 核心設計:淺談同步機制](https://www.youtube.com/watch?v=lNt-ewfygy8) - coarse-grained vs fine-grained 用粗粒(coarse-grained)細粒(fine-grained)來舉 lock 的優劣 - Atomic instruction `x++`,`x--` -> 確保不會被 CPU 交錯執行 - Compare and swap (CAS) * 語意: `if (x == y) x = z;` ```C int compareAndSwap(int *loc, int expectedValue, int valueToStore) { ATOMIC { int val = *loc; if (val == expected) { *loc = valueToStore; } return val; } } ``` 若傳入為符合預期的值進行 swap ,簡單的程式碼,重點在於 `ATOMIC` **並非無鎖的操作就是 `lock-free`** - **一個 lock-free 的程式需要確保符合這個特徵:執行該程式的所有執行緒至少有一個能夠不被阻擋 (blocked) 而繼續執行** * lock-Free 中的 "Lock" 沒有直接涉及 mutex 和 semaphore 一類 mutual exclusion,而是描述了應用程式受限於特定原因被鎖定的可能,例如可能因為 dead lock, live lock,或者 thread scheduling 致使的 priority inversion 等等 * 下方的程式碼沒有任何 mutex,但卻不符合 lock-free 的屬性:若有兩個執行緒同時執行程式碼,考慮到特定的排程方式,非常可能兩個執行緒同時陷入無窮迴圈,也就是阻擋彼此 * Lock-Free 程式設計的挑戰不僅來自於其任務本身的複雜性,還要始終著眼於對事物本質的洞察 ```c while (X == 0 ) { X = 1 - X; } ``` **討論能否改成 lock-free** 下方程式碼使用 mutex 確保從 circular buffer 取得的值是一致的 ```c mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); ``` 在 `fast_buf_get` 中或許可以使用 atomic 的 CAS 操作來改寫,但具體怎麼改寫還在思考。 ## 閱讀 [Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module) - [ ] 閱讀〈Linux 核心模組運作原理〉並對照 Linux 核心原始程式碼 (v6.1+),解釋 insmod 後,Linux 核心模組的符號 (symbol) 如何被 Linux 核心找到 (使用 List API)、MODULE_LICENSE 巨集指定的授權條款又對核心有什麼影響 (GPL 與否對於可用的符號列表有關),以及藉由 strace 追蹤 Linux 核心的掛載,涉及哪些系統呼叫和子系統? 因目前設備使用的 kernel 核心版本為 v6.5.0,為了後續實作查看 Linux kernel v6.5.0 的源程式碼 使用 strace 追蹤執行 insmod hello.ko 的過程有哪些系統呼叫被執行: ``` $sudo strace insmod hello.ko ``` 當中的 `finit_module` 在 [kernel/module/main.c](https://elixir.bootlin.com/linux/v6.5/source/kernel/module/main.c) 中可以找到,接著呼叫 `idempotent_init_module` -> `init_module_from_file` -> `load_module`,最後呼叫的 `load_module` 為 Linux 核心配置記憶體及載入模組。 接著查看到 `load_module` 的程式碼,-> `simplify_symbols` -> `resolve_symbol_wait ` -> `resolve_symbol` -> `find_symbol` -> `list_for_each_entry_rcu` `list_for_each_entry_rcu` 是一個 RCU 的 List API ```c /** * list_for_each_entry_srcu - iterate over rcu list of given type * @pos: the type * to use as a loop cursor. * @head: the head for your list. * @member: the name of the list_head within the struct. * @cond: lockdep expression for the lock required to traverse the list. * * This list-traversal primitive may safely run concurrently with * the _rcu list-mutation primitives such as list_add_rcu() * as long as the traversal is guarded by srcu_read_lock(). * The lockdep expression srcu_read_lock_held() can be passed as the * cond argument from read side. */ #define list_for_each_entry_srcu(pos, head, member, cond) \ for (__list_check_srcu(cond), \ pos = list_entry_rcu((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = list_entry_rcu(pos->member.next, typeof(*pos), member)) ```