Try   HackMD

Linux 核心專題: 《Concurrency Primer》校訂和範例撰寫

執行人: GaberPlaysGame
Github
專題解說錄影

提問清單

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
提問清單

  • ?
  • 根據〈Concurrency Primer〉所言,使用 compare_exchange_weak 會忽略 "Spurious LL/SC Failure",這段我其實沒有理解的很清楚以及它可能的發生條件。
  • 我有試過將 free 與 malloc 包裝成 critical section,並利用 TAS 進行鎖的操作。問題似乎不會因此解決,但整體只發生一次,後續利用 GDB 測試的結果有看到部分執行緒被重新 join,因此懷疑是有一個或以上進行 producer 的執行緒卡住了 queue_pop 的執行。但後續的 data race 因為產出結果的 Previous Write 段落不完整,不太能夠解讀並想到要如何排錯。

任務簡述

檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,嘗試用 C11 Atomics 改寫原有程式碼,並確保在 x86-64 及 QEMU/Arm 正確運作。預計產出:

  • 以 C11 Atomics 實作經典 lock-free 程式
  • 探討 memory ordering/barrier

TODO: 校訂〈Concurrency Primer〉

檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,,參見〈Concurrency Primer〉導讀,確保原本的 C++ 程式都以 C11 Atomics 改寫並驗證。

改寫時,先列出原本的 C++11 程式碼,隨後則是改寫後的 C11 Atomics 實作

2. Enforcing law and order

C++11 程式碼:

#include <atomic>

int v = 0;
std::atomic_bool v_ready(false);

void threadA(){
    v = 42;
    v_ready = true;
}
void threadB(){
    while (!v_ready) { /* wait */ }
    const int my_v = v;
    // Do something with my_v...
}

以 C11 改寫後的程式碼:

#include <stdatomic.h>
#include <stdbool.h>

int v = 0;
atomic_bool v_ready = false;

void *threadA() {
    v = 42;
    v_ready = true;
}

void *threadB() {
    while(!v_ready) { /* wait */ }
    const int my_v = v;
    // Do something with my_v...
}

5.2 Test and set

C++11 程式碼:

std::atomic_flag af;

void lock() {
    while (af.test_and_set()) { /* wait */ }
}

void unlock() { af.clear(); }

C11 程式碼:

atomic_flag af = ATOMIC_FLAG_INIT;

void lock() {
    while (atomic_flag_test_and_set(&af)) { /* wait */ }
}

void unlock() { atomic_flag_clear(&af); }

這邊利用 TAS 與 Atomic Flag 製作一個類似於互斥鎖 (Mutex) 的程式。使用時將 lock 與 unlock 放在 Critical Section 頭尾便可。

根據 IBM Documentation 對 Atomic Flag 的解釋,若是 C11 Atomic Flag 沒有被初始化,則它會被初始成未定狀態。

You can use the macro ATOMIC_FLAG_INIT to initialize an atomic_flag object to the clear state. If an atomic_flag object is not explicitly initialized with ATOMIC_FLAG_INIT, it is initially in an indeterminate state.

5.4 Compare and swap

C++11 程式碼:

enum class TaskState : int8_t {
    Idle, Running, Cancelled
};
std::atomic<TaskState> ts;
void taskLoop()
{
    ts = TaskState::Running;
    while (ts == TaskState::Running) {
        // Do good work.
    }
}

bool cancel() {
    auto expected = TaskState::Running;
    return ts.compare_exchange_strong(expected, TaskState::Cancelled);
}

C11 程式碼:

enum taskstate {
    Idle, Running, Cancelled
};
typedef enum taskstate TaskState;

_Atomic TaskState ts;
void taskLoop() {
    ts = Running;
    while (ts == Running) {
        // Do good work.
    }
}

bool cancel() {
    TaskState expected = Running;
    return atomic_compare_exchange_strong(&ts, &expected, Cancelled);
} 

CAS (又稱 Compare Exchange) 可以用來比較並在 ts = expected 時修改數值,在這個例子中它被用來停止迴圈。

8.1 Spurious LL/SC failures

C++11 程式碼:

void atomicMultiply(int by)
{
    int expected = foo;
    // Which CAS should we use?
    while (!foo.compare_exchange_?(expected, expected * by)) {
        // Empty loop.
        // (On failure, expected is updated with foo's most recent value.)
    }
}

C11 程式碼:

atomic_int foo;
void atomicMultiply(int by)
{
    int expected = foo;
    // Which CAS should we use?
    while (!atomic_compare_exchange_?(&foo, &expected, expected * by)) {
        // Empty loop.
        // (On failure, expected is updated with foo's most recent value.)
    }
}

