Try   HackMD

2021q1 Homework2 (quiz2)

contributed by < hankluo6 >

第 2 週測驗題

測驗 1

COND1 = ?

  • (c) fast->next == list

COND2 = ?

  • (b) fast->next->next == list

MMM = ?

  • (e) &get_middle(&q->list)->list

TTT = ?

  • (a) slow

運作原理

  • get_middle(struct list_head *list)

    ​​​​static list_ele_t *get_middle(struct list_head *list)
    ​​​​{
    ​​​​    struct list_head *fast = list->next, *slow;
    ​​​​    list_for_each (slow, list) {
    ​​​​        if (fast->next == list || fast->next->next == list)
    ​​​​            break;
    ​​​​        fast = fast->next->next;
    ​​​​    }
    ​​​​    return list_entry(slow, list_ele_t, list);
    ​​​​}
    

    取得 linked list 中間的 entry,struct list_head *list 為 linked list 的 head,不存在 data,所以要取得 entry 時以 list->next 為第一個 entry 開始。

    fastslow 兩個指標分別以 2:1 的 step 遍歷,當 fast 到達尾端時,slow 位置即為 linked list 的

    12 個 entry。

    因為 list.h 中實作為 circular linked list,判斷是否為尾端使用 fast->next == list 即可,但是 fast 為每次迴圈移動兩個 step,只判斷 fast->next == list 只有在

    size 為奇數時成立,如果 linked list 中 node 個數為偶數時,get_middle() 應回傳第
    12×size
    12×size+1
    個 entry,其中
    size
    為 linked list 中不包含 head 的個數,故分成兩部份討論:

    1. get_middle() 回傳
      12×size
      時,判斷式為 fast->next == list || fast->next->next == listslow 為第
      12×size
      個 node 時,fast 應為第
      size1
      個 node,其 fast->next->next == list
    2. get_middle() 回傳
      12×size+1
      時,判斷式為 fast->next == list || fast == listslow 為第
      12×size+1
      個 node 時,應為上方情形再多一次迴圈,故 fast 會往後移動兩個 step 重新回到 list 這個 head。
  • list_merge

    ​​​​static void list_merge(struct list_head *lhs,
    ​​​​                       struct list_head *rhs,
    ​​​​                       struct list_head *head)
    ​​​​{
    ​​​​    INIT_LIST_HEAD(head);
    ​​​​    if (list_empty(lhs)) {
    ​​​​        list_splice_tail(lhs, head);
    ​​​​        return;
    ​​​​    }
    ​​​​    if (list_empty(rhs)) {
    ​​​​        list_splice_tail(rhs, head);
    ​​​​        return;
    ​​​​    }
    
    ​​​​    while (!list_empty(lhs) && !list_empty(rhs)) {
    ​​​​        char *lv = list_entry(lhs->next, list_ele_t, list)->value;
    ​​​​        char *rv = list_entry(rhs->next, list_ele_t, list)->value;
    ​​​​        struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next;
    ​​​​        list_del(tmp);
    ​​​​        list_add_tail(tmp, head);
    ​​​​    }
    ​​​​    list_splice_tail(list_empty(lhs) ? rhs : lhs, head);
    ​​​​}
    

    lhsrhs 兩個 list 合併至 head 中,先判斷是否有一邊的 list 為空,將另一邊的 list 加入至 head 中並直接回傳,如果兩個 list 皆有元素,則依照大小來選擇。最後當有一邊的 list 處理完後,直接把另一邊的 list 剩下的 node 加入到 head 中。

  • list_merge_sort

    ​​​​void list_merge_sort(queue_t *q)
    ​​​​{
    ​​​​    if (list_is_singular(&q->list))
    ​​​​        return;
    
    ​​​​    queue_t left;
    ​​​​    struct list_head sorted;
    ​​​​    INIT_LIST_HEAD(&left.list);
    ​​​​    list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list);
    ​​​​    list_merge_sort(&left);
    ​​​​    list_merge_sort(q);
    ​​​​    list_merge(&left.list, &q->list, &sorted);
    ​​​​    INIT_LIST_HEAD(&q->list);
    ​​​​    list_splice_tail(&sorted, &q->list);
    ​​​​}
    

    透過 list_cut_positionq->list 分成兩部分,再分別對兩個 sub-list 做 list_merge_sort,最後經過 list_merge 合併至 sorted 中。此時資料存在以 sorted 為 head 的 list 中,故需要再將 list 存回至 q->list 內。

