Try   HackMD

2024q1 Homework2 (quiz1+2)

contributed by < Appmedia06 >

第一周測驗題

quiz1 測驗一

完整題目

測驗目標

結構體

鏈結串列結構體是由 next 指標連接下一個節點, leftright指標,而節點的內容為 long 型態。

typedef struct __node {
    struct __node *left, *right;
    struct __node *next;
    long value;
} node_t;






queue


cluster node1



cluster node2



cluster node3




node1_1

next



node2_1

next



node1_1->node2_1





node1_2

value



node1_3

left

right



node3_1

next



node2_1->node3_1





node2_2

value



node2_3

left

right



node3_2

value



node3_3

left

right



list_add

  • node_t 加到 list 的最前面。
void list_add(node_t **list, node_t *node_t)
{
    node_t->next = *list;
    *list = node_t;
}

先將 node_tnext 指標指向 list ,利用指標的指標 **list 去指向要加入的 node_t ,就可以使 node_t 成為第一個節點了。

list_tail

  • 回傳指向 left 最後一個節點的指標。
node_t *list_tail(node_t **left)
{
    while ((*left) && (*left)->next)
        left = &((*left)->next); // AAAA = (*left)->next
    return *left;
}

透過走訪鍊結串列來找到最後一個節點,在 while 條件的判斷中,一共有兩項:

  1. (*left) 不為 NULL: 排除掉 left 這個鍊結串列本身就是空的。
  2. (*left)->next 不為 NULL: 若 left 不是空的,將會檢查走訪的節點的下一個節點是否為空。若成立,現在的節點即為最後一個節點。

而透過指標的指標來走訪節點,因此將 AAAA 填入 (*left)->next 使 left 指向鍊結串列的下一個節點。

list_length

  • 回傳 left 鍊結串列的長度。
int list_length(node_t **left)
{
    int n = 0;
    while (*left) {
        ++n;
        left = &((*left)->next); // BBBB = (*left)->next
    }
    return n;
}

list_tail 類似,利用指標的指標 left 走訪整個鍊結串列,用整數 n 來計數有幾個節點並將其回傳。因此 BBBB 要填入 (*left)->next 使 left 指向鍊結串列的下一個節點。

list_construct

  • 建立並回傳一個節點 node ,其 valuen ,其 next 指向 list
node_t *list_construct(node_t *list, int n)
{
    node_t *node = malloc(sizeof(node_t));
    node->next = list;
    node->value = n;
    return node;
}

建立一個節點需先配置 node_t 結構體大小的空間,接著將其 next 指標指向 list ,在把 n 賦值給其節點的 value,最後回傳回去。

list_free

  • 將 indirect pointer list 所指向的鍊結串列的每個節點都釋放掉。
void list_free(node_t **list)
{
    node_t *node = (*list)->next;
    while (*list) {
        free(*list);
        *list = node;
        if (node)
            node = node->next;
    }
}

宣告一個節點 node 用來當作走訪節點指標。在 while 條件判斷裡,如果 *list 不為空則繼續走訪,進入迴圈後釋放掉 **list 所指向的節點,再把 **list 指向下一個節點。

list_is_ordered

  • 檢查 list 鍊結串列是否為排序過的。
/* Verify if list is order */
static bool list_is_ordered(node_t *list)
{       
    bool first = true;
    int value;
    while (list) {
        if (first) {
            value = list->value;
            first = false;
        } else {
            if (list->value < value)
                return false;
            value = list->value;
        }
        list = list->next;
    }
    return true;
}

一樣是透過走訪每個節點來比較 value 的大小,若是第一次進入迴圈,則會直接取得第一個節點的 value 。若不是第一次,則比較上一個節點有無比現在的節點小,若有則走訪下一個節點,若無,則代表其不是排序好的,回傳 false 。

shuffle

  • 將陣列 array 重新洗牌,其中陣列大小 n 必須要比 RAND_MAX 小。

  • C99 [7.20.2.1] The rand function

    The value of the RAND_MAX macro shall be at least 32767.

/* shuffle array, only work if n < RAND_MAX */
void shuffle(int *array, size_t n)
{
    if (n <= 0)
        return;

    for (size_t i = 0; i < n - 1; i++) {
        size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
        int t = array[j];
        array[j] = array[i];
        array[i] = t;
    }
}

若是陣列一開始就是空的則直接回傳回去。迴圈從陣列的第一個元素開始,直到倒數第二個元素為止。迴圈的目的是將每個元素與隨機選擇的另一個元素交換位置。

在每次迭代中,使用 rand() 函式生成一個隨機數,然後將其與 (n - i) 取商。這樣可以確保生成的隨機數在範圍

[0,ni1] 內,然後加上 i,得到在範圍
[i,n1]
內的一個隨機數 j。

最後將 第 i 個元素和 第 j 個元素做交換,就完成一次洗牌了。

quick_sort

  • 實作非遞迴版本的 quick sort

複習 quick sort 怎麼實作:
先找一個基準點。分派兩個代理人,一個從最前面往後找比基準點大的值,另一個從最後面往前找比基準點小的值。若找到了就停下,等到兩邊都找到後就把兩個代理人做交換,並繼續找。兩個代理人若是碰到了就在碰到的那個點和基準點交換,接著以基準點的位置切成兩半,左右兩邊繼續上述操作(左右兩邊就可以使用遞迴來實作)。

而這次要實作的是非遞迴的版本,用 beginend 作為堆疊 (stack) 來替換遞迴的深度,利用變數 i 表示要操作的位置。而堆疊的大小為 max_level ,設置為

2×n ,其中 n 為鍊結串列的長度。

int n = list_length(list);
int value;
int i = 0; // stack index
int max_level = 2 * n;
node_t *begin[max_level], *end[max_level];
node_t *result = NULL, *left = NULL, *right = NULL;

這邊我用一個圖解的例子來說明,有一個串列為 [5, 4, 7, 1, 8, 2] 。







G



NULL
NULL



5

5



4

4



5->4





7

7



4->7





1

1



7->1





8

8



1->8





2

2



8->2





2->NULL





先將 beginend 的最底層分別放上串列的第一個和最後一個節點。

begin[0] = *list;
end[0] = list_tail(list);

接著就進入 while 迴圈,迴圈的中止條件為 i < 0 ,也就是說當堆疊模擬完遞迴的操作並將排序的結果放入 result 後中止,詳細的部份以下說明。

宣告了 LR 分別在每次迴圈時抓 beginend 的值,接著判斷這兩個節點是否相同,若是不同的代表需要做分派代理人的操作。將基準點設置為 L (最前面的節點) , value 為基準點的值,將基準點斷開,另一個指標 p 來走訪串列。

