# 2024q1 Homework6 (integration) contributed by < `eleanorLYJ` > :::danger 依據指定格式書寫 ::: > [作業規範](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-a) ## 理解核心模組運作 ### 更新 GCC 版本時遇到的問題。 編譯核心模組時,`$ make -C /lib/modules/`uname -r`/build M=`pwd` modules` 出現出現以下訊息,表示我的編譯器版本與kernel開發套件所用的編譯器版本版本不一致。 ``` make: Entering directory '/usr/src/linux-headers-6.5.0-26-generic' warning: the compiler differs from the one used to build the kernel The kernel was built by: x86_64-linux-gnu-gcc-12 (Ubuntu 12.3.0-1ubuntu1~22.04) 12.3.0 You are using: gcc-12 (Ubuntu 12.3.0-1ubuntu1~22.04) 12.3.0 ``` 我主要跟著 stackoverflow 的 [how to install gcc-12 on ubuntu](https://stackoverflow.com/questions/70835585/how-to-install-gcc-12-on-ubuntu) 這篇的的方法 在執行 `$ sudo apt-get install libmpfrc++-dev` 的部分,會出現錯誤訊息表示被 dpkg 中斷。 ``` E: dpkg was interrupted, you must manually run 'sudo dpkg --configure -a' to correct the problem. ``` 接著我手動跑 `$ sudo dpkg --configure -a`,會出現以下的訊息,並且運行了超過 1 小時仍卡在此訊息。 ``` Setting up context (2021.03.05.20220211-1) ... Running mtxrun --generate. This may take some time... done. Pregenerating ConTeXt MarkIV format. This may take some time... ``` 為了解決這一部分,我尋找了許多方法,最終根據 askubuntu 上的 [Pregenerating ConTeXt MarkIV format. This may take some time... takes forever](https://askubuntu.com/questions/956006/pregenerating-context-markiv-format-this-may-take-some-time-takes-forever)此貼文的方法,刪除 texlive-full 等的封包,再次跑`$ sudo dpkg --configure -a` 就能順利繼續更新 GCC 版本。 ### insmod >閱讀 Linux Device Drivers 的 [chapter 2: Building and Running Modules](https://static.lwn.net/images/pdf/LDD3/ch02.pdf) insmod 功用是將核心模組的載入到 kernel中,並且模組的符號表(symbol table)會與的 kernel 中的符號表相接。 ![Pasted image 20240427133958](https://hackmd.io/_uploads/Hy0VoFxzR.png) ## MODULE_LiCENSE > 閱讀 LKMPG 的 4.4 Licensing and Module Documentation 與 [Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module) 能知道這部份的簡單結論,就是 MODULE_XXX 系列的巨集的參數會被合併成字串,之後把字串放到 .modinfo 的部分 `MODULE_LICENSE("Dual MIT/GPL")` 變成 ``"license = Dual MIT/GPL"`` <!-- Linux 核心模組的符號 (symbol) 如何被 Linux 核心找到 (使用 List API)、MODULE_LICENSE (GPL 與否對於可用的符號列表有關), --> <!-- ### strace 藉由 **strace 追蹤 Linux 核心的掛載**,涉及哪些些系統呼叫和子系統? --> ## CMWQ (Concurrency Managed Workqueue) > 閱讀 M06: integration中的 [CMWQ敘述](https://hackmd.io/@sysprog/linux2024-integration-c) + [CMWQ 文件](https://www.kernel.org/doc/html/latest/core-api/workqueue.html?highlight=workqueue) ### 五個基本認識名詞 1. Work Item 為待處理的任務或操作。能將work item 放入佇列,以便在適當的時候由work thread 處理。 2. Independent Thread 獨立執行緒是運行在其自己的執行上下文中的執行緒,它可以獨立於其他線程運行,並且可以同時處理多個任務。 3. Work Queue,wq 連接所有的 work item,形成一個佇列。這個佇列用於存儲待處理的任務。 4. Worker wq 有 work item 就執行與 work item 相關函式,若 wq 沒 worker item, worker 就 idle 5. Worker Pool 用於管理worker 執行緒,此外,worker pool 可以動態地調整 worker 的數量,以應對不同的負載情況。 ### work thread 如何與 CPU 排程器互動 [CMWQ 文件](https://www.kernel.org/doc/html/latest/core-api/workqueue.html?highlight=workqueue) >Each worker-pool bound to an actual CPU implements concurrency management by hooking into the scheduler. As long as there are one or more runnable workers on the CPU, the worker-pool doesn’t start execution of a new work, but, when the last running worker goes to sleep, it immediately schedules a new worker so that the CPU doesn’t sit idle while there are pending work items. 在 LKMPG 14.2 Work queues 提到 > Work queues To add a task to the scheduler we can use a workqueue. The kernel then uses the **Completely Fair Scheduler (CFS)** to execute work within the queue. 接著閱讀 Demystifying the Linux CPU Scheduler 的 2.3 Completely Fair Scheduler (CFS) > TODO 補充筆記 ## 解釋 ksort 如何運用 CMWQ 達到並行的排序 ### ksort 首先從作業說明得知,ksort 的設計。 > ksort 設計為一個 character device,可理解是個能夠循序存取檔案,透過定義相關的函式,可利用存取檔案的系統呼叫以存取 (即 open, read, write, mmap 等等)。因此,使用者層級 (user-level 或 userspace) 的程式可透過 read 系統呼叫來得到輸出。 ### CMWQ 的 API 使用CMWQ 中幾個重要 API,能夠創造出一個工作佇列,能夠自己管理工作任務與 CPU 資源 - `alloc_workqueue()` : 創建 workqueue - `INIT_WORK()` : 建立一個工作結構變數 - `queue_work()` : 調度工作,若工作結構已經在佇列中返回 False, 否則加到與某 CPU <!--對應的工作佇列-->中,倘若 CPU 死掉了,會被其他 CPU 接手執行;`schedule_work` : 會把工作放到全域的工作佇列 (global workqueue) - `drain_workqueue()`: 清空工作佇列; `destroy_workqueue()`釋放工作佇列相關的資源。 接著細看 ksort 的程式碼,其中除了 CMWQ 的部分,也理解了 character device 的實作。 #### ksort/sort_mod.c 中的 module_init (sort_init) <!-- > 閱讀 [Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module) --> 要載入核心模組,會使用 insmod 這命令。 而 insmod 命令內部會呼叫 init_module,而這 module_init 就可以呼叫自己寫的函式(這裡為 `sort_init`)。 ```c module_init(sort_init); ``` 在 sort_init 中,我想要得知關於排序呼叫的地方與 CMWQ 如何被使用。 在 sort_init 中能看到初始化 character device,並註冊了這 character device 要使用的函式,另外中創建一個 workqueue。 ```c static int __init sort_init(void) { cdev_init(&cdev, &fops); if (cdev_add(&cdev, dev, 1) < 0) goto error_device_destroy; workqueue = alloc_workqueue("sortq", 0, WQ_MAX_ACTIVE); if (!workqueue) goto error_cdev_del; return 0; // ... } ``` > 閱讀 [Character device drivers](https://linux-kernel-labs.github.io/refs/heads/master/labs/device_drivers.html) 理解 charater device charater device 是由 struct cdev 表示 實作charater device,代表是對特定的檔案做system call > implementation of a character device driver means implementing the system calls specific to files: open, close, read, write, lseek, mmap, etc. These operations are described in the fields of the struct file_operations structure ```c static const struct file_operations fops = { .read = sort_read, .write = sort_write, .open = sort_open, .release = sort_release, .owner = THIS_MODULE, }; ``` 以上,除了 sort_read 其他函式目前都沒有實作。 ```c static ssize_t sort_read(struct file *file, char *buf, size_t size, loff_t *offset) { void *sort_buffer = kmalloc(size, GFP_KERNEL); // ... len = copy_from_user(sort_buffer, buf, size); // ... sort_main(sort_buffer, size / es, es, num_compare); len = copy_to_user(buf, sort_buffer, size); // ... kfree(sort_buffer); return size; } ``` 接著看 實作在 `sort_impl.c` 的 `sort_main()` ```c void sort_main(void *sort_buffer, size_t size, size_t es, cmp_t cmp) { struct qsort *q = kmalloc(sizeof(struct qsort), GFP_KERNEL); struct common common = { .swaptype = ((char *) sort_buffer - (char *) 0) % sizeof(long) || es % sizeof(long) ? 2 : es == sizeof(long) ? 0 : 1, .es = es, .cmp = cmp, }; init_qsort(q, sort_buffer, size, &common); queue_work(workqueue, &q->w); drain_workqueue(workqueue); } ``` 接著看 `init_qsort` 的實作,並且疑惑 `qsort_algo()` 的參數為甚麼是 `work_struct` 而不是 `qsort *`,這些答案能從追 INIT_WORK 此函式得到答案 ```c static void qsort_algo(struct work_struct *w); static void init_qsort(struct qsort *q, void *elems, size_t size, struct common *common) { INIT_WORK(&q->w, qsort_algo); q->a = elems; q->n = size; q->common = common; } ``` INIT_WORK 被實作在 workqueue.h 中 ```c #define INIT_WORK(_work, _func) \ __INIT_WORK((_work), (_func), 0) ``` <!-- 不知道 `lock_class_key __key` 是甚麼 --> ```c #define __INIT_WORK(_work, _func, _onstack) \ do { \ static __maybe_unused struct lock_class_key __key; \ \ __INIT_WORK_KEY(_work, _func, _onstack, &__key); \ } while (0) ``` 在`__INIT_WORK_KEY` 中設定工作結構的相關屬性後,初始化工作結構中的鏈結串列的頭,這用於將工作結構插入到工作佇列中。最後設置工作結構中的 func 指針,指向要執行的工作函式`func`。 ```c #define __INIT_WORK_KEY(_work, _func, _onstack, _key) \ do { \ __init_work((_work), _onstack); \ (_work)->data = (atomic_long_t) WORK_DATA_INIT(); \ INIT_LIST_HEAD(&(_work)->entry); \ (_work)->func = (_func); \ } while (0) #endif ``` ## simrupt - [ ] 解釋 simrupt 程式碼裡頭的 **mutex lock 的使用方式** - [ ] 探討能否改寫為 lock-free - [ ] simrupt 中 使用 **CMWQ** 情況 留意到 1. worker-pools 類型可指定 "Bound" 來分配 2. 限制特定 worker 執行於指定的 CPU,Linux 核心如何做到? 3. CMWQ 關聯的 worker thread 又如何與 CPU 排程器互動? ### simrupt 流程圖 (圖中的所有函數,我都省略 simrupt 前娺) ```graphviz digraph _graph_name_ { # 設定 (其實沒有順序,這整個文檔每行亂調輸出都一樣) rankdir=TD; # 順序由左至右(上下是"TD") graph [fontname="DFKai-SB"]; # 此三行是設定字型 node [fontname="DFKai-SB"]; # 中文務必指定 node [shape = box] edge [fontname="DFKai-SB"]; # 不然可能會出現亂碼 // subgraph work { // label = "關於work 與 tasklet"; // item [label="{DECLEAR_WORL|simrupt_task_func}"]; // } # 這邊是 node (節點) simrupt_init[label = "init"] char_device[label = "設定\nchar device"] wq[label = "建立\nwork queue"] timer[label = "設定\ntimer handler"] process[label = "process_data"] // DECLAR_WORK[label = "DECLAR_WORK"] read[label = "read"] wq_handler[label="workqueue_handler"] # 這邊是 edge (邊) simrupt_init -> char_device simrupt_init -> wq # 可以一直連 simrupt_init -> timer timer -> process // process -> tasklet // DECLARE_TASKLET_OLD->tasklet char_device -> read wq_handler ->fast_buf_get wq_handler-> produce_data process->tasklet_schedule ->tasklet_handler->queue_work process->fast_buf_put } ``` ### 值的流動 兩個會受到 mutex_lock 保護的資料結構 第一個 kfifo rx_fifo 第二 為 struct circ_buf fast_buf,從註解得知 circ_buf 是一個能快速儲存的結構。 在 simrupt 將 中斷內容存到 rx_fifo 之前 會先存到 fast_buf 中。 ```c /* We use an additional "faster" circular buffer to quickly store data from * interrupt context, before adding them to the kfifo. */ static struct circ_buf fast_buf; ``` 整理 simrupt 值的流動過程 - 在 `process_data` 的 `fast_bug_put` 存一個 `update_simrupt_data()` 產生的值進 fast_buf 中 - 在`fast_buf_get()` 中取得 fast_buf 當前tail 的值 - 在`produce_data()` 存值進 rx_fifo 中 ### 細看程式 ```c static const struct file_operations simrupt_fops = { .read = simrupt_read, // ... } static int __init simrupt_init(void) { // ... /* Register major/minor numbers */ ret = alloc_chrdev_region(&dev_id, 0, NR_SIMRUPT, DEV_NAME); /* Add the character device to the system */ cdev_init(&simrupt_cdev, &simrupt_fops); ret = cdev_add(&simrupt_cdev, dev_id, NR_SIMRUPT); // ... /* Allocate fast circular buffer */ fast_buf.buf = vmalloc(PAGE_SIZE); //... /* Create the workqueue */ simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE); // ... /* Setup the timer */ timer_setup(&timer, timer_handler, 0); ``` #### `timer_handler()` 中呼叫的 `process_data()` `fast_bug_get()` 存 `update_simrupt_data()`產生的值進 fast_buf 開頭的位置。 之後 `tasklet_schedule()` 調度一次 simrupt_tasklet ```c static void process_data(void) { // ... fast_buf_put(update_simrupt_data()); // ... tasklet_schedule(&simrupt_tasklet); } ``` 在 tasklet handler 中將工作提交到工作佇列中。 ```c /*Tasklet handler.*/ static void simrupt_tasklet_func(unsigned long __data) { // ... tv_start = ktime_get(); queue_work(simrupt_workqueue, &work); tv_end = ktime_get(); // ... } ``` 回頭看工作任務 ```c static DECLARE_WORK(work, simrupt_work_func); ``` 在 simrupt_work_func 看 mutex_lock 被使用。 consumer_lock 保護 fast_buf_get() 這 critical section,確保只有一個執行緒存取 fast_buf。 producer_lock 保護 produce_data(val) 這 critical section,確保只有一個執行緒能存取 rx_fifo。 ```c /*workqueue handler*/ static void simrupt_work_func(struct work_struct *w) { // ... while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); // ... /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } // ... } ``` #### simrupt_read 中有使用到 read_lock - [mutex_lock_interruptible](https://manpages.org/mutex_lock_interruptible/9) 像 `mutex_lock` 一樣鎖定互斥鎖。如果已取得互斥鎖則回傳0,或休眠直到互斥鎖變得可用。 如果在等待鎖定時有訊號到達,則函數傳回 -EINTR。 ```c int __sched mutex_lock_interruptible(struct mutex * lock); ``` - [mutex_unlock](https://manpages.org/mutex_unlock/9): 解開互斥鎖 ```c void __sched mutex_unlock(struct mutex * lock); ``` 在`simrupt_read()` 中,如果沒有得到 read_lock ,就會得到 read_lock,之後進入迴圈中並執行以下兩個函式 - `kfifo_to_user`: 作用於 從 kfifo 複製 buf 大小到 user space中。 - `wait_event_interruptible(wq, condition);`: 以下情境,在 kfifo_len(&rx_fifo) > 0 之前,rx_wait 要等待。此函數如果被中斷會回傳 -ERESTARTSYS,如果 condition 成立就會回傳 0。 跳出迴圈解開 read_lock,這代表著 rx_fifo 中至少有一值能讀。 ```c static ssize_t simrupt_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) { // ... if (mutex_lock_interruptible(&read_lock)) return -ERESTARTSYS; do { ret = kfifo_to_user(&rx_fifo, buf, count, &read); // ... ret = wait_event_interruptible(rx_wait, kfifo_len(&rx_fifo)); } while (ret == 0); // .,. mutex_unlock(&read_lock); return ret ? ret : read; } ``` ### 討論是否可以改成 lock-free