此程式有個錯誤在 list_merge() 的兩個判斷式上,應為在一邊 list 為空時,添加另一邊的 list 至 head 中:


    if (list_empty(lhs)) {
-       list_splice_tail(lhs, head);
+       list_splice_tail(rhs, head);
        return;
    }
    if (list_empty(rhs)) {
-       list_splice_tail(rhs, head);
+       list_splice_tail(lhs, head);
        return;
    }

但此問題並不影響程式輸出,分析程式碼,可以發現 list_merge(&left.list, &q->list, &sorted);left.listq->list 為 empty 時會出錯,但是前方的 list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list) 為從中間之 node 分開,故可保證長度為 2 以上的 list 經過 list_cut_position 後能讓 left.listq->list 皆不為空,而長度為 1 的 list 則會在開頭的 list_is_singular(&q->list) 就被剔除,所以可以保證 lhsrhs 永遠不為 empty,表示上方錯誤的程式碼可以全部移除。

根據上述分析可以知道當 list_merge_sort 傳入參數中的 list 為長度 1 以上時可以順利運行,但當長度為 0 時並沒有做適當的處理,應要在檢查 singular 時一併檢查是否為空,否則會導致 segmation fault:

void list_merge_sort(queue_t *q)
{
    if (list_empty(&q->list) || list_is_singular(&q->list))
        return;
    ...        
}

說明 Linux 核心的 lib/list_sort.c 最佳化手法

TODO: 透過 git 取得 linux 核心的原始程式碼,然後執行 git log lib/list_sort.c,觀察修改記錄。例如:

commit b5c56e0cdd62979dd538e5363b06be5bdf735a09
Author: George Spelvin lkml@sdf.org
Date: Tue May 14 15:43:02 2019 -0700
lib/list_sort: optimize number of calls to comparison function

commit 043b3f7b6388fca6be86ca82979f66c5723a0d10
Author: George Spelvin lkml@sdf.org
Date: Tue May 14 15:42:58 2019 -0700
lib/list_sort: simplify and remove MAX_LIST_LENGTH_BITS

:notes: jserv

commit message
​​​​lib/list_sort: optimize number of calls to comparison function
​​​​
​​​​CONFIG_RETPOLINE has severely degraded indirect function call
​​​​performance, so it's worth putting some effort into reducing the number
​​​​of times cmp() is called.
​​​​
​​​​This patch avoids badly unbalanced merges on unlucky input sizes.  It
​​​​slightly increases the code size, but saves an average of 0.2*n calls to
​​​​cmp().
​​​​
​​​​x86-64 code size 739 -> 803 bytes (+64)
​​​​
​​​​Unfortunately, there's not a lot of low-hanging fruit in a merge sort;
​​​​it already performs only n*log2(n) - K*n + O(1) compares.  The leading
​​​​coefficient is already at the theoretical limit (log2(n!) corresponds to
​​​​K=1.4427), so we're fighting over the linear term, and the best
​​​​mergesort can do is K=1.2645, achieved when n is a power of 2.
​​​​
​​​​The differences between mergesort variants appear when n is *not* a
​​​​power of 2; K is a function of the fractional part of log2(n).  Top-down
​​​​mergesort does best of all, achieving a minimum K=1.2408, and an average
​​​​(over all sizes) K=1.248.  However, that requires knowing the number of
​​​​entries to be sorted ahead of time, and making a full pass over the
​​​​input to count it conflicts with a second performance goal, which is
​​​​cache blocking.
​​​​
​​​​Obviously, we have to read the entire list into L1 cache at some point,
​​​​and performance is best if it fits.  But if it doesn't fit, each full
​​​​pass over the input causes a cache miss per element, which is
​​​​undesirable.
​​​​
​​​​While textbooks explain bottom-up mergesort as a succession of merging
​​​​passes, practical implementations do merging in depth-first order: as
​​​​soon as two lists of the same size are available, they are merged.  This
​​​​allows as many merge passes as possible to fit into L1; only the final
​​​​few merges force cache misses.
​​​​
​​​​This cache-friendly depth-first merge order depends on us merging the
​​​​beginning of the input as much as possible before we've even seen the
​​​​end of the input (and thus know its size).
​​​​
​​​​The simple eager merge pattern causes bad performance when n is just
​​​​over a power of 2.  If n=1028, the final merge is between 1024- and
​​​​4-element lists, which is wasteful of comparisons.  (This is actually
​​​​worse on average than n=1025, because a 1024:1 merge will, on average,
​​​​end after 512 compares, while 1024:4 will walk 4/5 of the list.)
​​​​
​​​​Because of this, bottom-up mergesort achieves K < 0.5 for such sizes,
​​​​and has an average (over all sizes) K of around 1.  (My experiments show
​​​​K=1.01, while theory predicts K=0.965.)
​​​​
​​​​There are "worst-case optimal" variants of bottom-up mergesort which
​​​​avoid this bad performance, but the algorithms given in the literature,
​​​​such as queue-mergesort and boustrodephonic mergesort, depend on the
​​​​breadth-first multi-pass structure that we are trying to avoid.
​​​​
​​​​This implementation is as eager as possible while ensuring that all
​​​​merge passes are at worst 1:2 unbalanced.  This achieves the same
​​​​average K=1.207 as queue-mergesort, which is 0.2*n better then
​​​​bottom-up, and only 0.04*n behind top-down mergesort.
​​​​
​​​​Specifically, defers merging two lists of size 2^k until it is known
​​​​that there are 2^k additional inputs following.  This ensures that the
​​​​final uneven merges triggered by reaching the end of the input will be
​​​​at worst 2:1.  This will avoid cache misses as long as 3*2^k elements
​​​​fit into the cache.
​​​​
​​​​(I confess to being more than a little bit proud of how clean this code
​​​​turned out.  It took a lot of thinking, but the resultant inner loop is
​​​​very simple and efficient.)

