contributed by < Andrushika
>
$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
$ lscpu
架構: x86_64
CPU 作業模式: 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
供應商識別號: GenuineIntel
Model name: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
CPU 家族: 6
型號: 141
每核心執行緒數: 2
每通訊端核心數: 8
Socket(s): 1
製程: 1
CPU(s) scaling MHz: 31%
CPU max MHz: 4600.0000
CPU min MHz: 800.0000
BogoMIPS: 4608.00
Virtualization features:
虛擬: VT-x
Caches (sum of all):
L1d: 384 KiB (8 instances)
L1i: 256 KiB (8 instances)
L2: 10 MiB (8 instances)
L3: 24 MiB (1 instance)
NUMA:
NUMA 節點: 1
NUMA node0 CPU(s): 0-15
queue.[ch]
佇列操作程式碼實作為預防被釋放的記憶體空間被意外存取,先使用 q_release_element
將 entry
逐一刪除;最後釋放 head
所使用的記憶體空間。
void q_free(struct list_head *head)
{
if (!head)
return;
element_t *entry = NULL, *safe = NULL;
list_for_each_entry_safe (entry, safe, head, list) {
q_release_element(entry);
}
free(head);
}
- 檢查
head
,若為NULL
直接return false
- 使用
malloc
為entry
分配記憶體空間- 使用
list_add
或list_add_tail
將節點加入鏈結串列中
因為第二個步驟(malloc entry)在兩個函式中完全重複,所以獨立寫成 create_entry
函式:
element_t *create_entry(char *s)
{
element_t *entry = malloc(sizeof(element_t));
if (!entry)
return false;
char *s_dup = strdup(s);
if (!s_dup) {
free(entry);
return false;
}
entry->value = s_dup;
return entry;
}
q_insert_head
, q_insert_tail
如下,可以直接呼叫 create_entry
使用:
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *entry = create_entry(s);
list_add(&entry->list, head);
return true;
}
bool q_insert_tail(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *entry = create_entry(s);
list_add_tail(&entry->list, head);
return true;
}
一開始在閱讀 queue.h
時,發現這兩個函式關於 *sp
的 annotation 有些奇怪:
q_remove_tail() - Remove the element from tail of queue
@sp: string would be inserted
應修正如下較貼近原意:
@sp: Pointer to a buffer where the removed element's string will be copied.
- 檢查
head
,若為NULL
或empty
則return NULL
- 使用
list_first_entry
或list_last_entry
取得欲移除的element_t*
- 將欲刪除的
entry->value
存入*sp
- 使用
list_del
將其從鏈結串列中刪除
因兩者的實作幾乎相同,在此僅附上 q_remove_head
的程式碼:
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *entry = list_first_entry(head, element_t, list);
if (sp && bufsize > 0) {
strncpy(sp, entry->value, bufsize - 1);
sp[bufsize - 1] = '\0';
}
list_del_init(&entry->list);
return entry;
}
本函式的實作難點是「找到正確的 mid node」,因為一開始沒看清楚題目的要求,所以折騰很久。
根據 queue.h 的敘述,應該刪除 ⌊n/2⌋th node
。
- 檢查
head
,若為NULL
或empty
則return false
- 使用
q_find_mid
搭配快慢指標技巧,找到欲刪除的中點- 使用 Linux Kernel list API 提供的
list_del
刪除節點,並釋放記憶體空間
struct list_head *q_find_mid(struct list_head *head)
{
// The middle node of a linked list of size n is the
// ⌊n / 2⌋ th node from the start using 0-based indexing.
if (!head || list_empty(head))
return NULL;
// handle the 1-node case
if (list_is_singular(head))
return head->next;
// find the middle node
struct list_head *slow = head->next, *fast = slow->next->next;
while (fast != head && fast->next != head) {
slow = slow->next;
fast = fast->next->next;
}
return slow->next;
}
bool q_delete_mid(struct list_head *head)
{
// https://leetcode.com/problems/delete-the-middle-node-of-a-linked-list/
if (!head || list_empty(head))
return false;
// find the middle node
struct list_head *mid = q_find_mid(head);
element_t *target = list_entry(mid, element_t, list);
// delete node from the list
list_del(&target->list);
q_release_element(target);
return true;
}
在已排序過的鏈節串列中,刪除重複出現的節點。
- 檢查
head
,若為NULL
或empty
則return false
- 使用
list_for_each_safe
安全走訪節點(因涉及刪除操作,使用 safe mode)
a. 檢查curr
和next
是否相同;若出現重複,則持續刪除next
b. 如果出現重複,curr
本身也需要被刪除
bool q_delete_dup(struct list_head *head)
{
// https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
if (!head || list_empty(head))
return false;
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
element_t *entry = list_entry(curr, element_t, list);
if (next != head &&
strcmp(entry->value, list_entry(next, element_t, list)->value) ==
0) {
while (next != head &&
strcmp(entry->value,
list_entry(next, element_t, list)->value) == 0) {
list_del(next);
q_release_element(list_entry(next, element_t, list));
next = curr->next;
}
list_del(curr);
q_release_element(entry);
}
}
return true;
}
- 檢查
head
,若為NULL
或empty
則直接return
- 使用
list_for_each_safe
安全走訪節點
a. 取出curr
(當前節點)與next
(下一個節點)
b. 若next
為head
,表示已走訪完整個鏈結串列,則跳出迴圈
c. 使用list_del
將next
從原位置移除
d. 使用list_add
將next
插入到curr
前一個位置(節點交換)
e. 更新next
,使其指向curr->next
,確保走訪能正確進行
其中 2.e
步驟看起來有些微妙。
因為 swap
時是兩兩一組,所以會希望 iterator 一次跳躍兩個節點。
next = curr->next
除了可以調整 curr
, next
在 swap
過後的相對位置外,也剛好讓 iterator 前進了一步;加上 list_for_each_safe
的迴圈更新條件,剛好可以讓 iterator 達成前進兩步的目標。
void q_swap(struct list_head *head)
{
if (!head || list_empty(head))
return;
// https://leetcode.com/problems/swap-nodes-in-pairs/
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
if (next == head)
break;
list_del(next);
list_add(next, curr->prev);
next = curr->next;
}
}
- 檢查
head
,若為NULL
或empty
則直接return
- 使用
list_for_each_safe
安全走訪節點- 依序把節點移除,並插入鏈結串列的最前方
實際上就像當作 stack 進行操作,就可以達到翻轉鏈節串列的效果。
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head))
return;
struct list_head *curr, *next;
list_for_each_safe (curr, next, head) {
list_del(curr);
list_add(curr, head);
}
}
這個函式的實作,我參照了先前 reverse
的實作方式:
k
個節點為一組進行翻轉,在每次走訪時,用一個 count
來紀錄當前累積走過了多少節點;當 count == k
就立刻進行翻轉操作。stack
, 但插入點不會永遠都在 head
,所以必須不斷紀錄新的插入點。pending
:下一輪即將進行 reverse 操作的節點
last
:進行局部 reverse 操作時,pending
的插入點
curr
, next
:提供給 list_for_each_safe
走訪使用的 iterator
count
:紀錄當前組別已累積走過多少個 node
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (!head || list_empty(head) || k <= 1)
return;
// nodes waiting to be add & start add point
struct list_head *pending = head->next, *last = head;
struct list_head *curr, *next, *next_pend;
int count = 0;
list_for_each_safe (curr, next, head) {
if (++count >= k) {
while (count) {
count--;
next_pend = pending->next;
list_del(pending);
list_add(pending, last);
pending = next_pend;
}
last = next->prev;
}
}
}
因為後續的 sort, merge 都會涉及合併兩個「已排列鏈節串列」,所以先寫成一個工具函式來簡化操作。需要注意的是,傳入這個函式處理的 list
要先打斷 circular list 的頭尾連結,且不需要將 guard node
傳入。
- 初始化
guard
節點,tail
指向guard
以追蹤新鏈結串列的最後節點。- 合併兩個排序鏈結串列,依
q_cmp()
比較a
和b
,將較小者連接到tail
,並前進到下一節點。- 處理剩餘節點,若
a
或b
尚未走訪完,直接連接至tail
。- 回傳排序後的鏈結串列起點(
guard.next
)。
這裡的實作比較冗長,且課程當中已有提及類似程式,故不在此詳細列出。
詳情請見 github repo!
size=2M
的 test case,會一直出現程式執行時間過長的提示。q_find_mid
,其時間複雜度為 O(n)。size
相同的兩個 list
),就馬上進行合併。list
逐一合併。merge_two_sorted_list
。list
分割中點,省下了大把時間。會需要額外使用一個 part[]
來存放部份合併完成的 list
。
part[]
有特殊的存放規則:part[n]
中若不為 empty
,就是大小為 list
(這樣一來,檢查是否存在 size
相同的 list
就相當方便!)
void q_sort(struct list_head *head, bool descend)
{
struct list_head *list;
struct list_head *part[65];
int level;
if (!head || list_empty(head) || list_is_singular(head))
return;
// break circular
head->prev->next = NULL;
head->prev = NULL;
list = head->next;
INIT_LIST_HEAD(head);
// init part[]
memset(part, 0, sizeof(part));
while (list) {
struct list_head *curr = list;
struct list_head *next = list->next;
curr->prev = NULL;
curr->next = NULL;
list = next;
// the number of nodes in part[] will always be 0 or 2^level
// [1, 2, 4, 8, 16...] or [0, 2, 0, 0, 16] are acceptable
// depends on level (idx)
// keep merging until unable to merge
for (level = 0; part[level]; level++) {
curr = merge_two_sorted_list(descend, part[level], curr);
part[level] = NULL;
}
part[level] = curr;
}
list = NULL;
for (level = 0; level < (int) (sizeof(part) / sizeof(part[0])); level++) {
if (!part[level])
continue;
list = merge_two_sorted_list(descend, part[level], list);
}
// reconstruct list
while (list) {
struct list_head *next = list->next;
list_add_tail(list, head);
list = next;
}
}
此處排序時是使用 strcmp
,所以節點是依照字典序 (Lexicographic order) 排列。
e.g.
如果對 [17, 2, 189, 38]
進行字典序排列:
直覺上的數字由小到大排列結果,會是 [2, 17, 38, 189]
( X )
但實際的字典序排列結果為: [17, 189, 2, 38]
( O )
本函式將會維護一個單調嚴格遞增(or 遞減)的 list
。
在非鏈結串列的情境下,可以透過 stack
達成:
list
時,和 stack
頂部的節點比大小stack
頂部節點會違反遞增 or 遞減的條件,則持續 pop stack
直到符合條件為止事實上,即使不使用 stack
,也可以在 doubly linked list 上實作這個函式。
透過維護一個 *tail
,其實際效用和 stack.top()
類似,代表維護的單調序列的最後一個節點位置。
因兩者實作幾乎相同,僅附上 q_ascend
作為參考:
int q_ascend(struct list_head *head)
{
// https://leetcode.com/problems/remove-nodes-from-linked-list/
if (!head || list_empty(head))
return 0;
if (list_is_singular(head))
return 1;
struct list_head *tail = head->next, *curr, *tmp;
list_for_each (curr, head) {
while (tail != head &&
strcmp(list_entry(curr, element_t, list)->value,
list_entry(tail, element_t, list)->value) < 0) {
tmp = tail->prev;
list_del(tail);
q_release_element(list_entry(tail, element_t, list));
tail = tmp;
}
tail = curr;
}
return q_size(head);
}
- 檢查
head
是否為NULL
orempty
,若是則直接回傳 0- 初始化變數,設定
curr
(當前 queue),result
(合併後的結果)- 使用
list_for_each_entry
走訪head
中的每個queue_context_t
,進行合併
a. 斷開curr->q
(原為 circular list),確保merge_two_sorted_list()
正常運作
b. 將curr->q
併入result
,維持result
為當前已排序的結果
c.curr->q->next = curr->q;
清除q
之中已經排序完成的節點- 將合併後的結果存入第一個
queue
a. 取head->next
作為ans_entry
,初始化ans_entry->q
b. 使用list_add_tail
逐一將result
中的節點加入ans_entry->q
,重建合併完成的 queue- 回傳合併後的
q_size(ans_entry->q)
int q_merge(struct list_head *head, bool descend)
{
// https://leetcode.com/problems/merge-k-sorted-lists/
if (!head || list_empty(head))
return 0;
queue_contex_t *curr = NULL, *ans_entry;
struct list_head *result = NULL;
list_for_each_entry (curr, head, chain) {
curr->q->prev->next = NULL;
result = merge_two_sorted_list(descend, result, curr->q->next);
curr->q->next = curr->q;
}
ans_entry = list_entry(head->next, queue_contex_t, chain);
INIT_LIST_HEAD(ans_entry->q);
while (result) {
struct list_head *next = result->next;
list_add_tail(result, ans_entry->q);
result = next;
}
return q_size(ans_entry->q);
}
list_sort.c
以 2023 年的作業觀摩中 Risheng1128 同學的實作方式為參考
首先,加入 list_sort.c
和 list_sort.h
到專案中。
void *priv
參數,而在 lab0-c
中都用不到,可以直接拿掉cmp
函式有接收 descend
參數的功能,原先想要直接偷 queue.c
裡之前做的來用;但因 queue.h
不允許修改,所以直接複製一份 q_cmp
到 list_sort.c
中更動範例如下(僅舉例,其他省略,請參閱 repo):
- static struct list_head *merge(void *priv, list_cmp_func_t cmp,
- struct list_head *a, struct list_head *b)
+ static struct list_head *merge(struct list_head *a, struct list_head *b, int descend)
為了後續測試時切換方便,在 qtest.c 的 console_init() 中加入 list_sort
param,讓使用者可以直接指定要使用的排序演算法。
add_param("listsort", &use_list_sort, "use linux kernel style sorting algorithm from lib/list_sort.c", NULL);
在 do_sort()
實作的地方,依據 use_list_sort
(option 會改變的變數),去判斷當前應該使用哪個排序演算法:
if (current && exception_setup(true)){
if (use_list_sort)
list_sort(current->q, descend);
else
q_sort(current->q, descend);
}
q_sort()
進行效能比較首先,在 trace/
目錄下新增 trace-sort.cmd
:
(根據需要,可以自行選擇要測試的 sorting 模式、資料量等等)
option listsort 1
new
ih RAND 1000000
time
sort
time
上方將會選用 lib/list_sort.c 中的演算法,執行 1000000
次插入操作後,進行排序與計時。
因為要測試的數據量比較大,會觸發到 harness.c 中定義的 alarm 觸發時間,而造成程式直接中斷;為了測試方便,先給一個比較寬容的 time_limit
。
// harness.c
static int time_limit = 10;
接下來,在 script/
目錄下新增用來統計排序執行時間的 test_sort_perf.py
:
import subprocess
import re
NUM_RUNS = 10
delta_times = []
for i in range(NUM_RUNS):
result = subprocess.run(["./qtest", "-f", "traces/trace-sort.cmd"],
capture_output=True, text=True)
output = result.stdout
matches = re.findall(r"Delta time\s*=\s*([\d\.]+)", output)
if len(matches) < 2:
print(f"{i+1}th test: No Delta time finded.")
continue
delta_time = float(matches[1])
delta_times.append(delta_time)
print(f"{i+1}th test: Delta time = {delta_time}")
if delta_times:
avg = sum(delta_times) / len(delta_times)
print(f"\naverage Delta time of {len(delta_times)} of sorting: {avg}")
else:
print("No Delta time finded.")
這段 script 將會連續執行 NUM_RUNS
次(預設為10
)排序,並用 list 紀錄下每次排序花費的時間,最終對所有秒數取平均作為結果。
Round | q_sort | list_sort |
---|---|---|
1 | 2.722 | 2.582 |
2 | 2.74 | 2.614 |
3 | 2.629 | 2.459 |
4 | 2.66 | 2.447 |
5 | 2.741 | 2.386 |
6 | 2.605 | 2.497 |
7 | 2.782 | 2.471 |
8 | 2.762 | 2.443 |
9 | 2.582 | 2.396 |
10 | 2.592 | 2.393 |
AVG | 2.6815 | 2.4688 |
Round | q_sort | list_sort |
---|---|---|
1 | 6.075 | 5.567 |
2 | 5.812 | 5.564 |
3 | 5.84 | 5.724 |
4 | 5.978 | 5.823 |
5 | 5.845 | 5.494 |
6 | 6.351 | 5.647 |
7 | 6.15 | 5.672 |
8 | 5.817 | 5.565 |
9 | 6.18 | 5.727 |
10 | 5.76 | 5.736 |
AVG | 5.98 | 5.652 |
可以觀察到:
0.22
秒0.33
秒當使用者輸入 command web
之後,將會建立 server socket,並使 linenoise.c
裡面的 eventmux_callback
的 function pointer 指向 web_eventmux()
。
static bool do_web(int argc, char *argv[])
{
// init... (ignore)
web_fd = web_open(port);
if (web_fd > 0) {
printf("listen on port %d, fd is %d\n", port, web_fd);
line_set_eventmux_callback(web_eventmux);
use_linenoise = false;
} else {
// handle error
}
return true;
}
line_edit()
在 line_edit()
中使用 while
讀取輸入的地方,可以看到幾行程式碼:
while (1) {
// init...(ignore)
if (eventmux_callback != NULL) {
int result = eventmux_callback(l.buf);
if (result != 0)
return result;
}
// handle input from command line...(ignore)
}
eventmux_callback
就是先前在 command line 執行 web
所設置的 function pointer,可以用來 handle 來自 web server 的命令。
web_eventmux()
這裡是主要使用系統呼叫 select()
的地方。select()
採用了 I/O Multiplexing Model,參考下圖:
web_eventmux()
中同時監聽來自 web server 和 stdin 的 file descriptor,任何一方觸發都會使程式繼續執行下去。
本函式主要 handle 來自 web server 的事件;因此,若是 stdin 觸發監聽,將不會進行對命令的處理,而是回到 linenoise.c 中的 line_edit()
才處理。
int web_eventmux(char *buf)
{
fd_set listenset;
FD_ZERO(&listenset);
FD_SET(STDIN_FILENO, &listenset);
int max_fd = STDIN_FILENO;
if (server_fd > 0) {
FD_SET(server_fd, &listenset);
max_fd = max_fd > server_fd ? max_fd : server_fd;
}
int result = select(max_fd + 1, &listenset, NULL, NULL, NULL);
if (result < 0)
return -1;
if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) {
//handle cmd from web server...(ignore)
return strlen(buf);
}
FD_CLR(STDIN_FILENO, &listenset);
return 0;
}
RIO 全名為 Robust I/O,根據卡內基美隆大學的課程中的介紹,RIO 可以有效解決 short count 的問題。除此之外,因為每次呼叫 read()
都需要從 user mode 切換到 kernel mode,如果頻繁呼叫會大幅降低效能。
RIO 套件的做法是:當系統呼叫 read()
時,一次性讀取一大塊資料到 user space buffer 中 (同時處理 short count),後續再多次從 buffer 取用即可;當讀到 buffer 為空時,才再次系統呼叫。
rio_t
結構其中 count
代表著 buffer 中尚未被取用的資料長度。
所以 count <= 0
時,代表 buffer 已經空了,需要再次 read()
。
typedef struct {
int fd; /* descriptor for this buf */
int count; /* unread byte in this buf */
char *bufptr; /* next unread byte in this buf */
char buf[BUFSIZE]; /* internal buffer */
} rio_t;
rio_read()
static ssize_t rio_read(rio_t *rp, char *usrbuf, size_t n)
{
int cnt;
while (rp->count <= 0) { /* refill if buf is empty */
rp->count = read(rp->fd, rp->buf, sizeof(rp->buf));
if (rp->count < 0) {
if (errno != EINTR) /* interrupted by sig handler return */
return -1;
} else if (rp->count == 0) { /* EOF */
return 0;
} else
rp->bufptr = rp->buf; /* reset buffer ptr */
}
/* Copy min(n, rp->count) bytes from internal buf to user buf */
// 處理 short count
cnt = n;
if (rp->count < n)
cnt = rp->count;
memcpy(usrbuf, rp->bufptr, cnt);
rp->bufptr += cnt;
rp->count -= cnt;
return cnt;
}
buf_stack
一開始讀到以下程式碼,覺得很困惑,什麼情景下會用到這個?
static rio_t *buf_stack;
//
/* Create new buffer for named file.
* Name == NULL for stdin.
* Return true if successful.
*/
static bool push_file(char *fname)
{
int fd = fname ? open(fname, O_RDONLY) : STDIN_FILENO;
has_infile = fname ? true : false;
if (fd < 0)
return false;
if (fd > fd_max)
fd_max = fd;
rio_t *rnew = malloc_or_fail(sizeof(rio_t), "push_file");
rnew->fd = fd;
rnew->count = 0;
rnew->bufptr = rnew->buf;
rnew->prev = buf_stack;
buf_stack = rnew;
return true;
}
/* Pop a file buffer from stack */
static void pop_file()
{
if (buf_stack) {
rio_t *rsave = buf_stack;
buf_stack = rsave->prev;
close(rsave->fd);
free_block(rsave, sizeof(rio_t));
}
}
後來找到使用了 push_file()
的地方:
static bool do_source(int argc, char *argv[])
{
// error hadling...(ignore)
if (!push_file(argv[1])) {
report(1, "Could not open source file '%s'", argv[1]);
return false;
}
return true;
}
發現原來是執行 source
的時候會用到,是為了處理 nested source
commands 的情況。
舉例來說,以下有三個 file,將執行一些命令:
> source fileB
> it 6
> source fileC
> it 3
> new
> it 8
q = [8, 3, 6]
如果使用者執行 ./qtest
後輸入 source fileA
,就會造成遞迴式的開檔、執行,而先前宣告的 buf_stack
就是為了因應這種情況而存在。
每當讀到 source
,代表需要開新的檔案;新增、初始化新檔案對應的 rio_t
之後,將其 push
到 buf_stack
中;直到檔案內容被讀完就 pop_file()
,就會回到先前讀到一半的檔案狀態。
可以對應到 console.c 中 readline()
的其中一段程式碼:
(系統呼叫 read()
之後,發現沒有剩餘的可讀資料,代表 EOF)
if (buf_stack->count <= 0) {
/* Need to read from input file */
buf_stack->count = read(buf_stack->fd, buf_stack->buf, RIO_BUFSIZE);
buf_stack->bufptr = buf_stack->buf;
if (buf_stack->count <= 0) {
/* Encountered EOF */
pop_file();
if (cnt > 0) {
/* Last line of file did not terminate with newline. */
/* Terminate line & return it */
*lptr++ = '\n';
*lptr++ = '\0';
if (echo) {
report_noreturn(1, prompt);
report_noreturn(1, linebuf);
}
return linebuf;
}
return NULL;
}
}
當我在閱讀這段 code 的時候,覺得有些不直覺。目前的實作會在 line_edit()
裡面同時檢查來自 web server 的 request。
在 OOP 中有所謂的 SOLID 原則(即便 C 並非 OO),其中的單一職責原則 (Single-resbonsibility principle) 提到,函式應將關注點分離,每個函式只專注做一件事情。
因此,line_edit()
是否應該專注於處理來自 command line 的命令?
又剛好看到老師老師撰寫的並行程式設計:排程器,裡面的 event_loop()
實作給了我一些啟發:
我想可以參考這樣的函式呼叫方式,但不以 coroutine 的方式實作。
是否有可能仿照,在 main_loop()
中僅去呼叫 select()
;當 web server 或 command line 事件被觸發時,才去呼叫對應的 handling 函式?
在 queue.c
裡新增了 q_shuffle
,實作參考的是 Fisher–Yates shuffle 演算法。
考量到原演算法中有多次存取陣列內容、swap 操作,而我需要對 doubly linked list 進行 shuffle
;在不以其他資料結構加速存取過程的狀況下,查找特定節點的時間複雜度為 O(n)
。
所以實作時佔以一個 struct list_head **arr
存下各節點的位置,待 shuffle
操作執行完畢後,再逐一將節點串起來。
void q_shuffle(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
int len = q_size(head), i = 0;
struct list_head **arr = malloc(len * sizeof(struct list_head *));
struct list_head *curr, *tmp;
list_for_each (curr, head) {
arr[i++] = curr;
}
// pick a node randomly and swap
for (i = len - 1; i > 0; i--) {
int j = rand() % (i + 1);
tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
}
// reconstruct the doubly linked list
for (i = 0; i + 1 < len; i++) {
arr[i]->next = arr[i + 1];
arr[i + 1]->prev = arr[i];
}
head->next = arr[0];
arr[0]->prev = head;
head->prev = arr[len - 1];
arr[len - 1]->next = head;
free(arr);
}
為了要檢測每次的 shuffle
是否足夠隨機,可以套用假設檢定的技巧來判斷。
所謂「夠亂」,可以視為在做了足夠多次 shuffle 後,每組排列組合出現的機率相等(Uniform Distribution)。
為此,可以使用卡方檢定。該檢定可以用於判斷兩機率分佈是否相似。基於作業說明中提供的測試用 script,進行 shuffle
操作 1000000
次,得到各排列組合的次數分佈如下:
def chiSquared(observation, expectation):
return ((observation - expectation) ** 2) / expectation
自由度:23
(4
個數字共有 4! = 24
種排列組合,自由度 = N-1
)
期望頻率:1000000 / 24 = 41666
Chi-squared value sum: 24.662986607785726
p-value: 0.3678926504294987
因 p-value 大於顯著水準 (alpha = 0.05),無法拒絕
(
排列組合 | 出現頻率 | Chi-squared value |
---|---|---|
1234 | 41488 | 0.7604281668506696 |
1243 | 42096 | 4.437671002736044 |
1324 | 41581 | 0.17340277444439112 |
1342 | 41835 | 0.6854749675994816 |
1423 | 41853 | 0.8392694283108529 |
1432 | 41970 | 2.218019488311813 |
2134 | 41631 | 0.02940047040752652 |
2143 | 41550 | 0.322949167186675 |
2314 | 41584 | 0.16137858205731292 |
2341 | 41872 | 1.018480295684731 |
2413 | 41318 | 2.906542504680075 |
2431 | 41879 | 1.0888734219747516 |
3124 | 41698 | 0.024576393222291555 |
3142 | 41459 | 1.0283924542792684 |
3214 | 41558 | 0.2799404790476648 |
3241 | 41873 | 1.0283924542792684 |
3412 | 41534 | 0.41818269092305477 |
3421 | 41842 | 0.7434358949743196 |
4123 | 41823 | 0.5915854653674458 |
4132 | 41516 | 0.5400086401382422 |
4213 | 41787 | 0.35138962223395576 |
4231 | 41521 | 0.5046080737291797 |
4312 | 41429 | 1.3480775692411078 |
4321 | 41303 | 3.1625066001056017 |
作業說明中的 script 存在小缺陷,導致執行效率極差,問題出在模擬輸入 command 的地方:
test_count = 1000000
input = "new\nit 1\nit 2\nit 3\nit 4\n"
for i in range(test_count):
input += "shuffle\n"
input += "free\nquit\n"
在 python 中的字串處理,直接使用 +=
進行操作,實際上是重複 allocate 新記憶體空間、複製舊字串、貼上新字串;時間複雜度可達到
用以下方式改善:
input_commands = ["new\n", "it 1\n", "it 2\n", "it 3\n", "it 4\n"]
input_commands.extend(["shuffle\n"] * test_count)
input_commands.append("free\nquit\n")
input = "".join(input_commands)
先對 list
進行新增重複性高的 command substring,再一次性使用 join()
建立字串,可以省下不必要的 allocate 操作。
作者提出了一種使用統計學上「假設檢定」的方式,來檢測函式是否為常數時間複雜度。可以大致總結為以下步驟:
測量函式執行時間
首先將 input data 分為兩大類:fix class
, random class
fix class
的資料內容固定,且可以在常數執行時間完成random class
為隨機產生把兩類資料餵給欲測試的函式,並分類記錄下每次函式執行所花的時間。
(在實作中,執行時間使用 cpu_cycles()
測量)
資料後處理 (post-processing)
測量結果常會受到外部環境影響(如 OS interrupt),而出現異常極端值。因此,對數據後處理可以提升分析的精準度。
在原專案中,會給定一系列的裁切比例,將測量值較高的部分數據丟棄。
套用假設檢定
使用 Welch’s t-test,檢測剛剛獲得的兩類資料的執行時間分佈是否相似。如果 random class
和 fix class
的執行時間分布不同,則可以合理推斷該函式「非」常數時間複雜度。
cpu_cycles()
取得 before_ticks
和 after_ticks
exec_time = after_ticks - before_ticks
t_push()
在線更新 mean
, m2
, n
t_compute()
透過 mean
, m2
, n
計算 t_value
t_value
判斷兩個 class
的分佈是否存在顯著差異,若 abs(t_value)
高於一定 threshold,則可合理判斷兩者分佈不同每次計算完 exec_time
都會將其加入統計資料中。
此時需要使用線上更新的方式,對 mean
, m2
, n
做更新。
void t_push(t_context_t *ctx, double x, uint8_t class)
{
assert(class == 0 || class == 1);
ctx->n[class]++;
/* Welford method for computing online variance
* in a numerically stable way.
*/
double delta = x - ctx->mean[class];
ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class];
ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]);
}
利用先前統計資料計算 t-value,公式如下:
double t_compute(t_context_t *ctx)
{
double var[2] = {0.0, 0.0};
var[0] = ctx->m2[0] / (ctx->n[0] - 1);
var[1] = ctx->m2[1] / (ctx->n[1] - 1);
double num = (ctx->mean[0] - ctx->mean[1]);
double den = sqrt(var[0] / ctx->n[0] + var[1] / ctx->n[1]);
double t_value = num / den;
return t_value;
}
lab0-c
中的 dudect
現存問題此節屬於個人猜測,真正的原因仍須更完整的驗證
在本機端執行 make test
時,都能全部通過(100/100
)。但推送到 github 時,會發生最後一項 test complexity 無法通過的狀況(95/100
)。
(甚至會發生 insert_tail
通過,但 insert_head
沒有通過的奇怪現象)
我認為可能是本地端的執行環境和 github runner 環境不同所造成,在虛擬環境下的 exec_times
資料可能包含更多雜訊(如 OS interrupt 等),測量結果不準確。
percentile()
實作根據作業說明,在 lab0-c
未有完整實作 percentile
功能。
在計算 t-value
時丟棄部份 exec_time
極端值,可以更精確的確認 random class
和 fix class
的分佈是否相似。
percentile
,並同時維護不同 percentile
下的 t-value
。這些 t-values
將用於進一步判斷函式是否為常數時間複雜度。percentile
,會需要維護 t-value
心中未解的疑惑:
目前的實作是對所有 t-values
取最大值,但這樣是否有助於改善極端值所帶來的影響?極端值仍會影響到那組未 crop 的 raw data;而該組 t-value
也可以去影響到 max t-value
的計算結果。
在 lab0-c 裡引入 percentile 實作後,可以順利通過全部的測試 (100/100)
。
但為何可以成功改善,我還搞不懂,仍須進一步了解。