Try   HackMD

2022q1 Homework5 (quiz5)

contribute by < bakudr18 >

測驗題目

測驗 2

lock-free data structure 必須要注意 ABA problem ,例如若有一個 linked list 為 \(a\rightarrow b\rightarrow c\) ,對 linked list 的 head 做 compare_and_swap(&head, new = 'b', expected = 'a') 操作時,若此時有令一個執行緒 連續 pop 'a' 'b' 後再 push 'a' ,則此時 linked list 變為 \(a\rightarrow c\) , CAS 操作仍會成功,就會導致 head 被指向已經被釋放的 'b' 而導致錯誤。

Hazard pointer 就是解決 ABA problem 的其中一種方法,可以避免仍有 reader 在讀取資料被釋放。其概念是當有 reader 要讀取資料時,都會建立一個 hazard pointer 指向該資料,直到 reader 讀取完畢才會移除 hazard pointer,而此時當有 writer 要移除資料時,若發現此資料在 hazard pointers 內,則不能直接移除,此時有兩種作法,一是一直等到資料該資料的 hazard pointer 被 reader 主動移除,writer 才能移除資料,二是先把此資料放到 retire list 中,等下次再來檢查是否有資料在 retire list 且不被 hazard pointer 所指向,此時才可以移除資料。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

接著看到程式碼:

typedef struct __hp {
    uintptr_t ptr;
    struct __hp *next;
} hp_t;

typedef struct {
    hp_t *pointers;  /* hazard pointer structure */
    hp_t *retired;   /* retire list structure */
    void (*deallocator)(void *);
} domain_t;

static domain_t *config_dom;

config_dom 為管理 hazard pointers 與 retire list 的資料結構,pointersretired 皆為 linked list 結構,而 deallocator 為用來釋放資源的 function pointer。

/* * Load a safe pointer to a shared object. This pointer must be passed to * `drop` once it is no longer needed. Returns 0 (NULL) on error. */ uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr) { const uintptr_t nullptr = 0; while (1) { uintptr_t val = atomic_load(prot_ptr); hp_t *node = list_insert_or_append(&dom->pointers, val); if (!node) return 0; /* Hazard pointer inserted successfully */ if (atomic_load(prot_ptr) == val) return val; /* * This pointer is being retired by another thread - remove this hazard * pointer and try again. We first try to remove the hazard pointer we * just used. If someone else used it to drop the same pointer, we walk * the list. */ uintptr_t tmp = val; if (!atomic_cas(&node->ptr, &tmp, &nullptr)) list_remove(&dom->pointers, val); } } /* * Drop a safe pointer to a shared object. This pointer (`safe_val`) must have * come from `load` */ void drop(domain_t *dom, uintptr_t safe_val) { if (!list_remove(&dom->pointers, safe_val)) __builtin_unreachable(); }

首先看到以上兩個與 reader 相關的函式:

  • load 函式的作用是讀取 prot_ptr 所指向的地址 val,並且把此 val 加入到 hazard pointer 中,注意到在程式碼第 16 行,由於 list_insert_or_append 過程中 val 可能被移除,因此必須重新讀取 port_ptr 確認其指向的地址沒變才代表讀取成功,否則需把 hazard pointer 移除後重新讀取新的 val
  • 在 reader 讀取資料成功後,需主動呼叫 drop 函式,此函式會把 safe_val 從 hazard pointers 結構中移除,此時才表示 reader 已經完成讀取動作,writer 才能直接移除資料。
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
    if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
        dom->deallocator((void *) ptr);
    } else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
        list_insert_or_append(&dom->retired, ptr);
    } else { /* Spin until all readers are done, then deallocate */
        while (list_contains(&dom->pointers, ptr))
            ;
        dom->deallocator((void *) ptr);
    }
}

/* Swaps the contents of a shared pointer with a new pointer. The old value will
 * be deallocated by calling the `deallocator` function for the domain, provided
 * when `domain_new` was called. If `flags` is 0, this function will wait
 * until no more references to the old object are held in order to deallocate
 * it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
 * if there are already no references to it; otherwise the cleanup will be done
 * the next time `cleanup` is called.
 */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
    const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
    cleanup_ptr(dom, old_obj, flags);
}

