# 2024q1 Homework2 (quiz1+2)
contributed by < `jujuegg` >
## 第一周測驗題
### 測驗一
本測驗是在鏈結串列上實作 quick sort,且是非遞迴版本的,使用==堆疊==來模擬原本的遞迴呼叫。
#### 程式碼解讀
```c
// variables
node_t *begin[max_level], *end[max_level];
node_t *result = NULL, *left = NULL, *right = NULL;
```
- `begin` 與 `end` 是用來記錄 `pivot` 未完成排序的節點。
- `max_level` 是串列長度的 2 倍。
- `left` 與 `right` 分別存放比 `pivot` 大/小 的節點。
```c
begin[0] = *list;
end[0] = list_tail(list);
```
`begin` 紀錄頭端,而 `end` 紀錄尾端,所以一開始把整個串列視為未完成排序。
```c
node_t *L = begin[i], *R = end[i];
```
每一輪使用 `L` 和 `R` 此輪要排序的串列的頭和尾端。`i` 是用來告知程式碼目前要處理的未完成排序的串列的所在位置。
##### 此輪有不只一個節點時
```c
node_t *pivot = L;
value = pivot->value;
node_t *p = pivot->next;
pivot->next = NULL;
```
如果兩個節點的值不一樣,則代表長度 > 1,即可把第一個節點當作 pivot,也就是 `L` 節點。
接著用 `p` 來走訪整個串列。
```c
while (p) {
node_t *n = p;
p = p->next; //CCCC
list_add(n->value > value ? &right : &left, n);
}
```
`p` 會提前跑到下一個節點的位置,有點像 `list_for_each_safe` 的概念。並把目前節點與 `pivot` 比較,大的就加進 `right`;小於等於的加進 `left`。
```c
begin[i] = left;
end[i] = list_tail(&left); //DDDD
begin[i + 1] = pivot;
end[i + 1] = pivot;
begin[i + 2] = right;
end[i + 2] = list_tail(&right); //EEEE
left = right = NULL;
i += 2;
```
分好大小邊後,把 `left` 裡面的節點設置為「第 ==i== 個」未處理的串列,把 `right` 裡面的節點設置為「第 ==i+2== 個」未處理的串列。而 `pivot` 設置為「第 ==i+1== 個」未處理的串列。
最後把兩邊清空,調整 `i`,準備下一輪。
##### 此輪 只有一個節點/沒有節點 時
```c
if (L)
list_add(&result, L);
i--;
```
若此輪只有一個節點,此時它會是所有未完成排序的節點中值最大的,直接將其加到最終串列中。這蠻好理解的,因為每輪結束的時候,`i` 會加 2,代表程式選取 `right` 串列作為下一輪的輸入。
要注意,如果上一輪中,沒有存在比 `pivot` 大的值,那這邊有可能會是空的。
#### 圖解
- 設 `list` 如下:
```graphviz
digraph structs {
node[shape=plaintext];
begin_0 [fontcolor="red"];
end_0 [fontcolor="red"];
pivot [fontcolor="black"]
node[shape=box];
struct0 [label= "3"];
struct1 [label= "4"];
struct2 [label= "5"];
struct3 [label= "2"];
struct4 [label= "1"];
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
}
begin_0 -> struct0
end_0 -> struct4
pivot -> struct0
}
```
- 把 `pivot` 以外的所有節點逐一跟 `pivot` 比較,並分到 `right` / `left` 中。
```graphviz
digraph structs {
node[shape=plaintext];
pivot [fontcolor="black"]
right [fontcolor="blue"]
left [fontcolor="blue"]
node[shape=box];
struct0 [label= "3"];
struct1 [label= "4"];
struct2 [label= "5"];
struct3 [label= "2"];
struct4 [label= "1"];
{
rank="same";
struct2 -> struct1
struct4 -> struct3
}
left -> struct4
pivot -> struct0
right -> struct2
}
```
- 把這 3 個串列分別放入 `begin` 與 `end` 正確的位置。下一輪則用 `i == 2` 的位置進行排序。
```graphviz
digraph structs {
node[shape=plaintext];
begin_0 [fontcolor="red"]
begin_1 [fontcolor="orange"]
begin_2 [fontcolor="purple"]
end_0 [fontcolor="red"]
end_1 [fontcolor="orange"]
end_2 [fontcolor="purple"]
node[shape=box];
struct0 [label= "3"];
struct1 [label= "4"];
struct2 [label= "5"];
struct3 [label= "2"];
struct4 [label= "1"];
{
rank="same";
struct2 -> struct1
struct4 -> struct3
}
begin_0 -> struct4
end_0 -> struct3
begin_1 -> struct0
end_1 -> struct0
begin_2 -> struct2
end_2 -> struct1
}
```
#### 優化方案
- 或許 `begin` 與 `end` 也可以用鏈結串列來儲存?
- 動機:程式利用 `i` 來存取這兩個陣列,但 `i` 在執行程式的過程中並不會大範圍跳躍,最多連續加 2 而已,可以避免宣告不必要的記憶體空間。
- 使用雙向鏈結串列。
#### 使用 Linux 核心風格的 List API 改寫
:::info
[程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/QuickSort>)
:::
首先把 `quick_sort.h` 裡面宣告的結構改成用 `list_head`,以及把上個作業的 `list.h` 拿過來使用,有些重複的函式就統一使用內建的 API。
```diff
typedef struct __node {
- struct __node *left, *right;
- struct __node *next;
+ struct list_head list;
long value;
} node_t;
```
:::info
接著是最麻煩的地方,雖然想法很簡單,但是我卻在這邊花了將近半天的時間,期間一直發生 Segmentation fault,非常的折磨人。但沒辦法,菜就多練...
:::
我建立了另一個結構 `begin_t`,它的作用是儲存那些尚未完成排序的串列的 「`head`」,然後再將此節點存入另一個鏈結串列中,==並不像原本的程式碼是用陣列來模擬堆疊==。原諒我不會取名。
```c
typedef struct begins {
struct list_head list;
struct list_head *head;
} begin_t;
```
現在要使用 Linux 的 API,所以變數形態要更換,且因為使用雙向鏈結串列,所以不須留下 `end`。
```diff
- node_t *begin[max_level], *end[max_level];
- node_t *result = NULL, *left = NULL, *right = NULL;
+ struct list_head *begins = malloc(sizeof(struct list_head));
+ struct list_head *result = malloc(sizeof(struct list_head));
+ struct list_head *left = malloc(sizeof(struct list_head));
+ struct list_head *right = malloc(sizeof(struct list_head));
+ struct list_head *mid = malloc(sizeof(struct list_head));
```
一開始把整個串列的 `head` 放進 `begins` 中。
`now` 是用來模擬堆疊的 `top` 的位置。
`begins` 則是目前==所有==未完成排序的串列所在的堆疊。
```c
begin_t *all = new_begin(*head);
list_add_tail(&all->list, begins);
struct list_head *now = begins->next;
```
`now_entry_head` 是這輪要處理的未完成排序的串列的 `head`。
```c
struct list_head *now_entry_head = list_entry(now, begin_t, list)->head;
```
`pivot` 放入中間的串列。
```c
node_t *pivot = list_first_entry(now_entry_head, node_t, list);
list_del(&pivot->list);
list_add(&pivot->list, mid);
```
中間將資料依照大小區分為 `left` 與 `right` 的過程因為與之前無異,所以省略不提。
因為要依序將 `left`、`pivot`、`right` 放入 `begins` 中,所以會需要額外在堆疊中增加 2 個節點,而 `left` 就會直接將目前正在處理排序的鏈結串列取代,分別 push `pivot` 和 `right`。
注意如果 `left` 或 `right` 裡沒有東西,也會需要 push 空串列的 `head` 進去。
```c
// 1
list_splice_tail_init(left, now_entry_head);
INIT_LIST_HEAD(left);
// 2
begin_t *mid_begin = new_begin(mid);
list_add_tail(&mid_begin->list, begins);
INIT_LIST_HEAD(mid);
// 3
begin_t *right_begin = new_begin(right);
list_add_tail(&right_begin->list, begins);
INIT_LIST_HEAD(right);
```
`top` 的指標往上 2 格。`pop` 時則是往下一格。
```c
// push 2 elements into stack
now = now->next->next;
// pop 1 element out of stack
now = now->prev;
```
操作大致上是這樣,注意當要 pop `begins` 的節點時,由於內部是儲存鏈結串列的頭,所以要記得呼叫 `list_free` 來釋放記憶體空間。
#### 實驗結果
- 原本的程式的記憶體用量
![image](https://hackmd.io/_uploads/HkEBSiFap.png)
- 使用鏈結串列實作堆疊的記憶體使用量
![image](https://hackmd.io/_uploads/SyTzPsFaT.png)
在 100000 筆資料的情況下,用鏈結串列去實作堆疊可以節省約 0.8 MB 的記憶體。
### 測驗二
合併串列時,若 run 的數量等於或略小於 2 的冪,效率會最高;若略大於 2 的冪,效率就特別低。
所以要定義 minrun 來控制每個 run 的長度。
而為了避免一開始就掃描整個資料來產生 run, timsort 會用堆疊來暫存 run,運用 `merge_collapse` 函式來達成,該函式的功能如下:
- 檢查 A 的長度要大於 B 和 C 的長度總和
- 檢查 B 的長度要大於 C 的長度
注意合併只能在相鄰的 2 個 run 進行,以確保穩定性。
##### Galloping mode
在合併兩個序列 (A & B) 時,考慮到它們都是已經排序好的數列,因此可以加快比較的過程。
首先尋找 B 的首個元素 ( $B_0$ ) 在 A 中的排序位置,即可確定 A 中有一小段是小於 $B_0$,就不需處理這段,接著再尋找 A 的首個元素在 B 中的排序位置,如此反覆進行。
#### 程式碼解讀
##### `find_run`
如果目前節點比下一個節點大,則代表是降冪排列,需要將序列倒轉。`do while` 迴圈中間在將 `list` 所在節點與 `next` 交換,直到下一個元素大於等於目前元素,或沒有下一元素就停止。
```c
if (cmp(priv, list, next) > 0) {
/* decending run, also reverse the list */
struct list_head *prev = NULL;
do {
len++;
list->next = prev;
prev = list;
list = next;
next = list->next;
head = list;
} while (next && cmp(priv, list, next) > 0);
list->next = prev;
}
```
如果目前是升冪排列,則找到此升冪排列的最後一個元素
```c
else {
do {
len++;
list = next;
next = list->next;
} while (next && cmp(priv, list, next) <= 0);
list->next = NULL;
}
```
假設有 1 序列如下:
```graphviz
digraph structs {
node[shape=plaintext];
prev [fontcolor="black"]
list [fontcolor="black"]
next [fontcolor="black"]
head [fontcolor="black"]
node[shape=box];
struct0 [label= "3"]
struct1 [label= "2"]
struct2 [label= "1"]
struct3 [label= "3"]
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
}
{
list -> struct0
head -> struct0
next -> struct1
}
}
```
做完一次迴圈會變成這樣:
```graphviz
digraph structs {
node[shape=plaintext];
prev [fontcolor="black"]
list [fontcolor="black"]
next [fontcolor="black"]
head [fontcolor="black"]
node[shape=box];
struct0 [label= "3"]
struct1 [label= "2"]
struct2 [label= "1"]
struct3 [label= "3"]
{
rank="same";
struct1 -> struct2
struct2 -> struct3
}
{
list -> struct1
prev -> struct0
head -> struct1
next -> struct2
}
}
```
第二次:
```graphviz
digraph structs {
node[shape=plaintext];
prev [fontcolor="black"]
list [fontcolor="black"]
next [fontcolor="black"]
head [fontcolor="black"]
node[shape=box];
struct0 [label= "3"]
struct1 [label= "2"]
struct2 [label= "1"]
struct3 [label= "3"]
{
rank="same";
struct1 -> struct0
struct2 -> struct3
}
{
list -> struct2
prev -> struct1
head -> struct2
next -> struct3
}
}
```
結束後就會跳出 `while` 迴圈了,最後再設定 `list->next = prev`,與後續操作。
```c
head->prev = NULL;
head->next->prev = (struct list_head *) len;
result.head = head, result.next = next;
return result;
```
```graphviz
digraph structs {
node[shape=plaintext];
prev [fontcolor="black"]
list [fontcolor="black"]
next [fontcolor="black"]
head [fontcolor="black"]
node[shape=box];
struct0 [label= "3"]
struct1 [label= "2"]
struct2 [label= "1"]
struct3 [label= "3"]
{
rank="same";
struct1 -> struct0
struct2 -> struct1
}
{
list -> struct2
prev -> struct1
head -> struct2
next -> struct3
}
}
```
##### `*merge_collapse`
只要目前有超過 2 個 run,就會持續進行合併操作。
```c
while ((n = stk_size) >= 2)
```
如果前 4 個 run 中,
- run B 的長度小於等於 run C + run D
- run A 的長度小於等於 run B + run C
就會啟動合併。
```c
if ((n >= 3 &&
run_size(tp->prev->prev) <= run_size(tp->prev) + run_size(tp)) ||
(n >= 4 && run_size(tp->prev->prev->prev) <=
run_size(tp->prev->prev) + run_size(tp->prev)))
```
```graphviz
digraph structs {
node[shape=plaintext];
A [fontcolor="black"]
B [fontcolor="black"]
C [fontcolor="black"]
D [fontcolor="black"]
node[shape=box];
struct0 [label= "tp"]
struct1 [label= "tp->prev"]
struct2 [label= "tp->prev->prev"]
struct3 [label= "tp->prev->prev->prev"]
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
}
{
A -> struct3
B -> struct2
C -> struct1
D -> struct0
}
}
```
怎麼決定要合併哪個 run:
- run B 小於 run D:合併 run B 與 run C
- 否則合併 run C 與 run D
```c
if (run_size(tp->prev->prev) < run_size(tp)) {
tp->prev = merge_at(priv, cmp, tp->prev);
} else {
tp = merge_at(priv, cmp, tp);
}
```
如果只有 2 個 run,且 run C 小於等於 run D,則合併兩者。
```c
} else if (run_size(tp->prev) <= run_size(tp)) {
tp = merge_at(priv, cmp, tp);
} else {
break;
}
```
##### `*merge_force_collapse`
這個步驟會強制合併所有的 run,不須檢查要合併的 run 的長度和是否已經大於另一個 run。
注意,這個步驟會讓堆疊中剩下 2 個 run。
##### `timsort`
每次先找到一個 run,將這個 run 放入堆疊中,並將 `list` 指標移到剛剛找到的 run 的後面一個節點,然後觀察堆疊中最上面的 4 個 run,看需不需要合併,走訪完串列後,再將剩下的 run 合併。
```c
do {
/* Find next run */
struct pair result = find_run(priv, list, cmp);
result.head->prev = tp;
tp = result.head;
list = result.next;
stk_size++;
tp = merge_collapse(priv, cmp, tp);
} while (list);
```
#### 優化方案
:::info
[程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/Timsort>)
:::
我發現在[測驗題目](<https://hackmd.io/@sysprog/linux2024-quiz1#%E6%B8%AC%E9%A9%97-2>)中的 Galloping mode 方法並沒有實現在原本 timsort 的程式中,因此我選擇改優化這部分。但還沒開始實作就想到一個問題,程式是怎麼不用比較就找到要插入的位置的,所以這個想法先被擱置。
再來程式也沒有考慮到 minrun 的長度來調整每次找到的 run,目前想要改進這方面,但實作上遇到一點問題,正在調整中。
## 第二周測驗題
### 測驗一
#### 程式碼解讀
用於處理 hash 數值碰撞
```c
struct hlist_node {
struct hlist_node *next, **pprev;
};
```
Hash table 中的頭節點,唯一代表一 hash 值
```c
struct hlist_head {
struct hlist_node *first;
};
```
##### `hlist_add_head`
運作流程如下一系列圖:設有一 `hlist_node_new` 節點要執行此函式。首先,若該列表是有節點而非 NULL 的,則調整 `h->first->pprev` 指標(如圖二);接著調整新節點之 `*next` 及 `*pprev` 指標(如圖三);最後將頭節點之 `*first` 指向新節點(如圖四)即可。
```c
static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h)
{
if (h->first)
h->first->pprev = &n->next;
n->next = h->first; //AAAA
n->pprev = &h->first;
h->first = n;
}
```
```graphviz
digraph add_head {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"];
node [shape=none]
null [label=NULL]
map:ht2 -> hn1;
hn1:next -> hn2;
hn1:prev -> map:ht2;
hn2:next -> null;
hn2:prev -> hn1:next;
}
```
```graphviz
digraph add_head {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"];
node [shape=none]
null [label=NULL]
map:ht2 -> hn1;
hn1:next -> hn2;
hn1:prev -> hn3:next;
hn2:next -> null;
hn2:prev -> hn1:next;
}
```
```graphviz
digraph add_head {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"];
node [shape=none]
null [label=NULL]
map:ht2 -> hn1;
hn1:next -> hn2;
hn1:prev -> hn3:next;
hn2:next -> null;
hn2:prev -> hn1:next;
hn3:next -> hn1;
hn3:prev -> map:ht2;
}
```
```graphviz
digraph add_head {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn1 [label="hlist_node1 | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
hn3 [label="hlist_node_new | {<prev>pprev | <next>next}"];
node [shape=none]
null [label=NULL]
map:ht2 -> hn3;
hn1:next -> hn2;
hn1:prev -> hn3:next;
hn2:next -> null;
hn2:prev -> hn1:next;
hn3:next -> hn1;
hn3:prev -> map:ht2;
}
```
##### `*dfs`
`pre_low/high` 即該 `*preorder` 之左右範圍;`in_low/high` 同理,為 `*inorder` 之左右範圍。
要知道的是,此測驗一之程式碼即是要重建二元樹,且 dfs 存取順序實際上就是 preorder,所以會傳 `preorder[pre_low]`(表中間節點) 進 `find()`,以尋找該節點在 inorder 中的位置索引 `idx`,找到後「切分左右子樹(`tn->left` 及 `tn->right`)」,分別再對 inorder 的 `in_low ~ (idx-1)` 與 `(idx+1) ~ in_high` 作 dfs。
由 preorder 可知哪個節點在上面(越靠前的越上面)、而由 inorder 可知哪個節點靠左(越靠前的越左邊),於是可得知 preorder 的第一個元素一定是 root 的值,且該元素對應到 inorder 後,左邊的值就在左邊,右邊的值就在右邊。
接著就將 left 和 right 分別當作新的 root node 下去做即可。遞迴的中止條件是在 inorder 中找不到對應元素。
```c
static struct TreeNode *dfs(int *preorder,
int pre_low,
int pre_high,
int *inorder,
int in_low,
int in_high,
struct hlist_head *in_heads,
int size)
{
if (in_low > in_high || pre_low > pre_high)
return NULL;
struct TreeNode *tn = malloc(sizeof(*tn));
tn->val = preorder[pre_low];
int idx = find(preorder[pre_low], size, in_heads);
tn->left = dfs(preorder, pre_low + 1, pre_low + (idx - in_low), inorder,
in_low, idx - 1, in_heads, size);
tn->right = dfs(preorder, pre_high - (in_high - idx - 1), pre_high, inorder,
idx + 1, in_high, in_heads, size);
return tn;
}
```
##### `buildTree`
建樹的程式碼。起初會先將 inorder 節點建立一 hash table 以方便 `dfs()` 函式的內部實作,之後再呼叫 `dfs()` 函式,遞迴求出該二元樹並回傳結果。
```c
static struct TreeNode *buildTree(int *preorder,
int preorderSize,
int *inorder,
int inorderSize)
{
struct hlist_head *in_heads = malloc(inorderSize * sizeof(*in_heads));
for (int i = 0; i < inorderSize; i++)
INIT_HLIST_HEAD(&in_heads[i]);
for (int i = 0; i < inorderSize; i++)
node_add(inorder[i], i, inorderSize, in_heads);
return dfs(preorder, 0, preorderSize - 1, inorder, 0, inorderSize - 1,
in_heads, inorderSize);
}
```
#### 測試程式
:::info
[程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/BinaryTreeTesting>)
:::
設定一 preorder 及 inorder 的序列,其會有一對應的樹,也會有對應的 postorder 序列。本測試程式即比較 buildTree 後的 postorder,與期望的 postorder 有無差別,若無,表示程式運作無誤。
#### 嘗試改進
- 用紅黑樹代替鏈表之儲存資料方式
用 Hash table 找特定資料的時間複雜度通常是 $O(1)$,但在最差情況下(目標資料在鏈表尾端),就可能會達到 $O(n)$。
因為碰撞發生的時候,要線性查找鏈表以找到目標資料。若用紅黑樹儲存的話,其可以保持樹的高度平衡,來保證在最壞情況下的查找時間複雜度降為 $O(log n)$。
比起 Hash table,紅黑樹在處理碰撞時不用線性查找鏈表,而是透過比較節點值來定位目標資料,所以其在查找上的性能更為穩定,特別是在「處理大量資料」或「碰撞發生頻繁」的情況下,紅黑樹的優勢會更明顯。
#### 探討 Pre-order Walk
參照 [linux/kernel/cgroup/cgroup.c](https://github.com/torvalds/linux/blob/master/kernel/cgroup/cgroup.c),發現 `css_next_descendant_pre()` 實作了 Pre-order Walk。
- `css_next_descendant_pre()`:找到下一個要進行 Preorder Traversal 的後代節點
- `@pos`:當前位置
- `@root`:要走訪其後代節點的 CSS
```c
struct cgroup_subsys_state *
css_next_descendant_pre(struct cgroup_subsys_state *pos,
struct cgroup_subsys_state *root)
{
struct cgroup_subsys_state *next;
cgroup_assert_mutex_or_rcu_locked();
/* if first iteration, visit @root */
if (!pos)
return root;
/* visit the first child if exists */
next = css_next_child(NULL, pos);
if (next)
return next;
/* no child, visit my or the closest ancestor's next sibling */
while (pos != root) {
next = css_next_child(pos, pos->parent);
if (next)
return next;
pos = pos->parent;
}
return NULL;
}
```
尋找要訪問的下一個後代節點,以進行 `@root` ((含)第一個要訪問的節點)的後代節點的 Preorder Traversal。
雖然此函式需要 cgroup_mutex 或 RCU read locking,但不需要整個走訪過程都跑在 critical section 裡。 只要 `@pos` 和 `@root` 可訪問,且 `@pos` 是 `@root` 的後代節點,其就會回傳下一個後代節點。
因此,其用於實現對 cgroup 子系統狀態的一種很有效率的 Pre-access traversal。
且在「遞迴訪問 cgroup 的後代來走訪整個 cgroup 樹」的過程中,還保持對 cgroup_mutex 或 RCU read locking 的佔用,確保不會有多餘一個 process 跑進 critical section。
### 測驗二
#### 程式碼解讀
`hlist_del()` 的操作邏輯如下系列圖(假設要刪除 node1),簡單來說就是刪除一個鏈表節點。
```graphviz
digraph hlist_del {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"];
hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
node [shape=none]
null [label=NULL]
map:ht2 -> hn0;
hn0:next -> hn1;
hn0:prev -> map:ht2;
hn1:next -> hn2;
hn1:prev -> hn0:next;
hn2:next -> null;
hn2:prev -> hn1:next;
}
```
```graphviz
digraph hlist_del {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"];
hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
node [shape=none];
null [label=NULL];
node [shape=none];
next [label=next];
next -> hn2;
pprev [label=pprev];
pprev -> hn0:next;
map:ht2 -> hn0;
hn0:next -> hn1;
hn0:prev -> map:ht2;
hn1:next -> hn2;
hn1:prev -> hn0:next;
hn2:next -> null;
hn2:prev -> hn1:next;
}
```
```graphviz
digraph hlist_del {
rankdir=LR;
node[shape=record];
map [label="hash_table |<ht0> |<ht1> |<ht2>hlist_head2.first |<ht3> |<ht4>..."];
hn0 [label="hlist_node0 | {<prev>pprev | <next>next}"];
hn1 [label="hlist_node1(n) | {<prev>pprev | <next>next}"];
hn2 [label="hlist_node2 | {<prev>pprev | <next>next}"];
node [shape=none];
null [label=NULL];
node [shape=none];
next [label=next];
next -> hn2;
pprev [label=pprev];
pprev -> hn0:next;
map:ht2 -> hn0;
hn0:next -> hn2;
hn0:prev -> map:ht2;
hn1:next -> hn2;
hn1:prev -> hn0:next;
hn2:next -> null;
hn2:prev -> hn0:next;
}
```
LRU Cache 及 LRU Node 的結構如下:
- `capacity`:該 LRU cache 「能紀錄的資料量」
- `count`:當前該 LRU cache 紀錄的「資料筆數」
- `dhead`:一 Doubly Linked List,將所有資料串接起來
- `hhead`:處理碰撞使用的 hlist 結構體陣列
```c
typedef struct {
int capacity;
int count;
struct list_head dhead;
struct hlist_head hhead[];
} LRUCache;
typedef struct {
int key;
int value;
struct hlist_node node;
struct list_head link;
} LRUNode;
```
```graphviz
digraph LRUcache {
node [shape=record];
rankdir = LR;
subgraph cluster_a {
label = LRUCache;
struct0 [label="capacity"];
struct1 [label="count"];
struct2 [label="dhead | {<prev>prev | <next>next}"];
struct3 [label="<head>hhead[0] | hhead[1] | ..."];
}
subgraph cluster_b {
label = LRUNode1;
struct4 [label="key1"];
struct5 [label="value1"];
struct6 [label="link | {<prev>prev | <next>next}"];
struct7 [label="node | {<prev>pprev | <next>next}"];
}
subgraph cluster_c {
label = LRUNode2;
struct8 [label="key2"];
struct9 [label="value2"];
struct10 [label="link | {<prev>prev | <next>next}"];
struct11 [label="node | {<prev>pprev | <next>next}"];
}
node [shape=none];
null1 [label="NULL"];
null2 [label="NULL"];
struct2:next -> struct6;
struct6:prev -> struct2;
struct6:next -> struct10;
struct10:prev -> struct6;
struct10:next -> null1;
struct3:head -> struct7;
struct7:prev -> struct3:head;
struct7:next -> struct11;
struct11:prev -> struct7;
struct11:next -> null2;
}
```
##### `lRUCacheCreate`
建立一 LRU Cache 並初始化其內部變數、結構,並指定其能紀錄的資料量。
##### `lRUCacheFree`
釋放該 Cache 的 dhead 串內,指定 LRU 節點的記憶體空間(`list_head link`)。
```c
void lRUCacheFree(LRUCache *obj)
{
struct list_head *pos, *n;
list_for_each_safe (pos, n, &obj->dhead) {
LRUNode *cache = list_entry(pos, LRUNode, link); //FFFF
list_del(&cache->link);
free(cache);
}
free(obj);
}
```
##### `lRUCacheGet`
其 hash 值是透過「key % 該 cache 之可記錄資料量」計算的。
取得 hash 值後,到該 hash 值對應的 `hhead[hash]` 走訪該鏈表,若有找到對應 key 值,則回傳該節點之 `value`;否則回傳 `-1`,表未有此 key 值。
從這裡可揣摩出使用 LRU Cache 的好處及意義。若 hash function 設計得當,找到對應資料需要的時間複雜度會很小;倘若只使用 dhead 雙向鏈結串列,走訪整條串列來找資料會花很久的時間,最差能到 $O(n)$。
```c
int lRUCacheGet(LRUCache *obj, int key)
{
int hash = key % obj->capacity;
struct hlist_node *pos;
hlist_for_each (pos, &obj->hhead[hash]) {
LRUNode *cache = list_entry(pos, LRUNode, node); //HHHH
if (cache->key == key) {
list_move(&cache->link, &obj->dhead); //IIII
return cache->value;
}
}
return -1;
}
```
##### `lRUCachePut`
先找看看對應 hash 值的 `hhead[hash]` 鏈表內有無對應 key 值之節點。若有,則將該節點「移到雙向鏈結串列的最前端」,表「**最近使用過**」,並指定該 cache 之 value 為此節點之值(更新其值);若無對應 key 值之節點,表「**Cache Miss**」,有兩種運作情況:
- 若**快取已滿**,須將「雙向鏈結串列 dhead 最尾端的節點(表 Least Recently Used 節點)」刪除,並將新節點插入到 hhead 的頭部,同時添加到 dhead 的前面。
- 若**快取未滿**,則創建一個新的節點,並將其添加到 hhead 的頭部,同時添加到 dhead 的最前端,更新快取的 key 值。
#### 測試程式
:::info
[程式碼](<https://github.com/jujuegg/Linux-HW2/tree/master/LRUCacheTesting>)
:::
撰寫一 `checkLRU()` 函式回傳 bool,搭配 `assert()` 以測試 dhead 中的首個元素,是否為最近互動過的 key 值節點。
#### 嘗試改進
`lRUCachePut()`:Cache Hit 時,除了將節點移動 dhead 最前端,對應 hash 值的`hhead[hash]`內的對應節點,也應該要移到最前端。
```diff
if (c->key == key) {
list_move(KKKK, &obj->dhead);
cache = c;
+ hlist_del(&c->node);
+ hlist_add_head(&c->node, &obj->hhead[hash]);
}
```
#### 探討 Linux 的 LRU 相關機制
比較常用到的 page 會被放到 `active` 鏈表,不常用的 page 則放到 `inactive` 鏈表中。兩列表的 page 可相互轉移,若 `active` 的 page 越來越不常使用,其可能被轉放到 `inactive` 鏈表。
Linux 在作 Page 回收時,會先從 `inactive` 鏈表的==尾部==開始回收。
每個 zone 有 5 個 LRU 鏈表,用來放不同「最近使用狀態」的 pages,其中 INACTIVE_ANON、ACTIVE_ANON、INACTIVE_FILE 及 ACTIVE_FILE 中的 pages 是可以回收的。
```c
enum lru_list {
LRU_INACTIVE_ANON = LRU_BASE,
LRU_ACTIVE_ANON = LRU_BASE + LRU_ACTIVE,
LRU_INACTIVE_FILE = LRU_BASE + LRU_FILE,
LRU_ACTIVE_FILE = LRU_BASE + LRU_FILE + LRU_ACTIVE,
LRU_UNEVICTABLE,
NR_LRU_LISTS
};
```
另外,為了評估 page 的常用程度,Kernel 引入了 `PG_referenced` 及 `PG_active` 兩個 bits。其一表示「當前活躍程度」,其一表示「最近是否被引用過」。
![圖片](https://hackmd.io/_uploads/BJq-4Udpa.png)
> Refer from [Linux Kernel 內存回收機制](http://www.wowotech.net/memory_management/233.html/comment-page-2)。
基本上有以下步驟:
1. 如果 page 很常用,設定 PG_active bit,並將其放在 ACTIVE LRU 鏈表;反之在 INACTIVE。
2. 每次存取 page 時,透過 `mark_page_accessed()` 來設定 PG_referenced bit。
3. 使用 `page_referenced()` 函式,透過 PG_referenced bit 以及「由逆向映射提供的信息」來確定 page 常用程度(每次清除該 bit 時都會檢測)。
4. 再次呼叫 `mark_page_accessed()`。此時,如果發現 PG_referenced bit 已被設定,表 `page_referenced()` 沒有作檢查,因此對於 `mark_page_accessed()` 的呼叫比 `page_referenced()` 更頻繁-表該 page 經常被存取。
> 如果該 page 位於 INACTIVE 鏈表,將其移動到 ACTIVE,此外還會設定 PG_active bit,並清除 PG_referenced bit;
5. 也有可能從 ACTIVE 移到 INACTIVE-在 page 比較不常用時,可能連續呼叫兩次 `page_referenced()` 而途中沒有 `mark_page_accessed()`。
如果存取內存 page 的頻率穩定,表對 `page_referenced()` 和 `mark_page_accessed()` 的呼叫大致上平衡,此時會把 page 放在「當前 LRU 鏈表」。
此動作確保了 page 不會在 ACTIVE 與 INACTIVE 鏈表間跳來跳去。
#### Page 回收的實現
主要有兩種觸發方式:
- 內存嚴重不足。透過 `try_to_free_pages()` 函式來檢查內存區域的 pages。
- 透過後台程序 `kswapd` 來觸發,其會週期性檢查內存夠不夠,若不足,則會觸發 page 回收機制。使用 `balance_pgdat()` 作為媒介。
參照 [linux/mm/vmscan.c](https://github.com/torvalds/linux/blob/master/mm/vmscan.c)。
當呼叫 try_to_free_pages() 時,其會重複呼叫 `shrink_zones()` 和 `shrink_slab()` 來釋放一定數量的 pages(預設為 32 個)。而 `shrink_zones()` 會分別對內存區域列表中的所有區域呼叫 `shrink_node()` 函式-其即是「從內存中**回收較少使用的 pages**」的核心函式。
```c=
static void shrink_node(pg_data_t *pgdat, struct scan_control *sc)
{
unsigned long nr_reclaimed, nr_scanned, nr_node_reclaimed;
struct lruvec *target_lruvec;
bool reclaimable = false;
if (lru_gen_enabled() && root_reclaim(sc)) {
lru_gen_shrink_node(pgdat, sc);
return;
}
target_lruvec = mem_cgroup_lruvec(sc->target_mem_cgroup, pgdat);
again:
memset(&sc->nr, 0, sizeof(sc->nr));
nr_reclaimed = sc->nr_reclaimed;
nr_scanned = sc->nr_scanned;
prepare_scan_control(pgdat, sc);
shrink_node_memcgs(pgdat, sc);
flush_reclaim_state(sc);
nr_node_reclaimed = sc->nr_reclaimed - nr_reclaimed;
if (!sc->proactive)
vmpressure(sc->gfp_mask, sc->target_mem_cgroup, true,
sc->nr_scanned - nr_scanned, nr_node_reclaimed);
if (nr_node_reclaimed)
reclaimable = true;
if (current_is_kswapd()) {
if (sc->nr.writeback && sc->nr.writeback == sc->nr.taken)
set_bit(PGDAT_WRITEBACK, &pgdat->flags);
if (sc->nr.unqueued_dirty == sc->nr.file_taken)
set_bit(PGDAT_DIRTY, &pgdat->flags);
if (sc->nr.immediate)
reclaim_throttle(pgdat, VMSCAN_THROTTLE_WRITEBACK);
}
if (sc->nr.dirty && sc->nr.dirty == sc->nr.congested) {
if (cgroup_reclaim(sc) && writeback_throttling_sane(sc))
set_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags);
if (current_is_kswapd())
set_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags);
}
if (!current_is_kswapd() && current_may_throttle() &&
!sc->hibernation_mode &&
(test_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags) ||
test_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags)))
reclaim_throttle(pgdat, VMSCAN_THROTTLE_CONGESTED);
if (should_continue_reclaim(pgdat, nr_node_reclaimed, sc))
goto again;
if (reclaimable)
pgdat->kswapd_failures = 0;
}
```
其第 22 行的 `shrink_node_memcgs()` 會呼叫 `shrink_lruvec()` 及 `shrink_slab()` 等與 page 回收相關的函式,並會直/間接呼叫下列兩函式:
- `shrink_active_list()`: 將某些 pages 移到 active 鏈表。
- `shrink_inactive_list()`:inactive 鏈表中選擇一定數量的 pages 放到一臨時鏈表中,最終呼叫 [page_alloc.c](https://github.com/torvalds/linux/blob/master/mm/page_alloc.c#L2520) 中定義的`free_unref_page_list()` 函式將其回收。
### 測驗三
#### 程式碼理解
這些巨集定義了一些用來計算**對應數字的二進制表示**中「包含多少個位元值為 1」的函式,其可以針對 8-bit、16-bit、32-bit、64-bit 等數字去執行。
```c
#define __const_hweight8(w) \
((unsigned int) ((!!((w) & (1ULL << 0))) + (!!((w) & (1ULL << 1))) + \
(!!((w) & (1ULL << 2))) + (!!((w) & (1ULL << 3))) + \
(!!((w) & (1ULL << 4))) + (!!((w) & (1ULL << 5))) + \
(!!((w) & (1ULL << 6))) + (!!((w) & (1ULL << 7)))))
#define __const_hweight16(w) (__const_hweight8(w) + __const_hweight8((w) >> 8))
#define __const_hweight32(w) \
(__const_hweight16(w) + __const_hweight16((w) >> 16))
#define __const_hweight64(w) \
(__const_hweight32(w) + __const_hweight32((w) >> 32))
```
##### `FIND_NTH_BIT`
此巨集用來「在指定的位圖中,由左至右,查找第 n 個 **set bit**(即位元值為 1 的位元)所對應的索引」。
首先計算字中被設置的位元數目,然後比較以找到第 `num` 個被設置的位元。如果找到了,回傳該位元的索引值;否則,繼續往下個位元找。
##### `__clear_bit`
傳入一 `addr`,設定其「特定的位元」為 0。
```c
static inline void __clear_bit(unsigned long nr, volatile unsigned long *addr)
{
unsigned long mask = BIT_MASK(nr);
unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr);
*p &= ~mask;
}
```
##### `fns`
與 `ffs()` 不同的是,此函式目的是在 `word` 中尋找並回傳「第 n 個被設置過的位元(即位元 1)之索引值」。如果 `word` 中沒有位元被設置過,則回傳 `BITS_PER_LONG`。
其用到 `__ffs()` 來找每次循環中「第一個被設置的位元」,然後將其索引位置存進 `bit` 變數中。一旦 n 的值為 0,則立即回傳當前的 bit 值。否則,用 `__clear_bit()` 清除該位元(成 0),然後 n 減少 1。反覆執行,直到找到第 n 個被設置的位元,最後回傳其索引位置。
舉例來說:假設 `word` 的二進位表示為 `11001001`,而我們想要找到第 2 個被設置的位元(即由右至左,找到第 2 個位元 1 的位置)。首先,`__ffs()` 先回傳 0,因為第一個被設置的位元在索引 0 上,然後清除此位元,繼續尋找下一個被設置的位元。照此邏輯,最後會回傳索引值 **3**。
```c
static inline unsigned long fns(unsigned long word, unsigned int n)
{
while (word) {
unsigned int bit = __ffs(word);
if (n-- == 0)
return bit;
__clear_bit(bit, &word);
}
return BITS_PER_LONG;
}
```
##### `find_nth_bit`
此函式用來「在一個記憶體區域中找到第 n 個被設置的位元的索引位置」。
其會先檢查傳入的 `size` 是否小於等於機器字長(`BITS_PER_LONG`,透過 `small_const_nbits(size)` 檢查),並且大於 0。
- 如果是,表示位元索引在 `size` 內,這時**可以直接對記憶體中的位元進行處理,而不用跨字節**-先用 `GENMASK()` 設定一個遮罩,把要處理的範圍遮出來;然後,將遮罩後的值傳進 `fns()` 來找第 n 個被設置的位元的索引位置。
- 否則,需要**在處理位元時跨越字節邊界**。需呼叫 `FIND_NTH_BIT()`,來處理跨字節邊界的情況。
#### Linux 的 `find_nth_bit()` 應用案例之解說與分析
`find_nth_bit()` 通常用在處理器排程相關的操作中。透過 CPU mask 等技術實作相關位元操作,來表示程序的 CPU 指定,而要得知某程序中的 CPU 指定狀況,就有可能會需要用到 `find_nth_bit()` 的函式邏輯。
對該函式延伸搜尋,可知其在 Linux 原始程式碼中使用在 [linux/kernel/trace/trace_events_filter.c](https://github.com/torvalds/linux/blob/master/kernel/trace/trace_events_filter.c#L688) 中的 `do_filter_cpumask_scalar()` 函式,而其又是以 `cpumask_nth()` 函式來呼叫 `find_nth_bit()`。其主要目的就是透過 `find_nth_bit()` 函式,來找尋 cpu mask 中的第 n 個 CPU 之位置。
對於 CPU mask 的概念與詳細理解,可參閱 [Linux Kernel:CPU 狀態管理之 cpumask](https://zhuanlan.zhihu.com/p/670194359)。
利用 struct cpumask 這個 bitmap 來代表系統當中 CPU 的集合,每個 bit 代表一個 CPU,其中只有 `nr_cpu_ids` 個位元是有效的。
#### CPU Affinity
其又稱 Processor Affinity。
指在 Linux 中,process 想儘量長時間運行在某個指定的 CPU 上 ,且不被轉移至其他 CPU 的傾向。
在 Symmetric Multi-Processing 的架構下,Linux 排程器會根據 CPU Affinity 的設置,讓指定的程序運行在「指定」的 CPU 上,而不會在別的 CPU 上運行。
CPU Affinity 會用 bitmask 表示,每個位元表一個 CPU,若為 1,則表示「已指定」。
最低位元表第一個 CPU;最高位元則表最後一個 CPU。
> 舉個例子(用十六進制表示):
`0x00000001` 表指定*處理器 1*;
`0xFFFFFFFF` 表指定*所有處理器(0~31)*。
參照 [linux/kernel/sched/core.c](https://github.com/torvalds/linux/blob/master/kernel/sched/core.c#L8396),其檔案內容與處理排程非常相關,其中有 `sched_setaffinity()` 函式也用到了 CPU mask 的概念,而這也是使用到 `find_nth_bit()` 的前置步驟,沒有處理器遮罩,就不會有相關位元操作來找 CPU 了。
可透過 `sched_setaffinity` 函式來指定 CPU Affinity。