Try   HackMD

2025q1 Homework1 (lab0)

contributed by < Andrushika >

作業書寫規範:

  • 無論標題和內文中,中文和英文字元之間要有空白字元 (對排版和文字搜尋有利)
  • 文字訊息 (尤其是程式執行結果) 請避免用圖片來表示,否則不好搜尋和分類
  • 共筆書寫請考慮到日後協作,避免過多的個人色彩,用詞儘量中性
  • 不要在筆記內加入 [TOC] : 筆記左上方已有 Table of Contents (TOC) 功能,不需要畫蛇添足
  • 不要變更預設的 CSS 也不要加入任何佈景主題: 這是「開發紀錄」,用於評分和接受同儕的檢閱
  • 在筆記中貼入程式碼時,避免非必要的行號,也就是該手動將 c=cpp= 變更為 ccpp。行號只在後續討論明確需要行號時,才要出現,否則維持精簡的展現。可留意「你所不知道的 C 語言: linked list 和非連續記憶體」裡頭程式碼展現的方式
  • HackMD 不是讓你張貼完整程式碼的地方,GitHub 才是!因此你在開發紀錄只該列出關鍵程式碼 (善用 diff 標示),可附上對應 GitHub commit 的超連結,列出程式碼是為了「檢討」和「便於他人參與討論」
  • 留意科技詞彙的使用,請參見「資訊科技詞彙翻譯」及「詞彙對照表
  • 不要濫用 :::info, :::success, :::warning 等標示,儘量用清晰的文字書寫。:::danger 則僅限授課教師作為批注使用
  • 避免過多的中英文混用,已有明確翻譯詞彙者,例如「鏈結串列」(linked list) 和「佇列」(queue),就使用該中文詞彙,英文則留給變數名稱、人名,或者缺乏通用翻譯詞彙的場景
  • 在中文敘述中,使用全形標點符號,例如該用「,」,而非 ","。注意書名號的使用,即 ,非「小於」和「大於」符號
  • 避免使用不必要的 emoji 字元

開發環境

$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0

$ lscpu
架構:                    x86_64
  CPU 作業模式:            32-bit, 64-bit
  Address sizes:          39 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   16
  On-line CPU(s) list:    0-15
供應商識別號:            GenuineIntel
  Model name:             11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
    CPU 家族:             6
    型號:                 141
    每核心執行緒數:         2
    每通訊端核心數:         8
    Socket(s):            1
    製程:                 1
    CPU(s) scaling MHz:   31%
    CPU max MHz:          4600.0000
    CPU min MHz:          800.0000
    BogoMIPS:             4608.00
Virtualization features:  
  虛擬:                   VT-x
Caches (sum of all):      
  L1d:                    384 KiB (8 instances)
  L1i:                    256 KiB (8 instances)
  L2:                     10 MiB (8 instances)
  L3:                     24 MiB (1 instance)
NUMA:                     
  NUMA 節點:              1
  NUMA node0 CPU(s):     0-15

queue.[ch] 佇列操作程式碼實作

q_free

為預防被釋放的記憶體空間被意外存取,先使用 q_release_elemententry 逐一刪除;最後釋放 head 所使用的記憶體空間。

void q_free(struct list_head *head)
{
    if (!head)
        return;
    element_t *entry = NULL, *safe = NULL;
    list_for_each_entry_safe (entry, safe, head, list) {
        q_release_element(entry);
    }
    free(head);
}

q_insert_head, q_insert_tail

實作步驟

  1. 檢查 head ,若為 NULL 直接 return false
  2. 使用 mallocentry 分配記憶體空間
  3. 使用 list_addlist_add_tail 將節點加入鏈結串列中

因為第二個步驟(malloc entry)在兩個函式中完全重複,所以獨立寫成 create_entry 函式:

element_t *create_entry(char *s)
{
    element_t *entry = malloc(sizeof(element_t));
    if (!entry)
        return false;

    char *s_dup = strdup(s);
    if (!s_dup) {
        free(entry);
        return false;
    }

    entry->value = s_dup;

    return entry;
}

q_insert_head, q_insert_tail 如下,可以直接呼叫 create_entry 使用:

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;
    element_t *entry = create_entry(s);
    list_add(&entry->list, head);

    return true;
}
bool q_insert_tail(struct list_head *head, char *s)
{
    if (!head)
        return false;
    element_t *entry = create_entry(s);
    list_add_tail(&entry->list, head);

    return true;
}

q_remove_head, q_remove_tail

一開始在閱讀 queue.h 時,發現這兩個函式關於 *sp 的 annotation 有些奇怪:

q_remove_tail() - Remove the element from tail of queue
@sp: string would be inserted

