changed 10 months ago
Published Linked with GitHub

2024q1 Homework1 (lab0)

contributed by < jujuegg >

Reviewed by kkkkk1109

  • 建議佇列實作中可以附上 github commit,如章節 改善 web 與 linenoise 之衝突問題研讀論文〈Dude, is my code constant time?〉,以便後續和他人討論時可以快速看見程式碼更動。

Reviewed by Kuanch

  • lib/list_sort.c 使用 perf 應該能給出詳細的數據,我有遇到使用 WSL 或 VMWare 是無法完整給出數據的,最後以原生 Linux 機器得到,建議另寫測試,不要使用 qtest 介面,可以給出遭遇的問題;或至少可以使用 dudect/cpucycles.h 計算 cycle 數,yeh-sudo 蒐集了許多資料點後繪製出圖片,在此處也是很好的分析方法
  • 研讀論文〈Dude, is my code constant time?〉傾向至少將程式碼一一和文章對應,譬如它是如何計算執行時間、怎麼做後處理、如何計算統計量等等,其中有些具有可議之處,亦可作為"論文和程式碼實作的出入之處"之補充
  • Git 部分
    有看到僅有 5 個 commit,有些 commit 顯示 null,建議 可以提供 git loggit tree 進一步改進分析 git 指令 命令的使用

既然發現學員表現不好,那你直接做一次你認為好的做法,這樣才有程式碼審查的效果。 :notes: jserv

開發環境

$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0

$ lscpu
Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         45 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  3
  On-line CPU(s) list:   0-2
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
    CPU family:          6
    Model:               158
    Thread(s) per core:  1
    Core(s) per socket:  3
    Socket(s):           1
    Stepping:            13
    BogoMIPS:            5999.99

實作指定佇列操作

在開始實作之前,要先搞清楚我們可以使用哪些結構,以及結構中存在的變數、指標,以利後續程式的撰寫。

list_head

Linux 提供了 list_head 這個結構體,以及鏈結串列一系列操作的巨集,讓使用者可以比較輕鬆的實作相關程式碼。他的結構如下:

struct list_head {
    struct list_head *next, *prev;
};

element_t

這是在 queue.h 中定義的結構,他擁有一個字串型態的資料,以及上面提到的 list_head 結構。

typedef struct {
    char *value;
    struct list_head list;
} element_t;

搞清楚佇列的結構之後,再參照 Linux Kernel API 文件與 list.h 文件,搞懂 API 在做的事情,終於可以開始寫程式了。

q_new

配置記憶體空間,以及使用巨集來初始化指標連結。

避免非必要的項目縮排 (即 * - ),以清晰、明確,且流暢的漢語書寫。

q_free

使用 list_for_each_entry_safe,這個巨集會在走訪目前節點的時候先保留下一個節點,這樣在刪除目前節點的資料和指標時就不用擔心會遺失下一個節點的位址。
最後再刪除 head 即可。

q_insert_head / q_insert_tail

  • 建立新節點並賦予它初值
  • 呼叫 list_add / list_add_tail

在跑 make test 的時候發現 Test of malloc failure on insert_head / insert_tail 一直過不了關,參考 LambertWSJ 的程式碼,發現原來要考慮空字串的情況。

遇到這個問題時,就有在想會不會是為了檢查記憶體配置時可能會出現的錯誤,為此設了太多 if 條件來判斷,導致花費過多時間。

