Try   HackMD

2024q1 Homework2 (quiz1+2)

contributed by < jujuegg >

第一周測驗題

測驗一

本測驗是在鏈結串列上實作 quick sort,且是非遞迴版本的,使用堆疊來模擬原本的遞迴呼叫。

程式碼解讀

// variables
node_t *begin[max_level], *end[max_level];
node_t *result = NULL, *left = NULL, *right = NULL;
  • beginend 是用來記錄 pivot 未完成排序的節點。
  • max_level 是串列長度的 2 倍。
  • leftright 分別存放比 pivot 大/小 的節點。
begin[0] = *list;
end[0] = list_tail(list);

begin 紀錄頭端,而 end 紀錄尾端,所以一開始把整個串列視為未完成排序。

node_t *L = begin[i], *R = end[i];

每一輪使用 LR 此輪要排序的串列的頭和尾端。i 是用來告知程式碼目前要處理的未完成排序的串列的所在位置。

此輪有不只一個節點時
node_t *pivot = L;
value = pivot->value;
node_t *p = pivot->next;
pivot->next = NULL;

如果兩個節點的值不一樣,則代表長度 > 1,即可把第一個節點當作 pivot,也就是 L 節點。
接著用 p 來走訪整個串列。

while (p) {
    node_t *n = p;
    p = p->next;    //CCCC
    list_add(n->value > value ? &right : &left, n);
}

p 會提前跑到下一個節點的位置,有點像 list_for_each_safe 的概念。並把目前節點與 pivot 比較,大的就加進 right;小於等於的加進 left

begin[i] = left;
end[i] = list_tail(&left);         //DDDD
begin[i + 1] = pivot;
end[i + 1] = pivot;
begin[i + 2] = right;
end[i + 2] = list_tail(&right);    //EEEE

left = right = NULL;
i += 2;

分好大小邊後,把 left 裡面的節點設置為「第 i 個」未處理的串列,把 right 裡面的節點設置為「第 i+2 個」未處理的串列。而 pivot 設置為「第 i+1 個」未處理的串列。
最後把兩邊清空,調整 i,準備下一輪。

此輪 只有一個節點/沒有節點 時
if (L)
    list_add(&result, L);
i--;

若此輪只有一個節點,此時它會是所有未完成排序的節點中值最大的,直接將其加到最終串列中。這蠻好理解的,因為每輪結束的時候,i 會加 2,代表程式選取 right 串列作為下一輪的輸入。

要注意,如果上一輪中,沒有存在比 pivot 大的值,那這邊有可能會是空的。

圖解

  • list 如下:






structs



begin_0
begin_0



struct0

3



begin_0->struct0





end_0
end_0



struct4

1



end_0->struct4





pivot
pivot



pivot->struct0





struct1

4



struct0->struct1





struct2

5



struct1->struct2





struct3

2



struct2->struct3





struct3->struct4





  • pivot 以外的所有節點逐一跟 pivot 比較,並分到 right / left 中。






structs



pivot
pivot



struct0

3



pivot->struct0





right
right



struct2

5



right->struct2





left
left



struct4

1



left->struct4





struct1

4



struct2->struct1





struct3

2



struct4->struct3





  • 把這 3 個串列分別放入 beginend 正確的位置。下一輪則用 i == 2 的位置進行排序。






structs



begin_0
begin_0



struct4

1



begin_0->struct4





begin_1
begin_1



struct0

3



begin_1->struct0





begin_2
begin_2



struct2

5



begin_2->struct2





end_0
end_0



struct3

2



end_0->struct3





end_1
end_1



end_1->struct0





end_2
end_2



struct1

4



end_2->struct1





struct2->struct1





struct4->struct3





優化方案

  • 或許 beginend 也可以用鏈結串列來儲存?

    • 動機:程式利用 i 來存取這兩個陣列,但 i 在執行程式的過程中並不會大範圍跳躍,最多連續加 2 而已,可以避免宣告不必要的記憶體空間。
  • 使用雙向鏈結串列。

使用 Linux 核心風格的 List API 改寫

首先把 quick_sort.h 裡面宣告的結構改成用 list_head,以及把上個作業的 list.h 拿過來使用,有些重複的函式就統一使用內建的 API。

  typedef struct __node {
-     struct __node *left, *right;
-     struct __node *next;
+     struct list_head list;
      long value;
  } node_t;

接著是最麻煩的地方,雖然想法很簡單,但是我卻在這邊花了將近半天的時間,期間一直發生 Segmentation fault,非常的折磨人。但沒辦法,菜就多練

我建立了另一個結構 begin_t,它的作用是儲存那些尚未完成排序的串列的 「head」,然後再將此節點存入另一個鏈結串列中,並不像原本的程式碼是用陣列來模擬堆疊。原諒我不會取名。

