Try   HackMD

2021q1 Homework2 (quiz2)

contributed by < WayneLin1992 >

tags: linux2021

延伸問題 第一題

  • 解釋程式碼運作原理,指出改進空間並著手實作
  • 實作出能抽離為可單獨執行 (standalone) 的使用層級應用程式
  • 設計效能評比的程式碼,說明 Linux 核心的 lib/list_sort.c 最佳化手法

list_head 結構

struct list_head {
    struct list_head *prev, *next;
};

INIT_LIST_HEAD

static inline void INIT_LIST_HEAD(struct list_head *head)
{
    head->next = head; head->prev = head;
}






list_add_node_t



struct1

prev

next



struct1:f0->struct1:f1





struct1:f1->struct1:f0





get_middle

static list_ele_t *get_middle(struct list_head *list)
{
    struct list_head *fast = list->next, *slow;
    list_for_each (slow, list) {
        if (COND1 || COND2)
            break;
        fast = fast->next->next;
    }
    return list_entry(TTT, list_ele_t, list);
}

使用龜兔賽跑技巧, fast 前進兩位, slow 前進一位,等到 fast 繞一圈,因此 COND1 = fast->next = list COND2 = fast->next->next == list, slow 會在中間的位置,所以 TTT = slow

TODO: 探討偶數和奇數個節點的 list 在上述演算法的行為

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

當總節點為奇數(共五個節點)時:
初始位置







list_add_node_t



node1

prev

next



node2

prev

next



node1:f1->node2:f0





node5

prev

next



node1:f0->node5:f1





node2:f0->node1:f1





node3

prev

next



node2:f1->node3:f0





node3:f0->node2:f1





node4

prev

next



node3:f1->node4:f0





node4:f0->node3:f1





node4:f1->node5:f0





node5:f1->node1:f0





node5:f0->node4:f1





slow

slow



slow->node1





fast

fast



fast->node2





第二迴圈







list_add_node_t



node1

prev

next



node2

prev

next



node1:f1->node2:f0





node5

prev

next



node1:f0->node5:f1





node2:f0->node1:f1





node3

prev

next



node2:f1->node3:f0





node3:f0->node2:f1





node4

prev

next



node3:f1->node4:f0





node4:f0->node3:f1





node4:f1->node5:f0





node5:f1->node1:f0





node5:f0->node4:f1





slow

slow



slow->node2





fast

fast



fast->node4





第三迴圈:因為 fast->next->next 會指到 list 所以會 break,而此時 slow 指向第三節點。







list_add_node_t



node1

prev

next



node2

prev

next



node1:f1->node2:f0





node5

prev

next



node1:f0->node5:f1





node2:f0->node1:f1





node3

prev

next



node2:f1->node3:f0





node3:f0->node2:f1





node4

prev

next



node3:f1->node4:f0





node4:f0->node3:f1





node4:f1->node5:f0





node5:f1->node1:f0





node5:f0->node4:f1





slow

slow



slow->node3





fast

fast



fast->node1





當總節點為偶數(共四個節點)時
初始位置







list_add_node_t



node1

prev

next



node2

prev

next



node1:f1->node2:f0





node4

prev

next



node1:f0->node4:f1





node2:f0->node1:f1





node3

prev

next



node2:f1->node3:f0





node3:f0->node2:f1





node3:f1->node4:f0





node4:f1->node1:f0





node4:f0->node3:f1





slow

slow



slow->node1





fast

fast



fast->node2





第二迴圈







list_add_node_t



node1

prev

next



node2

prev

next



node1:f1->node2:f0





node4

prev

next



node1:f0->node4:f1





node2:f0->node1:f1





node3

prev

next



node2:f1->node3:f0





node3:f0->node2:f1





node3:f1->node4:f0





node4:f1->node1:f0





node4:f0->node3:f1





slow

slow



slow->node2





fast

fast



fast->node4





第三迴圈: 因為 fast->next = list 所以 break 此時由上圖知道 slow 指向第二節點的位置,
總結:由此可知 slow 是指向中間的位置。

merge_sort

void list_merge_sort(queue_t *q)
{
    if (list_is_singular(&q->list))
        return;

    queue_t left;
    struct list_head sorted;
    INIT_LIST_HEAD(&left.list);
    list_cut_position(&left.list, &q->list, MMM);
    list_merge_sort(&left);
    list_merge_sort(q);
    list_merge(&left.list, &q->list, &sorted);
    INIT_LIST_HEAD(&q->list);
    list_splice_tail(&sorted, &q->list);
}

