## Linux 核心專題:排程器原理
> 執行人: jeremy90307
## 解說影片
錄影解說 : [ Linux 核心專題:排程器原理 (youtube)](https://youtu.be/CPaGJcSDVtY)
## 任務簡介
強化對並行程式設計的認知。
### Reviewed by `eleanorLYJ`
討論 false sharing 的敘述不夠精確,應強調 CPU0 與 CPU1共享了同一塊的 cacheline,另外還有因為 cache coherence 機制,才導致效能降低
> 圖中有兩執行緒,當 CPU0 不斷的改變變數 X , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能
> 我確實在 false sharing 這裡解釋不夠完善,我會再將其補進我的另個[筆記](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-concepts),但在這裡我主要是放上我在閱讀教材時遇到的問題,感謝同學的指教。
> [name=jeremy90307]
## TODO: 閱讀〈並行和多執行緒程式設計〉系列講座並紀錄問題和提出改進
因系列講座內容較多,因此另外開個筆記紀錄 -> [研讀〈並行和多執行緒程式設計〉系列講座](https://hackmd.io/@jeremytsai/concurrent-programs)
:::danger
集中於本頁面,專注於「紀錄問題」和「改進/討論」,已在教材出現的內容就不用建立副本
:::
### 〈[概念](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-concepts)〉
**多工處理**
- 循序式程式(sequential program)
- 時間與程式的執行同一方向,在一特定時間僅有一項工作進行
- 前景/背景式程式(foreground / background program)
- 具備中斷
- 須確保資料的一致性問題
- 多工程式 (multi-tasking program)
- 並行程式設計,在系統中若包含多個 CPU 或不對等 (heterogeneous) 的執行單元時,必須將工作分散至各執行單元同時執行
- 理想情況上若各子工作可以完全獨立運行,則系統將可達到其最高效率。但各執行單元間彼此仍需共用系統資料(如記憶體、I/O 等)
:::info
問題 1
不對等 (heterogeneous) 的執行單元,代表的是什麼意思? 在系統中會有不對等是指優先順序嗎?
問題 2
同進進行的說法是否有瑕疵?因為在並行程式中使用快速切換來達到看似同時切換而已。
:::
我整理多重程式、多工、多執行緒、多重處理的表格如下
| | Multiprogramming 多重程式| Multitasking 多工| Multithreading 多執行緒| Multiprocessing 多重處理|
| -------- | -------- | -------- | - | - |
| 定義 | 在單一 CPU 上執行多個程式 | 在單一 CPU 上執行多個工作 | 在單一工作上運行多個執行緒 | 在多個 CPU 上運行多個行程 |
| 共享資源 | 資源(CPU、記憶體)在程式之間共享 | 資源(CPU、記憶體)在工作之間共享 | 資源(CPU、記憶體)在執行緒之間共享 | 每個行程都有自己一組的資源(CPU、記憶體) |
| 排程 | 使用循環或基於優先權的排程為程式分配 CPU 時間 | 使用基於優先權或時間分割的調度為工作分配 CPU 時間 | 使用基於優先權或時間分割的調度為執行緒分配 CPU 時間 | 每個行程可以有自己的排程演算法 |
| 記憶體管理 | 每個程式都有自己的記憶體空間 | 每個工作都有自己的記憶體空間 | 執行緒在工作內共享記憶體空間| 每個行程都有自己的記憶體空間 |
| 工作切換 | 需要工作切換才能在程式之間切換 | 需要工作切換才能在工作之間切換 | 需要工作切換才能在執行緒之間切換 | 需要工作切換才能在行程之間切換 |
| Inter-Process Communication (IPC) | 使用訊息傳遞或共享記憶體進行 IPC | 使用訊息傳遞或共享記憶體進行 IPC | 使用 IPC 的執行緒同步機制(例如 lock、semaphores) | 使用 IPC 的進程間通訊機制(例如 pipes、sockets) |
:::info
coroutine 的實作是否涵蓋多工與多執行緒,雖然單一 CPU 上執行多個工作,但工作內容(如 `qsort_r` )不是應該確保 thread safe 的情況嗎?
:::
下圖為單一 CPU 同一時間只執行一個任務,但利用快速切換的方式,看似一次執行多的任務。
![image](https://hackmd.io/_uploads/ryQU-7tLC.png)
![image](https://hackmd.io/_uploads/Skq4-QYU0.png)
:::info
頻繁的快速切換帶來的優勢?
:::
### 〈[排程器原理](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-sched#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-%E6%8E%92%E7%A8%8B%E5%99%A8%E5%8E%9F%E7%90%86)〉
### 〈[Atomics 操作](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-atomics)〉
簡述 race condition 的問題
假設兩執行緒各自將全域變數的值 +1
| Thread 1 | Thread 2 | | value |
| -------- | -------- | -------- |-------- |
| | | |0 |
| read | | <- |0 |
| increase | | |0 |
| write back | | -> |1 |
| | read | <- |1 |
| | increase | |1 |
| | write back | -> |2 |
若沒有使用 lock 或是同步機制,可能遇到下面問題
| Thread 1 | Thread 2 | | value |
| -------- | -------- | -------- |-------- |
| | | |0 |
| read | | <- |0 |
| | read | <- | |
| increase | | |0 |
| | increase | |0 |
| write back | | -> |1 |
| | write back | -> |1 |
結果並非預期的 2 而是 1。
上述問題的解決辦法有使用
- 阻塞式同步 : mutex lock
- 須考慮到 priority inversion (優先權反轉)的問題
- 解決方法:Priority Inheritance 、 Priority Ceiling
- 非阻塞式同步 : Atomic 指令操作,比如 compare-and-swap (CAS)
```c
int CAS(int *reg, int old, int new)
{
ATOMIC();
int tmp = *reg;
if(tmp == old)
*reg = new;
END_ATOMIC();
return tmp;
}
```
:::info
若有 Atomic 指令來確保資料的一致性,為何還需要用到 mutex lock 等方式?能非阻塞就不會影響到其他執行緒的進行不是更有效率嗎?
:::
**false sharing (偽共享)**
![image](https://hackmd.io/_uploads/BkqEa7KU0.png)
圖中有兩執行緒,當 CPU0 不斷的改變變數 X , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能。
:::info
問題
如何做到抑制 false sharing ? 共筆中有提到使用對齊的解決方法,但為何對齊 16 就可以避免 false sharing ?
考慮以下程式碼:
```c
alignas(16) int x;
printBinary(&x);
printf("%ld\n", alignof(x));
```
對應的執行輸出:
```
_0000_0000_0000_0000_0000_0000_1010_1101_1001_0100_1111_1111_1111_1000_1101_0000
16
```
因為 int x 變數的地址會對齊 16 (單位:位元組_,亦即 $2^4$,在其二進位表示中,低位元處 4 個位元皆為 0,此處使用 alignas 以避免 false sharing。
:::
TODO : 尚未閱讀 Memory Ordering 和 Barrier、處理器架構和其 Memory Order、wait-free & lock-free
### 〈[POSIX Thread](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fposix-threads)〉
在此章節主要跟著教材學習 pthread 相關程式設計。
> [研讀〈並行和多執行緒程式設計〉_POSIX Thread](https://hackmd.io/nhcZvdS5QfOcShxU37MbQg?view#%E3%80%88POSIX-Thread%E3%80%89) 連結有相關實作說明
### [〈實作輕量級的 Mutex Lock〉](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-mutex)
在**測試和驗證**中補充了些筆記即註解,使這個實作更加易懂 :arrow_down:
> [mutex/example/main.c](https://github.com/jeremy90307/concurrent-programs/blob/master/mutex/example/main.c)
模擬情境:
給定 16 個工作節點 $n_0$~$n_{16}$ ,他們彼此間具備從屬關係的: $n_k$ 的親代節點是 $n_{k-1}$ 。每個節點需要都等待親代節點就緒 (ready) 方可進行下一階段的工作 ( $n_0$ 沒有親代節點,因此無此限制)。
16 個節點共享一個 `clock`,`clock` 從 tick = 1 開始累計,節點在每個 tick 都有必須完成的事:在偶數 tick 時想像成是完成階段任務,可改變自身狀態為就緒並通知其子節點繼續後續任務,而在奇數 tick 時則推動時間讓 tick += 1。
結構體
```c
struct clock {
mutex_t mutex;
cond_t cond;
int ticks;
};
struct node {
struct clock *clock; // Each node points to a shared clock
struct node *parent; // Point to the parent node
mutex_t mutex;
cond_t cond;
bool ready; // Proceed to the next node only when ready is true
};
```
- `clock` 中的 `ticks` 決定是否繼續下一步驟,同步上使用 condition variable + mutex 機制去通知此事件,且通知應該是用 broadcast 方式讓所有正在等待的執行緒得知
> 在 POSIX_Thread 中 condition variable 的實作中因只有一個執行緒在等待,使用 `pthread_cond_signal`
- `node` 中 `ready` 主要讓結點能進行下個工作,因此當親代節點就緒時,主動通知此事。同樣需要 condition variable + mutex 機制,但以 signal 方式通知子節點就好。
**`thread_func(void *ptr)`**
```c
static void *thread_func(void *ptr)
{
struct node *self = ptr;
bool bit = false;
for (int i = 1; clock_wait(self->clock, i); ++i) {
printf("Thread [%c] | i : %d\n", self->name, i);
if (self->parent){
printf("Thread [%c] wait parent\n", self->name);
node_wait(self->parent);
}
if (bit) {
printf("Thread [%c] send signal\n", self->name);
node_signal(self);
} else {
printf("Thread [%c] trigger clock\n", self->name);
clock_tick(self->clock);
}
bit = !bit;
}
node_signal(self);
return NULL;
}
```
**`main(void)`**
```c
int main(void)
{
struct clock clock;
clock_init(&clock);
#define N_NODES 3
struct node nodes[N_NODES];
node_init(&clock, NULL, &nodes[0]);
for (int i = 1; i < N_NODES; ++i)
node_init(&clock, &nodes[i - 1], &nodes[i]);
printf("\nParent releationship : NULL ");
for(int i = 0; i < N_NODES; ++i)
printf(" -> [%c] ", nodes[i].name);
printf("\n");
pthread_t threads[N_NODES];
for (int i = 0; i < N_NODES; ++i) {
if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0)
return EXIT_FAILURE;
}
printf("Tick start~\n");
clock_tick(&clock);
clock_wait(&clock, 1 << N_NODES);
clock_stop(&clock);
printf("\nTick stop~\n");
for (int i = 0; i < N_NODES; ++i) {
if (pthread_join(threads[i], NULL) != 0)
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
```
`main` 執行緒呼叫第一個 `clock_tick` 來讓 `tick` 變為 `1`,這樣其他執行緒就可以開始根據 `tick` 逐步進行。而這裡的 `clock_wait` 會一直等待 `tick` 到 `1 << N_NODES` 之後再用 `clock_stop` 來讓節點的執行緒得以結束。
稍微改寫 [main.c](https://github.com/jeremy90307/Posix_thread/blob/master/mutex/example/main.c) 使其更容易了解每個執行緒的進行過程,以下為輸出結果:
:::info
在執行程式時會出現下面錯誤,想請問為何會有這樣的問題?
```
FATAL: ThreadSanitizer: unexpected memory mapping 0x5c9bd4d2b000-0x5c9bd4d4b000
```
輸入 `sudo sysctl vm.mmap_rnd_bits=28` 來解決問題
> 參考 [FATAL: ThreadSanitizer: unexpected memory mapping when running on Linux Kernels 6.6+](https://stackoverflow.com/questions/77850769/fatal-threadsanitizer-unexpected-memory-mapping-when-running-on-linux-kernels)
:::
```
Parent releationship : NULL -> [A] -> [B] -> [C]
Tick start~
============clock_tick() tick : 1============
Thread [C] | i : 1
Thread [C] wait parent
Thread [B] | i : 1
Thread [B] wait parent
Thread [A] | i : 1
Thread [A] trigger clock
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
============clock_tick() tick : 5============
Thread [B] wait parent
Thread [A] | i : 5
Thread [A] trigger clock
Thread [C] | i : 2
Thread [C] wait parent
============clock_tick() tick : 6============
Thread [A] | i : 6
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 7============
Thread [B] | i : 4
Thread [B] wait parent
Thread [A] | i : 7
Thread [A] trigger clock
============clock_tick() tick : 8============
Thread [A] | i : 8
Thread [A] send signal
Tick stop~
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] becomes not ready.
Thread [C] send signal
```
- 一開始 main 執行緒 `clock_tick` 先將 tick 增加 1 接著進入 `clock_wait` 中進行等待,一直到 `clock->ticks` 等於 $2^{N_{NODES}}$
- 假設 $N_{NODES}$ 為 3 ,分別有 `Thread [A]`、`Thread [B]`、`Thread [C]` 來執行這三個 node ,且他們之間的關係為 `NULL -> [A] -> [B] -> [C] `
- 接著每個 node 進入 `thread_func` ,且預設的 `bit` 為 `false` ,每次迭代都會進行反轉,使其執行不同工作內容
- ==Thread [A] 因沒有親代節點關係,跳過等待親代節點 ready 的過程,也因此在每次 tick 增加後都能看 Thread [A] 不斷的執行==
```
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
```
在 tick : 2 中
- `Thread [A]` 發送節點的訊號喚醒正在等待的 `Thread [B]` (喚醒時 `Thread [A]` 轉變為 ready 因此 `Thread [B]` 才能執行工作,此時 `Thread [A]` 交棒給 `Thread [B]` 後再次轉變為 not ready)
- 此時的 `Thread [B]` 因第一次執行工作 `bit = false` ,所以進行 tick 加一的工作
```
============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
```
在 tick : 3 中
- `Thread [A]` 尚未 ready ,因此 `Thread [B]` 再次進入等待親代節點的狀態
- `Thread [A]` 不受親代節點影響繼續執行工作
```
============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
```
在 tick : 4 中
- `Thread [A]` 發送節點的訊號喚醒正在等待的 `Thread [B]` (喚醒時 `Thread [A]` 轉變為 ready 因此 `Thread [B]` 才能執行工作,此時 `Thread [A]` 交棒給 `Thread [B]` 後再次轉變為 not ready)
- `Thread [B]` 因為 `bit` 的關係工作內容變成發送節點訊號喚醒正在等待的 `Thread [C]`
後續都是相似的步驟,因此不再贅述
最後在 tick : 8 中, `clock->ticks` 等於 $2^{N_{NODES}}$ 喚醒 main 執行緒,接著進入 `clock_stop(&clock)` 將 ticks 的值設為 `-1`,導致在執行緒函式中 `for` 迴圈中止。
:::info
為何在 `thread_func` 最後需要在進行 `node_signal(self)` ?
:::
**補充筆記:實作 Priority-inheritance mutex**
在 [mutex/pi-test/main.c](https://github.com/sysprog21/concurrent-programs/blob/master/mutex/pi-test/main.c) 函式中主要建立一個 Priority inversion 的情況
定義一個具集 `TRY(f)` 用來檢視錯誤,其中 `#f` 將參數 `f` 轉換為字符串,這樣可以在錯誤訊息中顯示出具體是哪個函式失敗。
```c
#define TRY(f) \
do { \
int __r; \
if ((__r = (f != 0))) { \
fprintf(stderr, "Run function %s = %d\n", #f, __r); \
return __r; \
} \
} while (0)
```
`ctx_init` 函式中的 `mutexattr_setprotocol` 設置了優先級繼承,這樣可以避免優先級反轉。
```c
static void ctx_init(struct ctx *ctx)
{
mutexattr_t mattr;
mutexattr_setprotocol(&mattr, PRIO_INHERIT);
mutex_init(&ctx->m0, &mattr);
}
```
```c
#include <pthread.h>
int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol);
```
`attr` : 指向互斥鎖屬性的指標
`protocol` : 協議屬性
- `PTHREAD_PRIO_INHERIT` : 使用優先級繼承
- `PTHREAD_PRIO_PROTECT` : 使用優先級上限
- `PTHREAD_PRIO_NONE` : 不使用優先級繼承或上限的策略
**執行**
- 必須透過 `sudo` 執行,因為牽涉對執行緒優先權的調整,需要 root 權限
- 必須使用 taskset 將程式鎖定在單一 CPU 上,否則多 CPU 資源的情況下,就沒辦法讓 middle 佔據唯一的 CPU。
```
sudo taskset -c 1
```
:::info
問題 1
在研讀的過程中 `pthread_create_with_prio` 函式主要用作創建一個具有優先及的執行緒,當中 `sched_param` 的結構體又是從何而來?在完整程式碼中看起來沒有引用到排程器相關標頭檔。
部份程式碼
```c
static int pthread_create_with_prio(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg,
int prio)
{
struct sched_param param;
param.sched_priority = prio;
TRY(pthread_attr_setschedparam(attr, ¶m));
TRY(pthread_create(thread, attr, start_routine, arg));
return 0;
}
```
:::
在 [pthread_setschedparam(3) — Linux manual page](https://www.man7.org/linux/man-pages/man3/pthread_setschedparam.3.html) 找到 `sched_param` 結構體
```c
struct sched_param {
int sched_priority; /* Scheduling priority */
};
```
:::info
`pthread_attr_setschedpolicy(&attr, SCHED_FIFO)`
`pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)`
理解這兩 API 代表意思
:::
```c
int pthread_attr_setinheritsched(pthread_attr_t *attr,int inheritsched);
```
`attr` : 指向執行緒屬性的指標
`inheritsched` : 是否繼承父執行緒的排程屬性
- `PTHREAD_EXPLICIT_SCHED` : 不繼承,新執行緒使用由 `pthread_attr_setschedparam` 和 `pthread_attr_setschedpolicy` 設置的排程屬性,執行緒
- `PTHREAD_INHERIT_SCHED` : 繼承父執行緒的排程屬性
返回值:成功返回 `0` ,失敗返回非 0 值
```c
int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
```
`attr` : 指向執行緒屬性的指標
`policy` : 排程屬性
- `SCHED_FIFO` : 搶佔式排程,即會被優先即更高的執行緒搶佔
- `SCHED_RR` : 輪詢式排程,擁有同樣優先權的執行緒會以一固定時間量,又稱「時間配量」(time slice),用輪詢方式執行。
- `SCHED_OTHER` : 為 Linux 中默認。僅用於靜態優先級為 0 的執行緒(i.e., threads under real-time policies always have priority over `SCHED_OTHER` processes),從靜態優先級 0 列表中選擇要運行的執行緒,由動態優先級決定(即 nice 值)。
每個時間段中,當執行緒準備好運行但被排程器拒絕時,動態優先級會增加。這確保了所有 `SCHED_OTHER` 執行緒之間的公平進展。
返回值:成功返回 `0` ,失敗返回非 0 值
> [sched(7) — Linux manual page](https://www.man7.org/linux/man-pages/man7/sched.7.html)
```c
int pthread_attr_setschedparam(pthread_attr_t *restrict attr,
const struct sched_param *restrict param);
```
主要用作設定優先級
返回值:成功返回 `0` ,失敗返回非 0 的值
:::info
問題 2
請問若沒有給定優先權的級別,系統的判定如何決定下個執行的執行緒?
:::
TODO : 理解 nice 值、CFS
**測試**
主要測試排程策略相關問題。
完整測試程式 -> [mutex/test_priority](https://github.com/jeremy90307/Posix_thread/tree/master/mutex/test_priority)
基於問題 2 進行相關測試。設定三個執行緒,Thread 1 與 Thread 2 將優先級設置在 1 (其中優先級設置範圍 1 ~ 99),Thread 使用預設屬性(即 `SCHED_OTHER`),並進行兩種排程屬性:`SCHED_FIFO`、`SCHED_RR` 測試
- `SCHED_FIFO`
```
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
```
在搶佔式的排程中,Thread 1 並沒有讓出資源給 Thread 2 執行。
:::info
問題 3
當這種靜態優先級與動態優先級同時出現時,如何決定動態優先級的執行緒何時能執行?在 `SCHED_FIFO` 的結果中看到 Thread 3 並沒有成功執行。
:::
- `SCHED_RR`
```
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2
```
可以看到 Thread 1 與 Thread 2 將資源的平均分配給不同執行緒,Thread 3 的非即時執行緒根據動態優先級偶爾也能分配到資源。
### [〈建立相容於 POSIX Thread 的實作〉](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-thread-package)
- Chain Blocking : 一個高優先權執行緒為了取得 n 個資源,被 n 個低優先權執行緒執行緒 block 住,這種情形就稱為 Chained Blocking
- Deadlock : 不同優先級的執行緒們之間相互取得各自已經取得的資源,並且彼此等待對方釋放資源造成的 Deadlock
#### 實作 priority protection mutex
priority protection mutex 設定 :
`muthread_mutexattr_setprioceiling`: 設定 mutex 的 `prioceiling`
```c
muthread_mutexattr_setprotocol(&mattr, TBTHREAD_PRIO_INHERIT);
muthread_mutexattr_setprioceiling(&mattr, 25);
muthread_mutex_init(&mutex_normal, &mattr);
muthread_mutex_init(&mutex_normal_2, &mattr);
```
:::info
在[測驗程式 Tests/test-06-priority-inversion-PP.c](https://github.com/qwe661234/MuThreadPackage/blob/main/Tests/test-06-priority-inversion-PP.c) 中,我們在已經設定好會發生 priority inversion 的情況下去實作出 priority protection mutex 是否會過於不實際,若面對到更複雜的情況下使用 PP 是否對效能真的有更多提升?該如何驗證?
我在問題中提到的不實際是因為我們已知任務的優先順序,故將某任務的優先級提升至某執行緒優先級之上,在系統中是否是常見的作法?
:::
## TODO: 針對〈並行程式設計:排程器原理〉設計相關實驗
> 以隨堂測驗和作業題目 (如 Timsort) 為展示標的,設計 preemptive coroutine 整合的相關實驗至少二個,達成並行處理並確認執行結果符合預期
在整合相關實作之前,先嘗試了解在 [並行程式設計: 排程器原理](https://hackmd.io/@sysprog/concurrency-sched) 中 preemptive coroutine 的實作方法,才能延伸至其他案例中。
> 範例 : [preempt_sched](https://github.com/sysprog21/concurrent-programs/blob/master/preempt_sched/task_sched.c)
- `preempt_count` 用作決定是否允許搶佔,當大於 0 表示禁止搶佔
**中斷處理**
在了解中斷處理時出現幾個陌生的函式及結構體。
`sigset_t` 用來表示訊號集,其初始化方式有兩種:
1. 使用 `sigemptyset (sigset_t * set )` 指定訊號集為空
2. 或者使用 `sigfillset (sigset_t * set )` 指定訊號集為滿(即阻塞所有訊號),然後使用 `sigdelset` 單獨刪除指定的訊號(即不阻塞指定訊號),反之加入使用 `sigaddset`。
`sigprocmask` -> ???
:::danger
總是查閱第一手材料,如: https://man7.org/linux/man-pages/man2/sigprocmask.2.html
:::
```c
int sigprocmask(int how, const sigset_t *_Nullable restrict set,
sigset_t *_Nullable restrict oldset);
```
> `sigprocmask()` is used to fetch and/or change the signal mask of the calling thread. The signal mask is the set of signals whose delivery is currently blocked for the caller
不太能理解這裡所謂的遮罩意思,描述裡提及可以透過 `how` 來改變訊號集的狀態
- `SIG_BLOCK`:「阻塞」這些信號
- `SIG_UNBLOCK`:「解除阻塞」這些信號
- `SIG_SETMASK`:將當前的信號遮罩集設置為 `set` 指向的信號集,即「覆蓋」當前的信號遮罩集。
```c
static void local_irq_save(sigset_t *sig_set)
{
sigset_t block_set;
sigfillset(&block_set);
sigdelset(&block_set, SIGINT);
sigprocmask(SIG_BLOCK, &block_set, sig_set);
}
```
-> 用作阻塞除了 `SIGINT` 以外的所有訊號,並保存原先的訊號狀態 `sig_set`
```c
static void local_irq_restore(sigset_t *sig_set)
{
sigprocmask(SIG_SETMASK, sig_set, NULL);
}
```
-> 恢復至原來的訊號狀態
> 當 `sigprocmask` 的第三個參數 `oldset` 為 NULL 時,表示不需要保存之前的訊號狀態
**task**
`task_add()` 加入一個新任務
首先使用 `getcontext` 來取得當前的 context,接著在函式中呼叫 `makecontext` 前必須先設置新的 stack 及下個 context 的地址,如下:
> [makecontext(3) — Linux manual page](https://man7.org/linux/man-pages/man3/makecontext.3.html)
```c
task->context.uc_stack.ss_sp = task->stack;
task->context.uc_stack.ss_size = 1 << 20;
task->context.uc_stack.ss_flags = 0;
task->context.uc_link = NULL;
```
`ss_flags` -> 用處為何?
`uc_link` 為當前 context 執行結束後要執行的下個 context ,若為 `NULL` 則當前 context 執行完後結束。
在呼叫 `makecontext` 之前為何需要先設置 stack 大小 `ss_size` ?
> [Stack Overflow](https://github.com/sysprog21/concurrent-programs/blob/master/preempt_sched/task_sched.c) 當中解答提到 Because stack on some machine "grows toward numerically lower addresses" (like it does on x86 architecture and wiki x86) the makecontext needs/wants to place it data on the end of the stack. So it needs to determinate the end of stack and this is what ss_size is used for.
```c
makecontext(&task->context, (void (*)(void)) task_trampoline, 2, ptr.i[0],
ptr.i[1]);
```
`makecontext` 先設置要輸入給指定函式 `task_trampoline` 的參數數量 `2` 及參數 `ptr.i[0]`, `ptr.i[1]`,使 context switch 時能執行特定函式。
注意:確保當新任務第一次被切換時,其 timer signal 是被阻塞的
```c
sigaddset(&task->context.uc_sigmask, SIGALRM);
```
**trampoline**
> Trampolines (sometimes referred to as indirect jump vectors) are memory locations holding addresses pointing to interrupt service routines, I/O routines, etc. Execution jumps into the trampoline and then immediately jumps out, or bounces, hence the term trampoline.
```c
__attribute__((noreturn)) static void task_trampoline(int i0, int i1)
{
union task_ptr ptr = {.i = {i0, i1}};
struct task_struct *task = ptr.p;
/* We switch to trampoline with blocked timer. That is safe.
* So the first thing that we have to do is to unblock timer signal.
* Paired with task_add().
*/
local_irq_restore_trampoline(task);
task->callback(task->arg);
task->reap_self = true;
schedule();
__builtin_unreachable(); /* shall not reach here */
}
```
`noreturn` 關鍵字告訴編譯器此函式不返回原來調用者函式的位置。
:::info
這裡問題有點大
`task->callback(task->arg)` ??? 貌似跑去執行排序任務一直到完成排序,不能理解為何
:::
**排程**
```c
static void schedule(void)
{
sigset_t set;
local_irq_save(&set);
struct task_struct *next_task =
list_first_entry(&task_current->list, struct task_struct, list);
if (next_task) {
if (task_current->reap_self)
list_move(&task_current->list, &task_reap);
task_switch_to(task_current, next_task);
}
struct task_struct *task, *tmp;
list_for_each_entry_safe (task, tmp, &task_reap, list) /* clean reaps */
task_destroy(task);
local_irq_restore(&set);
}
```
`next_task` 使用 `list_first_entry` 選擇下一個被排程的任務(即為當前任務的下一個鏈節串列節點),若 `next_task` 存在使用 `task_switch_to()` 來進行 context switch 。
```c
static void task_switch_to(struct task_struct *from, struct task_struct *to)
{
task_current = to;
swapcontext(&from->context, &to->context);
}
```
`swapcontext` 將任務從 `from->context` 切換到 `to->context` ,且 `swapcontext` 會保存當前 CPU 狀態。
> [swapcontext(3) - Linux man page](https://linux.die.net/man/3/swapcontext)
`task_current->reap_self` 表示任務是否完成,若完成則將其至於一個待清除的鏈節串列。
**timer**
在此案例中創建一個每 10 ms 發送 `SIGALRM` 訊號給行程來模擬中斷
```c
static void timer_create(unsigned int usecs)
{
ualarm(usecs, usecs);
}
```
> [ualarm(3) — Linux manual page](https://man7.org/linux/man-pages/man3/ualarm.3.html)
`ualarm` 第一個參數為第一次發送 `SIGALRM` 的時間間隔,第二個參數為間隔多久發送 `SIGALRM` 。
在 `timer_cancel()` 中設置 `ualarm(0, 0)`,這樣 timer 會被取消。
```c
static void timer_init(void)
{
struct sigaction sa = {
.sa_handler = (void (*)(int)) timer_handler,
.sa_flags = SA_SIGINFO,
};
sigfillset(&sa.sa_mask);
sigaction(SIGALRM, &sa, NULL);
}
```
`sigaction` 設置處理函式為 `timer_handler`
> [sigaction(2) — Linux manual page](https://man7.org/linux/man-pages/man2/sigaction.2.html)
:::info
問題:
看到 `sa_flags` 被設置為 `SA_SIGINFO` 時,查了他的作用卻還是不能理解這段話的意思,在結構體中為何不直接設置 `sa_sigaction` ,而要使用這種替代方式?
> If `SA_SIGINFO` is specified in sa_flags, then sa_sigaction
(instead of sa_handler) specifies the signal-handling function
for signum. This function receives three arguments, as described
below.
```c
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
};
```
:::
```c
static void timer_wait(void)
{
sigset_t mask;
sigprocmask(0, NULL, &mask);
sigdelset(&mask, SIGALRM);
sigsuspend(&mask);
}
```
`timer_wait()` -> 等待 `SIGALRM` 訊號,當接收到訊號後進入 `timer_handler` 再進入 `schedule`
- `sigprocmask` 將當前的訊號遮罩集保存至 `mask` 中,並使用 `sigdelset` 刪除 `SIGALRM` 訊號,使其不會被阻塞。
- `sigsuspend` 暫時將 mask 給定的遮罩取代執行緒的訊號遮罩,直到接收到 `SIGALRM` 訊號,使執行緒恢復執行。
> [sigsuspend(2) — Linux manual page](https://man7.org/linux/man-pages/man2/sigsuspend.2.html)
> sigsuspend() temporarily replaces the signal mask of the calling thread with the mask given by mask and then suspends the thread until delivery of a signal whose action is to invoke a signal handler or to terminate a process.
```c
static void timer_handler(int signo, siginfo_t *info, ucontext_t *ctx)
{
if (preempt_count) /* once preemption is disabled */
return;
/* We can schedule directly from sighandler because Linux kernel cares only
* about proper sigreturn frame in the stack.
*/
schedule();
}
```
當 `SIGALRM` 訊號到達時,使用 `schedule` 進行排程
**`main`**
```c
int main()
{
timer_init();
task_init();
task_add(sort, "1"), task_add(sort, "2"), task_add(sort, "3");
preempt_disable();
timer_create(10000); /* 10 ms */
while (!list_empty(&task_main.list) || !list_empty(&task_reap)) {
preempt_enable();
timer_wait();
preempt_disable();
}
preempt_enable();
timer_cancel();
return 0;
}
```
執行流程:
1. 初始化 timer 及 task
2. 新增三個任務,每個任務都執行隨機數列的排序,並將其加入到任務的鏈節串列。需注意到 `makecontext` 設置了新任務的入口點為 `task_trampoline`
3. 禁止搶佔,保持創建 timer 的 Atomic,且每隔 10000 usecs 發送一次 `SIGARLM` 訊號
4. 在 `while` 循環中反覆等待 `SIGARLM` 訊號,每次接收到訊號進行 context switch
5. 在等待的過程 `timer_wait` 中允取發生搶佔
::: info
當任務 1 收到 `SIGARLM` 訊號後將工作讓給任務 2 或 任務 3 皆是有可能?
:::
6. 當 `timer_wait` 接收到訊號後,交給 `schedule` 函式進行 context switch
7. 若當任務首次搶佔時,`task_trampoline` 會恢復 timer 的訊號,因原先會阻塞 `SIGARLM` ,避免創建資料的搶佔造成資料不一致?
> 在 `task_add` 函式中可以看到
8. 最後 `timer_cancel` 取消 timer
問題:
- 如何決定排程器的下個任務?
- 當接收到 `SIGALRM` 訊號時,會觸發 `timer_handler` ,在此函式中使用 `schedule` 函式進行排程
- 清除的時機 ?
### 案例 1 : list_sort 整合 preemptive coroutine
==起步時遇到的問題:==
在整合的過程中遇到一些困難,在 [preempt_sched](https://github.com/sysprog21/concurrent-programs/blob/master/preempt_sched/task_sched.c) 中的我不太能理解其 `qsort_r` 實作的方式,因此我該如何將 quicksort 改成能支持 coroutine 的形式,並整合至 preemptive coroutine 之中?
> 我原先案例 1 要使用 quicksort 整合 preemptive coroutine
我先將 lab0-c 中實作過得 `list_sort` 整合至 preemptive coroutine ,主要因為 list_sort 能找到完整的程式碼,了解其運作原理。
==整合中的問題:==
1. `random_shuffle()` 的意義,相較使用 `random()`?
2. 思考何時該禁止搶佔的發生?
目前已知在創建鏈結串列的節點、打印訊息時禁止搶佔,但禁止搶佔的依據是什麼?我一直無法確定。
4. 在 coroutine 中我是否應該注意我的 `list_sort` 是否 thread safe ?
5. 請問該如何寫好 `README` ?
**實作**:
完整程式碼 -> [github](https://github.com/jeremy90307/concurrent-programs/tree/master/preempt_sched/list_sort)
參考 [preempt_sched](https://github.com/sysprog21/concurrent-programs/blob/master/preempt_sched/task_sched.c) 的模擬搶佔情境,並將 `qsort_r` 更改成 `list_sort` ,其中 `list_sort` 取自 linux kernel ,是一種 merge sort 的實作方式改良,更多說明可以參考 [lab0-c](https://hackmd.io/GU68poa6RBqvgaM5PZ6UlA?view#%E5%AF%A6%E4%BD%9C-list_sortc) 。
`list_sort` 為雙向的鏈結串列,將節點包裝成 `element_t` 的結構體,結構體包含 `value` 與可以連接至上下節點的 `list` ,其中每個節點都有隨機的 `value` 值,每個 task 的工作即將各自的鏈結串列完成排序。
```diff
+ typedef struct {
+ uint32_t value;
+ struct list_head list;
+ } element_t;
```
**比較函式 `cmp`**
先看原程式碼中使用到的比較技巧
```c
static int cmp_u32(const void *a, const void *b, void *arg)
{
uint32_t x = *(uint32_t *) a, y = *(uint32_t *) b;
uint32_t diff = x ^ y;
if (!diff)
return 0; /* *a == *b */
diff = diff | (diff >> 1);
diff |= diff >> 2;
diff |= diff >> 4;
diff |= diff >> 8;
diff |= diff >> 16;
diff ^= diff >> 1;
return (x & diff) ? 1 : -1;
}
```
舉例 `x = 199` 與 `y = 39`
| | x | 1100 0111 |
| -------- | -------- | -------- |
| \| | y | 0010 0111 |
| | | |
| = | diff | 1110 0111 |
| \| | diff >> 1 | 0111 0011 |
| | | |
| = | diff | 1111 0111 |
| \| | diff >> 2 | 0011 1101 |
| | | |
| = | diff | 1111 1111 |
| \| | diff >> 4 | 0000 1111 |
| | | |
| = | diff | 1111 1111 |
| | .... | ... |
| | diff | 1111 1111 |
| ^ | diff >> 1 | 0111 1111 |
| = | diff | 1000 0000 |
最後 `diff` 可以得到有差異的最高位,`(x & diff) ? 1 : -1` 即可比較出兩數字之間的大小。
將其改寫成可以連接鏈結串列
```c
static int cmp(void *priv,
const struct list_head *a,
const struct list_head *b)
{
element_t *node_a = list_entry(a, element_t, list);
element_t *node_b = list_entry(b, element_t, list);
uint32_t diff = node_a->value ^ node_b->value;
if(!diff)
/* *a == *b */
return 0;
diff = diff | (diff >> 1);
diff |= diff >> 2;
diff |= diff >> 4;
diff |= diff >> 8;
diff |= diff >> 16;
diff ^= diff >> 1;
return (node_a->value & diff) ? 1 : -1;
}
```
在加入 `list_sort` 之前需重新定義
```c
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
typedef int
__attribute__((nonnull(2, 3))) (*list_cmp_func_t)(void *,
const struct list_head *,
const struct list_head *);
typedef uint8_t u8;
```
最後改寫 `sort()` 使其能與鏈結串列結合並進行排序的工作。
```diff
static void sort(void *arg)
{
char *name = arg;
preempt_disable();
- uint32_t *arr = malloc(ARR_SIZE * sizeof(uint32_t));
+ struct list_head head;
+ INIT_LIST_HEAD(&head);
+ uint32_t r = getpid();
+ for (int i = 0; i < ARR_SIZE; i++) {
+ element_t *node = malloc(sizeof(element_t));
+ node->value = (r = random_shuffle(r));
+ list_add_tail(&node->list, &head);
+ }
preempt_enable();
task_printf("[%s] %s: begin\n", name, __func__);
- uint32_t r = getpid();
- for (int i = 0; i < ARR_SIZE; i++)
- arr[i] = (r = random_shuffle(r));
task_printf("[%s] %s: start sorting\n", name, __func__);
- qsort_r(arr, ARR_SIZE, sizeof(uint32_t), cmp_u32, name);
+ list_sort(NULL, &head, cmp);
- for (int i = 0; i < ARR_SIZE - 1; i++)
- if (arr[i] > arr[i + 1]) {
- task_printf("[%s] %s: failed: a[%d]=%u, a[%d]=%u\n", name, __func__,
- i, arr[i], i + 1, arr[i + 1]);
- abort();
- }
+ element_t *node, *safe;
+ list_for_each_entry_safe(node, safe, &head, list)
+ {
+ if (&safe->list != &head && node->value > safe->value) {
+ task_printf("[%s] %s: failed: node->value=%u, safe->value=%u\n", name, __func__,
+ node->value, safe->value);
+ abort();
+ }
+ }
task_printf("[%s] %s: end\n", name, __func__);
preempt_disable();
+ struct list_head *pos, *tmp;
+ list_for_each_safe (pos, tmp, &head) {
+ element_t *node = list_entry(pos, element_t, list);
+ free(node);
+ }
- free(arr);
preempt_enable();
}
```
輸出結果:
```
$ make
cc -O2 -Wall -std=gnu99 -o task_sched task_sched.c
$ ./task_sched
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[3] sort: end
[2] sort: end
[1] sort: end
$ ./task_sched
[1] sort: begin
[1] sort: start sorting
[2] sort: begin
[2] sort: start sorting
[3] sort: begin
[3] sort: start sorting
[2] sort: end
[3] sort: end
[1] sort: end
```
為何會造成 3 個 task 工作結束的不一致
- 排序值的隨機性,造成處理時間不一樣
- 任務切換
### 案例 2 : Timsort 整合 preemptive coroutine
回顧第一周的`測驗 2` 來了解 Timsort 在 Merge sort 的基礎上做了哪些改進
Timsort 結合 Merge sort 和 Insertion Sort,改進的目的有以下:
1. 加速 merge 的過程
2. 減少合併次數
3. 在處理有部分已排序的資料下,有更好的表現
值得一提的是, Timsort 將資料由 run 所構成,這裡的 run 由部分已排列的資料所構成。
那 run 的長度該怎麼決定,定義一個 minrun (最小運行長度),用以表示每個 run 的最小長度。有幾點需要我們考慮到:
- minrun 不宜太長,會造成花費更多時間在執行插入排序
- minrun 不宜太短,否則在進行 merge 時次數變多
- 理想情況下 run 的數量等於或者略小於 2 的冪,效率會最高
在 `find_run` 函式會尋找符合遞增順序的元素,並組成一個 run ,接著使用 `merge_collapse` 來讓 run 的數量保持平衡,且須滿足以下條件:
- 假設有三個 run 分別為 A、B、C
- A > B + C
- B > C
只要有一項不符合則將 B 與 A、C 中較小者進行合併,舉例說明:
-----
合併分成兩種:
- 在這我們假設有 run A 及 run B
```
A : [3, 7, 11, 12, 13, 31, 45]
B : [21, 22, 24, 24, 29, 300, 400]
```
1. merge sort 從 A、 B 的開頭逐一比較來進行合併,稱為 `one-pair-at-a-time`
2. Timsort 的合併基於 merge sort 做出改進,首先將 A 與 B 中長度較短者放入一塊臨時記憶區 `temp` (這裡 A 為較短者)
```
A : []
B : [21, 22, 24, 24, 29, 300, 400]
temp : [3, 7, 11, 12, 13, 31, 45]
```
接著 B 的第一個元素 $B_0$ 會去找在 A 的何處 ($A_4$ 與 $A_5$ 之間),並將 $A_0$ ~ $A_4$ 放回 A 數組中
```
A : [3, 7, 11, 12, 13]
B : [21, 22, 24, 24, 29, 300, 400]
temp : [31, 45]
```
接著將 temp[0] 比對 B ,介於 B[4] 與 B[5] 之間
```
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29]
B : [300, 400]
temp : [31, 45]
```
以此類推
```
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45]
B : [300, 400]
temp : []
```
==>
```
A : [3, 7, 11, 12, 13, 21, 22, 24, 24, 29, 31, 45, 300, 400]
B : []
temp : []
```
這種合併方法被稱為 Galloping mode 。
---
TODO : 待整合