element_t *new_node(char *s)
{
    element_t *node = malloc(sizeof(element_t));
    if (!node)
        return NULL;
    node->value = strdup(s);
-   if (!node->value) {
+   if (__glibc_unlikely(!node->value)) {
        free(node);
        return NULL;
    }

    return node;
}

bool q_insert_head(struct list_head *head, char *s)
{
-   if (!head || !s)
+   if (__glibc_unlikely(!head || !s))
        return false;

    element_t *node = new_node(s);
    if (!node)
        return false;

    list_add(&node->list, head);

    return true;
}

僅僅一個 insert_head 就使用了 4 個 if 條件,所以我開始查閱別人的筆記來尋找答案,我看到 25077667 會在某些 if 條件中加入 __glibc_unlikely(),這是為了告訴編譯器這裡面的條件「比較不常發生」,告訴編譯器把此條件放在記憶體比較後面的位址,來減少運行多餘程式碼的機率。

q_remove_head / q_remove_tail

找到 head / tail 的 entry 並刪除即可,注意這邊並不用呼叫 C 的 free 函式,因為 remove 與 delete 不同。

在跑 make test 的時候發現會一直出現 Segmentation fault,猜大概是因為我使用 strncpy 來複製節點內的字串到 sp 中,而因為 strncpy 不會複製最後的 \0,如果 sp 裡面原本就有字串,複製過去的字串若小於原本的長度,字串後段就會有錯誤的字元。

所以我預設在 sp 最後面加一個 \0。這時候如果 bufsize 大於 sp 的長度的話,就會出問題了,所以要檢查兩者之間的大小關係。

strncpy(sp, node->value, bufsize);
+ if (strlen(sp) >= bufsize)
    sp[bufsize - 1] = '\0';

q_delete_mid

避免非必要的項目縮排 (即 * - ),以清晰、明確,且流暢的漢語書寫。

計算佇列有多少節點
找到中間節點的 entry 並刪除

TODO : 此方法要走訪佇列1.5次,如果佇列很長的話,會花很久時間,之後改成其他方法。

q_delete_dup

每次檢查目前節點與下一節點的字串是否相同,若是則刪除本節點,但此條件不適用於連續重複字串的最後一個節點,需另外記錄目前是否還在連續重複字串節點中。

需注意鏈結串列的尾端的 next 是頭,而頭是沒有字串值的,要檢查現在節點是否為尾端。

q_swap

原本是用暴力的方式操作它的指標,交換兩個節點涉及了自己與其前面以及後面 2 個節點 (共 4 個),要進行 6 次指標操作。
但後來發現 Linux 已經有提供刪除和新增相關函式,所以將後一個節點刪除再新增至前一個節點的前面即可,後來也發現有 list_move 可以使用,其本質就是呼叫 del_listlist_add

q_reverse

原本想用 list_for_each_reverse 來手動更改指標的位置,後來想到使用 list_move 直接將每個節點移動到 head 的前面即可。

q_reverseK

想法跟 q_reverse 一樣,把節點放到 head 的前面,但是執行完 k 次之後,新的一輪不能放在上一輪的前面,所以要找出要插入的位置在哪邊。
如果使用 list_for_each_safe 來走訪節點,當執行完一輪後,我們可以把目前 safe->prev 的所在位置視為新一輪 reverse 的 head。但是要用另一個指標來記錄,因為 safe 會隨時間變動。

再來要考慮節點不足 k 個的情況,所以先看是否有足夠的節點可以倒轉。

但我在想或許我們可以善用上面寫的 q_reverse,所以我想到了另一個方案:

把 K 個節點想成一個區塊,在每個區塊中,其實節點之間的順序與 q_reverse 一樣是反過來的,只是在q_reverseK 中,需要調整各個區塊之間的順序。所以我想要先全部 reverse 一次,再去調整區塊間的順序。

調整的順序由後往前開始走訪舊串列,每 K 個點為一組,移動到新串列的尾端,最後若是跑完迴圈,但舊串列裡面還有剩餘節點,就代表尾端有不足 K 個節點,倒轉回來即可。

蓋進你的漢語表達。

但尷尬的是,作業裡面的list.h 沒有 list_for_each_safe_reverse 或相關巨集,於是我改寫了 list_for_each_safe,新增一個 reverse 的版本 。

#define list_for_each_safe(node, safe, head)                     \
    for (node = (head)->next, safe = node->next; node != (head); \
         node = safe, safe = node->next)

#define list_for_each_safe_reverse(node, safe, head)             \
    for (node = (head)->prev, safe = node->prev; node != (head); \
         node = safe, safe = node->prev)

注意我們不能修改 list.hqueue.h,不然會不能 commit。

q_sort

Linked List Sort 提及 4 種針對鏈結串列的排序方法,我選擇實作 Merge sort。

首先需要持續呼叫 q_sort 函式來分割鏈結串列,我們需要找到中間的節點,目前我有想到 2 種方法可以達到此效果:

  • 使用快慢指標,快指標一次走兩步,慢指標一次走一步,當快指標所指到的目前 / 下一個節點為 head 時,慢指標即為中心節點。
struct list_head *slow = head, *fast = head;

do {
    fast = fast->next->next;
    slow = slow->next;
} while (!(fast == head || fast->next == head));
  • 使用相反方向指標,當正向指標與反向指標相遇,或正向指標的下一個節點是反向指標時,反向指標即為中心節點。

找到中心節點後,用 list_cut_position(*head_to, *head_from, *node) 將鏈結串列分成兩個,這個函式會把 head_from 串列中從第一個到 *node 節點分割到 head_to 串列,而原本的 head_from 串列就會剩下被分割的後半段。
將這兩個鏈結串列分別傳入 q_sort 做遞迴,最後呼叫 merge_two_sorted_list 來合併串列。

你如何確保目前的測試已涵蓋排序演算法最差的狀況?

merge_two_sorted_list

想法很簡單,先用另一個 head 來記錄合併的串列。由於兩個鏈結串列已經是排序好的情況,只需要依次比較兩個串列的頭節點的大小,找到需要的值,把該節點移至合併字串的尾端即可。

當偵測到其中一個串列為空,代表後續就不需要比較了,將另一串列接到合併串列的尾端即可。

q_descend

先建立一個新的佇列來存放答案,我稱為 ans_queue,這個佇列會維持在 descending 的狀態。

從給定的鏈結串列的頭開始走訪,把目前節點的字串值從 ans_queue尾端往前比較,遇到比自己小的值就將它刪除,直到遇到比自己大或者一樣的字串就停下來,並且新增到 ans_queue的尾端。
注意如果現在 ans_queue 為空,則無條件將目前節點放入 ans_queue 中。

q_merge

我的疑問是,該怎麼存取其他的佇列? 參考 25077667 同學的解法才知道要用到 queue_contex_t

那剩下就簡單啦,上面已經實作出 q_sortmerge_two_sorted_list 了,但同時我又想到另一個問題:

  • 先對各個鏈結串列做排序,再用 merge_two_sorted_list 合併
  • 先把各個鏈結串列相接,再進行排序

以上哪個方法會比較快?

你的數學分析呢?

LIST_HEADINIT_LIST_HEAD

container_ofoffset_of 存在的意義

當我們想要走訪某個節點,並存取其中的資料時,需要透過 .-> 的操作來獲取,其本質就是計算資料對於節點本身記憶體位址 (entry) 的偏移量。以 Linux 的鏈結串列舉例:

struct list_head {
 struct list_head *next, *prev;
};

struct Node {
    int value1;               //結構在記憶體中的起始位址
    int value2;
    struct list_head list;    //節點之間的連結在這裡
};

struct Node node;
struct Node *node_ptr;

我們可以透過 node.value1node_ptr->value2 來拿到該結構中的資料。

在結構定義中,記憶體位址的順序是按照變數宣告順序排列的,也就是說第一個變數所在的記憶體位置等同於此節點本身在記憶體中的位址 (entry),但在我們自行定義的結構中,list 不一定會宣告在結構的第一個位置。

再觀察 list_head 結構中,nextprev 都是指向跟自己相同的結構,可以得知,Linux 所設計的鏈結串列在走訪節點的時候,是透過 list_head 作為媒介。

這時候問題來了:我們該如何拿到該結構中的資料?

我們走訪節點時,指標所指到的記憶體位址「並不是」該節點在記憶體中的「起始位址」。 所以我們不能夠單純的使用 -> 來存取資料,因此 linux 提供了 container_ofoffset_of 巨集來獲取節點在記憶體中的起始位址 (entry),也就誕生了 list_for_each_entry 與後續相關巨集。

linux/list.h 內部函式的實作方法

在研讀文件的時候,發現很多功能類似的函式都是呼叫相同的函式,只差在傳進去的參數的不同,對應到 linux 喜歡把特例變成一般的情況。

舉例來說:

static inline void list_add(struct list_head *new, struct list_head *head)
{
	__list_add(new, head, head->next);
}
static inline void list_add_tail(struct list_head *new, struct list_head *head)
{
	__list_add(new, head->prev, head);
}

兩個函式都會呼叫 __list_add 函式,但傳進去的參數不同。

關於自動測試程式的一些問題

當我在測試一些東西的時候,看到了自動測試程式有以下輸出:

cmd> size
ERROR: Computed queue size as 1, but correct value is 0
l = [ ... ]

就覺得很好奇為何會有這種情況發生,於是我嘗試以下操作:

cmd> it 1
l = [3 ... ]
ERROR:  Queue has more than 1 elements
cmd> rt
Removed 1 from queue
l = [ ... ]

... 存在於鏈結串列中,但是他卻無法被移除,而且也無法觀察該節點的資料。

我發現好像是因為自動測試程式會依據函式返回的值來判斷鏈結串列目前應該要有多少個節點,如果不符合就會出現上面那種情況。

理工人說話應該避免「好像」,拿證據來說話!

改進你的漢語表達。台大電機系李宏毅教授對於 ChatGPT 的看法是,要善用人工智慧的成果,一旦人們得以善用,人類的書寫不該比 GPT 一類的語言模型差。
\(\to\) 2023 年 Linux 核心設計/實作課程第一次作業檢討

巨集使用的疑難雜症

為何 list_for_each_entry_safe 裡面用的 entry 和 safe 不用 malloc ?

因為它實際上並沒有像節點一樣儲存資料,只是一個用來指向節點的指標


改進 lib/list_sort.c

合併排序的問題

解釋 Linux 核心的 lib/list_sort.c

merge

merge 函式中,我上面的方法是使用 list_move 來移動比較過後的節點,但 Linux 卻只使用了三行程式碼來完成,且運用到了 pointer to pointer 的概念。且它並不是一次只移動一個節點到 head,而是將該節點所在串列移動head 的尾端。以下是部分程式碼:

struct list_head *head, **tail = &head;

// a and b are two sotred list heads
if (cmp(priv, a, b) <= 0) {
    *tail = a;
    tail = &a->next;
    a = a->next;
    if (!a) {
        *tail = b;
        break;
    }
} 

我們是需要自己去定義 cmp 的。而對於 priv 的解釋則是參考 vax-r 的解釋:

可以注意到函式的第一個參數 priv ,可以在 /lib/test_list_sort.c 當中看到 priv 是用來使得 check 這個函式經過 KUnit 測試後可以 failed。

merge_final

再來看到 merge_final,他比起 merge 多了下面這個步驟,b 是當其中一個串列為空時,剩下的串列所在的位置。這看起來是很沒必要的行為,因為是拿同一個節點進行比較,但凡事都有原因。

do {
    if (unlikely(!++count))
        cmp(priv, b, b);
    b->prev = tail;
    tail = b;
    b = b->next;
} while (b);

註解提到:

If the merge is highly unbalanced (e.g. the input is already sorted), this loop may run many iterations. Continue callbacks to the client even though no element comparison is needed, so the client's cmp() routine can invoke cond_resched() periodically.

如果其中一個串列還剩很多節點,持續呼叫 cmp() 可以週期性的呼叫 cond_resched(),這個函式與 Kernel 安排 CPU time 有關。

list_sort

如果有兩個尚未合併的 \(2^k\) 大小的串列,等到下一個也是 \(2^k\) 大小的串列出現,前兩個就會合併
只要這 \(3*2^k\) 個節點可以全部放到快取中,就可以避免 cache thrashing,簡單來說就是一直重複替換快取中某個地方的資料,非常浪費時間。

count 會持續記錄尚未合併的節點數量,當達到 \(2^k\) 的奇數倍時,就會啟動合併。

發生 2 次合併後,所產生的 2 個 \(2^{k+1}\) 大小的串列會在第 3 個串列出現前合併,所以永遠不會有過多沒有合併的串列。

程式碼分析:

  • 變數結構:
    • *list = head->next:要排序的串列的第一個節點
    • *pending = NULL :還沒合併的串列
    • count = 0pending 中節點的數量
    • **tail = &pending:指向 pending 指標的指標,它的作用是尋找 pending 中要做合併的位置,並且會指向該合併點尾端的指標

在整個過程中,list->prev 會持續指向 pending

有趣的是,在第一行,程式就把串列變為暫時的單向鏈結串列。但卻沒有更動頭的 prev

/* Convert to a null-terminated singly-linked list. */
head->prev->next = NULL;

count 的 LSB 是 1 的時候,才會進入迴圈,並且持續右移,直到有 0 出現。這邊在尋找要合併的位置。

/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
    tail = &(*tail)->prev;

這邊的意思是,如果 bits 裡面還有剩餘的 1 的話,就會啟動合併。換言之,如果目前 count + 1 為 2 的冪,則不合併。合併後的串列會儲存在 a,後續再修改 pending (*tail) 來指到剛合併完的串列。

一開始在追蹤程式碼的時候可能會覺得很奇怪,因為 pending 一開始是只有藉由 prev 連接的單向鏈結串列,而下面是將 tail 所在位置與 tail->prev 進行合併,而在第一次呼叫 merge 之後才會將 pending 中的 next 指標接好。

if (likely(bits)) {
    struct list_head *a = *tail, *b = a->prev;

    a = merge(priv, cmp, b, a);
    /* Install the merged result in place of the inputs */
    a->prev = b->prev;
    *tail = a;
}

推論:

使用 bottom up 的方法實作 merge sort 相較於 top down 方法省去了 partition 的步驟,由於合併的過程是與相鄰的串列進行,這方法會讓最近被合併過的資料,在不久後馬上跟附近的串列合併,在快取中的使用率會相對地頻繁。

你連數學分析和實驗都沒做,怎麼會有「結論」?
陸游:「紙上得來終覺淺,絕知此事要躬行」

引入 list_sort 到 lab0-c 當中

list_sort.c

避免非必要的項目縮排 (即 * - ),以清晰、明確,且流暢的漢語書寫。

首先修改需要 include 的檔案。

  #include <linux/kernel.h>
- #include <linux/bug.h>
- #include <linux/compiler.h>
- #include <linux/export.h>
  #include <linux/string.h>
- #include <linux/list_sort.h>
- #include <linux/list.h>
+ #include <list_sort.h>
+ #include "queue.h"

新增 likelyunlikely 巨集。

# define likely(x)	__builtin_expect(!!(x), 1)
# define unlikely(x)	__builtin_expect(!!(x), 0)

把變數型態宣告 u8 改成 uint8_t

- u8 count = 0;
+ uint8_t  count = 0;

新增 cmp 函式,把 priv 移除,並加入指定 descend 功能,記得要修改 nonnull 的變數位置。

int cmp(const struct list_head *a, const struct list_head *b, bool descend)
{
    element_t *a_entry = list_entry(a, element_t, list);
    element_t *b_entry = list_entry(b, element_t, list);
    
    if (!descend)
    	return strcmp(a_entry->value, b_entry->value) < 0 ? 0 : 1;
    else
    	return strcmp(a_entry->value, b_entry->value) > 0 ? 0 : 1;
}

把最後面的 EXPORT_SYMBOL 移除,暫時用不到

q_test.c

首先新增標頭檔。

+ #include "queue.h"
+ #include "list_sort.h"

並且新增 do_lsort 命令,我是直接參考裡面的 do_sort 寫法,沒有更改太多東西。

Makefile

OBJS := qtest.o report.o console.o harness.o queue.o \
        random.o dudect/constant.o dudect/fixture.o dudect/ttest.o \
        shannon_entropy.o \
-       linenoise.o web.o
+       linenoise.o web.o list_sort.o

比較效能落差

我們可以藉由三種數據來比較各個排序演算法之間的差異:

  • Execution time
  • The number of comparisons
  • Cache miss ratio

我選擇比較執行時間的差距,在 Dude, is my code constant time? 中提到,可以測量程式的 CPU cycle 來做為時間的依據。但我就沒有再執行後續更嚴謹的步驟,也就是設立虛無假說和計算 F 值。

使用 Linux 效能分析工具 perf 進行計算

以下設計是參考 chiangkd 的方法,我預計使用隨機生成且不同大小的測資,測量 10 次來取平均。大概測到 500 萬左右個節點的時候就記憶體就爆了滿了。

何謂「爆了」,用精準清晰的漢語書寫。

command 是「命令」,而非「指令」(instruction)

執行以下命令 :

sudo perf stat --repeat 10 -e instructions,cycles ./qtest -f traces/trace-sort.cmd 

我原本是想要用 cycles 作為比較依據的,但遇到了一點麻煩,目前無法解決,所以先用時間。

自己實作的 q_sort

Data Size Average Time (Sec)
50000 0.0207
100000 0.0630
500000 0.4865
1000000 1.0961

Linux 的 list_sort

Data Size Average Time (Sec)
50000 0.0167
100000 0.0607
500000 0.4223
1000000 0.9389

確實有比較快的趨勢。

用你的電腦科學知識來解讀!


研讀論文〈Dude, is my code constant time?〉

在一些已經存在的方法中,需以某種方式對硬體進行模擬,而 CPU 的訊息並不是完全公開的,在實現上難度很高,這是它們共同的缺點。
而 dudect 是基於 leakage 偵測,來判斷程式碼的複雜度是否為 constant time 的一種工具。

解釋 Simulation

當設定 simulation 為 1 後,在呼叫 it, ih, rt, rh 這 4 種命令時,會呼叫相對用來測試是否為 constant time 的函式:

bool ok = pos == POS_TAIL ? is_insert_tail_const() : is_insert_head_const();

if (!ok) {
    report(1, "ERROR: Probably not constant time or wrong implementation");
    return false;
}
report(1, "Probably constant time");
return ok;

但我怎麼都找不到這些函式被定義在哪邊,直到我看到 var-x 同學提到它寫在 fixure.h 中才知道。但這部分實在超出我的能力值太多,所以先放著。

#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _

總歸來說,最終呼叫的是 test_const 這個函式,並用 mode 來區分目前是測試哪個命令。其中會持續的使用 doit 來測試,只要有一次通過,程式就會中斷並判定為 constant time。相對的,如果跑了足夠多次並且都沒有成功,則宣布不是 constant time。

static bool test_const(char *text, int mode)
{
    bool result = false;
    t = malloc(sizeof(t_context_t));

    for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
        printf("Testing %s...(%d/%d)\n\n", text, cnt, TEST_TRIES);
        init_once();
        for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
             ++i)
            result = doit(mode);
        printf("\033[A\033[2K\033[A\033[2K");
        if (result)
            break;
    }
    free(t);
    return result;
}

