Try   HackMD

2021q1 Homework2 (quiz2)

contributed by < 93i7xo2 >

Source: 2021q1 第 2 週測驗題

To-do list

  • 測驗3
    • Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境
  • 測驗4
    • 上述程式碼使用到 gcc atomics builtins,請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案
  • 測驗2
    • 研讀 slab allocator,探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀
  • 共筆

測驗 1

解析container_of

#define container_of(ptr, type, member) \ __extension__({ \ const __typeof__(((type *) 0)->member) *__pmember = (ptr); \ (type *) ((char *) __pmember - offsetof(type, member)); \ })

首先在你所不知道的C語言:技巧篇提到__extension__是用來防止 gcc 產生警告的修飾字,在這裡用來防止使用到非 ANSI C 標準語句 ({}) (braced-group within expression)時出現警告。

({})是 gcc extension 的一項,視多個 statement 組成的 compound statement 為 expression,同時最後一個 expression 的值會視為整體的值。作用是讓 marco 更加安全,例如將會造成 double evaluation 的 max()

#define max(a,b) ((a) > (b) ? (a) : (b))

改寫成

#define max(a,b) \
   ({ typeof (a) _a = (a), _b = (b); _a > _b ? _a : _b; }) 

再來參考 How to understand "((size_t) &((TYPE *)0)->MEMBER)"? 的描述,第 3 行可分解為以下步驟

1.                  (type *) 0)
2.                 ((type *) 0)->member)
3.      __typeof__(((type *) 0)->member)
4.const __typeof__(((type *) 0)->member) *__pmember = (ptr)
  1. 在地址 0 上有一型態為 type 的結構
  2. 取得此結構的成員 memeber
  3. 取得結構成員 member 的型態
  4. __pmember 是指向 memberconst pointer,其值為 ptr。可以看到 0 並沒有任何作用,可帶入任意值。

第4行 (type *) ((char *) __pmember - offsetof(type, member)); 用意在取得包覆 __pmemberstruct 起始位置,因此需要 offsetof(type, member) 來取得 __pmember 與該 struct 的相對偏移量。

接著仿效 A simple example with gcc and objdump 透過 objdumpgcc -g 觀察編譯後產生的 object file

gcc -c -g -O0 -std=c99 -o ./main.o ./main.c
objdump -d -M intel -S main.o
#include <stdio.h>
#include <stdlib.h>
#include "main.h"

typedef struct _list_head {
    int value;
    struct _list_head *prev, *next;
} list_head;

