# linux2023: RinHizakura - 作業 3
> * [2023 年暑期 Linux 核心課程第 2 次測驗題](https://hackmd.io/@sysprog/linux2023-summer-quiz2)
> * [2023 年暑期 Linux 核心課程第 3 次作業](https://hackmd.io/@sysprog/linux2023-summer-quiz3)
> * [linux-summer-2023: hw3](https://github.com/RinHizakura/linux-summer-2023/tree/main/hw3)
## 測驗 1 / 挑戰 1
> 參考論文: [Cilk: An Efficient Multithreaded Runtime System](http://supertech.csail.mit.edu/papers/PPoPP95.pdf)
需注意到原論文是以設計 C-runtime 為出發點,因此除 work stealing 特徵外,有額外定義特殊的 datatype "*continuation*",或者為了以 [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph) 抽象程式的執行邏輯也提供相應的 API,這些在本測驗題中並未承襲。
### 程式碼運作原理
#### 資料結構
```cpp
typedef struct work_internal {
task_t code;
atomic_int join_count;
void *args[];
} work_t;
```

我們可以將一段程式碼的執行流程拆成多個段落,循序或是平行(若不影響正確性)的運行它們。則每個段落我們稱之為 work,這是以 `work_t` 描述的。對照論文的敘述該資料結構也稱為 "closure",其中的成員包含:
* `code`: 各執行緒要啟動該任務時所要執行的函式之指標,且輸入至該函式的參數是 `work_t` 本身的 reference
* `join_count`: 用來計算這個 work 還缺少了多少 arguments 才得以進行
* `args`: 即 work 執行所需要的 arguments
根據論文,如果 clousre 已具備所有執行需要的參數,則為 ready closure,否則為 waiting closure。
```cpp
typedef struct {
atomic_size_t size;
_Atomic work_t *buffer[];
} array_t;
typedef struct {
/* Assume that they never overflow */
atomic_size_t top, bottom;
_Atomic(array_t *) array;
} deque_t;
```
可以建立多個執行緒來並行式的完成 work。則管理上,每個執行緒各自維護一個 [Double-ended queue(deque)](https://en.wikipedia.org/wiki/Double-ended_queue),通過 queue 讓 work 可以加入到其中一個執行緒中運行。而之所以需要 double-ended 則與 work stealing 的需求和新的 work 被 spwan 且加入 deque 的模式有關,這後續會再詳細說明。
```graphviz
digraph {
top [label="top", color=white];
bottom [label="bottom", color=white];
node [shape=record, fontcolor=black, fontsize=14, width=4.75, fixedsize=true];
values [label="<f0> A[0] | <f1> A[1] | <f2> A[2] | <f3> ...... | <f4> A[n - 2] | <f5> A[n - 1]", color=blue, fillcolor=lightblue, style=filled];
edge [color=blue];
top -> values:f1
bottom -> values:f4
}
```
deque 的結構很簡單,僅僅包含了 queue 本體和 top/bottom 來表示 queue 中元素的有效範圍。在某一執行緒上添加新的 work 稱為 `push`,行為上是將 work 擺在 bottom 對應的位置,並且將 bottom 增加 1。而該執行緒從自身的 deque 挑選下個 work 稱為 `take`,作法是選擇在 bottom - 1 位置的 work,並在取出後將 bottom 減少 1。換言之,對執行緒本身 dequeu 的是使用是偏向 stack(LIFO) 方式的。
而如果有執行緒處於閒置狀態,可以嘗試去 `steal` 其他執行緒的 work 來幫忙分擔。但 `steal` 的位置是 `take` 操作另一側的 top 位置。選擇和 `take` 不同側的 work 來消化可能是有好處的:
* 理想的 `steal` 是直接取走 dequeue 中最困難、最需費時完成的一個,因為如此才不會太快完成 steal 到的 work,又要再去 steal 下一個,造成 steal 上的成本。而通常並行模式上是大任務 spawn 出小任務,又 deque 對所屬執行緒是以 stack 方式使用,因此 steal `top` 位置的 work 很可能是更好的選擇
* 因為避開另一個執行緒選擇下個 work 的一側,可預期會有更少的競爭(contention),後者會有同步上的成本
#### `push`
```cpp
void push(deque_t *q, work_t *w)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
if (b - t > a->size - 1) { /* Full queue */
resize(q);
a = atomic_load_explicit(&q->array, memory_order_relaxed);
}
atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); // DDDD
}
```
對應上面的敘述,`push` 主要做的就只是將給定的 work `w` 更新到當前 bottom 所指的位置,然後更新 `bottom` 為 `bottom + 1` 而已。但要留意由於 deque 的 buffer 是可能被填滿的,此時我們需要通過 `resize` 先動態將 buffer 增大,再來完成上述的操作。
* DDDD = `b + 1`
想特別探討 [`atomic_thread_fence(memory_order_release)`](https://en.cppreference.com/w/cpp/atomic/atomic_thread_fence) 在這裡的必要性。首先要知道 release fence 的用途是可保證 fence 後的 store 必然在 fence 之前的任意 load/store 之後。回顧一下 push 的流程大概是:
```
b = load(&q->bottom)
q->array[b] = w; <--- (1)
q->bottom += 1 <--- (2)
```
則編譯器可以重排成以下:
```
b = load(&q->bottom)
q->bottom += 1; <--- (2)
q->array[b] = w; <--- (1)
```
這個重排是合法的。因為若考慮單執行緒,提早更新 `bottom` 並不會影響我們要 push 下個 entry 的結果。然而這個重排對多執行緒的狀況是不同的,因為假設執行緒 A 先做 (2) 且還沒來得及做 (1) 的情況下,另一個執行緒 B 還未等到 `w` 被放到正確位置,就可能先去處理該位置上的東西了。
:::warning
從程式碼理解,這實作感覺是直接假設 `top` 和 `bottom` 不會 unwrap?
:::
:::danger
暫時還沒想到 `atomic_load_explicit(&q->top, memory_order_acquire)` 如果換成 relaxed order 怎樣重排會出問題 :(
:::
:::danger
目前的 `resize` 實作存在議題: `resize` 的作法是配置一個更大的 buffer 並將舊內容搬動到新的 buffer 中。但是若搬動完成後直接釋放掉舊 buffer 可能是有風險的,理由在於另一個執行緒可能正在存取舊有的 buffer。[記憶體釋放問題](#記憶體釋放問題) 一節對此提出改進。
:::
#### `take`
`take` 的目的是從執行緒自己的 dequeue 中取得下個要實行的 work。整體的行為僅僅是得到 `q->bottom - 1` 位置的 work,並且將 `q->bottom` 減一(如果 buffer 非空)。
沒錯,僅僅如此。當然實際上需要考量正確的同步,因為其他執行緒會以 `steal` 方式取走本該在此 deque 中的 work,情況就變得複雜了,這也是為什麼這裡的程式碼看上去如此嚇人。
```cpp
work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
```
逐行觀察的話,最開始先取得要拿走 work 的位置 `b`,然後就直接將 `q->bottom` 做了減一的更新了。這部份是被 `atomic_thread_fence` 確保必須先於後面的程式執行的。換言之,我們尚未確定 `top` 是否等於 `bottom`(queue 為空的情況)前,就先行預設位置 b 的 work 會被取走而更新了 `bottom`。
這是因為若等到確定好 deque 到底是否為空才更新 `bottom`,可能出現一個 work 同時被 `steal` 又被 `take` 的競爭問題。反之先更新 `bottom` 即使事後之後發現 deque 是空的或是 work 已經被 steal 掉,只要再復原 `bottom` 就好。
```cpp
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x;
if (t <= b) {
/* Non-empty queue */
x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
if (t == b) {
/* Single last element in queue */
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, // AAAA
memory_order_seq_cst, memory_order_relaxed))
/* Failed race */
x = EMPTY;
atomic_store_explicit(&q->bottom, b + 1,
memory_order_relaxed); // BBBB
}
} else { /* Empty queue */
x = EMPTY;
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); // CCCC
}
return x;
}
```
下面判斷會有三種情境,若:
* t < b: 執行緒可直接取走 `b` 位置對應的 work
* t == b: dequeue 中僅剩一個 entry,此時可能和 `steal` 操作產生競爭。可以透過 cmpxchg 來判斷是哪種情形,注意到為了可以作 cmpxchg,這裡刻意從 `top` 方向去取(邏輯上和從 bottom 取其實相同),也就是取完之後我們想將 `q->top + 1`,而非 `q->bottom - 1`,所以無論最後是前述的哪種情況都要復原 `bottom`
* t > b: deque 為空,需要復原 `bottom`
而因為前面原本將 bottom 減一是預設可以成功拿到對應 work 的情況,但發現 dequeu 為空的情況下,需要將 bottom 復原以確保 bottom 的更新情況最終符合沒有取走任何 work 的情況。因此
* AAAA = `t + 1`
* BBBB = `b + 1`
* CCCC = `b + 1`
#### `steal`
`steal` 操作是從其他執行緒的 deque 中偷取下一個 work,與 `take` 從 `bottom` 取起不同,`steal` 會從 `top` 方向進行。
```cpp
work_t *steal(deque_t *q)
{
size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
atomic_thread_fence(memory_order_seq_cst);
size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
```
`atomic_thread_fence` 保證需先取 `top` 再去取得 `bottom`。這對應了之前提到當 deque 剩一個 work 時,`take` 會提前更新 `bottom`,再嘗試從 `top` 取得 work 的行為。
```cpp
work_t *x = EMPTY;
if (t < b) {
/* Non-empty queue */
array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
if (!atomic_compare_exchange_strong_explicit(
&q->top, &t, t + 1, memory_order_seq_cst,
memory_order_relaxed)) // EEEE
/* Failed race */
return ABORT;
}
return x;
}
```
則當 t < b,表示很可能 dequeue 中有可以偷走的 work。但具體還是要透過 cmpxchg 去競爭,確保 `take` 和 `steal` 的兩個執行緒最終只有一個可以得到它。
#### `thread`
```cpp
void *thread(void *payload)
{
int id = *(int *) payload;
deque_t *my_queue = &thread_queues[id];
while (true) {
work_t *work = take(my_queue);
if (work != EMPTY) {
do_work(id, work);
} else {
/* Currently, there is no work present in my own queue */
work_t *stolen = EMPTY;
for (int i = 0; i < N_THREADS; ++i) {
if (i == id)
continue;
stolen = steal(&thread_queues[i]);
if (stolen == ABORT) {
i--;
continue; /* Try again at the same i */
} else if (stolen == EMPTY)
continue;
/* Found some work to do */
break;
}
if (stolen == EMPTY) {
/* Despite the previous observation of all queues being devoid
* of tasks during the last examination, there exists
* a possibility that additional work items have been introduced
* subsequently. To account for this scenario, a state of active
* waiting is adopted, wherein the program continues to loop
* until the global "done" flag becomes set, indicative of
* potential new work additions.
*/
if (atomic_load(&done))
break;
continue;
} else {
do_work(id, stolen);
}
}
}
printf("work item %d finished\n", id);
return NULL;
}
```
回到各個執行緒的主體。每個 thread 就是循環進行以下流程以選擇出下個要處理的 work
1. 先嘗試從自己的 dequeue 中取得任務,如果成功取得就執行之
2. 若失敗,嘗試在其他執行緒的 deque 中獲取,值得注意的是當 steal 得到的回傳是 ABORT 時,表示 steal 其實是發生 contention 只是最終失敗了而已,這時候允許在同一個 deque 重偷一次
3. 如果 steal 成功,則執行偷到的 work
不過,終究會有把所有派發的 work 都做完的時候。而即便都看了一輪發現沒有可偷的 work,但不代表所有的 work 都完成了,也有可能再回頭找一輪就可以偷到。那麼怎麼準確知道真的沒有要進行的 work 呢? 這裡的作法是建立一個額外的 work `done_work`,做為其他 work 的一個 argument。當其他 work 完成時就將 argument 中 `done_work` 的 `join_count` 減一。一直到 `join_count` 歸零時 `done_work` 可執行底下的 function,再設置對應 flag 表示所有派發的 work 都完成。各個執行緒可以根據此 flag 決定是否可以結束。
### 程式碼改進
#### take 實作的正確性
```cpp
static work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
...
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x = EMPTY;
if (t <= b) { /* Non-empty queue */
...
} else { /* Empty queue */
...
}
```
目前的 `take` 函式實作在 `q->bottom` 為 0 且 `q->top` 為 0 時可能會發生問題。其原因在於: 雖然這種情況應該被判斷為 empty,然而因為型別原因,`b = q->bottom - 1` 並非 -1 而是會變成 `size_t::MAX`,導致 `b >= t` 成立。最終返回的 work 並非 `EMPTY`。
固然我們在前面解釋實作時有提到假設 `top` / `bottom` 不會 overflow,但我們是否也要假設在初始化階段就會預先 push 至少一個 `work_t` 到每個執行緒的 deque 中呢? 若否,則仍應該針對此問題進行修正。
一種比較簡單但有效的改法可能是在 `deque_init` 將 `top` 和 `bottom` 初始化為 1 而非 0。如此一來可以避免整數 wrapping 導致比較大小的錯誤。
#### 記憶體釋放問題
在前面有提到 resize 在資源釋放上的問題。為解決此,一個可行(但不一定足夠高效)的作法是: 我們在從舊的 `array_t` 換到新的時,仍然先保留舊的 `array_t` 以利後續的釋放。而何時可以真正將其釋放呢? 可以參考測驗 2 的中心思想,去追蹤每個執行緒最後一次使用的 `array_t` 是新的還是舊的,當所有執行緒都不再依賴於舊的 `array_t` 時,那麼就是時候將其釋放了。
具體的實作可以在 [deque.c](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw3/work-steal-qsort/src/deque.c) 的 `deque_gc` 中看到。
```cpp
static void deque_gc(deque_t *q)
{
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->handle[tid].array, a, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
array_t *old_a = atomic_load_explicit(&q->old_array, memory_order_relaxed);
/* Make threads racing for resetting the q->old_array, so we won't
* double freeing incorrectly. */
if (old_a && atomic_compare_exchange_weak(&q->old_array, &old_a, NULL)) {
for (int i = 0; i < q->nr_threads; i++) {
array_t *cur =
atomic_load_explicit(&q->handle[i].array, memory_order_relaxed);
if (cur == old_a) {
atomic_store_explicit(&q->old_array, old_a,
memory_order_relaxed);
return;
}
}
free(old_a);
}
}
```
`deque_gc` 會在每次的 `push`/`take`/`steal` 最後被呼叫,以追蹤每個執行緒對 deque 的使用狀況。而當 `q->old_array` 指向非 NULL 時,這表示有舊的 `array_t` 該被回收,此時執行緒間會以競爭將 `q->old_array` 設為 NULL 的方式來獲得「檢查是否可以釋放」和「真正釋放記憶體」的機會。之所以要競爭 `q->old_array`,是為了避免同時有兩個執行緒做嘗試而導致 double free 問題。
需注意的是,這個設計要正確所需基於的假設是: 當一個新的 `array_t` 該被建立時,前一個 `array_t` 應當已經被正確釋放了,否則因為我們沒有維護一連串舊 `array_t` 的機制,這將導致 dangling pointer 的問題。
目前使用 valgrind 測試可以正確釋放所有資源。但如果想要更完善的考慮上述假設不成立的情況,是有改進的空間的。另外,進行 `gc` 的條件或許也可以再做優化,以避免檢查帶來的額外成本。
```
$ valgrind ./build/qsort-mt
==9806== Memcheck, a memory error detector
==9806== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==9806== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==9806== Command: ./build/qsort-mt
==9806==
==9806==
==9806== HEAP SUMMARY:
==9806== in use at exit: 0 bytes in 0 blocks
==9806== total heap usage: 1,166 allocs, 1,166 frees, 448,960 bytes allocated
==9806==
==9806== All heap blocks were freed -- no leaks are possible
==9806==
==9806== For lists of detected and suppressed errors, rerun with: -s
==9806== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
### 引入 work stealing 機制至 qsort-mt
在 [work-steal-qsort](https://github.com/RinHizakura/linux-summer-2023/tree/main/hw3/work-steal-qsort) 中實驗性的將原測驗題的 work-stealing 機制封裝成更易使用的[介面](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw3/work-steal-qsort/include/hina.h),以便未來有需要在 qsort 以外的演算法上使用時能夠更輕易的套用。注意到目前的介面設計是以 qsort 所需為主,根據平行方式的差異可能需再調整或新增。
對於 `take` 和 `push` 操作有做部份 memory model 和實作上的調整,主要是考慮了 `take` 和 `push` 之間的競爭: 原本是希望一個 thread 也可以透過 `push` 把 work 加入到其他 dequeue,雖然最終因為若想允許此功能,還得附帶考量 `push` 和 `push` 之間的競爭而作罷,但還是暫時保留下來作為後續的參照。
可以從實驗觀察到 CPU 的利用方式對於 throughput 的影響。參照測試環境如下:
```
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Vendor ID: GenuineIntel
Model name: 11th Gen Intel(R) Core(TM) i7-11700 @ 2.50GHz
```
則使用 1 個執行緒到 32 個執行緒排序含 100000 個整數之陣列所需的時間,統計之結果如下(使用統計方法測量,可以得到比較平滑的曲線)。可以明顯看到並非任意的增加執行緒就能提昇執行表現,以使用 15 個執行緒為轉折,之後增加執行緒反而造成延遲的提昇。

從下圖可以更明顯的看到,在從 1 個執行緒提升到 15 個時延遲時間是有降低的,雖然隨著執行緒越多則越發不明顯。這些證據一方面顯示了引入 work-stealing 機制在 qsort 上是有一定效果的,也印驗了在 Linux 上經常設計 per-CPU 的資料結構的一大原因(如 CFS 中的 runqueue,或是後面提到的 CMWQ)。
可能會有疑惑的是,為什麼轉折點是 15 而非 16 個執行緒呢? 明明是 16 核的測試平台。其實這是因為當前的設計除了 work thread 以外還會有額外的 1 個 main thread 會一直 polling 某個狀態的 flag 以正確結束執行,導致對 CPU 資源的耗損。這部份是可以再做改進的。

:::danger
FIXME: 因為[測試腳本](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw3/work-steal-qsort/plot.py)會大量實驗以便抽樣,過程其實有發現少數幾次會有 sorting 失敗的情況。這表示當前的實作並不是 100% 正確的,還需要再深入調整 QQ
:::
### Concurrency Managed Workqueue
整理至獨立的筆記 [Concurrency Managed Workqueue(CMWQ)](https://hackmd.io/@RinHizakura/H1PKDev6h)。
## 測驗 2
### 程式運行原理
#### 資料結構
```cpp
typedef struct {
node_t *init_node;
volatile long init_id __DOUBLE___CACHE_ALIGNED;
volatile long put_index __DOUBLE___CACHE_ALIGNED;
volatile long pop_index __DOUBLE___CACHE_ALIGNED;
handle_t *enqueue_handles[N_HANDLES], *dequeue_handles[N_HANDLES];
int threshold;
pthread_barrier_t enq_barrier, deq_barrier;
} mpmc_t;
```
`mpmc_t` 是貫穿整個測驗題的主題結構,底下成員包含了:
* `init_node`: mpmc 所需要的 queue 是以數個 `node_t` 鏈結在一起形成的,而 `init_node` 就是這段鏈結的開頭
* `node_t` 具體定義如下。每個 `node_t` 中會有一個固定長度的陣列 `ceil` 存放 mpmc 操作的單元,而如果陣列的容量不足,就建立另一個 node 並以 `next` reference 之,將新的 `node` 作為 mpmc queue 的延續
* `node_t` 中的 `id` 用來標示,`id` 越大表示是更新的 node,具體用途則是和 `node_t` 的查找和可否釋放的判斷有關,後面會再細講
```cpp
typedef struct __node {
struct __node *volatile next __DOUBLE___CACHE_ALIGNED;
long id __DOUBLE___CACHE_ALIGNED;
void *cells[N] __DOUBLE___CACHE_ALIGNED;
} node_t;
```
* `init_id`: 整個 `mpmc_t` 底下所有的 node 中最小的 `id` (最舊的 node 之 `id`)
* `put_index`: 作 enqueue 時對應的位置,並且在 enqueue 結束後 + 1
* `pop_index`: 作 dequeue 時對應的位置,並且在 dequeue 結束後 + 1
* `enqueue_handles`/`dequeue_handles`: 每個執行緒(producer 或 consumer)都必須建立自己的 `handle_t` 並註冊給 `mpmc_t`
* `handle_t` 結構如下,包含了一個 `spare` 是每個 thread 會直接預留 `node_t` 給 mpmc queue 擴充使用
* `push` 是該 thread 前一次 enqueue 操作的 `node_t`,`pop` 則是前次 dequeue 操作的 `node_t*`,這目的是掌握各個 thread 對 `node_t` 的使用狀況,以正確的釋放記憶體資源
```cpp
typedef struct {
node_t *spare;
node_t *volatile push __CACHE_ALIGNED;
node_t *volatile pop __CACHE_ALIGNED;
} handle_t;
```
* `threshold`: 由於檢查具體有哪些 `node_t` 可以釋放涉及同步上的演算法,`threshold` 定義了整體建立了多少個 `node_t` 時才啟動回收的流程
* `enq_barrier`/`deq_barrier`: [pthread_barrier_t](https://linux.die.net/man/3/pthread_barrier_init) 是多執行緒之間的一種同步方式,在 init 的時候會指定執行緒數量,則要等該數量的執行緒都執行到該 barrier 後,所有的執行緒才被允許繼續執行
#### `producer`
```cpp
static void *producer(void *index)
{
mpmc_t *q = &mpmc;
handle_t *th = calloc(1, sizeof(handle_t));
mpmc_queue_register(q, th, ENQUEUE);
for (;;) {
pthread_barrier_wait(&prod_barrier);
for (int i = 0; i < COUNTS_PER_THREAD; ++i)
mpmc_enqueue(
q, th, (void *) 1 + i + ((intptr_t) index) * COUNTS_PER_THREAD);
pthread_barrier_wait(&prod_barrier);
}
return NULL;
}
```
在 thread 建立之初需要建立各個執行緒獨立的 `handle_t` 到 `mqmc_t`。然後 `producer` 就只是迴圈透過 [`mpmc_enqueue`](#mpmc_enqueue) 重複將新 entry enqueue 到 mpmc queue 之中而已。
這裡的兩個 `prod_barrier` wait 對應在 main thread 的兩次 `prod_barrier`,其目的是每完成 `COUNTS_PER_THREAD` 次數的 enqueue 之後可以讓 main thread 停下來計算完成的時間。
#### `consumer`
```cpp
static void *consumer(void *index)
{
mpmc_t *q = &mpmc;
handle_t *th = calloc(1, sizeof(handle_t));
mpmc_queue_register(q, th, DEQUEUE);
for (;;) {
pthread_barrier_wait(&cons_barrier);
for (long i = 0; i < COUNTS_PER_THREAD; ++i) {
int value;
if (!(value = (intptr_t) mpmc_dequeue(q, th)))
return NULL;
array[value] = true;
}
pthread_barrier_wait(&cons_barrier);
}
fflush(stdout);
return NULL;
}
```
...
#### `mpmc_find_cell`
`mpmc_find_cell` 用於找尋在 enqueue 或 dequeue 時給定的 index `i` 所屬的 node cell 之位置。
```cpp
static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th)
{
node_t *curr = *ptr; /* get current node */
/* j is thread's local node id (put node or pop node), (i / N) is the cell
* needed node id. and we should take it, By filling the nodes between the j
* and (i / N) through 'next' field
*/
for (long j = curr->id; j < i / N; ++j) {
node_t *next = curr->next;
if (!next) { /* start filling */
/* use thread's standby node */
node_t *tmp = th->spare;
if (!tmp) {
tmp = mpmc_new_node();
th->spare = tmp;
}
tmp->id = j + 1; /* next node's id */ // ZBBB
/* if true, then use this thread's node, else then thread has have
* done this.
*/
/* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_RELEASE,
* __ATOMIC_ACQUIRE) is an atomic compare-and-swap that ensures
* release semantic when succeed or acquire semantic when failed.
*/
if (__atomic_compare_exchange_n(&curr->next, &next, tmp, 0,
__ATOMIC_RELEASE,
__ATOMIC_ACQUIRE)) {
next = tmp;
th->spare = NULL; /* now thread there is no standby node */
}
}
curr = next; /* take the next node */
}
```
會從想 find cell 的執行緒前次操作的 node `ptr` 開始找起。因為每個 node 可以存 N 個 cell,因此其實 index $[0, (N - 1)]$ 就對應 node id 0,index $[N, (2N - 1)]$ 就對應 node id 1.....以此類推。根據此規則我們可以找到想存取的 id 應該落在哪一個 `node_t` 結構中。
如果按照 index 線性搜尋的過程中,遇到 node 是還沒建立出來的(`!next`),則先看看每個 thread 最初預留的 `spare` node 是不是已經取走了,若尚未被取則將其作 `next`,否則就是直接分配一個新的 node。將 node 標註上正確的 id 之後,則可以建立 `curr->next = next` 的鏈結關係。
:::danger
如果 cmpxchg 失敗,`next` 不就恆為 NULL 了嗎? 這樣假如有下個 iteration 會存取到 NULL pointer?
:::
```cpp
*ptr = curr; /* update our node to the present node */
/* Orders processor execution, so other thread can see the '*ptr = curr' */
__atomic_thread_fence(__ATOMIC_SEQ_CST);
/* now we get the needed cell */
return &curr->cells[i & N_BITS]; // ZDDD
}
```
找到了目標 index 對應的 node 之後,做 `*ptr = curr` 將 `th->push` / `th->pop` 更新到對應的 node。前面有提到這是為了追蹤每個執行緒存取 `node_t` 的狀況,以確保不會錯誤釋放某個執行緒之後會繼續存取的 `node_t`。最終返回 index 對應的 cell。
:::danger
這裡 `__atomic_thread_fence` 的目的是應該是保證 `*ptr = curr` 的更新即時被其他執行緒同步到? 但總感覺 `th->push` / `th->pop` 同步上無論重排結果如何其實不會有正確性的疑慮,可能影響的只是可以被釋放的 `node_t` 第一時間被錯過而已(感覺可以用 relaxed order store 就好,重排不需要保證)?
:::
#### `mpmc_enqueue`
```cpp
void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v)
{
/* return the needed index */
void *volatile *c = mpmc_find_cell(
&th->push, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST), th);
/* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also
* ensures sequential consistency
*/
/* now c is the needed cell */
void *cv;
/* if XCHG (ATOMIC: Exchange Register/Memory with Register) return NULL,
* so our value has put into the cell, just return.
*/
if ((cv = __atomic_exchange_n(c, v, __ATOMIC_ACQ_REL)) == NULL)
return;
/* else the counterpart pop thread has wait this cell, so we just change the
* waiting value and wake it
*/
*((int *) cv) = 0; // ZAAA
mpmc_futex_wake(cv, 1);
}
```
`mpmc_enqueue` 根據 `mpmc_t` 當下 queue 的狀態,藉 `mpmc_find_cell` 找到要可以放下個 entry 的 cell `c`,以 xchg 方式將其置入。如果 `__atomic_exchange_n` 回傳 NULL,表示 `v` 加入該 cell 且期間沒有被其他執行緒競爭走。
若非 NULL,意味著 pop thread 事先放了一個 futex 在 cell(詳見 `mpmc_dequeue`),則除了把 `v` 放到 cell 之外,還需要改變 futex 的值來喚醒在等待 futex 的另一個執行緒(對照 `mpmc_dequeue` 的 `futex_wait`)。
:::info
注意 `__atomic_exchange_n` 與 `__atomic_compare_exchange_` 不同,後者只有在 compare 結果正確時可以置換原本的值。
:::
#### `mpmc_dequeue`
```cpp
void *mpmc_dequeue(mpmc_t *q, handle_t *th)
{
void *cv;
int futex_addr = 1;
/* the needed pop_index */
long index =
__atomic_fetch_add(&q->pop_index, 1, __ATOMIC_SEQ_CST); // ZCCC
/* locate the needed cell */
void *volatile *c = mpmc_find_cell(&th->pop, index, th);
/* because the queue is a blocking queue, so we just use more spin. */
int times = (1 << 20);
do {
cv = *c;
if (cv)
goto over;
#if defined(__i386__) || defined(__x86_64__)
__asm__ __volatile__("pause");
#elif defined(__aarch64__) || defined(__arm__)
__asm__ __volatile__("isb\n");
#endif
} while (times-- > 0);
```
`mpmc_dequeue` 與 enqueue 相對。首先同樣先由 `mpmc_find_cell` 找到可以取得下個 entry 的 cell `c`。後續的 do ... while 嘗試以 blocking wait 方式等待直到從 cell `c` 中取出非 NULL 的指標,後者即 producer 產生的新內容。這種情況下則可以跳過下面一小段程式碼,直接進行 `over` 的地方。
* `ZCCC` = 1
```cpp
/* XCHG, if return NULL so this cell is NULL, we just wait and observe the
* futex_addr's value to 0.
*/
if ((cv = __atomic_exchange_n(c, &futex_addr, __ATOMIC_ACQ_REL)) == NULL) {
/* call wait before compare futex_addr to prevent use-after-free of
* futex_addr at mpmc_enqueue(call wake)
*/
do {
mpmc_futex_wait(&futex_addr, 1);
} while (futex_addr == 1);
/* the counterpart put thread has change futex_addr's value to 0. and
* the data has into cell(c).
*/
cv = *c;
}
```
若 blocking wait 的 cell 中始終為 NULL,表示 producer 還來不及製造新產物。則 consumer 以 futex 競爭該 cell。若競爭成功,則可以透過 `futex_wait` 進入 suspend ,一直等待到 producer 完成後再藉由該 futex 通知自己就好。
```cpp
over:
/* if the index is the node's last cell: (N_BITS == 4095), it Try to reclaim
* the memory. so we just take the smallest ID node that is not
* reclaimed(init_node), and At the same time, by traversing the local data
* of other threads, we get a larger ID node(min_node). So it is safe to
* recycle the memory [init_node, min_node).
*/
if ((index & N_BITS) == N_BITS) {
/* __atomic_load_n(ptr, __ATOMIC_ACQUIRE) is a load with a following
* acquire fence to ensure no following load and stores can start before
* the current load completes.
*/
long init_index = __atomic_load_n(&q->init_id, __ATOMIC_ACQUIRE);
/* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_ACQUIRE,
* __ATOMIC_RELAXED) is an atomic compare-and-swap that ensures acquire
* semantic when succeed or relaxed semantic when failed.
*/
if ((th->pop->id - init_index) >= q->threshold && init_index >= 0 &&
__atomic_compare_exchange_n(&q->init_id, &init_index, -1, 0,
__ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
```
至此 dequeue 的流程其實算是完成了。但遺漏的問題是: 我們總是動態配置 `node_t` 作為 mpmc queue 的延伸,那麼何時可以將不再使用到的 `node_t` 進行回收並釋放呢? 又該以何方法判斷是哪些 `node_t` 可以被釋放?
關於「何時」,由於檢查每個 `node_t` 在各個執行緒的使用狀況需要額外成本,如果頻繁檢查可能帶來效能的衝擊,因此本測驗題是以 pop index 是 node_t 的最後一個 cell 作為檢查點。首先可以取得 `q->init_id`,前面有提到這是 `mpmc_t` 目前的所有的 node 中最小的 id。然後在當前 pop 所用的 `node_t` 之 id 與 `init_id` 達到一定差距後 (`q->threshold`),才正式進行回收記憶體的流程。
注意由哪個執行緒做 `node_t` 的釋放也是需要正確的同步的,否則可能會有 double free 問題。因此執行緒間會去競爭把 `q->init_id` 從有效值設為 -1 的操作,成功的執行緒才可以進行後續任務。
```cpp
node_t *init_node = q->init_node;
th = q->dequeue_handles[0];
node_t *min_node = th->pop;
int i;
handle_t *next = q->dequeue_handles[i = 1];
while (next) {
node_t *next_min = next->pop;
if (next_min->id < min_node->id)
min_node = next_min;
if (min_node->id <= init_index)
break;
next = q->dequeue_handles[++i];
}
next = q->enqueue_handles[i = 0];
while (next) {
node_t *next_min = next->push;
if (next_min->id < min_node->id)
min_node = next_min;
if (min_node->id <= init_index)
break;
next = q->enqueue_handles[++i];
}
```
而關於方法,其實我們就是去找出當前所有執行緒底下的 `push` 和 `pop` 中的最小的 id,則 `[init_index, min_node->id)` 這範圍間的 `node_t` 就是我們不再會用到的,可以一併進行釋放。
:::info
需處理 `min_node->id <= init_index` 的情形。這可能發生在執行緒因為某些原因沒有進展而始終在使用一個比較舊的 `node_t`,這時我們只好先不做任何釋放。
:::
:::warning
覺得透過 `push`/`pop` 維護各個 thread 會用到的 `node_t` 範圍有點類似於 [Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) 想法的一種延伸? 只是 Hazard pointer 是記錄每個執行緒有哪些指標可以 retire,這裡則是反過來記錄有在用的
:::
```cpp
long new_id = min_node->id;
if (new_id <= init_index)
/* __atomic_store_n(ptr, val, __ATOMIC_RELEASE) is a store with
* a preceding release fence to ensure all previous load and
* stores completes before the current store is visible.
*/
__atomic_store_n(&q->init_id, init_index, __ATOMIC_RELEASE);
else {
q->init_node = min_node;
__atomic_store_n(&q->init_id, new_id, __ATOMIC_RELEASE);
do {
node_t *tmp = init_node->next;
free(init_node);
init_node = tmp;
} while (init_node != min_node);
}
}
}
return cv;
}
```
則最後可以將 `[init_node, min_node)` 範圍間的 `node_t` 做釋放,注意到 `q->init_id` 也需隨此更新。
## 挑戰 2
### 程式碼實作原理
參照[挑戰 2](https://hackmd.io/@sysprog/linux2023-summer-quiz3#%E6%8C%91%E6%88%B0-2) 的敘述,[MCS lock](https://lwn.net/Articles/590243/) 的核心理念是避免 lock 的競爭導致大量的 cache miss,進而造成記憶體存取上的延遲。
#### `mcslock_acquire`
```cpp
void mcslock_acquire(mcslock_t *lock, mcsnode_t *node)
{
node->next = NULL;
/* A0: Read and write lock, synchronized with A0/A1 */
mcsnode_t *prev = __atomic_exchange_n(lock, node, __ATOMIC_ACQ_REL);
if (LIKELY(!prev)) /* Lock uncontended, the lock is acquired */
return;
```
`mcslock_acquire` 的作用是取得 lock,參照原本說明,在 lock 未被任何人持有的情況下,其實僅為一個指向 NULL 的指標。
```graphviz
digraph {
mcs [color="white"]
null [color="white"]
mcs -> null
}
```
當第一個 acquire 的執行緒進行 acquire,則將指標指向該執行緒的 `mcsnode_t` 即可返回。
```graphviz
digraph {
mcs [color="white"]
null [color="white"]
node [shape=record];
node1 [label=" node1 | {<f0> next|<f1>}"];
mcs -> node1
node1:f0 -> null
}
```
```cpp
/* Otherwise, the lock is owned by another thread, waiting for its turn */
node->wait = MCS_WAIT;
/* B0: Write next, synchronized with B1/B2 */
__atomic_store_n(&prev->next, node, __ATOMIC_RELEASE);
```
如果又有其他執行緒想要 acquire(假設使用 node2),則除了改將 mcs 指向新的 node2 之外,也會使得原本的 node1 也指向 node2
```graphviz
digraph {
mcs [color="white"]
null [color="white"]
node [shape=record];
node1 [label=" node1 | {<f0> next|<f1> }"];
node2 [label=" node2 | {<f0> next|<f1> MCS_WAIT }"];
mcs -> node2
node1:f0:e -> node2
node2:f0 -> null
}
```
再有另一個 node3 的話會變成下面的樣子。
```graphviz
digraph {
mcs [color="white"]
null [color="white"]
node [shape=record];
node1 [label=" node1 | {<f0> next|<f1> }"];
node2 [label=" node2 | {<f0> next|<f1> MCS_WAIT }"];
node3 [label=" node3 | {<f0> next|<f1> MCS_WAIT }"];
mcs -> node3
node1:f0:e -> node2
node2:f0:e -> node3
node3->null
}
```
```cpp
/* Waiting for the previous thread to signal using the assigned node
* C0: Read wait, synchronized with C1
*/
wait_until_equal_u8(&node->wait, MCS_PROCEED, __ATOMIC_ACQUIRE);
}
```
像是 node2 和 node3 這種被標註為 `MCS_WAIT` 的 node,他們會一直等待到 `wait` 狀態的變更才可以獲得 lock。由於 `wait` 是由執行緒各自的 node 提供,可以想像當 mcslock 形成一個很長的排隊鏈,排在尾端的也不會因為前面每次 release lock 就需要 invalidate cache,如此一來可能就能一定程度避免 cacheline bouncing。
#### `mcslock_release`
```cpp
void mcslock_release(mcslock_t *lock, mcsnode_t *node)
{
mcsnode_t *next;
/* Check if any waiting thread exists */
/* B1: Read next, synchronized with B0 */
if ((next = __atomic_load_n(&node->next, __ATOMIC_ACQUIRE)) == NULL) {
/* No waiting threads detected, attempt lock release */
/* Use temporary variable as it might be overwritten */
mcsnode_t *tmp = node;
/* A1: write lock, synchronize with A0 */
if (__atomic_compare_exchange_n(lock, &tmp, NULL, 0, __ATOMIC_RELEASE,
__ATOMIC_RELAXED)) {
/* No waiting threads yet, lock released successfully */
return;
}
/* Otherwise, at least one waiting thread exists */
/* Wait for the first waiting thread to link its node with ours */
/* B2: Read next, synchronized with B0 */
while ((next = __atomic_load_n(&node->next, __ATOMIC_ACQUIRE)) == NULL)
spin_wait();
}
/* Signal the first waiting thread */
/* C1: Write wait, synchronized with C0 */
__atomic_store_n(&next->wait, MCS_PROCEED, __ATOMIC_RELEASE);
}
```
從前面的案例中可以知道,載入 `node->next` 並判斷是否為 NULL 就知道是否有下個 lock 的競爭者。若 `next` 為 NULL 表示沒有競爭者,此時首先需要 `cmpxchg` 嘗試將其 lock 指回 NULL,若嘗試失敗,表示途中有人 acquire lock,那麼等待 acquire 的人建立好連結後,再為其更新 `wait` 的狀態為 `MCS_PROCEED`。
若 `next` 非 NULL 是存在下個等待 lock 的競爭者的狀況,則直接設置 `wait` 為 `MCS_PROCEED` 使其成為下一個即可。
### C11 Atomics 改寫
[Use C11 atomic for mcslock example](https://github.com/RinHizakura/concurrent-programs/commit/6043e5272dbbdfee68d633aaae095e7c0507c4c2)。
理解上 builtin 和 C11 的 API 都是有直接的 1-to-1 對應的,沒有什麼特別困難之處(嗎
)?
### 研讀 Linux 核心的 locking 實作
對照測驗題和 [mcs_spinlock.h](https://github.com/torvalds/linux/blob/master/kernel/locking/mcs_spinlock.h) 之間的實作,其實整體實作邏輯相同。即便如此,還是存在一些細節上的差異。
一個值得關注的地方是 [locking/mcs: Use smp_cond_load_acquire() in MCS spin loop](https://github.com/torvalds/linux/commit/7f56b58a92aaf2cab049f32a19af7cc57a3972f2) 這個 commit。原本 `arch_mcs_spin_lock_contended` 的寫法是透過以下方式:
```cpp
#define arch_mcs_spin_lock_contended(l) \
do { \
while (!(smp_load_acquire(l))) \
cpu_relax(); \
} while (0)
```
這和本測驗題的 `wait_until_equal_u8` 也比較類似。但在新版的 Linux 上,更改為以下形式:
```cpp
#define arch_mcs_spin_lock_contended(l) \
do { \
smp_cond_load_acquire(l, VAL); \
} while (0)
```
該改動主要是針對 ARM64 平台。當時的 [`cpu_relax()`](https://github.com/torvalds/linux/blob/7f56b58a92aaf2cab049f32a19af7cc57a3972f2/arch/arm64/include/asm/processor.h#L201) 最終展開後為 [`YIELD`](https://developer.arm.com/documentation/dui0473/m/arm-and-thumb-instructions/yield),這主要可以減少因為 spinning 而導致的耗電。然而原作者指出 WFE 可能更適合對應情境,也就是 [`smp_cond_load_acquire`](https://github.com/torvalds/linux/blob/7f56b58a92aaf2cab049f32a19af7cc57a3972f2/arch/arm64/include/asm/barrier.h#L131) 展開後的作法。而在其他平台上 `smp_cond_load_acquire` 則和原本的實作接近甚至更好,因此最終做出此調整。
:::danger
沒有找到比較正式的文件說明選擇 ISB 和 WFE 的情境差異 :(
:::
### MCS lock 效益實驗
...
## 挑戰 3
###
### 程式碼實作原理
參照[挑戰 3](https://hackmd.io/@sysprog/linux2023-summer-quiz3#%E6%8C%91%E6%88%B0-3) 的說明,[seqlock](https://docs.kernel.org/locking/seqlock.html) 的優勢在於大量讀取和少許寫入的情境。與只允許 reader 和 reader 間同時進入 critical section 的 [rwlock](https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock) 相比,在 seqlock 中 reader 和 writer 間也可以同時進入之。這避免了 writer 因為 reader 的活躍而產生 starvation 的情況。而為了避免資料的不一致,seqlock 中的 reader 需要適時 retry。
#### 資料結構
```cpp
typedef uint32_t seqlock_t;
```
lock 本身其實只是一個 counter。此 counter 會初始為偶數的 0。writer 在進入 critical section 時將其加一,並在離開時也加一。則結合前段說明的對 writer 之限制條件下,couter 為奇數時就意味 writer 在 critical section 中,反之則 writer 不在 critical section。
對於 reader,必須等待 writer 離開 critical section 才能繼續運作。且 reader 進入 critical section 時需要記下 counter 的數值,並在離開時比對該數值是否有異動。若數值改變,意味著 read 途中發生 writer 的介入,因此必須重新讀取被更新後的資料。若數值不變,才可以直接離開 critical section。
#### `seqlock_acquire_rd`
```cpp
static inline seqlock_t wait_for_no_writer(const seqlock_t *sync, int mo)
{
seqlock_t l;
SEVL(); /* Do SEVL early to avoid excessive loop alignment (NOPs) */
if (UNLIKELY(((l = __atomic_load_n(sync, mo)) & SEQLOCK_WRITER) != 0)) {
while (WFE() && ((l = LDX(sync, mo)) & SEQLOCK_WRITER) != 0)
spin_wait();
}
assert((l & SEQLOCK_WRITER) == 0); /* No writer in progress */
return l;
}
seqlock_t seqlock_acquire_rd(const seqlock_t *sync)
{
/* Wait for any present writer to go away */
/* B: Synchronize with A */
return wait_for_no_writer(sync, __ATOMIC_ACQUIRE);
}
```
在 reader 端,進入 critical section(acquire)的條件是確保當前沒有 writer 即可,也就是確保 counter 為奇數: bit 0 為 1(`SEQLOCK_WRITER`)。
題目這裡也針對 aarch64 硬體架構進行微幅的優化:
* `SEVL`: ARM64 平台下提供 `sevl`(Set Event Locally) 指令,可以主動將 CPU 從 WFE 中喚醒
* `LDX`: 直接用 `ldaxrb` 或 `ldxrb` 取代 compiler 提供的 `__atomic_load_n`,
#### `seqlock_release_rd`
```cpp
bool seqlock_release_rd(const seqlock_t *sync, seqlock_t prv)
{
/* Enforce Load/Load order as if synchronizing with a store-release or
* fence-release in another thread.
*/
__atomic_thread_fence(__ATOMIC_ACQUIRE);
/* Test if sync remains unchanged => success */
return __atomic_load_n(sync, __ATOMIC_RELAXED) == prv;
}
```
reader 在離開 critical section 時必須確保 counter 與進入時的一致,否則就要做 retry。因此整個 reader 的運作邏輯如下:
```cpp
void seqlock_read(seqlock_t *sync, void *dst, const void *data, size_t len)
{
seqlock_t prv;
do {
prv = seqlock_acquire_rd(sync);
atomic_memcpy(dst, data, len);
} while (!seqlock_release_rd(sync, prv));
}
```
#### `seqlock_acquire_wr`
```cpp
void seqlock_acquire_wr(seqlock_t *sync)
{
seqlock_t l;
do {
/* Wait for any present writer to go away */
l = wait_for_no_writer(sync, __ATOMIC_RELAXED);
/* Attempt to increment, setting writer flag */
} while (
/* C: Synchronize with A */
!__atomic_compare_exchange_n(sync, &l, l + SEQLOCK_WRITER,
/*weak=*/true, __ATOMIC_ACQUIRE,
__ATOMIC_RELAXED));
/* Enforce Store/Store order as if synchronizing with a load-acquire or
* fence-acquire in another thread.
*/
__atomic_thread_fence(__ATOMIC_RELEASE);
}
```
writer 同樣必須要等到當前沒有 writer 才能進入 critical section,且需與其他 writer 競爭進入之。
#### `seqlock_release_wr`
```cpp
void seqlock_release_wr(seqlock_t *sync)
{
seqlock_t cur = *sync;
if (UNLIKELY(cur & SEQLOCK_WRITER) == 0) {
perror("seqlock: invalid write release");
return;
}
/* Increment, clearing writer flag */
/* A: Synchronize with B and C */
__atomic_store_n(sync, cur + 1, __ATOMIC_RELEASE);
}
```
writer 離開 critical section 時只要將 couter 再加一即可。
### Linux 中的 seqlock.h
Linux 核心的 [seqlock.h](https://github.com/torvalds/linux/blob/master/include/linux/seqlock.h) 提供更多元的機制。在 Linux 上首先提供 Sequence counters `seqcount_t`,這是 seqlock 底層的 counter,其並不提供多個 writer 間的互斥。若要直接使用該結構和相關 API,使用者端必須自行確保 writer 端的 critical section 之序列化(serialized)與不可搶佔(non-preemptible),以保證程式的同步正確性。
可以將 `seqcount_t` 與特定 lock 關聯起來,這形成 `seqcount_LOCKNAME_t`,這使得得以引入 [lockdep](https://docs.kernel.org/locking/lockdep-design.html) 來驗證寫入端部分是否已正確序列化和做搶佔的保護。具體 Kernel 中提供以下幾種:
* seqcount_spinlock_t
* seqcount_raw_spinlock_t
* seqcount_rwlock_t
* seqcount_mutex_t
* seqcount_ww_mutex_t
如果想要自動保證前述序列化與不可搶佔性,可以使用 `seqlock_t` 結構。後者結合 `seqcount_t` 和 spinlock 來達成此目的。
### 使用 seqlock 改寫 [spmc](https://github.com/sysprog21/concurrent-programs/tree/master/spmc)
嚴格來說,seqlock 是適用於"大量讀取操作和少量寫入操作"的場景,而非 [spmc](https://github.com/sysprog21/concurrent-programs/tree/master/spmc) 所描述的"大量讀取者和單一寫入者"的場景。這兩者並不相同,而後者是不利於 seqlock 發揮的,原因如下:
* seqlock 的正確性建立於當 reader 期間有 writer 進入,reader 就必須 retry,因此效益建立於大多 reader 期間沒有 writer 介入的場景。然而對於 spmc 的例子,雖然只有單一 writer,但 writer 卻會不斷做寫入操作而介入 reader
* seqlock 允許 reader 和 reader 間同時進入 critical section,但這前提是 reader 真的只是單純讀取資料。但在 spmc 的例子中 reader A 如果去寫 `node->front`,reader B 是要同步這件事的
結論來說,我不認為拿 seqlock 來改寫 [spmc](https://github.com/sysprog21/concurrent-programs/tree/master/spmc) 是合適的。此延伸題目應考慮設計其他更適合 seqlock 的場景。
## 挑戰 4
### 通過 ThreadSanitizer
原本的 cmap 無法通過 ThreadSanitizer,在 [#22](https://github.com/sysprog21/concurrent-programs/pull/22) 對此提出解決方案。
大部分的問題都可以只是將原本的操作換成 atomic operation 即可,例如 `inserts`/`removes` 一類的 counter,或者 `running` 這種狀態的變數。但仍有幾個需要留意的修改,因為這些修改雖然避免了 data race,但避免的方式可能與原本實作的邏輯不同,因此需要審慎考量這樣的邏輯調整是否合理。
一個部分是 random.c 中的 `seed`。這原本由於是由多個執行緒共用的,但沒有透過 atomic 操作存取而有 race。雖然直接替換成 atomic 操作可以解決 ThreadSanitizer 偵測到 race 的問題,但這種使用方式卻表示 seed 的產生也是無法預期。觀察 `random_next` 的實作:
```cpp
static uint32_t random_next(void)
{
uint32_t *seedp = &seed;
*seedp ^= *seedp << 13;
*seedp ^= *seedp >> 17;
*seedp ^= *seedp << 5;
return *seedp;
}
```
即便直接將上述操作轉換成 atomic,但因為執行緒間的競爭可能發生比如同一個 seed 被 `<<13` 多次之後,才又做 `<<17`, `<< 5` 來得到下個 random 值。我認為這並非預期的行為。取而代之,可以將 seed 改為各執行緒各自獨立的 thread local storage,使得不同的執行緒都以相異的 seed 來產生本身所需的 random 數,這會是比較合理的做法。
另一個難題是 writer thread 在資源釋放與 reader 的競爭。在 writer thread,從 `cmap` 中移除的物件會直接以 free 釋放掉:
```cpp
/* Remove */
...
MAP_FOREACH_WITH_HASH (elem, node, hash, cmap_state) {
if (elem->value > max_value) {
cmap_remove(&cmap_values, &elem->node);
free(elem);
...
```
然而我們在釋放時並沒有保證 reader 擁有該物件的指標並可以存取,這樣直接地釋放可能造成 use-after-free 的錯誤。
想解決這個問題,比較優雅甚至有效的做法可能是要引入 reference counting 或 hazard pointer 一類的機制,不過這並不容易,且可能引入更多複雜的程式碼。如果考慮更簡單的方式,我們可以先不釋放對應記憶體,只是將其記錄下來並留待所有 reader thread 都結束再釋放就好。這做法雖然暴力但卻有效保證了正確性,反過來缺點就是可能有導致記憶體不足的隱憂。對於此議題,仍然在思考是否有更高效但又足夠簡潔的解決辦法。
### 解釋 Utilization
想要了解在程式中輸出的 utilization 的涵義,我們首先從相關變數之更新方式下手。
```cpp
static void cmap_insert__(struct cmap_impl *impl, struct cmap_node *node)
{
size_t i = node->hash & impl->max;
node->next = impl->arr[i].first;
if (!impl->arr[i].first)
atomic_fetch_add(&impl->utilization, 1);
atomic_store(&impl->arr[i].first, node);
}
```
相關的變數 `utilization` 是在當 hashmap 的一個 bucket 從無到有對應的 entry 的情況下,會被加一。
```cpp
double cmap_utilization(const struct cmap *cmap)
{
struct rcu *impl_rcu = rcu_acquire(cmap->impl->p);
struct cmap_impl *impl = rcu_get(impl_rcu, struct cmap_impl *);
double res = (double) atomic_load(&impl->utilization) / (impl->max + 1);
rcu_release(impl_rcu);
return res;
}
```
而實際輸出的數值是 `cmap_utilization` 的回傳值,後者是將 `utilization` 除以 bucket 的總數量。
:::danger
有點無法解釋該數值的用途,不理解處是變數的 `utilization` 在 `cmap_remove` 時也沒有對應更新?
:::
### Lock-free Cuckoo Hash
https://eourcs.github.io/LockFreeCuckooHash/