# 2021q1 Homework2 (quiz2) contributed by < [`WayneLin1992`](https://github.com/WayneLin1992) > ###### tags: `linux2021` ## 延伸問題 第一題 - [x] 解釋程式碼運作原理,指出改進空間並著手實作 - [x] 實作出能抽離為可單獨執行 (standalone) 的使用層級應用程式 - [x] 設計效能評比的程式碼,說明 Linux 核心的 lib/list_sort.c 最佳化手法 ### `list_head` 結構 ```cpp struct list_head { struct list_head *prev, *next; }; ``` ### `INIT_LIST_HEAD` ```cpp static inline void INIT_LIST_HEAD(struct list_head *head) { head->next = head; head->prev = head; } ``` ```graphviz digraph list_add_node_t { node [shape=record]; struct1 [label="<f0> prev|<f1> next"]; struct1:f0->struct1:f1; struct1:f1->struct1:f0; } ``` ### `get_middle` ```cpp static list_ele_t *get_middle(struct list_head *list) { struct list_head *fast = list->next, *slow; list_for_each (slow, list) { if (COND1 || COND2) break; fast = fast->next->next; } return list_entry(TTT, list_ele_t, list); } ``` 使用龜兔賽跑技巧, fast 前進兩位, slow 前進一位,等到 fast 繞一圈,因此 COND1 = `fast->next = list` COND2 = ` fast->next->next == list`, slow 會在中間的位置,所以 TTT = `slow` 。 > TODO: 探討偶數和奇數個節點的 list 在上述演算法的行為 > :notes: jserv :::info 當總節點為奇數(共五個節點)時: 初始位置 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; node1 [label="<f0> prev|<f1> next"]; node2 [label="<f0> prev|<f1> next"]; node3 [label="<f0> prev|<f1> next"]; node4 [label="<f0> prev|<f1> next"]; node5 [label="<f0> prev|<f1> next"]; node1:f1->node2:f0; node2:f0->node1:f1; node2:f1->node3:f0; node3:f0->node2:f1; node3:f1->node4:f0; node4:f0->node3:f1; node4:f1->node5:f0; node5:f0->node4:f1; node5:f1->node1:f0; node1:f0->node5:f1; node [shape=circle]; slow->node1; fast->node2; } ``` 第二迴圈 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; node1 [label="<f0> prev|<f1> next"]; node2 [label="<f0> prev|<f1> next"]; node3 [label="<f0> prev|<f1> next"]; node4 [label="<f0> prev|<f1> next"]; node5 [label="<f0> prev|<f1> next"]; node1:f1->node2:f0; node2:f0->node1:f1; node2:f1->node3:f0; node3:f0->node2:f1; node3:f1->node4:f0; node4:f0->node3:f1; node4:f1->node5:f0; node5:f0->node4:f1; node5:f1->node1:f0; node1:f0->node5:f1; node [shape=circle]; slow->node2; fast->node4; } ``` 第三迴圈:因為 `fast->next->next` 會指到 list 所以會 break,而此時 slow 指向第三節點。 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; node1 [label="<f0> prev|<f1> next"]; node2 [label="<f0> prev|<f1> next"]; node3 [label="<f0> prev|<f1> next"]; node4 [label="<f0> prev|<f1> next"]; node5 [label="<f0> prev|<f1> next"]; node1:f1->node2:f0; node2:f0->node1:f1; node2:f1->node3:f0; node3:f0->node2:f1; node3:f1->node4:f0; node4:f0->node3:f1; node4:f1->node5:f0; node5:f0->node4:f1; node5:f1->node1:f0; node1:f0->node5:f1; node [shape=circle]; slow->node3; fast->node1; } ``` 當總節點為偶數(共四個節點)時 初始位置 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; node1 [label="<f0> prev|<f1> next"]; node2 [label="<f0> prev|<f1> next"]; node3 [label="<f0> prev|<f1> next"]; node4 [label="<f0> prev|<f1> next"]; node1:f1->node2:f0; node2:f0->node1:f1; node2:f1->node3:f0; node3:f0->node2:f1; node3:f1->node4:f0; node4:f0->node3:f1; node4:f1->node1:f0; node1:f0->node4:f1; node [shape=circle]; slow->node1; fast->node2; } ``` 第二迴圈 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; node1 [label="<f0> prev|<f1> next"]; node2 [label="<f0> prev|<f1> next"]; node3 [label="<f0> prev|<f1> next"]; node4 [label="<f0> prev|<f1> next"]; node1:f1->node2:f0; node2:f0->node1:f1; node2:f1->node3:f0; node3:f0->node2:f1; node3:f1->node4:f0; node4:f0->node3:f1; node4:f1->node1:f0; node1:f0->node4:f1; node [shape=circle]; slow->node2; fast->node4; } ``` 第三迴圈: 因為 `fast->next = list` 所以 break 此時由上圖知道 slow 指向第二節點的位置, 總結:由此可知 slow 是指向中間的位置。 ::: ### `merge_sort` ```cpp void list_merge_sort(queue_t *q) { if (list_is_singular(&q->list)) return; queue_t left; struct list_head sorted; INIT_LIST_HEAD(&left.list); list_cut_position(&left.list, &q->list, MMM); list_merge_sort(&left); list_merge_sort(q); list_merge(&left.list, &q->list, &sorted); INIT_LIST_HEAD(&q->list); list_splice_tail(&sorted, &q->list); } ``` 查看 `list_cut_position` ```cpp /** * list_cut_position() - Move beginning of a list to another list * @head_to: pointer to the head of the list which receives nodes * @head_from: pointer to the head of the list * @node: pointer to the node in which defines the cutting point */ static inline void list_cut_position(struct list_head *head_to, struct list_head *head_from, struct list_head *node) { struct list_head *head_from_first = head_from->next; if (list_empty(head_from)) return; if (head_from == node) { INIT_LIST_HEAD(head_to); return; } head_from->next = node->next; head_from->next->prev = head_from; head_to->prev = node; node->next = head_to; head_to->next = head_from_first; head_to->next->prev = head_to; } ``` 可以看出來 MMM 的位置是 `struct list_head *node` 題目是要 merge sort 實作,需要的是 `get_middle` 中 slow 的位置,`&get_middle(&q->list)` 因為 `get_middle` 回傳 type 為 `list_ele_t` ,但需要的是 `list_head` 所以 `&get_middle(&q->list)->list` ### `assert` 由 C99 規格書 7.2章,講解 `assert.h` ,允許寫狀態診斷訊息 `assert(validate(head));` 可以由 `validate( )` 看出此 function 為判斷,是否有從小到大依序排列,假如沒有就會出現下行的訊息。 ```shell $ ./doublylinkedlist doublylinkedlist.c:147: main: Assertion `validate(head)' failed. ```` ### 改進空間並著手實作 改善空間,事實上 `queue_t` 完全是多餘的, `list_ele_t` 中 `next` 也可以捨去 ```cpp typedef struct __element { char *value; struct list_head list; } list_ele_t; ``` 其中最需要注意的就是 free 部分 ```cpp static void list_free(struct list_head *list) { if (!list) return; struct list_head *current = list; while (current != list) { struct list_head *tmp = current; current = current->next; free(list_entry(tmp, list_ele_t, list)->value); free(list_entry(tmp, list_ele_t, list)); } free(list_entry(current, list_ele_t, list)->value); free(list_entry(current, list_ele_t, list)); } ``` 要用 container_of 知道 struct 起始位置,才能 free 掉。 ```cpp void list_show(struct list_head *list) { struct list_head *node; list_for_each(node, list) { printf("%s", list_entry(node, list_ele_t, list)->value); printf("->"); } } ``` :::warning 經過這幾次作業的經驗得知,涉及到 C 語言結構體的記憶體釋放,往往要在每個層面都相當熟悉結構體成員的組成,否則就會造成各式記憶體操作的錯誤。我曾經撰寫一個小程式 [nalloc](https://github.com/jserv/nalloc),在 C 語言標準函式庫的 malloc 和 free 之上建構一層包裝,達到 structure aware memory allocator 的作用。 舉例來說,你可以這樣使用 [nalloc](https://github.com/jserv/nalloc): ```cpp struct matrix { size_t rows, cols; int **data; }; struct matrix *matrix_new(size_t rows, size_t cols) { struct matrix *m = ncalloc(sizeof(*m), NULL); m->rows = rows; m->cols = cols; m->data = ncalloc(rows * sizeof(*m->data), m); for (size_t i = 0; i < rows; i++) m->data[i] = nalloc(cols * sizeof(**m->data), m->data); return m; } void matrix_delete(struct matrix *m) { nfree(m); } ``` 注意在 `matrix_delete` 函式中,你只需要呼叫一次 `nfree`,即可將 `matrix` 結構體內部的 data 成員和結構體本身釋放。[nalloc](https://github.com/jserv/nalloc) 並未增加太多成本,你可一併思考這樣的實作是如何達成。 :notes: jserv ::: :::info ::: 還可以實做出 show 看到實際排序後的結果。 ### 將 Linux 核心的 [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) 改為可獨立執行的程式碼 :::info [lib/list_sort.c](https://github.com/torvalds/linux/blob/master/lib/list_sort.c) ```cpp typedef int __attribute__((nonnull(2,3))) (*cmp_func)(void *, struct list_head const *, struct list_head const *); ``` `__attribute__((nonnull(2,3)))` 代表2和3參數不能空。 `@cmp_func: pointer to comparison function` ```cpp * A good way to write a multi-word comparison is:: * * if (a->high != b->high) * return a->high > b->high; * if (a->middle != b->middle) * return a->middle > b->middle; * return a->low > b->low; ``` 與傳統不同的`cmp`方式。 ::: 將 value 轉成 int 使它能讓 random 數值寫入,並且還需要將程式轉成 user level 所能執行達到 standalone 的目的。 - [ ] dlist.h ```cpp typedef struct __element { int value; struct list_head list; } list_ele_t; ``` ```cpp static list_ele_t *get_middle(struct list_head *list) { struct list_head *fast = list->next, *slow; list_for_each (slow, list) { if (fast->next == list || fast->next->next == list) break; fast = fast->next->next; } return list_entry(slow, list_ele_t, list); } ``` - [ ] dlist_sort.c ```cpp int main() { int count = 20; list_ele_t *head = head_new(); srand(time(NULL)); while (count--) list_insert_head(head, rand() % 1024); list_merge_sort(head); list_show(&head->list); assert(validate(head)); list_free(&head->list); } ``` ```shell 39->127->164->291->355->402->409->434->437->437->447->497->638->719->742->838->922->942->956->966-> ``` ![](https://i.imgur.com/0j9ENTu.png) 將 quiz1 的 quick sort 及 non recursive quick sort 一起比較後,如下圖 ![](https://i.imgur.com/jTvvMfY.png) :::info 非遞迴 優點:能降低 cache missing 的程度,遞迴 優點:程式好寫,由下表可以看出非遞迴比遞迴的 cache missing 低很多, quick sort 的時間複雜度,快的時候能到 O(nlogn) 慢的時候到 O(n^2^),為了降低 quick sort 的時間複雜不確定性,將 pivot 能夠盡量的在中間值,能夠複雜度可以降到 O(nlogn)。 但經過我重新思考 quiz1 的改寫,發現到我有修改到 pivot 的位置,導致非遞迴 quick sort 時間複雜度較差。 ![](https://i.imgur.com/JgpC6JX.png) 在測驗一次 ![](https://i.imgur.com/3BqtI9R.png) 將 linux list_sort standalone 來比較 ![](https://i.imgur.com/sWnyT59.png) ::: 開啟 perf 權限 ```shell $ sudo sh -c " echo -1 > /proc/sys/kernel/perf_event_paranoid" $ perf stat --repeat 6 -e cache-misses,cache-references,instructions,cycles,L1-dcache-load-misses ./testbenchm ``` - [ ] merge sort ```shell Performance counter stats for './testbench_1' (6 runs): 16,980,216 cache-misses # 12.738 % of all cache refs ( +- 1.61% ) 133,304,604 cache-references ( +- 0.50% ) 11,536,861,274 instructions # 1.93 insn per cycle ( +- 0.00% ) 5,981,082,962 cycles ( +- 0.54% ) 85,213,334 L1-dcache-load-misses ( +- 0.35% ) 1.44124 +- 0.00892 seconds time elapsed ( +- 0.62% ) ``` - [ ] quick sort ```shell Performance counter stats for './testbenchq' (6 runs): 62,616,374 cache-misses # 32.664 % of all cache refs ( +- 1.02% ) 191,699,808 cache-references ( +- 0.70% ) 11,955,203,545 instructions # 1.00 insn per cycle ( +- 0.07% ) 11,931,402,273 cycles ( +- 2.22% ) 136,257,022 L1-dcache-load-misses ( +- 0.62% ) 2.8833 +- 0.0658 seconds time elapsed ( +- 2.28% ) ``` - [ ] quick sort non recursive ```shell Performance counter stats for './testbenchqn' (6 runs): 2,465,038 cache-misses # 10.800 % of all cache refs ( +- 1.36% ) 22,825,246 cache-references ( +- 0.21% ) 4,794,616,891 instructions # 1.41 insn per cycle ( +- 0.03% ) 3,406,350,521 cycles ( +- 0.15% ) 12,775,396 L1-dcache-load-misses ( +- 0.11% ) 0.83640 +- 0.00487 seconds time elapsed ( +- 0.58% ) ``` - [ ] linux kernel merge sort ```shell Performance counter stats for './list_sort' (6 runs): 16,893,343 cache-misses # 16.097 % of all cache refs ( +- 0.25% ) 104,949,775 cache-references ( +- 0.19% ) 5,341,908,031 instructions # 1.60 insn per cycle ( +- 0.00% ) 3,340,490,601 cycles ( +- 0.83% ) 68,921,142 L1-dcache-load-misses ( +- 0.18% ) 0.8189 +- 0.0103 seconds time elapsed ( +- 1.26% ) ``` :::info linux kernel merge sort 和 merge sort 比較 linux kernel merge sort 在 cache reference 和 執行時間上 都較好的表現。 quick sort 和 quick sort non recursive 比較 quick sort non recursive 在 cache reference, CPI和執行時間 都有更好的表現。 linux kernel merge sort 和 quick sort non recursive 比較 linux kernel merge sort 在 CPI 及 執行時間好一點。 其中 quick sort non recursive 有比較好的 cache misses 表現,在 privot 及 non recursive 優化的效果是巨大的。 ::: :::info TODO: ::: --- ## 延伸問題 第二題 - [x] 解釋題目程式碼運作原理 - [x] 研讀 Linux "power of 2" 相關,並說明其中的作用,及考量 - [x] 研讀 slab allocator,探索 Linux 核心的 slab 實作,並找出運用 power-of-2 的程式碼。 ### 解釋題目程式碼運作原理 ```cpp uint16_t func(uint16_t N) { N = N | N >> 1; N = N | N >> 2; N = N | N >> 4; N = N | N >> 8; return (N+1) >> 1; } ``` `func(21)` N = 10101~2~ 第一行 `10101 | 01010 = 11111` 回傳為 `(N+1) >> 1` 時 N = 10000~2~ = 16~10~ ### `is_power_of_2` ```cpp /** * is_power_of_2() - check if a value is a power of two * @n: the value to check * * Determine whether some value is a power of two, where zero is * *not* considered a power of two. * Return: true if @n is a power of 2, otherwise false. */ static inline __attribute__((const)) bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); } ``` 輸入參數 `type` 為 `unsigned long` 代表不考慮有號的情況,將實際數值帶入,令 n = 16~10~ = 10000~2~ 代入後 `10000 & 01111 = 00000` 故得 n is power of 2。 將非 2 的次方,代入時,令 n = 13~10~ = 1101~2~ 代入後`1101 & 1100 = 1100` 得到非零解,所以 n is not power of 2。 ### `roundup_pow_of_two` ```cpp /** * __roundup_pow_of_two() - round up to nearest power of two * @n: value to round up */ static inline __attribute__((const)) unsigned long __roundup_pow_of_two(unsigned long n) { return 1UL << fls_long(n - 1); } ``` 有 `fls_long` 觀察一下 ```cpp static inline unsigned fls_long(unsigned long l) { if (sizeof(l) == 4) return fls(l); return fls64(l); } ``` `fls64` 及 `fls` find last set 找最後一個有值 bit 的位置,`sizeof(l)` 判斷 `unsign long` 因為 `long` 有 4 bytes (Windows) 也有 8 bytes (Linux)。 :::warning TODO: 許多現代微處理器提供 [Find first set](https://en.wikipedia.org/wiki/Find_first_set) 指令,找出 Linux 核心原始程式碼對應的硬體實作定義,如 x86_64 和 aarch64。 :notes: jserv ::: :::info aarch64: `clz` [Count Leading Zeros](https://developer.arm.com/documentation/dui0489/c/arm-and-thumb-instructions/general-data-processing-instructions/clz) `clz`的操作 e.g. `clz(0001010100011011)` = 3,從MSB到最後1出現零的數量。 x86: `TZCNT` [Counts the number of trailing least significant zero bits](https://www.felixcloutier.com/x86/tzcnt) e.g. `tzcnt(0000 0000 0000 0000 1000 0000 0000 1000)` = 3 ,從 LSB 到1出現零的數量。 `ffs = tzcnt + 1` Linux Armv5+ 為例 ```cpp /* * ffs() returns zero if the input was zero, otherwise returns the bit * position of the first set bit, where the LSB is 1 and MSB is 32. */ static inline int ffs(int x) { return fls(x & -x); } ``` e.g. `ffs(5403)` = `fls(0001010100011011 & 1110101011100101)` = `fls(0000000000000001)` = 1 從 LSB 開始計算到第一個 1 出現的位置。 ```cpp /* * fls() returns zero if the input is zero, otherwise returns the bit * position of the last set bit, where the LSB is 1 and MSB is 32. */ static inline int fls(int x) { if (__builtin_constant_p(x)) return constant_fls(x); return 32 - __clz(x); } ``` e.g. `fls(0001010100011011)` = 16 - 3 = 13 此為 16 bit 所以用 16 bit 相減。 從 LSB 開始計算到最後 1 的位置。 ```cpp /* * On ARMv5 and above those functions can be implemented around the * clz instruction for much better code efficiency. __clz returns * the number of leading zeros, zero input will return 32, and * 0x80000000 will return 0. */ static inline unsigned int __clz(unsigned int x) { unsigned int ret; asm("clz\t%0, %1" : "=r" (ret) : "r" (x)); return ret; } ``` `__clz` 為比 clz 更有效率的實作 e.g. `__clz(0x80000000)` = 0 。 延伸閱讀: * [clz 報告](https://hackmd.io/@3xOSPTI6QMGdj6jgMMe08w/Bk-uxCYxz) ::: :::warning TODO: 探討 Linux 核心原始程式碼中,為何會有 `clz` 和 `__clz` 這樣貌似同名但實質不同用法的函式,從而分析 Linux 跨平台支援的手法 :notes: jserv ::: :::info `clz` 在 arm 中的定義 `CLZ{cond} Rd, Rm` 其中 Rm operand register, Rd destination register。 `__clz` `__` 在 linux 代表實際實作,而 `clz` 由個硬體進行提供。 `clz` 在 MIPS 中的定義 `CLZ rd, rs` 其中 rs source register, rd destination register。 參考資料: [MIPS](https://www.cs.cornell.edu/courses/cs3410/2008fa/MIPS_Vol2.pdf) [arm __clz complier](https://developer.arm.com/documentation/dui0491/i/Compiler-specific-Features/--clz-intrinsic) [arm clz](https://www.keil.com/support/man/docs/armasm/armasm_dom1361289868426.htm) ::: `1UL` 為 unsigned long int 裡面值為 1 ,令 n = 10~10~ = 01010~2~ 代入得到 `fls_long(n-1)` 得到 4 也就是 `1UL << 4` = 10000~2~ = 16 達到 roundup the power of 2 無條件進位的目的。 :::info 其中`fls_long(n-1)`的 n-1 使原本就是 2 的次方的數值,保持一致 ::: 由此可以推測出 rounddown 只需要修改成 `fls_long(n)-1` 就成無條件刪去 ### slab allocator 我們需要一種配置記憶體的方式,來管理比 page 還小的記憶體,這就是 slab allocator 配置的大小將會以 power of two 來配置 (如:要 10 他就會配置 16)。 slab 配置方式 `kmalloc`: 負責配置比 page 還小的記憶體空間。(最小為 32 bytes 最大為 32MB) [/linux/slab.h](https://elixir.bootlin.com/linux/v4.8/source/include/linux/slab.h#L177) ```cpp /* * Figure out which kmalloc slab an allocation of a certain size * belongs to. * 0 = zero alloc * 1 = 65 .. 96 bytes * 2 = 129 .. 192 bytes * n = 2^(n-1)+1 .. 2^n */ static __always_inline int kmalloc_index(size_t size) { if (!size) return 0; if (size <= KMALLOC_MIN_SIZE) return KMALLOC_SHIFT_LOW; if (KMALLOC_MIN_SIZE <= 32 && size > 64 && size <= 96) return 1; if (KMALLOC_MIN_SIZE <= 64 && size > 128 && size <= 192) return 2; if (size <= 8) return 3; if (size <= 16) return 4; if (size <= 32) return 5; if (size <= 64) return 6; if (size <= 128) return 7; if (size <= 256) return 8; if (size <= 512) return 9; if (size <= 1024) return 10; if (size <= 2 * 1024) return 11; if (size <= 4 * 1024) return 12; if (size <= 8 * 1024) return 13; if (size <= 16 * 1024) return 14; if (size <= 32 * 1024) return 15; if (size <= 64 * 1024) return 16; if (size <= 128 * 1024) return 17; if (size <= 256 * 1024) return 18; if (size <= 512 * 1024) return 19; if (size <= 1024 * 1024) return 20; if (size <= 2 * 1024 * 1024) return 21; if (size <= 4 * 1024 * 1024) return 22; if (size <= 8 * 1024 * 1024) return 23; if (size <= 16 * 1024 * 1024) return 24; if (size <= 32 * 1024 * 1024) return 25; if (size <= 64 * 1024 * 1024) return 26; BUG(); /* Will never be reached. Needed because the compiler may complain */ return -1; } #endif /* !CONFIG_SLOB */ ``` :::info page 的大小並不會固定以 4KB 呈現,會因為不同的結構有所不同 [aarch64 in linux](http://lxr.linux.no/linux+v3.14.3/arch/arm64/include/asm/page.h#L22) 有 16KB, [IA64 in linux](http://lxr.linux.no/linux+v3.14.3/arch/ia64/include/asm/page.h#L29) 有 4KB, 8KB, 16KB, 64KB 不同。 參考資料: [Wikipedia: Page](https://en.wikipedia.org/wiki/Page_(computer_memory)) ::: :::info [slab 記憶體配置](https://events.static.linuxfound.org/sites/events/files/slides/slaballocators.pdf): cache friendly and benchmark friendly 目的: page 記憶體配置由 page allocator,但很多時候 不需要 page 那麼大的空間,這時就可以使用 slab allocator 配置 page 以下的記憶體空間,而且 cache 對 slab 的配置是很敏感的,而且 slab 所存入將是 object ,如下圖所示 ![](https://i.imgur.com/3ntlcVL.jpg) 操作: `static struct kmem_cache` 存指向下一個 kmem_cache 的指標, object 大小。 `struct kmem_cache_node *node[MAN_NUMNODES]` 其中將有 slab 的狀態,狀態可以分三種 full, partial, empty 代表 slab 的空間。 `struct array_cache __percpu *cpu_cache` 避免找空隙時需要 O(n) 所以會在 array 中放入空隙的 index 方便搜索空隙。 `void *freelist` page 空隙的位置。 `void *s_mem` 指向 page 第一個 object 。 ![](https://i.imgur.com/7eeIW3e.png) NUMA (Non-uniform memory access): 在 SMP (對稱多處理器)時,要使用單一 BUS 向記憶體提取資料,而處理器處理資料的速度高於記憶體給資料速度,處理器會產生 "starved for data" 的情況,所以 NUMA 就希望每個處理器,都能有個 BUS 連接記憶體,避免壅塞的情形,所以延伸出的做法,每個處理器都有屬於自己的本地記憶體 ( local memory),這裡講的本地記憶體,是只靠近處理器的(記憶體包括: cache 、 memory) ![](https://i.imgur.com/ViA5XvL.png) NUMA 對 slab 的影響: 在 NUMA 下除了本地記憶體還可以透過詢問其他處理器使遠方記憶體可以進行配置,使 slab 在尋找 slabs free 變得緩慢,所以在 `kmem_cache_node` 中增加 partial list, full list, empty list,使得 slab allocator 效率整體上升 5% ``` 修正前 w/o patch Tasks jobs/min jti jobs/min/task real cpu 1 485.36 100 485.3640 11.99 1.91 Sat Apr 30 14:01:51 2005 100 26582.63 88 265.8263 21.89 144.96 Sat Apr 30 14:02:14 2005 200 29866.83 81 149.3342 38.97 286.08 Sat Apr 30 14:02:53 2005 300 33127.16 78 110.4239 52.71 426.54 Sat Apr 30 14:03:46 2005 400 34889.47 80 87.2237 66.72 568.90 Sat Apr 30 14:04:53 2005 500 35654.34 76 71.3087 81.62 714.55 Sat Apr 30 14:06:15 2005 600 36460.83 75 60.7681 95.77 853.42 Sat Apr 30 14:07:51 2005 700 35957.00 75 51.3671 113.30 990.67 Sat Apr 30 14:09:45 2005 800 33380.65 73 41.7258 139.48 1140.86 Sat Apr 30 14:12:05 2005 900 35095.01 76 38.9945 149.25 1281.30 Sat Apr 30 14:14:35 2005 1000 36094.37 74 36.0944 161.24 1419.66 Sat Apr 30 14:17:17 2005 修正後 w/patch Tasks jobs/min jti jobs/min/task real cpu 1 484.27 100 484.2736 12.02 1.93 Sat Apr 30 15:59:45 2005 100 28262.03 90 282.6203 20.59 143.57 Sat Apr 30 16:00:06 2005 200 32246.45 82 161.2322 36.10 282.89 Sat Apr 30 16:00:42 2005 300 37945.80 83 126.4860 46.01 418.75 Sat Apr 30 16:01:28 2005 400 40000.69 81 100.0017 58.20 561.48 Sat Apr 30 16:02:27 2005 500 40976.10 78 81.9522 71.02 696.95 Sat Apr 30 16:03:38 2005 600 41121.54 78 68.5359 84.92 834.86 Sat Apr 30 16:05:04 2005 700 44052.77 78 62.9325 92.48 971.53 Sat Apr 30 16:06:37 2005 800 41066.89 79 51.3336 113.38 1111.15 Sat Apr 30 16:08:31 2005 900 38918.77 79 43.2431 134.59 1252.57 Sat Apr 30 16:10:46 2005 1000 41842.21 76 41.8422 139.09 1392.33 Sat Apr 30 16:13:05 2005 ``` 參考資料: [The Slab Allocator in the Linux kernel](https://hammertux.github.io/slab-allocator) [NUMA aware slab allocator V4](https://lwn.net/Articles/137890/) ::: --- ## 延伸問題 第三題 - [x] 解釋程式碼運作原理, - [ ] 並嘗試重寫為同樣功能但效率更高的實作 - [ ] 在 Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境 ```cpp while (count > 0) { uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { RRR; if (bitsize > read_rhs) data |= (*source >> read_rhs); } if (bitsize < 8) data &= read_mask[bitsize]; uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. DDD; *dest++ = (original & mask) | (data >> write_lhs); } count -= bitsize; } ``` 由 if 的條件可以知道, `read_lhs > 0` 代表 `_read & 7 > 0` 意思是 `_read` 讀取的內容不足 8 bit , `data = *source++` 所以我們要 RRR = `data = data << read_lhs` 使 data 成為 8 bit , `bitsize > write_rhs` 判斷寫入位元 及 位數,上面註解有寫 `write_lhs + bitsize is never >= 8` DDD = `mask |= mask_rw[write_lhs + bitsize];` ### 重寫實作 可以使用一個表格就好,但表格的參數要有改變,用一個 bit 一個 bit 為代表。 ```cpp static const uint8_t mask_rw[] = { 0x00, /* == 0 00000000b */ 0x80, /* == 1 10000000b */ 0x40, /* == 2 01000000b */ 0x20, /* == 3 00100000b */ 0x10, /* == 4 00010000b */ 0x08, /* == 5 00001000b */ 0x04, /* == 6 00000100b */ 0x02, /* == 7 00000010b */ 0x01 /* == 8 00000001b */ }; ``` 經測試後得到相同結果 ```shell 1: 0: 0 1000000000000000000000000000000000000000000000000000000000000000 1: 1: 0 0100000000000000000000000000000000000000000000000000000000000000 1: 2: 0 0010000000000000000000000000000000000000000000000000000000000000 ... 1:15: 0 0000000000000001000000000000000000000000000000000000000000000000 ``` --- ## 延伸問題 第四題 - [ ] 解釋上述程式碼運作原理 - [ ] 程式碼使用到 gcc atomics builtins,請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案 - [ ] chriso/intern 是個更好的 string interning 實作,請探討其特性和設計技巧,並透過內建的 benchmark 分析其效能表現 ### 解釋程式運作原理 程式目的: string interning 將 string 存入 hash 中,當想使用時只需要將指標指向 string 即可,不需要額外存取,來改善記憶體的使用效率。 ```cpp enum { CSTR_PERMANENT = 1, CSTR_INTERNING = 2, CSTR_ONSTACK = 4, }; ``` 查看 cstring 是否有對應的 interning。 ```cpp typedef struct __cstr_data { char *cstr; uint32_t hash_size; uint16_t type; uint16_t ref; } * cstring; ``` `cstr` : 字串最長能存到 `CSTR_INTERNING_SIZE` = `32`。 `hash_size` : hash table 的大小 `type` : `CSTR_ONSTACK` `string` 存入 stack 中, `CSTR_INTERNING` 存入 heap 中, `CSTR_PERMANENT` 存入 loop 中。 `ref` reference counting:負責記錄空間是否有被使用過,有使用就會 `count++` 。 ```graphviz digraph list_add_node_t { node [shape=record]; rankdir=LR; hash_table [label="<f0> cstr_buffer[0] |<f1> cstr_buffer[1]|<f2> cstr_buffer[2] |<f3> cstr_buffer[3]"]; node3 [label= "<f0>cstr|<f1>hash_size|<f2>type|<f3>ref:3"] hash_table:f3 -> node3 } ``` 當 `count = 0` 時,代表空間沒在被使用過,就可以 free 掉。 ```cpp typedef struct __cstr_buffer { cstring str; } cstr_buffer[1]; ``` 用 hash table 將 `cstring` 包裝起來。 ```cpp #define CSTR_BUFFER(var) \ char var##_cstring[CSTR_STACK_SIZE] = {0}; \ struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \ cstr_buffer var; \ var->str = &var##_cstr_data; ``` 這 macro 表達創建一個空的 hash table , ## 為連接的意思 e.g. `CSTR_BUFFER(king)` 表示 `king##_cstring` 為 `king_cstring` ,起始 `type = CSTR_ONSTACK` 。 ```cpp struct __cstr_node { char buffer[CSTR_INTERNING_SIZE]; struct __cstr_data str; struct __cstr_node *next; }; ``` `CSTR_INTERNING_SIZE` = `32` ```cpp struct __cstr_pool { struct __cstr_node node[INTERNING_POOL_SIZE]; }; ``` `INTERNING_POOL_SIZE` = `1024` ```cpp cstring cstr_cat(cstr_buffer sb, const char *str) { cstring s = sb->str; if (s->type == CSTR_ONSTACK) { int i = CCC; while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } s->cstr[i] = 0; } cstring tmp = s; sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } ``` `cstr_cat` function 目的:名稱 `cat` 可以看出,他是將 `str` 與 `sb->str` 連接的意思,連接後還需要把 `cstring` 中的資料更新, 因此要將本來 `cstring->hash_size` + `strlen(str)` 並更新, `i < CSTR_STACK_SIZE` 代表還能寫入,`CSTR_STACK_SIZE = 128`代表寫入 cstring 中最長的字串長度 , `i` 代表已寫入字串的長度, `CCC` = `i = s->hash_size` ```cpp while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } ``` 依序將 `str` 中的字串,依序寫入 `s->cstr[i]` 中,當`str == 0` `return s` 當 `type = CSTR_PERMANENT` 或 `CSTR_INTERNING` 才會執行 `cstr_cat2` ```cpp static cstring cstr_cat2(const char *a, const char *b) { size_t sa = strlen(a), sb = strlen(b); if (sa + sb < CSTR_INTERNING_SIZE) { char tmp[CSTR_INTERNING_SIZE]; memcpy(tmp, a, sa); memcpy(tmp + sa, b, sb); tmp[sa + sb] = 0; return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb)); } cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1); if (!p) return NULL; char *ptr = (char *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, a, sa); memcpy(ptr + sa, b, sb); ptr[sa + sb] = 0; p->hash_size = 0; return p; } ``` `sa + sb < CSTR_INTERNING_SIZE` 代表有空間寫入`sa + sb > CSTR_INTERNING_SIZE` 代表空間需要再額外空間才能寫入,所以呼叫 `xalloc` 存入 heap 當中。 ```cpp struct __cstr_interning { int lock; int index; unsigned size; unsigned total; struct __cstr_node **hash; struct __cstr_pool *pool; }; ``` `lock` : 上鎖數量 `index` : pool 的 已使用的數量 `size` : hash table 的大小 `total`: hash table 已使用的數量 `buffer` : 會存入與 `cstr` 相同的 `string` ```graphviz digraph hash_table_string { node [shape=record]; rankdir=LR; hash_table [label="<f0> cstr_buffer[0] |<f1> cstr_buffer[1]|<f2> cstr_buffer[2] |<f3> cstr_buffer[3]"]; node1 [label="<f0> buffer |<f1> str|<f2> next"] hash [label = "<f0>hash[0]|<f1>hash[1]|<f2>hash[2]|...|<f3>hash[15]"] data3 [label= "<f0>cstr|<f1>hash_size|<f2>type|<f3>ref:3"] buffer [label="<f0>buffer[0]|<f1>buffer[1]|...|buffer[31]"] pool1 [label="<f0>pool[0]|<f1>pool[1]|<f2>pool[2]|...|<f3>pool[1023]"] interning[label="<fo>lock|<f1>index|<f2>size|<f3>total|<f4>hash|<f5>pool"] hash_table:f3 -> data3; node1:f1 -> data3; interning:f5-> pool1; interning:f4-> hash:f1; hash:f1->node1:f1; node1:f0->buffer:f0; pool1:f0->node1:f1 } ``` ```cpp static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash) { cstring ret; CSTR_LOCK(); ret = interning(&__cstr_ctx, cstr, sz, hash); if (!ret) { expand(&__cstr_ctx); ret = interning(&__cstr_ctx, cstr, sz, hash); } ++__cstr_ctx.total; CSTR_UNLOCK(); return ret; } ``` 在寫入時,`CSTR_LOCK()` 會 lock 住,並在寫完後解鎖 `CSTR_UNLOCK()` ```cpp static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) { if (!si->hash) return NULL; int index = (int) (hash & (si->size - 1)); struct __cstr_node *n = si->hash[index]; while (n) { if (n->str.hash_size == hash) { if (!strcmp(n->str.cstr, cstr)) return &n->str; } n = n->next; } // 80% (4/5) threshold if (si->total * 5 >= si->size * 4) return NULL; if (!si->pool) { si->pool = xalloc(sizeof(struct __cstr_pool)); si->index = 0; } n = &si->pool->node[si->index++]; memcpy(n->buffer, cstr, sz); n->buffer[sz] = 0; cstring cs = &n->str; cs->cstr = n->buffer; cs->hash_size = hash; cs->type = CSTR_INTERNING; cs->ref = 0; n->next = si->hash[index]; si->hash[index] = n; return cs; } ``` `pool` : 將會是連續的依序存取 `pool[index-1]` `hash table` : 會 parameter 存入對應的 `hash[hash]` ```cpp static inline uint32_t hash_blob(const char *buffer, size_t len) { const uint8_t *ptr = (const uint8_t *) buffer; size_t h = len; size_t step = (len >> 5) + 1; for (size_t i = len; i >= step; i -= step) h = h ^ ((h << 5) + (h >> 2) + ptr[i - 1]); return h == 0 ? 1 : h; } ```