---
tags: linux2025
---
# [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) Week 15 Quiz
:::info
Goal: assess understanding of [concurrent programming](https://hackmd.io/@sysprog/concurrency) and RCU
:::
==[Answer sheet: Quiz `1`](https://docs.google.com/forms/d/e/1FAIpQLSd8OIroXm3xX56RCHHI_5n8iO7jq0O6opZqmaQkY0WE2LarAA/viewform?usp=dialog)==
## Quiz `1`
[RCU](https://hackmd.io/@sysprog/linux-rcu) is a synchronization mechanism used pervasively in the Linux kernel. It lets multiple readers access a shared data structure concurrently and lock-free, while updaters can still safely modify the structure and reclaim old versions. Below is a simplified RCU implementation:
| Linux kernel RCU | This example |
| --------------------- | --------------------------------- |
| Kernel space<br>intricate IRQ/NMI and CPU-hotplug handling | User space |
| Per-CPU data, callback queues | Per-thread linked list, no callbacks |
| `smp_mb()` family of macros | C11 atomics directly |
| IPI-triggered cross-CPU fences | [membarrier](https://man7.org/linux/man-pages/man2/membarrier.2.html) system call |
| CPU offline/online | `rcu_thread_online()`/`offline()` |
The essence of RCU is preserved:
* Readers are nearly free and never block
* Updaters absorb the cost: only 2 heavyweight barriers per grace period (GP)
* Carefully arranged fences and flags guarantee that memory is immediately reclaimable
```
+--------------+      membarrier      +--------------+
|   Reader A   |<-------------------->|   Reader B   |
|  nesting++   |                      |  nesting--   |
| signal_fence |                      | signal_fence |
+--------------+                      +--------------+
        \                                    /
         \       (qs: quiescent state)      /
          \                                /
           \    (readers report a qs)     /
            \                            /
       +--------------------------------+
       |            Updater             |
       |  gp_lock → grace-period core   |
       +--------------------------------+
```
### Per-thread state: one counter plus one flag
| Field | Purpose | Access pattern |
| --------------------------------- | ------------------------------------------- | ---------------------------------- |
| `read_lock_nesting` (`atomic_int`) | Nesting depth of `rcu_read_lock()`/`rcu_read_unlock()` | Readers increment/decrement; updaters only read |
| `need_qs` (`atomic_bool`) | Requests that this thread report a quiescent state (QS) | Updaters set it to `true`; readers or updaters clear it to `false` |
* A thread counts as quiescent only when `need_qs` is `false` and `read_lock_nesting == 0`.
* While scanning, the updater may clear the flag directly for threads that are already quiescent, sparing them pointless work.
### Reader fast path (roughly 10 ns on x86-64)
```c
rcu_read_lock():
++nesting // relaxed
atomic_signal_fence(); // Fence A
```
* `atomic_signal_fence()` is only a compiler barrier and emits no CPU instruction
* No system call, no mutex lock, and no heavyweight memory barrier
```c
rcu_read_unlock():
atomic_signal_fence(); // Fence B
--nesting
    if (nesting == 0 && need_qs)      // critical branch (outermost unlock)
report_qs();
```
`report_qs()` first clears `need_qs` with an `exchange`, then decrements the global `gp_holdouts`.
### Global state
```c
/* Per-thread RCU state */
struct rcu_thread_state {
atomic_int read_lock_nesting; /* Nesting depth of rcu_read_lock() */
struct rcu_thread_state *next; /* Intrusive list links */
struct rcu_thread_state **pprev;
atomic_bool need_qs;
};
struct rcu_state {
    pthread_mutex_t gp_lock;              /* Held only by updaters */
    struct rcu_thread_state *thread_head; /* Head of the thread list */
    uint32_t thread_count;
    atomic_uint gp_holdouts;              /* Readers not yet quiescent */
};
```
* Only one updater at a time executes `synchronize_rcu()` (serialized by `gp_lock`)
* Readers never touch the mutex or `gp_holdouts`
### Starting a grace period
> `synchronize_rcu()`
1. Acquire `gp_lock`
2. `gp_holdouts = thread_count` (`relaxed`)
3. `atomic_thread_fence(memory_order_release)` (Fence E): ensures readers see the correct initial value
4. Set `need_qs = true` for every thread
5. Issue a private `membarrier` (Fence F): inserts an SC fence on every CPU core
6. Scan every thread: if `nesting == 0`, clear `need_qs` directly and decrement `gp_holdouts`, then issue another `membarrier` (Fence G) to keep the ordering symmetric
7. If holdouts remain, sleep on a futex; the last reader performs the `futex_wake()`
8. `atomic_thread_fence(memory_order_seq_cst)` (Fence H) pairs with the readers' Fence D and closes the grace period
9. Release `gp_lock` and return
### Memory ordering
```
Reader                    Updater
------------              ---------------------------
A: signal_fence           ... ordinary accesses ...
                          E: release fence (publish)
B: signal_fence           F: membarrier ⇆ A
                          ... scan need_qs ...
C: signal_fence           F: membarrier ⇆ C
D: acq_rel fence          H: SC fence ⇆ D
```
* A ⇆ F: keeps the updater's "old-world" data from leaking into a read-side critical section
* C ⇆ F: closes the store-buffer race: either the reader sees `need_qs == true` or the updater sees `nesting == 0`; they can never both miss
* D ⇆ H: guarantees every write made inside a read-side critical section is visible before the grace period ends

Memory before Fence F is the old world; memory after Fence H is the new world, and no reader straddles the two. Hence, once `synchronize_rcu()` returns, the old data can safely be freed, reused, or rebuilt, which is precisely the point of RCU.
Assuming the file is named `main.c`, build and test:
```shell
$ gcc -O2 -Wall -o main main.c -lpthread
$ ./main -r 32 -u 8 -t 3000 -i 20
```
Reference output:
```
starting stress test: 32 readers, 8 updaters, 3000 ms, 20 µs
reader 21: 602853772 iterations
reader 19: 598778888 iterations
reader 14: 596326515 iterations
reader 12: 600671767 iterations
reader 29: 605255911 iterations
updater 3: 8248 iterations
...
reader 15: 569099859 iterations
updater 6: 8227 iterations
stress test complete in 3.00 s
```
Headers and macros:
```c
#include <inttypes.h>
#include <limits.h>
#include <linux/futex.h>
#include <linux/membarrier.h>
#include <pthread.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#define CACHE_LINE_SIZE 64
#define CACHE_ALIGNED alignas(CACHE_LINE_SIZE)
#define unlikely(x) __builtin_expect(!!(x), 0)
```
Test code:
```c
#define MAX_READERS 64
#define MAX_UPDATERS 64
static atomic_bool should_exit;
static _Atomic(uint64_t *) global_shared_state;
static __thread unsigned long long iterations = 0;
static unsigned reader_count;
static unsigned updater_count;
static unsigned update_interval_us;
static unsigned test_time_ms;
static void *reader_func(void *arg)
{
int id = (int) (intptr_t) arg;
rcu_thread_online();
while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
rcu_read_lock();
/* Consume load with data dependency. */
uint64_t value =
*atomic_load_explicit(&global_shared_state, memory_order_relaxed);
(void) value;
iterations++;
rcu_read_unlock();
}
printf("reader %d: %llu iterations\n", id, (unsigned long long) iterations);
rcu_thread_offline();
return NULL;
}
static void update_global_state(void)
{
uint64_t *old_state;
uint64_t *new_state = malloc(sizeof(*new_state));
*new_state = 5;
old_state = atomic_exchange_explicit(&global_shared_state, new_state,
memory_order_release);
    if (old_state) {
        synchronize_rcu();
        *old_state = UINT64_MAX; /* Poison, then reclaim the old version */
        free(old_state);
    }
}
static void *updater_func(void *arg)
{
int id = (int) (intptr_t) arg;
(void) id; /* Unused in current diagnostics */
while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
iterations++;
update_global_state();
usleep(update_interval_us);
}
printf("updater %d: %llu iterations\n", id,
(unsigned long long) iterations);
return NULL;
}
static bool parse_opts(int argc, char *argv[])
{
int opt;
while ((opt = getopt(argc, argv, "r:u:t:i:")) != -1) {
switch (opt) {
case 'r':
reader_count = atoi(optarg);
break;
case 'u':
updater_count = atoi(optarg);
break;
case 't':
test_time_ms = atoi(optarg);
break;
case 'i':
update_interval_us = atoi(optarg);
break;
default:
return false;
}
}
if (test_time_ms == 0 || update_interval_us == 0 || reader_count == 0 ||
updater_count == 0 || reader_count > MAX_READERS ||
updater_count > MAX_UPDATERS) {
return false;
}
return true;
}
int main(int argc, char *argv[])
{
pthread_t readers[MAX_READERS];
pthread_t updaters[MAX_UPDATERS];
if (!parse_opts(argc, argv)) {
fprintf(
stderr,
"usage: %s -r <readers 1-%u> -u <updaters 1-%u> -t <ms> -i <us>\n",
argv[0], MAX_READERS, MAX_UPDATERS);
return EXIT_FAILURE;
}
if (rcu_init() != 0)
return EXIT_FAILURE;
update_global_state();
printf("starting stress test: %u readers, %u updaters, %u ms, %u µs\n",
reader_count, updater_count, test_time_ms, update_interval_us);
struct timespec start;
clock_gettime(CLOCK_MONOTONIC, &start);
for (unsigned i = 0; i < reader_count; i++) {
if (pthread_create(&readers[i], NULL, reader_func,
(void *) (intptr_t) i) != 0) {
perror("pthread_create reader");
return EXIT_FAILURE;
}
}
for (unsigned i = 0; i < updater_count; i++) {
if (pthread_create(&updaters[i], NULL, updater_func,
(void *) (intptr_t) i) != 0) {
perror("pthread_create updater");
return EXIT_FAILURE;
}
}
usleep(test_time_ms * 1000);
atomic_store_explicit(&should_exit, true, memory_order_relaxed);
for (unsigned i = 0; i < reader_count; i++)
pthread_join(readers[i], NULL);
for (unsigned i = 0; i < updater_count; i++)
pthread_join(updaters[i], NULL);
struct timespec end;
clock_gettime(CLOCK_MONOTONIC, &end);
uint64_t elapsed_us = (uint64_t) (end.tv_sec - start.tv_sec) * 1000000ULL +
(uint64_t) (end.tv_nsec - start.tv_nsec) / 1000ULL;
printf("stress test complete in %llu.%02llu s\n",
(unsigned long long) (elapsed_us / 1000000ULL),
(unsigned long long) ((elapsed_us % 1000000ULL) / 10000ULL));
return EXIT_SUCCESS;
}
```
RCU implementation:
```c
/* One instance per thread */
static __thread CACHE_ALIGNED struct rcu_thread_state rcu_thread_state;
static void synchronize_rcu(void);
/* Lightweight SC fence */
static inline void light_fence_seq_cst_light(void)
{
/* Readers only need a signal fence. */
atomic_signal_fence(AAAA);
}
/* Read-side primitives */
static inline void rcu_read_lock(void)
{
int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
memory_order_relaxed);
atomic_store_explicit(&rcu_thread_state.read_lock_nesting, nesting + 1,
memory_order_relaxed);
/* Fence A – pairs with Fence F in synchronize_rcu(). */
light_fence_seq_cst_light();
}
static CACHE_ALIGNED struct rcu_state global_state;
static void futex_wait(_Atomic(uint32_t) *ptr, uint32_t expected)
{
syscall(SYS_futex, ptr, FUTEX_WAIT, expected, NULL);
}
static void futex_wake(_Atomic(uint32_t) *ptr)
{
syscall(SYS_futex, ptr, FUTEX_WAKE, INT_MAX);
}
/* Reader-side quiescent-state reporting helper */
static void rcu_read_unlock_report_qs(void)
{
/* Fast path: updater already cleared need_qs. */
if (!atomic_exchange_explicit(&rcu_thread_state.need_qs, false,
memory_order_relaxed))
return;
/* Fence D – pairs with Fence H. */
atomic_thread_fence(BBBB);
if (atomic_fetch_sub_explicit(&global_state.gp_holdouts, 1,
memory_order_relaxed) == 1)
futex_wake(CCCC);
}
static inline void rcu_read_unlock(void)
{
/* Fence B – pairs with Fence G in synchronize_rcu(). */
light_fence_seq_cst_light();
int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
memory_order_relaxed);
atomic_store_explicit(&rcu_thread_state.read_lock_nesting, GGGG,
memory_order_relaxed);
if (nesting == 1) {
/* Fence C – guards against store buffering. */
light_fence_seq_cst_light();
if (unlikely(atomic_load_explicit(&rcu_thread_state.need_qs,
memory_order_relaxed)))
rcu_read_unlock_report_qs();
}
}
/* Heavyweight SC fence (membarrier) */
static void heavy_fence_seq_cst(void)
{
/* Kernel provides an SC fence across all threads. */
syscall(SYS_membarrier, MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0);
}
static int rcu_init(void)
{
if (syscall(SYS_membarrier, MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0,
0) != 0) {
perror("membarrier register failed");
return -1;
}
if (pthread_mutex_init(&global_state.gp_lock, NULL) != 0) {
perror("pthread_mutex_init");
return -1;
}
return 0;
}
static void rcu_thread_online(void)
{
pthread_mutex_lock(&global_state.gp_lock);
rcu_thread_state.next = global_state.thread_head;
rcu_thread_state.pprev = &global_state.thread_head;
if (global_state.thread_head)
global_state.thread_head->pprev = &rcu_thread_state.next;
global_state.thread_head = &rcu_thread_state;
global_state.thread_count++;
pthread_mutex_unlock(&global_state.gp_lock);
}
static void rcu_thread_offline(void)
{
pthread_mutex_lock(&global_state.gp_lock);
*rcu_thread_state.pprev = rcu_thread_state.next;
if (rcu_thread_state.next)
rcu_thread_state.next->pprev = rcu_thread_state.pprev;
global_state.thread_count--;
pthread_mutex_unlock(&global_state.gp_lock);
}
/* Grace-period core */
static void synchronize_rcu(void)
{
uint32_t thread_count;
uint32_t quiescent = 0;
pthread_mutex_lock(&global_state.gp_lock);
thread_count = global_state.thread_count;
atomic_store_explicit(&global_state.gp_holdouts, thread_count,
memory_order_relaxed);
/* Fence E – readers must see initial gp_holdouts value. */
atomic_thread_fence(DDDD);
for (struct rcu_thread_state *t = global_state.thread_head; t;
t = t->next) {
atomic_store_explicit(&t->need_qs, true, memory_order_relaxed);
}
/* Fence F – closes first half of grace period. */
heavy_fence_seq_cst();
for (struct rcu_thread_state *t = global_state.thread_head; t;
t = t->next) {
if (atomic_load_explicit(&t->read_lock_nesting, memory_order_relaxed) ==
0) {
if (atomic_exchange_explicit(&t->need_qs, false,
memory_order_relaxed))
quiescent++;
}
}
if (FFFF > 0) {
/* Reported some readers directly – emit Fence G. */
heavy_fence_seq_cst();
atomic_fetch_sub_explicit(&global_state.gp_holdouts, quiescent,
memory_order_relaxed);
}
if (quiescent != thread_count) {
/* Wait for remaining readers. */
for (;;) {
uint32_t h = atomic_load_explicit(&global_state.gp_holdouts,
memory_order_relaxed);
if (h == 0)
break;
futex_wait(&global_state.gp_holdouts, h);
}
/* Fence H – pairs with Fence D. */
atomic_thread_fence(EEEE);
}
pthread_mutex_unlock(&global_state.gp_lock);
}
```
Complete the code so that it behaves as expected, subject to these rules:
* AAAA, BBBB, DDDD, EEEE are memory orders defined by C11 atomics
* CCCC, FFFF, GGGG are valid expressions
* None of them contains whitespace characters
:::success
Follow-up tasks:
* Explain how the code above works, point out its flaws, and improve it
* Build a benchmark that exercises both [Userspace RCU](https://liburcu.org/) and the improved code above, quantifying their performance under heavy concurrency
* Apply the improved code to the [highly concurrent Valkey implementation](https://hackmd.io/@sysprog/HyqlsB7Zxe), evaluate its performance, and then propose refinements
:::