# linux2023: RinHizakura - 作業 1
> * [2023 年暑期 Linux 核心課程第 1 次作業](https://hackmd.io/@sysprog/linux2023-summer-quiz0)
> * [linux-summer-2023: hw1](https://github.com/RinHizakura/linux-summer-2023/tree/main/hw1)
## 測驗 α
### S-tree 程式碼運作原理
#### 資料結構
```cpp
/* S-Tree uses hints to decide whether to perform a balancing operation or not.
* Hints are similar to AVL-trees' height property, but they are not
* required to be absolutely accurate. A hint provides an approximation
* of the longest chain of nodes under the node to which the hint is attached.
*/
struct st_node {
short hint;
struct st_node *parent;
struct st_node *left, *right;
};
struct st_root {
struct st_node *root;
};
```
首先從資料結構中探討起,我們可以看出 S-tree 的設計上與一般的 BST 類似,結構中維護左右與親代的子節點。特別的是 `hint` 這個成員,這讓 S-tree 可以在追求快速的 insert/delete 操作速度的同時,也可以**近乎**達到 AVL tree 所期待的平衡效果以獲得更佳/更一致(consistent)的 lookup 延遲。其作用細節在後續的實作中解釋。
:::danger
設計上覺得 S-tree 提供的介面(API,即 `st_*` 系列函式) 似乎有點彆扭? 比方說 `st_insert` 的實作需要由外部的 caller 先確認好 node 之間的 comparison order。假設我們確立 S-tree 就是只作為 BST 來使用,我覺得理想的介面應該是:
```cpp
void st_insert(struct st_node **root, struct st_node *n);
```
可能再加上做 compare 的函式指標 (`*cmp`),或是為 `root` 的 `st_node` 再包裝一個更高級的資料結構並將 `cmp` 作為其中一員。
:::
#### `st_first`
```cpp
struct st_node *st_first(struct st_node *n)
{
if (!st_left(n))
return n;
return st_first(st_left(n));
}
struct st_node *st_last(struct st_node *n)
{
if (!st_right(n))
return n;
return st_last(st_right(n));
}
```
`st_first` 和 `st_last` 的作用是找到以 `n` 為 root 的 subtree 中,order 最低即最高(根據對節點間 comparison 定義)的節點。
#### `st_replace_right`
`st_replace_right` 的功能是用 `r` 來取代 `n` 原本在樹上的位置。更精準的說,這其實是 BST 之 remove 操作的一種形式,實際上 `r` 與 `n` 的關係必然滿足 `r = st_first(st_right(n))`(見 `st_remove`)。基於此前提下,這個 replace 實作才是有效且正確的。
可以想見有幾個節點間的關係可能會因應 replace 而進行調整:
1. `r` 的左子樹
2. `r` 的右子樹
3. `r` 的父節點
4. ~~`r` 原本的左子樹之父節點~~: 因為 `r` 是 `st_first(st_right(n))`,左子樹必定為 NULL
5. `r` 原本的右子樹之父節點
6. `r` 原本的父節點之左or右子樹
7. `r` 原本的父節點之父節點
8. `n` 原本的左子樹之父節點
9. `n` 原本的右子樹之父節點
10. `n` 原本的父節點之左or右子樹
在實作 replace 時,我們要考慮這些 link 是否都調整正確。
```cpp
static inline void st_replace_right(struct st_node *n, struct st_node *r)
{
struct st_node *p = st_parent(n), *rp = st_parent(r);
if (st_left(rp) == r) {
st_left(rp) = st_right(r);
if (st_right(r))
st_rparent(r) = rp;
}
```
首先判斷 `r` 是 `rp` 的左子還是右子,如果是左子,則將 `rp` 左子替換為 `r` 的右子樹(6),右子樹的 parent 也需要更新(5)。
:::info
1. 如果 `r` 是 `rp` 的右子,其實可以預期此時 `n` 就是 `rp`,則此時就不必將 `r` 的右子樹交給其他人繼承,可想而知 `rp` 的左右也不可能繼承任何子樹,稍後就會被刪除
2. `r` 的左子樹怎麼辦呢? 留意到因為在刪除操作中 `r` 必然是來自某一子樹 `tree` 的 `st_first(tree)`,因此可預期 `st_left(r) = NULL`
:::
```cpp
if (st_parent(rp) == n)
st_parent(rp) = r;
```
`rp` 的父節點是 `n` 的狀況下,由於 `n` 即將被刪除且由 `r` 取代,因此 `st_parent(rp)` 勢必要被修改(7)。
```cpp
st_parent(r) = p;
st_left(r) = st_left(n);
```
`r` 將取代 `n` 原本的位置,這裡調整此關係(1 & 3)。
```cpp
if (st_right(n) != r) {
st_right(r) = st_right(n);
st_rparent(n) = r;
}
```
`r` 不正好是 `n` 的右子樹的情況下,`r` 需繼承 `n` 的右子樹,反過來的關係也需建立(2 & 9)。
```cpp
if (p && st_left(p) == n)
st_left(p) = r;
else if (p)
st_right(p) = r;
```
這裡調整 `n` 之父節點的子代(10)。
```cpp
if (st_left(n))
st_lparent(n) = r;
}
```
調整 `n` 原本左子樹的父節點關係(8)。
#### `st_replace_right`
與 `st_replace_left` 類似,這裡就不多做探討。
#### `st_remove`
`st_remove` 將給定的 `del` 從樹中移除。
```cpp
void st_remove(struct st_node **root, struct st_node *del)
{
if (st_right(del)) {
struct st_node *least = st_first(st_right(del));
if (del == *root)
*root = least;
AAAA;
BBBB;
return;
}
```
刪除方法的基本邏輯近似一般的 BST,若右子樹存在,則找到右子樹中 order 最小者(`st_left` 必然為 NULL),以其取代原本的 `del` 即可。
* AAAA = `st_replace_right(del, least)`
* BBBB = `st_update(root, st_right(least))`
```cpp
if (st_left(del)) {
struct st_node *most = st_last(st_left(del));
if (del == *root)
*root = most;
CCCC;
DDDD;
return;
}
```
若右子樹不存在,則找到左子樹中 order 最大者(`st_right` 必然為 NULL),以其取代原本的 `del`。
* CCCC = `st_replace_left(del, most)`
* DDDD = `st_update(root, st_left(most))`
```cpp
if (del == *root) {
*root = 0;
return;
}
```
考慮到刪除的節點剛好是 root 的情況下,也需要更新 root 指標。
```cpp
/* empty node */
struct st_node *parent = st_parent(del);
if (st_left(parent) == del)
st_left(parent) = 0;
else
st_right(parent) = 0;
EEEE;
}
```
如果被被刪除的節點沒有左右都沒有節點,意即其為 leaf,只要將調整其 parent 的 child 關係,再直接刪除之就好。
EEEE = `st_update(root, parent)`
#### `st_rotate_right`
rotate 是為了平衡 BST 所需進行的操作,它調整數個節點間的親子關係以達此目的。方便理解,我們以下圖為例示範如何對 node 6 做 rotate right 操作。
```cpp
static inline void st_rotate_right(struct st_node *n)
{
struct st_node *r = st_right(n), *p = st_parent(n);
```
根據程式碼定義初始的狀態應該如下。
```graphviz
digraph BST {
node [fontname="Arial" ];
l3 [ label = "3" ];
l4 [ label = "4(*p)" ];
l5 [ label = "5" ];
l6 [ label = "6(*n)" ];
l7 [ label = "7(*r)" ];
n1 [ label = "NULL", color=white];
n2 [ label = "8"];
n3 [ label = "NULL", color=white];
n4 [ label = "9"];
l4 -> { l3 l6 };
l6 -> { l5 l7 };
l7 -> { n1 n2 };
n2 -> { n3 n4 };
}
```
```cpp
st_parent(r) = st_parent(n);
st_right(n) = st_left(r);
st_parent(n) = r;
st_left(r) = n;
if (p && st_left(p) == n)
st_left(p) = r;
else if (p)
st_right(p) = r;
if (st_right(n))
st_rparent(n) = n;
}
```
則可以根據程式碼行為一步一步用紙筆檢查 pointer 的調整,完成旋轉後應該會變成如下圖的樣子。看到 node 7 被從右子樹往上提一個等級,藉此降低原本子樹的高度。
```graphviz
digraph BST {
node [fontname="Arial" ];
l3 [ label = "3" ];
l4 [ label = "4(*p)" ];
l5 [ label = "5" ];
l6 [ label = "6(*n)" ];
l7 [ label = "7(*r)" ];
n1 [ label = "NULL", color=white];
n2 [ label = "8"];
n3 [ label = "NULL", color=white];
n4 [ label = "9"];
l4 -> { l3 l7 }
l6 -> { l5 n1 }
l7 -> { l6 n2}
n2 -> { n3 n4 };
}
```
#### `st_rotate_left`
道理類似 `st_rotate_right`,這邊就不再費時說明。
#### `st_update`
```cpp
static inline void st_update(struct st_node **root, struct st_node *n)
{
if (!n)
return;
int b = st_balance(n);
int prev_hint = n->hint;
```
`st_balance` 判斷節點 `n` 在左右子樹的平衡程度,當 b > 0,表示左子樹的高度大於右子樹,反之 b < 0 則表示是右子樹較高。注意到嚴格來說 `hint` 只是對高度的估計而非完全正確的高度值。
```cpp
struct st_node *p = st_parent(n);
if (b < -1) {
/* leaning to the right */
if (n == *root)
*root = st_right(n);
st_rotate_right(n);
}
else if (b > 1) {
/* leaning to the left */
if (n == *root)
*root = st_left(n);
st_rotate_left(n);
}
```
如果右子樹比較高,我們就做 `st_rotate_right`;與之相對,左子樹較高的情況是做 `st_rotate_left` 來平衡 BST。
```cpp
n->hint = st_max_hint(n);
if (n->hint == 0 || n->hint != prev_hint)
st_update(root, p);
}
```
旋轉之後,因為節點本身左右高度的變化要更新 hint。而當自身 hint 有變或是變成 leaf 時,也要向**原本的** parent 方向(以前面 `st_rotate_right` 旋轉 node 6 為例,即是要 update node 4)繼續更新 `hint` 和調整平衡。
#### `st_insert`
`st_insert` 將節點插入到 S-tree 中。
```cpp
void st_insert(struct st_node **root,
struct st_node *p,
struct st_node *n,
enum st_dir d)
{
if (d == LEFT)
st_left(p) = n;
else
st_right(p) = n;
st_parent(n) = p;
st_update(root, n);
}
```
實作上沒有什麼複雜之處,就是將 `n` 插入成指定的 parent `p` 之左或右子節點。
### 基於 S-tree 的 `treeint` 運作原理
#### 資料結構
```cpp
struct treeint {
int value;
struct st_node st_n;
};
```
模仿 Linux 的 linked list 風格,`st_node` 可以被置於任何 struct 下以串連不同種類的資料結構。本題中是以包含單一 int 的節點示範。
#### `treeint_insert`
```cpp
struct treeint *treeint_insert(int a)
{
struct st_node *p = NULL;
enum st_dir d;
for (struct st_node *n = st_root(tree); n;) {
struct treeint *t = container_of(n, struct treeint, st_n);
if (a == t->value)
return t;
p = n;
if (a < t->value) {
n = st_left(n);
d = LEFT;
} else if (a > t->value) {
n = st_right(n);
d = RIGHT;
}
}
```
因為目前 `st_insert` 提供的介面,`treeint_insert` 必須先找出要插入的位置。
注意到嚴格的 BST 中是不存在重複 key 值的節點的,因此若存在重複我們直接回傳該已存在的相同 key 值節點即可。
```cpp
struct treeint *i = calloc(sizeof(struct treeint), 1);
if (st_root(tree))
st_insert(&st_root(tree), p, &i->st_n, d);
else
st_root(tree) = &i->st_n;
i->value = a;
return i;
}
```
找到符合 BST 的正確位置後,完成節點的插入。
#### `__treeint_dump`
```cpp
static void __treeint_dump(struct st_node *n, int depth)
{
if (!n)
return;
__treeint_dump(FFFF, depth + 1);
struct treeint *v = treeint_entry(n);
printf("%d\n", v->value);
__treeint_dump(GGGG, depth + 1);
}
```
經典的 inorder 走訪。
* FFFF = `st_left(n)`
* GGGG = `st_right(n)`
### S-tree 的介面改進
雖然原題目沒有要求這項改進。不過正如前面所說,我認為 S-tree 目前的 API 設計很彆扭。若我們將 S-tree 作為函式庫來看,從使用者的角度而言,他應該只關心插入和移除的鍵值為何,而不必明確知道 S-tree 內部是如何管理節點之間關係的。然而目前的設計下我們可以在 `treeint_insert` 和 `treeint_remove` 看到依然存在複雜的邏輯來走訪整個樹,後者應該被隱藏在函式庫內部的實作才是。
為此,我重新定義了 S-tree 提供的 public interface。
```cpp
struct st_tree *st_create(cmp_t *cmp,
struct st_node *(*create_node)(void *key),
void (*destroy_node)(struct st_node *n));
void st_destroy(struct st_tree *tree);
int st_insert(struct st_tree *tree, void *key);
int st_remove(struct st_tree *tree, void *key);
struct st_node *st_find(struct st_tree *tree, void *key);
```
可以在 `st_create` 中看到節點之間的 order 比較(`cmp`)、節點的建立與銷毀(`create_node`/`destroy_node`) 被從 S-tree 中獨立出來,以 callback function 的方式讓函式庫的使用者自行實作。其餘包含樹的走訪邏輯、root node 的維護等都直接在內部完成。
順帶一提,原本 `treeint_insert` 和 `treeint_remove` 中走訪 S-tree 的邏輯其實很相似。這裡我們實際上也可以重用 `st_find` 函式來歸納之,避免重寫相同的邏輯,以減少程式碼編譯後產生之二進位檔的大小。
完整的實作改進可以參照 [s_tree.c](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/s_tree.c)。有了這項調整,未來就可以很輕易地將 S-tree 應用在其他合適的專案之中了!
:::warning
雖然介面上變得簡單許多,不過其實如果對照 Linux 的 rbtree 實作,會發現其反而避免使用了使用 callback 的方式,作法比較接近本題原始的實作。
原因可參照原作者在 [rbtree.h](https://elixir.bootlin.com/linux/latest/source/include/linux/rbtree.h#L9) 中的註解說明:
> To use rbtrees you'll have to implement your own insert and search cores. This will avoid us to use callbacks and to drop drammatically performances. I know it's not the cleaner way, but in C (not in C++) to get performances and genericity...
其實不是很確定作者所述的 performance drop 具體有多高? 主要來自什麼部份(cache locality?),或許值得探討一下。畢竟如果能夠以 callback 方式實作,毫無疑問對使用者的上手門檻是更低的,應用上也更為容易
:::
### 探討 S-tree 與 rbtree 效能落差
首先,為利於後續加入其他演算法進行效能評比,對原始程式碼做出以下改動,以建立相關基礎建設:
1. 加入一個 macro `benchmark`,方便測量一段程式碼完成的時間
* 主要測量 insert/find/remove 三種操作的時間
* ~~macro 寫得不是很漂亮~~,不過暫時還算堪用
2. 撰寫一個簡單的 python script `tree-perf.py`,方便將測量結果視覺化
* 為了讓測量結果盡量不受 noise 干擾,且視覺化後的曲線圖可以更平滑,試著用簡單的標準差法排除蒐集到的時間中之 outlier
3. 將 BST 的各項操作也抽象成 callback function pointer
* 雖然前面提到 callback 的方式可能會對函式呼叫整體造成額外成本,但這裡我們主要的目標是要比較不同 tree 的 throughput,而非注重單一 tree 本身在實際應用上的可能優化
* 因此取捨傾向先以能夠容易且快速的加入各種 BST 實作的設計方式為主
效能評比程式有了,那麼 rbtree 的函式庫該從何而來呢? 稍微搜尋了一下,一方面似乎沒有比較被大眾廣泛接受的紅黑樹相關 C 函式庫;一方面,S-tree 的實作和 Linux 中的 rbtree 介面上其實存在不少相似之處。因此我們乾脆將 Linux 的紅黑樹實作拉到 userspace 來應用。這裡參照 Linux 6.4 版本中的程式碼,並且只擷取與 insert/find/remove 三項操作相關的函式實作,具體整理出來的程式碼可以參考:
* [rbtree.h](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/rbtree.h)
* [rbtree.c](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/rbtree.c)
接著我們就可以來測量各個操作間的差異。
根據老師的建議將實驗分為循序輸入和亂序輸入的場景。對於循序輸入的部份,我們將從 `0` 一直到 `tree_size - 1` 的 key 值插入到樹中,然後再以相同的次序進行搜尋和刪除。過程記錄不同的樹節點數量與完成相應操作所需之累積時間的關係。則實驗的結果如下圖:

循序狀況下,似乎在樹節點數量較少時兩者差異不大,而節點增加時,S-tree 相較於 rbtree 的表現更好?
而亂序輸入的場景下,我們會隨機產生任意的插入/搜尋/移除值,實驗結果如下:

從結果來看,亂序狀況下,S-tree 整體的效能看起來稍優於 rbtree。且在亂序情形下 rbtree 似乎更容易有 worst case (圖中極端值) 的出現。
該怎麼依此給出合理的解釋呢?
* 需注意到其實無論是插入/搜尋/刪除操作,其實三者都具備**搜尋**的部份,而插入和刪除只是多了調整節點間關係+旋轉的步驟
* 則如果瓶頸發生在搜尋部份,可以預期三種操作測量出的時間曲線圖將很相似(例如本實驗)?
* 對於亂序下出現特殊的極端值(圖中的尖峰),可能是對某個 key 值的搜尋會導致最差情況(worst case)的發生?
* 不認為是硬體(CPU、Cache)干擾的原因在於三種操作在同一節點數量都出現極端值
* 理論上,在樹的平衡上 S-tree 應具有較為明顯的優勢,如下圖以循序插入 0 - 9 的節點為例,可以看到在限制樹高度的能力上 S-tree 是更有效的
> 
> S-tree 從 0 到 9 插入節點後
> 
> rbtree 從 0 到 9 插入節點後
:::warning
1. 以上敘述純屬推測(~~嘴炮~~),需拿出更多證據支持
2. 設計何種場景可以彰顯 rbtree 在插入/移除上的優勢?
:::
### S-tree 的最壞場景
然而在特定輸入上可以體現 S-tree 的 hint 對高度估計的不精準導致的 worst case。我們首先輸入 1,接著輸入 1000,則 S-tree 會呈現以下樣貌:
```graphviz
digraph BST {
node [fontname="Arial" ];
n0 [label = "1(1)" ];
n1 [label = "NULL", color=white ];
n2 [label = "1000(0)" ];
n0 -> { n1 n2 }
}
```
此時 node 1 的 hint 為 1,node 1000 的 hint 為 0(括號中數字)。接著插入 900,最開始會呈現以下樣貌:
```graphviz
digraph BST {
node [fontname="Arial" ];
n0 [label = "1(1)" ];
n1 [label = "NULL", color=white ];
n2 [label = "1000(0)" ];
n3 [label = "900(0)" ];
n4 [label = "NULL", color=white ];
n0 -> { n1 n2 }
n2 -> { n3 n4 }
}
```
然後我們對 node 900 做 `st_update` 後,接著往 parent 方向也對 node 1000 做 `st_update`,後者的 hint 會被更新:
```graphviz
digraph BST {
node [fontname="Arial" ];
n0 [label = "1(1)" ];
n1 [label = "NULL", color=white ];
n2 [label = "1000(1)" ];
n3 [label = "900(0)" ];
n4 [label = "NULL", color=white ];
n0 -> { n1 n2 }
n2 -> { n3 n4 }
}
```
繼續往 parent 方向,也對 node 1 做 `st_update`。此時因為 hint 條件下不 balance,將會進行右旋轉 (`st_rotate_right`)。旋轉後的結果呈現以下狀態。
```graphviz
digraph BST {
node [fontname="Arial" ];
n0 [label = "1(1)" ];
n1 [label = "NULL", color=white ];
n2 [label = "1000(1)" ];
n3 [label = "NULL", color=white ];
n4 [label = "900(0)"];
n2 -> { n0 n1 }
n0 -> { n3 n4 }
}
```
這邊可以看到 S-tree 潛在的問題。由於旋轉的關係,node 1000 的連結關係被改變,然而 update 的手法將不會對其 hint 進行二度調整,最後導致 hint 並不能正確反映樹的高度,進而也沒有機會將二元樹平衡。
想像上,若強硬的去追蹤每個節點的高度並平衡將迫使 S-tree 與 AVL 相近,進而失去原本設計上想帶來的優勢。那麼該怎麼權衡可以在有效率的插入/刪除的同時保持一定程度的二元樹平衡呢? 這可能就需要更深入的思考了。
## 測驗 β
### 程式碼運作原理
```cpp
#include <stdint.h>
static inline uintptr_t align_up(uintptr_t sz, size_t alignment)
{
uintptr_t mask = alignment - 1;
if ((alignment & mask) == 0) { /* power of two? */
return MMMM;
}
return (((sz + mask) / alignment) * alignment);
}
```
MMMM = `(sz + mask) & ~mask`
不囉唆,直接看 [align.h](https://github.com/torvalds/linux/blob/master/include/linux/align.h)。
## 測驗 γ
### 考古
[qsort](https://man7.org/linux/man-pages/man3/qsort.3p.html) 是由 C 語言標準中定義的排序介面,而內部的實作則根據函式庫的實現各有不同。值得注意的是,`qsort` 並不等於 quick sort,而其一般是以優化過的 hybrid sort 方式實作,後者混合多種類的 sorting 演算法,並根據輸入選擇合適者,此作法在提供普遍(general)的排序算法時更為有利,如本測驗題的實作也是一種。
:::warning
有點好奇XD :rolling_on_the_floor_laughing:
翻了一下 C 語言標準中(7.22.5.2 The qsort function)的描述,似乎其沒有對 qsort 的時間複雜度給予規範? 不曉得這是否意味假如我們自己開發標準函式庫時,用 bubble sort 作為 qsort 的內部實現也是合法的? 或是我漏掉了其他的章節?
:::
稍微搜尋了一下,本測驗題應該是基於 J. L. Bentley 和 M. D. McIlroy 在 [Engineering a Sort Function](https://onlinelibrary.wiley.com/doi/epdf/10.1002/spe.4380231105) (~~很抱歉沒付錢看不了~~,[連結](https://web.ecs.syr.edu/~royer/cis675/slides/07engSort.pdf)倒是可以找一個簡單的投影片) 提出的方法作為延伸,該方法的身影也可以在 [FreeBSD libc](https://svnweb.freebsd.org/base/head/lib/libc/stdlib/qsort.c?view=markup#l103)、[apple 的 XNU](https://github.com/apple-oss-distributions/xnu/blob/main/bsd/kern/qsort.c) 等等專案中發現。而測驗題則善用其中 quick sort 部份會需要 partition 成兩段陣列並分別排序的特性,嘗試將這些排序平行/並行運算來提昇執行效率。
### 程式碼運作原理
#### `qsort_mt`
`qsort_mt` 是本測驗題所要實作的 multithread qsort 的介面本體。
```cpp
void qsort_mt(void *a,
size_t n,
size_t es,
cmp_t *cmp,
int maxthreads,
int forkelem)
{
struct qsort *qs;
struct common c;
int i, islot;
bool bailout = true;
if (n < forkelem)
goto f1;
errno = 0;
/* Try to initialize the resources we need. */
if (pthread_mutex_init(&c.mtx_al, NULL) != 0)
goto f1;
if ((c.pool = calloc(maxthreads, sizeof(struct qsort))) == NULL)
goto f2;
```
`forkelem` 是決定需不需要額外啟動一個 thread 來平行排序的參數,如果陣列單元數量小於此參數的情況下,可以直接用標準函式庫的 `qsort` 就好。
如果要啟動 multithread qsort,這裡的實作是採用 thread pool 的方式。最為直接的實作下,我們可以判斷需要平行運算時才建立 thread,並在 thread 完成後直接銷毀之。這樣的作法對於資源的控管和 thread 的設計相對輕鬆。但缺點也很明顯,頻繁的建立和銷毀 thread 將造成額外的開銷,進而帶來額外的延遲時間。
取而代之,我們可以預先建立好數個 thread worker,並在需要時才從中挑選 idle thread 出來工作,詳細的說明可以在 [案例: Thread Pool 實作和改進](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-thread-pool#%E6%A1%88%E4%BE%8B-Thread-Pool-%E5%AF%A6%E4%BD%9C%E5%92%8C%E6%94%B9%E9%80%B2)中閱讀到。總而言之,thread pool 雖然管理上更為複雜(後續就會見識到了XD),但在分配平行任務上更為高效,而這裡我們就需要為每個 thread 建立一個專屬的 context `struct qsort`,後者可用來攜帶參數,以告訴 thread 接下來要排序的陣列和其大小等資訊。
```cpp
for (islot = 0; islot < maxthreads; islot++) {
qs = &c.pool[islot];
if (pthread_mutex_init(&qs->mtx_st, NULL) != 0)
goto f3;
if (pthread_cond_init(&qs->cond_st, NULL) != 0) {
verify(pthread_mutex_destroy(&qs->mtx_st));
goto f3;
}
qs->st = ts_idle;
qs->common = &c;
if (pthread_create(&qs->id, NULL, qsort_thread, qs) != 0) {
verify(pthread_mutex_destroy(&qs->mtx_st));
verify(pthread_cond_destroy(&qs->cond_st));
goto f3;
}
}
```
此 for 迴圈初始每個 thread context,包含初始化各自的 mutex 和 condition variable,並建立對應的 thread 等。
```cpp
/* All systems go. */
bailout = false;
/* Initialize common elements. */
c.swaptype = ((char *) a - (char *) 0) % sizeof(long) || es % sizeof(long)
? 2
: es == sizeof(long) ? 0
: 1;
```
`swaptype` 的目的是確認所排序陣列的指標對齊狀況,這攸關後續將一個(`swap`)或多個(`vecswap`)元素作為一個單位進行 swap 的手法。首先仔細觀察其中的條件式:
1. `((char *) a - (char *) 0) % sizeof(long)`: 確認陣列 `a` 是否與 long 型別長度對齊 (若否為 true)
2. `es % sizeof(long)`: 檢查陣列中的元素是否與 long 型別對齊 (若否為 true)
3. `es == sizeof(long)`: 這條件在 1 和 2 皆不成立時才檢查,換句話說 `a` 必然對齊 `sizeof(long)`,且元素長度也對齊之,此時判斷元素大小是否直接等同 `long` (若是為 true)
則總結 `swaptype` 之值代表的意涵以及對應的 swap 手法為:
* 0 代表 `a` 對齊 `sizeof(long)`,且 `es` 等同 `sizeof(long)`
* swap: 直接將指標 cast 成 long 交換即可
* vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(long)` 為單位交換 `n / sizeof(long)` 次
* 1: `a` 對齊 `sizeof(long)`,`es` 對齊 `sizeof(long)` 但不等同
* swap / vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(long)` 為單位交換 `n / sizeof(long)` 次
* 2: `a` 不對齊 `sizeof(long)` 或 `es` 不對齊 `sizeof(long)`
* swap / vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(char)` 為單位交換 `n / sizeof(char)` 次
* 顯然 `sizeof(char) < sizeof(long)`,因此 `swaptype == 2` 之情況下可以預期是用較無效率的方法進行的
:::info
:notes:
在 C 語言標準 6.5.3.4 The sizeof and alignof operators 中提到:
> When sizeof is applied to an operand that has type char, unsigned char, or signed char, (or a qualified version thereof) the result is 1.
換句話說,在不對齊狀況下,每次只能以 1 bytes 為單位進行 swap。
:::
```cpp
c.es = es;
c.cmp = cmp;
c.forkelem = forkelem;
c.idlethreads = c.nthreads = maxthreads;
```
`struct common` 的 `c` 攜帶一些與陣列整體相關的資訊,原則上是在各個 thread 中只可讀的資訊。(`idlethreads` 應該是唯一的例外?)
```cpp
/* Hand out the first work batch. */
qs = &c.pool[0];
verify(pthread_mutex_lock(&qs->mtx_st));
qs->a = a;
qs->n = n;
qs->st = ts_work;
c.idlethreads--;
verify(pthread_cond_signal(&qs->cond_st));
verify(pthread_mutex_unlock(&qs->mtx_st));
```
必要的初始化都完成後,首先就可以啟動 thread 0 來展開排序的第一步。
```cpp
/* Wait for all threads to finish, and free acquired resources. */
f3:
for (i = 0; i < islot; i++) {
qs = &c.pool[i];
if (bailout) {
verify(pthread_mutex_lock(&qs->mtx_st));
qs->st = ts_term;
verify(pthread_cond_signal(&qs->cond_st));
verify(pthread_mutex_unlock(&qs->mtx_st));
}
verify(pthread_join(qs->id, NULL));
verify(pthread_mutex_destroy(&qs->mtx_st));
verify(pthread_cond_destroy(&qs->cond_st));
}
free(c.pool);
f2:
verify(pthread_mutex_destroy(&c.mtx_al));
if (bailout) {
fprintf(stderr, "Resource initialization failed; bailing out.\n");
f1:
qsort(a, n, es, cmp);
}
}
```
之後,main thread 就在 for 迴圈中藉由 `pthread_join` 逐一等待每個 thread 結束,並在 thread 結束時銷毀當初分配的資源。
順帶一提,在 thread pool 的方式下,要怎麼知道後續已經沒有工作需要 thread worker,可以將各個 thread 結束呢? 一種可行的做法是整理所有 thread 的狀態是否 idle。由於我們可以預期直到整個陣列的排序完成前都會有 thread 在運作,因此反過來說當所有 thread 都 idle 時該次排序應該已經完成。後續我們會在 [`qsort_thread`](#qsort_thread) 看到對應實作。
#### `qsort_thread`
每個 worker 都會執行各自的 `qsort_thread`。其主要流程可以分成三個階段:
1. 等待被喚醒,閒閒沒事(?)的 thread 之 `qs->st` 會是 `ts_idle`,直到狀態被切換至 `ts_work` 開始工作,或是切換到 `ts_term` 後結束
* 避免 busy-waiting,可以搭配 [condition variable](https://linux.die.net/man/3/pthread_cond_init)
2. 喚醒後根據 context 設定進行對應的 sorting
3. 回到 idle 狀態,此外,若發現自己完成後所有的 thread 已經都是 idle,表示最初指派的任務已經完成,可以結束自身,也將其他 thread 之狀態切換到 `ts_term` 以結束之
這三階段就對應到下方的程式碼。
```cpp
static void *qsort_thread(void *p)
{
struct qsort *qs, *qs2;
int i;
struct common *c;
qs = p;
c = qs->common;
again:
/* Wait for work to be allocated. */
verify(pthread_mutex_lock(&qs->mtx_st));
while (qs->st == ts_idle)
verify(HHHH);
verify(pthread_mutex_unlock(&qs->mtx_st));
if (qs->st == ts_term) {
return NULL;
}
assert(qs->st == ts_work);
```
這裡是第一階段行為的描述。其中,對於 HHH 的答案,從外層迴圈行為是在等待 state 的變更,以及該迴圈是搭配 mutex 建立的 critical section 使用的狀態來看,不難看出 HHHH 應該是在等待 condition variable 被 signal,因此:
* HHHH = `pthread_cond_wait(&qs->cond_st, &qs->mtx_st)`
```cpp
qsort_algo(qs);
```
第二階段則進行主要任務 [`qsort_algo`](#qsort_algo)。
```cpp
verify(pthread_mutex_lock(&c->mtx_al));
qs->st = ts_idle;
c->idlethreads++;
if (c->idlethreads == c->nthreads) {
for (i = 0; i < c->nthreads; i++) {
qs2 = &c->pool[i];
if (qs2 == qs)
continue;
verify(pthread_mutex_lock(&qs2->mtx_st));
qs2->st = ts_term;
verify(JJJJ);
verify(pthread_mutex_unlock(&qs2->mtx_st));
}
verify(pthread_mutex_unlock(&c->mtx_al));
return NULL;
}
verify(pthread_mutex_unlock(&c->mtx_al));
goto again;
}
```
第三階段也是如前面的敘述,當 `c->idlethreads == c->nthreads`,則將其他的 thread 都切換 state 為 `st_term`,因此也需要透過 condition variable 喚醒對應 thread,則:
* JJJJ = `pthread_cond_signal(&qs2->cond_st)`
#### `qsort_algo`
```cpp
/* Thread-callable quicksort. */
static void qsort_algo(struct qsort *qs)
{
char *pa, *pb, *pc, *pd, *pl, *pm, *pn;
int d, r, swaptype, swap_cnt;
void *a; /* Array of elements. */
size_t n, es; /* Number of elements; size. */
cmp_t *cmp;
int nl, nr;
struct common *c;
struct qsort *qs2;
/* Initialize qsort arguments. */
c = qs->common;
es = c->es;
cmp = c->cmp;
swaptype = c->swaptype;
a = qs->a;
n = qs->n;
```
前面只是初始化一些 local 變數,以建立簡潔易讀的程式碼。
```cpp
top:
/* From here on qsort(3) business as usual. */
swap_cnt = 0;
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es)
swap(pl, pl - es);
return;
}
```
最開始針對 array 比較小的狀況(<7)做優化,該情況下用簡易的 insertion sort 處理即可。
:::warning
在下方 `swap_cnt` 為 0 之處也有一段 insertion sort,感覺或許可以歸納成一個 function 以簡化程式碼?
:::
:::info
這邊由於 C 語言不提供泛型,因應 sort 可以針對不同型別排序的情況下,需要以 `(char *)a + es * i` 去存取第 i 個 element,不得不說閱讀起來有點不舒服XD
:::
```cpp
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
pm = med3(pm - d, pm, pm + d, cmp, thunk);
pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
}
pm = med3(pl, pm, pn, cmp, thunk);
}
swap(a, pm);
```
陣列比較大的情況下,quick sort 會是更佳的選擇。眾所周知 quick sort 需要選擇 pivot 來將陣列進行二分。而為了避免 pivot 的選擇不好而導致容易出現 worst case (即時間複雜度為 $O(n^2)$)的狀況,一種常見的作法是透過 median of 3 (`med3`) 來挑選合適的 pivot。
> 延伸閱讀: [Improving Quicksort with Median of 3 and Cutoffs](https://youtu.be/1Vl2TB7DoAM)
這裡在挑選出 pivot 後,也將其置換為 array 的首個 element。
```cpp
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
for (;;) {
while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pa, pb);
pa += es;
}
pb += es;
}
while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pc, pd);
pd -= es;
}
pc -= es;
}
if (pb > pc)
break;
swap(pb, pc);
swap_cnt = 1;
pb += es;
pc -= es;
}
```
```graphviz
digraph {
a [label="a", color=white];
pa [label="pa", color=white];
pb [label="pb", color=white];
pc [label="pc", color=white];
pd [label="pd", color=white];
node [shape=record, fontcolor=black, fontsize=14, width=4.75, fixedsize=true];
values [label="<f0> A[0] | <f1> A[1] | <f2> A[2] | <f3> ...... | <f4> A[n - 2] | <f5> A[n - 1]", color=blue, fillcolor=lightblue, style=filled];
edge [color=blue];
a -> values:f0;
pa -> values:f1;
pb -> values:f1;
pc -> values:f5;
pd -> values:f5;
}
```
接下來這邊是 quick sort 中經典的 partition 實作。因為主要目標是要將 order 比 pivot 小者放到 pivot 左方,order 比 pivot 大者放到右方,因此作法是從 `pb` 往右找到比 pivot 大的元素,從 `pc` 往左找到比 pivot 大者,然後將兩者交換,並重複此流程直到兩個 pointer 交會。
注意到與普通 quick sort 相比實作上還是有一些小差異,這提供演算法後續更多的優化空間:
* 期間也記錄下進行 swap 的次數
* 特地將與 pivot order 相同的元素放置在從 `pa` 往右/從 `pd` 往左之處
```cpp
pn = (char *) a + n * es;
r = min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = min(pd - pc, pn - pd - es);
vecswap(pb, pn - r, r);
```
這裡的兩個 `vecswap` 將 pivot 最後置換到正確的位置中。
```cpp
if (swap_cnt == 0) { /* Switch to insertion sort */
r = 1 + n / 4; /* n >= 7, so r >= 2 */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es) {
swap(pl, pl - es);
if (++swap_cnt > r)
goto nevermind;
}
return;
}
```
如果 `swap_cnt` 為 0,這表示在前面的 partition 中沒發生任何置換,即原始的陣列可以在不置換的前提下完美分成 `< pivot` 和 `> pivot` 的兩個子陣列。這情況很可能表示陣列其實已經完成排序或接近完成,因此我們嘗試直接對整個陣列進行 insert sort。
在此我們仍然持續追蹤完成 sort 所需要進行的 swap 數量,如果期間發現仍需要比較多的 swap,那麼可以反悔,還是進行標準的 quick sort。
```cpp
nevermind:
nl = (pb - pa) / es;
nr = (pd - pc) / es;
/* Now try to launch subthreads. */
if (nl > c->forkelem && nr > c->forkelem &&
(qs2 = allocate_thread(c)) != NULL) {
qs2->a = a;
qs2->n = nl;
verify(pthread_cond_signal(&qs2->cond_st));
verify(pthread_mutex_unlock(&qs2->mtx_st));
} else if (nl > 0) {
qs->a = a;
qs->n = nl;
qsort_algo(qs);
}
```
如果選擇做標準的 quick sort,那就是以 pivot 為界線將陣列區分為左右兩組。對於左邊這組,我們可以考量左右子陣列剩餘需要排序的元素數量,以決定是否從 thread pool 中喚醒另一個 thread 來並行的完成 quick sort。
```cpp
if (nr > 0) {
a = pn - nr * es;
n = nr;
goto top;
}
}
```
而右邊的子陣列則可以沿用原本的 thread 繼續運算。
:::warning
想像上這一段寫成以下形式會不會在可讀性上更佳?
```cpp
if (nr > 0) {
qs->a = pn - nr * es;
qs->n = nr;
qsort_algo(qs);
}
```
:::
### 實作上的 Data race
在編譯選項中加上 `-fsanitize=thread` 可以在程式中引進 [Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual),後者是能夠偵測執行緒間 data race 的強大工具。另外建議也加入 `-g` 選項來導入 DWARF,以提升除錯訊息的詳細程度。
加入 flag 後,執行結果如下:
```diff
==================
- WARNING: ThreadSanitizer: data race (pid=6973)
Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0):
#0 allocate_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 (qsort-mt.out+0x3a7f)
#1 qsort_algo /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:315 (qsort-mt.out+0x3a7f)
#2 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:352 (qsort-mt.out+0x3fd8)
Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2):
#0 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:344 (qsort-mt.out+0x3f9a)
Location is heap block of size 256 at 0x7b4000000000 allocated by main thread:
#0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
#1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:133 (qsort-mt.out+0x43e3)
#2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)
Mutex M0 (0x7fff028782d8) created at:
#0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
#1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:131 (qsort-mt.out+0x43bc)
#2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)
Mutex M2 (0x7b40000000a8) created at:
#0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
#1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:137 (qsort-mt.out+0x44ae)
#2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)
Thread T1 (tid=6975, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468)
#2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)
Thread T2 (tid=6976, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468)
#2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)
SUMMARY: ThreadSanitizer: data race /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 in allocate_thread
==================
```
Thread Sanitizer 明確指出出現 data race 的程式碼位置,對照後會發現問題是出在 `allocate_thread` 中,在存取 `st` 成員前應該先對其相應的 mutex lock 進行上鎖動作。
```diff
static struct qsort *allocate_thread(struct common *c)
{
verify(pthread_mutex_lock(&c->mtx_al));
for (int i = 0; i < c->nthreads; i++)
if (c->pool[i].st == ts_idle) {
c->idlethreads--;
+ verify(pthread_mutex_lock(&c->pool[i].mtx_st));
c->pool[i].st = ts_work;
- verify(pthread_mutex_lock(&c->pool[i].mtx_st));
...
```
完成上述改動後重新編譯,即可以排除 Thread Sanitizer 的警告訊息。