Try   HackMD

2023q1 第 8 週測驗題

目的: 檢驗學員對並行程式設計、記憶體管理的認知

作答表單: 測驗 1-2 (針對 Linux 核心「設計」課程)
作答表單: 測驗 3 (針對 Linux 核心「實作」課程)

測驗 1

並行和多執行緒程式設計〉和〈Linux 核心模組運作原理〉提及 reference counting,以下程式碼嘗試以 C11 Atomics 實作,參考執行輸出:

hread 1632626432, 0: Hello, world!
Thread 1632626432, 1: Hello, world!
Thread 1632626432, 2: Hello, world!
Thread 1632626432, 3: Hello, world!
...
Thread 1103886080, 97: Hello, world!
Thread 1103886080, 98: Hello, world!
Thread 1103886080, 99: Hello, world!

其中 1103886080 是執行緒的編號 (thread ID)。一旦 REFCNT_TRACE 事先定義,程式輸出如下:

...
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 97: Hello, world!
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 98: Hello, world!
main.c:141:(test_thread) refcnt_t_unref(str2)main.c:138:(test_thread) refcnt_t_ref(str)Thread 1174378240, 99: Hello, world!

原始程式碼如下 (檔名: main.c,部分遮蔽)

/*
 * Generic reference counting in C.
 *
 * Use refcnt_malloc/refcnt_unref in instead of malloc/free. Use refcnt_ref to
 * duplicate a reference as necessary. Use refcnt unref to stop using a pointer.
 *
 * The resulting string must be released using refcnt_unref since refcnt_strdup
 * function utilizes refcnt_malloc.
 *
 * This implementation is thread-safe with the exception of refcnt_realloc. If
 * you need to use refcnt_realloc in a multi-threaded environment, you must
 * synchronize access to the reference.
 *
 * If you define REFCNT_CHECK, references passed into refcnt_ref and and
 * refcnt_unref will be checked that they were created by refcnt_malloc. This is
 * useful for debugging, but it will slow down your program somewhat.
 *
 * If you define REFCNT_TRACE, refcnt_malloc and refcnt_unref will print
 * the line number and file name where they were called. This is useful for
 * debugging memory leaks or use after free errors.
 */

#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define REFCNT_CHECK

#define maybe_unused __attribute__((unused))

typedef struct {
#ifdef REFCNT_CHECK
    int magic;
#endif
    atomic_uint refcount;
    char data[];
} refcnt_t;

#ifdef REFCNT_TRACE
#define _REFCNT_TRACE(call)                                                \
    ({                                                                     \
        fprintf(stderr, "%s:%d:(%s) %s", __FILE__, __LINE__, __FUNCTION__, \
                #call);                                                    \
        call;                                                              \
    })
#define refcnt_malloc refcnt_t_malloc
#define refcnt_realloc refcnt_t_realloc
#define refcnt_ref refcnt_t_ref
#define refcnt_unref refcnt_t_unref
#define refcnt_strdup refcnt_t_strdup
#endif

#define REFCNT_MAGIC 0xDEADBEEF

static maybe_unused void *refcnt_malloc(size_t len)
{
    refcnt_t *ref = malloc(sizeof(refcnt_t) + len);
    if (!ref)
        return NULL;
#ifdef REFCNT_CHECK
    ref->magic = REFCNT_MAGIC;
#endif
    atomic_init(&ref->refcount, 1);
    return ref->data;
}

static maybe_unused void *refcnt_realloc(void *ptr, size_t len)
{
    refcnt_t *ref = (void *) (ptr - offsetof(AAAA, BBBB));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC);
#endif
    ref = realloc(ref, sizeof(refcnt_t) + len);
    if (!ref)
        return NULL;
    return ref->data;
}

static maybe_unused void *refcnt_ref(void *ptr)
{
    refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
    CCCC(&ref->refcount, 1);
    return ref->data;
}

static maybe_unused void refcnt_unref(void *ptr)
{
    refcnt_t *ref = (void *) (ptr - offsetof(DDDD, EEEE));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
    if (FFFF(&ref->refcount, 1) == 1)
        free(ref);
}