doit 裡面,以下為主要測試區塊的程式碼

bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
update_statistics(exec_times, classes);
ret &= report();
  • measure : 被定義在 constant.c 中,它的測量方法是先插入一個字串,之後再測量插入/刪除另一個字串的時間。
  • differentiate:計算 before_ticksafter_ticks 的差值。
  • update_statistics:檢測測量數據的平均值。
  • report:藉由檢查 t 統計值 max_t 是否超過閾值來判斷

解釋 Student's t-distribution

新增 percentile 處理

仿照 dudect.h 對 lab0-c 做處理。詳細程式碼請見 commit 82be777

  • t_context_t 結構中新增 percentiles,方便統一管理。
  • 捨棄第一次測量的 batch,並執行 prepare_percentile
  • update_statistics 中,這個迴圈省去了前面 10 筆的資料,我也仿照它的作法。

改進程式碼之後,終於拿到 :100: 分了。

確認程式碼和論文出入之處,就可準備提交 pull request

改善 web 與 linenoise 之衝突問題

SuNsHiNe-75 協作

Linenoise

這是一個 C 函式庫,用來處理命令列單行輸入,並支援自動補齊命令和命令歷史記錄功能,間接讓命令列應用程式的開發變得更便利。

一些重要函式:

  • *linenoise():linenoise 函式庫的主要 API。它首先透過檢查「Stupid terminals 的黑名單」來檢查終端是否具有基本功能。根據結果,擇一執行下列動作:

    • 呼叫 line_edit()
    • 使用 dummy fgets() 函式,確保在最不利的情況下,使用者還是能輸入命令。
  • line_raw():使用「設置為 raw mode 的 STDIN 檔案描述子」來呼叫 line_edit()

    Raw mode:輸入的資料將不會被緩衝,而是直接傳遞給應用程序,同時輸出的資料會直接發送到終端,而不經過作業系統的進一步處理。

    呼叫 line_edit(STDIN_FILENO, STDOUT_FILENO, buf, buflen, prompt)

  • line_edit():行編輯能力的核心。它期望 fd 已經處於 raw mode,這樣每按一個鍵都會立即回傳給 read()。當使用者輸入 enter 或 ctrl+d 時,「結果字串」將被放入 buf 中,最後回傳當前緩衝區的長度

    write(l.ofd, prompt, l.plen) 參考 write(2),其將資料寫入到檔案描述子對應的檔案,可在終端機代替 printf 輸出對應 prompt。

    read(l.ifd, &c, 1) 參考 read(2),其在檔案描述子對應的檔案中讀取資料,可取代 scanf 在命令列讀取資料。

