contributed by < willwillhi1 >
$ gcc --version
gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 39 bits physical, 48 bits virtual
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 142
Model name: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
Stepping: 12
CPU MHz: 2100.000
CPU max MHz: 4200.0000
CPU min MHz: 400.0000
BogoMIPS: 4199.88
Virtualization: VT-x
L1d cache: 128 KiB
L1i cache: 128 KiB
L2 cache: 1 MiB
L3 cache: 6 MiB
NUMA node0 CPU(s): 0-7
First, read 你所不知道的 C 語言: linked list 和非連續記憶體.

The `head` in the program corresponds to the Head in the figure above, and Node0, 1, 2… are `element_t` nodes. `malloc` a `struct list_head`, name it `head`, and check whether the allocation succeeded; then initialize `head` with `INIT_LIST_HEAD`, which points both `head->next` and `head->prev` back at `head`.
struct list_head *q_new()
{
struct list_head *head = malloc(sizeof(struct list_head));
if (head == NULL)
return NULL;
INIT_LIST_HEAD(head);
return head;
}
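For reference, `element_t` and `INIT_LIST_HEAD` come from lab0-c's queue.h and list.h; they look roughly like this (paraphrased from the headers, comments mine):

```c
/* A queue element wraps a string plus the embedded list node (queue.h) */
typedef struct {
    char *value;
    struct list_head list;
} element_t;

/* Initialize a list head as an empty circular list (list.h) */
static inline void INIT_LIST_HEAD(struct list_head *head)
{
    head->next = head;
    head->prev = head;
}
```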
Traverse the whole queue with a loop and free every node visited.

jserv: Improve the wording above.
void q_free(struct list_head *l)
{
    if (!l)
        return;
    struct list_head *next = l->next;
    while (l != next) {
        element_t *node = list_entry(next, element_t, list);
        next = next->next;
        free(node->value);
        free(node);
    }
    free(l);
}
`malloc` an `element_t` object named `node`, then use `strdup` to allocate space for `node->value` and copy `s` into it; finally insert `node` at the head of the queue with `list_add`.
bool q_insert_head(struct list_head *head, char *s)
{
    if (head == NULL)
        return false;
    element_t *node = malloc(sizeof(element_t));
    if (node == NULL)
        return false;
    node->value = strdup(s);
    if (node->value == NULL) {
        free(node);
        return false;
    }
    list_add(&node->list, head);
    return true;
}
Similar to `q_insert_head`, with `list_add` replaced by `list_add_tail`.
bool q_insert_tail(struct list_head *head, char *s)
{
    if (head == NULL)
        return false;
    element_t *node = malloc(sizeof(element_t));
    if (node == NULL)
        return false;
    node->value = strdup(s);
    if (node->value == NULL) {
        free(node);
        return false;
    }
    list_add_tail(&node->list, head);
    return true;
}
Remove the first element of the queue. `sp` is a caller-allocated character array and `bufsize` is its length. First detach the first element with `list_del`, then copy the removed element's `value` into `sp` with `strncpy`.
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (head == NULL || list_empty(head))
return NULL;
element_t *node = list_entry(head->next, element_t, list);
list_del(head->next);
if (sp != NULL) {
strncpy(sp, node->value, bufsize-1);
sp[bufsize-1] = '\0';
}
return node;
}
Similar to `q_remove_head`, with `head->next` replaced by `head->prev`.
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
if (head == NULL || list_empty(head))
return NULL;
element_t *node = list_entry(head->prev, element_t, list);
list_del(head->prev);
if (sp != NULL) {
strncpy(sp, node->value, bufsize-1);
sp[bufsize-1] = '\0';
}
return node;
}
If `head` is NULL or the queue is empty, return 0; otherwise traverse the whole queue with `list_for_each` and count the nodes.
int q_size(struct list_head *head)
{
if (head == NULL || list_empty(head))
return 0;
int size = 0;
struct list_head *l;
list_for_each(l, head)
size++;
return size;
}
Declare `first` as `head->next` and `second` as `head->prev`, and use a `while` loop to check whether they have met; if not, each advances one step. When they meet, `first` is the middle node; it is removed from the queue with `list_del` and then freed.
bool q_delete_mid(struct list_head *head)
{
// https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/
if (head == NULL || list_empty(head))
return false;
struct list_head *first = head->next;
struct list_head *second = head->prev;
while ((first != second) && (first->next != second)) {
first = first->next;
second = second->prev;
}
list_del(first);
element_t *node = list_entry(first, element_t, list);
free(node->value);
free(node);
return true;
}
First check that the queue exists. Declare two `element_t` pointers, `N1` and `N2`: `N1` is the last node known not to be duplicated, and `N2` is the first node of the current run of duplicates. Then check whether `N2->value` equals the value of the node after it; if so, keep deleting the node after `N2` until the values differ, and finally delete the duplicated `N2` itself.
bool q_delete_dup(struct list_head *head)
{
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
    if (!head)
        return false;
    element_t *N1 = list_entry(head, element_t, list);
    element_t *N2 = list_entry(head->next, element_t, list);
    while (&N2->list != head) {
        char *value = list_entry(N2->list.next, element_t, list)->value;
        if (N2->list.next != head && strcmp(N2->value, value) == 0) {
            while (N2->list.next != head && strcmp(N2->value, value) == 0) {
                element_t *node = list_entry(N2->list.next, element_t, list);
                list_del(N2->list.next);
                q_release_element(node);
                value = list_entry(N2->list.next, element_t, list)->value;
            }
            list_del(&N2->list);
            element_t *node = list_entry(&N2->list, element_t, list);
            q_release_element(node);
            N2 = list_entry(N1->list.next, element_t, list);
        } else {
            N1 = list_entry(N1->list.next, element_t, list);
            N2 = list_entry(N2->list.next, element_t, list);
        }
    }
    return true;
}
jserv: `N1` and `N2` are not appropriate names. Replace `if (head == NULL)` with the shorter `if (!head)`, and `strcmp(...) == 0` with the shorter `!strcmp(...)`.
In the revised version, `isdup` records whether the current node `first` has to be deleted. When `first->value` equals `second->value`, `first` is deleted and `isdup` is set to true; when the values no longer match but `isdup` is still true, `first` is the last node of the duplicate run, so it is deleted as well and `isdup` is reset to false.
bool q_delete_dup(struct list_head *head)
{
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
    if (!head)
        return false;
    element_t *first;
    element_t *second;
    bool isdup = false;
    list_for_each_entry_safe(first, second, head, list) {
        if (&second->list != head && !strcmp(first->value, second->value)) {
            list_del(&first->list);
            q_release_element(first);
            isdup = true;
        } else if (isdup) {
            list_del(&first->list);
            q_release_element(first);
            isdup = false;
        }
    }
    return true;
}
Check whether `head` is NULL or the queue is empty. `first` and `second` point to the pair of nodes to be swapped; the six pointer re-linking statements in the loop body perform the swap, after which `first` and `second` move on to the next pair, until either of them reaches `head`.
void q_swap(struct list_head *head)
{
// https://leetcode.com/problems/swap-nodes-in-pairs/
if (head == NULL || list_empty(head))
return;
struct list_head *first = head->next;
struct list_head *second = head->next->next;
while (first != head && second != head) {
first->prev->next = second;
first->next = second->next;
second->prev = first->prev;
first->prev = second;
second->next->prev = first;
second->next = first;
first = first->next;
second = first->next;
}
}
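For comparison, the same pairwise swap can also be written with the list API's `list_move`. This is only a sketch for contrast, not the implementation above:

```c
/* Sketch: swap adjacent pairs by moving the first node of each pair
 * behind its successor, using list_move() from list.h. */
void q_swap_by_move(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    for (struct list_head *node = head->next;
         node != head && node->next != head; node = node->next)
        list_move(node, node->next); /* node ends up after its old successor */
}
```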
Check whether `head` is NULL or the queue is empty. Initialize `first = head` and `second = head->next`. In each iteration, `first`'s `prev` and `next` pointers are exchanged, then `first` and `second` step to the next node; this repeats until `first` is back at `head`, i.e. the `prev` and `next` of every node in the queue (the sentinel included) have been swapped.
void q_reverse(struct list_head *head)
{
if (head == NULL || list_empty(head))
return;
struct list_head *first = head;
struct list_head *second = head->next;
do {
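/* swap the two links of the current node (the sentinel head included): next takes the old prev, prev takes the old next (second) */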
first->next = first->prev;
first->prev = second;
first = second;
second = first->next;
} while (first != head);
}
The initial idea was to reuse the earlier `q_reverse`: split the queue into groups of `k` nodes and reverse each group.

First, `q_size` gives the length of the queue, and dividing it by `k` gives `times`, the number of `q_reverse` calls to perform. In each reversal, a sub-queue of length k+1 is cut out of the original queue: `first` and `last` are the head and tail nodes of that sub-queue, while `front` and `end` record the nodes immediately before and after it in the original queue, so the reversed sub-queue can be spliced back in. Taking the figure below as an example with k = 3, nodes 1, 2, 3 are to be reversed, so `first` sits on `head`, `last` on node 3, and `front` and `end` record the surrounding nodes, 6 and 4 respectively.
jserv: Improve your wording.
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (head == NULL)
return;
struct list_head *front = head;
struct list_head *end = head;
struct list_head *first = head;
struct list_head *last = head;
int times = q_size(head)/k;
for (int i = 0; i < times; ++i) {
for (int j = 0; j < k; ++j)
last = last->next;
front = first->prev;
end = last->next;
last->next = first;
first->prev = last;
q_reverse(first);
first->prev->next = end;
end->prev = first->prev;
first->prev = front;
// update first, last
first = end->prev;
last = first;
}
}
The revised version uses two list heads, `tmp` and `result`. I originally wanted to `malloc` them, but that seems to be forbidden in this assignment, so `LIST_HEAD` is used instead. The queue is still reversed `k` nodes at a time, but now `list_cut_position` first moves those `k` nodes onto `tmp`; after reversing, `tmp` is appended to the tail of `result` with `list_splice_tail_init`, which also re-initializes `tmp`. Finally `result` is spliced back onto the front of `head` with `list_splice_init`.
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (!head)
return;
struct list_head *last = head->next;
int times = q_size(head)/k;
LIST_HEAD(tmp);
LIST_HEAD(result);
for (int i = 0; i < times; ++i) {
for (int j = 0; j < k; ++j)
last = last->next;
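/* last now points one node past this group; cut the k nodes [head->next, last->prev] into tmp */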
list_cut_position(&tmp, head, last->prev);
q_reverse(&tmp);
list_splice_tail_init(&tmp, &result);
}
list_splice_init(&result, head);
}
The merge sort implementation follows 你所不知道的 C 語言: linked list 和非連續記憶體.

It consists of three functions: `q_sort`, `mergesort`, and `mergelist`. If the list handed to `mergesort` is empty or has a single node, it is returned as is; otherwise the list is split in half and `mergesort` is applied to each half to sort it.
struct list_head *mergesort(struct list_head *l)
{
if (l == NULL || l->next == NULL)
return l;
struct list_head *fast = l;
struct list_head *slow = l;
while (fast->next != NULL && fast->next->next != NULL) {
fast = fast->next->next;
slow = slow->next;
}
struct list_head *l1 = l;
struct list_head *l2 = slow->next;
slow->next = NULL;
return mergelist(mergesort(l1), mergesort(l2));
}
Finally the two sorted lists are combined with `mergelist`, which uses a pointer to a pointer to do the merge and keep the code short.
struct list_head *mergelist(struct list_head *l1, struct list_head *l2)
{
struct list_head *head = NULL;
struct list_head **cur = &head;
while (l1 && l2) {
element_t *e1 = list_entry(l1, element_t, list);
element_t *e2 = list_entry(l2, element_t, list);
if (strcmp(e1->value, e2->value) >= 0) {
*cur = l2;
l2 = l2->next;
}
else {
*cur = l1;
l1 = l1->next;
}
cur = &(*cur)->next;
}
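/* at most one of l1/l2 is still non-NULL here; OR-ing the pointer values attaches the remaining sorted run (or NULL) in one step */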
*cur = (struct list_head *) ((u_int64_t)l1 | (u_int64_t)l2);
return head;
}
jserv: Avoid posting overly long code; HackMD is a collaborative environment for notes and discussion, so you should quote only the key parts. After implementing something, propose improvements, and make good use of the `time` command built into `qtest` to observe how your program behaves. Also, the proper translation of "node" is 「節點」, not 「點」.
`q_sort` then re-links the nodes that were detached, rebuilding the `prev` pointers and restoring the circular structure.
void q_sort(struct list_head *head)
{
if (head == NULL || list_empty(head))
return;
head->prev->next = NULL;
head->next = mergesort(head->next);
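/* mergesort/mergelist only maintain next pointers; rebuild the prev links and close the circle below */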
struct list_head *cur = head;
struct list_head *next = head->next;
while (next) {
next->prev = cur;
cur = next;
next = next->next;
}
cur->next = head;
head->prev = cur;
}
After that, I exploited the doubly linked structure and rewrote `mergesort`: instead of locating the middle node with fast/slow pointers, it walks inward from the head and the tail simultaneously, cutting the traversal cost from roughly 1.5 passes over the list down to about 1 pass.
struct list_head *mergesort(struct list_head *l)
{
if (!l || !l->next)
return l;
struct list_head *first = l;
struct list_head *last = l->prev;
while (first != last && first->next != last) {
first = first->next;
last = last->prev;
}
struct list_head *l1 = l;
struct list_head *l2 = first->next;
l2->prev = l->prev;
l1->prev = first;
first->next = NULL;
return mergelist(mergesort(l1), mergesort(l2));
}
Next, I used `qtest`'s `time` command to compare performance; the revised version is consistently faster.
| Entries | Before (s) | After (s) |
|---|---|---|
| 100000 | 1.140 | 0.123 |
| 200000 | 0.288 | 0.277 |
| 300000 | 0.472 | 0.449 |
| 400000 | 0.652 | 0.622 |
| 500000 | 0.824 | 0.797 |
| 600000 | 1.024 | 0.968 |
| 700000 | 1.219 | 1.189 |
| 800000 | 1.409 | 1.351 |
If there is a node to the right of node a whose value is greater, node a has to be removed.

The expected return value of `q_descend` can be found in qtest.c: `do_descend` contains the line

if (exception_setup(true)) current->size = q_descend(current->q);

which shows that `q_descend` must return the length of the queue.

Because the list is doubly linked, it can be traversed from the tail; during the traversal, whenever the next node's value is less than or equal to the current node's value, that node is removed.

When testing with `make test`, I found my earlier implementation was wrong: the removed nodes also have to be released with `q_release_element`.
jserv: Improve the wording.
int q_descend(struct list_head *head)
{
// https://leetcode.com/problems/remove-nodes-from-linked-list/
if (head == NULL || list_empty(head))
return 0;
element_t *first = list_entry(head->prev, element_t, list);
element_t *second = list_entry(head->prev->prev, element_t, list);
while (&second->list != head) {
if (strcmp(first->value, second->value) < 0) {
second = list_entry(second->list.prev, element_t, list);
first = list_entry(first->list.prev, element_t, list);
}
else {
list_del(&second->list);
q_release_element(second);
second = list_entry(first->list.prev, element_t, list);
}
}
return q_size(head);
}
`queue_contex_t` is the structure that manages how the queues are chained together; the `struct list_head *head` passed in here is the head of that chain of `list_head`s.

The approach follows the Merge k Sorted Lists part of 你所不知道的 C 語言: linked list 和非連續記憶體: 案例探討: LeetCode 21. Merge Two Sorted Lists. As noted there, the intuitive approach of appending every remaining queue to the first one makes the first queue longer and longer, and the immediate consequence is that in most cases merging gets slower and slower. I therefore merge in rounds instead.

The outer `for` loop runs `count` times, i.e. half the number of queues rounded up. `first` is the first queue to merge and `second` the second; `merge_two_list` merges the two, stores the result in `first`, and returns the length of the merged queue. NULL is then assigned to the now-empty `second->q` and its context is moved to the tail of `head`, which serves as the termination condition of the `while` loop; finally `first` and `second` each move down one node and the merging continues.
int q_merge(struct list_head *head)
{
    // https://leetcode.com/problems/merge-k-sorted-lists/
    if (!head || list_empty(head))
        return 0;
    else if (list_is_singular(head))
        return q_size(list_first_entry(head, queue_contex_t, chain)->q);
    int size = q_size(head);
    int count = (size%2) ? size/2+1 : size/2;
    int queue_size = 0;
    for (int i = 0; i < count; ++i) {
        queue_contex_t *first = list_first_entry(head, queue_contex_t, chain);
        queue_contex_t *second = list_entry(first->chain.next, queue_contex_t, chain);
        while (first->q && second->q) {
            queue_size = merge_two_list(first->q, second->q);
            second->q = NULL;
            list_move_tail(&second->chain, head);
            first = list_entry(first->chain.next, queue_contex_t, chain);
            second = list_entry(first->chain.next, queue_contex_t, chain);
        }
    }
    return queue_size;
}
Updated implementation: `second->q = NULL` causes a memory leak, so the `while` loop in `q_merge` is rewritten as:
while (!list_empty(first->q) && !list_empty(second->q)) {
    queue_size = merge_two_list(first->q, second->q);
    list_move_tail(&second->chain, head);
    first = list_entry(first->chain.next, queue_contex_t, chain);
    second = list_entry(first->chain.next, queue_contex_t, chain);
}
and the `list_splice_tail(second, &temp_head)` inside `merge_two_list` is changed to:
list_splice_tail_init(second, &temp_head);
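`merge_two_list` itself is not quoted above. A minimal sketch that is consistent with the description (merge `second` into `first`, leave `second` empty, return the merged length) could look like the following; the exact lab0-c code may differ:

```c
/* Hypothetical sketch, not the exact implementation: merge two sorted queues
 * into `first`, leave `second` empty, and return the size of the result. */
static int merge_two_list(struct list_head *first, struct list_head *second)
{
    LIST_HEAD(temp_head);
    int size = 0;
    while (!list_empty(first) && !list_empty(second)) {
        element_t *e1 = list_first_entry(first, element_t, list);
        element_t *e2 = list_first_entry(second, element_t, list);
        if (strcmp(e1->value, e2->value) <= 0)
            list_move_tail(&e1->list, &temp_head);
        else
            list_move_tail(&e2->list, &temp_head);
        size++;
    }
    size += q_size(first) + q_size(second);
    list_splice_tail_init(first, &temp_head);
    /* the change discussed above: _init so that `second` is left empty */
    list_splice_tail_init(second, &temp_head);
    list_splice(&temp_head, first);
    return size;
}
```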
The following can be found in console.h:
/* Add a new command */
void add_cmd(char *name, cmd_func_t operation, char *summary, char *parameter);
#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)
`add_cmd` adds a new command to the command table. To add a new `shuffle` command, implement `do_shuffle` and register the command with `ADD_COMMAND` in `console_init()` in qtest.c.
ADD_COMMAND(shuffle, "Do Fisher-Yates shuffle", "");
`do_shuffle` in qtest currently only checks whether the input queue is NULL/empty or has a single node; the actual shuffling is done by `q_shuffle`, whose declaration also has to be added to queue.h (a sketch follows the code below).
static bool do_shuffle(int argc, char *argv[])
{
if (!current || !current->q)
report(3, "Warning: Calling shuffle on null queue");
error_check();
if (q_size(current->q) < 2)
report(3, "Warning: Calling shuffle on single queue");
error_check();
if (exception_setup(true))
q_shuffle(current->q);
q_show(3);
return !error_check();
}
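The declaration added to queue.h might simply be the following (placement and comment are my assumption):

```c
/* Shuffle the queue with the Fisher-Yates algorithm (backs the new `shuffle` command) */
void q_shuffle(struct list_head *head);
```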
The shuffle implementation consists of two functions, `q_shuffle` and `swap`. One thing to watch out for in `swap`: `node_1` always comes after `node_2` in the list, and when the two are adjacent `node_1_prev` equals `node_2`, so doing `list_move(node_2, node_1_prev)` in that case would effectively detach `node_2`; that move is therefore guarded by the `if`.
static inline void swap(struct list_head *node_1, struct list_head *node_2)
{
if (node_1 == node_2)
return;
struct list_head *node_1_prev = node_1->prev;
struct list_head *node_2_prev = node_2->prev;
if (node_1->prev != node_2) list_move(node_2, node_1_prev);
list_move(node_1, node_2_prev);
}
My `q_shuffle` follows the Fisher–Yates shuffle algorithm: `new = last->prev` is the last node that has not yet been drawn, `old` is a randomly picked node among the remaining ones; `old` and `new` are swapped and `size` is decremented, until `size` reaches zero.
void q_shuffle(struct list_head *head)
{
if (!head || list_is_singular(head))
return;
struct list_head *last = head;
int size = q_size(head);
while (size > 0) {
int index = rand() % size;
struct list_head *new = last->prev;
struct list_head *old = new;
while (index--)
old = old->prev;
swap(new, old);
last = last->prev;
size--;
}
return;
}
Next, a script is used to test the result of running shuffle on [1, 2, 3]:
Expectation: 166666
Observation: {'123': 166945, '132': 167113, '213': 166658, '231': 165775, '312': 166737, '321': 166772}
The results show that the shuffle output is distributed roughly uniformly.
First, look at the function declaration of `list_sort`:
/**
* list_sort - sort a list
* @priv: private data, opaque to list_sort(), passed to @cmp
* @head: the list to sort
* @cmp: the elements comparison function
*/
__attribute__((nonnull(2,3)))
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp)
- `__attribute__((nonnull(2,3)))`: a compiler attribute that marks which arguments of the function must be non-NULL; passing NULL for them triggers a warning.
- `priv`: private data, passed through to `cmp`.
- `head`: the head of the whole list.
- `cmp`: the comparison function that decides the sorting order; it is a function pointer.

The comment above the `list_sort` function is largely the same as in the instructor's notes:
* This mergesort is as eager as possible while always performing at least
* 2:1 balanced merges. Given two pending sublists of size 2^k, they are
* merged to a size-2^(k+1) list as soon as we have 2^k following elements.
*
* Thus, it will avoid cache thrashing as long as 3*2^k elements can
* fit into the cache. Not quite as good as a fully-eager bottom-up
* mergesort, but it does use 0.2*n fewer comparisons, so is faster in
* the common case that everything fits into L1.
The key point is that, when merging, the nodes visited so far are kept in a 2:1 ratio, where the 2 stands for two already-sorted sub-lists and the 1 for newly arrived, not-yet-merged nodes; in other words, only once 3*2^k nodes have been visited are the two preceding sub-lists of size 2^k merged. As long as 3*2^k elements fit entirely in the cache, this approach avoids cache thrashing.
Now for the code. First, the `next` pointer of the last node in the list is set to NULL:
head->prev->next = NULL;
`count` starts at zero and is the number of nodes on the pending list; it is incremented every time a node is moved onto pending. To match the merge timing described above: in the iteration where `count` becomes `count + 1`, no merge is done if `count + 1` is a power of two (2^k); otherwise a merge is performed.
For example: 0011 -> 0100: no merge. 0010 -> 0011: merge. From this one can generalize (taking 0 ~ 16 as an example) that no merge is done exactly when `count` = 0000, 0001, 0011, 0111, 1111.
接下來的程式用圖來舉例說明:
pending
: 為 head of pending lists ,而 pending lists 是用於暫存待合併的 sub-list of lists,每一個 sub-list 的 size 都為 power of 2 的倍數,並且各個 sub-list 以 prev 連結。tail
: 指標的指標,會指向 pending 裡準備要合併的 sub-list,要合併的 sub-list 是由 count 決定。list
: 目前走訪到的 node。
do {
    size_t bits;
    struct list_head **tail = &pending;
    /* Find the least-significant clear bit in count */
    for (bits = count; bits & 1; bits >>= 1)
        tail = &(*tail)->prev;
    /* Do the indicated merge */
    if (likely(bits)) {
        struct list_head *a = *tail, *b = a->prev;
        a = merge(priv, cmp, b, a);
        /* Install the merged result in place of the inputs */
        a->prev = b->prev;
        *tail = a;
    }
    /* Move one element from input list to pending */
    list->prev = pending;
    pending = list;
    list = list->next;
    pending->next = NULL;
    count++;
} while (list);
An example follows. Since I am not familiar with graphviz, the figures were drawn with drawio for now; I will redo them later. Concretely, suppose we sort the queue (4->1->2->3); follow the figures together with the table below.
| count transition | count in binary | merge | nodes on pending |
|---|---|---|---|
| 0 -> 1 | 0000 -> 0001 | no | 1 |
| 1 -> 2 | 0001 -> 0010 | no | 1 + 1 |
| 2 -> 3 | 0010 -> 0011 | yes | (2) + 1 |
| 3 -> 4 | 0011 -> 0100 | no | 2 + 1 + 1 |
- `count` is zero, so in the first iteration it changes 0 -> 1; per the table no merge is done, and `pending` and `list` each advance one node. The figure below shows the state at the end of this iteration.
- `count` changes 1 -> 2, so again no merge is done and `pending` and `list` advance one node; the whole queue then looks like the figure below.
- `count` changes 2 -> 3, so a merge is performed. `tail` points at the address of `pending`:
struct list_head **tail = &pending;
`a` is at node 1 of the previous figure and `b` at node 4; the merge is then performed. `merge` returns the head of the merged result and assigns it to `a`, which in this example is node 1 in the figure below. In `a->prev = b->prev`, `b->prev` points to the head of the next sub-list; since this is already the first sub-list here, it is NULL. `*tail = a` points `pending` at `a`, i.e. at node 1.
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
`list` points at node 2 and `pending` at node 1 in the figure below; the following operations then produce the next figure:
list->prev = pending;
pending = list;
list = list->next;
pending->next = NULL;
count++;
- `count` changes 3 -> 4, so no merge is done; as before, `pending` and `list` each advance one node.
- `list` is now NULL, so the do-while loop exits.
- `list` is pointed at the same node as `pending`, and `pending` moves back one node:
list = pending;
pending = pending->prev;
In the `for` loop, `next` points at `pending->prev`, i.e. the head of the next sub-list. If `next != NULL`, `pending` and `list` are merged and `pending` is then pointed at the node `next` refers to, the head of the next sub-list. Once `next` is NULL the `for` loop exits, and `merge_final` is applied to `pending` and `list`; it merges the last two lists and then rebuilds the `prev` pointers.
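For reference, this final phase in lib/list_sort.c looks like this (continuing right after the two lines quoted above):

```c
for (;;) {
    struct list_head *next = pending->prev;

    if (!next)
        break;
    list = merge(priv, cmp, pending, list);
    pending = next;
}
merge_final(priv, cmp, head, pending, list);
```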
First, register a new parameter in qtest.c with `add_param`:
add_param("linux_sort", &enable_linux_list_sort, "Enable linux list sort", NULL);
`enable_linux_list_sort` is a global variable declared in qtest.c:
static int enable_linux_list_sort = 0;
Next, modify part of `do_sort`:
if (current && exception_setup(true)) {
if (enable_linux_list_sort)
list_sort(NULL, current->q, compare);
else
q_sort(current->q);
}
Which sort implementation gets used can then be switched with the following commands:
option linux_sort 0
option linux_sort 1
Next, modify queue.h. The `likely` and `unlikely` macros can be found in compiler.h:
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
typedef uint8_t u8;
typedef int __attribute__((nonnull(2,3))) (*list_cmp_func_t)();
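For comparison, the kernel's own typedef in include/linux/list_sort.h spells out the full parameter list:

```c
typedef int __attribute__((nonnull(2, 3))) (*list_cmp_func_t)(void *,
        const struct list_head *, const struct list_head *);
```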
/**
* list_sort - use linux list sort
* @priv: private data, opaque to list_sort(), passed to @cmp
* @head: the list to sort
* @cmp: the elements comparison function
*/
void list_sort(void *priv, struct list_head *head, list_cmp_func_t cmp);
/**
* Compare - this function must return > 0 if @a should sort after
* @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should
* sort before @b.
*/
int compare(void* priv, struct list_head *a, struct list_head *b);
Next is the `compare` function in queue.c:
int compare(void* priv, struct list_head *a, struct list_head *b)
{
element_t *ele_a = list_entry(a, element_t, list);
element_t *ele_b = list_entry(b, element_t, list);
return strcmp(ele_a->value, ele_b->value);
}
The two implementations are then compared with `perf`. Following blueskyson's approach, a perf-traces directory is added under lab0-c, containing:
100000.cmd
200000.cmd
300000.cmd
400000.cmd
500000.cmd
The leading number is how many random strings to insert; 100000.cmd, for example, contains:
# sort 100000 random strings
option linux_sort 0
new
it RAND 100000
sort
free
Below are my experimental results:
/* my merge sort */
Performance counter stats for './qtest -v 0 -f perf-traces/100000.cmd' (10 runs):
5,465,689 cache-misses # 42.893 % of all cache refs ( +- 3.47% )
12,923,171 cache-references ( +- 1.43% )
430,796,599 instructions # 1.26 insn per cycle ( +- 0.05% )
350,214,286 cycles ( +- 1.42% )
0.21634 +- 0.00417 seconds time elapsed ( +- 1.93% )
----------------------------------------------------------------------------------------------------
/* linux list sort */
Performance counter stats for './qtest -v 0 -f perf-traces/100000.cmd' (10 runs):
4,715,115 cache-misses # 43.500 % of all cache refs ( +- 0.66% )
10,830,099 cache-references ( +- 0.75% )
434,026,705 instructions # 1.42 insn per cycle ( +- 0.02% )
303,486,799 cycles ( +- 0.16% )
0.193264 +- 0.000793 seconds time elapsed ( +- 0.41% )
/* my merge sort */
Performance counter stats for './qtest -v 0 -f perf-traces/200000.cmd' (10 runs):
14,711,476 cache-misses # 50.953 % of all cache refs ( +- 0.54% )
28,758,512 cache-references ( +- 0.16% )
866,082,498 instructions # 1.23 insn per cycle ( +- 0.01% )
703,121,988 cycles ( +- 0.16% )
0.442772 +- 0.000732 seconds time elapsed ( +- 0.17% )
----------------------------------------------------------------------------------------------------
/* linux list sort */
Performance counter stats for './qtest -v 0 -f perf-traces/200000.cmd' (10 runs):
12,785,220 cache-misses # 52.422 % of all cache refs ( +- 0.66% )
24,082,873 cache-references ( +- 0.40% )
876,536,039 instructions # 1.33 insn per cycle ( +- 0.06% )
651,662,599 cycles ( +- 0.96% )
0.41534 +- 0.00510 seconds time elapsed ( +- 1.23% )
/* my merge sort */
Performance counter stats for './qtest -v 0 -f perf-traces/300000.cmd' (10 runs):
25,176,158 cache-misses # 53.992 % of all cache refs ( +- 0.40% )
46,354,816 cache-references ( +- 0.14% )
1,306,457,419 instructions # 1.18 insn per cycle ( +- 0.01% )
1,099,759,769 cycles ( +- 0.14% )
0.69770 +- 0.00257 seconds time elapsed ( +- 0.37% )
----------------------------------------------------------------------------------------------------
/* linux list sort */
Performance counter stats for './qtest -v 0 -f perf-traces/300000.cmd' (10 runs):
21,191,601 cache-misses # 53.300 % of all cache refs ( +- 2.69% )
38,848,369 cache-references ( +- 0.91% )
1,322,133,992 instructions # 1.23 insn per cycle ( +- 0.03% )
1,016,682,373 cycles ( +- 1.69% )
0.6824 +- 0.0115 seconds time elapsed ( +- 1.69% )
/* my merge sort */
Performance counter stats for './qtest -v 0 -f perf-traces/400000.cmd' (10 runs):
36,354,050 cache-misses # 56.179 % of all cache refs ( +- 0.36% )
64,409,386 cache-references ( +- 0.20% )
1,747,604,644 instructions # 1.16 insn per cycle ( +- 0.01% )
1,498,409,395 cycles ( +- 0.13% )
0.94878 +- 0.00400 seconds time elapsed ( +- 0.42% )
----------------------------------------------------------------------------------------------------
/* linux list sort */
Performance counter stats for './qtest -v 0 -f perf-traces/400000.cmd' (10 runs):
30,814,049 cache-misses # 57.978 % of all cache refs ( +- 0.11% )
53,256,480 cache-references ( +- 0.07% )
1,771,579,815 instructions # 1.29 insn per cycle ( +- 0.00% )
1,373,081,425 cycles ( +- 0.12% )
0.86360 +- 0.00113 seconds time elapsed ( +- 0.13% )
/* my merge sort */
Performance counter stats for './qtest -v 0 -f perf-traces/500000.cmd' (10 runs):
47,344,165 cache-misses # 56.546 % of all cache refs ( +- 0.18% )
83,221,177 cache-references ( +- 0.18% )
2,190,772,850 instructions # 1.14 insn per cycle ( +- 0.01% )
1,905,805,048 cycles ( +- 0.13% )
1.20848 +- 0.00371 seconds time elapsed ( +- 0.31% )
----------------------------------------------------------------------------------------------------
/* linux list sort */
Performance counter stats for './qtest -v 0 -f perf-traces/500000.cmd' (10 runs):
40,109,266 cache-misses # 58.315 % of all cache refs ( +- 0.27% )
68,806,922 cache-references ( +- 0.18% )
2,219,682,272 instructions # 1.27 insn per cycle ( +- 0.00% )
1,747,506,164 cycles ( +- 0.38% )
1.09820 +- 0.00412 seconds time elapsed ( +- 0.37% )
Because the cache-miss count of the Linux `list_sort` is clearly lower than that of my merge sort, its execution time is also shorter.

Separately, I found that running `new` two or more times and then `free` sends the program into an infinite loop.
Testing with GDB + pwndbg shows the program spinning inside the `is_circular` function:
pwndbg> run
Starting program: /home/will/linux2023/lab0-c/qtest
cmd> new
l = []
cmd> new
l = []
cmd> free
^C
Program received signal SIGINT, Interrupt.
is_circular () at qtest.c:856
856 while (cur != current->q) {
Looking further down, the infinite loop is entered in `is_circular`, called from `q_show` inside `do_free`:
► f 0 0x555555556acc q_show+67
f 1 0x555555556acc q_show+67
f 2 0x5555555572a2 do_free+252
f 3 0x55555555a055 interpret_cmda+128
f 4 0x55555555a625 interpret_cmd+353
f 5 0x55555555b2e3 run_console+174
f 6 0x5555555593e2 main+1601
f 7 0x7ffff7c97083 __libc_start_main+243
Careful inspection finally points at the following code in qtest.c:
if (chain.size > 1) {
    qnext = ((uintptr_t) &current->chain.next == (uintptr_t) &chain.head)
                ? chain.head.next
                : current->chain.next;
}
The `&` in `&current->chain.next` should not be there. It then turned out that sysprog/lab0-c had already fixed this error upstream.
+++ TESTING trace trace-03-ops:
# Test of insert_head, insert_tail, remove_head, reverse and merge
ERROR: Freed queue, but 2 blocks are still allocated
==7825== 112 bytes in 2 blocks are still reachable in loss record 1 of 1
==7825== at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==7825== by 0x10F3F4: test_malloc (harness.c:133)
==7825== by 0x10F8A0: q_new (queue.c:18)
==7825== by 0x10CD91: do_new (qtest.c:151)
==7825== by 0x10E066: interpret_cmda (console.c:181)
==7825== by 0x10E636: interpret_cmd (console.c:201)
==7825== by 0x10EA73: cmd_select (console.c:610)
==7825== by 0x10F37E: run_console (console.c:705)
==7825== by 0x10D3F3: main (qtest.c:1246)
==7825==
--- trace-03-ops 0/6
Running the commands of trace-03-ops one by one, the problem above shows up when merging two queues created with `new`, so I suspected a mistake in the merge path; it turned out that the `second->q = NULL` in `q_merge`'s `while` loop is wrong. The expected behaviour is that when queues a and b are merged into a, moving all of b's nodes into a should leave b initialized (empty), i.e. `second->q` must be kept; setting `second->q = NULL` directly therefore leaks the memory it pointed to.

After revising the code, all memory problems were resolved:
will@will:~/linux2023/lab0-c$ make valgrind
# Explicitly disable sanitizer(s)
make clean SANITIZER=0 qtest
make[1]: Entering directory '/home/will/linux2023/lab0-c'
rm -f qtest.o report.o console.o harness.o queue.o random.o dudect/constant.o dudect/fixture.o dudect/ttest.o shannon_entropy.o linenoise.o web.o .qtest.o.d .report.o.d .console.o.d .harness.o.d .queue.o.d .random.o.d .dudect/constant.o.d .dudect/fixture.o.d .dudect/ttest.o.d .shannon_entropy.o.d .linenoise.o.d .web.o.d *~ qtest /tmp/qtest.*
rm -rf .dudect
rm -rf *.dSYM
(cd traces; rm -f *~)
CC qtest.o
CC report.o
CC console.o
CC harness.o
CC queue.o
CC random.o
CC dudect/constant.o
CC dudect/fixture.o
CC dudect/ttest.o
CC shannon_entropy.o
CC linenoise.o
CC web.o
LD qtest
make[1]: Leaving directory '/home/will/linux2023/lab0-c'
cp qtest /tmp/qtest.5cWFNg
chmod u+x /tmp/qtest.5cWFNg
sed -i "s/alarm/isnan/g" /tmp/qtest.5cWFNg
scripts/driver.py -p /tmp/qtest.5cWFNg --valgrind
--- Trace Points
+++ TESTING trace trace-01-ops:
# Test of insert_head and remove_head
--- trace-01-ops 5/5
+++ TESTING trace trace-02-ops:
# Test of insert_head, insert_tail, remove_head, remove_tail, and delete_mid
--- trace-02-ops 6/6
+++ TESTING trace trace-03-ops:
# Test of insert_head, insert_tail, remove_head, reverse and merge
--- trace-03-ops 6/6
+++ TESTING trace trace-04-ops:
# Test of insert_head, insert_tail, size, swap, and sort
--- trace-04-ops 6/6
+++ TESTING trace trace-05-ops:
# Test of insert_head, insert_tail, remove_head, reverse, size, swap, and sort
--- trace-05-ops 6/6
+++ TESTING trace trace-06-ops:
# Test of insert_head, insert_tail, delete duplicate, sort, descend and reverseK
--- trace-06-ops 6/6
+++ TESTING trace trace-07-string:
# Test of truncated strings
--- trace-07-string 6/6
+++ TESTING trace trace-08-robust:
# Test operations on empty queue
--- trace-08-robust 6/6
+++ TESTING trace trace-09-robust:
# Test remove_head with NULL argument
--- trace-09-robust 6/6
+++ TESTING trace trace-10-robust:
# Test operations on NULL queue
--- trace-10-robust 6/6
+++ TESTING trace trace-11-malloc:
# Test of malloc failure on insert_head
--- trace-11-malloc 6/6
+++ TESTING trace trace-12-malloc:
# Test of malloc failure on insert_tail
--- trace-12-malloc 6/6
+++ TESTING trace trace-13-malloc:
# Test of malloc failure on new
--- trace-13-malloc 6/6
+++ TESTING trace trace-14-perf:
# Test performance of insert_tail, reverse, and sort
--- trace-14-perf 6/6
+++ TESTING trace trace-15-perf:
# Test performance of sort with random and descending orders
# 10000: all correct sorting algorithms are expected pass
# 50000: sorting algorithms with O(n^2) time complexity are expected failed
# 100000: sorting algorithms with O(nlogn) time complexity are expected pass
--- trace-15-perf 6/6
+++ TESTING trace trace-16-perf:
# Test performance of insert_tail
--- trace-16-perf 6/6
+++ TESTING trace trace-17-complexity:
# Test if time complexity of q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant
Probably constant time
Probably constant time
Probably constant time
Probably constant time
--- trace-17-complexity 5/5
--- TOTAL 100/100
Paper link
dudect GitHub
References

The contribution of this paper is dudect, a program the authors developed that uses statistical analysis to determine, on a given platform, whether a piece of code runs in constant time.
The implementation can be broken into the following steps:

- Classes definition: two different classes of input data are defined and the target is measured repeatedly; the most common setup is fix-vs-random tests.
- Cycle counters: modern CPUs provide cycle counters that measure execution time very precisely.
- Environmental conditions: to reduce environmental differences, each measurement corresponds to a randomly chosen input class.
- Cropping: measurements beyond a threshold are cropped away.
- Higher-order preprocessing: depending on the statistical test applied, it may be beneficial to apply some higher-order pre-processing.
- t-test: Welch's t-test, which tests whether two means are equal.
- Non-parametric tests (distribution-free statistics): e.g. Kolmogorov-Smirnov.

It has been pointed out that the dudect reference implementation includes "… the percentile handling, which lab0-c does not have." Comparing the two, lab0-c's fixture.c indeed does not implement it, so extreme measurements are never discarded and the verdict can come out wrong. After adding the percentile part to lab0-c's fixture.c, the execution times of `insert_head`, `insert_tail`, `remove_head`, and `remove_tail` can be analysed correctly.
static int cmp(const int64_t *a, const int64_t *b)
{
return (int) (*a - *b);
}
static int64_t percentile(int64_t *a, double which, size_t size)
{
qsort(a, size, sizeof(int64_t), (int (*)(const void *, const void *)) cmp);
size_t array_position = (size_t)((double) size * (double) which);
assert(array_position >= 0);
assert(array_position < size);
return a[array_position];
}
/*
set different thresholds for cropping measurements.
the exponential tendency is meant to approximately match
the measurements distribution, but there's not more science
than that.
*/
static void prepare_percentiles(int64_t *percentiles, int64_t *exec_times)
{
for (size_t i = 0; i < DUDECT_NUMBER_PERCENTILES; i++) {
percentiles[i] = percentile(
exec_times,
1 - (pow(0.5, 10 * (double) (i + 1) / DUDECT_NUMBER_PERCENTILES)),
N_MEASURES);
}
}
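How these thresholds are then used: in the measurement-update path, besides the uncropped t-test, every execution time below a given percentile is also pushed into an extra t-test context for that cropping level. A sketch based on the dudect reference implementation, not the exact lab0-c code; names such as `exec_times`, `classes`, `percentiles`, and `ttest_ctxs` are assumptions:

```c
/* Sketch (after the dudect reference, not verbatim lab0-c code):
 * feed cropped measurements into one extra t-test per percentile threshold. */
for (size_t i = 0; i < N_MEASURES; i++) {
    int64_t difference = exec_times[i];
    if (difference <= 0)
        continue; /* skip measurement errors / counter wrap-arounds */
    for (size_t crop_index = 0; crop_index < DUDECT_NUMBER_PERCENTILES; crop_index++) {
        if (difference < percentiles[crop_index])
            t_push(ttest_ctxs[crop_index + 1], difference, classes[i]);
    }
}
```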