# 2022q1 Homework1 (lab0) contributed by < `kdnvt` > ## 開發環境 ```bash $ gcc --version gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0 $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian Address sizes: 39 bits physical, 48 bits virtual CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 126 Model name: Intel(R) Core(TM) i5-1035G1 CPU @ 1.00GHz Stepping: 5 CPU MHz: 1200.000 CPU max MHz: 3600.0000 CPU min MHz: 400.0000 BogoMIPS: 2380.80 Virtualization: VT-x L1d cache: 192 KiB L1i cache: 128 KiB L2 cache: 2 MiB L3 cache: 6 MiB NUMA node0 CPU(s): 0-7 ``` ## 作業要求 ### [lab0](https://hackmd.io/@sysprog/linux2022-lab0#-%E4%BD%9C%E6%A5%AD%E8%A6%81%E6%B1%82) ## 開發過程 ### list_head_t 我覺得每次都要寫 `struct` 過於冗長,所以定義了一個 `list_head_t` 。 ```c typedef struct list_head list_head_t; ``` :::danger 不要這樣作!這是取鏡於 Linux 核心的 API,請保持一致的風格。課程要求學員做好協作的準備,這類的細節和態度就是其一。現在就改回來! :notes: jserv ::: :::success 已將程式碼中 list_head_t 的部份都刪除。 ::: ### q_new ```c struct list_head *q_new() { struct list_head *node = malloc(sizeof(*node)); if (node) INIT_LIST_HEAD(node); return node; } ``` ### q_free 一開始的程式為以下 ```c void q_free(struct list_head *l) { element_t *entry, *safe; list_for_each_entry_safe (entry, safe, l, list) { if (entry->value) free(entry->value); free(entry); } free(l); return; } ``` 但當我完成 insert , remove 等程式,發現有關 malloc failure on new 的測試有時會發生錯誤,而且是 segmentation fault 。我決定用 `make valgrind` 來檢查出錯的地方,於是有了以下訊息: ``` # Test of malloc failure on new ==26221== Invalid read of size 8 ==26221== at 0x10E6C9: q_free (queue.c:45) ==26221== by 0x10ADC4: queue_quit (qtest.c:838) ==26221== by 0x10D54E: do_quit (console.c:253) ==26221== by 0x10DF9F: finish_cmd (console.c:593) ==26221== by 0x10C728: main (qtest.c:963) ==26221== Address 0x8 is not stack'd, malloc'd or (recently) free'd ``` 根據以上訊息,可以知道問題發生在 ```q_free``` 中,而 ```q_free``` 中第一個遇到的就是 ```list_for_each_entry_safe``` 這個 MACRO ,於是我去看 ```list.h``` 中此巨集的實作方法: ```c #define list_for_each_entry_safe(entry, safe, head, member) \ for (entry = list_entry((head)->next, __typeof__(*entry), member), \ safe = list_entry(entry->member.next, __typeof__(*entry), member); \ &entry->member != (head); entry = safe, \ safe = list_entry(safe->member.next, __typeof__(*entry), member)) ``` 可以看到這個巨集一開始就直接用 >entry = list_entry((head)->next, __typeof__(*entry), member) 將 `list_entry` 的內容指派給 `entry` 。而且也直接使用`(head)->next` ,所以我很快發現我忘記檢查傳入的引數是否為 `NULL` 或 empty queue,於是我將程式碼改成以下: ```c void q_free(struct list_head *l) { if (!l) return; if (list_empty(l)) { free(l); return; } element_t *entry, *safe; list_for_each_entry_safe (entry, safe, l, list) { if (entry->value) free(entry->value); free(entry); } free(l); return; } ``` :::warning 考慮用 `q_release_element` 取代迴圈內部。 ::: #### 其他想法: 我覺得上面那段程式中 ```c if (!l) return; if (list_empty(l)) { free(l); return; } ``` 有點冗長,於是我嘗試將程式碼改成以下: ```c if (!l || (list_empty(l) && (free(l),true))) return; ``` 其中用到了 || 及 && 在前者條件已經確定下,後者不會被執行的技巧。而後面的 ``(free(l),true)`` 則是我參閱規格書中 comma operator : >The left operand of a comma operator is evaluated as a void expression; there is a sequence point after its evaluation. Then the right operand is evaluated; the result has its type and value. 97) If an attempt is made to modify the result of a comma operator or to access it after the next sequence point, the behavior is undefined. 因此我想它的行為應該會是執行完前面的 `free(l)` 後直接以 true 來取代這段 expression 。 在 qtest 中上面這段程式碼可以正確執行,但我還沒有放到<s>正式版本</s>中。 :::warning 疑問:不知道類似以上的寫法是不是好的實作方式,有沒有其他我沒想到的問題或是可讀性太差? ::: 最後發現根本不用這麼麻煩,因為如果傳入 free 的指標本身就指向 NULL ,free 是不會有任何行為的,所以根本不用多做一個檢查。 簡化後的版本: ```c void q_free(struct list_head *l) { if (!l || list_empty(l)) { free(l); return; } element_t *entry, *safe; list_for_each_entry_safe (entry, safe, l, list) q_release_element(entry); free(l); return; } ``` ### q_insert_head ```c bool q_insert_head(struct list_head *head, char *s) { if (!head) return false; element_t *node = element_new(s); if (!node) return false; list_add(&node->list, head); return true; } ``` 在實作 `q_insert_head` 以及 `q_insert_tail` 時,我發現兩者有許多重複的地方,於是我嘗試將那些部份寫成另一個函式 `element_new` 。此函式以一個字串 `s` 為參數,配置一個新的記憶體空間給 `element_t` ,並將 `value` 複製為該字串(也是獨立配置記憶體空間),如果過程中配置失敗會回傳 `NULL` ,如果 `s` 指向 `NULL` 則會將 `value` 也指向 `NULL`。 :::warning 疑問:我看到許多危險的函式,如 `strcpy`, `gets` 等等。`strlen` 會不會也有潛在風險? > 請詳閱 [strlen(3)](https://man7.org/linux/man-pages/man3/strlen.3.html),摘錄: > > In cases where the input buffer may not contain a terminating null byte, strnlen(3) should be used instead. > :notes: jserv ::: ```c element_t *element_new(char *s) { element_t *node; if (!(node = malloc(sizeof(*node)))) return NULL; node->value = NULL; if (s == NULL) return node; char *str; int n = strlen(s) + 1; if (!(str = malloc(sizeof(*str) * n))) { free(node); return NULL; } strncpy(str, s, n); node->value = str; return node; } ``` 在與教授的討論中,提到了 `sizeof(*str)` (同 `sizeof(char)`) 因為結果是 `1`,所以其實可以拿掉不用寫。 我因此產生出了:有沒有可能在某些機器或實作上, `sizeof(char)` 不是一? 查了一些資料發現, c 語言規格書 6.5.3 中就有提到: > The sizeof operator yields the size (in bytes) of its operand, which may be an expression or the parenthesized name of a type. The size is determined from the type of the operand. The result is an integer. If the type of the operand is a variable length array type, the operand is evaluated; otherwise, the operand is not evaluated and the result is an integer constant. > > When applied to an operand that has type char, unsigned char, or signed char, (or a qualified version thereof) the result is 1. When applied to an operand that has array type, the result is the total number of bytes in the array.) When applied to an operand that has structure or union type, the result is the total number of bytes in such an object, including internal and trailing padding. 因此, sizeof(char) 的結果永遠會是一,程式碼中確實可以刪去這部份使其更精簡。 :::warning 不要花太多心思在「查了一些資料發現」,不如用在探索 C 語言規格書。 :notes: jserv ::: > 延伸閱讀: > [Are there machines, where sizeof(char) != 1, or at least CHAR_BIT > 8?](https://stackoverflow.com/questions/2215445/are-there-machines-where-sizeofchar-1-or-at-least-char-bit-8) 簡化後的版本: ```c element_t *element_new(char *s) { element_t *node; if (!(node = malloc(sizeof(*node)))) return NULL; char *str; int n = strlen(s) + 1; if (!(str = malloc(n))) { free(node); return NULL; } strncpy(str, s, n); node->value = str; return node; } ``` ### q_insert_tail ```c bool q_insert_tail(struct list_head *head, char *s) { if (!head) return false; element_t *node = element_new(s); if (!node) return false; list_add_tail(&node->list, head); return true; } ``` ### q_remove_head ```c element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize) { if (!head || list_empty(head)) return NULL; struct list_head *rm_node = head->next; list_del(rm_node); element_t *rm_ele = list_entry(rm_node, element_t, list); // If the value of removed element points to NULL, do nothing. if (!sp || !(rm_ele->value)) return rm_ele; strncpy(sp, rm_ele->value, bufsize); sp[bufsize - 1] = '\0'; return rm_ele; } ``` ### q_remove_tail ```c element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize) { if (!head || list_empty(head)) return NULL; struct list_head *rm_node = head->prev; list_del(rm_node); element_t *rm_ele = list_entry(rm_node, element_t, list); // If the value of removed element points to NULL, do nothing. if (!sp || !(rm_ele->value)) return rm_ele; strncpy(sp, rm_ele->value, bufsize); sp[bufsize - 1] = '\0'; return rm_ele; } ``` ### q_size ```c int q_size(struct list_head *head) { if (!head || list_empty(head)) return 0; struct list_head *node; unsigned count = 0; list_for_each (node, head) count++; return count; } ``` ### q_delete_mid ```c bool q_delete_mid(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *fast, *slow; for (fast = slow = head->next; fast != head && fast->next != head; slow = slow->next, fast = fast->next->next) ; list_del(slow); q_release_element(list_entry(slow, element_t, list)); return true; } ``` ### q_delete_dup 因為走訪整個 queue 的過程中有可能會移除當前的 node ,因此使用 `list_for_each_safe` 而不是 `list_for_each` 。 原始版本: ```c bool q_delete_dup(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *node, *safe; bool last_dup = false; list_for_each_safe (node, safe, head) { element_t *cur = list_entry(node, element_t, list); if (node->next != head && !strcmp(cur->value, list_entry(node->next, element_t, list)->value)) { list_del(node); q_release_element(cur); last_dup = true; } else if (last_dup) { list_del(node); q_release_element(cur); last_dup = false; } } return true; } ``` 經過教授的建議,我重新思考是否可以移除特例,並減少重複的程式碼,最後改成以下: ```c bool q_delete_dup(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *node, *safe; bool last_dup = false; list_for_each_safe (node, safe, head) { element_t *cur = list_entry(node, element_t, list); bool match = node->next != head && !strcmp(cur->value, list_entry(node->next, element_t, list)->value); if (match || last_dup) { list_del(node); q_release_element(cur); } last_dup = match; } return true; } ``` ### q_swap 在一開始的想法中,每次迴圈都讓 `node` 往前兩次,後來發現 swap 的過程中 `node` 所指的 `struct list_head` 本身就會往前一次,因此迴圈結束只要往前一次。 ```c void q_swap(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head *node; /* When traverse all queue, the operation swapping automatically * move the node forward once. In the end of each iteration, it * will only need (node = node->next) instead of * (node = node->next->next). */ for (node = head->next; node != head && node->next != head; node = node->next) list_move_tail(node->next, node); } ``` ### q_reverse ```c void q_reverse(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head *node, *safe; list_for_each_safe (node, safe, head) list_move(node, head); } ``` ### q_sort 一開始決定以<s>疊代</s> **迭代**的方式實作 merge sort 。 :::warning 依據《[國家教育研究院雙語詞彙、學術名詞暨辭書資訊網](https://terms.naer.edu.tw/)》,"iterative" 一詞翻譯為「迭代」。 注意「[疊](https://dict.revised.moe.edu.tw/dictView.jsp?ID=2195)」和「[迭](https://dict.revised.moe.edu.tw/dictView.jsp?ID=2169)」語意不同: * 疊: 堆聚、累積 * 迭: 輪流、更替,如:「更迭」。《詩經.邶風.柏風》:「日居月諸,胡迭而微。」 依本處情境來,乃逐一節點走訪,存取特定的記憶體內容,因此該用「迭」 :notes: jserv ::: 初始版本: ```c #define STACKSIZE 100000 void q_sort(struct list_head *head) { if (!head || list_empty(head) || list_is_singular(head)) return; int count = 0, n = q_size(head); struct list_head *sorted[STACKSIZE]; struct list_head *cur, *safe; list_for_each_safe (cur, safe, head) INIT_LIST_HEAD(sorted[count++] = cur); for (int size_each_list = 1; size_each_list < n; size_each_list *= 2) { for (int i = 0; i + size_each_list < n; i += size_each_list * 2) { struct list_head *left = sorted[i]; struct list_head *right = sorted[i + size_each_list]; sorted[i] = merge(left, right); } } list_add_tail(head, sorted[0]); } /* * The list in this function is doubly circular linked_list without * the Head node. * Move each node in list right to list left at corresponding position. */ struct list_head *merge(struct list_head *left, struct list_head *right) { struct list_head *head = left; if (strcmp(list_entry(left, element_t, list)->value, list_entry(right, element_t, list)->value) > 0) list_move_tail(head = (right = right->next)->prev, left); struct list_head *tail = head->prev; while (left != tail && right->next != left) { int cmp = strcmp(list_entry(left, element_t, list)->value, list_entry(right, element_t, list)->value); if (cmp <= 0) // to keep sorting stable, split condition as <= , > left = left->next; else list_move_tail((right = right->next)->prev, left); } while (right->next != left && right->next != head) { int cmp = strcmp(list_entry(left, element_t, list)->value, list_entry(right, element_t, list)->value); list_move_tail((right = right->next)->prev, cmp < 0 ? head : left); // Shoud be cmp <= 0 because of stability } return head; } ``` 我覺得上面的 `merge` 的實作有點笨,本來是想把右列的 sorted list 一個一個判斷後插入左列,但越寫越不直覺,還要想一些例外狀況,可讀性差也不易理解,於是寫完之後決定重寫一份: ```c struct list_head *merge(struct list_head *left, struct list_head *right) { struct list_head *head; int cmp = strcmp(list_entry(left, element_t, list)->value, list_entry(right, element_t, list)->value); struct list_head **chosen = cmp <= 0 ? &left : &right; // cmp <= 0 for stability head = *chosen; *chosen = (*chosen)->next; list_del_init(head); while (left->next != head && right->next != head) { cmp = strcmp(list_entry(left, element_t, list)->value, list_entry(right, element_t, list)->value); chosen = cmp <= 0 ? &left : &right; // cmp <= 0 for stability list_move_tail((*chosen = (*chosen)->next)->prev, head); } struct list_head *remain = left->next != head ? left : right; struct list_head *tail = head->prev; head->prev = remain->prev; head->prev->next = head; remain->prev = tail; remain->prev->next = remain; return head; } ``` 上面這段程式碼是先判斷找出 `value` 最小的 element 當成第一個 node ,然後比較左右列最小的 node 一個一個插入進去。 執行起來較為對稱,也沒有太多例外狀況,較好理解可讀性也較佳。 需要特別注意的是為了保持排序的穩定性,條件為 `cmp <= 0` 而不是 `cmp < 0` 。 不過 `q_sort` 中有一個致命的缺點,就是陣列 sorted 是固定長度,若 queue 太小會很浪費空間,太大則會無法執行。另外,這次作業似乎禁止在 `q_sort` 中使用 `malloc` ,不過就算能用,對於記憶體也可能會是很大的負擔。於是我想了另一套實作方法來解決這個問題: ```c void q_sort(struct list_head *head) { if (!head || list_empty(head) || list_is_singular(head)) return; /* * The sorted list implementation below doesn't include the head * node. Which means every node in a sorted list is a member of * element_t. * */ /* * In order to save memory space, this implementation use * member pointer next to determine each sorted list. * * If a pointer * * struct list_head *tail; * * points the end of a sorted list, its tail->next will point to * next sorted list, instead of first node of sorted list. * At beginning , merge sort split the list, so each node itself * is a sorted list. Therefore, each node with prev pointing to * itself, and next pointing to next node. * Final sorted list has it tail's next point to head. */ struct list_head *cur, *safe; list_for_each_safe (cur, safe, head) cur->prev = cur; /* pointer first points to first sorted list */ struct list_head *first = head->next; INIT_LIST_HEAD(head); while (first->prev->next != head) { struct list_head **last = &first; struct list_head *next_list = (*last)->prev->next; struct list_head *next_next_list = next_list->prev->next; while ((*last) != head && next_list != head) { /* * Make each sorted list doubly circular list * to make use of function merge. */ (*last)->prev->next = (*last); next_list->prev->next = next_list; (*last) = merge((*last), next_list); /* * The result of function merge is a doubly circular list, * so make tail of list point it next to next sorted list. */ last = &((*last)->prev->next); *last = next_next_list; next_list = (*last)->prev->next; next_next_list = next_list->prev->next; } } list_add_tail(head, first); } ``` 這個方法主要使用每個 sorted list 尾端節點的 `next` 指向下一個 sorted list ,如此一來我就可以用 linked_list 來走訪各個子 sorted list ,不需要額外的空間來記錄這些 sorted list 的位置。 結構大致上如下: ```graphviz digraph "Doubly Linked List" { rankdir=LR; node [shape=record]; e [label="{<ref2> | first }"]; a [label="{ <ref1> | <data> 1 | <ref2> }"] b [label="{ <ref1> | <data> 5 | <ref2> }"]; c [label="{ <ref1> | <data> 7 | <ref2> }"]; f [label="{ <ref1> | <data> 2 | <ref2> }"] g [label="{ <ref1> | <data> 3 | <ref2> }"]; h [label="{ <ref1> | <data> 6 | <ref2> }"]; d [label="{ <ref1> | head | <ref2> }"]; a:data:n -> e:ref2:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; c:data:s -> a:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:ref2:c -> b:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref2:c -> c:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref2:c -> f:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref1:c -> b:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref1:c -> a:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:data:s -> f:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:ref2:c -> g:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref2:c -> h:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref1:c -> g:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref1:c -> f:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref1:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } ``` 圖中 1 5 7 為一個 sorted list , 2 3 6 為另一個 sorted list ,以一個 `first` 來記錄整個 linked list 的位置,最後一個節點則指向 `head` ,而 head 指向自己。 ==註:此圖僅為說明,若是圖中的 queue ,以程式碼執行的結果應該不會出現三個三個一組的情形。== 範例說明: 假如一個 queue 順序為 1 6 7 2 5 3 。 程式一開始先以 ```c struct list_head *cur, *safe; list_for_each_safe (cur, safe, head) cur->prev = cur; /* pointer first points to first sorted list */ struct list_head *first = head->next; INIT_LIST_HEAD(head); ``` 將 queue 切割成單個 sorted list ,圖如下: ```graphviz digraph "Doubly Linked List" { rankdir=LR; node [shape=record]; e [label="{<ref2> | first }"]; a [label="{ <ref1> | <data> 1 | <ref2> }"] b [label="{ <ref1> | <data> 6 | <ref2> }"]; c [label="{ <ref1> | <data> 7 | <ref2> }"]; f [label="{ <ref1> | <data> 2 | <ref2> }"] g [label="{ <ref1> | <data> 5 | <ref2> }"]; h [label="{ <ref1> | <data> 3 | <ref2> }"]; d [label="{ <ref1> | head | <ref2> }"]; a:data:n -> e:ref2:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:data:s -> a:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:ref2:c -> b:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref2:c -> c:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref2:c -> f:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref1:c -> c:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref1:c -> b:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; f:data:s -> f:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:ref2:c -> g:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref2:c -> h:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref1:c -> h:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref1:c -> g:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref1:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } ``` 接著對每兩個相鄰的 sorted list 做合併: ```graphviz digraph "Doubly Linked List" { rankdir=LR; node [shape=record]; e [label="{<ref2> | first }"]; a [label="{ <ref1> | <data> 1 | <ref2> }"] b [label="{ <ref1> | <data> 6 | <ref2> }"]; c [label="{ <ref1> | <data> 2 | <ref2> }"]; f [label="{ <ref1> | <data> 7 | <ref2> }"] g [label="{ <ref1> | <data> 3 | <ref2> }"]; h [label="{ <ref1> | <data> 5 | <ref2> }"]; d [label="{ <ref1> | head | <ref2> }"]; a:data:n -> e:ref2:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; b:data:s -> a:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:ref2:c -> b:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref2:c -> c:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref2:c -> f:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref1:s -> f:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref1:c -> a:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:data:s -> f:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:ref2:c -> g:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref2:c -> h:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref1:c -> g:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref1:s -> h:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref1:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } ``` ```graphviz digraph "Doubly Linked List" { rankdir=LR; node [shape=record]; e [label="{<ref2> | first }"]; a [label="{ <ref1> | <data> 1 | <ref2> }"] b [label="{ <ref1> | <data> 2 | <ref2> }"]; c [label="{ <ref1> | <data> 6 | <ref2> }"]; f [label="{ <ref1> | <data> 7 | <ref2> }"] g [label="{ <ref1> | <data> 3 | <ref2> }"]; h [label="{ <ref1> | <data> 5 | <ref2> }"]; d [label="{ <ref1> | head | <ref2> }"]; a:data:n -> e:ref2:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:data:s -> a:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:ref2:c -> b:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref2:c -> c:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref2:c -> f:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref1:c -> b:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref1:c -> a:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:data:s -> f:ref1:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:ref2:c -> g:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref2:c -> h:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref1:c -> g:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref1:s -> h:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref1:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } ``` ```graphviz digraph "Doubly Linked List" { rankdir=LR; node [shape=record]; e [label="{<ref2> | first }"]; a [label="{ <ref1> | <data> 1 | <ref2> }"] b [label="{ <ref1> | <data> 2 | <ref2> }"]; c [label="{ <ref1> | <data> 3 | <ref2> }"]; f [label="{ <ref1> | <data> 5 | <ref2> }"] g [label="{ <ref1> | <data> 6 | <ref2> }"]; h [label="{ <ref1> | <data> 7 | <ref2> }"]; d [label="{ <ref1> | head | <ref2> }"]; a:data:n -> e:ref2:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; h:data:s -> a:ref1:s [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; a:ref2:c -> b:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref2:c -> c:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref2:c -> f:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:ref1:c -> b:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; b:ref1:c -> a:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:data:s -> f:ref1:c [arrowhead=dot, arrowtail=vee, dir=both, headclip=false]; f:ref2:c -> g:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref2:c -> h:data:n [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:ref1:c -> g:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; g:ref1:c -> f:data:s [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref2:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:ref1:c -> d [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } ``` 最後再將 `head` 插回排序好的 queue 中,再回傳就完成了。 >graphviz 參考資料: https://stackoverflow.com/questions/70441786/draw-doubly-linked-list-using-graphviz ### 研讀 lib/list_sort.c 除了程式碼及註解外,我參考了 [研讀 Linux 核心的 lib/list_sort.c 原始程式碼](https://hackmd.io/@DokiDokiPB/SJfES914O#list_sort) ,才發現 commit message 也是很重要的資訊來源。如果實作上較為複雜或有其他考量,開發者會把重要的資訊及考量的點也寫在 commit message 中。 >延伸閱讀: >[lib/list_sort: optimize number of calls to comparison function](https://github.com/torvalds/linux/commit/b5c56e0cdd62979dd538e5363b06be5bdf735a09?branch=b5c56e0cdd62979dd538e5363b06be5bdf735a09&diff=split) 研讀 `lib/list_sort.c` 時我發現 `list_sort.c` 實作方法跟我的有一些相似之處,例如用 `prev` 來串起排序好的子串列,以及用 bottom up 的方法實作排序。不過也有許多不同的地方, `list_sort.c` 在合併的過程中,直接把子串列當成單向鏈結串列,因此操作上省去了許多 `prev` 的指標操作,只要在最後一次 merge 時再串起即可。 而選取合併的子串列時,雖然是用 bottom up 的方法,但不同於一般的 level order merge ——即每一層相同大小的子串列都合併完後,才合併更大的子串列—— `list_sort.c` 的實作上採用了 depth-first order ,目的是為了避免 thrashing 。 level order merge 在每一層都會幾乎讀取所有子串列(有些實作上最後一條子串列可能不對讀到),當要排序的雙向鏈結串列超過 L1 cache 的大小,同一層前面排序好的子串列必須從 cache 中搬出,讓給後面要排序的子串列。而到了下一層,剛被搬出的子串列又要被搬進 cache ,造成資料不斷被搬進搬出 cache ,頻繁讀取卻沒有利用到 cache ,對於效能是很大的傷害。如果採用 depth-first order,就可以趁相近的子串列都還在 L1 cache 時,盡量將大小相同的兩者合併,進而打造出 cache friendly 的實作方法。 不過只用 depth-first order 還有一個很大的問題,就是會造成 unbalanced merge ,即兩個大小差異極大的子串列做合併。(我的實作中沒有考慮到這一點,這是一個可以再改進的部份。) 因此,這份 commit :[lib/list_sort: optimize number of calls to comparison function](https://github.com/torvalds/linux/commit/b5c56e0cdd62979dd538e5363b06be5bdf735a09?branch=b5c56e0cdd62979dd538e5363b06be5bdf735a09&diff=split) 的作者 George Spelvin 提出了另一種實作方式來改善這個缺點。 首先,他分析了不同實作下所需的 compares 次數。 $n*\log_{2}(n) - K*n + O(1)$ 其中使用一般 Top down merge sort 因為可以將子串列平均分割,因此平均情況下 K = 1.248 ,而一般的 bottom up merge sort 平均情況下 K = 0.965 。(George Spelvin 做的實驗顯示 K = 1.01) 但是使用 Top down merge sort 或其他改良版的 merge sort (如 queue merge sort)時,都會用到 breadth-first multi-pass structure ,與我們想要以 depth-first order 善用 cache 的需求不符。 George Spelvin 透過確保每次合併時,兩個子串列的大小不會大於 $2 : 1$ ,成功的將 K 值增加到 1.207,省下了 0.2 * n 次的 compares 操作。 >George Spelvin 在 commit message 最後附上了一些分析合併排序的論文: > >[Bottom-up Mergesort: A Detailed Analysis](https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.6.5260) >Wolfgang Panny, Helmut Prodinger Algorithmica 14(4):340--354, October 1995 > >[The cost distribution of queue-mergesort, optimal mergesorts, and power-of-two rules](https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.4.5380) >Wei-Mei Chen, Hsien-Kuei Hwang, Gen-Huey Chen Journal of Algorithms 30(2); Pages 423--448, February 1999 > >[Queue-Mergesort](https://doi.org/10.1016/0020-0190(93)90088-q) >Mordecai J. Golin, Robert Sedgewick Information Processing Letters, 48(5):253--259, 10 December 1993 #### 確保 2 : 1 的實作方法 這段操作十分精簡且高效,因此花了我好長一段時間去理解,這到底是什麼巫術。 George Spelvin 甚至在 commit message 的最後寫上了: > I confess to being more than a little bit proud of how clean this code turned out. It took a lot of thinking, but the resultant inner loop is very simple and efficient. 先引入一個變數 `count` 來記錄目前讀進的元素個數,並根據其 bit pattern 來判斷每次的合併。 節錄 `list_sort.c` 中的註解: ```c /* * The number of pending lists of size 2^k is determined by the * state of bit k of "count" plus two extra pieces of information: * * - The state of bit k-1 (when k == 0, consider bit -1 always set), and * - Whether the higher-order bits are zero or non-zero (i.e. * is count >= 2^(k+1)). * * There are six states we distinguish. "x" represents some arbitrary * bits, and "y" represents some arbitrary non-zero bits: * 0: 00x: 0 pending of size 2^k; x pending of sizes < 2^k * 1: 01x: 0 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * 2: x10x: 0 pending of size 2^k; 2^k + x pending of sizes < 2^k * 3: x11x: 1 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * 4: y00x: 1 pending of size 2^k; 2^k + x pending of sizes < 2^k * 5: y01x: 2 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * (merge and loop back to state 2) */ ``` 其中 ```c /* k k-1 * v v * 0: 0 0 x: 0 pending of size 2^k; x pending of sizes < 2^k * 1: 0 1 x: 0 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * 2: x 1 0 x: 0 pending of size 2^k; 2^k + x pending of sizes < 2^k * 3: x 1 1 x: 1 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * 4: y 0 0 x: 1 pending of size 2^k; 2^k + x pending of sizes < 2^k * 5: y 0 1 x: 2 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k * (merge and loop back to state 2) */ ``` 左邊的 0/1 代表第 k 個 bit ,右邊的 0/1 代表第 k-1 個 bit 。 一開始不知道那一個才是 bit k ,看了好久才看懂。 x 最多為 k-2 bits ,因此最大為 $2^{k-1}-1$ 。 對於第 k bit ,可以將其分為以下六種狀態,並由其所在的狀態來判斷目前長度為 $2^k$ 的串列數量: `0: 00x: 0 pending of size 2^k; x pending of sizes < 2^k` 代表目前有 x 數量個元素在長度小於 $2^k$ 的子串列中。 `1: 01x: 0 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k` 代表有 ($2^{k-1}$ + x) 個元素在長度小於 $2^k$ 的子串列中。 `2: x10x: 0 pending of size 2^k; 2^k + x pending of sizes < 2^k` 代表有 ($2^k$ + x) 個元素在長度小於 $2^k$ 的子串列中,但為了確保之後的合併不會超過 2 : 1 ,因此不會將那 $2^k$ 個合併成一個長度為 $2^k$ 的串列。 `3: x11x: 1 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k` 代表有一個長度為 $2^k$ 的串列,另外有 ($2^k$ + x) 個元素在長度小於 $2^k$ 的子串列中。 因為至少有 $2^k$ + $2^{k-1}$ 個元素,所以可以將那 $2^k$ 個元素合併。如果之後都沒有其他元素了,剩下長度差異最多也就是 $2^k : 2^{k-1}=2:1$ 。 `4: y00x: 1 pending of size 2^k; 2^k + x pending of sizes < 2^k` 代表有一個長度為 $2^k$ 的串列,另外有 ($2^k$ + x) 個元素在長度小於 $2^k$ 的子串列中。跟狀態 2 一樣,雖然有可合併成 $2^k$ 的子串列,但不會立即合併成 $2^k$。 `5: y01x: 2 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k` 代表有兩個長度為 $2^k$ 的串列,另外有 ($2^{k-1}$ + x) 個元素在長度小於 $2^k$ 的子串列中。兩個長度為 $2^k$ 的子串列一樣不會立即合併成 $2^{k+1}$ ,唯有確定後面的元素個數達到 $2^k$ ,足以確保差距不會超過 $2^{k+1}:2^k =2:1$ ,才會進行合併並且回到狀態 2 。 >參考[研讀 Linux 核心的 lib/list_sort.c 原始程式碼 的表格](https://hackmd.io/@DokiDokiPB/SJfES914O#list_sort) >以及[hankluo6 的共筆 表格](https://hackmd.io/@hankluo6/2021q1quiz2#%E8%AA%AA%E6%98%8E-Linux-%E6%A0%B8%E5%BF%83%E7%9A%84-liblist_sortc-%E6%9C%80%E4%BD%B3%E5%8C%96%E6%89%8B%E6%B3%95) |count decimal|count binary|merge|before merge|after merge | -------- | -------- | -------- |---|---| 0|0000 |No| NULL| X 1|0001 |No| 1 |X 2|0010 |Yes| 1-1 |2 3|0011 |No| 1-2 |X 4|0100 |Yes| 1-1-2 |2-2 5|0101 |Yes| 1-2-2 |1-4 6|0110 |Yes| 1-1-4 |2-4 7|0111 |No| 1-2-4 |X 8|1000 |Yes| 1-1-2-4 |2-2-4 9|1001 |Yes| 1-2-2-4 |1-4-4 10|1010 |Yes | 1-1-4-4| 2-4-4 11|1011 |Yes | 1-2-4-4| 1-2-8 12|1100 |Yes| 1-1-2-8| 2-2-8 13|1101 |Yes | 1-2-2-8 |1-4-8 14|1110|Yes | 1-1-4-8 |2-4-8 15|1111 |No | 1-2-4-8 |X 16|10000 |Yes| 1-1-2-4-8| 2-2-4-8 有了以上的初步理解,就可以來看 list_sort.c 程式實作: ```c do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list); ``` 先分析每一次讀取時 `count` 的變化,有可能是: $0\overbrace{11..11}^\text{k 個} = 01x$ (狀態 1) 這樣的話會被下面的程式排除,不給合併。 ```c for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { /*....*/ } ``` 另一種則是 $y0\overbrace{11..11}^\text{k 個} = y01x$ (狀態 5) 每一次 `count` + 1 ,都會有一個 bit k 從 0 進位成 1 ,這個 bit 為 least signifcant clear bit 。而這個 bit 處於狀態 5 ,也就是目前有兩個長度為 $2^k$ 的子串列。 在它之前的所有 bits 皆為狀態 3 。 $x111..11 = x11x$ (狀態 3) 包括第 0 個 bit > The state of bit k-1 (when k == 0, consider bit -1 always set) 根據狀態 3 的敘述,可以說目前 $\forall \ i \in Z, 0\le i\le k-1$ 都有剛好一條長度為 $2^i$ 的子串列。 也就是目前的子串列大小排列是: $\overset{pending}{\implies}\overbrace{2^0\overset{prev}{\implies}2^1\overset{prev}{\implies}.....\overset{prev}{\implies}2^k}^\text{k 個 prev}\overset{prev}{\implies}2^k\overset{prev}{\implies}..$ 目前在長度小於 $2^k$ 的子串列中的元素個數為: $(2^0 + 2^1 + 2^2 + ... + 2^{k-1}) + 1 = (2^k - 1)+1 = 2^k$ 數學式中最後的 $1$ 並不在各個子串列中,他是這個迴圈的結尾才會被加進的元素。這個數學式讓我們得知這個迴圈結束時長度小於 $2^k$ 的子串列中的元素個數已經到了 $2^k$ ,因此可以合併兩條大小為 $2^k$ 的子串列。 所以在一開始的迴圈只要往前 k 次就可以到達要合併子串列: ```c for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; ``` 到達後進行合併: ```c if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } ``` 然後輸入下一個元素: ```c /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; ``` 沒有待輸入的元素時,將剩餘的子串列從小到大合併: ```c /* End of input; merge together all the pending lists. */ list = pending; pending = pending->prev; for (;;) { struct list_head *next = pending->prev; if (!next) break; list = merge(priv, cmp, pending, list); pending = next; } ``` 最後在用 `merge_final` 合併最後兩條子串列,並串起 `prev` : ```c merge_final(priv, cmp, head, pending, list); ``` 用這種方式就可以達成 depth-first merge sort 同時保證兩者差異不會大於 $2:1$ 。 #### 一些證明 前提:所有子串列其長度皆為 2 的冪,所有合併皆為同大小串列。 設 $t_i$ 為目前長度小於 $2^i$ 的所有子串列的節點數總和。$i$ 為正整數。 第一點: 證明當 $t_i$ 從 $3\times2^{i-1}-1$ 加一變成 $3\times2^{i-1}$ 時,不存在正整數 $j \neq i$ 使得 $t_j$ = $3\times2^{j-1}$ 。 假設存在 $j>i$ 且 $t_j$ = $3\times2^{j-1}$ 。則有一個正整數 $l$ 使得 $3\times2^{i-1} + l = 3\times2^{j-1}$ 。因為長度小於 $2^{i}$ 的所有子串列的節點數總和都包含在 $t_i$ 裡面了,所以 $l$ 都在長度大於等於 $2^i$ 的串列裡。因此可以把 $l$ 寫成 $A\times2^i$ 。其中 A 等於任意正整數。 $3\times2^{i-1} + A\times2^i = 3\times2^{j-1}$ $(3+2A)\times2^{i-1} = 3\times2^{j-1}$ $\dfrac{(3+2A)}{3} = 2^{j-i}$ $1+\dfrac{2A}{3} = 2^{j-i}$ 令 $A = 3B$ 代入,$B$ 為任意正整數。 $1+2B = 2^{j-i}$ 因為 $j>i$ ,所以 $2^{j-i}$ 必為偶數,不存在任何正整數 $B$ 使得 $1+2B$ 等於偶數,假設不成立,因此不存在 $j>i$ 且 $t_j$ = $3\times2^j$ 。 假設存在 $j<i$ 且 $t_j$ = $3\times2^{j-1}$ 。則有一個正整數 $l$ 使得 $3\times2^{i-1} = 3\times2^{j-1}+l$ 。因為長度小於 $2^{j}$ 的所有子串列的節點數總和都包含在 $t_j$ 裡面了,所以 $l$ 都在長度大於等於 $2^j$ 的串列裡。因此可以把 $l$ 寫成 $A\times2^j$ 。其中 A 等於任意正整數。 $3\times2^{i-1} = 3\times2^{j-1} + A\times2^j$ $3\times2^{i-1} = (3+2A)\times2^{j-1}$ $\dfrac{(3+2A)}{3} = 2^{i-j}$ $1+\dfrac{2A}{3} = 2^{i-j}$ 令 $A = 3B$ 代入,$B$ 為任意正整數。 $1+2B = 2^{i-j}$ 因為 $j<i$ ,所以 $2^{i-j}$ 必為偶數,不存在任正整數 $B$ 使得 $1+2B$ 等於偶數,假設不成立,因此不存在 $j<i$ 且 $t_j$ = $3\times2^j$ 。 總和上述兩點,可得證。 第二點: 證明在 "能保持差異最大 $2:1$ 的情況下,能合併則合併。" 的條件下,當$t_{n+1}$ 從 $3\times2^n-1$ 增加到 $3\times2^n$ 時,一定存在兩個長度為 $2^n$ 的子串列可供合併。 當 $n = 0$,因為長度最短為 1, $3\times2^0$ = 3,長度分別為 1,1,1 ,因此可以合併,成立。 設當 $n = k$ 時成立,則當 n = k+1 時,必須證明當 $t_{k+2}$ 增加到 $3\times2^{k+1}$ 時,一定存在兩個長度為 $2^{k+1}$ 的子串列可合併。 第一個 $2^{k+1}$ 子串列在 $t_{k+1}$來到 $3\times2^k$ 時根據假設誕生。此時 $t_{k+2}$ 也等於 $3\times2^k$ ,合併後 $t_{k+2}$ = $3\times2^k$,$t_{k+1}$ = $2^k$ 。 第二個 $2^{k+1}$ 子串列在 $t_{k+1}$再次來到 $3\times2^k$ 時根據假設誕生,此時 $t_{k+2}$ 來到 $5\times2^k$ 。 由上述兩點可知,在 $t_{k+2}$ 從 0 增加到 $3\times2^{k+1}$ 的過程中,一定會經過 $3\times2^k$ 以及 $5\times2^k$ ,並在這兩時刻產生出長度為 $2^{k+1}$ 的串列。所以當 $t_{k+2}$ 增加到 $6\times2^{k} = 3\times2^{k+1}$ 時,一定存在兩個長度為 $2^{k+1}$ 的子串列可供合併,根據數學歸納法得證。 第三點: 證明 $2^i$ 長度串列的合併(兩個 $2^{i-1}$ 合併成 $2^i$)只會發生在 $t_i$ 變成 $3\times2^{i-1}$ 。 由第二點可知,對所有自然數 $i$ ,$t_i$ 不會超過 $3\times 2^{i-1}$ ,因為只要等於 $3\times2^{i-1}$ 就會合併串列並變回 $2^{i-1}$ 。所以合併不會發生在 $t_i > 3\times2^{i-1}$ 。 當 $t_i < 3\times 2^{i-1}$ ,此時合併成 $2^i$ 串列無法保證兩條串列比例差異不會超過 $2:1$ ,所以不會合併。 故合併只會發生在 $t_i = 3\times 2^{i-1}$ 得證。 綜合上述三點,可得出當 count 增加到 count+1 ,最多只會存在一正整數 $i$ 使得 $t_i$ 為 $3\times 2^{i-1}$ 。若此 $i$ 存在,則一定可合併成長度為 $2^i$ 的串列,且此串列為此時合併的唯一串列。 每次 count 遞增所合併的串列也不會造成更長串列的連鎖合併,因為合併成 $2^i$ 只會改變 $t_i$ 的值,不會改變其他的 $t$ 值,所以不會有其他 $t_j$ 增加成 $3\times 2^{j-1}$ 而造成合併。 ### 為什麼 list_sort 中串列大小的比例差距限制是二比一 因為目的是為了避免合併差距過大,因此使用二比一而不是更大三比一、四比一很合理。需要考慮的是為什麼不用三比二或四比三這種比例。 先簡述一下 list_sort 的演算法: 第一階段: 將節點一個一個讀取,如果確定合併的大小不會超過整體的特定比例,就將之前的串列合併。因為本質還是 bottom up merge sort ,所以此階段的合併都是同大小,所有子串列也都會是二的冪。 第二階段: 當所有節點都讀進並盡可能的同大小合併後,就來到必須合併不同大小的階段。這個階段會將子串列由小大到一個接一個合併。 從以上兩階段可以看出,因為第一階段都是二的冪,在維持 $2m : 2m+1$ 比例的情況下,第一階段結束時最大的子串列就是 $2^{\lfloor log_22mn/{(2m+1)} \rfloor}$ 。 在第二篇論文 [The cost distribution of queue-mergesort, optimal mergesorts, and power-of-two rules](https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.4.5380) 中看到以下這段話: >Note that $2^{\lfloor log_22n/3 \rfloor}$ is the unique power of two lying between $n/3$ and $2n/3$ and that the choice of rationals other than $2/3$ will not be more balanced. For example, if $2^{\lfloor log_2n/2 \rfloor}$ or $2^{\lfloor log_25n/9 \rfloor}$ then the sizes of two subproblems are $(2,5)$ for $n = 7$ while the balanced power-of-two gives $(3,4)$. 這段話其實沒有很精準,因為當 $n = 3\times 2^k$ 時, 在包括的情況下介於 $n/3$ 以及 $2n/3$ 的二的冪會有兩個,再不包括的情況下卻又會不存在。如果是寫成 $n/3 < 2^{\lfloor log_22n/3 \rfloor} \leq 2n/3$ 就符合這段話的描述。 而這段敘述講述了一件事, $2^{\lfloor log_22n/3 \rfloor}$ 可以找到最接近 $n/2$ 的二的冪。 如果今天是用 $2^{\lfloor log_23n/5 \rfloor}$ 去求 $2^k$ ,則當 $n = 13$ 時,$2^{\lfloor log_23n/5 \rfloor} = 4$,而最接近 $13/2$ 的二的冪卻是 $8$ 。 這代表如果在第一階段用 $3:2$ 的比例去限制,在第二階段合併中,最後一次合併會是 $9:4$ ,這個比例差距已經超過第一階段的 $3:2$ ,而同樣的反例在 $2m:2m+1 ,m>2$ 中也都會發生。因此這樣的演算法只能用在 $2:1$ 的情況。 ### 證明 list_sort 的演算法是 optimal mergesort optimal merge sort 定義:在最差的情況下,比較次數小於等於所有其他 mergesort 。 對所有 mergesort ,都可以繪製出一個 merge tree 來表示其合併的順序及長度,每一個節點的權重代表當下的串列長度。方形的是 external node (leaf) ,圓形的是 internal node 。如下圖所示: ```graphviz digraph BST { node [fontname="Arial" ]; l1 [ label = "5" ]; l21 [ label = "3" ]; l22 [ label = "2" ]; l31 [ label = "2" ]; l32 [shape=rect label = "1" ]; l33 [shape=rect label = "1" ]; l34 [shape=rect label = "1" ]; l41 [shape=rect label = "1" ]; l42 [shape=rect label = "1" ]; l1 -> { l21 l22 }; l21 -> { l31 l32 }; l22 -> { l33 l34 }; l31 -> { l41 l42 }; } ``` >[graphviz參考連結](https://stackoverflow.com/questions/41194837/how-to-get-graphviz-dot-to-represent-binary-tree-correctly) internal node 的數量為 leaf 數減一。在最差的情況下,合併 (x,y) 的次數為 x+y-1 ,因此最差情況下總共的比較次數就是 $\sum\limits_{v}^{T}({w(v)} - 1) = \sum\limits_{v}^{T}{w(v)} - (n-1)$ 其中 $n$ 為這次排序的節點總數,也是 leaf 的總數。而 $v$ 為所有的 internal node 。$w(v)$ 為該 internal node 的權重(也就是長度)。在這邊因為對所有 合併排序 n - 1 都是固定的,所以只要看 $\sum\limits_{v}^{T}{w(v)}$ 就好。 引述論文 [Queue-Mergesort](https://www.sciencedirect.com/science/article/pii/002001909390088Q?via%3Dihub) 中以下段落: >We use the fact that the weight of a merge tree is equal to its external path length. The height h(f) of a node I in a tree is the distance of a path from 1 to the root. The external path length of a tree T’ is the sum E(T’) = $\sum\limits_{l\ a\ leaf\ of\ T'}{h(l)}$ 以及以下段落: >Thus a merge tree T describes an optimal merge-sort on n items if and only if T has minimum external path length $\sum\limits_{l\ a\ leaf}{h(l)}$. It is known that this occurs if and only if the following condition is satisfied: all of T’s leaves are located on its bottom two levels. 因此我們可以得知,只要證明 list_sort 的 merge tree 符合所有 leaf 都在最下面兩層這個條件,就可以證明它是 optimal merge sort 。 分析 list_sort 的演算法,可以得出以下兩點: 第一點:在合併的過程中,最多只有一個子串列不是二的冪(第二階段排最後的子串列)。 第二點:在合併的過程中,兩者差異不大於兩倍。 **證明合併過程中對所有長度 n 的子串列,其 merge tree 的所有 leaf 都在最下面兩層。** **證明過程:** 第一階段所有合併皆為二的冪,符合命題。 **用數學歸納法證明第二階段:** 最小端的子串列只可能是 (1,1) 或 (1,2),兩者合併都符合命題。 對第二階段過程中的任意合併,假設其兩個子串列都符合命題。 則合併的過程中,由第一點可知,其中一個子串列一定為二的冪,因此其 merge tree 為 full binary tree ,所有 leaf 都在最底層。另其長度為 $2^{k_1}$ ,則 merge tree 的高度為 $k_1$ 。 當另一個子串列如果也是二的冪,因為第二點中兩者差異不大於兩倍,因此其大小只可能是 $2^{k_1-1}$ 或 $2^{k_1+1}$ 。 merge tree 的高度 $k_2 = k_1\pm1$,符合命題。 當第二的子串列不為二的冪,那根據假設,此串列的 merge tree 所有 leaf 都在最下面兩層,而其中一層就是 $k_1$ ,否則會違反第二點兩者差異不大於兩倍的描述,因此也符合命題。 根據數學歸納法,對第二階段過程中的任意合併,其 merge tree 的所有 leaf 都在最下面兩層。所以可以得出最後一次合併所產生的 merge tree 也符合命題。 根據論文中所述,可得知 list_sort 中的演算法也是 optimal merge sort 。 ### 用效能分析工具比較 list_sort.c 及自己實作的排序 設計實驗並用效能分析工具比較兩者個差異,除了執行時間外,會特別關注 `list_sort.c` 特別優化的以下兩點: - cache miss rate - compare 次數 我先用 `ADD_COMMAND` 加入了 `kernel_sort` 這到命令,執行時會呼叫引入 `qtest.c` 的 `list_sort` 。 用以下命令檢測執行時間。我特別挑了 260000 以及 270000 這兩個數字,一個在 $2^{18}$(262144) 前,一個在 $2^{18}$ 後,剛好可以看出 queue 的長度對於 comparison 數的影響。關於這部份的分析及圖表在下方有更詳細的實驗。 ``` cmd> new cmd> it RAND 260000 cmd> time sort Delta time = 0.386 cmd> free cmd> new cmd> it RAND 270000 cmd> time sort Delta time = 0.448 cmd> free cmd> new cmd> it RAND 260000 cmd> time kernel_sort Delta time = 0.177 cmd> free cmd> new cmd> it RAND 270000 cmd> time kernel_sort Delta time = 0.188 cmd> free ``` 在 260000 的長度下,兩者差了 2.18 倍。 而在 270000 的長度下,則差了 2.38 倍。 ### comparison 次數 因為對於 George Spelvin 在其 commit message 中所提到的 $K$ 值特別感興趣,因此參閱了他所提供的第一篇論文:[Bottom-up Mergesort: A Detailed Analysis](https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.6.5260) 主要是用理論來分析 bottom up 以及 top down 兩種方法輸入的 sequence 為最佳,最差以及平均的順序下,輸入的順序對於兩者比較次數的影響。 這篇論文一開始就以以下數學式為例: $C_{w,td}(N) = N lg N + NA(lg N)+1$ 其中 $C_{w,td}$ 代表 top down 方法中最差情況下比較次數的函數。而 $A$ 則如下: $A(x) = 1 - \left\{x\right\} - 2^{1-\left\{x\right\}}$ 其中大括號 $\left\{ \right\}$ 是取小數的意思,$\left\{x \right\}$ 就是取 $x$ 的小數部份。 所以也寫成 $A(x) = A(\left\{x \right\})$ ,可以看到 $A$ 是一個跟輸入變數的小數有關的函數,也可說他是一個週期為一的週期函數。而不只是 $C_{w,td}(N)$ , $C_{b,td}(N)$ 其他如 $C_{a,td}(N)$, $C_{w,td}(N)$ ,$C_{b,bu}(N)$ , $C_{b,bu}(N)$ 以及 $C_{b,bu}(N)$ 都有類似的特性。 將所有情況總結成以下式子: $C(N) = NlgN - NK(lgN) + O(1)$ 其中 $K$ 擁有剛剛所提到週期函數的特性。 所以我在設計有關估計 $K$ 值的實驗時,只要讓輸入變數從一個整數跑到另一個整數,把過程中產生的函數值取平均,就可以得出 $K$ 的平均。 而 $lgN$ 從一個整數 $N_1$ 到另一個整數 $N_2$ ,最簡單的方法就是讓 $N_1$ 為二的次方,而讓 $N_2$ 為 $N_1\times 2-1$ 。 最後我設計出了一個實驗函式: ```c double average_K(int size, int kernel) { if (l_meta.l) q_free(l_meta.l); int end = size * 2; double sum_k = 0; for (int n = size; n < end; n++) { l_meta.l = q_new(); for (int j = 0; j < n; j++) { char randstr_buf[MAX_RANDSTR_LEN]; fill_rand_string(randstr_buf, sizeof(randstr_buf)); q_insert_tail(l_meta.l, randstr_buf); } if (kernel) list_sort(NULL, l_meta.l, my_cmp); else q_sort(l_meta.l); sum_k += log2(n) - (double) (cmp_count-1) / n; q_free(l_meta.l); } l_meta.l = NULL; sum_k /= size; return sum_k; } ``` 這個函式有兩個參數,第一個 `size` 就是剛剛所說的 $N_1$ ,呼叫的人必須確保該變數是二次方,否則會被內部的檢查擋掉。 第二個變數 `kernel` 則是用來切換我自己實作的 `q_sort` 以及 linux kernel 所用的 `list_sort` 。 我新增了一個變數 `cmp_count` 來記錄比較次數,當 `q_sort` 或 `list_sort` 被呼叫時,他們會先將此變數清零,然後每次比較就將他加一。 雖然 average case 的正確定義是所有可能排列的比較次數平均值,但要跑過所有可能排列的時間複雜度實在太高,因此在我的實驗中是隨機產生出一串 queue ,再對 $N$ 到 $2N-1$ 測出的 $K$ 值取平均。 以下是實驗結果: ``` cmd> average_k sort 512 1.027611 l = NULL cmd> average_k sort 1024 1.017292 l = NULL cmd> average_k sort 2048 1.013593 l = NULL cmd> average_k kernel_sort 512 1.208019 l = NULL cmd> average_k kernel_sort 1024 1.208229 l = NULL cmd> average_k kernel_sort 2048 1.208240 l = NULL ``` 可以看到 kernel 的 `list_sort` K 值都在 1.208 上下,而我自己實作的 `q_sort` 則在 1.01 到 1.03 之間,確實差了快要 0.2 。而 $K$ 又是 $N$ 的係數,這樣下來就差了 $0.2\times N$ 次的比較。 而對於 George Spelvin 中提到 1025 跟 1028 相比,1025 的表現會比較好,因為剩下的那一個平均到最常串列的二分之一就會被合併完,而 1028 剩下的 4 個則是要平均到最長串列的四分之三。我也有做類似的實驗,不過是用 1030 而不是 1028 。 我先用了 `add_param` 將剛剛所提到的 `cmp_count` 加入 option ,並在每次排序完記下它的值並算出 $K$ 值。 使用 `q_sort` 的情況下,隨機插入 1025 個值並排序,三次的 $K$ 值分別為 1.25 , 1.16 , 0.7 。 而長度為 1030 的 $K$ 值則為 0.49 , 0.5 , 0.43 。 確實如 George Spelvin 所述,長度為 1025 的 queue 的 $K$ 值有可能很差,但也有可能很好。但長度為 1030 的 queue 就幾乎都很差了。 在使用 `list_sort` 的情況下,長度為 1025 的三個 $K$ 值分別為 1.23 , 1.23 , 1.25 ,長度為 1030 的 $K$ 值則為 1.25 , 1.24 , 1.25 。強制 $2:1$ 的條件真的讓 $K$ 值增加且穩定很多。 將各個長度的 queue 算出的 $K$ 值用 gnuplot 繪製成折線圖,將長度從 2048 算到 4095 ,可以得到下圖: ![](https://i.imgur.com/cxOflxY.png) 其中 $x$ 軸為 $log_2N$,$y$ 軸為 $K$ 值。 在我的實驗以及 George Spelvin 的 commit message 中, $K$ 值前面是有加負號的,但在論文中類似的函數並沒有。為了與論文中的圖做比較,我另外繪製了一張 $y$ 軸為 $-K$ 的圖: ![](https://i.imgur.com/uRk81mW.png) 而以下是論文中 top down merge sort (下面的函數) 與 bottom up merge sort (上面的函數) 的 average case 比較圖: ![](https://i.imgur.com/jkLgusQ.png) 雖然實驗中沒有照 average case 的嚴格定義針對所有的排列都做統計,只是以同長度的隨機排序去做測試,但我所實作的 bottom up merge sort 整體趨勢還是與理論上的圖形相似。 從圖中可以看到,一般的 bottom up merge sort 中 $K$ 值隨著 $log_{2}N$ 的小數部份不同變化很大,而且可能很小。 但 kernel 中實作的 `list_sort` 中,強制每次合併相差不超過 $2:1$ 後,$K$ 值十分穩定,最差也只是在 1.2 附近徘徊。 最後附上一張從 512 算到 2045 的圖,可以看出 K 值的週期性。 ![](https://i.imgur.com/egPZm1S.png) ### 改良合併排序 在第三篇論文 [Queue-mergesort](https://www.sciencedirect.com/science/article/pii/002001909390088Q?via%3Dihub) 中可以看到以下這段: > On the other hand a simple modification makes bottom-up mergesort optimal. As in the standard bottom-up algorithm we make repeated passes over the lists, pairing them up, two by two. The modification is that, at the end of a pass - in the case that there are an odd number of lists - we do not leave the last list by itself but merge it with the new list that was just created when the second and third to last lists on that pass were merged. It is not difficult to show that all of the leaves of the merge-tree created are on its bottom two levels so this new mergesort is an optimal one. 我試著套用他所說的方法,將程式碼加入函式 `q_sort` 的以下片段: ```diff while ((*last) != head && next_list != head) { /* * Make each sorted list doubly circular list * to make use of function merge. */ (*last)->prev->next = (*last); next_list->prev->next = next_list; (*last) = merge((*last), next_list); + if(next_next_list!=head && next_next_list->prev->next == head){ + next_next_list->prev->next = next_next_list; + (*last) = merge((*last),next_next_list); + (*last)->prev->next = head; + break; + } /* * The result of function merge is a doubly circular list, * so make tail of list point it next to next sorted list. */ last = &((*last)->prev->next); *last = next_next_list; next_list = (*last)->prev->next; next_next_list = next_list->prev->next; } ``` 以下是改良後我的排序與 list_sort 的 $K$ 值比較圖: ![](https://i.imgur.com/FMOSEut.png) ### 用 valgrind cachegrind 分析 cache miss: 測試的過程中整個執行時期可能會用到 q_insert_head 等等跟排序無關的函式,為了能更深入聚焦在排序的 cache miss,我參考了 [bakudr18 的共筆](https://hackmd.io/@MR-9Qa9wQLWglSAUyd6UhQ/BkvLMLkeq?fbclid=IwAR0u24vhtJjsTn2-z-1cqdMWDuAVIcRFCVr7CEM7DUyQU0O9ML4s5-7MLvs),用 valgrind cachegrind 來更細節的分析各個函式的 cache miss 。 此實驗主要是執行排序效能測試的 trace-15-perf.cmd ```bash $ valgrind --tool=cachegrind ./qtest -f traces/trace-15-perf.cmd ``` 以及我新增的 trace-15-perf.cmd ,內容是將 trace-15-perf.cmd 中的 sort command 都改成 kernel_sort command (核心所用的 list_sort): ```bash $ valgrind --tool=cachegrind ./qtest -f traces/trace-18-perf.cmd ``` 以下是 trace-15-perf.cmd 的執行結果,因為數據太多,只節錄跟排序有關的部份: ``` -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 521,225,980 1,730 1,696 149,836,464 12,874,954 8,796,052 72,543,881 981,208 564,538 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 81,493,493 7 7 18,428,848 4,040,333 3,313,315 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-avx2.S:__strcmp_avx2 78,340,865 4 4 35,327,586 3,890,710 2,996,411 10,890,546 400 3 /home/wuyu/linux2022/lab0-c/queue.c:merge 56,091,504 3 3 14,079,648 9 3 5,279,868 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 36,959,076 2 2 14,079,648 0 0 3,519,912 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 31,887,778 1 1 10,415,930 0 0 21,471,848 94,144 65,263 /home/wuyu/linux2022/lab0-c/list.h:merge 31,602,778 45 45 5,807,513 117,899 14,137 5,327,283 199,998 199,988 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 25,930,469 16 16 7,363,067 29,997 16,248 3,201,704 13 8 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_free 20,647,168 5 4 2,081,071 3 2 3,521,024 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:fill_rand_string 17,284,143 8 8 2,560,594 1,906,637 627,926 640,546 24 12 /home/wuyu/linux2022/lab0-c/qtest.c:show_queue 14,736,552 7 7 3,841,852 55 26 1,490,789 2 0 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:malloc 14,419,110 60 58 7,209,516 283 136 19 1 1 ???:??? 13,439,748 4 4 3,839,928 320,260 231,806 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-sse42.S:__strcasecmp_l_avx 12,800,117 4 4 3,200,030 6 4 3,840,033 83,377 29,663 /home/wuyu/linux2022/lab0-c/harness.c:test_malloc 12,160,190 7 7 3,840,036 204,064 109,204 2,240,018 360,651 140,791 /home/wuyu/linux2022/lab0-c/harness.c:test_free 8,895,058 5 5 640,010 0 0 1,280,032 1 1 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/memset-vec-unaligned-erms.S:__memset_avx2_unaligned_erms 8,321,288 3 3 2,880,588 1,359,523 944,791 1,920,012 81,063 42,639 /home/wuyu/linux2022/lab0-c/queue.c:q_sort ``` 以下是 trace-18-perf.cmd 的執行結果: ``` -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 499,032,158 1,730 1,696 124,592,177 9,062,954 2,743,449 54,042,530 1,228,965 728,496 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 81,028,035 7 7 18,310,676 2,604,640 618,802 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-avx2.S:__strcmp_avx2 56,079,744 3 3 14,076,696 9 3 5,278,761 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 51,072,039 2 2 6,643,617 509,045 58,414 9,447,378 6 2 /home/wuyu/linux2022/lab0-c/qtest.c:merge 36,951,327 2 2 14,076,696 0 0 3,519,174 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 32,955,606 2 2 14,646,936 2,201,729 335,476 3,661,734 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:my_cmp 31,602,778 45 45 5,807,513 118,032 14,163 5,327,283 199,998 199,988 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 25,930,469 16 16 7,363,067 29,964 16,211 3,201,704 13 8 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_free 20,637,179 5 4 2,080,013 3 2 3,519,597 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:fill_rand_string 17,284,143 8 8 2,560,594 1,906,820 630,148 640,546 24 12 /home/wuyu/linux2022/lab0-c/qtest.c:show_queue 14,736,600 7 7 3,841,852 55 22 1,490,789 2 0 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:malloc 14,371,084 58 56 7,185,503 284 137 19 1 1 ???:??? 13,439,748 4 4 3,839,928 320,270 269,812 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-sse42.S:__strcasecmp_l_avx 12,872,510 6 6 2,239,856 162,950 108,048 2,675,566 423,420 271,718 /home/wuyu/linux2022/lab0-c/qtest.c:list_sort ``` | q_sort | DLmr | | ------:| -----------:| | strcmp | 3,545,121 | | merge | 2,996,411 | | q_sort | 944,791 | | total | 7,486,323 | | list_sort | DLmr | | ---------:| -----------:| | strcmp | 888,614 | | my_cmp | 335,476 | | merge | 58,414 | | list_sort | 108,048 | | total | 1,390,552 | 註: strcmp 數據中每個排序各有兩條,這部份我還不知道兩者有何區別,此表是將每個排序中分開的兩條相加。 從 DLmr (LL cache data read misses) 的數據中可以看到,`q_sort` 總數為 8,796,052 ,`list_sort` 則是 2,743,449 ,兩者差了好幾倍。不過這包括了其他函式,因此我們再細看跟排序有關的幾個。 讓我意外的是,原本預期 `merge` 的 cache misses 是最高,但竟然冒出一個 `strcmp` ,在 `q_sort` 中比 `merge` 還要高。但想一想發現並不奇怪,因為在 `merge `中呼叫 `strcmp` 所傳入的字串只是該字串的位址,真正要讀取到字串的記憶體內容已經到 `strcmp` 中了,所以 cache misses 才會很高。剩下的還有 `q_sort` 函式本體, 加總為 7,486,323 次。 `list_sort` 也有類似的現象, 最高的是 `strcmp` ,而因為實際讀取 `element_t` 是在 `my_cmp` ——即自訂的比較函式——裡面,所以再來就是 `my_cmp` 。接著是 `list_sort` 本體, `merge` 反而最低,加總為 1,390,552 。 因此,與函式相關的 cache misses 就差了快六倍。可見雖然都是 bottom up 的合併排序,但深度優先或廣度優先對於 cache misses 確實差很多。 為了驗證我前面對於 `strcmp` 數量這麼高的解釋,並想出辦法改善,我想起之前看到 [komark06](https://hackmd.io/@komark06/linux2022-lab0#q_insert_head-%E5%92%8C-q_insert_tail-%E5%AF%A6%E4%BD%9C)的共筆,他曾經在配置 `element_t` 以及其中的字串時,將兩者的記憶體只用一次 `malloc` 配置,以達到 cache friendly 的效果。 如果使用此方法,我應該就有很大的機會可以在 `merge` 中使用讀取 `element_t` 的記憶體時,利用 locality 將字串也一同放進 cache 中,避免 `strcmp` cache misses 。 以下是採用該方法的數據: q_sort: ``` -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 453,977,831 1,734 1,696 132,540,488 9,782,921 3,884,637 62,098,816 808,154 332,413 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 83,513,787 7 7 18,430,238 1,573,618 545,893 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-avx2.S:__strcmp_avx2 78,345,619 4 4 35,330,139 3,754,560 1,723,334 10,891,102 271 2 /home/wuyu/linux2022/lab0-c/queue.c:merge 50,997,184 3 3 12,800,912 9 0 4,800,342 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 33,602,394 2 2 12,800,912 0 0 3,200,228 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 31,890,280 0 0 10,416,764 0 0 21,473,516 94,748 39,202 /home/wuyu/linux2022/lab0-c/list.h:merge 20,643,906 5 3 2,080,447 3 1 3,520,558 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:fill_rand_string 17,284,143 8 8 2,560,594 1,905,942 527,696 640,546 24 8 /home/wuyu/linux2022/lab0-c/qtest.c:show_queue 15,807,359 43 43 2,904,908 60,879 1 2,664,700 100,005 99,993 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 15,232,198 7 7 3,839,928 80,166 35,239 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-sse42.S:__strcasecmp_l_avx 12,970,348 16 16 3,683,034 10,907 2,809 1,601,696 13 5 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_free 12,819,982 61 58 6,409,952 283 76 19 1 1 ???:??? 8,321,288 5 5 2,880,588 1,358,357 707,691 1,920,012 80,612 40,023 /home/wuyu/linux2022/lab0-c/queue.c:q_sort ``` list_sort: ``` -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 431,788,188 1,734 1,695 107,304,825 6,709,798 1,305,855 43,600,561 1,054,654 439,309 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 82,999,961 7 7 18,310,096 1,471,603 128,413 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-avx2.S:__strcmp_avx2 51,070,461 2 2 6,643,509 1,293 2 9,447,162 6 0 /home/wuyu/linux2022/lab0-c/qtest.c:merge 51,008,368 3 3 12,803,720 9 0 4,801,395 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 33,609,765 2 2 12,803,720 0 0 3,200,930 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 32,954,562 2 2 14,646,472 2,032,733 139,050 3,661,618 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:my_cmp 20,643,122 5 3 2,079,984 3 1 3,520,446 0 0 /home/wuyu/linux2022/lab0-c/qtest.c:fill_rand_string 17,284,143 8 8 2,560,594 1,905,959 529,617 640,546 24 8 /home/wuyu/linux2022/lab0-c/qtest.c:show_queue 15,807,485 43 43 2,904,926 60,794 1 2,664,720 100,005 99,993 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 15,242,754 7 7 3,839,928 80,300 49,657 0 0 0 /build/glibc-eX1tMB/glibc-2.31/string/../sysdeps/x86_64/multiarch/strcmp-sse42.S:__strcasecmp_l_avx 12,970,389 16 16 3,683,047 11,009 2,781 1,601,708 13 5 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_free 12,872,463 6 6 2,239,856 107,497 53,828 2,675,550 422,320 186,001 /home/wuyu/linux2022/lab0-c/qtest.c:list_sort ``` | q_sort | 分開 malloc | 一次 malloc | | ------:| -----------:| -----------:| | strcmp | 3,545,121 | 581,132 | | merge | 2,996,411 | 1,723,334 | | q_sort | 944,791 | 707,691 | | total | 7,486,323 | 3,012,157 | | list_sort | 分開 malloc | 一次 malloc | | ------:| -----------:| -----------:| | strcmp | 888,614 | 178,070 | | my_cmp | 335,476 | 139,050 | | merge | 58,414 | 2 | | list_sort | 108,048 | 53,828 | | total | 1,390,552 | 370,950 | 可以看到只進行一次 `malloc` 將 `element_t` 以及字串配置在一起的話,不論是那一種排序,對 DLmr 都有很大的改善。細看各個函式的 miss 次數,也可以看到 `strcmp` 佔所有 DLmr 的比例也大幅下降,因為在讀到字串之前就會先讀到 `element_t` ,因此遇到 element_t 的 `merge` (`q_sort`) 以及 `my_cmp` (`list_sort`) 佔所有 DLmr 的比例也相對提高。 ### 在 qtest 提供新的命令 shuffle 在 qtest.c 中加入以下程式碼: ```c void q_shuffle(struct list_head *head) { if (!head || list_empty(head)) return; srand(time(NULL)); int n = q_size(head); struct list_head *first = head->next; list_del_init(head); for (int i = 0; i < n - 1; i++) { int rnd = rand() % (n - i); int dir = rnd > (n - i) / 2 ? 0 : 1; rnd = dir ? n - i - rnd : rnd; for (int j = 0; j < rnd; j++) { first = dir ? first->prev : first->next; } list_move(first->next, head); } list_move(first, head); } static bool do_shuffle(int argc, char *argv[]) { if (argc != 1) { report(1, "%s takes no arguments", argv[0]); return false; } if (!l_meta.l) report(3, "Warning: Try to access null queue"); error_check(); set_noallocate_mode(true); if (exception_setup(true)) q_shuffle(l_meta.l); exception_cancel(); set_noallocate_mode(false); show_queue(3); return !error_check(); } ``` 本實作時間複雜度為 $O(n^2)$ ,希望可以找到更好的方法改善。 在 `console_init()` 中加入這一行: ```c static void console_init() { /* * other code ... */ ADD_COMMAND(shuffle, " | Shuffle the queue"); /* * other code ... */ } ``` ### RIO 套件的使用 在 `lab0-c` 專案中,主要用到 rio 套件的地方是 `console.c` 中的 `readline` 這個函式 。 首先要分析 rio 套件的目的以及優點。 一般讀檔案時,是像下圖: ```graphviz digraph "Doubly Linked List" { rankdir=UD; node [shape=record]; a [label="kernel", width=5,height=1] c [label="usr_buffer", width=3] a:s -> c:c [arrowhead=vee, arrowtail=vee, dir=both, tailclip=false,label=" read(system call)",color=red]; } ``` 直接透過 `read` 等 system call (trap) 將檔案的內容讀進 user buffer 中。但這會有個問題,如果今天要執行一連串單位小數量多的讀取動作,如 `console.c` 中的 `readline` 一個一個字元去檢查是否為 `'\n'` ,則勢必需要呼叫很多次 `read` 。但是 system call 涉及了 kernel/user space 之間的切換,因此大量的 system call 非常沒效率。 於是就有了 rio 套件,透過引入一個中間的 rio buffer ,只用一次 system call 將大量的資料讀進這個 buffer 中,當 usr_buffer 要讀取檔案時,只需要從 rio buffer 將資料搬運到 user buffer 就好。以這種方式,就可以避免 readline 的時候,重複使用 system call 。 ```graphviz digraph "Doubly Linked List" { rankdir=UD; node [shape=record]; a [label="kernel", width=5,height=1] c [label="rio_buffer", width=3] d [label="usr_buffer", width=3] a:s -> c:c [arrowhead=vee, arrowtail=vee, dir=both, tailclip=false,label=" read (system call)",color=red]; c:s -> d:c [arrowhead=vee, arrowtail=vee, dir=both, tailclip=false]; } ``` #### rio 套件現有缺陷 比較 `readline` 與 cs:app 中的類似函式 `rio_readlineb` 會發現,`readline` 並沒有任何參數,而 `rio_readlineb` 則有許多參數: `rio_readlineb(rio_t *rp, char *usrbuf, size_t maxline)` 。 其中 `rp` 是指向 `rio_t` 的指標,這個結構如下: ```c typedef struct RIO_ELE rio_t, *rio_ptr; struct RIO_ELE { int fd; /* File descriptor */ int cnt; /* Unread bytes in internal buffer */ char *bufptr; /* Next unread byte in internal buffer */ char buf[RIO_BUFSIZE]; /* Internal buffer */ rio_ptr prev; /* Next element in stack */ }; ``` 它包含了目標檔案的 file descriptor ,一個 buffer ,還有兩個變數用來記目前 buffer 內還沒讀的 byte 以及目前讀到的地方。`prev` 是 `lab0-c` 專案中特別加入的,為了可以讀取複數個檔案,用 singly linked list 的方式實作一個簡單的 stack ,讓程式可以 push pop 不同的檔案。 另一個參數 `userbuf` 是呼叫 `rio_readlineb` 要自己提供的 buffer 。 最後一個參數 `maxline` 則是讀取的上限。 `readline` 沒有接收任何 `rio_t` 以及 buffer 相關的參數,它要把資料放在哪裡呢? 答案是這兩個變數: `linebuf` 以及 `buf_stack` 。 他們是宣告在 `console.c` 中的 static global variable 。 ```c static rio_ptr buf_stack; static char linebuf[RIO_BUFSIZE]; ``` 這樣的實作方式有幾個缺點: 第一個是這個 function 並沒有 reentrant 的特性。 當一個 thread 正在使用 `readline`,在用完之前被中斷(如被作業系統切換或被 signal handler 切換成別的程式,甚至是自己)並在繼續執行之前有其他 thread (或是自己) 呼叫的 `readline` ,就可能會產生錯誤 。 考慮以下程式: ```c int main() { signal(SIGALRM,sigalrmhandler); /* other code */ readline(); } void sigalrmhandler() { char *buf = readline(); printf("%s\n",buf); } ``` 如果在 `readline` 執行到一半時,程式接到了 `SIGALRM` 這個 signal ,那他就會跳到 `sigalrmhandler` 執行另一個 `readline` ,最後回到一開始的 `readline` 時,因為 `linebuf` 中的內容可能已經被蓋掉,而且 `buf_stack` 中的資料也可能被讀走,因此結果就可能有錯。 如果是用課本給的 `readlineb` 版本,因為 buffer 都是呼叫者所提供的,所以不會衝突到,可以安全的讀寫也能任意被打斷。 第二點是如果執行過程中要讀取複數個檔案,而且是交錯讀取不是一次讀取,如檔案 A 讀一行檔案 B 讀一行再換檔案 A 讀一行,則必須不斷 push 、 pop file ,而且還要記下 pop 出來的 file 。 如過是用課本給的 `readlineb` 版本,傳入的參數本身就會附上要讀寫的檔案以及相對應的 buffer ,所以也不用頻繁的 pop 、 push file 。 以上兩點在目前的 `lab0-c` 中還不太會發生。