執行人: andy155224
重做 lab0 並彙整其他學員的成果。
$ gcc --version
gcc (Ubuntu 11.3.0-1ubuntu1~22.04.1) 11.3.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 46 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 24
On-line CPU(s) list: 0-23
Vendor ID: GenuineIntel
Model name: 13th Gen Intel(R) Core(TM) i7-13700
CPU family: 6
Model: 183
Thread(s) per core: 2
Core(s) per socket: 16
Socket(s): 1
Stepping: 1
CPU max MHz: 5200.0000
CPU min MHz: 800.0000
BogoMIPS: 4224.00
宣告一個結構體 list_head
的指標 head
,然後透過 list.h 中的 INIT_LIST_HEAD()
來對 head
配置記憶體。
struct list_head *q_new()
{
struct list_head *head = malloc(sizeof(struct list_head));
if (head) {
INIT_LIST_HEAD(head);
return head;
}
return NULL;
}
透過 list_for_each_entry_safe
逐一走訪 queue 中的每一個 element,並透過 container_of
計算每一個 element 的記憶體起始位置,計算出位置後再用 q_release_element
將整個結構體 element
所配置的記憶體空間,包含結構體內的其他結構體和指標變數所指向的記憶體一同釋放,最後再將 queue 的 head l
釋放掉,以保證整個 queue 的記憶體空間都被釋放。
void q_free(struct list_head *l)
{
if (!l)
return;
element_t *pos, *next;
list_for_each_entry_safe (pos, next, l, list) {
q_release_element(container_of(&pos->list, element_t, list));
}
free(l);
}
原本的寫法如下:
bool q_insert_head(struct list_head *head, char *s)
{
if (!head || !s)
return false;
element_t *n = malloc(1 * sizeof(element_t));
char *value = malloc((strlen(s) + 1) * sizeof(char));
if (!n || !value)
return false;
memcpy(value, s, strlen(s) + 1);
n->value = value;
list_add(&n->list, head);
return true;
}
但 qwe661234
點出上述程式碼在 element_t *n
成功配置,但 char *value
配置失敗時,會導致配置成功的 element_t *n
之記憶體沒有被釋放。所以修改為:
bool q_insert_head(struct list_head *head, char *s)
{
if (!head || !s)
return false;
element_t *n = malloc(1 * sizeof(element_t));
+ if (!n)
+ return false;
char *value = malloc((strlen(s) + 1) * sizeof(char));
- if (!n || !value)
+ if (!value) {
+ free(n);
return false;
+ }
memcpy(value, s, strlen(s) + 1);
n->value = value;
list_add(&n->list, head);
}
函式運作的邏輯和 q_insert_head
只差在將 list_add()
改為 list_add_tail()
。
strncpy
, snprintf
和 strlcpy
的差異。因為 strcpy
會存在潛在的 buffer overflow 的問題,所以應該用 strncpy
來限制寫入的位元組的大小。但是 strncpy
也並非是安全的,因為有可能會發生 dest
字串的結尾並沒有 null terminator 的問題,所以有了更安全的 strlcpy
能選擇。strlcpy
如果遇到寫入的位元組大小比 src
的位元組大小還要小的時候,會在欲寫入的位元組大小的最後一個位元填入 null terminator 。但是因為這需要安裝額外的 package 才能 include bsd/string.h
,所以我選擇使用 snprintf
來達到一樣的效果。
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *tmp = list_first_entry(head, element_t, list);
list_del_init(&tmp->list);
if (sp)
snprintf(sp, bufsize, "%s", tmp->value);
return tmp;
}
函式運作的邏輯和 q_insert_head
只差在將 list_first_entry()
改為 list_last_entry()
。
透過 list_for_each()
逐一走訪 queue 中的每個 element 並用 int len
來計數。
int q_size(struct list_head *head)
{
if (!head)
return 0;
int len = 0;
struct list_head *li;
list_for_each (li, head)
len++;
return len;
}
透過一個型別為 list_head
的指標變數 tmp
和一個布林變數 flag
,在逐一走訪 queue
中的每一個 entry 時,每當 flag
為 true 時將 tmp
指向其下一個 entry 的 list。這樣當走訪完整個 queue
後 tmp
就會指向 queue
的第
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
bool flag = true;
element_t *pos;
struct list_head *tmp = head;
list_for_each_entry (pos, head, list) {
if (flag)
tmp = tmp->next;
flag = !flag;
}
element_t *mid = list_entry(tmp, element_t, list);
list_del_init(&mid->list);
q_release_element(mid);
return true;
}
但因為 flag
會反覆在 0 和 1 之間變化,造成以下程式碼在
if (flag)
tmp = tmp->next;
不做任何處理的情況下會產生
要減少 branch 次數的其中一種做法是透過快慢指標來找尋中間節點,但因為 lab0 所使用的資料結構為 doubly linked-list,所以可以考慮到雙向的特性,用更少次的操作找到中間節點,如下:
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
struct list_head *right, *left;
right = head->next;
left = head->prev;
while (right != left && left->prev != right) {
right = right->next;
left = left->prev;
}
list_del(left);
q_release_element(list_entry(left, element_t, list));
return true;
}
透過兩個指標 right
和 left
,每次迭代時 right
會指向 right->next
, left
會指向 left->prev
,這樣一旦 right
== left
或是 left->prev
== right
,此時的 left
即為中間節點。藉由這種做法就能在比使用快慢指標還少的操作下同時也有較少的 branch 產生。
透過 list_for_each_entry_safe
來逐一走訪整個 queue,每次迭代時會比較 curr->value
和 next->value
是否相同,如果相同的話將 curr
刪除並釋放記憶體空間,並把布林變數 flag
設為 true 以在 curr-> value
和 next->value
不同時將 curr
刪除並釋放記憶體空間。
bool q_delete_dup(struct list_head *head)
{
if (!head || list_empty(head))
return false;
bool flag = false;
element_t *curr, *next;
list_for_each_entry_safe (curr, next, head, list) {
if (&next->list != head && strcmp(curr->value, next->value) == 0) {
flag = true;
list_del(&curr->list);
q_release_element(curr);
} else if (flag) {
flag = false;
list_del(&curr->list);
q_release_element(curr);
}
}
return true;
}
透過 list_for_each()
逐一走訪整個 queue,每當布林變數 flag
為 true 時用 list_move()
將 node
移至 tmp
前,最後再將 node
和 tmp
指向 node->next
以確保走訪的正確性。
void q_swap(struct list_head *head)
{
if (!head || list_empty(head))
return;
bool flag = false;
struct list_head *node, *tmp = head;
list_for_each (node, head) {
if (flag == true) {
list_move(node, tmp);
node = node->next;
tmp = node;
}
flag = !flag;
}
}
透過 list_for_each_safe()
逐一走訪整個 queue 並將走訪到的元素用 list_move()
移至 head
後面,進而完成 reverse 整個 queue。
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
struct list_head *node, *safe;
list_for_each_safe (node, safe, head) {
list_move(node, head);
}
}
原本的想法是透過 list_for_each_safe()
逐一走訪整個 queue,同時透過 int countK
來計數,走訪時會將元素 node
移至 tmp
後面,每當 countK mod k == 0
時調整 tmp
的位置來完成功能。
void q_reverseK(struct list_head *head, int k)
{
if (!head || list_empty(head))
return;
int countK = 0;
struct list_head *node, *safe, *tmp = head;
list_for_each_safe (node, safe, head) {
countK++;
list_move(node, tmp);
if (countK % k == 0) {
tmp = safe->prev;
}
}
}
但是實際的效果和預期的不同,因為這和功能所述的 Reverse the nodes of the list k at a time
,不一致,上述程式碼是每次都會執行 list_move
而非當 countK mod k == 0
時才將前面 k
個元素 reverse,差異如下:
l = [1 2 3 4 5]
cmd> reverseK 3
l = [3 2 1 5 4] <-- 執行結果
l = [3 2 1 4 5] <-- 預期結果
而我想到的解決方式是額外宣告一個變數 int count
並將其設為 q_size / k
,也就是總共會發生的 reverse 次數。這樣就能透過 count
來避免額外的 reverse 發生,只有在 count > 0
時才會執行 list_move()
。
void q_reverseK(struct list_head *head, int k)
{
if (!head || list_empty(head))
return;
int countK = 0;
+ int count = q_size(head) / k;
struct list_head *node, *safe, *tmp = head;
list_for_each_safe (node, safe, head) {
countK++;
+ if (count)
list_move(node, tmp);
if (countK % k == 0) {
tmp = safe->prev;
+ count--;
}
}
}
參考了 你所不知道的 C 語言: linked list 和非連續記憶體 ,思路是透過 Divide and Conquer 先遞迴將佇列拆分成 q_size
個子佇列,再兩兩排序合併,最後就會得到一個已排序的佇列。
void mergeTwoLists(struct list_head *L1, struct list_head *L2)
{
if (!L1 || !L2)
return;
struct list_head *Lnode = L1->next, *Rnode = L2->next;
while (Lnode != L1 && Rnode != L2) {
if (strcmp(list_entry(Lnode, element_t, list)->value,
list_entry(Rnode, element_t, list)->value) > 0) {
struct list_head *tmp = Rnode->next;
list_move_tail(Rnode, Lnode);
Rnode = tmp;
} else {
Lnode = Lnode->next;
}
}
if (q_size(L2) != 0) {
list_splice_tail(L2, L1);
}
}
/* Sort elements of queue in ascending order */
void q_sort(struct list_head *head)
{
if (!head || list_empty(head) || q_size(head) == 1)
return;
struct list_head *tmp, *mid;
tmp = head->next;
mid = head->prev;
while (tmp != mid && mid->prev != tmp) {
tmp = tmp->next;
mid = mid->prev;
}
LIST_HEAD(right);
list_cut_position(&right, head, mid->prev);
q_sort(head);
q_sort(&right);
mergeTwoLists(head, &right);
}
因為 lab0 所使用的資料結構為 doubly linked-list,所以只需要從 queue 的尾部開始逐一反向走訪,並且不斷去比較 struct list_head *curr
對應的 element 的 value 是否大於 struct list_head *curr->prev
對應的 element 的 value,如果大於的話則將 struct list_head *curr->prev
對應的 element 刪除即可。
int q_descend(struct list_head *head)
{
if (!head || list_empty(head))
return 0;
struct list_head *curr = head->prev;
while (curr->prev != head) {
if (strcmp(list_entry(curr, element_t, list)->value,
list_entry(curr->prev, element_t, list)->value) > 0) {
struct list_head *tmp = curr->prev;
list_del(tmp);
q_release_element(list_entry(tmp, element_t, list));
} else {
curr = curr->prev;
}
}
return q_size(head);
}
先透過 container_of
找出連接著各個 queue 的結構體 queue_contex_t *queue_head
的記憶體位置,然後逐一走訪相連著的 queue 並透過 mergeTwoLists()
將第二個至最後一個 queue 合併進第一個 queue 中。因為每一個 queue 預設都已排序,所以 mergeTwoLists()
完就會是一個已排序的結果。
之所以不先將所有 queue 都透過 list_splice_init
合併後再用 q_sort()
合併是因為每一個 queue 都已排序,若再執行 q_sort()
中 divide 顯而易見是多餘的。
在 list.h
中的 list_splice_init()
有提到 "The @list head will not end up in an uninitialized state like when using list_splice. Instead the @list is initialized again to the an empty list/unlinked state.",所以在迴圈中每當合併完兩個 queue 都需要 INIT_LIST_HEAD(queue->q)
。
int q_merge(struct list_head *head)
{
if (!head)
return 0;
queue_contex_t *queue_head =
container_of(head->next, queue_contex_t, chain);
for (struct list_head *curr = head->next->next; curr != head;
curr = curr->next) {
queue_contex_t *queue = container_of(curr, queue_contex_t, chain);
mergeTwoLists(queue_head->q, queue->q);
INIT_LIST_HEAD(queue->q);
queue->size = 0;
}
return q_size(queue_head->q);
}
lab0-c
將 list_sort.c 和 list_sort.h 放到 lab0
的專案下並修改 Makefile 使得在 make 時會編譯 list_sort.c
OBJS := qtest.o report.o console.o harness.o queue.o \
random.o dudect/constant.o dudect/fixture.o dudect/ttest.o \
shannon_entropy.o \
linenoise.o web.o \
+ list_sort.o
在 list_sort.c
中有使用到 likely()
和 unlikely
,這是在 linux/compiler.h 裡的巨集,顧將這兩個巨集的定義搬移至 lab0
專案中的 list_sort.c
中。
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
修改 list_sort.c
和 list_sort.h
,調整函式的參數並修改一些型別定義使得 list_sort.c
能在 lab0
中運作。
void *priv
移除nonnull()
list_sort.c
中定義的型別 u8
改成 uint8_t
merge()
中針對 struct list_head *head
去抑制 cppcheck 的檢查,因為 head
不會是一個 unassigned variable將 list_sort.c
中的 cmp()
都改成使用 strcmp()
來比較字串大小,並抑制 cppcheck 的 nullPointer 檢查。
// cppcheck-suppress nullPointer
if (strcmp(list_entry(a, element_t, list)->value,
// cppcheck-suppress nullPointer
list_entry(b, element_t, list)->value) <= 0)
console.c
中新增 parameter algo
藉由全域變數 algo
來選擇排序演算法要使用 q_sort()
還是 list_sort()
int algo = 0;
透過 add_param()
新增有關 algo
的說明,並且這樣就能使用 option algo
來變更 algo
的值,以此決定要使用哪一個排序演算法。
add_param("algo", &algo, "Select sort algorithm, q_sort = 0, list_sort = 1", NULL);
do_sort
讓 do_sort() 可以根據 algo
的數值來決定要使用哪一個排序演算法。
- if (current && exception_setup(true))
+ if (current && exception_setup(true)) {
+ if (algo == 0)
q_sort(current->q);
+ else
+ list_sort(current->q);
+ }
q_sort()
和 linux kernel 中的 list_sort()
之間的效能落差新增 trace-sort_algorithm.cmd
來測試自己實作的 q_sort()
和 list_sort()
各別在排序 10000、500000 和 1000000 個節點時排序所需要的時間。在這個實驗中會執行十次 trace-sort_algorithm.cmd
,並統計在不同節點數量的情況下的平均排序時間各別為何。
option verbose 1
option algo 0
new
ih RAND 100000
time sort
free
option algo 1
new
ih RAND 100000
time sort
free
option algo 0
new
ih RAND 500000
time sort
free
option algo 1
new
ih RAND 500000
time sort
free
option algo 0
new
ih RAND 1000000
time sort
free
option algo 1
new
ih RAND 1000000
time sort
free
節點數量 | q_sort 平均排序時間 | list_sort 平均排序時間 |
---|---|---|
100000 | 0.0226(s) | 0.0213(s) |
5000000 | 0.236(s) | 0.2548(s) |
1000000 | 0.7739(s) | 0.6448(s) |
從十次測試的平均中可以看到,在節點數為 100000 時,list_sort()
優於 q_sort()
6%,在節點數為 500000 時,q_sort()
優於 list_sort()
7.4%,而在節點數為 1000000 時,list_sort()
優於 q_sort()
17%。
需要解讀效能落差的成因,以及你能否從中學習到其精髓。
lib/list_sort.c
並量化分析解讀 Linux 核心原始程式碼的 lib/list_sort.c
,設計實驗來量化分析,探討其實作手法,需要善用 perf 一類的工具。解析程式碼要有完整的圖文、數學證明,和如何達到 cache 友善。
首先先來分析自己實作的 q_sort()
和 linux kernel 中的 list_sort()
到底有何不同。
q_sort
採取的是 merge sort,也就是先不斷遞迴分割,直到每一個子佇列都只有一個節點後後再兩兩合併成一個排序過的佇列,而會對 cache 不友善的主因也是發生在 merge
這個階段。因為此做法採取的是將同樣 level 的子佇列都合併完之後才會繼續遞迴往上合併,直到成為一個和原本佇列大小為止。但是這種做法會導致一但要排序的子佇列的大小超過了 L1 cache 的大小,cache 中的子佇列會不斷的在不同層級的 cache 中搬移,形成 cache thrashing,對 cache 不友善。
而 list_sort
之所以能達到 cache 友善是因為其合併方式採取了一個特殊的作法:
每次合併時保持合併與不合併的子佇列比例不會大於 2:1,這樣能確保合併的子佇列的大小可以放到 L1 cache 中,進而避免 cache thrashing。而在註解中也有提到藉由 2:1 的比例,可以省下 0.2*n 次的比較操作。
那實作中是如何確保 2:1 的比例呢?
實際上這是一段非常精美的程式碼,只透過了一個變數 count
來紀錄目前讀到的節點數,每當 count
為 2 的冪時不合併,反之則合併,以下表格為
使用 Linux 核心的 Random Number Algorithm Definitions 來產生隨機數,作為上述排序程式 (包含 Linux 核心的實作) 的輸入,以檢驗鏈結串列的排序實作。