# 2025q1 Homework1 (lab0) contributed by < `Andrushika` > {%hackmd NrmQUGbRQWemgwPfhzXj6g %} ## 開發環境 ```shell $ gcc --version gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0 $ lscpu 架構: x86_64 CPU 作業模式: 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 16 On-line CPU(s) list: 0-15 供應商識別號: GenuineIntel Model name: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz CPU 家族: 6 型號: 141 每核心執行緒數: 2 每通訊端核心數: 8 Socket(s): 1 製程: 1 CPU(s) scaling MHz: 31% CPU max MHz: 4600.0000 CPU min MHz: 800.0000 BogoMIPS: 4608.00 Virtualization features: 虛擬: VT-x Caches (sum of all): L1d: 384 KiB (8 instances) L1i: 256 KiB (8 instances) L2: 10 MiB (8 instances) L3: 24 MiB (1 instance) NUMA: NUMA 節點: 1 NUMA node0 CPU(s): 0-15 ``` ## `queue.[ch]` 佇列操作程式碼實作 ### q_free 為預防被釋放的記憶體空間被意外存取,先使用 `q_release_element` 將 `entry` 逐一刪除;最後釋放 `head` 所使用的記憶體空間。 ```c void q_free(struct list_head *head) { if (!head) return; element_t *entry = NULL, *safe = NULL; list_for_each_entry_safe (entry, safe, head, list) { q_release_element(entry); } free(head); } ``` ### q_insert_head, q_insert_tail #### 實作步驟 > 1. 檢查 `head` ,若為 `NULL` 直接 `return false` > 2. 使用 `malloc` 為 `entry` 分配記憶體空間 > 3. 使用 `list_add` 或 `list_add_tail` 將節點加入鏈結串列中 因為第二個步驟(malloc entry)在兩個函式中完全重複,所以獨立寫成 `create_entry` 函式: ```c element_t *create_entry(char *s) { element_t *entry = malloc(sizeof(element_t)); if (!entry) return false; char *s_dup = strdup(s); if (!s_dup) { free(entry); return false; } entry->value = s_dup; return entry; } ``` `q_insert_head`, `q_insert_tail` 如下,可以直接呼叫 `create_entry` 使用: ```c bool q_insert_head(struct list_head *head, char *s) { if (!head) return false; element_t *entry = create_entry(s); list_add(&entry->list, head); return true; } ``` ```c bool q_insert_tail(struct list_head *head, char *s) { if (!head) return false; element_t *entry = create_entry(s); list_add_tail(&entry->list, head); return true; } ``` ### q_remove_head, q_remove_tail :::info 一開始在閱讀 ``queue.h`` 時,發現這兩個函式關於 `*sp` 的 annotation 有些奇怪: ``` q_remove_tail() - Remove the element from tail of queue @sp: string would be inserted ``` 應修正如下較貼近原意: ``` @sp: Pointer to a buffer where the removed element's string will be copied. ``` ::: #### 實作步驟 > 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return NULL` > 2. 使用 `list_first_entry` 或 `list_last_entry` 取得欲移除的 `element_t*` > 3. 將欲刪除的 `entry->value` 存入 `*sp` > 4. 使用 `list_del` 將其從鏈結串列中刪除 因兩者的實作幾乎相同,在此僅附上 `q_remove_head` 的程式碼: ```c element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize) { if (!head || list_empty(head)) return NULL; element_t *entry = list_first_entry(head, element_t, list); if (sp && bufsize > 0) { strncpy(sp, entry->value, bufsize - 1); sp[bufsize - 1] = '\0'; } list_del_init(&entry->list); return entry; } ``` ### q_delete_mid 本函式的實作難點是「找到正確的 mid node」,因為一開始沒看清楚題目的要求,所以折騰很久。 根據 queue.h 的敘述,應該刪除 `⌊n/2⌋th node`。 #### 實作步驟 > 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return false` > 2. 使用 `q_find_mid` 搭配快慢指標技巧,找到欲刪除的中點 > 3. 使用 **Linux Kernel list API** 提供的 `list_del` 刪除節點,並釋放記憶體空間 ```c struct list_head *q_find_mid(struct list_head *head) { // The middle node of a linked list of size n is the // ⌊n / 2⌋ th node from the start using 0-based indexing. if (!head || list_empty(head)) return NULL; // handle the 1-node case if (list_is_singular(head)) return head->next; // find the middle node struct list_head *slow = head->next, *fast = slow->next->next; while (fast != head && fast->next != head) { slow = slow->next; fast = fast->next->next; } return slow->next; } ``` ```c bool q_delete_mid(struct list_head *head) { // https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/ if (!head || list_empty(head)) return false; // find the middle node struct list_head *mid = q_find_mid(head); element_t *target = list_entry(mid, element_t, list); // delete node from the list list_del(&target->list); q_release_element(target); return true; } ``` ### q_delete_dup 在已排序過的鏈節串列中,刪除重複出現的節點。 #### 實作步驟 > 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return false` > 2. 使用 `list_for_each_safe` 安全走訪節點(因涉及刪除操作,使用 safe mode) > **a.** 檢查 `curr` 和 `next` 是否相同;若出現重複,則持續刪除 `next` > **b.** 如果出現重複,`curr` 本身也需要被刪除 ```c bool q_delete_dup(struct list_head *head) { // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/ if (!head || list_empty(head)) return false; struct list_head *curr, *next; list_for_each_safe (curr, next, head) { element_t *entry = list_entry(curr, element_t, list); if (next != head && strcmp(entry->value, list_entry(next, element_t, list)->value) == 0) { while (next != head && strcmp(entry->value, list_entry(next, element_t, list)->value) == 0) { list_del(next); q_release_element(list_entry(next, element_t, list)); next = curr->next; } list_del(curr); q_release_element(entry); } } return true; } ``` ### q_swap #### 實作步驟 >1. 檢查 `head`,若為 `NULL` 或 `empty` 則直接 `return` >2. 使用 `list_for_each_safe` 安全走訪節點 > **a.** 取出 `curr`(當前節點)與 `next`(下一個節點) > **b.** 若 `next` 為 `head`,表示已走訪完整個鏈結串列,則跳出迴圈 > **c.** 使用 `list_del` 將 `next` 從原位置移除 > **d.** 使用 `list_add` 將 `next` 插入到 `curr` 前一個位置(節點交換) > **e.** 更新 `next`,使其指向 `curr->next`,確保走訪能正確進行 其中 `2.e` 步驟看起來有些微妙。 因為 `swap` 時是兩兩一組,所以會希望 iterator 一次跳躍兩個節點。 `next = curr->next` 除了可以調整 `curr`, `next` 在 `swap` 過後的相對位置外,也剛好讓 iterator 前進了一步;加上 `list_for_each_safe` 的迴圈更新條件,剛好可以讓 iterator 達成前進兩步的目標。 ```c void q_swap(struct list_head *head) { if (!head || list_empty(head)) return; // https://leetcode.com/problems/swap-nodes-in-pairs/ struct list_head *curr, *next; list_for_each_safe (curr, next, head) { if (next == head) break; list_del(next); list_add(next, curr->prev); next = curr->next; } } ``` ### q_reverse #### 實作步驟 >1. 檢查 `head`,若為 `NULL` 或 `empty` 則直接 `return` >2. 使用 `list_for_each_safe` 安全走訪節點 >3. 依序把節點移除,並插入鏈結串列的最前方 實際上就像當作 stack 進行操作,就可以達到翻轉鏈節串列的效果。 ```c void q_reverse(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head *curr, *next; list_for_each_safe (curr, next, head) { list_del(curr); list_add(curr, head); } } ``` ### q_reverseK 這個函式的實作,我參照了先前 `reverse` 的實作方式: * 不斷將新節點插入鏈節串列的最前方,來達成翻轉的效果。 * 不同的是,因為是以 `k` 個節點為一組進行翻轉,在每次走訪時,用一個 `count` 來紀錄當前累積走過了多少節點;當 `count == k` 就立刻進行翻轉操作。 * 雖仿造 `stack` , 但插入點不會永遠都在 `head` ,所以必須不斷紀錄新的插入點。 #### 變數說明 `pending`:下一輪即將進行 reverse 操作的節點 `last`:進行局部 reverse 操作時,`pending` 的插入點 `curr`, `next`:提供給 `list_for_each_safe` 走訪使用的 iterator `count`:紀錄當前組別已累積走過多少個 `node` ```c void q_reverseK(struct list_head *head, int k) { // https://leetcode.com/problems/reverse-nodes-in-k-group/ if (!head || list_empty(head) || k <= 1) return; // nodes waiting to be add & start add point struct list_head *pending = head->next, *last = head; struct list_head *curr, *next, *next_pend; int count = 0; list_for_each_safe (curr, next, head) { if (++count >= k) { while (count) { count--; next_pend = pending->next; list_del(pending); list_add(pending, last); pending = next_pend; } last = next->prev; } } } ``` ### merge_two_sorted_list (utils) 因為後續的 sort, merge 都會涉及合併兩個「已排列鏈節串列」,所以先寫成一個工具函式來簡化操作。需要注意的是,傳入這個函式處理的 `list` 要先打斷 circular list 的頭尾連結,且不需要將 `guard node` 傳入。 #### 實作步驟 >1. 初始化 `guard` 節點,`tail` 指向 `guard` 以追蹤新鏈結串列的最後節點。 >2. 合併兩個排序鏈結串列,依 `q_cmp()` 比較 `a` 和 `b`,將較小者連接到 `tail`,並前進到下一節點。 >3. 處理剩餘節點,若 `a` 或 `b` 尚未走訪完,直接連接至 `tail`。 >4. 回傳排序後的鏈結串列起點(`guard.next`)。 :::info 這裡的實作比較冗長,且課程當中已有提及類似程式,故不在此詳細列出。 詳情請見 [github repo](https://github.com/andrew99154/lab0-c/blob/master/queue.c#L253)! ::: ### q_sort #### top-down merge sort 的問題 * 當遇到 `size=2M` 的 test case,會一直出現程式執行時間過長的提示。 * 主要是因為在分割鏈結串列時,會使用到前面提及的 `q_find_mid`,其時間複雜度為 O(n)。 * 且儲存分割完成的鏈結串列也需要額外的記憶體空間,故改採使用 bottom-up 方式實作。 #### bottom-up merge sort 實作 * 概念很簡單:每次造訪一個節點,當遇到可以合併的(`size` 相同的兩個 `list`),就馬上進行合併。 * 全部的節點走訪完,再把剩下未合併的 `list` 逐一合併。 * 合併可以直接呼叫之前實作的 `merge_two_sorted_list`。 * 不必找 `list` 分割中點,省下了大把時間。 會需要額外使用一個 `part[]` 來存放部份合併完成的 `list`。 `part[]` 有特殊的存放規則:`part[n]` 中若不為 `empty`,就是大小為 $2^n$ 的 `list` (這樣一來,檢查是否存在 `size` 相同的 `list` 就相當方便!) ```c void q_sort(struct list_head *head, bool descend) { struct list_head *list; struct list_head *part[65]; int level; if (!head || list_empty(head) || list_is_singular(head)) return; // break circular head->prev->next = NULL; head->prev = NULL; list = head->next; INIT_LIST_HEAD(head); // init part[] memset(part, 0, sizeof(part)); while (list) { struct list_head *curr = list; struct list_head *next = list->next; curr->prev = NULL; curr->next = NULL; list = next; // the number of nodes in part[] will always be 0 or 2^level // [1, 2, 4, 8, 16...] or [0, 2, 0, 0, 16] are acceptable // depends on level (idx) // keep merging until unable to merge for (level = 0; part[level]; level++) { curr = merge_two_sorted_list(descend, part[level], curr); part[level] = NULL; } part[level] = curr; } list = NULL; for (level = 0; level < (int) (sizeof(part) / sizeof(part[0])); level++) { if (!part[level]) continue; list = merge_two_sorted_list(descend, part[level], list); } // reconstruct list while (list) { struct list_head *next = list->next; list_add_tail(list, head); list = next; } } ``` :::info 此處排序時是使用 `strcmp` ,所以節點是依照字典序 ([Lexicographic order](https://en.wikipedia.org/wiki/Lexicographic_order)) 排列。 e.g. 如果對 `[17, 2, 189, 38]` 進行字典序排列: 直覺上的數字由小到大排列結果,會是 `[2, 17, 38, 189]` ( X ) 但實際的字典序排列結果為: `[17, 189, 2, 38]` ( O ) ::: ### q_ascend, q_descend 本函式將會維護一個單調嚴格遞增(or 遞減)的 `list`。 在非鏈結串列的情境下,可以透過 `stack` 達成: * 每次插入新數值到 `list` 時,和 `stack` 頂部的節點比大小 * 若 `stack` 頂部節點會違反遞增 or 遞減的條件,則持續 `pop stack` 直到符合條件為止 事實上,即使不使用 `stack`,也可以在 doubly linked list 上實作這個函式。 透過維護一個 `*tail`,其實際效用和 `stack.top()` 類似,代表維護的單調序列的最後一個節點位置。 因兩者實作幾乎相同,僅附上 `q_ascend` 作為參考: ```c int q_ascend(struct list_head *head) { // https://leetcode.com/problems/remove-nodes-from-linked-list/ if (!head || list_empty(head)) return 0; if (list_is_singular(head)) return 1; struct list_head *tail = head->next, *curr, *tmp; list_for_each (curr, head) { while (tail != head && strcmp(list_entry(curr, element_t, list)->value, list_entry(tail, element_t, list)->value) < 0) { tmp = tail->prev; list_del(tail); q_release_element(list_entry(tail, element_t, list)); tail = tmp; } tail = curr; } return q_size(head); } ``` ### q_merge #### 實作步驟 >1. 檢查 `head` 是否為 `NULL` or `empty`,若是則直接回傳 0 >2. 初始化變數,設定 `curr`(當前 queue),`result`(合併後的結果) >3. 使用 `list_for_each_entry` 走訪 `head` 中的每個 `queue_context_t`,進行合併 >a. 斷開 `curr->q`(原為 circular list),確保 `merge_two_sorted_list()` 正常運作 >b. 將 `curr->q` 併入 `result`,維持 `result` 為當前已排序的結果 >c. `curr->q->next = curr->q;` 清除 `q` 之中已經排序完成的節點 >4. 將合併後的結果存入第一個 `queue` >a. 取 `head->next` 作為 `ans_entry`,初始化 `ans_entry->q` >b. 使用 `list_add_tail` 逐一將`result` 中的節點加入 `ans_entry->q`,重建合併完成的 queue >5. 回傳合併後的 `q_size(ans_entry->q)` ``` c int q_merge(struct list_head *head, bool descend) { // https://leetcode.com/problems/merge-k-sorted-lists/ if (!head || list_empty(head)) return 0; queue_contex_t *curr = NULL, *ans_entry; struct list_head *result = NULL; list_for_each_entry (curr, head, chain) { curr->q->prev->next = NULL; result = merge_two_sorted_list(descend, result, curr->q->next); curr->q->next = curr->q; } ans_entry = list_entry(head->next, queue_contex_t, chain); INIT_LIST_HEAD(ans_entry->q); while (result) { struct list_head *next = result->next; list_add_tail(result, ans_entry->q); result = next; } return q_size(ans_entry->q); } ``` ## 研讀 Linux 核心原始碼 list_sort.c ### 引入 `list_sort.c` > 以 2023 年的作業觀摩中 [Risheng1128](https://hackmd.io/@wanghanchi/linux2023-lab0#%E5%B0%87-list_sortc-%E5%8A%A0%E5%85%A5%E5%B0%88%E6%A1%88) 同學的實作方式為參考 首先,加入 `list_sort.c` 和 `list_sort.h` 到專案中。 * 在 linux kernel 中實作的版本,會傳入 `void *priv` 參數,而在 `lab0-c` 中都用不到,可以直接拿掉 * 需要使 `cmp` 函式有接收 `descend` 參數的功能,原先想要直接偷 `queue.c` 裡之前做的來用;但因 `queue.h` 不允許修改,所以直接複製一份 `q_cmp` 到 `list_sort.c` 中 更動範例如下(僅舉例,其他省略,請參閱 repo): ```diff - static struct list_head *merge(void *priv, list_cmp_func_t cmp, - struct list_head *a, struct list_head *b) + static struct list_head *merge(struct list_head *a, struct list_head *b, int descend) ``` 為了後續測試時切換方便,在 qtest.c 的 console_init() 中加入 `list_sort` param,讓使用者可以直接指定要使用的排序演算法。 ```c add_param("listsort", &use_list_sort, "use linux kernel style sorting algorithm from lib/list_sort.c", NULL); ``` 在 `do_sort()` 實作的地方,依據 `use_list_sort`(option 會改變的變數),去判斷當前應該使用哪個排序演算法: ```c if (current && exception_setup(true)){ if (use_list_sort) list_sort(current->q, descend); else q_sort(current->q, descend); } ``` ### 與我自己的 `q_sort()` 進行效能比較 首先,在 `trace/` 目錄下新增 `trace-sort.cmd`: (根據需要,可以自行選擇要測試的 sorting 模式、資料量等等) ``` option listsort 1 new ih RAND 1000000 time sort time ``` 上方將會選用 lib/list_sort.c 中的演算法,執行 `1000000` 次插入操作後,進行排序與計時。 因為要測試的數據量比較大,會觸發到 harness.c 中定義的 alarm 觸發時間,而造成程式直接中斷;為了測試方便,先給一個比較寬容的 `time_limit`。 ```c // harness.c static int time_limit = 10; ``` 接下來,在 `script/` 目錄下新增用來統計排序執行時間的 `test_sort_perf.py`: ```python import subprocess import re NUM_RUNS = 10 delta_times = [] for i in range(NUM_RUNS): result = subprocess.run(["./qtest", "-f", "traces/trace-sort.cmd"], capture_output=True, text=True) output = result.stdout matches = re.findall(r"Delta time\s*=\s*([\d\.]+)", output) if len(matches) < 2: print(f"{i+1}th test: No Delta time finded.") continue delta_time = float(matches[1]) delta_times.append(delta_time) print(f"{i+1}th test: Delta time = {delta_time}") if delta_times: avg = sum(delta_times) / len(delta_times) print(f"\naverage Delta time of {len(delta_times)} of sorting: {avg}") else: print("No Delta time finded.") ``` 這段 script 將會連續執行 `NUM_RUNS` 次(預設為`10`)排序,並用 list 紀錄下每次排序花費的時間,最終對所有秒數取平均作為結果。 #### 測試結果 <div style="display: flex;"> <div style="margin-right: 20px;"> ##### 1M 筆資料 | Round | q_sort | list_sort | | -------- | -------- | -------- | | 1 | 2.722 | 2.582 | | 2 | 2.74 | 2.614 | | 3 | 2.629 | 2.459 | | 4 | 2.66 | 2.447 | | 5 | 2.741 | 2.386 | | 6 | 2.605 | 2.497 | | 7 | 2.782 | 2.471 | | 8 | 2.762 | 2.443 | | 9 | 2.582 | 2.396 | | 10 | 2.592 | 2.393 | | **AVG** | **2.6815** | **2.4688** | </div> <div> ##### 2M 筆資料 | Round | q_sort | list_sort | | -------- | -------- | -------- | | 1 | 6.075 | 5.567 | | 2 | 5.812 | 5.564 | | 3 | 5.84 | 5.724 | | 4 | 5.978 | 5.823 | | 5 | 5.845 | 5.494 | | 6 | 6.351 | 5.647 | | 7 | 6.15 | 5.672 | | 8 | 5.817 | 5.565 | | 9 | 6.18 | 5.727 | | 10 | 5.76 | 5.736 | | **AVG** | **5.98** | **5.652** | </div> </div> 可以觀察到: * 在資料量 1M 時,list_sort 比我的 q_sort 快大約 `0.22` 秒 * 在資料量 2M 時,list_sort 比我的 q_sort 快大約 `0.33` 秒 ## 解釋 select 系統呼叫於本程式的使用 ### console.c 處理 web 指令 當使用者輸入 command `web` 之後,將會建立 server socket,並使 `linenoise.c` 裡面的 `eventmux_callback` 的 function pointer 指向 `web_eventmux()`。 ```c static bool do_web(int argc, char *argv[]) { // init... (ignore) web_fd = web_open(port); if (web_fd > 0) { printf("listen on port %d, fd is %d\n", port, web_fd); line_set_eventmux_callback(web_eventmux); use_linenoise = false; } else { // handle error } return true; } ``` ### linenoise.c 中的 `line_edit()` 在 `line_edit()` 中使用 `while` 讀取輸入的地方,可以看到幾行程式碼: ```c while (1) { // init...(ignore) if (eventmux_callback != NULL) { int result = eventmux_callback(l.buf); if (result != 0) return result; } // handle input from command line...(ignore) } ``` `eventmux_callback` 就是先前在 command line 執行 `web` 所設置的 function pointer,可以用來 handle 來自 web server 的命令。 ### web.c 中的 `web_eventmux()` 這裡是主要使用系統呼叫 `select()` 的地方。`select()` 採用了 I/O Multiplexing Model,參考下圖: ![image](https://hackmd.io/_uploads/Sy_FAmvskx.png) `web_eventmux()` 中同時監聽來自 web server 和 stdin 的 file descriptor,任何一方觸發都會使程式繼續執行下去。 本函式主要 handle 來自 web server 的事件;因此,若是 stdin 觸發監聽,將不會進行對命令的處理,而是回到 linenoise.c 中的 `line_edit()` 才處理。 ```c int web_eventmux(char *buf) { fd_set listenset; FD_ZERO(&listenset); FD_SET(STDIN_FILENO, &listenset); int max_fd = STDIN_FILENO; if (server_fd > 0) { FD_SET(server_fd, &listenset); max_fd = max_fd > server_fd ? max_fd : server_fd; } int result = select(max_fd + 1, &listenset, NULL, NULL, NULL); if (result < 0) return -1; if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) { //handle cmd from web server...(ignore) return strlen(buf); } FD_CLR(STDIN_FILENO, &listenset); return 0; } ``` ### 運用 RIO 套件的考量點 RIO 全名為 Robust I/O,根據[卡內基美隆大學的課程](https://(https://scs.hosted.panopto.com/Panopto/Pages/Viewer.aspx?id=f107c2ce-79d5-4529-baeb-2bb495d8c11a))中的介紹,RIO 可以有效解決 short count 的問題。除此之外,因為每次呼叫 `read()` 都需要從 user mode 切換到 kernel mode,如果頻繁呼叫會大幅降低效能。 RIO 套件的做法是:當系統呼叫 `read()` 時,一次性讀取一大塊資料到 user space buffer 中 (同時處理 short count),後續再多次從 buffer 取用即可;當讀到 buffer 為空時,才再次系統呼叫。 #### `rio_t` 結構 其中 `count` 代表著 buffer 中尚未被取用的資料長度。 所以 `count <= 0` 時,代表 buffer 已經空了,需要再次 `read()`。 ```c typedef struct { int fd; /* descriptor for this buf */ int count; /* unread byte in this buf */ char *bufptr; /* next unread byte in this buf */ char buf[BUFSIZE]; /* internal buffer */ } rio_t; ``` #### `rio_read()` ```c static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n) { int cnt; while (rp->count <= 0) { /* refill if buf is empty */ rp->count = read(rp->fd, rp->buf, sizeof(rp->buf)); if (rp->count < 0) { if (errno != EINTR) /* interrupted by sig handler return */ return -1; } else if (rp->count == 0) { /* EOF */ return 0; } else rp->bufptr = rp->buf; /* reset buffer ptr */ } /* Copy min(n, rp->count) bytes from internal buf to user buf */ // 處理 short count cnt = n; if (rp->count < n) cnt = rp->count; memcpy(usrbuf, rp->bufptr, cnt); rp->bufptr += cnt; rp->count -= cnt; return cnt; } ``` #### console.c 中很酷的 `buf_stack` 一開始讀到以下程式碼,覺得很困惑,什麼情景下會用到這個? ```c static rio_t *buf_stack; // /* Create new buffer for named file. * Name == NULL for stdin. * Return true if successful. */ static bool push_file(char *fname) { int fd = fname ? open(fname, O_RDONLY) : STDIN_FILENO; has_infile = fname ? true : false; if (fd < 0) return false; if (fd > fd_max) fd_max = fd; rio_t *rnew = malloc_or_fail(sizeof(rio_t), "push_file"); rnew->fd = fd; rnew->count = 0; rnew->bufptr = rnew->buf; rnew->prev = buf_stack; buf_stack = rnew; return true; } /* Pop a file buffer from stack */ static void pop_file() { if (buf_stack) { rio_t *rsave = buf_stack; buf_stack = rsave->prev; close(rsave->fd); free_block(rsave, sizeof(rio_t)); } } ``` 後來找到使用了 `push_file()` 的地方: ```c static bool do_source(int argc, char *argv[]) { // error hadling...(ignore) if (!push_file(argv[1])) { report(1, "Could not open source file '%s'", argv[1]); return false; } return true; } ``` 發現原來是執行 `source` 的時候會用到,是為了處理 nested `source` commands 的情況。 舉例來說,以下有三個 file,將執行一些命令: ##### fileA ```shell > source fileB > it 6 ``` ##### fileB ```shell > source fileC > it 3 ``` ##### fileC ```shell > new > it 8 ``` ##### result ```c q = [8, 3, 6] ``` 如果使用者執行 `./qtest` 後輸入 `source fileA`,就會造成遞迴式的開檔、執行,而先前宣告的 `buf_stack` 就是為了因應這種情況而存在。 每當讀到 `source`,代表需要開新的檔案;新增、初始化新檔案對應的 `rio_t` 之後,將其 `push` 到 `buf_stack` 中;直到檔案內容被讀完就 `pop_file()`,就會回到先前讀到一半的檔案狀態。 可以對應到 console.c 中 `readline()` 的其中一段程式碼: (系統呼叫 `read()` 之後,發現沒有剩餘的可讀資料,代表 EOF) ```c if (buf_stack->count <= 0) { /* Need to read from input file */ buf_stack->count = read(buf_stack->fd, buf_stack->buf, RIO_BUFSIZE); buf_stack->bufptr = buf_stack->buf; if (buf_stack->count <= 0) { /* Encountered EOF */ pop_file(); if (cnt > 0) { /* Last line of file did not terminate with newline. */ /* Terminate line & return it */ *lptr++ = '\n'; *lptr++ = '\0'; if (echo) { report_noreturn(1, prompt); report_noreturn(1, linebuf); } return linebuf; } return NULL; } } ``` ### 可能的改善方向 當我在閱讀這段 code 的時候,覺得有些不直覺。目前的實作會在 `line_edit()` 裡面同時檢查來自 web server 的 request。 在 OOP 中有所謂的 [SOLID 原則](https://en.wikipedia.org/wiki/SOLID)(即便 C 並非 OO),其中的[單一職責原則 (Single-resbonsibility principle)](https://en.wikipedia.org/wiki/Single-responsibility_principle) 提到,函式應將關注點分離,每個函式只專注做一件事情。 因此,`line_edit()` 是否應該專注於處理來自 command line 的命令? 又剛好看到老師老師撰寫的[並行程式設計:排程器](https://hackmd.io/@sysprog/concurrency-sched),裡面的 `event_loop()` 實作給了我一些啟發: ![image](https://hackmd.io/_uploads/B1Z4iFOsJx.png) 我想可以參考這樣的函式呼叫方式,但不以 coroutine 的方式實作。 是否有可能仿照,在 `main_loop()` 中僅去呼叫 `select()`;當 web server 或 command line 事件被觸發時,才去呼叫對應的 handling 函式? ## 實作 shuffle 命令 在 `queue.c` 裡新增了 `q_shuffle`,實作參考的是 Fisher–Yates shuffle 演算法。 考量到原演算法中有多次存取陣列內容、swap 操作,而我需要對 doubly linked list 進行 `shuffle`;在不以其他資料結構加速存取過程的狀況下,查找特定節點的時間複雜度為 `O(n)`。 所以實作時佔以一個 `struct list_head **arr` 存下各節點的位置,待 `shuffle` 操作執行完畢後,再逐一將節點串起來。 ### q_shuffle 實作 ```c void q_shuffle(struct list_head *head) { if (!head || list_empty(head) || list_is_singular(head)) return; int len = q_size(head), i = 0; struct list_head **arr = malloc(len * sizeof(struct list_head *)); struct list_head *curr, *tmp; list_for_each (curr, head) { arr[i++] = curr; } // pick a node randomly and swap for (i = len - 1; i > 0; i--) { int j = rand() % (i + 1); tmp = arr[i]; arr[i] = arr[j]; arr[j] = tmp; } // reconstruct the doubly linked list for (i = 0; i + 1 < len; i++) { arr[i]->next = arr[i + 1]; arr[i + 1]->prev = arr[i]; } head->next = arr[0]; arr[0]->prev = head; head->prev = arr[len - 1]; arr[len - 1]->next = head; free(arr); } ``` ### 以 Chi-squared test 檢測「亂度」 為了要檢測每次的 `shuffle` 是否足夠隨機,可以套用假設檢定的技巧來判斷。 > 所謂「夠亂」,可以視為在做了足夠多次 shuffle 後,每組排列組合出現的機率相等(Uniform Distribution)。 為此,可以使用卡方檢定。該檢定可以用於判斷兩機率分佈是否相似。基於作業說明中提供的測試用 script,進行 `shuffle` 操作 `1000000` 次,得到各排列組合的次數分佈如下: ![image](https://hackmd.io/_uploads/HkTRM8mjJx.png) #### Chi-sqaured value 公式 ```python def chiSquared(observation, expectation): return ((observation - expectation) ** 2) / expectation ``` #### 實驗數據 自由度:`23`(`4` 個數字共有 `4! = 24` 種排列組合,自由度 = `N-1`) 期望頻率:`1000000 / 24 = 41666` Chi-squared value sum: `24.662986607785726` p-value: `0.3678926504294987` 因 p-value 大於顯著水準 (alpha = 0.05),無法拒絕 $H_0$ ($H_0$:shuffle 結果之分佈**不為** uniform distribution) | 排列組合 | 出現頻率 | Chi-squared value | | -------- | -------- | -------- | | 1234 | 41488 | 0.7604281668506696 | | 1243 | 42096 | 4.437671002736044 | | 1324 | 41581 | 0.17340277444439112 | | 1342 | 41835 | 0.6854749675994816 | | 1423 | 41853 | 0.8392694283108529 | | 1432 | 41970 | 2.218019488311813 | | 2134 | 41631 | 0.02940047040752652 | | 2143 | 41550 | 0.322949167186675 | | 2314 | 41584 | 0.16137858205731292 | | 2341 | 41872 | 1.018480295684731 | | 2413 | 41318 | 2.906542504680075 | | 2431 | 41879 | 1.0888734219747516 | | 3124 | 41698 | 0.024576393222291555 | | 3142 | 41459 | 1.0283924542792684 | | 3214 | 41558 | 0.2799404790476648 | | 3241 | 41873 | 1.0283924542792684 | | 3412 | 41534 | 0.41818269092305477 | | 3421 | 41842 | 0.7434358949743196 | | 4123 | 41823 | 0.5915854653674458 | | 4132 | 41516 | 0.5400086401382422 | | 4213 | 41787 | 0.35138962223395576 | | 4231 | 41521 | 0.5046080737291797 | | 4312 | 41429 | 1.3480775692411078 | | 4321 | 41303 | 3.1625066001056017 | ### 改善作業說明所提供的 test script 作業說明中的 script 存在小缺陷,導致執行效率極差,問題出在模擬輸入 command 的地方: ```python test_count = 1000000 input = "new\nit 1\nit 2\nit 3\nit 4\n" for i in range(test_count): input += "shuffle\n" input += "free\nquit\n" ``` 在 python 中的字串處理,直接使用 `+=` 進行操作,實際上是重複 allocate 新記憶體空間、複製舊字串、貼上新字串;時間複雜度可達到 $O(n^2)$。 用以下方式改善: ```python input_commands = ["new\n", "it 1\n", "it 2\n", "it 3\n", "it 4\n"] input_commands.extend(["shuffle\n"] * test_count) input_commands.append("free\nquit\n") input = "".join(input_commands) ``` 先對 `list` 進行新增重複性高的 command substring,再一次性使用 `join()` 建立字串,可以省下不必要的 allocate 操作。 ## 研讀論文〈Dude, is my code constant time?〉 :::info 原論文:[Dude, is my code constant time?](https://eprint.iacr.org/2016/1123.pdf) Dudect 工具:https://github.com/oreparaz/dudect ::: 作者提出了一種使用統計學上「假設檢定」的方式,來檢測函式是否為常數時間複雜度。可以大致總結為以下步驟: 1. **測量函式執行時間** 首先將 input data 分為兩大類:`fix class`, `random class` - `fix class` 的資料內容固定,且可以在常數執行時間完成 - `random class` 為隨機產生 把兩類資料餵給欲測試的函式,並分類記錄下每次函式執行所花的時間。 (在實作中,執行時間使用 `cpu_cycles()` 測量) 2. **資料後處理 (post-processing)** 測量結果常會受到外部環境影響(如 OS interrupt),而出現異常極端值。因此,對數據後處理可以提升分析的精準度。 在[原專案](https://github.com/oreparaz/dudect)中,會給定一系列的裁切比例,將測量值較高的部分數據丟棄。 3. **套用假設檢定** 使用 Welch’s t-test,檢測剛剛獲得的兩類資料的執行時間分佈是否相似。如果 `random class` 和 `fix class` 的執行時間分布不同,則可以合理推斷該函式「非」常數時間複雜度。 ### 原專案的實作步驟 1. 統計一函式的執行時間 - 使用 `cpu_cycles()` 取得 `before_ticks` 和 `after_ticks` - `exec_time = after_ticks - before_ticks` 3. 用測得的時間資料,透過 `t_push()` 在線更新 `mean`, `m2`, `n` 4. 呼叫 `t_compute()` 透過 `mean`, `m2`, `n` 計算 `t_value` 5. 使用 `t_value` 判斷兩個 `class` 的分佈是否存在顯著差異,若 `abs(t_value)` 高於一定 threshold,則可合理判斷兩者分佈不同 #### t_push 每次計算完 `exec_time` 都會將其加入統計資料中。 此時需要使用線上更新的方式,對 `mean`, `m2`, `n` 做更新。 ##### online 數值更新公式 ###### n 更新 $$n \leftarrow n+1$$ ###### delta 更新 $$\delta \leftarrow x-\bar{x}_{old}$$ ###### 平均數 mean 更新 $$ \bar{x}_{new} \leftarrow \bar{x}_{old}+\frac{\delta}{n}$$ ###### 累積變異數 $M_2$ 更新 $$M_{2,new} \leftarrow M_{2,old}+\delta(x-\bar{x}_{new})$$ ```c void t_push(t_context_t *ctx, double x, uint8_t class) { assert(class == 0 || class == 1); ctx->n[class]++; /* Welford method for computing online variance * in a numerically stable way. */ double delta = x - ctx->mean[class]; ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class]; ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]); } ``` #### t_compute 利用先前統計資料計算 t-value,公式如下: $$t=\frac{\bar{X}_0-\bar{X}_1}{\sqrt{\frac{{s_0}^2}{N_0}-\frac{{s_1}^2}{N_1}}}$$ $S_0$, $S_1$ 可以先前得到的累積變異數 $M_2$ 求得: $$s^2 = \frac{M_2}{n-1}$$ ```c double t_compute(t_context_t *ctx) { double var[2] = {0.0, 0.0}; var[0] = ctx->m2[0] / (ctx->n[0] - 1); var[1] = ctx->m2[1] / (ctx->n[1] - 1); double num = (ctx->mean[0] - ctx->mean[1]); double den = sqrt(var[0] / ctx->n[0] + var[1] / ctx->n[1]); double t_value = num / den; return t_value; } ``` ### `lab0-c` 中的 `dudect` 現存問題 #### 本機端與 github runner 的測試結果不同 > 此節屬於個人猜測,真正的原因仍須更完整的驗證 在本機端執行 `make test` 時,都能全部通過(`100/100`)。但推送到 github 時,會發生最後一項 test complexity 無法通過的狀況(`95/100`)。 (甚至會發生 `insert_tail` 通過,但 `insert_head` 沒有通過的奇怪現象) 我認為可能是本地端的執行環境和 github runner 環境不同所造成,在虛擬環境下的 `exec_times` 資料可能包含更多雜訊(如 OS interrupt 等),測量結果不準確。 #### 缺乏 post-processing 的 `percentile()` 實作 根據作業說明,在 `lab0-c` 未有完整實作 `percentile` 功能。 在計算 `t-value` 時丟棄部份 `exec_time` 極端值,可以更精確的確認 `random class` 和 `fix class` 的分佈是否相似。 - 解決方式:給定多筆指定的 `percentile`,並同時維護不同 `percentile` 下的 `t-value`。這些 `t-values` 將用於進一步判斷函式是否為常數時間複雜度。 - e.g. 給定 $100$ 筆 `percentile`,會需要維護 $100+1$ 個 `t-value` (加上未經極端值處理的 raw data) :::info 心中未解的疑惑: 目前的實作是對所有 `t-values` 取最大值,但這樣是否有助於改善極端值所帶來的影響?極端值仍會影響到那組未 crop 的 raw data;而該組 `t-value` 也可以去影響到 `max t-value` 的計算結果。 在 lab0-c 裡引入 percentile 實作後,可以順利通過全部的測試 `(100/100)`。 但為何可以成功改善,我還搞不懂,仍須進一步了解。 :::