# linux2023: RinHizakura - 作業 1 > * [2023 年暑期 Linux 核心課程第 1 次作業](https://hackmd.io/@sysprog/linux2023-summer-quiz0) > * [linux-summer-2023: hw1](https://github.com/RinHizakura/linux-summer-2023/tree/main/hw1) ## 測驗 α ### S-tree 程式碼運作原理 #### 資料結構 ```cpp /* S-Tree uses hints to decide whether to perform a balancing operation or not. * Hints are similar to AVL-trees' height property, but they are not * required to be absolutely accurate. A hint provides an approximation * of the longest chain of nodes under the node to which the hint is attached. */ struct st_node { short hint; struct st_node *parent; struct st_node *left, *right; }; struct st_root { struct st_node *root; }; ``` 首先從資料結構中探討起,我們可以看出 S-tree 的設計上與一般的 BST 類似,結構中維護左右與親代的子節點。特別的是 `hint` 這個成員,這讓 S-tree 可以在追求快速的 insert/delete 操作速度的同時,也可以**近乎**達到 AVL tree 所期待的平衡效果以獲得更佳/更一致(consistent)的 lookup 延遲。其作用細節在後續的實作中解釋。 :::danger 設計上覺得 S-tree 提供的介面(API,即 `st_*` 系列函式) 似乎有點彆扭? 比方說 `st_insert` 的實作需要由外部的 caller 先確認好 node 之間的 comparison order。假設我們確立 S-tree 就是只作為 BST 來使用,我覺得理想的介面應該是: ```cpp void st_insert(struct st_node **root, struct st_node *n); ``` 可能再加上做 compare 的函式指標 (`*cmp`),或是為 `root` 的 `st_node` 再包裝一個更高級的資料結構並將 `cmp` 作為其中一員。 ::: #### `st_first` ```cpp struct st_node *st_first(struct st_node *n) { if (!st_left(n)) return n; return st_first(st_left(n)); } struct st_node *st_last(struct st_node *n) { if (!st_right(n)) return n; return st_last(st_right(n)); } ``` `st_first` 和 `st_last` 的作用是找到以 `n` 為 root 的 subtree 中,order 最低即最高(根據對節點間 comparison 定義)的節點。 #### `st_replace_right` `st_replace_right` 的功能是用 `r` 來取代 `n` 原本在樹上的位置。更精準的說,這其實是 BST 之 remove 操作的一種形式,實際上 `r` 與 `n` 的關係必然滿足 `r = st_first(st_right(n))`(見 `st_remove`)。基於此前提下,這個 replace 實作才是有效且正確的。 可以想見有幾個節點間的關係可能會因應 replace 而進行調整: 1. `r` 的左子樹 2. `r` 的右子樹 3. `r` 的父節點 4. ~~`r` 原本的左子樹之父節點~~: 因為 `r` 是 `st_first(st_right(n))`,左子樹必定為 NULL 5. `r` 原本的右子樹之父節點 6. `r` 原本的父節點之左or右子樹 7. `r` 原本的父節點之父節點 8. `n` 原本的左子樹之父節點 9. `n` 原本的右子樹之父節點 10. `n` 原本的父節點之左or右子樹 在實作 replace 時,我們要考慮這些 link 是否都調整正確。 ```cpp static inline void st_replace_right(struct st_node *n, struct st_node *r) { struct st_node *p = st_parent(n), *rp = st_parent(r); if (st_left(rp) == r) { st_left(rp) = st_right(r); if (st_right(r)) st_rparent(r) = rp; } ``` 首先判斷 `r` 是 `rp` 的左子還是右子,如果是左子,則將 `rp` 左子替換為 `r` 的右子樹(6),右子樹的 parent 也需要更新(5)。 :::info 1. 如果 `r` 是 `rp` 的右子,其實可以預期此時 `n` 就是 `rp`,則此時就不必將 `r` 的右子樹交給其他人繼承,可想而知 `rp` 的左右也不可能繼承任何子樹,稍後就會被刪除 2. `r` 的左子樹怎麼辦呢? 留意到因為在刪除操作中 `r` 必然是來自某一子樹 `tree` 的 `st_first(tree)`,因此可預期 `st_left(r) = NULL` ::: ```cpp if (st_parent(rp) == n) st_parent(rp) = r; ``` `rp` 的父節點是 `n` 的狀況下,由於 `n` 即將被刪除且由 `r` 取代,因此 `st_parent(rp)` 勢必要被修改(7)。 ```cpp st_parent(r) = p; st_left(r) = st_left(n); ``` `r` 將取代 `n` 原本的位置,這裡調整此關係(1 & 3)。 ```cpp if (st_right(n) != r) { st_right(r) = st_right(n); st_rparent(n) = r; } ``` `r` 不正好是 `n` 的右子樹的情況下,`r` 需繼承 `n` 的右子樹,反過來的關係也需建立(2 & 9)。 ```cpp if (p && st_left(p) == n) st_left(p) = r; else if (p) st_right(p) = r; ``` 這裡調整 `n` 之父節點的子代(10)。 ```cpp if (st_left(n)) st_lparent(n) = r; } ``` 調整 `n` 原本左子樹的父節點關係(8)。 #### `st_replace_right` 與 `st_replace_left` 類似,這裡就不多做探討。 #### `st_remove` `st_remove` 將給定的 `del` 從樹中移除。 ```cpp void st_remove(struct st_node **root, struct st_node *del) { if (st_right(del)) { struct st_node *least = st_first(st_right(del)); if (del == *root) *root = least; AAAA; BBBB; return; } ``` 刪除方法的基本邏輯近似一般的 BST,若右子樹存在,則找到右子樹中 order 最小者(`st_left` 必然為 NULL),以其取代原本的 `del` 即可。 * AAAA = `st_replace_right(del, least)` * BBBB = `st_update(root, st_right(least))` ```cpp if (st_left(del)) { struct st_node *most = st_last(st_left(del)); if (del == *root) *root = most; CCCC; DDDD; return; } ``` 若右子樹不存在,則找到左子樹中 order 最大者(`st_right` 必然為 NULL),以其取代原本的 `del`。 * CCCC = `st_replace_left(del, most)` * DDDD = `st_update(root, st_left(most))` ```cpp if (del == *root) { *root = 0; return; } ``` 考慮到刪除的節點剛好是 root 的情況下,也需要更新 root 指標。 ```cpp /* empty node */ struct st_node *parent = st_parent(del); if (st_left(parent) == del) st_left(parent) = 0; else st_right(parent) = 0; EEEE; } ``` 如果被被刪除的節點沒有左右都沒有節點,意即其為 leaf,只要將調整其 parent 的 child 關係,再直接刪除之就好。 EEEE = `st_update(root, parent)` #### `st_rotate_right` rotate 是為了平衡 BST 所需進行的操作,它調整數個節點間的親子關係以達此目的。方便理解,我們以下圖為例示範如何對 node 6 做 rotate right 操作。 ```cpp static inline void st_rotate_right(struct st_node *n) { struct st_node *r = st_right(n), *p = st_parent(n); ``` 根據程式碼定義初始的狀態應該如下。 ```graphviz digraph BST { node [fontname="Arial" ]; l3 [ label = "3" ]; l4 [ label = "4(*p)" ]; l5 [ label = "5" ]; l6 [ label = "6(*n)" ]; l7 [ label = "7(*r)" ]; n1 [ label = "NULL", color=white]; n2 [ label = "8"]; n3 [ label = "NULL", color=white]; n4 [ label = "9"]; l4 -> { l3 l6 }; l6 -> { l5 l7 }; l7 -> { n1 n2 }; n2 -> { n3 n4 }; } ``` ```cpp st_parent(r) = st_parent(n); st_right(n) = st_left(r); st_parent(n) = r; st_left(r) = n; if (p && st_left(p) == n) st_left(p) = r; else if (p) st_right(p) = r; if (st_right(n)) st_rparent(n) = n; } ``` 則可以根據程式碼行為一步一步用紙筆檢查 pointer 的調整,完成旋轉後應該會變成如下圖的樣子。看到 node 7 被從右子樹往上提一個等級,藉此降低原本子樹的高度。 ```graphviz digraph BST { node [fontname="Arial" ]; l3 [ label = "3" ]; l4 [ label = "4(*p)" ]; l5 [ label = "5" ]; l6 [ label = "6(*n)" ]; l7 [ label = "7(*r)" ]; n1 [ label = "NULL", color=white]; n2 [ label = "8"]; n3 [ label = "NULL", color=white]; n4 [ label = "9"]; l4 -> { l3 l7 } l6 -> { l5 n1 } l7 -> { l6 n2} n2 -> { n3 n4 }; } ``` #### `st_rotate_left` 道理類似 `st_rotate_right`,這邊就不再費時說明。 #### `st_update` ```cpp static inline void st_update(struct st_node **root, struct st_node *n) { if (!n) return; int b = st_balance(n); int prev_hint = n->hint; ``` `st_balance` 判斷節點 `n` 在左右子樹的平衡程度,當 b > 0,表示左子樹的高度大於右子樹,反之 b < 0 則表示是右子樹較高。注意到嚴格來說 `hint` 只是對高度的估計而非完全正確的高度值。 ```cpp struct st_node *p = st_parent(n); if (b < -1) { /* leaning to the right */ if (n == *root) *root = st_right(n); st_rotate_right(n); } else if (b > 1) { /* leaning to the left */ if (n == *root) *root = st_left(n); st_rotate_left(n); } ``` 如果右子樹比較高,我們就做 `st_rotate_right`;與之相對,左子樹較高的情況是做 `st_rotate_left` 來平衡 BST。 ```cpp n->hint = st_max_hint(n); if (n->hint == 0 || n->hint != prev_hint) st_update(root, p); } ``` 旋轉之後,因為節點本身左右高度的變化要更新 hint。而當自身 hint 有變或是變成 leaf 時,也要向**原本的** parent 方向(以前面 `st_rotate_right` 旋轉 node 6 為例,即是要 update node 4)繼續更新 `hint` 和調整平衡。 #### `st_insert` `st_insert` 將節點插入到 S-tree 中。 ```cpp void st_insert(struct st_node **root, struct st_node *p, struct st_node *n, enum st_dir d) { if (d == LEFT) st_left(p) = n; else st_right(p) = n; st_parent(n) = p; st_update(root, n); } ``` 實作上沒有什麼複雜之處,就是將 `n` 插入成指定的 parent `p` 之左或右子節點。 ### 基於 S-tree 的 `treeint` 運作原理 #### 資料結構 ```cpp struct treeint { int value; struct st_node st_n; }; ``` 模仿 Linux 的 linked list 風格,`st_node` 可以被置於任何 struct 下以串連不同種類的資料結構。本題中是以包含單一 int 的節點示範。 #### `treeint_insert` ```cpp struct treeint *treeint_insert(int a) { struct st_node *p = NULL; enum st_dir d; for (struct st_node *n = st_root(tree); n;) { struct treeint *t = container_of(n, struct treeint, st_n); if (a == t->value) return t; p = n; if (a < t->value) { n = st_left(n); d = LEFT; } else if (a > t->value) { n = st_right(n); d = RIGHT; } } ``` 因為目前 `st_insert` 提供的介面,`treeint_insert` 必須先找出要插入的位置。 注意到嚴格的 BST 中是不存在重複 key 值的節點的,因此若存在重複我們直接回傳該已存在的相同 key 值節點即可。 ```cpp struct treeint *i = calloc(sizeof(struct treeint), 1); if (st_root(tree)) st_insert(&st_root(tree), p, &i->st_n, d); else st_root(tree) = &i->st_n; i->value = a; return i; } ``` 找到符合 BST 的正確位置後,完成節點的插入。 #### `__treeint_dump` ```cpp static void __treeint_dump(struct st_node *n, int depth) { if (!n) return; __treeint_dump(FFFF, depth + 1); struct treeint *v = treeint_entry(n); printf("%d\n", v->value); __treeint_dump(GGGG, depth + 1); } ``` 經典的 inorder 走訪。 * FFFF = `st_left(n)` * GGGG = `st_right(n)` ### S-tree 的介面改進 雖然原題目沒有要求這項改進。不過正如前面所說,我認為 S-tree 目前的 API 設計很彆扭。若我們將 S-tree 作為函式庫來看,從使用者的角度而言,他應該只關心插入和移除的鍵值為何,而不必明確知道 S-tree 內部是如何管理節點之間關係的。然而目前的設計下我們可以在 `treeint_insert` 和 `treeint_remove` 看到依然存在複雜的邏輯來走訪整個樹,後者應該被隱藏在函式庫內部的實作才是。 為此,我重新定義了 S-tree 提供的 public interface。 ```cpp struct st_tree *st_create(cmp_t *cmp, struct st_node *(*create_node)(void *key), void (*destroy_node)(struct st_node *n)); void st_destroy(struct st_tree *tree); int st_insert(struct st_tree *tree, void *key); int st_remove(struct st_tree *tree, void *key); struct st_node *st_find(struct st_tree *tree, void *key); ``` 可以在 `st_create` 中看到節點之間的 order 比較(`cmp`)、節點的建立與銷毀(`create_node`/`destroy_node`) 被從 S-tree 中獨立出來,以 callback function 的方式讓函式庫的使用者自行實作。其餘包含樹的走訪邏輯、root node 的維護等都直接在內部完成。 順帶一提,原本 `treeint_insert` 和 `treeint_remove` 中走訪 S-tree 的邏輯其實很相似。這裡我們實際上也可以重用 `st_find` 函式來歸納之,避免重寫相同的邏輯,以減少程式碼編譯後產生之二進位檔的大小。 完整的實作改進可以參照 [s_tree.c](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/s_tree.c)。有了這項調整,未來就可以很輕易地將 S-tree 應用在其他合適的專案之中了! :::warning 雖然介面上變得簡單許多,不過其實如果對照 Linux 的 rbtree 實作,會發現其反而避免使用了使用 callback 的方式,作法比較接近本題原始的實作。 原因可參照原作者在 [rbtree.h](https://elixir.bootlin.com/linux/latest/source/include/linux/rbtree.h#L9) 中的註解說明: > To use rbtrees you'll have to implement your own insert and search cores. This will avoid us to use callbacks and to drop drammatically performances. I know it's not the cleaner way, but in C (not in C++) to get performances and genericity... 其實不是很確定作者所述的 performance drop 具體有多高? 主要來自什麼部份(cache locality?),或許值得探討一下。畢竟如果能夠以 callback 方式實作,毫無疑問對使用者的上手門檻是更低的,應用上也更為容易 ::: ### 探討 S-tree 與 rbtree 效能落差 首先,為利於後續加入其他演算法進行效能評比,對原始程式碼做出以下改動,以建立相關基礎建設: 1. 加入一個 macro `benchmark`,方便測量一段程式碼完成的時間 * 主要測量 insert/find/remove 三種操作的時間 * ~~macro 寫得不是很漂亮~~,不過暫時還算堪用 2. 撰寫一個簡單的 python script `tree-perf.py`,方便將測量結果視覺化 * 為了讓測量結果盡量不受 noise 干擾,且視覺化後的曲線圖可以更平滑,試著用簡單的標準差法排除蒐集到的時間中之 outlier 3. 將 BST 的各項操作也抽象成 callback function pointer * 雖然前面提到 callback 的方式可能會對函式呼叫整體造成額外成本,但這裡我們主要的目標是要比較不同 tree 的 throughput,而非注重單一 tree 本身在實際應用上的可能優化 * 因此取捨傾向先以能夠容易且快速的加入各種 BST 實作的設計方式為主 效能評比程式有了,那麼 rbtree 的函式庫該從何而來呢? 稍微搜尋了一下,一方面似乎沒有比較被大眾廣泛接受的紅黑樹相關 C 函式庫;一方面,S-tree 的實作和 Linux 中的 rbtree 介面上其實存在不少相似之處。因此我們乾脆將 Linux 的紅黑樹實作拉到 userspace 來應用。這裡參照 Linux 6.4 版本中的程式碼,並且只擷取與 insert/find/remove 三項操作相關的函式實作,具體整理出來的程式碼可以參考: * [rbtree.h](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/rbtree.h) * [rbtree.c](https://github.com/RinHizakura/linux-summer-2023/blob/main/hw1/treeint/rbtree.c) 接著我們就可以來測量各個操作間的差異。 根據老師的建議將實驗分為循序輸入和亂序輸入的場景。對於循序輸入的部份,我們將從 `0` 一直到 `tree_size - 1` 的 key 值插入到樹中,然後再以相同的次序進行搜尋和刪除。過程記錄不同的樹節點數量與完成相應操作所需之累積時間的關係。則實驗的結果如下圖: ![](https://hackmd.io/_uploads/SkDF9RJ22.png =400x) 循序狀況下,似乎在樹節點數量較少時兩者差異不大,而節點增加時,S-tree 相較於 rbtree 的表現更好? 而亂序輸入的場景下,我們會隨機產生任意的插入/搜尋/移除值,實驗結果如下: ![](https://hackmd.io/_uploads/BkhEjAkhn.png =400x) 從結果來看,亂序狀況下,S-tree 整體的效能看起來稍優於 rbtree。且在亂序情形下 rbtree 似乎更容易有 worst case (圖中極端值) 的出現。 該怎麼依此給出合理的解釋呢? * 需注意到其實無論是插入/搜尋/刪除操作,其實三者都具備**搜尋**的部份,而插入和刪除只是多了調整節點間關係+旋轉的步驟 * 則如果瓶頸發生在搜尋部份,可以預期三種操作測量出的時間曲線圖將很相似(例如本實驗)? * 對於亂序下出現特殊的極端值(圖中的尖峰),可能是對某個 key 值的搜尋會導致最差情況(worst case)的發生? * 不認為是硬體(CPU、Cache)干擾的原因在於三種操作在同一節點數量都出現極端值 * 理論上,在樹的平衡上 S-tree 應具有較為明顯的優勢,如下圖以循序插入 0 - 9 的節點為例,可以看到在限制樹高度的能力上 S-tree 是更有效的 > ![](https://hackmd.io/_uploads/S1s3Hkg33.png) > S-tree 從 0 到 9 插入節點後 > ![](https://hackmd.io/_uploads/BkEKrJe3h.png) > rbtree 從 0 到 9 插入節點後 :::warning 1. 以上敘述純屬推測(~~嘴炮~~),需拿出更多證據支持 2. 設計何種場景可以彰顯 rbtree 在插入/移除上的優勢? ::: ### S-tree 的最壞場景 然而在特定輸入上可以體現 S-tree 的 hint 對高度估計的不精準導致的 worst case。我們首先輸入 1,接著輸入 1000,則 S-tree 會呈現以下樣貌: ```graphviz digraph BST { node [fontname="Arial" ]; n0 [label = "1(1)" ]; n1 [label = "NULL", color=white ]; n2 [label = "1000(0)" ]; n0 -> { n1 n2 } } ``` 此時 node 1 的 hint 為 1,node 1000 的 hint 為 0(括號中數字)。接著插入 900,最開始會呈現以下樣貌: ```graphviz digraph BST { node [fontname="Arial" ]; n0 [label = "1(1)" ]; n1 [label = "NULL", color=white ]; n2 [label = "1000(0)" ]; n3 [label = "900(0)" ]; n4 [label = "NULL", color=white ]; n0 -> { n1 n2 } n2 -> { n3 n4 } } ``` 然後我們對 node 900 做 `st_update` 後,接著往 parent 方向也對 node 1000 做 `st_update`,後者的 hint 會被更新: ```graphviz digraph BST { node [fontname="Arial" ]; n0 [label = "1(1)" ]; n1 [label = "NULL", color=white ]; n2 [label = "1000(1)" ]; n3 [label = "900(0)" ]; n4 [label = "NULL", color=white ]; n0 -> { n1 n2 } n2 -> { n3 n4 } } ``` 繼續往 parent 方向,也對 node 1 做 `st_update`。此時因為 hint 條件下不 balance,將會進行右旋轉 (`st_rotate_right`)。旋轉後的結果呈現以下狀態。 ```graphviz digraph BST { node [fontname="Arial" ]; n0 [label = "1(1)" ]; n1 [label = "NULL", color=white ]; n2 [label = "1000(1)" ]; n3 [label = "NULL", color=white ]; n4 [label = "900(0)"]; n2 -> { n0 n1 } n0 -> { n3 n4 } } ``` 這邊可以看到 S-tree 潛在的問題。由於旋轉的關係,node 1000 的連結關係被改變,然而 update 的手法將不會對其 hint 進行二度調整,最後導致 hint 並不能正確反映樹的高度,進而也沒有機會將二元樹平衡。 想像上,若強硬的去追蹤每個節點的高度並平衡將迫使 S-tree 與 AVL 相近,進而失去原本設計上想帶來的優勢。那麼該怎麼權衡可以在有效率的插入/刪除的同時保持一定程度的二元樹平衡呢? 這可能就需要更深入的思考了。 ## 測驗 β ### 程式碼運作原理 ```cpp #include <stdint.h> static inline uintptr_t align_up(uintptr_t sz, size_t alignment) { uintptr_t mask = alignment - 1; if ((alignment & mask) == 0) { /* power of two? */ return MMMM; } return (((sz + mask) / alignment) * alignment); } ``` MMMM = `(sz + mask) & ~mask` 不囉唆,直接看 [align.h](https://github.com/torvalds/linux/blob/master/include/linux/align.h)。 ## 測驗 γ ### 考古 [qsort](https://man7.org/linux/man-pages/man3/qsort.3p.html) 是由 C 語言標準中定義的排序介面,而內部的實作則根據函式庫的實現各有不同。值得注意的是,`qsort` 並不等於 quick sort,而其一般是以優化過的 hybrid sort 方式實作,後者混合多種類的 sorting 演算法,並根據輸入選擇合適者,此作法在提供普遍(general)的排序算法時更為有利,如本測驗題的實作也是一種。 :::warning 有點好奇XD :rolling_on_the_floor_laughing: 翻了一下 C 語言標準中(7.22.5.2 The qsort function)的描述,似乎其沒有對 qsort 的時間複雜度給予規範? 不曉得這是否意味假如我們自己開發標準函式庫時,用 bubble sort 作為 qsort 的內部實現也是合法的? 或是我漏掉了其他的章節? ::: 稍微搜尋了一下,本測驗題應該是基於 J. L. Bentley 和 M. D. McIlroy 在 [Engineering a Sort Function](https://onlinelibrary.wiley.com/doi/epdf/10.1002/spe.4380231105) (~~很抱歉沒付錢看不了~~,[連結](https://web.ecs.syr.edu/~royer/cis675/slides/07engSort.pdf)倒是可以找一個簡單的投影片) 提出的方法作為延伸,該方法的身影也可以在 [FreeBSD libc](https://svnweb.freebsd.org/base/head/lib/libc/stdlib/qsort.c?view=markup#l103)、[apple 的 XNU](https://github.com/apple-oss-distributions/xnu/blob/main/bsd/kern/qsort.c) 等等專案中發現。而測驗題則善用其中 quick sort 部份會需要 partition 成兩段陣列並分別排序的特性,嘗試將這些排序平行/並行運算來提昇執行效率。 ### 程式碼運作原理 #### `qsort_mt` `qsort_mt` 是本測驗題所要實作的 multithread qsort 的介面本體。 ```cpp void qsort_mt(void *a, size_t n, size_t es, cmp_t *cmp, int maxthreads, int forkelem) { struct qsort *qs; struct common c; int i, islot; bool bailout = true; if (n < forkelem) goto f1; errno = 0; /* Try to initialize the resources we need. */ if (pthread_mutex_init(&c.mtx_al, NULL) != 0) goto f1; if ((c.pool = calloc(maxthreads, sizeof(struct qsort))) == NULL) goto f2; ``` `forkelem` 是決定需不需要額外啟動一個 thread 來平行排序的參數,如果陣列單元數量小於此參數的情況下,可以直接用標準函式庫的 `qsort` 就好。 如果要啟動 multithread qsort,這裡的實作是採用 thread pool 的方式。最為直接的實作下,我們可以判斷需要平行運算時才建立 thread,並在 thread 完成後直接銷毀之。這樣的作法對於資源的控管和 thread 的設計相對輕鬆。但缺點也很明顯,頻繁的建立和銷毀 thread 將造成額外的開銷,進而帶來額外的延遲時間。 取而代之,我們可以預先建立好數個 thread worker,並在需要時才從中挑選 idle thread 出來工作,詳細的說明可以在 [案例: Thread Pool 實作和改進](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-thread-pool#%E6%A1%88%E4%BE%8B-Thread-Pool-%E5%AF%A6%E4%BD%9C%E5%92%8C%E6%94%B9%E9%80%B2)中閱讀到。總而言之,thread pool 雖然管理上更為複雜(後續就會見識到了XD),但在分配平行任務上更為高效,而這裡我們就需要為每個 thread 建立一個專屬的 context `struct qsort`,後者可用來攜帶參數,以告訴 thread 接下來要排序的陣列和其大小等資訊。 ```cpp for (islot = 0; islot < maxthreads; islot++) { qs = &c.pool[islot]; if (pthread_mutex_init(&qs->mtx_st, NULL) != 0) goto f3; if (pthread_cond_init(&qs->cond_st, NULL) != 0) { verify(pthread_mutex_destroy(&qs->mtx_st)); goto f3; } qs->st = ts_idle; qs->common = &c; if (pthread_create(&qs->id, NULL, qsort_thread, qs) != 0) { verify(pthread_mutex_destroy(&qs->mtx_st)); verify(pthread_cond_destroy(&qs->cond_st)); goto f3; } } ``` 此 for 迴圈初始每個 thread context,包含初始化各自的 mutex 和 condition variable,並建立對應的 thread 等。 ```cpp /* All systems go. */ bailout = false; /* Initialize common elements. */ c.swaptype = ((char *) a - (char *) 0) % sizeof(long) || es % sizeof(long) ? 2 : es == sizeof(long) ? 0 : 1; ``` `swaptype` 的目的是確認所排序陣列的指標對齊狀況,這攸關後續將一個(`swap`)或多個(`vecswap`)元素作為一個單位進行 swap 的手法。首先仔細觀察其中的條件式: 1. `((char *) a - (char *) 0) % sizeof(long)`: 確認陣列 `a` 是否與 long 型別長度對齊 (若否為 true) 2. `es % sizeof(long)`: 檢查陣列中的元素是否與 long 型別對齊 (若否為 true) 3. `es == sizeof(long)`: 這條件在 1 和 2 皆不成立時才檢查,換句話說 `a` 必然對齊 `sizeof(long)`,且元素長度也對齊之,此時判斷元素大小是否直接等同 `long` (若是為 true) 則總結 `swaptype` 之值代表的意涵以及對應的 swap 手法為: * 0 代表 `a` 對齊 `sizeof(long)`,且 `es` 等同 `sizeof(long)` * swap: 直接將指標 cast 成 long 交換即可 * vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(long)` 為單位交換 `n / sizeof(long)` 次 * 1: `a` 對齊 `sizeof(long)`,`es` 對齊 `sizeof(long)` 但不等同 * swap / vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(long)` 為單位交換 `n / sizeof(long)` 次 * 2: `a` 不對齊 `sizeof(long)` 或 `es` 不對齊 `sizeof(long)` * swap / vecswap: 假設總共要交換的空間大小為 `n`,則以 `sizeof(char)` 為單位交換 `n / sizeof(char)` 次 * 顯然 `sizeof(char) < sizeof(long)`,因此 `swaptype == 2` 之情況下可以預期是用較無效率的方法進行的 :::info :notes: 在 C 語言標準 6.5.3.4 The sizeof and alignof operators 中提到: > When sizeof is applied to an operand that has type char, unsigned char, or signed char, (or a qualified version thereof) the result is 1. 換句話說,在不對齊狀況下,每次只能以 1 bytes 為單位進行 swap。 ::: ```cpp c.es = es; c.cmp = cmp; c.forkelem = forkelem; c.idlethreads = c.nthreads = maxthreads; ``` `struct common` 的 `c` 攜帶一些與陣列整體相關的資訊,原則上是在各個 thread 中只可讀的資訊。(`idlethreads` 應該是唯一的例外?) ```cpp /* Hand out the first work batch. */ qs = &c.pool[0]; verify(pthread_mutex_lock(&qs->mtx_st)); qs->a = a; qs->n = n; qs->st = ts_work; c.idlethreads--; verify(pthread_cond_signal(&qs->cond_st)); verify(pthread_mutex_unlock(&qs->mtx_st)); ``` 必要的初始化都完成後,首先就可以啟動 thread 0 來展開排序的第一步。 ```cpp /* Wait for all threads to finish, and free acquired resources. */ f3: for (i = 0; i < islot; i++) { qs = &c.pool[i]; if (bailout) { verify(pthread_mutex_lock(&qs->mtx_st)); qs->st = ts_term; verify(pthread_cond_signal(&qs->cond_st)); verify(pthread_mutex_unlock(&qs->mtx_st)); } verify(pthread_join(qs->id, NULL)); verify(pthread_mutex_destroy(&qs->mtx_st)); verify(pthread_cond_destroy(&qs->cond_st)); } free(c.pool); f2: verify(pthread_mutex_destroy(&c.mtx_al)); if (bailout) { fprintf(stderr, "Resource initialization failed; bailing out.\n"); f1: qsort(a, n, es, cmp); } } ``` 之後,main thread 就在 for 迴圈中藉由 `pthread_join` 逐一等待每個 thread 結束,並在 thread 結束時銷毀當初分配的資源。 順帶一提,在 thread pool 的方式下,要怎麼知道後續已經沒有工作需要 thread worker,可以將各個 thread 結束呢? 一種可行的做法是整理所有 thread 的狀態是否 idle。由於我們可以預期直到整個陣列的排序完成前都會有 thread 在運作,因此反過來說當所有 thread 都 idle 時該次排序應該已經完成。後續我們會在 [`qsort_thread`](#qsort_thread) 看到對應實作。 #### `qsort_thread` 每個 worker 都會執行各自的 `qsort_thread`。其主要流程可以分成三個階段: 1. 等待被喚醒,閒閒沒事(?)的 thread 之 `qs->st` 會是 `ts_idle`,直到狀態被切換至 `ts_work` 開始工作,或是切換到 `ts_term` 後結束 * 避免 busy-waiting,可以搭配 [condition variable](https://linux.die.net/man/3/pthread_cond_init) 2. 喚醒後根據 context 設定進行對應的 sorting 3. 回到 idle 狀態,此外,若發現自己完成後所有的 thread 已經都是 idle,表示最初指派的任務已經完成,可以結束自身,也將其他 thread 之狀態切換到 `ts_term` 以結束之 這三階段就對應到下方的程式碼。 ```cpp static void *qsort_thread(void *p) { struct qsort *qs, *qs2; int i; struct common *c; qs = p; c = qs->common; again: /* Wait for work to be allocated. */ verify(pthread_mutex_lock(&qs->mtx_st)); while (qs->st == ts_idle) verify(HHHH); verify(pthread_mutex_unlock(&qs->mtx_st)); if (qs->st == ts_term) { return NULL; } assert(qs->st == ts_work); ``` 這裡是第一階段行為的描述。其中,對於 HHH 的答案,從外層迴圈行為是在等待 state 的變更,以及該迴圈是搭配 mutex 建立的 critical section 使用的狀態來看,不難看出 HHHH 應該是在等待 condition variable 被 signal,因此: * HHHH = `pthread_cond_wait(&qs->cond_st, &qs->mtx_st)` ```cpp qsort_algo(qs); ``` 第二階段則進行主要任務 [`qsort_algo`](#qsort_algo)。 ```cpp verify(pthread_mutex_lock(&c->mtx_al)); qs->st = ts_idle; c->idlethreads++; if (c->idlethreads == c->nthreads) { for (i = 0; i < c->nthreads; i++) { qs2 = &c->pool[i]; if (qs2 == qs) continue; verify(pthread_mutex_lock(&qs2->mtx_st)); qs2->st = ts_term; verify(JJJJ); verify(pthread_mutex_unlock(&qs2->mtx_st)); } verify(pthread_mutex_unlock(&c->mtx_al)); return NULL; } verify(pthread_mutex_unlock(&c->mtx_al)); goto again; } ``` 第三階段也是如前面的敘述,當 `c->idlethreads == c->nthreads`,則將其他的 thread 都切換 state 為 `st_term`,因此也需要透過 condition variable 喚醒對應 thread,則: * JJJJ = `pthread_cond_signal(&qs2->cond_st)` #### `qsort_algo` ```cpp /* Thread-callable quicksort. */ static void qsort_algo(struct qsort *qs) { char *pa, *pb, *pc, *pd, *pl, *pm, *pn; int d, r, swaptype, swap_cnt; void *a; /* Array of elements. */ size_t n, es; /* Number of elements; size. */ cmp_t *cmp; int nl, nr; struct common *c; struct qsort *qs2; /* Initialize qsort arguments. */ c = qs->common; es = c->es; cmp = c->cmp; swaptype = c->swaptype; a = qs->a; n = qs->n; ``` 前面只是初始化一些 local 變數,以建立簡潔易讀的程式碼。 ```cpp top: /* From here on qsort(3) business as usual. */ swap_cnt = 0; if (n < 7) { for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0; pl -= es) swap(pl, pl - es); return; } ``` 最開始針對 array 比較小的狀況(<7)做優化,該情況下用簡易的 insertion sort 處理即可。 :::warning 在下方 `swap_cnt` 為 0 之處也有一段 insertion sort,感覺或許可以歸納成一個 function 以簡化程式碼? ::: :::info 這邊由於 C 語言不提供泛型,因應 sort 可以針對不同型別排序的情況下,需要以 `(char *)a + es * i` 去存取第 i 個 element,不得不說閱讀起來有點不舒服XD ::: ```cpp pm = (char *) a + (n / 2) * es; if (n > 7) { pl = (char *) a; pn = (char *) a + (n - 1) * es; if (n > 40) { d = (n / 8) * es; pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk); pm = med3(pm - d, pm, pm + d, cmp, thunk); pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk); } pm = med3(pl, pm, pn, cmp, thunk); } swap(a, pm); ``` 陣列比較大的情況下,quick sort 會是更佳的選擇。眾所周知 quick sort 需要選擇 pivot 來將陣列進行二分。而為了避免 pivot 的選擇不好而導致容易出現 worst case (即時間複雜度為 $O(n^2)$)的狀況,一種常見的作法是透過 median of 3 (`med3`) 來挑選合適的 pivot。 > 延伸閱讀: [Improving Quicksort with Median of 3 and Cutoffs](https://youtu.be/1Vl2TB7DoAM) 這裡在挑選出 pivot 後,也將其置換為 array 的首個 element。 ```cpp pa = pb = (char *) a + es; pc = pd = (char *) a + (n - 1) * es; for (;;) { while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) { if (r == 0) { swap_cnt = 1; swap(pa, pb); pa += es; } pb += es; } while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) { if (r == 0) { swap_cnt = 1; swap(pc, pd); pd -= es; } pc -= es; } if (pb > pc) break; swap(pb, pc); swap_cnt = 1; pb += es; pc -= es; } ``` ```graphviz digraph { a [label="a", color=white]; pa [label="pa", color=white]; pb [label="pb", color=white]; pc [label="pc", color=white]; pd [label="pd", color=white]; node [shape=record, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; values [label="<f0> A[0] | <f1> A[1] | <f2> A[2] | <f3> ...... | <f4> A[n - 2] | <f5> A[n - 1]", color=blue, fillcolor=lightblue, style=filled]; edge [color=blue]; a -> values:f0; pa -> values:f1; pb -> values:f1; pc -> values:f5; pd -> values:f5; } ``` 接下來這邊是 quick sort 中經典的 partition 實作。因為主要目標是要將 order 比 pivot 小者放到 pivot 左方,order 比 pivot 大者放到右方,因此作法是從 `pb` 往右找到比 pivot 大的元素,從 `pc` 往左找到比 pivot 大者,然後將兩者交換,並重複此流程直到兩個 pointer 交會。 注意到與普通 quick sort 相比實作上還是有一些小差異,這提供演算法後續更多的優化空間: * 期間也記錄下進行 swap 的次數 * 特地將與 pivot order 相同的元素放置在從 `pa` 往右/從 `pd` 往左之處 ```cpp pn = (char *) a + n * es; r = min(pa - (char *) a, pb - pa); vecswap(a, pb - r, r); r = min(pd - pc, pn - pd - es); vecswap(pb, pn - r, r); ``` 這裡的兩個 `vecswap` 將 pivot 最後置換到正確的位置中。 ```cpp if (swap_cnt == 0) { /* Switch to insertion sort */ r = 1 + n / 4; /* n >= 7, so r >= 2 */ for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0; pl -= es) { swap(pl, pl - es); if (++swap_cnt > r) goto nevermind; } return; } ``` 如果 `swap_cnt` 為 0,這表示在前面的 partition 中沒發生任何置換,即原始的陣列可以在不置換的前提下完美分成 `< pivot` 和 `> pivot` 的兩個子陣列。這情況很可能表示陣列其實已經完成排序或接近完成,因此我們嘗試直接對整個陣列進行 insert sort。 在此我們仍然持續追蹤完成 sort 所需要進行的 swap 數量,如果期間發現仍需要比較多的 swap,那麼可以反悔,還是進行標準的 quick sort。 ```cpp nevermind: nl = (pb - pa) / es; nr = (pd - pc) / es; /* Now try to launch subthreads. */ if (nl > c->forkelem && nr > c->forkelem && (qs2 = allocate_thread(c)) != NULL) { qs2->a = a; qs2->n = nl; verify(pthread_cond_signal(&qs2->cond_st)); verify(pthread_mutex_unlock(&qs2->mtx_st)); } else if (nl > 0) { qs->a = a; qs->n = nl; qsort_algo(qs); } ``` 如果選擇做標準的 quick sort,那就是以 pivot 為界線將陣列區分為左右兩組。對於左邊這組,我們可以考量左右子陣列剩餘需要排序的元素數量,以決定是否從 thread pool 中喚醒另一個 thread 來並行的完成 quick sort。 ```cpp if (nr > 0) { a = pn - nr * es; n = nr; goto top; } } ``` 而右邊的子陣列則可以沿用原本的 thread 繼續運算。 :::warning 想像上這一段寫成以下形式會不會在可讀性上更佳? ```cpp if (nr > 0) { qs->a = pn - nr * es; qs->n = nr; qsort_algo(qs); } ``` ::: ### 實作上的 Data race 在編譯選項中加上 `-fsanitize=thread` 可以在程式中引進 [Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual),後者是能夠偵測執行緒間 data race 的強大工具。另外建議也加入 `-g` 選項來導入 DWARF,以提升除錯訊息的詳細程度。 加入 flag 後,執行結果如下: ```diff ================== - WARNING: ThreadSanitizer: data race (pid=6973) Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0): #0 allocate_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 (qsort-mt.out+0x3a7f) #1 qsort_algo /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:315 (qsort-mt.out+0x3a7f) #2 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:352 (qsort-mt.out+0x3fd8) Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2): #0 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:344 (qsort-mt.out+0x3f9a) Location is heap block of size 256 at 0x7b4000000000 allocated by main thread: #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc) #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:133 (qsort-mt.out+0x43e3) #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3) Mutex M0 (0x7fff028782d8) created at: #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1) #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:131 (qsort-mt.out+0x43bc) #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3) Mutex M2 (0x7b40000000a8) created at: #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1) #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:137 (qsort-mt.out+0x44ae) #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3) Thread T1 (tid=6975, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468) #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3) Thread T2 (tid=6976, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468) #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3) SUMMARY: ThreadSanitizer: data race /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 in allocate_thread ================== ``` Thread Sanitizer 明確指出出現 data race 的程式碼位置,對照後會發現問題是出在 `allocate_thread` 中,在存取 `st` 成員前應該先對其相應的 mutex lock 進行上鎖動作。 ```diff static struct qsort *allocate_thread(struct common *c) { verify(pthread_mutex_lock(&c->mtx_al)); for (int i = 0; i < c->nthreads; i++) if (c->pool[i].st == ts_idle) { c->idlethreads--; + verify(pthread_mutex_lock(&c->pool[i].mtx_st)); c->pool[i].st = ts_work; - verify(pthread_mutex_lock(&c->pool[i].mtx_st)); ... ``` 完成上述改動後重新編譯,即可以排除 Thread Sanitizer 的警告訊息。