然後是與 writer 相關的函式:

  • swap 函式會將 port_ptr 指向 new_val ,並嘗試釋放 old_obj
  • cleanup_ptr 會嘗試把 old_obj 釋放,如同前述提過的,當發現 old_obj 仍在 hazard pointers 內,此時有兩個選擇。若 flags == DEFER_DEALLOC ,則把 old_obj 放到 retire list 中,等待下次再釋放;或是一直 busy waiting 直到 old_obj 被 reader 移出 hazard pointers 中,然後釋放資源。
#define N_READERS 10
#define N_WRITERS 1
#define N_ITERS 20

static void *reader_thread(void *arg)
{
    (void) arg;
    struct timespec t1 = {.tv_sec = 0, .tv_nsec = 100};

    for (int i = 0; i < N_ITERS; ++i) {
        config_t *safe_config =
            (config_t *) load(config_dom, (uintptr_t *) &shared_config);
        if (!safe_config)
            err(EXIT_FAILURE, "load");

        print_config("read config    ", safe_config);
        drop(config_dom, (uintptr_t) safe_config);
        nanosleep(&t1, NULL);
    }

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);

        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             DEFER_DEALLOC);
        print_config("updated config ", new_config);
    }

    return NULL;
}

以上是 reader thread 與 writer thread ,為了讓 reader 與 writer 比較容易能交互執行,我在 reader thread 使用了 nanosleep 讓每次 read 成功後都暫停 thread 執行時間至少 100 ns,執行結果片段如下。

read config     : { 0x00000000, 0x00000000, 0x00000000 }
read config     : { 0x00000000, 0x00000000, 0x00000000 }
updating config : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updated config  : { 0x6b8b4567, 0x327b23c6, 0x643c9869 }
updating config : { 0x66334873, 0x74b0dc51, 0x19495cff }
updated config  : { 0x66334873, 0x74b0dc51, 0x19495cff }
updating config : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config  : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updating config : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
read config     : { 0x2ae8944a, 0x625558ec, 0x238e1f29 }
updated config  : { 0x46e87ccd, 0x3d1b58ba, 0x507ed7ab }
/* Forces the cleanup of old objects that have not been deallocated yet. Just
 * like `swap`, if `flags` is 0, this function will wait until there are no
 * more references to each object. If `flags` is `DEFER_DEALLOC`, only
 * objects that already have no living references will be deallocated.
 */
void cleanup(domain_t *dom, int flags)
{
    hp_t *node;

    LIST_ITER (&dom->retired, node) {
        uintptr_t ptr = node->ptr;
        if (!ptr)
            continue;

        if (!list_contains(&dom->pointers, ptr)) {
            /* We can deallocate straight away */
            if (list_remove(&dom->pointers, ptr))
                dom->deallocator((void *) ptr);
        } else if (!(flags & DEFER_DEALLOC)) {
            /* Spin until all readers are done, then deallocate */
            while (list_contains(&dom->pointers, ptr))
                ;
            if (list_remove(&dom->pointers, ptr))
                dom->deallocator((void *) ptr);
        }
    }
}

cleanup 函式的目的是嘗試釋放在 retire list 中的資料,其檢查方式與 cleanup_ptr 相同。為此我另外建了一個 cleaner_thread 使用 timerfd 週期性的執行 cleanup 來釋放資源。

static void *cleaner_thread(void *arg)
{
    atomic_bool *stop = (atomic_bool *) arg;
    struct itimerspec new_value;
    new_value.it_value.tv_sec = 0;
    new_value.it_value.tv_nsec = 1000;

    new_value.it_interval.tv_sec = 1;
    new_value.it_interval.tv_nsec = 0;

    int fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd == -1)
        warn("timerfd_create");

    if (timerfd_settime(fd, TFD_TIMER_ABSTIME, &new_value, NULL))
        warn("timerfd_settime");

    while (!atomic_load (stop)) {
        uint64_t exp;
        ssize_t s = read(fd, &exp, sizeof(uint64_t));
        if (s != sizeof(uint64_t)) {
            warn("read failed");
            return NULL;
        }
        printf("cleanup\n");
        cleanup(config_dom, DEFER_DEALLOC);
    }
    close(fd);
    return NULL;
}

