---
tags: System Software
---
# Concurrent Programming
這篇文章將逐步探討 [concurrent-programs](https://github.com/RinHizakura/concurrent-programs) 中的各個程式以及其延伸議題,整理並學習與並行程式設計相關的知識。
## Thread Pool
> Material: [2021q1 第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4)
此章節所要探討的是 [thread pool](https://github.com/sysprog21/concurrent-programs/tree/master/tpool) 的實作邏輯及其中細節。在存在大量執行緒的環境中,建立和銷毀執行緒物件的成本相當可觀,如果頻繁的對執行緒做建立和銷毀,可能導致帶來額外的延遲時間超出實體的任務本身。考慮到硬體的有效處理器數量有限,執行緒的數量與計算效能的提升並非總是正向成長,使用 thread pool 可控管執行分配到處理器的執行緒數量,並妥善的運用已存在的 thread。
在原題目中,希望透過 [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) 撰寫求圓周率近似值的程式碼,得益於此公式,圓周率的計算可以被分成許多個子任務。而透過所設計的 thread pool 介面,可以將任務指派到不同的 thread 進行,最後再整合子任務的結果得到圓周率近似值。
下面來看設計 thread pool 的關鍵程式碼。
### 實作
#### `jobqueue_create`
```cpp
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
```
* 建立一個 `jobqueue_t` 結構,結構中包含整個由任務單元(`threadtask_t`)形成的 linked-list 的頭尾 reference,以及一個 [condition variable](https://en.wikipedia.org/wiki/Monitor_(synchronization)#Condition_variables) + [mutex](https://en.wikipedia.org/wiki/Mutual_exclusion)
#### `tpool_create`
```cpp
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
...
```
* 建立 `__threadpool` 結構所需的 jobqueue,以及配置失敗的處理
```cpp
...
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
* 根據 `count` 建立相應數量的 thread,加入 `pool` 中,可以看到 thread 建立後會運行 `jobqueue_fetch`
* 如果過程中有任何 `malloc`、`pthread_create` 失敗,回收先前的成功配置
#### `jobqueue_fetch`
```cpp
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
```
* 所有的 thread 會那到一個共享的參數(`jobqueue_t`) `queue`
* [`pthread_cleanup_push`](https://man7.org/linux/man-pages/man3/pthread_cleanup_push.3.html) 註冊當 thread 非正常終止時(例如受 [`pthread_cancel`](https://man7.org/linux/man-pages/man3/pthread_cancel.3.html) 而退出),若在與下個成對 [`pthread_cleanup_pop()`](https://linux.die.net/man/3/pthread_cleanup_pop) 之間發生終止,就呼叫 `__jobqueue_fetch_cleanup` 先釋放必要的資源
* 可以看到 `__jobqueue_fetch_cleanup` 的任務就是把 mutex 釋放,避免中止時還佔用資源導致 deadlock
```cpp
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
```
首先需注意到 `pthread_cancel` 不總是直接讓 thread 退出,根據 `pthread_cancel` 的文件
> **Whether and when the target thread reacts to the cancellation request depends on two attributes that are under the control of that thread: its cancelability state and type.**
第一是 cancel 是否被允許
> **A thread's cancelability state, determined by pthread_setcancelstate(3), can be enabled (the default for new threads) or disabled.**
第二是 cancel 的處理方式
> **A thread's cancellation type, determined by pthread_setcanceltype(3), may be either asynchronous or deferred (the default for new threads).**
因此:
* [`pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state)`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 使得 cancel 請求可以被接收
* [`pthread_testcancel`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 建立 cancellation point,執行到 `pthread_testcancel` 時,thread 就可以響應 cancel 然後退出
* thread 會一直等待 `jobqueue->tail` 被 pending 新的任務,利用 condition variable 避免佔有 mutex busy waiting(詳見 [pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait))
```cpp
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
...
```
* 取出 `jobqueue->tail`,更新 queue 的狀態
* `jonqueue->head == jobqueue->tail` 表示這是 pending 的最後一個任務,直接取出任務,之後 queue 的狀態是 `jobqueue->head = jobqueue->tail = NULL;`
* 否則 `jobqueue->tail` 要更新到倒數第二個任務,因為維護方式是單向 linked-list 所以只好 for 迴圈線性查找
```cpp
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
...
```
* `task->func` 表示 queue 中存在實際的任務,每個 `task` 中的 `future.flag` 欄位決定了對任務的處理方式
* 如果 `__FUTURE_CANCELLED` 被設置,不執行 `task->func` 並釋放 `task`
* `__FUTURE_CANCELLED` 可能在 `jobqueue_destroy` 被設置
* 否則設置 `__FUTURE_RUNNING` 並且執行 `task_func`
* `__FUTURE_RUNNING` 在目前的程式中沒有真正的被用上
* 執行之後,檢查 `__FUTURE_DESTROYED`,如果被設置則將 `task->future` 相關的 mutex / cond variable 及本身做釋放
* `__FUTURE_DESTROYED` 在 `tpool_future_destroy` 會有機會被設置,後者會判斷 task 的執行狀況決定在 main thread 釋放 `future` 及相關資源,還是延後到 work thread 完成一定的任務後再自行刪除
* 否則設置 `__FUTURE_FINISHED`,紀錄回傳值在 `future->result` 中並透過 [`pthread_cond_broadcast`](https://linux.die.net/man/3/pthread_cond_broadcast) 通知函式的運行已完成,可以讀取回傳值了
```cpp
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
```
* 否則回收 `task->future` 和 `task` 的相關資源及本身
* 最後 `pthread_cleanup_pop` 對應前面的 `pthread_cleanup_push`
* [`pthread_exit`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html) 讓 thread 回傳 NULL 退出
#### `tpool_apply`
```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
* 建立 `threadtask_t` 結構,準備加入 jobqueue 中
* `tpool_future_create` 初始化 `__tpool_future`
* 值得注意的是 [`pthread_cond_init`](https://linux.die.net/man/3/pthread_cond_init) 透過 [`pthread_condattr_init`](https://linux.die.net/man/3/pthread_condattr_init) 所得到的 default `pthread_condattr_t` 結構進行初始化,主要是因為會用到
:::warning
猜測初始化方式與會使用到 [`pthread_cond_timewait`](https://linux.die.net/man/3/pthread_cond_timedwait) 有關,但是不確定 `pthread_cond_init(..., NULL)` 不就是使用 default attribute 嗎?
:::
* 如果 `threadtask_t` 和 `__tpool_future` 結構都初始化成功(`if (new_head && future)`),就把 `threadtask_t` 結構必要的內容設定好,並且加入到 jobqueue 的 head
* 如果是加入到空的 queue 中,需要額外設置 condition variable 喚醒執行 `jobqueue_fetch` 的 thread
* 否則的話做必要的清理
* 回傳的 future 主要是讓使用 thread pool 的程式可以拿回運行後的回傳值
#### `tpool_future_get`
```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex);
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
```
* 等待 task 的計算結束(`(future->flag & __FUTURE_FINISHED) != 0`),如果未結束會透過 `pthread_cond_timedwait`(如果有設置等待時間的要求) 或者 ` pthread_cond_wait` 等待 task 的計算結束則得以通過,回傳計算後的結果 `future->result`
### 小結
理解前面對關鍵函式的說明之後,回過頭來就很容易理解。因為 [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) 可以將 $\pi$ 的計算分成多個獨立的計算,我們預先建立 4 個 thread ,每個 thread 的工作是嘗試從 queue 從取出任務並執行,且 thread 完成任務(執行完某個 function pointer 然後得到回傳值)後會再嘗試從 queue 中取出一個任務,直到 queue 中為空。
因為不是為每個計算都創造一個 thread,並且在使用完之後回收,可以減少建立和銷毀 thread 的時間成本。畢竟 thread 所需執行內容如果並不複雜,使用太多的 thread 來處理,可能建立和銷毀 thread 產生的代價超出計算本身。況且,建立數百個 thread 也不代表所有 thread 都能在相近時間點結束運算。反之,透過 thread pool,多個任務可以有效的善用已經被建立的 thread 進行工作。
## Tiny nc
> Material: [2021q1 第 5 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz5#%E6%B8%AC%E9%A9%97-2)
[nc(netcat)](https://linux.die.net/man/1/nc) 是 UNIX 系統的網路工具程式,支援包含 TCP 連線、UDP 封包相關的功能等。[tinync](https://github.com/sysprog21/concurrent-programs/tree/master/tinync) 展示了一個精簡的 nc 實作,關鍵在於透過 [coroutine](https://en.wikipedia.org/wiki/Coroutine) 技巧達到此目的,下面來看實作的手法。
### 實作
#### `main`
讓我們先從主函式看起。
```cpp
int main(int argc, char *argv[])
{
if (argc != 3) {
fprintf(stderr, "USAGE: %s <ip> <port>\n", argv[0]);
return 1;
}
char *host = argv[1];
int port = atoi(argv[2]);
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
perror("socket()");
return 1;
}
if (nonblock(fd) < 0) {
perror("nonblock() socket");
return 1;
}
if (nonblock(STDIN_FILENO) < 0) {
perror("nonblock() stdin");
return 1;
}
if (nonblock(STDOUT_FILENO) < 0) {
perror("nonblock() stdout");
return 1;
}
```
* 可以看到設計上參數 1(`argv[1]`) 是 host 的名稱字串,參數 2 (`argv[2]`)是 port number
* [`socket`](https://man7.org/linux/man-pages/man2/socket.2.html) 建立一個連接的端點(endpoint),返回的值是該端點的 file descriptor
* `AF_INET`: IPv4 Internet protocols
* `SOCK_STREAM`: 使用TCP(資料流)方式提供可靠、雙向、串流的通信頻道
* 利用 [`nonblock`](#nonblock) 將該 endpoint、STDIN、STDOUT 都設置為 nonblock mode
```cpp
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_addr =
{
.s_addr = inet_addr(host),
},
.sin_port = htons(port),
};
connect(fd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in));
```
* [`connect`](https://man7.org/linux/man-pages/man2/connect.2.html) 連接 `fd` 所代表的 socket,`addr` 結構中則是 `struct sockaddr` 結構,包含連接 socket 的相關資訊
* `.sin_family`: `AF_INET` = IPv4 Internet protocols
* `.sin_addr.s_addr`: [`inet_addr`](https://linux.die.net/man/3/inet_addr) 將符合 IPv4 由數字和點形成的字串格式 `host` 轉成其二進制的表示
* `.sin_port`: [`htons`](https://linux.die.net/man/3/htons) 則將 `port` 從 `uint16_t` 數字轉換成 network byte order
```cpp
byte_queue_t queue = cr_queue_init();
cr_context(stdin_loop) = cr_context_init();
cr_context(socket_read_loop) = cr_context_init();
cr_context(socket_write_loop) = cr_context_init();
```
* `cr_context` 宣告一個 `struct cr` 變數,並透過 `cr_context_init` 分別為連接的 socket 讀和寫 / STDIN 初始化一個之
* 成員的 `label` 設為 NULL
* `status` 則設為 `CR_BLOCKED`
* `cr_queue_init` 則初始化 `byte_queue_t`,後者由 `typedef cr_queue(uint8_t, 4096) byte_queue_t;` 定義而來,加上 `cr_queue` 的定可以知道是一個 `uint8_t` 為單位的 4096 個元素大小之陣列,加上 `r` / `w` 兩個 `size_t` 型態的成員
```cpp
while (cr_status(stdin_loop) == CR_BLOCKED &&
cr_status(socket_read_loop) == CR_BLOCKED) {
if (cr_queue_empty(&queue)) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds);
FD_SET(fd, &fds);
select(fd + 1, &fds, NULL, NULL, NULL);
}
cr_run(socket_read_loop, fd);
cr_run(socket_write_loop, &queue, fd);
cr_run(stdin_loop, &queue);
}
...
```
* `cr_status` 檢查 `cr_stdin` 和 `cr_socket_read` 的 `status` 是否皆為 `CR_BLOCKED`,若是則表示輸入內容尚未完整讀完且寫至 socket,且 socket 未 shutdown,重複迴圈操作
* while 迴圈中透過 [`select`](https://man7.org/linux/man-pages/man2/select.2.html) 去監視 file descriptor 的狀態,若有 file descriptor 被操作就會在往下的三個 `cr_run` 做對應處理(細節參見下方對 3 個函式的說明)
### `nonblock`
```cpp
static int nonblock(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
```
* [fncntl](https://man7.org/linux/man-pages/man2/fcntl.2.html) 根據第二個參數內容的給定的 file descriptor 進行操作
* 例如這裡是先用 `F_GETFL` 得到 file 原本的存取模式和 status flags
* 再用 `F_SETFL` 嘗試加設置 `O_NONBLOCK` 特性
> [`O_NONBLOCK`](https://man7.org/linux/man-pages/man2/open.2.html)
>
> **When possible, the file is opened in nonblocking mode. Neither the open() nor any subsequent I/O operations on the file descriptor which is returned will cause the calling process to wait.**
#### `cr_line`
```cpp
#define __cr_line3(name, line) _cr_##name##line
#define __cr_line2(name, line) __cr_line3(name, line)
#define __cr_line(name) __cr_line2(name, __LINE__)
```
* `__LINE__` 根據 [3.7.1 Standard Predefined Macros](https://gcc.gnu.org/onlinedocs/cpp/Standard-Predefined-Macros.html) 會被 gcc 轉換成該行的行數
* 之所以要多一層 `__cr_line2` 是為了讓 preprocessor 先將 `__LINE__` 轉為 constant 再組織 label 字串
:::info
```cpp
#define __cr_line2(name, line) _cr_##name##line
#define __cr_line(name) __cr_line2(name, __LINE__)
```
用更清楚的範例說明: 假如用以上的定義,`__cr_line(name)` 會被變成 `_cr_label__LINE__`
:::
#### `cr_label`
```cpp
#define cr_label(o, stat) \
do { \
(o)->status = (stat); \
__cr_line(label) : (o)->label = &&__cr_line(label); \
} while (0)
```
* 首先,將 `status` 設為指定的 `stat`
* 同時透過 `cr_line` 建立一個獨立的 label 紀錄到 `label` 成員中,使用 [Labels as Values](https://gcc.gnu.org/onlinedocs/gcc/Labels-as-Values.html) 可以達到此目的
#### `cr_wait`
```cpp
#define cr_wait(o, cond) \
do { \
cr_label(o, CR_BLOCKED); \
if (!(cond)) \
return; \
} while (0)
```
* `cr_label` 設置 `status` 為 `CR_BLOCKED`,並且設下新的 label
* 如果給定的條件 `cond` 不符合,則退出函式,
#### `cr_sys`
```cpp
#define cr_sys(call) \
cr_wait((errno = 0) || \
!(((call) == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || \
errno == EINPROGRESS || errno == EINTR)))
```
* 先重設 `errno` (`errno = 0` 恆為 0)
* 執行 system call,如果 system call 回傳 -1,且 `errno` 是所列的 error,先行返回,而下次回到函式時會再次嘗試此 system call
#### `cr_begin`
```cpp
#define cr_begin(o) \
do { \
if ((o)->status == CR_FINISHED) \
return; \
if ((o)->label) \
goto *(o)->label; \
} while (0)
```
* 先檢查 `status` 是否為 `CR_FINISHED` 如果是,表示此函式的任務已經完成,直接返回
* 否則就 goto 到之前離開的地方,也就是 `label` 所紀錄的位址
#### `stdin_loop`
```cpp
static void cr_proto(stdin_loop, byte_queue_t *out)
{
cr_local uint8_t b;
cr_local int r;
cr_begin();
for (;;) {
cr_sys(r = read(STDIN_FILENO, &b, 1));
if (r == 0) {
cr_wait(cr_queue_empty(out));
cr_exit(1);
}
cr_wait(!cr_queue_full(out));
cr_queue_push(out, b);
}
cr_end();
}
```
* `cr_proto` 確保 coroutine 相關的函式介面定義,需包含參數 `struct cr *`,然後就可以透過 `cr_run` 介面來呼叫此函數的執行
* `cr_begin` 判斷函式的任務是否已經完成,如果是則直接返回,否則就跳到下次返回的地方,成對的 `cr_end` 則會設置表示任務完成的 status `CR_FINISHED`,並且更新 label
* 否則,for 迴圈的任務大致是:
* 從 STDIN 中透過 [`read`](https://linux.die.net/man/2/read) 逐 byte 讀取內容
* 如果 queue 中已填滿 (`cr_queue_full(out)`),`cr_wait` 會設置 label 並返回,讓其他的函式先運行,等待 queue 中的內容被處理掉
* 否則就 `cr_queue_push` 將內容寫入 queue
* 直到讀到 EOF(`r==0`), `cr_wait(cr_queue_empty(out))` 一直等到 queue 中的字元皆被處理完成,`cr_exit(1)` 設置 `CR_FINISHED` flag 表示函式的任務完成
#### `socket_write_loop`
```cpp
static void cr_proto(socket_write_loop, byte_queue_t *in, int fd)
{
cr_local uint8_t *b;
cr_begin();
for (;;) {
cr_wait(!cr_queue_empty(in));
b = cr_queue_pop(in);
cr_sys(send(fd, b, 1, 0));
}
cr_end();
}
```
* 近似於 `stdin_loop`,這裡 for 迴圈的任務是
* 等待 queue 中存在內容 `!cr_queue_empty(in)`
* 取得該字元並且透過 [`send`](https://man7.org/linux/man-pages/man2/send.2.html) 傳送到 socket
#### `socket_read_loop`
```cpp
static void cr_proto(socket_read_loop, int fd)
{
cr_local uint8_t b;
cr_local int r;
cr_begin();
for (;;) {
cr_sys(r = recv(fd, &b, 1, 0));
if (r == 0)
cr_exit(1);
cr_sys(write(STDOUT_FILENO, &b, 1));
}
cr_end();
}
```
* 近似於 `stdin_loop`,這裡 for 迴圈的任務是
* [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) 從 socket 獲取訊息
* 如果 socket shutdown(`r == 0`),透過 `cr_exit` 結束此任務
* 否則透過 [`write`](https://man7.org/linux/man-pages/man2/write.2.html) 將獲取訊息寫至 STDOUT
### 小結
在這個範例中,我們看到了 C 語言中實現 coroutine 的一種技巧。透過 [Labels as Values](https://gcc.gnu.org/onlinedocs/gcc/Labels-as-Values.html),可以紀錄函式返回的地方並在下次重新進入時透過 goto 直接從中途開始。
## fiber
> Material: [2021q1 第 6 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz6#%E6%B8%AC%E9%A9%97-3)
[fiber](https://en.wikipedia.org/wiki/Fiber_(computer_science)) 所指為輕量化(lightweight)的 thread,屬於 user-level thread。user space 的程式可以自行決定其運作方式,而屬於 kernel 的排程器則不會將其視為一個排程單位來處理。fiber 間透過協同式多工([cooperative multitasking](https://en.wikipedia.org/wiki/Cooperative_multitasking)) 來交互運作。
### 實作
#### 資料結構
```cpp
typedef struct {
pid_t pid; /* The pid of the child thread as returned by clone */
void *stack; /* The stack pointer */
} fiber_t;
/* The fiber "queue" */
static fiber_t fiber_list[MAX_FIBERS];
/* The pid of the parent process */
static pid_t parent;
/* The number of active fibers */
static int num_fibers = 0;
```
* `fiber_t` 是每個 fiber 的相關資料結構,其內容包含自己的 pid 和 stack 指標
* `fiber_list` 是管理每個建立 fiber 的 queue
* `parent` 是所有 fiber 共同的 parent process
* `num_fibers` 是存在的 fiber 總數量
#### `fiber_init()`
```cpp
void fiber_init()
{
for (int i = 0; i < MAX_FIBERS; ++i)
fiber_list[i].pid = 0, fiber_list[i].stack = 0;
parent = getpid();
}
```
* 將 queue 進行初始化,並且透過 [`getpid`](https://man7.org/linux/man-pages/man2/getpid.2.html) 得到 process 的 pid
#### `fiber_spawn`
```cpp
/* Creates a new fiber, running the func that is passed as an argument. */
int fiber_spawn(void (*func)(void))
{
if (num_fibers == MAX_FIBERS)
return FIBER_MAXFIBERS;
if ((fiber_list[num_fibers].stack = malloc(FIBER_STACK)) == 0)
return FIBER_MALLOC_ERROR;
```
* 如果已存在的 fiber 數量已經大於允許總量,返回錯誤
* 如果 malloc 給 stack 的空間失敗,返回錯誤
```cpp
struct fiber_args *args;
if ((args = malloc(sizeof(*args))) == 0) {
free(fiber_list[num_fibers].stack);
return FIBER_MALLOC_ERROR;
}
args->func = func;
```
* 同時也分配空間給 fiber 需要運行的函式指標 `args`,將其設置為給定的參數 `func`
```cpp
fiber_list[num_fibers].pid = clone(
fiber_start, (char *) fiber_list[num_fibers].stack + FIBER_STACK,
SIGCHLD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_VM, args);
if (fiber_list[num_fibers].pid == -1) {
free(fiber_list[num_fibers].stack);
free(args);
return FIBER_CLONE_ERROR;
}
num_fibers++;
return FIBER_NOERROR;
}
```
* 透過 [`clone`](https://man7.org/linux/man-pages/man2/clone3.2.html) 建立 fiber 的實體
* `fiber_start` 是不同任務的 fiber 共同的介面,其內部取出執行實際的任務 pointer 並執行之
* `fiber_list[num_fibers].stack + FIBER_STACK` 傳遞建立的 stack 之頂端位置,細節如以下文件所述[1]
* flag 對於特性的設置包含
* `SIGCHLD`: child 退出時會向 parent 發送 `SIGCHLD` signal
* `CLONE_FS`: caller 和 child 共享 filesystem information
* `CLONE_FILES`: caller 和 child 共享 file descriptor table
* `CLONE_SIGHAND`: caller 和 child 共享 signal handlers table
* `CLONE_VM`: caller 和 child 共享 memory space
* 執行的實際函式作為參數的 `args` 傳遞
> [1] Since the child and calling process may share memory, it is not possible for the child process to execute in the same stack as the calling process. The calling process must therefore set up memory space for the child stack and pass a pointer to this space to clone(). Stacks grow downward on all processors that run Linux (except the HP PA processors), so stack usually points to the topmost address of the memory space set up for the child stack.
* 如果 clone 回傳的值為 -1,表示 clone 失敗,進行相應的回收
* 否則就更新 `num_fibers` 並且回傳 `FIBER_NOERROR`
> 延伸參考: [Light-Weight Processes: Dissecting Linux Threads](http://malgenomeproject.org/os2018fall/04_thread_b.pdf)
#### `fiber_wait_all`
```cpp
/* Execute the fibers until they all quit. */
int fiber_wait_all()
{
/* Check to see if we are in a fiber, since we do not get signals in the
* child threads
*/
pid_t pid = getpid();
if (pid != parent)
return FIBER_INFIBER;
/* Wait for the fibers to quit, then free the stacks */
while (num_fibers > 0) {
if ((pid = wait(0)) == -1)
exit(1);
/* Find the fiber, free the stack, and swap it with the last one */
for (int i = 0; i < num_fibers; ++i) {
if (fiber_list[i].pid == pid) {
free(fiber_list[i].stack);
if (i != --num_fibers)
fiber_list[i] = fiber_list[num_fibers];
break;
}
}
}
return FIBER_NOERROR;
}
```
整理來說,就是透過 `fiber_wait_all` 來等待 thread 的完成並且回收資源
#### 主程式
```cpp
static void fibonacci()
{
int fib[2] = {0, 1};
printf("Fib(0) = 0\nFib(1) = 1\n");
for (int i = 2; i < 15; ++i) {
int next = fib[0] + fib[1];
printf("Fib(%d) = %d\n", i, next);
fib[0] = fib[1];
fib[1] = next;
fiber_yield();
}
}
static void squares()
{
for (int i = 1; i < 10; ++i) {
printf("%d * %d = %d\n", i, i, i * i);
fiber_yield();
}
}
int main()
{
fiber_init();
fiber_spawn(&fibonacci);
fiber_spawn(&squares);
/* Since these are non-preemptive, we must allow them to run */
fiber_wait_all();
return 0;
}
```
回頭來看 fiber 的 API 是如何被使用的: 透過 `fiber_spawn` 分別建立計算 fib 和 squares 的兩個 fiber,兩個 fiber 在 for loop 每個 iteration 會呼叫 `fiber_yield`,`fiber_yield` 內部透過 [`sched_yield`](https://man7.org/linux/man-pages/man2/sched_yield.2.html) 主動讓出 cpu,因此 fiber 不會執著於將自己的 timeslice 用完,而是每次完成一次的結果計算之後,就讓其他的 fiber 有機會進行別的任務。
### 問題與改進
範例程式最顯著的問題在於 `fibonacci` 和 `squares` 的輸出會有重複、甚至某些字元消失等問題。而原因在於同 memory space 的執行緒下,`printf` 中的 buffer 會共享。一個最容易的作法是透過 lock 來確保 `printf` 總是只有一個 thread 在 context 中。或者可以透過更低階的 function 來產生輸出:
```cpp
#define SAFE_PRINT(fmt, ...) \
do { \
char str[64]; \
int n = sprintf(str, fmt __VA_OPT__(, ) __VA_ARGS__); \
write(STDOUT_FILENO, str, n); \
} while (0)
```
不過不難發現這個實作存在許多改進的空間,例如字串長度的限制等。此外值得一提的是這裡透過 [`__VA_OPT__(, )`](https://gcc.gnu.org/onlinedocs/cpp/Variadic-Macros.html) 使得 `SAFE_PRINT` 可以接受無參數的純字串。
## HTTP daemon
> Material: [2021q1 第 7 週測驗題: 測驗 2](https://hackmd.io/@sysprog/linux2021-quiz7/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHJsyxSFSd)
httpd 展示了一個精簡的多執行緒網頁伺服器,可以接受請求並且回應在 resources 目錄下的 index.html 檔案給 client 端,下面一起來看看實作的細節。
### 實作
#### enum
```cpp
typedef int status_t;
enum {
STATUS_OK = 200,
STATUS_BAD_REQUEST = 400,
STATUS_FORBIDDEN = 403,
STATUS_NOT_FOUND = 404,
STATUS_REQUEST_TIMEOUT = 408,
STATUS_REQUEST_TOO_LARGE = 413,
STATUS_SERVER_ERROR = 500,
};
```
* [List of HTTP status codes](https://en.wikipedia.org/wiki/List_of_HTTP_status_codes)
#### data structures
```cpp
typedef struct __node {
int fd;
struct __node *next;
} node_t;
typedef struct {
node_t *head, *tail;
pthread_mutex_t *head_lock, *tail_lock; /* guards head and tail */
pthread_cond_t *non_empty;
int size; /* only used for connection timeout heuristic */
} queue_t;
```
* 和 [Thread Pool](#Thread-Pool) 章節之 queue 類似的結構,只是 node 中的操作對象變成了 file descriptor `fd`
#### `enqueue`
```cpp
static void enqueue(queue_t *q, int fd)
{
/* Construct new node */
node_t *node = malloc(sizeof(node_t));
node->fd = fd, node->next = NULL;
pthread_mutex_lock(q->tail_lock);
/* Add node to end of queue */
q->tail->next = node;
q->tail = node;
q->size++;
/* Wake any sleeping worker threads */
pthread_cond_signal(q->non_empty);
pthread_mutex_unlock(q->tail_lock);
}
```
* 將 fd 加入 queue 的 tail 中,因為可能有多個 thread 同時要 enqueue 所以需要 mutex 來保護 queue
* [`pthread_cond_signal`](https://linux.die.net/man/3/pthread_cond_signal) 讓 dequeue 時如果 queue 暫時沒有物件可以先 suspend 而不用 busy waiting
#### `dequeue`
```cpp
static void dequeue(queue_t *q, int *fd)
{
node_t *old_head;
pthread_mutex_lock(q->head_lock);
/* Wait until signaled that queue is non_empty.
* Need while loop in case a new thread manages to steal the queue
* element after the waiting thread is signaled, but before it can
* re-acquire head_lock.
*/
while (!q->head->next) /* i.e. q is empty */
pthread_cond_wait(q->non_empty, q->head_lock);
/* Store dequeued value and update dummy head */
old_head = q->head;
*fd = old_head->next->fd;
q->head = q->head->next;
q->size--;
pthread_mutex_unlock(q->head_lock);
free(old_head);
}
```
* 如果 queue 為空(`!q->head->next`),`pthread_cond_wait` 釋放鎖並 suspend,等待 signal
* 取出 head 的 fd,注意 queue 的結構存在一個 dummy node,所以取出的是下一個 node 的 fd 然後釋放本身的 node 結構(好處是 linked list 間的指向調整很容易,但有點不直覺,應該可以稍做修改),同樣需要 mutex 來保護 queue
#### `listening_socket`
```cpp
static int listening_socket()
{
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(PORT);
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
int listenfd = socket_(AF_INET, SOCK_STREAM, 0);
bind_(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr));
listen_(listenfd, BACKLOG);
return listenfd;
}
```
* 利用 `socket_` (`socket_` 是對 [`socket`](https://man7.org/linux/man-pages/man2/socket.2.html) 包含出錯處理的封裝) 建立一個連接的端點(endpoint)
* `AF_INET`: IPv4 Internet protocols
* `SOCK_STREAM`: 使用TCP(資料流)方式提供可靠、雙向、串流的通信頻道
* 利用 `bind_` (對 [`bind`](https://man7.org/linux/man-pages/man2/bind.2.html) 包含出錯處理的封裝) 和 socket 建立綁定
* `struct sockaddr` 結構包含連接 socket 的相關資訊
* `.sin_family`: `AF_INET` = IPv4 Internet protocols
* `.sin_port`: [`htons`](https://linux.die.net/man/3/htons) 將 `PORT` 從 unsigned short 數字轉換成 network byte order
* `.sin_addr.s_addr`: [`htonl`](https://linux.die.net/man/3/htonl) 將 unsigned integer 數字轉換成 network byte order
> [ip](https://man7.org/linux/man-pages/man7/ip.7.html): INADDR_ANY (0.0.0.0) means any address for binding
* 利用 `listen_` (對 [`listen`](https://man7.org/linux/man-pages/man2/listen.2.html) 包含出錯處理的封裝) 去監聽 socket
* `BACKLOG` 是連接請求能夠 pending 的最大數量
#### `greeter_routine`
```cpp
void *greeter_routine(void *arg)
{
struct greeter_args *ga = (struct greeter_args *) arg;
int listfd = ga->listfd;
queue_t *q = ga->q;
struct sockaddr_in clientaddr;
struct timeval timeout;
/* Accept connections, set their timeouts, and enqueue them */
while (1) {
socklen_t clientlen = sizeof(clientaddr);
int connfd =
accept(listfd, (struct sockaddr *) &clientaddr, &clientlen);
if (connfd < 0) {
perror("accept");
continue;
}
```
* 對監聽的 socket 之 file descriptor `listfd` 進行 [`accept()`](https://man7.org/linux/man-pages/man2/accept.2.html) system call,獲得在監聽對列中的首個連接請求
* `clientaddr` 在 system call 後會填入請求的 client 端之相關資訊
* `clientlen` 作為參數時設為 `clientaddr` 結構的大小,返回時則會是返回的 `clientaddr` 內容的實際大小
* 返回新的 socket `connfd` 可以用來跟接受的 client 做 communication
```cpp
/* Basic heuristic for timeout based on queue length.
* Minimum timeout 10s + another second for every 50 connections on the
* queue.
*/
int n = q->size;
timeout.tv_sec = 10;
if (n > 0)
timeout.tv_sec += n / 50;
setsockopt(connfd, SOL_SOCKET, SO_RCVTIMEO, (void *) &timeout,
sizeof(timeout));
enqueue(q, connfd);
}
}
```
* [`setsockopt`](https://linux.die.net/man/3/setsockopt) 設定 socket 的特性
* `SOL_SOCKET` 是設定特性的 protocol level 為 socket level
* `SO_RCVTIMEO` 表示要設定的選項是等待 input(receive) 的 timeout value,通過 `struct timeval` `timeout` 設置
* `enqueue` 將 client 的 fd 加入 queue 中,後續會在 worker thread 中取用
#### `worker_routine`
```cpp
static void *worker_routine(void *arg)
{
pthread_detach(pthread_self());
int connfd, file, len, recv_bytes;
char msg[MAXMSG], buf[1024];
status_t status;
http_request_t *request = malloc(sizeof(http_request_t));
queue_t *q = (queue_t *) arg;
struct stat st;
while (1) {
loopstart:
dequeue(q, &connfd);
memset(msg, 0, MAXMSG);
recv_bytes = 0;
```
* 首先將 thread 本身 [`pthread_detach`](https://man7.org/linux/man-pages/man3/pthread_detach.3.html),detach thread 在結束後會自動釋放回系統而不是由其他 thread 透過 join 來回收
* while loop 中,首先透過 dequeue 來取得之前在 greeter thread 通過 `accept` 得到的 file descriptor
```cpp
/* Loop until full HTTP msg is received */
while (strstr(strndup(msg, recv_bytes), "\r\n\r\n") == NULL &&
strstr(strndup(msg, recv_bytes), "\n\n") == NULL &&
recv_bytes < MAXMSG) {
if ((len = recv(connfd, msg + recv_bytes, MAXMSG - recv_bytes,
0)) <= 0) {
/* If client has closed, then close and move on */
if (len == 0) {
close(connfd);
goto loopstart;
}
/* If timeout or error, skip parsing and send appropriate
* error message
*/
if (errno == EWOULDBLOCK) {
status = STATUS_REQUEST_TIMEOUT;
} else {
status = STATUS_SERVER_ERROR;
perror("recv");
}
goto send;
}
recv_bytes += len;
}
/* Parse (complete) message */
status = parse_request(msg, request);
```
* 利用 [`strndup`](https://en.cppreference.com/w/c/experimental/dynamic/strndup) 複製得到的字串,並且 [`strstr`](https://www.cplusplus.com/reference/cstring/strstr/) 比對是否字串中是否存在 ``"\r\n\r\n"`` 或者 `"\n\n"`,表示已經取得 client message 中的 start-line 和 header 部份
:::warning
應該要修改程式碼以釋放 [`strndup`](https://en.cppreference.com/w/c/experimental/dynamic/strndup) 回傳的字串以避免 memory leak,詳見 [GitHub](https://github.com/RinHizakura/concurrent-programs/blob/master/httpd/httpd.c)
:::
* 內層的 while 迴圈中 [`recv`](https://man7.org/linux/man-pages/man2/recv.2.html) 從 socket 取得 message,如果回傳的 len $\leq$ 0
* 0: client shutdown
* -1: 錯誤發生,檢查 errno 傳遞正確的錯誤信息(`status`)
* 如果取得完整 message,`parse_request` 解析之
```cpp
send:
/* Send initial line */
len = sprintf(msg, "HTTP/1.%d %d %s\r\n", request->protocol_version,
status, status_to_str(status));
send(connfd, msg, len, 0);
```
* 透過 [`send`](https://man7.org/linux/man-pages/man2/send.2.html) 回傳 response 的 status line
```cpp
/* Send header lines */
time_t now;
time(&now);
len = strftime(buf, 1024, "Date: %a, %d %b %Y %H:%M:%S GMT\r\n",
gmtime(&now));
send(connfd, buf, len, 0);
```
* 傳輸 header 內容中的 Date 欄位,[strftime](https://www.cplusplus.com/reference/ctime/strftime/) 透過 `struct tm *` 來 format 字串,該結構可以利用 [`gmtime`](https://www.cplusplus.com/reference/ctime/gmtime/) 取得
```cpp
if (status == STATUS_OK && request->method == GET) {
stat(request->path, &st);
len = sprintf(msg, "Content-Length: %d\r\n", (int) st.st_size);
send(connfd, msg, len, 0);
len = sprintf(msg, "Content-Type: %s\r\n",
type_to_str(request->type));
send(connfd, msg, len, 0);
}
send(connfd, "\r\n", 2, 0);
```
* 正確收到 request 的情形下,傳遞包含 Content-Length([`stat`](https://man7.org/linux/man-pages/man2/lstat.2.html) 得到路徑所指的檔案相關信息)以及 Content-Type
* 最後補上一個 CRLF("\r\n"),準備填入 message body
```cpp
/* If request was well-formed GET, then send file */
if (status == STATUS_OK && request->method == GET) {
if ((file = open(request->path, O_RDONLY)) < 0)
perror("open");
while ((len = read(file, msg, MAXMSG)) > 0)
if (send(connfd, msg, len, 0) < 0)
perror("sending file");
close(file);
}
/* If HTTP/1.0 or recv error, close connection. */
if (request->protocol_version == 0 || status != STATUS_OK)
close(connfd);
else /* Otherwise, keep connection alive and re-enqueue */
enqueue(q, connfd);
}
return NULL;
}
```
* 如果 request 成功,就將路徑下的 `index.html` 內容傳遞出去
* 最後判斷判斷是否要關閉 socket,或者加入 queue
* http 1.0 的連線在完成請求後會斷開,1.1 則可以保持連線(可參考 [HTTP 1.0 / 1.1 / 2.0 的區別](https://iter01.com/548698.html))
#### `parse_request`
```cpp
static status_t parse_request(char *msg, http_request_t *request)
{
char *line;
TRY_CATCH_S(line = strsep_newline(&msg));
TRY_CATCH(parse_initial_line(line, request));
while ((line = strsep_newline(&msg)) != NULL && *line != '\0')
TRY_CATCH(parse_header(line, request));
return STATUS_OK;
}
```
* `TRY_CATCH` 和 `TRY_CATCH_S` 是對呼叫的函式在解析時出現錯誤時可以直接回傳相應的 status
* `strsep_newline` 以 ``"\r\n"`` 或者 `"\n"` 為基準切開字串,因此得到的 `line` 會是 start-line,`parse_initial_line` 解析之
* 接下來的 while 迴圈中逐行解析 header
* 事實上目前的 `parse_header` 甚麼也沒做就是了
#### `parse_initial_line`
```cpp
static status_t parse_initial_line(char *line, http_request_t *request)
{
char *token;
TRY_CATCH_S(token = strsep_whitespace(&line));
TRY_CATCH(parse_method(token, request));
TRY_CATCH_S(token = strsep_whitespace(&line));
TRY_CATCH(parse_path(token, request));
TRY_CATCH_S(token = strsep_whitespace(&line));
TRY_CATCH(parse_protocol_version(token, request));
return STATUS_OK;
}
```
* `strsep_whitespace` 以空白切開 line,因此會得到的 token 應該是 http method
* 第二次 `strsep_whitespace`,預期得到字串 `/index.html`,調整 `request` 結構以設定 resource 底下 index.html 之路徑
* 第三次 `strsep_whitespace`,預期得到 `"HTTP/1.0"` 或者 `"HTTP/1.1"`
#### 主程式
```cpp
int main()
{
queue_t *connections;
pthread_t workers[N_THREADS / 2], greeters[N_THREADS / 2];
/* Get current working directory */
char cwd[1024];
const char *RESOURCES = "/resources";
if (getcwd(cwd, sizeof(cwd) - sizeof(RESOURCES)) == NULL)
perror("getcwd");
/* Assign document root */
DOCUMENT_ROOT = strcat(cwd, RESOURCES);
/* Initalize connections queue */
connections = malloc(sizeof(queue_t));
queue_init(connections);
...
```
* [`getcwd`](https://man7.org/linux/man-pages/man3/getcwd.3.html) 取得 working directory 的路徑,因為後續要接上 `/resources` 路徑所以長度是 `sizeof(cwd) - sizeof(RESOURCES)`
* `queue_init` 初始化 `queue_t` 結構,包含配置 node 的空間和 mutex / condition variable 的初始化
```cpp
/* Initialize listening socket */
int listfd = listening_socket();
/* Package arguments for greeter threads */
struct greeter_args ga = {.listfd = listfd, .q = connections};
/* Spawn greeter threads. */
for (int i = 0; i < N_THREADS / 2; i++)
pthread_create(&greeters[i], NULL, greeter_routine, (void *) (&ga));
```
* `listening_socket` 建立 socket 並對給定 prot 進行監聽
* 建立 greeter thread
```cpp
/* Spawn worker threads. These will immediately block until signaled by
* main server thread pushes connections onto the queue and signals.
*/
for (int i = 0; i < N_THREADS / 2; i++)
pthread_create(&workers[i], NULL, worker_routine, (void *) connections);
pthread_exit(NULL);
return 0;
}
```
* 建立 worker thread
### 小結
在這個範例中,許多技巧在此前的章節其實已經有大致的討論。除了展示了 server 端該透過哪些 API 來建立 socket、監聽連接隊列、接收 request 以外,還值得注意的是在 enqueue 和 dequeue 的操作上,mutex / condition variable 的正確性與效率,如何驗證並改進,還需要更進一步的思考。
### 延伸閱讀
> * [TCP Socket Programming 學習筆記](http://zake7749.github.io/2015/03/17/SocketProgramming/)
> * [Understanding INADDR_ANY for socket programming](https://stackoverflow.com/questions/16508685/understanding-inaddr-any-for-socket-programming)
> * [socket connect() vs bind()](https://stackoverflow.com/questions/27014955/socket-connect-vs-bind)
> * [what is parameter level in getsockopt?](https://stackoverflow.com/questions/26281787/what-is-parameter-level-in-getsockopt)
> * [HTTP/1.1 — 訊息格式 (Message Format)](https://notfalse.net/39/http-message-format)
> * [HTTP/1.1:Response](https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html)
> * [HTTP/1.1 Request](https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html)
## Ring buffer
> Material: [2021q1 第 7 週測驗題: 測驗 3](https://hackmd.io/@sysprog/linux2021-quiz7/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHk4ZxHKHu)
[Ring buffer](https://en.wikipedia.org/wiki/Circular_buffer) 是一個固定長度且頭尾相連的 queue 資料結構。在這個範例中,將展示一個考慮到 [lock-less](https://en.wikipedia.org/wiki/Non-blocking_algorithm),並適用於 SPSC (single-producer and single-consumer) 情境的 ring buffer。
### 實作
#### 資料結構
```cpp
typedef struct {
struct { /** Ring producer status. */
uint32_t watermark; /**< Maximum items before EDQUOT. */
uint32_t size; /**< Size of ring. */
uint32_t mask; /**< Mask (size - 1) of ring. */
volatile uint32_t head, tail; /**< Producer head and tail. */
} prod __attribute__((__aligned__(CACHE_LINE_SIZE)));
struct { /** Ring consumer status. */
uint32_t size; /**< Size of the ring. */
uint32_t mask; /**< Mask (size - 1) of ring. */
volatile uint32_t head, tail; /**< Consumer head and tail. */
} cons __attribute__((__aligned__(CACHE_LINE_SIZE)));
void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE)));
} ringbuf_t;
```
* `head` 和 `tail` 的設計是為了滿足 [lock-less](https://en.wikipedia.org/wiki/Non-blocking_algorithm) 的考量,在某個時間點,如果 `head` 和 `tail` 的值不同,應表示生產者/消費者正在工作中,則消費者/生產者可以知道自己可以取用的範圍
* 舉例來說,以生產者的角度而言,從 `cons.tail` 往下數到 `cons.head` 是消費者正在取用,尚不可生產新東西的範圍
* 注意 `head` 和 `tail` 的數值範圍與給定的 size 無關,恆為 0 ~ $2^{32}$ - 1,然而受益於 `size` 必須是 `2^n` 大小的限制,可以透過 `mask` 是 `size - 1` 的結果來 index 正確的 queue 位置而不會 overflow
* `ringbuf_t` 結構中包含 buffer 本身(`ring`),其中儲存的元素單位是指標,且指定為 `CACHE_LINE_SIZE` 對齊,可以降低 [false sharing](https://hackmd.io/@RinHizakura/rJTl9K5tv#Cache-and-False-Sharing) 的問題
:::warning
除了 false sharing 以外,對齊 cache line 還有其他好處嗎?
:::
#### `ringbuf_create`
```cpp
ringbuf_t *ringbuf_create(const unsigned count)
{
ssize_t ring_size = ringbuf_get_memsize(count);
if (ring_size < 0)
return NULL;
ringbuf_t *r = malloc(ring_size);
if (r)
ringbuf_init(r, count);
return r;
}
```
* `ringbuf_get_memsize` 確認要求的 buffer 大小是否合法,並回傳包含 meta data 和 buffer 本身所需的大小
* `ringbuf_init` 初始化 ring buffer 的相關成員
#### `ringbuf_get_memsize`
```cpp
ssize_t ringbuf_get_memsize(const unsigned count)
{
/* Requested size is invalid, must be power of 2, and do not exceed the
* size limit RING_SIZE_MASK.
*/
if ((!IS_POWEROF2(count)) || (count > RING_SIZE_MASK))
return -EINVAL;
ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *);
sz = ALIGN_FLOOR(sz, CACHE_LINE_SIZE);
return sz;
}
```
* 大小限制為 $2^n$ 且不能超過上限
* 回傳 buffer 本身加上其他相關資料的大小
:::danger
這裡 `ALIGN_FLOOR` 的目的為何?不應該是 `ALIGN_CEIL` 嗎?
* `sizeof(ringbuf_t)` = 128,總大小為 $128 + (2^{count} \times 8)$
* 因此如果 $n \geq 3$,`ALIGN_FLOOR` 不改變 sz
* $n < 3$ 則不會分配給 `ring` 的空間
:::
#### `ringbuf_is_full`
```cpp
static inline bool ringbuf_is_full(const ringbuf_t *r)
{
uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}
```
* 如果 consumer 和 producer 間位置差距為 `mask` 表示 ring buffer 為滿
* 因此 `(cons_tail + ~prod_tail) & mask) == 0`
* 又 `-prod_tail` = `~prod_tail + 1`,因此條件如回傳值
#### `ringbuf_is_empty`
```cpp
static inline bool ringbuf_is_empty(const ringbuf_t *r)
{
uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
return cons_tail == prod_tail;
}
```
* 如果 consumer 和 producer 相同則表示為空
* 因此可以注意到 buffer 中會有一個位置永遠被空下來,藉此區別 empty 和 full 的差異
#### `ringbuffer_sp_do_enqueue`
```cpp
static inline int ringbuf_sp_enqueue(ringbuf_t *r, void *obj)
{
return ringbuffer_sp_do_enqueue(r, &obj, 1);
}
```
* `ringbuf_sp_enqueue` 間接呼叫 `ringbuffer_sp_do_enqueue`,後者是主要將指標加入 buffer 的函數
```cpp
static inline int ringbuffer_sp_do_enqueue(ringbuf_t *r,
void *const *obj_table,
const unsigned n)
{
uint32_t mask = r->prod.mask;
uint32_t prod_head = r->prod.head, cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32-bits value (the result
* is always modulo 32 bits even if we have prod_head > cons_tail). So
* @free_entries is always between 0 and size(ring) - 1.
*/
uint32_t free_entries = mask + cons_tail - prod_head;
/* check that we have enough room in ring */
if ((n > free_entries))
return -ENOBUFS;
```
* `prod_head - cons_tail` 是 consumer 還等待消費的數量,而 `mask` 是 queue 中可允許的最大數量,因此要加入 queue 的總數 `n` 不能超過可以生產的最大量 `mask - (prod_head - cons_tail)`
```cpp
uint32_t prod_next = prod_head + n;
r->prod.head = prod_next;
/* write entries in ring */
ENQUEUE_PTRS();
__compiler_barrier();
r->prod.tail = prod_next;
/* if we exceed the watermark */
return ((mask + 1) - free_entries + n) > r->prod.watermark ? -EDQUOT : 0;
}
```
* `prod_next` 是正式生產完成時的 index 位置,首先更新 `head` 表示未來會生產到的地方
* `ENQUEUE_PTRS()` 是真正 enqueue 指標的關鍵程式碼
* ` __compiler_barrier()` 展開為 `asm volatile("" : : : "memory");`,後者是 inline assembly 透過 compiler 建立 memory barrier,在 barrier 前後的程式順序不會反轉
* 生產完成,更新 `tail` 表示消費者可取用至此新位置
#### `ENQUEUE_PTRS`
```cpp
#define ENQUEUE_PTRS() \
do { \
const uint32_t size = r->prod.size; \
uint32_t i, idx = prod_head & mask; \
if (idx + n < size) { \
for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \
r->ring[idx] = obj_table[i]; \
r->ring[idx + 1] = obj_table[i + 1]; \
r->ring[idx + 2] = obj_table[i + 2]; \
r->ring[idx + 3] = obj_table[i + 3]; \
} \
switch (n & 0x3) { \
case 3: \
r->ring[idx++] = obj_table[i++]; \
__attribute__((fallthrough)); \
case 2: \
r->ring[idx++] = obj_table[i++]; \
__attribute__((fallthrough)); \
case 1: \
r->ring[idx++] = obj_table[i++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++) \
r->ring[idx] = obj_table[i]; \
for (idx = 0; i < n; i++, idx++) \
r->ring[idx] = obj_table[i]; \
} \
} while (0)
```
* 計算出插入的起點 `idx`
* 如果 `idx` 不需要 round 回 buffer 起點(`if (idx + n < size)...`),則隨 `idx++` 逐個加入即可
* 反之(`else...`),則要分成兩個步驟將指標加入 buffer
:::info
dequeue 的部份與 enqueue 的重點大致相同,因此省略細節的說明
:::
### 小結
在此範例中,我們看到了一個可應用於 SPSC 情境下的 ringbuffer。程式中透過 head 和 tail 的操作,確保 producer 或 consumer 可以在不需要使用 lock 的條件下正確的操作 buffer 中的內容。這應可以有效的同步讀寫兩端的操作。當然,需要更嚴謹的實驗來驗證程式的正確性和效能提升。
此外也值得思考的問題是,我們該如何接其擴展至 single producer + multiple consumer (SPMC) 或 multiple producer + multiple consumer (MPMC) 呢?
## Message bus
> Material: [2021q1 第 7 週測驗題: 測驗 4](https://hackmd.io/@sysprog/linux2021-quiz7/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FryZQlSYru)
本範例中 `mbus.c` 展示了一種特殊的 [message bus](https://en.wikipedia.org/wiki/Message_broker) 設計,thread A 可以對預先註冊對信息處理的 [callback function](https://en.wikipedia.org/wiki/Callback_(computer_programming)),之後,thread B 可以向 thread A 傳遞訊息。有趣的是,在 thread B 所傳送的信息將在 thread A 中註冊的 callback 被處理,後續我們將會討論這個設計可能具備的優勢。
### 實作
#### `bus_t`
```cpp
typedef struct {
bool registered;
unsigned int refcnt;
bus_client_cb_t callback;
void *ctx;
} bus_client_t;
typedef struct {
bus_client_t *clients;
const unsigned int n_clients;
} bus_t;
```
`bus_t` 是 mbus 的主體結構,其中成員包含:
* `bus_client_t`: 會被動態配置成 `bus_client_t` 的陣列,共有 `n_clients` 個元素
* `n_clients`: `bus_t` 可接受的最大 client 註冊數量
`bus_client_t` 則是存在 mbus 中的每個 client:
* `registered`: 表示這個 client 是否處於註冊狀態(可對其傳遞信息)
* `refcnt`: 是嘗試傳遞給這個 client 的 thread 數量
* `callback`: 當 client 收到訊息時需要執行的 function pointer
* `ctx`: 是 client 要執行自己的 callback 時攜帶的參數,例如在此案例中是自己的 `bus_client_id_t`
#### `CAS`
```cpp
#define CAS(dst, expected, value) \
__atomic_compare_exchange(dst, expected, value, 0, __ATOMIC_SEQ_CST, \
__ATOMIC_SEQ_CST)
```
* CAS 是對 `__atomic_compare_exchange` 的封裝,後者會比對 `dst` 與 `expected` 的指標內容是否相同,如果是則用 `value` 取代之
#### `bus_new`
```cpp
bool __attribute__((warn_unused_result))
bus_new(bus_t **bus, unsigned int n_clients)
{
if (n_clients > BUS_MAX_CLIENTS)
return false;
bus_t *b;
if (!(b = malloc(sizeof(bus_t))))
return false;
/* Initialize bus struct */
*(unsigned int *) &b->n_clients =
!n_clients ? BUS_DEFAULT_CLIENTS : n_clients;
if (!(b->clients = calloc(b->n_clients, sizeof(bus_client_t)))) {
free(b);
return false;
}
*bus = b;
return true;
}
```
配置 `bus_t` 的空間與設置和相關成員及成員之空間:
* `__attribute__((warn_unused_result))` 確保回傳的變數不被使用時,編譯器會輸出警告信息,因此使用此 API 需要進行 error handling
#### `bus_register`
```cpp
bool __attribute__((warn_unused_result, nonnull(1)))
bus_register(bus_t *bus,
bus_client_id_t id,
bus_client_cb_t callback,
void *ctx)
{
if (id >= bus->n_clients)
return false;
bus_client_t null_client = {0};
bus_client_t new_client = {
.registered = true,
.callback = callback,
.ctx = ctx,
.refcnt = 0,
};
return (bool) CAS(&(bus->clients[id]), &null_client, &new_client);
}
```
根據 `bus_client_id_t` 向 `bus_t` 結構註冊一個 callback function:
* `nonnull(1)` 要求第一個參數 `*bus` 不可為 NULL
* `bus_client_id_t` 實際上是一個 unsigned int,因為每個 thread 都有獨立的 `bus_client_id_t`,因此可以用來代表註冊之 thread 的身份編碼,同時也對應一個專屬的 `bus->clients` 位置
* `bus_client_cb_t` 是一個 function pointer,其參數及回傳形式規範為 `typedef void (*bus_client_cb_t)(void *ctx, void *msg);`,`*ctx` 可以攜帶 callback 需要的額外參數(或設為 NULL),`*msg` 則是傳遞的信息本身
* `CAS` 確保對應 id 的 client 尚未被註冊,以新的結構 `new_client` 註冊之
#### `bus_send`
```cpp
bool __attribute__((warn_unused_result, nonnull(1)))
bus_send(bus_t *bus, bus_client_id_t id, void *msg, bool broadcast)
{
if (broadcast) {
for (id = 0; id < bus->n_clients; ++id)
execute_client_callback(&(bus->clients[id]), msg);
return true;
}
if (id >= bus->n_clients)
return false;
return execute_client_callback(&(bus->clients[id]), msg);
}
```
訊息的傳遞可以透過兩種方式進行,一種是 broadcast 方法,會將所有有註冊的 client 之 callback 皆執行,另一種則是針對指定的 client id 傳遞。兩者都是透過 `execute_client_callback` 對指定的 client 進行訊息傳遞。
#### `execute_client_callback`
```cpp
static bool execute_client_callback(bus_client_t *client, void *msg)
{
/* Load the client with which we are attempting to communicate. */
bus_client_t local_client;
__atomic_load(client, &local_client, __ATOMIC_SEQ_CST);
/* Loop until reference count isupdated or client becomes unregistered */
while (local_client.registered) {
/* The expected reference count is the current one + 1 */
bus_client_t new_client = local_client;
++(new_client.refcnt);
/* If CAS succeeds, the client had the expected reference count, and
* we updated it successfully. If CAS fails, the client was updated
* recently. The actual value is copied to local_client.
*/
if (CAS(client, &local_client, &new_client)) {
/* Send a message and decrease the reference count back */
local_client.callback(local_client.ctx, msg);
__atomic_fetch_sub(&(client->refcnt), 1, __ATOMIC_SEQ_CST);
return true;
}
}
/* Client was not registered or got unregistered while we attempted to send
* a message
*/
return false;
}
```
`execute_client_callback` 在 `bus_send` 中被呼叫以完成主要的傳送請求。
* `__atomic_load` 首先載入 client 到 `local_client` 中,並建立副本的 `new_client` 將 `refcnt` 加一,表示一個對 client 的新請求
* 透過 `CAS`,如果 `local_client` 與目前的 `client` 內容相同,表示過程中沒有其他 thread 也在使用同一個 client,因此副本的結果更新到 client 主體、執行 callback、並且將 `refcnt` 減一
#### `bus_unregister`
```cpp
bool __attribute__((warn_unused_result, nonnull(1)))
bus_unregister(bus_t *bus, bus_client_id_t id)
{
if (id >= bus->n_clients)
return false;
/* Load the client we are attempting to unregister */
bus_client_t local_client, null_client = {0};
__atomic_load(&(bus->clients[id]), &local_client, __ATOMIC_SEQ_CST);
/* It was already unregistered */
if (!local_client.registered)
return false;
```
`bus_unregister` 取消指定的 `id` 對 `bus` 的註冊
* 首先載入 id 對應的 client 到 `local_client`,並確定原先是有被註冊的
```cpp
do {
local_client.refcnt = 0; /* the expected reference count */
/* If CAS succeeds, the client had refcnt = 0 and got unregistered.
* If CAS does not succeed, the value of the client gets copied into
* local_client.
*/
if (CAS(&(bus->clients[id]), &local_client, &null_client))
return true;
} while (local_client.registered);
/* Someone else unregistered this client */
return true;
}
```
* 可以合法取消的 client 必須滿足與載入時的 `local_client` 除了 `refcnt` 相同,`refcnt` 需要被設為 0,表示已經沒有再發向 client 的請求,因此可以取消註冊了
#### `thread_func`
```cpp
static void *thread_func(void *_data)
{
thread_data_t *data = (thread_data_t *) _data;
bus_client_id_t dest = (data->id + 1) % NUM_THREADS;
/* Register our callback */
if (!bus_register(data->bus, data->id, &bus_callback, &(data->id))) {
perror("bus_register");
return NULL;
}
printf("Registered callback from thread %u\n", data->id);
/* Loop until the destination is registered from a separate thread */
while (!bus_send(data->bus, dest, &(data->id), false))
;
if (bus_unregister(data->bus, dest))
return NULL;
return NULL;
}
```
`thread_func` 是 thread 的主要任務,綜合前面對 mbus 相關函式的解釋,這裡的流程為:
* 每個 thread 都註冊自己成為 client
* 並且向嘗試另一個 thread client(為自己的 id 加上 1 % `NUM_THREADS`),直到成功為止,而傳送的訊息是自己的 id
* 最後,將該 client 從 bus 中取消註冊
#### 主程式
```cpp
int main()
{
pthread_t threads[NUM_THREADS];
thread_data_t ctx[NUM_THREADS];
bus_t *bus;
if (!bus_new(&bus, 0)) {
perror("bus_new");
exit(EXIT_FAILURE);
}
/* Launch threads, each with their own context containing a reference to the
* bus and their ID
*/
for (int i = 0; i < NUM_THREADS; ++i) {
ctx[i].bus = bus, ctx[i].id = i;
if (pthread_create(&threads[i], NULL, thread_func, &ctx[i]))
perror("pthread_create");
}
/* Wait until completion */
for (int i = 0; i < NUM_THREADS; ++i) {
if (pthread_join(threads[i], NULL))
perror("pthread_join");
}
bus_free(bus);
return 0;
}
```
最後,主程式的部份並不難理解,先初始化 `bus` 相關的空間含成員之後,建立 `NUM_THREADS` 數量的 thread 並執行 `thread_func` 的任務,最後再透過 `pthread_join` 回收這些 thread。
### 小結
整個 mbus 的設計並沒有太過難以理解的地方,大致是透過共用的 `bus` 結構,讓 thread 之間可以意識到彼此並交換信息,透過註冊 client 端的 callback function,每個 client 可以制定獨立的方式來處理接收到的 message。
*---- 以下僅為個人想法,需要相關資料或實驗驗證是否正確 ----*
比較特別的地方是其 callback 設計,對於從 sender 發送到 client 的信息,可以直接在發送端的 thread context 底下就進行處理(完全或部份),而不是等到切換至 client 自己的 thread context 。如果在 client 端需要處理的 message 數量很多時,這個作法應可以把成本分散到其他的 thread。或者是當某些 message 需要被迅速的回應(response)時,也許不需要等到 client 的 thread context,可以在 sender 端直接處理之。
*----------*
## SPMC queue
> Material: [2021q1 第 8 週測驗題: 測驗 1](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FB1pE7D-8d)
在 [Ring buffer](#Ring-buffer) 章節,展示了 SPSC(single-producer, single-consumer) 的模型。在這個章節,則展示 SPMC(single-producer, multiple-consumer) 的範例。
### C11 atomics
在開始閱讀程式碼前,先介紹在 C11 標準中新增的關鍵字: [`_Atomic`](https://en.cppreference.com/w/c/language/atomic),在 [C 語言規格書](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1548.pdf?fbclid=IwAR2k5K3ZPS4CzQ9AbVS96QD-5N2UQyE23Ui4ic270JX5Df3pEXJkg1cBDHA) 中,我們可以看到對此關鍵字的詳細規範。
在 [Time to move to C11 atomics?](https://lwn.net/Articles/691128/) 可以看到此關鍵字帶來的重要性與好處,因為 compiler 可能會重排指令以優化程式的運行,在並行(concurrent)的多執行緒程式中,由於操作的是相同的記憶體空間,這可能帶來非預期的結果。因此,我們會需要適當的語法以保證編譯器遵守預期的記憶體存取順序,在此之外仍然可以極大化的優化程式。藉由 `_Atomic` 關鍵字,在不同執行緒中對同一變數操作的 data race 得以被避免
### 實作
#### `spmc_ref_t`
```cpp
typedef struct __spmc_node {
size_t cap; /* One more than the number of slots available */
_Atomic size_t front, back;
struct __spmc_node *_Atomic next;
uintptr_t buf[];
} spmc_node_t;
typedef void (*spmc_destructor_t)(uintptr_t);
struct spmc_base {
/* current node which enqueues/dequeues */
spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
uint8_t last_power;
spmc_destructor_t destructor;
};
typedef struct spmc_base *spmc_ref_t;
```
`struct spmc_base` 是 buffer 的基礎結構,在程式中會由全域變數向其操作 `*spmc_ref_t` 其成員包含:
* `curr_enqueue` 和 `curr_dequeue` 都是 `_Atomic` 類型的 `spmc_node_t *`,指向在 queue 中要消費或者生產的下一個節點
* `destructor` 是在 `spmc_delete` 的回收階段需要額外執行的函式,spmc 的使用者根據使用方式,需要進行必要的回收
* `last_power`: 是最近一次分配新的空間給 `spmc_node_t` 的大小 `cap`=$2^{last\_{power}}$
而 `spmc_node_t` 是包含 buffer 的節點,這些 node 同時還會形成一個 linked list,當對 node 中的 buffer 已達生產上限(尚未被消費掉),就產生一個新的 node 並配置一個更大的 buffer(詳細搭配後面的程式):
* `buf[]` 是實際要操作的元素之 buffer,這是 [Arrays of Length Zero](https://gcc.gnu.org/onlinedocs/gcc/Zero-Length.html) 語法,允許對一個 struct 分配變數長度大小的空間
* `front` 是消費者對該 node 的 buffer 所操作的下個 index,`back` 是生產者對該 node 的 buffer 所操作的下個 index,兩者需要搭配使用確認 buffer 是否是空或滿狀態
* `next` 是下一個 buffer 節點
* `cap` 是該 buffer 的空間大小
#### `spmc_new`
```cpp
spmc_ref_t spmc_new(size_t initial_cap, spmc_destructor_t destructor)
{
assert(initial_cap < sizeof(size_t) * CHAR_BIT);
const uint8_t power = initial_cap ? initial_cap : DEFAULT_INITIAL_POWER;
const size_t cap = 1 << power;
/* Allocate spmc_base and head spmc_node in the same underlying buffer */
spmc_ref_t spmc = malloc(
SIZE_FROM_CAP(cap, sizeof(struct spmc_base) + sizeof(spmc_node_t)));
spmc_node_t *const head = HEAD_OF(spmc);
init_node(head, head, cap);
```
`spmc_new` 為一個 `spmc_ref_t` 類型的結構配置需要的空間及成員
* `power` 可以是預設(`initial_cap==0`)或者根據給定的 `initial_cap` 來選擇初始 buffer 的容量 $2^{power}$
* `SIZE_FROM_CAP(cap, offset)` 展開為 `((cap) * sizeof(uintptr_t) + (offset))` 因此會分配出一個 `struct spmc_base` 大小 + (`spmc_node_t` 大小的 linked list 首個節點跟其下的 `buf` 需要的 `cap` 個 `uintptr_t`
* `HEAD_OF` 將 `spmc` 的地址加上一個 `struct spmc_base` 大小 + 1,因此可以找到 `spmc_node_t` 節點的位置
* `init_node(head, head, cap)` 初始化該節點,將 `head->next` 指向自己並設定其下的 `buf` 大小 `cap`
```cpp
atomic_init(&spmc->curr_enqueue, head);
atomic_init(&spmc->curr_dequeue, head);
spmc->destructor = destructor;
spmc->last_power = power;
return spmc;
}
```
* 最後初始化 `spmc` 成員所包含的成員
* `curr_enqueue` 和 `curr_dequeue` 都先指向第一個節點
#### `spmc_enqueue`
```cpp
bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element)
{
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed);
size_t idx;
retry:
idx = atomic_load_explicit(&node->back, memory_order_consume);
if (!IS_WRITABLE(idx, node)) {
spmc_node_t *const next =
atomic_load_explicit(&node->next, memory_order_relaxed);
/* Never move to write on top of the node that is currently being read;
* In that case, items would be read out of order they were enqueued.
*/
if (next !=
atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) {
node = next;
goto retry;
}
```
`spmc_enqueue` 將 `element` 加入 buffer,注意只有一個生產者需要做 `enqueue` 操作
* `atomic_load_explicit` atomically 的載入 `spmc->curr_enqueue` 指向的節點
:::info
`memory_order_relaxed` 是最鬆散的 memory order,僅保證 load / store 本身的 atomical,對於 load / store thread 之間的指令順序的同步則無保證
:::
* 使用 `atomic_load_explicit` 載入 `node->back` 的值
:::info
`memory_order_acquire` (for load) 與 `memory_order_release` (for write) 成對使用以建立 Release-Acquire ordering,效果為禁止指令的重排:
* 在 load 以後的讀寫將被禁止重排到自己之前
* 在 store 以前的讀寫將被禁止重排自己之後
主要效果為: 確保在 thread A 的 store 之前對記憶體的操作,對 thread B 的 load 是可見的。
`memory_order_consume` (for load) 與 `memory_order_release` (for write) 成對使用以建立 Release-Consume ordering,後者相較於 Release-Acquire ordering 的限制更弱,因此可以更好的最佳化。在 load 以後與操作變數有 dependency 的其他讀寫將被限制重排,但沒有 dependency 的操作則不在此限制之列。
> [The Purpose of memory_order_consume in C++11](https://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/)
:::
* `IS_WRITABLE` 檢查讀出來的 `node->back` 與 `node->front` 是否相差 `node->cap`,若是表示該 node 中的 queue 為滿狀態,需要建立一個新的 node
* 若已滿,需要找到 `next` 是正在 dequeue 的節點(`spmc->curr_dequeue`指向),並把新節點安插在這個 `next` 以前
:::warning
`IS_WRITABLE` 對 `node->front` 的操作是沒有 atomic 的?
:::
```cpp
const uint8_t power = ++spmc->last_power;
assert(power < sizeof(size_t) * CHAR_BIT);
const size_t cap = 1 << power;
spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t)));
if (!new_node)
return false;
init_node(new_node, next, cap);
atomic_store_explicit(&node->next, new_node, memory_order_release);
idx = 0;
node = new_node;
}
...
```
* 新的節點承載的 queue 大小為 2^(last_power + 1),last_power 也要進行更新
* 從節點 link 的過程可以看出應會形成一個單向的 circular linked list
```cpp
node->buf[INDEX_OF(idx, node)] = element;
atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed);
atomic_fetch_add_explicit(&node->back, 1, memory_order_release);
return true;
```
* 將 `element` 置入 buffer 中並更新 `spmc->curr_enqueue` 的位置,並更新 `node->back` + 1
#### `spmc_dequeue`
```cpp
bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot)
{
spmc_node_t *node =
atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume);
size_t idx;
no_increment:
do {
idx = atomic_load_explicit(&node->front, memory_order_consume);
if (!IS_READABLE(idx, node)) {
if (node != spmc->curr_enqueue)
atomic_compare_exchange_strong(
&spmc->curr_dequeue, &node,
atomic_load_explicit(&node->next, memory_order_relaxed));
goto no_increment;
} else
*slot = node->buf[INDEX_OF(idx, node)];
} while (
!atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1));
return true;
}
```
`spmc_dequeue` 消費 buffer 中的元素,spmc 情境下會有多個 thread 同時在進行 `spmc_dequeue`
* `atomic_load_explicit` atomically 的載入 `spmc->curr_dequeue` 指向的節點
* 使用 `atomic_load_explicit` 載入 `node->front` 的值
* `IS_READABLE(idx, node)` 檢查讀出來的 `node->front` 與 `node->back` 是否相同,這表示 queue 的狀態為空
* 如果 queue 不為空,則直接取出 buffer 中的元素即可
* 如果 queue 為空且 `spmc->curr_enqueue` 指向的也是目前的 `spmc->curr_dequeue` 所指向,則只能 goto 持續等待 queue 被填入
* 否則透過 `atomic_compare_exchange_strong` 嘗試更新 `spmc->curr_dequeue` 到下個 node,之所以需要 compare-and-swap 因為是 multiple consumer 的 `spmc->curr_dequeue` 會有多個 thread 嘗試改寫
* 最後要 compare-and-swap 更新 `node->front` 將其加一,確保 consumer 互相不會重複的取到同個 element
#### `spmc_delete`
```cpp
void spmc_delete(spmc_ref_t spmc)
{
const spmc_node_t *const head = HEAD_OF(spmc);
spmc_node_t *prev;
if (spmc->destructor) {
for (spmc_node_t *node = head->next; node != head;
prev = node, node = node->next, free(prev))
for (size_t i = node->front; IS_READABLE(i, node); ++i)
spmc->destructor(node->buf[i]);
} else {
for (spmc_node_t *node = head->next; node != head;
prev = node, node = node->next, free(prev))
;
}
/* Also frees the head; it resides reside in the same buffer. */
free(spmc);
}
```
將整個 `spmc_ref_t` 結構的相關動態配置空間進行釋放
* 如果沒有初始化階段沒有指定 destructor,則只是逐一釋放 `head` 以外的節點釋放
* 需記得 `head` 是在 `spmc_ref_t` 中一起配置的
* 如果有設定 destructor,需要對 buffer 中每個元素運行之
#### `producer_thread`
```cpp
static void *producer_thread(void *arg)
{
spmc_ref_t spmc = arg;
for (uintptr_t i = 0; i < N_ITEMS; ++i) {
if (!spmc_enqueue(spmc, i))
fprintf(stderr, "Failed to enqueue on %zu.\n", (size_t) i);
}
return NULL;
}
```
`producer_thread` 只有一個被建立,核心的 `spmc_enqueue` 將多個 item 加入到 queue 中
#### `mc_thread`
```cpp
static void *mc_thread(void *arg)
{
spmc_ref_t spmc = arg;
uintptr_t element = 0, greatest = 0;
for (;;) {
greatest = (greatest > element) ? greatest : element;
if (!spmc_dequeue(spmc, &element))
fprintf(stderr, "Failed to dequeue in mc_thread.\n");
else if (observed_count[element]++)
fprintf(stderr, "Consumed twice!\n");
else if (element < greatest)
fprintf(stderr, "%zu after %zu; bad order!\n", (size_t) element,
(size_t) greatest);
printf("Observed %zu.\n", (size_t) element);
/* Test for sentinel signalling termination */
if (element >= (N_MC_ITEMS - 1)) {
spmc_enqueue(spmc, element + 1); /* notify other threads */
break;
}
}
return NULL;
}
```
多個 `mc_thread` 被建立,`observed_count` 作為初始為 0 的陣列會紀錄每次 dequeue 得到的內容以檢查正確運行的情況
* 沒有重複的 dequeue 結果
* 每次 dequeue 的結果值需大於前一次的值
一直取出 element 到第一個 `>= N_MC_ITEMS - 1` 的值的時候,該 thread 會結束,且 thread 結束前會把 element 再加入回 queue 中讓其他 thread 之後也會結束。
#### 主程式
```cpp=
#define N_MC_THREADS 16
int main()
{
spmc_ref_t spmc = spmc_new(0, NULL);
pthread_t mc[N_MC_THREADS], producer;
pthread_create(&producer, NULL, producer_thread, spmc);
for (int i = 0; i < N_MC_THREADS; i++)
pthread_create(&mc[i], NULL, mc_thread, spmc);
pthread_join(producer, NULL);
for (int i = 0; i < N_MC_THREADS; i++)
pthread_join(mc[i], NULL);
for (size_t i = 0; i < N_MC_ITEMS; ++i) {
if (observed_count[i] == 1)
continue;
fprintf(stderr, "An item seen %zu times: %zu.\n", observed_count[i], i);
}
spmc_delete(spmc);
return 0;
}
```
綜合前面的各個函式之解釋,從主程式中可看到,一個生產者的 `producer_thread` 與 16 個消費者 `mc_thread` 被建立,如果結果如預期的話,`observed_count` 的每個元素值應皆為 1,表示每個生產的元素皆被且僅被消費一次。
### ThreadSanitizer 的警告訊息
1. 使用以下選項執行時,TSAN 是沒有檢查到錯誤的,故可知問題高機率是發生在 memory ordering 的錯誤設置。
```
TSAN_OPTIONS="force_seq_cst_atomics=1" ./spmc
```
2. 發現修改 `spmc_dequeue` 的 `atomic_load_explicit(&node->next, memory_order_relaxed)` 的 memory oreder 為 `memory_order_consume` 就沒有 TSAN 的錯誤訊息了,然而:
* 直接編譯成 assembly 時(`gcc -S -o spmc.s spmc.c`) 兩個 order 產生之結果沒有差異
### 小結
在這個章節中,我們可以比較 SPMC 與 [SPSC](#Ring-buffer) 在設計上的不同考量,除了生產者與消費者間的同步之外,還需要注意到不同消費者之間的同步。相較於直接使用 lock,如果可以善用 atomic operation,且設置正確的 memory order ,則可以兼顧優化與並避免編譯器重排可能導致的 data race,設計出高效率的 SPMC queue。
:::warning
由於對 memory order 的了解尚不夠深刻,因此目前我對 memory order 在此章節程式碼的使用無法十分詳盡的總結。未來若有時間與機深入探討再回頭做補充 QQ
:::
### 延伸資料
* [Atomic operations library](https://en.cppreference.com/w/c/atomic)
* [std::memory_order](https://en.cppreference.com/w/cpp/atomic/memory_order)
* [Memory Model Basic](https://levelup.gitconnected.com/memory-model-basic-d8b5f8fddd5f)
* [如何理解 C++11 的六种 memory order?](https://www.zhihu.com/question/24301047)
## Map-Reduce
> Material: [2021q1 第 8 週測驗題: 測驗 2](https://hackmd.io/@sysprog/linux2021-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkbyTdGUd)
### Functional Programming
> 直接引用 [案例探討: Map-Reduce](https://hackmd.io/@sysprog/concurrency-mapreduce):
>
> *Functional Programming (以下簡稱 FP) 是種 programming paradigm (開發典範),不是 design pattern 也不是 framework,更不是 language。簡單來說,FP 是種以數學函數為中心的「思考方式」與「程式風格」。*
#### 常見操作
在 FP 中,定義一個 list 為
```
listof ∗ ::= nil | cons ∗ (listof ∗)
```
這表示一個列表(list)可以是 `nil`,表示無元素的列表。或者是一個元素 `*` 與另一個列表的 Cons。通常列表僅以簡單以方括號(`[` 與 `]`) 表示,從以下的範例更清楚的展示如何定義 list:
```
[] 表示 nil
[1] 表示 cons 1 nil
[1 2 3] 表示 cons 1 ( cons 2 ( cons 3 nil ))
```
則對於 FP 中常見的 reduce / map / filter 操作:
* reduce: `reduce f a` 可以理解為將 list 表示的 `cons` 替換成 `f` 且將 `nil` 替換成 `a`
* map: `map f` 操作為使 f 作用於 list 的每個元素上,定義覆合函數 `.` 為 `(f . g) h = f(g h)`,則可以理解為 `map f = reduce (cons . f) nil`
* filter: 檢索與篩選 list 中的每個元素
#### Side Effect
FP 沒有**隱含 (implicit)** 的 [Side Effect](https://en.wikipedia.org/wiki/Side_effect_(computer_science))。Side effect 簡單來說保證了每次呼叫函數時,對於相同的輸入,總可以保證傳回的數值不會改變。
#### Lazy evaluation
FP 具有 [lazy evaluation](https://en.wikipedia.org/wiki/Lazy_evaluation) 的特性。舉例來說,對一個程式 `(g.f)` 提供一個輸入 `input`,則 `input` 會經過 f 之後輸出,且此輸出成為 g 的輸入得到最終結果:
```
g (f input)
```
則 FP 中會有以下特性
* 在 FP 中,f 和 g 需要嚴格的同步運行。只有試圖讀取 g 的 output 時,才需要去啟動 f
* 因為沒有 side effect,所以可以把每個 input 對應的結果紀錄下來,則下次再遇到相同 input 的時候可以直接使用
### 實作
此章節中展示了藉由 POSIX Thread 實作的 MapReduce 風格之 word count 程式碼。
#### `mr_init`
```cpp
int mr_init(void)
{
if (pthread_barrier_init(&barrier, NULL, n_threads)) {
perror("barrier init");
return -1;
}
if (fa_init(file_name, n_threads, &file_size)) return -1;
if (wc_init(n_threads, file_size / MAX_WORD_SIZE)) return -1;
return 0;
}
```
* [`pthread_barrier_init`](https://linux.die.net/man/3/pthread_barrier_init) 建立一個 thread 之間的同步機制,只有當所有 thread 都抵達這個 barrier 之後,到達此除的 thread 才可以繼續向下執行,參數 `n_threads` 表示要有此數量的 thread 皆呼叫 `pthread_barrier_wait()` 才允許通過
#### `fa_init`
```cpp
static inline int fa_init(char *file, uint32_t n_threads, off_t *fsz)
{
/* Opening file */
if ((fd = open(file, O_RDONLY)) < 0) {
perror("open");
return -1;
}
/* Get file size */
if ((file_size = lseek(fd, 0, SEEK_END)) < 0) {
perror("lseek");
return -1;
}
file_content = mmap(NULL, file_size, PROT_READ, MMAP_FLAGS, fd, 0);
if (file_content == MAP_FAILED) file_content = NULL;
*fsz = file_size;
return 0;
}
```
由於每個 thread 都要讀取同一個檔案,`fa_init` 打開檔案並且藉由 `mmap` 建立檔案映射到全域的 `file_content`。`mmap` 的使用可以避免常規的檔案操作需要從 disk 到 kernel space 再到 user space 的拷貝過程(因為 page cache 機制會先在 kernel space 存住檔案,可以參考我的筆記[Linux 核心設計: Memory-The Page Cache](https://hackmd.io/@RinHizakura/rJTl9K5tv#The-Page-Cache) 或其他網路資源),程式可以藉由指標操作映射的記憶體段更有效率的來讀寫檔案。
:::danger
因公務繁忙(?)此章節待完成
:::
### 延伸閱讀
[案例探討: Map-Reduce](https://hackmd.io/@sysprog/concurrency-mapreduce)
[函数式程序设计为什么至关重要](https://byvoid.com/zhs/blog/why-functional-programming/)
[Concurrency Managed Workqueue之(一):workqueue的基本概念](http://www.wowotech.net/irq_subsystem/workqueue.html)
[Concurrency Managed Workqueue之(二):CMWQ概述](http://www.wowotech.net/irq_subsystem/cmwq-intro.html)
## RCU
> Matetial: [2021q1 第 13 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz13)
### 初識 RCU
> 主要摘錄自 [What is RCU, Fundamentally?](https://lwn.net/Articles/262464/)
RCU 是 Linux kernel 中重要的同步機制,允許讀取與更新可以同時發生,相較於常規不論讀寫執行緒,直接確保 mutual exclusion 的作法;或者允許多個讀同時,但不允許讀寫同時的作法,RCU 具有更高的可擴展性(scalability),適合用在 reader 多但 writer 少的情境。
Reader 和 writer 可以同時執行的關鍵在被 RCU 保護的數據有兩份,一份是舊的,一份是新的。換句話說,RCU 中的 reader 可能看到舊的或者新的資料。如果我們的場景對資料的新舊不敏感(只要確保不是新到舊之間的中間狀態/無論存取新或舊資料都不會 corrupt),且使用情境為頻繁的讀取但鮮少更新,可以使用 RCU 來克服其他 lock 的缺點。
RCU 可由 3 個主要的元件組成:
1. writer/insertion: 發行與訂閱機制(Publish-Subscribe Mechanism)
2. writer/deletion: 等待原存在的 RCU reader 完成
3. readers: 維護多版本的共享物件
#### 發行與訂閱機制
```cpp=
struct foo {
int a;
int b;
int c;
};
struct foo *gp = NULL;
/* . . . */
p = kmalloc(sizeof(*p), GFP_KERNEL);
p->a = 1;
p->b = 2;
p->c = 3;
gp = p;
```
對於更新端,其 11 到 14 行的程式必須確保不能發生重排,`p` assign 到 `gp` 的操作必須發生於 `p` 的 member 都設置好之後,因此對應 RCU 的介面要修改為:
```cpp
p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);
```
此操作為指標 `gp` *發佈(publish)* 新的結構。
```cpp
rcu_read_lock();
p = rcu_dereference(gp);
if (p != NULL) {
do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();
```
對於讀者端則 *訂閱(subscribe)* `gp` 的內容,也要確保其得到的 `gp` 是某個 *發佈(publish)* 後的版本。
#### 等待 RCU reader 完成
在上個例子中,可以看見額外的 `rcu_read_lock()` 和 `rcu_read_unlock()` 定義了讀者端的 critical section(RCU read-side critical section),這與 RCU 需要回收舊版本的物件有關。 RCU read-side critical sections 可以是巢狀的,但程式碼中不能涉及 block 或者 preempt(先不考慮 [SRCU](https://lwn.net/Articles/202847/))。
為了防止 RCU reader 所讀取物件被錯誤的先行釋放,需要界定每個舊物件的有效生命周期。基本的演算法形式為:
1. 進行改動,比如替換 linked list 中的一個元素
2. 等待所有已經存在的 RCU read-side critical sections 完成(比如,使用`synchronize_rcu()`)
* 關鍵點是接下來的 RCU read-side critical sections 無法再得到這個刪除元素 reference
3. 等待結束後,可以釋放上述被替換的元素
以下程式段之 19, 20, 21 行則展示了這個 3 個步驟對應實作:
```cpp=
struct foo {
struct list_head list;
int a;
int b;
int c;
};
LIST_HEAD(head);
/* . . . */
p = search(head, key);
if (p == NULL) {
/* Take appropriate action, unlock, and return. */
}
q = kmalloc(sizeof(*p), GFP_KERNEL);
*q = *p;
q->b = 2;
q->c = 3;
list_replace_rcu(&p->list, &q->list);
synchronize_rcu();
kfree(p);
```
簡單來說,假設 `synchronize_rcu()` 被呼叫的點為 $t$,則等待在時間點 $t$ 以前開始的 reader 完成後,釋放舊的內容就不會造成 corruption。該怎麼確保在 `synchronize_rcu()` 以前呼叫的 reader 都完成呢? 由於 RCU 通過 `rcu_read_lock()` 和 `rcu_read_unlock()` 界定的範圍是不允許 block 和 preempt 的。因此,當一個 CPU 進行 context switch 的時候,我們可以知道該 CPU 的 RCU read-side critical sections 已經完成。也就是說,在時間 $t$ 之後,只要確保每個 CPU 都至少進行了一次 context switch,那麼所有 $t$ 時間前的 RCU read-side critical sections 也就保證都完成了,即 `synchronize_rcu()` 可以安全返回。這段等待所有 CPU 跑完的時間稱作 grace period。
:::danger
這表示在 `rcu_read_lock()` 和 `rcu_read_unlock()` 間的執行不允許 context switch? 因此 RCU read-side critical section 間若工作太繁瑣會影響其他 RCU 無關的 process?
:::
#### 維護多版本的共享物件
RCU 藉由維護多個版本的物件(文章中以 linked list 為案例)以發揮其允許讀取與更新可以同時發生的特性,以下將藉由兩個案例來探討。
以下的程式碼展示了刪除節點的操作:
```cpp
p = search(head, key);
if (p != NULL) {
list_del_rcu(&p->list);
synchronize_rcu();
kfree(p);
}
```
並假設 linked list 為以下圖片的狀態:

每個節點中的三個數字分別代表節點中的 3 個 int `a`、`b` 和 `c` 的值。每個節點的紅色邊框表示 reader 持有對它們的 reference。由於 reader 不會直接與 writer 同步,reader 可能與進行更換的過程同時運行。
`list_del_rcu()` 完成後,`5,6,7` 的節點已從 list 中刪除,如下所示。但由於 reader 不直接與 writer 同步,reader 可能會在進行替換的途中同時掃描這個 list。更清楚的說,reader 可能會也可能不會看到新刪除的節點。獲取指向新刪除節點的 reader 可能會在刪除後的很長一段時間內看到舊版本的 list。因此,我們現在有兩個版本的列表,一個包含 `5、6、7`,一個沒有。

在退出 RCU read-side critical section 後,該 reader 對 `5、6、7` 的reference 就該被解除。因此,一旦 `synchronize_rcu()` 完成,從而保證所有預先存在的 reader 都已完成,就不會再有 reader reference `5, 6, 7`(黑色邊框)。

`5, 6, 7` 可以被回收(`kfree`) 回到如下的單一版本。

另一個例子展示替換一個節點時兩個版本的 linked list 的案例。
```cpp=
q = kmalloc(sizeof(*p), GFP_KERNEL);
*q = *p;
q->b = 2;
q->c = 3;
list_replace_rcu(&p->list, &q->list);
synchronize_rcu();
kfree(p);
```
假設 linked list 的初始狀態如下圖:

第 1 行配置一個還沒初始化的節點:

第 2 ~ 4 行複製 `5, 6, 7` 並改動部份內容:

第 5 行進行替換,以便新的節點最終對 reader 可見。此時如下圖所示,我們有兩個版本的 list。預先存在的 reader 可能會看到 `5,6,7` 元素,但新 reader 會看到 `5,2,3` 元素。但無論如何,任何 reader 都可以保證看到一個擁有完整結構的 list。

在第 6 行的 `synchronized_rcu()` 後,在 `list_replace_rcu()` 之前開始的 reader 都該完成。更詳細的說,任何可能持有 `5,6,7` 節點 reference 的 reader 都保證已經退出了他們的 RCU read-side critical section,不再有任何 reader 持有 `5、6、7` (黑色邊框)。

`5, 6, 7` 得以被釋放(`kfree`),我們又再次回到單一版本的 list。

### 實作
:::danger
總感覺目前的 [`rcu_list`](https://github.com/RinHizakura/concurrent-programs/blob/master/rcu_list/rcu_list.c) 專案簡化的難以掌握其核心?雖然可以理解程式碼行為但是看不出與 RCU 相關文章對應的機制?(也有可能是我對 RCU 仍理解得不夠透徹QQ)
:::
### 參考資料
[Linux内核同步机制之(七):RCU基础](http://www.wowotech.net/kernel_synchronization/rcu_fundamentals.html)