Try   HackMD

研讀 Linux 核心原始程式碼的 lib/list_sort.c 筆記

contributed by < Appmedia06 >

這篇主要是研讀 list_sort.c 的筆記,首先先嘗試理解這個檔案寫了什麼,可以看到裡面總共有3個函式,分別是 merge , merge_final , list_sort ,簡單看過一遍後得知 list_sort 是使用 merge sort 實作,因此呼叫到 mergemerge_final ,以下開始詳細討論個函式功能。

研讀筆記

__attribute__((nonnull()))

在每個函式前面都有這一行程式,它是由 GNU C 編譯器所提供的一個特殊屬性,所謂 nonnull 是在告訴編譯器函式的第幾個參數不應該為空指標,否則會產生 undefined behavior

list_sort 為例

__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)

第二個參數 head 和第三個參數 cmp 不能為空指標。


merge

將兩個以排序的鍊結串列 ab 合併成一個已排序鍊結串列。

這和我在 lab0 所實做的 merge_Two_list 雷同,使用間接指標來合併鍊結串列

  • 演算方法

    1. 宣告一個新的鍊結串列 head ,當作合併的鍊結串列
    2. 宣告一個間接指標 tail ,指向 head
    3. 無窮迴圈中,比較 ab 的大小,將小的指標透過 tail 放入 head
    4. 當有一方使用完後,將另一方的剩餘部份放入 head 結束迴圈,回傳 head
  • 完整程式碼

static struct list_head *merge(void *priv, list_cmp_func_t cmp,
				struct list_head *a, struct list_head *b)
{
	struct list_head *head, **tail = &head;

	for (;;) {
		/* if equal, take 'a' -- important for sort stability */
		if (cmp(priv, a, b) <= 0) {
			*tail = a;
			tail = &a->next;
			a = a->next;
			if (!a) {
				*tail = b;
				break;
			}
		} else {
			*tail = b;
			tail = &b->next;
			b = b->next;
			if (!b) {
				*tail = a;
				break;
			}
		}
	}
	return head;
}

註解說到,若 ab 的值是相同的,則取 a 放入,其目的是為了要維持 sort stability

sort stability: 在排序演算法中,穩定性(stability)指的是如果兩個元素在排序前的順序相等,那麼在排序後它們的相對順序仍然保持不變。 換句話說,穩定性表示排序演算法在排序時會尊重相等元素的原始順序。


merge_final

merge 的操作類似,主要差別為因為這是在排序的最後才會呼叫到,因此需要實現以下兩件事

  1. 當有一方已經結束合併後,去檢查剩餘一方是否出現高度不平衡的狀態(例如:已排序)
/* Finish linking remainder of list b on to tail */
tail->next = b;
do {
    /*
     * If the merge is highly unbalanced (e.g. the input is
     * already sorted), this loop may run many iterations.
     * Continue callbacks to the client even though no
     * element comparison is needed, so the client's cmp()
     * routine can invoke cond_resched() periodically.
     */
    if (unlikely(!++count))
        cmp(priv, b, b);
    b->prev = tail;
    tail = b;
    b = b->next;
} while (b);

註解說明了當出現高度不平衡時,這個 do_while 迴圈會被呼叫很多次,但 b 這個鍊結串列不是被排序好了嗎,為什麼不直接接在 head 的後面就好啦。這邊就要提到 unlikely 函式。

在 linux kernel 的 /include/linux/compiler.h 中定義了兩個 macro

# define likely(x)	__builtin_expect(!!(x), 1)
# define unlikely(x)	__builtin_expect(!!(x), 0)
  • __built_expect() 是GCC的內建function,用來將branch的相關資訊提供給compiler,告訴compiler設計者期望的比較結果,以便對程式碼改變分支順序來進行優化。
  • 使用2次的 NOT operation 使 x 必為 0 或 1

所以這樣做有什麼意義以及好處呢?

如上述所題,如果發生高度不平衡的情況, b 就會變得非常的長,因此我們就會在迴圈內一直呼叫到 unlikely ,我們知道在取得資料時,會先從 Cache 找,若沒找到(cache miss),就會從 Memory 搬取一個 block 的資料,因為 patial Locality 的特性,程式碼相近的區域可能會一起被抓取進 cache ,進而大大提昇 cache hit 的機率。而 unlikely 函式就是使不可能的區域往後移,也就使其變成一個對 cache 友善的程式。

  1. 因為是最後要合併的鍊結串列,需將 prev 串上
/* And the final links to make a circular doubly-linked list */
tail->next = head;
head->prev = tail;

list_sort

