---
tags: linux2024
---

# [2024q1](http://wiki.csie.ncku.edu.tw/linux/schedule) Week 12 Quiz

:::info
Purpose: assess students' understanding of [atomic operations](https://hackmd.io/@sysprog/concurrency-atomics), [ring buffers](https://hackmd.io/@sysprog/concurrency-ringbuffer), futex, and mmap
:::

==[Answer form: quizzes 1-2](https://docs.google.com/forms/d/e/1FAIpQLScNGEQr_ojVWxFDZ8nnedr8eaC3wOuh9HDFHIPDOKyRELDMZg/viewform)== (for the Linux kernel "design" course)
==[Answer form: quiz 3](https://docs.google.com/forms/d/e/1FAIpQLSdaVtO6gjlJLXPMn5vGgoDFYJS42ogNHkUJ3tRzgE8jtumnkA/viewform)== (for the Linux kernel "implementation" course)

### Quiz `1`

The following code uses the futex system call to wait for 3 seconds:
```c
#include <linux/futex.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

int futex_sleep(time_t sec, long ns)
{
    uint32_t futex_word = 0;
    struct timespec timeout = {sec, ns};
    return syscall(SYS_futex, AAAA, BBBB, futex_word, &timeout, NULL, 0);
}

int main()
{
    time_t secs = 3;
    printf("Before futex_sleep for %ld seconds\n", secs);
    futex_sleep(secs, 0);
    printf("After futex_sleep\n");
    return 0;
}
```

Fill in the missing code so that the program behaves as expected.

---

### Quiz `2`

This quiz uses C11 Atomics and the [futex](https://man7.org/linux/man-pages/man2/futex.2.html) system call provided by the Linux kernel to emulate the [goroutine](https://tour.golang.org/concurrency/1) and [channel](https://tour.golang.org/concurrency/2) mechanisms of the Go programming language.

A [goroutine](https://tour.golang.org/concurrency/1) is a user-level thread (ULT) in Go, which supports multitasking at the language level. Sample code:
```go
func main() {
    go say("world")
    say("hello")
}
```

Diagram:
![image](https://hackmd.io/_uploads/rJOveIvf0.png)
> `say("world")` runs on a goroutine, achieving concurrency

In other words, the `go` statement creates a ULT. Syntax:
```go
go f(x, y, z)
```

A function call prefixed with `go` runs `f` in another goroutine, while `f`, `x`, `y`, and `z` are evaluated in the current goroutine. Note that the `main` function also runs on a goroutine. When the goroutine named `main` finishes, all other goroutines are shut down with it.

In a multithreaded environment, managing state between threads is a frequent concern. One common operation is waiting: for example, thread `A` must wait until thread `B` finishes its computation and obtains data before it can continue. Alongside [goroutines](https://tour.golang.org/concurrency/1) there is a mechanism called [channel](https://tour.golang.org/concurrency/2). It was originally intended for communication between goroutines, but because its operations block, it can also be used to wait for a goroutine.

Sample code:
```go
func say(s string, c chan string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
    c <- "FINISH"
}

func main() {
    ch := make(chan string)

    go say("world", ch)
    go say("hello", ch)

    <-ch
    <-ch
}
```

![image](https://hackmd.io/_uploads/HJqde8wfA.png)
> The program creates 2 goroutines, `say("world", ch)` and `say("hello", ch)`, so the main goroutine can finish only after 2 `FINISH` messages have been pushed into the channel.

Go channels distinguish "read" from "write" operations:
```go
Write(chan<- int)
Read(<-chan int)
ReadWrite(chan int)
```

The difference lies in where the `<-` arrow sits: in `chan<-` the arrow points into the channel, which means "write" (send-only); in `<-chan` the arrow points out of the channel, which means "read" (receive-only).

Building on 〈[並行程式設計: Ring buffer](https://hackmd.io/@sysprog/concurrency-ringbuffer)〉, the following is a Go channel implementation rewritten with C11 Atomics and futex:
```c
#include <linux/futex.h>
#include <stdatomic.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

static inline long futex_wait(_Atomic uint32_t *uaddr, uint32_t val)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAIT, val, NULL, NULL, 0);
}

static inline long futex_wake(_Atomic uint32_t *uaddr, uint32_t val)
{
    return syscall(SYS_futex, uaddr, FUTEX_WAKE, val, NULL, NULL, 0);
}

struct mutex {
    _Atomic uint32_t val;
};

#define MUTEX_INITIALIZER \
    (struct mutex) { .val = 0 }

enum {
    UNLOCKED = 0,
    LOCKED_NO_WAITER = 1,
    LOCKED = 2,
};

void mutex_init(struct mutex *mu)
{
    mu->val = UNLOCKED;
}

void mutex_unlock(struct mutex *mu)
{
    uint32_t orig =
        atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed);
    if (orig != LOCKED_NO_WAITER) {
        mu->val = UNLOCKED;
        futex_wake(&mu->val, 1);
    }
}

static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new)
{
    atomic_compare_exchange_strong_explicit(
        ptr, &expect, new, memory_order_acq_rel, memory_order_acquire);
    return expect;
}

void mutex_lock(struct mutex *mu)
{
    uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER);
    if (val != UNLOCKED) {
        do {
            if (val == LOCKED ||
                cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED)
                futex_wait(&mu->val, LOCKED);
        } while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED);
    }
}

#include <stdbool.h>

struct chan_item {
    _Atomic uint32_t lap;
    void *data;
};

struct chan {
    _Atomic bool closed;

    /* Unbuffered channels only: the pointer used for data exchange. */
    _Atomic(void **) datap;

    /* Unbuffered channels only: guarantees that at most one writer and one
     * reader have the right to access.
     */
    struct mutex send_mtx, recv_mtx;

    /* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY).
     * They are incremented to indicate that a thread is waiting.
     * They are decremented to indicate that data exchange is done.
     *
     * For buffered channels, these futexes represent credits for a reader or
     * writer to retry receiving or sending.
     */
    _Atomic uint32_t send_ftx, recv_ftx;

    /* Buffered channels only: number of waiting threads on the futexes. */
    _Atomic size_t send_waiters, recv_waiters;

    /* Ring buffer */
    size_t cap;
    _Atomic uint64_t head, tail;
    struct chan_item ring[0];
};

typedef void *(*chan_alloc_func_t)(size_t);

#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

enum {
    CHAN_READY = 0,
    CHAN_NOT_READY = 1,
    CHAN_WAITING = 2,
    CHAN_CLOSED = 3,
};

static void chan_init(struct chan *ch, size_t cap)
{
    ch->closed = false;
    ch->datap = NULL;

    mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx);

    if (!cap)
        ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY;
    else
        ch->send_ftx = ch->recv_ftx = 0;

    ch->send_waiters = ch->recv_waiters = 0;
    ch->cap = cap;
    ch->head = (uint64_t) 1 << 32;
    ch->tail = 0;
    if (ch->cap > 0)
        memset(ch->ring, 0, cap * sizeof(struct chan_item));
}

struct chan *chan_make(size_t cap, chan_alloc_func_t alloc)
{
    struct chan *ch;
    if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item))))
        return NULL;
    chan_init(ch, cap);
    return ch;
}

static int chan_trysend_buf(struct chan *ch, void *data)
{
    if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
        errno = EPIPE;
        return -1;
    }

    uint64_t tail, new_tail;
    struct chan_item *item;

    do {
        tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
        uint32_t pos = tail, lap = tail >> 32;
        item = ch->ring + pos;

        if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
            errno = EAGAIN;
            return -1;
        }

        if (pos + 1 == ch->cap)
            new_tail = (uint64_t)(lap + 2) << 32;
        else
            new_tail = tail + 1;
    } while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail,
                                                    memory_order_acq_rel,
                                                    memory_order_acquire));

    item->data = data;
    atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);

    return 0;
}

static int chan_send_buf(struct chan *ch, void *data)
{
    while (chan_trysend_buf(ch, data) == -1) {
        if (errno != EAGAIN)
            return -1;

        uint32_t v = 1;
        while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1,
                                                      memory_order_acq_rel,
                                                      memory_order_acquire)) {
            if (v == 0) {
                atomic_fetch_add_explicit(&ch->send_waiters, 1,
                                          memory_order_acq_rel);
                futex_wait(&ch->send_ftx, 0);
                atomic_fetch_sub_explicit(&ch->send_waiters, 1,
                                          memory_order_acq_rel);
                v = 1;
            }
        }
    }

    atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel);

    if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0)
        futex_wake(&ch->recv_ftx, 1);

    return 0;
}

static int chan_tryrecv_buf(struct chan *ch, void **data)
{
    if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
        errno = EPIPE;
        return -1;
    }

    uint64_t head, new_head;
    struct chan_item *item;

    do {
        head = atomic_load_explicit(&ch->head, memory_order_acquire);
        uint32_t pos = head, lap = head >> 32;
        item = ch->ring + pos;

        if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
            errno = EAGAIN;
            return -1;
        }

        if (pos + 1 == ch->cap)
            new_head = (uint64_t)(lap + 2) << 32;
        else
            new_head = head + 1;
    } while (!atomic_compare_exchange_weak_explicit(&ch->head, &head, new_head,
                                                    memory_order_acq_rel,
                                                    memory_order_acquire));

    *data = item->data;
    atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);

    return 0;
}

static int chan_recv_buf(struct chan *ch, void **data)
{
    while (chan_tryrecv_buf(ch, data) == -1) {
        if (errno != EAGAIN)
            return -1;

        uint32_t v = 1;
        while (!atomic_compare_exchange_weak_explicit(&ch->recv_ftx, &v, v - 1,
                                                      memory_order_acq_rel,
                                                      memory_order_acquire)) {
            if (v == 0) {
                atomic_fetch_add_explicit(&ch->recv_waiters, 1,
                                          memory_order_acq_rel);
                futex_wait(&ch->recv_ftx, 0);
                atomic_fetch_sub_explicit(&ch->recv_waiters, 1,
                                          memory_order_acq_rel);
                v = 1;
            }
        }
    }

    atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acq_rel);

    if (atomic_load_explicit(&ch->send_waiters, memory_order_relaxed) > 0)
        futex_wake(&ch->send_ftx, 1);

    return 0;
}

static int chan_send_unbuf(struct chan *ch, void *data)
{
    if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
        errno = EPIPE;
        return -1;
    }

    mutex_lock(&ch->send_mtx);

    void **ptr = NULL;
    if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
                                                 memory_order_acq_rel,
                                                 memory_order_acquire)) {
        *ptr = data;
        atomic_store_explicit(&ch->datap, NULL, memory_order_release);

        if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1,
                                      memory_order_acquire) == CHAN_WAITING)
            futex_wake(&ch->recv_ftx, CCCC);
    } else {
        if (atomic_fetch_add_explicit(&ch->send_ftx, 1,
                                      memory_order_acquire) == CHAN_NOT_READY) {
            do {
                futex_wait(&ch->send_ftx, CHAN_WAITING);
            } while (atomic_load_explicit(
                         &ch->send_ftx, memory_order_acquire) == CHAN_WAITING);

            if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
                errno = EPIPE;
                return -1;
            }
        }
    }

    mutex_unlock(&ch->send_mtx);
    return 0;
}

static int chan_recv_unbuf(struct chan *ch, void **data)
{
    if (!data) {
        errno = EINVAL;
        return -1;
    }

    if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
        errno = EPIPE;
        return -1;
    }

    mutex_lock(&ch->recv_mtx);

    void **ptr = NULL;
    if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
                                                 memory_order_acq_rel,
                                                 memory_order_acquire)) {
        *data = *ptr;
        atomic_store_explicit(&ch->datap, NULL, memory_order_release);

        if (atomic_fetch_sub_explicit(&ch->send_ftx, 1,
                                      memory_order_acquire) == CHAN_WAITING)
            futex_wake(&ch->send_ftx, 1);
    } else {
        if (atomic_fetch_add_explicit(&ch->recv_ftx, 1,
                                      memory_order_acquire) == CHAN_NOT_READY) {
            do {
                futex_wait(&ch->recv_ftx, CHAN_WAITING);
            } while (atomic_load_explicit(
                         &ch->recv_ftx, memory_order_acquire) == CHAN_WAITING);

            if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
                errno = EPIPE;
                return -1;
            }
        }
    }

    mutex_unlock(&ch->recv_mtx);
    return 0;
}

void chan_close(struct chan *ch)
{
    ch->closed = true;
    if (!ch->cap) {
        atomic_store(&ch->recv_ftx, CHAN_CLOSED);
        atomic_store(&ch->send_ftx, CHAN_CLOSED);
    }
    futex_wake(&ch->recv_ftx, INT_MAX);
    futex_wake(&ch->send_ftx, INT_MAX);
}

int chan_send(struct chan *ch, void *data)
{
    return !ch->cap ? chan_send_unbuf(ch, data) : chan_send_buf(ch, data);
}

int chan_recv(struct chan *ch, void **data)
{
    return !ch->cap ? chan_recv_unbuf(ch, data) : chan_recv_buf(ch, data);
}

#include <pthread.h>

typedef void *(*thread_func_t)(void *);

#include <assert.h>
#include <err.h>
#include <stdio.h>

enum {
    MSG_MAX = 100000,
    THREAD_MAX = 1024,
};

struct thread_arg {
    size_t id;
    size_t from, to;
    struct chan *ch;
};

static pthread_t reader_tids[THREAD_MAX], writer_tids[THREAD_MAX];
struct thread_arg reader_args[THREAD_MAX], writer_args[THREAD_MAX];
static _Atomic size_t msg_total, msg_count[MSG_MAX];

static void *writer(void *arg)
{
    struct thread_arg *a = arg;

    for (size_t i = a->from; i < a->to; i++)
        if (chan_send(a->ch, (void *) i) == -1)
            break;
    return 0;
}

static void *reader(void *arg)
{
    struct thread_arg *a = arg;
    size_t msg, received = 0, expect = a->to - a->from;

    while (received < expect) {
        if (chan_recv(a->ch, (void **) &msg) == -1)
            break;

        atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed);
        ++received;
    }
    return 0;
}

static void create_threads(const size_t n,
                           thread_func_t fn,
                           struct thread_arg *args,
                           pthread_t *tids,
                           struct chan *ch)
{
    size_t each = msg_total / n, left = msg_total % n;
    size_t from = 0;

    for (size_t i = 0; i < n; i++) {
        size_t batch = each;

        if (left > 0) {
            batch++;
            left--;
        }
        args[i] = (struct thread_arg){
            .id = i,
            .ch = ch,
            .from = from,
            .to = from + batch,
        };
        pthread_create(&tids[i], NULL, fn, &args[i]);
        from += batch;
    }
}

static void join_threads(const size_t n, pthread_t *tids)
{
    for (size_t i = 0; i < n; i++)
        pthread_join(tids[i], NULL);
}

static void test_chan(const size_t repeat,
                      const size_t cap,
                      const size_t total,
                      const size_t n_readers,
                      thread_func_t reader_fn,
                      const size_t n_writers,
                      thread_func_t writer_fn)
{
    if (n_readers > THREAD_MAX || n_writers > THREAD_MAX)
        errx(1, "too many threads to create");
    if (total > MSG_MAX)
        errx(1, "too many messages to send");

    struct chan *ch = chan_make(cap, malloc);
    if (!ch)
        errx(1, "fail to create channel");

    msg_total = total;
    for (size_t rep = 0; rep < repeat; rep++) {
        printf("cap=%zu readers=%zu writers=%zu msgs=%zu ... %zu/%zu\n", cap,
               n_readers, n_writers, msg_total, rep + 1, repeat);

        memset(msg_count, 0, sizeof(size_t) * msg_total);
        create_threads(n_readers, reader_fn, reader_args, reader_tids, ch);
        create_threads(n_writers, writer_fn, writer_args, writer_tids, ch);
        join_threads(n_readers, reader_tids);
        join_threads(n_writers, writer_tids);

        for (size_t i = 0; i < msg_total; i++)
            assert(msg_count[i] == 1);
    }

    chan_close(ch);
    free(ch);
}

int main()
{
    test_chan(50, 0, 500, 80, reader, 80, writer);
    test_chan(50, 7, 500, 80, reader, 80, writer);

    return 0;
}
```

Reference output:
```
cap=0 readers=80 writers=80 msgs=500 ... 1/50
cap=0 readers=80 writers=80 msgs=500 ... 2/50
cap=0 readers=80 writers=80 msgs=500 ... 3/50
...
cap=0 readers=80 writers=80 msgs=500 ... 48/50
cap=0 readers=80 writers=80 msgs=500 ... 49/50
cap=0 readers=80 writers=80 msgs=500 ... 50/50
cap=7 readers=80 writers=80 msgs=500 ... 1/50
cap=7 readers=80 writers=80 msgs=500 ... 2/50
cap=7 readers=80 writers=80 msgs=500 ... 3/50
...
cap=7 readers=80 writers=80 msgs=500 ... 48/50
cap=7 readers=80 writers=80 msgs=500 ... 49/50
cap=7 readers=80 writers=80 msgs=500 ... 50/50
```

The program tests two kinds of channel storage: unbuffered (`cap == 0`) and a [ring buffer](https://en.wikipedia.org/wiki/Circular_buffer). In the ring buffer, each item is a structure containing `lap` and `data`:
+ `data`: where the payload is stored
+ `lap`: a counter-like field that records which read/write round the slot is in

When using a ring buffer, one must be able to tell whether it is currently "full" or "empty". This program uses `lap` to record the read/write state. To save space, `lap` and `pos` are packed into a single `uint64_t` (`lap` in the upper 32 bits, `pos` in the lower 32 bits) and extracted with bitwise operations when needed. This also matches what the atomic functions operate on (one `uint64_t`), so both fields can be updated in a single atomic operation.

```c
    /* Ring buffer */
    size_t cap;
    _Atomic uint64_t head, tail;
    struct chan_item ring[0];

    tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
    uint32_t pos = tail, lap = tail >> 32;
    item = ch->ring + pos;
```

How does the program decide whether the ring buffer is full or empty? The decision depends on `lap`. On the sending side, if the item's `lap` equals the `lap` stored in `tail`, data can be written into the ring buffer; if they differ, the ring buffer is full. The receiving side performs the same comparison against `head`, where a mismatch means the ring buffer is empty.

```c
        if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
            errno = EAGAIN;
            return -1;
        }

        // the final element of ring buffer(tail)
        if (pos + 1 == ch->cap)
            new_tail = (uint64_t)(lap + 2) << 32;
        else
            new_tail = tail + 1;
    } while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail,
                                                    memory_order_acq_rel,
                                                    memory_order_acquire));
```

When `pos` is the last slot, `ch->tail` is set to `(uint64_t)(lap + 2) << 32`. Why is `pos` not assigned explicitly? Because a new round always starts at position 0, so only the new lap needs to be recorded. After the first full round of writes, the lap of `tail` is 2 while the lap of `head` is still 1 $\to$ the lap check fails and writing stops until the item that `head` points to has been read and its lap has become 2.

One thing to note here: the laps stored in `head` and `tail` never coincide, because `tail` only takes even laps and `head` only odd ones. From this pattern, the `lap` of an item tells which side may access it:
+ when `lap` equals `2n` $\to$ write data
+ when `lap` equals `2n + 1` $\to$ read data

Further reading:
* [The Go Blog: Share Memory By Communicating](https://blog.golang.org/codelab-share)

Fill in the missing code so that the program behaves as expected.

==Answer area==

CCCC = ?

:::success
Follow-up questions:
1. Explain how the code above works, with reference to 〈[並行程式設計: Ring buffer](https://hackmd.io/@sysprog/concurrency-ringbuffer)〉
2. Design experiments to study the efficiency of the read/write operations above
3. Eliminate the ThreadSanitizer error messages and propose improvements
4. Referring to [libchannel](https://github.com/emilianobilli/libchannel), propose a more general implementation
:::

---

### Quiz `3`

Suppose we want to implement a lock-free single-producer/single-consumer (SPSC) concurrent program that is built on a [ring buffer](https://en.wikipedia.org/wiki/Circular_buffer) and avoids [false sharing](https://en.wikipedia.org/wiki/False_sharing). The code is available at: [main.c](https://gist.github.com/jserv/03adfccf69ffc756a7504f9caf7058e9) (partially masked)

To compile:
```shell
gcc -Wall -O2 -I. -o main main.c -lpthread
```

Reference output of the test program: (where `4` specifies 4 processor cores)
```shell
$ ./main 4
consumer 1: ---2----
producer 0: ---1----
consumer 2: ---4----
Consumer created...
consumer 3: ---6----
Consumer created...
Consumer created...
producer 7 cycles/op
consumer: 21 cycles/op
consumer: 21 cycles/op
consumer: 21 cycles/op
Done!
```

The program uses the techniques described in [spinlock: significant optimizations](https://en.wikipedia.org/wiki/Spinlock#Significant_optimizations) for multicore processors; compare with [src/util/spinlock.h](https://github.com/sysprog21/cserv/blob/master/src/util/spinlock.h) in the [cserv](https://github.com/sysprog21/cserv) project.

Given that no assertion fails during execution, complete the code so that it runs as expected. Answer rules:
* `AAAA` and `BBBB` are both expressions and must not contain parentheses (that is, `(` and `)`)
* Answer in the most concise form

:::success
Follow-up questions:
1. Explain how the code above works, with reference to 〈[並行程式設計: Ring buffer](https://hackmd.io/@sysprog/concurrency-ringbuffer)〉
2. Design experiments to study the efficiency of the pub-sub operations above
3. Eliminate the ThreadSanitizer error messages and propose improvements
:::
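
As background for quiz 3, the general idea of a lock-free SPSC ring buffer that avoids false sharing can be sketched as follows. This is only a minimal illustration under stated assumptions, not the code in `main.c` and not an answer to the masked parts: the names `struct spsc`, `spsc_push`, `spsc_pop`, `spsc_demo`, the 64-byte cache line, and the ring size of 1024 are all hypothetical choices made for this example.

```c
#include <assert.h>
#include <pthread.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_LINE 64  /* assumed cache-line size, not taken from main.c */
#define RING_SIZE 1024 /* hypothetical capacity; must be a power of two */

/* Each index sits on its own cache line, so the producer updating `tail`
 * does not invalidate the line that holds the consumer's `head`. */
struct spsc {
    alignas(CACHE_LINE) _Atomic size_t head; /* written by the consumer only */
    alignas(CACHE_LINE) _Atomic size_t tail; /* written by the producer only */
    alignas(CACHE_LINE) uintptr_t slots[RING_SIZE];
};

/* Producer side: returns false when the ring is full. */
static bool spsc_push(struct spsc *q, uintptr_t v)
{
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == RING_SIZE)
        return false;
    q->slots[tail & (RING_SIZE - 1)] = v;
    /* release: the slot write becomes visible before the new tail */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side: returns false when the ring is empty. */
static bool spsc_pop(struct spsc *q, uintptr_t *v)
{
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail)
        return false;
    *v = q->slots[head & (RING_SIZE - 1)];
    /* release: the slot read completes before the slot is handed back */
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return true;
}

struct demo_arg {
    struct spsc *q;
    size_t n;
};

static void *demo_producer(void *p)
{
    struct demo_arg *a = p;
    for (size_t i = 0; i < a->n; i++)
        while (!spsc_push(a->q, i))
            ; /* busy-wait while the ring is full */
    return NULL;
}

/* Sends 0..n-1 through the ring from one producer thread and returns the
 * sum seen by the consumer (the calling thread). */
static uint64_t spsc_demo(size_t n)
{
    struct spsc q;
    atomic_init(&q.head, 0);
    atomic_init(&q.tail, 0);

    struct demo_arg arg = {.q = &q, .n = n};
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, demo_producer, &arg);
    assert(rc == 0);

    uint64_t sum = 0;
    uintptr_t v;
    for (size_t got = 0; got < n;) {
        if (spsc_pop(&q, &v)) {
            assert(v == got); /* SPSC preserves FIFO order */
            sum += v;
            got++;
        }
    }
    pthread_join(tid, NULL);
    return sum;
}
```

Calling `spsc_demo(1000)` from a `main` function and comparing the result with `1000 * 999 / 2` is one way to exercise the sketch. Removing the `alignas` specifiers and measuring cycles per operation again would be one possible starting point for the efficiency experiment in the follow-up questions.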