# 2024q1 Homework1 (lab0) contributed by < [david965154](https://github.com/david965154) > :::danger 注意作業規範。留意各式細節,唯有重視小處並步步為營,方可挑戰原始程式碼規模超越三千七百萬行的 Linux 核心。 ::: ## 開發環境 環境設置: 裝置:Macbook air m1 虛擬機器:lima :::danger virtual machine 的翻譯是「虛擬機器」,而非虛擬「機」,後者可會跟 engine (機) 混淆。 ::: 當在輸入命令 `git commit -a` 時,遇到以下問題 :::danger 改進你的漢語表達。 ::: ```shell $ git commit -a Following files need to be cleaned up: queue.c queue.c:46:50: style: Parameter 's' can be declared as pointer to const [constParameterPointer] bool q_insert_head(struct list_head *head, char *s) ^ queue.c:64:50: style: Parameter 's' can be declared as pointer to const [constParameterPointer] bool q_insert_tail(struct list_head *head, char *s) ^ log2_lshift16.h:15:1: information: ValueFlow analysis is limited in log2_lshift16. Use --check-level=exhaustive if full analysis is wanted. [checkLevelNormal] { ^ Fail to pass static analysis. ``` 感謝 [@25077677](https://hackmd.io/@25077667/SkpLspWnT) 找到原因並著手解決 請參考 [Inconsistent Cppcheck Version Leads to Differing Default Enabled Checking Flags #153](https://github.com/sysprog21/lab0-c/issues/153) 第一項為 `cppcheck` 判斷 `const pointer checking` 時因為 `*s` 在函式內不會被改動,所以認為他需要加上 `const` ,不過因為 `queue.h` 原先就沒加 `const` 因此無法添加。 第二項為指出在 `log2_lshift16` 中僅進行了有限的值流分析。如果需要進行完整的分析,請使用 --check-level=exhaustive 參數。 但我這邊不進行修改,而是直接將這兩項添加進 `./script/pre-commit.hook` 的 `CPPCHECK_suppresses` ```diff CPPCHECK_suppresses="--inline-suppr harness.c \ --suppress=missingIncludeSystem \ --suppress=noValidConfiguration \ --suppress=unusedFunction \ --suppress=identicalInnerCondition:log2_lshift16.h \ --suppress=nullPointerRedundantCheck:report.c \ --suppress=nullPointerRedundantCheck:harness.c \ --suppress=nullPointer:queue.c \ --suppress=nullPointer:qtest.c \ --suppress=returnDanglingLifetime:report.c \ --suppress=constParameterCallback:console.c \ --suppress=constParameterPointer:console.c \ + --suppress=checkLevelNormal:log2_lshift16.h \ + --suppress=constParameterPointer:queue.c" ``` 在實作之前,我先花了幾天時間閱讀第一週教材,[2024q1 Homework1 (教材及作業描述,啟發和疑惑)](https://hackmd.io/@david96514/SytkNfvnp) 裡面有我自己的見解及筆記還有疑問,本次實作我本來以指標的指標來實作,不過因常常誤用,且已有巨集可使用,有點太過多餘,因此我重新寫了一般最直覺的指標版本。 ### 一、實作 `queue.c` 內部功能 首先,來觀察 `struct` `element_t` 及 `list_head` ```c /** * element_t - Linked list element * @value: pointer to array holding string * @list: node of a doubly-linked list * * @value needs to be explicitly allocated and freed */ typedef struct { char *value; struct list_head list; } element_t; /** * struct list_head - Head and node of a doubly-linked list * @prev: pointer to the previous node in the list * @next: pointer to the next node in the list */ struct list_head { struct list_head *prev; struct list_head *next; }; ``` `struct list_head` 的成員 `struct list_head *prev` 及 `struct list_head *next` 皆為 `pointer to struct list_head` ,因此無法直接存取到 `element_t` 內的成員 `char *value` 而從 `list.h` 的 `list_head` 可以看到以下 > The list nodes are usually embedded in a container structure which holds the actual data. Such container structure is called entry. The helper list_entry can be used to calculate the structure address from the address of the node. 這段說明 `struct list_head` 實際上嵌在存有資料 `char *value` 的`container structure` 內,而 `container structure` 被稱為 `entry` , `list.h` 內的 macro `list_entry` 可以透過 `struct list_head` 計算 `entry` 之記憶體位置而可以存取 `char *value` ; #### `q_new` :::danger 避免非必要的項目縮排 (即 `* ` 和 `- `),以清晰、明確,且流暢的漢語書寫。 ::: 建立新的「空」佇列; 新的空佇列的指標 `next` 及 `prev` 需指向自身; ```c struct list_head *head = malloc(sizeof(struct list_head)); ``` 再透過檢查是否配置成功後,將 `next` 及 `prev` 需指向自身,再行回傳; #### `q_free` 釋放佇列所佔用的記憶體; 佇列由節點互相鏈結組成,因此須先釋放節點所佔用的記憶體空間; ```c typedef struct { char *value; struct list_head list; } element_t; ``` 節點 `element_t` 內元素 `value` 為 `pointer to char` , 使用 `free` 釋放 `element_t` 並不能同時釋放 `value` 所指向記憶體位置的空間。此時有兩種情況以決定是否需要釋放記憶體: 1. 不需要:直接讓他指向一串字串 2. 需要:先配置一段記憶體空間,再將字串放入, `value` 再指向它。 此項實作後者,因此需要先釋放 `value` 所指向的記憶體空間再釋放 `struct list_head` 所指向的記憶體空間。 1 的原因請參考: [你所不知道的C語言:指標篇](https://hackmd.io/@sysprog/c-pointer#%E4%BD%A0%E6%89%80%E4%B8%8D%E7%9F%A5%E9%81%93%E7%9A%84C%E8%AA%9E%E8%A8%80%EF%BC%9A%E6%8C%87%E6%A8%99%E7%AF%87)內的重新探討「字串」。 2 的原因請參考: [Does freeing structures in C also delete its members? ](https://stackoverflow.com/questions/55937242/does-freeing-structures-in-c-also-delete-its-members) > Every malloc has to be balanced with a corresponding free. > > Your assignment node->data1 = "abc"; assigns a pointer to the read-only literal "abc", so there is no dynamic memory here, and therefore you must not use a free. > > In your particular case, you are able to retain that pointer a having called free on the struct since you did not have to free that memory and it didn't ever belong to the struct. But that does not work in general: if you had used malloc to set node->data1 then (1) you would have to call free on that pointer before you attempt to call free on the struct **方法**: ```c free((list_entry(node,element_t,list)->value)); free(list_entry(node,element_t,list)); ``` 透過 `list.h` 內的 macro `list_for_each_safe` 逐一走訪佇列,此巨集會使用 `pointer to struct list_head` `safe` 指向 `pointer to struct list_head` `node` 的下一個節點,因此可以做移去節點的動作,再依上面程式碼的順序釋放節點所佔用記憶體空間; #### `q_insert_head` 在佇列開頭 (head) 插入 (insert) 給定的新節點 (以 LIFO 準則); 在給定字串的情況下,透過配置記憶體空間來儲存字串,需額外配置 1 個字元的空間給予結尾 `'\0'` ,以免在逐字讀取輸出字串時因為不知道何時停止而輸出別的記憶體位置所儲存之資料。若配置給予字串的記憶體時失敗,需釋放 `element_t` 之空間並返回; ```c list_add(&new_node->list, head); ``` 透過配置新的 `element_t` 記憶體空間並配置存放字串的記憶體空間後,再以 `pointer to element_t` `new_node` 指向 `element_t` 之成員 `list` 去以 `list.h` 內的 `list_add` 新增至佇列頭部; #### `q_insert_tail` 在佇列尾端 (tail) 插入 (insert) 給定的新節點 (以 FIFO 準則); 同 `q_insert_head`; ```c list_add_tail(&new_node->list, head); ``` 類似 `q_insert_head` ,但以 `list.h` 內的 `list_add_tail` 實作; #### `q_remove_head` 移去 (remove) 佇列開頭 (head) 的節點; 移去 (remove) 並非刪去 (delete) ,這裡需將移去之節點返回並將移去節點之字串跟據給定 `bufsize` 複製至 `pointer to char` `*sp` ; ```c struct list_head *return_element = head->next; list_del(return_element); strncpy(sp, (list_entry(return_element, element_t, list)->value), bufsize); ``` 先以一個新建的 `pointer to struct list_head` 指向佇列第一個節點,再利用 `list.h` 內的 macro `list_del` 移去,回傳前先將該節點之成員 `value` 指向的字串跟據給定 `bufsize` 複製到 `sp` ; 測試: `trace-07-string` 說明: # Test of truncated strings 測試會透過指定 `bufsize` 去測試實作是否能指複製指定長度字串,如果 `bufsize` 使用錯誤將造成輸出字串時因為讀取不到 `\0` 而輸出原本不屬於該字串佔用空間之資料; #### `q_remove_tail` 移去 (remove) 佇列尾端 (tail) 的節點; 同 `q_remove_head` ; 類似 `q_remove_head` 但新建立的 `pointer to struct list_head` 指向佇列最後一個節點; #### `q_size` 計算目前佇列的`element_t` 總量; ```c for(; tmp->next != head; tmp = tmp->next, i++); ``` 我這裡沒有使用 `list.h` 內的 `macro` 去計算數量,不過方法是相同的; #### `q_delete_mid` 移走佇列第 ⌊n / 2⌋ 個節點; ```c for (fast = fast->next; fast != head && fast->next != head; fast = fast->next->next, slow = slow->next) ; list_del(slow); free(list_entry(slow, element_t, list)->value); free(list_entry(slow, element_t, list)); ``` 這裡我使用了[你所不知道的 C 語言: linked list 和非連續記憶體]([linked](https://hackmd.io/@sysprog/c-linked-list#))-案例探討: Leetcode 2095. Delete the Middle Node of a Linked List 的快慢指標。 `struct list_head *fast` 一次移動兩個節點,而 `struct list_head *slow` 一次移動一個節點,當 `fast` 抵達末節點時, `slow` 就能剛好在第 ⌊n / 2⌋ 個節點,所需時間為 n/2。 這樣無需使用 `q_size` 計算節點數量,並再次從頭開始向尾移動直到抵達第 ⌊n / 2⌋ 個節點,所需時間為 3n/2 。 也可利用 `doubly-linked list` 的特性,從節點頭尾同時向佇列中間移動直到交錯或是相遇,所需時間為 n/2,不過判斷式數量會變為 2 倍。 #### `q_delete_dup` 在已經排序的狀況,移走佇列中具備重複內容的節點,即只留下原先元素數量就為單一的節點; 在無法配置新空間的狀況下成此功能; 這裡我實作的方法可以輸入未經排序的資料。 ```c list_for_each_safe (node, safe, head) { list_del(node); tmp = struct_list->next; if (struct_list->next != struct_list) { for (int j = 0; j < i; j++) { if (strcmp(list_entry(tmp, element_t, list)->value, list_entry(node, element_t, list)->value) == 0) { free(list_entry(node, element_t, list)->value); free(list_entry(node, element_t, list)); is_single[j] = false; node = NULL; break; } tmp = tmp->next; } } if (node) { list_add_tail(node, struct_list); is_single[i] = true; ++i; } } ``` 原始想法如下 1. 新配置二記憶體空間給予存放字串,以 `rec_str` 及 `rep_str` 代稱 2. 逐一走訪佇列 3. 若節點之成員 `value` 不為 `rec_str` 之子字串 -> 4 ,反之 -> 5 4. 移動節點並串接至 `rec_str` -> 2 5. 若不為 `rep_str` 之子字串,移動節點並串接至 `rep_str` -> 2 此時能知道只有 `rep_str` 內存放重複字串 6. 逐一走訪佇列,只要節點之成員 `value` 為 `rep_str` 之子字串,移去並釋放該節點記憶體 因為每次添加字串需要重新配置記憶體空間,太沒效率又多餘,而且存在重大錯誤: 1. 複製並串接若結尾含有 `\0` ,則每一次在比對時會停在首個添加入的字串,無法比較。 2. 若結尾不含有 `\0` ,若兩個頭尾部分字串結合為另一字串,將會被歸類為 `rec_str` 或 `rep_str` 之子字串。 例:1. `elephant` 2. `estimate` 兩字串串接後為 `elephantestimate` 因此若有字串為: `test`, `ant` 亦或其他子字串,將會產生錯誤。 重新改寫 1. 新配置一佇列給予存放 `struct list_head` `struct_list` ,同時新增一 `bool` `is_single` 陣列儲存新佇列的每個節點是否為重複 2. 逐一走訪佇列 `head` 3. 移去佇列 `head` 節點 `node` 4. 逐一走訪新佇列 `struct_list` ,若節點 `node` 之成員 `value` 與新佇列中節點之字串相同則移去並釋放該節點 `node` 的記憶體,並標示新佇列對應到的節點之 `is_single` 為重複 `false` 5. 若節點 `node` 仍存在 (即在步驟 4 中未被移除) ,則將節點 `node` 新增至 `struct_list` 尾部,並標示對應為`is_single` 為 `true` -> 2 6. 逐一走訪新佇列,只要對應 `is_single` 為 `true` ,則加入原始佇列 `head` 之尾部,否則移去並釋放該節點 這樣的實作,需要每一個 `head` 節點與新佇列中的現有節點逐個比對,不僅不符合 `linux` 風格的寫法,消耗的資源也很多。 #### `q_swap` 每兩個節點為一組,組內互換,即 `q_reverseK` K=2; 不過相較於 `q_reverseK` ( $(n/k)*k*k = 2n$ ),一個專屬於 `q_reverse2` 的實作能達到更少的判斷式 ( $n/2$+(1 if odd, 0 if even )),因此我選擇實作後者。 在無法配置新空間的狀況下成此功能; ```c while (size >= 2) { list_del(cur_node); first_node = cur_node; cur_node = head->next; list_move_tail(cur_node, head); list_add_tail(first_node, head); size -= 2; cur_node = head->next; } ``` 這裡以每兩個節點為單位,先將首個節點取出,再將第二個節點移至尾部,再將取出的首個節點加入佇列尾部,若為佇列節點為奇數個就只需在最後將其移至尾部即可。如此循環可以達成此功能,且只需一次逐一走訪。 #### `q_reverse` 以反向順序重新排列鏈結串列; 在無法配置新空間的狀況下成此功能; ```c while (cur_node != head) { struct list_head *move_node = cur_node; cur_node = cur_node->next; list_move(move_node, head); } ``` 這裡我用一個 `struct list_head` `*move_node` 從第二個節點向尾部遞進,並不斷地將 `move_node` 所在節點移往頭部。 過程將為: 1. 1 2 3 4 5 `move_node` 在第二個節點 將2移往頭部 2. 2 1 3 4 5 `move_node` 在第三個節點 將3移往頭部 3. 3 2 1 4 5 `move_node` 在第四個節點 將4移往頭部 4. 4 3 2 1 5 `move_node` 在第五個節點 將5移往頭部 5. 5 4 3 2 1 #### `q_reverseK` 指定每 k 個節點為一組,進行反向順序排列,剩餘數量不足K則維持原順序; 在無法配置新空間的狀況下成此功能; ```c for (int time = 0; time < size / k; time++) { cur_node = head->next->next; while (i < k - 1) { move_node = cur_node; cur_node = cur_node->next; list_move(move_node, head); ++i; } ++i; while (i > 0) { move_node = head->next; list_move_tail(move_node, head); --i; } } ``` 這裡無法如法炮製 `q_swap` ,取出前 `k-1` 個再依反向放回,因為需要以多個指標去指向這些取出的節點以免造成 `memory leak` ,所以我採用了 `q_reverse` 的技巧,將k-1個節點移向頭部,再將k個節點向尾部移,重複做 `⌊q_size/k⌋` 次,最後將 `q_size%k` 個節點往尾部移,即完成此功能; #### `q_sort` 以遞增或遞減順序來排序鏈結串列的節點; 在無法配置新空間的狀況下成此功能,且時間複雜度需在 $n\log_2n$ 內; 在這裡因為執行上時間與空間的佔用上的差異,我不想使用遞迴來實作 參考 [Iteration Vs Recursion](https://www.linkedin.com/pulse/iteration-vs-recursion-ulugbek-norbutaev) **方法**: 這個功能花了我最多時間,所以我想花多一點時間在討論這裡,原先想試試看以 `binary tree` 實作,只不過若要以 `circular doubly-linked list` 來實作對我來說會非常困難,因為 `binary tree` 的 `leaf` 不該再與其他節點鏈接,因此沒有繼續這個方法,因此使用 `merge sort` ,而 `merge sort` 理應需要額外空間 `n` 個空間儲存,不過受益於 `circular doubly-linked list` 的特性,一取出直接放回尾端,如此一來,在未排序完成前,佇列必定會有至少兩組未排序的節點組,這裡我並未實作每次合併時選擇頭尾,以達成平均下來皆以類似大小的未排序的節點組進行合併以避免合併速度變慢的缺點,詳見[你所不知道的 C 語言: linked list 和非連續記憶體]([linked](https://hackmd.io/@sysprog/c-linked-list#))裡的案例探討: LeetCode 21. Merge Two Sorted Lists。 原先我的想法是透過第一輪以步長 1 ,第一輪為第一節點不斷與第二節點排序合併並置於尾部,第二輪以步長乘以 2 合併,因此為第一、二節點不斷與第三、四節點排序合併並置於尾部,直到步長超出 n/2(如:四組需合併 4 2 3 1 -> 兩組需合併 2, 4 及 1, 3 -> 剩一組 1, 2, 3, 4),不過當排序數量不為 $2^n$ 時,將會造成需合併的節點組數等同於 n 拆分成多個 2 的冪組合,以 1000 為例:會產生 512, 256, 128, 64, 32, 8的已排序數組(如:第一組已排序: 1, 2, ..., 512 第二組已排序: 1, 2, ..., 256 第三組已排序: 1, 2, ..., 128 第四組已排序: 1, 2, ..., 64 第五組已排序: 1, 2, ..., 32 第六組已排序: 1, 2, ..., 8 ),但這 6 組仍需進行合併,因此我想到以 `while` 迴圈,不斷計算合併兩段需合併節點組的大小 (方法是只要下個節點小於此節點則停止,因此第一輪會是節點組長度 512 與 256 合併) 再行合併,卻發現若加上這個想法,這段實作將變得笨重又冗餘,因此我將前面移動步數的迴圈移除,直接變成不經其他考慮的 Timsort ,如此一來合併大小不會侷限在 $2^n$ ,且最差時間會在 $O(n\log_2n)$ 即節點為由大到小排序,最佳時間會在 $O(n)$ 即節點為由小到大排序,因為其迴圈結束條件為兩節點組數總和為 q_size ; ```c for (; merge_list_2 != head && strcmp( list_entry(merge_list_2, element_t, list)->value, list_entry(merge_list_2->next, element_t, list)->value) <= 0;) { merge_list_2 = merge_list_2->next; len++; } tmp = merge_list_2->next; for (; tmp != head && tmp->next != head && strcmp(list_entry(tmp, element_t, list)->value, list_entry(tmp->next, element_t, list)->value) <= 0;) { tmp = tmp->next; unmerge++; } ``` :::warning 現有的合併實作有改進空間,對照看 Linux 核心 `lib/list_sort.c` > 好的謝謝老師 ::: #### `q_ascend` 任意距離右側節點之元素小於目前節點元素,刪去目前節點; ```c while (cmp_node != head) { if (strcmp(list_entry(cmp_node, element_t, list)->value, list_entry(node, element_t, list)->value) < 0) { smaller = true; break; } cmp_node = cmp_node->next; } ``` 從佇列首節點開始,每一個節點皆往右尋找,找到就設定 `smaller = true` 表示右邊存在比當前節點 `value` 小的節點並停止尋找,下個判斷式會判斷 `smaller` ,若為 `true` 則移除該節點並釋放記憶體空間。再從下個節點開始,直到抵達尾部節點,該節點因為沒有可比較的節點,因此結束。 #### `q_descend` 任意距離右側節點之元素大於目前節點元素,刪去目前節點; ```c while (cmp_node != head) { if (strcmp(list_entry(cmp_node, element_t, list)->value, list_entry(node, element_t, list)->value) > 0) { greater = true; break; } cmp_node = cmp_node->next; } ``` 與 `q_ascend` 相同,但判別改為右邊存在比當前節點 `value` 大的節點; #### `q_merge` 合併所有「已排序」的佇列,並合而為一個新的已排序佇列; 將不同已排序佇列統一合併並排序至首個佇列; 實作此功能前需先了解多個佇列是如何鏈結在一起的 `queue.h` ```c /** * queue_contex_t - The context managing a chain of queues * @q: pointer to the head of the queue * @chain: used by chaining the heads of queues * @size: the length of this queue * @id: the unique identification number */ typedef struct { struct list_head *q; struct list_head chain; int size; int id; } queue_contex_t; ``` 這裡可以看出有兩個可能鏈結佇列的,分別是 `struct list_head *q` 及 `struct list_head chain` ,要更確定就必須從 `qtest.c` 觀察 ```c typedef struct { struct list_head head; int size; } queue_chain_t; static queue_chain_t chain = {.size = 0}; static queue_contex_t *current = NULL; ``` 這裡看出 `chain` 為 `queue_chain_t` ,而 `current` 為一 `pointer to queue_contex_t` 再往下看函式 `static bool do_new(int argc, char *argv[])` ```c queue_contex_t *qctx = malloc(sizeof(queue_contex_t)); list_add_tail(&qctx->chain, &chain.head); qctx->size = 0; qctx->q = q_new(); qctx->id = chain.size++; current = qctx; ``` 先配置了一個大小為 `queue_contex_t` 的記憶體空間,並以 `qctx` 指向它,再將 `queue_contex_t` 的成員 `chain` 新增至 `queue_chain_t` `chain` 的成員 `head` 頭部。再往下看到 `qctx->q = q_new()` 這行,這裡為 `qctx` 的成員 `q` 使用 `q_new()` 配置新的空佇列。 因此可以看出多個佇列的鏈結方法為使用 `queue_chain_t` 的成員 `chain` 來鏈結多個 `queue_contex_t` 的成員 `chain` ,而自身不儲存佇列,同時 `queue_contex_t` 的成員 `chain` 也用來鏈結下一個 `queue_contex_t` 的成員 `chain` ,真正儲存佇列的是在 `queue_contex_t` 的成員 `q` ; ```c while (merge_node != merge_list && src_node != src_list) { if (strcmp(list_entry(tail, element_t, list)->value, list_entry(merge_list->prev, element_t, list) ->value) <= 0) { tail = merge_list->prev; } if (tail == merge_list->prev && strcmp(list_entry(merge_list->prev, element_t, list) ->value, list_entry(src_list->next, element_t, list) ->value) <= 0) { list_splice_tail(src_list, merge_list); INIT_LIST_HEAD(src_list); break; } if (strcmp(list_entry(merge_node, element_t, list)->value, list_entry(src_node, element_t, list)->value) <= 0) { merge_safe = merge_node->next; list_del(merge_node); list_add_tail(merge_node, merge_list); merge_node = merge_safe; } else { src_safe = src_node->next; list_del(src_node); list_add_tail(src_node, merge_list); src_node = src_safe; } } ``` 解釋演算法前,先說明如何合併,在這裡第一個 `queue_contex_t` 的成員 `q` 會先後與其他的 `queue_contex_t` 裡的成員 `q` 合併,且其成員皆已經排序,而我直接將第一個 `queue_contex_t` 的成員 `q` 作為合併處,因此合併完就代表已經完成,不需將合併後的節點另外移動的到第一個 `queue_contex_t` 的成員 `q` 。 首兩個 `if` 判斷式皆用來避免發生以下情境: 假設第一個 `queue_contex_t` 的成員 `q` 節點內容如下: `1, 3, 5` 而第n個 `queue_contex_t` 的成員 `q` 節點內容如下: `2, 4, 6` 若以一般排序方式,較小的移動,較大的留到下一輪: `1, 3, 5` vs `2, 4, 6` => `3, 5, 1` vs `2, 4, 6` `3, 5, 1` vs `2, 4, 6` => `3, 5, 1, 2` vs `4, 6` `3, 5, 1, 2` vs `4, 6` => `5, 1, 2, 3` vs `4, 6` `5, 1, 2, 3` vs `4, 6` => `5, 1, 2, 3, 4` vs `6` `5, 1, 2, 3, 4` vs `6` => `1, 2, 3, 4, 5` vs `6` 因為 circular 的特性,會陷入無窮迴圈。 因此需紀錄尾部資訊(因為是最大值)若第一個 `queue_contex_t` 的成員 `q` 佇列尾部成員 `value` 小於第n個 `queue_contex_t` 的成員 `q` 佇列首位成員 `value` ,就將第n個 `queue_contex_t` 的成員 `q` 佇列中所有節點加入第一個 `queue_contex_t` 的成員 `q` 佇列,而尾部資訊就要時常更新,如果尾部節點成員 `value` 大於記錄值(以原先第一個 `queue_contex_t` 的成員 `q` 佇列尾部成員 `value` 初始化)就更新,再以一般合併方式合併。以同樣方式合併所有 `queue_contex_t` 的成員 `q` 即完成; #### `make test` 最後結果為 `95/100` `trace-17` 的 `rh` 及 `rt` 總是會不為`constant time` ## 以 Valgrind 分析記憶體問題 > Valgrind 為一在使用者層級 (user space) 對程式在執行時期的行為提供動態分析的系統軟體框架,可以用來追蹤及分析程式效能,最為人們所熟知的特性就是幫助使用者偵測記憶體錯誤,諸如使用未初始化的記憶體、不當的記憶體配置、或取消配置記憶體,並加以分析。 ```shell # Test if time complexity of q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant Probably constant time Probably constant time Probably constant time Probably constant time --- trace-17-complexity 5/5 --- TOTAL 100/100 Test with specific case by running command: scripts/driver.py -p /tmp/qtest.PpWKau --valgrind -t <tid> ``` 這裡一執行 `make valgrind` 就開始執行自動「評分」,卻得到了 100 分,但我並未對其他檔案作出改動。 ## Massif 記憶體分析 先透過 `valgrind --tool=massif ./qtest -f traces/trace-15-perf.cmd` 產生對應 `massif.out.xxxxx` 檔案 在執行指令 `massif-visualizer massif.out.xxxxx` 時,會遇到以下問題 ```shell qt.qpa.xcb: could not connect to display :0.0 qt.qpa.plugin: Could not load the Qt platform plugin "xcb" in "" even though it was found. This application failed to start because no Qt platform plugin could be initialized. Reinstalling the application may fix this problem. Available platform plugins are: eglfs, linuxfb, minimal, minimalegl, offscreen, vnc, xcb, wayland-egl, wayland, wayland-xcomposite-egl. Aborted (core dumped) ``` 試過多種解法(如添加:export DISPLAY=:0.0),目前仍未解決。 ## shuffle 實作 > 利用 Fisher–Yates shuffle 演算法來實作洗牌(shuffle) > > 先用 q_size 取得 queue 的大小 len。 > 隨機從 0 ~ (len - 1) 中抽出一個數字 random,然後 old 將指向從前面數來第 random 個節點,new 會指向最後一個未被抽到的節點,將 old 和 new 指向的節點的值交換,再將 len - 1。 > 隨著 len 大小變小,已經被抽到過,並交換值到 queue 後面的會愈來愈多,直到所有的節點都已經被抽到過,shuffle 就結束。 ```c while(size){ int p = rand()%size; list_for_each(node, head) if(p-- == 0) break; list_move_tail(node, head); size--; } ``` 這裡我採以最直覺的方法,選定一值 size 即佇列長度,透過不段減少該值設定隨機數 p 取值範圍,再透過 p 走到指定位置並移到最後。 統計方法驗證 `shuffle` Pearson's chi-squared test 能檢驗虛無假說(Null hypothesis),即某特定事件在樣本中觀察到的頻率分佈與特定理論分佈一致,事件必須是互斥且機率為 1。 - $H_0$(虛無假說): shuffle 的結果發生的機率相同,遵守 Uniform distribution - $H_1$(對立假說): shuffle 的結果發生的機率至少有一個不同 **1. 計算 chi-squared test statistic $X^2$** > $$ > X^2 = \sum_{i=1}^n {(O_i - E_i)^2 \over E_i} > $$ > > + $X^2$: Pearson's cumulative test statistic > + $O_i$: the number of observations of type i > + $E_i$: the expected count of type i 在測試 shuffle 1000000 次後,六種結果各自的頻率如下表: ``` Expectation: 41666 Observation: {'1234': 41505, '1243': 41825, '1324': 41301, '1342': 41446, '1423': 41497, '1432': 41612, '2134': 41620, '2143': 41834, '2314': 41684, '2341': 41669, '2413': 41758, '2431': 41748, '3124': 41870, '3142': 41724, '3214': 41665, '3241': 41725, '3412': 41649, '3421': 41558, '4123': 41984, '4132': 41810, '4213': 41547, '4231': 41754, '4312': 41521, '4321': 41694} chi square sum: 12.867901886430182 ``` || 觀察到的頻率| 期待的頻率 |${(O_i - E_i)^2 \over E_i}$| | -------- | -------- | -------- |---| | [1, 2, 3, 4] | 41505 | 41666 |0.6221139538232612| | [1, 2, 4, 3] | 41825 | 41666 |0.6067537080593289| | [1, 3, 2, 4] | 41301 | 41666 |3.1974511592185473| | [1, 3, 4, 2] | 41446 | 41666 |1.1616185858973744| | [1, 4, 2, 3] | 41497 | 41666 |0.6854749675994816| | [1, 4, 3, 2] | 41612 | 41666 |0.0699851197619162| | [2, 1, 3, 4] | 41620 | 41666 |0.05078481255700091| | [2, 1, 4, 3] | 41834 | 41666 |0.677386838189411| | [2, 3, 1, 4] | 41684 | 41666 |0.007776124417990688| | [2, 3, 4, 1] | 41669 | 41666 |0.0002160034560552969| | [2, 4, 1, 3] | 41758 | 41666 |0.20313925022800364| | [2, 4, 3, 1] | 41748 | 41666 |0.16137858205731292| | [3, 1, 2, 4] | 41870 | 41666 |0.9987999807996928| | [3, 1, 4, 2] | 41724 | 41666 |0.08073729179666875| | [3, 2, 1, 4] | 41665 | 41666 |2.4000384006144097e-05| | [3, 2, 4, 1] | 41725 | 41666 |0.08354533672538761| | [3, 4, 1, 2] | 41649 | 41666 |0.006936110977775645| | [3, 4, 2, 1] | 41558 | 41666 |0.2799404790476648| | [4, 1, 2, 3] | 41984 | 41666 |2.4270148322373157| | [4, 1, 3, 2] | 41810 | 41666 |0.497671962751404| | [4, 2, 1, 3] | 41547 | 41666 |0.3398694379110066| | [4, 2, 3, 1] | 41754 | 41666 |0.1858589737435799| | [4, 3, 1, 2] | 41521 | 41666 |0.5046080737291797| | [4, 3, 2, 1] | 41694 | 41666 |0.018816301060816974| |Total|||12.867901886430182| **2. 自由度** > 對於 N 個隨機樣本而言,自由度為 N - 1。 這裡自由度為 $4!-1 = 23$ **3. 選擇顯著水準** > + [顯著水準(Significance level)α](https://en.wikipedia.org/wiki/Statistical_significance) 代表在虛無假說($H_0$)為真下,錯誤地拒絕 $H_0$ 的機率,即型一錯誤發生之機率。 > α = P(拒絕 $H_0$ | $H_0$ 為真) > 我們 alpha 設定為常見的 0.05。 > + [P value](https://zh.wikipedia.org/wiki/P%E5%80%BC) > 從[卡方分布表](https://en.wikipedia.org/wiki/Chi-squared_distribution)找合適的 P value。 自由度為 $23$ ,$X^2$ 為 $12.8679$ ,因為 $11.689 < 12.8679 < 13.091$ ,於是 P value 介於 0.95 和 0.975 之間 #### 4. 統計檢定的結果不拒絕虛無假說 對於要拒絕的虛無假說($H_0$),觀察到的結果必須具有統計顯著性,即觀察到的 P value 小於預先指定的顯著水準 α。 因為 P value(0.95~0.975)> alpha(0.05),統計檢定的結果不拒絕虛無假說($H_0$) 也就是樣本資料不拒絕 shuffle 的結果遵循 Uniform distribution,因為沒有足夠證據推翻。 從圖表來看 shuffle 的頻率是大致符合 Uniform distribution 的。 ![截圖 2024-03-21 下午4.47.52](https://hackmd.io/_uploads/Bk-vEuK0T.png) 不知道是否為執行環境為虛擬機器造成效能不足的原因,光是統計 shuffle 的 python 檔案在執行前面 for 迴圈之指令累加 1000000 次就長達約 8 分鐘。在接下來實際執行更會拉到超過 30 分鐘。而圖形也產生也會發生像上面 massif 同樣問題(無法產生圖形),所以我將輸出(出現次數)結果另外貼到虛擬機器環境外執行。 ## 研讀論文 < [Dude, is my code constant time?](https://eprint.iacr.org/2016/1123.pdf) > :warning: constant time 之所以重要並非只是效率考量,程式執行時間會和資訊安全有關係 **Timing attacks** 為旁路攻擊的其中一類,透過測量**加密程序執行時間**以推敲加密內容。 若加密程序執行時間為恆定,則可解決 主要測試方法 [動態分析 vs 靜態分析](https://www.veracode.com/blog/secure-development/static-testing-vs-dynamic-testing) > In the static test process, the application data and control paths are modeled and then analyzed for security weaknesses. > > Dynamic application security testing (DAST) looks at the application from the outside in — by examining it in its running state and trying to manipulate it in order to discover security vulnerabilities. 本論文提出的方法因無需建立模型,為動態分析工具的一種 > **dudect**: a tool to assess whether a piece of code runs in constant time or not on a given platform > measure the execution time for two different input data classes, and then check whether the two timing distributions are statistically different 其方法基於 **[leakage detection techniques](https://link.springer.com/content/pdf/10.1007/3-540-45472-1_12.pdf)** 此處 leakage 並非指 memory leakage ,而是指 information,方法簡而言之便是輸入兩種不同資料類型,並辨認兩輸出時間分佈是否於統計分佈上是不同的。 **方法:** **Step 1: Measure execution time** a)資料類型:分為隨機及固定值 > first class input data is fixed to a constant value, and the second class input data is chosen at random for each measurement. 其中作者認為隨機輸入較容易出現 [corner-case](https://en.wikipedia.org/wiki/Corner_case) > a problem or situation that occurs only outside normal operating parameters 即操作參數在正常範圍以外的問題或是情形,但就算在範圍內,只要是極端值也是。 b) Cycle counters:精準測量執行時間 c) Environmental conditions 為降低環境變更影響,作者的每一次測量對應到每一種隨機選擇的類別,同時測量前先完成類別分配和輸入準備。 **Step 2: Apply post-processing** a) Cropping:透過設定裁切閾值,可為常數或是認定與輸入類別無關閾值,目的為除去測量法、OS中斷及外部活動影響。 b) Higher-order preprocessing:更高階的前處理,例如:centered product 模擬高階的 [DPA](https://www.rambus.com/understanding-differential-power-analysis/) 攻擊 DPA:透過統計分法分析目標裝置電力消耗狀態,從加密演算法中推算金鑰。 > Differential power analysis(DPA)is a side-channel attack use variations in the electrical power consumption of a targeted device and then breech security in devices by using statistical methods by deriving secret keys from crypto-algorithm. 網路上對於 centered product 的介紹非常少,然而在本論文中提到本方法提出者其論文 ( Towards sound approaches to counteract power-analysis attacks ) 較不易懂,因此我參考了以下論文 [A Statistical Model for Higher Order DPA on Masked Devices](https://eprint.iacr.org/2014/433.pdf) 以下為部分內容: 理論上,在布林掩碼裝置在執行時任何時間點的洩漏資訊會獨立於隱私金鑰,因此不會洩漏金鑰。而這布林掩碼能保護加密系統免於所有僅使用單個時間點做洩漏測量(或與同一個中間資料相關的多個時間點)的第一階攻擊。然而使用多個與多個中間數據相對應的時間點上的泄漏的高階攻擊可以揭露隱私金鑰。 讓我們考慮一個二階攻擊使用在一個由單一掩碼變數 $M$ 保護的裝置在兩個時間點的洩漏 $L(t_0)$ 及 $L(t_1)$ 。一個二階攻擊可以透過最大化猜測之中間資料 $Z^g$ 及兩洩漏 $L(t0)$ 及 $L(t1)$ 之結合函式之間的相關性選擇金鑰 $k_g$ 來破解保護系統,而文中提到的 **centered product** 便是兩洩漏 $L(t_0)$ 及 $L(t_1)$ 之結合函式 $[L(t_0) − E(L(t_0))] × [L(t_1) − E(L(t_1))]$ 其中中間資料為 $Z$ ,掩碼為 $M$ ,透過**邏輯互斥或** $f(Z,M)=Z⊕M$ 加密中間資料 $Z$ **Step 3: Apply statistical test** a) t-test: > Welch 的 t-test 是雙樣本位置測試,用來測試兩個母體均值是否相等的檢驗。 為 Student 的 t-test 調整,當兩個樣本具有不相等的變異數和不相等的樣本大小時能更可靠。這些測試通常稱為「不成對」或「獨立樣本」的 t-test 。 此測試失敗則代表第一階時間資訊洩漏,若與 cropping 前處理同時使用,將會需要額外測試更高階的統計時刻(翻譯不是很好,不過在論文中是 statistic moment ),因為 cropping 是一種非線性變換(直接裁切數據分佈又無法以一線性過程表示將會造成測試本身不精準)。 b) Non-parametric tests:非參數檢定 > Kolmogorov-Smirnov:基於累計分布函數的無母數檢定,用以檢定兩個經驗分布是否不同或一個經驗分布與另一個理想分布是否不同。 優點是這些測試通常對於底層分佈的假設較少;缺點是它們可能收斂速度較慢,並且需要更多的樣本。 **結果:** **A. 實作過程** a) Pre-processing:如同前面所述的 cropping 設定 percentile P 為閾值,只要測量值大於 P 便捨棄。 這裡的理念是測試一個受限制的分佈範圍,特別是較低的 cycle count 尾部。因為較高的尾部可能更受到與數據無關的噪聲的影響。 b) Statistical test:使用 Welch’s t-test 及 [Welford’s online variance](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance) 計算方法提高數值穩定性。 [online algorithm](https://en.wikipedia.org/wiki/Online_algorithm) online algorithm 可以按順序逐個處理其輸入的演算法,而無需從一開始就擁有全部的輸入。 offline algorithm 從一開始就獲得了整個問題數據,並且需要輸出一個解決當前問題的答案。 這裡應為 Welford’s online variance 而非原論文提到的 online Welch’s t-test , Welch’s t-test 僅為檢定整體分佈是否相同,若以逐個數據輸入並作處理並不能看出**整體分佈**,較不合理。 **B. 記憶體比較** **變數時間實作(即測試目標的實作):** 第一階洩漏檢測檢驗考慮了基於的常態分佈的 16-byte 字串 (隨機類別);而二階洩漏檢測固定輸入秘密字串 s (固定類別) ![image](https://hackmd.io/_uploads/ryusaZiCp.png) 這張即是兩分佈不相等的例子,分別對應到固定及隨機類別,也表示了對於輸入資料是存在洩漏的。 ![image](https://hackmd.io/_uploads/ByyTRbo0T.png) 而這張圖是表示對於 t-statistic 的絕對值相對於測量次數的提升的關係,不同線即代表不同前面所提到的 threshold p ,而 t 只要大於 4.5 即代表兩分佈為不同。 從單條線就可以看出兩分佈是否相同,因此 y 軸 t 應為兩分佈經過 t-test 所得之值。 後面也有提到此圖是 t-test 之結果 > Figure 12 shows the results of the t-test 原先設定 evaluator 知道秘密字串 s (內部實作),現將其改為不知道,因此固定類別的值被設為 0 而非 s 。 ![image](https://hackmd.io/_uploads/rypmGGj0a.png) 可以看到兩數據分佈較為相近,不過還是為不同分佈,統計仍拒絕虛無假設。 ![image](https://hackmd.io/_uploads/HkIyQfj0p.png) 雖然 t 減少了,不過也顯現出有些前處理策略並沒有比較快,可以看到 x 軸的尺度多了 $10^2$ 。 **常數時間實作:** 函式利用邏輯運算比較,不會因不相配就停止。 以下程式碼中,做了完整的測試使用者是否有完整權限。,會執行完所有邏輯運算才回傳值,而不會因為中間某一步失敗就停止。 ```python def check_permissions(user): result = True # 檢查用戶是否具有讀取權限 result = result and user.has_read_permission # 檢查用戶是否具有寫入權限 result = result and user.has_write_permission # 檢查用戶是否已驗證 result = result and user.is_verified return result ``` 這裡的常數時間實作也是為了做完整多面向的測試而選擇邏輯運算比較。 ![image](https://hackmd.io/_uploads/SJ0MiGj0a.png) ![image](https://hackmd.io/_uploads/HJIQjGsAp.png) 而在常數時間實作下,可以看到兩分佈基本是相同的並無法區分,即符合兩分佈是相同的虛無假設。 作者發現時間變異性來自乘法指令,硬體乘法電路會因為一些位元設定為 0 而提早終止。不過作者也提到相同的程式碼會因為參數、架構、處理器及工具而產出不同結果,進而需要調整參數,例如: threshold p 再來是非常重要的需要測量多少次,以變數時間實作的第二張圖來舉例,有些需要 $10^6$ 次測量才能檢測出洩漏,前面的 DPA 更是如此,需要一定的測量次數才有辦法檢測到。不過作者將這個問題丟給測試者。 >It is the evaluator’s responsibility to set a realistic and meaningful value for the number of measurements. 作者也提到有些測試需要 $10^{10}$ 次測量才能檢測出洩漏,而有些對加密操作有限制的應用程式就無法使用,作者就認為他們的因為檢測洩漏幅度所以很有幫助,不過有洩漏與沒洩漏就是在兩邊,既然知道微小洩漏不就是有洩漏,如果說需要 $10^{10}$ 次測量才能檢測出洩漏,那麼為何能檢測出微小洩漏? 作者也提到他們檢測只是提供一個容易受到攻擊的跡象,並非就是代表他是會被攻擊。 本身對資安問題完全沒有涉略,不過想到一個問題,只能設計常數時間執行的程式碼對程式本身功能要求較高,例如對排序。 不過若執行完程式不立即結束,而是以一隨機數去作決定多讓程式執行的時間,是否在不要求程式碼的功能的情況下會有較高的安全性。同時考慮 DPA 是利用程式執行功耗去推算的金鑰以取得資料,多等待的時間必須做一些會影響功耗的指令,當然最好就是將未來會執行到的程式碼拉到前一段程式碼來執行,不過若無法就可以只做簡單的二元運算。 參考作者提出的測量方法 [dudect](https://github.com/oreparaz/dudect/) 並與作業 1 中提供的 dudect 做比較,可以看到作業 1 提供的程式碼中缺少了閾值 percentile P 的實作,無法 Cropping 不屬於程式碼本身執行的影響,而這會影響到 constant time 的測量。 因此直接從作者的程式碼中提取需要的函式及 macro : ```c #define NUM_PERCENTILE 100 static int cmp(const int64_t *a, const int64_t *b) static int64_t percentile(int64_t *a_sorted, double which, size_t size) static void prepare_percentile(int64_t *exec_times, int measure_num, int64_t *percentiles) ``` 不過也需要將他們加入計算過程中,而 lab0 提供的程式碼與原是程式碼最大的不同就是 lab0 將執行過程放在 doit 函式中,因此開始修改 doit 函式: ```diff ... + int64_t *percentiles = calloc(NUM_PERCENTILE, sizeof(int64_t)); prepare_inputs(input_data, classes); bool ret = measure(before_ticks, after_ticks, input_data, mode); differentiate(exec_times, before_ticks, after_ticks); + prepare_percentile(exec_times, N_MEASURES, percentiles); + update_statistics(exec_times, classes, percentiles); ret &= report(); ... ``` 先配置一 percentiles 作為儲存不同百分位對應的執行時間之空資料結構,而 `prepare_percentile` 負責將內容填入,再來透過 update_statistics 進行 t 檢定,因此同時 update_statistics 也要進行修改: ```diff ... /* do a t-test on the execution time */ t_push(t, exec_times[0], classes[i]); //僅做修改 +for (size_t crop_index = 1; crop_index < NUM_PERCENTILE; crop_index++) { + if (difference < percentile[crop_index]) { + t_push(t, difference, classes[i]); + } +} ... ``` 這裡每一次做 t 檢定都選擇不同閾值,大致上流程為: 1. 如果 `exec_times[i]` 的值小於或等於0,則認為是 CPU Counter 溢出或丟失的測量,跳過。 2. 將 `exec_times[0]` 加入 t 檢定 3. 將 `exec_times[i]` 小於該輪迴圈 `exec_times[i]` 百分位加入 t 檢定 因此結果會像以下 y 軸為外部迴圈執行次數,而 x 軸為被加入 t 檢定的 `exec_times[i]` 個數,隨著外部迴圈選定的 `exec_times[i]` 越大,因為比較式 `if (difference < percentile[crop_index])` 而使加入的 `exec_times[i]` 越來越少,而因為 percentile 為 exec_times 之百分位數,因此可以寫成以下: ```sas 1 .......... = 1.00*N_MEASURES-1 2 ......... = 0.99*N_MEASURES-1 3 ........ = 0.98*N_MEASURES-1 4 ....... = 0.97*N_MEASURES-1 5 ...... = 0.96*N_MEASURES-1 6 ..... = 0.95*N_MEASURES-1 7 .... = 0.94*N_MEASURES-1 8 ... = 0.93*N_MEASURES-1 9 .. = 0.92*N_MEASURES-1 10 . = 0.91*N_MEASURES-1 . . . . . . ``` 修改完上述程式碼,再次執行 `make test` 即可通過 `trace-17` 的常數時間測試得到 `100`/`100`。 ## 研讀 Linux 核心原始程式碼的 lib/list_sort.c 內含 3 個函式 ```c static struct list_head *merge(void *priv, list_cmp_func_t cmp, struct list_head *a, struct list_head *b) ``` 這個函式單純就是合併兩**已排序**鏈結串列,不過並不會去建立 `prev` 指標。 ```c static void merge_final(void *priv, list_cmp_func_t cmp, struct list_head *head, struct list_head *a, struct list_head *b) ``` 用在最後的合併,因為會同時建立回雙向鏈結串列。 這裡同時用到一個我看不懂的寫法,同時先附上使用到的 `cmp` 函式: ```c static int cmp(void *priv, const struct list_head *a, const struct list_head *b) { struct debug_el *ela, *elb; ela = container_of(a, struct debug_el, list); elb = container_of(b, struct debug_el, list); check(priv, ela, elb); return ela->value - elb->value; } ``` ```c # define likely(x) __builtin_expect(!!(x), 1) # define unlikely(x) __builtin_expect(!!(x), 0) ``` 先解釋 [unlikely](https://meetonfriday.com/posts/cecba4ef/) ,雖然是中文網頁,但講得很清楚也易懂。 兩者同為 macro , `__builtin_expect` 為期待的結果為何,而加上兩個 `!` 取反是因為確保結果非 0 即 1 ,而這樣做是因為邊義氣在最佳化時已經知道該為期待輸出,所以考慮到 cache 的 spatial locality ,會透過最佳化將其編譯後的程式碼靠近,並有機會同該函式一起搬到 cache 中,降低 cache miss 的機會,可以更有效率的執行該函式。 現在看該程式碼 ```c do { /* * If the merge is highly unbalanced (e.g. the input is * already sorted), this loop may run many iterations. * Continue callbacks to the client even though no * element comparison is needed, so the client's cmp() * routine can invoke cond_resched() periodically. */ if (unlikely(!++count)) cmp(priv, b, b); b->prev = tail; tail = b; b = b->next; } while (b); ``` `cmp(priv, b, b)` 相當於不斷與自身比較,本身就有點奇怪,如果非 NULL或Nan,那麼一定相等,若只要要確認數值有效性,大可以其他方式判斷。 1. 如果只是要將 cmp 的 cache miss 降低,但程式碼其他地方不會再出現 cmp。 2. 若是要避免第一次迴圈時 b 為 NULL ,也不太合理,因為前面一段不會將剩餘未合併處進行合併,且 a 剩餘未合併的鏈結串列也會被 b 指向而接收。 3. 若是要避免後續再執行時 b 為 NULL ,那麼 do while 迴圈的 while(b) 便可避免 b 為 NULL。 :::warning 這裡因為看不太懂,因此需要老師解答 ::: 接下來是 list sort 本體 ```c void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) ``` 以下是關鍵部分,大概分成三個子部分: ```python do { size_t bits; struct list_head **tail = &pending; // 1. 只要 bits 為奇數則不斷向右移 1 bit (因為只看最低 bit),相當於 bits/2 // 同時 tail 指標不斷向首部移動 /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; // 2. 先看此時 count ,如果右移操作完為 0 則不會進入判斷式 // 出現此情況即 count = 2^k - 1,即為無須合併的情況。 // 而 tail 透過上部分移到需要正確的合併位置, // 因此直接將它們合併並放回原本 tail 指向的位置。 /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } // 3. 這裡是一次次將 list 所指向的節點移到 pending 上, // 可以看到這裡合併的都不是 list 內的節點, // 當 list 內節點移動完之後即結束 /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list); ``` 我的 q_sort 的改進空間: 雖然我的 q_sort 不屬於 merge sort ,不過還是有許多可以借鑑的地方。 1. 每次合併大小:我在實作的 q_sort 不會去考慮合併大小而直接合併,此時比較次數會被因為輸入原始順序而大幅影響。 2. 先將雙向循環鏈結串列拆成單向鏈結串列的動作,可以大幅節省中間合併操作,只需在最後階段 merge_final 再行復原就好。 再來是 timsort 的可以借鑑的部分,我在寫 q_sort 前世沒有看過 timsort ,因此在寫第二次作業時看到 timsort 的寫法時,發現了許多額外的考慮的實作: 1. 合併時不僅考慮了降序 (升序) ,同時考慮升序 (降序) ,因此能降低**已排序**資料再重新排序時的操作 (因 timsort 認為在真實世界中大部分需要排序的是已排序資料) 。 2. 確保堆疊上的 run 長度保持平衡。 - A 的長度要大於 B 和 C 的長度總和。 - B 的長度要大於 C 的長度。 因此可以使兩合併鏈結串列長度較為平衡,不易產生一條越來越長的鏈節串列與不會增長的鏈節串列作合併,能使在同樣的比較次數下,合併最多的結點。 ## 引入 lib/list_sort.c 至 lab0-c 引入上述的三個函式,另外將輸入函式參數做調整: 1. 移除 `void *priv` 2. 移除 `list_cmp_func_t cmp` 改為直接呼叫該函式 修改函式 `cmp` ,此函式只能比較單字元,無法比較不同長度的字串,因此直接改成 `strcmp` 做回傳。 修改完就開始修改 `qtest.c` 採取以 `q_sort` 相同的做法,但另外將些許變數名稱改為 `listsort` 。 驗證: 這裡我直接將 `do_sort` 裡面呼叫的 `q_sort` 改為 `list_sort` 並執行 `make test` ,可通過測試。 ```shell --- trace-15-perf 6/6 +++ TESTING trace trace-16-perf: # Test performance of insert_tail --- trace-16-perf 6/6 +++ TESTING trace trace-17-complexity: # Test if time complexity of q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant Probably constant time Probably constant time Probably constant time Probably constant time --- trace-17-complexity 5/5 --- TOTAL 100/100 ``` 比較落差: 利用 [2024q1 第 1 週測驗題](https://hackmd.io/@sysprog/linux2024-quiz1) 裡的 測驗2 提供的測試程式做一些修改以能同時比較 `list_sort` 、 `timsort` 及 `q_sort` ,不過因為 `q_sort` 及 `list_sort` 被實作在 `queue.c` ,因此有做額外的更改,同時面對比次數,我將其比較的 value 改為字串,而非原先的整數。 詳見:[commit 84f3cda](https://github.com/david965154/lab0-c/commit/84f3cda5345c87d5e30afc872d51076b97345e27) 資料隨機分佈 每次筆數:100000 ```shell 1. Creating sample ==== Testing timsort ==== CPU Clock Elapsed : 380590 Comparisons: 1638712 List is sorted ==== Testing list_sort ==== CPU Clock Elapsed : 347777 Comparisons: 1542302 List is sorted ==== Testing my_sort ==== CPU Clock Elapsed : 616993 Comparisons: 3829894 List is sorted 2. Creating sample ==== Testing timsort ==== CPU Clock Elapsed : 371150 Comparisons: 1660057 List is sorted ==== Testing list_sort ==== CPU Clock Elapsed : 353576 Comparisons: 1542510 List is sorted ==== Testing my_sort ==== CPU Clock Elapsed : 576646 Comparisons: 3830444 List is sorted 3. Creating sample ==== Testing timsort ==== CPU Clock Elapsed : 355054 Comparisons: 1640335 List is sorted ==== Testing list_sort ==== CPU Clock Elapsed : 341104 Comparisons: 1542293 List is sorted ==== Testing my_sort ==== CPU Clock Elapsed : 582272 Comparisons: 3829944 List is sorted 4. Creating sample ==== Testing timsort ==== CPU Clock Elapsed : 348978 Comparisons: 1633683 List is sorted ==== Testing list_sort ==== CPU Clock Elapsed : 346883 Comparisons: 1542428 List is sorted ==== Testing my_sort ==== CPU Clock Elapsed : 596855 Comparisons: 3829415 List is sorted 5. Creating sample ==== Testing timsort ==== CPU Clock Elapsed : 409423 Comparisons: 1712313 List is sorted ==== Testing list_sort ==== CPU Clock Elapsed : 348257 Comparisons: 1542406 List is sorted ==== Testing my_sort ==== CPU Clock Elapsed : 571975 Comparisons: 3829926 List is sorted ``` 可以看到在比較次數上,我所做的 sort 皆為其餘兩 sort 的約 2.5 倍。 ## 井字遊戲 Tic-Tac-Toe **Linux 核心的浮點數運算** 計算定點數乘冪的函式 `fixed_power_int` compute: $x^n$, in $O(log n)$ time 我不太理解此函式,因為 `unsigned long` 會直接將輸入浮點數轉換成整數型式,導致整串其實都在捨棄非小數點的整數部分,透過直接輸入值測試 ```c printf("%ld\n",fixed_power_int(35.6586, 4, 2)); ``` 我的輸入為 35.6586 ,需要計算 2 次方,不過結果卻是 77 ,這顯然經過了捨棄整數部分去計算的過程,因為原值應為 36.6586*36.6586 = 1343.8529~~5396~~。 撇去我沒辦法實現計算結果的部分,本身程式碼透過 bit 為 1 才需計算的概念,又透過 `result += 1UL << (frac_bits - 1)` 無條件進位以保留小數與整數不分相乘造成的進位,將計算複雜度降至 $O(log n)$ 。 #### `hlist.h` 這裡我大部分使用原本的 `list.h` ,同時參考 `lab0-c` 內的 `list.h` 去做修改。不過在 `cppcheck` 時一直遇到以下問題。 ```c zobrist.c:62:38: error: Null pointer dereference: (struct zobrist_entry_t*)0 [nullPointer] zobrist_entry_t *entry = hlist_entry(hash_table[i].first, zobrist_entry_t, ht_list); ``` 這裡是因為 container_of 所用到的 `__typeof__(((type *) 0)->member)` ,而 `(type *) 0` 也的確是 `null pointer` 所導致的 ```c #define container_of(ptr, type, member) \ __extension__({ \ const __typeof__(((type *) 0)->member) *__pmember = (ptr); \ (type *) ((char *) __pmember - offsetof(type, member)); \ }) ``` 不過又不能移除,因為他要用來取得成員的 `type` ,因此我直接將 `--suppress=nullPointer:zobrist.c` 加入 `lab0-c/scripts/pre-commit.hook` 內的 `CPPCHECK_suppresses` 而成功通過 `cppcheck` 。 #### 新增 ttt 命令,並且允許使用者切換「人 vs.電腦」或「電腦 vs. 電腦」的對弈 ```c ADD_COMMAND(ttt, "Play Tic-Tac-Toe, MODE 0 for PvE with MCTS, 1 for EvE with " "RL, 2 for EvE with Negamax", "[MODE]"); ``` 這裡我選擇三種遊戲方式,第一種為人機對弈,使用 MCTS 演算法,第二種為機器對弈機器,使用 RL ,第三種同樣為機器對弈機器,使用 Negamax 。這三種模式分別對應到選項 1、2、3 ,當選到 2 時,城市會先去搜尋是否有 `state_value.bin` 這個檔案(由 RL 產生出來的檔案,記錄決策及分數),若無則先執行 `train.c` ,再開始執行原先被命名為 `main.c` 的檔案 (後被我改為 `run_ttt.c` )。 #### 使用 Monte Carlo tree search (MCTS) 演算法 ```c if (MODE == 1) { unsigned int state_num = 1; CALC_STATE_NUM(state_num); init_rl_agent(&agent, state_num, 'O'); load_model(&agent, state_num, MODEL_NAME); printf("MODE: RL!\n"); } else if (MODE == 0) { printf("MODE: MCTS!\n"); } else { negamax_init(); printf("MODE: NEGMAX!\n"); } ``` 這裡只要輸入為 0(人機對弈),則使用 `MCTS` ,原先我不想更改這裡的程式碼,直接用 `#ifdef USE_MCTS` 去做選擇演算法的動作,不過我發現在 `qtest.c` 的 `#define` 並非全域,因為我並沒有做 `include` 的動作,因此在 `run_ttt.c` 中會無效,才改成傳遞輸入的參數 `MODE` 。 #### 定點數取代浮點數運算 `MCTS` 中,我只有在 `mcts.c` 內的 `uct_score` 有看到比較需要進行定點數計算的部分。 ```diff static inline double uct_score(int n_total, int n_visits, double score) { if (n_visits == 0) return DBL_MAX; -return score / n_visits + - EXPLORATION_FACTOR * sqrt(log(n_total) / n_visits); + return (double) ((int) (score * 10000)) / (n_visits * 10000) + + sqrt(((int) 2 * log(n_total) * 10000 / n_visits)) / 100; } ``` 這裡我主要將大部分浮點數先轉換成整數再進行除法運算,同時把 `EXPLORATION_FACTOR`( $\sqrt2$ )先併入後面的 `sqrt` ,這樣可以避免進行再一次的 `sqrt` 計算。 這個函式每做一次決策被使用的次數非常多,甚至超出我在 VScode 的terminal 範圍,不過仍有許多地方仍須改成定點數計算 :red_circle: TODO :red_circle:。 #### 螢幕顯示當下的時間 (含秒數) ```c static void displayTime(int displacement) { time_t current_time; struct tm *time_info; char timeString[20]; time(&current_time); time_info = localtime(&current_time); strftime(timeString, sizeof(timeString), "%H:%M:%S", time_info); printf("\033[1A"); printf("\033[2K"); printf("\033[%dD", 3 + displacement); printf("Current time: %s\r", timeString); fflush(stdout); printf("\033[1E"); printf("\033[%dC", 2 + displacement); fflush(stdout); } ``` 這裡輸出順序為:棋盤$\rightarrow$時間$\rightarrow$輸入區域 因此我會在等待輸入時每一秒將游標上移並移到最左處重新覆蓋輸出時間再下移並每過一段時間右移一格以免覆蓋掉前面輸入但尚未按下 enter 的字元,這裡還沒處理到我認為是好的程度,最好的應該是獲取 stdin 的字元進而計算右移格數,不過我還沒找到類似的程式碼去達成此目標。 ```c do { FD_ZERO(&rfds); FD_SET(STDIN_FILENO, &rfds); tv.tv_sec = 0; tv.tv_usec = 0; retval = select(STDIN_FILENO + 1, &rfds, NULL, NULL, &tv); if (retval == -1) { perror("select()"); return 1; } else if (retval) { r = getline(&line, &line_length, stdin); } if (retval && line != NULL) { break; } if (displacement <= 2) displayTime(displacement); else displayTime(2 + displacement / 3); sleep(1); displacement++; } while (1); ``` 原先程式碼並不會因為不輸入就繼續下一行程式碼,因此我使用 select 去偵測是否有「輸入」(有按下 enter 才算),如果沒有就不斷地更新時間直到有輸入並且在下一次等待輸入又進行此更新時間的動作,並且我只選擇在人機對弈有這個輸出時間的機制,因為機器對弈機器的過程太過快速,遊戲開始到結束的過程甚至不到一秒,沒有什麼意義。 #### 引入 coroutine 先從看教材開始 我因為看不懂教材內提供的 [QEMU 中的協程: qemu-coroutine](https://royhunter.github.io/2016/06/24/qemu-coroutine/) 的關於 `getcontest` , `setcontext` 與 `makecontest` ,我看了 [getcontext or setcontext Subroutine](https://www.ibm.com/docs/en/aix/7.3?topic=g-getcontext-setcontext-subroutine) 裡面講的非常詳細 簡單來說 [Contest](https://en.wikipedia.org/wiki/Context_(computing)) > In computer science, a task context is the minimal set of data used by a task (which may be a process, thread, or fiber) that must be saved to allow a task to be interrupted, and later continued from the same point. 因此後綴以 Contest 的 `getcontest` , `setcontext` , `makecontest` 與 `swapcontest` 須貫徹這樣的精神,從哪裡中斷就從哪裡繼續。 `getcontest` : 為保存現在的上下文,包括調用進程的機器暫存器 (register) 的內容、信號屏蔽(signal mask)和當前執行堆疊 (stack) 。 `setcontext` : 主要用於**恢復**先前保存的上下文,通常用於信號處理或特殊的上下文恢復。會完全替換當前的上下文,並從指定的上下文中繼續執行,無法返回到原來的調用點。 `makecontest` : 類似於 `getcontest` ,但**並非**保存**當下**上下文,而是自己建立一個上下文。 `swapcontest` : 會先保存當下上下文,並恢復先前保存的上下文,因此在兩個上下文之間進行**切換**,且之後可返回到原來的調用點。 再來是會並用的 `setjmp` 及 `longjmp` ``` int setjmp( jmp_buf env ); void longjmp( jmp_buf env, int value ); ``` `setjmp` : 保存當下堆疊 (stack),若為直接回傳而非回傳 `longjmp` ,那麼會回傳 0 。 `longjmp` : 恢復當時的堆疊,同時可設定 `setjmp` 該回傳何值以確認是否回傳的為 `longjmp` 。 **trace qemu 的 coroutine 的實作** 首先,要進入 coroutine 狀態需透過 `qemu_coroutine_create` 建立一 coroutine 才能透過 `qemu_coroutine_enter` 進入 ```c coroutine = qemu_coroutine_create(my_coroutine); qemu_coroutine_enter(coroutine, my_data); ``` 而 my_coroutine 則是要進行 coroutine 的事件,如下 ```c void coroutine_fn my_coroutine(void *opaque) { MyData *my_data = opaque; /* do some work */ qemu_coroutine_yield(); /* do some more work */ } ``` `qemu_coroutine_create` ```c Coroutine *qemu_coroutine_create(CoroutineEntry *entry) { Coroutine *co = qemu_coroutine_new(); // 創建一个新的coroutine co->entry = entry; //設置coroutine的入口函数为entry return co; } ``` 這裡看到 `qemu_coroutine_new` 就是建立一個新的 coroutine ,不過他提供的是 `coroutine_new` ,透過實際去 github 查詢後確定是他寫錯,應改為 `qemu_coroutine_new` `qemu_coroutine_new` ```c= static Coroutine *qemu_coroutine_new(void) { const size_t stack_size = 1 << 20; CoroutineUContext *co; ucontext_t old_uc, uc; jmp_buf old_env; union cc_arg arg = {0}; if (getcontext(&uc) == -1) { //獲得當前上下文 abort(); } // 配置空間存放 stack co = g_malloc0(sizeof(*co)); co->stack = g_malloc(stack_size); co->base.entry_arg = &old_env; /* stash away our jmp_buf */ // 儲存上下文訊息 uc.uc_link = &old_uc; uc.uc_stack.ss_sp = co->stack; uc.uc_stack.ss_size = stack_size; uc.uc_stack.ss_flags = 0; arg.p = co; makecontext(&uc, (void (*)(void))coroutine_trampoline, 2, arg.i[0], arg.i[1]); //製造一個上下文,設置該上下文的stack size及相關訊息 /* swapcontext() in, longjmp() back out */ if (!setjmp(old_env)) { //保存當前上下文到old_env中,此时old_env的地址作為co->base.entry_arg swapcontext(&old_uc, &uc); //切换至uc代表的上下文中,入口函数為coroutine_trampoline,返回點为old_uc中 } return &co->base; } ``` :::success 注意 : 第 22 行因為判斷式 if(!setjmp(self->env)) 設下跳躍點後,swapcontext 需要 uc 而進入 coroutine_trampoline (重要 : 第 25 行的 makecontext 並非馬上執行 ) 並因為判斷式 if(!setjmp(self->env)) 進入後因為 longjmp 而跳出接續上次中斷的而回到 qemu_coroutine_new,因此會在 coroutine_trampoline 內會設下跳躍點,又不會進入 while 迴圈,且因為設下 value 為 1 ,在 longjmp 回來後 setjmp(self->env) 回傳 1 而不會執行 swapcontext ,並直接回傳。 ::: `coroutine_trampoline` ```c static void coroutine_trampoline(int i0, int i1) { union cc_arg arg; CoroutineUContext *self; Coroutine *co; arg.i[0] = i0; arg.i[1] = i1; self = arg.p; co = &self->base; // 獲取了通過coroutine_new創建的coroutine結構 /* Initialize longjmp environment and switch back the caller */ if (!setjmp(self->env)) { // 保存當前上下文到co(新協程)的env buffer中,由於第一次setjmp返回的是0,則調用下面的longjmp longjmp(*(jmp_buf *)co->entry_arg, 1); // 此時co->entry_arg為coroutine_new中的old_env保存點,而value給的是1,則swapcontext不會執行,直接return,qemu_coroutine_create就直接返回 } while (true) { co->entry(co->entry_arg); qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); } } ``` 回傳後,呼叫 `qemu_coroutine_enter` ```c void qemu_coroutine_enter(Coroutine *co, void *opaque) { Coroutine *self = qemu_coroutine_self(); //獲取當前協程的co结構,第一次可認為是主協程的控制信息 trace_qemu_coroutine_enter(self, co, opaque); if (co->caller) { fprintf(stderr, "Co-routine re-entered recursively\n"); abort(); } co->caller = self; //新協程的caller为主線程co co->entry_arg = opaque; coroutine_swap(self, co); //通過swap操作從主協程切換至新創建的co } ``` 接下來其實我就不太知道為什麼會能使用到 `qemu_coroutine_switch` ,我也找不到 `coroutine_swap` 的相關訊息,也不會是 `qemu_coroutine_switch` ,因為參數量不符。但可以透過下面程式碼第 14 行的註解看出,他會是下一個使用的函式,並能跳回到 `coroutine_trampoline` 並進入 while 迴圈執行 create 時給的 entry,達成切換 coroutine 執行的動作。 `qemu_coroutine_switch` ```c= //qemu_coroutine_switch 為協程切換的關鍵函数 CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_, CoroutineAction action) { CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); CoroutineThreadState *s = coroutine_get_thread_state(); int ret; s->current = to_; ret = setjmp(from->env); //保存當前的上下文到主協程的env中,相當於主協程的上下文在qemu_coroutine_enter中 if (ret == 0) { longjmp(to->env, action); //跳轉至新協程的上下文,新協程的上下文保存點為coroutine_trampoline中的setjmp處,此處action給的是非0,則直接進入co->entry(co->entry_arg);執行create時給的entry. } return ret; } ``` 若要轉移執行控制權到協程的調用者處則用 `qemu_coroutine_yield` ```c void coroutine_fn qemu_coroutine_yield(void) { Coroutine *self = qemu_coroutine_self(); Coroutine *to = self->caller; trace_qemu_coroutine_yield(self, to); if (!to) { fprintf(stderr, "Co-routine is yielding to no one\n"); abort(); } self->caller = NULL; coroutine_swap(self, to); //此處self为新協程,to为主協程 } ```