# 2025q1 Homework1 (lab0)
contributed by < ibat10clw >
{%hackmd NrmQUGbRQWemgwPfhzXj6g %}
## 開發環境
```bash
$ uname -a
Linux henry-ThinkBook-14-G7-IML 6.11.0-17-generic #17~24.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jan 20 22:48:29 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
Copyright (C) 2023 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 46 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 22
On-line CPU(s) list: 0-21
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) Ultra 7 155H
CPU family: 6
Model: 170
Thread(s) per core: 2
Core(s) per socket: 16
Socket(s): 1
Stepping: 4
CPU(s) scaling MHz: 23%
CPU max MHz: 4800.0000
CPU min MHz: 400.0000
BogoMIPS: 5990.40
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge m
ca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 s
s ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc
art arch_perfmon pebs bts rep_good nopl xtopology nons
top_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq
dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma c
x16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt t
sc_deadline_timer aes xsave avx f16c rdrand lahf_lm ab
m 3dnowprefetch cpuid_fault epb intel_ppin ssbd ibrs i
bpb stibp ibrs_enhanced tpr_shadow flexpriority ept vp
id ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms
invpcid rdseed adx smap clflushopt clwb intel_pt sha_
ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect us
er_shstk avx_vnni dtherm ida arat pln pts hwp hwp_noti
fy hwp_act_window hwp_epp hwp_pkg_req hfi vnmi umip pk
u ospke waitpkg gfni vaes vpclmulqdq rdpid bus_lock_de
tect movdiri movdir64b fsrm md_clear serialize arch_lb
r ibt flush_l1d arch_capabilities
Virtualization features:
Virtualization: VT-x
Caches (sum of all):
L1d: 544 KiB (14 instances)
L1i: 896 KiB (14 instances)
L2: 18 MiB (9 instances)
L3: 24 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0-21
Vulnerabilities:
Gather data sampling: Not affected
Itlb multihit: Not affected
L1tf: Not affected
Mds: Not affected
Meltdown: Not affected
Mmio stale data: Not affected
Reg file data sampling: Not affected
Retbleed: Not affected
Spec rstack overflow: Not affected
Spec store bypass: Mitigation; Speculative Store Bypass disabled via prct
l
Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointe
r sanitization
Spectre v2: Mitigation; Enhanced / Automatic IBRS; IBPB conditiona
l; RSB filling; PBRSB-eIBRS Not affected; BHI BHI_DIS_
S
Srbds: Not affected
Tsx async abort: Not affected
```
## 實作 queue.[ch]
在本次作業需以環狀雙向鏈結串列實作佇列,其中環狀雙向鍊結串列與 Linux 核心中採取的設計一致,將存放指標的結構體 `struct list_head` 嵌入實際存放資料的結構體 `element_t` 中。
### `q_new`: 建立新的「空」佇列;
此函式之功能為建立一個空佇列,須注意空佇列代表佇列本身是存在的,但是佇列中沒有任何資料。
在給定的函式定義中,將 `struct list_head *` 作為回傳值,可得知此處需要透過 `malloc` 來分配 `struct list_head` 所需之記憶體空間,才得以在函式結束時仍保留記憶體資訊。
```c
struct list_head *q_new()
{
struct list_head *q = (struct list_head *) malloc(sizeof(struct list_head));
if (!q)
return NULL;
INIT_LIST_HEAD(q);
return q;
}
```
`INIT_LIST_HEAD` 函式的功能為將傳入的參數 `list_head` 其兩個指標都指向自身。對照 `list_empty` 函式可得知當 `head->next == next` 時,代表此串列無任何節點,亦可代表佇列為空的狀態。
透過 `qtest` 預期可以看到下列結果
```
$ ./qtest
cmd> show
l = NULL
cmd> new
l = []
cmd> show
Current queue ID: 0
l = []
```
### `q_insert_head`: 在佇列開頭 (head) 插入 (insert) 給定的新節點 (以 LIFO 準則);
完成佇列之建立後,接下來實作與加入資料相關的函式。
在給定的函式定義中,此函式傳入兩個參數,分別代表要操作的佇列以及欲加入佇列之字串資料。
```c
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *node = (element_t *) malloc(sizeof(element_t));
if (!node)
return false;
node->value = strdup(s);
if (!node->value) {
free(node);
return false;
}
list_add(&node->list, head);
return true;
}
```
在這個地方可以透過 `strdup` 一個函式就完成 buffer 的分配以及字串資料的複製,此外考慮傳入字串可能很長,若沒有先確定串列節點有分配成功,就進行字串相關處理,則可能產生浪費,因此在此函式中會在串列節點分配完成後才進行字串相關的操作。
透過 `qtest` 確認函式之正確性,預期可看到以下結果
```
$ ./qtest
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> ih kernel
l = [kernel linux]
```
### `q_insert_tail`: 在佇列尾端 (tail) 插入 (insert) 給定的新節點 (以 FIFO 準則);
此函式與 `q_insert_head` 的邏輯相似,僅需將最後使用的 `list_add` 調整成 `list_add_tail` 即可。
```c
bool q_insert_tail(struct list_head *head, char *s)
{
... // Same as q_insert_head
list_add_tail(&node->list, head);
return true;
}
```
透過 `qtest` 確認函式之正確性,預期可看到以下結果
```
$ ./qtest
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> it kernel
l = [linux kernel]
```
### `q_size`: 計算目前佇列的節點總量;
此函式功能為計算佇列之長度。
由於 `struct list_head` 中並未有紀錄長度之變數,因此計算長度需實際走訪串列,此處可以使用 `list_for_each` 的巨集來走訪串列,查看 `list.h` 中可以得知此巨集傳入一參數 `node` 作為 iterator 使用,並且將其初始化成 `head->next`,而終止條件為 `node != head`,所以進到迴圈的次數會跟實際節點存在的數量等,在每一個回合將 `sz` 遞增即可計算出結果。
```c
int q_size(struct list_head *head)
{
if (!head)
return -1;
int sz = 0;
struct list_head *it;
list_for_each(it, head)
sz++;
return sz;
}
```
透過 `qtest` 確認函式之正確性,預期可看到以下結果
```
$ ./qtest
cmd> new
l = []
cmd> ih linux 500
...
cmd> size
Queue size = 500
...
cmd> it kernel 500
...
cmd> size
Queue size = 1000
...
```
### `q_free`: 釋放佇列所佔用的記憶體;
此函式之功能為釋放傳入佇列所佔用的記憶體,在佇列的建立以及節點新增的過程中,涉及了 `struct list_head`, `element_t` 以及 `element_t->value` 三處的記憶體分配,在釋放整個佇列時需回收這三部份的空間。
需注意不要造成 Use-After-Free (UAF) 的狀況,因此釋放的順序應當由內向外進行釋放。
```c
void q_free(struct list_head *head)
{
if (!head)
return;
element_t *it, *tmp;
list_for_each_entry_safe (it, tmp, head, list) {
free(it->value);
free(it);
}
free(head);
}
```
### `q_remove_head`: 在佇列開頭 (head) 移去 (remove) 給定的節點;
此函式之功能為移除佇列開頭之節點,並且當傳入參數 `sp` 不為 NULL 時,將節點上之字串複製至 `sp`。
使用 `list_del_init` 將佇列開頭之節點從佇列中移除。
關於字串複製的部份要避免 overflow 的發生,考慮 `bufsize` 可能小於節點上字串之長度,因此此處需要判斷要複製多少位元組的資料到 `sp` 裡面。當字串長度小於 `bufsize - 1` 時,我們可以將字串完整複製至 `sp` 中,並在最後加入 null terminator,反之若字串長度大於 `bufsize - 1` 時,將其 truncate 至 `bufsize - 1` 後,同樣在最後加入 null terminator。
```c
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
struct list_head *lh = head->next;
element_t *ele = list_entry(lh, element_t, list);
list_del_init(lh);
if (sp) {
size_t sz = min(strlen(ele->value), bufsize - 1);
memcpy(sp, ele->value, sz);
sp[sz] = '\0';
}
return ele;
}
```
在此我們需要實作用於判斷兩數字間大小關係的 `min` 函式,此處採取 branchless 之設計,以減少分支預測失敗帶來的效能損失。[參考來源](https://www.geeksforgeeks.org/compute-the-minimum-or-maximum-max-of-two-integers-without-branching/)
```c
static inline int min(int a, int b) {
return b ^ ((a ^ b) & -(a < b));
}
```
此函式透過 XOR 的性質,當 `a < b` 成立時,`-(a < b)` 相當於 `0xFFFFFFFF`,代表括號中的 `a ^ b` 會被完整保留,最後外層的 XOR 運算時 `b` 會被抵銷掉,於是剩下 `a`。
若 `a < b` 不成立,則 `a ^ b` 會被 bitwise AND 給清空成 0,最後與外層 XOR 運算時,可留下 `b`。
另外此處不直接使用 `scrncpy` 搭配 `bufsize` 的考量為根據 man page 中的敘述 `If the source has too few non-null bytes to fill the destination, the functions pad the destination with trailing null bytes.`,如果要複製的的 `src` 不足的話,會有填充 null bytes 的行為,但是如果要把 `sp` 當成字串解釋,只需要確保在正確的位置有一個 `\0` 即可。
### `q_remove_tail`: 在佇列尾端 (tail) 移去 (remove) 給定的節點;
此函式與 `q_remove_head` 的邏輯相似,僅需將 `list_del_init` 的對象由 `head->next` 調整成 `head->prev` 即可。
```c
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
... // Same as q_remove_head
struct list_head *lt = head->prev;
... // Same as q_remove_head
}
```
### `q_sort`
`q_sort` 需根據傳入的參數來決定是遞增排序還是遞減排序,因此這邊先實作比較函式
```c
static inline bool str_cmp_asc(const char *a, const char *b)
{
return strcmp(a, b) <= 0;
}
static inline bool str_cmp_dsc(const char *a, const char *b)
{
return strcmp(a, b) >= 0;
}
```
根據 `strcmp` 的 man page
```
0, if the s1 and s2 are equal;
a negative value if s1 is less than s2;
a positive value if s1 is greater than s2.
```
當 `s1` 的字典序小於 `s2` 時,回傳一個負數,在此 `str_cmp_asc` 用 `<= 0` 來將結果轉換為布林值,反之亦然。這兩個函式稍後會在 `q_sort` 中根據參數 `descend` 賦值給函式指標,這樣可以在 `q_sort` 呼叫時做一次,讓後續比較都可以使用到對應的函式。
接下來是排序演算法的選擇,在此我選擇實作 mergesort,首先是前置處理的部份,我將雙向環狀鍊結串列的環狀結構破壞掉,考慮 `q_sort` 傳入的參數 `head` 不帶有資料,因此也將其暫時移除,等待排序結束後再進行恢復。
```c
void q_sort(struct list_head *head, bool descend) {
...
struct list_head *tmp = head->next;
bool (*cmp)(const char *a, const char *b) = descend ? str_cmp_dsc : str_cmp_asc;
head->prev->next = NULL;
head->next = NULL;
struct list_head *sorted = mergesort(tmp, cmp);
...
}
```
#### `mergesort`
在 `mergesort` 的實作中參考了 [你所不知道的 C 語言: linked list 和非連續記憶體](https://hackmd.io/@sysprog/c-linked-list#%E6%A1%88%E4%BE%8B%E6%8E%A2%E8%A8%8E-Leetcode-2095-Delete-the-Middle-Node-of-a-Linked-List) 中提及的快慢指標找串列中點的技巧,找出串列的中點,並且對串列進行分割。
```c
struct list_head *mergesort(struct list_head *head, bool (*cmp)(const char *a, const char *b))
{
if (!head->next) {
return head;
}
struct list_head *fast = head;
struct list_head *slow = head;
while (fast->next && fast->next->next) {
fast = fast->next->next;
slow = slow->next;
}
struct list_head *mid = slow->next;
slow->next = NULL;
struct list_head *left = mergesort(head, cmp);
struct list_head *right = mergesort(mid, cmp);
return merge(left, right, cmp);
}
```
在 `merge` 的部份同樣參考了[你所不知道的 C 語言: linked list 和非連續記憶體](https://hackmd.io/@sysprog/c-linked-list#%E6%A1%88%E4%BE%8B%E6%8E%A2%E8%A8%8E-LeetCode-21-Merge-Two-Sorted-Lists)中提及的技巧,透過 indirect pointer 來簡化程式邏輯。
在 merge 的階段中暫時不處理 `prev` 的指標,等待排序完成後再一次進行處理。
```c
struct list_head *merge(struct list_head *l1, struct list_head *l2, bool (*cmp)(const char *a, const char *b)) {
struct list_head *head = NULL, **ptr = &head, **node;
for (node = NULL; l1 && l2; *node = (*node)->next) {
node = cmp(list_entry(l1, element_t, list)->value,
list_entry(l2, element_t, list)->value)
? &l1
: &l2;
*ptr = *node;
ptr = &(*ptr)->next;
}
*ptr = l1 ? l1 : l2;
return head;
}
```
此處共有三個指標,先說明 `head` 以及 `ptr` 的作用,`ptr` 作為指向 `head` 指標的指標,可以透過解引用對 `head` 進行賦值,這樣就可以避免宣告或者分配一個 dummy 節點的成本,當 `head` 已經不為 NULL 後(也就是第一次進入迴圈時),可再將 `ptr` 的值向前移動。
`node` 也是 indirect pointer,他的作用是決定當前要從 `l1` 還是 `l2` 取出節點,此處便可用上 `cmp` 這個函式指標作判斷,將 `ptr` 指向對應目標,不論選擇的是 `l1` 還是 `l2`,都可以透過解引用將 `node` 賦值給 `ptr`,再將 `ptr` 給向前移動,由於 `ptr` 會指向 `l1` 或 `l2` 其一,因此這個行為相當於將 `l1` 或 `l2` 已經掛上的節點給去除。
透過上述的實作,我們已經能獲得一個排序過的串列,可以透過 `next` 來按照升序或者降序進行拜訪,剩餘的部份則是要恢復環狀結構,以及處理 `merge` 階段沒有做的 `prev` 指標的部份。
```c
void q_sort(struct list_head *head, bool descend) {
...
// Sort the list and ignore prev link in this stage
struct list_head *sorted = mergesort(tmp, cmp);
// Recover previous link and stop at last node
struct list_head *current = sorted;
while (current->next) {
current->next->prev = current;
current = current->next;
}
// Recover circular structure
sorted->prev = head;
head->next = sorted;
head->prev = current;
current->next = head;
}
```
在此使用 `mergesort` 的回傳值作為起點,開始依序拜訪串列上的節點,並將當前節點的下一個節點的 `prev` 指向自己,之後向前移動,此處的 `while` 迴圈會停留在最後一個節點。
這恰好能作為恢復環狀結構的依據,只要將 `head` 與首與個節點和末個節點進行串接,雙向環狀鍊結串列的排序即可完成。
### `q_delete_mid`
此函式的功能為移除並釋放串列的中點。
```c
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
struct list_head *slow = head->next, *fast = head->next->next;
while (fast != head && fast->next != head) {
slow = slow->next;
fast = fast->next->next;
}
element_t *tmp = list_entry(slow, element_t, list);
list_del(slow);
free(tmp->value);
free(tmp);
return true;
}
```
如先前提到的快慢指標的技巧,得以找出位於中間的位置,假設串列長度為 n,`while` 迴圈執行 $\lfloor \frac{n - 1}{2}\rfloor$次,此時 `slow` 可以停在第 $\lfloor \frac{n}{2}\rfloor$ 個節點上,這時候將節點相關的鍊結給移除,並且釋放記憶體,即完成操作。
### `q_reverse`
此函式的功能為將雙向環狀鍊結串列的順序反轉。
```c
void q_reverse(struct list_head *head) {
struct list_head* cur = head->next;
struct list_head *tmp;
while (cur != head) {
tmp = cur->next;
cur->next = cur->prev;
cur->prev = tmp;
cur = tmp;
}
tmp = head->next;
head->next = head->prev;
head->prev = tmp;
}
```
在雙向鍊結串列中,只要依序將 `next` 以及 `prev` 指標對調,即可完成反轉。
### `q_swap`
此函式功能為將鄰近串列之節點進行交換,考慮此處節點之資料型態為字串,如果單純地交換節點字串的話,當字串長度長的時候會造成負擔,因此此處實作應該採取交換指標的方式。
查看 linux kernel 的 `list.h` 中可以發現有 `list_swap` 的實作
```c
static inline void list_replace(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
static inline void list_swap(struct list_head *entry1,
struct list_head *entry2)
{
struct list_head *pos = entry2->prev;
list_del(entry2);
list_replace(entry1, entry2);
if (pos == entry1)
pos = entry2;
list_add(entry1, pos);
}
```
直接使用這兩個函式固然能有正確結果,但這兩個函式並不只為了處理相鄰節點的交換,因此這裡稍作修改,加入 `__glibc_unlikely(pos == entry1)` 輔助編譯器進行優化,並且在 `q_swap` 中確保 `list_swap` 傳入的參數符合 `unlikely` 的假設。
```c
void q_swap(struct list_head *head)
{
// https://leetcode.com/problems/swap-nodes-in-pairs/
struct list_head *cur = head->next;
while (cur != head && cur->next != head) {
struct list_head *tmp = cur->next->next;
list_swap(cur->next, cur);
cur = tmp;
}
}
```
### `q_delete_dup`
此函式要求刪除串列中所有重複的數值,在此假設串列以為排序狀態。
由於串列為排序狀態,因此若有出現重複的情況,則兩節點必定相鄰,透過這樣的思維去比較當前節點與下一個節點的字串是否相同來決定要不要刪除節點。
由於此處需要頻繁地取得下一個節點的資料,此處引入 linux kernel 中的巨集 `list_next_entry` 來獲取下一個節點的資料。
```c
#define list_next_entry(pos, member) \
list_entry((pos)->member.next, typeof(*(pos)), member)
bool q_delete_dup(struct list_head *head)
{
if (!head || list_is_singular(head) || list_empty(head))
return false;
// https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
struct list_head *cur = head->next;
while (cur != head && cur->next != head) {
element_t *e1= list_entry(cur, element_t, list);
element_t *e2 = list_next_entry(e1, list);
bool flag = 0;
while (cur->next != head && !strcmp(e1->value, e2->value)) {
list_del(cur->next);
free(e2->value);
free(e2);
flag = 1;
e2 = list_next_entry(e1, list);
}
struct list_head *tmp = cur->next;
if (flag) {
list_del(cur);
free(e1->value);
free(e1);
}
cur = tmp;
}
return true;
}
```
當發現重複節點時,就先固定的位置 `cur`,並且不斷刪除重複的節點,直到 `cur->next` 不再重複,或者已到達串列的結尾。
#### 可能的改進方向
查看 glibc 的實作,`strcmp` 必須逐個 byte 去比較
```c
https://github.com/zerovm/glibc/blob/master/string/strcmp.c
do
{
c1 = (unsigned char) *s1++;
c2 = (unsigned char) *s2++;
if (c1 == '\0')
return c1 - c2;
} while (c1 == c2);
```
當 `strcmp` 的最糟情況發生時,代表整個 `e1` 紀錄的字串都會看過。如果在過程中紀錄 `e1` 字串的長度,並且改用如`memcmp` 這種一次可比較不只一個 byte 的函式是否能有更好的效能。
### `q_reversek`
此函式將串列中 k 個節點視為一個群組,並且將這個群組進行反轉。
此處可善用先前實作過的 `reverse` 來輔助完成。
```c
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (k <= 1 || !head || list_empty(head) || list_is_singular(head))
return;
struct list_head *cur, *tmp;
struct list_head *sub_head = head;
unsigned int count = 0;
list_for_each_safe (cur, tmp, head) {
count++;
if (count == k) {
struct list_head sublist;
INIT_LIST_HEAD(&sublist);
list_cut_position(&sublist, sub_head, cur);
q_reverse(&sublist);
list_splice(&sublist, tmp->prev);
count = 0;
sub_head = tmp->prev;
}
}
}
```
此處透過 `list_for_each_safe` 走訪串列,並當計數達到 k 的時候,透過 `list_cut_position` 巨集截出子串列,再使用已實作的 `q_reverse` 反轉此部份,最後再將結果透過 `list_splice` 加入至原先的位置。
要注意的是 `list_cut_position` 的第二個參數 `head_from` 預設傳入的是不帶資料的串列開頭,因此實際切割的範圍會是 `head_from->next` 至 `node`,所以此處 `sub_head` 初始化為 `head` 並且後續更新時會將位置設在實際想切割的節點的前一個位置。
`list_splice` 的行為也類似,他假設傳入的是串列開頭,因此 `tmp` 雖是 `cur` 的下一個節點,但是依然使用 `tmp->prev` 作為參數。
### `q_ascend`, `q_descend`
這兩個函式取自於 [Leetcode 2487. Remove Nodes From Linked List](https://leetcode.com/problems/remove-nodes-from-linked-list/),題目要透過刪除特定節點使串列變為遞增或者遞減的排序狀態。
以 `q_ascend` 為例,對於串列中任意節點,只要在其右側存在比自己小的元素,那就要將該元素刪除。由於 `queue.c` 中的串列為雙向環狀鍊結串列,可以在給定開頭節點的情況下,反向拜訪串列。若透過 `next` 指標拜訪串列為遞增,則反向拜訪時必須要呈現遞減,可以利用這個特性來解題。
```c
int q_ascend(struct list_head *head)
{
element_t *cur, *tmp;
const element_t *min_node = list_last_entry(head, element_t, list);
list_for_each_entry_prev_safe(cur, tmp, head, list)
{
if (strcmp(cur->value, min_node->value) > 0) {
list_del(&cur->list);
free(cur->value);
free(cur);
} else {
min_node = cur;
}
}
return q_size(head);
}
```
此處加入 `list_for_each_entry_prev_safe` 巨集來簡化程式碼複雜度並反向拜訪串列。
此處預期串列節點的字典序要越看越小,在進入迴圈前先將 `min_node` 設為串列尾巴的節點,當 `cur` 的字典序大於 `min_node` 時,就將 `cur` 給刪除,否則就更新 `min_node` 的資訊。
### `q_merge`
此函式功能為將數個以排序的佇列合併成一個佇列(須維持排序狀態),此函式有個比較不一樣的地方是參數 `list_head` 是一個定義在 `q_test.c` 中的變數 `chain` 這個串列的開頭(串列上的元素為 `q_context_t`),我們需要用這個開頭來獲得所有的佇列,並將其合併。
```c
typedef struct {
struct list_head head;
int size;
} queue_chain_t;
static queue_chain_t chain = {.size = 0};
static queue_contex_t *current = NULL;
//q_test.c: do_new
if (exception_setup(true)) {
queue_contex_t *qctx = malloc(sizeof(queue_contex_t));
list_add_tail(&qctx->chain, &chain.head);
qctx->size = 0;
qctx->q = q_new();
qctx->id = chain.size++;
current = qctx;
}
//q_test.c: do_merge
if (current && exception_setup(true))
len = q_merge(&chain.head, descend);
```
上述程式碼中顯示了 `q_test` 是如何在 `new` 命令時,將一個新的佇列加入至 `chain` 上,`q_test` 中除了 `do_merge` 以外的函式在呼叫 `queue.c` 中的則是將 `current->q` 作為參數。
```c
int q_merge(struct list_head *head, bool descend)
{
queue_contex_t *cur;
bool (*cmp)(const char *a, const char *b) =
descend ? str_cmp_dsc : str_cmp_asc;
struct list_head *lh = NULL;
list_for_each_entry (cur, head, chain) {
if (!lh) {
lh = cur->q;
lh->prev->next = NULL;
} else {
cur->q->prev->next = NULL;
lh->next = merge(lh->next, cur->q->next, cmp);
INIT_LIST_HEAD(cur->q);
}
}
struct list_head *tmp = lh->next, *prev = lh;
while (tmp) {
tmp->prev = prev;
prev = tmp;
tmp = tmp->next;
}
lh->prev = prev;
prev->next = lh;
return q_size(lh);
}
```
善用先前在 `mergesort` 實作的 `merge` 函式,來將 `chain` 上所有佇列都合併到 `chain` 上的第一個佇列。
這邊的原因是 `q_test` 中的 `do_merge` 有一段程式會將 `chain` 上的佇列釋放掉。
```c
struct list_head *cur = chain.head.next->next;
while ((uintptr_t) cur != (uintptr_t) &chain.head) {
queue_contex_t *ctx = list_entry(cur, queue_contex_t, chain);
cur = cur->next;
q_free(ctx->q);
free(ctx);
}
```
因此為了確保所有含有字串的節點能夠保留,必須選擇將節點掛在 `chain` 的第一個佇列上,同時這也是 `merge(lh->next, cur->q->next, cmp);` 完成後需要將 `cur->q` 初始化的原因,也是為了避免這段程式碼的 `q_free` 執行時發生錯誤。
## 論文 <Dude, is my code constant time?> 閱讀
這篇論文提出了一項稱為 `dudect` 的工具,可用於檢測程式的執行時間是否為常數時間,`dudect` 透過直接執行程式的方式作為統計資料的來源。
`dudect` 工具的執行流程可以分為下面三個步驟
1. 執行時間量測
1. 資料後處理
2. 假設檢定
來判定程式的執行時間是否為常數。
### 執行時間量測
在量測時間的部份,首先我們需要定義兩組類別的輸入,分別為 fixed value 以及 random value。在 fixed value 的部份,論文提到可以選擇可觸發 special corner-case 的內容作為輸入。
接著是量測時間的方式,根據現代 CPU 提供的 cycle counter,例如 X86 架構的 TSC 或者 ARM 架構的 systick,透過這些裝置取得的數值,可以精準量測執行時間。
最後是去除環境干擾的部份,考慮測試環境並不是特別在裸機環境中測試,於作業系統環境中量測時間時,可能會受到許多干擾(排程器的上下文切換(context switch)、資源分配以及系統中斷等諸多元素),因此每一輪要量測的項目必須從 fixed 或者 random 中隨機選擇一組。
關於測試資料選擇帶來的影響在 [Statistics and Secret Leakage](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=8a6943177c1a48b2052b059666f7d869d3e0e7c7)論文中有更詳細的說明。
### 資料後處理
在作資料分析的時候,如何去除不具代表性的離群值是一個對分析結果至關重要的因素。
論文中提到說對於量測時間通常會呈現正偏態(positive skewed)的形式,如下圖所示

正偏態指的是大多數的資料會集中在左側,代表未受干擾的執行時間,而受到干擾的執行時間在此應屬於小部份的資料,圖中的 x 軸表示的為執行時間,y 軸則代表出現的頻率(frequency)。
對於去除離群值的手法,論文中提到兩種方式,分別為
* 裁剪(cropping): 此方法設定一與輸入資料類別獨立的閥值,將蒐集到的資料中大於閥值的部份去除
* 高階預處理(higher-order preprocessing): 此方法的目的為進一步捕捉資料在[高階統計量](https://en.wikipedia.org/wiki/Higher-order_statistics)的差異性,可讓後續假設檢定的部份對高階統計量更敏感。
### 假設檢定
這個階段主要是對蒐集到的兩組執行時間資料做檢定,已判斷隨機輸入的執行時間是否跟固定輸入的執行時間在分佈相等。
假設檢定的方法有很多,論文中提到兩種,這邊著重在 Welch’s t-test 上。
假設檢定的流程主要如下,由研究者提出一段虛無假說(Null hypothesis,記為 $H_0$),接著提出一組對立假說(Alternative hypothesis, 紀為 $H_1$),接著選定顯著水準作 $\alpha$ 作為最終是否能接受 $H_0$ 的標準。
根據 Welch's t-test 我們先計算兩組測試資料的平均數以及變異數,此處的 $\bar{X}$ 為樣本平均數,$s^2_x$ 則為樣本變異數,令 X 代表的為固定測資的量測時間,Y 為隨機測資的量測時間,下標代表為第幾筆的執行時間,n 為總共有多少筆資料。
$\Large \bar{X} = \frac{1}{n_x} \sum_{i=1} ^{n_x} X_i$
$\Large {s^2_X} = \frac{1}{n_x - 1} \sum_{i=1} ^{n_x} (X_i - \bar{X})^2$
$\Large \bar{Y} = \frac{1}{n_y} \sum_{i=1} ^{n_y} Y_i$
$\Large {s^2_Y} = \frac{1}{n_y - 1} \sum_{i=1} ^{n_y} (Y_i - \bar{Y})^2$
接著我們根據公式以及先前的結果計算出 t 值,我們可以觀察一下公式中的數字對於 t 值結果的影響,當兩組數據平均值差異很大時,t 值會變大(或變小)
$\Large t = \frac{\bar{X} - \bar{Y}}{\sqrt{\frac{s_X^2}{n_X} + \frac{s_Y^2}{n_Y}}}$
當 t 值越接近零時,代表兩組數據的差異性越小,到此已經可以根據先前定義的 $\alpha$ 下結論,當 $|t|$ 值大於 $\alpha$ 時,我們認為兩組數據存在差異,反之認為兩組數據相同。
### lab0-c 中 `dudect` 的實作
首先我們從 `qtest.c` 中可以看到幾個函式
* `is_remove_tail_const`, `is_remove_head_const`, `is_insert_tail_const` 和 `is_insert_head_const`
不過如果使用 `grep is_remove_tail_const -nr` 會發現找不到函式的定義在哪裡,原因是這些函式是由巨集輔助定義而來。
首先從 `fixture.h` 中找到一個巨集定義 `_`,傳入一個參數 `x`,並且展開為 `bool is_##x##_const(void);`
```c
/* define in consant.h
#define DUT_FUNCS \
_(insert_head) \
_(insert_tail) \
_(remove_head) \
_(remove_tail)
*/
#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _
```
此處可參閱 C 語言規格書 6.10.3.3 對於 `##` 運算子的說明
> For both object-like and function-like macro invocations, before the replacement list is reexamined for more macro names to replace, each instance of a ## preprocessing token in the replacement list (not from an argument) is deleted and the preceding preprocessing token is concatenated with the following preprocessing token. Placemarker preprocessing tokens are handled specially: concatenation of two placemarkers results in a single placemarker preprocessing token, and concatenation of a placemarker with a non-placemarker preprocessing token results in the non-placemarker preprocessing token. If the result is not a valid preprocessing token, the behavior is undefined. The resulting token is available for further macro replacement. The order of evaluation of ## operators is unspecified.
其中提到,當要被替換的參數前後方都跟著 `##` 運算子的話,那前置處理器就會在文字替換完成後將他們拼接在一起。
由此處可以得知,此段巨集被展開後會成為 `bool is_remove_tail_const(void)` 等等,也就是我們在 `qtest.c` 中所看到的函式名稱。
接著看 `constant.h` 中的巨集,也用到了類似的技巧
```c
#define DUT_FUNCS \
_(insert_head) \
_(insert_tail) \
_(remove_head) \
_(remove_tail)
#define DUT(x) DUT_##x
enum {
#define _(x) DUT(x),
DUT_FUNCS
#undef _
};
```
巨集 `DUT_FUNCS` 會展開成`_(is_insert_head)` 等一系列的內容,與 `fixture.h` 的巨集搭配,則可以產生出函式的定義`bool is_insert_head_const(void);` 等一系列定義
接著再次使用 `DUT_FUNCS` 達成 `enum` 的定義,可以看到在這裡面有一個 `_` 的定義,會被替換成 `DUT(x),`,而 `DUT(x)` 又會被替換成 `DUT_x` 根據傳入的 x 可以產生出不同的拼接效果。
`enum` 的內部使用了 `DUT_FUNCS` 得到 `_(is_insert_head)` 等等的內容,在此就會被替換成 `DUT_insert_head,` 等等內容,由這些內容組成最終的 `enum`
在 [chiangkd](https://hackmd.io/@chiangkd/2023spring-lab0-c#%E7%A0%94%E8%AE%80%E8%AB%96%E6%96%87%E4%B8%A6%E6%94%B9%E9%80%B2-dudect) 的筆記中看到老師的留言,得知這個技巧稱為 [X_macro](https://en.wikipedia.org/wiki/X_macro),意指同一個 `_` 定義在不同上下文中產生不同展開的效果。
最後查看 `fixture.c` 中,可以看到下列定義
```c
#define DUT_FUNC_IMPL(op) \
bool is_##op##_const(void) { return test_const(#op, DUT(op)); }
#define _(x) DUT_FUNC_IMPL(x)
DUT_FUNCS
#undef _
```
此處的 `_` 會將 `DUT_FUNCS` 展開成函式的實作
```c
bool is_insert_head_const(void) { return test_const("insert_head", DUT_insert_head); }
```
到此便得知了 `is_xxx_const` 這系列函式實際上是呼叫 `test_const` 來完成相關的操作。
查看 `test_const` 函式,可以發現會進行 `TEST_TRIES` 輪的測試,每輪測試將蒐集 `ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2)` 進行測試。
```c
// in fixture.c
static t_context_t *t
static bool test_const(char *text, int mode)
{
...
t = malloc(sizeof(t_context_t));
for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
init_once();
for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
++i)
result = doit(mode);
if (result)
break;
}
...
}
// in constant.c
static struct list_head *l = NULL;
```
迴圈一開始透過 `init_once` 初始化 `t` 和 `l` 這兩個變數,將 `l` 修改為 NULL,以及將 `t_context_t` 的內容初始化為 0。
```c
typedef struct {
double mean[2];
double m2[2];
double n[2];
} t_context_t;
```
接著查看 `doit` 函式的內容
```c
static bool doit(int mode)
{
// 分配相關記憶體資源
...
if (!before_ticks || !after_ticks || !exec_times || !classes ||
!input_data) {
die();
}
prepare_inputs(input_data, classes);
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
update_statistics(exec_times, classes);
ret &= report();
// 釋放相關資源
...
return ret;
}
```
此處又有幾個關鍵的函式,首先為 `prepare_inputs` 對應論文方法的第一階段
```c
void prepare_inputs(uint8_t *input_data, uint8_t *classes)
{
randombytes(input_data, N_MEASURES * CHUNK_SIZE);
for (size_t i = 0; i < N_MEASURES; i++) {
classes[i] = randombit();
if (classes[i] == 0)
memset(input_data + (size_t) i * CHUNK_SIZE, 0, CHUNK_SIZE);
}
for (size_t i = 0; i < N_MEASURES; ++i) {
/* Generate random string */
randombytes((uint8_t *) random_string[i], 7);
random_string[i][7] = 0;
}
}
```
這邊首先將 `input_data` 整塊初始化為隨機生成的資料,再來根據論文所敘述每一輪測試應該要隨機挑選 fixed value 或是 random value,因此再次獲得一個隨機的 bit 來決定該輪的測試資料要使用哪一種類型,如果此值為 0 就將 input_data 對應的位移量(offset)設成 0。
`input_data` 也產生 `N_MEASURES` 組的隨機字串供後續使用。
接著是 `measure` 的部份,裡面有一個 `switch` 包含 default 共對應了 5 個 case,但考慮前面有 `assert`,且每個 case 都含有 `break` 敘述,因此 `default` 應該是不會執行到,另外 `inset` 和 `remove` 各有兩組這邊各留下一組作為說明。
```c
bool measure(int64_t *before_ticks,
int64_t *after_ticks,
uint8_t *input_data,
int mode)
{
assert(mode == DUT(insert_head) || mode == DUT(insert_tail) ||
mode == DUT(remove_head) || mode == DUT(remove_tail));
switch (mode) {
case DUT(insert_head):
for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
char *s = get_random_string();
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000);
int before_size = q_size(l);
before_ticks[i] = cpucycles();
dut_insert_head(s, 1);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
dut_free();
if (before_size != after_size - 1)
return false;
}
break;
case DUT(remove_tail):
for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000 + 1);
int before_size = q_size(l);
before_ticks[i] = cpucycles();
element_t *e = q_remove_tail(l, NULL, 0);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
if (e)
q_release_element(e);
dut_free();
if (before_size != after_size + 1)
return false;
}
break;
}
```
此處有個問題是為何迴圈是從 `DROP_SIZE` 開始算起,不應該是跑完後才決定有多少離群值應該要刪除嗎,查看 `dudect` 論文的原版沒有看到這樣的行為,先紀錄下此狀況。
```c
static void measure(dudect_ctx_t *ctx) {
for (size_t i = 0; i < ctx->config->number_measurements; i++) {
ctx->ticks[i] = cpucycles();
do_one_computation(ctx->input_data + i * ctx->config->chunk_size);
}
for (size_t i = 0; i < ctx->config->number_measurements-1; i++) {
ctx->exec_times[i] = ctx->ticks[i+1] - ctx->ticks[i];
}
}
```
從 `insert_head` 和 `remove_tail` 的共同處檢視
* `dut_new`: 用於呼叫 `q_new()` 產生一個新的佇列
* `dut_insert_head`: 此處為巨集定義,展開後如下
```c
do {
int j = *(uint16_t *) (input_data + i * 2) % 10000;
while (j--)
q_insert_head(l, get_random_string());
} while (0)
```
可以看到它從 `input_data` 中隨機取了一個大小 2 位元組的無號整數,用於將佇列的長度調整為 `j`,並且此處輸入的皆是隨機產生的字串
* `dut_free`: 用於呼叫 `q_free()` 釋放測試用的佇列
測試程式主要的部份如下,開始測試前紀錄佇列目前的大小以及當前的 cpu cycles,然後呼叫待測函式,在函式結束後再次讀取 cpu cycles,之後可以用兩值差異作為此次執行時間之依據。
```c
int before_size = q_size(l);
before_ticks[i] = cpucycles();
dut_insert_tail(s, 1);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
dut_free();
if (before_size != after_size - 1)
return false;
```
若待測函式執行後,佇列大小不如預期則直接 `return`,在 `remove_tail` 則多了一個將取得的 `element_t` 給釋放的動作
在此發現第二個問題,隨機測資的機制似乎不完善,在 `insert` 中永遠都使用隨機字串,而在 `remove` 的案例中的隨機變數應該設為是否將字串從佇列中複製出來。
接下來的 `differentiate(exec_times, before_ticks, after_ticks);
` 函式計算每一筆 cpu cycles 前後的差異,並且將結果存在 `exec_times` 中。
再來的 `update_statistics(exec_times, classes);` 則是將每一筆量測到的執行時間拿來計算 t test 所需要的統計量,樣本平均數以及樣本變異數(函式 `t_push` 負責計算)。注意此處缺少了去除離群值的實作,將帶有誤差的資料拿來計算可能是我們無法通過評分的原因之一。
最後的 `report` 函式負責判斷此次測試的結果是否為常數時間
```c
tatic bool report(void)
{
double max_t = fabs(t_compute(t));
double number_traces_max_t = t->n[0] + t->n[1];
double max_tau = max_t / sqrt(number_traces_max_t);
printf("\033[A\033[2K");
printf("meas: %7.2lf M, ", (number_traces_max_t / 1e6));
if (number_traces_max_t < ENOUGH_MEASURE) {
printf("not enough measurements (%.0f still to go).\n",
ENOUGH_MEASURE - number_traces_max_t);
return false;
}
printf("max t: %+7.2f, max tau: %.2e, (5/tau)^2: %.2e.\n", max_t, max_tau,
(double) (5 * 5) / (double) (max_tau * max_tau));
/* Definitely not constant time */
if (max_t > t_threshold_bananas)
return false;
/* Probably not constant time. */
if (max_t > t_threshold_moderate)
return false;
/* For the moment, maybe constant time. */
return true;
}
```
注意此處在尚未獲得足夠多的測試資料時,會直接離開函式,應該先行判斷測資數量是否充足,再進行後續的操作。
統整一下目前的執行流程,從 `q_test.c` 中的 `queue_XXX` 作為函式呼叫的起點
```
queue_XXX(position_t pos, int argc, char *argv[])
=> is_XXX_const
=> test_const("XXX", DUT_XXX)
=> doit(DUT_XXX)
```
當 `doit(DUT_XXX)` 執行完成後便以獲得統計檢驗的結果,此結果一路回傳至 `q_test.c` 中的函式,並且根據結果印出是否通過測試。
### lab0-c dudect 實作之改善
#### `measure` 函式取得測資的計數有誤
> commit [f9837fc](https://github.com/ibat10clw/lab0-c/commit/f9837fcd059cf642482c9950e98a3a6b83215c19)
在 dudect 的原實作中,可以看到他將取得的執行時間資料的前幾筆給丟棄,不參與統計量的計算,但在 lab0-c 中卻是在量測執行時間時的迴圈用 `DROP_SIZE` 扣掉要量測的次數
```diff
bool measure(int64_t *before_ticks,
case DUT(insert_tail):
- for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
+ for (size_t i = 0; i < N_MEASURES; i++) {
char *s = get_random_string();
dut_new();
dut_insert_head(
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
+ for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {
```
在此我們固定在 `measure` 內量測 `N_MEASURES` 次,並且在計算統計量時才去除前 `DROP_SIZE` 筆資料。
#### `update_statistics` 沒有去除離群值
> commit [f9837fc](https://github.com/ibat10clw/lab0-c/commit/f9837fcd059cf642482c9950e98a3a6b83215c19)
在 dudect 的論文方法中,有提到剪裁(cropping)的資料後處理手法,dudect 的實作中將每一筆執行時間的資料分為 102 種不同的 case 處理,已確保在不同資料處理的狀況下,量測函式都要能通過 t-test
```c
#define DUDECT_NUMBER_PERCENTILES 100
static void update_statistics(dudect_ctx_t *ctx) {
for (size_t i = 10 /* discard the first few measurements */; i < (ctx->config->number_measurements-1); i++) {
int64_t difference = ctx->exec_times[i];
...
t_push(ctx->ttest_ctxs[0], difference, ctx->classes[i]);
for (size_t crop_index = 0; crop_index < DUDECT_NUMBER_PERCENTILES; crop_index++) {
if (difference < ctx->percentiles[crop_index]) {
t_push(ctx->ttest_ctxs[crop_index + 1], difference, ctx->classes[i]);
}
if (ctx->ttest_ctxs[0]->n[0] > 10000) {
double centered = (double)difference - ctx->ttest_ctxs[0]->mean[ctx->classes[i]];
t_push(ctx->ttest_ctxs[1 + DUDECT_NUMBER_PERCENTILES], centered * centered, ctx->classes[i]);
}
}
}
```
在此我們引入同樣的手法,計算出百分位數作為去除離群值的依據
```diff
typedef struct {
double mean[2];
double m2[2];
double n[2];
+ int64_t *percentiles;
+ bool first;
} t_context_t;
```
在結構體中新增兩個成員,用於判斷是否要計算百分位數以及儲存計算的結果
```diff
@@ -45,5 +45,6 @@ void t_init(t_context_t *ctx)
ctx->m2[class] = 0.0;
ctx->n[class] = 0.0;
}
+ ctx->first = true;
```
在 `init_once` 呼叫的 `t_init` 中,固定把 `first` 設成 `true`
```diff
static bool doit(int mode) {
...
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
+ if (t->first) {
+ t->percentiles = calloc(DUDECT_NUMBER_PERCENTILES, sizeof(int64_t));
+ prepare_percentiles(t, exec_times);
+ } else
+ update_statistics(exec_times, classes);
ret &= report();
...
}
```
並且將第一組獲得的執行時間用於計算百分位數,不參與統計量的計算,百分位數的計算參考原 dudect 的實作,將完整的一組資料做排序,並且計算出各個 index 的閥值,除存在結構體的成員中。
```c
static int64_t percentile(int64_t *a, double which, size_t size)
{
size_t array_position = (size_t) ((double) size * (double) which);
return a[array_position];
}
t->percentiles[i] = percentile(
exec_times,
1 - (pow(0.5, 10 * (double) (i + 1) / DUDECT_NUMBER_PERCENTILES)),
N_MEASURES);
```
接這是統計量計算函式的修改
```diff
+#define PERCENTILE_UPPER_BOUND 95
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {
int64_t difference = exec_times[i];
/* CPU cycle counter overflowed or dropped measurement */
if (difference <= 0)
continue;
+ if (difference < t->percentiles[PERCENTILE_UPPER_BOUND]) {
+ t_push(t, difference, classes[i]);
}
}
}
```
我們將先前計算好的百分位數拿來與執行時間 `difference` 作比較,僅保留小於閥值的資料計算統計量。
並且考慮去除部份數值後,資料量可能不足以滿足 dudect 的評測,因此原本固定的迴圈次數修改為不斷執行,直到取得足夠測試資料為止。
```diff
static bool test_const(char *text, int mode)
{
bool result = false;
...
- for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
- ++i)
+ while (t->n[0] + t->n[1] < ENOUGH_MEASURE)
result = doit(mode);
...
return result;
}
```
#### `report` 函式的優化
> commit [676b460](https://github.com/ibat10clw/lab0-c/commit/676b460018daf4a57136bb7fe42422401f240ea1)
在 lab0-c 的實作中 `report` 函式每次都會計算 t value,但是在測試資料不足時卻又不採用此次結果。
```c
static bool report(void) {
double number_traces_max_t = t->n[0] + t->n[1];
printf("\033[A\033[2K");
if (number_traces_max_t < ENOUGH_MEASURE) {
printf("not enough measurements (%.0f still to go).\n",
ENOUGH_MEASURE - number_traces_max_t);
return false;
}
double max_t = fabs(t_compute(t));
...
}
```
因此在這邊將判斷測試資料的程式碼移動至函式最前面,已確保不會產生不必要的計算。
#### `measure` 中沒有對完整的測資做隨機處理
> commit [8f57667](https://github.com/ibat10clw/lab0-c/commit/8f5766781b840c11fc62795a7c4205cebe0f6cdf)
回想一下 `queue.c` 中 insert 和 remove 的函式參數
```c
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
bool q_insert_head(struct list_head *head, char *s)
```
有一個共通點為佇列,另外 insert 函式中有一個字串參數 `s`,而 remove 函式中則是有 `sp` 以及 `bufsize`,對照 dudect 論文所述,必須能產生隨機類型的輸入。對應上這些參數,我們可以透過隨機變數初始化下列參數:
* `head`: 輸入佇列的長度
* `s`: 輸入字串的長度
* `bufsize`: 需要複製出的資料大小
* `sp`: 是否為 NULL(需與 `bufsize` 搭配)
在原本 lab0-c 的實作中,僅將取得的隨機數用於初始化佇列的大小,對於 remove 函式所需要的變數皆設為固定的數值`element_t *e = q_remove_tail(l, NULL, 0);`,這便是先前 remove 函式能通過測試的原因。
我將整個 `measure` 函式改寫成下面的形式
```c=
bool measure(int64_t *before_ticks,
int64_t *after_ticks,
uint8_t *input_data,
int mode)
{
for (size_t i = 0; i < N_MEASURES; ++i) {
char *s, tmp[32] = {0};
char *sp = NULL;
int buf_sz = 0;
uint16_t rn = *(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000;
if (rn == 0) {
s = "FIXEDSTR";
} else {
s = get_random_string();
sp = tmp;
buf_sz = (rn % 8) + 1;
}
dut_new();
if (mode == DUT_remove_head || mode == DUT_remove_tail)
rn++;
dut_insert_head(get_random_string(), rn);
bool (*insert_func)(struct list_head *, char *) = NULL;
element_t *(*remove_func)(struct list_head *, char *, size_t) = NULL;
if (mode == DUT_insert_head || mode == DUT_insert_tail)
insert_func =
mode == DUT_insert_head ? q_insert_head : q_insert_tail;
else if (mode == DUT_remove_head || mode == DUT_remove_tail)
remove_func =
mode == DUT_remove_head ? q_remove_head : q_remove_tail;
int before_size = q_size(l);
if (insert_func) {
before_ticks[i] = cpucycles();
insert_func(l, s);
after_ticks[i] = cpucycles();
} else {
before_ticks[i] = cpucycles();
element_t *e = remove_func(l, sp, buf_sz);
after_ticks[i] = cpucycles();
if (e)
q_release_element(e);
}
int after_size = q_size(l);
dut_free();
if (insert_func && before_size != after_size - 1)
return false;
else if (remove_func && before_size != after_size + 1)
return false;
}
return true;
}
```
重要的改動有以下幾點:
1. 將 switch 取消,改為用 `mode` 以及函式指標決定受測函式(25 ~ 30 行)
3. 以 `input_data` 隨機取出的 `rn` 判斷此次測試使用固定測資還是隨機測資(7 ~ 16 行)
4. 以函式指標是否為空判斷後續要使用哪一個函式指標作為受測函式(33 ~ 43 行)
5. 以函式指標判斷函式執行是否有成功(47 ~ 49 行)
### 修改完後的測試
做了上述修改後,現在四個佇列操作函式皆無法通過測試,不過我認為這是預期內的狀況
這個 [repo](https://github.com/ibat10clw/dudect) 是我從原版 [dudect](https://github.com/oreparaz/dudect) 複製修改的
我將測試資料大小設定為 8 bytes,並且以 `memcmp` 作為測試函式,此處預期至少會比較 1 個位元組的資料,至多 8 個,在執行時間上的差異上可以算是非常小。
* 設成 8 是為了與 lab0-c 的 dudect 的隨機字串長度一樣
```c
int check_tag(uint8_t *x, uint8_t *y, size_t len) {
return memcmp(x, y, len);
}
uint8_t do_one_computation(uint8_t *data) {
/* simulate totally bogus MAC check in non-constant time */
return check_tag(data, secret, 8);
}
```
以下是測試結果,依然被 deduct 被判定為非常數執行時間,由此可得知 deduct 具有相當的敏感度。
```bash
$ ./dudect_simple_O0
meas: 0.10 M, max t: +2.00, max tau: 6.41e-03, (5/tau)^2: 6.09e+05. For the moment, maybe constant time.
meas: 0.19 M, max t: +3.56, max tau: 8.20e-03, (5/tau)^2: 3.72e+05. For the moment, maybe constant time.
meas: 0.30 M, max t: +3.85, max tau: 7.04e-03, (5/tau)^2: 5.04e+05. For the moment, maybe constant time.
meas: 0.40 M, max t: +3.95, max tau: 6.25e-03, (5/tau)^2: 6.40e+05. For the moment, maybe constant time.
...
...
meas: 5.63 M, max t: +9.87, max tau: 4.16e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas: 5.73 M, max t: +9.93, max tau: 4.15e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas: 5.83 M, max t: +9.80, max tau: 4.06e-03, (5/tau)^2: 1.52e+06. For the moment, maybe constant time.
meas: 5.93 M, max t: +9.95, max tau: 4.09e-03, (5/tau)^2: 1.50e+06. For the moment, maybe constant time.
meas: 6.03 M, max t: +9.91, max tau: 4.03e-03, (5/tau)^2: 1.54e+06. For the moment, maybe constant time.
meas: 6.13 M, max t: +10.10, max tau: 4.08e-03, (5/tau)^2: 1.50e+06. Probably not constant time.
```
考慮我 lab0-c 實作的佇列操作使用 `memcpy` 以及 `strdup` 等函式進行資料複製,這些函式執行時間本身會隨著資料量而改變,此外還有 insert 函式中的 malloc 涉及作業系統資源分配,因此即便 `list_add` 以及 `list_del` 屬於常數時間操作,但整體上來說佇列的 insert 以及 remove 並非常數時間,導致未能通過測試。
## 新增 shuffle 命令
首先我們觀察 console 的命令是如何被建立的,在 `console.h` 中可以看到一個巨集
`#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)`
這個巨集在函式 `console_init` 中被頻繁使用,它用到了前面說過得 `##` 運算子,以及 `#` 運算子。
`#` 運算子的作用是將要代換的文字內容給字串化,在 C 語言來說也就是加上 `""` 進行修飾。
以下是一個使用的範例
```c
ADD_COMMAND(dedup, "Delete all nodes that have duplicate string", "");
add_cmd("dedup", do_dedup, "Delete all nodes that have duplicate string", "")
```
第一個參數 `dedup` 會被字串化成 `"dedup"` 後作為 `add_cmd` 的第一個參數,另外 `dedup` 也會被拼接成 `du_dedup`,由此可知我們要加入的函式必須取名為 `do_xxx` 的形式,並且在 `qtest.c` 中實作這個函式,在由這個 `do_xxx` 的函式去完成我們想要達成的內容。
接著開始新增命令,同樣地在 `console_init` 中加入一個 `ADD_COMMAND`
```c
ADD_COMMAND(shuffle, "Shuffle nodes in queue randomly", "");
```
並且在 `q_test.c` 中提供介面
```c
static bool do_shuffle(int argc, char *argv[])
{
q_shuffle(current->q);
return true;
}
```
`q_shuffle` 的本體實作如下
> commit [ecf7321](https://github.com/ibat10clw/lab0-c/commit/ecf7321051d24ff5b167016f225460cfb8e5ac5e)
```diff
+ struct list_head **nodes =
+ (struct list_head **) malloc(sizeof(struct list_head *) * sz);
+ struct list_head *cur = head->next;
+ for (int i = 0; i < sz; i++) {
+ nodes[i] = cur;
+ cur = cur->next;
+ }
+ for (int i = sz - 1; i > 0; i--) {
+ randombytes(buf, sizeof(buf));
+ int j = *(uint32_t *) buf % (i + 1);
+ if (i != j) {
+ struct list_head *tmp = nodes[i];
+ nodes[i] = nodes[j];
+ nodes[j] = tmp;
+ }
+ }
+ INIT_LIST_HEAD(head);
+ for (int i = 0; i < sz; i++) {
+ list_add_tail(nodes[i], head);
```
由於串列上的資料為字串,要直接在串列上採用[Yates 演算法](https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle)需要頻繁地複製字串資料,因此藉由一個動態分配的陣列以快速取得每一個指向串列節點的指標,也可以透過陣列隨機訪問的特性快速向陣列後方交換元素。
當 Yates 演算法在儲存串列節點指標的陣列完成洗牌後,我們可以在根據這個順序恢復串列的結構。
## 實作 XOR PRNG 並整合至 qtest
> commit [82a267d](https://github.com/ibat10clw/lab0-c/commit/82a267d84ff3242c5f2f720aa26d5c71163a0dea) and [4872ef1](https://github.com/ibat10clw/lab0-c/commit/4872ef19d4319f4cf785eac597e14a29f686e1e2)
首先是新增一個 option 可以設定 PRNG 的值,以決定使用內建(`/dev/urandom`)作為亂數來源或者是使用新增的 XOR PRNG 來產生隨機數。
```diff
@@ -75,6 +75,8 @@ static int string_length = MAXSTRING;
+static int prng = 0;
@@ -173,7 +175,14 @@ static void fill_rand_string(char *buf, size_t buf_size)
- randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+ switch (prng) {
+ case 1:
+ // TODO: implement a prng use xor shift
+ break;
+ case 0:
+ default:
+ randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+ }
@@ -1111,6 +1120,9 @@ static void console_init()
+ add_param("prng", &prng,
+ "Select the random number generator [0:/dev/urandom, 1:xor]",
+ NULL);
```
首先新增一個變數,用來區分當前的亂數模式,並且在產生隨機字串的函式加入判斷,根據當前模式產生出亂數。
模仿其他 option 的實作,可透過 `add_param` 將變數 `prng` 加入 `param_list` 這個串列上。
```c
/* Integer-valued parameters */
typedef struct __param_element {
char *name;
int *valp;
char *summary;
/* Function that gets called whenever parameter changes */
setter_func_t setter;
struct __param_element *next;
} param_element_t;
```
`param_list` 串列除了紀錄 option 的數值以外,也紀錄 option 的名稱以及提示訊息,可以在 `qtest` 中輸入 help 查看 option 的狀態,以及使用方式。
### XOR shift 實作
> commit [220eb9a](https://github.com/ibat10clw/lab0-c/commit/220eb9a18f90f480de3322b58a2be183c07ab8a8)
在實作 XOR shift 前,我先行閱讀了[Xorshift RNGs](https://www.jstatsoft.org/article/view/v008i14)這篇論文,得知 XOR shift 有一個限制為存在週期,週期的長短跟 bit 數有關,最長為 $2^{n-1}$,因此為了避免太快的達到一個週期,而產生出重複的序列,我以 64 bits 進行實作。
```c
static void xorshift(u_int64_t *val)
{
*val ^= *val << 13;
*val ^= *val >> 7;
*val ^= *val << 17;
}
```
首先查看 `xorshift` 函式的運算,每次進行三組 XOR + shift 的操作。
這個運算的原理來自於線性代數,可以將這個過程視為一個線性轉換 $T = (I +L^a)(I+R^b)(I+L^c)$,此處 I 為[單位矩陣](https://zh.wikipedia.org/zh-tw/%E5%96%AE%E4%BD%8D%E7%9F%A9%E9%99%A3),$L^x$ 為向左位移 x 位的矩陣,$R^x$ 為向右位移 x 位的矩陣。
考慮一個 [GF(2)](https://en.wikipedia.org/wiki/GF(2)) 上的 4 階單位矩陣與一個 $L^1$ 矩陣,以及用一個列向量 y 表示二進位數字每個 bit 的狀態
$y^T = \begin{pmatrix}
0 \\ 1 \\ 1 \\ 0
\end{pmatrix}$
$I = \begin{pmatrix}
1 & 0 & 0 & 0 \\
0 & 1 & 0 & 0 \\
0 & 0 & 1 & 0 \\
0 & 0 & 0 & 1
\end{pmatrix}$
$L^{1} = \begin{pmatrix}
0 & 0 & 0 & 0 \\
1 & 0 & 0 & 0 \\
0 & 1 & 0 & 0 \\
0 & 0 & 1 & 0
\end{pmatrix}$
* GF2 是一個有限體,可理解為所有運算的結果都要 mod 2,所以在這個有限體中的加法運算跟 XOR 是等價的,乘法則跟 AND 等價
當 $y * L^{1}$ 時
* $y[0] = (0*0+1*1+1*0+0*0) = 1$
* $y[1] = (0*0+1*0+1*1+0*0) = 1$
* $y[2] = (0*0+1*0+1*0+0*1) = 0$
* $y[3] = (0*0+1*0+1*0+0*0) = 0$
$\hat y^T = \begin{pmatrix}1 \\ 1 \\ 0 \\ 0
\end{pmatrix}$
單位矩陣 $I$ 與 $y$ 的乘積依然是 $y$,最後我們把 $y$ 和 $\hat y$ 相加,可以得到
$y^* = \begin{pmatrix} 1 \\ 0 \\ 1 \\ 0 \end{pmatrix}$
這時候就完成了一個狀態的轉換。
* 因矩陣運算具有分配律,因此對 $I+L^1$ 的結果相乘也會有同樣結果。
關於為什麼一次 xor shift 涵蓋了三次狀態轉換的原因,擷取自論文原文
>Unfortunately, when n is 32 or 64, no choices for a and b will
provide such a T with the required order. (A preliminary test on candidates is to square T n times. If the result is not
T , then T cannot have order $2^{n-1}$. By representing the rows of a binary matrix as a computer word, then xoring all
of the rows for which a (left) multiplying binary vector has 1’s, very fast binary matrix products can be evaluated with
simple C or Fortran programs, and it only takes a few minutes to check every possible a, b in $T = (I + L^a )(I + R^b ).)$
However, there are many choices for a, b, c for which the matrices $T = (I + L^a )(I + R^b )(I + L^c )$ have or-
der 2n − 1, when n = 32 or n = 64. There are 81 triples (a, b, c), a < c, for which the 32 × 32 binary matrix
$T = (I + L^a )(I + R^b )(I + R^c )$ has order $2^{32} −1$
為了避免週期過早的重複,在 32 或者 64 bits 的整數上才採取三個一組的運算。
接著是透過 xor shift 生成隨機數的函式本體
```c
void xor_rng(uint8_t *buf, size_t n)
{
static struct {
u_int64_t x;
int cnt;
} state = {.x = 0, .cnt = 7}; // each state x can generate 8 bytes data
if (__glibc_unlikely(state.x == 0)) {
int ret = getrandom(&state.x, sizeof(state.x), 0);
assert(ret);
}
size_t offset = 0;
while (n--) {
if (state.cnt == 0) {
xorshift(&state.x);
state.cnt = 7;
}
buf[offset++] = (state.x >> (state.cnt << 3)) & 0xFF;
state.cnt -= 1;
}
}
```
使用 64 bit 時,一次可以產生 8 個 8 位元組的隨機數,為了不要浪費產生的結果而快速抵達新的週期,因此使用一個結構體包裝狀態以及記數 `cnt`,當記數值歸零的時候,就更新當前的狀態。
另外由於每次要取 8 byte,透過將 `state.x` 向右位移 8 的倍數次,便可以取得每一個 byte 的狀態。其中 `state.cnt << 3` 相當於乘 8,也就是說透過 `state.cnt` 的值決定要取出 `state.x` 的哪個部份。
另外也將這個新的隨機數生成器整合至洗牌函式中
```diff
- randombytes(buf, sizeof(buf));
+ if (prng == 0)
+ randombytes(buf, sizeof(buf));
+ else
+ xor_rng(buf, sizeof(buf));
```
## 使用統計手法比較隨機數品質
在這個地方,我們使用卡方檢定以及熵來比較 linux kernel 中的 PRNG 以及我們實作的 xor shift 是否能達到足夠的亂度(各結果出現的頻率接近且無法預測其順序)。
測試程式取自於[作業說明](https://hackmd.io/@sysprog/linux2025-lab0/%2F%40sysprog%2Flinux2025-lab0-d)中的 python 程式,測試步驟如下
1. 啟動 `qtest` 並建立一個新佇列
2. 於佇列中插入資料 `1`, `2`, `3`, `4`
3. 執行 `shuffle` 命令共 1000000 次
4. 統計各結果出現的頻率
5. 計算卡方數 $X^2 = \sum_{i=1}^n {(O_i - E_i)^2 \over E_i}$
* $O_i$ 為均勻分佈下各個結果的理論值
* $E_i$ 為觀測到的數值
```
PRNG: /dev/urandom
Expectation: 41666
Observation: {'1234': 42174, '1243': 41662, '1324': 41774, '1342': 41450, '1423': 41631, '1432': 41652, '2134': 41798, '2143': 41769, '2314': 41722, '2341': 41576, '2413': 41710, '2431': 41407, '3124': 41806, '3142': 41419, '3214': 41607, '3241': 41642, '3412': 41522, '3421': 41709, '4123': 41638, '4132': 41630, '4213': 41813, '4231': 41652, '4312': 41482, '4321': 41755}
chi square sum: 14.37671002736044
PRNG: xor shift
Expectation: 41666
Observation: {'1234': 41565, '1243': 41704, '1324': 41749, '1342': 41707, '1423': 42060, '1432': 41478, '2134': 41617, '2143': 41700, '2314': 41527, '2341': 41729, '2413': 41397, '2431': 42126, '3124': 41919, '3142': 41602, '3214': 41551, '3241': 41729, '3412': 41990, '3421': 41594, '4123': 41786, '4132': 41779, '4213': 41434, '4231': 41242, '4312': 41769, '4321': 41246}
chi square sum: 27.956607305716886
```
在繼續進行卡方檢定前,先看兩組數據畫出的直方圖,可以看到資料的分佈大致上是平均的


我們算出了兩組卡方數為 14.36 以及 27.95,以常見的 $\alpha = 0.05$ 作為顯著水準,顯著水準的意義代表 [Type 1 error](https://zh.wikipedia.org/zh-tw/%E5%9E%8B%E4%B8%80%E9%8C%AF%E8%AA%A4%E8%88%87%E5%9E%8B%E4%BA%8C%E9%8C%AF%E8%AA%A4) 發生的機率(也可理解為偽陽性)。
此處需要進行查表找到 p value 為多少,p value 代表此次觀察到的結果所發生的紀律,並且與 $\alpha$ 比大小,來判斷是否能夠接受卡方檢定的虛無假說。
我們的測試資料可能的結果共有 $4! = 24$ 種,因此自由度為 23
| DF\P | 0.995 | 0.975 | 0.20 | 0.10 | 0.05 | 0.025 | 0.02 | 0.01 | 0.005 | 0.002 | 0.001 |
|----|-------|-------|-------|-------|-------|-------|-------|-------|-------|-------|-------|
| 21 | 8.034 | 10.283| 26.171| 29.615| 32.671| 35.479| 36.343| 38.932| 41.401| 44.522| 46.797|
| 22 | 8.643 | 10.982| 27.301| 30.813| 33.924| 36.781| 37.659| 40.289| 42.796| 45.962| 48.268|
| 23 | 9.260 | 11.689| 28.429| 32.007| 35.172| 38.076| 38.968| 41.638| 44.181| 47.391| 49.728|
第一組內建隨機數的卡方數為 14.36,對應的 p 值在 0.975 ~ 0.2 之間,第二組採取 xor shift 的卡方數為 27.95,對應的 p 值同樣在 0.975 ~ 0.2 之間。
因此當顯著水準為 0.05 時,不足以否定此組資料屬於均勻分佈。
另外將同一組資料拿來計算熵,兩者的數值相當接近
```
Entropy for /dev/urandom: 4.5850 bits
Entropy for xor shift: 4.5849 bits
```
距離理論值相當接近,因此可以相信此組資料具備一定的亂度。
$H = \log_2(24) \approx 4.585 \text{ bits}$
不過根據卡方檢定和熵的計算,不難發現兩者皆在乎最後的頻率,並不在意產生的順序,根據 chatgpt 的建議,這邊引入一樣測試[自我相關函數](https://zh.wikipedia.org/zh-tw/%E8%87%AA%E7%9B%B8%E5%85%B3%E5%87%BD%E6%95%B0),來驗證隨機生成的序列是否存在特定的模式,也就是 $x_t$ 與 $x_{t+k}$ 是否存在特定的關係。
自我相關函數的計算流程如下
1. 計算平均數以及變異數
2. 將原本資料扣掉平均數達到去中心化的效果
3. 計算長度 k 的子序列中的共變異數 $\gamma_k = \sum_{t=1}^{N-k} \bigl(x_t - \bar{x}\bigr) \bigl(x_{t+k} - \bar{x}\bigr)$
* 此處的 k 可自己定義大小,通常隨資料量變大或變小
4. 標準化 $gamma_k = \frac{\gamma_k}{\gamma_0}, \quad k = 0, 1, 2, \dots, k_{\max}.$
5. 在給定的信賴區間下,判斷 $\gamma_k$ 是否大於$\pm \frac{1.96}{\sqrt{N}}.$,N 為樣本數量,如果除了 $\gamma_0$ 以外都有在誤差內,那便可以相信此組數據不存在特定的模式。
進一步將結果繪製成圖後可以觀察到所 $\gamma_k$ 皆在紅線(標準)附近,因此可判斷兩組 PRNG 產生的亂數都具備一定品質。
* 此處因為標準化的計算,因此第一筆數值必定為 1.0,可將其忽略
* x 軸的 Lag 表示的為: 第 N 筆資料與 N+k 筆資料的關係,以我們的資料量 100000 筆來說,假設 k 為 2,計算的就是 $x_t$ 與 $x_t+2$ 的差異,並且從 $t = 1$ 到 $t = 100000 - 2$


## 使用 `signal` 系統呼叫判斷程式的超時
在 `qtest` 的執行中,若程式太沒有效率的話會出現下列的錯誤訊息
```
+++ TESTING trace trace-01-ops:
# Test of insert_head and remove_head
ERROR: Time limit exceeded. Either you are in an infinite loop, or your code is too inefficient
```
但此時 `qtest` 並不會卡在這個測試或者是掛掉,而是會繼續執行後續的測試項目,能夠達成這樣的效果用的便是 `signal` 的系統呼叫。
在 `qtest` 中使用了兩個 `signal` 來捕捉程式的異常行為,分別為 `SIGSEGV` 和 `SIGALRM`,前者用於偵測記憶體錯誤,而後者用於監測執行時間是否超時。
註冊一個 `signal` 需要給定對應的編號以及觸發 `signal` 時要執行的函式
```c
signal(SIGALRM, sigalrm_handler);
static void sigalrm_handler(int sig)
{
trigger_exception(
"Time limit exceeded. Either you are in an infinite loop, or your "
"code is too inefficient");
}
```
之後可以透過 `alarm(n);` 的方式,讓 kernel 在特定的時間後發出 `SIGALRM` 的 `signal`,若傳入的 `n` 為 `0` 則代表取消先前設定的數值。
查看 `qtest` 中 `do_xxx` 的程式碼中可以看到諸如此類的架構
```c
if (exception_setup(true))
q_free(current->q);
exception_cancel();
```
在 `exception_setup` 中設定了 `alarm(time_limit);` 的鬧鐘,以及使用 `sigsetjmp` 設定了一個跳轉點,若此時 `q_free` 在 `alarm` 到來前執行完成,那麼順著程式碼會執行 `exception_cancel` 這個函式,透過 `alarm(0);` 取消掉先前設置的鬧鐘。
反之若無法在 `alarm` 到來前執行完成的話,那麼根據先前註冊的 `signal` 此時會執行
```c
void trigger_exception(char *msg)
{
error_occurred = true;
error_message = msg;
if (jmp_ready)
siglongjmp(env, 1);
else
exit(1);
}
```
並透過先前設定的跳轉點,跳轉至 `exception_setup` 的下列片段
```c
if (sigsetjmp(env, 1)) {
/* Got here from longjmp */
jmp_ready = false;
if (time_limited) {
alarm(0);
time_limited = false;
}
if (error_message)
report_event(MSG_ERROR, error_message);
error_message = "";
return false;
}
```
在此便會印出錯誤訊息並回傳 `false`,所以回到 `do_free` 的函式時便會因為 `if` 的不成立而終止 `q_free` 的執行。
## `select` 系統呼叫
在 `qtest` 中有一個 `web` 命令,可啟動一個 web server,此時 `qtest` 便有了兩個來源的輸入,分別為 `stdin` 與來自 web server 的輸入,為了同時處理兩個來源,此處透過 `select` 的系統呼叫來實現。
當在 `qtest` 輸入了 `web` 命令後,`eventmux_callback` 將被 `web_eventmux` 函式給賦值。往後進入到 `line_edit` 函式時,就多了從 `web` 讀取命令的判斷。
```c
if (eventmux_callback != NULL) {
int result = eventmux_callback(l.buf, l.buflen);
if (result != 0)
return result;
}
nread = read(l.ifd, &c, 1);
if (nread <= 0)
return l.len;
```
關鍵的 `select` 系統呼叫的使用則在 `web_eventmux` 中
```c
int web_eventmux(char *buf, size_t buflen)
{
...
int result = select(max_fd + 1, &listenset, NULL, NULL, NULL);
if (result < 0)
return -1;
if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) {
FD_CLR(server_fd, &listenset);
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
int web_connfd =
accept(server_fd, (struct sockaddr *) &clientaddr, &clientlen);
char *p = web_recv(web_connfd, &clientaddr);
char *buffer = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
web_send(web_connfd, buffer);
strncpy(buf, p, buflen);
buf[buflen] = '\0';
free(p);
close(web_connfd);
return strlen(buf);
}
...
}
```
此處的 `select` 會阻塞住整個程式,直到有一個 fd 的狀態改變時才繼續執行。之後透過 `FD_ISSET` 來判斷 I/O 的來源是否是 web,如果是的話就執行 web 相對應的處理。
> On success, select() and pselect() return the number of file descriptors contained in the three returned descriptor sets (that is, the total number of bits that are set in readfds, writefds, exceptfds) which may be zero if the timeout expires before anything interesting happens. On error, -1 is returned, and errno is set appropriately; the sets and timeout become undefined, so do not rely on their contents after an error.
若來源不是 web,則回傳 `0`,回到 `line_edit` 函式後,再使用 `read` 將資料從 `stdin` 中讀取出來。