--- tags: linux2022 --- # 2022q1 Homework1 (lab0) contributed by < `ray90514` > ## 實驗環境 ```shell $ gcc --version gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0 Copyright (C) 2019 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian Address sizes: 43 bits physical, 48 bits virtual CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: AuthenticAMD CPU family: 23 Model: 24 Model name: AMD Ryzen 5 PRO 3500U w/ Radeon Vega Mobile Gfx Stepping: 1 Frequency boost: enabled CPU MHz: 1675.446 CPU max MHz: 2100.0000 CPU min MHz: 1400.0000 BogoMIPS: 4192.05 Virtualization: AMD-V L1d cache: 128 KiB L1i cache: 256 KiB L2 cache: 2 MiB L3 cache: 4 MiB NUMA node0 CPU(s): 0-7 ``` ## 開發過程 ### q_new ```c struct list_head *q_new() { struct list_head *head = malloc(sizeof(struct list_head)); if (!head) return NULL; INIT_LIST_HEAD(head); return head; } ``` 檢查 `malloc` 是否成功以及使用 `INIT_LIST_HEAD` 初始化。 ### q_free ```c void q_free(struct list_head *head) { if (!head) return; struct list_head *node; struct list_head *safe; list_for_each_safe (node, safe, head) { q_release_element(list_entry(node, element_t, list)); } free(head); } ``` 因為要釋放節點及其所屬的 entry ,所以使用 `list_for_each_safe` 而不是 `list_for_each` 。前者會在走訪節點的過程,保存每個節點的 `next` 指標,所以可以安全釋放記憶體,而不會影響後續走訪。 :::warning 使用通順的漢語書寫。 :notes: jserv ::: ### q_insert_head ```c bool q_insert_head(struct list_head *head, char *s) { if (!head) return false; element_t *element = malloc(sizeof(element_t)); if (!element) return false; element->value = strdup(s); if (!element->value) { free(element); return false; } list_add(&element->list, head); return true; } ``` 使用 `list_add` 將節點插入佇列的開頭,若是字串的記憶體分配失敗時,要記得釋放節點的記憶體。 ### q_insert_tail 與 `q_insert_head` 相似,將 `list_add` 改成 `list_add_tail` 。 ### q_remove_head ```c element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize) { if (!head || list_empty(head)) return NULL; element_t *element = list_first_entry(head, element_t, list); list_del(&element->list); if (sp) { strncpy(sp, element->value, bufsize - 1); sp[bufsize - 1] = '\0'; } return element; } ``` 使用 `list_first_entry` 取得開頭的entry,利用 `list_del` 將其自佇列中移除。再判斷是否有要將 `value` 字串裡的值複製到 `sp` 。使用 `strncpy` 可以複製給定數量的前 n 個字元,因此不保證 `'\0'` 做結尾,需要手動補上。 ### q_remove_tail 與 `q_insert_head` 相似,將 `list_first_entry` 改成 `list_last_entry` ,以取得尾端的entry 。 ### q_size ```c int q_size(struct list_head *head) { if (!head) return 0; struct list_head *node; int len = 0; list_for_each (node, head) len++; return len; } ``` 走訪佇列中所有節點,紀錄過程中的節點數量。 ### q_delete_mid ```c bool q_delete_mid(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *forward = head->next; struct list_head *backward = head; while (forward != backward) { backward = backward->prev; if (forward == backward) break; forward = forward->next; } list_del(forward); q_release_element(list_entry(forward, element_t, list)); return true; } ``` 由兩個指標一個向前走訪,一個向後走訪,當兩個指標相同時所指到的節點即為中間的節點。因為索引是由0開始,所以由 `forward` 指標先走訪。 ### q_delete_dup ```c bool q_delete_dup(struct list_head *head) { if (!head) return false; struct list_head *node = head->next->next; struct list_head *prev = head; while (node != head) { element_t *element = list_entry(node, element_t, list); element_t *prev_element = list_entry(node->prev, element_t, list); if (strcmp(element->value, prev_element->value) == 0) { while (node != head && strcmp(element->value, prev_element->value) == 0) { q_release_element(prev_element); node = node->next; prev_element = element; element = list_entry(node, element_t, list); } q_release_element(prev_element); prev->next = node; node->prev = prev; if (node != head) node = node->next; } else { node = node->next; prev = prev->next; } } return true; } ``` 由 `prev` 跟 `node` 這兩個相差一個節點的指標進行走訪,如果 `node` 與 `node->prev` 所指向節點的字串相同時,會找出下一個不相同的節點。 ![](https://i.imgur.com/VQQWrj6.png) 然後將相同字串的節點刪除,把 `prev` 所指到的節點與 `node` 所指到的節點相連, `node` 設成 `node->next` ,讓兩指標維持相差一個節點的狀態。 ![](https://i.imgur.com/bFKB4Xq.png) :::danger 使用 Graphviz 重新繪製上圖。 :notes: jserv ::: #### 改進 使用 `list_for_each_entry_safe` 簡化程式碼,加入布林變數 `is_dup` 用來判斷指標所指的節點是否為重複節點的最後一個 。 ```c bool q_delete_dup(struct list_head *head) { if (!head) return false; bool is_dup = false; element_t *entry; element_t *safe; struct list_head *prev = head; list_for_each_entry_safe(entry, safe, head, list) { if(&safe->list != head && strcmp(entry->value, safe->value) == 0) { q_release_element(entry); is_dup = true; } else if (is_dup) { is_dup = false; q_release_element(entry); prev->next = &safe->list; safe->list.prev = prev; } else { prev = prev->next; } } return true; } ``` ### q_swap ```c void q_swap(struct list_head *head) { if (!head || list_empty(head)) return; for (struct list_head *node = head->next; node != head && node->next != head; node = node->next) { struct list_head *next = node->next; node->prev->next = next; next->next->prev = node; node->next = next->next; next->next = node; next->prev = node->prev; node->prev = next; } } ``` 使用 `node` 指標走訪整個佇列,交換 `node` 與 `node->next` 的位置。 ### q_reverse ```c void q_reverse(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head *node = head; struct list_head *next = node->next; do { node->next = node->prev; node->prev = next; node = next; next = node->next; } while (node != head); } ``` 走訪整個佇列,交換每個節點的 `next` 和 `prev` 。 ![](https://i.imgur.com/kyKFDYb.png) ![](https://i.imgur.com/oyZdiYJ.png) ### q_sort ```c #define SORT_BUFSIZE 32 void q_sort(struct list_head *head) { if (!head || head->next == head->prev) return; struct list_head *pending[SORT_BUFSIZE] = {}; struct list_head *result = head->next; struct list_head *next; int i; head->prev->next = NULL; while (result) { next = result->next; result->next = NULL; for (i = 0; i < SORT_BUFSIZE && pending[i]; i++) { result = merge(pending[i], result); pending[i] = NULL; } if (i == SORT_BUFSIZE) i--; pending[i] = result; result = next; } /*merge final*/ result = NULL; for (i = 0; i < SORT_BUFSIZE - 1; i++) { result = merge(pending[i], result); } merge_final(head, result, pending[SORT_BUFSIZE - 1]); } ``` 改寫自 [Bottom-up implementation using lists](https://en.wikipedia.org/wiki/Merge_sort#Bottom-up_implementation_using_lists) 。演算法的運作大致如下: - 遍歷整個佇列,每次自佇列移除一個節點加入 `pending` 這個陣列。 - 從陣列的第0個開始,如果該位置有存指標的話,就將它與節點合併。 ![](https://i.imgur.com/CuEtS7e.png) - 把結果放到下一個位置,如果下一個位置一樣有存指標,就繼續合併,重複直到陣列有空的位置可以放。 ![](https://i.imgur.com/AA11s6o.png) 串列長度為$1, 2, 4, 8, 16, ..., 2^i$ ,對應到陣列的第 i 個。 #### merge ```c struct list_head *merge(struct list_head *a, struct list_head *b) { struct list_head head = {.next = NULL}; struct list_head *tail = &head; while (a && b) { char *sa = list_entry(a, element_t, list)->value; char *sb = list_entry(b, element_t, list)->value; /* if equal, take 'a' -- important for sort stability */ struct list_head **smaller = strcmp(sa, sb) <= 0 ? &a : &b; tail->next = *smaller; tail = tail->next; *smaller = (*smaller)->next; } tail->next = (struct list_head *) ((uintptr_t) a | (uintptr_t) b); return head.next; } ``` 兩個串列的開頭比較,將較小的節點放入排序好的串列。 ``` tail->next = *smaller; ``` 這裡利用到了指標的指標 `smaller` 可以直接將較小串列的開頭移除。 ```c *smaller = (*smaller)->next; ``` 至於 `merge_final` 是用來做最後一次的合併,除了原本的合併操作外還有讓單向串列變回原本雙向串列。 ### shuffle :::danger command 是「命令」,instruction 是「指令」,二者語意不同。 :notes: jserv ::: 使用以下程式將<s>指令</s>命令 `shuffle` 加入至 `qtest` ```c ADD_COMMAND(shuffle, " | Shuffle every nodes in queue"); ``` 執行命令 `shuffle` 會呼叫到 `do_shuffle` 。裡面包含初始化和檢查輸入的參數,主要的洗牌程式是在 `q_shuffle` 中。 ```c void q_shuffle(struct list_head *head) { if (!head) return; for (int i = q_size(head); i > 0; i--) { struct list_head *node = head->next; for(int j = rand() % i; j > 0; j--) { node = node->next; } list_move_tail(node, head); } } ``` 1. 從佇列隨機選擇距離開頭0到i-1的一個節點, 將其放入尾端。 ![](https://i.imgur.com/qZ8Z4dA.png) ![](https://i.imgur.com/oAeTXmF.png) 2. 重複以上動作直到所有節點都被挑選,最後得到的就是洗牌過後的佇列。 ![](https://i.imgur.com/vpdBAQt.png) 這個方始是從原始的Fisher–Yates shuffle改良而來,因為這裡的佇列頭尾相連,所以可以放入直接放回佇列而不用另外放,而且因為是固定放入尾端,所以不用擔心會挑選到已經選過得節點。 ### web 當輸入命令 `web_cmd [port]` 後會初始化 socket ,以及關閉 `linenoise` 。 ```c static bool do_web_cmd(int argc, char *argv[]) { int port = DEFAULT_PORT; if (listenfd > 0) return true; if (argc == 2) { get_int(argv[1], &port); } listenfd = open_listenfd(port); if (listenfd > 0) { report(1, "listen on port %d, fd is %d", port, listenfd); } else { report(1, "Fail to run web server"); return false; } int flags = fcntl(listenfd, F_GETFL); fcntl(listenfd, F_SETFL, flags | O_NONBLOCK); noise = false; return true; } ``` 原本使用 `linenoise` 讀取輸入會改用 `cmd_select` 讀取輸入。 ```c bool run_console(char *infile_name) { if (!push_file(infile_name)) { report(1, "ERROR: Could not open source file '%s'", infile_name); return false; } if (!has_infile) { char *cmdline; while (noise && (cmdline = linenoise(prompt)) != NULL) { interpret_cmd(cmdline); linenoiseHistoryAdd(cmdline); /* Add to the history. */ linenoiseHistorySave(HISTORY_FILE); /* Save the history on disk. */ linenoiseFree(cmdline); } if (!noise) { while (!cmd_done()) { cmd_select(0, NULL, NULL, NULL, NULL); } } } else { while (!cmd_done()) cmd_select(0, NULL, NULL, NULL, NULL); } return err_cnt == 0; } ``` 接下來是修改 `cmd_select` ,先將 server socket 的 file descriptor 加入 `readfds` , `readfds` 的類型是 `fd_set` ,作為 file descriptor 的集合。 ```c if (listenfd != -1) FD_SET(listenfd, readfds); ``` 將 `select` 的第一個參數 `nfds` 設成要監測的 file descriptor 中的最大值加一。 ```c if (listenfd >= nfds) nfds = listenfd + 1; ``` 呼叫 `select` 後,會將 `readfds` 中未準備好讀取的 file decriptor 移除, `FD_ISSET` 可以確認給定的 file descriptor 是否還在 `readfds` 。 `process` 會回傳處理好的 http 請求。 ```c int result = select(nfds, readfds, writefds, exceptfds, timeout); if (result <= 0) return result; infd = buf_stack->fd; if (readfds && FD_ISSET(infd, readfds)) { /* Commandline input available */ FD_CLR(infd, readfds); result--; char *cmdline; cmdline = readline(); if (cmdline) interpret_cmd(cmdline); } else if (readfds && FD_ISSET(listenfd, readfds)) { FD_CLR(listenfd, readfds); result--; int connfd; struct sockaddr_in clientaddr; socklen_t clientlen = sizeof(clientaddr); connfd = accept(listenfd, (SA *) &clientaddr, &clientlen); char *p = process(connfd, &clientaddr); if (p) interpret_cmd(p); free(p); close(connfd); } ``` `infd` 是原本輸入的 file descriptor ,它的值 `buf_stack->fd` 在 `push_file` 被初始化,判斷有沒有檔案作為輸入,如果沒有就是標準輸入。 ## q_sort 與 list_sort 接續前述之 `q_sort` ,Linux Kernel v5.2 以前的 `list_sort` 採用了類似的實作。這個演算法的缺點是,當佇列長度遠大於2的陣列長度次方時,會變得像是插入排序,複雜度為 $O(n^2)$ 。而Linux Kernel v5.2 以後改進的版本則沒有這個限制。 - 舊版的 `list_sort` (Linux Kernel v5.1.21) ```c void list_sort(void *priv, struct list_head *head, int (*cmp)(void *priv, struct list_head *a, struct list_head *b)) { struct list_head *part[MAX_LIST_LENGTH_BITS+1]; /* sorted partial lists -- last slot is a sentinel */ int lev; /* index into part[] */ int max_lev = 0; struct list_head *list; if (list_empty(head)) return; memset(part, 0, sizeof(part)); head->prev->next = NULL; list = head->next; while (list) { struct list_head *cur = list; list = list->next; cur->next = NULL; for (lev = 0; part[lev]; lev++) { cur = merge(priv, cmp, part[lev], cur); part[lev] = NULL; } if (lev > max_lev) { if (unlikely(lev >= ARRAY_SIZE(part)-1)) { printk_once(KERN_DEBUG "list too long for efficiency\n"); lev--; } max_lev = lev; } part[lev] = cur; } for (lev = 0; lev < max_lev; lev++) if (part[lev]) list = merge(priv, cmp, part[lev], list); merge_and_restore_back_links(priv, cmp, head, part[max_lev], list); } ``` 這裡給出了長度過長的訊息。 ```c if (unlikely(lev >= ARRAY_SIZE(part)-1)) { printk_once(KERN_DEBUG "list too long for efficiency\n"); lev--; } ``` - 新版的 `list_sort` : 如果要儲存任意長度的資料,自然是想到用 Linked list ,新版用 `struct list_head *pending` 取代原本的 `struct list_head *part[MAX_LIST_LENGTH_BITS+1]` ,以及加入了 `size_t count` 用來判斷合併的順序。 ### pending 與其再用 `struct list_head` 建立串列的串列,這裡使用比較巧妙的作法,因為合併的過程中只用到 `next` 指標,所以將 `prev` 指標當作連接串列的指標,如圖所示。 ![](https://i.imgur.com/ln6opxn.png) ### count count的是用來做延遲合併的計算,與原本的演算法不同,當 `pending` 中,只有在有兩個 $2^k$ 以及接續的多組串列總數為 $2^k$ 時才會將這兩個 $2^k$ 的串列合併成一個 $2^{k+1}$ 的串列,以避免 cache thrashing ,詳細可以參考 [Linux 核心的 list_sort 實作](https://hackmd.io/@sysprog/c-linked-list#Linux-%E6%A0%B8%E5%BF%83%E5%8E%9F%E5%A7%8B%E7%A8%8B%E5%BC%8F%E7%A2%BC) 。 ### 比較 測試用的 `list_sort`, `merge` 和 `merge_finale` 改成與 `q_sort` 相同的版本,以及移除 `likely()` 其餘不變。測試內容為排序 N 個隨機長度為 1 的字串,執行以下程式碼,使用 `perf stat --repeat 10 ./test` 取得 `task-clock`。 ```c #define N 960000 int main(int argc, char *argv[]) { struct list_head *h1 = q_new(); char s[2] = {0}; for(int i = 0; i < N; i++) { s[0] = rand(); q_insert_head(h1, s); } if(argc == 2 && argv[1][0] == 'l') q_linux_sort(h1); else q_sort(h1); } ``` 下圖為實驗的結果,佇列長度從 750 開始每次加倍,直到 96000 ,我的 L1d cache 大小是 128 KiB ,塞滿需要3278的節點。從結果可以看出隨著佇列長度增加, `list_sort` 快 `q_sort` 約 10% ,延遲合併是更有效率的。而在長度不長時,`q_sort` 使用陣列儲存待合併的串列,相比`list_sort` 的串列有更好的表現。 ![](https://i.imgur.com/nqNgOWC.png) 接下來是對 `q_sort` 長度限制的實驗。 固定對 $2^{16}$ 個字串排序,調整 `pending` 陣列的大小,從 8 到 16。以下是結果,可以看出當陣列長度不足時,效率明顯下降。 ![](https://i.imgur.com/ZMQPqxe.png) ## RIO套件 `console.c` 將原本RIO套件的 `rio_t` 加入了 `rio_ptr prev` ,將檔案以堆疊的方法儲存, 使用 `push_file()` 跟 `pop_file()` 操作,以及一個指向頂部的指標 `buf_stack` 。 ```c struct RIO_ELE { int fd; /* File descriptor */ int cnt; /* Unread bytes in internal buffer */ char *bufptr; /* Next unread byte in internal buffer */ char buf[RIO_BUFSIZE]; /* Internal buffer */ rio_ptr prev; /* Next element in stack */ }; ``` RIO套件主要用在 `readline()` ,用來從堆疊頂部的檔案讀取輸入。與一般的 `read` 相比, `readline()` 是 Buffered I/O,也就是會預先將檔案讀取至記憶體,而使用的時候改從記憶體讀取,這樣可以減少 system call 的次數。 ## simulation 在做`trace-17-complexity` 的測試的時候發現, `remove_tail` 常常有不過的情形,而 `remove_head` 都能通過,明明兩個函式只有一處不相同。接下來找到 `constant.c` ,裡面有測試 `remove_tail` 的程式碼。 ```c case test_remove_tail: for (size_t i = drop_size; i < n_measure - drop_size; i++) { dut_new(); dut_insert_head( get_random_string(), *(uint16_t *) (input_data + i * chunk_size) % 10000); before_ticks[i] = cpucycles(); element_t *e = q_remove_tail(l, NULL, 0); after_ticks[i] = cpucycles(); if (e) q_release_element(e); dut_free(); } break; ``` 乍看之下跟 `test_remove_head` 的部份除了呼叫 `q_remove_tail` 完全一樣,但突然想到會不會是 cache miss 的緣故,因為 `remove_tail` 的測試使用 `dut_insert_head` 將節點插入至頭部,但移除的節點是在尾端,計算了一下我的 L1d Cache 能容納大概 128 * 1024 / (24 + 8 * 8) = 1489 個節點,而插入大於 1489 的機率很大。 為了驗證我的假設,我將 `dut_insert_head` 改成 `dut_insert_tail` ,此時發生了另外一個問題,程式看起來像是卡住一樣非常非常慢。我使用 `perf record` 看一下發生什麼事。以下是結果。 - remove_head ``` 18.01% qtest qtest [.] test_malloc 12.08% qtest [kernel.kallsyms] [k] _extract_crng 11.67% qtest libc-2.31.so [.] _int_malloc 9.79% qtest libc-2.31.so [.] _int_free 9.36% qtest qtest [.] test_strdup 7.86% qtest qtest [.] test_free ``` - remove_tail ``` 92.56% qtest qtest [.] test_free ``` 兩者的 `test_free` 之所以差這麼多,是因為我的 `q_free` 釋放的順序是從頭到尾,與插入的順序相反,加上節點空間大於 cache size 導致大量 cache miss ,從而導致所花的時間變長。 我將 `q_free` 改成反向的版本後,確實測試通過的次數與 `remove_tail` 差不多。也就證實了是插入的順序導致了 cache miss。 ### 缺陷 這裡的缺陷有兩個,第一個是 `dut_free` 跟 `dut_insert_tail` 造成的緩慢,目前想到的解決辦法是使用建立與釋放順序相同的函式,或是維持 `dut_insert_head` 然後使用第二個問題的解決辦法。 第二個問題是 cache miss 所造成的影響,實際上在原本的論文〈Dude, is my code constant time?〉有提到。 > To minimize the effect of environmental changes in the results 這裡就是因為處理數據的程式而不是原本待測程式造成結果的改變。對於 `test_remove_tail` 有比較針對的解決辦法,像是固定將最後一個節點從尾端插入。 想到比較通用的解法是刷新整個 cache 讓測試的時候固定發生 cache miss。