# 2024q1 Homework2 (quiz1+2) contributed by < `jujuegg` > ## 第一周測驗題 ### 測驗一 本測驗是在鏈結串列上實作 quick sort,且是非遞迴版本的,使用==堆疊==來模擬原本的遞迴呼叫。 #### 程式碼解讀 ```c // variables node_t *begin[max_level], *end[max_level]; node_t *result = NULL, *left = NULL, *right = NULL; ``` - `begin` 與 `end` 是用來記錄 `pivot` 未完成排序的節點。 - `max_level` 是串列長度的 2 倍。 - `left` 與 `right` 分別存放比 `pivot` 大/小 的節點。 ```c begin[0] = *list; end[0] = list_tail(list); ``` `begin` 紀錄頭端,而 `end` 紀錄尾端,所以一開始把整個串列視為未完成排序。 ```c node_t *L = begin[i], *R = end[i]; ``` 每一輪使用 `L` 和 `R` 此輪要排序的串列的頭和尾端。`i` 是用來告知程式碼目前要處理的未完成排序的串列的所在位置。 ##### 此輪有不只一個節點時 ```c node_t *pivot = L; value = pivot->value; node_t *p = pivot->next; pivot->next = NULL; ``` 如果兩個節點的值不一樣,則代表長度 > 1,即可把第一個節點當作 pivot,也就是 `L` 節點。 接著用 `p` 來走訪整個串列。 ```c while (p) { node_t *n = p; p = p->next; //CCCC list_add(n->value > value ? &right : &left, n); } ``` `p` 會提前跑到下一個節點的位置,有點像 `list_for_each_safe` 的概念。並把目前節點與 `pivot` 比較,大的就加進 `right`;小於等於的加進 `left`。 ```c begin[i] = left; end[i] = list_tail(&left); //DDDD begin[i + 1] = pivot; end[i + 1] = pivot; begin[i + 2] = right; end[i + 2] = list_tail(&right); //EEEE left = right = NULL; i += 2; ``` 分好大小邊後,把 `left` 裡面的節點設置為「第 ==i== 個」未處理的串列,把 `right` 裡面的節點設置為「第 ==i+2== 個」未處理的串列。而 `pivot` 設置為「第 ==i+1== 個」未處理的串列。 最後把兩邊清空,調整 `i`,準備下一輪。 ##### 此輪 只有一個節點/沒有節點 時 ```c if (L) list_add(&result, L); i--; ``` 若此輪只有一個節點,此時它會是所有未完成排序的節點中值最大的,直接將其加到最終串列中。這蠻好理解的,因為每輪結束的時候,`i` 會加 2,代表程式選取 `right` 串列作為下一輪的輸入。 要注意,如果上一輪中,沒有存在比 `pivot` 大的值,那這邊有可能會是空的。 #### 圖解 - 設 `list` 如下: ```graphviz digraph structs { node[shape=plaintext]; begin_0 [fontcolor="red"]; end_0 [fontcolor="red"]; pivot [fontcolor="black"] node[shape=box]; struct0 [label= "3"]; struct1 [label= "4"]; struct2 [label= "5"]; struct3 [label= "2"]; struct4 [label= "1"]; { rank="same"; struct0 -> struct1 struct1 -> struct2 struct2 -> struct3 struct3 -> struct4 } begin_0 -> struct0 end_0 -> struct4 pivot -> struct0 } ``` - 把 `pivot` 以外的所有節點逐一跟 `pivot` 比較,並分到 `right` / `left` 中。 ```graphviz digraph structs { node[shape=plaintext]; pivot [fontcolor="black"] right [fontcolor="blue"] left [fontcolor="blue"] node[shape=box]; struct0 [label= "3"]; struct1 [label= "4"]; struct2 [label= "5"]; struct3 [label= "2"]; struct4 [label= "1"]; { rank="same"; struct2 -> struct1 struct4 -> struct3 } left -> struct4 pivot -> struct0 right -> struct2 } ``` - 把這 3 個串列分別放入 `begin` 與 `end` 正確的位置。下一輪則用 `i == 2` 的位置進行排序。 ```graphviz digraph structs { node[shape=plaintext]; begin_0 [fontcolor="red"] begin_1 [fontcolor="orange"] begin_2 [fontcolor="purple"] end_0 [fontcolor="red"] end_1 [fontcolor="orange"] end_2 [fontcolor="purple"] node[shape=box]; struct0 [label= "3"]; struct1 [label= "4"]; struct2 [label= "5"]; struct3 [label= "2"]; struct4 [label= "1"]; { rank="same"; struct2 -> struct1 struct4 -> struct3 } begin_0 -> struct4 end_0 -> struct3 begin_1 -> struct0 end_1 -> struct0 begin_2 -> struct2 end_2 -> struct1 } ``` #### 優化方案 - 或許 `begin` 與 `end` 也可以用鏈結串列來儲存? - 動機:程式利用 `i` 來存取這兩個陣列,但 `i` 在執行程式的過程中並不會大範圍跳躍,最多連續加 2 而已,可以避免宣告不必要的記憶體空間。 - 使用雙向鏈結串列。 #### 使用 Linux 核心風格的 List API 改寫 :::info [程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/QuickSort>) ::: 首先把 `quick_sort.h` 裡面宣告的結構改成用 `list_head`,以及把上個作業的 `list.h` 拿過來使用,有些重複的函式就統一使用內建的 API。 ```diff typedef struct __node { - struct __node *left, *right; - struct __node *next; + struct list_head list; long value; } node_t; ``` :::info 接著是最麻煩的地方,雖然想法很簡單,但是我卻在這邊花了將近半天的時間,期間一直發生 Segmentation fault,非常的折磨人。但沒辦法,菜就多練... ::: 我建立了另一個結構 `begin_t`,它的作用是儲存那些尚未完成排序的串列的 「`head`」,然後再將此節點存入另一個鏈結串列中,==並不像原本的程式碼是用陣列來模擬堆疊==。原諒我不會取名。 ```c typedef struct begins { struct list_head list; struct list_head *head; } begin_t; ``` 現在要使用 Linux 的 API,所以變數形態要更換,且因為使用雙向鏈結串列,所以不須留下 `end`。 ```diff - node_t *begin[max_level], *end[max_level]; - node_t *result = NULL, *left = NULL, *right = NULL; + struct list_head *begins = malloc(sizeof(struct list_head)); + struct list_head *result = malloc(sizeof(struct list_head)); + struct list_head *left = malloc(sizeof(struct list_head)); + struct list_head *right = malloc(sizeof(struct list_head)); + struct list_head *mid = malloc(sizeof(struct list_head)); ``` 一開始把整個串列的 `head` 放進 `begins` 中。 `now` 是用來模擬堆疊的 `top` 的位置。 `begins` 則是目前==所有==未完成排序的串列所在的堆疊。 ```c begin_t *all = new_begin(*head); list_add_tail(&all->list, begins); struct list_head *now = begins->next; ``` `now_entry_head` 是這輪要處理的未完成排序的串列的 `head`。 ```c struct list_head *now_entry_head = list_entry(now, begin_t, list)->head; ``` `pivot` 放入中間的串列。 ```c node_t *pivot = list_first_entry(now_entry_head, node_t, list); list_del(&pivot->list); list_add(&pivot->list, mid); ``` 中間將資料依照大小區分為 `left` 與 `right` 的過程因為與之前無異,所以省略不提。 因為要依序將 `left`、`pivot`、`right` 放入 `begins` 中,所以會需要額外在堆疊中增加 2 個節點,而 `left` 就會直接將目前正在處理排序的鏈結串列取代,分別 push `pivot` 和 `right`。 注意如果 `left` 或 `right` 裡沒有東西,也會需要 push 空串列的 `head` 進去。 ```c // 1 list_splice_tail_init(left, now_entry_head); INIT_LIST_HEAD(left); // 2 begin_t *mid_begin = new_begin(mid); list_add_tail(&mid_begin->list, begins); INIT_LIST_HEAD(mid); // 3 begin_t *right_begin = new_begin(right); list_add_tail(&right_begin->list, begins); INIT_LIST_HEAD(right); ``` `top` 的指標往上 2 格。`pop` 時則是往下一格。 ```c // push 2 elements into stack now = now->next->next; // pop 1 element out of stack now = now->prev; ``` 操作大致上是這樣,注意當要 pop `begins` 的節點時,由於內部是儲存鏈結串列的頭,所以要記得呼叫 `list_free` 來釋放記憶體空間。 #### 實驗結果 - 原本的程式的記憶體用量 ![image](https://hackmd.io/_uploads/HkEBSiFap.png) - 使用鏈結串列實作堆疊的記憶體使用量 ![image](https://hackmd.io/_uploads/SyTzPsFaT.png) 在 100000 筆資料的情況下,用鏈結串列去實作堆疊可以節省約 0.8 MB 的記憶體。 ### 測驗二 合併串列時,若 run 的數量等於或略小於 2 的冪,效率會最高;若略大於 2 的冪,效率就特別低。 所以要定義 minrun 來控制每個 run 的長度。 而為了避免一開始就掃描整個資料來產生 run, timsort 會用堆疊來暫存 run,運用 `merge_collapse` 函式來達成,該函式的功能如下: - 檢查 A 的長度要大於 B 和 C 的長度總和 - 檢查 B 的長度要大於 C 的長度 注意合併只能在相鄰的 2 個 run 進行,以確保穩定性。 ##### Galloping mode 在合併兩個序列 (A & B) 時,考慮到它們都是已經排序好的數列,因此可以加快比較的過程。 首先尋找 B 的首個元素 ( $B_0$ ) 在 A 中的排序位置,即可確定 A 中有一小段是小於 $B_0$,就不需處理這段,接著再尋找 A 的首個元素在 B 中的排序位置,如此反覆進行。 #### 程式碼解讀 ##### `find_run` 如果目前節點比下一個節點大,則代表是降冪排列,需要將序列倒轉。`do while` 迴圈中間在將 `list` 所在節點與 `next` 交換,直到下一個元素大於等於目前元素,或沒有下一元素就停止。 ```c if (cmp(priv, list, next) > 0) { /* decending run, also reverse the list */ struct list_head *prev = NULL; do { len++; list->next = prev; prev = list; list = next; next = list->next; head = list; } while (next && cmp(priv, list, next) > 0); list->next = prev; } ``` 如果目前是升冪排列,則找到此升冪排列的最後一個元素 ```c else { do { len++; list = next; next = list->next; } while (next && cmp(priv, list, next) <= 0); list->next = NULL; } ``` 假設有 1 序列如下: ```graphviz digraph structs { node[shape=plaintext]; prev [fontcolor="black"] list [fontcolor="black"] next [fontcolor="black"] head [fontcolor="black"] node[shape=box]; struct0 [label= "3"] struct1 [label= "2"] struct2 [label= "1"] struct3 [label= "3"] { rank="same"; struct0 -> struct1 struct1 -> struct2 struct2 -> struct3 } { list -> struct0 head -> struct0 next -> struct1 } } ``` 做完一次迴圈會變成這樣: ```graphviz digraph structs { node[shape=plaintext]; prev [fontcolor="black"] list [fontcolor="black"] next [fontcolor="black"] head [fontcolor="black"] node[shape=box]; struct0 [label= "3"] struct1 [label= "2"] struct2 [label= "1"] struct3 [label= "3"] { rank="same"; struct1 -> struct2 struct2 -> struct3 } { list -> struct1 prev -> struct0 head -> struct1 next -> struct2 } } ``` 第二次: ```graphviz digraph structs { node[shape=plaintext]; prev [fontcolor="black"] list [fontcolor="black"] next [fontcolor="black"] head [fontcolor="black"] node[shape=box]; struct0 [label= "3"] struct1 [label= "2"] struct2 [label= "1"] struct3 [label= "3"] { rank="same"; struct1 -> struct0 struct2 -> struct3 } { list -> struct2 prev -> struct1 head -> struct2 next -> struct3 } } ``` 結束後就會跳出 `while` 迴圈了,最後再設定 `list->next = prev`,與後續操作。 ```c head->prev = NULL; head->next->prev = (struct list_head *) len; result.head = head, result.next = next; return result; ``` ```graphviz digraph structs { node[shape=plaintext]; prev [fontcolor="black"] list [fontcolor="black"] next [fontcolor="black"] head [fontcolor="black"] node[shape=box]; struct0 [label= "3"] struct1 [label= "2"] struct2 [label= "1"] struct3 [label= "3"] { rank="same"; struct1 -> struct0 struct2 -> struct1 } { list -> struct2 prev -> struct1 head -> struct2 next -> struct3 } } ``` ##### `*merge_collapse` 只要目前有超過 2 個 run,就會持續進行合併操作。 ```c while ((n = stk_size) >= 2) ``` 如果前 4 個 run 中, - run B 的長度小於等於 run C + run D - run A 的長度小於等於 run B + run C 就會啟動合併。 ```c if ((n >= 3 && run_size(tp->prev->prev) <= run_size(tp->prev) + run_size(tp)) || (n >= 4 && run_size(tp->prev->prev->prev) <= run_size(tp->prev->prev) + run_size(tp->prev))) ``` ```graphviz digraph structs { node[shape=plaintext]; A [fontcolor="black"] B [fontcolor="black"] C [fontcolor="black"] D [fontcolor="black"] node[shape=box]; struct0 [label= "tp"] struct1 [label= "tp->prev"] struct2 [label= "tp->prev->prev"] struct3 [label= "tp->prev->prev->prev"] { rank="same"; struct0 -> struct1 struct1 -> struct2 struct2 -> struct3 } { A -> struct3 B -> struct2 C -> struct1 D -> struct0 } } ``` 怎麼決定要合併哪個 run: - run B 小於 run D:合併 run B 與 run C - 否則合併 run C 與 run D ```c if (run_size(tp->prev->prev) < run_size(tp)) { tp->prev = merge_at(priv, cmp, tp->prev); } else { tp = merge_at(priv, cmp, tp); } ``` 如果只有 2 個 run,且 run C 小於等於 run D,則合併兩者。 ```c } else if (run_size(tp->prev) <= run_size(tp)) { tp = merge_at(priv, cmp, tp); } else { break; } ``` ##### `*merge_force_collapse` 這個步驟會強制合併所有的 run,不須檢查要合併的 run 的長度和是否已經大於另一個 run。 注意,這個步驟會讓堆疊中剩下 2 個 run。 ##### `timsort` 每次先找到一個 run,將這個 run 放入堆疊中,並將 `list` 指標移到剛剛找到的 run 的後面一個節點,然後觀察堆疊中最上面的 4 個 run,看需不需要合併,走訪完串列後,再將剩下的 run 合併。 ```c do { /* Find next run */ struct pair result = find_run(priv, list, cmp); result.head->prev = tp; tp = result.head; list = result.next; stk_size++; tp = merge_collapse(priv, cmp, tp); } while (list); ``` #### 優化方案 :::info [程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/Timsort>) ::: 我發現在[測驗題目](<https://hackmd.io/@sysprog/linux2024-quiz1#%E6%B8%AC%E9%A9%97-2>)中的 Galloping mode 方法並沒有實現在原本 timsort 的程式中,因此我選擇改優化這部分。但還沒開始實作就想到一個問題,程式是怎麼不用比較就找到要插入的位置的,所以這個想法先被擱置。 再來程式也沒有考慮到 minrun 的長度來調整每次找到的 run,目前想要改進這方面,但實作上遇到一點問題,正在調整中。 ## 第二周測驗題 ### 測驗一 #### 程式碼解讀 用於處理 hash 數值碰撞 ```c struct hlist_node { struct hlist_node *next, **pprev; }; ``` Hash table 中的頭節點,唯一代表一 hash 值 ```c struct hlist_head { struct hlist_node *first; }; ``` ##### `hlist_add_head` 運作流程如下一系列圖:設有一 `hlist_node_new` 節點要執行此函式。首先,若該列表是有節點而非 NULL 的,則調整 `h->first->pprev` 指標(如圖二);接著調整新節點之 `*next` 及 `*pprev` 指標(如圖三);最後將頭節點之 `*first` 指向新節點(如圖四)即可。 ```c static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h) { if (h->first) h->first->pprev = &n->next; n->next = h->first; //AAAA n->pprev = &h->first; h->first = n; } ``` ```graphviz digraph add_head { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"]; node [shape=none] null [label=NULL] map:ht2 -> hn1; hn1:next -> hn2; hn1:prev -> map:ht2; hn2:next -> null; hn2:prev -> hn1:next; } ``` ```graphviz digraph add_head { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"]; node [shape=none] null [label=NULL] map:ht2 -> hn1; hn1:next -> hn2; hn1:prev -> hn3:next; hn2:next -> null; hn2:prev -> hn1:next; } ``` ```graphviz digraph add_head { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"]; node [shape=none] null [label=NULL] map:ht2 -> hn1; hn1:next -> hn2; hn1:prev -> hn3:next; hn2:next -> null; hn2:prev -> hn1:next; hn3:next -> hn1; hn3:prev -> map:ht2; } ``` ```graphviz digraph add_head { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"]; node [shape=none] null [label=NULL] map:ht2 -> hn3; hn1:next -> hn2; hn1:prev -> hn3:next; hn2:next -> null; hn2:prev -> hn1:next; hn3:next -> hn1; hn3:prev -> map:ht2; } ``` ##### `*dfs` `pre_low/high` 即該 `*preorder` 之左右範圍;`in_low/high` 同理,為 `*inorder` 之左右範圍。 要知道的是,此測驗一之程式碼即是要重建二元樹,且 dfs 存取順序實際上就是 preorder,所以會傳 `preorder[pre_low]`(表中間節點) 進 `find()`,以尋找該節點在 inorder 中的位置索引 `idx`,找到後「切分左右子樹(`tn->left` 及 `tn->right`)」,分別再對 inorder 的 `in_low ~ (idx-1)` 與 `(idx+1) ~ in_high` 作 dfs。 由 preorder 可知哪個節點在上面(越靠前的越上面)、而由 inorder 可知哪個節點靠左(越靠前的越左邊),於是可得知 preorder 的第一個元素一定是 root 的值,且該元素對應到 inorder 後,左邊的值就在左邊,右邊的值就在右邊。 接著就將 left 和 right 分別當作新的 root node 下去做即可。遞迴的中止條件是在 inorder 中找不到對應元素。 ```c static struct TreeNode *dfs(int *preorder, int pre_low, int pre_high, int *inorder, int in_low, int in_high, struct hlist_head *in_heads, int size) { if (in_low > in_high || pre_low > pre_high) return NULL; struct TreeNode *tn = malloc(sizeof(*tn)); tn->val = preorder[pre_low]; int idx = find(preorder[pre_low], size, in_heads); tn->left = dfs(preorder, pre_low + 1, pre_low + (idx - in_low), inorder, in_low, idx - 1, in_heads, size); tn->right = dfs(preorder, pre_high - (in_high - idx - 1), pre_high, inorder, idx + 1, in_high, in_heads, size); return tn; } ``` ##### `buildTree` 建樹的程式碼。起初會先將 inorder 節點建立一 hash table 以方便 `dfs()` 函式的內部實作,之後再呼叫 `dfs()` 函式,遞迴求出該二元樹並回傳結果。 ```c static struct TreeNode *buildTree(int *preorder, int preorderSize, int *inorder, int inorderSize) { struct hlist_head *in_heads = malloc(inorderSize * sizeof(*in_heads)); for (int i = 0; i < inorderSize; i++) INIT_HLIST_HEAD(&in_heads[i]); for (int i = 0; i < inorderSize; i++) node_add(inorder[i], i, inorderSize, in_heads); return dfs(preorder, 0, preorderSize - 1, inorder, 0, inorderSize - 1, in_heads, inorderSize); } ``` #### 測試程式 :::info [程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/BinaryTreeTesting>) ::: 設定一 preorder 及 inorder 的序列,其會有一對應的樹,也會有對應的 postorder 序列。本測試程式即比較 buildTree 後的 postorder,與期望的 postorder 有無差別,若無,表示程式運作無誤。 #### 嘗試改進 - 用紅黑樹代替鏈表之儲存資料方式 用 Hash table 找特定資料的時間複雜度通常是 $O(1)$,但在最差情況下(目標資料在鏈表尾端),就可能會達到 $O(n)$。 因為碰撞發生的時候,要線性查找鏈表以找到目標資料。若用紅黑樹儲存的話,其可以保持樹的高度平衡,來保證在最壞情況下的查找時間複雜度降為 $O(log n)$。 比起 Hash table,紅黑樹在處理碰撞時不用線性查找鏈表,而是透過比較節點值來定位目標資料,所以其在查找上的性能更為穩定,特別是在「處理大量資料」或「碰撞發生頻繁」的情況下,紅黑樹的優勢會更明顯。 #### 探討 Pre-order Walk 參照 [linux/kernel/cgroup/cgroup.c](https://github.com/torvalds/linux/blob/master/kernel/cgroup/cgroup.c),發現 `css_next_descendant_pre()` 實作了 Pre-order Walk。 - `css_next_descendant_pre()`:找到下一個要進行 Preorder Traversal 的後代節點 - `@pos`:當前位置 - `@root`:要走訪其後代節點的 CSS ```c struct cgroup_subsys_state * css_next_descendant_pre(struct cgroup_subsys_state *pos, struct cgroup_subsys_state *root) { struct cgroup_subsys_state *next; cgroup_assert_mutex_or_rcu_locked(); /* if first iteration, visit @root */ if (!pos) return root; /* visit the first child if exists */ next = css_next_child(NULL, pos); if (next) return next; /* no child, visit my or the closest ancestor's next sibling */ while (pos != root) { next = css_next_child(pos, pos->parent); if (next) return next; pos = pos->parent; } return NULL; } ``` 尋找要訪問的下一個後代節點,以進行 `@root` ((含)第一個要訪問的節點)的後代節點的 Preorder Traversal。 雖然此函式需要 cgroup_mutex 或 RCU read locking,但不需要整個走訪過程都跑在 critical section 裡。 只要 `@pos` 和 `@root` 可訪問,且 `@pos` 是 `@root` 的後代節點,其就會回傳下一個後代節點。 因此,其用於實現對 cgroup 子系統狀態的一種很有效率的 Pre-access traversal。 且在「遞迴訪問 cgroup 的後代來走訪整個 cgroup 樹」的過程中,還保持對 cgroup_mutex 或 RCU read locking 的佔用,確保不會有多餘一個 process 跑進 critical section。 ### 測驗二 #### 程式碼解讀 `hlist_del()` 的操作邏輯如下系列圖(假設要刪除 node1),簡單來說就是刪除一個鏈表節點。 ```graphviz digraph hlist_del { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"]; hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; node [shape=none] null [label=NULL] map:ht2 -> hn0; hn0:next -> hn1; hn0:prev -> map:ht2; hn1:next -> hn2; hn1:prev -> hn0:next; hn2:next -> null; hn2:prev -> hn1:next; } ``` ```graphviz digraph hlist_del { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"]; hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; node [shape=none]; null [label=NULL]; node [shape=none]; next [label=next]; next -> hn2; pprev [label=pprev]; pprev -> hn0:next; map:ht2 -> hn0; hn0:next -> hn1; hn0:prev -> map:ht2; hn1:next -> hn2; hn1:prev -> hn0:next; hn2:next -> null; hn2:prev -> hn1:next; } ``` ```graphviz digraph hlist_del { rankdir=LR; node[shape=record]; map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."]; hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"]; hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"]; hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"]; node [shape=none]; null [label=NULL]; node [shape=none]; next [label=next]; next -> hn2; pprev [label=pprev]; pprev -> hn0:next; map:ht2 -> hn0; hn0:next -> hn2; hn0:prev -> map:ht2; hn1:next -> hn2; hn1:prev -> hn0:next; hn2:next -> null; hn2:prev -> hn0:next; } ``` LRU Cache 及 LRU Node 的結構如下: - `capacity`:該 LRU cache 「能紀錄的資料量」 - `count`:當前該 LRU cache 紀錄的「資料筆數」 - `dhead`:一 Doubly Linked List,將所有資料串接起來 - `hhead`:處理碰撞使用的 hlist 結構體陣列 ```c typedef struct { int capacity; int count; struct list_head dhead; struct hlist_head hhead[]; } LRUCache; typedef struct { int key; int value; struct hlist_node node; struct list_head link; } LRUNode; ``` ```graphviz digraph LRUcache { node [shape=record]; rankdir = LR; subgraph cluster_a { label = LRUCache; struct0 [label="capacity"]; struct1 [label="count"]; struct2 [label="dhead | {<prev>prev | <next>next}"]; struct3 [label="<head>hhead[0] | hhead[1] | ..."]; } subgraph cluster_b { label = LRUNode1; struct4 [label="key1"]; struct5 [label="value1"]; struct6 [label="link | {<prev>prev | <next>next}"]; struct7 [label="node | {<prev>pprev | <next>next}"]; } subgraph cluster_c { label = LRUNode2; struct8 [label="key2"]; struct9 [label="value2"]; struct10 [label="link | {<prev>prev | <next>next}"]; struct11 [label="node | {<prev>pprev | <next>next}"]; } node [shape=none]; null1 [label="NULL"]; null2 [label="NULL"]; struct2:next -> struct6; struct6:prev -> struct2; struct6:next -> struct10; struct10:prev -> struct6; struct10:next -> null1; struct3:head -> struct7; struct7:prev -> struct3:head; struct7:next -> struct11; struct11:prev -> struct7; struct11:next -> null2; } ``` ##### `lRUCacheCreate` 建立一 LRU Cache 並初始化其內部變數、結構,並指定其能紀錄的資料量。 ##### `lRUCacheFree` 釋放該 Cache 的 dhead 串內,指定 LRU 節點的記憶體空間(`list_head link`)。 ```c void lRUCacheFree(LRUCache *obj) { struct list_head *pos, *n; list_for_each_safe (pos, n, &obj->dhead) { LRUNode *cache = list_entry(pos, LRUNode, link); //FFFF list_del(&cache->link); free(cache); } free(obj); } ``` ##### `lRUCacheGet` 其 hash 值是透過「key % 該 cache 之可記錄資料量」計算的。 取得 hash 值後,到該 hash 值對應的 `hhead[hash]` 走訪該鏈表,若有找到對應 key 值,則回傳該節點之 `value`;否則回傳 `-1`,表未有此 key 值。 從這裡可揣摩出使用 LRU Cache 的好處及意義。若 hash function 設計得當,找到對應資料需要的時間複雜度會很小;倘若只使用 dhead 雙向鏈結串列,走訪整條串列來找資料會花很久的時間,最差能到 $O(n)$。 ```c int lRUCacheGet(LRUCache *obj, int key) { int hash = key % obj->capacity; struct hlist_node *pos; hlist_for_each (pos, &obj->hhead[hash]) { LRUNode *cache = list_entry(pos, LRUNode, node); //HHHH if (cache->key == key) { list_move(&cache->link, &obj->dhead); //IIII return cache->value; } } return -1; } ``` ##### `lRUCachePut` 先找看看對應 hash 值的 `hhead[hash]` 鏈表內有無對應 key 值之節點。若有,則將該節點「移到雙向鏈結串列的最前端」,表「**最近使用過**」,並指定該 cache 之 value 為此節點之值(更新其值);若無對應 key 值之節點,表「**Cache Miss**」,有兩種運作情況: - 若**快取已滿**,須將「雙向鏈結串列 dhead 最尾端的節點(表 Least Recently Used 節點)」刪除,並將新節點插入到 hhead 的頭部,同時添加到 dhead 的前面。 - 若**快取未滿**,則創建一個新的節點,並將其添加到 hhead 的頭部,同時添加到 dhead 的最前端,更新快取的 key 值。 #### 測試程式 :::info [程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/LRUCacheTesting>) ::: 撰寫一 `checkLRU()` 函式回傳 bool,搭配 `assert()` 以測試 dhead 中的首個元素,是否為最近互動過的 key 值節點。 #### 嘗試改進 `lRUCachePut()`:Cache Hit 時,除了將節點移動 dhead 最前端,對應 hash 值的`hhead[hash]`內的對應節點,也應該要移到最前端。 ```diff if (c->key == key) { list_move(KKKK, &obj->dhead); cache = c; + hlist_del(&c->node); + hlist_add_head(&c->node, &obj->hhead[hash]); } ``` #### 探討 Linux 的 LRU 相關機制 比較常用到的 page 會被放到 `active` 鏈表,不常用的 page 則放到 `inactive` 鏈表中。兩列表的 page 可相互轉移,若 `active` 的 page 越來越不常使用,其可能被轉放到 `inactive` 鏈表。 Linux 在作 Page 回收時,會先從 `inactive` 鏈表的==尾部==開始回收。 每個 zone 有 5 個 LRU 鏈表,用來放不同「最近使用狀態」的 pages,其中 INACTIVE_ANON、ACTIVE_ANON、INACTIVE_FILE 及 ACTIVE_FILE 中的 pages 是可以回收的。 ```c enum lru_list { LRU_INACTIVE_ANON = LRU_BASE, LRU_ACTIVE_ANON = LRU_BASE + LRU_ACTIVE, LRU_INACTIVE_FILE = LRU_BASE + LRU_FILE, LRU_ACTIVE_FILE = LRU_BASE + LRU_FILE + LRU_ACTIVE, LRU_UNEVICTABLE, NR_LRU_LISTS }; ``` 另外,為了評估 page 的常用程度,Kernel 引入了 `PG_referenced` 及 `PG_active` 兩個 bits。其一表示「當前活躍程度」,其一表示「最近是否被引用過」。 ![圖片](https://hackmd.io/_uploads/BJq-4Udpa.png) > Refer from [Linux Kernel 內存回收機制](http://www.wowotech.net/memory_management/233.html/comment-page-2)。 基本上有以下步驟: 1. 如果 page 很常用,設定 PG_active bit,並將其放在 ACTIVE LRU 鏈表;反之在 INACTIVE。 2. 每次存取 page 時,透過 `mark_page_accessed()` 來設定 PG_referenced bit。 3. 使用 `page_referenced()` 函式,透過 PG_referenced bit 以及「由逆向映射提供的信息」來確定 page 常用程度(每次清除該 bit 時都會檢測)。 4. 再次呼叫 `mark_page_accessed()`。此時,如果發現 PG_referenced bit 已被設定,表 `page_referenced()` 沒有作檢查,因此對於 `mark_page_accessed()` 的呼叫比 `page_referenced()` 更頻繁-表該 page 經常被存取。 > 如果該 page 位於 INACTIVE 鏈表,將其移動到 ACTIVE,此外還會設定 PG_active bit,並清除 PG_referenced bit; 5. 也有可能從 ACTIVE 移到 INACTIVE-在 page 比較不常用時,可能連續呼叫兩次 `page_referenced()` 而途中沒有 `mark_page_accessed()`。 如果存取內存 page 的頻率穩定,表對 `page_referenced()` 和 `mark_page_accessed()` 的呼叫大致上平衡,此時會把 page 放在「當前 LRU 鏈表」。 此動作確保了 page 不會在 ACTIVE 與 INACTIVE 鏈表間跳來跳去。 #### Page 回收的實現 主要有兩種觸發方式: - 內存嚴重不足。透過 `try_to_free_pages()` 函式來檢查內存區域的 pages。 - 透過後台程序 `kswapd` 來觸發,其會週期性檢查內存夠不夠,若不足,則會觸發 page 回收機制。使用 `balance_pgdat()` 作為媒介。 參照 [linux/mm/vmscan.c](https://github.com/torvalds/linux/blob/master/mm/vmscan.c)。 當呼叫 try_to_free_pages() 時,其會重複呼叫 `shrink_zones()` 和 `shrink_slab()` 來釋放一定數量的 pages(預設為 32 個)。而 `shrink_zones()` 會分別對內存區域列表中的所有區域呼叫 `shrink_node()` 函式-其即是「從內存中**回收較少使用的 pages**」的核心函式。 ```c= static void shrink_node(pg_data_t *pgdat, struct scan_control *sc) { unsigned long nr_reclaimed, nr_scanned, nr_node_reclaimed; struct lruvec *target_lruvec; bool reclaimable = false; if (lru_gen_enabled() && root_reclaim(sc)) { lru_gen_shrink_node(pgdat, sc); return; } target_lruvec = mem_cgroup_lruvec(sc->target_mem_cgroup, pgdat); again: memset(&sc->nr, 0, sizeof(sc->nr)); nr_reclaimed = sc->nr_reclaimed; nr_scanned = sc->nr_scanned; prepare_scan_control(pgdat, sc); shrink_node_memcgs(pgdat, sc); flush_reclaim_state(sc); nr_node_reclaimed = sc->nr_reclaimed - nr_reclaimed; if (!sc->proactive) vmpressure(sc->gfp_mask, sc->target_mem_cgroup, true, sc->nr_scanned - nr_scanned, nr_node_reclaimed); if (nr_node_reclaimed) reclaimable = true; if (current_is_kswapd()) { if (sc->nr.writeback && sc->nr.writeback == sc->nr.taken) set_bit(PGDAT_WRITEBACK, &pgdat->flags); if (sc->nr.unqueued_dirty == sc->nr.file_taken) set_bit(PGDAT_DIRTY, &pgdat->flags); if (sc->nr.immediate) reclaim_throttle(pgdat, VMSCAN_THROTTLE_WRITEBACK); } if (sc->nr.dirty && sc->nr.dirty == sc->nr.congested) { if (cgroup_reclaim(sc) && writeback_throttling_sane(sc)) set_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags); if (current_is_kswapd()) set_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags); } if (!current_is_kswapd() && current_may_throttle() && !sc->hibernation_mode && (test_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags) || test_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags))) reclaim_throttle(pgdat, VMSCAN_THROTTLE_CONGESTED); if (should_continue_reclaim(pgdat, nr_node_reclaimed, sc)) goto again; if (reclaimable) pgdat->kswapd_failures = 0; } ``` 其第 22 行的 `shrink_node_memcgs()` 會呼叫 `shrink_lruvec()` 及 `shrink_slab()` 等與 page 回收相關的函式,並會直/間接呼叫下列兩函式: - `shrink_active_list()`: 將某些 pages 移到 active 鏈表。 - `shrink_inactive_list()`:inactive 鏈表中選擇一定數量的 pages 放到一臨時鏈表中,最終呼叫 [page_alloc.c](https://github.com/torvalds/linux/blob/master/mm/page_alloc.c#L2520) 中定義的`free_unref_page_list()` 函式將其回收。 ### 測驗三 #### 程式碼理解 這些巨集定義了一些用來計算**對應數字的二進制表示**中「包含多少個位元值為 1」的函式,其可以針對 8-bit、16-bit、32-bit、64-bit 等數字去執行。 ```c #define __const_hweight8(w) \ ((unsigned int) ((!!((w) & (1ULL << 0))) + (!!((w) & (1ULL << 1))) + \ (!!((w) & (1ULL << 2))) + (!!((w) & (1ULL << 3))) + \ (!!((w) & (1ULL << 4))) + (!!((w) & (1ULL << 5))) + \ (!!((w) & (1ULL << 6))) + (!!((w) & (1ULL << 7))))) #define __const_hweight16(w) (__const_hweight8(w) + __const_hweight8((w) >> 8)) #define __const_hweight32(w) \ (__const_hweight16(w) + __const_hweight16((w) >> 16)) #define __const_hweight64(w) \ (__const_hweight32(w) + __const_hweight32((w) >> 32)) ``` ##### `FIND_NTH_BIT` 此巨集用來「在指定的位圖中,由左至右,查找第 n 個 **set bit**(即位元值為 1 的位元)所對應的索引」。 首先計算字中被設置的位元數目,然後比較以找到第 `num` 個被設置的位元。如果找到了,回傳該位元的索引值;否則,繼續往下個位元找。 ##### `__clear_bit` 傳入一 `addr`,設定其「特定的位元」為 0。 ```c static inline void __clear_bit(unsigned long nr, volatile unsigned long *addr) { unsigned long mask = BIT_MASK(nr); unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr); *p &= ~mask; } ``` ##### `fns` 與 `ffs()` 不同的是,此函式目的是在 `word` 中尋找並回傳「第 n 個被設置過的位元(即位元 1)之索引值」。如果 `word` 中沒有位元被設置過,則回傳 `BITS_PER_LONG`。 其用到 `__ffs()` 來找每次循環中「第一個被設置的位元」,然後將其索引位置存進 `bit` 變數中。一旦 n 的值為 0,則立即回傳當前的 bit 值。否則,用 `__clear_bit()` 清除該位元(成 0),然後 n 減少 1。反覆執行,直到找到第 n 個被設置的位元,最後回傳其索引位置。 舉例來說:假設 `word` 的二進位表示為 `11001001`,而我們想要找到第 2 個被設置的位元(即由右至左,找到第 2 個位元 1 的位置)。首先,`__ffs()` 先回傳 0,因為第一個被設置的位元在索引 0 上,然後清除此位元,繼續尋找下一個被設置的位元。照此邏輯,最後會回傳索引值 **3**。 ```c static inline unsigned long fns(unsigned long word, unsigned int n) { while (word) { unsigned int bit = __ffs(word); if (n-- == 0) return bit; __clear_bit(bit, &word); } return BITS_PER_LONG; } ``` ##### `find_nth_bit` 此函式用來「在一個記憶體區域中找到第 n 個被設置的位元的索引位置」。 其會先檢查傳入的 `size` 是否小於等於機器字長(`BITS_PER_LONG`,透過 `small_const_nbits(size)` 檢查),並且大於 0。 - 如果是,表示位元索引在 `size` 內,這時**可以直接對記憶體中的位元進行處理,而不用跨字節**-先用 `GENMASK()` 設定一個遮罩,把要處理的範圍遮出來;然後,將遮罩後的值傳進 `fns()` 來找第 n 個被設置的位元的索引位置。 - 否則,需要**在處理位元時跨越字節邊界**。需呼叫 `FIND_NTH_BIT()`,來處理跨字節邊界的情況。 #### Linux 的 `find_nth_bit()` 應用案例之解說與分析 `find_nth_bit()` 通常用在處理器排程相關的操作中。透過 CPU mask 等技術實作相關位元操作,來表示程序的 CPU 指定,而要得知某程序中的 CPU 指定狀況,就有可能會需要用到 `find_nth_bit()` 的函式邏輯。 對該函式延伸搜尋,可知其在 Linux 原始程式碼中使用在 [linux/kernel/trace/trace_events_filter.c](https://github.com/torvalds/linux/blob/master/kernel/trace/trace_events_filter.c#L688) 中的 `do_filter_cpumask_scalar()` 函式,而其又是以 `cpumask_nth()` 函式來呼叫 `find_nth_bit()`。其主要目的就是透過 `find_nth_bit()` 函式,來找尋 cpu mask 中的第 n 個 CPU 之位置。 對於 CPU mask 的概念與詳細理解,可參閱 [Linux Kernel:CPU 狀態管理之 cpumask](https://zhuanlan.zhihu.com/p/670194359)。 利用 struct cpumask 這個 bitmap 來代表系統當中 CPU 的集合,每個 bit 代表一個 CPU,其中只有 `nr_cpu_ids` 個位元是有效的。 #### CPU Affinity 其又稱 Processor Affinity。 指在 Linux 中,process 想儘量長時間運行在某個指定的 CPU 上 ,且不被轉移至其他 CPU 的傾向。 在 Symmetric Multi-Processing 的架構下,Linux 排程器會根據 CPU Affinity 的設置,讓指定的程序運行在「指定」的 CPU 上,而不會在別的 CPU 上運行。 CPU Affinity 會用 bitmask 表示,每個位元表一個 CPU,若為 1,則表示「已指定」。 最低位元表第一個 CPU;最高位元則表最後一個 CPU。 > 舉個例子(用十六進制表示): `0x00000001` 表指定*處理器 1*; `0xFFFFFFFF` 表指定*所有處理器(0~31)*。 參照 [linux/kernel/sched/core.c](https://github.com/torvalds/linux/blob/master/kernel/sched/core.c#L8396),其檔案內容與處理排程非常相關,其中有 `sched_setaffinity()` 函式也用到了 CPU mask 的概念,而這也是使用到 `find_nth_bit()` 的前置步驟,沒有處理器遮罩,就不會有相關位元操作來找 CPU 了。 可透過 `sched_setaffinity` 函式來指定 CPU Affinity。