contributed by < ibat10clw >
$ uname -a
Linux henry-ThinkBook-14-G7-IML 6.11.0-17-generic #17~24.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jan 20 22:48:29 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
$ gcc --version
gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
Copyright (C) 2023 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 46 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 22
On-line CPU(s) list: 0-21
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) Ultra 7 155H
CPU family: 6
Model: 170
Thread(s) per core: 2
Core(s) per socket: 16
Socket(s): 1
Stepping: 4
CPU(s) scaling MHz: 23%
CPU max MHz: 4800.0000
CPU min MHz: 400.0000
BogoMIPS: 5990.40
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge m
ca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 s
s ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc
art arch_perfmon pebs bts rep_good nopl xtopology nons
top_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq
dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma c
x16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt t
sc_deadline_timer aes xsave avx f16c rdrand lahf_lm ab
m 3dnowprefetch cpuid_fault epb intel_ppin ssbd ibrs i
bpb stibp ibrs_enhanced tpr_shadow flexpriority ept vp
id ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms
invpcid rdseed adx smap clflushopt clwb intel_pt sha_
ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect us
er_shstk avx_vnni dtherm ida arat pln pts hwp hwp_noti
fy hwp_act_window hwp_epp hwp_pkg_req hfi vnmi umip pk
u ospke waitpkg gfni vaes vpclmulqdq rdpid bus_lock_de
tect movdiri movdir64b fsrm md_clear serialize arch_lb
r ibt flush_l1d arch_capabilities
Virtualization features:
Virtualization: VT-x
Caches (sum of all):
L1d: 544 KiB (14 instances)
L1i: 896 KiB (14 instances)
L2: 18 MiB (9 instances)
L3: 24 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0-21
Vulnerabilities:
Gather data sampling: Not affected
Itlb multihit: Not affected
L1tf: Not affected
Mds: Not affected
Meltdown: Not affected
Mmio stale data: Not affected
Reg file data sampling: Not affected
Retbleed: Not affected
Spec rstack overflow: Not affected
Spec store bypass: Mitigation; Speculative Store Bypass disabled via prct
l
Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointe
r sanitization
Spectre v2: Mitigation; Enhanced / Automatic IBRS; IBPB conditiona
l; RSB filling; PBRSB-eIBRS Not affected; BHI BHI_DIS_
S
Srbds: Not affected
Tsx async abort: Not affected
在本次作業需以環狀雙向鏈結串列實作佇列,其中環狀雙向鍊結串列與 Linux 核心中採取的設計一致,將存放指標的結構體 struct list_head
嵌入實際存放資料的結構體 element_t
中。
q_new
: 建立新的「空」佇列;此函式之功能為建立一個空佇列,須注意空佇列代表佇列本身是存在的,但是佇列中沒有任何資料。
在給定的函式定義中,將 struct list_head *
作為回傳值,可得知此處需要透過 malloc
來分配 struct list_head
所需之記憶體空間,才得以在函式結束時仍保留記憶體資訊。
struct list_head *q_new()
{
struct list_head *q = (struct list_head *) malloc(sizeof(struct list_head));
if (!q)
return NULL;
INIT_LIST_HEAD(q);
return q;
}
INIT_LIST_HEAD
函式的功能為將傳入的參數 list_head
其兩個指標都指向自身。對照 list_empty
函式可得知當 head->next == next
時,代表此串列無任何節點,亦可代表佇列為空的狀態。
透過 qtest
預期可以看到下列結果
$ ./qtest
cmd> show
l = NULL
cmd> new
l = []
cmd> show
Current queue ID: 0
l = []
q_insert_head
: 在佇列開頭 (head) 插入 (insert) 給定的新節點 (以 LIFO 準則);完成佇列之建立後,接下來實作與加入資料相關的函式。
在給定的函式定義中,此函式傳入兩個參數,分別代表要操作的佇列以及欲加入佇列之字串資料。
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *node = (element_t *) malloc(sizeof(element_t));
if (!node)
return false;
node->value = strdup(s);
if (!node->value) {
free(node);
return false;
}
list_add(&node->list, head);
return true;
}
在這個地方可以透過 strdup
一個函式就完成 buffer 的分配以及字串資料的複製,此外考慮傳入字串可能很長,若沒有先確定串列節點有分配成功,就進行字串相關處理,則可能產生浪費,因此在此函式中會在串列節點分配完成後才進行字串相關的操作。
透過 qtest
確認函式之正確性,預期可看到以下結果
$ ./qtest
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> ih kernel
l = [kernel linux]
q_insert_tail
: 在佇列尾端 (tail) 插入 (insert) 給定的新節點 (以 FIFO 準則);此函式與 q_insert_head
的邏輯相似,僅需將最後使用的 list_add
調整成 list_add_tail
即可。
bool q_insert_tail(struct list_head *head, char *s)
{
... // Same as q_insert_head
list_add_tail(&node->list, head);
return true;
}
透過 qtest
確認函式之正確性,預期可看到以下結果
$ ./qtest
cmd> new
l = []
cmd> ih linux
l = [linux]
cmd> it kernel
l = [linux kernel]
q_size
: 計算目前佇列的節點總量;此函式功能為計算佇列之長度。
由於 struct list_head
中並未有紀錄長度之變數,因此計算長度需實際走訪串列,此處可以使用 list_for_each
的巨集來走訪串列,查看 list.h
中可以得知此巨集傳入一參數 node
作為 iterator 使用,並且將其初始化成 head->next
,而終止條件為 node != head
,所以進到迴圈的次數會跟實際節點存在的數量等,在每一個回合將 sz
遞增即可計算出結果。
int q_size(struct list_head *head)
{
if (!head)
return -1;
int sz = 0;
struct list_head *it;
list_for_each(it, head)
sz++;
return sz;
}
透過 qtest
確認函式之正確性,預期可看到以下結果
$ ./qtest
cmd> new
l = []
cmd> ih linux 500
...
cmd> size
Queue size = 500
...
cmd> it kernel 500
...
cmd> size
Queue size = 1000
...
q_free
: 釋放佇列所佔用的記憶體;此函式之功能為釋放傳入佇列所佔用的記憶體,在佇列的建立以及節點新增的過程中,涉及了 struct list_head
, element_t
以及 element_t->value
三處的記憶體分配,在釋放整個佇列時需回收這三部份的空間。
需注意不要造成 Use-After-Free (UAF) 的狀況,因此釋放的順序應當由內向外進行釋放。
void q_free(struct list_head *head)
{
if (!head)
return;
element_t *it, *tmp;
list_for_each_entry_safe (it, tmp, head, list) {
free(it->value);
free(it);
}
free(head);
}
q_remove_head
: 在佇列開頭 (head) 移去 (remove) 給定的節點;此函式之功能為移除佇列開頭之節點,並且當傳入參數 sp
不為 NULL 時,將節點上之字串複製至 sp
。
使用 list_del_init
將佇列開頭之節點從佇列中移除。
關於字串複製的部份要避免 overflow 的發生,考慮 bufsize
可能小於節點上字串之長度,因此此處需要判斷要複製多少位元組的資料到 sp
裡面。當字串長度小於 bufsize - 1
時,我們可以將字串完整複製至 sp
中,並在最後加入 null terminator,反之若字串長度大於 bufsize - 1
時,將其 truncate 至 bufsize - 1
後,同樣在最後加入 null terminator。
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
struct list_head *lh = head->next;
element_t *ele = list_entry(lh, element_t, list);
list_del_init(lh);
if (sp) {
size_t sz = min(strlen(ele->value), bufsize - 1);
memcpy(sp, ele->value, sz);
sp[sz] = '\0';
}
return ele;
}
在此我們需要實作用於判斷兩數字間大小關係的 min
函式,此處採取 branchless 之設計,以減少分支預測失敗帶來的效能損失。參考來源
static inline int min(int a, int b) {
return b ^ ((a ^ b) & -(a < b));
}
此函式透過 XOR 的性質,當 a < b
成立時,-(a < b)
相當於 0xFFFFFFFF
,代表括號中的 a ^ b
會被完整保留,最後外層的 XOR 運算時 b
會被抵銷掉,於是剩下 a
。
若 a < b
不成立,則 a ^ b
會被 bitwise AND 給清空成 0,最後與外層 XOR 運算時,可留下 b
。
另外此處不直接使用 scrncpy
搭配 bufsize
的考量為根據 man page 中的敘述 If the source has too few non-null bytes to fill the destination, the functions pad the destination with trailing null bytes.
,如果要複製的的 src
不足的話,會有填充 null bytes 的行為,但是如果要把 sp
當成字串解釋,只需要確保在正確的位置有一個 \0
即可。
q_remove_tail
: 在佇列尾端 (tail) 移去 (remove) 給定的節點;此函式與 q_remove_head
的邏輯相似,僅需將 list_del_init
的對象由 head->next
調整成 head->prev
即可。
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
... // Same as q_remove_head
struct list_head *lt = head->prev;
... // Same as q_remove_head
}
q_sort
q_sort
需根據傳入的參數來決定是遞增排序還是遞減排序,因此這邊先實作比較函式
static inline bool str_cmp_asc(const char *a, const char *b)
{
return strcmp(a, b) <= 0;
}
static inline bool str_cmp_dsc(const char *a, const char *b)
{
return strcmp(a, b) >= 0;
}
根據 strcmp
的 man page
0, if the s1 and s2 are equal;
a negative value if s1 is less than s2;
a positive value if s1 is greater than s2.
當 s1
的字典序小於 s2
時,回傳一個負數,在此 str_cmp_asc
用 <= 0
來將結果轉換為布林值,反之亦然。這兩個函式稍後會在 q_sort
中根據參數 descend
賦值給函式指標,這樣可以在 q_sort
呼叫時做一次,讓後續比較都可以使用到對應的函式。
接下來是排序演算法的選擇,在此我選擇實作 mergesort,首先是前置處理的部份,我將雙向環狀鍊結串列的環狀結構破壞掉,考慮 q_sort
傳入的參數 head
不帶有資料,因此也將其暫時移除,等待排序結束後再進行恢復。
void q_sort(struct list_head *head, bool descend) {
...
struct list_head *tmp = head->next;
bool (*cmp)(const char *a, const char *b) = descend ? str_cmp_dsc : str_cmp_asc;
head->prev->next = NULL;
head->next = NULL;
struct list_head *sorted = mergesort(tmp, cmp);
...
}
mergesort
在 mergesort
的實作中參考了 你所不知道的 C 語言: linked list 和非連續記憶體 中提及的快慢指標找串列中點的技巧,找出串列的中點,並且對串列進行分割。
struct list_head *mergesort(struct list_head *head, bool (*cmp)(const char *a, const char *b))
{
if (!head->next) {
return head;
}
struct list_head *fast = head;
struct list_head *slow = head;
while (fast->next && fast->next->next) {
fast = fast->next->next;
slow = slow->next;
}
struct list_head *mid = slow->next;
slow->next = NULL;
struct list_head *left = mergesort(head, cmp);
struct list_head *right = mergesort(mid, cmp);
return merge(left, right, cmp);
}
在 merge
的部份同樣參考了你所不知道的 C 語言: linked list 和非連續記憶體中提及的技巧,透過 indirect pointer 來簡化程式邏輯。
在 merge 的階段中暫時不處理 prev
的指標,等待排序完成後再一次進行處理。
struct list_head *merge(struct list_head *l1, struct list_head *l2, bool (*cmp)(const char *a, const char *b)) {
struct list_head *head = NULL, **ptr = &head, **node;
for (node = NULL; l1 && l2; *node = (*node)->next) {
node = cmp(list_entry(l1, element_t, list)->value,
list_entry(l2, element_t, list)->value)
? &l1
: &l2;
*ptr = *node;
ptr = &(*ptr)->next;
}
*ptr = l1 ? l1 : l2;
return head;
}
此處共有三個指標,先說明 head
以及 ptr
的作用,ptr
作為指向 head
指標的指標,可以透過解引用對 head
進行賦值,這樣就可以避免宣告或者分配一個 dummy 節點的成本,當 head
已經不為 NULL 後(也就是第一次進入迴圈時),可再將 ptr
的值向前移動。
node
也是 indirect pointer,他的作用是決定當前要從 l1
還是 l2
取出節點,此處便可用上 cmp
這個函式指標作判斷,將 ptr
指向對應目標,不論選擇的是 l1
還是 l2
,都可以透過解引用將 node
賦值給 ptr
,再將 ptr
給向前移動,由於 ptr
會指向 l1
或 l2
其一,因此這個行為相當於將 l1
或 l2
已經掛上的節點給去除。
透過上述的實作,我們已經能獲得一個排序過的串列,可以透過 next
來按照升序或者降序進行拜訪,剩餘的部份則是要恢復環狀結構,以及處理 merge
階段沒有做的 prev
指標的部份。
void q_sort(struct list_head *head, bool descend) {
...
// Sort the list and ignore prev link in this stage
struct list_head *sorted = mergesort(tmp, cmp);
// Recover previous link and stop at last node
struct list_head *current = sorted;
while (current->next) {
current->next->prev = current;
current = current->next;
}
// Recover circular structure
sorted->prev = head;
head->next = sorted;
head->prev = current;
current->next = head;
}
在此使用 mergesort
的回傳值作為起點,開始依序拜訪串列上的節點,並將當前節點的下一個節點的 prev
指向自己,之後向前移動,此處的 while
迴圈會停留在最後一個節點。
這恰好能作為恢復環狀結構的依據,只要將 head
與首與個節點和末個節點進行串接,雙向環狀鍊結串列的排序即可完成。
q_delete_mid
此函式的功能為移除並釋放串列的中點。
bool q_delete_mid(struct list_head *head)
{
if (!head || list_empty(head))
return false;
struct list_head *slow = head->next, *fast = head->next->next;
while (fast != head && fast->next != head) {
slow = slow->next;
fast = fast->next->next;
}
element_t *tmp = list_entry(slow, element_t, list);
list_del(slow);
free(tmp->value);
free(tmp);
return true;
}
如先前提到的快慢指標的技巧,得以找出位於中間的位置,假設串列長度為 n,while
迴圈執行 slow
可以停在第
q_reverse
此函式的功能為將雙向環狀鍊結串列的順序反轉。
void q_reverse(struct list_head *head) {
struct list_head* cur = head->next;
struct list_head *tmp;
while (cur != head) {
tmp = cur->next;
cur->next = cur->prev;
cur->prev = tmp;
cur = tmp;
}
tmp = head->next;
head->next = head->prev;
head->prev = tmp;
}
在雙向鍊結串列中,只要依序將 next
以及 prev
指標對調,即可完成反轉。
q_swap
此函式功能為將鄰近串列之節點進行交換,考慮此處節點之資料型態為字串,如果單純地交換節點字串的話,當字串長度長的時候會造成負擔,因此此處實作應該採取交換指標的方式。
查看 linux kernel 的 list.h
中可以發現有 list_swap
的實作
static inline void list_replace(struct list_head *old,
struct list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
static inline void list_swap(struct list_head *entry1,
struct list_head *entry2)
{
struct list_head *pos = entry2->prev;
list_del(entry2);
list_replace(entry1, entry2);
if (pos == entry1)
pos = entry2;
list_add(entry1, pos);
}
直接使用這兩個函式固然能有正確結果,但這兩個函式並不只為了處理相鄰節點的交換,因此這裡稍作修改,加入 __glibc_unlikely(pos == entry1)
輔助編譯器進行優化,並且在 q_swap
中確保 list_swap
傳入的參數符合 unlikely
的假設。
void q_swap(struct list_head *head)
{
// https://leetcode.com/problems/swap-nodes-in-pairs/
struct list_head *cur = head->next;
while (cur != head && cur->next != head) {
struct list_head *tmp = cur->next->next;
list_swap(cur->next, cur);
cur = tmp;
}
}
q_delete_dup
此函式要求刪除串列中所有重複的數值,在此假設串列以為排序狀態。
由於串列為排序狀態,因此若有出現重複的情況,則兩節點必定相鄰,透過這樣的思維去比較當前節點與下一個節點的字串是否相同來決定要不要刪除節點。
由於此處需要頻繁地取得下一個節點的資料,此處引入 linux kernel 中的巨集 list_next_entry
來獲取下一個節點的資料。
#define list_next_entry(pos, member) \
list_entry((pos)->member.next, typeof(*(pos)), member)
bool q_delete_dup(struct list_head *head)
{
if (!head || list_is_singular(head) || list_empty(head))
return false;
// https://leetcode.com/problems/remove-duplicates-from-sorted-list-ii/
struct list_head *cur = head->next;
while (cur != head && cur->next != head) {
element_t *e1= list_entry(cur, element_t, list);
element_t *e2 = list_next_entry(e1, list);
bool flag = 0;
while (cur->next != head && !strcmp(e1->value, e2->value)) {
list_del(cur->next);
free(e2->value);
free(e2);
flag = 1;
e2 = list_next_entry(e1, list);
}
struct list_head *tmp = cur->next;
if (flag) {
list_del(cur);
free(e1->value);
free(e1);
}
cur = tmp;
}
return true;
}
當發現重複節點時,就先固定的位置 cur
,並且不斷刪除重複的節點,直到 cur->next
不再重複,或者已到達串列的結尾。
查看 glibc 的實作,strcmp
必須逐個 byte 去比較
https://github.com/zerovm/glibc/blob/master/string/strcmp.c
do
{
c1 = (unsigned char) *s1++;
c2 = (unsigned char) *s2++;
if (c1 == '\0')
return c1 - c2;
} while (c1 == c2);
當 strcmp
的最糟情況發生時,代表整個 e1
紀錄的字串都會看過。如果在過程中紀錄 e1
字串的長度,並且改用如memcmp
這種一次可比較不只一個 byte 的函式是否能有更好的效能。
q_reversek
此函式將串列中 k 個節點視為一個群組,並且將這個群組進行反轉。
此處可善用先前實作過的 reverse
來輔助完成。
void q_reverseK(struct list_head *head, int k)
{
// https://leetcode.com/problems/reverse-nodes-in-k-group/
if (k <= 1 || !head || list_empty(head) || list_is_singular(head))
return;
struct list_head *cur, *tmp;
struct list_head *sub_head = head;
unsigned int count = 0;
list_for_each_safe (cur, tmp, head) {
count++;
if (count == k) {
struct list_head sublist;
INIT_LIST_HEAD(&sublist);
list_cut_position(&sublist, sub_head, cur);
q_reverse(&sublist);
list_splice(&sublist, tmp->prev);
count = 0;
sub_head = tmp->prev;
}
}
}
此處透過 list_for_each_safe
走訪串列,並當計數達到 k 的時候,透過 list_cut_position
巨集截出子串列,再使用已實作的 q_reverse
反轉此部份,最後再將結果透過 list_splice
加入至原先的位置。
要注意的是 list_cut_position
的第二個參數 head_from
預設傳入的是不帶資料的串列開頭,因此實際切割的範圍會是 head_from->next
至 node
,所以此處 sub_head
初始化為 head
並且後續更新時會將位置設在實際想切割的節點的前一個位置。
list_splice
的行為也類似,他假設傳入的是串列開頭,因此 tmp
雖是 cur
的下一個節點,但是依然使用 tmp->prev
作為參數。
q_ascend
, q_descend
這兩個函式取自於 Leetcode 2487. Remove Nodes From Linked List,題目要透過刪除特定節點使串列變為遞增或者遞減的排序狀態。
以 q_ascend
為例,對於串列中任意節點,只要在其右側存在比自己小的元素,那就要將該元素刪除。由於 queue.c
中的串列為雙向環狀鍊結串列,可以在給定開頭節點的情況下,反向拜訪串列。若透過 next
指標拜訪串列為遞增,則反向拜訪時必須要呈現遞減,可以利用這個特性來解題。
int q_ascend(struct list_head *head)
{
element_t *cur, *tmp;
const element_t *min_node = list_last_entry(head, element_t, list);
list_for_each_entry_prev_safe(cur, tmp, head, list)
{
if (strcmp(cur->value, min_node->value) > 0) {
list_del(&cur->list);
free(cur->value);
free(cur);
} else {
min_node = cur;
}
}
return q_size(head);
}
此處加入 list_for_each_entry_prev_safe
巨集來簡化程式碼複雜度並反向拜訪串列。
此處預期串列節點的字典序要越看越小,在進入迴圈前先將 min_node
設為串列尾巴的節點,當 cur
的字典序大於 min_node
時,就將 cur
給刪除,否則就更新 min_node
的資訊。
q_merge
此函式功能為將數個以排序的佇列合併成一個佇列(須維持排序狀態),此函式有個比較不一樣的地方是參數 list_head
是一個定義在 q_test.c
中的變數 chain
這個串列的開頭(串列上的元素為 q_context_t
),我們需要用這個開頭來獲得所有的佇列,並將其合併。
typedef struct {
struct list_head head;
int size;
} queue_chain_t;
static queue_chain_t chain = {.size = 0};
static queue_contex_t *current = NULL;
//q_test.c: do_new
if (exception_setup(true)) {
queue_contex_t *qctx = malloc(sizeof(queue_contex_t));
list_add_tail(&qctx->chain, &chain.head);
qctx->size = 0;
qctx->q = q_new();
qctx->id = chain.size++;
current = qctx;
}
//q_test.c: do_merge
if (current && exception_setup(true))
len = q_merge(&chain.head, descend);
上述程式碼中顯示了 q_test
是如何在 new
命令時,將一個新的佇列加入至 chain
上,q_test
中除了 do_merge
以外的函式在呼叫 queue.c
中的則是將 current->q
作為參數。
int q_merge(struct list_head *head, bool descend)
{
queue_contex_t *cur;
bool (*cmp)(const char *a, const char *b) =
descend ? str_cmp_dsc : str_cmp_asc;
struct list_head *lh = NULL;
list_for_each_entry (cur, head, chain) {
if (!lh) {
lh = cur->q;
lh->prev->next = NULL;
} else {
cur->q->prev->next = NULL;
lh->next = merge(lh->next, cur->q->next, cmp);
INIT_LIST_HEAD(cur->q);
}
}
struct list_head *tmp = lh->next, *prev = lh;
while (tmp) {
tmp->prev = prev;
prev = tmp;
tmp = tmp->next;
}
lh->prev = prev;
prev->next = lh;
return q_size(lh);
}
善用先前在 mergesort
實作的 merge
函式,來將 chain
上所有佇列都合併到 chain
上的第一個佇列。
這邊的原因是 q_test
中的 do_merge
有一段程式會將 chain
上的佇列釋放掉。
struct list_head *cur = chain.head.next->next;
while ((uintptr_t) cur != (uintptr_t) &chain.head) {
queue_contex_t *ctx = list_entry(cur, queue_contex_t, chain);
cur = cur->next;
q_free(ctx->q);
free(ctx);
}
因此為了確保所有含有字串的節點能夠保留,必須選擇將節點掛在 chain
的第一個佇列上,同時這也是 merge(lh->next, cur->q->next, cmp);
完成後需要將 cur->q
初始化的原因,也是為了避免這段程式碼的 q_free
執行時發生錯誤。
這篇論文提出了一項稱為 dudect
的工具,可用於檢測程式的執行時間是否為常數時間,dudect
透過直接執行程式的方式作為統計資料的來源。
dudect
工具的執行流程可以分為下面三個步驟
來判定程式的執行時間是否為常數。
在量測時間的部份,首先我們需要定義兩組類別的輸入,分別為 fixed value 以及 random value。在 fixed value 的部份,論文提到可以選擇可觸發 special corner-case 的內容作為輸入。
接著是量測時間的方式,根據現代 CPU 提供的 cycle counter,例如 X86 架構的 TSC 或者 ARM 架構的 systick,透過這些裝置取得的數值,可以精準量測執行時間。
最後是去除環境干擾的部份,考慮測試環境並不是特別在裸機環境中測試,於作業系統環境中量測時間時,可能會受到許多干擾(排程器的上下文切換(context switch)、資源分配以及系統中斷等諸多元素),因此每一輪要量測的項目必須從 fixed 或者 random 中隨機選擇一組。
關於測試資料選擇帶來的影響在 Statistics and Secret Leakage論文中有更詳細的說明。
在作資料分析的時候,如何去除不具代表性的離群值是一個對分析結果至關重要的因素。
論文中提到說對於量測時間通常會呈現正偏態(positive skewed)的形式,如下圖所示
正偏態指的是大多數的資料會集中在左側,代表未受干擾的執行時間,而受到干擾的執行時間在此應屬於小部份的資料,圖中的 x 軸表示的為執行時間,y 軸則代表出現的頻率(frequency)。
對於去除離群值的手法,論文中提到兩種方式,分別為
這個階段主要是對蒐集到的兩組執行時間資料做檢定,已判斷隨機輸入的執行時間是否跟固定輸入的執行時間在分佈相等。
假設檢定的方法有很多,論文中提到兩種,這邊著重在 Welch’s t-test 上。
假設檢定的流程主要如下,由研究者提出一段虛無假說(Null hypothesis,記為
根據 Welch's t-test 我們先計算兩組測試資料的平均數以及變異數,此處的
接著我們根據公式以及先前的結果計算出 t 值,我們可以觀察一下公式中的數字對於 t 值結果的影響,當兩組數據平均值差異很大時,t 值會變大(或變小)
當 t 值越接近零時,代表兩組數據的差異性越小,到此已經可以根據先前定義的
dudect
的實作首先我們從 qtest.c
中可以看到幾個函式
is_remove_tail_const
, is_remove_head_const
, is_insert_tail_const
和 is_insert_head_const
不過如果使用 grep is_remove_tail_const -nr
會發現找不到函式的定義在哪裡,原因是這些函式是由巨集輔助定義而來。
首先從 fixture.h
中找到一個巨集定義 _
,傳入一個參數 x
,並且展開為 bool is_##x##_const(void);
/* define in consant.h
#define DUT_FUNCS \
_(insert_head) \
_(insert_tail) \
_(remove_head) \
_(remove_tail)
*/
#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _
此處可參閱 C 語言規格書 6.10.3.3 對於 ##
運算子的說明
For both object-like and function-like macro invocations, before the replacement list is reexamined for more macro names to replace, each instance of a ## preprocessing token in the replacement list (not from an argument) is deleted and the preceding preprocessing token is concatenated with the following preprocessing token. Placemarker preprocessing tokens are handled specially: concatenation of two placemarkers results in a single placemarker preprocessing token, and concatenation of a placemarker with a non-placemarker preprocessing token results in the non-placemarker preprocessing token. If the result is not a valid preprocessing token, the behavior is undefined. The resulting token is available for further macro replacement. The order of evaluation of ## operators is unspecified.
其中提到,當要被替換的參數前後方都跟著 ##
運算子的話,那前置處理器就會在文字替換完成後將他們拼接在一起。
由此處可以得知,此段巨集被展開後會成為 bool is_remove_tail_const(void)
等等,也就是我們在 qtest.c
中所看到的函式名稱。
接著看 constant.h
中的巨集,也用到了類似的技巧
#define DUT_FUNCS \
_(insert_head) \
_(insert_tail) \
_(remove_head) \
_(remove_tail)
#define DUT(x) DUT_##x
enum {
#define _(x) DUT(x),
DUT_FUNCS
#undef _
};
巨集 DUT_FUNCS
會展開成_(is_insert_head)
等一系列的內容,與 fixture.h
的巨集搭配,則可以產生出函式的定義bool is_insert_head_const(void);
等一系列定義
接著再次使用 DUT_FUNCS
達成 enum
的定義,可以看到在這裡面有一個 _
的定義,會被替換成 DUT(x),
,而 DUT(x)
又會被替換成 DUT_x
根據傳入的 x 可以產生出不同的拼接效果。
enum
的內部使用了 DUT_FUNCS
得到 _(is_insert_head)
等等的內容,在此就會被替換成 DUT_insert_head,
等等內容,由這些內容組成最終的 enum
在 chiangkd 的筆記中看到老師的留言,得知這個技巧稱為 X_macro,意指同一個 _
定義在不同上下文中產生不同展開的效果。
最後查看 fixture.c
中,可以看到下列定義
#define DUT_FUNC_IMPL(op) \
bool is_##op##_const(void) { return test_const(#op, DUT(op)); }
#define _(x) DUT_FUNC_IMPL(x)
DUT_FUNCS
#undef _
此處的 _
會將 DUT_FUNCS
展開成函式的實作
bool is_insert_head_const(void) { return test_const("insert_head", DUT_insert_head); }
到此便得知了 is_xxx_const
這系列函式實際上是呼叫 test_const
來完成相關的操作。
查看 test_const
函式,可以發現會進行 TEST_TRIES
輪的測試,每輪測試將蒐集 ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2)
進行測試。
// in fixture.c
static t_context_t *t
static bool test_const(char *text, int mode)
{
...
t = malloc(sizeof(t_context_t));
for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
init_once();
for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
++i)
result = doit(mode);
if (result)
break;
}
...
}
// in constant.c
static struct list_head *l = NULL;
迴圈一開始透過 init_once
初始化 t
和 l
這兩個變數,將 l
修改為 NULL,以及將 t_context_t
的內容初始化為 0。
typedef struct {
double mean[2];
double m2[2];
double n[2];
} t_context_t;
接著查看 doit
函式的內容
static bool doit(int mode)
{
// 分配相關記憶體資源
...
if (!before_ticks || !after_ticks || !exec_times || !classes ||
!input_data) {
die();
}
prepare_inputs(input_data, classes);
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
update_statistics(exec_times, classes);
ret &= report();
// 釋放相關資源
...
return ret;
}
此處又有幾個關鍵的函式,首先為 prepare_inputs
對應論文方法的第一階段
void prepare_inputs(uint8_t *input_data, uint8_t *classes)
{
randombytes(input_data, N_MEASURES * CHUNK_SIZE);
for (size_t i = 0; i < N_MEASURES; i++) {
classes[i] = randombit();
if (classes[i] == 0)
memset(input_data + (size_t) i * CHUNK_SIZE, 0, CHUNK_SIZE);
}
for (size_t i = 0; i < N_MEASURES; ++i) {
/* Generate random string */
randombytes((uint8_t *) random_string[i], 7);
random_string[i][7] = 0;
}
}
這邊首先將 input_data
整塊初始化為隨機生成的資料,再來根據論文所敘述每一輪測試應該要隨機挑選 fixed value 或是 random value,因此再次獲得一個隨機的 bit 來決定該輪的測試資料要使用哪一種類型,如果此值為 0 就將 input_data 對應的位移量(offset)設成 0。
input_data
也產生 N_MEASURES
組的隨機字串供後續使用。
接著是 measure
的部份,裡面有一個 switch
包含 default 共對應了 5 個 case,但考慮前面有 assert
,且每個 case 都含有 break
敘述,因此 default
應該是不會執行到,另外 inset
和 remove
各有兩組這邊各留下一組作為說明。
bool measure(int64_t *before_ticks,
int64_t *after_ticks,
uint8_t *input_data,
int mode)
{
assert(mode == DUT(insert_head) || mode == DUT(insert_tail) ||
mode == DUT(remove_head) || mode == DUT(remove_tail));
switch (mode) {
case DUT(insert_head):
for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
char *s = get_random_string();
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000);
int before_size = q_size(l);
before_ticks[i] = cpucycles();
dut_insert_head(s, 1);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
dut_free();
if (before_size != after_size - 1)
return false;
}
break;
case DUT(remove_tail):
for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
dut_new();
dut_insert_head(
get_random_string(),
*(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000 + 1);
int before_size = q_size(l);
before_ticks[i] = cpucycles();
element_t *e = q_remove_tail(l, NULL, 0);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
if (e)
q_release_element(e);
dut_free();
if (before_size != after_size + 1)
return false;
}
break;
}
此處有個問題是為何迴圈是從 DROP_SIZE
開始算起,不應該是跑完後才決定有多少離群值應該要刪除嗎,查看 dudect
論文的原版沒有看到這樣的行為,先紀錄下此狀況。
static void measure(dudect_ctx_t *ctx) {
for (size_t i = 0; i < ctx->config->number_measurements; i++) {
ctx->ticks[i] = cpucycles();
do_one_computation(ctx->input_data + i * ctx->config->chunk_size);
}
for (size_t i = 0; i < ctx->config->number_measurements-1; i++) {
ctx->exec_times[i] = ctx->ticks[i+1] - ctx->ticks[i];
}
}
從 insert_head
和 remove_tail
的共同處檢視
dut_new
: 用於呼叫 q_new()
產生一個新的佇列dut_insert_head
: 此處為巨集定義,展開後如下
do {
int j = *(uint16_t *) (input_data + i * 2) % 10000;
while (j--)
q_insert_head(l, get_random_string());
} while (0)
input_data
中隨機取了一個大小 2 位元組的無號整數,用於將佇列的長度調整為 j
,並且此處輸入的皆是隨機產生的字串dut_free
: 用於呼叫 q_free()
釋放測試用的佇列測試程式主要的部份如下,開始測試前紀錄佇列目前的大小以及當前的 cpu cycles,然後呼叫待測函式,在函式結束後再次讀取 cpu cycles,之後可以用兩值差異作為此次執行時間之依據。
int before_size = q_size(l);
before_ticks[i] = cpucycles();
dut_insert_tail(s, 1);
after_ticks[i] = cpucycles();
int after_size = q_size(l);
dut_free();
if (before_size != after_size - 1)
return false;
若待測函式執行後,佇列大小不如預期則直接 return
,在 remove_tail
則多了一個將取得的 element_t
給釋放的動作
在此發現第二個問題,隨機測資的機制似乎不完善,在 insert
中永遠都使用隨機字串,而在 remove
的案例中的隨機變數應該設為是否將字串從佇列中複製出來。
接下來的 differentiate(exec_times, before_ticks, after_ticks);
函式計算每一筆 cpu cycles 前後的差異,並且將結果存在 exec_times
中。
再來的 update_statistics(exec_times, classes);
則是將每一筆量測到的執行時間拿來計算 t test 所需要的統計量,樣本平均數以及樣本變異數(函式 t_push
負責計算)。注意此處缺少了去除離群值的實作,將帶有誤差的資料拿來計算可能是我們無法通過評分的原因之一。
最後的 report
函式負責判斷此次測試的結果是否為常數時間
tatic bool report(void)
{
double max_t = fabs(t_compute(t));
double number_traces_max_t = t->n[0] + t->n[1];
double max_tau = max_t / sqrt(number_traces_max_t);
printf("\033[A\033[2K");
printf("meas: %7.2lf M, ", (number_traces_max_t / 1e6));
if (number_traces_max_t < ENOUGH_MEASURE) {
printf("not enough measurements (%.0f still to go).\n",
ENOUGH_MEASURE - number_traces_max_t);
return false;
}
printf("max t: %+7.2f, max tau: %.2e, (5/tau)^2: %.2e.\n", max_t, max_tau,
(double) (5 * 5) / (double) (max_tau * max_tau));
/* Definitely not constant time */
if (max_t > t_threshold_bananas)
return false;
/* Probably not constant time. */
if (max_t > t_threshold_moderate)
return false;
/* For the moment, maybe constant time. */
return true;
}
注意此處在尚未獲得足夠多的測試資料時,會直接離開函式,應該先行判斷測資數量是否充足,再進行後續的操作。
統整一下目前的執行流程,從 q_test.c
中的 queue_XXX
作為函式呼叫的起點
queue_XXX(position_t pos, int argc, char *argv[])
=> is_XXX_const
=> test_const("XXX", DUT_XXX)
=> doit(DUT_XXX)
當 doit(DUT_XXX)
執行完成後便以獲得統計檢驗的結果,此結果一路回傳至 q_test.c
中的函式,並且根據結果印出是否通過測試。
measure
函式取得測資的計數有誤commit f9837fc
在 dudect 的原實作中,可以看到他將取得的執行時間資料的前幾筆給丟棄,不參與統計量的計算,但在 lab0-c 中卻是在量測執行時間時的迴圈用 DROP_SIZE
扣掉要量測的次數
bool measure(int64_t *before_ticks,
case DUT(insert_tail):
- for (size_t i = DROP_SIZE; i < N_MEASURES - DROP_SIZE; i++) {
+ for (size_t i = 0; i < N_MEASURES; i++) {
char *s = get_random_string();
dut_new();
dut_insert_head(
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
+ for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {
在此我們固定在 measure
內量測 N_MEASURES
次,並且在計算統計量時才去除前 DROP_SIZE
筆資料。
update_statistics
沒有去除離群值commit f9837fc
在 dudect 的論文方法中,有提到剪裁(cropping)的資料後處理手法,dudect 的實作中將每一筆執行時間的資料分為 102 種不同的 case 處理,已確保在不同資料處理的狀況下,量測函式都要能通過 t-test
#define DUDECT_NUMBER_PERCENTILES 100
static void update_statistics(dudect_ctx_t *ctx) {
for (size_t i = 10 /* discard the first few measurements */; i < (ctx->config->number_measurements-1); i++) {
int64_t difference = ctx->exec_times[i];
...
t_push(ctx->ttest_ctxs[0], difference, ctx->classes[i]);
for (size_t crop_index = 0; crop_index < DUDECT_NUMBER_PERCENTILES; crop_index++) {
if (difference < ctx->percentiles[crop_index]) {
t_push(ctx->ttest_ctxs[crop_index + 1], difference, ctx->classes[i]);
}
if (ctx->ttest_ctxs[0]->n[0] > 10000) {
double centered = (double)difference - ctx->ttest_ctxs[0]->mean[ctx->classes[i]];
t_push(ctx->ttest_ctxs[1 + DUDECT_NUMBER_PERCENTILES], centered * centered, ctx->classes[i]);
}
}
}
在此我們引入同樣的手法,計算出百分位數作為去除離群值的依據
typedef struct {
double mean[2];
double m2[2];
double n[2];
+ int64_t *percentiles;
+ bool first;
} t_context_t;
在結構體中新增兩個成員,用於判斷是否要計算百分位數以及儲存計算的結果
@@ -45,5 +45,6 @@ void t_init(t_context_t *ctx)
ctx->m2[class] = 0.0;
ctx->n[class] = 0.0;
}
+ ctx->first = true;
在 init_once
呼叫的 t_init
中,固定把 first
設成 true
static bool doit(int mode) {
...
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
+ if (t->first) {
+ t->percentiles = calloc(DUDECT_NUMBER_PERCENTILES, sizeof(int64_t));
+ prepare_percentiles(t, exec_times);
+ } else
+ update_statistics(exec_times, classes);
ret &= report();
...
}
並且將第一組獲得的執行時間用於計算百分位數,不參與統計量的計算,百分位數的計算參考原 dudect 的實作,將完整的一組資料做排序,並且計算出各個 index 的閥值,除存在結構體的成員中。
static int64_t percentile(int64_t *a, double which, size_t size)
{
size_t array_position = (size_t) ((double) size * (double) which);
return a[array_position];
}
t->percentiles[i] = percentile(
exec_times,
1 - (pow(0.5, 10 * (double) (i + 1) / DUDECT_NUMBER_PERCENTILES)),
N_MEASURES);
接這是統計量計算函式的修改
+#define PERCENTILE_UPPER_BOUND 95
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
for (size_t i = DROP_SIZE; i < N_MEASURES; i++) {
int64_t difference = exec_times[i];
/* CPU cycle counter overflowed or dropped measurement */
if (difference <= 0)
continue;
+ if (difference < t->percentiles[PERCENTILE_UPPER_BOUND]) {
+ t_push(t, difference, classes[i]);
}
}
}
我們將先前計算好的百分位數拿來與執行時間 difference
作比較,僅保留小於閥值的資料計算統計量。
並且考慮去除部份數值後,資料量可能不足以滿足 dudect 的評測,因此原本固定的迴圈次數修改為不斷執行,直到取得足夠測試資料為止。
static bool test_const(char *text, int mode)
{
bool result = false;
...
- for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
- ++i)
+ while (t->n[0] + t->n[1] < ENOUGH_MEASURE)
result = doit(mode);
...
return result;
}
report
函式的優化commit 676b460
在 lab0-c 的實作中 report
函式每次都會計算 t value,但是在測試資料不足時卻又不採用此次結果。
static bool report(void) {
double number_traces_max_t = t->n[0] + t->n[1];
printf("\033[A\033[2K");
if (number_traces_max_t < ENOUGH_MEASURE) {
printf("not enough measurements (%.0f still to go).\n",
ENOUGH_MEASURE - number_traces_max_t);
return false;
}
double max_t = fabs(t_compute(t));
...
}
因此在這邊將判斷測試資料的程式碼移動至函式最前面,已確保不會產生不必要的計算。
measure
中沒有對完整的測資做隨機處理commit 8f57667
回想一下 queue.c
中 insert 和 remove 的函式參數
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
bool q_insert_head(struct list_head *head, char *s)
有一個共通點為佇列,另外 insert 函式中有一個字串參數 s
,而 remove 函式中則是有 sp
以及 bufsize
,對照 dudect 論文所述,必須能產生隨機類型的輸入。對應上這些參數,我們可以透過隨機變數初始化下列參數:
head
: 輸入佇列的長度s
: 輸入字串的長度bufsize
: 需要複製出的資料大小sp
: 是否為 NULL(需與 bufsize
搭配)在原本 lab0-c 的實作中,僅將取得的隨機數用於初始化佇列的大小,對於 remove 函式所需要的變數皆設為固定的數值element_t *e = q_remove_tail(l, NULL, 0);
,這便是先前 remove 函式能通過測試的原因。
我將整個 measure
函式改寫成下面的形式
bool measure(int64_t *before_ticks,
int64_t *after_ticks,
uint8_t *input_data,
int mode)
{
for (size_t i = 0; i < N_MEASURES; ++i) {
char *s, tmp[32] = {0};
char *sp = NULL;
int buf_sz = 0;
uint16_t rn = *(uint16_t *) (input_data + i * CHUNK_SIZE) % 10000;
if (rn == 0) {
s = "FIXEDSTR";
} else {
s = get_random_string();
sp = tmp;
buf_sz = (rn % 8) + 1;
}
dut_new();
if (mode == DUT_remove_head || mode == DUT_remove_tail)
rn++;
dut_insert_head(get_random_string(), rn);
bool (*insert_func)(struct list_head *, char *) = NULL;
element_t *(*remove_func)(struct list_head *, char *, size_t) = NULL;
if (mode == DUT_insert_head || mode == DUT_insert_tail)
insert_func =
mode == DUT_insert_head ? q_insert_head : q_insert_tail;
else if (mode == DUT_remove_head || mode == DUT_remove_tail)
remove_func =
mode == DUT_remove_head ? q_remove_head : q_remove_tail;
int before_size = q_size(l);
if (insert_func) {
before_ticks[i] = cpucycles();
insert_func(l, s);
after_ticks[i] = cpucycles();
} else {
before_ticks[i] = cpucycles();
element_t *e = remove_func(l, sp, buf_sz);
after_ticks[i] = cpucycles();
if (e)
q_release_element(e);
}
int after_size = q_size(l);
dut_free();
if (insert_func && before_size != after_size - 1)
return false;
else if (remove_func && before_size != after_size + 1)
return false;
}
return true;
}
重要的改動有以下幾點:
mode
以及函式指標決定受測函式(25 ~ 30 行)input_data
隨機取出的 rn
判斷此次測試使用固定測資還是隨機測資(7 ~ 16 行)做了上述修改後,現在四個佇列操作函式皆無法通過測試,不過我認為這是預期內的狀況
這個 repo 是我從原版 dudect 複製修改的
我將測試資料大小設定為 8 bytes,並且以 memcmp
作為測試函式,此處預期至少會比較 1 個位元組的資料,至多 8 個,在執行時間上的差異上可以算是非常小。
int check_tag(uint8_t *x, uint8_t *y, size_t len) {
return memcmp(x, y, len);
}
uint8_t do_one_computation(uint8_t *data) {
/* simulate totally bogus MAC check in non-constant time */
return check_tag(data, secret, 8);
}
以下是測試結果,依然被 deduct 被判定為非常數執行時間,由此可得知 deduct 具有相當的敏感度。
$ ./dudect_simple_O0
meas: 0.10 M, max t: +2.00, max tau: 6.41e-03, (5/tau)^2: 6.09e+05. For the moment, maybe constant time.
meas: 0.19 M, max t: +3.56, max tau: 8.20e-03, (5/tau)^2: 3.72e+05. For the moment, maybe constant time.
meas: 0.30 M, max t: +3.85, max tau: 7.04e-03, (5/tau)^2: 5.04e+05. For the moment, maybe constant time.
meas: 0.40 M, max t: +3.95, max tau: 6.25e-03, (5/tau)^2: 6.40e+05. For the moment, maybe constant time.
...
...
meas: 5.63 M, max t: +9.87, max tau: 4.16e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas: 5.73 M, max t: +9.93, max tau: 4.15e-03, (5/tau)^2: 1.45e+06. For the moment, maybe constant time.
meas: 5.83 M, max t: +9.80, max tau: 4.06e-03, (5/tau)^2: 1.52e+06. For the moment, maybe constant time.
meas: 5.93 M, max t: +9.95, max tau: 4.09e-03, (5/tau)^2: 1.50e+06. For the moment, maybe constant time.
meas: 6.03 M, max t: +9.91, max tau: 4.03e-03, (5/tau)^2: 1.54e+06. For the moment, maybe constant time.
meas: 6.13 M, max t: +10.10, max tau: 4.08e-03, (5/tau)^2: 1.50e+06. Probably not constant time.
考慮我 lab0-c 實作的佇列操作使用 memcpy
以及 strdup
等函式進行資料複製,這些函式執行時間本身會隨著資料量而改變,此外還有 insert 函式中的 malloc 涉及作業系統資源分配,因此即便 list_add
以及 list_del
屬於常數時間操作,但整體上來說佇列的 insert 以及 remove 並非常數時間,導致未能通過測試。
首先我們觀察 console 的命令是如何被建立的,在 console.h
中可以看到一個巨集
#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)
這個巨集在函式 console_init
中被頻繁使用,它用到了前面說過得 ##
運算子,以及 #
運算子。
#
運算子的作用是將要代換的文字內容給字串化,在 C 語言來說也就是加上 ""
進行修飾。
以下是一個使用的範例
ADD_COMMAND(dedup, "Delete all nodes that have duplicate string", "");
add_cmd("dedup", do_dedup, "Delete all nodes that have duplicate string", "")
第一個參數 dedup
會被字串化成 "dedup"
後作為 add_cmd
的第一個參數,另外 dedup
也會被拼接成 du_dedup
,由此可知我們要加入的函式必須取名為 do_xxx
的形式,並且在 qtest.c
中實作這個函式,在由這個 do_xxx
的函式去完成我們想要達成的內容。
接著開始新增命令,同樣地在 console_init
中加入一個 ADD_COMMAND
ADD_COMMAND(shuffle, "Shuffle nodes in queue randomly", "");
並且在 q_test.c
中提供介面
static bool do_shuffle(int argc, char *argv[])
{
q_shuffle(current->q);
return true;
}
q_shuffle
的本體實作如下
commit ecf7321
+ struct list_head **nodes =
+ (struct list_head **) malloc(sizeof(struct list_head *) * sz);
+ struct list_head *cur = head->next;
+ for (int i = 0; i < sz; i++) {
+ nodes[i] = cur;
+ cur = cur->next;
+ }
+ for (int i = sz - 1; i > 0; i--) {
+ randombytes(buf, sizeof(buf));
+ int j = *(uint32_t *) buf % (i + 1);
+ if (i != j) {
+ struct list_head *tmp = nodes[i];
+ nodes[i] = nodes[j];
+ nodes[j] = tmp;
+ }
+ }
+ INIT_LIST_HEAD(head);
+ for (int i = 0; i < sz; i++) {
+ list_add_tail(nodes[i], head);
由於串列上的資料為字串,要直接在串列上採用Yates 演算法需要頻繁地複製字串資料,因此藉由一個動態分配的陣列以快速取得每一個指向串列節點的指標,也可以透過陣列隨機訪問的特性快速向陣列後方交換元素。
當 Yates 演算法在儲存串列節點指標的陣列完成洗牌後,我們可以在根據這個順序恢復串列的結構。
首先是新增一個 option 可以設定 PRNG 的值,以決定使用內建(/dev/urandom
)作為亂數來源或者是使用新增的 XOR PRNG 來產生隨機數。
@@ -75,6 +75,8 @@ static int string_length = MAXSTRING;
+static int prng = 0;
@@ -173,7 +175,14 @@ static void fill_rand_string(char *buf, size_t buf_size)
- randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+ switch (prng) {
+ case 1:
+ // TODO: implement a prng use xor shift
+ break;
+ case 0:
+ default:
+ randombytes((uint8_t *) randstr_buf_64, len * sizeof(uint64_t));
+ }
@@ -1111,6 +1120,9 @@ static void console_init()
+ add_param("prng", &prng,
+ "Select the random number generator [0:/dev/urandom, 1:xor]",
+ NULL);
首先新增一個變數,用來區分當前的亂數模式,並且在產生隨機字串的函式加入判斷,根據當前模式產生出亂數。
模仿其他 option 的實作,可透過 add_param
將變數 prng
加入 param_list
這個串列上。
/* Integer-valued parameters */
typedef struct __param_element {
char *name;
int *valp;
char *summary;
/* Function that gets called whenever parameter changes */
setter_func_t setter;
struct __param_element *next;
} param_element_t;
param_list
串列除了紀錄 option 的數值以外,也紀錄 option 的名稱以及提示訊息,可以在 qtest
中輸入 help 查看 option 的狀態,以及使用方式。
commit 220eb9a
在實作 XOR shift 前,我先行閱讀了Xorshift RNGs這篇論文,得知 XOR shift 有一個限制為存在週期,週期的長短跟 bit 數有關,最長為
static void xorshift(u_int64_t *val)
{
*val ^= *val << 13;
*val ^= *val >> 7;
*val ^= *val << 17;
}
首先查看 xorshift
函式的運算,每次進行三組 XOR + shift 的操作。
這個運算的原理來自於線性代數,可以將這個過程視為一個線性轉換
考慮一個 GF(2) 上的 4 階單位矩陣與一個
當
單位矩陣
這時候就完成了一個狀態的轉換。
關於為什麼一次 xor shift 涵蓋了三次狀態轉換的原因,擷取自論文原文
Unfortunately, when n is 32 or 64, no choices for a and b will
provide such a T with the required order. (A preliminary test on candidates is to square T n times. If the result is not
T , then T cannot have order. By representing the rows of a binary matrix as a computer word, then xoring all
of the rows for which a (left) multiplying binary vector has 1’s, very fast binary matrix products can be evaluated with
simple C or Fortran programs, and it only takes a few minutes to check every possible a, b in
However, there are many choices for a, b, c for which the matriceshave or-
der 2n − 1, when n = 32 or n = 64. There are 81 triples (a, b, c), a < c, for which the 32 × 32 binary matrix
has order
為了避免週期過早的重複,在 32 或者 64 bits 的整數上才採取三個一組的運算。
接著是透過 xor shift 生成隨機數的函式本體
void xor_rng(uint8_t *buf, size_t n)
{
static struct {
u_int64_t x;
int cnt;
} state = {.x = 0, .cnt = 7}; // each state x can generate 8 bytes data
if (__glibc_unlikely(state.x == 0)) {
int ret = getrandom(&state.x, sizeof(state.x), 0);
assert(ret);
}
size_t offset = 0;
while (n--) {
if (state.cnt == 0) {
xorshift(&state.x);
state.cnt = 7;
}
buf[offset++] = (state.x >> (state.cnt << 3)) & 0xFF;
state.cnt -= 1;
}
}
使用 64 bit 時,一次可以產生 8 個 8 位元組的隨機數,為了不要浪費產生的結果而快速抵達新的週期,因此使用一個結構體包裝狀態以及記數 cnt
,當記數值歸零的時候,就更新當前的狀態。
另外由於每次要取 8 byte,透過將 state.x
向右位移 8 的倍數次,便可以取得每一個 byte 的狀態。其中 state.cnt << 3
相當於乘 8,也就是說透過 state.cnt
的值決定要取出 state.x
的哪個部份。
另外也將這個新的隨機數生成器整合至洗牌函式中
- randombytes(buf, sizeof(buf));
+ if (prng == 0)
+ randombytes(buf, sizeof(buf));
+ else
+ xor_rng(buf, sizeof(buf));
在這個地方,我們使用卡方檢定以及熵來比較 linux kernel 中的 PRNG 以及我們實作的 xor shift 是否能達到足夠的亂度(各結果出現的頻率接近且無法預測其順序)。
測試程式取自於作業說明中的 python 程式,測試步驟如下
qtest
並建立一個新佇列1
, 2
, 3
, 4
shuffle
命令共 1000000 次PRNG: /dev/urandom
Expectation: 41666
Observation: {'1234': 42174, '1243': 41662, '1324': 41774, '1342': 41450, '1423': 41631, '1432': 41652, '2134': 41798, '2143': 41769, '2314': 41722, '2341': 41576, '2413': 41710, '2431': 41407, '3124': 41806, '3142': 41419, '3214': 41607, '3241': 41642, '3412': 41522, '3421': 41709, '4123': 41638, '4132': 41630, '4213': 41813, '4231': 41652, '4312': 41482, '4321': 41755}
chi square sum: 14.37671002736044
PRNG: xor shift
Expectation: 41666
Observation: {'1234': 41565, '1243': 41704, '1324': 41749, '1342': 41707, '1423': 42060, '1432': 41478, '2134': 41617, '2143': 41700, '2314': 41527, '2341': 41729, '2413': 41397, '2431': 42126, '3124': 41919, '3142': 41602, '3214': 41551, '3241': 41729, '3412': 41990, '3421': 41594, '4123': 41786, '4132': 41779, '4213': 41434, '4231': 41242, '4312': 41769, '4321': 41246}
chi square sum: 27.956607305716886
在繼續進行卡方檢定前,先看兩組數據畫出的直方圖,可以看到資料的分佈大致上是平均的
我們算出了兩組卡方數為 14.36 以及 27.95,以常見的
此處需要進行查表找到 p value 為多少,p value 代表此次觀察到的結果所發生的紀律,並且與
我們的測試資料可能的結果共有
DF\P | 0.995 | 0.975 | 0.20 | 0.10 | 0.05 | 0.025 | 0.02 | 0.01 | 0.005 | 0.002 | 0.001 |
---|---|---|---|---|---|---|---|---|---|---|---|
21 | 8.034 | 10.283 | 26.171 | 29.615 | 32.671 | 35.479 | 36.343 | 38.932 | 41.401 | 44.522 | 46.797 |
22 | 8.643 | 10.982 | 27.301 | 30.813 | 33.924 | 36.781 | 37.659 | 40.289 | 42.796 | 45.962 | 48.268 |
23 | 9.260 | 11.689 | 28.429 | 32.007 | 35.172 | 38.076 | 38.968 | 41.638 | 44.181 | 47.391 | 49.728 |
第一組內建隨機數的卡方數為 14.36,對應的 p 值在 0.975 ~ 0.2 之間,第二組採取 xor shift 的卡方數為 27.95,對應的 p 值同樣在 0.975 ~ 0.2 之間。
因此當顯著水準為 0.05 時,不足以否定此組資料屬於均勻分佈。
另外將同一組資料拿來計算熵,兩者的數值相當接近
Entropy for /dev/urandom: 4.5850 bits
Entropy for xor shift: 4.5849 bits
距離理論值相當接近,因此可以相信此組資料具備一定的亂度。
不過根據卡方檢定和熵的計算,不難發現兩者皆在乎最後的頻率,並不在意產生的順序,根據 chatgpt 的建議,這邊引入一樣測試自我相關函數,來驗證隨機生成的序列是否存在特定的模式,也就是
自我相關函數的計算流程如下
進一步將結果繪製成圖後可以觀察到所