# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) Week 10 Quiz
###### tags: `linux2021`
:::info
Objective: assess how well students understand ==[Linux Kernel Design: Processes Are More than Execution Units](https://hackmd.io/@sysprog/linux-process)==, ==[Linux Kernel Design: A Gentle Introduction to Synchronization Mechanisms](https://hackmd.io/@sysprog/linux-sync)==, and ==[Concurrent Programming](https://hackmd.io/@sysprog/concurrency)==
:::
==[Answer form](https://docs.google.com/forms/d/e/1FAIpQLSf6MGyB9e3svQGb5AuFf2L8EHmtUfGwGESKLLa9NN44D7f29g/viewform)==
### Quiz `1`
This problem uses C11 Atomics together with the Linux [futex](https://man7.org/linux/man-pages/man2/futex.2.html) system call to emulate the [goroutine](https://tour.golang.org/concurrency/1) and [channel](https://tour.golang.org/concurrency/2) mechanisms of the Go programming language.
A [goroutine](https://tour.golang.org/concurrency/1) is a user-level thread (ULT) in the Go programming language; it provides multitasking support at the language level. Example code:
```go
func main() {
go say("world")
say("hello")
}
```
Illustration:
![](https://i.imgur.com/AvE11MY.png)
> `say("world")` runs on a separate goroutine, so the two calls execute concurrently
In other words, a goroutine creates a ULT. The syntax is:
```go
go f(x, y, z)
```
A function call prefixed with `go` makes `f` run in a new goroutine; `f`, `x`, `y`, and `z` are evaluated in the current goroutine. Note that the `main` function itself also runs in a goroutine: when the goroutine named `main` finishes, all other goroutines are shut down along with it, as the sketch below illustrates.
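A minimal sketch of this behavior (the `tick` helper is made up for illustration and is not part of the quiz code):
```go
package main

import (
	"fmt"
	"time"
)

// tick prints its name repeatedly; it never returns on its own.
func tick(name string) {
	for {
		fmt.Println(name)
		time.Sleep(50 * time.Millisecond)
	}
}

func main() {
	go tick("background") // runs concurrently with main
	time.Sleep(120 * time.Millisecond)
	// When main returns here, the still-running tick goroutine is
	// terminated along with it.
}
```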
In a multi-threaded environment, a recurring task is managing state across threads. One common operation is waiting: for example, thread `A` must wait until thread `B` has finished its computation and produced data before `A` can continue. Besides the [goroutine](https://tour.golang.org/concurrency/1) itself, Go provides a mechanism called the [channel](https://tour.golang.org/concurrency/2). Channels were originally intended for communication between goroutines, but because receiving from a channel blocks, they can also be used to wait for goroutines.
Code example:
```go
func say(s string, c chan string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
c <- "FINISH"
}
func main() {
ch := make(chan string)
go say("world", ch)
go say("hello", ch)
<-ch
<-ch
}
```
![](https://i.imgur.com/OaZaIJO.png)
> The program spawns two goroutines, `say("world", ch)` and `say("hello", ch)`, so the main goroutine must wait until two `FINISH` values have been pushed into the channel before it can finish.
Go channels distinguish between "read" and "write" operations:
```go
Write(chan<- int)
Read(<-chan int)
ReadWrite(chan int)
```
The two are distinguished by where the `<-` arrow sits: `chan<-` (the arrow points into the channel) means "write", while `<-chan` (the arrow points out of the channel) means "read".
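A minimal compilable sketch (the `produce`/`consume` functions are hypothetical, for illustration only) of how the arrow position restricts what a function may do with a channel parameter:
```go
package main

import "fmt"

// produce can only send on c: receiving from it would not compile.
func produce(c chan<- int, v int) {
	c <- v
}

// consume can only receive from c: sending on it would not compile.
func consume(c <-chan int) int {
	return <-c
}

func main() {
	ch := make(chan int, 1) // buffered so produce does not block
	produce(ch, 42)
	fmt.Println(consume(ch)) // prints 42
}
```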
The following is a Go-channel implementation rewritten in C using C11 Atomics and futex:
```c
#include <linux/futex.h>
#include <stdatomic.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>
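/* Thin wrappers around futex(2): FUTEX_WAIT puts the caller to sleep only if
 * *uaddr still equals val, and FUTEX_WAKE wakes up to val threads sleeping on
 * uaddr. See futex(2) and futex(7) for the exact semantics.
 */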
static inline long futex_wait(_Atomic uint32_t *uaddr, uint32_t val)
{
return syscall(SYS_futex, uaddr, FUTEX_WAIT, val, NULL, NULL, 0);
}
static inline long futex_wake(_Atomic uint32_t *uaddr, uint32_t val)
{
return syscall(SYS_futex, uaddr, FUTEX_WAKE, val, NULL, NULL, 0);
}
struct mutex {
_Atomic uint32_t val;
};
#define MUTEX_INITIALIZER \
(struct mutex) { .val = 0 }
enum {
UNLOCKED = 0,
LOCKED_NO_WAITER = 1,
LOCKED = 2,
};
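/* The three states above form a classic futex-based mutex (cf. Drepper,
 * "Futexes Are Tricky"): UNLOCKED, LOCKED_NO_WAITER (owned, nobody sleeping),
 * and LOCKED (owned, and other threads may be sleeping on the futex).
 */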
void mutex_init(struct mutex *mu)
{
mu->val = UNLOCKED;
}
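/* Release the lock: decrement the state word. If the previous value was not
 * LOCKED_NO_WAITER, other threads may be sleeping on the futex, so reset the
 * word to UNLOCKED and wake one of them.
 */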
void mutex_unlock(struct mutex *mu)
{
uint32_t orig =
atomic_fetch_sub_explicit(&mu->val, 1, memory_order_relaxed);
if (orig != LOCKED_NO_WAITER) {
mu->val = UNLOCKED;
futex_wake(&mu->val, 1);
}
}
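/* CAS helper: returns the value observed in *ptr, i.e. `expect` when the
 * exchange succeeded, or the conflicting current value when it failed.
 */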
static uint32_t cas(_Atomic uint32_t *ptr, uint32_t expect, uint32_t new)
{
atomic_compare_exchange_strong_explicit(
ptr, &expect, new, memory_order_acq_rel, memory_order_acquire);
return expect;
}
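/* Acquire the lock. Fast path: UNLOCKED -> LOCKED_NO_WAITER. Slow path: mark
 * the mutex as LOCKED (contended), sleep on the futex while it stays LOCKED,
 * and retry acquiring it directly into the LOCKED state so that the eventual
 * unlock knows it must issue a wake-up.
 */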
void mutex_lock(struct mutex *mu)
{
uint32_t val = cas(&mu->val, UNLOCKED, LOCKED_NO_WAITER);
if (val != UNLOCKED) {
do {
if (val == LOCKED ||
cas(&mu->val, LOCKED_NO_WAITER, LOCKED) != UNLOCKED)
futex_wait(&mu->val, LOCKED);
} while ((val = cas(&mu->val, UNLOCKED, LOCKED)) != UNLOCKED);
}
}
#include <stdbool.h>
struct chan_item {
_Atomic uint32_t lap;
void *data;
};
struct chan {
_Atomic bool closed;
/* Unbuffered channels only: the pointer used for data exchange. */
_Atomic(void **) datap;
/* Unbuffered channels only: guarantees that at most one writer and one
* reader have the right to access.
*/
struct mutex send_mtx, recv_mtx;
/* For unbuffered channels, these futexes start from 1 (CHAN_NOT_READY).
* They are incremented to indicate that a thread is waiting.
* They are decremented to indicate that data exchange is done.
*
* For buffered channels, these futexes represent credits for a reader or
 * writer to retry receiving or sending.
*/
_Atomic uint32_t send_ftx, recv_ftx;
/* Buffered channels only: number of waiting threads on the futexes. */
_Atomic size_t send_waiters, recv_waiters;
/* Ring buffer */
size_t cap;
_Atomic uint64_t head, tail;
struct chan_item ring[0];
};
typedef void *(*chan_alloc_func_t)(size_t);
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
enum {
CHAN_READY = 0,
CHAN_NOT_READY = 1,
CHAN_WAITING = 2,
CHAN_CLOSED = 3,
};
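/* For unbuffered channels, send_ftx/recv_ftx act as small state machines:
 * CHAN_NOT_READY is the idle state, CHAN_WAITING means the corresponding side
 * is parked on the futex, CHAN_READY means the peer already completed the
 * exchange (so no sleep is needed), and CHAN_CLOSED is set by chan_close() to
 * release every sleeper.
 */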
static void chan_init(struct chan *ch, size_t cap)
{
ch->closed = false;
ch->datap = NULL;
mutex_init(&ch->send_mtx), mutex_init(&ch->recv_mtx);
if (!cap)
ch->send_ftx = ch->recv_ftx = CHAN_NOT_READY;
else
ch->send_ftx = ch->recv_ftx = 0;
ch->send_waiters = ch->recv_waiters = 0;
ch->cap = cap;
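    /* head and tail pack a lap counter in the upper 32 bits and a ring index
     * in the lower 32 bits. head starts one lap ahead of tail, so every slot
     * is initially writable but not yet readable.
     */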
ch->head = (uint64_t) 1 << 32;
ch->tail = 0;
if (ch->cap > 0) memset(ch->ring, 0, cap * sizeof(struct chan_item));
}
struct chan *chan_make(size_t cap, chan_alloc_func_t alloc)
{
struct chan *ch;
if (!alloc || !(ch = alloc(sizeof(*ch) + cap * sizeof(struct chan_item))))
return NULL;
chan_init(ch, cap);
return ch;
}
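/* Buffered path: a bounded MPMC ring buffer. A slot can be written when its
 * lap matches the tail's lap and read when it matches the head's lap; the CAS
 * on tail (or head) claims the slot, and the final lap increment publishes it
 * to the other side.
 */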
static int chan_trysend_buf(struct chan *ch, void *data)
{
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
uint64_t tail, new_tail;
struct chan_item *item;
do {
tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
uint32_t pos = tail, lap = tail >> 32;
item = ch->ring + pos;
if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
errno = EAGAIN;
return -1;
}
if (pos + 1 == ch->cap)
new_tail = (uint64_t)(lap + 2) << 32;
else
new_tail = tail + 1;
} while (!atomic_compare_exchange_weak_explicit(&ch->tail, &tail, new_tail,
memory_order_acq_rel,
memory_order_acquire));
item->data = data;
atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);
return 0;
}
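/* Blocking send for buffered channels: send_ftx counts "retry credits". When
 * the ring is full, the sender tries to consume one credit; if none is left,
 * it registers itself in send_waiters and sleeps until a receiver frees a
 * slot and posts a credit. After a successful send, the sender posts a credit
 * on recv_ftx and wakes one sleeping receiver, if any.
 */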
static int chan_send_buf(struct chan *ch, void *data)
{
while (chan_trysend_buf(ch, data) == -1) {
if (errno != EAGAIN) return -1;
uint32_t v = 1;
while (!atomic_compare_exchange_weak_explicit(&ch->send_ftx, &v, v - 1,
memory_order_acq_rel,
memory_order_acquire)) {
if (v == 0) {
atomic_fetch_add_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
futex_wait(&ch->send_ftx, 0);
atomic_fetch_sub_explicit(&ch->send_waiters, 1,
memory_order_acq_rel);
v = 1;
}
}
}
atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acq_rel);
if (atomic_load_explicit(&ch->recv_waiters, memory_order_relaxed) > 0)
futex_wake(&ch->recv_ftx, 1);
return 0;
}
static int chan_tryrecv_buf(struct chan *ch, void **data)
{
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
uint64_t head, new_head;
struct chan_item *item;
do {
head = atomic_load_explicit(&ch->head, memory_order_acquire);
uint32_t pos = head, lap = head >> 32;
item = ch->ring + pos;
if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
errno = EAGAIN;
return -1;
}
if (pos + 1 == ch->cap)
new_head = (uint64_t)(XXX) << 32;
else
new_head = head + 1;
} while (!atomic_compare_exchange_weak_explicit(&ch->head, &head, new_head,
memory_order_acq_rel,
memory_order_acquire));
*data = item->data;
atomic_fetch_add_explicit(&item->lap, 1, memory_order_release);
return 0;
}
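/* Blocking receive for buffered channels; the mirror image of chan_send_buf:
 * consume a credit from recv_ftx (or sleep while it is zero), then post a
 * credit on send_ftx and wake one sleeping sender.
 */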
static int chan_recv_buf(struct chan *ch, void **data)
{
while (chan_tryrecv_buf(ch, data) == -1) {
if (errno != EAGAIN) return -1;
uint32_t v = 1;
while (!atomic_compare_exchange_weak_explicit(&ch->recv_ftx, &v, v - 1,
memory_order_acq_rel,
memory_order_acquire)) {
if (v == 0) {
atomic_fetch_add_explicit(&ch->recv_waiters, 1,
memory_order_acq_rel);
futex_wait(&ch->recv_ftx, 0);
atomic_fetch_sub_explicit(&ch->recv_waiters, 1,
memory_order_acq_rel);
v = 1;
}
}
}
atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acq_rel);
if (atomic_load_explicit(&ch->send_waiters, memory_order_relaxed) > 0)
futex_wake(&ch->send_ftx, 1);
return 0;
}
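/* Unbuffered send: a rendezvous through the single exchange pointer `datap`.
 * If a receiver has already parked its destination pointer there, copy the
 * value directly and wake it; otherwise publish &data and sleep on send_ftx
 * until a receiver (or chan_close) wakes the sender.
 */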
static int chan_send_unbuf(struct chan *ch, void *data)
{
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
mutex_lock(&ch->send_mtx);
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
memory_order_acq_rel,
memory_order_acquire)) {
*ptr = data;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->recv_ftx, YYY);
} else {
if (atomic_fetch_add_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->send_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->send_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
mutex_unlock(&ch->send_mtx);
return 0;
}
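/* Unbuffered receive: symmetric to chan_send_unbuf. Either a sender already
 * published a pointer to its value (copy it out and wake the sender), or the
 * receiver parks its own destination pointer in `datap` and sleeps on
 * recv_ftx until the exchange completes.
 */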
static int chan_recv_unbuf(struct chan *ch, void **data)
{
if (!data) {
errno = EINVAL;
return -1;
}
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
mutex_lock(&ch->recv_mtx);
void **ptr = NULL;
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
memory_order_acq_rel,
memory_order_acquire)) {
*data = *ptr;
atomic_store_explicit(&ch->datap, NULL, memory_order_release);
if (atomic_fetch_sub_explicit(&ch->send_ftx, 1, memory_order_acquire) ==
CHAN_WAITING)
futex_wake(&ch->send_ftx, 1);
} else {
if (atomic_fetch_add_explicit(&ch->recv_ftx, 1, memory_order_acquire) ==
CHAN_NOT_READY) {
do {
futex_wait(&ch->recv_ftx, CHAN_WAITING);
} while (atomic_load_explicit(
&ch->recv_ftx, memory_order_acquire) == CHAN_WAITING);
if (atomic_load_explicit(&ch->closed, memory_order_relaxed)) {
errno = EPIPE;
return -1;
}
}
}
mutex_unlock(&ch->recv_mtx);
return 0;
}
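/* Mark the channel as closed, switch the unbuffered futexes to CHAN_CLOSED,
 * and wake every thread currently sleeping on either futex.
 */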
void chan_close(struct chan *ch)
{
ch->closed = true;
if (!ch->cap) {
atomic_store(&ch->recv_ftx, CHAN_CLOSED);
atomic_store(&ch->send_ftx, CHAN_CLOSED);
}
futex_wake(&ch->recv_ftx, INT_MAX);
futex_wake(&ch->send_ftx, INT_MAX);
}
int chan_send(struct chan *ch, void *data)
{
return !ch->cap ? chan_send_unbuf(ch, data) : chan_send_buf(ch, data);
}
int chan_recv(struct chan *ch, void **data)
{
return !ch->cap ? chan_recv_unbuf(ch, data) : chan_recv_buf(ch, data);
}
#include <pthread.h>
typedef void *(*thread_func_t)(void *);
#include <assert.h>
#include <err.h>
#include <stdio.h>
enum {
MSG_MAX = 100000,
THREAD_MAX = 1024,
};
struct thread_arg {
size_t id;
size_t from, to;
struct chan *ch;
};
static pthread_t reader_tids[THREAD_MAX], writer_tids[THREAD_MAX];
struct thread_arg reader_args[THREAD_MAX], writer_args[THREAD_MAX];
static _Atomic size_t msg_total, msg_count[MSG_MAX];
static void *writer(void *arg)
{
struct thread_arg *a = arg;
for (size_t i = a->from; i < a->to; i++)
if (chan_send(a->ch, (void *) i) == -1) break;
return 0;
}
static void *reader(void *arg)
{
struct thread_arg *a = arg;
size_t msg, received = 0, expect = a->to - a->from;
while (received < expect) {
if (chan_recv(a->ch, (void **) &msg) == -1) break;
atomic_fetch_add_explicit(&msg_count[msg], 1, memory_order_relaxed);
++received;
}
return 0;
}
static void create_threads(const size_t n,
thread_func_t fn,
struct thread_arg *args,
pthread_t *tids,
struct chan *ch)
{
size_t each = msg_total / n, left = msg_total % n;
size_t from = 0;
for (size_t i = 0; i < n; i++) {
size_t batch = each;
if (left > 0) {
batch++;
left--;
}
args[i] = (struct thread_arg){
.id = i,
.ch = ch,
.from = from,
.to = from + batch,
};
pthread_create(&tids[i], NULL, fn, &args[i]);
from += batch;
}
}
static void join_threads(const size_t n, pthread_t *tids)
{
for (size_t i = 0; i < n; i++) pthread_join(tids[i], NULL);
}
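/* Stress test: msg_total distinct messages (0 .. msg_total-1) are spread over
 * n_writers senders, and n_readers receivers count how often each message
 * arrives. After every round, each message must have been received exactly
 * once.
 */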
static void test_chan(const size_t repeat,
const size_t cap,
const size_t total,
const size_t n_readers,
thread_func_t reader_fn,
const size_t n_writers,
thread_func_t writer_fn)
{
if (n_readers > THREAD_MAX || n_writers > THREAD_MAX)
errx(1, "too many threads to create");
if (total > MSG_MAX) errx(1, "too many messages to send");
struct chan *ch = chan_make(cap, malloc);
if (!ch) errx(1, "fail to create channel");
msg_total = total;
for (size_t rep = 0; rep < repeat; rep++) {
printf("cap=%zu readers=%zu writers=%zu msgs=%zu ... %zu/%zu\n", cap,
n_readers, n_writers, msg_total, rep + 1, repeat);
memset(msg_count, 0, sizeof(size_t) * msg_total);
create_threads(n_readers, reader_fn, reader_args, reader_tids, ch);
create_threads(n_writers, writer_fn, writer_args, writer_tids, ch);
join_threads(n_readers, reader_tids);
join_threads(n_writers, writer_tids);
for (size_t i = 0; i < msg_total; i++) assert(msg_count[i] == 1);
}
chan_close(ch);
free(ch);
}
int main()
{
test_chan(50, 0, 500, 80, reader, 80, writer);
test_chan(50, 7, 500, 80, reader, 80, writer);
return 0;
}
```
Reference output:
```
cap=0 readers=80 writers=80 msgs=500 ... 1/50
cap=0 readers=80 writers=80 msgs=500 ... 2/50
cap=0 readers=80 writers=80 msgs=500 ... 3/50
...
cap=0 readers=80 writers=80 msgs=500 ... 48/50
cap=0 readers=80 writers=80 msgs=500 ... 49/50
cap=0 readers=80 writers=80 msgs=500 ... 50/50
cap=7 readers=80 writers=80 msgs=500 ... 1/50
cap=7 readers=80 writers=80 msgs=500 ... 2/50
cap=7 readers=80 writers=80 msgs=500 ... 3/50
...
cap=7 readers=80 writers=80 msgs=500 ... 48/50
cap=7 readers=80 writers=80 msgs=500 ... 49/50
cap=7 readers=80 writers=80 msgs=500 ... 50/50
```
Further reading:
* [The Go Blog: Share Memory By Communicating](https://blog.golang.org/codelab-share)
Fill in the missing code so that the program behaves as expected.
==Answer area==
XXX = ?
YYY = ?
:::success
Follow-up questions:
1. Explain how the above code works
2. Design experiments to evaluate the efficiency of the read/write operations above
3. Eliminate the ThreadSanitizer error reports and propose improvements
:::