# Linux 核心專題: 並行程式設計 > 執行人: csm1735 > [專題解說錄影](https://youtu.be/fo0Ehox78RY) :::success :question: 提問清單 * ? ::: ## 任務簡述 回顧〈[並行和多執行緒程式設計](https://hackmd.io/@sysprog/concurrency)〉教材,以課程測驗題給定的並行程式碼為基礎,著手改進,並確保以 C11 Atomics 撰寫出正確且有效的程式碼,所有程式碼應當通過 [Thread Sanitizer](https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual) 的執行時期檢測。 ## TODO: Reference counting 重做[第 8 週測驗題](https://hackmd.io/@sysprog/linux2023-quiz8)的測驗一,包含延伸問題。 * 完整程式碼可見 [GitHub](https://github.com/csm1735/Concurrent-Programming/tree/main/quiz8_test1) ### 解釋上述程式碼運作原理,指出其實作缺失並改進 ```c #define maybe_unused __attribute__((unused)) ``` 先定義了 `maybe_unused` ,其作用為針對可能不會使用到的部份,消除編譯時的警告。 ```c typedef struct { #ifdef REFCNT_CHECK int magic; #endif atomic_uint refcount; char data[]; } refcnt_t; ``` 宣告了一個名為 `refcnt_t` 的 struct,可看到其中除了 `data` 外,還有一個 atomic_uint 的變數 `refcount` ,用來以 atomic 的型態儲存 reference counting ,而如果程式中有定義 `REFCNT_CHECK` , struct 中還會再多一個 `magic` 的變數,用途為檢查傳入 `refcnt_ref` 和 `refcnt_unref` 的 reference 是否由 `refcnt_malloc` 或 `refcnt_strdup` 所建立。 ```c static maybe_unused void *refcnt_malloc(size_t len) { refcnt_t *ref = malloc(sizeof(refcnt_t) + len); if (!ref) return NULL; #ifdef REFCNT_CHECK ref->magic = REFCNT_MAGIC; #endif atomic_init(&ref->refcount, 1); return ref->data; } ``` 先透過 malloc 配置大小為 `sizeof(refcnt_t) + len` 的記憶體空間,如果配置失敗則直接回傳 NULL ,如果有定義 `REFCNT_CHECK` 的話,則將 `ref->magic` 設為事先定義好的魔術數 `REFCNT_MAGIC` ,接著再透過 atomic_init 將代表 reference counting 的 `ref->refcount` 初始化為 1,最後會回傳 `ref->data`。 ```c static maybe_unused void *refcnt_realloc(void *ptr, size_t len) { refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC); #endif ref = realloc(ref, sizeof(refcnt_t) + len); if (!ref) return NULL; return ref->data; } ``` 使用 offsetof 計算 `refcnt_t` 的成員 `data` 距離結構起始位址的偏移量,以透過 `(ptr - offsetof(refcnt_t, data))` 取得結構起始位址,如果有定義 `REFCNT_CHECK` 的話,則會去檢查 `ref->magic` ,接著透過 realloc 重新調整 `ref` 所指向的記憶體空間的大小為 `sizeof(refcnt_t) + len` ,如果配置失敗則回傳 NULL ,最後會回傳 `ref->data`。 ```c static maybe_unused void *refcnt_ref(void *ptr) { refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer"); #endif atomic_fetch_add(&ref->refcount, 1); return ref->data; } ``` 使用 offsetof 計算 `refcnt_t` 的成員 `data` 距離結構起始位址的偏移量,以透過 `(ptr - offsetof(refcnt_t, data))` 取得結構起始位址,如果有定義 `REFCNT_CHECK` 的話,則會去檢查 `ref->magic` ,接著再透過 atomic_fetch_add 去將 `ref->refcount` 做加一的動作,最後會回傳 `ref->data`。 ```c static maybe_unused void refcnt_unref(void *ptr) { refcnt_t *ref = (void *) (ptr - offsetof(refcnt_t, data)); #ifdef REFCNT_CHECK assert(ref->magic == REFCNT_MAGIC && "Invalid refcnt pointer"); #endif if (atomic_fetch_sub(&ref->refcount, 1) == 1) free(ref); } ``` 使用 offsetof 計算 `refcnt_t` 的成員 `data` 距離結構起始位址的偏移量,以透過 `(ptr - offsetof(refcnt_t, data))` 取得結構起始位址,如果有定義 `REFCNT_CHECK` 的話,則會去檢查 `ref->magic` ,接著再透過 atomic_fetch_sub 去將 `ref->refcount` 做減一的動作,如果 `ref->refcount` 的值在減一之前為 1 ,即做完減一的動作後會變成 0 的話,則將 `ref` 做 free 的動作。 ```c static maybe_unused char *refcnt_strdup(char *str) { refcnt_t *ref = malloc(sizeof(refcnt_t) + strlen(str) + 1); if (!ref) return NULL; #ifdef REFCNT_CHECK ref->magic = REFCNT_MAGIC; #endif atomic_init(&ref->refcount, 1); strcpy(ref->data, str); return ref->data; } ``` 先透過 malloc 配置大小為 `sizeof(refcnt_t) + strlen(str) + 1` 的記憶體空間,如果配置失敗則直接回傳 NULL ,如果有定義 `REFCNT_CHECK` 的話,則將 `ref->magic` 設為事先定義好的魔術數 `REFCNT_MAGIC` ,接著再透過 atomic_init 將代表 reference counting 的 `ref->refcount` 初始化為 1,再使用 strcpy 將字串 `str` 複製到 `ref->data`,最後會回傳 `ref->data`。 ```c #define N_ITERATIONS 100 static void *test_thread(void *arg) { char *str = arg; for (int i = 0; i < N_ITERATIONS; i++) { char *str2 = refcnt_ref(str); fprintf(stderr, "Thread %u, %i: %s\n", (unsigned int) pthread_self(), i, str2); refcnt_unref(str2); } refcnt_unref(str); return NULL; } #define N_THREADS 64 int main(int argc, char **argv) { /* Create threads */ pthread_t threads[N_THREADS]; /* Create a new string that is count referenced */ char *str = refcnt_strdup("Hello, world!"); /* Start the threads, passing a new counted copy of the referece */ for (int i = 0; i < N_THREADS; i++) pthread_create(&threads[i], NULL, test_thread, refcnt_ref(str)); /* We no longer own the reference */ refcnt_unref(str); /* Whichever thread finishes last will free the string */ for (int i = 0; i < N_THREADS; i++) pthread_join(threads[i], NULL); void *ptr = malloc(100); /* This should cause a heap overflow while checking the magic num which the * sanitizer checks. * Leaving commented out for now */ // refcnt_ref(ptr); free(ptr); return 0; } ``` `main` 中使用 `pthread_create` 來建立 `N_THREADS` 個執行緒,而新建立的執行緒會去調用 `test_thread` 來開始執行,`refcnt_ref(str)` 則是作為 `test_thread` 的參數傳遞。 在 `test_thread` 中可發現每次做 fprintf 前都會做 `refcnt_ref` 的動作,在fprintf 完後再做 `refcnt_unref` 的動作,而在 fprintf 中可發現有使用到 `pthread_self()` ,其功能為取得執行緒自身的 ID 。 此外, `main` 中也使用了 `pthread_join` ,其作用為去等待指定的執行緒執行完畢,如果沒有去使用 `pthread_join` 可能會造成我們所建立的執行緒沒有執行完的問題。 ### 研讀〈[Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)〉並在 Linux 核心原始程式碼中找出相關 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) 的程式碼,予以解讀和分析 在取得 Linux 核心原始程式碼後,可用以下命令去搜尋包含關鍵字 reference count 的 commit。 ```bash git log --grep="reference count" ``` 可發現在 [commit ddaf098](https://github.com/torvalds/linux/commit/ddaf098ea779b3c1302c7843f6ff01e89b1fd380) 提及 reference count ,而相關的程式碼可以在 [drivers/base/class.c](https://github.com/torvalds/linux/blob/master/drivers/base/class.c) 及 [include/linux/device/class.h](https://github.com/torvalds/linux/blob/master/include/linux/device/class.h) 裡面查看。 根據註解及程式碼推測,`subsys_get` 的功能與 reference count 的 INCREMENT 有關,而 `subsys_put` 的功能則與 reference count 的 DECREMENT 有關。 在 [lxr](https://elixir.bootlin.com/linux/latest/source) 搜尋 `subsys_get` ```c static inline struct subsys_private *subsys_get(struct subsys_private *sp) { if (sp) kset_get(&sp->subsys); return sp; } ``` 在 `subsys_get` 中有呼叫 `kset_get` ```c static inline struct kset *kset_get(struct kset *k) { return k ? to_kset(kobject_get(&k->kobj)) : NULL; } ``` 在 `kset_get` 中有呼叫 `kobject_get` ```c struct kobject *kobject_get(struct kobject *kobj) { if (kobj) { if (!kobj->state_initialized) WARN(1, KERN_WARNING "kobject: '%s' (%p): is not initialized, yet kobject_get() is being called.\n", kobject_name(kobj), kobj); kref_get(&kobj->kref); } return kobj; } ``` 在 `kobject_get` 中有呼叫 `kref_get` ```c /** * kref_get - increment refcount for object. * @kref: object. */ static inline void kref_get(struct kref *kref) { refcount_inc(&kref->refcount); } ``` 在 `kref_get` 中會呼叫 `refcount_inc` 去做 reference count 的 INCREMENT。 由此可見,`subsys_get` 確實與增加 reference count 有所關聯。 接著搜尋 `subsys_put` ```c static inline void subsys_put(struct subsys_private *sp) { if (sp) kset_put(&sp->subsys); } ``` 在 `subsys_put` 中呼叫了 `kset_put` ```c static inline void kset_put(struct kset *k) { kobject_put(&k->kobj); } ``` 在 `kset_put` 中呼叫了 `kobject_put` ```c void kobject_put(struct kobject *kobj) { if (kobj) { if (!kobj->state_initialized) WARN(1, KERN_WARNING "kobject: '%s' (%p): is not initialized, yet kobject_put() is being called.\n", kobject_name(kobj), kobj); kref_put(&kobj->kref, kobject_release); } } ``` 在 `kobject_put` 中呼叫了 `kref_put` ```c /** * kref_put - decrement refcount for object. * @kref: object. * @release: pointer to the function that will clean up the object when the * last reference to the object is released. * This pointer is required, and it is not acceptable to pass kfree * in as this function. * * Decrement the refcount, and if 0, call release(). * Return 1 if the object was removed, otherwise return 0. Beware, if this * function returns 0, you still can not count on the kref from remaining in * memory. Only use the return value if you want to see if the kref is now * gone, not present. */ static inline int kref_put(struct kref *kref, void (*release)(struct kref *kref)) { if (refcount_dec_and_test(&kref->refcount)) { release(kref); return 1; } return 0; } ``` `kref_put` 中會去做 reference count 的 DECREMENT,而當最後的 reference 被釋放之後,該物件也會被釋放。 由此可見,`subsys_put` 也確實與減少 reference count 有所關聯。 在 [drivers/base/class.c](https://github.com/torvalds/linux/blob/master/drivers/base/class.c) 中 `class_to_subsys` 的功能是將 `struct class` 轉換為 `struct subsys_private`,做這樣的轉換是因為 driver core 內部是需要處理 `struct subsys_private`,不是外部的 `struct class`,而在找到匹配的 class 後就會 return 與該 class 相關的內部結構 subsys_private,如果沒有找到匹配的 class 則會 return NULL,而在不為 NULL 的時候,就會呼叫 `subsys_get` 去做 reference count 的 INCREMENT,因此在使用完畢後必須呼叫 `subsys_put()` 才能正確地去做釋放的動作。 而 [commit ddaf098](https://github.com/torvalds/linux/commit/ddaf098ea779b3c1302c7843f6ff01e89b1fd380) 所修正的問題就是當呼叫 `class_dev_iter_init` 函式去初始化 `class_dev_iter` 的時候使用了 `class_to_subsys` 去取得 `subsys_private` 結構並使其 reference count 增加,但當 `class_dev_iter` 使用完畢後,卻沒有將 `subsys_private` 的 reference count 做減少的動作,由於這個缺失,漸漸地就會造成 memory leak 的問題。 ### 解釋為何 Linux 核心的同步機制不依賴 [reference counting](https://en.wikipedia.org/wiki/Reference_counting) (提示: 參閱 RCU 的設計理念) 參閱 [RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu#%E5%B0%8D%E6%AF%94%E5%85%B6%E4%BB%96-lock-free-%E5%90%8C%E6%AD%A5%E6%A9%9F%E5%88%B6)中 RCU 與 reference counting 的對比如下 | | [Reference Counting](https://en.wikipedia.org/wiki/Reference_counting) | RCU |:----------------:|:------------------:|:---:| | Unreclaimed objects | Bounded | Unbounded | | Contention among readers | High | None | | Traversal forward progress | lock-free | wait-free | | Reclamation forward progress | lock-free | blocking | | Traversal speed | atomic | no-overhead | | | Reference acquisition | unconditional | unconditional | | Automatic reclamation | yes | no | | | Purpose of domains | N/A | isolate slow reader | 首先,reference counting 的主要問題是 atomic。多個執行緒可能同時增加或減少 reference counting ,這就需要使用 atomic operation 來確保計數的一致性。然而,atomic operation 是相對高成本的操作,特別是當多個執行緒同時競爭一個資源時。在高度並行的情況下,可能會導致性能下降。 再來,reference counting 可能會在多個 readers 的情況下導致高度競爭,如果多個執行緒同時增加或減少引用計數,可能會產生競爭,導致計數結果不正確,而為了解決這個問題,需要使用其他同步機制,但也會使得整體的複雜性增加。 另外,當 reference counting 存在 reference cycles (an object which refers directly or indirectly to itself) 的情況時,會導致計數保持非零,無法正確釋放資源。這種情況需要特殊的機制來檢測和處理,以確保資源能夠正確地釋放,但也會增加 reference counting 的成本和複雜性。 基於以上這些性能、競爭、和資源釋放等方面的考慮,這也是為什麼 Linux 核心的同步機制不依賴 reference counting。 ## TODO: Hazard Pointer 重做[第 8 週測驗題](https://hackmd.io/@sysprog/linux2023-quiz8)的測驗三,包含延伸問題。 * 完整程式碼可見 [GitHub](https://github.com/csm1735/Concurrent-Programming/tree/main/quiz8_test3) ### 解釋上述程式碼運作原理 ```c struct lfq_node { void *data; union { struct lfq_node *next; struct lfq_node *free_next; }; bool can_free; }; struct lfq_ctx { alignas(64) struct lfq_node *head; int count; struct lfq_node **HP; /* hazard pointers */ int *tid_map; bool is_freeing; struct lfq_node *fph, *fpt; /* free pool head/tail */ /* FIXME: get rid of struct. Make it configurable */ int MAX_HP_SIZE; /* avoid cacheline contention */ alignas(64) struct lfq_node *tail; }; ``` 在 `lfq.h` 中,先定義二個結構體。 `struct lfq_node` 是 lock-free queue node 的結構,其中除了資料以及代表能否 free 的一個 bool 以外,還嵌入了 union 的型別去儲存 `next` 及 `free_next`,`next` 是 queue 中的下一個 node,`free_next` 則是 free pool 中的下一個 node。 `struct lfq_ctx` 是 lock-free queue handler 的結構,儲存了 queue 及 free pool 的頭尾,以及 tid 的 map,還有一個 bool 用來確認是否正在做 free 的動作,此外,也儲存了 hazard pointers 和他的最大長度。 ```c int lfq_enqueue(struct lfq_ctx *ctx, void *data) { struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); if (!insert_node) return -errno; insert_node->data = data; struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node); /* We have claimed our spot in the insertion order by modifying tail. * we are the only inserting thread with a pointer to the old tail. * * Now we can make it part of the list by overwriting the NULL pointer in * the old tail. This is safe whether or not other threads have updated * ->next in our insert_node. */ #ifdef DEBUG assert(!(old_tail->next) && "old tail was not NULL"); #endif old_tail->next = insert_node; /* TODO: could a consumer thread could have freed the old tail? no because * that would leave head=NULL */ return 0; } ``` `lfq_enqueue` 的功能為 push 資料進入 queue 中,作法是將 queue 的尾端替換成 `insert_node` ,再將原先舊尾端 `old_tail` 的 `next` 指向 `insert_node` ```c void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) { struct lfq_node *old_head, *new_head; /* HP[tid] is necessary for deallocation. */ do { retry: /* continue jumps to the bottom of the loop, and would attempt a CAS * with uninitialized new_head. */ old_head = ctx->head; /* seq-cst store. * FIXME: use xchg instead of mov + mfence on x86. */ ctx->HP[tid] = old_head; mb(); /* another thread freed it before seeing our HP[tid] store */ if (old_head != ctx->head) goto retry; new_head = old_head->next; if (new_head == 0) { ctx->HP[tid] = 0; return NULL; /* never remove the last node */ } #ifdef DEBUG // FIXME: check for already-freed nodes // assert(new_head != (void *) -1 && "read an already-freed node"); #endif } while (!CAS(&ctx->head, old_head, new_head)); /* We have atomically advanced head, and we are the thread that won the race * to claim a node. We return the data from the *new* head. The list starts * off with a dummy node, so the current head is always a node that is * already been read. */ ctx->HP[tid] = 0; void *ret = new_head->data; new_head->can_free = true; /* we need to avoid freeing until other readers are definitely not going to * load its ->next in the CAS loop */ safe_free(ctx, (struct lfq_node *) old_head); return ret; } void *lfq_dequeue(struct lfq_ctx *ctx) { int tid = alloc_tid(ctx); /* To many thread race */ if (tid == -1) return (void *) -1; void *ret = lfq_dequeue_tid(ctx, tid); free_tid(ctx, tid); return ret; } ``` `lfq_dequeue` 的功能為將資料從 queue 中 pop 出來,首先,會先透過 `alloc_tid` 取得 tid ,如果順利取得非 -1 的值,就會透過 `lfq_dequeue_tid` 去做 pop,在 `lfq_dequeue_tid` 中,會先將要 pop 的 `old_head` 存入 `ctx->HP[tid]` 之中,然後去將 queue 的 `head` 替換成 `new_head` 也就是 `old_head->next`,完成後再將 `ctx->HP[tid]` 重設回 0,再對 `old_head` 做 `safe_free`。 ```c static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) { if (node->can_free && !in_hp(ctx, node)) { /* free is not thread-safe */ if (CAS(&ctx->is_freeing, 0, 1)) { /* poison the pointer to detect use-after-free */ node->next = (void *) -1; free(node); /* we got the lock; actually free */ ctx->is_freeing = false; smb(); } else /* we did not get the lock; only add to a freelist */ insert_pool(ctx, node); } else insert_pool(ctx, node); free_pool(ctx, false); } ``` 如果這個 node 能被 free 且不在 hazard pointers 內,就嘗試去做 free 的動作,否則就透過 `insert_pool` 只將 node 加入 free pool,而如果嘗試去做 `CAS(&ctx->is_freeing, 0, 1)` 成功的話,就能確實地去將 node 給 free 掉,但如果失敗了就一樣只將 node 加入 free pool,最後再呼叫 `free_pool` 去嘗試 free 掉 free pool 內的 node。 ### 用 C11 Atomics 改寫,使得能夠支援 x86(-64) 以外的處理器架構 題目所給的 `atomics.h` 中所定義的內容主要針對 x86(-64),故這邊用 C11 Atomics 將以 __sync 開頭的內建函式做改寫,為此需要先 `#include <stdatomic.h>` ```c // #define ATOMIC_SUB __sync_sub_and_fetch #define ATOMIC_SUB atomic_fetch_sub ``` 將原先的 `__sync_sub_and_fetch` 改寫成 `atomic_fetch_sub`,然而原先的 `__sync_sub_and_fetch` 所回傳的是做完減法後的新值,而 `atomic_fetch_sub` 所回傳的則是減法之前的舊值,但查看程式中對 `ATOMIC_SUB` 的使用,沒有去對回傳值做存取的部份,故沒有進一步修改 ```c // #define ATOMIC_ADD __sync_add_and_fetch #define ATOMIC_ADD atomic_fetch_add ``` 將原先的 `__sync_add_and_fetch` 改寫成 `atomic_fetch_add`,然而原先的 `__sync_add_and_fetch` 所回傳的是做完加法後的新值,而 `atomic_fetch_add` 所回傳的則是加法之前的舊值,但查看程式中對 `ATOMIC_ADD` 的使用,只有在 `test.c` 中的 ```c int tid = ATOMIC_ADD(&cnt_thread, 1); ``` 去對回傳值做存取的動作,雖可針對該行做修改,但原先的版本會取得加法完的新值,也就是使 `tid` 從 1 開始,而新的版本能使 `tid` 取得舊值,也就是從 0 開始,故此處也不做修改 ```c // #define CAS __sync_bool_compare_and_swap #define CAS atomic_compare_exchange_strong ``` 將原先的 `__sync_bool_compare_and_swap` 改寫成 `atomic_compare_exchange_strong`,以達到先比較再去修改值的效果,如果成功會 return true ,反之則 return false ```c // #define XCHG __sync_lock_test_and_set #define XCHG atomic_exchange ``` 將原先的 `__sync_lock_test_and_set` 改寫成 `atomic_exchange`,以達到修改值的效果,而最後都會回傳修改前的值 ```c // #define ATOMIC_SET __sync_lock_test_and_set // #define ATOMIC_RELEASE __sync_lock_release #define ATOMIC_SET atomic_flag_test_and_set #define ATOMIC_RELEASE atomic_flag_clear ``` `ATOMIC_SET` 跟 `ATOMIC_RELEASE` 在原先程式中都沒有使用到,而其函式功能為 lock 的相關操作,故此處藉由 `atomic_flag_test_and_set` 跟 `atomic_flag_clear` 去對 `atomic_flag` 的物件做操作,以達到預期的效果 ```c // #define mb __sync_synchronize #define mb() atomic_thread_fence(memory_order_seq_cst) ``` 將原先的 `__sync_synchronize` 改寫成 `atomic_thread_fence`,memory order 設為 `memory_order_seq_cst` 原先的程式碼在經由 Thread Sanitizer 檢測的時候,會發生多起 data race 的問題,以下是其中一筆 ```bash ================== WARNING: ThreadSanitizer: data race (pid=17162) Atomic write of size 4 at 0x56523ee7b018 by main thread: #0 __tsan_atomic32_fetch_add ../../../../src/libsanitizer/tsan/tsan_interface_atomic.cpp:615 (libtsan.so.0+0x81fe9) #1 main /test.c:97 (main1+0x14a6) Previous read of size 4 at 0x56523ee7b018 by thread T1: #0 remove_queue /test.c:72 (main1+0x22c3) Location is global 'cnt_producer' of size 4 at 0x56523ee7b018 (main1+0x000000005018) Thread T1 (tid=17164, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main test.c:93 (main1+0x1494) SUMMARY: ThreadSanitizer: data race /test.c:97 in main ================== ``` 因此,根據 Thread Sanitizer 所提供的訊息,針對 data race 的部份,使用 atomic operation 改寫程式碼,以通過 Thread Sanitizer 的檢測。 以上面的訊息為例,去對 `test.c` 第 72 行的 `cnt_producer` 的讀取做改寫 ```c //if (ctx->count || cnt_producer) if (ctx->count || atomic_load(&cnt_producer)) ``` 相關程式碼修改可見 [commit 695aff6](https://github.com/csm1735/quiz8_test3/commit/695aff6e7640584df96d0d31287f393c5015b7ec) ### 遞交 pull request,以上述程式碼的改進版本來取代 [lf-queue](https://github.com/sysprog21/concurrent-programs/tree/master/lf-queue) 已提交 [pull request #18](https://github.com/sysprog21/concurrent-programs/pull/18) 取代原先的 `lf-queue`,所提交的版本以 Ruslan Nikolaev 的論文 [A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue](https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf) 所提及的 SCQ (Scalable Circular Queue) 來實作,並採用基於 hazard pointer 的記憶體物件回收機制,程式碼使用 C11 Atomics 撰寫,並通過 Thread Sanitizer 的驗證。 ## TODO: MPMC 重做[第 9 週測驗題](https://hackmd.io/@sysprog/linux2023-quiz9)的測驗一,包含延伸問題。