# Linux 核心專題: 並行工作佇列的設計和應用
> 執行人: chloe0919
> [專題解說影片](https://www.youtube.com/watch?v=zfj_0PHP94g)
### Reviewed by `yy214123`
考慮到佇列僅剩最後一個元素的情況,我想詢問在 `take` 及 `steal` 操作中, `q->top` 及 `t` 這兩個變數的變化與 `cmpxchg` 的運作機制是如何搭配的?
```c
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
/* Failed race */
x = EMPTY;
```
> [name=Chloexyw] 以 `take` 為例,在佇列僅剩一個元素時會利用 `cmpxchg` 對 `q->top` 和 `t` 的位置進行判斷
> 1. 如果此時這兩個位置**相同**,則代表沒有其他執行緒在 steal,所以 `cmpxchg` 會將 `&q->top` 更新成 `t + 1` 並且回傳 true
> 2. 這兩個位置**不相同**,代表有其他執行緒在 steal 所以改變了 `&q->top` 的值,導致此值和 `take` 剛開始前索取出保存在 `t` 的不同,所以此時 `cmpxchg` 則不會更新 `&q->top` (已經被提早更新了),並且回傳 false 進入 if 判斷中將 x 設為 EMPTY,代表 `take` 失敗
### Reviewed by `youjiaw`
在 [steal 的競爭問題](https://hackmd.io/XAECMfsbRSeDUbBZzRdC2A?both#steal-%E7%9A%84%E7%AB%B6%E7%88%AD%E5%95%8F%E9%A1%8C),我認為用隨機數減少執行緒之間的競爭是很好的方式,但是我很好奇測試結果中,少數不使用隨機數的執行時間反而比較少的情況,是由什麼原因造成?
### Reviewed by `kkkkk1109`
> 在模擬好情況後一樣使用 ~~valgrin~~ Valgrind 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏
這裡有錯字
## 任務簡介
延伸第九週測驗三 (work-steal),搭配閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉,說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構,最終應用於平行化的資料排序實作中。
## TODO: 重作第九週測驗三及延伸問題
> 不用抄寫參考題解,專注於 work-steal 程式碼本身。應進行所有指定的延伸問題。
**結構體:**
首先建立 `work_t` 結構體,用來將執行流程拆成多個段落,每個段落都以 `work_t` 進行描述,根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的說明,`work_t` 對應到的也就是論文中提到的資料結構 "closure"
```c
typedef struct work_internal {
task_t code;
atomic_int join_count;
void *args[];
} work_t
```
* `code`: 指標,用來指向各執行緒要啟動任務時所要執行的函式,如果準備好執行則回傳下一個執行項目,否則回傳 NULL
* `join_count`: 定義為 `atomic` 的整數,用來計算這個 work 還缺少了多少 arguments 才能進行
* `args`: 即 work 執行所需要的 arguments
兩結構體用來定義 `deque`
```c
typedef struct {
atomic_size_t size;
_Atomic work_t *buffer[];
} array_t;
typedef struct {
/* Assume that they never overflow */
atomic_size_t top, bottom;
_Atomic(array_t *) array;
} deque_t;
```
* `top/bottom`: 表示佇列中元素的有效範圍
* `array`: 為指向函式 `array_t` 的指標,用來指向整個佇列本體
- [ ] `push`
操作流程主要是先取出 `bottom` 和 `top` 對應位置,並且先判斷佇列是否為 full,若成立則需要 resize 佇列,避免覆蓋到原本的 work,若不為 full 則直接將給定的 work `w` 更新到當前 `bottom` 所指的位置。
另外要確保放入 work `w` 和更新 `bottom` 位置為 `bottom + 1` 這兩個動作是不能被重排的,否則會出現某一執行緒還沒有放入 work,而另一執行緒已處理該位置上工作的問題,因此使用 `atomic_thread_fence(memory_order_release)` 確保前面的指令不會被重排到該指令之後。
```c
atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
```
- [ ] `resize`
擴展佇列是為了解決當佇列已滿時,為避免覆蓋到原始 work,所以重新創造一個新佇列,根據程式碼的註解說明,原始論文中是使用 garbage collector,可以自動釋放許久未訪問到的記憶體空間,但在此例中並沒有使用 garbage collector,但也不能直接釋放 `*a`,會產生其他執行緒正使用舊的佇列到一半被釋放的問題,所以藉由擴展佇列到原本的兩倍,使 memory leakage 是指數成長,可以成功限制 memory leakage 不會無限增長。
```
/* The question arises as to the appropriate timing for releasing memory
* associated with the previous array denoted by *a. In the original Chase
* and Lev paper, this task was undertaken by the garbage collector, which
* presumably possessed knowledge about ongoing steal operations by other
* threads that might attempt to access data within the array.
*
* In our context, the responsible deallocation of *a cannot occur at this
* point, as another thread could potentially be in the process of reading
* from it. Thus, we opt to abstain from freeing *a in this context,
* resulting in memory leakage. It is worth noting that our expansion
* strategy for these queues involves consistent doubling of their size;
* this design choice ensures that any leaked memory remains bounded by the
* memory actively employed by the functional queues.
*/
```
- [ ] `take`
`take` 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work,分別取出 bottom 和 top 的位置後,因為有 `atomic_thread_fence` 確保前面程式必須優於後面執行,如果等到確定好 deque 是否為空之後才進行 bottom 的更新,若此時有其他執行緒執行 `steal` 的動作時,就會發生 `take` 和 `steal` 同時進行的競爭問題,反之先更新 bottom 即使事後之後發現 deque 是空的或是 work 已經被 `steal` 掉,只要再復原 bottom 就好。
```c
work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
```
再來,deque 有三種情境 :
* `t < b` : 可以直接取走 `b` 位置對應的 work,最後要恢復 bottom 的值(更新為 `b` + 1)
* `t == b` : deque 目前只剩一個 entry,此時可能會和 `steal` 發生競爭問題,可以透過 cmpxchg 來判斷是哪種情形 :
* 只有 `take` : 判斷 `q->top` 和 `t` 的位置是否相等,若相等則代表目前沒有發生 steal 的競爭問題,所以只需要直接將 `q->top` 更新成 `t + 1` 並返回 true 即可
* 發生 `steal` : `q->top` 和 `t` 的位置不相等,代表目前有其他執行緒正在 `steal`,返回 false 進入 Failed race 將 `x = EMPTY`,表示不需要 `take` 了
最後兩種情況都要恢復 bottom
```c
if (t <= b) {
/* Non-empty queue */
x = (work_t *)atomic_load_explicit(&a->buffer[b % a->size],
memory_order_relaxed);
if (t == b) {
/* Single last element in queue */
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
/* Failed race */
x = EMPTY;
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
```
* ` t > b`: deque 為空,需要復原 bottom
以下用圖片說明**只有 `take`** 的操作流程 :
假設目前僅剩一個 entry
```graphviz
digraph structs {
node [shape=none];
struct1 [label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR>
<TD PORT="f0">A[0]</TD>
<TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD>
<TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD>
<TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD>
<TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD>
<TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD>
<TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD>
</TR>
</TABLE>
>];
{
rank="same";
top[label="top",shape=plaintext]
bottom[label="bottom",shape=plaintext]
}
top->struct1:f2
bottom-> struct1:f3;
}
```
首先要取出 bottom - 1 的位置並存在 `b`,之後先更新 bottom
```graphviz
digraph structs {
node [shape=none];
struct1 [label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR>
<TD PORT="f0">A[0]</TD>
<TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD>
<TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD>
<TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD>
<TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD>
<TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD>
<TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD>
</TR>
</TABLE>
>];
top[label="top",shape=plaintext]
bottom[label="bottom",shape=plaintext]
top->struct1:f2
bottom-> struct1:f2;
}
```
之後判斷目前沒有發生 steal 的競爭問題,所以需要將 `q->top` 更新成 `t + 1`,並且恢復 bottom 的值,最後取走 `b` 位置上的 work
```graphviz
digraph structs {
node [shape=none];
struct1 [label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR>
<TD PORT="f0">A[0]</TD>
<TD PORT="f1" WIDTH="50" HEIGHT="30" >A[1]</TD>
<TD BGCOLOR="lightblue" PORT="f2" WIDTH="50" HEIGHT="30">A[2]</TD>
<TD PORT="f3" WIDTH="50" HEIGHT="30">A[3]</TD>
<TD PORT="f4" WIDTH="50" HEIGHT="30">...</TD>
<TD PORT="f5" WIDTH="50" HEIGHT="30">A[n-1]</TD>
<TD PORT="f6" WIDTH="50" HEIGHT="30">A[n]</TD>
</TR>
</TABLE>
>];
top[label="top",shape=plaintext]
bottom[label="bottom",shape=plaintext]
k[label="k",shape=plaintext, fontcolor = white]
top->struct1:f3
bottom-> struct1:f3;
k-> struct1:f2[color = red, dir=back];
}
```
- [ ] `steal`
:::danger
注意用語,務必使用本課程教材規範的術語。
:::
`atomic_thread_fence` 保證需先取 `top` 再去取得 `bottom`,防止在這兩個讀取之間有其他<s>內存</s> 記憶體操作的指令被重排在此
```c
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
atomic_thread_fence(memory_order_seq_cst);
size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
```
再來若是 `t < b` 則代表有 work 可以 steal,不過還是要透過 cmpxchg 去和 take 競爭,這邊的操作和 `take` 類似,若發生 `take` 則返回 false 進入 Failed race,代表 steal 失敗
```c
work_t *x = EMPTY;
if (t < b) {
/* Non-empty queue */
array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
x = (work_t *)atomic_load_explicit(&a->buffer[t % a->size],
memory_order_relaxed);
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) {
/* Failed race */
return ABORT;
}
```
- [ ] `thread`
首先,先嘗試在自己的 deque 中執行 `take`,確定是否有 work 可以執行,若沒有就會去其他執行緒的 deque 中尋找可以 steal 的 work,其中如果 `i == id` 就代表是自己 deque,必須跳過到下一個找
當 `stolen` 為 `ABORT` 時,代表當時正有其他執行緒在 `take`,所以 steal 的行為才會失敗,不代表此執行緒的 deque 不能 steal,所以這時將 `i--`,重複回到這個 deque 再嘗試一次 steal
```c
/* Currently, there is no work present in my own queue */
work_t *stolen = EMPTY;
for (int i = 0; i < N_THREADS; ++i) {
if (i == id)
continue;
stolen = steal(&thread_queues[i]);
if (stolen == ABORT) {
i--;
continue; /* Try again at the same i */
} else if (stolen == EMPTY)
continue;
/* Found some work to do */
break;
}
```
如果 stolen 不為空代表有 steal 到,則執行該 work,若為空也就是看了一輪都沒有發現可偷的 work,但仍不代表所有 work 都完成,所以額外建立一個 work `done_work`,當其他的 work 完成時就會將 `done_work` 的 `join_count` 減一,直到 `join_count` 歸零時則會執行 `done_task` 函式,將 `done` 設為 true,最後檢查 `done` 看是否所有 work 皆已完成
```c
work_t *done_task(work_t *w)
{
free(w);
atomic_store(&done, true);
return NULL;
}
```
### 改進原始實作
先檢查看看原始 work steal 的策略是否有確實減少執行時間,以下為使用 `clock()` 對原始 work steal 程式和未使用 work steal 的策略進行時間的測試,其中仿照 `thread` 的內容定義一個函式,每個 thread 都只完成自己 deque 的工作而不進行 steal
:::danger
注意書寫規範:
* 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
>[name=Chloexyw] <s>好的,已修改</s>
>不!你沒改,注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。
:::
```c
void *thread_nosteal(void *payload) {
int id = *(int *)payload;
deque_t *my_queue = &thread_queues[id];
while (true) {
work_t *work = take(my_queue);
if (work != EMPTY) {
do_work(id, work);
} else {
if (atomic_load(&done))
break;
}
}
// printf("Thread %d finished\n", id);
return NULL;
}
```
可以看到 thread 數量由 24 到 100 個,使用 work steal 策略的表現大致上比沒有使用來的好
![compare_100](https://hackmd.io/_uploads/S1ZtmrEHR.png)
#### 記憶體釋放
> [900c826](https://github.com/chloe0919/work_steal/commit/900c826df821e0bfe3718b8469848bac65bb3101)
我們要處理的記憶體釋放問題包括下面兩種:
1. 所有程式執行完後執行緒的 deque 沒有完全釋放乾淨
2. 在 resize 的函式中為了避免釋放舊的 deque 而導致目前有其他執行緒正在偷取工作時造成錯誤,所以當 deque 滿了而 resize 並不會釋放任何空間
首先第一種記憶體釋放可以在所有執行緒都結束工作時,再將他們釋放:
:::danger
注意書寫規範:
* 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
>[name=Chloexyw] <s>好的,已修改</s>
> 不!你沒有!筆記列出的程式碼都要嚴格依循規範,注意細節。
:::
```c
for (int i = 0; i < N_THREADS; ++i){
array_t *array = atomic_load_explicit(&thread_queues[i].array, memory_order_relaxed);
free(array);
}
free(thread_queues);
```
以 Valgrind 進行記憶體的檢查,原本在修改前會有 still reachable,根據 [第一次作業關於 Valgrind 的解說](https://hackmd.io/@sysprog/linux2023-lab0/%2F%40sysprog%2Flinux2023-lab0-b),still reachable 指的是在程式結束後有未釋放卻還有指標指著的記憶體
```
==79134== LEAK SUMMARY:
==79134== definitely lost: 0 bytes in 0 blocks
==79134== indirectly lost: 0 bytes in 0 blocks
==79134== possibly lost: 0 bytes in 0 blocks
==79134== still reachable: 208 bytes in 3 blocks
==79134== suppressed: 0 bytes in 0 blocks
==79134== Rerun with --leak-check=full to see details of leaked memory
```
經過上述釋放後,這些記憶體都被順利釋放成功:
```
==80249== HEAP SUMMARY:
==80249== in use at exit: 0 bytes in 0 blocks
==80249== total heap usage: 39 allocs, 39 frees, 2,368 bytes allocated
==80249==
==80249== All heap blocks were freed -- no leaks are possible
==80249==
==80249== For lists of detected and suppressed errors, rerun with: -s
==80249== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
:::danger
改進漢語表達。
:::
再來第二種記憶體釋放問題,我們先觀察原本的 [work-stealing](https://gist.github.com/jserv/111304e7c5061b05d4d29a47571f7a98) 的 main function,可以看到這邊的實作是先將所有的工作都先放進各自執行緒的 deque 之後才執行 `pthread_create`,而且只有 push 函式會使用到 resize,所以這邊實作的情況是不會發生有執行緒在 resize 時,恰好另一執行緒正在 steal 的問題,所以我在 `thread` 函式中塞入工作模擬出某些執行緒在 push 時可能會有另一執行緒正在 steal 的情況。
在模擬好情況後一樣使用 valgrin 檢查看看,可以看到這邊會顯示出 definitely lost,也就是真的記憶體洩漏
```
==81221== LEAK SUMMARY:
==81221== definitely lost: 80 bytes in 1 blocks
==81221== indirectly lost: 0 bytes in 0 blocks
==81221== possibly lost: 0 bytes in 0 blocks
==81221== still reachable: 0 bytes in 0 blocks
==81221== suppressed: 0 bytes in 0 blocks
==81221== Rerun with --leak-check=full to see details of leaked memory
```
為了達到安全釋放 resize 後的舊佇列,在 array_t 的結構體中另外建立一個用來計數的 `ref_count`,並且在每次 resize 時都去判斷此 `ref_count` 是否為一(也就是是否只有自己使用中),若為一則代表可以安全的釋放出舊佇列的記憶體
```c
typedef struct {
atomic_size_t size;
atomic_size_t ref_count;
_Atomic work_t *buffer[];
} array_t;
```
另外,建立 `refcountadd` 和 `refcountsub` 函式來控制 `ref_count` 的值,假設有 a 執行緒要偷取 b 執行緒的工作時,就將 b 執行緒的 `ref_count` 的值加一,若當時執行緒 b 也正在執行 resize 就會發現 `ref_count` 不為一,所以也就無法立即釋放舊佇列
:::danger
注意書寫規範:
* 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
* 注意看[程式碼規範](https://hackmd.io/@sysprog/linux2024-lab0-a)的「clang-format 工具和一致的程式撰寫風格」,筆記列出的程式碼依循指定的風格。
:::
```c
void refcountadd(array_t *a) {
if (a != NULL) {
atomic_fetch_add(&a->ref_count, 1);
}
}
void refcountsub(array_t *a) {
if (a != NULL) {
atomic_fetch_sub(&a->ref_count, 1);
}
}
```
經過調整後可以成功安全釋放記憶體:
```
==82789==
==82789== HEAP SUMMARY:
==82789== in use at exit: 0 bytes in 0 blocks
==82789== total heap usage: 44 allocs, 44 frees, 2,584 bytes allocated
==82789==
==82789== All heap blocks were freed -- no leaks are possible
==82789==
==82789== For lists of detected and suppressed errors, rerun with: -s
==82789== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
#### steal 的競爭問題
> [7b189a3](https://github.com/chloe0919/work_steal/commit/7b189a33c5a776b8f85a20aab4082829d967d35a)
在原始的 steal 的實作中是使用 for 迴圈從第一個執行緒開始嘗試偷竊工作,這樣會容易導致多個執行緒都同時讀取同一個 deque 所產生的競爭問題,所以這邊可以改進成不同執行緒從隨機數開始依序的偷取,減少執行緒之間競爭同一個 deque 的問題。
接下來測試執行緒數量由 24 到 1024 個,有無使用隨機數之間的速度,可以看到大部分速度都有提昇
![random_seq](https://hackmd.io/_uploads/SkqSmGsLR.png)
## TODO: 閱讀指定的論文〈Cilk: An Efficient Multithreaded Runtime System〉並落實
> 摘錄論文關鍵論述、闡述給定測驗題的程式碼和論文的落差、說明如何用 C11 Atomics 實作 deque 並建構足以進行 work-stealing 的並行結構
根據 〈[Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)〉的描述,含有執行程式碼與所需的參數的稱為 closure,每一個處理器都會有自己的 `ready queue`,用來保存已經準備好的 closures,每一個 closure 都會有對應的 level,計算的方法是從 spawn tree 的根節點一直到自己本身所經過的 closure 數量。
### 論文中相關操作
**spawn & spawn next**
當執行緒執行時,可能會 spawn 子執行緒,若現在有一個位於 level L 的執行緒要 spawn 一個子執行緒 T 則需要執行以下的步驟:
1. 先為 T 分配且初始化 closure
2. 將可用的參數複製到 closure,初始化指向缺失的參數的指標和 `join counter`
3. 將該 closure 標記為 level (L + 1)
4. 如果沒有缺少的參數,則將 closure 放到 `ready queue` 級別為 (L + 1) 的列表的頭部
一樣是產生新的 closure,`spawn` 產生的新 closure 被視為 child,但 `spawn next` 則是被視為 successor,所以如果是 `spawn next` 則是需要將 closure 標記成 level L
![46D90929-62BF-4913-B036-CA6CB94A83BD](https://hackmd.io/_uploads/B1ov7wzwA.jpg)
**send argument(k, value)**
當執行緒執行該指令時,代表要傳送參數到其他的執行緒,其中 `k` 是 Cilk language 提供的一種資料型態 (continuation),用來包含指向的 closure 指標和其參數所在的偏移量,而傳送參數的流程為:
1. 利用 continuation `k` 找到對應的 closure 和參數位置
2. 將 `value` 放到對應的參數位置中並且將 `join counter` 減一
3. 檢查 `join counter` 是否為 0,若成立則代表該 closure 可以等待被執行
work stealing 操作過程 :
首先,處理器會先檢查 `ready queue` 是否為空
- [ ] `ready queue` 為空:
代表目前該佇列已經沒有 closure 正在等待執行了,所以可以進行 work stealing 的操作:
1. 隨機選擇一個處理器作為 victim
2. 如果 victim 的 `ready queue` 為空則回到步驟 1 重新選擇
3. 從 victim 的 `ready queue` 中最上層非空 level 中選擇尾部的執行緒並執行
- [ ] `ready queue` 為非空:
執行以下步驟:
1. 移除目前佇列中最深的非空 level 中位於 head 的執行緒
2. 從 closure 提取該執行緒,並且執行
### 闡述給定測驗題的程式碼和論文的落差
原論文是設計 C-based runtime system 以執行多執行緒並行程式,因此除 work stealing 外,有額外定義其他特殊的資料結構,在本測驗中並沒有使用,不過在測驗中有嘗試實作出論文中提到的 closure,並且以結構體 `wort_t` 進行定義,並使用 `atomic_int` 紀錄參數缺少的數量,若此數為 0 代表該 closure 已經準備好執行。
:::danger
留意 "Modeling performance" 和 "A theoretical analysis of the Cilk scheduler",描述本實作的性能表現。
:::
## TODO: 實作「平行化」的資料排序程式
> 將上述 work-steal 程式碼應用於「平行化」的資料排序程式的實作,應確保程式碼的正確性和效率。