Try   HackMD

2024q1 Homework2 (quiz1+2)

contributed by <ChiTingCheng>

Quiz1

測驗 1

Q1. 解釋上述程式碼的運作原理,提出改進方案並予以實作。

快速排序法 (Quick Sort) 的實作想法是,從串列中選擇一個節點作為 pivot,接者走訪串列作比較,分成比 pivot 值大及比 pivot 值小的子串列,先對較大的子串列繼續使用快速排序直到子串列內剩餘一個節點,再回去處理較小的子串列。

我們能用遞迴的方式完成上述實作,但每次的遞迴呼叫需要額外的記憶體空間,對記憶體本身部有不友善,因此希望透過加入堆疊的方式來改善這個問題,當我們完成一次分類後,將本來應該各自遞迴計算的子串列邊界紀錄並「放進堆疊中」,這樣當我們處理完其中一邊的子問題時,可以取出堆疊內容來得知尚未處理的子串列邊界,以此模擬原本的遞迴呼叫

node_t

以下解釋非遞迴快速排序法的程式實作:

typedef struct __node {
-   struct __node *left, *right;
    struct __node *next;
    long value;
} node_t;

首先宣告結構體 node_t,此為串列的節點,內部有一個 value 儲存資料,一個 next 指標指向下一個節點位址,可以推測其為一個單向的鏈結串列。
(原本的結構體中還有一個 left 和一個 right 指標但後續程式中並不會使用到,因此將其移除)







struct __node


cluster __node




mode_t
mode_t



next

next



value

value



list_add

程式碼
void list_add(node_t **list, node_t *node_t)
{
    node_t->next = *list;
    *list = node_t;
}






list



list
list



node_1

node_1



list->node_1





NULL
NULL



node_2

node_2



node_1->node_2





node_2->NULL





將節點加入間接指標 list 所指向的節點前方。







list



list
list



node_new

node_new



list->node_new





NULL
NULL



node_1

node_1



node_new->node_1





node_2

node_2



node_1->node_2





node_2->NULL





list_tail

程式碼
node_t *list_tail(node_t **left)
{
    while ((*left) && (*left)->next)
        left = &((*left)->next);
    return *left;
}

從間接指標 left 所指向的節點開始往下一個節點更新 left,直到指向最後一個節點並結束迭代,回傳串列中最後一個節點的位址。

list_length

程式碼
int list_length(node_t **left)
{
    int n = 0;
    while (*left) {
        ++n;
        left = &((*left)->next);
    }
    return n;
}

方式同list_tail() ,從間接指標 left 所指的節點開始到串列結束,利用計數器計算節點數 (包含最初的節點)。

list_construct

程式碼
node_t *list_construct(node_t *list, int n)
{
    node_t *node = malloc(sizeof(node_t));
+   if (!node)
+       return NULL;
    node->next = list;
    node->value = n;
    return node;
}

建立一個新的節點並分配其記憶體空間,分配成功後將資料 n 存入 value 中,但這邊涉及記憶體分配的動作,因此加入一個判斷式確認是否分配成功,如果失敗則回傳 NULL。 (題目最後敘述有假設 malloc 總是成功,但考慮到實作需求還是加入判斷式)

list_free

程式碼
void list_free(node_t **list)
{
    node_t *node = (*list)->next;
    while (*list) {
        free(*list);
        *list = node;
        if (node)
            node = node->next;
    }
}

從間接指標 list 所指的節點開始到串列結束,釋放走訪過的節點所佔用的記憶體空間。

list_is_ordered

原本程式碼
static bool (node_t *list)
{       
    bool first = true;
    long value;
    while (list) {
        if (first) {
            value = list->value;
            first = false;
        } else {
            if (list->value < value)
                return false;
            value = list->value;
        }
        list = list->next;
    }
    return true;
}
改寫後的程式碼
static bool list_is_ordered(node_t *list)
{       
    long value = list->value;
    list = list->next;
    while (list) {
        if (list->value < value)
                return false;
        value = list->value;
        list = list->next;
    }
    return true;
}

判斷從 list 所指的節點開始到串列結束,節點是否依遞增順序排序,若成立回傳 true 反之則回傳 false,原本程式碼使用一個 bool 值來判斷 list 是否為第一個節點,但只使用一次,且在跌代時會需要處裡多餘分支,因此稍作改寫讓 value 一開始就有初始值。

shuffle

程式碼
void shuffle(int *array, size_t n)
{
    if (n <= 0)
        return;

    for (size_t i = 0; i < n - 1; i++) {
        size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
        int t = array[j];
        array[j] = array[i];
        array[i] = t;
    }
}

將陣列內的元素執行洗牌的動作,n 為陣列長度,從陣列的第 1 個元素開始,與後方的隨機一個元素交換,交換完後再移動到下一個元素,隨機挑選一個剩餘元素交換,重複動作直到陣列中所有元素都交換完成。

