
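2025q1 Week 15 Quiz

Purpose: assess students' understanding of concurrent programming and RCU

Answer form: Quiz 1

Quiz 1

RCU (Read-Copy-Update) is a synchronization mechanism used throughout the Linux kernel. It lets multiple readers access a shared data structure concurrently without locks, while updaters can still safely modify or free old versions. The following is a simplified RCU implementation: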

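Linux kernel RCU                               This example
---------------------------------------------  ------------------------------------------
Kernel space; intricate IRQ/NMI, CPU hotplug   User space
Per-CPU data, callback queues                  Per-thread linked list, no callbacks
smp_mb() family of macros                      C11 atomics used directly
IPI-triggered cross-CPU fences                 membarrier system call
CPU offline/online                             rcu_thread_online() / rcu_thread_offline()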

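It still keeps the essence of RCU:

  • Readers cost almost nothing and never block
  • The updater does the work in one pass, with at most two heavyweight barriers per grace period (GP)
  • Carefully arranged fences and flags guarantee that old versions can be reclaimed immediately once synchronize_rcu() returns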
+--------------+      membarrier      +--------------+
|   Reader A   |<-------------------->|   Reader B   |
|  nesting++   |                      |  nesting--   |
| signal_fence |                      | signal_fence |
+--------------+                      +--------------+
         \                                   /
          \                                 /   (qs: quiescent state)
           \                               /
            \     (readers report qs)     /
             \                           /
             +--------------------------------+
             |             Updater            |
             |  gp_lock → global GP driver    |
             +--------------------------------+

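Per-thread state: one counter plus one flag

Field                     Purpose                        Access pattern
------------------------  -----------------------------  -------------------------------
read_lock_nesting         Nesting depth of               Reader: increment/decrement
  (atomic_int)            rcu_read_lock() calls          Updater: read only

need_qs                   Request to report a            Updater: set to true
  (atomic_bool)           quiescent state (QS)           Reader or updater: set to false

  • A thread stops being a holdout for the current grace period once its need_qs has been cleared, and need_qs is only ever cleared while read_lock_nesting == 0: by the reader in its outermost rcu_read_unlock(), or by the updater when its scan finds the thread outside any critical section.
  • Because the updater may clear the flag itself during the scan, the grace period never waits on threads that are idle or outside a critical section.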

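Reader fast path (about 10 ns on x86-64)

rcu_read_lock():
    ++nesting                    // relaxed
    atomic_signal_fence();       // Fence A

rcu_read_unlock():
    atomic_signal_fence();       // Fence B
    --nesting                    // relaxed
    if (nesting == 0) {          // leaving the outermost critical section
        atomic_signal_fence();   // Fence C
        if (need_qs)             // key branch
            report_qs();
    }

  • atomic_signal_fence() is only a compiler barrier; it emits no CPU instruction
  • No system calls, no mutex, and no heavyweight memory barriers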

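report_qs() (rcu_read_unlock_report_qs() in the code below) first clears need_qs with an atomic exchange and then decrements the global gp_holdouts; the reader that brings it to zero wakes the updater through a futex.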

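Global state

/* Per-thread RCU state */
struct rcu_thread_state {
    atomic_int read_lock_nesting;  /* Nesting depth of rcu_read_lock() */
    struct rcu_thread_state *next; /* Intrusive list links */
    struct rcu_thread_state **pprev;
    atomic_bool need_qs;           /* Set by the updater to request a QS */
};

struct rcu_state {
    pthread_mutex_t          gp_lock;      /* Serializes GPs and thread (un)registration */
    struct rcu_thread_state *thread_head;  /* Head of the registered-thread list */
    uint32_t                 thread_count; /* Number of registered threads */
    atomic_uint              gp_holdouts;  /* Readers not yet quiescent */
};
  • Only one updater runs synchronize_rcu() at a time (serialized by gp_lock)
  • The read-side fast path touches neither the mutex nor gp_holdouts: a reader decrements gp_holdouts only on the slow path (report_qs()) and takes gp_lock only in rcu_thread_online() / rcu_thread_offline()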

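Starting a grace period

synchronize_rcu()

  1. Acquire gp_lock
  2. Set gp_holdouts = thread_count (relaxed store)
  3. atomic_thread_fence(memory_order_release) (Fence E): ensures readers see the correct initial value
  4. Set need_qs = true for every registered thread
  5. Issue a private expedited membarrier (Fence F): it acts as an SC fence on every running thread of this process
  6. Scan every thread: if nesting == 0, clear its need_qs directly (with an exchange) and count it as quiescent; if any were counted, issue a second membarrier (Fence G, pairing with the readers' Fence B) and then subtract that count from gp_holdouts
  7. If holdouts remain, sleep on a futex; the last reader to report calls futex_wake()
  8. atomic_thread_fence(memory_order_seq_cst) (Fence H) pairs with the readers' Fence D and closes the grace period
  9. Release gp_lock and return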

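Memory ordering

Reader                    Updater
------------              ---------------------------
A: signal_fence           ... plain accesses ...
                          E: release fence (publish)
B: signal_fence           F: membarrier ⇆ A, C
                          ... scan need_qs ...
C: signal_fence           G: membarrier ⇆ B
D: acq_rel fence          H: SC fence ⇆ D
  • A ⇆ F: keeps the updater's old data from leaking into a read-side critical section; if the scan after F still sees nesting == 0, any critical section that begins afterward observes every write the updater made before F (such as the newly published pointer)
  • B ⇆ G: if the scan observed the decrement that ends a critical section, every access inside that critical section is ordered before whatever the updater does after G (for example, reclaiming old data)
  • C ⇆ F: resolves the store-buffering race: either the reader sees need_qs == true or the updater sees nesting == 0; they cannot both miss
  • D ⇆ H: ensures every access in a reader's critical section is visible to, and ordered before, the end of the grace period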

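Memory before Fence F belongs to the old world and memory after Fence H belongs to the new world; no reader straddles the two. Therefore, once synchronize_rcu() returns, the old data can be safely freed, reused, or rebuilt: that is exactly the point of RCU.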

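Assuming the file is named main.c, compile and test:

$ gcc -O2 -Wall -o main main.c -lpthread
$ ./main -r 32 -u 8 -t 3000 -i 20

(-r: number of readers, -u: number of updaters, -t: test duration in ms, -i: pause between updates in µs)

Reference output: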

starting stress test: 32 readers, 8 updaters, 3000 ms, 20 µs
reader 21: 602853772 iterations
reader 19: 598778888 iterations
reader 14: 596326515 iterations
reader 12: 600671767 iterations
reader 29: 605255911 iterations
updater 3: 8248 iterations
...
reader 15: 569099859 iterations
updater 6: 8227 iterations
stress test complete in 3.00 s

Headers and macros:

#include <inttypes.h>
#include <limits.h>
#include <linux/futex.h>
#include <linux/membarrier.h>
#include <pthread.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#define CACHE_LINE_SIZE 64
#define CACHE_ALIGNED alignas(CACHE_LINE_SIZE)
#define unlikely(x) __builtin_expect(!!(x), 0)

Test program:

#define MAX_READERS 64
#define MAX_UPDATERS 64

static atomic_bool should_exit;
static _Atomic(uint64_t *) global_shared_state;
static __thread unsigned long long iterations = 0;

static unsigned reader_count;
static unsigned updater_count;
static unsigned update_interval_us;
static unsigned test_time_ms;

static void *reader_func(void *arg)
{
    int id = (int) (intptr_t) arg;

    rcu_thread_online();

    while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
        rcu_read_lock();

        /* Consume load with data dependency. */
        uint64_t value =
            *atomic_load_explicit(&global_shared_state, memory_order_relaxed);
        (void) value;

        iterations++;
        rcu_read_unlock();
    }

    printf("reader %d: %llu iterations\n", id, (unsigned long long) iterations);

    rcu_thread_offline();
    return NULL;
}

static void update_global_state(void)
{
    uint64_t *old_state;
    uint64_t *new_state = malloc(sizeof(*new_state));
    *new_state = 5;

    old_state = atomic_exchange_explicit(&global_shared_state, new_state,
                                         memory_order_release);

    if (old_state) {
        synchronize_rcu();
        *old_state = UINT64_MAX;
    }
}

static void *updater_func(void *arg)
{
    int id = (int) (intptr_t) arg;

    while (!atomic_load_explicit(&should_exit, memory_order_relaxed)) {
        iterations++;
        update_global_state();
        usleep(update_interval_us);
    }

    printf("updater %d: %llu iterations\n", id,
           (unsigned long long) iterations);
    return NULL;
}

static bool parse_opts(int argc, char *argv[])
{
    int opt;

    while ((opt = getopt(argc, argv, "r:u:t:i:")) != -1) {
        switch (opt) {
        case 'r':
            reader_count = atoi(optarg);
            break;
        case 'u':
            updater_count = atoi(optarg);
            break;
        case 't':
            test_time_ms = atoi(optarg);
            break;
        case 'i':
            update_interval_us = atoi(optarg);
            break;
        default:
            return false;
        }
    }

    if (test_time_ms == 0 || update_interval_us == 0 || reader_count == 0 ||
        updater_count == 0 || reader_count > MAX_READERS ||
        updater_count > MAX_UPDATERS) {
        return false;
    }

    return true;
}

int main(int argc, char *argv[])
{
    pthread_t readers[MAX_READERS];
    pthread_t updaters[MAX_UPDATERS];

    if (!parse_opts(argc, argv)) {
        fprintf(
            stderr,
            "usage: %s -r <readers 1-%u> -u <updaters 1-%u> -t <ms> -i <us>\n",
            argv[0], MAX_READERS, MAX_UPDATERS);
        return EXIT_FAILURE;
    }

    if (rcu_init() != 0)
        return EXIT_FAILURE;

    update_global_state();

    printf("starting stress test: %u readers, %u updaters, %u ms, %u µs\n",
           reader_count, updater_count, test_time_ms, update_interval_us);

    struct timespec start;
    clock_gettime(CLOCK_MONOTONIC, &start);

    for (unsigned i = 0; i < reader_count; i++) {
        if (pthread_create(&readers[i], NULL, reader_func,
                           (void *) (intptr_t) i) != 0) {
            perror("pthread_create reader");
            return EXIT_FAILURE;
        }
    }

    for (unsigned i = 0; i < updater_count; i++) {
        if (pthread_create(&updaters[i], NULL, updater_func,
                           (void *) (intptr_t) i) != 0) {
            perror("pthread_create updater");
            return EXIT_FAILURE;
        }
    }

    usleep(test_time_ms * 1000);

    atomic_store_explicit(&should_exit, true, memory_order_relaxed);

    for (unsigned i = 0; i < reader_count; i++)
        pthread_join(readers[i], NULL);
    for (unsigned i = 0; i < updater_count; i++)
        pthread_join(updaters[i], NULL);

    struct timespec end;
    clock_gettime(CLOCK_MONOTONIC, &end);

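    /* tv_nsec may wrap (end < start), so take the difference in signed ns. */
    int64_t elapsed_ns =
        (int64_t) (end.tv_sec - start.tv_sec) * 1000000000LL +
        (int64_t) (end.tv_nsec - start.tv_nsec);
    uint64_t elapsed_us = (uint64_t) (elapsed_ns / 1000);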

    printf("stress test complete in %llu.%02llu s\n",
           (unsigned long long) (elapsed_us / 1000000ULL),
           (unsigned long long) ((elapsed_us % 1000000ULL) / 10000ULL));

    return EXIT_SUCCESS;
}

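RCU implementation (the test program calls these functions, so in main.c this code, together with the structure definitions from the "Global state" section, goes after the headers and before the test program):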

/* One instance per thread */
static __thread CACHE_ALIGNED struct rcu_thread_state rcu_thread_state;

static void synchronize_rcu(void);

/* Lightweight SC fence */
static inline void light_fence_seq_cst_light(void)
{
    /* Readers only need a signal fence. */
    atomic_signal_fence(AAAA);
}

/* Read‑side primitives */
static inline void rcu_read_lock(void)
{
    int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
                                       memory_order_relaxed);
    atomic_store_explicit(&rcu_thread_state.read_lock_nesting, nesting + 1,
                          memory_order_relaxed);

    /* Fence A – pairs with Fence F in synchronize_rcu(). */
    light_fence_seq_cst_light();
}

static CACHE_ALIGNED struct rcu_state global_state;

static void futex_wait(_Atomic(uint32_t) *ptr, uint32_t expected)
{
    syscall(SYS_futex, ptr, FUTEX_WAIT, expected, NULL);
}

static void futex_wake(_Atomic(uint32_t) *ptr)
{
    syscall(SYS_futex, ptr, FUTEX_WAKE, INT_MAX);
}

/* Reader‑side quiescent‑state reporting helper */
static void rcu_read_unlock_report_qs(void)
{
    /* Fast path: updater already cleared need_qs. */
    if (!atomic_exchange_explicit(&rcu_thread_state.need_qs, false,
                                  memory_order_relaxed))
        return;

    /* Fence D – pairs with Fence H. */
    atomic_thread_fence(BBBB);

    if (atomic_fetch_sub_explicit(&global_state.gp_holdouts, 1,
                                  memory_order_relaxed) == 1)
        futex_wake(CCCC);
}

static inline void rcu_read_unlock(void)
{
    /* Fence B – pairs with Fence G in synchronize_rcu(). */
    light_fence_seq_cst_light();

    int nesting = atomic_load_explicit(&rcu_thread_state.read_lock_nesting,
                                       memory_order_relaxed);
    atomic_store_explicit(&rcu_thread_state.read_lock_nesting, GGGG,
                          memory_order_relaxed);

    if (nesting == 1) {
        /* Fence C – guards against store buffering. */
        light_fence_seq_cst_light();

        if (unlikely(atomic_load_explicit(&rcu_thread_state.need_qs,
                                          memory_order_relaxed)))
            rcu_read_unlock_report_qs();
    }
}

/* Heavyweight SC fence (membarrier) */
static void heavy_fence_seq_cst(void)
{
    /* Kernel provides an SC fence across all threads. */
    syscall(SYS_membarrier, MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0);
}

static int rcu_init(void)
{
    if (syscall(SYS_membarrier, MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0,
                0) != 0) {
        perror("membarrier register failed");
        return -1;
    }

    if (pthread_mutex_init(&global_state.gp_lock, NULL) != 0) {
        perror("pthread_mutex_init");
        return -1;
    }

    return 0;
}

static void rcu_thread_online(void)
{
    pthread_mutex_lock(&global_state.gp_lock);

    rcu_thread_state.next = global_state.thread_head;
    rcu_thread_state.pprev = &global_state.thread_head;
    if (global_state.thread_head)
        global_state.thread_head->pprev = &rcu_thread_state.next;
    global_state.thread_head = &rcu_thread_state;
    global_state.thread_count++;

    pthread_mutex_unlock(&global_state.gp_lock);
}

static void rcu_thread_offline(void)
{
    pthread_mutex_lock(&global_state.gp_lock);

    *rcu_thread_state.pprev = rcu_thread_state.next;
    if (rcu_thread_state.next)
        rcu_thread_state.next->pprev = rcu_thread_state.pprev;
    global_state.thread_count--;

    pthread_mutex_unlock(&global_state.gp_lock);
}

/* Grace‑period core */
static void synchronize_rcu(void)
{
    uint32_t thread_count;
    uint32_t quiescent = 0;

    pthread_mutex_lock(&global_state.gp_lock);

    thread_count = global_state.thread_count;
    atomic_store_explicit(&global_state.gp_holdouts, thread_count,
                          memory_order_relaxed);

    /* Fence E – readers must see initial gp_holdouts value. */
    atomic_thread_fence(DDDD);

    for (struct rcu_thread_state *t = global_state.thread_head; t;
         t = t->next) {
        atomic_store_explicit(&t->need_qs, true, memory_order_relaxed);
    }

    /* Fence F – closes first half of grace period. */
    heavy_fence_seq_cst();

    for (struct rcu_thread_state *t = global_state.thread_head; t;
         t = t->next) {
        if (atomic_load_explicit(&t->read_lock_nesting, memory_order_relaxed) ==
            0) {
            if (atomic_exchange_explicit(&t->need_qs, false,
                                         memory_order_relaxed))
                quiescent++;
        }
    }

    if (FFFF > 0) {
        /* Reported some readers directly – emit Fence G. */
        heavy_fence_seq_cst();
        atomic_fetch_sub_explicit(&global_state.gp_holdouts, quiescent,
                                  memory_order_relaxed);
    }

    if (quiescent != thread_count) {
        /* Wait for remaining readers. */
        for (;;) {
            uint32_t h = atomic_load_explicit(&global_state.gp_holdouts,
                                              memory_order_relaxed);
            if (h == 0)
                break;
            futex_wait(&global_state.gp_holdouts, h);
        }

        /* Fence H – pairs with Fence D. */
        atomic_thread_fence(EEEE);
    }

    pthread_mutex_unlock(&global_state.gp_lock);
}

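Complete the code so that it works as intended. Answer format:

  • AAAA, BBBB, DDDD, and EEEE are memory orders defined by C11 atomics
  • CCCC, FFFF, and GGGG are valid expressions
  • None of the answers contains whitespace

Follow-up questions:

  • Explain how the code above works, point out its shortcomings, and improve it
  • Develop a benchmark that runs with both Userspace RCU and the improved code above, and quantify their performance under highly concurrent workloads
  • Apply the improved code to a highly concurrent Valkey implementation, evaluate its performance, and then propose further improvements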