contributed by < ray90514
>
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 43 bits physical, 48 bits virtual
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 24
Model name: AMD Ryzen 5 PRO 3500U w/ Radeon Vega Mobile Gfx
Stepping: 1
Frequency boost: enabled
CPU MHz: 1675.446
CPU max MHz: 2100.0000
CPU min MHz: 1400.0000
BogoMIPS: 4192.05
Virtualization: AMD-V
L1d cache: 128 KiB
L1i cache: 256 KiB
L2 cache: 2 MiB
L3 cache: 4 MiB
NUMA node0 CPU(s): 0-7
struct list_head *q_new()
{
struct list_head *head = malloc(sizeof(struct list_head));
if (!head)
return NULL;
INIT_LIST_HEAD(head);
return head;
}
檢查 malloc
是否成功以及使用 INIT_LIST_HEAD
初始化。
void q_free(struct list_head *head)
{
if (!head)
return;
struct list_head *node;
struct list_head *safe;
list_for_each_safe (node, safe, head) {
q_release_element(list_entry(node, element_t, list));
}
free(head);
}
因為要釋放節點及其所屬的 entry ,所以使用 list_for_each_safe
而不是 list_for_each
。前者會在走訪節點的過程,保存每個節點的 next
指標,所以可以安全釋放記憶體,而不會影響後續走訪。
使用通順的漢語書寫。
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *element = malloc(sizeof(element_t));
if (!element)
return false;
element->value = strdup(s);
if (!element->value) {
free(element);
return false;
}
list_add(&element->list, head);
return true;
}
使用 list_add
將節點插入佇列的開頭,若是字串的記憶體分配失敗時,要記得釋放節點的記憶體。
與 q_insert_head
相似,將 list_add
改成 list_add_tail
。
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *element = list_first_entry(head, element_t, list);
list_del(&element->list);
if (sp) {
strncpy(sp, element->value, bufsize - 1);
sp[bufsize - 1] = '\0';
}
return element;
}
使用 list_first_entry
取得開頭的entry,利用 list_del
將其自佇列中移除。再判斷是否有要將 value
字串裡的值複製到 sp
。使用 strncpy
可以複製給定數量的前 n 個字元,因此不保證 '\0'
做結尾,需要手動補上。
與 q_insert_head
相似,將 list_first_entry
改成 list_last_entry
,以取得尾端的entry 。
int q_size(struct list_head *head)
{
if (!head)
return 0;
struct list_head *node;
int len = 0;
list_for_each (node, head)
len++;
return len;
}
走訪佇列中所有節點,紀錄過程中的節點數量。
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
struct list_head *forward = head->next;
struct list_head *backward = head;
while (forward != backward) {
backward = backward->prev;
if (forward == backward)
break;
forward = forward->next;
}
list_del(forward);
q_release_element(list_entry(forward, element_t, list));
return true;
}
由兩個指標一個向前走訪,一個向後走訪,當兩個指標相同時所指到的節點即為中間的節點。因為索引是由0開始,所以由 forward
指標先走訪。
bool q_delete_dup(struct list_head *head)
{
if (!head)
return false;
struct list_head *node = head->next->next;
struct list_head *prev = head;
while (node != head) {
element_t *element = list_entry(node, element_t, list);
element_t *prev_element = list_entry(node->prev, element_t, list);
if (strcmp(element->value, prev_element->value) == 0) {
while (node != head &&
strcmp(element->value, prev_element->value) == 0) {
q_release_element(prev_element);
node = node->next;
prev_element = element;
element = list_entry(node, element_t, list);
}
q_release_element(prev_element);
prev->next = node;
node->prev = prev;
if (node != head)
node = node->next;
} else {
node = node->next;
prev = prev->next;
}
}
return true;
}
由 prev
跟 node
這兩個相差一個節點的指標進行走訪,如果 node
與 node->prev
所指向節點的字串相同時,會找出下一個不相同的節點。
然後將相同字串的節點刪除,把 prev
所指到的節點與 node
所指到的節點相連, node
設成 node->next
,讓兩指標維持相差一個節點的狀態。
使用 Graphviz 重新繪製上圖。
使用 list_for_each_entry_safe
簡化程式碼,加入布林變數 is_dup
用來判斷指標所指的節點是否為重複節點的最後一個 。
bool q_delete_dup(struct list_head *head)
{
if (!head)
return false;
bool is_dup = false;
element_t *entry;
element_t *safe;
struct list_head *prev = head;
list_for_each_entry_safe(entry, safe, head, list) {
if(&safe->list != head && strcmp(entry->value, safe->value) == 0) {
q_release_element(entry);
is_dup = true;
}
else if (is_dup) {
is_dup = false;
q_release_element(entry);
prev->next = &safe->list;
safe->list.prev = prev;
}
else {
prev = prev->next;
}
}
return true;
}
void q_swap(struct list_head *head)
{
if (!head || list_empty(head))
return;
for (struct list_head *node = head->next;
node != head && node->next != head; node = node->next) {
struct list_head *next = node->next;
node->prev->next = next;
next->next->prev = node;
node->next = next->next;
next->next = node;
next->prev = node->prev;
node->prev = next;
}
}
使用 node
指標走訪整個佇列,交換 node
與 node->next
的位置。
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
struct list_head *node = head;
struct list_head *next = node->next;
do {
node->next = node->prev;
node->prev = next;
node = next;
next = node->next;
} while (node != head);
}
走訪整個佇列,交換每個節點的 next
和 prev
。
#define SORT_BUFSIZE 32
void q_sort(struct list_head *head)
{
if (!head || head->next == head->prev)
return;
struct list_head *pending[SORT_BUFSIZE] = {};
struct list_head *result = head->next;
struct list_head *next;
int i;
head->prev->next = NULL;
while (result) {
next = result->next;
result->next = NULL;
for (i = 0; i < SORT_BUFSIZE && pending[i]; i++) {
result = merge(pending[i], result);
pending[i] = NULL;
}
if (i == SORT_BUFSIZE)
i--;
pending[i] = result;
result = next;
}
/*merge final*/
result = NULL;
for (i = 0; i < SORT_BUFSIZE - 1; i++) {
result = merge(pending[i], result);
}
merge_final(head, result, pending[SORT_BUFSIZE - 1]);
}
改寫自 Bottom-up implementation using lists 。演算法的運作大致如下:
pending
這個陣列。串列長度為
struct list_head *merge(struct list_head *a, struct list_head *b)
{
struct list_head head = {.next = NULL};
struct list_head *tail = &head;
while (a && b) {
char *sa = list_entry(a, element_t, list)->value;
char *sb = list_entry(b, element_t, list)->value;
/* if equal, take 'a' -- important for sort stability */
struct list_head **smaller = strcmp(sa, sb) <= 0 ? &a : &b;
tail->next = *smaller;
tail = tail->next;
*smaller = (*smaller)->next;
}
tail->next = (struct list_head *) ((uintptr_t) a | (uintptr_t) b);
return head.next;
}
兩個串列的開頭比較,將較小的節點放入排序好的串列。
tail->next = *smaller;
這裡利用到了指標的指標 smaller
可以直接將較小串列的開頭移除。
*smaller = (*smaller)->next;
至於 merge_final
是用來做最後一次的合併,除了原本的合併操作外還有讓單向串列變回原本雙向串列。
command 是「命令」,instruction 是「指令」,二者語意不同。
使用以下程式將指令命令 shuffle
加入至 qtest
ADD_COMMAND(shuffle, " | Shuffle every nodes in queue");
執行命令 shuffle
會呼叫到 do_shuffle
。裡面包含初始化和檢查輸入的參數,主要的洗牌程式是在 q_shuffle
中。
void q_shuffle(struct list_head *head)
{
if (!head)
return;
for (int i = q_size(head); i > 0; i--) {
struct list_head *node = head->next;
for(int j = rand() % i; j > 0; j--) {
node = node->next;
}
list_move_tail(node, head);
}
}
這個方始是從原始的Fisher–Yates shuffle改良而來,因為這裡的佇列頭尾相連,所以可以放入直接放回佇列而不用另外放,而且因為是固定放入尾端,所以不用擔心會挑選到已經選過得節點。
當輸入命令 web_cmd [port]
後會初始化 socket ,以及關閉 linenoise
。
static bool do_web_cmd(int argc, char *argv[])
{
int port = DEFAULT_PORT;
if (listenfd > 0)
return true;
if (argc == 2) {
get_int(argv[1], &port);
}
listenfd = open_listenfd(port);
if (listenfd > 0) {
report(1, "listen on port %d, fd is %d", port, listenfd);
} else {
report(1, "Fail to run web server");
return false;
}
int flags = fcntl(listenfd, F_GETFL);
fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);
noise = false;
return true;
}
原本使用 linenoise
讀取輸入會改用 cmd_select
讀取輸入。
bool run_console(char *infile_name)
{
if (!push_file(infile_name)) {
report(1, "ERROR: Could not open source file '%s'", infile_name);
return false;
}
if (!has_infile) {
char *cmdline;
while (noise && (cmdline = linenoise(prompt)) != NULL) {
interpret_cmd(cmdline);
linenoiseHistoryAdd(cmdline); /* Add to the history. */
linenoiseHistorySave(HISTORY_FILE); /* Save the history on disk. */
linenoiseFree(cmdline);
}
if (!noise) {
while (!cmd_done()) {
cmd_select(0, NULL, NULL, NULL, NULL);
}
}
} else {
while (!cmd_done())
cmd_select(0, NULL, NULL, NULL, NULL);
}
return err_cnt == 0;
}
接下來是修改 cmd_select
,先將 server socket 的 file descriptor 加入 readfds
, readfds
的類型是 fd_set
,作為 file descriptor 的集合。
if (listenfd != -1)
FD_SET(listenfd, readfds);
將 select
的第一個參數 nfds
設成要監測的 file descriptor 中的最大值加一。
if (listenfd >= nfds)
nfds = listenfd + 1;
呼叫 select
後,會將 readfds
中未準備好讀取的 file decriptor 移除, FD_ISSET
可以確認給定的 file descriptor 是否還在 readfds
。 process
會回傳處理好的 http 請求。
int result = select(nfds, readfds, writefds, exceptfds, timeout);
if (result <= 0)
return result;
infd = buf_stack->fd;
if (readfds && FD_ISSET(infd, readfds)) {
/* Commandline input available */
FD_CLR(infd, readfds);
result--;
char *cmdline;
cmdline = readline();
if (cmdline)
interpret_cmd(cmdline);
} else if (readfds && FD_ISSET(listenfd, readfds)) {
FD_CLR(listenfd, readfds);
result--;
int connfd;
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
connfd = accept(listenfd, (SA *) &clientaddr, &clientlen);
char *p = process(connfd, &clientaddr);
if (p)
interpret_cmd(p);
free(p);
close(connfd);
}
infd
是原本輸入的 file descriptor ,它的值 buf_stack->fd
在 push_file
被初始化,判斷有沒有檔案作為輸入,如果沒有就是標準輸入。
接續前述之 q_sort
,Linux Kernel v5.2 以前的 list_sort
採用了類似的實作。這個演算法的缺點是,當佇列長度遠大於2的陣列長度次方時,會變得像是插入排序,複雜度為
list_sort
(Linux Kernel v5.1.21)void list_sort(void *priv, struct list_head *head,
int (*cmp)(void *priv, struct list_head *a,
struct list_head *b))
{
struct list_head *part[MAX_LIST_LENGTH_BITS+1]; /* sorted partial lists
-- last slot is a sentinel */
int lev; /* index into part[] */
int max_lev = 0;
struct list_head *list;
if (list_empty(head))
return;
memset(part, 0, sizeof(part));
head->prev->next = NULL;
list = head->next;
while (list) {
struct list_head *cur = list;
list = list->next;
cur->next = NULL;
for (lev = 0; part[lev]; lev++) {
cur = merge(priv, cmp, part[lev], cur);
part[lev] = NULL;
}
if (lev > max_lev) {
if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
printk_once(KERN_DEBUG "list too long for efficiency\n");
lev--;
}
max_lev = lev;
}
part[lev] = cur;
}
for (lev = 0; lev < max_lev; lev++)
if (part[lev])
list = merge(priv, cmp, part[lev], list);
merge_and_restore_back_links(priv, cmp, head, part[max_lev], list);
}
這裡給出了長度過長的訊息。
if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
printk_once(KERN_DEBUG "list too long for efficiency\n");
lev--;
}
list_sort
:如果要儲存任意長度的資料,自然是想到用 Linked list ,新版用 struct list_head *pending
取代原本的 struct list_head *part[MAX_LIST_LENGTH_BITS+1]
,以及加入了 size_t count
用來判斷合併的順序。
與其再用 struct list_head
建立串列的串列,這裡使用比較巧妙的作法,因為合併的過程中只用到 next
指標,所以將 prev
指標當作連接串列的指標,如圖所示。
count的是用來做延遲合併的計算,與原本的演算法不同,當 pending
中,只有在有兩個
測試用的 list_sort
, merge
和 merge_finale
改成與 q_sort
相同的版本,以及移除 likely()
其餘不變。測試內容為排序 N 個隨機長度為 1 的字串,執行以下程式碼,使用 perf stat --repeat 10 ./test
取得 task-clock
。
#define N 960000
int main(int argc, char *argv[]) {
struct list_head *h1 = q_new();
char s[2] = {0};
for(int i = 0; i < N; i++) {
s[0] = rand();
q_insert_head(h1, s);
}
if(argc == 2 && argv[1][0] == 'l')
q_linux_sort(h1);
else
q_sort(h1);
}
下圖為實驗的結果,佇列長度從 750 開始每次加倍,直到 96000 ,我的 L1d cache 大小是 128 KiB ,塞滿需要3278的節點。從結果可以看出隨著佇列長度增加, list_sort
快 q_sort
約 10% ,延遲合併是更有效率的。而在長度不長時,q_sort
使用陣列儲存待合併的串列,相比list_sort
的串列有更好的表現。
接下來是對 q_sort
長度限制的實驗。 固定對 pending
陣列的大小,從 8 到 16。以下是結果,可以看出當陣列長度不足時,效率明顯下降。
console.c
將原本RIO套件的 rio_t
加入了 rio_ptr prev
,將檔案以堆疊的方法儲存,
使用 push_file()
跟 pop_file()
操作,以及一個指向頂部的指標 buf_stack
。
struct RIO_ELE {
int fd; /* File descriptor */
int cnt; /* Unread bytes in internal buffer */
char *bufptr; /* Next unread byte in internal buffer */
char buf[RIO_BUFSIZE]; /* Internal buffer */
rio_ptr prev; /* Next element in stack */
};
RIO套件主要用在 readline()
,用來從堆疊頂部的檔案讀取輸入。與一般的 read
相比, readline()
是 Buffered I/O,也就是會預先將檔案讀取至記憶體,而使用的時候改從記憶體讀取,這樣可以減少 system call 的次數。
在做trace-17-complexity
的測試的時候發現, remove_tail
常常有不過的情形,而 remove_head
都能通過,明明兩個函式只有一處不相同。接下來找到 constant.c
,裡面有測試 remove_tail
的程式碼。
case test_remove_tail:
for (size_t i = drop_size; i < n_measure - drop_size; i++) {
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * chunk_size) % 10000);
before_ticks[i] = cpucycles();
element_t *e = q_remove_tail(l, NULL, 0);
after_ticks[i] = cpucycles();
if (e)
q_release_element(e);
dut_free();
}
break;
乍看之下跟 test_remove_head
的部份除了呼叫 q_remove_tail
完全一樣,但突然想到會不會是 cache miss 的緣故,因為 remove_tail
的測試使用 dut_insert_head
將節點插入至頭部,但移除的節點是在尾端,計算了一下我的 L1d Cache 能容納大概 128 * 1024 / (24 + 8 * 8) = 1489 個節點,而插入大於 1489 的機率很大。
為了驗證我的假設,我將 dut_insert_head
改成 dut_insert_tail
,此時發生了另外一個問題,程式看起來像是卡住一樣非常非常慢。我使用 perf record
看一下發生什麼事。以下是結果。
18.01% qtest qtest [.] test_malloc
12.08% qtest [kernel.kallsyms] [k] _extract_crng
11.67% qtest libc-2.31.so [.] _int_malloc
9.79% qtest libc-2.31.so [.] _int_free
9.36% qtest qtest [.] test_strdup
7.86% qtest qtest [.] test_free
92.56% qtest qtest [.] test_free
兩者的 test_free
之所以差這麼多,是因為我的 q_free
釋放的順序是從頭到尾,與插入的順序相反,加上節點空間大於 cache size 導致大量 cache miss ,從而導致所花的時間變長。
我將 q_free
改成反向的版本後,確實測試通過的次數與 remove_tail
差不多。也就證實了是插入的順序導致了 cache miss。
這裡的缺陷有兩個,第一個是 dut_free
跟 dut_insert_tail
造成的緩慢,目前想到的解決辦法是使用建立與釋放順序相同的函式,或是維持 dut_insert_head
然後使用第二個問題的解決辦法。
第二個問題是 cache miss 所造成的影響,實際上在原本的論文〈Dude, is my code constant time?〉有提到。
To minimize the effect of environmental changes in the results
這裡就是因為處理數據的程式而不是原本待測程式造成結果的改變。對於 test_remove_tail
有比較針對的解決辦法,像是固定將最後一個節點從尾端插入。 想到比較通用的解法是刷新整個 cache 讓測試的時候固定發生 cache miss。