static maybe_unused char *refcnt_strdup(char *str)
{
    refcnt_t *ref = malloc(sizeof(refcnt_t) + strlen(str) + 1);
    if (!ref)
        return NULL;
#ifdef REFCNT_CHECK
    ref->magic = REFCNT_MAGIC;
#endif
    atomic_init(&ref->refcount, 1);
    strcpy(ref->data, str);
    return ref->data;
}

#ifdef REFCNT_TRACE
#undef refcnt_malloc
#undef refcnt_realloc
#undef refcnt_ref
#undef refcnt_unref
#undef refcnt_strdup
#define refcnt_malloc(len) _REFCNT_TRACE(refcnt_t_malloc(len))
#define refcnt_realloc(ptr, len) _REFCNT_TRACE(refcnt_t_realloc(ptr, len))
#define refcnt_ref(ptr) _REFCNT_TRACE(refcnt_t_ref(ptr))
#define refcnt_unref(ptr) _REFCNT_TRACE(refcnt_t_unref(ptr))
#define refcnt_strdup(ptr) _REFCNT_TRACE(refcnt_t_strdup(ptr))
#endif

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define N_ITERATIONS 100

static void *test_thread(void *arg)
{
    char *str = arg;
    for (int i = 0; i < N_ITERATIONS; i++) {
        char *str2 = refcnt_ref(str);
        fprintf(stderr, "Thread %u, %i: %s\n", (unsigned int) GGGG, i,
                str2);
        refcnt_unref(str2);
    }
    refcnt_unref(str);
    return NULL;
}

#define N_THREADS 64

int main(int argc, char **argv)
{
    /* Create threads */
    pthread_t threads[N_THREADS];

    /* Create a new string that is count referenced */
    char *str = refcnt_strdup("Hello, world!");

    /* Start the threads, passing a new counted copy of the referece */
    for (int i = 0; i < N_THREADS; i++)
        pthread_create(&threads[i], NULL, test_thread, refcnt_ref(str));

    /* We no longer own the reference */
    refcnt_unref(str);

    /* Whichever thread finishes last will free the string */
    for (int i = 0; i < N_THREADS; i++)
        pthread_join(threads[i], NULL);

    void *ptr = malloc(100);
    /* This should cause a heap overflow while checking the magic num which the
     * sanitizer checks.
     * Leaving commented out for now
     */
    // refcnt_ref(ptr);

    free(ptr);
    return 0;
}

編譯方式如下:

gcc -o main -Wall -O2 -fsanitize=thread main.c -lpthread

請補完程式碼,使其運作符合預期,且不會觸發 Thread Sanitizer 在執行時期的各項錯誤或警告。作答規範:

  • 以最精簡的程式碼實作,均不包含空白字元
  • AAAA, BBBB, DDDD, EEEE 均為 identifier
  • CCCCFFFF 是 C11 atomics 規範的函式,均以 atomic_fetch_ 開頭
  • GGGG 是 POSIX Thread 規範的函式

延伸問題:

  1. 解釋上述程式碼運作原理,指出其實作缺失並改進
  2. 研讀〈Linux 核心模組運作原理〉並在 Linux 核心原始程式碼中找出相關 reference counting 的程式碼,予以解讀和分析
  3. 解釋為何 Linux 核心的同步機制不依賴 reference counting (提示: 參閱 RCU 的設計理念)

測驗 2

Linked list (鏈結串列) 允許時間複雜度為

O(1) 的隨機插入 (insert) 及隨機移去 (remove) 節點,但搜尋卻需要
O(n)
時間複雜度。假設一個鏈結串列內容如下:
HEAD110183822394759next

從開頭 (即上方 HEAD) 找起,59 這個節點需要存取或比對 7 次才能找到,你直覺可能想到二分法,但對於 linked list 來說,隨機存取節點的時間複雜度仍是

O(n),也就是說,二分法無法帶來效益。

我們可在鏈結串列中,增加額外的 "skip" (作用是提供資料存取的捷徑) 節點,如下:

"skip" 一詞在英語的意思很多,這裡取韋伯字典提出的解釋: "to bound off one point after another"