list_sort 也效利用了 doubly linked list 中的 prevnext 指標,達成可以以類似 dfs 的順序做 botton up merge,而非傳統 level order 的 merge,這樣便能對 cache 有叫好的 locality,因為操作的節點皆為剛剛才使用過的節點。

但只是單純的 dfs 會產生 unbalanced 的問題,如 1028 個節點時,merge sort 做到結束時會產生 1024 個已排序的 linked list 與 4 個未排序的 linked list,此時平均需要再遍歷

45 個 list 才能完成排序。

所以 list_sort 提出一種 worst-case optimal 的方法來避免這種不平衡的 merge,就是保證排序長度為

2k,2k1,...,21 皆出現在當前處理完的 linked list 部份中,如果要產生
2k+1
的 linked list 的話,便需要兩個長度為
2k
的 linked list,與長度為
2k1,...,21
的 linked list 各一個,此時 merge 完後的 linked list 長度各為
2k+1,2k1,...,21
,如果這時已經到 linked list 的結尾的話,只需要將最後一個節點從
21
merge 到
2k+1
即完成 merge sort,且可以發現中間 merge 的數量比例最多只會為 2:1(長度為
21
1
2k+1
2k
做 merge 的時候)。

要實作上述特徵只需要使用一個變數 count 紀錄當前處理的長度即可:

count merge range length
00000 X X X
00001 X X X
00010 O [0,1] 2
00011 X X X
00100 O [2,3] 2
00101 O [0,3] 4
00110 O [4,5] 2
00111 X X X
01000 O [6,7] 2
01001 O [4,7] 4
01010 O [8,9] 2
01011 O [0,7] 8
01100 O [10,11] 2
01101 O [8,11] 4
01110 O [12,13] 2
01111 X X X
10000 O [14,15] 2

count 為當前處理的節點數,處理完的節點會被放置到 pending 中,merge 為此次迴圈是否需要 merge,可以發現當為 2 的冪時不需要 merge,range 為 merge 時的範圍,length 為此範圍的長度,可以看到長度的順序符合上述的分析,便能處理 unbalance 的問題。

list_sort 是利用 prev 成員連接每個不同的 sorted list,而使用 next 成員連接與自身節點排序的 list。如下圖:







G



a3

14

 



b3

15

 



a3:f->b3:data



next



a2

12

 



a3:ref->a2:data


prev



b2

13

 



a2:f->b2:data



next



a1

8

 



a2:ref->a1:data


prev



b1

9

 



a1:f->b1:data



next



a

0

 



a1:ref->a:data


prev



c1

10

 



b1:f->c1:data



next



d1

11

 



c1:f->d1:data



next



b

1

 



a:f->b:data



next



c

2

 



b:f->c:data



next



d

3

 



c:f->d:data



next



e

4

 



d:f->e:data



next



f

5

 



e:f->f:data



next



g

6

 



f:f->g:data



next



h

7

 



g:f->h:data



next



與 lib/list_sort.c 效能比較

  • 效能比較撰寫在 quiz1

測驗 2

X = ?

  • (b) 2

Y = ?

  • (d) 4

Z = ?

  • (h) 8

運作原理

uint16_t func(uint16_t N) {
    /* change all right side bits to 1 */
    N |= N >> 1;
    N |= N >> 2;
    N |= N >> 4;
    N |= N >> 8;

    return (N + 1) >> 1;
}

此函式為將 N 無條件捨去至 2 的倍數,此技巧是將最高位元為 1 的位置到 LSB 皆設為 1,再經過 N + 1 便會進位,故再往右移 1 位元即可。

原本懷疑當輸入 N

