# 2024q1 Homework6 (integration) contributed by < `p96114175` > - [ ] 研讀 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) (Concurrency Managed Workqueue) 文件,對照 [simrupt](https://github.com/sysprog21/simrupt) 專案的執行表現,留意到 worker-pools 類型可指定 "Bound" 來分配及限制特定 worker 執行於指定的 CPU,Linux 核心如何做到?CMWQ 關聯的 worker thread 又如何與 CPU 排程器互動? ## 研讀 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 在非同步執行場景,一個 work item 指向要執行的函數,並被放在 queue 中,而一個 thread 通常用於以非同步方式執行 work item。在這 queue 被稱為 workqueue 而 thread 被稱為 worker。當 workqueue 為空, worker 變成 idle。一旦新的 work item 插入 queue,則 worker 再次開始執行。 接下來解釋一下為何要實作 Concurrency Managed Workqueue,可分成以下小標題來解釋原先 workqueue 實作存在的問題 **PID 資源** 原先的 multi threaded (MT) workqueue 中,每一個 CPU 有一個 worker thread,MT workqueue 的 worker 數量和 CPU 數量是一致的。而在 single threaded (ST) workqueue 中,整個系統只有一個 worker thread 來完成所有的 work item。多年來因為 CPU core 數目上升,這也導致一些系統在啟動時就耗盡了預設的 PID 空間。 **concurrency 不足** * Multi threaded workqueue 中每個 CPU 只有一個 worker 來提供 context。而 single threaded workqueue 只有一個 worker 來提供整個系統所需的 context,造就每一個 work item 為 sequential 執行。但前述的設計中 concurrency 是不夠的。 Concurrency Managed Workqueue (cmwq) 會專注於以下目標 * 和原先 workqueue API 保有相容性 * 全部的 workqueue 共享每個 CPU 的 worker pools ,並提供更具彈性的 concurrency 等級,以此降低資源浪費。 * 系統內部會處理好 worker pool 和 concurrency,使用者可視為黑盒子。 CMWQ 架構圖:參考自 [integration(C)](https://hackmd.io/@sysprog/linux2024-integration-c) ![image](https://hackmd.io/_uploads/SyUD0oI-C.png) ### 關於限制特定 worker 執行於指定的 CPU,可在 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 的 Affinity Scopes 中觀察到 **Affinity Scopes** 一個 unbound workqueue 會根據其 Affinity Scopes 將 CPU 分組,來改善 cache locality。打個比方,如果一個 workqueue 使用默認的親和性範圍 “cache”,它將根據最後一級 cache 的邊界來對 CPU 分組。在 workqueue 上的 work item 將會被分配給其中一個 CPUs 的一個 worker,並根據 scope 的 affinity_strict 設定,該 worker 可能允許或不允許在範圍之外移動。 **workqueue_attrs** 下方是一個 workqueue attributes 的結構體定義,在 `cpumask` 指示了 workqueue 中的 work items 對哪些 CPU 是親和性的,執行於指定的 CPU,並且不允許執行在其他 CPU 上 ```c struct workqueue_attrs { int nice; cpumask_var_t cpumask; cpumask_var_t __pod_cpumask; bool affn_strict; enum wq_affn_scope affn_scope; bool ordered; }; ``` ## [simrupt](https://github.com/sysprog21/simrupt) 這裡記錄我對 simrupt 的觀察,後續再會針對第三次作業提及的人工智慧程式碼,進行整合。 在 [simrupt.c](https://github.com/sysprog21/simrupt/blob/main/simrupt.c) 中,採用 `alloc_workqueue` 來建立 workqueue,並將 `@flags` 設定為 `WQ_UNBOUND` ,根據 [CMWQ(Concurrency Managed Workqueue)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 的解釋,host workers 不綁定至任意一個 CPU,workqueue 會表現得更像一個執行上下文的提供者並且沒有並發管理。 > Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPU. This makes the wq behave as a simple execution context provider without concurrency management. The unbound worker-pools try to start execution of work items as soon as possible. Unbound workqueue 犧牲了局部性,但這對以下情況很有用 * 預計 concurrency 等級要求有大幅度波動 * 長時間運行的 CPU 密集型工作負載可以由系統排程器更好地管理。 在這`@max_active` 設定為 WQ_MAX_ACTIVE ,它決定了在每一個 CPU 的同一時間上,能夠執行最多的 work item 是多少個。 ```c simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE); if (!simrupt_workqueue) { vfree(fast_buf.buf); device_destroy(simrupt_class, dev_id); class_destroy(simrupt_class); ret = -ENOMEM; goto error_cdev; } ``` **simrupt_work_func** 1. simrupt_work_func 由 kernel thread 所執行,他會去獲取每一個 cpu data,接下來他會透過 fast_buf_get() 來消耗 circular buffer 的資料並取得 val 變數 2. 再使用 produce_data(val) 將資料儲存至 kfifo buffer。 3. 呼叫 wake_up_interruptible() 函式,喚醒在 wait queue 睡著的 processes ,通知他們 kfifo buffer 已經有新數據了。 * 我們可以注意到執行 fast_buf_get() 和 produce_data() 時,都是在 mutex 情況下執行,以此來避免 race condition。 * 在這 rx_wait 則是作為 wait queue 用於實現 userspace 的 blocking I/O。 ```c static void simrupt_work_func(struct work_struct *w) { ... while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); if (val < 0) break; /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } wake_up_interruptible(&rx_wait); } ``` **fast_buf_get** `fast_buf_get` 為 consumer,它藉由提取 index 為 tail 的 item,來獲取 val 變數 ```c static int fast_buf_get(void) { ... /* extract item from the buffer */ ret = ring->buf[tail]; ... } ``` **produce_data** 在 produce_data() 採用了 FIFO 方式將 value 插入至 kfifo buffer ```c static void produce_data(unsigned char val) { /* Implement a kind of circular FIFO here (skip oldest element if kfifo * buffer is full). */ unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val)); if (unlikely(len < sizeof(val)) && printk_ratelimit()) pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len); pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len, kfifo_len(&rx_fifo)); } ``` 目前 tic-tac-toe AI algorithms 皆採用 negamax 來實現,並設計出相對應的 work item,像是 **PrintBoard_work_func**, **AI1_work_func**, **AI2_work_func**,這些函式可以以非同步方式被執行,詳情參見 [commit 36d6dfc](https://github.com/sysprog21/simrupt/commit/36d6dfc2d0a86faf9539211759797cad9676853a)。 ```c /* Work item: holds a pointer to the function that is going to be executed asynchronously. */ static DECLARE_WORK(PrintBoard_work, PrintBoard_work_func); static DECLARE_WORK(AI1_work, AI1_work_func); static DECLARE_WORK(AI2_work, AI2_work_func); ``` :::info 但是在我設計完以後,想將 kernel object 插入至 kernel 時,顯示 killed,目前還在調查原因。 ::: **不同 CPU 上執行 AI 演算法** 在閱讀 [Linux 核心專題: simrupt 研究和應用](https://hackmd.io/@sysprog/HJXlHtlB2)時,它提到 `一個 tasklet 只會在調度他的 CPU 上執行` ,因此當 `tasklet_schedule` 被呼叫時,此 tasklet 會被特定 CPU 所執行。如果希望不同 AI 演算法在不同 CPU 上所執行,便需要為每一個 AI 演算法設置一個 Tasklet。 ```c /* Tasklet handler. * * NOTE: different tasklets can run concurrently on different processors, but * two of the same type of tasklet cannot run simultaneously. Moreover, a * tasklet always runs on the same CPU that schedules it. */ static void simrupt_tasklet_func(unsigned long __data) { ktime_t tv_start, tv_end; s64 nsecs; WARN_ON_ONCE(!in_interrupt()); WARN_ON_ONCE(!in_softirq()); tv_start = ktime_get(); queue_work(simrupt_workqueue, &work); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); } /* Tasklet for asynchronous bottom-half processing in softirq context */ static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func); ```