---
###### tags: `sysprog2021q1`
---
# 2021q1 Homework3 (quiz3)
contributed by < `93i7xo2` >
> Source: [J07: quiz3](https://hackmd.io/@sysprog/linux2021-quiz3)
- [x] 解釋上述程式碼運作原理,並提出改進方案;
- [x] 以上述程式碼為基礎,提供字串複製 (應考慮到 CoW) 的函式實作;
- [x] 設計實驗,確認上述字串處理最佳化 (針對空間效率) 帶來的效益,應考慮到 [locality of reference](https://en.wikipedia.org/wiki/Locality_of_reference),善用 GDB 追蹤和分析,確認字串的確符合預期地配置於 stack 或 heap;
- [ ] 嘗試將 [quiz2](https://hackmd.io/@sysprog/linux2021-quiz2) 提到的 [string interning](https://en.wikipedia.org/wiki/String_interning) 機制整合,提出對空間高度最佳化的字串處理工具函式庫
- [x] 在 Linux 核心原始程式碼中,找出類似手法的 SSO 或 CoW 案例並解說;
## 運作原理
### 資料結構
### xs_tmp
建立指定字串的 `xs` 物件。
```cpp
/* Memory leaks happen if the string is too long but it is still useful for
* short strings.
*/
#define xs_tmp(x) \
((void) ((struct { \
_Static_assert(sizeof(x) <= MAX_STR_LEN, "it is too big"); \
int dummy; \
}){1}), \
xs_new(&xs_literal_empty(), x))
#define xs_literal_empty() \
(xs) { .space_left = 15 }
```
相關技巧可參照 [hankluo6](https://hackmd.io/@hankluo6/2021q1quiz3) 的解釋
### `xs_free`
```cpp=
static inline xs *xs_free(xs *x)
{
if (xs_is_ptr(x) && xs_dec_refcnt(x) <= 0)
free(x->ptr);
return xs_newempty(x);
}
```
釋放中、長字串在 heap 所佔有的空間,由於儲存的字串可被其他 `xs` 物件所參照,因此只有在 `refcnt = 0` 時候才釋放。
### `xs_cow_lazy_copy`
此函式目的是用來為 `refcnt >= 2` 的長字串分配新的記憶體空間,實現 copy on write。簡單說,修改 `xs_copy` 複製出後的物件前,必須呼叫此函式複製字串。
```cpp=
static bool xs_cow_lazy_copy(xs *x, char **data)
{
if (xs_get_refcnt(x) <= 1)
return false;
/* Lazy copy */
xs_dec_refcnt(x);
xs_allocate_data(x, x->size, 0);
if (data) {
memcpy(xs_data(x), *data, x->size);
/* Update the newly allocated pointer */
*data = xs_data(x);
}
return true;
}
```
為了方便說明假設 `x` 為 `refcnt = 2` 的長字串。從 `xs_trim` 的一小段呼叫開始:
```cpp
char *dataptr = xs_data(x), *orig = dataptr;
if (xs_cow_lazy_copy(x, &dataptr))
orig = dataptr;
```
```cpp=6
/* Lazy copy */
xs_dec_refcnt(x);
xs_allocate_data(x, x->size, 0);
if (data) {
memcpy(xs_data(x), *data, x->size);
/* Update the newly allocated pointer */
*data = xs_data(x);
}
```
`xs_dec_refcnt(x)` 將附屬在長字串前的 `refcnt` 減一,`xs_allocate_data` 分配新的空間。`x->size` 有可能低於 `LARGE_STRING_LEN`,`xs_allocate_data` 為其分配中字串的空間,所以後續進行舊字串資料複製時,以 `xs_data(x)` 取得地址。最後將 `*data` 指向新字串地址。
:::warning
原本 `*data` 所指向的舊空間沒有被釋放,依然被其他 `xs` 物件所使用。
:::
### `xs_allocate_data`
分配新空間給中、長字串。
```cpp=
static void xs_allocate_data(xs *x, size_t len, bool reallocate)
{
/* Medium string */
if (len < LARGE_STRING_LEN) {
x->ptr = reallocate ? realloc(x->ptr, (size_t) 1 << x->capacity)
: malloc((size_t) 1 << x->capacity);
return;
}
/* Large string */
x->is_large_string = 1;
/* The extra 4 bytes are used to store the reference count */
x->ptr = reallocate ? realloc(x->ptr, (size_t)(1 << x->capacity) + 4)
: malloc((size_t)(1 << x->capacity) + 4);
xs_set_refcnt(x, 1);
}
```
常與 `xs_new` 一併使用
```cpp=
xs *xs_new(xs *x, const void *p)
{
*x = xs_literal_empty();
size_t len = strlen(p) + 1;
if (len > 16) { // NNN
x->capacity = ilog2(len) + 1;
x->size = len - 1;
x->is_ptr = true;
xs_allocate_data(x, x->size, 0);
memcpy(xs_data(x), p, len);
} else {
memcpy(x->data, p, len);
x->space_left = 15 - (len - 1);
}
return x;
}
```
### `xs_trim`
```cpp=
xs *xs_trim(xs *x, const char *trimset)
{
if (!trimset[0])
return x;
char *dataptr = xs_data(x), *orig = dataptr;
if (xs_cow_lazy_copy(x, &dataptr))
orig = dataptr;
```
`dataptr` 與 `orig` 同樣作為 `char *` 指向真實字串存放地址 `xs_data(x)`。而 [Printable characters](https://en.wikipedia.org/wiki/ASCII#Printable_characters) 的範圍落在 `010 0000`~`111 1110`,這裡使用類似分頁的技巧,將前 4 bit 視為 index,後 3 bit 對應到 `1 << x` 的數值,例如 `111 1110 => mask[15] = 1 << 6`,利用
```cpp
#define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8)
```
將同一 index 數值匯集在同一位元組,預先建表。假設源字串長度 m,欲去除的字集長度 n,這樣的查詢方式相較使用時間複雜度為 $O(mn)$ 的雙層迴圈,只需 $O(n+2m)$。
```cpp=11
/* similar to strspn/strpbrk but it operates on binary data */
uint8_t mask[32] = {0};
#define check_bit(byte) (mask[(uint8_t) byte / 8] & 1 << (uint8_t) byte % 8) // CCC
#define set_bit(byte) (mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8) // SSS
size_t i, slen = xs_size(x), trimlen = strlen(trimset);
for (i = 0; i < trimlen; i++)
set_bit(trimset[i]);
for (i = 0; i < slen; i++)
if (!check_bit(dataptr[i]))
break;
for (; slen > 0; slen--)
if (!check_bit(dataptr[slen - 1]))
break;
dataptr += i;
slen -= i;
```
圖解 20~25 行操作
```
|----------- len -----------|
dataptr-> | | | | | | |c|c|c|c|c| | | |
|---- i ----|
20~22
|-------- len' -------|
23~25
```
後續分別對字串紀錄長度。
```cpp=29
/* reserved space as a buffer on the heap.
* Do not reallocate immediately. Instead, reuse it as possible.
* Do not shrink to in place if < 16 bytes.
*/
memmove(orig, dataptr, slen);
/* do not dirty memory unless it is needed */
if (orig[slen])
orig[slen] = 0;
if (xs_is_ptr(x))
x->size = slen;
else
x->space_left = 15 - slen;
return x;
#undef check_bit
#undef set_bit
}
```
### `xs_grow`
作為 `xs_concat` 被呼叫的函式,目的是為 `x` 物件預留足夠的記憶體空間。`is_ptr` 的設置要在判斷後。
```diff
xs *xs_grow(xs *x, size_t len)
{
char buf[16];
if (len <= xs_capacity(x))
return x;
/* Backup first */
if (!xs_is_ptr(x))
memcpy(buf, x->data, 16);
-- x->is_ptr = true;
x->capacity = ilog2(len) + 1;
if (xs_is_ptr(x)) {
xs_allocate_data(x, len, 1);
} else {
++ x->is_ptr = true;
xs_allocate_data(x, len, 0);
memcpy(xs_data(x), buf, 16);
}
return x;
}
```
### `xs_concat`
作為 `xs.c` 的核心,涵蓋大部分上述的函式呼叫。擴展 `string` 的空間使其足以容納下 `prefix` 和 `suffix`。
```cpp
xs *xs_concat(xs *string, const xs *prefix, const xs *suffix)
{
size_t pres = xs_size(prefix), sufs = xs_size(suffix),
size = xs_size(string), capacity = xs_capacity(string);
char *pre = xs_data(prefix), *suf = xs_data(suffix),
*data = xs_data(string);
xs_cow_lazy_copy(string, &data);
if (size + pres + sufs <= capacity) {
memmove(data + pres, data, size);
memcpy(data, pre, pres);
memcpy(data + pres + size, suf, sufs + 1);
if (xs_is_ptr(string))
string->size = size + pres + sufs;
else
string->space_left = 15 - (size + pres + sufs);
} else {
xs tmps = xs_literal_empty();
xs_grow(&tmps, size + pres + sufs);
char *tmpdata = xs_data(&tmps);
memcpy(tmpdata + pres, data, size);
memcpy(tmpdata, pre, pres);
memcpy(tmpdata + pres + size, suf, sufs + 1);
xs_free(string);
*string = tmps;
string->size = size + pres + sufs;
}
return string;
}
```
## CoW 函數實作
### `xs_copy`
從 `xs_tmp` 看到,`xs` 物件的其實在呼叫 `xs_new()` 前就存在於 caller 的 stack,這點可透過 gdb 觀察 `xs_new()` 返回的指標的指向和 stack frame 的關係。
```cpp
#define xs_tmp(x) \
((void)((struct { \
_Static_assert(sizeof(x) <= MAX_STR_LEN, "it is too big"); \
int dummy; \
}){1}), \
xs_new(&xs_literal_empty(), x))
```
同理,`xs_free()` 的功能也只是在重設 `xs` 物件的內容,並不會真的去執行 `free(x)`,返回的 `xs*` 也非指向 `malloc` 新分配出的記憶體空間,而是參數 `x` 原本所指向的位置。
```cpp
static inline xs *xs_free(xs *x)
{
if (xs_is_ptr(x) && xs_dec_refcnt(x) <= 0)
free(x->ptr);
return xs_newempty(x);
}
```
如果沒有考慮到上述情境,實作出的 `xs_copy` 可能在離開函式後依然存取 `xs_literal_empty()` 創立的物件,造成 Undefined behavior。用 `cppcheck --enable=all` 或是 `gcc -fsanitize=undefined` 均無法偵測到,而 `gcc -O3` 會做更深層的分析,勉強可作為靜態分析工具使用。
```cpp
xs *xs_copy(xs *x)
{
xs *tmp = &xs_literal_empty();
...
return tmp;
}
```
```bash
~$ gcc -O3 -g test.c
warning: function returns address of local variable [-Wreturn-local-addr]
| return tmp;
| ^~~
```
最後實作如下,仿造 `xs_tmp` 呼叫時傳入 `&xs_literal_empty()`。用 `xs_is_large_string` 來決定是否分配新的記憶體空間。
```cpp
#define xs_copy(x) \
__xs_copy(&xs_literal_empty(), x)
xs *__xs_copy(xs* tmp, xs *x)
{
if (!xs_is_ptr(x))
{
memcpy(xs_data(tmp), x, SHORT_STRING_LEN + 1);
}
else
{
tmp->capacity = x->capacity;
tmp->is_ptr = true;
tmp->size = x->size;
if (xs_is_large_string(x))
{
xs_inc_refcnt(x);
tmp->is_large_string = true;
tmp->ptr = x->ptr;
}
else
{
xs_allocate_data(tmp, x->size, 0);
memcpy(xs_data(tmp), xs_data(x), x->size);
}
}
return tmp;
}
```
- 測試程式碼
```cpp
#include "xs.h"
int main(int argc, char *argv[])
{
xs short_string = *xs_tmp("\n 0123456789 \n");
xs short_string_dup = *xs_copy(&short_string);
printf("[%p]\n", xs_data(&short_string));
printf("[%p]\n", xs_data(&short_string_dup));
xs large_string = *xs_tmp("");
for (int i = 0; i < 16; ++i)
xs_concat(&large_string, xs_tmp(""), xs_tmp(" 0123456789abcde"));
xs large_string_dup = *xs_copy(&large_string);
printf("[%p]\n", xs_data(&large_string));
printf("[%p]\n", xs_data(&large_string_dup));
return 0;
}
```
- 輸出
```
[0x7fffc7934dd0]
[0x7fffc7934df0]
[0x55dcb99ceba4]
[0x55dcb99ceba4]
```
## `xs` v.s. libstdc++
gcc >= 5 後的 libstdc++ string 引入 SSO

```bash
g++ -std=c++11 -stdlib=libstdc++ input.cpp
```
```cpp=
#include <string>
#include <iostream>
int main() {
std::string a = std::string(5, 'a');
// 32, 15, 5
std::cout << sizeof(a) << ", " << a.capacity() << ", " << a.size() << std::endl;
}
```
即便如此,相同 capacity 下 24 bytes 的 `xs` 或 `fbstring` 仍然在空間上較 gcc string 少 [33%](https://github.com/CppCon/CppCon2016/blob/master/Presentations/The%20strange%20details%20of%20std%20string%20at%20Facebook/The%20strange%20details%20of%20std%20string%20at%20Facebook%20-%20Nicholas%20Ormrod%20-%20CppCon%202016.pdf)。若將 `xs` 擴展為 32 bytes 使其容下以 10 進位表示的 64-bit 隨機數或 user ID (`fbstring` 的使用情境) 的短字串,與 gcc string 進行同樣的讀寫測試(隨機修改陣列中的字串),發現 `string` 耗時和 cache-misses 次數也較多。
- 測試環境
- Intel G6400 CPU, L1/L2/L3: 64KB/512KB/4MB
- [g++ 9.3.0: libstdc++ 6.0.28](https://gcc.gnu.org/onlinedocs/libstdc++/manual/abi.html)
- gcc 9.3.0
- [程式碼](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz3)
```cpp
/* xs.c */
#define SIZE 131072
xs string[SIZE];
for (int i = 0; i < 1000; ++i)
{
xs_data(&string[rand() % SIZE])[20] = 'A';
}
```
```cpp
/* xs.cpp */
#define SIZE 131072
std::string str[SIZE];
for (int i = 0; i < 1000; ++i)
{
str[rand() % SIZE][20] = 'A';
}
```
- 測試結果
```bash
make test
```
```
Performance counter stats for 'taskset -c 3 ./xs':
41,9891 cache-misses:u
0.099210182 seconds time elapsed
0.032779000 seconds user
0.065558000 seconds sys
Performance counter stats for 'taskset -c 3 ./string':
145,7098 cache-misses:u
0.359553643 seconds time elapsed
0.235599000 seconds user
0.123789000 seconds sys
```
在 95% 信心水準下,`xs` 與 `string` 隨機修改字串所需時間。

觀察兩者的空間使用率
- 在 `xs` 測試中,一開始 stack 上的 5MB 分給 `xs arr[131072]=4MB` 及 `uint32_t* tmp[131072]=1MB`,後續 heap (黃) 512kB 配給 `tmp`。

- 在 `string` 測試中,heap (橘) 分配給 `string` 為每個字串 31 bytes,佔用 3.9MB (131072*31 bytes)。

## 整合 string interning
回顧 `__cstr_data` 的最佳化方法,起初定義 4 種型態
1. PERMANENT
2. INTERNING
3. ONSTACK
4. Others
接著看合併時的型態轉換
- 若欲合併的字串在 stack (*ONSTACK*),且合併產生的字串長度低於 `CSTR_STACK_SIZE`,複製到 stack 上預先配置好的空間。
- 若不符上條件,合併產生的字串長度仍低於 `CSTR_INTERNING_SIZE`,才儲存於預先配置空間的 interned table,此時字串型態設為 *ONSTACK*,否則新分配空間的字串型態設為 *Others*
- *Others* 型態的字串實作 CoW。reference count 只有在使用 `cstr_grab` 複製的字串時才會增加,但缺少如 `xs_cow_lazy_copy` 的實作。
`xs` 的 large string 有 CoW 的輔助,short string 實作 SSO,適合整合進 interned table 的應是 medium string,因此在 medium string 底下再細分新增一類 interned string,所有跟改變 `capacity` 相關的程式都需要改寫。
:::info
TODO
:::
## Linux 核心內的 CoW 案例
以 `linux/fs/btrfs` 為例。[F2FS and BtrFS](https://hackmd.io/@nckuoverload/F2FSBtrFS#Btrfs) 指出,Btrfs 是以 B-Tree 為基礎,當插入新的節點時,先複製一份父節點,再指向修改後的新的節點,並使用 counter 來紀錄多少節點指向自己,這些 counter 存在於節點之外的 extent-tree。
如果以 `refcount` 下去搜尋,會發現不少結構裡都有此成員。以實作在 [commit 16cdc](https://github.com/torvalds/linux/commit/16cdcec736cd214350cdb591bf1091f8beedefa0) 的 `btrfs_delayed_node` 為例:
```cpp
struct btrfs_delayed_node {
...
refcount_t refs;
...
};
```
以 `btrfs_get_delayed_node` 取得 inode 中的 delayed_node 時會將 refs+1,離開時呼叫 `atomic_dec` 將 ref-1。
```cpp
static inline struct btrfs_delayed_node *btrfs_get_delayed_node(
struct inode *inode)
{
...
if (delayed_node)
atomic_inc(&delayed_node->refs);
return delayed_node;
}
void btrfs_get_delayed_items(struct inode *inode, struct list_head *ins_list,
struct list_head *del_list)
{
struct btrfs_delayed_node *delayed_node;
struct btrfs_delayed_item *item;
delayed_node = btrfs_get_delayed_node(inode);
if (!delayed_node)
return;
mutex_lock(&delayed_node->mutex);
...
mutex_unlock(&delayed_node->mutex);
/*
* This delayed node is still cached in the btrfs inode, so refs
* must be > 1 now, and we needn't check it is going to be freed
* or not.
*
* Besides that, this function is used to read dir, we do not
* insert/delete delayed items in this period. So we also needn't
* requeue or dequeue this delayed node.
*/
atomic_dec(&delayed_node->refs);
}
```
> This delayed node is still cached in the btrfs inode, so refs must be > 1 now, and we needn't check it is going to be freed or not.
從註解可猜到,當 ref 歸 0 時應該有對應的函式負責清理。下方 `__btrfs_release_delayed_node` 的 `atomic_dec_and_test` 就在目標變數減去 1 為 0 時回傳 true,接著刪除。
```cpp
static void __btrfs_release_delayed_node(
struct btrfs_delayed_node *delayed_node,
int mod)
{
...
if (atomic_dec_and_test(&delayed_node->refs)) {
struct btrfs_root *root = delayed_node->root;
spin_lock(&root->inode_lock);
if (atomic_read(&delayed_node->refs) == 0) {
radix_tree_delete(&root->delayed_nodes_tree,
delayed_node->inode_id);
kmem_cache_free(delayed_node_cache, delayed_node);
}
spin_unlock(&root->inode_lock);
}
}
```
[commit 16cdc](https://github.com/torvalds/linux/commit/16cdcec736cd214350cdb591bf1091f8beedefa0) commit message 說明 delayed node 的出現是為了解決 btrfs 頻繁對 b+ tree 插入 inode/directory name item/directory name index,於是使用兩個 list 來維護用來創建或刪除檔案的 delayed node,另一個用來維護等待被 worker 處理的 delayed nodes,在 delay nodes 高於閥值才會增加到 work queue。delayed node 內各有兩個 rbTree,分別管理即將插入或刪除到 b+ tree 的 directory name index 。採用 delayed node 的作法能改善效能。