# 2025q1 Homework1 (lab0)
contributed by < `Andrushika` >
{%hackmd NrmQUGbRQWemgwPfhzXj6g %}
## 開發環境
```shell
$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
$ lscpu
架構: x86_64
CPU 作業模式: 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
供應商識別號: GenuineIntel
Model name: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
CPU 家族: 6
型號: 141
每核心執行緒數: 2
每通訊端核心數: 8
Socket(s): 1
製程: 1
CPU(s) scaling MHz: 31%
CPU max MHz: 4600.0000
CPU min MHz: 800.0000
BogoMIPS: 4608.00
Virtualization features:
虛擬: VT-x
Caches (sum of all):
L1d: 384 KiB (8 instances)
L1i: 256 KiB (8 instances)
L2: 10 MiB (8 instances)
L3: 24 MiB (1 instance)
NUMA:
NUMA 節點: 1
NUMA node0 CPU(s): 0-15
```
## `queue.[ch]` 佇列操作程式碼實作
### q_free
為預防被釋放的記憶體空間被意外存取,先使用 `q_release_element` 將 `entry` 逐一刪除;最後釋放 `head` 所使用的記憶體空間。
```c
void q_free(struct list_head *head)
{
if (!head)
return;
element_t *entry = NULL, *safe = NULL;
list_for_each_entry_safe (entry, safe, head, list) {
q_release_element(entry);
}
free(head);
}
```
### q_insert_head, q_insert_tail
#### 實作步驟
> 1. 檢查 `head` ,若為 `NULL` 直接 `return false`
> 2. 使用 `malloc` 為 `entry` 分配記憶體空間
> 3. 使用 `list_add` 或 `list_add_tail` 將節點加入鏈結串列中
因為第二個步驟(malloc entry)在兩個函式中完全重複,所以獨立寫成 `create_entry` 函式:
```c
element_t *create_entry(char *s)
{
element_t *entry = malloc(sizeof(element_t));
if (!entry)
return false;
char *s_dup = strdup(s);
if (!s_dup) {
free(entry);
return false;
}
entry->value = s_dup;
return entry;
}
```
`q_insert_head`, `q_insert_tail` 如下,可以直接呼叫 `create_entry` 使用:
```c
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *entry = create_entry(s);
list_add(&entry->list, head);
return true;
}
```
```c
bool q_insert_tail(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *entry = create_entry(s);
list_add_tail(&entry->list, head);
return true;
}
```
### q_remove_head, q_remove_tail
:::info
一開始在閱讀 ``queue.h`` 時,發現這兩個函式關於 `*sp` 的 annotation 有些奇怪:
```
q_remove_tail() - Remove the element from tail of queue
@sp: string would be inserted
```
應修正如下較貼近原意:
```
@sp: Pointer to a buffer where the removed element's string will be copied.
```
:::
#### 實作步驟
> 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return NULL`
> 2. 使用 `list_first_entry` 或 `list_last_entry` 取得欲移除的 `element_t*`
> 3. 將欲刪除的 `entry->value` 存入 `*sp`
> 4. 使用 `list_del` 將其從鏈結串列中刪除
因兩者的實作幾乎相同,在此僅附上 `q_remove_head` 的程式碼:
```c
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *entry = list_first_entry(head, element_t, list);
if (sp && bufsize > 0) {
strncpy(sp, entry->value, bufsize - 1);
sp[bufsize - 1] = '\0';
}
list_del_init(&entry->list);
return entry;
}
```
### q_delete_mid
本函式的實作難點是「找到正確的 mid node」,因為一開始沒看清楚題目的要求,所以折騰很久。
根據 queue.h 的敘述,應該刪除 `⌊n/2⌋th node`。
#### 實作步驟
> 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return false`
> 2. 使用 `q_find_mid` 搭配快慢指標技巧,找到欲刪除的中點
> 3. 使用 **Linux Kernel list API** 提供的 `list_del` 刪除節點,並釋放記憶體空間
```c
struct list_head *q_find_mid(struct list_head *head)
{
// The middle node of a linked list of size n is the
// ⌊n / 2⌋ th node from the start using 0-based indexing.
if (!head || list_empty(head))
return NULL;
// handle the 1-node case
if (list_is_singular(head))
return head->next;
// find the middle node
struct list_head *slow = head->next, *fast = slow->next->next;
while (fast != head && fast->next != head) {
slow = slow->next;
fast = fast->next->next;
}
return slow->next;
}
```
```c
bool q_delete_mid(struct list_head *head)
{
// https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/
if (!head || list_empty(head))
return false;
// find the middle node
struct list_head *mid = q_find_mid(head);
element_t *target = list_entry(mid, element_t, list);
// delete node from the list
list_del(&target->list);
q_release_element(target);
return true;
}
```
### q_delete_dup
在已排序過的鏈節串列中,刪除重複出現的節點。
#### 實作步驟
> 1. 檢查 `head` ,若為 `NULL` 或 `empty` 則 `return false`
> 2. 使用 `list_for_each_safe` 安全走訪節點(因涉及刪除操作,使用 safe mode)
> **a.** 檢查 `curr` 和 `next` 是否相同;若出現重複,則持續刪除 `next`
> **b.** 如果出現重複,`curr` 本身也需要被刪除
```c
bool q_delete_dup(struct list_head *head)
{
// https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
if (!head || list_empty(head))
return false;
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
element_t *entry = list_entry(curr, element_t, list);
if (next != head &&
strcmp(entry->value, list_entry(next, element_t, list)->value) ==
0) {
while (next != head &&
strcmp(entry->value,
list_entry(next, element_t, list)->value) == 0) {
list_del(next);
q_release_element(list_entry(next, element_t, list));
next = curr->next;
}
list_del(curr);
q_release_element(entry);
}
}
return true;
}
```
### q_swap
#### 實作步驟
>1. 檢查 `head`,若為 `NULL` 或 `empty` 則直接 `return`
>2. 使用 `list_for_each_safe` 安全走訪節點
> **a.** 取出 `curr`(當前節點)與 `next`(下一個節點)
> **b.** 若 `next` 為 `head`,表示已走訪完整個鏈結串列,則跳出迴圈
> **c.** 使用 `list_del` 將 `next` 從原位置移除
> **d.** 使用 `list_add` 將 `next` 插入到 `curr` 前一個位置(節點交換)
> **e.** 更新 `next`,使其指向 `curr->next`,確保走訪能正確進行
其中 `2.e` 步驟看起來有些微妙。
因為 `swap` 時是兩兩一組,所以會希望 iterator 一次跳躍兩個節點。
`next = curr->next` 除了可以調整 `curr`, `next` 在 `swap` 過後的相對位置外,也剛好讓 iterator 前進了一步;加上 `list_for_each_safe` 的迴圈更新條件,剛好可以讓 iterator 達成前進兩步的目標。
```c
void q_swap(struct list_head *head)
{
if (!head || list_empty(head))
return;
// https://leetcode.com/problems/swap-nodes-in-pairs/
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
if (next == head)
break;
list_del(next);
list_add(next, curr->prev);
next = curr->next;
}
}
```
### q_reverse
#### 實作步驟
>1. 檢查 `head`,若為 `NULL` 或 `empty` 則直接 `return`
>2. 使用 `list_for_each_safe` 安全走訪節點
>3. 依序把節點移除,並插入鏈結串列的最前方
實際上就像當作 stack 進行操作,就可以達到翻轉鏈節串列的效果。
```c
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
list_del(curr);
list_add(curr, head);
}
}
```
### q_reverseK
這個函式的實作,我參照了先前 `reverse` 的實作方式:
* 不斷將新節點插入鏈節串列的最前方,來達成翻轉的效果。
* 不同的是,因為是以 `k` 個節點為一組進行翻轉,在每次走訪時,用一個 `count` 來紀錄當前累積走過了多少節點;當 `count == k` 就立刻進行翻轉操作。
* 雖仿造 `stack` , 但插入點不會永遠都在 `head` ,所以必須不斷紀錄新的插入點。
#### 變數說明
`pending`:下一輪即將進行 reverse 操作的節點
`last`:進行局部 reverse 操作時,`pending` 的插入點
`curr`, `next`:提供給 `list_for_each_safe` 走訪使用的 iterator
`count`:紀錄當前組別已累積走過多少個 `node`
```c
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (!head || list_empty(head) || k <= 1)
return;
// nodes waiting to be add & start add point
struct list_head *pending = head->next, *last = head;
struct list_head *curr, *next, *next_pend;
int count = 0;
list_for_each_safe (curr, next, head) {
if (++count >= k) {
while (count) {
count--;
next_pend = pending->next;
list_del(pending);
list_add(pending, last);
pending = next_pend;
}
last = next->prev;
}
}
}
```
### merge_two_sorted_list (utils)
因為後續的 sort, merge 都會涉及合併兩個「已排列鏈節串列」,所以先寫成一個工具函式來簡化操作。需要注意的是,傳入這個函式處理的 `list` 要先打斷 circular list 的頭尾連結,且不需要將 `guard node` 傳入。
#### 實作步驟
>1. 初始化 `guard` 節點,`tail` 指向 `guard` 以追蹤新鏈結串列的最後節點。
>2. 合併兩個排序鏈結串列,依 `q_cmp()` 比較 `a` 和 `b`,將較小者連接到 `tail`,並前進到下一節點。
>3. 處理剩餘節點,若 `a` 或 `b` 尚未走訪完,直接連接至 `tail`。
>4. 回傳排序後的鏈結串列起點(`guard.next`)。
:::info
這裡的實作比較冗長,且課程當中已有提及類似程式,故不在此詳細列出。
詳情請見 [github repo](https://github.com/andrew99154/lab0-c/blob/master/queue.c#L253)!
:::
### q_sort
#### top-down merge sort 的問題
* 當遇到 `size=2M` 的 test case,會一直出現程式執行時間過長的提示。
* 主要是因為在分割鏈結串列時,會使用到前面提及的 `q_find_mid`,其時間複雜度為 O(n)。
* 且儲存分割完成的鏈結串列也需要額外的記憶體空間,故改採使用 bottom-up 方式實作。
#### bottom-up merge sort 實作
* 概念很簡單:每次造訪一個節點,當遇到可以合併的(`size` 相同的兩個 `list`),就馬上進行合併。
* 全部的節點走訪完,再把剩下未合併的 `list` 逐一合併。
* 合併可以直接呼叫之前實作的 `merge_two_sorted_list`。
* 不必找 `list` 分割中點,省下了大把時間。
會需要額外使用一個 `part[]` 來存放部份合併完成的 `list`。
`part[]` 有特殊的存放規則:`part[n]` 中若不為 `empty`,就是大小為 $2^n$ 的 `list`
(這樣一來,檢查是否存在 `size` 相同的 `list` 就相當方便!)
```c
void q_sort(struct list_head *head, bool descend)
{
struct list_head *list;
struct list_head *part[65];
int level;
if (!head || list_empty(head) || list_is_singular(head))
return;
// break circular
head->prev->next = NULL;
head->prev = NULL;
list = head->next;
INIT_LIST_HEAD(head);
// init part[]
memset(part, 0, sizeof(part));
while (list) {
struct list_head *curr = list;
struct list_head *next = list->next;
curr->prev = NULL;
curr->next = NULL;
list = next;
// the number of nodes in part[] will always be 0 or 2^level
// [1, 2, 4, 8, 16...] or [0, 2, 0, 0, 16] are acceptable
// depends on level (idx)
// keep merging until unable to merge
for (level = 0; part[level]; level++) {
curr = merge_two_sorted_list(descend, part[level], curr);
part[level] = NULL;
}
part[level] = curr;
}
list = NULL;
for (level = 0; level < (int) (sizeof(part) / sizeof(part[0])); level++) {
if (!part[level])
continue;
list = merge_two_sorted_list(descend, part[level], list);
}
// reconstruct list
while (list) {
struct list_head *next = list->next;
list_add_tail(list, head);
list = next;
}
}
```
:::info
此處排序時是使用 `strcmp` ,所以節點是依照字典序 ([Lexicographic order](https://en.wikipedia.org/wiki/Lexicographic_order)) 排列。
e.g.
如果對 `[17, 2, 189, 38]` 進行字典序排列:
直覺上的數字由小到大排列結果,會是 `[2, 17, 38, 189]` ( X )
但實際的字典序排列結果為: `[17, 189, 2, 38]` ( O )
:::
### q_ascend, q_descend
本函式將會維護一個單調嚴格遞增(or 遞減)的 `list`。
在非鏈結串列的情境下,可以透過 `stack` 達成:
* 每次插入新數值到 `list` 時,和 `stack` 頂部的節點比大小
* 若 `stack` 頂部節點會違反遞增 or 遞減的條件,則持續 `pop stack` 直到符合條件為止
事實上,即使不使用 `stack`,也可以在 doubly linked list 上實作這個函式。
透過維護一個 `*tail`,其實際效用和 `stack.top()` 類似,代表維護的單調序列的最後一個節點位置。
因兩者實作幾乎相同,僅附上 `q_ascend` 作為參考:
```c
int q_ascend(struct list_head *head)
{
// https://leetcode.com/problems/remove-nodes-from-linked-list/
if (!head || list_empty(head))
return 0;
if (list_is_singular(head))
return 1;
struct list_head *tail = head->next, *curr, *tmp;
list_for_each (curr, head) {
while (tail != head &&
strcmp(list_entry(curr, element_t, list)->value,
list_entry(tail, element_t, list)->value) < 0) {
tmp = tail->prev;
list_del(tail);
q_release_element(list_entry(tail, element_t, list));
tail = tmp;
}
tail = curr;
}
return q_size(head);
}
```
### q_merge
#### 實作步驟
>1. 檢查 `head` 是否為 `NULL` or `empty`,若是則直接回傳 0
>2. 初始化變數,設定 `curr`(當前 queue),`result`(合併後的結果)
>3. 使用 `list_for_each_entry` 走訪 `head` 中的每個 `queue_context_t`,進行合併
>a. 斷開 `curr->q`(原為 circular list),確保 `merge_two_sorted_list()` 正常運作
>b. 將 `curr->q` 併入 `result`,維持 `result` 為當前已排序的結果
>c. `curr->q->next = curr->q;` 清除 `q` 之中已經排序完成的節點
>4. 將合併後的結果存入第一個 `queue`
>a. 取 `head->next` 作為 `ans_entry`,初始化 `ans_entry->q`
>b. 使用 `list_add_tail` 逐一將`result` 中的節點加入 `ans_entry->q`,重建合併完成的 queue
>5. 回傳合併後的 `q_size(ans_entry->q)`
``` c
int q_merge(struct list_head *head, bool descend)
{
// https://leetcode.com/problems/merge-k-sorted-lists/
if (!head || list_empty(head))
return 0;
queue_contex_t *curr = NULL, *ans_entry;
struct list_head *result = NULL;
list_for_each_entry (curr, head, chain) {
curr->q->prev->next = NULL;
result = merge_two_sorted_list(descend, result, curr->q->next);
curr->q->next = curr->q;
}
ans_entry = list_entry(head->next, queue_contex_t, chain);
INIT_LIST_HEAD(ans_entry->q);
while (result) {
struct list_head *next = result->next;
list_add_tail(result, ans_entry->q);
result = next;
}
return q_size(ans_entry->q);
}
```
## 研讀 Linux 核心原始碼 list_sort.c
### 引入 `list_sort.c`
> 以 2023 年的作業觀摩中 [Risheng1128](https://hackmd.io/@wanghanchi/linux2023-lab0#%E5%B0%87-list_sortc-%E5%8A%A0%E5%85%A5%E5%B0%88%E6%A1%88) 同學的實作方式為參考
首先,加入 `list_sort.c` 和 `list_sort.h` 到專案中。
* 在 linux kernel 中實作的版本,會傳入 `void *priv` 參數,而在 `lab0-c` 中都用不到,可以直接拿掉
* 需要使 `cmp` 函式有接收 `descend` 參數的功能,原先想要直接偷 `queue.c` 裡之前做的來用;但因 `queue.h` 不允許修改,所以直接複製一份 `q_cmp` 到 `list_sort.c` 中
更動範例如下(僅舉例,其他省略,請參閱 repo):
```diff
- static struct list_head *merge(void *priv, list_cmp_func_t cmp,
- struct list_head *a, struct list_head *b)
+ static struct list_head *merge(struct list_head *a, struct list_head *b, int descend)
```
為了後續測試時切換方便,在 qtest.c 的 console_init() 中加入 `list_sort` param,讓使用者可以直接指定要使用的排序演算法。
```c
add_param("listsort", &use_list_sort, "use linux kernel style sorting algorithm from lib/list_sort.c", NULL);
```
在 `do_sort()` 實作的地方,依據 `use_list_sort`(option 會改變的變數),去判斷當前應該使用哪個排序演算法:
```c
if (current && exception_setup(true)){
if (use_list_sort)
list_sort(current->q, descend);
else
q_sort(current->q, descend);
}
```
### 與我自己的 `q_sort()` 進行效能比較
首先,在 `trace/` 目錄下新增 `trace-sort.cmd`:
(根據需要,可以自行選擇要測試的 sorting 模式、資料量等等)
```
option listsort 1
new
ih RAND 1000000
time
sort
time
```
上方將會選用 lib/list_sort.c 中的演算法,執行 `1000000` 次插入操作後,進行排序與計時。
因為要測試的數據量比較大,會觸發到 harness.c 中定義的 alarm 觸發時間,而造成程式直接中斷;為了測試方便,先給一個比較寬容的 `time_limit`。
```c
// harness.c
static int time_limit = 10;
```
接下來,在 `script/` 目錄下新增用來統計排序執行時間的 `test_sort_perf.py`:
```python
import subprocess
import re
NUM_RUNS = 10
delta_times = []
for i in range(NUM_RUNS):
result = subprocess.run(["./qtest", "-f", "traces/trace-sort.cmd"],
capture_output=True, text=True)
output = result.stdout
matches = re.findall(r"Delta time\s*=\s*([\d\.]+)", output)
if len(matches) < 2:
print(f"{i+1}th test: No Delta time finded.")
continue
delta_time = float(matches[1])
delta_times.append(delta_time)
print(f"{i+1}th test: Delta time = {delta_time}")
if delta_times:
avg = sum(delta_times) / len(delta_times)
print(f"\naverage Delta time of {len(delta_times)} of sorting: {avg}")
else:
print("No Delta time finded.")
```
這段 script 將會連續執行 `NUM_RUNS` 次(預設為`10`)排序,並用 list 紀錄下每次排序花費的時間,最終對所有秒數取平均作為結果。
#### 測試結果
<div style="display: flex;">
<div style="margin-right: 20px;">
##### 1M 筆資料
| Round | q_sort | list_sort |
| -------- | -------- | -------- |
| 1 | 2.722 | 2.582 |
| 2 | 2.74 | 2.614 |
| 3 | 2.629 | 2.459 |
| 4 | 2.66 | 2.447 |
| 5 | 2.741 | 2.386 |
| 6 | 2.605 | 2.497 |
| 7 | 2.782 | 2.471 |
| 8 | 2.762 | 2.443 |
| 9 | 2.582 | 2.396 |
| 10 | 2.592 | 2.393 |
| **AVG** | **2.6815** | **2.4688** |
</div>
<div>
##### 2M 筆資料
| Round | q_sort | list_sort |
| -------- | -------- | -------- |
| 1 | 6.075 | 5.567 |
| 2 | 5.812 | 5.564 |
| 3 | 5.84 | 5.724 |
| 4 | 5.978 | 5.823 |
| 5 | 5.845 | 5.494 |
| 6 | 6.351 | 5.647 |
| 7 | 6.15 | 5.672 |
| 8 | 5.817 | 5.565 |
| 9 | 6.18 | 5.727 |
| 10 | 5.76 | 5.736 |
| **AVG** | **5.98** | **5.652** |
</div>
</div>
可以觀察到:
* 在資料量 1M 時,list_sort 比我的 q_sort 快大約 `0.22` 秒
* 在資料量 2M 時,list_sort 比我的 q_sort 快大約 `0.33` 秒
## 解釋 select 系統呼叫於本程式的使用
### console.c 處理 web 指令
當使用者輸入 command `web` 之後,將會建立 server socket,並使 `linenoise.c` 裡面的 `eventmux_callback` 的 function pointer 指向 `web_eventmux()`。
```c
static bool do_web(int argc, char *argv[])
{
// init... (ignore)
web_fd = web_open(port);
if (web_fd > 0) {
printf("listen on port %d, fd is %d\n", port, web_fd);
line_set_eventmux_callback(web_eventmux);
use_linenoise = false;
} else {
// handle error
}
return true;
}
```
### linenoise.c 中的 `line_edit()`
在 `line_edit()` 中使用 `while` 讀取輸入的地方,可以看到幾行程式碼:
```c
while (1) {
// init...(ignore)
if (eventmux_callback != NULL) {
int result = eventmux_callback(l.buf);
if (result != 0)
return result;
}
// handle input from command line...(ignore)
}
```
`eventmux_callback` 就是先前在 command line 執行 `web` 所設置的 function pointer,可以用來 handle 來自 web server 的命令。
### web.c 中的 `web_eventmux()`
這裡是主要使用系統呼叫 `select()` 的地方。`select()` 採用了 I/O Multiplexing Model,參考下圖:

`web_eventmux()` 中同時監聽來自 web server 和 stdin 的 file descriptor,任何一方觸發都會使程式繼續執行下去。
本函式主要 handle 來自 web server 的事件;因此,若是 stdin 觸發監聽,將不會進行對命令的處理,而是回到 linenoise.c 中的 `line_edit()` 才處理。
```c
int web_eventmux(char *buf)
{
fd_set listenset;
FD_ZERO(&listenset);
FD_SET(STDIN_FILENO, &listenset);
int max_fd = STDIN_FILENO;
if (server_fd > 0) {
FD_SET(server_fd, &listenset);
max_fd = max_fd > server_fd ? max_fd : server_fd;
}
int result = select(max_fd + 1, &listenset, NULL, NULL, NULL);
if (result < 0)
return -1;
if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) {
//handle cmd from web server...(ignore)
return strlen(buf);
}
FD_CLR(STDIN_FILENO, &listenset);
return 0;
}
```
### 運用 RIO 套件的考量點
RIO 全名為 Robust I/O,根據[卡內基美隆大學的課程](https://(https://scs.hosted.panopto.com/Panopto/Pages/Viewer.aspx?id=f107c2ce-79d5-4529-baeb-2bb495d8c11a))中的介紹,RIO 可以有效解決 short count 的問題。除此之外,因為每次呼叫 `read()` 都需要從 user mode 切換到 kernel mode,如果頻繁呼叫會大幅降低效能。
RIO 套件的做法是:當系統呼叫 `read()` 時,一次性讀取一大塊資料到 user space buffer 中 (同時處理 short count),後續再多次從 buffer 取用即可;當讀到 buffer 為空時,才再次系統呼叫。
#### `rio_t` 結構
其中 `count` 代表著 buffer 中尚未被取用的資料長度。
所以 `count <= 0` 時,代表 buffer 已經空了,需要再次 `read()`。
```c
typedef struct {
int fd; /* descriptor for this buf */
int count; /* unread byte in this buf */
char *bufptr; /* next unread byte in this buf */
char buf[BUFSIZE]; /* internal buffer */
} rio_t;
```
#### `rio_read()`
```c
static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n)
{
int cnt;
while (rp->count <= 0) { /* refill if buf is empty */
rp->count = read(rp->fd, rp->buf, sizeof(rp->buf));
if (rp->count < 0) {
if (errno != EINTR) /* interrupted by sig handler return */
return -1;
} else if (rp->count == 0) { /* EOF */
return 0;
} else
rp->bufptr = rp->buf; /* reset buffer ptr */
}
/* Copy min(n, rp->count) bytes from internal buf to user buf */
// 處理 short count
cnt = n;
if (rp->count < n)
cnt = rp->count;
memcpy(usrbuf, rp->bufptr, cnt);
rp->bufptr += cnt;
rp->count -= cnt;
return cnt;
}
```
#### console.c 中很酷的 `buf_stack`
一開始讀到以下程式碼,覺得很困惑,什麼情景下會用到這個?
```c
static rio_t *buf_stack;
//
/* Create new buffer for named file.
* Name == NULL for stdin.
* Return true if successful.
*/
static bool push_file(char *fname)
{
int fd = fname ? open(fname, O_RDONLY) : STDIN_FILENO;
has_infile = fname ? true : false;
if (fd < 0)
return false;
if (fd > fd_max)
fd_max = fd;
rio_t *rnew = malloc_or_fail(sizeof(rio_t), "push_file");
rnew->fd = fd;
rnew->count = 0;
rnew->bufptr = rnew->buf;
rnew->prev = buf_stack;
buf_stack = rnew;
return true;
}
/* Pop a file buffer from stack */
static void pop_file()
{
if (buf_stack) {
rio_t *rsave = buf_stack;
buf_stack = rsave->prev;
close(rsave->fd);
free_block(rsave, sizeof(rio_t));
}
}
```
後來找到使用了 `push_file()` 的地方:
```c
static bool do_source(int argc, char *argv[])
{
// error hadling...(ignore)
if (!push_file(argv[1])) {
report(1, "Could not open source file '%s'", argv[1]);
return false;
}
return true;
}
```
發現原來是執行 `source` 的時候會用到,是為了處理 nested `source` commands 的情況。
舉例來說,以下有三個 file,將執行一些命令:
##### fileA
```shell
> source fileB
> it 6
```
##### fileB
```shell
> source fileC
> it 3
```
##### fileC
```shell
> new
> it 8
```
##### result
```c
q = [8, 3, 6]
```
如果使用者執行 `./qtest` 後輸入 `source fileA`,就會造成遞迴式的開檔、執行,而先前宣告的 `buf_stack` 就是為了因應這種情況而存在。
每當讀到 `source`,代表需要開新的檔案;新增、初始化新檔案對應的 `rio_t` 之後,將其 `push` 到 `buf_stack` 中;直到檔案內容被讀完就 `pop_file()`,就會回到先前讀到一半的檔案狀態。
可以對應到 console.c 中 `readline()` 的其中一段程式碼:
(系統呼叫 `read()` 之後,發現沒有剩餘的可讀資料,代表 EOF)
```c
if (buf_stack->count <= 0) {
/* Need to read from input file */
buf_stack->count = read(buf_stack->fd, buf_stack->buf, RIO_BUFSIZE);
buf_stack->bufptr = buf_stack->buf;
if (buf_stack->count <= 0) {
/* Encountered EOF */
pop_file();
if (cnt > 0) {
/* Last line of file did not terminate with newline. */
/* Terminate line & return it */
*lptr++ = '\n';
*lptr++ = '\0';
if (echo) {
report_noreturn(1, prompt);
report_noreturn(1, linebuf);
}
return linebuf;
}
return NULL;
}
}
```
### 可能的改善方向
當我在閱讀這段 code 的時候,覺得有些不直覺。目前的實作會在 `line_edit()` 裡面同時檢查來自 web server 的 request。
在 OOP 中有所謂的 [SOLID 原則](https://en.wikipedia.org/wiki/SOLID)(即便 C 並非 OO),其中的[單一職責原則 (Single-resbonsibility principle)](https://en.wikipedia.org/wiki/Single-responsibility_principle) 提到,函式應將關注點分離,每個函式只專注做一件事情。
因此,`line_edit()` 是否應該專注於處理來自 command line 的命令?
又剛好看到老師老師撰寫的[並行程式設計:排程器](https://hackmd.io/@sysprog/concurrency-sched),裡面的 `event_loop()` 實作給了我一些啟發:

我想可以參考這樣的函式呼叫方式,但不以 coroutine 的方式實作。
是否有可能仿照,在 `main_loop()` 中僅去呼叫 `select()`;當 web server 或 command line 事件被觸發時,才去呼叫對應的 handling 函式?
## 實作 shuffle 命令
在 `queue.c` 裡新增了 `q_shuffle`,實作參考的是 Fisher–Yates shuffle 演算法。
考量到原演算法中有多次存取陣列內容、swap 操作,而我需要對 doubly linked list 進行 `shuffle`;在不以其他資料結構加速存取過程的狀況下,查找特定節點的時間複雜度為 `O(n)`。
所以實作時佔以一個 `struct list_head **arr` 存下各節點的位置,待 `shuffle` 操作執行完畢後,再逐一將節點串起來。
### q_shuffle 實作
```c
void q_shuffle(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
int len = q_size(head), i = 0;
struct list_head **arr = malloc(len * sizeof(struct list_head *));
struct list_head *curr, *tmp;
list_for_each (curr, head) {
arr[i++] = curr;
}
// pick a node randomly and swap
for (i = len - 1; i > 0; i--) {
int j = rand() % (i + 1);
tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
}
// reconstruct the doubly linked list
for (i = 0; i + 1 < len; i++) {
arr[i]->next = arr[i + 1];
arr[i + 1]->prev = arr[i];
}
head->next = arr[0];
arr[0]->prev = head;
head->prev = arr[len - 1];
arr[len - 1]->next = head;
free(arr);
}
```
### 以 Chi-squared test 檢測「亂度」
為了要檢測每次的 `shuffle` 是否足夠隨機,可以套用假設檢定的技巧來判斷。
> 所謂「夠亂」,可以視為在做了足夠多次 shuffle 後,每組排列組合出現的機率相等(Uniform Distribution)。
為此,可以使用卡方檢定。該檢定可以用於判斷兩機率分佈是否相似。基於作業說明中提供的測試用 script,進行 `shuffle` 操作 `1000000` 次,得到各排列組合的次數分佈如下:

#### Chi-sqaured value 公式
```python
def chiSquared(observation, expectation):
return ((observation - expectation) ** 2) / expectation
```
#### 實驗數據
自由度:`23`(`4` 個數字共有 `4! = 24` 種排列組合,自由度 = `N-1`)
期望頻率:`1000000 / 24 = 41666`
Chi-squared value sum: `24.662986607785726`
p-value: `0.3678926504294987`
因 p-value 大於顯著水準 (alpha = 0.05),無法拒絕 $H_0$
($H_0$:shuffle 結果之分佈**不為** uniform distribution)
| 排列組合 | 出現頻率 | Chi-squared value |
| -------- | -------- | -------- |
| 1234 | 41488 | 0.7604281668506696 |
| 1243 | 42096 | 4.437671002736044 |
| 1324 | 41581 | 0.17340277444439112 |
| 1342 | 41835 | 0.6854749675994816 |
| 1423 | 41853 | 0.8392694283108529 |
| 1432 | 41970 | 2.218019488311813 |
| 2134 | 41631 | 0.02940047040752652 |
| 2143 | 41550 | 0.322949167186675 |
| 2314 | 41584 | 0.16137858205731292 |
| 2341 | 41872 | 1.018480295684731 |
| 2413 | 41318 | 2.906542504680075 |
| 2431 | 41879 | 1.0888734219747516 |
| 3124 | 41698 | 0.024576393222291555 |
| 3142 | 41459 | 1.0283924542792684 |
| 3214 | 41558 | 0.2799404790476648 |
| 3241 | 41873 | 1.0283924542792684 |
| 3412 | 41534 | 0.41818269092305477 |
| 3421 | 41842 | 0.7434358949743196 |
| 4123 | 41823 | 0.5915854653674458 |
| 4132 | 41516 | 0.5400086401382422 |
| 4213 | 41787 | 0.35138962223395576 |
| 4231 | 41521 | 0.5046080737291797 |
| 4312 | 41429 | 1.3480775692411078 |
| 4321 | 41303 | 3.1625066001056017 |
### 改善作業說明所提供的 test script
作業說明中的 script 存在小缺陷,導致執行效率極差,問題出在模擬輸入 command 的地方:
```python
test_count = 1000000
input = "new\nit 1\nit 2\nit 3\nit 4\n"
for i in range(test_count):
input += "shuffle\n"
input += "free\nquit\n"
```
在 python 中的字串處理,直接使用 `+=` 進行操作,實際上是重複 allocate 新記憶體空間、複製舊字串、貼上新字串;時間複雜度可達到 $O(n^2)$。
用以下方式改善:
```python
input_commands = ["new\n", "it 1\n", "it 2\n", "it 3\n", "it 4\n"]
input_commands.extend(["shuffle\n"] * test_count)
input_commands.append("free\nquit\n")
input = "".join(input_commands)
```
先對 `list` 進行新增重複性高的 command substring,再一次性使用 `join()` 建立字串,可以省下不必要的 allocate 操作。
## 研讀論文〈Dude, is my code constant time?〉
:::info
原論文:[Dude, is my code constant time?](https://eprint.iacr.org/2016/1123.pdf)
Dudect 工具:https://github.com/oreparaz/dudect
:::
作者提出了一種使用統計學上「假設檢定」的方式,來檢測函式是否為常數時間複雜度。可以大致總結為以下步驟:
1. **測量函式執行時間**
首先將 input data 分為兩大類:`fix class`, `random class`
- `fix class` 的資料內容固定,且可以在常數執行時間完成
- `random class` 為隨機產生
把兩類資料餵給欲測試的函式,並分類記錄下每次函式執行所花的時間。
(在實作中,執行時間使用 `cpu_cycles()` 測量)
2. **資料後處理 (post-processing)**
測量結果常會受到外部環境影響(如 OS interrupt),而出現異常極端值。因此,對數據後處理可以提升分析的精準度。
在[原專案](https://github.com/oreparaz/dudect)中,會給定一系列的裁切比例,將測量值較高的部分數據丟棄。
3. **套用假設檢定**
使用 Welch’s t-test,檢測剛剛獲得的兩類資料的執行時間分佈是否相似。如果 `random class` 和 `fix class` 的執行時間分布不同,則可以合理推斷該函式「非」常數時間複雜度。
### 原專案的實作步驟
1. 統計一函式的執行時間
- 使用 `cpu_cycles()` 取得 `before_ticks` 和 `after_ticks`
- `exec_time = after_ticks - before_ticks`
3. 用測得的時間資料,透過 `t_push()` 在線更新 `mean`, `m2`, `n`
4. 呼叫 `t_compute()` 透過 `mean`, `m2`, `n` 計算 `t_value`
5. 使用 `t_value` 判斷兩個 `class` 的分佈是否存在顯著差異,若 `abs(t_value)` 高於一定 threshold,則可合理判斷兩者分佈不同
#### t_push
每次計算完 `exec_time` 都會將其加入統計資料中。
此時需要使用線上更新的方式,對 `mean`, `m2`, `n` 做更新。
##### online 數值更新公式
###### n 更新
$$n \leftarrow n+1$$
###### delta 更新
$$\delta \leftarrow x-\bar{x}_{old}$$
###### 平均數 mean 更新
$$ \bar{x}_{new} \leftarrow \bar{x}_{old}+\frac{\delta}{n}$$
###### 累積變異數 $M_2$ 更新
$$M_{2,new} \leftarrow M_{2,old}+\delta(x-\bar{x}_{new})$$
```c
void t_push(t_context_t *ctx, double x, uint8_t class)
{
assert(class == 0 || class == 1);
ctx->n[class]++;
/* Welford method for computing online variance
* in a numerically stable way.
*/
double delta = x - ctx->mean[class];
ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class];
ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]);
}
```
#### t_compute
利用先前統計資料計算 t-value,公式如下:
$$t=\frac{\bar{X}_0-\bar{X}_1}{\sqrt{\frac{{s_0}^2}{N_0}-\frac{{s_1}^2}{N_1}}}$$
$S_0$, $S_1$ 可以先前得到的累積變異數 $M_2$ 求得:
$$s^2 = \frac{M_2}{n-1}$$
```c
double t_compute(t_context_t *ctx)
{
double var[2] = {0.0, 0.0};
var[0] = ctx->m2[0] / (ctx->n[0] - 1);
var[1] = ctx->m2[1] / (ctx->n[1] - 1);
double num = (ctx->mean[0] - ctx->mean[1]);
double den = sqrt(var[0] / ctx->n[0] + var[1] / ctx->n[1]);
double t_value = num / den;
return t_value;
}
```
### `lab0-c` 中的 `dudect` 現存問題
#### 本機端與 github runner 的測試結果不同
> 此節屬於個人猜測,真正的原因仍須更完整的驗證
在本機端執行 `make test` 時,都能全部通過(`100/100`)。但推送到 github 時,會發生最後一項 test complexity 無法通過的狀況(`95/100`)。
(甚至會發生 `insert_tail` 通過,但 `insert_head` 沒有通過的奇怪現象)
我認為可能是本地端的執行環境和 github runner 環境不同所造成,在虛擬環境下的 `exec_times` 資料可能包含更多雜訊(如 OS interrupt 等),測量結果不準確。
#### 缺乏 post-processing 的 `percentile()` 實作
根據作業說明,在 `lab0-c` 未有完整實作 `percentile` 功能。
在計算 `t-value` 時丟棄部份 `exec_time` 極端值,可以更精確的確認 `random class` 和 `fix class` 的分佈是否相似。
- 解決方式:給定多筆指定的 `percentile`,並同時維護不同 `percentile` 下的 `t-value`。這些 `t-values` 將用於進一步判斷函式是否為常數時間複雜度。
- e.g. 給定 $100$ 筆 `percentile`,會需要維護 $100+1$ 個 `t-value`
(加上未經極端值處理的 raw data)
:::info
心中未解的疑惑:
目前的實作是對所有 `t-values` 取最大值,但這樣是否有助於改善極端值所帶來的影響?極端值仍會影響到那組未 crop 的 raw data;而該組 `t-value` 也可以去影響到 `max t-value` 的計算結果。
在 lab0-c 裡引入 percentile 實作後,可以順利通過全部的測試 `(100/100)`。
但為何可以成功改善,我還搞不懂,仍須進一步了解。
:::