如此一來,我們就可依據 "skip" 節點查詢,只需要比對 3 次即可找到上述的 59。至於若想搜尋 47,我們先根據 "skip" 節點來比對,於是在節點 22 上,它的 "skip" 指標會指向 59,後者大於 47,因此我們可知,47 可能會存在於節點 22 和節點 59 之間,這時候再根據鏈結串列順序搜尋,一共要比對 5 次。

隨著節點的增多,我們的 "skip" 鏈結串列會變成這樣:

"skip" 節點的密度約為普通節點的一半,能否再提高密度呢?我們可在這基礎上再加一層新的 "skip" 節點,這層節點的密度為第一層 "skip" 節點的一半。

換個視角,呈現如下:

我們再給予新的限制:每層的 "skip" 節點都由它下一層的 "skip" 節點中產生,最終我們可得類似下圖的 "Skip List":

Skip List 示意圖

於是,我們不再區分每層是原節點還是 "skip" 節點,而將最底下的那層節點通稱為第一層 (level 1) 節點,第一層節點上面為第二層 (level 2) 節點,再來是第三層,以此類推。假設每層的節點數為下一層的一半,於是搜尋的時間複雜度為

O(logn)

一般的 linked list 搜尋時必須從第一個開始,按照順序一個一個搜尋,因此不論是搜尋或存取的時間複雜度皆為

O(N),而 Skip list 可將存取、搜尋、新增和刪除等操作的平均時間複雜度降到
O(logN)

Know Thy Complexities!

skip list 是種針對 sorted linked list 的資料結構,透過不同 level 做出不同的 linked list 達到加速搜尋。
舉例來說,當我們想在上方 Skip List 示意圖中搜尋 7,步驟如下:

  1. 從 level 4 (圖中左上方) 比對,因 1 <= 7 得知 level 4 不存在 7,但其他 level 可能具備。於是跳到 level 3 繼續找
  2. 從 level 3 比對,因 4 <= 76 <= 7 得知 level 3 一樣不存在 7,於是跳到 level 2 繼續找
  3. 從 level 2 比對,因 9 >= 7 表示此 level 不存在 7,跳去 level 1
  4. 在 level 1 比對,發現 7

總共比對為 5 次,比直接循序查詢(需要 7 次存取) 還快。

如之前所及,skip list 是具有分層結構的鏈結串列,那麼每層的節點該如何產生呢?

我們可在新增元素時,使用隨機方法 (稱作 "coin flip",也就是「擲硬幣看正反面結果」) 決定這個元素有幾層節點。設定該元素僅有 1 層節點機率為

12,且有 2 層節點機率為
14
,僅有 3 層節點機率為
18
,以此類推。然後觸發隨機事件,當機率為
12
的事件發生時,該元素有 1 層節點,機率為
12
的事件發生時,該元素有 2 層節點,以此類推。另外,我們限定一個 "skip" 表格應當具有最大的層數限制。

假設一個 "skip" 表格最大層數限制為4,於是我們可設定一個整數區間為

[1,241],即
[1,8]
。然後取一個介於 1 到 8 之間的隨機數,當落在
[5,8]
區間時有 1 層節點,落在
[3,4]
區間時有 2 層節點,落在
[2,2]
區間時有 3 層,落在
[1,1]
上時有 4 層。

總結 Skip List 的特徵:

  • 第一層包含所有的元素
  • 每一層都是個有序的 linked list
  • skip list 是隨機產生,所以即使元素完全相同的 linked list ,最終的生成的 skip list 有可能不一樣
  • 如果節點 x 出現在第 i 層,則所有比 i 小的層都包含 x

建立 skip list 的流程:

  1. 最底層 (level 1) 是包含所有元素的 linked list
  2. 保留第一個和最後一個元素,從其他元素中隨機選出一些元素形成新的一層 linked list
  3. 為剛選出的每個元素新增指標,指向下一層和自己相等的元素
  4. 重複上述 (2) 和 (3) 步驟,直到不再能選擇出除了第一個和最後一個元素以外的元素