int main() {
    list_head *obj = (list_head*) malloc(sizeof(list_head));
    list_head **nptr = &(obj->next);
    list_head *cptr = container_of(nptr, list_head, next);
    printf("%p\n", cptr);
    free(obj);
    return 0;
}
int main() {
   0:   f3 0f 1e fa             endbr64 
   4:   55                      push   rbp
   5:   48 89 e5                mov    rbp,rsp
   8:   48 83 ec 20             sub    rsp,0x20
    list_head *obj = (list_head*) malloc(sizeof(list_head));
   c:   bf 18 00 00 00          mov    edi,0x18
  11:   e8 00 00 00 00          call   16 <main+0x16>
  16:   48 89 45 e0             mov    QWORD PTR [rbp-0x20],rax
    list_head **nptr = &(obj->next);
  1a:   48 8b 45 e0             mov    rax,QWORD PTR [rbp-0x20]
  1e:   48 83 c0 10             add    rax,0x10
  22:   48 89 45 e8             mov    QWORD PTR [rbp-0x18],rax // save rax to [rbp-0x18]
    list_head *cptr = container_of(nptr, list_head, next);
  26:   48 8b 45 e8             mov    rax,QWORD PTR [rbp-0x18] // load [rbp-0x18] to rax
  2a:   48 89 45 f0             mov    QWORD PTR [rbp-0x10],rax // save rax to [rbp-0x10]
  2e:   48 8b 45 f0             mov    rax,QWORD PTR [rbp-0x10] // load [rbp-0x10]
  32:   48 83 e8 10             sub    rax,0x10                 // rax = rax - 0x10
  36:   48 89 45 f8             mov    QWORD PTR [rbp-0x8],rax  // save rax to [rbp-0x8]

26~2a 行:將 nptr 複製到 [rbp-0x10]
2e~32 行:使 [rbp-0x10]0x10 相減,對應 (type *) ((char *) __pmember - offsetof(type, member));,而 0x10 恰好是 4 byte integer + 8 byte pointer + 4 byte padding 的大小。

其中有許多冗餘片段,試著開啟最佳化 -O1 再編譯

list_head **nptr = &(obj->next);
list_head *cptr = container_of(nptr, list_head, next);
printf("%p\n", cptr);
free(obj);
2b:   48 89 df                mov    rdi,rbx
2e:   e8 00 00 00 00          call   33 <main+0x33>

解析 list.h 精簡實作

Linux 鏈結串列 struct list_head 研究提到一般應用程式使用 linux/list.h 的方法是將要用到的函式複製至code中,而例題與 linux/list.h 顯然有些相異之處。

list_add_tail, list_del

static inline void list_add_tail(struct list_head *node, struct list_head *head)
{
    struct list_head *prev = head->prev;

    prev->next = node;
    node->next = head;
    node->prev = prev;
    head->prev = node;
}

static inline void list_del(struct list_head *node)
{
    struct list_head *next = node->next, *prev = node->prev;
    next->prev = prev; prev->next = next;
}

讓新加入的節點在串列末端,從串列前端移除節點,捨棄 linux/list.h 包裝 __list_add__list_del 的作法,意味整條串列操作起來類似 Queue。

  • list_empty, list_is_singular
static inline int list_empty(const struct list_head *head)
{
    return (head->next == head);
}

static inline int list_is_singular(const struct list_head *head)
{
    return (!list_empty(head) && head->prev == head->next);
}

用以判斷串列是否只有1~2個節點,從兩個函式猜測最前面的節點應是作為維護整條串列使用,本身不具有任何資料。

  • head: 維護整條串列的節點
  • entry: 實際存放資料串列的節點






example



head

 

head

 



entry1

 

entry1

 



head->entry1:f0





entry1:f0->head:f1





entry2

 

entry2

 



entry1:f1->entry2:f0





entry2:f0->entry1:f1





blank



entry2:f1->blank





blank->entry2:f1





list_splice_tail

static inline void list_splice_tail(struct list_head *list, struct list_head *head) { struct list_head *head_last = head->prev; struct list_head *list_first = list->next, *list_last = list->prev; if (list_empty(list)) return; head->prev = list_last; list_last->next = head; list_first->prev = head_last; head_last->next = list_first; }

插入非空串列 list 底下所有 entry 至 head 串列的最後方。

list_cut_position

static inline void list_cut_position(struct list_head *head_to, struct list_head *head_from, struct list_head *node) { struct list_head *head_from_first = head_from->next; if (list_empty(head_from)) return; if (head_from == node) { INIT_LIST_HEAD(head_to); return; } head_from->next = node->next; head_from->next->prev = head_from; head_to->prev = node; node->next = head_to; head_to->next = head_from_first; head_to->next->prev = head_to; }

目的是將從 head_from->next 直到 node 的部份串列移到未初始化的空串列 head_to 之後,成為新的串列,空串列定義為使用 list_empty 判斷返回 true。如果與 linux/list.hlist_cut_position 進行比較,86~92行實際上是將原先用來包裝的 __list_cut_position 寫進 list_cut_position

static inline void __list_cut_position(struct list_head *list,
		struct list_head *head, struct list_head *entry)
{
	struct list_head *new_first = entry->next;
	list->next = head->next;
	list->next->prev = list;
	list->prev = entry;
	entry->next = list;
	head->next = new_first;
	new_first->prev = head;
}

但缺少下面這段

if (list_is_singular(head) &&
    (head->next != entry && head != entry))
    return;

上方意思是不對空佇列或元素個數只有為1的情況進行處理。出自 commit 00e8a

* @entry: an entry within head, could be the head itself * and if so we won't cut the list

TODO: 對照閱讀 Linux 核心的 git log/blame,查閱程式碼註解和相關的編修紀錄。程式碼給你,就是希望來日你可以做出貢獻。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

  • git blame include/linux/lish.h
    在 commit 00e8a 找到 list_cut_position__list_cut_position 被新增進來,之後沒有任何編輯紀錄。
    ​​​​list.h: add list_cut_position()
    
    ​​​​This adds list_cut_position() which lets you cut a list into
    ​​​​two lists given a pivot in the list.
    
    ​​​​Signed-off-by: Luis R. Rodriguez <lrodriguez@atheros.com>
    ​​​​Signed-off-by: John W. Linville <linville@tuxdriver.com>
    

解題思路

get_middle

static list_ele_t *get_middle(struct list_head *list) { struct list_head *fast = list->next, *slow; list_for_each (slow, list) { if (fast->next == list || fast->next->next == list) break; fast = fast->next->next; } return list_entry(slow, list_ele_t, list); }

先將巨集 list_for_each (slow, list) 展開,看到這段巨集幫我們以 list->(next) 初始化 slow:

for (slow = (list)->next; slow != (list); slow = slow->next)

接下來只要定義迴圈的終止條件,返回 slow 的容器即可。

list_merge_sort

void list_merge_sort(queue_t *q) { if (list_is_singular(&q->list)) return; queue_t left; struct list_head sorted; INIT_LIST_HEAD(&left.list); list_cut_position(&left.list, &q->list, &get_middle(&q->list)->list); list_merge_sort(&left); list_merge_sort(q); list_merge(&left.list, &q->list, &sorted); INIT_LIST_HEAD(&q->list); list_splice_tail(&sorted, &q->list); }

搭配 queue_t 一起看

typedef struct {
    list_ele_t *head; // 應是串列頭
    list_ele_t *tail; // 串列的尾
    size_t size; // 實作O(1)查找串列大小?
    struct list_head list; // head
} queue_t;

考慮到 list_cut_position 最後一個參數接受指向 list_head 的指標,必須先取得get_middle 返回容器的 list 成員再傳入,而所有新宣告的 list_head 都透過 INIT_LIST_HEAD 進行初始化。

list_merge

static void list_merge(struct list_head *lhs,
                       struct list_head *rhs,
                       struct list_head *head)
{
    INIT_LIST_HEAD(head);
    if (list_empty(lhs)) {
        list_splice_tail(lhs, head);
        return;
    }
    if (list_empty(rhs)) {
        list_splice_tail(rhs, head);
        return;
    }

    while (!list_empty(lhs) && !list_empty(rhs)) {
        char *lv = list_entry(lhs->next, list_ele_t, list)->value;
        char *rv = list_entry(rhs->next, list_ele_t, list)->value;
        struct list_head *tmp = strcmp(lv, rv) <= 0 ? lhs->next : rhs->next;
        list_del(tmp);
        list_add_tail(tmp, head);
    }
    list_splice_tail(list_empty(lhs) ? rhs : lhs, head);
}

延伸問題:

  • 研讀 Linux 核心的 lib/list_sort.c 原始程式碼,學習 sysprog21/linux-list 的手法,將 lib/list_sort.c 的實作抽離為可單獨執行 (standalone) 的使用層級應用程式,並設計效能評比的程式碼,說明 Linux 核心的 lib/list_sort.c 最佳化手法
  • 將你在 quiz1 提出的改進實作,和 Linux 核心的 lib/list_sort.c 進行效能比較,需要提出涵蓋足夠廣泛的 benchmark

測驗2

解題思路

uint16_t N = 0x8000 // 1000 0000 0000 0000
N |= N >> 1; // 1100 0000 0000 0000
N |= N >> 2; // 1111 0000 0000 0000
N |= N >> 4; // 1111 1111 0000 0000
N |= N >> 8; // 1111 1111 1111 1111

假設 N0x8000,上述程式碼的作用在於將 MSB 以倍數的方式複製至後面每一個位元,至於 MSB 後面是否有非 0 bit 並不重要;若 N 的首個非 0 bit 不在 MSB 位置上,可能提早完成傳遞。作用同註解所說:將右邊所有位元設 1。

toastcheng 發現 uint16_t 因為 integer promotion 的關係實際 Bitwise shift 運算元是擴展成 int 處理,遇到 uint16_t 最大值 65535 能正確處理,但遇到 uint32_t 則有 overflow 的風險,同時提出以 __builtin_clz 實作出相容於 uint32_t 的版本。

延伸問題:

  • The Linux Kernel API 頁面搜尋 "power of 2",可見 is_power_of_2,查閱 Linux 核心原始程式碼,舉例說明其作用和考量,特別是 round up/down power of 2
    • 特別留意 __roundup_pow_of_two__rounddown_pow_of_two 的實作機制
  • 研讀 slab allocator,探索 Linux 核心的 slab 實作,找出運用 power-of-2 的程式碼和解讀

測驗3

void bitcpy(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 7; size_t read_rhs = 8 - read_lhs; const uint8_t *source = (const uint8_t *) _src + (_read / 8); size_t write_lhs = _write & 7; size_t write_rhs = 8 - write_lhs; uint8_t *dest = (uint8_t *) _dest + (_write / 8); static const uint8_t read_mask[] = { 0x00, /* == 0 00000000b */ 0x80, /* == 1 10000000b */ 0xC0, /* == 2 11000000b */ 0xE0, /* == 3 11100000b */ 0xF0, /* == 4 11110000b */ 0xF8, /* == 5 11111000b */ 0xFC, /* == 6 11111100b */ 0xFE, /* == 7 11111110b */ 0xFF /* == 8 11111111b */ }; static const uint8_t write_mask[] = { 0xFF, /* == 0 11111111b */ 0x7F, /* == 1 01111111b */ 0x3F, /* == 2 00111111b */ 0x1F, /* == 3 00011111b */ 0x0F, /* == 4 00001111b */ 0x07, /* == 5 00000111b */ 0x03, /* == 6 00000011b */ 0x01, /* == 7 00000001b */ 0x00 /* == 8 00000000b */ }; while (count > 0) { uint8_t data = *source++; size_t bitsize = (count > 8) ? 8 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (*source >> read_rhs); } if (bitsize < 8) data &= read_mask[bitsize]; uint8_t original = *dest; uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs); } else { // Since write_lhs + bitsize is never >= 8, no out-of-bound access. mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs); } count -= bitsize; } }

解題思路

以從 read_rhs 起始位置讀取 1 位元組的資料為例

  • bitsize=8
  • read_lhs=2
  • read_rhs=6
  • write_lhs=5
  • write_rhs=3

(Round 1) Read







%0






bit
bit



->bit





byte
byte



bit->byte





lhs/rhs
lhs/rhs



byte->lhs/rhs





byte0


7


6


5


4


3


2


1


0



byte1


7


6

5

4

3

2

1

0



byte2

7

6

5

4

3

2

1

0



t0

0

1

2

3

4

5

6

7



t0:head->byte0:port0





t1

8

9

10

11

12

13

14

15



t1:head->byte1:port0





t2

16

17

18

19

20

21

22

23



t2:head->byte2:port0





bitsize
bitsize



bitsize->byte0:port2





bitsize->byte0:port6





bitsize->byte1:port1





read_lhs

read_lhs



read_lhs->t0:s




(Round 1) Mask data







%0



       data
       data



       mask
       mask



       data->       mask





masked data
masked data



       mask->masked data





byte0


5


4


3


2


1


0


7


6



mask

1

1

1

1

1

1

1

1



byte1


5


4


3


2


1


0


7


6



(Round 1) Write







%0






bit
bit



->bit





byte
byte



bit->byte





lhs/rhs
lhs/rhs



byte->lhs/rhs





byte0


7


6


5


4


3


5


4


3



byte1


2


1


0


7


6

2

1

0



byte2

7

6

5

4

3

2

1

0



t0

0

1

2

3

4

5

6

7



t0:head->byte0:port0





t1

8

9

10

11

12

13

14

15



t1:head->byte1:port0





t2

16

17

18

19

20

21

22

23



t2:head->byte2:port0





bitsize
bitsize



bitsize->byte0:port5





bitsize->byte1:port4





bitsize->byte1:port0





write_lhs

write_lhs



write_lhs->t0:s




(Round 2) Read







%0






bit
bit



->bit





byte
byte



bit->byte





lhs/rhs
lhs/rhs



byte->lhs/rhs





byte0


7


6


5


4


3


2


1


0



byte1


7


6


5


4


3


2


1


0



byte2


7


6

5

4

3

2

1

0



t0

0

1

2

3

4

5

6

7



t0:head->byte0:port0





t1

8

9

10

11

12

13

14

15



t1:head->byte1:port0





t2

16

17

18

19

20

21

22

23



t2:head->byte2:port0





bitsize
bitsize



bitsize->byte1:port2





bitsize->byte1:port6





bitsize->byte2:port1





read_lhs

read_lhs



read_lhs->t1:s





考慮變數帶有以下後綴的意義

  • _lhs:表示寫入/讀取位元從起始位置,從 MSB 開始
  • _rhs:表示從 _lhs 開始最多能寫入/讀取的位元數。例如 _lhs=2, _rhs=6 表示最多能寫入/讀取 6 bits。

設計上,bitcpy() 儘可能的以1位元組作為寫入/讀取單位以節省操作次數

size_t bitsize = (count > 8) ? 8 : count;

而在讀取/寫入指定的 count 位元數前,每一次分為讀取與寫入兩步驟,先看寫入:

uint8_t data = *source++;
size_t bitsize = (count > 8) ? 8 : count;
// 從 source 讀取的資料往左位移 `read_lhs`,取得所需位元
if (read_lhs > 0) {
    data <<= read_lhs;
    // 在位移後,有空間塞入其他資料
    // 這時判斷 bitsize 是否超過能讀取的位元數 read_rhs ,需跨到另一個位元組
    // 如果是,很貪婪的一次拿 (8-read_rhs) 位元
    if (bitsize > read_rhs)
        data |= (*source >> read_rhs);
}

// 將 bitsize 後無效位元寫0,欲與其他資料進行 OR 運算結合
if (bitsize < 8)
    data &= read_mask[bitsize];

我們取得 0xBEEF0000 類型向左對齊的資料後再看到寫入,分做兩種情況:

  1. bitsize <= write_rhs: 不需跨兩位元組寫入
  2. bitsize > write_rhs: 需跨兩位元組寫入

第 1 種情況,不需跨兩位元組寫入,只需在一位元組內將舊資料 original 與新資料 data 結合

*dest++ = (original & mask) | (data >> write_lhs);

mask 是中間挖空的遮罩,預期組成為

1.......1 | 0.....0 | 1.................1
write_lhs   bitsize   8-bitsize-write_lhs

考慮 bitsize+write_lhs 的各種情況:

8-bitsize-write_lhs bitsize+write_lhs expected mask
8 0 0b11111111 (invalid, bitsize 不能為0)
7 1 0b01111111
6 2 0b00111111
5 3 0b00011111
4 4 0b00001111
3 5 0b00000111
2 6 0b00000011
1 7 0b00000001
0 8 0b00000000
遮罩 mask 前面的部份由 read_mask[write_lhs],後者恰好由 write_mask[write_lhs + bitsize] 所對應,因此無跨位元寫入程式碼為:
uint8_t mask = read_mask[write_lhs];
mask |= write_mask[write_lhs + bitsize]; *dest++ = (original & mask) | (data >> write_lhs);

第2種情況,跨位元組寫入,情況稍微複雜,但判斷機制與讀取一樣:

bitsize > write_rhs

以圖示說明兩次寫入的 BYTE 1, BYTE 2 的預期遮罩 mask1, mask2

          ⌜            bitsize              ⌝
1.......1 | 0........0 | 0.................0 | 1...................1 |
write_lhs   writh_rhs  | bitsize-writh_rhs     8-(bitsize-writh_rhs) |

 BYTE 1                | BYTE 2

同樣考慮 bitsize-writh_rhs 的各種情況,恰好對應到表 write_mask

8-(bitsize-writh_rhs) bitsize-write_rhs expected mask
8 0 0b11111111 (invalid,
bitsizewrite_rhs>0
)
7 1 0b01111111
6 2 0b00111111
5 3 0b00011111
4 4 0b00001111
3 5 0b00000111
2 6 0b00000011
1 7 0b00000001
0 8 0b00000000 (invalid,
bitsize8
)

因此

  • mask1 = read_mask[write_lhs]
  • mask2 = write_mask[bitsize - write_rhs]

跨位元寫入程式碼為所對應的程式碼為:

uint8_t mask = read_mask[write_lhs]; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = (original & mask) | (data >> write_lhs); original = *dest & write_mask[bitsize - write_rhs]; *dest = original | (data << write_rhs);

總結 read_maskwrite_mask 的意義:

  • read_mask:用來過濾真實的資料。在寫入也會使用到,因為寫入時也是需要將資料讀出來進行合併。
  • write_mask:在指定位元組上謄出空間給資料寫入。

雖然能描述上述程式碼,但不認為能在短時間寫出。

改良 bitcpy

使用 perf -F max 以最大取樣頻率分析函式內部熱點,而為了採樣更多的數據分析,放大原本的input, output, i, j, k 為32倍,此時 perf.data 來到100M。Intel 和 AT&T 語法尚有些微差異perf report 時使用 -M intel 更好閱讀。

sudo perf record -F max ./main
sudo perf report -M intel

發現函式內部熱點大多集中在 mov,而 mov 的 register operand 最大可為 64bit,既然如此,實作 64-bit 的 bitcpy 對效能應有所改善。

  1.09 │       lea     rcx,[rip+0xd6e]
  1.71 │       movzx   eax,BYTE PTR [rax+rcx*1]
  0.78 │       and     eax,edx
  5.07 │       mov     BYTE PTR [rbp-0x3a],al
       │     *dest = original | (data << write_rhs);
  0.27 │       movzx   eax,BYTE PTR [rbp-0x3b]
  0.07 │       mov     rdx,QWORD PTR [rbp-0x10]
  0.00 │       mov     ecx,edx
  3.19 │       shl     eax,cl
  0.02 │       mov     edx,eax
  3.13 │       movzx   eax,BYTE PTR [rbp-0x3a]
  2.34 │       or      eax,edx
  0.07 │       mov     edx,eax
  0.00 │       mov     rax,QWORD PTR [rbp-0x30]
  6.00 │       mov     BYTE PTR [rax],dl
  0.10 │     ↓ jmp     1ca

以下為 64-bit 版的 bitcpy

#define __ALIGN_KERNEL(x, a) __ALIGN_KERNEL_MASK(x, (__typeof__(x))(a)-1) #define __ALIGN_KERNEL_MASK(x, mask) (((x) + (mask)) & ~(mask)) #define ALIGN_DOWN(x, a) __ALIGN_KERNEL((x) - ((a)-1), (a)) #define reverse_byte(n) \ __extension__({ \ __typeof__(n + 0) _n = (n); \ _n = ((_n & 0xffffffff00000000) >> 32) | ((_n & 0x00000000ffffffff) << 32); \ _n = ((_n & 0xffff0000ffff0000) >> 16) | ((_n & 0x0000ffff0000ffff) << 16); \ _n = ((_n & 0xff00ff00ff00ff00) >> 8) | ((_n & 0x00ff00ff00ff00ff) << 8); \ }) void bitcpy64(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 63; size_t read_rhs = 64 - read_lhs; const uint64_t *source = (const uint64_t *)_src + (_read / 64); size_t write_lhs = _write & 63; size_t write_rhs = 64 - write_lhs; uint64_t *dest = (uint64_t *)_dest + (_write / 64); while (count > 0) { /* Downgrade 64-bit version to 8-bit version */ if (count < 64) { size_t _read_lhs = ALIGN_DOWN(read_lhs, 8); size_t _write_lhs = ALIGN_DOWN(write_lhs, 8); bitcpy(((uint8_t *)dest) + _write_lhs / 8, write_lhs - _write_lhs, ((uint8_t *)source) + _read_lhs / 8, read_lhs - _read_lhs, count); return; } uint64_t data = reverse_byte(*source++); size_t bitsize = (count > 64) ? 64 : count; if (read_lhs > 0) { data <<= read_lhs; if (bitsize > read_rhs) data |= (reverse_byte(*source) >> read_rhs); } if (bitsize < 64) data &= (int64_t)(1ULL << 63) >> (bitsize - 1); uint64_t original = reverse_byte(*dest); uint64_t mask = write_lhs ? ((int64_t)(1ULL << 63) >> (write_lhs - 1)) : 0; if (bitsize > write_rhs) { /* Cross multiple bytes */ *dest++ = reverse_byte((original & mask) | (data >> write_lhs)); original = reverse_byte(*dest) & ~((int64_t)(1ULL << 63) >> (bitsize - write_rhs - 1)); *dest = reverse_byte(original | (data << write_rhs)); } else { // Since write_lhs + bitsize is never >= 64, no out-of-bound access. mask |= ~((int64_t)(1ULL << 63) >> (write_lhs + bitsize - 1)); *dest++ = reverse_byte((original & mask) | (data >> write_lhs)); } count -= bitsize; } }

上面程式碼假定使用者會在呼叫函式前判斷寫入/讀取處理是否越界。

64位元版本的 bitcpy 與原先版本差在 buffer 從 8-bit 變化至 64-bit,這裡用 32-bit buffer 進行說明,於原先的記憶體配置下得到的資料會是:

            0xAA             0xBB              0xCC             0xDD
bits  7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 
            Byte0           Byte1
input 0 1 2 3 4 5 6 7   8 9 ...

                                      ||
                                      \/

uint32_t data = *source++;

            0xDD             0xCC              0xBB             0xAA
      7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 | 7 6 5 4 3 2 1 0 
bits  32 31 30 ...                                             .....  2 1 0
      H                                                                   L

原先 <<, >>, |, & 之類的操作因為 Little Endian byte ordering 的關係無法使用,因此讀取需要用 reverse_byte 將位元順序翻轉再進行操作,寫入時再翻轉回來,有點像是線性轉換/反轉換。

  • +0技巧是為了去掉 const qualifier。

    ​​​​__typeof__(n + 0) _n = (n);
  • 一開始擔心 & 在一邊是 unsigned 另一邊是 signed 型態轉換上會有問題,在規格書看到會先做 Usual arithmetic conversions,往下找到對應解釋

    Otherwise, if the operand that has unsigned integer type has rank greater or equal to the rank of the type of the other operand, then the operand with signed integer type is converted to the type of the operand with unsigned integer type.

    放心使用

    ​​​​data &= (int64_t)(1ULL << 63) >> (bitsize - 1);

幫你補充,根據 C99 standard 6.5.7.5

The result of E1 >> E2 is E1 right-shifted E2 bit positions. If E1 has an unsigned type
or if E1 has a signed type and a nonnegative value, the value of the result is the integral
part of the quotient of E1 / 2E2. If E1 has a signed type and a negative value, the
resulting value is implementation-defined.

因此 gcc documentation 有明確定義:

Bitwise operators act on the representation of the value including both the sign and value bits, where the sign bit is considered immediately above the highest-value value bit. Signed ‘>>’ acts on negative numbers by sign extension.

當然不能說你錯,但建議可以改寫避免此用法。

BakudYao

參考 bakudr18 共筆改寫為可讀性較高的用法:

#define READMASK(x) ((uint64_t)(~0ULL) << (64 - (x)))
#define WRITEMASK(x) (~READ_MASK(x))

然而 READ_MASK(0) 有 undefined behavior 不得已加上分支

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

#define READMASK(x) (__builtin_expect(!!(x), 1) ? ((uint64_t)(~0ULL) << (64 - (x))) : 0ULL)
#define WRITEMASK(x) (~READMASK(x))
  • count<64 使用原先版本進行複製,地址、位移需要重新計算,借用到 kernel.hALIGN_DOWN
    ​​​​/* Downgrade 64-bit version to 8-bit version */ ​​​​if (count < 64) ​​​​{ ​​​​ size_t _read_lhs = ALIGN_DOWN(read_lhs, 8); ​​​​ size_t _write_lhs = ALIGN_DOWN(write_lhs, 8); ​​​​ bitcpy(((uint8_t *)dest) + _write_lhs / 8, ​​​​ write_lhs - _write_lhs, ​​​​ ((uint8_t *)source) + _read_lhs / 8, ​​​​ read_lhs - _read_lhs, ​​​​ count); ​​​​ return; ​​​​}

觀察複製 0~4kB 的效能差距,每次測試讀取/寫入偏移量涵蓋 0~63,64-bit 版約省下一半以上的時間:


code

看起來很有機會加入減少 branch 的作法獲得不錯的效能提升!也許你可以試試,細節可參考 bakudr18 共筆

BakudYao

一開始有想過減少分支數量,但 Mispredicted branch 比重太低,而且編譯器 -O3 開下去有時效果更好,會感覺又在做白工。

gcc -g -O0 -o main main.c
sudo perf record -b ./main
sudo perf report --sort mispredict --symbols bitcpy,bitcpy64,bitcpy64_likely
Samples: 12M of event 'cycles', Event count (approx.): 12608064
Overhead  Source Symbol                                   Target Symbol                                   Branch Mispredicted
   0.01%  [.] main                                        [.] main                                        Y
   0.01%  [.] __memset_avx2_unaligned_erms                [.] __memset_avx2_unaligned_erms                Y
   0.01%  [.] bitcpy64_likely                             [.] bitcpy64_likely                             Y
   0.00%  [.] bitcpy64                                    [.] bitcpy64                                    Y
   0.00%  [.] prepare_exit_to_usermode                    [.] swapgs_restore_regs_and_return_to_usermode  N
   0.00%  [.] swapgs_restore_regs_and_return_to_usermode  [.] swapgs_restore_regs_and_return_to_usermode  N
   0.00%  [.] swapgs_restore_regs_and_return_to_usermode  [.] native_iret                                 N
   0.00%  [.] note_gp_changes                             [.] rcu_core                                    N
   0.00%  [.] native_irq_return_iret                      [.] bitcpy64_likely                             N
   0.00%  [.] native_irq_return_iret                      [.] bitcpy64                                    N
   0.00%  [.] bitcpy64                                    [.] irq_entries_start                           N
   0.00%  [.] __memset_avx2_erms                          [.] main                                        Y 
   0.00%  [.] irq_exit                                    [.] smp_apic_timer_interrupt                    Y

bitcpy64_likely 偏好跨 8 位元組

​​​uint64_t original = reverse_byte(*dest);
​​​uint64_t mask = READ_MASK(write_lhs);
​​​if (__builtin_expect(!!(bitsize > write_rhs), 1))

雖然如此,還是仿照 bakudr18/quiz2 實作消除 branch-prediction 的版本

#define READMASK(x) (__builtin_expect(!!(x), 1) ? ((uint64_t)(~0ULL) << (64 - (x))) : 0ULL) #define WRITEMASK(x) (~READMASK(x)) void bitcpy64_branch_predict(void *_dest, /* Address of the buffer to write to */ size_t _write, /* Bit offset to start writing to */ const void *_src, /* Address of the buffer to read from */ size_t _read, /* Bit offset to start reading from */ size_t count) { size_t read_lhs = _read & 63; size_t read_rhs = 64 - read_lhs; const uint64_t *source = (const uint64_t *)_src + (_read / 64); size_t write_lhs = _write & 63; size_t write_rhs = 64 - write_lhs; uint64_t *dest = (uint64_t *)_dest + (_write / 64); uint64_t data, original; /* copy until count < 64 bits */ for (size_t bytecount = count >> 6; bytecount > 0; bytecount--) { data = reverse_byte(*source++); data = data << read_lhs | (reverse_byte(*source) >> read_rhs); original = reverse_byte(*dest) & READMASK(write_lhs); *dest++ = reverse_byte(original | (data >> write_lhs)); *dest = reverse_byte((reverse_byte(*dest) >> write_lhs) | (data << write_rhs)); } count &= 63; /* copy the remaining count */ data = reverse_byte(*source++); data = ((data << read_lhs) | (reverse_byte(*source) >> read_rhs)) & READMASK(count); original = (reverse_byte(*dest) & READMASK(write_lhs)); *dest++ = reverse_byte(original | ((data & READMASK(count)) >> write_lhs)); if (count > write_rhs) *dest = reverse_byte((reverse_byte(*dest) & WRITEMASK(count - write_rhs)) | (data << write_rhs)); }

測量 0~16 kbits 執行100次的效能

  • -O0
  • -O3

發現消弭分支帶來的效益不大。其實以 -O0 重現 bakudr18/quiz2 的實驗也與原本實驗結果有落差,branch misses 並效能差距的主要原因。

sudo perf record -b ./bench_bitcpy
sudo perf report --sort \
  symbol_from,symbol_to,mispredict --symbols bitcpy,bitcpy_branch_predict
Samples: 15K of event 'cycles', Event count (approx.): 15808
Overhead  Source Symbol              Target Symbol              Branch Mispredicted  IPC   [IPC Coverage]
  29.07%  [.] bitcpy                 [.] bitcpy                 N                    2.06  [ 90.8%]
  19.46%  [.] bitcpy_branch_predict  [.] bitcpy_branch_predict  N                    1.02  [ 96.4%]
   0.03%  [.] bitcpy                 [.] bitcpy                 Y                    2.06  [ 90.8%] <=====

延伸問題:

  • 在 Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境

測驗4

延伸問題:

  • 解釋上述程式碼運作原理
  • 上述程式碼使用到 gcc atomics builtins,請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案
  • chriso/intern 是個更好的 string interning 實作,請探討其特性和設計技巧,並透過內建的 benchmark 分析其效能表現

1. cstr 資料結構

  • __cstr_data

    ​​​​enum {
    ​​​​    CSTR_PERMANENT = 1,
    ​​​​    CSTR_INTERNING = 2,
    ​​​​    CSTR_ONSTACK = 4,
    ​​​​};
    
    ​​​​typedef struct __cstr_data {
    ​​​​    char *cstr;
    ​​​​    uint32_t hash_size;
    ​​​​    uint16_t type;
    ​​​​    uint16_t ref;
    ​​​​} * cstring;
    
    • cstr: 字串。
    • type: 表示字串 cstr 所處位置,有 HEAP 與 STACK (CSTR_ONSTACK)。
    • hash_size: 紀錄字串長度,之後的 hash table 使用此值計算 hash,應改用 length 為名較為合適。hash_size = 0 為特殊狀況(??)。
    • ref: 字串位於 heap 時,用來紀錄 refcount,從非 0 轉變成 0 時需要要釋放記憶體。
  • __cstr_buffer

    ​​​​typedef struct __cstr_buffer {
    ​​​​    cstring str;
    ​​​​} cstr_buffer[1];
    
    ​​​​#define CSTR_BUFFER(var)                                                      \
    ​​​​    char var##_cstring[CSTR_STACK_SIZE] = {0};                                \
    ​​​​    struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \
    ​​​​    cstr_buffer var;                                                          \
    ​​​​    var->str = &var##_cstr_data;
    
    ​​​​// cstr_buffer sb => __cstr_buffer *sb
    ​​​​cstring cstr_cat(cstr_buffer sb, const char *str){ 
    ​​​​    cstring s = sb->str;
    ​​​​    ...
    ​​​​    sb->str = cstr_cat2(tmp->cstr, str);
    ​​​​    cstr_release(tmp);
    ​​​​    return sb->str;
    ​​​​}
    
    

    cstr_buffer var 經由 CSTR_BUFFER(var) 創立後,透過對以下操作對 __cstr_buffer 內的 str 進行修改

    ​​​​CSTR_BUFFER(var);
    ​​​​var[0].str = NULL;
    ​​​​var->str = NULL;
    

    或是透過 cstr_cat()。由於轉型成指標的關係,函式內部透過 sb->str 存取 str

    ​​​​CSTR_BUFFER(a); // ON_STACK
    ​​​​cstr_cat(a, "Hello ");
    

    __cstr_buffer 也能創立在 heap。

    ​​​​cstr_buffer *b = (__cstr_buffer *sb) calloc(sizeof(__cstr_buffer));
    ​​​​cstr_cat(b, "Hello ");
    

    __cstr_buffer 建立在 stack、cstring str 使用 struct __cstr_buffer 包住的動機不明。

2. 從 cstr_cat 切入

cstring cstr_cat(cstr_buffer sb, const char *str)
{
    cstring s = sb->str;
    if (s->type == CSTR_ONSTACK) {
        int i = s->hash_size;
        while (i < CSTR_STACK_SIZE - 1) {
            s->cstr[i] = *str;
            if (*str == 0)
                return s;
            ++s->hash_size;
            ++str;
            ++i;
        }
        s->cstr[i] = 0;
    }
    cstring tmp = s;
    sb->str = cstr_cat2(tmp->cstr, str);
    cstr_release(tmp);
    return sb->str;
}

以上功能為

  1. s 指向的 __cstr_data 位在 stack 上(由CSTR_BUFFER創建的結果),儘可能附加 str 到 stack 上的 cstr 後,含 \0 最多 CSTR_STACK_SIZE 字元。若不足 CSTR_STACK_SIZE 則返回 cstr
  2. 接著不管 __cstr_data 是在 heap 或 stack,透過 cstr_cat2 結合新舊 cstr 取得/生成新的 __cstr_data,釋放舊的 __cstr_data 物件。

往下看 cstr_cat2

static cstring cstr_cat2(const char *a, const char *b)
{
    size_t sa = strlen(a), sb = strlen(b);
    if (sa + sb < CSTR_INTERNING_SIZE) {
        char tmp[CSTR_INTERNING_SIZE];
        memcpy(tmp, a, sa);
        memcpy(tmp + sa, b, sb);
        tmp[sa + sb] = 0;
        return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb));
    }
    cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1);
    if (!p)
        return NULL;

    char *ptr = (char *) (p + 1);
    p->cstr = ptr;
    p->type = 0;
    p->ref = 1;
    memcpy(ptr, a, sa);
    memcpy(ptr + sa, b, sb);
    ptr[sa + sb] = 0;
    p->hash_size = 0;
    return p;
}
  • cstr_cat2() 最終需返回指向新 __cstr_datacstring
    • 若輸入字串 ab 總長低於 CSTR_INTERNING_SIZE,返回的是指向 interning table 內元素(__cstr_data)的 cstring
      • 使用 hash_blob(tmp, sa + sb) 作為 hash value
    • 其他情況下將創建字串 cstr 附加在 __cstr_data
      • type = 0
      • hash_size = 0
      • ref = 1

