Try   HackMD

Linux 核心專題: workqueue 研究

執行人: ssheep773
解說錄影
Github

Reviewed by kevinzxc1217

請問這裡的比較表格中,為什麼 Context switches, CPUs utilized, Time elapsed 等指標並沒有因應 pthread 的變多呈現遞增或遞減的趨勢呢?

隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。

Reviewed by kkkkk1109

想請問在你的實作中,是只要有執行緒一發生空閒,就會馬上去偷取其他執行緒的任務嗎,會有限制只能有幾個執行緒能偷取工作嗎? 另外在不同執行緒並行的情況下,工作佇列是會自己 resize 的嗎? 而工作佇列的大小是跟著 fib 數列 spawn 的數量決定的嗎?

每個的執行緒只要空閒就會去偷其他執行緒的工作
當工作佇列滿的時候,會自己 resize 並且固定擴增為原本 size 的兩

Reviewed by 56han

每次執行程式工作佇列大小分佈都相同嗎?程式執行時間和工作佇列大小、執行緒數量有關嗎?

程式執行時間與執行緒數量有關,然而記憶體等開銷也會影響到程式執行時間,至於工作佇列大小只是用儲存工作,與執行時間無關,但可以用來觀察工作的分配情況。本任務是使用隨機的偷取任務,因此每次執行皆不相同。

Reviewed by yuyuan0625

下文程式碼的註解請用英文書寫
請問哪種情境適合 child stealing 又哪種情境適合 continuation stealing 呢?

child stealing 適用於子任務之間較獨立,不會互相等待
continuation stealing 適合任務之間存在依賴性的情況,可以有效地管理後續任務的執行

任務說明

延伸第九週測驗的 work steal 題目,探討 Linux 核心的 workqueue 如何處理 work stealing。

TODO: 重作第九週測驗的測驗三

包含延伸問題

根據論文 〈Cilk: An Efficient Multithreaded Runtime System〉 實作計算費式數列。

論文特點是使用 Cilk 達到非阻塞 (non-blocking) 的並行處理,經由將程式碼拆分成多段執行,而含有執行程式碼與所需的參數稱為閉包 (closure) ,並以一個閉包為單位執行任務,將他們分配到不同處理器上並行執行。
論文中以 Cilk 撰寫費式數的函式

thread fib (cont int k, int n)
{ 
    if (n<2)
        send argument (k,n)
    else
    { 
        cont int x, y;
        spawn next sum (k, ?x, ?y);
        spawn fib (x, n-1);
        spawn fib (y, n-2);
    }
}
thread sum (cont int k, int x, int y)
{ 
    send argument (k, x+y);
}

介紹 Cilk 使用到的種語句(statement),

spawn : 類似 fork 產生子任務。

spawn next : 是當前任務的後續任務。因為是非阻塞式的設計,因此將任務拆成兩個部分,將需要等待子任務的程式碼放在當前後續任務 (continuation) 中,當子任務完成,後續任務才會被放進 work queue 中等待執行。
在上面的例子中 spawn next sum (k, ?x, ?y); 就是須等待 x ,y 計算完成。 k 則是後續體任務。

send argument (k, x+y) : 則是變數的傳遞,將變數傳遞給需要的後續任務。

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

從論文中的圖解釋程式執行的流程,
A 產生 (spawn) 一個子任務(子閉包) B ,與產生(spawn next) 後續任務 F ,等待 B 任務的完成。

B 產生兩個子任務,並建立後續任務 E ,用於等待 C 、 D 的完成。

接著說明我的實作,首先在資料結構上
work_t 也就是論文中的閉包 (closure) 是執行的最小單元,包含三個元素 : 指向執行任務的程式碼、所需的變數,以及變數的數量,用於判斷任務是否就緒 (Ready)。

typedef struct work_internal {
    task_t code;           // execute code
    atomic_int join_count; // number of parameter to get ready
    void *args[];          // parameter
} work_t;

其中 void *args[]; 的形式可以在變數宣告的時候在決定大小,其規則是一個結構只能有一個且必須放在結構的最後一項。

使用 deque_t 作為各執行緒的 work queue ,佇列是儲存 work 位址,並用 top, bottom 紀錄佇列的工作狀況,size 判斷佇列是否存滿需要擴充的依據,其中將 array_t 分離出來方便後續更動 work_t *buffer[]

typedef struct {
    atomic_size_t size;
    _Atomic work_t *buffer[];
} array_t;

typedef struct {
    /* Assume that they never overflow */
    atomic_size_t top, bottom;
    _Atomic(array_t *) array;
} deque_t;

接著介紹 main() 主要介紹起始任務與終止的任務,

建立 work 執行 fib 任務計算費式數,並且是就緒的狀態,包含兩個參數, done_work 後續任務,以及要計算的費式數 n ,需要注意的是參數式以指標的形式傳遞。

work_t *work = malloc(sizeof(work_t) + 2 * sizeof(int *));
work->code = &fib;        // run fib() task
work->join_count = 0;     // ready
int *n = malloc(sizeof(int));
*n = fib_num;
work->args[0] = done_work;              
work->args[1] = n;              

