
Linux Kernel Course Project: Proofreading 《Concurrency Primer》 and Writing Examples

Contributor: PlusThousand0107
Project presentation video

Question List

  • ?

Task Overview

Review the draft of 〈Concurrency Primer〉 and record any questions that arise while reading; rewrite the existing code with C11 Atomics and make sure it runs correctly on both x86-64 and QEMU/Arm. Expected deliverables:

  • Implement classic lock-free programs with C11 Atomics
  • Explore memory ordering/barriers

TODO: Proofread 〈Concurrency Primer〉

Review the draft of 〈Concurrency Primer〉 and record any questions that arise while reading (see the 〈Concurrency Primer〉 reading guide), and make sure every original C++ program is rewritten with C11 Atomics and verified.

For each rewrite, the original C++11 code is listed first, followed by the C11 Atomics implementation.

Rewriting with C11 Atomics

Enforcing law and order

Original code:

int v;
bool v_ready = false;

void threadA()
{
    // Write the value
    // and set its ready flag.
    v = 42;
    v_ready = true;
}

void threadB()
{
    // Await a value change and read it.
    while (!v_ready) { /* wait */ }
    const int my_v = v;
    // Do something with my_v...
}

There is no guarantee here that threadB observes the store to v before the store to v_ready: the compiler or the CPU may reorder the two stores, so threadB can see v_ready == true while v still holds its old value, which leads to a wrong result.

Rewritten with C11 Atomics:

#include <stdatomic.h>

atomic_int v;
atomic_bool v_ready = ATOMIC_VAR_INIT(false);

void threadA()
{
    atomic_store(&v, 42);
    atomic_store(&v_ready, true);
}

void threadB()
{
    while (!atomic_load(&v_ready)) { /* wait */ }
    const int my_v = atomic_load(&v);
}

The int and bool variable types are replaced here with atomic_int and atomic_bool, and values are no longer accessed directly: loads and stores go through atomic_load() and atomic_store() instead.
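atomic_store() and atomic_load() default to memory_order_seq_cst, which is stronger than this flag pattern needs. A minimal sketch (not from the Primer) of the same code using the explicit release/acquire pairing, which is already enough to guarantee that threadB sees v == 42 once the flag reads true:

#include <stdatomic.h>

atomic_int v;
atomic_bool v_ready = false;

void threadA(void)
{
    /* The release store below orders this write. */
    atomic_store_explicit(&v, 42, memory_order_relaxed);
    /* release: all earlier writes become visible before the flag does */
    atomic_store_explicit(&v_ready, true, memory_order_release);
}

void threadB(void)
{
    /* acquire: pairs with the release store in threadA */
    while (!atomic_load_explicit(&v_ready, memory_order_acquire)) { /* wait */ }
    const int my_v = atomic_load_explicit(&v, memory_order_relaxed);
    /* Do something with my_v... */
    (void) my_v;
}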

Test and set

Original code:

std::atomic_flag af;

void lock()
{
    while (af.test_and_set()) { /* wait */ }
}

void unlock() { af.clear(); }

Here af decides whether the caller may proceed or must wait: if the previous value of af was false, the lock was free and the caller may enter; if it was true, the lock is currently held and the caller must spin until it is released.

Rewritten with C11 Atomics:

#include <stdatomic.h>

atomic_flag af = ATOMIC_FLAG_INIT;

void lock()
{
    while (atomic_flag_test_and_set(&af)) { /* wait */ }
}

void unlock()
{
    atomic_flag_clear(&af);
}

af.test_and_set() and af.clear() become atomic_flag_test_and_set() and atomic_flag_clear().
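A quick way to convince yourself the lock works is to have two threads bump a shared counter under it. In this sketch the counter, iteration count, and thread count are made up for illustration; the program should always print 2000000:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_flag af = ATOMIC_FLAG_INIT;
static int counter = 0; /* protected by af */

static void lock(void) { while (atomic_flag_test_and_set(&af)) { /* wait */ } }
static void unlock(void) { atomic_flag_clear(&af); }

static void *worker(void *arg)
{
    (void) arg;
    for (int i = 0; i < 1000000; i++) {
        lock();
        counter++; /* plain access is safe inside the critical section */
        unlock();
    }
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("%d\n", counter); /* expect 2000000 */
    return 0;
}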

Compare and swap

Original code:

enum class TaskState : int8_t {
    Idle, Running, Cancelled
};

std::atomic<TaskState> ts;

void taskLoop()
{
    ts = TaskState::Running;
    while (ts == TaskState::Running) {
    // Do good work.
    }
}

bool cancel()
{
    auto expected = TaskState::Running;
    return ts.compare_exchange_strong(expected, TaskState::Cancelled);
}
  1. TaskState has three states, Idle, Running, and Cancelled; ts records the current state.
  2. taskLoop() is a loop that keeps the task running.
  3. cancel() cancels a running task: if ts equals expected, it updates ts to TaskState::Cancelled and returns true; if they differ, ts is left unmodified and false is returned.

Rewritten with C11 Atomics:

#include <stdatomic.h>
#include <stdint.h> /* for int_least8_t */

enum TaskState {
    Idle, Running, Cancelled
};

atomic_int_least8_t ts;

void taskLoop()
{
    atomic_store(&ts, Running);
    while (atomic_load(&ts) == Running) {
        // Do good work.
    }
}

bool cancel()
{
    int_least8_t expected = Running;
    return atomic_compare_exchange_strong(&ts, &expected, Cancelled);
}

The CAS changes from the member function ts.compare_exchange_strong() to atomic_compare_exchange_strong(). Note that expected must have the non-atomic type corresponding to ts (int_least8_t here), and on failure the value actually observed is written back into it.
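That update-on-failure behaviour of expected is what makes the classic CAS retry loop work: load, attempt the swap, and retry from the freshly observed value if another thread won the race. A minimal sketch (the counter is hypothetical, not part of the quiz code):

#include <stdatomic.h>

static atomic_int counter;

void increment(void)
{
    int expected = atomic_load(&counter);
    /* On failure, the CAS writes the currently stored value back into
     * expected, so each retry starts from fresh data. The weak variant
     * may also fail spuriously, which the loop absorbs. */
    while (!atomic_compare_exchange_weak(&counter, &expected, expected + 1))
        ;
}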

TODO: Implement classic lock-free programs with C11 Atomics

Pick lock-free code from the course material (e.g. the week 15 quiz), implement it with C11 Atomics, explain how it works, and verify it with ThreadSanitizer, as supplementary material for future course notes.

Lock-free implementation and explanation (using the week 15 quiz as an example)

The spsc_queue structure

typedef struct spsc_queue {
    counter_t head; /* Mostly accessed by producer */
    volatile uint32_t batch_head;
    counter_t tail __ALIGN; /* Mostly accessed by consumer */
    volatile uint32_t batch_tail;
    unsigned long batch_history;

    /* For testing purpose */
    uint64_t start_c __ALIGN;
    uint64_t stop_c;

    element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by producer and consumer */
} __ALIGN spsc_queue_t;
  • The implementation uses a ring buffer: a fixed-capacity buffer whose head and tail wrap around, which suits FIFO (first-in, first-out) usage. A counter_t records the read-side and write-side indices, which helps determine whether the queue is currently empty or full (a sketch of counter_t follows this list).
  • head: used mainly by enqueue; tracks the producer's position, i.e. the next index to write.
  • tail: used mainly by dequeue; tracks the consumer's position, i.e. the next index to read.
  • batch_history: records the current batch_size.
  • start_c and stop_c: used during testing to measure how long the operations take.
  • data[SPSC_QUEUE_SIZE]: holds the queue's data, accessed by both the producer and the consumer.
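The walkthrough below refers to fields such as tail.r and tail.w, but counter_t is not shown in this excerpt. Judging from the atomics rewrite later on this page, it is a union exposing the same 32-bit counter under a write-side alias and a read-side alias; in the original (pre-atomics) version it is presumably something like:

typedef union {
    volatile uint32_t w; /* alias used when updating the counter */
    volatile uint32_t r; /* alias used when reading the counter */
} counter_t;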

The dequeue function

if (self->tail.r == self->batch_tail) {
    uint32_t tmp_tail = self->tail.r + SPSC_BATCH_SIZE;
    if (tmp_tail >= SPSC_QUEUE_SIZE) {
        tmp_tail = 0;
        if (self->batch_history < SPSC_BATCH_SIZE) {
            self->batch_history =
                (SPSC_BATCH_SIZE < (self->batch_history + SPSC_BATCH_INCREAMENT)) ? 
                 SPSC_BATCH_SIZE : (self->batch_history + SPSC_BATCH_INCREAMENT);
        }
    }

    batch_size = self->batch_history;
    while (!(self->data[tmp_tail])) {
        wait_ticks(SPSC_CONGESTION_PENALTY);

        batch_size >>= 1;
        if (batch_size == 0)
            return SPSC_Q_EMPTY;

        tmp_tail = self->tail.r + batch_size;
        if (tmp_tail >= SPSC_QUEUE_SIZE)
            tmp_tail = 0;
    }
    self->batch_history = batch_size;

    if (tmp_tail == self->tail.r)
        tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1;
    self->batch_tail = tmp_tail;
}
  1. First, check whether self->tail.r equals self->batch_tail.
  2. If they are equal, a candidate position is staged in tmp_tail; if tmp_tail reaches or exceeds SPSC_QUEUE_SIZE, it wraps to 0. In that case, if self->batch_history is below SPSC_BATCH_SIZE, it is increased by SPSC_BATCH_INCREAMENT, capped at SPSC_BATCH_SIZE.
  3. Next, a loop probes for a non-empty slot. Each iteration backs off with wait_ticks(SPSC_CONGESTION_PENALTY) as a congestion penalty (SPSC_CONGESTION_PENALTY is 1000 here) and halves batch_size; if batch_size reaches 0 the queue is treated as empty and SPSC_Q_EMPTY is returned, otherwise tmp_tail is set to self->tail.r + batch_size and the probing continues.
*val_ptr = self->data[self->tail.r];
self->data[self->tail.r] = SPSC_QUEUE_ELEMENT_ZERO;
self->tail.w++;

Here *val_ptr receives the element at the consumer index, self->data[self->tail.r]; that slot is then cleared to SPSC_QUEUE_ELEMENT_ZERO, and since one element was removed, self->tail.w++ advances the tail.

The enqueue function

if (self->head.r == self->batch_head) {
    uint32_t tmp_head = self->head.r + SPSC_BATCH_SIZE;
    if (tmp_head >= SPSC_QUEUE_SIZE)
        tmp_head = 0;

    if (self->data[tmp_head]) {
        /* run spin cycle penalty */
        wait_ticks(SPSC_CONGESTION_PENALTY);
        return SPSC_Q_FULL;
    }
    self->batch_head = tmp_head;
}
  1. First, check whether self->head.r equals self->batch_head.
  2. If they are equal, a candidate position is staged in tmp_head; if it reaches or exceeds SPSC_QUEUE_SIZE, it wraps to 0.
  3. If self->data[tmp_head] is non-zero, that slot already holds a value, meaning the queue is full; wait_ticks delays for SPSC_CONGESTION_PENALTY and SPSC_Q_FULL is returned.
  4. If the queue is not full, self->batch_head = tmp_head is executed, the value is written into self->data[self->head.r], and since one element was added, self->head.w++ advances the head.

The consumer function

cpu_set_t cur_mask;
CPU_ZERO(&cur_mask);
CPU_SET(cpu_id * 2, &cur_mask);

printf("consumer %d:  ---%d----\n", cpu_id, 2 * cpu_id);
if (sched_setaffinity(0, sizeof(cur_mask), &cur_mask) < 0) {
    printf("Error: sched_setaffinity\n");
    return NULL;
}

sched_setaffinity() restricts a thread to the specified CPUs; the first parameter is a pid, where 0 means the calling thread. If sched_setaffinity() returns a value less than 0, the call failed, so an error message is printed and the function returns.
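A self-contained sketch of the same pinning idea (Linux-specific; _GNU_SOURCE is required for the CPU_* macros, and the CPU number 0 is arbitrary):

#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>

int main(void)
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(0, &mask); /* allow this thread to run only on CPU 0 */

    /* pid 0 means the calling thread */
    if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
        perror("sched_setaffinity");
        return 1;
    }
    printf("pinned to CPU 0\n");
    return 0;
}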

queues[cpu_id].start_c = read_tsc();

for (uint64_t i = 1; i <= TEST_SIZE; i++) {
    while (dequeue(&queues[cpu_id], &value) != 0)
        ;

    assert((old_value + 1) == value);
    old_value = value;
}
queues[cpu_id].stop_c = read_tsc();
  1. start_c of queues[cpu_id] records the current time, via read_tsc, as the start time (a sketch of read_tsc follows this list).
  2. The loop runs TEST_SIZE times; whenever dequeue() returns non-zero (i.e. not yet SPSC_OK), the inner while loop spins without proceeding.
  3. stop_c of queues[cpu_id] records the end time.
  4. Finally, stop_c minus start_c of queues[cpu_id], divided by (TEST_SIZE + 1), gives the average number of cycles per operation.
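read_tsc is not shown in the excerpt; on x86-64 it presumably wraps the RDTSC instruction, along the lines of:

#include <stdint.h>
#include <x86intrin.h>

static inline uint64_t read_tsc(void)
{
    /* Read the processor's time-stamp counter. Note it counts
     * reference cycles and is not serializing by itself. */
    return __rdtsc();
}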

The producer function

uint64_t start_p = read_tsc();

for (uint64_t i = 1; i <= TEST_SIZE + SPSC_BATCH_SIZE; i++) {
    for (int32_t j = 1; j < num; j++) {
        element_t value = i;
        while (enqueue(&queues[j], value) != 0)
            ;
    }
}
uint64_t stop_p = read_tsc();
  1. start_p (a uint64_t) records the current time as the start time.
  2. There are two loops: the outer one runs TEST_SIZE + SPSC_BATCH_SIZE times, and the inner one runs according to the argument num. As in the consumer, whenever enqueue() returns non-zero the loop spins without proceeding.
  3. stop_p (a uint64_t) records the end time.

Execution results

Compile (requires -lpthread):

$ gcc -Wall -O2 -I. -o  main main.c -lpthread

Output (here max_threads = 2):

$ ./main
producer 0:  ---1----
consumer 1:  ---2----
Consumer created...
producer 10 cycles/op
consumer: 10 cycles/op
Done!

The output shows which CPU IDs the producer and the consumer were assigned, and the average cycles per operation.

Using ThreadSanitizer

Compile (additionally requires -fsanitize=thread):

$ gcc -Wall -O2 -I. -o  main main.c -lpthread -fsanitize=thread

Output:

$ ./main
producer 0:  ---1----
consumer 1:  ---2----
Consumer created...
==================
WARNING: ThreadSanitizer: data race (pid=5072)
  Write of size 4 at 0x55b40fe99284 by main thread:
    #0 producer <null> (main+0x1bf2)
    #1 main <null> (main+0x13f1)

  Previous read of size 4 at 0x55b40fe99284 by thread T1:
    #0 consumer <null> (main+0x18ee)

  Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000c284)

  Thread T1 (tid=5074, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x13ae)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf2) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=5072)
  Read of size 4 at 0x55b40fe99280 by thread T1:
    #0 consumer <null> (main+0x174d)

  Previous write of size 4 at 0x55b40fe99280 by main thread:
    #0 producer <null> (main+0x1bf2)
    #1 main <null> (main+0x13f1)

  Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000c280)

  Thread T1 (tid=5074, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x13ae)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x174d) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=5072)
  Write of size 4 at 0x55b40fe99a80 by main thread:
    #0 producer <null> (main+0x1bf2)
    #1 main <null> (main+0x13f1)

  Previous read of size 4 at 0x55b40fe99a80 by thread T1:
    #0 consumer <null> (main+0x1869)

  Location is global 'queues' of size 263680 at 0x55b40fe91100 (main+0x00000000ca80)

  Thread T1 (tid=5074, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x13ae)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf2) in producer
==================

The runs through producer and consumer trigger data races that keep the program from finishing normally, so the code is rewritten with C11 Atomics next.

Rewriting with C11 Atomics

The counter_t and spsc_queue structures

typedef union {
    atomic_uint w;
    atomic_uint r;
} counter_t;

#define __ALIGN __attribute__((aligned(64)))

typedef struct spsc_queue {
    counter_t head; /* Mostly accessed by producer */
    atomic_uint batch_head;
    counter_t tail __ALIGN; /* Mostly accessed by consumer */
    atomic_uint batch_tail;
    unsigned long batch_history;

    /* For testing purpose */
    uint64_t start_c __ALIGN;
    uint64_t stop_c;

    element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by producer and consumer */
} __ALIGN spsc_queue_t;

The fields originally declared volatile are all replaced here with atomic_uint.

The dequeue function

static int dequeue(spsc_queue_t *self, element_t *val_ptr)
{
    unsigned long batch_size = self->batch_history;
    *val_ptr = SPSC_QUEUE_ELEMENT_ZERO;

    /* Try to zero in on the next batch tail */
    if (atomic_load(&self->tail.r) == atomic_load(&self->batch_tail)) {
        uint32_t tmp_tail = atomic_load(&self->tail.r) + SPSC_BATCH_SIZE;
        if (tmp_tail >= SPSC_QUEUE_SIZE) {
            tmp_tail = 0;
            if (self->batch_history < SPSC_BATCH_SIZE) {
                self->batch_history =
                    (SPSC_BATCH_SIZE <
                     (self->batch_history + SPSC_BATCH_INCREAMENT))
                        ? SPSC_BATCH_SIZE
                        : (self->batch_history + SPSC_BATCH_INCREAMENT);
            }
        }

        batch_size = self->batch_history;
        while (!(self->data[tmp_tail])) {
            wait_ticks(SPSC_CONGESTION_PENALTY);

            batch_size >>= 1;
            if (batch_size == 0)
                return SPSC_Q_EMPTY;

            tmp_tail = atomic_load(&self->tail.r) + batch_size;
            if (tmp_tail >= SPSC_QUEUE_SIZE)
                tmp_tail = 0;
        }
        self->batch_history = batch_size;

        if (tmp_tail == atomic_load(&self->tail.r))
            tmp_tail = (tmp_tail + 1) >= SPSC_QUEUE_SIZE ? 0 : tmp_tail + 1;
        atomic_store(&self->batch_tail, tmp_tail);
    }

    /* Actually pull out the data element. */
    *val_ptr = self->data[atomic_load(&self->tail.r)];
    self->data[atomic_load(&self->tail.r)] = SPSC_QUEUE_ELEMENT_ZERO;
    atomic_fetch_add(&self->tail.w, 1);
    if (atomic_load(&self->tail.r) >= SPSC_QUEUE_SIZE)
        atomic_store(&self->tail.w, 0);

    return SPSC_OK;
}

Note that self->tail.r and self->batch_tail, which used to be compared directly, must now be loaded with atomic_load() before the comparison. Stores likewise go through atomic_store() so the written value is published consistently, and the increment is done with atomic_fetch_add().
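All of these calls default to memory_order_seq_cst. For a single-producer/single-consumer queue, release/acquire pairs on the index variables are already sufficient. A minimal index-based SPSC ring (a generic sketch, not the quiz code; N must be a power of two so the free-running indices stay valid across unsigned wraparound) shows where the two orderings go:

#include <stdatomic.h>
#include <stdbool.h>

#define N 256

static int buf[N];
static atomic_uint head; /* written only by the producer */
static atomic_uint tail; /* written only by the consumer */

bool ring_push(int v)
{
    unsigned h = atomic_load_explicit(&head, memory_order_relaxed);
    /* acquire: the consumer's read of the slot completed before it freed it */
    unsigned t = atomic_load_explicit(&tail, memory_order_acquire);
    if (h - t == N)
        return false; /* full */
    buf[h % N] = v;
    /* release: the element write above is visible before the new head */
    atomic_store_explicit(&head, h + 1, memory_order_release);
    return true;
}

bool ring_pop(int *out)
{
    unsigned t = atomic_load_explicit(&tail, memory_order_relaxed);
    /* acquire: pairs with the producer's release store of head */
    unsigned h = atomic_load_explicit(&head, memory_order_acquire);
    if (t == h)
        return false; /* empty */
    *out = buf[t % N];
    /* release: the element read above completes before the slot is freed */
    atomic_store_explicit(&tail, t + 1, memory_order_release);
    return true;
}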

The enqueue function

static int enqueue(spsc_queue_t *self, element_t value)
{
    /* Try to zero in on the next batch head. */
    if (atomic_load(&self->head.r) == atomic_load(&self->batch_head)) {
        uint32_t tmp_head = atomic_load(&self->head.r) + SPSC_BATCH_SIZE;
        if (tmp_head >= SPSC_QUEUE_SIZE)
            tmp_head = 0;

        if (self->data[tmp_head]) {
            /* run spin cycle penalty */
            wait_ticks(SPSC_CONGESTION_PENALTY);
            return SPSC_Q_FULL;
        }
        atomic_store(&self->batch_head, tmp_head);
    }

    self->data[atomic_load(&self->head.r)] = value;
    atomic_fetch_add(&self->head.w, 1);
    if (atomic_load(&self->head.r) >= SPSC_QUEUE_SIZE)
        atomic_store(&self->head.w, 0);

    return SPSC_OK;
}

Execution results

Compile:

$ gcc -Wall -O2 -I. -o  main spsc_atomics.c -lpthread

Output:

$ ./main
producer 0:  ---1----
consumer 1:  ---2----
Consumer created...
producer 39 cycles/op
consumer: 39 cycles/op
Done!

Switching to C11 Atomics sacrifices some performance: the average cycles per operation rises from the original 10 to 39.

This explanation is not enough to account for your change; use a tool such as uftrace to find the concrete cause.

jserv
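A possible follow-up workflow with uftrace (the binary must be rebuilt with instrumentation, e.g. -pg, for uftrace to trace function entry/exit):

$ gcc -Wall -O2 -pg -I. -o main spsc_atomics.c -lpthread
$ uftrace record ./main
$ uftrace report

uftrace report then lists per-function time, which helps attribute the extra cycles (e.g. to the atomic read-modify-write operations in enqueue/dequeue) instead of guessing.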

Using ThreadSanitizer

Compile (additionally requires -fsanitize=thread):

$ gcc -Wall -O2 -I. -o  main spsc_atomics.c -lpthread -fsanitize=thread

Output:

$ ./main
producer 0:  ---1----
consumer 1:  ---2----
Consumer created...
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Write of size 4 at 0x5594bc6e6284 by main thread:
    #0 producer <null> (main+0x1bf4)
    #1 main <null> (main+0x1451)

  Previous read of size 4 at 0x5594bc6e6284 by thread T1:
    #0 consumer <null> (main+0x191c)

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c284)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf4) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Write of size 4 at 0x5594bc6e6288 by main thread:
    #0 producer <null> (main+0x1bf4)
    #1 main <null> (main+0x1451)

  Previous read of size 4 at 0x5594bc6e6288 by thread T1:
    [failed to restore the stack]

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c288)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x1bf4) in producer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Read of size 4 at 0x5594bc6e6280 by thread T1:
    #0 consumer <null> (main+0x17d3)

  Previous write of size 4 at 0x5594bc6e6280 by main thread:
    #0 producer <null> (main+0x1bf4)
    #1 main <null> (main+0x1451)

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c280)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x17d3) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Read of size 4 at 0x5594bc6e6a84 by thread T1:
    #0 consumer <null> (main+0x189b)

  Previous write of size 4 at 0x5594bc6e6a84 by main thread:
    #0 producer <null> (main+0x1bf4)
    #1 main <null> (main+0x1451)

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000ca84)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x189b) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Read of size 4 at 0x5594bc6e6700 by thread T1:
    #0 consumer <null> (main+0x17d3)

  Previous write of size 4 at 0x5594bc6e6700 by main thread:
    [failed to restore the stack]

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000c700)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x17d3) in consumer
==================
==================
WARNING: ThreadSanitizer: data race (pid=16259)
  Read of size 4 at 0x5594bc6e7284 by thread T1:
    #0 consumer <null> (main+0x189b)

  Previous write of size 4 at 0x5594bc6e7284 by main thread:
    [failed to restore the stack]

  Location is global 'queues' of size 263680 at 0x5594bc6de100 (main+0x00000000d284)

  Thread T1 (tid=16261, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
    #1 main <null> (main+0x140e)

SUMMARY: ThreadSanitizer: data race (/home/plusthousand0107/main+0x189b) in consumer
==================

Even though atomics have been added, data races remain, which suggests that some accesses that need atomic protection have not yet been converted.
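The remaining reports all point into the global queues, and the slots of data[] are still plain (non-atomic) element_t accesses even though dequeue() polls them while enqueue() writes them concurrently. One direction a further fix could take, sketched as fragments against the code above (assuming element_t is an integer type, so zero can keep meaning "empty"):

/* in spsc_queue_t: make each slot atomic so the cross-thread
 * poll/publish on data[] synchronizes */
_Atomic element_t data[SPSC_QUEUE_SIZE] __ALIGN;

/* enqueue(): publish the element with a release store */
atomic_store_explicit(&self->data[atomic_load(&self->head.r)],
                      value, memory_order_release);

/* dequeue(): poll the slot with an acquire load */
while (!atomic_load_explicit(&self->data[tmp_tail], memory_order_acquire)) {
    ...
}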

TODO: Explore memory ordering/barriers

Study the following material:

Record insights and questions, and answer the following:

  • What a compiler barrier does: how compiler optimization affects memory order, with examples, checked against the C language specification (C11 in particular)
  • The considerations behind memory barriers and how the Linux kernel handles them
  • Use diy/litmus to explain and verify memory ordering

compiler barrier

  • A compiler barrier is one kind of memory barrier (the other kind being a CPU barrier); it constrains the compiler's optimizations. volatile has a related effect:

    • volatile tells the compiler that the variable may change at any time, so compiled code must access the variable's actual memory location on every use
    • if the compiler instead reused a value cached in a register for optimization, the result could differ from what is expected, because the value in memory may have been modified by another thread.
  • Compiler optimization can also change the order of memory accesses, e.g. via instruction reordering.

    • Instruction reordering rearranges instructions to raise instruction-level parallelism and performance, but it can make the memory-access order observed by other threads inconsistent; where ordering matters, a compiler barrier prevents such reordering (see the barrier sketch after the volatile example below).
  • Example

    1. Without volatile

    #include <stdio.h>

    int main() {
        int a = 1, b = 2, c = 0, d = 0;
        printf("%d", a + b);
        a = b;
        c = b;
        d = b;
        printf("%d", c + d);
        return 0;
    }
    

    Compiling with $ gcc -S -O2 -masm=intel test_without_volatile.c produces the following assembly.

    main:
    .LFB23:
        .cfi_startproc
        endbr64
        push rbp
        .cfi_def_cfa_offset 16
        .cfi_offset 6, -16
        lea rbp, .LC0[rip]
        mov edx, 3
        xor eax, eax
        mov rsi, rbp
        mov edi, 1
        call __printf_chk@PLT
        mov rsi, rbp
        mov edx, 4
        xor eax, eax
        mov edi, 1
        call __printf_chk@PLT
        xor eax, eax
        pop rbp
        .cfi_def_cfa_offset 8
        ret
        .cfi_endproc
    
    2. With volatile

    #include <stdio.h>

    int main() {
        volatile int a = 1, b = 2, c = 0, d = 0;
        printf("%d", a + b);
        a = b;
        c = b;
        d = b;
        printf("%d", c + d);
        return 0;
    }
    

    Again, compiling with $ gcc -S -O2 -masm=intel test_with_volatile.c produces the assembly.

    main:
    .LFB23:
        .cfi_startproc
        endbr64
        push rbp
        .cfi_def_cfa_offset 16
        .cfi_offset 6, -16
        lea rbp, .LC0[rip]
        mov edi, 1
        mov rsi, rbp
        sub rsp, 16
        .cfi_def_cfa_offset 32
        mov DWORD PTR [rsp], 1
        mov DWORD PTR 4[rsp], 2
        mov DWORD PTR 8[rsp], 0
        mov DWORD PTR 12[rsp], 0
        mov edx, DWORD PTR [rsp]
        mov eax, DWORD PTR 4[rsp]
        add edx, eax
        xor eax, eax
        call __printf_chk@PLT
        mov eax, DWORD PTR 4[rsp]
        mov rsi, rbp
        mov edi, 1
        mov DWORD PTR [rsp], eax
        mov eax, DWORD PTR 4[rsp]
        mov DWORD PTR 8[rsp], eax
        mov eax, DWORD PTR 4[rsp]
        mov DWORD PTR 12[rsp], eax
        mov edx, DWORD PTR 8[rsp]
        mov eax, DWORD PTR 12[rsp]
        add edx, eax
        xor eax, eax
        call __printf_chk@PLT
        add rsp, 16
        .cfi_def_cfa_offset 16
        xor eax, eax
        pop rbp
        .cfi_def_cfa_offset 8
        ret
        .cfi_endproc
    

Notice that with volatile every access is compiled into an explicit mov DWORD PTR to or from the variable's stack slot, so the assembly is clearly longer than the non-volatile version, where the compiler constant-folded the sums into mov edx, 3 and mov edx, 4 and dropped the loads and stores entirely. The cost is that the compiler may no longer cache the values in registers or eliminate redundant accesses; the benefit is that every read truly reflects the current contents of memory, so the result cannot be corrupted by a stale register copy when another thread updates the variable.
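volatile only constrains accesses to the volatile objects themselves; it is not a general compiler barrier. With GCC/Clang, a pure compiler barrier can be written as an empty asm statement with a "memory" clobber, and C11 provides atomic_signal_fence() with the same compiler-only effect. A small sketch (the globals are just for illustration):

#include <stdatomic.h>

int a, b;

void ordered_stores(void)
{
    a = 1;
    /* GCC/Clang compiler barrier: the compiler must not move memory
     * accesses across this point (no CPU fence instruction is emitted) */
    __asm__ volatile("" ::: "memory");
    b = 1;

    /* C11 portable form with the same compiler-only effect */
    atomic_signal_fence(memory_order_seq_cst);
    b = 2;
}

Either form stops compiler reordering only; to order the accesses as seen from another CPU, a CPU barrier (e.g. atomic_thread_fence) is still required.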

memory barrier

  • A memory barrier preserves the order of memory accesses, preventing problems caused by optimization- or hardware-induced reordering.

    • On Arm, for example, a barrier splits the instruction stream in two: instructions before the barrier must complete before it, and instructions after it may only execute after it; no access moves across the barrier in either direction.
  • Factors to consider when using memory barriers (see the fence sketch after this list):

    1. Different architectures offer different memory barrier instructions, so first learn what the target platform supports and choose the appropriate one.
    2. A memory barrier may have to stall execution to preserve correctness, which costs performance; correctness and performance must be weighed against each other.
    3. Adding memory barriers makes the code more complex and harder to read, which hurts maintainability.
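Tying back to the flag/payload example at the top of this page: C11 exposes CPU-level barriers through atomic_thread_fence(), which plays the role of the Linux kernel's smp_wmb()/smp_rmb() style barriers. A sketch of the message-passing pattern written with standalone fences instead of release/acquire operations:

#include <stdatomic.h>

int payload;
atomic_bool ready;

void sender(void)
{
    payload = 42;
    /* release fence: the payload write cannot sink below the flag store,
     * analogous to the kernel's smp_wmb() */
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&ready, true, memory_order_relaxed);
}

void receiver(void)
{
    while (!atomic_load_explicit(&ready, memory_order_relaxed))
        ;
    /* acquire fence: pairs with the release fence in sender(),
     * analogous to the kernel's smp_rmb() */
    atomic_thread_fence(memory_order_acquire);
    int v = payload; /* guaranteed to read 42 */
    (void) v;
}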