應修正如下較貼近原意:

@sp: Pointer to a buffer where the removed element's string will be copied.

實作步驟

  1. 檢查 head ,若為 NULLemptyreturn NULL
  2. 使用 list_first_entrylist_last_entry 取得欲移除的 element_t*
  3. 將欲刪除的 entry->value 存入 *sp
  4. 使用 list_del 將其從鏈結串列中刪除

因兩者的實作幾乎相同,在此僅附上 q_remove_head 的程式碼:

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    element_t *entry = list_first_entry(head, element_t, list);

    if (sp && bufsize > 0) {
        strncpy(sp, entry->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }
    list_del_init(&entry->list);
    return entry;
}

q_delete_mid

本函式的實作難點是「找到正確的 mid node」,因為一開始沒看清楚題目的要求,所以折騰很久。
根據 queue.h 的敘述,應該刪除 ⌊n/2⌋th node

實作步驟

  1. 檢查 head ,若為 NULLemptyreturn false
  2. 使用 q_find_mid 搭配快慢指標技巧,找到欲刪除的中點
  3. 使用 Linux Kernel list API 提供的 list_del 刪除節點,並釋放記憶體空間
struct list_head *q_find_mid(struct list_head *head)
{
    // The middle node of a linked list of size n is the
    // ⌊n / 2⌋ th node from the start using 0-based indexing.

    if (!head || list_empty(head))
        return NULL;

    // handle the 1-node case
    if (list_is_singular(head))
        return head->next;

    // find the middle node
    struct list_head *slow = head->next, *fast = slow->next->next;
    while (fast != head && fast->next != head) {
        slow = slow->next;
        fast = fast->next->next;
    }

    return slow->next;
}
bool q_delete_mid(struct list_head *head)
{
    // https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/

    if (!head || list_empty(head))
        return false;

    // find the middle node
    struct list_head *mid = q_find_mid(head);

    element_t *target = list_entry(mid, element_t, list);
    // delete node from the list
    list_del(&target->list);
    q_release_element(target);
    return true;
}

q_delete_dup

在已排序過的鏈節串列中,刪除重複出現的節點。

實作步驟

  1. 檢查 head ,若為 NULLemptyreturn false
  2. 使用 list_for_each_safe 安全走訪節點(因涉及刪除操作,使用 safe mode)
    a. 檢查 currnext 是否相同;若出現重複,則持續刪除 next
    b. 如果出現重複,curr 本身也需要被刪除
bool q_delete_dup(struct list_head *head)
{
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/

    if (!head || list_empty(head))
        return false;

    struct list_head *curr, *next;

    list_for_each_safe (curr, next, head) {
        element_t *entry = list_entry(curr, element_t, list);
        if (next != head &&
            strcmp(entry->value, list_entry(next, element_t, list)->value) ==
                0) {
            while (next != head &&
                   strcmp(entry->value,
                          list_entry(next, element_t, list)->value) == 0) {
                list_del(next);
                q_release_element(list_entry(next, element_t, list));
                next = curr->next;
            }
            list_del(curr);
            q_release_element(entry);
        }
    }
    return true;
}

q_swap

實作步驟

  1. 檢查 head,若為 NULLempty 則直接 return
  2. 使用 list_for_each_safe 安全走訪節點
    a. 取出 curr(當前節點)與 next(下一個節點)
    b.nexthead,表示已走訪完整個鏈結串列,則跳出迴圈
    c. 使用 list_delnext 從原位置移除
    d. 使用 list_addnext 插入到 curr 前一個位置(節點交換)
    e. 更新 next,使其指向 curr->next,確保走訪能正確進行

其中 2.e 步驟看起來有些微妙。
因為 swap 時是兩兩一組,所以會希望 iterator 一次跳躍兩個節點。
next = curr->next 除了可以調整 curr, nextswap 過後的相對位置外,也剛好讓 iterator 前進了一步;加上 list_for_each_safe 的迴圈更新條件,剛好可以讓 iterator 達成前進兩步的目標。

void q_swap(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    // https://leetcode.com/problems/swap-nodes-in-pairs/
    struct list_head *curr, *next;
    list_for_each_safe (curr, next, head) {
        if (next == head)
            break;
        list_del(next);
        list_add(next, curr->prev);
        next = curr->next;
    }
}

q_reverse

實作步驟

  1. 檢查 head,若為 NULLempty 則直接 return
  2. 使用 list_for_each_safe 安全走訪節點
  3. 依序把節點移除,並插入鏈結串列的最前方

實際上就像當作 stack 進行操作,就可以達到翻轉鏈節串列的效果。

void q_reverse(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    struct list_head *curr, *next;
    list_for_each_safe (curr, next, head) {
        list_del(curr);
        list_add(curr, head);
    }
}