int main()
{
    pthread_t readers[N_READERS], writers[N_WRITERS], cleaner;
    atomic_bool stop = ATOMIC_VAR_INIT(false);

    init();

    /* Start threads */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_create(readers + i, NULL, reader_thread, NULL))
            warn("pthread_create");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_create(writers + i, NULL, writer_thread, NULL))
            warn("pthread_create");
    }

    if (pthread_create(&cleaner, NULL, cleaner_thread, &stop))
        warn("pthread_create");

    /* Wait for threads to finish */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_join(readers[i], NULL))
            warn("pthread_join");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_join(writers[i], NULL))
            warn("pthread_join");
    }

    atomic_store(&stop, true);
    if (pthread_join(cleaner, NULL))
        warn("pthread_join");

    deinit();

    return EXIT_SUCCESS;
}

Memory order

事前準備:

loadswap 都會操作相同的 share object prot_ptr ,而原程式碼對 prot_ptr 的 atomic 操作都是使用 sequentially-consistent memory order ,這裡嘗試更改成 release-acquire memory order,但首先要先搞懂兩者的差別。

首先看到 gcc docs 對 release-acquire memory order 的定義

__ATOMIC_ACQUIRE: Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.

__ATOMIC_RELEASE: Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.

__ATOMIC_ACQ_REL: Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE.

acquire 與 release 需要成對的使用,是用來建立不同執行緒之間的 happens-before 關係,__ATOMIC_ACQUIRE 可以避免後面的指令被重排到此指令之前,而 __ATOMIC_RELEASE 可以避免前面的指令被重排到此指令之後。舉例來說

/* shared variable */
bool M = false;
int a = 0;

void *thread1() {
    a = 1;
    atomic_store_explicit(&M, true, memory_order_release);
}

void *thread2() {
    if (atomic_load_explicit(&M, memory_order_acquire)) {
        assert(a == 1);
    }
}

若 store M happens-before load M , 則在 thread2 M = true 時,必定能保證 a == 1 ,因為 thread1 a = 1; 的操作 happens-before store M

接著看到 sequentially-consistant memory order ,根據 C11 standard

$5.1.2.4.7: All modifications to a particular atomic object M occur in some particular total order, called the modification order of M. If A and B are modifications of an atomic object M, and A happens before B, then A shall precede B in the modification order of M.

$7.17.3.6: For memory_order_seq_cst, there shall be a single total order S on all operations, consistent with the ‘‘happens before’’ order and modification orders for all affected locations, such that each memory_order_seq_cst operation that loads a value observes either the last preceding modification according to this order S, or the result of an operation that is not memory_order_seq_cst.

意思是,對於每一個 memory_order_seq_cst 操作(無論在哪個 thread),都確保只會看到相同的修改順序,也就是 single total order。

單純看定義真的很難懂,後來我在 cppreference 找到一個例子,以及 How std::memory_order_seq_cst works 有比較白話的解釋,能夠清楚區分 memory_order_seq_cstmemory_order_acq_rel 的差別,程式碼如下

#include <thread>
#include <atomic>
#include <cassert>
 
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
 
void write_x()
{
    x.store(true, std::memory_order_seq_cst);
}
 
void write_y()
{
    y.store(true, std::memory_order_seq_cst);
}
 