下方動畫展示在 Skip List 新增節點:

Google 的開放原始碼 Key-Value storage (可簡稱 KV) 專案 LevelDB 和 Facebook 延伸 LevelDB 而發展出的 RocksDB,都用到 Skip List 的變形。Redis 內建一個高效率的 Skip List 實作。

延伸閱讀:

論文〈Cache-Sensitive Skip List: Efficient Range Queries on modern CPUs

Figure 1

Skip list 由多個不同層級的鍵值通道 (Lane) 組成,如圖 Figure 1。

在第 Level i 的通道中,平均具有

n×pi 個元素,其中 n 為元素總數且
0<p<1
,所以 Skip list 是一個透過機率所決定出來的資料結構,雖然可以很有效地更新資料,但對於確切的結構仍是不可預測的。

論文採用 deterministic variant 的 skip list perfectly balanced skip lists 作為骨架,balanced skip list 的第 Level i+1 通道包含 Level i 通道的每 1/p 個元素,如 Figure 1 所示,當 p = 0.5 時, Level 1 的通道包含了 Level 0 的每

10.5=2 個元素 (1, 3, 5, 7, 9)、Level 2 的通道包含了 Level 1 的每 2 個元素 (1, 5, 9)。

當 p 值較小時,通道可視為稀疏的,可一次跳過許多元素,而在 p 值較大時,通道是密集的,需要在通道中比較多次的元素。

通道的功用主要用來縮少搜尋的範圍,例如在 Figure 1 搜尋元素 6 ,從最高 Leve 2 開始搜尋 6 ,當遇到比 6 還大的值就停止,往節點的下個 Level 繼續搜尋,因此找到元素 9 比 6 大,接著就從 5 往 Level 1 搜尋,遇到 7 停止,從 5 往 Level 0 搜尋,找到 6 。

在 p = 0.5 的 balanced skip list 之最壞情況,搜尋的複雜度需要

O(logn)

原始論文〈Skip lists: a probabilistic alternative to balanced trees〉藉由 fat keys 來實作,fat keys 包含指向每個通道下個節點的指標以及數組資料,所有的節點皆是一致的,因此方便實作,但透過這樣的實作方式,會使得指向高 Level 下個節點的指標在大多情形都是指向 NULL ,且至少需要

O(m×n) 的指標空間,其中 m 為通道的總數,故具有較差的空間使用率。

而指標所指向的記憶體空間在大多的情況下為不連續,若採用上述的實作方式實作 skip list,藉由大量的指標操作來進行搜尋,勢必會遇到許多的 cache misses,不利於現代微處理器。

例如,在一個 64-bit architecture 上使用 32-bit 整數鍵值以及具有 5 個通道數的 skip list,每個節點需要佔 4 bytes + (5+1) * 8 bytes = 52 bytes 的空間大小,假設一個 cache line 為 64 bytes,而在遍歷 skip list 的節點時,會幾乎佔用了所有的 cache line 空間 (52 bytes),因此大部分情況下都會發生 cache misses ,但其實僅需 4 bytes + 8 bytes = 12 bytes (數組+指向下個節點的指標) 的空間來存放即可。

Cache-Sensitive Skip List

Figure 2

為了能充分利用 cache,最直覺的想法就是將通道改為陣列表示,如此一來具有 spatial locality 的特性可減少 cache misses ,也可允許 SIMD 指令的使用來加速操作。

Figure 2 包含兩個通道 (Linearized Fast Lane Array) 用來管理 32 筆整數資料 (Data List),其中 p = 0.5,例如搜尋 7 時的走訪路徑如紅色指示所示,而 CSSL 是基於 balanced skip list 實作,故 Level i 的每 1/0.5 = 2 個元素為 Level i+1 的元素,因此通道的元素數量是已知的,也只需透過簡單計算得出元素後續的位置。

此篇論文的實作是先假設最大鍵值 t 並根據 t 來預先分配一定的記憶體空間,當遇到插入的鍵值 n 大於 t 時,就得重新建立通道 (詳見更新策略)。