select()

以下為此函式所要傳入的參數,參考 Linux Manual Page-select() 文件:

  • readfds:檢查這些檔案描述子是否準備好被進行讀取。在 select() return 後,readfds清除所有未準備好被讀取的檔案描述子。
  • writefds:檢查這些檔案描述子是否準備好被進行寫入。在 select() return 後,writefds清除所有未準備好寫入的檔案描述子。
  • nfds:此參數應設置為任一集合中「最大檔案描述子的數字加 1」。

以下為此函數的回傳值:

  • 成功:回傳 readfdswritefdsexceptfds 中被設置的檔案描述子數量。
  • 失敗:回傳 -1。檔案描述子集合不變。

console.c

在我嘗試理解 console.c 時,大概可以感覺到與命令列及 linenoise 互動相關的重要函式為 do_web()cmd_select()run_console()

do_web()

預設 port 為 9999,並呼叫 web.c 中的 web_open() 來建立網頁伺服器。

若成功,則輸出相關訊息、設定 web_fd 為 1,並設定 use_linenoise 參數為 false。
若失敗則 perror("ERROR")

cmd_select()

類似 select函式,但會先檢查命令輸入是否存在於內部緩衝區從命令輸入中可讀取。如果沒被阻擋的話,把緩衝區中的 fd 加入到 selectreadset 中,如果 web 還沒準備好監聽,也把 web_fd 加入到 readset 中,再調整 nfds