215 時導致 overflow 並回傳 0,實際測試後卻發現能順利回傳
215
。根據 C99 規格書中提到:

If an int can represent all values of the original type, the value is converted to an int; otherwise, it is converted to an unsigned int. These are called the integer promotions.
All other types are unchanged by the integer promotions.

此為 integer promotion 的行為,在 (N + 1) >> 1 的表達式中,N 會被轉型為 integer,而在回傳時再轉型為 uint16_t,故不會發生 overflow。

The Linux Kernel API

  • is_power_of_2

    ​​​​/**
    ​​​​ * is_power_of_2() - check if a value is a power of two
    ​​​​ * @n: the value to check
    ​​​​ *
    ​​​​ * Determine whether some value is a power of two, where zero is
    ​​​​ * *not* considered a power of two.
    ​​​​ * Return: true if @n is a power of 2, otherwise false.
    ​​​​ */
    ​​​​static inline __attribute__((const))
    ​​​​bool is_power_of_2(unsigned long n)
    ​​​​{
    ​​​​    return (n != 0 && ((n & (n - 1)) == 0));
    ​​​​}
    

    當數字 n 為 power of 2 時,代表其位元表示法中只有一位數為 1,故 n - 1 為從該位元 - 1 到 LSB 的數字皆為 1,此時與原先的 n 做 and 運算便能決定是否為 power of 2,如果數字 n 不為 power of 2 時,n - 1 必定不會改變最靠近 MSB 且為 1 的位元,故 and 運算後該位元仍為 1,此時便會回傳 false。

  • __roundup_pow_of_two & __rounddown_pow_of_two

    ​​​​/**
    ​​​​ * __roundup_pow_of_two() - round up to nearest power of two
    ​​​​ * @n: value to round up
    ​​​​ */
    ​​​​static inline __attribute__((const))
    ​​​​unsigned long __roundup_pow_of_two(unsigned long n)
    ​​​​{
    ​​​​    return 1UL << fls_long(n - 1);
    ​​​​}
    ​​​​
    ​​​​/**
    ​​​​ * __rounddown_pow_of_two() - round down to nearest power of two
    ​​​​ * @n: value to round down
    ​​​​ */
    ​​​​static inline __attribute__((const))
    ​​​​unsigned long __rounddown_pow_of_two(unsigned long n)
    ​​​​{
    ​​​​    return 1UL << (fls_long(n) - 1);
    ​​​​}
    ​​​​
    ​​​​static inline unsigned fls_long(unsigned long l)
    ​​​​{
    ​​​​    if (sizeof(l) == 4)
    ​​​​        return fls(l);
    ​​​​    return fls64(l);
    ​​​​}
    

    fls 指的是 find last set bit (MSB) in word,如 fls(0) = 0, fls(1) = 1, fls(0x80000000) = 32。

    n 不為 power of 2 時,n - 1 不會影響最靠近 MSB 且為 1 的位元,故 fls_long(n - 1) 的值都一樣為該位元 + 1,__roundup_pow_of_two 便能做到 roundup 的效果。而當 n 為 power of 2 時,n - 1 會向最靠近 MSB 且為 1 的位元借位,導致 fls 回傳的值少 1,讓 roundup 傳回原本的值。

    __rounddown_pow_of_two

    2k2k+11 內要回傳相同的值,而這與 fls_long 的範圍相同,故直接帶入 fls_long(n) - 1 即為 rounddown 後最高位元為 1 的位置。

    flsfls64 在核心中根據硬體不同而有不同的實作,其內部為 gcc extension 的 __builtin_clz 或是組合語言實作。某些硬體會提供 BSRLZCNTCLZ 等指令來加速運算,linux kernel 及 __builtin_clz 皆會根據硬體來選擇適當的指令,如此便能提供良好的移植性與效能兼顧。


測驗 3

RRR = ?

  • (a) data <<= read_lhs

DDD = ?

  • (b) mask |= write_mask[write_lhs + bitsize]

運作原理

while (count > 0) { uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (*source >> read_rhs); } if (bitsize < 8) data &= read_mask[bitsize]; uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } count -= bitsize; }

根據題目敘述:

  • _read 為從 MSB 數來的第 _read 個位元開始讀取
  • _write 為從 MSB 數來的第 _write 個位元寫入

故可推測出 _read_lhs 表示 left hand side (MSB) 開始算, _read_rhs 表示 right hand side (MSB) 開始算:_write_lhs_write_rhs 同理。