完整程式碼
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
	struct list_head *list = head->next, *pending = NULL;
	size_t count = 0;	/* Count of pending */

	if (list == head->prev)	/* Zero or one elements */
		return;

	/* Convert to a null-terminated singly-linked list. */
	head->prev->next = NULL;

	/*
	 * Data structure invariants:
	 * - All lists are singly linked and null-terminated; prev
	 *   pointers are not maintained.
	 * - pending is a prev-linked "list of lists" of sorted
	 *   sublists awaiting further merging.
	 * - Each of the sorted sublists is power-of-two in size.
	 * - Sublists are sorted by size and age, smallest & newest at front.
	 * - There are zero to two sublists of each size.
	 * - A pair of pending sublists are merged as soon as the number
	 *   of following pending elements equals their size (i.e.
	 *   each time count reaches an odd multiple of that size).
	 *   That ensures each later final merge will be at worst 2:1.
	 * - Each round consists of:
	 *   - Merging the two sublists selected by the highest bit
	 *     which flips when count is incremented, and
	 *   - Adding an element from the input as a size-1 sublist.
	 */
	do {
		size_t bits;
		struct list_head **tail = &pending;

		/* Find the least-significant clear bit in count */
		for (bits = count; bits & 1; bits >>= 1)
			tail = &(*tail)->prev;
		/* Do the indicated merge */
		if (likely(bits)) {
			struct list_head *a = *tail, *b = a->prev;

			a = merge(priv, cmp, b, a);
			/* Install the merged result in place of the inputs */
			a->prev = b->prev;
			*tail = a;
		}

		/* Move one element from input list to pending */
		list->prev = pending;
		pending = list;
		list = list->next;
		pending->next = NULL;
		count++;
	} while (list);

	/* End of input; merge together all the pending lists. */
	list = pending;
	pending = pending->prev;
	for (;;) {
		struct list_head *next = pending->prev;

		if (!next)
			break;
		list = merge(priv, cmp, pending, list);
		pending = next;
	}
	/* The final merge, rebuilding prev links */
	merge_final(priv, cmp, head, pending, list);
}
EXPORT_SYMBOL(list_sort);

接下來終於來討論 Linux 對於鍊結串列排序的設計,在看完整程式碼前先來看註解。註解不但可以讓我們更快進入狀況,更幫助我們更加理解設計者本身的想法。

 * list_sort - sort a list
 * @priv: private data, opaque to list_sort(), passed to @cmp
 * @head: the list to sort
 * @cmp: the elements comparison function
  • priv : 為一個指標,傳遞 cmp 函式需要的參數
  • head : 為要被排序的鍊結串列
  • cmp : 需要自定義的比較 function pointer
 * The comparison function @cmp must return > 0 if @a should sort after
 * @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should
 * sort before @b *or* their original order should be preserved.  It is
 * always called with the element that came first in the input in @a,
 * and list_sort is a stable sort, so it is not necessary to distinguish
 * the @a < @b and @a == @b cases.
 *
 * This is compatible with two styles of @cmp function:
 * - The traditional style which returns <0 / =0 / >0, or
 * - Returning a boolean 0/1.
 * The latter offers a chance to save a few cycles in the comparison
 * (which is used by e.g. plug_ctx_cmp() in block/blk-mq.c).
 *
 * A good way to write a multi-word comparison is::
 *
 *	if (a->high != b->high)
 *		return a->high > b->high;
 *	if (a->middle != b->middle)
 *		return a->middle > b->middle;
 *	return a->low > b->low;

這邊說明了 cmp 比較函式的規範,如果 a 應該在 b 之後排序,則比較函數應傳回大於 0 的值;如果 a 應該在 b 之前排序或它們原始的順序應該保留,則比較函數應傳回小於等於 0 的值。

 * This mergesort is as eager as possible while always performing at least
 * 2:1 balanced merges.  Given two pending sublists of size 2^k, they are
 * merged to a size-2^(k+1) list as soon as we have 2^k following elements.
 *
 * Thus, it will avoid cache thrashing as long as 3*2^k elements can
 * fit into the cache.  Not quite as good as a fully-eager bottom-up
 * mergesort, but it does use 0.2*n fewer comparisons, so is faster in
 * the common case that everything fits into L1.

這邊說明了這個實作 merge sort 最重要的部份,也就是如何合併,我們一開始學習的 merge sort 都是 top down 的作法,這樣的作法是先做 partition 再做 merge ,而在做 partition 時會有大量資料在 cache 上移進移出,進而導致發生 Cache Thrashing 。

Cache Thrashing: 是指在電腦系統中頻繁地發生快取未命中刷新的現象,導致系統效能急劇下降的情況。 它通常發生在快取容量不足以容納正在存取的資料或指令時。

而 Linux kernel 的開發者如何避免這問題呢,他們使用了 bottom up 的方式實做,具體來說就是始終保持兩個要合併的串列的大小比例為

2:1 ,也就是說大的串列是小的串列的兩倍,而合併方式則是如果有兩個大小
2k
的串列來時並不會馬上合併,而是等到第三個大小為
2k
的串列進來才選擇合併成
2k+1
2k
的量個串列,這樣的大小比例就是前面說的
2:1
了。

這樣做的原因就是因為

