Try   HackMD

2022q1 Homework1 (lab0)

contributed by < ray90514 >

實驗環境

$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
Copyright (C) 2019 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   43 bits physical, 48 bits virtual
CPU(s):                          8
On-line CPU(s) list:             0-7
Thread(s) per core:              2
Core(s) per socket:              4
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       AuthenticAMD
CPU family:                      23
Model:                           24
Model name:                      AMD Ryzen 5 PRO 3500U w/ Radeon Vega Mobile Gfx
Stepping:                        1
Frequency boost:                 enabled
CPU MHz:                         1675.446
CPU max MHz:                     2100.0000
CPU min MHz:                     1400.0000
BogoMIPS:                        4192.05
Virtualization:                  AMD-V
L1d cache:                       128 KiB
L1i cache:                       256 KiB
L2 cache:                        2 MiB
L3 cache:                        4 MiB
NUMA node0 CPU(s):               0-7

開發過程

q_new

struct list_head *q_new()
{
    struct list_head *head = malloc(sizeof(struct list_head));
    if (!head)
        return NULL;

    INIT_LIST_HEAD(head);

    return head;
}

檢查 malloc 是否成功以及使用 INIT_LIST_HEAD 初始化。

q_free

void q_free(struct list_head *head)
{
    if (!head)
        return;

    struct list_head *node;
    struct list_head *safe;

    list_for_each_safe (node, safe, head) {
        q_release_element(list_entry(node, element_t, list));
    }

    free(head);
}

因為要釋放節點及其所屬的 entry ,所以使用 list_for_each_safe 而不是 list_for_each 。前者會在走訪節點的過程,保存每個節點的 next 指標,所以可以安全釋放記憶體,而不會影響後續走訪。

使用通順的漢語書寫。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

q_insert_head

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;

    element_t *element = malloc(sizeof(element_t));
    if (!element)
        return false;

    element->value = strdup(s);
    if (!element->value) {
        free(element);
        return false;
    }

    list_add(&element->list, head);
    return true;
}

使用 list_add 將節點插入佇列的開頭,若是字串的記憶體分配失敗時,要記得釋放節點的記憶體。

q_insert_tail

q_insert_head 相似,將 list_add 改成 list_add_tail

q_remove_head

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;

    element_t *element = list_first_entry(head, element_t, list);
    list_del(&element->list);
    if (sp) {
        strncpy(sp, element->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }
    return element;
}

使用 list_first_entry 取得開頭的entry,利用 list_del 將其自佇列中移除。再判斷是否有要將 value 字串裡的值複製到 sp 。使用 strncpy 可以複製給定數量的前 n 個字元,因此不保證 '\0' 做結尾,需要手動補上。

q_remove_tail

q_insert_head 相似,將 list_first_entry 改成 list_last_entry ,以取得尾端的entry 。

q_size

int q_size(struct list_head *head)
{
    if (!head)
        return 0;

    struct list_head *node;
    int len = 0;

    list_for_each (node, head)
        len++;
    return len;
}

走訪佇列中所有節點,紀錄過程中的節點數量。

q_delete_mid

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;

    struct list_head *forward = head->next;
    struct list_head *backward = head;

    while (forward != backward) {
        backward = backward->prev;
        if (forward == backward)
            break;
        forward = forward->next;
    }

    list_del(forward);
    q_release_element(list_entry(forward, element_t, list));
    return true;
}

由兩個指標一個向前走訪,一個向後走訪,當兩個指標相同時所指到的節點即為中間的節點。因為索引是由0開始,所以由 forward 指標先走訪。

q_delete_dup

bool q_delete_dup(struct list_head *head)
{
    if (!head)
        return false;

    struct list_head *node = head->next->next;
    struct list_head *prev = head;

    while (node != head) {
        element_t *element = list_entry(node, element_t, list);
        element_t *prev_element = list_entry(node->prev, element_t, list);

        if (strcmp(element->value, prev_element->value) == 0) {
            while (node != head &&
                   strcmp(element->value, prev_element->value) == 0) {
                q_release_element(prev_element);
                node = node->next;
                prev_element = element;
                element = list_entry(node, element_t, list);
            }
  
            q_release_element(prev_element);
            prev->next = node;
            node->prev = prev;
            if (node != head)
                node = node->next;
        } else {
            node = node->next;
            prev = prev->next;
        }
    }
    return true;
}