while 迴圈會以 byte 為單位操作,當剩餘資料不足一個 byte 時,bitsize 會被設置為剩餘 bit 的數量。uint8_t data 為要傳遞的資料,要注意要傳遞的所有 bit 皆會靠左 (MSB) 對齊。第 4 行的 if (read_lhs > 0) 檢查 read 的起始位置是否為 0,如果不為 0 的話,則需要把資料靠左對齊。而第 9 行則是當要讀取的 bit 數 bitsize_read_rhs 小時,意味著 bitsize > (8 - _read_lhs),需要讀取到下一個 byte 的 bits,透過 data |= (*source >> read_rhs); 將剩下要讀取的 bits 移到 data 中的 LSB。







%0



a

1+

0+

1*

1*

0*

0*

0*

1*



b

1*

1*

0*

0*

0*

1*

0+

0+



a->b


  << 2



|
|



c

0*

1+

1+

1+

0+

1+

1+

0+



d

0+

0+

0+

0+

0+

0+

0*

1+



c->d


  >> 6



*Source
*Source




*(Source + 1)
*(Source + 1)




=
=



e

1*

1*

0*

0*

0*

1*

0*

1+



data
data



e->data





_read_lhs = 2
_read_lhs = 2



_read_rhs = 6
_read_rhs = 6




bit_size = 7
bit_size = 7





  • * 代表要讀取的 bits
  • + 代表不需讀取的 bits

可以發現要讀取的 bits 會被存放到 data 的高位元處。第 10 行則是將 data 只取bit_size 範圍,其他範圍的 bits 則被設置為 0,如圖上data LSB 內的 1 會被設置為 0,防止 write 的時候讀取到。

mask 的作用是將沒有寫入的 bits 保留,防止被 data 複寫。第 15 行跟第 20 行的判斷式用來檢查要寫入的 bit 是否橫跨兩個 byte。

if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); }

當橫跨兩個 byte 時,original & mask 為紀錄不需要改動的 bit,而 data >> write_lhs 則是將要寫入的 bit 移動到 write_lhs 為開頭的位置,再做 or 運算即是這個 byte 的新資料。write_mask[bitsize - write_rhs] 為下一個 byte 需要寫入的 bits 位置,將還未寫進的 bits (data << write_lhs) 寫入到下個 byte 中。

else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); }

而不橫跨兩個 byte 時,mask |= write_mask[write_lhs + bitsize]; 用來紀錄不須寫入的 bits 位置,此時 mask 在要寫入的 bit 位置為 0,其他位置與 dest 原本的 bit 相同。第 23 行則與上方相同,將 data 位移到寫入的位置並與原本的資料做 or 運算。







%0



a

0+

1+

0+

1+

1+

0*

1*

1*



b

0+

1+

0+

1+

1+

0*

0*

0*



a->b


  & mask



mask
mask




f

1

1

1

1

1

1

0

0



|
|



c

1*

1*

0*

0*

0*

1*

0*

1+



d

0+

0+

0+

0+

0+

1*

1*

0*



c->d


  >> _write_lhs



*Original
*Original




data
data




=
=



e

0+

1+

0+

1+

1+

1*

1*

0*



_write_lhs = 5
_write_lhs = 5



_write_rhs = 3
_write_rhs = 3




bit_size = 7
bit_size = 7











%0



a

1*

0*

1*

1*

0+

1+

0+

0+



b

0*

0*

0*

0*

0+

1+

0+

0+



a->b


  & write_mask[bitsize - write_rhs]



mask
mask




f

1

1

1

1

1

1

0

0



|
|



c

1*

1*

0*

0*

0*

1*

0*

1+



d

0*

0*

1*

0*

0+

0+

0+

0+



c->d


  << _write_rhs



*(Original + 1)
*(Original + 1)




data
data




=
=



e

0*

0*

1*

0*

0+

1+

0+

0+



_write_lhs = 5
_write_lhs = 5



_write_rhs = 3
_write_rhs = 3




bit_size = 7
bit_size = 7





  • * 代表要寫入的 bits
  • + 代表不需寫入的 bits

可以發現 source 第 2 個位置後的 7 個 bits 1100010 被寫入到 dest 的第 5 個 bits 後方。

Linux 核心原始程式碼中逐 bit 資料複製

static inline void bitmap_copy(unsigned long *dst, const unsigned long *src,
    unsigned int nbits)
{
    unsigned int len = BITS_TO_LONGS(nbits) * sizeof(unsigned long);
    memcpy(dst, src, len);
}

/*
 * Copy bitmap and clear tail bits in last word.
 */
static inline void bitmap_copy_clear_tail(unsigned long *dst,
    const unsigned long *src, unsigned int nbits)
{
    bitmap_copy(dst, src, nbits);
    if (nbits % BITS_PER_LONG)
        dst[nbits / BITS_PER_LONG] &= BITMAP_LAST_WORD_MASK(nbits);
}

