Try   HackMD

2025q1 Homework1 (lab0)

contributed by < ibat10clw >

作業書寫規範:

  • 無論標題和內文中,中文和英文字元之間要有空白字元 (對排版和文字搜尋有利)
  • 文字訊息 (尤其是程式執行結果) 請避免用圖片來表示,否則不好搜尋和分類
  • 共筆書寫請考慮到日後協作,避免過多的個人色彩,用詞儘量中性
  • 不要在筆記內加入 [TOC] : 筆記左上方已有 Table of Contents (TOC) 功能,不需要畫蛇添足
  • 不要變更預設的 CSS 也不要加入任何佈景主題: 這是「開發紀錄」,用於評分和接受同儕的檢閱
  • 在筆記中貼入程式碼時,避免非必要的行號,也就是該手動將 c=cpp= 變更為 ccpp。行號只在後續討論明確需要行號時,才要出現,否則維持精簡的展現。可留意「你所不知道的 C 語言: linked list 和非連續記憶體」裡頭程式碼展現的方式
  • HackMD 不是讓你張貼完整程式碼的地方,GitHub 才是!因此你在開發紀錄只該列出關鍵程式碼 (善用 diff 標示),可附上對應 GitHub commit 的超連結,列出程式碼是為了「檢討」和「便於他人參與討論」
  • 留意科技詞彙的使用,請參見「資訊科技詞彙翻譯」及「詞彙對照表
  • 不要濫用 :::info, :::success, :::warning 等標示,儘量用清晰的文字書寫。:::danger 則僅限授課教師作為批注使用
  • 避免過多的中英文混用,已有明確翻譯詞彙者,例如「鏈結串列」(linked list) 和「佇列」(queue),就使用該中文詞彙,英文則留給變數名稱、人名,或者缺乏通用翻譯詞彙的場景
  • 在中文敘述中,使用全形標點符號,例如該用「,」,而非 ","。注意書名號的使用,即 ,非「小於」和「大於」符號
  • 避免使用不必要的 emoji 字元

開發環境

$ uname -a
Linux henry-ThinkBook-14-G7-IML 6.11.0-17-generic #17~24.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jan 20 22:48:29 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
Copyright (C) 2023 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

$ lscpu
Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          46 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   22
  On-line CPU(s) list:    0-21
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Core(TM) Ultra 7 155H
    CPU family:           6
    Model:                170
    Thread(s) per core:   2
    Core(s) per socket:   16
    Socket(s):            1
    Stepping:             4
    CPU(s) scaling MHz:   23%
    CPU max MHz:          4800.0000
    CPU min MHz:          400.0000
    BogoMIPS:             5990.40
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge m
                          ca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 s
                          s ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc 
                          art arch_perfmon pebs bts rep_good nopl xtopology nons
                          top_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq 
                          dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma c
                          x16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt t
                          sc_deadline_timer aes xsave avx f16c rdrand lahf_lm ab
                          m 3dnowprefetch cpuid_fault epb intel_ppin ssbd ibrs i
                          bpb stibp ibrs_enhanced tpr_shadow flexpriority ept vp
                          id ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms
                           invpcid rdseed adx smap clflushopt clwb intel_pt sha_
                          ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect us
                          er_shstk avx_vnni dtherm ida arat pln pts hwp hwp_noti
                          fy hwp_act_window hwp_epp hwp_pkg_req hfi vnmi umip pk
                          u ospke waitpkg gfni vaes vpclmulqdq rdpid bus_lock_de
                          tect movdiri movdir64b fsrm md_clear serialize arch_lb
                          r ibt flush_l1d arch_capabilities
Virtualization features:  
  Virtualization:         VT-x
Caches (sum of all):      
  L1d:                    544 KiB (14 instances)
  L1i:                    896 KiB (14 instances)
  L2:                     18 MiB (9 instances)
  L3:                     24 MiB (1 instance)
NUMA:                     
  NUMA node(s):           1
  NUMA node0 CPU(s):      0-21
Vulnerabilities:          
  Gather data sampling:   Not affected
  Itlb multihit:          Not affected
  L1tf:                   Not affected
  Mds:                    Not affected
  Meltdown:               Not affected
  Mmio stale data:        Not affected
  Reg file data sampling: Not affected
  Retbleed:               Not affected
  Spec rstack overflow:   Not affected
  Spec store bypass:      Mitigation; Speculative Store Bypass disabled via prct
                          l
  Spectre v1:             Mitigation; usercopy/swapgs barriers and __user pointe
                          r sanitization
  Spectre v2:             Mitigation; Enhanced / Automatic IBRS; IBPB conditiona
                          l; RSB filling; PBRSB-eIBRS Not affected; BHI BHI_DIS_
                          S
  Srbds:                  Not affected
  Tsx async abort:        Not affected

實作 queue.[ch]

在本次作業需以環狀雙向鏈結串列實作佇列,其中環狀雙向鍊結串列與 Linux 核心中採取的設計一致,將存放指標的結構體 struct list_head 嵌入實際存放資料的結構體 element_t 中。

q_new: 建立新的「空」佇列;

此函式之功能為建立一個空佇列,須注意空佇列代表佇列本身是存在的,但是佇列中沒有任何資料。
在給定的函式定義中,將 struct list_head * 作為回傳值,可得知此處需要透過 malloc 來分配 struct list_head 所需之記憶體空間,才得以在函式結束時仍保留記憶體資訊。

struct list_head *q_new()
{
    struct list_head *q = (struct list_head *) malloc(sizeof(struct list_head));
    if (!q)
        return NULL;    
    INIT_LIST_HEAD(q);
    return q;
}

INIT_LIST_HEAD 函式的功能為將傳入的參數 list_head 其兩個指標都指向自身。對照 list_empty 函式可得知當 head->next == next 時,代表此串列無任何節點,亦可代表佇列為空的狀態。

透過 qtest 預期可以看到下列結果

$ ./qtest 
cmd> show
l = NULL
cmd> new
l = []
cmd> show
Current queue ID: 0
l = []

q_insert_head: 在佇列開頭 (head) 插入 (insert) 給定的新節點 (以 LIFO 準則);

完成佇列之建立後,接下來實作與加入資料相關的函式。
在給定的函式定義中,此函式傳入兩個參數,分別代表要操作的佇列以及欲加入佇列之字串資料。

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;

    element_t *node = (element_t *) malloc(sizeof(element_t));
    if (!node)
        return false;
    
    node->value = strdup(s);
    if (!node->value) {
        free(node);
        return false;
    }

    list_add(&node->list, head);
    return true;
}

在這個地方可以透過 strdup 一個函式就完成 buffer 的分配以及字串資料的複製,此外考慮傳入字串可能很長,若沒有先確定串列節點有分配成功,就進行字串相關處理,則可能產生浪費,因此在此函式中會在串列節點分配完成後才進行字串相關的操作。

透過 qtest 確認函式之正確性,預期可看到以下結果

$ ./qtest 
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> ih kernel
l = [kernel linux]

q_insert_tail: 在佇列尾端 (tail) 插入 (insert) 給定的新節點 (以 FIFO 準則);

此函式與 q_insert_head 的邏輯相似,僅需將最後使用的 list_add 調整成 list_add_tail 即可。

bool q_insert_tail(struct list_head *head, char *s)
{
    ...  // Same as q_insert_head
    list_add_tail(&node->list, head);    
    return true;
}

透過 qtest 確認函式之正確性,預期可看到以下結果

$ ./qtest 
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> it kernel
l = [linux kernel]

q_size: 計算目前佇列的節點總量;

此函式功能為計算佇列之長度。
由於 struct list_head 中並未有紀錄長度之變數,因此計算長度需實際走訪串列,此處可以使用 list_for_each 的巨集來走訪串列,查看 list.h 中可以得知此巨集傳入一參數 node 作為 iterator 使用,並且將其初始化成 head->next,而終止條件為 node != head,所以進到迴圈的次數會跟實際節點存在的數量等,在每一個回合將 sz 遞增即可計算出結果。

int q_size(struct list_head *head)
{
    if (!head)
        return -1;
    int sz = 0;
    struct list_head *it;
    list_for_each(it, head)
        sz++;
    return sz;
}