srand(time(NULL));

但當我查看執行洗牌後的串列時,發現串列雖然有重新排序,但每次顯示的結果都相同,這是 rand() 的問題,rand() 並不是真的隨機產生亂數,而是會依照亂數種子去產生亂數,若不設定初始的亂數種子,每次洗牌結果會相同,這裡使用時間作為亂數種子,並加在 main 當中。

快速排序法實作

實作非遞迴快速排序法的方法,原理可參考 Optimized QuickSort — C Implementation (Non-Recursive),題目使用的是鏈結串列的方式實作

程式碼
void quick_sort(node_t **list)
{
    int n = list_length(list);
    int value;
    int i = 0;
    int max_level = 2 * n;
    node_t *begin[max_level], *end[max_level];
    node_t *result = NULL, *left = NULL, *right = NULL;
    
    begin[0] = *list;
    end[0] = list_tail(list);
            
    while (i >= 0) {
        node_t *L = begin[i], *R = end[i];
        if (L != R) {
            node_t *pivot = L;
            value = pivot->value;
            node_t *p = pivot->next;
            pivot->next = NULL;
    
            while (p) {
                node_t *n = p;
                p = p->next;
                list_add(n->value > value ? &right : &left, n);
            }

            begin[i] = left;
            end[i] = list_tail(left);
            begin[i + 1] = pivot;
            end[i + 1] = pivot;
            begin[i + 2] = right;
            end[i + 2] = list_tail(right);
            left = right = NULL;
            i += 2;
        } else {
            if (L)
                list_add(&result, L);
            i--;
        }
    }
    *list = result;
}

首先選擇第一個節點作為 pivot,依序對串列中的每個節點做比較,當節點的值比 pivot 的值大時,放入子串列 right 中,反之則放入 left 中。
(註:之後以 Graphviz 作圖補上)
當比較完所有節點後,begin[i] 放入 left 串列第一個值的位址,end[i] 存放的是 left 的最後一個值的位址,以紀錄 left 串列的左右邊界,begin[i+1]、end[i+1] 則放入 pivot,最後將 right 的左右邊界放入 begin[i+2]、end[i+2]。
(註:之後以 Graphviz 作圖補上)
完成以上動作後,會進入下一次迭代,從較大的子串列開始做下一次排序,重覆以上動作直到子串列為單個節點時(左右邊界相同),將節點放入串列 result 存放,再往前處理前面的子串列,這邊可以將 begin 想成是一個堆疊,紀錄尚未排序的子串列,當串列經 pivot 切割至只剩單個節點時,再從堆疊取出放入 result 中,重複以上動作,最後將排序好的串列 result 回傳。

Q2. 使用 Linux 核心風格的 List API 改寫上述程式碼,並針對鏈結串列,提出可避免最差狀況的快速排序實作,應設計效能評比的測試程式。

分析上面的程式碼後可以有以下改進點:

  • pivot 的選擇會影響排序結果,當子串列本身已是遞增或遞減時,會造成最差狀況的排序實作。
  • 分配給陣列 begin 和 end 的空間是考慮最差情況,但實際上會有許多空間是浪費的。
  • 引入 List API 去重程式內容。

首先引入 lab0 中的 list.h 並定義新結構體 node_t 如下所示:

typedef struct __node{
    long value;
    struct list_head list;
} node_t;

串列會變成環狀雙向鏈結串列,對應的操作函式改寫紀錄在 GitHub 上。







struct __node


cluster __node




mode_t
mode_t



value

value



list_head

list_head



避免最糟狀況

由於 pivot 會影響切割產生的子串列,在最佳的情況下,每次都剛好切割為相同長度的子串列,時間複雜度為

O(nlogn),在最遭的情況(串列本身已是遞增或遞減),每次切割僅僅將串列分為 0 與 n - 1(扣掉 pivot),時間複雜度為
O(n2)

如果希望減少最糟情況的產生,可以讓下一個 pivot 的選擇改為隨機選擇。我的想法是用 rand() 先隨機取一個數(範圍為待處理的子串列長度),再將串列中選擇到的節點移到串列開頭,這樣就能隨機選擇一個節點作為 pivot。
實作的程式碼如下:

void rand_pivot(struct list_head *list)
{
    int r = rand() % list_length(list);
    struct list_head *cur = list->next;
    for (int i = 0; i < r && cur->next != list; i++)
        cur = cur->next;
    list_move(cur, list);
    return;
}

再來引入 time.h 分別對資料長度10, 100, 1000, 10000, 100000 作測試,觀察最糟情況下(未洗牌)排序所需時間的差別。以下是測試結果。

時間單位:sec