cstr_interning() 作為整個程式核心,負責維護 interned string table (struct __cstr_node **hash)。

__cstr_ctx 有幾項作用

  • 指向 table
  • table 的 lock
  • 分別透過 total、size 紀錄元素數量、容量,容量不足時需透過 expand() 倍增容量
  • 指向存放多個 __cstr_nodepool
  • 紀錄 pool 內元素大小的 index

依據 ISO/IEC 9899:1999 (E) 6.7.8__cstr_ctx 的成員被初始化為 0 或 null pointer。

If an object that has automatic storage duration is not initialized explicitly, its value is indeterminate. If an object that has static storage duration is not initialized explicitly, then:
— if it has pointer type, it is initialized to a null pointer;
— if it has arithmetic type, it is initialized to (positive or unsigned) zero;
— if it is an aggregate, every member is initialized (recursively) according to these rules;
— if it is a union, the first named member is initialized (recursively) according to these
rules.

struct __cstr_interning {
    int lock;
    int index;
    unsigned size;
    unsigned total;
    struct __cstr_node **hash;
    struct __cstr_pool *pool;
};

static struct __cstr_interning __cstr_ctx;

static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash)
{
    cstring ret;
    CSTR_LOCK();
    ret = interning(&__cstr_ctx, cstr, sz, hash);
    if (!ret) {
        expand(&__cstr_ctx);
        ret = interning(&__cstr_ctx, cstr, sz, hash);
    }
    ++__cstr_ctx.total;
    CSTR_UNLOCK();
    return ret;
}