這邊 Concurrency Primer 認為說,在使用 _strong 函式的情況下編譯器必須生成內外兩個迴圈來防止我們遇到 Sequential Consistent 錯誤,但其實我們不在乎錯誤發生何時發生,因此另有 _weak 版本的函式讓 CAS 只要沒有正確完成時就直接回傳。

10 Memory ordering

C++11 程式碼:

void lock()
{
    while (af.test_and_set(memory_order_acquire)) {
        /* wait */
    }
}

void unlock()
{
    af.clear(memory_order_release);
}

int i = foo.load(memory_order_acquire);
void atomicMultiply(int by) {
    int expected = foo;
    while (!foo.compare_exchange_weak(
        expected, expected * by,
        memory_order_seq_cst, // On success
        memory_order_relaxed)) // On failure
        { /* empty loop */ }
}

C11 程式碼:

atomic_flag af = ATOMIC_FLAG_INIT;

void lock() {
    while (atomic_flag_test_and_set_explicit(&af, memory_order_acquire)) {
        /* wait */
    }
}

void unlock() {
    atomic_flag_clear_explicit(&af, memory_order_release);
}

int i = atomic_load_explicit(&foo, memory_order_acquire);
void atomicMultiply(int by) {
    int expected = foo;
    while (!atomic_compare_exchange_weak_explicit(
        &foo,
        &expected, 
        expected * by,
        memory_order_seq_cst, // On success
        memory_order_relaxed)) // On failure
        { /* empty loop */ }
}

這邊必須將原本的函式後方加上 _explicit,才可以設定 Memory Order。

10.1 Acquire and release

C++11 程式碼:

int acquireFoo()
{
    return foo.load(memory_order_acquire);
}
void releaseFoo(int i)
{
    foo.store(i, memory_order_release);
}

int v;
std::atomic_bool v_ready(false);
void threadA()
{
    v = 42;
    v_ready.store(true, memory_order_release);
}
void threadB()
{
    while (!v_ready.load(memory_order_acquire)) {
    // wait
    }
    assert(v == 42); // Must be true
}

C11 程式碼:

int acquireFoo()
{
    return atomic_load_explicit(&foo, memory_order_acquire);
}
void releaseFoo(int i)
{
    atomic_store_explicit(&foo, i, memory_order_release);
}

int v;
atomic_bool v_ready = false;
void threadA()
{
    v = 42;
    atomic_store_explicit(&v_ready, true, memory_order_release);
}
void threadB()
{
    while (!atomic_load_explicit(&v_ready, memory_order_acquire)) {
    // wait
    }
    assert(v == 42); // Must be true
}

Acquire 與 Release 是兩個成對的記憶體排序。Acquire 會將任何所有在它之後的記憶體操作維持排序並與其他 CPU 同步,通常會搭配記憶體柵欄來防止 CPU 重新排列操作。Release 則是與其相反。兩者常被用來進行鎖的實作,也因此 Acquire 常被用在 Load 操作,而 Release 被用在 Store 操作。

10.2 Relaxed

C++11 程式碼:

atomic_bool stop(false);
void worker()
{
    while (!stop.load(memory_order_relaxed)) {
        // Do good work.
    }
}

void atomicMultiply(int by)
{
    int expected = foo.load(memory_order_relaxed);
    while (!foo.compare_exchange_weak(
        expected, expected * by,
        memory_order_release,
        memory_order_relaxed)) {
            /* empty loop */
        }
}

int main()
{
    launchWorker();
    // Wait some...
    stop = true; // seq_cst
    joinWorker();
}

C11 程式碼:

atomic_bool stop = false;
void worker()
{
    while (!atomic_load_explicit(&stop, memory_order_relaxed)) {
        // Do good work.
    }
}

void atomicMultiply(int by)
{
    int expected = atomic_load_explicit(&foo, memory_order_relaxed);
    while (!atomic_compare_exchange_weak_explicit(
        &foo,
        &expected, 
        expected * by,
        memory_order_release,
        memory_order_relaxed)) {
            /* empty loop */
        }
}

int main()
{
    launchWorker();
    // Wait some...
    stop = true; // seq_cst
    joinWorker();
}

Relaxed 是相對 Acquire Release 來講比較弱的記憶體操作,它可以保證兩個 Relaxed 操作不會被重新排序,但它不會為其他 CPU 去做同步操作。

10.3 Acquire-Release

C++11 程式碼:

atomic_int refCount;
void inc()
{
    refCount.fetch_add(1, memory_order_relaxed);
}
void dec()
{
    if (refCount.fetch_sub(1, memory_order_acq_rel) == 1) {
        // No more references, delete the data.
    }
}

C11 程式碼:

atomic_int refCount;
void inc()
{
    atomic_fetch_add_explicit(&refCount, 1, memory_order_relaxed);
}
void dec()
{
    if (atomic_fetch_sub_explicit(&refCount, 1, memory_order_acq_rel) == 1) {
        // No more references, delete the data.
    }
}

