Try   HackMD

2021q1 第 3 週測驗題

tags: linux2021

目的: 檢驗學員對 bitwise operation, bit-field, C 語言:記憶體管理 的認知

題目解說錄影
作答表單

測驗 1

在伺服器運算 (server-side computing) 領域中,常會遇到頻繁的短字串處理,為此 Facebook 特別發展 folly::fbstring 作為 C++ std::string 的高效能替代品,而 folly::fbstring 之所以可有效能的突破,在於透過緊湊的記憶體佈局,施行 SSO (small string optimization) 和 CoW (copy on write) 這兩種最佳化手法:

  1. 當字串長度小於等於 23 時,會使用堆疊 (stack) 上的空間來保存字串。此時被看作「短字串」;
  2. 當字串長度介於 24 和 254 (含) 之間,視作「中等長度的字串」,採用動態配置記憶體;
  3. 當字串長度大於 255 時,視作「長字串」,會透過 CoW 手段進行最佳化:進行字串複製操作時,倘若字串本身沒有修改,就會共享記憶體空間,從而減少記憶體佔用和省去複製的開銷。換言之,真正的「複製」操作僅發生在字串內容發生變更。

在 stack 上的空間 (也就是不計算透過 malloc 所取得的 heap 空間) 佔用尤其精巧,folly::fbstring 本體使用 24 個位元組,但卻能表達 23 個位元組長度的字串,沒有任何ㄧ個位元組浪費。相較之下,std::string 本身佔用 32 個位元組,但在堆疊上卻只能表達 16 個位元組的字串。

folly::fbstring 實作主要手法透過下方的 union:

union {
    struct {
        char *data;
        size_t size;
        size_t capacity;
    } heap;

    struct {
        char data[23];
        int8_t length;    // align 1
    } stack; 
};

這個 union 的佔用空間為 23 個 char (即 data[]) 加上 1 個 int8_t (即 length),合計 24 個位元組,上述的「短字串」就保存在此,而「中等長度」和「長字串」顯然就要透過 char * 放到 heap 空間。接著討論以下三個問題:

  1. 「短字串」的長度記錄在何處?
  2. 既然做了三種字串分類,如何區分?
  3. 「長字串」做 CoW 時,也有 reference counting,那 counter 要存哪裡?

folly::fbstring 精緻的設計可依序解答上述問題:

  1. 「短字串」的長度記錄於最後一個成員 (即 length) 裡頭,因為至多 24 個位元長度,那用一個位元組來描述長度就綽綽有餘;
  2. 既然「短字串」長度不會超過 24,那就利用 length 成員最高 2 個位元來記錄型態即可。也因挪用 2 個位元,須考慮機器的 endian (Big 或 little endian)。可透過程式碼的 kIsLittleEndian 來定義;
    • 假設短字串 23 個位元組都拿來保存字串,結尾就是 \0,因此是 0x00。挪用 2 個位元紀錄,它把短字串訂成 00,中等長度中字串訂成 10,長字串訂成 01。短字串結尾是 0x00,不受影響;
    • 巧妙之處在於,在短字串底下,不紀錄長度,而是紀錄「最大短字串長度減去現在長度」。最大短字串長度是 23,若現在字串長度也是 23,那最後一個位元組就是 0,剛好也是 \0;
    • 如果字串長度為 22 個位元組。倒數第二個位元組是\0,最後一個位元組是 1,存放 1 這個位元組,最前 2 個位元用以紀錄短中長字串形態的資訊,而短字串本來就是 00 兩個位元,仍不影響;
  3. 因為已可依據上述 (2) 區分出型態,於是 char *data 就可以保存 reference counter;

這裡我們嘗試用 C 語言重寫上述的 folly::fbstring,預期的應用案例如下:

    xs string = *xs_tmp("  foobarbar  \n\n");
    xs_trim(&string, "\n ");
    printf("[%s] : %2zu\n", xs_data(&string), xs_size(&string));

    xs prefix = *xs_tmp("((("), suffix = *xs_tmp(")))");
    xs_concat(&string, &prefix, &suffix);
    printf("[%s] : %2zu\n", xs_data(&string), xs_size(&string));

參考執行結果:

[foobarbar] :  9
[(((foobarbar)))] : 15

起初我們建立一個形態為 xs 的物件,使其內容為 " foobarbar \n\n",藉由 xs_trim 將該物件裡頭的字串去除前後空白字元,達成「修剪」(trim 本意) 作用,接著再將 foobarbar 字串 (長度為 9) 前後加上 ((())),最終字串長度為 15