目前只找到 linux/include/linux/bitmap.h 中的 bitmap_copybitmap_copy_clear_tail,但此兩函式皆不會保留 byte 內沒有複製到的 bit。


測驗 4

CCC = ?

  • (c) s->hash_size

程式原理

enum {
    CSTR_PERMANENT = 1,
    CSTR_INTERNING = 2,
    CSTR_ONSTACK = 4,
};

string interning 有三種類型,分別為 CSTR_PREMANENT (TODO)、CSTR_INTERNING 用來處理 Literal 的常數字串、CSTR_ONSTACK 處理以 CSTR_BUFFER 產生出來且長度較小的字串變數。

#define CSTR_BUFFER(var)                                                      \
    char var##_cstring[CSTR_STACK_SIZE] = {0};                                \
    struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \
    cstr_buffer var;                                                          \
    var->str = &var##_cstr_data;

變數類型的 string 透過 CSTR_BUFFER 宣告,先建立一個 size 為 CSTR_STACK_SIZE (128) 空間的 array of char 用來存放字串,並將 struct __cstr_data 內的 cstr 欄位指向該空間,且使用 cstr_buffer 內的 str 空間指向這個 structure。

#define CSTR_LITERAL(var, cstr)                                               \
    static cstring var = NULL;                                                \
    if (!var) {                                                               \
        cstring tmp = cstr_clone("" cstr, (sizeof(cstr) / sizeof(char)) - 1); \
        if (tmp->type == 0) {                                                 \
            tmp->type = CSTR_PERMANENT;                                       \
            tmp->ref = 0;                                                     \
        }                                                                     \
        if (!__sync_bool_compare_and_swap(&var, NULL, tmp)) {                 \
            if (tmp->type == CSTR_PERMANENT)                                  \
                free(tmp);                                                    \
        }                                                                     \
    }

常數類型的 string 透過 CSTR_LITERAL 宣告,透過 cstr_clone 建立 intern 字串 (詳見下方 cstr_clone 函式),而當 malloc 失敗 (空間不足) 或是字串過長 (超過 CSTR_INTERNING_SIZE) 時,tmp->type 被設置為 0,故會將 tmp->type 重新設置為 CSTR_PERMANENT

下方的 if 判斷式推測是為了處理多執行緒的問題,當兩個執行緒同時運行 CSTR_LITERAL 時,兩者皆進入到 if (!var) 判斷式內部,且剛好此字串長度大於 CSTR_INTERNING_SIZE 而配置兩個空間,故第一個執行緒在下方的 if 中運行 __sync_bool_compare_and_swap(&var, NULL, tmp) 時回傳 true,而之後的執行緒在運行此行時皆會回傳 false,便能刪除多餘的空間。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, )

These builtins perform an atomic compare and swap. That is, if the current value of *ptr is oldval, then write newval into *ptr.
The “bool” version returns true if the comparison is successful and newval was written. The “val” version returns the contents of *ptr before the operation.

而當 type 不為 0 時就不用移除是因為在 cstr_interning 內部,已經有 CSTR_LOCK()CSTR_UNLOCK() 防止同時建立兩個相同 string 的機制了。

cstring cstr_cat(cstr_buffer sb, const char *str) { cstring s = sb->str; if (s->type == CSTR_ONSTACK) { int i = s->hash_size; while (i < CSTR_STACK_SIZE - 1) { s->cstr[i] = *str; if (*str == 0) return s; ++s->hash_size; ++str; ++i; } s->cstr[i] = 0; } cstring tmp = s; sb->str = cstr_cat2(tmp->cstr, str); cstr_release(tmp); return sb->str; } static cstring cstr_cat2(const char *a, const char *b) { size_t sa = strlen(a), sb = strlen(b); if (sa + sb < CSTR_INTERNING_SIZE) { char tmp[CSTR_INTERNING_SIZE]; memcpy(tmp, a, sa); memcpy(tmp + sa, b, sb); tmp[sa + sb] = 0; return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb)); } cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1); if (!p) return NULL; char *ptr = (char *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, a, sa); memcpy(ptr + sa, b, sb); ptr[sa + sb] = 0; p->hash_size = 0; return p; }

