owned this note
owned this note
Published
Linked with GitHub
# linux2023: ranvd
## 測驗 $\alpha-1$
```c
struct st_node {
short hint;
struct st_node *parent;
struct st_node *left, *right;
};
struct st_root {
struct st_node *root;
};
enum st_dir { LEFT, RIGHT };
```
`st_node` 為 S-Tree 中節點的結構體,其中 left、right、parent 分別表示左節點、右節點、親代節點,hint 為用來維持平衡的參數;`st_root` 為指向 S-Tree 根節點的結構體;`st_dir` 為一個 enum,用來表示訪問節點時是向右還是向左走。
### Insert
```c
struct treeint *treeint_insert(int a) {
struct st_node *p = NULL;
enum st_dir d;
for (struct st_node *n = st_root(tree); n;) {
struct treeint *t = container_of(n, struct treeint, st_n);
if (a == t->value) return t;
p = n;
if (a < t->value) {
n = st_left(n);
d = LEFT;
} else if (a > t->value) {
n = st_right(n);
d = RIGHT;
}
}
struct treeint *i = calloc(sizeof(struct treeint), 1);
if (st_root(tree))
st_insert(&st_root(tree), p, &i->st_n, d);
else
st_root(tree) = &i->st_n;
i->value = a;
return i;
}
```
上述程式碼為 S-Tree 的 insert 方式。for 迴圈的功能為尋找目前要插入的數值 `a` 應該要放的位置,此方法與 BST 插入的方式雷同。尋找的過程中使用 `container_of()` 得到節點實際結構體(`struct treeint`)的位址,並在每次迭代時紀錄上一次走的方向是 `LEFT` 還是 `RIGHT`。
找到適當的位址後,透過 `if (st_root(tree))` 判斷目前是否已經有根節點存在 S-Tree 中,如果沒有的話就將其節點設為根節點,否則就呼叫 `st_insert(&st_root(tree), p, &i->st_n, d)`
```c
void st_insert(struct st_node **root, struct st_node *p, struct st_node *n,
enum st_dir d) {
if (d == LEFT)
st_left(p) = n;
else
st_right(p) = n;
st_parent(n) = p;
st_update(root, n);
}
```
`st_insert` 將節點擺放至先前找到的位置後呼叫 `st_update` 維持樹的平衡。
```c
static inline void st_update(struct st_node **root, struct st_node *n) {
if (!n) return;
int b = st_balance(n);
int prev_hint = n->hint;
struct st_node *p = st_parent(n);
if (b < -1) {
/* leaning to the right */
if (n == *root) *root = st_right(n);
st_rotate_right(n);
}
else if (b > 1) {
/* leaning to the left */
if (n == *root) *root = st_left(n);
st_rotate_left(n);
}
n->hint = st_max_hint(n);
if (n->hint == 0 || n->hint != prev_hint) st_update(root, p);
}
```
上述程式碼中 `st_balance(n)` 的回傳值為節點 `n` 的左節點的 hint 減右節點的 hint;`st_max_hint(n)` 的回傳值為透過左右子樹的 hint 算出目前節點 n 的 hint,主要用在更新 hint 時使用。`st_update` 會在左右子樹的 hint 差距超過 1 以上時候進行旋轉,必且在新插入節點與 hint 發生更動時向親代節點進行遞迴。這麼做的問題也如同[林俊燁](https://www.facebook.com/groups/system.software2023/permalink/709439087674915/)同學在課程社團中提到的,在特定的插入順序下有些節點沒辦法更新 hint 的數值,導致存取的時間在 worst-case 下為 $O(n)$。
### Remove
```c
int treeint_remove(int a) {
struct treeint *n = treeint_find(a);
if (!n) return -1;
st_remove(&st_root(tree), &n->st_n);
free(n);
return 0;
}
```
在 `treeint_remove` 中會先搜尋要移除的位置,如果存在則會呼叫 `st_remove` 進行移除。
```c
void st_remove(struct st_node **root, struct st_node *del) {
if (st_right(del)) {
struct st_node *least = st_first(st_right(del));
if (del == *root) *root = least;
st_replace_right(del, least) /*AAAA*/;
least->hint = st_max_hint(least) /*BBBB*/;
return;
}
if (st_left(del)) {
struct st_node *most = st_last(st_left(del));
if (del == *root) *root = most;
st_replace_left(del, most) /*CCCC*/;
most->hint = st_max_hint(most) /*DDDD*/;
return;
}
if (del == *root) {
*root = 0;
return;
}
/* empty node */
struct st_node *parent = st_parent(del);
if (st_left(parent) == del)
st_left(parent) = 0;
else
st_right(parent) = 0;
parent->hint = st_max_hint(parent) /*EEEE*/;
}
```
`st_remove` 會先嘗試搜尋在右子樹中最小的節點,並透過 `st_replace_left` 將節點移除,再透過 `st_max_hint` 將 hint 更新,如果沒有右子樹則會嘗試搜尋在左子樹中最大的節點,再透過 `st_replace_left` 將節點移除,在透過 `st_max_hint` 將 hint 更新。如果左右子樹皆不存在,則可直接將該點移除,並更新親代節點的 hint。
## 測驗 $\alpha-2$
首先先討論程式碼上可以改進的地方。
```c
#define st_root(r) (r->root)
#define st_left(n) (n->left)
#define st_right(n) (n->right)
#define st_parent(n) (n->parent)
```
以上的巨集的實作我認為可以改成以下這種方式。
```c
#define st_root(r) ((r)->root)
#define st_left(n) ((n)->left)
#define st_right(n) ((n)->right)
#define st_parent(n) ((n)->parent)
```
這種方式可以讓巨集更為泛用,例如嘗試用 `st_root(r+1)` 這種方式使用巨集,原始的作法會因為展開時語法錯誤而導致編譯錯誤;反之,新的做法則可以正常使用。
接著討論 S-Tree 在 update 上的的問題,一開始我先入為主的認為 S-Tree 只是容許高度差為 2 的 AVL tree 而已,透過[林俊燁](https://www.facebook.com/groups/system.software2023/permalink/709439087674915/)同學提出的 worst-case 讓我意識到 S-Tree 與 AVL Tree 在處理平衡時存在根本的差別。以下先直接講更動的地方與效能分析,後面在討論更動的想法並嘗試解釋正確性。
```c
struct st_node {
short hint;
bool label;
struct st_node *parent;
struct st_node *left, *right;
};
```
首先是結構上的,我將 `st_node` 加上了一個 `label` 作為紀錄,主要用來處理[林俊燁](https://www.facebook.com/groups/system.software2023/permalink/709439087674915/)同學提到的 worst-case。(`label` 其實這只占一個 bit,可以將其隱藏至 `parent` 指標中,但由於會更動到太多程式碼,以後有機會在實作)
```c
static inline void st_update(struct st_node **root, struct st_node *n) {
...
if (n->hint == 0 || n->hint != prev_hint || st_islabeled(n)){
n->label = false;
st_update(root, p);
}
else
n->label = true;
...
}
```
接著是 `st_update` 中判斷是否要向親節點進行 update 的判斷方式,當該點 hint 沒有發生變化的時候就會將 label 設定為 true,並在下次遇到的時候就會強制執行 `st_update(root, p)`。
以下為「插入」操作的 worst-case 的執行結果,可以看到原始版本遠高於修改後的版本。(綠色那條幾乎貼在地上),因此可以推論修改後的結果確實能夠處理原本版本的 worst-case。但這並不表示修改後的演算法不存在 worst-case。

接著下圖是隨機插入資料的結果,因為兩者幾乎重疊,因此再下一張圖為經過平滑處理的圖片,可以看到兩者結果幾乎不相上下。


### 討論正確性
S-Tree 與 AVL Tree 主要的差別在 AVL Tree 面對 RL 或 LR 時會將其轉成 RR 或 LL 後在做一次 Right rotate 或 Left rotate 使其結構能夠盡量維持平衡,而 S-Tree 由於總是在 hint 差距為 2 的時候才進行調整,這導致 RL 或 LR 發生時無法透過旋轉來解決,因為 RL 做 Left rotate 會變成 LR,反之亦然,如下圖。
```graphviz
graph G{
rankdir=LR
{rank="same" a1, c1}
{rank="same" a2, c2}
a1[label=a shape=circle]
b1[label=b shape=circle]
c1[label=c shape=circle]
a2[label=a shape=circle]
b2[label=b shape=circle]
c2[label=c shape=circle]
a1 -- b1
b1 -- c1
b1 -- b2[style=invis]
a2 -- b2
b2 -- c2
}
```
```c
static inline void st_update(struct st_node **root, struct st_node *n) {
if (!n) return;
int b = st_balance(n);
int prev_hint = n->hint;
...
n->hint = st_max_hint(n);
if (n->hint == 0 || n->hint != prev_hint) st_update(root, p);
}
```
上面的程式碼為原始 `st_update` 向親代遞迴更新的條件,以下用下圖說明發生 worst-case 的情況,如果要將結果從左邊變成右邊需要一個 Right rotate,這時候我們不用考慮親代節點的方向。當 `w.hint == x.hint + 1` 且 `w.hint - z.hint > 0` 時 `v.hint == w.hint + 1`,這時候如果有一筆資料插入 x 導致 `w.hint` 發生改變且不至於觸發旋轉,這時 `v.hint` 應該要加 1 並發生旋轉。由於旋轉的依據是透過 `st_balance` 來觀看最新的狀況,但此時的 `v.hint` 並沒有更新,因此在準備發生 Right ratate 時 `w.hint == v.hint == x.hint + 1`,接著發生旋轉並執行 `n->hint = st_max_hint(n)`,但此時的 `v.hint` 因為被轉下去,導致 `v.hint == x.hint + 1`,因此錯失了向親代節點的更新的機會。

接著我們考慮 `w.hint == y.hint + 1` 且 `w.hint - z.hint > 0` 的情況,接著我們在 y 插入一筆資料導致 `w.hint` 發生改變且不至於觸發旋轉,這時 `v.hint` 也應該加 1 並觸發旋轉且 `y.hint - x.hint > 0`,此時在旋轉前 `v.hint == w.hint == y.hint + 1` 且此時的 `y.hint - z.hint > 0`、`y.hint - x.hint > 0`,在這種情況下如果要向上面說的使 `v.hint` 因為沒有更新數值而導致沒有向親節點進行更新的話必須 `x.hint + 1 == v.hint`,然而 `v.hint == y.hint + 1 > x.hint` 因此不可能發生 `v.hint` 沒有改變的情況,也就是說我們只需要專注處理 RL 或 LR 的情況即可。
因此我們只需要在旋轉的時候發現 `v.hint` 沒有變動則先將其標記,等待下一次插入 v.hint 也沒有變動時,強制將向親代節點更新,並將標記清除。

然而這種方式是否存在 worst-case?目前有待確認,但相比原始版本更不容易發生 worst-case。
## 測驗 $\alpha-3$
## 測驗 $\beta-1$
```c
static inline uintptr_t align_up(uintptr_t sz, size_t alignment) {
uintptr_t mask = alignment - 1;
if ((alignment & mask) == 0) { /* power of two? */
return sz + (~(sz & mask) + 1 & mask) /*MMMM*/;
}
return (((sz + mask) / alignment) * alignment);
}
```
上述程式碼功能是依照 `alignment` 大小對 `sz` 的數值做對齊。
首先先看 `(((sz + mask) / alignment) * alignment)`,該程式碼可以在任意 `alignment` 大小都做到 `sz` 的對齊,此行程式碼的想法將 `sz` 依照 `alignment` 的大小分成 `x` 塊,最後在將 `x` 乘以 `alignment` 就會是對齊後的 `sz` 的大小。
依照此對齊的規則,當 `sz % alignment == 0` 時 `x = sz / alignment`;反之,當 `sz % alignment != 0` 時,`x = sz / alignment + 1`。透過 C 語言對整數除法會無條件捨去小數點後位數的特性,我們可以將 `sz` 加上 `alignment-1` 做到上述的功能,也就是說,當 `sz % alignment == 0` 時 `(sz + alignment - 1) / alignment` 可以與 `sz / alignmen` 相等;同時,當 `sz % alignment != 0` 時 `(sz + alignment - 1) / alignment` 可以與 `sz / alignemt + 1` 相等。
接著看 `sz + (~(sz & mask) + 1 & mask)`。因為 `if ((alignment & mask) == 0)` 可以確定 `sz` 為 $2^n$,因此只要在 `sz % alignment != 0` 時,將 `sz` 的第 n+1 bit 加 1 即可。透過 `sz + ~(sz & mask) + 1` 可以確保 `sz` 的第 n+1 bit 一定會加 1,但我們希望當 `sz % alignment == 0` 時不要在 n+1 bit 上加 1,因此使用了 `mask` 將 `~(sz & mask) + 1` 中超過第 n bit 的 bits 都清成 0。
> 透過 8/10 直播內容得知正確解答為 (x + mask) & ~mask。
## 測驗 $\beta-2$
> 透過 8/10 直播內容得知正確答案在 tools/testing/selftests/kvm/include/test_util.h:147 有相關實作。
## 測驗 $\gamma-1$
[qsort-mt.c](https://gist.github.com/jserv/38bca4ebfea1c434e1dfc15337f80eeb) 程式碼在多執行緒時排序的想法是將要排序的資料 a 分成三段。假設有一個數值 x,第一段為小於 x 的資料、第二段為等於 x 的資料、第三段為大於 x 的資料。因此第二段資料已經放在正確的位置了,接著將第一段資料丟給其他執行緒處理,自己則再次將第三段資料分成三段,不斷的迭代下去。因此此演算法可視為快速排序法的變形。
```c
void qsort_mt(void *a,
size_t n,
size_t es,
cmp_t *cmp,
int maxthreads,
int forkelem)
{
...
if (pthread_mutex_init(&c.mtx_al, NULL) != 0)
goto f1;
if ((c.pool = calloc(maxthreads, sizeof(struct qsort))) == NULL)
goto f2;
for (islot = 0; islot < maxthreads; islot++) {
qs = &c.pool[islot];
if (pthread_mutex_init(&qs->mtx_st, NULL) != 0)
goto f3;
if (pthread_cond_init(&qs->cond_st, NULL) != 0) {
verify(pthread_mutex_destroy(&qs->mtx_st));
goto f3;
}
qs->st = ts_idle;
qs->common = &c;
if (pthread_create(&qs->id, NULL, qsort_thread, qs) != 0) {
verify(pthread_mutex_destroy(&qs->mtx_st));
verify(pthread_cond_destroy(&qs->cond_st));
goto f3;
}
}
...
}
```
上述程式碼為呼叫多執行緒排序時的前處理動作,透過呼叫 `pthread_` 系列的函式將 mutex、condition variable 做初始化,並透過 `goto` 與 `verify` 處理函式發生錯誤時對應的處理方式。從程式碼中還可以看到使用 threads pool 的方式將多個執行緒先建立起來,並將其狀態設定為 `ts_idle` 等待之後使用。
```c
/* Hand out the first work batch. */
qs = &c.pool[0];
verify(pthread_mutex_lock(&qs->mtx_st));
qs->a = a;
qs->n = n;
qs->st = ts_work;
c.idlethreads--;
verify(pthread_cond_signal(&qs->cond_st));
verify(pthread_mutex_unlock(&qs->mtx_st));
```
上述程式碼為在所有的前置處理都做完後,將 thread pool 中編號為 0 的執行緒透過 `pthread_cond_signal` 叫起來做排序演算法。
```c
/* Wait for all threads to finish, and free acquired resources. */
f3:
for (i = 0; i < islot; i++) {
qs = &c.pool[i];
if (bailout) {
verify(pthread_mutex_lock(&qs->mtx_st));
qs->st = ts_term;
verify(pthread_cond_signal(&qs->cond_st));
verify(pthread_mutex_unlock(&qs->mtx_st));
}
verify(pthread_join(qs->id, NULL));
verify(pthread_mutex_destroy(&qs->mtx_st));
verify(pthread_cond_destroy(&qs->cond_st));
}
free(c.pool);
```
接著,「主」執行緒就會被 `pthread_join` block,停在這行等待 threads 返回,而前面提到編號為 0 的執行緒則在 `qsort_thread` 中處理相對應的工作。
```c
/* Thread-callable quicksort. */
static void *qsort_thread(void *p)
{
...
again:
/* Wait for work to be allocated. */
verify(pthread_mutex_lock(&qs->mtx_st));
while (qs->st == ts_idle)
verify(pthread_cond_wait(&qs->cond_st, &qs->mtx_st)/*HHHH*/);
verify(pthread_mutex_unlock(&qs->mtx_st));
if (qs->st == ts_term) {
return NULL;
}
assert(qs->st == ts_work);
qsort_algo(qs);
...
```
上述的程式碼可以看到執行緒被建立後會透過 `pthread_cond_wait` 函數將整個執行緒 block 等待其他執行緒向他傳送 signal。並且如果被叫起來的時候的狀態為 `ts_term` 則會直接結束工作並回傳 `NULL`,同時狀態必須是 `ts_work` 才會繼續進行,否則程式就會被 assert 中止,如果狀態是 `ts_work` 就會呼叫 `qsort_algo` 開始多執行緒的排序演算法。
```c
/* Thread-callable quicksort. */
static void qsort_algo(struct qsort *qs)
{
...
top:
/* From here on qsort(3) business as usual. */
swap_cnt = 0;
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es)
swap(pl, pl - es);
return;
}
...
}
```
上面時有提到,此排序法會將資料分成三份,並且不斷的迭代處理,而這項迭代的中止條件就是當資料的個數小於 7 時會直接採用插入排序法,並直接 return。
```c
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
pm = med3(pm - d, pm, pm + d, cmp, thunk);
pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
}
pm = med3(pl, pm, pn, cmp, thunk);
}
swap(a, pm);
```
如果目前的資料個數 n 大於 7 的話,則會從三個點 `pl`、`pm`、`pn` 中取出中位數作為分成三部分的基準 x。(n 超過 40 就需要用另一種方式決定 `pl`、`pm`、`pn` 的原因尚待釐清)
> Recall: 第一段為小於 x 的資料、第二段為等於 x 的資料、第三段為大於 x 的資料
```c
pa = pb = (char *) a + es;
pc = pd = (char *) a + (n - 1) * es;
for (;;) {
while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pa, pb);
pa += es;
}
pb += es;
}
while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) {
if (r == 0) {
swap_cnt = 1;
swap(pc, pd);
pd -= es;
}
pc -= es;
}
if (pb > pc)
break;
swap(pb, pc);
swap_cnt = 1;
pb += es;
pc -= es;
}
pn = (char *) a + n * es;
r = min(pa - (char *) a, pb - pa);
vecswap(a, pb - r, r);
r = min(pd - pc, pn - pd - es);
vecswap(pb, pn - r, r);
```
接著上面這段程式碼正是將資料分為三段的方式,首先透過 for 迴圈先將與 x 相同的資料移動到整體資料的兩端,接著再透過 `vecswap` 將資料移動成我們上述所說的三段。此外過程中還透過 swap_cnt 紀錄是否有在 for 迴圈中有進行交換,其功能在下一段程式碼進行說明。
```c
if (swap_cnt == 0) { /* Switch to insertion sort */
r = 1 + n / 4; /* n >= 7, so r >= 2 */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es) {
swap(pl, pl - es);
if (++swap_cnt > r)
goto nevermind;
}
return;
}
```
在上面這段程式碼中,如果 `swap_cnt == 0` 就會進行 `if` 內的動作,而 `swap_cnt == 0` 代表資料的分布剛好左邊皆小於 x,右邊皆大於 x,此情況代表排序快要完成了,因此使用 `if` 內使用插入排序法進行排序,且要求利用 `r` 來控制交換的次數,主要的功能為檢測目前資料是否真的接近排序完成,如果離完成還有一段距離則跳至 `nervermind`。
```c
nevermind:
nl = (pb - pa) / es;
nr = (pd - pc) / es;
/* Now try to launch subthreads. */
if (nl > c->forkelem && nr > c->forkelem &&
(qs2 = allocate_thread(c)) != NULL) {
qs2->a = a;
qs2->n = nl;
verify(pthread_cond_signal(&qs2->cond_st));
verify(pthread_mutex_unlock(&qs2->mtx_st));
} else if (nl > 0) {
qs->a = a;
qs->n = nl;
qsort_algo(qs);
}
if (nr > 0) {
a = pn - nr * es;
n = nr;
goto top;
}
```
上面的程式碼中,`nl` 就是第一段資料的數量,`nr` 則是第三段資料的數量。同時檢查數量是否超過設定的 `forkelem`,如果超過的話就使用 `allocate_thread` 函式將一個 thread 從 thread pool 中喚醒,同時將其需要處理的範圍給定;反之,則遞迴呼叫自己。最後在使用 goto 跳回 top 的部分,重新將第三段資料分成三段,之後反覆相同動作。
```c
static void *qsort_thread(void *p)
{
...
qsort_algo(qs);
verify(pthread_mutex_lock(&c->mtx_al));
qs->st = ts_idle;
c->idlethreads++;
if (c->idlethreads == c->nthreads) {
for (i = 0; i < c->nthreads; i++) {
qs2 = &c->pool[i];
if (qs2 == qs)
continue;
verify(pthread_mutex_lock(&qs2->mtx_st));
qs2->st = ts_term;
verify(pthread_cond_signal(&qs2->cond_st)/*JJJJ*/);
verify(pthread_mutex_unlock(&qs2->mtx_st));
}
verify(pthread_mutex_unlock(&c->mtx_al));
return NULL;
}
verify(pthread_mutex_unlock(&c->mtx_al));
goto again;
}
```
接著,看回到 `qsort_thread` 函式,`qsort_thread` 透過 `if (c->idlethreads == c->nthreads)` 讓當前的 thread 可以知道自己是不是最後一個完成的 thread,如果是最後一個,代表所有的工作都已經做完了,因此會執行 `if` 內的程式,將所有自己以外的 thread 的狀態改成 `ts_term`,並透過 `pthread_cond_signal` 使其他 thread 能夠結束工作並返回主程式。最後自己在透過 `return NULL` 返回。
## 測驗 $\gamma-2$
```shell
# 編譯方式
gcc -fsanitize=thread -g gamma.c
```
以下為 `./a.out` 的執行結果
```shell
WARNING: ThreadSanitizer: data race (pid=15546)
Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0):
#0 allocate_thread /home/minired/Code/temp_code/hw1/gamma.c:211 (a.out+0x3476)
#1 qsort_algo /home/minired/Code/temp_code/hw1/gamma.c:314 (a.out+0x4193)
#2 qsort_thread /home/minired/Code/temp_code/hw1/gamma.c:351 (a.out+0x45b1)
Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2):
#0 qsort_thread /home/minired/Code/temp_code/hw1/gamma.c:343 (a.out+0x44c7)
Location is heap block of size 256 at 0x7b4000000000 allocated by main thread:
#0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
#1 qsort_mt /home/minired/Code/temp_code/hw1/gamma.c:132 (a.out+0x28fa)
#2 main /home/minired/Code/temp_code/hw1/gamma.c:503 (a.out+0x500e)
Mutex M0 (0x7ffd1e121fc8) created at:
#0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
#1 qsort_mt /home/minired/Code/temp_code/hw1/gamma.c:130 (a.out+0x28dd)
#2 main /home/minired/Code/temp_code/hw1/gamma.c:503 (a.out+0x500e)
Mutex M2 (0x7b40000000a8) created at:
#0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
#1 qsort_mt /home/minired/Code/temp_code/hw1/gamma.c:136 (a.out+0x297f)
#2 main /home/minired/Code/temp_code/hw1/gamma.c:503 (a.out+0x500e)
Thread T1 (tid=15551, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 qsort_mt /home/minired/Code/temp_code/hw1/gamma.c:144 (a.out+0x2a8e)
#2 main /home/minired/Code/temp_code/hw1/gamma.c:503 (a.out+0x500e)
Thread T2 (tid=15552, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 qsort_mt /home/minired/Code/temp_code/hw1/gamma.c:144 (a.out+0x2a8e)
#2 main /home/minired/Code/temp_code/hw1/gamma.c:503 (a.out+0x500e)
SUMMARY: ThreadSanitizer: data race /home/minired/Code/temp_code/hw1/gamma.c:211 in allocate_thread
==================
ThreadSanitizer: reported 1 warnings
```
從上面可以看到,分別在 211 行與 343 行的時候發生 data race,211 行要寫而 343 行要讀,以下將程式碼列出來觀看。
```c=205
static struct qsort *allocate_thread(struct common *c)
{
verify(pthread_mutex_lock(&c->mtx_al));
for (int i = 0; i < c->nthreads; i++)
if (c->pool[i].st == ts_idle) {
c->idlethreads--;
c->pool[i].st = ts_work;
verify(pthread_mutex_lock(&c->pool[i].mtx_st));
verify(pthread_mutex_unlock(&c->mtx_al));
return (&c->pool[i]);
}
verify(pthread_mutex_unlock(&c->mtx_al));
return (NULL);
}
```
```c=342
verify(pthread_mutex_lock(&qs->mtx_st));
while (qs->st == ts_idle)
verify(pthread_cond_wait(&qs->cond_st, &qs->mtx_st)/*HHHH*/);
verify(pthread_mutex_unlock(&qs->mtx_st));
```
從程式碼中可以看到,在 211 行做 `c->pool[i].st = ts_work` 時沒有將 mutex 鎖上,而是在做完之後才鎖上。因此我們只需要將 `verify(pthread_mutex_lock(&c->pool[i].mtx_st))` 移至 `c->pool[i].st = ts_work` 前即可,移動後結果如下:
```c=211
verify(pthread_mutex_lock(&c->pool[i].mtx_st));
c->pool[i].st = ts_work;
```
為什麼不用將 mutex 解鎖?這是因為在 `allocate_thread()` 結束後會將其 mutex 解鎖,如下程式碼,但我認為應該先將其解鎖在做 signal,也就是說 317 行應該與 318 行對調。
```c=313
if (nl > c->forkelem && nr > c->forkelem &&
(qs2 = allocate_thread(c)) != NULL) {
qs2->a = a;
qs2->n = nl;
verify(pthread_cond_signal(&qs2->cond_st));
verify(pthread_mutex_unlock(&qs2->mtx_st));
```
## 測驗 $\gamma-3$