# `READ_ONCE` Experiment

###### tags: `linux2022`

[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#Memory-Ordering-%E5%92%8C-Barrier) (Concurrent Programming: Atomic Operations) points out that most uses of `volatile` in the Linux kernel are misuses. Together with [Why the "volatile" type class should not be used](https://www.kernel.org/doc/Documentation/process/volatile-considered-harmful.rst), the key points are:

* `volatile` is not a cheap substitute for an atomic variable.
* `volatile` suppresses compiler optimization: every access in the source must be performed as a real memory access, so the value is never cached in a register. It places no constraint on the CPU, which may still reorder those accesses.
* When shared data is already protected correctly with spinlocks, mutexes, or memory barriers, `volatile` adds nothing and only slows the code down. A spinlock example:

```c
spin_lock(&the_lock);
do_something_on(&shared_data);
do_something_else_with(&shared_data);
spin_unlock(&the_lock);
```

Following [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#Memory-Ordering-%E5%92%8C-Barrier), the experiment implements [Dekker’s Algorithm](https://en.wikipedia.org/wiki/Dekker%27s_algorithm). (Strictly speaking, the flag-plus-turn handshake below is the form usually attributed to Peterson; the function names keep the naming of the referenced article.)

`flag1`, `flag2`, `turn`, and `counter` are defined as `volatile` global variables:

```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static volatile int flag1 = 0, flag2 = 0, turn = 1;
static volatile int counter = 0;
int loop_cnt;
```

When `dekker1` wants to enter the critical section, it raises its flag and hands `turn` to `dekker2`. The critical section itself only does `counter++`.

```c
static void dekker1(void)
{
    flag1 = 1;
    turn = 2;
    // __atomic_thread_fence(__ATOMIC_SEQ_CST);
    while ((flag2 == 1) && (turn == 2))
        ;
    /* critical section */
    counter++;
    /* let the other task run */
    flag1 = 0;
}

static void dekker2(void)
{
    flag2 = 1;
    turn = 1;
    // __atomic_thread_fence(__ATOMIC_SEQ_CST);
    while ((flag1 == 1) && (turn == 1))
        ;
    /* critical section */
    counter++;
    /* leave critical section */
    flag2 = 0;
}

static void *task1(void *arg)
{
    printf("Starting %s\n", __func__);
    for (int i = loop_cnt; i > 0; i--)
        dekker1();
    return NULL;
}

static void *task2(void *arg)
{
    printf("Starting %s\n", __func__);
    for (int i = loop_cnt; i > 0; i--)
        dekker2();
    return NULL;
}
```

`expected_sum = 2 * loop_cnt` ties the expected total to the loop count: each of the two threads increments `counter` exactly `loop_cnt` times.

```c
int main(int argc, char **argv)
{
    pthread_t thread1, thread2;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <loopcount>\n", argv[0]);
        exit(1);
    }

    loop_cnt = atoi(argv[1]); /* FIXME: format checks */
    int expected_sum = 2 * loop_cnt;

    (void) pthread_create(&thread1, NULL, task1, NULL);
    (void) pthread_create(&thread2, NULL, task2, NULL);

    void *ret;
    (void) pthread_join(thread1, &ret);
    (void) pthread_join(thread2, &ret);
    printf("Both threads terminated\n");

    /* Check result */
    if (counter != expected_sum) {
        printf("[-] Dekker did not work, sum %d rather than %d.\n", counter,
               expected_sum);
        printf("%d missed updates due to memory consistency races.\n",
               (expected_sum - counter));
        return 1;
    }
    printf("[+] Dekker worked.\n");
    return 0;
}
```

The result:

```shell
$ gcc -O2 -o dekker dekker.c -lpthread
$ ./dekker 10000000
Starting task1
Starting task2
Both threads terminated
[-] Dekker did not work, sum 19999792 rather than 20000000.
208 missed updates due to memory consistency races.
```

Now look at what the generated assembly does for this part of `dekker1`:

```c
flag1 = 1;
turn = 2;
while ((flag2 == 1) && (turn == 2))
```

```
L5:
    movl    $1, flag1(%rip)
    movl    $2, turn(%rip)
    jmp     .L4
.L11:
    movl    turn(%rip), %eax
    cmpl    $2, %eax
    jne     .L3
.L4:
    movl    flag2(%rip), %eax
    cmpl    $1, %eax
    je      .L11
```

The assembly shows that `volatile` did its job at the compiler level: both stores and both loads are emitted, in program order, as `mov` instructions against memory. It does nothing about the CPU. An x86 core may let the load of `flag2` complete while the earlier stores to `flag1` and `turn` are still sitting in its store buffer (store-load reordering), so the load can observe a stale value and both threads enter the critical section at the same time. To get the correct result, enable the commented-out `__atomic_thread_fence(__ATOMIC_SEQ_CST)` between the stores and the loop. This is the memory barrier approach.

* Data Transfer (`mov` instruction): Moves data between processor & memory (loads and saves variables between processor and memory)
  Source: [Intro to x86 Instruction Set](https://ee.usc.edu/~redekopp/cs356/slides/CS356Unit4_x86_ISA.pdf)
* When a thread reaches a barrier, it will wait at the barrier until all the threads reach the barrier, and then they'll all proceed together.
  Source: [Synchronization, Part 6: Implementing a barrier](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-6:-Implementing-a-barrier)
  Note that this quote describes a thread synchronization barrier (a rendezvous point), which is a different concept from the memory barrier (fence) used above. A memory barrier only constrains the order in which one CPU's memory accesses become visible; no thread waits for another.

:::info
TODO: understand how `__atomic_thread_fence` relates to release/acquire synchronization
:::

With this background, the purpose of `READ_ONCE` and `WRITE_ONCE` finally becomes clear. They apply a `volatile` access at the point of use instead of on the variable's declaration, so for 1, 2, 4, and 8-byte objects the compiler must emit the access as written and cannot elide, merge, or repeat it. Other sizes fall back to `memcpy` bracketed by compiler barriers (`barrier()`). Like `volatile` itself, they constrain only the compiler and do not order accesses on the CPU.

[tools/include/linux/compiler.h](https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/tools/include/linux/compiler.h)

```c
#define READ_ONCE(x)                                        \
({                                                          \
    union { typeof(x) __val; char __c[1]; } __u =           \
        { .__c = { 0 } };                                   \
    /* note: the size passed is sizeof(x) */                \
    __read_once_size(&(x), __u.__c, sizeof(x));             \
    __u.__val;                                              \
})

#define WRITE_ONCE(x, val)                                  \
({                                                          \
    union { typeof(x) __val; char __c[1]; } __u =           \
        { .__val = (val) };                                 \
    __write_once_size(&(x), __u.__c, sizeof(x));            \
    __u.__val;                                              \
})

static __always_inline void __read_once_size(const volatile void *p, void *res, int size)
{
    switch (size) {
    case 1: *(__u8_alias_t *) res = *(volatile __u8_alias_t *) p; break;
    case 2: *(__u16_alias_t *) res = *(volatile __u16_alias_t *) p; break;
    case 4: *(__u32_alias_t *) res = *(volatile __u32_alias_t *) p; break;
    case 8: *(__u64_alias_t *) res = *(volatile __u64_alias_t *) p; break;
    default:
        barrier();
        __builtin_memcpy((void *)res, (const void *)p, size);
        barrier();
    }
}

static __always_inline void __write_once_size(volatile void *p, void *res, int size)
{
    switch (size) {
    case 1: *(volatile __u8_alias_t *) p = *(__u8_alias_t *) res; break;
    case 2: *(volatile __u16_alias_t *) p = *(__u16_alias_t *) res; break;
    case 4: *(volatile __u32_alias_t *) p = *(__u32_alias_t *) res; break;
    case 8: *(volatile __u64_alias_t *) p = *(__u64_alias_t *) res; break;
    default:
        barrier();
        __builtin_memcpy((void *)p, (const void *)res, size);
        barrier();
    }
}
```
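To see how these macros are used, here is a minimal user-space sketch of the same idea. `READ_ONCE_SKETCH`, `WRITE_ONCE_SKETCH`, `stop`, `worker`, and `run_stop_flag_demo` are hypothetical names made up for this note, not kernel APIs. The sketch handles only word-sized scalars and assumes GCC or Clang (for `__typeof__`). It shows the classic case of a stop flag polled in a loop, where a plain read could legally be hoisted out of the loop by the optimizer.

```c
#include <assert.h>  /* static_assert */
#include <pthread.h>
#include <stddef.h>

/* Hypothetical scalar-only stand-ins for the kernel macros (assumption:
 * GCC/Clang __typeof__). The access goes through a volatile-qualified
 * lvalue, so the compiler must perform it every time it is written. */
#define READ_ONCE_SKETCH(x) (*(const volatile __typeof__(x) *) &(x))
#define WRITE_ONCE_SKETCH(x, val) (*(volatile __typeof__(x) *) &(x) = (val))

static int stop; /* deliberately NOT declared volatile */

static_assert(sizeof(stop) <= sizeof(long),
              "sketch handles word-sized scalars only");

static void *worker(void *arg)
{
    (void) arg;
    /* Fresh load on every iteration; with a plain `while (!stop)` the
     * compiler would be allowed to read the flag once and spin forever. */
    while (!READ_ONCE_SKETCH(stop))
        ;
    return NULL;
}

/* Starts the worker, raises the flag, and waits for the worker to exit.
 * Returns 0 on success, -1 if a pthread call fails. */
int run_stop_flag_demo(void)
{
    pthread_t t;

    WRITE_ONCE_SKETCH(stop, 0);
    if (pthread_create(&t, NULL, worker, NULL) != 0)
        return -1;
    WRITE_ONCE_SKETCH(stop, 1);
    if (pthread_join(t, NULL) != 0)
        return -1;
    return 0;
}
```

Calling `run_stop_flag_demo()` from `main` should return 0 once the worker has seen the flag. Only a single flag is involved, so no ordering between different variables is needed. As soon as a second variable has to be published together with the flag, a fence or an acquire/release pair is required in addition.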
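Returning to the mutual exclusion experiment, the fenced variant can be sketched as follows. This is a compact rewrite under assumptions rather than the original program: `lock_sketch`, `unlock_sketch`, `task`, and `run_fenced` are hypothetical names, the two flags are folded into an array, and the GCC/Clang `__atomic_thread_fence` builtin is assumed. The `__ATOMIC_SEQ_CST` fence is the one the experiment needs on x86. The acquire and release fences are an extra assumption added for weakly ordered CPUs, and they also touch on the release/acquire TODO above.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

static volatile int flag[2], turn;
static volatile int counter;
static int loops;

static void lock_sketch(int self)
{
    int other = 1 - self;

    flag[self] = 1;
    turn = other;
    /* Full barrier: both stores above must be globally visible before
     * the loads in the spin loop below are performed. */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    while (flag[other] == 1 && turn == other)
        ;
    /* Acquire: keep critical-section accesses after the spin loop. */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
}

static void unlock_sketch(int self)
{
    /* Release: critical-section accesses complete before the flag drops. */
    __atomic_thread_fence(__ATOMIC_RELEASE);
    flag[self] = 0;
}

static void *task(void *arg)
{
    int self = (int) (intptr_t) arg;

    for (int i = 0; i < loops; i++) {
        lock_sketch(self);
        counter++; /* critical section */
        unlock_sketch(self);
    }
    return NULL;
}

/* Runs two threads for n iterations each and returns the final counter,
 * or -1 if a thread could not be created. */
int run_fenced(int n)
{
    pthread_t t[2];

    assert(n >= 0);
    loops = n;
    counter = 0;
    flag[0] = flag[1] = 0;
    turn = 0;

    if (pthread_create(&t[0], NULL, task, (void *) (intptr_t) 0) != 0)
        return -1;
    if (pthread_create(&t[1], NULL, task, (void *) (intptr_t) 1) != 0) {
        pthread_join(t[0], NULL);
        return -1;
    }
    pthread_join(t[0], NULL);
    pthread_join(t[1], NULL);
    return counter;
}
```

With the full fence in place, `run_fenced(n)` is expected to return `2 * n`. Removing that fence brings back the lost updates seen in the experiment. Strictly conforming ISO C would use `_Atomic` variables instead of `volatile` plus fences; the sketch keeps `volatile` only to stay close to the original program.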