interned string table 是一張使用 Separate chaining 處理碰撞的 hash table,

由於 __cstr_data 自身無法連結其他資料,須使用 __cstr_node 包裝。

struct __cstr_node {
    char buffer[CSTR_INTERNING_SIZE]; // 暫時無用處
    struct __cstr_data str;
    struct __cstr_node *next;
};

在 table 內新增 entry 只能透過 interning(),以傳遞進的 hash 作為 index,__cstr_interning.hash 上儲存的無疑是指向一連串連續 __cstr_node* 起始位置的指標。

  1. 先是在對應的 list 上尋找有無字串相符的 __cstr_data
    ​​​​int index = (int) (hash & (si->size - 1));
    ​​​​struct __cstr_node *n = si->hash[index];
    ​​​​while (n) {
    ​​​​    if (n->str.hash_size == hash) {
    ​​​​        if (!strcmp(n->str.cstr, cstr))
    ​​​​            return &n->str;
    ​​​​    }
    ​​​​    n = n->next;
    ​​​​}
    
  2. 若無,檢查 capacity
    ​​​​// 80% (4/5) threshold
    ​​​​if (si->total * 5 >= si->size * 4)
    ​​​​    return NULL;
    
    ​​​​if (!si->pool) {
    ​​​​    si->pool = xalloc(sizeof(struct __cstr_pool));
    ​​​​    si->index = 0;
    ​​​​}
    
  3. 在 heap 上的 pool 新建 __cstr_node,這樣做的好處是這些新建的 __cstr_node 都使用相鄰的記憶體位置,甚至連 __cstr_node.str.cstr 的空間也不分配,直接指向 __cstr_node.buffer[]
    ​​​​/* cstr.c */
    ​​​​struct __cstr_pool {
    ​​​​    struct __cstr_node node[INTERNING_POOL_SIZE];
    ​​​​};
    
    ​​​​n = &si->pool->node[si->index++]; /* 沒有檢查 index < INTERNING_POOL_SIZE */
    ​​​​memcpy(n->buffer, cstr, sz);
    ​​​​n->buffer[sz] = 0;
    
    ​​​​cstring cs = &n->str;
    ​​​​cs->cstr = n->buffer;
    ​​​​cs->hash_size = hash;
    ​​​​cs->type = CSTR_INTERNING; /* important */
    ​​​​cs->ref = 0;
    
    ​​​​n->next = si->hash[index];
    ​​​​si->hash[index] = n;
    
    示意圖如下
    
    
    
    
    
    
    %0
    
    
    
    __cstr_interning
    
    __cstr_interning
    
    lock
    
    index
    
    size
    
    total
    
    hash
    
    pool
    
    
    
    __cstr_pool
    
    __cstr_pool
    
    node[0]
    
    node[1]
    
    buffer
    
    str
    
    next
    
    node[2]
    
    node[3]
    
    node[4]
    
    ...
    
    
    
    __cstr_interning:pool->__cstr_pool
    
    
    
    
    
    hash
    
    0
    
    1
    
    2
    
    3
    
    4
    
    5
    
    ...
    
    size-1
    
    
    
    __cstr_interning:hash->hash:head
    
    
    
    
    
    __cstr_pool:next->__cstr_pool:n2
    
    
    
    
    
    __cstr_pool:n2->__cstr_pool:n4
    
    
    
    
    
    hash:2->__cstr_pool:n1
    
    
    
    
    
    

