# 研讀 Linux 核心原始程式碼的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 筆記 contributed by < [Appmedia06](https://github.com/Appmedia06) > 這篇主要是研讀 ```list_sort.c``` 的筆記,首先先嘗試理解這個檔案寫了什麼,可以看到裡面總共有3個函式,分別是 ```merge``` , ```merge_final``` , ```list_sort``` ,簡單看過一遍後得知 ```list_sort``` 是使用 **merge sort** 實作,因此呼叫到 ```merge``` 和 ```merge_final``` ,以下開始詳細討論個函式功能。 ## 研讀筆記 ### ```__attribute__((nonnull()))``` 在每個函式前面都有這一行程式,它是由 GNU C 編譯器所提供的一個特殊屬性,所謂 ```nonnull``` 是在告訴編譯器函式的第幾個參數不應該為空指標,否則會產生 **undefined behavior** 。 以 ```list_sort``` 為例 ```cpp __attribute__((nonnull(2,3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) ``` 第二個參數 ```head``` 和第三個參數 ```cmp``` 不能為空指標。 --- ### ```merge``` 將兩個以排序的鍊結串列 ```a``` 和 ```b``` 合併成一個已排序鍊結串列。 這和我在 lab0 所實做的 ```merge_Two_list``` 雷同,使用**間接指標**來合併鍊結串列 * 演算方法 1. 宣告一個新的鍊結串列 ```head``` ,當作合併的鍊結串列 2. 宣告一個間接指標 ```tail``` ,指向 ```head``` 3. 無窮迴圈中,比較 ```a``` 跟 ```b``` 的大小,將小的指標透過 ```tail``` 放入 ```head``` 中 4. 當有一方使用完後,將另一方的剩餘部份放入 ```head``` 結束迴圈,回傳 ```head``` * 完整程式碼 ```cpp static struct list_head *merge(void *priv, list_cmp_func_t cmp, struct list_head *a, struct list_head *b) { struct list_head *head, **tail = &head; for (;;) { /* if equal, take 'a' -- important for sort stability */ if (cmp(priv, a, b) <= 0) { *tail = a; tail = &a->next; a = a->next; if (!a) { *tail = b; break; } } else { *tail = b; tail = &b->next; b = b->next; if (!b) { *tail = a; break; } } } return head; } ``` 註解說到,若 ```a``` 和 ```b``` 的值是相同的,則取 ```a``` 放入,其目的是為了要維持 **sort stability** 。 :::info sort stability: 在排序演算法中,穩定性(stability)指的是如果兩個元素在排序前的順序相等,那麼在排序後它們的相對順序仍然保持不變。 換句話說,穩定性表示排序演算法在排序時會尊重相等元素的原始順序。 ::: --- ### ```merge_final``` 和 ```merge``` 的操作類似,主要差別為因為這是在排序的最後才會呼叫到,因此需要實現以下兩件事 1. 當有一方已經結束合併後,去檢查剩餘一方是否出現高度不平衡的狀態(例如:已排序) ```cpp /* Finish linking remainder of list b on to tail */ tail->next = b; do { /* * If the merge is highly unbalanced (e.g. the input is * already sorted), this loop may run many iterations. * Continue callbacks to the client even though no * element comparison is needed, so the client's cmp() * routine can invoke cond_resched() periodically. */ if (unlikely(!++count)) cmp(priv, b, b); b->prev = tail; tail = b; b = b->next; } while (b); ``` 註解說明了當出現高度不平衡時,這個 do_while 迴圈會被呼叫很多次,但 ```b``` 這個鍊結串列不是被排序好了嗎,為什麼不直接接在 ```head``` 的後面就好啦。這邊就要提到 ```unlikely``` 函式。 在 linux kernel 的 /include/linux/compiler.h 中定義了兩個 macro ```cpp # define likely(x) __builtin_expect(!!(x), 1) # define unlikely(x) __builtin_expect(!!(x), 0) ``` * ```__built_expect()``` 是GCC的內建function,用來將branch的相關資訊提供給compiler,告訴compiler設計者期望的比較結果,以便對程式碼改變分支順序來進行優化。 * 使用2次的 NOT operation 使 x 必為 0 或 1 所以這樣做有什麼意義以及好處呢? 如上述所題,如果發生高度不平衡的情況, ```b``` 就會變得非常的長,因此我們就會在迴圈內一直呼叫到 ```unlikely``` ,我們知道在取得資料時,會先從 **Cache** 找,若沒找到(cache miss),就會從 Memory 搬取一個 block 的資料,因為 patial Locality 的特性,程式碼相近的區域可能會一起被抓取進 cache ,進而大大提昇 cache hit 的機率。而 ```unlikely``` 函式就是使不可能的區域往後移,也就使其變成一個對 cache 友善的程式。 2. 因為是最後要合併的鍊結串列,需將 ```prev``` 串上 ```cpp /* And the final links to make a circular doubly-linked list */ tail->next = head; head->prev = tail; ``` --- ### ```list_sort``` :::spoiler 完整程式碼 ```cpp __attribute__((nonnull(2,3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) { struct list_head *list = head->next, *pending = NULL; size_t count = 0; /* Count of pending */ if (list == head->prev) /* Zero or one elements */ return; /* Convert to a null-terminated singly-linked list. */ head->prev->next = NULL; /* * Data structure invariants: * - All lists are singly linked and null-terminated; prev * pointers are not maintained. * - pending is a prev-linked "list of lists" of sorted * sublists awaiting further merging. * - Each of the sorted sublists is power-of-two in size. * - Sublists are sorted by size and age, smallest & newest at front. * - There are zero to two sublists of each size. * - A pair of pending sublists are merged as soon as the number * of following pending elements equals their size (i.e. * each time count reaches an odd multiple of that size). * That ensures each later final merge will be at worst 2:1. * - Each round consists of: * - Merging the two sublists selected by the highest bit * which flips when count is incremented, and * - Adding an element from the input as a size-1 sublist. */ do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list); /* End of input; merge together all the pending lists. */ list = pending; pending = pending->prev; for (;;) { struct list_head *next = pending->prev; if (!next) break; list = merge(priv, cmp, pending, list); pending = next; } /* The final merge, rebuilding prev links */ merge_final(priv, cmp, head, pending, list); } EXPORT_SYMBOL(list_sort); ``` ::: 接下來終於來討論 Linux 對於鍊結串列排序的設計,在看完整程式碼前先來看註解。註解不但可以讓我們更快進入狀況,更幫助我們更加理解設計者本身的想法。 ``` * list_sort - sort a list * @priv: private data, opaque to list_sort(), passed to @cmp * @head: the list to sort * @cmp: the elements comparison function ``` * ```priv``` : 為一個指標,傳遞 ```cmp``` 函式需要的參數 * ```head``` : 為要被排序的鍊結串列 * ```cmp``` : 需要自定義的比較 function pointer ``` * The comparison function @cmp must return > 0 if @a should sort after * @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should * sort before @b *or* their original order should be preserved. It is * always called with the element that came first in the input in @a, * and list_sort is a stable sort, so it is not necessary to distinguish * the @a < @b and @a == @b cases. * * This is compatible with two styles of @cmp function: * - The traditional style which returns <0 / =0 / >0, or * - Returning a boolean 0/1. * The latter offers a chance to save a few cycles in the comparison * (which is used by e.g. plug_ctx_cmp() in block/blk-mq.c). * * A good way to write a multi-word comparison is:: * * if (a->high != b->high) * return a->high > b->high; * if (a->middle != b->middle) * return a->middle > b->middle; * return a->low > b->low; ``` 這邊說明了 ```cmp``` 比較函式的規範,如果 `a` 應該在 `b` 之後排序,則比較函數應傳回大於 0 的值;如果 `a` 應該在 `b` 之前排序或它們原始的順序應該保留,則比較函數應傳回小於等於 0 的值。 ``` * This mergesort is as eager as possible while always performing at least * 2:1 balanced merges. Given two pending sublists of size 2^k, they are * merged to a size-2^(k+1) list as soon as we have 2^k following elements. * * Thus, it will avoid cache thrashing as long as 3*2^k elements can * fit into the cache. Not quite as good as a fully-eager bottom-up * mergesort, but it does use 0.2*n fewer comparisons, so is faster in * the common case that everything fits into L1. ``` 這邊說明了這個實作 merge sort 最重要的部份,也就是**如何合併**,我們一開始學習的 merge sort 都是 **top down** 的作法,這樣的作法是先做 partition 再做 merge ,而在做 partition 時會有大量資料在 cache 上移進移出,進而導致發生 Cache Thrashing 。 :::info Cache Thrashing: 是指在電腦系統中頻繁地發生**快取未命中**和**刷新**的現象,導致系統效能急劇下降的情況。 它通常發生在快取容量不足以容納正在存取的資料或指令時。 ::: 而 Linux kernel 的開發者如何避免這問題呢,他們使用了 **bottom up** 的方式實做,具體來說就是始終保持兩個要合併的串列的大小比例為 $2:1$ ,也就是說大的串列是小的串列的兩倍,而合併方式則是如果有兩個大小 $2^{k}$ 的串列來時並不會馬上合併,而是等到第三個大小為 $2^{k}$ 的串列進來才選擇合併成 $2^{k+1}$ 和 $2^{k}$ 的量個串列,這樣的大小比例就是前面說的 $2:1$ 了。 這樣做的原因就是因為 $3\times 2^{k}$ 大小的資料剛好可以放入 L1 Cache 裡,使其很好的避免掉了 Cache Thrashing 的問題。 ``` * The merging is controlled by "count", the number of elements in the * pending lists. This is beautifully simple code, but rather subtle. * * Each time we increment "count", we set one bit (bit k) and clear * bits k-1 .. 0. Each time this happens (except the very first time * for each bit, when count increments to 2^k), we merge two lists of * size 2^k into one list of size 2^(k+1). * * This merge happens exactly when the count reaches an odd multiple of * 2^k, which is when we have 2^k elements pending in smaller lists, * so it's safe to merge away two lists of size 2^k. * * After this happens twice, we have created two lists of size 2^(k+1), * which will be merged into a list of size 2^(k+2) before we create * a third list of size 2^(k+1), so there are never more than two pending. * * The number of pending lists of size 2^k is determined by the * state of bit k of "count" plus two extra pieces of information: * * - The state of bit k-1 (when k == 0, consider bit -1 always set), and * - Whether the higher-order bits are zero or non-zero (i.e. * is count >= 2^(k+1)). ``` 講完了如何合併後,就來討論**何時合併**,利用一個計數器 `count` 來做判斷: * 當 `count + 1` 後為 $2^{k}$ 就不合併 * 當 `count + 1` 後不為 $2^{k}$ 就合併 我們透過程式碼看實際如何操作 ```cpp= do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list); ``` 先將上面 do_while 迴圈分成三大部份 1. 第6行:當 `count` 為奇數時執行 2. 第9行:當 `count+1` 不為 $2^{k}$ 時合併 3. 第19行:移動一個節點放入 `pending` 以下以這三點繪製流程圖,假設有一個鍊結串列有四個節點[4, 3, 2, 1]: :::success 流程圖提示 * 以下流程圖的每個節點的 `prev` 和 `next` 若是沒有箭頭,代表指向 `NULL` * 紅色箭頭代表這個步驟做的事情 * 若要刪除某個節點的 `prev` 或 `next` 的箭頭,不要直接刪掉,會導致方塊上下不整齊,可以利用在後面添加 `[color="white"]` 使其變白色 ::: * 進入 ```list_sort``` 後的初始動作 ```cpp __attribute__((nonnull(2,3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) { struct list_head *list = head->next, *pending = NULL; size_t count = 0; /* Count of pending */ if (list == head->prev) /* Zero or one elements */ return; /* Convert to a null-terminated singly-linked list. */ head->prev->next = NULL; ``` ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] head:right:e -> node4:w node4:left:w -> head:e node4:right:e -> node3:w node3:left:w -> node4:e node3:right:e -> node2:w node2:left:w -> node3:e node2:right:e -> node1:w node1:left:w -> node2:e list -> node4:n pending -> NULL:n node1:right:e -> NULL } ``` * count = 0 ,第一次進入 do_while 迴圈 1. 第6行: count 為偶數,不執行 2. 第9行: $count + 1 = 1$ 為 $2^{k}$ ,不合併 3. 第19行: 必執行 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e node3:right:e -> node2:w node2:left:w -> node3:e node2:right:e -> node1:w node1:left:w -> node2:e list -> node3:n[color="red"] pending -> node4:n[color="red"] tail -> pending:n[color="red"] node1:right:e -> NULL } ``` * count = 1 ,第二次進入 do_while 迴圈 1. 第6行: count 為奇數,執行 2. 第9行: $count + 1 = 2$ 為 $2^{k}$ ,不合併 3. 第19行: 必執行 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e node3:right:e -> node2:w[color="white"] node2:left:w -> node3:e node2:right:e -> node1:w node1:left:w -> node2:e list -> node2:n[color="red"] pending -> node3:n[color="red"] tail -> node4:left:s[color="red"] node1:right:e -> NULL } ``` * count = 2 ,第三次進入 do_while 迴圈 1. 第6行: count 為偶數,不執行 2. 第9行: $count + 1 = 3$ 不為 $2^{k}$ ,合併 3. 第19行: 必執行 合併前,`a` 和 `b` 為要合併的部份 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] a [label="a", style="normal", color="white"] b [label="b", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e node3:right:e -> node2:w[color="white"] node2:left:w -> node3:e node2:right:e -> node1:w node1:left:w -> node2:e list -> node2:n pending -> node3:n tail -> pending:n[color="red"] node1:right:e -> NULL a -> node3[color="red"] b -> node4:n[color="red"] } ``` 合併後 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] a [label="a", style="normal", color="white"] b [label="b", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e[color="white"] node3:right:e -> node2:w[color="white"] node3:right:e -> node4:e[color="red"] node2:left:w -> node3:e node2:right:e -> node1:w node1:left:w -> node2:e list -> node2:n pending -> node3:n tail -> pending:n node1:right:e -> NULL a -> node3 b -> node4:n } ``` 執行19行後 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e[color="white"] node3:right:e -> node2:w[color="white"] node3:right:e -> node4:e node2:left:w -> node3:e node2:right:e -> node1:w[color="white"] node1:left:w -> node2:e list -> node1:n[color="red"] pending -> node2:n[color="red"] tail -> pending:n node1:right:e -> NULL } ``` * count = 3 ,第四次進入 do_while 迴圈 1. 第6行: count 為奇數,執行 2. 第9行: $count + 1 = 4$ 為 $2^{k}$ ,不合併 3. 第19行: 必執行 ```graphviz digraph ele_list { rankdir=LR node[shape=record] head [label="head|{<left>prev|<right>next}"] node4 [label="4|{<left>prev|<right>next}"] node3 [label="3|{<left>prev|<right>next}"] node2 [label="2|{<left>prev|<right>next}"] node1 [label="1|{<left>prev|<right>next}"] list [label="list", style="normal", color="white"] pending [label="pending", style="normal", color="white"] tail [label="tail", style="normal", color="white"] NULL [label="NULL", style="normal", color="white"] head:right:e -> node4:w node4:right:e -> node3:w[color="white"] node3:left:w -> node4:e[color="white"] node3:right:e -> node2:w[color="white"] node3:right:e -> node4:e node2:left:w -> node3:e node2:right:e -> node1:w[color="white"] node1:left:w -> node2:e list -> NULL:n[color="red"] pending -> node1:n[color="red"] tail -> node2:left:s[color="red"] node1:right:e -> NULL } ``` * count = 4 ,因為 `list` 為 NULL ,離開 do_while 迴圈 在上面的流程可以看到,我們已經將所有節點從 input list 加到 pending list 之中,因此要將 pending list 合併。 ```cpp /* End of input; merge together all the pending lists. */ list = pending; pending = pending->prev; for (;;) { struct list_head *next = pending->prev; if (!next) break; list = merge(priv, cmp, pending, list); pending = next; } ``` 最後,合併最後的 pending list 之後,將鍊結串列的頭尾接回來。 ```cpp /* The final merge, rebuilding prev links */ merge_final(priv, cmp, head, pending, list); ```