# 2021q1 Homework2 (quiz2) contributed by < `iLeafy11` > ###### tags: `linux2021` ## 作業描述 ## 測驗 1 ### 解釋 `include/linux/list.h` 的精簡實作的運作原理 #### Function-like macro: `offsetof` `offsetof` 定義在 `<linux/stddef.h>` 中,用來計算某一個 struct 結構的成員相對於該結構起始地址的偏移量 (offset)。 ```c #define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) ``` ``` 1. 0 2. ((TYPE *)0) 3. ( ((TYPE *)0)->MEMBER ) 4. &( ((TYPE *)0)->MEMBER ) 5. ( (size_t) &( ((TYPE *)0)->MEMBER ) ``` 上述程式碼的 operation 可以拆解成以下: 1. 首先從數值 `0` 開始。 2. 將數值 `0` (`NULL`) 強制轉型成某個 `struct` 結構的指標 `(TYPE *)` 型別,`0` 會被認定是該 `struct` 結構 `TYPE` 的起始地址。 3. 然後再指向該 `struct` 結構型別內的某個成員 `MEMBER`。 4. 取得其地址 (address of `( ((TYPE *)0)->MEMBER )`)。 5. 再將該地址轉型回 `size_t`。 因為起始地址為 `0`,所以 `MEMBER` 的地址也就等於 `MEMBER` 與起始地址 `0` 的偏移 (offset)。 如果將起始地址 `0` 換成不同的數字,那麼就要記得將結果扣掉該數字,這樣才可以算出偏移量 (offset)。 ```c #define offsetof(TYPE, MEMBER) ((size_t) &(((TYPE *)87)->MEMBER) - 87) ``` 上述程式碼實現依賴於早期編譯器,所以不會有 Undefined behavior: dereference a `NULL` pointer 的問題,如今以 C11 的標準來看,`offsetof` 被定義於 gcc 內部實作: ```c #define offsetof(st, m) __builtin_offsetof(st, m) ``` 詳細資料參照: [GCC syntastic extension: Offsetof macro](https://gcc.gnu.org/onlinedocs/gcc/Offsetof.html#Offsetof) 至於 `__builtin_offsetof(st, m)` gcc 的實作,在此不討論,參照: [gcc/cp/parser.c](https://raw.githubusercontent.com/gcc-mirror/gcc/master/gcc/cp/parser.c) #### Function-like macro: `container_of` `container_of() - Calculate address of object that contains address ptr` ```c #define container_of(ptr, type, member) \ __extension__({ \ const __typeof__(((type *) 0)->member) *__pmember = (ptr); \ (type *) ((char *) __pmember - offsetof(type, member)); \ }) ``` `container_of()` macro 使用到 gcc extention `__extension__` ,包含 `__typeof__()` 以及 `offsetof()` 的使用。 `container_of()` 實作的原理: 1. 與上面 `offsetof()` 原理相似,只是從原本第 `4.` 步取地址,變成用 `__typeof__()` 去取得其型別,並宣告該型別的指標 `__pmember` 去接住被複製的 `(ptr)` 的值。 2. `__pmember` 的值減去 `offsetof()` 該結構 `struct` 型別 `type` 的偏移量,就可以得到該結構 `struct` 型別 `type` 其內部成員 `member` 的 container 的地址。 對於為何 `const __typeof__(((type *) 0)->member) *__pmember = (ptr)` 不會產生 Undefined behavior: dereference a `NULL` pointer,~~我們將問題推給 gcc 對於 `typeof` extension 的實作~~,參照: ["Why this 0 in ((type*)0)->member in C?" from Stack Overflow](https://stackoverflow.com/questions/13723422/why-this-0-in-type0-member-in-c) :::info **以 GNU extension `typeof` operator 優化 function-like macro 實作** 若在定義 Macro 時,善用 GNU extension 的 `typeof` operator 可以避免 double evaluation。 **Double evaluation 的範例** 假設我們實作了 function-like macro `max()`,並定義函式 `x()`、`y()` 如下 ```c #define max(a,b) (a < b ? b : a) int x(){ turnLeft(); return 0; } int y(){ turnRight(); return 1; } ``` 呼叫上述實作的 function-like macro `max()` ```c int findmax = max(x(), y()); ``` 將上面呼叫的 function-like macro `max()` 展開來看 ```c int findmax = max(x() < y() ? y() : x()); ``` 我們可以觀察到,`max()` 在被呼叫後,在比較大小 `x() < y()` 的過程中,`x()` 和 `y()` 都會個別被呼叫 1 次,此時 `turnLeft()` 和 `turnRight()` 也各會被執行一次。 因為 `y()` 函式 `return` 的值 `1` 比 `x()` 函式 `return` 的值 `0` 還要大,所以 `y()` 函式會被呼叫第 2 次,於是 `turnRight()` 會再被執行 1 次。 這樣造成的結果就會導致 `max()` 在被呼叫後,`turnLeft()` 被執行 1 次,而 `turnRight` 被執行 2 次的情形。 所以說,如果單純以 [運算表達式 (`expressions`)](https://en.cppreference.com/w/c/language/expressions),只用 `operators` 和 `operands` 去實作 function-like macro,一不小心就有可能產生會造成 double evaluation 的 function-like macro。 **以 GNU extension 的 `typeof` operator 改寫 `max()`** 實作如下: ```c #define max(a,b) \ ({ typeof (a) _a = (a); \ typeof (b) _b = (b); \ _a > _b ? _a : _b; }) ``` 若 `max()` 改用 GNU extension 的 `typeof` operator 實作,即可避免 double evaluation,同時亦可確保型別 (type) 的安全。 在 GNU C 裡面,GNU extension 可以利用 `__auto_type` operator 提供更好的方式實作 `max()`,如下: ```c #define max(a,b) \ ({ __auto_type _a = (a); \ __auto_type _b = (b); \ _a > _b ? _a : _b; }) ``` Reference: * ["What is double evaluation and why should it be avoided?" from Stack Overflow](https://stackoverflow.com/questions/39439181/what-is-double-evaluation-and-why-should-it-be-avoided) * [6.7 Referring to a Type with typeof from C Extensions (Using the GNU Compiler Collection (GCC))](https://gcc.gnu.org/onlinedocs/gcc/Typeof.html#Typeof) ::: #### Data Structure: `struct list_head - Head and node of a doubly-linked list` ```c struct list_head { struct list_head *prev, *next; }; ``` ```graphviz digraph list_head { rankdir=LR; node[shape=record, style=bold]; head [label="{<prev>prev|<next>next}"]; } ``` ```c typedef struct __element { char *value; struct list_head list; } list_ele_t; ``` ```graphviz digraph list { rankdir=LR; node[shape=record, style=bold]; subgraph cluster_0 { node [shape=record]; value [label="{value}"]; head [label="{<prev>prev|<next>next}"]; style=bold; label=list_ele_t } } ``` 簡化成下圖: ```graphviz digraph list_ele { rankdir=LR; node[shape=record]; node [shape=record]; head [label="value|{<left>prev|<right>next}", style=bold]; } ``` #### Function-like macro: `LIST_HEAD - Declare list head and initialize it` ```c #define LIST_HEAD(head) struct list_head head = {&(head), &(head)} ``` ```graphviz digraph list_init { rankdir=LR; node[shape=record]; style=bold node [shape=record]; head [label="value|{<left>prev|<right>next}", style="bold"]; head:right:e -> head:left:w[arrowhead=normal, arrowtail=normal, dir=both]; } ``` `list_entry() - Calculate address of entry that contains list node` ```c #define list_entry(node, type, member) container_of(node, type, member) ``` ```graphviz digraph container { rankdir=LR; node[shape=record]; compound=true; style=bold; subgraph cluster_0 { container [label = "container" style=filled,color=white]; subgraph cluster_1 { node [shape=record]; element [label="|{|}", style="bold"]; label = "member" } } element -> container[ltail=cluster_1, lhead=cluster_0]; } ``` `list_for_each - iterate over list nodes` ```c #define list_for_each(node, head) \ for (node = (head)->next; node != (head); node = node->next) ``` ```graphviz digraph ele_list { rankdir=LR; node[shape=record]; h [label="head", style=dashed, color=grey]; h -> e1:right:w [style=dashed, color=grey]; e1 [label="head node|{<left>prev|<right>next}", style="bold", color=grey]; e2 [label="cat|{<left>prev|<right>next}", style="bold"]; e3 [label="...", style="bold"]; e4 [label="eat|{<left>prev|<right>next}", style="bold"]; e5 [label="fat|{<left>prev|<right>next}", style="bold"]; e1:right:e -> e2:left:w[arrowhead=normal, arrowtail=normal, dir=both]; e2:right:e -> e3:left:w[arrowhead=normal, arrowtail=normal, dir=both]; e3:right:e -> e4:left:w[arrowhead=normal, arrowtail=normal, dir=both]; e4:right:e -> e5:left:w[arrowhead=normal, arrowtail=normal, dir=both]; e5:right:e -> e1:left:w[arrowhead=normal, arrowtail=normal, dir=both]; } ``` :::info TODO: 採用 circular linked list 是基於哪些考量? ::: #### Functions for implementing Merge Sort ### 改進上述程式碼並實作 #### 去除不必要的資料結構 不需要用到佇列 (Queue) 資料結構,此結構無益於 linked list merge sort 的排序,且同時浪費記憶體以及效能。 ```diff typedef struct __element { char *value; - struct __element *next; struct list_head list; } list_ele_t; - typedef struct { - list_ele_t *head; /* Linked list of elements */ - list_ele_t *tail; - size_t size; - struct list_head list; - } queue_t; ``` #### 修改函式 **函式: `list_del()`** `list_del()` 以命名上來說是 **刪除(delete)** list 的某個節點結構,將該節點移出 list 並將釋放其記憶體。然而觀察 `list_del()` 程式碼,其行為卻是只有將目標節點移出 list,而沒有實質上的去 __刪除 (delete)__ 該目標節點。因此這裡我認為比較好的命名方式是 __拆除 (remove)__ 目標節點,函式更名為 `list_remove()`。 ```diff /** - * list_del() - Remove a list node from the list + * list_remove() - Remove a list node from the list * @node: pointer to the node */ - static inline void list_del(struct list_head *node) + static inline void list_remove(struct list_head *node) { struct list_head *next = node->next, *prev = node->prev; next->prev = prev; prev->next = next; } ``` **函式: `list_merge_sort()`** 因為刪除不必要的佇列 (Queue) 結構,所以傳遞佇列 (Queue) 結構參數的函式必須要做修正,改成傳遞 `struct list_head` 的指標參數。 ```diff - void list_merge_sort(queue_t *q) + void list_merge_sort(struct list_head *list) { - if (list_is_singular(&q->list)) - return; - queue_t left; - INIT_LIST_HEAD(&left.list); - list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list); + if (list_is_singular(list)) + return; + struct list_head left; + INIT_LIST_HEAD(&left); + list_cut_position(&left, list, &get_middle(list)->list); struct list_head sorted; list_merge_sort(&left); - list_merge_sort(q); - list_merge(&left.list, &q->list, &sorted); - INIT_LIST_HEAD(&q->list); - list_splice_tail(&sorted, &q->list); + list_merge_sort(list); + list_merge(&left, list, &sorted); + INIT_LIST_HEAD(list); + list_splice_tail(&sorted, list); } ``` **函式: `validate()`** 修改原因及結果同上。 ```diff - static bool validate(queue_t *q) + static bool validate(struct list_head *list) { struct list_head *node; - list_for_each (node, &q->list) { - if (node->next == &q->list) - break; + list_for_each (node, list) { + if (node->next == list) + break; if (strcmp(list_entry(node, list_ele_t, list)->value, list_entry(node->next, list_ele_t, list)->value) > 0) return false; } return true; } ``` **函式: `list_merge()`** 觀察函式 `list_merge()` 裡面,`if` `while` 以及最底下的三元運算子 (ternary operator) 的條件判斷皆與 `list_empty()` 相關。其中,不難發現,前面兩個 `if` 條件式所做的行為,與底下 `while` 以及最底下三元運算子 (ternary operator) 所做的行為自相矛盾。 而這兩個 `if` 條件式內有 `return` 且寫在 `while` 迴圈之前,因此先判斷 `list_empty(lhs)` 而後判斷 `list_empty(rhs)` 的寫法也會在順序上出現問題,因為 `list_empty(lhs)` 與 `list_empty(rhs)` 兩個條件應當是要被同時考量的存在。 舉例來說,在執行 `merge_sort()` 的過程中,會呼叫到 `get_middle()` 來切割 linked list。假設被切割的 linked list 的節點個數為奇數,最終在遞迴的過程中勢必會切割到其中一邊的 linked list 只有 1 個節點,而另一邊 linked list 有 2 個節點的情形。 假設傳進來的參數 `rhs` 接收到的數值是,含有 2 個節點的 linked list 指標,而 `lhs` 接收到的數值是,只含有 1 個節點的 linked list 指標,那麼在不修改題目原始程式碼的情形下,`if` 條件的行為便會造成 `merge_sort()` 的錯誤。 結論是,這兩個 `if` 條件除了是多餘的程式碼 (redundant code) 之外,也會造成 `merge_sort()` 演算法出錯,因此必須予以刪除。 ```diff static void list_merge(struct list_head *lhs, struct list_head *rhs, struct list_head *head) { INIT_LIST_HEAD(head); - if (list_empty(lhs)) { - list_splice_tail(lhs, head); - return; - } - if (list_empty(rhs)) { - list_splice_tail(rhs, head); - return; - } while (!list_empty(lhs) && !list_empty(rhs)) { char *lv = list_entry(lhs->next, list_ele_t, list)->value; char *rv = list_entry(rhs->next, list_ele_t, list)->value; struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next; list_remove(tmp); list_add_tail(tmp, head); } list_splice_tail(list_empty(lhs) ? rhs : lhs, head); } ``` **函式:`merge_sort()`** `merge_sort()` 需要考慮當 list 為空的情形,否則會有 segmentation fault。 ```diff void list_merge_sort(struct list_head *list) { - if (list_is_singular(list)) - return; + if (list_empty(list) || list_is_singular(list)) + return; struct list_head left; struct list_head sorted; INIT_LIST_HEAD(&left); list_cut_position(&left, list, &get_middle(list)->list); list_merge_sort(&left); list_merge_sort(list); list_merge(&left, list, &sorted); INIT_LIST_HEAD(list); list_splice_tail(&sorted, list); } ``` :::info TODO: ::: #### 新增函式 **函式: `list_alloc_add_tail()`** 動態配置記憶體,新增 `list_ele_t` 的節點 `new`,同時再次動態配置記憶體,複製一份 `char *s` 傳遞進來的字串到 `new->value`。最後再藉由 `list_add_tail()` 函式以及傳遞進來的 `struct list_head` 的指標參數,將新的節點 `new` 新增至 circular doubly linked list。 函式執行期間,若動態配置記憶體失敗,則 `return false`。需要注意,如果是在 `strdup()` 動態配置記憶體失敗,則必須歸還在 `list_alloc_add_tail()` 函式內先前已成功動態配置的記憶體 `new`。若兩次的動態配置記憶體皆已成功配置,則在函式執行到最後時 `return true`。 此實作手法類似於 [`quiz1`](https://hackmd.io/g1_AyhXXRRmwtRh965SKLw?view) 的 `q_insert_head()` 以及 `q_insert_tail` 的實作。 ```c bool list_alloc_add_tail(struct list_head *head, char *s) { list_ele_t *new = malloc(sizeof(list_ele_t)); if (!new) return false; new->value = strdup(s); if (!new->value) { free(new); return false; } list_add_tail(&new->list, head); return true; } ``` **Function-like macro: `list_for_each_safe()`** 為了能夠實作 `list_free()`,刪除整個 list 的函式,所以參考了 [linux/list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h) 的實作 `list_for_each_safe()`。 之所以使用 `list_for_each_safe()` 而非 `list_for_each()` 的原因是,使用 `list_for_each()` 會在刪除節點的過程中將存取下個節點的指標一併抹除,所以為了避免在 iterate over a list 的過程中存取不到下個節點,於是選擇採用此實作。 ```c /** * Shamelessly copy from linux/list.h * list_for_each_safe - iterate over a list safe against removal of list entry * @pos: the &struct list_head to use as a loop cursor. * @n: another &struct list_head to use as temporary storage * @head: the head for your list. */ #define list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) ``` **函式: `list_free()`** 利用 `list_for_each_safe()`,走訪整個 linked list 的所有節點,然後再利用 `list_entry()` ,找到在 container (`list_ele_t` structure) 內部以及 container 本身被動態配置的記憶體,然後再將其釋放 (`free`) 掉。 ```c void list_free(struct list_head *head) { struct list_head *curr, *next; list_for_each_safe(curr, next, head) { list_remove(curr); list_ele_t *target = list_entry(curr, list_ele_t, list); free(target->value); free(target); } } ``` #### 修改主程式以及編譯執行 **主程式:`main()`** ```diff int main(void) { FILE *fp = fopen("cities.txt", "r"); if (!fp) { perror("failed to open cities.txt"); exit(EXIT_FAILURE); } - queue_t *q = q_new(); + LIST_HEAD(list); char buf[256]; while (fgets(buf, 256, fp)) { - q_insert_head(q, buf); + list_alloc_add_tail(&list, buf); } fclose(fp); - list_merge_sort(q); - assert(validate(q)); - q_free(q); + list_merge_sort(&list); + assert(validate(&list)); + list_free(&list); return 0; } ``` ``` $ gcc -Wall -o merge_sort merge_sort.c $ ./merge_sort ``` #### 檢查是否有 memory leak ``` $ valgrind --leak-check=full ./merge_sort ==19101== Memcheck, a memory error detector ==19101== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al. ==19101== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info ==19101== Command: ./merge_sort ==19101== ==19101== ==19101== HEAP SUMMARY: ==19101== in use at exit: 0 bytes in 0 blocks ==19101== total heap usage: 187,656 allocs, 187,656 frees, 4,529,436 bytes allocated ==19101== ==19101== All heap blocks were freed -- no leaks are possible ==19101== ==19101== For lists of detected and suppressed errors, rerun with: -s ==19101== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` ### 將 lib/list_sort.c 的實作抽離為可單獨執行 :::info ::: #### compile & execution :::info TODO:附上解釋 ::: #### 設計實驗 & 效能分析 ### 與 quiz1 作效能比較 (linked list merge_sort vs linked list quick_sort)\ ??用相同的資料結構?? :::info TODO:用相同資料結構刻出 QuickSort 並比較效能 ::: --- ## 測驗 2 考慮函式 `func()` 接受一個 16 位元無號整數 N,並回傳小於或等於 N 的 power-of-2。 ```c uint16_t func(uint16_t N) { /* change all right side bits to 1 */ N |= N >> 1; N |= N >> 2; N |= N >> 4; N |= N >> 8; return (N + 1) >> 1; } ``` 其實上述程式碼在做的事情是,就是 Find most significant (left-most) bit。 ### 解釋 `func()` 的運作原理 ```c N |= N >> 1; N |= N >> 2; N |= N >> 4; N |= N >> 8; ``` 上述程式碼的作用是,將所有比 N 的 most significant (left-most) bit 位置還要低的 bit 通通變成 1。原理是 `N |= N >> 1` 會將 most significant (left-most) bit 的下一個 bit 利用 `bitwise or` 也變成是 1,而 `N |= N >> 2` 會利用 `N |= N >> 1` 的結果將 most significant (left-most) bit 的下一個 bit 的後面兩個 bit 也變成是 1,以此類推。 因為函式 func 是接受一個 16 位元無號整數 N,所以 right shift 的最大值就會是 8。 * Example: N = (1000 0000 0000 0000)<sub>2</sub> = (32768)<sub>10</sub> ```c 1000 0000 0000 0000 or 0100 0000 0000 0000 (right-shifted by 1) is 1100 0000 0000 0000 ``` ```c 1100 0000 0000 0000 or 0011 0000 0000 0000 (right-shifted by 2) is 1111 0000 0000 0000 ``` ```c 1111 0000 0000 0000 or 0000 1111 0000 0000 (right-shifted by 4) is 1111 1111 0000 0000 ``` ```c 1111 1111 0000 0000 or 0000 0000 1111 1111 (right-shifted by 8) is 1111 1111 1111 1111 ``` :::info TODO: 描述並解釋 ::: 考慮到 `func()` 最後 `return` 的值為 `(N + 1) >> 1`,若 `N` 的值 = (1000 0000 0000 0000)<sub>2</sub> = (32768)<sub>10</sub>,推測在 `(N + 1)` 時有可能會產生 overflow。 稍微實作實驗一下 `func.c`: ```c #include <stdio.h> #include <stdint.h> uint16_t func(uint16_t N) { N = N | N >> 1; N = N | N >> 2; N = N | N >> 4; N = N | N >> 8; return (N + 1) >> 1; } int main(int argc, char *argv[]) { printf("%u\n", func(32768)); return 0; } ``` 執行時沒有發現什麼問題。 ``` $ gcc -Wall -g -o func func.c $ ./func 32768 ``` 使用 Address Sanitizer 也沒有報錯。 ``` $ clang -fsanitize=address -O1 -fno-omit-frame-pointer -g test.c $ ./a.out ``` 用 gcc 加入參數 `-Wconversion` 編譯時,就會報錯。 ``` $ gcc -o func -Wconversion func.c test.c: In function ‘func’: test.c:10:20: warning: conversion from ‘int’ to ‘uint16_t’ {aka ‘short unsigned int’} may change value [-Wconversion] 10 | return (N + 1) >> 1; | ~~~~~~~~^~~~ ``` 查看 C99 規格書。 :::info **C99 Standard - §6.3.1.1** If an `int` can represent all values of the original type, the value is converted to an `int`; otherwise, it is converted to an `unsigned int`. These are called the integer promotions. All other types are unchanged by the integer promotions. ::: 因此是不會發生 overflow 的。在 **C99 Standard** 中,有定義所謂 "integer promotions" 的行為,所以 `return (N + 1) >> 1;` 這段程式碼實際上會等同於: ```c return (uint8_t)((int)(N + 1) >> 1); ``` 用 gcc 編譯查看其 Assembly 可發現其端倪。 ``` $ gcc -S func.c -o func.s $ cat func.s ``` ```diff .file "func.c" .text .globl func .type func, @function func: .LFB6: .cfi_startproc endbr64 pushq %rbp .cfi_def_cfa_offset 16 .cfi_offset 6, -16 movq %rsp, %rbp .cfi_def_cfa_register 6 movl %edi, %eax movw %ax, -4(%rbp) movzwl -4(%rbp), %eax shrw %ax orw %ax, -4(%rbp) movzwl -4(%rbp), %eax shrw $2, %ax orw %ax, -4(%rbp) movzwl -4(%rbp), %eax shrw $4, %ax orw %ax, -4(%rbp) movzwl -4(%rbp), %eax shrw $8, %ax orw %ax, -4(%rbp) movzwl -4(%rbp), %eax addl $1, %eax + sarl %eax popq %rbp .cfi_def_cfa 7, 8 ret .cfi_endproc .LFE6: .size func, .-func .section .rodata .LC0: ... ``` :::info * `eax` is a 32 bit register. * `sar` right shifts (signed divides) a byte, word, or long value for a count specified by an immediate value and stores the quotient in that byte, word, or long respectively. The second variation right shifts by a count value specified in the CL register. sar rounds toward negative infinity; the high-order bit remains unchanged. notes: CSAPP: 3/E chapter 3 ::: 不過如果是在不同的機器上運作,便不能確定上述程式碼是安全的,因為 32-bit 的 `int` type 只會在有 64-bit 暫存器的機器上實現。所以我們最好還是把數值操作限縮在 `uint16_t` 大小的資料。 從 C99 規格書中可以得知 right-shift 為 well-defined behavior。 :::info **C99 Standard - §6.5.7** The result of **E1 >> E2** is **E1** right-shifted **E2** bit positions. If **E1** has an unsigned type or if **E1** has a signed type and a nonnegative value, the value of the result is the integral part of the quotient of **E1** / 2<sup>**E2**</sup>. If **E1** has a signed type and a negative value, the resulting value is implementation-defined. ::: 所以程式碼可以改寫如下: ```diff - return (N + 1) >> 1; + return (N >> 1) + 1; ``` ### Linux 核心的 round up/down power of 2 的運作機制 :::info TODO ::: --- ## 測驗 3 ### 解釋 `bitcpy()` 程式碼運作原理 #### 傳遞參數 ```c void bitcpy(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) ``` [題目敘述](https://hackmd.io/@sysprog/linux2021-quiz2#%E6%B8%AC%E9%A9%97-3) * `input/_src`: 長度為 8 個 uint8 陣列 (總共 64 位元)。同時也是 `input` 的起始地址。 * `output/_dest`: 長度為 8 個 uint8 陣列 (總共 64 位元)。同時也是 `output` 的起始地址。 * `_write`: 從 `_dest` 的第 `_write` 個位元開始寫入 `_count` 位元數。 * `_read`: 從 `_src` 的第 `_read` 個位元開始讀取 `_count` 位元數。 * `count`: 讀取/寫入的位元數。 #### 變數 `read_lhs` `read_rhs` `source` `write_lhs` `write_rhs` `dest` 的意義 ```c size_t read_lhs = _read & 7; size_t read_rhs = 8 - read_lhs; const uint8_t *source = (const uint8_t *) _src + (_read / 8); ``` * `read_lhs`: `_read & 7` 可以視為是 `_read % 8`。決定在一個 byte 的大小內,從左邊數來的第 `read_lhs` 個 bit 開始讀取。 * `read_rhs`: 決定在一個 byte 的大小內,從右邊數來的第 `read_rhs` 個 bit 開始讀取。 * `source`: 指向 `uint8_t` 的指標,指標數值加 1 等同於指標指向下一個 8 個 bit 的起始位置。其值以 `_src` 為起始地址,加上 `(_read / 8)` 來決定從哪一個 byte 開始讀取。如下圖: ```graphviz digraph{ node [shape=plaintext, fontsize=60]; { node [shape=record, fontsize=50, width=30, fixsize=true]; bit [label="<p0>1111 1111 | <p1>1111 1111 | <p2>1111 1111 | <p3>1111 1111 | <p4>1111 1111 | <p5>1111 1111 | <p6>1111 1111 | <p7>1111 1111", style=bold]; _src [label="<p0>_src + 0 | <p1>_src + 1 | <p2> _src + 2 | <p3>_src + 3 | <p4>_src + 4 | <p5>_src + 5 | <p6>_src + 6 | <p7> _src + 7"]; } source [label="source", style=dashed]; edge [color=grey, style=bold]; source -> _src:p0; source -> _src:p1; source -> _src:p2; source -> _src:p3; source -> _src:p4; source -> _src:p5; source -> _src:p6; source -> _src:p7; edge [color=black, style=bold]; _src:p0 -> bit:p0; _src:p1 -> bit:p1; _src:p2 -> bit:p2; _src:p3 -> bit:p3; _src:p4 -> bit:p4; _src:p5 -> bit:p5; _src:p6 -> bit:p6; _src:p7 -> bit:p7; } ``` ```c size_t write_lhs = _write & 7; size_t write_rhs = 8 - write_lhs; uint8_t *dest = (uint8_t *) _dest + (_write / 8); ``` * `write_lhs`: 原理同 `read_lhs`。決定在一個 byte 的大小內,從左邊數來的第 `write_lhs` 個 bit 開始寫入。 * `write_rhs`: 原理同 `read_rhs`。決定在一個 byte 的大小內,從右邊數來的第 `write_rhs` 個 bit 開始寫入。 * `dest`: 原理同 `source`。以 `_dest` 為起始地址,加上`(_write / 8)` 來決定從哪一個 byte 開始寫入。如下圖: ```graphviz digraph{ node [shape=plaintext, fontsize=60]; { node [shape=record, fontsize=50, width=30, fixsize=true]; bit [label="<p0>1111 1111 | <p1>1111 1111 | <p2>1111 1111 | <p3>1111 1111 | <p4>1111 1111 | <p5>1111 1111 | <p6>1111 1111 | <p7>1111 1111", style=bold] _dest [label="<p0>_dest + 0 | <p1>_dest + 1 | <p2> _dest + 2 | <p3>_dest + 3 | <p4>_dest + 4 | <p5>_dest + 5 | <p6>_dest + 6 | <p7> _dest + 7"]; } edge [color=grey, style=bold]; dest -> _dest:p0; dest -> _dest:p1; dest -> _dest:p2; dest -> _dest:p3; dest -> _dest:p4; dest -> _dest:p5; dest -> _dest:p6; dest -> _dest:p7; edge [color=black, style=bold]; _dest:p0 -> bit:p0; _dest:p1 -> bit:p1; _dest:p2 -> bit:p2; _dest:p3 -> bit:p3; _dest:p4 -> bit:p4; _dest:p5 -> bit:p5; _dest:p6 -> bit:p6; _dest:p7 -> bit:p7; } ``` #### `bit_mask` 的使用 ```c static const uint8_t read_mask[] = { 0x00, /* == 0 00000000b */ 0x80, /* == 1 10000000b */ 0xC0, /* == 2 11000000b */ 0xE0, /* == 3 11100000b */ 0xF0, /* == 4 11110000b */ 0xF8, /* == 5 11111000b */ 0xFC, /* == 6 11111100b */ 0xFE, /* == 7 11111110b */ 0xFF /* == 8 11111111b */ }; static const uint8_t write_mask[] = { 0xFF, /* == 0 11111111b */ 0x7F, /* == 1 01111111b */ 0x3F, /* == 2 00111111b */ 0x1F, /* == 3 00011111b */ 0x0F, /* == 4 00001111b */ 0x07, /* == 5 00000111b */ 0x03, /* == 6 00000011b */ 0x01, /* == 7 00000001b */ 0x00 /* == 8 00000000b */ }; ``` 可以觀察到其實 `read_mask[]` 和 `write_mask[]` 的內容其實是一樣的,只是順序是倒過來的而已。所以其中一個 `bit_mask` 其實是多餘的。 #### 迴圈終止條件 & 讀取/寫入 bit 實作 bit_copy ```c while (count > 0) { ... size_t bitsize = (count > 8) ? 8 : count; ... count -= bitsize; } ``` `while` 迴圈會在 `count` 為 `0` 的時候,也就是所有的 bit 被複製完的時候離開 `while` 迴圈。 考量到 `count` 不一定為 `8` 的倍數,因此宣告一個 `bitsize` 變數,其值為迴圈的一次循環中,讀取/寫入的 bit 的個數,一次最多為 `8` 個 bit。而當迴圈執行最後一次時,`bitsize` 的值便是 `count` 除以 `8` 的餘數。 考慮到以下 bitwise operation 來讀取/寫入 bit,以實作 bit_copy。 * `X |= 0 -> X` * `X &= 1 -> X` #### 讀取 ```c uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (*source >> read_rhs); } ``` 1. 首先 `data` 複製讀取的 `*source` 的起始 byte 的資料,大小為 1 byte,也就是 8 個 bit。`*source` 被讀完 1 個 byte 之後前往下一個 byte,也就是下一個 8-bit 的起始位置。 2. 在該 byte 內,如果要讀取的起始 bit 的位置沒有對齊讀進來的 byte 的起始位置,也就是當 `read_lhs` 不為 `0` 的時候,則讀取要從左邊數來的第 `read_lhs` 個 bit 開始讀取。所以 `data` 必須要左移 `read_lhs` 個 bit,`data` 右邊的 `read_lhs` 個 bit 會補上 `0`。 3. `bitsize > read_rhs` 的意義是,是否所有要讀取的 bit 都已經完全讀取結束,如果沒有完全讀取結束,則需要跨兩個 byte,再從已前進至下一個 byte 位置的 `*source`,將其值右移 `read_lhs` 個 bit,如此才能複製前 `read_lhs` 個 bit 至 `data` 剛剛被清 `0` 的空間。 ```c if (bitsize < 8) data &= read_mask[bitsize]; ``` 4. `bitsize < 8` 會意味著 `while` 迴圈的執行是否已經到達最後一次,若是最後一次,則利用 `read_mask` 做為遮罩,將多餘讀取的 bit 抹除掉。 ```graphviz digraph { node [shape=plaintext, fontsize=60]; text [label="Assume read_lhs = 3"]; { node [shape=record, fontsize=50, width=30, fixsize=true]; next [label="<p0>X | <p1>X | <p2>X | <p3>X | <p4>X | <p5>X | <p6>X | <p7>X", style=bold] curr [label="<p0>X | <p1>X | <p2>X | <p3>X | <p4>X | <p5>X | <p6>X | <p7>X", style=bold]; prev [label="<p0>X | <p1>X | <p2>X | <p3>X | <p4>X | <p5>0 | <p6>0 | <p7>0", style=bold]; } { rank=same; prev; "" "left-shift by 3" } edge [color=white, style=bold]; text -> prev; prev:p0 -> curr:p0; prev:p7 -> curr:p7; curr:p0 -> next:p0; } ``` ``` read --- XXXXXXXX data copy from *source --- if read_lhs = 3, read_rhs = 5, bitsize <= read_rhs XXXXX000 data left-shift by read_lhs --- if read_lhs = 3, read_rhs = 5, bitsize > read_rhs XXXXX000 data left-shift by read_lhs or 00000XXX *source right-shift by read_rhs is XXXXXXXX --- if bitsize = 6 XXXXXXXX data and 11111100 read_mask[bitsize] is XXXXXX00 --- read end --- ``` #### 寫入 ```c uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } ``` ``` write --- XXXXXXXX original copy from *dest --- if write_lhs = 6 11111100 mask = read_mask[write_lhs] --- if write_rhs = 2, bitsize = 3 XXXXXXXX original and 11111100 mask is XXXXXX00 original & mask 000000XX data right-shift by write_lhs XXXXXX00 original & mask or 000000XX data right-shift by write_lhs XXXXXXXX ``` ### 重寫為同樣功能但效率更高的實作 #### 改善 bit_mask 的實作 #### :::info unaligned/aligned memory memcpy() ::: #### :::info TODO ::: ### Linux 核心對於 `bit_cpy` 的實作 :::info TODO ::: --- ## 測驗 4 ### 解釋字串駐留 (String Interning) 運作原理 https://hackmd.io/@sysprog/linux2021-quiz2#%E6%B8%AC%E9%A9%97-4 :::info TODO ::: ### 解釋程式碼運作原理 #### `cstr.h` **`enum`** ```c enum { CSTR_PERMANENT = 1, CSTR_INTERNING = 2, CSTR_ONSTACK = 4, }; ``` * `CSTR_PERMANENT` - 表示字串儲存在 heap 的狀態。 * `CSTR_INTERNING` - 表示儲存字串的空間作為 string interning 使用的狀態。 * `CSTR_ONSTACK` - 表示字串儲存在 stack 的狀態。 **`struct`** ```c typedef struct __cstr_data { char *cstr; uint32_t hash_size; uint16_t type; uint16_t ref; } * cstring; ``` * `cstr` - 儲存字串的空間。 * `hash_size` - 當字串的狀態為 `CSTR_ONSTACK`,`hash_size` 的值為字串的長度 (`size`),否則其值為 hash values (`hash`)。 * `type` - 當前字串儲存在的空間的狀態。 * `ref` - reference counting。 `cstring` 為一個指向 `struct __cstr_data` 結構的指標。 ```c typedef struct __cstr_buffer { cstring str; } cstr_buffer[1]; ``` * str - `cstring` type,為指向 `struct __cstr_data` 結構的指標。 **Function-like macro: `CSTR_BUFFER()`** ```c #define CSTR_BUFFER(var) \ char var##_cstring[CSTR_STACK_SIZE] = {0}; \ struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \ cstr_buffer var; \ var->str = &var##_cstr_data; ``` :::info Reference: C99 Standard - §6.10.3.3 The ## operator ::: 創造一個 `cstring` 指標結構的 buffer (`cstr_buffer`),其字串指標成員 `cstr` 指向從 stack 上抓取的大小為 `CSTR_STACK_SIZE * sizeof(char)` 的一段空間,`hash_size` (在此為 `size`) 以及 `ref` (reference counting) 此時皆設為 `0`,狀態設為 `ON_STACK`。 **Function-like macro: `CSTR_LITERAL()`** ```c #define CSTR_LITERAL(var, cstr) \ static cstring var = NULL; \ if (!var) { \ cstring tmp = cstr_clone("" cstr, (sizeof(cstr) / sizeof(char)) - 1); \ if (tmp->type == 0) { \ tmp->type = CSTR_PERMANENT; \ tmp->ref = 0; \ } \ if (!__sync_bool_compare_and_swap(&var, NULL, tmp)) { \ if (tmp->type == CSTR_PERMANENT) \ free(tmp); \ } \ } ``` 首先,被宣告的變數 `var` 為 static variable,擁有完整的程式執行生命週期 (life cycle)。 之後,被宣告的 `cstring` 變數 `tmp` 會接住 `cstr_clone()` 的回傳值,其中 `cstr_clone()` 會複製 `cstr` 字串並對此做 string interning,詳細情形會在 **`cstr.c`** 做說明。 而有關 `__sync_bool_compare_and_swap()` GNU C Extentions 的使用,則為考量到多執行緒的狀況下,因 `cstr_clone()` 執行過程中有機會動態配置記憶體,可能因此造成 memory leak 的問題。 :::info Reference: [GNU C extensions - §5.44 Built-in functions for atomic memory access](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html) ::: **Function-like macro: `CSTR_CLOSE()`** ```c #define CSTR_CLOSE(var) \ do { \ if (!(var)->str->type) \ cstr_release((var)->str); \ } while (0) ``` 被傳進來的參數 `var` 預期的值為指向結構體 `struct __cstr_buffer` 的指標。而 `do{ ... } while (0)` 會強制讓整個迴圈內的行為只留在迴圈內,並且只執行一次。 `cstr_release()` 會釋放掉在 `cstring` 結構內的成員變數 `cstr` 被動態配置的記憶體。 有關 `var->str->type` 的值為 `0` 的狀況以及 `cstr_release()` 的作用會在 `cstr.c` 詳細說明。 #### `cstr.c` **`struct`** ```c struct __cstr_node { char buffer[CSTR_INTERNING_SIZE]; struct __cstr_data str; struct __cstr_node *next; }; ``` * `buffer` - 存成員 `str` 內部的字串值的 buffer。 * `str` - `struct __cstr_data` 型別,內容包含字串值,記憶體存放區域及狀態,hash value or size,以及 reference counting。 * `next` - 指向下一個 `__cstr_node` 結構的指標。 ```c struct __cstr_pool { struct __cstr_node node[INTERNING_POOL_SIZE]; }; ``` * `node` - 為 `__cstr_node` 結構體的陣列,陣列大小為 `INTERNING_POOL_SIZE`。 此結構存在的目的是為了讓 `malloc()` 在動態配置記憶體時,能夠配置出一段連續的記憶體空間。讓在 string interning 的過程中,實現 cache 機制優化效能,也避免了過多次 `malloc()` 造成 [memory fragmentation](https://www.researchgate.net/publication/295010953_Dynamic_Memory_Allocation_and_Fragmentation) 的問題。 ```c struct __cstr_interning { int lock; int index; unsigned size; unsigned total; struct __cstr_node **hash; struct __cstr_pool *pool; }; ``` * `lock` - 考量多執行緒的情況,需要此成員變數來避免存取字串過程中造成錯誤。 * `index` - 存 `pool` 指向的陣列的某個 index。 * `size` - 存目前 hash table 的大小。 * `total` - 存目前被駐留的字串 (interned strings) 的數量。 * `hash` - 為一個指向 `struct __cstr_node` 結構體的指標的指標。在 string interning 過程中會連接 `pool` 裡面的 node 而逐漸成為一個以 `struct __cstr_node` 指標為單位所組成的 hash table。 * `pool` - 為一個指向 `struct __cstr_pool` 結構體的指標,其成員存著一塊連續的記憶體 (`node[INTERNING_POOL_SIZE]`)。而這塊連續的空間之後會在 string interning 的過程中,逐漸成為一個 hash table。 **global variable** ```c static struct __cstr_interning __cstr_ctx; ``` **函式: `xalloc()`** ```c static void *xalloc(size_t n) { void *m = malloc(n); if (!m) exit(-1); return m; } ``` 動態配置記憶體。 `xalloc()` 用 `if` 條件式包裝 `malloc()`,讓 `malloc()` 若在動態配置記憶體失敗回傳 `NULL` 時,直接終止整支程式。 **函式:`insert_node()`** ```c static inline void insert_node(struct __cstr_node **hash, int sz, struct __cstr_node *node) { uint32_t h = node->str.hash_size; int index = h & (sz - 1); node->next = hash[index]; hash[index] = node; } ``` 將 `node` 放進 hash table `hash` 裡面。 其中 `h & (sz - 1)` 可以視為是 `h % sz`,目的是為了確保 hash value 沒有越界存取記憶體,造成 segmentation fault。但是此舉也有可能造成不同字串有著相同的 hash value,發生碰撞。 **函式: `expand()`** ```c static void expand(struct __cstr_interning *si) { unsigned new_size = si->size * 2; if (new_size < HASH_START_SIZE) new_size = HASH_START_SIZE; struct __cstr_node **new_hash = xalloc(sizeof(struct __cstr_node *) * new_size); memset(new_hash, 0, sizeof(struct __cstr_node *) * new_size); for (unsigned i = 0; i < si->size; ++i) { struct __cstr_node *node = si->hash[i]; while (node) { struct __cstr_node *tmp = node->next; insert_node(new_hash, new_size, node); node = tmp; } } free(si->hash); si->hash = new_hash; si->size = new_size; } ``` 增加 hash table size,每次增加一倍。 注意新宣告的 `unsigned` type 的變數 `new_size`,有可能在增加一倍的時候 overflow,所以必須要加上這段程式碼: ```c if (new_size < HASH_START_SIZE) new_size = HASH_START_SIZE; ``` 在成功動態配置一段新的記憶體之後,將舊的 hash table 指向的每個 node 放進這塊新的空間裡面,再將舊的 hash table 佔的空間 `free()` 掉,然後再讓 `si->hash` 指向這段新的空間,成為一個新的 hash table。 **函式: `interning()`** ```c static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) ``` 實現字串駐留 (string interning) 的實作。 ```c if (!si->hash) return NULL; ``` 當實現字串駐留 (string interning) 裡面的 hash table 為空,無法找到 interned strings,回傳 `NULL`。 ```c int index = (int) (hash & (si->size - 1)); struct __cstr_node *n = si->hash[index]; ``` `(int) (hash & (si->size - 1))` 可以視為是 `(int) (hash % si->size)`,此舉確保 hash value 不會超過 hash table 的 index 的最大值,同時確保與函式 `insert_node()` 在實際決定存取 hash table 的 index 值時,有著同樣的規則。 ```c while (n) { if (n->str.hash_size == hash) { if (!strcmp(n->str.cstr, cstr)) return &n->str; } n = n->next; } ``` `while()` 迴圈內的行為是,找到實際真正對應的 interned string。 由於先前在 `insert_node()` 的實作,發現有可能發生 hash value 的碰撞,故而發生碰撞的資料會存成一個 singly linked list 連接在該 hash table 對應的位置。因此需要迴圈內跑完整個 linked list 找出實際真正對應的 interned string。 若成功找到對應的 interned string,則回傳該 interned string 的記憶體參照 (reference)。 ```c // 80% (4/5) threshold if (si->total * 5 >= si->size * 4) return NULL; ``` 若先前沒有成功找到對應的 interned string,且 hash table 內節點密度太高,有可能發生 hash value 碰撞的次數太多(在這裡的情況是,interned string 的節點個數為 hash table 能容納的大小的 80%),有可能因此影響到 hash 的效率,所以不宜再增加 interned string 的節點數,於是回傳 `NULL`。 但是此方法無法保證 hash table 內,interned string 節點皆是散在 hash table 的各處,此判斷方式並不嚴謹。 ```c if (!si->pool) { si->pool = xalloc(sizeof(struct __cstr_pool)); si->index = 0; } n = &si->pool->node[si->index++]; memcpy(n->buffer, cstr, sz); n->buffer[sz] = 0; cstring cs = &n->str; cs->cstr = n->buffer; cs->hash_size = hash; cs->type = CSTR_INTERNING; cs->ref = 0; n->next = si->hash[index]; si->hash[index] = n; return cs; ``` 若先前沒有成功找到對應的 interned string,且 hash table 內的節點的密度沒有太大,則允許 hash table 加入更多的 interned string 節點。 首先確認可被拿來使用的空間 `pool` 是否為空,若為空,則動態配置記憶體並初始化其對應資料 (`si->index`)。之後拿這段空間,以`__cstr_node` 為單位,來當作即將放入 hash table 的節點。 由於先前已經有確認在 interned string 的節點個數為 hash table 大小的 80% 時會回傳 `NULL`,所以 `si->index++` 不會因此越界存取記憶體而造成 segmentation fault。 再加入 interned string 節點至 hash table 的過程中,首先先複製傳進來的字串參數至 buffer (`n->buffer`) 中,結尾要加上 null terminator `'\0'` (`0`),之後再將真正要存 interned string 的字串指標 (`n->str->cstr`) 指向該 buffer (`n->buffer`) 的位置,之後再 interned string 的節點結構的內部成員 `cs->type` 狀態賦值為 `CSTR_INTERNING`,同時因為是新增至 hash table 的節點,所以 reference count `cs->ref` 為 `0`。 最後才真正將該節點放入 hash table 內,讓該 interned string 節點指向 hash table 對應的位置。 **函式: `cstr_interning()`** ```c static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash) { cstring ret; CSTR_LOCK(); ret = interning(&__cstr_ctx, cstr, sz, hash); if (!ret) { expand(&__cstr_ctx); ret = interning(&__cstr_ctx, cstr, sz, hash); } ++__cstr_ctx.total; CSTR_UNLOCK(); return ret; } ``` 函式 `interning()` 的包裝以及,考量多執行緒環境下的字串駐留 (string interning) 實現。 當第一次實行字串駐留 (string interning) `interning()` 失敗,回傳 `NULL`,則呼叫 `expand()` 將 hash table 擴大一倍,再次實行字串駐留 (string interning) 呼叫 `interning()`。 :::danger 假使 total 變數的意義是記錄 interned strings 的個數,那麼這裡 `++__cstr_ctx.total` 便不應該出現在此 `cstr_interning()` 函式中,因為 `interning()` 函式有可能回傳的值是已經被駐留的字串 (interned strings) 在 hash table 裡的記憶體位置參照,而非新增至 hash table 裡面的 interned string 節點。 所以 `++__cstr_ctx.total` 這行 expression 應該要放在 `interning()` 函式中。 ```diff static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash) { ... - ++__cstr_ctx.total; ... } ``` ```diff static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) { ... n->next = si->hash[index]; si->hash[index] = n; + ++__cstr_ctx.total; } ``` ::: **函式: `hash_blob()`** ```c static inline uint32_t hash_blob(const char *buffer, size_t len) { const uint8_t *ptr = (const uint8_t *) buffer; size_t h = len; size_t step = (len >> 5) + 1; for (size_t i = len; i >= step; i -= step) h = h ^ ((h << 5) + (h >> 2) + ptr[i - 1]); return h == 0 ? 1 : h; } ``` 此為打散資料分布至 hash table 的 hash function。 :::info TODO: explain this hash function. ::: **函式: `cstr_clone()`** ```c cstring cstr_clone(const char *cstr, size_t sz) { if (sz < CSTR_INTERNING_SIZE) return cstr_interning(cstr, sz, hash_blob(cstr, sz)); cstring p = xalloc(sizeof(struct __cstr_data) + sz + 1); if (!p) return NULL; void *ptr = (void *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 0; return p; } ``` 傳入要被讀取字串的資料的副本,確認是否符合 interned string 的大小,再決定要不要呼叫 `cstr_interning()`。若傳入的字串沒有做字串駐留 (string interning),則動態配置記憶體,賦予其對應的 `cstring` 型別結構體,再回傳其地址。 :::info 這裡 `cstring p = xalloc(sizeof(struct __cstr_data) + sz + 1);` 可以保證記憶體連續,所以在存取此 `cstring` 型別比起額外 `malloc()` 或者 `strdup()` 動態配置記憶體給 `p->cstr`,可以增加效能。 ::: :::warning 因為已經有 `xalloc()` 包裝 `malloc()`,所以當 `malloc()` 回傳 `NULL` 時,就會直接終止程式,因此就不需要額外再關注 `p` 的值是否為 `NULL`。 ```diff cstring cstr_clone(const char *cstr, size_t sz) { ... cstring p = xalloc(sizeof(struct __cstr_data) + sz + 1); - if (!p) - return NULL; ... return p; } ``` 但是若再多執行緒的情形下是否需要此 `if` 條件,尚待釐清。 ```diff cstring cstr_clone(const char *cstr, size_t sz) { ... - void *ptr = (void *) (p + 1); + char *ptr = (char *) (p + 1); ... } ``` 此處的 `(void *)` 型別應改為 `(char *)`,但是其輸出卻與型別不同沒有什麼差別,且觀察 `p` 後面不管加何種數值,輸出皆為一致。 實驗: **`test.c`** ```c #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> typedef struct __cstr_data { char *cstr; uint32_t hash_size; uint16_t type; uint16_t ref; } * cstring; cstring cstr_clone(const char *cstr, size_t sz) { cstring p = malloc(sizeof(struct __cstr_data) + sz + 1); void *ptr = (void *) (p + 1); p->cstr = ptr; p->type = 1; p->ref = 2; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 3; return p; } cstring cstr_clone2(const char *cstr, size_t sz) { cstring p = malloc(sizeof(struct __cstr_data) + sz + 1); char *ptr = (char *) (p + 9999); p->cstr = ptr; p->type = 1; p->ref = 2; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 3; return p; } int main(int argc, char *argv[]) { char test[] = "hello world!"; int len = strlen(test); cstring p = cstr_clone(test, len); cstring p2 = cstr_clone(test, len); printf("%d %d %d %s\n", p->hash_size, p->ref, p->type, p->cstr); printf("%d %d %d %s\n", p2->hash_size, p2->ref, p2->type, p2->cstr); return 0; } ``` ``` $ gcc test.c $ ./a.out 3 2 1 hello world! 3 2 1 hello world! ``` 尚待釐清原因。 // 推測指向字串的指標可為非連續記憶體 實驗: printf("%p")... ::: **函式: `cstr_grab()`** ```c cstring cstr_grab(cstring s) { if (s->type & (CSTR_PERMANENT | CSTR_INTERNING)) return s; if (s->type == CSTR_ONSTACK) return cstr_clone(s->cstr, s->hash_size); if (s->ref == 0) s->type = CSTR_PERMANENT; else __sync_add_and_fetch(&s->ref, 1); return s; } ``` 此函式為抓取一個 `cstring` 型別,為指向 `struct __cstr_data` 結構體的指標,判斷其內部資料後,對其做相對應的處理。 檢查此結構體裡面的成員 `type`,分為以下的 case: * 若 `type` 是 `CSTR_PERMANENT` 或是 `CSTR_INTERNING`,意義上,為被動態配置記憶體但因字串大小過大而沒有做字串駐留 (string interning) 的字串,或是已經字串駐留的字串 (interned string),則回傳 `s` 指標本身。 * 若 `type` 是 `CSTR_ONSTACK`,則代表 `__cstr_data` 結構體本身是作為一個 buffer 存在,因此呼叫 `cstr_clone()` 並回傳其回傳的 `cstring` 指標。 * 當 `type` 為 `0` 時,且 `ref` (reference count) 為 `0` 時,將 `type` 賦值為 `CSTR_PERMANENT`。 * 當 `type` 為 `0` 時,且 `ref` (reference count) 不為 `0`,則將 `ref` 的值 + 1。 **函式: `cstr_release()`** ```c void cstr_release(cstring s) { if (s->type || !s->ref) return; if (__sync_sub_and_fetch(&s->ref, 1) == 0) free(s); } ``` 此函式會將符合條件的 `cstring` 指標指向的結構體,釋放其被動態配置的記憶體。 符合的條件如下: * 其成員變數 `type` 必須為 `0`。 * 其成員變數 `ref` (reference counting) 被 `-1` 後必須為 `0`。 若 `type` 為 `0` 但 `ref` (reference count) 不為 `0`,則將 `ref` 的值 `-1`。 :::warning 但上述程式碼產生一個問題:若傳進來的 `cstring s` 一開始的 `type` 為 `0` 且 `ref` 為 `0` 時,就會直接 `return`,但是若只觀察在 `quiz2` 的程式碼,直覺上不會發生這種情況,不知此設計基於何種考量?是否為多執行緒上的考量? ::: **函式: `cstr_hash()`** ```c static size_t cstr_hash(cstring s) { if (s->type == CSTR_ONSTACK) return hash_blob(s->cstr, s->hash_size); if (s->hash_size == 0) s->hash_size = hash_blob(s->cstr, strlen(s->cstr)); return s->hash_size; } ``` 對於傳進來 `cstring s`,判斷其內部成員的資料後,決定並回傳 `hash_size` 的值。 當傳進來的 `cstring s` 字串的狀態為 `CSTR_ONSTACK`,代表此結構體的 `hash_size` 的值為字串的長度 (`size`),而非 hash values (`hash`),因此呼叫 `hash_blob()` 計算並回傳其 hash value。 如果 `(hash_size == 0)` 成立,則代表傳進來的字串會是字串大小過大而沒有做字串駐留 (string interning) 的字串結構體。不會是作為 buffer 的結構體是因為 buffer 本身狀態是 `CSTR_ONSTACK`,所以會是上一個條件判斷的 case。,此情形亦會計算並回傳其 hash value。 **函式: `cstr_equal()`** ```c int cstr_equal(cstring a, cstring b) { if (a == b) return 1; if ((a->type == CSTR_INTERNING) && (b->type == CSTR_INTERNING)) return 0; if ((a->type == CSTR_ONSTACK) && (b->type == CSTR_ONSTACK)) { if (a->hash_size != b->hash_size) return 0; return memcmp(a->cstr, b->cstr, a->hash_size) == 0; } uint32_t hasha = cstr_hash(a); uint32_t hashb = cstr_hash(b); if (hasha != hashb) return 0; return !strcmp(a->cstr, b->cstr); } ``` 傳進來參數是 `cstring` 型別的 `a`、`b`,為指向 `struct __cstr_data` 結構體的指標,此函式會判斷兩個指標中,結構體內的字串是否相同。處理方式會分幾個 case,詳述如下: * 若傳進來的 `a`、`b` 指標為同一個指標,則 `return 1` 代表字串內容相同。 * 若傳進來的 `a`、`b` 皆為 interned string,則皆為 hash table 內的節點,因此字串內容不會是一樣的,所以 `return 0`。 * 若傳進來的 `a`、`b` 其結構體內所存的字串皆在 stack 上,`type ` 皆為 `CSTR_ONSTACK`,則兩個結構體內的 `hash_size` 紀錄的會是字串的長度 (`size`),而非 hash values (`hash`),因此若兩者的長度不同,便可以得知兩個字串的內容不相符。若兩者長度相同,則交由 `memcmp()` 逐 byte (`unsigned char`) 檢查兩者是否相同。 * 若傳進來的 `a`、`b` 皆不符合上述的 case,則交由 hash function 比對兩者 hash 值,若 hash 值不相同,則 `return 0`。 * 最後若上述的所有 case 沒有一個是符合的,才交由 `strcmp()` 比對字串並回傳結果。 :::info `memcmp()` vs `strcmp()` // TODO reference: [What is the difference between memcmp, strcmp and strncmp in C? from Stack Overflow](https://stackoverflow.com/questions/13095513/what-is-the-difference-between-memcmp-strcmp-and-strncmp-in-c) [memcmp versus strcmp, why is memcmp faster? from Tek-Tips.COM](https://www.tek-tips.com/viewthread.cfm?qid=1323083) ::: **函式: `cstr_cat2()`** ```c static cstring cstr_cat2(const char *a, const char *b) { size_t sa = strlen(a), sb = strlen(b); if (sa + sb < CSTR_INTERNING_SIZE) { char tmp[CSTR_INTERNING_SIZE]; memcpy(tmp, a, sa); memcpy(tmp + sa, b, sb); tmp[sa + sb] = 0; return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb)); } cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1); if (!p) return NULL; char *ptr = (char *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, a, sa); memcpy(ptr + sa, b, sb); ptr[sa + sb] = 0; p->hash_size = 0; return p; } ``` // duplicate & concat strings, case for heap allocated strings. Also interns the concatenated string after duplicate & concat is done. **函式: `cstr_cat()`** ```c cstring cstr_cat(cstr_buffer sb, const char *str) { cstring s = sb->str; if (s->type == CSTR_ONSTACK) { int i = s->hash_size; while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } s->cstr[i] = 0; } cstring tmp = s; sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } ``` // duplicate & concat strings, case for ONSTACK buffer. ### 對應測試程式 // TODO :::danger 還沒改之前,用 valgrind 測試是否有 memory leak ``` $ valgrind --leak-check=full ./cstr ==45589== Memcheck, a memory error detector ==45589== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al. ==45589== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info ==45589== Command: ./cstr ==45589== equal ==45589== ==45589== HEAP SUMMARY: ==45589== in use at exit: 57,472 bytes in 2 blocks ==45589== total heap usage: 3 allocs, 1 frees, 58,496 bytes allocated ==45589== ==45589== LEAK SUMMARY: ==45589== definitely lost: 0 bytes in 0 blocks ==45589== indirectly lost: 0 bytes in 0 blocks ==45589== possibly lost: 0 bytes in 0 blocks ==45589== still reachable: 57,472 bytes in 2 blocks ==45589== suppressed: 0 bytes in 0 blocks ==45589== Reachable blocks (those to which a pointer was found) are not shown. ==45589== To see them, rerun with: --leak-check=full --show-leak-kinds=all ==45589== ==45589== For lists of detected and suppressed errors, rerun with: -s ==45589== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0) ``` 有 memory leak,要改進. TODO: rerun the valgrind for adding the option --leak-check=full --show-leak-kinds=all -s for tracing the memory leak function. ::: ### 考量多執行緒實現字串駐留 (String Interning) :::info TODO ::: ### 分析效能表現 :::info TODO :::