3. CSTR_LITERAL

用法

CSTR_LITERAL(hello, "Hello string");

產生的 __cstr_data 可能具有最後一種不會被 cstr_release 影響到的型態:CSTR_PERMANENT,具體行為如下:

  1. cstr_clone() 嘗試將 "Hello string" 放到 string interned table 中,或仿照 cstr_cat2 複製到 heap 上
    ​​​​void *ptr = (void *) (p + 1);
    ​​​​p->cstr = ptr;
    ​​​​p->type = 0;
    ​​​​p->ref = 1;
    ​​​​memcpy(ptr, cstr, sz);
    ​​​​((char *) ptr)[sz] = 0;
    ​​​​p->hash_size = 0;
    
  2. ​​​​tmp->type = CSTR_PERMANENT;
    ​​​​tmp->ref = 0;   
    
#define CSTR_LITERAL(var, cstr)                                               \
    static cstring var = NULL;                                                \
    if (!var) {                                                               \
        cstring tmp = cstr_clone("" cstr, (sizeof(cstr) / sizeof(char)) - 1); \
        if (tmp->type == 0) {                                                 \
            tmp->type = CSTR_PERMANENT;                                       \
            tmp->ref = 0;                                                     \
        }                                                                     \
        if (!__sync_bool_compare_and_swap(&var, NULL, tmp)) {                 \
            if (tmp->type == CSTR_PERMANENT)                                  \
                free(tmp);                                                    \
        }                                                                     \
    }

