contributed by < jujuegg
>
kkkkk1109
Kuanch
lib/list_sort.c
使用 perf
應該能給出詳細的數據,我有遇到使用 WSL 或 VMWare 是無法完整給出數據的,最後以原生 Linux 機器得到,建議另寫測試,不要使用 qtest
介面,可以給出遭遇的問題;或至少可以使用 dudect/cpucycles.h
計算 cycle 數,yeh-sudo 蒐集了許多資料點後繪製出圖片,在此處也是很好的分析方法git log
或 git tree
進一步改進分析 git
既然發現學員表現不好,那你直接做一次你認為好的做法,這樣才有程式碼審查的效果。 jserv
$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 45 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 3
On-line CPU(s) list: 0-2
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
CPU family: 6
Model: 158
Thread(s) per core: 1
Core(s) per socket: 3
Socket(s): 1
Stepping: 13
BogoMIPS: 5999.99
在開始實作之前,要先搞清楚我們可以使用哪些結構,以及結構中存在的變數、指標,以利後續程式的撰寫。
list_head
Linux 提供了 list_head
這個結構體,以及鏈結串列一系列操作的巨集,讓使用者可以比較輕鬆的實作相關程式碼。他的結構如下:
struct list_head {
struct list_head *next, *prev;
};
element_t
這是在 queue.h 中定義的結構,他擁有一個字串型態的資料,以及上面提到的 list_head
結構。
typedef struct {
char *value;
struct list_head list;
} element_t;
搞清楚佇列的結構之後,再參照 Linux Kernel API 文件與 list.h 文件,搞懂 API 在做的事情,終於可以開始寫程式了。
q_new
配置記憶體空間,以及使用巨集來初始化指標連結。
避免非必要的項目縮排 (即 *
和 -
),以清晰、明確,且流暢的漢語書寫。
q_free
使用 list_for_each_entry_safe
,這個巨集會在走訪目前節點的時候先保留下一個節點,這樣在刪除目前節點的資料和指標時就不用擔心會遺失下一個節點的位址。
最後再刪除 head 即可。
q_insert_head
/ q_insert_tail
list_add
/ list_add_tail
在跑 make test 的時候發現 Test of malloc failure on insert_head / insert_tail 一直過不了關,參考 LambertWSJ 的程式碼,發現原來要考慮空字串的情況。
遇到這個問題時,就有在想會不會是為了檢查記憶體配置時可能會出現的錯誤,為此設了太多 if
條件來判斷,導致花費過多時間。
element_t *new_node(char *s)
{
element_t *node = malloc(sizeof(element_t));
if (!node)
return NULL;
node->value = strdup(s);
- if (!node->value) {
+ if (__glibc_unlikely(!node->value)) {
free(node);
return NULL;
}
return node;
}
bool q_insert_head(struct list_head *head, char *s)
{
- if (!head || !s)
+ if (__glibc_unlikely(!head || !s))
return false;
element_t *node = new_node(s);
if (!node)
return false;
list_add(&node->list, head);
return true;
}
僅僅一個 insert_head
就使用了 4 個 if
條件,所以我開始查閱別人的筆記來尋找答案,我看到 25077667 會在某些 if
條件中加入 __glibc_unlikely()
,這是為了告訴編譯器這裡面的條件「比較不常發生」,告訴編譯器把此條件放在記憶體比較後面的位址,來減少運行多餘程式碼的機率。
q_remove_head
/ q_remove_tail
找到 head / tail 的 entry 並刪除即可,注意這邊並不用呼叫 C 的 free
函式,因為 remove 與 delete 不同。
在跑 make test
的時候發現會一直出現 Segmentation fault,猜大概是因為我使用 strncpy
來複製節點內的字串到 sp
中,而因為 strncpy
不會複製最後的 \0
,如果 sp
裡面原本就有字串,複製過去的字串若小於原本的長度,字串後段就會有錯誤的字元。
所以我預設在 sp
最後面加一個 \0
。這時候如果 bufsize
大於 sp
的長度的話,就會出問題了,所以要檢查兩者之間的大小關係。
strncpy(sp, node->value, bufsize);
+ if (strlen(sp) >= bufsize)
sp[bufsize - 1] = '\0';
q_delete_mid
避免非必要的項目縮排 (即 *
和 -
),以清晰、明確,且流暢的漢語書寫。
計算佇列有多少節點
找到中間節點的 entry 並刪除
TODO : 此方法要走訪佇列1.5次,如果佇列很長的話,會花很久時間,之後改成其他方法。
q_delete_dup
每次檢查目前節點與下一節點的字串是否相同,若是則刪除本節點,但此條件不適用於連續重複字串的最後一個節點,需另外記錄目前是否還在連續重複字串節點中。
需注意鏈結串列的尾端的 next 是頭,而頭是沒有字串值的,要檢查現在節點是否為尾端。
q_swap
原本是用暴力的方式操作它的指標,交換兩個節點涉及了自己與其前面以及後面 2 個節點 (共 4 個),要進行 6 次指標操作。
但後來發現 Linux 已經有提供刪除和新增相關函式,所以將後一個節點刪除再新增至前一個節點的前面即可,後來也發現有 list_move
可以使用,其本質就是呼叫 del_list
和 list_add
。
q_reverse
原本想用 list_for_each_reverse
來手動更改指標的位置,後來想到使用 list_move
直接將每個節點移動到 head 的前面即可。
q_reverseK
想法跟 q_reverse
一樣,把節點放到 head 的前面,但是執行完 k 次之後,新的一輪不能放在上一輪的前面,所以要找出要插入的位置在哪邊。
如果使用 list_for_each_safe
來走訪節點,當執行完一輪後,我們可以把目前 safe->prev 的所在位置視為新一輪 reverse 的 head。但是要用另一個指標來記錄,因為 safe 會隨時間變動。
再來要考慮節點不足 k 個的情況,所以先看是否有足夠的節點可以倒轉。
但我在想或許我們可以善用上面寫的 q_reverse
,所以我想到了另一個方案:
把 K 個節點想成一個區塊,在每個區塊中,其實節點之間的順序與 q_reverse
一樣是反過來的,只是在q_reverseK
中,需要調整各個區塊之間的順序。所以我想要先全部 reverse 一次,再去調整區塊間的順序。
調整的順序由後往前開始走訪舊串列,每 K 個點為一組,移動到新串列的尾端,最後若是跑完迴圈,但舊串列裡面還有剩餘節點,就代表尾端有不足 K 個節點,倒轉回來即可。
蓋進你的漢語表達。
但尷尬的是,作業裡面的list.h
沒有 list_for_each_safe_reverse
或相關巨集,於是我改寫了 list_for_each_safe
,新增一個 reverse 的版本 。
#define list_for_each_safe(node, safe, head) \
for (node = (head)->next, safe = node->next; node != (head); \
node = safe, safe = node->next)
#define list_for_each_safe_reverse(node, safe, head) \
for (node = (head)->prev, safe = node->prev; node != (head); \
node = safe, safe = node->prev)
注意我們不能修改 list.h
和 queue.h
,不然會不能 commit。
q_sort
Linked List Sort 提及 4 種針對鏈結串列的排序方法,我選擇實作 Merge sort。
首先需要持續呼叫 q_sort
函式來分割鏈結串列,我們需要找到中間的節點,目前我有想到 2 種方法可以達到此效果:
struct list_head *slow = head, *fast = head;
do {
fast = fast->next->next;
slow = slow->next;
} while (!(fast == head || fast->next == head));
找到中心節點後,用 list_cut_position(*head_to, *head_from, *node)
將鏈結串列分成兩個,這個函式會把 head_from
串列中從第一個到 *node 節點分割到 head_to
串列,而原本的 head_from
串列就會剩下被分割的後半段。
將這兩個鏈結串列分別傳入 q_sort
做遞迴,最後呼叫 merge_two_sorted_list
來合併串列。
你如何確保目前的測試已涵蓋排序演算法最差的狀況?
merge_two_sorted_list
想法很簡單,先用另一個 head 來記錄合併的串列。由於兩個鏈結串列已經是排序好的情況,只需要依次比較兩個串列的頭節點的大小,找到需要的值,把該節點移至合併字串的尾端即可。
當偵測到其中一個串列為空,代表後續就不需要比較了,將另一串列接到合併串列的尾端即可。
q_descend
先建立一個新的佇列來存放答案,我稱為 ans_queue
,這個佇列會維持在 descending 的狀態。
從給定的鏈結串列的頭開始走訪,把目前節點的字串值從 ans_queue
的尾端往前比較,遇到比自己小的值就將它刪除,直到遇到比自己大或者一樣的字串就停下來,並且新增到 ans_queue
的尾端。
注意如果現在 ans_queue
為空,則無條件將目前節點放入 ans_queue
中。
q_merge
我的疑問是,該怎麼存取其他的佇列? 參考 25077667 同學的解法才知道要用到 queue_contex_t
。
那剩下就簡單啦,上面已經實作出 q_sort
和 merge_two_sorted_list
了,但同時我又想到另一個問題:
merge_two_sorted_list
合併以上哪個方法會比較快?
你的數學分析呢?
LIST_HEAD
與 INIT_LIST_HEAD
container_of
與 offset_of
存在的意義當我們想要走訪某個節點,並存取其中的資料時,需要透過 .
或 ->
的操作來獲取,其本質就是計算資料對於節點本身記憶體位址 (entry) 的偏移量。以 Linux 的鏈結串列舉例:
struct list_head {
struct list_head *next, *prev;
};
struct Node {
int value1; //結構在記憶體中的起始位址
int value2;
struct list_head list; //節點之間的連結在這裡
};
struct Node node;
struct Node *node_ptr;
我們可以透過 node.value1
或 node_ptr->value2
來拿到該結構中的資料。
在結構定義中,記憶體位址的順序是按照變數宣告順序排列的,也就是說第一個變數所在的記憶體位置等同於此節點本身在記憶體中的位址 (entry),但在我們自行定義的結構中,list
不一定會宣告在結構的第一個位置。
再觀察 list_head
結構中,next
與 prev
都是指向跟自己相同的結構,可以得知,Linux 所設計的鏈結串列在走訪節點的時候,是透過 list_head
作為媒介。
這時候問題來了:我們該如何拿到該結構中的資料?
我們走訪節點時,指標所指到的記憶體位址「並不是」該節點在記憶體中的「起始位址」。 所以我們不能夠單純的使用 ->
來存取資料,因此 linux 提供了 container_of
和 offset_of
巨集來獲取節點在記憶體中的起始位址 (entry),也就誕生了 list_for_each_entry
與後續相關巨集。
linux/list.h
內部函式的實作方法在研讀文件的時候,發現很多功能類似的函式都是呼叫相同的函式,只差在傳進去的參數的不同,對應到 linux 喜歡把特例變成一般的情況。
舉例來說:
static inline void list_add(struct list_head *new, struct list_head *head)
{
__list_add(new, head, head->next);
}
static inline void list_add_tail(struct list_head *new, struct list_head *head)
{
__list_add(new, head->prev, head);
}
兩個函式都會呼叫 __list_add
函式,但傳進去的參數不同。
當我在測試一些東西的時候,看到了自動測試程式有以下輸出:
cmd> size
ERROR: Computed queue size as 1, but correct value is 0
l = [ ... ]
就覺得很好奇為何會有這種情況發生,於是我嘗試以下操作:
cmd> it 1
l = [3 ... ]
ERROR: Queue has more than 1 elements
cmd> rt
Removed 1 from queue
l = [ ... ]
...
存在於鏈結串列中,但是他卻無法被移除,而且也無法觀察該節點的資料。
我發現好像是因為自動測試程式會依據函式返回的值來判斷鏈結串列目前應該要有多少個節點,如果不符合就會出現上面那種情況。
理工人說話應該避免「好像」,拿證據來說話!
改進你的漢語表達。台大電機系李宏毅教授對於 ChatGPT 的看法是,要善用人工智慧的成果,一旦人們得以善用,人類的書寫不該比 GPT 一類的語言模型差。
\(\to\) 2023 年 Linux 核心設計/實作課程第一次作業檢討
list_for_each_entry_safe
裡面用的 entry 和 safe 不用 malloc ?因為它實際上並沒有像節點一樣儲存資料,只是一個用來指向節點的指標。
lib/list_sort.c
lib/list_sort.c
merge
在 merge
函式中,我上面的方法是使用 list_move
來移動比較過後的節點,但 Linux 卻只使用了三行程式碼來完成,且運用到了 pointer to pointer 的概念。且它並不是一次只移動一個節點到 head
,而是將該節點所在串列移動到 head
的尾端。以下是部分程式碼:
struct list_head *head, **tail = &head;
// a and b are two sotred list heads
if (cmp(priv, a, b) <= 0) {
*tail = a;
tail = &a->next;
a = a->next;
if (!a) {
*tail = b;
break;
}
}
我們是需要自己去定義 cmp
的。而對於 priv
的解釋則是參考 vax-r 的解釋:
可以注意到函式的第一個參數 priv ,可以在 /lib/test_list_sort.c 當中看到 priv 是用來使得 check 這個函式經過 KUnit 測試後可以 failed。
merge_final
再來看到 merge_final
,他比起 merge
多了下面這個步驟,b
是當其中一個串列為空時,剩下的串列所在的位置。這看起來是很沒必要的行為,因為是拿同一個節點進行比較,但凡事都有原因。
do {
if (unlikely(!++count))
cmp(priv, b, b);
b->prev = tail;
tail = b;
b = b->next;
} while (b);
註解提到:
If the merge is highly unbalanced (e.g. the input is already sorted), this loop may run many iterations. Continue callbacks to the client even though no element comparison is needed, so the client's cmp() routine can invoke cond_resched() periodically.
如果其中一個串列還剩很多節點,持續呼叫 cmp()
可以週期性的呼叫 cond_resched()
,這個函式與 Kernel 安排 CPU time 有關。
list_sort
如果有兩個尚未合併的 \(2^k\) 大小的串列,等到下一個也是 \(2^k\) 大小的串列出現,前兩個就會合併。
只要這 \(3*2^k\) 個節點可以全部放到快取中,就可以避免 cache thrashing,簡單來說就是一直重複替換快取中某個地方的資料,非常浪費時間。
count 會持續記錄尚未合併的節點數量,當達到 \(2^k\) 的奇數倍時,就會啟動合併。
發生 2 次合併後,所產生的 2 個 \(2^{k+1}\) 大小的串列會在第 3 個串列出現前合併,所以永遠不會有過多沒有合併的串列。
程式碼分析:
*list = head->next
:要排序的串列的第一個節點*pending = NULL
:還沒合併的串列count = 0
:pending
中節點的數量**tail = &pending
:指向 pending
指標的指標,它的作用是尋找 pending
中要做合併的位置,並且會指向該合併點尾端的指標在整個過程中,list->prev
會持續指向 pending
。
有趣的是,在第一行,程式就把串列變為暫時的單向鏈結串列。但卻沒有更動頭的 prev
。
/* Convert to a null-terminated singly-linked list. */
head->prev->next = NULL;
當 count
的 LSB 是 1 的時候,才會進入迴圈,並且持續右移,直到有 0 出現。這邊在尋找要合併的位置。
/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
tail = &(*tail)->prev;
這邊的意思是,如果 bits
裡面還有剩餘的 1 的話,就會啟動合併。換言之,如果目前 count + 1
為 2 的冪,則不合併。合併後的串列會儲存在 a
,後續再修改 pending (*tail)
來指到剛合併完的串列。
一開始在追蹤程式碼的時候可能會覺得很奇怪,因為 pending
一開始是只有藉由 prev
連接的單向鏈結串列,而下面是將 tail
所在位置與 tail->prev
進行合併,而在第一次呼叫 merge
之後才會將 pending
中的 next
指標接好。
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
使用 bottom up 的方法實作 merge sort 相較於 top down 方法省去了 partition 的步驟,由於合併的過程是與相鄰的串列進行,這方法會讓最近被合併過的資料,在不久後馬上跟附近的串列合併,在快取中的使用率會相對地頻繁。
你連數學分析和實驗都沒做,怎麼會有「結論」?
陸游:「紙上得來終覺淺,絕知此事要躬行」
list_sort
到 lab0-c 當中list_sort.c
避免非必要的項目縮排 (即 *
和 -
),以清晰、明確,且流暢的漢語書寫。
首先修改需要 include
的檔案。
#include <linux/kernel.h>
- #include <linux/bug.h>
- #include <linux/compiler.h>
- #include <linux/export.h>
#include <linux/string.h>
- #include <linux/list_sort.h>
- #include <linux/list.h>
+ #include <list_sort.h>
+ #include "queue.h"
新增 likely
與 unlikely
巨集。
# define likely(x) __builtin_expect(!!(x), 1)
# define unlikely(x) __builtin_expect(!!(x), 0)
把變數型態宣告 u8
改成 uint8_t
。
- u8 count = 0;
+ uint8_t count = 0;
新增 cmp
函式,把 priv
移除,並加入指定 descend
功能,記得要修改 nonnull
的變數位置。
int cmp(const struct list_head *a, const struct list_head *b, bool descend)
{
element_t *a_entry = list_entry(a, element_t, list);
element_t *b_entry = list_entry(b, element_t, list);
if (!descend)
return strcmp(a_entry->value, b_entry->value) < 0 ? 0 : 1;
else
return strcmp(a_entry->value, b_entry->value) > 0 ? 0 : 1;
}
把最後面的 EXPORT_SYMBOL
移除,暫時用不到
q_test.c
首先新增標頭檔。
+ #include "queue.h"
+ #include "list_sort.h"
並且新增 do_lsort
命令,我是直接參考裡面的 do_sort
寫法,沒有更改太多東西。
Makefile
OBJS := qtest.o report.o console.o harness.o queue.o \
random.o dudect/constant.o dudect/fixture.o dudect/ttest.o \
shannon_entropy.o \
- linenoise.o web.o
+ linenoise.o web.o list_sort.o
我們可以藉由三種數據來比較各個排序演算法之間的差異:
我選擇比較執行時間的差距,在 Dude, is my code constant time? 中提到,可以測量程式的 CPU cycle 來做為時間的依據。但我就沒有再執行後續更嚴謹的步驟,也就是設立虛無假說和計算 F 值。
以下設計是參考 chiangkd 的方法,我預計使用隨機生成且不同大小的測資,測量 10 次來取平均。大概測到 500 萬左右個節點的時候就記憶體就爆了滿了。
何謂「爆了」,用精準清晰的漢語書寫。
command 是「命令」,而非「指令」(instruction)
執行以下命令 :
sudo perf stat --repeat 10 -e instructions,cycles ./qtest -f traces/trace-sort.cmd
我原本是想要用 cycles 作為比較依據的,但遇到了一點麻煩,目前無法解決,所以先用時間。
q_sort
Data Size | Average Time (Sec) |
---|---|
50000 | 0.0207 |
100000 | 0.0630 |
500000 | 0.4865 |
1000000 | 1.0961 |
list_sort
Data Size | Average Time (Sec) |
---|---|
50000 | 0.0167 |
100000 | 0.0607 |
500000 | 0.4223 |
1000000 | 0.9389 |
確實有比較快的趨勢。
用你的電腦科學知識來解讀!
在一些已經存在的方法中,需以某種方式對硬體進行模擬,而 CPU 的訊息並不是完全公開的,在實現上難度很高,這是它們共同的缺點。
而 dudect 是基於 leakage 偵測,來判斷程式碼的複雜度是否為 constant time 的一種工具。
當設定 simulation
為 1 後,在呼叫 it
, ih
, rt
, rh
這 4 種命令時,會呼叫相對用來測試是否為 constant time 的函式:
bool ok = pos == POS_TAIL ? is_insert_tail_const() : is_insert_head_const();
if (!ok) {
report(1, "ERROR: Probably not constant time or wrong implementation");
return false;
}
report(1, "Probably constant time");
return ok;
但我怎麼都找不到這些函式被定義在哪邊,直到我看到 var-x 同學提到它寫在 fixure.h
中才知道。但這部分實在超出我的能力值太多,所以先放著。
#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _
總歸來說,最終呼叫的是 test_const
這個函式,並用 mode
來區分目前是測試哪個命令。其中會持續的使用 doit
來測試,只要有一次通過,程式就會中斷並判定為 constant time。相對的,如果跑了足夠多次並且都沒有成功,則宣布不是 constant time。
static bool test_const(char *text, int mode)
{
bool result = false;
t = malloc(sizeof(t_context_t));
for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
printf("Testing %s...(%d/%d)\n\n", text, cnt, TEST_TRIES);
init_once();
for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
++i)
result = doit(mode);
printf("\033[A\033[2K\033[A\033[2K");
if (result)
break;
}
free(t);
return result;
}
在 doit
裡面,以下為主要測試區塊的程式碼
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
update_statistics(exec_times, classes);
ret &= report();
measure
: 被定義在 constant.c
中,它的測量方法是先插入一個字串,之後再測量插入/刪除另一個字串的時間。differentiate
:計算 before_ticks
與 after_ticks
的差值。update_statistics
:檢測測量數據的平均值。report
:藉由檢查 t 統計值 max_t
是否超過閾值來判斷仿照 dudect.h 對 lab0-c 做處理。詳細程式碼請見 commit 82be777。
t_context_t
結構中新增 percentiles
,方便統一管理。prepare_percentile
。update_statistics
中,這個迴圈省去了前面 10 筆的資料,我也仿照它的作法。改進程式碼之後,終於拿到 分了。
確認程式碼和論文出入之處,就可準備提交 pull request
與 SuNsHiNe-75 協作
這是一個 C 函式庫,用來處理命令列單行輸入,並支援自動補齊命令和命令歷史記錄功能,間接讓命令列應用程式的開發變得更便利。
*linenoise()
:linenoise 函式庫的主要 API。它首先透過檢查「Stupid terminals 的黑名單」來檢查終端是否具有基本功能。根據結果,擇一執行下列動作:
line_edit()
fgets()
函式,確保在最不利的情況下,使用者還是能輸入命令。line_raw()
:使用「設置為 raw mode 的 STDIN 檔案描述子」來呼叫 line_edit()
。
Raw mode:輸入的資料將不會被緩衝,而是直接傳遞給應用程序,同時輸出的資料會直接發送到終端,而不經過作業系統的進一步處理。
呼叫
line_edit(STDIN_FILENO, STDOUT_FILENO, buf, buflen, prompt)
。
line_edit()
:行編輯能力的核心。它期望 fd
已經處於 raw mode,這樣每按一個鍵都會立即回傳給 read()
。當使用者輸入 enter 或 ctrl+d 時,「結果字串」將被放入 buf
中,最後回傳當前緩衝區的長度。
write(l.ofd, prompt, l.plen)
參考 write(2),其將資料寫入到檔案描述子對應的檔案,可在終端機代替 printf 輸出對應 prompt。
read(l.ifd, &c, 1)
參考 read(2),其在檔案描述子對應的檔案中讀取資料,可取代 scanf 在命令列讀取資料。
select()
以下為此函式所要傳入的參數,參考 Linux Manual Page-select() 文件:
readfds
:檢查這些檔案描述子是否準備好被進行讀取。在 select()
return 後,readfds
會清除所有未準備好被讀取的檔案描述子。writefds
:檢查這些檔案描述子是否準備好被進行寫入。在 select()
return 後,writefds
會清除所有未準備好寫入的檔案描述子。nfds
:此參數應設置為任一集合中「最大檔案描述子的數字加 1」。以下為此函數的回傳值:
readfds
、writefds
和 exceptfds
中被設置的檔案描述子數量。console.c
在我嘗試理解 console.c
時,大概可以感覺到與命令列及 linenoise 互動相關的重要函式為 do_web()
、cmd_select()
、run_console()
。
do_web()
預設 port 為 9999,並呼叫 web.c 中的 web_open()
來建立網頁伺服器。
若成功,則輸出相關訊息、設定 web_fd 為 1,並設定 use_linenoise
參數為 false。
若失敗則 perror("ERROR")
。
cmd_select()
類似 select
函式,但會先檢查命令輸入是否存在於內部緩衝區或從命令輸入中可讀取。如果沒被阻擋的話,把緩衝區中的 fd
加入到 select
的 readset
中,如果 web 還沒準備好監聽,也把 web_fd
加入到 readset
中,再調整 nfds
。
呼叫 select()
,然後觀察哪個檔案描述子還在 readset
中,分別執行對應操作,最後回傳 result 值。
run_console()
該函式會呼叫一堆 linenoise.c 中定義的 API,值得注意的是也會判斷先前定義的 use_linenoise 參數的布林值,若為 "true",才會進一步提供命令自動補齊、歷史紀錄等操作。
我觀察到以下程式碼:
if (web_fd > 0) {
printf("listen on port %d, fd is %d\n", port, web_fd);
linenoise_webfd(web_fd);
use_linenoise = false;
}
是不是只要單純改一個變數就可以解決問題? 怎麼可能。
在我將 use_linenoise
設為 true
後,發現 qtest
端能正常運行 linenoise 套件。然而另一終端機遠端輸入指令時卻直接卡住了。
此外,若將 run_console()
中 while()
判斷中的 use_linenoise
移除,結果也是如此。
commit 09478ba
原本的流程如下:
do_web
把 use_linenoise 設為 false,所以 run_console()
會執行 socket 的相關操作,也就不會有 linenoise 的運作了。
既然 stdin 及 socket 都會執行 cmd_select()
,那我在 cmd_select()
中直接引入 linenoise。
- char *cmdline = readline();
+ char *cmdline = linenoise(prompt);
如此改動會造成在 line_edit()
中的 read()
被阻塞時,linenoise 套件會不能運作。也就是輸入 web 命令後,linenoise 會暫時失效,等到另一終端機下命令後,linenoise 才可重新運作。
參考 vax-r
若將linenoise()
擺在select()
之後才執行,也就是確認是 std input file descriptor 變為 ready 時才使用,則會導致我們必須按下兩次 ENTER 才能正確執行一次命令,因為第一次 ENTER 用來通知select()
這個 fd 的狀態改變,第二次 ENTER 則是用來跳出line_edit()
函式。
既然最後都要呼叫 line_edit()
,則將 select()
移到該函式中,同時監控 stdin 之檔案描述子或 socket 之檔案描述子就好。
只需要在 cmd_select()
將兩邊的命令傳入 linenoise()
,最後再呼叫 line_edit()
即可。
在後續實驗中,發現如果 qtest
端有輸入字元但是沒有按 Enter,Web 端發送命令後會等待qtest
端完成命令輸入後,才會執行 Web curl。
與 SuNsHiNe-75 協作
分為 Original version 及 Modern version:
設有一陣列 arr = {1, 2, 3, 4}
及一空陣列 rst = {}
。
剛開始時每個數字的選取機率皆為 \(\frac{1}{4}\),假設選中 3,則將其從 arr
移除,並移到 rst[0]
的位置,rst = {3}
。
第一輪過後 arr = {1, 2, 4}
,則目前每個數字的選中機率為 \(\frac{1}{3}\),以此類推,直到 arr
陣列為空。
先預設目前排頭是陣列中的第一個數字,從排頭之後 (不包含排頭) 的陣列中隨機選取數字後,將其與目前排頭交換,每完成一次步驟,排頭往後一個數字。
改進你的漢語表達。
此方法的優點在於時間複雜度與空間複雜度的降低-時間複雜度由 \(O(n^2)\) 降為 \(O(n)\),空間複雜度則由 \(O(n)\) 降為 \(O(1)\)。
用程式把上述的 Modern version 實作出來,再用前面提到的方法來新增一個 shuffle 命令即可。
根據下方輸出及統計圖表的結果可知,Fisher–Yates Shuffle 演算法是 Uniform Distribution。
Expectation: 20833
Observation: {'1234': 20976, '1243': 20984, '1324': 20687, '1342': 20555,
'1423': 21081, '1432': 20716, '2134': 20734, '2143': 20831, '2314': 20966,
'2341': 20991, '2413': 20496, '2431': 21126, '3124': 20804, '3142': 20719,
'3214': 20754, '3241': 20679, '3412': 20739, '3421': 21037, '4123': 20927,
'4132': 20781, '4213': 20876, '4231': 21005, '4312': 20884, '4321': 20652}
chi square sum: 30.79249267988288
ttt
命令在 qtest.c
中新增一條命令:
ADD_COMMAND(ttt, "Tic-Tac-Toe", "");
並實作 do_ttt
函式,把 jserv/ttt 中必要的檔案引入到 lab0-c 中。
我在 do_ttt
結束前會把 move_count
設為 0,這樣本輪紀錄的步數就不會留到下一輪。
+ move_count = 0;
其中,在 jserv/ttt/tree/main/agents 有提供已經寫好的 negamax AI,以及 zobrist hashing 與 mt19937-64 等若干工具可直接使用,詳細請見 commit f3ba748。
以下黑白棋舉例,使用 Monte Carlo Tree Search 會讓電腦在下每一步棋之前計算他這次下在哪邊會得到最高的勝率。但要注意,他並不會考慮到之後的所有延伸結果,因為其複雜度是無可計量的,因此這其實不是真正意義上的最佳解。
主要分成 3 個步驟:
可以發現在 mcts.c 中與 double score
有關的操作皆使用浮點數運算,從這邊開始進行定點數運算的優化。
double
本身是 64-bit 的雙精度浮點數資料型態,其儲存的格式如下:
為了方便比較大小,其指數部分是使用 bias 形式表示,而不是使用「二補數」表示,就像我們在計算機組織中學習的一樣。
首先我們要知道,為何 Linux 核心會偏好定點數的實作,理由在於定點數的運算比浮點數要快,需要的硬體也比較少。假如已經事先知道要計算的數值範圍,並且限制在較小的範圍內,定點數運算可以有效的使用其位元。
使用定點數運算並不會受到是否有浮點運算器(FPU)的影響,可移植性比浮點數高,且不是所有 Linux 支援的硬體都具備 FPU 等因素,因此 Linux 核心比較建議使用定點數運算。
定點數表示法中,小數部份和整數部份一樣,表示為進制底數 \(b\) 的冪次,不過是以負數冪次來表示。即若儲存了 n-bit 的小數,其數值一定是 \(b^{−n}\) 的整數倍。
定點數類型的值其實就是個整數,但需要額外作比例進位,而進多少位,則需要根據具體的定點數類型決定。如二進制定點數下的負數常常會用二補數型式下的有號整數表示,其中隱含著縮放係數(或小數長度)。
舉個例子,8-bit 二進制有號整數 \((11110101)_2 = −11\),若分別配合 -3, +5 及 +12 的縮放位元,它們的數值即為 \(\frac{−11}{2^{−3}} = −88\)、\(\frac{−11}{2^5} = −0.34375\),和 \(\frac{−11}{2^{12}} = −0.002685546875\)。
下列是定點數運算的常見規則:
縮放係數就是用來確定小數點的位置在該整數型態的哪一位,即 fractional bits。
較大的縮放係數可以增加定點數的範圍,但相對犧牲的是精度。反過來說,較小的縮放係數可以提高精度,但須要犧牲可表示的範圍。
鑒於 double 為 64-bit,實作上我利用 stdint.h 中定義的 uint64_t
型態來取代原 score
的 double
型態。另設定縮放位元有 12-bit,即縮放係數為 \(2^{12}\)。
縮放係數固然越大越好,但也不能犧牲掉計算精度,藉由觀察相關運算函式(uct_score
及其中的 log
等)後,覺得此縮放係數在這個應用中可以兩者兼具,且不會有溢位的問題。
首先引入 stdint.h,並將相關變數型態改為 uint64_t
。
再來定義縮放係數。
+ #define FIXED_SCALING_BITS 12
calculate_win_value
將其回傳值型態改成定點數即可。注意回傳值型態為 uint64_t
。
uint64_t calculate_win_value(char win, char player)
{
if (win == player)
+ return 1ULL << FIXED_SCALING_BITS;
- return 1.0;
if (win == (player ^ 'O' ^ 'X'))
+ return 0ULL;
- return 0.0;
+ return 1ULL << (FIXED_SCALING_BITS - 1);
- return 0.5;
}
sqrt
使用 二分逼近法 來實現。
uint64_t fixed_sqrt(uint64_t x)
{
if (x <= 1)
return x;
uint64_t low = 0ULL;
uint64_t high = x;
uint64_t precision = 1ULL << (FIXED_SCALING_BITS - 2);
while (high - low > precision) {
uint64_t mid = (low + high) / 2;
uint64_t square = (mid * mid) >> FIXED_SCALING_BITS;
if (square < x) {
low = mid;
} else {
high = mid;
}
}
return (low + high) / 2;
}
log
實作 log 的方法有很多種,我選擇比較常見也比較容易實現的泰勒級數展開。
將一個函數「以多項式來表示」的一種方式。
一多項式函數 \(f(x)\) 在 \(x = a\) 時,\(n\) 階的泰勒展開式 \(P_n(x)\) 是:
\(P_n(x) = f(a) + \frac{f^{(1)}(a)}{1!}(x-a)^1 + \frac{f^{(2)}(a)}{2!}(x-a)^2 +...+ \frac{f^{(n)}(a)}{n!}(x-a)^n + R_n(x)\)
而對於對數函數,可以使用「自然對數」的泰勒展開式來近似計算。
\(ln(1+x) = x - \frac{x^2}{2} + \frac{x^3}{3} - \frac{x^4}{4} +......\)
static uint64_t fixed_log(uint64_t x)
{
uint64_t fixed_x = x << SCALING_BITS;
if (x == 0) return UINT64_MAX;
if (x == 1) return 0ULL;
uint64_t result = 0;
for (int i = 1; i <= 16; ++i) {
if (i % 2 == 0) {
result -= fixed_power_int(fixed_x, FIXED_SCALING_BITS, i) / i;
} else {
result += fixed_power_int(fixed_x, FIXED_SCALING_BITS, i) / i;
}
fixed_x = (fixed_x * (x << SCALING_BITS)) >> SCALING_BITS;
}
return result;
}
在 \(x\) 的冪次部分,使用教材中提供的 fixed_power_int 函式來實現,並修改為回傳值為 uint64_t
。
詳細實作請見 commit 53ec983。
在 qtest
中加入藉由 option 命令來調整為電腦 vs. 電腦的選項:
// global variable
int ai_vs_ai = 0;
// in console_init()
add_param("AI_vs_AI", &ai_vs_ai, "Let AI and AI play Tic-Tac-Toe", NULL);
在原本是玩家的回合中,檢測目前是否為電腦 vs. 電腦,如是,則讓 AI 決定下一步:
if (turn == ai) {
int move = negamax_predict(table, ai).move;
if (move != -1) {
table[move] = ai;
record_move(move);
}
}
else
{
+ if (ai_vs_ai) { // AI vs. AI
+ int move = negamax_predict(table, turn).move;
+ if (move != -1) {
+ table[move] = turn;
+ record_move(move);
+ }
} else { // Player vs. AI
draw_board(table);
int move;
while (1) {
move = get_input(turn);
if (table[move] == ' ') {
break;
}
printf("Invalid operation: the position has been marked\n");
}
table[move] = turn;
record_move(move);
}
}
經過我反覆測試發現,每次的電腦 vs. 電腦對局的過程都會一模一樣。為了讓此對局有變化性,我決定讓先手的電腦隨機選擇第一步。
由 get_input
函式可以知道,玩家輸入的英文字母及數字會被轉變為 { 0
~ BOARD_SIZE * BOARD_SIZE - 1
} 之間的數字,所以我讓電腦在第一步時隨機選擇範圍內的數字。
bool first_move = true;
if (first_move) {
move = rand() % (BOARD_SIZE * BOARD_SIZE - 1);
first_move = false;
}
最後再加入顯示時間的功能,此部分參考 vax-r 同學的程式碼:
draw_board(table);
time_t timer = time(NULL);
struct tm *tm_info = localtime(&timer);
char buffer[26];
strftime(buffer, 26, "%Y-%m-%d %H:%M:%S", tm_info);
puts(buffer);