Try   HackMD

linux2023: RinHizakura - 作業 1

測驗 α

S-tree 程式碼運作原理

資料結構

/* S-Tree uses hints to decide whether to perform a balancing operation or not.
 * Hints are similar to AVL-trees' height property, but they are not
 * required to be absolutely accurate. A hint provides an approximation
 * of the longest chain of nodes under the node to which the hint is attached.
 */
struct st_node {
    short hint;
    struct st_node *parent;
    struct st_node *left, *right;
};

struct st_root {
    struct st_node *root;
};

首先從資料結構中探討起,我們可以看出 S-tree 的設計上與一般的 BST 類似,結構中維護左右與親代的子節點。特別的是 hint 這個成員,這讓 S-tree 可以在追求快速的 insert/delete 操作速度的同時,也可以近乎達到 AVL tree 所期待的平衡效果以獲得更佳/更一致(consistent)的 lookup 延遲。其作用細節在後續的實作中解釋。

設計上覺得 S-tree 提供的介面(API,即 st_* 系列函式) 似乎有點彆扭? 比方說 st_insert 的實作需要由外部的 caller 先確認好 node 之間的 comparison order。假設我們確立 S-tree 就是只作為 BST 來使用,我覺得理想的介面應該是:

void st_insert(struct st_node **root, struct st_node *n);

可能再加上做 compare 的函式指標 (*cmp),或是為 rootst_node 再包裝一個更高級的資料結構並將 cmp 作為其中一員。

st_first

struct st_node *st_first(struct st_node *n)
{
    if (!st_left(n))
        return n;

    return st_first(st_left(n));
}

struct st_node *st_last(struct st_node *n)
{
    if (!st_right(n))
        return n;

    return st_last(st_right(n));
}

st_firstst_last 的作用是找到以 n 為 root 的 subtree 中,order 最低即最高(根據對節點間 comparison 定義)的節點。

st_replace_right

st_replace_right 的功能是用 r 來取代 n 原本在樹上的位置。更精準的說,這其實是 BST 之 remove 操作的一種形式,實際上 rn 的關係必然滿足 r = st_first(st_right(n))(見 st_remove)。基於此前提下,這個 replace 實作才是有效且正確的。

可以想見有幾個節點間的關係可能會因應 replace 而進行調整:

  1. r 的左子樹
  2. r 的右子樹
  3. r 的父節點
  4. r 原本的左子樹之父節點: 因為 rst_first(st_right(n)),左子樹必定為 NULL
  5. r 原本的右子樹之父節點
  6. r 原本的父節點之左or右子樹
  7. r 原本的父節點之父節點
  8. n 原本的左子樹之父節點
  9. n 原本的右子樹之父節點
  10. n 原本的父節點之左or右子樹

在實作 replace 時,我們要考慮這些 link 是否都調整正確。

static inline void st_replace_right(struct st_node *n, struct st_node *r)
{
    struct st_node *p = st_parent(n), *rp = st_parent(r);

    if (st_left(rp) == r) {
        st_left(rp) = st_right(r);
        if (st_right(r))
            st_rparent(r) = rp;
    }

首先判斷 rrp 的左子還是右子,如果是左子,則將 rp 左子替換為 r 的右子樹(6),右子樹的 parent 也需要更新(5)。

  1. 如果 rrp 的右子,其實可以預期此時 n 就是 rp,則此時就不必將 r 的右子樹交給其他人繼承,可想而知 rp 的左右也不可能繼承任何子樹,稍後就會被刪除
  2. r 的左子樹怎麼辦呢? 留意到因為在刪除操作中 r 必然是來自某一子樹 treest_first(tree),因此可預期 st_left(r) = NULL
    if (st_parent(rp) == n)
        st_parent(rp) = r;

rp 的父節點是 n 的狀況下,由於 n 即將被刪除且由 r 取代,因此 st_parent(rp) 勢必要被修改(7)。

    st_parent(r) = p;
    st_left(r) = st_left(n);

r 將取代 n 原本的位置,這裡調整此關係(1 & 3)。

    if (st_right(n) != r) {
        st_right(r) = st_right(n);
        st_rparent(n) = r;
    }

r 不正好是 n 的右子樹的情況下,r 需繼承 n 的右子樹,反過來的關係也需建立(2 & 9)。

    if (p && st_left(p) == n)
        st_left(p) = r;
    else if (p)
        st_right(p) = r;

這裡調整 n 之父節點的子代(10)。

    if (st_left(n))
        st_lparent(n) = r;
}

調整 n 原本左子樹的父節點關係(8)。

st_replace_right

st_replace_left 類似,這裡就不多做探討。

st_remove

st_remove 將給定的 del 從樹中移除。