void read_x_then_y()
{
    while (!x.load(std::memory_order_seq_cst))
        ;
    if (y.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
void read_y_then_x()
{
    while (!y.load(std::memory_order_seq_cst))
        ;
    if (x.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
int main()
{
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join(); b.join(); c.join(); d.join();
    assert(z.load() != 0);  // will never happen
}

上述程式碼能夠保證最後結果永遠為 z != 0 ,原因是,對於 x, y 兩變數,假設其修改順序為先 x = true; 然後才是 y = true;,則在任意時間點所有的 thread 都只有可能看到以下三種 x, y 的狀態

  • state1: x == false && y == false
  • state2: x == true && y == false
  • state3: x == true && y == true

因此,若 x, y 處於 state2 時,此時 thread c 開始執行,則 ++z; 可能會發生也可能不會發生,因為執行 thread c 期間有可能 x, y 變為 state 3。

而當 state2 對 thread c 為可見時,同一時間 thread d 也只可能看到 state2 ,因此 thread d 必定會執行到 ++z;

然而,若改為使用 memory_order_acq_rel ,當 thread a 的結果對 thread c 是可見時,這裡只保證了 thread a 之前的指令(此例子為空指令)對 thread c 也是可見的,並不表示 thread a 的結果對 thread d 也是可見的。

同理,當 thread b 的結果對 thread d 是可見的時,不表示 thread b 的結果對 thread c 也是可見的。

因此可能發生以下情況

  • thread a is happens-before thread c, but is not happens-before thread d
  • thread b is happens-before thread d, but is not happens-before thread c
  • thread c 看見 state: {x = true, y = false} 的同時,thread d 看見 state: {x = false, y = true}
  • 則此時 assert(z != 0) 發生

回到原題目程式碼

/* reader */ uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr) { const uintptr_t nullptr = 0; while (1) { uintptr_t val = atomic_load(prot_ptr, __ATOMIC_ACQUIRE); hp_t *node = list_insert_or_append(&dom->pointers, val); if (!node) return 0; /* Hazard pointer inserted successfully */ if (atomic_load(prot_ptr, __ATOMIC_ACQUIRE) == val) return val; ... } } /* writer */ void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags) { const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val, __ATOMIC_ACQ_REL); cleanup_ptr(dom, old_obj, flags); } static void *writer_thread(void *arg) { (void) arg; for (int i = 0; i < N_ITERS / 2; ++i) { config_t *new_config = create_config(); new_config->v1 = rand(); new_config->v2 = rand(); new_config->v3 = rand(); print_config("updating config", new_config); swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config, DEFER_DEALLOC); print_config("updated config ", new_config); } return NULL; }

對於 reader 中對 prot_ptr 的 load 操作,只要保證 writer 的 store 操作(這裡指第 24 行的 atomic_exchange) 與其之前的指令 (如第 34~36 行 assign value to new_config) happens-before reader 的 load 操作即可。

閱讀 kdnvt 同學的共筆,發現一個我沒注意到的問題

reader | writer ----------------------------------------------------------- | /* store on ptr */ /* invisible to | val = atomic_exchange(ptr,new); reader */ | /* still in write buffer */ | | /* load old */ | if(!list_contain(val)){ /* store on list */ | list_insert(val); | | /* load on ptr | ptr = old */ | if(atomic_load(ptr) == val){ | return val; | } | | /* visible to | // ptr = new; reader now */ | free(val); | }

若使用 memory_order_acq_rel ,在 writer 執行第 3 行 val = atomic_exchange(ptr,new) 時,ptr = new 的結果會被存在硬體的 store buffer 上,接著照以上順序執行到第 14 行時,atomic_load(ptr) 仍會取到 old ,因此回傳 valold,此時執行 writer 的 CPU 才廣播 invalidate ptr 以為時已晚,執行 free(val) 就會造成 reader 讀到的值被釋放掉。

我認為 kdnvt 同學的說法是對的,但是我自己做修改如 commit d91dab 使用 memory_order_acq_rel ThreadSanitizer 也沒有報錯,嘗試建立更多的 writer/reader 如 commit 0e9baf 也沒有出錯,有可能只是剛好 CPU 沒有以此順序執行。

經老師的提示,我嘗試換個環境測試,我找了一片 Raspberry Pi 3B+ 來測試,執行環境如下

ubuntu@ubuntu:~$ uname -a
Linux ubuntu 5.13.0-1024-raspi #26-Ubuntu SMP PREEMPT Fri Mar 25 10:54:36 UTC 2022 aarch64 aarch64 aarch64 GNU/Linux

同樣執行 commit 0e9baf 也沒有出錯,因此開始嘗試減化程式碼讓測試更單純,也希望較少的程式碼能增加 memory reordering 機率,完整程式碼可見 hp_ordering,以下節錄片段

/* Only accept one reader */
static void *reader_thread(void *arg)
{
    (void) arg;
    const uintptr_t nullptr = 0;

    pthread_barrier_wait(&barr);

    for (int i = 0; i < N_ITERS; i++) {
        while (1) {
            uintptr_t val =
                (uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);

            __atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);

            if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
                (config_t *) val) {
                print_config("read config  ", (config_t *) val);
                break;
            }

            __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
        }
    }

    __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    pthread_barrier_wait(&barr);

    for (int i = 0; i < N_ITERS; i++) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        const uintptr_t old_obj = (uintptr_t) __atomic_exchange_n(
            &shared_config, new_config, __ATOMIC_ACQ_REL);
        while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) == old_obj)
            ;
        delete_config((void *) old_obj);
    }

    return NULL;
}

