Try   HackMD

Linux 核心專題: 改進 lab0

執行人: andy155224

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
提問清單

  • ?

任務簡述

重做 lab0 並彙整其他學員的成果。

TODO: 達成作業所有要求

開發環境

$ gcc --version
gcc (Ubuntu 11.3.0-1ubuntu1~22.04.1) 11.3.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

$ lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  24
  On-line CPU(s) list:   0-23
Vendor ID:               GenuineIntel
  Model name:            13th Gen Intel(R) Core(TM) i7-13700
    CPU family:          6
    Model:               183
    Thread(s) per core:  2
    Core(s) per socket:  16
    Socket(s):           1
    Stepping:            1
    CPU max MHz:         5200.0000
    CPU min MHz:         800.0000
    BogoMIPS:            4224.00

開發過程

q_new

宣告一個結構體 list_head 的指標 head ,然後透過 list.h 中的 INIT_LIST_HEAD() 來對 head 配置記憶體。

struct list_head *q_new()
{
    struct list_head *head = malloc(sizeof(struct list_head));
    if (head) {
        INIT_LIST_HEAD(head);
        return head;
    }
    return NULL;
}

q_free

透過 list_for_each_entry_safe 逐一走訪 queue 中的每一個 element,並透過 container_of 計算每一個 element 的記憶體起始位置,計算出位置後再用 q_release_element 將整個結構體 element 所配置的記憶體空間,包含結構體內的其他結構體和指標變數所指向的記憶體一同釋放,最後再將 queue 的 head l 釋放掉,以保證整個 queue 的記憶體空間都被釋放。

void q_free(struct list_head *l)
{
    if (!l)
        return;
    element_t *pos, *next;
    list_for_each_entry_safe (pos, next, l, list) {
        q_release_element(container_of(&pos->list, element_t, list));
    }
    free(l);
}

q_insert_head

原本的寫法如下:

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head || !s)
        return false;
    element_t *n = malloc(1 * sizeof(element_t));
    char *value = malloc((strlen(s) + 1) * sizeof(char));
    if (!n || !value)
        return false;
    memcpy(value, s, strlen(s) + 1);
    n->value = value;
    list_add(&n->list, head);
    return true;
}

qwe661234 點出上述程式碼在 element_t *n 成功配置,但 char *value 配置失敗時,會導致配置成功的 element_t *n 之記憶體沒有被釋放。所以修改為:

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head || !s)
        return false;
    element_t *n = malloc(1 * sizeof(element_t));
+    if (!n)
+        return false;
    char *value = malloc((strlen(s) + 1) * sizeof(char));
-    if (!n || !value)
+    if (!value) {
+        free(n);
        return false;
+    }
    memcpy(value, s, strlen(s) + 1);
    n->value = value;
    list_add(&n->list, head);
}

q_insert_tail

函式運作的邏輯和 q_insert_head 只差在將 list_add() 改為 list_add_tail()

q_remove_head

strncpy, snprintfstrlcpy 的差異。因為 strcpy 會存在潛在的 buffer overflow 的問題,所以應該用 strncpy 來限制寫入的位元組的大小。但是 strncpy 也並非是安全的,因為有可能會發生 dest 字串的結尾並沒有 null terminator 的問題,所以有了更安全的 strlcpy 能選擇。strlcpy 如果遇到寫入的位元組大小比 src 的位元組大小還要小的時候,會在欲寫入的位元組大小的最後一個位元填入 null terminator 。但是因為這需要安裝額外的 package 才能 include bsd/string.h,所以我選擇使用 snprintf 來達到一樣的效果。

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    element_t *tmp = list_first_entry(head, element_t, list);
    list_del_init(&tmp->list);
    if (sp)
        snprintf(sp, bufsize, "%s", tmp->value);
    return tmp;
}

q_remove_tail

函式運作的邏輯和 q_insert_head 只差在將 list_first_entry() 改為 list_last_entry()

q_size

透過 list_for_each() 逐一走訪 queue 中的每個 element 並用 int len 來計數。

int q_size(struct list_head *head)
{
    if (!head)
        return 0;
    int len = 0;
    struct list_head *li;
    list_for_each (li, head)
        len++;
    return len;
}

q_delete_mid

透過一個型別為 list_head 的指標變數 tmp 和一個布林變數 flag,在逐一走訪 queue 中的每一個 entry 時,每當 flag 為 true 時將 tmp 指向其下一個 entry 的 list。這樣當走訪完整個 queuetmp 就會指向 queue 的第