透過 cstr_cat 將字串加入到此 string buffer 當中,分成三種情況:

  1. 字串長度小於 CSTR_STACK_SIZE (128)

    透過 line 4 ~ line 15 來處理,將字串放入 sb->str->cstr 中,並以 sb->str->hash_size 紀錄當前字串長度。

  2. 字串長度大於等於 CStR_STACK_SIZE 且小於 CSTR_INTERNING_SIZE (32)

    進入到 cstr_cat2 函式,透過前半段 line 25 ~ line 31 來處理,此時該字串會被放入 struct __cstr_node 內的 buffer 當中,並把該結構中的成員 cstring str 回傳出來,而該結構也會放入內部的 poolhash 當中,並將回傳的 str->type 改為 CSTR_INTERNING,且此時 str->hash_size 的值用來紀錄此字串的 hash 值。(詳細見下方 cstr_interning 函式)

  3. 字串長度大於等於 CSTR_INTERNING_SIZE

    此時會將之串放入到 struct __cstr_data 的尾端,並將 cstringtype 設定為 0,ref 設定為 1。

struct __cstr_node {
    char buffer[CSTR_INTERNING_SIZE];
    struct __cstr_data str;
    struct __cstr_node *next;
};

struct __cstr_pool {
    struct __cstr_node node[INTERNING_POOL_SIZE];
};

struct __cstr_interning {
    int lock;
    int index;
    unsigned size;
    unsigned total;
    struct __cstr_node **hash;
    struct __cstr_pool *pool;
};

string interning 內部使用 struct __cstr_interning 來儲存資料,其成員作用如下:

  • lock 防止多執行續同時處理同樣字串造成錯誤
  • index 紀錄目前使用到 pool 中的第幾個位置
  • size 紀錄目前 hash 的容量 (capacity)
  • total 目前 intern 的字串數量
  • hash 所有存有 intern 字串的 struct __cstr_node 以此 hash 連結,其內部的 struct __cstr_nodepool 中的 node
  • pool 為 memory pool,預先分配足夠的 struct __cstr_node 便不需要頻繁的 malloc,且能讓記憶體更連續防止碎片化

total 增加的位置應該要在 interning 函式內,而不是在 cstr_interning 函式內,假如字串已經存在在 intern hash table 內,此時 total 不應該增加,否則會造成實際上 hash 還沒到達限制前就被重新分配空間。應該要放在 interning 內並在當從 pool 內取出新 node 時才增加。