透過 qtest 確認函式之正確性,預期可看到以下結果

$ ./qtest 
cmd> new
l = []
cmd> ih linux 500
...
cmd> size
Queue size = 500
...
cmd> it kernel 500
...
cmd> size
Queue size = 1000
...

q_free: 釋放佇列所佔用的記憶體;

此函式之功能為釋放傳入佇列所佔用的記憶體,在佇列的建立以及節點新增的過程中,涉及了 struct list_head, element_t 以及 element_t->value 三處的記憶體分配,在釋放整個佇列時需回收這三部份的空間。

需注意不要造成 Use-After-Free (UAF) 的狀況,因此釋放的順序應當由內向外進行釋放。

void q_free(struct list_head *head)
{
    if (!head)
        return;

    element_t *it, *tmp;
    list_for_each_entry_safe (it, tmp, head, list) {
        free(it->value);
        free(it);
    }
    free(head);
}

q_remove_head: 在佇列開頭 (head) 移去 (remove) 給定的節點;

此函式之功能為移除佇列開頭之節點,並且當傳入參數 sp 不為 NULL 時,將節點上之字串複製至 sp

使用 list_del_init 將佇列開頭之節點從佇列中移除。
關於字串複製的部份要避免 overflow 的發生,考慮 bufsize 可能小於節點上字串之長度,因此此處需要判斷要複製多少位元組的資料到 sp 裡面。當字串長度小於 bufsize - 1 時,我們可以將字串完整複製至 sp 中,並在最後加入 null terminator,反之若字串長度大於 bufsize - 1 時,將其 truncate 至 bufsize - 1 後,同樣在最後加入 null terminator。

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    
    struct list_head *lh = head->next;
    element_t *ele = list_entry(lh, element_t, list);
    list_del_init(lh);
    if (sp) {
        size_t sz = min(strlen(ele->value), bufsize - 1);
        memcpy(sp, ele->value, sz);
        sp[sz] = '\0';
    }
    return ele;
}

在此我們需要實作用於判斷兩數字間大小關係的 min 函式,此處採取 branchless 之設計,以減少分支預測失敗帶來的效能損失。參考來源

static inline int min(int a, int b) {
    return b ^ ((a ^ b) & -(a < b));
}

此函式透過 XOR 的性質,當 a < b 成立時,-(a < b) 相當於 0xFFFFFFFF,代表括號中的 a ^ b 會被完整保留,最後外層的 XOR 運算時 b 會被抵銷掉,於是剩下 a
a < b 不成立,則 a ^ b 會被 bitwise AND 給清空成 0,最後與外層 XOR 運算時,可留下 b

另外此處不直接使用 scrncpy 搭配 bufsize 的考量為根據 man page 中的敘述 If the source has too few non-null bytes to fill the destination, the functions pad the destination with trailing null bytes.,如果要複製的的 src 不足的話,會有填充 null bytes 的行為,但是如果要把 sp 當成字串解釋,只需要確保在正確的位置有一個 \0 即可。

q_remove_tail: 在佇列尾端 (tail) 移去 (remove) 給定的節點;

此函式與 q_remove_head 的邏輯相似,僅需將 list_del_init 的對象由 head->next 調整成 head->prev 即可。

element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    ...  // Same as q_remove_head
    struct list_head *lt = head->prev;
    ...  // Same as q_remove_head        
}

q_sort

q_sort 需根據傳入的參數來決定是遞增排序還是遞減排序,因此這邊先實作比較函式

static inline bool str_cmp_asc(const char *a, const char *b) 
{
    return strcmp(a, b) <= 0;
}
static inline bool str_cmp_dsc(const char *a, const char *b) 
{
    return strcmp(a, b) >= 0;
}

根據 strcmp 的 man page

0, if the s1 and s2 are equal;
a negative value if s1 is less than s2;
a positive value if s1 is greater than s2.

s1 的字典序小於 s2 時,回傳一個負數,在此 str_cmp_asc<= 0 來將結果轉換為布林值,反之亦然。這兩個函式稍後會在 q_sort 中根據參數 descend 賦值給函式指標,這樣可以在 q_sort 呼叫時做一次,讓後續比較都可以使用到對應的函式。

接下來是排序演算法的選擇,在此我選擇實作 mergesort,首先是前置處理的部份,我將雙向環狀鍊結串列的環狀結構破壞掉,考慮 q_sort 傳入的參數 head 不帶有資料,因此也將其暫時移除,等待排序結束後再進行恢復。

void q_sort(struct list_head *head, bool descend) {
    ...
    struct list_head *tmp = head->next;
    bool (*cmp)(const char *a, const char *b) = descend ? str_cmp_dsc :   str_cmp_asc;
    head->prev->next = NULL;
    head->next = NULL;
    struct list_head *sorted = mergesort(tmp, cmp);    
    ...
}

mergesort

mergesort 的實作中參考了 你所不知道的 C 語言: linked list 和非連續記憶體 中提及的快慢指標找串列中點的技巧,找出串列的中點,並且對串列進行分割。

struct list_head *mergesort(struct list_head *head, bool (*cmp)(const char *a, const char *b))
{
    if (!head->next) {
        return head;
    }
    struct list_head *fast = head;
    struct list_head *slow = head;
    while (fast->next && fast->next->next) {
        fast = fast->next->next;
        slow = slow->next;
    }
    struct list_head *mid = slow->next;
    slow->next = NULL;
    struct list_head *left = mergesort(head, cmp);
    struct list_head *right = mergesort(mid, cmp);
    return merge(left, right, cmp);
}

merge 的部份同樣參考了你所不知道的 C 語言: linked list 和非連續記憶體中提及的技巧,透過 indirect pointer 來簡化程式邏輯。

在 merge 的階段中暫時不處理 prev 的指標,等待排序完成後再一次進行處理。

struct list_head *merge(struct list_head *l1, struct list_head *l2, bool (*cmp)(const char *a, const char *b)) {
    struct list_head *head = NULL, **ptr = &head, **node;
    for (node = NULL; l1 && l2; *node = (*node)->next) {
        node = cmp(list_entry(l1, element_t, list)->value,
                      list_entry(l2, element_t, list)->value)
                   ? &l1
                   : &l2;
        *ptr = *node;
        ptr = &(*ptr)->next;
    }
    *ptr = l1 ? l1 : l2;
    return head;
}

此處共有三個指標,先說明 head 以及 ptr 的作用,ptr 作為指向 head 指標的指標,可以透過解引用對 head 進行賦值,這樣就可以避免宣告或者分配一個 dummy 節點的成本,當 head 已經不為 NULL 後(也就是第一次進入迴圈時),可再將 ptr 的值向前移動。

node 也是 indirect pointer,他的作用是決定當前要從 l1 還是 l2 取出節點,此處便可用上 cmp 這個函式指標作判斷,將 ptr 指向對應目標,不論選擇的是 l1 還是 l2,都可以透過解引用將 node 賦值給 ptr,再將 ptr 給向前移動,由於 ptr 會指向 l1l2 其一,因此這個行為相當於將 l1l2 已經掛上的節點給去除。

透過上述的實作,我們已經能獲得一個排序過的串列,可以透過 next 來按照升序或者降序進行拜訪,剩餘的部份則是要恢復環狀結構,以及處理 merge 階段沒有做的 prev 指標的部份。

void q_sort(struct list_head *head, bool descend) {
    ...
    // Sort the list and ignore prev link in this stage
    struct list_head *sorted = mergesort(tmp, cmp);
    // Recover previous link and stop at last node
    struct list_head *current = sorted;
    while (current->next) {
        current->next->prev = current;
        current = current->next;
    }    
    
    // Recover circular structure
    sorted->prev = head;
    head->next = sorted;
    head->prev = current;
    current->next = head;   
}

在此使用 mergesort 的回傳值作為起點,開始依序拜訪串列上的節點,並將當前節點的下一個節點的 prev 指向自己,之後向前移動,此處的 while 迴圈會停留在最後一個節點。
這恰好能作為恢復環狀結構的依據,只要將 head 與首與個節點和末個節點進行串接,雙向環狀鍊結串列的排序即可完成。

