# Linux 核心專題: 並行工作佇列的設計和應用 > 執行人: chloe0919 > [專題解說影片](https://www.youtube.com/watch?v=zfj_0PHP94g) ### Reviewed by `yy214123` 考慮到佇列僅剩最後一個元素的情況,我想詢問在 `take` 及 `steal` 操作中, `q->top` 及 `t` 這兩個變數的變化與 `cmpxchg` 的運作機制是如何搭配的? ```c if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; ``` > [name=Chloexyw] 以 `take` 為例,在佇列僅剩一個元素時會利用 `cmpxchg` 對 `q->top` 和 `t` 的位置進行判斷 > 1. 如果此時這兩個位置**相同**,則代表沒有其他執行緒在 steal,所以 `cmpxchg` 會將 `&q->top` 更新成 `t + 1` 並且回傳 true > 2. 這兩個位置**不相同**,代表有其他執行緒在 steal 所以改變了 `&q->top` 的值,導致此值和 `take` 剛開始前索取出保存在 `t` 的不同,所以此時 `cmpxchg` 則不會更新 `&q->top` (已經被提早更新了),並且回傳 false 進入 if 判斷中將 x 設為 EMPTY,代表 `take` 失敗 ### Reviewed by `youjiaw` 在 [steal 的競爭問題](https://hackmd.io/XAECMfsbRSeDUbBZzRdC2A?both#steal-%E7%9A%84%E7%AB%B6%E7%88%AD%E5%95%8F%E9%A1%8C),我認為用隨機數減少執行緒之間的競爭是很好的方式,但是我很好奇測試結果中,少數不使用隨機數的執行時間反而比較少的情況,是由什麼原因造成? ### Reviewed by `kkkkk1109` > 在模擬好情況後一樣使用 ~~valgrin~~ Valgrind 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏 這裡有錯字 ## 任務簡介 延伸第九週測驗三 (work-steal),搭配閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉,說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構,最終應用於平行化的資料排序實作中。 ## TODO: 重作第九週測驗三及延伸問題 > 不用抄寫參考題解,專注於 work-steal 程式碼本身。應進行所有指定的延伸問題。 **結構體:** 首先建立 `work_t` 結構體,用來將執行流程拆成多個段落,每個段落都以 `work_t` 進行描述,根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的說明,`work_t` 對應到的也就是論文中提到的資料結構 "closure" ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t ``` * `code`: 指標,用來指向各執行緒要啟動任務時所要執行的函式,如果準備好執行則回傳下一個執行項目,否則回傳 NULL * `join_count`: 定義為 `atomic` 的整數,用來計算這個 work 還缺少了多少 arguments 才能進行 * `args`: 即 work 執行所需要的 arguments 兩結構體用來定義 `deque` ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` * `top/bottom`: 表示佇列中元素的有效範圍 * `array`: 為指向函式 `array_t` 的指標,用來指向整個佇列本體 - [ ] `push` 操作流程主要是先取出 `bottom` 和 `top` 對應位置,並且先判斷佇列是否為 full,若成立則需要 resize 佇列,避免覆蓋到原本的 work,若不為 full 則直接將給定的 work `w` 更新到當前 `bottom` 所指的位置。 另外要確保放入 work `w` 和更新 `bottom` 位置為 `bottom + 1` 這兩個動作是不能被重排的,否則會出現某一執行緒還沒有放入 work,而另一執行緒已處理該位置上工作的問題,因此使用 `atomic_thread_fence(memory_order_release)` 確保前面的指令不會被重排到該指令之後。 ```c atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); atomic_thread_fence(memory_order_release); atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); ``` - [ ] `resize` 擴展佇列是為了解決當佇列已滿時,為避免覆蓋到原始 work,所以重新創造一個新佇列,根據程式碼的註解說明,原始論文中是使用 garbage collector,可以自動釋放許久未訪問到的記憶體空間,但在此例中並沒有使用 garbage collector,但也不能直接釋放 `*a`,會產生其他執行緒正使用舊的佇列到一半被釋放的問題,所以藉由擴展佇列到原本的兩倍,使 memory leakage 是指數成長,可以成功限制 memory leakage 不會無限增長。 ``` /* The question arises as to the appropriate timing for releasing memory * associated with the previous array denoted by *a. In the original Chase * and Lev paper, this task was undertaken by the garbage collector, which * presumably possessed knowledge about ongoing steal operations by other * threads that might attempt to access data within the array. * * In our context, the responsible deallocation of *a cannot occur at this * point, as another thread could potentially be in the process of reading * from it. Thus, we opt to abstain from freeing *a in this context, * resulting in memory leakage. It is worth noting that our expansion * strategy for these queues involves consistent doubling of their size; * this design choice ensures that any leaked memory remains bounded by the * memory actively employed by the functional queues. */ ``` - [ ] `take` `take` 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work,分別取出 bottom 和 top 的位置後,因為有 `atomic_thread_fence` 確保前面程式必須優於後面執行,如果等到確定好 deque 是否為空之後才進行 bottom 的更新,若此時有其他執行緒執行 `steal` 的動作時,就會發生 `take` 和 `steal` 同時進行的競爭問題,反之先更新 bottom 即使事後之後發現 deque 是空的或是 work 已經被 `steal` 掉,只要再復原 bottom 就好。 ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); ``` 再來,deque 有三種情境 : * `t < b` : 可以直接取走 `b` 位置對應的 work,最後要恢復 bottom 的值(更新為 `b` + 1) * `t == b` : deque 目前只剩一個 entry,此時可能會和 `steal` 發生競爭問題,可以透過 cmpxchg 來判斷是哪種情形 : * 只有 `take` : 判斷 `q->top` 和 `t` 的位置是否相等,若相等則代表目前沒有發生 steal 的競爭問題,所以只需要直接將 `q->top` 更新成 `t + 1` 並返回 true 即可 * 發生 `steal` : `q->top` 和 `t` 的位置不相等,代表目前有其他執行緒正在 `steal`,返回 false 進入 Failed race 將 `x = EMPTY`,表示不需要 `take` 了 最後兩種情況都要恢復 bottom ```c if (t <= b) { /* Non-empty queue */ x = (work_t *)atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } ``` * ` t > b`: deque 為空,需要復原 bottom 以下用圖片說明**只有 `take`** 的操作流程 : 假設目前僅剩一個 entry ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; { rank="same"; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] } top->struct1:f2 bottom-> struct1:f3; } ``` 首先要取出 bottom - 1 的位置並存在 `b`,之後先更新 bottom ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] top->struct1:f2 bottom-> struct1:f2; } ``` 之後判斷目前沒有發生 steal 的競爭問題,所以需要將 `q->top` 更新成 `t + 1`,並且恢復 bottom 的值,最後取走 `b` 位置上的 work ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] k[label="k",shape=plaintext, fontcolor = white] top->struct1:f3 bottom-> struct1:f3; k-> struct1:f2[color = red, dir=back]; } ``` - [ ] `steal` :::danger 注意用語,務必使用本課程教材規範的術語。 ::: `atomic_thread_fence` 保證需先取 `top` 再去取得 `bottom`,防止在這兩個讀取之間有其他<s>內存</s> 記憶體操作的指令被重排在此 ```c size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); ``` 再來若是 `t < b` 則代表有 work 可以 steal,不過還是要透過 cmpxchg 去和 take 競爭,這邊的操作和 `take` 類似,若發生 `take` 則返回 false 進入 Failed race,代表 steal 失敗 ```c work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = (work_t *)atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) { /* Failed race */ return ABORT; } ``` - [ ] `thread` 首先,先嘗試在自己的 deque 中執行 `take`,確定是否有 work 可以執行,若沒有就會去其他執行緒的 deque 中尋找可以 steal 的 work,其中如果 `i == id` 就代表是自己 deque,必須跳過到下一個找 當 `stolen` 為 `ABORT` 時,代表當時正有其他執行緒在 `take`,所以 steal 的行為才會失敗,不代表此執行緒的 deque 不能 steal,所以這時將 `i--`,重複回到這個 deque 再嘗試一次 steal ```c /* Currently, there is no work present in my own queue */ work_t *stolen = EMPTY; for (int i = 0; i < N_THREADS; ++i) { if (i == id) continue; stolen = steal(&thread_queues[i]); if (stolen == ABORT) { i--; continue; /* Try again at the same i */ } else if (stolen == EMPTY) continue; /* Found some work to do */ break; } ``` 如果 stolen 不為空代表有 steal 到,則執行該 work,若為空也就是看了一輪都沒有發現可偷的 work,但仍不代表所有 work 都完成,所以額外建立一個 work `done_work`,當其他的 work 完成時就會將 `done_work` 的 `join_count` 減一,直到 `join_count` 歸零時則會執行 `done_task` 函式,將 `done` 設為 true,最後檢查 `done` 看是否所有 work 皆已完成 ```c work_t *done_task(work_t *w) { free(w); atomic_store(&done, true); return NULL; } ``` ### 改進原始實作 先檢查看看原始 work steal 的策略是否有確實減少執行時間,以下為使用 `clock()` 對原始 work steal 程式和未使用 work steal 的策略進行時間的測試,其中仿照 `thread` 的內容定義一個函式,每個 thread 都只完成自己 deque 的工作而不進行 steal :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 >[name=Chloexyw] <s>好的,已修改</s> >不!你沒改,注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。 ::: ```c void *thread_nosteal(void *payload) { int id = *(int *)payload; deque_t *my_queue = &thread_queues[id]; while (true) { work_t *work = take(my_queue); if (work != EMPTY) { do_work(id, work); } else { if (atomic_load(&done)) break; } } // printf("Thread %d finished\n", id); return NULL; } ``` 可以看到 thread 數量由 24 到 100 個,使用 work steal 策略的表現大致上比沒有使用來的好 ![compare_100](https://hackmd.io/_uploads/S1ZtmrEHR.png) #### 記憶體釋放 > [900c826](https://github.com/chloe0919/work_steal/commit/900c826df821e0bfe3718b8469848bac65bb3101) 我們要處理的記憶體釋放問題包括下面兩種: 1. 所有程式執行完後執行緒的 deque 沒有完全釋放乾淨 2. 在 resize 的函式中為了避免釋放舊的 deque 而導致目前有其他執行緒正在偷取工作時造成錯誤,所以當 deque 滿了而 resize 並不會釋放任何空間 首先第一種記憶體釋放可以在所有執行緒都結束工作時,再將他們釋放: :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 >[name=Chloexyw] <s>好的,已修改</s> > 不!你沒有!筆記列出的程式碼都要嚴格依循規範,注意細節。 ::: ```c for (int i = 0; i < N_THREADS; ++i){ array_t *array = atomic_load_explicit(&thread_queues[i].array, memory_order_relaxed); free(array); } free(thread_queues); ``` 以 Valgrind 進行記憶體的檢查,原本在修改前會有 still reachable,根據 [第一次作業關於 Valgrind 的解說](https://hackmd.io/@sysprog/linux2023-lab0/%2F%40sysprog%2Flinux2023-lab0-b),still reachable 指的是在程式結束後有未釋放卻還有指標指著的記憶體 ``` ==79134== LEAK SUMMARY: ==79134== definitely lost: 0 bytes in 0 blocks ==79134== indirectly lost: 0 bytes in 0 blocks ==79134== possibly lost: 0 bytes in 0 blocks ==79134== still reachable: 208 bytes in 3 blocks ==79134== suppressed: 0 bytes in 0 blocks ==79134== Rerun with --leak-check=full to see details of leaked memory ``` 經過上述釋放後,這些記憶體都被順利釋放成功: ``` ==80249== HEAP SUMMARY: ==80249== in use at exit: 0 bytes in 0 blocks ==80249== total heap usage: 39 allocs, 39 frees, 2,368 bytes allocated ==80249== ==80249== All heap blocks were freed -- no leaks are possible ==80249== ==80249== For lists of detected and suppressed errors, rerun with: -s ==80249== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` :::danger 改進漢語表達。 ::: 再來第二種記憶體釋放問題,我們先觀察原本的 [work-stealing](https://gist.github.com/jserv/111304e7c5061b05d4d29a47571f7a98) 的 main function,可以看到這邊的實作是先將所有的工作都先放進各自執行緒的 deque 之後才執行 `pthread_create`,而且只有 push 函式會使用到 resize,所以這邊實作的情況是不會發生有執行緒在 resize 時,恰好另一執行緒正在 steal 的問題,所以我在 `thread` 函式中塞入工作模擬出某些執行緒在 push 時可能會有另一執行緒正在 steal 的情況。 在模擬好情況後一樣使用 valgrin 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏 ``` ==81221== LEAK SUMMARY: ==81221== definitely lost: 80 bytes in 1 blocks ==81221== indirectly lost: 0 bytes in 0 blocks ==81221== possibly lost: 0 bytes in 0 blocks ==81221== still reachable: 0 bytes in 0 blocks ==81221== suppressed: 0 bytes in 0 blocks ==81221== Rerun with --leak-check=full to see details of leaked memory ``` 為了達到安全釋放 resize 後的舊佇列,在 array_t 的結構體中另外建立一個用來計數的 `ref_count`,並且在每次 resize 時都去判斷此 `ref_count` 是否為一(也就是是否只有自己使用中),若為一則代表可以安全的釋放出舊佇列的記憶體 ```c typedef struct { atomic_size_t size; atomic_size_t ref_count; _Atomic work_t *buffer[]; } array_t; ``` 另外,建立 `refcountadd` 和 `refcountsub` 函式來控制 `ref_count` 的值,假設有 a 執行緒要偷取 b 執行緒的工作時,就將 b 執行緒的 `ref_count` 的值加一,若當時執行緒 b 也正在執行 resize 就會發現 `ref_count` 不為一,所以也就無法立即釋放舊佇列 :::danger 注意書寫規範: * 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致 * 注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。 ::: ```c void refcountadd(array_t *a) { if (a != NULL) { atomic_fetch_add(&a->ref_count, 1); } } void refcountsub(array_t *a) { if (a != NULL) { atomic_fetch_sub(&a->ref_count, 1); } } ``` 經過調整後可以成功安全釋放記憶體: ``` ==82789== ==82789== HEAP SUMMARY: ==82789== in use at exit: 0 bytes in 0 blocks ==82789== total heap usage: 44 allocs, 44 frees, 2,584 bytes allocated ==82789== ==82789== All heap blocks were freed -- no leaks are possible ==82789== ==82789== For lists of detected and suppressed errors, rerun with: -s ==82789== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` #### steal 的競爭問題 > [7b189a3](https://github.com/chloe0919/work_steal/commit/7b189a33c5a776b8f85a20aab4082829d967d35a) 在原始的 steal 的實作中是使用 for 迴圈從第一個執行緒開始嘗試偷竊工作,這樣會容易導致多個執行緒都同時讀取同一個 deque 所產生的競爭問題,所以這邊可以改進成不同執行緒從隨機數開始依序的偷取,減少執行緒之間競爭同一個 deque 的問題。 接下來測試執行緒數量由 24 到 1024 個,有無使用隨機數之間的速度,可以看到大部分速度都有提昇 ![random_seq](https://hackmd.io/_uploads/SkqSmGsLR.png) ## TODO: 閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉並落實 > 摘錄論文關鍵論述、闡述給定測驗題的程式碼和論文的落差、說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構 根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的描述,含有執行程式碼與所需的參數的稱為 closure,每一個處理器都會有自己的 `ready queue`,用來保存已經準備好的 closures,每一個 closure 都會有對應的 level,計算的方法是從 spawn tree 的根節點一直到自己本身所經過的 closure 數量。 ### 論文中相關操作 **spawn & spawn next** 當執行緒執行時,可能會 spawn 子執行緒,若現在有一個位於 level L 的執行緒要 spawn 一個子執行緒 T 則需要執行以下的步驟: 1. 先為 T 分配且初始化 closure 2. 將可用的參數複製到 closure,初始化指向缺失的參數的指標和 `join counter` 3. 將該 closure 標記為 level (L + 1) 4. 如果沒有缺少的參數,則將 closure 放到 `ready queue` 級別為 (L + 1) 的列表的頭部 一樣是產生新的 closure,`spawn` 產生的新 closure 被視為 child,但 `spawn next` 則是被視為 successor,所以如果是 `spawn next` 則是需要將 closure 標記成 level L ![46D90929-62BF-4913-B036-CA6CB94A83BD](https://hackmd.io/_uploads/B1ov7wzwA.jpg) **send argument(k, value)** 當執行緒執行該指令時,代表要傳送參數到其他的執行緒,其中 `k` 是 Cilk language 提供的一種資料型態 (continuation),用來包含指向的 closure 指標和其參數所在的偏移量,而傳送參數的流程為: 1. 利用 continuation `k` 找到對應的 closure 和參數位置 2. 將 `value` 放到對應的參數位置中並且將 `join counter` 減一 3. 檢查 `join counter` 是否為 0,若成立則代表該 closure 可以等待被執行 work stealing 操作過程 : 首先,處理器會先檢查 `ready queue` 是否為空 - [ ] `ready queue` 為空: 代表目前該佇列已經沒有 closure 正在等待執行了,所以可以進行 work stealing 的操作: 1. 隨機選擇一個處理器作為 victim 2. 如果 victim 的 `ready queue` 為空則回到步驟 1 重新選擇 3. 從 victim 的 `ready queue` 中最上層非空 level 中選擇尾部的執行緒並執行 - [ ] `ready queue` 為非空: 執行以下步驟: 1. 移除目前佇列中最深的非空 level 中位於 head 的執行緒 2. 從 closure 提取該執行緒,並且執行 ### 闡述給定測驗題的程式碼和論文的落差 原論文是設計 C-based runtime system 以執行多執行緒並行程式,因此除 work stealing 外,有額外定義其他特殊的資料結構,在本測驗中並沒有使用,不過在測驗中有嘗試實作出論文中提到的 closure,並且以結構體 `wort_t` 進行定義,並使用 `atomic_int` 紀錄參數缺少的數量,若此數為 0 代表該 closure 已經準備好執行。 :::danger 留意 "Modeling performance" 和 "A theoretical analysis of the Cilk scheduler",描述本實作的性能表現。 ::: ## TODO: 實作「平行化」的資料排序程式 > 將上述 work-steal 程式碼應用於「平行化」的資料排序程式的實作,應確保程式碼的正確性和效率。