---
tags: linux2022
---
# 2022q1 Homework1 (lab0)
contributed by < `ray90514` >
## 實驗環境
```shell
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 43 bits physical, 48 bits virtual
CPU(s): 8
On-line CPU(s) list: 0-7
Thread(s) per core: 2
Core(s) per socket: 4
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 24
Model name: AMD Ryzen 5 PRO 3500U w/ Radeon Vega Mobile Gfx
Stepping: 1
Frequency boost: enabled
CPU MHz: 1675.446
CPU max MHz: 2100.0000
CPU min MHz: 1400.0000
BogoMIPS: 4192.05
Virtualization: AMD-V
L1d cache: 128 KiB
L1i cache: 256 KiB
L2 cache: 2 MiB
L3 cache: 4 MiB
NUMA node0 CPU(s): 0-7
```
## 開發過程
### q_new
```c
struct list_head *q_new()
{
struct list_head *head = malloc(sizeof(struct list_head));
if (!head)
return NULL;
INIT_LIST_HEAD(head);
return head;
}
```
檢查 `malloc` 是否成功以及使用 `INIT_LIST_HEAD` 初始化。
### q_free
```c
void q_free(struct list_head *head)
{
if (!head)
return;
struct list_head *node;
struct list_head *safe;
list_for_each_safe (node, safe, head) {
q_release_element(list_entry(node, element_t, list));
}
free(head);
}
```
因為要釋放節點及其所屬的 entry ,所以使用 `list_for_each_safe` 而不是 `list_for_each` 。前者會在走訪節點的過程,保存每個節點的 `next` 指標,所以可以安全釋放記憶體,而不會影響後續走訪。
:::warning
使用通順的漢語書寫。
:notes: jserv
:::
### q_insert_head
```c
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *element = malloc(sizeof(element_t));
if (!element)
return false;
element->value = strdup(s);
if (!element->value) {
free(element);
return false;
}
list_add(&element->list, head);
return true;
}
```
使用 `list_add` 將節點插入佇列的開頭,若是字串的記憶體分配失敗時,要記得釋放節點的記憶體。
### q_insert_tail
與 `q_insert_head` 相似,將 `list_add` 改成 `list_add_tail` 。
### q_remove_head
```c
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *element = list_first_entry(head, element_t, list);
list_del(&element->list);
if (sp) {
strncpy(sp, element->value, bufsize - 1);
sp[bufsize - 1] = '\0';
}
return element;
}
```
使用 `list_first_entry` 取得開頭的entry,利用 `list_del` 將其自佇列中移除。再判斷是否有要將 `value` 字串裡的值複製到 `sp` 。使用 `strncpy` 可以複製給定數量的前 n 個字元,因此不保證 `'\0'` 做結尾,需要手動補上。
### q_remove_tail
與 `q_insert_head` 相似,將 `list_first_entry` 改成 `list_last_entry` ,以取得尾端的entry 。
### q_size
```c
int q_size(struct list_head *head)
{
if (!head)
return 0;
struct list_head *node;
int len = 0;
list_for_each (node, head)
len++;
return len;
}
```
走訪佇列中所有節點,紀錄過程中的節點數量。
### q_delete_mid
```c
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
struct list_head *forward = head->next;
struct list_head *backward = head;
while (forward != backward) {
backward = backward->prev;
if (forward == backward)
break;
forward = forward->next;
}
list_del(forward);
q_release_element(list_entry(forward, element_t, list));
return true;
}
```
由兩個指標一個向前走訪,一個向後走訪,當兩個指標相同時所指到的節點即為中間的節點。因為索引是由0開始,所以由 `forward` 指標先走訪。
### q_delete_dup
```c
bool q_delete_dup(struct list_head *head)
{
if (!head)
return false;
struct list_head *node = head->next->next;
struct list_head *prev = head;
while (node != head) {
element_t *element = list_entry(node, element_t, list);
element_t *prev_element = list_entry(node->prev, element_t, list);
if (strcmp(element->value, prev_element->value) == 0) {
while (node != head &&
strcmp(element->value, prev_element->value) == 0) {
q_release_element(prev_element);
node = node->next;
prev_element = element;
element = list_entry(node, element_t, list);
}
q_release_element(prev_element);
prev->next = node;
node->prev = prev;
if (node != head)
node = node->next;
} else {
node = node->next;
prev = prev->next;
}
}
return true;
}
```
由 `prev` 跟 `node` 這兩個相差一個節點的指標進行走訪,如果 `node` 與 `node->prev` 所指向節點的字串相同時,會找出下一個不相同的節點。
![](https://i.imgur.com/VQQWrj6.png)
然後將相同字串的節點刪除,把 `prev` 所指到的節點與 `node` 所指到的節點相連, `node` 設成 `node->next` ,讓兩指標維持相差一個節點的狀態。
![](https://i.imgur.com/bFKB4Xq.png)
:::danger
使用 Graphviz 重新繪製上圖。
:notes: jserv
:::
#### 改進
使用 `list_for_each_entry_safe` 簡化程式碼,加入布林變數 `is_dup` 用來判斷指標所指的節點是否為重複節點的最後一個 。
```c
bool q_delete_dup(struct list_head *head)
{
if (!head)
return false;
bool is_dup = false;
element_t *entry;
element_t *safe;
struct list_head *prev = head;
list_for_each_entry_safe(entry, safe, head, list) {
if(&safe->list != head && strcmp(entry->value, safe->value) == 0) {
q_release_element(entry);
is_dup = true;
}
else if (is_dup) {
is_dup = false;
q_release_element(entry);
prev->next = &safe->list;
safe->list.prev = prev;
}
else {
prev = prev->next;
}
}
return true;
}
```
### q_swap
```c
void q_swap(struct list_head *head)
{
if (!head || list_empty(head))
return;
for (struct list_head *node = head->next;
node != head && node->next != head; node = node->next) {
struct list_head *next = node->next;
node->prev->next = next;
next->next->prev = node;
node->next = next->next;
next->next = node;
next->prev = node->prev;
node->prev = next;
}
}
```
使用 `node` 指標走訪整個佇列,交換 `node` 與 `node->next` 的位置。
### q_reverse
```c
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
struct list_head *node = head;
struct list_head *next = node->next;
do {
node->next = node->prev;
node->prev = next;
node = next;
next = node->next;
} while (node != head);
}
```
走訪整個佇列,交換每個節點的 `next` 和 `prev` 。
![](https://i.imgur.com/kyKFDYb.png)
![](https://i.imgur.com/oyZdiYJ.png)
### q_sort
```c
#define SORT_BUFSIZE 32
void q_sort(struct list_head *head)
{
if (!head || head->next == head->prev)
return;
struct list_head *pending[SORT_BUFSIZE] = {};
struct list_head *result = head->next;
struct list_head *next;
int i;
head->prev->next = NULL;
while (result) {
next = result->next;
result->next = NULL;
for (i = 0; i < SORT_BUFSIZE && pending[i]; i++) {
result = merge(pending[i], result);
pending[i] = NULL;
}
if (i == SORT_BUFSIZE)
i--;
pending[i] = result;
result = next;
}
/*merge final*/
result = NULL;
for (i = 0; i < SORT_BUFSIZE - 1; i++) {
result = merge(pending[i], result);
}
merge_final(head, result, pending[SORT_BUFSIZE - 1]);
}
```
改寫自 [Bottom-up implementation using lists](https://en.wikipedia.org/wiki/Merge_sort#Bottom-up_implementation_using_lists) 。演算法的運作大致如下:
- 遍歷整個佇列,每次自佇列移除一個節點加入 `pending` 這個陣列。
- 從陣列的第0個開始,如果該位置有存指標的話,就將它與節點合併。
![](https://i.imgur.com/CuEtS7e.png)
- 把結果放到下一個位置,如果下一個位置一樣有存指標,就繼續合併,重複直到陣列有空的位置可以放。
![](https://i.imgur.com/AA11s6o.png)
串列長度為$1, 2, 4, 8, 16, ..., 2^i$ ,對應到陣列的第 i 個。
#### merge
```c
struct list_head *merge(struct list_head *a, struct list_head *b)
{
struct list_head head = {.next = NULL};
struct list_head *tail = &head;
while (a && b) {
char *sa = list_entry(a, element_t, list)->value;
char *sb = list_entry(b, element_t, list)->value;
/* if equal, take 'a' -- important for sort stability */
struct list_head **smaller = strcmp(sa, sb) <= 0 ? &a : &b;
tail->next = *smaller;
tail = tail->next;
*smaller = (*smaller)->next;
}
tail->next = (struct list_head *) ((uintptr_t) a | (uintptr_t) b);
return head.next;
}
```
兩個串列的開頭比較,將較小的節點放入排序好的串列。
```
tail->next = *smaller;
```
這裡利用到了指標的指標 `smaller` 可以直接將較小串列的開頭移除。
```c
*smaller = (*smaller)->next;
```
至於 `merge_final` 是用來做最後一次的合併,除了原本的合併操作外還有讓單向串列變回原本雙向串列。
### shuffle
:::danger
command 是「命令」,instruction 是「指令」,二者語意不同。
:notes: jserv
:::
使用以下程式將<s>指令</s>命令 `shuffle` 加入至 `qtest`
```c
ADD_COMMAND(shuffle, " | Shuffle every nodes in queue");
```
執行命令 `shuffle` 會呼叫到 `do_shuffle` 。裡面包含初始化和檢查輸入的參數,主要的洗牌程式是在 `q_shuffle` 中。
```c
void q_shuffle(struct list_head *head)
{
if (!head)
return;
for (int i = q_size(head); i > 0; i--) {
struct list_head *node = head->next;
for(int j = rand() % i; j > 0; j--) {
node = node->next;
}
list_move_tail(node, head);
}
}
```
1. 從佇列隨機選擇距離開頭0到i-1的一個節點, 將其放入尾端。
![](https://i.imgur.com/qZ8Z4dA.png)
![](https://i.imgur.com/oAeTXmF.png)
2. 重複以上動作直到所有節點都被挑選,最後得到的就是洗牌過後的佇列。
![](https://i.imgur.com/vpdBAQt.png)
這個方始是從原始的Fisher–Yates shuffle改良而來,因為這裡的佇列頭尾相連,所以可以放入直接放回佇列而不用另外放,而且因為是固定放入尾端,所以不用擔心會挑選到已經選過得節點。
### web
當輸入命令 `web_cmd [port]` 後會初始化 socket ,以及關閉 `linenoise` 。
```c
static bool do_web_cmd(int argc, char *argv[])
{
int port = DEFAULT_PORT;
if (listenfd > 0)
return true;
if (argc == 2) {
get_int(argv[1], &port);
}
listenfd = open_listenfd(port);
if (listenfd > 0) {
report(1, "listen on port %d, fd is %d", port, listenfd);
} else {
report(1, "Fail to run web server");
return false;
}
int flags = fcntl(listenfd, F_GETFL);
fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);
noise = false;
return true;
}
```
原本使用 `linenoise` 讀取輸入會改用 `cmd_select` 讀取輸入。
```c
bool run_console(char *infile_name)
{
if (!push_file(infile_name)) {
report(1, "ERROR: Could not open source file '%s'", infile_name);
return false;
}
if (!has_infile) {
char *cmdline;
while (noise && (cmdline = linenoise(prompt)) != NULL) {
interpret_cmd(cmdline);
linenoiseHistoryAdd(cmdline); /* Add to the history. */
linenoiseHistorySave(HISTORY_FILE); /* Save the history on disk. */
linenoiseFree(cmdline);
}
if (!noise) {
while (!cmd_done()) {
cmd_select(0, NULL, NULL, NULL, NULL);
}
}
} else {
while (!cmd_done())
cmd_select(0, NULL, NULL, NULL, NULL);
}
return err_cnt == 0;
}
```
接下來是修改 `cmd_select` ,先將 server socket 的 file descriptor 加入 `readfds` , `readfds` 的類型是 `fd_set` ,作為 file descriptor 的集合。
```c
if (listenfd != -1)
FD_SET(listenfd, readfds);
```
將 `select` 的第一個參數 `nfds` 設成要監測的 file descriptor 中的最大值加一。
```c
if (listenfd >= nfds)
nfds = listenfd + 1;
```
呼叫 `select` 後,會將 `readfds` 中未準備好讀取的 file decriptor 移除, `FD_ISSET` 可以確認給定的 file descriptor 是否還在 `readfds` 。 `process` 會回傳處理好的 http 請求。
```c
int result = select(nfds, readfds, writefds, exceptfds, timeout);
if (result <= 0)
return result;
infd = buf_stack->fd;
if (readfds && FD_ISSET(infd, readfds)) {
/* Commandline input available */
FD_CLR(infd, readfds);
result--;
char *cmdline;
cmdline = readline();
if (cmdline)
interpret_cmd(cmdline);
} else if (readfds && FD_ISSET(listenfd, readfds)) {
FD_CLR(listenfd, readfds);
result--;
int connfd;
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
connfd = accept(listenfd, (SA *) &clientaddr, &clientlen);
char *p = process(connfd, &clientaddr);
if (p)
interpret_cmd(p);
free(p);
close(connfd);
}
```
`infd` 是原本輸入的 file descriptor ,它的值 `buf_stack->fd` 在 `push_file` 被初始化,判斷有沒有檔案作為輸入,如果沒有就是標準輸入。
## q_sort 與 list_sort
接續前述之 `q_sort` ,Linux Kernel v5.2 以前的 `list_sort` 採用了類似的實作。這個演算法的缺點是,當佇列長度遠大於2的陣列長度次方時,會變得像是插入排序,複雜度為 $O(n^2)$ 。而Linux Kernel v5.2 以後改進的版本則沒有這個限制。
- 舊版的 `list_sort` (Linux Kernel v5.1.21)
```c
void list_sort(void *priv, struct list_head *head,
int (*cmp)(void *priv, struct list_head *a,
struct list_head *b))
{
struct list_head *part[MAX_LIST_LENGTH_BITS+1]; /* sorted partial lists
-- last slot is a sentinel */
int lev; /* index into part[] */
int max_lev = 0;
struct list_head *list;
if (list_empty(head))
return;
memset(part, 0, sizeof(part));
head->prev->next = NULL;
list = head->next;
while (list) {
struct list_head *cur = list;
list = list->next;
cur->next = NULL;
for (lev = 0; part[lev]; lev++) {
cur = merge(priv, cmp, part[lev], cur);
part[lev] = NULL;
}
if (lev > max_lev) {
if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
printk_once(KERN_DEBUG "list too long for efficiency\n");
lev--;
}
max_lev = lev;
}
part[lev] = cur;
}
for (lev = 0; lev < max_lev; lev++)
if (part[lev])
list = merge(priv, cmp, part[lev], list);
merge_and_restore_back_links(priv, cmp, head, part[max_lev], list);
}
```
這裡給出了長度過長的訊息。
```c
if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
printk_once(KERN_DEBUG "list too long for efficiency\n");
lev--;
}
```
- 新版的 `list_sort` :
如果要儲存任意長度的資料,自然是想到用 Linked list ,新版用 `struct list_head *pending` 取代原本的 `struct list_head *part[MAX_LIST_LENGTH_BITS+1]` ,以及加入了 `size_t count` 用來判斷合併的順序。
### pending
與其再用 `struct list_head` 建立串列的串列,這裡使用比較巧妙的作法,因為合併的過程中只用到 `next` 指標,所以將 `prev` 指標當作連接串列的指標,如圖所示。
![](https://i.imgur.com/ln6opxn.png)
### count
count的是用來做延遲合併的計算,與原本的演算法不同,當 `pending` 中,只有在有兩個 $2^k$ 以及接續的多組串列總數為 $2^k$ 時才會將這兩個 $2^k$ 的串列合併成一個 $2^{k+1}$ 的串列,以避免 cache thrashing ,詳細可以參考 [Linux 核心的 list_sort 實作](https://hackmd.io/@sysprog/c-linked-list#Linux-%E6%A0%B8%E5%BF%83%E5%8E%9F%E5%A7%8B%E7%A8%8B%E5%BC%8F%E7%A2%BC) 。
### 比較
測試用的 `list_sort`, `merge` 和 `merge_finale` 改成與 `q_sort` 相同的版本,以及移除 `likely()` 其餘不變。測試內容為排序 N 個隨機長度為 1 的字串,執行以下程式碼,使用 `perf stat --repeat 10 ./test` 取得 `task-clock`。
```c
#define N 960000
int main(int argc, char *argv[]) {
struct list_head *h1 = q_new();
char s[2] = {0};
for(int i = 0; i < N; i++) {
s[0] = rand();
q_insert_head(h1, s);
}
if(argc == 2 && argv[1][0] == 'l')
q_linux_sort(h1);
else
q_sort(h1);
}
```
下圖為實驗的結果,佇列長度從 750 開始每次加倍,直到 96000 ,我的 L1d cache 大小是 128 KiB ,塞滿需要3278的節點。從結果可以看出隨著佇列長度增加, `list_sort` 快 `q_sort` 約 10% ,延遲合併是更有效率的。而在長度不長時,`q_sort` 使用陣列儲存待合併的串列,相比`list_sort` 的串列有更好的表現。
![](https://i.imgur.com/nqNgOWC.png)
接下來是對 `q_sort` 長度限制的實驗。 固定對 $2^{16}$ 個字串排序,調整 `pending` 陣列的大小,從 8 到 16。以下是結果,可以看出當陣列長度不足時,效率明顯下降。
![](https://i.imgur.com/ZMQPqxe.png)
## RIO套件
`console.c` 將原本RIO套件的 `rio_t` 加入了 `rio_ptr prev` ,將檔案以堆疊的方法儲存,
使用 `push_file()` 跟 `pop_file()` 操作,以及一個指向頂部的指標 `buf_stack` 。
```c
struct RIO_ELE {
int fd; /* File descriptor */
int cnt; /* Unread bytes in internal buffer */
char *bufptr; /* Next unread byte in internal buffer */
char buf[RIO_BUFSIZE]; /* Internal buffer */
rio_ptr prev; /* Next element in stack */
};
```
RIO套件主要用在 `readline()` ,用來從堆疊頂部的檔案讀取輸入。與一般的 `read` 相比, `readline()` 是 Buffered I/O,也就是會預先將檔案讀取至記憶體,而使用的時候改從記憶體讀取,這樣可以減少 system call 的次數。
## simulation
在做`trace-17-complexity` 的測試的時候發現, `remove_tail` 常常有不過的情形,而 `remove_head` 都能通過,明明兩個函式只有一處不相同。接下來找到 `constant.c` ,裡面有測試 `remove_tail` 的程式碼。
```c
case test_remove_tail:
for (size_t i = drop_size; i < n_measure - drop_size; i++) {
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * chunk_size) % 10000);
before_ticks[i] = cpucycles();
element_t *e = q_remove_tail(l, NULL, 0);
after_ticks[i] = cpucycles();
if (e)
q_release_element(e);
dut_free();
}
break;
```
乍看之下跟 `test_remove_head` 的部份除了呼叫 `q_remove_tail` 完全一樣,但突然想到會不會是 cache miss 的緣故,因為 `remove_tail` 的測試使用 `dut_insert_head` 將節點插入至頭部,但移除的節點是在尾端,計算了一下我的 L1d Cache 能容納大概 128 * 1024 / (24 + 8 * 8) = 1489 個節點,而插入大於 1489 的機率很大。
為了驗證我的假設,我將 `dut_insert_head` 改成 `dut_insert_tail` ,此時發生了另外一個問題,程式看起來像是卡住一樣非常非常慢。我使用 `perf record` 看一下發生什麼事。以下是結果。
- remove_head
```
18.01% qtest qtest [.] test_malloc
12.08% qtest [kernel.kallsyms] [k] _extract_crng
11.67% qtest libc-2.31.so [.] _int_malloc
9.79% qtest libc-2.31.so [.] _int_free
9.36% qtest qtest [.] test_strdup
7.86% qtest qtest [.] test_free
```
- remove_tail
```
92.56% qtest qtest [.] test_free
```
兩者的 `test_free` 之所以差這麼多,是因為我的 `q_free` 釋放的順序是從頭到尾,與插入的順序相反,加上節點空間大於 cache size 導致大量 cache miss ,從而導致所花的時間變長。
我將 `q_free` 改成反向的版本後,確實測試通過的次數與 `remove_tail` 差不多。也就證實了是插入的順序導致了 cache miss。
### 缺陷
這裡的缺陷有兩個,第一個是 `dut_free` 跟 `dut_insert_tail` 造成的緩慢,目前想到的解決辦法是使用建立與釋放順序相同的函式,或是維持 `dut_insert_head` 然後使用第二個問題的解決辦法。
第二個問題是 cache miss 所造成的影響,實際上在原本的論文〈Dude, is my code constant time?〉有提到。
> To minimize the effect of environmental changes in the results
這裡就是因為處理數據的程式而不是原本待測程式造成結果的改變。對於 `test_remove_tail` 有比較針對的解決辦法,像是固定將最後一個節點從尾端插入。 想到比較通用的解法是刷新整個 cache 讓測試的時候固定發生 cache miss。