資料長度 10 100 1000 10000 100000
pivot 固定 0.000021 0.000076 0.002148 0.222101 23.305
pivot 隨機 0.000037 0.000058 0.000306 0.001819 0.02685

可以觀察到加入隨機 pivot 的快速排序法,在不同資料長度下執行時間都有明顯改善,而且隨著資料長度增加,改善的情況會更加明顯。

調整 begin 的大小

對於 begin 陣列的大小的設定,我參考 vax-r 同學的作法,先觀察 begin 陣列的大小,發現原本程式在最遭的情況下(串列為遞增或遞減),至少要分配 n+1 的大小(n 為串列長度),才能正常執行,但在加入 rand_pivot 後,陣列所需長度有明顯減少。以下是加入 rand_pivot 後,各資料大小下所需 begin 陣列長度。

資料長度 10 100 1000 10000 100000
begin 最大長度 6 14 22 38 52

而應該如何決定適當的堆疊大小,vax-r 同學提出的想法是觀察 begin 長度的變化,資料長度每增加 10 倍,堆疊長度約增加 10 左右,但我認為雖然 pivot 為隨機,但在極端情況下仍有可能會超出堆疊長度。

在查閱資料時我發現另一個由 Branislav Ďurian 提出的作法 ,其想法是紀錄上次 pivot 的值,當我們完成下一次的分類時,將前次 pivot (藍色標示),移到本次 pivot (紅色標示)旁,這樣當我們處理完其中一邊的子串列時,只要將原本的 pivot 往後比較,即可正確還原邊界,也不用額外堆疊去紀錄,但目前尚未實作成功,測試程式仍以堆疊執行。
image

圖片來源

測驗 2

Q1. 解釋上述程式碼的運作原理,提出改進方案並予以實作。

閱讀改進 lib/list_sort.c ,裡面有對改良後的 Timsort 原理的相關解說與證明,目的在改善原本合併排序法的缺點,原本的合併排序法會先將資料拆分再合併,但這個做法會讓原本已經排序過的部分資料也一並拆開重排,同時排序法在合併時以廣度優先,不斷處理新資料,對快取的使用並不友善。
Timsort 認為在現實世界中,其實有許多資料是已經排序過的,因此可以將這些排序過的資料先分組,再執行合併,以此避免不必要的分割。Timsort 稱這些分組過的資料為 runs,以下方資料為例:







list



8

8



7

7



8->7





6

6



7->6





5

5



6->5





9

9



5->9





10

10



9->10





1

1



10->1





2

2



1->2





3

3



2->3





可以分為 3 個 runs







list



8

8



7

7



8->7





6

6



7->6





5

5



6->5





9

9



10

10



9->10





1

1



2

2



1->2





3

3



2->3





此時再以 run 為單位做合併,這樣就能減少不必要的分割,Timsort 在走訪串列的過程中同時分割出 runs,並將其放入堆疊中,以適當的合併策略使堆疊內的每個 run 長度不會差距過大,通過深度優先演算法來合併子串列,能減少 cache trashing 現象,提高快取的利用率。

以下為 Timsort 的程式實作:

run_size

程式碼
static inline size_t run_size(struct list_head *head)
{
    if (!head)
        return 0;
    if (!head->next)
        return 1;
    return (size_t) (head->next->prev);
}

取得 run 的大小,雖然 list_head 是雙向鏈結串列,但在實作中,會將其當作是單向鏈結串列使用,而 prev 指標用來記錄其他資訊,在每個 runs 的第一個節點的 prev 指標用來串接 stack ,而第二個節點的 prev 指標則用來儲存 runs 的大小。

merge、merge_final

程式碼
static struct list_head *merge(void *priv,
                               list_cmp_func_t cmp,
                               struct list_head *a,
                               struct list_head *b)
{
    struct list_head *head;
    struct list_head **tail = &head;

    for (;;) {
        /* if equal, take 'a' -- important for sort stability */
        if (cmp(priv, a, b) <= 0) {
            *tail = a;
            tail = &a->next;
            a = a->next;
            if (!a) {
                *tail = b;
                break;
            }
        } else {
            *tail = b;
            tail = &b->next;
            b = b->next;
            if (!b) {
                *tail = a;
                break;
            }
        }
    }
    return head;
}

執行合併排序法,比較 a、b 串列後,將較小值放入新串列,間接指標 tail 會記錄指向新串列中下一個節點的指標的位址,如果其中一個串列排序完成時,將另一個串列剩餘部分放入新串列中(假設 a、b 串列皆已經過排序)。merge_finalmerge 結構類似,差別在進行最後一次合並時,合併完成的串列要恢復成環狀雙向鍊結串列的結構。

對於 list_cmp_func_t cmp 的用法可以參考 The Linux Kernel API 裡的介紹:
The comparison function cmp must return > 0 if a should sort after b ("a > b" if you want an ascending sort), and <= 0 if a should sort before b or their original order should be preserved. It is always called with the element that came first in the input in a, and list_sort is a stable sort, so it is not necessary to distinguish the a < b and a == b cases.

程式碼
static void build_prev_link(struct list_head *head,
                            struct list_head *tail,
                            struct list_head *list)
{
    tail->next = list;
    do {
        list->prev = tail;
        tail = list;
        list = list->next;
    } while (list);

    /* The final links to make a circular doubly-linked list */
    tail->next = head;
    head->prev = tail;
}

將雙向鍊結串列重新恢復成環狀雙向鍊結串列的結構,當跌代結束時,tail 指標指向串列中最後一個節點的位置,該節點的 next 指標指向 head 的位址,head 的 prev 指標指向 tail 的位址。

find_run

程式碼
static struct pair find_run(void *priv,
                            struct list_head *list,
                            list_cmp_func_t cmp)
{
    size_t len = 1;
    struct list_head *next = list->next, *head = list;
    struct pair result;

    if (!next) {
        result.head = head, result.next = next;
        return result;
    }

    if (cmp(priv, list, next) > 0) {
        /* decending run, also reverse the list */
        struct list_head *prev = NULL;
        do {
            len++;
            list->next = prev;
            prev = list;
            list = next;
            next = list->next;
            head = list;
        } while (next && cmp(priv, list, next) > 0);
        list->next = prev;
    } else {
        do {
            len++;
            list = next;
            next = list->next;
        } while (next && cmp(priv, list, next) <= 0);
        list->next = NULL;
    }
    head->prev = NULL;
    head->next->prev = (struct list_head *) len;
    result.head = head, result.next = next;
    return result;
}

將串列分開形成 run,有兩種情況,第一種當串列為遞增,往後走訪直到出現的值比節點的值小時,將串列分割,第一個節點的 prev 指標指向 NULL,第二個節點的 prev 指標紀錄子串列長度。第二種當串列為遞減,往後走訪直到出現的值比節點的值大時,將串列分割,prev 指標處裡與第一種情況相同,但在第二種情況走訪過程中,要將串列做反轉,這樣得到的 run 皆是遞增順序的子串列。

merge_at

程式碼
static struct list_head *merge_at(void *priv,
                                  list_cmp_func_t cmp,
                                  struct list_head *at)
{
    size_t len = run_size(at) + run_size(at->prev);
    struct list_head *prev = at->prev->prev;
    struct list_head *list = merge(priv, cmp, at->prev, at);
    list->prev = prev;
    list->next->prev = (struct list_head *) len;
    --stk_size;
    return list;
}

這邊是為了實作 Timsort 中提到的,將堆疊中兩個 runs 合併,合併後堆疊長度減一,並將合併完成的 run 紀錄在原本兩個 runs 中較底層堆疊的位置。

merge_force_collapse

程式碼
static struct list_head *merge_force_collapse(void *priv,
                                              list_cmp_func_t cmp,
                                              struct list_head *tp)
{
    while (stk_size >= 3) {
        if (run_size(tp->prev->prev) < run_size(tp)) {
            tp->prev = merge_at(priv, cmp, tp->prev);
        } else {
            tp = merge_at(priv, cmp, tp);
        }
    }
    return tp;
}

Timsort 中平衡各個 runs 長度的方法。為了不讓各個 runs 長度差距過大,merge_force_collapse 會觀察,當堆疊中最上層的 A run 長度大於第三層的 C run 時,將 B run 與 C run 合併,反之則將 A run 與 B run 合併。

merge_collapse

程式碼
static struct list_head *merge_collapse(void *priv,
                                        list_cmp_func_t cmp,
                                        struct list_head *tp)
{
    int n;
    while ((n = stk_size) >= 2) {
        if ((n >= 3 &&
             run_size(tp->prev->prev) <= run_size(tp->prev) + run_size(tp)) ||
            (n >= 4 && run_size(tp->prev->prev->prev) <=
                           run_size(tp->prev->prev) + run_size(tp->prev))) {
            if (run_size(tp->prev->prev) < run_size(tp)) {
                tp->prev = merge_at(priv, cmp, tp->prev);
            } else {
                tp = merge_at(priv, cmp, tp);
            }
        } else if (run_size(tp->prev) <= run_size(tp)) {
            tp = merge_at(priv, cmp, tp);
        } else {
            break;
        }
    }

    return tp;
}

對整個堆疊操作,當 A run 加 B run 長度大於等於 C run 長度或 B run 加 C run 長度大於等於 D run 長度時,用 merge_force_collapse 的方式處理 A、B、C run。A run 長度大於 B run 時,將 A run 與 B run 合併。重複動作直到堆疊中剩下兩個 runs 為止。