無法想像到 __sync_bool_compare_and_swap 失敗的情境

4. cstr_release

From GCC - Atomic-Builtins

type __sync_sub_and_fetch (type *ptr, type value, ...)

These builtins perform the operation suggested by the name, and return the new value. That is,
          { *ptr op= value; return *ptr; }
enum {
    CSTR_PERMANENT = 1,
    CSTR_INTERNING = 2, // OK
    CSTR_ONSTACK = 4, // OK
};

void cstr_release(cstring s)
{
    if (s->type || !s->ref)
        return;
    if (__sync_sub_and_fetch(&s->ref, 1) == 0)
        free(s);
}

將只有位在 heap 的 __cstr_data 且型態非

  • CSTR_PERMANENT: 不於 table 又在 heap 上,由使用者自行維護
  • CSTR_INTERNING: table 內物件

進行 refcnt - 1 並釋放,atomic operation 確保一個 refcnt 不會同時進行操作。

5. cstr_grab

cstring cstr_grab(cstring s) { if (s->type & (CSTR_PERMANENT | CSTR_INTERNING)) return s; if (s->type == CSTR_ONSTACK) return cstr_clone(s->cstr, s->hash_size); if (s->ref == 0) s->type = CSTR_PERMANENT; else __sync_add_and_fetch(&s->ref, 1); return s; }