呼叫 select(),然後觀察哪個檔案描述子還在 readset 中,分別執行對應操作,最後回傳 result 值。

run_console()

該函式會呼叫一堆 linenoise.c 中定義的 API,值得注意的是也會判斷先前定義的 use_linenoise 參數的布林值,若為 "true",才會進一步提供命令自動補齊、歷史紀錄等操作。

我觀察到以下程式碼:

if (web_fd > 0) {
    printf("listen on port %d, fd is %d\n", port, web_fd);
    linenoise_webfd(web_fd);
    use_linenoise = false;
}

是不是只要單純改一個變數就可以解決問題? 怎麼可能。

在我將 use_linenoise 設為 true 後,發現 qtest 端能正常運行 linenoise 套件。然而另一終端機遠端輸入指令時卻直接卡住了。

此外,若將 run_console()while() 判斷中的 use_linenoise 移除,結果也是如此。

解決 linenoise 與 socket 之衝突

commit 09478ba

原本的流程如下:

do_web 把 use_linenoise 設為 false,所以 run_console() 會執行 socket 的相關操作,也就不會有 linenoise 的運作了。
既然 stdin 及 socket 都會執行 cmd_select(),那我在 cmd_select() 中直接引入 linenoise。

-    char *cmdline = readline();
+    char *cmdline = linenoise(prompt);

