Try   HackMD

2024q1 Homework1 (lab0)

contributed by < csotaku0926 >

開發環境

$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0

$ lscpu
Architecture:            x86_64
CPU op-mode(s):          32-bit, 64-bit
Byte Order:              Little Endian
Address sizes:           39 bits physical, 48 bits virtual
CPU(s):                  12
On-line CPU(s) list:     0-11
Thread(s) per core:      2
Core(s) per socket:      8
Socket(s):               1
NUMA node(s):            1
Vendor ID:               GenuineIntel
CPU family:              6
Model:                   154
Model name:              12th Gen Intel(R) Core(TM) i5-12450H
Stepping:                3
CPU max MHz:             4400.0000
CPU min MHz:             400.0000
BogoMIPS:                4992.00
Virtualization:          VT-x
L1d cache:               320 KiB 
L1i cache:               384 KiB 
L2 cache:                7 MiB 
L3 cache:                12 MiB 
NUMA node0 CPU(s):       0-11

針對佇列操作的程式碼實作

q_new

struct list_head *q_new()
{
    struct list_head *head = malloc(sizeof(struct list_head));
    if (!head)
        return NULL;

    INIT_LIST_HEAD(head);
    return head;
}

需要注意由 harness.h 中巨集定義的 malloc 可能會有分配 配置失敗的狀況,這時會回傳 NULL 需要加以判斷。

allocate 翻譯為「配置」,而非「分配」,是為了跟「分發」(dispatch) 區隔,二者都會在作業系統領域多次出現。

local variable 翻譯為「區域變數」,而非「局部變數」,以避免跟 partial 一詞混淆。在程式語言理論中,存在 "partial" 變數。

此處的「這邊也可以」和「一下」無助於溝通。

這邊也可以 複習一下 函式回傳值的部份:回傳區域變數的位址是不合法的,例如:

struct list_head head;
return &head;

根據 C99 [6.2.4] Storage Duration of Objects:

  • An object has a storage duration that determines its lifetime. There are three storage durations: static, automatic, and allocated.
  • An object whose identifier is declared with no linkage and without the storage-class specifier static has automatic storage duration.

由此可知道 head 適用於 automatic storage

  • The lifetime of an object is the portion of program execution during which storage is guaranteed to be reserved for it. An object exists, has a constant address and retains its last-stored value throughout its lifetime. If an object is referred to outside of its lifetime, the behavior is undefined.

head 於他的存活時間 (lifetime) 之外存取,將被定義為 undefined behavior 。上述的「不合法」是不精確的說詞

「我們都知道」這樣的陳述無助於專業溝通。
「回傳局部變數的位址是不合法的」不是精準的說法,請查閱 C 語言規格書,列出對應的描述並解讀。

已修正

但如果是回傳指標,那就是回傳區域變數的數值 (value),正如 q_new 的回傳值,這是沒問題的

q_free

初步的想法如下

void q_free(struct list_head *l) 
{
+   if(!l)
+       return;

    element_t *entry, *safe;
    list_for_each_entry_safe(entry, safe, l, list){
        free(entry->value);
        free(entry);
    }
        
    free(l);
}

參閱 free(3) 可知 "If ptr is NULL, no operation is performed.",意味著你不用撰寫形如 if (entry->value) { free(entry->value); } 的程式碼,直接呼叫 free 即可,不用額外的 if 敘述。

已修正

根據 你所不知道的 C 語言: linked list 和非連續記憶體 一文,可以得知,在給定的結構體中加入 sturct list_head 就可使用一致存取介面的鏈結串列。
正因如此,實作 q_free 時不只有 struct list_head,還有 struct element_t 的記憶體要被釋放。
在逐一存取並進行刪除操作時,需要注意鏈結的維護。
綜合上述兩點,要選用的 list.h API 應為 list_for_each_entry_safe

然而,在測試 malloc failure on new 時會發生 Segmentation Fault 的錯誤,
利用 make valgrind 測試得到以下資訊:

+++ TESTING trace trace-13-malloc:
# Test of malloc failure on new
==5507== Invalid read of size 8
==5507==    at 0x10F709: q_free (queue.c:28)
==5507==    by 0x10B1BA: q_quit (qtest.c:1103)
==5507==    by 0x10E3D4: do_quit (console.c:246)
==5507==    by 0x10EF65: finish_cmd (console.c:635)
==5507==    by 0x10D3CF: main (qtest.c:1261)
==5507==  Address 0x8 is not stack'd, malloc'd or (recently) free'd
==5507== 
Segmentation fault occurred.  You dereferenced a NULL or invalid pointer

原來出錯的原因並非 q_new 的實作有誤,而是 q_free 沒有做好 NULL pointer 的檢查

q_insert_head

初步想法如下

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;
    
    element_t *node_element = malloc(sizeof(element_t));
    if (!node_element)
        return false;
    
    node_element->value = malloc((strlen(s) + 1) * sizeof(char));
    if(!node_element->value)
        return false;
    // strcpy copies null character too
    strncpy(node_element->value, s, strlen(s) + 1);

    list_add(&node_element->list, head);

    return true;
}

根據 C99 [7.21.6.3] The strlen function:

The strlen function returns the number of characters that precede the terminating null character.

因此進行記憶體配置時,要在 strlen(s) 後面再加一以存放 null character
利用 list.h 中的 list_add ,可將新建立的 list_head 新增於串列的開頭

q_insert_tail

bool q_insert_tail(struct list_head *head, char *s)
{
    // same as "q_insert_head"
    if (!head)
        return false;
    
-    // ...
-    list_add_tail(&node_element->list, head);
    
-    return true;
+    return q_insert_head(head->prev, s);
}

q_insert_head 大體一致,只是自 list.h 使用的 API 函式換成了 list_add_tail
換個想法,可以直接利用 q_insert_head 完成這項功能,省去重複的程式碼

q_remove_head

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    // if queue is NULL or empty
    if (!head || list_empty(head))
        return NULL;

    element_t *head_element = list_first_entry(head, element_t, list);

    list_del_init(head->next);

    if (sp) {
-        strncpy(sp, head_element->value, bufsize - 1);
-        strcat(sp, "\0");
+        strncpy(sp, head_element->value, bufsize);
+        sp[bufsize - 1] = '\0';
    }

    return head_element;
}

作業說明(一) 一文中已闡述 remove 與 delete 的區別。q_remove_head 的目的是要斷開第一項節點的連結。
因此根據 list.h 的註解說明,使用 list_del 斷開連結。
注意 "\0" 並不是正確使用 null character的方式

q_remove_tail

element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    // if queue is NULL or empty
    if (!head || list_empty(head))
        return NULL;

-    element_t *tail_element = list_last_entry(head, element_t, list);
-    // same logic as "q_remove_head"
-    // ...
+    return q_remove_head(head->prev->prev, sp, bufsize);
}

q_remove_head 大體一致,只是要移除的元素改成以 list_last_entry 選取
這裡參考了 yanjiew1 的想法,利用 q_remove_head 達成節省 類似程式碼的效果。