整份程式也只有呼叫過一次,不難看出為了在離開函式也能使用 __cstr_data,stack 上的物件必須複製一份 (行 7)。之後符合 s->ref == 0 的情況 (行 8),不可能發生。

/* main.c */
static cstring cmp(cstring t)
{
    CSTR_LITERAL(hello, "Hello string");
    CSTR_BUFFER(ret);
    ...
    return cstr_grab(CSTR_S(ret));
}

cstring b = cmp(CSTR_S(a));

6. cstr_equal

int cstr_equal(cstring a, cstring b) { if (a == b) return 1; if ((a->type == CSTR_INTERNING) && (b->type == CSTR_INTERNING)) return 0; if ((a->type == CSTR_ONSTACK) && (b->type == CSTR_ONSTACK)) { if (a->hash_size != b->hash_size) return 0; return memcmp(a->cstr, b->cstr, a->hash_size) == 0; } uint32_t hasha = cstr_hash(a); uint32_t hashb = cstr_hash(b); if (hasha != hashb) return 0; return !strcmp(a->cstr, b->cstr); } static size_t cstr_hash(cstring s) { if (s->type == CSTR_ONSTACK) return hash_blob(s->cstr, s->hash_size); if (s->hash_size == 0) s->hash_size = hash_blob(s->cstr, strlen(s->cstr)); return s->hash_size; }

值得注意的是行 12~16 的邏輯,比完 hash value 才去比較字串 cstr,拿去計算 hash value 的都是用 cstrstrlen(cstr),像 CSTR_INTERNINGCSTR_PERMANENT 這種型態內 hash_size 無實質意義,就得用 strlen() 重新取得長度。


比對兩 cstring 是否相等,得依其順序進行比較

  1. type
  2. hash_size
  3. memcmp

型態和 hash_size 之間的關係:

type memory location ref hash_size
CSTR_ONSTACK stack 0 string length
CSTR_INTERNING heap 0 hash_blob(cstr, strlen(cstr))
CSTR_PERMANENT heap 0 0
0 (default) heap >=1 0

7. 多執行緒

8. array-to-pointer decay

