Purpose: assess participants' understanding of concurrent programming and RCU
Problem 1
RCU (Read-Copy-Update) is a synchronization mechanism used pervasively in the Linux kernel. It lets multiple readers traverse a shared data structure concurrently and lock-free, while an updater can still safely modify the structure and reclaim old versions. The following implements a simplified RCU:
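As a refresher, the usage pattern this exercise targets looks like the sketch below. It treats the quiz's primitives (`rcu_read_lock()`, `rcu_read_unlock()`, `synchronize_rcu()`) as external functions and uses an illustrative `struct config` payload; it is a sketch of the pattern, not part of the quiz code:

```c
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

/* Provided by the RCU implementation developed below. */
void rcu_read_lock(void);
void rcu_read_unlock(void);
void synchronize_rcu(void);

struct config { int threshold; };      /* illustrative shared payload */
static _Atomic(struct config *) cfg;

void reader_side(void)
{
    rcu_read_lock();
    /* Dereference the shared pointer only inside the critical section. */
    struct config *c = atomic_load_explicit(&cfg, memory_order_consume);
    if (c)
        printf("threshold = %d\n", c->threshold);
    rcu_read_unlock();
}

void updater_side(struct config *newc)
{
    /* Publish the new version, then wait out all pre-existing readers. */
    struct config *old =
        atomic_exchange_explicit(&cfg, newc, memory_order_release);
    synchronize_rcu();  /* no reader can still hold "old" after this */
    free(old);
}
```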
| Linux kernel RCU | This example |
|---|---|
| Kernel space; intricate IRQ/NMI and CPU-hotplug handling | User space |
| per-CPU data, callback queues | Per-thread linked list, no callbacks |
| `smp_mb()` family of macros | C11 atomics used directly |
| IPI-triggered cross-CPU fences | `membarrier` system call |
| CPU offline/online | `rcu_thread_online()` / `rcu_thread_offline()` |
It still preserves the essence of RCU:

```
+--------------+      membarrier      +--------------+
|   Reader A   |<-------------------->|   Reader B   |
|   nesting++  |                      |   nesting--  |
| signal_fence |                      | signal_fence |
+--------------+                      +--------------+
         \                                  /
          \      (readers report a qs:     /
           \      quiescent state)        /
            \                            /
        +--------------------------------+
        |            Updater             |
        | gp_lock → grace-period driver  |
        +--------------------------------+
```
Each thread's RCU state exposes two fields:

| Field | Purpose | Access pattern |
|---|---|---|
| `read_lock_nesting` (`atomic_int`) | Nesting depth of `rcu_read_lock()`/`rcu_read_unlock()` | Readers increment/decrement; the updater only reads |
| `need_qs` (`atomic_bool`) | Asks the thread to report a quiescent state (QS) | The updater sets it to `true`; a reader or the updater clears it to `false` |
A thread counts as quiescent only when `need_qs` is `false` and `read_lock_nesting == 0`.

`rcu_read_lock()`:

```c
++nesting;              // relaxed
atomic_signal_fence();  // Fence A
```

`atomic_signal_fence()` is only a compiler barrier and emits no CPU instruction.

`rcu_read_unlock()`:

```c
atomic_signal_fence();  // Fence B
--nesting;
if (nesting == 0 && need_qs)  // critical branch: outermost unlock only
    report_qs();
```

`report_qs()` first clears `need_qs` with an `exchange`, then decrements the global `gp_holdouts`.
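For example (an illustrative trace using the primitives above), only the outermost unlock can report a quiescent state:

```c
rcu_read_lock();    /* nesting: 0 -> 1, Fence A */
rcu_read_lock();    /* nesting: 1 -> 2, nested critical section */
/* ... read shared data ... */
rcu_read_unlock();  /* nesting: 2 -> 1, critical branch not taken */
rcu_read_unlock();  /* nesting: 1 -> 0, checks need_qs, may report_qs() */
```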
Per-thread and global state:

```c
/* Per-thread RCU state */
struct rcu_thread_state {
    atomic_int read_lock_nesting;  /* Nesting depth of rcu_read_lock() */
    struct rcu_thread_state *next; /* Intrusive list links */
    struct rcu_thread_state **pprev;
    atomic_bool need_qs;
};

struct rcu_state {
    pthread_mutex_t gp_lock;              /* Held only by updaters */
    struct rcu_thread_state *thread_head; /* Head of the thread list */
    uint32_t thread_count;
    atomic_uint gp_holdouts;              /* Readers not yet quiescent */
};
```
`synchronize_rcu()` (serialized by `gp_lock`) drives the whole grace period:

1. Acquire `gp_lock`.
2. `gp_holdouts = thread_count` (`relaxed` store).
3. `atomic_thread_fence(memory_order_release)` (Fence E): ensures readers observe the correct initial value.
4. Set `need_qs = true` on every registered thread.
5. `membarrier` (Fence F): inserts an SC fence on every CPU core.
6. Scan the thread list: each thread with `nesting == 0` gets its `need_qs` cleared directly and is counted as quiescent; if the count is nonzero, issue one more `membarrier` (Fence G) for symmetric ordering and subtract the count from `gp_holdouts`.
7. `futex_wait()` until `gp_holdouts` drops to zero; the last reader to report a QS calls `futex_wake()`.
8. `atomic_thread_fence(memory_order_seq_cst)` (Fence H) pairs with the readers' Fence D and closes the whole grace period.
9. Release `gp_lock` and return.

Fence pairing at a glance:

```
Reader                      Updater
------------                ---------------------------
A: signal_fence             ... plain accesses ...
                            E: release fence (publish)
B: signal_fence             F: membarrier ⇆ A
                            ... scan need_qs ...
C: signal_fence             G: membarrier ⇆ C
D: acq_rel fence            H: SC fence ⇆ D
```

Either a reader sees `need_qs == true`, or the updater sees `nesting == 0`; the two sides can never miss each other at the same time. Memory before Fence F belongs to the old world and memory after Fence H to the new world, and no read-side critical section straddles the two. Therefore, once `synchronize_rcu()` returns, the old data can safely be freed, reused, or rebuilt, which is exactly what RCU promises.
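The "cannot both miss" claim is a store-buffering (SB) litmus test between Fence C and Fence F. The standalone sketch below (illustrative, not part of the quiz; simplified to one reader and one updater) exercises exactly that pair: the compiler-only fence on the reader side is promoted to a real fence by the updater's `membarrier`, so the forbidden outcome `r_reader == 0 && r_updater == 0` should never print. Build with something like `gcc -O2 -Wall sb_litmus.c -lpthread` (hypothetical file name).

```c
#include <linux/membarrier.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <unistd.h>

static atomic_int nesting;  /* stands in for read_lock_nesting */
static atomic_int need_qs;  /* stands in for the per-thread flag */
static int r_reader, r_updater;

static void *reader_side(void *arg)
{
    (void) arg;
    atomic_store_explicit(&nesting, 1, memory_order_relaxed);
    atomic_signal_fence(memory_order_seq_cst); /* Fence C (compiler only) */
    r_reader = atomic_load_explicit(&need_qs, memory_order_relaxed);
    return NULL;
}

static void *updater_side(void *arg)
{
    (void) arg;
    atomic_store_explicit(&need_qs, 1, memory_order_relaxed);
    /* Fence F: SC fence on every CPU running a thread of this process. */
    syscall(SYS_membarrier, MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0);
    r_updater = atomic_load_explicit(&nesting, memory_order_relaxed);
    return NULL;
}

int main(void)
{
    if (syscall(SYS_membarrier, MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0,
                0))
        return 1; /* expedited membarrier unsupported */
    for (int i = 0; i < 10000; i++) {
        atomic_store(&nesting, 0);
        atomic_store(&need_qs, 0);
        pthread_t a, b;
        pthread_create(&a, NULL, reader_side, NULL);
        pthread_create(&b, NULL, updater_side, NULL);
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        if (r_reader == 0 && r_updater == 0)
            printf("forbidden outcome at iteration %d\n", i);
    }
    printf("done\n");
    return 0;
}
```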
Assuming the file is named `main.c`, build and test with:

```shell
$ gcc -O2 -Wall -o main main.c -lpthread
$ ./main -r 32 -u 8 -t 3000 -i 20
```
Reference output:

```
starting stress test: 32 readers, 8 updaters, 3000 ms, 20 µs
reader 21: 602853772 iterations
reader 19: 598778888 iterations
reader 14: 596326515 iterations
reader 12: 600671767 iterations
reader 29: 605255911 iterations
updater 3: 8248 iterations
...
reader 15: 569099859 iterations
updater 6: 8227 iterations
stress test complete in 3.00 s
```
Header files and macros:

```c
#include <inttypes.h>
#include <limits.h>
#include <linux/futex.h>
#include <linux/membarrier.h>
#include <pthread.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#define CACHE_LINE_SIZE 64
#define CACHE_ALIGNED alignas(CACHE_LINE_SIZE)
#define unlikely(x) __builtin_expect(!!(x), 0)
```
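`CACHE_ALIGNED` keeps hot per-thread state on its own cache line to avoid false sharing. A minimal illustration of the intent (the `padded_counter` type is made up for this sketch):

```c
#include <stdalign.h>
#include <stdatomic.h>

#define CACHE_LINE_SIZE 64
#define CACHE_ALIGNED alignas(CACHE_LINE_SIZE)

/* Each counter occupies a full cache line, so two threads bumping
 * different counters never steal each other's line. */
struct padded_counter {
    CACHE_ALIGNED atomic_ulong value;
};

_Static_assert(sizeof(struct padded_counter) == CACHE_LINE_SIZE,
               "one counter per cache line");
```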
Test code:

```c
#define MAX_READERS 64
#define MAX_UPDATERS 64
static atomic_bool should_exit;
static _Atomic(uint64_t *) global_shared_state;
static __thread unsigned long long iterations = 0;
static unsigned reader_count;
static unsigned updater_count;
static unsigned update_interval_us;
static unsigned test_time_ms;
static void *reader_func(void *arg)
{
int id = (int) (intptr_t) arg;
rcu_thread_online();
while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
rcu_read_lock();
        /* Relaxed load; the address dependency provides consume-like
         * ordering on real hardware. */
        uint64_t value =
            *atomic_load_explicit(&global_shared_state, memory_order_relaxed);
(void) value;
iterations++;
rcu_read_unlock();
}
printf("reader %d: %llu iterations\n", id, (unsigned long long) iterations);
rcu_thread_offline();
return NULL;
}
static void update_global_state(void)
{
uint64_t *old_state;
uint64_t *new_state = malloc(sizeof(*new_state));
*new_state = 5;
old_state = atomic_exchange_explicit(&global_shared_state, new_state,
memory_order_release);
    if (old_state) {
        synchronize_rcu();
        *old_state = UINT64_MAX; /* poison: no reader may still see it */
        free(old_state);         /* now safe to reclaim */
    }
}
static void *updater_func(void *arg)
{
int id = (int) (intptr_t) arg;
(void) id; /* Unused in current diagnostics */
while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
iterations++;
update_global_state();
usleep(update_interval_us);
}
printf("updater %d: %llu iterations\n", id,
(unsigned long long) iterations);
return NULL;
}
static bool parse_opts(int argc, char *argv[])
{
int opt;
while ((opt = getopt(argc, argv, "r:u:t:i:")) != -1) {
switch (opt) {
case 'r':
reader_count = atoi(optarg);
break;
case 'u':
updater_count = atoi(optarg);
break;
case 't':
test_time_ms = atoi(optarg);
break;
case 'i':
update_interval_us = atoi(optarg);
break;
default:
return false;
}
}
if (test_time_ms == 0 || update_interval_us == 0 || reader_count == 0 ||
updater_count == 0 || reader_count > MAX_READERS ||
updater_count > MAX_UPDATERS) {
return false;
}
return true;
}
int main(int argc, char *argv[])
{
pthread_t readers[MAX_READERS];
pthread_t updaters[MAX_UPDATERS];
if (!parse_opts(argc, argv)) {
fprintf(
stderr,
"usage: %s -r <readers 1-%u> -u <updaters 1-%u> -t <ms> -i <us>\n",
argv[0], MAX_READERS, MAX_UPDATERS);
return EXIT_FAILURE;
}
if (rcu_init() != 0)
return EXIT_FAILURE;
update_global_state();
printf("starting stress test: %u readers, %u updaters, %u ms, %u µs\n",
reader_count, updater_count, test_time_ms, update_interval_us);
struct timespec start;
clock_gettime(CLOCK_MONOTONIC, &start);
for (unsigned i = 0; i < reader_count; i++) {
if (pthread_create(&readers[i], NULL, reader_func,
(void *) (intptr_t) i) != 0) {
perror("pthread_create reader");
return EXIT_FAILURE;
}
}
for (unsigned i = 0; i < updater_count; i++) {
if (pthread_create(&updaters[i], NULL, updater_func,
(void *) (intptr_t) i) != 0) {
perror("pthread_create updater");
return EXIT_FAILURE;
}
}
usleep(test_time_ms * 1000);
atomic_store_explicit(&should_exit, true, memory_order_relaxed);
for (unsigned i = 0; i < reader_count; i++)
pthread_join(readers[i], NULL);
for (unsigned i = 0; i < updater_count; i++)
pthread_join(updaters[i], NULL);
struct timespec end;
clock_gettime(CLOCK_MONOTONIC, &end);
    /* tv_nsec may go backwards across a second boundary, so compute in
     * signed arithmetic before printing. */
    int64_t elapsed_us = (int64_t) (end.tv_sec - start.tv_sec) * 1000000LL +
                         (int64_t) (end.tv_nsec - start.tv_nsec) / 1000LL;
    printf("stress test complete in %lld.%02lld s\n",
           (long long) (elapsed_us / 1000000LL),
           (long long) ((elapsed_us % 1000000LL) / 10000LL));
return EXIT_SUCCESS;
}
```
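As an optional sanity check (a suggestion, not part of the quiz), a smaller run can go under ThreadSanitizer. Note that TSan does not model `membarrier`-based ordering, so reports on the reclamation path (the `*old_state = UINT64_MAX` poison write) may be false positives:

```shell
$ gcc -O1 -g -fsanitize=thread -o main-tsan main.c -lpthread
$ ./main-tsan -r 4 -u 2 -t 1000 -i 100
```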
RCU implementation:

```c
/* One instance per thread */
static __thread CACHE_ALIGNED struct rcu_thread_state rcu_thread_state;
static void synchronize_rcu(void);
/* Lightweight SC fence */
static inline void light_fence_seq_cst(void)
{
    /* Readers only need a signal fence: a compiler barrier, no CPU cost. */
    atomic_signal_fence(AAAA);
}
/* Read‑side primitives */
static inline void rcu_read_lock(void)
{
int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
memory_order_relaxed);
atomic_store_explicit(&rcu_thread_state.read_lock_nesting, nesting + 1,
memory_order_relaxed);
/* Fence A – pairs with Fence F in synchronize_rcu(). */
    light_fence_seq_cst();
}
static CACHE_ALIGNED struct rcu_state global_state;
/* Block while *ptr still equals expected. */
static void futex_wait(_Atomic(uint32_t) *ptr, uint32_t expected)
{
    syscall(SYS_futex, ptr, FUTEX_WAIT, expected, NULL);
}

/* Wake every waiter blocked on ptr. */
static void futex_wake(_Atomic(uint32_t) *ptr)
{
    syscall(SYS_futex, ptr, FUTEX_WAKE, INT_MAX);
}
/* Reader‑side quiescent‑state reporting helper */
static void rcu_read_unlock_report_qs(void)
{
/* Fast path: updater already cleared need_qs. */
if (!atomic_exchange_explicit(&rcu_thread_state.need_qs, false,
memory_order_relaxed))
return;
/* Fence D – pairs with Fence H. */
atomic_thread_fence(BBBB);
if (atomic_fetch_sub_explicit(&global_state.gp_holdouts, 1,
memory_order_relaxed) == 1)
futex_wake(CCCC);
}
static inline void rcu_read_unlock(void)
{
/* Fence B – pairs with Fence G in synchronize_rcu(). */
    light_fence_seq_cst();
int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
memory_order_relaxed);
atomic_store_explicit(&rcu_thread_state.read_lock_nesting, GGGG,
memory_order_relaxed);
if (nesting == 1) {
/* Fence C – guards against store buffering. */
        light_fence_seq_cst();
if (unlikely(atomic_load_explicit(&rcu_thread_state.need_qs,
memory_order_relaxed)))
rcu_read_unlock_report_qs();
}
}
/* Heavyweight SC fence (membarrier) */
static void heavy_fence_seq_cst(void)
{
/* Kernel provides an SC fence across all threads. */
syscall(SYS_membarrier, MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0);
}
static int rcu_init(void)
{
if (syscall(SYS_membarrier, MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0,
0) != 0) {
perror("membarrier register failed");
return -1;
}
if (pthread_mutex_init(&global_state.gp_lock, NULL) != 0) {
perror("pthread_mutex_init");
return -1;
}
return 0;
}
/* Register the calling thread so grace periods take it into account. */
static void rcu_thread_online(void)
{
pthread_mutex_lock(&global_state.gp_lock);
rcu_thread_state.next = global_state.thread_head;
rcu_thread_state.pprev = &global_state.thread_head;
if (global_state.thread_head)
global_state.thread_head->pprev = &rcu_thread_state.next;
global_state.thread_head = &rcu_thread_state;
global_state.thread_count++;
pthread_mutex_unlock(&global_state.gp_lock);
}
/* Unregister the calling thread; it must not be in a read-side section. */
static void rcu_thread_offline(void)
{
pthread_mutex_lock(&global_state.gp_lock);
*rcu_thread_state.pprev = rcu_thread_state.next;
if (rcu_thread_state.next)
rcu_thread_state.next->pprev = rcu_thread_state.pprev;
global_state.thread_count--;
pthread_mutex_unlock(&global_state.gp_lock);
}
/* Grace‑period core */
static void synchronize_rcu(void)
{
uint32_t thread_count;
uint32_t quiescent = 0;
pthread_mutex_lock(&global_state.gp_lock);
thread_count = global_state.thread_count;
atomic_store_explicit(&global_state.gp_holdouts, thread_count,
memory_order_relaxed);
/* Fence E – readers must see initial gp_holdouts value. */
atomic_thread_fence(DDDD);
for (struct rcu_thread_state *t = global_state.thread_head; t;
t = t->next) {
atomic_store_explicit(&t->need_qs, true, memory_order_relaxed);
}
/* Fence F – closes first half of grace period. */
heavy_fence_seq_cst();
for (struct rcu_thread_state *t = global_state.thread_head; t;
t = t->next) {
if (atomic_load_explicit(&t->read_lock_nesting, memory_order_relaxed) ==
0) {
if (atomic_exchange_explicit(&t->need_qs, false,
memory_order_relaxed))
quiescent++;
}
}
if (FFFF > 0) {
/* Reported some readers directly – emit Fence G. */
heavy_fence_seq_cst();
atomic_fetch_sub_explicit(&global_state.gp_holdouts, quiescent,
memory_order_relaxed);
}
if (quiescent != thread_count) {
/* Wait for remaining readers. */
for (;;) {
uint32_t h = atomic_load_explicit(&global_state.gp_holdouts,
memory_order_relaxed);
if (h == 0)
break;
futex_wait(&global_state.gp_holdouts, h);
}
/* Fence H – pairs with Fence D. */
atomic_thread_fence(EEEE);
}
pthread_mutex_unlock(&global_state.gp_lock);
}
```
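`rcu_init()` assumes the kernel supports expedited private `membarrier` (Linux 4.14 or later). A defensive variant could probe support first; a sketch, not part of the quiz:

```c
#include <linux/membarrier.h>
#include <sys/syscall.h>
#include <unistd.h>

/* MEMBARRIER_CMD_QUERY returns a bitmask of the commands this kernel
 * supports; check both the fence command and its registration. */
static int membarrier_private_expedited_supported(void)
{
    long mask = syscall(SYS_membarrier, MEMBARRIER_CMD_QUERY, 0, 0);
    if (mask < 0)
        return 0; /* membarrier(2) itself is unavailable */
    return (mask & MEMBARRIER_CMD_PRIVATE_EXPEDITED) &&
           (mask & MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED);
}
```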
Complete the code (the placeholders AAAA through GGGG) so that it behaves as expected and follows the coding-style guidelines.
Extended questions: