Try   HackMD

Linux 核心專題: 並行程式設計

執行人: csm1735
專題解說錄影

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
提問清單

  • ?

任務簡述

回顧〈並行和多執行緒程式設計〉教材,以課程測驗題給定的並行程式碼為基礎,著手改進,並確保以 C11 Atomics 撰寫出正確且有效的程式碼,所有程式碼應當通過 Thread Sanitizer 的執行時期檢測。

TODO: Reference counting

重做第 8 週測驗題的測驗一,包含延伸問題。

  • 完整程式碼可見 GitHub

解釋上述程式碼運作原理,指出其實作缺失並改進

#define maybe_unused __attribute__((unused))

先定義了 maybe_unused ,其作用為針對可能不會使用到的部份,消除編譯時的警告。

typedef struct {
#ifdef REFCNT_CHECK
    int magic;
#endif
    atomic_uint refcount;
    char data[];
} refcnt_t;

宣告了一個名為 refcnt_t 的 struct,可看到其中除了 data 外,還有一個 atomic_uint 的變數 refcount ,用來以 atomic 的型態儲存 reference counting ,而如果程式中有定義 REFCNT_CHECK , struct 中還會再多一個 magic 的變數,用途為檢查傳入 refcnt_refrefcnt_unref 的 reference 是否由 refcnt_mallocrefcnt_strdup 所建立。

static maybe_unused void *refcnt_malloc(size_t len)
{
    refcnt_t *ref = malloc(sizeof(refcnt_t) + len);
    if (!ref)
        return NULL;
#ifdef REFCNT_CHECK
    ref->magic = REFCNT_MAGIC;
#endif
    atomic_init(&ref->refcount, 1);
    return ref->data;
}

先透過 malloc 配置大小為 sizeof(refcnt_t) + len 的記憶體空間,如果配置失敗則直接回傳 NULL ,如果有定義 REFCNT_CHECK 的話,則將 ref->magic 設為事先定義好的魔術數 REFCNT_MAGIC ,接著再透過 atomic_init 將代表 reference counting 的 ref->refcount 初始化為 1,最後會回傳 ref->data

static maybe_unused void *refcnt_realloc(void *ptr, size_t len)
{
    refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC);
#endif
    ref = realloc(ref, sizeof(refcnt_t) + len);
    if (!ref)
        return NULL;
    return ref->data;
}

使用 offsetof 計算 refcnt_t 的成員 data 距離結構起始位址的偏移量,以透過 (ptr - offsetof(refcnt_t, data)) 取得結構起始位址,如果有定義 REFCNT_CHECK 的話,則會去檢查 ref->magic ,接著透過 realloc 重新調整 ref 所指向的記憶體空間的大小為 sizeof(refcnt_t) + len ,如果配置失敗則回傳 NULL ,最後會回傳 ref->data

static maybe_unused void *refcnt_ref(void *ptr)
{
    refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
    atomic_fetch_add(&ref->refcount, 1);
    return ref->data;
}

使用 offsetof 計算 refcnt_t 的成員 data 距離結構起始位址的偏移量,以透過 (ptr - offsetof(refcnt_t, data)) 取得結構起始位址,如果有定義 REFCNT_CHECK 的話,則會去檢查 ref->magic ,接著再透過 atomic_fetch_add 去將 ref->refcount 做加一的動作,最後會回傳 ref->data

static maybe_unused void refcnt_unref(void *ptr)
{
    refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data));
#ifdef REFCNT_CHECK
    assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer");
#endif
    if (atomic_fetch_sub(&ref->refcount, 1) == 1)
        free(ref);
}

使用 offsetof 計算 refcnt_t 的成員 data 距離結構起始位址的偏移量,以透過 (ptr - offsetof(refcnt_t, data)) 取得結構起始位址,如果有定義 REFCNT_CHECK 的話,則會去檢查 ref->magic ,接著再透過 atomic_fetch_sub 去將 ref->refcount 做減一的動作,如果 ref->refcount 的值在減一之前為 1 ,即做完減一的動作後會變成 0 的話,則將 ref 做 free 的動作。

static maybe_unused char *refcnt_strdup(char *str)
{
    refcnt_t *ref = malloc(sizeof(refcnt_t) + strlen(str) + 1);
    if (!ref)
        return NULL;
#ifdef REFCNT_CHECK
    ref->magic = REFCNT_MAGIC;
#endif
    atomic_init(&ref->refcount, 1);
    strcpy(ref->data, str);
    return ref->data;
}

先透過 malloc 配置大小為 sizeof(refcnt_t) + strlen(str) + 1 的記憶體空間,如果配置失敗則直接回傳 NULL ,如果有定義 REFCNT_CHECK 的話,則將 ref->magic 設為事先定義好的魔術數 REFCNT_MAGIC ,接著再透過 atomic_init 將代表 reference counting 的 ref->refcount 初始化為 1,再使用 strcpy 將字串 str 複製到 ref->data,最後會回傳 ref->data

#define N_ITERATIONS 100

static void *test_thread(void *arg)
{
    char *str = arg;
    for (int i = 0; i < N_ITERATIONS; i++) {
        char *str2 = refcnt_ref(str);
        fprintf(stderr, "Thread %u, %i: %s\n", (unsigned int) pthread_self(), i,
                str2);
        refcnt_unref(str2);
    }
    refcnt_unref(str);
    return NULL;
}

#define N_THREADS 64

int main(int argc, char **argv)
{
    /* Create threads */
    pthread_t threads[N_THREADS];

    /* Create a new string that is count referenced */
    char *str = refcnt_strdup("Hello, world!");

    /* Start the threads, passing a new counted copy of the referece */
    for (int i = 0; i < N_THREADS; i++)
        pthread_create(&threads[i], NULL, test_thread, refcnt_ref(str));

    /* We no longer own the reference */
    refcnt_unref(str);

    /* Whichever thread finishes last will free the string */
    for (int i = 0; i < N_THREADS; i++)
        pthread_join(threads[i], NULL);

    void *ptr = malloc(100);
    /* This should cause a heap overflow while checking the magic num which the
     * sanitizer checks.
     * Leaving commented out for now
     */
    // refcnt_ref(ptr);

    free(ptr);
    return 0;
}

main 中使用 pthread_create 來建立 N_THREADS 個執行緒,而新建立的執行緒會去調用 test_thread 來開始執行,refcnt_ref(str) 則是作為 test_thread 的參數傳遞。
test_thread 中可發現每次做 fprintf 前都會做 refcnt_ref 的動作,在fprintf 完後再做 refcnt_unref 的動作,而在 fprintf 中可發現有使用到 pthread_self() ,其功能為取得執行緒自身的 ID 。
此外, main 中也使用了 pthread_join ,其作用為去等待指定的執行緒執行完畢,如果沒有去使用 pthread_join 可能會造成我們所建立的執行緒沒有執行完的問題。

研讀〈Linux 核心模組運作原理〉並在 Linux 核心原始程式碼中找出相關 reference counting 的程式碼,予以解讀和分析

在取得 Linux 核心原始程式碼後,可用以下命令去搜尋包含關鍵字 reference count 的 commit。

git log --grep="reference count"

可發現在 commit ddaf098 提及 reference count ,而相關的程式碼可以在 drivers/base/class.cinclude/linux/device/class.h 裡面查看。
根據註解及程式碼推測,subsys_get 的功能與 reference count 的 INCREMENT 有關,而 subsys_put 的功能則與 reference count 的 DECREMENT 有關。
lxr 搜尋 subsys_get

static inline struct subsys_private *subsys_get(struct subsys_private *sp)
{
	if (sp)
		kset_get(&sp->subsys);
	return sp;
}

subsys_get 中有呼叫 kset_get

static inline struct kset *kset_get(struct kset *k)
{
	return k ? to_kset(kobject_get(&k->kobj)) : NULL;
}

kset_get 中有呼叫 kobject_get

struct kobject *kobject_get(struct kobject *kobj)
{
	if (kobj) {
		if (!kobj->state_initialized)
			WARN(1, KERN_WARNING
				"kobject: '%s' (%p): is not initialized, yet kobject_get() is being called.\n",
			     kobject_name(kobj), kobj);
		kref_get(&kobj->kref);
	}
	return kobj;
}

kobject_get 中有呼叫 kref_get

/**
 * kref_get - increment refcount for object.
 * @kref: object.
 */
static inline void kref_get(struct kref *kref)
{
	refcount_inc(&kref->refcount);
}

kref_get 中會呼叫 refcount_inc 去做 reference count 的 INCREMENT。
由此可見,subsys_get 確實與增加 reference count 有所關聯。

接著搜尋 subsys_put

static inline void subsys_put(struct subsys_private *sp)
{
	if (sp)
		kset_put(&sp->subsys);
}

subsys_put 中呼叫了 kset_put

static inline void kset_put(struct kset *k)
{
	kobject_put(&k->kobj);
}

kset_put 中呼叫了 kobject_put

void kobject_put(struct kobject *kobj)
{
	if (kobj) {
		if (!kobj->state_initialized)
			WARN(1, KERN_WARNING
				"kobject: '%s' (%p): is not initialized, yet kobject_put() is being called.\n",
			     kobject_name(kobj), kobj);
		kref_put(&kobj->kref, kobject_release);
	}
}

kobject_put 中呼叫了 kref_put

/**
 * kref_put - decrement refcount for object.
 * @kref: object.
 * @release: pointer to the function that will clean up the object when the
 *	     last reference to the object is released.
 *	     This pointer is required, and it is not acceptable to pass kfree
 *	     in as this function.
 *
 * Decrement the refcount, and if 0, call release().
 * Return 1 if the object was removed, otherwise return 0.  Beware, if this
 * function returns 0, you still can not count on the kref from remaining in
 * memory.  Only use the return value if you want to see if the kref is now
 * gone, not present.
 */
static inline int kref_put(struct kref *kref, void (*release)(struct kref *kref))
{
	if (refcount_dec_and_test(&kref->refcount)) {
		release(kref);
		return 1;
	}
	return 0;
}

kref_put 中會去做 reference count 的 DECREMENT,而當最後的 reference 被釋放之後,該物件也會被釋放。
由此可見,subsys_put 也確實與減少 reference count 有所關聯。

drivers/base/class.cclass_to_subsys 的功能是將 struct class 轉換為 struct subsys_private,做這樣的轉換是因為 driver core 內部是需要處理 struct subsys_private,不是外部的 struct class,而在找到匹配的 class 後就會 return 與該 class 相關的內部結構 subsys_private,如果沒有找到匹配的 class 則會 return NULL,而在不為 NULL 的時候,就會呼叫 subsys_get 去做 reference count 的 INCREMENT,因此在使用完畢後必須呼叫 subsys_put() 才能正確地去做釋放的動作。

commit ddaf098 所修正的問題就是當呼叫 class_dev_iter_init 函式去初始化 class_dev_iter 的時候使用了 class_to_subsys 去取得 subsys_private 結構並使其 reference count 增加,但當 class_dev_iter 使用完畢後,卻沒有將 subsys_private 的 reference count 做減少的動作,由於這個缺失,漸漸地就會造成 memory leak 的問題。

解釋為何 Linux 核心的同步機制不依賴 reference counting (提示: 參閱 RCU 的設計理念)

參閱 RCU 同步機制中 RCU 與 reference counting 的對比如下

Reference Counting RCU
Unreclaimed objects Bounded Unbounded
Contention among readers High None
Traversal forward progress lock-free wait-free
Reclamation forward progress lock-free blocking
Traversal speed atomic no-overhead
Reference acquisition unconditional unconditional
Automatic reclamation yes no
Purpose of domains N/A isolate slow reader

首先,reference counting 的主要問題是 atomic。多個執行緒可能同時增加或減少 reference counting ,這就需要使用 atomic operation 來確保計數的一致性。然而,atomic operation 是相對高成本的操作,特別是當多個執行緒同時競爭一個資源時。在高度並行的情況下,可能會導致性能下降。

再來,reference counting 可能會在多個 readers 的情況下導致高度競爭,如果多個執行緒同時增加或減少引用計數,可能會產生競爭,導致計數結果不正確,而為了解決這個問題,需要使用其他同步機制,但也會使得整體的複雜性增加。

另外,當 reference counting 存在 reference cycles (an object which refers directly or indirectly to itself) 的情況時,會導致計數保持非零,無法正確釋放資源。這種情況需要特殊的機制來檢測和處理,以確保資源能夠正確地釋放,但也會增加 reference counting 的成本和複雜性。

基於以上這些性能、競爭、和資源釋放等方面的考慮,這也是為什麼 Linux 核心的同步機制不依賴 reference counting。

TODO: Hazard Pointer

重做第 8 週測驗題的測驗三,包含延伸問題。

  • 完整程式碼可見 GitHub

解釋上述程式碼運作原理

struct lfq_node {
    void *data;
    union {
        struct lfq_node *next;
        struct lfq_node *free_next;
    };
    bool can_free;
};

struct lfq_ctx {
    alignas(64) struct lfq_node *head;
    int count;
    struct lfq_node **HP; /* hazard pointers */
    int *tid_map;
    bool is_freeing;
    struct lfq_node *fph, *fpt; /* free pool head/tail */

    /* FIXME: get rid of struct. Make it configurable */
    int MAX_HP_SIZE;

    /* avoid cacheline contention */
    alignas(64) struct lfq_node *tail;
};