foobar 是電腦程式領域裡頭的術語,無實際用途和參考意義,一如漢語裡頭張三李四,也可簡稱 "foo"。這裡的 "foobarbar" 則是「富爸爸」的諧音 (靈感來自書名《富爸爸·窮爸爸》)。
[ 出處 ] 1938 年 Robert Clampett 在華納卡通導演的達菲鴨 (Daffy Doc),達菲鴨做個手勢說 "SILENCE IS FOO!" 是極早版本

我們仿效 folly::fbstring,提出以下實作:

typedef union {
    /* allow strings up to 15 bytes to stay on the stack
     * use the last byte as a null terminator and to store flags
     * much like fbstring:
     * https://github.com/facebook/folly/blob/master/folly/docs/FBString.md
     */
    char data[16];

    struct {
        uint8_t filler[15],
            /* how many free bytes in this stack allocated string
             * same idea as fbstring
             */
            space_left : 4,
            /* if it is on heap, set to 1 */
            is_ptr : 1, flag1 : 1, flag2 : 1, flag3 : 1;
    };

    /* heap allocated */
    struct {
        char *ptr;
        /* supports strings up to 2^54 - 1 bytes */
        size_t size : 54,
            /* capacity is always a power of 2 (unsigned)-1 */
            capacity : 6;
        /* the last 4 bits are important flags */
    };
} xs;

xs union 定義 16 個位元組,應用與實作細節如下:

  • 字串長度小於或等於 15 個位元組,則放在 stack。
  • 字串長度大於或等於 16 個位元組,則放在 heap (透過 malloc 系統呼叫配置所需字串大小)
  • heap struct
    • ptr: 8 個位元組指標 (64 位元架構: x86_64/aarch64 等等)。
    • size: 字串長度。因定義 54 位元,故最大字串長度為
      2541
      位元組。
    • capacity: 從 heap 配置的空間大小,其單位為 2capacity,故用 6 個位元即可涵蓋 size 長度。
    • 有 4 個位元沒有定義,是為了避免覆寫另一結構成員: is_ptr, flag1, flag 2flag3

完整程式碼如下: (檔名為 xs.c)

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAX_STR_LEN_BITS (54)
#define MAX_STR_LEN ((1UL << MAX_STR_LEN_BITS) - 1)

#define LARGE_STRING_LEN 256

typedef union {
    /* allow strings up to 15 bytes to stay on the stack
     * use the last byte as a null terminator and to store flags
     * much like fbstring:
     * https://github.com/facebook/folly/blob/master/folly/docs/FBString.md
     */
    char data[16];

    struct {
        uint8_t filler[15],
            /* how many free bytes in this stack allocated string
             * same idea as fbstring
             */
            space_left : 4,
            /* if it is on heap, set to 1 */
            is_ptr : 1, is_large_string : 1, flag2 : 1, flag3 : 1;
    };

    /* heap allocated */
    struct {
        char *ptr;
        /* supports strings up to 2^MAX_STR_LEN_BITS - 1 bytes */
        size_t size : MAX_STR_LEN_BITS,
                      /* capacity is always a power of 2 (unsigned)-1 */
                      capacity : 6;
        /* the last 4 bits are important flags */
    };
} xs;

static inline bool xs_is_ptr(const xs *x) { return x->is_ptr; }

static inline bool xs_is_large_string(const xs *x)
{
    return x->is_large_string;
}

static inline size_t xs_size(const xs *x)
{
    return xs_is_ptr(x) ? x->size : 15 - x->space_left;
}

static inline char *xs_data(const xs *x)
{
    if (!xs_is_ptr(x))
        return (char *) x->data;

    if (xs_is_large_string(x))
        return (char *) (x->ptr + OFF);
    return (char *) x->ptr;
}

static inline size_t xs_capacity(const xs *x)
{
    return xs_is_ptr(x) ? ((size_t) 1 << x->capacity) - 1 : 15;
}

static inline void xs_set_refcnt(const xs *x, int val)
{
    *((int *) ((size_t) x->ptr)) = val;
}

static inline void xs_inc_refcnt(const xs *x)
{
    if (xs_is_large_string(x))
        ++(*(int *) ((size_t) x->ptr));
}

static inline int xs_dec_refcnt(const xs *x)
{
    if (!xs_is_large_string(x))
        return 0;
    return --(*(int *) ((size_t) x->ptr));
}