void st_remove(struct st_node **root, struct st_node *del)
{
    if (st_right(del)) {
        struct st_node *least = st_first(st_right(del));
        if (del == *root)
            *root = least;

        AAAA;
        BBBB;
        return;
    }

刪除方法的基本邏輯近似一般的 BST,若右子樹存在,則找到右子樹中 order 最小者(st_left 必然為 NULL),以其取代原本的 del 即可。

  • AAAA = st_replace_right(del, least)
  • BBBB = st_update(root, st_right(least))
    if (st_left(del)) {
        struct st_node *most = st_last(st_left(del));
        if (del == *root)
            *root = most;

        CCCC;
        DDDD;
        return;
    }

若右子樹不存在,則找到左子樹中 order 最大者(st_right 必然為 NULL),以其取代原本的 del

  • CCCC = st_replace_left(del, most)
  • DDDD = st_update(root, st_left(most))
    if (del == *root) {
        *root = 0;
        return;
    }

考慮到刪除的節點剛好是 root 的情況下,也需要更新 root 指標。

    /* empty node */
    struct st_node *parent = st_parent(del);

    if (st_left(parent) == del)
        st_left(parent) = 0;
    else
        st_right(parent) = 0;

    EEEE;
}

如果被被刪除的節點沒有左右都沒有節點,意即其為 leaf,只要將調整其 parent 的 child 關係,再直接刪除之就好。

EEEE = st_update(root, parent)

st_rotate_right

rotate 是為了平衡 BST 所需進行的操作,它調整數個節點間的親子關係以達此目的。方便理解,我們以下圖為例示範如何對 node 6 做 rotate right 操作。

static inline void st_rotate_right(struct st_node *n)
{
    struct st_node *r = st_right(n), *p = st_parent(n);

根據程式碼定義初始的狀態應該如下。







BST



l3

3



l4

4(*p)



l4->l3





l6

6(*n)



l4->l6





l5

5



l6->l5





l7

7(*r)



l6->l7





n1

NULL



l7->n1





n2

8



l7->n2





n3

NULL



n2->n3





n4

9



n2->n4





    st_parent(r) = st_parent(n);
    st_right(n) = st_left(r);
    st_parent(n) = r;
    st_left(r) = n;

    if (p && st_left(p) == n)
        st_left(p) = r;
    else if (p)
        st_right(p) = r;

    if (st_right(n))
        st_rparent(n) = n;
}

則可以根據程式碼行為一步一步用紙筆檢查 pointer 的調整,完成旋轉後應該會變成如下圖的樣子。看到 node 7 被從右子樹往上提一個等級,藉此降低原本子樹的高度。







BST



l3

3



l4

4(*p)



l4->l3





l7

7(*r)



l4->l7





l5

5



l6

6(*n)



l6->l5





n1

NULL



l6->n1





l7->l6





n2

8



l7->n2





n3

NULL



n2->n3





n4

9



n2->n4





st_rotate_left

道理類似 st_rotate_right,這邊就不再費時說明。

st_update

static inline void st_update(struct st_node **root, struct st_node *n)
{
    if (!n)
        return;

    int b = st_balance(n);
    int prev_hint = n->hint;

st_balance 判斷節點 n 在左右子樹的平衡程度,當 b > 0,表示左子樹的高度大於右子樹,反之 b < 0 則表示是右子樹較高。注意到嚴格來說 hint 只是對高度的估計而非完全正確的高度值。

    struct st_node *p = st_parent(n);
    if (b < -1) {
        /* leaning to the right */
        if (n == *root)
            *root = st_right(n);
        st_rotate_right(n);
    }

    else if (b > 1) {
        /* leaning to the left */
        if (n == *root)
            *root = st_left(n);
        st_rotate_left(n);
    }

如果右子樹比較高,我們就做 st_rotate_right;與之相對,左子樹較高的情況是做 st_rotate_left 來平衡 BST。

    n->hint = st_max_hint(n);
    if (n->hint == 0 || n->hint != prev_hint)
        st_update(root, p);
}

旋轉之後,因為節點本身左右高度的變化要更新 hint。而當自身 hint 有變或是變成 leaf 時,也要向原本的 parent 方向(以前面 st_rotate_right 旋轉 node 6 為例,即是要 update node 4)繼續更新 hint 和調整平衡。

st_insert

st_insert 將節點插入到 S-tree 中。

void st_insert(struct st_node **root,
               struct st_node *p,
               struct st_node *n,
               enum st_dir d)
{
    if (d == LEFT)
        st_left(p) = n;
    else
        st_right(p) = n;

    st_parent(n) = p;
    st_update(root, n);
}

實作上沒有什麼複雜之處,就是將 n 插入成指定的 parent p 之左或右子節點。

基於 S-tree 的 treeint 運作原理

資料結構

struct treeint {
    int value;
    struct st_node st_n;
};

模仿 Linux 的 linked list 風格,st_node 可以被置於任何 struct 下以串連不同種類的資料結構。本題中是以包含單一 int 的節點示範。

treeint_insert

struct treeint *treeint_insert(int a)
{
    struct st_node *p = NULL;
    enum st_dir d;
    for (struct st_node *n = st_root(tree); n;) {
        struct treeint *t = container_of(n, struct treeint, st_n);
        if (a == t->value)
            return t;

        p = n;

        if (a < t->value) {
            n = st_left(n);
            d = LEFT;
        } else if (a > t->value) {
            n = st_right(n);
            d = RIGHT;
        }
    }

因為目前 st_insert 提供的介面,treeint_insert 必須先找出要插入的位置。

注意到嚴格的 BST 中是不存在重複 key 值的節點的,因此若存在重複我們直接回傳該已存在的相同 key 值節點即可。

    struct treeint *i = calloc(sizeof(struct treeint), 1);
    if (st_root(tree))
        st_insert(&st_root(tree), p, &i->st_n, d);
    else
        st_root(tree) = &i->st_n;

    i->value = a;
    return i;
}

找到符合 BST 的正確位置後,完成節點的插入。

__treeint_dump

static void __treeint_dump(struct st_node *n, int depth)
{
    if (!n)
        return;

    __treeint_dump(FFFF, depth + 1);

    struct treeint *v = treeint_entry(n);
    printf("%d\n", v->value);

    __treeint_dump(GGGG, depth + 1);
}

經典的 inorder 走訪。

  • FFFF = st_left(n)
  • GGGG = st_right(n)

S-tree 的介面改進

雖然原題目沒有要求這項改進。不過正如前面所說,我認為 S-tree 目前的 API 設計很彆扭。若我們將 S-tree 作為函式庫來看,從使用者的角度而言,他應該只關心插入和移除的鍵值為何,而不必明確知道 S-tree 內部是如何管理節點之間關係的。然而目前的設計下我們可以在 treeint_inserttreeint_remove 看到依然存在複雜的邏輯來走訪整個樹,後者應該被隱藏在函式庫內部的實作才是。

為此,我重新定義了 S-tree 提供的 public interface。

struct st_tree *st_create(cmp_t *cmp,
                          struct st_node *(*create_node)(void *key),
                          void (*destroy_node)(struct st_node *n));
void st_destroy(struct st_tree *tree);
int st_insert(struct st_tree *tree, void *key);
int st_remove(struct st_tree *tree, void *key);
struct st_node *st_find(struct st_tree *tree, void *key);

可以在 st_create 中看到節點之間的 order 比較(cmp)、節點的建立與銷毀(create_node/destroy_node) 被從 S-tree 中獨立出來,以 callback function 的方式讓函式庫的使用者自行實作。其餘包含樹的走訪邏輯、root node 的維護等都直接在內部完成。

順帶一提,原本 treeint_inserttreeint_remove 中走訪 S-tree 的邏輯其實很相似。這裡我們實際上也可以重用 st_find 函式來歸納之,避免重寫相同的邏輯,以減少程式碼編譯後產生之二進位檔的大小。

完整的實作改進可以參照 s_tree.c。有了這項調整,未來就可以很輕易地將 S-tree 應用在其他合適的專案之中了!

雖然介面上變得簡單許多,不過其實如果對照 Linux 的 rbtree 實作,會發現其反而避免使用了使用 callback 的方式,作法比較接近本題原始的實作。

原因可參照原作者在 rbtree.h 中的註解說明:

To use rbtrees you'll have to implement your own insert and search cores. This will avoid us to use callbacks and to drop drammatically performances. I know it's not the cleaner way, but in C (not in C++) to get performances and genericity

其實不是很確定作者所述的 performance drop 具體有多高? 主要來自什麼部份(cache locality?),或許值得探討一下。畢竟如果能夠以 callback 方式實作,毫無疑問對使用者的上手門檻是更低的,應用上也更為容易

探討 S-tree 與 rbtree 效能落差

首先,為利於後續加入其他演算法進行效能評比,對原始程式碼做出以下改動,以建立相關基礎建設:

  1. 加入一個 macro benchmark,方便測量一段程式碼完成的時間
  • 主要測量 insert/find/remove 三種操作的時間
  • macro 寫得不是很漂亮,不過暫時還算堪用
  1. 撰寫一個簡單的 python script tree-perf.py,方便將測量結果視覺化
  • 為了讓測量結果盡量不受 noise 干擾,且視覺化後的曲線圖可以更平滑,試著用簡單的標準差法排除蒐集到的時間中之 outlier
  1. 將 BST 的各項操作也抽象成 callback function pointer
  • 雖然前面提到 callback 的方式可能會對函式呼叫整體造成額外成本,但這裡我們主要的目標是要比較不同 tree 的 throughput,而非注重單一 tree 本身在實際應用上的可能優化
  • 因此取捨傾向先以能夠容易且快速的加入各種 BST 實作的設計方式為主

效能評比程式有了,那麼 rbtree 的函式庫該從何而來呢? 稍微搜尋了一下,一方面似乎沒有比較被大眾廣泛接受的紅黑樹相關 C 函式庫;一方面,S-tree 的實作和 Linux 中的 rbtree 介面上其實存在不少相似之處。因此我們乾脆將 Linux 的紅黑樹實作拉到 userspace 來應用。這裡參照 Linux 6.4 版本中的程式碼,並且只擷取與 insert/find/remove 三項操作相關的函式實作,具體整理出來的程式碼可以參考:

接著我們就可以來測量各個操作間的差異。

根據老師的建議將實驗分為循序輸入和亂序輸入的場景。對於循序輸入的部份,我們將從 0 一直到 tree_size - 1 的 key 值插入到樹中,然後再以相同的次序進行搜尋和刪除。過程記錄不同的樹節點數量與完成相應操作所需之累積時間的關係。則實驗的結果如下圖:

循序狀況下,似乎在樹節點數量較少時兩者差異不大,而節點增加時,S-tree 相較於 rbtree 的表現更好?

而亂序輸入的場景下,我們會隨機產生任意的插入/搜尋/移除值,實驗結果如下:

從結果來看,亂序狀況下,S-tree 整體的效能看起來稍優於 rbtree。且在亂序情形下 rbtree 似乎更容易有 worst case (圖中極端值) 的出現。

該怎麼依此給出合理的解釋呢?

  • 需注意到其實無論是插入/搜尋/刪除操作,其實三者都具備搜尋的部份,而插入和刪除只是多了調整節點間關係+旋轉的步驟
    • 則如果瓶頸發生在搜尋部份,可以預期三種操作測量出的時間曲線圖將很相似(例如本實驗)?
  • 對於亂序下出現特殊的極端值(圖中的尖峰),可能是對某個 key 值的搜尋會導致最差情況(worst case)的發生?
    • 不認為是硬體(CPU、Cache)干擾的原因在於三種操作在同一節點數量都出現極端值
  • 理論上,在樹的平衡上 S-tree 應具有較為明顯的優勢,如下圖以循序插入 0 - 9 的節點為例,可以看到在限制樹高度的能力上 S-tree 是更有效的


S-tree 從 0 到 9 插入節點後

rbtree 從 0 到 9 插入節點後

  1. 以上敘述純屬推測(嘴炮),需拿出更多證據支持
  2. 設計何種場景可以彰顯 rbtree 在插入/移除上的優勢?

S-tree 的最壞場景

然而在特定輸入上可以體現 S-tree 的 hint 對高度估計的不精準導致的 worst case。我們首先輸入 1,接著輸入 1000,則 S-tree 會呈現以下樣貌:







BST



n0

1(1)



n1

NULL



n0->n1





n2

1000(0)



n0->n2





此時 node 1 的 hint 為 1,node 1000 的 hint 為 0(括號中數字)。接著插入 900,最開始會呈現以下樣貌:







BST



n0

1(1)



n1

NULL



n0->n1





n2

1000(0)



n0->n2





n3

900(0)



n2->n3





n4

NULL



n2->n4





然後我們對 node 900 做 st_update 後,接著往 parent 方向也對 node 1000 做 st_update,後者的 hint 會被更新:







BST



n0

1(1)



n1

NULL



n0->n1





n2

1000(1)



n0->n2





n3

900(0)



n2->n3





n4

NULL



n2->n4





繼續往 parent 方向,也對 node 1 做 st_update。此時因為 hint 條件下不 balance,將會進行右旋轉 (st_rotate_right)。旋轉後的結果呈現以下狀態。







BST



n0

1(1)



n3

NULL



n0->n3





n4

900(0)



n0->n4





n1

NULL



n2

1000(1)



n2->n0





n2->n1





這邊可以看到 S-tree 潛在的問題。由於旋轉的關係,node 1000 的連結關係被改變,然而 update 的手法將不會對其 hint 進行二度調整,最後導致 hint 並不能正確反映樹的高度,進而也沒有機會將二元樹平衡。

想像上,若強硬的去追蹤每個節點的高度並平衡將迫使 S-tree 與 AVL 相近,進而失去原本設計上想帶來的優勢。那麼該怎麼權衡可以在有效率的插入/刪除的同時保持一定程度的二元樹平衡呢? 這可能就需要更深入的思考了。

測驗 β

程式碼運作原理

#include <stdint.h>

static inline uintptr_t align_up(uintptr_t sz, size_t alignment)
{
    uintptr_t mask = alignment - 1;
    if ((alignment & mask) == 0) {  /* power of two? */
        return MMMM;       
    }
    return (((sz + mask) / alignment) * alignment);
}

MMMM = (sz + mask) & ~mask

不囉唆,直接看 align.h

測驗 γ

考古

qsort 是由 C 語言標準中定義的排序介面,而內部的實作則根據函式庫的實現各有不同。值得注意的是,qsort 並不等於 quick sort,而其一般是以優化過的 hybrid sort 方式實作,後者混合多種類的 sorting 演算法,並根據輸入選擇合適者,此作法在提供普遍(general)的排序算法時更為有利,如本測驗題的實作也是一種。

有點好奇XD :rolling_on_the_floor_laughing:

翻了一下 C 語言標準中(7.22.5.2 The qsort function)的描述,似乎其沒有對 qsort 的時間複雜度給予規範? 不曉得這是否意味假如我們自己開發標準函式庫時,用 bubble sort 作為 qsort 的內部實現也是合法的? 或是我漏掉了其他的章節?

稍微搜尋了一下,本測驗題應該是基於 J. L. Bentley 和 M. D. McIlroy 在 Engineering a Sort Function (很抱歉沒付錢看不了連結倒是可以找一個簡單的投影片) 提出的方法作為延伸,該方法的身影也可以在 FreeBSD libcapple 的 XNU 等等專案中發現。而測驗題則善用其中 quick sort 部份會需要 partition 成兩段陣列並分別排序的特性,嘗試將這些排序平行/並行運算來提昇執行效率。

程式碼運作原理

qsort_mt

qsort_mt 是本測驗題所要實作的 multithread qsort 的介面本體。

void qsort_mt(void *a,
              size_t n,
              size_t es,
              cmp_t *cmp,
              int maxthreads,
              int forkelem)
{
    struct qsort *qs;
    struct common c;
    int i, islot;
    bool bailout = true;

    if (n < forkelem)
        goto f1;
    errno = 0;
    /* Try to initialize the resources we need. */
    if (pthread_mutex_init(&c.mtx_al, NULL) != 0)
        goto f1;
    if ((c.pool = calloc(maxthreads, sizeof(struct qsort))) == NULL)
        goto f2;

forkelem 是決定需不需要額外啟動一個 thread 來平行排序的參數,如果陣列單元數量小於此參數的情況下,可以直接用標準函式庫的 qsort 就好。

如果要啟動 multithread qsort,這裡的實作是採用 thread pool 的方式。最為直接的實作下,我們可以判斷需要平行運算時才建立 thread,並在 thread 完成後直接銷毀之。這樣的作法對於資源的控管和 thread 的設計相對輕鬆。但缺點也很明顯,頻繁的建立和銷毀 thread 將造成額外的開銷,進而帶來額外的延遲時間。

取而代之,我們可以預先建立好數個 thread worker,並在需要時才從中挑選 idle thread 出來工作,詳細的說明可以在 案例: Thread Pool 實作和改進中閱讀到。總而言之,thread pool 雖然管理上更為複雜(後續就會見識到了XD),但在分配平行任務上更為高效,而這裡我們就需要為每個 thread 建立一個專屬的 context struct qsort,後者可用來攜帶參數,以告訴 thread 接下來要排序的陣列和其大小等資訊。

    for (islot = 0; islot < maxthreads; islot++) {
        qs = &c.pool[islot];
        if (pthread_mutex_init(&qs->mtx_st, NULL) != 0)
            goto f3;
        if (pthread_cond_init(&qs->cond_st, NULL) != 0) {
            verify(pthread_mutex_destroy(&qs->mtx_st));
            goto f3;
        }
        qs->st = ts_idle;
        qs->common = &c;
        if (pthread_create(&qs->id, NULL, qsort_thread, qs) != 0) {
            verify(pthread_mutex_destroy(&qs->mtx_st));
            verify(pthread_cond_destroy(&qs->cond_st));
            goto f3;
        }
    }

此 for 迴圈初始每個 thread context,包含初始化各自的 mutex 和 condition variable,並建立對應的 thread 等。

    /* All systems go. */
    bailout = false;

    /* Initialize common elements. */
    c.swaptype = ((char *) a - (char *) 0) % sizeof(long) || es % sizeof(long)
                     ? 2
                 : es == sizeof(long) ? 0
                                      : 1;

swaptype 的目的是確認所排序陣列的指標對齊狀況,這攸關後續將一個(swap)或多個(vecswap)元素作為一個單位進行 swap 的手法。首先仔細觀察其中的條件式:

  1. ((char *) a - (char *) 0) % sizeof(long): 確認陣列 a 是否與 long 型別長度對齊 (若否為 true)
  2. es % sizeof(long): 檢查陣列中的元素是否與 long 型別對齊 (若否為 true)
  3. es == sizeof(long): 這條件在 1 和 2 皆不成立時才檢查,換句話說 a 必然對齊 sizeof(long),且元素長度也對齊之,此時判斷元素大小是否直接等同 long (若是為 true)

則總結 swaptype 之值代表的意涵以及對應的 swap 手法為:

  • 0 代表 a 對齊 sizeof(long),且 es 等同 sizeof(long)
    • swap: 直接將指標 cast 成 long 交換即可
    • vecswap: 假設總共要交換的空間大小為 n,則以 sizeof(long) 為單位交換 n / sizeof(long)
  • 1: a 對齊 sizeof(long)es 對齊 sizeof(long) 但不等同
    • swap / vecswap: 假設總共要交換的空間大小為 n,則以 sizeof(long) 為單位交換 n / sizeof(long)
  • 2: a 不對齊 sizeof(long)es 不對齊 sizeof(long)
    • swap / vecswap: 假設總共要交換的空間大小為 n,則以 sizeof(char) 為單位交換 n / sizeof(char)
    • 顯然 sizeof(char) < sizeof(long),因此 swaptype == 2 之情況下可以預期是用較無效率的方法進行的

:notes:

在 C 語言標準 6.5.3.4 The sizeof and alignof operators 中提到:

When sizeof is applied to an operand that has type char, unsigned char, or signed char, (or a qualified version thereof) the result is 1.

換句話說,在不對齊狀況下,每次只能以 1 bytes 為單位進行 swap。

    c.es = es;
    c.cmp = cmp;
    c.forkelem = forkelem;
    c.idlethreads = c.nthreads = maxthreads;

struct commonc 攜帶一些與陣列整體相關的資訊,原則上是在各個 thread 中只可讀的資訊。(idlethreads 應該是唯一的例外?)

    /* Hand out the first work batch. */
    qs = &c.pool[0];
    verify(pthread_mutex_lock(&qs->mtx_st));
    qs->a = a;
    qs->n = n;
    qs->st = ts_work;
    c.idlethreads--;
    verify(pthread_cond_signal(&qs->cond_st));
    verify(pthread_mutex_unlock(&qs->mtx_st));

必要的初始化都完成後,首先就可以啟動 thread 0 來展開排序的第一步。

    /* Wait for all threads to finish, and free acquired resources. */
f3:
    for (i = 0; i < islot; i++) {
        qs = &c.pool[i];
        if (bailout) {
            verify(pthread_mutex_lock(&qs->mtx_st));
            qs->st = ts_term;
            verify(pthread_cond_signal(&qs->cond_st));
            verify(pthread_mutex_unlock(&qs->mtx_st));
        }
        verify(pthread_join(qs->id, NULL));
        verify(pthread_mutex_destroy(&qs->mtx_st));
        verify(pthread_cond_destroy(&qs->cond_st));
    }
    free(c.pool);
f2:
    verify(pthread_mutex_destroy(&c.mtx_al));
    if (bailout) {
        fprintf(stderr, "Resource initialization failed; bailing out.\n");
    f1:
        qsort(a, n, es, cmp);
    }
}

之後,main thread 就在 for 迴圈中藉由 pthread_join 逐一等待每個 thread 結束,並在 thread 結束時銷毀當初分配的資源。

順帶一提,在 thread pool 的方式下,要怎麼知道後續已經沒有工作需要 thread worker,可以將各個 thread 結束呢? 一種可行的做法是整理所有 thread 的狀態是否 idle。由於我們可以預期直到整個陣列的排序完成前都會有 thread 在運作,因此反過來說當所有 thread 都 idle 時該次排序應該已經完成。後續我們會在 qsort_thread 看到對應實作。

qsort_thread

每個 worker 都會執行各自的 qsort_thread。其主要流程可以分成三個階段:

  1. 等待被喚醒,閒閒沒事(?)的 thread 之 qs->st 會是 ts_idle,直到狀態被切換至 ts_work 開始工作,或是切換到 ts_term 後結束
  1. 喚醒後根據 context 設定進行對應的 sorting
  2. 回到 idle 狀態,此外,若發現自己完成後所有的 thread 已經都是 idle,表示最初指派的任務已經完成,可以結束自身,也將其他 thread 之狀態切換到 ts_term 以結束之

這三階段就對應到下方的程式碼。

static void *qsort_thread(void *p)
{
    struct qsort *qs, *qs2;
    int i;
    struct common *c;

    qs = p;
    c = qs->common;
again:
    /* Wait for work to be allocated. */
    verify(pthread_mutex_lock(&qs->mtx_st));
    while (qs->st == ts_idle)
        verify(HHHH);
    verify(pthread_mutex_unlock(&qs->mtx_st));
    if (qs->st == ts_term) {
        return NULL;
    }
    assert(qs->st == ts_work);

這裡是第一階段行為的描述。其中,對於 HHH 的答案,從外層迴圈行為是在等待 state 的變更,以及該迴圈是搭配 mutex 建立的 critical section 使用的狀態來看,不難看出 HHHH 應該是在等待 condition variable 被 signal,因此:

  • HHHH = pthread_cond_wait(&qs->cond_st, &qs->mtx_st)
    qsort_algo(qs);

第二階段則進行主要任務 qsort_algo

    verify(pthread_mutex_lock(&c->mtx_al));
    qs->st = ts_idle;
    c->idlethreads++;
    if (c->idlethreads == c->nthreads) {
        for (i = 0; i < c->nthreads; i++) {
            qs2 = &c->pool[i];
            if (qs2 == qs)
                continue;
            verify(pthread_mutex_lock(&qs2->mtx_st));
            qs2->st = ts_term;
            verify(JJJJ);
            verify(pthread_mutex_unlock(&qs2->mtx_st));
        }
        verify(pthread_mutex_unlock(&c->mtx_al));
        return NULL;
    }
    verify(pthread_mutex_unlock(&c->mtx_al));
    goto again;
}