和傳統的 skip list 相比,CSSL 僅需要更少的空間配置,假設鍵值的大小為 k 、指標大小為 r ,傳統 skip list 需要

n×(m×r+r+k) 的空間,而 CSSL 只須
n×(r+k)+i=1mpi×n×k
,其中 n 為資料量、m 為通道數。

Optimizations

Figure 3

進一步改進如下

  1. 將通道大小調整為 cache line 大小的倍數,如 Figure 3。
  2. 在最低 level 的通道與 data list 之間引入了一個附加的代理通道 (Proxy Lane Array),如 Figure 2,用來存放指向 data 的指標。
  3. 於最高 level 的通道上使用二元搜尋法,這是因為當通道數少的時候,最高 level 的通道所含的元素遠超過 1/p ,使得 cpu 在搜索最高 level 通道的成本往往非常高。

Updates
為了能夠支援即時更新,進行以下調整:

  1. Inserting keys:
    由於通道使用陣列來管理,因此插入新值的時候,需要使用大量的位移操作來使元素保持有序,所以該實作僅直接將新值插入到 data list (common linked list) 的適當位置,直到 skip list 需要重新建立時再為其分配通道陣列空間,可透過 atomic compare-and-swap instruction 來實作 latch-free 的插入演算法。
  2. Deleting keys:
    與插入相反,需先從通道陣列中刪除目標值,但如果將目標值改成 NULL 則需要重新排序通道陣列,因此改用與目標值相鄰的元素來取代目標值當作刪除,直到重建時才將重複的值刪除,而在 data list 中將指向目標節點的節點改為指向目標節點的後繼節點。
  3. Updating keys

以下是個精簡的 skip list 實作,參考執行輸出如下:

0 search 0
1 search 1
2 search 2
3 search 3
4 search 4
5 search 5
6 search 6
7 search 7
8 search 8
9 search 9
[sl_erase(list, i)]: return 0, index 10000, size 1

程式碼列表如下: (部分遮蔽)

/* total number of node is 2^32
 * the level here is log2(n), which is log2(2^32) = 32
 */
#define SL_MAXLEVEL 32

struct sl_link {
    struct sl_link *prev, *next;
};

struct sl_list {
    int size;
    int level;
    struct sl_link head[SL_MAXLEVEL];
};

struct sl_list *sl_list_alloc(void);
void sl_delete(struct sl_list *list);
void *sl_search(struct sl_list *list, int key);
int sl_insert(struct sl_list *list, int key, void *val);
int sl_erase(struct sl_list *list, int key);

#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

#define compiler_barrier() asm volatile("" : : : "memory")

struct sl_node {
    int key;
    void *val;
    struct sl_link link[0];
};

static inline void list_init(struct sl_link *node)
{
    node->next = node;
    compiler_barrier();
    node->prev = node;
}

static inline void __list_add(struct sl_link *new,
                              struct sl_link *prev,
                              struct sl_link *next)
{
    next->prev = new;
    new->next = next;
    new->prev = prev;
    compiler_barrier();
    prev->next = new;
}

static inline void list_add(struct sl_link *new, struct sl_link *prev)
{
    __list_add(new, prev, prev->next);
}

static inline void __list_del(struct sl_link *prev, struct sl_link *next)
{
    next->prev = prev;
    compiler_barrier();
    prev->next = next;
}

static inline void list_del(struct sl_link *node)
{
    __list_del(node->prev, node->next);
    list_init(node);
}

#define list_for_each_from(pos, head) for (; pos != head; pos = pos->next)

#define list_for_each_safe_from(pos, n, head) \
    for (n = pos->next; pos != head; pos = n, n = pos->next)

#define container_of(ptr, type, member)                         \
    __extension__({                                             \
        const __typeof__(((type *) 0)->member) *__mptr = (ptr); \
        (type *) ((char *) __mptr - offsetof(type, member));    \
    })

#define list_entry(ptr, i) container_of(ptr, struct sl_node, link[i])

/* the probability set at 1/2 */

static inline void set_random(void)
{
    time_t current_time;
    srandom(time(&current_time));
}

static inline int random_level(void)
{
    int level = 0;
    uint32_t random_seed = (uint32_t) random();

    while (random_seed && (random_seed & HHHH)) {
        random_seed >>= 1;
        level++;
    }

    return level >= SL_MAXLEVEL ? SL_MAXLEVEL - 1 : level;
}

static struct sl_node *sl_node_alloc(int key, void *val, int level)
{
    struct sl_node *node =
        malloc(sizeof(struct sl_node) + (level + 1) * sizeof(struct sl_link));
    if (!node)
        return NULL;

    node->key = key, node->val = val;

    return node;
}

struct sl_list *sl_list_alloc(void)
{
    struct sl_list *list = malloc(sizeof(struct sl_list));
    if (!list)
        return NULL;

    list->level = 0;
    list->size = 0;
    set_random();
    for (int i = 0; i < SL_MAXLEVEL; i++)
        list_init(&list->head[i]);

    return list;
}

void sl_delete(struct sl_list *list)
{
    struct sl_link *n, *pos = list->head[0].next;

    list_for_each_safe_from (pos, n, &list->head[0]) {
        struct sl_node *node = list_entry(pos, 0);
        free(node);
    }
    free(list);
}

void *sl_search(struct sl_list *list, int key)
{
    int i = list->level;
    struct sl_link *pos = &list->head[i];
    struct sl_link *head = &list->head[i];

    for (; i >= 0; i--) {
        pos = pos->next;
        list_for_each_from (pos, head) {
            struct sl_node *node = list_entry(pos, i);
            if (node->key > key)
                break;
            else if (node->key == key)
                return node->val;
        }
        pos = pos->prev;
        pos--;
        head--;
    }

    return NULL;
}

int sl_insert(struct sl_list *list, int key, void *val)
{
    int i, level = random_level();
    struct sl_link *pos = &list->head[level];
    struct sl_link *head = &list->head[level];
    struct sl_node *new = sl_node_alloc(key, val, level);

    if (!new)
        return -ENOMEM;
    if (level > list->level)
        list->level = level;

    for (i = level; i >= 0; i--) {
        pos = pos->next;
        list_for_each_from (pos, head) {
            struct sl_node *tmp = list_entry(pos, i);
            if (tmp->key > key) {
                break;
            } else if (tmp->key == key)
                goto failed;
        }
        pos = pos->prev;
        IIII;
        pos--;
        head--;
    }

    list->size++;

    return 0;
failed:
    free(new);
    return -EEXIST;
}

int sl_erase(struct sl_list *list, int key)
{
    int i = list->level;
    struct sl_link *pos = &list->head[i];
    struct sl_link *n, *head = &list->head[i];

    for (; i >= 0; i--) {
        pos = pos->next;
        JJJJ {
            struct sl_node *tmp = list_entry(pos, i);
            if (tmp->key == key) {
                for (; i >= 0; i--) {
                    list_del(pos--);
                    if (list->head[i].next == &list->head[i])
                        list->level--;
                }
                free(tmp);
                list->size--;
                return 0;
            } else if (tmp->key > key)
                break;
        }
        pos = pos->prev;
        pos--;
        head--;
    }

    return -EINVAL;
}

#include <assert.h>
#include <stdio.h>

#define test(ops, index, size) \
    printf("[" #ops            \
           "]: "               \
           "return %d, "       \
           "index %d, "        \
           "size %d\n",        \
           (int) ops, index, size)

#define times 10000

int main(int argc, char *argv[])
{
    struct sl_list *list = sl_list_alloc();
    int i, arr[10] = {0};
    for (i = 0; i < times; i++)
        assert(sl_insert(list, i, &arr[i]) == 0);
    test(sl_insert(list, i, NULL), i, list->size);

    for (i = 0; i < 10; i++) {
        arr[i] = i;
        printf("%d search %d\n", i, *(int *) sl_search(list, i));
    }

    for (i = 0; i < times; i++)
        assert(sl_erase(list, i) == 0);
    test(sl_erase(list, i), i, list->size);

    sl_delete(list);
    return 0;
}