static inline int xs_get_refcnt(const xs *x)
{
    if (!xs_is_large_string(x))
        return 0;
    return *(int *) ((size_t) x->ptr);
}

#define xs_literal_empty() \
    (xs) { .space_left = 15 }

/* lowerbound (floor log2) */
static inline int ilog2(uint32_t n) { return LLL; }

static void xs_allocate_data(xs *x, size_t len, bool reallocate)
{
    /* Medium string */
    if (len < LARGE_STRING_LEN) {
        x->ptr = reallocate ? realloc(x->ptr, (size_t) 1 << x->capacity)
                            : malloc((size_t) 1 << x->capacity);
        return;
    }

    /* Large string */
    x->is_large_string = 1;

    /* The extra 4 bytes are used to store the reference count */
    x->ptr = reallocate ? realloc(x->ptr, (size_t)(1 << x->capacity) + 4)
                        : malloc((size_t)(1 << x->capacity) + 4);

    xs_set_refcnt(x, 1);
}

xs *xs_new(xs *x, const void *p)
{
    *x = xs_literal_empty();
    size_t len = strlen(p) + 1;
    if (len > NNN) {
        x->capacity = ilog2(len) + 1;
        x->size = len - 1;
        x->is_ptr = true;
        xs_allocate_data(x, x->size, 0);
        memcpy(xs_data(x), p, len);
    } else {
        memcpy(x->data, p, len);
        x->space_left = 15 - (len - 1);
    }
    return x;
}

/* Memory leaks happen if the string is too long but it is still useful for
 * short strings.
 */
#define xs_tmp(x)                                                   \
    ((void) ((struct {                                              \
         _Static_assert(sizeof(x) <= MAX_STR_LEN, "it is too big"); \
         int dummy;                                                 \
     }){1}),                                                        \
     xs_new(&xs_literal_empty(), x))

/* grow up to specified size */
xs *xs_grow(xs *x, size_t len)
{
    char buf[16];

    if (len <= xs_capacity(x))
        return x;

    /* Backup first */
    if (!xs_is_ptr(x))
        memcpy(buf, x->data, 16);

    x->is_ptr = true;
    x->capacity = ilog2(len) + 1;

    if (xs_is_ptr(x)) {
        xs_allocate_data(x, len, 1);
    } else {
        xs_allocate_data(x, len, 0);
        memcpy(xs_data(x), buf, 16);
    }
    return x;
}

static inline xs *xs_newempty(xs *x)
{
    *x = xs_literal_empty();
    return x;
}

static inline xs *xs_free(xs *x)
{
    if (xs_is_ptr(x) && xs_dec_refcnt(x) <= 0)
        free(x->ptr);
    return xs_newempty(x);
}

static bool xs_cow_lazy_copy(xs *x, char **data)
{
    if (xs_get_refcnt(x) <= 1)
        return false;

    /* Lazy copy */
    xs_dec_refcnt(x);
    xs_allocate_data(x, x->size, 0);

    if (data) {
        memcpy(xs_data(x), *data, x->size);

        /* Update the newly allocated pointer */
        *data = xs_data(x);
    }
    return true;
}

xs *xs_concat(xs *string, const xs *prefix, const xs *suffix)
{
    size_t pres = xs_size(prefix), sufs = xs_size(suffix),
           size = xs_size(string), capacity = xs_capacity(string);

    char *pre = xs_data(prefix), *suf = xs_data(suffix),
         *data = xs_data(string);

    xs_cow_lazy_copy(string, &data);

    if (size + pres + sufs <= capacity) {
        memmove(data + pres, data, size);
        memcpy(data, pre, pres);
        memcpy(data + pres + size, suf, sufs + 1);

        if (xs_is_ptr(string))
            string->size = size + pres + sufs;
        else
            string->space_left = 15 - (size + pres + sufs);
    } else {
        xs tmps = xs_literal_empty();
        xs_grow(&tmps, size + pres + sufs);
        char *tmpdata = xs_data(&tmps);
        memcpy(tmpdata + pres, data, size);
        memcpy(tmpdata, pre, pres);
        memcpy(tmpdata + pres + size, suf, sufs + 1);
        xs_free(string);
        *string = tmps;
        string->size = size + pres + sufs;
    }
    return string;
}