查看 list_cut_position

/**
 * list_cut_position() - Move beginning of a list to another list
 * @head_to: pointer to the head of the list which receives nodes
 * @head_from: pointer to the head of the list
 * @node: pointer to the node in which defines the cutting point
 */
static inline void list_cut_position(struct list_head *head_to,
                                     struct list_head *head_from,
                                     struct list_head *node)
{
    struct list_head *head_from_first = head_from->next;

    if (list_empty(head_from))
        return;

    if (head_from == node) {
        INIT_LIST_HEAD(head_to);
        return;
    }

    head_from->next = node->next;
    head_from->next->prev = head_from;

    head_to->prev = node;
    node->next = head_to;
    head_to->next = head_from_first;
    head_to->next->prev = head_to;
}

可以看出來 MMM 的位置是 struct list_head *node 題目是要 merge sort 實作,需要的是 get_middle 中 slow 的位置,&get_middle(&q->list) 因為 get_middle 回傳 type 為 list_ele_t ,但需要的是 list_head 所以 &get_middle(&q->list)->list

assert

由 C99 規格書 7.2章,講解 assert.h ,允許寫狀態診斷訊息
assert(validate(head));
可以由 validate( ) 看出此 function 為判斷,是否有從小到大依序排列,假如沒有就會出現下行的訊息。