不是「節省」,而是減少對相同流程的程式碼進行維護的需要,亦即改進程式碼的可重用程度。

q_size

int q_size(struct list_head *head)
{
    if (!head)
        return 0;

    int len = 0;
    struct list_head *node;

    list_for_each (node, head)
        len++;
    return len;
}

走訪串列,每經過一個節點就將 len 加一

q_delete_mid

原先的想法是建立 start, end 兩個指向 struct list_head 的指標。這兩個指標每次都往反方向走一步,相會的節點即是要移除的中間節點。這樣會需要走遍一整圈的串列的時間。

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    
    // two-pointer approach
    struct list_head *start = head, *end = head;

    do {
        end = end->prev;
        if(start == end)
            break;
        start = start->next;
    } while(start != end);

    // delete the mid node
    list_del(start);
    
    element_t *mid_element = list_entry(start, element_t, list);
    free(mid_element->value);
    free(mid_element);

    return true;
}

q_delete_dup

bool q_delete_dup(struct list_head *head)
{
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
    if (!head)
        return false;
    
    // for each list element
    struct list_head *cur = head->next, *del = NULL;
    bool remove_cur;

    while (cur != head) {
        // remove all node that has the same string as cur
        del = cur->next;
        remove_cur = false;
        
        while (del != head && 
            strcmp(list_entry(del, element_t, list)->value,
                   list_entry(cur, element_t, list)->value) == 0) {
            remove_cur = true;
            list_del_init(del);
            free(list_entry(del, element_t, list)->value);
            free(list_entry(del, element_t, list));  
            del = cur->next;
        }

        // assign next first so it can find next node before unlink
        cur = cur->next;
        if (remove_cur){
            del = cur->prev;
            list_del_init(del);
            free(list_entry(del, element_t, list)->value);
            free(list_entry(del, element_t, list));
        }

    }
    return true;
}

一般而言,如果要在一個串列移除重複元素,能想的到較有效率的作法,是利用雜湊表紀錄每個獨一的元素。不過要用 C 語言實作雜湊表的話,需要考量不少因素(如雜湊函數的使用,雜湊表的大小等)
因此先用最基本的方式實作,也就是針對每個元素,向後尋找擁有相同字串的元素。
由於題目假設串列已經過排序,因此若下一個元素與目前元素不同,便能說接下來的元素不會再與目前元素擁有相同字串。

q_reverse

原先想的較複雜,想要依序從 head->next 開始依序和最後一個節點交換連結。
參考 komark06 才發現我把問題複雜化了。原來只要把過程想成從第一個節點開始依序解除連結 (list_del),再重新加入到開頭就可以了 (list_add)。這是佇列 FIFO 的特性。

void q_reverse(struct list_head *head) 
{
    if (!head || list_empty(head))
        return;

    // traverse through list, put to head in order... 
    // so previously last element be the first
    struct list_head *node, *safe;
    list_for_each_safe (node, safe, head) {
        list_move(node, head);
    }

}

q_reverseK

這題較難以直觀的方式想出代碼。
於是參考 leetcode 的想法後,發覺這題可以以類似遞迴的概念處理
換句話說,可以把每個 K 元素的子串列從原來的串列分割出來後,利用前面已經實作好的 q_reverse 做反轉的動作,再把其拼接到原來串列的開頭。

一開始的版本乍看沒有問題,但卻有程式陷入無窮迴圈的問題
原因是反向處理好的 k 個元素的子串列,不斷被插入到原先串列的尾端,導致 end 指標在佇列元素多於2的情況下,永遠無法到達 head

改進的實作方式為,將反轉過的子串列,放在用來走訪串列的指標之前,才不會重複拜訪
同時,使用 list_entry_safe 可以讓原本的程式碼變得更整潔

void q_reverseK(struct list_head *head, int k)
{
    // https://leetcode.com/problems/reverse-nodes-in-k-group/
    if (!head || list_empty(head))
        return;

-    struct list_head *end = head->next;
+    int count = k-1;
+    struct list_head *node, *cut, *next_head = head;
    // dummy list head
    LIST_HEAD(new_head);

    // cut the remaining list out of the k-node list
-    while (end != head) {
-        for(int i=1; i<k && end != head; i++)
-            end = end->next;
-        if(end == head)
-            return;

-        end = end->next;
-        list_cut_position(&new_head, head, end->prev);
-        q_reverse(&new_head);
-        list_splice_tail_init(&new_head, head);
-    }
    
+    list_for_each_safe(node, cut, head) {
+        if (count--)
+            continue;
+
+        count = k-1;
+        list_cut_position(&new_head, next_head, node);
+        q_reverse(&new_head);
+        list_splice_init(&new_head, next_head);
+        next_head = cut->prev;
+    }

}

q_sort

原先的版本會出現 segmentation fault
需要注意 start end 之間的關係

void q_sort(struct list_head *head, bool descend) 
{
    // zero or one element, or NULL
    if (!head || head->next == head->prev)
        return;

    // find middle point (two-pointer)
    struct list_head *start = head, *end = head;
    do {
        start = start->next;
        end = end->prev;
-    } while(start != end && end->next != start);
+    } while(start != end && start->next != end);

    // parition (recursive)
    LIST_HEAD(new_head);
    list_cut_position(&new_head, head, start);
    q_sort(&new_head, descend);
    q_sort(head, descend);
    q_merge_two(head, &new_head, descend);
}

q_merge_two 為另外設立的功能,作用是融合兩個已排序的串列
原先參考 linux 核心風格list_sortmerge,以 indirect pointer 去實作
不過由於原先版本是針對非環狀鏈結串列,這讓改動增加許多困難,徒增許多成本,反而得不償失。
最後利用 list.h 內建功能,將這個操作實作於環狀鏈結串列

你如何確保目前的測試已涵蓋排序演算法最差的狀況?

q_descend

int q_descend(struct list_head *head)
{
    // https://leetcode.com/problems/remove-nodes-from-linked-list/

    if (!head || list_empty(head))
        return 0;

    element_t *current = list_last_entry(head, element_t, list);
    struct list_head *it = head->prev->prev;
    int count = 1;

    while(current->list.prev != head) {
        element_t *it_entry = list_entry(it, element_t, list);
        if (strcmp(current->value, it_entry->value) > 0) {
            // remove lesser value
            it = it->prev;
            list_del(it->next);
            q_release_element(it_entry);
        } else {
            // update current
            current = it_entry;
            count++;
            it = it->prev;
        }
    }

    return count;
}

我們原本的要求,是要移除那些在其右側存在數值大於本身數值的節點。
換句話說,從尾端向開頭走訪節點時,節點值呈現遞增的順序。
因此,我們只需要移除那些不符合從右側走訪而來遞增的節點即可。
若改成從開頭走訪到尾端,將會增加實作的難度。

改進你的漢語表達

至於 q_ascend ,目前的作法是照抄 q_descend 的程式碼
但在判斷條件把「大於」改成了「小於」

是否有可以重複利用程式碼的方案 (正如 q_insert_head q_insert_tail 那樣) ?