lfq.h 中,先定義二個結構體。
struct lfq_node 是 lock-free queue node 的結構,其中除了資料以及代表能否 free 的一個 bool 以外,還嵌入了 union 的型別去儲存 nextfree_nextnext 是 queue 中的下一個 node,free_next 則是 free pool 中的下一個 node。
struct lfq_ctx 是 lock-free queue handler 的結構,儲存了 queue 及 free pool 的頭尾,以及 tid 的 map,還有一個 bool 用來確認是否正在做 free 的動作,此外,也儲存了 hazard pointers 和他的最大長度。

int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
    struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
    if (!insert_node)
        return -errno;

    insert_node->data = data;
    struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node);
    /* We have claimed our spot in the insertion order by modifying tail.
     * we are the only inserting thread with a pointer to the old tail.
     *
     * Now we can make it part of the list by overwriting the NULL pointer in
     * the old tail. This is safe whether or not other threads have updated
     * ->next in our insert_node.
     */
#ifdef DEBUG
    assert(!(old_tail->next) && "old tail was not NULL");
#endif
    old_tail->next = insert_node;
    /* TODO: could a consumer thread could have freed the old tail?  no because
     * that would leave head=NULL
     */

    return 0;
}

lfq_enqueue 的功能為 push 資料進入 queue 中,作法是將 queue 的尾端替換成 insert_node ,再將原先舊尾端 old_tailnext 指向 insert_node

void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
    struct lfq_node *old_head, *new_head;

    /* HP[tid] is necessary for deallocation. */
    do {
    retry:
        /* continue jumps to the bottom of the loop, and would attempt a CAS
         * with uninitialized new_head.
         */
        old_head = ctx->head;

        /* seq-cst store.
         * FIXME: use xchg instead of mov + mfence on x86.
         */
        ctx->HP[tid] = old_head;
        mb();

        /* another thread freed it before seeing our HP[tid] store */
        if (old_head != ctx->head)
            goto retry;
        new_head = old_head->next;
        if (new_head == 0) {
            ctx->HP[tid] = 0;
            return NULL; /* never remove the last node */
        }
#ifdef DEBUG
        // FIXME: check for already-freed nodes
        // assert(new_head != (void *) -1 && "read an already-freed node");
#endif
    } while (!CAS(&ctx->head, old_head, new_head));

    /* We have atomically advanced head, and we are the thread that won the race
     * to claim a node. We return the data from the *new* head. The list starts
     * off with a dummy node, so the current head is always a node that is
     * already been read.
     */
    ctx->HP[tid] = 0;
    void *ret = new_head->data;
    new_head->can_free = true;

    /* we need to avoid freeing until other readers are definitely not going to
     * load its ->next in the CAS loop
     */
    safe_free(ctx, (struct lfq_node *) old_head);

    return ret;
}

void *lfq_dequeue(struct lfq_ctx *ctx)
{
    int tid = alloc_tid(ctx);
    /* To many thread race */
    if (tid == -1)
        return (void *) -1;

    void *ret = lfq_dequeue_tid(ctx, tid);
    free_tid(ctx, tid);
    return ret;
}

lfq_dequeue 的功能為將資料從 queue 中 pop 出來,首先,會先透過 alloc_tid 取得 tid ,如果順利取得非 -1 的值,就會透過 lfq_dequeue_tid 去做 pop,在 lfq_dequeue_tid 中,會先將要 pop 的 old_head 存入 ctx->HP[tid] 之中,然後去將 queue 的 head 替換成 new_head 也就是 old_head->next,完成後再將 ctx->HP[tid] 重設回 0,再對 old_headsafe_free

static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
    if (node->can_free && !in_hp(ctx, node)) {
        /* free is not thread-safe */
        if (CAS(&ctx->is_freeing, 0, 1)) {
            /* poison the pointer to detect use-after-free */
            node->next = (void *) -1;
            free(node); /* we got the lock; actually free */
            ctx->is_freeing = false;
            smb();
        } else /* we did not get the lock; only add to a freelist */
            insert_pool(ctx, node);
    } else
        insert_pool(ctx, node);
    free_pool(ctx, false);
}

如果這個 node 能被 free 且不在 hazard pointers 內,就嘗試去做 free 的動作,否則就透過 insert_pool 只將 node 加入 free pool,而如果嘗試去做 CAS(&ctx->is_freeing, 0, 1) 成功的話,就能確實地去將 node 給 free 掉,但如果失敗了就一樣只將 node 加入 free pool,最後再呼叫 free_pool 去嘗試 free 掉 free pool 內的 node。

用 C11 Atomics 改寫,使得能夠支援 x86(-64) 以外的處理器架構