prevnode 這兩個相差一個節點的指標進行走訪,如果 nodenode->prev 所指向節點的字串相同時,會找出下一個不相同的節點。

然後將相同字串的節點刪除,把 prev 所指到的節點與 node 所指到的節點相連, node 設成 node->next ,讓兩指標維持相差一個節點的狀態。

使用 Graphviz 重新繪製上圖。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

改進

使用 list_for_each_entry_safe 簡化程式碼,加入布林變數 is_dup 用來判斷指標所指的節點是否為重複節點的最後一個 。

bool q_delete_dup(struct list_head *head)
{
    if (!head)
        return false;

    bool is_dup = false;
    element_t *entry;
    element_t *safe;
    struct list_head *prev = head;

    list_for_each_entry_safe(entry, safe, head, list) {
        if(&safe->list != head && strcmp(entry->value, safe->value) == 0) {
            q_release_element(entry);
            is_dup = true;
        }
        else if (is_dup) {
            is_dup = false;
            q_release_element(entry);
            prev->next = &safe->list;
            safe->list.prev = prev;
        }
        else {
            prev = prev->next;
        }
    }
    return true;
}

q_swap

void q_swap(struct list_head *head)
{
    if (!head || list_empty(head))
        return;

    for (struct list_head *node = head->next;
         node != head && node->next != head; node = node->next) {
        struct list_head *next = node->next;
        node->prev->next = next;
        next->next->prev = node;
        node->next = next->next;
        next->next = node;
        next->prev = node->prev;
        node->prev = next;
    }
}

使用 node 指標走訪整個佇列,交換 nodenode->next 的位置。

q_reverse

void q_reverse(struct list_head *head)
{
    if (!head || list_empty(head))
        return;

    struct list_head *node = head;
    struct list_head *next = node->next;
    do {
        node->next = node->prev;
        node->prev = next;
        node = next;
        next = node->next;
    } while (node != head);
}

走訪整個佇列,交換每個節點的 nextprev

q_sort

#define SORT_BUFSIZE 32
void q_sort(struct list_head *head)
{
    if (!head || head->next == head->prev)
        return;

    struct list_head *pending[SORT_BUFSIZE] = {};
    struct list_head *result = head->next;
    struct list_head *next;
    int i;

    head->prev->next = NULL;
    while (result) {
        next = result->next;
        result->next = NULL;
        for (i = 0; i < SORT_BUFSIZE && pending[i]; i++) {
            result = merge(pending[i], result);
            pending[i] = NULL;
        }

        if (i == SORT_BUFSIZE)
            i--;
        pending[i] = result;
        result = next;
    }

    /*merge final*/
    result = NULL;
    for (i = 0; i < SORT_BUFSIZE - 1; i++) {
        result = merge(pending[i], result);
    }
    merge_final(head, result, pending[SORT_BUFSIZE - 1]);
}

改寫自 Bottom-up implementation using lists 。演算法的運作大致如下:

  • 遍歷整個佇列,每次自佇列移除一個節點加入 pending 這個陣列。
  • 從陣列的第0個開始,如果該位置有存指標的話,就將它與節點合併。

  • 把結果放到下一個位置,如果下一個位置一樣有存指標,就繼續合併,重複直到陣列有空的位置可以放。

串列長度為

1,2,4,8,16,...,2i ,對應到陣列的第 i 個。

merge

struct list_head *merge(struct list_head *a, struct list_head *b)
{
    struct list_head head = {.next = NULL};
    struct list_head *tail = &head;

    while (a && b) {
        char *sa = list_entry(a, element_t, list)->value;
        char *sb = list_entry(b, element_t, list)->value;

        /* if equal, take 'a' -- important for sort stability */
        struct list_head **smaller = strcmp(sa, sb) <= 0 ? &a : &b;
        tail->next = *smaller;
        tail = tail->next;
        *smaller = (*smaller)->next;
    }

