## Linux 核心專題: workqueue 研究 > 執行人: ssheep773 [解說錄影](https://youtu.be/HvHbeGna9OE) [Github](https://github.com/ssheep773/work-stealing) ### Reviewed by `kevinzxc1217` 請問這裡的比較表格中,為什麼 `Context switches`, `CPUs utilized`, `Time elapsed` 等指標並沒有因應 pthread 的變多呈現遞增或遞減的趨勢呢? > 隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。 ### Reviewed by `kkkkk1109` 想請問在你的實作中,是只要有執行緒一發生空閒,就會馬上去偷取其他執行緒的任務嗎,會有限制只能有幾個執行緒能偷取工作嗎? 另外在不同執行緒並行的情況下,工作佇列是會自己 resize 的嗎? 而工作佇列的大小是跟著 fib 數列 spawn 的數量決定的嗎? > 每個的執行緒只要空閒就會去偷其他執行緒的工作 > 當工作佇列滿的時候,會自己 resize 並且固定擴增為原本 size 的兩 ### Reviewed by `56han` 每次執行程式工作佇列大小分佈都相同嗎?程式執行時間和工作佇列大小、執行緒數量有關嗎? >程式執行時間與執行緒數量有關,然而記憶體等開銷也會影響到程式執行時間,至於工作佇列大小只是用儲存工作,與執行時間無關,但可以用來觀察工作的分配情況。本任務是使用隨機的偷取任務,因此每次執行皆不相同。 ### Reviewed by `yuyuan0625` 下文程式碼的註解請用英文書寫 請問哪種情境適合 child stealing 又哪種情境適合 continuation stealing 呢? >child stealing 適用於子任務之間較獨立,不會互相等待 continuation stealing 適合任務之間存在依賴性的情況,可以有效地管理後續任務的執行 ## 任務說明 延伸第九週測驗的 work steal 題目,探討 Linux 核心的 workqueue 如何處理 work stealing。 ## TODO: 重作[第九週測驗](https://hackmd.io/@sysprog/linux2024-quiz9)的測驗三 > 包含延伸問題 根據論文 〈Cilk: An Efficient Multithreaded Runtime System〉 實作計算費式數列。 論文特點是使用 Cilk 達到非阻塞 (non-blocking) 的並行處理,經由將程式碼拆分成多段執行,而含有執行程式碼與所需的參數稱為閉包 (closure) ,並以一個閉包為單位執行任務,將他們分配到不同處理器上並行執行。 論文中以 Cilk 撰寫費式數的函式 ```cilk thread fib (cont int k, int n) { if (n<2) send argument (k,n) else { cont int x, y; spawn next sum (k, ?x, ?y); spawn fib (x, n-1); spawn fib (y, n-2); } } thread sum (cont int k, int x, int y) { send argument (k, x+y); } ``` 介紹 Cilk 使用到的種語句(statement), `spawn` : 類似 `fork` 產生子任務。 `spawn next` : 是當前任務的後續任務。因為是非阻塞式的設計,因此將任務拆成兩個部分,將需要等待子任務的程式碼放在當前後續任務 (continuation) 中,當子任務完成,後續任務才會被放進 work queue 中等待執行。 在上面的例子中 `spawn next sum (k, ?x, ?y);` 就是須等待 `x` ,`y` 計算完成。 `k` 則是後續體任務。 `send argument (k, x+y)` : 則是變數的傳遞,將變數傳遞給需要的後續任務。 ![image](https://hackmd.io/_uploads/Sysh-b2LR.png) 從論文中的圖解釋程式執行的流程, A 產生 (`spawn`) 一個子任務(子閉包) B ,與產生(`spawn next`) 後續任務 F ,等待 B 任務的完成。 B 產生兩個子任務,並建立後續任務 E ,用於等待 C 、 D 的完成。 接著說明我的實作,首先在資料結構上 `work_t` 也就是論文中的閉包 (closure) 是執行的最小單元,包含三個元素 : 指向執行任務的程式碼、所需的變數,以及變數的數量,用於判斷任務是否就緒 (Ready)。 ```c typedef struct work_internal { task_t code; // execute code atomic_int join_count; // number of parameter to get ready void *args[]; // parameter } work_t; ``` >其中 `void *args[];` 的形式可以在變數宣告的時候在決定大小,其規則是一個結構只能有一個且必須放在結構的最後一項。 使用 `deque_t` 作為各執行緒的 work queue ,佇列是儲存 work 位址,並用 `top, bottom` 紀錄佇列的工作狀況,size 判斷佇列是否存滿需要擴充的依據,其中將 `array_t` 分離出來方便後續更動 `work_t *buffer[]` ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` 接著介紹 `main()` 主要介紹起始任務與終止的任務, 建立 work 執行 fib 任務計算費式數,並且是就緒的狀態,包含兩個參數, `done_work` 後續任務,以及要計算的費式數 `n` ,需要注意的是參數式以指標的形式傳遞。 ```c work_t *work = malloc(sizeof(work_t) + 2 * sizeof(int *)); work->code = &fib; // run fib() task work->join_count = 0; // ready int *n = malloc(sizeof(int)); *n = fib_num; work->args[0] = done_work; work->args[1] = n; ``` `done_work` 是上面 `work` 的後續任務,執行 `done_task` 的程式碼,並且需要等待 `work` 生成的子任務完成,因此`join_count = 1` , 其中 `args[0]` 通常儲存後續任務,然而此任務為最後一個任務,因此無後續任務,`args[1]` 也就是 `final_result` 用於儲存計算的結果,並且在 `done_work` 中初始設為 `-1` 作為終止條件。 ```c work_t *done_work = malloc(sizeof(work_t) + 2 * sizeof(int *)); done_work->code = &done_task; done_work->join_count = 1; int *final_result = malloc(sizeof(int)); *final_result=-1; done_work->args[0] = NULL; done_work->args[1] = final_result; ``` 接著介紹 `work_t *fib(work_t *w)` 分為兩個部分, `n < 2` 與 `n > 2` 的情況, 在 `n < 2` 情況中,建立後續任務,執行 `sum` 的任務,並且需要等待目前的任務完成因此 `join_count = 1` ,並且此後續任務 `child_sum1` 的後續任務是 `cont` ,因為是 `n < 2` 因此根據費式數計算公式 `result = 1` ```c if (*n < 2) { work_t *child_sum1 = malloc(sizeof(work_t) + 2 * sizeof(int *)); child_sum1->code = &sum; child_sum1->join_count = 1; int *result = malloc(sizeof(int)); *result = 1; child_sum1->args[0] = cont; child_sum1->args[1] = result; return join_work(child_sum1); } ``` 在 `n > 2` 情況中,會遞迴的呼叫 `fib(n)` 任務,產生子任務 `fib(n-1)` 與 `fib(n-2)` 放入 `thread_queues` 中,並且建立 `fib(n)` 的後續任務 `child_sum2` ,等待前兩個子任務計算完畢,因此他的 `join_count = 2` 。 並且 `child_sum2` 的後續任務是 `fib(n)` 的後續任務 `cont` 。 下面分別是 `child_sum2` 跟 `fib(n-1)` , 而 `fib(n-2)` 與 `fib(n-1)` 類似只是將 `*n1 = *n - 1` 改成 `*n2 = *n - 2` 。 並且可以注意到最後是回傳 NULL ,並不是因為這裡沒有後續任務,而是將後續任務存在產生的子任務中傳遞。 ```c work_t *child_sum2 = malloc(sizeof(work_t) + 2 * sizeof(int *)); child_sum2->code = &sum; child_sum2->join_count = 2; int *result = malloc(sizeof(int)); *result=0; child_sum2->args[0] = cont; child_sum2->args[1] = result; /* spawn fib(n-1) */ work_t *fibwork1 = malloc(sizeof(work_t) + 2 * sizeof(int *)); fibwork1->code = &fib; fibwork1->join_count = 0; int *n1 = malloc(sizeof(int)); *n1 = *n - 1; fibwork1->args[0] = child_sum2; fibwork1->args[1] = n1; int j = rand() % N_THREADS; push(&thread_queues[j], fibwork1); ``` sum() 是 fib() 的後續任務,目的是將計算結果相加,首先讀取後續任務 `sum_cont` ,以及後續程式的變數 `result` ,因為要將計算結果存在後續任務的變數中,cur_result 是目前任務的結果, ```c work_t *sum(work_t *w) { work_t *sum_cont = (work_t *) w->args[0]; // continuation task int *result = (int *)sum_cont->args[1]; // parameter of continuation task int *cur_result = (int *)w->args[1]; if (*result == -1){ sum_cont->args[1] = cur_result; return done_task(sum_cont); } else{ int before_result = *(int *)sum_cont->args[1]; // just for check process *result = *result + *cur_result ; printf("Did item %p => Sum: %d + %d = %d\n", w, before_result, *cur_result, *result); free(cur_result); // release memory free(w); return join_work(sum_cont); } } ``` 將 `result == -1` 設為中止條件,當執行到最後的任務時,儲存最後結果,接著執行 `done_task ` 將 `&done` 設為 True 告知其他執行序任務結束。 ```c work_t *done_task(work_t *w) { printf("Done task Fib = %d\n", *(int *)w->args[1]); free(w->args[1]); free(w); atomic_store(&done, true); return NULL; } ``` 最後說明我覺得很關鍵的任務, `do_work` 執行任務而為什麼要有迴圈,因為要接續執行後續任務,除非是 `NULL` 是沒有後續任務要執行的情況 `join_work` 是後續任務,將子任務的結果傳遞給此任務的後續任務,使用 `join_count` 判斷子任務是否都完成,像是在 `child_sum2` 就是 `join_count = 2` ,那麼當只有一個數值完成時,是不滿足變數條件,因此回傳 NULL ,也就是沒有後續任務的情況。 但這時會想說這樣會不會遺失後續任務,其實是不會,因為若 `join_count = 2` 則說明有兩個子任務的後續任務相同,因此當第二個子任務完成時,即可執行後續任務,且子任務的變數儲存在後續任務中,因此不會有資料遺失。 ```c work_t *join_work(work_t *work) { int old_join_count = atomic_fetch_sub(&work->join_count, 1); if (old_join_count == 1) return work; return NULL; } /* using while loop to execute continuation task */ void do_work(int id, work_t *work) { while (work){ // printf("worker %d running item %p\n", id, work); work = (*(work->code)) (work); } } ``` 結果討論: 以計算 fib(20) 為例 利用 pref 觀察執行過程 ``` perf stat --repeat 5 -e instructions,context-switches,page-faults,cpu-migrations,cpu-clock ./main 20 ``` |Metrics |pthread_1 |pthread_4 |pthread_8 |pthread_16| | ------ | ---| -------- | -------- | -------- | |Instructions(M) |342 M|452 M|538 M |449 M | |Context switches |57 |23,047 |18,932 |17,893| |Page fault| 462 |480| 500 |539| |CPU-migrations| 8 |82 |259 |699| |CPUs utilized| 0.270 |1.185 | 0.972 |1.073 | |Time elapsed |0.222 秒 |0.123 秒| 0.218 秒 |0.199 秒| 隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。 在 Instructions 的部份,我推測是因為當執行緒的數量還很少時,執行緒之間容易發生資料競爭的情況,也因此會需要花費更多的指令數爭搶資料。 不同執行緒數量下的執行緒工作佇列大小分佈 分別是 1,4,8,16 個執行緒 | workqueue size | 1 | 4 | 8 | 16 | | ------ | ---| -------- | -------- | -------- | | size 8 | - | - | - | - | | size 16 | 1 | - | - | - | | size 32 | - | - | - | - | | size 64 | - | - | 2 | 4 | | size 128 | - | - | 2 | 10 | | size 256 | - | 3 | 4 | 2 | | size 512 | - | 1 | | | 可以發現當執行緒只有一個時,工作佇列大小並沒有擴增大很大,這是因為也只有一個執行緒,因此不會有很多子任務產生,生成作多的子任務關係圖類似 skew binary tree 。 而當執行緒增加時,可以發現任務可以更好的分配到各個工作佇列, 因此佇列大小都有變小的趨勢。 ## TODO: 研讀 [Wikipedia: work stealing](https://en.wikipedia.org/wiki/Work_stealing) 並紀錄問題 > 解釋[第九週測驗](https://hackmd.io/@sysprog/linux2024-quiz9)的手法和 Linux 核心實作的落差 Child stealing vs. continuation stealing Child Stealing : 偷取子任務通常都是就緒的情況,因此會直接執行此子任務時,若此子任務接著產生許多子任務,這些任務都會被放入工作佇列,導致佇列過載。 適合用於子任務之間是獨立的情況,像是使用到 Divide and Conquer Algorithms 的任務,例如快速排序 (QuickSort)、合併排序 (MergeSort) 等。 Continuation Stealing : 偷取後續任務的缺點是需要記憶體的負擔,因為需要變數的傳遞,以及該後續的任務的後續任務,也都會是此處理器的任務,記憶體開銷會隨之增大。 工作竊取雖然只竊取一個工作,然而因為工作的延伸性,等於是將竊取工作相關的工作也都竊取走,以偷的取的工作為基準向上(後續任務)或是向下(子任務)偷取之後的工作。 本測驗的 work stealing 手法是採用照順序掃過每個工作佇列,這其實類似 linux 負載平衡 `idle_balance()` 初期所採用的搜尋手法 ,後來改採用維護一個紀錄過載 CPU 的稀疏矩陣,更快搜尋到需要負載平衡的 CPU 。 並且從 [work_queue](https://www.kernel.org/doc/html/next/core-api/workqueue.html) 中的指令發現,工作竊取時是禁止中斷發生的。 ## TODO: 撰寫 Linux kernel module 觀察 workqueue 的 work stealing 行為 > 搭配 ftrace (Linux CPU sched book Chapter 6) 來理解