首先在 writer 與 reader 開頭使用 pthread_barrier_wait,其作用是會等待多個執行緒都執行到同一個等待點才允許繼續執行,這樣做可以讓兩者交互執行機率變較高。

另外可以看到 reader 的部份我使用 hp_ptr 只允許有一個 reader,取代原本的 hazard pointer 結構,目的是減少兩次 load shared_config 之間的指令數量,希望能增加 reorder 機會,而 writer 部份也是盡可能簡化,不過同樣允許多個 writer 同時執行。

然而結果一樣不如我所想,ThreadSanitizer 沒有觸發 data race。

考量到有可能是 ThreadSanitizer 沒辦法偵測到這類型的 data race ,我另外修改了 commit 137592 用 assert 來偵測,但在 RPi 3 上仍然不會發生錯誤。

/* Only accept one reader */
static void *reader_thread(void *arg)
{
    (void) arg;
    const uintptr_t nullptr = 0;

    pthread_barrier_wait(&barr);

    for (int i = 0; i < N_ITERS; i++) {
        while (1) {
            uintptr_t val =
                (uintptr_t) __atomic_load_n(&shared_config, __ATOMIC_ACQUIRE);

            __atomic_store(&hp_ptr, &val, __ATOMIC_RELEASE);

            if (__atomic_load_n(&shared_config, __ATOMIC_ACQUIRE) ==
                (config_t *) val) {
                assert(((config_t *) val)->v1 != ERR_FATAL);
                break;
            }

            __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);
        }
    }

    __atomic_store(&hp_ptr, &nullptr, __ATOMIC_RELEASE);

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    pthread_barrier_wait(&barr);

    for (int i = 0; i < N_ITERS; i++) {
        config_t *new_config = create_config();
        new_config->v1 = rand() & N_ITERS;
        config_t *old_obj =
            __atomic_exchange_n(&shared_config, new_config, __ATOMIC_ACQ_REL);
        while (__atomic_load_n(&hp_ptr, __ATOMIC_ACQUIRE) ==
               (uintptr_t) old_obj)
            ;
        // old_obj is retired
        old_obj->v1 = ERR_FATAL;
    }

    return NULL;
}

重新實驗

有鑑於我對 atomic operation 的實驗方式有些疑惑,因此決定從簡單的實驗做起。考慮以下學習 atomic operation 很常見的範例

/* X and published are both initialize as zero */

/* thread A */
atomic_store_explicit(&X, 1, memory_order_relaxed);
atomic_store_explicit(&published, 1, memory_order_relaxed);

/* thread B */
if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
    assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
}

memory_order_relaxed 只保證了本身操作的 atomic,允許編譯器與處理器充份發揮 memory ordering,因此 thread A 的 Xpublished 操作是有可能被重新排序的,注意到只是可能,並不表示實際執行時一定會發生。實際實驗時,我模仿 Memory Reordering Caught in the Act 使用 semaphore 讓 thread A 與 thread B 能一直恢復 Xpublished 初始狀態重新執行,完整程式碼如下,我在 intel core i7-1165G7 與 Rasberry Pi 3 B+ 上兩個硬體上測試皆沒有觸發 assert

#include <stdio.h>
#include <assert.h>
#include <err.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <semaphore.h>

static atomic_uint published;
static atomic_uint X;
static sem_t begin_sem1, begin_sem2, end_sem;

void *storer(void *arg){

    for(;;){
        sem_wait(&begin_sem1);

        atomic_store_explicit(&X, 1, memory_order_relaxed);
        atomic_store_explicit(&published, 1, memory_order_relaxed);
	
        sem_post(&end_sem);
    }
    return NULL;
}

