Try   HackMD

2023q1 Homework1 (lab0)

contributed by < seasonwang0905 >

開發環境

$ gcc --version
gcc (Ubuntu 11.3.0-1ubuntu1~22.04) 11.3.0

$ lscpu
Architecture:            x86_64
CPU op-mode(s):          32-bit, 64-bit
Address sizes:           39 bits physical, 48 bits virtual
Byte Order:              Little Endian
CPU(s):                  6
On-line CPU(s) list:     0-5
Vendor ID:               GenuineIntel
Model name:              Intel(R) Core(TM) i5-9500 CPU @ 3.00GHz
CPU family:              6
Model:                   158
Thread(s) per core:      1
Core(s) per socket:      6
Socket(s):               1
Stepping:                10
CPU max MHz:             4400.0000
CPU min MHz:             800.0000
BogoMIPS:                6000.00
Virtualization:          VT-x
L1d:                     192 KiB (6 instances)
L1i:                     192 KiB (6 instances)
L2:                      1.5 MiB (6 instances)
L3:                      9 MiB (1 instance
NUMA node(s):            1
NUMA node0 CPU(s):       0-5

佇列實作

作業要求

進度表
  • q_new
  • q_free
  • q_insert_head
  • q_insert_tail
  • q_remove_head
  • q_remove_tail
  • q_size
  • q_delete_mid
  • q_delete_dup
  • q_swap
  • q_reverse
  • q_reverseK
  • q_sort
  • q_descend
  • q_merge

q_new

Create an empty queue

Return: NULL for allocation failed.

檢查發現 list.h 之中有一巨集 INIT_LIST_HEAD 可以用來初始化

struct list_head *q_new()
{
    struct list_head *q = malloc(sizeof(struct list_head));
    if (!q)
        return NULL;
    INIT_LIST_HEAD(q);
    return q;
}

q_free

Free all storage used by queue, no effect if header is NULL.

這裡使用了巨集 list_for_each_entry_safe 走訪每個節點並刪除之

void q_free(struct list_head *l)
{
-   if (list_empty(l))
+   if (!l)
        return;
    element_t *current, *next;
    list_for_each_entry_safe (current, next, l, list) {
        list_del(&current->list);
        q_release_element(current);
    }
    free(l);
}

你所不知道的 c 語言: linked list 和非連續記憶體知可對 current 作移出而不會產生不可預期的行為,並使用 q_release_element 釋放 element_t 中字串佔用的記憶體。

應該用 !l ,而非 list_empty(l) ,如此才能釋放空佇列 (e.g. l=[])

q_insert_head

Insert an element at head of queue

Return: true for success, false for allocation failed or queue is NULL.

參考 list.h 中的 list_add 函式

static inline void list_add(struct list_head *node, struct list_head *head)
{
    struct list_head *next = head->next;

    next->prev = node;
    node->next = next;
    node->prev = head;
    head->next = node;
}

上述原始碼的功能是在 head 之後新增一個 node ,於是這裡使用 malloc 建立新的 node 並紀錄欲加入的字串

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;
    element_t *new = malloc(sizeof(element_t));
    if (!new)
        return false;
    new->value = strdup(s);
    if (!new->value) {
        free(new);
        return false;
    }
    list_add(&new->list, head);
    return true;
}

參考strdup,原本以 strncpy 實作,後來發現strdup 配合 malloc 可不需要計算個數而複製字串 ,使用上更方便簡潔。

q_insert_tail

Insert an element at tail of queue

Return: true for success, false for allocation failed or queue is NULL

list.h中有函式 list_add_tail ,此函式會將新的 node 加在 head 之前,也就是 tail 的位置

bool q_insert_tail(struct list_head *head, char *s)
{
    if (!head)
        return false;
    element_t *new = malloc(sizeof(element_t));
    if (!new)
        return false;
    new->value = strdup(s);
    if (!new->value) {
        free(new);
        return false;
    }
    list_add_tail(&new->list, head);
    return true;
}

q_remove_head

Remove an element from head of queue

Return: the pointer to element, %NULL if queue is NULL or empty.