Acq-Rel 可以說類似於上面提到的 Acquire 與 Release,只不過它被特化用於同時需要進行讀取與儲存的函式,如 atomic_compare_exchange 系列函式。

另外還有一種排序為 SeqCst,保證程式之間可以有 Sequential Consistency,被認為是最強的記憶體排序,但因為每個 Atomic 操作都會對 CPU 快取造成一定的開銷,SeqCst 的花費自然比其他記憶體操作來得要大,因此它並不是唯一推薦的記憶體排序。

TODO: 以 C11 Atomics 實作經典 lock-free 程式

以 C11 Atomics 改寫 hungry-birds 並解釋運作原理 (注意: 存在若干實作錯誤),需要用 ThreadSanitizer 驗證,作為日後教材的補充內容。

對照 Issue #5
延伸閱讀: Atomics

運作原理

static const size_t sentinel = 0xdeadc0de;
static const size_t alignment = sizeof(size_t);

typedef struct node {
    atomic_uintptr_t next;
} node;

struct __QueueInternal {
    atomic_uintptr_t head;
    atomic_uintptr_t tail;
    size_t item_size;
};

該 lock-free 程式使用 C11 Atomics 實作佇列,利用 atomic_uintptr_t 這個指標類型儲存佇列頭尾以及 next 的值以構成 Linked List 的結構。

  • head:佇列頭,主要用於 queue_pop 操作。
  • tail:佇列尾,主要用於 queue_push 操作。
  • item_size:儲存佇列中每個元素的大小。

queue_create

queue_p queue_create(size_t item_size)
{
    size_t *ptr = calloc(sizeof(struct __QueueInternal) + alignment, 1);
    ptr[0] = sentinel;                                     
    queue_p q = (queue_p)(ptr + 1);
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
    q->item_size = item_size;
    return q;
}

在該程式創建佇列時,會利用 sentinel 這個 Hexspeak 去表示佇列的開頭,並將頭尾用 atomic 函式設置為 0。

queue_push

QueueResult queue_push(queue_p q, void *data)
{
    assert(q);
    /* create the new tail */
    node *new_tail = malloc(sizeof(node) + q->item_size);
    if (!new_tail) {
        return QUEUE_OUT_OF_MEMORY;
    }

    atomic_init(&new_tail->next, 0);
    memcpy(new_tail + 1, data, q->item_size); 

    /* swap the new tail with the old */
    node *old_tail = (node *) atomic_exchange(&q->tail,
                                              (uintptr_t) new_tail);

    /* link the old tail to the new */
    if (old_tail) {
        atomic_store(&old_tail->next, (uintptr_t) new_tail);
    } else {
        atomic_store(&q->head, (uintptr_t) new_tail);
    }
    return QUEUE_SUCCESS;
}

推入佇列的動作由 queue_push 函式達成,程式會首先配置一塊新的記憶體區塊 new_tail,並利用 atomic_init 初始化。接下來透過 atomic_exchange 操作交換佇列尾端與 new_tail 的值,取得下來的指標這邊以 old_tail 表示。

交換之後 old_tail 會有兩種可能情況:

  • 若 old_tail 為 0,則根據 queue_create 函式的設置可知道佇列此時沒有任何元素存在,因此將佇列的頭利用 atomic_store 存入剛剛配置的 new_tail 的值。
  • 若 old_tail 不為 0,則代表佇列尾早前已經被存入其它記憶體的值,因此就只需要將 old_tail 的下一個指標設為 new_tail 即可。

queue_pop

QueueResult queue_has_front(queue_p q)
{
    assert(q);
    if (atomic_load(&q->head) == 0)
        return QUEUE_FALSE;
    return QUEUE_TRUE;
}

QueueResult queue_pop(queue_p q)
{
    assert(q);
    assert(queue_has_front(q) == QUEUE_TRUE);

    /* get the head */
    node *popped = (node *) atomic_load(&q->head);
    node *compare = popped;

    /* set the tail and head to nothing if they are the same */
    if (atomic_compare_exchange_strong(&q->tail, &compare, 0)) {
        compare = popped;
        /* it is possible for another thread to have pushed after
         * we swap out the tail. In this case, the head will be different
         * then what was popped, so we just do a blind exchange regardless
         * of the result.
         */
        atomic_compare_exchange_strong(&q->head, &compare, 0); 
    } else {
        /* tail is different from head, set the head to the next value */
        node *new_head = 0;
        while (!new_head) {
            /* it is possible that the next node has not been assigned yet,
             * so just spin until the pushing thread stores the value.
	     */
            new_head = (node *) atomic_load_explicit(&popped->next, memory_order_acquire);
        }
        atomic_store_explicit(&q->head, (uintptr_t) new_head, memory_order_release);      
    }
    free(popped);
    return QUEUE_SUCCESS;
}