n/2 個 entry,然後再將其刪除。

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    bool flag = true;
    element_t *pos;
    struct list_head *tmp = head;
    list_for_each_entry (pos, head, list) {
        if (flag)
            tmp = tmp->next;
        flag = !flag;
    }
    element_t *mid = list_entry(tmp, element_t, list);
    list_del_init(&mid->list);
    q_release_element(mid);
    return true;
}

但因為 flag 會反覆在 0 和 1 之間變化,造成以下程式碼在

if (flag)
    tmp = tmp->next;

不做任何處理的情況下會產生

queue size/2 次 branch。

要減少 branch 次數的其中一種做法是透過快慢指標來找尋中間節點,但因為 lab0 所使用的資料結構為 doubly linked-list,所以可以考慮到雙向的特性,用更少次的操作找到中間節點,如下:

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    struct list_head *right, *left;
    right = head->next;
    left = head->prev;

    while (right != left && left->prev != right) {
        right = right->next;
        left = left->prev;
    }

    list_del(left);
    q_release_element(list_entry(left, element_t, list));

    return true;
}

透過兩個指標 rightleft,每次迭代時 right 會指向 right->nextleft 會指向 left->prev,這樣一旦 right == left 或是 left->prev == right,此時的 left 即為中間節點。藉由這種做法就能在比使用快慢指標還少的操作下同時也有較少的 branch 產生。

q_delete_dup

透過 list_for_each_entry_safe 來逐一走訪整個 queue,每次迭代時會比較 curr->valuenext->value 是否相同,如果相同的話將 curr 刪除並釋放記憶體空間,並把布林變數 flag 設為 true 以在 curr-> valuenext->value 不同時將 curr 刪除並釋放記憶體空間。

bool q_delete_dup(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;

    bool flag = false;
    element_t *curr, *next;

    list_for_each_entry_safe (curr, next, head, list) {
        if (&next->list != head && strcmp(curr->value, next->value) == 0) {
            flag = true;
            list_del(&curr->list);
            q_release_element(curr);
        } else if (flag) {
            flag = false;
            list_del(&curr->list);
            q_release_element(curr);
        }
    }

    return true;
}

q_swap

透過 list_for_each() 逐一走訪整個 queue,每當布林變數 flag 為 true 時用 list_move()node 移至 tmp 前,最後再將 nodetmp 指向 node->next 以確保走訪的正確性。

void q_swap(struct list_head *head)
{
    if (!head || list_empty(head))
        return;

    bool flag = false;
    struct list_head *node, *tmp = head;

    list_for_each (node, head) {
        if (flag == true) {
            list_move(node, tmp);
            node = node->next;
            tmp = node;
        }
        flag = !flag;
    }
}

q_reverse

透過 list_for_each_safe() 逐一走訪整個 queue 並將走訪到的元素用 list_move() 移至 head 後面,進而完成 reverse 整個 queue。

void q_reverse(struct list_head *head)
{
    if (!head || list_empty(head))
        return;

    struct list_head *node, *safe;

    list_for_each_safe (node, safe, head) {
        list_move(node, head);
    }
}

q_reverseK

原本的想法是透過 list_for_each_safe() 逐一走訪整個 queue,同時透過 int countK 來計數,走訪時會將元素 node 移至 tmp 後面,每當 countK mod k == 0 時調整 tmp 的位置來完成功能。

void q_reverseK(struct list_head *head, int k)
{
    if (!head || list_empty(head))
        return;

    int countK = 0;
    struct list_head *node, *safe, *tmp = head;

    list_for_each_safe (node, safe, head) {
        countK++;
        list_move(node, tmp);
        if (countK % k == 0) {
            tmp = safe->prev;
        }
    }
}

但是實際的效果和預期的不同,因為這和功能所述的 Reverse the nodes of the list k at a time,不一致,上述程式碼是每次都會執行 list_move 而非當 countK mod k == 0 時才將前面 k 個元素 reverse,差異如下:

l = [1 2 3 4 5]
cmd> reverseK 3
l = [3 2 1 5 4] <-- 執行結果
l = [3 2 1 4 5] <-- 預期結果

而我想到的解決方式是額外宣告一個變數 int count 並將其設為 q_size / k,也就是總共會發生的 reverse 次數。這樣就能透過 count 來避免額外的 reverse 發生,只有在 count > 0 時才會執行 list_move()

void q_reverseK(struct list_head *head, int k)
{
    if (!head || list_empty(head))
        return;

    int countK = 0;
+    int count = q_size(head) / k;
    struct list_head *node, *safe, *tmp = head;

    list_for_each_safe (node, safe, head) {
        countK++;
+        if (count)
            list_move(node, tmp);
        if (countK % k == 0) {
            tmp = safe->prev;
+            count--;
        }
    }
}

