Try   HackMD

2023q1 Homework1 (lab0)

contributed by < willwillhi1 >

開發環境

$ gcc --version
gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

$ lscpu
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   39 bits physical, 48 bits virtual
CPU(s):                          8
On-line CPU(s) list:             0-7
Thread(s) per core:              2
Core(s) per socket:              4
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           142
Model name:                      Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
Stepping:                        12
CPU MHz:                         2100.000
CPU max MHz:                     4200.0000
CPU min MHz:                     400.0000
BogoMIPS:                        4199.88
Virtualization:                  VT-x
L1d cache:                       128 KiB
L1i cache:                       128 KiB
L2 cache:                        1 MiB
L3 cache:                        6 MiB
NUMA node0 CPU(s):               0-7

開發過程

先閱讀你所不知道的 C 語言: linked list 和非連續記憶體

可以發現程式中的 head 為上圖的 Head
Node0,1,2element_t

完成 queue.c

q_new

利用 malloc 一個 list_head 並命名為 head,接著判斷是否成功執行 malloc 後用 INIT_LIST_HEAD 來對 head 初始化,也就是把 head->next,head->prev 指向 head

struct list_head *q_new()
{
    struct list_head *head = malloc(sizeof(struct list_head));
    if (head == NULL) 
        return NULL;
    INIT_LIST_HEAD(head);
    return head; 
}

q_free

用迴圈走訪整個佇列,並且 free 掉走過的點。

改進上述漢語表達
:notes: jserv

void q_free(struct list_head *l) 
{
    if (l)
        return;
    struct list_head *next = l->next;
    while (l != next) {
        //list_del(next);
        element_t *node = list_entry(next, element_t, list);
        next = next->next;
        free(node->value);
        free(node);
    }
    free(l);
}

q_insert_head

malloc 一個 element_t 物件叫做 node,然後用 strdup 配置空間給 node->value 後將 s 複製過去,最後用 list_addnode 插入到佇列的開頭

bool q_insert_head(struct list_head *head, char *s)
{
    if (head == NULL) 
        return false;
    element_t *node = malloc(sizeof(struct element_t));
    if (node == NULL) 
        return false;
    node->value = strdup(s);
    if (node->value == NULL) {
        free(node);
        return false;
    }
    list_add(&node->list, head);
    return true;
}

q_insert_tail

q_insert_head 相似,把 list_add 改成 list_add_tail

bool q_insert_tail(struct list_head *head, char *s)
{
    if (head == NULL) 
        return false;
    element_t *node = malloc(sizeof(struct element_t));
    if (node == NULL) 
        return false;
    node->value = strdup(s);
    if (node->value == NULL) {
        free(node);
        return false;
    }
    list_add_tail(&node->list, head);
    return true;
}

q_remove_head

去除佇列的第一個元素,sp 為一個已配置空間的字元陣列,bufsize 為其長度。先用 list_del 去除第一個元素,再用 strncpy 把去除元素的 value 複製到 sp 中。

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (head == NULL || list_empty(head)) 
        return NULL;
    element_t *node = list_entry(head->next, element_t, list);
    list_del(head->next);
    if (sp != NULL) {
        strncpy(sp, node->value, bufsize-1);
        sp[bufsize-1] = '\0';
    }
    return node;
}

q_remove_tail

q_remove_head 相似,把 head->next 改成 head->prev

element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    if (head == NULL || list_empty(head)) 
        return NULL;
    element_t *node = list_entry(head->prev, element_t, list);
    list_del(head->prev);
    if (sp != NULL) {
        strncpy(sp, node->value, bufsize-1);
        sp[bufsize-1] = '\0';
    }
    return node;
}

q_size

如果佇列為空或只有 head 就回傳 0,之後用 list_for_each 走訪整個佇列計算 node 數量

int q_size(struct list_head *head)
{
    if (head == NULL || list_empty(head))
        return 0;
    int size = 0;
    struct list_head *l;
    list_for_each(l, head)
        size++;
    return size;
}

q_delete_mid

宣告 firsthead->next, secondhead->prev,用 while 迴圈判斷 firstsecond 是否相遇,若還沒則兩個各走一步,直到相遇時 first 代表中間點,之後便用 list_del 把其從佇列中去除,然後再 free 掉該 node

bool q_delete_mid(struct list_head *head)
{
    // https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/
    if (head == NULL || list_empty(head)) 
        return false;
    struct list_head *first = head->next;
    struct list_head *second = head->prev;
    while ((first != second) && (first->next != second)) {
        first = first->next;
        second = second->prev;
    }
    list_del(first);
    element_t *node = list_entry(first, element_t, list);
    free(node->value);
    free(node);
    return true;
}

q_delete_dup

先確認佇列是否為空。宣告兩個 element_t 指標 N1,N2N1 代表目前沒有重複的最後一個點,N2 代表目前重複的數的第一個點







del_mid



node0

head



node1

1



node0->node1





node2

2



node1->node2





node3

2



node2->node3





node4

3



node3->node4





N1
N1



N1->node1





N2
N2



N2->node2





接著就是判斷 N2->value 是否等於 N2->next->value,若是則刪除 N2->next 直到不相同為止,最後再把重複的 N2 刪除

