---
title: 2020 第三週作業
tags: _核心設計
breaks: true
---
# 2020q1 Homework2 (quiz2)
contributed by < `babysuse` >
# 大綱
[toc]
# 作業描述
以 Facebook 針對 C++ std::string 用 SSO (Small String Optimization) 和 CoW (Copy on Write) 來實作出高效替代品 folly::fbstring 結構如下:
```cpp
union {
struct {
char *data;
size_t sizse;
size_t capacity;
} heap;
struct {
char *data[23];
int8_t length;
} stack;
}
```
根據 [Folly 文件](https://github.com/facebook/folly/blob/master/folly/FBString.h#L208),長中短字串特性分別如下:<br>
1. 結尾 2 bits (rightmost) 用來標記長中短字串
- 00 為短字串
- 10 為中字串
- 01 為長字串
2. 短字串為字串長度 ≤ 23 者,而 length 紀錄「最大長度 − 實際長度」
- 當長度剛好為 23 時,第 24 位 length 便會是 `'\0'` (null charactor),其值剛好為 0
- 當長度為 ≤ 22 時,第 24 位 length 的左邊六位元 (扣掉用來標記長中短字串的兩位元) 拿來紀錄「最大長度 − 實際長度」
3. 在中長字串當中,
- 除了末 2 bits 之外,就是長字串使用到 reference-counted 和 copied lazily<br><br>
以 C 重寫,節錄出以下三段程式碼
### union xs
```cpp
typedef union {
char data[16];
struct {
uint8_t filler[15],
space_left : 4,
/* if it is on heap, set to 1 */
is_ptr : 1,
flag1 : 1,
flag2 : 1,
flag3 : 1;
};
/* heap allocated */
struct {
char *ptr;
size_t size : 54,
/* capacity is always a power of 2 (unsigned)-1 */
capacity : 6;
/* the last 4 bits are important flags */
};
} xs;
```
### xs_new(xs *, const void *)
```cpp
xs *xs_new(xs *x, const void *p)
{
*x = xs_literal_empty();
size_t len = strlen(p) + 1;
if (len > AAA) {
x->capacity = ilog2(len) + 1;
x->size = len - 1;
x->is_ptr = true;
x->ptr = malloc((size_t) 1 << x->capacity);
memcpy(x->ptr, p, len);
} else {
memcpy(x->data, p, len);
x->space_left = 15 - (len - 1);
}
return x;
}
#define xs_tmp(x) \
((void) ((struct { \
_Static_assert(sizeof(x) <= 16, "it is too big"); \
int dummy; \
}){1}), \
xs_new(&xs_literal_empty(), "" x))
```
### xs_trim(xs *, const char *)
```cpp
xs *xs_trim(xs *x, const char *trimset)
{
if (!trimset[0])
return x;
char *dataptr = xs_data(x), *orig = dataptr;
/* similar to strspn/strpbrk but it operates on binary data */
uint8_t mask[BBB] = {0};
#define check_bit(byte) (mask[(uint8_t) byte / 8] CCC 1 << (uint8_t) byte % 8)
#define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8)
size_t i, slen = xs_size(x), trimlen = strlen(trimset);
for (i = 0; i < trimlen; i++)
set_bit(trimset[i]);
for (i = 0; i < slen; i++)
if (!check_bit(dataptr[i]))
break;
for (; slen > 0; slen--)
if (!check_bit(dataptr[slen - 1]))
break;
dataptr += i;
slen -= i;
/* reserved space as a buffer on the heap.
* Do not reallocate immediately. Instead, reuse it as possible.
* Do not shrink to in place if < 16 bytes.
*/
memmove(orig, dataptr, slen);
/* do not dirty memory unless it is needed */
if (orig[slen])
orig[slen] = 0;
if (xs_is_ptr(x))
x->size = slen;
else
x->space_left = 15 - slen;
return x;
#undef check_bit
#undef set_bit
}
```
預期達到
```c=
xs string = *xs_tmp(" foobarbar \n\n");
xs_trim(&string, "\n ");
printf("%s : %2zu\n", xs_data(&string), xs_size(&string));
xs prefix = *xs_tmp("((("), suffix = *xs_tmp(")))");
xs_concat(&string, &prefix, &suffix);
printf("%s : %2zu\n", xs_data(&string), xs_size(&string));
```
```
foobarbar : 9
(((foobarbar))) : 15
```
其中
- `xs_trim` 為移除指定字元的函式
- `xs_tmp` 透過 macro 以 `xs_new` 來創建新的字串
- `xs_literal_empty` 是以 [Designated Initializers](https://gcc.gnu.org/onlinedocs/gcc/Designated-Inits.html) 來將 size
- `ilog2` 二的對數 (二進位表示法下的位數 - 1)<br><br>
1. 試問: AAA, BBB, CCC 分別為何?(AAA, BBB 為整數,CCC 為運算子)
2. 程式碼運作原理和改進方案
3. 以上述程式碼為基礎實作 strcpy (CoW 列入考慮) 和 strtok 功能函式
4. 設計實驗確認上述字串帶來空間效益 (locality of reference 列入考慮)
5. 在 Linux Kernel 中找出類似 SSO 和 CoW 案例並解說
# 完成程式碼
首先比較特別的是這裡的「短字串」上限為 15 (後續位元的使用同 folly::fbstring),而用來初始化的 `xs_new` 判斷式是要區分短字串與中長字串,加上 + 1 為了加上 null character `'\0'`,所以 **AAA** 為 16
再來這邊註解提到的兩個函式
- [`strspn(str1, str2)`](http://www.cplusplus.com/reference/cstring/strspn/): 算出 `str1` 從頭開始連續有多少個 chars 在 `str2` 中
- [`strpbrk(str1, str2)`](http://www.cplusplus.com/reference/cstring/strpbrk/): 回傳 `str2` 任一 char 在 `str1` 首次出現的位置
-
我是覺得功能和這裡的 xs_trim 差有點多,但是可以理解應該是想要表達和 mask 的存在意義同樣為「做比較」的概念。不過 **BBB** 可以直接用位元進行判斷: `check_bit(byte)` 和 `set_bit(byte)` 中 cast 成的 `uint8_t` 為 8 bits,而 `/8` 相當於 `>> 3`,所以最大可以表示到 2^5^ - 1,因此 **BBB** 為 32
最後 `#define check_bit(...) ...` 開宗明義就直接講明是要檢查位元了,搭配 1 來進行檢查就必定是 `&`,因為 1 是 `&` 的單位元。
# 運作原理和改進方案
## 運作原理 / 解讀程式碼
這裡就按照上面和以下截錄出比較重要的程式碼講解:
### [`union xs`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#union-xs)
主要就是兩種型態:
- 短字串 (儲存在 stack)
- 模仿利用 **rightmost byte** 來進行**長度**和 **flags** 的操作和紀錄
- 長度命名為 `space_left` 顧名思義為「最大長度 - 實際長度」
- flags 中 `is_ptr` 用來區分短字串與中長字串,其餘 3 個未定義
- 中長字串 (儲存在 heap)
- `ptr` 是字串/記憶體的 handler
- `size` 和 `capacity` 應該是想要模仿 C++ 中 [std::string](http://www.cplusplus.com/reference/string/string/) 的 [size](http://www.cplusplus.com/reference/string/string/size/) 和 [capacity](http://www.cplusplus.com/reference/string/string/capacity/) 的關係
- size 為字串長度 (即實際使用空間 - 1)
- capacity 為目前以分配到的記憶體空間 (並且都是以 2 的指數在分配),為了節省記憶體資源 (union 只以 6 bits 紀錄),這裡以 2 的對數表示,到 `xs_capacity` 再進行換算。而對 `xs_capacity` 回傳值 - 1 的意義應該是為了扣掉 null character `\0`。
### [`xs_new(xs *, const void *)`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#xs_newxs--const-void-)
函數本身就是區分兩種狀況分別賦值 (短字串與中長字串)
比較有意思的是 `xs_tmp` 這個 macro,重新排版可以看成以逗號隔開的兩個 expressions
```c=
(
(void) (
(struct {
_Static_assert(sizeof(x) <= 16, "it is too big");
int dummy;
}) {1}
),
xs_new(&xs_literal_empty(), "" x)
)
```
研究了半天我的理解和質疑如下
- 上半部就是用 C 的 macro `_Static_assert()` 來檢查,只是
- 第一,如果只是要檢查,何必宣告成 struct?畢竟`_Static_assert()` 再怎麼去實作不是 declaration 就是 expression 那多加一層 struct 的意義何在?
- 第二,可以理解 `{1}` 是為了將 struct 的 declaration 轉換成 expression,但是變數 `int dummy` 的意義何在?
- 最上方的 `(void)` 應該是用來丟棄 expression 的值,只是如果是這樣的話這個 casting 是不是有點多餘?畢竟以逗號隔開的 expressions 只會回傳最後一個 (也就是 `xs_new()` 的值)
- 下半部就是用 `xs_new()` 來初始化變數
:::warning
你可以試著用更長的 C-style 字串進行初始化,會發現一些有趣的事
:notes: jserv
:::
### [`xs_trim(xs *, const char *)`](https://hackmd.io/@babysuse2018/Linux核心設計_Lab1_3?view#xs_trimxs--const-char-)
從 `mask[32]` 和 `check_bit(byte)`,`set_bit(byte)` 切入來看:<br>
`check_bit` 可以改寫成 (`set_bit` 同理)
```cpp
mask[(uint8_t) byte >> 3] & (1 << (uint8_t) byte % 8)
```
- cast 為數值型態才有辦法做位元運算,由於 char 為 1 byte,用 uint8_t 剛好
- 照我的理解看來,這行程式碼將 8 bits 的 ascii 分成兩個部份: leftmost 5 bits 和 rightmost 3 bits (為了避免 Big Edian 和 Little Edian 的狀況,我用 leftmost/rightmost 來取代前/後位或高/低位)
- rightmost 會留 3 bits 原因在於最大表示到 7,也就是在不溢位前提下 `uint8_t` 能做到的最大的向左位移
- 剩下 leftmost 5 bits 自然就用 `mask[]` 的 index 來做編碼
- 接下來三個 for loop 依序為
- 設定要刪除掉的 masks (或稱「以 array of uint8_t 編碼的 ascii」)
- 從頭刪
- 從尾刪<br><br>
### `xs_grow(xs *, size_t)`
```cpp
/* grow up to specified size */
xs *xs_grow(xs *x, size_t len)
{
if (len <= xs_capacity(x))
return x;
len = ilog2(len) + 1;
if (xs_is_ptr(x))
x->ptr = realloc(x->ptr, (size_t) 1 << len);
else {
char buf[16];
memcpy(buf, x->data, 16);
x->ptr = malloc((size_t) 1 << len);
memcpy(x->ptr, buf, 16);
}
x->is_ptr = true;
x->capacity = len;
return x;
}
```
- 如前面所提,`xs_grow` 是依照 capacity (2 的指數) 來索取記憶體
- 至於資料的轉移只有在短字串變為中長字串的時候才有。否則就是單純的 `realloc`<br><br>
### `xs_concat(xs *, const xs *, const xs *)`
```cpp
xs *xs_concat(xs *string, const xs *prefix, const xs *suffix)
{
size_t pres = xs_size(prefix), sufs = xs_size(suffix),
size = xs_size(string), capacity = xs_capacity(string);
char *pre = xs_data(prefix), *suf = xs_data(suffix),
*data = xs_data(string);
if (size + pres + sufs <= capacity) {
memmove(data + pres, data, size);
memcpy(data, pre, pres);
memcpy(data + pres + size, suf, sufs + 1);
string->space_left = 15 - (size + pres + sufs);
} else {
xs tmps = xs_literal_empty();
xs_grow(&tmps, size + pres + sufs);
char *tmpdata = xs_data(&tmps);
memcpy(tmpdata + pres, data, size);
memcpy(tmpdata, pre, pres);
memcpy(tmpdata + pres + size, suf, sufs + 1);
xs_free(string);
*string = tmps;
string->size = size + pres + sufs;
}
return string;
}
```
- 就是區分為兩種狀況
- capacity 足夠的情形下,資料直接轉移
- capacity 不夠的情形下,由於 `xs_grow()` 資料不一定會轉移,所以直接除舊佈新,要了新的,釋放舊的<br><br>
## 改進方案
***TBC***<br><br><br>
# 實作 [strcpy](http://www.cplusplus.com/reference/cstring/strcpy/) 和 [strtok](http://www.cplusplus.com/reference/cstring/strtok/) 功能的函式
- `char *strcpy ( char *destination, const char *source);`
- 回傳 destination<br>
- `char *strtok ( char *str_parsed, const char *delimiters);`
- `str_parsed` 或是 `NULL` 沿用上一個 `str_parsed`
- 回傳開頭的 token<br>
`strcpy` 功能函式實作如下
```cpp
#define COUNTER_SIZE 10
struct Ref_Counter {
char *ref;
size_t count;
} ref_counter[COUNTER_SIZE];
static inline size_t xs_is_ref(const xs *x)
{
for (int i = 0; i < COUNTER_SIZE; ++i)
if (ref_counter[i].ref == xs_data(x))
return ref_counter[i].count;
return 0;
}
static inline void ref_add(char *str)
{
for (int i = 0; i < COUNTER_SIZE; ++i)
if (!ref_counter[i].count) {
ref_counter[i].ref = str;
ref_counter[i].count = 2;
break;
}
}
// i can be +1 or -1
static inline void ref_chg(const char *str, int i)
{
for (int i = 0; i < COUNTER_SIZE; ++i)
if (ref_counter[i].ref == str) {
ref_counter[i].count += i;
break;
}
}
```
```cpp
static inline xs *xs_free(xs *x)
{
if (xs_is_ptr(x)) {
if(xs_is_ref(x) > 1)
ref_chg(xs_data(x), -1);
else
free(xs_data(x));
}
return xs_newempty(x);
}
```
```cpp
void xs_copy(xs *dest, const xs *src)
{
size_t dest_size = xs_size(dest), src_size = xs_size(src);
char *dest_str = xs_data(dest), *src_str = xs_data(src);
xs_grow(dest, src_size);
// CoW if long enough
if (xs_is_ptr(src)) {
xs_free(dest);
dest->ptr = src_str;
if(xs_is_ref(src))
ref_chg(xs_data(src), 1);
else
ref_add(xs_data(src));
} else {
memcpy(dest_str, src, src_size + 1);
}
if (xs_is_ptr(dest))
dest->size = src_size;
else
dest->space_left = 15 - src_size;
}
```
- 由於 `xs_grow()` 會檢查是否需要額外擴充空間,空間大小直接交給它檢查
- 若 `src` 為長字串,`dest` 根據 `xs_grow(xs *, size_t)` 的實作,必有配置記憶體。釋放完了之後由於是長字串,直接以 CoW 的方式賦值,以節省記憶空間
- 為了搭配 CoW, 順便實作了 Reference Counting
- 理論上應該用 Linked List 來動態調整追蹤清單的大小,這裡為了簡單實作就直接使用 array
- 由於加上了 Reference Counting,`xs_free(xs *)` 也就一併更改
再來是 `strtok` 功能函式實作如下
```cpp
char *xs_token(xs *xs_parsed, const char *delimiters)
{
static xs *xs_default = NULL;
if (xs_parsed)
xs_default = xs_parsed;
if (!xs_default || !xs_size(xs_default))
return NULL;
char *str = xs_data(xs_default);
size_t str_size = strlen(str), del_size = strlen(delimiters);
uint8_t mask[32] = {0};
#define check_bit(byte) (mask[(uint8_t) byte / 8] & 1 << (uint8_t) byte % 8)
#define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8)
size_t begin_token, end_token;
// find token
for (int i = 0; i < del_size; ++i)
set_bit(delimiters[i]);
for (begin_token = 0; begin_token < str_size; ++begin_token)
if (!check_bit(str[begin_token]))
break;
for (end_token = begin_token; end_token < str_size; ++end_token)
if (check_bit(str[end_token]))
break;
end_token -= 1;
size_t token_size = end_token - begin_token + 1;
char *token = (char *) malloc(token_size + 1);
memcpy(token, str + begin_token, token_size);
token[ token_size ] = '\0';
// remove parsed section (include '\0')
for (int i = 0; i <= str_size - end_token; ++i)
str[i] = str[i + end_token + 1];
if (xs_is_ptr(xs_default))
xs_default->size -= end_token + 1;
else
xs_default->space_left += end_token + 1;
return token;
#undef check_bit
#undef set_bit
}
```
- 其中為了真正模仿 `strtok(NULL, delimiters)` 這種呼叫方式,多宣告了個 static 的 `xs_default` 在裡頭
- 尋找 token 的方式我沿用了 `xs_trim(xs *, const char *)` 中的 `set_bit(byte)` 和 `check_bit(byte)`
- 唯一不確定的點在於回傳的函式要回傳字串有兩種方案: 第一種是用 static 變數來存字串;第二種是用 malloc 出的的記憶體來存。有鑑於前者宣告時的空間總有不足的可能性,又不想直接浪費空間宣告到最大,所以都傾向於用後者。可是後者的缺點在於為了避免 memory leak 的狀況發生,還要額外再去釋放每一次的回傳值。
測試結果如下
```cpp
int main()
{
xs string = *xs_tmp("\n foobarbar \n\n\n");
xs_trim(&string, "\n ");
printf("%s : %2zu\n", xs_data(&string), xs_size(&string));
// testing xs_concat(xs *, xs *, xs *)
xs prefix = *xs_tmp("(((((("), suffix = *xs_tmp("))))))");
xs_concat(&string, &prefix, &suffix);
printf("%s : %2zu\n", xs_data(&string), xs_size(&string));
// testing xs_token(xs *, xs *)
printf("\nbefore prefix: %s\n", xs_data(&prefix));
xs_copy(&prefix, &string);
printf("after prefix: %s\n\n", xs_data(&prefix));
// testing xs_token(xs *, const char *)
char *temp = NULL;
temp = xs_token(&string, "r");
while (temp) {
printf("%s\n", temp);
if (temp) {
free(temp);
temp = NULL;
}
temp = xs_token(&string, "r");
}
}
```
```
foobarbar : 9
((((((foobarbar)))))) : 21
before prefix: ((((((
after prefix: ((((((foobarbar))))))
((((((fooba
ba
))))))
```
- 因為直接沿用的之前 `xs_concat(xs *, xs *, xs *)` 測試的變數,所以一併截錄出來
# 驗證空間效益
***TBC***
# Linux Kernel 中類似案例
我參考的是 Understanding The Linux Kernel 這本書中所提到 CoW 的內容。
最初的 Unix 系統中,fork() 所創建的 process 的過程相當複雜、耗資源:
- 替 child process 的 **Page Tables** 和 **Pages** 分配 **Page Frames**
- 初始化 child process 的 **Page Tables**
- 將 parent process 的 **Pages** 複製到 child process 相對應的 **Pages** 中
- 這邊順便幫自己複習一下
- **Pages Frames** 是 paging 機制下在 memory 上的單位,屬於 **physical pages**
- **Pages** 是虛擬化之後程式用來分配的 **virtual pages**
- **Page Table** 則是紀錄每個 process 所使用到 Pages 的清單
現代的 Linux kernels 採用 CoW 來分配 address space 給 child process。以下節錄出書中來自於 Linux kernel 2.6 版的程式碼
```c=
if (pte_present(entry)) {
if (write_access) {
if (!pte_write(entry))
return do_wp_page(mm, vma, address, pte, pmd, entry);
entry = pte_mkdirty(entry)
}
...
}
```
這是當 page fault exception 是由 *"access to a page present in memory"* 時所實行的 instructions。其中 [do_wp_page()](https://elixir.bootlin.com/linux/v2.6.31/source/mm/memory.c#L1947) 是在當 users 要去 write 一個 shared page 的時候,他會去讀取 page descriptor (以下摘錄一些 [do_wp_page()](https://elixir.bootlin.com/linux/v2.6.31/source/mm/memory.c#L1947) 的程式碼)
```c=1957
old_page = vm_normal_page(vma, address, orig_pte);
if (!old_page) {
/*
* VM_MIXEDMAP !pfn_valid() case
*
* We should not cow pages in a shared writeable mapping.
* Just mark the pages writable as we can't do any dirty
* accounting on raw pfn maps.
*/
if ((vma->vm_flags & (VM_WRITE|VM_SHARED)) ==
(VM_WRITE|VM_SHARED))
goto reuse;
goto gotten;
}
```
```c=2075
if (reuse) {
reuse:
flush_cache_page(vma, address, pte_pfn(orig_pte));
...
```
```c=2068
/*
* Ok, we need to copy. Oh, well..
*/
page_cache_get(old_page);
gotten:
pte_unmap_unlock(page_table, ptl);
if (unlikely(anon_vma_prepare(vma)))
goto oom;
VM_BUG_ON(old_page == ZERO_PAGE(0));
new_page = alloc_page_vma(GFP_HIGHUSER_MOVABLE, vma, address);
if (!new_page)
goto oom;
```
```c=2101
if (old_page) {
if (!PageAnon(old_page)) {
dec_mm_counter(mm, file_rss);
inc_mm_counter(mm, anon_rss);
}
} else
inc_mm_counter(mm, anon_rss);
```
透過上面以及書中的論述提及 (vm(a) 是指 virtual memory (area))
- 如果 process 是擁有者或是 `_count` field 為零或是 page 為 writable 則不會 CoW
- 反之如果對 process 而言是 write-protected,執行了 COW,`_count` 就會減少
- 第一部份 (1957 行) 是判斷是要複製與否
- 第二部份 (2075 行) 是不執行 CoW,直接寫入原 page
- 第三部份 (2068 行) 是採用 CoW,分配一份 page 出來
- 第四部份 (2101 行) 則是搭配 reference counting 的部份