q_sort

參考了 你所不知道的 C 語言: linked list 和非連續記憶體 ,思路是透過 Divide and Conquer 先遞迴將佇列拆分成 q_size 個子佇列,再兩兩排序合併,最後就會得到一個已排序的佇列。

void mergeTwoLists(struct list_head *L1, struct list_head *L2)
{
    if (!L1 || !L2)
        return;

    struct list_head *Lnode = L1->next, *Rnode = L2->next;

    while (Lnode != L1 && Rnode != L2) {
        if (strcmp(list_entry(Lnode, element_t, list)->value,
                   list_entry(Rnode, element_t, list)->value) > 0) {
            struct list_head *tmp = Rnode->next;
            list_move_tail(Rnode, Lnode);
            Rnode = tmp;
        } else {
            Lnode = Lnode->next;
        }
    }

    if (q_size(L2) != 0) {
        list_splice_tail(L2, L1);
    }
}

/* Sort elements of queue in ascending order */
void q_sort(struct list_head *head)
{
    if (!head || list_empty(head) || q_size(head) == 1)
        return;

    struct list_head *tmp, *mid;
    tmp = head->next;
    mid = head->prev;

    while (tmp != mid && mid->prev != tmp) {
        tmp = tmp->next;
        mid = mid->prev;
    }

    LIST_HEAD(right);
    list_cut_position(&right, head, mid->prev);

    q_sort(head);
    q_sort(&right);
    mergeTwoLists(head, &right);
}

q_descend

因為 lab0 所使用的資料結構為 doubly linked-list,所以只需要從 queue 的尾部開始逐一反向走訪,並且不斷去比較 struct list_head *curr 對應的 element 的 value 是否大於 struct list_head *curr->prev 對應的 element 的 value,如果大於的話則將 struct list_head *curr->prev 對應的 element 刪除即可。

int q_descend(struct list_head *head)
{
    if (!head || list_empty(head))
        return 0;

    struct list_head *curr = head->prev;
    while (curr->prev != head) {
        if (strcmp(list_entry(curr, element_t, list)->value,
                   list_entry(curr->prev, element_t, list)->value) > 0) {
            struct list_head *tmp = curr->prev;
            list_del(tmp);
            q_release_element(list_entry(tmp, element_t, list));
        } else {
            curr = curr->prev;
        }
    }

    return q_size(head);
}

q_merge

先透過 container_of 找出連接著各個 queue 的結構體 queue_contex_t *queue_head 的記憶體位置,然後逐一走訪相連著的 queue 並透過 mergeTwoLists() 將第二個至最後一個 queue 合併進第一個 queue 中。因為每一個 queue 預設都已排序,所以 mergeTwoLists() 完就會是一個已排序的結果。

之所以不先將所有 queue 都透過 list_splice_init 合併後再用 q_sort() 合併是因為每一個 queue 都已排序,若再執行 q_sort() 中 divide 顯而易見是多餘的。

list.h 中的 list_splice_init() 有提到 "The @list head will not end up in an uninitialized state like when using list_splice. Instead the @list is initialized again to the an empty list/unlinked state.",所以在迴圈中每當合併完兩個 queue 都需要 INIT_LIST_HEAD(queue->q)

int q_merge(struct list_head *head)
{
    if (!head)
        return 0;

    queue_contex_t *queue_head =
        container_of(head->next, queue_contex_t, chain);

    for (struct list_head *curr = head->next->next; curr != head;
         curr = curr->next) {
        queue_contex_t *queue = container_of(curr, queue_contex_t, chain);
        mergeTwoLists(queue_head->q, queue->q);
        INIT_LIST_HEAD(queue->q);
        queue->size = 0;
    }

    return q_size(queue_head->q);
}

引入 lib/list_sort.clab0-c

list_sort.clist_sort.h 放到 lab0 的專案下並修改 Makefile 使得在 make 時會編譯 list_sort.c

OBJS := qtest.o report.o console.o harness.o queue.o \
        random.o dudect/constant.o dudect/fixture.o dudect/ttest.o \
        shannon_entropy.o \
        linenoise.o web.o \
+        list_sort.o

list_sort.c 中有使用到 likely()unlikely,這是在 linux/compiler.h 裡的巨集,顧將這兩個巨集的定義搬移至 lab0 專案中的 list_sort.c 中。

#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

