---
tags: linux2022
---
# 2022q1 Homework5 (quiz5)
contributed by < [ShienF](https://github.com/ShienF) >
> [Course assignment requirements](https://hackmd.io/@sysprog/linux2022-homework5)
>
> Final assignment requirement:
> Port the implementation in https://github.com/sysprog21/concurrent-programs/tree/master/thread-rcu onto the quiz5(B) code base
## Quiz 2 - Hazard pointer
Answers:
EXP3 = list_remove(&dom->retired, ptr)
EXP4 = list_remove(&dom->retired, ptr)
```c
// Acquires one hazard pointer
static HPRecType *Acquire()
{
    // Try to reuse a retired HP record
    HPRecType *p = pHead_;
    for (; p; p = p->pNext_) {
        if (p->active_ || // if active_ is already 1, skip the CAS; if 0, try to claim it
            !CAS(&p->active_, 0, 1))
            continue;
        return p;
    }
    // Increment the list length
    int oldLen;
    do {
        oldLen = listLen_;
    } while (!CAS(&listLen_, oldLen, oldLen + 1)); // retry until the CAS succeeds
    // Allocate a new one
    p = new HPRecType;
    p->active_ = 1;
    p->pHazard_ = 0;
    // Push it to the front
    HPRecType *old;
    do {
        old = pHead_;
        p->pNext_ = old;
    } while (!CAS(&pHead_, old, p));
    return p;
}
...
```
```shell
gcc -o hp HazardPointer.c -lpthread
```
When reading a value, the reader first adds the pointer to that value into the hazard pointer list, signalling "a reader is reading this, do not delete it". If the value changes while being read, the reader re-reads, so the value it finally obtains is the newest one.
When updating a value, the writer first puts the old value on the retired list. Some time later the retired list is scanned, and any entry that no longer appears in the hazard pointer list can be freed.
## [RCU](https://github.com/sysprog21/concurrent-programs/tree/master/thread-rcu)
### rcu.h
```c
/* thread-rcu targets the Linux Kernel Memory Model (LKMM);
 * the C11 memory model might not be compatible with LKMM.
* Be careful about the architecture or OS you use.
* You can check the paper to see more detail:
*
* http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0124r6.html
*/
#ifndef __RCU_H__
#define __RCU_H__
/* lock primitives derived from POSIX Threads and compiler primitives */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
typedef pthread_mutex_t spinlock_t;
#define SPINLOCK_INIT PTHREAD_MUTEX_INITIALIZER
static inline void spin_lock(spinlock_t *sp)
{
    int ret;

    ret = pthread_mutex_lock(sp);
    if (ret != 0) {
        fprintf(stderr, "spin_lock:pthread_mutex_lock %d\n", ret);
        abort();
    }
}
static inline void spin_unlock(spinlock_t *sp)
{
    int ret;

    ret = pthread_mutex_unlock(sp);
    if (ret != 0) {
        fprintf(stderr, "spin_unlock:pthread_mutex_unlock %d\n", ret);
        abort();
    }
}
#define current_tid() (uintptr_t) pthread_self()
/* Be careful here: no C11 term has exactly the same sequential
 * consistency as smp_mb(). Here we use the closest C11 term,
 * memory_order_seq_cst.
 */
#define smp_mb() atomic_thread_fence(memory_order_seq_cst)
/* Compiler barrier, preventing the compiler from reordering memory accesses */
#define barrier() __asm__ __volatile__("" : : : "memory")
```
* Using the mutex lock from pthread.h
    * First declare a mutex-lock variable of type `pthread_mutex_t`
    * Initialize it with `PTHREAD_MUTEX_INITIALIZER` or by [other means](http://www.skrenta.com/rt/man/pthread_mutex_init.3.html)
    * Lock it with `pthread_mutex_lock(&lock)`: returns 0 on success, otherwise an error code
    * Unlock it with `pthread_mutex_unlock(&lock)`: returns 0 on success, otherwise an error code
* Even atomic instructions or lock mechanisms do not necessarily achieve mutually exclusive access to shared resources, because the CPU and the compiler reorder instructions within certain limits to improve performance. The following are ways to suppress such reordering:
    * `atomic_thread_fence(memory_order)` is a memory barrier that keeps the CPU from reordering instructions.
      The memory orders are as follows: [reference](https://hackmd.io/@sysprog/concurrency-atomics#%E8%BB%9F%E9%AB%94%E8%A7%80%E9%BB%9E)

|memory order|effect|
|----|----|
|memory_order_relaxed | no memory barrier; the compiler and CPU are free to optimize. If the current operation is atomic, only the atomicity of that operation is guaranteed|
|memory_order_consume| later accesses that depend on this atomic variable may not be reordered before this instruction|
|memory_order_acquire| later accesses may not be reordered before this instruction|
|memory_order_release| earlier accesses may not be reordered after this instruction; once the result of this instruction is visible to other threads, all earlier writes are visible as well|
|memory_order_acq_rel| acquire + release semantics|
|memory_order_seq_cst| acq_rel semantics plus a strict total order among all seq_cst operations; the default when no memory order is specified|
    * `asm volatile("" ::: "memory")` is a compiler barrier:
      reads and writes after it must actually go to the memory locations instead of reusing temporary results from before the "barrier". [reference](https://hackmd.io/@sysprog/concurrency-atomics#%E8%BB%9F%E9%AB%94%E8%A7%80%E9%BB%9E)
    * Prefixing a variable declaration with `volatile` tells the compiler to re-read the variable from memory on every access, because its value may change at any time; this suppresses compiler optimization. Note that besides suppressing compiler optimization, CPU reordering must also be prevented for mutual exclusion to truly hold.
```c
/* To access the shared variable, use READ_ONCE() and WRITE_ONCE(). */

/* READ_ONCE() is close to a C11 volatile memory_order_relaxed atomic read.
 * However, for address, data, or control dependency chains, it is more
 * like memory_order_consume. Presently, most implementations promote
 * that kind of thing to memory_order_acquire.
 */
#define READ_ONCE(x)                                                      \
    ({                                                                    \
        barrier();                                                        \
        __typeof__(x) ___x = atomic_load_explicit(                        \
            (volatile _Atomic __typeof__(x) *) &x, memory_order_consume); \
        barrier();                                                        \
        ___x;                                                             \
    })

/* WRITE_ONCE() is quite close to a C11 volatile memory_order_relaxed atomic
 * store */
#define WRITE_ONCE(x, val)                                                \
    do {                                                                  \
        atomic_store_explicit((volatile _Atomic __typeof__(x) *) &x,      \
                              (val), memory_order_relaxed);               \
    } while (0)

#define smp_store_release(x, val)                                         \
    do {                                                                  \
        __typeof__(*x) ___x;                                              \
        atomic_store_explicit((volatile _Atomic __typeof__(___x) *) x,    \
                              (val), memory_order_release);               \
    } while (0)

#define atomic_fetch_add_release(x, v)                                     \
    ({                                                                     \
        __typeof__(*x) ___x;                                               \
        atomic_fetch_add_explicit((volatile _Atomic __typeof__(___x) *) x, \
                                  v, memory_order_release);                \
    })

/* bitwise OR v into *x; returns the old value */
#define atomic_fetch_or_release(x, v)                                     \
    ({                                                                    \
        __typeof__(*x) ___x;                                              \
        atomic_fetch_or_explicit((volatile _Atomic __typeof__(___x) *) x, \
                                 v, memory_order_release);                \
    })

/* store v into *x; returns the old value of *x */
#define atomic_xchg_release(x, v)                                         \
    ({                                                                    \
        __typeof__(*x) ___x;                                              \
        atomic_exchange_explicit((volatile _Atomic __typeof__(___x) *) x, \
                                 v, memory_order_release);                \
    })
```
* macro `READ_ONCE(x)`
    * Atomically loads `x` into a new variable and adds a memory barrier: later accesses that depend on this atomic variable may not be reordered before the load.
    * A shared variable needs the C11 `_Atomic` qualifier; <stdatomic.h> also offers more convenient [qualifier + type declarations](https://en.cppreference.com/w/c/thread#Atomic_operations).
    * The two `barrier()` calls force the accesses after them to actually go to memory instead of reusing earlier cached results.
    * `__typeof__` obtains the type of the new variable. `typeof` is a [GNU extension](https://gcc.gnu.org/onlinedocs/gcc/Typeof.html); header files meant to work in ISO C programs must spell it `__typeof__`.
    * The atomic load itself is done with [`atomic_load_explicit`](https://en.cppreference.com/w/c/atomic/atomic_load) from <stdatomic.h>.
* macro `WRITE_ONCE(x, val)`
    * Atomically stores `val` into `x`.
* One difference between `smp_store_release(x, val)` and `WRITE_ONCE(x, val)`: in `smp_store_release(x, val)`, `x` is a pointer and the store goes into the object `x` points to, while `WRITE_ONCE(x, val)` stores directly into `x`. ==Why do it this way?==
  Another difference: `smp_store_release(x, val)` declares a new variable of `x`'s pointee type. ==Why? (It does not seem related to avoiding double evaluation.)==
    * [`atomic_store_explicit`](https://en.cppreference.com/w/c/atomic/atomic_store)
* macro `atomic_fetch_add_release(x, v)`
    * Atomically adds `v` to the object `x` points to. Per [`atomic_fetch_add_explicit`](https://en.cppreference.com/w/c/atomic/atomic_fetch_add), the return value is the old value, i.e. the value before the addition. It also adds a memory barrier: earlier accesses may not be reordered after this instruction.
* macro `atomic_fetch_or_release(x, v)`
    * Atomically stores the bitwise OR of `v` and the object `x` points to back into that object. Per [`atomic_fetch_or_explicit`](https://en.cppreference.com/w/cpp/atomic/atomic_fetch_or), the return value is the old value, i.e. the value before the OR. It also adds a memory barrier: earlier accesses may not be reordered after this instruction.
* macro `atomic_xchg_release(x, v)`
    * Atomically stores `v` into the object `x` points to. Per [`atomic_exchange_explicit`](https://en.cppreference.com/w/cpp/atomic/atomic_exchange), the return value is the old value, i.e. the value before the replacement. It also adds a memory barrier: earlier accesses may not be reordered after this instruction.
```c
#include <errno.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#ifdef __CHECKER__
#define __rcu __attribute__((noderef, address_space(__rcu)))
#define rcu_check_sparse(p, space) ((void) (((typeof(*p) space *) p) == p))
#define __force __attribute__((force))
#define rcu_uncheck(p) ((__typeof__(*(p)) __force *) (p))
#define rcu_check(p) ((__typeof__(*(p)) __force __rcu *) (p))
#else
#define __rcu
#define rcu_check_sparse(p, space)
#define __force
#define rcu_uncheck(p) p
#define rcu_check(p) p
#endif /* __CHECKER__ */
/* Avoid false sharing */
#define CACHE_LINE_SIZE 64
#define __rcu_aligned __attribute__((aligned(2 * CACHE_LINE_SIZE)))
/* Per-thread variable
*
* rcu_nesting is to distinguish where the current thread is in which grace
* period. The reader will use the global variable rcu_nesting_idx to set up
* the corresponding index of rcu_nesting for the read-side critical section.
* It is a per-thread variable for optimizing the reader-side lock execution.
*
* rcu_nesting uses two lowest bits of __next_rcu_nesting to determine the
* grace period. Do not use __next_rcu_nesting directly. Use the helper macro
* to access it.
*/
struct rcu_node {
    uintptr_t tid;
    uintptr_t __next_rcu_nesting;
} __rcu_aligned;

struct rcu_data {
    unsigned int nr_thread;
    struct rcu_node *head;
    unsigned int rcu_nesting_idx;
    spinlock_t lock;
};

/* Helper macros for the next pointer and rcu_nesting */
/* test the nesting bit selected by idx (even idx: bit 0, odd idx: bit 1) */
#define rcu_nesting(np, idx) \
    (READ_ONCE((np)->__next_rcu_nesting) & (0x1 << ((idx) & (0x1))))
/* set the nesting bit selected by idx (even idx: bit 0, odd idx: bit 1) */
#define rcu_set_nesting(np, idx)                                             \
    do {                                                                     \
        WRITE_ONCE(                                                          \
            (np)->__next_rcu_nesting,                                        \
            READ_ONCE((np)->__next_rcu_nesting) | (0x1 << ((idx) & (0x1)))); \
    } while (0)
/* clear both nesting bits */
#define rcu_unset_nesting(np)                                          \
    do {                                                               \
        smp_store_release(&(np)->__next_rcu_nesting,                   \
                          READ_ONCE((np)->__next_rcu_nesting) & ~0x3); \
    } while (0)
/* the next pointer: __next_rcu_nesting with the two nesting bits masked off */
#define rcu_next(np) \
    ((struct rcu_node *) (READ_ONCE((np)->__next_rcu_nesting) & ~0x3))
/* mask off the two lowest bits */
#define rcu_next_mask(nrn) ((struct rcu_node *) ((uintptr_t)(nrn) & ~0x3))
```
* ==first half: the macros==
* `__rcu_aligned`
    * One of the [GNU variable attributes](https://gcc.gnu.org/onlinedocs/gcc-4.0.0/gcc/Variable-Attributes.html#Variable-Attributes) aligns a variable or structure field to the given number of bytes. Here the 2 × cache-line alignment avoids false sharing between per-thread nodes.
* `struct rcu_node`
    * One `struct rcu_node` represents one reader (thread).
    * `tid` is the thread id. ==Why store it as a <stdint.h> `uintptr_t`?==
    * The lowest two bits of `__next_rcu_nesting` encode the corresponding grace period.
* `struct rcu_data`
    * One `struct rcu_data` represents one piece of shared data.
    * `nr_thread` is the number of readers accessing this shared data.
    * ==head??==
    * `rcu_nesting_idx` identifies the current grace period: if the bit of a `struct rcu_node`'s `__next_rcu_nesting` selected by `rcu_nesting_idx` is set, that node is inside the grace period.
    * `lock` is the lock for the shared data, used later to protect it.
* To keep the operations on `__next_rcu_nesting` free of race conditions, the following helper macros are defined:
    * `rcu_nesting(np, idx)`
        * `(0x1 << ((idx) & 0x1))` is 0x1 when `idx` is even and 0x2 (binary 10) when `idx` is odd.
        * ANDing it with `READ_ONCE((np)->__next_rcu_nesting)` checks whether the lowest or second-lowest bit of `np`'s `__next_rcu_nesting` is set.
    * `rcu_set_nesting(np, idx)`
        * ORing 0x1 or 0x2 into the copy obtained by `READ_ONCE((np)->__next_rcu_nesting)` sets the lowest or second-lowest bit.
        * The result is then written back with `WRITE_ONCE(the node's __next_rcu_nesting, copy)`.
    * `rcu_unset_nesting(np)`
        * `READ_ONCE((np)->__next_rcu_nesting) & ~0x3` masks off (clears) the lowest two bits of the copy.
        * The result is then written back with `smp_store_release(&the node's __next_rcu_nesting, copy)`.
        * ==Why does `rcu_set_nesting` use `WRITE_ONCE` while `rcu_unset_nesting` uses `smp_store_release`?==
    * `rcu_next(np)`
        * Returns a copy of `np`'s `__next_rcu_nesting` with the lowest two bits masked off, as a `struct rcu_node *`.
    * `rcu_next_mask(nrn)`
        * Returns a copy of `nrn` with the lowest two bits masked off, as a `struct rcu_node *`.
* The reads of a `struct rcu_node`'s `__next_rcu_nesting` above need the atomic `READ_ONCE()` to prevent the instructions from being reordered.
  See the example in [Linux 核心設計: RCU 同步機制](https://hackmd.io/@sysprog/linux-rcu#%E9%A1%A7%E5%8F%8A-memory-ordering-%E7%9A%84%E5%BD%B1%E9%9F%BF)
```c
static struct rcu_data rcu_data = {
    .nr_thread = 0,
    .head = NULL,
    .rcu_nesting_idx = 0,
    .lock = SPINLOCK_INIT,
};
static __thread struct rcu_node *__rcu_per_thread_ptr; /* per-thread variable */

static inline struct rcu_node *__rcu_node_add(uintptr_t tid)
{
    struct rcu_node **indirect = &rcu_data.head;
    struct rcu_node *node = malloc(sizeof(struct rcu_node));
    if (!node) {
        fprintf(stderr, "__rcu_node_add: malloc failed\n");
        abort();
    }

    node->tid = tid;
    node->__next_rcu_nesting = 0;

    spin_lock(&rcu_data.lock);
/* Read-side will write the rcu_nesting field in __next_rcu_nesting
 * even if we lock the linked list. So, here we use READ_ONCE().
 */
#define rro_mask(pp) rcu_next_mask(READ_ONCE((*pp)))
    while (rro_mask(indirect)) {
        if (rro_mask(indirect)->tid == node->tid) {
            spin_unlock(&rcu_data.lock);
            free(node);
            return NULL;
        }
        indirect = (struct rcu_node **) &rro_mask(indirect)->__next_rcu_nesting;
    }
#undef rro_mask
    atomic_fetch_or_release((uintptr_t *) indirect, (uintptr_t) node);
    rcu_data.nr_thread++;
    spin_unlock(&rcu_data.lock);
    smp_mb();

    return node;
}
```
* Initialize `rcu_data`, and define `__rcu_per_thread_ptr`, a `struct rcu_node *` variable. The [GNU `__thread` specifier](https://gcc.gnu.org/onlinedocs/gcc/Thread-Local.html) makes a variable thread-local, i.e. every thread gets its own copy.
* `__rcu_node_add()`
    * The first reader thread entering this function skips the while-loop body, because `head` is still NULL at that point; afterwards `head` points to the newly created `struct rcu_node`.
    * ==Unclear: in `indirect = (struct rcu_node **) &rro_mask(indirect)->__next_rcu_nesting;`, after the node pointer is masked, how do we know it is still a node?==
    * ==When can the condition `if (rro_mask(indirect)->tid == node->tid)` hold?==
```c
static inline int rcu_init(void) /* create this thread's rcu_node */
{
    uintptr_t tid = current_tid();

    __rcu_per_thread_ptr = __rcu_node_add(tid);

    return __rcu_per_thread_ptr ? 0 : -ENOMEM;
}

static inline void rcu_clean(void)
{
    struct rcu_node *node, *tmp;

    spin_lock(&rcu_data.lock);
    for (node = rcu_data.head; node; node = tmp) {
        tmp = rcu_next_mask(node->__next_rcu_nesting);
        free(rcu_next_mask(node));
    }
    rcu_data.head = NULL;
    rcu_data.nr_thread = 0;
    spin_unlock(&rcu_data.lock);
}
```
* `rcu_init()`
    * Creates a `struct rcu_node` for the new thread; returns 0 on success, `-ENOMEM` on failure.
    * `ENOMEM` is defined in [<errno.h>](https://man7.org/linux/man-pages/man3/errno.3.html) and means "Not enough space / cannot allocate memory".
* `rcu_clean()`
    * ==Iterates from `rcu_data.head`: save the next node (`node->__next_rcu_nesting` with the lowest two bits masked off???), then free the (masked) node.==
```c
/* The per-thread reference count will only be modified by its owner
 * thread but will be read by other threads. So here we use WRITE_ONCE().
 *
 * We could turn the 1/0 flag into a reference count to make the rcu
 * read-side lock nest. But here we simplify it to one level at a time.
 */
static inline void rcu_read_lock(void) /* mark this node as in the current gp */
{
    rcu_set_nesting(__rcu_per_thread_ptr, READ_ONCE(rcu_data.rcu_nesting_idx));
}

/* It uses smp_store_release().
 * But in some cases, such as macOS (x86_64, M1), WRITE_ONCE() would do.
 */
static inline void rcu_read_unlock(void)
{
    rcu_unset_nesting(__rcu_per_thread_ptr);
}
```
* `rcu_read_lock()`
    * Sets the lowest bit (when `rcu_data.rcu_nesting_idx` is even) or the second-lowest bit (when it is odd) of the `struct rcu_node`'s `__next_rcu_nesting`.
    * ==Given that "The per-thread reference count will only be modified by their owner ...", why does `rcu_set_nesting` need `READ_ONCE((np)->__next_rcu_nesting)`?==
* `rcu_read_unlock()`
    * Clears the nesting bits of the `struct rcu_node`'s `__next_rcu_nesting` (the code clears both of the lowest two bits at once).
    * ==Why is `smp_store_release()` used here? (same question as above)==
```c
static inline void synchronize_rcu(void)
{
    struct rcu_node *node;
    int i;

    smp_mb();

    spin_lock(&rcu_data.lock);
    /* When rcu_nesting is set, the thread is in the read-side critical
     * section. It is safe to plain access rcu_data.rcu_nesting_idx since
     * it is only modified by the update-side.
     *
     * Again, if we want the read-side lock to be nesting, we need to change
     * rcu_nesting to a reference count.
     */
    for (node = rcu_data.head; node; node = rcu_next(node)) {
        /* If the node is in the current grace period, wait for that
         * thread to finish reading before writing. We only wait for
         * the readers that happened before this function was called.
         */
        while (rcu_nesting(node, rcu_data.rcu_nesting_idx)) {
            barrier();
        }
    }

    /* Going to the next grace period */
    i = atomic_fetch_add_release(&rcu_data.rcu_nesting_idx, 1);
    smp_mb();

    /* Some read-side threads may be in the linked list but enter the
     * read-side critical section after the update-side checks their nesting.
     * That may cause a data race, since the update-side will think all the
     * readers have passed through the critical section.
     * To stay away from it, we check again after increasing the global index
     * variable.
     */
    for (node = rcu_data.head; node; node = rcu_next(node)) {
        while (rcu_nesting(node, i)) {
            barrier();
        }
    }
    spin_unlock(&rcu_data.lock);

    smp_mb();
}
```
* After updating the value, the writer enters `synchronize_rcu()` because some readers may still be reading the old value; the old value may only be freed after `synchronize_rcu()` returns. `synchronize_rcu()` lets the pre-existing readers finish reading.
* `smp_mb()` ensures the memory accesses before and after it cannot be reordered across it.
* The for loops iterate over all current readers (`struct rcu_node`). A reader sets `__next_rcu_nesting` before entering the critical section, so whether it is set tells whether the reader is currently reading; the inner while loop exits once `__next_rcu_nesting` is cleared, meaning the read has finished.
* In the first for loop, the writer waits for the earlier readers of the old value to finish; the grace period being waited on is still the one that began in the old-value era.
* Only after the first for loop completes does it switch to the new grace period.
* The second for loop ensures that readers which entered the critical section only after the new grace period began also finish reading before the value is updated.
* ==Does the for loop also wait for readers that are reading the new value??==
```c
#define rcu_dereference(p)                                                \
    ({                                                                    \
        __typeof__(*p) *___p = (__typeof__(*p) __force *) READ_ONCE(p);   \
        rcu_check_sparse(p, __rcu);                                       \
        ___p;                                                             \
    })

/* point the RCU-protected pointer at the new data; returns the old pointer */
#define rcu_assign_pointer(p, v)                                          \
    ({                                                                    \
        rcu_check_sparse(p, __rcu);                                       \
        (__typeof__(*p) __force *) atomic_xchg_release((rcu_uncheck(&p)), \
                                                       rcu_check(v));     \
    })
#endif /* __RCU_H__ */
```
:::warning
See [Linux 核心原始程式碼巨集: max, min](https://hackmd.io/@sysprog/linux-macro-minmax)
:notes: jserv
:::
==`READ_ONCE()` already returns a copy; why assign it to yet another new variable? Even without `typeof`, using `READ_ONCE()` alone would not cause double evaluation, would it?==
* `__force` suppresses the cast warnings thrown by [Sparse, the Linux kernel semantic checker](https://www.kernel.org/doc/html/v4.14/dev-tools/sparse.html). `rcu_check_sparse(p, __rcu)` is likewise a Sparse annotation.
### main.c
```c
#include <pthread.h>
#include <stdatomic.h>
struct barrier_struct {
    atomic_int flag;
    int count;
    pthread_mutex_t lock;
};

static __thread int local_sense = 0;

#define BARRIER_INIT                                             \
    {                                                            \
        .flag = 0, .count = 0, .lock = PTHREAD_MUTEX_INITIALIZER \
    }
#define DEFINE_BARRIER(name) struct barrier_struct name = BARRIER_INIT

/* let n threads leave the barrier at (nearly) the same time */
static inline void thread_barrier(struct barrier_struct *b, size_t n)
{
    local_sense = !local_sense;

    pthread_mutex_lock(&b->lock);
    b->count++;
    if (b->count == n) {
        b->count = 0;
        pthread_mutex_unlock(&b->lock);
        atomic_store_explicit(&b->flag, local_sense, memory_order_release);
    } else {
        pthread_mutex_unlock(&b->lock);
        while (atomic_load_explicit(&b->flag, memory_order_acquire) !=
               local_sense)
            ;
    }
}
```
* `thread_barrier()`
    * `inline` functions are standard since C99; they let the compiler expand the code at the call site, saving the cost of a function call and giving more control over code size. With the [GNU extension](https://gcc.gnu.org/onlinedocs/gcc/Inline.html), an inline function must be declared `static` to have a chance of being expanded; otherwise the compiler assumes the function may be called elsewhere and, to avoid multiple definitions, does not expand it.
    * `local_sense` is `__thread`-qualified, so each thread's `local_sense` is only ever flipped by that thread. The first n−1 threads can only leave the spin loop after the n-th thread swaps `flag`; this makes all readers finish this function at almost the same time.
```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "rcu.h"
#define GP_IDX_MAX (N_UPDATE_RUN + 1)

static DEFINE_BARRIER(test_barrier);

struct test {
    unsigned int count;
};
static struct test __rcu *dut;
static atomic_uint gp_idx;
static atomic_uint prev_count;
static atomic_uint grace_periods[GP_IDX_MAX];

static void *reader_func(void *argv)
{
    struct test *tmp;
    unsigned int old_prev_count;

    if (rcu_init())
        abort();

    thread_barrier(&test_barrier, N_READERS + 1);

    rcu_read_lock(); /* mark this node's grace period; cs starts */
    tmp = rcu_dereference(dut); /* dut is advanced by the updater */
    if (tmp->count != atomic_load_explicit(&prev_count, memory_order_acquire)) {
        old_prev_count = atomic_exchange_explicit(&prev_count, tmp->count,
                                                  memory_order_release);
        if (tmp->count != old_prev_count)
            atomic_fetch_add_explicit(&gp_idx, 1, memory_order_release);
        if (atomic_load_explicit(&gp_idx, memory_order_acquire) >
            N_UPDATE_RUN) {
            fprintf(stderr, "grace period index (%u) is over bound (%u).\n",
                    atomic_load_explicit(&gp_idx, memory_order_acquire),
                    N_UPDATE_RUN);
            abort();
        }
    }
    /* record how many threads observed this grace period */
    atomic_fetch_add_explicit(
        &grace_periods[atomic_load_explicit(&gp_idx, memory_order_acquire)], 1,
        memory_order_relaxed);
    rcu_read_unlock(); /* reading finished */

    pthread_exit(NULL);
}

static void *updater_func(void *argv)
{
    struct test *oldp;
    struct test *newval;
    unsigned int i = 0;

    thread_barrier(&test_barrier, N_READERS + 1);
    atomic_thread_fence(memory_order_seq_cst);

    while (i++ < N_UPDATE_RUN) {
        newval = malloc(sizeof(struct test));
        newval->count = i;
        oldp = rcu_assign_pointer(dut, newval); /* publish the new data */
        synchronize_rcu(); /* wait for all pre-existing readers to finish */
        free(oldp);
    }

    pthread_exit(NULL);
}
```
* `reader_func()`
    * `rcu_read_lock()` sets the thread's `__next_rcu_nesting`, meaning it is about to be counted in the corresponding grace period; `rcu_read_unlock()` clears it, moving the thread out of the grace period so the writer no longer needs to wait for it. The region enclosed by these two calls is the critical section (cs).
    * Before entering the cs, `thread_barrier()` is used to make all readers enter the cs as close to simultaneously as possible.
    * Inside the cs, readers maintain `prev_count` to tell whether the value has been updated. To a reader, `tmp->count` is the newest value, so if `tmp->count` differs from `prev_count`, the writer has updated the value; the newest value is then assigned to `prev_count`, and we move on to the next grace period.
    * ==When does the condition `if (tmp->count != old_prev_count)` not hold?==
    * ==Why not use a macro?==
* `updater_func()`
    * Here we must ensure the writer does not write over its own update.
    * After updating the value, it enters `synchronize_rcu()` to wait for the readers of the old value to finish before freeing it.
    * ==Are atomic variables initialized to 0?==
    * ==Is `count` there to tell whether the data read is in sync with the data written?==
```c
int main(int argc, char *argv[])
{
    pthread_t reader[N_READERS];
    pthread_t updater;
    unsigned int i, total = 0;

    dut = (struct test __rcu *) malloc(sizeof(struct test));
    rcu_uncheck(dut)->count = 0;

    for (i = 0; i < N_READERS; i++)
        pthread_create(&reader[i], NULL, reader_func, NULL);
    pthread_create(&updater, NULL, updater_func, NULL);

    for (i = 0; i < N_READERS; i++)
        pthread_join(reader[i], NULL);
    pthread_join(updater, NULL);

    free(rcu_uncheck(dut));
    rcu_clean();

    atomic_thread_fence(memory_order_seq_cst);

    printf("%u reader(s), %u update run(s), %u grace period(s)\n", N_READERS,
           N_UPDATE_RUN, gp_idx + 1);
    for (i = 0; i < gp_idx + 1; i++) {
        printf("[grace period #%u] %4u reader(s)\n", i, grace_periods[i]);
        total += grace_periods[i];
    }

    if (total != N_READERS)
        fprintf(stderr,
                "The sum of records in the grace period array (%u) is "
                "not the same as the number of reader(s) (%u)\n",
                total, N_READERS);

    return 0;
}
```
* `N_READERS` is defined in the Makefile and substituted by the preprocessor.
* `dut` is the shared resource.
* Threads are used via <pthread.h>:
    * [pthread_create()](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
        * `int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *), void *restrict arg)`
    * First declare a `pthread_t` variable representing the thread, to be passed to `pthread_create()` together with the function the thread is to run.
    * [pthread_join()](https://man7.org/linux/man-pages/man3/pthread_join.3.html) waits for a thread to finish its work.
## RCU 移植到 quiz5(B) 的程式碼基礎上
:::warning
Don't rush to paste code. You should first list the key ideas and the approaches you plan to take, including how you will verify them.
:notes: jserv
:::
### How to port
* Mainly modify `reader_thread()` and `writer_thread()` in the quiz5(B) Hazard Pointer code, adding the RCU machinery around reads and writes of the shared resource.
* Remove the Hazard Pointer machinery, `domain_new()` and `domain_free()`, from `init()` and `deinit()`.
* In `reader_thread()`, add `rcu_init()` to create the thread's own `struct rcu_node`. Before the critical section, call `rcu_read_lock()` to set rcu_nesting, i.e. mark the corresponding grace period; after leaving the critical section, `rcu_read_unlock()` tells the writer the read is done. Inside the critical section, `shared_config` can be read with `rcu_dereference()`.
* In `writer_thread()`, add `rcu_assign_pointer()` to update the shared resource without freeing the old value immediately; `synchronize_rcu()` ensures that all readers of the old value from before the grace period have passed through the critical section (i.e. it waits for `rcu_read_unlock()` in `reader_thread()` to run), and only after the grace period ends is the old value freed.
### Code
Only the changed parts of the [quiz5 Hazard Pointer code](https://hackmd.io/@sysprog/linux2022-quiz5#%E6%B8%AC%E9%A9%97-2) are listed.
The included <rcu.h> is [here](https://github.com/sysprog21/concurrent-programs/blob/master/thread-rcu/rcu.h).
```c=
#include "rcu.h"
...
void init()
{
    shared_config = create_config();
}

void deinit()
{
    delete_config(shared_config);
}

static void *reader_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS; ++i) {
        if (rcu_init())
            abort();

        rcu_read_lock();
        config_t *safe_config = rcu_dereference(shared_config);
        rcu_read_unlock();

        if (!safe_config)
            err(EXIT_FAILURE, "load");
        print_config("read config ", safe_config);
    }

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        config_t *old_config;
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);

        old_config = rcu_assign_pointer(shared_config, new_config);
        synchronize_rcu();
        delete_config(old_config);

        print_config("updated config ", new_config);
    }

    return NULL;
}
```
* Running this results in a core dump.
* ==Inspecting with GDB shows the error is SIGABRT.==
## HP vs RCU
* Hazard pointer: the value read is always the newest, and old data can be deleted at the proper time.
* RCU: guarantees that reads keep making progress (unlike hazard pointers, which must keep synchronizing with writers); only reads after synchronization (after the grace period) are guaranteed to be the newest, while reads before or during the grace period may still return the old value.

TODO:
* HP
* How is `__next_rcu_nesting` updated? -> `rcu_read_lock()`
* How is `indirect` (`rcu_data`'s `head`) updated? -> replaced by the node
:::spoiler Other references
https://ithelp.ithome.com.tw/articles/10281161
https://ithelp.ithome.com.tw/articles/10279728?sc=iThelpR
https://ithelp.ithome.com.tw/articles/10281491
https://ithelp.ithome.com.tw/articles/10218032?sc=rss.iron