array-to-pointer decay 或是 array decay 指的是陣列型態退化成指標,失去原先陣列所帶有的資訊,例如陣列大小。出現在以下情境:

  1. function declarators
  2. 發生在表達式的 implicit conversion

關於第 1 點在 6.7.5.3 Function declarators 能找到相關敘述,或是參照 Daniel Chen 筆記。第 2 點常發生在 assignment,而 assignment 左式必須為 modifiable lvalue,ISO/IEC 1999:9899 - 6.3.2.1 這樣定義:

When an object is said to have a particular type, the type is specified by the lvalue used to designate the object. A modifiable lvalue is an lvalue that does not have array type, does not have an incomplete type, does not have a const-qualified type, and if it is a structure or union, does not have any member (including, recursively, any member or element of all contained aggregates or unions) with a const-qualified type.

這樣的限制,使下方程式無法編譯成功

int a[3], b[3];
a = b; // error: assignment to expression with array type

隨後條目指出

Except when it is the operand of the sizeof operator or the unary & operator, or is a string literal used to initialize an array, an expression that has type ‘‘array of type’’ is converted to an expression with type ‘‘pointer to type’’ that points to the initial element of the array object and is not an lvalue. If the array object has register storage class, the behavior is undefined.

因此 assignment 右方具有陣列型態的表達式將被轉型

int a[3], b[3][4], *p, (*q)[4];
p = a; // conversion to &a[0]
q = b; // conversion to &b[0]

關於 lvalue 的定義一般認知是 locator value,但規格書對 lvalue 的定義看起來很廣:

An lvalue is an expression with an object type or an incomplete type other than void; if an lvalue does not designate an object when it is evaluated, the behavior is undefined.

type
Objects, functions, and expressions have a property called type, which determines the interpretation of the binary value stored in an object or evaluated by the expression.

9. 與 chriso/intern 比較

為了讓測試進行下去,先套用 iLeafy11 提到的 __cstr_ctx.total 誤植問題

static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash)
{
    ...
-   ++__cstr_ctx.total;
    ...
}
static cstring interning(struct __cstr_interning *si,
                         const char *cstr,
                         size_t sz,
                         uint32_t hash)
{
    ...
+   ++__cstr_ctx.total;
}

另一個問題是 void *ptr = (void *) (p + 1); 使用 gcc -Wpedantic -Wall -std=c99 編譯會出現警告。嚴格說,在 void* 上進行算術運算是非法操作,但 gcc 實作允許。

main.c:4:20: warning: pointer of type ‘void *’ used in arithmetic [-Wpointer-arith]
4 |   printf("%p\n", a + 1);

Reference

  • 用法

    ​​​​// 使用上得先創立 `strings`
    ​​​​char buffer[12] = "Hello world";
    ​​​​struct strings *strings = strings_new();
    
    ​​​​// 接著向 interned table 塞入字串,取得從 1 遞增的 id
    ​​​​uint32_t id = strings_intern(strings, buffer);
    
    ​​​​// 以 id 查詢字串
    ​​​​const char* str = strings_lookup_id(strings, id);
    
    ​​​​printf("%s\n", str);
    
  • 仿造 chriso/internbenchmark 量測插入 1~5000000 共 5M 個字串到 interned table 平均所需要的額外開銷

    • chriso/intern
      • 修改
        ​​​​​​​​​​​​/* benchmark.c */
        ​​​​​​​​​​​​- char buffer[12] = {'x'};
        ​​​​​​​​​​​​+ char buffer[12] = {0};
        ​​​​​​​​​​​​uint32_t count = 5000000;
        ​​​​​​​​​​​​size_t string_bytes = 0;
        
        ​​​​​​​​​​​​for (uint32_t id = 1; id <= count; id++) {
        ​​​​​​​​​​​​-    unsigned_string(buffer + 1, id);
        ​​​​​​​​​​​​+    unsigned_string(buffer, id);
        ​​​​​​​​​​​​    string_bytes += strlen(buffer);
        ​​​​​​​​​​​​    assert(strings_intern(strings, buffer) == id);
        ​​​​​​​​​​​​}
        
      • 實驗結果
        ​​​​​​​​​​​​Interned 5M unique strings
        ​​​​​​​​​​​​Overhead per string: 44.4 bytes
        
    • cstr
      • 為了容納 5000000 個字串,配置足夠大小的 pool
        ​​​​​​​​​​​​#define INTERNING_POOL_SIZE 1<<23
        
      • 新增 strings_allocated_bytes() 量測 __cstr_interning 佔用空間
        ​​​​​​​​​​​​size_t strings_allocated_bytes()
        ​​​​​​​​​​​​{
        ​​​​​​​​​​​​    size_t s_pool = 0, s_table = 0, s_ctx = 0;
        ​​​​​​​​​​​​    CSTR_LOCK();
        ​​​​​​​​​​​​    if (__cstr_ctx.pool){
        ​​​​​​​​​​​​        s_pool = sizeof(*__cstr_ctx.pool);
        ​​​​​​​​​​​​        printf("pool: %ld bytes\n", s_pool);
        ​​​​​​​​​​​​    }
        ​​​​​​​​​​​​    s_table = sizeof(struct __cstr_node *) * __cstr_ctx.size;
        ​​​​​​​​​​​​    printf("hash table: %ld bytes\n", s_table);
        ​​​​​​​​​​​​    s_ctx = sizeof(__cstr_ctx);
        ​​​​​​​​​​​​    printf("ctx: %ld bytes\n", s_ctx);
        ​​​​​​​​​​​​    CSTR_UNLOCK();
        ​​​​​​​​​​​​    return s_pool + s_table + s_ctx;
        ​​​​​​​​​​​​}
        
      • 實驗結果
        ​​​​​​​​​​​​~$ mkdir build && cd build/
        ​​​​​​​​​​​​~$ cmake ../ && make
        ​​​​​​​​​​​​~$ ./benchmark
        
        ​​​​​​​​​​​​pool: 469762048 bytes
        ​​​​​​​​​​​​hash table: 67108864 bytes
        ​​​​​​​​​​​​ctx: 32 bytes
        ​​​​​​​​​​​​Interned 5000000 unique strings
        ​​​​​​​​​​​​Overhead per string: 99.6 bytes
        
        cstr 額外開銷足足有 chriso/intern 的兩倍,而 pool size 佔了大部份。為了減少開銷,在不採用 LRU 等手段減少 pool size 的情況下,能借鑑的只有 chriso/intern 本身的資料結構。
      • 原始碼

10. chriso/intern 資料結構

/* block.h */
struct block {
    size_t page_size;
    void **pages;
    size_t *offsets;
    size_t count;
    size_t size;
};

/* strings.c */
struct strings {
    struct block *hashes;
    struct block *strings;
    struct block *index;
    tree_t hash_map;
    uint32_t total;
    uint32_t hash_seed;
};

/* strings.h */
uint32_t strings_intern(struct strings*, const char *string);
const char *strings_lookup_id(struct strings*, uint32_t id);

Reference