修改 list_sort.clist_sort.h,調整函式的參數並修改一些型別定義使得 list_sort.c 能在 lab0 中運作。

  • 將各個函式中的參數 void *priv 移除
  • 調整各個函式的 nonnull()
  • list_sort.c 中定義的型別 u8 改成 uint8_t
  • merge() 中針對 struct list_head *head 去抑制 cppcheck 的檢查,因為 head 不會是一個 unassigned variable

list_sort.c 中的 cmp() 都改成使用 strcmp() 來比較字串大小,並抑制 cppcheck 的 nullPointer 檢查。

// cppcheck-suppress nullPointer
        if (strcmp(list_entry(a, element_t, list)->value,
                   // cppcheck-suppress nullPointer
                   list_entry(b, element_t, list)->value) <= 0)

console.c 中新增 parameter algo

藉由全域變數 algo 來選擇排序演算法要使用 q_sort() 還是 list_sort()

int algo = 0;

透過 add_param() 新增有關 algo 的說明,並且這樣就能使用 option algo 來變更 algo 的值,以此決定要使用哪一個排序演算法。

add_param("algo", &algo, "Select sort algorithm, q_sort = 0, list_sort = 1", NULL);

修改 do_sort

讓 do_sort() 可以根據 algo 的數值來決定要使用哪一個排序演算法。

-    if (current && exception_setup(true))
+    if (current && exception_setup(true)) {
+        if (algo == 0)
            q_sort(current->q);
+        else
+            list_sort(current->q);
+    }

比較自己實作的 q_sort() 和 linux kernel 中的 list_sort() 之間的效能落差

新增 trace-sort_algorithm.cmd 來測試自己實作的 q_sort()list_sort() 各別在排序 10000、500000 和 1000000 個節點時排序所需要的時間。在這個實驗中會執行十次 trace-sort_algorithm.cmd,並統計在不同節點數量的情況下的平均排序時間各別為何。

option verbose 1
option algo 0
new
ih RAND 100000
time sort
free
option algo 1
new
ih RAND 100000
time sort
free

option algo 0
new
ih RAND 500000
time sort
free
option algo 1
new
ih RAND 500000
time sort
free

option algo 0
new
ih RAND 1000000
time sort
free
option algo 1
new
ih RAND 1000000
time sort
free
節點數量 q_sort 平均排序時間 list_sort 平均排序時間
100000 0.0226(s) 0.0213(s)
5000000 0.236(s) 0.2548(s)
1000000 0.7739(s) 0.6448(s)

從十次測試的平均中可以看到,在節點數為 100000 時,list_sort() 優於 q_sort() 6%,在節點數為 500000 時,q_sort() 優於 list_sort() 7.4%,而在節點數為 1000000 時,list_sort() 優於 q_sort() 17%。

需要解讀效能落差的成因,以及你能否從中學習到其精髓。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

TODO: 研讀 Linux 的 lib/list_sort.c 並量化分析

解讀 Linux 核心原始程式碼的 lib/list_sort.c,設計實驗來量化分析,探討其實作手法,需要善用 perf 一類的工具。解析程式碼要有完整的圖文、數學證明,和如何達到 cache 友善。

Bottom up V.S. Top down

首先先來分析自己實作的 q_sort() 和 linux kernel 中的 list_sort() 到底有何不同。

q_sort 採取的是 merge sort,也就是先不斷遞迴分割,直到每一個子佇列都只有一個節點後後再兩兩合併成一個排序過的佇列,而會對 cache 不友善的主因也是發生在 merge 這個階段。因為此做法採取的是將同樣 level 的子佇列都合併完之後才會繼續遞迴往上合併,直到成為一個和原本佇列大小為止。但是這種做法會導致一但要排序的子佇列的大小超過了 L1 cache 的大小,cache 中的子佇列會不斷的在不同層級的 cache 中搬移,形成 cache thrashing,對 cache 不友善。

list_sort 之所以能達到 cache 友善是因為其合併方式採取了一個特殊的作法:

合併方式

每次合併時保持合併與不合併的子佇列比例不會大於 2:1,這樣能確保合併的子佇列的大小可以放到 L1 cache 中,進而避免 cache thrashing。而在註解中也有提到藉由 2:1 的比例,可以省下 0.2*n 次的比較操作。

那實作中是如何確保 2:1 的比例呢?

實際上這是一段非常精美的程式碼,只透過了一個變數 count 來紀錄目前讀到的節點數,每當 count 為 2 的冪時不合併,反之則合併,以下表格為

TODO: 利用 Linux 核心產生隨機數

使用 Linux 核心的 Random Number Algorithm Definitions 來產生隨機數,作為上述排序程式 (包含 Linux 核心的實作) 的輸入,以檢驗鏈結串列的排序實作。