linux2023: shengwen-tw - 作業1

GitHub

測驗 α

1. 解釋上述程式碼運作原理

在 BST (Binary Tree Search) 中對某一節點搜尋的次數和樹高有關,如果 BST 是平衡的 (即沒有傾斜向左或右其中一邊),則時間複雜度會是 \(O(log\ n)\), 反之則是 \(O(n)\)

1-1: BST 基本規則:

  • 若左子樹非空,則左子樹下所有節點的數值皆小於親代節點。
  • 若右子樹非空,則右子樹下所有節點的數值皆大於親代節點。

1-2: 關於平衡:

  • 概念上 BST 上每一層能容納 \(2^{n-1}\) 個節點 (\(n = 1, 2, 3, ...\)),如果某一層沒有填滿卻往下一層插入節點便會導致傾斜狀態產生。

1-3: S-Tree 程式分析:

st_first(): 遞迴地找出最左下方的 Leaf 節點。

st_last(): 遞迴地找出最右下方的 Leaf 節點。

st_balance(): 計算某一節點向左或向右的傾斜度 (負值右傾、正值左傾)。

st_max_hint(): 計算某一節點的 hint 值,取左邊和右邊最大的那一方加1。

st_update(): 檢測 S-Tree 是否向左或右傾斜,如有傾斜則對節點進行旋轉以達到平衡。

st_insert(): 給定某一截點 p 在其下面插入節點 n (插入時要指定左或右),並呼叫 st_update() 重新平衡。

st_rotate_left():

可將節點的左旋變換前後關係繪製如下:

Case 1 (p > n > lr > l:
        (p)               (p)
        /                 /
       /                 /
     (n)               (l)
     /                   \
    /         ===>         \   
  (l)                     (n)
    \                     /
     \                   /
    (lr)              (lr)
    
    
Case 2 (lr > l > n > p):
  (p)               (p)
    \                 \
     \                 \
     (n)               (l)
     /                   \
    /         ===>         \   
  (l)                     (n)
    \                     /
     \                   /
    (lr)              (lr)

st_rotate_right():

可將節點的右旋變換前後關係繪製如下:

Case 1 (p > r > rl > n):
      (p)                  (p)
      /                    /
     /                    /
   (n)                  (r)
     \        ===>      /
      \                /
      (r)            (n)
      /                \
     /                  \
  (rl)                  (rl)

Case 2 (r > rl > n > p):
  (p)                  (p)
    \                    \
     \                    \
     (n)                  (r)
       \      ===>        /
        \                /
        (r)            (n)
        /                \
       /                  \
    (rl)                 (rl)

st_replace_left():

//用 l 取代 n, 這代表所有接到 n 的節點都要改接到 l 上 (包括親代節點和子節點)
static inline void st_replace_left(struct st_node *n, struct st_node *l)
{
    /*
     *  p 是 n 的親代節點
     * lp 是 l 的親代節點
     */
    struct st_node *p = st_parent(n), *lp = st_parent(l);

    /*
     * 因為 l 要從原本的位置取出並取代到 n 的位置上,因此:
     * (1.) lp 的右節點要取代為 lp 的右子樹中最小的節點 (原先是連接到 l)
     * (2.) l 的左子樹 (即所有比 lp 大但去除 l 的節點)要取代為 lp 的右子樹
     */
    if (st_right(lp) == l) {       //如果 l 是 lp 的右節點
        st_right(lp) = st_left(l); //lp 的右節點改為 l 的左節點
        if (st_left(l))            //如果 l 的左節點存在
            st_lparent(l) = lp;    //l 的左節點的親代節點改為 lp
    }

    if (st_parent(lp) == n) //如果 lp 的親代節點是 n
        st_parent(lp) = l;  //將 lp 的親代節點取代為 l

    st_parent(l) = p;          //因為要用 l 取代 n, 因此 l 的親代節點必須改成 p
    st_right(l) = st_right(n); //n 的右節點改成 l 的右節點

    if (st_left(n) != l) {       //如果 n 的左節點不是 l
        st_left(l) = st_left(n); //n 的左節點改成 l 的左節點
        st_lparent(n) = l;       //n 的左節點的親代節點由 n 取代為 l
    }

    if (p && st_left(p) == n) //如果 p 存在且 p 的左節點是 n
        st_left(p) = l;       //將 p 的左節點取代為 l
    else if (p)               //p 是 n 的親代節點,如果 n 不是 p 的左節點則必定是右節點
        st_right(p) = l;      //將 p 的右節點取代為 l

    if (st_right(n))       //如果 n 有右節點
        st_rparent(n) = l; //將 n 的右節點的親代節點由 n 取代為 l
}

st_replace_right():

// 用 r 取代 n, 這代表所有接到 n 的節點要改接到 r 上 (包括親代節點和子節點)
static inline void st_replace_right(struct st_node *n, struct st_node *r)
{
    /*
     *  p 是 n 的親代節點
     * rp 是 r 的親代節點
     */
    struct st_node *p = st_parent(n), *rp = st_parent(r);

    /*
     * 因為 r 要從原本的位置取出並取代到 n 的位置上, 因此:
     * (1.) rp 的左節點要取代為 rp 的右子樹中最大的節點 (原本是連接到 r)
     * (2.) r 的右子樹 (即所有比 rp 小但是去除 r 的節點) 要取代為 rp 的左子樹
     */
    if (st_left(rp) == r) {        //如果 r 是 rp 的左節點
        st_left(rp) = st_right(r); //rp 的左節點改成 r 的右節點
        if (st_right(r))           //如果 r 的右節點存在
            st_rparent(r) = rp;    //r 的右節點的親代節點改成 rp
    }

    if (st_parent(rp) == n) //如果 rp 的親代節點是 n
        st_parent(rp) = r;  //將 rp 的親代節點取代為 r

    st_parent(r) = p;        //因為要用 r 取代 n, 因此 r 的親代節點必須改成 p
    st_left(r) = st_left(n); //n 的左點改成 r 的左節點

    if (st_right(n) != r) {        //如果 n 的右節點不是 r
        st_right(r) = st_right(n); //n 的右節點改成 r 的右節點
        st_rparent(n) = r;         //n 的右節點的親代節點取代為 r
    }

    if (p && st_left(p) == n) //如果 p 且 p 的左節點是 n
        st_left(p) = r;       //將 p 的左節點取代為 r
    else if (p)               //p 是 n 的親代節點,如果 n 不是 p 的左節點則必定是右節點
        st_right(p) = r;      //將 p 的右節點取代為 r

    if (st_left(n))        //如果 n 有左節點
        st_lparent(n) = r; //將 n 的左節點的親代節點由 n 取代為 l
}

st_remove():

要從樹中移除一節點 del 的話:

  • 檢查 del 的右節點是否非空,若非空則將 del 的右節點以 Left least leaf 取代並重新平衡
  • 檢查 del 的左節點是否非空,若非空則將 del 的左節點以 Right most leaf 取代並重新平衡
  • 如果 del 就是 Leaf 節點則直接清空再重新平衡即可

1-4: 測試程式分析:

型態為 int 的 S-Tree 實體,實做上有物件導向繼承的風格:

struct treeint {
    int value;
    struct st_node st_n;
};

container_of() 透過計算 member 在結構體的位移量回推結構體的起始記憶體位址:

#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - (offsetof(type, member))))
#define treeint_entry(ptr) container_of(ptr, struct treeint, st_n)

treeint_init(): 用 calloc 向作業系統請求儲存整數 S-Tree 的記憶體空間,calloc 會將此空間內容設定為0。

__treeint_destroy(), treeint_destroy(): 遞迴地 free() 出各個左右節點所佔的空間。

treeint_insert(): 給定一個新的數值,往下遞迴的比較大小,小於時向左,大於時向右,當到達空的 Leaf 時 new() 出一個新的節點並插入。

treeint_find(): 遞迴地向左右節點比較大小直到找到為止或是節點不存在。

treeint_remove(): 先找到想移除的數值的節點,在呼叫st_remove()將節點從樹中移除,最後free()出記憶體。

__treeint_dump(), treeint_dump(): 向左右節點遞迴地印出節點數值。

1-5: 參考資料:

3. 設計效能評比程式,探討上述程式碼和 red-black tree 效能落差

參考 RinHizakura, 將 Linux kernel 的紅黑樹程式移植到作業中實驗。

實驗設計:

  • 計算隨機插入及刪除的總時間 (改自作業原程式)
  • 計算隨機插入所需要的總時間
  • 計算隨機刪除所需要的總時間
  • 每次進行統計時給予 S-Tree 和 Red Black Tree 程式相同的隨機種子 (確保操作的資料一致)
  • 使用 CSV 格式統計數據,並以 Octave (MATLAB) 繪製圖表
  • X 軸表示測試的節點數量,Y 軸表示所需時間

實驗1: 隨機插入加刪除。節點數量由 100 增加至 100000,遞增量為 100。用於觀測數量遞增下的表現。

實驗2: 隨機插入加刪除。節點數量固定為 100000。用於觀測同一數量下的表現。

實驗3: 僅測量隨機插入所需的總時間。節點數量固定為 100000。用於比較插入的時間表現。

實驗4: 僅測量隨機刪除所需的總時間。節點數量固定為 100000。用於比較刪除的時間表現。

根據四次實驗可知,S-Tree 整體表現優於 Red Black Tree,但是在刪除所需要的時間上基本持平。

測驗 β

1. 說明上述程式碼的運作原理

align_up() 程式將運算分成了對齊量為2的冪非2的冪的兩種情況:

static inline uintptr_t align_up(uintptr_t sz, size_t alignment)
{
    uintptr_t mask = alignment - 1;
    
    //2的冪的情況
    if ((alignment & mask) == 0) { /* power of two? */
        return (sz + mask) & ~mask;
    }
    
    //非2的冪的情況
    return (((sz + mask) / alignment) * alignment);
}
  • alignment 表示要給定數值要用 N 個 Bytes 進行對齊
  • 當給定數值超過對齊的量後則要取 ceiling 到下一個對齊的位置上

先考慮非2的冪的情況:

return (((sz + mask) / alignment) * alignment);

相當於

return (((sz + alignment - 1) / alignment) * alignment);

帶入幾個數值來看看這段程式會有什麼效果

  • sz = 50, alignment = 10:
    ((50 + 10 - 1) / 10) * 105.9 * 10 (取整數) → 5 * 10 = 50
  • sz = 55, alignment = 10:
    ((55 + 10 - 1) / 10) * 106.4 * 10 (取整數) → 6 * 10 = 60

可以看的出來:

  • - 1 運算確保了本來就已經對齊的數值不會受到影響
  • + alignment 的運算用作於類似取 ceiling 的效果
  • 這段程式也利用了整數運算會捨去除法餘數的特性

alignment - 1 正是 mask 的定義。

2的冪的情況:

return (sz + mask) & ~mask; 的程式中:

~mask 產生相反的遮罩,以16位元無號整數舉例:

alignment mask ~mask
8 (0'0000000000001000) 7 (0'0000000000000111) 0'1111111111111000

如果進行了 val & ~mask 的運算則會將 mask 的幾個 LSB (Least Significant Bit) 清0。

(sz + mask) 實現了類似 ceiling 的效果。如果 sz 不需要對齊到下一個合法位置上,則此運算只會影響到 mask bits 的部分。但若 sz 需要對齊到下一個合法位置上,則 mask bits 以上的數值將產生進位。無論是哪種情形,最終 mask bits 的數值都會透過 val & ~mask 被清0,以確保 align_up() 結果是整除 arrangement 的數值的。

2. 在 Linux 核心原始程式碼找出類似 align_up 的程式碼,並舉例說明其用法

Linux kernel 的 x86 初始化程式碼中有一段程式直接使用到了 Align up 的邏輯:

arch/x86/kernel/setup.c:

+extern unsigned long _brk_end; ... void * __init extend_brk(size_t size, size_t align) { size_t mask = align - 1; void *ret; BUG_ON(_brk_start == 0); BUG_ON(align & mask); + _brk_end = (_brk_end + mask) & ~mask; BUG_ON((char *)(_brk_end + size) > __brk_limit); ret = (void *)_brk_end; _brk_end += size; memset(ret, 0, size); return ret; }

arch/x86/mm/init.c 中有段程式碼如下:

__ref void *alloc_low_pages(unsigned int num)
{
...
        if ((pgt_buf_end + num) > pgt_buf_top || !can_use_brk_pgt) {
                unsigned long ret = 0;

                if (min_pfn_mapped < max_pfn_mapped) {
                        ret = memblock_phys_alloc_range(
                                        PAGE_SIZE * num, PAGE_SIZE,
                                        min_pfn_mapped << PAGE_SHIFT,
                                        max_pfn_mapped << PAGE_SHIFT);
                }
                if (!ret && can_use_brk_pgt)
+                       ret = __pa(extend_brk(PAGE_SIZE * num, PAGE_SIZE));

                if (!ret)
                        panic("alloc_low_pages: can not alloc memory");

                pfn = ret >> PAGE_SHIFT;
        } else {
                pfn = pgt_buf_end;
                pgt_buf_end += num;
        }
...
}

arch/x86/kernel/head32.c 也有提到 _brk_end,但皆和 x86 的 Page table 初始化有關:

/*
 * Initialize page tables.  This creates a PDE and a set of page
+* tables, which are located immediately beyond __brk_base.  The variable
+* _brk_end is set up to point to the first "safe" location.
 * Mappings are created both at virtual address 0 (identity mapping)
 * and PAGE_OFFSET for up to _end.
 *
 * In PAE mode initial_page_table is statically defined to contain
 * enough entries to cover the VMSPLIT option (that is the top 1, 2 or 3
 * entries). The identity mapping is handled by pointing two PGD entries
 * to the first kernel PMD. Note the upper half of each PMD or PTE are
 * always zero at this stage.
 */
 void __init mk_early_pgtbl_32(void)
 {
 ...
 }

Linux 中也有 brk() 的系統呼叫,用於增加或減少 Process 的 Heap 大小,是 malloc(), new() 等的底層操作。

@jserv:
這似乎跟 x86 平台的 Page 較有關聯。想請教 _brk_start, _brk_end 和 Linux 中的 brk() 有關嗎?

測驗 γ

1. 解釋上述程式碼運作原理

1-1: qsort 程式分析:

Quick Sort 是一種 Divide and Conquer (分治法) 的排序演算法,基本邏輯如下:

  • 從序列中隨機選擇一元素稱為 Pivot。
  • 透過交換調整序列的排序,使得 Pivot 左側的數值都小於 Pivot, 右側的數值都大於 Pivot。
  • 將 Pivot 左右的兩個序列視為新的子問題遞迴地處理,直到排序完成。

qsort 程式的執行緒有三種狀態:

enum thread_state {
    ts_idle, //空閒
    ts_work, //工作中
    ts_term  //請求終止
};

qsort 程式使用了以下的結構體儲存資料:

struct qsort {
    enum thread_state st;   //即 ts_idle, ts_work 或 ts_term
    struct common *common;  //各個執行緒共同持有的資料
    void *a;                //數列(陣列)的起始位址
    size_t n;               //數列中有幾個元素
    pthread_t id;           //執行緒 ID
    pthread_mutex_t mtx_st; //狀態改變時需要使用的 Mutex
    pthread_cond_t cond_st; //狀態改變時通知等待中執行緒用的 Conditional variable
};
struct common {
    int swaptype;
    size_t es;
    void *thunk;
    cmp_t *cmp;             //Function pointer, 用於指定要用哪一個比較函式的實做
    int nthreads;           //Thread pool 中有幾個執行緒
    int idlethreads;        //Thread pool 中有幾個空閒的執行緒
    int forkelem;           //每個用於排序的執行緒最少要有幾個元素,太小的話不值得使用多執行緒處理
    struct qsort *pool;     //Thread pool, 列表內儲存各個執行緒的資訊
    pthread_mutex_t mtx_al; //產生新的執行緒時用來保護數列資料不被修改的 Mutex
};

qsort_mt():

  • qsort 程式多執行緒模式下的進入點。
  • 進入後根據 maxthreads 的量值產生多個執行緒但都設定為空閒狀態 (ts_idle)。當所有執行緒都被建立後會將 Thread pool 中的第一個執行緒 (c.pool[0]) 設定為工作狀態 (ts_work),如此便會觸發第一層的排序。
  • 產生的執行緒的進入點是 qsort_thread()
  • 當這些執行緒都完成任務結束後 (即 pthread_join() 成功後) 程式便會結束。

qsort_thread():

  • qsort 程式的執行緒處理函式 (Thread handler function)。會呼叫 qsort_algo() 對給定的數列進行排序。
  • 只有在狀態為 ts_work 才會進行排序。ts_idle 的情況下會透過 pthread_cond_wait() 進入睡眠直到被通知/喚醒起來進行排序。
  • qsort_algo() 有可能再喚醒新的執行緒進行分治。新喚醒的執行緒亦會使用 qsort_thread() 觸發排序。

allocate_thread():

  • qsort 的 Thread pool 中的執行緒在第一次排序觸發前便都已經被建立,並除了第一個執行緒以外都被設定為了空閒狀態 (ts_idle)。allocate_thread() 的目的是當需要新的執行緒進行平行運算時,將狀態由 ts_idle 改為 ts_work
  • allocate_thread() 只有分配新的 Thread, 喚醒要再透過 pthread_cond_signal() (稍後有說明)。
  • 執行緒喚醒前必須先鎖上 c->mtx_al 防止其他執行緒修改數列內容。
  • 同時因為要修改執行緒的狀態 (即 c->pool[i].st) 但同時此變數也可能被其他執行緒讀取,因此必須使用 c->pool[i].mtx_st 進行保護。

allocate_thread() 的這段程式中:

/* Wait for work to be allocated. */ verify(pthread_mutex_lock(&qs->mtx_st)); while (qs->st == ts_idle) verify(pthread_cond_wait(&qs->cond_st, &qs->mtx_st)); //HHHH verify(pthread_mutex_unlock(&qs->mtx_st));

因為 qs->st 的存取需要考慮 Data race 問題,因此存取前後必須使用 Mutex 保護以進入 Critical section。而 pthread_cond_wait() 則讓執行緒在狀態為 ts_idle 時可以進入睡眠,同時將 Mutex 解鎖。pthread_cond_wait() 使得其他執行緒可以修改狀態的內容,並當執行緒被 pthread_cond_signal() 重新喚醒時,行4的 pthread_cond_wait() 在離開 while 迴圈前會將 Mutex 重新鎖上。

qsort_algo():

  • 排序程式的演算法。細節理解上仍有困難。大致推斷會先根據給定數列大小使用 swap()med3() 的方法進行排序。且可以發現在 swap_cnt == 0 的條件下會觸發 Insertion sort, 所以 qsort 程式並不是純粹基於 Quick sort 演算法邏輯的程式。
  • 排列完後會產生左右兩子數列,如果子數列夠大,左邊的部分就會分配給新的執行緒運算,否則直接遞迴處理。 (太短的數列使用新的執行緒沒有充分的效益)。右邊的子數列則由當前執行緒繼續排序。
  • 喚醒新的執行緒的方法是先透過呼叫 allocate_thread() 再透過 pthread_cond_signal() 喚醒執行緒。

1-2: Quick sort 與 Insertion sort:

ChatGPT 對於 Quick sort 和 Insertion sort 比較上的說明:

快速排序(Quick Sort)和插入排序(Insertion Sort)都是排序算法,
但它們在不同的情況下可能表現出不同的性能。
一般來說,快速排序通常在大部分情況下的效能優於插入排序,
但在某些特定情況下,插入排序可能會比快速排序表現得更好。

---

快速排序的優點:
快速排序的平均時間複雜度為 O(n log n),在大部分情況下,其效能表現優異。
它適合處理大型數據集合,能夠在一些情況下進行「原地排序」(in-place sorting),
並且可以透過優化和隨機選擇主元(pivot)來減少最壞情況的出現。

---

插入排序的優點:
插入排序在小型數據集合上可能比較快,因為它的時間複雜度為 O(n^2),但在「部分有序」的情況下,它的表現可能更好。
插入排序是一種「穩定排序」,在特定情況下,它可能比快速排序更適合。
插入排序在以下情況下可能比快速排序表現得更好:

--

小型數據集合:
當待排序的數據集合相對較小時,插入排序的性能可能比快速排序更好,
因為插入排序的常數因子較小。

部分有序數據: 
如果數據集合已經部分有序,插入排序可能會受益於這種情況,
因為它只需要少量操作來將元素插入到已排序的區域,
而不需要像快速排序一樣進行大量的分割和比較。

--

總之,插入排序在處理小型數據集合和部分有序數據時可能優於快速排序。
但在大部分情況下,特別是處理大型數據集合時,快速排序的效能通常會更好。
選擇排序算法時,需要考慮數據的特性和規模,以及適用於該情況的排序算法。

2. 以 Thread Sanitizer 找出上述程式碼的 data race 並著手修正

編譯選項引入 ThreadSanitizer:

gcc -Wall -fsanitize=thread -o qsort qsort_mt.c -lpthread

執行程式後得到:

==================
WARNING: ThreadSanitizer: data race (pid=6579)
  Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0):
    #0 allocate_thread <null> (qsort+0x3476)
    #1 qsort_algo <null> (qsort+0x4193)
    #2 qsort_thread <null> (qsort+0x45b1)

  Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2):
    #0 qsort_thread <null> (qsort+0x44c7)

  Location is heap block of size 256 at 0x7b4000000000 allocated by main thread:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
    #1 qsort_mt <null> (qsort+0x28fa)
    #2 main <null> (qsort+0x500e)

  Mutex M0 (0x7ffc39e32d48) created at:
    #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
    #1 qsort_mt <null> (qsort+0x28dd)
    #2 main <null> (qsort+0x500e)

  Mutex M2 (0x7b40000000a8) created at:
    #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
    #1 qsort_mt <null> (qsort+0x297f)
    #2 main <null> (qsort+0x500e)

  Thread T1 (tid=6581, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 qsort_mt <null> (qsort+0x2a8e)
    #2 main <null> (qsort+0x500e)

  Thread T2 (tid=6582, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 qsort_mt <null> (qsort+0x2a8e)
    #2 main <null> (qsort+0x500e)

SUMMARY: ThreadSanitizer: data race (/home/shengwen/workspace/ncku/linux2023-summer-quiz/3-qsort/qsort+0x3476) in allocate_thread
==================
ThreadSanitizer: reported 1 warnings

根據警告訊息, ThreadSanitizer 發現了一個 Data race 的異常。Data race 是指多個執行緒在沒有正確上鎖保護的情況下存取了相同的記憶體位置,且至少有一個執行緒寫入了資料。這會使並行程式的執行發生異常,並可能導致結果不正確。

警告訊息的前三段說明了具體情況:

WARNING: ThreadSanitizer: data race (pid=6579)
  Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0):
    #0 allocate_thread <null> (qsort+0x3476)
    #1 qsort_algo <null> (qsort+0x4193)
    #2 qsort_thread <null> (qsort+0x45b1)

  Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2):
    #0 qsort_thread <null> (qsort+0x44c7)

  Location is heap block of size 256 at 0x7b4000000000 allocated by main thread:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
    #1 qsort_mt <null> (qsort+0x28fa)
    #2 main <null> (qsort+0x500e)

即: 記憶體位址為 0x7b4000000080 的內容同時被 T1 寫入和 T2 讀取,前者使用了 Mutex M0 而後者使用 Mutex M2。同時 0x7b4000000080 是位於 0x7b4000000000 位址、Block size 大小為 256 的 Heap 記憶體。

因為根據上述訊息,問題主要是發生在 allocate_thread() 的不正常寫入 (T1 寫入資料時沒有保護以至於 T2 可以讀取)。經過實驗可以發現需要進行以下修改:

static struct qsort *allocate_thread(struct common *c)
{
    verify(pthread_mutex_lock(&c->mtx_al));
    for (int i = 0; i < c->nthreads; i++)
        if (c->pool[i].st == ts_idle) {
            c->idlethreads--;
-           c->pool[i].st = ts_work;
            verify(pthread_mutex_lock(&c->pool[i].mtx_st));
+           c->pool[i].st = ts_work;
            verify(pthread_mutex_unlock(&c->mtx_al));
            return (&c->pool[i]);
        }
    verify(pthread_mutex_unlock(&c->mtx_al));
    return (NULL);
}

如此便可排除問題。

Select a repo