# Linux 核心專題: 並行程式設計 ## TODO: 閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉 > 專題解說影片 : [Linux 核心專題: 並行程式設計](https://youtu.be/zfj_0PHP94g) > 根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的描述,每一個處理器都會有自己的 `ready queue`,用來保已經 ready 的 closures,每一個 closure 都會有自己的 level 對應到在 spawn tree 上從 root 一直到 closure 的 spawn 數量。 **spawn & spawn next** 當執行緒執行時,可能會 spawn 子執行緒,若現在有一個位於 level L 的執行緒要 spawn 一個子執行緒 T 則需要執行以下的步驟: 1. 先為 T 分配且初始化 closure 2. 將可用的參數複製到 closure,初始化指向缺失的參數的指標和 `join counter` 3. 將該 closure 標記為 level (L + 1) 4. 如果沒有缺少的參數,則將 closure 放到 `ready queue` 級別為 (L + 1) 的列表的頭部 一樣是產生新的 closure,`spawn` 產生的新 closure 被視為 child,但 `spawn next` 則是被視為 successor,所以如果是 `spawn next` 則是需要將 closure 標記成 level L ```graphviz digraph G { rankdir=LR; node[shape=record]; map [label="T1 |0 |17 | |6"]; node[shape=record]; map1 [label="T2 |1 |42 |1 |"]; node[shape=record]; map2 [label="T3 |0 |4 | |"]; L[label=L,shape=plaintext,fontcolor=blue] LL[label="L+1",shape=plaintext,fontcolor=blue] map:s -> map1:n [label="spawn",style=dashed] map:e -> map2:w [label="spawn next",style=dashed] L->map[color = white] LL->map1[color = white] {rank=same; L; LL;} } ``` **send argument(k, value)** 當執行緒執行該指令時,代表要傳送參數到其他的執行緒,其中 `k` 是 Cilk language 提供的一種資料型態 (continuation),用來包含指向的 closure 指標和其參數所在的偏移量,而傳送參數的流程為: 1. 利用 continuation `k` 找到對應的 closure 和參數位置 2. 將 `value` 放到對應的參數位置中並且將 `join counter` 減一 3. 檢查 `join counter` 是否為 0,若成立則代表該 closure 可以等待被執行 **操作過程 :** 首先,處理器會先檢查 `ready queue` 是否為空 * **`ready queue` 為空**: 代表目前該佇列已經沒有 closure 正在等待執行了,所以可以進行 work stealing 的操作: 1. 隨機選擇一個處理器作為 victim 2. 如果 victim 的 `ready queue` 為空則回到步驟 1 重新選擇 3. 從 victim 的 `ready queue` 中最上層非空 level 中選擇尾部的執行緒並執行 * **`ready queue` 為非空**: 執行以下步驟: 1. 移除目前佇列中最深的非空 level 中位於 head 的執行緒 2. 從 closure 提取該執行緒,並且執行 ## TODO: Quiz 9 測驗三 (work-steal) 含延伸問題 ### 說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構 原論文是設計 C-based runtime system 以執行多執行緒並行程式,因此除 work stealing 外,有額外定義其他特殊的資料結構,在本測驗中並沒有使用 **結構體:** 首先建立 `work_t` 結構體,用來將執行流程拆成多個段落,每個段落都以 `work_t` 進行描述,根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的說明,`work_t` 對應到的也就是論文中提到的資料結構 "closure" ```c typedef struct work_internal { task_t code; atomic_int join_count; void *args[]; } work_t ``` * `code`: 指標,用來指向各執行緒要啟動任務時所要執行的函式,如果準備好執行則回傳下一個執行項目,否則回傳 NULL * `join_count`: 定義為 `atomic` 的整數,用來計算這個 work 還缺少了多少 arguments 才能進行 * `args`: 即 work 執行所需要的 arguments 兩結構體用來定義 `deque` ```c typedef struct { atomic_size_t size; _Atomic work_t *buffer[]; } array_t; typedef struct { /* Assume that they never overflow */ atomic_size_t top, bottom; _Atomic(array_t *) array; } deque_t; ``` * `top/bottom`: 表示佇列中元素的有效範圍 * `array`: 為指向函式 `array_t` 的指標,用來指向整個佇列本體 - [ ] `push` 操作流程主要是先取出 `bottom` 和 `top` 對應位置,並且先判斷佇列是否為 full,若成立則需要 resize 佇列,避免覆蓋到原本的 work,若不為 full 則直接將給定的 work `w` 更新到當前 `bottom` 所指的位置。 另外要確保放入 work `w` 和更新 `bottom` 位置為 `bottom + 1` 這兩個動作是不能被重排的,否則會出現某一執行緒還沒有放入 work,而另一執行緒已處理該位置上工作的問題,因此使用 `atomic_thread_fence(memory_order_release)` 確保前面的指令不會被重排到該指令之後。 ```c atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed); atomic_thread_fence(memory_order_release); atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); ``` - [ ] `resize` 擴展佇列是為了解決當佇列已滿時,為避免覆蓋到原始 work,所以重新創造一個新佇列,根據程式碼的註解說明,原始論文中是使用 garbage collector,可以自動釋放許久未訪問到的記憶體空間,但在此例中並沒有使用 garbage collector,但也不能直接釋放 `*a`,會產生其他執行緒正使用舊的佇列到一半被釋放的問題,所以藉由擴展佇列到原本的兩倍,使 memory leakage 是指數成長,可以成功限制 memory leakage 不會無限增長。 ``` /* The question arises as to the appropriate timing for releasing memory * associated with the previous array denoted by *a. In the original Chase * and Lev paper, this task was undertaken by the garbage collector, which * presumably possessed knowledge about ongoing steal operations by other * threads that might attempt to access data within the array. * * In our context, the responsible deallocation of *a cannot occur at this * point, as another thread could potentially be in the process of reading * from it. Thus, we opt to abstain from freeing *a in this context, * resulting in memory leakage. It is worth noting that our expansion * strategy for these queues involves consistent doubling of their size; * this design choice ensures that any leaked memory remains bounded by the * memory actively employed by the functional queues. */ ``` - [ ] `take` `take` 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work,分別取出 bottom 和 top 的位置後,因為有 `atomic_thread_fence` 確保前面程式必須優於後面執行,如果等到確定好 deque 是否為空之後才進行 bottom 的更新,若此時有其他執行緒執行 `steal` 的動作時,就會發生 `take` 和 `steal` 同時進行的競爭問題,反之先更新 bottom 即使事後之後發現 deque 是空的或是 work 已經被 `steal` 掉,只要再復原 bottom 就好。 ```c work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); ``` 再來,deque 有三種情境 : * `t < b` : 可以直接取走 `b` 位置對應的 work,最後要恢復 bottom 的值(更新為 `b` + 1) * `t == b` : deque 目前只剩一個 entry,此時可能會和 `steal` 發生競爭問題,可以透過 cmpxchg 來判斷是哪種情形 : * 只有 `take` : 判斷 `q->top` 和 `t` 的位置是否相等,若相等則代表目前沒有發生 steal 的競爭問題,所以只需要直接將 `q->top` 更新成 `t + 1` 並返回 true 即可 * 發生 `steal` : `q->top` 和 `t` 的位置不相等,代表目前有其他執行緒正在 `steal`,返回 false 進入 Failed race 將 `x = EMPTY`,表示不需要 `take` 了 最後兩種情況都要恢復 bottom ```c if (t <= b) /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ x = EMPTY; atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } ``` * ` t > b`: deque 為空,需要復原 bottom 以下用圖片說明**只有 `take`** 的操作流程 : 假設目前僅剩一個 entry ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; { rank="same"; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] } top->struct1:f2 bottom-> struct1:f3; } ``` 首先要取出 bottom - 1 的位置並存在 `b`,之後先更新 bottom ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] top->struct1:f2 bottom-> struct1:f2; } ``` 之後判斷目前沒有發生 steal 的競爭問題,所以需要將 `q->top` 更新成 `t + 1`,並且恢復 bottom 的值,最後取走 `b` 位置上的 work ```graphviz digraph structs { node [shape=none]; struct1 [label=< <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR> <TD PORT="f0">A[0]</TD> <TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD> <TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD> <TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD> <TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD> <TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD> <TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD> </TR> </TABLE> >]; top[label="top",shape=plaintext] bottom[label="bottom",shape=plaintext] k[label="k",shape=plaintext, fontcolor = white] top->struct1:f3 bottom-> struct1:f3; k-> struct1:f2[color = red, dir=back]; } ``` - [ ] `steal` `atomic_thread_fence` 保證需先取 `top` 再去取得 `bottom`,防止在這兩個讀取之間有其他內存操作的指令被重排在此 ```c size_t t = atomic_load_explicit(&q->top, memory_order_acquire); atomic_thread_fence(memory_order_seq_cst); size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire); ``` 再來若是 `t < b` 則代表有 work 可以 steal,不過還是要透過 cmpxchg 去和 take 競爭,這邊的操作和 `take` 類似,若發生 `take` 則返回 false 進入 Failed race,代表 steal 失敗 ```c work_t *x = EMPTY; if (t < b) { /* Non-empty queue */ array_t *a = atomic_load_explicit(&q->array, memory_order_consume); x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed); if (!atomic_compare_exchange_strong_explicit( &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) /* Failed race */ return ABORT; } ``` - [ ] `thread` 首先,先嘗試在自己的 deque 中執行 `take`,確定是否有 work 可以執行,若沒有就會去其他執行緒的 deque 中尋找可以 steal 的 work,其中如果 `i == id` 就代表是自己 deque,必須跳過到下一個找 當 `stolen` 為 `ABORT` 時,代表當時正有其他執行緒在 `take`,所以 steal 的行為才會失敗,不代表此執行緒的 deque 不能 steal,所以這時將 `i--`,重複回到這個 deque 再嘗試一次 steal ```c /* Currently, there is no work present in my own queue */ work_t *stolen = EMPTY; for (int i = 0; i < N_THREADS; ++i) { if (i == id) continue; stolen = steal(&thread_queues[i]); if (stolen == ABORT) { i--; continue; /* Try again at the same i */ } else if (stolen == EMPTY) continue; /* Found some work to do */ break; } ``` 如果 stolen 不為空代表有 steal 到,則執行該 work,若為空也就是看了一輪都沒有發現可偷的 work,但仍不代表所有 work 都完成,所以額外建立一個 work `done_work`,當其他的 work 完成時就會將 `done_work` 的 `join_count` 減一,直到 `join_count` 歸零時則會執行 `done_task` 函式,將 `done` 設為 true,最後檢查 `done` 看是否所有 work 皆已完成 ```c work_t *done_task(work_t *w) { free(w); atomic_store(&done, true); return NULL; } ``` ### 改進原始實作 先檢查看看原始 work steal 的策略是否有確實減少執行時間,以下為使用 `clock()` 對原始 work steal 程式和未使用 work steal 的策略進行時間的測試,其中仿照 `thread` 的內容定義一個函式,每個 thread 都只完成自己 deque 的工作而不進行 steal ```c void *thread_nosteal(void *payload) { int id = *(int *)payload; deque_t *my_queue = &thread_queues[id]; while (true) { work_t *work = take(my_queue); if (work != EMPTY) { do_work(id, work); } else { if (atomic_load(&done)) break; } } // printf("Thread %d finished\n", id); return NULL; } ``` 可以看到 thread 數量由 24 到 100 個,使用 work steal 策略的表現大致上比沒有使用來的好 ![compare_100](https://hackmd.io/_uploads/S1ZtmrEHR.png) #### 記憶體釋放 > [900c826](https://github.com/chloe0919/work_steal/commit/900c826df821e0bfe3718b8469848bac65bb3101) > 我們要處理的記憶體釋放問題包括下面兩種: 1. 所有程式執行完後執行緒的 deque 沒有完全釋放乾淨 2. 在 resize 的函式中為了避免釋放舊的 deque 而導致目前有其他執行緒正在偷取工作時造成錯誤,所以當 deque 滿了而 resize 並不會釋放任何空間 首先第一種記憶體釋放可以在所有執行緒都結束工作時,再將他們釋放: ```c for (int i = 0; i < N_THREADS; ++i){ array_t *array = atomic_load_explicit(&thread_queues[i].array, memory_order_relaxed); free(array); } free(thread_queues); ``` 以 Valgrind 進行記憶體的檢查,原本在修改前會有 still reachable,根據 [Valgrind-Hackmd](https://hackmd.io/@sysprog/linux2023-lab0/%2F%40sysprog%2Flinux2023-lab0-b) 的說明,still reachable 指的是在程式結束後有未釋放卻還有指標指著的記憶體 ``` ==79134== LEAK SUMMARY: ==79134== definitely lost: 0 bytes in 0 blocks ==79134== indirectly lost: 0 bytes in 0 blocks ==79134== possibly lost: 0 bytes in 0 blocks ==79134== still reachable: 208 bytes in 3 blocks ==79134== suppressed: 0 bytes in 0 blocks ==79134== Rerun with --leak-check=full to see details of leaked memory ``` 經過上述釋放後,這些記憶體都被順利釋放成功: ``` ==80249== HEAP SUMMARY: ==80249== in use at exit: 0 bytes in 0 blocks ==80249== total heap usage: 39 allocs, 39 frees, 2,368 bytes allocated ==80249== ==80249== All heap blocks were freed -- no leaks are possible ==80249== ==80249== For lists of detected and suppressed errors, rerun with: -s ==80249== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` 再來第二種記憶體釋放問題,我們先觀察原本的 [work-stealing](https://gist.github.com/jserv/111304e7c5061b05d4d29a47571f7a98) 的 main function,可以看到這邊的實作是先將所有的工作都先放進各自執行緒的 deque 之後才執行 `pthread_create`,而且只有 push 函式會使用到 resize,所以如果要假設出**此執行緒在 resize 的過程中,恰好其他執行緒正使用舊的佇列而無法釋放導致的記憶體洩漏問題**,需要在 `thread` 函式中塞入工作使得某些執行緒在 push 時可能會有另一執行緒正在 steal 的情況 在模擬好情況後一樣使用 valgrin 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏 ``` ==81221== LEAK SUMMARY: ==81221== definitely lost: 80 bytes in 1 blocks ==81221== indirectly lost: 0 bytes in 0 blocks ==81221== possibly lost: 0 bytes in 0 blocks ==81221== still reachable: 0 bytes in 0 blocks ==81221== suppressed: 0 bytes in 0 blocks ==81221== Rerun with --leak-check=full to see details of leaked memory ``` 為了達到安全釋放 resize 後的舊佇列,在 array_t 的結構體中另外建立一個用來計數的 `ref_count`,並且在每次 resize 時都去判斷此 `ref_count` 是否為一(也就是是否只有自己使用中),若為一則代表可以安全的釋放出舊佇列的記憶體 ```c typedef struct { atomic_size_t size; atomic_size_t ref_count; _Atomic work_t *buffer[]; } array_t; ``` 另外,建立 `refcountadd` 和 `refcountsub` 兩函式來控制 `ref_count` 的值,假設有 a 執行緒要偷取 b 執行緒的工作時,就將 b 執行緒的 `ref_count` 的值加一,若當時執行緒 b 也正在執行 resize 就會發現 `ref_count` 不為一,所以也就無法立即釋放舊佇列 ```c void refcountadd(array_t *a) { if (a != NULL) { atomic_fetch_add(&a->ref_count, 1); } } void refcountsub(array_t *a) { if (a != NULL) { atomic_fetch_sub(&a->ref_count, 1); } } ``` 經過調整後可以成功安全釋放記憶體: ``` ==82789== ==82789== HEAP SUMMARY: ==82789== in use at exit: 0 bytes in 0 blocks ==82789== total heap usage: 44 allocs, 44 frees, 2,584 bytes allocated ==82789== ==82789== All heap blocks were freed -- no leaks are possible ==82789== ==82789== For lists of detected and suppressed errors, rerun with: -s ==82789== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` #### steal 的競爭問題 > [7b189a3](https://github.com/chloe0919/work_steal/commit/7b189a33c5a776b8f85a20aab4082829d967d35a) > 在原始的 steal 的實作中是使用 for 迴圈從第一個執行緒開始嘗試偷竊工作,這樣會容易導致多個執行緒都同時讀取同一個 deque 所產生的競爭問題,所以這邊可以改進成不同執行緒從隨機數開始依序的偷取,減少執行緒之間競爭同一個 deque 的問題 接下來測試執行緒數量由 24 到 1024 個,有無使用隨機數之間的速度,可以看到大部分速度都有提昇 ![random_seq](https://hackmd.io/_uploads/SkqSmGsLR.png) ## TODO: 實作「平行化」的資料排序程式 參考至 [RinHizakura](https://github.com/RinHizakura/linux-summer-2023/tree/main/hw3/work-steal-qsort) 將快速排序融合進 work steal 並行結構中,但是實作過程中會出現 segmentation fault,並使用 GDB 除錯後發現是 `do_work` 操作中存取了無效的記憶體空間,但目前仍改不掉這個 bug。