# 2024q1 Homework6 (integration)
contributed by < `p96114175` >
- [ ] 研讀 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) (Concurrency Managed Workqueue) 文件,對照 [simrupt](https://github.com/sysprog21/simrupt) 專案的執行表現,留意到 worker-pools 類型可指定 "Bound" 來分配及限制特定 worker 執行於指定的 CPU,Linux 核心如何做到?CMWQ 關聯的 worker thread 又如何與 CPU 排程器互動?
## 研讀 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html)
在非同步執行場景,一個 work item 指向要執行的函數,並被放在 queue 中,而一個 thread 通常用於以非同步方式執行 work item。在這 queue 被稱為 workqueue 而 thread 被稱為 worker。當 workqueue 為空, worker 變成 idle。一旦新的 work item 插入 queue,則 worker 再次開始執行。
接下來解釋一下為何要實作 Concurrency Managed Workqueue,可分成以下小標題來解釋原先 workqueue 實作存在的問題
**PID 資源**
原先的 multi threaded (MT) workqueue 中,每一個 CPU 有一個 worker thread,MT workqueue 的 worker 數量和 CPU 數量是一致的。而在 single threaded (ST) workqueue 中,整個系統只有一個 worker thread 來完成所有的 work item。多年來因為 CPU core 數目上升,這也導致一些系統在啟動時就耗盡了預設的 PID 空間。
**concurrency 不足**
* Multi threaded workqueue 中每個 CPU 只有一個 worker 來提供 context。而 single threaded workqueue 只有一個 worker 來提供整個系統所需的 context,造就每一個 work item 為 sequential 執行。但前述的設計中 concurrency 是不夠的。
Concurrency Managed Workqueue (cmwq) 會專注於以下目標
* 和原先 workqueue API 保有相容性
* 全部的 workqueue 共享每個 CPU 的 worker pools ,並提供更具彈性的 concurrency 等級,以此降低資源浪費。
* 系統內部會處理好 worker pool 和 concurrency,使用者可視為黑盒子。
CMWQ 架構圖:參考自 [integration(C)](https://hackmd.io/@sysprog/linux2024-integration-c)

### 關於限制特定 worker 執行於指定的 CPU,可在 [CMWQ](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 的 Affinity Scopes 中觀察到
**Affinity Scopes**
一個 unbound workqueue 會根據其 Affinity Scopes 將 CPU 分組,來改善 cache locality。打個比方,如果一個 workqueue 使用默認的親和性範圍 “cache”,它將根據最後一級 cache 的邊界來對 CPU 分組。在 workqueue 上的 work item 將會被分配給其中一個 CPUs 的一個 worker,並根據 scope 的 affinity_strict 設定,該 worker 可能允許或不允許在範圍之外移動。
**workqueue_attrs**
下方是一個 workqueue attributes 的結構體定義,在 `cpumask` 指示了 workqueue 中的 work items 對哪些 CPU 是親和性的,執行於指定的 CPU,並且不允許執行在其他 CPU 上
```c
struct workqueue_attrs {
int nice;
cpumask_var_t cpumask;
cpumask_var_t __pod_cpumask;
bool affn_strict;
enum wq_affn_scope affn_scope;
bool ordered;
};
```
## [simrupt](https://github.com/sysprog21/simrupt)
這裡記錄我對 simrupt 的觀察,後續再會針對第三次作業提及的人工智慧程式碼,進行整合。
在 [simrupt.c](https://github.com/sysprog21/simrupt/blob/main/simrupt.c) 中,採用 `alloc_workqueue` 來建立 workqueue,並將 `@flags` 設定為 `WQ_UNBOUND` ,根據 [CMWQ(Concurrency Managed Workqueue)](https://www.kernel.org/doc/html/latest/core-api/workqueue.html) 的解釋,host workers 不綁定至任意一個 CPU,workqueue 會表現得更像一個執行上下文的提供者並且沒有並發管理。
> Work items queued to an unbound wq are served by the special worker-pools which host workers which are not bound to any specific CPU. This makes the wq behave as a simple execution context provider without concurrency management. The unbound worker-pools try to start execution of work items as soon as possible.
Unbound workqueue 犧牲了局部性,但這對以下情況很有用
* 預計 concurrency 等級要求有大幅度波動
* 長時間運行的 CPU 密集型工作負載可以由系統排程器更好地管理。
在這`@max_active` 設定為 WQ_MAX_ACTIVE ,它決定了在每一個 CPU 的同一時間上,能夠執行最多的 work item 是多少個。
```c
simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
if (!simrupt_workqueue) {
vfree(fast_buf.buf);
device_destroy(simrupt_class, dev_id);
class_destroy(simrupt_class);
ret = -ENOMEM;
goto error_cdev;
}
```
**simrupt_work_func**
1. simrupt_work_func 由 kernel thread 所執行,他會去獲取每一個 cpu data,接下來他會透過 fast_buf_get() 來消耗 circular buffer 的資料並取得 val 變數
2. 再使用 produce_data(val) 將資料儲存至 kfifo buffer。
3. 呼叫 wake_up_interruptible() 函式,喚醒在 wait queue 睡著的 processes ,通知他們 kfifo buffer 已經有新數據了。
* 我們可以注意到執行 fast_buf_get() 和 produce_data() 時,都是在 mutex 情況下執行,以此來避免 race condition。
* 在這 rx_wait 則是作為 wait queue 用於實現 userspace 的 blocking I/O。
```c
static void simrupt_work_func(struct work_struct *w)
{
...
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
```
**fast_buf_get**
`fast_buf_get` 為 consumer,它藉由提取 index 為 tail 的 item,來獲取 val 變數
```c
static int fast_buf_get(void)
{
...
/* extract item from the buffer */
ret = ring->buf[tail];
...
}
```
**produce_data**
在 produce_data() 採用了 FIFO 方式將 value 插入至 kfifo buffer
```c
static void produce_data(unsigned char val)
{
/* Implement a kind of circular FIFO here (skip oldest element if kfifo
* buffer is full).
*/
unsigned int len = kfifo_in(&rx_fifo, &val, sizeof(val));
if (unlikely(len < sizeof(val)) && printk_ratelimit())
pr_warn("%s: %zu bytes dropped\n", __func__, sizeof(val) - len);
pr_debug("simrupt: %s: in %u/%u bytes\n", __func__, len,
kfifo_len(&rx_fifo));
}
```
目前 tic-tac-toe AI algorithms 皆採用 negamax 來實現,並設計出相對應的 work item,像是 **PrintBoard_work_func**, **AI1_work_func**, **AI2_work_func**,這些函式可以以非同步方式被執行,詳情參見 [commit 36d6dfc](https://github.com/sysprog21/simrupt/commit/36d6dfc2d0a86faf9539211759797cad9676853a)。
```c
/* Work item: holds a pointer to the function that is going to be executed asynchronously.
*/
static DECLARE_WORK(PrintBoard_work, PrintBoard_work_func);
static DECLARE_WORK(AI1_work, AI1_work_func);
static DECLARE_WORK(AI2_work, AI2_work_func);
```
:::info
但是在我設計完以後,想將 kernel object 插入至 kernel 時,顯示 killed,目前還在調查原因。
:::
**不同 CPU 上執行 AI 演算法**
在閱讀 [Linux 核心專題: simrupt 研究和應用](https://hackmd.io/@sysprog/HJXlHtlB2)時,它提到 `一個 tasklet 只會在調度他的 CPU 上執行` ,因此當 `tasklet_schedule` 被呼叫時,此 tasklet 會被特定 CPU 所執行。如果希望不同 AI 演算法在不同 CPU 上所執行,便需要為每一個 AI 演算法設置一個 Tasklet。
```c
/* Tasklet handler.
*
* NOTE: different tasklets can run concurrently on different processors, but
* two of the same type of tasklet cannot run simultaneously. Moreover, a
* tasklet always runs on the same CPU that schedules it.
*/
static void simrupt_tasklet_func(unsigned long __data)
{
ktime_t tv_start, tv_end;
s64 nsecs;
WARN_ON_ONCE(!in_interrupt());
WARN_ON_ONCE(!in_softirq());
tv_start = ktime_get();
queue_work(simrupt_workqueue, &work);
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_softirq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
}
/* Tasklet for asynchronous bottom-half processing in softirq context */
static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);
```