如此改動會造成在 line_edit() 中的 read() 被阻塞時,linenoise 套件會不能運作。也就是輸入 web 命令後,linenoise 會暫時失效,等到另一終端機下命令後,linenoise 才可重新運作。

參考 vax-r
若將 linenoise() 擺在 select() 之後才執行,也就是確認是 std input file descriptor 變為 ready 時才使用,則會導致我們必須按下兩次 ENTER 才能正確執行一次命令,因為第一次 ENTER 用來通知 select() 這個 fd 的狀態改變,第二次 ENTER 則是用來跳出 line_edit() 函式。

既然最後都要呼叫 line_edit(),則將 select() 移到該函式中,同時監控 stdin 之檔案描述子或 socket 之檔案描述子就好。
只需要在 cmd_select() 將兩邊的命令傳入 linenoise() ,最後再呼叫 line_edit() 即可。

在後續實驗中,發現如果 qtest 端有輸入字元但是沒有按 Enter,Web 端發送命令後會等待qtest 端完成命令輸入後,才會執行 Web curl。


新命令 Shuffle 之實作

SuNsHiNe-75 協作

Fisher–Yates Shuffle 演算法

分為 Original versionModern version

Original version

設有一陣列 arr = {1, 2, 3, 4} 及一空陣列 rst = {}

剛開始時每個數字的選取機率皆為 \(\frac{1}{4}\),假設選中 3,則將其從 arr 移除,並移到 rst[0] 的位置,rst = {3}

第一輪過後 arr = {1, 2, 4},則目前每個數字的選中機率為 \(\frac{1}{3}\),以此類推,直到 arr 陣列為空。

Modern version

先預設目前排頭是陣列中的第一個數字,從排頭之後 (不包含排頭) 的陣列中隨機選取數字後,將其與目前排頭交換,每完成一次步驟,排頭往後一個數字。

改進你的漢語表達。