Timsort

程式碼
void timsort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
    stk_size = 0;

    struct list_head *list = head->next, *tp = NULL;
    if (head == head->prev)
        return;

    /* Convert to a null-terminated singly-linked list. */
    head->prev->next = NULL;

    do {
        /* Find next run */
        struct pair result = find_run(priv, list, cmp);
        result.head->prev = tp;
        tp = result.head;
        list = result.next;
        stk_size++;
        tp = merge_collapse(priv, cmp, tp);
    } while (list);

    /* End of input; merge together all the runs. */
    tp = merge_force_collapse(priv, cmp, tp);

    /* The final merge; rebuild prev links */
    struct list_head *stk0 = tp, *stk1 = stk0->prev;
    while (stk1 && stk1->prev)
        stk0 = stk0->prev, stk1 = stk1->prev;
    if (stk_size <= 1) {
        build_prev_link(head, head, stk0);
        return;
    }
    merge_final(priv, cmp, head, stk1, stk0);
}

首先將原串列變為雙向鍊結串列(解開環狀結構),再讓指標 list 指向串列中第一個節點,tp 作為堆疊指標會指向目前堆疊中最上層的位址。接著對串列做分割並將分割好的 run 放入堆疊中,過程中以 merge_collapse 檢查 runs 長度是否有平衡。完成分割後再以 merge_force_collapse 對整個堆疊做合併。最後用 merge_final 將堆疊中剩餘的兩個 runs 合併調整,恢復為環狀雙向鍊結串列即可完成排序(如果堆疊中僅剩下一個 run 則直接將 stk0 恢復成環狀雙向鍊結結構)。

建立符合 Timsort 的資料分布

static void create_sample(struct list_head *head, element_t *space, int samples)
{
    printf("Creating sample\n");
-   for (int i = 0; i < samples; i++) {
+   for (int i = 0; i < samples;) {
+       srand(time(NULL));
+       int num = rand() % 15;
+       if(num % 2) {
+           for(; num >= 0 && i < sample; i++, num--) {
+               element_t *elem = space +i;
+               elem->val = num;
+               list_add_tail(&elem->list, head);
+           }
+       } else {
+           for(int t = 1; t == num || i < sample; i++, t++) {
+               element_t *elem = space +i;
+               elem->val = t;
+               list_add_tail(&elem->list, head);
+           }           
+       }
-       element_t *elem = space + i;                
-       elem->val = rand();
-       list_add_tail(&elem->list, head);
    }
}

更改原本的 create_sample 內容,Timsort 假設的資料分布是部份已經過排序,我的想法是使用 rand 產生一段長度小於 15 的遞增或遞減排序資料,不斷產生資料直到滿足 SAMPLE 所需大小,亂數種子使用時間來產生。

以下比較兩種串列執行 Timsort 所花的時間與 Comparisons 大小,資料長度分別是100, 1000, 10000, 100000 作測試:
執行時間(單位 sec):

資料長度 100 1000 10000 100000
原本的串列 0.000056 0.000357 0.002855 0.047056
符合資料分布的串列 0.000030 0.000174 0.001636 0.040475

comparison:

資料長度 100 1000 10000 100000
原本的串列 622 9749 130619 1641601
符合資料分布的串列 453 7847 111637 1449663

可以看到 Timsort 在處理現實情況的資料(部份經過排序),會比處理完全隨機的資料還要快速。

加入 galloping mode

Q2. 將改進過的 Timsort 實作整合進 sysprog21/lab0-c,作為另一個有效的排序命令,應設計效能評比的測試程式,確保測試案例符合 Timsort 設想的資料分布樣式。

Quiz2

測驗 1

Q1. 解釋上述程式碼的運作,撰寫完整的測試程式,指出其中可改進之處並實作

hlist_add_head

程式碼
static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h)
{
    if (h->first)
        h->first->pprev = &n->next;
    n->next = first;
    n->pprev = &h->first;
    h->first = n;
}

在雜湊串列 h 中加入一個新節點,應符合以下結構:







struct hlist



hlist_head

hlist_head

first



hlist_node1

hlist_node1

pprev

next




hlist_head:n->hlist_node1:m





hlist_node1:p->hlist_head:n





hlist_node2

hlist_node2

pprev

next




hlist_node1:n->hlist_node2:m





hlist_node2:p->hlist_node1:n





NULL
NULL



hlist_node2:n->NULL





find

程式碼
static int find(int num, int size, const struct hlist_head *heads)
{
    struct hlist_node *p;
    int hash = (num < 0 ? -num : num) % size;
    hlist_for_each (p, &heads[hash]) {
        struct order_node *on = list_entry(p, struct order_node, node);
        if (num == on->val)
            return on->idx;
    }
    return -1;
}

