# Linux 核心專題: 並行化的合併排序及其改進
> 執行人: youjiaw
> [專題解說影片](https://youtu.be/7kZxFtMPFd8)
## 任務簡介
回顧〈[並行程式設計講座](https://hackmd.io/@sysprog/concurrency)〉,運用其中的手法改寫經典的合併排序 (參見[第九週測驗題](https://hackmd.io/@sysprog/linux2024-quiz9)),逐步提升並行表現並予以量化分析。
## TODO: 回顧〈[並行程式設計講座](https://hackmd.io/@sysprog/concurrency)〉並紀錄問題
> 對照 [Linux 核心專題: 並行程式設計教材修訂](https://hackmd.io/@sysprog/H1dCuDxQR)
1. [〈並行程式設計: Atomics 操作〉](https://hackmd.io/@sysprog/concurrency-atomics) 提到 `alignas` 可以避免 false sharing 或增進記憶體的存取效率,但是我查閱 Linux 核心原始程式碼卻沒找到此操作。
:::success
隨後發現 Linux 核心是使用 `attribute((aligned(sizeof(long))))` 這種對齊方式。
> 參照 [concurrent-programs](https://github.com/sysprog21/concurrent-programs) 裡頭的程式碼。
:::
2. [Lock-free Thread Pool]() 的內文提到「若偵測到目前待執行的工作數目小於 worker thread 總數,則要透過 condition variable 喚醒可能處於等待狀態的 worker thread。」,為什麼不是大於?
## TODO: 並行化的合併排序
> [第九週測驗第一題](https://hackmd.io/@sysprog/linux2024-quiz9)藉由 [fork(2)](https://man7.org/linux/man-pages/man2/fork.2.html) 和 [mmap(2)](https://man7.org/linux/man-pages/man2/mmap.2.html) 系統呼叫來實作並行版本的合併排序,以此為出發,探討可能的實作和改進手法。過程中務必確保執行結果正確,且維持 stable sorting 的特性。
測驗題中的 `merge_sort` 函式如下:
```c
void merge_sort(int *arr, const int len)
{
if (len == 1)
return;
const int mid = len / 2;
const int left_len = len - mid;
const int right_len = mid;
/* If forked too often, it gets way too slow. */
if (fork_count < 5) {
pid_t pid = fork();
fork_count++;
if (pid == 0) { /* Child process */
merge_sort(arr, left_len);
exit(0);
}
/* Parent process */
merge_sort(arr + left_len, right_len);
waitpid(pid, NULL, 0);
} else {
merge_sort(arr, left_len);
merge_sort(arr + left_len, right_len);
}
memcpy(arr, merge(arr, left_len, arr + left_len, right_len),
len * sizeof(int));
}
```
## TODO: 嘗試改進並進行充分的量化分析
> 考慮以下手法:
> * 實作對 cache 更友善的合併排序
> * 引入 coroutine 和多執行緒,研讀[並行程式設計: 排程器原理](https://hackmd.io/@sysprog/concurrency-sched)
> * 避免遞迴,參考 Linux 核心原始程式碼的資料排序策略
> * 降低 fork 的成本,例如預先 fork (pre-forking)
> * 實作 workqueue,善用[第九週測驗第三題](https://hackmd.io/@sysprog/linux2024-quiz9)提及的 work stealing
> * 降低記憶體操作的成本,搭配課程教材,理解 Linux 核心記憶體管理
### 開發環境
```shell
$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 46 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 20
On-line CPU(s) list: 0-19
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) i9-10900X CPU @ 3.70GHz
CPU family: 6
Model: 85
Thread(s) per core: 2
Core(s) per socket: 10
Socket(s): 1
Stepping: 7
CPU max MHz: 4700.0000
CPU min MHz: 1200.0000
BogoMIPS: 7399.70
Virtualization features:
Virtualization: VT-x
Caches (sum of all):
L1d: 320 KiB (10 instances)
L1i: 320 KiB (10 instances)
L2: 10 MiB (10 instances)
L3: 19.3 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0-19
```
### 實作 workqueue 及 pre-forking
參考[〈案例: Thread Pool 實作和改進〉](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-thread-pool#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-Thread-Pool-%E5%AF%A6%E4%BD%9C%E5%92%8C%E6%94%B9%E9%80%B2)的 [tpool.c](https://github.com/sysprog21/concurrent-programs/blob/master/tpool/tpool.c),加入 thread pool。
首先更改 `main` 函式,在呼叫 `merge_sort()` 之前先於 thread pool 中建立指定數量的 thread,並在排序完成後將資源釋放,這樣的做法可以減少動態建立和釋放的開銷。
```diff
+ pool = tpool_create(4);
merge_sort(arr, N_ITEMS);
+ tpool_join(pool);
```
`merge_sort()` 函式先將任務依照中間點,切分為左半邊與右半邊,並以 `arg` 結構體記錄任務的變數,接著使用 `tpool_apply()` 將各任務要執行的函式與變數加入 workqueue 中。
最後等待任務執行完,就釋放相關資源,並呼叫 `merge()`。
```c=
static void merge_sort(int *arr, const int len)
{
if (len == 1)
return;
const int mid = len / 2;
const int left_len = len - mid;
const int right_len = mid;
arg left_task = {arr, left_len};
arg right_task = {arr + left_len, right_len};
tpool_future_t left_future = tpool_apply(pool, merge_sort_task, &left_task);
tpool_future_t right_future =
tpool_apply(pool, merge_sort_task, &right_task);
tpool_future_get(left_future, 0);
tpool_future_destroy(left_future);
tpool_future_get(right_future, 0);
tpool_future_destroy(right_future);
memcpy(arr, merge(arr, left_len, arr + left_len, right_len),
len * sizeof(int));
}
```
其中,新增的 `arg` 結構體與 `merge_sort_task()` 函式如下。
```c
typedef struct __arg{
int *data;
const int len;
} arg;
void *merge_sort_task(arg *task) {
int *arr = (*task).data;
int len = (*task).len;
merge_sort(arr, len);
}
```
但是在這樣的更改下,程式無法順利執行完成。經過測試發現,所有 threads 都會在卡在 `merge_sort()` 第 16 行的 `tpool_future_get()` 中,呼叫 ` pthread_cond_wait(&future->cond_finished, &future->mutex)` 等待子任務的完成,卻一直沒有被喚醒。
進一步分析發現,因為 thread 是透過 `jobqueue_fetch()` 來取得並執行任務(下方第 1 行),也就是 `merge_sort()`,而上述等待情況所對應的喚醒函式 `pthread_cond_broadcast()` 需要等 `merge_sort()` 執行完成後才會被呼叫(下方第 11 行),但 `merge_sort()` 本身又在等待子任務完成,形成了所有 thread 執行的任務都在等待子任務完成,這種無法結束的等待情況。
```c=
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
```
因為目前沒有想到其他的解決辦法,所以直接將 `merge_sort()` 改為非遞迴,來避免任務與子任務之間的等待狀況。
### 更改為非遞迴的合併排序
非遞迴版本中,要交給 thread 執行的任務就沒有 `merge_sort()` 了,只有 `merge()`,main thread 會在 `merge_sort()` 中提前切分好所有要進行 `merge()` 的任務,並將這些任務加入 workqueue,更改後的 `arg` 結構體與 `merge_task()` 函式如下。
:::danger
減少參數,例如 `mid`。
> 謝謝老師的提醒,在目前的程式碼中 `mid` 無法直接由結構體的其他參數算出,需要使用 `merge_sort()` 中的 `current_size`。
:::
```c
typedef struct __arg{
int *arr;
int left;
int mid;
int right;
} arg;
void *merge_task(void *args)
{
arg *task = (arg *)args;
memcpy(task->arr + task->left, merge(task->arr, task->left, task->mid, task->right), (task->right - task->left + 1) * sizeof(int));
return NULL;
}
```
`merge_sort()` 中,等待 `future` 任務執行完成與釋放資源的函式,可以選擇放在兩個迴圈之間,或是放在最內層的迴圈。
若是如下方放在迴圈外面,除了 `futures` 陣列大小難以決定以外,一次性的把所有任務加入,也會導致執行順序無法確定,造成結果錯誤的狀況。
```c
void merge_sort(int *arr, int left, int right)
{
int current_size;
int left_start;
tpool_future_t futures[N_ITEMS];
int i = 0;
for (current_size = 1; current_size <= right - left;
current_size = 2 * current_size) {
for (left_start = left; left_start < right;
left_start += 2 * current_size) {
int right_end = MIN(left_start + 2 * current_size - 1, N_ITEMS - 1);
int mid = MIN(left_start + current_size - 1, N_ITEMS - 1);
arg *merge_arg = malloc(sizeof(arg));
merge_arg->arr = arr;
merge_arg->left = left_start;
merge_arg->mid = mid;
merge_arg->right = right_end;
futures[i++] = tpool_apply(pool, merge_task, merge_arg);
}
}
for (int j = 0; j < i; j++) {
tpool_future_get(futures[j], 0);
tpool_future_destroy(futures[j]);
}
}
```
如下方 `N_ITEMS = 4` 的例子,workqueue 會被加入第一層的兩個任務,及第二層的一個任務,假設 thread 數量為 3,且各取得了一個任務,這時如果第二層的任務比任何一個第一層的任務早完成,排序結果就可能會出錯。
```
- 正確例子
(4) (3) (2) (1)
\ / \ /
(3, 4) (1, 2)
\ /
(1, 2, 3, 4)
- 錯誤例子(先排前兩個,再排前兩個和後兩個,最後排後兩個)
(4) (3) (2) (1)
1st merge: \ /
(3, 4) (2, 1) -> 不變
2nd merge: \ /
(2, 1, 3, 4) -> 假設 (2, 1) 已排序過,所以按順序加入
3rd merge: \ / -> 排序 arr[2], arr[3]
(2, 1, 3, 4)
```
如果將函式放在兩個迴圈之間,相當於每次都只先加入一層的任務,並等待這些任務執行完畢,再繼續加入下一層的任務,直到結束。這樣就可以保證上述不確定的情況被排除。
:::danger
注意書寫規範:
* 使用 lab0 規範的程式碼書寫風格,務必用 clang-format 確認一致
> 已修正程式碼,謝謝老師。
:::
```c
void merge_sort(tpool_t pool, int *arr, int left, int right, int size)
{
int current_size;
int left_start;
for (current_size = 1; current_size <= right - left;
current_size = 2 * current_size) {
int len = size / (2 * current_size) + 1;
tpool_future_t futures[len];
int i = 0;
for (left_start = left; left_start < right;
left_start += 2 * current_size) {
int right_end = MIN(left_start + 2 * current_size - 1, size - 1);
int mid = MIN(left_start + current_size - 1, size - 1);
arg *merge_arg = malloc(sizeof(arg));
merge_arg->arr = arr;
merge_arg->left = left_start;
merge_arg->mid = mid;
merge_arg->right = right_end;
futures[i++] = tpool_apply(pool, merge_task, merge_arg);
}
for (int j = 0; j < i; j++) {
tpool_future_get(futures[j], 0);
tpool_future_destroy(futures[j]);
}
}
}
```
若是將函式放在最內層的迴圈內,則是代表每加入一個任務,就等待他做完才加入下一個任務,失去了並行的能力。
但是我認為這種方法可以讓剛加入的任務還在 memory hierachy 上層時,就先進行合併,所以對 cache 較友善,而且也不需要多餘的記憶體空間來存放 `futures` 陣列,將在後續效能評估進行討論。
:::danger
不用「猜想」,用效能分析工具來驗證你的想法。
> 好的,以補在下方的效能評估。
:::
```c
for (current_size = 1; current_size <= right - left;
current_size = 2 * current_size) {
for (left_start = left; left_start < right;
left_start += 2 * current_size) {
int right_end = MIN(left_start + 2 * current_size - 1, N_ITEMS - 1);
int mid = MIN(left_start + current_size - 1, N_ITEMS - 1);
arg *merge_arg = malloc(sizeof(arg));
merge_arg->arr = arr;
merge_arg->left = left_start;
merge_arg->mid = mid;
merge_arg->right = right_end;
tpool_future_t future = tpool_apply(pool, merge_task, merge_arg);
tpool_future_get(future, 0);
tpool_future_destroy(future);
}
}
```
### 排序測試和效能評估機制
使用以下兩種方式進行評估:
1. 執行時間
原先參考論文〈[Dude, is my code constant time?](https://eprint.iacr.org/2016/1123.pdf)〉,利用 CPU 提供的週期計數器 TSC register 作為評估執行時間的方法,但是 [Game Timing and Multicore Processors](https://learn.microsoft.com/en-us/windows/win32/dxtecharts/game-timing-and-multicore-processors) 提及在多處理器和雙核系統上不保證計數器在處理器核之間同步處理,因此使用 rdtsc 需要將待測試的程式限制在一個 CPU 上 (set CPU affinity)以此減少誤差。
因此,我決定使用 `clock_gettime()` 來測量,避免 cpu pinning 以及同步的問題。
> This use of RDTSC for timing suffers from these fundamental issues:
Discontinuous values. Using RDTSC directly assumes that the thread is always running on the same processor. Multiprocessor and dual-core systems do not guarantee synchronization of their cycle counters between cores.
...
2. 使用 perf 工具進行效能分析。
<!-- 3. 記憶體使用量
使用 valgrind 工具中的 massif 來測量記憶體使用量。 -->
以下測試皆固定建立 10 個 thread。
首先比較上方提到的,將等待任務執行完成與釋放資源的函式放在兩個迴圈之間(以下簡稱 A 方法),或是最內層的迴圈(以下簡稱 B 方法)。
:::warning
對照 [CS:APP 第 5 章](https://hackmd.io/@sysprog/CSAPP-ch5)
:::
**執行時間**
使用 `clock_gettime()` 測量 10 種陣列大小,從 100,000 筆資料開始,每次加 100,000 一直到 1,000,000,並且每個大小都會重複執行 5 次,最後再取執行時間的平均。
可以發現 A 方法在所有資料大小的平均執行時間都比 B 方法快 3 倍左右。
![compare](https://hackmd.io/_uploads/SJaB_-i8A.png)
**Perf**
使用 `perf stat` 分析
觀察下方結果可以發現,有如先前所推測的,B 方法雖然執行速度較慢、花費的 instructions, cycles 比較多,但是 cache-misses 比例從 3.66% 下降到了 0.21%。
而我原先認為 B 方法因為一加入任務就執行,所以 L1 data cache 的 misses 比例會比較小,但與測試的結果不相符,兩個方法的比例是差不多的。
經過查閱發現,在我的開發環境中,每個 cpu core 都有獨立的 L1, L2 cache,L3 cache 才有共享。因此,若是負責加入任務的 main thread 與執行任務的 thread 是由不同核執行,這些資料就不會被共享到。
| Method | instructions | cycles | cache-references | cache-misses | L1-dcache-loads | L1-dcache-loads-misses |
| ------ | ------------- | -------------- | ---------------- | ----------------------------------- | --------------- | ---------------------------------------------- |
| A 方法 | 950,1385,8505 | 1566,6598,5158 | 11,3224,7852| 4139,2926 (3.66% of all cache refs) | 258,8940,6382 | 16,7917,5049 (6.49% of all L1-dcache accesses) |
| B 方法 | 4812,2255,9812| 6422,3224,9672 |41,6341,6998|860,1043 (0.21% of all cache refs)|1154,0040,5239|73,3492,6426 (6.36% of all L1-dcache accesses)|
而 context-switches 次數也相差 5 倍,所以整體而言,A 方法無論是在執行時間,還是資源的消耗都明顯優於 B 方法。
| Method | context-switches | branch-misses | migrations |
| ------ | ---------------- | ------------- | ---------- |
| A 方法 |1286,8241|3,1163,8462|1,5208|
| B 方法 |6420,2873|11,1517,4417|2,9538|
#### 比較原程式碼與使用 A 方法的程式碼
原程式碼的 fork_count 在達到限制次數之前,每次 fork 都會增加當前的 process 數量,代表最多有 $2^n$ 個 process 同時運行。所以我認為可以在 A 方法建立相同數量的 thread 來進行比較。
原程式碼:
| fork_count | instructions | cycles | cache-references | cache-misses | context-switches | branch-misses | migrations | page-faults |
| ---------- | ----------------------------------- | ------------ | ---------------- | ------------ | ---------------- | ------------- | ---------- | ----------- |
| 2 | 69,9744,1954 (1.58 insn per cycle) | 44,2110,0134 | 836,6862 | 353,7818 (42.28% of all cache refs)| 11 | 5417,7829 | 4 | 13,5404 |
| 3 | 70,0072,1959 (1.59 insn per cycle)| 43,9481,5979 | 931,2532 | 361,0411 (38.77% of all cache refs)| 19 | 5398,8739 | 10 | 13,5530 |
| 4 | 70,0375,8024 (1.58 insn per cycle)| 44,3996,1195 | 895,0482 | 361,2487 (40.36% of all cache ref)| 24 | 5403,8126 | 10 | 13,5783 |
A 方法:
| thread_count | instructions | cycles | cache-references | cache-misses | context-switches | branch-misses | migrations | page-faults |
| ------------ | ----------------------------------- | -------------- | ---------------- | ------------ | ---------------- | ------------- | ---------- | ----------- |
| 4 | 385,1146,9270 (0.73 insn per cycle) | 528,7587,6798 | 5,5372,3054 | 4634,6027 (8.37% of all cache refs) | 251,5514 | 1,1819,8786 | 2262 | 19,6335 |
| 8 | 690,0228,5445 (0.62 insn per cycle) | 1107,2844,8769 | 8,6049,3112 | 4758,8317 (5.53% of all cache refs) | 644,9611 | 2,0449,7558 | 9244 | 19,6417 |
| 16 | 1216,9900,6744 (0.56 insn per cycle) | 2178,4866,1794 | 14,9976,0521 | 5521,1937 (3.68% of all cache refs) | 1326,1207 | 3,2319,3069 | 15,9792 | 19,6589 |
經過測試發現,原程式碼的執行時間和使用的資源並不會隨著 fork 數量的增加而有顯著的改變,而 A 方法卻變成所有開銷都會隨著 thread 數量增加而變多。
但是在 A 方法中,cache misses 佔 cache-references 的比例是隨著 thread 數量增加而變少的,而且為原程式碼的 $1/5$ 至 $1/10$ 倍。這代表在記憶體的存取上,使用 thread pool 改進是有效的,可以獲得更好的 locality of reference。
雖然 cache miss 比例減少了,但程式總體的執行時間和 CPU 資源使用仍然較高,代表效能瓶頸可能在其他地方。
接著用 `perf record` 進行分析,僅列出部分輸出進行討論。
原程式碼:
```
# Total Lost Samples: 0
# Samples: 4K of event 'cycles'
# Event count (approx.): 4313695937
# Overhead Command Shared Object Symbol
# ........ .......... ................. ....................................
64.61% merge_sort merge_sort [.] merge
5.28% merge_sort libc.so.6 [.] _int_malloc
4.50% merge_sort merge_sort [.] merge_sort
3.32% merge_sort libc.so.6 [.] __memmove_evex_unaligned_erms
2.61% merge_sort libc.so.6 [.] __libc_calloc
2.23% merge_sort [kernel.kallsyms] [k] asm_exc_page_fault
1.87% merge_sort merge_sort [.] rand_next
1.63% merge_sort merge_sort [.] main
1.06% merge_sort merge_sort [.] iabs
1.01% merge_sort [kernel.kallsyms] [k] __rcu_read_lock
…
```
A 方法:
```
# Total Lost Samples: 0
# Samples: 94K of event 'cycles'
# Event count (approx.): 37079673786
# Overhead Command Shared Object Symbol
# ........ ....... ................. ...........................................
18.07% main [kernel.kallsyms] [k] native_queued_spin_lock_slowpath
5.16% main [kernel.kallsyms] [k] psi_group_change
4.68% main libc.so.6 [.] pthread_cond_wait@@GLIBC_2.3.2
3.94% main [kernel.kallsyms] [k] futex_wake
3.72% main [kernel.kallsyms] [k] _raw_spin_lock
3.43% main libc.so.6 [.] pthread_mutex_lock@@GLIBC_2.2.5
2.47% main [kernel.kallsyms] [k] futex_q_lock
2.11% main libc.so.6 [.] __GI___lll_lock_wait
1.82% main [kernel.kallsyms] [k] __get_user_nocheck_4
1.80% main [kernel.kallsyms] [k] llist_add_batch
1.80% main main [.] merge
1.68% main main [.] jobqueue_fetch
1.65% main [kernel.kallsyms] [k] ttwu_queue_wakelist
1.59% main [kernel.kallsyms] [k] __futex_unqueue
…
```
原程式碼的 `merge` 函數佔用了 64.61% 的 CPU 時間,是主要的效能瓶頸,但是在 A 方法中,`merge` 函數僅佔 1.8%,佔用最多的是 `native_queued_spin_lock_slowpath`,還有其他與同步相關的函式。
代表 CPU 花大量時間在處理同步機制,這個結果也可以對應到上方的 instructions per cycle,隨著 thread 數量越多 instructions per cycle 逐漸減少。
#### 分析結果
原始 fork 版本:
1. Fork 建立的是獨立的 process,有各自的記憶體空間,減少了資源競爭。
2. Child process 結束後會直接退出,減少了長期運行的開銷。
我的版本:
1. Threads 共享同一個記憶體空間,可能導致更多的資源競爭和同步開銷。
2. Threads 直到程式結束前才會全部退出,可能導致額外的管理開銷。