contributed by < bakudr18 >
A lock-free data structure must watch out for the ABA problem. For example, suppose a linked list is \(a\rightarrow b\rightarrow c\) and we perform compare_and_swap(&head, new = 'b', expected = 'a') on its head. If, at that moment, another thread pops 'a' and then 'b', and afterwards pushes 'a' back, the list becomes \(a\rightarrow c\). The CAS still succeeds, so head ends up pointing to the already-freed 'b', which is an error.
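To make the failure concrete, here is a minimal sketch of a Treiber-stack-style `pop` that frees nodes as soon as they are popped; the node type and function are illustrative only and are not part of the code discussed below.

```c
#include <stdatomic.h>
#include <stddef.h>

typedef struct node {
    struct node *next;
    /* payload ... */
} node_t;

static node_t *_Atomic head;

/* Lock-free pop with no reclamation protection (sketch). */
static node_t *pop(void)
{
    node_t *old, *next;
    do {
        old = atomic_load(&head);
        if (!old)
            return NULL;
        next = old->next; /* may read from a node another thread already freed */
        /* If another thread popped `old`, freed it, and later pushed a node that
         * reuses the same address, `head` compares equal to `old` again and the
         * CAS succeeds even though `next` is stale: the ABA problem. */
    } while (!atomic_compare_exchange_weak(&head, &old, next));
    return old;
}
```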
Hazard pointers are one way to solve the ABA problem: they prevent an object from being freed while a reader is still reading it. The idea is that whenever a reader wants to read an object, it first installs a hazard pointer to that object, and only removes the hazard pointer once it is done reading. When a writer wants to remove an object and finds it among the hazard pointers, it must not free it directly. There are two options: either keep waiting until the reader removes that object's hazard pointer and only then free it, or put the object on a retire list and, on a later pass, free any object on the retire list that is no longer referenced by any hazard pointer.
Now let's look at the code:
typedef struct __hp {
uintptr_t ptr;
struct __hp *next;
} hp_t;
typedef struct {
hp_t *pointers; /* hazard pointer structure */
hp_t *retired; /* retire list structure */
void (*deallocator)(void *);
} domain_t;
static domain_t *config_dom;
`config_dom` is the structure that manages the hazard pointers and the retire list: `pointers` and `retired` are both linked lists, and `deallocator` is a function pointer used to free retired objects.
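For context, the domain is presumably set up once at start-up along the following lines; this is a sketch only, and the constructor name `domain_new` and its exact shape are assumptions, not necessarily the project's code.

```c
#include <stdlib.h>

/* Hypothetical constructor for domain_t (sketch). */
static domain_t *domain_new(void (*deallocator)(void *))
{
    domain_t *dom = calloc(1, sizeof(*dom)); /* pointers and retired start empty */
    if (!dom)
        return NULL;
    dom->deallocator = deallocator;          /* called to free retired objects */
    return dom;
}

/* e.g. in init():  config_dom = domain_new(free); */
```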
/*
* Load a safe pointer to a shared object. This pointer must be passed to
* `drop` once it is no longer needed. Returns 0 (NULL) on error.
*/
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr) == val)
return val;
/*
* This pointer is being retired by another thread - remove this hazard
* pointer and try again. We first try to remove the hazard pointer we
* just used. If someone else used it to drop the same pointer, we walk
* the list.
*/
uintptr_t tmp = val;
if (!atomic_cas(&node->ptr, &tmp, &nullptr))
list_remove(&dom->pointers, val);
}
}
/*
* Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
* come from `load`
*/
void drop(domain_t *dom, uintptr_t safe_val)
{
if (!list_remove(&dom->pointers, safe_val))
__builtin_unreachable();
}
The two functions above make up the reader side. `load` reads the address `val` that `prot_ptr` currently points to and adds it to the hazard pointer list. Note the second `atomic_load` of `prot_ptr`: because the object may be retired while `list_insert_or_append` is running, `prot_ptr` must be re-read to confirm it still points to the same address before the read counts as successful; otherwise the hazard pointer has to be removed and a fresh `val` loaded. `drop` removes `safe_val` from the hazard pointer list; only then has the reader finished reading, and only then may a writer free the object directly.
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
dom->deallocator((void *) ptr);
} else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
list_insert_or_append(&dom->retired, ptr);
} else { /* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
dom->deallocator((void *) ptr);
}
}
/* Swaps the contents of a shared pointer with a new pointer. The old value will
* be deallocated by calling the `deallocator` function for the domain, provided
* when `domain_new` was called. If `flags` is 0, this function will wait
* until no more references to the old object are held in order to deallocate
* it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
* if there are already no references to it; otherwise the cleanup will be done
* the next time `cleanup` is called.
*/
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
cleanup_ptr(dom, old_obj, flags);
}
Next, the writer-side functions: `swap` points `prot_ptr` at `new_val` and then tries to free `old_obj`. `cleanup_ptr` attempts to release `old_obj`; as described earlier, when `old_obj` is still found among the hazard pointers there are two options. If `flags == DEFER_DEALLOC`, `old_obj` is put on the retire list to be released on a later pass; otherwise the writer busy-waits until the readers have removed `old_obj` from the hazard pointers and then frees it.
#define N_READERS 10
#define N_WRITERS 1
#define N_ITERS 20
static void *reader_thread(void *arg)
{
(void) arg;
struct timespec t1 = {.tv_sec = 0, .tv_nsec = 100};
for (int i = 0; i < N_ITERS; ++i) {
config_t *safe_config =
(config_t *) load(config_dom, (uintptr_t *) &shared_config);
if (!safe_config)
err(EXIT_FAILURE, "load");
print_config("read config ", safe_config);
drop(config_dom, (uintptr_t) safe_config);
nanosleep(&t1, NULL);
}
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
DEFER_DEALLOC);
print_config("updated config ", new_config);
}
return NULL;
}
Above are the reader thread and the writer thread. To make it easier for readers and writers to interleave, the reader thread calls nanosleep after every successful read to pause the thread for at least 100 ns. An excerpt of the output:
read config : { 0x00000000, 0x00000000, 0x00000000 }
read config : { 0x00000000, 0x00000000, 0x00000000 }
updating config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updated config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updating config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updated config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updating config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updating config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
read config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
/* Forces the cleanup of old objects that have not been deallocated yet. Just
* like `swap`, if `flags` is 0, this function will wait until there are no
* more references to each object. If `flags` is `DEFER_DEALLOC`, only
* objects that already have no living references will be deallocated.
*/
void cleanup(domain_t *dom, int flags)
{
hp_t *node;
LIST_ITER (&dom->retired, node) {
uintptr_t ptr = node->ptr;
if (!ptr)
continue;
if (!list_contains(&dom->pointers, ptr)) {
/* We can deallocate straight away */
if (list_remove(&dom->retired, ptr))
dom->deallocator((void *) ptr);
} else if (!(flags & DEFER_DEALLOC)) {
/* Spin until all readers are done, then deallocate */
while (list_contains(&dom->pointers, ptr))
;
if (list_remove(&dom->retired, ptr))
dom->deallocator((void *) ptr);
}
}
}
The `cleanup` function tries to free the objects sitting on the retire list, using the same check as `cleanup_ptr`. For this I added a separate `cleaner_thread` that uses timerfd to run `cleanup` periodically and release resources.
static void *cleaner_thread(void *arg)
{
atomic_bool *stop = (atomic_bool *) arg;
struct itimerspec new_value;
new_value.it_value.tv_sec = 0;
new_value.it_value.tv_nsec = 1000;
new_value.it_interval.tv_sec = 1;
new_value.it_interval.tv_nsec = 0;
int fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (fd == -1)
warn("timerfd_create");
if (timerfd_settime(fd, TFD_TIMER_ABSTIME, &new_value, NULL))
warn("timerfd_settime");
while (!atomic_load (stop)) {
uint64_t exp;
ssize_t s = read(fd, &exp, sizeof(uint64_t));
if (s != sizeof(uint64_t)) {
warn("read failed");
return NULL;
}
printf("cleanup\n");
cleanup(config_dom, DEFER_DEALLOC);
}
close(fd);
return NULL;
}
int main()
{
pthread_t readers[N_READERS], writers[N_WRITERS], cleaner;
atomic_bool stop = ATOMIC_VAR_INIT(false);
init();
/* Start threads */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_create(readers + i, NULL, reader_thread, NULL))
warn("pthread_create");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_create(writers + i, NULL, writer_thread, NULL))
warn("pthread_create");
}
if (pthread_create(&cleaner, NULL, cleaner_thread, &stop))
warn("pthread_create");
/* Wait for threads to finish */
for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
if (pthread_join(readers[i], NULL))
warn("pthread_join");
}
for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
if (pthread_join(writers[i], NULL))
warn("pthread_join");
}
atomic_store(&stop, true);
if (pthread_join(cleaner, NULL))
warn("pthread_join");
deinit();
return EXIT_SUCCESS;
}
Preliminaries: `load` and `swap` both operate on the same shared object `prot_ptr`, and the original code performs every atomic operation on `prot_ptr` with sequentially-consistent memory order. Here I try to switch to release-acquire ordering, but first the difference between the two has to be understood.
First, here is how the GCC docs define the release-acquire memory orders:
__ATOMIC_ACQUIRE: Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.
__ATOMIC_RELEASE: Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.
__ATOMIC_ACQ_REL: Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE.
Acquire and release must be used as a pair; they establish a happens-before relation between different threads. __ATOMIC_ACQUIRE prevents later instructions from being reordered before the operation, while __ATOMIC_RELEASE prevents earlier instructions from being reordered after it. For example:
/* shared variable */
atomic_bool M = false;
int a = 0;
void *thread1() {
a = 1;
atomic_store_explicit(&M, true, memory_order_release);
}
void *thread2() {
if (atomic_load_explicit(&M, memory_order_acquire)) {
assert(a == 1);
}
}
If the release store to `M` synchronizes with the acquire load of `M` (i.e. thread2 observes `M == true`), then `a == 1` is guaranteed in thread2, because `a = 1;` in thread1 happens-before the store to `M`, which in turn happens-before everything after the load in thread2.
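For completeness, here is a runnable version of the snippet above; the pthread harness and the build line are my own additions, not part of the original example.

```c
/* build: gcc -std=c11 -pthread mp.c */
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool M = false;
static int a = 0;

static void *thread1(void *arg)
{
    (void) arg;
    a = 1;
    atomic_store_explicit(&M, true, memory_order_release);
    return NULL;
}

static void *thread2(void *arg)
{
    (void) arg;
    /* If the acquire load observes the release store, the write a = 1 is visible. */
    if (atomic_load_explicit(&M, memory_order_acquire))
        assert(a == 1);
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, thread1, NULL);
    pthread_create(&t2, NULL, thread2, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}
```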
Next, sequentially-consistent memory order. According to the C11 standard:
§5.1.2.4 ¶7: All modifications to a particular atomic object M occur in some particular total order, called the modification order of M. If A and B are modifications of an atomic object M, and A happens before B, then A shall precede B in the modification order of M.
§7.17.3 ¶6: For memory_order_seq_cst, there shall be a single total order S on all operations, consistent with the ‘‘happens before’’ order and modification orders for all affected locations, such that each memory_order_seq_cst operation that loads a value observes either the last preceding modification according to this order S, or the result of an operation that is not memory_order_seq_cst.
In other words, every `memory_order_seq_cst` operation, no matter which thread it runs in, is guaranteed to observe the same modification order, i.e. a single total order.
The definition alone is hard to digest. I later found an example on cppreference, and the article How std::memory_order_seq_cst works gives a plainer explanation; together they make the difference between `memory_order_seq_cst` and `memory_order_acq_rel` clear. The code is as follows:
#include <thread>
#include <atomic>
#include <cassert>
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
void write_x()
{
x.store(true, std::memory_order_seq_cst);
}
void write_y()
{
y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y()
{
while (!x.load(std::memory_order_seq_cst))
;
if (y.load(std::memory_order_seq_cst)) {
++z;
}
}
void read_y_then_x()
{
while (!y.load(std::memory_order_seq_cst))
;
if (x.load(std::memory_order_seq_cst)) {
++z;
}
}
int main()
{
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join(); b.join(); c.join(); d.join();
assert(z.load() != 0); // will never happen
}
The code above guarantees that the final result is always `z != 0`. The reason: for the two variables `x` and `y`, suppose their modification order is `x = true;` first and `y = true;` afterwards. Then at any point in time, every thread can only observe one of the following three states of `x` and `y`:
1. `x == false && y == false`
2. `x == true && y == false`
3. `x == true && y == true`
Therefore, if `x` and `y` are in state 2 when thread c starts running, the `++z;` in thread c may or may not execute, because `x` and `y` may move to state 3 while thread c runs. But when state 2 is visible to thread c, thread d at that same moment can also only see state 2; thread d therefore keeps spinning until `y` becomes true (state 3), at which point it must also see `x == true`, so thread d is guaranteed to execute `++z;`.
However, if `memory_order_acq_rel` is used instead, then when thread a's store becomes visible to thread c, the only guarantee is that the instructions before that store (none, in this example) are also visible to thread c; it does not mean thread a's store is visible to thread d as well. Likewise, when thread b's store is visible to thread d, that does not mean it is also visible to thread c.
So the following can happen: thread c sees the state {x = true, y = false} while thread d sees the state {x = false, y = true}, and the assertion `assert(z != 0)` at the end can fail.
Back to the code from the original problem:
/* reader */
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
const uintptr_t nullptr = 0;
while (1) {
uintptr_t val = atomic_load(prot_ptr, __ATOMIC_ACQUIRE);
hp_t *node = list_insert_or_append(&dom->pointers, val);
if (!node)
return 0;
/* Hazard pointer inserted successfully */
if (atomic_load(prot_ptr, __ATOMIC_ACQUIRE) == val)
return val;
...
}
}
/* writer */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
const uintptr_t old_obj =
atomic_exchange(prot_ptr, new_val, __ATOMIC_ACQ_REL);
cleanup_ptr(dom, old_obj, flags);
}
static void *writer_thread(void *arg)
{
(void) arg;
for (int i = 0; i < N_ITERS / 2; ++i) {
config_t *new_config = create_config();
new_config->v1 = rand();
new_config->v2 = rand();
new_config->v3 = rand();
print_config("updating config", new_config);
swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
DEFER_DEALLOC);
print_config("updated config ", new_config);
}
return NULL;
}
For the reader's load of `prot_ptr`, it is enough to guarantee that the writer's store (the `atomic_exchange` in `swap`) and the instructions before it (the assignments to `new_config` in `writer_thread`) happen-before the reader's load.
Reading classmate kdnvt's notes, I found a problem I had not noticed:
reader | writer
-----------------------------------------------------------
| /* store on ptr */
/* invisible to | val = atomic_exchange(ptr,new);
reader */ | /* still in write buffer */
|
| /* load old */
| if(!list_contain(val)){
/* store on list */ |
list_insert(val); |
|
/* load on ptr |
ptr = old */ |
if(atomic_load(ptr) == val){ |
return val; |
} |
|
/* visible to | // ptr = new;
reader now */ | free(val);
| }
With `memory_order_acq_rel`, when the writer executes `val = atomic_exchange(ptr, new)`, the effect `ptr = new` can sit in the CPU's store buffer. If execution then proceeds in the order shown above, the reader's `atomic_load(ptr)` still reads the old value, so `load` returns the old `val`. Only afterwards does the writer's CPU broadcast the invalidation of `ptr`, which is too late: the writer's `free(val)` then frees the value the reader is still using.
I believe kdnvt is right. However, when I made the change myself in commit d91dab to use `memory_order_acq_rel`, ThreadSanitizer reported nothing, and creating more writers/readers as in commit 0e9baf did not fail either; possibly the CPU simply never happened to execute in that order.
Following the instructor's hint, I tried testing in a different environment: a Raspberry Pi 3B+. The environment is as follows:
ubuntu@ubuntu:~$ uname -a
Linux ubuntu 5.13.0-1024-raspi #26-Ubuntu SMP PREEMPT Fri Mar 25 10:54:36 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux
Running commit 0e9baf there did not fail either, so I started simplifying the code to make the test more focused, hoping that less code would also raise the probability of memory reordering. The full code is in hp_ordering; an excerpt follows:
/* Only accept one reader */
static void *reader_thread(void *arg)
{
(void) arg;
const uintptr_t nullptr = 0;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
while (1) {
uintptr_t val =
(uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
__atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
(config_t *) val) {
print_config("read config ", (config_t *) val);
break;
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
}
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
config_t *new_config = create_config();
new_config->v1 = rand();
const uintptr_t old_obj = (uintptr_t) __atomic_exchange_n(
&shared_config, new_config, __ATOMIC_ACQ_REL);
while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) == old_obj)
;
delete_config((void *) old_obj);
}
return NULL;
}
First, both the writer and the reader start with pthread_barrier_wait, which makes the threads wait until they all reach the same point before continuing; this raises the chance that the two interleave. In the reader I also replaced the original hazard pointer structure with a single `hp_ptr`, allowing only one reader, in order to reduce the number of instructions between the two loads of `shared_config` and hopefully increase the chance of reordering. The writer is likewise simplified as much as possible, though multiple writers are still allowed to run concurrently.
The result was again not what I expected: ThreadSanitizer reported no data race. Considering that ThreadSanitizer might simply be unable to detect this kind of race, I made another change in commit 137592 to detect it with an assert instead, but on the RPi 3 no error occurred either.
/* Only accept one reader */
static void *reader_thread(void *arg)
{
(void) arg;
const uintptr_t nullptr = 0;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
while (1) {
uintptr_t val =
(uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);
__atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);
if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
(config_t *) val) {
assert(((config_t *) val)->v1 != ERR_FATAL);
break;
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
}
}
__atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
return NULL;
}
static void *writer_thread(void *arg)
{
(void) arg;
pthread_barrier_wait(&barr);
for (int i = 0; i < N_ITERS; i++) {
config_t *new_config = create_config();
new_config->v1 = rand() & N_ITERS;
config_t *old_obj =
__atomic_exchange_n(&shared_config, new_config, __ATOMIC_ACQ_REL);
while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) ==
(uintptr_t) old_obj)
;
// old_obj is retired
old_obj->v1 = ERR_FATAL;
}
return NULL;
}
Since I had some doubts about how I was experimenting with atomic operations, I decided to start from a simpler experiment. Consider the following example, common when learning atomic operations:
/* X and published are both initialized to zero */
/* thread A */
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
/* thread B */
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}
`memory_order_relaxed` only guarantees the atomicity of the operation itself and leaves the compiler and processor free to reorder memory accesses, so thread A's stores to `X` and `published` may be reordered. Note that this is only a possibility; it does not mean it will necessarily happen at run time. For the actual experiment I imitated Memory Reordering Caught in the Act, using semaphores so that thread A and thread B can keep resetting `X` and `published` to their initial state and rerunning. The full code is below; on both an Intel Core i7-1165G7 and a Raspberry Pi 3 B+ the `assert` never fired.
#include <stdio.h>
#include <assert.h>
#include <err.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <semaphore.h>
static atomic_uint published;
static atomic_uint X;
static sem_t begin_sem1, begin_sem2, end_sem;
void *storer(void *arg){
for(;;){
sem_wait(&begin_sem1);
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
sem_post(&end_sem);
}
return NULL;
}
void *loader(void *arg){
for(;;){
sem_wait(&begin_sem2);
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}
sem_post(&end_sem);
}
return NULL;
}
int main(){
pthread_t lthread, sthread;
sem_init(&begin_sem1, 0, 0);
sem_init(&begin_sem2, 0, 0);
sem_init(&end_sem, 0, 0);
if(pthread_create(&lthread, NULL, loader, NULL))
warn("pthread_create");
if(pthread_create(&sthread, NULL, storer, NULL))
warn("pthread_create");
for(;;){
atomic_store_explicit(&published, 0, memory_order_relaxed);
atomic_store_explicit(&X, 0, memory_order_relaxed);
sem_post(&begin_sem1);
sem_post(&begin_sem2);
sem_wait(&end_sem);
sem_wait(&end_sem);
}
if(pthread_join(lthread, NULL))
warn("pthread_join");
if(pthread_join(sthread, NULL))
warn("pthread_join");
return 0;
}
My current guess is that when a CPU core's pipeline does not have many instructions to execute, i.e. when it is not busy enough, it may not actually attempt memory reordering. A simple idea, then, is to increase the number of instructions that are allowed to be reordered, e.g. by adding two more variables `Y` and `Z` that play the same role as `X`. Rerunning the experiment on the RPi 3, the `assert` did indeed fire.
void *storer(void *arg){
for(;;){
sem_wait(&begin_sem1);
atomic_store_explicit(&X, 1, memory_order_relaxed);
+ atomic_store_explicit(&Y, 2, memory_order_relaxed);
+ atomic_store_explicit(&Z, 3, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);
sem_post(&end_sem);
}
return NULL;
}
void *loader(void *arg){
for(;;){
sem_wait(&begin_sem2);
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
+ assert(atomic_load_explicit(&Y, memory_order_relaxed) == 2);
+ assert(atomic_load_explicit(&Z, memory_order_relaxed) == 3);
}
sem_post(&end_sem);
}
return NULL;
}
Applying the same method to the hazard pointer code, as in commit 241b59, the following assert was also triggered on the Intel Core i7-1165G7 (on the RPi 3 the process gets killed due to insufficient resources):
hp: main.c:79: reader_thread: Assertion `((config_t *) val)->v1 != ERR_FATAL' failed.
Aborted (core dumped)
From this experience it is clear that validating memory ordering is genuinely hard: even when the reasoning is wrong, the hardware may happen not to reorder the instructions, and the run still succeeds. Besides simplifying the code, verification may therefore also require tools such as litmus7, mentioned in the I/O ordering study notes and in Axiomatic validation of memory barriers and atomic instructions, to fully establish whether a concept is correct.
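As an illustration of what such a test looks like, below is a message-passing litmus test for the relaxed example above, written from memory in the C litmus format used by the herd7/litmus7 tool chain; treat the exact syntax as an approximation, only the pattern matters.

```
C MP+relaxed

{}

P0(atomic_int *x, atomic_int *published)
{
    atomic_store_explicit(x, 1, memory_order_relaxed);
    atomic_store_explicit(published, 1, memory_order_relaxed);
}

P1(atomic_int *x, atomic_int *published)
{
    int r0 = atomic_load_explicit(published, memory_order_relaxed);
    int r1 = atomic_load_explicit(x, memory_order_relaxed);
}

exists (1:r0=1 /\ 1:r1=0)
```

herd7 can check whether the `exists` outcome (seeing `published == 1` but `x == 0`) is allowed by the C11 model, and the litmus tools can run the same test repeatedly on real hardware, which is far more systematic than hand-rolled stress loops.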
If the original code is run with two or more writers, ThreadSanitizer detects a data race at run time:
==================
WARNING: ThreadSanitizer: data race (pid=40244)
Write of size 8 at 0x7b0400000008 by thread T5:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 delete_config <null> (hp+0x15be)
#2 writer_thread <null> (hp+0x1899)
Previous read of size 4 at 0x7b0400000008 by thread T4:
#0 writer_thread <null> (hp+0x18a1)
Thread T5 (tid=40250, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
#1 main <null> (hp+0x135a)
Thread T4 (tid=40249, finished) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
#1 main <null> (hp+0x133a)
SUMMARY: ThreadSanitizer: data race (/home/bakud/linux2022/hazard_pointer/hp+0x15be) in delete_config
==================
The data race arises because, after a successful `swap`, the writer still reads `new_config`; at that very moment another writer may run `delete_config` and free it, producing a wrong result.
The fix, shown in commit a355f3, is to add `new_config` to the hazard pointer list before the writer performs `swap`, and to have the writer actively remove `new_config`'s hazard pointer once it has finished using it. In effect, the writer also treats itself as a reader; a sketch of the idea follows.
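The sketch below is reconstructed from the description above rather than copied from commit a355f3, reusing the list helpers and `drop` shown earlier.

```c
static void *writer_thread(void *arg)
{
    (void) arg;
    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();

        /* Protect new_config as if this writer were a reader of it. */
        list_insert_or_append(&config_dom->pointers, (uintptr_t) new_config);

        print_config("updating config", new_config);
        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             DEFER_DEALLOC);
        print_config("updated config ", new_config);

        /* Done reading new_config: release the writer's own hazard pointer. */
        drop(config_dom, (uintptr_t) new_config);
    }
    return NULL;
}
```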