typedef struct begins {
    struct list_head list;
    struct list_head *head;
} begin_t;

現在要使用 Linux 的 API,所以變數形態要更換,且因為使用雙向鏈結串列,所以不須留下 end

- node_t *begin[max_level], *end[max_level];
- node_t *result = NULL, *left = NULL, *right = NULL;

+ struct list_head *begins = malloc(sizeof(struct list_head));
+ struct list_head *result = malloc(sizeof(struct list_head));
+ struct list_head *left = malloc(sizeof(struct list_head));
+ struct list_head *right = malloc(sizeof(struct list_head));
+ struct list_head *mid = malloc(sizeof(struct list_head));

一開始把整個串列的 head 放進 begins 中。
now 是用來模擬堆疊的 top 的位置。
begins 則是目前所有未完成排序的串列所在的堆疊。

begin_t *all = new_begin(*head);
list_add_tail(&all->list, begins);
    
struct list_head *now = begins->next;

now_entry_head 是這輪要處理的未完成排序的串列的 head

struct list_head *now_entry_head = list_entry(now, begin_t, list)->head;

pivot 放入中間的串列。

node_t *pivot = list_first_entry(now_entry_head, node_t, list);
list_del(&pivot->list);
list_add(&pivot->list, mid);

中間將資料依照大小區分為 leftright 的過程因為與之前無異,所以省略不提。

因為要依序將 leftpivotright 放入 begins 中,所以會需要額外在堆疊中增加 2 個節點,而 left 就會直接將目前正在處理排序的鏈結串列取代,分別 push pivotright
注意如果 leftright 裡沒有東西,也會需要 push 空串列的 head 進去。

// 1
list_splice_tail_init(left, now_entry_head);
INIT_LIST_HEAD(left);

// 2            
begin_t *mid_begin = new_begin(mid);
list_add_tail(&mid_begin->list, begins);
INIT_LIST_HEAD(mid);

// 3
begin_t *right_begin = new_begin(right);
list_add_tail(&right_begin->list, begins);
INIT_LIST_HEAD(right);

top 的指標往上 2 格。pop 時則是往下一格。

// push 2 elements into stack
now = now->next->next;
// pop 1 element out of stack
now = now->prev;

操作大致上是這樣,注意當要 pop begins 的節點時,由於內部是儲存鏈結串列的頭,所以要記得呼叫 list_free 來釋放記憶體空間。

實驗結果

  • 原本的程式的記憶體用量

image

  • 使用鏈結串列實作堆疊的記憶體使用量

image

在 100000 筆資料的情況下,用鏈結串列去實作堆疊可以節省約 0.8 MB 的記憶體。

測驗二

合併串列時,若 run 的數量等於或略小於 2 的冪,效率會最高;若略大於 2 的冪,效率就特別低。
所以要定義 minrun 來控制每個 run 的長度。

而為了避免一開始就掃描整個資料來產生 run, timsort 會用堆疊來暫存 run,運用 merge_collapse 函式來達成,該函式的功能如下:

  • 檢查 A 的長度要大於 B 和 C 的長度總和
  • 檢查 B 的長度要大於 C 的長度

注意合併只能在相鄰的 2 個 run 進行,以確保穩定性。

Galloping mode

在合併兩個序列 (A & B) 時,考慮到它們都是已經排序好的數列,因此可以加快比較的過程。

首先尋找 B 的首個元素 (

B0 ) 在 A 中的排序位置,即可確定 A 中有一小段是小於
B0
,就不需處理這段,接著再尋找 A 的首個元素在 B 中的排序位置,如此反覆進行。

程式碼解讀

find_run

如果目前節點比下一個節點大,則代表是降冪排列,需要將序列倒轉。do while 迴圈中間在將 list 所在節點與 next 交換,直到下一個元素大於等於目前元素,或沒有下一元素就停止。

if (cmp(priv, list, next) > 0) {
    /* decending run, also reverse the list */
    struct list_head *prev = NULL;
    do {
        len++;
        list->next = prev;
        prev = list;
        list = next;
        next = list->next;
        head = list;
    } while (next && cmp(priv, list, next) > 0);
    list->next = prev;
}

如果目前是升冪排列,則找到此升冪排列的最後一個元素

else {
    do {
        len++;
        list = next;
        next = list->next;
    } while (next && cmp(priv, list, next) <= 0);
    list->next = NULL;
}

假設有 1 序列如下:







structs



prev
prev



list
list



struct0

3



list->struct0





next
next



struct1

2



next->struct1





head
head



head->struct0





struct0->struct1





struct2

1



struct1->struct2





struct3

3



struct2->struct3





