# Linux 核心專題: 《Concurrency Primer》校訂和範例撰寫
> 執行人: GaberPlaysGame
> [Github](https://github.com/GaberPlaysGame/hungry-birds)
> [專題解說錄影](https://youtu.be/OWCRpzT100o)
## 提問清單
:::success
:question: 提問清單
* ?
:::
* 根據〈Concurrency Primer〉所言,使用 compare_exchange_weak 會忽略 "Spurious LL/SC Failure",這段我其實沒有理解的很清楚以及它可能的發生條件。
* 我有試過將 free 與 malloc 包裝成 critical section,並利用 TAS 進行鎖的操作。問題似乎不會因此解決,但整體只發生一次,後續利用 GDB 測試的結果有看到部分執行緒被重新 join,因此懷疑是有一個或以上進行 producer 的執行緒卡住了 queue_pop 的執行。但後續的 data race 因為產出結果的 Previous Write 段落不完整,不太能夠解讀並想到要如何排錯。
## 任務簡述
檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,嘗試用 C11 Atomics 改寫原有程式碼,並確保在 x86-64 及 QEMU/Arm 正確運作。預計產出:
* 以 C11 Atomics 實作經典 lock-free 程式
* 探討 memory ordering/barrier
## TODO: 校訂〈Concurrency Primer〉
檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,,參見[〈Concurrency Primer〉導讀](https://hackmd.io/@sysprog/concurrency-primer),確保原本的 C++ 程式都以 C11 Atomics 改寫並驗證。
> 改寫時,先列出原本的 C++11 程式碼,隨後則是改寫後的 C11 Atomics 實作
### 2. Enforcing law and order
C++11 程式碼:
```cpp
#include <atomic>
int v = 0;
std::atomic_bool v_ready(false);
void threadA(){
v = 42;
v_ready = true;
}
void threadB(){
while (!v_ready) { /* wait */ }
const int my_v = v;
// Do something with my_v...
}
```
以 C11 改寫後的程式碼:
```c
#include <stdatomic.h>
#include <stdbool.h>
int v = 0;
atomic_bool v_ready = false;
void *threadA() {
v = 42;
v_ready = true;
}
void *threadB() {
while(!v_ready) { /* wait */ }
const int my_v = v;
// Do something with my_v...
}
```
### 5.2 Test and set
C++11 程式碼:
```cpp
std::atomic_flag af;
void lock() {
while (af.test_and_set()) { /* wait */ }
}
void unlock() { af.clear(); }
```
C11 程式碼:
```c
atomic_flag af = ATOMIC_FLAG_INIT;
void lock() {
while (atomic_flag_test_and_set(&af)) { /* wait */ }
}
void unlock() { atomic_flag_clear(&af); }
```
這邊利用 TAS 與 Atomic Flag 製作一個類似於互斥鎖 (Mutex) 的程式。使用時將 lock 與 unlock 放在 Critical Section 頭尾便可。
根據 [IBM Documentation](https://www.ibm.com/docs/en/zos/2.1.0?topic=c11-types) 對 Atomic Flag 的解釋,若是 C11 Atomic Flag 沒有被初始化,則它會被初始成未定狀態。
> You can use the macro ATOMIC_FLAG_INIT to initialize an atomic_flag object to the clear state. If an atomic_flag object is not explicitly initialized with ATOMIC_FLAG_INIT, it is initially in an indeterminate state.
### 5.4 Compare and swap
C++11 程式碼:
```cpp
enum class TaskState : int8_t {
Idle, Running, Cancelled
};
std::atomic<TaskState> ts;
void taskLoop()
{
ts = TaskState::Running;
while (ts == TaskState::Running) {
// Do good work.
}
}
bool cancel() {
auto expected = TaskState::Running;
return ts.compare_exchange_strong(expected, TaskState::Cancelled);
}
```
C11 程式碼:
```c
enum taskstate {
Idle, Running, Cancelled
};
typedef enum taskstate TaskState;
_Atomic TaskState ts;
void taskLoop() {
ts = Running;
while (ts == Running) {
// Do good work.
}
}
bool cancel() {
TaskState expected = Running;
return atomic_compare_exchange_strong(&ts, &expected, Cancelled);
}
```
CAS (又稱 Compare Exchange) 可以用來比較並在 ts = expected 時修改數值,在這個例子中它被用來停止迴圈。
### 8.1 Spurious LL/SC failures
C++11 程式碼:
```cpp
void atomicMultiply(int by)
{
int expected = foo;
// Which CAS should we use?
while (!foo.compare_exchange_?(expected, expected * by)) {
// Empty loop.
// (On failure, expected is updated with foo's most recent value.)
}
}
```
C11 程式碼:
```c
atomic_int foo;
void atomicMultiply(int by)
{
int expected = foo;
// Which CAS should we use?
while (!atomic_compare_exchange_?(&foo, &expected, expected * by)) {
// Empty loop.
// (On failure, expected is updated with foo's most recent value.)
}
}
```
這邊 Concurrency Primer 認為說,在使用 _strong 函式的情況下編譯器必須生成內外兩個迴圈來防止我們遇到 Sequential Consistent 錯誤,但其實我們不在乎錯誤發生何時發生,因此另有 _weak 版本的函式讓 CAS 只要沒有正確完成時就直接回傳。
### 10 Memory ordering
C++11 程式碼:
```cpp
void lock()
{
while (af.test_and_set(memory_order_acquire)) {
/* wait */
}
}
void unlock()
{
af.clear(memory_order_release);
}
int i = foo.load(memory_order_acquire);
void atomicMultiply(int by) {
int expected = foo;
while (!foo.compare_exchange_weak(
expected, expected * by,
memory_order_seq_cst, // On success
memory_order_relaxed)) // On failure
{ /* empty loop */ }
}
```
C11 程式碼:
```c
atomic_flag af = ATOMIC_FLAG_INIT;
void lock() {
while (atomic_flag_test_and_set_explicit(&af, memory_order_acquire)) {
/* wait */
}
}
void unlock() {
atomic_flag_clear_explicit(&af, memory_order_release);
}
int i = atomic_load_explicit(&foo, memory_order_acquire);
void atomicMultiply(int by) {
int expected = foo;
while (!atomic_compare_exchange_weak_explicit(
&foo,
&expected,
expected * by,
memory_order_seq_cst, // On success
memory_order_relaxed)) // On failure
{ /* empty loop */ }
}
```
這邊必須將原本的函式後方加上 _explicit,才可以設定 Memory Order。
### 10.1 Acquire and release
C++11 程式碼:
```cpp
int acquireFoo()
{
return foo.load(memory_order_acquire);
}
void releaseFoo(int i)
{
foo.store(i, memory_order_release);
}
int v;
std::atomic_bool v_ready(false);
void threadA()
{
v = 42;
v_ready.store(true, memory_order_release);
}
void threadB()
{
while (!v_ready.load(memory_order_acquire)) {
// wait
}
assert(v == 42); // Must be true
}
```
C11 程式碼:
```c
int acquireFoo()
{
return atomic_load_explicit(&foo, memory_order_acquire);
}
void releaseFoo(int i)
{
atomic_store_explicit(&foo, i, memory_order_release);
}
int v;
atomic_bool v_ready = false;
void threadA()
{
v = 42;
atomic_store_explicit(&v_ready, true, memory_order_release);
}
void threadB()
{
while (!atomic_load_explicit(&v_ready, memory_order_acquire)) {
// wait
}
assert(v == 42); // Must be true
}
```
Acquire 與 Release 是兩個成對的記憶體排序。Acquire 會將任何所有在它之後的記憶體操作維持排序並與其他 CPU 同步,通常會搭配記憶體柵欄來防止 CPU 重新排列操作。Release 則是與其相反。兩者常被用來進行鎖的實作,也因此 Acquire 常被用在 Load 操作,而 Release 被用在 Store 操作。
### 10.2 Relaxed
C++11 程式碼:
```cpp
atomic_bool stop(false);
void worker()
{
while (!stop.load(memory_order_relaxed)) {
// Do good work.
}
}
void atomicMultiply(int by)
{
int expected = foo.load(memory_order_relaxed);
while (!foo.compare_exchange_weak(
expected, expected * by,
memory_order_release,
memory_order_relaxed)) {
/* empty loop */
}
}
int main()
{
launchWorker();
// Wait some...
stop = true; // seq_cst
joinWorker();
}
```
C11 程式碼:
```c
atomic_bool stop = false;
void worker()
{
while (!atomic_load_explicit(&stop, memory_order_relaxed)) {
// Do good work.
}
}
void atomicMultiply(int by)
{
int expected = atomic_load_explicit(&foo, memory_order_relaxed);
while (!atomic_compare_exchange_weak_explicit(
&foo,
&expected,
expected * by,
memory_order_release,
memory_order_relaxed)) {
/* empty loop */
}
}
int main()
{
launchWorker();
// Wait some...
stop = true; // seq_cst
joinWorker();
}
```
Relaxed 是相對 Acquire Release 來講比較弱的記憶體操作,它可以保證兩個 Relaxed 操作不會被重新排序,但它不會為其他 CPU 去做同步操作。
### 10.3 Acquire-Release
C++11 程式碼:
```cpp
atomic_int refCount;
void inc()
{
refCount.fetch_add(1, memory_order_relaxed);
}
void dec()
{
if (refCount.fetch_sub(1, memory_order_acq_rel) == 1) {
// No more references, delete the data.
}
}
```
C11 程式碼:
```c
atomic_int refCount;
void inc()
{
atomic_fetch_add_explicit(&refCount, 1, memory_order_relaxed);
}
void dec()
{
if (atomic_fetch_sub_explicit(&refCount, 1, memory_order_acq_rel) == 1) {
// No more references, delete the data.
}
}
```
Acq-Rel 可以說類似於上面提到的 Acquire 與 Release,只不過它被特化用於同時需要進行讀取與儲存的函式,如 atomic_compare_exchange 系列函式。
另外還有一種排序為 SeqCst,保證程式之間可以有 Sequential Consistency,被認為是最強的記憶體排序,但因為每個 Atomic 操作都會對 CPU 快取造成一定的開銷,SeqCst 的花費自然比其他記憶體操作來得要大,因此它並不是唯一推薦的記憶體排序。
## TODO: 以 C11 Atomics 實作經典 lock-free 程式
以 C11 Atomics 改寫 [hungry-birds](https://github.com/jserv/hungry-birds) 並解釋運作原理 (注意: 存在若干實作錯誤),需要用 ThreadSanitizer 驗證,作為日後教材的補充內容。
> 對照 [Issue #5](https://github.com/jserv/hungry-birds/issues/5)
> 延伸閱讀: [Atomics](https://wusyong.github.io/posts/atomic/)
### 運作原理
```c
static const size_t sentinel = 0xdeadc0de;
static const size_t alignment = sizeof(size_t);
typedef struct node {
atomic_uintptr_t next;
} node;
struct __QueueInternal {
atomic_uintptr_t head;
atomic_uintptr_t tail;
size_t item_size;
};
```
該 lock-free 程式使用 C11 Atomics 實作佇列,利用 `atomic_uintptr_t` 這個指標類型儲存佇列頭尾以及 `next` 的值以構成 Linked List 的結構。
- `head`:佇列頭,主要用於 `queue_pop` 操作。
- `tail`:佇列尾,主要用於 `queue_push` 操作。
- `item_size`:儲存佇列中每個元素的大小。
#### queue_create
```c
queue_p queue_create(size_t item_size)
{
size_t *ptr = calloc(sizeof(struct __QueueInternal) + alignment, 1);
ptr[0] = sentinel;
queue_p q = (queue_p)(ptr + 1);
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
q->item_size = item_size;
return q;
}
```
在該程式創建佇列時,會利用 sentinel 這個 [Hexspeak](https://en.wikipedia.org/wiki/Hexspeak) 去表示佇列的開頭,並將頭尾用 atomic 函式設置為 0。
#### queue_push
```c
QueueResult queue_push(queue_p q, void *data)
{
assert(q);
/* create the new tail */
node *new_tail = malloc(sizeof(node) + q->item_size);
if (!new_tail) {
return QUEUE_OUT_OF_MEMORY;
}
atomic_init(&new_tail->next, 0);
memcpy(new_tail + 1, data, q->item_size);
/* swap the new tail with the old */
node *old_tail = (node *) atomic_exchange(&q->tail,
(uintptr_t) new_tail);
/* link the old tail to the new */
if (old_tail) {
atomic_store(&old_tail->next, (uintptr_t) new_tail);
} else {
atomic_store(&q->head, (uintptr_t) new_tail);
}
return QUEUE_SUCCESS;
}
```
推入佇列的動作由 queue_push 函式達成,程式會首先配置一塊新的記憶體區塊 new_tail,並利用 atomic_init 初始化。接下來透過 atomic_exchange 操作交換佇列尾端與 new_tail 的值,取得下來的指標這邊以 old_tail 表示。
交換之後 old_tail 會有兩種可能情況:
- 若 old_tail 為 0,則根據 queue_create 函式的設置可知道佇列此時沒有任何元素存在,因此將佇列的頭利用 atomic_store 存入剛剛配置的 new_tail 的值。
- 若 old_tail 不為 0,則代表佇列尾早前已經被存入其它記憶體的值,因此就只需要將 old_tail 的下一個指標設為 new_tail 即可。
#### queue_pop
```c
QueueResult queue_has_front(queue_p q)
{
assert(q);
if (atomic_load(&q->head) == 0)
return QUEUE_FALSE;
return QUEUE_TRUE;
}
QueueResult queue_pop(queue_p q)
{
assert(q);
assert(queue_has_front(q) == QUEUE_TRUE);
/* get the head */
node *popped = (node *) atomic_load(&q->head);
node *compare = popped;
/* set the tail and head to nothing if they are the same */
if (atomic_compare_exchange_strong(&q->tail, &compare, 0)) {
compare = popped;
/* it is possible for another thread to have pushed after
* we swap out the tail. In this case, the head will be different
* then what was popped, so we just do a blind exchange regardless
* of the result.
*/
atomic_compare_exchange_strong(&q->head, &compare, 0);
} else {
/* tail is different from head, set the head to the next value */
node *new_head = 0;
while (!new_head) {
/* it is possible that the next node has not been assigned yet,
* so just spin until the pushing thread stores the value.
*/
new_head = (node *) atomic_load_explicit(&popped->next, memory_order_acquire);
}
atomic_store_explicit(&q->head, (uintptr_t) new_head, memory_order_release);
}
free(popped);
return QUEUE_SUCCESS;
}
```
取出佇列的動作由 `queue_pop` 完成,函式首先將 `q->head` 的初始值存儲於 `popped` 指標上,並將 `popped` 的值複製到 `compare` 並進行 CAS。
- 佇列要先透過 `queue_has_front` 函式確認有元素,否則會產生斷言錯誤。
- 若 `compare` 的值與佇列尾相同,則代表這個佇列只有一個元素。
- 佇列尾會被設為 0,且將 `popped` 的值再次複製到 `compare` 一次。這個操作我不清楚原因,但我認為是為了確保接下來與佇列頭的比較可以順利在 MPSC 的情況下執行。
- 由於佇列尾被設為 0,因此當有其它執行緒同時進行 `queue_push` 時,佇列會被判定為「沒有元素」的情況,進而更改了佇列頭的值。因此,進行 `q->head` 與 compare 的比較時結果可能不相同。因此函式再次進行一次 CAS,若佇列頭的值沒有被更動則將其設為 0。
- 若 `compare` 的值與佇列尾不同,則佇列有多個元素。
- 新建一個指標 `new_head` 用來複製原本佇列頭的下一個值。由於佇列頭的下一個值可能還沒有被存入,即有執行緒還在 `queue_push` 的 `atomic_store` 區塊,因此這邊函式利用 while 迴圈讓 `new_head` 反覆存取 `popped->next` 直到其非 0。
- 當 `new_head` 存取到值後,`q->head` 會存入 `new_head` 的值而完成操作。
當以上操作執行完後 `popped` 指標的記憶體會被釋放並回傳取出成功。
### 以 ThreadSanitizer 驗證
無修改運行時程式不會提示任何錯誤而順利結束。若在 Makefile 內加上 `-fsanitize=thread` 標籤後運行,當程式進行到 Stress Test 區塊可以看到終端機發現有 data race 發生在 `queue_pop` 與 `queue_push` 函式,進而導致程式無法順利結束。
```
WARNING: ThreadSanitizer: data race (pid=16033)
Write of size 8 at 0x7b0400000800 by thread T1:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde)
Previous write of size 8 at 0x7b0400000800 by thread T2:
#0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57)
#1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:87 (pc-test+0x205f)
Thread T1 (tid=16035, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd)
Thread T2 (tid=16036, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55)
SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop
```
```
WARNING: ThreadSanitizer: data race (pid=16033)
Write of size 8 at 0x7b0400000808 by thread T1:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:79 (pc-test+0x1fde)
Previous write of size 8 at 0x7b0400000808 by thread T2:
[failed to restore the stack]
Thread T1 (tid=16035, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:129 (pc-test+0x19dd)
Thread T2 (tid=16036, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:133 (pc-test+0x1a55)
SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:79 in queue_pop
```
以 GDB 分析過後可以發現儘管進行了多次的 `queue_push`,data race 總是會在第一次的 `queue_pop` 的 free 函式執行時發生,原因疑似是 `queue_pop` 執行 `free` 函式時,`queue_push` 同時對記憶體進行 `malloc`。
我嘗試利用 `atomic_test_and_set` 函式,設置一個 `atomic_flag` 來達到類似互斥鎖的效果去執行問題段,雖未能解決大部分問題,但 data race 的發生時間有所變化。
```
WARNING: ThreadSanitizer: data race (pid=9070)
Write of size 8 at 0x7b0400000800 by thread T1:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)
Previous write of size 8 at 0x7b0400000800 by thread T2:
#0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57)
#1 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:92 (pc-test+0x208a)
Thread T1 (tid=9072, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)
Thread T2 (tid=9073, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)
SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop
```
第一次的 data race 一樣發生在第一次的 free 與 malloc 同時進行。
```
WARNING: ThreadSanitizer: data race (pid=9070)
Write of size 8 at 0x7b0400000818 by thread T1:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)
Previous write of size 1 at 0x7b040000081b by thread T2:
#0 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:827 (libtsan.so.0+0x6243e)
#1 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:819 (libtsan.so.0+0x6243e)
#2 queue_push /home/gaberil0903/linux2023/hungry-birds/queue.c:102 (pc-test+0x20f4)
Thread T1 (tid=9072, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)
Thread T2 (tid=9073, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)
SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop
```
第二次的 data race 為新出現的問題,在第二次的 free 與 memcpy 同時進行的情況下發生。
```
WARNING: ThreadSanitizer: data race (pid=9070)
Write of size 8 at 0x7b040002e0d0 by thread T1:
#0 free ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:711 (libtsan.so.0+0x37ab8)
#1 queue_pop /home/gaberil0903/linux2023/hungry-birds/queue.c:81 (pc-test+0x1fee)
Previous write of size 8 at 0x7b040002e0d0 by thread T11:
[failed to restore the stack]
Thread T1 (tid=9072, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:130 (pc-test+0x19dd)
Thread T11 (tid=9082, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 stress_test /home/gaberil0903/linux2023/hungry-birds/pc-test.c:134 (pc-test+0x1a55)
SUMMARY: ThreadSanitizer: data race /home/gaberil0903/linux2023/hungry-birds/queue.c:81 in queue_pop
```
第三次的 data race 與原本的第二次 data race 為同種問題,但發生時間被延後到數次 free 函式後,而非原本的在第一次 free 發生的情況。
:::warning
以 atomics 保護 critical section
:notes: jserv
:::