先計算目標點在雜湊陣列中的位置,再走訪該位置的雜湊串列,尋找與 num 值相同的節點,如果發現該節點時,回傳該節點的索引 idx 值,如果走訪完畢後仍未發現相符的節點,則回傳 -1。

dfs

程式碼
static struct TreeNode *dfs(int *preorder,
                            int pre_low,
                            int pre_high,
                            int *inorder,
                            int in_low,
                            int in_high,
                            struct hlist_head *in_heads,
                            int size)
{
    if (in_low > in_high || pre_low > pre_high)
        return NULL;

    struct TreeNode *tn = malloc(sizeof(*tn));
    tn->val = preorder[pre_low];
    int idx = find(preorder[pre_low], size, in_heads);
    tn->left = dfs(preorder, pre_low + 1, pre_low + (idx - in_low), inorder,
                   in_low, idx - 1, in_heads, size);
    tn->right = dfs(preorder, pre_high - (in_high - idx - 1), pre_high, inorder,
                    idx + 1, in_high, in_heads, size);
    return tn;
}

以遞回的方式搜尋 preorder 陣列與 inorder 陣列,pre_low、pre_high、in_low、in_high 分別記錄目前處理陣列的範圍,in_head 為 inorder 的雜湊陣列,用於快速找尋節點在 inorder 陣列中的位置,選擇 preorder 中的第一個元素作為 root,取得索引值後,再從 preorder 中找到左子樹的 root,重複搜尋的步驟直到在 inorder 中找不到對應元素即可回傳 NULL,接者再回 root 處理右子樹。

node_add

程式碼
static inline void node_add(int val,
                            int idx,
                            int size,
                            struct hlist_head *heads)
{
    struct order_node *on = malloc(sizeof(*on));
    on->val = val;
    on->idx = idx;
    int hash = (val < 0 ? -val : val) % size;
    hlist_add_head(&on->node, &heads[hash]);
}

在雜湊陣列中加入一個節點,作法同 find,先計算該節點在雜湊陣列的位置,再將節點放入該位置的雜湊串列中。

程式碼
static struct TreeNode *buildTree(int *preorder,
                                  int preorderSize,
                                  int *inorder,
                                  int inorderSize)
{
    struct hlist_head *in_heads = malloc(inorderSize * sizeof(*in_heads));
    for (int i = 0; i < inorderSize; i++)
        INIT_HLIST_HEAD(&in_heads[i]);
    for (int i = 0; i < inorderSize; i++)
        node_add(inorder[i], i, inorderSize, in_heads);

    return dfs(preorder, 0, preorderSize - 1, inorder, 0, inorderSize - 1,
               in_heads, inorderSize);
}

建立二元樹,先配置雜湊陣列所需的記憶體空間,再初始化雜湊陣列中的每個元素,之後建立 inorder 的雜湊陣列以便後續查找節點,使用 dfs 開始建構二元樹。

Q2. 在 Linux 核心的 cgroups 相關原始程式碼,找出 pre-order walk 程式碼並探討

參考這篇文章的介紹cgroups 的全名是 Control Group,在 Linux Kernel 中的功能是限制、控制與隔離 Process 所使用到的系統資源,例如 CPU、Memory、Disk I/O 等。
linux/kernel/cgroup/cgroup.c 可以找到 css_next_descendant_pre 與 pre-order walk 有關。

struct cgroup_subsys_state *
css_next_descendant_pre(struct cgroup_subsys_state *pos,
			struct cgroup_subsys_state *root)

從一開始的函式宣告可以發現,函式接受兩個引數,pos 指標用來紀錄目前走訪到的位址,預設為 NULL,root 指標指向開始走訪的根節點,執行完成後會回傳結構體 cgroup_subsys_state 的指標,和結構體 cgroup_subsys_state 相關的定義可以在 linux/include/linux/cgroup-defs.h 中找到,主要是存放與子系統狀態相關的資料。

struct cgroup_subsys_state *next;

cgroup_assert_mutex_or_rcu_locked();

/* if first iteration, visit @root */
if (!pos)
    return root;

/* visit the first child if exists */
    next = css_next_child(NULL, pos);
    if (next)
    return next;

先判斷 pos 指標是否有初值,沒有的話將其設為 root。

css_next_child
struct cgroup_subsys_state *css_next_child(struct cgroup_subsys_state *pos,
					   struct cgroup_subsys_state *parent)
{
	struct cgroup_subsys_state *next;

	cgroup_assert_mutex_or_rcu_locked();

	if (!pos) {
		next = list_entry_rcu(parent->children.next, struct cgroup_subsys_state, sibling);
	} else if (likely(!(pos->flags & CSS_RELEASED))) {
		next = list_entry_rcu(pos->sibling.next, struct cgroup_subsys_state, sibling);
	} else {
		list_for_each_entry_rcu(next, &parent->children, sibling,
					lockdep_is_held(&cgroup_mutex))
			if (next->serial_nr > pos->serial_nr)
				break;
	}

	/*
	 * @next, if not pointing to the head, can be dereferenced and is
	 * the next sibling.
	 */
	if (&next->sibling != &parent->children)
		return next;
	return NULL;
}

使用 css_next_child 找下一個子節點,目前的 pos 設為親代節點,如果有發現下一個子節點就將結果回傳。

/* no child, visit my or the closest ancestor's next sibling */
while (pos != root) {
    next = css_next_child(pos, pos->parent);
    if (next)
        return next;
        pos = pos->parent;
    }

    return NULL;
}

如果找不到 pos 的子節點,則回到上一層的親節點,再一次使用 css_next_child 去找 pos 的平輩節點,如果仍找不到節點,則回傳 NULL

測驗 2

Q1. 解釋上述程式碼的運作,撰寫完整的測試程式,指出其中可改進之處並實作

LRUCache、LRUNode

typedef struct {
    int capacity;
    int count;
    struct list_head dhead;
    struct hlist_head hhead[~~]~~;
} LRUCache;

typedef struct {
    int key;
    int value;
    struct hlist_node node;
    struct list_head link;
} LRUNode;

首先定義兩個結構體

hlist_del

程式碼
void hlist_del(struct hlist_node *n)
{
    struct hlist_node *next = n->next, **pprev = n->pprev;
    *pprev = next;
    if (next)
        next->pprev = pprev;
}

從雜湊串列中移除一個節點,結構與測驗 1 中提到的雜湊串列相同。

lRUCacheCreate

程式碼
LRUCache *lRUCacheCreate(int capacity)
{
    LRUCache *cache = malloc(2 * sizeof(int) + sizeof(struct list_head) +
                             capacity * sizeof(struct list_head));
    cache->capacity = capacity;
    cache->count = 0;
    INIT_LIST_HEAD(&cache->dhead);
    for (int i = 0; i < capacity; i++)
        INIT_HLIST_HEAD(&cache->hhead[i]);
    return cache;
}

建立 LRUCache,引數 capacity 是預計要建立的快取大小,分配相對應的記憶體大小與初始化雜湊陣列與雜湊串列。

lRUCacheFree

程式碼
void lRUCacheFree(LRUCache *obj)
{
    struct list_head *pos, *n;
    list_for_each_safe (pos, n, &obj->dhead) {
        LRUNode *cache = list_entry(pos, LRUNode, link);
        list_del(&cache->link);
        free(cache);
    }
    free(obj);
}

釋放 LRUCache 所使用的記憶體空間,pos 會走訪快取串列中的所有節點,使用 list_entry 找到對應的 LRUNode,將該節點移除後再釋放其記憶體空間,完成串列走訪後最後釋放 obj 占用的記憶體空間。

lRUCacheGet

程式碼
int lRUCacheGet(LRUCache *obj, int key)
{
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each (pos, &obj->hhead[hash]) {
        LRUNode *cache = list_entry(pos, LRUNode, node);
        if (cache->key == key) {
            list_move(&cache->link, &obj->dhead);
            return cache->value;
        }
    }
    return -1;
}

在快取 obj 中查詢是否有對應 key 的值,先取得雜湊值,再從的雜湊陣列中找到相應的雜湊串列,走訪該串列,如果發現有與 key 相符的值,回傳該值並將 LRUNode 移到 dhead 的開頭,代表該快取值剛被使用過。

lRUCachePut

程式碼
void lRUCachePut(LRUCache *obj, int key, int value)
{
    LRUNode *cache = NULL;
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each (pos, &obj->hhead[hash]) {
        LRUNode *c = list_entry(pos, LRUNode, node);
        if (c->key == key) {
            list_move(&c->link, &obj->dhead);
            cache = c;
        }
    }

    if (!cache) {
        if (obj->count == obj->capacity) {
            cache = list_last_entry(&obj->dhead, LRUNode, link);
            list_move(&cache->link, &obj->dhead);
            hlist_del(&cache->node);
            hlist_add_head(&cache->node, &obj->hhead[hash]);
        } else {
            cache = malloc(sizeof(LRUNode));
            hlist_add_head(&cache->node, &obj->hhead[hash]);
            list_add(&cache->link, &obj->dhead);
            obj->count++;
        }
        cache->key = key;
    }
    cache->value = value;
}