3×2k 大小的資料剛好可以放入 L1 Cache 裡,使其很好的避免掉了 Cache Thrashing 的問題。

 * The merging is controlled by "count", the number of elements in the
 * pending lists.  This is beautifully simple code, but rather subtle.
 *
 * Each time we increment "count", we set one bit (bit k) and clear
 * bits k-1 .. 0.  Each time this happens (except the very first time
 * for each bit, when count increments to 2^k), we merge two lists of
 * size 2^k into one list of size 2^(k+1).
 *
 * This merge happens exactly when the count reaches an odd multiple of
 * 2^k, which is when we have 2^k elements pending in smaller lists,
 * so it's safe to merge away two lists of size 2^k.
 *
 * After this happens twice, we have created two lists of size 2^(k+1),
 * which will be merged into a list of size 2^(k+2) before we create
 * a third list of size 2^(k+1), so there are never more than two pending.
 *
 * The number of pending lists of size 2^k is determined by the
 * state of bit k of "count" plus two extra pieces of information:
 *
 * - The state of bit k-1 (when k == 0, consider bit -1 always set), and
 * - Whether the higher-order bits are zero or non-zero (i.e.
 *   is count >= 2^(k+1)).

講完了如何合併後,就來討論何時合併,利用一個計數器 count 來做判斷:

  • count + 1 後為
    2k
    就不合併
  • count + 1 後不為
    2k
    就合併

我們透過程式碼看實際如何操作

do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list);

先將上面 do_while 迴圈分成三大部份

  1. 第6行:當 count 為奇數時執行
  2. 第9行:當 count+1 不為
    2k
    時合併
  3. 第19行:移動一個節點放入 pending

以下以這三點繪製流程圖,假設有一個鍊結串列有四個節點[4, 3, 2, 1]:

流程圖提示

  • 以下流程圖的每個節點的 prevnext 若是沒有箭頭,代表指向 NULL
  • 紅色箭頭代表這個步驟做的事情
  • 若要刪除某個節點的 prevnext 的箭頭,不要直接刪掉,會導致方塊上下不整齊,可以利用在後面添加 [color="white"] 使其變白色
  • 進入 list_sort 後的初始動作
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
	struct list_head *list = head->next, *pending = NULL;
	size_t count = 0;	/* Count of pending */

	if (list == head->prev)	/* Zero or one elements */
		return;

	/* Convert to a null-terminated singly-linked list. */
	head->prev->next = NULL;






ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node4:w->head:e





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node4:n





pending

pending



pending->NULL:n





  • count = 0 ,第一次進入 do_while 迴圈
    1. 第6行: count 為偶數,不執行
    2. 第9行:
      count+1=1
      2k
      ,不合併
    3. 第19行: 必執行






ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node3:n





pending

pending



pending->node4:n





tail

tail



tail->pending:n





  • count = 1 ,第二次進入 do_while 迴圈
    1. 第6行: count 為奇數,執行
    2. 第9行:
      count+1=2
      2k
      ,不合併
    3. 第19行: 必執行






ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node2:n





pending

pending



pending->node3:n





tail

tail



tail->node4:s





  • count = 2 ,第三次進入 do_while 迴圈
    1. 第6行: count 為偶數,不執行
    2. 第9行:
      count+1=3
      不為
      2k
      ,合併
    3. 第19行: 必執行

合併前,ab 為要合併的部份







ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node2:n





pending

pending



pending->node3:n





tail

tail



tail->pending:n





a

a



a->node3





b

b



b->node4:n





合併後







ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node3:e->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node2:n





pending

pending



pending->node3:n





tail

tail



tail->pending:n





a

a



a->node3





b

b



b->node4:n





執行19行後







ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node3:e->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->node1:n





pending

pending



pending->node2:n





tail

tail



tail->pending:n





  • count = 3 ,第四次進入 do_while 迴圈
    1. 第6行: count 為奇數,執行
    2. 第9行:
      count+1=4
      2k
      ,不合併
    3. 第19行: 必執行






ele_list



head

head

prev

next



node4

4

prev

next



head:e->node4:w





node3

3

prev

next



node4:e->node3:w





node3:w->node4:e





node3:e->node4:e





node2

2

prev

next



node3:e->node2:w





node2:w->node3:e





node1

1

prev

next



node2:e->node1:w





node1:w->node2:e





NULL

NULL



node1:e->NULL





list

list



list->NULL:n





pending

pending



pending->node1:n





tail

tail



tail->node2:s





  • count = 4 ,因為 list 為 NULL ,離開 do_while 迴圈

在上面的流程可以看到,我們已經將所有節點從 input list 加到 pending list 之中,因此要將 pending list 合併。

/* End of input; merge together all the pending lists. */
list = pending;
pending = pending->prev;
for (;;) {
    struct list_head *next = pending->prev;

    if (!next)
        break;
    list = merge(priv, cmp, pending, list);
    pending = next;
}

最後,合併最後的 pending list 之後,將鍊結串列的頭尾接回來。

/* The final merge, rebuilding prev links */
merge_final(priv, cmp, head, pending, list);