---
tags: concurrency
---
# [並行程式設計](https://hackmd.io/@sysprog/concurrency): Thread Pool 實作和改進
> 共筆貢獻者: jserv, 93i7xo2, ccs100203, linD026, bakudr18
## Thread Pool
[thread pool](https://en.wikipedia.org/wiki/Thread_pool) 的設計考量如下:
1. 在大量的執行緒環境中,建立和銷毀執行緒物件的開銷相當可觀,而且頻繁的執行緒物件建立和銷毀,會對高度並行化的應用程式帶來額外的延遲時間
2. 考慮到硬體的有效處理器數量有限,使用 thread pool 可控管執行分配到處理器的執行緒數量
用醫院來比喻:
* 沒有 thread pool 時: 醫院每天要面對成千上萬的病人,每當一個病人求診,就找一位醫生處理,看診後醫生也跟著離開。當看病時間較短時,醫生來去的時間,顯得尤為費時
* 初步引入 thread pool: 醫院設置門診,把醫生全派出去坐診,病人想看診之前,強制先掛號排隊,醫生根據病人隊列順序,依次處理各個病人,這樣就省去醫生往返的時間。但倘若病患很少,醫生卻過多,這會使得很多醫生閒置,浪費醫療資源
* 改進 thread pool: 門診一開始只派出部分醫生,但增加一位協調人 (現實就是護理師擔任),病人依舊是排隊看病,協調人負責調度醫療資源。當病人很多、醫生忙不過來時,協調人就呼叫更多醫生來幫忙;當病人不多、醫生過多時,協調人就安排部分醫生休息待命,避免醫療資源的浪費
示意圖:
![](https://i.imgur.com/GedtciF.png)
## 適用並行運算的圓周率計算
3 月 14 日是[圓周率日](https://en.wikipedia.org/wiki/Pi_Day),這天也是愛因斯坦的生日,求圓周率近似值的討論可見:
* video: [除了割圓術,圓周率還可以這樣算](https://youtu.be/BkDbVypDgSs)
* video: [古人如何計算圓周率 π?](https://youtu.be/AvMaNDh_R0w)
* video: [如何計算圓周率 π 的 1 億位?](https://youtu.be/BkDbVypDgSs)
[Gregory-Leibniz 級數](https://mathworld.wolfram.com/GregorySeries.html)可優雅地計算圓周率,參考 [Leibniz's Formula for Pi](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi)。從下面的 _Madhava–Leibniz series_ 開始推導:
$$
\arctan(1) = \dfrac{\pi}{4} = 1 - \dfrac{1}{3} + \dfrac{1}{5} - \dfrac{1}{7} +\ ...
$$
首先積分下列[數列](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi/Lemma)
$$
\dfrac{1}{1+t^2} = 1 - t^2 + t^4 - t^6 + t^8 - \cdots + t^{4n} - \frac{t^{4n+2}}{1+t^2}
$$
從 $0$ 積分到 $x$, $0\leq{x}\leq1$
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt=x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\cdots+\frac{x^{4n+1}}{4n+1}-R_n(x),
\\ \text{where } R_n(x)=\int_{0}^{x}\dfrac{t^{4n+2}}{1+t^2}dt
$$
先看 $R_n(x)$ ,因為 $1\leq1+t^2$,得到
$$
0\leq{R_n(x)}\leq\int_{0}^{x}t^{4n+2}dt=\frac{x^{4n+3}}{4n+3}
$$
又因為
$$
\frac{x^{4n+3}}{4n+3}\leq\frac{1}{4n+3}, 0\leq{x}\leq1
$$
依據[夾擠定理](https://zh.wikipedia.org/wiki/%E5%A4%BE%E6%93%A0%E5%AE%9A%E7%90%86) (squeeze theorem,也稱為 sandwich theorem),當 $n\rightarrow\infty$ 時 $\frac{1}{4n+3}\rightarrow0$,因此 $R_n(x)\rightarrow0$,於是得出下列式子
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$
又因為 $\frac{d}{dx}\arctan(x)=\frac{1}{1+x^2}$,亦即 $\int_{0}^{x}\frac{1}{1+t^2}\,dt=\arctan(x)$,因此
$$
\arctan(x) = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$
此時將 $x$ 代入 1,即可得 $\frac{\pi}{4}$
以下是對應實作:
```c
double compute_pi_leibniz(size_t N)
{
double pi = 0.0;
for (size_t i = 0; i < N; i++) {
double tmp = (i & 1) ? (-1) : 1;
pi += tmp / (2 * i + 1);
}
return pi * 4.0;
}
```
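下面補上一個以 POSIX thread 將上述迴圈切成數個區段、各自累加再合併的簡化示意(僅為概念展示,並非下圖量測所用的程式碼;`NUM_THREADS`、`worker` 等名稱皆為此處自行假設):
```c
#include <pthread.h>
#include <stddef.h>

#define NUM_THREADS 4

struct range {
    size_t start, end; /* 此執行緒負責的項次範圍 [start, end) */
    double partial;    /* 此區段的部分和 */
};

static void *worker(void *arg)
{
    struct range *r = arg;
    double sum = 0.0;
    for (size_t i = r->start; i < r->end; i++)
        sum += ((i & 1) ? -1.0 : 1.0) / (2 * i + 1);
    r->partial = sum;
    return NULL;
}

double compute_pi_leibniz_mt(size_t N)
{
    pthread_t tid[NUM_THREADS];
    struct range r[NUM_THREADS];
    size_t chunk = N / NUM_THREADS;

    for (int t = 0; t < NUM_THREADS; t++) {
        r[t].start = t * chunk;
        r[t].end = (t == NUM_THREADS - 1) ? N : (t + 1) * chunk;
        pthread_create(&tid[t], NULL, worker, &r[t]);
    }

    double pi = 0.0;
    for (int t = 0; t < NUM_THREADS; t++) {
        pthread_join(tid[t], NULL); /* 等待該區段計算完成 */
        pi += r[t].partial;         /* 合併各執行緒的部分和 */
    }
    return pi * 4.0;
}
```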
比較單執行緒、多執行緒和 SIMD 版本的表現:
![](https://i.imgur.com/x4gz1oE.png)
1995 年提出的 [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula),可用這三位發表者的姓氏開頭簡稱為 BBP 公式,最初的公式如下:
$\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$
BBP 公式跳脫典型的圓周率的演算法,可計算圓周率的任意第 $n$ 位,而不用事先計算前面的 $n - 1$ 位,於是 BBP 公式很適合透過並行運算來求圓周率近似值。
典型的圓周率演算法必須先算出前面的 $n - 1$ 位,才能計算第 $n$ 位,代表數列每一項之間具相依性。以公式
$$
\arctan(1)=\frac{\pi}{4}=1-\frac{1}{3}+\frac{1}{5}-\frac{1}{7}+\cdots
$$
以及
$$
\sum_{k=0}^{\infty} \frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})
$$
來說,因為每一項的計算都只跟當下的 k 值有關,所以每個獨立的項都可以平行計算,但利用公式
$$\frac{\pi}{2}=\frac{2}{1}\times\frac{2}{3}\times\frac{4}{3}\times\frac{4}{5}\times\frac{6}{5}\times\cdots
$$
實作的其中一種方式如下:
```c
double pi(size_t N)
{
    double pi = 1;
    double n = 1;
    for (size_t j = 1; j <= N; j++, n++) {
        if ((j & 1) == 0) /* 注意運算子優先權:== 高於 &,必須加括號才是判斷偶數 */
            pi *= (n / (n + 1));
        else
            pi *= ((n + 1) / n);
    }
    return 2 * pi;
}
```
因為算出目前項之前需要先知道前面連乘得到的值 `pi` ,所以無法對每一項做並行運算。以下採用 BBP 公式搭配 Thread Pool 進行驗證。
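以下是 BBP 單項計算的一種簡化寫法:每一項只依賴 `k`,並以 `malloc` 配置的 `double *` 回傳結果,方便作為 thread pool 的任務函式(僅為示意,tpool.c 內實際的 `bpp()` 請以原始碼為準):
```c
#include <stdlib.h>

/* 計算 BBP 公式的第 k 項:
 * (1/16^k) * (4/(8k+1) - 2/(8k+4) - 1/(8k+5) - 1/(8k+6)) */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double base = 1.0;
    for (int i = 0; i < k; i++)
        base /= 16.0; /* 1 / 16^k */
    double term = base * (4.0 / (8 * k + 1) - 2.0 / (8 * k + 4) -
                          1.0 / (8 * k + 5) - 1.0 / (8 * k + 6));

    double *product = malloc(sizeof(double));
    if (product)
        *product = term;
    return product; /* 由取得 future 結果的一方負責 free() */
}
```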
## 實作
以下是一個 [thread pool](https://en.wikipedia.org/wiki/Thread_pool) 實作: [tpool.c](https://github.com/sysprog21/concurrent-programs/blob/master/tpool/tpool.c)
預期執行輸出:
```
PI calculated with 101 terms: 3.141592653589793
```
程式架構示意:
```graphviz
digraph struct {
node [shape=record];
rankdir=LR;
jobqueue_fetch [label="jobqueue_fetch()" shape=plaintext]
bpp [label="bpp()" shape=plaintext]
bpp2 [label="bpp()" shape=plaintext]
"__threadpool" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>__threadpool</B></TD></TR>
<TR><TD PORT="l0">count</TD></TR>
<TR><TD PORT="l1">workers</TD></TR>
<TR><TD PORT="l2">jobqueue</TD></TR>
</TABLE>>
shape = "none"
];
"jobqueue_t" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>jobqueue_t</B></TD></TR>
<TR><TD PORT="l0">*head</TD></TR>
<TR><TD PORT="l1">*tail</TD></TR>
<TR><TD PORT="l2">cond_nonempty</TD></TR>
<TR><TD PORT="l3">rwlock</TD></TR>
</TABLE>>
shape = "none"
];
"threadtask_t" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>threadtask_t</B></TD></TR>
<TR><TD PORT="l0">*func</TD></TR>
<TR><TD PORT="l1">*arg</TD></TR>
<TR><TD PORT="l2">*future</TD></TR>
<TR><TD PORT="l3">*next</TD></TR>
</TABLE>>
shape = "none"
];
"threadtask_t1" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>threadtask_t</B></TD></TR>
<TR><TD PORT="l0">*func</TD></TR>
<TR><TD PORT="l1">*arg</TD></TR>
<TR><TD PORT="l2">*future</TD></TR>
<TR><TD PORT="l3">*next</TD></TR>
</TABLE>>
shape = "none"
];
"__tpool_future" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>__tpool_future</B></TD></TR>
<TR><TD PORT="l0">flag</TD></TR>
<TR><TD PORT="l1">*result</TD></TR>
<TR><TD PORT="l2">mutex</TD></TR>
<TR><TD PORT="l3">cond_finished</TD></TR>
</TABLE>>
shape = "none"
];
"__tpool_future1" [
label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD width="100" PORT="ll"><B>__tpool_future</B></TD></TR>
<TR><TD PORT="l0">flag</TD></TR>
<TR><TD PORT="l1">*result</TD></TR>
<TR><TD PORT="l2">mutex</TD></TR>
<TR><TD PORT="l3">cond_finished</TD></TR>
</TABLE>>
shape = "none"
];
threadtask_t:l2->__tpool_future:ll
threadtask_t1:l2->__tpool_future1:ll
jobqueue_t:l0->threadtask_t:ll
jobqueue_t:l1->threadtask_t1:ll
threadtask_t:l0->bpp:w
threadtask_t1:l0->bpp2:w
threadtask_t:l3->threadtask_t1:ll
__threadpool:l2->jobqueue_t:ll
__threadpool:l1->jobqueue_fetch:w
}
```
### Thread Pool & Job Queue
```c
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
```
- `count` 紀錄 thread pool 內 worker thread (即 `workers`) 的數量
- worker 閒置時向 `__jobqueue` 抓取被封裝的任務 `threadtask_t` 執行,以 FIFO 方式取出任務,`tail` 指向下一個待取出的任務。
- 存取共享的 `__jobqueue` 必須保證 exclusive use;若佇列沒有任務,worker 以 `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)` 等待新的任務(對應的 push 端寫法可見下方示意)
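對照上述等待端的描述,push 端的寫法大致如下(簡化示意,函式名稱與細節以 tpool.c 為準):
```c
/* 在持有 rwlock 的情況下,把新任務從 head 端接進串列,
 * 並以 cond_nonempty 喚醒等待中的 worker */
static void jobqueue_push(jobqueue_t *jobqueue, threadtask_t *task)
{
    pthread_mutex_lock(&jobqueue->rwlock);
    task->next = jobqueue->head;
    jobqueue->head = task;
    if (!jobqueue->tail) /* 佇列原本為空,tail 也指向這個新任務 */
        jobqueue->tail = task;
    pthread_cond_broadcast(&jobqueue->cond_nonempty);
    pthread_mutex_unlock(&jobqueue->rwlock);
}
```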
### Thread Task
```c
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
```
- worker 將函式 `func` 執行的結果存入共享的 `future`,同時可能有其他執行緒從 `future` 讀取運算結果,因此同樣需要保護。
- `__threadtask` 與 `__tpool_future` 成對建立
- worker 設置 flag 表示運算進行 (`__FUTURE_RUNNING`) 或完成 (`__FUTURE_FINISHED`)
- 其他執行緒設置 flag 以干涉 worker 行為
- `__FUTURE_CANCELLED`
- `__FUTURE_DESTROYED`
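上述 flag 的一種可能定義方式如下,以 bit flag 表示,方便同時檢查多個狀態(僅為示意,實際名稱與數值以 tpool.c 為準):
```c
enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_CANCELLED = 04,
    __FUTURE_DESTROYED = 010,
};
```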
## 程式流程
1. `tpool_create()`: 建立一定數量的 worker 於 pool,worker 執行 `jobqueue_fetch()` 抓取 job queue 內的任務
2. `tpool_apply()`: 將新建立的 `threadtask_t` 推進 job queue,每個 `threadtask_t` 各指向一個 `__tpool_future` 用以儲存運算結果。同時以 `pthread_cond_broadcast()` 喚醒等待 job queue 有新任務的 worker
3. `tpool_future_get()` 和 `tpool_future_destroy()`: 從 `__tpool_future` 取回運算結果後釋放
4. `tpool_join()`: 呼叫 `tpool_apply()` 新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 job queue 與 pool 的資源(整體呼叫順序可參考下方示意)
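把上述流程串起來,呼叫端的用法大致如下(簡化示意:`bpp` 即前文的 BBP 單項任務函式,`tpool_future_get()` 以原始的兩參數版本為例,第二個參數為 0 表示無限期等待;實際介面以 tpool.c 為準):
```c
#include <stdio.h>
#include <stdlib.h>
/* 假設已含入 tpool.c 所提供的型態與函式宣告 */

#define PRECISION 100 /* 對應預期輸出的 101 terms */

int main(void)
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0.0;
    struct __threadpool *pool = tpool_create(4);     /* 1. 建立 worker */
    struct __tpool_future *futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {           /* 2. 推入任務 */
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, &bpp_args[i]);
    }
    for (int i = 0; i <= PRECISION; i++) {           /* 3. 取回結果並釋放 future */
        double *result = tpool_future_get(futures[i], 0);
        bpp_sum += *result;
        free(result);
        tpool_future_destroy(futures[i]);
    }
    tpool_join(pool);                                /* 4. 結束 worker、釋放 pool */
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}
```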
- `jobqueue_fetch()`
```graphviz
digraph finite_state_machine {
node [shape=point,label=""]ENTRY,EXIT;
rankdir=TD
get_task[shape=box label="Get a new task from queue"]
set_run[shape=box label="Set RUNNING"]
set_fin[shape=box label="Set FINISHED"]
broadcast[shape=box label="pthread_cond_broadcast()"]
free_task[shape=box label="free(__threadtask)"]
free_future[shape=box label="free(__tpool_future)"]
is_cancelled[shape=diamond, label="CANCELLED?"]
is_destroyed[shape=diamond, label="DESTROYED?"]
is_null_func[shape=diamond, label="Valid function?"]
ENTRY->get_task;
get_task->is_null_func [tailport=e]
is_null_func->is_cancelled [label="Yes"]
is_cancelled->free_task [label="Y", tailport=e, headport=e]
is_cancelled->set_run [label="N"]
set_run->is_destroyed [label=" Wait for function to finish"]
is_destroyed->free_future [label="Y"]
is_destroyed->set_fin [label="N", tailport=e]
free_future->free_task
set_fin->broadcast
broadcast->free_task
free_task->get_task [tailport=w, headport=w]
is_null_func->EXIT [tailport=e, label="No. Terminated"]
}
```
- `__FUTURE_CANCELLED` 與 `__FUTURE_DESTROYED` 的使用情境
- 在 `tpool_future_destroy()` 內,若 `future->flag` 為 `__FUTURE_FINISHED` 或 `__FUTURE_CANCELLED` 兩者之一,表示對應的 `__threadtask` 已被釋放、worker 不會再存取該 `future`,故在此直接釋放 future 的資源。
若非以上情況則說明對應的 `__threadtask` 仍可存取該 `future`,故將 `future->flag` 設置為 **`__FUTURE_DESTROYED`** 之後由 worker 來釋放 future 的資源。
- 同理 `jobqueue_destroy()` 也有必要依據 **`__FUTURE_DESTROYED`** 來釋放 future
- 需注意,使用 `pthread_cancel()` 不代表會設置 `__FUTURE_CANCELLED`:`pthread_cancel()` 僅在 `tpool_create()` 建立 worker 失敗時使用,向先前已建立的 worker 發送 cancellation request
```c!
/* jobqueue_fetch */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
```
- `jobqueue_destroy()` 是唯一會將 flag 設置為 `__FUTURE_CANCELLED` 的函式,猜測使用情境是想中斷所有 worker:摧毀 job queue 內所有任務,但保留 `__tpool_future` 交由呼叫端自行處理(不過這裡並沒有使用到關鍵的 `rwlock`)。
目前 `jobqueue_destroy()` 只有用在 `tpool_join()` 或是用在 `tpool_create()` 失敗時摧毀空 queue
```c
int tpool_join(struct __threadpool *pool) {
...
jobqueue_destroy(pool->jobqueue);
...
}
```
```c
struct __threadpool *tpool_create(size_t count) {
...
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
```c
static void jobqueue_destroy(jobqueue_t *jobqueue) {
threadtask_t *tmp = jobqueue->head;
while (tmp) {
/* Never executed */
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
```
總之,在上述兩種呼叫情境下,執行 `jobqueue_destroy()` 時都不會有 worker 在運行,且佇列必為空,因此迴圈內的程式碼不會被執行。
## 實作缺失回顧
### 修正 clock 屬性
原本的實作以 `CLOCK_MONOTONIC` 計算 `pthread_cond_timedwait()` 的 expire time:
```c
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
```
> **pthread_cond_timedwait**
> The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument.
> **pthread_condattr_setclock**
> The default value of the clock attribute shall refer to the system clock.
因為 `pthread_cond_timedwait()` 依照 condition variable 的 clock attribute 解讀 abstime,而以 `pthread_condattr_getclock()` 查詢預設的 system clock,得到的是 `CLOCK_REALTIME`,並非程式預期的 `CLOCK_MONOTONIC`。
`CLOCK_REALTIME` 會跟 NTP 校正時間,可能往前或往後;而 `CLOCK_MONOTONIC` 則保證單調遞增,從特定點開始計數,[clock_getres](https://man7.org/linux/man-pages/man2/clock_getres.2.html) 說明在 Linux 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。
```diff
static struct __tpool_future *tpool_future_create(void) {
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
```
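補充:若逾時改以毫秒為單位指定(如下一節實驗中的 time limit = 1 ms),計算 expire time 時還需處理 `tv_nsec` 的進位,以下是一個假設性的輔助函式示意:
```c
#include <time.h>

/* 以 CLOCK_MONOTONIC 為基準,計算 msec 毫秒後的絕對時間,
 * 供 pthread_cond_timedwait() 使用 */
static void expire_after_msec(struct timespec *ts, unsigned int msec)
{
    clock_gettime(CLOCK_MONOTONIC, ts);
    ts->tv_sec += msec / 1000;
    ts->tv_nsec += (long) (msec % 1000) * 1000000L;
    if (ts->tv_nsec >= 1000000000L) { /* tv_nsec 必須小於 1e9,需進位 */
        ts->tv_sec += 1;
        ts->tv_nsec -= 1000000000L;
    }
}
```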
### Timeout 處理
在 [8dc72](https://github.com/93i7xo2/sysprog2021q1/commit/8dc72a44e8ccc73685d9c9b7aaeefd49c1796f27) 實作 timeout 處理,且不需動用 `__FUTURE_CANCELLED` 和 `__FUTURE_DESTROYED`。
使用 `tpool_future_get()` 取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future:
1. 檢查 `future->flag` 是否帶有 `__FUTURE_RUNNING`:若有,代表 worker 已取走對應的 task 進行計算。現行版本尚不支援中斷計算中的 worker,因此改為等待計算結果,效果等同
```c
tpool_future_get(futures[i], 0)
```
2. 若無 `__FUTURE_RUNNING`,表示對應的 task 仍留在 job queue,將其連同 future 一併移除並釋放資源(一種可能的移除寫法見下方示意)。
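一種可能的移除寫法如下(僅為示意,並非 commit 8dc72 的實際程式碼;假設仍使用修改前的單向鏈結串列,`jobqueue_remove` 為此處自行命名):
```c
/* 將 future 對應、尚未被 worker 取走的 task 自佇列移除並釋放;
 * 找到並移除時回傳 1,否則回傳 0 */
static int jobqueue_remove(jobqueue_t *jobqueue, struct __tpool_future *future)
{
    pthread_mutex_lock(&jobqueue->rwlock);
    threadtask_t *prev = NULL, *task = jobqueue->head;
    while (task) {
        if (task->future == future) {
            if (prev)
                prev->next = task->next;
            else
                jobqueue->head = task->next;
            if (jobqueue->tail == task) /* 被移除的是下一個待取出的任務 */
                jobqueue->tail = prev;
            free(task); /* future 本身的釋放交由呼叫端處理 */
            pthread_mutex_unlock(&jobqueue->rwlock);
            return 1;
        }
        prev = task;
        task = task->next;
    }
    pthread_mutex_unlock(&jobqueue->rwlock);
    return 0;
}
```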
試著從 `futures[i]` 最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 `futures[0]` 開始至 `futures[PRECISION]`,一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 `pthread_cond_broadcast()`,因此從輸出可見一些 future 被移除。
```diff
- for (int i = 0; i <= PRECISION; i++) {
+ for (int i = PRECISION; i >= 0; i--) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit, pool->jobqueue);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
DEBUG_PRINT(("Future[%d] completed!\n", i));
} else
DEBUG_PRINT(("Cannot get future[%d], timeout after %d milliseconds.\n", i,
time_limit));
}
```
```bash
$ ./pi 4 1
Thread count: 4
Time limit: 1 ms
Cannot get future[100], timeout after 1 milliseconds. /* future[100] is removed */
Future[99] completed!
...
Future[0] completed!
Elapsed time: 11592788 ns
PI calculated with 101 terms: 3.141592653589793
```
## 鏈結串列的效率
原實作中 [pop task](https://github.com/bakudr18/quiz4/blob/87d88e4cbaa11f6d3ed51e90c6f06f6c4a55d1b2/tpool.c#L166-L169) 的方式是走訪整條鏈結串列以取出 tail,time complexity 為 $O(n)$,導致在 critical section 內駐留過久,對 multithread 效能的傷害很大。藉由 `sudo taskset 0xF0 ./pi` 限制在 4 個 CPU 上測量 `PRECISION = 100000` 的執行時間,多執行幾次可發現:雖然大多數執行時間落在 160~200 milliseconds,但偶爾會出現超過 100 seconds 的情況。以 [perf record](https://man7.org/linux/man-pages/man1/perf-record.1.html) 紀錄超過 100 seconds 的 process,發現約 83% 的時間花在執行 `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next);`;而且可用的 CPU 越少,此情況越嚴重,因為 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 `jobqueue_fetch` 消化 task,鏈結串列就會增長得非常快。
因此,為了使 pop task 達到 $O(1)$,可將原本的 `jobqueue` 改成雙向鏈結串列;後來更直接引入 [linux/list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h),以環狀雙向鏈結串列取代原本的單向鏈結串列。如此不但使 pop task 達到 $O(1)$,也省去 `jobqueue->head == jobqueue->tail` 的 if statement,詳細程式碼可見 [commit f8761](https://github.com/bakudr18/quiz4/commit/f8761d6b4c884f1a96ef80f08d590fd5573aacee)(pop 端的寫法可參考下方 diff 之後的示意)。修改後以 4 個 CPU 執行的平均時間約為 138 milliseconds,且沒有明顯的 outliers。
```diff
typedef struct __threadtask {
void *(*func)(void *); /* the function that task should execute */
void *arg; /* argument passed to func */
struct __tpool_future
*future; /* A structure to store task status and result */
- struct __threadtask *next; /* pointer to next task */
+ struct list_head list; /* linked list of task structure */
} threadtask_t;
typedef struct __jobqueue {
- threadtask_t *head, *tail; /* store head and tail of queue */
+ struct list_head head; /* list head of task */
pthread_cond_t
cond_nonempty; /* condition variable to check if queue is non-empty */
pthread_mutex_t rwlock; /* lock share resources like head and tail */
} jobqueue_t;
```
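改用 list.h 後,pop 的動作只需取下串列的第一個節點,即為 $O(1)$。以下是簡化示意(假設 push 端以 `list_add_tail()` 加入新任務,函式名稱為此處自行假設,實際寫法以 commit f8761 為準):
```c
#include "list.h" /* 取自 linux/list.h 的使用者空間版本 */

/* O(1) 取出最早進入佇列的 task;呼叫端需持有 jobqueue->rwlock */
static threadtask_t *jobqueue_pop(jobqueue_t *jobqueue)
{
    if (list_empty(&jobqueue->head))
        return NULL;
    threadtask_t *task = list_first_entry(&jobqueue->head, threadtask_t, list);
    list_del(&task->list);
    return task;
}
```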
## 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool)
atomic_threadpool 是一套使用 lock-free FIFO queue (`lfqueue_deq`) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。
首先看較重要的函式 `at_thpool_worker`,功能等同 `jobqueue_fetch`,旨在從 job queue 抓取 task;task 由函式 `task->do_work` 及參數 `task->arg` 組成。由於不需要存放運算結果,task 執行結束後即跳回 `TASK_PENDING` 標籤抓取下一個 task。
使用 `tp->nrunning` 來表示進行中的 worker 數量,設置 `tp->is_running=0` 能立即終止尚未執行 task 的 worker。
```c
void *at_thpool_worker(void *_tp)
{
at_thpool_t *tp = (at_thpool_t *) _tp;
AT_THPOOL_INC(&tp->nrunning); // __sync_fetch_and_add(v, 1)
at_thtask_t *task;
void *_task;
lfqueue_t *tq = &tp->taskqueue;
TASK_PENDING:
while (tp->is_running) {
if ((_task = lfqueue_deq(tq)))
goto HANDLE_TASK;
lfqueue_sleep(1);
}
AT_THPOOL_DEC(&tp->nrunning); // __sync_fetch_and_sub(v, 1)
return NULL;
HANDLE_TASK:
task = (at_thtask_t*) _task;
task->do_work(task->args);
AT_THPOOL_FREE(task);
goto TASK_PENDING;
}
```
當使用 `lfqueue_deq` 從佇列前端取出 task 時,理想狀況是其他執行緒可同時以 `lfqueue_enq` 插入新的 task 而不互相衝突,lfqueue 也確實做到這點。
### `lfqueue`
atomic_threadpool 所使用到的 `lfqueue` API,共同操作同一個 lfqueue_t 型態的 `lfqueue`:
```c
extern int lfqueue_init(lfqueue_t *lfqueue);
extern int lfqueue_enq(lfqueue_t *lfqueue, void *value);
extern void* lfqueue_deq(lfqueue_t *lfqueue);
extern void lfqueue_destroy(lfqueue_t *lfqueue);
```
```c
typedef struct {
lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
volatile size_t size;
volatile lfq_bool_t in_free_mode;
lfqueue_malloc_fn _malloc;
lfqueue_free_fn _free;
void *pl;
} lfqueue_t;
```
結構內有兩條佇列
1. `head`/`tail`: 分別指向佇列前端/尾端,`head` 同時也指向上一個取出的節點。
2. `root_free`/`move_free`: 分別指向「待釋放佇列」的前端/尾端,紀錄等待被釋放的節點。每個 `lfqueue_cas_node_t` 型態的節點都帶有 `lfq_time_t` 型態的 `_deactivate_tm`,紀錄進入此佇列的時間。
- `__lfq_check_free`
```c=
static void
__lfq_check_free(lfqueue_t *lfqueue) {
lfq_time_t curr_time;
// 限制只有一條執行緒能夠執行
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) {
// 以 root_free 為起點走訪後面的節點並釋放
// 只釋放 _deactivate_tm 距今超過 2 秒的節點
lfq_get_curr_time(&curr_time);
lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree;
while ( rtfree && (rtfree != lfqueue->move_free) ) {
nextfree = rtfree->nextfree;
if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) {
// printf("%p\n", rtfree);
lfqueue->_free(lfqueue->pl, rtfree);
rtfree = nextfree;
} else {
break;
}
}
lfqueue->root_free = rtfree;
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0);
}
__LFQ_SYNC_MEMORY();
}
```
第 10 行 while 迴圈的終止條件 `rtfree != lfqueue->move_free` 表明這條佇列至少會保留一個節點(`move_free` 指向的節點不會被釋放),因此 `move_free` 不存在指向 NULL 的可能性,好處是**插入時不用判斷是否指向 NULL**
```c
move_free->nextfree = freenode;
```
缺點是在佇列初始化與銷毀時,要特別處理這個多出來的節點;另一條佇列 (`head`/`tail`) 也是同樣情況。
```c
int
lfqueue_init_mf(lfqueue_t *lfqueue, void* pl, lfqueue_malloc_fn lfqueue_malloc, lfqueue_free_fn lfqueue_free) {
...
freebase->value = NULL;
freebase->next = NULL;
freebase->nextfree = NULL;
freebase->_deactivate_tm = 0;
lfqueue->head = lfqueue->tail = base; // Not yet to be free for first node only
lfqueue->root_free = lfqueue->move_free = freebase; // Not yet to be free for first node only
...
}
```
```c
void
lfqueue_destroy(lfqueue_t *lfqueue) {
...
if (rtfree) { // rtfree will never be NULL
lfqueue->_free(lfqueue->pl, rtfree);
}
...
}
```
### `__lfq_recycle_free`
`__lfq_check_free` 的功能稍早已提到,是清除以 `root_free` 為首的佇列;而 `__lfq_recycle_free` 則是將節點推入該佇列。之所以不直接釋放節點,是因為在多執行緒下可能仍有其他執行緒持有該節點的指標,直接釋放會在之後的操作造成錯誤 (use-after-free)。
```graphviz
digraph main {
rankdir = LR
node [shape = box]
freebase [label = "free base"]
node_1 [label = "free node 1"]
subgraph cluster_move {
label = "CAS(&freed->nextfree, NULL, freenode)"
freebase -> node_1[label = "nextfree"]
subgraph cluster_freed {
label = "lfqueue->root_free\nlfqueue->move_free\nfreed"
freebase
}
}
}
```
```graphviz
digraph main {
rankdir = LR
node [shape = box]
compound = true
freebase [label = "free base"]
node_1 [label = "free node 1"]
subgraph cluster_cas {
label = "CAS(&lfqueue->move_free, freed, freenode)"
subgraph cluster_freed {
label = "lfqueue->root_free\nlfqueue->move_free\nfreed"
freebase
}
subgraph cluster_move {
label = "lfqueue->move_free\""
}
subgraph cluster_freed_l {
label = "freed\""
temp [label = "free base"]
}
freebase -> temp[ltail = cluster_freed, lhead = cluster_freed_l]
freebase -> node_1[label = "compare\n success" ltail = cluster_freed, lhead = cluster_move]
}
}
```
```c=
static void
__lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) {
lfqueue_cas_node_t *freed;
do {
freed = lfqueue->move_free;
} while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) );
lfq_get_curr_time(&freenode->_deactivate_tm);
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode);
}
```
- 佇列尾端 `move_free->nextfree = NULL`。
- 節點型態雖為 `lfqueue_cas_node_t` 但非用 `next` 而是以 `nextfree` 進行連結。
- 節點在 enqueue 時即初始化 `nextfree = NULL`,因此 `freenode->nextfree = NULL`。
- 當第 6 行以 `__LFQ_BOOL_COMPARE_AND_SWAP` 將節點 `freenode` 接到 `move_free` 後端,其他執行緒在同一個 `freed` 上的 CAS 都會失敗而重試,等同形成 critical section,直到第 8 行把 `lfqueue->move_free` 更新為 `freenode` 為止。
```graphviz
digraph {
// rankdir=TB
subgraph MVF {
label="Local Datacenter";
mvf [shape=record label="|<h>nextfree"]
NULL1 [shape=plaintext label="NULL"]
}
subgraph FN {
label="Local Datacenter";
fn [shape=record label="|<h>nextfree"]
NULL2 [shape=plaintext label="NULL"]
}
movefree [shape=plaintext]
freenode [shape=plaintext]
movefree -> mvf
freenode -> fn
mvf:h:e -> NULL1:w [style="dashed"]
mvf:h:e -> fn:w
fn:h:e -> NULL2:w
{rank = same; movefree; freenode;}
{rank = same; NULL1; NULL2;}
}
```
```graphviz
digraph main {
rankdir = LR
node [shape = box]
label = "free mode\n\nloop\nif rtfree != NULL && rtfree != lfqueue->move_free"
labelloc = "t"
compound = true
freebase [label = "lfqueue->root_free\n(free base)"]
node_1 [label = "free node 1"]
subgraph cluster_free {
label = "free ( functoin )"
subgraph cluster_rtfree {
label = "rtfree"
freebase
}
}
subgraph cluster_move {
label = "lfqueue->move_free"
node_1
}
freebase -> node_1[label = "nextfree"]
node_1 -> freebase[label = "assign to\nrtfree", lhead = cluster_rtfree]
}
```
### `lfqueue_enq`
插入新節點
```c
static int
_enqueue(lfqueue_t *lfqueue, void* value) {
lfqueue_cas_node_t *tail, *node;
node = (lfqueue_cas_node_t*) lfqueue->_malloc(lfqueue->pl, sizeof(lfqueue_cas_node_t));
if (node == NULL) {
perror("malloc");
return errno;
}
node->value = value;
node->next = NULL;
node->nextfree = NULL;
for (;;) {
__LFQ_SYNC_MEMORY(); // 沒有 memory barrier 會因為編譯器最佳化或是因為 out-of-order execution 無法拿到最新的 tail
tail = lfqueue->tail; // 不可能發生 tail = NULL
if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
/* 由於使用 atomic operation
* 同一時間只會有一個執行緒更新 tail->next
* 接下來才會將 tail 指向新的尾端完成整個插入流程
* 另一個執行緒所拿到的 tail->next 才會是 NULL
*/
// compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
__lfq_check_free(lfqueue); // cleanup
return 0;
}
}
/*It never be here*/
return -1;
}
```
觀察 Linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation。令人不解的是,[`__sync_*` 系列 builtins](https://gcc.gnu.org/onlinedocs/gcc-4.5.3/gcc/Atomic-Builtins.html) 大多已具備 full barrier 的效果,這裡卻還特地引入 `__sync_synchronize` (full barrier)。
```c
#define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap
#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add // skip
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch
#define __LFQ_YIELD_THREAD sched_yield
#define __LFQ_SYNC_MEMORY __sync_synchronize
```
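後文「使用 C11 Atomics 改寫」會用到 C11 的 `<stdatomic.h>`,這裡先列出與上述 builtins 大致對應的寫法(僅為概念示意:`__sync_*` 一律隱含 full barrier,C11 則可另外指定 memory order,兩者語意並非完全等價):
```c
#include <stdatomic.h>
#include <stdbool.h>

/* __sync_bool_compare_and_swap(ptr, expected, desired) 的近似寫法 */
static inline bool cas_ptr(_Atomic(void *) *ptr, void *expected, void *desired)
{
    return atomic_compare_exchange_strong(ptr, &expected, desired);
}

/* __sync_fetch_and_add(counter, 1) 的近似寫法 */
static inline int fetch_add_one(_Atomic int *counter)
{
    return atomic_fetch_add(counter, 1);
}

/* __sync_synchronize() 的近似寫法:full memory barrier */
static inline void full_barrier(void)
{
    atomic_thread_fence(memory_order_seq_cst);
}
```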
### `lfqueue_deq`
從佇列前端移走節點,需要判斷的情況較多,以下改寫部份程式碼並搭配註解說明。
```graphviz
digraph main {
rankdir = LR
node [shape = box]
compound = true
inter [label = "..."]
head [label = "head" shape = plaintext]
tail [label = "tail" shape = plaintext]
node_1 -> _next -> inter -> node_n
tail -> node_n
{rank = same node_n, tail}
subgraph cluster_pop {
label = "CAS(&lfqueue->head, head, head)"
head -> node_1
}
subgraph cluster_next {
label = "next"
labelloc = "t"
_next
}
}
```
- 第 7 行的 CAS 應是為了避免取得 `head` 後,原本 `head` 所指的節點被其他執行緒釋放,導致第 8 行的 `head->next` 產生未定義行為。實際上釋放會在 2 秒後才由 `__lfq_check_free` 執行,發生未定義行為的可能性極低;而且即使加了 CAS,也不能避免第 7~8 行之間 `__lfq_check_free` 釋放掉 `head`。
```c=
static void *
_dequeue(lfqueue_t *lfqueue) {
lfqueue_cas_node_t *head, *next;
void *val;
for (;;) {
head = lfqueue->head;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
next = head->next;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
/* head == tail 代表無可取出節點,此時 next 必為 NULL
* 這時可能有 `lfqueue_enq` 執行插入節點產生 `next != NULL` 的情況
* 故需做判斷
*/
if (next == NULL) {
/* 遇到這種情況重新取值即可,但一些函式需要回傳值判斷 queue 為空 */
val = NULL;
goto _done;
}
}
else {
/* 目的是排除 next = NULL 的狀況,以免 dereference 出錯
* 正常情況下 next = NULL 只有在 head = tail 情況成立
*/
if (next) {
val = next->value;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
break;
}
} else {
val = NULL;
goto _done;
}
}
}
}
__lfq_recycle_free(lfqueue, head);
_done:
// __asm volatile("" ::: "memory");
__LFQ_SYNC_MEMORY();
__lfq_check_free(lfqueue);
return val;
}
```
使用 gdb 觀察執行到第 31 行(即 `next == NULL` 卻非 `head == tail` 的分支)的當下:
```bash
(gdb) p next
$14 = (lfqueue_cas_node_t *) 0x0
(gdb) p head->next
$15 = (struct lfqueue_cas_node_s *) 0x55555bf83db0
(gdb) p head
$17 = (lfqueue_cas_node_t *) 0x55555bf83d80
(gdb) p *head->next
$18 = {value = 0x555555c57490, next = 0x55555bf83de0, nextfree = 0x55555bf83de0, _deactivate_tm = 1623174704}
```
```c
head = lfqueue->head;
next = head->next;
```
gdb 顯示 `next != head->next`,明顯與上方程式碼矛盾。這表明即使第 7 行的 CAS 通過,在第 8 行讀取 `next = head->next` 之後,其他執行緒仍可能對當時同為 tail 的 `head` 執行 enqueue,使 `head->next` 指向新節點,而區域變數 `next` 仍停留在舊值。
```graphviz
digraph main {
rankdir = LR
node [shape = box]
compound = true
inter [label = "..."]
head [label = "head" shape = plaintext]
tail [label = "tail" shape = plaintext]
subgraph cluster_empty {
label = "CAS(&lfqueue->tail, head, head)"
labelloc = "t"
head
node_1
}
head -> node_1
node_1 -> _next -> inter -> node_n
node_n -> tail[dir = back]
subgraph cluster_next {
label = "next"
labelloc = "b"
_next
}
NULL [label = "next == NULL", shape = oval]
_next -> NULL [ltail = cluster_next]
val [label = "next != NULL", shape = oval]
_next -> val [ltail = cluster_next]
inter -> NULL[style = invis]
{rank = same inter, NULL, val}
}
```
```graphviz
digraph main {
rankdir = LR
node [shape = box]
compound = true
head [label = "head" shape = plaintext]
temp [label = "assign\n_next to\nhead" shape = plaintext]
val [label = "next != NULL", shape = oval]
val -> head [lhead = cluster_pop_suc]
subgraph cluster_pop_suc {
label = "CAS(&lfqueue->head, head, next)"
node_1 -> _next
head -> node_1
{rank = same head, node_1}
head -> temp [label = "if lfqueue->head\nis node_1 (head)"]
temp -> _next[label = "head' "]
{rank = same temp, _next}
}
}
```
:::info
此實作並未解決 [ABA problem](https://en.wikipedia.org/wiki/ABA_problem)
:::
## 使用 C11 Atomics 改寫
> The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations.
> In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - [An Introduction to Lock-Free Programming](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
廣泛地說,lock-free 中的 lock 並非專指 mutex,而是指整個程式被「鎖住」而無法前進的可能性。下方程式碼雖然沒有使用 mutex,但若兩個執行緒交錯執行同樣的程式碼,在特定執行順序下可能永遠離不開迴圈,因此不算 lock-free。
```c
while (X == 0)
{
X = 1 - X;
}
```
而使用 mutex 的執行緒在最差情況下,會因為 mutex 遲遲未被其他執行緒釋放而停滯不前,甚至造成 lock 之間的競爭,自然不在 lock-free 的範疇。
### lock-free queue
參考 [RZHuangJeff](https://github.com/RZHuangJeff/tpool/commit/f57bb7e86caef6808ff2c7d9b4b5868ef49ef6f3) 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html) 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。
設計上只有一個 producer,使用 `count` 紀錄存入的資料數,consumer 依據 `count` 決定是否往下執行,藉此避開邊界判斷(如下)。由於 `head`、`tail`(對應程式中的 `read_offset`、`write_offset`)都是 atomic 變數,若要依邊界條件判斷,必須在同一個 CAS 指令內對兩個變數進行操作,較為不便。
```c
// is full
head == (tail ^ rbf->size*2)
// is empty
head == tail
```
為避免 ABA problem,將存放 offset 的 64-bit 變數的高 32-bit 用來放要存入 buffer 的資料的一小部份:以 8 byte 的 `(void *)` 資料為例,假設其高 4 byte 彼此相同、不具辨識度(e.g. 連續記憶體位置的指標),便將低 4 byte 併入 offset 的高 4 byte。如此即使 CAS 判斷 offset 相同,也能藉由這 4 byte 察覺資料已經更動。
```c
/* producer */
bool enqueue(ringbuffer_t *rb, void **src) {
uint64_t _read, _write;
_read = atomic_load(&rb->read_offset) & RB_OFF_MASK;
_write = atomic_load(&rb->write_offset) & RB_OFF_MASK;
if (_read == (_write ^ rb->size))
return false;
memcpy(&rb->buffer[_write], src, sizeof(void *));
_write = (_write + sizeof(void *)) & rb->mask;
_write |= ((uint64_t)*src << 32);
atomic_store_explicit(&rb->write_offset, _write, memory_order_release);
atomic_fetch_add_explicit(&rb->count, 1, memory_order_release);
return true;
}
/* consumer */
bool dequeue(ringbuffer_t *rb, void **dst) {
int64_t count, new_count;
do {
count = atomic_load(&rb->count);
new_count = count - 1;
if (__builtin_expect((new_count < 0), 1))
return false;
} while (!atomic_compare_exchange_weak(&rb->count, &count, new_count));
uint64_t _read, new_read;
do {
_read = atomic_load(&rb->read_offset);
new_read = (((_read & RB_OFF_MASK) + sizeof(void *)) & rb->mask);
memcpy(dst, &rb->buffer[_read & RB_OFF_MASK], sizeof(void *));
} while (!atomic_compare_exchange_weak(&rb->read_offset, &_read, new_read));
return true;
}
```
### affinity-based thread pool
[Thread safety with affine thread pools](https://bartoszsypytkowski.com/thread-safety-with-affine-thread-pools/) 一文提到,執行緒在處理器核 (core) 之間切換時,需要付出 context switch 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器核上執行。
此外,該文也提出 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。
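把執行緒固定在特定處理器核,在 Linux 上可透過 `pthread_setaffinity_np()` 達成,示意如下(`pin_to_core` 為此處自行命名的輔助函式,`core_id` 為欲綁定的 CPU 編號):
```c
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>

/* 將指定 thread 綁定到編號為 core_id 的 CPU,成功時回傳 0 */
static int pin_to_core(pthread_t thread, int core_id)
{
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);
    return pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
}
```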
[afn_threadpool.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool.c) 使用 lock-free queue 實作上述功能,達到以下需求:
- 建立與處理器核數 (physical,而非 logical) 相同數量的 threads 接收任務,並使用 `pthread_setaffinity_np()` 固定在不同的處理器上。
- 提供方法將任務排進 private queue 或 shared queue
- thread 每執行 32 個任務即交換 private queue 和 shared queue,以防 starvation。
[afn_threadpool_pi.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool_pi.c) 採用 afn_threadpool.c 來計算 pi。
```c
int main(int argc, char **argv) {
...
threadpool_t *tp;
if (!tp_init(&tp, nthreads))
exit(EXIT_FAILURE);
tpool_future_t *futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tp_queue(tp, bpp, (void *)&bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
}
}
tp_join(tp);
tp_destroy(tp);
...
}
```
### 效能比較
比較 3 種 thread pool 計算 $\pi$ (PRECISION=1000) 所需時間
1. mutex protected queue (original)
- `./threadpool_pi`
2. lock-free queue + affinity-based thread pool
- `./afn_threadpool_pi`
- w/ `pthread_setaffinity_np()`
3. lock-free queue
- `./afn_threadpool_pi_v2`
- w/o `pthread_setaffinity_np()`
[原始程式碼](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz4) 與其執行方式:
```bash
$ make benchmark && make plot
```
- [ ] 實驗結果 I - [Linode Dedicated CPU Instances](https://www.linode.com/blog/linode/introducing-linode-dedicated-cpu-instances/) (32 cores/64G RAM)
```
$ lscpu
Model name: AMD EPYC 7501 32-Core Processor
Stepping: 2
CPU MHz: 1999.998
BogoMIPS: 3999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 2 MiB
L1i cache: 2 MiB
L2 cache: 16 MiB
L3 cache: 512 MiB
$ uname -r
5.4.0-72-generic
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
```
使用 lock-free 明顯減少執行時間,可惜隨著 thread 數增加,throughput 反而下降。
![](https://i.imgur.com/FDMvZdh.png)
接下來參考 [eecheng87](https://hackmd.io/@eecheng/BJpGSJWq8) 的作法,在 dequeue 失敗時以 `sched_yield()` 讓出 CPU。好處是:若單一處理器上有多個 thread,真正有任務可執行的 thread 能優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降;本來就由單一 thread 獨占處理器的情況則不受影響。
```diff
/* afn_threadpool.c */
- !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task)));
+ !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))){
+ sched_yield();
+}
```
![](https://i.imgur.com/cTcsNHq.png)
- [ ] 實驗結果 II - AMD Ryzen 7 3800XT & Intel i5-6200U
由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。
[likwid-topology](https://github.com/RRZE-HPC/likwid/wiki/likwid-topology) 是一套列出 SMT thread、快取與處理器核階層關係的工具。`HWThread` 代表在 Linux 上看到的 CPU 編號,也就是 htop 顯示的 CPU0、CPU1...;`Thread` 則是處理器核上的 SMT thread 編號;`Core` 代表 physical core,如下列所示。
實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 `HWThread=0` 循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。
```shell
$ likwid-topology -g
--------------------------------------------------------------------------------
CPU name: AMD Ryzen 7 3800XT 8-Core Processor
CPU type: nil
CPU stepping: 0
********************************************************************************
Hardware Thread Topology
********************************************************************************
Sockets: 1
Cores per socket: 8
Threads per core: 2
--------------------------------------------------------------------------------
HWThread Thread Core Socket Available
0 0 0 0 *
1 0 1 0 *
2 0 2 0 *
3 0 3 0 *
4 0 4 0 *
5 0 5 0 *
6 0 6 0 *
7 0 7 0 *
8 1 0 0 *
9 1 1 0 *
10 1 2 0 *
11 1 3 0 *
12 1 4 0 *
13 1 5 0 *
14 1 6 0 *
15 1 7 0 *
--------------------------------------------------------------------------------
```
```c
/* afn_threadpool_pi.c */
CpuTopology_t topo = get_cpuTopology();
int numSockets = topo->numSockets,
numCoresPerSocket = topo->numCoresPerSocket,
numHWThreads = topo->numHWThreads, cpulist[topo->numHWThreads], idx = 0;
for (int socket = 0; socket < numSockets; ++socket) {
for (int core = 0; core < numCoresPerSocket; ++core) {
for (int i = 0; i < numHWThreads; ++i) {
int threadId = topo->threadPool[i].threadId,
coreId = topo->threadPool[i].coreId,
packageId = topo->threadPool[i].packageId,
apicId = topo->threadPool[i].apicId;
if (packageId == socket && coreId == core) {
cpulist[idx + threadId * (numCoresPerSocket * numSockets)] = apicId;
}
}
idx++;
}
}
topology_finalize();
```
實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutex protected 兩種實作的差異,將 thread pool 初始化和釋放排除在測量時間外([`b3cd6`](https://github.com/93i7xo2/sysprog2021q1/commit/b3cd6764f44441c75da20a3d77e58533ed445d39)),結果發現時間大幅縮短。以下先簡短地在 i5-6200U 上測試建立及釋放時間:
![](https://i.imgur.com/rES7enO.png)
![](https://i.imgur.com/YtEXftT.png)
![](https://i.imgur.com/KHpydVm.png)
和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間:
- AMD R7-3800XT
![](https://i.imgur.com/XXWPQTx.png)
- Intel i5-6200U
![](https://i.imgur.com/irLSMol.png)
在 lock-free 的測試中,有無 pinned 對執行時間影響不大;而隨著 thread count 增加,lock-free 實作始終優於原始版本。
## Pthread: Cancellation point
> 取自 man-pages:
- Creation: [`pthread_create()`](https://man7.org/linux/man-pages/man3/pthread_create.3.html)
```c
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
```
`thread`: 指向新建的 thread ID
`start_routine`: thread 建立後執行的函式,參數為 `arg`
```c
// Creating a new thread
pthread_create(&ptid, NULL, &func, NULL);
```
- Cancellation clean-up handlers
建立方式是由 `pthread_cleanup_push()` 將函式逐一推進堆疊中,執行時從堆疊最上方依序執行
> [`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html)
> Any clean-up handlers established by `pthread_cleanup_push(3)` that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed.
Clean-up handlers 作用是在執行緒結束前釋放資源,包括
- mutex
- condition variables
- semaphores
- file descriptor
等不會在執行緒結束時自動釋放的資源。觸發情境有以下幾種(典型用法可見清單後的示意):
1. 執行緒即將結束時呼叫 `pthread_exit()`,執行堆疊中所有 handlers
2. 由其他執行緒呼叫 `pthread_cancel()` 發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlers
3. 執行緒呼叫 `pthread_cleanup_pop()` 從堆疊上取出最上層的 handler,可選擇執行與否,與 `pthread_cleanup_push()` 搭配使用
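典型用法是以 clean-up handler 搭配 `pthread_cond_wait()`,確保執行緒在等待期間被取消時,持有中的 mutex 仍會被解鎖(以下為假設性的片段):
```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready;

static void cleanup_unlock(void *arg)
{
    pthread_mutex_unlock(arg); /* 即使執行緒被取消,也會釋放 mutex */
}

static void *worker(void *arg)
{
    (void) arg;
    pthread_mutex_lock(&lock);
    pthread_cleanup_push(cleanup_unlock, &lock);
    while (!ready) /* pthread_cond_wait() 本身是 cancellation point */
        pthread_cond_wait(&cond, &lock);
    pthread_cleanup_pop(1); /* 傳入 1:彈出並執行 handler,即解鎖 */
    return NULL;
}
```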
值得注意的是,若執行緒以 `return` 結束,clean-up handlers 不會被執行;雖然自 start function `return val` 的效果相當於以回傳值呼叫 `pthread_exit(val)`,但兩者在 clean-up handlers 的處理上並不相同。
> Clean-up handlers are not called if the thread terminates by performing a return from the thread start function.
> Performing a return from the start function of any thread other than the main thread results in an implicit call to `pthread_exit()`, using the function's return value as the thread's exit status.
- Cancellation point
- Cancellation point 是執行緒檢查自己是否應被取消的時間點。「取消」指的是執行緒的終止(無論是被請求取消,或正常執行到結束),並在終止前釋放所有資源;雖然 `pthread_kill()` 也能終止執行緒,但不會釋放這些資源。
- Cancellation point 可由 `pthread_testcancel()` 進行設置,其他執行緒以 [`pthread_cancel()`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。
- 由 [`pthread_setcancelstate()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 進行設置 cancelability 決定觸發與否,預設是 `ENABLE`
```c
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate)
```
- 由 [`pthread_setcanceltype()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 設置 cancelability type,預設是 `DEFERRED`,意即請求會被推遲到下一個 cancellation point 才處理;`ASYNCHRONOUS` 則是接收到請求後立刻取消。
```c
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype)
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype)
```
若執行緒會保留/釋放資源,設置 `ASYNCHRONOUS` 會讓執行緒收到請求後立刻處理,此時無法確定資源處於釋放前還是釋放後的狀態,使 clean-up handler 無法正確處理,因此不建議使用。文件中也註明,只有純 compute-bound loop 的執行緒,或是僅呼叫下列 async-cancel-safe functions 的情況,才適合用 `ASYNCHRONOUS`。
- pthread_cancel()
- pthread_setcancelstate()
- pthread_setcanceltype()
- [pthreads](https://man7.org/linux/man-pages/man7/pthreads.7.html) 明確定義哪些函式必須是/可能是 cancellation point
- 必須是
- pthread_cond_timedwait()
- pthread_cond_wait()
- pthread_testcancel()
[`pthread_cond_wait()`/`pthread_cond_timedwait()`](https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html) 為何是 cancellation point?是為了防止 indefinite wait:例如等待的資料從未被送出 (e.g. `read()`),仍可以取消該執行緒。當接收到 cancellation request 時,執行緒如同被喚醒的 thread 一樣重新取得 mutex,但不是返回 `pthread_cond_wait()`/`pthread_cond_timedwait()` 的呼叫點,而是去執行 clean-up handlers。
- [`pthread_join`](https://man7.org/linux/man-pages/man3/pthread_join.3.html)/[`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html)
```c
void pthread_exit(void *retval);
int pthread_join(pthread_t thread, void **retval);
```
`pthread_exit()` 於執行緒結束時呼叫,其參數 `retval` 供其他呼叫 `pthread_join()` 的執行緒取得。`retval` 不可指向執行緒自身 stack 上的資料,否則產生未定義行為。
`pthread_join()` 負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 `PTHREAD_CANCELED` 到指定地址。
```c
/* example */
#include <pthread.h>
#include <stdio.h>

void *app1(void *x)
{
    (void) x;
    pthread_exit((void *) 20); /* exit status 需以 void * 傳遞 */
}

int main()
{
    void *ret; /* 接收 exit status,型態必須是 void * */
    pthread_t t1;
    pthread_create(&t1, NULL, app1, NULL);
    pthread_join(t1, &ret);
    printf("exit status: %ld\n", (long) ret);
    return 0;
}
```
## 參考資料
- [Geoff Langdale - Lock-Free Programming](https://www.cs.cmu.edu/~410-s05/lectures/L31_LockFree.pdf)
- [Acquire and Release Semantics](https://preshing.com/20120913/acquire-and-release-semantics/)
- [Where does the wait queue for threads lies in POSIX pthread mutex lock and unlock?](https://stackoverflow.com/questions/25419225/where-does-the-wait-queue-for-threads-lies-in-posix-pthread-mutex-lock-and-unloc)
- [Memory Reordering Caught in the Act](https://preshing.com/20120515/memory-reordering-caught-in-the-act/)
- [Memory Barriers Are Like Source Control Operations](https://preshing.com/20120710/memory-barriers-are-like-source-control-operations/)
- [Weak vs. Strong Memory Models](https://preshing.com/20120930/weak-vs-strong-memory-models/)
- [This Is Why They Call It a Weakly-Ordered CPU](https://preshing.com/20121019/this-is-why-they-call-it-a-weakly-ordered-cpu/)