此方法的優點在於時間複雜度與空間複雜度的降低-時間複雜度由 \(O(n^2)\) 降為 \(O(n)\),空間複雜度則由 \(O(n)\) 降為 \(O(1)\)

實作

用程式把上述的 Modern version 實作出來,再用前面提到的方法來新增一個 shuffle 命令即可。

測試程式分析

根據下方輸出及統計圖表的結果可知,Fisher–Yates Shuffle 演算法是 Uniform Distribution

Expectation:  20833

Observation:  {'1234': 20976, '1243': 20984, '1324': 20687, '1342': 20555,
'1423': 21081, '1432': 20716, '2134': 20734, '2143': 20831, '2314': 20966,
'2341': 20991, '2413': 20496, '2431': 21126, '3124': 20804, '3142': 20719,
'3214': 20754, '3241': 20679, '3412': 20739, '3421': 21037, '4123': 20927,
'4132': 20781, '4213': 20876, '4231': 21005, '4312': 20884, '4321': 20652}

chi square sum:  30.79249267988288

messageImage_1709562054200


Tic-Tac-Toe

新增 ttt 命令

qtest.c 中新增一條命令:

ADD_COMMAND(ttt, "Tic-Tac-Toe", "");

並實作 do_ttt 函式,把 jserv/ttt 中必要的檔案引入到 lab0-c 中。

我在 do_ttt 結束前會把 move_count 設為 0,這樣本輪紀錄的步數就不會留到下一輪。

+ move_count = 0;

其中,在 jserv/ttt/tree/main/agents 有提供已經寫好的 negamax AI,以及 zobrist hashing 與 mt19937-64 等若干工具可直接使用,詳細請見 commit f3ba748

用定點數取代浮點數運算

Monte Carlo Tree Search (MCTS)

以下黑白棋舉例,使用 Monte Carlo Tree Search 會讓電腦在下每一步棋之前計算他這次下在哪邊會得到最高的勝率。但要注意,他並不會考慮到之後的所有延伸結果,因為其複雜度是無可計量的,因此這其實不是真正意義上的最佳解。

主要分成 3 個步驟:

  • Select Leaf Node: 概念為往目前 Upper Confidence Bound (UCB) 最高的方向尋找,直到找到的點沒有左/右子點。UCB 可達成 Exploitation(選擇已知最好策略) 與 Exploration(選擇探索其他路線)的平衡。
  • Expand / Rollout: 若尋找到的為尚未更新過的新節點,則執行 Rollout,以該節點為起點,模擬完整場遊戲直到結束,並獲得比賽結果。若該節點已更新過,則執行 Expand,往下生成兩個新節點。
  • BackPropagate: Rollout 結束後,向上更新所有相關節點的結果。

浮點數

可以發現在 mcts.c 中與 double score 有關的操作皆使用浮點數運算,從這邊開始進行定點數運算的優化。

double 本身是 64-bit 的雙精度浮點數資料型態,其儲存的格式如下:

IEEE_754_Double_Floating_Point_Format.svg

為了方便比較大小,其指數部分是使用 bias 形式表示,而不是使用「二補數」表示,就像我們在計算機組織中學習的一樣。

定點數

首先我們要知道,為何 Linux 核心會偏好定點數的實作,理由在於定點數的運算比浮點數要快,需要的硬體也比較少。假如已經事先知道要計算的數值範圍,並且限制在較小的範圍內,定點數運算可以有效的使用其位元。

使用定點數運算並不會受到是否有浮點運算器(FPU)的影響,可移植性比浮點數高,且不是所有 Linux 支援的硬體都具備 FPU 等因素,因此 Linux 核心比較建議使用定點數運算。

定點數表示法中,小數部份和整數部份一樣,表示為進制底數 \(b\) 的冪次,不過是以負數冪次來表示。即若儲存了 n-bit 的小數,其數值一定是 \(b^{−n}\) 的整數倍。

定點數類型的值其實就是個整數,但需要額外作比例進位,而進多少位,則需要根據具體的定點數類型決定。如二進制定點數下的負數常常會用二補數型式下的有號整數表示,其中隱含著縮放係數(或小數長度)。

舉個例子,8-bit 二進制有號整數 \((11110101)_2 = −11\),若分別配合 -3, +5 及 +12 的縮放位元,它們的數值即為 \(\frac{−11}{2^{−3}} = −88\)\(\frac{−11}{2^5} = −0.34375\),和 \(\frac{−11}{2^{12}} = −0.002685546875\)

下列是定點數運算的常見規則:

  • 定點數加法與減法:直接進行
  • 定點數乘法:結果須再\(b^n\) (b 為進制,n 為小數位數)
  • 定點數除法:結果須再\(b^n\) (b 為進制,n 為小數位數)

縮放係數

縮放係數就是用來確定小數點的位置在該整數型態的哪一位,即 fractional bits

較大的縮放係數可以增加定點數的範圍,但相對犧牲的是精度。反過來說,較小的縮放係數可以提高精度,但須要犧牲可表示的範圍。

鑒於 double 為 64-bit,實作上我利用 stdint.h 中定義的 uint64_t 型態來取代原 scoredouble 型態。另設定縮放位元有 12-bit,即縮放係數為 \(2^{12}\)

