---
###### tags: `sysprog2021q1`
---
# 2021q1 Homework4 (quiz4)
contributed by < `93i7xo2` >
> Source: [2021q1 第 4 週測驗題](https://hackmd.io/@sysprog/linux2021-quiz4)
## 資料結構
### Thread Pool & Job Queue
```cpp
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
```
- `count` 紀錄 thread pool 內 worker thread (即 `workers`) 的數量
- worker 閒置時向 `__jobqueue` 抓取被封裝的任務 `threadtask_t` 執行,以 FIFO 方式取出任務,`tail` 指向下一個待取出的任務。
- 存取共享變數 `__jobqueue` 必須保證為 exclusive use,若無任務 worker 則以 `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)` 等待新的任務
### Thread Task
```cpp
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
```
- worker 取得函式 `func` 執行的結果放在共享變數 `future`,而同時可能有其他執行緒存取 `future` 取得運算結果,同樣需要保護。
- `__threadtask` 與 `__tpool_future` 成對建立
- worker 設置 flag 表示運算進行 (`__FUTURE_RUNNING`) 或完成 (`__FUTURE_FINISHED`)
- 其他執行緒設置 flag 以干涉 worker 行為
- `__FUTURE_CANCELLED`
- `__FUTURE_DESTROYED`
## 程式流程
1. `tpool_create()`: 建立一定數量的 worker 於 pool,worker 執行 `jobqueue_fetch()` 抓取 job queue 內的任務
2. `tpool_apply()`: 將新建立的 `threadtask_t` 推進 job queue,每個 `threadtask_t` 各指向一個 `__tpool_future` 用以儲存運算結果。同時以 `pthread_cond_broadcast()` 喚醒等待 job queue 有新任務的 worker
3. `tpool_future_get()` 和 `tpool_future_destroy()`: 從 `__tpool_future` 取回運算結果後釋放
4. `tpool_join()`: 呼叫 `tpool_apply` 新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 pool 和 job
- `jobqueue_fetch()`
```graphviz
digraph finite_state_machine {
node [shape=point,label=""]ENTRY,EXIT;
rankdir=TD
get_task[shape=box label="Get a new task from queue"]
set_run[shape=box label="Set RUNNING"]
set_fin[shape=box label="Set FINISHED"]
broadcast[shape=box label="pthread_cond_broadcast()"]
free_task[shape=box label="free(__threadtask)"]
free_future[shape=box label="free(__tpool_future)"]
is_cancelled[shape=diamond, label="CANCELLED?"]
is_destroyed[shape=diamond, label="DESTROYED?"]
is_null_func[shape=diamond, label="Valid function?"]
ENTRY->get_task;
get_task->is_null_func [tailport=e]
is_null_func->is_cancelled [label="Yes"]
is_cancelled->free_task [label="Y", tailport=e, headport=e]
is_cancelled->set_run [label="N"]
set_run->is_destroyed [label=" Wait for function to finish"]
is_destroyed->free_future [label="Y"]
is_destroyed->set_fin [label="N", tailport=e]
free_future->free_task
set_fin->broadcast
broadcast->free_task
free_task->get_task [tailport=w, headport=w]
is_null_func->EXIT [tailport=e, label="No. Terminated"]
}
```
- `__FUTURE_CANCELLED` 與 `__FUTURE_DESTROYED` 的使用情境
- 在 `tpool_future_destroy()` 內,若 `future->flag` 為 `__FUTURE_FINISHED` 或`__FUTURE_CANCELLED` 兩者之一,表示 `__threadtask` 已被釋放無法被 worker 拿到,故在此釋放資源。
若非以上情況則說明對應的 `__threadtask` 仍可存取該 `future`,故將 `future->tag` 設置為 **`__FUTURE_DESTROYED`** 之後由 worker 來釋放 future 的資源。
- 同理 `jobqueue_destroy()` 也有必要依據 **`__FUTURE_DESTROYED`** 來釋放 future
- 使用到 `pthread_cancel()` 不代表設置 `__FUTURE_CANCELLED`。`pthread_cancel()` 僅在 `tpool_create()` 建立 worker 失敗時使用,向先前建立的 worker 發送 cancellation request
```cpp!
/* jobqueue_fetch */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
```
- `jobqueue_destroy()` 是唯一會將 flag 設置為 `__FUTURE_CANCELLED`,猜測使用情境是想中斷所有的 worker,摧毀所有 job queue 內的任務但保留 `__tpool_future` 自行處理 (但並沒有使用到關鍵的 `rwlock`)。
目前 `jobqueue_destroy()` 只有用在 `tpool_join()` 或是用在 `tpool_create()` 失敗時摧毀空 queue
```cpp
int tpool_join(struct __threadpool *pool) {
...
jobqueue_destroy(pool->jobqueue);
...
}
```
```cpp
struct __threadpool *tpool_create(size_t count) {
...
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
```cpp
static void jobqueue_destroy(jobqueue_t *jobqueue) {
threadtask_t *tmp = jobqueue->head;
while (tmp) {
/* Never executed */
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
```
總之,沒有一種情況是同時有 worker 在運行。
## Known issues
### 修正 clock attribute
`pthread_cond_timedwait()` 的 expire time 使用 `CLOCK_MONOTONIC`
```cpp
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
```
> **pthread_cond_timedwait**
> The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument.
> **pthread_condattr_setclock**
> The default value of the clock attribute shall refer to the system clock.
因為 `pthread_cond_timedwait()` 需要 condition variable 具有 clock attribute,而預設的 system clock 用 `pthread_condattr_getclock()` 取得得到 `CLOCK_REALTIME`,非預期的 `CLOCK_MONOTONIC`。
`CLOCK_REALTIME` 會跟 NTP 校正時間,可能往前或往後;而 `CLOCK_MONOTONIC` 則保證單調遞增,從特定點開始計數,[clock_getres](https://man7.org/linux/man-pages/man2/clock_getres.2.html) 說明在 Liunx 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。
```diff
static struct __tpool_future *tpool_future_create(void) {
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
```
### Timeout handling
在 [8dc72](https://github.com/93i7xo2/sysprog2021q1/commit/8dc72a44e8ccc73685d9c9b7aaeefd49c1796f27) 實現 timeout 處理,不使用 `__FUTURE_CANCELLED` 和 `__FUTURE_DESTROYED` 達成。
:::warning
TODO: 改用 Linux eventfd/timerfd 來實作 timeout 機制
:notes: jserv
:::
使用 `tpool_future_get()` 取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future:
1. 檢查 future->flag 是否有 `__FUTURE_RUNNING`,若有代表 worker 已取得對應的 task 進行計算,現行版本尚不支援中斷進行計算中的 worker,於是等待計算結果,結果等同
```cpp
tpool_future_get(futures[i], 0)
```
2. 若無 `__FUTURE_RUNNING`,表示對應的 task 仍存於 jobqueue,將其連同 future 移除並釋放資源。
試著從 `futures[i]` 最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 `futures[0]` 開始至 `futures[PRECISION]`,一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 `pthread_cond_broadcast()`,因此從輸出可見一些 future 被移除。
```diff
- for (int i = 0; i <= PRECISION; i++) {
+ for (int i = PRECISION; i >= 0; i--) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit, pool->jobqueue);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
DEBUG_PRINT(("Future[%d] completed!\n", i));
} else
DEBUG_PRINT(("Cannot get future[%d], timeout after %d milliseconds.\n", i,
time_limit));
}
```
```bash
$ ./pi 4 1
Thread count: 4
Time limit: 1 ms
Cannot get future[100], timeout after 1 milliseconds. /* future[100] is removed */
Future[99] completed!
...
Future[0] completed!
Elapsed time: 11592788 ns
PI calculated with 101 terms: 3.141592653589793
```
### 使用 eventfd/timerfd 實現 Timeout 機制
> 基於 atomic_threadpool 改寫
#### 使用 eventfd 通知 main thread
最初的想法是每個 task 各維護 eventfd 產生的 file descriptor (`efd`),執行完成時透過 `write()` 通知 main thread 該 task 完成,這樣做的缺點是 `efd` 數量一多容易到達上限:
```bash
ulimit -n // 8192
```
所以改成 worker thread 各共用一個 epoll file descriptor (`epfd`),而 task 完成時 worker thread 向 `epfd` 註冊新的 `efd`,其內部 counter 值為 task 內部的 id (>0),以便 main thread 辨識。當然 counter 一開始初始化為 0,寫入 counter 的同時觸發 EPOLLIN event:
```cpp
static void add_event(int epoll_fd, int fd, int state) {
struct epoll_event ev = {
.events = state,
.data.fd = fd,
};
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
return;
}
/* worker thread */
do_task();
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
add_event(epfd, efd, EPOLLIN);
eventfd_write(efd, id); // id > 0
/* main thread */
struct epoll_event events[MAX_EVENTS_SIZE];
uint64_t id;
for (;;) {
nfds = epoll_wait(epfd, events, MAX_EVENTS_SIZE, -1);
for (i = 0; i < nfds; i++) {
if (events[i].events & EPOLLIN) {
int ret = read(events[i].data.fd, &id, sizeof(uint64_t));
close(events[i].data.fd);
if (ret > 0)
done_list[id] = 1;
}
}
}
```
改用 eventfd 還能拿掉麻煩的 mutex!
#### 加入 timeout 機制
接下來思考 main thread 的 Timeout 機制實作方式。
首先移除作為 worker thread 和 main thread 溝通的工具-`tpool_future_t` ,以 eventfd 通知 main thread,並從 main thread 通知 worker thread 取消 task。取消 task 的方式:
1. main thread 向所有 worker thread 傳送 cancellation request
2. cancellation request 是由 `pthread_cancel()` 發送,但這樣需要從 task queue 拿出特定 task,也需要重建 worker thread,由實驗可知建立 thread 成本相當巨大(相對於 task)。
3. 折衷之下,將 cancellation request 定義為 main thread 向 worker thread 發送的 eventfd event。每個 worker thread 維護各自的 epoll instance,用來接收 main thread 取消特定 task 的通知並紀錄,等到 dequeue 到該 task 即捨棄。
而等待 eventfd event 的同時等待 timerfd event。接收到 timerfd event 則依序檢查 task 是否完成,若無,透過 `tp_task_cancel` 向 worker thread 通知取消;接收到 eventfd event 則將其運算結果累加至 `bpp_sum`。
```cpp=
/* create timer */
int timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
/* add timer fd to epoll monitor event */
add_event(epfd, timer_fd, EPOLLIN);
/* update timer */
struct itimerspec its = {
.it_interval.tv_sec = ms / 1000,
.it_interval.tv_nsec = (ms % 1000) * 1000 * 1000,
.it_value.tv_sec = ms / 1000,
.it_value.tv_nsec = (ms % 1000) * 1000 * 1000,
};
if (timerfd_settime(timer_fd, 0, &its, NULL) < 0) return -1;
int received, expected;
received = expected = 0
while (received + expected < PRECISION + 1) {
int fire_events = epoll_wait(epfd, events, MAX_EVENTES, time_limit);
for (int i = 0; i < fire_events; ++i) {
uint64_t counter;
ssize_t size = read(events[i].data.fd, &counter, sizeof(uint64_t));
if (size != sizeof(uint64_t))
handle_error("read error");
if (events[i].data.fd == timer_fd) {
/* check if the task is finished */
if (futures[expected].status & __TASK_PENDING) {
futures[expected].status |= __TASK_TIMEOUT;
tp_task_cancel(tp, futures[expected].id);
printf("Cancel task-%d\n", futures[expected].id);
}
expected++;
} else {
/* store caculated data */
int idx = counter - 1;
if (~(futures[idx].status & __TASK_TIMEOUT)) {
futures[idx].status |= __TASK_FINISHED;
bpp_sum += *((double *)futures[idx].result);
received++;
}
close(events[i].data.fd);
}
}
}
close(timer_fd);
```
整合上述幾點的實作 [xxxxx](),測試時發現 `-O3` 會發生預期外的結果,索幸用 `-O0` 進行測試:
```bash
~$ make benchmark
```
於 i5-6200U 上測試,統一設定 timeout = 1ms,與未改用 eventfd 的版本相比效能大減。
```diff
/* benchmark.sh */
- result=$(./$file $thread_count)
+ result=$(./$file $thread_count 1)
```
![](https://i.imgur.com/VzYDJKC.png)
## 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool)
atomic_threadpool 是一套使用 lock-free FIFO queue (`lfqueue_deq`) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。
首先看較重要的函式 `at_thpool_worker`,功能等同 `jobqueue_fetch`,旨在從 job queue 抓取 task,task 由函式 `task->do_work` 及參數 `task->arg` 組成,由於沒有需要存入運算結果, task 運算結束後返回 `TASK_PENDING` 抓取下一個 task。
使用 `tp->nrunning` 來表示進行中的 worker 數量,設置 `tp->is_running=0` 能立即終止尚未執行 task 的 worker。
```cpp
void* at_thpool_worker(void *_tp) {
#endif
at_thpool_t *tp = (at_thpool_t*)_tp;
AT_THPOOL_INC(&tp->nrunning); // __sync_fetch_and_add(v, 1)
at_thtask_t *task;
void *_task;
lfqueue_t *tq = &tp->taskqueue;
TASK_PENDING:
while (tp->is_running) {
if ( (_task = lfqueue_deq(tq)) ) {
goto HANDLE_TASK;
}
lfqueue_sleep(1);
}
AT_THPOOL_DEC(&tp->nrunning); // __sync_fetch_and_dec(v, 1)
#if defined _WIN32 || defined _WIN64
return 0;
#else
return NULL;
#endif
HANDLE_TASK:
task = (at_thtask_t*) _task;
task->do_work(task->args);
AT_THPOOL_FREE(task);
goto TASK_PENDING;
}
```
當使用 `lfqueue_deq` 從 job queue 從尾端取得 task 時,理想狀況其他執行緒可同時使用 `lfqueue_enq` 插入新的 task 而不衝突,而 lfqueue 也確實做到。
```cpp
while (tp->is_running) {
if ( (_task = lfqueue_deq(tq)) ) {
goto HANDLE_TASK;
}
lfqueue_sleep(1);
}
```
### `lfqueue`
atomic_threadpool 所使用到的 `lfqueue` API,共同操作同一個 lfqueue_t 型態的 `lfqueue`:
```cpp
extern int lfqueue_init(lfqueue_t *lfqueue);
extern int lfqueue_enq(lfqueue_t *lfqueue, void *value);
extern void* lfqueue_deq(lfqueue_t *lfqueue);
extern void lfqueue_destroy(lfqueue_t *lfqueue);
```
```cpp
typedef struct {
lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
volatile size_t size;
volatile lfq_bool_t in_free_mode;
lfqueue_malloc_fn _malloc;
lfqueue_free_fn _free;
void *pl;
} lfqueue_t;
```
結構內有兩條佇列
1. `head`/`tail`: 分別指向佇列前端/尾端,`head` 同時也指向上一個取出的節點,
2. `root_free`/`move_free`: `root_free` 分別指向佇列前端/尾端,紀錄等待被釋放的節點。每個 `lfqueue_cas_node_t` 型態的節點都帶有 lfq_time_t 型態的 `_deactivate_tm` 紀錄進入佇列的時間。
- `__lfq_check_free`
```cpp=
static void
__lfq_check_free(lfqueue_t *lfqueue) {
lfq_time_t curr_time;
// 限制只有一條執行緒能夠執行
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) {
// 以 rootfree 為起始刪除後面節點
// 並限制離 _deactivate_tm 超過 2s
lfq_get_curr_time(&curr_time);
lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree;
while ( rtfree && (rtfree != lfqueue->move_free) ) {
nextfree = rtfree->nextfree;
if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) {
// printf("%p\n", rtfree);
lfqueue->_free(lfqueue->pl, rtfree);
rtfree = nextfree;
} else {
break;
}
}
lfqueue->root_free = rtfree;
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0);
}
__LFQ_SYNC_MEMORY();
}
```
第 9 行表明佇列至少存在一個節點,因此 `move_free` 不存在指向 NULL 的可能性,好處是**插入時不用判斷是否指向 NULL**
```cpp
move_free->next = new_node;
```
缺點是針對程式起始、結束要特別處理多出來的一個節點,另一條佇列同樣情況。
```cpp
int
lfqueue_init_mf(lfqueue_t *lfqueue, void* pl, lfqueue_malloc_fn lfqueue_malloc, lfqueue_free_fn lfqueue_free) {
...
freebase->value = NULL;
freebase->next = NULL;
freebase->nextfree = NULL;
freebase->_deactivate_tm = 0;
lfqueue->head = lfqueue->tail = base; // Not yet to be free for first node only
lfqueue->root_free = lfqueue->move_free = freebase; // Not yet to be free for first node only
...
}
```
```cpp
void
lfqueue_destroy(lfqueue_t *lfqueue) {
...
if (rtfree) { // rtfree will never be NULL
lfqueue->_free(lfqueue->pl, rtfree);
}
...
}
```
### `__lfq_recycle_free`
`__lfq_check_free` 稍早已提到功能是清除以 root_free 為首的佇列,而 `__lfq_recycle_free` 便是將節點推入佇列。
```cpp=
static void
__lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) {
lfqueue_cas_node_t *freed;
do {
freed = lfqueue->move_free;
} while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) );
lfq_get_curr_time(&freenode->_deactivate_tm);
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode);
}
```
- 佇列尾端 `move_free->nextfree = NULL`。
- 節點型態雖為 `lfqueue_cas_node_t` 但非用 `next` 而是以 `nextfree` 進行連結。
- 節點在 enqueue 時即初始化 `nextfree = NULL`,因此 `freenode->nextfree = NULL`。
- 當第 6 行以 `__LFQ_BOOL_COMPARE_AND_SWAP` 插入節點 `freenode` 至 `move_free` 後端,即限制了直到第 10 結束都沒有其他執行緒可執行下去,形成 critical section。
```graphviz
digraph {
// rankdir=TB
subgraph MVF {
label="Local Datacenter";
mvf [shape=record label="|<h>nextfree"]
NULL1 [shape=plaintext label="NULL"]
}
subgraph FN {
label="Local Datacenter";
fn [shape=record label="|<h>nextfree"]
NULL2 [shape=plaintext label="NULL"]
}
movefree [shape=plaintext]
freenode [shape=plaintext]
movefree -> mvf
freenode -> fn
mvf:h:e -> NULL1:w [style="dashed"]
mvf:h:e -> fn:w
fn:h:e -> NULL2:w
{rank = same; movefree; freenode;}
{rank = same; NULL1; NULL2;}
}
```
### `lfqueue_enq`
插入新節點
```cpp
static int
_enqueue(lfqueue_t *lfqueue, void* value) {
lfqueue_cas_node_t *tail, *node;
node = (lfqueue_cas_node_t*) lfqueue->_malloc(lfqueue->pl, sizeof(lfqueue_cas_node_t));
if (node == NULL) {
perror("malloc");
return errno;
}
node->value = value;
node->next = NULL;
node->nextfree = NULL;
for (;;) {
__LFQ_SYNC_MEMORY(); // 沒有 memory barrier 會因為編譯器最佳化或是因為 out-of-order execution 無法拿到最新的 tail
tail = lfqueue->tail; // 不可能發生 tail = NULL
if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
/* 由於使用 atomic operation
* 同一時間只會有一個執行緒更新 tail->next
* 接下來才會將 tail 指向新的尾端完成整個插入流程
* 另一個執行緒所拿到的 tail->next 才會是 NULL
*/
// compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
__lfq_check_free(lfqueue); // cleanup
return 0;
}
}
/*It never be here*/
return -1;
}
```
觀察 linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation,令人不解的是大部份函式都有 [full barrier](https://gcc.gnu.org/onlinedocs/gcc-4.5.3/gcc/Atomic-Builtins.html) 的作用,這裡特地引進 `__sync_synchronize` (full barrier)。
```cpp
#define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap
#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add // skip
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch
#define __LFQ_YIELD_THREAD sched_yield
#define __LFQ_SYNC_MEMORY __sync_synchronize
```
:::warning
1. `__sync` 開頭的 builtins 已被 gcc 標註為 deprecated,可改用 [gcc atomics builtins](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html) 或 C11 Atomics。
2. memory barrier 就是第 17 週課程內容
:notes: jserv
:::
### `lfqueue_deq`
從佇列前端移去節點,要判斷的情況較多。改寫部份程式碼和搭配註解。
- 8 行的 CAS 應是為了避免取得 `head` 後,原有的 head 指向的節點被其他執行緒釋放導致 `head->next` 產生未定義行為。實際上釋放會在 2 秒後由 `__lfq_check_free` 執行,發生未定義行為的可能性極微,即使加了 CAS 也不能避免 8~9 行間 `__lfq_check_free` 釋放掉 `head`。
```cpp=
static void *
_dequeue(lfqueue_t *lfqueue) {
lfqueue_cas_node_t *head, *next;
void *val;
for (;;) {
head = lfqueue->head;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
next = head->next;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
/* head == tail 代表無可取出節點,此時 next 必為 NULL
* 這時可能有 `lfqueue_enq` 執行插入節點產生 `next != NULL` 的情況
* 故需做判斷
*/
if (next == NULL) {
/* 遇到這種情況重新取值即可,但一些函式需要回傳值判斷 queue 為空 */
val = NULL;
goto _done;
}
}
else {
/* 目的是排除 next = NULL 的狀況,以免 dereference 出錯
* 正常情況下 next = NULL 只有在 head = tail 情況成立
*/
if (next) {
val = next->value;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
break;
}
} else {
val = NULL;
goto _done;
}
}
}
}
__lfq_recycle_free(lfqueue, head);
_done:
// __asm volatile("" ::: "memory");
__LFQ_SYNC_MEMORY();
__lfq_check_free(lfqueue);
return val;
}
```
使用 gdb 觀察 31 行發生當下:
```bash
(gdb) p next
$14 = (lfqueue_cas_node_t *) 0x0
(gdb) p head->next
$15 = (struct lfqueue_cas_node_s *) 0x55555bf83db0
(gdb) p head
$17 = (lfqueue_cas_node_t *) 0x55555bf83d80
(gdb) p *head->next
$18 = {value = 0x555555c57490, next = 0x55555bf83de0, nextfree = 0x55555bf83de0, _deactivate_tm = 1623174704}
```
```cpp=7
head = lfqueue->head;
next = head->next;
```
gdb 顯示 `next != head->next`,明顯與上方程式碼矛盾。表明 7~10 行即使通過 `__LFQ_BOOL_COMPARE_AND_SWAP`,`head` 可能經由其他執行緒的 enqueue,導致 `next` 指向其他新的節點。
:::info
不會採用此方式實作 thread pool,因為沒有看到解決 [ABA problem](https://en.wikipedia.org/wiki/ABA_problem) 的方法。
:::
## 使用 C11 Atomics 改寫
> The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations.
> In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - [An Introduction to Lock-Free Programming](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
廣泛的說,lock-free 並非指不使用 mutex (lock) 而是指鎖住整個 process 的可能性,下方程式碼雖然沒有使用到 mutex,但若兩個 thread 執行同樣的程式碼,在特定執行順序下永遠無法離開 loop。
```cpp
while (X == 0)
{
X = 1 - X;
}
```
而使用 mutex 的 thread 在最差情況下,所使用的 mutex 尚未被其他 thread 所釋放,而停滯不前,自然不算在 lock-free 範疇。
lock-free 所使用到的 `do{...}while(CAS)` 雖然是 spinlock,但能節省 mutex 所需的 syscall,一般認為效能更好。
### lock-free queue
參考 [RZHuangJeff](https://github.com/RZHuangJeff/tpool/commit/f57bb7e86caef6808ff2c7d9b4b5868ef49ef6f3) 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html) 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。
設計上只有一個 producer,使用 `count` 來紀錄存入的資料數,consumer 依照 `count` 決定是否往下執行,避開判斷邊界(如下)。由於 `head`, `tail` 都是 atomic,依據邊界來判斷需於一個 CAS 指令內對兩個變數進行操作,較為不便。
```cpp
// is full
head == (tail ^ rbf->size*2)
// is empty
head == tail
```
為避免 ABA problem,將用來存 offset 的變數前 32-bit 放置要存入 buffer 的資料的一小部份,例如 (void*) 型態的資料為 8 byte,假設資料前 4 byte 相同不具特徵(e.g.連續記憶體位置的指標),將後 4 byte 併入 offset 的高 4 byte,這麼做即使在 CAS 判斷 offset 相同,也能透過前 4 byte 得知資料更動。
```cpp=
/* producer */
bool enqueue(ringbuffer_t *rb, void **src) {
uint64_t _read, _write;
_read = atomic_load(&rb->read_offset) & RB_OFF_MASK;
_write = atomic_load(&rb->write_offset) & RB_OFF_MASK;
if (_read == (_write ^ rb->size))
return false;
memcpy(&rb->buffer[_write], src, sizeof(void *));
_write = (_write + sizeof(void *)) & rb->mask;
_write |= ((uint64_t)*src << 32);
atomic_store_explicit(&rb->write_offset, _write, memory_order_release);
atomic_fetch_add_explicit(&rb->count, 1, memory_order_release);
return true;
}
/* consumer */
bool dequeue(ringbuffer_t *rb, void **dst) {
int64_t count, new_count;
do {
count = atomic_load(&rb->count);
new_count = count - 1;
if (__builtin_expect((new_count < 0), 1))
return false;
} while (!atomic_compare_exchange_weak(&rb->count, &count, new_count));
uint64_t _read, new_read;
do {
_read = atomic_load(&rb->read_offset);
new_read = (((_read & RB_OFF_MASK) + sizeof(void *)) & rb->mask);
memcpy(dst, &rb->buffer[_read & RB_OFF_MASK], sizeof(void *));
} while (!atomic_compare_exchange_weak(&rb->read_offset, &_read, new_read));
return true;
}
```
### affinity-based thread pool
實作 lock-free 時找到[期末專題](https://hackmd.io/oSZcq1_STo6LgInSfBn2_Q?view)提及一篇參考資料-[Thread safety with affine thread pools](https://bartoszsypytkowski.com/thread-safety-with-affine-thread-pools/),便一並實作。該文提到 thread 需要為在處理器核 (core) 切換間付出 context switching 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器執行。
此外,也提出 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。
[afn_threadpool.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool.c) 使用 lock-free queue 實作上述功能,達到以下需求:
- 建立與處理器核數 (physical,而非 logical) 相同數量的 threads 接收任務,並使用 `pthread_setaffinity_np()` 固定在不同的處理器上。
- 提供方法將任務排進 private queue 或 shared queue,~~排進 shared queue 時以 round-robin 方式喚醒固定再不同核上的 thread,以求資源利用最大化。~~
- thread 每執行 32 個任務即交換 private queue 和 shared queue,以防 starvation。
[afn_threadpool_pi.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool_pi.c) 採用 afn_threadpool.c 來計算 pi。
```cpp
int main(int argc, char **argv) {
...
threadpool_t *tp;
if (!tp_init(&tp, nthreads))
exit(EXIT_FAILURE);
tpool_future_t *futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tp_queue(tp, bpp, (void *)&bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
}
}
tp_join(tp);
tp_destroy(tp);
...
}
```
### 效能比較
比較 3 種 thread pool 計算 $\pi$ (PRECISION=1000) 所需時間
1. mutex protected queue (orginal)
- `./threadpool_pi`
2. lock-free queue + affinity-based thread pool
- `./afn_threadpool_pi`
- w/o `pthread_setaffinity_np()`
3. lock-free queue
- `./afn_threadpool_pi_v2`
- w/ `pthread_setaffinity_np()`
- [source code](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz4)
- 執行方式
```bash
make benchmark && make plot
```
- 實驗結果 I - [Linode Dedicated CPU Instances](https://www.linode.com/blog/linode/introducing-linode-dedicated-cpu-instances/) (32 cores/64G RAM)
```
~$ lscpu
Model name: AMD EPYC 7501 32-Core Processor
Stepping: 2
CPU MHz: 1999.998
BogoMIPS: 3999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 2 MiB
L1i cache: 2 MiB
L2 cache: 16 MiB
L3 cache: 512 MiB
~$ uname -r
5.4.0-72-generic
~$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
```
使用 lock-free 明顯減少執行時間,可惜隨著 thread 增加 throughtput 反而下降。
![](https://i.imgur.com/FDMvZdh.png)
接下來參考 [eecheng87](https://hackmd.io/@eecheng/BJpGSJWq8) 的作法使用 `sched_yield()`,在 dequeue 失敗時讓出 CPU,這樣的好處是如果單一處理器上有多個 thread,執行任務的優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降,本來就是單個 thread 獨占處理器的則不影響。
```diff
/* afn_thradpool.c */
- !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task)));
+ !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))){
+ sched_yield();
+}
```
![](https://i.imgur.com/cTcsNHq.png)
- 實驗結果 II - AMD Ryzen 7 3800XT & Intel i5-6200U
由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。
[likwid-topology](https://github.com/RRZE-HPC/likwid/wiki/likwid-topology) 是一套列出 SMT thread 、快取與處理器核階層關係的工具。`HWThread` 代表在 linux 出現的 CPU 編號,也就是 htop 看到的 CPU0、CPU1...;`Thread` 則是處理器核上的 SMT Thread 編號;`Core` 代表 physical core,如下列所示。
實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 `HWThread=0` 循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。
```bash
~$ likwid-topology -g
--------------------------------------------------------------------------------
CPU name: AMD Ryzen 7 3800XT 8-Core Processor
CPU type: nil
CPU stepping: 0
********************************************************************************
Hardware Thread Topology
********************************************************************************
Sockets: 1
Cores per socket: 8
Threads per core: 2
--------------------------------------------------------------------------------
HWThread Thread Core Socket Available
0 0 0 0 *
1 0 1 0 *
2 0 2 0 *
3 0 3 0 *
4 0 4 0 *
5 0 5 0 *
6 0 6 0 *
7 0 7 0 *
8 1 0 0 *
9 1 1 0 *
10 1 2 0 *
11 1 3 0 *
12 1 4 0 *
13 1 5 0 *
14 1 6 0 *
15 1 7 0 *
--------------------------------------------------------------------------------
```
```cpp
/* afn_threadpool_pi.c */
CpuTopology_t topo = get_cpuTopology();
int numSockets = topo->numSockets,
numCoresPerSocket = topo->numCoresPerSocket,
numHWThreads = topo->numHWThreads, cpulist[topo->numHWThreads], idx = 0;
for (int socket = 0; socket < numSockets; ++socket) {
for (int core = 0; core < numCoresPerSocket; ++core) {
for (int i = 0; i < numHWThreads; ++i) {
int threadId = topo->threadPool[i].threadId,
coreId = topo->threadPool[i].coreId,
packageId = topo->threadPool[i].packageId,
apicId = topo->threadPool[i].apicId;
if (packageId == socket && coreId == core) {
cpulist[idx + threadId * (numCoresPerSocket * numSockets)] = apicId;
}
}
idx++;
}
}
topology_finalize();
```
實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutuex protected 兩者實作的差異,將 thread pool 初始化和釋放排除在測量時間外([`b3cd6`](https://github.com/93i7xo2/sysprog2021q1/commit/b3cd6764f44441c75da20a3d77e58533ed445d39)),結果發現時間大幅縮短,簡短的在 i5-6200U 上測試建立及釋放時間:
![](https://i.imgur.com/rES7enO.png)
![](https://i.imgur.com/YtEXftT.png)
![](https://i.imgur.com/KHpydVm.png)
和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間:
- AMD R7-3800XT
![](https://i.imgur.com/XXWPQTx.png)
- Intel i5-6200U
![](https://i.imgur.com/irLSMol.png)
在 lock-free 的測試中,執行時間有 pinned 與否沒差多少。而隨著 thread count 增加,lock-free 實作始終優於原始版本。
## Pthread: Cancellation point
> 取自 man-pages:
- Creation: [`pthread_create()`](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
```cpp
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
```
`thread`: 指向新建的 thread ID
`start_routine`: thread 建立後執行的函式,參數為 `arg`
```cpp
// Creating a new thread
pthread_create(&ptid, NULL, &func, NULL);
```
- Cancellation clean-up handlers
建立方式是由 `pthread_cleanup_push()` 將將函式逐一推進堆疊中,執行時從堆疊最上方依序執行
> [`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html)
> Any clean-up handlers established by `pthread_cleanup_push(3)` that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed.
Clean-up handlers 作用是在執行緒結束前釋放資源,包括
- mutex
- condition variables
- semaphores
- file descriptor
等不會在執行緒結束時釋放的資源。觸發情境有幾種:
1. 執行緒即將結束時呼叫 `pthread_exit()`,執行堆疊中所有 handlers
2. 由其他執行緒呼叫 `pthread_cancel()` 發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlers
3. 執行緒呼叫 `pthread_cleanup_pop()` 從堆疊上取出最上層的 handler,可選擇執行與否,與 `pthread_cleanup_push()` 搭配使用
值得注意的是,結束執行緒若使用 `return`, handlers 將不會被呼叫,而 `return val` 意同 `pthread_exit(val)`。
> Clean-up handlers are not called if the thread terminates by performing a return from the thread start function.
> Performing a return from the start function of any thread other than the main thread results in an implicit call to `pthread_exit()`, using the function's return value as the thread's exit status.
- Cancellation point
- 是執行緒用來檢查是否取消的時間點。"取消"一詞指的是執行緒的終止,被請求或是正常執行到最後,最終釋放所有資源,雖然 `pthread_kill()` 也能做到執行緒的終止,但不會釋放資源。
- Cancellation point 可由 `pthread_testcancel()` 進行設置,其他執行緒以 [`pthread_cancel()`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。
- 由 [`pthread_setcancelstate()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 進行設置 cancelability 決定觸發與否,預設是 `ENABLE`
```cpp
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate)
```
- 由 [`pthread_setcanceltype()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 設置 cancelability tpye,預設是 `DEFERRED`,意思是請求會被推遲到下一個 cancellation point,`ASYNCHRONOUS` 則是接收到請求後立刻取消。
```cpp
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype)
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype)
```
當執行緒會保留/釋放資源時,設置 `ASYNCHRONOUS`,會讓執行緒收到請求後立刻處理,無法確立資源狀態是釋放前還是釋放後,使得 clean-up handler 無法正確的處理,不建議使用。而在文件中也註明只有 compute-bound loop 的執行緒或是下列 async-cancel-safe functions 適合用 `ASYNCHRONOUS`。
- pthread_cancel()
- pthread_setcancelstate()
- pthread_setcanceltype()
- [pthreads](https://man7.org/linux/man-pages/man7/pthreads.7.html) 明確定義哪些函式必須是/可能是 cancellation point
- 必須是
- pthread_cond_timedwait()
- pthread_cond_wait()
- pthread_testcancel()
[`pthread_cond_wait()`/`pthread_cond_timewait()`](https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html) 為何是 cancellation point? 是為了防止 indefinite wait,例如等待輸入的資料從未被送出(e.g. `read()`),仍可以取消執行緒的執行。當接收到 cancellation request,如同 unblocked thread 一樣重新取得 mutex,但不是返回呼叫 `pthread_cond_wait()`/`pthread_cond_timedwait()` 的地方而是執行 clean-up handlers.
- [`pthread_join`](https://man7.org/linux/man-pages/man3/pthread_join.3.html)/[`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html)
```cpp
void pthread_exit(void *retval);
int pthread_join(pthread_t thread, void **retval);
```
`pthread_exit()` 於執行緒結束時呼叫,返回的數值 `retval` 供其他呼叫 `pthread_join()` 的執行緒取得。`retval` 不可存在於執行緒的 stack 上,否則產生未定義行為。
`pthread_join()` 負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 `PTHREAD_CANCELED` 到指定地址。
```cpp
/* example */
void *app1(void *x)
{
pthread_exit(20);
}
int main()
{
int ret;
pthread_t t1;
pthread_create(&t1, NULL, app1, NULL);
pthread_join(t1, &ret);
return 0;
}
```
- epoll_ctl
This system call is used to add, modify, or remove entries in the
interest list of the epoll(7) instance referred to by the file
descriptor epfd. It requests that the operation op be performed
for the target file descriptor, fd.
Valid values for the op argument are:
EPOLL_CTL_ADD
Add an entry to the interest list of the epoll file
descriptor, epfd. The entry includes the file descriptor,
fd, a reference to the corresponding open file description
(see epoll(7) and open(2)), and the settings specified in
event.
- eventfd
> eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. **The object contains an unsigned 64-bit integer (`uint64_t`) counter that is maintained by the kernel.** This counter is initialized with the value specified in the argument `initval`.
> The file descriptor is readable (the select(2) readfds argument; the poll(2) POLLIN flag) **if the counter has a value greater than 0.**
## 參考資料
- [c cleanup unused threads](https://stackoverflow.com/questions/9951891/c-cleanup-unused-threads)
- 使用 `pthread_exit(NULL)` 會造成 valgrind 誤判資源未釋放,可改用 `return NULL`
- [Makefile 語法和示範](https://hackmd.io/@sysprog/SySTMXPvl)
- [Geoff Langdale - Lock-Free Programming](https://www.cs.cmu.edu/~410-s05/lectures/L31_LockFree.pdf)
- [Acquire and Release Semantics](https://preshing.com/20120913/acquire-and-release-semantics/)
- [Where does the wait queue for threads lies in POSIX pthread mutex lock and unlock?](https://stackoverflow.com/questions/25419225/where-does-the-wait-queue-for-threads-lies-in-posix-pthread-mutex-lock-and-unloc)
- [Memory Reordering Caught in the Act](https://preshing.com/20120515/memory-reordering-caught-in-the-act/)
- [Memory Barriers Are Like Source Control Operations](https://preshing.com/20120710/memory-barriers-are-like-source-control-operations/)
- [Weak vs. Strong Memory Models](https://preshing.com/20120930/weak-vs-strong-memory-models/)
- [This Is Why They Call It a Weakly-Ordered CPU](https://preshing.com/20121019/this-is-why-they-call-it-a-weakly-ordered-cpu/)
- [让事件飞 ——Linux eventfd 原理与实践](https://zhuanlan.zhihu.com/p/40572954)