# Linux 核心專題: 《Concurrency Primer》校訂和範例撰寫 > 執行人: GaberPlaysGame > [Github](https://github.com/GaberPlaysGame/hungry-birds) > [專題解說錄影](https://youtu.be/OWCRpzT100o) ## 提問清單 :::success :question: 提問清單 * ? ::: * 根據〈Concurrency Primer〉所言,使用 compare_exchange_weak 會忽略 "Spurious LL/SC Failure",這段我其實沒有理解的很清楚以及它可能的發生條件。 * 我有試過將 free 與 malloc 包裝成 critical section,並利用 TAS 進行鎖的操作。問題似乎不會因此解決,但整體只發生一次,後續利用 GDB 測試的結果有看到部分執行緒被重新 join,因此懷疑是有一個或以上進行 producer 的執行緒卡住了 queue_pop 的執行。但後續的 data race 因為產出結果的 Previous Write 段落不完整,不太能夠解讀並想到要如何排錯。 ## 任務簡述 檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,嘗試用 C11 Atomics 改寫原有程式碼,並確保在 x86-64 及 QEMU/Arm 正確運作。預計產出: * 以 C11 Atomics 實作經典 lock-free 程式 * 探討 memory ordering/barrier ## TODO: 校訂〈Concurrency Primer〉 檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,,參見[〈Concurrency Primer〉導讀](https://hackmd.io/@sysprog/concurrency-primer),確保原本的 C++ 程式都以 C11 Atomics 改寫並驗證。 > 改寫時,先列出原本的 C++11 程式碼,隨後則是改寫後的 C11 Atomics 實作 ### 2. Enforcing law and order C++11 程式碼: ```cpp #include <atomic> int v = 0; std::atomic_bool v_ready(false); void threadA(){ v = 42; v_ready = true; } void threadB(){ while (!v_ready) { /* wait */ } const int my_v = v; // Do something with my_v... } ``` 以 C11 改寫後的程式碼: ```c #include <stdatomic.h> #include <stdbool.h> int v = 0; atomic_bool v_ready = false; void *threadA() { v = 42; v_ready = true; } void *threadB() { while(!v_ready) { /* wait */ } const int my_v = v; // Do something with my_v... } ``` ### 5.2 Test and set C++11 程式碼: ```cpp std::atomic_flag af; void lock() { while (af.test_and_set()) { /* wait */ } } void unlock() { af.clear(); } ``` C11 程式碼: ```c atomic_flag af = ATOMIC_FLAG_INIT; void lock() { while (atomic_flag_test_and_set(&af)) { /* wait */ } } void unlock() { atomic_flag_clear(&af); } ``` 這邊利用 TAS 與 Atomic Flag 製作一個類似於互斥鎖 (Mutex) 的程式。使用時將 lock 與 unlock 放在 Critical Section 頭尾便可。 根據 [IBM Documentation](https://www.ibm.com/docs/en/zos/2.1.0?topic=c11-types) 對 Atomic Flag 的解釋,若是 C11 Atomic Flag 沒有被初始化,則它會被初始成未定狀態。 > You can use the macro ATOMIC_FLAG_INIT to initialize an atomic_flag object to the clear state. If an atomic_flag object is not explicitly initialized with ATOMIC_FLAG_INIT, it is initially in an indeterminate state. ### 5.4 Compare and swap C++11 程式碼: ```cpp enum class TaskState : int8_t { Idle, Running, Cancelled }; std::atomic<TaskState> ts; void taskLoop() { ts = TaskState::Running; while (ts == TaskState::Running) { // Do good work. } } bool cancel() { auto expected = TaskState::Running; return ts.compare_exchange_strong(expected, TaskState::Cancelled); } ``` C11 程式碼: ```c enum taskstate { Idle, Running, Cancelled }; typedef enum taskstate TaskState; _Atomic TaskState ts; void taskLoop() { ts = Running; while (ts == Running) { // Do good work. } } bool cancel() { TaskState expected = Running; return atomic_compare_exchange_strong(&ts, &expected, Cancelled); } ``` CAS (又稱 Compare Exchange) 可以用來比較並在 ts = expected 時修改數值,在這個例子中它被用來停止迴圈。 ### 8.1 Spurious LL/SC failures C++11 程式碼: ```cpp void atomicMultiply(int by) { int expected = foo; // Which CAS should we use? while (!foo.compare_exchange_?(expected, expected * by)) { // Empty loop. // (On failure, expected is updated with foo's most recent value.) } } ``` C11 程式碼: ```c atomic_int foo; void atomicMultiply(int by) { int expected = foo; // Which CAS should we use? while (!atomic_compare_exchange_?(&foo, &expected, expected * by)) { // Empty loop. // (On failure, expected is updated with foo's most recent value.) } } ``` 這邊 Concurrency Primer 認為說,在使用 _strong 函式的情況下編譯器必須生成內外兩個迴圈來防止我們遇到 Sequential Consistent 錯誤,但其實我們不在乎錯誤發生何時發生,因此另有 _weak 版本的函式讓 CAS 只要沒有正確完成時就直接回傳。 ### 10 Memory ordering C++11 程式碼: ```cpp void lock() { while (af.test_and_set(memory_order_acquire)) { /* wait */ } } void unlock() { af.clear(memory_order_release); } int i = foo.load(memory_order_acquire); void atomicMultiply(int by) { int expected = foo; while (!foo.compare_exchange_weak( expected, expected * by, memory_order_seq_cst, // On success memory_order_relaxed)) // On failure { /* empty loop */ } } ``` C11 程式碼: ```c atomic_flag af = ATOMIC_FLAG_INIT; void lock() { while (atomic_flag_test_and_set_explicit(&af, memory_order_acquire)) { /* wait */ } } void unlock() { atomic_flag_clear_explicit(&af, memory_order_release); } int i = atomic_load_explicit(&foo, memory_order_acquire); void atomicMultiply(int by) { int expected = foo; while (!atomic_compare_exchange_weak_explicit( &foo, &expected, expected * by, memory_order_seq_cst, // On success memory_order_relaxed)) // On failure { /* empty loop */ } } ``` 這邊必須將原本的函式後方加上 _explicit,才可以設定 Memory Order。 ### 10.1 Acquire and release C++11 程式碼: ```cpp int acquireFoo() { return foo.load(memory_order_acquire); } void releaseFoo(int i) { foo.store(i, memory_order_release); } int v; std::atomic_bool v_ready(false); void threadA() { v = 42; v_ready.store(true, memory_order_release); } void threadB() { while (!v_ready.load(memory_order_acquire)) { // wait } assert(v == 42); // Must be true } ``` C11 程式碼: ```c int acquireFoo() { return atomic_load_explicit(&foo, memory_order_acquire); } void releaseFoo(int i) { atomic_store_explicit(&foo, i, memory_order_release); } int v; atomic_bool v_ready = false; void threadA() { v = 42; atomic_store_explicit(&v_ready, true, memory_order_release); } void threadB() { while (!atomic_load_explicit(&v_ready, memory_order_acquire)) { // wait } assert(v == 42); // Must be true } ``` Acquire 與 Release 是兩個成對的記憶體排序。Acquire 會將任何所有在它之後的記憶體操作維持排序並與其他 CPU 同步,通常會搭配記憶體柵欄來防止 CPU 重新排列操作。Release 則是與其相反。兩者常被用來進行鎖的實作,也因此 Acquire 常被用在 Load 操作,而 Release 被用在 Store 操作。 ### 10.2 Relaxed C++11 程式碼: ```cpp atomic_bool stop(false); void worker() { while (!stop.load(memory_order_relaxed)) { // Do good work. } } void atomicMultiply(int by) { int expected = foo.load(memory_order_relaxed); while (!foo.compare_exchange_weak( expected, expected * by, memory_order_release, memory_order_relaxed)) { /* empty loop */ } } int main() { launchWorker(); // Wait some... stop = true; // seq_cst joinWorker(); } ``` C11 程式碼: ```c atomic_bool stop = false; void worker() { while (!atomic_load_explicit(&stop, memory_order_relaxed)) { // Do good work. } } void atomicMultiply(int by) { int expected = atomic_load_explicit(&foo, memory_order_relaxed); while (!atomic_compare_exchange_weak_explicit( &foo, &expected, expected * by, memory_order_release, memory_order_relaxed)) { /* empty loop */ } } int main() { launchWorker(); // Wait some... stop = true; // seq_cst joinWorker(); } ``` Relaxed 是相對 Acquire Release 來講比較弱的記憶體操作,它可以保證兩個 Relaxed 操作不會被重新排序,但它不會為其他 CPU 去做同步操作。 ### 10.3 Acquire-Release C++11 程式碼: ```cpp atomic_int refCount; void inc() { refCount.fetch_add(1, memory_order_relaxed); } void dec() { if (refCount.fetch_sub(1, memory_order_acq_rel) == 1) { // No more references, delete the data. } } ``` C11 程式碼: ```c atomic_int refCount; void inc() { atomic_fetch_add_explicit(&refCount, 1, memory_order_relaxed); } void dec() { if (atomic_fetch_sub_explicit(&refCount, 1, memory_order_acq_rel) == 1) { // No more references, delete the data. } } ``` Acq-Rel 可以說類似於上面提到的 Acquire 與 Release,只不過它被特化用於同時需要進行讀取與儲存的函式,如 atomic_compare_exchange 系列函式。 另外還有一種排序為 SeqCst,保證程式之間可以有 Sequential Consistency,被認為是最強的記憶體排序,但因為每個 Atomic 操作都會對 CPU 快取造成一定的開銷,SeqCst 的花費自然比其他記憶體操作來得要大,因此它並不是唯一推薦的記憶體排序。 ## TODO: 以 C11 Atomics 實作經典 lock-free 程式 以 C11 Atomics 改寫 [hungry-birds](https://github.com/jserv/hungry-birds) 並解釋運作原理 (注意: 存在若干實作錯誤),需要用 ThreadSanitizer 驗證,作為日後教材的補充內容。 > 對照 [Issue #5](https://github.com/jserv/hungry-birds/issues/5) > 延伸閱讀: [Atomics](https://wusyong.github.io/posts/atomic/) ### 運作原理 ```c static const size_t sentinel = 0xdeadc0de; static const size_t alignment = sizeof(size_t); typedef struct node { atomic_uintptr_t next; } node; struct __QueueInternal { atomic_uintptr_t head; atomic_uintptr_t tail; size_t item_size; }; ``` 該 lock-free 程式使用 C11 Atomics 實作佇列,利用 `atomic_uintptr_t` 這個指標類型儲存佇列頭尾以及 `next` 的值以構成 Linked List 的結構。 - `head`:佇列頭,主要用於 `queue_pop` 操作。 - `tail`:佇列尾,主要用於 `queue_push` 操作。 - `item_size`:儲存佇列中每個元素的大小。 #### queue_create ```c queue_p queue_create(size_t item_size) { size_t *ptr = calloc(sizeof(struct __QueueInternal) + alignment, 1); ptr[0] = sentinel; queue_p q = (queue_p)(ptr + 1); atomic_init(&q->head, 0); atomic_init(&q->tail, 0); q->item_size = item_size; return q; } ``` 在該程式創建佇列時,會利用 sentinel 這個 [Hexspeak](https://en.wikipedia.org/wiki/Hexspeak) 去表示佇列的開頭,並將頭尾用 atomic 函式設置為 0。 #### queue_push ```c QueueResult queue_push(queue_p q, void *data) { assert(q); /* create the new tail */ node *new_tail = malloc(sizeof(node) + q->item_size); if (!new_tail) { return QUEUE_OUT_OF_MEMORY; } atomic_init(&new_tail->next, 0); memcpy(new_tail + 1, data, q->item_size); /* swap the new tail with the old */ node *old_tail = (node *) atomic_exchange(&q->tail, (uintptr_t) new_tail); /* link the old tail to the new */ if (old_tail) { atomic_store(&old_tail->next, (uintptr_t) new_tail); } else { atomic_store(&q->head, (uintptr_t) new_tail); } return QUEUE_SUCCESS; } ``` 推入佇列的動作由 queue_push 函式達成,程式會首先配置一塊新的記憶體區塊 new_tail,並利用 atomic_init 初始化。接下來透過 atomic_exchange 操作交換佇列尾端與 new_tail 的值,取得下來的指標這邊以 old_tail 表示。 交換之後 old_tail 會有兩種可能情況: - 若 old_tail 為 0,則根據 queue_create 函式的設置可知道佇列此時沒有任何元素存在,因此將佇列的頭利用 atomic_store 存入剛剛配置的 new_tail 的值。 - 若 old_tail 不為 0,則代表佇列尾早前已經被存入其它記憶體的值,因此就只需要將 old_tail 的下一個指標設為 new_tail 即可。 #### queue_pop ```c QueueResult queue_has_front(queue_p q) { assert(q); if (atomic_load(&q->head) == 0) return QUEUE_FALSE; return QUEUE_TRUE; } QueueResult queue_pop(queue_p q) { assert(q); assert(queue_has_front(q) == QUEUE_TRUE); /* get the head */ node *popped = (node *) atomic_load(&q->head); node *compare = popped; /* set the tail and head to nothing if they are the same */ if (atomic_compare_exchange_strong(&q->tail, &compare, 0)) { compare = popped; /* it is possible for another thread to have pushed after * we swap out the tail. In this case, the head will be different * then what was popped, so we just do a blind exchange regardless * of the result. */ atomic_compare_exchange_strong(&q->head, &compare, 0); } else { /* tail is different from head, set the head to the next value */ node *new_head = 0; while (!new_head) { /* it is possible that the next node has not been assigned yet, * so just spin until the pushing thread stores the value. */ new_head = (node *) atomic_load_explicit(&popped->next, memory_order_acquire); } atomic_store_explicit(&q->head, (uintptr_t) new_head, memory_order_release); } free(popped); return QUEUE_SUCCESS; } ``` 取出佇列的動作由 `queue_pop` 完成,函式首先將 `q->head` 的初始值存儲於 `popped` 指標上,並將 `popped` 的值複製到 `compare` 並進行 CAS。 - 佇列要先透過 `queue_has_front` 函式確認有元素,否則會產生斷言錯誤。 - 若 `compare` 的值與佇列尾相同,則代表這個佇列只有一個元素。 - 佇列尾會被設為 0,且將 `popped` 的值再次複製到 `compare` 一次。這個操作我不清楚原因,但我認為是為了確保接下來與佇列頭的比較可以順利在 MPSC 的情況下執行。 - 由於佇列尾被設為 0,因此當有其它執行緒同時進行 `queue_push` 時,佇列會被判定為「沒有元素」的情況,進而更改了佇列頭的值。因此,進行 `q->head` 與 compare 的比較時結果可能不相同。因此函式再次進行一次 CAS,若佇列頭的值沒有被更動則將其設為 0。 - 若 `compare` 的值與佇列尾不同,則佇列有多個元素。 - 新建一個指標 `new_head` 用來複製原本佇列頭的下一個值。由於佇列頭的下一個值可能還沒有被存入,即有執行緒還在 `queue_push` 的 `atomic_store` 區塊,因此這邊函式利用 while 迴圈讓 `new_head` 反覆存取 `popped->next` 直到其非 0。 - 當 `new_head` 存取到值後,`q->head` 會存入 `new_head` 的值而完成操作。 當以上操作執行完後 `popped` 指標的記憶體會被釋放並回傳取出成功。 ### 以 ThreadSanitizer 驗證 無修改運行時程式不會提示任何錯誤而順利結束。若在 Makefile 內加上 `-fsanitize=thread` 標籤後運行,當程式進行到 Stress Test 區塊可以看到終端機發現有 data race 發生在 `queue_pop` 與 `queue_push` 函式,進而導致程式無法順利結束。 ``` WARNING: ThreadSanitizer: data race (pid=16033) Write of size 8 at 0x7b0400000800 by thread T1: #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8) #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde) Previous write of size 8 at 0x7b0400000800 by thread T2: #0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57) #1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:87 (pc-test+0x205f) Thread T1 (tid=16035, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd) Thread T2 (tid=16036, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55) SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop ``` ``` WARNING: ThreadSanitizer: data race (pid=16033) Write of size 8 at 0x7b0400000808 by thread T1: #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8) #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde) Previous write of size 8 at 0x7b0400000808 by thread T2: [failed to restore the stack] Thread T1 (tid=16035, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd) Thread T2 (tid=16036, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55) SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop ``` 以 GDB 分析過後可以發現儘管進行了多次的 `queue_push`,data race 總是會在第一次的 `queue_pop` 的 free 函式執行時發生,原因疑似是 `queue_pop` 執行 `free` 函式時,`queue_push` 同時對記憶體進行 `malloc`。 我嘗試利用 `atomic_test_and_set` 函式,設置一個 `atomic_flag` 來達到類似互斥鎖的效果去執行問題段,雖未能解決大部分問題,但 data race 的發生時間有所變化。 ``` WARNING: ThreadSanitizer: data race (pid=9070) Write of size 8 at 0x7b0400000800 by thread T1: #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8) #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee) Previous write of size 8 at 0x7b0400000800 by thread T2: #0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57) #1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:92 (pc-test+0x208a) Thread T1 (tid=9072, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd) Thread T2 (tid=9073, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55) SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop ``` 第一次的 data race 一樣發生在第一次的 free 與 malloc 同時進行。 ``` WARNING: ThreadSanitizer: data race (pid=9070) Write of size 8 at 0x7b0400000818 by thread T1: #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8) #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee) Previous write of size 1 at 0x7b040000081b by thread T2: #0 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:827 (libtsan.so.0+0x6243e) #1 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:819 (libtsan.so.0+0x6243e) #2 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:102 (pc-test+0x20f4) Thread T1 (tid=9072, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd) Thread T2 (tid=9073, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55) SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop ``` 第二次的 data race 為新出現的問題,在第二次的 free 與 memcpy 同時進行的情況下發生。 ``` WARNING: ThreadSanitizer: data race (pid=9070) Write of size 8 at 0x7b040002e0d0 by thread T1: #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8) #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee) Previous write of size 8 at 0x7b040002e0d0 by thread T11: [failed to restore the stack] Thread T1 (tid=9072, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd) Thread T11 (tid=9082, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55) SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop ``` 第三次的 data race 與原本的第二次 data race 為同種問題,但發生時間被延後到數次 free 函式後,而非原本的在第一次 free 發生的情況。 :::warning 以 atomics 保護 critical section :notes: jserv :::