list_first_entry 可藉由輸入 head 來反向得知第一個節點的資訊並設 rm 為欲刪除的節點,當 sp 不為 NULL 時複製 rm->valuesp 後移走 rm

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    element_t *rm = list_first_entry(head, element_t, list);
    if (sp != NULL) {
        strncpy(sp, rm->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }
    list_del(&rm->list);
    return rm;
}

remove 只是將節點移出 linked list ,在實作時必須注意不要刪除到節點的資訊,並記得在複製欲刪除的字串到 sp 後加上結尾字元 \0

q_remove_head

Remove an element from tail of queue

Return: the pointer to element, %NULL if queue is NULL or empty.

list_first_entry 改為 list_last_entry 即為刪除結尾節點的版本

element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    element_t *rm = list_last_entry(head, element_t, list);
    if (sp != NULL) {
        strncpy(sp, rm->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }
    list_del(&rm->list);
    return rm;
}

q_size

Return number of elements in queue

Return: the number of elements in queue, zero if queue is NULL or empty

走訪每個節點並回傳 linked list 個數 n ,注意 n++ 不能寫為 {n++} ,否則會報錯

int q_size(struct list_head *head)
{
    if (!head || list_empty(head))
        return 0;
    struct list_head *node;
    int n = 0;
    list_for_each (node, head)
        n++;
    return n;
}

q_delete_mid

Delete the middle node in queue

Return: true for success, false if list is NULL or empty.

這裡使用龜兔賽跑 (Tortoise and Hare Algorithm) 的方式來刪除中間節點,當 fastfast->next 最後指向 headslow 恰好指向我們需要刪除的節點

bool q_delete_mid(struct list_head *head)
{
    if (!head || !head->next)
        return false;
    struct list_head *fast = head->next, *slow = head->next;
    while (fast != head && fast->next != head) {
        fast = fast->next->next;
        slow = slow->next;
    }
    element_t *e = container_of(head, element_t, list);
    list_del(slow);
-   if (!e->value)
-       q_release_element(e);
+   q_release_element(list_entry(slow, element_t, list));
    return true;
}

container_of 也可用 list_entry 代替

make test 測試時發現記憶體沒有被正確釋放,原因在 !e->value 排除了空佇列,導致程式結束時仍有指標指向空佇列。

q_delete_dup

Delete all nodes that have duplicate string

Return: true for success, false if list is NULL.

根據定義,我們要把佇列中重複出現的所有節點刪除,實作方式是改編自 list_for_each_entry_safe 的功能,差別在這裡用 index 代表每次紀錄的 del 之值的位置,再走訪剩餘節點以找出相同的字串並刪除。

bool q_delete_dup(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    element_t *current, *forward;
    struct list_head *index = head->next;
    char *del = NULL;
    while (index != head) {
        del = list_entry(index, element_t, list)->value;
        bool flag = 0;
        for (current = list_entry(index, element_t, list),
            forward = list_entry(current->list.next, element_t, list);
             &current->list != head; current = forward,
            forward = list_entry(forward->list.next, element_t, list)) {
            if (del && !strcmp(current->value, del) &&
                &current->list != index) {
                list_del_init(&current->list);
                q_release_element(current);
                flag = 1;
            }
        };
        if (del && flag) {
            struct list_head *temp = index;
            index = index->next;
            list_del(temp);
            q_release_element(list_entry(temp, element_t, list));
            flag = 0;
        } else
            index = index->next;
    }
    return true;
}

上述程式碼沒有將佇列重新排序,也就是這邊直接對 unsorted linked list 做重複刪除

./qtest 測試的結果如下:

重複字串相鄰

cmd> new
l = []
cmd> ih e
l = [e]
cmd> ih e
l = [e e]
cmd> ih v
l = [v e e]
cmd> dedup
l = [v]

重複字串不相鄰

cmd> new
l = []
cmd> ih e
l = [e]
cmd> ih v
l = [v e]
cmd> ih e
l = [e v e]
cmd> dedup
ERROR: Duplicate strings are in queue or distinct strings are not in queue
l = [v]