q_merge

輸入參數

看到這題給的輸入參數,我有些疑惑。
怎麼用僅僅一個 struct list_head* 表達數個串列的架構呢?

於是我參考 qtest.c 是如何呼叫 q_merge 的 (詳見 do_merge)。觀察到以下程式碼

// line 827 in qtest.c
if (q_size(&chain.head) > 1) {
    chain.size = 1;
    current = list_entry(chain.head.next, queue_contex_t, chain);
    current->size = len;

這段程式碼看起來是在把數個串列合併到 chain.head,也就是 q_merge 的第一個輸入參數

工程人員說話要精準明確,避免「看起來」一類的詞語。

再往下看,可以看到 list_entry(..., element_t, list) 的操作

for (struct list_head *cur_l = current->q->next;
     cur_l != current->q && --len; cur_l = cur_l->next) {
    /* Ensure each element in ascending order */
    element_t *item, *next_item;
    item = list_entry(cur_l, element_t, list);
    next_item = list_entry(cur_l->next, element_t, list);

current 是用來存取每個子串列的架構 (queue_contex_t ), current->q 指向每個子串列的開頭。

實作程式

目前的版本是直接將所有其他子串列以 q_merge_two 串接在第一條串列上
這是直觀但較沒效率的方法,應該參閱 教材 提出的方法重新改進

int q_merge(struct list_head *head, bool descend)
{
    // https://leetcode.com/problems/merge-k-sorted-lists/
    if (!head || list_empty(head))
        return 0;
    
    queue_contex_t *first = list_entry(head->next, queue_contex_t, chain);
    if (head->next == head->prev) 
        return first->size;

    queue_contex_t *second = list_entry(first->chain.next,
                                        queue_contex_t, chain);
    int size = 0;
    int n = q_size(head) - 1;

    while (n--) {
        size = q_merge_two(first->q, second->q, descend);
        list_move_tail(&second->chain, head);
        second = list_entry(first->chain.next, queue_contex_t, chain);
    }

    return size;
} 

實測出現以下問題

+++ TESTING trace trace-03-ops:
# Test of insert_head, insert_tail, remove_head, reverse and merge
ERROR: Attempted to free unallocated block.  Address = 0xdeadbeef
Segmentation fault occurred.  You dereferenced a NULL or invalid pointer

q_merge_two 中需要將第二個參數傳入的 list_head* 初始化,也就是把 list_splice 改成 list_splice_init
並且,原先版本將 second->q 設為 NULL 判斷算法是否完成,這樣會發生 Segmentation fault,需要更換中止條件 (例如使用 q_size,但顯然不是最佳解)


改善 dudect 實作

目前我所撰寫的 q_insert_head q_insert_tail 無法通過第 17 筆測試項目。

csotaku:lab0-c$ ./qtest -f traces/trace-17-complexity.cmd
# Test if time complexity of 
# q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant
ERROR: Probably not constant time or wrong implementation
ERROR: Probably not constant time or wrong implementation
Probably constant time
Probably constant time

考慮到目前以 make valgrind 測試沒有發現記憶體相關的問題,實作邏輯也符合常數時間的規範,在作業說明又提到目前 dudect 的實作存在致命缺陷。推斷是這個缺陷導致無法通過檢測。

研讀 dudect 論文

注意標點符號的運用,超連結該有其對應的說明

Dude, is my code constant time?〉提出一項檢測程式是否為常數複雜度的工具,其特點如下:

  • 檢測方法是基於針對執行時間的 leakage detection test
  • 利用統計方法而非靜態方法分析,使檢測結果不受特定硬體影響

接著,作者說明實作 leakage detection test 的流程

首先,對於兩種不同類型 (class) 的資料,測量密碼學函式的執行時間
其中一種測量方法稱為 "fix-vs-random" 其中一種類型的資料固定為一個常數,另一種則是隨機選取。固定資料的選取可用來觸發「特定」邊緣狀況 (corner-case)

得到測量結果後,在進行統計分析之前,作者談到可以應用的後處理 (post-processing)

  • Cropping : 計時分佈經常正偏向於更大的執行時間,這可能是受到測量工具,或是其他行程的影響。可以將裁切過大的數值,或是制定一個獨立於資料類別並且固定的閾值,並移除大於這個閾值的測量資料
  • Higher-Order preprocessing : 例如 centered product 可以模仿高級別的 DPA attack (這段並不是很明白)

不懂就說不懂,不要假裝自己懂一點。誠實面對自己。

最後,進行統計分析 嘗試駁斥虛無假說 (null hypothesis) 「兩個資料的執行時間分佈是相等的」。作者提出以下幾種可行的檢驗方式:

  • t-test (Welch's t-test): 這項測試檢驗兩者分佈的平均數是否相同。若檢驗失敗表示存在 "first-order timing information leakage"。若是在分析之前已經進行過裁切處理,那麼就不只是針對平均數,而是也包含更高級別的 statistical moment (這邊並不是很理解)

Welch's t-test 是 Student's t-test 的改良版,定義詳見維基百科:
定義

t=ΔX¯sX¯,
Xi¯
: 第 i 筆資料的平均數;
sXi¯
: 標準差,
Ni
是樣本數量

對於不相等的變異數而言,Welch's t-test 比起 Student's t-test 給出更相近於模擬值的 p-value

  • Non-parametric test

實作探討

首先來探討 oreparaz/dudect 中的 percentile 是怎麼運作的,並且如何運用在這次作業上

在 lab0-c 的 deduct 目錄,有三個主要的 C 檔案,

  • constant.c : "measure"函式中,呼叫測試函式後,以 cpucycle 測量執行時間
  • ttest.c : 定義 Welch's t-test 相關數值
  • fixture.c : 執行並統整數據,給出最終判斷

在利用了 Welch t-test 所給出的定義算出最大的 t value 後,將其與事先定義的閾值比較。
在測試足夠多筆資料的情況下:

  • t>500
    表示肯定不是常數時間
  • t>10
    表示可能不是常數時間
  • 其餘狀況,表示可能是常數時間

觀察原作者 github 中的 percentile

static int64_t percentile(int64_t *a_sorted, double which, size_t size) {
  size_t array_position = (size_t)((double)size * (double)which);
  assert(array_position < size);
  return a_sorted[array_position];
}

static void prepare_percentiles(dudect_ctx_t *ctx) {
  qsort(ctx->exec_times, ctx->config->number_measurements, sizeof(int64_t), (int (*)(const void *, const void *))cmp);
  for (size_t i = 0; i < DUDECT_NUMBER_PERCENTILES; i++) {
    ctx->percentiles[i] = percentile(
        ctx->exec_times, 1 - (pow(0.5, 10 * (double)(i + 1) / DUDECT_NUMBER_PERCENTILES)),
        ctx->config->number_measurements);
  }
}

以及相關的一段註解

set different thresholds for cropping measurements.
the exponential tendency is meant to approximately match
the measurements distribution, but there's not more science
than that.

可以推敲出,這段程式碼的主要功能是做 Cropping 後處理

參考以上程式碼改動,並做出對應調整後,這次再一次進行 make test 測試
可以發現由於上述程式將極端值去除,使得常數測試很快就通過了

確認程式碼和論文描述是否存在出入?


探討 lib/lib_sort.c

在 merge sort 中,比較次數以以下式子表示

n×log2nK×n+O(1)

n×log2n 已經是目前理論的極限值,所以優化的方向會注重於改進 linear term (
K×n
)

研讀論文 (commit b5c56e)

需要研讀論文並指出與核心程式碼的異同

為什麼會說 bottom-up 比起 top-down merge sort 對 cahce 更加友善呢?

簡單而言,bottom-up 的過程就是一直合併,cache 元素參照的機會更大
top-down 涉及 parition,這麼做會導致元素不斷從 cache 進出,而導致 cache thrashing 的問題

所謂 cache thrashing 指的是,由於 cache 的容量不足應付存取需求,導致接下來一直出現 cache miss 的狀況
案例
考慮一個 LRU cache,假設可存放四個元素,
接著考慮以下存取 pattern : A,B,C,D,E,A,B,C,D,E
初始配置: A,B,C,D
要求 A: A,B,C,D
要求 B: B,A,C,D
要求 C: C,B,A,D
要求 D: D,C,B,A
要求 E: E,D,C,B (miss)
要求 A: A,E,D,C (miss)

接下來每個元素的存取都會導致 cache miss

  1. top-down merge sort
  • 我實作的版本 (q_merge)
  • 以遞迴將整個陣列拆成一個個元素 (稱為 run) 後,重新將這些 run 合併。
  • 對於比較次數,在最差以及平均狀況下有最佳的 K
  • 需要先知道輸入大小,但這麼做會導致 cache blocking (要將整個陣列讀入)
Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

節錄自 M01: lab0

  1. bottom-up
  • 將整個陣列直接視為具有 n 個 run 的陣列,每次的 run 長度由 1 開始融合,依序乘以二,直到涵蓋至整個陣列為止
  • 花費的平均比較次數最多

接著看到 commit 比較的另一種變形

  1. queue-merge sort
  • breadth-first multi-pass structure
  • 將每個元素視為佇列,利用 linear merge 將前兩個佇列合併後,放到串列尾端,直到串列只剩下一個元素
  • 可以避免 commit 提及的 worst case 合併的子陣列彼此長度差距太大

queue-mergesort

節錄自 The Cost Distribution of Queue-Mergesort, Optimal Mergesorts, and Power-of-2 Rules

Linux 核心採取的策略有所不同:

  • 以 depth-first order 代替 bottom-up: 當兩個一樣大小的子陣列出現時,立刻合併,盡可能 fit L1 cache (除了最後的合併)
    • 如此做的成本將會反饋到最終的合併。commit 中也提到當陣列數量略為大於 2 的冪時 (e.g. 1028,使得在合併時需要與
      45
      個陣列元素比較),將會增加成本
  • 為了規避上述最壞情況,在合併時最不平衡的狀況下,所有 merge pass 至多來到 1:2 的情況
    • 具體而言,程式會就上述合併策略進行,直到確定剩下
      2n
      的元素尚未合併

引入至 lab0-c

比較自己和 linux 核心實作 merge sort 效能落差

程式碼解讀

linux 核心的 list_sort 與 lab0-c q_sort 不同,沒有提供 descend 的參數輸入; 相對的,允許指定 list_cmp_func_t 函式指標,於是呼叫端可指定比較用的函式,較 q_sort 實作靈活

「融合」是否適合在資料排序的場景?注意用語。

另外,由於 list_sort 在排序前會將循環串列拆成單向串列,這麼一來,就不能使用 list.h 的內建函式

  • 當剩下
    2n
    的元素就不合併 (merge) ,以達成最不平衡也有 2:1 的 merge pass

list_sort 使用 pending 儲存已排序的子陣列 (大小為 2 的冪),彼此使用原先陣列 (list 參數) 的 prev 指標聯繫。這是很精妙的設計,這麼一來就不需要再額外宣告額外指標儲存

每回合中,一旦 pending 中所剩餘的元素數量等同於這對子陣列的元素數量,每對位於 pending 的子陣列會進行合併

換句話說,count + 1 若為

2k 則不合併,反之亦然
e.g.
count = 0, count + 1 =
20
> no merge
count = 1, count + 1 =
21
> no merge
count = 2, count + 1 =
21+1
> merge!

當 count + 1 是 2 的冪時,count 的二進位應當是由連續的 1 構成的
一個精妙判斷的方式是尋找 least significant bit

/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
    tail = &(*tail)->prev;

如果剩下的 bits 等於 0 就表示 count + 1 是二的冪

do {
    size_t bits;
    struct list_head **tail = &pending;

    /* Find the least-significant clear bit in count */
    for (bits = count; bits & 1; bits >>= 1)
        tail = &(*tail)->prev;
    /* Do the indicated merge */
    if (likely(bits)) {
        struct list_head *a = *tail, *b = a->prev;

        a = merge(priv, cmp, b, a);
        /* Install the merged result in place of the inputs */
        a->prev = b->prev;
        *tail = a;
    }

    /* Move one element from input list to pending */
    list->prev = pending;
    pending = list;
    list = list->next;
    pending->next = NULL;
    count++;
} while (list);

(這裡有個細節,合併時要注意參數順序 a = merge(priv, cmp, b, a); 否則會造成不穩定排序的狀況,原版程式的變數取名為 a, b 實在是讓人困惑)

首先 pending 指向 NULL (沒有元素),在 while 迴圈中 tail 先移動至待合併的節點。
當 if 判定要合併時,選定倒數兩個子串列進行合併 (合併結果儲存於 a )
而後,a->prev = b->prev 的用意是 b 之前的子串列,串接到已排序好的 a 之前
不妨這樣想:

![graphviz](https://hackmd.io/_uploads/BJhDhX10a.svg)






G



tail

tail



node6

3



tail->node6





node3

4



node4

2



node3->node4


prev



node7

2  4



node3->node7





node5

5



node4->node5


prev



node4->node7





node5->node6


prev



node8

3  5



node5->node8





node6->node8





使用 Graphviz 製圖,並嵌入到 HackMD 筆記中。

假設 a 指向 3 這個節點,b 指向 5,當 a b 合併時,就視為一個新的子串列,所以 a->prev 也要指向 b->prev,才能指向下個尚未合併的子串列
注意合併完後,也需要更新 tail 到合併完的子串列

最後,將 list 指向的節點「移動」至 pending

這裡出現一個陌生的巨集: likely
於 linux kernel 2.6 後被定義在 include/linux/compiler.h

#define likely(x)	__builtin_expect(!!(x), 1)            
#define unlikely(x)	__builtin_expect(!!(x), 0)

避免濫用「通過」,否則你如何區分 pass 的翻譯呢?

通過 藉由這兩個巨集,可以讓編譯器編譯組合語言時做最佳化 (likely 表示較可能發生,例如 cache spatial locality 的考量),看來要學 Linux 核心也應當學好編譯器理論

  1. 避免過多的中英文混用,macro 的翻譯「巨集」在台灣的出版刊物存在數十年。
  2. 翻閱 GCC 手冊和計算機組織/結構的教科書,理解 branch prediction 的議題,如此才能解釋上方的 GCC extension
  • TODO: Profile-guided optimization 檢閱
  • gcc 內建函式:

最後用了看似有些複雜的手段,進行最後合併
原因就是要在最壞情況下保持 2:1 的大小比例

list = pending;
pending = pending->prev;
for (;;) {
    struct list_head *next = pending->prev;

    if (!next)
        break;
    list = merge(priv, cmp, pending, list);
    pending = next;
}
/* The final merge, rebuilding prev links */
merge_final(priv, cmp, head, pending, list);

merge_final 如果只是合併加恢復 prev 連結,為什麼不直接用 merge 加上恢復連結功能的程式就好?

  • Depth-first order

那作者宣稱的 depth-first order 又和上述提及的 mergesort 變形有何異同?為何說是 cache friendly?

  • traverse (動詞) 和 traversal (名詞)

根據 Dictionary.com解釋: (作為及物動詞和不及物動詞都有類似的意思,以下列出作為及物動詞的寓意)

  • to pass or move over, along, or through.
  • to go to and fro over or along.

其實這意思很好懂,就像我們「走過」/「穿越」校園一般,於是 traverse a linked list 就會是「(用某種手段) 存取多個鏈結串列的節點」,但這裡卻沒有必要「所有」的範圍:英語的 "move over/through" 用於某個區域時,根本沒有這樣的隱含意義。如果將 traverse 翻譯為「遍歷」,就會導致「超譯」,也就是跳脫「直譯」和「意譯」。

當我們回頭看 "traverse" 所在的技術描述內容,例如 "traverse every node",若翻譯為「遍歷每個節點」,那麼既然都「遍」(意即「全面」、「到處」),又何來「每個」節點呢?於是,合理的翻譯應改為「逐一走訪每個節點」 —— 差異在哪?在 "traverse every node" 的應用場景中,可能是我們嘗試在鏈結串列尋找特定的節點內含的資料,一旦找到就停止,或者我們要偵測給定的鏈結串列是否包含環狀 (circular) 結構 ,並沒有真的要「遍」(到處/全面)「歷」(意即「經過」) 每個節點。在我們的用語中,要區分「意圖」(intention) 和「實際作用」(reaction),濫用「遍歷」會使得語意不清,從而難以推測英語原文的訴求。

還有個更重要的原因是,「遍歷」這詞已在理工領域存在,且廣泛使用,即「遍歷理論」(Ergodic theory),後者是研究具有不變測度 (invariant measure) 的動力系統及其相關問題的一個數學分支。 遍歷理論研究遍歷變換,由試圖證明統計物理中的遍歷假設 (Ergodic hypothesis) 演進而來。

在統計學中,若單一個物理系統在不同時間內重複相同的實驗 (如丟擲一枚硬幣),其取樣數據所得的統計結果 (如硬幣出現正面的機率) 和極多個完全相同的物理系統的系集 (如丟極多個相同的硬幣) 在同時作相同的實驗取樣數據的統計結果假設為相同時,此種假設即稱為「遍歷性假設」或「遍歷假設」。基於這個假設,對時間平均的統計方式及結果,便可由對系集的平均及統計方式得到。在一般物理系統中,尤其在統計力學範圖中,均採用此遍歷性假設為基本的假設。在流體力學中對亂流的實驗分析,亦是採用此假設。

遍歷 (Ergodic) 源於以下二個希臘詞:

  • ergon (對應英語的 work)
  • odos (對應英語的 path 或 way)

最初這是由奧地利物理學家波茲曼 (Ludwig Boltzmann) 於統計力學領域 (statistical mechanics) 提出的概念,其一廣為人知的結果是:在經過長時間後,時間平均將會趨近空間平均。此事實在動力系統扮演極重要的角色,在隨機分析領域亦然。

因此,若貿然將 traverse 翻譯為「遍歷」,一來語意不清,二來增加和理工多項領域專業人員的溝通成本,實在得不償失。

資訊科技詞彙翻譯

經過上方的推導可以明白,在遍歷 逐一走訪每個節點的過程中,會不斷將節點加入 pending,直到達成一定條件才會開始合併 (第一個在 do while 迴圈合併的條件是當 count = 2)
這時 pending 已經有了三個節點 (想像成走到一定的「深度」) 才開始合併,而不是像 bottom-up 那樣,每 K 個節點就進行合併

於 qtest.c 加入命令

以 ADD_COMMAND 加入實作的 list_sort

ADD_COMMAND(listsort, "Sort queue with Linux kernel-style merge sort", "");

藉由priv 參數,回傳排序過程中的比較次數

  • 註: 若要還原檔案至前一個 commit 的版本,可輸入以下命令:
  • git checkout <commit-hash> -- /file/to/restore

嘗試改進針對鏈結串列排序實作的效能

詳參: 專題

注意到 merge_final 有段註解:

If the merge is highly unbalanced (e.g. the input is already sorted), this loop may run many iterations.

而專題提及的 Timsort,善於處理已經排序過的狀況,並以插入排序處理小串列

list_sort 原先將一個個節點放入 pending 串列,根據數量決定是否合併;
在 Timsort 中,則不斷尋找 run 並合併
這裡的 run 就是已經排序好 (或是反向排序好) 的子串列

原作中用到一個專門的結構體儲存 find_run 的回傳值:

struct pair {
	struct list_head *head;
	struct list_head *next;
};

其中 head 紀錄這個 run 的開頭,next 紀錄下個 run 的起始點,
雖說只為一個函式宣告專用的結構體有些不泛用,但這也可以讓主程式較為方便的逐一拜訪各個 run

一個我想到有別於專題的 Timsort 改進是 minrun 的實現
因為當 minrun 略大於 2 的冪時,效率會特別低,因此 Timsort 作者提出一種選取 minrun 的簡潔方式

static size_t find_minrun (struct list_head *head) 
{
    size_t len = list_length(head);

    // find first 6 bit & add up remain bits
    size_t minrun = 0;

    for(;len >> 6; len >>=1)
        minrun += (len & 1);
    
    minrun += len;
    
    return minrun;
}

若當前 run 數量不超過 minrun 時,以二元插入法將節點插入已經排序好的 run 中
( 改編自 Timsort )

static struct pair find_run(void *priv,
                            struct list_head *list,
                            list_cmp_func_t cmp, 
                            size_t minrun)
{    
        // ...
        /* binary insert if len < minrun */
        while (DO_MINRUN && next && len < minrun) {
            size_t idx = binary_insert(priv, head, next, cmp, len);
            struct list_head *inserted = next;
            next = inserted->next;

            if(!idx) {
                inserted->next = head;
                head = inserted;
            } else {
                struct list_head *itr = head;
                while(idx-- > 1)
                    itr = itr->next;
                inserted->next = itr->next;
                itr->next = inserted;
            }
            len++;
        }
    // ...
}

初步以隨機資料 1000 筆測試,發現相對原版的 Timsort,加入 minrun 機制確實使比較次數下降約 700 次

不過,加入插入的動作會不會影響穩定度呢?

效能分析實驗

應對不同的資料分佈進行實驗,具體參考 listsort 說明文件

測試資料

分析效能之前,先要準備足以涵蓋多種情境的輸入資料。

參閱 listsort 說明文件,作者提出幾種資料分佈

代號 說明
*sort 隨機資料
\sort 降序資料
/sort 升序資料
3sort 升序,隨機挑選其中三組交換
+sort 升序,放置十個隨機元素於尾端
%sort 升序,隨機以隨機數值替換 1% 的元素
~sort 包含許多重複數值的元素
=sort 一樣數值的元素
!sort 最糟狀況

參閱 sortperf.py,可得知設計細節,例如「最糟狀況」是針對使用 meidan-of-three 的快速排序設計

lg(x) 為以 2 為底的對數函數,則
lg(n!)
是資訊理論中 comparison-based 排序法比較數量的極限值 (n 是測試資料的數量)

TODO: 設計專門的測試程式,支援上述資料分佈
原先希望整合於 qtest ,但考慮 qtest 是用於測試各項佇列操作,改為撰寫額外測試檔案

排序程式用的節點數值以整數為主,但 lab0-c 節點的數值是字串,這裡考慮固定長度的字串,但構成的字元是隨機 ASCII code

評估效能

測量程式

這部份,我將藉由評估效能的指標, 比較原先的 top-down, Linux-kernel list sort,專題提及的 Timsort, 以及 minrun Timsort ,針對不同資料分佈之效能差異

  • 執行時間
  • 比較次數

參閱 共筆,了解論文測量 K 的機制

  • cache miss ratio

實作 shuffle 命令

項目說明

實務分析

Fisher-Yates shuffle 為一種將有限序列洗牌的迭代算法

這裡我遇到了一個 bug ,以下的交換動作會導致錯誤
其中 node, safe 是 list_for_each_safe 的迭代器,swap_node 是被選中要交換的節點

list_move(node, swap_node);
list_move(swap_node, safe);    

事實上,由於交換節點是隨機指定,因此有可能遇到兩個交換節點是同一個的狀況
這時候進行交換就會導致錯誤

此外,需要考慮 b->prev == a 特殊狀況
要好好思考list_movehead 參數,確保正確交換

最後的 swap() 定義如下

完整程式碼

static inline void swap(struct list_head *a, struct list_head *b)
{
    if (a == b)
        return;

    //  b --> a
    struct list_head *a_prev = a->prev, *b_prev = b->prev;

    if (b != a_prev)
        list_move(b, a_prev);
    if (a != b_prev)
        list_move(a, b_prev);
}

實作完成後,以 ADD_COMMAND 巨集加入 qtest.c

ADD_COMMAND(shuffle, "Shuffle the queue using Fisher-Yates Shuffle", "");

統計分析

測試程式

Python 提供 subprocess.run 可以以指定的參數執行 qtest 執行檔,並且可以透過 CompletedProcess.stdout 取得執行結果
這麼一來,可以很方便的進行後續的統計分析

我將含有四個元素的串列洗牌 1,000,000 次,自由度為

4!1=23
查表 得知在顯著水準 (significance level) 定義為 0.05 的狀況下,若要不拒絕虛無假說,對應的卡方分佈最大值為 35.172
以下為統計結果

Test time: 1000000
Expectation: 41666
Chi-Squared Sum: 20.82378918062689
Probabily uniform

這裡有個疑惑點,通常使用 rand 時應該要加入 srand 函式確保每次的隨機分佈都不一樣
然而當我在洗牌函式加入 srand(time(NULL)) 後,整體分佈呈現不平均的狀況 (或者說,卡方分佈值遠大於預期最大值)

Test time: 1000000
Expectation: 41666
Chi-Squared Sum: 1061730.697691163
Not Uniform

整合網頁伺服器

項目說明

favicon.ico 處理

以瀏覽器寄送要求,例如 http://127.0.0.1:9999/new,除了預期的 new 命令,還會額外要求 favicon.ico
一種方式為加入額外的 html link tag,如:

<link rel="shortcut icon" href="#">

這麼一來就不能用瀏覽器,而要自己撰寫程式寄送要求了

以 select 處理多個 FD

select, poll

根據 linenoise Github 的說明,linenoise 是擁有處理命令列 (類似 readline) 功能的套件,包括 tab-complete, line hints 等

而根據項目說明,一旦網頁伺服器啟動,由於 linenoise 是以 read 等待使用者輸入。當 read 阻塞時無法接收來自網頁命令

實際前往 console.c,看到當網頁伺服器啟動時,linenoise 功能被關閉了

web_fd = web_open(port);
if (web_fd > 0) {
    printf("listen on port %d, fd is %d\n", port, web_fd);
    use_linenoise = false;

若 linenoise 正確啟動,許多命令集的功能 (e.g. tab complete) 可以正常使用
在以上的狀況,當輸入 web 命令後,可以觀察到這些功能都無法使用了
若是故意將 use_linenoise = false; 去除,便會發現由於 read 阻塞了輸入流,導致客戶端寄送的命令無法被接收

select 以 fd_set 定義檔案描述子 文件描述符 集合,我們的目標是利用其同時處理來自 stdin 以及 socket 的輸入

第一步是要找到對應的描述符,line_edit() 中只有 stdin 的描述符,還需要引入 socket 的 listenfd

web.c 中的 web_open 會回傳 listenfd,而 console.c 中呼叫 web_open 的函式是 do_web,listenfd 被儲存於全域變數 web_fd

觀察 console.c 的 cmd_select ,其實已經對於這些描述符做 select

再來,則是以 FD_SET 設定檔案描述子 表示符 ???,以 cmd_select 為例:

FD_ZERO(readfds);
FD_SET(infd, readfds);
// ...
FD_SET(web_fd, readfds);

可以看到設定了來自 STDIN 以及 socket 的讀取表示符

參考 var-x 共筆,得知 linenoise 啟動的機制是以下程式碼:

char *cmdline = linenoise(prompt);

TODO:
cmd_select 中原先 readline() 可以很順暢的接收兩端輸入
將其替換為上述程式碼
則會發現 linenoise 套件可以間隔使用,也就是說 STDIN 和 socket 輸入會輪流讀取,待輪到 STDIN 的時候才能恢復 linenoise (仍然有阻塞的問題)
是否有辦法解決阻塞的問題,又能使用 linenoise?

雖然提示指出可以用 select 系統呼叫處理,但考慮到 man page 點出:

select() can monitor only file descriptors numbers that
are less than FD_SETSIZE (1024)—an unreasonably low limit for
many modern applications—and this limitation will not change.
All modern applications should instead use poll(2) or epoll(7),
which do not suffer this limitation.

因此未來考慮使用功能相似,但能容納更多文件描述符poll 改善

file descriptor 是「檔案描述子」,而非「文件」(document)

整合 tic-tac-toe

項目 專案說明

project 的翻譯是「專案」,而非「項目」(item)

整合標頭檔

list.h 找尋對應的 hlist 相關函式,包括初始化,新增刪除節點,搜尋等
注意到標頭檔中對於 hlist_unhashed 函式,還多了另一項 "lockless" 版本的實作:

static inline int hlist_unhashed_lockless(const struct hlist_node *h)
{
	return !READ_ONCE(h->pprev);
}

原始標頭檔出現許多 const qualifier 應用,根據 C99 規格書 (6.7.3 Type Qualifier) 的描述:

If an attempt is made to modify an object defined with a const-qualified type through use
of an lvalue with non-const-qualified type, the behavior is undefined.

hlist_unhashed 函式:

static inline int hlist_unhashed(const struct hlist_node *h)
{
	return !h->pprev;
}

這時改變 h 指向的 struct hlist_node 是合法的,但不能藉由 *h 本身修改數值,會造成 Undefined behavior

新增 ttt 命令

專案提供了 negamax, MCTS 等算法。先嘗試將 negamax 引入 lab0-c

首先看到有點複雜的 #define 巨集

#include "game.h"
#ifdef USE_RL
#include "agents/reinforcement_learning.h"
#elif defined(USE_MCTS)
#include "agents/mcts.h"
#else
#include "agents/negamax.h"
#endif

"USE_RL", "USE_MCTS" 是什麼?我並未在 ttt 目錄標頭檔找到它們的定義,程式怎麼知道我現在要用哪種算法,並依此決定要 include 哪些標頭檔?

隨後我於 Makefile 觀察到以下命令:

RL_CFLAGS := $(CFLAGS) -D USE_RL
MCTS_CFLAGS := $(CFLAGS) -D USE_MCTS

翻閱 gcc 手冊 找到 -D flag 對應說明 (3.13 Options Controlling the Preprocessor) :

-D name
Predefine name as a macro, with definition 1.

原來透過以上 makefile 指令,就能提前定義巨集並設為 1

我的作法是,將原先的 ttt 目錄整個複製到 lab0-c 目錄後,保留需要的檔案:

  • agents/mcts.[ch]: 提供 MCTS 算法需要的函式
    • MCTS 是一種啟發式搜尋 (heuristic search),藉由增長搜尋樹找到最佳動作
    • 利用 UCB formula 處理探索利用困境 (exploration/exploitation problem)
    • 經歷四個階段尋找當前最好的動作
  • agents/negamax, agents/reinforecment_learning: 提供另外兩種算法需要的函式

並根據以上改動,調整 Makefile

  • 強化學習 (reinforcement learning)

我對於強化學習算法的過程很有興趣,因此紀錄算法原理
wiki

RL 是一種不同於典型機器學習的算法,不用標籤,而是透過獎勵促使 agent 於互動環境學習

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

RL 使用馬可夫決策 (Markov decision process) 作為決策模型,以下為其元素:

  • 環境 agent 互動的地方,以及狀態 (state) 集合
    S
  • 獎賞: 環境中的回饋
  • agent 動作的集合
    A
    (如: 前進方向)
  • Pa(s,s)=P(St+1=s|St=s,At=a)
    : probability of transition from state
    s
    to state
    s
    under action
    a
  • Ra(s,s)
    : 經由行動 a 完成狀態 s 到 s' 轉移後獲得獎賞

顯然,強化學習的目標是最大化獎賞,但如何得知模型參數呢?

在強化學習中,會假設代理 (agent) 可以直接觀察環境狀態 (若真,此問題稱為具有 full observability)

定點數實作

說明

原先的 ttt 專案中,只要是計算分數的部份都會用到浮點數。在 Linux 核心中,使用浮點數不僅造成執行時間增加,還需要對 FPU 暫存器進行額外處理

  • 定義

什麼是定點數 (Fixed-point arithmetic)? 和浮點數有何不同? 為什麼比較不會造成負擔?
簡單而言,定點數用以表示整數以及小數時用的 bit 數量是固定的,不論數值大小
而浮點數儲存的 bit 是用以表示數值 (significand or mantissa) 以及指數 (exponent)
所以,定點數本質其實與整數相同,只是一部分的 bit 用來表示小數,也就不需要額外的資源 (FPU) 支援了

考慮一個 8-bit word

010101102 ,以十進位整數而言是 86 ,但考慮小數點安置在第四到第五位之間,就變成
0101.0110
,也就是 5.375

在定點數中,為了明確表示各有幾個 bit 表示整數與小數,定義 Q-format
假設用 m 個 bit 表示整數,n 個 bit 表示小數,則我們說這個數的 Q-format 是 Qm.n 或 Qn (只紀錄小數)

  • 實作

完整程式碼

參照 Linux 核心的浮點數運算,我以 unsigned long 代表定點數,並且以 LOAD_INT, LOAD_FRAC (取小數前兩位) 進行整數小數的擷取
運用 typedef 可以更清楚表達定點數的型態

#define FSHIFT 4  /* nr of bits of precision */
#define FIXED_1 (1 << FSHIFT) /* 1.0 as fixed point */

#define LOAD_INT(x) ((x) >> FSHIFT)
/* keep 2 digits of fractions */
#define LOAD_FRAC(x) LOAD_INT(((x) & (FIXED_1 - 1)) * 100) 

typedef long fixed_point;

為了引入定點數,我們需要對以下浮點數運算進行替換:

  • 四則運算
  • log
  • sqrt

commit fc4a2c6

加減乘法十分易懂,但除法稍微複雜些
由於定點數是整數型態,若除數與除數直接相除,小數位會被除去
因此我引進 scale_factor ,讓被除數先左移固定位數後再相除,這樣才能保留部份小數

fixed_point_t div (fixed_point_t a, fixed_point_t b)
{
    fixed_point_t result = (a << SCALE_FACTOR) / b;
    result = LOAD_FIXED(result);
    result >>= SCALE_FACTOR;
    return result;
}

至於 log, sqrt,要如何轉換呢?我的初步想法是大一微積分學到的泰勒展開
由於

f(x)=x 的泰勒展開在 x = 0 時不存在,考慮
f(x)=1+x
的泰勒展開
T(x)

T(x)=k=0f(k)(a)k!(xa)k

但是算式本身有些複雜,或許存在更快的算法,如 var-x 共筆 提到的逼近法那樣
此逼近法特點在於不像牛頓逼近法,不需要用到除法,只做加法、位元運算,因此較有效率

fixed_point_t sqrt_f(fixed_point_t num)
{
    fixed_point_t res = 0L;
    /* i is intialized as the index of first bit 1 in num */
    /* t starts from num, divided by two each time */
    for(int i = 31 - __builtin_clz(num | 1) ; i >= 0; i--) {
        fixed_point_t t = 1UL << i;
        if (multi_f(res + t, res + t) <= num )
            res += t;
    }
    return res;
}

至於 log ,雖然也可以使用泰勒展開,但一個比較簡潔的方式是使用二分對數法

loge(x)=log2(x)log2(e)
分母的部份是常數,log2 也能用簡單的邏輯計算
實作參考論文 A Fast Binary Logarithm Algorithm (C. S. Turner)

  • TODO: 以 Python 或 gnuplot 計換 fixed-point 函式與一般浮點數之誤差

首先需要替換的函式為 uct_score,其中 UCB formula 的部份涉及 log, sqrt 這種本身是浮點數的操作

wini+Clg(t)ni
其中
wi
為選擇這個節點後,經過第 i 個動作後獲勝的次數,
ni
是經過第 i 個動作後模擬的次數,C 是探索參數 (設為
2
),
t
是模擬的總數量

完整的修改程式碼可參 commit

commit 2ed890d

電腦 v.s. 電腦

引入新選項

commit a0d07ab

加入選項 "ai_vs_ai" ,達成電腦與電腦間對決

引入 coroutine

項目說明

做這個部份時有些疑惑,原先以 while loop 輪流啟動兩個 AI 對手進行對奕是比較直覺的方式,引入 coroutine 這種多工機制改寫,好處在哪裡呢?

  • 定義

什麼是 coroutine?根據 wikipedia 的定義

Coroutines are computer program components that allow execution to be suspended and resumed, generalizing subroutines for cooperative multitasking.
Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.

簡略而言,就是執行階段可以被暫停,隨後再回復到先前執行階段的電腦程式

想像僅有單一 CPU 的電腦,需要執行多項任務。於是便有先保存當前任務的 context ,切換到另一項任務的機制

而 coroutine 又可分為 stackful,stackless 兩種模式 ( 這裡的 stack 應是指函式,subroutine 的 call stack,讓當前函式知道呼叫者是哪個函式 )。stackful 需要配置額外空間儲存堆疊,但允許程式於任何位置暫停 coroutine,範例 中提到 Lua, Wren 兩種語言作為案例 ; 而 Javascript 的 async/await 則是經典的 stackless coroutine 案例

Coroutine 與 Thread 十分相近,不同的點是 coroutine 允許任務按照一種可變化的順序執行,換句話說 coroutine 提供 concurrency,但不具備平行化的特性

  • 實作

參考: coro

1. setjmp/longjmp

說明文件以 setjmp, longjmp 達成協同式多工

setjmp 為儲存進入點資訊於 jmp_buf 架構體,longjmp 則會暫停當下執行階段,並跳到儲存的進入點,範例可參考 Stackoverflow。在linux 核心實務中,可作為 context switch,也就是任務交換的手段

2. context switch

commit

針對需要排程的任務,定義 task 架構:

struct task {
    jmp_buf env;
    struct list_head list;
    int n; /* how many time yield control */
    int i; /* iterator */
    char task_name[16];
};

struct arg {
    int n;
    int i;
    char *task_name;
};

其中 task->env 用以存放 setjmp 的資訊,task->list 用來表示每個任務與 tasklist 的連結。tasklist 是個用來存放即將執行之任務的雙向佇列。
n, i 是執行任務用到的參數,因任務性質不同

接著透過以下兩個函式進行任務佇列的管理:

/* FIFO order */
static void task_add(struct task *task)
{
    list_add_tail(&task->list, &tasklist);
}

/* switch to the first task */
static void task_switch()
{
    if (!list_empty(&tasklist)) {
        struct task *t = list_first_entry(&tasklist, struct task, list);
        list_del(&t->list);
        curr_task = t;
        longjmp(t->env, 1);
    }
}

任務選取採取 FIFO ,也就是新增任務 (task_add) 會放到佇列尾端,存取任務從開頭開始
task_switch 函式中,會從 tasklist 當中選取下個任務
這裡使用全域變數 curr_task 標示當前任務指標,並透過 longjmp 切換到對應的進入點 。這裡要再參考每個任務的呼叫函式:

void task0(void *arg)
{
    /* initialize task with arg */
    // ...

    if (setjmp(task->env) == 0) {
        task_add(task);
        longjmp(sched, 1);
    }

    task = curr_task;

    for (; task->i < task->n; task->i++) {
        if (setjmp(task->env) == 0) {
            // task
            // ...
            task_add(task);
            task_switch();
        }
        task = curr_task;
        printf("%s: resume\n", task->task_name);
    }

    printf("%s: complete\n", task->task_name);
    longjmp(sched, 1);
}

但每個任務函式會呼叫兩次 setjmp: 第一次是在初始化完需要參數之後,在新的任務加入排程後,呼叫 longjmp(sched, 1) 讓新任務可以加入排程

void schedule(void)
{
    static int i = 0; // what for ?

    setjmp(sched);

    while (ntasks-- > 0) {
        struct arg arg_i = args[i];
        tasks[i++](&arg_i);
        printf("Never reached\n"); // ?
    }

    task_switch();
}
  • 註: schedule() 在 ntask = 0 時,需呼叫一次 task_switch(),確保當前任務執行完畢後,可以繼續執行尚未執行完的任務

第一次呼叫 setjmp 的狀況應該會發生在 schedule 的迴圈第一次迭代中,而第二次則會發生在 for 迴圈,也就是開始執行任務時 (以此例而言,費氏數列),假設有兩個 task (i, i+1),所以流程是 (由上而下) :

schedule()                    tasks[i]                    task_switch()
------------------------------------------------------------------------
setjmp(sched)    
tasks[i](args[i])       -->   
                              setjmp(tasks[i]->env)
                        <--   longjmp(sched, 1)
tasks[i+1](args[i+1])   -->   
                              setjmp(tasks[i+1]->env)
                        <--   longjmp(sched, 1)
tasks[i](args[i])       -->   
                              task_switch()           -->
                                                          longjmp(t->env, 1)

確實是遇到類似 goto 帶來的困擾,那就是執行流程變得很亂
觀察執行結果,也能驗證我的流程猜想:

Task 0: n = 70
Task 1: n = 70
Task 2: n = 70
Task 0 fib(0) = 0
Task 1 fib(1) = 1
Task 2 .(0)
Task 0: resume
Task 0 fib(2) = 1
Task 1: resume
Task 1 fib(3) = 2

另外,位於每個任務函式的這行 task = curr_task;,看似多餘,但少了這行會導致 task 這個變數無法切換到當前任務

接著嘗試將這些流程套用到 ttt 中

在實作過程中,偶然發現原有實作會導致 Segmentation fault
觀察 gdb 後,發現執行數次 ttt 指令後,下列程式碼中的 tasks 會有越界讀取的問題

void schedule(void)
{
    static int i = 0;  // <--

    setjmp(sched);

    while (ntasks-- > 0) {
        struct arg arg_i = args[i];
        tasks[i++](&arg_i); // invalid read
    }

    task_switch();
}

觀察後發現,由於 i 是靜態變數,其生命週期作用在整個程式範圍內,因此其數值會被保留,並不會發生預期的初始化為 0 的結果