done_work 是上面 work 的後續任務,執行 done_task 的程式碼,並且需要等待 work 生成的子任務完成,因此join_count = 1 , 其中 args[0] 通常儲存後續任務,然而此任務為最後一個任務,因此無後續任務,args[1] 也就是 final_result 用於儲存計算的結果,並且在 done_work 中初始設為 -1 作為終止條件。

work_t *done_work = malloc(sizeof(work_t) + 2 * sizeof(int *));
done_work->code = &done_task;         
done_work->join_count = 1;      
int *final_result = malloc(sizeof(int));
*final_result=-1;
done_work->args[0] = NULL;   
done_work->args[1] = final_result;         

接著介紹 work_t *fib(work_t *w)
分為兩個部分, n < 2n > 2 的情況,

n < 2 情況中,建立後續任務,執行 sum 的任務,並且需要等待目前的任務完成因此 join_count = 1 ,並且此後續任務 child_sum1 的後續任務是 cont ,因為是 n < 2 因此根據費式數計算公式 result = 1

if (*n < 2) {
    work_t *child_sum1 = malloc(sizeof(work_t) + 2 * sizeof(int *));
    child_sum1->code = &sum;       
    child_sum1->join_count = 1;      
    int *result = malloc(sizeof(int));
    *result = 1;  
    child_sum1->args[0] = cont;
    child_sum1->args[1] = result;

    return join_work(child_sum1);
}

n > 2 情況中,會遞迴的呼叫 fib(n) 任務,產生子任務 fib(n-1)fib(n-2) 放入 thread_queues 中,並且建立 fib(n) 的後續任務 child_sum2 ,等待前兩個子任務計算完畢,因此他的 join_count = 2
並且 child_sum2 的後續任務是 fib(n) 的後續任務 cont

下面分別是 child_sum2fib(n-1) , 而 fib(n-2)fib(n-1) 類似只是將 *n1 = *n - 1 改成 *n2 = *n - 2

並且可以注意到最後是回傳 NULL ,並不是因為這裡沒有後續任務,而是將後續任務存在產生的子任務中傳遞。

work_t *child_sum2 = malloc(sizeof(work_t) + 2 * sizeof(int *));
child_sum2->code = &sum;         
child_sum2->join_count = 2;      
int *result = malloc(sizeof(int));
*result=0;
child_sum2->args[0] = cont;   
child_sum2->args[1] = result;  

/* spawn fib(n-1) */
work_t *fibwork1 = malloc(sizeof(work_t) + 2 * sizeof(int *));
fibwork1->code = &fib;        
fibwork1->join_count = 0;      
int *n1 = malloc(sizeof(int));
*n1 = *n - 1;
fibwork1->args[0] = child_sum2;
fibwork1->args[1] = n1; 

int j = rand() % N_THREADS;
push(&thread_queues[j], fibwork1);

sum() 是 fib() 的後續任務,目的是將計算結果相加,首先讀取後續任務 sum_cont ,以及後續程式的變數 result ,因為要將計算結果存在後續任務的變數中,cur_result 是目前任務的結果,

work_t *sum(work_t *w)
{
    work_t *sum_cont = (work_t *) w->args[0];  // continuation task
    int *result = (int *)sum_cont->args[1];    // parameter of continuation task

    int *cur_result = (int *)w->args[1];

    if (*result == -1){
        sum_cont->args[1] = cur_result;  
        return done_task(sum_cont);
    }
    else{
        int before_result = *(int *)sum_cont->args[1];  // just for check process
        *result = *result + *cur_result ;
        printf("Did item %p => Sum: %d + %d = %d\n", w, before_result, *cur_result,  *result);
        free(cur_result); // release memory
        free(w);
        return join_work(sum_cont);
    }

}

result == -1 設為中止條件,當執行到最後的任務時,儲存最後結果,接著執行 done_task &done 設為 True 告知其他執行序任務結束。

work_t *done_task(work_t *w)
{
    printf("Done task Fib = %d\n", *(int *)w->args[1]);
    free(w->args[1]);
    free(w);
    atomic_store(&done, true);
    return NULL;
}

最後說明我覺得很關鍵的任務,

do_work 執行任務而為什麼要有迴圈,因為要接續執行後續任務,除非是 NULL 是沒有後續任務要執行的情況

join_work 是後續任務,將子任務的結果傳遞給此任務的後續任務,使用 join_count 判斷子任務是否都完成,像是在 child_sum2 就是 join_count = 2 ,那麼當只有一個數值完成時,是不滿足變數條件,因此回傳 NULL ,也就是沒有後續任務的情況。
但這時會想說這樣會不會遺失後續任務,其實是不會,因為若 join_count = 2 則說明有兩個子任務的後續任務相同,因此當第二個子任務完成時,即可執行後續任務,且子任務的變數儲存在後續任務中,因此不會有資料遺失。

work_t *join_work(work_t *work)
{
    int old_join_count = atomic_fetch_sub(&work->join_count, 1);
    if (old_join_count == 1)
        return work;
    
    return NULL;
}