發現若重複字串相鄰可以正確刪除,不相鄰則會出現 ERROR: Duplicate strings are in queue or distinct strings are not in queue ,但輸出結果是正常的,檢查 q_size :

cmd> size
ERROR: Computed queue size as 1, but correct value is 3
l = [v]

這代表佇列中某些東西沒有正確的釋放而被偵測到了,目前檢查不出是哪裡的問題,不過根據測試的結果,可以先用 sort 排列使得重複字串相鄰再予以刪除,也許就不會有這個錯誤訊息了 (施工中..)

q_swap

Swap every two adjacent nodes

nodenode->next 兩兩交換,過程是將 node 移出 linked list 後再使用 list_addnode 加到 node->next 後面即完成一次交換

void q_swap(struct list_head *head)
{
    if (!head)
        return;
    struct list_head *node;
    for (node = head->next; node != head && node->next != head;
         node = node->next) {
        struct list_head *next = node->next;
        list_del_init(node);
        list_add(node, next);
    }
}

q_reverse

Reverse elements in queue

currentnextprev 重新指向 prevnext ,迭代完一次後 prevcurrentnext 皆往前一個節點並重複動作

void q_reverse(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    struct list_head *prev = head->prev, *current = head, *next = NULL;
    while (next != head) {
        next = current->next;
        current->next = prev;
        current->prev = next;
        prev = current;
        current = next;
    }
}

q_reverseK

Reverse the nodes of the list k at a time

list.h 中的定義得知, list_cut_positionlist_splice_init 在切斷與接合的時候需要建立一個假頭以方便實作,這裡參考 komark06 同學所撰寫的 q_reverseK

void q_reverseK(struct list_head *head, int k)
{
    if (!head || list_empty(head) ||k <= 1)
        return;
    struct list_head *node, *safe, *temp = head;
    LIST_HEAD(pseudo);
    int count = 0;
    list_for_each_safe (node, safe, head) {
        count++;
        if (count == k) {
            list_cut_position(&pseudo, temp, node);
            q_reverse(&pseudo);
            list_splice_init(&pseudo, temp);
            count = 0;
            temp = safe->prev;
        }
    }
}

q_sort

Sort elements of queue in ascending order

參考你所不知道的 c 語言: linked list 和非連續記憶體Linked List Sort 中提到的Merge Sort ,並以 Risheng1128 的程式碼為實作。首先,我們先做兩個 list 的合併和排序

/* Merge two list into one queue */
struct list_head *merge_two_list(struct list_head *l1, struct list_head *l2)
{
    struct list_head *temp = NULL;
    struct list_head **indirect = &temp;
    for (struct list_head **node = NULL; l1 && l2; *node = (*node)->next) {
        element_t *e1 = list_entry(l1, element_t, list);
        element_t *e2 = list_entry(l2, element_t, list);
        if (strcmp(e1->value, e2->value) < 0)
            node = &l1;
        else
            node = &l2;
        *indirect = *node;
        indirect = &(*indirect)->next;
    }
    *indirect = (struct list_head *) ((u_int64_t) l1 | (u_int64_t) l2);
    return temp;
}

有了基本的兩個 list 合併後,接著就能使用函式遞迴的方式來分割佇列,並且用 merge_two_list 來逐一回傳排序好的佇列。

/* Divide the queue and sort every element */
struct list_head *mergesort(struct list_head *head)
{
    if (!head || !head->next)
        return head;
    struct list_head *fast = head, *slow = head;
    while (fast && fast->next) {
        fast = fast->next->next;
        slow = slow->next;
    }
    fast = slow;
    slow->prev->next = NULL;
    struct list_head *l1 = mergesort(head);
    struct list_head *l2 = mergesort(fast);
    return merge_two_list(l1, l2);
}

最後,在 q_sort 中做重新連結,使排好的佇列可以雙向溝通

void q_sort(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    head->prev->next = NULL;
    head->next = mergesort(head->next);
    struct list_head *current = head, *next = head->next;
    while (next) {
        next->prev = current;
        current = next;
        next = next->next;
    }
    current->next = head;
    head->prev = current;
}

