# `READ_ONCE` Experiment
###### tags: `linux2022`
[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#Memory-Ordering-%E5%92%8C-Barrier) points out that the vast majority of `volatile` uses in the Linux kernel are misuses. Combined with [Why the "volatile" type class should not be used](https://www.kernel.org/doc/Documentation/process/volatile-considered-harmful.rst), the points can be summarized as follows:
* `volatile` cannot be treated as a simple form of atomic variable
* `volatile` suppresses compiler optimization and forces the compiler to re-read the variable from its memory address on every access, but it does not prevent the CPU from reordering those accesses at run time
* In well-written code, `volatile` only slows things down; proper primitives such as spinlocks, mutexes, or memory barriers should be used instead (a user-space mutex sketch follows the spinlock example below). The spinlock example:
```c
spin_lock(&the_lock);
do_something_on(&shared_data);
do_something_else_with(&shared_data);
spin_unlock(&the_lock);
```
Following [並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#Memory-Ordering-%E5%92%8C-Barrier), let's run the experiment and implement [Dekker's Algorithm](https://en.wikipedia.org/wiki/Dekker%27s_algorithm).
Define `flag1`, `flag2`, `turn`, and `counter` as `volatile` global variables:
```c
static volatile int flag1 = 0, flag2 = 0, turn = 1;
static volatile int counter = 0;
int loop_cnt;
```
When `dekker1` wants to enter, it raises its own flag and hands `turn` over to `dekker2`; the only work in the critical section is `counter++`:
```c
static void dekker1(void)
{
flag1 = 1;
turn = 2;
// __atomic_thread_fence(__ATOMIC_SEQ_CST);
while ((flag2 == 1) && (turn == 2))
;
/* critical section */
counter++;
/* let the other task run */
flag1 = 0;
}
static void dekker2(void)
{
flag2 = 1;
turn = 1;
// __atomic_thread_fence(__ATOMIC_SEQ_CST);
while ((flag1 == 1) && (turn == 1))
;
/* critical section */
counter++;
/* leave critical section */
flag2 = 0;
}
static void *task1(void *arg)
{
printf("Starting %s\n", __func__);
for (int i = loop_cnt; i > 0; i--)
dekker1();
return NULL;
}
static void *task2(void *arg)
{
printf("Starting %s\n", __func__);
for (int i = loop_cnt; i > 0; i--)
dekker2();
return NULL;
}
```
`expected_sum = 2 * loop_cnt` is the expected total: each of the two threads increments `counter` once per iteration, `loop_cnt` times in all.
```c
int main(int argc, char **argv)
{
pthread_t thread1, thread2;
if (argc != 2) {
fprintf(stderr, "Usage: %s <loopcount>\n", argv[0]);
exit(1);
}
loop_cnt = atoi(argv[1]); /* FIXME: format checks */
int expected_sum = 2 * loop_cnt;
(void) pthread_create(&thread1, NULL, task1, NULL);
(void) pthread_create(&thread2, NULL, task2, NULL);
void *ret;
(void) pthread_join(thread1, &ret);
(void) pthread_join(thread2, &ret);
printf("Both threads terminated\n");
/* Check result */
if (counter != expected_sum) {
printf("[-] Dekker did not work, sum %d rather than %d.\n", counter,
expected_sum);
printf("%d missed updates due to memory consistency races.\n",
(expected_sum - counter));
return 1;
}
printf("[+] Dekker worked.\n");
return 0;
}
```
The experiment produces the following result:
```shell
$ gcc -O2 -o dekker dekker.c -lpthread
$ ./dekker 10000000
Starting task1
Starting task2
Both threads terminated
[-] Dekker did not work, sum 19999792 rather than 20000000.
208 missed updates due to memory consistency races.
```
Let's look at what assembly the compiler generates for these lines:
```c
flag1 = 1;
turn = 2;
while ((flag2 == 1) && (turn == 2))
```
```
.L5:
movl $1, flag1(%rip)
movl $2, turn(%rip)
jmp .L4
.L11:
movl turn(%rip), %eax
cmpl $2, %eax
jne .L3
.L4:
movl flag2(%rip), %eax
cmpl $1, %eax
je .L11
```
We can see that `volatile` only guarantees that every access is compiled into a real load/store to memory; at run time the CPU can still reorder the stores to `flag1`/`turn` with the later load of `flag2`, so that load may observe a stale value and mutual exclusion breaks. To get a correct result, it is enough to enable the `__atomic_thread_fence(__ATOMIC_SEQ_CST)` lines, i.e. the memory-barrier approach (see the sketch after the references below).
* Data Transfer (mov instruction) : Moves data between processor & memory (loads and saves variables between processor and memory)
Source: [Intro to x86 Instruction Set](https://ee.usc.edu/~redekopp/cs356/slides/CS356Unit4_x86_ISA.pdf)
* When a thread reaches a barrier, it will wait at the barrier until all the threads reach the barrier, and then they'll all proceed together.
Source: [Synchronization, Part 6: Implementing a barrier](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-6:-Implementing-a-barrier)
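For reference, this is `dekker1` with the fence enabled; it is the same code as above with the commented-out line restored (`dekker2` is changed symmetrically). With GCC on x86-64 the SEQ_CST fence is typically emitted as an `mfence` between the stores and the loop's load, which removes the stale read:
```c
static void dekker1(void)
{
    flag1 = 1;
    turn = 2;
    /* full barrier: the two stores above become globally visible
     * before the loads in the loop below are performed */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    while ((flag2 == 1) && (turn == 2))
        ;
    /* critical section */
    counter++;
    /* let the other task run */
    flag1 = 0;
}
```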
:::info
TODO: understand how `__atomic_thread_fence` relates to release/acquire synchronization
:::
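A starting point for that TODO: a release fence placed before a relaxed store, paired with an acquire fence placed after a relaxed load, is enough for the classic message-passing pattern sketched below, but not for Dekker's entry protocol, because release/acquire never orders a store before a later load; that store→load ordering is exactly what the SEQ_CST fence adds. A minimal sketch using the same GCC `__atomic` builtins, with made-up variable names:
```c
static int payload;  /* ordinary data                */
static int ready;    /* flag that publishes the data */

static void producer(void)
{
    payload = 42;                                   /* write the data first           */
    __atomic_thread_fence(__ATOMIC_RELEASE);        /* data ordered before the flag   */
    __atomic_store_n(&ready, 1, __ATOMIC_RELAXED);  /* publish                        */
}

static void consumer(void)
{
    while (!__atomic_load_n(&ready, __ATOMIC_RELAXED))
        ;                                           /* wait until published           */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);        /* flag ordered before data reads */
    /* here payload is guaranteed to be 42 */
}
```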
Now we can finally understand what `READ_ONCE` and `WRITE_ONCE` are for.
[tools/include/linux/compiler.h](https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/tools/include/linux/compiler.h)
```c
#define READ_ONCE(x)                                          \
({                                                            \
	union { typeof(x) __val; char __c[1]; } __u =         \
		{ .__c = { 0 } };                             \
	/* note: the size passed down is sizeof(x) */         \
	__read_once_size(&(x), __u.__c, sizeof(x));           \
	__u.__val;                                            \
})

#define WRITE_ONCE(x, val)                                    \
({                                                            \
	union { typeof(x) __val; char __c[1]; } __u =         \
		{ .__val = (val) };                           \
	__write_once_size(&(x), __u.__c, sizeof(x));          \
	__u.__val;                                            \
})

static __always_inline void __read_once_size(const volatile void *p, void *res, int size)
{
switch (size) {
case 1: *(__u8_alias_t *) res = *(volatile __u8_alias_t *) p; break;
case 2: *(__u16_alias_t *) res = *(volatile __u16_alias_t *) p; break;
case 4: *(__u32_alias_t *) res = *(volatile __u32_alias_t *) p; break;
case 8: *(__u64_alias_t *) res = *(volatile __u64_alias_t *) p; break;
default:
barrier();
__builtin_memcpy((void *)res, (const void *)p, size);
barrier();
}
}
static __always_inline void __write_once_size(volatile void *p, void *res, int size)
{
switch (size) {
case 1: *(volatile __u8_alias_t *) p = *(__u8_alias_t *) res; break;
case 2: *(volatile __u16_alias_t *) p = *(__u16_alias_t *) res; break;
case 4: *(volatile __u32_alias_t *) p = *(__u32_alias_t *) res; break;
case 8: *(volatile __u64_alias_t *) p = *(__u64_alias_t *) res; break;
default:
barrier();
__builtin_memcpy((void *)p, (const void *)res, size);
barrier();
}
}
```
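To see the intended usage, here is a hedged user-space sketch: the one-line volatile-cast form of the two macros (a common simplification outside the kernel; the kernel version above additionally handles sizes other than 1/2/4/8 bytes with `__builtin_memcpy()` between `barrier()`s), together with the typical polled-flag idiom. The `stop` flag and the two functions are hypothetical:
```c
/* simplified user-space stand-ins for the kernel macros above */
#define READ_ONCE(x)       (*(const volatile typeof(x) *) &(x))
#define WRITE_ONCE(x, val) (*(volatile typeof(x) *) &(x) = (val))

static int stop;   /* hypothetical flag: one thread sets it, another polls it */

static void worker(void)
{
    /* READ_ONCE forces a fresh load on every iteration; without it,
     * -O2 may hoist the load out of the loop and spin forever */
    while (!READ_ONCE(stop))
        ;
}

static void controller(void)
{
    /* WRITE_ONCE emits one plain store the compiler must not elide or merge */
    WRITE_ONCE(stop, 1);
}
```
Like `volatile`, these macros only constrain the compiler; they do not stop the CPU from reordering accesses, which is why the Dekker experiment above still needs a fence.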