取出佇列的動作由 queue_pop 完成,函式首先將 q->head 的初始值存儲於 popped 指標上,並將 popped 的值複製到 compare 並進行 CAS。

  • 佇列要先透過 queue_has_front 函式確認有元素,否則會產生斷言錯誤。
  • compare 的值與佇列尾相同,則代表這個佇列只有一個元素。
    • 佇列尾會被設為 0,且將 popped 的值再次複製到 compare 一次。這個操作我不清楚原因,但我認為是為了確保接下來與佇列頭的比較可以順利在 MPSC 的情況下執行。
    • 由於佇列尾被設為 0,因此當有其它執行緒同時進行 queue_push 時,佇列會被判定為「沒有元素」的情況,進而更改了佇列頭的值。因此,進行 q->head 與 compare 的比較時結果可能不相同。因此函式再次進行一次 CAS,若佇列頭的值沒有被更動則將其設為 0。
  • compare 的值與佇列尾不同,則佇列有多個元素。
    • 新建一個指標 new_head 用來複製原本佇列頭的下一個值。由於佇列頭的下一個值可能還沒有被存入,即有執行緒還在 queue_pushatomic_store 區塊,因此這邊函式利用 while 迴圈讓 new_head 反覆存取 popped->next 直到其非 0。
    • new_head 存取到值後,q->head 會存入 new_head 的值而完成操作。

當以上操作執行完後 popped 指標的記憶體會被釋放並回傳取出成功。

以 ThreadSanitizer 驗證

無修改運行時程式不會提示任何錯誤而順利結束。若在 Makefile 內加上 -fsanitize=thread 標籤後運行,當程式進行到 Stress Test 區塊可以看到終端機發現有 data race 發生在 queue_popqueue_push 函式,進而導致程式無法順利結束。

WARNING: ThreadSanitizer: data race (pid=16033)
  Write of size 8 at 0x7b0400000800 by thread T1:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde)

  Previous write of size 8 at 0x7b0400000800 by thread T2:
    #0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57)
    #1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:87 (pc-test+0x205f)

  Thread T1 (tid=16035, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd)

  Thread T2 (tid=16036, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55)

SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop
WARNING: ThreadSanitizer: data race (pid=16033)
  Write of size 8 at 0x7b0400000808 by thread T1:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde)

  Previous write of size 8 at 0x7b0400000808 by thread T2:
    [failed to restore the stack]

  Thread T1 (tid=16035, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd)

  Thread T2 (tid=16036, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55)

SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop

以 GDB 分析過後可以發現儘管進行了多次的 queue_push,data race 總是會在第一次的 queue_pop 的 free 函式執行時發生,原因疑似是 queue_pop 執行 free 函式時,queue_push 同時對記憶體進行 malloc

我嘗試利用 atomic_test_and_set 函式,設置一個 atomic_flag 來達到類似互斥鎖的效果去執行問題段,雖未能解決大部分問題,但 data race 的發生時間有所變化。

WARNING: ThreadSanitizer: data race (pid=9070)
  Write of size 8 at 0x7b0400000800 by thread T1:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)

  Previous write of size 8 at 0x7b0400000800 by thread T2:
    #0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57)
    #1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:92 (pc-test+0x208a)

  Thread T1 (tid=9072, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)

  Thread T2 (tid=9073, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)

SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop

第一次的 data race 一樣發生在第一次的 free 與 malloc 同時進行。

WARNING: ThreadSanitizer: data race (pid=9070)
  Write of size 8 at 0x7b0400000818 by thread T1:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)

  Previous write of size 1 at 0x7b040000081b by thread T2:
    #0 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:827 (libtsan.so.0+0x6243e)
    #1 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:819 (libtsan.so.0+0x6243e)
    #2 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:102 (pc-test+0x20f4)

  Thread T1 (tid=9072, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)

  Thread T2 (tid=9073, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)

SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop

第二次的 data race 為新出現的問題,在第二次的 free 與 memcpy 同時進行的情況下發生。

WARNING: ThreadSanitizer: data race (pid=9070)
  Write of size 8 at 0x7b040002e0d0 by thread T1:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
    #1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)

  Previous write of size 8 at 0x7b040002e0d0 by thread T11:
    [failed to restore the stack]

  Thread T1 (tid=9072, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)

  Thread T11 (tid=9082, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)

SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop

第三次的 data race 與原本的第二次 data race 為同種問題,但發生時間被延後到數次 free 函式後,而非原本的在第一次 free 發生的情況。

以 atomics 保護 critical section

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv