contributed by <leowu0411
>
$gcc --version
gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
$lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Address sizes: 39 bits physical, 48 bits virtual
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Vendor ID: GenuineIntel
Model name: Intel(R) Core(TM) i5-1038NG7 CPU @ 2.00GHz
CPU family: 6
Model: 126
Thread(s) per core: 1
Core(s) per socket: 4
Socket(s): 1
Stepping: 5
BogoMIPS: 3993.33
Caches (sum of all):
L1d: 192 KiB (4 instances)
L1i: 128 KiB (4 instances)
L2: 2 MiB (4 instances)
L3: 6 MiB (1 instance)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0-3
佇列採用環狀鏈結串列實作,其結構由兩種資料型態——struct list_head
與 element_t
——組成:
struct list_head
包含兩個指標,分別指向前一個與後一個節點,用於維護環狀鏈結串列。element_t
是佇列的節點資料型態,內含 struct list_head
以及節點的值(一個字串)。透過節點內的 struct list_head
來走訪佇列,並使用巨集 list_entry
取得 element_t
的實際記憶體位置,以存取節點資料。
q_new()
函式 q_new()
負責為佇列的起始位置配置記憶體,並回傳其指標。若記憶體配置失敗,則直接回傳 NULL。
該起始位置稱為 head
,為一代表性節點,僅包含 struct list_head
,不存放任何數據,純粹用於標示佇列的開頭。
struct list_head *q_new()
{
struct list_head *ret = malloc(sizeof(struct list_head));
if (ret)
INIT_LIST_HEAD(ret);
return ret;
}
函式 q_free()
用於釋放佇列的所有節點及其起始位置 head
。
head
為 NULL
,則直接返回。list_for_each_entry_safe
遍歷佇列中的所有節點,並使用 q_release_element(entry)
釋放節點資源head
所佔用的記憶體。void q_free(struct list_head *head)
{
if (!head)
return;
element_t *entry = NULL, *safe;
list_for_each_entry_safe (entry, safe, head, list) {
q_release_element(entry);
}
free(head);
}
此處使用巨集 list_for_each_entry_safe
,其作用在於透過額外的指標保存被刪除節點的下一個節點位置,確保在遍歷過程中,即使刪除當前節點,也不會遺失鏈結串列的結構。
在實作時,對於巨集 list_for_each_entry_safe
與 list_for_each_entry
的終止條件感到困惑,即以下程式碼的第 4 行:
#if __LIST_HAVE_TYPEOF
#define list_for_each_entry(entry, head, member) \
for (entry = list_entry((head)->next, typeof(*entry), member); \
&entry->member != (head); \
entry = list_entry(entry->member.next, typeof(*entry), member))
#endif
困惑點在於,當走訪至最後一個節點(即 head
)時,如上所述 head
沒有被包含於 container 內,但程式碼仍舊透過 list_entry()
計算記憶體位置,甚至利用該地址存取 member 的值。
分析後發現,list_entry()
的回傳位置是 member 的記憶體地址減去 container_type 結構中 member 的 offset,即:
entry = (container_type *)((char *)head - offsetof(container_type, member))
而在終止條件的判斷過程中,entry->member
會將這段 offset 補回來:
&entry->member = (char *)entry + offsetof(container_type, member)
= (char *)((char *)head - offsetof(container_type, member)) + offsetof(container_type, member)
= head
確保了當 entry->member
指向 head
時,迴圈終止。
這兩個函式負責為佇列的新節點分配記憶體空間,並確保節點中的值有足夠的空間存放字串,以確保其擁有合法的記憶體位置。
注意下列程式碼第 14 行,原本程式使用 strcpy
來複製字串,但經靜態分析後發現,若輸入字串的長度超過目標緩衝區的大小,將導致記憶體溢位,進而覆蓋後續記憶體區域,造成不可預期的非法行為。
因此,改以 strncpy
來提高程式碼的穩定性與安全性。
bool q_insert_head(struct list_head *head, char *s)
{
if (!head)
return false;
element_t *node = malloc(sizeof(element_t));
if (!node)
return false;
int str_len = strlen(s) + 1;
node->value = malloc(str_len * sizeof(char));
if (!node->value) {
free(node);
return false;
}
strncpy(node->value, s, str_len);
list_add(&node->list, head);
return true;
}
此段僅擷取 q_insert_head
, 而 q_insert_tail
與以上列出之程式碼差異僅第 15 行使用 list_add_tail
將新節點加入佇列尾端。
這兩個函式負責從佇列的開頭或尾端移除一個元素,但不釋放記憶體,僅解除與佇列的鏈結。
sp
為空代表僅需將節點移出佇列,因此提早結束函式strncpy
以及 bufsize
將字串存入 sp
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
if (!head || list_empty(head))
return NULL;
element_t *node;
node = list_entry(head->next, typeof(*node), list);
if (!(node->value) || !sp)
return node;
strncpy(sp, node->value, bufsize - 1);
sp[bufsize - 1] = '\0';
list_del_init(&node->list);
return node;
}
與插入佇列的狀況相同,此二函式的行為極為相似,故此只擷取 q_remove_head()
。
q_remove_tail()
唯一的差異僅在於當初始化 node 的值時,傳入 list_entry
的第一個參數為 head->prev
。
此函式單純透過遍歷 head 以外的所有節點,計算並回傳節點總數。
int q_size(struct list_head *head)
{
if (!head || !head->next)
return 0;
int num = 0;
struct list_head *ptr;
for (ptr = head->next; ptr != head; ptr = ptr->next) {
num++;
}
return num;
}
函式的核心目標是定位並移除佇列的中間節點,同時釋放其記憶體空間。此處採用快慢指標進行搜尋,其中:
slow
指標:用於定位中間節點。fast
指標:每次前進兩步。以下為快慢指標的實作(其中 slow 最終指向中間節點),其餘程式碼則負責將該節點自佇列移除並釋放記憶體:
struct list_head *slow = head->next, *fast = slow->next;
element_t *dlt;
for (; fast->next != head && fast->next->next != head;
fast = fast->next->next) {
slow = slow->next;
}
slow = slow->next;
}
此函式在實作時假設輸入佇列已經經過排序,如佇列之重複字串沒有排放在連續節點,演算法則不成立。
透過內層迴圈,所有相鄰且重複的節點將被移除,確保佇列內僅保留唯一的字串值。
bool q_delete_dup(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return false;
struct list_head *ptr = head->next;
element_t *cur = list_entry(ptr, element_t, list),
*nxt = list_entry(ptr->next, element_t, list);
while (&cur->list != head && &nxt->list != head) {
if (strcmp(cur->value, nxt->value) == 0) {
while (&nxt->list != head && strcmp(cur->value, nxt->value) == 0) {
list_del(ptr);
ptr = &nxt->list;
q_release_element(cur);
cur = list_entry(ptr, element_t, list);
nxt = list_entry(ptr->next, element_t, list);
}
list_del(ptr);
q_release_element(cur);
}
ptr = &nxt->list;
cur = list_entry(ptr, element_t, list);
nxt = list_entry(ptr->next, element_t, list);
}
return true;
}
透過快慢指標 (slow 和 fast) 依序交換相鄰節點的位置。
唯一需注意的是指標賦值的順序,以確保交換過程不影響鏈結的完整性,避免指標錯誤。
void q_swap(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
struct list_head *slow = head->next, *fast = slow->next;
while (slow != head && fast != head) {
slow->prev->next = fast;
fast->next->prev = slow;
slow->next = fast->next;
fast->next = slow;
fast->prev = slow->prev;
slow->prev = fast;
slow = slow->next;
fast = slow->next;
}
return;
}
在實作完後發現其實可以使用 reverseK 函式,並將 k 參數設為 2,以避免多餘程式碼。
由於此佇列為雙向鏈結串列,因此僅需將每個節點的 prev 指標與 next 指標互換即完成佇列反轉。
void q_reverse(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
struct list_head *cur = head;
do {
struct list_head *tmp;
tmp = cur->next;
cur->next = cur->prev;
cur->prev = tmp;
cur = tmp;
} while (cur != head);
return;
}
此函式實作時的想法是:
q_size(head) / k
計算完整的 K 個節點組數。prev_tail
記錄前一組的結尾,new_tail
記錄當前組的開頭(最終變為尾部)。需注意指標賦值順序,確保反轉過程中不會造成鏈結損壞。
void q_reverseK(struct list_head *head, int k)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
int round = q_size(head) / k;
struct list_head *cur = head->next, *prev_tail, *new_tail;
for (int i = 0; i < round; i++) {
prev_tail = cur->prev;
new_tail = cur;
cur->prev = cur->next;
cur = cur->next;
for (int j = 1; j < (k - 1); j++) {
struct list_head *tmp = cur->next;
cur->next = cur->prev;
cur->prev = tmp;
cur = tmp;
}
new_tail->next = cur->next;
cur->next->prev = new_tail;
cur->next = cur->prev;
prev_tail->next = cur;
cur->prev = prev_tail;
cur = new_tail->next;
}
}
實作 merge sort,利用快慢指標定位中間節點,並遞迴呼叫此函式以完成排序。
void q_sort(struct list_head *head, bool descend)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
struct list_head *slow = head->next, *fast = slow->next;
for (; fast->next != head && fast->next->next != head;
fast = fast->next->next) {
slow = slow->next;
}
struct list_head left;
list_cut_position(&left, head, slow);
q_sort(&left, descend);
q_sort(head, descend);
merge_two_list(head, &left, descend);
}
原本實作的排序演算法為快速排序法(如下),但在 make test
後發現必須確保排序演算法保持
void q_sort(struct list_head *head, bool descend)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
struct list_head less, greater;
element_t *node = NULL, *safe = NULL;
element_t *pivot = list_first_entry(head, element_t, list);
list_del(&pivot->list);
INIT_LIST_HEAD(&less);
INIT_LIST_HEAD(&greater);
// cppcheck-suppress unusedLabel
list_for_each_entry_safe (node, safe, head, list) {
if (strcmp(node->value, pivot->value) < 0) {
list_del(&node->list);
list_add_tail(&node->list, &less);
} else {
list_del(&node->list);
list_add_tail(&node->list, &greater);
}
}
q_sort(&less, descend);
q_sort(&greater, descend);
list_add(&pivot->list, head);
if (descend) {
list_splice(&greater, head);
list_splice_tail(&less, head);
} else {
list_splice(&less, head);
list_splice_tail(&greater, head);
}
}
此函式負責將兩個排序後的佇列合併成同樣排序的佇列且最後的佇列會存放於 head1 所指向的佇列。
q1
、p2
)分別代表 head1
以及 head2
所指向之佇列。descend
判斷插入順序。head2
需將其移出原佇列並插入 q1
所指之節點之前,如需插入之節點屬於 head1
則僅需賦值 q1
為 q1->next
即代表插入。head2
所指向之佇列仍有剩餘節點,需將 head2
剩餘部分接到 head1
鏈結串列之尾部。static inline void merge_two_list(struct list_head *head1,
struct list_head *head2,
bool descend)
{
if (!head1 || !head2 || list_empty(head2))
return;
if (list_empty(head1)) {
list_splice_init(head2, head1);
return;
}
struct list_head *q1 = head1->next, *q2 = head2->next;
for (; q1 != head1 && q2 != head2;) {
element_t *e1 = list_entry(q1, element_t, list);
element_t *e2 = list_entry(q2, element_t, list);
if ((!descend && strcmp(e2->value, e1->value) <= 0) ||
(descend && strcmp(e2->value, e1->value) >= 0)) {
struct list_head *next_q2 = q2->next;
list_del(q2);
list_add(q2, q1->prev);
q2 = next_q2;
} else
q1 = q1->next;
}
if (!list_empty(head2))
list_splice_tail_init(head2, head1);
}
可以理解為,自佇列尾到頭必須遵守非嚴格遞增,故實作將 head->prev 作為起始節點,並朝 head 方向走訪佇列。
int q_descend(struct list_head *head)
{
if (!head || list_empty(head))
return 0;
struct list_head *max = head->prev;
int count = 1;
while (max != head) {
if (max->prev == head)
break;
element_t *cur = list_entry(max, element_t, list);
element_t *pre = list_entry(max->prev, element_t, list);
if (strcmp(cur->value, pre->value) > 0) {
list_del(&pre->list);
q_release_element(pre);
} else {
max = max->prev;
count++;
}
}
return count;
}
此函式利用分層合併(tree-like merging)的方法,將多個已排序的佇列進行合併,最終生成單一佇列,並維持排序順序。
queue_contex_t
(qc1
和 qc2
),使用 merge_two_list()
進行合併,並將 qc2
移除。qc2
會被放入 finish
佇列,等待後續合併head
只剩下一個佇列。finish
內的剩餘元素重新拼接回 head
list_splice_tail_init(&finish, head)
將 finish
內的元素接回 headhead
只剩一個 queue_contex_t
帶有完整的排序佇列,其餘則只帶有空佇列並等待後續釋放記憶體
int q_merge(struct list_head *head, bool descend)
{
if (!head || list_empty(head) || list_is_singular(head))
return q_size(list_entry(head->next, queue_contex_t, chain)->q);
struct list_head finish;
INIT_LIST_HEAD(&finish);
while (!list_is_singular(head)) {
struct list_head *cur = head->next;
while (cur != head && cur->next != head) {
queue_contex_t *qc1 = list_entry(cur, queue_contex_t, chain);
queue_contex_t *qc2 = list_entry(cur->next, queue_contex_t, chain);
merge_two_list(qc1->q, qc2->q, descend);
list_del(&qc2->chain);
list_add(&qc2->chain, &finish);
cur = cur->next;
}
}
list_splice_tail_init(&finish, head);
return q_size(list_entry(head->next, queue_contex_t, chain)->q);
}
最初實作 q_merge()
時發生記憶體洩漏,原因在於直接將已合併的 queue_contex_t
節點從 chain
移除,而未使用 finish
進行保存與重新連結。這導致 q_merge()
結束時,被移出的節點未正確釋放記憶體,最終造成洩漏。
在資安領域,確保函式執行時間保持恆定時間至關重要。如果一個函式在處理不同類型的輸入(如負數、浮點數、自然數等)時,其執行時間無法保持一致,則惡意攻擊者便能透過統計函式的執行時間來推測特定輸入的行為,進而非法取得敏感資訊,這類攻擊即稱為時間攻擊(Timing Attacks)。
時間攻擊屬於側通道攻擊的一種,透過測量密碼學運算的執行時間來推測機密資訊(如密鑰或密碼)。與其他側通道攻擊相比,時間攻擊具有以下優勢:
要評估一個函式是否真正符合恆定時間並不容易,即使原本預期應該為恆定時間的實作,有時仍可能存在時間洩漏。
目前大多數現有的變動時間程式碼偵測工具,主要依賴建模硬體行為來計算函式的實際執行時間。然而,這種方法存在幾個問題:
在論文 〈Dude, is my code constant time?〉 中,研究者提出了一種基於統計分析的方法來評估函式是否符合恆定時間,而非依賴硬體建模。這種方式帶來了更簡單且跨平台的檢驗方法
在統計學中,母體的標準差通常無法直接測得,這使得一般的常態分布在某些應用場景下存在侷限性。為了解決這個問題,t 分布(t-distribution) 被提出,其特點是使用樣本標準差來替代母體標準差,特別適用於小樣本分析。
t 分布的形狀由自由度(degrees of freedom) 決定:
此論文利用 Welch’s t-test 以檢測函式是否為恆定時間:Welch’s t-test 主要用於比較不同類別間的數據差異,適合變異數不相等的情況。
透過上述步驟,即可利用統計分析的方法判斷實作是否為恆定時間。
測試分別對四個針對佇列進行操作的函式進行 leakage detection test 以驗證實作是否為恆定時間:
測驗分為兩類輸入:
重複測試兩者間執行時間的差異並依據 Welch’s t-test 計算最後的 t 值以及估算其自由度,以驗證實作是否為恆定時間(虛無假設成立)
N_MEASURES
代表含幾筆測資CHUNK_SIZE
決定佇列隨機長度範圍(實作為 2 代表 16-bits,0 to 65535)input
紀錄每一筆測資在呼叫函式前佇列需含多少成員,故長度為 N_MEASURES * CHUNK_SIZE
randombit()
隨機決定測資類別
input
內對應的記憶體位置的值清空為零randombytes
的賦值 randombytes(input_data, N_MEASURES * CHUNK_SIZE);
for (size_t i = 0; i < N_MEASURES; i++) {
classes[i] = randombit();
if (classes[i] == 0)
memset(input_data + (size_t) i * CHUNK_SIZE, 0, CHUNK_SIZE);
}
for (size_t i = 0; i < N_MEASURES; ++i) {
/* Generate random string */
randombytes((uint8_t *) random_string[i], 7);
random_string[i][7] = 0;
}
before_ticks[i] = cpucycles();
dut_insert_head(s, 1);
after_ticks[i] = cpucycles();
static void differentiate(int64_t *exec_times,
const int64_t *before_ticks,
const int64_t *after_ticks)
{
for (size_t i = 0; i < N_MEASURES; i++)
exec_times[i] = after_ticks[i] - before_ticks[i];
}
void t_push(t_context_t *ctx, double x, uint8_t class)
{
assert(class == 0 || class == 1);
ctx->n[class]++;
double delta = x - ctx->mean[class];
ctx->mean[class] = ctx->mean[class] + delta / ctx->n[class];
ctx->m2[class] = ctx->m2[class] + delta * (x - ctx->mean[class]);
}
double t_compute(t_context_t *ctx)
{
double var[2] = {0.0, 0.0};
var[0] = ctx->m2[0] / (ctx->n[0] - 1);
var[1] = ctx->m2[1] / (ctx->n[1] - 1);
double num = (ctx->mean[0] - ctx->mean[1]);
double den = sqrt(var[0] / ctx->n[0] + var[1] / ctx->n[1]);
double t_value = num / den;
return t_value;
}
在原本的實作中,並未對個別測量值進行後處理,直接進行統計分析,但根據論文〈Dude, is my code constant time?〉II.B ,需藉由後處理以確保統計結果的可靠性,這是因為:時間分佈通常呈現右偏分佈,即執行時間偏向較大的數值,此現象可能因作業系統中斷、背景程序影響或外部噪聲導致,使統計分析失真。
目前的優化較為不嚴謹且直觀,即將執行時間最大的 5% 自測量值移除,以避免上述原因所導致的失真:
FINAL_SIZE
,代表每次測試只保留 95% 測量值#define ENOUGH_MEASURE 10000
#define TEST_TRIES 10
+#define FINAL_SIZE (int) ((N_MEASURES - DROP_SIZE * 2) * 0.95)
ENOUGH_MEASURE
)static bool test_const(char *text, int mode)
for (int cnt = 0; cnt < TEST_TRIES; ++cnt) {
printf("Testing %s...(%d/%d)\n\n", text, cnt, TEST_TRIES);
init_once();
- for (int i = 0; i < ENOUGH_MEASURE / (N_MEASURES - DROP_SIZE * 2) + 1;
- ++i)
+ for (int i = 0; i < ENOUGH_MEASURE / FINAL_SIZE + 1; ++i)
result = doit(mode);
t_push()
更新統計量前,首先對測驗值進行排序,並只允許 FINAL_SIZE
個測驗值更新統計量。+int compare(const void *a, const void *b)
+{
+ return (*(int64_t *) a - *(int64_t *) b);
+}
+
+static void crop_outliner(int64_t *exec_times)
+{
+ qsort(exec_times, N_MEASURES, sizeof(int64_t), compare);
+}
static bool doit(int mode)
bool ret = measure(before_ticks, after_ticks, input_data, mode);
differentiate(exec_times, before_ticks, after_ticks);
+ crop_outliner(exec_times);
update_statistics(exec_times, classes);
static void update_statistics(const int64_t *exec_times, uint8_t *classes)
{
- for (size_t i = 0; i < N_MEASURES; i++) {
+ for (size_t i = (2 * DROP_SIZE); i < FINAL_SIZE + (2 * DROP_SIZE); i++) {
int64_t difference = exec_times[i];
/* CPU cycle counter overflowed or dropped measurement */
if (difference <= 0)
在 dudect.h 中,同樣採用了非線性後處理(Crop)來移除離群值,但相比於我原先的改進方法,其處理方式更為嚴謹。在前述改進中,雖然移除 5% 的數據成功降低了 t-value,但這種下降可能來自於被移除的 5% 正是證明函式並非恆定時間的關鍵測試數據。由於這些數據被裁剪,最終統計結果無法反映其影響。
dudect.h 採用了更細緻的機制來處理離群值:
透過這種方式,能夠更有效地避免誤刪關鍵測試數據,同時減少離群值對統計結果的影響,使測試更具穩健性。
static int64_t percentile(int64_t *a_sorted, double which, size_t size) {
size_t array_position = (size_t)((double)size * (double)which);
assert(array_position < size);
return a_sorted[array_position];
}
/*
set different thresholds for cropping measurements.
the exponential tendency is meant to approximately match
the measurements distribution, but there's not more science
than that.
*/
static void prepare_percentiles(dudect_ctx_t *ctx) {
qsort(ctx->exec_times, ctx->config->number_measurements, sizeof(int64_t), (int (*)(const void *, const void *))cmp);
for (size_t i = 0; i < DUDECT_NUMBER_PERCENTILES; i++) {
ctx->percentiles[i] = percentile(
ctx->exec_times, 1 - (pow(0.5, 10 * (double)(i + 1) / DUDECT_NUMBER_PERCENTILES)),
ctx->config->number_measurements);
}
}
// t-test on the execution time
t_push(ctx->ttest_ctxs[0], difference, ctx->classes[i]);
// t-test on cropped execution times, for several cropping thresholds.
for (size_t crop_index = 0; crop_index < DUDECT_NUMBER_PERCENTILES; crop_index++) {
if (difference < ctx->percentiles[crop_index]) {
t_push(ctx->ttest_ctxs[crop_index + 1], difference, ctx->classes[i]);
}
}
static ttest_ctx_t *max_test(dudect_ctx_t *ctx) {
size_t ret = 0;
double max = 0;
for (size_t i = 0; i < DUDECT_TESTS; i++) {
if (ctx->ttest_ctxs[i]->n[0] > DUDECT_ENOUGH_MEASUREMENTS) {
double x = fabs(t_compute(ctx->ttest_ctxs[i]));
if (max < x) {
max = x;
ret = i;
}
}
}
return ctx->ttest_ctxs[ret];
}
目前疑問:
在原始程式碼中,有一組數據是未經裁切(Crop)後處理的。然而,在選擇最終的 max_t 時,為何結果並不總是來自這組未裁切的數據?按照裁切的方式來看,裁切的數據越少,t 值理應越大。
當變數 simulation 被設為 1 時,q_test() 將呼叫下列介面定義出的函式,測試函式實作是否為恆定時間
Simulation 模式下,測試函式的定義方法具有高度封裝性,對於 c 語言來說是第一次看到,使用者毋需考慮傳入函式以及內部實作,僅需呼叫介面定義函式即可。
#define _(x) bool is_##x##_const(void);
DUT_FUNCS
#undef _
bool is_##x##_const(void);
DUT_FUNCS
利用上述之宏函式,一次性完成多個函式宣告
#define DUT_FUNCS \
_(insert_head) \
_(insert_tail) \
_(remove_head) \
_(remove_tail)
bool is_insert_head_const(void);
_
避免後續誤用測試程式的內部以類似手法完成函式定義(1 ~ 5 行)
#define DUT_FUNC_IMPL(op) \
bool is_##op##_const(void) \
{ \
return test_const(#op, DUT(op)); \
}
#define _(x) DUT_FUNC_IMPL(x)
DUT_FUNCS
#undef _
其中第 4 行中的宏函式 DUT
定義如下
#define DUT(x) DUT_##x
至此,函式呼叫已經從 is_insert_head_const(void)
進行至
test_const(insert_head, DUT_insert_head)
test_count
中利用回圈重複呼叫函式 doit
進行測試,直到測驗次數達到要求。static bool test_const(char *text, int mode)
{
.
.
.
for (int i = 0; i < ENOUGH_MEASURE / FINAL_SIZE + 1; ++i)
result = doit(mode);
printf("\033[A\033[2K\033[A\033[2K");
if (result)
break;
.
.
.
}
doit
函式負責
measure
TBD
在修改 dudect
時,執行 qtest
並測試 remove_head
時發生 Bus error (core dumped),錯誤訊息如下:
./qtest
cmd> simulation 1
cmd> rh
Testing remove_head...(0/10)
ERROR: Attempted to free unallocated block. Address = 0x5555555555555555
Bus error (core dumped)
但當時的改動並不涉及記憶體操作,由於程式碼層層呼叫,在沒有相關工具的情境下,要定位是哪個函式釋放了非法記憶體將非常耗時。
valgrind ./qtest
cmd> option simulation 1
cmd> rh
Testing remove_head...(0/10)
ERROR: Attempted to free unallocated block. Address = 0x5555555555555555
==88116== Invalid read of size 8
==88116== at 0x10FA71: find_header (harness.c:105)
==88116== by 0x10FA71: test_free (harness.c:200)
==88116== by 0x10FD60: q_release_element (queue.h:119)
==88116== by 0x10FD60: q_free (queue.c:24)
==88116== by 0x110B7D: measure (constant.c:126)
==88116== by 0x111094: doit (fixture.c:145)
==88116== by 0x111094: test_const (fixture.c:175)
==88116== by 0x1111FC: is_remove_head_const (fixture.c:191)
==88116== by 0x10C741: queue_remove (qtest.c:304)
==88116== by 0x10CB8B: do_rh (qtest.c:412)
==88116== by 0x10E6C0: interpret_cmda (console.c:183)
==88116== by 0x10EC83: interpret_cmd (console.c:203)
==88116== by 0x10F787: run_console (console.c:661)
==88116== by 0x10DAB2: main (qtest.c:1444)
==88116== Address 0x555555555555554d is not stack'd, malloc'd or (recently) free'd
透過 Valgrind,發現問題來自 q_free()
:
queue_remove()
呼叫 q_remove_head()
,但後續釋放的記憶體地址 0x5555555555555555
並不合法。此錯誤非常不直覺,因為當時所有與佇列操作相關的測試用例皆已通過,但仍然出錯。
進一步檢查發現原本的 q_remove_head()
存在一個關鍵錯誤:
value
為 NULL
或 sp
為 NULL
,函式會直接 return node
,但沒有將該節點從佇列中移除。queue_contex_t
記錄的長度不一致,進而引發一系列錯誤。q_remove_tail()
也存在相同的錯誤改正如下:
element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
element_t *node;
node = list_entry(head->next, typeof(*node), list);
- if (!(node->value) || !sp)
- return node;
-
- strncpy(sp, node->value, bufsize - 1);
- sp[bufsize - 1] = '\0';
+ if (node->value && sp) {
+ strncpy(sp, node->value, bufsize - 1);
+ sp[bufsize - 1] = '\0';
+ }
list_del_init(&node->list);
return node;
}
實作 Fisher–Yates shuffle 演算法,對佇列進行隨機排序
void q_shuffle(struct list_head *head)
{
if (!head || list_empty(head) || list_is_singular(head))
return;
struct list_head new_list;
INIT_LIST_HEAD(&new_list);
struct list_head *old;
int len = q_size(head), random = 0;
for (; len > 1; len--) {
randombytes((uint8_t *) &random, sizeof(int));
random = random % len;
for (old = head->next; random > 0; random--)
old = old->next;
list_del(old);
list_add(old, &new_list);
}
list_splice_tail(&new_list, head);
return;
}
len
初始化為 佇列長度,代表當前可供隨機選取的節點數。new_list
作為暫存已選取節點的鏈結串列,以簡化操作。0 ~ len-1
中選擇一個隨機索引值 random
,對應佇列中的某個節點。random
所指的節點,將該節點移除(list_del(old);
)。new_list
,確保該節點不再被選取。len--
,使 尚未選取的節點數量遞減,避免重複抽取。new_list
,最後 使用 list_splice_tail()
將 new_list
重新接回 head
,完成洗牌。目前實作中,在索引 random
節點時,需遍歷佇列以找到目標節點,因此整體時間複雜度為
node_array
。swap
操作。node_array
,依序執行 list_del()
與 list_add()
,將節點從原佇列移除後,按照新順序插入。方法 | 時間複雜度 | 額外空間 |
---|---|---|
目前實作 | ||
使用陣列 |
引入 Pearson's chi-squared test 分析實作是否遵守均勻分布(Uniform Distribution)。
Pearson’s chi-squared test 用來檢驗觀測數據是否符合理論上的期望分布,但要滿足以下條件:
針對洗牌結果,計算各種排列出現的次數,並與理論上的均勻分布進行比較。
公式:
使用作業說明提供的 Python 腳本 進行統計分析,該腳本針對包含元素 1, 2, 3, 4
的佇列,執行 1,000,000 次 shuffle
操作,並統計各種排列出現的次數,計算
1, 2, 3, 4
的排列組合共有 排列組合 | 觀測次數 (Observation) | 期望次數 (Expectation) |
---|---|---|
1234 |
41,556 | 41,666 |
1243 |
41,533 | 41,666 |
1324 |
41,955 | 41,666 |
1342 |
41,899 | 41,666 |
1423 |
41,625 | 41,666 |
1432 |
41,403 | 41,666 |
2134 |
41,836 | 41,666 |
2143 |
41,602 | 41,666 |
2314 |
41,658 | 41,666 |
2341 |
41,800 | 41,666 |
2413 |
41,856 | 41,666 |
2431 |
41,668 | 41,666 |
3124 |
41,515 | 41,666 |
3142 |
41,965 | 41,666 |
3214 |
41,419 | 41,666 |
3241 |
41,651 | 41,666 |
3412 |
41,709 | 41,666 |
3421 |
41,726 | 41,666 |
4123 |
41,269 | 41,666 |
4132 |
41,594 | 41,666 |
4213 |
41,548 | 41,666 |
4231 |
41,736 | 41,666 |
4312 |
41,833 | 41,666 |
4321 |
41,644 | 41,666 |
透過線上工具分析統計結果
表示在顯著水準
將 shuffle
後的結果視覺化,也可以發現各類別結果趨近均值分佈
shuffle
整合至 q_test
命令列表使用宏函式 ADD_COMMAND
註冊新指令 shuffle
void add_cmd(char *name, cmd_func_t operation, char *summary, char *parameter);
#define ADD_COMMAND(cmd, msg, param) add_cmd(#cmd, do_##cmd, msg, param)
msg
描述指令行為param
描述指令需要使用什麼參數cmd
描述指令名稱do_##cmd
進行宏拼接,需於 do_##cmd
函式內實作新增指令方可成功註冊
add_cmd
的函式宣告中,第二個參數 operation
之資料型別為指向回傳值為布林值且參數為 (int argc, char *argv[])
格式的函式指標類型do_##cmd
函式需遵守此 signaturetypedef bool (*cmd_func_t)(int argc, char *argv[]);
q_test
命令集透過單向鏈結串列來維護,而 add_cmd
則負責將新命令的屬性(如 do_##cmd
函式指標等資訊)封裝進新節點,並將其加入鏈結串列。
static bool do_shuffle(int argc, char *argv[])
{
if (argc != 1) {
report(1, "%s takes no arguments", argv[0]);
return false;
}
if (!current || !current->q) {
report(3, "Warning: Try to operate null queue");
return false;
}
error_check();
set_noallocate_mode(true);
if (current && exception_setup(true)) {
q_shuffle(current->q);
}
exception_cancel();
set_noallocate_mode(false);
q_show(3);
return !error_check();
}
首先說明 console.c 如何整合 lineoise 於命令解析:
bool run_console(char *infile_name)
{
if (!push_file(infile_name)) {
report(1, "ERROR: Could not open source file '%s'", infile_name);
return false;
}
if (!has_infile) {
char *cmdline;
while (use_linenoise && (cmdline = linenoise(prompt))) {
interpret_cmd(cmdline);
line_history_add(cmdline); /* Add to the history. */
line_history_save(HISTORY_FILE); /* Save the history on disk. */
line_free(cmdline);
while (buf_stack && buf_stack->fd != STDIN_FILENO)
cmd_select(0, NULL, NULL, NULL, NULL);
has_infile = false;
}
if (!use_linenoise) {
while (!cmd_done())
cmd_select(0, NULL, NULL, NULL, NULL);
}
} else {
while (!cmd_done())
cmd_select(0, NULL, NULL, NULL, NULL);
}
return err_cnt == 0;
}
push_file()
函式,此函式用於:
has_file
flag,決定本次 process 本地輸入端所使用的 fd
STDIN_FILENO
file fd
:當使用 pipe
作為輸入,而非 tty
static bool push_file(char *fname)
{
int fd = fname ? open(fname, O_RDONLY) : STDIN_FILENO;
has_infile = fname ? true : false;
if (fd < 0)
return false;
if (fd > fd_max)
fd_max = fd;
// rio_t structure init, explain later...
return true;
}
linenoise()
,該函式包含等待輸入時的主要迴圈 :將使用者輸入存入字串 cmdline
tty
(has_file == false
) 且沒有 web
連線的狀況下
use_linenoise
flag 永遠為真呼叫 do_web 函式(輸入 web 命令),允許使用者以 socket 傳送指令
static bool do_web(int argc, char *argv[])
{
// init...
web_fd = web_open(port);
if (web_fd > 0) {
printf("listen on port %d, fd is %d\n", port, web_fd);
line_set_eventmux_callback(web_eventmux);
use_linenoise = false;
} else {
perror("ERROR");
exit(web_fd);
}
return true;
}
web_open()
,用以初始化 socket 相關資訊並回傳該 socket 之 fd 稱 web_fd
web_fd
成功初始化,呼叫 callback funciton line_set_eventmux_callback
指派 「能夠處理多個輸入 fd 之函式」:web_eventmux
,給相應變數 eventmux_callback
。
int()(char *)
use_linenoise
flag 設為 false
,使程式跳出 run_console
中的迴圈,代表除了原來用作輸入 fd,需額外處理新的輸入 fd: web_fd
此函式透過 select()
系統呼叫,同時處理 wed_fd
以 STDIN_FILENO
,流程如下:
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);
int web_eventmux(char *buf)
{
fd_set listenset;
FD_ZERO(&listenset);
FD_SET(STDIN_FILENO, &listenset);
int max_fd = STDIN_FILENO;
if (server_fd > 0) {
FD_SET(server_fd, &listenset);
max_fd = max_fd > server_fd ? max_fd : server_fd;
}
int result = select(max_fd + 1, &listenset, NULL, NULL, NULL);
if (result < 0)
return -1;
if (server_fd > 0 && FD_ISSET(server_fd, &listenset)) {
FD_CLR(server_fd, &listenset);
struct sockaddr_in clientaddr;
socklen_t clientlen = sizeof(clientaddr);
int web_connfd =
accept(server_fd, (struct sockaddr *) &clientaddr, &clientlen);
char *p = web_recv(web_connfd, &clientaddr);
char *buffer = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
web_send(web_connfd, buffer);
strncpy(buf, p, strlen(p) + 1);
free(p);
close(web_connfd);
return strlen(buf);
}
FD_CLR(STDIN_FILENO, &listenset);
return 0;
}
位於 linenoise()->line_raw()->line_edit()
while (1) {
signed char c;
int nread;
char seq[5];
if (eventmux_callback != NULL) {
int result = eventmux_callback(l.buf);
if (result != 0)
return result;
}
nread = read(l.ifd, &c, 1);
if (nread <= 0)
return l.len;
//
TBD