# 2024q1 Homework2 (quiz1+2) contributed by [< `HenryChaing` >](https://github.com/HenryChaing/quiz1-2) ## [第一週測驗題](https://hackmd.io/@sysprog/linux2024-quiz1) :::warning 不用列出參考答案,你要解讀和分析的對象是程式碼,而非列於 Google 表單的選項 ::: ## 測驗一 ### `list_tail` ```c node_t *list_tail(node_t **left) { while ((*left) && (*left)->next) left = &((*left)->next); return *left; } ``` 解釋 : 這個是 `list_tail` 函式,我們必須將指標的指標 `left` ,移到該鏈結串列串列的尾端,才可進行回傳。 ```graphviz digraph graphname{ node[shape=box] left[shape=plaintext,fontcolor=blue] next[label="(addr of A's next)",shape=plaintext,fontcolor=red] now[label="(addr of A)",shape=plaintext,fontcolor=red] { rankdir = LR A[label=A] B[label=B] C[label=C] NULL1[label=NULL,shape=plaintext] } { rank="same"; left->now->A } { rank="same"; next->B } rankdir=LR A->B->C->NULL1 } ``` ```graphviz digraph graphname{ node[shape=box] left[shape=plaintext,fontcolor=blue] next[label="(addr of A's next)",shape=plaintext,fontcolor=red] now[label="(addr of A)",shape=plaintext,fontcolor=red] { rankdir = LR A[label=A] B[label=B] C[label=C] NULL1[label=NULL,shape=plaintext] } { rank="same"; now->A } { rank="same"; left->next->B } rankdir=LR A->B->C->NULL1 } ``` ### `list_length` ```c int list_length(node_t **left) { int n = 0; while (*left) { ++n; left = &((*left)->next); } return n; } ``` 解釋 : 原理及示意圖皆與 `list_tail` 相同, `left` 每間接指到一個節點,計數就會增加一。 ### `quick_sort` ```c void quick_sort(node_t **list) { int n = list_length(list); int value; int i = 0; int max_level = 2 * n; node_t *begin[max_level], *end[max_level]; node_t *result = NULL, *left = NULL, *right = NULL; begin[0] = *list; end[0] = list_tail(list); while (i >= 0) { node_t *L = begin[i], *R = end[i]; if (L != R) { node_t *pivot = L; value = pivot->value; node_t *p = pivot->next; pivot->next = NULL; while (p) { node_t *n = p; p = p->next; list_add(n->value > value ? &right : &left, n); } begin[i] = left; end[i] = &list_tail(left); begin[i + 1] = pivot; end[i + 1] = pivot; begin[i + 2] = right; end[i + 2] = &list_tail(right); left = right = NULL; i += 2; } else { if (L) list_add(&result, L); i--; } } *list = result; } ``` **內層迴圈** : 在這個迴圈當中,會將 `begin[i] +1` 到 `end[i]` 的節點,與 `pivot` 節點作比較,若小於 `pivot` 節點值會加入 `left` 當中,反之則加入 `right` 。這題的原理與前兩題一致,就是要讓 `p` 走訪`begin[i] +1` 到 `end[i]` 的所有節點。 **外層迴圈** : 這裡是利用 `divide and conquer` 的概念,將鏈結串列串列分成 `1. 小於等於 pivot ` , `2. pivot ` 以及 `3. 大於 pivot ` 的三個新的串列,分別儲存於 `begin[i]`,`begin[i+1]`,`begin[i+2]`當中。 而 `end` 則是會紀錄各個 `begin` 的串列尾端,為了當作判別子串列已經排序完成時使用。 ### 避免快速排序法的 worst case 首先快速排序法的 worst case 有很多種,這邊舉例的是研究所常考的 worst case , 也就是 pivot 值,剛好為串列的 最大值或最小值,導致時間函數變成 ${T(n) = T(n-1) + O(n)}$ ,而得到時間複雜度 $O(n^2)$ 。 ``` pivot 為陣列第一個元素 arr = [5,1,2,3,4] pass 1: arr= [1,2,3,4],5 pass 2: arr= 1,[2,3,4],5 pass 3: arr= 1,2,[3,4],5 pass 4: arr= 1,2,3,[4],5 done ``` 後來我採用 median-of-three 試著避免 worst case , 這項方法會採取陣列第一個以及最後一個元素還有陣列中間的元素,並算出三者之間的中位數,而被列為中位數者就會擔當此回合的 pivot 值,藉以避免 worst case 產生。 ### 加入 median-of-three 在快速排序法當中。 > [commit 733b4c2](https://github.com/HenryChaing/quiz1-2/commit/733b4c2256c6d4f89bee5e692694204682856395) ```c= void quick_sort(node_t **list) { int n = list_length(list); int value; int i = 0; int max_level = 2 * n; node_t *begin[max_level], *end[max_level]; node_t *result = NULL, *left = NULL, *right = NULL; begin[0] = *list; end[0] = list_tail(list); while (i >= 0) { node_t *L = begin[i], *R = end[i]; if (L != R) { node_t *pivot = L; value = pivot->value; node_t *p = pivot->next; pivot->next = NULL; while (p) { node_t *n = p; p = p->next; list_add(n->value > value ? &right : &left, n); } begin[i] = left; end[i] = &list_tail(left); begin[i + 1] = pivot; end[i + 1] = pivot; begin[i + 2] = right; end[i + 2] = &list_tail(right); left = right = NULL; i += 2; } else { if (L) list_add(&result, L); i--; } } *list = result; } ``` `line 16` 會得到串列的中間節點 `line 17` 會得到三個節點中的中位數節點 `line 18` 會把中位數節點以及第一個節點的 `value` 值互換 :::warning TODO: 撰寫成更精簡的程式碼。 ::: ### 實際驗證 > [commit 8baec3b](https://github.com/HenryChaing/quiz1-2/commit/8baec3be1f2aad470e2f291b55040ca96869330e) ```diff + #include <time.h> + clock_t t; + t = clock(); quick_sort(&list); + t = clock() - t; + printf("%f seconds.\n",((float)t) / CLOCKS_PER_SEC); ``` 於是我開始對上述的有及沒有採用 `median-of-three` 的快速排序進行了測試,我利用 `time.h` 中的計時方法算出運行快速排序需要花上多少時間。 ```shell 原版快速排序 $ ./quicksort 0.050543 seconds. $ ./quicksort 0.048564 seconds. $ ./quicksort 0.048454 seconds. 採用 median-of-three 結果 $ ./quicksort 0.070798 seconds. $ ./quicksort 0.057685 seconds. $ ./quicksort 0.064300 seconds. ``` 最後結果發現,採用改進的方法並未讓快速排序更有效率,反而增進了近三成的時間成本。 以下是我的推測,因為資料量過大共十萬筆資料。因此極值擔任 `pivot` 機率反而很低,不易出現時間函式為 ${T(n) = T(n-1) + T(1) + O(n)}$ 的情形,反而更有機率為 ${T(n) = T(n/t) + T(1 - n/t) + O(n)}$ ,因此時間複雜度依然為 ${O(n\log{n})}$ 。 這時挑選 `median-of-three` 的過程就會變成額外的負擔,造成執行時間不如原版出色甚至更差。 :::warning TODO: 翻閱教材書,彙整相關分析並提出改進方案。 ::: ### 將 median of three 改進後重新測試 > 參考: [快慢指標](https://hackmd.io/@sysprog/ry8NwAMvT) 及 [yehsudo](https://hackmd.io/@yehsudo/linux2024-homework2) 閱讀了 yehsudo,發現到了快慢指標這項技巧可以讓我們更快找到串列的中間節點,因此也就能更有效提昇 median of three 的效率。 在看完快慢指標的筆記後,發現快慢指標的優勢在於快取記憶體不容易發生分頁錯誤。因為兩個指標會指到重複的節點,因此也讓這些節點產生了時間局部性,相比單一指標確認過長度再移動到中間節點的方式,前者的時間局部性可以有效降低分頁錯誤的機率。 ```graphviz digraph "Circular Linked List" { rankdir = "LR"; node1[shape = circle, label = "1"]; node2[shape = circle, label = "2"]; node3[shape = circle, label = "3"]; node4[shape = circle, label = "4"]; node5[shape = circle, label = "5"]; node0__[shape = none, label = "fast", fontcolor = "blue"]; node1__[shape = circle, label = "1", style = invis]; node2__[shape = circle, label = "2", style = invis]; node0_[shape = none, label = "slow", fontcolor = "red"]; node1_[shape = circle, label = "1", style = invis]; node0_ -> node1_; node0__ -> node1__ -> node2__; node1 -> node2; node2 -> node1; node2 -> node3; node3 -> node2; node3 -> node4; node4 -> node3; node4 -> node5; node5 -> node4; } ``` > [commit 8c85227](https://github.com/HenryChaing/quiz1-2/commit/8c852270bd351135df825a9d28a3c115af03d45d) 這是我用快慢指標重新實作的尋找中間節點函式。 ```c /*lead to mid node*/ node_t *list_mid(node_t **left) { node_t *slow,*fast; slow = fast = &left; while (fast && fast->next) { fast = fast->next->next; slow = slow->next; } return slow; } ``` ```shell 原版快速排序 $ ./quicksort 0.070792 seconds.​ (可能為最差狀況) $ ./quicksort 0.050522 seconds. $ ./quicksort 0.054528 seconds. 改進後 median-of-three 結果 $ ./quicksort 0.044691 seconds. $ ./quicksort 0.049003 seconds. $ ./quicksort 0.050412 seconds. ``` 在認識快慢指標後,認知道時間複雜度相同的方法,可以利用快取特性讓程式執行時間更短。這次 median of three 讓效能有了提昇。 ## 測驗二 ::: danger 不要急著逐行解析程式碼,你應該探討 Timsort 的設計想法和實作議題。 ::: **Timsort 使用情境**: ```graphviz digraph linkedlist { rankdir=LR; subgraph{ node [shape=record]; b [label="{ <prev> |<data> 6 | <next> }"]; c [label="{ <prev> |<data> 5 | <next> }"]; d [label="{ <prev> |<data> 7 | <next> }"]; e [label="{ <prev> |<data> 8 | <next> }"]; b:next:c -> c:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:next:c -> d:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:next:c -> e:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } subgraph{ node [shape=record]; f [label="{ <prev> |<data> head | <next> }", width=1.2] g [label="{ <prev> |<data> 0 | <next> }"]; h [label="{ <prev> |<data> 1 | <next> }"]; i [label="{ <prev> |<data> 2 | <next> }"]; j [label="{ <prev> |<data> 3 | <next> }"]; f:next:c -> g:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=1.2]; g:next:c -> h:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:next:c -> i:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; i:next:c -> j:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } j:next:c -> b:prev:c } ``` Timsort 的使用情境是,當有一組快要完成排序的串列,則我們可以將它分成數個已經排序完成的 run ,接著在進行合併,因此我們可以將上述案例分成以下兩個大小為四的 run 。 ```graphviz digraph linkedlist { rankdir=LR; NULL[shape = plaintext, label = "NULL", group = list] subgraph{ node [shape=record]; b [label="{ <prev> |<data> 6 | <next> }"]; c [label="{ <prev> |<data> 5 | <next> }"]; d [label="{ <prev> |<data> 7 | <next> }"]; e [label="{ <prev> |<data> 8 | <next> }"]; b:next:c -> c:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:next:c -> d:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:next:c -> e:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } subgraph{ node [shape=record]; f [label="{ <prev> |<data> head | <next> }", width=1.2] g [label="{ <prev> |<data> 0 | <next> }"]; h [label="{ <prev> |<data> 1 | <next> }"]; i [label="{ <prev> |<data> 2 | <next> }"]; j [label="{ <prev> |<data> 3 | <next> }"]; g:next:c -> h:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:next:c -> i:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; i:next:c -> j:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } f:next:c -> NULL; } ``` 以上是分割完後的預期結果,實際執行我們會預先決定 run 的大小,因為合併時希望 run 的個數為 2 的冪,這樣合併成本才越小。而 run 的大小決定方式,會將串列長度 $N$ 重複執行除以二的動作,每進行除法一次相當於進行一次分割。令分割次數為 $S$ ,而 run 的個數也為 $2^{S}$ , run 的長度也就為 $\dfrac{N}{2^S}$ ,這樣可以保證合併時成本最小。 至於 run 在上述確立完長度後,會將目前串列尚未列入 run 的節點取前 $\dfrac{N}{2^S}$ 個節點,並且將其調換成單調遞增的形式,而在確立每個 run 皆為單調遞增後,就可以進入最後的合併階段。 調整為單調遞增: ```graphviz digraph linkedlist { rankdir=LR; NULL[shape = plaintext, label = "NULL", group = list] subgraph{ node [shape=record]; b [label="{ <prev> |<data> 5 | <next> }"]; c [label="{ <prev> |<data> 6 | <next> }"]; d [label="{ <prev> |<data> 7 | <next> }"]; e [label="{ <prev> |<data> 8 | <next> }"]; b:next:c -> c:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:next:c -> d:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:next:c -> e:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } subgraph{ node [shape=record]; f [label="{ <prev> |<data> head | <next> }", width=1.2] g [label="{ <prev> |<data> 0 | <next> }"]; h [label="{ <prev> |<data> 1 | <next> }"]; i [label="{ <prev> |<data> 2 | <next> }"]; j [label="{ <prev> |<data> 3 | <next> }"]; g:next:c -> h:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:next:c -> i:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; i:next:c -> j:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } f:next:c -> NULL; } ``` 合併相鄰的 run: ```graphviz digraph linkedlist { rankdir=LR; subgraph{ node [shape=record]; b [label="{ <prev> |<data> 5 | <next> }"]; c [label="{ <prev> |<data> 6 | <next> }"]; d [label="{ <prev> |<data> 7 | <next> }"]; e [label="{ <prev> |<data> 8 | <next> }"]; b:next:c -> c:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; c:next:c -> d:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; d:next:c -> e:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } subgraph{ node [shape=record]; f [label="{ <prev> |<data> head | <next> }", width=1.2] g [label="{ <prev> |<data> 0 | <next> }"]; h [label="{ <prev> |<data> 1 | <next> }"]; i [label="{ <prev> |<data> 2 | <next> }"]; j [label="{ <prev> |<data> 3 | <next> }"]; f:next:c -> g:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=1.2]; g:next:c -> h:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; h:next:c -> i:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; i:next:c -> j:prev [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false]; } j:next:c -> b:prev:c } ``` **設計思維**: 我認為 timsort 目的是要減少比較次數,因為資料若建立在已經快要完成排序的情境下,我們只須逐一確認相鄰節點間值的大小,而不須像傳統合併排序每個 run 之間還須額外比較,並且合併排序的 run 也並不保證為 2 的冪個(以 bottom-up 角度)。因此 timsort 我認為在快要完成排序的串列中屬於合併排序的改進。 ### main ```c= int main(void) { struct list_head sample_head, warmdata_head, testdata_head; int count; int nums = SAMPLES; /* Assume ASLR */ srand((uintptr_t) &main); test_t tests[] = { {.name = "timesort", .impl = timsort}, {NULL, NULL}, }; test_t *test = tests; INIT_LIST_HEAD(&sample_head); element_t *samples = malloc(sizeof(*samples) * SAMPLES); element_t *warmdata = malloc(sizeof(*warmdata) * SAMPLES); element_t *testdata = malloc(sizeof(*testdata) * SAMPLES); create_sample(&sample_head, samples, nums); while (test->impl) { printf("==== Testing %s ====\n", test->name); /* Warm up */ INIT_LIST_HEAD(&warmdata_head); INIT_LIST_HEAD(&testdata_head); copy_list(&sample_head, &testdata_head, testdata); copy_list(&sample_head, &warmdata_head, warmdata); test->impl(&count, &warmdata_head, compare); /* Test */ count = 0; test->impl(&count, &testdata_head, compare); printf(" Comparisons: %d\n", count); printf(" List is %s\n", check_list(&testdata_head, nums) ? "sorted" : "not sorted"); test++; } return 0; } ``` 首先我們先看到測試函式,可以看到測試資料被分為兩類,分別是 `line 31` 使用到的暖身資料集,以及 `line 35` 使用到的測試資料集,它們的資料皆相同,並且都是來自 `line 22` 產生的樣本資料集。 這個函式中值得說的是,使用 `timsort` 的方法是利用函式指標的方式運行,並且 `line 18-20` 行新增陣列的方式是利用自身的數量倍去新增。 ### `find_run` ```c= static struct pair find_run(void *priv, struct list_head *list, list_cmp_func_t cmp) { size_t len = 1; struct list_head *next = list->next, *head = list; struct pair result; if (!next) { result.head = head, result.next = next; return result; } if (cmp(priv, list, next) > 0) { /* decending run, also reverse the list */ struct list_head *prev = NULL; do { len++; list->next = prev; prev = list; list = next; next = list->next; head = list; } while (next && cmp(priv, list, next) > 0); list->next = prev; } else { do { len++; list = next; next = list->next; } while (next && cmp(priv, list, next) <= 0); list->next = NULL; } head->prev = NULL; head->next->prev = (struct list_head *) len; result.head = head, result.next = next; return result; } ``` 這個是 `timsort` 當中非常重要的 `run` 概念,因為 `timsort` 就是要透由合併 `run` 來完成排序。而 `run` 又有一個特性,就是必須是升冪排序的串列,因此我們可以看到 `find_run` 當中 `line 14` 的條件式分成了兩個案例,(1)串列的第一個元素 ### `Timsort` ```c void timsort(void *priv, struct list_head *head, list_cmp_func_t cmp) { stk_size = 0; struct list_head *list = head->next, *tp = NULL; if (head == head->prev) return; /* Convert to a null-terminated singly-linked list. */ head->prev->next = NULL; do { /* Find next run */ struct pair result = find_run(priv, list, cmp); result.head->prev = tp; tp = result.head; list = result.next; stk_size++; tp = merge_collapse(priv, cmp, tp); } while (list); /* End of input; merge together all the runs. */ tp = merge_force_collapse(priv, cmp, tp); /* The final merge; rebuild prev links */ struct list_head *stk0 = tp, *stk1 = stk0->prev; while (stk1 && stk1->prev) stk0 = stk0->prev, stk1 = stk1->prev; if (stk_size <= FFFF) { build_prev_link(head, head, stk0); return; } merge_final(priv, cmp, head, stk1, stk0); } ``` ### 將 Timsort 加入 lab0-c > [commit 360ac91](https://github.com/HenryChaing/lab0-c/commit/360ac91349ac1b152bb2a0ce579e7c11ae85e7ad) 在這次 commit 之後,可以在 `./qtest` 執行 `sort` 以及 `timsort` 兩項命令,分別是 Linux 核心實作的合併排序,以及測驗二中的 `timsort` 。 測試函式如下,比較次數會在每次執行 `element_t` 兩個節點間的比較時增加計數。而計時則是計算執行完排序函式時總過經過的時間。至於測試資料,會因為要測試 `timsort` 之最佳及最差狀況而有所不同,以下提供的是測試 `timsort` 最差狀況時的程式碼。 - [ ] 效能測試函式 ```c //效能測試函式 void q_timsort(struct list_head *head, bool descend) { int count = 0; clock_t begin = clock(); timsort(&count, head, compare); printf(" Elapsed time: %ld\n", clock() - begin); printf(" Comparisons: %d\n", count); if (descend) { q_reverse(head); } } ``` - [ ] 測試資料提供函式 ```c void q_perf(void (*sort)(struct list_head *head, bool descend), struct list_head *head, bool descend) { char s[100]; for (int i = 0; i < 10000; i++) { char randstr_buf[MAX_RANDSTR_LEN]; fill_rand_string(randstr_buf, sizeof(randstr_buf)); q_insert_head(head, randstr_buf); } sort(head, descend); q_swap(head); sort(head, descend); for (int i = 0; i < 10000; i++) { q_remove_head(head, s, 100); } } ``` 測驗函式設計方法如下,由於 `timsort` 是將已經排序過的 run 依序做合併,因此若要展現 `timsort` 的最佳狀況即是把已經快要排序完成的串列進行排序,並且也以相同的測試資料(快要排序完成的串列),對原本的合併排序進行比較。至於測試資料的獲得方式,可以先將資料先進行排序,並插入值極小的資料在串列的任意位置,這樣就可以得到快要排序完成的串列。 以下是最後結果,對快要完成排序的串列進行排序 (timsort 及 mergesort) ,可以看到不論是執行時間或是比較次數,皆是 `timsort` 取得最佳結果。 - [ ] timsort ```shell timsort: cmd> new l = [] cmd> timsort Elapsed time: 420 Comparisons: 15001 l = [] ``` - [ ] mergesort ```shell cmd> new l = [] cmd> sort Elapsed time: 1219 Comparisons: 73280 l = [] ``` 接著是 `timsort` 的最壞狀況,也就是找到一個 run 需要花最久的時間,我的預想是連續單調遞增(減)的資料長度都接近於一。至於測試資料的實作方式,我先將資料作排序之後再進行 `q_swap()` ,這樣就可以得到這次測試的理想資料。 以下是實際測試結果, `timsort` 在執行時間以及比較次數都比合併排序要多,因為先經過排序在經過調換的串列資料不利於 `timsort` 進行整理。 - [ ] timsort ```shell cmd> new l = [] cmd> timsort Elapsed time: 4534 Comparisons: 76711 l = [] ``` - [ ] mergesort ```shell cmd> new l = [] cmd> sort Elapsed time: 1754 Comparisons: 68272 l = [] ``` ### timsort 的 cache 使用分析 > 參考 [Linux 核心專題: 改進 lib/list_sort.c](https://hackmd.io/@sysprog/Hy5hmaKBh) 在參考文章中,有提到 `timsort` 相比合併排序,可以有效降低 `cache miss ratio` 。我採用了相同的快取記憶體分析工具 `cachegrind`,分析快取記憶體的讀寫狀況,並且利用圖像化工具 `kcachegrind` 以數據及色塊有效分析。 **最差狀況(worst case)**: 首先 `kcachegrind` 可以按照所選擇的函式來分析這個函式執行時的快取記憶體使用狀況,至於分析的項目有 (1)L1 cache Instruction Read (2) L1 cache Data Read (miss) (3) L1 cache Data Write (miss) 等快取相關操作結果。 timsort: ![timsort(worst)](https://hackmd.io/_uploads/SkPBA636p.png) merge sort: ![kernel_sort(worst)](https://hackmd.io/_uploads/SkFDAphap.png) 我們這邊選擇關注的是 L1 快取的分頁錯誤,其中量值要看的是 `incl.`(inclusive) 這個項目,這代表這個函式在執行時(包括執行到其他函式)佔這個項目的百分比。例如 L1 快取的分頁錯誤的這個項目,在 `timsort` 發生的比率是 `2%` ,而在合併排序時發生的比率為 `5%` 。 **最佳狀況(best case)**: timsort: ![timsort(best)](https://hackmd.io/_uploads/SyZVX1TpT.png) merge sort: ![kernel_sort(best)](https://hackmd.io/_uploads/r1iBXJ6ap.png) 在著重觀察 L1 快取的分頁錯誤後,我們可以發現 `timsort` 可以助於降低快取分頁錯誤率。而且若資料呈現對 `timsort` 有利的排序,分頁錯誤在排序時發生的比例又更低一些。 :::warning TODO: 採取 [ANOVA 分析](https://en.wikipedia.org/wiki/Analysis_of_variance) ::: --- ## [第二週測驗題](https://hackmd.io/@sysprog/linux2024-quiz2) ## 測驗一 ### `hlist` ```c struct hlist_node { struct hlist_node *next, **pprev; }; struct hlist_head { struct hlist_node *first; }; ``` `hlist` ,這個是 Linux 所設計的特殊雙向鏈結串列串列,與一般雙向鏈結串列串列不同的點在於, `pprev` 是指向上一個節點的 `next` 指標,而不是節點本身。優點是刪除頭節點時無須擔心還需要重新指派的問題,以下是其示意圖: ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] list_head[label = "<m>list_head | <n>first"] node_1[label = "<m>hlist_node_1 | {<p>pprev | <n>next}", group = list]; NULL[shape = plaintext, label = "NULL", group = list] {rank = "min" list_head} list_head -> node_1[ weight = 10, style = invis ] list_head:n -> node_1:m; node_1:p -> list_head:n; node_1:n -> NULL; } ``` ### `hlist_add_head` ```c static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h) { if (h->first) h->first->pprev = &n->next; n->next = h->first; n->pprev = &h->first; h->first = n; } ``` 這個是 `hlist` 的插入,會將新節點置於頭節點旁,因此四行程式碼皆是在處理鏈結的重新配置,其中條件式的部份是在處理空串列插入的例外情形,如下圖所示。 ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] list_head[label = "<m>list_head | <n>first"] node_1[label = "<m>hlist_node_1 | {<p>pprev | <n>next}", group = list]; NULL[shape = plaintext, label = "NULL", group = list] {rank = "min" list_head} list_head -> node_1[ weight = 10, style = invis ] list_head:n -> node_1:m[color = red]; node_1:p -> list_head:n[color = red]; node_1:n -> NULL[color = red]; } ``` ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] list_head[label = "<m>list_head | <n>first"] node_1[label = "<m>hlist_node_1 | {<p>pprev | <n>next}", group = list]; node_2[label = "<m>hlist_node_2 | {<p>pprev | <n>next}", group = list]; node_3[label = "<m>hlist_node_3 | {<p>pprev | <n>next}", group = list]; NULL[shape = plaintext, label = "NULL", group = list] {rank = "min" list_head} list_head -> node_1 -> node_2 -> node_3[ weight = 10, style = invis ] list_head:n -> node_1:m[color = red]; node_1:p -> list_head:n[color = red]; node_1:n -> node_2:m[color = red]; node_2:p -> node_1:n[color = red]; node_2:n -> node_3:m; node_3:p -> node_2:n; node_3:n -> NULL; } ``` #### Treenode , order_node ```c struct TreeNode { int val; struct TreeNode *left, *right; }; struct order_node { struct hlist_node node; int val; int idx; }; ``` `TreeNode` 是用於最後實際由 `dfs` 函式建樹的鏈結節點,`left` 及 `right` 分別接上左右子樹, `value` 則是該節點的內容。 再來是 `order_node` , `order_node` 就是上述 `hlist` 的節點。 ```graphviz digraph BST { node [fontname="Arial" ]; l1 [ label = "value" ]; l32 [shape=plaintext label = "left" ]; l33 [shape=plaintext label = "right"]; l1 -> { l32 l33 }; } ``` #### node_add ```c static inline void node_add(int val, int idx, int size, struct hlist_head *heads) { struct order_node *on = malloc(sizeof(*on)); on->val = val; on->idx = idx; int hash = (val < 0 ? -val : val) % size; hlist_add_head(&on->node, &heads[hash]); } ``` 這裡的函式雖然寫作 `node_add` ,但實質上是把節點雜湊到雜湊表當中的函式。在 linux 的雜湊表當中,用來處理碰撞機制的方式是採用鏈結串列串列,而鏈結串列串列的結構又如上圖所示,綜合以上描述,以下是它的示意圖: ```graphviz digraph G { rankdir=LR; node[shape=record]; map [label="hlist_head heads[] |<ht0> |<ht1> |<ht2> |<ht3> |<ht4> |<ht5> |<ht7> |<ht8> "]; node[shape=none] null1 [label=NULL] null2 [label=NULL] subgraph cluster_1 { style=filled; color=lightgrey; node [shape=record]; hn1 [label="hlist_node | {<prev>pprev | <next>next}"]; label="order_node 1" } subgraph cluster_2 { style=filled; color=lightgrey; node [shape=record]; hn2 [label="hlist_node | {<prev>pprev | <next>next} "]; label="order_node 2" } subgraph cluster_3 { style=filled; color=lightgrey; node [shape=record]; hn3 [label="hlist_node | {<prev>pprev | <next>next}"]; label="order_node 3" } map:ht1 -> hn1 hn1:next -> hn2 hn2:next -> null1 hn2:prev:s -> hn1:next:s map:ht5 -> hn3 hn3:next -> null2 hn1:prev:s -> map:ht1 hn3:prev:s -> map:ht5 } ``` 因此本函式將 `order_node` 雜湊到對應的鏈結串列串列當中,再透過 `hlist_add_head` 函式將此節點插入到頭節點的位置。 ### `find` ```c static int find(int num, int size, const struct hlist_head *heads) { struct hlist_node *p; int hash = (num < 0 ? -num : num) % size; hlist_for_each (p, &heads[hash]) { struct order_node *on = container_of(p, struct order_node, node); if (num == on->val) return on->idx; } return -1; } ``` 因為有了雜湊這操作,因此這次要從雜湊表當中找出對應的鏈結串列串列以及節點。首先我們會透過雜湊值找到對應的鏈結串列串列,再來透過巨集 `hlist_for_each` 中的迭代,找出對應值得節點。 其中巨集 `container_of` 是用結構變數(`struct`)中的成員資料位址,來反推出結構變數的起始記憶體位址,藉以透過結構變數去作其他操作。 ### `dfs` ```c static struct TreeNode *dfs(int pre_low, int pre_high, int in_low, int in_high) { if (in_low > in_high || pre_low > pre_high) return NULL; struct TreeNode *tn = malloc(sizeof(*tn)); tn->val = preorder[pre_low]; int idx = find(preorder[pre_low], node_count, in_heads); tn->left = dfs(pre_low + 1, pre_low + (idx - in_low), in_low, idx - 1); tn->right = dfs(pre_high - (in_high - idx - 1), pre_high, idx + 1, in_high); return tn; } ``` 最終來到了建樹的部份,但在介紹程式碼之前,要先回顧資料結構所教的 前序+中序 的建樹方式。以下是他的演算法 (已知 前序走訪陣列 及 中序走訪陣列): 1. 選擇前序陣列的第一個元素,作為樹的根節點,此元素值為 $v$ 。 2. 將中序陣列分成 在 $v$ 之前 與 在 $v$ 之後 的兩個陣列 $left$ 及 $right$, 3. 其中 $left$ 及 $right$ 分別擁有左子樹及右子樹的元素。 4. 同理將前序陣列分成 $left$ 及 $right$ ,前序陣列將分割為 { $v$, $left$, $right$ } 的形式。 5. 將前序及中序中的 $left$ 視為新陣列重複上述步驟,直到陣列長度為一, $right$ 同理。 :::danger 避免急著分析程式碼,應揣摩程式碼背後的流程,並在張貼程式碼時就指出缺失。 ::: 其中 `dfs` 就是採用這個演算法,差別是它不新增串列,而是使用陣列上下界的索引值分割 $left$ 及 $right$ 。 ``` --- pass 0 --- 前序[3,9,20,15,7] 中序[9,3,15,20,7] --- pass 1 --- 3 [9][20,15,7] [9] 3 [15,20,7] --- pass 2 --- 20 [15][7] [15] 20 [7] --- done --- ``` ```graphviz digraph BST { node [fontname="Arial" ]; l1 [ label = "3" ]; l21 [ label = "9" ]; l22 [ label = "20,15,7" ]; l1 -> { l21 l22 }; l3 [ label = "3" ]; l4 [ label = "9" ]; l5 [ label = "15" ]; l6 [ label = "7" ]; l7 [ label = "20" ]; l3 -> { l4 l5 }; l5->{ l6 l7}; } ``` ### `buildTree` ```c static struct TreeNode *buildTree() { in_heads = malloc(node_count * sizeof(*in_heads)); for (int i = 0; i < node_count; i++) INIT_HLIST_HEAD(&in_heads[i]); for (int i = 0; i < node_count; i++) node_add(inorder[i], i, node_count, in_heads); return dfs( 0, node_count - 1, 0, node_count - 1); } ``` 這個函式整合了上述概念,讓 `dfs` 可以用多種資料結構實作。首先 `INIT_HLIST_HEAD` ,將 `node_add` 圖中的雜湊表作初始化, `node_add` 則將中序節點雜湊進雜湊表當中,其中節點也會紀錄索引值以及數值內容。 ### 程式碼改進以及測試 > 測試程式碼: [commit 7b99bd3](https://github.com/HenryChaing/quiz1-2/commit/7b99bd3eff2d1f1c56d2bb3b213539712ba842a4) 測試程式碼: ```c void level_order_search(struct TreeNode *BT){ node[end++] = BT; printf("BFS ( binarytree ) = [ "); while(front != end){ struct TreeNode *pop_node = node[front++]; if(!pop_node){ printf("NULL, "); }else{ printf("%d, ",pop_node->val); node[end++] = pop_node->left; node[end++] = pop_node->right; } } printf(" ]\n"); } ``` ``` preorder: [3,9,20,15,7] inorder: [9,3,15,20,7] 輸出結果: BFS ( binarytree ) = [ 3, 9, 20, NULL, NULL, 15, 7, NULL, NULL, NULL, NULL, ] ``` 我利用廣度優先搜尋的方式,還原出我們建立的二元樹。程式碼說明如下,由於廣度優先搜尋可以用佇列的方式實作,因此我先用陣列來實作佇列,其中需要 `front` 及 `last` 作為佇列的頭尾, `pop` 需要將 `front++` , `push` 需要將 `last++` , `front == last` 則佇列為空。 :::danger access 的翻譯是「存取」,而非「拜訪」(visiting) ::: 用佇列來實作廣度搜尋,因為廣度優先搜尋在二元樹中有一層一層存取的特性,因此可以透過將即將存取的節點先加入佇列中,存取後印出結果,並將其子節點加入佇列中的方式,完成廣度優先搜尋。 ```graphviz digraph BST { node [fontname="Arial" ]; l3 [ label = "3" ]; l4 [ label = "9" ]; l5 [ label = "15" ]; l6 [ label = "7" ]; l7 [ label = "20" ]; l8 [shape=plaintext label = "NULL" ]; l9 [shape=plaintext label = "NULL" ]; l10 [shape=plaintext label = "NULL" ]; l11 [shape=plaintext label = "NULL" ]; l12 [shape=plaintext label = "NULL" ]; l13 [shape=plaintext label = "NULL" ]; l3 -> { l4 l5 }; l5->{ l6 l7}; l4->{ l8 l9}; l6->{ l10 l11}; l7->{ l12 l13}; } ``` ### 程式碼改進 > [commit 7b99bd3](https://github.com/HenryChaing/quiz1-2/commit/dddff20e7aab3b9ff604182844f413eeaf793423) 改進程式碼: ```c /*global variable*/ struct TreeNode *node[100]; int front=0, end=0; int preorder[] = {3,9,20,15,7}; int inorder[] = {9,3,15,20,7}; struct hlist_head *in_heads; int node_count; static struct TreeNode *dfs(int pre_low, int pre_high, int in_low, int in_high) { if (in_low > in_high || pre_low > pre_high) return NULL; struct TreeNode *tn = malloc(sizeof(*tn)); tn->val = preorder[pre_low]; int idx = find(preorder[pre_low], node_count, in_heads); tn->left = dfs(pre_low + 1, pre_low + (idx - in_low), in_low, idx - 1); tn->right = dfs(pre_high - (in_high - idx - 1), pre_high, idx + 1, in_high); return tn; } static inline void node_add(int val, int idx, int size, struct hlist_head *heads) { struct order_node *on = malloc(sizeof(*on)); on->val = val; on->idx = idx; int hash = (val < 0 ? -val : val) % size; hlist_add_head(&on->node, &heads[hash]); } static struct TreeNode *buildTree() { in_heads = malloc(node_count * sizeof(*in_heads)); for (int i = 0; i < node_count; i++) INIT_HLIST_HEAD(&in_heads[i]); for (int i = 0; i < node_count; i++) node_add(inorder[i], i, node_count, in_heads); return dfs( 0, node_count - 1, 0, node_count - 1); } int main(){ node_count = sizeof(inorder)/sizeof(inorder[0]); struct TreeNode *BT = buildTree(); } ``` 可以觀察到的是 `buildTree` 及 `dfs` 的參數減少了,因為將其改為函數皆可共用的全域變數。優點在於 `dfs` 屬於遞迴函式,因此每次遞迴時需要在記憶體堆疊中儲存的變數減少了,可以大幅減少記憶體的消耗,也間接提昇了 `dfs` 程式碼的易讀性,因為只剩下前中序的上下限,剛好四個參數。 ### Linux Kernel cgroups 中的 pre-order walk #### struct cgroup_subsys_state ```c struct cgroup_subsys_state { /* siblings list anchored at the parent's ->children */ struct list_head sibling; struct list_head children; u64 serial_nr; /* * PI: the parent css. Placed here for cache proximity to following * fields of the containing structure. */ struct cgroup_subsys_state *parent; }; ``` 此為方便展演的簡化版 `cgroup_subsys_state` 以下簡稱 `css` ,我們即將用前序走訪的方式存取各個 `css` ,其中 `children` 紀錄的是其(左)子節點, `sibling` 紀錄的是其兄弟節點,還有紀錄父節點的 `parent` ,與以往資料結構的二元樹節點略有不同。 CSS 結構變數之間的關係: ```graphviz digraph G { rankdir=LR; node[shape=record]; hn1 [label=" css_1 |<p> parent |...| <s> sibling | <c> child "]; hn2 [label=" css_2 |<p> parent |...| <s> sibling | <c> child "]; hn3 [label=" css_3 |<p> parent |...| <s> sibling | <c> child "]; { rank="same"; } null1 [label=NULL,shape="plaintext"] hn1:c -> hn2:n hn2:s -> hn3:w //hn3:s:e -> hn1 hn2:c:s -> null1 hn3:c -> null1 } ``` CSS 樹狀結構示意圖: ```graphviz digraph BST { rankdir=LR node [fontname="Arial" ]; l1 [ label = "0" ]; l21 [ label = "1" ]; l22 [ label = "3" ]; l3 [ label = "2" ]; NULL1[label=NULL,shape=plaintext] NULL2[label=NULL,shape=plaintext] { rank="same"; l1 -> l21 -> l3 ->NULL1; } { rank="same"; l22 -> NULL2 } l21 -> l22; } ``` #### css_next_descendant_pre ```c= struct cgroup_subsys_state * css_next_descendant_pre(struct cgroup_subsys_state *pos, struct cgroup_subsys_state *root) { struct cgroup_subsys_state *next; cgroup_assert_mutex_or_rcu_locked(); /* if first iteration, visit @root */ if (!pos) return root; /* visit the first child if exists */ next = css_next_child(NULL, pos); if (next) return next; /* no child, visit my or the closest ancestor's next sibling */ while (pos != root) { next = css_next_child(pos, pos->parent); if (next) return next; pos = pos->parent; } return NULL; } EXPORT_SYMBOL_GPL(css_next_descendant_pre); ``` 這個是前序走訪的函式,後面會提到輔助函式 `css_next_child` 。首先這個函式並不是完成前序走訪,而是利用前序的方式找到下一個未存取的節點並回傳該節點。 `line 7` 是確保程式互斥,設立臨界區間。 `line 10-11` 判斷若為第一次存取此 `css` 樹,則直接回傳根節點。 `line 14-16` 按照前序的方式,先存取(左)子節點,若此節點存在則直接回傳該節點。 `line 19-24` 若不存在子節點,則會存取其兄弟節點,若兄弟節點也不存在,則存取父節點的兄弟節點,上述循環到找到節點或回朔回根節點。 :::warning 不要急著「舉燭」,你知道上述程式碼提出的動機嗎?為何不看 git log 呢?先把應用場景說清楚,然後才是詳閱程式碼。 ::: #### css_next_child ```c struct cgroup_subsys_state *css_next_child(struct cgroup_subsys_state *pos, struct cgroup_subsys_state *parent) { struct cgroup_subsys_state *next; cgroup_assert_mutex_or_rcu_locked(); if (!pos) { next = list_entry_rcu(parent->children.next, struct cgroup_subsys_state, sibling); } else if (likely(!(pos->flags & CSS_RELEASED))) { next = list_entry_rcu(pos->sibling.next, struct cgroup_subsys_state, sibling); } else { list_for_each_entry_rcu(next, &parent->children, sibling, lockdep_is_held(&cgroup_mutex)) if (next->serial_nr > pos->serial_nr) break; } if (&next->sibling != &parent->children) return next; return NULL; } ``` 主要會執行第二個條件判斷的結果,也因此加上 likely 確認經常為 true 減少分支指令所造成的危障。而其中 `list_entry_rcu` 代表的是 `container_of` ,而這行的意思也就是說 `pos` 的兄弟節點設為 `next` 。最後則是在判斷若此兄弟節點並非最右端的節點,則回傳節點,反之則回傳不存在兄弟節點。 ## 測驗二 ### `hlist_del` ```c void hlist_del(struct hlist_node *n) { struct hlist_node *next = n->next, **pprev = n->pprev; *pprev = next; if (next) next->pprev = pprev; } ``` 這個是 Linux 雙向鏈結串列串列的節點移除函式,以移除節點為基準,會將前一個節點的 `next` 指向下一個節點,並將下一個節點的間接指標 `pprev` 指向上一個節點的 `next` 指標。以下為示意圖: ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] list_head[label = "<m>list_head | <n>first"] node_1[label = "<m>hlist_node_1 | {<p>pprev | <n>next}", group = list]; node_2[label = "<m>hlist_node_2 | {<p>pprev | <n>next}", group = list]; node_3[label = "<m>hlist_node_3 | {<p>pprev | <n>next}", group = list]; NULL[shape = plaintext, label = "NULL", group = list] {rank = "min" list_head} list_head -> node_1 -> node_2 -> node_3[ weight = 10, style = invis ] list_head:n -> node_1:m; node_1:p -> list_head:n; node_1:n -> node_2:m; node_2:p -> node_1:n; node_2:n -> node_3:m; node_3:p -> node_2:n; node_3:n -> NULL; } ``` ```graphviz digraph G { rankdir = LR; splines = false; node[shape = "record"] list_head[label = "<m>list_head | <n>first"] node_1[label = "<m>hlist_node_1 | {<p>pprev | <n>next}", group = list]; node_2[label = "<m>hlist_node_2 | {<p>pprev | <n>next}", group = list]; node_3[label = "<m>hlist_node_3 | {<p>pprev | <n>next}", group = list]; NULL[shape = plaintext, label = "NULL", group = list] {rank = "min" list_head} list_head -> node_2 -> node_3[ weight = 10, style = invis ] node_1:p -> list_head:n; node_1:n -> node_3:w; list_head:n -> node_2:m; node_2:p -> list_head:n; node_2:n -> node_3:m; node_3:p -> node_2:n; node_3:n -> NULL; } ``` 以上示意圖為移除 `node_1` 後的結果。 ### `LRUCache`, `LRUNode` ```c typedef struct { int capacity; //快取的頁面數 int count; //目前使用的頁面數 struct list_head dhead; //紀錄 LRU struct hlist_head hhead[]; //雜湊表 } LRUCache; typedef struct { int key; //索引值 int value; //儲存內容 struct hlist_node node; struct list_head link; } LRUNode; ``` 為了回答後續的題目,這裡必須介紹這兩個結構變數。 首先是 `LRUCache` , `dhead` 用鏈結串列的形式來儲存最近被使用的頁面,其中越近被使用的頁面,節點會離 `dhead` 越近。 `hhead` 則是用來儲存頁面的雜湊表,與上個測驗相同會被雜湊到對應的鏈節串列。 再來是 `LRUNode` , `node` 會被雜湊並插入鏈結串列當中,而 `link` 則是用來 `dhead` 串接出來的串列,隨著被使用時而改變。 LRUCache & LRUNode 關係示意圖: ```graphviz digraph G { rankdir=LR; node[shape=record]; subgraph cluster_4 { style=filled; color=lightgrey; dhlist_head [label="dhead | {<prev>prev | <next>next}"]; map [label=" hhead[] |<ht0>|<ht1>|<ht2>|<ht3>|<ht4>|<ht5>|<ht7>|<ht8>|<ht9>|<ht10>|<ht11>|<ht12>"]; label="LRUCache" } subgraph cluster_1 { style=filled; color=lightgrey; node [shape=record]; hn5 [label="link | {<prev>prev | <next>next} "]; hn1 [label="node | {<prev>pprev | <next>next} "]; label="LRUNode 1" } subgraph cluster_3 { style=filled; color=lightgrey; node [shape=record]; hn6 [label="link | {<prev>prev | <next>next} "]; hn3 [label="node | {<prev>pprev | <next>next}"]; label="LRUNode 2" } null1 [label=NULL,shape="plaintext"] null2 [label=NULL,shape="plaintext"] map:ht1 -> hn1 hn1:next -> null1 map:ht10 -> hn3 hn3:next -> null2 hn1:prev:s -> map:ht1 hn3:prev:s -> map:ht10 dhlist_head:next -> hn5:prev [color = red] hn5:next-> hn6 [color = red] hn6:next ->dhlist_head:e [color = red] } ``` #### `lRUCacheFree` ```c void lRUCacheFree(LRUCache *obj) { struct list_head *pos, *n; list_for_each_safe (pos, n, &obj->dhead) { LRUNode *cache = list_entry(pos, LRUNode, link); list_del(&cache->link); free(cache); } free(obj); } ``` `list_entry` 就是 `container_of` 巨集,因此可以透過 `list` 指標找到對應的 `LRUNode` ,至於 `list_del` 則是要將這個節點從鏈結中移除。 #### lRUCacheGet ```c int lRUCacheGet(LRUCache *obj, int key) { int hash = key % obj->capacity; struct hlist_node *pos; hlist_for_each (pos, &obj->hhead[hash]) { LRUNode *cache = list_entry(pos, LRUNode, node); if (cache->key == key) { list_move(&cache->link, &obj->dhead); return cache->value; } } return -1; } ``` 這個函式目的是在快取記憶體中用索引值查找對應的儲存資料,其中 `list_move` 是在確認找到資料後執行。 `list_move` 要作的是把對應的節點插入到頭節點旁,而這麼作的目的是因為 `dhead` 要更新 LRU 紀錄,因為找到的節點剛被使用,因此這項節點在 `dhead` 當中需要被更新。 #### lRUCachePut ```c void lRUCachePut(LRUCache *obj, int key, int value) { LRUNode *cache = NULL; int hash = key % obj->capacity; struct hlist_node *pos; hlist_for_each (pos, &obj->hhead[hash]) { LRUNode *c = list_entry(pos, LRUNode, node); if (c->key == key) { list_move(&c->link, &obj->dhead); cache = c; } } if (!cache) { if (obj->count == obj->capacity) { cache = list_last_entry(&obj->dhead, LRUNode, link); list_move(&cache->link, &obj->dhead); hlist_del(&cache->node); hlist_add_head(&cache->node, &obj->hhead[hash]); } else { cache = malloc(sizeof(LRUNode)); hlist_add_head(&cache->node, &obj->hhead[hash]); list_add(&cache->link, &obj->dhead); obj->count++; } cache->key = key; } cache->value = value; } ``` 這項操作是想將新的內容放入快取當中,但是會有三種不同的狀況,分別是: 1. 資料已儲存在快取中 2. 快取空間已滿且資料不在快取中 3. 快取未滿且資料不在快取當中 針對第一種情形,我們需要更新該頁面的內容,並且透過 `list_move` 更新 LRU 的紀錄。 再來是第二種情形,這時需要從雜湊表刪除最久沒被使用的資料,並且插入新的節點到雜湊表的鏈結串列,最後更新 LRU 。第三種情形則額外配置節點空間,插入新節點到雜湊表,更新 LRU 。 ### 測試程式及改善程式碼 > 測試程式: [commit 8a40180](https://github.com/HenryChaing/quiz1-2/commit/8a4018088c53dfbeddd46bdd7b215f829ff83ed0) ```c int main(){ LRUCache *cache = lRUCacheCreate(2); lRUCachePut(cache,0,100); lRUCachePut(cache,1,200); lRUCachePut(cache,2,300); printf("[key:value] = {[0,%d],[1,%d],[2,%d]}\n",lRUCacheGet(cache,0),lRUCacheGet(cache,1),lRUCacheGet(cache,2)); lRUCacheFree(cache); } ``` 我們的測試函式主要是要測驗快取的 LRU 機制是否正常運行。因此我們新增了一個頁面數為2的快取,並對快取放入數值三次,最後檢查兩個頁面的儲存情形。 在此例子當中,我們先放入 (鍵值,數值): (0,100), (1,200), (2,300)。在放入第三個數值時,會遇到空間已滿的問題,這時會把紀錄 LRU 串列中的最後一個節點移到前方,並把雜湊表中的節點移除,最後雜湊新的節點並把剛才移到前方的節點數值改為新的數值。 因此最後印出來的結果為。確實地把最不常使用的 (0,100) 移除。 #### 改善程式碼 ```diff +int capacity = obj->capacity; -int hash = key % obj->capacity; -if (obj->count == obj->capacity) { +int hash = key % capacity; +if (obj->count == capacity) { ``` 這個是 `lRUCachePut` 當中必定會執行到的兩行的程式碼,而 `obj->capacity` 會比一般變數還需要多一次記憶體存取,因此我先將其設為常數變數,由此加快快取記憶體的置入。 ### Linux 核心 LRU 相關程式碼 在經過搜尋後,發現 `lib/lru_cache.c` 有紀錄跟 LRU 相關的程式碼,並且與本題相同是在實作 cache 的替換機制。以下探討會著重在 cache 的資料放入以及提出資料的操作。 #### `__lc_get` ```c=351 static struct lc_element *__lc_get(struct lru_cache *lc, unsigned int enr, unsigned int flags) { struct lc_element *e; PARANOIA_ENTRY(); if (test_bit(__LC_STARVING, &lc->flags)) { ++lc->starving; RETURN(NULL); } e = __lc_find(lc, enr, 1); /* if lc_new_number != lc_number, * this enr is currently being pulled in already, * and will be available once the pending transaction * has been committed. */ if (e) { if (e->lc_new_number != e->lc_number) { /* It has been found above, but on the "to_be_changed" * list, not yet committed. Don't pull it in twice, * wait for the transaction, then try again... */ if (!(flags & LC_GET_MAY_USE_UNCOMMITTED)) RETURN(NULL); /* ... unless the caller is aware of the implications, * probably preparing a cumulative transaction. */ ++e->refcnt; ++lc->hits; RETURN(e); } /* else: lc_new_number == lc_number; a real hit. */ ++lc->hits; if (e->refcnt++ == 0) lc->used++; list_move(&e->list, &lc->in_use); /* Not evictable... */ RETURN(e); } /* e == NULL */ ++lc->misses; if (!(flags & LC_GET_MAY_CHANGE)) RETURN(NULL); /* To avoid races with lc_try_lock(), first, mark us dirty * (using test_and_set_bit, as it implies memory barriers), ... */ test_and_set_bit(__LC_DIRTY, &lc->flags); /* ... only then check if it is locked anyways. If lc_unlock clears * the dirty bit again, that's not a problem, we will come here again. */ if (test_bit(__LC_LOCKED, &lc->flags)) { ++lc->locked; RETURN(NULL); } /* In case there is nothing available and we can not kick out * the LRU element, we have to wait ... */ if (!lc_unused_element_available(lc)) { set_bit(__LC_STARVING, &lc->flags); RETURN(NULL); } /* It was not present in the active set. We are going to recycle an * unused (or even "free") element, but we won't accumulate more than * max_pending_changes changes. */ if (lc->pending_changes >= lc->max_pending_changes) RETURN(NULL); e = lc_prepare_for_change(lc, enr); BUG_ON(!e); clear_bit(__LC_STARVING, &lc->flags); BUG_ON(++e->refcnt != 1); lc->used++; lc->pending_changes++; RETURN(e); } ``` :::warning 不要急著「舉燭」,你知道上述程式碼提出的動機嗎?為何不看 git log 呢?先把應用場景說清楚,然後才是詳閱程式碼。 ::: > 參照 [Linux 核心設計: 淺談同步機制](https://hackmd.io/@owlfox/SyVVY3EgI/https%3A%2F%2Fhackmd.io%2Fs%2FSJpp-bN0m#Universal-Scalability-Law-USL) 之所以提出資料會看似這麼複雜,是因為這個函式有一半在做臨界區間的控制,以及必要的除錯,而其中控制行程是否能進入臨界區間的方式是採用 `test_bit` , `set_bit` 等位元操作,以下會進行說明。 `line 355` 除了除錯環節最後就是會進行 `test_and_set()` ,目的是防止其他程式也要在之後的時間同時進入臨界區間,利用 `semaphore` 的方式達成互斥,並且使用分佈在各行的 `RETURN()` 離開臨界區間,保障可進行性。 `line 356~359` 會利用 `test_bit` 觀察目前 lrucache 是否處於 STARVING 的狀態(也就是 cache 空間全滿),若處於 STARVING 則必須等待,反之若不為 STARVING 則可以進行後續存取動作。 `line 386~386` 若 `lc_find` 有在 cache slot 中找到資料,則回傳資料,也要更新 LRU 將這筆資料移到 `in_use` 串列的最前端。 #### lc_put ```c=549 unsigned int lc_put(struct lru_cache *lc, struct lc_element *e) { PARANOIA_ENTRY(); PARANOIA_LC_ELEMENT(lc, e); BUG_ON(e->refcnt == 0); BUG_ON(e->lc_number != e->lc_new_number); if (--e->refcnt == 0) { /* move it to the front of LRU. */ list_move(&e->list, &lc->lru); lc->used--; clear_bit_unlock(__LC_STARVING, &lc->flags); } RETURN(e->refcnt); } ``` 確認快取資料未再使用後,將發生資料置換,並且清除使用以及 STARVING 狀態。 #### lru_cache.h ```c /* flag-bits for lru_cache */ enum { /* debugging aid, to catch concurrent access early. * user needs to guarantee exclusive access by proper locking! */ __LC_PARANOIA, /* annotate that the set is "dirty", possibly accumulating further * changes, until a transaction is finally triggered */ __LC_DIRTY, /* Locked, no further changes allowed. * Also used to serialize changing transactions. */ __LC_LOCKED, /* if we need to change the set, but currently there is no free nor * unused element available, we are "starving", and must not give out * further references, to guarantee that eventually some refcnt will * drop to zero and we will be able to make progress again, changing * the set, writing the transaction. * if the statistics say we are frequently starving, * nr_elements is too small. */ __LC_STARVING, }; #define LC_PARANOIA (1<<__LC_PARANOIA) #define LC_DIRTY (1<<__LC_DIRTY) #define LC_LOCKED (1<<__LC_LOCKED) #define LC_STARVING (1<<__LC_STARVING) ``` 以上是關於這些 bitops 的解釋,例如 `PARANOIA` 為除錯用, `STARVING` 為確認沒有可用的快取資料。 ## 測驗三 #### `__ffs` ```c static inline unsigned long __ffs(unsigned long word) { int num = 0; #if BITS_PER_LONG == 64 if ((word & 0xffffffff) == 0) { num += 32; word >>= 32; } #endif if ((word & 0xffff) == 0) { num += 16; word >>= 16; } if ((word & 0xff) == 0) { num += 8; word >>= 8; } if ((word & 0xf) == 0) { num += 4; word >>= 4; } if ((word & 0x3) == 0) { num += 2; word >>= 2; } if ((word & 0x1) == 0) num += 1; return num; } ``` :::danger power of 2 的翻譯是「2 的冪」 $\to$ [資訊科技詞彙翻譯](https://hackmd.io/@sysprog/it-vocabulary) ::: 這個是找到這個字組從 LSB 數來的第一個設定位元 (set bit)位置(從零開始起算)。作法是用遮罩的方式進行,原理如下: 因為數字必定可以使用二的冪之<s>二的密次方</s> 和的方式呈現,因此假設這個字組尾端連續7個數字為零,就可以將其拆解成 ${7 = 2^2+2^1+2^0}$ 。上述程式碼也運用了相同的原理,會逐一使用遮罩判別是否為二的冪次方個零並把字組移位,運行如下: ``` word = 11011010000000 num = 0 --- (word & 0xf) == 0 成立 --- word = 1101101000 num = 4 --- (word & 0x3) == 0 成立 --- word = 11011010 num = 6 --- (word & 0x1) == 0 成立 --- word = 1101101 num = 7 --- done --- ``` 因此我們藉由數尾端連續零的個數,來找到第一個被設定的位元的位置,有了這項功能才能接著 `fns` 的實作。 #### `__clear_bit` ```c static inline void __clear_bit(unsigned long nr, volatile unsigned long *addr) { unsigned long mask = BIT_MASK(nr); unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr); *p &= ~mask; } ``` 首先 `BIT_MASK` 是之前定義的巨集,它會建立只有 `nr` 位置設置(為1)的遮罩。再來是 `BIT_WORD` 巨集,它會因位元位置在字組之內回傳0,反之回傳1。 在假設 `BIT_WORD` 回傳為 0 的情況。接下來要進行清除位元的動作,而運用的就是遮罩的原理。目前 mask 只有要清除的位元位置為 1,若直接進行 and 位元運算則只會留下清除位元。因此我們將 mask <s>取反</s> 位元反轉,再進行 and 位元運算,就可以成功留下清除位元外的所有位元。 ``` 清除第5個位元: p = 10010010 mask = 00010000 ~mask = 11101111 p & ~mask = 10000010 ``` #### fns ```c static inline unsigned long fns(unsigned long word, unsigned int n) { while (word) { unsigned int bit = __ffs(word); if (n-- == 0) return bit; __clear_bit(bit, &word); } return BITS_PER_LONG; } ``` 這個函式的全名是 `find nth set bit` ,也就是要找出字組 (word) 中從 LSB 數來第 n 個被設定的位元位置。 首先從 `__ffs` 開始看起,它回傳的是現在字組第一個被設定的位元位置,而在經由條件式判斷並非原始字組第 n 個設定位元之後,會經由 `__clear_bit` 將該位元清除為0。 其中最外層的迴圈在判斷若還未找到第 n 個位元,但字組已被清除為全0,則回傳 64 表示並未找到。最後條件式會在找到第 n 個設定位元時成立,並回傳其位元位置。 :::warning 提及微處理器架構的特性 ::: ``` 找到第4個位元位置: n=4 1101011 --- 第一次迭代 --- n=3 110101[0] --- 第二次迭代 --- n=2 11010[0]0 --- 第三次迭代 --- n=1 110[0]000 --- 第四次迭代 --- n=0 1[0]00000 return 5 --- 結束 --- ``` ### Linux `find_nth_bit` 的應用案例 #### cpumask_nth ```c static inline unsigned int cpumask_nth(unsigned int cpu, const struct cpumask *srcp) { return find_nth_bit(cpumask_bits(srcp), small_cpumask_bits, cpumask_check(cpu)); } ``` 首先這裡會計算可用的處理器並將其整理為位元字串形式,再利用 `find_nth_bit` 找到相對應的處理器。 #### drivers/crypto/intel/iaa/iaa_crypto_main.c ```c /* * Rebalance the wq table so that given a cpu, it's easy to find the * closest IAA instance. The idea is to try to choose the most * appropriate IAA instance for a caller and spread available * workqueues around to clients. */ static void rebalance_wq_table(void) { const struct cpumask *node_cpus; int node, cpu, iaa = -1; if (nr_iaa == 0) return; pr_debug("rebalance: nr_nodes=%d, nr_cpus %d, nr_iaa %d, cpus_per_iaa %d\n", nr_nodes, nr_cpus, nr_iaa, cpus_per_iaa); clear_wq_table(); if (nr_iaa == 1) { for (cpu = 0; cpu < nr_cpus; cpu++) { if (WARN_ON(wq_table_add_wqs(0, cpu))) { pr_debug("could not add any wqs for iaa 0 to cpu %d!\n", cpu); return; } } return; } for_each_node_with_cpus(node) { node_cpus = cpumask_of_node(node); for (cpu = 0; cpu < nr_cpus_per_node; cpu++) { int node_cpu = cpumask_nth(cpu, node_cpus); if (WARN_ON(node_cpu >= nr_cpu_ids)) { pr_debug("node_cpu %d doesn't exist!\n", node_cpu); return; } if ((cpu % cpus_per_iaa) == 0) iaa++; if (WARN_ON(wq_table_add_wqs(iaa, node_cpu))) { pr_debug("could not add any wqs for iaa %d to cpu %d!\n", iaa, cpu); return; } } } } ``` 這是我找到跟 CPU 使用偏好最相近的程式碼,但還是與 CPU Affinity 的意思相去甚遠,這段程式碼主要是為了平衡 `wq_table` 並找出最合適的處理器作分配使用。 :::warning 不要急著「舉燭」,你知道上述程式碼提出的動機嗎?為何不看 git log 呢?先把應用場景說清楚,然後才是詳閱程式碼。 :::