## Linux 核心專題: workqueue 研究
> 執行人: ssheep773
[解說錄影](https://youtu.be/HvHbeGna9OE)
[Github](https://github.com/ssheep773/work-stealing)
### Reviewed by `kevinzxc1217`
請問這裡的比較表格中,為什麼 `Context switches`, `CPUs utilized`, `Time elapsed` 等指標並沒有因應 pthread 的變多呈現遞增或遞減的趨勢呢?
> 隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。
### Reviewed by `kkkkk1109`
想請問在你的實作中,是只要有執行緒一發生空閒,就會馬上去偷取其他執行緒的任務嗎,會有限制只能有幾個執行緒能偷取工作嗎? 另外在不同執行緒並行的情況下,工作佇列是會自己 resize 的嗎? 而工作佇列的大小是跟著 fib 數列 spawn 的數量決定的嗎?
> 每個的執行緒只要空閒就會去偷其他執行緒的工作
> 當工作佇列滿的時候,會自己 resize 並且固定擴增為原本 size 的兩
### Reviewed by `56han`
每次執行程式工作佇列大小分佈都相同嗎?程式執行時間和工作佇列大小、執行緒數量有關嗎?
>程式執行時間與執行緒數量有關,然而記憶體等開銷也會影響到程式執行時間,至於工作佇列大小只是用儲存工作,與執行時間無關,但可以用來觀察工作的分配情況。本任務是使用隨機的偷取任務,因此每次執行皆不相同。
### Reviewed by `yuyuan0625`
下文程式碼的註解請用英文書寫
請問哪種情境適合 child stealing 又哪種情境適合 continuation stealing 呢?
>child stealing 適用於子任務之間較獨立,不會互相等待
continuation stealing 適合任務之間存在依賴性的情況,可以有效地管理後續任務的執行
## 任務說明
延伸第九週測驗的 work steal 題目,探討 Linux 核心的 workqueue 如何處理 work stealing。
## TODO: 重作[第九週測驗](https://hackmd.io/@sysprog/linux2024-quiz9)的測驗三
> 包含延伸問題
根據論文 〈Cilk: An Efficient Multithreaded Runtime System〉 實作計算費式數列。
論文特點是使用 Cilk 達到非阻塞 (non-blocking) 的並行處理,經由將程式碼拆分成多段執行,而含有執行程式碼與所需的參數稱為閉包 (closure) ,並以一個閉包為單位執行任務,將他們分配到不同處理器上並行執行。
論文中以 Cilk 撰寫費式數的函式
```cilk
thread fib (cont int k, int n)
{
if (n<2)
send argument (k,n)
else
{
cont int x, y;
spawn next sum (k, ?x, ?y);
spawn fib (x, n-1);
spawn fib (y, n-2);
}
}
thread sum (cont int k, int x, int y)
{
send argument (k, x+y);
}
```
介紹 Cilk 使用到的種語句(statement),
`spawn` : 類似 `fork` 產生子任務。
`spawn next` : 是當前任務的後續任務。因為是非阻塞式的設計,因此將任務拆成兩個部分,將需要等待子任務的程式碼放在當前後續任務 (continuation) 中,當子任務完成,後續任務才會被放進 work queue 中等待執行。
在上面的例子中 `spawn next sum (k, ?x, ?y);` 就是須等待 `x` ,`y` 計算完成。 `k` 則是後續體任務。
`send argument (k, x+y)` : 則是變數的傳遞,將變數傳遞給需要的後續任務。
![image](https://hackmd.io/_uploads/Sysh-b2LR.png)
從論文中的圖解釋程式執行的流程,
A 產生 (`spawn`) 一個子任務(子閉包) B ,與產生(`spawn next`) 後續任務 F ,等待 B 任務的完成。
B 產生兩個子任務,並建立後續任務 E ,用於等待 C 、 D 的完成。
接著說明我的實作,首先在資料結構上
`work_t` 也就是論文中的閉包 (closure) 是執行的最小單元,包含三個元素 : 指向執行任務的程式碼、所需的變數,以及變數的數量,用於判斷任務是否就緒 (Ready)。
```c
typedef struct work_internal {
task_t code; // execute code
atomic_int join_count; // number of parameter to get ready
void *args[]; // parameter
} work_t;
```
>其中 `void *args[];` 的形式可以在變數宣告的時候在決定大小,其規則是一個結構只能有一個且必須放在結構的最後一項。
使用 `deque_t` 作為各執行緒的 work queue ,佇列是儲存 work 位址,並用 `top, bottom` 紀錄佇列的工作狀況,size 判斷佇列是否存滿需要擴充的依據,其中將 `array_t` 分離出來方便後續更動 `work_t *buffer[]`
```c
typedef struct {
atomic_size_t size;
_Atomic work_t *buffer[];
} array_t;
typedef struct {
/* Assume that they never overflow */
atomic_size_t top, bottom;
_Atomic(array_t *) array;
} deque_t;
```
接著介紹 `main()` 主要介紹起始任務與終止的任務,
建立 work 執行 fib 任務計算費式數,並且是就緒的狀態,包含兩個參數, `done_work` 後續任務,以及要計算的費式數 `n` ,需要注意的是參數式以指標的形式傳遞。
```c
work_t *work = malloc(sizeof(work_t) + 2 * sizeof(int *));
work->code = &fib; // run fib() task
work->join_count = 0; // ready
int *n = malloc(sizeof(int));
*n = fib_num;
work->args[0] = done_work;
work->args[1] = n;
```
`done_work` 是上面 `work` 的後續任務,執行 `done_task` 的程式碼,並且需要等待 `work` 生成的子任務完成,因此`join_count = 1` , 其中 `args[0]` 通常儲存後續任務,然而此任務為最後一個任務,因此無後續任務,`args[1]` 也就是 `final_result` 用於儲存計算的結果,並且在 `done_work` 中初始設為 `-1` 作為終止條件。
```c
work_t *done_work = malloc(sizeof(work_t) + 2 * sizeof(int *));
done_work->code = &done_task;
done_work->join_count = 1;
int *final_result = malloc(sizeof(int));
*final_result=-1;
done_work->args[0] = NULL;
done_work->args[1] = final_result;
```
接著介紹 `work_t *fib(work_t *w)`
分為兩個部分, `n < 2` 與 `n > 2` 的情況,
在 `n < 2` 情況中,建立後續任務,執行 `sum` 的任務,並且需要等待目前的任務完成因此 `join_count = 1` ,並且此後續任務 `child_sum1` 的後續任務是 `cont` ,因為是 `n < 2` 因此根據費式數計算公式 `result = 1`
```c
if (*n < 2) {
work_t *child_sum1 = malloc(sizeof(work_t) + 2 * sizeof(int *));
child_sum1->code = ∑
child_sum1->join_count = 1;
int *result = malloc(sizeof(int));
*result = 1;
child_sum1->args[0] = cont;
child_sum1->args[1] = result;
return join_work(child_sum1);
}
```
在 `n > 2` 情況中,會遞迴的呼叫 `fib(n)` 任務,產生子任務 `fib(n-1)` 與 `fib(n-2)` 放入 `thread_queues` 中,並且建立 `fib(n)` 的後續任務 `child_sum2` ,等待前兩個子任務計算完畢,因此他的 `join_count = 2` 。
並且 `child_sum2` 的後續任務是 `fib(n)` 的後續任務 `cont` 。
下面分別是 `child_sum2` 跟 `fib(n-1)` , 而 `fib(n-2)` 與 `fib(n-1)` 類似只是將 `*n1 = *n - 1` 改成 `*n2 = *n - 2` 。
並且可以注意到最後是回傳 NULL ,並不是因為這裡沒有後續任務,而是將後續任務存在產生的子任務中傳遞。
```c
work_t *child_sum2 = malloc(sizeof(work_t) + 2 * sizeof(int *));
child_sum2->code = ∑
child_sum2->join_count = 2;
int *result = malloc(sizeof(int));
*result=0;
child_sum2->args[0] = cont;
child_sum2->args[1] = result;
/* spawn fib(n-1) */
work_t *fibwork1 = malloc(sizeof(work_t) + 2 * sizeof(int *));
fibwork1->code = &fib;
fibwork1->join_count = 0;
int *n1 = malloc(sizeof(int));
*n1 = *n - 1;
fibwork1->args[0] = child_sum2;
fibwork1->args[1] = n1;
int j = rand() % N_THREADS;
push(&thread_queues[j], fibwork1);
```
sum() 是 fib() 的後續任務,目的是將計算結果相加,首先讀取後續任務 `sum_cont` ,以及後續程式的變數 `result` ,因為要將計算結果存在後續任務的變數中,cur_result 是目前任務的結果,
```c
work_t *sum(work_t *w)
{
work_t *sum_cont = (work_t *) w->args[0]; // continuation task
int *result = (int *)sum_cont->args[1]; // parameter of continuation task
int *cur_result = (int *)w->args[1];
if (*result == -1){
sum_cont->args[1] = cur_result;
return done_task(sum_cont);
}
else{
int before_result = *(int *)sum_cont->args[1]; // just for check process
*result = *result + *cur_result ;
printf("Did item %p => Sum: %d + %d = %d\n", w, before_result, *cur_result, *result);
free(cur_result); // release memory
free(w);
return join_work(sum_cont);
}
}
```
將 `result == -1` 設為中止條件,當執行到最後的任務時,儲存最後結果,接著執行 `done_task ` 將 `&done` 設為 True 告知其他執行序任務結束。
```c
work_t *done_task(work_t *w)
{
printf("Done task Fib = %d\n", *(int *)w->args[1]);
free(w->args[1]);
free(w);
atomic_store(&done, true);
return NULL;
}
```
最後說明我覺得很關鍵的任務,
`do_work` 執行任務而為什麼要有迴圈,因為要接續執行後續任務,除非是 `NULL` 是沒有後續任務要執行的情況
`join_work` 是後續任務,將子任務的結果傳遞給此任務的後續任務,使用 `join_count` 判斷子任務是否都完成,像是在 `child_sum2` 就是 `join_count = 2` ,那麼當只有一個數值完成時,是不滿足變數條件,因此回傳 NULL ,也就是沒有後續任務的情況。
但這時會想說這樣會不會遺失後續任務,其實是不會,因為若 `join_count = 2` 則說明有兩個子任務的後續任務相同,因此當第二個子任務完成時,即可執行後續任務,且子任務的變數儲存在後續任務中,因此不會有資料遺失。
```c
work_t *join_work(work_t *work)
{
int old_join_count = atomic_fetch_sub(&work->join_count, 1);
if (old_join_count == 1)
return work;
return NULL;
}
/* using while loop to execute continuation task */
void do_work(int id, work_t *work)
{
while (work){
// printf("worker %d running item %p\n", id, work);
work = (*(work->code)) (work);
}
}
```
結果討論:
以計算 fib(20) 為例
利用 pref 觀察執行過程
```
perf stat --repeat 5 -e instructions,context-switches,page-faults,cpu-migrations,cpu-clock ./main 20
```
|Metrics |pthread_1 |pthread_4 |pthread_8 |pthread_16|
| ------ | ---| -------- | -------- | -------- |
|Instructions(M) |342 M|452 M|538 M |449 M |
|Context switches |57 |23,047 |18,932 |17,893|
|Page fault| 462 |480| 500 |539|
|CPU-migrations| 8 |82 |259 |699|
|CPUs utilized| 0.270 |1.185 | 0.972 |1.073 |
|Time elapsed |0.222 秒 |0.123 秒| 0.218 秒 |0.199 秒|
隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。
在 Instructions 的部份,我推測是因為當執行緒的數量還很少時,執行緒之間容易發生資料競爭的情況,也因此會需要花費更多的指令數爭搶資料。
不同執行緒數量下的執行緒工作佇列大小分佈
分別是 1,4,8,16 個執行緒
| workqueue size | 1 | 4 | 8 | 16 |
| ------ | ---| -------- | -------- | -------- |
| size 8 | - | - | - | - |
| size 16 | 1 | - | - | - |
| size 32 | - | - | - | - |
| size 64 | - | - | 2 | 4 |
| size 128 | - | - | 2 | 10 |
| size 256 | - | 3 | 4 | 2 |
| size 512 | - | 1 | | |
可以發現當執行緒只有一個時,工作佇列大小並沒有擴增大很大,這是因為也只有一個執行緒,因此不會有很多子任務產生,生成作多的子任務關係圖類似 skew binary tree 。
而當執行緒增加時,可以發現任務可以更好的分配到各個工作佇列, 因此佇列大小都有變小的趨勢。
## TODO: 研讀 [Wikipedia: work stealing](https://en.wikipedia.org/wiki/Work_stealing) 並紀錄問題
> 解釋[第九週測驗](https://hackmd.io/@sysprog/linux2024-quiz9)的手法和 Linux 核心實作的落差
Child stealing vs. continuation stealing
Child Stealing : 偷取子任務通常都是就緒的情況,因此會直接執行此子任務時,若此子任務接著產生許多子任務,這些任務都會被放入工作佇列,導致佇列過載。
適合用於子任務之間是獨立的情況,像是使用到 Divide and Conquer Algorithms 的任務,例如快速排序 (QuickSort)、合併排序 (MergeSort) 等。
Continuation Stealing : 偷取後續任務的缺點是需要記憶體的負擔,因為需要變數的傳遞,以及該後續的任務的後續任務,也都會是此處理器的任務,記憶體開銷會隨之增大。
工作竊取雖然只竊取一個工作,然而因為工作的延伸性,等於是將竊取工作相關的工作也都竊取走,以偷的取的工作為基準向上(後續任務)或是向下(子任務)偷取之後的工作。
本測驗的 work stealing 手法是採用照順序掃過每個工作佇列,這其實類似 linux 負載平衡 `idle_balance()` 初期所採用的搜尋手法 ,後來改採用維護一個紀錄過載 CPU 的稀疏矩陣,更快搜尋到需要負載平衡的 CPU 。
並且從 [work_queue](https://www.kernel.org/doc/html/next/core-api/workqueue.html) 中的指令發現,工作竊取時是禁止中斷發生的。
## TODO: 撰寫 Linux kernel module 觀察 workqueue 的 work stealing 行為
> 搭配 ftrace (Linux CPU sched book Chapter 6) 來理解