題目所給的 atomics.h 中所定義的內容主要針對 x86(-64),故這邊用 C11 Atomics 將以 __sync 開頭的內建函式做改寫,為此需要先 #include <stdatomic.h>

// #define ATOMIC_SUB __sync_sub_and_fetch
#define ATOMIC_SUB atomic_fetch_sub

將原先的 __sync_sub_and_fetch 改寫成 atomic_fetch_sub,然而原先的 __sync_sub_and_fetch 所回傳的是做完減法後的新值,而 atomic_fetch_sub 所回傳的則是減法之前的舊值,但查看程式中對 ATOMIC_SUB 的使用,沒有去對回傳值做存取的部份,故沒有進一步修改

// #define ATOMIC_ADD __sync_add_and_fetch
#define ATOMIC_ADD atomic_fetch_add

將原先的 __sync_add_and_fetch 改寫成 atomic_fetch_add,然而原先的 __sync_add_and_fetch 所回傳的是做完加法後的新值,而 atomic_fetch_add 所回傳的則是加法之前的舊值,但查看程式中對 ATOMIC_ADD 的使用,只有在 test.c 中的

int tid = ATOMIC_ADD(&cnt_thread, 1);

去對回傳值做存取的動作,雖可針對該行做修改,但原先的版本會取得加法完的新值,也就是使 tid 從 1 開始,而新的版本能使 tid 取得舊值,也就是從 0 開始,故此處也不做修改

// #define CAS __sync_bool_compare_and_swap
#define CAS atomic_compare_exchange_strong

將原先的 __sync_bool_compare_and_swap 改寫成 atomic_compare_exchange_strong,以達到先比較再去修改值的效果,如果成功會 return true ,反之則 return false

// #define XCHG __sync_lock_test_and_set
#define XCHG atomic_exchange

將原先的 __sync_lock_test_and_set 改寫成 atomic_exchange,以達到修改值的效果,而最後都會回傳修改前的值

// #define ATOMIC_SET __sync_lock_test_and_set
// #define ATOMIC_RELEASE __sync_lock_release
#define ATOMIC_SET atomic_flag_test_and_set
#define ATOMIC_RELEASE atomic_flag_clear

ATOMIC_SETATOMIC_RELEASE 在原先程式中都沒有使用到,而其函式功能為 lock 的相關操作,故此處藉由 atomic_flag_test_and_setatomic_flag_clear 去對 atomic_flag 的物件做操作,以達到預期的效果

// #define mb __sync_synchronize
#define mb() atomic_thread_fence(memory_order_seq_cst)

將原先的 __sync_synchronize 改寫成 atomic_thread_fence,memory order 設為 memory_order_seq_cst

原先的程式碼在經由 Thread Sanitizer 檢測的時候,會發生多起 data race 的問題,以下是其中一筆

==================
WARNING: ThreadSanitizer: data race (pid=17162)
  Atomic write of size 4 at 0x56523ee7b018 by main thread:
    #0 __tsan_atomic32_fetch_add ../../../../src/libsanitizer/tsan/tsan_interface_atomic.cpp:615 (libtsan.so.0+0x81fe9)
    #1 main /test.c:97 (main1+0x14a6)

  Previous read of size 4 at 0x56523ee7b018 by thread T1:
    #0 remove_queue /test.c:72 (main1+0x22c3)

  Location is global 'cnt_producer' of size 4 at 0x56523ee7b018 (main1+0x000000005018)

  Thread T1 (tid=17164, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main test.c:93 (main1+0x1494)

SUMMARY: ThreadSanitizer: data race /test.c:97 in main
==================

因此,根據 Thread Sanitizer 所提供的訊息,針對 data race 的部份,使用 atomic operation 改寫程式碼,以通過 Thread Sanitizer 的檢測。
以上面的訊息為例,去對 test.c 第 72 行的 cnt_producer 的讀取做改寫

//if (ctx->count || cnt_producer)
if (ctx->count || atomic_load(&cnt_producer))

相關程式碼修改可見 commit 695aff6

遞交 pull request,以上述程式碼的改進版本來取代 lf-queue

已提交 pull request #18 取代原先的 lf-queue,所提交的版本以 Ruslan Nikolaev 的論文 A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue 所提及的 SCQ (Scalable Circular Queue) 來實作,並採用基於 hazard pointer 的記憶體物件回收機制,程式碼使用 C11 Atomics 撰寫,並通過 Thread Sanitizer 的驗證。

TODO: MPMC

重做第 9 週測驗題的測驗一,包含延伸問題。