做完一次迴圈會變成這樣:







structs



prev
prev



struct0

3



prev->struct0





list
list



struct1

2



list->struct1





next
next



struct2

1



next->struct2





head
head



head->struct1





struct1->struct2





struct3

3



struct2->struct3





第二次:







structs



prev
prev



struct1

2



prev->struct1





list
list



struct2

1



list->struct2





next
next



struct3

3



next->struct3





head
head



head->struct2





struct0

3



struct1->struct0





struct2->struct3





結束後就會跳出 while 迴圈了,最後再設定 list->next = prev,與後續操作。

head->prev = NULL;
head->next->prev = (struct list_head *) len;
result.head = head, result.next = next;
return result;






structs



prev
prev



struct1

2



prev->struct1





list
list



struct2

1



list->struct2





next
next



struct3

3



next->struct3





head
head



head->struct2





struct0

3



struct1->struct0





struct2->struct1





*merge_collapse

只要目前有超過 2 個 run,就會持續進行合併操作。

while ((n = stk_size) >= 2)

如果前 4 個 run 中,

  • run B 的長度小於等於 run C + run D
  • run A 的長度小於等於 run B + run C

就會啟動合併。

if ((n >= 3 &&
     run_size(tp->prev->prev) <= run_size(tp->prev) + run_size(tp)) ||
    (n >= 4 && run_size(tp->prev->prev->prev) <=
               run_size(tp->prev->prev) + run_size(tp->prev)))






structs



A
A



struct3

tp->prev->prev->prev



A->struct3





B
B



struct2

tp->prev->prev



B->struct2





C
C



struct1

tp->prev



C->struct1





D
D



struct0

tp



D->struct0





struct0->struct1





struct1->struct2





struct2->struct3





怎麼決定要合併哪個 run:

  • run B 小於 run D:合併 run B 與 run C
  • 否則合併 run C 與 run D
    if (run_size(tp->prev->prev) < run_size(tp)) {
        tp->prev = merge_at(priv, cmp, tp->prev);
    } else {
        tp = merge_at(priv, cmp, tp);
    }

如果只有 2 個 run,且 run C 小於等於 run D,則合併兩者。

} else if (run_size(tp->prev) <= run_size(tp)) {
    tp = merge_at(priv, cmp, tp);
} else {
    break;
}
*merge_force_collapse

這個步驟會強制合併所有的 run,不須檢查要合併的 run 的長度和是否已經大於另一個 run。
注意,這個步驟會讓堆疊中剩下 2 個 run。

timsort

每次先找到一個 run,將這個 run 放入堆疊中,並將 list 指標移到剛剛找到的 run 的後面一個節點,然後觀察堆疊中最上面的 4 個 run,看需不需要合併,走訪完串列後,再將剩下的 run 合併。

do {
    /* Find next run */
    struct pair result = find_run(priv, list, cmp);
    result.head->prev = tp;
    tp = result.head;
    list = result.next;
    stk_size++;
    tp = merge_collapse(priv, cmp, tp);
} while (list);

優化方案

我發現在測驗題目中的 Galloping mode 方法並沒有實現在原本 timsort 的程式中,因此我選擇改優化這部分。但還沒開始實作就想到一個問題,程式是怎麼不用比較就找到要插入的位置的,所以這個想法先被擱置。

再來程式也沒有考慮到 minrun 的長度來調整每次找到的 run,目前想要改進這方面,但實作上遇到一點問題,正在調整中。

第二周測驗題

測驗一

程式碼解讀

用於處理 hash 數值碰撞

struct hlist_node {
    struct hlist_node *next, **pprev;
};

Hash table 中的頭節點,唯一代表一 hash 值

struct hlist_head {
    struct hlist_node *first;
}; 
hlist_add_head

運作流程如下一系列圖:設有一 hlist_node_new 節點要執行此函式。首先,若該列表是有節點而非 NULL 的,則調整 h->first->pprev 指標(如圖二);接著調整新節點之 *next*pprev 指標(如圖三);最後將頭節點之 *first 指向新節點(如圖四)即可。

static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h)
{
    if (h->first) 
        h->first->pprev = &n->next;
    n->next = h->first; //AAAA
    n->pprev = &h->first;
    h->first = n;
}






add_head



map

hash_table

 

 

hlist_head2.first

 

...



hn1

hlist_node1

pprev

next



map:ht2->hn1





hn1:prev->map:ht2





hn2

hlist_node2

pprev

next



hn1:next->hn2





hn2:prev->hn1:next





null
NULL



hn2:next->null





hn3

hlist_node_new

pprev

next









add_head



map

hash_table

 

 

hlist_head2.first

 

...



hn1

hlist_node1

pprev

next