請補完程式碼,使其運作符合預期。作答規範:

  • 以最精簡的形式撰寫
  • HHHH 為正整數
  • IIIIJJJJ 為合法的 C 敘述,均以 list_ 開頭
  • 符合 lab0-c 指定的程式碼排版風格

延伸問題:

  1. 解釋上述程式碼運作原理,指出設計缺失並改進
  2. 研讀〈A kernel skiplist implementation〉和〈Skiplists II: API and benchmarks〉,探討 Linux 核心引入的 skip list 的考量,找出現行 Linux 核心的應用案例
  3. 分析 Concurrent-Skip-list 並以 C11 Atomics 重新實作,搭配閱讀〈Concurrent Skiplists〉,實作更高效的程式碼。

測驗 3

並行程式設計: Hazard pointer〉提到:

在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer。

使用 mutex 是最簡單且直觀的方法:存取共用記憶體時,acquire lock 即可保證沒有其他執行緒正在存取同一物件,也就可安全地釋放記憶體。但若我們正在存取的是一種 lock-free 資料結構,當然就不能恣意地使用 lock,因為會違反 lock-free 特性,即無論任何執行緒失敗,其他執行緒都能可繼續執行。於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。

lfq 嘗試實作精簡的並行佇列 (concurrent queue),運用 hazard pointer 來釋放並行處理過程中的記憶體,程式碼可見: lfq (部分遮蔽),其中 atomics.h 如下:

#pragma once

/* TODO: rewrite in C11 Atomics */
#if !defined(__x86_64__) || defined(__i386)
#error "This implementation is specific to x86(-64)"
#endif

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#ifndef asm
#define asm __asm
#endif

#define cmpxchg(ptr, _old, _new)                                \
    {                                                           \
        volatile uint32_t *__ptr = (volatile uint32_t *) (ptr); \
        uint32_t __ret;                                         \
        asm volatile("lock; cmpxchgl %2,%1"                     \
                     : "=a"(__ret), "+m"(*__ptr)                \
                     : "r"(_new), "0"(_old)                     \
                     : "memory");                               \
  );                                                            \
        __ret;                                                  \
    }

#define ATOMIC_SET __sync_lock_test_and_set
#define ATOMIC_RELEASE __sync_lock_release

#define ATOMIC_SUB __sync_sub_and_fetch
#define ATOMIC_SUB64 ATOMIC_SUB
#define CAS __sync_bool_compare_and_swap

/* The 2nd argument is limited to 1 on machines with TAS but not XCHG.
 * On x86 it is an arbitrary value.
 */
#define XCHG __sync_lock_test_and_set

#define ATOMIC_ADD __sync_add_and_fetch
#define ATOMIC_ADD64 ATOMIC_ADD
#define mb __sync_synchronize

/* compiler barrier only: runtime reordering is impossible on x86 */
#define lmb() asm volatile("" ::: "memory")
#define smb() asm volatile("" ::: "memory")

顯然上述實作針對 x86(-64),以 __sync 開頭的內建函式為 GCC Legacy __sync Built-in Functions for Atomic Memory Access

編譯方式:

gcc -std=gnu99 -O3 -Wall -Wextra -o test_p1c1  -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 lfq.c test.c -lpthread

這個實作應該要能夠支援以下組態:

  • -D MAX_PRODUCER=4 -D MAX_CONSUMER=4
  • -D MAX_PRODUCER=100 -D MAX_CONSUMER=10
  • -D MAX_PRODUCER=10 -D MAX_CONSUMER=100

預期輸出:

Producer thread [140583962404608] exited! Still 0 running...
Consumer thread [140583970797312] exited 0
Total push 500000 elements, pop 500000 elements. freelist=1, clean = 0
Test PASS!!

補完程式碼,使其運作符合預期。作答規範:

  • AAAA, BBBB, CCCC 均包含 CAS 巨集,且為合法表示式

延伸閱讀:

延伸問題:

  1. 解釋上述程式碼運作原理
  2. 用 C11 Atomics 改寫,使得能夠支援 x86(-64) 以外的處理器架構
  3. 遞交 pull request,以上述程式碼的改進版本來取代 lf-queue