--- ###### tags: `sysprog2021q1` --- # 2021q1 Homework2 (quiz2) contributed by < `93i7xo2` > > Source: [2021q1 第 2 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz2) ## To-do list - 測驗3 - [ ] Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境 - 測驗4 - [ ] 上述程式碼使用到 gcc [atomics builtins](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html),請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案 - 測驗2 - [ ] 研讀 [slab allocator](https://en.wikipedia.org/wiki/Slab_allocation),探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀 - 共筆 - [ ] [WayneLin1992](https://hackmd.io/@WayneLin1992/SkTJzC3G_) ## 測驗 `1` ### 解析`container_of` ```cpp= #define container_of(ptr, type, member) \ __extension__({ \ const __typeof__(((type *) 0)->member) *__pmember = (ptr); \ (type *) ((char *) __pmember - offsetof(type, member)); \ }) ``` 首先在[你所不知道的C語言:技巧篇](https://hackmd.io/@sysprog/c-trick?type=view)提到`__extension__`是用來防止 gcc 產生警告的修飾字,在這裡用來防止使用到非 ANSI C 標準語句 `({})` (braced-group within expression)時出現警告。 `({})`是 gcc extension 的一項,視多個 statement 組成的 compound statement 為 expression,同時最後一個 expression 的值會視為整體的值。作用是讓 marco 更加安全,例如將會造成 double evaluation 的 `max()` ```cpp #define max(a,b) ((a) > (b) ? (a) : (b)) ``` 改寫成 ```cpp #define max(a,b) \ ({ typeof (a) _a = (a), _b = (b); _a > _b ? _a : _b; }) ``` 再來參考 [How to understand "((size_t) &((TYPE *)0)->MEMBER)"?](https://stackoverflow.com/questions/18554721/how-to-understand-size-t-type-0-member) 的描述,第 3 行可分解為以下步驟 ``` 1. (type *) 0) 2. ((type *) 0)->member) 3. __typeof__(((type *) 0)->member) 4.const __typeof__(((type *) 0)->member) *__pmember = (ptr) ``` 1. 在地址 `0` 上有一型態為 `type` 的結構 2. 取得此結構的成員 `memeber` 3. 取得結構成員 `member` 的型態 4. `__pmember` 是指向 `member` 的 `const pointer`,其值為 `ptr`。可以看到 `0` 並沒有任何作用,可帶入任意值。 第4行 `(type *) ((char *) __pmember - offsetof(type, member));` 用意在取得包覆 `__pmember` 的 `struct` 起始位置,因此需要 `offsetof(type, member)` 來取得 `__pmember` 與該 `struct` 的相對偏移量。 接著仿效 [A simple example with gcc and objdump](https://tuttlem.github.io/2015/01/12/a-simple-example-with-gcc-and-objdump.html) 透過 `objdump` 與 `gcc -g` 觀察編譯後產生的 object file ```bash gcc -c -g -O0 -std=c99 -o ./main.o ./main.c objdump -d -M intel -S main.o ``` ```cpp #include <stdio.h> #include <stdlib.h> #include "main.h" typedef struct _list_head { int value; struct _list_head *prev, *next; } list_head; int main() { list_head *obj = (list_head*) malloc(sizeof(list_head)); list_head **nptr = &(obj->next); list_head *cptr = container_of(nptr, list_head, next); printf("%p\n", cptr); free(obj); return 0; } ``` ```cpp int main() { 0: f3 0f 1e fa endbr64 4: 55 push rbp 5: 48 89 e5 mov rbp,rsp 8: 48 83 ec 20 sub rsp,0x20 list_head *obj = (list_head*) malloc(sizeof(list_head)); c: bf 18 00 00 00 mov edi,0x18 11: e8 00 00 00 00 call 16 <main+0x16> 16: 48 89 45 e0 mov QWORD PTR [rbp-0x20],rax list_head **nptr = &(obj->next); 1a: 48 8b 45 e0 mov rax,QWORD PTR [rbp-0x20] 1e: 48 83 c0 10 add rax,0x10 22: 48 89 45 e8 mov QWORD PTR [rbp-0x18],rax // save rax to [rbp-0x18] list_head *cptr = container_of(nptr, list_head, next); 26: 48 8b 45 e8 mov rax,QWORD PTR [rbp-0x18] // load [rbp-0x18] to rax 2a: 48 89 45 f0 mov QWORD PTR [rbp-0x10],rax // save rax to [rbp-0x10] 2e: 48 8b 45 f0 mov rax,QWORD PTR [rbp-0x10] // load [rbp-0x10] 32: 48 83 e8 10 sub rax,0x10 // rax = rax - 0x10 36: 48 89 45 f8 mov QWORD PTR [rbp-0x8],rax // save rax to [rbp-0x8] ``` 26~2a 行:將 `nptr` 複製到 `[rbp-0x10]` 2e~32 行:使 `[rbp-0x10]` 與 `0x10` 相減,對應 `(type *) ((char *) __pmember - offsetof(type, member));`,而 `0x10` 恰好是 `4 byte integer + 8 byte pointer + 4 byte padding` 的大小。 其中有許多冗餘片段,試著開啟最佳化 `-O1` 再編譯 ``` list_head **nptr = &(obj->next); list_head *cptr = container_of(nptr, list_head, next); printf("%p\n", cptr); free(obj); 2b: 48 89 df mov rdi,rbx 2e: e8 00 00 00 00 call 33 <main+0x33> ``` ### 解析 `list.h` 精簡實作 [Linux 鏈結串列 struct list_head 研究](https://myao0730.blogspot.com/2016/12/linux.html)提到一般應用程式使用 `linux/list.h` 的方法是將要用到的函式複製至code中,而例題與 `linux/list.h` 顯然有些相異之處。 **`list_add_tail`, `list_del`** ```cpp static inline void list_add_tail(struct list_head *node, struct list_head *head) { struct list_head *prev = head->prev; prev->next = node; node->next = head; node->prev = prev; head->prev = node; } static inline void list_del(struct list_head *node) { struct list_head *next = node->next, *prev = node->prev; next->prev = prev; prev->next = next; } ``` 讓新加入的節點在串列末端,從串列前端移除節點,捨棄 `linux/list.h` 包裝 `__list_add`、`__list_del` 的作法,意味整條串列操作起來類似 Queue。 - **`list_empty`, `list_is_singular`** ```cpp static inline int list_empty(const struct list_head *head) { return (head->next == head); } static inline int list_is_singular(const struct list_head *head) { return (!list_empty(head) && head->prev == head->next); } ``` 用以判斷串列是否只有1~2個節點,從兩個函式猜測最前面的節點應是作為維護整條串列使用,本身不具有任何資料。 - head: 維護整條串列的節點 - entry: 實際存放資料串列的節點 ```graphviz digraph example { graph [ rankdir = LR] node [ shape = record ] head [label="{<f0>|head|<f1>}"] entry1 [label="{<f0>|entry1|<f1>}"] entry2 [label="{<f0>|entry2|<f1>}"] blank [label="" shape=none] head-> entry1:f0 entry1:f0-> head:f1 entry1:f1 -> entry2:f0 entry2:f0 -> entry1:f1 entry2:f1 -> blank blank -> entry2:f1 } ``` **`list_splice_tail`** ```cpp= static inline void list_splice_tail(struct list_head *list, struct list_head *head) { struct list_head *head_last = head->prev; struct list_head *list_first = list->next, *list_last = list->prev; if (list_empty(list)) return; head->prev = list_last; list_last->next = head; list_first->prev = head_last; head_last->next = list_first; } ``` 插入非空串列 `list` 底下所有 entry 至 `head` 串列的最後方。 **`list_cut_position`** ```cpp= static inline void list_cut_position(struct list_head *head_to, struct list_head *head_from, struct list_head *node) { struct list_head *head_from_first = head_from->next; if (list_empty(head_from)) return; if (head_from == node) { INIT_LIST_HEAD(head_to); return; } head_from->next = node->next; head_from->next->prev = head_from; head_to->prev = node; node->next = head_to; head_to->next = head_from_first; head_to->next->prev = head_to; } ``` 目的是將從 `head_from->next` 直到 `node` 的部份串列移到未初始化的空串列 `head_to` 之後,成為新的串列,空串列定義為使用 `list_empty` 判斷返回 true。如果與 `linux/list.h` 的 [`list_cut_position`](https://github.com/torvalds/linux/blob/f296bfd5cd04cbb49b8fc9585adc280ab2b58624/include/linux/list.h#L376) 進行比較,86~92行實際上是將原先用來包裝的 `__list_cut_position` 寫進 `list_cut_position` ```cpp static inline void __list_cut_position(struct list_head *list, struct list_head *head, struct list_head *entry) { struct list_head *new_first = entry->next; list->next = head->next; list->next->prev = list; list->prev = entry; entry->next = list; head->next = new_first; new_first->prev = head; } ``` 但缺少下面這段 ```cpp if (list_is_singular(head) && (head->next != entry && head != entry)) return; ``` 上方意思是不對空佇列或元素個數只有為1的情況進行處理。出自 [commit 00e8a](https://github.com/torvalds/linux/commit/00e8a4da8cf0d7dba8cc4b0da28ea0f12dcf6b36) ```cpp=233 * @entry: an entry within head, could be the head itself * and if so we won't cut the list ``` :::warning TODO: 對照閱讀 Linux 核心的 git log/blame,查閱程式碼註解和相關的編修紀錄。程式碼給你,就是希望來日你可以做出貢獻。 :notes: jserv ::: :::warning - `git blame include/linux/lish.h` 在 commit [00e8a](https://github.com/torvalds/linux/commit/00e8a4da8cf0d7dba8cc4b0da28ea0f12dcf6b36) 找到 `list_cut_position` 和 `__list_cut_position` 被新增進來,之後沒有任何編輯紀錄。 ``` list.h: add list_cut_position() This adds list_cut_position() which lets you cut a list into two lists given a pivot in the list. Signed-off-by: Luis R. Rodriguez <lrodriguez@atheros.com> Signed-off-by: John W. Linville <linville@tuxdriver.com> ``` ::: ### 解題思路 **`get_middle`** ```cpp= static list_ele_t *get_middle(struct list_head *list) { struct list_head *fast = list->next, *slow; list_for_each (slow, list) { if (fast->next == list || fast->next->next == list) break; fast = fast->next->next; } return list_entry(slow, list_ele_t, list); } ``` 先將巨集 `list_for_each (slow, list)` 展開,看到這段巨集幫我們以 `list->(next)` 初始化 `slow`: ```cpp for (slow = (list)->next; slow != (list); slow = slow->next) ``` 接下來只要定義迴圈的終止條件,返回 `slow` 的容器即可。 **`list_merge_sort`** ```cpp= void list_merge_sort(queue_t *q) { if (list_is_singular(&q->list)) return; queue_t left; struct list_head sorted; INIT_LIST_HEAD(&left.list); list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list); list_merge_sort(&left); list_merge_sort(q); list_merge(&left.list, &q->list, &sorted); INIT_LIST_HEAD(&q->list); list_splice_tail(&sorted, &q->list); } ``` 搭配 `queue_t` 一起看 ```cpp typedef struct { list_ele_t *head; // 應是串列頭 list_ele_t *tail; // 串列的尾 size_t size; // 實作O(1)查找串列大小? struct list_head list; // head } queue_t; ``` 考慮到 `list_cut_position` 最後一個參數接受指向 `list_head` 的指標,必須先取得`get_middle` 返回容器的 `list` 成員再傳入,而所有新宣告的 `list_head` 都透過 `INIT_LIST_HEAD` 進行初始化。 **`list_merge`** ```cpp static void list_merge(struct list_head *lhs, struct list_head *rhs, struct list_head *head) { INIT_LIST_HEAD(head); if (list_empty(lhs)) { list_splice_tail(lhs, head); return; } if (list_empty(rhs)) { list_splice_tail(rhs, head); return; } while (!list_empty(lhs) && !list_empty(rhs)) { char *lv = list_entry(lhs->next, list_ele_t, list)->value; char *rv = list_entry(rhs->next, list_ele_t, list)->value; struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next; list_del(tmp); list_add_tail(tmp, head); } list_splice_tail(list_empty(lhs) ? rhs : lhs, head); } ``` :::success 延伸問題: - [ ] 研讀 Linux 核心的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 原始程式碼,學習 [sysprog21/linux-list](https://github.com/sysprog21/linux-list) 的手法,將 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 的實作抽離為可單獨執行 (standalone) 的使用層級應用程式,並設計效能評比的程式碼,說明 Linux 核心的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 最佳化手法 - [ ] 將你在 [quiz1](https://hackmd.io/@sysprog/linux2021-quiz1) 提出的改進實作,和 Linux 核心的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 進行效能比較,需要提出涵蓋足夠廣泛的 benchmark ::: --- ## 測驗`2` ### 解題思路 ```cpp uint16_t N = 0x8000 // 1000 0000 0000 0000 N |= N >> 1; // 1100 0000 0000 0000 N |= N >> 2; // 1111 0000 0000 0000 N |= N >> 4; // 1111 1111 0000 0000 N |= N >> 8; // 1111 1111 1111 1111 ``` 假設 `N` 為 `0x8000`,上述程式碼的作用在於將 MSB 以倍數的方式複製至後面每一個位元,至於 MSB 後面是否有非 0 bit 並不重要;若 `N` 的首個非 0 bit 不在 MSB 位置上,可能提早完成傳遞。作用同註解所說:將右邊所有位元設 1。 > [toastcheng](https://hackmd.io/@toastcheng/w2-quiz2) 發現 `uint16_t` 因為 integer promotion 的關係實際 Bitwise shift 運算元是擴展成 `int` 處理,遇到 `uint16_t` 最大值 65535 能正確處理,但遇到 `uint32_t` 則有 overflow 的風險,同時提出以 `__builtin_clz` 實作出相容於 `uint32_t` 的版本。 :::success 延伸問題: - [ ] 在 [The Linux Kernel API](https://www.kernel.org/doc/html/latest/core-api/kernel-api.html) 頁面搜尋 "power of 2",可見 `is_power_of_2`,查閱 Linux 核心原始程式碼,舉例說明其作用和考量,特別是 round up/down power of 2 * 特別留意 `__roundup_pow_of_two` 和 `__rounddown_pow_of_two` 的實作機制 - [ ] 研讀 [slab allocator](https://en.wikipedia.org/wiki/Slab_allocation),探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀 ::: --- ## 測驗`3` ```cpp= void bitcpy(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 7; size_t read_rhs = 8 - read_lhs; const uint8_t *source = (const uint8_t *) _src + (_read / 8); size_t write_lhs = _write & 7; size_t write_rhs = 8 - write_lhs; uint8_t *dest = (uint8_t *) _dest + (_write / 8); static const uint8_t read_mask[] = { 0x00, /* == 0 00000000b */ 0x80, /* == 1 10000000b */ 0xC0, /* == 2 11000000b */ 0xE0, /* == 3 11100000b */ 0xF0, /* == 4 11110000b */ 0xF8, /* == 5 11111000b */ 0xFC, /* == 6 11111100b */ 0xFE, /* == 7 11111110b */ 0xFF /* == 8 11111111b */ }; static const uint8_t write_mask[] = { 0xFF, /* == 0 11111111b */ 0x7F, /* == 1 01111111b */ 0x3F, /* == 2 00111111b */ 0x1F, /* == 3 00011111b */ 0x0F, /* == 4 00001111b */ 0x07, /* == 5 00000111b */ 0x03, /* == 6 00000011b */ 0x01, /* == 7 00000001b */ 0x00 /* == 8 00000000b */ }; while (count > 0) { uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (*source >> read_rhs); } if (bitsize < 8) data &= read_mask[bitsize]; uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } count -= bitsize; } } ``` ### 解題思路 以從 `read_rhs` 起始位置讀取 1 位元組的資料為例 - `bitsize=8` - `read_lhs=2` - `read_rhs=6` - `write_lhs=5` - `write_rhs=3` (Round 1) Read ```graphviz digraph { graph [rankdir = TB] node [shape=plaintext, fontsize=18]; "" -> "bit" -> "byte"->"lhs/rhs" [fontcolor=black, color=white]; node [shape=none, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; byte0 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#99004c">7</td> <td port="port1" bgcolor="#99004c">6</td> <td port="port2" bgcolor="#ffcce5"><font color="red">5</font></td> <td port="port3" bgcolor="#ffcce5"><font color="red">4</font></td> <td port="port4" bgcolor="#ffcce5"><font color="red">3</font></td> <td port="port5" bgcolor="#ffcce5"><font color="red">2</font></td> <td port="port6" bgcolor="#ffcce5"><font color="red">1</font></td> <td port="port7" bgcolor="#ffcce5"><font color="red">0</font></td> </tr> </table>> ]; byte1 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#ffcce5"><font color="red">7</font></td> <td port="port1" bgcolor="#ffcce5"><font color="red">6</font></td> <td port="port2">5</td> <td port="port3">4</td> <td port="port4">3</td> <td port="port5">2</td> <td port="port6">1</td> <td port="port7">0</td> </tr> </table>> ]; byte2 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0">7</td> <td port="port1">6</td> <td port="port2">5</td> <td port="port3">4</td> <td port="port4">3</td> <td port="port5">2</td> <td port="port6">1</td> <td port="port7">0</td> </tr> </table>> ]; t0 [shape=record label="<head> 0 | 1 | <s>2 | 3 | 4 | 5 | 6 | 7", color="white", fontcolor="black"]; t1 [shape=record label="<head> 8 | 9 | <s>10 | 11 | 12 | 13 | 14 | 15", color="white", fontcolor="black"]; t2 [shape=record label="<head> 16 | 17 | <s>18 | 19 | 20 | 21 | 22 | 23", color="white", fontcolor="black"]; "bitsize" -> byte0:port2 [color=black]; "bitsize" -> byte1:port1 [color=black]; "bitsize" -> byte0:port6 [color=none]; // magic! t0:head -> byte0:port0 [color=none]; t1:head -> byte1:port0 [color=none]; t2:head -> byte2:port0 [color=none]; read_lhs [shape=record label="read_lhs", color="white", fontcolor="black"]; "read_lhs" -> t0:s [color=black arrowhead=none]; { rank=same; "bit"; byte0; byte1; byte2 } { rank=same; "byte"; t0; t1; t2; } { rank=same; "lhs/rhs"; read_lhs;} } ``` (Round 1) Mask data ```graphviz digraph { graph [rankdir = TB] node [shape=plaintext, fontsize=18]; " data" -> " mask" -> "masked data" [fontcolor=black, color=white]; node [shape=none, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; byte0 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="12"> <tr> <td bgcolor="#ffcce5"><font color="red">5</font></td> <td bgcolor="#ffcce5"><font color="red">4</font></td> <td bgcolor="#ffcce5"><font color="red">3</font></td> <td bgcolor="#ffcce5"><font color="red">2</font></td> <td bgcolor="#ffcce5"><font color="red">1</font></td> <td bgcolor="#ffcce5"><font color="red">0</font></td> <td bgcolor="#ffcce5"><font color="red">7</font></td> <td bgcolor="#ffcce5"><font color="red">6</font></td> </tr> </table>> ]; mask [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="12"> <tr> <td>1</td> <td>1</td> <td>1</td> <td>1</td> <td>1</td> <td>1</td> <td>1</td> <td>1</td> </tr> </table>> ]; byte1 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="12"> <tr> <td bgcolor="#009900"><font color="white">5</font></td> <td bgcolor="#009900"><font color="white">4</font></td> <td bgcolor="#009900"><font color="white">3</font></td> <td bgcolor="#009900"><font color="white">2</font></td> <td bgcolor="#009900"><font color="white">1</font></td> <td bgcolor="#009900"><font color="white">0</font></td> <td bgcolor="#009900"><font color="white">7</font></td> <td bgcolor="#009900"><font color="white">6</font></td> </tr> </table>> ]; { rank=same; " data"; byte0; } { rank=same; " mask"; mask; } { rank=same; "masked data"; byte1; } } ``` (Round 1) Write ```graphviz digraph { graph [rankdir = TB] node [shape=plaintext, fontsize=18]; "" -> "bit" -> "byte"->"lhs/rhs" [fontcolor=black, color=white]; node [shape=none, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; byte0 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#004c99">7</td> <td port="port1" bgcolor="#004c99">6</td> <td port="port2" bgcolor="#004c99">5</td> <td port="port3" bgcolor="#004c99">4</td> <td port="port4" bgcolor="#004c99">3</td> <td port="port5" bgcolor="#009900"><font color="white">5</font></td> <td port="port6" bgcolor="#009900"><font color="white">4</font></td> <td port="port7" bgcolor="#009900"><font color="white">3</font></td> </tr> </table>> ]; byte1 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#009900"><font color="white">2</font></td> <td port="port1" bgcolor="#009900"><font color="white">1</font></td> <td port="port2" bgcolor="#009900"><font color="white">0</font></td> <td port="port3" bgcolor="#009900"><font color="white">7</font></td> <td port="port4" bgcolor="#009900"><font color="white">6</font></td> <td port="port5">2</td> <td port="port6">1</td> <td port="port7">0</td> </tr> </table>> ]; byte2 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0">7</td> <td port="port1">6</td> <td port="port2">5</td> <td port="port3">4</td> <td port="port4">3</td> <td port="port5">2</td> <td port="port6">1</td> <td port="port7">0</td> </tr> </table>> ]; t0 [shape=record label="<head> 0 | 1 | 2 | 3 | 4 | <s> 5 | 6 | 7", color="white", fontcolor="black"]; t1 [shape=record label="<head> 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15", color="white", fontcolor="black"]; t2 [shape=record label="<head> 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23", color="white", fontcolor="black"]; "bitsize" -> byte0:port5 [color=black]; "bitsize" -> byte1:port4 [color=black]; "bitsize" -> byte1:port0 [color=none]; // magic! t0:head -> byte0:port0 [color=none]; t1:head -> byte1:port0 [color=none]; t2:head -> byte2:port0 [color=none]; write_lhs [shape=record label="write_lhs", color="white", fontcolor="black"]; "write_lhs" -> t0:s [color=black arrowhead=none]; { rank=same; "bit"; byte0; byte1; byte2 } { rank=same; "byte"; t0; t1; t2; } { rank=same; "lhs/rhs"; write_lhs;} } ``` (Round 2) Read ```graphviz digraph { graph [rankdir = TB] node [shape=plaintext, fontsize=18]; "" -> "bit" -> "byte"->"lhs/rhs" [fontcolor=black, color=white]; node [shape=none, fontcolor=black, fontsize=14, width=4.75, fixedsize=true]; byte0 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#99004c">7</td> <td port="port1" bgcolor="#99004c">6</td> <td port="port2" bgcolor="#99004c">5</td> <td port="port3" bgcolor="#99004c">4</td> <td port="port4" bgcolor="#99004c">3</td> <td port="port5" bgcolor="#99004c">2</td> <td port="port6" bgcolor="#99004c">1</td> <td port="port7" bgcolor="#99004c">0</td> </tr> </table>> ]; byte1 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#99004c">7</td> <td port="port1" bgcolor="#99004c">6</td> <td port="port2" bgcolor="#ffcce5"><font color="red">5</font></td> <td port="port3" bgcolor="#ffcce5"><font color="red">4</font></td> <td port="port4" bgcolor="#ffcce5"><font color="red">3</font></td> <td port="port5" bgcolor="#ffcce5"><font color="red">2</font></td> <td port="port6" bgcolor="#ffcce5"><font color="red">1</font></td> <td port="port7" bgcolor="#ffcce5"><font color="red">0</font></td> </tr> </table>> ]; byte2 [ shape=none label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="17"> <tr> <td port="port0" bgcolor="#ffcce5"><font color="red">7</font></td> <td port="port1" bgcolor="#ffcce5"><font color="red">6</font></td> <td port="port2">5</td> <td port="port3">4</td> <td port="port4">3</td> <td port="port5">2</td> <td port="port6">1</td> <td port="port7">0</td> </tr> </table>> ]; t0 [shape=record label="<head> 0 | 1 | <s>2 | 3 | 4 | 5 | 6 | 7", color="white", fontcolor="black"]; t1 [shape=record label="<head> 8 | 9 | <s>10 | 11 | 12 | 13 | 14 | 15", color="white", fontcolor="black"]; t2 [shape=record label="<head> 16 | 17 | <s>18 | 19 | 20 | 21 | 22 | 23", color="white", fontcolor="black"]; "bitsize" -> byte1:port2 [color=black]; "bitsize" -> byte2:port1 [color=black]; "bitsize" -> byte1:port6 [color=none]; // magic! t0:head -> byte0:port0 [color=none]; t1:head -> byte1:port0 [color=none]; t2:head -> byte2:port0 [color=none]; read_lhs [shape=record label="read_lhs", color="white", fontcolor="black"]; "read_lhs" -> t1:s [color=black arrowhead=none]; { rank=same; "bit"; byte0; byte1; byte2 } { rank=same; "byte"; t0; t1; t2; } { rank=same; "lhs/rhs"; read_lhs;} } ``` --- 考慮變數帶有以下後綴的意義 - `_lhs`:表示寫入/讀取位元從起始位置,從 MSB 開始 - `_rhs`:表示從 `_lhs` 開始最多能寫入/讀取的位元數。例如 `_lhs=2, _rhs=6` 表示最多能寫入/讀取 6 bits。 設計上,`bitcpy()` 儘可能的以1位元組作為寫入/讀取單位以節省操作次數 ```cpp size_t bitsize = (count > 8) ? 8 : count; ``` 而在讀取/寫入指定的 count 位元數前,每一次分為讀取與寫入兩步驟,先看寫入: ```cpp uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; // 從 source 讀取的資料往左位移 `read_lhs`,取得所需位元 if (read_lhs > 0) { data <<= read_lhs; // 在位移後,有空間塞入其他資料 // 這時判斷 bitsize 是否超過能讀取的位元數 read_rhs ,需跨到另一個位元組 // 如果是,很貪婪的一次拿 (8-read_rhs) 位元 if (bitsize > read_rhs) data |= (*source >> read_rhs); } // 將 bitsize 後無效位元寫0,欲與其他資料進行 OR 運算結合 if (bitsize < 8) data &= read_mask[bitsize]; ``` 我們取得 `0xBEEF0000` 類型向左對齊的資料後再看到寫入,分做兩種情況: 1. `bitsize <= write_rhs`: 不需跨兩位元組寫入 2. `bitsize > write_rhs`: 需跨兩位元組寫入 第 1 種情況,不需跨兩位元組寫入,只需在一位元組內將舊資料 `original` 與新資料 `data` 結合 ```cpp=60 *dest++ = (original & mask) | (data >> write_lhs); ``` `mask` 是中間挖空的遮罩,預期組成為 ``` 1.......1 | 0.....0 | 1.................1 write_lhs bitsize 8-bitsize-write_lhs ``` 考慮 `bitsize+write_lhs` 的各種情況: 8-bitsize-write_lhs | bitsize+write_lhs | expected mask -|-|- 8|0|0b11111111 (invalid, `bitsize` 不能為0) 7|1|0b01111111 6|2|0b00111111 5|3|0b00011111 4|4|0b00001111 3|5|0b00000111 2|6|0b00000011 1|7|0b00000001 0|8|0b00000000 遮罩 `mask` 前面的部份由 `read_mask[write_lhs]`,後者恰好由 `write_mask[write_lhs + bitsize]` 所對應,因此無跨位元寫入程式碼為: ```cpp=51 uint8_t mask = read_mask[write_lhs]; ``` ```cpp=59 mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); ``` 第2種情況,跨位元組寫入,情況稍微複雜,但判斷機制與讀取一樣: ```cpp bitsize > write_rhs ``` 以圖示說明兩次寫入的 BYTE 1, BYTE 2 的預期遮罩 `mask1`, `mask2` ``` ⌜ bitsize ⌝ 1.......1 | 0........0 | 0.................0 | 1...................1 | write_lhs writh_rhs | bitsize-writh_rhs 8-(bitsize-writh_rhs) | BYTE 1 | BYTE 2 ``` 同樣考慮 `bitsize-writh_rhs` 的各種情況,恰好對應到表 `write_mask`: 8-(bitsize-writh_rhs) | bitsize-write_rhs | expected mask -|-|- 8|0|0b11111111 (invalid, $bitsize-write\_rhs> 0$) 7|1|0b01111111 6|2|0b00111111 5|3|0b00011111 4|4|0b00001111 3|5|0b00000111 2|6|0b00000011 1|7|0b00000001 0|8|0b00000000 (invalid, $bitsize\leq 8$) 因此 - `mask1 = read_mask[write_lhs]` - `mask2 = write_mask[bitsize - write_rhs]` 跨位元寫入程式碼為所對應的程式碼為: ```cpp=51 uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); ``` 總結 `read_mask` 與 `write_mask` 的意義: - `read_mask`:用來過濾真實的資料。在寫入也會使用到,因為寫入時也是需要將資料讀出來進行合併。 - `write_mask`:在指定位元組上謄出空間給資料寫入。 雖然能描述上述程式碼,但不認為能在短時間寫出。 ### 改良 `bitcpy` 使用 `perf -F max` 以最大取樣頻率分析函式內部熱點,而為了採樣更多的數據分析,放大原本的`input, output, i, j, k` 為32倍,此時 `perf.data` 來到100M。Intel 和 AT&T 語法尚有[些微差異](http://web.mit.edu/rhel-doc/3/rhel-as-en-3/i386-syntax.html),`perf report` 時使用 `-M intel` 更好閱讀。 ```bash sudo perf record -F max ./main sudo perf report -M intel ``` 發現函式內部熱點大多集中在 `mov`,而 `mov` 的 register operand 最大可為 64bit,既然如此,實作 64-bit 的 `bitcpy` 對效能應有所改善。 ``` 1.09 │ lea rcx,[rip+0xd6e] 1.71 │ movzx eax,BYTE PTR [rax+rcx*1] 0.78 │ and eax,edx 5.07 │ mov BYTE PTR [rbp-0x3a],al │ *dest = original | (data << write_rhs); 0.27 │ movzx eax,BYTE PTR [rbp-0x3b] 0.07 │ mov rdx,QWORD PTR [rbp-0x10] 0.00 │ mov ecx,edx 3.19 │ shl eax,cl 0.02 │ mov edx,eax 3.13 │ movzx eax,BYTE PTR [rbp-0x3a] 2.34 │ or eax,edx 0.07 │ mov edx,eax 0.00 │ mov rax,QWORD PTR [rbp-0x30] 6.00 │ mov BYTE PTR [rax],dl 0.10 │ ↓ jmp 1ca ``` 以下為 64-bit 版的 `bitcpy` ```cpp= #define __ALIGN_KERNEL(x, a) __ALIGN_KERNEL_MASK(x, (__typeof__(x))(a)-1) #define __ALIGN_KERNEL_MASK(x, mask) (((x) + (mask)) & ~(mask)) #define ALIGN_DOWN(x, a) __ALIGN_KERNEL((x) - ((a)-1), (a)) #define reverse_byte(n) \ __extension__({ \ __typeof__(n + 0) _n = (n); \ _n = ((_n & 0xffffffff00000000) >> 32) | ((_n & 0x00000000ffffffff) << 32); \ _n = ((_n & 0xffff0000ffff0000) >> 16) | ((_n & 0x0000ffff0000ffff) << 16); \ _n = ((_n & 0xff00ff00ff00ff00) >> 8) | ((_n & 0x00ff00ff00ff00ff) << 8); \ }) void bitcpy64(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 63; size_t read_rhs = 64 - read_lhs; const uint64_t *source = (const uint64_t *)_src + (_read / 64); size_t write_lhs = _write & 63; size_t write_rhs = 64 - write_lhs; uint64_t *dest = (uint64_t *)_dest + (_write / 64); while (count > 0) { /* Downgrade 64-bit version to 8-bit version */ if (count < 64) { size_t _read_lhs = ALIGN_DOWN(read_lhs, 8); size_t _write_lhs = ALIGN_DOWN(write_lhs, 8); bitcpy(((uint8_t *)dest) + _write_lhs / 8, write_lhs - _write_lhs, ((uint8_t *)source) + _read_lhs / 8, read_lhs - _read_lhs, count); return; } uint64_t data = reverse_byte(*source++); size_t bitsize = (count > 64) ? 64 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (reverse_byte(*source) >> read_rhs); } if (bitsize < 64) data &= (int64_t)(1ULL << 63) >> (bitsize - 1); uint64_t original = reverse_byte(*dest); uint64_t mask = write_lhs ? ((int64_t)(1ULL << 63) >> (write_lhs - 1)) : 0; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = reverse_byte((original & mask) | (data >> write_lhs)); original = reverse_byte(*dest) & ~((int64_t)(1ULL << 63) >> (bitsize - write_rhs - 1)); *dest = reverse_byte(original | (data << write_rhs)); } else { // Since write_lhs + bitsize is never >= 64, no out-of-bound access. mask |= ~((int64_t)(1ULL << 63) >> (write_lhs + bitsize - 1)); *dest++ = reverse_byte((original & mask) | (data >> write_lhs)); } count -= bitsize; } } ``` 上面程式碼假定使用者會在呼叫函式前判斷寫入/讀取處理是否越界。 64位元版本的 `bitcpy` 與原先版本差在 buffer 從 8-bit 變化至 64-bit,這裡用 32-bit buffer 進行說明,於原先的記憶體配置下得到的資料會是: ``` 0xAA 0xBB 0xCC 0xDD bits 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 Byte0 Byte1 input 0 1 2 3 4 5 6 7 8 9 ... || \/ uint32_t data = *source++; 0xDD 0xCC 0xBB 0xAA 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 bits 32 31 30 ... ..... 2 1 0 H L ``` 原先 `<<, >>, |, &` 之類的操作因為 Little Endian byte ordering 的關係無法使用,因此讀取需要用 `reverse_byte` 將位元順序翻轉再進行操作,寫入時再翻轉回來,有點像是線性轉換/反轉換。 - `+0` 的[技巧](https://stackoverflow.com/questions/18063373/is-it-possible-to-un-const-typeof-in-gcc-pure-c)是為了去掉 const qualifier。 ```cpp=7 __typeof__(n + 0) _n = (n); ``` - 一開始擔心 `&` 在一邊是 `unsigned` 另一邊是 `signed` 型態轉換上會有問題,在[規格書](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf)看到會先做 Usual arithmetic conversions,往下找到對應解釋 > Otherwise, if the operand that has unsigned integer type has rank greater or equal to the rank of the type of the other operand, then the operand with signed integer type is converted to the type of the operand with unsigned integer type. 放心使用 ```cpp=51 data &= (int64_t)(1ULL << 63) >> (bitsize - 1); ``` :::warning 幫你補充,根據 [C99 standard](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1256.pdf) 6.5.7.5 >The result of E1 >> E2 is E1 right-shifted E2 bit positions. If E1 has an unsigned type or if E1 has a signed type and a nonnegative value, the value of the result is the integral part of the quotient of E1 / 2E2. If E1 has a signed type and a negative value, the resulting value is **implementation-defined**. 因此 [gcc documentation](https://gcc.gnu.org/onlinedocs/gcc/Integers-implementation.html) 有明確定義: >Bitwise operators act on the representation of the value including both the sign and value bits, where the sign bit is considered immediately above the highest-value value bit. **Signed ‘>>’ acts on negative numbers by sign extension.** 當然不能說你錯,但建議可以改寫避免此用法。 >[name=BakudYao] ::: :::info 參考[ bakudr18 共筆](https://hackmd.io/@MR-9Qa9wQLWglSAUyd6UhQ/)改寫為可讀性較高的用法: ```cpp #define READMASK(x) ((uint64_t)(~0ULL) << (64 - (x))) #define WRITEMASK(x) (~READ_MASK(x)) ``` 然而 `READ_MASK(0)` 有 undefined behavior 不得已加上分支:cry: ```cpp #define READMASK(x) (__builtin_expect(!!(x), 1) ? ((uint64_t)(~0ULL) << (64 - (x))) : 0ULL) #define WRITEMASK(x) (~READMASK(x)) ``` ::: - 當 `count<64` 使用原先版本進行複製,地址、位移需要重新計算,借用到 [kernel.h](https://elixir.bootlin.com/linux/latest/source/include/linux/kernel.h#L35) 的 `ALIGN_DOWN` ```cpp=28 /* Downgrade 64-bit version to 8-bit version */ if (count < 64) { size_t _read_lhs = ALIGN_DOWN(read_lhs, 8); size_t _write_lhs = ALIGN_DOWN(write_lhs, 8); bitcpy(((uint8_t *)dest) + _write_lhs / 8, write_lhs - _write_lhs, ((uint8_t *)source) + _read_lhs / 8, read_lhs - _read_lhs, count); return; } ``` 觀察複製 0~4kB 的效能差距,每次測試讀取/寫入偏移量涵蓋 0~63,64-bit 版約省下一半以上的時間: ![](https://i.imgur.com/W2YAWUL.png) [code](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz2/bitcpy) ::: info 看起來很有機會加入減少 branch 的作法獲得不錯的效能提升!也許你可以試試,細節可參考 [bakudr18 共筆](https://hackmd.io/@MR-9Qa9wQLWglSAUyd6UhQ/SkS-Y_lX_#%E6%94%B9%E5%96%84-bitcpy-%E6%95%88%E8%83%BD)。 >[name=BakudYao] ::: :::info 一開始有想過減少分支數量,但 Mispredicted branch 比重太低,而且編譯器 `-O3` 開下去有時效果更好,會感覺又在做白工。 ```bash gcc -g -O0 -o main main.c sudo perf record -b ./main sudo perf report --sort mispredict --symbols bitcpy,bitcpy64,bitcpy64_likely ``` ``` Samples: 12M of event 'cycles', Event count (approx.): 12608064 Overhead Source Symbol Target Symbol Branch Mispredicted 0.01% [.] main [.] main Y 0.01% [.] __memset_avx2_unaligned_erms [.] __memset_avx2_unaligned_erms Y 0.01% [.] bitcpy64_likely [.] bitcpy64_likely Y 0.00% [.] bitcpy64 [.] bitcpy64 Y 0.00% [.] prepare_exit_to_usermode [.] swapgs_restore_regs_and_return_to_usermode N 0.00% [.] swapgs_restore_regs_and_return_to_usermode [.] swapgs_restore_regs_and_return_to_usermode N 0.00% [.] swapgs_restore_regs_and_return_to_usermode [.] native_iret N 0.00% [.] note_gp_changes [.] rcu_core N 0.00% [.] native_irq_return_iret [.] bitcpy64_likely N 0.00% [.] native_irq_return_iret [.] bitcpy64 N 0.00% [.] bitcpy64 [.] irq_entries_start N 0.00% [.] __memset_avx2_erms [.] main Y 0.00% [.] irq_exit [.] smp_apic_timer_interrupt Y ``` > `bitcpy64_likely` 偏好跨 8 位元組 > ```cpp > uint64_t original = reverse_byte(*dest); > uint64_t mask = READ_MASK(write_lhs); > if (__builtin_expect(!!(bitsize > write_rhs), 1)) > ``` 雖然如此,還是仿照 [bakudr18/quiz2](https://github.com/bakudr18/quiz2) 實作消除 branch-prediction 的版本 ```cpp= #define READMASK(x) (__builtin_expect(!!(x), 1) ? ((uint64_t)(~0ULL) << (64 - (x))) : 0ULL) #define WRITEMASK(x) (~READMASK(x)) void bitcpy64_branch_predict(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 63; size_t read_rhs = 64 - read_lhs; const uint64_t *source = (const uint64_t *)_src + (_read / 64); size_t write_lhs = _write & 63; size_t write_rhs = 64 - write_lhs; uint64_t *dest = (uint64_t *)_dest + (_write / 64); uint64_t data, original; /* copy until count < 64 bits */ for (size_t bytecount = count >> 6; bytecount > 0; bytecount--) { data = reverse_byte(*source++); data = data << read_lhs | (reverse_byte(*source) >> read_rhs); original = reverse_byte(*dest) & READMASK(write_lhs); *dest++ = reverse_byte(original | (data >> write_lhs)); *dest = reverse_byte((reverse_byte(*dest) >> write_lhs) | (data << write_rhs)); } count &= 63; /* copy the remaining count */ data = reverse_byte(*source++); data = ((data << read_lhs) | (reverse_byte(*source) >> read_rhs)) & READMASK(count); original = (reverse_byte(*dest) & READMASK(write_lhs)); *dest++ = reverse_byte(original | ((data & READMASK(count)) >> write_lhs)); if (count > write_rhs) *dest = reverse_byte((reverse_byte(*dest) & WRITEMASK(count - write_rhs)) | (data << write_rhs)); } ``` 測量 0~16 kbits 執行100次的效能 - `-O0` ![](https://i.imgur.com/zbo7t96.png) - `-O3` ![](https://i.imgur.com/GiHS6Po.png) 發現消弭分支帶來的效益不大。其實以 `-O0` 重現 [bakudr18/quiz2](https://github.com/bakudr18/quiz2) 的實驗也與原本實驗結果有落差,`branch misses` 並效能差距的主要原因。 ![](https://i.imgur.com/YJVnquV.png) ```bash sudo perf record -b ./bench_bitcpy sudo perf report --sort \ symbol_from,symbol_to,mispredict --symbols bitcpy,bitcpy_branch_predict ``` ``` Samples: 15K of event 'cycles', Event count (approx.): 15808 Overhead Source Symbol Target Symbol Branch Mispredicted IPC [IPC Coverage] 29.07% [.] bitcpy [.] bitcpy N 2.06 [ 90.8%] 19.46% [.] bitcpy_branch_predict [.] bitcpy_branch_predict N 1.02 [ 96.4%] 0.03% [.] bitcpy [.] bitcpy Y 2.06 [ 90.8%] <===== ``` ::: :::success 延伸問題: - [ ] 在 Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境 ::: --- ## 測驗`4` :::success 延伸問題: - [x] 解釋上述程式碼運作原理 - [ ] 上述程式碼使用到 gcc [atomics builtins](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html),請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案 - [x] [chriso/intern](https://github.com/chriso/intern) 是個更好的 [string interning](https://en.wikipedia.org/wiki/String_interning) 實作,請探討其特性和設計技巧,並透過內建的 benchmark 分析其效能表現 ::: ### 1. `cstr` 資料結構 - `__cstr_data` ```cpp enum { CSTR_PERMANENT = 1, CSTR_INTERNING = 2, CSTR_ONSTACK = 4, }; typedef struct __cstr_data { char *cstr; uint32_t hash_size; uint16_t type; uint16_t ref; } * cstring; ``` - `cstr`: 字串。 - `type`: 表示字串 `cstr` 所處位置,有 HEAP 與 STACK (`CSTR_ONSTACK`)。 - `hash_size`: 紀錄字串長度,之後的 hash table 使用此值計算 hash,應改用 `length` 為名較為合適。`hash_size = 0` 為特殊狀況(??)。 - `ref`: 字串位於 heap 時,用來紀錄 refcount,從非 0 轉變成 0 時需要要釋放記憶體。 - `__cstr_buffer` ```cpp typedef struct __cstr_buffer { cstring str; } cstr_buffer[1]; #define CSTR_BUFFER(var) \ char var##_cstring[CSTR_STACK_SIZE] = {0}; \ struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \ cstr_buffer var; \ var->str = &var##_cstr_data; // cstr_buffer sb => __cstr_buffer *sb cstring cstr_cat(cstr_buffer sb, const char *str){ cstring s = sb->str; ... sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } ``` `cstr_buffer var` 經由 `CSTR_BUFFER(var)` 創立後,透過對以下操作對 `__cstr_buffer` 內的 `str` 進行修改 ```cpp CSTR_BUFFER(var); var[0].str = NULL; var->str = NULL; ``` 或是透過 `cstr_cat()`。由於轉型成指標的關係,函式內部透過 `sb->str` 存取 `str`。 ```cpp CSTR_BUFFER(a); // ON_STACK cstr_cat(a, "Hello "); ``` `__cstr_buffer` 也能創立在 heap。 ```cpp cstr_buffer *b = (__cstr_buffer *sb) calloc(sizeof(__cstr_buffer)); cstr_cat(b, "Hello "); ``` > `__cstr_buffer` 建立在 stack、`cstring str` 使用 `struct __cstr_buffer` 包住的動機不明。 ### 2. 從 `cstr_cat` 切入 ```cpp cstring cstr_cat(cstr_buffer sb, const char *str) { cstring s = sb->str; if (s->type == CSTR_ONSTACK) { int i = s->hash_size; while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } s->cstr[i] = 0; } cstring tmp = s; sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } ``` 以上功能為 1. 若 `s` 指向的 `__cstr_data` 位在 stack 上(由`CSTR_BUFFER`創建的結果),儘可能附加 `str` 到 stack 上的 `cstr` 後,含 `\0` 最多 `CSTR_STACK_SIZE` 字元。若不足 `CSTR_STACK_SIZE` 則返回 `cstr`。 2. 接著不管 `__cstr_data` 是在 heap 或 stack,透過 `cstr_cat2` 結合新舊 `cstr` 取得/生成新的 `__cstr_data`,釋放舊的 `__cstr_data` 物件。 往下看 `cstr_cat2` ```cpp static cstring cstr_cat2(const char *a, const char *b) { size_t sa = strlen(a), sb = strlen(b); if (sa + sb < CSTR_INTERNING_SIZE) { char tmp[CSTR_INTERNING_SIZE]; memcpy(tmp, a, sa); memcpy(tmp + sa, b, sb); tmp[sa + sb] = 0; return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb)); } cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1); if (!p) return NULL; char *ptr = (char *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, a, sa); memcpy(ptr + sa, b, sb); ptr[sa + sb] = 0; p->hash_size = 0; return p; } ``` - `cstr_cat2()` 最終需返回指向新 `__cstr_data` 的 `cstring`。 - 若輸入字串 `a`、`b` 總長低於 `CSTR_INTERNING_SIZE`,返回的是指向 interning table 內元素(`__cstr_data`)的 `cstring`。 - **使用 `hash_blob(tmp, sa + sb)` 作為 hash value** - 其他情況下將創建字串 `cstr` 附加在 `__cstr_data` 後 - `type = 0` - `hash_size = 0` - `ref = 1` :::info `cstr_interning()` 作為整個程式核心,負責維護 interned string table (`struct __cstr_node **hash`)。 `__cstr_ctx` 有幾項作用 - 指向 table - table 的 lock - 分別透過 total、size 紀錄元素數量、容量,容量不足時需透過 `expand()` 倍增容量 - 指向存放多個 `__cstr_node` 的 `pool` - 紀錄 `pool` 內元素大小的 `index` <br/> 依據 [ISO/IEC 9899:1999 (E) 6.7.8](http://open-std.org/JTC1/SC22/WG14/www/standards),`__cstr_ctx` 的成員被初始化為 0 或 null pointer。 > If an object that has automatic storage duration is not initialized explicitly, its value is indeterminate. If an object that has static storage duration is not initialized explicitly, then: > — if it has pointer type, it is initialized to a null pointer; — if it has arithmetic type, it is initialized to (positive or unsigned) zero; > — if it is an aggregate, every member is initialized (recursively) according to these rules; > — if it is a union, the first named member is initialized (recursively) according to these rules. ```cpp struct __cstr_interning { int lock; int index; unsigned size; unsigned total; struct __cstr_node **hash; struct __cstr_pool *pool; }; static struct __cstr_interning __cstr_ctx; static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash) { cstring ret; CSTR_LOCK(); ret = interning(&__cstr_ctx, cstr, sz, hash); if (!ret) { expand(&__cstr_ctx); ret = interning(&__cstr_ctx, cstr, sz, hash); } ++__cstr_ctx.total; CSTR_UNLOCK(); return ret; } ``` interned string table 是一張使用 [Separate chaining](https://en.wikipedia.org/wiki/Hash_table#Separate_chaining) 處理碰撞的 hash table, ![](https://upload.wikimedia.org/wikipedia/commons/thumb/d/d0/Hash_table_5_0_1_1_1_1_1_LL.svg/450px-Hash_table_5_0_1_1_1_1_1_LL.svg.png) 由於 `__cstr_data` 自身無法連結其他資料,須使用 `__cstr_node` 包裝。 ```cpp struct __cstr_node { char buffer[CSTR_INTERNING_SIZE]; // 暫時無用處 struct __cstr_data str; struct __cstr_node *next; }; ``` 在 table 內新增 entry 只能透過 `interning()`,以傳遞進的 `hash` 作為 index,`__cstr_interning.hash` 上儲存的無疑是指向一連串連續 `__cstr_node*` 起始位置的指標。 1. 先是在對應的 list 上尋找有無字串相符的 `__cstr_data` ```cpp int index = (int) (hash & (si->size - 1)); struct __cstr_node *n = si->hash[index]; while (n) { if (n->str.hash_size == hash) { if (!strcmp(n->str.cstr, cstr)) return &n->str; } n = n->next; } ``` 2. 若無,檢查 capacity ```cpp // 80% (4/5) threshold if (si->total * 5 >= si->size * 4) return NULL; if (!si->pool) { si->pool = xalloc(sizeof(struct __cstr_pool)); si->index = 0; } ``` 3. 在 heap 上的 `pool` 新建 `__cstr_node`,這樣做的好處是這些新建的 `__cstr_node` 都使用相鄰的記憶體位置,甚至連 `__cstr_node.str.cstr` 的空間也不分配,直接指向 `__cstr_node.buffer[]`。 ```cpp /* cstr.c */ struct __cstr_pool { struct __cstr_node node[INTERNING_POOL_SIZE]; }; ``` ```cpp n = &si->pool->node[si->index++]; /* 沒有檢查 index < INTERNING_POOL_SIZE */ memcpy(n->buffer, cstr, sz); n->buffer[sz] = 0; cstring cs = &n->str; cs->cstr = n->buffer; cs->hash_size = hash; cs->type = CSTR_INTERNING; /* important */ cs->ref = 0; n->next = si->hash[index]; si->hash[index] = n; ``` 示意圖如下 ```graphviz digraph { nodesep=0.8 graph [rankdir = BD labelloc = "t"] node [shape=record color=blud] //All nodes will this shape and colour edge [color=Black] __cstr_interning [ label = "{__cstr_interning|{lock|index|size|total|<hash>hash|<pool>pool}}" ] __cstr_pool [ label = "{__cstr_pool|{<n0>node[0]|{<n1>node[1]|{buffer|str|<next>next}}|<n2>node[2]|<n3>node[3]|<n4>node[4]|...}}" ] hash [ label = "{<0>0|<1>1|<2>2|<3>3|<4>4|<5>5|...|size-1}" ] __cstr_interning:hash->hash:head __cstr_interning:pool->__cstr_pool hash:2->__cstr_pool:n1 __cstr_pool:next->__cstr_pool:n2 __cstr_pool:n2->__cstr_pool:n4 } ``` ::: ### 3. `CSTR_LITERAL` 用法 ```cpp CSTR_LITERAL(hello, "Hello string"); ``` 產生的 `__cstr_data` **可能**具有最後一種不會被 `cstr_release` 影響到的型態:`CSTR_PERMANENT`,具體行為如下: 1. `cstr_clone()` 嘗試將 "Hello string" 放到 string interned table 中,或仿照 `cstr_cat2` 複製到 heap 上 ```cpp void *ptr = (void *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 0; ``` 2. ```cpp tmp->type = CSTR_PERMANENT; tmp->ref = 0; ``` ```cpp #define CSTR_LITERAL(var, cstr) \ static cstring var = NULL; \ if (!var) { \ cstring tmp = cstr_clone("" cstr, (sizeof(cstr) / sizeof(char)) - 1); \ if (tmp->type == 0) { \ tmp->type = CSTR_PERMANENT; \ tmp->ref = 0; \ } \ if (!__sync_bool_compare_and_swap(&var, NULL, tmp)) { \ if (tmp->type == CSTR_PERMANENT) \ free(tmp); \ } \ } ``` > 無法想像到 `__sync_bool_compare_and_swap` 失敗的情境 ### 4. `cstr_release` From [GCC - Atomic-Builtins](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html) ```! type __sync_sub_and_fetch (type *ptr, type value, ...) These builtins perform the operation suggested by the name, and return the new value. That is, { *ptr op= value; return *ptr; } ``` ```cpp enum { CSTR_PERMANENT = 1, CSTR_INTERNING = 2, // OK CSTR_ONSTACK = 4, // OK }; void cstr_release(cstring s) { if (s->type || !s->ref) return; if (__sync_sub_and_fetch(&s->ref, 1) == 0) free(s); } ``` 將只有位在 heap 的 `__cstr_data` 且型態非 - `CSTR_PERMANENT`: 不於 table 又在 heap 上,由使用者自行維護 - `CSTR_INTERNING`: table 內物件 進行 `refcnt - 1` 並釋放,atomic operation 確保一個 `refcnt` 不會同時進行操作。 ### 5. `cstr_grab` ```cpp= cstring cstr_grab(cstring s) { if (s->type & (CSTR_PERMANENT | CSTR_INTERNING)) return s; if (s->type == CSTR_ONSTACK) return cstr_clone(s->cstr, s->hash_size); if (s->ref == 0) s->type = CSTR_PERMANENT; else __sync_add_and_fetch(&s->ref, 1); return s; } ``` 整份程式也只有呼叫過一次,不難看出為了在離開函式也能使用 `__cstr_data`,stack 上的物件必須複製一份 (行 7)。之後符合 `s->ref == 0` 的情況 (行 8),不可能發生。 ```cpp /* main.c */ static cstring cmp(cstring t) { CSTR_LITERAL(hello, "Hello string"); CSTR_BUFFER(ret); ... return cstr_grab(CSTR_S(ret)); } cstring b = cmp(CSTR_S(a)); ``` ### 6. `cstr_equal` ```cpp= int cstr_equal(cstring a, cstring b) { if (a == b) return 1; if ((a->type == CSTR_INTERNING) && (b->type == CSTR_INTERNING)) return 0; if ((a->type == CSTR_ONSTACK) && (b->type == CSTR_ONSTACK)) { if (a->hash_size != b->hash_size) return 0; return memcmp(a->cstr, b->cstr, a->hash_size) == 0; } uint32_t hasha = cstr_hash(a); uint32_t hashb = cstr_hash(b); if (hasha != hashb) return 0; return !strcmp(a->cstr, b->cstr); } static size_t cstr_hash(cstring s) { if (s->type == CSTR_ONSTACK) return hash_blob(s->cstr, s->hash_size); if (s->hash_size == 0) s->hash_size = hash_blob(s->cstr, strlen(s->cstr)); return s->hash_size; } ``` 值得注意的是行 12~16 的邏輯,比完 hash value 才去比較字串 `cstr`,拿去計算 hash value 的都是用 `cstr` 及 `strlen(cstr)`,像 `CSTR_INTERNING`、`CSTR_PERMANENT` 這種型態內 `hash_size` 無實質意義,就得用 `strlen()` 重新取得長度。 --- 比對兩 `cstring` 是否相等,得依其順序進行比較 1. type 2. hash_size 3. memcmp 型態和 `hash_size` 之間的關係: type|memory location|ref|hash_size -|-|-|- `CSTR_ONSTACK`|stack|0|string length `CSTR_INTERNING`|heap|0|`hash_blob(cstr, strlen(cstr))` `CSTR_PERMANENT`|heap|0|0 `0` (default)|heap|>=1|0 ### 7. 多執行緒 ### 8. array-to-pointer decay array-to-pointer decay 或是 array decay 指的是陣列型態退化成指標,失去原先陣列所帶有的資訊,例如陣列大小。出現在以下情境: 1. function declarators 2. 發生在表達式的 [implicit conversion](https://en.cppreference.com/w/c/language/conversion) 關於第 1 點在 [6.7.5.3 Function declarators](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1124.pdf) 能找到相關敘述,或是參照 [Daniel Chen 筆記](https://blog.danielchen.cc/2020/05/30/C-Array-Function-Parameter/)。第 2 點常發生在 assignment,而 assignment 左式必須為 modifiable lvalue,[ISO/IEC 1999:9899 - 6.3.2.1]() 這樣定義: > When an object is said to have a particular type, the type is specified by the lvalue used to designate the object. **A modifiable lvalue is an lvalue that does not have array type,** does not have an incomplete type, does not have a const-qualified type, and if it is a structure or union, does not have any member (including, recursively, any member or element of all contained aggregates or unions) with a const-qualified type. 這樣的限制,使下方程式無法編譯成功 ```cpp int a[3], b[3]; a = b; // error: assignment to expression with array type ``` 隨後條目指出 > Except when it is the operand of the sizeof operator or the unary & operator, or is a string literal used to initialize an array, **an expression that has type ‘‘array of type’’ is converted to an expression with type ‘‘pointer to type’’ that points to the initial element of the array object and is not an lvalue.** If the array object has register storage class, the behavior is undefined. 因此 assignment 右方具有陣列型態的表達式將被轉型 ```cpp int a[3], b[3][4], *p, (*q)[4]; p = a; // conversion to &a[0] q = b; // conversion to &b[0] ``` :::info 關於 lvalue 的定義一般認知是 locator value,但規格書對 lvalue 的定義看起來很廣: > **An lvalue is an expression with an object type or an incomplete type other than void**; if an lvalue does not designate an object when it is evaluated, the behavior is undefined. >> [type](https://en.cppreference.com/w/c/language/type) >> Objects, functions, and expressions have a property called type, which determines the interpretation of the binary value **stored in an object or evaluated by the expression.** ::: ### 9. 與 `chriso/intern` 比較 為了讓測試進行下去,先套用 [iLeafy11](https://hackmd.io/@qJEZpNvuSHe0kzJAmMSeQg/HkNhw0H7O) 提到的 `__cstr_ctx.total` 誤植問題 ```diff static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash) { ... - ++__cstr_ctx.total; ... } static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) { ... + ++__cstr_ctx.total; } ``` :::info 另一個問題是 `void *ptr = (void *) (p + 1);` 使用 `gcc -Wpedantic -Wall -std=c99` 編譯會出現警告。嚴格說,在 `void*` 上進行算術運算是非法操作,但 gcc 實作允許。 ```diff main.c:4:20: warning: pointer of type ‘void *’ used in arithmetic [-Wpointer-arith] 4 | printf("%p\n", a + 1); ``` Reference - [Pointer arithmetic for void pointer in C](https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c) ::: - 用法 ```cpp // 使用上得先創立 `strings` char buffer[12] = "Hello world"; struct strings *strings = strings_new(); // 接著向 interned table 塞入字串,取得從 1 遞增的 id uint32_t id = strings_intern(strings, buffer); // 以 id 查詢字串 const char* str = strings_lookup_id(strings, id); printf("%s\n", str); ``` - 仿造 [chriso/intern](https://github.com/chriso/intern) 的 `benchmark` 量測插入 `1`~`5000000` 共 5M 個字串到 interned table 平均所需要的額外開銷 - chriso/intern - 修改 ```diff /* benchmark.c */ - char buffer[12] = {'x'}; + char buffer[12] = {0}; uint32_t count = 5000000; size_t string_bytes = 0; for (uint32_t id = 1; id <= count; id++) { - unsigned_string(buffer + 1, id); + unsigned_string(buffer, id); string_bytes += strlen(buffer); assert(strings_intern(strings, buffer) == id); } ``` - 實驗結果 ``` Interned 5M unique strings Overhead per string: 44.4 bytes ``` - cstr - 為了容納 5000000 個字串,配置足夠大小的 pool ``` #define INTERNING_POOL_SIZE 1<<23 ``` - 新增 `strings_allocated_bytes()` 量測 `__cstr_interning` 佔用空間 ```cpp size_t strings_allocated_bytes() { size_t s_pool = 0, s_table = 0, s_ctx = 0; CSTR_LOCK(); if (__cstr_ctx.pool){ s_pool = sizeof(*__cstr_ctx.pool); printf("pool: %ld bytes\n", s_pool); } s_table = sizeof(struct __cstr_node *) * __cstr_ctx.size; printf("hash table: %ld bytes\n", s_table); s_ctx = sizeof(__cstr_ctx); printf("ctx: %ld bytes\n", s_ctx); CSTR_UNLOCK(); return s_pool + s_table + s_ctx; } ``` - 實驗結果 ```bash ~$ mkdir build && cd build/ ~$ cmake ../ && make ~$ ./benchmark ``` ```bash pool: 469762048 bytes hash table: 67108864 bytes ctx: 32 bytes Interned 5000000 unique strings Overhead per string: 99.6 bytes ``` `cstr` 額外開銷足足有 chriso/intern 的兩倍,而 pool size 佔了大部份。為了減少開銷,在不採用 LRU 等手段減少 pool size 的情況下,能借鑑的只有 chriso/intern 本身的資料結構。 - [原始碼](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz2/string_interning) ### 10. `chriso/intern` 資料結構 ```cpp /* block.h */ struct block { size_t page_size; void **pages; size_t *offsets; size_t count; size_t size; }; /* strings.c */ struct strings { struct block *hashes; struct block *strings; struct block *index; tree_t hash_map; uint32_t total; uint32_t hash_seed; }; /* strings.h */ uint32_t strings_intern(struct strings*, const char *string); const char *strings_lookup_id(struct strings*, uint32_t id); ``` - TODO - Insert/Lookup 效能分析 - 測資: [10-million-password-list-top-1000000.txt](https://github.com/danielmiessler/SecLists/blob/master/Passwords/Common-Credentials/10-million-password-list-top-1000000.txt) ## Reference - [x86 Assembly Guide](https://www.cs.virginia.edu/~evans/cs216/guides/x86.html) - [How to change the colour of one record in multi-record shape ](https://stackoverflow.com/questions/17765301/graphviz-dot-how-to-change-the-colour-of-one-record-in-multi-record-shape) - [2021q1 Homework2 (quiz2) - written by toastcheng](https://hackmd.io/@toastcheng/w2-quiz2)