owned this note
owned this note
Published
Linked with GitHub
# 研讀 Linux 核心原始程式碼的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 筆記
contributed by < [Appmedia06](https://github.com/Appmedia06) >
這篇主要是研讀 ```list_sort.c``` 的筆記,首先先嘗試理解這個檔案寫了什麼,可以看到裡面總共有3個函式,分別是 ```merge``` , ```merge_final``` , ```list_sort``` ,簡單看過一遍後得知 ```list_sort``` 是使用 **merge sort** 實作,因此呼叫到 ```merge``` 和 ```merge_final``` ,以下開始詳細討論個函式功能。
## 研讀筆記
### ```__attribute__((nonnull()))```
在每個函式前面都有這一行程式,它是由 GNU C 編譯器所提供的一個特殊屬性,所謂 ```nonnull``` 是在告訴編譯器函式的第幾個參數不應該為空指標,否則會產生 **undefined behavior** 。
以 ```list_sort``` 為例
```cpp
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
```
第二個參數 ```head``` 和第三個參數 ```cmp``` 不能為空指標。
---
### ```merge```
將兩個以排序的鍊結串列 ```a``` 和 ```b``` 合併成一個已排序鍊結串列。
這和我在 lab0 所實做的 ```merge_Two_list``` 雷同,使用**間接指標**來合併鍊結串列
* 演算方法
1. 宣告一個新的鍊結串列 ```head``` ,當作合併的鍊結串列
2. 宣告一個間接指標 ```tail``` ,指向 ```head```
3. 無窮迴圈中,比較 ```a``` 跟 ```b``` 的大小,將小的指標透過 ```tail``` 放入 ```head``` 中
4. 當有一方使用完後,將另一方的剩餘部份放入 ```head``` 結束迴圈,回傳 ```head```
* 完整程式碼
```cpp
static struct list_head *merge(void *priv, list_cmp_func_t cmp,
struct list_head *a, struct list_head *b)
{
struct list_head *head, **tail = &head;
for (;;) {
/* if equal, take 'a' -- important for sort stability */
if (cmp(priv, a, b) <= 0) {
*tail = a;
tail = &a->next;
a = a->next;
if (!a) {
*tail = b;
break;
}
} else {
*tail = b;
tail = &b->next;
b = b->next;
if (!b) {
*tail = a;
break;
}
}
}
return head;
}
```
註解說到,若 ```a``` 和 ```b``` 的值是相同的,則取 ```a``` 放入,其目的是為了要維持 **sort stability** 。
:::info
sort stability: 在排序演算法中,穩定性(stability)指的是如果兩個元素在排序前的順序相等,那麼在排序後它們的相對順序仍然保持不變。 換句話說,穩定性表示排序演算法在排序時會尊重相等元素的原始順序。
:::
---
### ```merge_final```
和 ```merge``` 的操作類似,主要差別為因為這是在排序的最後才會呼叫到,因此需要實現以下兩件事
1. 當有一方已經結束合併後,去檢查剩餘一方是否出現高度不平衡的狀態(例如:已排序)
```cpp
/* Finish linking remainder of list b on to tail */
tail->next = b;
do {
/*
* If the merge is highly unbalanced (e.g. the input is
* already sorted), this loop may run many iterations.
* Continue callbacks to the client even though no
* element comparison is needed, so the client's cmp()
* routine can invoke cond_resched() periodically.
*/
if (unlikely(!++count))
cmp(priv, b, b);
b->prev = tail;
tail = b;
b = b->next;
} while (b);
```
註解說明了當出現高度不平衡時,這個 do_while 迴圈會被呼叫很多次,但 ```b``` 這個鍊結串列不是被排序好了嗎,為什麼不直接接在 ```head``` 的後面就好啦。這邊就要提到 ```unlikely``` 函式。
在 linux kernel 的 /include/linux/compiler.h 中定義了兩個 macro
```cpp
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
```
* ```__built_expect()``` 是GCC的內建function,用來將branch的相關資訊提供給compiler,告訴compiler設計者期望的比較結果,以便對程式碼改變分支順序來進行優化。
* 使用2次的 NOT operation 使 x 必為 0 或 1
所以這樣做有什麼意義以及好處呢?
如上述所題,如果發生高度不平衡的情況, ```b``` 就會變得非常的長,因此我們就會在迴圈內一直呼叫到 ```unlikely``` ,我們知道在取得資料時,會先從 **Cache** 找,若沒找到(cache miss),就會從 Memory 搬取一個 block 的資料,因為 patial Locality 的特性,程式碼相近的區域可能會一起被抓取進 cache ,進而大大提昇 cache hit 的機率。而 ```unlikely``` 函式就是使不可能的區域往後移,也就使其變成一個對 cache 友善的程式。
2. 因為是最後要合併的鍊結串列,需將 ```prev``` 串上
```cpp
/* And the final links to make a circular doubly-linked list */
tail->next = head;
head->prev = tail;
```
---
### ```list_sort```
:::spoiler 完整程式碼
```cpp
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
struct list_head *list = head->next, *pending = NULL;
size_t count = 0; /* Count of pending */
if (list == head->prev) /* Zero or one elements */
return;
/* Convert to a null-terminated singly-linked list. */
head->prev->next = NULL;
/*
* Data structure invariants:
* - All lists are singly linked and null-terminated; prev
* pointers are not maintained.
* - pending is a prev-linked "list of lists" of sorted
* sublists awaiting further merging.
* - Each of the sorted sublists is power-of-two in size.
* - Sublists are sorted by size and age, smallest & newest at front.
* - There are zero to two sublists of each size.
* - A pair of pending sublists are merged as soon as the number
* of following pending elements equals their size (i.e.
* each time count reaches an odd multiple of that size).
* That ensures each later final merge will be at worst 2:1.
* - Each round consists of:
* - Merging the two sublists selected by the highest bit
* which flips when count is incremented, and
* - Adding an element from the input as a size-1 sublist.
*/
do {
size_t bits;
struct list_head **tail = &pending;
/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
tail = &(*tail)->prev;
/* Do the indicated merge */
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
/* Move one element from input list to pending */
list->prev = pending;
pending = list;
list = list->next;
pending->next = NULL;
count++;
} while (list);
/* End of input; merge together all the pending lists. */
list = pending;
pending = pending->prev;
for (;;) {
struct list_head *next = pending->prev;
if (!next)
break;
list = merge(priv, cmp, pending, list);
pending = next;
}
/* The final merge, rebuilding prev links */
merge_final(priv, cmp, head, pending, list);
}
EXPORT_SYMBOL(list_sort);
```
:::
接下來終於來討論 Linux 對於鍊結串列排序的設計,在看完整程式碼前先來看註解。註解不但可以讓我們更快進入狀況,更幫助我們更加理解設計者本身的想法。
```
* list_sort - sort a list
* @priv: private data, opaque to list_sort(), passed to @cmp
* @head: the list to sort
* @cmp: the elements comparison function
```
* ```priv``` : 為一個指標,傳遞 ```cmp``` 函式需要的參數
* ```head``` : 為要被排序的鍊結串列
* ```cmp``` : 需要自定義的比較 function pointer
```
* The comparison function @cmp must return > 0 if @a should sort after
* @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should
* sort before @b *or* their original order should be preserved. It is
* always called with the element that came first in the input in @a,
* and list_sort is a stable sort, so it is not necessary to distinguish
* the @a < @b and @a == @b cases.
*
* This is compatible with two styles of @cmp function:
* - The traditional style which returns <0 / =0 / >0, or
* - Returning a boolean 0/1.
* The latter offers a chance to save a few cycles in the comparison
* (which is used by e.g. plug_ctx_cmp() in block/blk-mq.c).
*
* A good way to write a multi-word comparison is::
*
* if (a->high != b->high)
* return a->high > b->high;
* if (a->middle != b->middle)
* return a->middle > b->middle;
* return a->low > b->low;
```
這邊說明了 ```cmp``` 比較函式的規範,如果 `a` 應該在 `b` 之後排序,則比較函數應傳回大於 0 的值;如果 `a` 應該在 `b` 之前排序或它們原始的順序應該保留,則比較函數應傳回小於等於 0 的值。
```
* This mergesort is as eager as possible while always performing at least
* 2:1 balanced merges. Given two pending sublists of size 2^k, they are
* merged to a size-2^(k+1) list as soon as we have 2^k following elements.
*
* Thus, it will avoid cache thrashing as long as 3*2^k elements can
* fit into the cache. Not quite as good as a fully-eager bottom-up
* mergesort, but it does use 0.2*n fewer comparisons, so is faster in
* the common case that everything fits into L1.
```
這邊說明了這個實作 merge sort 最重要的部份,也就是**如何合併**,我們一開始學習的 merge sort 都是 **top down** 的作法,這樣的作法是先做 partition 再做 merge ,而在做 partition 時會有大量資料在 cache 上移進移出,進而導致發生 Cache Thrashing 。
:::info
Cache Thrashing: 是指在電腦系統中頻繁地發生**快取未命中**和**刷新**的現象,導致系統效能急劇下降的情況。 它通常發生在快取容量不足以容納正在存取的資料或指令時。
:::
而 Linux kernel 的開發者如何避免這問題呢,他們使用了 **bottom up** 的方式實做,具體來說就是始終保持兩個要合併的串列的大小比例為 $2:1$ ,也就是說大的串列是小的串列的兩倍,而合併方式則是如果有兩個大小 $2^{k}$ 的串列來時並不會馬上合併,而是等到第三個大小為 $2^{k}$ 的串列進來才選擇合併成 $2^{k+1}$ 和 $2^{k}$ 的量個串列,這樣的大小比例就是前面說的 $2:1$ 了。
這樣做的原因就是因為 $3\times 2^{k}$ 大小的資料剛好可以放入 L1 Cache 裡,使其很好的避免掉了 Cache Thrashing 的問題。
```
* The merging is controlled by "count", the number of elements in the
* pending lists. This is beautifully simple code, but rather subtle.
*
* Each time we increment "count", we set one bit (bit k) and clear
* bits k-1 .. 0. Each time this happens (except the very first time
* for each bit, when count increments to 2^k), we merge two lists of
* size 2^k into one list of size 2^(k+1).
*
* This merge happens exactly when the count reaches an odd multiple of
* 2^k, which is when we have 2^k elements pending in smaller lists,
* so it's safe to merge away two lists of size 2^k.
*
* After this happens twice, we have created two lists of size 2^(k+1),
* which will be merged into a list of size 2^(k+2) before we create
* a third list of size 2^(k+1), so there are never more than two pending.
*
* The number of pending lists of size 2^k is determined by the
* state of bit k of "count" plus two extra pieces of information:
*
* - The state of bit k-1 (when k == 0, consider bit -1 always set), and
* - Whether the higher-order bits are zero or non-zero (i.e.
* is count >= 2^(k+1)).
```
講完了如何合併後,就來討論**何時合併**,利用一個計數器 `count` 來做判斷:
* 當 `count + 1` 後為 $2^{k}$ 就不合併
* 當 `count + 1` 後不為 $2^{k}$ 就合併
我們透過程式碼看實際如何操作
```cpp=
do {
size_t bits;
struct list_head **tail = &pending;
/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
tail = &(*tail)->prev;
/* Do the indicated merge */
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
/* Move one element from input list to pending */
list->prev = pending;
pending = list;
list = list->next;
pending->next = NULL;
count++;
} while (list);
```
先將上面 do_while 迴圈分成三大部份
1. 第6行:當 `count` 為奇數時執行
2. 第9行:當 `count+1` 不為 $2^{k}$ 時合併
3. 第19行:移動一個節點放入 `pending`
以下以這三點繪製流程圖,假設有一個鍊結串列有四個節點[4, 3, 2, 1]:
:::success
流程圖提示
* 以下流程圖的每個節點的 `prev` 和 `next` 若是沒有箭頭,代表指向 `NULL`
* 紅色箭頭代表這個步驟做的事情
* 若要刪除某個節點的 `prev` 或 `next` 的箭頭,不要直接刪掉,會導致方塊上下不整齊,可以利用在後面添加 `[color="white"]` 使其變白色
:::
* 進入 ```list_sort``` 後的初始動作
```cpp
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
struct list_head *list = head->next, *pending = NULL;
size_t count = 0; /* Count of pending */
if (list == head->prev) /* Zero or one elements */
return;
/* Convert to a null-terminated singly-linked list. */
head->prev->next = NULL;
```
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
head:right:e -> node4:w
node4:left:w -> head:e
node4:right:e -> node3:w
node3:left:w -> node4:e
node3:right:e -> node2:w
node2:left:w -> node3:e
node2:right:e -> node1:w
node1:left:w -> node2:e
list -> node4:n
pending -> NULL:n
node1:right:e -> NULL
}
```
* count = 0 ,第一次進入 do_while 迴圈
1. 第6行: count 為偶數,不執行
2. 第9行: $count + 1 = 1$ 為 $2^{k}$ ,不合併
3. 第19行: 必執行
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e
node3:right:e -> node2:w
node2:left:w -> node3:e
node2:right:e -> node1:w
node1:left:w -> node2:e
list -> node3:n[color="red"]
pending -> node4:n[color="red"]
tail -> pending:n[color="red"]
node1:right:e -> NULL
}
```
* count = 1 ,第二次進入 do_while 迴圈
1. 第6行: count 為奇數,執行
2. 第9行: $count + 1 = 2$ 為 $2^{k}$ ,不合併
3. 第19行: 必執行
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e
node3:right:e -> node2:w[color="white"]
node2:left:w -> node3:e
node2:right:e -> node1:w
node1:left:w -> node2:e
list -> node2:n[color="red"]
pending -> node3:n[color="red"]
tail -> node4:left:s[color="red"]
node1:right:e -> NULL
}
```
* count = 2 ,第三次進入 do_while 迴圈
1. 第6行: count 為偶數,不執行
2. 第9行: $count + 1 = 3$ 不為 $2^{k}$ ,合併
3. 第19行: 必執行
合併前,`a` 和 `b` 為要合併的部份
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
a [label="a", style="normal", color="white"]
b [label="b", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e
node3:right:e -> node2:w[color="white"]
node2:left:w -> node3:e
node2:right:e -> node1:w
node1:left:w -> node2:e
list -> node2:n
pending -> node3:n
tail -> pending:n[color="red"]
node1:right:e -> NULL
a -> node3[color="red"]
b -> node4:n[color="red"]
}
```
合併後
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
a [label="a", style="normal", color="white"]
b [label="b", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e[color="white"]
node3:right:e -> node2:w[color="white"]
node3:right:e -> node4:e[color="red"]
node2:left:w -> node3:e
node2:right:e -> node1:w
node1:left:w -> node2:e
list -> node2:n
pending -> node3:n
tail -> pending:n
node1:right:e -> NULL
a -> node3
b -> node4:n
}
```
執行19行後
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e[color="white"]
node3:right:e -> node2:w[color="white"]
node3:right:e -> node4:e
node2:left:w -> node3:e
node2:right:e -> node1:w[color="white"]
node1:left:w -> node2:e
list -> node1:n[color="red"]
pending -> node2:n[color="red"]
tail -> pending:n
node1:right:e -> NULL
}
```
* count = 3 ,第四次進入 do_while 迴圈
1. 第6行: count 為奇數,執行
2. 第9行: $count + 1 = 4$ 為 $2^{k}$ ,不合併
3. 第19行: 必執行
```graphviz
digraph ele_list {
rankdir=LR
node[shape=record]
head [label="head|{<left>prev|<right>next}"]
node4 [label="4|{<left>prev|<right>next}"]
node3 [label="3|{<left>prev|<right>next}"]
node2 [label="2|{<left>prev|<right>next}"]
node1 [label="1|{<left>prev|<right>next}"]
list [label="list", style="normal", color="white"]
pending [label="pending", style="normal", color="white"]
tail [label="tail", style="normal", color="white"]
NULL [label="NULL", style="normal", color="white"]
head:right:e -> node4:w
node4:right:e -> node3:w[color="white"]
node3:left:w -> node4:e[color="white"]
node3:right:e -> node2:w[color="white"]
node3:right:e -> node4:e
node2:left:w -> node3:e
node2:right:e -> node1:w[color="white"]
node1:left:w -> node2:e
list -> NULL:n[color="red"]
pending -> node1:n[color="red"]
tail -> node2:left:s[color="red"]
node1:right:e -> NULL
}
```
* count = 4 ,因為 `list` 為 NULL ,離開 do_while 迴圈
在上面的流程可以看到,我們已經將所有節點從 input list 加到 pending list 之中,因此要將 pending list 合併。
```cpp
/* End of input; merge together all the pending lists. */
list = pending;
pending = pending->prev;
for (;;) {
struct list_head *next = pending->prev;
if (!next)
break;
list = merge(priv, cmp, pending, list);
pending = next;
}
```
最後,合併最後的 pending list 之後,將鍊結串列的頭尾接回來。
```cpp
/* The final merge, rebuilding prev links */
merge_final(priv, cmp, head, pending, list);
```