    tail->next = (struct list_head *) ((uintptr_t) a | (uintptr_t) b);
    return head.next;
}

兩個串列的開頭比較,將較小的節點放入排序好的串列。

tail->next = *smaller;

這裡利用到了指標的指標 smaller 可以直接將較小串列的開頭移除。

*smaller = (*smaller)->next;

至於 merge_final 是用來做最後一次的合併,除了原本的合併操作外還有讓單向串列變回原本雙向串列。

shuffle

command 是「命令」,instruction 是「指令」,二者語意不同。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

使用以下程式將指令命令 shuffle 加入至 qtest

ADD_COMMAND(shuffle, "                | Shuffle every nodes in queue");

執行命令 shuffle 會呼叫到 do_shuffle 。裡面包含初始化和檢查輸入的參數,主要的洗牌程式是在 q_shuffle 中。

void q_shuffle(struct list_head *head)
{
    if (!head)
        return;

    for (int i = q_size(head); i > 0; i--) {
        struct list_head *node = head->next;
        for(int j = rand() % i; j > 0; j--) {
            node = node->next;
        }
        list_move_tail(node, head);
    }
}
  1. 從佇列隨機選擇距離開頭0到i-1的一個節點, 將其放入尾端。

  1. 重複以上動作直到所有節點都被挑選,最後得到的就是洗牌過後的佇列。

這個方始是從原始的Fisher–Yates shuffle改良而來,因為這裡的佇列頭尾相連,所以可以放入直接放回佇列而不用另外放,而且因為是固定放入尾端,所以不用擔心會挑選到已經選過得節點。

web

當輸入命令 web_cmd [port] 後會初始化 socket ,以及關閉 linenoise

static bool do_web_cmd(int argc, char *argv[])
{
    int port = DEFAULT_PORT;

    if (listenfd > 0)
        return true;
    if (argc == 2) {
        get_int(argv[1], &port);
    }

    listenfd = open_listenfd(port);
    if (listenfd > 0) {
        report(1, "listen on port %d, fd is %d", port, listenfd);
    } else {
        report(1, "Fail to run web server");
        return false;
    }
    int flags = fcntl(listenfd, F_GETFL);
    fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);
    noise = false;
    return true;
}

原本使用 linenoise 讀取輸入會改用 cmd_select 讀取輸入。

bool run_console(char *infile_name)
{
    if (!push_file(infile_name)) {
        report(1, "ERROR: Could not open source file '%s'", infile_name);
        return false;
    }

    if (!has_infile) {
        char *cmdline;
        while (noise && (cmdline = linenoise(prompt)) != NULL) {
            interpret_cmd(cmdline);
            linenoiseHistoryAdd(cmdline);       /* Add to the history. */
            linenoiseHistorySave(HISTORY_FILE); /* Save the history on disk. */
            linenoiseFree(cmdline);
        }
        if (!noise) {
            while (!cmd_done()) {
                cmd_select(0, NULL, NULL, NULL, NULL);
            }
        }
    } else {
        while (!cmd_done())
            cmd_select(0, NULL, NULL, NULL, NULL);
    }

    return err_cnt == 0;
}

接下來是修改 cmd_select ,先將 server socket 的 file descriptor 加入 readfdsreadfds 的類型是 fd_set ,作為 file descriptor 的集合。

if (listenfd != -1)
    FD_SET(listenfd, readfds);

select 的第一個參數 nfds 設成要監測的 file descriptor 中的最大值加一。

if (listenfd >= nfds)
            nfds = listenfd + 1;

呼叫 select 後,會將 readfds 中未準備好讀取的 file decriptor 移除, FD_ISSET 可以確認給定的 file descriptor 是否還在 readfdsprocess 會回傳處理好的 http 請求。

int result = select(nfds, readfds, writefds, exceptfds, timeout);
if (result <= 0)
    return result;