xs *xs_trim(xs *x, const char *trimset)
{
    if (!trimset[0])
        return x;

    char *dataptr = xs_data(x), *orig = dataptr;

    if (xs_cow_lazy_copy(x, &dataptr))
        orig = dataptr;

    /* similar to strspn/strpbrk but it operates on binary data */
    uint8_t mask[32] = {0};

#define check_bit(byte) (CCC)
#define set_bit(byte) (SSS)
    size_t i, slen = xs_size(x), trimlen = strlen(trimset);

    for (i = 0; i < trimlen; i++)
        set_bit(trimset[i]);
    for (i = 0; i < slen; i++)
        if (!check_bit(dataptr[i]))
            break;
    for (; slen > 0; slen--)
        if (!check_bit(dataptr[slen - 1]))
            break;
    dataptr += i;
    slen -= i;

    /* reserved space as a buffer on the heap.
     * Do not reallocate immediately. Instead, reuse it as possible.
     * Do not shrink to in place if < 16 bytes.
     */
    memmove(orig, dataptr, slen);
    /* do not dirty memory unless it is needed */
    if (orig[slen])
        orig[slen] = 0;

    if (xs_is_ptr(x))
        x->size = slen;
    else
        x->space_left = 15 - slen;
    return x;
#undef check_bit
#undef set_bit
}

int main(int argc, char *argv[])
{
    xs string = *xs_tmp("\n foobarbar \n\n\n");
    xs_trim(&string, "\n ");
    printf("[%s] : %2zu\n", xs_data(&string), xs_size(&string));

    xs prefix = *xs_tmp("((("), suffix = *xs_tmp(")))");
    xs_concat(&string, &prefix, &suffix);
    printf("[%s] : %2zu\n", xs_data(&string), xs_size(&string));
    return 0;
}

Count Leading Zeros (clz) 或名 Number of Leading Zeros (nlz) 為計算 2 進位數從 MSB 往右數遇到的第一個 1 之前所有 0 的數量,因此,clz(0001010100011011) 會得到 3。GCC 內建了許多功能強大的函式,其中一項就是 __builtin_clz,注意,當參數為 0 時,結果未定義:

int __builtin_clz (unsigned int x)
Returns the number of leading 0-bits in x, starting at the most significant bit position. If x is 0, the result is undefined.
出處: Other Built-in Functions Provided by GCC

編譯方式:

$ gcc -o xs -std=gnu11 xs.c

請補完程式碼,使其執行結果符合預期。

作答區

OFF = ?

  • (a) 1
  • (b) 2
  • (c) 3
  • (d) 4

LLL = ?

  • (a) 32 - __builtin_clz(n)
  • (b) 32 - __builtin_clz(n) - 1
  • (c) 32 - __builtin_clz(n) + 1

NNN = ?

  • (a) 16
  • (b) 15
  • (c) 14
  • (d) 8
  • (e) 7

CCC = ?

  • (a) mask[(uint8_t) byte / 8] << (uint8_t) byte % 8
  • (b) mask[(uint8_t) byte / 8] >> (uint8_t) byte % 8
  • (c) mask[(uint8_t) byte / 8] + 1 << (uint8_t) byte % 8
  • (d) mask[(uint8_t) byte / 8] & 1 << (uint8_t) byte % 8
  • (e) mask[(uint8_t) byte / 8] | 1 << (uint8_t) byte % 8
  • (f) mask[(uint8_t) byte / 8] ^ 1 << (uint8_t) byte % 8
  • (g) mask[(uint8_t) byte / 8] >> 1 << (uint8_t) byte % 8

SSS = ?

  • (a) mask[(uint8_t) byte / 8] ^= 1 << (uint8_t) byte % 8
  • (b) mask[(uint8_t) byte / 8] <<= (uint8_t) byte % 8
  • (c) mask[(uint8_t) byte / 8] |= 1 << (uint8_t) byte % 8
  • (d) mask[(uint8_t) byte / 8] &= 1 << (uint8_t) byte % 8
  • (e) mask[(uint8_t) byte / 8] >>= (uint8_t) byte % 8

參考資料:

延伸問題:

  1. 解釋上述程式碼運作原理,並提出改進方案;
  2. 以上述程式碼為基礎,提供字串複製 (應考慮到 CoW) 的函式實作;
  3. 設計實驗,確認上述字串處理最佳化 (針對空間效率) 帶來的效益,應考慮到 locality of reference,善用 GDB 追蹤和分析,確認字串的確符合預期地配置於 stack 或 heap;
  4. 嘗試將 quiz2 提到的 string interning 機制整合,提出對空間高度最佳化的字串處理工具函式庫
  5. 在 Linux 核心原始程式碼中,找出類似手法的 SSO 或 CoW 案例並解說;