void *loader(void *arg){
    
    for(;;){
        sem_wait(&begin_sem2);
    	if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
            assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
    	}
        sem_post(&end_sem);
    }
    
    return NULL;
}
int main(){
    pthread_t lthread, sthread;
    sem_init(&begin_sem1, 0, 0);
    sem_init(&begin_sem2, 0, 0);
    sem_init(&end_sem, 0, 0);
    
    if(pthread_create(&lthread, NULL, loader, NULL))
        warn("pthread_create");

    if(pthread_create(&sthread, NULL, storer, NULL))
        warn("pthread_create");

    for(;;){
        atomic_store_explicit(&published, 0, memory_order_relaxed);
        atomic_store_explicit(&X, 0, memory_order_relaxed);

        sem_post(&begin_sem1);
        sem_post(&begin_sem2);

        sem_wait(&end_sem);
        sem_wait(&end_sem);
    }

    if(pthread_join(lthread, NULL))
        warn("pthread_join");

    if(pthread_join(sthread, NULL))
        warn("pthread_join");

    return 0;
}

我目前可以想到的情況是,在一個 CPU core pipeline 沒有很多指令需要執行時,也就是不夠忙碌時,其實不一定會嘗試做 memory reordering,因此,簡單的想法就是增加允許被重新排序的指令數量,如增加與 X 相同作用的 Y, Z 兩個變數,重新實驗在 RPi 3 上確實發生了assert

void *storer(void *arg){

    for(;;){
    	sem_wait(&begin_sem1);

        atomic_store_explicit(&X, 1, memory_order_relaxed);
+       atomic_store_explicit(&Y, 2, memory_order_relaxed);
+       atomic_store_explicit(&Z, 3, memory_order_relaxed);
        atomic_store_explicit(&published, 1, memory_order_relaxed);
	
    	sem_post(&end_sem);
    }
    return NULL;
}

void *loader(void *arg){
    
    for(;;){
        sem_wait(&begin_sem2);
        if(atomic_load_explicit(&published, memory_order_relaxed) == 1){
            assert(atomic_load_explicit(&X, memory_order_relaxed) == 1);
+           assert(atomic_load_explicit(&Y, memory_order_relaxed) == 2);
+           assert(atomic_load_explicit(&Z, memory_order_relaxed) == 3);
    	}
        sem_post(&end_sem);
    }
    
    return NULL;
}

用相同方法驗證 hazard pointer,如 commit 241b59,在 intel Core i7-1165G7 上也驗證到了以下 assert (在 RPi 3 上會因為資源不足導致 process 被 Kill)。

hp: main.c:79: reader_thread: Assertion `((config_t *) val)->v1 != ERR_FATAL' failed.
Aborted (core dumped)

從以上經驗可知,memory order 的驗證方式其實非常困難,很有可能即使概念是錯的,但剛好硬體沒有對指令重新排序,導致最後執行結果依舊成功,因此,在驗證上除了可以簡化程式碼,也可能需要使用 I/O ordering 學習紀錄Axiomatic validation of memory barriers and atomic instructions 裡提到的 litmus7 等驗證工具才能完整驗證概念的正確性。

Multiple writers

原程式碼如果有兩個以上的 writer ,執行時期 ThreadSanitizer 會偵測到 data race 如下

==================
WARNING: ThreadSanitizer: data race (pid=40244)
  Write of size 8 at 0x7b0400000008 by thread T5:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 delete_config <null> (hp+0x15be)
    #2 writer_thread <null> (hp+0x1899)

  Previous read of size 4 at 0x7b0400000008 by thread T4:
    #0 writer_thread <null> (hp+0x18a1)

  Thread T5 (tid=40250, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
    #1 main <null> (hp+0x135a)

  Thread T4 (tid=40249, finished) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605f8)
    #1 main <null> (hp+0x133a)

SUMMARY: ThreadSanitizer: data race (/home/bakud/linux2022/hazard_pointer/hp+0x15be) in delete_config
==================

會產生 data race 的原因在於, writer 在 swap 成功後,試圖去讀取 new_config 資料,就可能發生在讀取當下有其他 wrtier 執行 delete_config 把資料釋放掉,造成錯誤結果。

解法如 commit a355f3 所示,在 writer 執行 swap 之前先將 new_config 加進 hazard pointer list 中,等待 writer 使用完畢後主動的刪除 new_config 的 hazard pointer,此舉可以想成把 writer 自己也當成一個 reader。