到此就完成了 Merge Sort

q_descend

Remove every node which has a node with a strictly greater value anywhere to the right side of it

根據需求,往 prev 的方向做比較和刪除會容易些,遞迴式也是參照 list_for_each_entry_safe 來改編的,total 是總節點數, count 是刪除的節點數,最後回傳剩餘的節點數

int q_descend(struct list_head *head)
{
    if (!head || list_empty(head))
        return 0;
    element_t *current, *forward;
    char *max = NULL;
    int total = 0, count = 0;
    for (current = list_entry(head->prev, element_t, list),
        forward = list_entry(current->list.prev, element_t, list);
         &current->list != head; current = forward,
        forward = list_entry(forward->list.prev, element_t, list)) {
        total++;
        if (!max || strcmp(current->value, max) > 0) {
            max = current->value;
        }
        if (max && strcmp(current->value, max) < 0) {
            list_del(&current->list);
            q_release_element(current);
            count++;
        }
    }
    return total - count;
}

參考 strcmp ,若 current->value 較大回傳正值 ,反之則回傳負值

q_merge

Merge all the queues into one sorted queue, which is in ascending order

參考 brianlin314 同學的 q_merge ,發現原來只需要用 list_splice_initchain 裡面每個佇列的 head 接起來,程式碼如下

int q_merge(struct list_head *head)
{
    if(!head || list_empty(head)) {
        return 0;
    }
    if(list_is_singular(head)) {
        return list_entry(head->next, queue_contex_t, chain)->size;
    }
    queue_contex_t *merged_list = list_entry(head->next, queue_contex_t, chain);
    struct list_head *node = NULL, *safe = NULL;
    list_for_each_safe (node, safe, head) {
        if(node == head->next) {
            continue;
        }
        queue_contex_t *temp = list_entry(node, queue_contex_t, chain);
        list_splice_init(temp->q, merged_list->q);
    }
    q_sort(merged_list->q);
    return merged_list->size;
}

之前不知道是不是沒有用 list_for_each_safe 的原因,自己嘗試實作都會出現 Segmentation Fault ,所以卡在實作的部份很久

使用 Valgrind 檢查記憶體錯誤

執行命令 make valgrind 會出現下列記憶體錯誤訊息

==16958== 40 bytes in 1 blocks are still reachable in loss record 1 of 38
==16958==    at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==16958==    by 0x10D9AD: malloc_or_fail (report.c:215)
==16958==    by 0x10E830: add_cmd (console.c:85)
==16958==    by 0x10EBCB: init_cmd (console.c:424)
==16958==    by 0x10CF43: main (qtest.c:1207)

根據 Valgrind 常見問題中的解答,記憶體遺失的檢側共分為四種類型,上述程式碼的錯誤顯示是 still recheable 的類型,其描述為下

"still reachable" means your program is probably ok -- it didn't free some memory it could have. This is quite common and often reasonable. Don't use --show-reachable=yes if you don't want to see these reports.

意思是程式結束前有些記憶體並未被正常釋放,但是不會影響 code 的執行,如果檢查底下的測試分數,如

=16929== 56 bytes in 1 blocks are still reachable in loss record 2 of 2
==16929==    at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==16929==    by 0x10F128: test_malloc (harness.c:133)
==16929==    by 0x10F52D: q_new (queue.c:17)
==16929==    by 0x10CC1F: do_new (qtest.c:150)
==16929==    by 0x10DE12: interpret_cmda (console.c:181)
==16929==    by 0x10E3C7: interpret_cmd (console.c:201)
==16929==    by 0x10E7C8: cmd_select (console.c:610)
==16929==    by 0x10F0B4: run_console (console.c:705)
==16929==    by 0x10D204: main (qtest.c:1228)
==16929== 
---     trace-01-ops    5/5

會發現程式正確執行且有拿到分數,既然如此,就要從它顯示的路徑倒回去找看看是哪邊導致記憶體出了問題,於是在 qtest.c 中的 q_quit