將一個新的值放入快取中,首先要確認快取中是否已存在相同 key 的節點,使用 key 找到雜湊值,再走訪雜湊值對應的雜湊串列,如果有找到相同的 key 值,將其移動到 dhead 的開頭,代表該快取值剛使用過,並將 c 賦值給 cache。
如果在快取中沒有存在相同 key 的節點,要將新節點加入快取中,加入前要先判斷目前的快取容量是否已滿,如果滿了則將 dhead 中最後一個節點(放置最久未使用的節點)移除,並釋放其記憶體空間,接者再將節點存入對應的雜湊陣列中並賦值。如果快取容量尚未填滿,則直接分配新的記憶體空間,再加入新的節點。

Q2. 在 Linux 核心找出 LRU 相關程式碼並探討

linux/fs/btrfs/lru_cache.clinux/fs/btrfs/lru_cache.h 有提到對 lru_cache 的操作。

btrfs_lru_cache

struct btrfs_lru_cache {
	struct list_head lru_list;
	struct maple_tree entries;
	/* Number of entries stored in the cache. */
	unsigned int size;
	/* Maximum number of entries the cache can have. */
	unsigned int max_size;
};

一開始先觀察定義的結構體 btrfs_lru_cache ,其中 size 紀錄目前快取的大小,max_size 則是快取的最大容量,透過 lru_list 串列管理快取的內容,這理出現一個新的結構體 maple_tree,在老師整理的另一份教材Linux 核心的紅黑樹中有提到,這是近年 Linux Kernel 開始使用的資料結構,特色是能減少走訪樹狀結構時發生的 cache misses。

Maple trees (the name might refer to the shape of the maple leaf, leading in multiple directions) differ in important aspects from rbtrees. They belong to the B-tree family, so their nodes can contain more than two elements — up to 16 elements in leaf nodes, or ten elements in internal nodes. The use of B-trees also means that there is less need to create new nodes, as nodes may include empty slots that can be filled over time without additional allocations. Each node requires at most 256 bytes, which is a multiple of popular cache line sizes. The increased number of elements in a node and the cache-aligned size means fewer cache misses when traversing the tree.

出處:Introducing maple trees

btrfs_lru_cache_entry

struct btrfs_lru_cache_entry {
	struct list_head lru_list;
    
    u64 key;

	u64 gen;

	struct list_head list;
};

btrfs_lru_cache_entry 內部除了 key 之外,還多了一個 gen,根據程式說明,有可能使用不同的 generation 卻產生相同的 key,於是加入 gen 去區分這些相同的 key。如果不需使用,可以將其設為 0。

btrfs_lru_cache_init

程式碼
void btrfs_lru_cache_init(struct btrfs_lru_cache *cache, unsigned int max_size)
{
	INIT_LIST_HEAD(&cache->lru_list);
	mt_init(&cache->entries);
	cache->size = 0;
	cache->max_size = max_size;
}

初始化快取的內容。

match_entry

static struct btrfs_lru_cache_entry *match_entry(struct list_head *head, u64 key,
						 u64 gen)
{
	struct btrfs_lru_cache_entry *entry;

	list_for_each_entry(entry, head, list) {
		if (entry->key == key && entry->gen == gen)
			return entry;
	}

	return NULL;
}

走訪串列,找尋是否有相符合的 entry,如果沒有則返回 NULL

btrfs_lru_cache_lookup

struct btrfs_lru_cache_entry *btrfs_lru_cache_lookup(struct btrfs_lru_cache *cache,
						     u64 key, u64 gen)
{
	struct list_head *head;
	struct btrfs_lru_cache_entry *entry;

	head = mtree_load(&cache->entries, key);
	if (!head)
		return NULL;

	entry = match_entry(head, key, gen);
	if (entry)
		list_move_tail(&entry->lru_list, &cache->lru_list);

	return entry;
}

這邊出現一個新的函式 mtree_load,函式定義可以在 linux/lib/maple_tree.c 中找到,功能是在 maple tree 中找到對應 key 的值。接著從回傳的串列中找尋對相符的 entry,如果找到的話,將其加入快取的 lru_list 中。

btrfs_lru_cache_remove

void btrfs_lru_cache_remove(struct btrfs_lru_cache *cache,
			    struct btrfs_lru_cache_entry *entry)
{
	struct list_head *prev = entry->list.prev;

	ASSERT(cache->size > 0);
	ASSERT(!mtree_empty(&cache->entries));

	list_del(&entry->list);
	list_del(&entry->lru_list);

	if (list_empty(prev)) {
		struct list_head *head;
		head = mtree_erase(&cache->entries, entry->key);
		ASSERT(head == prev);
		kfree(head);
	}

	kfree(entry);
	cache->size--;
}

測驗 3

Q1. 解釋上述程式碼的運作原理

Q2. 在 Linux 核心原始程式碼找出 find_nth_bit 的應用案例,並予以解說和分析,需要涵蓋 CPU affinity 相關的原始程式碼。