# 2024q1 Homework1 (lab0)
contributed by < `csotaku0926` >
## 開發環境
```shell
$ gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 39 bits physical, 48 bits virtual
CPU(s): 12
On-line CPU(s) list: 0-11
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 154
Model name: 12th Gen Intel(R) Core(TM) i5-12450H
Stepping: 3
CPU max MHz: 4400.0000
CPU min MHz: 400.0000
BogoMIPS: 4992.00
Virtualization: VT-x
L1d cache: 320 KiB
L1i cache: 384 KiB
L2 cache: 7 MiB
L3 cache: 12 MiB
NUMA node0 CPU(s): 0-11
```
## 針對佇列操作的程式碼實作
### `q_new`
```c
struct list_head *q_new()
{
struct list_head *head = malloc(sizeof(struct list_head));
if (!head)
return NULL;
INIT_LIST_HEAD(head);
return head;
}
```
需要注意由 `harness.h` 中巨集定義的 `malloc` 可能會有<s>分配</s> 配置失敗的狀況,這時會回傳 `NULL` 需要加以判斷。
:::warning
allocate 翻譯為「配置」,而非「分配」,是為了跟「分發」(dispatch) 區隔,二者都會在作業系統領域多次出現。
:::
:::warning
local variable 翻譯為「區域變數」,而非「局部變數」,以避免跟 partial 一詞混淆。在程式語言理論中,存在 "partial" 變數。
:::
:::danger
此處的「這邊也可以」和「一下」無助於溝通。
:::
<s>這邊也可以</s> 複習<s>一下</s> 函式回傳值的部份:回傳區域變數的位址是<s>不合法的</s>,例如:
```c
struct list_head head;
return &head;
```
根據 C99 [6.2.4] Storage Duration of Objects:
- An object has a storage duration that determines its lifetime. There are three storage durations: static, automatic, and allocated.
- An object whose identifier is declared with no linkage and without the storage-class specifier static has **automatic** storage duration.
> 由此可知道 `head` 適用於 automatic storage
- The lifetime of an object is the portion of program execution during which storage is guaranteed to be reserved for it. An object exists, has a constant address and retains its last-stored value throughout its lifetime. If an object is referred to outside of its lifetime, the behavior is undefined.
> 當 `head` 於他的存活時間 (lifetime) 之外存取,將被定義為 `undefined behavior` 。上述的「不合法」是不精確的說詞
:::warning
「我們都知道」這樣的陳述無助於專業溝通。
「回傳局部變數的位址是不合法的」不是精準的說法,請查閱 C 語言規格書,列出對應的描述並解讀。
> 已修正
:::
但如果是回傳指標,那就是回傳區域變數的數值 (value),正如 `q_new` 的回傳值,這是沒問題的
### `q_free`
初步的想法如下
```diff
void q_free(struct list_head *l)
{
+ if(!l)
+ return;
element_t *entry, *safe;
list_for_each_entry_safe(entry, safe, l, list){
free(entry->value);
free(entry);
}
free(l);
}
```
:::warning
參閱 [free(3)](https://man7.org/linux/man-pages/man3/free.3.html) 可知 "If ptr is NULL, no operation is performed.",意味著你不用撰寫形如 `if (entry->value) { free(entry->value); }` 的程式碼,直接呼叫 `free` 即可,不用額外的 `if` 敘述。
> 已修正
:::
根據 [你所不知道的 C 語言: linked list 和非連續記憶體](https://hackmd.io/@sysprog/c-linked-list#%E7%B0%A1%E5%8C%96%E7%9A%84%E5%AF%A6%E4%BD%9C) 一文,可以得知,在給定的結構體中加入 `sturct list_head` 就可使用一致存取介面的鏈結串列。
正因如此,實作 `q_free` 時不只有 `struct list_head`,還有 `struct element_t` 的記憶體要被釋放。
在逐一存取並進行刪除操作時,需要注意鏈結的維護。
綜合上述兩點,要選用的 `list.h` API 應為 `list_for_each_entry_safe`
然而,在測試 `malloc failure on new` 時會發生 `Segmentation Fault` 的錯誤,
利用 `make valgrind` 測試得到以下資訊:
```
+++ TESTING trace trace-13-malloc:
# Test of malloc failure on new
==5507== Invalid read of size 8
==5507== at 0x10F709: q_free (queue.c:28)
==5507== by 0x10B1BA: q_quit (qtest.c:1103)
==5507== by 0x10E3D4: do_quit (console.c:246)
==5507== by 0x10EF65: finish_cmd (console.c:635)
==5507== by 0x10D3CF: main (qtest.c:1261)
==5507== Address 0x8 is not stack'd, malloc'd or (recently) free'd
==5507==
Segmentation fault occurred. You dereferenced a NULL or invalid pointer
```
原來出錯的原因並非 `q_new` 的實作有誤,而是 `q_free` 沒有做好 NULL pointer 的檢查
### `q_insert_head`
初步想法如下
```c
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *node_element = malloc(sizeof(element_t));
if (!node_element)
return false;
node_element->value = malloc((strlen(s) + 1) * sizeof(char));
if(!node_element->value)
return false;
// strcpy copies null character too
strncpy(node_element->value, s, strlen(s) + 1);
list_add(&node_element->list, head);
return true;
}
```
根據 C99 [7.21.6.3] The strlen function:
> The strlen function returns the number of characters that precede the terminating null character.
因此進行記憶體配置時,要在 `strlen(s)` 後面再加一以存放 null character
利用 `list.h` 中的 `list_add` ,可將新建立的 `list_head` 新增於串列的開頭
### `q_insert_tail`
```diff
bool q_insert_tail(struct list_head *head, char *s)
{
// same as "q_insert_head"
if (!head)
return false;
- // ...
- list_add_tail(&node_element->list, head);
- return true;
+ return q_insert_head(head->prev, s);
}
```
和 `q_insert_head` 大體一致,只是自 `list.h` 使用的 API 函式換成了 `list_add_tail`
換個想法,可以直接利用 `q_insert_head` 完成這項功能,省去重複的程式碼
### `q_remove_head`
```diff
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
// if queue is NULL or empty
if (!head || list_empty(head))
return NULL;
element_t *head_element = list_first_entry(head, element_t, list);
list_del_init(head->next);
if (sp) {
- strncpy(sp, head_element->value, bufsize - 1);
- strcat(sp, "\0");
+ strncpy(sp, head_element->value, bufsize);
+ sp[bufsize - 1] = '\0';
}
return head_element;
}
```
在 [作業說明(一)](https://hackmd.io/@sysprog/linux2024-lab0-a) 一文中已闡述 remove 與 delete 的區別。`q_remove_head` 的目的是要斷開第一項節點的連結。
因此根據 `list.h` 的註解說明,使用 `list_del` 斷開連結。
注意 `"\0"` 並不是正確使用 null character的方式
### `q_remove_tail`
```diff
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
// if queue is NULL or empty
if (!head || list_empty(head))
return NULL;
- element_t *tail_element = list_last_entry(head, element_t, list);
- // same logic as "q_remove_head"
- // ...
+ return q_remove_head(head->prev->prev, sp, bufsize);
}
```
與 `q_remove_head` 大體一致,只是要移除的元素改成以 `list_last_entry` 選取
這裡參考了 [yanjiew1](https://github.com/yanjiew1/lab0-c/blob/master/queue.c#L214) 的想法,利用 `q_remove_head` 達成<s>節省</s> 類似程式碼的效果。
:::warning
不是「節省」,而是減少對相同流程的程式碼進行維護的需要,亦即改進程式碼的可重用程度。
:::
### `q_size`
```c
int q_size(struct list_head *head)
{
if (!head)
return 0;
int len = 0;
struct list_head *node;
list_for_each (node, head)
len++;
return len;
}
```
走訪串列,每經過一個節點就將 `len` 加一
### `q_delete_mid`
原先的想法是建立 `start`, `end` 兩個指向 `struct list_head` 的指標。這兩個指標每次都往反方向走一步,相會的節點即是要移除的中間節點。這樣會需要走遍一整圈的串列的時間。
```c
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
// two-pointer approach
struct list_head *start = head, *end = head;
do {
end = end->prev;
if(start == end)
break;
start = start->next;
} while(start != end);
// delete the mid node
list_del(start);
element_t *mid_element = list_entry(start, element_t, list);
free(mid_element->value);
free(mid_element);
return true;
}
```
### `q_delete_dup`
```c
bool q_delete_dup(struct list_head *head)
{
// https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
if (!head)
return false;
// for each list element
struct list_head *cur = head->next, *del = NULL;
bool remove_cur;
while (cur != head) {
// remove all node that has the same string as cur
del = cur->next;
remove_cur = false;
while (del != head &&
strcmp(list_entry(del, element_t, list)->value,
list_entry(cur, element_t, list)->value) == 0) {
remove_cur = true;
list_del_init(del);
free(list_entry(del, element_t, list)->value);
free(list_entry(del, element_t, list));
del = cur->next;
}
// assign next first so it can find next node before unlink
cur = cur->next;
if (remove_cur){
del = cur->prev;
list_del_init(del);
free(list_entry(del, element_t, list)->value);
free(list_entry(del, element_t, list));
}
}
return true;
}
```
一般而言,如果要在一個串列移除重複元素,能想的到較有效率的作法,是利用雜湊表紀錄每個獨一的元素。不過要用 C 語言實作雜湊表的話,需要考量不少因素(如雜湊函數的使用,雜湊表的大小等)
因此先用最基本的方式實作,也就是針對每個元素,向後尋找擁有相同字串的元素。
由於題目假設串列已經過排序,因此若下一個元素與目前元素不同,便能說接下來的元素不會再與目前元素擁有相同字串。
### `q_reverse`
原先想的較複雜,想要依序從 `head->next` 開始依序和最後一個節點交換連結。
參考 [komark06](https://hackmd.io/@komark06/SyCUIEYpj) 才發現我把問題複雜化了。原來只要把過程想成從第一個節點開始依序解除連結 (`list_del`),再重新加入到開頭就可以了 (`list_add`)。這是佇列 FIFO 的特性。
```c
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
// traverse through list, put to head in order...
// so previously last element be the first
struct list_head *node, *safe;
list_for_each_safe (node, safe, head) {
list_move(node, head);
}
}
```
### `q_reverseK`
這題較難以直觀的方式想出代碼。
於是參考 [leetcode](https://leetcode.com/problems/reverse-nodes-in-k-group/solutions/4675368/simple-and-easy-commented-code-iterative-approach/) 的想法後,發覺這題可以以類似遞迴的概念處理
換句話說,可以把每個 K 元素的子串列從原來的串列分割出來後,利用前面已經實作好的 `q_reverse` 做反轉的動作,再把其拼接到原來串列的開頭。
一開始的版本乍看沒有問題,但卻有程式陷入無窮迴圈的問題
原因是反向處理好的 k 個元素的子串列,不斷被插入到原先串列的尾端,導致 `end` 指標在佇列元素多於2的情況下,永遠無法到達 `head`
改進的實作方式為,將反轉過的子串列,放在用來走訪串列的指標之前,才不會重複拜訪
同時,使用 `list_entry_safe` 可以讓原本的程式碼變得更整潔
```diff
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (!head || list_empty(head))
return;
- struct list_head *end = head->next;
+ int count = k-1;
+ struct list_head *node, *cut, *next_head = head;
// dummy list head
LIST_HEAD(new_head);
// cut the remaining list out of the k-node list
- while (end != head) {
- for(int i=1; i<k && end != head; i++)
- end = end->next;
- if(end == head)
- return;
- end = end->next;
- list_cut_position(&new_head, head, end->prev);
- q_reverse(&new_head);
- list_splice_tail_init(&new_head, head);
- }
+ list_for_each_safe(node, cut, head) {
+ if (count--)
+ continue;
+
+ count = k-1;
+ list_cut_position(&new_head, next_head, node);
+ q_reverse(&new_head);
+ list_splice_init(&new_head, next_head);
+ next_head = cut->prev;
+ }
}
```
### `q_sort`
原先的版本會出現 `segmentation fault`
需要注意 `start` `end` 之間的關係
```diff
void q_sort(struct list_head *head, bool descend)
{
// zero or one element, or NULL
if (!head || head->next == head->prev)
return;
// find middle point (two-pointer)
struct list_head *start = head, *end = head;
do {
start = start->next;
end = end->prev;
- } while(start != end && end->next != start);
+ } while(start != end && start->next != end);
// parition (recursive)
LIST_HEAD(new_head);
list_cut_position(&new_head, head, start);
q_sort(&new_head, descend);
q_sort(head, descend);
q_merge_two(head, &new_head, descend);
}
```
`q_merge_two` 為另外設立的功能,作用是融合兩個已排序的串列
原先參考 [linux 核心風格](https://hackmd.io/@sysprog/c-linked-list#Linux-%E6%A0%B8%E5%BF%83%E7%9A%84-list_sort-%E5%AF%A6%E4%BD%9C) 的 `list_sort` 和 `merge`,以 indirect pointer 去實作
不過由於原先版本是針對非環狀鏈結串列,這讓改動增加許多困難,徒增許多成本,反而得不償失。
最後利用 `list.h` 內建功能,將這個操作實作於環狀鏈結串列
:::warning
你如何確保目前的測試已涵蓋排序演算法最差的狀況?
:::
### `q_descend`
```c
int q_descend(struct list_head *head)
{
// https://leetcode.com/problems/remove-nodes-from-linked-list/
if (!head || list_empty(head))
return 0;
element_t *current = list_last_entry(head, element_t, list);
struct list_head *it = head->prev->prev;
int count = 1;
while(current->list.prev != head) {
element_t *it_entry = list_entry(it, element_t, list);
if (strcmp(current->value, it_entry->value) > 0) {
// remove lesser value
it = it->prev;
list_del(it->next);
q_release_element(it_entry);
} else {
// update current
current = it_entry;
count++;
it = it->prev;
}
}
return count;
}
```
我們原本的要求,是要移除那些在其右側存在數值大於本身數值的節點。
換句話說,從尾端向開頭走訪節點時,節點值呈現遞增的順序。
因此,我們只需要移除那些不符合從右側走訪而來遞增的節點即可。
若改成從開頭走訪到尾端,將會增加實作的難度。
:::danger
改進你的漢語表達
:::
至於 `q_ascend` ,目前的作法是照抄 `q_descend` 的程式碼
但在判斷條件把「大於」改成了「小於」
是否有可以重複利用程式碼的方案 (正如 `q_insert_head` `q_insert_tail` 那樣) ?
### `q_merge`
#### 輸入參數
看到這題給的輸入參數,我有些疑惑。
怎麼用僅僅一個 `struct list_head*` 表達數個串列的架構呢?
於是我參考 `qtest.c` 是如何呼叫 `q_merge` 的 (詳見 `do_merge`)。觀察到以下程式碼
```c
// line 827 in qtest.c
if (q_size(&chain.head) > 1) {
chain.size = 1;
current = list_entry(chain.head.next, queue_contex_t, chain);
current->size = len;
```
這段程式碼<s>看起來</s>是在把數個串列合併到 `chain.head`,也就是 `q_merge` 的第一個輸入參數
:::danger
工程人員說話要精準明確,避免「看起來」一類的詞語。
:::
再往下看,可以看到 `list_entry(..., element_t, list)` 的操作
```c
for (struct list_head *cur_l = current->q->next;
cur_l != current->q && --len; cur_l = cur_l->next) {
/* Ensure each element in ascending order */
element_t *item, *next_item;
item = list_entry(cur_l, element_t, list);
next_item = list_entry(cur_l->next, element_t, list);
```
`current` 是用來存取每個子串列的架構 (`queue_contex_t` ), `current->q` 指向每個子串列的開頭。
#### 實作程式
目前的版本是直接將所有其他子串列以 `q_merge_two` 串接在第一條串列上
這是直觀但較沒效率的方法,應該參閱 [教材](https://hackmd.io/@sysprog/c-linked-list#%E6%A1%88%E4%BE%8B%E6%8E%A2%E8%A8%8E-LeetCode-21-Merge-Two-Sorted-Lists) 提出的方法重新改進
```c
int q_merge(struct list_head *head, bool descend)
{
// https://leetcode.com/problems/merge-k-sorted-lists/
if (!head || list_empty(head))
return 0;
queue_contex_t *first = list_entry(head->next, queue_contex_t, chain);
if (head->next == head->prev)
return first->size;
queue_contex_t *second = list_entry(first->chain.next,
queue_contex_t, chain);
int size = 0;
int n = q_size(head) - 1;
while (n--) {
size = q_merge_two(first->q, second->q, descend);
list_move_tail(&second->chain, head);
second = list_entry(first->chain.next, queue_contex_t, chain);
}
return size;
}
```
實測出現以下問題
```
+++ TESTING trace trace-03-ops:
# Test of insert_head, insert_tail, remove_head, reverse and merge
ERROR: Attempted to free unallocated block. Address = 0xdeadbeef
Segmentation fault occurred. You dereferenced a NULL or invalid pointer
```
`q_merge_two` 中需要將第二個參數傳入的 `list_head*` 初始化,也就是把 `list_splice` 改成 `list_splice_init`
並且,原先版本將 `second->q` 設為 NULL 判斷算法是否完成,這樣會發生 Segmentation fault,需要更換中止條件 (例如使用 `q_size`,但顯然不是最佳解)
---
## 改善 dudect 實作
目前我所撰寫的 `q_insert_head` `q_insert_tail` 無法通過第 17 筆測試項目。
```bash
csotaku:lab0-c$ ./qtest -f traces/trace-17-complexity.cmd
# Test if time complexity of
# q_insert_tail, q_insert_head, q_remove_tail, and q_remove_head is constant
ERROR: Probably not constant time or wrong implementation
ERROR: Probably not constant time or wrong implementation
Probably constant time
Probably constant time
```
考慮到目前以 `make valgrind` 測試沒有發現記憶體相關的問題,實作邏輯也符合常數時間的規範,在作業說明又提到目前 dudect 的實作存在致命缺陷。推斷是這個缺陷導致無法通過檢測。
### 研讀 dudect 論文
:::warning
注意標點符號的運用,超連結該有其對應的說明
:::
〈[Dude, is my code constant time?](https://eprint.iacr.org/2016/1123.pdf)〉提出一項檢測程式是否為常數複雜度的工具,其特點如下:
- 檢測方法是基於針對執行時間的 `leakage detection test`
- 利用統計方法而非靜態方法分析,使檢測結果不受特定硬體影響
接著,作者說明實作 `leakage detection test` 的流程
首先,對於兩種不同類型 (class) 的資料,測量密碼學函式的執行時間
其中一種測量方法稱為 "fix-vs-random" -- 其中一種類型的資料固定為一個常數,另一種則是隨機選取。固定資料的選取可用來觸發「特定」邊緣狀況 (corner-case)
得到測量結果後,在進行統計分析之前,作者談到可以應用的後處理 (post-processing)
- Cropping : 計時分佈經常正偏向於更大的執行時間,這可能是受到測量工具,或是其他行程的影響。可以將裁切過大的數值,或是制定一個獨立於資料類別並且固定的閾值,並移除大於這個閾值的測量資料
- Higher-Order preprocessing : 例如 centered product 可以模仿高級別的 DPA attack (這段並<s>不是很明白</s>)
:::danger
不懂就說不懂,不要假裝自己懂一點。誠實面對自己。
:::
最後,進行統計分析 -- 嘗試駁斥虛無假說 (null hypothesis) 「兩個資料的執行時間分佈是相等的」。作者提出以下幾種可行的檢驗方式:
- t-test ([Welch's t-test](https://en.wikipedia.org/wiki/Welch's_t-test)): 這項測試檢驗兩者分佈的平均數是否相同。若檢驗失敗表示存在 "first-order timing information leakage"。若是在分析之前已經進行過裁切處理,那麼就不只是針對平均數,而是也包含更高級別的 statistical moment (這邊並不是很理解)
:::info
Welch's t-test 是 Student's t-test 的改良版,定義詳見維基百科:
定義 $t=\frac{\Delta \bar{X}}{s_{\bar{X}}}$, $\bar{X_i}$: 第 i 筆資料的平均數; $s_{\bar{X_i}}$: 標準差, $N_i$ 是樣本數量
對於不相等的變異數而言,Welch's t-test 比起 Student's t-test 給出更相近於模擬值的 p-value
:::
- Non-parametric test
### 實作探討
首先來探討 [oreparaz/dudect](https://github.com/oreparaz/dudect/blob/master/src/dudect.h) 中的 `percentile` 是怎麼運作的,並且如何運用在這次作業上
在 lab0-c 的 `deduct` 目錄,有三個主要的 C 檔案,
- `constant.c` : "measure"函式中,呼叫測試函式後,以 `cpucycle` 測量執行時間
- `ttest.c` : 定義 Welch's t-test 相關數值
- `fixture.c` : 執行並統整數據,給出最終判斷
在利用了 Welch t-test 所給出的定義算出最大的 t value 後,將其與事先定義的閾值比較。
在測試足夠多筆資料的情況下:
- $t>500$ 表示肯定不是常數時間
- $t>10$ 表示可能不是常數時間
- 其餘狀況,表示可能是常數時間
觀察原作者 github 中的 `percentile`
```c
static int64_t percentile(int64_t *a_sorted, double which, size_t size) {
size_t array_position = (size_t)((double)size * (double)which);
assert(array_position < size);
return a_sorted[array_position];
}
static void prepare_percentiles(dudect_ctx_t *ctx) {
qsort(ctx->exec_times, ctx->config->number_measurements, sizeof(int64_t), (int (*)(const void *, const void *))cmp);
for (size_t i = 0; i < DUDECT_NUMBER_PERCENTILES; i++) {
ctx->percentiles[i] = percentile(
ctx->exec_times, 1 - (pow(0.5, 10 * (double)(i + 1) / DUDECT_NUMBER_PERCENTILES)),
ctx->config->number_measurements);
}
}
```
以及相關的一段註解
> set different thresholds for cropping measurements.
the exponential tendency is meant to approximately match
the measurements distribution, but there's not more science
than that.
可以推敲出,這段程式碼的主要功能是做 Cropping 後處理
參考以上程式碼改動,並做出對應調整後,這次再一次進行 `make test` 測試
可以發現由於上述程式將極端值去除,使得常數測試很快就通過了
:::warning
確認程式碼和論文描述是否存在出入?
:::
---
## 探討 `lib/lib_sort.c`
在 merge sort 中,比較次數以以下式子表示
$$
n \times \log_2{n} - K \times n + O(1)
$$
$n \times \log_2{n}$ 已經是目前理論的極限值,所以優化的方向會注重於改進 linear term ($K \times n$)
### 研讀論文 (commit [b5c56e](https://github.com/torvalds/linux/commit/b5c56e0cdd62979dd538e5363b06be5bdf735a09))
> 需要研讀論文並指出與核心程式碼的異同
- [ ] [Bottom-up Mergesort: A Detailed Analysis](https://doi.org/10.1007/BF01294131)
為什麼會說 bottom-up 比起 top-down merge sort 對 cahce 更加友善呢?
簡單而言,bottom-up 的過程就是一直合併,cache 元素參照的機會更大
top-down 涉及 parition,這麼做會導致元素不斷從 cache 進出,而導致 cache thrashing 的問題
::: info
所謂 cache thrashing 指的是,由於 cache 的容量不足應付存取需求,導致接下來一直出現 cache miss 的狀況
[案例](https://www.quora.com/What-is-cache-thrashing)
考慮一個 LRU cache,假設可存放四個元素,
接著考慮以下存取 pattern : A,B,C,D,E,A,B,C,D,E...
初始配置: A,B,C,D
要求 A: A,B,C,D
要求 B: B,A,C,D
要求 C: C,B,A,D
要求 D: D,C,B,A
要求 E: E,D,C,B (miss)
要求 A: A,E,D,C (miss)
...
接下來每個元素的存取都會導致 cache miss
:::
1. top-down merge sort
- 我實作的版本 (q_merge)
- 以遞迴將整個陣列拆成一個個元素 (稱為 run) 後,重新將這些 run 合併。
- 對於比較次數,在最差以及平均狀況下有最佳的 K
- 需要先知道輸入大小,但這麼做會導致 cache blocking (要將整個陣列讀入)
<img src="https://hackmd.io/_uploads/rJwkpo0Tp.png" alt="top-down" width="500">
> 節錄自 [M01: lab0](https://hackmd.io/@sysprog/linux2024-lab0-e#%E8%97%89%E7%94%B1%E6%B7%B7%E5%90%88%E5%BC%8F%E6%8E%92%E5%BA%8F%E6%BC%94%E7%AE%97%E6%B3%95%E4%BE%86%E6%94%B9%E9%80%B2%E6%95%88%E8%83%BD)
2. bottom-up
- 將整個陣列直接視為具有 n 個 run 的陣列,每次的 run 長度由 1 開始融合,依序乘以二,直到涵蓋至整個陣列為止
- 花費的平均比較次數最多
接著看到 commit 比較的另一種變形
- [ ] [The cost distribution of queue-mergesort, optimal mergesorts, and
power-of-two rules](https://www.sciencedirect.com/science/article/pii/S0196677498909865?via%3Dihub)
3. queue-merge sort
- breadth-first multi-pass structure
- 將每個元素視為佇列,利用 linear merge 將前兩個佇列合併後,放到串列尾端,直到串列只剩下一個元素
- 可以避免 commit 提及的 worst case -- 合併的子陣列彼此長度差距太大

> 節錄自 [The Cost Distribution of Queue-Mergesort, Optimal Mergesorts, and Power-of-2 Rules](https://www.sciencedirect.com/science/article/pii/S0196677498909865?via%3Dihub)
Linux 核心採取的策略有所不同:
- 以 depth-first order 代替 bottom-up: 當兩個一樣大小的子陣列出現時,立刻合併,盡可能 fit L1 cache (除了最後的合併)
- 如此做的成本將會反饋到最終的合併。commit 中也提到當陣列數量略為大於 2 的冪時 (e.g. 1028,使得在合併時需要與 $\frac{4}{5}$ 個陣列元素比較),將會增加成本
- 為了規避上述最壞情況,在合併時最不平衡的狀況下,所有 merge pass 至多來到 1:2 的情況
- 具體而言,程式會就上述合併策略進行,直到確定剩下 $2^n$ 的元素尚未合併
### 引入至 lab0-c
> 比較自己和 linux 核心實作 [merge sort](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 效能落差
#### 程式碼解讀
linux 核心的 list_sort 與 lab0-c q_sort 不同,沒有提供 descend 的參數輸入; 相對的,允許指定 `list_cmp_func_t` 函式指標,於是呼叫端可指定比較用的函式,較 q_sort 實作靈活
:::warning
「融合」是否適合在資料排序的場景?注意用語。
:::
另外,由於 list_sort 在排序前會將循環串列拆成單向串列,這麼一來,就不能使用 list.h 的內建函式
- [ ] 當剩下 $2^n$ 的元素就不合併 (merge) ,以達成最不平衡也有 2:1 的 merge pass
list_sort 使用 pending 儲存已排序的子陣列 (大小為 2 的冪),彼此使用原先陣列 (list 參數) 的 prev 指標聯繫。這是很精妙的設計,這麼一來就不需要再額外宣告額外指標儲存
每回合中,一旦 pending 中所剩餘的元素數量等同於這對子陣列的元素數量,每對位於 pending 的子陣列會進行合併
換句話說,count + 1 若為 $2^k$ 則不合併,反之亦然
e.g.
count = 0, count + 1 = $2^0$ --> no merge
count = 1, count + 1 = $2^1$ --> no merge
count = 2, count + 1 = $2^1+1$ --> merge!
當 count + 1 是 2 的冪時,count 的二進位應當是由連續的 1 構成的
一個精妙判斷的方式是尋找 least significant bit
```c
/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
tail = &(*tail)->prev;
```
如果剩下的 bits 等於 0 就表示 count + 1 是二的冪
```c
do {
size_t bits;
struct list_head **tail = &pending;
/* Find the least-significant clear bit in count */
for (bits = count; bits & 1; bits >>= 1)
tail = &(*tail)->prev;
/* Do the indicated merge */
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
/* Move one element from input list to pending */
list->prev = pending;
pending = list;
list = list->next;
pending->next = NULL;
count++;
} while (list);
```
(這裡有個細節,合併時要注意參數順序 `a = merge(priv, cmp, b, a);` 否則會造成不穩定排序的狀況,原版程式的變數取名為 a, b 實在是讓人困惑)
首先 pending 指向 NULL (沒有元素),在 while 迴圈中 tail 先移動至待合併的節點。
當 if 判定要合併時,選定倒數兩個子串列進行合併 (合併結果儲存於 a )
而後,`a->prev = b->prev` 的用意是 b 之前的子串列,串接到已排序好的 a 之前
不妨這樣想:
<s>

</s>
```graphviz
digraph G {
node [shape = box, style=rounded]
tail [shape=record, color=white, fontcolor=green];
node3 [label="4"];
node4 [label="2"];
node5 [label="5"];
node6 [label="3"];
{rank=same; node3 -> node4 -> node5 -> node6 [label=prev, dir=back] }
tail -> node6;
node7 [label="2 4"];
node8 [label="3 5"];
node3 -> node7; node4 -> node7;
node5 -> node8; node6 -> node8;
}
```
:::danger
使用 Graphviz 製圖,並嵌入到 HackMD 筆記中。
:::
假設 a 指向 3 這個節點,b 指向 5,當 a b 合併時,就視為一個新的子串列,所以 a->prev 也要指向 b->prev,才能指向下個尚未合併的子串列
注意合併完後,也需要更新 tail 到合併完的子串列
最後,將 list 指向的節點「移動」至 pending
這裡出現一個陌生的巨集: likely
於 linux kernel 2.6 後被定義在 `include/linux/compiler.h` 中
```c
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
```
:::warning
避免濫用「通過」,否則你如何區分 pass 的翻譯呢?
:::
<s>通過</s> 藉由這兩個巨集,可以讓編譯器編譯組合語言時做最佳化 (`likely` 表示較可能發生,例如 cache spatial locality 的考量),看來要學 Linux 核心也應當學好編譯器理論
:::warning
1. 避免過多的中英文混用,macro 的翻譯「巨集」在台灣的出版刊物存在數十年。
2. 翻閱 GCC 手冊和計算機組織/結構的教科書,理解 branch prediction 的議題,如此才能解釋上方的 GCC extension
:::
- TODO: Profile-guided optimization 檢閱
- gcc 內建函式:
最後用了看似有些複雜的手段,進行最後合併
原因就是要在最壞情況下保持 2:1 的大小比例
```c
list = pending;
pending = pending->prev;
for (;;) {
struct list_head *next = pending->prev;
if (!next)
break;
list = merge(priv, cmp, pending, list);
pending = next;
}
/* The final merge, rebuilding prev links */
merge_final(priv, cmp, head, pending, list);
```
`merge_final` 如果只是合併加恢復 prev 連結,為什麼不直接用 merge 加上恢復連結功能的程式就好?
- [ ] Depth-first order
那作者宣稱的 depth-first order 又和上述提及的 mergesort 變形有何異同?為何說是 cache friendly?
:::danger
- [ ] traverse (動詞) 和 traversal (名詞)
根據 Dictionary.com 的[解釋](https://www.dictionary.com/browse/traverse
): (作為及物動詞和不及物動詞都有類似的意思,以下列出作為及物動詞的寓意)
* to pass or move over, along, or through.
* to go to and fro over or along.
其實這意思很好懂,就像我們「走過」/「穿越」校園一般,於是 traverse a linked list 就會是「(用某種手段) 存取多個鏈結串列的節點」,但這裡卻沒有必要「所有」的範圍:英語的 "move over/through" 用於某個區域時,根本沒有這樣的隱含意義。如果將 traverse 翻譯為「遍歷」,就會導致「超譯」,也就是跳脫「直譯」和「意譯」。
當我們回頭看 "traverse" 所在的技術描述內容,例如 "traverse every node",若翻譯為「遍歷每個節點」,那麼既然都「遍」(意即「全面」、「到處」),又何來「每個」節點呢?於是,合理的翻譯應改為「逐一走訪每個節點」 —— 差異在哪?在 "traverse every node" 的應用場景中,可能是我們嘗試在鏈結串列尋找特定的節點內含的資料,一旦找到就停止,或者我們要偵測給定的鏈結串列是否包含環狀 (circular) 結構 ,並沒有真的要「遍」(到處/全面)「歷」(意即「經過」) 每個節點。在我們的用語中,要區分「意圖」(intention) 和「實際作用」(reaction),濫用「遍歷」會使得語意不清,從而難以推測英語原文的訴求。
還有個更重要的原因是,「遍歷」這詞已在理工領域存在,且廣泛使用,即「遍歷理論」(Ergodic theory),後者是研究具有不變測度 (invariant measure) 的動力系統及其相關問題的一個數學分支。 遍歷理論研究遍歷變換,由試圖證明統計物理中的遍歷假設 (Ergodic hypothesis) 演進而來。
在統計學中,若單一個物理系統在不同時間內重複相同的實驗 (如丟擲一枚硬幣),其取樣數據所得的統計結果 (如硬幣出現正面的機率) 和極多個完全相同的物理系統的系集 (如丟極多個相同的硬幣) 在同時作相同的實驗取樣數據的統計結果假設為相同時,此種假設即稱為「遍歷性假設」或「遍歷假設」。基於這個假設,對時間平均的統計方式及結果,便可由對系集的平均及統計方式得到。在一般物理系統中,尤其在統計力學範圖中,均採用此遍歷性假設為基本的假設。在流體力學中對亂流的實驗分析,亦是採用此假設。
遍歷 (Ergodic) 源於以下二個希臘詞:
* ergon (對應英語的 work)
* odos (對應英語的 path 或 way)
最初這是由奧地利物理學家波茲曼 ([Ludwig Boltzmann](https://en.wikipedia.org/wiki/Ludwig_Boltzmann)) 於統計力學領域 (statistical mechanics) 提出的概念,其一廣為人知的結果是:在經過長時間後,時間平均將會趨近空間平均。此事實在動力系統扮演極重要的角色,在隨機分析領域亦然。
因此,若貿然將 traverse 翻譯為「遍歷」,一來語意不清,二來增加和理工多項領域專業人員的溝通成本,實在得不償失。
$\to$ [資訊科技詞彙翻譯](https://hackmd.io/@sysprog/it-vocabulary)
:::
經過上方的推導可以明白,在<s>遍歷</s> 逐一走訪每個節點的過程中,會不斷將節點加入 pending,直到達成一定條件才會開始合併 (第一個在 do while 迴圈合併的條件是當 count = 2)
這時 pending 已經有了三個節點 (想像成走到一定的「深度」) 才開始合併,而不是像 bottom-up 那樣,每 K 個節點就進行合併
#### 於 qtest.c 加入命令
以 ADD_COMMAND 加入實作的 list_sort
```c
ADD_COMMAND(listsort, "Sort queue with Linux kernel-style merge sort", "");
```
藉由`priv` 參數,回傳排序過程中的比較次數
- 註: 若要還原檔案至前一個 commit 的版本,可輸入以下命令:
- `git checkout <commit-hash> -- /file/to/restore`
### 嘗試改進針對鏈結串列排序實作的效能
> 詳參: [專題](https://hackmd.io/@sysprog/Hy5hmaKBh#TODO-%E5%B0%87%E6%94%B9%E9%80%B2%E7%9A%84-Timsort-%E5%BC%95%E5%85%A5%E5%88%B0-Linux-%E6%A0%B8%E5%BF%83)
注意到 `merge_final` 有段註解:
> If the merge is highly unbalanced (e.g. the input is already sorted), this loop may run many iterations.
而專題提及的 Timsort,善於處理已經排序過的狀況,並以插入排序處理小串列
list_sort 原先將一個個節點放入 pending 串列,根據數量決定是否合併;
在 Timsort 中,則不斷尋找 run 並合併
這裡的 run 就是已經排序好 (或是反向排序好) 的子串列
原作中用到一個專門的結構體儲存 `find_run` 的回傳值:
```c
struct pair {
struct list_head *head;
struct list_head *next;
};
```
其中 head 紀錄這個 run 的開頭,next 紀錄下個 run 的起始點,
雖說只為一個函式宣告專用的結構體有些不泛用,但這也可以讓主程式較為方便的逐一拜訪各個 run
一個我想到有別於專題的 Timsort 改進是 minrun 的實現
因為當 minrun 略大於 2 的冪時,效率會特別低,因此 Timsort 作者提出一種選取 minrun 的簡潔方式
```c
static size_t find_minrun (struct list_head *head)
{
size_t len = list_length(head);
// find first 6 bit & add up remain bits
size_t minrun = 0;
for(;len >> 6; len >>=1)
minrun += (len & 1);
minrun += len;
return minrun;
}
```
若當前 run 數量不超過 minrun 時,以二元插入法將節點插入已經排序好的 run 中
( 改編自 [Timsort](https://github.com/visitorckw/linux23q1-timsort/blob/main/timsort.c#L94-L123) )
```c
static struct pair find_run(void *priv,
struct list_head *list,
list_cmp_func_t cmp,
size_t minrun)
{
// ...
/* binary insert if len < minrun */
while (DO_MINRUN && next && len < minrun) {
size_t idx = binary_insert(priv, head, next, cmp, len);
struct list_head *inserted = next;
next = inserted->next;
if(!idx) {
inserted->next = head;
head = inserted;
} else {
struct list_head *itr = head;
while(idx-- > 1)
itr = itr->next;
inserted->next = itr->next;
itr->next = inserted;
}
len++;
}
// ...
}
```
初步以隨機資料 1000 筆測試,發現相對原版的 Timsort,加入 minrun 機制確實使比較次數下降約 700 次
不過,加入插入的動作會不會影響穩定度呢?
### 效能分析實驗
> 應對不同的資料分佈進行實驗,具體參考 [listsort 說明文件](https://github.com/python/cpython/blob/main/Objects/listsort.txt)
#### 測試資料
:::danger
分析效能之前,先要準備足以涵蓋多種情境的輸入資料。
:::
參閱 listsort 說明文件,作者提出幾種資料分佈
| 代號 | 說明 |
| ------- | ---------------------------------- |
| `*sort` | 隨機資料 |
| `\sort` | 降序資料 |
| `/sort` | 升序資料 |
| `3sort` | 升序,隨機挑選其中三組交換 |
| `+sort` | 升序,放置十個隨機元素於尾端 |
| `%sort` | 升序,隨機以隨機數值替換 1% 的元素 |
| `~sort` | 包含許多重複數值的元素 |
| `=sort` | 一樣數值的元素 |
| `!sort` | 最糟狀況 |
參閱 [sortperf.py](https://github.com/python/cpython/blob/main/Tools/scripts/sortperf.py#L99-L108),可得知設計細節,例如「最糟狀況」是針對使用 meidan-of-three 的快速排序設計
令 $lg(x)$ 為以 2 為底的對數函數,則 $lg(n!)$ 是資訊理論中 comparison-based 排序法比較數量的極限值 (n 是測試資料的數量)
TODO: 設計專門的測試程式,支援上述資料分佈
原先希望整合於 qtest ,但考慮 qtest 是用於測試各項佇列操作,改為撰寫額外測試檔案
排序程式用的節點數值以整數為主,但 lab0-c 節點的數值是字串,這裡考慮固定長度的字串,但構成的字元是隨機 ASCII code
#### 評估效能
> [測量程式](https://github.com/csotaku0926/lab0-c/blob/master/miscs/measure_sort.c)
這部份,我將藉由評估效能的指標, 比較原先的 top-down, Linux-kernel list sort,專題提及的 Timsort, 以及 minrun Timsort ,針對不同資料分佈之效能差異
- [ ] 執行時間
- [ ] 比較次數
> 參閱 [共筆](https://hackmd.io/byNYeoGvQv-a9TNDc2CKFQ?view#comparison-%E6%AC%A1%E6%95%B8),了解論文測量 K 的機制
- [ ] cache miss ratio
## 實作 shuffle 命令
> [項目說明](https://hackmd.io/@sysprog/linux2024-lab0/%2F%40sysprog%2Flinux2024-lab0-d)
### 實務分析
[Fisher-Yates shuffle](https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle) 為一種將有限序列洗牌的迭代算法
這裡我遇到了一個 bug ,以下的交換動作會導致錯誤
其中 node, safe 是 `list_for_each_safe` 的迭代器,swap_node 是被選中要交換的節點
```c
list_move(node, swap_node);
list_move(swap_node, safe);
```
事實上,由於交換節點是隨機指定,因此有可能遇到兩個交換節點是同一個的狀況
這時候進行交換就會導致錯誤
此外,需要考慮 `b->prev == a` 特殊狀況
要好好思考`list_move` 的 `head` 參數,確保正確交換
最後的 `swap()` 定義如下
> [完整程式碼](https://github.com/csotaku0926/lab0-c/commit/d7b08268ce703e1968dde77f236076a446ded95d)
```c
static inline void swap(struct list_head *a, struct list_head *b)
{
if (a == b)
return;
// b --> a
struct list_head *a_prev = a->prev, *b_prev = b->prev;
if (b != a_prev)
list_move(b, a_prev);
if (a != b_prev)
list_move(a, b_prev);
}
```
實作完成後,以 `ADD_COMMAND` 巨集加入 qtest.c
```c
ADD_COMMAND(shuffle, "Shuffle the queue using Fisher-Yates Shuffle", "");
```
### 統計分析
> [測試程式](https://github.com/csotaku0926/lab0-c/blob/master/measure_shuffle.py)
Python 提供 [subprocess.run](https://docs.python.org/3/library/subprocess.html#subprocess.run) 可以以指定的參數執行 `qtest` 執行檔,並且可以透過 `CompletedProcess.stdout` 取得執行結果
這麼一來,可以很方便的進行後續的統計分析
我將含有四個元素的串列洗牌 1,000,000 次,自由度為 $4!-1=23$
[查表](https://www.oreilly.com/api/v2/epubs/9780470074718/files/images/T0A03.jpg) 得知在顯著水準 (significance level) 定義為 0.05 的狀況下,若要不拒絕虛無假說,對應的卡方分佈最大值為 35.172
以下為統計結果
```
Test time: 1000000
Expectation: 41666
Chi-Squared Sum: 20.82378918062689
Probabily uniform
```
這裡有個疑惑點,通常使用 `rand` 時應該要加入 `srand` 函式確保每次的隨機分佈都不一樣
然而當我在洗牌函式加入 `srand(time(NULL))` 後,整體分佈呈現不平均的狀況 (或者說,卡方分佈值遠大於預期最大值)
```shell
Test time: 1000000
Expectation: 41666
Chi-Squared Sum: 1061730.697691163
Not Uniform
```
---
## 整合網頁伺服器
> [項目說明](https://hackmd.io/@sysprog/linux2024-lab0/%2F%40sysprog%2Flinux2024-lab0-c)
### favicon.ico 處理
以瀏覽器寄送要求,例如 `http://127.0.0.1:9999/new`,除了預期的 new 命令,還會額外要求 favicon.ico
一種方式為加入額外的 html link tag,如:
```html
<link rel="shortcut icon" href="#">
```
這麼一來就不能用瀏覽器,而要自己撰寫程式寄送要求了
### 以 select 處理多個 FD
> [select](https://man7.org/linux/man-pages/man2/select.2.html), [poll](https://man7.org/linux/man-pages/man2/poll.2.html)
根據 [linenoise Github](https://github.com/antirez/linenoise) 的說明,`linenoise` 是擁有處理命令列 (類似 readline) 功能的套件,包括 tab-complete, line hints 等
而根據項目說明,一旦網頁伺服器啟動,由於 linenoise 是以 read 等待使用者輸入。當 read 阻塞時無法接收來自網頁命令
實際前往 `console.c`,看到當網頁伺服器啟動時,linenoise 功能被關閉了
```c
web_fd = web_open(port);
if (web_fd > 0) {
printf("listen on port %d, fd is %d\n", port, web_fd);
use_linenoise = false;
```
若 linenoise 正確啟動,許多命令集的功能 (e.g. tab complete) 可以正常使用
在以上的狀況,當輸入 web 命令後,可以觀察到這些功能都無法使用了
若是故意將 `use_linenoise = false;` 去除,便會發現由於 read 阻塞了輸入流,導致客戶端寄送的命令無法被接收
select 以 `fd_set` 定義檔案描述子 <s>文件描述符</s> 集合,我們的目標是利用其同時處理來自 stdin 以及 socket 的輸入
第一步是要找到對應的描述符,`line_edit()` 中只有 stdin 的描述符,還需要引入 socket 的 listenfd
web.c 中的 `web_open` 會回傳 listenfd,而 console.c 中呼叫 `web_open` 的函式是 `do_web`,listenfd 被儲存於全域變數 `web_fd` 中
觀察 console.c 的 `cmd_select` ,其實已經對於這些描述符做 `select` 了
再來,則是以 `FD_SET` 設定檔案描述子 <s>表示符</s> ???,以 `cmd_select` 為例:
```c
FD_ZERO(readfds);
FD_SET(infd, readfds);
// ...
FD_SET(web_fd, readfds);
```
可以看到設定了來自 STDIN 以及 socket 的讀取表示符
參考 [var-x 共筆](https://hackmd.io/@vax-r/linux2024-homework1#%E6%95%B4%E5%90%88%E7%B6%B2%E9%A0%81%E4%BC%BA%E6%9C%8D%E5%99%A8),得知 linenoise 啟動的機制是以下程式碼:
```c
char *cmdline = linenoise(prompt);
```
TODO:
`cmd_select` 中原先 `readline()` 可以很順暢的接收兩端輸入
將其替換為上述程式碼
則會發現 linenoise 套件可以間隔使用,也就是說 STDIN 和 socket 輸入會輪流讀取,待輪到 STDIN 的時候才能恢復 linenoise (仍然有阻塞的問題)
是否有辦法解決阻塞的問題,又能使用 linenoise?
雖然提示指出可以用 `select` 系統呼叫處理,但考慮到 man page 點出:
> select() can monitor only file descriptors numbers that
are less than FD_SETSIZE (1024)—an unreasonably low limit for
many modern applications—and this limitation will not change.
All modern applications should instead use poll(2) or epoll(7),
which do not suffer this limitation.
因此未來考慮使用功能相似,但能容納更多<s>文件描述符</s> 的 `poll` 改善
:::danger
file descriptor 是「檔案描述子」,而非「文件」(document)
:::
## 整合 tic-tac-toe
> [<s>項目</s> 專案說明](https://hackmd.io/@sysprog/linux2024-ttt)
:::danger
project 的翻譯是「專案」,而非「項目」(item)
:::
### 整合標頭檔
於 [list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h) 找尋對應的 hlist 相關函式,包括初始化,新增刪除節點,搜尋等
注意到標頭檔中對於 `hlist_unhashed` 函式,還多了另一項 "lockless" 版本的實作:
```c
static inline int hlist_unhashed_lockless(const struct hlist_node *h)
{
return !READ_ONCE(h->pprev);
}
```
原始標頭檔出現許多 const qualifier 應用,根據 C99 規格書 (6.7.3 Type Qualifier) 的描述:
> If an attempt is made to modify an object defined with a const-qualified type through use
of an lvalue with non-const-qualified type, the behavior is undefined.
如 `hlist_unhashed` 函式:
```c
static inline int hlist_unhashed(const struct hlist_node *h)
{
return !h->pprev;
}
```
這時改變 h 指向的 `struct hlist_node` 是合法的,但不能藉由 `*h` 本身修改數值,會造成 Undefined behavior
### 新增 `ttt` 命令
專案提供了 negamax, MCTS 等算法。先嘗試將 negamax 引入 lab0-c
首先看到有點複雜的 #define 巨集
```c
#include "game.h"
#ifdef USE_RL
#include "agents/reinforcement_learning.h"
#elif defined(USE_MCTS)
#include "agents/mcts.h"
#else
#include "agents/negamax.h"
#endif
```
"USE_RL", "USE_MCTS" 是什麼?我並未在 ttt 目錄標頭檔找到它們的定義,程式怎麼知道我現在要用哪種算法,並依此決定要 include 哪些標頭檔?
隨後我於 Makefile 觀察到以下命令:
```Makefile
RL_CFLAGS := $(CFLAGS) -D USE_RL
MCTS_CFLAGS := $(CFLAGS) -D USE_MCTS
```
翻閱 [gcc 手冊](https://gcc.gnu.org/onlinedocs/gcc/Preprocessor-Options.html#Preprocessor-Options) 找到 -D flag 對應說明 (3.13 Options Controlling the Preprocessor) :
> -D name
Predefine name as a macro, with definition 1.
原來透過以上 makefile 指令,就能提前定義巨集並設為 1
我的作法是,將原先的 ttt 目錄整個複製到 lab0-c 目錄後,保留需要的檔案:
- `agents/mcts.[ch]`: 提供 MCTS 算法需要的函式
- MCTS 是一種啟發式搜尋 (heuristic search),藉由增長搜尋樹找到最佳動作
- 利用 UCB formula 處理探索利用困境 (exploration/exploitation problem)
- 經歷四個階段尋找當前最好的動作
- `agents/negamax`, `agents/reinforecment_learning`: 提供另外兩種算法需要的函式
並根據以上改動,調整 Makefile
- [ ] 強化學習 (reinforcement learning)
> 我對於強化學習算法的過程很有興趣,因此紀錄算法原理
> [wiki](https://en.wikipedia.org/wiki/Reinforcement_learning)
RL 是一種不同於典型機器學習的算法,不用標籤,而是透過獎勵促使 agent 於互動環境學習
<img src="https://hackmd.io/_uploads/HJnfWRsyR.png" width="500">
RL 使用馬可夫決策 (Markov decision process) 作為決策模型,以下為其元素:
- 環境 -- agent 互動的地方,以及狀態 (state) 集合 $S$
- 獎賞: 環境中的回饋
- agent 動作的集合 $A$ (如: 前進方向)
- $P_a(s,s')=P(S_{t+1}=s'|S_t=s,A_t=a)$ : probability of transition from state $s$ to state $s'$ under action $a$
- $R_a(s,s')$: 經由行動 a 完成狀態 s 到 s' 轉移後獲得獎賞
顯然,強化學習的目標是最大化獎賞,但如何得知模型參數呢?
在強化學習中,會假設代理 (agent) 可以直接觀察環境狀態 (若真,此問題稱為具有 full observability)
### 定點數實作
> [說明](https://hackmd.io/@sysprog/linux2024-ttt#%E5%AE%9A%E9%BB%9E%E6%95%B8)
原先的 ttt 專案中,只要是計算分數的部份都會用到浮點數。在 Linux 核心中,使用浮點數不僅造成執行時間增加,還需要對 FPU 暫存器進行額外處理
- [ ] 定義
什麼是定點數 (Fixed-point arithmetic)? 和浮點數有何不同? 為什麼比較不會造成負擔?
簡單而言,定點數用以表示整數以及小數時用的 bit 數量是固定的,不論數值大小
而浮點數儲存的 bit 是用以表示數值 (significand or mantissa) 以及指數 (exponent)
所以,定點數本質其實與整數相同,只是一部分的 bit 用來表示小數,也就不需要額外的資源 (FPU) 支援了
考慮一個 8-bit word $01010110_2$ ,以十進位整數而言是 86 ,但考慮小數點安置在第四到第五位之間,就變成 $0101.0110$ ,也就是 5.375
在定點數中,為了明確表示各有幾個 bit 表示整數與小數,定義 Q-format
假設用 m 個 bit 表示整數,n 個 bit 表示小數,則我們說這個數的 Q-format 是 Qm.n 或 Qn (只紀錄小數)
- [ ] 實作
> [完整程式碼](https://github.com/csotaku0926/lab0-c/blob/master/ttt/agents/fixed_point.c)
參照 [Linux 核心的浮點數運算](https://hackmd.io/@sysprog/linux2024-ttt#-Linux-%E6%A0%B8%E5%BF%83%E7%9A%84%E6%B5%AE%E9%BB%9E%E6%95%B8%E9%81%8B%E7%AE%97),我以 `unsigned long` 代表定點數,並且以 `LOAD_INT`, `LOAD_FRAC` (取小數前兩位) 進行整數小數的擷取
運用 typedef 可以更清楚表達定點數的型態
```c
#define FSHIFT 4 /* nr of bits of precision */
#define FIXED_1 (1 << FSHIFT) /* 1.0 as fixed point */
#define LOAD_INT(x) ((x) >> FSHIFT)
/* keep 2 digits of fractions */
#define LOAD_FRAC(x) LOAD_INT(((x) & (FIXED_1 - 1)) * 100)
typedef long fixed_point;
```
為了引入定點數,我們需要對以下浮點數運算進行替換:
- 四則運算
- log
- sqrt
> [commit fc4a2c6](https://github.com/csotaku0926/lab0-c/commit/fc4a2c6cc62a2a1cf50c9543a146134cef6d90ee)
加減乘法十分易懂,但除法稍微複雜些
由於定點數是整數型態,若除數與除數直接相除,小數位會被除去
因此我引進 scale_factor ,讓被除數先左移固定位數後再相除,這樣才能保留部份小數
```c
fixed_point_t div (fixed_point_t a, fixed_point_t b)
{
fixed_point_t result = (a << SCALE_FACTOR) / b;
result = LOAD_FIXED(result);
result >>= SCALE_FACTOR;
return result;
}
```
至於 log, sqrt,要如何轉換呢?我的初步想法是大一微積分學到的泰勒展開
由於 $f(x)=\sqrt{x}$ 的泰勒展開在 x = 0 時不存在,考慮 $f(x)=\sqrt{1+x}$ 的泰勒展開 $T(x)$
$$
T(x)=\sum_{k=0}^{\infty}\frac{f^{(k)}(a)}{k!}(x-a)^k
$$
但是算式本身有些複雜,或許存在更快的算法,如 [var-x 共筆](https://hackmd.io/@vax-r/linux2024-homework1#Monte-Carlo-Search-Tree) 提到的逼近法那樣
此逼近法特點在於不像牛頓逼近法,不需要用到除法,只做加法、位元運算,因此較有效率
```c
fixed_point_t sqrt_f(fixed_point_t num)
{
fixed_point_t res = 0L;
/* i is intialized as the index of first bit 1 in num */
/* t starts from num, divided by two each time */
for(int i = 31 - __builtin_clz(num | 1) ; i >= 0; i--) {
fixed_point_t t = 1UL << i;
if (multi_f(res + t, res + t) <= num )
res += t;
}
return res;
}
```
至於 log ,雖然也可以使用泰勒展開,但一個比較簡潔的方式是使用二分對數法
$$log_e(x)=\frac{log_2(x)}{log_2(e)}$$
分母的部份是常數,log2 也能用簡單的邏輯計算
實作參考論文 [A Fast Binary Logarithm Algorithm (C. S. Turner)](http://www.claysturner.com/dsp/binarylogarithm.pdf)
- [ ] TODO: 以 Python 或 gnuplot 計換 fixed-point 函式與一般浮點數之誤差
首先需要替換的函式為 `uct_score`,其中 UCB formula 的部份涉及 `log`, `sqrt` 這種本身是浮點數的操作
$$
\frac{w_i}{n_i}+C\sqrt{\frac{lg(t)}{n_i}}
$$
其中 $w_i$ 為選擇這個節點後,經過第 i 個動作後獲勝的次數,$n_i$ 是經過第 i 個動作後模擬的次數,C 是探索參數 (設為 $\sqrt{2}$ ),$t$ 是模擬的總數量
完整的修改程式碼可參 commit
> [commit 2ed890d](https://github.com/csotaku0926/lab0-c/commit/2ed890d598a29421fae4de881fa4030d6bc00131)
### 電腦 v.s. 電腦
#### 引入新選項
> [commit a0d07ab](https://github.com/csotaku0926/lab0-c/commit/a0d07ab90b3a124940800de213d9424467ac32c1)
加入選項 "ai_vs_ai" ,達成電腦與電腦間對決
#### 引入 coroutine
> [項目說明](https://hackmd.io/@sysprog/concurrent-sched)
做這個部份時有些疑惑,原先以 while loop 輪流啟動兩個 AI 對手進行對奕是比較直覺的方式,引入 coroutine 這種多工機制改寫,好處在哪裡呢?
- [ ] 定義
什麼是 coroutine?根據 [wikipedia](https://en.wikipedia.org/wiki/Coroutine) 的定義
> Coroutines are computer program components that **allow execution to be suspended and resumed**, generalizing subroutines for cooperative multitasking.
Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
簡略而言,就是執行階段可以被暫停,隨後再回復到先前執行階段的電腦程式
想像僅有單一 CPU 的電腦,需要執行多項任務。於是便有先保存當前任務的 context ,切換到另一項任務的機制
而 coroutine 又可分為 stackful,stackless 兩種模式 ( 這裡的 stack 應是指函式,subroutine 的 [call stack](https://en.wikipedia.org/wiki/Call_stack),讓當前函式知道呼叫者是哪個函式 )。stackful 需要配置額外空間儲存堆疊,但允許程式於任何位置暫停 coroutine,[範例](https://blog.varunramesh.net/posts/stackless-vs-stackful-coroutines/) 中提到 Lua, Wren 兩種語言作為案例 ; 而 Javascript 的 `async/await` 則是經典的 stackless coroutine 案例
Coroutine 與 Thread 十分相近,不同的點是 coroutine 允許任務按照一種可變化的順序執行,換句話說 coroutine 提供 concurrency,但不具備平行化的特性
- [ ] 實作
> 參考: [coro](https://github.com/sysprog21/concurrent-programs/blob/master/coro/coro.c)
**1. setjmp/longjmp**
說明文件以 [setjmp](https://man7.org/linux/man-pages/man3/longjmp.3.html), longjmp 達成協同式多工
setjmp 為儲存進入點資訊於 jmp_buf 架構體,longjmp 則會暫停當下執行階段,並跳到儲存的進入點,範例可參考 [Stackoverflow](https://stackoverflow.com/questions/14685406/practical-usage-of-setjmp-and-longjmp-in-c)。在linux 核心實務中,可作為 context switch,也就是任務交換的手段
**2. context switch**
> [commit](https://github.com/csotaku0926/lab0-c/commit/ee0893dac5f5098f3cdbaf58a480eaa4d6e460b7)
針對需要排程的任務,定義 task 架構:
```c
struct task {
jmp_buf env;
struct list_head list;
int n; /* how many time yield control */
int i; /* iterator */
char task_name[16];
};
struct arg {
int n;
int i;
char *task_name;
};
```
其中 `task->env` 用以存放 setjmp 的資訊,`task->list` 用來表示每個任務與 tasklist 的連結。tasklist 是個用來存放即將執行之任務的雙向佇列。
n, i 是執行任務用到的參數,因任務性質不同
接著透過以下兩個函式進行任務佇列的管理:
```c
/* FIFO order */
static void task_add(struct task *task)
{
list_add_tail(&task->list, &tasklist);
}
/* switch to the first task */
static void task_switch()
{
if (!list_empty(&tasklist)) {
struct task *t = list_first_entry(&tasklist, struct task, list);
list_del(&t->list);
curr_task = t;
longjmp(t->env, 1);
}
}
```
任務選取採取 FIFO ,也就是新增任務 (`task_add`) 會放到佇列尾端,存取任務從開頭開始
`task_switch` 函式中,會從 tasklist 當中選取下個任務
這裡使用全域變數 `curr_task` 標示當前任務指標,並透過 `longjmp` 切換到對應的進入點 。這裡要再參考每個任務的呼叫函式:
```c
void task0(void *arg)
{
/* initialize task with arg */
// ...
if (setjmp(task->env) == 0) {
task_add(task);
longjmp(sched, 1);
}
task = curr_task;
for (; task->i < task->n; task->i++) {
if (setjmp(task->env) == 0) {
// task
// ...
task_add(task);
task_switch();
}
task = curr_task;
printf("%s: resume\n", task->task_name);
}
printf("%s: complete\n", task->task_name);
longjmp(sched, 1);
}
```
但每個任務函式會呼叫兩次 `setjmp`: 第一次是在初始化完需要參數之後,在新的任務加入排程後,呼叫 `longjmp(sched, 1)` 讓新任務可以加入排程
```c
void schedule(void)
{
static int i = 0; // what for ?
setjmp(sched);
while (ntasks-- > 0) {
struct arg arg_i = args[i];
tasks[i++](&arg_i);
printf("Never reached\n"); // ?
}
task_switch();
}
```
- 註: `schedule()` 在 ntask = 0 時,需呼叫一次 `task_switch()`,確保當前任務執行完畢後,可以繼續執行尚未執行完的任務
第一次呼叫 `setjmp` 的狀況應該會發生在 `schedule` 的迴圈第一次迭代中,而第二次則會發生在 for 迴圈,也就是開始執行任務時 (以此例而言,費氏數列),假設有兩個 task (i, i+1),所以流程是 (由上而下) :
```
schedule() tasks[i] task_switch()
------------------------------------------------------------------------
setjmp(sched)
tasks[i](args[i]) -->
setjmp(tasks[i]->env)
<-- longjmp(sched, 1)
tasks[i+1](args[i+1]) -->
setjmp(tasks[i+1]->env)
<-- longjmp(sched, 1)
tasks[i](args[i]) -->
task_switch() -->
longjmp(t->env, 1)
```
確實是遇到類似 goto 帶來的困擾,那就是執行流程變得很亂
觀察執行結果,也能驗證我的流程猜想:
```
Task 0: n = 70
Task 1: n = 70
Task 2: n = 70
Task 0 fib(0) = 0
Task 1 fib(1) = 1
Task 2 .(0)
Task 0: resume
Task 0 fib(2) = 1
Task 1: resume
Task 1 fib(3) = 2
```
另外,位於每個任務函式的這行 `task = curr_task;`,看似多餘,但少了這行會導致 task 這個變數無法切換到當前任務
接著嘗試將這些流程套用到 ttt 中
在實作過程中,偶然發現原有實作會導致 Segmentation fault
觀察 gdb 後,發現執行數次 ttt 指令後,下列程式碼中的 `tasks` 會有越界讀取的問題
```c
void schedule(void)
{
static int i = 0; // <--
setjmp(sched);
while (ntasks-- > 0) {
struct arg arg_i = args[i];
tasks[i++](&arg_i); // invalid read
}
task_switch();
}
```
觀察後發現,由於 i 是靜態變數,其生命週期作用在整個程式範圍內,因此其數值會被保留,並不會發生預期的初始化為 0 的結果