---
tags: linux2022
---
# 2022q1 Homework5 (quiz5)
contributed by < `bakudr18` >
> [Quiz questions](https://hackmd.io/@sysprog/linux2022-quiz5)

## Quiz `2`

A lock-free data structure has to watch out for the [ABA problem](https://en.wikipedia.org/wiki/ABA_problem). For example, given a linked list $a\rightarrow b\rightarrow c$, suppose one thread is about to perform `compare_and_swap(&head, new = 'b', expected = 'a')` on the list head. If another thread pops `'a'` and `'b'` in a row and then pushes `'a'` back, the list becomes $a\rightarrow c$, yet the CAS still succeeds, leaving `head` pointing at `'b'`, which has already been freed.

A [Hazard pointer](https://en.wikipedia.org/wiki/Hazard_pointer) is one way to solve the ABA problem: it prevents data from being reclaimed while a reader is still using it. The idea is that whenever a reader wants to access an object, it first publishes a hazard pointer to that object and only removes it after it has finished reading. When a writer wants to reclaim an object and finds it among the hazard pointers, it must not free it directly. There are two options: either keep waiting until the reader removes the object's hazard pointer and only then free it, or put the object on a retire list and, on a later pass, check which objects on the retire list are no longer referenced by any hazard pointer; only those may be freed.

![](https://i.imgur.com/gXoizWa.png)

Now let's look at the code:

```cpp
typedef struct __hp {
    uintptr_t ptr;
    struct __hp *next;
} hp_t;

typedef struct {
    hp_t *pointers; /* hazard pointer structure */
    hp_t *retired;  /* retire list structure */
    void (*deallocator)(void *);
} domain_t;

static domain_t *config_dom;
```

`config_dom` is the structure that manages the hazard pointers and the retire list. `pointers` and `retired` are both linked lists, and `deallocator` is the function pointer used to release resources.

```cpp=
/*
 * Load a safe pointer to a shared object. This pointer must be passed to
 * `drop` once it is no longer needed. Returns 0 (NULL) on error.
 */
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
    const uintptr_t nullptr = 0;

    while (1) {
        uintptr_t val = atomic_load(prot_ptr);

        hp_t *node = list_insert_or_append(&dom->pointers, val);
        if (!node)
            return 0;

        /* Hazard pointer inserted successfully */
        if (atomic_load(prot_ptr) == val)
            return val;

        /*
         * This pointer is being retired by another thread - remove this hazard
         * pointer and try again. We first try to remove the hazard pointer we
         * just used. If someone else used it to drop the same pointer, we walk
         * the list.
         */
        uintptr_t tmp = val;
        if (!atomic_cas(&node->ptr, &tmp, &nullptr))
            list_remove(&dom->pointers, val);
    }
}

/*
 * Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
 * come from `load`
 */
void drop(domain_t *dom, uintptr_t safe_val)
{
    if (!list_remove(&dom->pointers, safe_val))
        __builtin_unreachable();
}
```

First, the two reader-side functions above:

* `load` reads the address `val` that `prot_ptr` points to and adds `val` to the hazard pointers. Note the second `atomic_load(prot_ptr)`: because `val` may have been retired while `list_insert_or_append` was running, `prot_ptr` must be read again to confirm that it still points to the same address before the read counts as successful; otherwise the hazard pointer is removed and a new `val` is loaded (a sketch of these list helpers follows after this list).
* After a reader has finished with the data, it must call `drop`, which removes `safe_val` from the hazard pointer structure. Only then is the reader considered done, and only then may a writer free the data directly.
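The list helpers `list_insert_or_append`, `list_remove`, and `list_contains` are not included in the quiz excerpt. Below is a minimal sketch of how a CAS-based `list_insert_or_append` could work, a hypothetical illustration reusing the `hp_t` type above, not the quiz's actual implementation:

```cpp
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical sketch, not the quiz's actual implementation.
 * Reuses the hp_t type defined above. */
static hp_t *list_insert_or_append(hp_t **head, uintptr_t ptr)
{
    /* First try to reuse an empty slot (a node whose ptr is 0). */
    for (hp_t *node = __atomic_load_n(head, __ATOMIC_ACQUIRE); node;
         node = node->next) {
        uintptr_t expected = 0;
        if (__atomic_compare_exchange_n(&node->ptr, &expected, ptr, false,
                                        __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            return node;
    }

    /* Otherwise allocate a new node and push it onto the list head with CAS. */
    hp_t *new_node = calloc(1, sizeof(hp_t));
    if (!new_node)
        return NULL;
    new_node->ptr = ptr;

    hp_t *old_head = __atomic_load_n(head, __ATOMIC_ACQUIRE);
    do {
        new_node->next = old_head;
    } while (!__atomic_compare_exchange_n(head, &old_head, new_node, false,
                                          __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
    return new_node;
}
```

Under the same assumptions, `list_remove` would simply CAS the matching node's `ptr` back to 0 so the slot can be reused, and `list_contains` would walk the list comparing `ptr` values.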
```cpp
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
    if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
        dom->deallocator((void *) ptr);
    } else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
        list_insert_or_append(&dom->retired, ptr);
    } else { /* Spin until all readers are done, then deallocate */
        while (list_contains(&dom->pointers, ptr))
            ;
        dom->deallocator((void *) ptr);
    }
}

/* Swaps the contents of a shared pointer with a new pointer. The old value
 * will be deallocated by calling the `deallocator` function for the domain,
 * provided when `domain_new` was called. If `flags` is 0, this function will
 * wait until no more references to the old object are held in order to
 * deallocate it. If flags is `DEFER_DEALLOC`, the old object will only be
 * deallocated if there are already no references to it; otherwise the cleanup
 * will be done the next time `cleanup` is called.
 */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
    const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
    cleanup_ptr(dom, old_obj, flags);
}
```

Next, the writer-side functions:

* `swap` points `prot_ptr` at `new_val` and then tries to release `old_obj`.
* `cleanup_ptr` attempts to free `old_obj`. As described earlier, when `old_obj` is still among the hazard pointers there are two choices: if `flags == DEFER_DEALLOC`, put `old_obj` on the retire list and free it on a later pass; otherwise busy-wait until the readers have removed `old_obj` from the hazard pointers, and then release it.

```cpp
#define N_READERS 10
#define N_WRITERS 1
#define N_ITERS 20

static void *reader_thread(void *arg)
{
    (void) arg;
    struct timespec t1 = {.tv_sec = 0, .tv_nsec = 100};

    for (int i = 0; i < N_ITERS; ++i) {
        config_t *safe_config =
            (config_t *) load(config_dom, (uintptr_t *) &shared_config);
        if (!safe_config)
            err(EXIT_FAILURE, "load");

        print_config("read config ", safe_config);
        drop(config_dom, (uintptr_t) safe_config);
        nanosleep(&t1, NULL);
    }

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);
        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             DEFER_DEALLOC);
        print_config("updated config ", new_config);
    }

    return NULL;
}
```

Above are the reader thread and the writer thread. To make it easier for readers and writers to interleave, I added [nanosleep](https://man7.org/linux/man-pages/man2/nanosleep.2.html) in the reader thread so that it pauses for at least 100 ns after every successful read. A fragment of the output looks like this:

```shell
read config     : { 0x00000000, 0x00000000, 0x00000000 }
read config     : { 0x00000000, 0x00000000, 0x00000000 }
updating config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updated config  : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updating config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updated config  : { 0x66334873, 0x74b0dc51, 0x19495cff }
updating config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config  : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updating config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config  : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
```
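The `config_t` type and the helpers `create_config`, `delete_config`, and `print_config` used by these threads are not part of the excerpt either (nor are `init()`/`deinit()`, which set up `config_dom` and the initial `shared_config`). A plausible minimal sketch of the helpers, consistent with the output format above but hypothetical, might look like this:

```cpp
#include <err.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical reconstruction of the helpers; the real ones may differ. */
typedef struct {
    int v1, v2, v3;
} config_t;

static config_t *shared_config; /* pointer protected by the hazard-pointer domain */

static config_t *create_config(void)
{
    config_t *conf = calloc(1, sizeof(config_t));
    if (!conf)
        err(EXIT_FAILURE, "calloc");
    return conf;
}

static void delete_config(void *conf)
{
    free(conf);
}

static void print_config(const char *name, const config_t *conf)
{
    printf("%s: { 0x%08x, 0x%08x, 0x%08x }\n", name, conf->v1, conf->v2,
           conf->v3);
}
```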
```cpp
/* Forces the cleanup of old objects that have not been deallocated yet. Just
 * like `swap`, if `flags` is 0, this function will wait until there are no
 * more references to each object. If `flags` is `DEFER_DEALLOC`, only
 * objects that already have no living references will be deallocated.
 */
void cleanup(domain_t *dom, int flags)
{
    hp_t *node;

    LIST_ITER (&dom->retired, node) {
        uintptr_t ptr = node->ptr;
        if (!ptr)
            continue;

        if (!list_contains(&dom->pointers, ptr)) {
            /* We can deallocate straight away */
            if (list_remove(&dom->retired, ptr))
                dom->deallocator((void *) ptr);
        } else if (!(flags & DEFER_DEALLOC)) {
            /* Spin until all readers are done, then deallocate */
            while (list_contains(&dom->pointers, ptr))
                ;
            if (list_remove(&dom->retired, ptr))
                dom->deallocator((void *) ptr);
        }
    }
}
```
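The `LIST_ITER` macro used here is also not shown in the excerpt; presumably it is just a loop over a hazard-pointer list, along these lines (an assumption, not the quiz's exact definition):

```cpp
/* Hypothetical definition: iterate over a hazard-pointer list, loading the
 * head with acquire semantics. */
#define LIST_ITER(head, node)                                   \
    for (node = __atomic_load_n(head, __ATOMIC_ACQUIRE); node;  \
         node = node->next)
```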
The purpose of `cleanup` is to try to free the objects sitting on the retire list, using the same checks as `cleanup_ptr`. For this I created an additional `cleaner_thread` that uses [timerfd](https://man7.org/linux/man-pages/man2/timerfd_create.2.html) to run `cleanup` periodically.

```cpp
static void *cleaner_thread(void *arg)
{
    atomic_bool *stop = (atomic_bool *) arg;
    struct itimerspec new_value;
    new_value.it_value.tv_sec = 0;
    new_value.it_value.tv_nsec = 1000;
    new_value.it_interval.tv_sec = 1;
    new_value.it_interval.tv_nsec = 0;

    int fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd == -1)
        warn("timerfd_create");
    if (timerfd_settime(fd, TFD_TIMER_ABSTIME, &new_value, NULL))
        warn("timerfd_settime");

    while (!atomic_load(stop)) {
        uint64_t exp;
        ssize_t s = read(fd, &exp, sizeof(uint64_t));
        if (s != sizeof(uint64_t)) {
            warn("read failed");
            return NULL;
        }
        printf("cleanup\n");
        cleanup(config_dom, DEFER_DEALLOC);
    }

    close(fd);
    return NULL;
}

int main()
{
    pthread_t readers[N_READERS], writers[N_WRITERS], cleaner;
    atomic_bool stop = ATOMIC_VAR_INIT(false);

    init();

    /* Start threads */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_create(readers + i, NULL, reader_thread, NULL))
            warn("pthread_create");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_create(writers + i, NULL, writer_thread, NULL))
            warn("pthread_create");
    }
    if (pthread_create(&cleaner, NULL, cleaner_thread, &stop))
        warn("pthread_create");

    /* Wait for threads to finish */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_join(readers[i], NULL))
            warn("pthread_join");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_join(writers[i], NULL))
            warn("pthread_join");
    }
    atomic_store(&stop, true);
    if (pthread_join(cleaner, NULL))
        warn("pthread_join");

    deinit();

    return EXIT_SUCCESS;
}
```

### Memory order

Background reading:
* [Concurrent programming: execution ordering](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-ordering)
* [Concurrent programming: atomic operations](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-atomics)

`load` and `swap` both operate on the same shared object `prot_ptr`, and the original code performs every atomic operation on `prot_ptr` with sequentially consistent memory order. Here I try to change them to release-acquire ordering, but first the difference between the two must be understood.

Start with the definition of release-acquire ordering in the [gcc docs](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html):

> **__ATOMIC_ACQUIRE:** Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.
>
> **__ATOMIC_RELEASE:** Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.
>
> **__ATOMIC_ACQ_REL:** Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE.

Acquire and release must be used as a pair; together they establish a happens-before relationship between threads. `__ATOMIC_ACQUIRE` prevents later instructions from being hoisted above the operation, and `__ATOMIC_RELEASE` prevents earlier instructions from sinking below it. For example:

```cpp
/* shared variables */
bool M = false;
int a = 0;

void *thread1()
{
    a = 1;
    atomic_store_explicit(&M, true, memory_order_release);
}

void *thread2()
{
    if (atomic_load_explicit(&M, memory_order_acquire)) {
        assert(a == 1);
    }
}
```

If the store to `M` happens-before the load of `M`, then whenever thread2 observes `M == true` it is guaranteed that `a == 1`, because `a = 1;` in thread1 happens-before the store to `M`.

Next, sequentially consistent memory order. According to the C11 standard:

> §5.1.2.4/7: All modifications to a particular atomic object M occur in some particular total order, called the modification order of M. If A and B are modifications of an atomic object M, and A happens before B, then A shall precede B in the modification order of M.
>
> §7.17.3/6: For memory_order_seq_cst, there shall be a single total order S on all operations, consistent with the "happens before" order and modification orders for all affected locations, such that each memory_order_seq_cst operation that loads a value observes either the last preceding modification according to this order S, or the result of an operation that is not memory_order_seq_cst.

In other words, every `memory_order_seq_cst` operation, no matter which thread performs it, is guaranteed to observe the same modification order, i.e. a single total order.

The bare definitions are hard to digest. I later found an example on [cppreference](https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering), and [How std::memory_order_seq_cst works](https://stackoverflow.com/a/48984938/11425048) gives a plainer explanation that clearly distinguishes `memory_order_seq_cst` from `memory_order_acq_rel`. The code is as follows:

```cpp
#include <thread>
#include <atomic>
#include <cassert>

std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};

void write_x()
{
    x.store(true, std::memory_order_seq_cst);
}

void write_y()
{
    y.store(true, std::memory_order_seq_cst);
}

void read_x_then_y()
{
    while (!x.load(std::memory_order_seq_cst))
        ;
    if (y.load(std::memory_order_seq_cst)) {
        ++z;
    }
}

void read_y_then_x()
{
    while (!y.load(std::memory_order_seq_cst))
        ;
    if (x.load(std::memory_order_seq_cst)) {
        ++z;
    }
}

int main()
{
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join();
    b.join();
    c.join();
    d.join();
    assert(z.load() != 0);  // will never happen
}
```

The code above guarantees that the final result is always `z != 0`. The reason: for the two variables `x` and `y`, suppose the modification order is `x = true;` first and then `y = true;`. Then at any point in time every thread can only observe one of the following three states of `x` and `y`:

* state1: `x == false && y == false`
* state2: `x == true && y == false`
* state3: `x == true && y == true`

So if `x` and `y` are in state2 when thread c starts running, `++z;` in thread c may or may not execute, because `x` and `y` might move to state3 while thread c is running. But at the moment state2 is **visible** to thread c, thread d can only see state2 as well, so thread d is guaranteed to execute `++z;`.

However, if `memory_order_acq_rel` were used instead, then when thread a's store becomes visible to thread c, this only guarantees that the instructions before it in thread a (none in this example) are also visible to thread c; it does **not** imply that thread a's store is visible to thread d. Likewise, when thread b's store is visible to thread d, that does **not** imply it is visible to thread c. The following can therefore happen (see the sketch after this list):

* thread a's store happens-before thread c's load, but does **not** happen-before thread d's load
* thread b's store happens-before thread d's load, but does **not** happen-before thread c's load
* thread c sees the state `{x = true, y = false}` while, at the same time, thread d sees `{x = false, y = true}`
* in that case neither thread increments `z`, so `z == 0` and `assert(z.load() != 0)` fails
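For contrast, here is a sketch of the same experiment written in C11 with only acquire/release ordering; it is my own illustration based on the cppreference example, with thread and variable names mirroring the C++ code above. Under this ordering the outcome `z == 0` described above is permitted:

```cpp
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool x, y;
static atomic_int z;

static void *write_x(void *arg)
{
    (void) arg;
    atomic_store_explicit(&x, true, memory_order_release);
    return NULL;
}

static void *write_y(void *arg)
{
    (void) arg;
    atomic_store_explicit(&y, true, memory_order_release);
    return NULL;
}

static void *read_x_then_y(void *arg)
{
    (void) arg;
    while (!atomic_load_explicit(&x, memory_order_acquire))
        ;
    if (atomic_load_explicit(&y, memory_order_acquire))
        atomic_fetch_add(&z, 1);
    return NULL;
}

static void *read_y_then_x(void *arg)
{
    (void) arg;
    while (!atomic_load_explicit(&y, memory_order_acquire))
        ;
    if (atomic_load_explicit(&x, memory_order_acquire))
        atomic_fetch_add(&z, 1);
    return NULL;
}

int main(void)
{
    pthread_t t[4];
    void *(*fn[4])(void *) = {write_x, write_y, read_x_then_y, read_y_then_x};

    for (int i = 0; i < 4; i++)
        pthread_create(&t[i], NULL, fn[i], NULL);
    for (int i = 0; i < 4; i++)
        pthread_join(t[i], NULL);

    /* With acquire/release only, z == 0 is an allowed outcome, so unlike the
     * seq_cst version there is no assert(z != 0) here. */
    printf("z = %d\n", atomic_load(&z));
    return 0;
}
```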
---

Back to the code from the quiz:

```cpp=
/* reader */
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
    const uintptr_t nullptr = 0;

    while (1) {
        uintptr_t val = atomic_load(prot_ptr, __ATOMIC_ACQUIRE);

        hp_t *node = list_insert_or_append(&dom->pointers, val);
        if (!node)
            return 0;

        /* Hazard pointer inserted successfully */
        if (atomic_load(prot_ptr, __ATOMIC_ACQUIRE) == val)
            return val;
        ...
    }
}

/* writer */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
    const uintptr_t old_obj =
        atomic_exchange(prot_ptr, new_val, __ATOMIC_ACQ_REL);
    cleanup_ptr(dom, old_obj, flags);
}

static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);
        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             DEFER_DEALLOC);
        print_config("updated config ", new_config);
    }

    return NULL;
}
```

For the reader's load of `prot_ptr`, it seems sufficient to guarantee that the writer's store (the `atomic_exchange` in `swap`) together with the instructions before it (the assignments to `new_config` in `writer_thread`) happens-before the reader's load.

Reading my classmate [kdnvt](https://hackmd.io/@kdnvt/linux2022-quiz5#Memory-barrier)'s notes, I found a problem I had not noticed:

```shell
          reader                 |             writer
-----------------------------------------------------------
                                 |   /* store on ptr */
  /* invisible to                |   val = atomic_exchange(ptr,new);
     reader */                   |   /* still in write buffer */
                                 |
                                 |   /* load old */
                                 |   if(!list_contain(val)){
  /* store on list */            |
  list_insert(val);              |
                                 |
  /* load on ptr                 |
     ptr = old */                |
  if(atomic_load(ptr) == val){   |
      return val;                |
  }                              |
                                 |
  /* visible to                  |       // ptr = new;
     reader now */               |       free(val);
                                 |   }
```

With `memory_order_acq_rel`, when the writer executes `val = atomic_exchange(ptr, new)`, the effect of `ptr = new` may still sit in the hardware [store buffer](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-atomics#Store-Buffer). If execution then proceeds in the order shown above, the reader's second `atomic_load(ptr)` still observes `old`, so `load` returns `val == old`. Only afterwards does the CPU running the writer broadcast the invalidation of `ptr`, which is too late: when the writer reaches `free(val)`, it frees the very object the reader has just been handed.
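One way to close this window, sketched here as the standard hazard-pointer fence discipline rather than the quiz's official fix, is to keep the `prot_ptr` accesses sequentially consistent, or equivalently to insert a full fence between the hazard-pointer publication and the validating re-load on the reader side, and between the pointer exchange and the hazard scan on the writer side. A self-contained illustration with a single hazard slot:

```cpp
#include <stdint.h>
#include <stdlib.h>

/* Minimal illustration (my own sketch, not the quiz code): one protected
 * pointer and one hazard slot. The __atomic_thread_fence(__ATOMIC_SEQ_CST)
 * calls provide the store-load ordering that acquire/release alone cannot. */
static int *shared_ptr;  /* protected pointer */
static int *hazard_slot; /* single hazard pointer */

/* reader: publish the hazard pointer, then re-validate */
int *reader_acquire(void)
{
    for (;;) {
        int *p = __atomic_load_n(&shared_ptr, __ATOMIC_ACQUIRE);
        __atomic_store_n(&hazard_slot, p, __ATOMIC_RELEASE);
        /* full barrier: the hazard publication must be globally visible
         * before shared_ptr is read again */
        __atomic_thread_fence(__ATOMIC_SEQ_CST);
        if (__atomic_load_n(&shared_ptr, __ATOMIC_ACQUIRE) == p)
            return p; /* safe to use until hazard_slot is cleared */
        __atomic_store_n(&hazard_slot, NULL, __ATOMIC_RELEASE);
    }
}

/* writer: publish the new object, then scan the hazard slot */
void writer_replace(int *new_obj)
{
    int *old = __atomic_exchange_n(&shared_ptr, new_obj, __ATOMIC_ACQ_REL);
    /* full barrier: the exchange must be globally visible before deciding
     * that no reader holds a hazard pointer to old */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    while (__atomic_load_n(&hazard_slot, __ATOMIC_ACQUIRE) == old)
        ; /* a reader still holds it */
    free(old);
}
```

This is also consistent with the original quiz code keeping the `prot_ptr` operations at the default `memory_order_seq_cst`.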
I believe kdnvt's analysis is correct, but when I made the change to `memory_order_acq_rel` myself in [commit d91dab](https://github.com/bakudr18/hazard_pointer/commit/d91dabcfdea69bc9a1cfdd505eeb4b37e6dcde86), ThreadSanitizer reported nothing, and creating more writer/reader threads as in [commit 0e9baf](https://github.com/bakudr18/hazard_pointer/commit/0e9bafc461e3b24584a40538b3d44114c530a87b) did not fail either. Possibly the CPU simply never happened to execute in that order.

Following the instructor's hint, I tried a different environment and tested on a Raspberry Pi 3B+:

```shell
ubuntu@ubuntu:~$ uname -a
Linux ubuntu 5.13.0-1024-raspi #26-Ubuntu SMP PREEMPT Fri Mar 25 10:54:36 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux
```

Running [commit 0e9baf](https://github.com/bakudr18/hazard_pointer/commit/0e9bafc461e3b24584a40538b3d44114c530a87b) there did not fail either, so I started simplifying the code to make the test more focused, hoping that less code would also raise the probability of memory reordering. The complete code is at [hp_ordering](https://github.com/bakudr18/hp_ordering); an excerpt follows:

```cpp
/* Only accept one reader */
static void *reader_thread(void *arg)
{
    (void) arg;
    const uintptr_t nullptr = 0;
    pthread_barrier_wait(&barr);
    for (int i = 0; i < N_ITERS; i++) {
        while (1) {
            uintptr_t val =
                (uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
            __atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
            if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
                (config_t *) val) {
                print_config("read config ", (config_t *) val);
                break;
            }
            __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
        }
    }
    __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;
    pthread_barrier_wait(&barr);
    for (int i = 0; i < N_ITERS; i++) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        const uintptr_t old_obj = (uintptr_t) __atomic_exchange_n(
            &shared_config, new_config, __ATOMIC_ACQ_REL);
        while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) == old_obj)
            ;
        delete_config((void *) old_obj);
    }
    return NULL;
}
```

First, both the writer and the reader call [pthread_barrier_wait](https://linux.die.net/man/3/pthread_barrier_wait) at the start, which makes the threads wait at the same point before any of them is allowed to continue; this raises the chance that they actually run concurrently. In the reader I replaced the original hazard pointer structure with a single `hp_ptr`, allowing only one reader, in order to reduce the number of instructions between the two loads of `shared_config` and hopefully increase the chance of reordering. The writer is simplified as far as possible as well, but multiple writers are still allowed to run at the same time.
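For reference, the globals this simplified test relies on (`barr`, `hp_ptr`, `shared_config`, `config_t`, ...) are defined elsewhere in the hp_ordering repository; a hypothetical reconstruction with placeholder values might look like this:

```cpp
#include <pthread.h>
#include <stdint.h>

/* Hypothetical reconstruction; the actual definitions live in the hp_ordering
 * repository and may differ. The values below are placeholders. */
#define N_ITERS 100000
#define ERR_FATAL (-1)

typedef struct {
    int v1;
} config_t;

static config_t *shared_config; /* the protected pointer */
static uintptr_t hp_ptr;        /* the single hazard-pointer slot */
static pthread_barrier_t barr;

/* In main(), before creating the threads, the barrier would be initialized
 * with the total number of participating threads, e.g.
 *     pthread_barrier_init(&barr, NULL, 1 + number_of_writers);
 */
```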
However, the result was again not what I expected: ThreadSanitizer did not flag a data race. Considering that ThreadSanitizer might simply be unable to detect this kind of data race, I further modified the test in [commit 137592](https://github.com/bakudr18/hp_ordering/commit/137592f9f852f93a27b87fc1d2344ad15bc868ab) to detect it with an `assert`, but on the RPi 3 it still never failed.

```cpp
/* Only accept one reader */
static void *reader_thread(void *arg)
{
    (void) arg;
    const uintptr_t nullptr = 0;
    pthread_barrier_wait(&barr);
    for (int i = 0; i < N_ITERS; i++) {
        while (1) {
            uintptr_t val =
                (uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
            __atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
            if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
                (config_t *) val) {
                assert(((config_t *) val)->v1 != ERR_FATAL);
                break;
            }
            __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
        }
    }
    __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;
    pthread_barrier_wait(&barr);
    for (int i = 0; i < N_ITERS; i++) {
        config_t *new_config = create_config();
        new_config->v1 = rand() & N_ITERS;
        config_t *old_obj =
            __atomic_exchange_n(&shared_config, new_config, __ATOMIC_ACQ_REL);
        while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) ==
               (uintptr_t) old_obj)
            ;
        // old_obj is retired
        old_obj->v1 = ERR_FATAL;
    }
    return NULL;
}
```

### Redoing the experiment

Since I had doubts about the way I was experimenting with atomic operations, I decided to start over with a simpler experiment. Consider the following example, common when learning atomic operations:

```cpp
/* X and published are both initialized to zero */

/* thread A */
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);

/* thread B */
if (atomic_load_explicit(&published, memory_order_relaxed) == 1) {
    assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}
```

`memory_order_relaxed` only guarantees the atomicity of the operation itself and lets the compiler and the processor reorder memory accesses freely, so the stores to `X` and `published` in thread A **may** be reordered. Note the word *may*: it does not mean the reordering will actually happen at run time. For the actual experiment, I imitated [Memory Reordering Caught in the Act](https://preshing.com/20120515/memory-reordering-caught-in-the-act/) and used semaphores so that thread A and thread B keep resetting `X` and `published` to their initial state and rerunning. The complete code is below; on both an Intel Core i7-1165G7 and a Raspberry Pi 3B+ the `assert` never fired.

```cpp
#include <stdio.h>
#include <assert.h>
#include <err.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <semaphore.h>

static atomic_uint published;
static atomic_uint X;
static sem_t begin_sem1, begin_sem2, end_sem;

void *storer(void *arg)
{
    for (;;) {
        sem_wait(&begin_sem1);
        atomic_store_explicit(&X, 1, memory_order_relaxed);
        atomic_store_explicit(&published, 1, memory_order_relaxed);
        sem_post(&end_sem);
    }
    return NULL;
}

void *loader(void *arg)
{
    for (;;) {
        sem_wait(&begin_sem2);
        if (atomic_load_explicit(&published, memory_order_relaxed) == 1) {
            assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
        }
        sem_post(&end_sem);
    }
    return NULL;
}

int main()
{
    pthread_t lthread, sthread;
    sem_init(&begin_sem1, 0, 0);
    sem_init(&begin_sem2, 0, 0);
    sem_init(&end_sem, 0, 0);

    if (pthread_create(&lthread, NULL, loader, NULL))
        warn("pthread_create");
    if (pthread_create(&sthread, NULL, storer, NULL))
        warn("pthread_create");

    for (;;) {
        atomic_store_explicit(&published, 0, memory_order_relaxed);
        atomic_store_explicit(&X, 0, memory_order_relaxed);
        sem_post(&begin_sem1);
        sem_post(&begin_sem2);
        sem_wait(&end_sem);
        sem_wait(&end_sem);
    }

    if (pthread_join(lthread, NULL))
        warn("pthread_join");
    if (pthread_join(sthread, NULL))
        warn("pthread_join");
    return 0;
}
```

The explanation I can come up with so far is that when a CPU core's pipeline does not have many instructions to execute, i.e. it is not busy enough, it does not necessarily attempt memory reordering. The simple countermeasure is to increase the number of instructions that are allowed to be reordered, for example by adding two more variables `Y` and `Z` that play the same role as `X`. Rerunning the experiment on the RPi 3 with this change did trigger the `assert`.

```diff
 void *storer(void *arg)
 {
     for (;;) {
         sem_wait(&begin_sem1);
         atomic_store_explicit(&X, 1, memory_order_relaxed);
+        atomic_store_explicit(&Y, 2, memory_order_relaxed);
+        atomic_store_explicit(&Z, 3, memory_order_relaxed);
         atomic_store_explicit(&published, 1, memory_order_relaxed);
         sem_post(&end_sem);
     }
     return NULL;
 }

 void *loader(void *arg)
 {
     for (;;) {
         sem_wait(&begin_sem2);
         if (atomic_load_explicit(&published, memory_order_relaxed) == 1) {
             assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
+            assert(atomic_load_explicit(&Y, memory_order_relaxed) == 2);
+            assert(atomic_load_explicit(&Z, memory_order_relaxed) == 3);
         }
         sem_post(&end_sem);
     }
     return NULL;
 }
```

Verifying the hazard pointer code with the same method, as in [commit 241b59](https://github.com/bakudr18/hp_ordering/commit/241b59c7bac033c20442a2ca4ee870a2d11f24b1), also triggered the following `assert` on the Intel Core i7-1165G7 (on the RPi 3 the process gets killed because it runs out of resources):

```shell
hp: main.c:79: reader_thread: Assertion `((config_t *) val)->v1 != ERR_FATAL' failed.
Aborted (core dumped)
```

The lesson from all of this is that validating memory ordering is genuinely hard: even when the reasoning is wrong, the hardware may happen not to reorder the instructions, and the run still succeeds. Besides simplifying the code, it may also be necessary to use verification tools such as [litmus7](http://diy.inria.fr/doc/litmus.html), mentioned in [I/O ordering study notes](https://hackmd.io/@butastur/rkkQEGjUN) and [Axiomatic validation of memory barriers and atomic instructions](https://lwn.net/Articles/608550/), to fully verify that an idea is correct.

### Multiple writers

If the original code runs with two or more writers, ThreadSanitizer detects a data race at run time:

```shell
==================
WARNING: ThreadSanitizer: data race (pid=40244)
  Write of size 8 at 0x7b0400000008 by thread T5:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 delete_config <null> (hp+0x15be)
    #2 writer_thread <null> (hp+0x1899)

  Previous read of size 4 at 0x7b0400000008 by thread T4:
    #0 writer_thread <null> (hp+0x18a1)

  Thread T5 (tid=40250, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
    #1 main <null> (hp+0x135a)

  Thread T4 (tid=40249, finished) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
    #1 main <null> (hp+0x133a)

SUMMARY: ThreadSanitizer: data race (/home/bakud/linux2022/hazard_pointer/hp+0x15be) in delete_config
==================
```

The data race arises because, after a writer has completed `swap`, it still reads `new_config` (in the second `print_config`), and at that very moment another writer may run `delete_config` and free it, producing a wrong result.

The fix, as in [commit a355f3](https://github.com/bakudr18/hazard_pointer/commit/a355f33724e2b239b37e978a3b55d778a321f902), is to add `new_config` to the hazard pointer list before the writer calls `swap`, and to actively remove that hazard pointer once the writer has finished using `new_config`; in effect, the writer also acts as a reader of its own new object.
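A sketch of what the fixed writer might look like based on that description, my reconstruction rather than the actual patch; it assumes the list helpers are visible to the test code, whereas the real commit may expose this through a different API:

```cpp
/* Sketch of the idea behind commit a355f3 (hypothetical reconstruction): the
 * writer protects new_config with a hazard pointer of its own before
 * publishing it, so a concurrent writer cannot free it while this writer is
 * still reading it in the second print_config(). */
static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);

        /* act as a reader of new_config: publish a hazard pointer first
         * (error handling omitted in this sketch) */
        list_insert_or_append(&config_dom->pointers, (uintptr_t) new_config);

        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             DEFER_DEALLOC);
        print_config("updated config ", new_config);

        /* done with new_config: remove the writer's own hazard pointer */
        drop(config_dom, (uintptr_t) new_config);
    }

    return NULL;
}
```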