# Linux 核心專題: 改進 lab0 > 執行人: andy155224 :::success :question: 提問清單 * ? ::: ## 任務簡述 重做 [lab0](https://hackmd.io/@sysprog/linux2023-lab0) 並彙整其他學員的成果。 ## TODO: 達成作業所有要求 ### 開發環境 ```shell! $ gcc --version gcc (Ubuntu 11.3.0-1ubuntu1~22.04.1) 11.3.0 Copyright (C) 2021 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 46 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 24 On-line CPU(s) list: 0-23 Vendor ID: GenuineIntel Model name: 13th Gen Intel(R) Core(TM) i7-13700 CPU family: 6 Model: 183 Thread(s) per core: 2 Core(s) per socket: 16 Socket(s): 1 Stepping: 1 CPU max MHz: 5200.0000 CPU min MHz: 800.0000 BogoMIPS: 4224.00 ``` ### 開發過程 #### q_new 宣告一個結構體 `list_head` 的指標 `head` ,然後透過 list.h 中的 `INIT_LIST_HEAD()` 來對 `head` 配置記憶體。 ```c struct list_head *q_new() { struct list_head *head = malloc(sizeof(struct list_head)); if (head) { INIT_LIST_HEAD(head); return head; } return NULL; } ``` #### q_free 透過 `list_for_each_entry_safe` 逐一走訪 queue 中的每一個 element,並透過 `container_of` 計算每一個 element 的記憶體起始位置,計算出位置後再用 `q_release_element` 將整個結構體 `element` 所配置的記憶體空間,包含結構體內的其他結構體和指標變數所指向的記憶體一同釋放,最後再將 queue 的 head `l` 釋放掉,以保證整個 queue 的記憶體空間都被釋放。 ```c void q_free(struct list_head *l) { if (!l) return; element_t *pos, *next; list_for_each_entry_safe (pos, next, l, list) { q_release_element(container_of(&pos->list, element_t, list)); } free(l); } ``` #### q_insert_head 原本的寫法如下: ```c bool q_insert_head(struct list_head *head, char *s) { if (!head || !s) return false; element_t *n = malloc(1 * sizeof(element_t)); char *value = malloc((strlen(s) + 1) * sizeof(char)); if (!n || !value) return false; memcpy(value, s, strlen(s) + 1); n->value = value; list_add(&n->list, head); return true; } ``` 但 `qwe661234` 點出上述程式碼在 `element_t *n` 成功配置,但 `char *value` 配置失敗時,會導致配置成功的 `element_t *n` 之記憶體沒有被釋放。所以修改為: ```diff bool q_insert_head(struct list_head *head, char *s) { if (!head || !s) return false; element_t *n = malloc(1 * sizeof(element_t)); + if (!n) + return false; char *value = malloc((strlen(s) + 1) * sizeof(char)); - if (!n || !value) + if (!value) { + free(n); return false; + } memcpy(value, s, strlen(s) + 1); n->value = value; list_add(&n->list, head); } ``` #### q_insert_tail 函式運作的邏輯和 `q_insert_head` 只差在將 `list_add()` 改為 `list_add_tail()`。 #### q_remove_head `strncpy`, `snprintf` 和 `strlcpy` 的差異。因為 `strcpy` 會存在潛在的 buffer overflow 的問題,所以應該用 `strncpy` 來限制寫入的位元組的大小。但是 `strncpy` 也並非是安全的,因為有可能會發生 `dest` 字串的結尾並沒有 null terminator 的問題,所以有了更安全的 `strlcpy` 能選擇。`strlcpy` 如果遇到寫入的位元組大小比 `src` 的位元組大小還要小的時候,會在欲寫入的位元組大小的最後一個位元填入 null terminator 。但是因為這需要安裝額外的 package 才能 include `bsd/string.h`,所以我選擇使用 `snprintf` 來達到一樣的效果。 ```c element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize) { if (!head || list_empty(head)) return NULL; element_t *tmp = list_first_entry(head, element_t, list); list_del_init(&tmp->list); if (sp) snprintf(sp, bufsize, "%s", tmp->value); return tmp; } ``` #### q_remove_tail 函式運作的邏輯和 `q_insert_head` 只差在將 `list_first_entry()` 改為 `list_last_entry()`。 #### q_size 透過 `list_for_each()` 逐一走訪 queue 中的每個 element 並用 `int len` 來計數。 ```c int q_size(struct list_head *head) { if (!head) return 0; int len = 0; struct list_head *li; list_for_each (li, head) len++; return len; } ``` #### q_delete_mid 透過一個型別為 `list_head` 的指標變數 `tmp` 和一個布林變數 `flag`,在逐一走訪 `queue` 中的每一個 entry 時,每當 `flag` 為 true 時將 `tmp` 指向其下一個 entry 的 list。這樣當走訪完整個 `queue` 後 `tmp` 就會指向 `queue` 的第 $⌊n / 2⌋$ 個 entry,然後再將其刪除。 ```c bool q_delete_mid(struct list_head *head) { if (!head || list_empty(head)) return false; bool flag = true; element_t *pos; struct list_head *tmp = head; list_for_each_entry (pos, head, list) { if (flag) tmp = tmp->next; flag = !flag; } element_t *mid = list_entry(tmp, element_t, list); list_del_init(&mid->list); q_release_element(mid); return true; } ``` 但因為 `flag` 會反覆在 0 和 1 之間變化,造成以下程式碼在 ```c if (flag) tmp = tmp->next; ``` 不做任何處理的情況下會產生 $⌊queue\ size/2⌋$ 次 branch。 要減少 branch 次數的其中一種做法是透過快慢指標來找尋中間節點,但因為 [lab0](https://hackmd.io/@sysprog/linux2023-lab0/%2F%40sysprog%2Flinux2023-lab0-a) 所使用的資料結構為 doubly linked-list,所以可以考慮到雙向的特性,用更少次的操作找到中間節點,如下: ```c bool q_delete_mid(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *right, *left; right = head->next; left = head->prev; while (right != left && left->prev != right) { right = right->next; left = left->prev; } list_del(left); q_release_element(list_entry(left, element_t, list)); return true; } ``` 透過兩個指標 `right` 和 `left`,每次迭代時 `right` 會指向 `right->next`, `left` 會指向 `left->prev`,這樣一旦 `right` == `left` 或是 `left->prev` == `right`,此時的 `left` 即為中間節點。藉由這種做法就能在比使用快慢指標還少的操作下同時也有較少的 branch 產生。 #### q_delete_dup 透過 `list_for_each_entry_safe` 來逐一走訪整個 queue,每次迭代時會比較 `curr->value` 和 `next->value` 是否相同,如果相同的話將 `curr` 刪除並釋放記憶體空間,並把布林變數 `flag` 設為 true 以在 `curr-> value` 和 `next->value` 不同時將 `curr` 刪除並釋放記憶體空間。 ```c bool q_delete_dup(struct list_head *head) { if (!head || list_empty(head)) return false; bool flag = false; element_t *curr, *next; list_for_each_entry_safe (curr, next, head, list) { if (&next->list != head && strcmp(curr->value, next->value) == 0) { flag = true; list_del(&curr->list); q_release_element(curr); } else if (flag) { flag = false; list_del(&curr->list); q_release_element(curr); } } return true; } ``` #### q_swap 透過 `list_for_each()` 逐一走訪整個 queue,每當布林變數 `flag` 為 true 時用 `list_move()` 將 `node` 移至 `tmp` 前,最後再將 `node` 和 `tmp` 指向 `node->next` 以確保走訪的正確性。 ```c void q_swap(struct list_head *head) { if (!head || list_empty(head)) return; bool flag = false; struct list_head *node, *tmp = head; list_for_each (node, head) { if (flag == true) { list_move(node, tmp); node = node->next; tmp = node; } flag = !flag; } } ``` #### q_reverse 透過 `list_for_each_safe()` 逐一走訪整個 queue 並將走訪到的元素用 `list_move()` 移至 `head` 後面,進而完成 reverse 整個 queue。 ```c void q_reverse(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head *node, *safe; list_for_each_safe (node, safe, head) { list_move(node, head); } } ``` #### q_reverseK 原本的想法是透過 `list_for_each_safe()` 逐一走訪整個 queue,同時透過 `int countK` 來計數,走訪時會將元素 `node` 移至 `tmp` 後面,每當 `countK mod k == 0` 時調整 `tmp` 的位置來完成功能。 ```c void q_reverseK(struct list_head *head, int k) { if (!head || list_empty(head)) return; int countK = 0; struct list_head *node, *safe, *tmp = head; list_for_each_safe (node, safe, head) { countK++; list_move(node, tmp); if (countK % k == 0) { tmp = safe->prev; } } } ``` 但是實際的效果和預期的不同,因為這和功能所述的 `Reverse the nodes of the list k at a time`,不一致,上述程式碼是每次都會執行 `list_move` 而非當 `countK mod k == 0` 時才將前面 `k` 個元素 reverse,差異如下: ``` l = [1 2 3 4 5] cmd> reverseK 3 l = [3 2 1 5 4] <-- 執行結果 l = [3 2 1 4 5] <-- 預期結果 ``` 而我想到的解決方式是額外宣告一個變數 `int count` 並將其設為 `q_size / k`,也就是總共會發生的 reverse 次數。這樣就能透過 `count` 來避免額外的 reverse 發生,只有在 `count > 0` 時才會執行 `list_move()`。 ```diff void q_reverseK(struct list_head *head, int k) { if (!head || list_empty(head)) return; int countK = 0; + int count = q_size(head) / k; struct list_head *node, *safe, *tmp = head; list_for_each_safe (node, safe, head) { countK++; + if (count) list_move(node, tmp); if (countK % k == 0) { tmp = safe->prev; + count--; } } } ``` #### q_sort 參考了 [你所不知道的 C 語言: linked list 和非連續記憶體](https://hackmd.io/@sysprog/c-linked-list#%E6%A1%88%E4%BE%8B%E6%8E%A2%E8%A8%8E-LeetCode-21-Merge-Two-Sorted-Lists) ,思路是透過 Divide and Conquer 先遞迴將佇列拆分成 `q_size` 個子佇列,再兩兩排序合併,最後就會得到一個已排序的佇列。 ```c void mergeTwoLists(struct list_head *L1, struct list_head *L2) { if (!L1 || !L2) return; struct list_head *Lnode = L1->next, *Rnode = L2->next; while (Lnode != L1 && Rnode != L2) { if (strcmp(list_entry(Lnode, element_t, list)->value, list_entry(Rnode, element_t, list)->value) > 0) { struct list_head *tmp = Rnode->next; list_move_tail(Rnode, Lnode); Rnode = tmp; } else { Lnode = Lnode->next; } } if (q_size(L2) != 0) { list_splice_tail(L2, L1); } } /* Sort elements of queue in ascending order */ void q_sort(struct list_head *head) { if (!head || list_empty(head) || q_size(head) == 1) return; struct list_head *tmp, *mid; tmp = head->next; mid = head->prev; while (tmp != mid && mid->prev != tmp) { tmp = tmp->next; mid = mid->prev; } LIST_HEAD(right); list_cut_position(&right, head, mid->prev); q_sort(head); q_sort(&right); mergeTwoLists(head, &right); } ``` #### q_descend 因為 [lab0](https://hackmd.io/@sysprog/linux2023-lab0/%2F%40sysprog%2Flinux2023-lab0-a) 所使用的資料結構為 doubly linked-list,所以只需要從 queue 的尾部開始逐一反向走訪,並且不斷去比較 `struct list_head *curr` 對應的 element 的 value 是否大於 `struct list_head *curr->prev` 對應的 element 的 value,如果大於的話則將 `struct list_head *curr->prev` 對應的 element 刪除即可。 ```c int q_descend(struct list_head *head) { if (!head || list_empty(head)) return 0; struct list_head *curr = head->prev; while (curr->prev != head) { if (strcmp(list_entry(curr, element_t, list)->value, list_entry(curr->prev, element_t, list)->value) > 0) { struct list_head *tmp = curr->prev; list_del(tmp); q_release_element(list_entry(tmp, element_t, list)); } else { curr = curr->prev; } } return q_size(head); } ``` #### q_merge 先透過 `container_of` 找出連接著各個 queue 的結構體 `queue_contex_t *queue_head` 的記憶體位置,然後逐一走訪相連著的 queue 並透過 `mergeTwoLists()` 將第二個至最後一個 queue 合併進第一個 queue 中。因為每一個 queue 預設都已排序,所以 `mergeTwoLists()` 完就會是一個已排序的結果。 之所以不先將所有 queue 都透過 `list_splice_init` 合併後再用 `q_sort()` 合併是因為每一個 queue 都已排序,若再執行 `q_sort()` 中 divide 顯而易見是多餘的。 在 `list.h` 中的 `list_splice_init()` 有提到 "The @list head will not end up in an uninitialized state like when using list_splice. Instead the @list is initialized again to the an empty list/unlinked state.",所以在迴圈中每當合併完兩個 queue 都需要 `INIT_LIST_HEAD(queue->q)`。 ```c int q_merge(struct list_head *head) { if (!head) return 0; queue_contex_t *queue_head = container_of(head->next, queue_contex_t, chain); for (struct list_head *curr = head->next->next; curr != head; curr = curr->next) { queue_contex_t *queue = container_of(curr, queue_contex_t, chain); mergeTwoLists(queue_head->q, queue->q); INIT_LIST_HEAD(queue->q); queue->size = 0; } return q_size(queue_head->q); } ``` ### 引入 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 到 `lab0-c` 將 [list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 和 [list_sort.h](https://github.com/torvalds/linux/blob/master/include/linux/list_sort.h) 放到 `lab0` 的專案下並修改 Makefile 使得在 make 時會編譯 `list_sort.c` ```diff OBJS := qtest.o report.o console.o harness.o queue.o \ random.o dudect/constant.o dudect/fixture.o dudect/ttest.o \ shannon_entropy.o \ linenoise.o web.o \ + list_sort.o ``` 在 `list_sort.c` 中有使用到 `likely()` 和 `unlikely`,這是在 [linux/compiler.h](https://github.com/torvalds/linux/blob/master/include/linux/compiler.h) 裡的巨集,顧將這兩個巨集的定義搬移至 `lab0` 專案中的 `list_sort.c` 中。 ```c #define likely(x) __builtin_expect(!!(x), 1) #define unlikely(x) __builtin_expect(!!(x), 0) ``` 修改 `list_sort.c` 和 `list_sort.h`,調整函式的參數並修改一些型別定義使得 `list_sort.c` 能在 `lab0` 中運作。 - 將各個函式中的參數 `void *priv` 移除 - 調整各個函式的 `nonnull()` - 將 `list_sort.c` 中定義的型別 `u8` 改成 `uint8_t` - 在 `merge()` 中針對 `struct list_head *head` 去抑制 cppcheck 的檢查,因為 `head` 不會是一個 unassigned variable 將 `list_sort.c` 中的 `cmp()` 都改成使用 `strcmp()` 來比較字串大小,並抑制 cppcheck 的 nullPointer 檢查。 ```c // cppcheck-suppress nullPointer if (strcmp(list_entry(a, element_t, list)->value, // cppcheck-suppress nullPointer list_entry(b, element_t, list)->value) <= 0) ``` ### 在 `console.c` 中新增 parameter `algo` 藉由全域變數 `algo` 來選擇排序演算法要使用 `q_sort()` 還是 `list_sort()` ```c int algo = 0; ``` 透過 `add_param()` 新增有關 `algo` 的說明,並且這樣就能使用 `option algo ` 來變更 `algo` 的值,以此決定要使用哪一個排序演算法。 ```c add_param("algo", &algo, "Select sort algorithm, q_sort = 0, list_sort = 1", NULL); ``` ### 修改 `do_sort` 讓 do_sort() 可以根據 `algo` 的數值來決定要使用哪一個排序演算法。 ```diff - if (current && exception_setup(true)) + if (current && exception_setup(true)) { + if (algo == 0) q_sort(current->q); + else + list_sort(current->q); + } ``` ### 比較自己實作的 `q_sort()` 和 linux kernel 中的 `list_sort()` 之間的效能落差 新增 `trace-sort_algorithm.cmd` 來測試自己實作的 `q_sort()` 和 `list_sort()` 各別在排序 10000、500000 和 1000000 個節點時排序所需要的時間。在這個實驗中會執行十次 `trace-sort_algorithm.cmd`,並統計在不同節點數量的情況下的平均排序時間各別為何。 ```bash option verbose 1 option algo 0 new ih RAND 100000 time sort free option algo 1 new ih RAND 100000 time sort free option algo 0 new ih RAND 500000 time sort free option algo 1 new ih RAND 500000 time sort free option algo 0 new ih RAND 1000000 time sort free option algo 1 new ih RAND 1000000 time sort free ``` | 節點數量 | q_sort 平均排序時間 | list_sort 平均排序時間 | | -------- | -------- | -------- | | 100000 | 0.0226(s) | 0.0213(s) | | 5000000 | 0.236(s) | 0.2548(s) | | 1000000 | 0.7739(s) | 0.6448(s) | 從十次測試的平均中可以看到,在節點數為 100000 時,`list_sort()` 優於 `q_sort()` 6%,在節點數為 500000 時,`q_sort()` 優於 `list_sort()` 7.4%,而在節點數為 1000000 時,`list_sort()` 優於 `q_sort()` 17%。 :::danger 需要解讀效能落差的成因,以及你能否從中學習到其精髓。 :notes: jserv ::: ## TODO: 研讀 Linux 的 `lib/list_sort.c` 並量化分析 解讀 Linux 核心原始程式碼的 `lib/list_sort.c`,設計實驗來量化分析,探討其實作手法,需要善用 perf 一類的工具。解析程式碼要有完整的圖文、數學證明,和如何達到 cache 友善。 ### Bottom up V.S. Top down 首先先來分析自己實作的 `q_sort()` 和 linux kernel 中的 `list_sort()` 到底有何不同。 `q_sort` 採取的是 merge sort,也就是先不斷遞迴分割,直到每一個子佇列都只有一個節點後後再兩兩合併成一個排序過的佇列,而會對 cache 不友善的主因也是發生在 `merge` 這個階段。因為此做法採取的是將同樣 level 的子佇列都合併完之後才會繼續遞迴往上合併,直到成為一個和原本佇列大小為止。但是這種做法會導致一但要排序的子佇列的大小超過了 L1 cache 的大小,cache 中的子佇列會不斷的在不同層級的 cache 中搬移,形成 cache thrashing,對 cache 不友善。 而 `list_sort` 之所以能達到 cache 友善是因為其合併方式採取了一個特殊的作法: #### 合併方式 每次合併時保持合併與不合併的子佇列比例不會大於 2:1,這樣能確保合併的子佇列的大小可以放到 L1 cache 中,進而避免 cache thrashing。而在註解中也有提到藉由 2:1 的比例,可以省下 0.2*n 次的比較操作。 那實作中是如何確保 2:1 的比例呢? 實際上這是一段非常精美的程式碼,只透過了一個變數 `count` 來紀錄目前讀到的節點數,每當 `count` 為 2 的冪時不合併,反之則合併,以下表格為 ## TODO: 利用 Linux 核心產生隨機數 使用 Linux 核心的 [Random Number Algorithm Definitions](https://www.kernel.org/doc/html/latest/crypto/api-rng.html) 來產生隨機數,作為上述排序程式 (包含 Linux 核心的實作) 的輸入,以檢驗鏈結串列的排序實作。