--- title: 2020 第三週作業 tags: _核心設計 breaks: true --- # 2020q1 Homework2 (quiz2) contributed by < `babysuse` > # 大綱 [toc] # 作業描述 以 Facebook 針對 C++ std::string 用 SSO (Small String Optimization) 和 CoW (Copy on Write) 來實作出高效替代品 folly::fbstring 結構如下: ```cpp union { struct { char *data; size_t sizse; size_t capacity; } heap; struct { char *data[23]; int8_t length; } stack; } ``` 根據 [Folly 文件](https://github.com/facebook/folly/blob/master/folly/FBString.h#L208),長中短字串特性分別如下:<br> 1. 結尾 2 bits (rightmost) 用來標記長中短字串 - 00 為短字串 - 10 為中字串 - 01 為長字串 2. 短字串為字串長度 &le; 23 者,而 length 紀錄「最大長度 &minus; 實際長度」 - 當長度剛好為 23 時,第 24 位 length 便會是 `'\0'` (null charactor),其值剛好為 0 - 當長度為 &le; 22 時,第 24 位 length 的左邊六位元 (扣掉用來標記長中短字串的兩位元) 拿來紀錄「最大長度 &minus; 實際長度」 3. 在中長字串當中, - 除了末 2 bits 之外,就是長字串使用到 reference-counted 和 copied lazily<br><br> 以 C 重寫,節錄出以下三段程式碼 ### union xs ```cpp typedef union { char data[16]; struct { uint8_t filler[15], space_left : 4, /* if it is on heap, set to 1 */ is_ptr : 1, flag1 : 1, flag2 : 1, flag3 : 1; }; /* heap allocated */ struct { char *ptr; size_t size : 54, /* capacity is always a power of 2 (unsigned)-1 */ capacity : 6; /* the last 4 bits are important flags */ }; } xs; ``` ### xs_new(xs *, const void *) ```cpp xs *xs_new(xs *x, const void *p) { *x = xs_literal_empty(); size_t len = strlen(p) + 1; if (len > AAA) { x->capacity = ilog2(len) + 1; x->size = len - 1; x->is_ptr = true; x->ptr = malloc((size_t) 1 << x->capacity); memcpy(x->ptr, p, len); } else { memcpy(x->data, p, len); x->space_left = 15 - (len - 1); } return x; } #define xs_tmp(x) \ ((void) ((struct { \ _Static_assert(sizeof(x) <= 16, "it is too big"); \ int dummy; \ }){1}), \ xs_new(&xs_literal_empty(), "" x)) ``` ### xs_trim(xs *, const char *) ```cpp xs *xs_trim(xs *x, const char *trimset) { if (!trimset[0]) return x; char *dataptr = xs_data(x), *orig = dataptr; /* similar to strspn/strpbrk but it operates on binary data */ uint8_t mask[BBB] = {0}; #define check_bit(byte) (mask[(uint8_t) byte / 8] CCC 1 << (uint8_t) byte % 8) #define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8) size_t i, slen = xs_size(x), trimlen = strlen(trimset); for (i = 0; i < trimlen; i++) set_bit(trimset[i]); for (i = 0; i < slen; i++) if (!check_bit(dataptr[i])) break; for (; slen > 0; slen--) if (!check_bit(dataptr[slen - 1])) break; dataptr += i; slen -= i; /* reserved space as a buffer on the heap. * Do not reallocate immediately. Instead, reuse it as possible. * Do not shrink to in place if < 16 bytes. */ memmove(orig, dataptr, slen); /* do not dirty memory unless it is needed */ if (orig[slen]) orig[slen] = 0; if (xs_is_ptr(x)) x->size = slen; else x->space_left = 15 - slen; return x; #undef check_bit #undef set_bit } ``` 預期達到 ```c= xs string = *xs_tmp(" foobarbar \n\n"); xs_trim(&string, "\n "); printf("%s : %2zu\n", xs_data(&string), xs_size(&string)); xs prefix = *xs_tmp("((("), suffix = *xs_tmp(")))"); xs_concat(&string, &prefix, &suffix); printf("%s : %2zu\n", xs_data(&string), xs_size(&string)); ``` ``` foobarbar : 9 (((foobarbar))) : 15 ``` 其中 - `xs_trim` 為移除指定字元的函式 - `xs_tmp` 透過 macro 以 `xs_new` 來創建新的字串 - `xs_literal_empty` 是以 [Designated Initializers](https://gcc.gnu.org/onlinedocs/gcc/Designated-Inits.html) 來將 size - `ilog2` 二的對數 (二進位表示法下的位數 - 1)<br><br> 1. 試問: AAA, BBB, CCC 分別為何?(AAA, BBB 為整數,CCC 為運算子) 2. 程式碼運作原理和改進方案 3. 以上述程式碼為基礎實作 strcpy (CoW 列入考慮) 和 strtok 功能函式 4. 設計實驗確認上述字串帶來空間效益 (locality of reference 列入考慮) 5. 在 Linux Kernel 中找出類似 SSO 和 CoW 案例並解說 # 完成程式碼 首先比較特別的是這裡的「短字串」上限為 15 (後續位元的使用同 folly::fbstring),而用來初始化的 `xs_new` 判斷式是要區分短字串與中長字串,加上 + 1 為了加上 null character `'\0'`,所以 **AAA** 為 16 再來這邊註解提到的兩個函式 - [`strspn(str1, str2)`](http://www.cplusplus.com/reference/cstring/strspn/): 算出 `str1` 從頭開始連續有多少個 chars 在 `str2` 中 - [`strpbrk(str1, str2)`](http://www.cplusplus.com/reference/cstring/strpbrk/): 回傳 `str2` 任一 char 在 `str1` 首次出現的位置 - 我是覺得功能和這裡的 xs_trim 差有點多,但是可以理解應該是想要表達和 mask 的存在意義同樣為「做比較」的概念。不過 **BBB** 可以直接用位元進行判斷: `check_bit(byte)` 和 `set_bit(byte)` 中 cast 成的 `uint8_t` 為 8 bits,而 `/8` 相當於 `>> 3`,所以最大可以表示到 2^5^ - 1,因此 **BBB** 為 32 最後 `#define check_bit(...) ...` 開宗明義就直接講明是要檢查位元了,搭配 1 來進行檢查就必定是 `&`,因為 1 是 `&` 的單位元。 # 運作原理和改進方案 ## 運作原理 / 解讀程式碼 這裡就按照上面和以下截錄出比較重要的程式碼講解: ### [`union xs`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#union-xs) 主要就是兩種型態: - 短字串 (儲存在 stack) - 模仿利用 **rightmost byte** 來進行**長度**和 **flags** 的操作和紀錄 - 長度命名為 `space_left` 顧名思義為「最大長度 - 實際長度」 - flags 中 `is_ptr` 用來區分短字串與中長字串,其餘 3 個未定義 - 中長字串 (儲存在 heap) - `ptr` 是字串/記憶體的 handler - `size` 和 `capacity` 應該是想要模仿 C++ 中 [std::string](http://www.cplusplus.com/reference/string/string/) 的 [size](http://www.cplusplus.com/reference/string/string/size/) 和 [capacity](http://www.cplusplus.com/reference/string/string/capacity/) 的關係 - size 為字串長度 (即實際使用空間 - 1) - capacity 為目前以分配到的記憶體空間 (並且都是以 2 的指數在分配),為了節省記憶體資源 (union 只以 6 bits 紀錄),這裡以 2 的對數表示,到 `xs_capacity` 再進行換算。而對 `xs_capacity` 回傳值 - 1 的意義應該是為了扣掉 null character `\0`。 ### [`xs_new(xs *, const void *)`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#xs_newxs--const-void-) 函數本身就是區分兩種狀況分別賦值 (短字串與中長字串) 比較有意思的是 `xs_tmp` 這個 macro,重新排版可以看成以逗號隔開的兩個 expressions ```c= ( (void) ( (struct { _Static_assert(sizeof(x) <= 16, "it is too big"); int dummy; }) {1} ), xs_new(&xs_literal_empty(), "" x) ) ``` 研究了半天我的理解和質疑如下 - 上半部就是用 C 的 macro `_Static_assert()` 來檢查,只是 - 第一,如果只是要檢查,何必宣告成 struct?畢竟`_Static_assert()` 再怎麼去實作不是 declaration 就是 expression 那多加一層 struct 的意義何在? - 第二,可以理解 `{1}` 是為了將 struct 的 declaration 轉換成 expression,但是變數 `int dummy` 的意義何在? - 最上方的 `(void)` 應該是用來丟棄 expression 的值,只是如果是這樣的話這個 casting 是不是有點多餘?畢竟以逗號隔開的 expressions 只會回傳最後一個 (也就是 `xs_new()` 的值) - 下半部就是用 `xs_new()` 來初始化變數 :::warning 你可以試著用更長的 C-style 字串進行初始化,會發現一些有趣的事 :notes: jserv ::: ### [`xs_trim(xs *, const char *)`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#xs_trimxs--const-char-) 從 `mask[32]` 和 `check_bit(byte)`,`set_bit(byte)` 切入來看:<br> `check_bit` 可以改寫成 (`set_bit` 同理) ```cpp mask[(uint8_t) byte >> 3] & (1 << (uint8_t) byte % 8) ``` - cast 為數值型態才有辦法做位元運算,由於 char 為 1 byte,用 uint8_t 剛好 - 照我的理解看來,這行程式碼將 8 bits 的 ascii 分成兩個部份: leftmost 5 bits 和 rightmost 3 bits (為了避免 Big Edian 和 Little Edian 的狀況,我用 leftmost/rightmost 來取代前/後位或高/低位) - rightmost 會留 3 bits 原因在於最大表示到 7,也就是在不溢位前提下 `uint8_t` 能做到的最大的向左位移 - 剩下 leftmost 5 bits 自然就用 `mask[]` 的 index 來做編碼 - 接下來三個 for loop 依序為 - 設定要刪除掉的 masks (或稱「以 array of uint8_t 編碼的 ascii」) - 從頭刪 - 從尾刪<br><br> ### `xs_grow(xs *, size_t)` ```cpp /* grow up to specified size */ xs *xs_grow(xs *x, size_t len) { if (len <= xs_capacity(x)) return x; len = ilog2(len) + 1; if (xs_is_ptr(x)) x->ptr = realloc(x->ptr, (size_t) 1 << len); else { char buf[16]; memcpy(buf, x->data, 16); x->ptr = malloc((size_t) 1 << len); memcpy(x->ptr, buf, 16); } x->is_ptr = true; x->capacity = len; return x; } ``` - 如前面所提,`xs_grow` 是依照 capacity (2 的指數) 來索取記憶體 - 至於資料的轉移只有在短字串變為中長字串的時候才有。否則就是單純的 `realloc`<br><br> ### `xs_concat(xs *, const xs *, const xs *)` ```cpp xs *xs_concat(xs *string, const xs *prefix, const xs *suffix) { size_t pres = xs_size(prefix), sufs = xs_size(suffix), size = xs_size(string), capacity = xs_capacity(string); char *pre = xs_data(prefix), *suf = xs_data(suffix), *data = xs_data(string); if (size + pres + sufs <= capacity) { memmove(data + pres, data, size); memcpy(data, pre, pres); memcpy(data + pres + size, suf, sufs + 1); string->space_left = 15 - (size + pres + sufs); } else { xs tmps = xs_literal_empty(); xs_grow(&tmps, size + pres + sufs); char *tmpdata = xs_data(&tmps); memcpy(tmpdata + pres, data, size); memcpy(tmpdata, pre, pres); memcpy(tmpdata + pres + size, suf, sufs + 1); xs_free(string); *string = tmps; string->size = size + pres + sufs; } return string; } ``` - 就是區分為兩種狀況 - capacity 足夠的情形下,資料直接轉移 - capacity 不夠的情形下,由於 `xs_grow()` 資料不一定會轉移,所以直接除舊佈新,要了新的,釋放舊的<br><br> ## 改進方案 ***TBC***<br><br><br> # 實作 [strcpy](http://www.cplusplus.com/reference/cstring/strcpy/) 和 [strtok](http://www.cplusplus.com/reference/cstring/strtok/) 功能的函式 - `char *strcpy ( char *destination, const char *source);` - 回傳 destination<br> - `char *strtok ( char *str_parsed, const char *delimiters);` - `str_parsed` 或是 `NULL` 沿用上一個 `str_parsed` - 回傳開頭的 token<br> `strcpy` 功能函式實作如下 ```cpp #define COUNTER_SIZE 10 struct Ref_Counter { char *ref; size_t count; } ref_counter[COUNTER_SIZE]; static inline size_t xs_is_ref(const xs *x) { for (int i = 0; i < COUNTER_SIZE; ++i) if (ref_counter[i].ref == xs_data(x)) return ref_counter[i].count; return 0; } static inline void ref_add(char *str) { for (int i = 0; i < COUNTER_SIZE; ++i) if (!ref_counter[i].count) { ref_counter[i].ref = str; ref_counter[i].count = 2; break; } } // i can be +1 or -1 static inline void ref_chg(const char *str, int i) { for (int i = 0; i < COUNTER_SIZE; ++i) if (ref_counter[i].ref == str) { ref_counter[i].count += i; break; } } ``` ```cpp static inline xs *xs_free(xs *x) { if (xs_is_ptr(x)) { if(xs_is_ref(x) > 1) ref_chg(xs_data(x), -1); else free(xs_data(x)); } return xs_newempty(x); } ``` ```cpp void xs_copy(xs *dest, const xs *src) { size_t dest_size = xs_size(dest), src_size = xs_size(src); char *dest_str = xs_data(dest), *src_str = xs_data(src); xs_grow(dest, src_size); // CoW if long enough if (xs_is_ptr(src)) { xs_free(dest); dest->ptr = src_str; if(xs_is_ref(src)) ref_chg(xs_data(src), 1); else ref_add(xs_data(src)); } else { memcpy(dest_str, src, src_size + 1); } if (xs_is_ptr(dest)) dest->size = src_size; else dest->space_left = 15 - src_size; } ``` - 由於 `xs_grow()` 會檢查是否需要額外擴充空間,空間大小直接交給它檢查 - 若 `src` 為長字串,`dest` 根據 `xs_grow(xs *, size_t)` 的實作,必有配置記憶體。釋放完了之後由於是長字串,直接以 CoW 的方式賦值,以節省記憶空間 - 為了搭配 CoW, 順便實作了 Reference Counting - 理論上應該用 Linked List 來動態調整追蹤清單的大小,這裡為了簡單實作就直接使用 array - 由於加上了 Reference Counting,`xs_free(xs *)` 也就一併更改 再來是 `strtok` 功能函式實作如下 ```cpp char *xs_token(xs *xs_parsed, const char *delimiters) { static xs *xs_default = NULL; if (xs_parsed) xs_default = xs_parsed; if (!xs_default || !xs_size(xs_default)) return NULL; char *str = xs_data(xs_default); size_t str_size = strlen(str), del_size = strlen(delimiters); uint8_t mask[32] = {0}; #define check_bit(byte) (mask[(uint8_t) byte / 8] & 1 << (uint8_t) byte % 8) #define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8) size_t begin_token, end_token; // find token for (int i = 0; i < del_size; ++i) set_bit(delimiters[i]); for (begin_token = 0; begin_token < str_size; ++begin_token) if (!check_bit(str[begin_token])) break; for (end_token = begin_token; end_token < str_size; ++end_token) if (check_bit(str[end_token])) break; end_token -= 1; size_t token_size = end_token - begin_token + 1; char *token = (char *) malloc(token_size + 1); memcpy(token, str + begin_token, token_size); token[ token_size ] = '\0'; // remove parsed section (include '\0') for (int i = 0; i <= str_size - end_token; ++i) str[i] = str[i + end_token + 1]; if (xs_is_ptr(xs_default)) xs_default->size -= end_token + 1; else xs_default->space_left += end_token + 1; return token; #undef check_bit #undef set_bit } ``` - 其中為了真正模仿 `strtok(NULL, delimiters)` 這種呼叫方式,多宣告了個 static 的 `xs_default` 在裡頭 - 尋找 token 的方式我沿用了 `xs_trim(xs *, const char *)` 中的 `set_bit(byte)` 和 `check_bit(byte)` - 唯一不確定的點在於回傳的函式要回傳字串有兩種方案: 第一種是用 static 變數來存字串;第二種是用 malloc 出的的記憶體來存。有鑑於前者宣告時的空間總有不足的可能性,又不想直接浪費空間宣告到最大,所以都傾向於用後者。可是後者的缺點在於為了避免 memory leak 的狀況發生,還要額外再去釋放每一次的回傳值。 測試結果如下 ```cpp int main() { xs string = *xs_tmp("\n foobarbar \n\n\n"); xs_trim(&string, "\n "); printf("%s : %2zu\n", xs_data(&string), xs_size(&string)); // testing xs_concat(xs *, xs *, xs *) xs prefix = *xs_tmp("(((((("), suffix = *xs_tmp("))))))"); xs_concat(&string, &prefix, &suffix); printf("%s : %2zu\n", xs_data(&string), xs_size(&string)); // testing xs_token(xs *, xs *) printf("\nbefore prefix: %s\n", xs_data(&prefix)); xs_copy(&prefix, &string); printf("after prefix: %s\n\n", xs_data(&prefix)); // testing xs_token(xs *, const char *) char *temp = NULL; temp = xs_token(&string, "r"); while (temp) { printf("%s\n", temp); if (temp) { free(temp); temp = NULL; } temp = xs_token(&string, "r"); } } ``` ``` foobarbar : 9 ((((((foobarbar)))))) : 21 before prefix: (((((( after prefix: ((((((foobarbar)))))) ((((((fooba ba )))))) ``` - 因為直接沿用的之前 `xs_concat(xs *, xs *, xs *)` 測試的變數,所以一併截錄出來 # 驗證空間效益 ***TBC*** # Linux Kernel 中類似案例 我參考的是 Understanding The Linux Kernel 這本書中所提到 CoW 的內容。 最初的 Unix 系統中,fork() 所創建的 process 的過程相當複雜、耗資源: - 替 child process 的 **Page Tables** 和 **Pages** 分配 **Page Frames** - 初始化 child process 的 **Page Tables** - 將 parent process 的 **Pages** 複製到 child process 相對應的 **Pages** 中 - 這邊順便幫自己複習一下 - **Pages Frames** 是 paging 機制下在 memory 上的單位,屬於 **physical pages** - **Pages** 是虛擬化之後程式用來分配的 **virtual pages** - **Page Table** 則是紀錄每個 process 所使用到 Pages 的清單 現代的 Linux kernels 採用 CoW 來分配 address space 給 child process。以下節錄出書中來自於 Linux kernel 2.6 版的程式碼 ```c= if (pte_present(entry)) { if (write_access) { if (!pte_write(entry)) return do_wp_page(mm, vma, address, pte, pmd, entry); entry = pte_mkdirty(entry) } ... } ``` 這是當 page fault exception 是由 *"access to a page present in memory"* 時所實行的 instructions。其中 [do_wp_page()](https://elixir.bootlin.com/linux/v2.6.31/source/mm/memory.c#L1947) 是在當 users 要去 write 一個 shared page 的時候,他會去讀取 page descriptor (以下摘錄一些 [do_wp_page()](https://elixir.bootlin.com/linux/v2.6.31/source/mm/memory.c#L1947) 的程式碼) ```c=1957 old_page = vm_normal_page(vma, address, orig_pte); if (!old_page) { /* * VM_MIXEDMAP !pfn_valid() case * * We should not cow pages in a shared writeable mapping. * Just mark the pages writable as we can't do any dirty * accounting on raw pfn maps. */ if ((vma->vm_flags & (VM_WRITE|VM_SHARED)) == (VM_WRITE|VM_SHARED)) goto reuse; goto gotten; } ``` ```c=2075 if (reuse) { reuse: flush_cache_page(vma, address, pte_pfn(orig_pte)); ... ``` ```c=2068 /* * Ok, we need to copy. Oh, well.. */ page_cache_get(old_page); gotten: pte_unmap_unlock(page_table, ptl); if (unlikely(anon_vma_prepare(vma))) goto oom; VM_BUG_ON(old_page == ZERO_PAGE(0)); new_page = alloc_page_vma(GFP_HIGHUSER_MOVABLE, vma, address); if (!new_page) goto oom; ``` ```c=2101 if (old_page) { if (!PageAnon(old_page)) { dec_mm_counter(mm, file_rss); inc_mm_counter(mm, anon_rss); } } else inc_mm_counter(mm, anon_rss); ``` 透過上面以及書中的論述提及 (vm(a) 是指 virtual memory (area)) - 如果 process 是擁有者或是 `_count` field 為零或是 page 為 writable 則不會 CoW - 反之如果對 process 而言是 write-protected,執行了 COW,`_count` 就會減少 - 第一部份 (1957 行) 是判斷是要複製與否 - 第二部份 (2075 行) 是不執行 CoW,直接寫入原 page - 第三部份 (2068 行) 是採用 CoW,分配一份 page 出來 - 第四部份 (2101 行) 則是搭配 reference counting 的部份