第三階段也是如前面的敘述,當 c->idlethreads == c->nthreads,則將其他的 thread 都切換 state 為 st_term,因此也需要透過 condition variable 喚醒對應 thread,則:

  • JJJJ = pthread_cond_signal(&qs2->cond_st)

qsort_algo

/* Thread-callable quicksort. */
static void qsort_algo(struct qsort *qs)
{
    char *pa, *pb, *pc, *pd, *pl, *pm, *pn;
    int d, r, swaptype, swap_cnt;
    void *a;      /* Array of elements. */
    size_t n, es; /* Number of elements; size. */
    cmp_t *cmp;
    int nl, nr;
    struct common *c;
    struct qsort *qs2;

    /* Initialize qsort arguments. */
    c = qs->common;
    es = c->es;
    cmp = c->cmp;
    swaptype = c->swaptype;
    a = qs->a;
    n = qs->n;

前面只是初始化一些 local 變數,以建立簡潔易讀的程式碼。

top:
    /* From here on qsort(3) business as usual. */
    swap_cnt = 0;
    if (n < 7) {
        for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
            for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
                 pl -= es)
                swap(pl, pl - es);
        return;
    }

最開始針對 array 比較小的狀況(<7)做優化,該情況下用簡易的 insertion sort 處理即可。

在下方 swap_cnt 為 0 之處也有一段 insertion sort,感覺或許可以歸納成一個 function 以簡化程式碼?

這邊由於 C 語言不提供泛型,因應 sort 可以針對不同型別排序的情況下,需要以 (char *)a + es * i 去存取第 i 個 element,不得不說閱讀起來有點不舒服XD

    pm = (char *) a + (n / 2) * es;
    if (n > 7) {
        pl = (char *) a;
        pn = (char *) a + (n - 1) * es;
        if (n > 40) {
            d = (n / 8) * es;
            pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
            pm = med3(pm - d, pm, pm + d, cmp, thunk);
            pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
        }
        pm = med3(pl, pm, pn, cmp, thunk);
    }
    swap(a, pm);

陣列比較大的情況下,quick sort 會是更佳的選擇。眾所周知 quick sort 需要選擇 pivot 來將陣列進行二分。而為了避免 pivot 的選擇不好而導致容易出現 worst case (即時間複雜度為

O(n2))的狀況,一種常見的作法是透過 median of 3 (med3) 來挑選合適的 pivot。

延伸閱讀: Improving Quicksort with Median of 3 and Cutoffs

這裡在挑選出 pivot 後,也將其置換為 array 的首個 element。

    pa = pb = (char *) a + es;
    pc = pd = (char *) a + (n - 1) * es;
    for (;;) {
        while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) {
            if (r == 0) {
                swap_cnt = 1;
                swap(pa, pb);
                pa += es;
            }
            pb += es;
        }
        while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) {
            if (r == 0) {
                swap_cnt = 1;
                swap(pc, pd);
                pd -= es;
            }
            pc -= es;
        }
        if (pb > pc)
            break;
        swap(pb, pc);
        swap_cnt = 1;
        pb += es;
        pc -= es;
    }






%0



a

a



values

A[0]

A[1]

A[2]

......

A[n - 2]

A[n - 1]



a->values:f0





pa

pa



pa->values:f1





pb

pb



pb->values:f1





pc

pc



pc->values:f5





pd

pd



pd->values:f5





接下來這邊是 quick sort 中經典的 partition 實作。因為主要目標是要將 order 比 pivot 小者放到 pivot 左方,order 比 pivot 大者放到右方,因此作法是從 pb 往右找到比 pivot 大的元素,從 pc 往左找到比 pivot 大者,然後將兩者交換,並重複此流程直到兩個 pointer 交會。

注意到與普通 quick sort 相比實作上還是有一些小差異,這提供演算法後續更多的優化空間:

  • 期間也記錄下進行 swap 的次數
  • 特地將與 pivot order 相同的元素放置在從 pa 往右/從 pd 往左之處
    pn = (char *) a + n * es;
    r = min(pa - (char *) a, pb - pa);
    vecswap(a, pb - r, r);
    r = min(pd - pc, pn - pd - es);
    vecswap(pb, pn - r, r);