q_delete_mid

此函式的功能為移除並釋放串列的中點。

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;

    struct list_head *slow = head->next, *fast = head->next->next;

    while (fast != head && fast->next != head) {
        slow = slow->next;
        fast = fast->next->next;
    }
    element_t *tmp = list_entry(slow, element_t, list);
    list_del(slow);
    free(tmp->value);
    free(tmp);    
    return true;
}

如先前提到的快慢指標的技巧,得以找出位於中間的位置,假設串列長度為 n,while 迴圈執行

n12次,此時 slow 可以停在第
n2
個節點上,這時候將節點相關的鍊結給移除,並且釋放記憶體,即完成操作。

q_reverse

此函式的功能為將雙向環狀鍊結串列的順序反轉。

void q_reverse(struct list_head *head) {
    struct list_head* cur = head->next;
    struct list_head *tmp;
    while (cur != head) {
        tmp = cur->next;
        cur->next = cur->prev;
        cur->prev = tmp;
        cur = tmp;
    }
    tmp = head->next;
    head->next = head->prev;
    head->prev = tmp;
}

在雙向鍊結串列中,只要依序將 next 以及 prev 指標對調,即可完成反轉。

q_swap

此函式功能為將鄰近串列之節點進行交換,考慮此處節點之資料型態為字串,如果單純地交換節點字串的話,當字串長度長的時候會造成負擔,因此此處實作應該採取交換指標的方式。

查看 linux kernel 的 list.h 中可以發現有 list_swap 的實作

static inline void list_replace(struct list_head *old,
				struct list_head *new)
{
	new->next = old->next;
	new->next->prev = new;
	new->prev = old->prev;
	new->prev->next = new;
}
static inline void list_swap(struct list_head *entry1,
			     struct list_head *entry2)
{
	struct list_head *pos = entry2->prev;

	list_del(entry2);
	list_replace(entry1, entry2);
	if (pos == entry1)
		pos = entry2;
	list_add(entry1, pos);
}

直接使用這兩個函式固然能有正確結果,但這兩個函式並不只為了處理相鄰節點的交換,因此這裡稍作修改,加入 __glibc_unlikely(pos == entry1) 輔助編譯器進行優化,並且在 q_swap 中確保 list_swap 傳入的參數符合 unlikely 的假設。

void q_swap(struct list_head *head)
{
    // https://leetcode.com/problems/swap-nodes-in-pairs/
    struct list_head *cur = head->next;
    while (cur != head && cur->next != head) {
        struct list_head *tmp = cur->next->next;
        list_swap(cur->next, cur);
        cur = tmp;
    }
}

q_delete_dup

此函式要求刪除串列中所有重複的數值,在此假設串列以為排序狀態。
由於串列為排序狀態,因此若有出現重複的情況,則兩節點必定相鄰,透過這樣的思維去比較當前節點與下一個節點的字串是否相同來決定要不要刪除節點。

由於此處需要頻繁地取得下一個節點的資料,此處引入 linux kernel 中的巨集 list_next_entry 來獲取下一個節點的資料。

#define list_next_entry(pos, member) \
	list_entry((pos)->member.next, typeof(*(pos)), member)
bool q_delete_dup(struct list_head *head)
{
    if (!head || list_is_singular(head) || list_empty(head))
        return false;
    // https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
    struct list_head *cur = head->next;
    while (cur != head && cur->next != head) {
        element_t *e1= list_entry(cur, element_t, list);
        element_t *e2 = list_next_entry(e1, list);
        bool flag = 0;
        while (cur->next != head && !strcmp(e1->value, e2->value)) {
            list_del(cur->next);
            free(e2->value);
            free(e2);
            flag = 1;
            e2 = list_next_entry(e1, list);
        }
        struct list_head *tmp = cur->next;
        if (flag) {
            list_del(cur);
            free(e1->value);
            free(e1);
        }
        cur = tmp;
    }
    return true;
}

當發現重複節點時,就先固定的位置 cur,並且不斷刪除重複的節點,直到 cur->next 不再重複,或者已到達串列的結尾。

可能的改進方向

查看 glibc 的實作,strcmp 必須逐個 byte 去比較

https://github.com/zerovm/glibc/blob/master/string/strcmp.c
do
{
  c1 = (unsigned char) *s1++;
  c2 = (unsigned char) *s2++;
  if (c1 == '\0')
    return c1 - c2;
} while (c1 == c2);

strcmp 的最糟情況發生時,代表整個 e1 紀錄的字串都會看過。如果在過程中紀錄 e1 字串的長度,並且改用如memcmp 這種一次可比較不只一個 byte 的函式是否能有更好的效能。

q_reversek

此函式將串列中 k 個節點視為一個群組,並且將這個群組進行反轉。
此處可善用先前實作過的 reverse 來輔助完成。

void q_reverseK(struct list_head *head, int k)
{
    // https://leetcode.com/problems/reverse-nodes-in-k-group/
    if (k <= 1 || !head || list_empty(head) || list_is_singular(head))
        return;

    struct list_head *cur, *tmp;
    struct list_head *sub_head = head;
    unsigned int count = 0;

    list_for_each_safe (cur, tmp, head) {
        count++;
        if (count == k) {
            struct list_head sublist;
            INIT_LIST_HEAD(&sublist);
            list_cut_position(&sublist, sub_head, cur);
            q_reverse(&sublist);
            list_splice(&sublist, tmp->prev);
            count = 0;
            sub_head = tmp->prev;
        }
    }
}

此處透過 list_for_each_safe 走訪串列,並當計數達到 k 的時候,透過 list_cut_position 巨集截出子串列,再使用已實作的 q_reverse 反轉此部份,最後再將結果透過 list_splice 加入至原先的位置。

要注意的是 list_cut_position 的第二個參數 head_from 預設傳入的是不帶資料的串列開頭,因此實際切割的範圍會是 head_from->nextnode,所以此處 sub_head 初始化為 head 並且後續更新時會將位置設在實際想切割的節點的前一個位置。

list_splice 的行為也類似,他假設傳入的是串列開頭,因此 tmp 雖是 cur 的下一個節點,但是依然使用 tmp->prev 作為參數。

q_ascend, q_descend

這兩個函式取自於 Leetcode 2487. Remove Nodes From Linked List,題目要透過刪除特定節點使串列變為遞增或者遞減的排序狀態。

q_ascend 為例,對於串列中任意節點,只要在其右側存在比自己小的元素,那就要將該元素刪除。由於 queue.c 中的串列為雙向環狀鍊結串列,可以在給定開頭節點的情況下,反向拜訪串列。若透過 next 指標拜訪串列為遞增,則反向拜訪時必須要呈現遞減,可以利用這個特性來解題。

int q_ascend(struct list_head *head)
{
    element_t *cur, *tmp;
    const element_t *min_node = list_last_entry(head, element_t, list);
    list_for_each_entry_prev_safe(cur, tmp, head, list)
    {
        if (strcmp(cur->value, min_node->value) > 0) {
            list_del(&cur->list);
            free(cur->value);
            free(cur);
        } else {
            min_node = cur;
        }
    }
    return q_size(head);
}

此處加入 list_for_each_entry_prev_safe 巨集來簡化程式碼複雜度並反向拜訪串列。
此處預期串列節點的字典序要越看越小,在進入迴圈前先將 min_node 設為串列尾巴的節點,當 cur 的字典序大於 min_node 時,就將 cur 給刪除,否則就更新 min_node 的資訊。

q_merge

此函式功能為將數個以排序的佇列合併成一個佇列(須維持排序狀態),此函式有個比較不一樣的地方是參數 list_head 是一個定義在 q_test.c 中的變數 chain 這個串列的開頭(串列上的元素為 q_context_t),我們需要用這個開頭來獲得所有的佇列,並將其合併。

typedef struct {
    struct list_head head;
    int size;
} queue_chain_t;
static queue_chain_t chain = {.size = 0};
static queue_contex_t *current = NULL;
//q_test.c: do_new
if (exception_setup(true)) {
    queue_contex_t *qctx = malloc(sizeof(queue_contex_t));
    list_add_tail(&qctx->chain, &chain.head);

    qctx->size = 0;
    qctx->q = q_new();
    qctx->id = chain.size++;

    current = qctx;
}
//q_test.c: do_merge 
if (current && exception_setup(true))
    len = q_merge(&chain.head, descend);