$ ./doublylinkedlist 
doublylinkedlist.c:147: main: Assertion `validate(head)' failed.

改進空間並著手實作

改善空間,事實上 queue_t 完全是多餘的, list_ele_tnext 也可以捨去

typedef struct __element {
    char *value;
    struct list_head list;
} list_ele_t;

其中最需要注意的就是 free 部分

static void list_free(struct list_head *list)
{
    if (!list) return;

    struct list_head *current = list;
    while (current != list) {
        struct list_head *tmp = current;
        current = current->next;
        free(list_entry(tmp, list_ele_t, list)->value);
        free(list_entry(tmp, list_ele_t, list));
    }
    free(list_entry(current, list_ele_t, list)->value);
    free(list_entry(current, list_ele_t, list));
}

要用 container_of 知道 struct 起始位置,才能 free 掉。

void list_show(struct list_head *list)
{
    struct list_head *node;
    list_for_each(node, list) {
        printf("%s", list_entry(node, list_ele_t, list)->value);
        printf("->");
    }
}

經過這幾次作業的經驗得知,涉及到 C 語言結構體的記憶體釋放,往往要在每個層面都相當熟悉結構體成員的組成,否則就會造成各式記憶體操作的錯誤。我曾經撰寫一個小程式 nalloc,在 C 語言標準函式庫的 malloc 和 free 之上建構一層包裝,達到 structure aware memory allocator 的作用。

舉例來說,你可以這樣使用 nalloc:

struct matrix { size_t rows, cols; int **data; };

struct matrix *matrix_new(size_t rows, size_t cols)
{
    struct matrix *m = ncalloc(sizeof(*m), NULL);
    m->rows = rows; m->cols = cols;
    m->data = ncalloc(rows * sizeof(*m->data), m);
    for (size_t i = 0; i < rows; i++)
        m->data[i] = nalloc(cols * sizeof(**m->data), m->data);
     return m;
 }
 
void matrix_delete(struct matrix *m) { nfree(m); }

注意在 matrix_delete 函式中,你只需要呼叫一次 nfree,即可將 matrix 結構體內部的 data 成員和結構體本身釋放。nalloc 並未增加太多成本,你可一併思考這樣的實作是如何達成。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

還可以實做出 show 看到實際排序後的結果。

將 Linux 核心的 lib/list_sort.c 改為可獨立執行的程式碼

lib/list_sort.c

typedef int __attribute__((nonnull(2,3))) (*cmp_func)(void *,
		struct list_head const *, struct list_head const *);

__attribute__((nonnull(2,3))) 代表2和3參數不能空。
@cmp_func: pointer to comparison function

 * A good way to write a multi-word comparison is::
 *
 *	if (a->high != b->high)
 *		return a->high > b->high;
 *	if (a->middle != b->middle)
 *		return a->middle > b->middle;
 *	return a->low > b->low;

與傳統不同的cmp方式。

將 value 轉成 int 使它能讓 random 數值寫入,並且還需要將程式轉成 user level 所能執行達到 standalone 的目的。

  • dlist.h
typedef struct __element {
    int value;
    struct list_head list;
} list_ele_t;
static list_ele_t *get_middle(struct list_head *list)
{
    struct list_head *fast = list->next, *slow;
    list_for_each (slow, list) {
        if (fast->next == list || fast->next->next == list)
            break;
        fast = fast->next->next;
    }
    return list_entry(slow, list_ele_t, list);
}
  • dlist_sort.c
int main()
{
    int count = 20;
    list_ele_t *head = head_new();
    srand(time(NULL));
    while (count--)
        list_insert_head(head, rand() % 1024);

    list_merge_sort(head);
    list_show(&head->list);
    assert(validate(head));
    list_free(&head->list);

}
39->127->164->291->355->402->409->434->437->437->447->497->638->719->742->838->922->942->956->966->


將 quiz1 的 quick sort 及 non recursive quick sort 一起比較後如下圖

非遞迴 優點:能降低 cache missing 的程度,遞迴 優點:程式好寫,由下表可以看出非遞迴比遞迴的 cache missing 低很多, quick sort 的時間複雜度,快的時候能到 O(nlogn) 慢的時候到 O(n2),為了降低 quick sort 的時間複雜不確定性,將 pivot 能夠盡量的在中間值,能夠複雜度可以降到 O(nlogn)。
但經過我重新思考 quiz1 的改寫,發現到我有修改到 pivot 的位置,導致非遞迴 quick sort 時間複雜度較差。

在測驗一次

將 linux list_sort standalone 來比較

開啟 perf 權限

$ sudo sh -c " echo -1 > /proc/sys/kernel/perf_event_paranoid"
$ perf stat --repeat 6 -e cache-misses,cache-references,instructions,cycles,L1-dcache-load-misses ./testbenchm
  • merge sort
 Performance counter stats for './testbench_1' (6 runs):

        16,980,216      cache-misses              #   12.738 % of all cache refs      ( +-  1.61% )
       133,304,604      cache-references                                              ( +-  0.50% )
    11,536,861,274      instructions              #    1.93  insn per cycle           ( +-  0.00% )
     5,981,082,962      cycles                                                        ( +-  0.54% )
        85,213,334      L1-dcache-load-misses                                         ( +-  0.35% )

           1.44124 +- 0.00892 seconds time elapsed  ( +-  0.62% )
  • quick sort
 Performance counter stats for './testbenchq' (6 runs):

        62,616,374      cache-misses              #   32.664 % of all cache refs      ( +-  1.02% )
       191,699,808      cache-references                                              ( +-  0.70% )
    11,955,203,545      instructions              #    1.00  insn per cycle           ( +-  0.07% )
    11,931,402,273      cycles                                                        ( +-  2.22% )
       136,257,022      L1-dcache-load-misses                                         ( +-  0.62% )

            2.8833 +- 0.0658 seconds time elapsed  ( +-  2.28% )
  • quick sort non recursive
 Performance counter stats for './testbenchqn' (6 runs):

         2,465,038      cache-misses              #   10.800 % of all cache refs      ( +-  1.36% )
        22,825,246      cache-references                                              ( +-  0.21% )
     4,794,616,891      instructions              #    1.41  insn per cycle           ( +-  0.03% )
     3,406,350,521      cycles                                                        ( +-  0.15% )
        12,775,396      L1-dcache-load-misses                                         ( +-  0.11% )

           0.83640 +- 0.00487 seconds time elapsed  ( +-  0.58% )
  • linux kernel merge sort
 Performance counter stats for './list_sort' (6 runs):

        16,893,343      cache-misses              #   16.097 % of all cache refs      ( +-  0.25% )
       104,949,775      cache-references                                              ( +-  0.19% )
     5,341,908,031      instructions              #    1.60  insn per cycle           ( +-  0.00% )
     3,340,490,601      cycles                                                        ( +-  0.83% )
        68,921,142      L1-dcache-load-misses                                         ( +-  0.18% )

            0.8189 +- 0.0103 seconds time elapsed  ( +-  1.26% )

linux kernel merge sort 和 merge sort 比較
linux kernel merge sort 在 cache reference 和 執行時間上 都較好的表現。
quick sort 和 quick sort non recursive 比較
quick sort non recursive 在 cache reference, CPI和執行時間 都有更好的表現。
linux kernel merge sort 和 quick sort non recursive 比較
linux kernel merge sort 在 CPI 及 執行時間好一點。
其中 quick sort non recursive 有比較好的 cache misses 表現,在 privot 及 non recursive 優化的效果是巨大的。

TODO:


延伸問題 第二題

  • 解釋題目程式碼運作原理
  • 研讀 Linux "power of 2" 相關,並說明其中的作用,及考量
  • 研讀 slab allocator,探索 Linux 核心的 slab 實作,並找出運用 power-of-2 的程式碼。

解釋題目程式碼運作原理

uint16_t func(uint16_t N)
{
    N = N | N >> 1;
    N = N | N >> 2;
    N = N | N >> 4;
    N = N | N >> 8;
    return (N+1) >> 1;
}

func(21) N = 101012 第一行 10101 | 01010 = 11111 回傳為 (N+1) >> 1 時 N = 100002 = 1610

is_power_of_2

/**
 * is_power_of_2() - check if a value is a power of two
 * @n: the value to check
 *
 * Determine whether some value is a power of two, where zero is
 * *not* considered a power of two.
 * Return: true if @n is a power of 2, otherwise false.
 */
static inline __attribute__((const))
bool is_power_of_2(unsigned long n)
{
	return (n != 0 && ((n & (n - 1)) == 0));
}

輸入參數 typeunsigned long 代表不考慮有號的情況,將實際數值帶入,令 n = 1610 = 100002 代入後 10000 & 01111 = 00000 故得 n is power of 2。
將非 2 的次方,代入時,令 n = 1310 = 11012 代入後1101 & 1100 = 1100 得到非零解,所以 n is not power of 2。

roundup_pow_of_two

/**
 * __roundup_pow_of_two() - round up to nearest power of two
 * @n: value to round up
 */
static inline __attribute__((const))
unsigned long __roundup_pow_of_two(unsigned long n)
{
	return 1UL << fls_long(n - 1);
}

fls_long 觀察一下

static inline unsigned fls_long(unsigned long l)
{
	if (sizeof(l) == 4)
		return fls(l);
	return fls64(l);
}

fls64fls find last set 找最後一個有值 bit 的位置,sizeof(l) 判斷 unsign long 因為 long 有 4 bytes (Windows) 也有 8 bytes (Linux)。

TODO: 許多現代微處理器提供 Find first set 指令,找出 Linux 核心原始程式碼對應的硬體實作定義,如 x86_64 和 aarch64。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

aarch64: clz Count Leading Zeros
clz的操作 e.g. clz(0001010100011011) = 3,從MSB到最後1出現零的數量。

x86: TZCNT Counts the number of trailing least significant zero bits e.g. tzcnt(0000 0000 0000 0000 1000 0000 0000 1000) = 3 ,從 LSB 到1出現零的數量。 ffs = tzcnt + 1

Linux Armv5+ 為例

/*
 * ffs() returns zero if the input was zero, otherwise returns the bit
 * position of the first set bit, where the LSB is 1 and MSB is 32.
 */
static inline int ffs(int x)
{
	return fls(x & -x);
}

e.g. ffs(5403) = fls(0001010100011011 & 1110101011100101) = fls(0000000000000001) = 1 從 LSB 開始計算到第一個 1 出現的位置。

/*
 * fls() returns zero if the input is zero, otherwise returns the bit
 * position of the last set bit, where the LSB is 1 and MSB is 32.
 */
static inline int fls(int x)
{
	if (__builtin_constant_p(x))
	       return constant_fls(x);

	return 32 - __clz(x);
}

e.g. fls(0001010100011011) = 16 - 3 = 13 此為 16 bit 所以用 16 bit 相減。 從 LSB 開始計算到最後 1 的位置。

/*
 * On ARMv5 and above those functions can be implemented around the
 * clz instruction for much better code efficiency.  __clz returns
 * the number of leading zeros, zero input will return 32, and
 * 0x80000000 will return 0.
 */
static inline unsigned int __clz(unsigned int x)
{
	unsigned int ret;

	asm("clz\t%0, %1" : "=r" (ret) : "r" (x));

	return ret;
}

__clz 為比 clz 更有效率的實作 e.g. __clz(0x80000000) = 0 。

延伸閱讀:

TODO: 探討 Linux 核心原始程式碼中,為何會有 clz__clz 這樣貌似同名但實質不同用法的函式,從而分析 Linux 跨平台支援的手法

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

clz 在 arm 中的定義 CLZ{cond} Rd, Rm 其中 Rm operand register, Rd destination register。
__clz __ 在 linux 代表實際實作,而 clz 由個硬體進行提供。
clz 在 MIPS 中的定義 CLZ rd, rs 其中 rs source register, rd destination register。

參考資料:
MIPS
arm __clz complier
arm clz

1UL 為 unsigned long int 裡面值為 1 ,令 n = 1010 = 010102 代入得到 fls_long(n-1) 得到 4 也就是 1UL << 4 = 100002 = 16 達到 roundup the power of 2 無條件進位的目的。

其中fls_long(n-1)的 n-1 使原本就是 2 的次方的數值,保持一致

由此可以推測出 rounddown 只需要修改成 fls_long(n)-1 就成無條件刪去

slab allocator

我們需要一種配置記憶體的方式,來管理比 page 還小的記憶體,這就是 slab allocator 配置的大小將會以 power of two 來配置 (如:要 10 他就會配置 16)。
slab 配置方式 kmalloc: 負責配置比 page 還小的記憶體空間。(最小為 32 bytes 最大為 32MB)
/linux/slab.h

/*
 * Figure out which kmalloc slab an allocation of a certain size
 * belongs to.
 * 0 = zero alloc
 * 1 =  65 .. 96 bytes
 * 2 = 129 .. 192 bytes
 * n = 2^(n-1)+1 .. 2^n
 */
static __always_inline int kmalloc_index(size_t size)
{
	if (!size)
		return 0;

	if (size <= KMALLOC_MIN_SIZE)
		return KMALLOC_SHIFT_LOW;

	if (KMALLOC_MIN_SIZE <= 32 && size > 64 && size <= 96)
		return 1;
	if (KMALLOC_MIN_SIZE <= 64 && size > 128 && size <= 192)
		return 2;
	if (size <=          8) return 3;
	if (size <=         16) return 4;
	if (size <=         32) return 5;
	if (size <=         64) return 6;
	if (size <=        128) return 7;
	if (size <=        256) return 8;
	if (size <=        512) return 9;
	if (size <=       1024) return 10;
	if (size <=   2 * 1024) return 11;
	if (size <=   4 * 1024) return 12;
	if (size <=   8 * 1024) return 13;
	if (size <=  16 * 1024) return 14;
	if (size <=  32 * 1024) return 15;
	if (size <=  64 * 1024) return 16;
	if (size <= 128 * 1024) return 17;
	if (size <= 256 * 1024) return 18;
	if (size <= 512 * 1024) return 19;
	if (size <= 1024 * 1024) return 20;
	if (size <=  2 * 1024 * 1024) return 21;
	if (size <=  4 * 1024 * 1024) return 22;
	if (size <=  8 * 1024 * 1024) return 23;
	if (size <=  16 * 1024 * 1024) return 24;
	if (size <=  32 * 1024 * 1024) return 25;
	if (size <=  64 * 1024 * 1024) return 26;
	BUG();

	/* Will never be reached. Needed because the compiler may complain */
	return -1;
}
#endif /* !CONFIG_SLOB */

page 的大小並不會固定以 4KB 呈現,會因為不同的結構有所不同 aarch64 in linux 有 16KB, IA64 in linux 有 4KB, 8KB, 16KB, 64KB 不同。
參考資料:
Wikipedia: Page

slab 記憶體配置:
cache friendly and benchmark friendly
目的: page 記憶體配置由 page allocator,但很多時候 不需要 page 那麼大的空間,這時就可以使用 slab allocator 配置 page 以下的記憶體空間,而且 cache 對 slab 的配置是很敏感的,而且 slab 所存入將是 object ,如下圖所示

操作:
static struct kmem_cache 存指向下一個 kmem_cache 的指標, object 大小。
struct kmem_cache_node *node[MAN_NUMNODES] 其中將有 slab 的狀態,狀態可以分三種 full, partial, empty 代表 slab 的空間。
struct array_cache __percpu *cpu_cache 避免找空隙時需要 O(n) 所以會在 array 中放入空隙的 index 方便搜索空隙。
void *freelist page 空隙的位置。
void *s_mem 指向 page 第一個 object 。

NUMA (Non-uniform memory access):
在 SMP (對稱多處理器)時,要使用單一 BUS 向記憶體提取資料,而處理器處理資料的速度高於記憶體給資料速度,處理器會產生 "starved for data" 的情況,所以 NUMA 就希望每個處理器,都能有個 BUS 連接記憶體,避免壅塞的情形,所以延伸出的做法,每個處理器都有屬於自己的本地記憶體 ( local memory),這裡講的本地記憶體,是只靠近處理器的(記憶體包括: cache 、 memory)

NUMA 對 slab 的影響:
在 NUMA 下除了本地記憶體還可以透過詢問其他處理器使遠方記憶體可以進行配置,使 slab 在尋找 slabs free 變得緩慢,所以在 kmem_cache_node 中增加 partial list, full list, empty list,使得 slab allocator 效率整體上升 5%

修正前
w/o patch
Tasks    jobs/min  jti  jobs/min/task      real       cpu
    1      485.36  100       485.3640     11.99      1.91   Sat Apr 30 14:01:51 2005
  100    26582.63   88       265.8263     21.89    144.96   Sat Apr 30 14:02:14 2005
  200    29866.83   81       149.3342     38.97    286.08   Sat Apr 30 14:02:53 2005
  300    33127.16   78       110.4239     52.71    426.54   Sat Apr 30 14:03:46 2005
  400    34889.47   80        87.2237     66.72    568.90   Sat Apr 30 14:04:53 2005
  500    35654.34   76        71.3087     81.62    714.55   Sat Apr 30 14:06:15 2005
  600    36460.83   75        60.7681     95.77    853.42   Sat Apr 30 14:07:51 2005
  700    35957.00   75        51.3671    113.30    990.67   Sat Apr 30 14:09:45 2005
  800    33380.65   73        41.7258    139.48   1140.86   Sat Apr 30 14:12:05 2005
  900    35095.01   76        38.9945    149.25   1281.30   Sat Apr 30 14:14:35 2005
 1000    36094.37   74        36.0944    161.24   1419.66   Sat Apr 30 14:17:17 2005

修正後
w/patch
Tasks    jobs/min  jti  jobs/min/task      real       cpu
    1      484.27  100       484.2736     12.02      1.93   Sat Apr 30 15:59:45 2005
  100    28262.03   90       282.6203     20.59    143.57   Sat Apr 30 16:00:06 2005
  200    32246.45   82       161.2322     36.10    282.89   Sat Apr 30 16:00:42 2005
  300    37945.80   83       126.4860     46.01    418.75   Sat Apr 30 16:01:28 2005
  400    40000.69   81       100.0017     58.20    561.48   Sat Apr 30 16:02:27 2005
  500    40976.10   78        81.9522     71.02    696.95   Sat Apr 30 16:03:38 2005
  600    41121.54   78        68.5359     84.92    834.86   Sat Apr 30 16:05:04 2005
  700    44052.77   78        62.9325     92.48    971.53   Sat Apr 30 16:06:37 2005
  800    41066.89   79        51.3336    113.38   1111.15   Sat Apr 30 16:08:31 2005
  900    38918.77   79        43.2431    134.59   1252.57   Sat Apr 30 16:10:46 2005
 1000    41842.21   76        41.8422    139.09   1392.33   Sat Apr 30 16:13:05 2005

參考資料:
The Slab Allocator in the Linux kernel
NUMA aware slab allocator V4


延伸問題 第三題

  • 解釋程式碼運作原理,
  • 並嘗試重寫為同樣功能但效率更高的實作
  • 在 Linux 核心原始程式碼找出逐 bit 進行資料複製的程式碼,並解說相對應的情境
    while (count > 0) {
        uint8_t data = *source++;
        size_t bitsize = (count > 8) ? 8 : count;
        if (read_lhs > 0) {
            RRR;
            if (bitsize > read_rhs)
                data |= (*source >> read_rhs);
        }

        if (bitsize < 8)
            data &= read_mask[bitsize];

        uint8_t original = *dest;
        uint8_t mask = read_mask[write_lhs];
        if (bitsize > write_rhs) {
            /* Cross multiple bytes */
            *dest++ = (original & mask) | (data >> write_lhs);
            original = *dest & write_mask[bitsize - write_rhs];
            *dest = original | (data << write_rhs);
        } else {
            // Since write_lhs + bitsize is never >= 8, no out-of-bound access.
            DDD;
            *dest++ = (original & mask) | (data >> write_lhs);
        }

        count -= bitsize;
    }

由 if 的條件可以知道, read_lhs > 0 代表 _read & 7 > 0 意思是 _read 讀取的內容不足 8 bit , data = *source++ 所以我們要 RRR = data = data << read_lhs 使 data 成為 8 bit , bitsize > write_rhs 判斷寫入位元 及 位數,上面註解有寫 write_lhs + bitsize is never >= 8 DDD = mask |= mask_rw[write_lhs + bitsize];

重寫實作

可以使用一個表格就好,但表格的參數要有改變,用一個 bit 一個 bit 為代表。

static const uint8_t mask_rw[] = {
    0x00, /*    == 0    00000000b   */
    0x80, /*    == 1    10000000b   */
    0x40, /*    == 2    01000000b   */
    0x20, /*    == 3    00100000b   */
    0x10, /*    == 4    00010000b   */
    0x08, /*    == 5    00001000b   */
    0x04, /*    == 6    00000100b   */
    0x02, /*    == 7    00000010b   */
    0x01  /*    == 8    00000001b   */
};

經測試後得到相同結果

 1: 0: 0 1000000000000000000000000000000000000000000000000000000000000000
 1: 1: 0 0100000000000000000000000000000000000000000000000000000000000000
 1: 2: 0 0010000000000000000000000000000000000000000000000000000000000000
 ...
 1:15: 0 0000000000000001000000000000000000000000000000000000000000000000 

延伸問題 第四題

  • 解釋上述程式碼運作原理
  • 程式碼使用到 gcc atomics builtins,請透過 POSIX Thread 在 GNU/Linux 驗證多執行緒的環境中,string interning 能否符合預期地運作?若不能,請提出解決方案
  • chriso/intern 是個更好的 string interning 實作,請探討其特性和設計技巧,並透過內建的 benchmark 分析其效能表現

解釋程式運作原理

程式目的: string interning 將 string 存入 hash 中,當想使用時只需要將指標指向 string 即可,不需要額外存取,來改善記憶體的使用效率。

enum {
    CSTR_PERMANENT = 1,
    CSTR_INTERNING = 2,
    CSTR_ONSTACK = 4,
};

查看 cstring 是否有對應的 interning。

typedef struct __cstr_data {
    char *cstr;
    uint32_t hash_size;
    uint16_t type;
    uint16_t ref;
} * cstring;

cstr : 字串最長能存到 CSTR_INTERNING_SIZE = 32
hash_size : hash table 的大小
type : CSTR_ONSTACK string 存入 stack 中, CSTR_INTERNING 存入 heap 中, CSTR_PERMANENT 存入 loop 中。
ref reference counting:負責記錄空間是否有被使用過,有使用就會 count++







list_add_node_t



hash_table

cstr_buffer[0]

cstr_buffer[1]

cstr_buffer[2]

cstr_buffer[3]



node3

cstr

hash_size

type

ref:3



hash_table:f3->node3





count = 0 時,代表空間沒在被使用過,就可以 free 掉。

typedef struct __cstr_buffer {
    cstring str;
} cstr_buffer[1];

用 hash table 將 cstring 包裝起來。

#define CSTR_BUFFER(var)                                                      \
    char var##_cstring[CSTR_STACK_SIZE] = {0};                                \
    struct __cstr_data var##_cstr_data = {var##_cstring, 0, CSTR_ONSTACK, 0}; \
    cstr_buffer var;                                                          \
    var->str = &var##_cstr_data;

這 macro 表達創建一個空的 hash table , ## 為連接的意思 e.g. CSTR_BUFFER(king) 表示 king##_cstringking_cstring ,起始 type = CSTR_ONSTACK

struct __cstr_node {
    char buffer[CSTR_INTERNING_SIZE];
    struct __cstr_data str;
    struct __cstr_node *next;
};

CSTR_INTERNING_SIZE = 32

struct __cstr_pool {
    struct __cstr_node node[INTERNING_POOL_SIZE];
};

INTERNING_POOL_SIZE = 1024

cstring cstr_cat(cstr_buffer sb, const char *str)
{
    cstring s = sb->str;
    if (s->type == CSTR_ONSTACK) {
        int i = CCC;
        while (i < CSTR_STACK_SIZE - 1) {
            s->cstr[i] = *str;
            if (*str == 0)
                return s;
            ++s->hash_size;
            ++str;
            ++i;
        }
        s->cstr[i] = 0;
    }
    cstring tmp = s;
    sb->str = cstr_cat2(tmp->cstr, str);
    cstr_release(tmp);
    return sb->str;
}

cstr_cat function 目的:名稱 cat 可以看出,他是將 strsb->str 連接的意思,連接後還需要把 cstring 中的資料更新, 因此要將本來 cstring->hash_size + strlen(str) 並更新, i < CSTR_STACK_SIZE 代表還能寫入,CSTR_STACK_SIZE = 128代表寫入 cstring 中最長的字串長度 , i 代表已寫入字串的長度, CCC = i = s->hash_size

        while (i < CSTR_STACK_SIZE - 1) {
            s->cstr[i] = *str;
            if (*str == 0)
                return s;
            ++s->hash_size;
            ++str;
            ++i;
        }

依序將 str 中的字串,依序寫入 s->cstr[i] 中,當str == 0 return stype = CSTR_PERMANENTCSTR_INTERNING 才會執行 cstr_cat2

static cstring cstr_cat2(const char *a, const char *b)
{
    size_t sa = strlen(a), sb = strlen(b);
    if (sa + sb < CSTR_INTERNING_SIZE) {
        char tmp[CSTR_INTERNING_SIZE];
        memcpy(tmp, a, sa);
        memcpy(tmp + sa, b, sb);
        tmp[sa + sb] = 0;
        return cstr_interning(tmp, sa + sb, hash_blob(tmp, sa + sb));
    }
    cstring p = xalloc(sizeof(struct __cstr_data) + sa + sb + 1);
    if (!p)
        return NULL;

    char *ptr = (char *) (p + 1);
    p->cstr = ptr;
    p->type = 0;
    p->ref = 1;
    memcpy(ptr, a, sa);
    memcpy(ptr + sa, b, sb);
    ptr[sa + sb] = 0;
    p->hash_size = 0;
    return p;
}

sa + sb < CSTR_INTERNING_SIZE 代表有空間寫入sa + sb > CSTR_INTERNING_SIZE 代表空間需要再額外空間才能寫入,所以呼叫 xalloc 存入 heap 當中。

struct __cstr_interning {
    int lock;
    int index;
    unsigned size;
    unsigned total;
    struct __cstr_node **hash;
    struct __cstr_pool *pool;
};

lock : 上鎖數量
index : pool 的 已使用的數量
size : hash table 的大小
total: hash table 已使用的數量
buffer : 會存入與 cstr 相同的 string







hash_table_string



hash_table

cstr_buffer[0]

cstr_buffer[1]

cstr_buffer[2]

cstr_buffer[3]



data3

cstr

hash_size

type

ref:3



hash_table:f3->data3





node1

buffer

str

next



node1:f1->data3





buffer

buffer[0]

buffer[1]

...

buffer[31]



node1:f0->buffer:f0





hash

hash[0]

hash[1]

hash[2]

...

hash[15]



hash:f1->node1:f1





pool1

pool[0]

pool[1]

pool[2]

...

pool[1023]



pool1:f0->node1:f1





interning

lock

index

size

total

hash

pool



interning:f4->hash:f1





interning:f5->pool1





static cstring cstr_interning(const char *cstr, size_t sz, uint32_t hash)
{
    cstring ret;
    CSTR_LOCK();
    ret = interning(&__cstr_ctx, cstr, sz, hash);
    if (!ret) {
        expand(&__cstr_ctx);
        ret = interning(&__cstr_ctx, cstr, sz, hash);
    }
    ++__cstr_ctx.total;
    CSTR_UNLOCK();
    return ret;
}

在寫入時,CSTR_LOCK() 會 lock 住,並在寫完後解鎖 CSTR_UNLOCK()

static cstring interning(struct __cstr_interning *si,
                         const char *cstr,
                         size_t sz,
                         uint32_t hash)
{
    if (!si->hash)
        return NULL;

    int index = (int) (hash & (si->size - 1));
    struct __cstr_node *n = si->hash[index];
    while (n) {
        if (n->str.hash_size == hash) {
            if (!strcmp(n->str.cstr, cstr))
                return &n->str;
        }
        n = n->next;
    }
    // 80% (4/5) threshold
    if (si->total * 5 >= si->size * 4)
        return NULL;
    if (!si->pool) {
        si->pool = xalloc(sizeof(struct __cstr_pool));
        si->index = 0;
    }
    n = &si->pool->node[si->index++];
    memcpy(n->buffer, cstr, sz);
    n->buffer[sz] = 0;

    cstring cs = &n->str;
    cs->cstr = n->buffer;
    cs->hash_size = hash;
    cs->type = CSTR_INTERNING;
    cs->ref = 0;

    n->next = si->hash[index];
    si->hash[index] = n;

    return cs;
}

pool : 將會是連續的依序存取 pool[index-1]
hash table : 會 parameter 存入對應的 hash[hash]

static inline uint32_t hash_blob(const char *buffer, size_t len)
{
    const uint8_t *ptr = (const uint8_t *) buffer;
    size_t h = len;
    size_t step = (len >> 5) + 1;
    for (size_t i = len; i >= step; i -= step)
        h = h ^ ((h << 5) + (h >> 2) + ptr[i - 1]);
    return h == 0 ? 1 : h;
}