infd = buf_stack->fd;
if (readfds && FD_ISSET(infd, readfds)) {
    /* Commandline input available */
    FD_CLR(infd, readfds);
    result--;
    char *cmdline;
    cmdline = readline();
    if (cmdline)
        interpret_cmd(cmdline);
} else if (readfds && FD_ISSET(listenfd, readfds)) {
    FD_CLR(listenfd, readfds);
    result--;
    int connfd;
    struct sockaddr_in clientaddr;
    socklen_t clientlen = sizeof(clientaddr);
    connfd = accept(listenfd, (SA *) &clientaddr, &clientlen);

    char *p = process(connfd, &clientaddr);
    if (p)
        interpret_cmd(p);
    free(p);
    close(connfd);
}

infd 是原本輸入的 file descriptor ,它的值 buf_stack->fdpush_file 被初始化,判斷有沒有檔案作為輸入,如果沒有就是標準輸入。

q_sort 與 list_sort

接續前述之 q_sort ,Linux Kernel v5.2 以前的 list_sort 採用了類似的實作。這個演算法的缺點是,當佇列長度遠大於2的陣列長度次方時,會變得像是插入排序,複雜度為

O(n2) 。而Linux Kernel v5.2 以後改進的版本則沒有這個限制。

  • 舊版的 list_sort (Linux Kernel v5.1.21)
void list_sort(void *priv, struct list_head *head,
		int (*cmp)(void *priv, struct list_head *a,
			struct list_head *b))
{
	struct list_head *part[MAX_LIST_LENGTH_BITS+1]; /* sorted partial lists
						-- last slot is a sentinel */
	int lev;  /* index into part[] */
	int max_lev = 0;
	struct list_head *list;

	if (list_empty(head))
		return;

	memset(part, 0, sizeof(part));

	head->prev->next = NULL;
	list = head->next;

	while (list) {
		struct list_head *cur = list;
		list = list->next;
		cur->next = NULL;

		for (lev = 0; part[lev]; lev++) {
			cur = merge(priv, cmp, part[lev], cur);
			part[lev] = NULL;
		}
		if (lev > max_lev) {
			if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
				printk_once(KERN_DEBUG "list too long for efficiency\n");
				lev--;
			}
			max_lev = lev;
		}
		part[lev] = cur;
	}

	for (lev = 0; lev < max_lev; lev++)
		if (part[lev])
			list = merge(priv, cmp, part[lev], list);

	merge_and_restore_back_links(priv, cmp, head, part[max_lev], list);
}

這裡給出了長度過長的訊息。

if (unlikely(lev >= ARRAY_SIZE(part)-1)) {
	printk_once(KERN_DEBUG "list too long for efficiency\n");
	lev--;
}
  • 新版的 list_sort

如果要儲存任意長度的資料,自然是想到用 Linked list ,新版用 struct list_head *pending 取代原本的 struct list_head *part[MAX_LIST_LENGTH_BITS+1] ,以及加入了 size_t count 用來判斷合併的順序。

pending

與其再用 struct list_head 建立串列的串列,這裡使用比較巧妙的作法,因為合併的過程中只用到 next 指標,所以將 prev 指標當作連接串列的指標,如圖所示。

count

count的是用來做延遲合併的計算,與原本的演算法不同,當 pending 中,只有在有兩個

2k 以及接續的多組串列總數為
2k
時才會將這兩個
2k
的串列合併成一個
2k+1
的串列,以避免 cache thrashing ,詳細可以參考 Linux 核心的 list_sort 實作

比較

測試用的 list_sortmergemerge_finale 改成與 q_sort 相同的版本,以及移除 likely() 其餘不變。測試內容為排序 N 個隨機長度為 1 的字串,執行以下程式碼,使用 perf stat --repeat 10 ./test 取得 task-clock

#define N 960000
int main(int argc, char *argv[]) {
    struct list_head *h1 = q_new();
    char s[2] = {0};
    for(int i = 0; i < N; i++) {
        s[0] = rand();
        q_insert_head(h1, s);
    }

    if(argc == 2 && argv[1][0] == 'l')
        q_linux_sort(h1);
    else
        q_sort(h1);
}

下圖為實驗的結果,佇列長度從 750 開始每次加倍,直到 96000 ,我的 L1d cache 大小是 128 KiB ,塞滿需要3278的節點。從結果可以看出隨著佇列長度增加, list_sortq_sort 約 10% ,延遲合併是更有效率的。而在長度不長時,q_sort 使用陣列儲存待合併的串列,相比list_sort 的串列有更好的表現。