上述程式碼中顯示了 q_test 是如何在 new 命令時,將一個新的佇列加入至 chain 上,q_test 中除了 do_merge 以外的函式在呼叫 queue.c 中的則是將 current->q 作為參數。

int q_merge(struct list_head *head, bool descend)
{
    queue_contex_t *cur;
    bool (*cmp)(const char *a, const char *b) =
        descend ? str_cmp_dsc : str_cmp_asc;
    struct list_head *lh = NULL;
    list_for_each_entry (cur, head, chain) {
        if (!lh) {
            lh = cur->q;
            lh->prev->next = NULL;
        } else {
            cur->q->prev->next = NULL;
            lh->next = merge(lh->next, cur->q->next, cmp);
            INIT_LIST_HEAD(cur->q);
        }
    }
    struct list_head *tmp = lh->next, *prev = lh;
    while (tmp) {
        tmp->prev = prev;
        prev = tmp;
        tmp = tmp->next;
    }
    lh->prev = prev;
    prev->next = lh;
    return q_size(lh);
}

善用先前在 mergesort 實作的 merge 函式,來將 chain 上所有佇列都合併到 chain 上的第一個佇列。
這邊的原因是 q_test 中的 do_merge 有一段程式會將 chain 上的佇列釋放掉。

struct list_head *cur = chain.head.next->next;
while ((uintptr_t) cur != (uintptr_t) &chain.head) {
    queue_contex_t *ctx = list_entry(cur, queue_contex_t, chain);
    cur = cur->next;
    q_free(ctx->q);
    free(ctx);
}

因此為了確保所有含有字串的節點能夠保留,必須選擇將節點掛在 chain 的第一個佇列上,同時這也是 merge(lh->next, cur->q->next, cmp); 完成後需要將 cur->q 初始化的原因,也是為了避免這段程式碼的 q_free 執行時發生錯誤。

論文 <Dude, is my code constant time?> 閱讀

這篇論文提出了一項稱為 dudect 的工具,可用於檢測程式的執行時間是否為常數時間,dudect 透過直接執行程式的方式作為統計資料的來源。

dudect 工具的執行流程可以分為下面三個步驟

  1. 執行時間量測
  2. 資料後處理
  3. 假設檢定

來判定程式的執行時間是否為常數。

執行時間量測

在量測時間的部份,首先我們需要定義兩組類別的輸入,分別為 fixed value 以及 random value。在 fixed value 的部份,論文提到可以選擇可觸發 special corner-case 的內容作為輸入。

接著是量測時間的方式,根據現代 CPU 提供的 cycle counter,例如 X86 架構的 TSC 或者 ARM 架構的 systick,透過這些裝置取得的數值,可以精準量測執行時間。

最後是去除環境干擾的部份,考慮測試環境並不是特別在裸機環境中測試,於作業系統環境中量測時間時,可能會受到許多干擾(排程器的上下文切換(context switch)、資源分配以及系統中斷等諸多元素),因此每一輪要量測的項目必須從 fixed 或者 random 中隨機選擇一組。
關於測試資料選擇帶來的影響在 Statistics and Secret Leakage論文中有更詳細的說明。

資料後處理

在作資料分析的時候,如何去除不具代表性的離群值是一個對分析結果至關重要的因素。
論文中提到說對於量測時間通常會呈現正偏態(positive skewed)的形式,如下圖所示
image
正偏態指的是大多數的資料會集中在左側,代表未受干擾的執行時間,而受到干擾的執行時間在此應屬於小部份的資料,圖中的 x 軸表示的為執行時間,y 軸則代表出現的頻率(frequency)。

對於去除離群值的手法,論文中提到兩種方式,分別為

  • 裁剪(cropping): 此方法設定一與輸入資料類別獨立的閥值,將蒐集到的資料中大於閥值的部份去除
  • 高階預處理(higher-order preprocessing): 此方法的目的為進一步捕捉資料在高階統計量的差異性,可讓後續假設檢定的部份對高階統計量更敏感。

假設檢定

這個階段主要是對蒐集到的兩組執行時間資料做檢定,已判斷隨機輸入的執行時間是否跟固定輸入的執行時間在分佈相等。

假設檢定的方法有很多,論文中提到兩種,這邊著重在 Welch’s t-test 上。

假設檢定的流程主要如下,由研究者提出一段虛無假說(Null hypothesis,記為

H0),接著提出一組對立假說(Alternative hypothesis, 紀為
H1
),接著選定顯著水準作
α
作為最終是否能接受
H0
的標準。

根據 Welch's t-test 我們先計算兩組測試資料的平均數以及變異數,此處的

X¯ 為樣本平均數,
sx2
則為樣本變異數,令 X 代表的為固定測資的量測時間,Y 為隨機測資的量測時間,下標代表為第幾筆的執行時間,n 為總共有多少筆資料。
X¯=1nxi=1nxXi

sX2=1nx1i=1nx(XiX¯)2

Y¯=1nyi=1nyYi

sY2=1ny1i=1ny(YiY¯)2

接著我們根據公式以及先前的結果計算出 t 值,我們可以觀察一下公式中的數字對於 t 值結果的影響,當兩組數據平均值差異很大時,t 值會變大(或變小)

t=X¯Y¯sX2nX+sY2nY

當 t 值越接近零時,代表兩組數據的差異性越小,到此已經可以根據先前定義的

α 下結論,當
|t|
值大於
α
時,我們認為兩組數據存在差異,反之認為兩組數據相同。

lab0-c 中 dudect 的實作

首先我們從 qtest.c 中可以看到幾個函式

  • is_remove_tail_const, is_remove_head_const, is_insert_tail_constis_insert_head_const

不過如果使用 grep is_remove_tail_const -nr 會發現找不到函式的定義在哪裡,原因是這些函式是由巨集輔助定義而來。

首先從 fixture.h 中找到一個巨集定義 _,傳入一個參數 x,並且展開為 bool is_##x##_const(void);

/* define in consant.h
#define DUT_FUNCS  \
    _(insert_head) \
    _(insert_tail) \
    _(remove_head) \
    _(remove_tail)
*/
#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _

此處可參閱 C 語言規格書 6.10.3.3 對於 ## 運算子的說明

For both object-like and function-like macro invocations, before the replacement list is reexamined for more macro names to replace, each instance of a ## preprocessing token in the replacement list (not from an argument) is deleted and the preceding preprocessing token is concatenated with the following preprocessing token. Placemarker preprocessing tokens are handled specially: concatenation of two placemarkers results in a single placemarker preprocessing token, and concatenation of a placemarker with a non-placemarker preprocessing token results in the non-placemarker preprocessing token. If the result is not a valid preprocessing token, the behavior is undefined. The resulting token is available for further macro replacement. The order of evaluation of ## operators is unspecified.

其中提到,當要被替換的參數前後方都跟著 ## 運算子的話,那前置處理器就會在文字替換完成後將他們拼接在一起。
由此處可以得知,此段巨集被展開後會成為 bool is_remove_tail_const(void) 等等,也就是我們在 qtest.c 中所看到的函式名稱。

接著看 constant.h 中的巨集,也用到了類似的技巧

#define DUT_FUNCS  \
    _(insert_head) \
    _(insert_tail) \
    _(remove_head) \
    _(remove_tail)

#define DUT(x) DUT_##x

enum {
#define _(x) DUT(x),
    DUT_FUNCS
#undef _
};

巨集 DUT_FUNCS 會展開成_(is_insert_head) 等一系列的內容,與 fixture.h 的巨集搭配,則可以產生出函式的定義bool is_insert_head_const(void); 等一系列定義

接著再次使用 DUT_FUNCS 達成 enum 的定義,可以看到在這裡面有一個 _ 的定義,會被替換成 DUT(x),,而 DUT(x) 又會被替換成 DUT_x 根據傳入的 x 可以產生出不同的拼接效果。

enum 的內部使用了 DUT_FUNCS 得到 _(is_insert_head) 等等的內容,在此就會被替換成 DUT_insert_head, 等等內容,由這些內容組成最終的 enum