while (i >= 0) {
    node_t *L = begin[i], *R = end[i];
    if (L != R) {
        node_t *pivot = L;
        value = pivot->value;
        node_t *p = pivot->next;
        pivot->next = NULL;






G



NULL
NULL



pivot
pivot



5

5



pivot->5





L
L



L->5:n





R
R



2

2



R->2





p
p



4

4



p->4





7

7



4->7





1

1



7->1





8

8



1->8





8->2





2->NULL





透過 p 走訪串列每個節點,若其節點值小於基準點的值,則放入 left前面 (list_add) ,若其節點值大於基準點的值,則放入 right前面 (list_add) 。因此 CCCC 放入 p->next 來走訪節點。

while (p) {
    node_t *n = p;
    p = p->next; // CCCC = p->next
    list_add(n->value > value ? &right : &left, n);
}






G



NULL1
NULL



NULL2
NULL



NULL3
NULL



left
left



2

2



left->2





right
right



8

8



right->8





pivot
pivot



5

5



pivot->5





5->NULL1





1

1



2->1





4

4



1->4





4->NULL2





7

7



8->7





7->NULL3





分割後,要將其 push 到堆疊裡面,begin 從下到上分別放入, left, pivot, right 的第一個節點,而 end 則是反過來,從下到上分別放入, left, pivot, right 的最後一個節點。

begin[i] = left;
end[i] = list_tail(&left); // DDDD = list_tail(&left)
begin[i + 1] = pivot;
end[i + 1] = pivot;
begin[i + 2] = right;
end[i + 2] = list_tail(&right); // EEEE = list_tail(&right)

將節點加入堆疊後就如下圖所示, 0, 1, 2 分別是 left, pivot, right

stack1.drawio

leftright 重新設置為 NULL ,並將 i 加二。

left = right = NULL;
i += 2;

這樣第一輪迴圈就結束了,第二輪迴圈中, i 為 2,因此 LR 抓的節點為 begin[2](8) 和 end[2](7)







G



NULL1
NULL



NULL2
NULL



pivot
pivot



8

8



pivot->8





L
L



L->8





R
R



7

7



R->7





p
p



p->7





8->NULL1





7->NULL2





因為 7 小於基準點(pivot)的 8 ,因此放入 left







G



NULL1
NULL



NULL2
NULL



left
left



7

7



left->7





right
right



right->NULL2





7->NULL1





接著 push 進堆疊裡。注意到是從 i = 2 開始加入,也就是說它會蓋掉原本 i = 2 的部份。

stack2.drawio

進入第三次迴圈, i = 4 。此時發現 begin[4] = end[4] = NULL 。又因為 L 為 NULL ,因此不動作,i 自減一。

} else {
    if (L)
        list_add(&result, L);
    i--;
}

進入第四次迴圈, i = 3 ,發現 begin[3] = end[3] = 8 ,根據上面程式碼,將 8 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



8

8



result->8





8->NULL1





進入第五次迴圈, i = 2 ,發現 begin[2] = end[2] = 7 ,根據上面程式碼,將 7 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



7

7



result->7





8

8



7->8





8->NULL1





進入第六次迴圈, i = 1 ,發現 begin[1] = end[1] = 5 ,根據上面程式碼,將 5 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



5

5



result->5





7

7



5->7





8

8



7->8





8->NULL1





進入第七次迴圈, i = 0 , LR 分別為 2 和 4 ,繼續拿 2 當基準點,用 p 走訪剩餘節點。







G



NULL1
NULL



NULL2
NULL



pivot
pivot



2

2



pivot->2





L
L



L->2





R
R



4

4



R->4





p
p



1

1



p->1





2->NULL1





1->4





4->NULL2





因為 1 小於基準點(2) 放入 left ,而 4 大於基準點(2) 放入 right







G



NULL1
NULL



NULL2
NULL



left
left



1

1



left->1





right
right



4

4



right->4





1->NULL1





4->NULL2





接著一樣 push 進堆疊裡。注意是從 i = 0 開始放。

stack3.drawio

進入第八次迴圈, i = 2 ,發現 begin[2] = end[2] = 4 ,根據上面程式碼,將 4 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



4

4



result->4





5

5



4->5





7

7



5->7





8

8



7->8





8->NULL1





進入第九次迴圈, i = 1 ,發現 begin[1] = end[1] = 2 ,根據上面程式碼,將 2 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



2

2



result->2





4

4



2->4





5

5



4->5





7

7



5->7





8

8



7->8





8->NULL1





進入第十次迴圈, i = 0 ,發現 begin[0] = end[0] = 1 ,根據上面程式碼,將 1 加入 result 的最前面。 i 自減一。







G



NULL1
NULL



result
result



1

1



result->1





2

2



1->2





4

4



2->4





5

5



4->5





7

7



5->7





8

8



7->8





8->NULL1





此時 i = -1 ,小於 0 所以出迴圈,將 indirect pointer list 指向排序好的串列 result

*list = result;

使用 gcc 編譯器編譯,會產生指定輸出的執行檔名稱。

$ gcc -o quickSort quickSort.c
$ ./quickSort

避免最差狀況的快速排序實作

  • 時間複雜度分析

    • Best case: 時間複雜度為
      O(nlogn)
    • Worse case: 當取的基準點(pivot) 為數列中最大或最小的值時,會導致在做切割時切的不平均。導致時間複雜度為
      O(n2)
  • Middle-of-Three
    1. 令 middle = (left + right) / 2
    2. 找出 list[left] , list[middle] , list[right] 的中間值,將其和 list[left] 交換
    3. 取新的 list[left] 作為 pivot

如果 pivot 的位置恰好在 n 筆資料中的

n/10
9n/10
之間,則可以收斂到時間複雜度為
O(nlogn)

  • Randomized

取串列中隨機一筆資料做為 pivot ,以亂數做有機會降低最差情況的發生,但平均情況仍落在時間複雜度為

O(nlogn)

void rand_pivot(node_t *list)
{  
   if (!list)
       return;

   int rand_index = rand() % list_length(&list);
   
   node_t *iter = list;
   while (iter && iter->next) {
       if (rand_index == 0) {
            break;
       }
       iter = iter->next;
       rand_index--;
   }
   swap_node(list, iter);
}
  • Middle-of-Three + Randomized

本題我嘗試將 Middle-of-Three 以及 Randomized 結合,希望效果有所提昇。

  1. 隨機取 3 個亂數,找到串列中對應的節點
  2. 將這三個節點中,第二大(中間大小)的節點作為 pivot

實作細節

  • rand_pivot 不同,用大小為 3 的陣列 rand_index 儲存 3 個亂數,並用 selection sort 排為升序。
  • 為了方便走訪節點,用 delete_remain 函式將亂數陣列每個元素都減去前面數值的累加。
    • 舉例來說,rand_index=[1, 3, 7] -> [1, 2, 3] ,這樣就可以取得第 1, 3, 7 個節點。
    ​​​​void delete_remain(int *arr, int n)
    ​​​​{   
    ​​​​    int value = arr[0], tmp;
    ​​​​    for (int i = 1; i < n; i++) {
    ​​​​        tmp = arr[i];
    ​​​​        arr[i] -= value;
    ​​​​        value += tmp;
    ​​​​    }
    ​​​​}
    
  • 當走訪到對應的節點時,配置一個新的節點插入 tmp ,走訪結束後 tmp 就會有 3 個隨機的節點,再對其排序後,取首個節點的下一個,也就是第二個節點(中間大小節點),在和原本的 list 的首個節點互換。
void rand_threeMedian_pivot(node_t *list)
{
    if (!list)
        return;
    
    int rand_index[3] = {0};
    int len = list_length(&list), i;
    for (i = 0; i < 3; i++) {
        rand_index[i] = rand() % len;
    }
    select_sort(rand_index);
    delete_remain(rand_index, 3);
    
    node_t *iter = list, *tmp = NULL;
    i = 0;
    while (iter && iter->next) {
        if (rand_index[i] <= 0) {
           node_t *new_node = (node_t*) malloc(sizeof(*new_node));
           new_node->value = iter->value;
           list_add(&tmp, new_node);
           i += 1;
           if (i >= 3) {
               break;
           }
        }
        iter = iter->next;
        rand_index[i]--;
    }
    
    select_sort_list(tmp);
    node_t *rand_node = tmp->next;
    swap_node(list, rand_node);
}

Perf 效能比較

針對選取 pivot 不同的策略進行實驗。分別測試上述 3 種策略:

  1. 選取第一個節點
  2. 隨機選取一個節點
  3. 隨機選取 3 個節點再取其中間的節點

其中,又分成一般情況(正常亂度)以及最差情況(已排序),利用 Perf 工具來偵測 time、cache-misses、cache-references、instructions、cycles 。

要得到 perf 的權限,可以先透過以下指令來設置 perf 的最高權限。

$ sudo sh -c "echo -1 > /proc/sys/kernel/perf_event_paranoid"   

在每次開始測量之前,要先清除掉快取的資料。

$ echo 3 | sudo tee /proc/sys/vm/drop_caches
  • 一般情況指令
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./quickSort_non_rand
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./quickSort_rand_pivot
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./quickSort_rand_threeMedian_pivot
  • 最差情況
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./sorted_non_rand
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./sorted_rand_pivot
$ perf stat --repeat 5 -e cache-misses,cache-references,instructions,cycles ./sorted_rand_threeMedian_pivot

一般情況, 100000 筆資料

  1. 選取第一個節點
 Performance counter stats for './quickSort_non_rand' (5 runs):

           146,774      cache-misses                     #    5.31% of all cache refs           ( +-  5.79% )
         2,763,461      cache-references                                                        ( +-  0.16% )
       158,983,681      instructions                     #    1.26  insn per cycle              ( +-  0.01% )
       125,918,787      cycles                                                                  ( +-  0.44% )

           0.03153 +- 0.00101 seconds time elapsed  ( +-  3.20% )
  1. 隨機選取一個節點
 Performance counter stats for './quickSort_rand_pivot' (5 runs):

           132,365      cache-misses                     #   36.52% of all cache refs           ( +-  7.79% )
           362,416      cache-references                                                        ( +-  1.33% )
        54,034,665      instructions                     #    2.03  insn per cycle              ( +-  0.45% )
        26,577,364      cycles                                                                  ( +-  1.37% )

           0.01125 +- 0.00219 seconds time elapsed  ( +- 19.49% )
  1. 隨機選取 3 個節點再取其中間的節點
 Performance counter stats for './quickSort_rand_threeMedian_pivot' (5 runs):

           143,014      cache-misses                     #   45.15% of all cache refs           ( +-  4.31% )
           316,758      cache-references                                                        ( +-  0.54% )
        50,920,766      instructions                     #    2.13  insn per cycle              ( +-  0.20% )
        23,935,507      cycles                                                                  ( +-  1.15% )

          0.009509 +- 0.000436 seconds time elapsed  ( +-  4.58% )

最差情況, 10000 筆資料

  1. 選取第一個節點
 Performance counter stats for './sorted_non_rand' (5 runs):

            21,318      cache-misses                     #   42.98% of all cache refs           ( +-  4.31% )
            49,600      cache-references                                                        ( +-  5.48% )
     2,986,624,736      instructions                     #    2.25  insn per cycle              ( +-  0.00% )
     1,330,053,874      cycles                                                                  ( +-  0.36% )

           0.31873 +- 0.00126 seconds time elapsed  ( +-  0.39% )
  1. 隨機選取一個節點
 Performance counter stats for './sorted_rand_pivot' (5 runs):

            17,998      cache-misses                     #   41.93% of all cache refs           ( +-  9.33% )
            42,922      cache-references                                                        ( +-  3.49% )
       808,919,829      instructions                     #    1.68  insn per cycle              ( +-  0.00% )
       480,738,978      cycles                                                                  ( +-  0.50% )

           0.11808 +- 0.00311 seconds time elapsed  ( +-  2.64% )
  1. 隨機選取 3 個節點再取其中間的節點
 Performance counter stats for './sorted_rand_threeMedian_pivot' (5 runs):

            17,693      cache-misses                     #   41.19% of all cache refs           ( +- 15.01% )
            42,952      cache-references                                                        ( +-  3.52% )
       808,930,894      instructions                     #    1.69  insn per cycle              ( +-  0.00% )
       477,776,040      cycles                                                                  ( +-  0.48% )

           0.11707 +- 0.00252 seconds time elapsed  ( +-  2.15% )

效能分析圖

Figure_1

可以看到 rand_pivotrand_threeMedian_pivot 的效能在最差情況時發揮很大的作用,避免掉了分割不好的問題,而 rand_threeMedian_pivot 只比 rand_pivot 好了一點,可能是程式流程不夠簡潔,未來希望可以嘗試改進。

quiz1 測驗2

完整題目

測驗目標:

  • 解釋並實作 Timsort
  • 在 lab0-c 實作 Timsort

Timsort 演算方法

以前在演算法學過許多時間複雜度為

O(nlogn) 的排序算法,例如快速排序法(Quick Sort)、合併排序法(Merge Sort)、堆疊排序法(Heap Sort)。但會因為使用的情境不同而導致時間複雜度無法達到完美的
O(nlogn)

因此, Tim Peter 結合合併排序法和插入排序法的特點,提出 Timsort 。 Tim Peter 注意到,在現實世界中,許多資料往往是由一些已經部分排序的序列組合而成。因此可以將 Timsort 的特點列舉出來。

  1. 在長度不長的數列中,插入排序法的效率很高
  2. 將數列切分為有
    minrun
    大小的
    run
  3. 提出 Galloping mode ,以提升合併的效率

分割

Tim Peter 注意到,在現實世界中,許多資料往往是由一些已經部分排序的序列組合而成。

首先,要如何分割數列成好幾個 run ,我們可以從數列的左到右開始尋找非嚴格遞增或是嚴格遞減(為了符合 stable )的序列來分割。以下舉例:

{3,4,5,6,4,2,1,2,7,10}

分割之後變成 3 個 run 。

{3,4,5,6},{4,2},{1,2,7,10}

為了合併時不要有太多步驟,必須要設置一個

minrun 作為一個 run 最小的大小,假設我們這邊設 minrun = 3 ,則上面第二個 run 舊部符合規定,因此修改為:

{3,4,5,6},{4,2,1},{2,7,10}

如果切完後長度大於 minrun ,那就繼續往下切,如果長度小於 minrun,就必須補齊長度到 minrun,之後的步驟就是將遞減的序列翻轉,若是 run 有被補元素進去,那就用 Insertion sort 將該個 run 排序好,像是上面的例子。

{3,4,5,6},{1,2,4},{2,7,10}

而該如何選擇 minrun 的大小呢,合併序列時,若 run 的數量等於或者略小於 2 的冪,效率會最高;若略大於 2 的冪,效率就會特別低。Timsort 為了使合併過程更加均衡,需要儘量控制每個 run 的長度,定義一個 minrun (最小運行長度),用以表示每個 run 的最小長度,若長度太短,就用二分插入排序,將 run 後面的元素插入到前面的 run 裡面。我們可以整理出選擇 minrun 需要達成的條件。

  1. minrun 必須小於 64 ,因為大於 64 的 Insertion sort 效率會變差。
  2. 盡可能的平均分配,不能有全部都剛好是 minrun 而最後一個只剩下很少元素的情況發生。
  3. 盡可能將切割完的 run 的數量控制在二的幂或略小於二的幂,方便待會的 Merge sort

因此 Tim 提出將數列的大小轉成二進位,取前 6 位 (

26=64) 的數值
N
,若是後面還有數的話,就將
N+1
作為 minrun 的大小。舉例來說,數列長度
N
為 2003 ,二進位為 111,1101,0011 ,前六位就是 111110 ,換成十進位就是 62 ,而因為後面還有數值,因此
minrun=62+1=63
。就可以分割為 32 個 run ,就發現完美的符合上述 3 種條件了。

合併

Timsort 採用一組堆疊 (stack) 來暫存 run,此舉是為了避免在一開始就掃描整個資料範圍來產生 run,從而降低記憶體開銷。過程中,Timsort 運用 merge_collapse 函式來確保堆疊上的 run 長度保持平衡。該函式主要檢查堆疊頂端的 3 個 run 是否滿足以下原則:

  1. RunA>RunB+RunC
  2. RunB>RunC

當不符合這些條件時,函式會決定是否進行合併。例如,考慮 3 段 run: A 的長度為 30,B 的長度為 20,C 的長度為 10,由於 A 的長度不大於 B 加 C 的總和(亦即

A<=B+C),且 C 的長度小於 A,因此會選擇將 C 與 B 進行合併,形成兩個新的 run

  • A: 30
  • BC: 30

藉此,不僅確保堆疊中 run 的長度平衡,且因合併只能在相鄰的兩個 run 間進行,以確保排序的穩定性,因此合併操作僅可能採取 A+(B+C) 或 (A+B)+C 的形式進行。此舉讓 Timsort 在執行時兼顧高效又穩定。

處理兩個相鄰子數列的合併過程中,直接在記憶體中操作會遇到一定的挑戰,因為大量的記憶體操作不僅實作困難,且效率也未必理想。因此,Timsort 採取了使用一塊臨時記憶區的策略,其大小設定為兩個子數列(A、B)中較短者的長度。

當 A 的長度小於 B 時,會先將 A 數列暫存。直觀的合併方法是從 A 和 B 的開頭開始進行逐對比較,將較小的元素放置於 A 的原位置,這一過程一直持續到 A 和 B 完全排序,類似於經典的合併排序類似,也稱為逐對合併(one-pair-at-a-time)。如下圖所示。

280px-One-one_merging_timsort.svg

Galloping mode

Tim Peter 注意到,在現實世界中,許多資料往往是由一些已經部分排序的序列組合而成。

對於兩個要合併的數列,若都是已排序的狀態,則會多出非常多次無意義的比較,例如

{1,2,3,...,100}
{201,202,203,...,300}
就需要 100 次的比較,因此 Tim 提出 Galloping ,就是只比二的幂的數,比完之後再用 binary search(前面已經排序好了) 找到那個對的位子,這樣總共比較的次數可以壓低到
O(logn)

實際上就是,首先尋找 B 的首個元素(即 )在 A 中的排序位置,從而確定 A 中有一段是小於 者,可將這部分元素放回 A。接著,尋找剩餘 A 的首個元素在 B 中的位置,如此反覆進行,直到完成排序。以下例子說明。

  • A={1,2,3,7,8,9}
  • B={4,5,6,8,9,10,11}

因為 A 比較短,因此先將 A 放入暫存記憶區 temp 。接著找到

B0 在 A 中的位置,即
A2
A3
之間,因
B0>A2
,所以
A0
A2
可以直接放回 A 數列。然後,將 temp 的首個元素放到
B
中適當的位置,如此反覆進行。

但在有些時候數列可能是亂的,而是 linear search 是比 Galloping mode 更有效率,因此在當兩個序列比較到第七次都是由同一個序列取值時,才啟動 Galloping mode。

Timsort 總結

  • 因材施教:Insertion Sort 在面對小塊資料時效能表現出色,而 Merge Sort 在面對處理大規模數據時效率更高。TimSort 巧妙地結合了這兩種算法,根據數據的特性自動切換,發揮它們各自的長處。
  • 保證效率:無論在最佳、平均還是最壞情況下,TimSort 的時間複雜度都能保證在
    O(nlogn)
    的範圍內。這使得它成為一種高效且可靠的排序算法。
  • 穩定性:TimSort 是一種穩定的排序算法,這意味著相等元素的相對位置在排序前後不會改變。這一特性在某些應用場景中非常重要。
  • 適應性:TimSort 能夠根據數據的分佈特點自動調整策略。當面對部分已經排序的數據時,它能夠充分利用數據的有序性,從而獲得更好的性能表現。

Timsort 實作

  1. 初始化和單向鏈表轉換
  • 初始化堆疊大小為 0。
  • 檢查串列是否只有一個元素,如果是則直接返回。
  • 將雙向串列轉換為以 NULL 結尾的單向串列。
  1. 尋找和合併 run
  • 使用 find_run 函數尋找下一個 run,並將其頭指向 tp
  • 更新 list 指向下一個節點,並增加 stk_size
  • 使用 merge_collapse 函數合併必要的 run。
  1. 強制合併所有 run
  • 當輸入處理完畢後,使用 merge_force_collapse 函數合併所有剩餘的 run。
  1. 最終合併和重建 prev 指針
  • 確保 stk0stk1 指向適當的合併起點。
  • 如果只有一個 run,直接重建 prev 鏈接。
  • 使用 merge_final 函數合併兩個部分,完成排序。
void timsort(void *priv, struct list_head *head, list_cmp_func_t cmp)
{
    stk_size = 0;
    
    struct list_head *list = head->next, *tp = NULL;
    if (head == head->prev)
        return;
    
    /* Convert to a null-terminated singly-linked list. */
    head->prev->next = NULL;

    do {
        /* Find next run */
        struct pair result = find_run(priv, list, cmp);
        result.head->prev = tp;
        tp = result.head;
        list = result.next;
        stk_size++;
        tp = merge_collapse(priv, cmp, tp);
    } while (list);
    
    /* End of input; merge together all the runs. */
    tp = merge_force_collapse(priv, cmp, tp);
    
    /* The final merge; rebuild prev links */
    struct list_head *stk0 = tp, *stk1 = stk0->prev;
    while (stk1 && stk1->prev)
        stk0 = stk0->prev, stk1 = stk1->prev;
    if (stk_size <= 1) { // FFFF = 1
        build_prev_link(head, head, stk0);
        return;
    }
    merge_final(priv, cmp, head, stk1, stk0);
}

Merge

將兩個以排序的鍊結串列 a 和 b 合併成一個已排序鍊結串列。這邊的實作和 Linux kernel 裡的 Merge sort 的操作一樣,參考我之前對Linux 核心原始程式碼的 lib/list_sort.c 的筆記。

研讀 Linux 核心原始程式碼的 lib/list_sort.c 筆記

static struct list_head *merge(void *priv,
                               list_cmp_func_t cmp,
                               struct list_head *a,
                               struct list_head *b)
{
    struct list_head *head;
    struct list_head **tail = &head; // AAAA = &head

    for (;;) {
        /* if equal, take 'a' -- important for sort stability */
        if (cmp(priv, a, b) <= 0) {
            *tail = a;
            tail = &a->next; // BBBB = &a->next
            a = a->next;
            if (!a) {
                *tail = b;
                break;
            }
        } else {
            *tail = b;
            tail = &b->next; // CCCC = &b->next
            b = b->next;
            if (!b) {
                *tail = a;
                break;
            }
        }
    }
    return head;
}

在排序後,需要將單向的串列轉回成雙向的串列,也就是將串列的 prev 接回。最後再將串列變回環狀的,因此將頭尾相連。

static void build_prev_link(struct list_head *head,
                            struct list_head *tail,
                            struct list_head *list)
{
    tail->next = list;
    do {
        list->prev = tail;
        tail = list;
        list = list->next;
    } while (list);

    /* The final links to make a circular doubly-linked list */
    tail->next = head; // DDDD = tail->next
    head->prev = tail; // EEEE = head->prev

在 lab0-c 實作 Timsort

2024q1 Homework1 (lab0) - Timsort

GCC 相關議題

在本題中,先用 timsort.c 實作了 timsort 的函式,並在 sort_impl.h 定義了這個函式,接著在 main.c 裡呼叫 timsort 函式。但當我想用 gcc 編譯時,出現了以下錯誤:

timsort.c:193:6: note: previous definition of ‘timsort’ with type ‘void(void *, struct list_head *, int (*)(void *, const struct list_head *, const struct list_head *))

原來是使用 gcc 編譯時,若是有使用到函式是由外部引入時,需要將其程式一起編譯。

-$ gcc -o main main.c
+$ gcc -o main main.c timsort.c

參考

Timsort
M02: quiz1+2
最貼近現實的排序演算法- Timsort
TimSort - Python 內建排序法

第二周測驗題

quit2 測驗一

完整題目

測驗目標:

  • 解釋並改進 Binary Tree
  • 在 Linux 核心的 cgroups 相關原始程式碼,找出 pre-order walk 程式碼並探討

題目運作

  • 給定二個由整數構成陣列的前序 (preorder) 和中序 (inorder) 形式,分別是對同一棵二元樹的前序及中序走訪結果,藉此重建此二元樹。
  • 由前序可知哪個節點在上面 (越前面的越上面)、而由中序可知哪個節點靠左(越前面的越左邊)。
  • 在前序中找到第一個節點,也就是根節點。接著在中序中找到根節點,即可知其左邊的元素即為左子樹,右邊的元素即為右子樹。

hlist

在 Linux Kernel 裡實作的 Hash table 的結構體。由一個指向下一個節點的指標 next ,以及指向前一個節點的指標的指標。

struct hlist_node {
    struct hlist_node *next, **pprev;
};
struct hlist_head {
    struct hlist_node *first;
};

prev 若是使用指標,則它會指向節點本身,也就是說第一個節點的 prev 會指向 NULL,而使我們在一些串列操作是需事先判斷是否為第一個節點,然後去更改 head 的指標。

但若使用指標的指標,而是指向指著自身節點的指標 (指向上一個節點的 next 指標)。也就是說第一個節點可以指向 head 進而消處了上述的特例情況。







G



list_head

hlist_head

first



node_1

hlist_node_1

pprev

next




list_head:n->node_1:m





node_1:p->list_head:n





node_2

hlist_node_2

pprev

next




node_1:n->node_2:m





node_2:p->node_1:n





node_3

hlist_node_3

pprev

next




node_2:n->node_3:m





node_3:p->node_2:n





NULL
NULL



node_3:n->NULL





hlist 是專門為了雜湊表的結構體,相較於 list_head 前後都有兩個指標,當 bucket 很大時,會增加記憶體開銷。而 hlist_head 僅需要保存一個指標 next

hlist_add_head

將節點 n 加到第一個節點,雜湊表中已經存在節點。此時,需要將現有的頭節點的 pprev 指向新的節點 nnext。接著將指標接上即可。

static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h)
{
    if (h->first)
        h->first->pprev = &n->next;
    n->next = h->first; // AAAA = h->first
    n->pprev = &h->first;
    h->first = n;
}

Treenode & order_node

struct TreeNode {
    int val;
    struct TreeNode *left, *right;
};

struct order_node {
    struct hlist_node node;
    int val;
    int idx;
};

find

在雜湊表中找指定的節點。先計算雜湊值來確定要找的雜湊桶,走訪其節點並找出我們要的節點,最後返回索引值。若沒找到,則回傳 -1 。

static int find(int num, int size, const struct hlist_head *heads)
{
    struct hlist_node *p;
    int hash = (num < 0 ? -num : num) % size;
    hlist_for_each (p, &heads[hash]) { // BBBB = &heads[hash]
        struct order_node *on = container_of(p, struct order_node, node); // CCCC = container_of
        if (num == on->val)
            return on->idx;
    }
    return -1;
}

使用 container_of 用來通過結構體成員的指標獲取包含該成員的結構體的指標。

#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - (size_t) & (((type *) 0)->member)))

node_add

將一個帶有值和索引的新節點插入到雜湊表中。插入過程中,先計算值的雜湊值,以確定節點應該插入的位置,然後使用 hlist_add_head 函式將節點添加到雜湊桶的 head。

static inline void node_add(int val,
                            int idx,
                            int size,
                            struct hlist_head *heads)
{
    struct order_node *on = malloc(sizeof(*on));
    on->val = val;
    on->idx = idx;
    int hash = (val < 0 ? -val : val) % size;
    hlist_add_head(&on->node, &heads[hash]); // DDDD = &heads[hash]
}

結構圖如下:







G


cluster_2

hash_key 2


cluster_3

hash_key 3


cluster_1

hash_key 1



map

hlist_head.first

 

 

 

 

 

 

 

 



hn1

hlist_node

pprev

next



map:ht1->hn1





hn3

hlist_node

pprev

next



map:ht5->hn3





null1
NULL



null2
NULL



hn1:s->map:ht1





hn2

hlist_node

pprev

next



hn1:next->hn2





hn2:next->null1





hn2:s->hn1:s





hn3:s->map:ht5





hn3:next->null2





雜湊名詞:

  • 雜湊表(hash table):利用雜湊函式計算出對應的雜湊值,而建立的表格。
  • 雜湊桶(hash bucket):雜湊表中儲存資料的位置,每個雜湊桶中都是一個鍊結串列,當發生碰撞時,便會將其加入串列。
  • 雜湊表的大小 = 雜湊桶的數量

dfs

dfs 函數通過前序和中序走訪數列來構建二元樹。函數的核心是利用前序走訪的特性(根->左->右)和中序走訪的特性(左->根->右)來確定每個節點的左右子樹。使用雜湊表來快速查找中序走訪中節點的索引,提高了查找效率。最終,函數返回建好的二元樹的根節點。

  • 參數

    • int *preorder:前序遍歷數組的指標。
    • int pre_low:前序遍歷的起始索引。
    • int pre_high:前序遍歷的結束索引。
    • int *inorder:中序遍歷數組的指標。
    • int in_low:中序遍歷的起始索引。
    • int in_high:中序遍歷的結束索引。
    • struct hlist_head *in_heads:中序遍歷數組的雜湊鏈表頭部陣列。
    • int size:雜湊表的大小。
  • 檢查是否為空子樹。

  • 配置記憶體空間給新節點 tn,並設置它的值為當前前序走訪中的第一個元素 preorder[pre_low] 。這個元素是當前子樹的根節點。

  • 使用 find 函數在表中查雜湊表找根節點在中序遍歷中的索引 idx 。這一步確定了根節點在中序遍歷中的位置,從而可以劃分左右子樹。

  • 遞迴建左子樹。左子樹的前序遍歷範圍是 pre_low + 1pre_low + (idx - in_low),中序遍歷範圍是 in_lowidx - 1

  • 遞迴建右子樹。右子樹的前序遍歷範圍是 pre_high - (in_high - idx - 1)pre_high,中序遍歷範圍是 idx + 1in_high

static struct TreeNode *dfs(int *preorder,                                      
                            int pre_low,
                            int pre_high,
                            int *inorder,
                            int in_low,
                            int in_high,
                            struct hlist_head *in_heads,
                            int size)
{
    if (in_low > in_high || pre_low > pre_high)
        return NULL;

    struct TreeNode *tn = malloc(sizeof(*tn));
    tn->val = preorder[pre_low];
    int idx = find(preorder[pre_low], size, in_heads);
    tn->left = dfs(preorder, pre_low + 1, pre_low + (idx - in_low), inorder,
                   in_low, idx - 1, in_heads, size);
    tn->right = dfs(preorder, pre_high - (in_high - idx - 1), pre_high, inorder,
                    idx + 1, in_high, in_heads, size);
    return tn;
}

buildTree

接著我們便可以來建立一顆二元樹,最後透過 dfs 函式回傳一顆二元樹。

static struct TreeNode *buildTree(int *preorder,
                                  int preorderSize,
                                  int *inorder,
                                  int inorderSize)
{   
    struct hlist_head *in_heads = malloc(inorderSize * sizeof(*in_heads));
    for (int i = 0; i < inorderSize; i++)
        INIT_HLIST_HEAD(&in_heads[i]);
    for (int i = 0; i < inorderSize; i++)
        node_add(inorder[i], i, inorderSize, in_heads);

    return dfs(preorder, 0, preorderSize - 1, inorder, 0, inorderSize - 1,
               in_heads, inorderSize);
}

打印二元樹

得到了樹狀結構體 binaryTree ,需要像題目所示將每一層的節點都印出來(包括 NULL)。利用 node[MAX_SIZE] 當作堆疊讓我們走訪每一層的節點, MAX_SIZE 則表示二元樹的最大節點數量。

void printTreeLevels(struct TreeNode *binaryTree) 
{
    struct TreeNode *node[MAX_SIZE];
    int front = 0, end = 0;
    int currentLevelCount = 1;  // 當前層次的節點數量
    int nextLevelCount = 0;     // 下一層次的節點數量

    if (binaryTree == NULL) {
        printf("binarytree = [ NULL ]\n");
        return;
    }

    node[end++] = binaryTree;

    printf("binarytree = [ ");
    while (front != end) {
        struct TreeNode *pop_node = node[front++];
        currentLevelCount--;

        if (pop_node == NULL) {
            printf("NULL, ");
        } else {
            printf("%d, ", pop_node->val);
            node[end++] = pop_node->left;
            node[end++] = pop_node->right;
            nextLevelCount += 2;
        }

        // 當前層次已經處理完
        if (currentLevelCount == 0) {
            // printf("\n");
            currentLevelCount = nextLevelCount;
            nextLevelCount = 0;
        }
    }
    printf("]\n");
}

輸出結果如下,和範例答案一致。

binarytree = [ 3, 9, 20, NULL, NULL, 15, 7, NULL, NULL, NULL, NULL, ]

Linux 核心的 cgroups

cgroup (Control Group) ,控制群組。是實作在 Linux 中用來限制、控制與分離一個行程群組的資源(如CPU、主記憶體、磁碟輸入輸出等)。

cgroup 使用樹狀結構來管理資源,一個 hierarchy 預設會有一個根結點,所有的process (pid) 都會 attach 在這個節點上。

一個 hierarchy 可以對應到零個或多個上述的 subsystem ,並在一個節點內設置上述的那些限制,那這些限制就會套用到在這個節點內的所有process。

可以在 hierarchy 內建立子節點,那子節點就會預設套用父節點的所有設置,然後可以只針對有興趣的項目作更細緻的調正。

一個 process 在一棵 hierarchy 只能 attach 在一個節點上,可以對 process 設定所在的節點。從 process fork 出來的 process 會在同一個節點上,但是搬運 process 到不同的節點,並不會影響子 process 。

cgroup

pre-order walk 程式碼

css_next_descendant_pre 函式用於在 Linux 中的 cgroup(control group)子系統中進行先序走訪,以走訪特定 cgroup 的所有後代節點。

主要用於資源監控和管理。cgroup 被用來限制、計算和隔離一組任務(如 process)所使用的系統資源(如 CPU、記憶體、IO 等)。使用 css_next_descendant_pre 可以走訪特定 cgroup 及其所有後代,從而對整個 cgroup 樹進行監控和管理。

實作過程:

  • 檢查進入程式前互斥鎖(mutex)和 RCU (Read-Copy Update) 已鎖上,確保無同步問題
  • 若是第一次的迭代,則直接回傳根節點
  • 訪問當前節點 pos 的第一個子節點。如果存在子節點,返回這個子節點。
  • 訪問兄弟節點或祖先的兄弟節點
    • 若當前節點沒有子節點,那麼在樹的結構中往上回溯,嘗試訪問當前節點的兄弟節點。
    • 若當前節點沒有兄弟節點,繼續回溯到父節點並重複這個過程,直到回溯到根節點(root)。

可以看出 css_next_descendant_pre 即是先序走訪(Pre-order Walk),即深度優先搜索(DFS)的一種算法。

struct cgroup_subsys_state *
css_next_descendant_pre(struct cgroup_subsys_state *pos,
			struct cgroup_subsys_state *root)
{
	struct cgroup_subsys_state *next;

	cgroup_assert_mutex_or_rcu_locked();

	/* if first iteration, visit @root */
	if (!pos)
		return root;

	/* visit the first child if exists */
	next = css_next_child(NULL, pos);
	if (next)
		return next;

	/* no child, visit my or the closest ancestor's next sibling */
	while (pos != root) {
		next = css_next_child(pos, pos->parent);
		if (next)
			return next;
		pos = pos->parent;
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(css_next_descendant_pre);

參考

cgroups wikipedia
Linux Kernel doc cgroup-v2
Day26 - Cgroups
murmurHash 實作

quiz2 測驗二

完整題目

測驗目標:

  • 解釋並改進 LRU Cache
  • 在 Linux 核心找出 LRU 相關程式碼並探討

題目運作

  • 實作 LRU (Least recently used) 演算法。當儲存空間有新的資料進來時,會移除掉最長時間為被訪存的資料。

LRUCache & LRUNode

LRUCache 中,我們需要知道哪個節點是最近被使用的,為此我們使用了一個 list_head dhead ,作為存儲最近被使用節點的雙向鏈表,越靠近 dhead 的節點代表最近被使用。每個節點也需要放入雜湊表中,這是由 hlist hhead[] 來實現的。而 capacity 代表快取的容量, count 則表示目前雜湊表中儲存的節點數目。

LRUNode 是上述鏈表中的節點,其結構包括連接 dheadlist_head link,以及用於放入雜湊表的 hlist_node node 。此外, keyvalue 用來存放鍵值對,以便在快取中查找和存取。

typedef struct {
    int capacity;
    int count;
    struct list_head dhead;
    struct hlist_head hhead[];
} LRUCache;

typedef struct {
    int key;
    int value;
    struct hlist_node node;
    struct list_head link;
} LRUNode;

結構示意圖如下:

HenryChaing







G


cluster_4

LRUCache


cluster_3

LRUNode 2


cluster_1

LRUNode 1



dhlist_head

dhead

prev

next



hn5

link

prev

next



dhlist_head:next->hn5:prev





map

hhead[]

 

 

 

 

 

 

 

 

 

 

 

 



hn1

node

pprev

next



map:ht1->hn1





hn3

node

pprev

next



map:ht10->hn3





hn6

link

prev

next



hn5:next->hn6





hn1:s->map:ht1





null1
NULL



hn1:next->null1





hn6:next->dhlist_head:e





hn3:s->map:ht10





null2
NULL



hn3:next->null2





lRUCacheCreate

創建一個 LRU Cache 。主要是配置記憶體以及初始化 dhead 和雜湊表 hhead 的每個元素。

LRUCache *lRUCacheCreate(int capacity)
{
    LRUCache *cache = malloc(2 * sizeof(int) + sizeof(struct list_head) +
                             capacity * sizeof(struct list_head));
    cache->capacity = capacity;
    cache->count = 0;
    INIT_LIST_HEAD(&cache->dhead);                                       
    for (int i = 0; i < capacity; i++)
        INIT_HLIST_HEAD(&cache->hhead[i]);
    return cache;
}

lRUCacheFree

要釋放掉整個 LRU cache 。走訪 dhead 鍊結串列的每個節點,使用 list_del 將節點斷開後,再將其釋放。最後釋放雜湊表的部份。 list_entry 的第三個參數為 member ,也就是要將其結構指標的 link 成員傳入取得其節點。

void lRUCacheFree(LRUCache *obj)
{
    struct list_head *pos, *n;
    list_for_each_safe (pos, n, &obj->dhead) {
        LRUNode *cache = list_entry(pos, LRUNode, link); // FFFF = link
        list_del(&cache->link); // GGGG = &cache->link
        free(cache);
    }
    free(obj);
}

hlist_del

要刪除(斷開) hlist 的節點。

void hlist_del(struct hlist_node *n) { struct hlist_node *next = n->next, **pprev = n->pprev *pprev = next; if (next) next->pprev = pprev; // EEEE = next->pprev }

pprev 是指向指著自身節點的指標 (指向上一個節點的 next 指標)

以下說明如何刪除,假設 node_2 是要被刪掉的節點。藍色的箭頭就是要修改的指標。







G



list_head

hlist_head

first



node_1

node_1

pprev

next




list_head:n->node_1:m





node_1:p->list_head:n





node_2

node_2

pprev

next




node_1:n->node_2:m





node_2:p->node_1:n





node_3

node_3

pprev

next




node_2:n->node_3:m





node_3:p->node_2:n





NULL
NULL



node_3:n->NULL





初始狀態即為:

  • n = node2
  • next = node3
  • pprev = &node1->next

執行第4行後

  • *pprev = next = node3

這邊有一點繞,其實他的意思就是 *(&node1->next) = node3 ,在白話文一點說明就是原本 pprev 指向 &node1->next ,第4行就是將其值取出,把 node1->next 變成 next ,也就是 node3。這樣就變成 node1 的 next 指向 node3 。

第5, 6行判斷若要刪除的節點後面還有節點,則將下一個節點的 pprev 變成 *(&node1->next) = node3 ,這樣就把 node2 完全獨立分割出來了。







G



list_head

list_head

first



node_2

node_1

pprev

next




list_head:n->node_2:m





node_1

node_2

pprev

next



node_1:p->node_2:n





NULL1
NULL



node_1:n->NULL1





node_2:p->list_head:n





node_3

node_3

pprev

next




node_2:n->node_3:m





node_3:p->node_2:n





NULL2
NULL



node_3:n->NULL2





lRUCacheGet

要取得快取內的對應 key 的儲存數值。根據快取的結構體,我們可以先取得雜湊值,找到對應的雜湊桶,再去走訪其鍊結串列,找到後使用 list_move 將其串列節點移到串列的頭部,以達成 LRU ,最後回傳儲存數值。

int lRUCacheGet(LRUCache *obj, int key)
{
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each (pos, &obj->hhead[hash]) {
        LRUNode *cache = list_entry(pos, LRUNode, node); // HHHH = node
        if (cache->key == key) {
            list_move(&cache->link, &obj->dhead); // IIII = &cache->link
            return cache->value;
        }
    }
    return -1;
}

lRUCachePut

將儲存數值以及其 key 放入快取中。考慮到以下狀況:

需要透過走訪 key 值對應的鍊結串列來檢查被儲存資料是否已在快取內。

  1. 要儲存的資料已經在快取中。
    • 使用 list_move 將儲存的節點移到串列的頭部。
  2. 要儲存的資料沒有在快取中,有可分為兩種情況。
    • 快取的容量已滿
      • 因為最後一個節點為最久沒被使用到的,因此要將其節點刪除。
      • 再將新的儲存資料放到最前面的位置
    • 快取的容量未滿
      • 配置記憶體空間給新節點
      • 將新的儲存資料放到最前面的位置
      • 增加 count
void lRUCachePut(LRUCache *obj, int key, int value)
{
    LRUNode *cache = NULL;
    int hash = key % obj->capacity;
    struct hlist_node *pos;
    hlist_for_each (pos, &obj->hhead[hash]) {
        LRUNode *c = list_entry(pos, LRUNode, node); // JJJJ = node
        if (c->key == key) {
            list_move(&c->link, &obj->dhead); // KKKK = &c->link
            cache = c;
        }
    }

    if (!cache) {
        if (obj->count == obj->capacity) {
            cache = list_last_entry(&obj->dhead, LRUNode, link);
            list_move(&cache->link, &obj->dhead);
            hlist_del(&cache->node);
            hlist_add_head(&cache->node, &obj->hhead[hash]);
        } else {
            cache = malloc(sizeof(LRUNode));
            hlist_add_head(&cache->node, &obj->hhead[hash]);
            list_add(&cache->link, &obj->dhead);
            obj->count++;
        }
        cache->key = key;
    }
    cache->value = value;
}

確認正確性

根據 146. LRU Cache 提供的範例的測試程式。

int main(void)
{   
    int capacity = 2; 
    LRUCache *cache = lRUCacheCreate(capacity);
    
    printf("Input: {key:value} = {1, 1}\n");
    lRUCachePut(cache, 1, 1);
    printf("Input: {key:value} = {2, 2}\n");
    lRUCachePut(cache, 2, 2);
    printf("Output: {key:value} = {1, %d}\n", lRUCacheGet(cache, 1));
    printf("Input: {key:value} = {3, 3}\n");
    lRUCachePut(cache, 3, 3);
    printf("Output: {key:value} = {2, %d}\n", lRUCacheGet(cache, 2));
    printf("Input: {key:value} = {4, 4}\n");
    lRUCachePut(cache, 4, 4);
    printf("Output: {key:value} = {1, %d}\n", lRUCacheGet(cache, 1));
    printf("Output: {key:value} = {3, %d}\n", lRUCacheGet(cache, 3));
    printf("Output: {key:value} = {4, %d}\n", lRUCacheGet(cache, 4));
    
    return 0;
}

得到輸出:

Input: {key:value} = {1, 1} Input: {key:value} = {2, 2} Output: {key:value} = {1, 1} Input: {key:value} = {3, 3} Output: {key:value} = {2, -1} // fault Input: {key:value} = {4, 4} Output: {key:value} = {1, -1} // fault Output: {key:value} = {3, 3} Output: {key:value} = {4, 4}

創建大小為 2 的快取。以下用快取結構體中內的 list_head 來說明快取內數據變化。

  • 第 1, 2 行:2 最近被使用,因此被換到最前面。






ele_list



head

list_head

prev

next



node1

2

prev

next



head:e->node1:w





node1:w->head:e





node2

1

prev

next



node1:e->node2:w





node2:w->node1:e





  • 第 3 行:1 被使用,因此 1 被換到最前面






ele_list



head

list_head

prev

next



node1

1

prev

next



head:e->node1:w





node1:w->head:e





node2

2

prev

next



node1:e->node2:w





node2:w->node1:e





  • 第 4 行:輸入 3 進來,因為 2 是最不常被使用的,因此移除。






ele_list



head

list_head

prev

next



node1

3

prev

next



head:e->node1:w





node1:w->head:e





node2

1

prev

next



node1:e->node2:w





node2:w->node1:e





  • 第 5 行:因 2 被移除,因此找不到 2 。
  • 第 6 行:輸入 4 進來,因為 1 是最不常被使用的,因此移除。






ele_list



head

list_head

prev

next



node1

4

prev

next



head:e->node1:w





node1:w->head:e





node2

3

prev

next



node1:e->node2:w





node2:w->node1:e





  • 第 7 行:因 1 被移除,因此找不到 1 。
  • 第 8 行:3 被使用,因此 3 被換到最前面。






ele_list



head

list_head

prev

next



node1

3

prev

next



head:e->node1:w





node1:w->head:e





node2

4

prev

next



node1:e->node2:w





node2:w->node1:e





  • 第 9 行:4 被使用,因此 4 被換到最前面。






ele_list



head

list_head

prev

next



node1

3

prev

next



head:e->node1:w





node1:w->head:e





node2

4

prev

next



node1:e->node2:w





node2:w->node1:e





Linux: Page Frame Reclamation

Linux 中,為了有效管理記憶體和提高效能,核心透過追蹤每個頁面的使用情況,優先回收那些最久未使用的頁面,從而實現記憶體的高效管理。稱作 Page Frame Reclamation

在官方文檔中看到主要討論三件事:

  1. 頁面替換策略 (Page Replacement Policy)
  2. 頁面快取 (Page Cache)
  3. LRU Lists

頁面替換策略 (Page Replacement Policy)

Linux 的 LRU 由兩個 list 組成:

  • active_list: 最近被造訪過的頁面,這些頁面有較高的再使用機率,包含所有進程的工作集。
  • inactive_list: 較長時間未被造訪的頁面,這些頁面有較低的再使用機率,包含待回收的頁面。

由於所有可回收的頁面只存在於這兩個清單中,並且任何進程的頁面都可能被回收,所以這種替換策略是全域性的。

這兩個 list 類似於簡化的 LRU 2Q 演算法,其中維護兩個清單 Am 和 A1。 在 LRU 2Q 中,新分配的頁面首先放在一個 FIFO 佇列 A1 中,如果在該佇列中被引用,則移至一個正常的 LRU 管理的清單 Am。 這大致相當於使用 lru_cache_add() 將頁面放入名為 inactive_list (A1) 的佇列中,並使用 mark_page_accessed() 將其移到 active_list (Am)。 演算法描述如何調整這兩個清單的大小,但 Linux 採取了更簡單的方法,透過 refill_inactive() 將 active_list 底部的頁面移到 inactive_list,以保持 active_list 大約佔總頁面快取的三分之二。下圖說明了 list 的結構以及 page 如何移動。

20200213175820486

頁面快取 (Page Cache)

頁面快取是一組資料結構,其中包含由常規檔案、區塊裝置或交換支援的頁面。該快取存在的主要原因是為了消除不必要的磁碟讀取。 從磁碟讀取的頁面儲存在頁面雜湊表中,該表在結構體 address_space 和存取磁碟之前始終搜尋的偏移量上進行雜湊處理。

LRU Lists

LRU清單由兩個清單組成,稱為 active_list 和 inactive_list 。 它們在 mm/page_alloc.c 中聲明,並受 pagemap_lru_lock spinlock 保護。 從廣義上講,它們分別儲存「熱」頁面和「冷」頁面,或者換句話說,active_list包含系統中的所有工作集,inactive_list 包含回收候選者。

函數 shrink_cache() 是替換演算法的一部分,它從 inactive_list 取得 page 並決定如何將它們換出。 該函數是一個大循環,從 inactive_list 底部掃描 max_scan 個 page 最多來釋放 nr_pagespage ,直到 inactive_list 為空。每種不同類型的 page ,釋放的時候具體的做法不同。

Spinlock: 是 Linux 中使用非常頻繁的 locking 機制。當 Process A 申請 lock 成功後,Process B 申請鎖就會失敗,但不會引發排程,一如粒子原地自旋 (spin)。

原地轉到 Process A 釋放 lock 為止,由於不會 sleep,也不會引發 schedule 的特性使然,在 interrupt context 中,資料的保護通常採用 spinlock。

Linux 核心設計: 淺談同步機制

lib/lru_cache.c

lib/lru_cache.c 是用來實現 LRU 快取的程式碼。 LRU 快取用於追蹤和管理活躍物件集合,以提高存取效能和記憶體使用效率。

這個檔案定義了一個 LRU 快取管理器,該管理器允許在記憶體中追蹤一組「熱」對象,確保最常訪問的對象可以快速訪問,並在快取中管理未使用或較少使用的對象。 文件中包含用於建立、管理和銷毀 LRU 快取。

__lc_get

和我們練習題類似,要取得 LRU 快取內的資料。因為它是核心中文件頁面的快取機制,因此有很多保護機制。

  • 飢餓檢查: 若為快取為飢餓狀態,則增加 starving 計數器並返回 NULL
  • 鎖定檢查: 若快取被鎖定,增加 locked 計數器並返回 NULL。
  • 飢餓狀態處理: 檢查是否有可用的未使用元素,如果沒有,將快取標記為飢餓並返回 NULL。

可以看到在處理 LRU 取出的部份和我們上面的操作是十分相似的。

static struct lc_element *__lc_get(struct lru_cache *lc, unsigned int enr, unsigned int flags)
{
	struct lc_element *e;

	PARANOIA_ENTRY();
	if (test_bit(__LC_STARVING, &lc->flags)) {
		++lc->starving;
		RETURN(NULL);
	}

	e = __lc_find(lc, enr, 1);
	/* if lc_new_number != lc_number,
	 * this enr is currently being pulled in already,
	 * and will be available once the pending transaction
	 * has been committed. */
	if (e) {
		if (e->lc_new_number != e->lc_number) {
			/* It has been found above, but on the "to_be_changed"
			 * list, not yet committed.  Don't pull it in twice,
			 * wait for the transaction, then try again...
			 */
			if (!(flags & LC_GET_MAY_USE_UNCOMMITTED))
				RETURN(NULL);
			/* ... unless the caller is aware of the implications,
			 * probably preparing a cumulative transaction. */
			++e->refcnt;
			++lc->hits;
			RETURN(e);
		}
		/* else: lc_new_number == lc_number; a real hit. */
		++lc->hits;
		if (e->refcnt++ == 0)
			lc->used++;
		list_move(&e->list, &lc->in_use); /* Not evictable... */
		RETURN(e);
	}
	/* e == NULL */

	++lc->misses;
	if (!(flags & LC_GET_MAY_CHANGE))
		RETURN(NULL);

	/* To avoid races with lc_try_lock(), first, mark us dirty
	 * (using test_and_set_bit, as it implies memory barriers), ... */
	test_and_set_bit(__LC_DIRTY, &lc->flags);

	/* ... only then check if it is locked anyways. If lc_unlock clears
	 * the dirty bit again, that's not a problem, we will come here again.
	 */
	if (test_bit(__LC_LOCKED, &lc->flags)) {
		++lc->locked;
		RETURN(NULL);
	}

	/* In case there is nothing available and we can not kick out
	 * the LRU element, we have to wait ...
	 */
	if (!lc_unused_element_available(lc)) {
		set_bit(__LC_STARVING, &lc->flags);
		RETURN(NULL);
	}

	/* It was not present in the active set.  We are going to recycle an
	 * unused (or even "free") element, but we won't accumulate more than
	 * max_pending_changes changes.  */
	if (lc->pending_changes >= lc->max_pending_changes)
		RETURN(NULL);

	e = lc_prepare_for_change(lc, enr);
	BUG_ON(!e);

	clear_bit(__LC_STARVING, &lc->flags);
	BUG_ON(++e->refcnt != 1);
	lc->used++;
	lc->pending_changes++;

	RETURN(e);
}

quiz2 測驗三

完整題目

測驗目標:

  • 解釋並改進 find_nth_bit
  • 在 Linux 核心找出 find_nth_bit 相關程式碼並探討

__ffs

找到 word 中右邊數來第一個非 0 的位元。檢查的方式就是先檢查一半低位元是否為 0 (AND 運算),若為 0 ,將 word 右移後繼續檢查。第一次檢查會先判斷是否為 64 位元的架構。

  • 若是 64 位元架構,檢查低 32 位元是否為 0
  • 檢查低 16 位元是否為 0
  • 檢查低 8 位元是否為 0
  • 檢查低 4 位元是否為 0
  • 檢查低 2 位元是否為 0
  • 檢查低 1 位元是否為 0
static inline unsigned long __ffs(unsigned long word)
{
    int num = 0;

#if BITS_PER_LONG == 64
    if ((word & 0xffffffff) == 0) { // AAAA = 0xffffffff
        num += 32;
        word >>= 32;
    }
#endif
    if ((word & 0xffff) == 0) {
        num += 16;
        word >>= 16;
    }
    if ((word & 0xff) == 0) {
        num += 8;
        word >>= 8;
    }
    if ((word & 0xf) == 0) {
        num += 4;
        word >>= 4;
    }
    if ((word & 0x3) == 0) {
        num += 2;
        word >>= 2;
    }
    if ((word & 0x1) == 0)
        num += 1;
    return num;
}

這是透過位移以及 Bitwise 的操作實現,以下我舉一個 32 位元的例子:

word=1001,1100,0000,0000,0000,0000,0000,0000

  1. 因為是 32 位元,因此第一個判斷不操作。
  2. word & 0xffff = 0 => num = 16,
    word=1001,1100,0000,0000
  3. word & 0xff = 0 => num = 24,
    word=1001,1100
  4. word & 0xf = 1100 ,不成立,
    word=1001,1100
  5. word & 0x3 = 0 => num = 26,
    word=10,0111
  6. word & 0x1 = 1 => ,不成立,
    word=10,0111

__clear_bit

清除 addr 指標中指定位置 nr 的位元。我們先來看下面兩個位元操作。

  • BIT_MASK : 將 1 左移到 nr 的位置上,取餘則是確保位移範圍在指定位元架構下。
  • BIT_WORD : 計算給定位索引 nr 在 unsigned long 數組中的索引
#define BITS_PER_LONG 64

#define BIT_MASK(nr)   (1UL << ((nr) % BITS_PER_LONG))
#define BIT_WORD(nr)   ((nr) / BITS_PER_LONG)

上面提到,每個處理器的架構不同,有些是 64 位,有些是 32 位。而在做 Bitwise 操作時,就要考慮到這個問題。我們傳進來的 addr 是指標型態,有就是說它可能是一段連續的記憶體空間(超過一個數組)。這樣的情況我們就必須先計算地址的索引值。

舉個例子, addr 是一個有兩個數組組成,每個元素有 64 位,一共是有 128 位元,目前都預設為 1 。

// 初始化所有位為 1
volatile unsigned long addr[2] = {0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF}; 

我們想要清除第 70 個位元,而產生的 mask 是第 7 位為 1 。

unsigned long nr = 70;
unsigned long mask = BIT_MASK(nr); // 生成 mask,1UL << (70 % 64) = 1UL << 6 = 0x40

因為要清除的位元不在地一個數組裡,所以我們透過 BIT_WORD 來計算是在第幾個,這邊

70/64=1 ,就可以對第二個數組清除了。

  • addr[0] = 0xFFFFFFFFFFFFFFFF
  • addr[1] = 0xFFFFFFFFFFFFFFBF
// 計算包含第 70 位的地址, addr + 1
unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr); 

而做位元清除操作,其實就是將數組去和遮罩取反的值做 AND 運算,就可以將特定的位元置 0 。

static inline void __clear_bit(unsigned long nr, volatile unsigned long *addr)
{
    unsigned long mask = BIT_MASK(nr);
    unsigned long *p = ((unsigned long *) addr) + BIT_WORD(nr);

    *p &= ~mask; // BBBB = ~mask                                   
}

fns

在一個 word 中找到第 n 個設置的位(值為1的位)。如果找到了第 n 個設置的位,函數返回其位置索引,否則返回 BITS_PER_LONG,表示沒有找到這個設置的位。

利用上面提到的 __ffs 找到最低被設置為 1 的位置來一個一個取得第 n 個位。每次進入迴圈就先取得最低被設置為 1 的位置,若此時還不是我們要的第 n 個的話,則用 __clear_bit 將其位置清除,若是第 n 個的話,則可以直接回傳,就可以得到第 n 個被設置為 1 的位元了。

static inline unsigned long fns(unsigned long word, unsigned int n)
{
    while (word) {
        unsigned int bit = __ffs(word);
        if (n-- == 0)
            return bit;
        __clear_bit(bit, &word);
    }

    return BITS_PER_LONG;
}

find_nth_bit

這是實際上要去找到第 n 個被設置的位元的函式。

  • small_const_nbits : 判斷 nbits 是否小於等於計算機架構位元數,範圍在 1 到 BITS_PER_LONG 之間。
    • __builtin_constant_p(nbits) :這是 GCC 的內建函式,用於檢查 nbits 是否是一個常量表達式。如果是常量表達式,這個內建函數返回真。
    • && (nbits) <= BITS_PER_LONG :檢查 nbits 是否小於或等於 BITS_PER_LONG
    • && (nbits) > 0 :檢查 nbits 是否大於 0。
#define small_const_nbits(nbits) \
    (__builtin_constant_p(nbits) && (nbits) <= BITS_PER_LONG && (nbits) > 0)
  • GENMASK : 生成一個從第 l 位到第 h 位(包含這兩位)設置為 1 的 mask。以下以 BITS_PER_LONG 為 32 位舉 GENMASK(7, 4) 為例子。
    • ((~0UL) - (1UL << (l)) + 1) : (0xFFFFFFFF - 0x10 + 0x01) = 0xFFFFFFF0
      • 換成 2 進制就是 1111,1111,1111,0000 ,可以看到低於第 4 位的皆為 0 ,高於第 4 位(包含第 4 位)皆為 1 。
    • (~0UL >> (BITS_PER_LONG - 1 - (h))) : (0xFFFFFFFF >> (32 - 1 - 7)) = (0xFFFFFFFF >> 24) = 0xFF
      • 換成 2 進制就是 0000,0000,1111,1111 ,可以看到低於第 7 位(包含第 7 位)的皆為 1 ,高於第 7 位皆為 0 。
    • 0xFFFFFFF0 & 0xFF = 0xF0
      • 換成 2 進制就是 0000,0000,1111,0000 ,就將第 4 位到第 7 位(包含這兩位)設置為 1 的 mask。
#define GENMASK(h, l) \                                                 
    (((~0UL) - (1UL << (l)) + 1) & (~0UL >> (BITS_PER_LONG - 1 - (h))))

先透過檢查 n 是否超出範圍,並檢查 size 是否是一個小於等於計算機架構位元數。若是的話,取記憶體地址區域的值並且只保留從第 0 位到第 size-1 位的值。若成功,則進入 fns 計算第 n 個被設置的位元,否則回傳 size

static inline unsigned long find_nth_bit(const unsigned long *addr,
                                           unsigned long size,
                                           unsigned long n)
{
    if (n >= size)
        return size;

    if (small_const_nbits(size)) {
        unsigned long val = *addr & GENMASK(size - 1, 0);

        return val ? fns(val, n) : size;
    }

    return FIND_NTH_BIT(addr[idx], size, n);
}

FIND_NTH_BIT

可以看到在 find_nth_bit 中若是 size 大於計算機架構位元數,則會執行 FIND_NTH_BIT 函式來處理位元跨越字節邊界的情況。走訪整個給定記憶體空間的位置,直到找到要的位置後,將目前走訪的數量加上使用 fns 函式尋找這個位置內第 n 個被設置為 1 的位元。

#define FIND_NTH_BIT(FETCH, size, num)                          \
    ({                                                          \
        unsigned long sz = (size), nr = (num), idx, w, tmp;     \
                                                                \
        for (idx = 0; (idx + 1) * BITS_PER_LONG <= sz; idx++) { \
            if (idx * BITS_PER_LONG + nr >= sz)                 \
                goto out;                                       \
                                                                \
            tmp = (FETCH);                                      \
            w = hweight_long(tmp);                              \
            if (w > nr)                                         \
                goto found;                                     \
                                                                \
            nr -= w;                                            \
        }                                                       \
                                                                \
        if (sz % BITS_PER_LONG)                                 \
            tmp = (FETCH) & BITMAP_LAST_WORD_MASK(sz);          \
    found:                                                      \
        sz = min(idx * BITS_PER_LONG + fns(tmp, nr), sz);       \
    out:                                                        \
        sz;                                                     \
    })

Linux: CPU Mask

Linux kernel 用 CPU Mask 來管理多個 CPU 的狀態。在 include/linux/cpumask.h 中提供了系統裡 CPU 的 Bitmap 。其每個位元代表了各個 CPU 的狀態。

而 CPU 有四種不同的狀態:

  • cpu_possible : 系統啟動時任意時刻都可插入的 CPU ID 集合。
  • cpu_present : 現在插入的 CPU
  • cpu_online : 是 cpu_present 的一個子集,表示哪些CPU 是調度器可存取的
  • cpu_active : 該遮罩的某些位元告訴 Linux 內核,一個任務可能已移至某個處理器。

cpumask_nth

cpumask 結構中找到第 cpu 個被設定為 1 的 CPU。 如果這個 CPU 不存在,則傳回大於或等於 nr_cpu_ids 的值。

static inline unsigned int cpumask_nth(unsigned int cpu, const struct cpumask *srcp)
{
	return find_nth_bit(cpumask_bits(srcp), small_cpumask_bits, cpumask_check(cpu));
}

Linux: CPU Affinity

Linux 允許將 process 或 thread 綁定到 CPU 或一系列CPU,以便 process 或 thread 僅在指定的 CPU 上執行,而非任何CPU。

linux 核心行程調度器天生具有軟 CPU 親和性(affinity)的特性,這意味著 process 通常不會在處理器之間頻繁地轉移。 這種狀態正是我們所希望的,因為 process 轉移的頻率小就代表產生的負載小。

Linux 用 task_struct 來描述一個 process ,如果為給定的 process 設定了給定的位,那麼這個 process 就可以在相關的 CPU 上運作。 因此,如果一個 process 可以在任何 CPU 上運行,並且能夠根據需要在處理器之間進行轉移,那麼位元遮罩就全是 1。 實際上,這就是 Linux 中 process 的預設狀態。

rtaImage

可以在 linux/kernel/sched/core.c 中看到 sched_setaffinity 函式。用於設定任務(process 或 thread)在多核系統中運行的 CPU Affinity 。 確保任務只能在指定的 CPU 集合上運行。而就是通過 Bitwise 來操作 CPU mask 。

static int
__sched_setaffinity(struct task_struct *p, struct affinity_context *ctx)
{
	int retval;
	cpumask_var_t cpus_allowed, new_mask;

	if (!alloc_cpumask_var(&cpus_allowed, GFP_KERNEL))
		return -ENOMEM;

	if (!alloc_cpumask_var(&new_mask, GFP_KERNEL)) {
		retval = -ENOMEM;
		goto out_free_cpus_allowed;
	}

	cpuset_cpus_allowed(p, cpus_allowed);
	cpumask_and(new_mask, ctx->new_mask, cpus_allowed);

	ctx->new_mask = new_mask;
	ctx->flags |= SCA_CHECK;

	retval = dl_task_check_affinity(p, new_mask);
	if (retval)
		goto out_free_new_mask;

	retval = __set_cpus_allowed_ptr(p, ctx);
	if (retval)
		goto out_free_new_mask;

	cpuset_cpus_allowed(p, cpus_allowed);
	if (!cpumask_subset(new_mask, cpus_allowed)) {
		/*
		 * We must have raced with a concurrent cpuset update.
		 * Just reset the cpumask to the cpuset's cpus_allowed.
		 */
		cpumask_copy(new_mask, cpus_allowed);

		/*
		 * If SCA_USER is set, a 2nd call to __set_cpus_allowed_ptr()
		 * will restore the previous user_cpus_ptr value.
		 *
		 * In the unlikely event a previous user_cpus_ptr exists,
		 * we need to further restrict the mask to what is allowed
		 * by that old user_cpus_ptr.
		 */
		if (unlikely((ctx->flags & SCA_USER) && ctx->user_mask)) {
			bool empty = !cpumask_and(new_mask, new_mask,
						  ctx->user_mask);

			if (WARN_ON_ONCE(empty))
				cpumask_copy(new_mask, cpus_allowed);
		}
		__set_cpus_allowed_ptr(p, ctx);
		retval = -EINVAL;
	}

out_free_new_mask:
	free_cpumask_var(new_mask);
out_free_cpus_allowed:
	free_cpumask_var(cpus_allowed);
	return retval;
}

參考

Processor affinity
Linux Kernel:CPU 状态管理之 cpumask
CPU亲和性