static bool q_quit(int argc, char *argv[])
{
-   return true;
    report(3, "Freeing queue");
    if (current && current->size > BIG_LIST_SIZE)
        set_cautious_mode(false);

找到第一行為 return true; ,把這一行刪除後執行 make valgrind 就不會出現 still recheable 的錯誤了

使用 Massif 查看記憶體的狀況

執行完 make valgrind 後,輸入命令

$ valgrind --tool=massif ./qtest -f traces/trace-17-complexity.cmd

會跑出 massif.out.xxxxx (xxxxx指編號) 的檔案,再輸入命令

$ massif-visualizer ./massif.out.xxxxx

編號依據不同的紀錄而有所差距,這裡附上 traces/trace-17-complexity.cmd 的 massif 圖像

可以看到上圖在最後的地方沒有釋放完記憶體,不確定是不是因為這樣導致使用 make test 時僅得到 95 分,在時間複雜度這裡一直過不了關,比較奇怪的是 make valgrind 卻可以滿分,猜測問題應該出在 qtest 裡面,但這跟 DudectDude, is my code constant time? 相關,目前仍在嘗試理解中

研讀 Linux 核心原始程式碼的 lib/list_sort.c

在第一行有

__attribute__((nonnull(2,3,4)))

參考 Risheng 的研讀筆記以及 __attribute__((nonnull)) function attribute ,可知上述函式的功能在於檢驗函式的引數是否為 NULL ,若是,則編譯器會發出警告

This function attribute specifies function parameters that are not supposed to be null pointers. This enables the compiler to generate a warning on encountering such a parameter.

後面出現了兩個做 merge 的函式,分別是回傳 list_head 指標的 merge 和沒有回傳值的 merge_final ,以下註解提到

 * Combine final list merge with restoration of standard doubly-linked
 * list structure.  This approach duplicates code from merge(), but
 * runs faster than the tidier alternatives of either a separate final
 * prev-link restoration pass, or maintaining the prev links
 * throughout.

list_sort 函式的最後可以看到 merge_final ,搭配註解知道 merge_final 主要的作用是可以維持或重建 prev 的連結

再來是 list_sort 的註解並且有介紹引數的部份

 * list_sort - sort a list
 * @priv: private data, opaque to list_sort(), passed to @cmp
 * @head: the list to sort
 * @cmp: the elements comparison function

priv 是一個 private 的成員並會傳值給 cmp ,根據 include/linux/list_sort.h 和下列敘述可以知道 cmp 的功能

 * The comparison function @cmp must return > 0 if @a should sort after
 * @b ("@a > @b" if you want an ascending sort), and <= 0 if @a should
 * sort before @b *or* their original order should be preserved.  It is
 * always called with the element that came first in the input in @a,
 * and list_sort is a stable sort, so it is not necessary to distinguish
 * the @a < @b and @a == @b cases.
  • cmp 的資料型態是 list_cmp_func_t
  • cmp 回傳值大於0時,會執行 ascending sort
  • cmp 回傳值小於等於0時,則執行 descending sort 或保留原本順序

再接下去看到的註解是說明這個 list_sort 如何避免 cache thrashing 的問題

 * This mergesort is as eager as possible while always performing at least
 * 2:1 balanced merges.  Given two pending sublists of size 2^k, they are
 * merged to a size-2^(k+1) list as soon as we have 2^k following elements.
 *
 * Thus, it will avoid cache thrashing as long as 3*2^k elements can
 * fit into the cache.  Not quite as good as a fully-eager bottom-up
 * mergesort, but it does use 0.2*n fewer comparisons, so is faster in
 * the common case that everything fits into L1.

來自 ChatGPT 的回答: 簡而言之 cache thrashing 是指緩存重複被寫入和移出資料,最後導致效能降低的現象

當資料數量超過 cache 的存儲空間時, cache 就會被迫重複寫入和移出資料,從而發生 cache thrashing 的現象

從上述可以得知, list_sort 在有

2k 個元素後就會合併成一個 size 為
2k+1
的 list ,如此資料數量就不會超過 cache
32k
的存儲空間,也就可以避免 cache thrashing 的現象發生

原來 list_sort 可以解決傳入資料過大而造成的 cache thrashing 的問題,接著我們試著引入 list_sort 與自己的寫的 mergesort 做比較,看看在資料量大的時候前者的效能是否真的比較好

加入命令到 qtest.c

首先,複製 list_sort.clist_sort.h 各一份到 lab0-c 檔案夾裡,接著到 qtest.c 裡面新增 do_list_sort 函式,並將 q_sort 函式改為 list_sort

bool do_list_sort(int argc, char *argv[])
{
    if (argc != 1) {
        report(1, "%s takes no arguments", argv[0]);
        return false;
    }

    int cnt = 0;
    if (!current || !current->q)
        report(3, "Warning: Calling sort on null queue");
    else
        cnt = q_size(current->q);
    error_check();

    if (cnt < 2)
        report(3, "Warning: Calling sort on single node");
    error_check();

    set_noallocate_mode(true);
    if (current && exception_setup(true))
-       q_sort(current->q);
+       list_sort(current->q);
    exception_cancel();
    set_noallocate_mode(false);

    bool ok = true;
    if (current && current->size) {
        for (struct list_head *cur_l = current->q->next;
             cur_l != current->q && --cnt; cur_l = cur_l->next) {
            /* Ensure each element in ascending order */
            /* FIXME: add an option to specify sorting order */
            element_t *item, *next_item;
            item = list_entry(cur_l, element_t, list);
            next_item = list_entry(cur_l->next, element_t, list);
            if (strcmp(item->value, next_item->value) > 0) {
                report(1, "ERROR: Not sorted in ascending order");
                ok = false;
                break;
            }
        }
    }

    q_show(3);
    return ok && !error_check();
}

do_list_sort 要放在 int main() 之前才會正確執行,算基本要注意但會不小心忘記的點

為了讓 list_sort 可以正常運作,我們要需要修改 list_sort.clist_sort.h 裡面的程式碼:

  • 移除 void *privlist_cmp_func_t cmp ,使用 strcmp 代替 cmp
if (strcmp(container_of(a, element_t, list)->value,
           container_of(b, element_t, list)->value) <= 0)
  • 移除 __attribute__(nonnull())
  • 移除 likelyunlikely 函式
if (unlikely(!++count))
    ...
if (likely(bits))
    ...
  • 移除 u8 count = 0;
  • 移除最後一行 EXPORT_SYMBOL(list_sort);

再來在 qtest.c 裡面找到 console_init 函式並以 ADD_COMMAND 增加一條新的命令

ADD_COMMAND(list_sort, "Sort with list_sort.c", "");

console.h 中可以找到巨集 ADD_COMMAND 的定義

#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)

加入命令後,檢查有無成功,輸入 ./qtest -> help

  ih          str [n]      | Insert string str at head of queue n times. Generate random string(s) if str equals RAND. (default: n == 1)
  it          str [n]      | Insert string str at tail of queue n times. Generate random string(s) if str equals RAND. (default: n == 1)
+ list_sort                | Sort with list_sort.c
  log         file         | Copy output to file
  merge                    | Merge all the queues into one sorted queue

看到新增了一條名為list_sort 的命令後代表成功了,測試看看是否可以正常運作

l = [a a a t t b b b b]
cmd> list_sort
l = [a a a b b b b t t]

簡單測試後發現可以正常運作

比較 lib/list_sort.c 與 q_sort

參考 komark06 所撰寫的測試程式,在 scripts 檔案夾裡面新增 randstr.py 寫入隨機字串到 data.txt 之中

產生隨機字串

# randstr.py
import random
import string
import subprocess

def set_cursor(choice):
    if (choice):
        print("\x1b[?25h",end='')
    else:
        print("\x1b[?25l",end='')
        

if __name__ == "__main__":
    max_string_length = 1024
    end = 10000000

    set_cursor(False)
    with open("data.txt","w") as f:
        for i in range(end):
            f.write(''.join(random.choices(string.ascii_letters + string.digits, k=max_string_length))+"\n")
            print("{:.3f}%".format(i/end*100),end="\r")
    set_cursor(True)