chiangkd 的筆記中看到老師的留言,得知這個技巧稱為 X_macro,意指同一個 _ 定義在不同上下文中產生不同展開的效果。

最後查看 fixture.c 中,可以看到下列定義

#define DUT_FUNC_IMPL(op) \
    bool is_##op##_const(void) { return test_const(#op, DUT(op)); }

#define _(x) DUT_FUNC_IMPL(x)
DUT_FUNCS
#undef _

此處的 _ 會將 DUT_FUNCS 展開成函式的實作

bool is_insert_head_const(void) { return test_const("insert_head", DUT_insert_head); } 

到此便得知了 is_xxx_const 這系列函式實際上是呼叫 test_const 來完成相關的操作。

查看 test_const 函式,可以發現會進行 TEST_TRIES 輪的測試,每輪測試將蒐集 ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) 進行測試。

// in fixture.c
static t_context_t *t
static bool test_const(char *text, int mode)
{
    ...
    t = malloc(sizeof(t_context_t));
    for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
        init_once();
        for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
             ++i)
            result = doit(mode);
        if (result)
            break;
    }
    ...
}
// in constant.c
static struct list_head *l = NULL; 

迴圈一開始透過 init_once 初始化 tl 這兩個變數,將 l 修改為 NULL,以及將 t_context_t 的內容初始化為 0。

typedef struct {
    double mean[2];
    double m2[2];
    double n[2];
} t_context_t;

接著查看 doit 函式的內容

static bool doit(int mode)
{
    // 分配相關記憶體資源
    ...
    if (!before_ticks || !after_ticks || !exec_times || !classes ||
        !input_data) {
        die();
    }

    prepare_inputs(input_data, classes);

    bool ret = measure(before_ticks, after_ticks, input_data, mode);
    differentiate(exec_times, before_ticks, after_ticks);
    update_statistics(exec_times, classes);
    ret &= report();
    // 釋放相關資源
    ...
    return ret;
}

此處又有幾個關鍵的函式,首先為 prepare_inputs 對應論文方法的第一階段

void prepare_inputs(uint8_t *input_data, uint8_t *classes)
{
    randombytes(input_data, N_MEASURES * CHUNK_SIZE);
    for (size_t i = 0; i < N_MEASURES; i++) {
        classes[i] = randombit();
        if (classes[i] == 0)
            memset(input_data + (size_t) i * CHUNK_SIZE, 0, CHUNK_SIZE);
    }

    for (size_t i = 0; i < N_MEASURES; ++i) {
        /* Generate random string */
        randombytes((uint8_t *) random_string[i], 7);
        random_string[i][7] = 0;
    }
}

這邊首先將 input_data 整塊初始化為隨機生成的資料,再來根據論文所敘述每一輪測試應該要隨機挑選 fixed value 或是 random value,因此再次獲得一個隨機的 bit 來決定該輪的測試資料要使用哪一種類型,如果此值為 0 就將 input_data 對應的位移量(offset)設成 0。
input_data 也產生 N_MEASURES 組的隨機字串供後續使用。

接著是 measure 的部份,裡面有一個 switch 包含 default 共對應了 5 個 case,但考慮前面有 assert,且每個 case 都含有 break 敘述,因此 default 應該是不會執行到,另外 insetremove 各有兩組這邊各留下一組作為說明。

