--- tags: linux2022 --- # 2022q1 Homework1 (lab0) contributed by < [`bakudr18`](https://github.com/bakudr18/lab0-c)> > [作業要求](https://hackmd.io/@sysprog/linux2022-lab0) ## queue.c 實作 ### q_new 及 q_free `q_new` 在 list head 配置一個 `struct list_head` 即可,而 `q_free` 需要走訪整個 list 來 delete node ,因為走訪過程會 remove node ,因此需要用 `list_for_each_entry_safe` 保留一個 `tmp` node 來避免迭代過程錯誤 ,刪除完所有 node 後最後再釋放 list head 。 ```cpp struct list_head *q_new() { struct list_head *head = malloc(sizeof(struct list_head)); if (!head) return NULL; INIT_LIST_HEAD(head); return head; } void q_free(struct list_head *l) { if (!l) return; element_t *curr, *tmp; list_for_each_entry_safe (curr, tmp, l, list) { list_del(&curr->list); q_release_element(curr); } free(l); } ``` ### q_insert_head, q_insert_tail, q_remove_head, q_remove_tail :::warning 避免在標題中使用 `&`,可能會讓人誤會為 logical/bitwise operator :notes: jserv ::: `q_insert_head` 會先配置 `element_t` 後在配置字串 `s` 所需的空間即可,僅需注意 `malloc` 失敗要做 error handle ,而 `q_remove_head` 須注意 `bufsize` 是由使用者傳進來的參數, size 很可能小於 `node->value` ,因此需做適當的裁切。 `q_insert/remove_tail` 基本上概念相同就不特別放上來了。 ```cpp #define min(x, y) (x) < (y) ? (x) : (y) static element_t *ele_alloc(const char *s) { element_t *node = malloc(sizeof(element_t)); if (!node) return NULL; int len = strlen(s) + 1; node->value = malloc(sizeof(char) * len); if (!node->value) { free(node); return NULL; } strncpy(node->value, s, len); return node; } bool q_insert_head(struct list_head *head, char *s) { if (!head) return false; element_t *newh = ele_alloc(s); if (!newh) return false; list_add(&newh->list, head); return true; } element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize) { element_t *node = NULL; if (!head || list_empty(head)) return node; node = list_first_entry(head, element_t, list); list_del(&node->list); if (sp) { int len = strlen(node->value) + 1; len = min(bufsize, len); strncpy(sp, node->value, len); sp[len - 1] = '\0'; } return node; } ``` :::info 註:`#define min(x, y) (x) < (y) ? (x) : (y)` 的實作仍有許多缺陷,例如 `min(a, min(a, b))` 會有冗贅的重複運算,甚至`min(a++, b++)` 會有錯誤的運算結果等,但因這裡使用情境單純因此不做特別擴充。 延伸閱讀:==[Linux 核心原始程式碼巨集: `max`, `min`](https://hackmd.io/@sysprog/linux-macro-minmax)== ::: ### q_size `q_size` 因題目取消時間複雜度 $O(1)$ 的要求,因此用最單純的 for loop 來計算 size 即可,時間複雜度為 $O(n)$ 。 ```cpp int q_size(struct list_head *head) { if (!head) return 0; struct list_head *node; int size = 0; list_for_each (node, head) { size++; } return size; } ``` ### q_delete_middle 利用 circular doubly-linked list 的特性,分別由順向 `front` 與逆向 `back` 出發,當兩者碰頭之處即為 middle ,需注意當 queue size 為偶數(即 `front->next == back` )時,要刪除的節點為 `back` 。 ```cpp bool q_delete_mid(struct list_head *head) { if (!head || list_empty(head)) return false; struct list_head *front, *back; front = head->next; back = head->prev; while (front != back && front->next != back) { front = front->next; back = back->prev; } element_t *node = list_entry(back, element_t, list); list_del(back); q_release_element(node); return true; } ``` ### q_swap 此函式會將相鄰節點兩兩交換,例如 `0->1->2->3` swap 後變成 `1->0->3->2`,而 `0->1->2` swap 後變成 `1->0->2` 。 實作方式為在 `while` 迴圈內走訪尋找兩相鄰節點,先交換 `next` pointer 後再交換 `prev` pointer,這裡透過指標的指標來進行`next` 和 `prev` (型態為`struct list_head*`) 的 swap ,值得一提的是,因為 doubly-linked list 有 `prev/next` 紀錄前後一個 node 的位置,因此 swap 操作可以省略 `tmp` variable。 ```cpp void q_swap(struct list_head *head) { if (!head || list_empty(head)) return; struct list_head **indir = &head->next; while (*indir != head && (*indir)->next != head) { // swap next pointer struct list_head **sec = &(*indir)->next; *indir = *sec; *sec = (*sec)->next; (*indir)->next = (*indir)->prev; // swap prev pointer indir = &(*sec)->prev; sec = &(*indir)->prev; *indir = *sec; *sec = (*sec)->prev; (*indir)->prev = (*sec)->next; indir = &(*indir)->next; } } ``` ### q_reverse Reverse doubly-linked list 只要將每一個 node 的 `next` 和 `prev` 交換即可,實作上反而比 singly-linked list 更為單純。 ```cpp void q_reverse(struct list_head *head) { if (!head || list_empty(head) || head->next == head->prev) return; struct list_head *curr = head; do { struct list_head *tmp = curr->next; curr->next = curr->prev; curr->prev = tmp; curr = curr->prev; } while (curr != head); } ``` ### q_delete_dup 當找到相同 `value` 的兩相鄰 node 時,`do while` 刪除後方相同的 node ,最後在刪除前方第一個相同的 node。 ```cpp bool q_delete_dup(struct list_head *head) { // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/ if (!head) return false; if (head->prev == head->next) return true; struct list_head **indir = &head->next; while (*indir != head && (*indir)->next != head) { char *a = list_entry(*indir, element_t, list)->value; char *b = list_entry((*indir)->next, element_t, list)->value; if (strcmp(a, b) == 0) { struct list_head *tmp = (*indir)->next; do { list_del(tmp); q_release_element(list_entry(tmp, element_t, list)); tmp = (*indir)->next; } while (tmp != head && strcmp(a, list_entry(tmp, element_t, list)->value) == 0); tmp = *indir; list_del(tmp); q_release_element(list_entry(tmp, element_t, list)); } else indir = &(*indir)->next; } return true; } ``` ## Merge sort list ### 事前準備: * [你所不知道的 C 語言: linked list 和非連續記憶體 [1]](https://hackmd.io/@sysprog/c-linked-list) * [Merge Sort 與它的變化 [2]](https://hackmd.io/@lambert-wu/modified-merge-sort) [[1]](https://hackmd.io/@sysprog/c-linked-list#%E6%A1%88%E4%BE%8B%E6%8E%A2%E8%A8%8E-LeetCode-21-Merge-Two-Sorted-Lists) 文章內探討了 [LeetCode 23. Merge k Sorted Lists](https://leetcode.com/problems/merge-k-sorted-lists/) 的實作,因有同學在 [FB討論區](https://www.facebook.com/groups/system.software2022/permalink/1982054235309310/) 提問到「分斷合併」和「Divide and Conquer」的差異究竟改善了什麼,因此我做了以下實驗。 首先先來比較程式碼, **1. 分段合併** 概念是在每一次 iteration 都會走訪整個 lists ,然後將左右鄰居兩兩合併。 ```graphviz digraph G { { node[shape=none,label="interval = 1"]; i1 node[shape=none,label="interval = 2"]; i2 node[shape=none,label="interval = 4"]; i4 node[shape=none,label="interval = 8"]; i8 } interval1[label="<f0>L0|<f1>L1|<f2>L2|<f3>L3|<f4>L4|<f5>L5|<f6>L6|<f7>L7", shape=record, fixedsize=false,width=6] interval2[label="<f0>L01|<f1>|<f2>L23|<f3>|<f4>L45|<f5>|<f6>L67|<f7>", shape=record, fixedsize=false,width=6] interval4[label="<f0>L0123|<f1>|<f2>|<f3>|<f4>L4567|<f5>|<f6>|<f7>", shape=record, fixedsize=false,width=6] interval8[label="<f0>result|<f1>|<f2>|<f3>|<f4>|<f5>|<f6>|<f7>", shape=record, fixedsize=false,width=6] i1->i2[style=invis] i2->i4[style=invis] i4->i8[style=invis] interval1:f0 -> interval2:f0 interval1:f1 -> interval2:f0 interval1:f2 -> interval2:f2 interval1:f3 -> interval2:f2 interval1:f4 -> interval2:f4 interval1:f5 -> interval2:f4 interval1:f6 -> interval2:f6 interval1:f7 -> interval2:f6 interval1:f7 -> interval2:f7[style=invis] interval2:f0 -> interval4:f0 interval2:f2 -> interval4:f0 interval2:f4 -> interval4:f4 interval2:f6 -> interval4:f4 interval2:f7 -> interval4:f7[style=invis] interval4:f0 -> interval8:f0 interval4:f4 -> interval8:f0 interval4:f7 -> interval8:f7[style=invis] } ``` ```cpp struct ListNode *mergeKLists_interval(struct ListNode **lists, int listsSize) { if (listsSize == 0) return NULL; for (int interval = 1; interval < listsSize; interval *= 2) for (int i = 0; i + interval < listsSize; i += interval * 2) lists[i] = mergeTwoLists(lists[i], lists[i + interval]); return lists[0]; } ``` **2. Divide and Conquer** 為 recursion 作法,每次遞迴會先 mergesort 左半邊的 lists 後再 mergesort 右半邊的 lists ,最後將左右合併在一起,如下圖,merge 的執行順序為 a $\to$ b $\to$ c $\to$ d $\to$ e $\to$ f $\to$ g。這種總是先 merge 左半邊的特性,會先集中處理完一半的 data,相對分段合併法每次迭代都需要走訪到每個 node ,Divide and Conquer 明顯更 [cache friendly](https://en.wikipedia.org/wiki/Cache-oblivious_algorithm)。 ```graphviz digraph G { compound=true node[shape=box, height=.1, width=1];sorted_0 sorted_1 sorted_2 sorted_3 sorted_4 sorted_5 sorted_6 sorted_7; node[shape=record, height=.1];input result; node[shape=record, height=.1];divide_0123 divide_4567 divide_01 divide_23 divide_45 divide_67 node[shape=record, height=.1];merge_01 merge_23 merge_45 merge_67 merge_0123 merge_4567; input[label="<f0>L0|<f1>L1|<f2>L2|<f3>L3|<f4>L4|<f5>L5|<f6>L6|<f7>L7", width=8] subgraph cluster_g{ style=filled; label="g" color="#fffaaa" result[label="L01234567", width=8] } divide_0123[label="<f1>L0|<f2>L1|<f3>L2|<f4>L3", width=4] divide_4567[label="<f1>L4|<f2>L5|<f3>L6|<f4>L7", width=4] divide_01[label="<f0>L0|<f1>L1", width=2] divide_23[label="<f0>L2|<f1>L3", width=2] divide_45[label="<f0>L4|<f1>L5", width=2] divide_67[label="<f0>L6|<f1>L7", width=2] sorted_0[label="L0"] sorted_1[label="L1"] sorted_2[label="L2"] sorted_3[label="L3"] sorted_4[label="L4"] sorted_5[label="L5"] sorted_6[label="L6"] sorted_7[label="L7"] subgraph cluster_a{ style=filled; label="a" color="#fffaaa" merge_01[label="L01", width=2] } subgraph cluster_b{ style=filled; label="b" color="#fffaaa" merge_23[label="L23", width=2] } subgraph cluster_d{ style=filled; label="d" color="#fffaaa" merge_45[label="L45", width=2] } subgraph cluster_e{ style=filled; label="e" color="#fffaaa" merge_67[label="L67", width=2] } subgraph cluster_c{ style=filled; label="c" color="#fffaaa" merge_0123[label="L0123", width=4] } subgraph cluster_f{ style=filled; label="f" color="#fffaaa" merge_4567[label="L4567", width=4] } input -> divide_0123 input -> divide_4567 divide_0123 -> divide_01 divide_0123 -> divide_23 divide_4567 -> divide_45 divide_4567 -> divide_67 divide_01:f0 -> sorted_0 divide_01:f1 -> sorted_1 divide_23 -> sorted_2 divide_23 -> sorted_3 divide_45:f0 -> sorted_4 divide_45:f1 -> sorted_5 divide_67:f0 -> sorted_6 divide_67:f1 -> sorted_7 sorted_0 -> merge_01 sorted_1 -> merge_01 sorted_2 -> merge_23 sorted_3 -> merge_23 sorted_4 -> merge_45[constraint=false] sorted_5 -> merge_45 sorted_6 -> merge_67 sorted_7 -> merge_67 rank=same; merge_0123; merge_4567; rank=same; merge_0123; merge_4567; merge_01 -> merge_0123 merge_23 -> merge_0123 merge_45 -> merge_4567 merge_67 -> merge_4567 merge_0123 -> result merge_4567 -> result } ``` ```cpp struct ListNode *merge(struct ListNode **lists, int low, int high) { if (low > high) return NULL; if (low == high) return lists[low]; int mid = low + (high - low) / 2; struct ListNode *left = merge(lists, low, mid); struct ListNode *right = merge(lists, mid + 1, high); return mergeTwoLists(left, right); } struct ListNode *mergeKLists_div_conq(struct ListNode **lists, int listsSize) { return merge(lists, 0, listsSize - 1); } ``` **實驗** 用來測試的程式碼如下,`lscpu` 顯示我的測試環境 L3 Cache 大小為 6MB ,而 `sizeof(struct ListNode) == 16` ,因此測試時故意使用大約兩倍 L3 Cache 大小的資料量來製造 cache miss ,12 * 1024 個 sorted list,每個 sorted list 有 64 個 node 。 ```cpp struct ListNode { int val; struct ListNode *next; }; static struct ListNode *(*mergeKLists[3])(struct ListNode **, int) = { &mergeKLists_headtail, &mergeKLists_interval, &mergeKLists_div_conq }; #define N 12 * 1024 int main(int argc, char **argv){ int k = 2; if (argc == 2) k = argv[1][0] - '0'; struct ListNode *lists[N]; for(int i = 0 ; i < N; i++) { lists[i] = createSortedList(64); } struct ListNode *head = mergeKLists[k](lists, N); return 0; } ``` ```shell $ lscpu Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian Address sizes: 39 bits physical, 48 bits virtual CPU(s): 8 On-line CPU(s) list: 0-7 Thread(s) per core: 2 Core(s) per socket: 4 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 71 Model name: Intel(R) Core(TM) i7-5700HQ CPU @ 2.70GHz Stepping: 1 CPU MHz: 1000.000 CPU max MHz: 3500.0000 CPU min MHz: 800.0000 BogoMIPS: 5387.51 Virtualization: VT-x L1d cache: 128 KiB L1i cache: 128 KiB L2 cache: 1 MiB L3 cache: 6 MiB NUMA node0 CPU(s): 0-7 ``` 為了驗證想法是否正確,我嘗試了以下兩個工具分析 * [**perf**](https://perf.wiki.kernel.org/index.php/Tutorial) 首先以 perf 來量測 cache-misses 發生次數,可以看到 Divide and Conquer cache-misses 次數明顯較少,然而,這個量測是有包含 `createSortedList()` 的,測試結果並非完全準確。 ```shell ## 分段合併 $ sudo perf stat -e cache-references,cache-misses ./merge_k_sorted_list 1 Performance counter stats for './merge_k_sorted_list 1': 2692,3703 cache-references 1323,1775 cache-misses # 49.145 % of all cache refs 0.297886217 seconds time elapsed 0.293623000 seconds user 0.004022000 seconds sys ``` ```shell ## Divide and Conquer $ sudo perf stat -e cache-references,cache-misses ./merge_k_sorted_list 2 Performance counter stats for './merge_k_sorted_list 2': 2461,9759 cache-references 813,3442 cache-misses # 33.036 % of all cache refs 0.264960501 seconds time elapsed 0.244755000 seconds user 0.020061000 seconds sys ``` * **[Cachegrind](https://valgrind.org/docs/manual/cg-manual.html)** 為 [Valgrind](https://valgrind.org/) 底下的其中一個 Tool , 專門用來量測 cache 使用與 branch predictor。執行 `valgrind --tool=cachegrind PROGRAM` 來紀錄程式執行過程,`cg_annotate FILE` 可以列出 function-by-function 的 cache access 結果。 測試結果如下,其中 LL 表示 Last Level Cache,這裡我們只關心 LLd misses (Last Level data Cache misses) 與 DLmr (Last Level data cache misses read) 即可。 ```shell ## 分段合併 $ valgrind --tool=cachegrind ./merge_k_sorted_list 1 ==44596== Cachegrind, a cache and branch-prediction profiler ==44596== Copyright (C) 2002-2017, and GNU GPL'd, by Nicholas Nethercote et al. ==44596== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info ==44596== Command: ./merge_k_sorted_list 1 ==44596== --44596-- warning: L3 cache found, using its data for the LL simulation. ==44596== brk segment overflow in thread #1: can't grow to 0x484f000 ==44596== (see section Limitations in user manual) ==44596== NOTE: further instances of this message will not be shown ==44596== ==44596== I refs: 609,569,307 ==44596== I1 misses: 1,044 ==44596== LLi misses: 1,031 ==44596== I1 miss rate: 0.00% ==44596== LLi miss rate: 0.00% ==44596== ==44596== D refs: 316,279,363 (237,687,452 rd + 78,591,911 wr) ==44596== D1 misses: 9,519,364 ( 9,122,157 rd + 397,207 wr) ==44596== LLd misses: 6,165,960 ( 5,770,568 rd + 395,392 wr) ==44596== D1 miss rate: 3.0% ( 3.8% + 0.5% ) ==44596== LLd miss rate: 1.9% ( 2.4% + 0.5% ) ==44596== ==44596== LL refs: 9,520,408 ( 9,123,201 rd + 397,207 wr) ==44596== LL misses: 6,166,991 ( 5,771,599 rd + 395,392 wr) ==44596== LL miss rate: 0.7% ( 0.7% + 0.5% ) $ cg_annotate cachegrind.out.44596 -------------------------------------------------------------------------------- I1 cache: 32768 B, 64 B, 8-way associative D1 cache: 32768 B, 64 B, 8-way associative LL cache: 6291456 B, 64 B, 12-way associative Command: ./merge_k_sorted_list 1 Data file: cachegrind.out.44596 Events recorded: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Events shown: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Event sort order: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Thresholds: 0.1 100 100 100 100 100 100 100 100 Include dirs: User annotated: Auto-annotation: off -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 609,569,307 1,044 1,031 237,687,452 9,122,157 5,770,568 78,591,911 397,207 395,392 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 293,695,455 3 3 160,063,477 9,111,294 5,760,742 42,745,848 0 0 /home/bakud/linux2022/merge_k_sorted_list/merge_k_sorted_list.c:mergeTwoLists 110,886,726 19 19 18,874,468 2 1 18,874,312 393,286 393,208 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 83,533,824 5 5 22,917,120 1 0 4,214,784 2 2 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:srandom_r 37,748,702 6 6 9,437,178 1 0 3,145,727 0 0 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:malloc 36,470,784 5 5 11,108,352 1 0 5,591,040 1 1 /home/bakud/linux2022/merge_k_sorted_list/merge_k_sorted_list.c:createSortedList 25,067,520 3 3 6,291,456 3 0 2,359,296 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 16,515,072 2 2 6,291,456 0 0 1,572,864 1 1 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 3,170,521 21 20 1,585,232 10 4 15 2 2 ???:??? 1,572,864 0 0 786,432 1 0 0 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/../sysdeps/unix/sysv/linux/x86/lowlevellock.h:random ``` ```shell ## Divide and Conquer $ valgrind --tool=cachegrind ./merge_k_sorted_list 2 ==44614== Cachegrind, a cache and branch-prediction profiler ==44614== Copyright (C) 2002-2017, and GNU GPL'd, by Nicholas Nethercote et al. ==44614== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info ==44614== Command: ./merge_k_sorted_list 2 ==44614== --44614-- warning: L3 cache found, using its data for the LL simulation. ==44614== brk segment overflow in thread #1: can't grow to 0x484f000 ==44614== (see section Limitations in user manual) ==44614== NOTE: further instances of this message will not be shown ==44614== ==44614== I refs: 610,011,501 ==44614== I1 misses: 1,043 ==44614== LLi misses: 1,030 ==44614== I1 miss rate: 0.00% ==44614== LLi miss rate: 0.00% ==44614== ==44614== D refs: 316,574,161 (237,834,817 rd + 78,739,344 wr) ==44614== D1 misses: 8,357,449 ( 7,958,201 rd + 399,248 wr) ==44614== LLd misses: 1,664,146 ( 1,268,706 rd + 395,440 wr) ==44614== D1 miss rate: 2.6% ( 3.3% + 0.5% ) ==44614== LLd miss rate: 0.5% ( 0.5% + 0.5% ) ==44614== ==44614== LL refs: 8,358,492 ( 7,959,244 rd + 399,248 wr) ==44614== LL misses: 1,665,176 ( 1,269,736 rd + 395,440 wr) ==44614== LL miss rate: 0.2% ( 0.1% + 0.5% ) $ cg_annotate cachegrind.out.44614 -------------------------------------------------------------------------------- I1 cache: 32768 B, 64 B, 8-way associative D1 cache: 32768 B, 64 B, 8-way associative LL cache: 6291456 B, 64 B, 12-way associative Command: ./merge_k_sorted_list 2 Data file: cachegrind.out.44614 Events recorded: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Events shown: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Event sort order: Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw Thresholds: 0.1 100 100 100 100 100 100 100 100 Include dirs: User annotated: Auto-annotation: off -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw -------------------------------------------------------------------------------- 610,011,501 1,043 1,030 237,834,817 7,958,201 1,268,706 78,739,344 399,248 395,440 PROGRAM TOTALS -------------------------------------------------------------------------------- Ir I1mr ILmr Dr D1mr DLmr Dw D1mw DLmw file:function -------------------------------------------------------------------------------- 293,695,455 3 3 160,063,477 7,952,456 1,265,014 42,745,848 0 0 /home/bakud/linux2022/merge_k_sorted_list/merge_k_sorted_list.c:mergeTwoLists 110,886,726 19 19 18,874,468 2 1 18,874,312 393,286 393,208 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:_int_malloc 83,533,824 5 5 22,917,120 1 0 4,214,784 2 2 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:srandom_r 37,748,702 6 6 9,437,178 1 0 3,145,727 0 0 /build/glibc-eX1tMB/glibc-2.31/malloc/malloc.c:malloc 36,470,784 5 5 11,108,352 1 0 5,591,040 1 1 /home/bakud/linux2022/merge_k_sorted_list/merge_k_sorted_list.c:createSortedList 25,067,520 3 3 6,291,456 3 0 2,359,296 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/random_r.c:random_r 16,515,072 2 2 6,291,456 0 0 1,572,864 1 1 /build/glibc-eX1tMB/glibc-2.31/stdlib/random.c:random 3,170,521 21 20 1,585,232 10 4 15 2 2 ???:??? 1,572,864 0 0 786,432 1 0 0 0 0 /build/glibc-eX1tMB/glibc-2.31/stdlib/../sysdeps/unix/sysv/linux/x86/lowlevellock.h:random 823,251 3 3 319,471 2,559 1,543 172,022 3,576 53 /home/bakud/linux2022/merge_k_sorted_list/merge_k_sorted_list.c:merge ``` 由以上結果可知,分段合併的 LLd cache read 總共有 5770568 次 cache-misses,其中 `mergeTwoLists()` 佔了 5760742 次,而 Divide and Conquer 的 LLd cache read 只有 1268706 次 cache-misses,其中 `mergeTwoLists()` 佔了 1265014 次,可以發現到大部分的 cache misses 都是 `mergeTwoLists()` 造成的,而 Divide and Conquer 的 cache misses 次數明顯更少,因此也驗證了 Divide and Conquer 更加 cache friendly 的說法。 ### q_sort 觀察可知如果 [Merge k Sorted Lists](https://leetcode.com/problems/merge-k-sorted-lists/) 的 k 個 list size 都是 1, 其實就等於 merge sort list。因此這裡嘗試使用分段合併跟 Divide and conquer 來實作 `q_sort` **1. 分段合併** ```cpp // return head, head->prev point to tail static struct list_head *mergeTwoSortedList(struct list_head *l1, struct list_head *l2) { struct list_head *head = NULL, **ptr = &head, **node; for (node = NULL; l1 && l2; *node = (*node)->next) { node = strcmp(list_entry(l1, element_t, list)->value, list_entry(l2, element_t, list)->value) < 0 ? &l1 : &l2; *ptr = *node; ptr = &(*ptr)->next; } *ptr = (struct list_head *) ((uintptr_t) l1 | (uintptr_t) l2); // find tail while ((*ptr)->next) ptr = &(*ptr)->next; // head->prev point to head head->prev = *ptr; return head; } static struct list_head *list_split(struct list_head *head, int n) { struct list_head **curr = &head; struct list_head *remain; for (int i = 0; i < n && *curr; i++) { curr = &(*curr)->next; } remain = *curr; *curr = NULL; return remain; } void list_mergesort(struct list_head *head, int n) { head->prev->next = NULL; for (int i = 1; i < n; i *= 2) { struct list_head **l1 = &head->next; while (*l1) { struct list_head *l2 = list_split(*l1, i); struct list_head *remain = list_split(l2, i); *l1 = mergeTwoSortedList(*l1, l2); (*l1)->prev->next = remain; l1 = &(*l1)->prev->next; } } // rebuild prev pointer and circular list head->prev = head->next->prev; head->prev->next = head; head->next->prev = head; struct list_head *curr = head->next; while (curr != head) { curr->next->prev = curr; curr = curr->next; } } void q_sort(struct list_head *head) { if (!head || list_empty(head) || head->prev == head->next) return; int n = q_size(head); list_mergesort(head, n); } ``` 注意到以上實作每一次 for loop 都會排序 power of 2 的倍數個 node ,原理跟前面提到的 `mergeKLists_interval()` 是一樣的,差別在於每次 `list_split` 都會需要走訪 `i` 個 node 才能 split list ,可以預期這個實作每次 for loop 都會比 `mergeKLists_interval()` 多走訪 n 個 node,整體會多走訪 nlog(n) 個 node,相對效能就會更差。 另外,雖然這是 circular doubly-linked list ,但不必急著每次 sort 一小段 list 就還原所有的 `prev` ,等到所有 node 都 sort 完在一次重建即可。 **2. Divide and Conquer** 前面提過的 `mergeKLists_div_conq()` 是使用遞迴作法,更準確來說是使用 top-down merge sort,然而如果這裡 `list_mergesort()` 也使用 top-down,就會需要思考如何 divide list ,一個簡單的作法是使用與 `q_delete_middle()` 相同的快慢指標來找到 middle ,但顯然的這樣做會和分段合併一樣每個遞迴 function level 都需要走訪 n 個 node ,整體相對於 `mergeKLists_div_conq()` 就會多走訪 nlog(n) 個 node,顯然不是很有效率,為了避免此問題,下方的實作採用的是 bottom-up divide and conquer。 ```cpp static void list_split_merge(struct list_head **l1, int n) { struct list_head *l2 = list_split(*l1, n); struct list_head *remain = list_split(l2, n); *l1 = mergeTwoSortedList(*l1, l2); (*l1)->prev->next = remain; } /* * Merge at most n nodes starting from remain, * then merge l1(length n) and remain(at most length n). * Return tail of merged list. */ static struct list_head **list_merge_all(struct list_head **l1, struct list_head **remain, int n) { if (!*l1) return NULL; if (n > 1) { struct list_head **tmp = NULL; for (int i = 1; i < n; i <<= 1) tmp = list_merge_all(remain, tmp, i); } list_split_merge(l1, n); return &(*l1)->prev->next; } void list_mergesort(struct list_head *head) { head->prev->next = NULL; struct list_head **l1 = &head->next; struct list_head **remain = NULL; int i = 1; do { remain = list_merge_all(l1, remain, i); i <<= 1; } while (*remain); // rebuild prev pointer and circular list head->prev = head->next->prev; head->prev->next = head; head->next->prev = head; struct list_head *curr = head->next; while (curr != head) { curr->next->prev = curr; curr = curr->next; } } void q_sort(struct list_head *head) { if (!head || list_empty(head) || head->prev == head->next) return; list_mergesort(head); } ``` 雖然說是 bottom-up merge sort ,但並不代表完全不可以使用遞迴,bottom-up 與 top-down 只是概念上不同,相對於 top-down 作法在每一個遞迴 function level 都需要走訪所有 node 才找出要 split 的 middle 位置,然後才能開始 split and merge, bottom-up 的做法是走到哪就 split and merge 到哪。 注意到 `list_merge_all()` 傳入的 `l1` 與 `remain` 分別代表的是 head of sorted left sub-list 與 head of unsorted right sub-list ,`n` 為要 split and merge 的兩個 list 的 size ,而這裡利用 `list_merge_all()` 回傳值來 maintain head of unsorted sub-list, 就可以避免掉要一直走訪整個 list 來找 middle 的問題。 ### merge sort list 結論 比較兩個方法的 `q_sort` 實作,divide and conquer 比起分段合併少走訪 nlog(n) 個 node,而且更 cache friendly。最後附上加上 `option verbose 0` 後執行測項15的 [perf-stat](https://man7.org/linux/man-pages/man1/perf-stat.1.html) 表現。 ```shell ## 分段合併 $ sudo perf stat ./qtest -f traces/trace-15-perf.cmd cmd> option verbose 0 Performance counter stats for './qtest -f traces/trace-15-perf.cmd': 706.50 msec task-clock # 0.994 CPUs utilized 126 context-switches # 178.344 /sec 0 cpu-migrations # 0.000 /sec 3591 page-faults # 5.083 K/sec 24,1857,8031 cycles # 3.423 GHz 5,2740,5221 instructions # 0.22 insn per cycle 1,0894,3489 branches # 154.201 M/sec 207,3416 branch-misses # 1.90% of all branches 0.711020069 seconds time elapsed 0.702968000 seconds user 0.003994000 seconds sys ``` ```shell ## Divide and conquer $ sudo perf stat ./qtest -f traces/trace-15-perf.cmd cmd> option verbose 0 Performance counter stats for './qtest -f traces/trace-15-perf.cmd': 374.42 msec task-clock # 0.997 CPUs utilized 39 context-switches # 104.161 /sec 0 cpu-migrations # 0.000 /sec 3589 page-faults # 9.585 K/sec 12,7331,8326 cycles # 3.401 GHz 5,3414,8883 instructions # 0.42 insn per cycle 1,0962,3118 branches # 292.781 M/sec 216,6357 branch-misses # 1.98% of all branches 0.375412713 seconds time elapsed 0.375089000 seconds user 0.000000000 seconds sys ``` ## Linux Kernel `list_sort` 實作 嘗試將 Linux Kernel 中的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 移植到 `q_sort` 使用,首先看到函式宣告。 ```cpp /** * list_sort - sort a list * @priv: private data, opaque to list_sort(), passed to @cmp * @head: the list to sort * @cmp: the elements comparison function */ __attribute__((nonnull(2, 3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) ``` `prev`: 傳入給 `cmp` 當參數的 private data `head`: list head `cmp`: compare function, 用來決定 sorting order,以 function pointer 型態傳入 [`__attribute__((nonnull))`](https://www.keil.com/support/man/docs/armcc/armcc_chr1359124976631.htm): 為 compiler 提供的 attribute function ,指定函式傳入的參數必須為 non-null ,如果傳入 null 編譯時期會發出警告。 繼續看到函式註解 ```cpp /* This mergesort is as eager as possible while always performing at least * 2:1 balanced merges. Given two pending sublists of size 2^k, they are * merged to a size-2^(k+1) list as soon as we have 2^k following elements. * * Thus, it will avoid cache thrashing as long as 3*2^k elements can * fit into the cache. Not quite as good as a fully-eager bottom-up * mergesort, but it does use 0.2*n fewer comparisons, so is faster in * the common case that everything fits into L1. ``` 這裡的 mergesort 會盡可能的將已經走訪過的 node 維持在 sorted : unsorted 為 2 : 1的平衡,相對於前面我所使用的 bottom-up divide and conquer 方法會在走訪到 $2^{k+1}$ 的 node 時立刻合併前面的兩個 $2^k$ sub-list,linux kernel 的作法是會等到**走訪到 $3 * 2^k$ 的 node 時才合併前面的兩個 $2^k$ sub-list**。 這種作法在 $3*2^k$ 個 node 能夠完全放進 cache 中的情況下能夠避免 [cache thrashing](https://en.wikipedia.org/wiki/Thrashing_(computer_science)) 的發生。所謂的 cache thrashing 是當 cache 快滿的情況下,在 access data 時就很容易頻繁發生 cache miss,導致需要一直 swap data 進而影響效能。 註解提到,這種作法並非比 full-eager bottom-up (也就是 bottom-up divide and conquer) 的方法好,畢竟兩者相較起來預先走訪較多的 node 會更容易發生 cache miss,但是,在大部份應用情境下,要排序的資料通常都能夠完全放進 L1 cache,那這種作法可以省去 $0.2*n$ 次的 comparison 。 接下來的這段註解需要搭配程式碼來看比較容易懂 ```cpp /* The merging is controlled by "count", the number of elements in the * pending lists. This is beautifully simple code, but rather subtle. * * Each time we increment "count", we set one bit (bit k) and clear * bits k-1 .. 0. Each time this happens (except the very first time * for each bit, when count increments to 2^k), we merge two lists of * size 2^k into one list of size 2^(k+1). ``` * `count`: 用來紀錄 pending lists 內有多少 node ,當每一次 count + 1 時,以二進位來看, `count` 的第 k bit 會設為 1 ,而第 k-1 到 0 bits 會清 0,每當此情況發生時,此演算法會把 2 個 $2^k$ lists 合併為一個 $2^{k+1}$ list。但是有一個例外情況,當 `count` 為 $2^k$ 時不做合併。 舉例: 1. `count = 11 + 1`: 二進位表示由 $1011_2$ 變成 $1100_2$ , bit 2 設為 1,清掉 bit 1 ~ bit 0,則此時演算法會把 2 個 $2^2$ list 合併成 $2^3$ list。 1. `count = 7 + 1`: 二進位表示由 $0111_2$ 變成 $1000_2$ ,因為 `count` 為 $2^3$ 滿足 $2^k$ 條件,因此不會進行合併。 接著搭配程式碼來看 ```cpp= __attribute__((nonnull(2, 3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp) { struct list_head *list = head->next, *pending = NULL; size_t count = 0; /* Count of pending */ if (list == head->prev) /* Zero or one elements */ return; /* Convert to a null-terminated singly-linked list. */ head->prev->next = NULL; do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list); ... ``` 函式的一開始會把 circular 斷開,接著進入回圈準備做 merge,注意到 merge 期間只會維護 singly-linked list, `prev` 另有用途。 * `pending`: 為 head of pending lists ,而 pending lists 是用於暫存待合併的 sub-list of lists,每一個 sub-list 的 size 都為 power of 2 的倍數,並且各個 sub-list 以 `prev` 連結。 * `tail`: 指標的指標,會指向 `pending` 裡準備要合併的 sub-list,要合併的 sub-list 是由 `count` 決定。 * `list`: 目前走訪到的 node。 這邊直接帶實際例子比較好理解,以下 `count` 以 `x + 1` 表示,`x` 為在 `do while()` 開頭處 (程式碼第 14 行) `count` 的值, `+1` 表示本次 iteration 結束後 `count` 值會 +1,這裡重要的地方是 `count` + 1 時由二進位來看會 set bit k, clear bits k-1 to 0,當 `count` 等於以下: 1. `count = 5 + 1`: 表示目前走訪到第 6 個 node, `list` 指向 a,而 `pending = list->prev` ,pending list 透過 `prev` 將各個 sub-list 連結起來,可以看到下圖的 pending lists 的內容為 ef $\gets$ cd $\gets$ b,分別存了 size 為 2 $\gets$ 2 $\gets$ 1 的 sorted sub-list。 `count` 若以二進位表示是由 $101_2$ 變成 $110_2$,bit 1 設為 1,清掉 bit 0,表示本次 iteration 要把 2 個 $2^1$ sub-list 合併成 $2^2$ sub-list。 此時 `count =` $101_2$ ,在程式碼第18行的 `for loop` 會從 `pending` 出發將 `tail` 移到現在要 merge 的 sub-list 位置,也就是下圖的 node c ,接著準備開始合併 sub-list cd 與 sub-list ef 。 **合併前的 list 狀態:** ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; head [label="head|{<left>prev|<right>next}", style="bold"]; subgraph cluster_list{ style=filled; label="list" color="#fffaaa" a [label="a|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_pending{ style=filled; label="pending" color="#fffaaa" b [label="b|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_tail{ style=filled; label="*tail" color="#fffaaa" c [label="c|{<left>prev|<right>next}", style="bold"]; } d [label="d|{<left>prev|<right>next}", style="bold"]; e [label="e|{<left>prev|<right>next}", style="bold"]; f [label="f|{<left>prev|<right>next}", style="bold"]; null[shape=plaintext]; head:right:e -> e:w[arrowhead=normal, arrowtail=normal]; e:left:w -> null:e[constraint=false]; e:right:e -> f:w[arrowhead=normal, arrowtail=normal]; c:right:e -> d:w[arrowhead=normal, arrowtail=normal]; f:right:e -> c:w[style="invis"] d:right:e -> b:w[style="invis"] b:right:e -> a:w[style="invis"] a:left:w -> b:n[constraint=false]; b:left:w -> c:n[constraint=false]; c:left:w -> e:n[constraint=false]; } ``` 在 `merge` 回傳合併完的 sub-list 後(這裡是 sub-list cdef ),要更新原本 new head of sub-list `a` 的 `prev` ,以及 `*tail` 所指向的 node,程式執行到第 28 行的結果如下圖所示。 **合併後的 list 狀態:** ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; head [label="head|{<left>prev|<right>next}", style="bold"]; subgraph cluster_list{ style=filled; label="list" color="#fffaaa" a [label="a|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_pending{ style=filled; label="pending" color="#fffaaa" b [label="b|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_tail{ style=filled; label="*tail" color="#fffaaa" c [label="c|{<left>prev|<right>next}", style="bold"]; } d [label="d|{<left>prev|<right>next}", style="bold"]; e [label="e|{<left>prev|<right>next}", style="bold"]; f [label="f|{<left>prev|<right>next}", style="bold"]; null[shape=plaintext]; head:right:e -> c:w[arrowhead=normal, arrowtail=normal]; c:left:w -> null:e[constraint=false]; e:right:e -> f:w[arrowhead=normal, arrowtail=normal]; d:right:e -> e:w[arrowhead=normal, arrowtail=normal]; c:right:e -> d:w[arrowhead=normal, arrowtail=normal]; f:right:e -> b:w[style="invis"]; b:right:e -> a:w[style="invis"]; a:left:w -> b:n[constraint=false]; b:left:w -> c:n[constraint=false]; } ``` 2. `count = 3 + 1`: 表示目前走訪到第 4 個 node, `list` 指向 c,而 `pending = list->prev` ,pending lists 透過 `prev` 將各個 sub-list 連結起來,可以看到下圖的 pending lists 的內容為 ef $\gets$ d,分別存了 size 為 2 $\gets$ 1 的 sorted sub-list。 `count` 若以二進位表示是由 $011_2$ 變成 $100_2$,符合前面提到的例外情況,當 `count` 更新為 $2^k$ 時不進行 merge。 此時 `count =` $011_2$ ,在程式碼第18行的 `for loop` 會逐一把 bits 清零,也就不會進入 merge 階段,這裡 `tail` 的位置就不是很重要了。 **不會進行合併的 list 狀態:** ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; head [label="head|{<left>prev|<right>next}", style="bold"]; subgraph cluster_list{ style=filled; label="list" color="#fffaaa" c [label="c|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_pending{ style=filled; label="pending" color="#fffaaa" d [label="d|{<left>prev|<right>next}", style="bold"]; } e [label="e|{<left>prev|<right>next}", style="bold"]; f [label="f|{<left>prev|<right>next}", style="bold"]; subgraph cluster_tail{ style=filled; label="*tail" color="#fffaaa" null[shape=plaintext, height=0]; } head:right:e -> e:w[arrowhead=normal, arrowtail=normal]; e:left:w -> null:e[constraint=false]; e:right:e -> f:w[arrowhead=normal, arrowtail=normal]; d:right:e -> c:w[style="invis"]; f:right:e -> d:w[style="invis"]; c:left:w -> d:n[constraint=false]; d:left:w -> e:n[constraint=false]; } ``` ```cpp=39 /* End of input; merge together all the pending lists. */ list = pending; pending = pending->prev; for (;;) { struct list_head *next = pending->prev; if (!next) break; list = merge(priv, cmp, pending, list); pending = next; } /* The final merge, rebuilding prev links */ merge_final(priv, cmp, head, pending, list); } ``` 接著進到 `list_sort` 的下半段,所有的 node 都存在 pending lists 裡,因此現在要做的就是合併 pending lists ,在 `for` loop 中會將 pending lists 中的 sub-list 由 `pending` 往 `prev` 方向逐一合併,直到剩下兩個 sub-list 後跳出 `for` loop,以下以圖片說明 merge pending lists 前後的狀態。 **在程式碼第 42 行,list 的狀態:** ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; head [label="head|{<left>prev|<right>next}", style="bold"]; subgraph cluster_list{ style=filled; label="list" color="#fffaaa" a [label="a|{<left>prev|<right>next}", style="bold"]; } subgraph cluster_pending{ style=filled; label="pending" color="#fffaaa" b [label="b|{<left>prev|<right>next}", style="bold"]; } c [label="c|{<left>prev|<right>next}", style="bold"]; d [label="d|{<left>prev|<right>next}", style="bold"]; e [label="e|{<left>prev|<right>next}", style="bold"]; f [label="f|{<left>prev|<right>next}", style="bold"]; null[shape=plaintext]; head:right:e -> c:w[arrowhead=normal, arrowtail=normal]; c:left:w -> null:e[constraint=false]; e:right:e -> f:w[arrowhead=normal, arrowtail=normal]; d:right:e -> e:w[arrowhead=normal, arrowtail=normal]; c:right:e -> d:w[arrowhead=normal, arrowtail=normal]; f:right:e -> b:w[style="invis"]; b:right:e -> a:w[style="invis"]; a:left:w -> b:n[constraint=false]; b:left:w -> c:n[constraint=false]; } ``` **跳出迴圈後 list 的狀態:** ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; head [label="head|{<left>prev|<right>next}", style="bold"]; subgraph cluster_list{ style=filled; label="list" color="#fffaaa" a [label="a|{<left>prev|<right>next}", style="bold"]; } b [label="b|{<left>prev|<right>next}", style="bold"]; subgraph cluster_pending{ style=filled; label="pending" color="#fffaaa" c [label="c|{<left>prev|<right>next}", style="bold"]; } d [label="d|{<left>prev|<right>next}", style="bold"]; e [label="e|{<left>prev|<right>next}", style="bold"]; f [label="f|{<left>prev|<right>next}", style="bold"]; null[shape=plaintext]; head:right:e -> c:w[arrowhead=normal, arrowtail=normal]; c:left:w -> null:e[constraint=false]; e:right:e -> f:w[arrowhead=normal, arrowtail=normal]; d:right:e -> e:w[arrowhead=normal, arrowtail=normal]; c:right:e -> d:w[arrowhead=normal, arrowtail=normal]; f:right:e -> a:w[style="invis"]; a:right:e -> b:w[arrowhead=normal, arrowtail=normal]; } ``` 最後進到 `merge_final`,此函式會先把最後的 `a`, `b` 合併後,再把 `prev` 重建回來,值得一提的是中間有註解提到,如果 merge is highly unbalanced,例如一開始就是 sorted list,則有可能會需要更多次 iteration 來重建 `prev`,因為這裡利用 `count` 為 `u8` 型態會 wrap around 的特性,定期的執行 `cmp` , 則 client 可以根據需求在 `cmp` 內 call `cond_resched()` 來讓出 process 執行權限,避免 sorting 執行過久總是佔用執行權限。 目前還不太明白為什麼在 input is already sorted 的情況下會需要比較多次的 iteration,我所認知的 merge is highly unbalanced 是指 list `b` size 遠大於 list `a` size。 ```cpp __attribute__((nonnull(2, 3, 4, 5))) static void merge_final( void *priv, list_cmp_func_t cmp, struct list_head *head, struct list_head *a, struct list_head *b) { struct list_head *tail = head; u8 count = 0; for (;;) { /* if equal, take 'a' -- important for sort stability */ if (cmp(priv, a, b) <= 0) { tail->next = a; a->prev = tail; tail = a; a = a->next; if (!a) break; } else { tail->next = b; b->prev = tail; tail = b; b = b->next; if (!b) { b = a; break; } } } /* Finish linking remainder of list b on to tail */ tail->next = b; do { /* * If the merge is highly unbalanced (e.g. the input is * already sorted), this loop may run many iterations. * Continue callbacks to the client even though no * element comparison is needed, so the client's cmp() * routine can invoke cond_resched() periodically. */ if (unlikely(!++count)) cmp(priv, b, b); b->prev = tail; tail = b; b = b->next; } while (b); /* And the final links to make a circular doubly-linked list */ tail->next = head; head->prev = tail; } ``` 最後回頭看到提出此修改的作者 George Spelvin 寫的 [commit message](https://github.com/torvalds/linux/commit/b5c56e0cdd62979dd538e5363b06be5bdf735a09) > CONFIG_RETPOLINE has severely degraded indirect function call > performance, so it's worth putting some effort into reducing the number > of times cmp() is called. > > This patch avoids badly unbalanced merges on unlucky input sizes. It > slightly increases the code size, but saves an average of 0.2*n calls to > cmp(). Linux kernel 對 [CONFIG_RETPOLINE](https://cateee.net/lkddb/web-lkddb/RETPOLINE.html) 的說明如下 > Compile kernel with the retpoline compiler options to guard against kernel-to-user data leaks by avoiding speculative indirect branches. Requires a compiler with -mindirect-branch=thunk-extern support for full protection. The kernel may run slower. ### Retpoline [Retpoline](https://support.google.com/faqs/answer/7625886) 是由 Google's [Project Zero](https://googleprojectzero.blogspot.com/) team 所提出的方法,用於解決現代 CPU 會遇到的 [branch target injection](https://www.intel.com/content/www/us/en/developer/articles/technical/software-security-guidance/advisory-guidance/branch-target-injection.html) 漏洞。 這個漏洞造成的原因在於現代 CPU 的 [speculative execution](https://en.wikipedia.org/wiki/Speculative_execution) 功能,此功能會讓 CPU 事先預測指令的執行路徑並且先行計算,以節省 [branch](https://en.wikipedia.org/wiki/Branch_(computer_science)) instruction 如 `call`, `jmp`, `ret` 所帶來的開銷,然而此功能卻會在遇到 [indirect branch](https://en.wikipedia.org/wiki/Indirect_branch) 時產生漏洞。 所謂 indirect branch 是指在 branch 所指定的跳轉位址並非絕對位址,而是一個在執行時期才知道位址(如 register 或 pointer of memory address ),以下以 x86 instruction 舉例,表示需要跳轉到 address referenced by r1 register。 ```cpp asm("jmp %r1"); ``` 而 speculative execution 就會去預測這個 indirect branch 所指向的位址,並把預測的位址存在對應的 branch prediction table 中 ,例如 [branch predictor](https://en.wikipedia.org/wiki/Branch_predictor) 內的 branch target buffer (BTB)。 接著回頭來看 Branch target injection,引用 [More details about mitigations for the CPU Speculative Execution issue](https://security.googleblog.com/2018/01/more-details-about-mitigations-for-cpu_4.html) 的解釋: > This attack variant uses the ability of one process to influence the speculative execution behavior of code in another security context (i.e., guest/host mode, CPU ring, or process) running on the same physical CPU core. > Modern processors predict the destination for indirect jumps and calls that a program may take and start speculatively executing code at the predicted location. The tables used to drive prediction are shared between processes running on a physical CPU core, and it is possible for one process to pollute the branch prediction tables to influence the branch prediction of another process or kernel code. 如果能夠汙染 branch prediction table,就有機會讓 CPU 執行到錯誤的 address,導致漏洞發生。因此 Project Zero team 就提出了一個解法稱之為 retpoline (取自 "return” and “trampoline" 的結合),讓 speculative execution 無止盡的原地 "bounce",直到 CPU 真的執行到該 indirect branch instruction 為止。 [Retpoline: a software construct for preventing branch-target-injection](https://support.google.com/faqs/answer/7625886) 文章中以 x86 架構來舉例,`call` instruction 會去執行對應的 target 位址,然後在遇到 `ret` 後跳回到 `call` 的下一行繼續執行,而因為此行為通常用在 function call,其 prediction 命中率很高,因此 Intel 與 AMD 分別針對此設計 return stack buffer (RSB) 與 return address stack (RAS),用來儲存 predictor 預測的 return address。 > The key property here is that the hardware’s cache for the return target (the RSB) and the actual destination which is maintained on stack are distinct. Retpoline 這個解法的重點在於, 存在 hardware cache 上的 RSB 與 stack 上實際維護的 destination 是不同的,因此這裡就有機會在 stack pointer 上動手腳。針對 indirect branch of `jmp` insturction,使用了以下的 constuction 來展開: ```shell jmp *%r11 | call set_up_target; (1) | capture_spec: (4) | pause; | jmp capture_spec; | set_up_target: | mov %r11, (%rsp); (2) | ret; (3) ``` 左側為原始指令,行為是 jump 到 `%r11` register 所存的位址,右側為使用 Retpoline 方法 contruct 後展開的指令,以下為執行順序 (1). 這裡的 `set_up_target` 屬於 direct branch,在這裡 speculative execution 會產生 RSB 儲存下一行的 `capture_spec:` 。 (2). 將 `%r11` 存到 `%rsp` 上,直接修改了 stack pointer 的位址。 (3). 在這裡 speculative execution 與實際 CPU 執行 `ret` 時會真正產生分歧: * Speculative instruction 會跳轉到 RSB 內存的位址,也就是 (4) 的位置執行,並且在這裡無窮 pause and loop。 * 而當 CPU 實際執行到 `ret` 時會發生是 mis-prediction ,因此會遺棄掉 speculative execution 的結果,去執行真正存在 `%rsp` 內的位址,也就是 `%r11`。 而對於 indirect branch of `call` instruction 的 construction 展開如下, ```diff + call *%r11 | jmp set_up_return; + | inner_indirect_branch: | call set_up_target; | capture_spec: | pause; | jmp capture_spec; | set_up_target: | mov %r11, (%rsp); | ret; + | set_up_return: + | call inner_indirect_branch; (1) ``` 相對於 indirect branch of `jmp` 的解法,這裡用綠色來標註兩者的不同,可以看到從開頭就 jump 到 `set_up_return` ,接著就會執行到 outer `call` (1),此時 RSB 與 on-stack value 的值會一樣且會是實際執行時要 return 的值,接著進到 inner `call` 的部份就跟前述的 indirect branch of `jmp` 行為一樣,這種 construction 的好處是只會跟 `jmp` 一樣產生一次的 mis-prediction。 Linux kernel 針對此漏洞所修改的程式碼在 [arch/x86/lib/retpoline.S](https://elixir.bootlin.com/linux/v5.17-rc6/source/arch/x86/lib/retpoline.S#L15) 底下,概念上是一樣的,只是中間多了一個 `lfence` 指令,根據 [Intel® 64 and IA-32 Architectures Developer's Manual: Vol. 1](https://www.intel.com.tw/content/www/tw/zh/architecture-and-technology/64-ia-32-architectures-software-developer-vol-1-manual.html) >The LFENCE instruction establishes a memory fence for loads. It guarantees ordering between two loads and prevents speculative loads from passing the load fence (that is, no speculative loads are allowed until all loads specified before the load fence have been carried out). 可以知道在執行到 `lfence` 時,會禁止 speculative load 繼續執行,直到 CPU 真的 load 完 fence 之前的指令為止。 參考資料: * [Retpoline: a software construct for preventing branch-target-injection](https://support.google.com/faqs/answer/7625886) * [More details about mitigations for the CPU Speculative Execution issue](https://security.googleblog.com/2018/01/more-details-about-mitigations-for-cpu_4.html) * [What is a retpoline and how does it work?](https://stackoverflow.com/a/48099456/11425048) :::info 註1: 針對這個 Retpoline 我還有很多不了解之處,在此就不過多"舉燭" :dizzy_face: ,面對不熟悉的議題目前沒有想法要怎麼做實驗 :exploding_head: ::: :::info 註2: 查了資料發現這個漏洞似乎在當年造成很大討論,而 Intel 針對此漏洞提出的修補 [Indirect Branch Restricted Speculation(IBRS)](https://www.intel.com/content/www/us/en/developer/articles/technical/software-security-guidance/technical-documentation/indirect-branch-restricted-speculation.html) ,因為沒有實際揭露造成效能多大影響,但許多單位包含 Linux kernel 開發者們測試後都發現造成很大的效能影響,因此 Linus Torvalds 針對此 patch 回覆: [the patches are COMPLETE AND UTTER GARBAGE.](https://lkml.iu.edu/hypermail/linux/kernel/1801.2/04628.html) :satisfied: ::: ### Compare 次數 由於上述 Retpoline 大幅影響 CPU speculative execution 的效能,因此減少 compare 次數就顯的重要許多,要進可能減少執行 `cmp` 內 `if else` 的 branch instruction。 > Unfortunately, there's not a lot of low-hanging fruit in a merge sort; it already performs only n*log2(n) - K*n + O(1) compares. The leading coefficient is already at the theoretical limit (log2(n!) corresponds to K=1.4427), so we're fighting over the linear term, and the best mergesort can do is K=1.2645, achieved when n is a power of 2. > >The differences between mergesort variants appear when n is *not* a power of 2; K is a function of the fractional part of log2(n). Top-down mergesort does best of all, achieving a minimum K=1.2408, and an average (over all sizes) K=1.248. However, that requires knowing the number of entries to be sorted ahead of time, and making a full pass over the input to count it conflicts with a second performance goal, which is cache blocking. 這段描述我參考了 [kdnvt](https://hackmd.io/WuqBNTdFQnuNxkFtR6FIWg?view#comparison-%E6%AC%A1%E6%95%B8) 同學的共筆才稍微理解,根據 [Bottom-up Mergesort: A Detailed Analysis](https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.6.5260) 論文的推導可以得知,以下函式為輸入 sequence 最差情況下比較次數的函數 $C(N) = NlgN - NK(lgN) + O(1)$ 比較次數 $C(N)$ 愈小愈好,而當 $K=1.4427$ 時是 $C(N)$ 的理論最小值,接著 George Spelvin 繼續提到,`list_sort` 若以 top-down mergesort 實作可實現 minimum $K=1.2408$ ,但是此方法必須先知道 list size,那就必須先走訪過整個 list,顯然的不是 cache friendly 的作法。 >The simple eager merge pattern causes bad performance when n is just over a power of 2. If n=1028, the final merge is between 1024- and 4-element lists, which is wasteful of comparisons. (This is actually worse on average than n=1025, because a 1204:1 merge will, on average, end after 512 compares, while 1024:4 will walk 4/5 of the list.) 前面已經討論過 bottom-up mergesort 更加 cache friendly,然而這種 full eager mergesort ,在遇到例如 $N=1028$ 時,最後就會需要合併 size 比為 $1024:4$ 的兩個 list,平均需要比較 $4/5$ 個 list 才能完成合併,相較於 `list_sort` 的實作,2 個 list 最差的 size 比大約為 $2:1$。 我繼續模仿 [kdnvt](https://hackmd.io/WuqBNTdFQnuNxkFtR6FIWg?view#comparison-%E6%AC%A1%E6%95%B8) 同學做的實驗,比較 `list_sort` 與我自己實作的 merge sort 的 $K$ 值,同樣得出類似的結果,在 $N=1028$ 時,我所實作的 merge sort $K$ 值在 $0.32$ 到 $0.93$ 之間跳動,而 `list_sort` 的 $K$ 則很穩定的在 $1.25$ 到 $1.29$ 左右。 而使用隨機順序產生長度由 $N$ 到 $2N−1$ queue 所測出的 $K$ 值取平均的結果如下 ```shell cmd> average_k sort 512 1.028458 l = NULL cmd> average_k sort 1024 1.016955 l = NULL cmd> average_k sort 2048 1.013210 l = NULL cmd> average_k kernel_sort 512 1.209122 l = NULL cmd> average_k kernel_sort 1024 1.208120 l = NULL cmd> average_k kernel_sort 2048 1.208331 l = NULL ``` 同樣可以看到兩個實作差距約在 0.2 左右,與 George Spelvin 在 commit message 中提到的結果一致。