## 測驗 $\alpha$ 在s-tree被初始化之後先隨意插入100個數值 接著依照ascending order印出來 打印的函式為`__treeint_dump` 因為它也是BST所以要依照ascending order走訪的話只需要從root開始進行inorder traversal ```c /* ascending order */ static void __treeint_dump(struct st_node *n, int depth) { if (!n) return; __treeint_dump(FFFF, depth + 1); struct treeint *v = treeint_entry(n); printf("%d\n", v->value); __treeint_dump(GGGG, depth + 1); } ``` 所以`FFFF`會是`n`的左子樹, `GGGG`是`n`的右子樹 使用原本就定義好的巨集`st_left`和`st_right`來取得`n`的左子樹和右子樹 `FFFF`:`st_left(n)` `GGGG`:`st_right(n)` 剩下五個問題都在`st_remove`當中 依照註解的敘述分別會是 `AAAA`:`st_replace_right(del, least)` `BBBB`:`st_update(root, st_first(least))` `CCCC`:`st_replace_left(del, most)` `DDDD`:`st_update(root, st_last(most))` `EEEE`:`st_update(root, parent)` `FFFF`:`st_left(n)` `GGGG`:`st_right(n)` ## 測驗 $\alpha - 1$ ### Insert 插入tree node主要由兩個函式負責 首先使用`treeint_insert` 接著實際插入節點的動作由`st_insert`進行 **treeint_insert** ```c struct treeint *treeint_insert(int a) { struct st_node *p = NULL; enum st_dir d; // basic insertion step for (struct st_node *n = st_root(tree); n;) { struct treeint *t = container_of(n, struct treeint, st_n); // get the address of st_n in n if (a == t->value) return t; p = n; if (a < t->value) { n = st_left(n); d = LEFT; } else if (a > t->value) { n = st_right(n); d = RIGHT; } } // create a new node and insert it into the tree struct treeint *i = calloc(sizeof(struct treeint), 1); if (st_root(tree)) st_insert(&st_root(tree), p, &i->st_n, d); else st_root(tree) = &i->st_n; i->value = a; return i; } ``` 此函式執行的動作可以分為兩部份 1. 找到新節點要插入的位置 過程和binary search相同, 透過for loop向子節點尋找要插入的位置 若找到數值和自己相同節點則回傳該節點 2. 判斷目前此樹是否為空, 若是則將新節點作為root, 不是則呼叫`st_insert`進行插入的動作 **st_insert** 將新節點放入新的位置並將父節點對到child的pointer和自己指向父節點的pointer設定好 之後交由`st_update`來更新此樹, 確保此樹符合規則 ### Update 像紅黑樹還有AVL tree一樣 此樹也有update method來確保insertion或deletion之後, 此樹依然保持它要求的規則 例如維持平衡 由`st_update`負責執行此動作 首先利用`st_balance`計算此樹的balance factor 基本上是左子樹高減掉右子樹高 應該注意到每個`st_node`都有一個成員`hint`作為儲存節點在數中高度的數字 不過它並非完全精準的計算 或許後面可以改進此部份計算高度的函式 * 若左傾, 則進行左旋 * 若右傾, 則進行右旋 需要注意的是如果n就是root, 則需要先將root改變再進行旋轉 且如果旋轉完之後n變成leaf node或者跟之前的高度不同 則持續往上對n的parent做update ### Remove remove節點由`treeint_remove`執行 先找出要刪除的節點位置後進行`st_remove` 之後再free該節點 ## 測驗 $\alpha - 2$ 我認為這段程式碼所實作出的S-tree有數個可以改進的地方 1. **st_balance跟st_max_hint動作重複很多, 可以在計算st_balance同時更新hint** `st_balance`和`st_max_hint`程式碼高度重複 可以在計算balance factor的時候同時更新`n->hint` 並且我們知道若插入節點後沒有造成旋轉, 則`hint`不會改變, 所以可以把`st_max_hint`搬入rotate function當中以減少重複運算的次數 2. **沒有rotate就應該繼續往上update** 原先是否繼續往上update還要經過判斷 但插入節點後所有父輩節點的高度都會改變都有機會不平衡, 只要沒有進行旋轉都應該繼續update 一旦旋轉後這個操作必須保證讓s tree回到平衡所以不需要繼續update 3. **此樹並沒有做到真的平衡** 如果依照順序插入10, 5, 20, 2, 7, 25 此時s tree是平衡的沒問題 但如果這時候插入23, 結果會發現它只有執行一次right rotate, 樹還是不平衡的 甚至接著繼續插入19, 它依舊沒發現自己不平衡 s tree判斷平衡的方式和AVL tree很類似 但AVL tree還有將不平衡的類型分為四種而進行對應不同的旋轉方式 s tree在RL, LR的情況旋轉之後依然不平衡 只要依序插入20, 25, 23即可發現 4. **replace裡面的if太多** `st_replace_right, st_replace_left`當中的if判斷式太多 可讀性很差 **改進** 1. **進行st_balance同時計算n->hint** 如此一來只有旋轉後才需要進行`st_max_hint`來更新節點高度 ```diff static inline int st_balance(struct st_node *n) { int l = 0, r = 0; if (st_left(n)) l = st_left(n)->hint + 1; if (st_right(n)) r = st_right(n)->hint + 1; + n->hint = l > r ? l : r; return l - r; } ``` 2. **改變向上執行st_update的條件** 只要插入後沒有旋轉都應該繼續往上update 而只有b被偵測出不平衡才會旋轉 所以b <= 1 or b >= -1都應該進行update 如此一來還能減少一個if的判斷 ```diff static inline void st_update(struct st_node **root, struct st_node *n) { if (!n) return; int b = st_balance(n); int prev_hint = n->hint; struct st_node *p = st_parent(n); if (b < -1) { /* leaning to the right */ if (n == *root) *root = st_right(n); st_rotate_right(n); } else if (b > 1) { /* leaning to the left */ if (n == *root) *root = st_left(n); st_rotate_left(n); + } else { + st_update(root, p); + } - n->hint = st_max_hint(n); - if (n->hint == 0 || n->hint != prev_hint) - st_update(root, p); } ``` 3. **處理RL, LR旋轉後依然不平衡的情況** 相較AVL tree在這兩種情況會進行兩次旋轉的處理方式 我只進行一次, 不過做法和一般旋轉不同 如此一來可以保證經過旋轉後s tree一定平衡 *實作有錯誤的地方 delete的時候會造成segmentation fault 仍須更正* ```c static inline void st_rl_rotate(struct st_node *n) { struct st_node *r = st_right(n), *p = st_parent(n); struct st_node *rl = st_left(r); st_parent(rl) = st_parent(n); st_right(n) = st_left(rl); st_left(r) = st_right(rl); st_parent(n) = rl; st_parent(r) = rl; st_left(rl) = n; st_right(rl) = r; if (p && st_left(p) == n) st_left(p) = rl; else if (p) st_right(p) = rl; if (st_right(n)) st_rparent(n) = n; if (st_left(r)) st_lparent(r) = r; n->hint = st_max_hint(n); } static inline void st_update(struct st_node **root, struct st_node *n) { if (!n) return; int b = st_balance(n); int prev_hint = n->hint; struct st_node *p = st_parent(n); if (b < -1) { /* leaning to the right */ if (n == *root) *root = st_right(n); if (st_right(st_right(n))) st_rotate_right(n); else st_rl_rotate(n); } else if (b > 1) { /* leaning to the left */ if (n == *root) *root = st_left(n); if (st_left(st_left(n))) st_rotate_left(n); else st_lr_rotate(n); } else { st_update(root, p); } } ``` ### 測驗 $\alpha - 3$ TODO --- ## 測驗 $\beta$ ```c #include <stdint.h> static inline uintptr_t align_up(uintptr_t sz, size_t alignment) { uintptr_t mask = alignment - 1; if ((alignment & mask) == 0) { /* power of two? */ return (sz + mask) & (~mask); } return (((sz + mask) / alignment) * alignment); } ``` ### 測驗 $\beta - 1$ 若`alignment`在都是2的指數情況下 也就是1000, 0100, 0010等 答案會是`sz`把`alignment`的msb後面的bit都清為0 之後再加上`alignment` 解答會是`((sz / alignment) + 1) * alignment` 不過提示有說不能有`alignment`, 我們又知道`mask = alignment - 1` 所以想辦法把`alignment`用`mask`表示就可以 因為`alignment`剛好是二的指數所以`mask`剛好會是把`alignment`的msb後面的0全部換成1 利用這個特性先將`mask`給反轉之後跟`sz`做AND 就能達到上述第一步 接著加上`mask + 1`就是加上`alignment` ### 測驗 $\beta - 2$ 在[sort.c](https://elixir.bootlin.com/linux/latest/source/lib/sort.c)當中有一個函式如下 ```clike /** * is_aligned - is this pointer & size okay for word-wide copying? * @base: pointer to data * @size: size of each element * @align: required alignment (typically 4 or 8) * * Returns true if elements can be copied using word loads and stores. * The size must be a multiple of the alignment, and the base address must * be if we do not have CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS. * * For some reason, gcc doesn't know to optimize "if (a & mask || b & mask)" * to "if ((a | b) & mask)", so we do that by hand. */ __attribute_const__ __always_inline static bool is_aligned(const void *base, size_t size, unsigned char align) { unsigned char lsbits = (unsigned char)size; (void)base; #ifndef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS lsbits |= (unsigned char)(uintptr_t)base; #endif return (lsbits & (align - 1)) == 0; } ``` ## 測驗 $\gamma$ `HHHH`:`pthread_cond_wait(&qs->cond_st, &qs->mtx_st)` `JJJJ`:`pthread_cond_signal(&qs2->cond_st)` ## 測驗 $\gamma - 1$ 主要探討`qsort_mt`和其相關內容 實作此程式碼首先探討兩個特別的結構 ```clike struct qsort { enum thread_state st; /* For coordinating work. */ struct common *common; /* Common shared elements. */ void *a; /* Array base. */ size_t n; /* Number of elements. */ pthread_t id; /* Thread id. */ pthread_mutex_t mtx_st; /* For signalling state change. */ pthread_cond_t cond_st; /* For signalling state change. */ }; struct common { int swaptype; /* Code to use for swapping */ size_t es; /* Element size. */ void *thunk; /* Thunk for qsort_r */ cmp_t *cmp; /* Comparison function */ int nthreads; /* Total number of pool threads. */ int idlethreads; /* Number of idle threads in pool. */ int forkelem; /* Minimum number of elements for a new thread. */ struct qsort *pool; /* Fixed pool of threads. */ pthread_mutex_t mtx_al; /* For allocating threads in the pool. */ }; ``` 首先有個`struct common c` 初始化它的mutex lock並建立一個pool, 當中有`maxthreads`個thread 之後依序建立這些thread並執行`qsort_thread`(注意此處還沒真正開始sort, `a`沒有被送進去) 注意到所有thread共享同一個`struct common` 如果取得`qs->mtx_st`這個mutex lock 就可以進行quick sort 也就是進行`qsort_algo` 之後thread進入idle狀態 如果全部thread都idle, 則依序把thread都terminate 如果沒有, 就回到again的地方再來一次 **`qsort_algo`** 此函式為主要進行quick sort的部分 如果array size小於7則直接進行熟知的bubble sort ```clike if (n < 7) { for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0; pl -= es) swap(pl, pl - es); return; } ``` 如果大於7則要進行quick sort, 而quick sort在升冪或降冪排列的時候時間複雜度會變成$O(n)$ 為了避免這個情形, 盡量讓每次sorting都是average case 要進行以下操作以取得中位數然後放到array head的地方 ```clike pm = (char *) a + (n / 2) * es; if (n > 7) { pl = (char *) a; pn = (char *) a + (n - 1) * es; if (n > 40) { d = (n / 8) * es; pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk); pm = med3(pm - d, pm, pm + d, cmp, thunk); pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk); } pm = med3(pl, pm, pn, cmp, thunk); } swap(a, pm); ``` 然後進行真正的quick sort ```clike pa = pb = (char *) a + es; pc = pd = (char *) a + (n - 1) * es; for (;;) { while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) { if (r == 0) { swap_cnt = 1; swap(pa, pb); pa += es; } pb += es; } while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) { if (r == 0) { swap_cnt = 1; swap(pc, pd); pd -= es; } pc -= es; } if (pb > pc) break; swap(pb, pc); swap_cnt = 1; pb += es; pc -= es; } ``` 做的事跟普通的quick sort沒太大差別, 不過這邊比較特別是只把跟pivot相同的element搬到前面 排完後前面有一段element都是相同的元素 cmp有可能出錯而造成sorting被打斷 如果發生這情況就從尾巴往前把還沒做完的部分給sort完 一開始只需要在意pb, pc pb是array second element, pc是array last element, a就是課本的pivot 如果pb所指向的element跟a指向的element大小相同 則把a跟pb交換並且讓`a += es` 重複上述過程直到pb碰到比a大的元素 不斷重複上述的sorting過程直到`pb > pc` 此時`a~pa`, `pb~pd`都是大小和`*a`相同的元素 以下的swapping procedure是為了把這些相同的元素擺到array中間 ```clike pn = (char *) a + n * es; r = min(pa - (char *) a, pb - pa); vecswap(a, pb - r, r); r = min(pd - pc, pn - pd - es); vecswap(pb, pn - r, r); ``` 這時候頭尾兩段元素還沒排好 ```clike nl = (pb - pa) / es; nr = (pd - pc) / es; ``` 如果沒有swap過(沒有重複元素) 則進行insertion sort 不過如果inseriton sort的時候swap太多次則直接跳出 不然就做到結束然後return 這時候如果尚未排列好的元素的數量需要多創建threads來排列 則喚醒躺在cond裡面的thread 若沒有則在同一個thread裡遞迴執行qsort ## 測驗 $\gamma - 2$ 使用thread sanitizer去檢驗程式碼後 發現數個錯誤 第一個在`allocate_thread`中 ```clike static struct qsort *allocate_thread(struct common *c) { verify(pthread_mutex_lock(&c->mtx_al)); for (int i = 0; i < c->nthreads; i++) if (c->pool[i].st == ts_idle) { c->idlethreads--; c->pool[i].st = ts_work; verify(pthread_mutex_lock(&c->pool[i].mtx_st)); verify(pthread_mutex_unlock(&c->mtx_al)); return (&c->pool[i]); } verify(pthread_mutex_unlock(&c->mtx_al)); return (NULL); } ``` `verify(pthread_mutex_lock(&c->pool[i].mtx_st))`之後沒有對應的unlock 修正如下 ```diff static struct qsort *allocate_thread(struct common *c) { verify(pthread_mutex_lock(&c->mtx_al)); for (int i = 0; i < c->nthreads; i++) { + verify(pthread_mutex_lock(&c->pool[i].mtx_st)); if (c->pool[i].st == ts_idle) { c->idlethreads--; c->pool[i].st = ts_work; + verify(pthread_mutex_unlock(&c->pool[i].mtx_st)); verify(pthread_mutex_unlock(&c->mtx_al)); return (&c->pool[i]); } + verify(pthread_mutex_unlock(&c->pool[i].mtx_st)); } verify(pthread_mutex_unlock(&c->mtx_al)); return (NULL); } ``` 接著修正`qsort_algo`當中配置資源時候沒有做好的mutual exclusion ```diff /* Initialize qsort arguments. */ + verify(pthread_mutex_lock(&qs->mtx_st)); c = qs->common; + verify(pthread_mutex_lock(&c->mtx_al)); es = c->es; cmp = c->cmp; swaptype = c->swaptype; + verify(pthread_mutex_unlock(&c->mtx_al)); a = qs->a; n = qs->n; + verify(pthread_mutex_unlock(&qs->mtx_st)); ``` ```diff /* Now try to launch subthreads. */ if (nl > c->forkelem && nr > c->forkelem && (qs2 = allocate_thread(c)) != NULL) { + verify(pthread_mutex_lock(&qs2->mtx_st)); qs2->a = a; qs2->n = nl; verify(pthread_cond_signal(&qs2->cond_st)); verify(pthread_mutex_unlock(&qs2->mtx_st)); ``` ## 測驗 $\gamma - 3$ 為了嘗試改進此排序的效能 要分析它的算法 先不考慮multi-thread的情況 在`qsort_algo`當中進行的quick sort每做完一次回圈 `a ~ pa-es`都會是相同的數值 若是重複的數字很少, 甚至是沒有相同的數字 此處的動作就毫無意義, 直接進入以下的insertion sort 還不如一開始就進行insertion sort 若是有大量數字重複, 則此演算法表現應該十分突出