bool q_delete_dup(struct list_head *head)
{
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
    if (!head)
        return false;
    element_t *N1 = list_entry(head, element_t, list);
    element_t *N2 = list_entry(head->next, element_t, list);
    while (&N2->list != head) {
        char *value = list_entry(N2->list.next, element_t, list)->value;
        if (N2->list.next != head && strcmp(N2->value, value) == 0) {
            while (N2->list.next != head && strcmp(N2->value, value) == 0) {
                element_t *node = list_entry(N2->list.next, element_t, list);
                list_del(N2->list.next);
                q_release_element(node);
                value = list_entry(N2->list.next, element_t, list)->value;
            }
            list_del(&N2->list);
            element_t *node = list_entry(&N2->list, element_t, list);
            q_release_element(node);
            N2 = list_entry(N1->list.next, element_t, list);
        }
        else {
            N1 = list_entry(N1->list.next, element_t, list);
            N2 = list_entry(N2->list.next, element_t, list);
        }
    }
    return true;
}
  1. 留意程式碼風格,N1N2 並非恰當的命名。
  2. if (head == NULL) 改為更簡短的 if (!head)
  3. strcmp(..) == 0 改為更剪短的 !strcmp(...)
  4. 縮減迴圈內的分支指令數量

:notes: jserv

  • 改進後的版本
    額外使用 isdup 判斷是否需要刪除目前的節點 first
    下圖可以看到因為 first->value 等於 second->value 所以把 first 刪除 以及把 isdup 設成 true






del_mid



node0

head



node1

1



node0->node1





node2

1



node1->node2





node3

2



node2->node3





node4

3



node3->node4





first
first



first->node1





second
second



second->node2





接著來因為 isdup 等於 true,所以把 first 刪除後,再把 isdup 改回 false







del_mid



node0

head



node1

1



node0->node1





node2

2



node1->node2





node3

3



node2->node3





first
first



first->node1





second
second



second->node2