接下來是對 q_sort 長度限制的實驗。 固定對

216 個字串排序,調整 pending 陣列的大小,從 8 到 16。以下是結果,可以看出當陣列長度不足時,效率明顯下降。

RIO套件

console.c 將原本RIO套件的 rio_t 加入了 rio_ptr prev ,將檔案以堆疊的方法儲存,
使用 push_file()pop_file() 操作,以及一個指向頂部的指標 buf_stack

struct RIO_ELE {
    int fd;                /* File descriptor */
    int cnt;               /* Unread bytes in internal buffer */
    char *bufptr;          /* Next unread byte in internal buffer */
    char buf[RIO_BUFSIZE]; /* Internal buffer */
    rio_ptr prev;          /* Next element in stack */
};

RIO套件主要用在 readline() ,用來從堆疊頂部的檔案讀取輸入。與一般的 read 相比, readline() 是 Buffered I/O,也就是會預先將檔案讀取至記憶體,而使用的時候改從記憶體讀取,這樣可以減少 system call 的次數。

simulation

在做trace-17-complexity 的測試的時候發現, remove_tail 常常有不過的情形,而 remove_head 都能通過,明明兩個函式只有一處不相同。接下來找到 constant.c ,裡面有測試 remove_tail 的程式碼。

case test_remove_tail:
    for (size_t i = drop_size; i < n_measure - drop_size; i++) {
        dut_new();
        dut_insert_head(
            get_random_string(),
            *(uint16_t *) (input_data + i * chunk_size) % 10000);
        before_ticks[i] = cpucycles();
        element_t *e = q_remove_tail(l, NULL, 0);
        after_ticks[i] = cpucycles();
    if (e)
        q_release_element(e);
        dut_free();
    }
    break;

乍看之下跟 test_remove_head 的部份除了呼叫 q_remove_tail 完全一樣,但突然想到會不會是 cache miss 的緣故,因為 remove_tail 的測試使用 dut_insert_head 將節點插入至頭部,但移除的節點是在尾端,計算了一下我的 L1d Cache 能容納大概 128 * 1024 / (24 + 8 * 8) = 1489 個節點,而插入大於 1489 的機率很大。
為了驗證我的假設,我將 dut_insert_head 改成 dut_insert_tail ,此時發生了另外一個問題,程式看起來像是卡住一樣非常非常慢。我使用 perf record 看一下發生什麼事。以下是結果。

  • remove_head
  18.01%  qtest    qtest              [.] test_malloc
  12.08%  qtest    [kernel.kallsyms]  [k] _extract_crng
  11.67%  qtest    libc-2.31.so       [.] _int_malloc
   9.79%  qtest    libc-2.31.so       [.] _int_free
   9.36%  qtest    qtest              [.] test_strdup
   7.86%  qtest    qtest              [.] test_free
  • remove_tail
92.56%  qtest    qtest              [.] test_free

兩者的 test_free 之所以差這麼多,是因為我的 q_free 釋放的順序是從頭到尾,與插入的順序相反,加上節點空間大於 cache size 導致大量 cache miss ,從而導致所花的時間變長。

我將 q_free 改成反向的版本後,確實測試通過的次數與 remove_tail 差不多。也就證實了是插入的順序導致了 cache miss。

缺陷

這裡的缺陷有兩個,第一個是 dut_freedut_insert_tail 造成的緩慢,目前想到的解決辦法是使用建立與釋放順序相同的函式,或是維持 dut_insert_head 然後使用第二個問題的解決辦法。
第二個問題是 cache miss 所造成的影響,實際上在原本的論文〈Dude, is my code constant time?〉有提到。

To minimize the effect of environmental changes in the results

這裡就是因為處理數據的程式而不是原本待測程式造成結果的改變。對於 test_remove_tail 有比較針對的解決辦法,像是固定將最後一個節點從尾端插入。 想到比較通用的解法是刷新整個 cache 讓測試的時候固定發生 cache miss。