這裡的兩個 vecswap 將 pivot 最後置換到正確的位置中。

    if (swap_cnt == 0) { /* Switch to insertion sort */
        r = 1 + n / 4;   /* n >= 7, so r >= 2 */
        for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
            for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
                 pl -= es) {
                swap(pl, pl - es);
                if (++swap_cnt > r)
                    goto nevermind;
            }
        return;
    }

如果 swap_cnt 為 0,這表示在前面的 partition 中沒發生任何置換,即原始的陣列可以在不置換的前提下完美分成 < pivot> pivot 的兩個子陣列。這情況很可能表示陣列其實已經完成排序或接近完成,因此我們嘗試直接對整個陣列進行 insert sort。

在此我們仍然持續追蹤完成 sort 所需要進行的 swap 數量,如果期間發現仍需要比較多的 swap,那麼可以反悔,還是進行標準的 quick sort。

nevermind:
    nl = (pb - pa) / es;
    nr = (pd - pc) / es;

    /* Now try to launch subthreads. */
    if (nl > c->forkelem && nr > c->forkelem &&
        (qs2 = allocate_thread(c)) != NULL) {
        qs2->a = a;
        qs2->n = nl;
        verify(pthread_cond_signal(&qs2->cond_st));
        verify(pthread_mutex_unlock(&qs2->mtx_st));
    } else if (nl > 0) {
        qs->a = a;
        qs->n = nl;
        qsort_algo(qs);
    }

如果選擇做標準的 quick sort,那就是以 pivot 為界線將陣列區分為左右兩組。對於左邊這組,我們可以考量左右子陣列剩餘需要排序的元素數量,以決定是否從 thread pool 中喚醒另一個 thread 來並行的完成 quick sort。

    if (nr > 0) {
        a = pn - nr * es;
        n = nr;
        goto top;
    }
}

而右邊的子陣列則可以沿用原本的 thread 繼續運算。

想像上這一段寫成以下形式會不會在可讀性上更佳?

    if (nr > 0) {
        qs->a = pn - nr * es;
        qs->n = nr;
        qsort_algo(qs);
    }

實作上的 Data race

在編譯選項中加上 -fsanitize=thread 可以在程式中引進 Thread Sanitizer,後者是能夠偵測執行緒間 data race 的強大工具。另外建議也加入 -g 選項來導入 DWARF,以提升除錯訊息的詳細程度。

加入 flag 後,執行結果如下:

==================
- WARNING: ThreadSanitizer: data race (pid=6973)
  Write of size 4 at 0x7b4000000080 by thread T1 (mutexes: write M0):
    #0 allocate_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 (qsort-mt.out+0x3a7f)
    #1 qsort_algo /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:315 (qsort-mt.out+0x3a7f)
    #2 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:352 (qsort-mt.out+0x3fd8)

  Previous read of size 4 at 0x7b4000000080 by thread T2 (mutexes: write M2):
    #0 qsort_thread /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:344 (qsort-mt.out+0x3f9a)

  Location is heap block of size 256 at 0x7b4000000000 allocated by main thread:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:672 (libtsan.so.0+0x31edc)
    #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:133 (qsort-mt.out+0x43e3)
    #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)

  Mutex M0 (0x7fff028782d8) created at:
    #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
    #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:131 (qsort-mt.out+0x43bc)
    #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)

  Mutex M2 (0x7b40000000a8) created at:
    #0 pthread_mutex_init ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1227 (libtsan.so.0+0x4bee1)
    #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:137 (qsort-mt.out+0x44ae)
    #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)

  Thread T1 (tid=6975, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468)
    #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)

  Thread T2 (tid=6976, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 qsort_mt /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:145 (qsort-mt.out+0x4468)
    #2 main /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:505 (qsort-mt.out+0x2ad3)

SUMMARY: ThreadSanitizer: data race /home/rin/GitHub/linux-summer-2023/hw1/qsort-mt.c:212 in allocate_thread
==================

Thread Sanitizer 明確指出出現 data race 的程式碼位置,對照後會發現問題是出在 allocate_thread 中,在存取 st 成員前應該先對其相應的 mutex lock 進行上鎖動作。

  static struct qsort *allocate_thread(struct common *c) 
  {
      verify(pthread_mutex_lock(&c->mtx_al));
      for (int i = 0; i < c->nthreads; i++)
          if (c->pool[i].st == ts_idle) {
              c->idlethreads--;
+             verify(pthread_mutex_lock(&c->pool[i].mtx_st));
              c->pool[i].st = ts_work;
-             verify(pthread_mutex_lock(&c->pool[i].mtx_st));
  ...

完成上述改動後重新編譯,即可以排除 Thread Sanitizer 的警告訊息。