/* using while loop to execute continuation task */
void do_work(int id, work_t *work)
{
    while (work){
        // printf("worker %d running item %p\n", id, work);
        work = (*(work->code)) (work);
    }
}

結果討論:
以計算 fib(20) 為例
利用 pref 觀察執行過程

perf stat --repeat 5 -e instructions,context-switches,page-faults,cpu-migrations,cpu-clock ./main 20
Metrics pthread_1 pthread_4 pthread_8 pthread_16
Instructions(M) 342 M 452 M 538 M 449 M
Context switches 57 23,047 18,932 17,893
Page fault 462 480 500 539
CPU-migrations 8 82 259 699
CPUs utilized 0.270 1.185 0.972 1.073
Time elapsed 0.222 秒 0.123 秒 0.218 秒 0.199 秒

隨著執行緒的增加 CPU-migrations 也跟著上升,然而 Page fault , Context switches 這些例外事件發生的機率也跟著上升,使的 CPUs 使用率則會受到例外事件的增加下降。
在 Instructions 的部份,我推測是因為當執行緒的數量還很少時,執行緒之間容易發生資料競爭的情況,也因此會需要花費更多的指令數爭搶資料。

不同執行緒數量下的執行緒工作佇列大小分佈
分別是 1,4,8,16 個執行緒

workqueue size 1 4 8 16
size 8 - - - -
size 16 1 - - -
size 32 - - - -
size 64 - - 2 4
size 128 - - 2 10
size 256 - 3 4 2
size 512 - 1

可以發現當執行緒只有一個時,工作佇列大小並沒有擴增大很大,這是因為也只有一個執行緒,因此不會有很多子任務產生,生成作多的子任務關係圖類似 skew binary tree 。

而當執行緒增加時,可以發現任務可以更好的分配到各個工作佇列, 因此佇列大小都有變小的趨勢。

解決於工作佇列擴增時的 race condition

先前新的任務是推送至隨機選擇的工作佇列,這會造成當佇列裝滿時,可能造成有兩個執行緒同時呼叫 push() 或是 resize() ,這會造成 race condition ,並且在多執行緒程式中呼叫 rand() 是,因為可能使用相同的隨機種子因而得到相同的隨機數字,造成資源競爭的情形。

-   j = rand() % N_THREADS;       
-   push(&thread_queues[j], fibwork2);
+   // Push into the current thread's work queue instead of pushing randomly.
+   push(&thread_queues[thread_id], fibwork2);

為取得目前是幾個執行序,在 work_t 中加入執行序號碼 thread_id 紀錄執行此任務的執行序號碼。
如此執行緒就只會將新產生任務推送至自己的工作佇列中,
從而避免 resize 時 race condition 的問題。
然而這也造成工作負載平衡更依賴於工作竊取,雖說論文中原本的方法就是只能推送至自己工作佇列,不應該使用隨機推送任務,因為這樣無法釐清是隨機推送還是工作竊取達到工作覆載的平衡。

 typedef struct work_internal {
     task_t code;
     atomic_int join_count;
+    int thread_id;  // Add field to track task ownership across threads
     void *args[];
 } work_t;

修改後發現工作佇列的大小實則變化不大,只有部份的工作佇列擴增到 2 倍,與先前的結果比較,可發現之前的程式資源競爭的情況嚴重並有 race condition 的問題。

workqueue size 1 2 4 8 16
size 8 - - 1 5 14
size 16 1 2 3 3 2

TODO: 研讀 Wikipedia: work stealing 並紀錄問題

解釋第九週測驗的手法和 Linux 核心實作的落差

Child stealing vs. continuation stealing

Child Stealing : 偷取子任務通常都是就緒的情況,因此會直接執行此子任務時,若此子任務接著產生許多子任務,這些任務都會被放入工作佇列,導致佇列過載。
適合用於子任務之間是獨立的情況,像是使用到 Divide and Conquer Algorithms 的任務,例如快速排序 (QuickSort)、合併排序 (MergeSort) 等。

Continuation Stealing : 偷取後續任務的缺點是需要記憶體的負擔,因為需要變數的傳遞,以及該後續的任務的後續任務,也都會是此處理器的任務,記憶體開銷會隨之增大。

工作竊取雖然只竊取一個工作,然而因為工作的延伸性,等於是將竊取工作相關的工作也都竊取走,以偷的取的工作為基準向上(後續任務)或是向下(子任務)偷取之後的工作。

本測驗的 work stealing 手法是採用照順序掃過每個工作佇列,這其實類似 linux 負載平衡 idle_balance() 初期所採用的搜尋手法 ,後來改採用維護一個紀錄過載 CPU 的稀疏矩陣,更快搜尋到需要負載平衡的 CPU 。
並且從 work_queue 中的指令發現,工作竊取時是禁止中斷發生的。

TODO: 撰寫 Linux kernel module 觀察 workqueue 的 work stealing 行為

搭配 ftrace (Linux CPU sched book Chapter 6) 來理解