## 測驗 $\alpha$
在s-tree被初始化之後先隨意插入100個數值
接著依照ascending order印出來
打印的函式為`__treeint_dump` 因為它也是BST所以要依照ascending order走訪的話只需要從root開始進行inorder traversal
```c
/* ascending order */
static void __treeint_dump(struct st_node *n, int depth)
{
if (!n)
return;
__treeint_dump(FFFF, depth + 1);
struct treeint *v = treeint_entry(n);
printf("%d\n", v->value);
__treeint_dump(GGGG, depth + 1);
}
```
所以`FFFF`會是`n`的左子樹, `GGGG`是`n`的右子樹
使用原本就定義好的巨集`st_left`和`st_right`來取得`n`的左子樹和右子樹
`FFFF`:`st_left(n)`
`GGGG`:`st_right(n)`
剩下五個問題都在`st_remove`當中
依照註解的敘述分別會是
`AAAA`:`st_replace_right(del, least)`
`BBBB`:`st_update(root, st_first(least))`
`CCCC`:`st_replace_left(del, most)`
`DDDD`:`st_update(root, st_last(most))`
`EEEE`:`st_update(root, parent)`
`FFFF`:`st_left(n)`
`GGGG`:`st_right(n)`
## 測驗 $\alpha - 1$
### Insert
插入tree node主要由兩個函式負責
首先使用`treeint_insert`
接著實際插入節點的動作由`st_insert`進行
**treeint_insert**
```c
struct treeint *treeint_insert(int a)
{
struct st_node *p = NULL;
enum st_dir d;
// basic insertion step
for (struct st_node *n = st_root(tree); n;) {
struct treeint *t = container_of(n, struct treeint,
st_n); // get the address of st_n in n
if (a == t->value)
return t;
p = n;
if (a < t->value) {
n = st_left(n);
d = LEFT;
} else if (a > t->value) {
n = st_right(n);
d = RIGHT;
}
}
// create a new node and insert it into the tree
struct treeint *i = calloc(sizeof(struct treeint), 1);
if (st_root(tree))
st_insert(&st_root(tree), p, &i->st_n, d);
else
st_root(tree) = &i->st_n;
i->value = a;
return i;
}
```
此函式執行的動作可以分為兩部份
1. 找到新節點要插入的位置
過程和binary search相同, 透過for loop向子節點尋找要插入的位置
若找到數值和自己相同節點則回傳該節點
2. 判斷目前此樹是否為空, 若是則將新節點作為root, 不是則呼叫`st_insert`進行插入的動作
**st_insert**
將新節點放入新的位置並將父節點對到child的pointer和自己指向父節點的pointer設定好
之後交由`st_update`來更新此樹, 確保此樹符合規則
### Update
像紅黑樹還有AVL tree一樣
此樹也有update method來確保insertion或deletion之後, 此樹依然保持它要求的規則
例如維持平衡
由`st_update`負責執行此動作
首先利用`st_balance`計算此樹的balance factor
基本上是左子樹高減掉右子樹高
應該注意到每個`st_node`都有一個成員`hint`作為儲存節點在數中高度的數字
不過它並非完全精準的計算 或許後面可以改進此部份計算高度的函式
* 若左傾, 則進行左旋
* 若右傾, 則進行右旋
需要注意的是如果n就是root, 則需要先將root改變再進行旋轉
且如果旋轉完之後n變成leaf node或者跟之前的高度不同
則持續往上對n的parent做update
### Remove
remove節點由`treeint_remove`執行
先找出要刪除的節點位置後進行`st_remove`
之後再free該節點
## 測驗 $\alpha - 2$
我認為這段程式碼所實作出的S-tree有數個可以改進的地方
1. **st_balance跟st_max_hint動作重複很多, 可以在計算st_balance同時更新hint**
`st_balance`和`st_max_hint`程式碼高度重複
可以在計算balance factor的時候同時更新`n->hint`
並且我們知道若插入節點後沒有造成旋轉, 則`hint`不會改變, 所以可以把`st_max_hint`搬入rotate function當中以減少重複運算的次數
2. **沒有rotate就應該繼續往上update**
原先是否繼續往上update還要經過判斷
但插入節點後所有父輩節點的高度都會改變都有機會不平衡, 只要沒有進行旋轉都應該繼續update
一旦旋轉後這個操作必須保證讓s tree回到平衡所以不需要繼續update
3. **此樹並沒有做到真的平衡**
如果依照順序插入10, 5, 20, 2, 7, 25
此時s tree是平衡的沒問題
但如果這時候插入23, 結果會發現它只有執行一次right rotate, 樹還是不平衡的
甚至接著繼續插入19, 它依舊沒發現自己不平衡
s tree判斷平衡的方式和AVL tree很類似
但AVL tree還有將不平衡的類型分為四種而進行對應不同的旋轉方式
s tree在RL, LR的情況旋轉之後依然不平衡
只要依序插入20, 25, 23即可發現
4. **replace裡面的if太多**
`st_replace_right, st_replace_left`當中的if判斷式太多
可讀性很差
**改進**
1. **進行st_balance同時計算n->hint**
如此一來只有旋轉後才需要進行`st_max_hint`來更新節點高度
```diff
static inline int st_balance(struct st_node *n)
{
int l = 0, r = 0;
if (st_left(n))
l = st_left(n)->hint + 1;
if (st_right(n))
r = st_right(n)->hint + 1;
+ n->hint = l > r ? l : r;
return l - r;
}
```
2. **改變向上執行st_update的條件**
只要插入後沒有旋轉都應該繼續往上update
而只有b被偵測出不平衡才會旋轉
所以b <= 1 or b >= -1都應該進行update
如此一來還能減少一個if的判斷
```diff
static inline void st_update(struct st_node **root, struct st_node *n)
{
if (!n)
return;
int b = st_balance(n);
int prev_hint = n->hint;
struct st_node *p = st_parent(n);
if (b < -1) {
/* leaning to the right */
if (n == *root)
*root = st_right(n);
st_rotate_right(n);
}
else if (b > 1) {
/* leaning to the left */
if (n == *root)
*root = st_left(n);
st_rotate_left(n);
+ } else {
+ st_update(root, p);
+ }
- n->hint = st_max_hint(n);
- if (n->hint == 0 || n->hint != prev_hint)
- st_update(root, p);
}
```
3. **處理RL, LR旋轉後依然不平衡的情況**
相較AVL tree在這兩種情況會進行兩次旋轉的處理方式
我只進行一次, 不過做法和一般旋轉不同
如此一來可以保證經過旋轉後s tree一定平衡
*實作有錯誤的地方 delete的時候會造成segmentation fault 仍須更正*
```c
static inline void st_rl_rotate(struct st_node *n)
{
struct st_node *r = st_right(n), *p = st_parent(n);
struct st_node *rl = st_left(r);
st_parent(rl) = st_parent(n);
st_right(n) = st_left(rl);
st_left(r) = st_right(rl);
st_parent(n) = rl;
st_parent(r) = rl;
st_left(rl) = n;
st_right(rl) = r;
if (p && st_left(p) == n)
st_left(p) = rl;
else if (p)
st_right(p) = rl;
if (st_right(n))
st_rparent(n) = n;
if (st_left(r))
st_lparent(r) = r;
n->hint = st_max_hint(n);
}
static inline void st_update(struct st_node **root, struct st_node *n)
{
if (!n)
return;
int b = st_balance(n);
int prev_hint = n->hint;
struct st_node *p = st_parent(n);
if (b < -1) {
/* leaning to the right */
if (n == *root)
*root = st_right(n);
if (st_right(st_right(n)))
st_rotate_right(n);
else
st_rl_rotate(n);
}
else if (b > 1) {
/* leaning to the left */
if (n == *root)
*root = st_left(n);
if (st_left(st_left(n)))
st_rotate_left(n);
else
st_lr_rotate(n);
} else {
st_update(root, p);
}
}
```
### 測驗 $\alpha - 3$
TODO
---
## 測驗 $\beta$
```c
#include <stdint.h>
static inline uintptr_t align_up(uintptr_t sz, size_t alignment)
{
uintptr_t mask = alignment - 1;
if ((alignment & mask) == 0) { /* power of two? */
return (sz + mask) & (~mask);
}
return (((sz + mask) / alignment) * alignment);
}
```
### 測驗 $\beta - 1$
若`alignment`在都是2的指數情況下
也就是1000, 0100, 0010等
答案會是`sz`把`alignment`的msb後面的bit都清為0
之後再加上`alignment`
解答會是`((sz / alignment) + 1) * alignment`
不過提示有說不能有`alignment`, 我們又知道`mask = alignment - 1`
所以想辦法把`alignment`用`mask`表示就可以
因為`alignment`剛好是二的指數所以`mask`剛好會是把`alignment`的msb後面的0全部換成1
利用這個特性先將`mask`給反轉之後跟`sz`做AND 就能達到上述第一步
接著加上`mask + 1`就是加上`alignment`
### 測驗 $\beta - 2$
在[sort.c](https://elixir.bootlin.com/linux/latest/source/lib/sort.c)當中有一個函式如下
```clike
/**
* is_aligned - is this pointer & size okay for word-wide copying?
* @base: pointer to data
* @size: size of each element
* @align: required alignment (typically 4 or 8)
*
* Returns true if elements can be copied using word loads and stores.
* The size must be a multiple of the alignment, and the base address must
* be if we do not have CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS.
*
* For some reason, gcc doesn't know to optimize "if (a & mask || b & mask)"
* to "if ((a | b) & mask)", so we do that by hand.
*/
__attribute_const__ __always_inline
static bool is_aligned(const void *base, size_t size, unsigned char align)
{
unsigned char lsbits = (unsigned char)size;
(void)base;
#ifndef CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS
lsbits |= (unsigned char)(uintptr_t)base;
#endif
return (lsbits & (align - 1)) == 0;
}
```
## 測驗 $\gamma$
`HHHH`:`pthread_cond_wait(&qs->cond_st, &qs->mtx_st)`
`JJJJ`:`pthread_cond_signal(&qs2->cond_st)`
## 測驗 $\gamma - 1$
主要探討`qsort_mt`和其相關內容
實作此程式碼首先探討兩個特別的結構
```clike
struct qsort {
enum thread_state st; /* For coordinating work. */
struct common *common; /* Common shared elements. */
void *a; /* Array base. */
size_t n; /* Number of elements. */
pthread_t id; /* Thread id. */
pthread_mutex_t mtx_st; /* For signalling state change. */
pthread_cond_t cond_st; /* For signalling state change. */
};
struct common {
int swaptype; /* Code to use for swapping */
size_t es; /* Element size. */
void *thunk; /* Thunk for qsort_r */
cmp_t *cmp; /* Comparison function */
int nthreads; /* Total number of pool threads. */
int idlethreads; /* Number of idle threads in pool. */
int forkelem; /* Minimum number of elements for a new thread. */
struct qsort *pool; /* Fixed pool of threads. */
pthread_mutex_t mtx_al; /* For allocating threads in the pool. */
};
```
首先有個`struct common c`
初始化它的mutex lock並建立一個pool, 當中有`maxthreads`個thread
之後依序建立這些thread並執行`qsort_thread`(注意此處還沒真正開始sort, `a`沒有被送進去)
注意到所有thread共享同一個`struct common`
如果取得`qs->mtx_st`這個mutex lock 就可以進行quick sort
也就是進行`qsort_algo`
之後thread進入idle狀態
如果全部thread都idle, 則依序把thread都terminate
如果沒有, 就回到again的地方再來一次
**`qsort_algo`**
此函式為主要進行quick sort的部分
如果array size小於7則直接進行熟知的bubble sort
```clike
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es)
swap(pl, pl - es);
return;
}
```
如果大於7則要進行quick sort, 而quick sort在升冪或降冪排列的時候時間複雜度會變成$O(n)$
為了避免這個情形, 盡量讓每次sorting都是average case
要進行以下操作以取得中位數然後放到array head的地方
```clike
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
pm = med3(pm - d, pm, pm + d, cmp, thunk);
pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
}
pm = med3(pl, pm, pn, cmp, thunk);
}
swap(a, pm);
```
然後進行真正的quick sort
```clike
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
for (;;) {
while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pa, pb);
pa += es;
}
pb += es;
}
while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pc, pd);
pd -= es;
}
pc -= es;
}
if (pb > pc)
break;
swap(pb, pc);
swap_cnt = 1;
pb += es;
pc -= es;
}
```
做的事跟普通的quick sort沒太大差別, 不過這邊比較特別是只把跟pivot相同的element搬到前面
排完後前面有一段element都是相同的元素
cmp有可能出錯而造成sorting被打斷
如果發生這情況就從尾巴往前把還沒做完的部分給sort完
一開始只需要在意pb, pc
pb是array second element, pc是array last element, a就是課本的pivot
如果pb所指向的element跟a指向的element大小相同
則把a跟pb交換並且讓`a += es`
重複上述過程直到pb碰到比a大的元素
不斷重複上述的sorting過程直到`pb > pc`
此時`a~pa`, `pb~pd`都是大小和`*a`相同的元素
以下的swapping procedure是為了把這些相同的元素擺到array中間
```clike
pn = (char *) a + n * es;
r = min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = min(pd - pc, pn - pd - es);
vecswap(pb, pn - r, r);
```
這時候頭尾兩段元素還沒排好
```clike
nl = (pb - pa) / es;
nr = (pd - pc) / es;
```
如果沒有swap過(沒有重複元素)
則進行insertion sort
不過如果inseriton sort的時候swap太多次則直接跳出
不然就做到結束然後return
這時候如果尚未排列好的元素的數量需要多創建threads來排列
則喚醒躺在cond裡面的thread
若沒有則在同一個thread裡遞迴執行qsort
## 測驗 $\gamma - 2$
使用thread sanitizer去檢驗程式碼後
發現數個錯誤
第一個在`allocate_thread`中
```clike
static struct qsort *allocate_thread(struct common *c)
{
verify(pthread_mutex_lock(&c->mtx_al));
for (int i = 0; i < c->nthreads; i++)
if (c->pool[i].st == ts_idle) {
c->idlethreads--;
c->pool[i].st = ts_work;
verify(pthread_mutex_lock(&c->pool[i].mtx_st));
verify(pthread_mutex_unlock(&c->mtx_al));
return (&c->pool[i]);
}
verify(pthread_mutex_unlock(&c->mtx_al));
return (NULL);
}
```
`verify(pthread_mutex_lock(&c->pool[i].mtx_st))`之後沒有對應的unlock
修正如下
```diff
static struct qsort *allocate_thread(struct common *c)
{
verify(pthread_mutex_lock(&c->mtx_al));
for (int i = 0; i < c->nthreads; i++) {
+ verify(pthread_mutex_lock(&c->pool[i].mtx_st));
if (c->pool[i].st == ts_idle) {
c->idlethreads--;
c->pool[i].st = ts_work;
+ verify(pthread_mutex_unlock(&c->pool[i].mtx_st));
verify(pthread_mutex_unlock(&c->mtx_al));
return (&c->pool[i]);
}
+ verify(pthread_mutex_unlock(&c->pool[i].mtx_st));
}
verify(pthread_mutex_unlock(&c->mtx_al));
return (NULL);
}
```
接著修正`qsort_algo`當中配置資源時候沒有做好的mutual exclusion
```diff
/* Initialize qsort arguments. */
+ verify(pthread_mutex_lock(&qs->mtx_st));
c = qs->common;
+ verify(pthread_mutex_lock(&c->mtx_al));
es = c->es;
cmp = c->cmp;
swaptype = c->swaptype;
+ verify(pthread_mutex_unlock(&c->mtx_al));
a = qs->a;
n = qs->n;
+ verify(pthread_mutex_unlock(&qs->mtx_st));
```
```diff
/* Now try to launch subthreads. */
if (nl > c->forkelem && nr > c->forkelem &&
(qs2 = allocate_thread(c)) != NULL) {
+ verify(pthread_mutex_lock(&qs2->mtx_st));
qs2->a = a;
qs2->n = nl;
verify(pthread_cond_signal(&qs2->cond_st));
verify(pthread_mutex_unlock(&qs2->mtx_st));
```
## 測驗 $\gamma - 3$
為了嘗試改進此排序的效能
要分析它的算法
先不考慮multi-thread的情況
在`qsort_algo`當中進行的quick sort每做完一次回圈
`a ~ pa-es`都會是相同的數值
若是重複的數字很少, 甚至是沒有相同的數字
此處的動作就毫無意義, 直接進入以下的insertion sort
還不如一開始就進行insertion sort
若是有大量數字重複, 則此演算法表現應該十分突出