# 2021q1 Homework2 (quiz2)
contributed by < [`toastcheng`](https://github.com/toastcheng) >
###### tags: `linux2021`
## 進度
- [x] Q1 - 解釋上述程式碼運作原理,指出改進空間並著手實作
- [x] Q1 - 將 `lib/list_sort.c` 的實作抽離為可單獨執行的使用層級應用程式
- [x] Q1 - 並設計效能評比的程式碼,說明 Linux 核心的 `lib/list_sort.c` 最佳化手法
- [ ] Q1 - 將 quiz1 提出的改進實作,和 Linux 核心的 `lib/list_sort.c` 進行效能比較,需要提出涵蓋足夠廣泛的 benchmark
- [x] Q2 - 解釋程式碼運作原理
- [ ] Q2 - 舉例說明 `is_power_of_2` 其作用和考量
- [ ] Q2 - 研讀 slab allocator,探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀
- [ ] Q3
- [ ] Q4 - 程式碼運作原理
- [ ] Q4 - 多執行緒環境下的 string interning
- [ ] Q4 - 分析 `chriso/intern` 效能表現
## Q1: Linux 的 `include/linux/list.h`
### 1. 解釋上述程式碼運作原理,指出改進空間並著手實作
#### `container_of`
```cpp
#define container_of(ptr, type, member) \
__extension__({ \
const __typeof__(((type *) 0)->member) *__pmember = (ptr); \
(type *) ((char *) __pmember - offsetof(type, member)); \
})
```
`container_of` 包含了 `__extension__` 和 `__typeof__` 的使用,使用 `__extension__()` 的效果是在這其中的 GCC extension 時編譯器不會跳出警告,因為開發者有明確表示自己要使用 extension (而這裡使用到的 extension 就是 `__typeof__`)。
`__typeof__()` 可以給出目標的型別,第 3 行中宣告了 `__pmember` 變數並將 `ptr` 的值指派給 `__pmember`。
`__pmember` 的型別則是 `__typeof__(((type *) 0)->member)`,也就是`list_head`。`type` 和 `member` 是傳給 `container_of` 的參數,分別是 container 的型別 (`listitem`),以及連結串列指標在該 container 中的名字 (`list`)。
```cpp
const __typeof__(((type *) 0)->member) *__pmember = (ptr);
```
再細看 `__typeof__(((type *) 0)->member)` 這個技巧,是先將 0 轉型為 container 的指標 (`listitem*`),再對 member (`list`) 取值,因此會得到 `list*` 型別,傳進`__typeof__` 便得到 `list*`。
相等於:
```cpp
const list_head *__pmember = (ptr);
```
最後再將 `ptr` 的記憶體位址用 `offsetof()` 減去其在 container 裡的偏移量,便可得到該 container 的位置。
```cpp
(type *) ((char *) __pmember - offsetof(type, member));
```
看到 `((type *) 0)->member` 的用法令我很吃驚,0 應該是空指標指向的位址,若真的對空指標取值會造成 segmentation fault,因此應該是 `__typeof__` 這個 extension 可以不去 dereference 便知道,這部分尚未深入研究,但是很快地自己試了一下:
```cpp
typedef struct {
int x;
} s;
int main() {
__typeof__(((s *) 0)->x) a = 1; // int a = 1;
printf("%d\n", a); // output: 1
return 0;
}
```
如果沒有真的 dereference,那這個指標似乎也不一定要是 0:
```cpp
int main() {
__typeof__(((s *) 5566)->x) a = 1; // int a = 1;
printf("%d\n", a); // output: 1
return 0;
}
```
同樣可以達到一樣的效果。
:::warning
TODO: 比較產生的組合語言列表
:notes: jserv
:::
:::info
試著比較組合語言列表,發現完全沒有不同:
```shell
$ gcc -o t1 -O0 typeof_test_1.c
$ gcc -o t2 -O0 typeof_test_2.c
$ objdump -d ./t1 > o1
$ objdump -d ./t2 > o2
$ diff o1 o2
2c2
< ./t1: file format elf64-x86-64
---
> ./t2: file format elf64-x86-64
```
```
0000000000001129 <main>:
1129: f3 0f 1e fa endbr64
112d: 55 push %rbp
112e: 48 89 e5 mov %rsp,%rbp
1131: c7 45 fc 01 00 00 00 movl $0x1,-0x4(%rbp)
1138: b8 00 00 00 00 mov $0x0,%eax
113d: 5d pop %rbp
113e: c3 retq
113f: 90 nop
```
一時不知該從何分析,後來想到用完全沒有 `__typeof__` 的程式觀察看看:
```cpp
typedef struct {
int x;
} s;
int main() {
int a = 1;
return 0;
}
```
```shell
$ gcc -o t3 -O0 typeof_test_3.c
$ objdump -d ./t3 > o3
$ diff o1 o3
2c2
< ./t1: file format elf64-x86-64
---
> ./t3: file format elf64-x86-64
```
也沒有任何組合語言上的不同,由此觀察 `__typeof__` 在使用上是應該是在 compile time 就完成判斷,而非 run time,與其相對的是許多程式語言中會有的 reflect 概念,在 runtime 時才去觀察一變數的型態。
:::
兜了一圈的感覺,思考為什麼需要這樣做?後來了解這樣能得到 ~~type safe~~ double evaluation 的效果,拿 `max` 來說明:
```cpp
#define max(a,b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a > _b ? _a : _b; })
```
如果使用下面的方式,看起來感覺更簡潔:
```cpp
#define max(a, b) (((a) > (b)) ? (a) : (b))
```
但稍微設計一下以下情境,就可以了解好處,`max_nonsafe` 中 increment 執行了兩次。
:::warning
正式術語為 double evaluation,可見 [MIN and MAX macro considered harmful](https://dustri.org/b/min-and-max-macro-considered-harmful.html)
:notes: jserv
:::
:::info
已修正!
:::
```cpp
#define max_safe(a,b) \
({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \
_a > _b ? _a : _b; })
#define max(a,b) \
((a) > (b)) ? (a) : (b)
int main() {
int a = 1, b = 2;
int c = 1, d = 2;
printf("safe: %d\n", max_safe(a++, b++));
printf("not safe: %d\n", max(c++, d++));
printf("a: %d, b: %d, c: %d, d: %d\n", a, b, c, d);
return 0;
}
```
> 預期輸出:
> ```
> safe: 2
> not safe: 3
> a: 2, b: 3, c: 2, d: 4
> ```
#### `list_del`
```cpp
static inline void list_del(struct list_head *node)
{
struct list_head *next = node->next, *prev = node->prev;
next->prev = prev; prev->next = next;
}
```
要移去一個節點不需要 `head`,只需要將該節點的前後相連即可。
#### `list_empty` 和 `list_is_singular`
```cpp
static inline int list_empty(const struct list_head *head)
{
return (head->next == head);
}
```
因為是 circular 的 linked list,要判斷 list 是否為空只要確認 `head` 的下一個是不是自己就行了。
```cpp
static inline int list_is_singular(const struct list_head *head)
{
return (!list_empty(head) && head->prev == head->next);
}
```
而判斷是否只有單獨一個節點 (i.e. size = 1) 的方式則是先確認 list 非空,再檢查 `head` 的前一個和後一個是否相同。
#### `list_splice_tail`
```cpp
static inline void list_splice_tail(struct list_head *list,
struct list_head *head)
{
struct list_head *head_last = head->prev;
struct list_head *list_first = list->next, *list_last = list->prev;
if (list_empty(list))
return;
head->prev = list_last;
list_last->next = head;
list_first->prev = head_last;
head_last->next = list_first;
}
```
將 `list` 接在 `head` 的尾端。
#### `list_cut_position`
```cpp
static inline void list_cut_position(struct list_head *head_to,
struct list_head *head_from,
struct list_head *node)
{
struct list_head *head_from_first = head_from->next;
if (list_empty(head_from))
return;
if (head_from == node) {
INIT_LIST_HEAD(head_to);
return;
}
head_from->next = node->next;
head_from->next->prev = head_from;
head_to->prev = node;
node->next = head_to;
head_to->next = head_from_first;
head_to->next->prev = head_to;
}
```
- 將 `head_to` 的開頭雙向地設成 `head_from` 的開頭, `head_to` 的結尾雙向地設成 `node`。
- 將 `head_from` 的開頭雙向地設成 `node` 的下一個節點,`head_from` 的結尾維持不動。
操作結束後,`head_to` 會是左半邊從開頭到 `node` 的 linked list,而 `head_from` 會是右半邊從 `node` 之後到結尾的 linked list。
#### `get_middle`
```cpp
static list_ele_t *get_middle(struct list_head *list)
{
struct list_head *fast = list->next, *slow;
list_for_each (slow, list) {
if (fast->next == list || fast->next->next == list)
break;
fast = fast->next->next;
}
return list_entry(/* TTT */ slow, list_ele_t, list);
}
```
用快慢兩個指標,走訪節點,`slow` 每次往前一個節點,`fast` 每次往前兩個節點。
當 `fast` 到達最後一個節點或者到達倒數第二個節點時終止迴圈,假設 linked list 有 N 個節點,此時 `slow` 會正好在:
- $N/2$ 如果 N 是偶數
- $\lfloor N/2 \rfloor +1$ 如果 N 是奇數
#### `list_merge_sort`
```cpp
void list_merge_sort(queue_t *q)
{
if (list_is_singular(&q->list))
return;
queue_t left;
struct list_head sorted;
INIT_LIST_HEAD(&left.list);
list_cut_position(&left.list, &q->list, /* MMM */ &get_middle(&q->list)->list);
list_merge_sort(&left);
list_merge_sort(q);
list_merge(&left.list, &q->list, &sorted);
INIT_LIST_HEAD(&q->list);
list_splice_tail(&sorted, &q->list);
}
```
`list_merge_sort` 很符合 high level 的 merge sort 演算法,首先利用 `get_middle` 找到位於中間(或兩個中間節點的左邊)的節點,再用 `list_cut_position` 將 linked list 劃分為左右兩個,接著遞迴地呼叫。接著 `list_merge` 將左右已分別排序好的 linked list merge。
#### `list_merge`
```cpp
static void list_merge(struct list_head *lhs,
struct list_head *rhs,
struct list_head *head)
{
INIT_LIST_HEAD(head);
if (list_empty(lhs)) {
list_splice_tail(lhs, head);
return;
}
if (list_empty(rhs)) {
list_splice_tail(rhs, head);
return;
}
while (!list_empty(lhs) && !list_empty(rhs)) {
char *lv = list_entry(lhs->next, list_ele_t, list)->value;
char *rv = list_entry(rhs->next, list_ele_t, list)->value;
struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next;
list_del(tmp);
list_add_tail(tmp, head);
}
list_splice_tail(list_empty(lhs) ? rhs : lhs, head);
}
```
真正發生 merge 的函式,執行時 `lhs` 和 `rhs`都必須是已排序好的 linked list,`list_merge_sort` 必須確保這一點。
將兩個 linked list 從第一個節點開始比較,較小的就接到 `head` 的尾巴,然後前進到下一個節點繼續比較。
其中有一個 linked list 走訪完所有節點後,把另外一個 linked list 直接接在 `head` 的尾端,完成 merge。
### 觀察可改進的地方
觀察資料結構 `list_ele_t` 會發現矛盾的事情,`list_head` 的功能是,只要將其嵌入任何 struct,就能創造出以該 struct 為節點的 linked list ,也能便利地使用 `list_head` 提供的函式。而該 struct 就不需要知道任何前後 struct 的資訊,例如 `next`,因此可以將 `list_ele_t` 中的 `next` 移除。
```cpp
typedef struct __element {
char *value;
struct __element *next; // it's redundant
struct list_head list;
} list_ele_t;
```
而 `queue_t` 中也有類似的矛盾:
```cpp
typedef struct {
list_ele_t *head; /* Linked list of elements */
list_ele_t *tail;
size_t size;
struct list_head list;
} queue_t;
```
只要有作為 linked list 開頭的 `list_head`,我們就能取得開頭、結尾等資訊,`queue_t` 的 `head`, `tail`, `list` 其實都不需要,但 `list_head` 沒有 `queue_t` 的 `size`, `q_free` 等功能,能不能將這些功能都實作在 `list_head` 全都要呢?
先將本來用到 `queue_t` 之處都直接換成 `list_head`,改寫對應的函式。
再來考量到如果 `list_head` 想要支援不同的 container,那 free 的邏輯應該由 container 負責實作,因此 `list_head` 由一個 macro `list_free`,讓 container 提供 `free_func` 來釋放記憶體。
```cpp
#define list_free(head, type, member, free_func) \
struct list_head *_node; \
list_for_each(_node, head) { \
free_func(list_entry(_node, type, member)); \
}
```
`list_ele_t` 的 `free_func`:
```cpp
void ele_free(list_ele_t *ele) {
free(ele->value);
free(ele);
}
```
用 valgrind 檢查,可以確認所有記憶體都有被釋放,但出現 `Invalid read of size 8`:
```shell
$ valgrind --leak-check=full ./t
...
==9985== Invalid read of size 8
==9985== at 0x109B18: main (in /data/week2/q1/t)
==9985== Address 0x56ac6b0 is 16 bytes inside a block of size 24 free'd
==9985== at 0x483CA3F: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==9985== by 0x10993C: ele_free (in /data/week2/q1/t)
==9985== by 0x109B10: main (in /data/week2/q1/t)
==9985== Block was alloc'd at
==9985== at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==9985== by 0x109871: list_insert_tail (in /data/week2/q1/t)
==9985== by 0x109A5F: main (in /data/week2/q1/t)
==9985==
==9985==
==9985== HEAP SUMMARY:
==9985== in use at exit: 0 bytes in 0 blocks
==9985== total heap usage: 281,483 allocs, 281,483 frees, 6,031,675 bytes allocated
==9985==
==9985== All heap blocks were freed -- no leaks are possible
==9985==
==9985== For lists of detected and suppressed errors, rerun with: -s
==9985== ERROR SUMMARY: 93827 errors from 1 contexts (suppressed: 0 from 0)
```
從 `Address 0x56ac6b0 is 16 bytes inside a block of size 24 free'd` 可以推敲 `16 bytes` 是兩個指標的大小,也就是 `list_head`,而 `24 bytes` 則應該是 `list_ele_t`,含有一個 `list_ele_t` 再加上一個 `char *`,共 24 bytes,也就是說程式中已經 free 了 `list_ele_t`,但還去讀在他其中的 `list_head`。
經一番研究後,發現是走訪節點和釋放記憶體的順序問題,因為使用 `list_for_each` 這個 macro 時會在最後將 `_node` 指標往前一步,但這時該指標所屬的記憶體空間早已被釋放了,因此 `_node->next` 這個行為就讀取到了已經釋放的記憶體區域。
將順序修改後的版本:
```cpp
#define list_free(head, type, member, free_func) \
struct list_head *_node = (head)->next; \
while (_node != (head)) { \
struct list_head *_tmp = _node; \
_node = _node->next; \
free_func(list_entry(_tmp, type, member)); \
}
```
順利執行:
```shell
==9991== HEAP SUMMARY:
==9991== in use at exit: 0 bytes in 0 blocks
==9991== total heap usage: 281,483 allocs, 281,483 frees, 6,031,675 bytes allocated
==9991==
==9991== All heap blocks were freed -- no leaks are possible
==9991==
==9991== For lists of detected and suppressed errors, rerun with: -s
==9991== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
詳細的程式碼在 [GitHub](https://github.com/ToastCheng/linux2021_w2_q1/blob/main/mergesort.c)。
:::info
小記:透過 `offsetof` 推算 `list_head` 所屬的 struct 的記憶體位址,簡單的 struct 只要將 `list_head` 加入 member ,頓時也有了像是 `extends`, `implements` 等高階物件導向語言才有的特性,更加理解物件導向是一種設計哲學這件事(某語言是/不是物件導向語言並不是絕對)。
:::
:::warning
TODO: 研讀 [Everything you never wanted to know about kobjects, ksets, and ktypes](https://www.kernel.org/doc/Documentation/kobject.txt),裡頭舉了生動的案例,說明 Linux 核心如何在 C 語言基礎之上,建構物件導向程式設計環境:
> the UIO code in drivers/uio/uio.c has a structure that defines the memory region associated with a uio device:
> ```cpp
> struct uio_map {
> struct kobject kobj;
> struct uio_mem *mem;
> };
> ```
>
> If you have a struct uio_map structure, finding its embedded kobject is just a matter of using the kobj member. Code that works with kobjects will often have the opposite problem, however: given a struct kobject pointer, what is the pointer to the containing structure? You must avoid tricks (such as assuming that the kobject is at the beginning of the structure) and, instead, use the container_of() macro, found in <linux/kernel.h>:
> ```cpp
> container_of(pointer, type, member)
> ```
>
> where:
> * "pointer" is the pointer to the embedded kobject,
> * "type" is the type of the containing structure, and
> * "member" is the name of the structure field to which "pointer" points.
>
> The return value from container_of() is a pointer to the corresponding container type. So, for example, a pointer "kp" to a struct kobject embedded *within* a struct uio_map could be converted to a pointer to the
*containing* uio_map structure with:
> ```cpp
> struct uio_map *u_map = container_of(kp, struct uio_map, kobj);
> ```
>
> For convenience, programmers often define a simple macro for "back-casting" kobject pointers to the containing type. Exactly this happens in the
earlier drivers/uio/uio.c, as you can see here:
> ```cpp
> struct uio_map {
> struct kobject kobj;
> struct uio_mem *mem;
> };
>
> #define to_map(map) container_of(map, struct uio_map, kobj)
> ```
>
> where the macro argument "map" is a pointer to the struct kobject in question. That macro is subsequently invoked with:
> ```cpp
> struct uio_map *map = to_map(kobj);
> ```
:notes: jserv
:::
### 2. 將 `lib/list_sort.c` 的實作抽離為可單獨執行的使用層級應用程式
#### `list_sort` 原始碼閱讀
先從註解開始,`list_sort` 支援兩種回傳值的 `cmp` 函式:
```cpp
/*
* This is compatible with two styles of @cmp function:
* - The traditional style which returns <0 / =0 / >0, or
* - Returning a boolean 0/1.
*/
```
並提供了一個 `cmp` 函式的範例:
```cpp
/*
* A good way to write a multi-word comparison is::
*
* if (a->high != b->high)
* return a->high > b->high;
* if (a->middle != b->middle)
* return a->middle > b->middle;
* return a->low > b->low;
*/
```
在 quick sort 裡最好的情況應該是每次都能用 pivot 對半切,達到較少的比較次數,merge sort 因為可以總是對半切,所以能保證 $O(n \log n)$,但這裡卻選擇 2:1 或更不平衡的切法:
```cpp
/*
* This mergesort is as eager as possible while always performing at least
* 2:1 balanced merges. Given two pending sublists of size 2^k, they are
* merged to a size-2^(k+1) list as soon as we have 2^k following elements.
*/
```
這裡的考量不只是演算法層面,還有硬體層面,之所以要切成不平衡的比例,是為了讓較小的 list 可以完整放進 L1 cache。
```cpp
/*
* Thus, it will avoid cache thrashing as long as 3*2^k elements can
* fit into the cache. Not quite as good as a fully-eager bottom-up
* mergesort, but it does use 0.2*n fewer comparisons, so is faster in
* the common case that everything fits into L1.
*/
```
而要怎麼 merge 是由變數 `count` 決定的,count 的第 k 個 bit 代表現存大小為 $2^k$ sublist 的數量,所以註解說的意思是每次多了一個大小 $2^k$ 的 sublist 時,因為一定是消耗了 $2^{k-1}$ 的 sublist 來 merge 成 $2^k$,所以要把 bit k-1 設成 0。再仔細想$2^{k-1}$ 也是從 $2^{k-2}$ merge 來的,所以註解才會說 "clear bits k-1 .. 0"。
(發現裡面有個 typo:`beautiully`)
```cpp
/*
* The merging is controlled by "count", the number of elements in the
* pending lists. This is beautiully simple code, but rather subtle.
*
* Each time we increment "count", we set one bit (bit k) and clear
* bits k-1 .. 0. Each time this happens (except the very first time
* for each bit, when count increments to 2^k), we merge two lists of
* size 2^k into one list of size 2^(k+1).
*
```
接著是偏向描述 merge 時的細節:
```cpp
/*
* This merge happens exactly when the count reaches an odd multiple of
* 2^k, which is when we have 2^k elements pending in smaller lists,
* so it's safe to merge away two lists of size 2^k.
*
* After this happens twice, we have created two lists of size 2^(k+1),
* which will be merged into a list of size 2^(k+2) before we create
* a third list of size 2^(k+1), so there are never more than two pending.
*
* The number of pending lists of size 2^k is determined by the
* state of bit k of "count" plus two extra pieces of information:
*
* - The state of bit k-1 (when k == 0, consider bit -1 always set), and
* - Whether the higher-order bits are zero or non-zero (i.e.
* is count >= 2^(k+1)).
*
* There are six states we distinguish. "x" represents some arbitrary
* bits, and "y" represents some arbitrary non-zero bits:
* 0: 00x: 0 pending of size 2^k; x pending of sizes < 2^k
* 1: 01x: 0 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k
* 2: x10x: 0 pending of size 2^k; 2^k + x pending of sizes < 2^k
* 3: x11x: 1 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k
* 4: y00x: 1 pending of size 2^k; 2^k + x pending of sizes < 2^k
* 5: y01x: 2 pending of size 2^k; 2^(k-1) + x pending of sizes < 2^k
* (merge and loop back to state 2)
*
* We gain lists of size 2^k in the 2->3 and 4->5 transitions (because
* bit k-1 is set while the more significant bits are non-zero) and
* merge them away in the 5->2 transition. Note in particular that just
* before the 5->2 transition, all lower-order bits are 11 (state 3),
* so there is one list of each smaller size.
*
* When we reach the end of the input, we merge all the pending
* lists, from smallest to largest. If you work through cases 2 to
* 5 above, you can see that the number of elements we merge with a list
* of size 2^k varies from 2^(k-1) (cases 3 and 5 when x == 0) to
* 2^(k+1) - 1 (second merge of case 5 when x == 2^(k-1) - 1).
*/
```
閱讀註解大概可以有個方向,但可以用 gdb 更快速的互動了解演算法的行為:
```shell
$ gcc list_sort.c -g -c
$ gcc list_sort.o person.c -g -o person
$ gdb ./person
```
在 `list_sort` 裡設置斷點:
```shell
(gdb) list list_sort.c:list_sort
192 */
193 __attribute__((nonnull(2,3)))
194 void list_sort(void *priv, struct list_head *head,
195 int (*cmp)(void *priv, struct list_head *a,
196 struct list_head *b))
197 {
198 struct list_head *list = head->next, *pending = NULL;
199 size_t count = 0; /* Count of pending */
200
201 if (list == head->prev) /* Zero or one elements */
(gdb) b list_sort.c:198
Breakpoint 1 at 0x1470: file list_sort.c, line 198.
```
突然發現在 `gdb` 中非常需要 `container_of` 的幫助,否則再怎麼觀察只能看到一堆 `list_head` 的位址:
```shell
(gdb) p pending
$7 = (struct list_head *) 0x55e22743a4b0
```
所幸,`gdb` 有提供 `python` 的工具可以使用,將這個 [pr](https://git.congatec.com/arm/imx6_kernel_4.1/-/blob/b0fecd8c570310c5041035a94eda7a4610402ace/scripts/gdb/linux/utils.py) 中的 `utils.py` 加入自己的目錄,然後再 `source` 就可以使用了:
```shell
(gdb) source utils/container_of.py
```
排序的狀況一目了然:
```shell
(gdb) p $container_of(pending, "person", "list")
$7 = (person *) 0x55e22743a490
(gdb) p $container_of(pending, "person", "list")->age
$7 = 12
```
演算法的流程很像在玩 [2048](https://play2048.co/) 這個遊戲。要能最大化的利用空間,就要不斷盡可能將可以合併的方塊合併。
#### Merge Sort 參考資料
[External Sorting, Merge Sort, Double Buffering, Replacement Sort](https://www.youtube.com/watch?v=YjFI9CJy6x0&ab_channel=ChaoXu)
想要參考不同變形的 merge sort,找到了這個課程影片,主題為資料庫,在講解如何排序太大以致無法放進 RAM 的資料,說明得非常清楚。雖然 `list_sort` 並非要處理巨量的資料,但同樣是想運用硬體特性將小量要進行 merge 的資料放進快速的記憶體。
- `list_sort`: 將資料從 memory 放進 L1
- 資料庫: 將資料從 disk 放進 memory
merge sort 的優點不只有 stable,在切割 subproblem 非常的彈性自由,可以照需求去 unbalanced 的切出大小兩個 subproblem,甚至一次切成 3, 4, ..., 500 個 subproblem 也是可以的。
> This is why we use merge sort, if we have two billions things that you want to merge, you can bring them in one thousand at a time.
還有一個彈性,還記得經典 merge sort 的 base case 是單一個值,因此必定是已排序,但其實 Base case 也可以是某一個已經排序的 list,且是先用別種演算法排過,如 quicksort。影片中舉例的 base case 是一個放得進記憶體的 page,裡面是已排序好的 list。
> What sort do you use (on base case) doesn't make a difference because it's in RAM. So use a really fast one, quicksort is probably the best one. Different database systems use different ones.
>
#### 應用情景
`list_sort.c` 中的 `list_sort` 可以排序含有 `list_head` 的 struct,這邊實作一個可以依照 struct 中不同 member 來排序的使用情景,考慮以下 struct:
```cpp
typedef struct {
char *first_name;
char *last_name;
int age;
int birth_month;
int birth_date;
struct list_head list;
} person;
```
並提供三種 `cmp` 函式,分別依照名字、年齡和生日排序:
```cpp
int sort_by_name(void *priv, struct list_head *a, struct list_head *b)
{
person *pa = list_entry(a, person, list);
person *pb = list_entry(b, person, list);
int diff = strcmp(pa->last_name, pb->last_name);
if (diff)
return diff;
diff = strcmp(pa->first_name, pb->first_name);
return diff;
}
int sort_by_age(void *priv, struct list_head *a, struct list_head *b)
{
person *pa = list_entry(a, person, list);
person *pb = list_entry(b, person, list);
return pa->age > pb->age;
}
int sort_by_birthday(void *priv, struct list_head *a, struct list_head *b)
{
person *pa = list_entry(a, person, list);
person *pb = list_entry(b, person, list);
if (pa->birth_month != pb->birth_month)
return pa->birth_month > pb->birth_month;
return pa->birth_date > pb->birth_date;
}
```
並以以下程式測試:
```cpp
int main() {
LIST_HEAD(head);
INIT_LIST_HEAD(&head);
gen_person(&head);
printf("sort by name: \n");
list_sort(NULL, &head, sort_by_name);
display(&head);
printf("\nsort by age: \n");
list_sort(NULL, &head, sort_by_age);
display(&head);
printf("\nsort by birthday: \n");
list_sort(NULL, &head, sort_by_birthday);
display(&head);
}
```
輸出:
```shell
sort by name:
張志明 age: 32 birthday: 11/21
徐淑娟 age: 58 birthday: 5/20
徐金龍 age: 64 birthday: 9/6
葉志偉 age: 100 birthday: 2/22
葉金龍 age: 1 birthday: 11/20
郭俊傑 age: 61 birthday: 2/2
郭美惠 age: 74 birthday: 2/12
郭美玲 age: 67 birthday: 8/1
陳淑貞 age: 68 birthday: 4/1
陳麗華 age: 12 birthday: 7/12
sort by age:
葉金龍 age: 1 birthday: 11/20
陳麗華 age: 12 birthday: 7/12
張志明 age: 32 birthday: 11/21
徐淑娟 age: 58 birthday: 5/20
郭俊傑 age: 61 birthday: 2/2
徐金龍 age: 64 birthday: 9/6
郭美玲 age: 67 birthday: 8/1
陳淑貞 age: 68 birthday: 4/1
郭美惠 age: 74 birthday: 2/12
葉志偉 age: 100 birthday: 2/22
sort by birthday:
郭俊傑 age: 61 birthday: 2/2
郭美惠 age: 74 birthday: 2/12
葉志偉 age: 100 birthday: 2/22
陳淑貞 age: 68 birthday: 4/1
徐淑娟 age: 58 birthday: 5/20
陳麗華 age: 12 birthday: 7/12
郭美玲 age: 67 birthday: 8/1
徐金龍 age: 64 birthday: 9/6
葉金龍 age: 1 birthday: 11/20
張志明 age: 32 birthday: 11/21
```
詳細程式碼可見 [GitHub](https://github.com/ToastCheng/linux2021_w2_q1/blob/main/person.c)
### 3. 並設計效能評比的程式碼,說明 Linux 核心的 `lib/list_sort.c` 最佳化手法
#### 效能評比
每個 input size 重複測試 1000 次,將平均繪成散點,陰影為一個標準差:

詳細程式碼可見 GitHub:
- [benchmark](https://github.com/ToastCheng/linux2021_w2_q1/blob/main/person_bm.c)
- [plot.py](https://github.com/ToastCheng/linux2021_w2_q1/blob/main/utils/plot.py)
#### 最佳化手法
##### null-terminated singly-linked list
在處理大部分的 sublist 時,不必維持本來的雙向及循環結構,省去維護的開銷,只要在最後讓其恢復成 circular doubly-linked list 即可。
##### branch prediction: `likely`, `unlikely`
在只有少數次或大多會進入的分支中加入 `unlikely` 及 `likely` 幫助判斷要 fetch 的分支,例如:
```cpp
if (likely(bits)) {
struct list_head *a = *tail, *b = a->prev;
a = merge(priv, (cmp_func)cmp, b, a);
/* Install the merged result in place of the inputs */
a->prev = b->prev;
*tail = a;
}
```
該分支只有在第一次,及 sublist 要 merge 成目前最大的 sublist ($2^{k+1}$)才會跳過,其他情況都會進入,因此加上 `likely`。
##### prevent tight loop: `cond_sched`
在 `merge_final` 中最後的迴圈解釋道,如果 merge 時兩個 sublist 非常不平衡,那便會在迴圈重複很多次,雖然這個階段只要不斷將剩餘的 sublist 中的節點加入 `tail`,但依然還是呼叫 `cmp`,原因是給使用者彈性,可以在 `cmp` 中呼叫 `cond_resched`,來讓其他 process 有機會可以搶佔。
```cpp
do {
/*
* If the merge is highly unbalanced (e.g. the input is
* already sorted), this loop may run many iterations.
* Continue callbacks to the client even though no
* element comparison is needed, so the client's cmp()
* routine can invoke cond_resched() periodically.
*/
// if (unlikely(!++count))
if (!++count)
cmp(priv, b, b);
b->prev = tail;
tail = b;
b = b->next;
} while (b);
```
在 `fs/ubifs/gc.c`、 `fs/ubifs/replay.c` 中可以看到使用範例:
```cpp
static int data_nodes_cmp(void *priv, struct list_head *a, struct list_head *b)
{
ino_t inuma, inumb;
struct ubifs_info *c = priv;
struct ubifs_scan_node *sa, *sb;
cond_resched(); // here.
if (a == b)
return 0;
sa = list_entry(a, struct ubifs_scan_node, list);
sb = list_entry(b, struct ubifs_scan_node, list);
ubifs_assert(c, key_type(c, &sa->key) == UBIFS_DATA_KEY);
ubifs_assert(c, key_type(c, &sb->key) == UBIFS_DATA_KEY);
ubifs_assert(c, sa->type == UBIFS_DATA_NODE);
ubifs_assert(c, sb->type == UBIFS_DATA_NODE);
inuma = key_inum(c, &sa->key);
inumb = key_inum(c, &sb->key);
if (inuma == inumb) {
unsigned int blka = key_block(c, &sa->key);
unsigned int blkb = key_block(c, &sb->key);
if (blka <= blkb)
return -1;
} else if (inuma <= inumb)
return -1;
return 1;
}
```
`// TODO: 說明 cond_resched`
#### 參考資料
[RCU, cond_resched(), and performance regressions](https://lwn.net/Articles/603252/)
### 4. 將 quiz1 提出的改進實作,和 Linux 核心的 `lib/list_sort.c` 進行效能比較,需要提出涵蓋足夠廣泛的 benchmark
---
## Q2: 回傳小於或等於 N 的 power-of-2
### 1. 解釋程式碼運作原理
```cpp
uint16_t func(uint16_t N) {
/* change all right side bits to 1 */
N |= N >> 1;
N |= N >> /* X */ 2;
N |= N >> /* Y */ 4;
N |= N >> /* Z */ 8;
return (N + 1) >> 1;
}
```
`func` 的作用在於找到最大的「等於或小於 `N` 的 power of 2」,假設 N 最高位非 0 的 bit 為從右數來第 x 個 bit ,其想法為
1. 將第 x - 1 至 0 的 bit 都設為 1,前四行為逐步「填滿」的過程。
3. 最後加 1 便能進位,得到僅有第 x + 1 bit 為 1 的數。
4. 再右移後便能得到欲求的數。
若以 N = 323 舉例:
```
N = 0000 0001 0100 0011
N |= N >> 1 = 0000 0001 1110 0011
N |= N >> 2 = 0000 0001 1111 1011
N |= N >> 4 = 0000 0001 1111 1111
N |= N >> 8 = 0000 0001 1111 1111
(N + 1) >> 1 = 0000 0001 0000 0000
```
`uin16_t` 的最大值為 65535,若發生了 overflow 那勢必會有不如函式預期的結果,但實際測試卻可以發現沒有問題:
```cpp
int main() {
uint16_t a = func(65535);
printf("Output: %u\n", a);
}
```
> 預期輸出:
> ```
> Output: 32768
> ```
透過 [Stackoverflow 的討論](https://stackoverflow.com/questions/55066827/why-cant-you-shift-a-uint16-t)找到了 integer promotion 這個術語,並在 [C Standard 文件](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf)找到詳細的說明。簡單來說,在 `uint16_t` 進行運算時,會將其當作 int 進行運算,接著再截斷 (truncate) 回原本的大小。
在文件中這不是 undefined behavior 且有明確定義的行為,而題目的情境至多也只會多用第 17 個 bit 來運算,並不會左移碰觸到 sign bit。
但若是想要計算 `uint32_t` 呢?這應該不在 integer promotion 的作用範疇,若輸入大於 $2^{32}$,在計算時會需要用到第 33 個 bit,依然會有 overflow 的問題。
若不使用 integer promotion,可以利用 `__builtin_clz` 來得到最高位的位置,再將其設為 1 也能得到一樣的數值,先測試從 0 到 65535:
```cpp
uint16_t func2(uint16_t N) {
int lead = __builtin_clz(N);
return 1 << (31 - lead);
}
int main() {
for (int _i = 0; _i <= 65535; _i++) {
uint16_t i = _i;
uint16_t a = func(i);
uint16_t b = func2(i);
if (a != b) {
printf("Assertion failed at %u: %u != %u\n", i, a, b);
}
}
}
```
> 預期輸出:
> ```
> Assertion failed at 0: 0 != 1
> ```
需要注意的是對 `__builtin_clz` 來說輸入 0 是 [undefined behavior](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html):
> Returns the number of leading 0-bits in x, starting at the most significant bit position. If x is 0, the result is undefined.
因此不應依賴 `__builtin_clz` 對 0 做運算。但考量函式要回傳比 N 還小的 power of 2,不論 0 或是 1 應該都不是預期的答案,最小的 power of N 應該是 1,因此 0 不應該在預期的輸入範圍內。
再來是 `uint32_t` 的版本,因為不佔用篇幅,只測試到 $2^{32}$ 後 10 個數,可以看到原本的函式 `func` 都有 overflow 的問題:
```cpp
uint32_t func(uint32_t N) {
/* change all right side bits to 1 */
N |= N >> 1;
N |= N >> /* X */ 2;
N |= N >> /* Y */ 4;
N |= N >> /* Z */ 8;
N |= N >> /* Z */ 16;
return (N + 1) >> 1;
}
uint32_t func2(uint32_t N) {
return 1 << (31 - __builtin_clz(N));
}
int main() {
uint64_t max = (uint64_t) INT_MAX + 10;
for (uint64_t _i = 1; _i <= max; _i++) {
uint32_t i = _i;
uint32_t a = func(i);
uint32_t b = func2(i);
if (a != b) {
printf("Assertion failed at %u: %u != %u\n", i, a, b);
}
}
}
```
> 預期輸出:
> ```
> Assertion failed at 2147483648: 0 != 2147483648
> Assertion failed at 2147483649: 0 != 2147483648
> Assertion failed at 2147483650: 0 != 2147483648
> Assertion failed at 2147483651: 0 != 2147483648
> Assertion failed at 2147483652: 0 != 2147483648
> Assertion failed at 2147483653: 0 != 2147483648
> Assertion failed at 2147483654: 0 != 2147483648
> Assertion failed at 2147483655: 0 != 2147483648
> Assertion failed at 2147483656: 0 != 2147483648
> Assertion failed at 2147483657: 0 != 2147483648
> ```
### 2. 舉例說明 `is_power_of_2` 其作用和考量
在 `pgtable.c` 可以看到 macro `is_power_of_2` 的實作:
```cpp
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
```
回傳值如其名,可以判斷一個數是否為 power of 2。做法很巧妙,只有 power of 2 減 1 會得到最高位非 0 bit 後全為 1 的數,與其做 AND 運算便能得到 0。
`include/linux/log2.h`
```cpp
/**
* __roundup_pow_of_two() - round up to nearest power of two
* @n: value to round up
*/
static inline __attribute__((const))
unsigned long __roundup_pow_of_two(unsigned long n)
{
return 1UL << fls_long(n - 1);
}
```
```cpp
/**
* __rounddown_pow_of_two() - round down to nearest power of two
* @n: value to round down
*/
static inline __attribute__((const))
unsigned long __rounddown_pow_of_two(unsigned long n)
{
return 1UL << (fls_long(n) - 1);
}
```
### 3. 研讀 slab allocator,探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀
:::success
延伸問題:
1. 解釋上述程式碼運作原理
2. 在 The Linux Kernel API 頁面搜尋 “power of 2”,可見 is_power_of_2,查閱 Linux 核心原始程式碼,舉例說明其作用和考量,特別是 round up/down power of 2
- 特別留意 __roundup_pow_of_two 和 __rounddown_pow_of_two 的實作機制
4. 研讀 slab allocator,探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀
:::
---
## Q3: bitcpy
### 1. 程式碼運作原理
例如:_read = 13
`read_lfs = 13 & 7 = 5` (color orchid in byte 1)
`read_rhs = 8 - 5 = 3` (color lightblue in byte 1)
```graphviz
digraph G {
node [shape=record]
{ rank = same; B0 B1 }
B0 [
shape=none
xlabel = "Byte 0"
label=<<table border="0" cellspacing="0">
<tr>
<td port="7" border="1" bgcolor="orchid">7</td>
<td port="6" border="1" bgcolor="orchid">6</td>
<td port="5" border="1" bgcolor="orchid">5</td>
<td port="4" border="1" bgcolor="orchid">4</td>
<td port="3" border="1" bgcolor="orchid">3</td>
<td port="2" border="1" bgcolor="orchid">2</td>
<td port="1" border="1" bgcolor="orchid">1</td>
<td port="0" border="1" bgcolor="orchid">0</td>
</tr>
</table>>
]
B1 [
shape=none
xlabel = "Byte 1"
label=<<table border="0" cellspacing="0">
<tr>
<td port="7" border="1" bgcolor="orchid">7</td>
<td port="6" border="1" bgcolor="orchid">6</td>
<td port="5" border="1" bgcolor="orchid">5</td>
<td port="4" border="1" bgcolor="orchid">4</td>
<td port="3" border="1" bgcolor="orchid">3</td>
<td port="2" border="1" bgcolor="lightblue">2</td>
<td port="1" border="1" bgcolor="lightblue">1</td>
<td port="0" border="1" bgcolor="lightblue">0</td>
</tr>
</table>>
]
// subgraph cluster0 {
// rankdir=LR
// node [shape=record]
// pencolor = "white"
// B0 [
// shape=none
// label=<<table border="0" cellspacing="0">
// <tr>
// <td port="7" border="1" bgcolor="orchid">7</td>
// <td port="6" border="1" bgcolor="orchid">6</td>
// <td port="5" border="1" bgcolor="orchid">5</td>
// <td port="4" border="1" bgcolor="orchid">4</td>
// <td port="3" border="1" bgcolor="orchid">3</td>
// <td port="2" border="1" bgcolor="orchid">2</td>
// <td port="1" border="1" bgcolor="orchid">1</td>
// <td port="0" border="1" bgcolor="orchid">0</td>
// </tr>
// </table>>
// ]
// label = "Byte 0"
// }
// subgraph cluster1 {
// node [shape=record]
// pencolor = "white"
// B1 [
// shape=none
// label=<<table border="0" cellspacing="0">
// <tr>
// <td port="7" border="1" bgcolor="orchid">7</td>
// <td port="6" border="1" bgcolor="orchid">6</td>
// <td port="5" border="1" bgcolor="orchid">5</td>
// <td port="4" border="1" bgcolor="orchid">4</td>
// <td port="3" border="1" bgcolor="lightblue">3</td>
// <td port="2" border="1" bgcolor="lightblue">2</td>
// <td port="1" border="1" bgcolor="lightblue">1</td>
// <td port="0" border="1" bgcolor="lightblue">0</td>
// </tr>
// </table>>
// ]
// label = "Byte 1"
// }
src [label="source" shape=plaintext]
// read [label="_read" shape=plaintext]
src->B1:7
// read->B1:4
// B1:7->B1:4 [
// label="read_lhs"
// dir="none"
// headport="nw"
// tailport="n"
// ]
}
```
:::success
延伸問題:
1. 解釋上述程式碼運作原理,並嘗試重寫為同樣功能但效率更高的實作
2. 在 Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境
:::
---
## Q4: String interning
### 1. 程式碼運作原理
先從 struct 觀察
```cpp
typedef struct __cstr_data {
char *cstr;
uint32_t hash_size;
uint16_t type;
uint16_t ref;
} * cstring;
typedef struct __cstr_buffer {
cstring str;
} cstr_buffer[1];
```
```cpp
struct __cstr_node {
char buffer[CSTR_INTERNING_SIZE];
struct __cstr_data str;
struct __cstr_node *next;
};
struct __cstr_pool {
struct __cstr_node node[INTERNING_POOL_SIZE];
};
struct __cstr_interning {
int lock;
int index;
unsigned size;
unsigned total;
struct __cstr_node **hash;
struct __cstr_pool *pool;
};
static struct __cstr_interning __cstr_ctx;
```
```cpp
#define CSTR_LOCK() \
({ \
while (__sync_lock_test_and_set(&(__cstr_ctx.lock), 1)) { \
} \
})
#define CSTR_UNLOCK() ({ __sync_lock_release(&(__cstr_ctx.lock)); })
```
`__sync_lock_test_and_set` 將 `__cstr_ctx.lock` 的值更新為 1,並回傳原本的值,因此若原本是 1,則會停留在 while 迴圈,達到 lock 的效果;而 `__sync_lock_release` 則是將 `__cstr_ctx.lock` 設回 0。
### 2. 多執行緒環境下的 string interning
### 3. 分析 `chriso/intern` 效能表現
:::success
延伸問題:
1. 解釋上述程式碼運作原理
2. 上述程式碼使用到 gcc [atomics builtins](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html),請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案
3. [chriso/intern](https://github.com/chriso/intern) 是個更好的 string interning 實作,請探討其特性和設計技巧,並透過內建的 benchmark 分析其效能表現
:::