# Linux 核心專題: 《Concurrency Primer》校訂和範例撰寫
> 執行人: PlusThousand0107
> [專題解說錄影](https://youtu.be/hvrH77DLo0Y)
:::success
:question: 提問清單
* ?
:::
## 任務簡述
檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,嘗試用 C11 Atomics 改寫原有程式碼,並確保在 x86-64 及 QEMU/Arm 正確運作。預計產出:
* 以 C11 Atomics 實作經典 lock-free 程式
* 探討 memory ordering/barrier
## TODO: 校訂〈Concurrency Primer〉
檢視〈Concurrency Primer〉草稿並記錄閱讀過程中的疑惑,,參見[〈Concurrency Primer〉導讀](https://hackmd.io/@sysprog/concurrency-primer),確保原本的 C++ 程式都以 C11 Atomics 改寫並驗證。
> 改寫時,先列出原本的 C++11 程式碼,隨後則是改寫後的 C11 Atomics 實作
### 以 C11 Atomics 改寫
#### Enforcing law and order
原始程式碼 :
```cpp
int v;
bool v_ready = false;
void threadA()
{
// Write the value
// and set its ready flag.
v = 42;
v_ready = true;
}
void threadB()
{
// Await a value change and read it.
while (!v_ready) { /* wait */ }
const int my_v = v;
// Do something with my_v...
}
```
這裡無法保證在 `v` 被設為 42 之後能馬上將 `v_ready` 設為 `true` ,可能會有其他執行序在 `v_ready` 被更改前搶先執行,這就會導致最後的答案錯誤。
以 C11 Atomics 改寫後 :
```c
#include <stdatomic.h>
atomic_int v;
atomic_bool v_ready = ATOMIC_VAR_INIT(false);
void threadA()
{
atomic_store(&v, 42);
atomic_store(&v_ready, true);
}
void threadB()
{
while (!atomic_load(&v_ready)) { /* wait */ }
const atomic_int my_v = atomic_load(&v);
}
```
原本 `int` 以及 `bool` 的變數型態在這裡會用 `atomic_int` 和 `atomic_bool` 來取代,且存取值的時候不會直接存取,而是使用 `atomic_store()` 和 `atomic_load()` 的方式來載入和儲存。
#### Test and set
原始程式碼 :
```cpp
std::atomic_flag af;
void lock()
{
while (af.test_and_set()) { /* wait */ }
}
void unlock() { af.clear(); }
```
這裡 `af` 可以決定是否讓程式執行或是等待,如果 `af` 的上一個值為 `false` 則表示可以進入並執行,如果是 `true` 的話則表示目前正被占用,必須等待直到釋放。
以 C11 Atomics 改寫後 :
```c
#include <stdatomic.h>
atomic_flag af = ATOMIC_FLAG_INIT;
void lock()
{
while (atomic_flag_test_and_set(&af)) { /* wait */ }
}
void unlock()
{
atomic_flag_clear(&af);
}
```
將 `af.test_and_set()` 和 `af.clear()` 改成 `atomic_flag_test_and_set()` 和 `atomic_flag_clear()` 。
#### Compare and swap
原始程式碼 :
```cpp
enum class TaskState : int8_t {
Idle, Running, Cancelled
};
std::atomic<TaskState> ts;
void taskLoop()
{
ts = TaskState::Running;
while (ts == TaskState::Running) {
// Do good work.
}
}
bool cancel()
{
auto expected = TaskState::Running;
return ts.compare_exchange_strong(expected, TaskState::Cancelled);
}
```
1. `TakeState` 有 3 種狀態,分別是 `Idle` 、 `Running` 、 `Cancelled` ,這裡用 `ts` 紀錄當前狀態。
2. `taskLoop()` 為一個迴圈,讓 task 持續執行。
3. `cancel()` 可以取消正在執行的 task ,當 `ts` 和 `expected` 相等時,會更新 `ts` 為 `TaskState::Cancelled` ,並回傳 `true` ,若不相等則不對 `ts` 做修改且直接回傳 `false` 。
以 C11 Atomics 改寫後 :
```c
#include <stdatomic.h>
enum TaskState {
Idle, Running, Cancelled
};
atomic_int_least8_t ts;
void taskLoop()
{
atomic_store(&ts, Running);
while (atomic_load(&ts) == Running) {
// Do good work.
}
}
bool cancel()
{
int expected = Running;
return atomic_compare_exchange_strong(&ts, &expected, Cancelled);
}
```
CAS 的部分會從原來的 `ts.compare_exchange_strong()` 改成 `atomic_compare_exchange_strong()` 。
## TODO: 以 C11 Atomics 實作經典 lock-free 程式
從[課程教材](http://wiki.csie.ncku.edu.tw/linux/schedule)選定 lock-free 程式碼 (例如第 15 週測驗題),以 C11 Atomics 實作並解釋運作原理,需要用 ThreadSanitizer 驗證,作為日後教材的補充內容。
### lock-free 實作與解釋 (以第 15 週測驗題為例)
#### `spsc_queue` 結構
```c
typedef struct spsc_queue {
counter_t head; /* Mostly accessed by producer */
volatile uint32_t batch_head;
counter_t tail __ALIGN; /* Mostly accessed by consumer */
volatile uint32_t batch_tail;
unsigned long batch_history;
/* For testing purpose */
uint64_t start_c __ALIGN;
uint64_t stop_c;
element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;
```
- 這邊以 ring buffer 的資料結構來實作, ring buffer 的特性為容量固定且頭尾相接,適合用於需 FIFO (先進先出) 的狀況,透過 `counter_t` 來紀錄讀取端 (開始) 以及寫入端 (結束) 的索引,可以幫助判斷當下佇列是否為空 (empty) 或滿 (full)。
- `head` : 主要供 `enqueue` 使用,會追蹤 `producer` 的位置,並記錄下一個要寫入的索引。
- `tail` : 主要供 `dequeue` 使用,會追蹤 `consumer` 的位置,並記錄下一個要讀取的索引。
- `batch_history` : 負責記錄 `batch_size`。
- `start_c` 、 `stop_c` : 於測試時使用,負責量測操作的執行時間。
- `data[SPSC_QUEUE_SIZE]` : 保存 `spsc_queue` 的資料,供之後 `producer` 和 `consumer` 存取使用。
#### `dequeue` 函式
```c
if (self->tail.r == self->batch_tail) {
uint32_t tmp_tail = self->tail.r + SPSC_BATCH_SIZE;
if (tmp_tail >= SPSC_QUEUE_SIZE) {
tmp_tail = 0;
if (self->batch_history < SPSC_BATCH_SIZE) {
self->batch_history =
(SPSC_BATCH_SIZE < (self->batch_history + SPSC_BATCH_INCREAMENT)) ?
SPSC_BATCH_SIZE : (self->batch_history + SPSC_BATCH_INCREAMENT);
}
}
batch_size = self->batch_history;
while (!(self->data[tmp_tail])) {
wait_ticks(SPSC_CONGESTION_PENALTY);
batch_size >>= 1;
if (batch_size == 0)
return SPSC_Q_EMPTY;
tmp_tail = self->tail.r + batch_size;
if (tmp_tail >= SPSC_QUEUE_SIZE)
tmp_tail = 0;
}
self->batch_history = batch_size;
if (tmp_tail == self->tail.r)
tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1;
self->batch_tail = tmp_tail;
}
```
1. 首先,會先確認 `self->tail.r` 和 `self->batch_tail` 是否相等。
2. 如果相等的話,會先用 `tmp_tail` 暫存一個可能的位置,但若 `tmp_tail` 超過或是等於 `SPSC_QUEUE_SIZE` 的話,則將 `tmp_tail` 設為 0。此外,如果 `self->batch_history` 小於 `SPSC_BATCH_SIZE` 的話,會把 `self->batch_history` 和 `SPSC_BATCH_INCREAMENT` 相加,但不能超過 `SPSC_BATCH_SIZE`。
3. 接下來,會透過迴圈尋找元素位置,每次迴圈都會執行 `wait_ticks(SPSC_CONGESTION_PENALTY)` 來防止 `data race` 發生 ( 這裡 `SPSC_CONGESTION_PENALTY` 等於 `1000` ),如果 `batch_size` 等於 0 的話表示佇列為空並直接回傳 `SPSC_Q_EMPTY` ,否則會將 `tmp_tail` 設為 `self->tail.r + batch_size` 繼續找下去。
```c
*val_ptr = self->data[self->tail.r];
self->data[self->tail.r] = SPSC_QUEUE_ELEMENT_ZERO;
self->tail.w++;
```
這裡 `*val_ptr` 會指向最尾端的資料 `self->data[self->tail.r]` ,接著, `self->data[self->tail.r]` 會被 `SPSC_QUEUE_ELEMENT_ZERO` 清空,因為移除了一個元素所以要做 `self->tail.w++` 。
#### `enqueue` 函式
```c
if (self->head.r == self->batch_head) {
uint32_t tmp_head = self->head.r + SPSC_BATCH_SIZE;
if (tmp_head >= SPSC_QUEUE_SIZE)
tmp_head = 0;
if (self->data[tmp_head]) {
/* run spin cycle penality */
wait_ticks(SPSC_CONGESTION_PENALTY);
return SPSC_Q_FULL;
}
self->batch_head = tmp_head;
}
```
1. 先檢查 `self->head.r` 和 `self->batch_head` 是否相等。
2. 如果相等,會先用 `tmp_head` 暫存可能的位置,若超過或等於 `SPSC_QUEUE_SIZE` 的話,則會被設為 0。
3. 如果 `self->data[tmp_head]` 不為 0 ,就表示裡面已經有存放別的值了,也就是滿了的意思,這時會用 `wait_ticks` 延遲 `SPSC_CONGESTION_PENALTY` 的時間,並且回傳 `SPSC_Q_FULL` 。
4. 如果沒有滿的話則會執行 `self->batch_head = tmp_head` ,並將值寫入 `self->data[self->head.r]` ,因為新增了一個所以要用 `self->head.w++` 。
#### `consumer` 函式
```c
cpu_set_t cur_mask;
CPU_ZERO(&cur_mask);
CPU_SET(cpu_id * 2, &cur_mask);
printf("consumer %d: ---%d----\n", cpu_id, 2 * cpu_id);
if (sched_setaffinity(0, sizeof(cur_mask), &cur_mask) < 0) {
printf("Error: sched_setaffinity\n");
return NULL;
}
```
`sched_setaffinity()` 能讓執行序在指定的 CPU 上執行,第一個參數為 `pid` ,設為 0 表示當前的 process 。如果最後 `sched_setaffinity()` 的回傳小於 0 ,則表示配置失敗,會顯示錯誤資訊並終止。
```c
queues[cpu_id].start_c = read_tsc();
for (uint64_t i = 1; i <= TEST_SIZE; i++) {
while (dequeue(&queues[cpu_id], &value) != 0)
;
assert((old_value + 1) == value);
old_value = value;
}
queues[cpu_id].stop_c = read_tsc();
```
1. 用 `queues[cpu_id]` 的 `start_c` 紀錄當下時間作為開始的時間。
2. 這裡會執行 `TEST_SIZE` 次,且如果 `dequeue()` 的回傳結果不等於 0 就表示目前還不是 `SPSC_OK` ,所以會被卡住,不會繼續往下執行。
3. 用 `queues[cpu_id]` 的 `stop_c` 紀錄結束時間。
4. 最後會用 `queues[cpu_id]` 的 `stop_c` 時間減去 `start_c` 時間並除以 `(TEST_SIZE + 1)` 來表示操作所花費的平均 cycle 。
#### `producer` 函式
```c
uint64_t start_p = read_tsc();
for (uint64_t i = 1; i <= TEST_SIZE + SPSC_BATCH_SIZE; i++) {
for (int32_t j = 1; j < num; j++) {
element_t value = i;
while (enqueue(&queues[j], value) != 0)
;
}
}
uint64_t stop_p = read_tsc();
```
1. 用 `uint64_t` 型態的 `start_c` 紀錄當下時間作為開始的時間。
2. 這裡有兩個迴圈,外層的會執行 `TEST_SIZE + SPSC_BATCH_SIZE` ,內層的則會根據傳入的引數 `num` 來決定要跑幾輪。和 `consumer` 一樣,當 `enqueue()` 的回傳結果不等於 0 時也會被卡住不繼續往下執行。
3. 用 `uint64_t` 型態的 `stop_c` 紀錄結束時間。
#### 執行結果
編譯 (需加上 `-lpthread`) :
```c
$ gcc -Wall -O2 -I. -o main main.c -lpthread
```
輸出 (這裡的 `max_threads = 2`) :
```c
$ ./main
producer 0: ---1----
consumer 1: ---2----
Consumer created...
producer 10 cycles/op
consumer: 10 cycles/op
Done!
```
從結果上可以看到 `producer` 和 `consumer` 所分配到的 CPU ID ,以及執行所花費的平均 cycle。
#### 使用 ThreadSanitizer
編譯 (需再加上 `-fsanitize=thread`) :
```c
$ gcc -Wall -O2 -I. -o main main.c -lpthread -fsanitize=thread
```
輸出 :
```c
$ ./main
producer 0: ---1----
consumer 1: ---2----
Consumer created...
==================
WARNING: ThreadSanitizer: data race (pid=5072)
Write of size 4 at 0x55b40fe99284 by main thread:
#0 producer <null> (main+0x1bf2)
#1 main <null> (main+0x13f1)
Previous read of size 4 at 0x55b40fe99284 by thread T1:
#0 consumer <null> (main+0x18ee)
Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000c284)
Thread T1 (tid=5074, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x13ae)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf2) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=5072)
Read of size 4 at 0x55b40fe99280 by thread T1:
#0 consumer <null> (main+0x174d)
Previous write of size 4 at 0x55b40fe99280 by main thread:
#0 producer <null> (main+0x1bf2)
#1 main <null> (main+0x13f1)
Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000c280)
Thread T1 (tid=5074, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x13ae)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x174d) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=5072)
Write of size 4 at 0x55b40fe99a80 by main thread:
#0 producer <null> (main+0x1bf2)
#1 main <null> (main+0x13f1)
Previous read of size 4 at 0x55b40fe99a80 by thread T1:
#0 consumer <null> (main+0x1869)
Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000ca80)
Thread T1 (tid=5074, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x13ae)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf2) in producer
==================
```
可以看到運行至 `producer` 和 `consumer` 時會觸發 data race 而導致程式無法正常結束,所以接下來會以 C11 Atomics 來改寫。
### 以 C11 Atomics 改寫
#### `counter_t` 和 `spsc_queue` 結構
```c
typedef union {
atomic_uint w;
atomic_uint r;
} counter_t;
#define __ALIGN __attribute__((aligned(64)))
typedef struct spsc_queue {
counter_t head; /* Mostly accessed by producer */
atomic_uint batch_head;
counter_t tail __ALIGN; /* Mostly accessed by consumer */
atomic_uint batch_tail;
unsigned long batch_history;
/* For testing purpose */
uint64_t start_c __ALIGN;
uint64_t stop_c;
element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;
```
原先使用 `volatile` 的部分在這裡都被給成了 `atomic_uint`。
#### `dequeue` 函式
```c
static int dequeue(spsc_queue_t *self, element_t *val_ptr)
{
unsigned long batch_size = self->batch_history;
*val_ptr = SPSC_QUEUE_ELEMENT_ZERO;
/* Try to zero in on the next batch tail */
if (atomic_load(&self->tail.r) == atomic_load(&self->batch_tail)) {
uint32_t tmp_tail = atomic_load(&self->tail.r) + SPSC_BATCH_SIZE;
if (tmp_tail >= SPSC_QUEUE_SIZE) {
tmp_tail = 0;
if (self->batch_history < SPSC_BATCH_SIZE) {
self->batch_history =
(SPSC_BATCH_SIZE <
(self->batch_history + SPSC_BATCH_INCREAMENT))
? SPSC_BATCH_SIZE
: (self->batch_history + SPSC_BATCH_INCREAMENT);
}
}
batch_size = self->batch_history;
while (!(self->data[tmp_tail])) {
wait_ticks(SPSC_CONGESTION_PENALTY);
batch_size >>= 1;
if (batch_size == 0)
return SPSC_Q_EMPTY;
tmp_tail = atomic_load(&self->tail.r) + batch_size;
if (tmp_tail >= SPSC_QUEUE_SIZE)
tmp_tail = 0;
}
self->batch_history = batch_size;
if (tmp_tail == atomic_load(&self->tail.r))
tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1;
atomic_store(&self->batch_tail, tmp_tail);
}
/* Actually pull out the data element. */
*val_ptr = self->data[atomic_load(&self->tail.r)];
self->data[atomic_load(&self->tail.r)] = SPSC_QUEUE_ELEMENT_ZERO;
atomic_fetch_add(&self->tail.w, 1);
if (atomic_load(&self->tail.r) >= SPSC_QUEUE_SIZE)
atomic_store(&self->tail.w, 0);
return SPSC_OK;
}
```
可以發現到本來可以直接比較的 `self->tail.r` 和 `self->batch_tail` 在這裡需先經過 `atomic_load()` 載入後才能進行比較。儲存時也是一樣,必須用 `atomic_store()` 來存以防存入的結果不同。此外,做計算時也會改用 `atomic_fetch_add()` 。
#### `enqueue` 函式
```c
static int enqueue(spsc_queue_t *self, element_t value)
{
/* Try to zero in on the next batch head. */
if (atomic_load(&self->head.r) == atomic_load(&self->batch_head)) {
uint32_t tmp_head = atomic_load(&self->head.r) + SPSC_BATCH_SIZE;
if (tmp_head >= SPSC_QUEUE_SIZE)
tmp_head = 0;
if (self->data[tmp_head]) {
/* run spin cycle penalty */
wait_ticks(SPSC_CONGESTION_PENALTY);
return SPSC_Q_FULL;
}
atomic_store(&self->batch_head, tmp_head);
}
self->data[atomic_load(&self->head.r)] = value;
atomic_fetch_add(&self->head.w, 1);
if (atomic_load(&self->head.r) >= SPSC_QUEUE_SIZE)
atomic_store(&self->head.w, 0);
return SPSC_OK;
}
```
#### 執行結果
編譯 :
```shell
$ gcc -Wall -O2 -I. -o main spsc_atomics.c -lpthread
```
輸出 :
```shell
$ ./main
producer 0: ---1----
consumer 1: ---2----
Consumer created...
producer 39 cycles/op
consumer: 39 cycles/op
Done!
```
可以看到改成 C11 Atomics 後會犧牲掉一些效能,所以花費的平均 cycle 從原先的 10 增加到了 39。
:::warning
這樣的解釋不足以反映出你的變更,請善用 [uftrace](https://github.com/namhyung/uftrace) 一類的工具找出具體原因
:notes: jserv
:::
#### 使用 ThreadSanitizer
編譯 (需再加上 `-fsanitize=thread`) :
```c
$ gcc -Wall -O2 -I. -o main spsc_atomics.c -lpthread -fsanitize=thread
```
輸出 :
```c
$ ./main
producer 0: ---1----
consumer 1: ---2----
Consumer created...
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Write of size 4 at 0x5594bc6e6284 by main thread:
#0 producer <null> (main+0x1bf4)
#1 main <null> (main+0x1451)
Previous read of size 4 at 0x5594bc6e6284 by thread T1:
#0 consumer <null> (main+0x191c)
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c284)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf4) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Write of size 4 at 0x5594bc6e6288 by main thread:
#0 producer <null> (main+0x1bf4)
#1 main <null> (main+0x1451)
Previous read of size 4 at 0x5594bc6e6288 by thread T1:
[failed to restore the stack]
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c288)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf4) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Read of size 4 at 0x5594bc6e6280 by thread T1:
#0 consumer <null> (main+0x17d3)
Previous write of size 4 at 0x5594bc6e6280 by main thread:
#0 producer <null> (main+0x1bf4)
#1 main <null> (main+0x1451)
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c280)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x17d3) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Read of size 4 at 0x5594bc6e6a84 by thread T1:
#0 consumer <null> (main+0x189b)
Previous write of size 4 at 0x5594bc6e6a84 by main thread:
#0 producer <null> (main+0x1bf4)
#1 main <null> (main+0x1451)
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000ca84)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x189b) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Read of size 4 at 0x5594bc6e6700 by thread T1:
#0 consumer <null> (main+0x17d3)
Previous write of size 4 at 0x5594bc6e6700 by main thread:
[failed to restore the stack]
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c700)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x17d3) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
Read of size 4 at 0x5594bc6e7284 by thread T1:
#0 consumer <null> (main+0x189b)
Previous write of size 4 at 0x5594bc6e7284 by main thread:
[failed to restore the stack]
Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000d284)
Thread T1 (tid=16261, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main <null> (main+0x140e)
SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x189b) in consumer
==================
```
雖然已經加上 atomics 但仍有 data race 的問題,這表示可能還有需加上 atomics 保護的部分沒有被修改到。
## TODO: 探討 memory ordering/barrier
研讀以下材料:
* [Linux-Kernel Memory Ordering: Help Arrives At Last!](http://www.rdrop.com/users/paulmck/scalability/paper/LinuxMM.2017.04.08b.Barcamp.pdf) / [video](https://www.youtube.com/watch?v=ULFytshTvIY)
* [I/O ordering 學習紀錄](https://hackmd.io/@butastur/rkkQEGjUN), [Memory ordering](https://hackmd.io/@Cbg1XpL0Rim8U6YOMdjgUg/rJ6NNKdqc)
* [Experiments on Concurrency - Happens-before](https://hackmd.io/@butastur/concurrency-happens-before)
* [Memory barriers in C](https://mariadb.org/wp-content/uploads/2017/11/2017-11-Memory-barriers.pdf)
紀錄認知和疑惑,並回答以下:
* compiler barrier 的作用:編譯器最佳化如何影響 memory order,舉例說明並對照 C 語言規格 (以 C11 為主)
* memory barrier 的考量因素和 Linux 核心的處置方式
* 搭配 diy/litmus 來解說 memory ordering 並驗證
### compiler barrier
- compiler barrier 是 memory barrier 的一種 (另一種為 CPU barrier) ,用於控制編譯器對程式碼進行的優化,像是 `volatile` 。
- `volatile` 會告訴編譯器定義的變數可能會隨時改變,所以編譯完後程式若想要存取這個變數就要直接去這個變數的記憶體位址讀取
- 如果編譯器為了達到最佳化而去使用暫存器內的值,則可能導致最後的結果與預期有所不同,因為暫存器內的值有可能被其他執行序修改。
- 編譯器最佳化可能會影響到讀取記憶體的順序,像是指令重排列 (instruction reordering) 。
- 指令重排列主要是希望透過改變指令的順序來提升指令的平行度以及效能,但這就可能造成執行序讀取記憶體時結果不一致的問題,所以為了讓指令能照順序執行,可以使用 `compiler barrier` 來防止重排。
- 例子
1. 沒使用 `volatile`
```c
#include <stdio.h>
int main() {
int a = 1, b = 2, c = 0, d = 0;
printf("%d", a + b);
a = b;
c = b;
d = b;
printf("%d", c + d);
return 0;
}
```
這裡使用 `$ gcc -S -O2 -masm=intel test_without_volatile.c` 來編譯,產生出的組合語言如下。
```asm
main:
.LFB23:
.cfi_startproc
endbr64
push rbp
.cfi_def_cfa_offset 16
.cfi_offset 6, -16
lea rbp, .LC0[rip]
mov edx, 3
xor eax, eax
mov rsi, rbp
mov edi, 1
call __printf_chk@PLT
mov rsi, rbp
mov edx, 4
xor eax, eax
mov edi, 1
call __printf_chk@PLT
xor eax, eax
pop rbp
.cfi_def_cfa_offset 8
ret
.cfi_endproc
```
2. 使用 `volatile`
```c
#include <stdio.h>
int main() {
volatile int a = 1, b = 2, c = 0, d = 0;
printf("%d", a + b);
a = b;
c = b;
d = b;
printf("%d", c + d);
return 0;
}
```
這裡也是用 `$ gcc -S -O2 -masm=intel test_with_volatile.c` 來編譯並產生組合語言。
```asm
main:
.LFB23:
.cfi_startproc
endbr64
push rbp
.cfi_def_cfa_offset 16
.cfi_offset 6, -16
lea rbp, .LC0[rip]
mov edi, 1
mov rsi, rbp
sub rsp, 16
.cfi_def_cfa_offset 32
mov DWORD PTR [rsp], 1
mov DWORD PTR 4[rsp], 2
mov DWORD PTR 8[rsp], 0
mov DWORD PTR 12[rsp], 0
mov edx, DWORD PTR [rsp]
mov eax, DWORD PTR 4[rsp]
add edx, eax
xor eax, eax
call __printf_chk@PLT
mov eax, DWORD PTR 4[rsp]
mov rsi, rbp
mov edi, 1
mov DWORD PTR [rsp], eax
mov eax, DWORD PTR 4[rsp]
mov DWORD PTR 8[rsp], eax
mov eax, DWORD PTR 4[rsp]
mov DWORD PTR 12[rsp], eax
mov edx, DWORD PTR 8[rsp]
mov eax, DWORD PTR 12[rsp]
add edx, eax
xor eax, eax
call __printf_chk@PLT
add rsp, 16
.cfi_def_cfa_offset 16
xor eax, eax
pop rbp
.cfi_def_cfa_offset 8
ret
.cfi_endproc
```
可以發現到有加 `volatile` 的組合語言會用 `mov DWORD PTR` 的方式來存取資料,所以轉換後的組合語言明顯比沒加 `volatile` 的要更長一點,這主要和少了重排列的優化有關,但這樣所帶來的好處就是保障了變數資料的正確性,不會因為執行序順序上的不同而有錯誤的結果。
### memory barrier
- memory barrier 能維持對記憶體存取時的順序,避免優化而導致指令重排的問題。
- 以 ARM 為例,barrier 會將指令分成前後兩半,前面的指令只能在 barrier 前執行,而 barrier 後的則只能在後面執行,不會發生 barrier 前的指令跑到 barrier 後執行或是 barrier 後的跑到 barrier 前執行的情況。
- memory barrier 所需考量因素 :
1. 在不同的平台架構可能會有不同的 memory barrier 可供支援,所以應先了解該平台所對應的支援指令再從中選出適合的。
2. 使用 memory barrier 時可能會為了保持執行的正確性而有需要 stall 的狀況,這將會對效能造成影響,因此在效能和正確性之間應該思考如何做到權衡。
3. 加入 memory barrier 後可能會使程式看起來更複雜,導致可讀性下降,不利於維護。