map:ht2->hn1





hn2

hlist_node2

pprev

next



hn1:next->hn2





hn3

hlist_node_new

pprev

next



hn1:prev->hn3:next





hn2:prev->hn1:next





null
NULL



hn2:next->null











add_head



map

hash_table

 

 

hlist_head2.first

 

...



hn1

hlist_node1

pprev

next



map:ht2->hn1





hn2

hlist_node2

pprev

next



hn1:next->hn2





hn3

hlist_node_new

pprev

next



hn1:prev->hn3:next





hn2:prev->hn1:next





null
NULL



hn2:next->null





hn3:prev->map:ht2





hn3:next->hn1











add_head



map

hash_table

 

 

hlist_head2.first

 

...



hn3

hlist_node_new

pprev

next



map:ht2->hn3





hn1

hlist_node1

pprev

next



hn2

hlist_node2

pprev

next



hn1:next->hn2





hn1:prev->hn3:next





hn2:prev->hn1:next





null
NULL



hn2:next->null





hn3:prev->map:ht2





hn3:next->hn1





*dfs

pre_low/high 即該 *preorder 之左右範圍;in_low/high 同理,為 *inorder 之左右範圍。

要知道的是,此測驗一之程式碼即是要重建二元樹,且 dfs 存取順序實際上就是 preorder,所以會傳 preorder[pre_low](表中間節點) 進 find(),以尋找該節點在 inorder 中的位置索引 idx,找到後「切分左右子樹(tn->lefttn->right)」,分別再對 inorder 的 in_low ~ (idx-1)(idx+1) ~ in_high 作 dfs。

由 preorder 可知哪個節點在上面(越靠前的越上面)、而由 inorder 可知哪個節點靠左(越靠前的越左邊),於是可得知 preorder 的第一個元素一定是 root 的值,且該元素對應到 inorder 後,左邊的值就在左邊,右邊的值就在右邊。

接著就將 left 和 right 分別當作新的 root node 下去做即可。遞迴的中止條件是在 inorder 中找不到對應元素。

static struct TreeNode *dfs(int *preorder,
                            int pre_low,
                            int pre_high,
                            int *inorder,
                            int in_low,
                            int in_high,
                            struct hlist_head *in_heads,
                            int size)
{
    if (in_low > in_high || pre_low > pre_high)
        return NULL;

    struct TreeNode *tn = malloc(sizeof(*tn));
    tn->val = preorder[pre_low];
    int idx = find(preorder[pre_low], size, in_heads);
    tn->left = dfs(preorder, pre_low + 1, pre_low + (idx - in_low), inorder,
                   in_low, idx - 1, in_heads, size);
    tn->right = dfs(preorder, pre_high - (in_high - idx - 1), pre_high, inorder,
                    idx + 1, in_high, in_heads, size);
    return tn;
}
buildTree

建樹的程式碼。起初會先將 inorder 節點建立一 hash table 以方便 dfs() 函式的內部實作,之後再呼叫 dfs() 函式,遞迴求出該二元樹並回傳結果。

static struct TreeNode *buildTree(int *preorder,
                                  int preorderSize,
                                  int *inorder,
                                  int inorderSize)
{
    struct hlist_head *in_heads = malloc(inorderSize * sizeof(*in_heads));
    for (int i = 0; i < inorderSize; i++)
        INIT_HLIST_HEAD(&in_heads[i]);
    for (int i = 0; i < inorderSize; i++)
        node_add(inorder[i], i, inorderSize, in_heads);

    return dfs(preorder, 0, preorderSize - 1, inorder, 0, inorderSize - 1,
               in_heads, inorderSize);
}

測試程式

設定一 preorder 及 inorder 的序列,其會有一對應的樹,也會有對應的 postorder 序列。本測試程式即比較 buildTree 後的 postorder,與期望的 postorder 有無差別,若無,表示程式運作無誤。

嘗試改進

  • 用紅黑樹代替鏈表之儲存資料方式

用 Hash table 找特定資料的時間複雜度通常是

O(1),但在最差情況下(目標資料在鏈表尾端),就可能會達到
O(n)

因為碰撞發生的時候,要線性查找鏈表以找到目標資料。若用紅黑樹儲存的話,其可以保持樹的高度平衡,來保證在最壞情況下的查找時間複雜度降為
O(logn)

比起 Hash table,紅黑樹在處理碰撞時不用線性查找鏈表,而是透過比較節點值來定位目標資料,所以其在查找上的性能更為穩定,特別是在「處理大量資料」或「碰撞發生頻繁」的情況下,紅黑樹的優勢會更明顯。

探討 Pre-order Walk

參照 linux/kernel/cgroup/cgroup.c,發現 css_next_descendant_pre() 實作了 Pre-order Walk。

  • css_next_descendant_pre():找到下一個要進行 Preorder Traversal 的後代節點
  • @pos:當前位置
  • @root:要走訪其後代節點的 CSS
struct cgroup_subsys_state *
css_next_descendant_pre(struct cgroup_subsys_state *pos,
			struct cgroup_subsys_state *root)
{
	struct cgroup_subsys_state *next;

	cgroup_assert_mutex_or_rcu_locked();

	/* if first iteration, visit @root */
	if (!pos)
		return root;

	/* visit the first child if exists */
	next = css_next_child(NULL, pos);
	if (next)
		return next;

	/* no child, visit my or the closest ancestor's next sibling */
	while (pos != root) {
		next = css_next_child(pos, pos->parent);
		if (next)
			return next;
		pos = pos->parent;
	}

	return NULL;
}

尋找要訪問的下一個後代節點,以進行 @root ((含)第一個要訪問的節點)的後代節點的 Preorder Traversal。

雖然此函式需要 cgroup_mutex 或 RCU read locking,但不需要整個走訪過程都跑在 critical section 裡。 只要 @pos@root 可訪問,且 @pos@root 的後代節點,其就會回傳下一個後代節點。

因此,其用於實現對 cgroup 子系統狀態的一種很有效率的 Pre-access traversal。
且在「遞迴訪問 cgroup 的後代來走訪整個 cgroup 樹」的過程中,還保持對 cgroup_mutex 或 RCU read locking 的佔用,確保不會有多餘一個 process 跑進 critical section。

測驗二

程式碼解讀

hlist_del() 的操作邏輯如下系列圖(假設要刪除 node1),簡單來說就是刪除一個鏈表節點。







hlist_del



map

hash_table

 

 

hlist_head2.first

 

...



hn0

hlist_node0

pprev

next



map:ht2->hn0





hn0:prev->map:ht2





hn1

hlist_node1(n)

pprev

next



hn0:next->hn1





hn1:prev->hn0:next





hn2

hlist_node2

pprev

next



hn1:next->hn2





hn2:prev->hn1:next





null
NULL



hn2:next->null











hlist_del



map

hash_table

 

 

hlist_head2.first

 

...



hn0

hlist_node0

pprev

next



map:ht2->hn0





hn0:prev->map:ht2





hn1

hlist_node1(n)

pprev

next



hn0:next->hn1





hn1:prev->hn0:next





hn2

hlist_node2

pprev

next



hn1:next->hn2





hn2:prev->hn1:next





null
NULL



hn2:next->null





next
next



next->hn2





pprev
pprev



pprev->hn0:next











hlist_del



map

hash_table

 

 

hlist_head2.first

 

...



hn0

hlist_node0

pprev

next



map:ht2->hn0





hn0:prev->map:ht2





hn2

hlist_node2

pprev

next



hn0:next->hn2





hn1

hlist_node1(n)

pprev

next



hn1:prev->hn0:next





hn1:next->hn2





hn2:prev->hn0:next





null
NULL



hn2:next->null





next
next



next->hn2





pprev
pprev



pprev->hn0:next





LRU Cache 及 LRU Node 的結構如下:

  • capacity:該 LRU cache 「能紀錄的資料量」
  • count:當前該 LRU cache 紀錄的「資料筆數」
  • dhead:一 Doubly Linked List,將所有資料串接起來
  • hhead:處理碰撞使用的 hlist 結構體陣列
typedef struct {
    int capacity;
    int count;
    struct list_head dhead;
    struct hlist_head hhead[];
} LRUCache;

typedef struct {
    int key;
    int value;
    struct hlist_node node;
    struct list_head link;
} LRUNode;






LRUcache


cluster_a

LRUCache


cluster_b

LRUNode1


cluster_c

LRUNode2



struct0

capacity



struct1

count



struct2

dhead

prev

next



struct6

link

prev

next



struct2:next->struct6





struct3

hhead[0]

hhead[1]

...



struct7

node

pprev

next



struct3:head->struct7





struct4

key1



struct5

value1



struct6:prev->struct2





struct10

link

prev

next



struct6:next->struct10





struct7:prev->struct3:head





struct11

node

pprev

next



struct7:next->struct11





struct8

key2



struct9

value2



struct10:prev->struct6





null1
NULL



struct10:next->null1





struct11:prev->struct7





null2
NULL



struct11:next->null2





lRUCacheCreate

建立一 LRU Cache 並初始化其內部變數、結構,並指定其能紀錄的資料量。

lRUCacheFree

釋放該 Cache 的 dhead 串內,指定 LRU 節點的記憶體空間(list_head link)。

void lRUCacheFree(LRUCache *obj)
{
    struct list_head *pos, *n;
    list_for_each_safe (pos, n, &obj->dhead) {
        LRUNode *cache = list_entry(pos, LRUNode, link); //FFFF
        list_del(&cache->link);
        free(cache);
    }
    free(obj);
}
lRUCacheGet

其 hash 值是透過「key % 該 cache 之可記錄資料量」計算的。
取得 hash 值後,到該 hash 值對應的 hhead[hash] 走訪該鏈表,若有找到對應 key 值,則回傳該節點之 value;否則回傳 -1,表未有此 key 值。

從這裡可揣摩出使用 LRU Cache 的好處及意義。若 hash function 設計得當,找到對應資料需要的時間複雜度會很小;倘若只使用 dhead 雙向鏈結串列,走訪整條串列來找資料會花很久的時間,最差能到

O(n)

int lRUCacheGet(LRUCache *obj, int key)
{
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each (pos, &obj->hhead[hash]) {
        LRUNode *cache = list_entry(pos, LRUNode, node); //HHHH
        if (cache->key == key) {
            list_move(&cache->link, &obj->dhead); //IIII
            return cache->value;
        }
    }
    return -1;
}
lRUCachePut

先找看看對應 hash 值的 hhead[hash] 鏈表內有無對應 key 值之節點。若有,則將該節點「移到雙向鏈結串列的最前端」,表「最近使用過」,並指定該 cache 之 value 為此節點之值(更新其值);若無對應 key 值之節點,表「Cache Miss」,有兩種運作情況:

  • 快取已滿,須將「雙向鏈結串列 dhead 最尾端的節點(表 Least Recently Used 節點)」刪除,並將新節點插入到 hhead 的頭部,同時添加到 dhead 的前面。
  • 快取未滿,則創建一個新的節點,並將其添加到 hhead 的頭部,同時添加到 dhead 的最前端,更新快取的 key 值。

測試程式

撰寫一 checkLRU() 函式回傳 bool,搭配 assert() 以測試 dhead 中的首個元素,是否為最近互動過的 key 值節點。

嘗試改進

lRUCachePut():Cache Hit 時,除了將節點移動 dhead 最前端,對應 hash 值的hhead[hash]內的對應節點,也應該要移到最前端。

  if (c->key == key) {
      list_move(KKKK, &obj->dhead);
      cache = c;
+     hlist_del(&c->node);
+     hlist_add_head(&c->node, &obj->hhead[hash]);
  }

探討 Linux 的 LRU 相關機制

比較常用到的 page 會被放到 active 鏈表,不常用的 page 則放到 inactive 鏈表中。兩列表的 page 可相互轉移,若 active 的 page 越來越不常使用,其可能被轉放到 inactive 鏈表。

Linux 在作 Page 回收時,會先從 inactive 鏈表的尾部開始回收。

每個 zone 有 5 個 LRU 鏈表,用來放不同「最近使用狀態」的 pages,其中 INACTIVE_ANON、ACTIVE_ANON、INACTIVE_FILE 及 ACTIVE_FILE 中的 pages 是可以回收的。

enum lru_list {
    LRU_INACTIVE_ANON = LRU_BASE,
    LRU_ACTIVE_ANON = LRU_BASE + LRU_ACTIVE,
    LRU_INACTIVE_FILE = LRU_BASE + LRU_FILE,
    LRU_ACTIVE_FILE = LRU_BASE + LRU_FILE + LRU_ACTIVE,
    LRU_UNEVICTABLE,
    
    NR_LRU_LISTS
};

另外,為了評估 page 的常用程度,Kernel 引入了 PG_referencedPG_active 兩個 bits。其一表示「當前活躍程度」,其一表示「最近是否被引用過」。

圖片

Refer from Linux Kernel 內存回收機制

基本上有以下步驟:

  1. 如果 page 很常用,設定 PG_active bit,並將其放在 ACTIVE LRU 鏈表;反之在 INACTIVE。
  2. 每次存取 page 時,透過 mark_page_accessed() 來設定 PG_referenced bit。
  3. 使用 page_referenced() 函式,透過 PG_referenced bit 以及「由逆向映射提供的信息」來確定 page 常用程度(每次清除該 bit 時都會檢測)。
  4. 再次呼叫 mark_page_accessed()。此時,如果發現 PG_referenced bit 已被設定,表 page_referenced() 沒有作檢查,因此對於 mark_page_accessed() 的呼叫比 page_referenced() 更頻繁-表該 page 經常被存取。

    如果該 page 位於 INACTIVE 鏈表,將其移動到 ACTIVE,此外還會設定 PG_active bit,並清除 PG_referenced bit;

  5. 也有可能從 ACTIVE 移到 INACTIVE-在 page 比較不常用時,可能連續呼叫兩次 page_referenced() 而途中沒有 mark_page_accessed()

如果存取內存 page 的頻率穩定,表對 page_referenced()mark_page_accessed() 的呼叫大致上平衡,此時會把 page 放在「當前 LRU 鏈表」。
此動作確保了 page 不會在 ACTIVE 與 INACTIVE 鏈表間跳來跳去。

Page 回收的實現

主要有兩種觸發方式:

  • 內存嚴重不足。透過 try_to_free_pages() 函式來檢查內存區域的 pages。
  • 透過後台程序 kswapd 來觸發,其會週期性檢查內存夠不夠,若不足,則會觸發 page 回收機制。使用 balance_pgdat() 作為媒介。

參照 linux/mm/vmscan.c
當呼叫 try_to_free_pages() 時,其會重複呼叫 shrink_zones()shrink_slab() 來釋放一定數量的 pages(預設為 32 個)。而 shrink_zones() 會分別對內存區域列表中的所有區域呼叫 shrink_node() 函式-其即是「從內存中回收較少使用的 pages」的核心函式。

static void shrink_node(pg_data_t *pgdat, struct scan_control *sc) { unsigned long nr_reclaimed, nr_scanned, nr_node_reclaimed; struct lruvec *target_lruvec; bool reclaimable = false; if (lru_gen_enabled() && root_reclaim(sc)) { lru_gen_shrink_node(pgdat, sc); return; } target_lruvec = mem_cgroup_lruvec(sc->target_mem_cgroup, pgdat); again: memset(&sc->nr, 0, sizeof(sc->nr)); nr_reclaimed = sc->nr_reclaimed; nr_scanned = sc->nr_scanned; prepare_scan_control(pgdat, sc); shrink_node_memcgs(pgdat, sc); flush_reclaim_state(sc); nr_node_reclaimed = sc->nr_reclaimed - nr_reclaimed; if (!sc->proactive) vmpressure(sc->gfp_mask, sc->target_mem_cgroup, true, sc->nr_scanned - nr_scanned, nr_node_reclaimed); if (nr_node_reclaimed) reclaimable = true; if (current_is_kswapd()) { if (sc->nr.writeback && sc->nr.writeback == sc->nr.taken) set_bit(PGDAT_WRITEBACK, &pgdat->flags); if (sc->nr.unqueued_dirty == sc->nr.file_taken) set_bit(PGDAT_DIRTY, &pgdat->flags); if (sc->nr.immediate) reclaim_throttle(pgdat, VMSCAN_THROTTLE_WRITEBACK); } if (sc->nr.dirty && sc->nr.dirty == sc->nr.congested) { if (cgroup_reclaim(sc) && writeback_throttling_sane(sc)) set_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags); if (current_is_kswapd()) set_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags); } if (!current_is_kswapd() && current_may_throttle() && !sc->hibernation_mode && (test_bit(LRUVEC_CGROUP_CONGESTED, &target_lruvec->flags) || test_bit(LRUVEC_NODE_CONGESTED, &target_lruvec->flags))) reclaim_throttle(pgdat, VMSCAN_THROTTLE_CONGESTED); if (should_continue_reclaim(pgdat, nr_node_reclaimed, sc)) goto again; if (reclaimable) pgdat->kswapd_failures = 0; }

其第 22 行的 shrink_node_memcgs() 會呼叫 shrink_lruvec()shrink_slab() 等與 page 回收相關的函式,並會直/間接呼叫下列兩函式:

  • shrink_active_list(): 將某些 pages 移到 active 鏈表。
  • shrink_inactive_list():inactive 鏈表中選擇一定數量的 pages 放到一臨時鏈表中,最終呼叫 page_alloc.c 中定義的free_unref_page_list() 函式將其回收。

測驗三

程式碼理解

這些巨集定義了一些用來計算對應數字的二進制表示中「包含多少個位元值為 1」的函式,其可以針對 8-bit、16-bit、32-bit、64-bit 等數字去執行。

#define __const_hweight8(w)                                              \
    ((unsigned int) ((!!((w) & (1ULL << 0))) + (!!((w) & (1ULL << 1))) + \
                     (!!((w) & (1ULL << 2))) + (!!((w) & (1ULL << 3))) + \
                     (!!((w) & (1ULL << 4))) + (!!((w) & (1ULL << 5))) + \
                     (!!((w) & (1ULL << 6))) + (!!((w) & (1ULL << 7)))))

#define __const_hweight16(w) (__const_hweight8(w) + __const_hweight8((w) >> 8))
#define __const_hweight32(w) \
    (__const_hweight16(w) + __const_hweight16((w) >> 16))
#define __const_hweight64(w) \
    (__const_hweight32(w) + __const_hweight32((w) >> 32))
FIND_NTH_BIT

此巨集用來「在指定的位圖中,由左至右,查找第 n 個 set bit(即位元值為 1 的位元)所對應的索引」。

首先計算字中被設置的位元數目,然後比較以找到第 num 個被設置的位元。如果找到了,回傳該位元的索引值;否則,繼續往下個位元找。

__clear_bit

傳入一 addr,設定其「特定的位元」為 0。

static inline void __clear_bit(unsigned long nr, volatile unsigned long *addr)
{
    unsigned long mask = BIT_MASK(nr);
    unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr);

    *p &= ~mask;
}
fns

ffs() 不同的是,此函式目的是在 word 中尋找並回傳「第 n 個被設置過的位元(即位元 1)之索引值」。如果 word 中沒有位元被設置過,則回傳 BITS_PER_LONG

其用到 __ffs() 來找每次循環中「第一個被設置的位元」,然後將其索引位置存進 bit 變數中。一旦 n 的值為 0,則立即回傳當前的 bit 值。否則,用 __clear_bit() 清除該位元(成 0),然後 n 減少 1。反覆執行,直到找到第 n 個被設置的位元,最後回傳其索引位置。

舉例來說:假設 word 的二進位表示為 11001001,而我們想要找到第 2 個被設置的位元(即由右至左,找到第 2 個位元 1 的位置)。首先,__ffs() 先回傳 0,因為第一個被設置的位元在索引 0 上,然後清除此位元,繼續尋找下一個被設置的位元。照此邏輯,最後會回傳索引值 3

static inline unsigned long fns(unsigned long word, unsigned int n)
{
    while (word) {
        unsigned int bit = __ffs(word);
        if (n-- == 0)
            return bit;
        __clear_bit(bit, &word);
    }

    return BITS_PER_LONG;
}
find_nth_bit

此函式用來「在一個記憶體區域中找到第 n 個被設置的位元的索引位置」。

其會先檢查傳入的 size 是否小於等於機器字長(BITS_PER_LONG,透過 small_const_nbits(size) 檢查),並且大於 0。

  • 如果是,表示位元索引在 size 內,這時可以直接對記憶體中的位元進行處理,而不用跨字節-先用 GENMASK() 設定一個遮罩,把要處理的範圍遮出來;然後,將遮罩後的值傳進 fns() 來找第 n 個被設置的位元的索引位置。
  • 否則,需要在處理位元時跨越字節邊界。需呼叫 FIND_NTH_BIT(),來處理跨字節邊界的情況。

Linux 的 find_nth_bit() 應用案例之解說與分析

find_nth_bit() 通常用在處理器排程相關的操作中。透過 CPU mask 等技術實作相關位元操作,來表示程序的 CPU 指定,而要得知某程序中的 CPU 指定狀況,就有可能會需要用到 find_nth_bit() 的函式邏輯。

對該函式延伸搜尋,可知其在 Linux 原始程式碼中使用在 linux/kernel/trace/trace_events_filter.c 中的 do_filter_cpumask_scalar() 函式,而其又是以 cpumask_nth() 函式來呼叫 find_nth_bit()。其主要目的就是透過 find_nth_bit() 函式,來找尋 cpu mask 中的第 n 個 CPU 之位置。

對於 CPU mask 的概念與詳細理解,可參閱 Linux Kernel:CPU 狀態管理之 cpumask
利用 struct cpumask 這個 bitmap 來代表系統當中 CPU 的集合,每個 bit 代表一個 CPU,其中只有 nr_cpu_ids 個位元是有效的。

CPU Affinity

其又稱 Processor Affinity。
指在 Linux 中,process 想儘量長時間運行在某個指定的 CPU 上 ,且不被轉移至其他 CPU 的傾向。

在 Symmetric Multi-Processing 的架構下,Linux 排程器會根據 CPU Affinity 的設置,讓指定的程序運行在「指定」的 CPU 上,而不會在別的 CPU 上運行。

CPU Affinity 會用 bitmask 表示,每個位元表一個 CPU,若為 1,則表示「已指定」。
最低位元表第一個 CPU;最高位元則表最後一個 CPU。

舉個例子(用十六進制表示):
0x00000001 表指定處理器 1
0xFFFFFFFF 表指定所有處理器(0~31)

參照 linux/kernel/sched/core.c,其檔案內容與處理排程非常相關,其中有 sched_setaffinity() 函式也用到了 CPU mask 的概念,而這也是使用到 find_nth_bit() 的前置步驟,沒有處理器遮罩,就不會有相關位元操作來找 CPU 了。

可透過 sched_setaffinity 函式來指定 CPU Affinity。