bool q_delete_dup(struct list_head *head) { // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/ if (!head) return false; element_t *first; element_t *second; bool isdup = false; list_for_each_entry_safe(first, second, head, list) { if (&second->list != head \ && !strcmp(first->value, second->value)) { list_del(&first->list); q_release_element(first); isdup = true; } else if (isdup) { list_del(&first->list); q_release_element(first); isdup = false; } } return true; }

q_swap

確認 head 是否為 NULL 或佇列是否為空,用 first,second 代表要交換的一組 node,下方第 9 到 14 行進行交換,接著就把 first,second 指到下一組 node,直到 firstsecond 等於 head 為止

void q_swap(struct list_head *head) { // https://leetcode.com/problems/swap-nodes-in-pairs/ if (head == NULL || list_empty(head)) return; struct list_head *first = head->next; struct list_head *second = head->next->next; while (first != head && second != head) { first->prev->next = second; first->next = second->next; second->prev = first->prev; first->prev = second; second->next->prev = first; second->next = first; first = first->next; second = first->next; } }

q_reverse

確認 head 是否為 NULL 或佇列是否為空。初始化 first = head,second = head->next,接著互換 firstprevnext(8, 9 行),最後 11, 12 行是跳到下兩個節點,直到 first != head,也就是佇列裡的 prev,next 都被換過

void q_reverse(struct list_head *head) { if (head == NULL || list_empty(head)) return; struct list_head *first = head; struct list_head *second = head->next; do { first->next = first->prev; first->prev = second; first = second; second = first->next; } while (first != head); }

q_reverseK

最初的想法是用之前實作的 q_reverse 來寫,也就是把整個佇列分成每次以 k 個點為單位進行反轉。
首先利用 q_size 找出佇列的長度,然後除以 k 得到 times 代表共要做幾次 q_reverse
在每次的反轉裡,都會從原佇列切出來長度為 k+1 的佇列,first,last 代表該切割出來的佇列的頭尾節點。front,end 用來記錄該切割下來的佇列在原本佇列的前後節點,用來把反轉後的佇列接回去原佇列。
以下圖為例假設 k = 3,且要反轉點 1,2,3,所以 first 會在 head 上,last3front,end 分別記錄該切割下來的佇列的前後點是 64







del_mid



node0

head



node1

1



node0->node1





node2

2



node1->node2





node3

3



node2->node3





node4

4



node3->node4





node5

5



node4->node5





node6

6



node5->node6





node6->node0






first
first



first->node0





last
last



last->node3





end
end



end->node4





front
front



front->node6





改進你的漢語表達
:notes: jserv

void q_reverseK(struct list_head *head, int k)
{
    // https://leetcode.com/problems/reverse-nodes-in-k-group/
    if (head == NULL) 
        return;
    struct list_head *front = head;
    struct list_head *end = head;
    struct list_head *first = head;
    struct list_head *last = head;
    int times = q_size(head)/k;
    for (int i = 0; i < times; ++i) {
        for (int j = 0; j < k; ++j)
            last = last->next;
        front = first->prev;
        end = last->next;
        last->next = first;
        first->prev = last;
        q_reverse(first);
        first->prev->next = end;
        end->prev = first->prev;
        first->prev = front;
        // update first, last
        first = end->prev;
        last = first;
    }
}
  • 改進後的版本
    先宣告和初始化兩個 head_list: tmpresult,本來想用 malloc,但是這次作業好像禁止用,所以改用 LIST_HEAD
    接著與前一個實作想法相同一次對 k 個節點作反轉,只不過這次會先透過 list_cut_position 把這 k 個節點移到 tmp 上,反轉後把 tmplist_splice_tail_init 接到 result 的尾端且初始化 tmp
    最後把 resultlist_splice_init 接到 head 開頭。
void q_reverseK(struct list_head *head, int k) { // https://leetcode.com/problems/reverse-nodes-in-k-group/ if (!head) return; struct list_head *last = head->next; int times = q_size(head)/k; LIST_HEAD(tmp); LIST_HEAD(result); for (int i = 0; i < times; ++i) { for (int j = 0; j < k; ++j) last = last->next; list_cut_position(&tmp, head, last->prev); q_reverse(&tmp); list_splice_tail_init(&tmp, &result); } list_splice_init(&result, head); }

q_sort

參考 你所不知道的 C 語言: linked list 和非連續記憶體merge sort 的實做。

實作共三個函式 q_sort,mergesort,mergelist
mergesort 輸入的佇列如果只有一個節點或空則直接回傳,接著把輸入的佇列分成兩半,然後對這兩個佇列做 mergesortmergesort 會把這兩個佇列排序。

struct list_head *mergesort(struct list_head *l) { if (l == NULL || l->next == NULL) return l; struct list_head *fast = l; struct list_head *slow = l; while (fast->next != NULL && fast->next->next != NULL) { fast = fast->next->next; slow = slow->next; } struct list_head *l1 = l; struct list_head *l2 = slow->next; slow->next = NULL; return mergelist(mergesort(l1), mergesort(l2)); }

最後對這兩個已排序的佇列用 mergelist 合併。
mergelistpointer to pointer 的概念來做合併,縮短程式碼長度。

struct list_head *mergelist(struct list_head *l1, struct list_head *l2) { struct list_head *head = NULL; struct list_head **cur = &head; while (l1 && l2) { element_t *e1 = list_entry(l1, element_t, list); element_t *e2 = list_entry(l2, element_t, list); if (strcmp(e1->value, e2->value) >= 0) { *cur = l2; l2 = l2->next; } else { *cur = l1; l1 = l1->next; } cur = &(*cur)->next; } *cur = (struct list_head *) ((u_int64_t)l1 | (u_int64_t)l2); return head; }

避免張貼過長的程式碼,畢竟 HackMD 是紀錄和討論的協作環境,你應該摘錄關鍵程式碼。實作後應提出改進事項。善用 qtest 內建的 time 命令,觀察你撰寫程式的行為。
node 的翻譯是「節點」,而非「點」
:notes: jserv

q_sort 將原本被拆開的節點重新連結起來

void q_sort(struct list_head *head) { if (head == NULL || list_empty(head)) return; head->prev->next = NULL; head->next = mergesort(head->next); struct list_head *cur = head; struct list_head *next = head->next; while (next) { next->prev = cur; cur = next; next = next->next; } cur->next = head; head->prev = cur; }

再這之後我利用雙向鏈結串列的特性,改寫 mergesort 用快慢指標找中間節點的作法,改成從頭尾開始去找,這樣便可以從共需要走訪 1.5 次佇列到只需要 1 次即可。

struct list_head *mergesort(struct list_head *l) { if (!l || !l->next) return l; struct list_head *first = l; struct list_head *last = l->prev; while (first != last && first->next != last) { first = first->next; last = last->prev; } struct list_head *l1 = l; struct list_head *l2 = first->next; l2->prev = l->prev; l1->prev = first; first->next = NULL; return mergelist(mergesort(l1), mergesort(l2)); }

接下來我用 qtest time 去作效能測試,可以發現效能有明顯的提高

改進前 改進後
100000 筆 1.140 0.123
200000 筆 0.288 0.277
300000 筆 0.472 0.449
400000 筆 0.652 0.622
500000 筆 0.824 0.797
600000 筆 1.024 0.968
700000 筆 1.219 1.189
800000 筆 1.409 1.351

q_descend

如果點 a 的右邊有比它的值還大的點就要把點 a 移除。
關於 q_descend 的回傳值可以去 qtest.c 找,在 do_descend 裡可以看到這行

if (exception_setup(true)) current->size = q_descend(current->q);

表示 q_descend 要回傳佇列的長度。
因為是雙向鏈結串列,所以可以從尾端開始走訪,在走訪的過程中若發現下一個點的值小於等於當前的點的值就移除。
make test 進行測試時,發現之前的實作有誤,移除的點還要用 q_release_element 釋放空間。

改進漢語表達
:notes: jserv

int q_descend(struct list_head *head) { // https://leetcode.com/problems/remove-nodes-from-linked-list/ if (head == NULL || list_empty(head)) return 0; element_t *first = list_entry(head->prev, element_t, list); element_t *second = list_entry(head->prev->prev, element_t, list); while (&second->list != head) { if (strcmp(first->value, second->value) < 0) { second = list_entry(second->list.prev, element_t, list); first = list_entry(first->list.prev, element_t, list); } else { list_del(&second->list); q_release_element(second); second = list_entry(first->list.prev, element_t, list); } } return q_size(head); }

q_merge

queue_contex_t 是用來管理與其他佇列連接的結構,這邊的輸入 list_head *head 代表 list_head chainhead
實作程式的想法是參考你所不知道的 C 語言: linked list 和非連續記憶體: 案例探討: LeetCode 21. Merge Two Sorted Lists 的 Merge k Sorted Lists 的部分。
因為在文中有提到直觀的想法是用第一條佇列接上剩下的佇列,這樣會導致第一條佇列愈來愈長,立即會遇到的問題是:多數的情況下合併速度會愈來愈慢。
所以我採用的方法是 分段合併







G



i1
interval = 1



i2
interval = 2




i4
interval = 4




i8
interval = 8




interval1

L0

L1

L2

L3

L4

L5

L6

L7



interval2

L01

 

L23

 

L45

 

L67

 



interval1:f0->interval2:f0





interval1:f1->interval2:f0





interval1:f2->interval2:f2





interval1:f3->interval2:f2





interval1:f4->interval2:f4





interval1:f5->interval2:f4





interval1:f6->interval2:f6





interval1:f7->interval2:f6






interval4

L0123

 

 

 

L4567

 

 

 



interval2:f0->interval4:f0





interval2:f2->interval4:f0





interval2:f4->interval4:f4





interval2:f6->interval4:f4






interval8

result

 

 

 

 

 

 

 



interval4:f0->interval8:f0





interval4:f4->interval8:f0






第一層 for 迴圈數等同 qsize(head)/2,裡面的 while 迴圈裡則是做兩兩合併的操作,first 是第一個要合併的佇列,second 是第二個要合併的佇列,merge_two_list 會把兩個佇列合併,然後把結果存到 first 並回傳合併後佇列的長度,接下來把 NULL 賦值給空的 second 然後移到 head 的尾端,用來當作 while 迴圈的中止條件,最後 firstsecond 往下移一個節點繼續做合併。

int q_merge(struct list_head *head) { // https://leetcode.com/problems/merge-k-sorted-lists/ if (!head || list_empty(head)) return 0; else if (list_is_singular(head)) return q_size(list_first_entry(head, queue_contex_t, chain)->q); int size = q_size(head); int count = (size%2) ? size/2+1 : size/2; int queue_size = 0; for (int i = 0; i < count; ++i) { queue_contex_t *first = list_first_entry(head, queue_contex_t, chain); queue_contex_t *second = list_entry(first->chain.next, queue_contex_t, chain); while (first->q && second->q) { queue_size = merge_two_list(first->q, second->q); second->q = NULL; list_move_tail(&second->chain, head); first = list_entry(first->chain.next, queue_contex_t, chain); second = list_entry(first->chain.next, queue_contex_t, chain); } } return queue_size; }

更新實作:
因為 second->q = NULL 會造成 memory leak 所以改寫 q_mergewhile 迴圈

while (!list_empty(first->q) && !list_empty(second->q)) { queue_size = merge_two_list(first->q, second->q); list_move_tail(&second->chain, head); first = list_entry(first->chain.next, queue_contex_t, chain); second = list_entry(first->chain.next, queue_contex_t, chain); }

以及把 merge_two_list 函式裡面的

list_splice_tail(second, &temp_head)

改成

list_splice_tail_init(second, &temp_head);

在 qtest 新增 shuffle 命令

console.h 可以發現以下

/* Add a new command */ void add_cmd(char *name, cmd_func_t operation, char *summary, char *parameter); #define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)

add_cmd 會在命令列表中添加新命令,要新增一條新命令 shuffle 則要實作 do_shuffle,並在qtest.cconsole_init() 替新命令加上 ADD_COMMAND

ADD_COMMAND(shuffle, "Do Fisher-Yates shuffle", "");

qtest 裡的 do_shuffle :
目前只有檢查輸入的佇列是否為空或只有一個節點
q_shuffle 要去 queue.h 新增

static bool do_shuffle(int argc, char *argv[]) { if (!current || !current->q) report(3, "Warning: Calling shuffle on null queue"); error_check(); if (q_size(current->q) < 2) report(3, "Warning: Calling shuffle on single queue"); error_check(); if (exception_setup(true)) q_shuffle(current->q); q_show(3); return !error_check(); }

shuffle 的實作分成兩個函式: q_shuffleswap
在實作 swap 時要注意 :

  1. node_1 總是在 node_2 之後
  2. 當兩個要交換的節點是相鄰時不要做 list_move(node_2, node_1_prev),因為 node_1_prev 等於 node_2,如果做了 list_move(node_2, node_1_prev) 會把 node_2 這個節點移除。
static inline void swap(struct list_head *node_1, struct list_head *node_2) { if (node_1 == node_2) return; struct list_head *node_1_prev = node_1->prev; struct list_head *node_2_prev = node_2->prev; if (node_1->prev != node_2) list_move(node_2, node_1_prev); list_move(node_1, node_2_prev); }

我的 q_shuffle 的想法是按照 Fisher–Yates shuffle 提供的演算法:
new = last->prev 是最後一個未被抽到的節點,
oldrandom 出來的節點,
oldnew 交換,再將 size 減一,直到 size 變為零為止。

void q_shuffle(struct list_head *head) { if (!head || list_is_singular(head)) return; struct list_head *last = head; int size = q_size(head); while (size > 0) { int index = rand() % size; struct list_head *new = last->prev; struct list_head *old = new; while (index--) old = old->prev; swap(new, old); last = last->prev; size--; } return; }

接下來利用腳本去測試對 [1, 2, 3] 執行 shuffle 的結果

Expectation: 166666 Observation: {'123': 166945, '132': 167113, '213': 166658, '231': 165775, '312': 166737, '321': 166772}

可以從結果看到 shuffle 的結果分佈大致上是 Uniform distribution

研讀 Linux 核心原始程式碼的 lib/list_sort.c

先看到函式宣告的部分

/** * list_sort - sort a list * @priv: private data, opaque to list_sort(), passed to @cmp * @head: the list to sort * @cmp: the elements comparison function */ __attribute__((nonnull(2,3))) void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
  • __attribute__((nonnull())): compiler 提供的 attribute function,可以指定該函式傳入第幾個變數必須是 non-null,否則會發出警告。
  • prev: private datacmp 的參數。
  • head: 整個 listhead
  • cmp: compare function, 用來決定 sorting order,是 function pointer 的型態。

list_sort 函式
註解的部分跟老師的筆記大致相同

* This mergesort is as eager as possible while always performing at least
 * 2:1 balanced merges.  Given two pending sublists of size 2^k, they are
 * merged to a size-2^(k+1) list as soon as we have 2^k following elements.
 *
 * Thus, it will avoid cache thrashing as long as 3*2^k elements can
 * fit into the cache.  Not quite as good as a fully-eager bottom-up
 * mergesort, but it does use 0.2*n fewer comparisons, so is faster in
 * the common case that everything fits into L1.

主要是在做 merge 時,會將走訪過的節點維持在 2:1 的比例,其中 2 代表兩個已經排序過的佇列, 1 代表新進來尚未排序的節點,也就是走訪到 3∗2^k 個 node 時才合併前面的兩個 2^k sub-list
這種作法在 3∗2^k 個節點能夠完全放進 cache 中的情況下能夠避免 cache thrashing 的發生。
接下來就是程式碼的部分:
先把整個 list 的最後一個點的 next 指向 NULL:

head->prev->next = NULL;

count 從零開始,代表 pending list 節點個數,每次加入一個節點到 pending list 就加一。
配合前面的 merge 時機,只要該次迴圈的countcount+1 時,若 count+1 為 2k 就不做 merge,反之則要做 merge
舉例來說:

  1. count(二進位) 變化為 0011 -> 0100,不做 merge
  2. count(二進位) 變化為 0010 -> 0011,做 merge

由上述可以歸納出(以 0 ~ 16 為例)只要 count = 0000, 0001, 0011, 0111, 1111 都不做 merge
接下來的程式用圖來舉例說明:

  • pending: 為 head of pending lists ,而 pending lists 是用於暫存待合併的 sub-list of lists,每一個 sub-list 的 size 都為 power of 2 的倍數,並且各個 sub-list 以 prev 連結。
  • tail: 指標的指標,會指向 pending 裡準備要合併的 sub-list,要合併的 sub-list 是由 count 決定。
  • list: 目前走訪到的 node。
do { size_t bits; struct list_head **tail = &pending; /* Find the least-significant clear bit in count */ for (bits = count; bits & 1; bits >>= 1) tail = &(*tail)->prev; /* Do the indicated merge */ if (likely(bits)) { struct list_head *a = *tail, *b = a->prev; a = merge(priv, cmp, b, a); /* Install the merged result in place of the inputs */ a->prev = b->prev; *tail = a; } /* Move one element from input list to pending */ list->prev = pending; pending = list; list = list->next; pending->next = NULL; count++; } while (list);

這邊舉個例子,由於不熟 grapgviz,所以先用 drawio 畫圖,之後有空會補。
以實際例子來說,如果要排序一個佇列(4->1->2->3),可以邊對照下方表格邊看圖解。

count 變化 count 二進位 merge pending 上的節點
0 -> 1 0000 -> 0001 no(20 1
1 -> 2 0001 -> 0010 no(21 1 + 1
2 -> 3 0010 -> 0011 yes (2) + 1
3 -> 4 0011 -> 0100 no(22 2 + 1 + 1
  • 初始化 count 為零,所以第一個迴圈的 count 變化為 0 -> 1,看表可以知道不用做 merge,把 pendinglist 往後讀一個節點。下圖為此次迴圈做到最後的結果。
  • 此次 count 變化為 1 -> 2,所以也不用做 merge,所以 pendinglist 往後讀一個節點。最後整個佇列會如下圖:
  • count 變化為 2 -> 3, 要做 merge
    這邊的 tail 會指向 pending 的 address,
    ​​​​struct list_head **tail = &pending;
    所以在執行下程式時 a 會在上一張圖的在 1b4,之後便是做 merge
    然後 merge 會回傳合併之後的 head 並指派給 a,在本例中就是下圖1
    a->prev = b->prevb->prev 就是指向下一個 sub-listhead,本例中因為這就是第一個了所以是 NULL
    最後 *tail = a 就是把 pending 指到 a,也就是 1
    ​​​​if (likely(bits)) { ​​​​ struct list_head *a = *tail, *b = a->prev; ​​​​ a = merge(priv, cmp, b, a); ​​​​ /* Install the merged result in place of the inputs */ ​​​​ a->prev = b->prev; ​​​​ *tail = a; ​​​​}
    此時的 list 指向下圖的 2pending 指向下圖的 1,經過以下的運算可以得到下圖:
    ​​​​list->prev = pending ​​​​pending = list; ​​​​list = list->next; ​​​​pending->next = NULL; ​​​​count++;
  • count 的變化為 3 -> 4,不做 merge,所以跟前幾個相同,pendinglist 往後讀一個節點。
  • 由於 list 等於 NULL,所以跳出迴圈
    list 指到與 pending 相同的節點後,
    pending 往前移一個節點。
    ​​​​list = pending; ​​​​pending = pending->prev;
  • 進入 for 迴圈,next 指到 pending->prev 也就是下一個 sub-listhead,如果 next != NULL 就做對 pendinglistmerge,然後 pending 指到 next 所指的節點也就是下一個 sub-listhead
  • 下一次進入 for 迴圈因為 next 等於 NULL,所以跳出迴圈,對 pendinglistmerge_final,此函式會先把最後的兩個佇列合併後,再把 prev 重建回來。
  • 最後就可以得到整個排序的佇列了!

比較 list_sort.c 與自己的 sort.c 效能差異

先在 qtest.cadd_param 新增 parameter

add_param("linux_sort", &enable_linux_list_sort, "Enable linux list sort", NULL);

enable_linux_list_sort 是在 qtest 宣告的全域變數

static int enable_linux_list_sort = 0;

接下來修改部份 do_sort 程式:

if (current && exception_setup(true)) { if (enable_linux_list_sort) list_sort(NULL, current->q, compare); else q_sort(current->q); }

這樣就可以透過以下命令來切換要用哪個 sort

option linux_sort 0 option linux_sort 1

接下來先修改 queue.h
其中 likely 和 unlikely 可以在 compiler.h 找到

# define likely(x) __builtin_expect(!!(x), 1) # define unlikely(x) __builtin_expect(!!(x), 0) typedef uint8_t u8; typedef int __attribute__((nonnull(2,3))) (*list_cmp_func_t)(); /** * list_sort - use linux list sort * @priv: private data, opaque to list_sort(), passed to @cmp * @head: the list to sort * @cmp: the elements comparison function */ void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp); /** * Compare - this function must return > 0 if @a should sort after * @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should * sort before @b. */ int compare(void* priv, struct list_head *a, struct list_head *b);

接下來是 queue.c 中的 compare 函式

int compare(void* priv, struct list_head *a, struct list_head *b) { element_t *ele_a = list_entry(a, element_t, list); element_t *ele_b = list_entry(b, element_t, list); return strcmp(ele_a->value, ele_b->value); }

接下來比較的方式用 perf 來做測試
參考 blueskyson 的方式
lab0-c 新增 perf-traces 目錄,裡面放了:

100000.cmd
200000.cmd
300000.cmd
400000.cmd
500000.cmd

前面的數字代表要幾個隨機字串,以 100000.cmd 為例

# sort 100000 random strings option linux_sort 0 new it RAND 100000 sort free

以下是我的實驗結果:

  • 100000 筆
/* my merge sort */ Performance counter stats for './qtest -v 0 -f perf-traces/100000.cmd' (10 runs): 5,465,689 cache-misses # 42.893 % of all cache refs ( +- 3.47% ) 12,923,171 cache-references ( +- 1.43% ) 430,796,599 instructions # 1.26 insn per cycle ( +- 0.05% ) 350,214,286 cycles ( +- 1.42% ) 0.21634 +- 0.00417 seconds time elapsed ( +- 1.93% ) ---------------------------------------------------------------------------------------------------- /* linux list sort */ Performance counter stats for './qtest -v 0 -f perf-traces/100000.cmd' (10 runs): 4,715,115 cache-misses # 43.500 % of all cache refs ( +- 0.66% ) 10,830,099 cache-references ( +- 0.75% ) 434,026,705 instructions # 1.42 insn per cycle ( +- 0.02% ) 303,486,799 cycles ( +- 0.16% ) 0.193264 +- 0.000793 seconds time elapsed ( +- 0.41% )
  • 200000 筆
/* my merge sort */ Performance counter stats for './qtest -v 0 -f perf-traces/200000.cmd' (10 runs): 14,711,476 cache-misses # 50.953 % of all cache refs ( +- 0.54% ) 28,758,512 cache-references ( +- 0.16% ) 866,082,498 instructions # 1.23 insn per cycle ( +- 0.01% ) 703,121,988 cycles ( +- 0.16% ) 0.442772 +- 0.000732 seconds time elapsed ( +- 0.17% ) ---------------------------------------------------------------------------------------------------- /* linux list sort */ Performance counter stats for './qtest -v 0 -f perf-traces/200000.cmd' (10 runs): 12,785,220 cache-misses # 52.422 % of all cache refs ( +- 0.66% ) 24,082,873 cache-references ( +- 0.40% ) 876,536,039 instructions # 1.33 insn per cycle ( +- 0.06% ) 651,662,599 cycles ( +- 0.96% ) 0.41534 +- 0.00510 seconds time elapsed ( +- 1.23% )
  • 300000 筆
/* my merge sort */ Performance counter stats for './qtest -v 0 -f perf-traces/300000.cmd' (10 runs): 25,176,158 cache-misses # 53.992 % of all cache refs ( +- 0.40% ) 46,354,816 cache-references ( +- 0.14% ) 1,306,457,419 instructions # 1.18 insn per cycle ( +- 0.01% ) 1,099,759,769 cycles ( +- 0.14% ) 0.69770 +- 0.00257 seconds time elapsed ( +- 0.37% ) ---------------------------------------------------------------------------------------------------- /* linux list sort */ Performance counter stats for './qtest -v 0 -f perf-traces/300000.cmd' (10 runs): 21,191,601 cache-misses # 53.300 % of all cache refs ( +- 2.69% ) 38,848,369 cache-references ( +- 0.91% ) 1,322,133,992 instructions # 1.23 insn per cycle ( +- 0.03% ) 1,016,682,373 cycles ( +- 1.69% ) 0.6824 +- 0.0115 seconds time elapsed ( +- 1.69% )
  • 400000 筆
/* my merge sort */ Performance counter stats for './qtest -v 0 -f perf-traces/400000.cmd' (10 runs): 36,354,050 cache-misses # 56.179 % of all cache refs ( +- 0.36% ) 64,409,386 cache-references ( +- 0.20% ) 1,747,604,644 instructions # 1.16 insn per cycle ( +- 0.01% ) 1,498,409,395 cycles ( +- 0.13% ) 0.94878 +- 0.00400 seconds time elapsed ( +- 0.42% ) ---------------------------------------------------------------------------------------------------- /* linux list sort */ Performance counter stats for './qtest -v 0 -f perf-traces/400000.cmd' (10 runs): 30,814,049 cache-misses # 57.978 % of all cache refs ( +- 0.11% ) 53,256,480 cache-references ( +- 0.07% ) 1,771,579,815 instructions # 1.29 insn per cycle ( +- 0.00% ) 1,373,081,425 cycles ( +- 0.12% ) 0.86360 +- 0.00113 seconds time elapsed ( +- 0.13% )
  • 500000 筆
/* my merge sort */ Performance counter stats for './qtest -v 0 -f perf-traces/500000.cmd' (10 runs): 47,344,165 cache-misses # 56.546 % of all cache refs ( +- 0.18% ) 83,221,177 cache-references ( +- 0.18% ) 2,190,772,850 instructions # 1.14 insn per cycle ( +- 0.01% ) 1,905,805,048 cycles ( +- 0.13% ) 1.20848 +- 0.00371 seconds time elapsed ( +- 0.31% ) ---------------------------------------------------------------------------------------------------- /* linux list sort */ Performance counter stats for './qtest -v 0 -f perf-traces/500000.cmd' (10 runs): 40,109,266 cache-misses # 58.315 % of all cache refs ( +- 0.27% ) 68,806,922 cache-references ( +- 0.18% ) 2,219,682,272 instructions # 1.27 insn per cycle ( +- 0.00% ) 1,747,506,164 cycles ( +- 0.38% ) 1.09820 +- 0.00412 seconds time elapsed ( +- 0.37% )
  • 可以發現 linux list sort 的效能因為 cache miss 的次數明顯低於我實作的 merge sort,所以執行時間也比較快

Valgrind 排除 qtest 執行時的記憶體錯誤

發現在使用兩次以上的 new 後再做 free 會進入無限迴圈。
利用 GDB+pwndbg 作測試,可以發現程式在 is_circular 這個函式無限迴圈

pwndbg> run Starting program: /home/will/linux2023/lab0-c/qtest cmd> new l = [] cmd> new l = [] cmd> free ^C Program received signal SIGINT, Interrupt. is_circular () at qtest.c:856 856 while (cur != current->q) {

繼續往下看可以發現是在做 do_free 裡的 q_showis_circular 進入無限迴圈的

► f 0 0x555555556acc q_show+67 f 1 0x555555556acc q_show+67 f 2 0x5555555572a2 do_free+252 f 3 0x55555555a055 interpret_cmda+128 f 4 0x55555555a625 interpret_cmd+353 f 5 0x55555555b2e3 run_console+174 f 6 0x5555555593e2 main+1601 f 7 0x7ffff7c97083 __libc_start_main+243

最後仔細檢查可以發現 q_free 裡的

if (chain.size > 1) { qnext = ((uintptr_t) &current->chain.next == (uintptr_t) &chain.head) ? chain.head.next : current->chain.next; }

&current->chain.next 不用加 &
結果發現 sysprog/lab0-c 已經修正該錯誤了。

  • trace-03-ops 的錯誤
+++ TESTING trace trace-03-ops: # Test of insert_head, insert_tail, remove_head, reverse and merge ERROR: Freed queue, but 2 blocks are still allocated ==7825== 112 bytes in 2 blocks are still reachable in loss record 1 of 1 ==7825== at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so) ==7825== by 0x10F3F4: test_malloc (harness.c:133) ==7825== by 0x10F8A0: q_new (queue.c:18) ==7825== by 0x10CD91: do_new (qtest.c:151) ==7825== by 0x10E066: interpret_cmda (console.c:181) ==7825== by 0x10E636: interpret_cmd (console.c:201) ==7825== by 0x10EA73: cmd_select (console.c:610) ==7825== by 0x10F37E: run_console (console.c:705) ==7825== by 0x10D3F3: main (qtest.c:1246) ==7825== --- trace-03-ops 0/6

實際去對 trace-03-ops 的每個指令去作測試,發現在對兩個 new 出來的佇列做 merge 時會跳出上述的問題,因此我推測是 merge 有什麼地方寫錯,後來發現 q_merge 裡的 while 迴圈的 second->q = NULL 是不對的。預期的輸出是若有兩個佇列 ab 要和併到 a,把 b 的所有節點移到 a 時也要把 b 做初始化的動作,也就是要保留 second->q,所以如果直接做 second->q = NULL 會造成 memory leak 的問題。
重新修改程式後順利排除記憶體的所有問題

will@will:~/linux2023/lab0-c$ make valgrind # Explicitly disable sanitizer(s) make clean SANITIZER=0 qtest make[1]: Entering directory '/home/will/linux2023/lab0-c' rm -f qtest.o report.o console.o harness.o queue.o random.o dudect/constant.o dudect/fixture.o dudect/ttest.o shannon_entropy.o linenoise.o web.o .qtest.o.d .report.o.d .console.o.d .harness.o.d .queue.o.d .random.o.d .dudect/constant.o.d .dudect/fixture.o.d .dudect/ttest.o.d .shannon_entropy.o.d .linenoise.o.d .web.o.d *~ qtest /tmp/qtest.* rm -rf .dudect rm -rf *.dSYM (cd traces; rm -f *~) CC qtest.o CC report.o CC console.o CC harness.o CC queue.o CC random.o CC dudect/constant.o CC dudect/fixture.o CC dudect/ttest.o CC shannon_entropy.o CC linenoise.o CC web.o LD qtest make[1]: Leaving directory '/home/will/linux2023/lab0-c' cp qtest /tmp/qtest.5cWFNg chmod u+x /tmp/qtest.5cWFNg sed -i "s/alarm/isnan/g" /tmp/qtest.5cWFNg scripts/driver.py -p /tmp/qtest.5cWFNg --valgrind --- Trace Points +++ TESTING trace trace-01-ops: # Test of insert_head and remove_head --- trace-01-ops 5/5 +++ TESTING trace trace-02-ops: # Test of insert_head, insert_tail, remove_head, remove_tail, and delete_mid --- trace-02-ops 6/6 +++ TESTING trace trace-03-ops: # Test of insert_head, insert_tail, remove_head, reverse and merge --- trace-03-ops 6/6 +++ TESTING trace trace-04-ops: # Test of insert_head, insert_tail, size, swap, and sort --- trace-04-ops 6/6 +++ TESTING trace trace-05-ops: # Test of insert_head, insert_tail, remove_head, reverse, size, swap, and sort --- trace-05-ops 6/6 +++ TESTING trace trace-06-ops: # Test of insert_head, insert_tail, delete duplicate, sort, descend and reverseK --- trace-06-ops 6/6 +++ TESTING trace trace-07-string: # Test of truncated strings --- trace-07-string 6/6 +++ TESTING trace trace-08-robust: # Test operations on empty queue --- trace-08-robust 6/6 +++ TESTING trace trace-09-robust: # Test remove_head with NULL argument --- trace-09-robust 6/6 +++ TESTING trace trace-10-robust: # Test operations on NULL queue --- trace-10-robust 6/6 +++ TESTING trace trace-11-malloc: # Test of malloc failure on insert_head --- trace-11-malloc 6/6 +++ TESTING trace trace-12-malloc: # Test of malloc failure on insert_tail --- trace-12-malloc 6/6 +++ TESTING trace trace-13-malloc: # Test of malloc failure on new --- trace-13-malloc 6/6 +++ TESTING trace trace-14-perf: # Test performance of insert_tail, reverse, and sort --- trace-14-perf 6/6 +++ TESTING trace trace-15-perf: # Test performance of sort with random and descending orders # 10000: all correct sorting algorithms are expected pass # 50000: sorting algorithms with O(n^2) time complexity are expected failed # 100000: sorting algorithms with O(nlogn) time complexity are expected pass --- trace-15-perf 6/6 +++ TESTING trace trace-16-perf: # Test performance of insert_tail --- trace-16-perf 6/6 +++ TESTING trace trace-17-complexity: # Test if time complexity of q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant Probably constant time Probably constant time Probably constant time Probably constant time --- trace-17-complexity 5/5 --- TOTAL 100/100

論文閱讀:Dude, is my code constant time?

論文連結
dudect Github
參考
這篇論文的貢獻是,透過自己開發的 dudect,一個透過統計分析的程式判斷程式是否是 constant-time 在給定的平台之下。
實作可分為以下步驟:

  • A. Step 1: Measure execution time
    • a) Classes definition: 給定兩種不同類別的輸入資料,重複對方法進行測量,最常見的就是 fix-vs-random tests
    • b) Cycle counters: 現今的 CPU 提供的 cycle counters 可以很精準的獲得執行的時間。
    • c) Environmental conditions: 為了減少環境的差異,每次測量都會對應隨機的輸入類別。
  • B. Step 2: Apply post-processing
    • a) Cropping: 剪裁掉一些超過 threshold 的測量結果
    • b) Higher-order preprocessing: Depending on the statistical test applied, it may be beneficial to apply some higherorder pre-processing
  • C. Step 3: Apply statistical test
    • a) t-test: Welch’s t-test,測試兩個平均數是否相同
    • b) Non-parametric tests:(無母數統計分析): Kolmogorov-Smirnov

dudect

  • 跟據作業說明:「在 oreparaz/dudect 的程式碼具備 percentile 的處理,但在 lab0-c 則無。」。所以我去對照發現 lab0-cfixture.c 確實沒有實作,造成極端值沒有被去除,導致判斷結果出錯。
  • oreparaz/dudect 處理 percentile 的部分加入 lab0-cfixture.c 後便可以正確分析 insert_head, insert_tail, remove_head, remove_tail 的執行時間。
static int cmp(const int64_t *a, const int64_t *b) { return (int) (*a - *b); } static int64_t percentile(int64_t *a, double which, size_t size) { qsort(a, size, sizeof(int64_t), (int (*)(const void *, const void *)) cmp); size_t array_position = (size_t)((double) size * (double) which); assert(array_position >= 0); assert(array_position < size); return a[array_position]; } /* set different thresholds for cropping measurements. the exponential tendency is meant to approximately match the measurements distribution, but there's not more science than that. */ static void prepare_percentiles(int64_t *percentiles, int64_t *exec_times) { for (size_t i = 0; i < DUDECT_NUMBER_PERCENTILES; i++) { percentiles[i] = percentile( exec_times, 1 - (pow(0.5, 10 * (double) (i + 1) / DUDECT_NUMBER_PERCENTILES)), N_MEASURES); } }

引入 treesort

筆記連結