q_reverseK

這個函式的實作,我參照了先前 reverse 的實作方式:

  • 不斷將新節點插入鏈節串列的最前方,來達成翻轉的效果。
  • 不同的是,因為是以 k 個節點為一組進行翻轉,在每次走訪時,用一個 count 來紀錄當前累積走過了多少節點;當 count == k 就立刻進行翻轉操作。
  • 雖仿造 stack , 但插入點不會永遠都在 head ,所以必須不斷紀錄新的插入點。

變數說明

pending:下一輪即將進行 reverse 操作的節點
last:進行局部 reverse 操作時,pending 的插入點
curr, next:提供給 list_for_each_safe 走訪使用的 iterator
count:紀錄當前組別已累積走過多少個 node

void q_reverseK(struct list_head *head, int k)
{
    // https://leetcode.com/problems/reverse-nodes-in-k-group/
    if (!head || list_empty(head) || k <= 1)
        return;
    // nodes waiting to be add & start add point
    struct list_head *pending = head->next, *last = head;
    struct list_head *curr, *next, *next_pend;
    int count = 0;
    list_for_each_safe (curr, next, head) {
        if (++count >= k) {
            while (count) {
                count--;
                next_pend = pending->next;
                list_del(pending);
                list_add(pending, last);
                pending = next_pend;
            }
            last = next->prev;
        }
    }
}

merge_two_sorted_list (utils)

因為後續的 sort, merge 都會涉及合併兩個「已排列鏈節串列」,所以先寫成一個工具函式來簡化操作。需要注意的是,傳入這個函式處理的 list 要先打斷 circular list 的頭尾連結,且不需要將 guard node 傳入。

實作步驟

  1. 初始化 guard 節點,tail 指向 guard 以追蹤新鏈結串列的最後節點。
  2. 合併兩個排序鏈結串列,依 q_cmp() 比較 ab,將較小者連接到 tail,並前進到下一節點。
  3. 處理剩餘節點,若 ab 尚未走訪完,直接連接至 tail
  4. 回傳排序後的鏈結串列起點(guard.next)。

這裡的實作比較冗長,且課程當中已有提及類似程式,故不在此詳細列出。
詳情請見 github repo

q_sort

top-down merge sort 的問題

  • 當遇到 size=2M 的 test case,會一直出現程式執行時間過長的提示。
  • 主要是因為在分割鏈結串列時,會使用到前面提及的 q_find_mid,其時間複雜度為 O(n)。
  • 且儲存分割完成的鏈結串列也需要額外的記憶體空間,故改採使用 bottom-up 方式實作。

bottom-up merge sort 實作

  • 概念很簡單:每次造訪一個節點,當遇到可以合併的(size 相同的兩個 list),就馬上進行合併。
  • 全部的節點走訪完,再把剩下未合併的 list 逐一合併。
  • 合併可以直接呼叫之前實作的 merge_two_sorted_list
  • 不必找 list 分割中點,省下了大把時間。

會需要額外使用一個 part[] 來存放部份合併完成的 list
part[] 有特殊的存放規則:part[n] 中若不為 empty,就是大小為 2nlist
(這樣一來,檢查是否存在 size 相同的 list 就相當方便!)

void q_sort(struct list_head *head, bool descend)
{
    struct list_head *list;
    struct list_head *part[65];
    int level;

    if (!head || list_empty(head) || list_is_singular(head))
        return;

    // break circular
    head->prev->next = NULL;
    head->prev = NULL;

    list = head->next;
    INIT_LIST_HEAD(head);

    // init part[]
    memset(part, 0, sizeof(part));

    while (list) {
        struct list_head *curr = list;
        struct list_head *next = list->next;

        curr->prev = NULL;
        curr->next = NULL;

        list = next;

        // the number of nodes in part[] will always be 0 or 2^level
        // [1, 2, 4, 8, 16...] or [0, 2, 0, 0, 16] are acceptable
        // depends on level (idx)
        // keep merging until unable to merge
        for (level = 0; part[level]; level++) {
            curr = merge_two_sorted_list(descend, part[level], curr);
            part[level] = NULL;
        }
        part[level] = curr;
    }

    list = NULL;
    for (level = 0; level < (int) (sizeof(part) / sizeof(part[0])); level++) {
        if (!part[level])
            continue;
        list = merge_two_sorted_list(descend, part[level], list);
    }

    // reconstruct list
    while (list) {
        struct list_head *next = list->next;
        list_add_tail(list, head);
        list = next;
    }
}

此處排序時是使用 strcmp ,所以節點是依照字典序 (Lexicographic order) 排列。

e.g.
如果對 [17, 2, 189, 38] 進行字典序排列:
直覺上的數字由小到大排列結果,會是 [2, 17, 38, 189] ( X )
但實際的字典序排列結果為: [17, 189, 2, 38] ( O )

q_ascend, q_descend

本函式將會維護一個單調嚴格遞增(or 遞減)的 list

在非鏈結串列的情境下,可以透過 stack 達成:

  • 每次插入新數值到 list 時,和 stack 頂部的節點比大小
  • stack 頂部節點會違反遞增 or 遞減的條件,則持續 pop stack 直到符合條件為止

事實上,即使不使用 stack,也可以在 doubly linked list 上實作這個函式。
透過維護一個 *tail,其實際效用和 stack.top() 類似,代表維護的單調序列的最後一個節點位置。

因兩者實作幾乎相同,僅附上 q_ascend 作為參考:

int q_ascend(struct list_head *head)
{
    // https://leetcode.com/problems/remove-nodes-from-linked-list/

    if (!head || list_empty(head))
        return 0;
    if (list_is_singular(head))
        return 1;

    struct list_head *tail = head->next, *curr, *tmp;

    list_for_each (curr, head) {
        while (tail != head &&
               strcmp(list_entry(curr, element_t, list)->value,
                      list_entry(tail, element_t, list)->value) < 0) {
            tmp = tail->prev;
            list_del(tail);
            q_release_element(list_entry(tail, element_t, list));
            tail = tmp;
        }
        tail = curr;
    }

    return q_size(head);
}

q_merge

實作步驟

  1. 檢查 head 是否為 NULL or empty,若是則直接回傳 0
  2. 初始化變數,設定 curr(當前 queue),result(合併後的結果)
  3. 使用 list_for_each_entry 走訪 head 中的每個 queue_context_t,進行合併
    a. 斷開 curr->q(原為 circular list),確保 merge_two_sorted_list() 正常運作
    b. 將 curr->q 併入 result,維持 result 為當前已排序的結果
    c. curr->q->next = curr->q; 清除 q 之中已經排序完成的節點
  4. 將合併後的結果存入第一個 queue
    a. 取 head->next 作為 ans_entry,初始化 ans_entry->q
    b. 使用 list_add_tail 逐一將result 中的節點加入 ans_entry->q,重建合併完成的 queue
  5. 回傳合併後的 q_size(ans_entry->q)
int q_merge(struct list_head *head, bool descend)
{
    // https://leetcode.com/problems/merge-k-sorted-lists/
    if (!head || list_empty(head))
        return 0;

    queue_contex_t *curr = NULL, *ans_entry;

    struct list_head *result = NULL;

    list_for_each_entry (curr, head, chain) {
        curr->q->prev->next = NULL;
        result = merge_two_sorted_list(descend, result, curr->q->next);
        curr->q->next = curr->q;
    }

    ans_entry = list_entry(head->next, queue_contex_t, chain);
    INIT_LIST_HEAD(ans_entry->q);

    while (result) {
        struct list_head *next = result->next;
        list_add_tail(result, ans_entry->q);
        result = next;
    }

    return q_size(ans_entry->q);
}

研讀 Linux 核心原始碼 list_sort.c

引入 list_sort.c

以 2023 年的作業觀摩中 Risheng1128 同學的實作方式為參考

首先,加入 list_sort.clist_sort.h 到專案中。

  • 在 linux kernel 中實作的版本,會傳入 void *priv 參數,而在 lab0-c 中都用不到,可以直接拿掉
  • 需要使 cmp 函式有接收 descend 參數的功能,原先想要直接偷 queue.c 裡之前做的來用;但因 queue.h 不允許修改,所以直接複製一份 q_cmplist_sort.c

更動範例如下(僅舉例,其他省略,請參閱 repo):

- static struct list_head *merge(void *priv, list_cmp_func_t cmp, 
-                            struct list_head *a, struct list_head *b)
+ static struct list_head *merge(struct list_head *a, struct list_head *b, int descend)

為了後續測試時切換方便,在 qtest.c 的 console_init() 中加入 list_sort param,讓使用者可以直接指定要使用的排序演算法。

add_param("listsort", &use_list_sort, "use linux kernel style sorting algorithm from lib/list_sort.c", NULL);

do_sort() 實作的地方,依據 use_list_sort(option 會改變的變數),去判斷當前應該使用哪個排序演算法:

if (current && exception_setup(true)){
    if (use_list_sort)
        list_sort(current->q, descend);
    else
        q_sort(current->q, descend);
}

與我自己的 q_sort() 進行效能比較

首先,在 trace/ 目錄下新增 trace-sort.cmd
(根據需要,可以自行選擇要測試的 sorting 模式、資料量等等)

option listsort 1
new
ih RAND 1000000
time
sort
time

上方將會選用 lib/list_sort.c 中的演算法,執行 1000000 次插入操作後,進行排序與計時。
因為要測試的數據量比較大,會觸發到 harness.c 中定義的 alarm 觸發時間,而造成程式直接中斷;為了測試方便,先給一個比較寬容的 time_limit

// harness.c
static int time_limit = 10;

接下來,在 script/ 目錄下新增用來統計排序執行時間的 test_sort_perf.py

import subprocess
import re

NUM_RUNS = 10
delta_times = []

for i in range(NUM_RUNS):
    result = subprocess.run(["./qtest", "-f", "traces/trace-sort.cmd"],
                            capture_output=True, text=True)
    output = result.stdout

    matches = re.findall(r"Delta time\s*=\s*([\d\.]+)", output)
    if len(matches) < 2:
        print(f"{i+1}th test: No Delta time finded.")
        continue

    delta_time = float(matches[1])
    delta_times.append(delta_time)
    print(f"{i+1}th test: Delta time = {delta_time}")

if delta_times:
    avg = sum(delta_times) / len(delta_times)
    print(f"\naverage Delta time of {len(delta_times)} of sorting: {avg}")
else:
    print("No Delta time finded.")

這段 script 將會連續執行 NUM_RUNS 次(預設為10)排序,並用 list 紀錄下每次排序花費的時間,最終對所有秒數取平均作為結果。

測試結果

1M 筆資料
Round q_sort list_sort
1 2.722 2.582
2 2.74 2.614
3 2.629 2.459
4 2.66 2.447
5 2.741 2.386
6 2.605 2.497
7 2.782 2.471
8 2.762 2.443
9 2.582 2.396
10 2.592 2.393
AVG 2.6815 2.4688
2M 筆資料
Round q_sort list_sort
1 6.075 5.567
2 5.812 5.564
3 5.84 5.724
4 5.978 5.823
5 5.845 5.494
6 6.351 5.647
7 6.15 5.672
8 5.817 5.565
9 6.18 5.727
10 5.76 5.736
AVG 5.98 5.652

可以觀察到:

  • 在資料量 1M 時,list_sort 比我的 q_sort 快大約 0.22
  • 在資料量 2M 時,list_sort 比我的 q_sort 快大約 0.33

解釋 select 系統呼叫於本程式的使用

console.c 處理 web 指令

當使用者輸入 command web 之後,將會建立 server socket,並使 linenoise.c 裡面的 eventmux_callback 的 function pointer 指向 web_eventmux()

static bool do_web(int argc, char *argv[])
{
    // init... (ignore)
    
    web_fd = web_open(port);
    if (web_fd > 0) {
        printf("listen on port %d, fd is %d\n", port, web_fd);
        line_set_eventmux_callback(web_eventmux);
        use_linenoise = false;
    } else {
        // handle error
    }
    return true;
}

linenoise.c 中的 line_edit()

line_edit() 中使用 while 讀取輸入的地方,可以看到幾行程式碼:

while (1) {
    // init...(ignore)

    if (eventmux_callback != NULL) {
        int result = eventmux_callback(l.buf);
        if (result != 0)
            return result;
    }
    // handle input from command line...(ignore)
}

eventmux_callback 就是先前在 command line 執行 web 所設置的 function pointer,可以用來 handle 來自 web server 的命令。

web.c 中的 web_eventmux()

這裡是主要使用系統呼叫 select() 的地方。select() 採用了 I/O Multiplexing Model,參考下圖:

image

web_eventmux() 中同時監聽來自 web server 和 stdin 的 file descriptor,任何一方觸發都會使程式繼續執行下去。
本函式主要 handle 來自 web server 的事件;因此,若是 stdin 觸發監聽,將不會進行對命令的處理,而是回到 linenoise.c 中的 line_edit() 才處理。

int web_eventmux(char *buf)
{
    fd_set listenset;

    FD_ZERO(&listenset);
    FD_SET(STDIN_FILENO, &listenset);
    int max_fd = STDIN_FILENO;
    if (server_fd > 0) {
        FD_SET(server_fd, &listenset);
        max_fd = max_fd > server_fd ? max_fd : server_fd;
    }
    int result = select(max_fd + 1, &listenset, NULL, NULL, NULL);
    if (result < 0)
        return -1;

    if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) {
        //handle cmd from web server...(ignore)
        return strlen(buf);
    }

    FD_CLR(STDIN_FILENO, &listenset);
    return 0;
}

運用 RIO 套件的考量點

RIO 全名為 Robust I/O,根據卡內基美隆大學的課程中的介紹,RIO 可以有效解決 short count 的問題。除此之外,因為每次呼叫 read() 都需要從 user mode 切換到 kernel mode,如果頻繁呼叫會大幅降低效能。
RIO 套件的做法是:當系統呼叫 read() 時,一次性讀取一大塊資料到 user space buffer 中 (同時處理 short count),後續再多次從 buffer 取用即可;當讀到 buffer 為空時,才再次系統呼叫。

rio_t 結構

其中 count 代表著 buffer 中尚未被取用的資料長度。
所以 count <= 0 時,代表 buffer 已經空了,需要再次 read()

typedef struct {
    int fd;            /* descriptor for this buf */
    int count;         /* unread byte in this buf */
    char *bufptr;      /* next unread byte in this buf */
    char buf[BUFSIZE]; /* internal buffer */
} rio_t;

rio_read()

static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n)
{
    int cnt;
    while (rp->count <= 0) { /* refill if buf is empty */
        rp->count = read(rp->fd, rp->buf, sizeof(rp->buf));
        if (rp->count < 0) {
            if (errno != EINTR) /* interrupted by sig handler return */
                return -1;
        } else if (rp->count == 0) { /* EOF */
            return 0;
        } else
            rp->bufptr = rp->buf; /* reset buffer ptr */
    }

    /* Copy min(n, rp->count) bytes from internal buf to user buf */
    // 處理 short count     
    cnt = n;
    if (rp->count < n)
        cnt = rp->count;
    memcpy(usrbuf, rp->bufptr, cnt);
    rp->bufptr += cnt;
    rp->count -= cnt;
    return cnt;
}

console.c 中很酷的 buf_stack

一開始讀到以下程式碼,覺得很困惑,什麼情景下會用到這個?

static rio_t *buf_stack;

//
/* Create new buffer for named file.
 * Name == NULL for stdin.
 * Return true if successful.
 */
static bool push_file(char *fname)
{
    int fd = fname ? open(fname, O_RDONLY) : STDIN_FILENO;
    has_infile = fname ? true : false;
    if (fd < 0)
        return false;

    if (fd > fd_max)
        fd_max = fd;

    rio_t *rnew = malloc_or_fail(sizeof(rio_t), "push_file");
    rnew->fd = fd;
    rnew->count = 0;
    rnew->bufptr = rnew->buf;
    rnew->prev = buf_stack;
    buf_stack = rnew;

    return true;
}

/* Pop a file buffer from stack */
static void pop_file()
{
    if (buf_stack) {
        rio_t *rsave = buf_stack;
        buf_stack = rsave->prev;
        close(rsave->fd);
        free_block(rsave, sizeof(rio_t));
    }
}

後來找到使用了 push_file() 的地方:

static bool do_source(int argc, char *argv[])
{
    // error hadling...(ignore)
    
    if (!push_file(argv[1])) {
        report(1, "Could not open source file '%s'", argv[1]);
        return false;
    }

    return true;
}

發現原來是執行 source 的時候會用到,是為了處理 nested source commands 的情況。
舉例來說,以下有三個 file,將執行一些命令:

fileA
> source fileB
> it 6
fileB
> source fileC
> it 3
fileC
> new
> it 8
result
q = [8, 3, 6]

如果使用者執行 ./qtest 後輸入 source fileA,就會造成遞迴式的開檔、執行,而先前宣告的 buf_stack 就是為了因應這種情況而存在。

每當讀到 source,代表需要開新的檔案;新增、初始化新檔案對應的 rio_t 之後,將其 pushbuf_stack 中;直到檔案內容被讀完就 pop_file(),就會回到先前讀到一半的檔案狀態。
可以對應到 console.c 中 readline() 的其中一段程式碼:
(系統呼叫 read() 之後,發現沒有剩餘的可讀資料,代表 EOF)

if (buf_stack->count <= 0) {
    /* Need to read from input file */
    buf_stack->count = read(buf_stack->fd, buf_stack->buf, RIO_BUFSIZE);
    buf_stack->bufptr = buf_stack->buf;
    if (buf_stack->count <= 0) {
        /* Encountered EOF */
        pop_file();
        if (cnt > 0) {
            /* Last line of file did not terminate with newline. */
            /*  Terminate line & return it */
            *lptr++ = '\n';
            *lptr++ = '\0';
            if (echo) {
                report_noreturn(1, prompt);
                report_noreturn(1, linebuf);
            }
            return linebuf;
        }
        return NULL;
    }
}

可能的改善方向

當我在閱讀這段 code 的時候,覺得有些不直覺。目前的實作會在 line_edit() 裡面同時檢查來自 web server 的 request。
在 OOP 中有所謂的 SOLID 原則(即便 C 並非 OO),其中的單一職責原則 (Single-resbonsibility principle) 提到,函式應將關注點分離,每個函式只專注做一件事情。
因此,line_edit() 是否應該專注於處理來自 command line 的命令?

又剛好看到老師老師撰寫的並行程式設計:排程器,裡面的 event_loop() 實作給了我一些啟發:
image

我想可以參考這樣的函式呼叫方式,但不以 coroutine 的方式實作。
是否有可能仿照,在 main_loop() 中僅去呼叫 select();當 web server 或 command line 事件被觸發時,才去呼叫對應的 handling 函式?

實作 shuffle 命令

queue.c 裡新增了 q_shuffle,實作參考的是 Fisher–Yates shuffle 演算法。
考量到原演算法中有多次存取陣列內容、swap 操作,而我需要對 doubly linked list 進行 shuffle;在不以其他資料結構加速存取過程的狀況下,查找特定節點的時間複雜度為 O(n)

所以實作時佔以一個 struct list_head **arr 存下各節點的位置,待 shuffle 操作執行完畢後,再逐一將節點串起來。

q_shuffle 實作

void q_shuffle(struct list_head *head)
{
    if (!head || list_empty(head) || list_is_singular(head))
        return;

    int len = q_size(head), i = 0;
    struct list_head **arr = malloc(len * sizeof(struct list_head *));
    struct list_head *curr, *tmp;

    list_for_each (curr, head) {
        arr[i++] = curr;
    }

    // pick a node randomly and swap
    for (i = len - 1; i > 0; i--) {
        int j = rand() % (i + 1);
        tmp = arr[i];
        arr[i] = arr[j];
        arr[j] = tmp;
    }

    // reconstruct the doubly linked list
    for (i = 0; i + 1 < len; i++) {
        arr[i]->next = arr[i + 1];
        arr[i + 1]->prev = arr[i];
    }

    head->next = arr[0];
    arr[0]->prev = head;

    head->prev = arr[len - 1];
    arr[len - 1]->next = head;

    free(arr);
}

以 Chi-squared test 檢測「亂度」

為了要檢測每次的 shuffle 是否足夠隨機,可以套用假設檢定的技巧來判斷。

所謂「夠亂」,可以視為在做了足夠多次 shuffle 後,每組排列組合出現的機率相等(Uniform Distribution)。

為此,可以使用卡方檢定。該檢定可以用於判斷兩機率分佈是否相似。基於作業說明中提供的測試用 script,進行 shuffle 操作 1000000 次,得到各排列組合的次數分佈如下:

image

Chi-sqaured value 公式

def chiSquared(observation, expectation):
    return ((observation - expectation) ** 2) / expectation 

實驗數據

自由度:234 個數字共有 4! = 24 種排列組合,自由度 = N-1
期望頻率:1000000 / 24 = 41666
Chi-squared value sum: 24.662986607785726
p-value: 0.3678926504294987

因 p-value 大於顯著水準 (alpha = 0.05),無法拒絕 H0
H0:shuffle 結果之分佈不為 uniform distribution)

排列組合 出現頻率 Chi-squared value
1234 41488 0.7604281668506696
1243 42096 4.437671002736044
1324 41581 0.17340277444439112
1342 41835 0.6854749675994816
1423 41853 0.8392694283108529
1432 41970 2.218019488311813
2134 41631 0.02940047040752652
2143 41550 0.322949167186675
2314 41584 0.16137858205731292
2341 41872 1.018480295684731
2413 41318 2.906542504680075
2431 41879 1.0888734219747516
3124 41698 0.024576393222291555
3142 41459 1.0283924542792684
3214 41558 0.2799404790476648
3241 41873 1.0283924542792684
3412 41534 0.41818269092305477
3421 41842 0.7434358949743196
4123 41823 0.5915854653674458
4132 41516 0.5400086401382422
4213 41787 0.35138962223395576
4231 41521 0.5046080737291797
4312 41429 1.3480775692411078
4321 41303 3.1625066001056017

改善作業說明所提供的 test script

作業說明中的 script 存在小缺陷,導致執行效率極差,問題出在模擬輸入 command 的地方:

test_count = 1000000
input = "new\nit 1\nit 2\nit 3\nit 4\n"
for i in range(test_count):
    input += "shuffle\n"
input += "free\nquit\n"

在 python 中的字串處理,直接使用 += 進行操作,實際上是重複 allocate 新記憶體空間、複製舊字串、貼上新字串;時間複雜度可達到 O(n2)

用以下方式改善:

input_commands = ["new\n", "it 1\n", "it 2\n", "it 3\n", "it 4\n"]
input_commands.extend(["shuffle\n"] * test_count)
input_commands.append("free\nquit\n")
input = "".join(input_commands)

先對 list 進行新增重複性高的 command substring,再一次性使用 join() 建立字串,可以省下不必要的 allocate 操作。

研讀論文〈Dude, is my code constant time?〉

作者提出了一種使用統計學上「假設檢定」的方式,來檢測函式是否為常數時間複雜度。可以大致總結為以下步驟:

  1. 測量函式執行時間
    首先將 input data 分為兩大類:fix class, random class

    • fix class 的資料內容固定,且可以在常數執行時間完成
    • random class 為隨機產生

    把兩類資料餵給欲測試的函式,並分類記錄下每次函式執行所花的時間。
    (在實作中,執行時間使用 cpu_cycles() 測量)

  2. 資料後處理 (post-processing)
    測量結果常會受到外部環境影響(如 OS interrupt),而出現異常極端值。因此,對數據後處理可以提升分析的精準度。
    原專案中,會給定一系列的裁切比例,將測量值較高的部分數據丟棄。

  3. 套用假設檢定
    使用 Welch’s t-test,檢測剛剛獲得的兩類資料的執行時間分佈是否相似。如果 random classfix class 的執行時間分布不同,則可以合理推斷該函式「非」常數時間複雜度。

原專案的實作步驟

  1. 統計一函式的執行時間
    • 使用 cpu_cycles() 取得 before_ticksafter_ticks
    • exec_time = after_ticks - before_ticks
  2. 用測得的時間資料,透過 t_push() 在線更新 mean, m2, n
  3. 呼叫 t_compute() 透過 mean, m2, n 計算 t_value
  4. 使用 t_value 判斷兩個 class 的分佈是否存在顯著差異,若 abs(t_value) 高於一定 threshold,則可合理判斷兩者分佈不同

t_push

每次計算完 exec_time 都會將其加入統計資料中。
此時需要使用線上更新的方式,對 mean, m2, n 做更新。

online 數值更新公式
n 更新

nn+1

delta 更新

δxx¯old

平均數 mean 更新

x¯newx¯old+δn

累積變異數 M2 更新

M2,newM2,old+δ(xx¯new)

void t_push(t_context_t *ctx, double x, uint8_t class)
{
    assert(class == 0 || class == 1);
    ctx->n[class]++;

    /* Welford method for computing online variance
     * in a numerically stable way.
     */
    double delta = x - ctx->mean[class];
    ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class];
    ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]);
}

t_compute

利用先前統計資料計算 t-value,公式如下:
t=X¯0X¯1s02N0s12N1

S0, S1 可以先前得到的累積變異數 M2 求得:
s2=M2n1

double t_compute(t_context_t *ctx)
{
    double var[2] = {0.0, 0.0};
    var[0] = ctx->m2[0] / (ctx->n[0] - 1);
    var[1] = ctx->m2[1] / (ctx->n[1] - 1);
    double num = (ctx->mean[0] - ctx->mean[1]);
    double den = sqrt(var[0] / ctx->n[0] + var[1] / ctx->n[1]);
    double t_value = num / den;
    return t_value;
}

lab0-c 中的 dudect 現存問題

本機端與 github runner 的測試結果不同

此節屬於個人猜測,真正的原因仍須更完整的驗證

在本機端執行 make test 時,都能全部通過(100/100)。但推送到 github 時,會發生最後一項 test complexity 無法通過的狀況(95/100)。
(甚至會發生 insert_tail 通過,但 insert_head 沒有通過的奇怪現象)

我認為可能是本地端的執行環境和 github runner 環境不同所造成,在虛擬環境下的 exec_times 資料可能包含更多雜訊(如 OS interrupt 等),測量結果不準確。

缺乏 post-processing 的 percentile() 實作

根據作業說明,在 lab0-c 未有完整實作 percentile 功能。

在計算 t-value 時丟棄部份 exec_time 極端值,可以更精確的確認 random classfix class 的分佈是否相似。

  • 解決方式:給定多筆指定的 percentile,並同時維護不同 percentile 下的 t-value。這些 t-values 將用於進一步判斷函式是否為常數時間複雜度。
  • e.g. 給定 100percentile,會需要維護 100+1t-value
    (加上未經極端值處理的 raw data)

心中未解的疑惑:
目前的實作是對所有 t-values 取最大值,但這樣是否有助於改善極端值所帶來的影響?極端值仍會影響到那組未 crop 的 raw data;而該組 t-value 也可以去影響到 max t-value 的計算結果。

在 lab0-c 裡引入 percentile 實作後,可以順利通過全部的測試 (100/100)
但為何可以成功改善,我還搞不懂,仍須進一步了解。