# 2021q1 Homework2 (quiz2) contributed by < `hankluo6` > > [第 2 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz2) ## 測驗 `1` #### `COND1 = ?` - [x] `(c) fast->next == list` #### `COND2 = ?` - [x] `(b) fast->next->next == list` #### `MMM = ?` - [x] `(e) `&get_middle(&q->list)->list #### `TTT = ?` - [x] `(a) slow` ### 運作原理 * `get_middle(struct list_head *list)` ```c static list_ele_t *get_middle(struct list_head *list) { struct list_head *fast = list->next, *slow; list_for_each (slow, list) { if (fast->next == list || fast->next->next == list) break; fast = fast->next->next; } return list_entry(slow, list_ele_t, list); } ``` 取得 linked list 中間的 entry,`struct list_head *list` 為 linked list 的 head,不存在 data,所以要取得 entry 時以 `list->next` 為第一個 entry 開始。 `fast` 及 `slow` 兩個指標分別以 `2:1` 的 step 遍歷,當 `fast` 到達尾端時,`slow` 位置即為 linked list 的 $\frac{1}{2}$ 個 entry。 因為 `list.h` 中實作為 circular linked list,判斷是否為尾端使用 `fast->next == list` 即可,但是 `fast` 為每次迴圈移動兩個 step,只判斷 `fast->next == list` 只有在 $size$ 為奇數時成立,如果 linked list 中 node 個數為偶數時,`get_middle()` 應回傳第 $\frac{1}{2} \times size$ 或 $\frac{1}{2} \times size + 1$ 個 entry,其中 $size$ 為 linked list 中不包含 `head` 的個數,故分成兩部份討論: 1. 當 `get_middle()` 回傳 $\frac{1}{2} \times size$ 時,判斷式為 `fast->next == list || fast->next->next == list`,`slow` 為第 $\frac{1}{2} \times size$ 個 node 時,`fast` 應為第 $size - 1$ 個 node,其 `fast->next->next == list`。 2. 當 `get_middle()` 回傳 $\frac{1}{2} \times size + 1$ 時,判斷式為 `fast->next == list || fast == list`,`slow` 為第 $\frac{1}{2} \times size + 1$ 個 node 時,應為上方情形再多一次迴圈,故 `fast` 會往後移動兩個 step 重新回到 `list` 這個 head。 * `list_merge` ```cpp static void list_merge(struct list_head *lhs, struct list_head *rhs, struct list_head *head) { INIT_LIST_HEAD(head); if (list_empty(lhs)) { list_splice_tail(lhs, head); return; } if (list_empty(rhs)) { list_splice_tail(rhs, head); return; } while (!list_empty(lhs) && !list_empty(rhs)) { char *lv = list_entry(lhs->next, list_ele_t, list)->value; char *rv = list_entry(rhs->next, list_ele_t, list)->value; struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next; list_del(tmp); list_add_tail(tmp, head); } list_splice_tail(list_empty(lhs) ? rhs : lhs, head); } ``` 將 `lhs` 與 `rhs` 兩個 list 合併至 `head` 中,先判斷是否有一邊的 list 為空,將另一邊的 list 加入至 `head` 中並直接回傳,如果兩個 list 皆有元素,則依照大小來選擇。最後當有一邊的 list 處理完後,直接把另一邊的 list 剩下的 node 加入到 `head` 中。 * `list_merge_sort` ```cpp void list_merge_sort(queue_t *q) { if (list_is_singular(&q->list)) return; queue_t left; struct list_head sorted; INIT_LIST_HEAD(&left.list); list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list); list_merge_sort(&left); list_merge_sort(q); list_merge(&left.list, &q->list, &sorted); INIT_LIST_HEAD(&q->list); list_splice_tail(&sorted, &q->list); } ``` 透過 `list_cut_position` 將 `q->list` 分成兩部分,再分別對兩個 sub-list 做 `list_merge_sort`,最後經過 `list_merge` 合併至 `sorted` 中。此時資料存在以 `sorted` 為 head 的 list 中,故需要再將 list 存回至 `q->list` 內。 此程式有個錯誤在 `list_merge()` 的兩個判斷式上,應為在一邊 list 為空時,添加另一邊的 list 至 `head` 中: ```diff if (list_empty(lhs)) { - list_splice_tail(lhs, head); + list_splice_tail(rhs, head); return; } if (list_empty(rhs)) { - list_splice_tail(rhs, head); + list_splice_tail(lhs, head); return; } ``` 但此問題並不影響程式輸出,分析程式碼,可以發現 `list_merge(&left.list, &q->list, &sorted);` 在 `left.list` 或 `q->list` 為 empty 時會出錯,但是前方的 `list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list)` 為從中間之 node 分開,故可保證長度為 2 以上的 list 經過 `list_cut_position` 後能讓 `left.list` 及 `q->list` 皆不為空,而長度為 1 的 list 則會在開頭的 `list_is_singular(&q->list)` 就被剔除,所以可以保證 `lhs` 與 `rhs` 永遠不為 empty,表示上方錯誤的程式碼可以全部移除。 根據上述分析可以知道當 `list_merge_sort` 傳入參數中的 list 為長度 1 以上時可以順利運行,但當長度為 0 時並沒有做適當的處理,應要在檢查 singular 時一併檢查是否為空,否則會導致 segmation fault: ```c void list_merge_sort(queue_t *q) { if (list_empty(&q->list) || list_is_singular(&q->list)) return; ... } ``` ### 說明 Linux 核心的 lib/list_sort.c 最佳化手法 :::warning TODO: 透過 git 取得 [linux 核心的原始程式碼](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git),然後執行 `git log lib/list_sort.c`,觀察修改記錄。例如: > commit b5c56e0cdd62979dd538e5363b06be5bdf735a09 > Author: George Spelvin <lkml@sdf.org> > Date: Tue May 14 15:43:02 2019 -0700 > lib/list_sort: optimize number of calls to comparison function > > commit 043b3f7b6388fca6be86ca82979f66c5723a0d10 Author: George Spelvin <lkml@sdf.org> Date: Tue May 14 15:42:58 2019 -0700 > lib/list_sort: simplify and remove MAX_LIST_LENGTH_BITS :notes: jserv ::: :::spoiler commit message lib/list_sort: optimize number of calls to comparison function CONFIG_RETPOLINE has severely degraded indirect function call performance, so it's worth putting some effort into reducing the number of times cmp() is called. This patch avoids badly unbalanced merges on unlucky input sizes. It slightly increases the code size, but saves an average of 0.2*n calls to cmp(). x86-64 code size 739 -> 803 bytes (+64) Unfortunately, there's not a lot of low-hanging fruit in a merge sort; it already performs only n*log2(n) - K*n + O(1) compares. The leading coefficient is already at the theoretical limit (log2(n!) corresponds to K=1.4427), so we're fighting over the linear term, and the best mergesort can do is K=1.2645, achieved when n is a power of 2. The differences between mergesort variants appear when n is *not* a power of 2; K is a function of the fractional part of log2(n). Top-down mergesort does best of all, achieving a minimum K=1.2408, and an average (over all sizes) K=1.248. However, that requires knowing the number of entries to be sorted ahead of time, and making a full pass over the input to count it conflicts with a second performance goal, which is cache blocking. Obviously, we have to read the entire list into L1 cache at some point, and performance is best if it fits. But if it doesn't fit, each full pass over the input causes a cache miss per element, which is undesirable. While textbooks explain bottom-up mergesort as a succession of merging passes, practical implementations do merging in depth-first order: as soon as two lists of the same size are available, they are merged. This allows as many merge passes as possible to fit into L1; only the final few merges force cache misses. This cache-friendly depth-first merge order depends on us merging the beginning of the input as much as possible before we've even seen the end of the input (and thus know its size). The simple eager merge pattern causes bad performance when n is just over a power of 2. If n=1028, the final merge is between 1024- and 4-element lists, which is wasteful of comparisons. (This is actually worse on average than n=1025, because a 1024:1 merge will, on average, end after 512 compares, while 1024:4 will walk 4/5 of the list.) Because of this, bottom-up mergesort achieves K < 0.5 for such sizes, and has an average (over all sizes) K of around 1. (My experiments show K=1.01, while theory predicts K=0.965.) There are "worst-case optimal" variants of bottom-up mergesort which avoid this bad performance, but the algorithms given in the literature, such as queue-mergesort and boustrodephonic mergesort, depend on the breadth-first multi-pass structure that we are trying to avoid. This implementation is as eager as possible while ensuring that all merge passes are at worst 1:2 unbalanced. This achieves the same average K=1.207 as queue-mergesort, which is 0.2*n better then bottom-up, and only 0.04*n behind top-down mergesort. Specifically, defers merging two lists of size 2^k until it is known that there are 2^k additional inputs following. This ensures that the final uneven merges triggered by reaching the end of the input will be at worst 2:1. This will avoid cache misses as long as 3*2^k elements fit into the cache. (I confess to being more than a little bit proud of how clean this code turned out. It took a lot of thinking, but the resultant inner loop is very simple and efficient.) ::: `list_sort` 也效利用了 doubly linked list 中的 `prev` 與 `next` 指標,達成可以以類似 dfs 的順序做 botton up merge,而非傳統 level order 的 merge,這樣便能對 cache 有叫好的 locality,因為操作的節點皆為剛剛才使用過的節點。 但只是單純的 dfs 會產生 unbalanced 的問題,如 1028 個節點時,merge sort 做到結束時會產生 1024 個已排序的 linked list 與 4 個未排序的 linked list,此時平均需要再遍歷 $\frac{4}{5}$ 個 list 才能完成排序。 所以 `list_sort` 提出一種 worst-case optimal 的方法來避免這種不平衡的 merge,就是保證排序長度為 $2^k, 2^{k-1}, ..., 2^{1}$ 皆出現在當前處理完的 linked list 部份中,如果要產生 $2^{k+1}$ 的 linked list 的話,便需要兩個長度為 $2^k$ 的 linked list,與長度為 $2^{k-1}, ..., 2^{1}$ 的 linked list 各一個,此時 merge 完後的 linked list 長度各為 $2^{k+1}, 2^{k-1},...,2^1$,如果這時已經到 linked list 的結尾的話,只需要將最後一個節點從 $2^1$ merge 到 $2^{k+1}$ 即完成 merge sort,且可以發現中間 merge 的數量比例最多只會為 2:1(長度為 $2^1$ 與 $1$ 及 $2^{k+1}$ 與 $2^{k}$ 做 merge 的時候)。 要實作上述特徵只需要使用一個變數 `count` 紀錄當前處理的長度即可: | `count` | merge | range | length | |:-------:|:-----:|:-------:|:------:| | 00000 | X | X | X | | 00001 | X | X | X | | 00010 | O | [0,1] | 2 | | 00011 | X | X | X | | 00100 | O | [2,3] | 2 | | 00101 | O | [0,3] | 4 | | 00110 | O | [4,5] | 2 | | 00111 | X | X | X | | 01000 | O | [6,7] | 2 | | 01001 | O | [4,7] | 4 | | 01010 | O | [8,9] | 2 | | 01011 | O | [0,7] | 8 | | 01100 | O | [10,11] | 2 | | 01101 | O | [8,11] | 4 | | 01110 | O | [12,13] | 2 | | 01111 | X | X | X | | 10000 | O | [14,15] | 2 | `count` 為當前處理的節點數,處理完的節點會被放置到 `pending` 中,merge 為此次迴圈是否需要 merge,可以發現當為 2 的冪時不需要 merge,range 為 merge 時的範圍,length 為此範圍的長度,可以看到長度的順序符合上述的分析,便能處理 unbalance 的問題。 `list_sort` 是利用 `prev` 成員連接每個不同的 sorted list,而使用 `next` 成員連接與自身節點排序的 list。如下圖: ```graphviz digraph G { rankdir=LR; node [shape=record]; a3 [label="{ <data> 14 | <ref> }"] b3 [label="{ <data> 15 | <ref> }"]; a3:ref:f -> b3:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; a2 [label="{ <data> 12 | <ref> }"] b2 [label="{ <data> 13 | <ref> }"]; a2:ref:f -> b2:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; a1 [label="{ <data> 8 | <ref> }"] b1 [label="{ <data> 9 | <ref> }"]; c1 [label="{ <data> 10 | <ref> }"]; d1 [label="{ <data> 11 | <ref> }"]; a1:ref:f -> b1:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; b1:ref:f -> c1:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; c1:ref:f -> d1:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; a [label="{ <data> 0 | <ref> }"] b [label="{ <data> 1 | <ref> }"]; c [label="{ <data> 2 | <ref> }"]; d [label="{ <data> 3 | <ref> }"]; e [label="{ <data> 4 | <ref> }"]; f [label="{ <data> 5 | <ref> }"]; g [label="{ <data> 6 | <ref> }"]; h [label="{ <data> 7 | <ref> }"]; a:ref:f -> b:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; b:ref:f -> c:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; c:ref:f -> d:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; d:ref:f -> e:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; e:ref:f -> f:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; f:ref:f -> g:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; g:ref:f -> h:data [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, label="next"]; a3:ref->a2:data [label="prev"] a2:ref->a1:data [label="prev"] a1:ref->a:data [label="prev"] } ``` ### 與 lib/list_sort.c 效能比較 * 效能比較撰寫在 [quiz1](https://hackmd.io/wo78awq3S1aVdADGmQZ_Lg?view#tip1) --- ## 測驗 `2` #### `X = ?` - [x] `(b) 2` #### `Y = ?` - [x] `(d) 4` #### `Z = ?` - [x] `(h) 8` ### 運作原理 ```c uint16_t func(uint16_t N) { /* change all right side bits to 1 */ N |= N >> 1; N |= N >> 2; N |= N >> 4; N |= N >> 8; return (N + 1) >> 1; } ``` 此函式為將 `N` 無條件捨去至 2 的倍數,此技巧是將最高位元為 1 的位置到 LSB 皆設為 1,再經過 `N + 1` 便會進位,故再往右移 1 位元即可。 原本懷疑當輸入 `N` 為 $2^{15}$ 時導致 overflow 並回傳 0,實際測試後卻發現能順利回傳 $2^{15}$。根據 [C99 規格書](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1256.pdf)中提到: >If an int can represent all values of the original type, the value is converted to an int; otherwise, it is converted to an unsigned int. These are called the integer promotions. >All other types are unchanged by the integer promotions. 此為 [integer promotion](https://zh.wikipedia.org/wiki/%E6%95%B4%E5%9E%8B%E6%8F%90%E5%8D%87) 的行為,在 `(N + 1) >> 1` 的表達式中,`N` 會被轉型為 integer,而在回傳時再轉型為 `uint16_t`,故不會發生 overflow。 ### `The Linux Kernel API` * `is_power_of_2` ```c /** * is_power_of_2() - check if a value is a power of two * @n: the value to check * * Determine whether some value is a power of two, where zero is * *not* considered a power of two. * Return: true if @n is a power of 2, otherwise false. */ static inline __attribute__((const)) bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); } ``` 當數字 `n` 為 power of 2 時,代表其位元表示法中只有一位數為 1,故 `n - 1` 為從該位元 - 1 到 LSB 的數字皆為 1,此時與原先的 `n` 做 and 運算便能決定是否為 power of 2,如果數字 `n` 不為 power of 2 時,`n - 1` 必定不會改變最靠近 MSB 且為 1 的位元,故 and 運算後該位元仍為 1,此時便會回傳 false。 * `__roundup_pow_of_two` & `__rounddown_pow_of_two` ```c /** * __roundup_pow_of_two() - round up to nearest power of two * @n: value to round up */ static inline __attribute__((const)) unsigned long __roundup_pow_of_two(unsigned long n) { return 1UL << fls_long(n - 1); } /** * __rounddown_pow_of_two() - round down to nearest power of two * @n: value to round down */ static inline __attribute__((const)) unsigned long __rounddown_pow_of_two(unsigned long n) { return 1UL << (fls_long(n) - 1); } static inline unsigned fls_long(unsigned long l) { if (sizeof(l) == 4) return fls(l); return fls64(l); } ``` `fls` 指的是 find last set bit (MSB) in word,如 fls(0) = 0, fls(1) = 1, fls(0x80000000) = 32。 當 `n` 不為 power of 2 時,`n - 1` 不會影響最靠近 MSB 且為 1 的位元,故 `fls_long(n - 1)` 的值都一樣為該位元 + 1,`__roundup_pow_of_two` 便能做到 roundup 的效果。而當 `n` 為 power of 2 時,`n - 1` 會向最靠近 MSB 且為 1 的位元借位,導致 `fls` 回傳的值少 1,讓 roundup 傳回原本的值。 `__rounddown_pow_of_two` 在 $2^k \sim 2^{k+1}-1$ 內要回傳相同的值,而這與 `fls_long` 的範圍相同,故直接帶入 `fls_long(n) - 1` 即為 rounddown 後最高位元為 1 的位置。 `fls` 與 `fls64` 在核心中根據硬體不同而有不同的實作,其內部為 gcc extension 的 `__builtin_clz` 或是組合語言實作。某些硬體會提供 [BSR](https://c9x.me/x86/html/file_module_x86_id_20.html) 、 [LZCNT](https://www.felixcloutier.com/x86/lzcnt) 或 [CLZ](https://www.keil.com/support/man/docs/armasm/armasm_dom1361289868426.htm) 等指令來加速運算,linux kernel 及 `__builtin_clz` 皆會根據硬體來選擇適當的指令,如此便能提供良好的移植性與效能兼顧。 --- ## 測驗 `3` ### `RRR = ?` - [x] `(a) data <<= read_lhs` ### `DDD = ?` - [x] `(b) mask |= write_mask[write_lhs + bitsize]` ### 運作原理 ```c= while (count > 0) { uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (*source >> read_rhs); } if (bitsize < 8) data &= read_mask[bitsize]; uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } count -= bitsize; } ``` 根據題目敘述: ![](https://i.imgur.com/hJWhXkF.png) * `_read` 為從 MSB 數來的第 `_read` 個位元開始讀取 * `_write` 為從 MSB 數來的第 `_write` 個位元寫入 故可推測出 `_read_lhs` 表示 left hand side (MSB) 開始算, `_read_rhs` 表示 right hand side (MSB) 開始算:`_write_lhs` 與 `_write_rhs` 同理。 `while` 迴圈會以 byte 為單位操作,當剩餘資料不足一個 byte 時,`bitsize` 會被設置為剩餘 bit 的數量。`uint8_t data` 為要傳遞的資料,要注意要傳遞的所有 bit 皆會靠左 (MSB) 對齊。第 4 行的 `if (read_lhs > 0)` 檢查 read 的起始位置是否為 0,如果不為 0 的話,則需要把資料靠左對齊。而第 9 行則是當要讀取的 bit 數 `bitsize` 比 `_read_rhs` 小時,意味著 `bitsize > (8 - _read_lhs)`,需要讀取到下一個 byte 的 bits,透過 `data |= (*source >> read_rhs);` 將剩下要讀取的 bits 移到 `data` 中的 LSB。 ```graphviz digraph { node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; a [label="1+ |0+ |1* |1* |0* |0* |0* |1*", color=black, fillcolor=white, style=filled, fontsize=20]; b [label="1* |1* |0* |0* |0* |1* |0+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "|" node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; c [label="0* |1+ |1+ |1+ |0+ |1+ |1+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; d[label="0+ |0+ |0+ |0+ |0+ |0+ |0* |1+", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=30, width=1]; "*Source"->a [style=invis] "*(Source + 1)"->c [style=invis] node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "=" a->b [label=" << 2", fontsize=25] c->d [label=" >> 6", fontsize=25] node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; e[label="1* |1* |0* |0* |0* |1* |0* |1+", color=black, fillcolor=white, style=filled, fontsize=20]; { rank=same; a; c} { rank=same; b; d; "|"; "="; e} node [shape=plaintext, fontcolor=black, fontsize=30. width=2]; "_read_lhs = 2"->"_read_rhs = 6"->"bit_size = 7"->e [style=invis]; e->"data" [dir=back]; } ``` * ==\*== 代表要讀取的 bits * ==\+== 代表不需讀取的 bits 可以發現要讀取的 bits 會被存放到 `data` 的高位元處。第 10 行則是將 `data` 只取`bit_size` 範圍,其他範圍的 bits 則被設置為 0,如圖上`data` LSB 內的 1 會被設置為 0,防止 write 的時候讀取到。 `mask` 的作用是將沒有寫入的 bits 保留,防止被 `data` 複寫。第 15 行跟第 20 行的判斷式用來檢查要寫入的 bit 是否橫跨兩個 byte。 ```c=15 if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } ``` 當橫跨兩個 byte 時,`original & mask` 為紀錄不需要改動的 bit,而 `data >> write_lhs` 則是將要寫入的 bit 移動到 `write_lhs` 為開頭的位置,再做 or 運算即是這個 byte 的新資料。`write_mask[bitsize - write_rhs]` 為下一個 byte 需要寫入的 bits 位置,將還未寫進的 bits (`data << write_lhs`) 寫入到下個 byte 中。 ```c=20 else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } ``` 而不橫跨兩個 byte 時,`mask |= write_mask[write_lhs + bitsize];` 用來紀錄不須寫入的 bits 位置,此時 mask 在要寫入的 bit 位置為 0,其他位置與 `dest` 原本的 bit 相同。第 23 行則與上方相同,將 `data` 位移到寫入的位置並與原本的資料做 or 運算。 ```graphviz digraph { node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; a [label="0+ |1+ |0+ |1+ |1+ |0* |1* |1*", color=black, fillcolor=white, style=filled, fontsize=20]; b [label="0+ |1+ |0+ |1+ |1+ |0* |0* |0*", color=black, fillcolor=white, style=filled, fontsize=20]; f [label="1 |1 |1 |1 |1 |1 |0 |0", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "|" node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; c [label="1* |1* |0* |0* |0* |1* |0* |1+", color=black, fillcolor=white, style=filled, fontsize=20]; d[label="0+ |0+ |0+ |0+ |0+ |1* |1* |0*", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=30, width=1]; "*Original"->a [style=invis] "data"->c [style=invis] "mask" node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "=" a->b [label=" & mask", fontsize=25] c->d [label=" >> _write_lhs", fontsize=25] b->"mask" [style=invis] node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; e[label="0+ |1+ |0+ |1+ |1+ |1* |1* |0*", color=black, fillcolor=white, style=filled, fontsize=20]; { rank=same; a; c} { rank=same; b; d; "|"; "="; e} { rank=same; "mask"; f;} node [shape=plaintext, fontcolor=black, fontsize=30. width=2]; "_write_lhs = 5"->"_write_rhs = 3"->"bit_size = 7"->e [style=invis]; } ``` ```graphviz digraph { node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; a [label="1* |0* |1* |1* |0+ |1+ |0+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; b [label="0* |0* |0* |0* |0+ |1+ |0+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; f [label="1 |1 |1 |1 |1 |1 |0 |0", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "|" node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; c [label="1* |1* |0* |0* |0* |1* |0* |1+", color=black, fillcolor=white, style=filled, fontsize=20]; d[label="0* |0* |1* |0* |0+ |0+ |0+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; node [shape=plaintext, fontcolor=black, fontsize=30, width=1]; "*(Original + 1)"->a [style=invis] "data"->c [style=invis] "mask" node [shape=plaintext, fontcolor=black, fontsize=50, width=1]; "=" a->b [label=" & write_mask[bitsize - write_rhs]", fontsize=25] c->d [label=" << _write_rhs", fontsize=25] b->"mask" [style=invis] node [shape=record, fontcolor=black, fontsize=14, width=3, fixedsize=true]; e[label="0* |0* |1* |0* |0+ |1+ |0+ |0+", color=black, fillcolor=white, style=filled, fontsize=20]; { rank=same; a; c} { rank=same; b; d; "|"; "="; e} { rank=same; "mask"; f;} node [shape=plaintext, fontcolor=black, fontsize=30. width=2]; "_write_lhs = 5"->"_write_rhs = 3"->"bit_size = 7"->e [style=invis]; } ``` * \* 代表要寫入的 bits * \+ 代表不需寫入的 bits 可以發現 `source` 第 2 個位置後的 7 個 bits `1100010` 被寫入到 `dest` 的第 5 個 bits 後方。 ### Linux 核心原始程式碼中逐 bit 資料複製 ```c static inline void bitmap_copy(unsigned long *dst, const unsigned long *src, unsigned int nbits) { unsigned int len = BITS_TO_LONGS(nbits) * sizeof(unsigned long); memcpy(dst, src, len); } /* * Copy bitmap and clear tail bits in last word. */ static inline void bitmap_copy_clear_tail(unsigned long *dst, const unsigned long *src, unsigned int nbits) { bitmap_copy(dst, src, nbits); if (nbits % BITS_PER_LONG) dst[nbits / BITS_PER_LONG] &= BITMAP_LAST_WORD_MASK(nbits); } ``` 目前只找到 [linux/include/linux/bitmap.h]() 中的 `bitmap_copy` 及 `bitmap_copy_clear_tail`,但此兩函式皆不會保留 byte 內沒有複製到的 bit。 --- ## 測驗 `4` ### `CCC = ?` - [x] `(c) s->hash_size` ### 程式原理 ```c enum { CSTR_PERMANENT = 1, CSTR_INTERNING = 2, CSTR_ONSTACK = 4, }; ``` string interning 有三種類型,分別為 `CSTR_PREMANENT` (TODO)、`CSTR_INTERNING` 用來處理 Literal 的常數字串、`CSTR_ONSTACK` 處理以 `CSTR_BUFFER` 產生出來且長度較小的字串變數。 ```c #define CSTR_BUFFER(var) \ char var##_cstring[CSTR_STACK_SIZE] = {0}; \ struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \ cstr_buffer var; \ var->str = &var##_cstr_data; ``` 變數類型的 string 透過 `CSTR_BUFFER` 宣告,先建立一個 size 為 `CSTR_STACK_SIZE (128)` 空間的 array of char 用來存放字串,並將 `struct __cstr_data` 內的 `cstr` 欄位指向該空間,且使用 `cstr_buffer` 內的 `str` 空間指向這個 structure。 ```c #define CSTR_LITERAL(var, cstr) \ static cstring var = NULL; \ if (!var) { \ cstring tmp = cstr_clone("" cstr, (sizeof(cstr) / sizeof(char)) - 1); \ if (tmp->type == 0) { \ tmp->type = CSTR_PERMANENT; \ tmp->ref = 0; \ } \ if (!__sync_bool_compare_and_swap(&var, NULL, tmp)) { \ if (tmp->type == CSTR_PERMANENT) \ free(tmp); \ } \ } ``` 常數類型的 string 透過 `CSTR_LITERAL` 宣告,透過 `cstr_clone` 建立 intern 字串 (詳見下方 `cstr_clone` 函式),而當 `malloc` 失敗 (空間不足) 或是字串過長 (超過 `CSTR_INTERNING_SIZE`) 時,`tmp->type` 被設置為 0,故會將 `tmp->type` 重新設置為 `CSTR_PERMANENT`。 下方的 `if` 判斷式推測是為了處理多執行緒的問題,當兩個執行緒同時運行 `CSTR_LITERAL` 時,兩者皆進入到 `if (!var)` 判斷式內部,且剛好此字串長度大於 `CSTR_INTERNING_SIZE` 而配置兩個空間,故第一個執行緒在下方的 `if` 中運行 `__sync_bool_compare_and_swap(&var, NULL, tmp)` 時回傳 true,而之後的執行緒在運行此行時皆會回傳 false,便能刪除多餘的空間。 > [bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)](https://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html) > >These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr. The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation. 而當 `type` 不為 0 時就不用移除是因為在 `cstr_interning` 內部,已經有 `CSTR_LOCK()` 與 `CSTR_UNLOCK()` 防止同時建立兩個相同 string 的機制了。 ```c= cstring cstr_cat(cstr_buffer sb, const char *str) { cstring s = sb->str; if (s->type == CSTR_ONSTACK) { int i = s->hash_size; while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } s->cstr[i] = 0; } cstring tmp = s; sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } static cstring cstr_cat2(const char *a, const char *b) { size_t sa = strlen(a), sb = strlen(b); if (sa + sb < CSTR_INTERNING_SIZE) { char tmp[CSTR_INTERNING_SIZE]; memcpy(tmp, a, sa); memcpy(tmp + sa, b, sb); tmp[sa + sb] = 0; return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb)); } cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1); if (!p) return NULL; char *ptr = (char *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, a, sa); memcpy(ptr + sa, b, sb); ptr[sa + sb] = 0; p->hash_size = 0; return p; } ``` 透過 `cstr_cat` 將字串加入到此 string buffer 當中,分成三種情況: 1. 字串長度小於 `CSTR_STACK_SIZE (128)` 透過 line 4 ~ line 15 來處理,將字串放入 `sb->str->cstr` 中,並以 `sb->str->hash_size` 紀錄當前字串長度。 2. 字串長度大於等於 `CStR_STACK_SIZE` 且小於 `CSTR_INTERNING_SIZE (32)` 進入到 `cstr_cat2` 函式,透過前半段 line 25 ~ line 31 來處理,此時該字串會被放入 `struct __cstr_node` 內的 `buffer` 當中,並把該結構中的成員 `cstring str` 回傳出來,而該結構也會放入內部的 `pool` 及 `hash` 當中,並將回傳的 `str->type` 改為 `CSTR_INTERNING`,且此時 `str->hash_size` 的值用來紀錄此字串的 hash 值。(詳細見下方 `cstr_interning` 函式) 3. 字串長度大於等於 `CSTR_INTERNING_SIZE` 此時會將之串放入到 `struct __cstr_data` 的尾端,並將 `cstring` 的 `type` 設定為 0,`ref` 設定為 1。 ```c struct __cstr_node { char buffer[CSTR_INTERNING_SIZE]; struct __cstr_data str; struct __cstr_node *next; }; struct __cstr_pool { struct __cstr_node node[INTERNING_POOL_SIZE]; }; struct __cstr_interning { int lock; int index; unsigned size; unsigned total; struct __cstr_node **hash; struct __cstr_pool *pool; }; ``` string interning 內部使用 `struct __cstr_interning` 來儲存資料,其成員作用如下: * `lock` 防止多執行續同時處理同樣字串造成錯誤 * `index` 紀錄目前使用到 `pool` 中的第幾個位置 * `size` 紀錄目前 `hash` 的容量 (capacity) * `total` 目前 intern 的字串數量 * `hash` 所有存有 intern 字串的 `struct __cstr_node` 以此 hash 連結,其內部的 `struct __cstr_node` 為 `pool` 中的 `node` * `pool` 為 memory pool,預先分配足夠的 `struct __cstr_node` 便不需要頻繁的 `malloc`,且能讓記憶體更連續防止碎片化 `total` 增加的位置應該要在 `interning` 函式內,而不是在 `cstr_interning` 函式內,假如字串已經存在在 intern hash table 內,此時 `total` 不應該增加,否則會造成實際上 hash 還沒到達限制前就被重新分配空間。應該要放在 `interning` 內並在當從 `pool` 內取出新 node 時才增加。 ```c= static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) { if (!si->hash) return NULL; int index = (int) (hash & (si->size - 1)); struct __cstr_node *n = si->hash[index]; while (n) { if (n->str.hash_size == hash) { if (!strcmp(n->str.cstr, cstr)) return &n->str; } n = n->next; } // 80% (4/5) threshold if (si->total * 5 >= si->size * 4) return NULL; if (!si->pool) { si->pool = xalloc(sizeof(struct __cstr_pool)); si->index = 0; } n = &si->pool->node[si->index++]; memcpy(n->buffer, cstr, sz); n->buffer[sz] = 0; cstring cs = &n->str; cs->cstr = n->buffer; cs->hash_size = hash; cs->type = CSTR_INTERNING; cs->ref = 0; n->next = si->hash[index]; si->hash[index] = n; return cs; } ``` `interning` 用來處理需要 intern 的字串 (`CSTR_LITERAL` 產生的字串或 local variable 太長的字串)。 * line 6 ~ line 7 為第一次進入 `interning` 時,因 hash table 尚未分配空間,經過此行回傳並透過 `expand` 函式重新分配 hash table 空間後再進入。 * line 9 ~ line 17 計算 hash 值並在 hash table 中搜尋,如果已經存在則不需要再從 `pool` 中取 `node` 來用,直接將 `cstring` 取出回傳即可。 * line 19 ~ line 20 當 hash table 快不足時,回傳 NULL 並經過 `expand` 重新分配空間後再進入。 * line 21 ~ line 24 為第一次進入 `interning` 時,因 `pool` 尚未分配空間,經過此行初始化。 * line 25 ~ line 27 因為目前 hash table 中沒有紀錄此字串,故從 `pool` 中取一個新的 `__cstr_node` 的位置 `n`,將字串複製到該結構內的 buffer 中。 * line 29 ~ line 33 主要要回傳的 `cstring` 為此 node 內的 `str` 成員位置,而 `cstring` 的字串則指向此 node 的 buffer,並設置對應的 type 及 hash 值方便之後計算。 * line 35 ~ line 36 將這個新的 node 放到 hash 中。 ```c= static void expand(struct __cstr_interning *si) { unsigned new_size = si->size * 2; if (new_size < HASH_START_SIZE) new_size = HASH_START_SIZE; struct __cstr_node **new_hash = xalloc(sizeof(struct __cstr_node *) * new_size); memset(new_hash, 0, sizeof(struct __cstr_node *) * new_size); for (unsigned i = 0; i < si->size; ++i) { struct __cstr_node *node = si->hash[i]; while (node) { struct __cstr_node *tmp = node->next; insert_node(new_hash, new_size, node); node = tmp; } } free(si->hash); si->hash = new_hash; si->size = new_size; } ``` 此 hash 以 [Dynamic Array](https://en.wikipedia.org/wiki/Dynamic_array) 實作,故當空間不足時,重新分配空間並把原本的 hash table 複製到新的 hash table 上。這邊空間不足是以 80% 為 threshold,因為當 hash 的物件越多,hash table 的時間複雜度會漸漸趨近於 $O(n)$,故等到 hash table 滿時才做 `expand` 會導致 collision 過多。 ```c= static inline uint32_t hash_blob(const char *buffer, size_t len) { const uint8_t *ptr = (const uint8_t *) buffer; size_t h = len; size_t step = (len >> 5) + 1; for (size_t i = len; i >= step; i -= step) h = h ^ ((h << 5) + (h >> 2) + ptr[i - 1]); return h == 0 ? 1 : h; } ``` 利用此 function 計算 hash 值,`buffer` 為目標字串,`len` 為其長度。 ```c= cstring cstr_clone(const char *cstr, size_t sz) { if (sz < CSTR_INTERNING_SIZE) return cstr_interning(cstr, sz, hash_blob(cstr, sz)); cstring p = xalloc(sizeof(struct __cstr_data) + sz + 1); if (!p) return NULL; void *ptr = (void *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 0; return p; } ``` `cstr_clone` 為給定一字串,回傳其 `cstring`,當 `cstr_grab` 對 stack 上取字串或是創建 Literal 的 `CSTR_LITERAL` 會呼叫。其內容與 `cstr_cat` 的 2, 3 種情況一樣。 ```c= int cstr_equal(cstring a, cstring b) { if (a == b) return 1; if ((a->type == CSTR_INTERNING) && (b->type == CSTR_INTERNING)) return 0; if ((a->type == CSTR_ONSTACK) && (b->type == CSTR_ONSTACK)) { if (a->hash_size != b->hash_size) return 0; return memcmp(a->cstr, b->cstr, a->hash_size) == 0; } uint32_t hasha = cstr_hash(a); uint32_t hashb = cstr_hash(b); if (hasha != hashb) return 0; return !strcmp(a->cstr, b->cstr); } ``` 比較兩個字串是否相同,如果兩個的 type 皆為 `CSTR_INTERNING` 則回傳 false,因為每個相同 intern 字串只會存在一個。當兩個 type 皆為 `CSTR_ONSTACK` 時,檢查其大小跟內容是否相同。line 12 ~ line 13 為兩個 type 不同,透過 `cstr_hash` 計算其字串 hash 值是否相同。而 line 16 則是當兩個字串 hash 相同,但有可能字串內容不同,故還需對照字串內容。 ```c= static size_t cstr_hash(cstring s) { if (s->type == CSTR_ONSTACK) return hash_blob(s->cstr, s->hash_size); if (s->hash_size == 0) s->hash_size = hash_blob(s->cstr, strlen(s->cstr)); return s->hash_size; } ``` 依據 type 計算其 hash 值,當 `s->type == CSTR_ONSTACK` 時表示其存在 stack 中,`s->hash_size` 為紀錄目前字串長度而非 hash 值,透過 `hash_blob` 計算並回傳。 `s->hash_size == 0` 時,表示其字串過長不存在 intern hash table 中,也是額外計算並回傳。最後一種可能為字串存在 intern hash table 中,其 `s->hash_size` 就是 hash 值。 ```c= cstring cstr_grab(cstring s) { if (s->type & (CSTR_PERMANENT | CSTR_INTERNING)) return s; if (s->type == CSTR_ONSTACK) return cstr_clone(s->cstr, s->hash_size); if (s->ref == 0) s->type = CSTR_PERMANENT; else __sync_add_and_fetch(&s->ref, 1); return s; } ``` 將在 stack 上的字串放入 intern hash table 中,並回傳對應的 `cstring`。第 10 行為當字串是過長字串時,`cstr_grab` 會使其 reference count + 1。 第 8 行目前看起來好像沒作用,會使 `ref` 為 0 只有兩種可能:當字串太長時,經過 `CSTR_LITERAL()` 建立出來的 `cstring`,但此時其 `type` 為 `CSTR_PERMANENT`,故會在第 4 行就回傳;另一種可能是 `cstr_release` 太多次導致 reference count 為 0,但因為 `cstr_release` 內是使用 `__sync_sub_and_fetch` 來處理數值,當 `s->ref` 為 0 時,`s` 就會被釋放,所以在 dereference `s` 時就會出錯。 ```c= void cstr_release(cstring s) { if (s->type || !s->ref) return; if (__sync_sub_and_fetch(&s->ref, 1) == 0) free(s); } ``` 當 type 不為 0 或 reference count 不為 0 時,直接回傳,否則檢查 reference count 是否只剩一個物件要 release,是的話釋放其空間。 ###### tags: `linux2021`