---
tags: linux2022
---
# 2022q1 Homework5 (quiz5)
Contributed by < `bakudr18` >
> [Quiz problems](https://hackmd.io/@sysprog/linux2022-quiz5)
## Problem `2`
A lock-free data structure must guard against the [ABA problem](https://en.wikipedia.org/wiki/ABA_problem). For example, given a linked list $a\rightarrow b\rightarrow c$, suppose one thread is about to perform `compare_and_swap(&head, new = 'b', expected = 'a')` on the list head. If another thread meanwhile pops `'a'` and `'b'` in succession and then pushes `'a'` back, the list becomes $a\rightarrow c$, yet the CAS still succeeds, leaving `head` pointing at the already-freed `'b'`.
[Hazard pointers](https://en.wikipedia.org/wiki/Hazard_pointer) are one way to solve the ABA problem: they prevent an object from being freed while some reader is still accessing it. The idea is that before a reader accesses an object, it publishes a hazard pointer to that object, and it removes the hazard pointer only once it is done. When a writer wants to reclaim an object and finds it among the hazard pointers, it must not free it directly. It then has two options: either wait until the readers remove the object's hazard pointers and only then free it, or put the object on a retire list and, on a later pass, free any retired object that is no longer referenced by any hazard pointer.
![](https://i.imgur.com/gXoizWa.png)
Now the code:
```cpp
typedef struct __hp {
uintptr_t ptr;
struct __hp *next;
} hp_t;
typedef struct {
hp_t *pointers; /* hazard pointer structure */
hp_t *retired; /* retire list structure */
void (*deallocator)(void *);
} domain_t;
static domain_t *config_dom;
```
`config_dom` is the structure that manages the hazard pointers and the retire list. `pointers` and `retired` are both linked lists, and `deallocator` is a function pointer used to release resources.
```cpp=
/*
* Load a safe pointer to a shared object. This pointer must be passed to
* `drop` once it is no longer needed. Returns 0 (NULL) on error.
*/
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr) == val)
return val;
/*
* This pointer is being retired by another thread - remove this hazard
* pointer and try again. We first try to remove the hazard pointer we
* just used. If someone else used it to drop the same pointer, we walk
* the list.
*/
uintptr_t tmp = val;
if (!atomic_cas(&node->ptr, &tmp, &nullptr))
list_remove(&dom->pointers, val);
}
}
/*
* Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
* come from `load`
*/
void drop(domain_t *dom, uintptr_t safe_val)
{
if (!list_remove(&dom->pointers, safe_val))
__builtin_unreachable();
}
```
Let's start with the two reader-side functions above:
* `load` reads the address `val` that `prot_ptr` points to and publishes `val` as a hazard pointer. Note that while `list_insert_or_append` runs, the object at `val` may already have been swapped out, so `load` must re-read `prot_ptr` and confirm it still points to the same address before the read counts as successful; otherwise it removes the hazard pointer and retries with a freshly loaded `val`.
* Once a reader has finished with the data, it must call `drop`, which removes `safe_val` from the hazard-pointer list. Only at that point is the read complete and a writer allowed to free the object directly.
```cpp
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
dom->deallocator((void *) ptr);
} else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
list_insert_or_append(&dom->retired, ptr);
} else { /* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
dom->deallocator((void *) ptr);
}
}
/* Swaps the contents of a shared pointer with a new pointer. The old value will
* be deallocated by calling the `deallocator` function for the domain, provided
* when `domain_new` was called. If `flags` is 0, this function will wait
* until no more references to the old object are held in order to deallocate
* it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
* if there are already no references to it; otherwise the cleanup will be done
* the next time `cleanup` is called.
*/
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
cleanup_ptr(dom, old_obj, flags);
}
```
Next, the writer-side functions:
* `swap` points `prot_ptr` at `new_val` and then tries to release `old_obj`.
* `cleanup_ptr` tries to free `old_obj`. As described earlier, if `old_obj` is still among the hazard pointers there are two options: with `flags == DEFER_DEALLOC`, put `old_obj` on the retire list to be freed on a later pass; otherwise busy-wait until the readers have removed `old_obj` from the hazard pointers, then free it.
```cpp
#define N_READERS 10
#define N_WRITERS 1
#define N_ITERS 20
static void *reader_thread(void *arg)
{
(void) arg;
struct timespec t1 = {.tv_sec = 0, .tv_nsec = 100};
for (int i = 0; i < N_ITERS; ++i) {
config_t *safe_config =
(config_t *) load(config_dom, (uintptr_t *) &shared_config);
if (!safe_config)
err(EXIT_FAILURE, "load");
print_config("read config ", safe_config);
drop(config_dom, (uintptr_t) safe_config);
nanosleep(&t1, NULL);
}
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
DEFER_DEALLOC);
print_config("updated config ", new_config);
}
return NULL;
}
```
Above are the reader and writer threads. To make readers and writers interleave more readily, the reader thread calls [nanosleep](https://man7.org/linux/man-pages/man2/nanosleep.2.html) to pause for at least 100 ns after every successful read. A fragment of the output follows.
```shell
read config : { 0x00000000, 0x00000000, 0x00000000 }
read config : { 0x00000000, 0x00000000, 0x00000000 }
updating config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updated config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updating config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updated config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updating config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updating config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
```
```cpp
/* Forces the cleanup of old objects that have not been deallocated yet. Just
* like `swap`, if `flags` is 0, this function will wait until there are no
* more references to each object. If `flags` is `DEFER_DEALLOC`, only
* objects that already have no living references will be deallocated.
*/
void cleanup(domain_t *dom, int flags)
{
hp_t *node;
LIST_ITER (&dom->retired, node) {
uintptr_t ptr = node->ptr;
if (!ptr)
continue;
if (!list_contains(&dom->pointers, ptr)) {
/* We can deallocate straight away */
if (list_remove(&dom->retired, ptr))
dom->deallocator((void *) ptr);
} else if (!(flags & DEFER_DEALLOC)) {
/* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
if (list_remove(&dom->retired, ptr))
dom->deallocator((void *) ptr);
}
}
}
```
The `cleanup` function tries to free the objects sitting on the retire list, using the same checks as `cleanup_ptr`. For this I added a `cleaner_thread` that uses [timerfd](https://man7.org/linux/man-pages/man2/timerfd_create.2.html) to run `cleanup` periodically.
```cpp
static void *cleaner_thread(void *arg)
{
atomic_bool *stop = (atomic_bool *) arg;
struct itimerspec new_value;
new_value.it_value.tv_sec = 0;
new_value.it_value.tv_nsec = 1000;
new_value.it_interval.tv_sec = 1;
new_value.it_interval.tv_nsec = 0;
int fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (fd == -1)
warn("timerfd_create");
if (timerfd_settime(fd, TFD_TIMER_ABSTIME, &new_value, NULL))
warn("timerfd_settime");
while (!atomic_load (stop)) {
uint64_t exp;
ssize_t s = read(fd, &exp, sizeof(uint64_t));
if (s != sizeof(uint64_t)) {
warn("read failed");
return NULL;
}
printf("cleanup\n");
cleanup(config_dom, DEFER_DEALLOC);
}
close(fd);
return NULL;
}
int main()
{
pthread_t readers[N_READERS], writers[N_WRITERS], cleaner;
atomic_bool stop = ATOMIC_VAR_INIT(false);
init();
/* Start threads */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_create(readers + i, NULL, reader_thread, NULL))
warn("pthread_create");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_create(writers + i, NULL, writer_thread, NULL))
warn("pthread_create");
}
if (pthread_create(&cleaner, NULL, cleaner_thread, &stop))
warn("pthread_create");
/* Wait for threads to finish */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_join(readers[i], NULL))
warn("pthread_join");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_join(writers[i], NULL))
warn("pthread_join");
}
atomic_store(&stop, true);
if (pthread_join(cleaner, NULL))
warn("pthread_join");
deinit();
return EXIT_SUCCESS;
}
```
### Memory order
Background reading:
* [Concurrent programming: execution ordering](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-ordering)
* [Concurrent programming: atomics](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-atomics)
Both `load` and `swap` operate on the same shared object `prot_ptr`, and the original code performs every atomic operation on `prot_ptr` with sequentially-consistent memory order. Here I try to relax it to release-acquire ordering, but first we have to understand the difference between the two.
Start with the definition of release-acquire ordering in the [gcc docs](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html):
> **__ATOMIC_ACQUIRE:** Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.
>
> **__ATOMIC_RELEASE:** Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.
>
> **__ATOMIC_ACQ_REL:** Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE.
acquire and release must be used in pairs: an acquire load that reads the value written by a release store establishes an inter-thread happens-before relation. __ATOMIC_ACQUIRE prevents later instructions from being hoisted above the operation, while __ATOMIC_RELEASE prevents earlier instructions from being sunk below it. For example:
```cpp
#include <stdatomic.h>
#include <stdbool.h>
#include <assert.h>
/* shared variables */
atomic_bool M = false;
int a = 0;
void *thread1(void *arg)
{
    a = 1;
    atomic_store_explicit(&M, true, memory_order_release);
    return NULL;
}
void *thread2(void *arg)
{
    if (atomic_load_explicit(&M, memory_order_acquire))
        assert(a == 1);
    return NULL;
}
```
If the load of `M` reads from the store of `M` (i.e. thread2 observes `M == true`), then thread2 is guaranteed to see `a == 1`, because `a = 1;` in thread1 happens-before the store to `M`, and the release store synchronizes-with the acquire load.
Next, sequentially-consistent memory order. According to the C11 standard:
>$5.1.2.4.7: All modifications to a particular atomic object M occur in some particular total order, called the modification order of M. If A and B are modifications of an atomic object M, and A happens before B, then A shall precede B in the modification order of M.
>
>$7.17.3.6: For memory_order_seq_cst, there shall be a single total order S on all operations, consistent with the ‘‘happens before’’ order and modification orders for all affected locations, such that each memory_order_seq_cst operation that loads a value observes either the last preceding modification according to this order S, or the result of an operation that is not memory_order_seq_cst.
In other words, every `memory_order_seq_cst` operation, in whichever thread it occurs, observes the same modification order, i.e. a single total order.
The bare definitions are hard to digest. I later found an example on [cppreference](https://en.cppreference.com/w/cpp/atomic/memory_order#Sequentially-consistent_ordering) and a plainer explanation in [How std::memory_order_seq_cst works](https://stackoverflow.com/a/48984938/11425048) that together make the difference between `memory_order_seq_cst` and `memory_order_acq_rel` clear. The code follows:
```cpp
#include <thread>
#include <atomic>
#include <cassert>
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
void write_x()
{
x.store(true, std::memory_order_seq_cst);
}
void write_y()
{
y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y()
{
while (!x.load(std::memory_order_seq_cst))
;
if (y.load(std::memory_order_seq_cst)) {
++z;
}
}
void read_y_then_x()
{
while (!y.load(std::memory_order_seq_cst))
;
if (x.load(std::memory_order_seq_cst)) {
++z;
}
}
int main()
{
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join(); b.join(); c.join(); d.join();
assert(z.load() != 0); // will never happen
}
```
The code above guarantees that the final result is always `z != 0`. The reason: for the two variables `x` and `y`, suppose the modification order is `x = true;` before `y = true;`. Then at any point in time, every thread can observe only one of the following three states of `x` and `y`:
* state1: `x == false && y == false`
* state2: `x == true && y == false`
* state3: `x == true && y == true`
Thus if `x` and `y` are in state2 when thread c starts executing, `++z;` may or may not happen, because `x` and `y` may advance to state3 while thread c runs.
But at the moment state2 is **visible** to thread c, thread d can likewise only see state2, so thread d is guaranteed to execute `++z;`.
With `memory_order_acq_rel` instead, when thread a's store becomes visible to thread c, the only guarantee is that instructions preceding the store in thread a (none, in this example) are also visible to thread c; it does **not** imply that thread a's store is visible to thread d.
Likewise, when thread b's store is visible to thread d, it does **not** follow that it is visible to thread c.
So the following can happen:
* thread a happens-before thread c, but **not** thread d
* thread b happens-before thread d, but **not** thread c
* thread c sees state `{x = true, y = false}` while, at the same moment, thread d sees state `{x = false, y = true}`
* and the assertion `z != 0` fails
---
Back to the original code:
```cpp=
/* reader */
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr, __ATOMIC_ACQUIRE);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr, __ATOMIC_ACQUIRE) == val)
return val;
...
}
}
/* writer */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj =
atomic_exchange(prot_ptr, new_val, __ATOMIC_ACQ_REL);
cleanup_ptr(dom, old_obj, flags);
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
DEFER_DEALLOC);
print_config("updated config ", new_config);
}
return NULL;
}
```
For the reader's load of `prot_ptr`, it suffices to guarantee that the writer's store (the `atomic_exchange` in `swap`) and the instructions before it (the assignments to `new_config` in `writer_thread`) happen-before the reader's load.
Reading classmate [kdnvt](https://hackmd.io/@kdnvt/linux2022-quiz5#Memory-barrier)'s notes, I found a problem I had missed:
```shell=
reader | writer
-----------------------------------------------------------
| /* store on ptr */
/* invisible to | val = atomic_exchange(ptr,new);
reader */ | /* still in write buffer */
|
| /* load old */
| if(!list_contain(val)){
/* store on list */ |
list_insert(val); |
|
/* load on ptr |
ptr = old */ |
if(atomic_load(ptr) == val){ |
return val; |
} |
|
/* visible to | // ptr = new;
reader now */ | free(val);
| }
```
With `memory_order_acq_rel`, when the writer executes `val = atomic_exchange(ptr, new)`, the effect `ptr = new` may still sit in the hardware [store buffer](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2Fconcurrency-atomics#Store-Buffer). If execution then proceeds in the order shown above, the reader's `atomic_load(ptr)` still observes `old`, so `load` returns `val = old`; by the time the writer's CPU broadcasts the invalidation of `ptr` it is too late, and the writer's `free(val)` frees an object the reader has already returned.
I believe kdnvt's analysis is correct, but when I made the change myself in [commit d91dab](https://github.com/bakudr18/hazard_pointer/commit/d91dabcfdea69bc9a1cfdd505eeb4b37e6dcde86), using `memory_order_acq_rel`, ThreadSanitizer reported nothing, and creating more writers/readers as in [commit 0e9baf](https://github.com/bakudr18/hazard_pointer/commit/0e9bafc461e3b24584a40538b3d44114c530a87b) also ran without error; possibly the CPU just never happened to execute in that order.
At the instructor's suggestion I switched environments and tested on a Raspberry Pi 3B+:
```shell
ubuntu@ubuntu:~$ uname -a
Linux ubuntu 5.13.0-1024-raspi #26-Ubuntu SMP PREEMPT Fri Mar 25 10:54:36 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux
```
Running [commit 0e9baf](https://github.com/bakudr18/hazard_pointer/commit/0e9bafc461e3b24584a40538b3d44114c530a87b) there also produced no error, so I started simplifying the code to make the test more focused, hoping that fewer instructions would raise the probability of memory reordering. The full code is at [hp_ordering](https://github.com/bakudr18/hp_ordering); an excerpt follows.
```cpp
/* Only accept one reader */
static void *reader_thread(void *arg)
{
(void) arg;
const uintptr_t nullptr = 0;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
while (1) {
uintptr_t val =
(uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
__atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
(config_t *) val) {
print_config("read config ", (config_t *) val);
break;
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
}
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
config_t *new_config = create_config();
new_config->v1 = rand();
const uintptr_t old_obj = (uintptr_t) __atomic_exchange_n(
&shared_config, new_config, __ATOMIC_ACQ_REL);
while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) == old_obj)
;
delete_config((void *) old_obj);
}
return NULL;
}
```
First, both the writer and the reader call [pthread_barrier_wait](https://linux.die.net/man/3/pthread_barrier_wait) at the start; it blocks until all participating threads have reached the same point, which makes interleaved execution more likely.
In the reader I replaced the hazard-pointer structure with a single `hp_ptr` that allows only one reader, to cut down the number of instructions between the two loads of `shared_config` and thus increase the chance of reordering. The writer is simplified as much as possible too, though multiple writers are still allowed to run concurrently.
The result again defied my expectation: ThreadSanitizer reported no data race.
Considering that ThreadSanitizer might be unable to detect this kind of data race at all, I made [commit 137592](https://github.com/bakudr18/hp_ordering/commit/137592f9f852f93a27b87fc1d2344ad15bc868ab) to detect it with an `assert` instead, but on the RPi 3 it still never failed.
```cpp
/* Only accept one reader */
static void *reader_thread(void *arg)
{
(void) arg;
const uintptr_t nullptr = 0;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
while (1) {
uintptr_t val =
(uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
__atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
(config_t *) val) {
assert(((config_t *) val)->v1 != ERR_FATAL);
break;
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
}
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
config_t *new_config = create_config();
new_config->v1 = rand() & N_ITERS;
config_t *old_obj =
__atomic_exchange_n(&shared_config, new_config, __ATOMIC_ACQ_REL);
while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) ==
(uintptr_t) old_obj)
;
// old_obj is retired
old_obj->v1 = ERR_FATAL;
}
return NULL;
}
```
### Re-running the experiment
Since I had doubts about the way I was experimenting with atomic operations, I decided to start over from a simpler experiment. Consider the following example, common when first learning atomics:
```cpp
/* X and published are both initialize as zero */
/* thread A */
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
/* thread B */
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}
```
`memory_order_relaxed` guarantees only the atomicity of the operation itself and lets the compiler and processor reorder memory accesses freely, so thread A's stores to `X` and `published` **may** be reordered; note that possible does not mean it will happen on a given run. For the actual experiment I followed [Memory Reordering Caught in the Act](https://preshing.com/20120515/memory-reordering-caught-in-the-act/) and used semaphores so that threads A and B can keep resetting `X` and `published` to their initial state and re-running. The full code is below; on both an Intel Core i7-1165G7 and a Raspberry Pi 3B+ the `assert` never fired.
```cpp
#include <stdio.h>
#include <assert.h>
#include <err.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <semaphore.h>
static atomic_uint published;
static atomic_uint X;
static sem_t begin_sem1, begin_sem2, end_sem;
void *storer(void *arg){
for(;;){
sem_wait(&begin_sem1);
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
sem_post(&end_sem);
}
return NULL;
}
void *loader(void *arg){
for(;;){
sem_wait(&begin_sem2);
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}
sem_post(&end_sem);
}
return NULL;
}
int main(){
pthread_t lthread, sthread;
sem_init(&begin_sem1, 0, 0);
sem_init(&begin_sem2, 0, 0);
sem_init(&end_sem, 0, 0);
if(pthread_create(&lthread, NULL, loader, NULL))
warn("pthread_create");
if(pthread_create(&sthread, NULL, storer, NULL))
warn("pthread_create");
for(;;){
atomic_store_explicit(&published, 0, memory_order_relaxed);
atomic_store_explicit(&X, 0, memory_order_relaxed);
sem_post(&begin_sem1);
sem_post(&begin_sem2);
sem_wait(&end_sem);
sem_wait(&end_sem);
}
if(pthread_join(lthread, NULL))
warn("pthread_join");
if(pthread_join(sthread, NULL))
warn("pthread_join");
return 0;
}
```
My best explanation so far: when a CPU core's pipeline does not have many instructions to execute, i.e. it is not busy enough, it will not necessarily reorder memory accesses. The simple countermeasure is to increase the number of instructions eligible for reordering, e.g. add two more variables `Y` and `Z` that play the same role as `X`. Re-running on the RPi 3, the `assert` did fire.
```diff
void *storer(void *arg){
for(;;){
sem_wait(&begin_sem1);
atomic_store_explicit(&X, 1, memory_order_relaxed);
+ atomic_store_explicit(&Y, 2, memory_order_relaxed);
+ atomic_store_explicit(&Z, 3, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
sem_post(&end_sem);
}
return NULL;
}
void *loader(void *arg){
for(;;){
sem_wait(&begin_sem2);
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
+ assert(atomic_load_explicit(&Y, memory_order_relaxed) == 2);
+ assert(atomic_load_explicit(&Z, memory_order_relaxed) == 3);
}
sem_post(&end_sem);
}
return NULL;
}
```
Validating the hazard pointer the same way, as in [commit 241b59](https://github.com/bakudr18/hp_ordering/commit/241b59c7bac033c20442a2ca4ee870a2d11f24b1), triggered the following `assert` on the Intel Core i7-1165G7 (on the RPi 3 the process was killed for lack of memory):
```shell
hp: main.c:79: reader_thread: Assertion `((config_t *) val)->v1 != ERR_FATAL' failed.
Aborted (core dumped)
```
The takeaway is that validating memory ordering is genuinely hard: even when the reasoning is wrong, the hardware may happen not to reorder the instructions, so the run still succeeds. Besides simplifying the code, one may need verification tools such as [litmus7](http://diy.inria.fr/doc/litmus.html), mentioned in [I/O ordering study notes](https://hackmd.io/@butastur/rkkQEGjUN) and [Axiomatic validation of memory barriers and atomic instructions](https://lwn.net/Articles/608550/), to validate the reasoning thoroughly.
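As an illustration, the store-buffering scenario from the hazard-pointer discussion could be written as a litmus test in the C frontend syntax (a sketch in the herd7/litmus7 style; double-check it against the litmus7 manual before use). litmus7 then runs the test natively on the target hardware and reports whether the `exists` outcome was ever observed:
```
C SB
{}

P0(atomic_int *x, atomic_int *y)
{
    atomic_store_explicit(x, 1, memory_order_release);
    int r0 = atomic_load_explicit(y, memory_order_acquire);
}

P1(atomic_int *x, atomic_int *y)
{
    atomic_store_explicit(y, 1, memory_order_release);
    int r0 = atomic_load_explicit(x, memory_order_acquire);
}

exists (0:r0=0 /\ 1:r0=0)
```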
### Multiple writers
With two or more writers, ThreadSanitizer detects a data race in the original code at runtime:
```shell
==================
WARNING: ThreadSanitizer: data race (pid=40244)
Write of size 8 at 0x7b0400000008 by thread T5:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 delete_config <null> (hp+0x15be)
#2 writer_thread <null> (hp+0x1899)
Previous read of size 4 at 0x7b0400000008 by thread T4:
#0 writer_thread <null> (hp+0x18a1)
Thread T5 (tid=40250, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
#1 main <null> (hp+0x135a)
Thread T4 (tid=40249, finished) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
#1 main <null> (hp+0x133a)
SUMMARY: ThreadSanitizer: data race (/home/bakud/linux2022/hazard_pointer/hp+0x15be) in delete_config
==================
```
The data race arises because the writer reads `new_config` again after a successful `swap` (the second `print_config`), and at that moment another writer may have already released the object via `delete_config`, yielding a wrong result.
The fix, shown in [commit a355f3](https://github.com/bakudr18/hazard_pointer/commit/a355f33724e2b239b37e978a3b55d778a321f902), is to insert `new_config` into the hazard-pointer list before the writer calls `swap`, and to remove that hazard pointer once the writer is done using `new_config`; in effect the writer also treats itself as a reader.