bool measure(int64_t *before_ticks,
             int64_t *after_ticks,
             uint8_t *input_data,
             int mode)
{
    assert(mode == DUT(insert_head) || mode == DUT(insert_tail) ||
           mode == DUT(remove_head) || mode == DUT(remove_tail));

    switch (mode) {
    case DUT(insert_head):
        for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
            char *s = get_random_string();
            dut_new();
            dut_insert_head(
                get_random_string(),
                *(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000);
            int before_size = q_size(l);
            before_ticks[i] = cpucycles();
            dut_insert_head(s, 1);
            after_ticks[i] = cpucycles();
            int after_size = q_size(l);
            dut_free();
            if (before_size != after_size - 1)
                return false;
        }
        break;
    case DUT(remove_tail):
        for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
            dut_new();
            dut_insert_head(
                get_random_string(),
                *(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000 + 1);
            int before_size = q_size(l);
            before_ticks[i] = cpucycles();
            element_t *e = q_remove_tail(l, NULL, 0);
            after_ticks[i] = cpucycles();
            int after_size = q_size(l);
            if (e)
                q_release_element(e);
            dut_free();
            if (before_size != after_size + 1)
                return false;
        }
        break;
}

此處有個問題是為何迴圈是從 DROP_SIZE 開始算起,不應該是跑完後才決定有多少離群值應該要刪除嗎,查看 dudect 論文的原版沒有看到這樣的行為,先紀錄下此狀況。

static void measure(dudect_ctx_t *ctx) {
  for (size_t i = 0; i < ctx->config->number_measurements; i++) {
    ctx->ticks[i] = cpucycles();
    do_one_computation(ctx->input_data + i * ctx->config->chunk_size);
  }

  for (size_t i = 0; i < ctx->config->number_measurements-1; i++) {
    ctx->exec_times[i] = ctx->ticks[i+1] - ctx->ticks[i];
  }
}

insert_headremove_tail 的共同處檢視

  • dut_new: 用於呼叫 q_new() 產生一個新的佇列
  • dut_insert_head: 此處為巨集定義,展開後如下
    ​​​​do { 
    ​​​​    int j = *(uint16_t *) (input_data + i * 2) % 10000; 
    ​​​​    while (j--) 
    ​​​​        q_insert_head(l, get_random_string()); 
    ​​​​} while (0)
    
    可以看到它從 input_data 中隨機取了一個大小 2 位元組的無號整數,用於將佇列的長度調整為 j,並且此處輸入的皆是隨機產生的字串
  • dut_free: 用於呼叫 q_free() 釋放測試用的佇列

測試程式主要的部份如下,開始測試前紀錄佇列目前的大小以及當前的 cpu cycles,然後呼叫待測函式,在函式結束後再次讀取 cpu cycles,之後可以用兩值差異作為此次執行時間之依據。

int before_size = q_size(l);
before_ticks[i] = cpucycles();
dut_insert_tail(s, 1);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
dut_free();
if (before_size != after_size - 1)
    return false;

若待測函式執行後,佇列大小不如預期則直接 return,在 remove_tail 則多了一個將取得的 element_t 給釋放的動作

在此發現第二個問題,隨機測資的機制似乎不完善,在 insert 中永遠都使用隨機字串,而在 remove 的案例中的隨機變數應該設為是否將字串從佇列中複製出來。

接下來的 differentiate(exec_times, before_ticks, after_ticks); 函式計算每一筆 cpu cycles 前後的差異,並且將結果存在 exec_times 中。

再來的 update_statistics(exec_times, classes); 則是將每一筆量測到的執行時間拿來計算 t test 所需要的統計量,樣本平均數以及樣本變異數(函式 t_push 負責計算)。注意此處缺少了去除離群值的實作,將帶有誤差的資料拿來計算可能是我們無法通過評分的原因之一。

最後的 report 函式負責判斷此次測試的結果是否為常數時間

tatic bool report(void)
{
    double max_t = fabs(t_compute(t));
    double number_traces_max_t = t->n[0] + t->n[1];
    double max_tau = max_t / sqrt(number_traces_max_t);

    printf("\033[A\033[2K");
    printf("meas: %7.2lf M, ", (number_traces_max_t / 1e6));
    if (number_traces_max_t < ENOUGH_MEASURE) {
        printf("not enough measurements (%.0f still to go).\n",
               ENOUGH_MEASURE - number_traces_max_t);
        return false;
    }
    printf("max t: %+7.2f, max tau: %.2e, (5/tau)^2: %.2e.\n", max_t, max_tau,
           (double) (5 * 5) / (double) (max_tau * max_tau));

    /* Definitely not constant time */
    if (max_t > t_threshold_bananas)
        return false;
    /* Probably not constant time. */
    if (max_t > t_threshold_moderate)
        return false;
    /* For the moment, maybe constant time. */
    return true;
}

注意此處在尚未獲得足夠多的測試資料時,會直接離開函式,應該先行判斷測資數量是否充足,再進行後續的操作。

統整一下目前的執行流程,從 q_test.c 中的 queue_XXX 作為函式呼叫的起點

queue_XXX(position_t pos, int argc, char *argv[])
    => is_XXX_const 
        => test_const("XXX", DUT_XXX)
            => doit(DUT_XXX)

doit(DUT_XXX) 執行完成後便以獲得統計檢驗的結果,此結果一路回傳至 q_test.c 中的函式,並且根據結果印出是否通過測試。

lab0-c dudect 實作之改善

measure 函式取得測資的計數有誤

commit f9837fc

在 dudect 的原實作中,可以看到他將取得的執行時間資料的前幾筆給丟棄,不參與統計量的計算,但在 lab0-c 中卻是在量測執行時間時的迴圈用 DROP_SIZE 扣掉要量測的次數

bool measure(int64_t *before_ticks,
     case DUT(insert_tail):
-        for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
+        for (size_t i = 0; i < N_MEASURES; i++) {
             char *s = get_random_string();
             dut_new();
             dut_insert_head(

static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
+    for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {

在此我們固定在 measure 內量測 N_MEASURES 次,並且在計算統計量時才去除前 DROP_SIZE 筆資料。

update_statistics 沒有去除離群值

commit f9837fc

在 dudect 的論文方法中,有提到剪裁(cropping)的資料後處理手法,dudect 的實作中將每一筆執行時間的資料分為 102 種不同的 case 處理,已確保在不同資料處理的狀況下,量測函式都要能通過 t-test

#define DUDECT_NUMBER_PERCENTILES 100
static void update_statistics(dudect_ctx_t *ctx) {
  for (size_t i = 10 /* discard the first few measurements */; i < (ctx->config->number_measurements-1); i++) {
    int64_t difference = ctx->exec_times[i];
    ...
    t_push(ctx->ttest_ctxs[0], difference, ctx->classes[i]);
    for (size_t crop_index = 0; crop_index < DUDECT_NUMBER_PERCENTILES; crop_index++) {
      if (difference < ctx->percentiles[crop_index]) {
        t_push(ctx->ttest_ctxs[crop_index + 1], difference, ctx->classes[i]);
      }
    if (ctx->ttest_ctxs[0]->n[0] > 10000) {
      double centered = (double)difference - ctx->ttest_ctxs[0]->mean[ctx->classes[i]];
      t_push(ctx->ttest_ctxs[1 + DUDECT_NUMBER_PERCENTILES], centered * centered, ctx->classes[i]);
    }
  }
}

在此我們引入同樣的手法,計算出百分位數作為去除離群值的依據

typedef struct {
    double mean[2];
    double m2[2];
    double n[2];
+    int64_t *percentiles;
+    bool first;
} t_context_t;

在結構體中新增兩個成員,用於判斷是否要計算百分位數以及儲存計算的結果

@@ -45,5 +45,6 @@ void t_init(t_context_t *ctx)
        ctx->m2[class] = 0.0;
        ctx->n[class] = 0.0;
    }
+    ctx->first = true;

init_once 呼叫的 t_init 中,固定把 first 設成 true

static bool doit(int mode) {
    ...
    bool ret = measure(before_ticks, after_ticks, input_data, mode);
    differentiate(exec_times, before_ticks, after_ticks);
+    if (t->first) {
+        t->percentiles = calloc(DUDECT_NUMBER_PERCENTILES, sizeof(int64_t));
+        prepare_percentiles(t, exec_times);
+    } else
+        update_statistics(exec_times, classes);
    ret &= report();
    ...
}

並且將第一組獲得的執行時間用於計算百分位數,不參與統計量的計算,百分位數的計算參考原 dudect 的實作,將完整的一組資料做排序,並且計算出各個 index 的閥值,除存在結構體的成員中。

static int64_t percentile(int64_t *a, double which, size_t size)
{
    size_t array_position = (size_t) ((double) size * (double) which);
    return a[array_position];
}
t->percentiles[i] = percentile(
    exec_times,
    1 - (pow(0.5, 10 * (double) (i + 1) / DUDECT_NUMBER_PERCENTILES)),
    N_MEASURES);

接這是統計量計算函式的修改

+#define PERCENTILE_UPPER_BOUND 95
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
    for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {
        int64_t difference = exec_times[i];
        /* CPU cycle counter overflowed or dropped measurement */
        if (difference <= 0)
            continue;
+       if (difference < t->percentiles[PERCENTILE_UPPER_BOUND]) {
+           t_push(t, difference, classes[i]);
        }
    }
}

我們將先前計算好的百分位數拿來與執行時間 difference 作比較,僅保留小於閥值的資料計算統計量。

並且考慮去除部份數值後,資料量可能不足以滿足 dudect 的評測,因此原本固定的迴圈次數修改為不斷執行,直到取得足夠測試資料為止。

static bool test_const(char *text, int mode)
{
    bool result = false;
    ...
-    for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
-         ++i)    
+    while (t->n[0] + t->n[1] < ENOUGH_MEASURE)
        result = doit(mode);
    ...
    return result;
}

report 函式的優化

commit 676b460

在 lab0-c 的實作中 report 函式每次都會計算 t value,但是在測試資料不足時卻又不採用此次結果。

static bool report(void) {
    double number_traces_max_t = t->n[0] + t->n[1];
    printf("\033[A\033[2K");

    if (number_traces_max_t < ENOUGH_MEASURE) {
        printf("not enough measurements (%.0f still to go).\n",
               ENOUGH_MEASURE - number_traces_max_t);
        return false;
    }
    double max_t = fabs(t_compute(t));
    ...
}

因此在這邊將判斷測試資料的程式碼移動至函式最前面,已確保不會產生不必要的計算。

measure 中沒有對完整的測資做隨機處理

commit 8f57667

回想一下 queue.c 中 insert 和 remove 的函式參數

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
bool q_insert_head(struct list_head *head, char *s)

有一個共通點為佇列,另外 insert 函式中有一個字串參數 s,而 remove 函式中則是有 sp 以及 bufsize,對照 dudect 論文所述,必須能產生隨機類型的輸入。對應上這些參數,我們可以透過隨機變數初始化下列參數:

  • head: 輸入佇列的長度
  • s: 輸入字串的長度
  • bufsize: 需要複製出的資料大小
  • sp: 是否為 NULL(需與 bufsize 搭配)

在原本 lab0-c 的實作中,僅將取得的隨機數用於初始化佇列的大小,對於 remove 函式所需要的變數皆設為固定的數值element_t *e = q_remove_tail(l, NULL, 0);,這便是先前 remove 函式能通過測試的原因。

我將整個 measure 函式改寫成下面的形式

bool measure(int64_t *before_ticks, int64_t *after_ticks, uint8_t *input_data, int mode) { for (size_t i = 0; i < N_MEASURES; ++i) { char *s, tmp[32] = {0}; char *sp = NULL; int buf_sz = 0; uint16_t rn = *(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000; if (rn == 0) { s = "FIXEDSTR"; } else { s = get_random_string(); sp = tmp; buf_sz = (rn % 8) + 1; } dut_new(); if (mode == DUT_remove_head || mode == DUT_remove_tail) rn++; dut_insert_head(get_random_string(), rn); bool (*insert_func)(struct list_head *, char *) = NULL; element_t *(*remove_func)(struct list_head *, char *, size_t) = NULL; if (mode == DUT_insert_head || mode == DUT_insert_tail) insert_func = mode == DUT_insert_head ? q_insert_head : q_insert_tail; else if (mode == DUT_remove_head || mode == DUT_remove_tail) remove_func = mode == DUT_remove_head ? q_remove_head : q_remove_tail; int before_size = q_size(l); if (insert_func) { before_ticks[i] = cpucycles(); insert_func(l, s); after_ticks[i] = cpucycles(); } else { before_ticks[i] = cpucycles(); element_t *e = remove_func(l, sp, buf_sz); after_ticks[i] = cpucycles(); if (e) q_release_element(e); } int after_size = q_size(l); dut_free(); if (insert_func && before_size != after_size - 1) return false; else if (remove_func && before_size != after_size + 1) return false; } return true; }

重要的改動有以下幾點:

  1. 將 switch 取消,改為用 mode 以及函式指標決定受測函式(25 ~ 30 行)
  2. input_data 隨機取出的 rn 判斷此次測試使用固定測資還是隨機測資(7 ~ 16 行)
  3. 以函式指標是否為空判斷後續要使用哪一個函式指標作為受測函式(33 ~ 43 行)
  4. 以函式指標判斷函式執行是否有成功(47 ~ 49 行)

修改完後的測試

做了上述修改後,現在四個佇列操作函式皆無法通過測試,不過我認為這是預期內的狀況
這個 repo 是我從原版 dudect 複製修改的
我將測試資料大小設定為 8 bytes,並且以 memcmp 作為測試函式,此處預期至少會比較 1 個位元組的資料,至多 8 個,在執行時間上的差異上可以算是非常小。

  • 設成 8 是為了與 lab0-c 的 dudect 的隨機字串長度一樣
int check_tag(uint8_t *x, uint8_t *y, size_t len) {
  return memcmp(x, y, len);
}
uint8_t do_one_computation(uint8_t *data) {
  /* simulate totally bogus MAC check in non-constant time */
  return check_tag(data, secret, 8);
}

以下是測試結果,依然被 deduct 被判定為非常數執行時間,由此可得知 deduct 具有相當的敏感度。

$ ./dudect_simple_O0
meas:    0.10 M, max t:   +2.00, max tau: 6.41e-03, (5/tau)^2: 6.09e+05. For the moment, maybe constant time.
meas:    0.19 M, max t:   +3.56, max tau: 8.20e-03, (5/tau)^2: 3.72e+05. For the moment, maybe constant time.
meas:    0.30 M, max t:   +3.85, max tau: 7.04e-03, (5/tau)^2: 5.04e+05. For the moment, maybe constant time.
meas:    0.40 M, max t:   +3.95, max tau: 6.25e-03, (5/tau)^2: 6.40e+05. For the moment, maybe constant time.
...
...
meas:    5.63 M, max t:   +9.87, max tau: 4.16e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas:    5.73 M, max t:   +9.93, max tau: 4.15e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas:    5.83 M, max t:   +9.80, max tau: 4.06e-03, (5/tau)^2: 1.52e+06. For the moment, maybe constant time.
meas:    5.93 M, max t:   +9.95, max tau: 4.09e-03, (5/tau)^2: 1.50e+06. For the moment, maybe constant time.
meas:    6.03 M, max t:   +9.91, max tau: 4.03e-03, (5/tau)^2: 1.54e+06. For the moment, maybe constant time.
meas:    6.13 M, max t:  +10.10, max tau: 4.08e-03, (5/tau)^2: 1.50e+06. Probably not constant time.

考慮我 lab0-c 實作的佇列操作使用 memcpy 以及 strdup 等函式進行資料複製,這些函式執行時間本身會隨著資料量而改變,此外還有 insert 函式中的 malloc 涉及作業系統資源分配,因此即便 list_add 以及 list_del 屬於常數時間操作,但整體上來說佇列的 insert 以及 remove 並非常數時間,導致未能通過測試。

新增 shuffle 命令

首先我們觀察 console 的命令是如何被建立的,在 console.h 中可以看到一個巨集
#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)
這個巨集在函式 console_init 中被頻繁使用,它用到了前面說過得 ## 運算子,以及 # 運算子。
# 運算子的作用是將要代換的文字內容給字串化,在 C 語言來說也就是加上 "" 進行修飾。

以下是一個使用的範例

ADD_COMMAND(dedup, "Delete all nodes that have duplicate string", "");
add_cmd("dedup", do_dedup, "Delete all nodes that have duplicate string", "")

第一個參數 dedup 會被字串化成 "dedup" 後作為 add_cmd 的第一個參數,另外 dedup 也會被拼接成 du_dedup,由此可知我們要加入的函式必須取名為 do_xxx 的形式,並且在 qtest.c 中實作這個函式,在由這個 do_xxx 的函式去完成我們想要達成的內容。

接著開始新增命令,同樣地在 console_init 中加入一個 ADD_COMMAND

ADD_COMMAND(shuffle, "Shuffle nodes in queue randomly", "");

並且在 q_test.c 中提供介面

static bool do_shuffle(int argc, char *argv[])
{
    q_shuffle(current->q);
    return true;
}

q_shuffle 的本體實作如下

commit ecf7321

+    struct list_head **nodes =
+        (struct list_head **) malloc(sizeof(struct list_head *) * sz);
    
+    struct list_head *cur = head->next;
+    for (int i = 0; i < sz; i++) {
+        nodes[i] = cur;
+        cur = cur->next;
+    }
+    for (int i = sz - 1; i > 0; i--) {
+        randombytes(buf, sizeof(buf));
+        int j = *(uint32_t *) buf % (i + 1);
+        if (i != j) {
+            struct list_head *tmp = nodes[i];
+            nodes[i] = nodes[j];
+            nodes[j] = tmp;
+       }
+    }
+    INIT_LIST_HEAD(head);
+    for (int i = 0; i < sz; i++) {
+        list_add_tail(nodes[i], head);

由於串列上的資料為字串,要直接在串列上採用Yates 演算法需要頻繁地複製字串資料,因此藉由一個動態分配的陣列以快速取得每一個指向串列節點的指標,也可以透過陣列隨機訪問的特性快速向陣列後方交換元素。

當 Yates 演算法在儲存串列節點指標的陣列完成洗牌後,我們可以在根據這個順序恢復串列的結構。

實作 XOR PRNG 並整合至 qtest

commit 82a267d and 4872ef1

首先是新增一個 option 可以設定 PRNG 的值,以決定使用內建(/dev/urandom)作為亂數來源或者是使用新增的 XOR PRNG 來產生隨機數。

@@ -75,6 +75,8 @@ static int string_length = MAXSTRING;
+static int prng = 0;
@@ -173,7 +175,14 @@ static void fill_rand_string(char *buf, size_t buf_size)
-    randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+    switch (prng) {
+    case 1:
+        // TODO: implement a prng use xor shift
+        break;
+    case 0:
+    default:
+        randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+    }
@@ -1111,6 +1120,9 @@ static void console_init()
+    add_param("prng", &prng,
+              "Select the random number generator [0:/dev/urandom, 1:xor]",
+              NULL);

首先新增一個變數,用來區分當前的亂數模式,並且在產生隨機字串的函式加入判斷,根據當前模式產生出亂數。
模仿其他 option 的實作,可透過 add_param 將變數 prng 加入 param_list 這個串列上。

/* Integer-valued parameters */
typedef struct __param_element {
    char *name;
    int *valp;
    char *summary;
    /* Function that gets called whenever parameter changes */
    setter_func_t setter;
    struct __param_element *next;
} param_element_t;

param_list 串列除了紀錄 option 的數值以外,也紀錄 option 的名稱以及提示訊息,可以在 qtest 中輸入 help 查看 option 的狀態,以及使用方式。

XOR shift 實作

commit 220eb9a

在實作 XOR shift 前,我先行閱讀了Xorshift RNGs這篇論文,得知 XOR shift 有一個限制為存在週期,週期的長短跟 bit 數有關,最長為

2n1,因此為了避免太快的達到一個週期,而產生出重複的序列,我以 64 bits 進行實作。

static void xorshift(u_int64_t *val)
{
    *val ^= *val << 13;
    *val ^= *val >> 7;
    *val ^= *val << 17;
}

首先查看 xorshift 函式的運算,每次進行三組 XOR + shift 的操作。
這個運算的原理來自於線性代數,可以將這個過程視為一個線性轉換

T=(I+La)(I+Rb)(I+Lc),此處 I 為單位矩陣
Lx
為向左位移 x 位的矩陣,
Rx
為向右位移 x 位的矩陣。

考慮一個 GF(2) 上的 4 階單位矩陣與一個

L1 矩陣,以及用一個列向量 y 表示二進位數字每個 bit 的狀態
yT=(0110)

I=(1000010000100001)

L1=(0000100001000010)

  • GF2 是一個有限體,可理解為所有運算的結果都要 mod 2,所以在這個有限體中的加法運算跟 XOR 是等價的,乘法則跟 AND 等價

yL1

  • y[0]=(00+11+10+00)=1
  • y[1]=(00+10+11+00)=1
  • y[2]=(00+10+10+01)=0
  • y[3]=(00+10+10+00)=0

y^T=(1100)
單位矩陣
I
y
的乘積依然是
y
,最後我們把
y
y^
相加,可以得到
y=(1010)

這時候就完成了一個狀態的轉換。

  • 因矩陣運算具有分配律,因此對
    I+L1
    的結果相乘也會有同樣結果。

關於為什麼一次 xor shift 涵蓋了三次狀態轉換的原因,擷取自論文原文

Unfortunately, when n is 32 or 64, no choices for a and b will
provide such a T with the required order. (A preliminary test on candidates is to square T n times. If the result is not
T , then T cannot have order

2n1. By representing the rows of a binary matrix as a computer word, then xoring all
of the rows for which a (left) multiplying binary vector has 1’s, very fast binary matrix products can be evaluated with
simple C or Fortran programs, and it only takes a few minutes to check every possible a, b in
T=(I+La)(I+Rb).)

However, there are many choices for a, b, c for which the matrices
T=(I+La)(I+Rb)(I+Lc)
have or-
der 2n − 1, when n = 32 or n = 64. There are 81 triples (a, b, c), a < c, for which the 32 × 32 binary matrix
T=(I+La)(I+Rb)(I+Rc)
has order
2321

為了避免週期過早的重複,在 32 或者 64 bits 的整數上才採取三個一組的運算。

接著是透過 xor shift 生成隨機數的函式本體

void xor_rng(uint8_t *buf, size_t n)
{
    static struct {
        u_int64_t x;
        int cnt;
    } state = {.x = 0, .cnt = 7};  // each state x can generate 8 bytes data

    if (__glibc_unlikely(state.x == 0)) {
        int ret = getrandom(&state.x, sizeof(state.x), 0);
        assert(ret);
    }
    size_t offset = 0;
    while (n--) {
        if (state.cnt == 0) {
            xorshift(&state.x);
            state.cnt = 7;
        }
        buf[offset++] = (state.x >> (state.cnt << 3)) & 0xFF;
        state.cnt -= 1;
    }
}

使用 64 bit 時,一次可以產生 8 個 8 位元組的隨機數,為了不要浪費產生的結果而快速抵達新的週期,因此使用一個結構體包裝狀態以及記數 cnt,當記數值歸零的時候,就更新當前的狀態。
另外由於每次要取 8 byte,透過將 state.x 向右位移 8 的倍數次,便可以取得每一個 byte 的狀態。其中 state.cnt << 3 相當於乘 8,也就是說透過 state.cnt 的值決定要取出 state.x 的哪個部份。

另外也將這個新的隨機數生成器整合至洗牌函式中

-        randombytes(buf, sizeof(buf));
+        if (prng == 0)
+            randombytes(buf, sizeof(buf));
+        else
+            xor_rng(buf, sizeof(buf));

使用統計手法比較隨機數品質

在這個地方,我們使用卡方檢定以及熵來比較 linux kernel 中的 PRNG 以及我們實作的 xor shift 是否能達到足夠的亂度(各結果出現的頻率接近且無法預測其順序)。

測試程式取自於作業說明中的 python 程式,測試步驟如下

  1. 啟動 qtest 並建立一個新佇列
  2. 於佇列中插入資料 1, 2, 3, 4
  3. 執行 shuffle 命令共 1000000 次
  4. 統計各結果出現的頻率
  5. 計算卡方數
    X2=i=1n(OiEi)2Ei
    • Oi
      為均勻分佈下各個結果的理論值
    • Ei
      為觀測到的數值
PRNG: /dev/urandom
Expectation:  41666
Observation:  {'1234': 42174, '1243': 41662, '1324': 41774, '1342': 41450, '1423': 41631, '1432': 41652, '2134': 41798, '2143': 41769, '2314': 41722, '2341': 41576, '2413': 41710, '2431': 41407, '3124': 41806, '3142': 41419, '3214': 41607, '3241': 41642, '3412': 41522, '3421': 41709, '4123': 41638, '4132': 41630, '4213': 41813, '4231': 41652, '4312': 41482, '4321': 41755}
chi square sum:  14.37671002736044

PRNG: xor shift
Expectation:  41666
Observation:  {'1234': 41565, '1243': 41704, '1324': 41749, '1342': 41707, '1423': 42060, '1432': 41478, '2134': 41617, '2143': 41700, '2314': 41527, '2341': 41729, '2413': 41397, '2431': 42126, '3124': 41919, '3142': 41602, '3214': 41551, '3241': 41729, '3412': 41990, '3421': 41594, '4123': 41786, '4132': 41779, '4213': 41434, '4231': 41242, '4312': 41769, '4321': 41246}
chi square sum:  27.956607305716886

在繼續進行卡方檢定前,先看兩組數據畫出的直方圖,可以看到資料的分佈大致上是平均的
shuffle_result(dev_urandom)
shuffle_result(xor shift)

我們算出了兩組卡方數為 14.36 以及 27.95,以常見的

α=0.05 作為顯著水準,顯著水準的意義代表 Type 1 error 發生的機率(也可理解為偽陽性)。

此處需要進行查表找到 p value 為多少,p value 代表此次觀察到的結果所發生的紀律,並且與

α 比大小,來判斷是否能夠接受卡方檢定的虛無假說。

我們的測試資料可能的結果共有

4!=24 種,因此自由度為 23

DF\P 0.995 0.975 0.20 0.10 0.05 0.025 0.02 0.01 0.005 0.002 0.001
21 8.034 10.283 26.171 29.615 32.671 35.479 36.343 38.932 41.401 44.522 46.797
22 8.643 10.982 27.301 30.813 33.924 36.781 37.659 40.289 42.796 45.962 48.268
23 9.260 11.689 28.429 32.007 35.172 38.076 38.968 41.638 44.181 47.391 49.728

第一組內建隨機數的卡方數為 14.36,對應的 p 值在 0.975 ~ 0.2 之間,第二組採取 xor shift 的卡方數為 27.95,對應的 p 值同樣在 0.975 ~ 0.2 之間。

因此當顯著水準為 0.05 時,不足以否定此組資料屬於均勻分佈。

另外將同一組資料拿來計算熵,兩者的數值相當接近

Entropy for /dev/urandom: 4.5850 bits
Entropy for xor shift: 4.5849 bits

距離理論值相當接近,因此可以相信此組資料具備一定的亂度。

H=log2(24)4.585 bits

不過根據卡方檢定和熵的計算,不難發現兩者皆在乎最後的頻率,並不在意產生的順序,根據 chatgpt 的建議,這邊引入一樣測試自我相關函數,來驗證隨機生成的序列是否存在特定的模式,也就是

xt
xt+k
是否存在特定的關係。

自我相關函數的計算流程如下

  1. 計算平均數以及變異數
  2. 將原本資料扣掉平均數達到去中心化的效果
  3. 計算長度 k 的子序列中的共變異數
    γk=t=1Nk(xtx¯)(xt+kx¯)
    • 此處的 k 可自己定義大小,通常隨資料量變大或變小
  4. 標準化
    gammak=γkγ0,k=0,1,2,,kmax.
  5. 在給定的信賴區間下,判斷
    γk
    是否大於
    ±1.96N.
    ,N 為樣本數量,如果除了
    γ0
    以外都有在誤差內,那便可以相信此組數據不存在特定的模式。

進一步將結果繪製成圖後可以觀察到所

γk 皆在紅線(標準)附近,因此可判斷兩組 PRNG 產生的亂數都具備一定品質。

  • 此處因為標準化的計算,因此第一筆數值必定為 1.0,可將其忽略
  • x 軸的 Lag 表示的為: 第 N 筆資料與 N+k 筆資料的關係,以我們的資料量 100000 筆來說,假設 k 為 2,計算的就是
    xt
    xt+2
    的差異,並且從
    t=1
    t=1000002

    ACF(dev_urandom)
    ACF(xor shift)