--- tags: linux2025 --- # [2025q1](https://wiki.csie.ncku.edu.tw/linux/schedule) 第 15 週測驗題 :::info 目的: 檢驗學員對[並行程式設計](https://hackmd.io/@sysprog/concurrency)和 RCU 的認知 ::: ==[作答表單: 測驗 `1`](https://docs.google.com/forms/d/e/1FAIpQLSd8OIroXm3xX56RCHHI_5n8iO7jq0O6opZqmaQkY0WE2LarAA/viewform?usp=dialog)== ## 測驗 `1` [RCU](https://hackmd.io/@sysprog/linux-rcu) 是 Linux 核心廣泛採用的同步處理機制,允許多個讀取端(reader)在 lock-free 狀況下並行存取共享資料結構,同時讓更新端(updater)能安全地修改或釋放舊版本。以下實作簡化的 RCU: | Linux 核心 RCU | 本例 | | --------------------- | --------------------------------- | | 核心空間<br>錯綜複雜的 IRQ/NMI、CPU hotplug | 使用者空間 | | per-CPU 資料、callback 佇列 | 每執行緒鏈結,無 callback | | `smp_mb()` 家族巨集 | 直接用 C11 atomics | | IPI 觸發跨 CPU fence | [membarrier](https://man7.org/linux/man-pages/man2/membarrier.2.html) 系統呼叫 | | CPU offline/on-line | `rcu_thread_online()`/`offline()` | 仍保留 RCU 精髓: * 讀取端幾乎零成本、絕不阻塞 * 更新端處理一次,每個寬限期 (GP) 僅 2 次重量級 barrier * 藉由精心排列的fence與旗標,提供立刻可回收 (reclaim) 的保證 ``` +--------------+ membarrier +--------------+ | Reader A |<-------------------->| Reader B | | nesting++ | | nesting-- | | signal_fence | | signal_fence | +--------------+ +--------------+ \ / \ __/ (qs: quiescent state) \__ / \ / (讀取端報告 qs) / \ / +--------------------------------+ | Updater | | gp_lock → 全域寬限期主體 | +--------------------------------+ ``` ### 每執行緒狀態:一個計數器+一個旗標 | 欄位 | 功能 | 存取模式 | | --------------------------------- | ------------------------------------------- | ---------------------------------- | | `read_lock_nesting`(`atomic_int`) | `rcu_read_lock()`/`rcu_read_unlock()` 的巢狀深度 | 讀取端遞增/遞減,更新端僅讀取 | | `need_qs`(`atomic_bool`) | 要求該執行緒回報 quiescent state (QS) | 更新端設為 `true`;讀取端或更新端設為 `false` | * 只有在 `need_qs` 為 `false` 且 `read_lock_nesting == 0` 時,執行緒才被視為已經靜止。 * 更新端可在掃描時直接清除此旗標,以免白忙一場。 ### 讀取端快速路徑(x86-64 上約 10 ns) ```c rcu_read_lock(): ++nesting // relaxed atomic_signal_fence(); // Fence A ``` * `atomic_signal_fence()` 僅為 compiler barrier,不會產生 CPU 指令 * 完全無系統呼叫、無 mutext lock、也無重量級 memory barrier ```c rcu_read_unlock(): atomic_signal_fence(); // Fence B --nesting if (nesting == 1 && need_qs) // 關鍵分支 report_qs(); ``` `report_qs()` 先以 `exchange` 清掉 `need_qs`,再遞減全域 `gp_holdouts`。 ### 3 全域狀態 ```c /* Per‑thread RCU state */ struct rcu_thread_state { atomic_int read_lock_nesting; /* Nesting depth of rcu_read_lock() */ struct rcu_thread_state *next; /* Intrusive list links */ struct rcu_thread_state **pprev; atomic_bool need_qs; }; struct rcu_state { pthread_mutex_t gp_lock; /* 只由更新端持有 */ rcu_thread_state *thread_head; /* 執行緒鏈結串列開頭 */ uint32_t thread_count; atomic_uint gp_holdouts; /* 尚未靜止的讀取端數量 */ }; ``` * 同一時間僅一個更新端會執行 `synchronize_rcu()`(靠 `gp_lock` 序列化) * 讀取端完全不觸及 mutex lock 與 `gp_holdouts` ### 啟動寬限期 > `synchronize_rcu()` 1. 取得 `gp_lock` 2. `gp_holdouts = thread_count`(`relaxed`) 3. `atomic_thread_fence(memory_order_release)`(Fence E):確保讀取端看見正確初值 4. 對每個執行緒設 `need_qs = true` 5. 執行私有 `membarrier`(Fence F):在所有處理器核插入 SC fence 6. 掃描每個執行緒:若 `nesting == 0`,直接清 `need_qs` 並遞減 `gp_holdouts`,之後再發一次 `membarrier`(Fence G),確保順序對稱 7. 若仍有剩餘 holdout,睡在 futex;最後一個讀取端會 `futex_wake()` 8. `atomic_thread_fence(memory_order_seq_cst)`(Fence H)與讀取端 Fence D 配對,關閉整個寬限期 9. 釋放 `gp_lock`,回傳 ### 記憶體順序 ``` 讀取端 更新端 ------------ --------------------------- A: signal_fence ... 普通存取 ... E: release fence (publish) B: signal_fence F: membarrier ⇆ A ... 掃描 need_qs ... C: signal_fence F: membarrier ⇆ C D: acq_rel fence H: SC fence ⇆ D ``` * A ⇆ F:避免更新端的「舊資料」穿越到讀取臨界區 * C ⇆ F:解決 store-buffer 競賽:要嘛讀取端看到 `need_qs==true`,要嘛更新端看到 `nesting==0`,不會同時漏看 * D ⇆ H:確保讀取端臨界區內的所有寫入在寬限期結束前可見 Fence F 前的記憶體是舊世界;Fence H 後是新世界,沒有讀取端會橫跨二者。因此,在 `synchronize_rcu()` 返回後,舊資料就安全可釋放、重用或重建 —— 這正是 RCU 的本意。 考慮檔案名稱為 `main.c`,編譯和測試: ```shell $ gcc -O2 -Wall -o main main.c -lpthread $ ./main -r 32 -u 8 -t 3000 -i 20 ``` 參考輸出: ``` starting stress test: 32 readers, 8 updaters, 3000 ms, 20 µs reader 21: 602853772 iterations reader 19: 598778888 iterations reader 14: 596326515 iterations reader 12: 600671767 iterations reader 29: 605255911 iterations updater 3: 8248 iterations ... reader 15: 569099859 iterations updater 6: 8227 iterations stress test complete in 3.00 s ``` 標頭檔和巨集: ```c #include <inttypes.h> #include <limits.h> #include <linux/futex.h> #include <linux/membarrier.h> #include <pthread.h> #include <stdalign.h> #include <stdatomic.h> #include <stdbool.h> #include <stddef.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <sys/mman.h> #include <sys/syscall.h> #include <sys/time.h> #include <time.h> #include <unistd.h> #define CACHE_LINE_SIZE 64 #define CACHE_ALIGNED alignas(CACHE_LINE_SIZE) #define unlikely(x) __builtin_expect(!!(x), 0) ``` 測試程式碼: ```c #define MAX_READERS 64 #define MAX_UPDATERS 64 static atomic_bool should_exit; static _Atomic(uint64_t *) global_shared_state; static __thread unsigned long long iterations = 0; static unsigned reader_count; static unsigned updater_count; static unsigned update_interval_us; static unsigned test_time_ms; static void *reader_func(void *arg) { int id = (int) (intptr_t) arg; rcu_thread_online(); while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) { rcu_read_lock(); /* Consume load with data dependency. */ uint64_t value = *atomic_load_explicit(&global_shared_state, memory_order_relaxed); (void) value; iterations++; rcu_read_unlock(); } printf("reader %d: %llu iterations\n", id, (unsigned long long) iterations); rcu_thread_offline(); return NULL; } static void update_global_state(void) { uint64_t *old_state; uint64_t *new_state = malloc(sizeof(*new_state)); *new_state = 5; old_state = atomic_exchange_explicit(&global_shared_state, new_state, memory_order_release); if (old_state) { synchronize_rcu(); *old_state = UINT64_MAX; } } static void *updater_func(void *arg) { int id = (int) (intptr_t) arg; (void) id; /* Unused in current diagnostics */ while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) { iterations++; update_global_state(); usleep(update_interval_us); } printf("updater %d: %llu iterations\n", id, (unsigned long long) iterations); return NULL; } static bool parse_opts(int argc, char *argv[]) { int opt; while ((opt = getopt(argc, argv, "r:u:t:i:")) != -1) { switch (opt) { case 'r': reader_count = atoi(optarg); break; case 'u': updater_count = atoi(optarg); break; case 't': test_time_ms = atoi(optarg); break; case 'i': update_interval_us = atoi(optarg); break; default: return false; } } if (test_time_ms == 0 || update_interval_us == 0 || reader_count == 0 || updater_count == 0 || reader_count > MAX_READERS || updater_count > MAX_UPDATERS) { return false; } return true; } int main(int argc, char *argv[]) { pthread_t readers[MAX_READERS]; pthread_t updaters[MAX_UPDATERS]; if (!parse_opts(argc, argv)) { fprintf( stderr, "usage: %s -r <readers 1-%u> -u <updaters 1-%u> -t <ms> -i <us>\n", argv[0], MAX_READERS, MAX_UPDATERS); return EXIT_FAILURE; } if (rcu_init() != 0) return EXIT_FAILURE; update_global_state(); printf("starting stress test: %u readers, %u updaters, %u ms, %u µs\n", reader_count, updater_count, test_time_ms, update_interval_us); struct timespec start; clock_gettime(CLOCK_MONOTONIC, &start); for (unsigned i = 0; i < reader_count; i++) { if (pthread_create(&readers[i], NULL, reader_func, (void *) (intptr_t) i) != 0) { perror("pthread_create reader"); return EXIT_FAILURE; } } for (unsigned i = 0; i < updater_count; i++) { if (pthread_create(&updaters[i], NULL, updater_func, (void *) (intptr_t) i) != 0) { perror("pthread_create updater"); return EXIT_FAILURE; } } usleep(test_time_ms * 1000); atomic_store_explicit(&should_exit, true, memory_order_relaxed); for (unsigned i = 0; i < reader_count; i++) pthread_join(readers[i], NULL); for (unsigned i = 0; i < updater_count; i++) pthread_join(updaters[i], NULL); struct timespec end; clock_gettime(CLOCK_MONOTONIC, &end); uint64_t elapsed_us = (uint64_t) (end.tv_sec - start.tv_sec) * 1000000ULL + (uint64_t) (end.tv_nsec - start.tv_nsec) / 1000ULL; printf("stress test complete in %llu.%02llu s\n", (unsigned long long) (elapsed_us / 1000000ULL), (unsigned long long) ((elapsed_us % 1000000ULL) / 10000ULL)); return EXIT_SUCCESS; } ``` RCU 實作程式碼: ```c /* One instance per thread */ static __thread CACHE_ALIGNED struct rcu_thread_state rcu_thread_state; static void synchronize_rcu(void); /* Lightweight SC fence */ static inline void light_fence_seq_cst_light(void) { /* Readers only need a signal fence. */ atomic_signal_fence(AAAA); } /* Read‑side primitives */ static inline void rcu_read_lock(void) { int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting, memory_order_relaxed); atomic_store_explicit(&rcu_thread_state.read_lock_nesting, nesting + 1, memory_order_relaxed); /* Fence A – pairs with Fence F in synchronize_rcu(). */ light_fence_seq_cst_light(); } static CACHE_ALIGNED struct rcu_state global_state; static void futex_wait(_Atomic(uint32_t) *ptr, uint32_t expected) { syscall(SYS_futex, ptr, FUTEX_WAIT, expected, NULL); } static void futex_wake(_Atomic(uint32_t) *ptr) { syscall(SYS_futex, ptr, FUTEX_WAKE, INT_MAX); } /* Reader‑side quiescent‑state reporting helper */ static void rcu_read_unlock_report_qs(void) { /* Fast path: updater already cleared need_qs. */ if (!atomic_exchange_explicit(&rcu_thread_state.need_qs, false, memory_order_relaxed)) return; /* Fence D – pairs with Fence H. */ atomic_thread_fence(BBBB); if (atomic_fetch_sub_explicit(&global_state.gp_holdouts, 1, memory_order_relaxed) == 1) futex_wake(CCCC); } static inline void rcu_read_unlock(void) { /* Fence B – pairs with Fence G in synchronize_rcu(). */ light_fence_seq_cst_light(); int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting, memory_order_relaxed); atomic_store_explicit(&rcu_thread_state.read_lock_nesting, GGGG, memory_order_relaxed); if (nesting == 1) { /* Fence C – guards against store buffering. */ light_fence_seq_cst_light(); if (unlikely(atomic_load_explicit(&rcu_thread_state.need_qs, memory_order_relaxed))) rcu_read_unlock_report_qs(); } } /* Heavyweight SC fence (membarrier) */ static void heavy_fence_seq_cst(void) { /* Kernel provides an SC fence across all threads. */ syscall(SYS_membarrier, MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0); } static int rcu_init(void) { if (syscall(SYS_membarrier, MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0, 0) != 0) { perror("membarrier register failed"); return -1; } if (pthread_mutex_init(&global_state.gp_lock, NULL) != 0) { perror("pthread_mutex_init"); return -1; } return 0; } static void rcu_thread_online(void) { pthread_mutex_lock(&global_state.gp_lock); rcu_thread_state.next = global_state.thread_head; rcu_thread_state.pprev = &global_state.thread_head; if (global_state.thread_head) global_state.thread_head->pprev = &rcu_thread_state.next; global_state.thread_head = &rcu_thread_state; global_state.thread_count++; pthread_mutex_unlock(&global_state.gp_lock); } static void rcu_thread_offline(void) { pthread_mutex_lock(&global_state.gp_lock); *rcu_thread_state.pprev = rcu_thread_state.next; if (rcu_thread_state.next) rcu_thread_state.next->pprev = rcu_thread_state.pprev; global_state.thread_count--; pthread_mutex_unlock(&global_state.gp_lock); } /* Grace‑period core */ static void synchronize_rcu(void) { uint32_t thread_count; uint32_t quiescent = 0; pthread_mutex_lock(&global_state.gp_lock); thread_count = global_state.thread_count; atomic_store_explicit(&global_state.gp_holdouts, thread_count, memory_order_relaxed); /* Fence E – readers must see initial gp_holdouts value. */ atomic_thread_fence(DDDD); for (struct rcu_thread_state *t = global_state.thread_head; t; t = t->next) { atomic_store_explicit(&t->need_qs, true, memory_order_relaxed); } /* Fence F – closes first half of grace period. */ heavy_fence_seq_cst(); for (struct rcu_thread_state *t = global_state.thread_head; t; t = t->next) { if (atomic_load_explicit(&t->read_lock_nesting, memory_order_relaxed) == 0) { if (atomic_exchange_explicit(&t->need_qs, false, memory_order_relaxed)) quiescent++; } } if (FFFF > 0) { /* Reported some readers directly – emit Fence G. */ heavy_fence_seq_cst(); atomic_fetch_sub_explicit(&global_state.gp_holdouts, quiescent, memory_order_relaxed); } if (quiescent != thread_count) { /* Wait for remaining readers. */ for (;;) { uint32_t h = atomic_load_explicit(&global_state.gp_holdouts, memory_order_relaxed); if (h == 0) break; futex_wait(&global_state.gp_holdouts, h); } /* Fence H – pairs with Fence D. */ atomic_thread_fence(EEEE); } pthread_mutex_unlock(&global_state.gp_lock); } ``` 請補完程式碼,使其運作符合預期,書寫規範: * AAAA, BBBB, DDDD, EEEE 為 C11 atomics 規範的 memory order * CCCC, FFFF 為合法表示式 * 均不包含空白字元 :::success 延伸問題: * 解釋上述程式碼的原理,指出其缺失並予以改進 * 開發得以搭配 [Userspace RCU](https://liburcu.org/) 和改進後的上方程式碼運作的效能評比程式,量化在高度並行場景的效能表現 * 將改進後的上方程式碼,運用於[高度並行的 Valkey 實作](https://hackmd.io/@sysprog/HyqlsB7Zxe),並評估其效能表現,隨後指出改進方案 :::