縮放係數固然越大越好,但也不能犧牲掉計算精度,藉由觀察相關運算函式(uct_score 及其中的 log 等)後,覺得此縮放係數在這個應用中可以兩者兼具,且不會有溢位的問題。

實作定點數操作

首先引入 stdint.h,並將相關變數型態改為 uint64_t

再來定義縮放係數。

+ #define FIXED_SCALING_BITS 12
calculate_win_value

將其回傳值型態改成定點數即可。注意回傳值型態為 uint64_t

uint64_t calculate_win_value(char win, char player)
{
    if (win == player)
+       return 1ULL << FIXED_SCALING_BITS;
-       return 1.0;
    if (win == (player ^ 'O' ^ 'X'))
+       return 0ULL;
-       return 0.0;
+   return 1ULL << (FIXED_SCALING_BITS - 1);
-   return 0.5;
}
sqrt

使用 二分逼近法 來實現。

uint64_t fixed_sqrt(uint64_t x)
{
    if (x <= 1)
        return x;

    uint64_t low = 0ULL;
    uint64_t high = x;
    uint64_t precision = 1ULL << (FIXED_SCALING_BITS - 2);

    while (high - low > precision) {
        uint64_t mid = (low + high) / 2;
        uint64_t square = (mid * mid) >> FIXED_SCALING_BITS;

        if (square < x) {
            low = mid;
        } else {
            high = mid;
        }
    }

    return (low + high) / 2;
}
log

實作 log 的方法有很多種,我選擇比較常見也比較容易實現的泰勒級數展開

將一個函數「以多項式來表示」的一種方式。
一多項式函數 \(f(x)\)\(x = a\) 時,\(n\) 階的泰勒展開式 \(P_n(x)\) 是:
\(P_n(x) = f(a) + \frac{f^{(1)}(a)}{1!}(x-a)^1 + \frac{f^{(2)}(a)}{2!}(x-a)^2 +...+ \frac{f^{(n)}(a)}{n!}(x-a)^n + R_n(x)\)

而對於對數函數,可以使用「自然對數」的泰勒展開式來近似計算。

\(ln(1+x) = x - \frac{x^2}{2} + \frac{x^3}{3} - \frac{x^4}{4} +......\)

static uint64_t fixed_log(uint64_t x)
{
    uint64_t fixed_x = x << SCALING_BITS;

    if (x == 0) return UINT64_MAX;

    if (x == 1) return 0ULL;

    uint64_t result = 0;

    for (int i = 1; i <= 16; ++i) {
        if (i % 2 == 0) {
            result -= fixed_power_int(fixed_x, FIXED_SCALING_BITS, i) / i;
        } else {
            result += fixed_power_int(fixed_x, FIXED_SCALING_BITS, i) / i;
        }
        fixed_x = (fixed_x * (x << SCALING_BITS)) >> SCALING_BITS;
    }

    return result;
}

\(x\) 的冪次部分,使用教材中提供的 fixed_power_int 函式來實現,並修改為回傳值為 uint64_t

詳細實作請見 commit 53ec983

電腦 vs. 電腦

qtest 中加入藉由 option 命令來調整為電腦 vs. 電腦的選項:

// global variable
int ai_vs_ai = 0;
// in console_init()
add_param("AI_vs_AI", &ai_vs_ai, "Let AI and AI play Tic-Tac-Toe", NULL);

在原本是玩家的回合中,檢測目前是否為電腦 vs. 電腦,如是,則讓 AI 決定下一步:

if (turn == ai) {
    int move = negamax_predict(table, ai).move;
    if (move != -1) {
        table[move] = ai;
        record_move(move);
    }
} 
else 
{
+   if (ai_vs_ai) {  // AI vs. AI
+       int move = negamax_predict(table, turn).move;
+       if (move != -1) {
+           table[move] = turn;
+           record_move(move);
+       }
    } else {  // Player vs. AI
        draw_board(table);
        int move;
        while (1) {
            move = get_input(turn);
            if (table[move] == ' ') {
                break;
            }
            printf("Invalid operation: the position has been marked\n");
        }
        table[move] = turn;
        record_move(move);
    }
}

經過我反覆測試發現,每次的電腦 vs. 電腦對局的過程都會一模一樣。為了讓此對局有變化性,我決定讓先手的電腦隨機選擇第一步。
get_input 函式可以知道,玩家輸入的英文字母數字會被轉變為 { 0 ~ BOARD_SIZE * BOARD_SIZE - 1 } 之間的數字,所以我讓電腦在第一步時隨機選擇範圍內的數字。

bool first_move = true;

if (first_move) {
    move = rand() % (BOARD_SIZE * BOARD_SIZE - 1);
    first_move = false;
}

最後再加入顯示時間的功能,此部分參考 vax-r 同學的程式碼:

draw_board(table);
time_t timer = time(NULL);
struct tm *tm_info = localtime(&timer);

char buffer[26];
strftime(buffer, 26, "%Y-%m-%d %H:%M:%S", tm_info);
puts(buffer);
Select a repo