static cstring interning(struct __cstr_interning *si, const char *cstr, size_t sz, uint32_t hash) { if (!si->hash) return NULL; int index = (int) (hash & (si->size - 1)); struct __cstr_node *n = si->hash[index]; while (n) { if (n->str.hash_size == hash) { if (!strcmp(n->str.cstr, cstr)) return &n->str; } n = n->next; } // 80% (4/5) threshold if (si->total * 5 >= si->size * 4) return NULL; if (!si->pool) { si->pool = xalloc(sizeof(struct __cstr_pool)); si->index = 0; } n = &si->pool->node[si->index++]; memcpy(n->buffer, cstr, sz); n->buffer[sz] = 0; cstring cs = &n->str; cs->cstr = n->buffer; cs->hash_size = hash; cs->type = CSTR_INTERNING; cs->ref = 0; n->next = si->hash[index]; si->hash[index] = n; return cs; }

interning 用來處理需要 intern 的字串 (CSTR_LITERAL 產生的字串或 local variable 太長的字串)。

  • line 6 ~ line 7

    為第一次進入 interning 時,因 hash table 尚未分配空間,經過此行回傳並透過 expand 函式重新分配 hash table 空間後再進入。

  • line 9 ~ line 17

    計算 hash 值並在 hash table 中搜尋,如果已經存在則不需要再從 pool 中取 node 來用,直接將 cstring 取出回傳即可。

  • line 19 ~ line 20

    當 hash table 快不足時,回傳 NULL 並經過 expand 重新分配空間後再進入。

  • line 21 ~ line 24

    為第一次進入 interning 時,因 pool 尚未分配空間,經過此行初始化。

  • line 25 ~ line 27

    因為目前 hash table 中沒有紀錄此字串,故從 pool 中取一個新的 __cstr_node 的位置 n,將字串複製到該結構內的 buffer 中。

  • line 29 ~ line 33

    主要要回傳的 cstring 為此 node 內的 str 成員位置,而 cstring 的字串則指向此 node 的 buffer,並設置對應的 type 及 hash 值方便之後計算。

  • line 35 ~ line 36

    將這個新的 node 放到 hash 中。

static void expand(struct __cstr_interning *si) { unsigned new_size = si->size * 2; if (new_size < HASH_START_SIZE) new_size = HASH_START_SIZE; struct __cstr_node **new_hash = xalloc(sizeof(struct __cstr_node *) * new_size); memset(new_hash, 0, sizeof(struct __cstr_node *) * new_size); for (unsigned i = 0; i < si->size; ++i) { struct __cstr_node *node = si->hash[i]; while (node) { struct __cstr_node *tmp = node->next; insert_node(new_hash, new_size, node); node = tmp; } } free(si->hash); si->hash = new_hash; si->size = new_size; }

此 hash 以 Dynamic Array 實作,故當空間不足時,重新分配空間並把原本的 hash table 複製到新的 hash table 上。這邊空間不足是以 80% 為 threshold,因為當 hash 的物件越多,hash table 的時間複雜度會漸漸趨近於

O(n),故等到 hash table 滿時才做 expand 會導致 collision 過多。

static inline uint32_t hash_blob(const char *buffer, size_t len) { const uint8_t *ptr = (const uint8_t *) buffer; size_t h = len; size_t step = (len >> 5) + 1; for (size_t i = len; i >= step; i -= step) h = h ^ ((h << 5) + (h >> 2) + ptr[i - 1]); return h == 0 ? 1 : h; }

利用此 function 計算 hash 值,buffer 為目標字串,len 為其長度。

cstring cstr_clone(const char *cstr, size_t sz) { if (sz < CSTR_INTERNING_SIZE) return cstr_interning(cstr, sz, hash_blob(cstr, sz)); cstring p = xalloc(sizeof(struct __cstr_data) + sz + 1); if (!p) return NULL; void *ptr = (void *) (p + 1); p->cstr = ptr; p->type = 0; p->ref = 1; memcpy(ptr, cstr, sz); ((char *) ptr)[sz] = 0; p->hash_size = 0; return p; }

cstr_clone 為給定一字串,回傳其 cstring,當 cstr_grab 對 stack 上取字串或是創建 Literal 的 CSTR_LITERAL 會呼叫。其內容與 cstr_cat 的 2, 3 種情況一樣。

int cstr_equal(cstring a, cstring b) { if (a == b) return 1; if ((a->type == CSTR_INTERNING) && (b->type == CSTR_INTERNING)) return 0; if ((a->type == CSTR_ONSTACK) && (b->type == CSTR_ONSTACK)) { if (a->hash_size != b->hash_size) return 0; return memcmp(a->cstr, b->cstr, a->hash_size) == 0; } uint32_t hasha = cstr_hash(a); uint32_t hashb = cstr_hash(b); if (hasha != hashb) return 0; return !strcmp(a->cstr, b->cstr); }

比較兩個字串是否相同,如果兩個的 type 皆為 CSTR_INTERNING 則回傳 false,因為每個相同 intern 字串只會存在一個。當兩個 type 皆為 CSTR_ONSTACK 時,檢查其大小跟內容是否相同。line 12 ~ line 13 為兩個 type 不同,透過 cstr_hash 計算其字串 hash 值是否相同。而 line 16 則是當兩個字串 hash 相同,但有可能字串內容不同,故還需對照字串內容。

static size_t cstr_hash(cstring s) { if (s->type == CSTR_ONSTACK) return hash_blob(s->cstr, s->hash_size); if (s->hash_size == 0) s->hash_size = hash_blob(s->cstr, strlen(s->cstr)); return s->hash_size; }

依據 type 計算其 hash 值,當 s->type == CSTR_ONSTACK 時表示其存在 stack 中,s->hash_size 為紀錄目前字串長度而非 hash 值,透過 hash_blob 計算並回傳。 s->hash_size == 0 時,表示其字串過長不存在 intern hash table 中,也是額外計算並回傳。最後一種可能為字串存在 intern hash table 中,其 s->hash_size 就是 hash 值。

cstring cstr_grab(cstring s) { if (s->type & (CSTR_PERMANENT | CSTR_INTERNING)) return s; if (s->type == CSTR_ONSTACK) return cstr_clone(s->cstr, s->hash_size); if (s->ref == 0) s->type = CSTR_PERMANENT; else __sync_add_and_fetch(&s->ref, 1); return s; }

將在 stack 上的字串放入 intern hash table 中,並回傳對應的 cstring。第 10 行為當字串是過長字串時,cstr_grab 會使其 reference count + 1。

第 8 行目前看起來好像沒作用,會使 ref 為 0 只有兩種可能:當字串太長時,經過 CSTR_LITERAL() 建立出來的 cstring,但此時其 typeCSTR_PERMANENT,故會在第 4 行就回傳;另一種可能是 cstr_release 太多次導致 reference count 為 0,但因為 cstr_release 內是使用 __sync_sub_and_fetch 來處理數值,當 s->ref 為 0 時,s 就會被釋放,所以在 dereference s 時就會出錯。

void cstr_release(cstring s) { if (s->type || !s->ref) return; if (__sync_sub_and_fetch(&s->ref, 1) == 0) free(s); }

當 type 不為 0 或 reference count 不為 0 時,直接回傳,否則檢查 reference count 是否只剩一個物件要 release,是的話釋放其空間。

tags: linux2021