--- tags: concurrency --- # [並行程式設計](https://hackmd.io/@sysprog/concurrency): Thread Pool 實作和改進 > 共筆貢獻者: jserv, 93i7xo2, ccs100203, linD026, bakudr18 ## Thread Pool [thread pool](https://en.wikipedia.org/wiki/Thread_pool) 的設計考量如下: 1. 在大量的執行緒環境中,建立和銷毀執行緒物件的開銷相當可觀,而且頻繁的執行緒物件建立和銷毀,會對高度並行化的應用程式帶來額外的延遲時間 2. 考慮到硬體的有效處理器數量有限,使用 thread pool 可控管執行分配到處理器的執行緒數量 用醫院來比喻: * 沒有 thread pool 時: 醫院每天要面對成千上萬的病人,每當一個病人求診,就找一位醫生處理,看診後醫生也跟著離開。當看病時間較短時,醫生來去的時間,顯得尤為費時 * 初步引入 thread pool: 醫院設置門診,把醫生全派出去坐診,病人想看診之前,強制先掛號排隊,醫生根據病人隊列順序,依次處理各個病人,這樣就省去醫生往返的時間。但倘若病患很少,醫生卻過多,這會使得很多醫生閒置,浪費醫療資源 * 改進 thread pool: 門診一開始只派出部分醫生,但增加一位協調人 (現實就是護理師擔任),病人依舊是排隊看病,協調人負責調度醫療資源。當病人很多、醫生忙不過來時,協調人就呼叫更多醫生來幫忙;當病人不多、醫生過多時,協調人就安排部分醫生休息待命,避免醫療資源的浪費 示意圖: ![](https://i.imgur.com/GedtciF.png) ## 適用並行運算的圓周率計算 3 月 14 日是[圓周率日](https://en.wikipedia.org/wiki/Pi_Day),這天也是愛因斯坦的生日,求圓周率近似值的討論可見: * video: [除了割圓術,圓周率還可以這樣算](https://youtu.be/BkDbVypDgSs) * video: [古人如何計算圓周率 π?](https://youtu.be/AvMaNDh_R0w) * video: [如何計算圓周率 π 的 1 億位?](https://youtu.be/BkDbVypDgSs) [Gregory-Leibniz 級數](https://mathworld.wolfram.com/GregorySeries.html)可優雅地計算圓周率,參考 [Leibniz's Formula for Pi](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi)。從下面的 _Madhava–Leibniz series_ 開始推導: $$ \arctan(1) = \dfrac{\pi}{4} = 1 - \dfrac{1}{3} + \dfrac{1}{5} - \dfrac{1}{7} +\ ... $$ 首先積分下列[數列](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi/Lemma) $$ \dfrac{1}{1+t^2} = 1 - t^2 + t^4 - t^6 + t^8 + ... + t^{4n} - \frac{t^{4n+2}}{1+t^2} $$ 從 $0$ 積分到 $x$, $0\leq{x}\leq1$ $$ \int_{0}^{x}\dfrac{1}{1+t^2}dt=x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+...+\frac{x^{4n+1}}{4n+1}-R_n(x) \\where\ R_n(x)=\int_{0}^{x}\dfrac{t^{4n+2}}{1+t^2}dt $$ 先看 $R_n(x)$ ,因為 $1\leq1+t^2$,得到 $$ 0\leq{R_n(x)}\leq\int_{0}^{x}t^{4n+2}dt=\frac{x^{4n+3}}{4n+3} $$ 又因為 $$ \frac{x^{4n+3}}{4n+3}\leq\frac{1}{4n+3}, 0\leq{x}\leq1 $$ 依據[夾擠定理](https://zh.wikipedia.org/wiki/%E5%A4%BE%E6%93%A0%E5%AE%9A%E7%90%86) (squeeze theorem,也稱為 sandwich theorem),當 $n\rightarrow\infty, \frac{1}{4n+3}\rightarrow0$ ,於是得出下列式子 $$ \int_{0}^{x}\dfrac{1}{1+t^2}dt = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ... $$ 而且$\frac{d}{dx}arctan(x)=\frac{1}{1+t^2}$ $$ \arctan(x) = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ... $$ 此時將 $x$ 代入 1,即可得 $\frac{\pi}{4}$ 以下是對應實作: ```c double compute_pi_leibniz(size_t N) { double pi = 0.0; for (size_t i = 0; i < N; i++) { double tmp = (i & 1) ? (-1) : 1; pi += tmp / (2 * i + 1); } return pi * 4.0; } ``` 比較單執行緒、多執行緒和 SIMD 版本的表現: ![](https://i.imgur.com/x4gz1oE.png) 1995 年提出的 [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula),可用這三位發表者的姓氏開頭簡稱為 BBP 公式,最初的公式如下: $\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$ BBP 公式跳脫典型的圓周率的演算法,可計算圓周率的任意第 $n$ 位,而不用事先計算前面的 $n - 1$ 位,於是 BBP 公式很適合透過並行運算來求圓周率近似值。 典型的圓周率演算法必須計算前面的 $n- 1$ 位才能夠計算,代表數列每一項之間具相依性。以公式 $$ arctan(1)=\frac{π}{4}=1−\frac{1}{3}+\frac{1}{5}−\frac{1}{7}+ ... $$ 以及 $$ \sum_{k=1}^{\infty} \frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6}) $$ 來說,因為每一項的計算都只跟當下的 k 值有關,所以每個獨立的項都可以平行計算,但利用公式 $$\frac{\pi}{2}=\frac{2}{1}×\frac{2}{3}×\frac{4}{3}×\frac{4}{5}×\frac{6}{5}×... $$ 實作的其中一種方式如下: ```c double pi(size_t N) { double pi = 1; double n = 1; for (size_t j = 1;j <= N; j++, n++) { if (j & 1 == 0) { pi *= (n / (n + 1)); } else { pi *= ((n + 1) / n); } } return 2 * pi; } ``` 因為算出目前項之前需要先知道前面連乘得到的值 `pi` ,所以無法對每一項做並行運算。以下採用 BBP 公式搭配 Thread Pool 進行驗證。 ## 實作 以下是一個 [thread pool](https://en.wikipedia.org/wiki/Thread_pool) 實作: [tpool.c](https://github.com/sysprog21/concurrent-programs/blob/master/tpool/tpool.c) 預期執行輸出: ``` PI calculated with 101 terms: 3.141592653589793 ``` 程式架構示意: ```graphviz digraph struct { node [shape=record]; rankdir=LR; jobqueue_fetch [label="jobqueue_fetch()" shape=plaintext] bpp [label="bpp()" shape=plaintext] bpp2 [label="bpp()" shape=plaintext] "__threadpool" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>__threadpool</B></TD></TR> <TR><TD PORT="l0">count</TD></TR> <TR><TD PORT="l1">workers</TD></TR> <TR><TD PORT="l2">jobqueue</TD></TR> </TABLE>> shape = "none" ]; "jobqueue_t" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>jobqueue_t</B></TD></TR> <TR><TD PORT="l0">*head</TD></TR> <TR><TD PORT="l1">*tail</TD></TR> <TR><TD PORT="l2">cond_nonempty</TD></TR> <TR><TD PORT="l3">rwlock</TD></TR> </TABLE>> shape = "none" ]; "threadtask_t" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>threadtask_t</B></TD></TR> <TR><TD PORT="l0">*func</TD></TR> <TR><TD PORT="l1">*arg</TD></TR> <TR><TD PORT="l2">*future</TD></TR> <TR><TD PORT="l3">*next</TD></TR> </TABLE>> shape = "none" ]; "threadtask_t1" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>threadtask_t</B></TD></TR> <TR><TD PORT="l0">*func</TD></TR> <TR><TD PORT="l1">*arg</TD></TR> <TR><TD PORT="l2">*future</TD></TR> <TR><TD PORT="l3">*next</TD></TR> </TABLE>> shape = "none" ]; "__tpool_future" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>__tpool_future</B></TD></TR> <TR><TD PORT="l0">flag</TD></TR> <TR><TD PORT="l1">*result</TD></TR> <TR><TD PORT="l2">mutex</TD></TR> <TR><TD PORT="l3">cond_finished</TD></TR> </TABLE>> shape = "none" ]; "__tpool_future1" [ label =<<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0"> <TR><TD width="100" PORT="ll"><B>__tpool_future</B></TD></TR> <TR><TD PORT="l0">flag</TD></TR> <TR><TD PORT="l1">*result</TD></TR> <TR><TD PORT="l2">mutex</TD></TR> <TR><TD PORT="l3">cond_finished</TD></TR> </TABLE>> shape = "none" ]; threadtask_t:l2->__tpool_future:ll threadtask_t1:l2->__tpool_future1:ll jobqueue_t:l0->threadtask_t:ll jobqueue_t:l1->threadtask_t1:ll threadtask_t:l0->bpp:w threadtask_t1:l0->bpp2:w threadtask_t:l3->threadtask_t1:ll __threadpool:l2->jobqueue_t:ll __threadpool:l1->jobqueue_fetch:w } ``` ### Thread Pool & Job Queue ```c struct __threadpool { size_t count; pthread_t *workers; jobqueue_t *jobqueue; }; typedef struct __jobqueue { threadtask_t *head, *tail; pthread_cond_t cond_nonempty; pthread_mutex_t rwlock; } jobqueue_t; ``` - `count` 紀錄 thread pool 內 worker thread (即 `workers`) 的數量 - worker 閒置時向 `__jobqueue` 抓取被封裝的任務 `threadtask_t` 執行,以 FIFO 方式取出任務,`tail` 指向下一個待取出的任務。 - 存取共享變數 `__jobqueue` 必須保證為 exclusive use,若無任務 worker 則以 `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)` 等待新的任務 ### Thread Task ```c typedef struct __threadtask { void *(*func)(void *); void *arg; struct __tpool_future *future; struct __threadtask *next; } threadtask_t; struct __tpool_future { int flag; void *result; pthread_mutex_t mutex; pthread_cond_t cond_finished; }; ``` - worker 取得函式 `func` 執行的結果放在共享變數 `future`,而同時可能有其他執行緒存取 `future` 取得運算結果,同樣需要保護。 - `__threadtask` 與 `__tpool_future` 成對建立 - worker 設置 flag 表示運算進行 (`__FUTURE_RUNNING`) 或完成 (`__FUTURE_FINISHED`) - 其他執行緒設置 flag 以干涉 worker 行為 - `__FUTURE_CANCELLED` - `__FUTURE_DESTROYED` ## 程式流程 1. `tpool_create()`: 建立一定數量的 worker 於 pool,worker 執行 `jobqueue_fetch()` 抓取 job queue 內的任務 2. `tpool_apply()`: 將新建立的 `threadtask_t` 推進 job queue,每個 `threadtask_t` 各指向一個 `__tpool_future` 用以儲存運算結果。同時以 `pthread_cond_broadcast()` 喚醒等待 job queue 有新任務的 worker 3. `tpool_future_get()` 和 `tpool_future_destroy()`: 從 `__tpool_future` 取回運算結果後釋放 4. `tpool_join()`: 呼叫 `tpool_apply` 新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 pool 和 job - `jobqueue_fetch()` ```graphviz digraph finite_state_machine { node [shape=point,label=""]ENTRY,EXIT; rankdir=TD get_task[shape=box label="Get a new task from queue"] set_run[shape=box label="Set RUNNING"] set_fin[shape=box label="Set FINISHED"] broadcast[shape=box label="pthread_cond_broadcast()"] free_task[shape=box label="free(__threadtask)"] free_future[shape=box label="free(__tpool_future)"] is_cancelled[shape=diamond, label="CANCELLED?"] is_destroyed[shape=diamond, label="DESTROYED?"] is_null_func[shape=diamond, label="Valid function?"] ENTRY->get_task; get_task->is_null_func [tailport=e] is_null_func->is_cancelled [label="Yes"] is_cancelled->free_task [label="Y", tailport=e, headport=e] is_cancelled->set_run [label="N"] set_run->is_destroyed [label=" Wait for function to finish"] is_destroyed->free_future [label="Y"] is_destroyed->set_fin [label="N", tailport=e] free_future->free_task set_fin->broadcast broadcast->free_task free_task->get_task [tailport=w, headport=w] is_null_func->EXIT [tailport=e, label="No. Terminated"] } ``` - `__FUTURE_CANCELLED` 與 `__FUTURE_DESTROYED` 的使用情境 - 在 `tpool_future_destroy()` 內,若 `future->flag` 為 `__FUTURE_FINISHED` 或`__FUTURE_CANCELLED` 兩者之一,表示 `__threadtask` 已被釋放無法被 worker 拿到,故在此釋放資源。 若非以上情況則說明對應的 `__threadtask` 仍可存取該 `future`,故將 `future->flag` 設置為 **`__FUTURE_DESTROYED`** 之後由 worker 來釋放 future 的資源。 - 同理 `jobqueue_destroy()` 也有必要依據 **`__FUTURE_DESTROYED`** 來釋放 future - 使用到 `pthread_cancel()` 不代表設置 `__FUTURE_CANCELLED`。`pthread_cancel()` 僅在 `tpool_create()` 建立 worker 失敗時使用,向先前建立的 worker 發送 cancellation request ```c! /* jobqueue_fetch */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); ``` - `jobqueue_destroy()` 是唯一會將 flag 設置為 `__FUTURE_CANCELLED`,猜測使用情境是想中斷所有的 worker,摧毀所有 job queue 內的任務但保留 `__tpool_future` 自行處理 (但並沒有使用到關鍵的 `rwlock`)。 目前 `jobqueue_destroy()` 只有用在 `tpool_join()` 或是用在 `tpool_create()` 失敗時摧毀空 queue ```c int tpool_join(struct __threadpool *pool) { ... jobqueue_destroy(pool->jobqueue); ... } ``` ```c struct __threadpool *tpool_create(size_t count) { ... jobqueue_destroy(jobqueue); free(pool); return NULL; } ``` ```c static void jobqueue_destroy(jobqueue_t *jobqueue) { threadtask_t *tmp = jobqueue->head; while (tmp) { /* Never executed */ } pthread_mutex_destroy(&jobqueue->rwlock); pthread_cond_destroy(&jobqueue->cond_nonempty); free(jobqueue); } ``` 總之,沒有一種情況是同時有 worker 在運行。 ## 實作缺失回顧 ### 修正 clock 屬性 `pthread_cond_timedwait()` 的 expire time 使用 `CLOCK_MONOTONIC` ```c struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); ``` > **pthread_cond_timedwait** > The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument. > **pthread_condattr_setclock** > The default value of the clock attribute shall refer to the system clock. 因為 `pthread_cond_timedwait()` 需要 condition variable 具有 clock attribute,而預設的 system clock 用 `pthread_condattr_getclock()` 取得得到 `CLOCK_REALTIME`,非預期的 `CLOCK_MONOTONIC`。 `CLOCK_REALTIME` 會跟 NTP 校正時間,可能往前或往後;而 `CLOCK_MONOTONIC` 則保證單調遞增,從特定點開始計數,[clock_getres](https://man7.org/linux/man-pages/man2/clock_getres.2.html) 說明在 Linux 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。 ```diff static struct __tpool_future *tpool_future_create(void) { struct __tpool_future *future = malloc(sizeof(struct __tpool_future)); if (future) { future->flag = 0; future->result = NULL; pthread_mutex_init(&future->mutex, NULL); pthread_condattr_t attr; pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&future->cond_finished, &attr); pthread_condattr_destroy(&attr); } return future; } ``` ### Timeout 處理 在 [8dc72](https://github.com/93i7xo2/sysprog2021q1/commit/8dc72a44e8ccc73685d9c9b7aaeefd49c1796f27) 實現 timeout 處理,不使用 `__FUTURE_CANCELLED` 和 `__FUTURE_DESTROYED` 達成。 使用 `tpool_future_get()` 取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future: 1. 檢查 future->flag 是否有 `__FUTURE_RUNNING`,若有代表 worker 已取得對應的 task 進行計算,現行版本尚不支援中斷進行計算中的 worker,於是等待計算結果,結果等同 ```c tpool_future_get(futures[i], 0) ``` 2. 若無 `__FUTURE_RUNNING`,表示對應的 task 仍存於 jobqueue,將其連同 future 移除並釋放資源。 試著從 `futures[i]` 最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 `futures[0]` 開始至 `futures[PRECISION]`,一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 `pthread_cond_broadcast()`,因此從輸出可見一些 future 被移除。 ```diff - for (int i = 0; i <= PRECISION; i++) { + for (int i = PRECISION; i >= 0; i--) { if (!futures[i]) continue; double *result = tpool_future_get(futures[i], time_limit, pool->jobqueue); if (result) { bpp_sum += *result; free(result); tpool_future_destroy(futures[i]); DEBUG_PRINT(("Future[%d] completed!\n", i)); } else DEBUG_PRINT(("Cannot get future[%d], timeout after %d milliseconds.\n", i, time_limit)); } ``` ```bash $ ./pi 4 1 Thread count: 4 Time limit: 1 ms Cannot get future[100], timeout after 1 milliseconds. /* future[100] is removed */ Future[99] completed! ... Future[0] completed! Elapsed time: 11592788 ns PI calculated with 101 terms: 3.141592653589793 ``` ## 鏈結串列的效率 原實作中 [pop task](https://github.com/bakudr18/quiz4/blob/87d88e4cbaa11f6d3ed51e90c6f06f6c4a55d1b2/tpool.c#L166-L169) 的方式是走訪整個 鏈結串列取出 tail ,其 time complexity 為 $O(n)$ ,造成在 critical section 內駐留過久,這對於 multithread 效能的傷害是很大的,藉由 `sudo taskset 0xF0 ./pi` 以 4 core CPU 測量 `PRECISION = 100000` 時的執行時間,多執行幾次可發現,雖然大多數執行時間落在 160~200 milliseconds ,但偶爾會有超過 100 seconds 或更久的執行時間,以 [perf record](https://man7.org/linux/man-pages/man1/perf-record.1.html) 紀錄超過 100 seconds 的 process,發現有約 83% 的時間在執行 `for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next);` ,而越是減少可用的 CPU 此情況會更嚴重(因為當 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 `jobqueue_fetch` 消化 task,鏈結串列會增長非常快)。 因此,為了使 pop task 達到 $O(1)$ ,可將原本的 `jobqueue` 改成雙向鏈結串列,後來更是直接引入 [linux/list.h](https://github.com/torvalds/linux/blob/master/include/linux/list.h) 以環狀雙向鏈結串列取代原本的單向鏈結串列,如此不但使 pop task 達到 $O(1)$ ,也減少了在 `jobqueue->head == jobqueue->tail` 的 if statement ,詳細程式碼可見 [commit f8761](https://github.com/bakudr18/quiz4/commit/f8761d6b4c884f1a96ef80f08d590fd5573aacee) ,而修改後以 4 core 執行的平均時間約為 138 milliseconds,且沒有特別的 outliers 。 ```diff typedef struct __threadtask { void *(*func)(void *); /* the function that task should execute */ void *arg; /* argument passed to func */ struct __tpool_future *future; /* A structure to store task status and result */ - struct __threadtask *next; /* pointer to next task */ + struct list_head list; /* linked list of task structure */ } threadtask_t; typedef struct __jobqueue { - threadtask_t *head, *tail; /* store head and tail of queue */ + struct list_head head; /* list head of task */ pthread_cond_t cond_nonempty; /* condition variable to check if queue is non-empty */ pthread_mutex_t rwlock; /* lock share resources like head and tail */ } jobqueue_t; ``` ## 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool) atomic_threadpool 是一套使用 lock-free FIFO queue (`lfqueue_deq`) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。 首先看較重要的函式 `at_thpool_worker`,功能等同 `jobqueue_fetch`,旨在從 job queue 抓取 task,task 由函式 `task->do_work` 及參數 `task->arg` 組成,由於沒有需要存入運算結果, task 運算結束後返回 `TASK_PENDING` 抓取下一個 task。 使用 `tp->nrunning` 來表示進行中的 worker 數量,設置 `tp->is_running=0` 能立即終止尚未執行 task 的 worker。 ```c void *at_thpool_worker(void *_tp) { at_thpool_t *tp = (at_thpool_t *) _tp; AT_THPOOL_INC(&tp->nrunning); // __sync_fetch_and_add(v, 1) at_thtask_t *task; void *_task; lfqueue_t *tq = &tp->taskqueue; TASK_PENDING: while (tp->is_running) { if ((_task = lfqueue_deq(tq))) goto HANDLE_TASK; lfqueue_sleep(1); } AT_THPOOL_DEC(&tp->nrunning); // __sync_fetch_and_dec(v, 1) return NULL; HANDLE_TASK: task = (at_thtask_t*) _task; task->do_work(task->args); AT_THPOOL_FREE(task); goto TASK_PENDING; } ``` 當使用 `lfqueue_deq` 從 job queue 從尾端取得 task 時,理想狀況其他執行緒可同時使用 `lfqueue_enq` 插入新的 task 而不衝突,而 lfqueue 也確實做到。 ```c while (tp->is_running) { if ((_task = lfqueue_deq(tq))) goto HANDLE_TASK; lfqueue_sleep(1); } ``` ### `lfqueue` atomic_threadpool 所使用到的 `lfqueue` API,共同操作同一個 lfqueue_t 型態的 `lfqueue`: ```c extern int lfqueue_init(lfqueue_t *lfqueue); extern int lfqueue_enq(lfqueue_t *lfqueue, void *value); extern void* lfqueue_deq(lfqueue_t *lfqueue); extern void lfqueue_destroy(lfqueue_t *lfqueue); ``` ```c typedef struct { lfqueue_cas_node_t *head, *tail, *root_free, *move_free; volatile size_t size; volatile lfq_bool_t in_free_mode; lfqueue_malloc_fn _malloc; lfqueue_free_fn _free; void *pl; } lfqueue_t; ``` 結構內有兩條佇列 1. `head`/`tail`: 分別指向佇列前端/尾端,`head` 同時也指向上一個取出的節點, 2. `root_free`/`move_free`: `root_free` 分別指向佇列前端/尾端,紀錄等待被釋放的節點。每個 `lfqueue_cas_node_t` 型態的節點都帶有 lfq_time_t 型態的 `_deactivate_tm` 紀錄進入佇列的時間。 - `__lfq_check_free` ```c= static void __lfq_check_free(lfqueue_t *lfqueue) { lfq_time_t curr_time; // 限制只有一條執行緒能夠執行 if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) { // 以 rootfree 為起始刪除後面節點 // 並限制離 _deactivate_tm 超過 2s lfq_get_curr_time(&curr_time); lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree; while ( rtfree && (rtfree != lfqueue->move_free) ) { nextfree = rtfree->nextfree; if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) { // printf("%p\n", rtfree); lfqueue->_free(lfqueue->pl, rtfree); rtfree = nextfree; } else { break; } } lfqueue->root_free = rtfree; __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0); } __LFQ_SYNC_MEMORY(); } ``` 第 9 行表明佇列至少存在一個節點,因此 `move_free` 不存在指向 NULL 的可能性,好處是**插入時不用判斷是否指向 NULL** ```c move_free->next = new_node; ``` 缺點是針對程式起始、結束要特別處理多出來的一個節點,另一條佇列同樣情況。 ```c int lfqueue_init_mf(lfqueue_t *lfqueue, void* pl, lfqueue_malloc_fn lfqueue_malloc, lfqueue_free_fn lfqueue_free) { ... freebase->value = NULL; freebase->next = NULL; freebase->nextfree = NULL; freebase->_deactivate_tm = 0; lfqueue->head = lfqueue->tail = base; // Not yet to be free for first node only lfqueue->root_free = lfqueue->move_free = freebase; // Not yet to be free for first node only ... } ``` ```c void lfqueue_destroy(lfqueue_t *lfqueue) { ... if (rtfree) { // rtfree will never be NULL lfqueue->_free(lfqueue->pl, rtfree); } ... } ``` ### `__lfq_recycle_free` `__lfq_check_free` 稍早已提到功能是清除以 root_free 為首的佇列,而 `__lfq_recycle_free` 便是將節點推入佇列。因為在多執行緒下,如果直接把節點的資源釋放掉可能會在之後的操作造成錯誤。 ```graphviz digraph main { rankdir = LR node [shape = box] freebase [label = "free base"] node_1 [label = "free node 1"] subgraph cluster_move { label = "CAS(&freed->nextfree, NULL, freenode)" freebase -> node_1[label = "nextfree"] subgraph cluster_freed { label = "lfqueue->root_free\nlfqueue->move_free\nfreed" freebase } } } ``` ```graphviz digraph main { rankdir = LR node [shape = box] compound = true freebase [label = "free base"] node_1 [label = "free node 1"] subgraph cluster_cas { label = "CAS(&lfqueue->move_free, freed, freenode)" subgraph cluster_freed { label = "lfqueue->root_free\nlfqueue->move_free\nfreed" freebase } subgraph cluster_move { label = "lfqueue->move_free\"" } subgraph cluster_freed_l { label = "freed\"" temp [label = "free base"] } freebase -> temp[ltail = cluster_freed, lhead = cluster_freed_l] freebase -> node_1[label = "compare\n success" ltail = cluster_freed, lhead = cluster_move] } } ``` ```c= static void __lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) { lfqueue_cas_node_t *freed; do { freed = lfqueue->move_free; } while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) ); lfq_get_curr_time(&freenode->_deactivate_tm); __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode); } ``` - 佇列尾端 `move_free->nextfree = NULL`。 - 節點型態雖為 `lfqueue_cas_node_t` 但非用 `next` 而是以 `nextfree` 進行連結。 - 節點在 enqueue 時即初始化 `nextfree = NULL`,因此 `freenode->nextfree = NULL`。 - 當第 6 行以 `__LFQ_BOOL_COMPARE_AND_SWAP` 插入節點 `freenode` 至 `move_free` 後端,即限制了直到第 10 結束都沒有其他執行緒可執行下去,形成 critical section。 ```graphviz digraph { // rankdir=TB subgraph MVF { label="Local Datacenter"; mvf [shape=record label="|<h>nextfree"] NULL1 [shape=plaintext label="NULL"] } subgraph FN { label="Local Datacenter"; fn [shape=record label="|<h>nextfree"] NULL2 [shape=plaintext label="NULL"] } movefree [shape=plaintext] freenode [shape=plaintext] movefree -> mvf freenode -> fn mvf:h:e -> NULL1:w [style="dashed"] mvf:h:e -> fn:w fn:h:e -> NULL2:w {rank = same; movefree; freenode;} {rank = same; NULL1; NULL2;} } ``` ```graphviz digraph main { rankdir = LR node [shape = box] label = "free mode\n\nloop\nif rtfree != NULL && rtfree != lfqueue->move_free" labelloc = "t" compound = true freebase [label = "lfqueue->root_free\n(free base)"] node_1 [label = "free node 1"] subgraph cluster_free { label = "free ( functoin )" subgraph cluster_rtfree { label = "rtfree" freebase } } subgraph cluster_move { label = "lfqueue->move_free" node_1 } freebase -> node_1[label = "nextfree"] node_1 -> freebase[label = "assign to\nrbfree", lhead = cluster_rtfree] } ``` ### `lfqueue_enq` 插入新節點 ```c static int _enqueue(lfqueue_t *lfqueue, void* value) { lfqueue_cas_node_t *tail, *node; node = (lfqueue_cas_node_t*) lfqueue->_malloc(lfqueue->pl, sizeof(lfqueue_cas_node_t)); if (node == NULL) { perror("malloc"); return errno; } node->value = value; node->next = NULL; node->nextfree = NULL; for (;;) { __LFQ_SYNC_MEMORY(); // 沒有 memory barrier 會因為編譯器最佳化或是因為 out-of-order execution 無法拿到最新的 tail tail = lfqueue->tail; // 不可能發生 tail = NULL if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) { /* 由於使用 atomic operation * 同一時間只會有一個執行緒更新 tail->next * 接下來才會將 tail 指向新的尾端完成整個插入流程 * 另一個執行緒所拿到的 tail->next 才會是 NULL */ // compulsory swap as tail->next is no NULL anymore, it has fenced on other thread __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node); __lfq_check_free(lfqueue); // cleanup return 0; } } /*It never be here*/ return -1; } ``` 觀察 linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation,令人不解的是大部份函式都有 [full barrier](https://gcc.gnu.org/onlinedocs/gcc-4.5.3/gcc/Atomic-Builtins.html) 的作用,這裡特地引進 `__sync_synchronize` (full barrier)。 ```c #define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap #define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap #define __LFQ_FETCH_AND_ADD __sync_fetch_and_add // skip #define __LFQ_ADD_AND_FETCH __sync_add_and_fetch #define __LFQ_YIELD_THREAD sched_yield #define __LFQ_SYNC_MEMORY __sync_synchronize ``` ### `lfqueue_deq` 從佇列前端移去節點,要判斷的情況較多。改寫部份程式碼和搭配註解。 ```graphviz digraph main { rankdir = LR node [shape = box] compound = true inter [label = "..."] head [label = "head" shape = plaintext] tail [label = "tail" shape = plaintext] node_1 -> _next -> inter -> node_n tail -> node_n {rank = same node_n, tail} subgraph cluster_pop { label = "CAS(&lfqueue->head, head, head)" head -> node_1 } subgraph cluster_next { label = "next" labelloc = "t" _next } } ``` - 8 行的 CAS 應是為了避免取得 `head` 後,原有的 head 指向的節點被其他執行緒釋放導致 `head->next` 產生未定義行為。實際上釋放會在 2 秒後由 `__lfq_check_free` 執行,發生未定義行為的可能性極微,即使加了 CAS 也不能避免 8~9 行間 `__lfq_check_free` 釋放掉 `head`。 ```c= static void * _dequeue(lfqueue_t *lfqueue) { lfqueue_cas_node_t *head, *next; void *val; for (;;) { head = lfqueue->head; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) { next = head->next; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) { /* head == tail 代表無可取出節點,此時 next 必為 NULL * 這時可能有 `lfqueue_enq` 執行插入節點產生 `next != NULL` 的情況 * 故需做判斷 */ if (next == NULL) { /* 遇到這種情況重新取值即可,但一些函式需要回傳值判斷 queue 為空 */ val = NULL; goto _done; } } else { /* 目的是排除 next = NULL 的狀況,以免 dereference 出錯 * 正常情況下 next = NULL 只有在 head = tail 情況成立 */ if (next) { val = next->value; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) { break; } } else { val = NULL; goto _done; } } } } __lfq_recycle_free(lfqueue, head); _done: // __asm volatile("" ::: "memory"); __LFQ_SYNC_MEMORY(); __lfq_check_free(lfqueue); return val; } ``` 使用 gdb 觀察 31 行發生當下: ```bash (gdb) p next $14 = (lfqueue_cas_node_t *) 0x0 (gdb) p head->next $15 = (struct lfqueue_cas_node_s *) 0x55555bf83db0 (gdb) p head $17 = (lfqueue_cas_node_t *) 0x55555bf83d80 (gdb) p *head->next $18 = {value = 0x555555c57490, next = 0x55555bf83de0, nextfree = 0x55555bf83de0, _deactivate_tm = 1623174704} ``` ```c=7 head = lfqueue->head; next = head->next; ``` gdb 顯示 `next != head->next`,明顯與上方程式碼矛盾。表明 7~10 行即使通過 `__LFQ_BOOL_COMPARE_AND_SWAP`,`head` 可能經由其他執行緒的 enqueue,導致 `next` 指向其他新的節點。 ```graphviz digraph main { rankdir = LR node [shape = box] compound = true inter [label = "..."] head [label = "head" shape = plaintext] tail [label = "tail" shape = plaintext] subgraph cluster_empty { label = "CAS(&lfqueue->tail, head, head)" labelloc = "t" head node_1 } head -> node_1 node_1 -> _next -> inter -> node_n node_n -> tail[dir = back] subgraph cluster_next { label = "next" labelloc = "b" _next } NULL [label = "next == NULL", shape = oval] _next -> NULL [ltail = cluster_next] val [label = "next != NULL", shape = oval] _next -> val [ltail = cluster_next] inter -> NULL[style = invis] {rank = same inter, NULL, val} } ``` ```graphviz digraph main { rankdir = LR node [shape = box] compound = true head [label = "head" shape = plaintext] temp [label = "assign\n_next to\nhead" shape = plaintext] val [label = "next != NULL", shape = oval] val -> head [lhead = cluster_pop_suc] subgraph cluster_pop_suc { label = "CAS(&lfqueue->head, head, next)" node_1 -> _next head -> node_1 {rank = same head, node_1} head -> temp [label = "if lfqueue->head\nis node_1 (head)"] temp -> _next[label = "head' "] {rank = same temp, _next} } } ``` :::info 此實作並未解決 [ABA problem](https://en.wikipedia.org/wiki/ABA_problem) ::: ## 使用 C11 Atomics 改寫 > The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations. > In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - [An Introduction to Lock-Free Programming](https://preshing.com/20120612/an-introduction-to-lock-free-programming/) 廣泛的說,lock-free 並非指不使用 mutex (lock) 而是指鎖住整個 process 的可能性,下方程式碼雖然沒有使用到 mutex,但若兩個 thread 執行同樣的程式碼,在特定執行順序下永遠無法離開 loop。 ```c while (X == 0) { X = 1 - X; } ``` 而使用 mutex 的 thread 在最差情況下,所使用的 mutex 尚未被其他 thread 所釋放,而停滯不前,甚至導致 lock 之間的競爭,自然不算在 lock-free 範疇。 ### lock-free queue 參考 [RZHuangJeff](https://github.com/RZHuangJeff/tpool/commit/f57bb7e86caef6808ff2c7d9b4b5868ef49ef6f3) 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 [mmap](https://man7.org/linux/man-pages/man2/mmap.2.html) 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。 設計上只有一個 producer,使用 `count` 來紀錄存入的資料數,consumer 依照 `count` 決定是否往下執行,避開判斷邊界(如下)。由於 `head`, `tail` 都是 atomic,依據邊界來判斷需於一個 CAS 指令內對兩個變數進行操作,較為不便。 ```c // is full head == (tail ^ rbf->size*2) // is empty head == tail ``` 為避免 ABA problem,將用來存 offset 的變數前 32-bit 放置要存入 buffer 的資料的一小部份,例如 `(void *)` 型態的資料為 8 byte,假設資料前 4 byte 相同不具特徵(e.g. 連續記憶體位置的指標),將後 4 byte 併入 offset 的高 4 byte,這麼做即使在 CAS 判斷 offset 相同,也能透過前 4 byte 得知資料更動。 ```c /* producer */ bool enqueue(ringbuffer_t *rb, void **src) { uint64_t _read, _write; _read = atomic_load(&rb->read_offset) & RB_OFF_MASK; _write = atomic_load(&rb->write_offset) & RB_OFF_MASK; if (_read == (_write ^ rb->size)) return false; memcpy(&rb->buffer[_write], src, sizeof(void *)); _write = (_write + sizeof(void *)) & rb->mask; _write |= ((uint64_t)*src << 32); atomic_store_explicit(&rb->write_offset, _write, memory_order_release); atomic_fetch_add_explicit(&rb->count, 1, memory_order_release); return true; } /* consumer */ bool dequeue(ringbuffer_t *rb, void **dst) { int64_t count, new_count; do { count = atomic_load(&rb->count); new_count = count - 1; if (__builtin_expect((new_count < 0), 1)) return false; } while (!atomic_compare_exchange_weak(&rb->count, &count, new_count)); uint64_t _read, new_read; do { _read = atomic_load(&rb->read_offset); new_read = (((_read & RB_OFF_MASK) + sizeof(void *)) & rb->mask); memcpy(dst, &rb->buffer[_read & RB_OFF_MASK], sizeof(void *)); } while (!atomic_compare_exchange_weak(&rb->read_offset, &_read, new_read)); return true; } ``` ### affinity-based thread pool [Thread safety with affine thread pools](https://bartoszsypytkowski.com/thread-safety-with-affine-thread-pools/) 一文提到執行緒需要為在處理器核 (core) 切換間付出 context switching 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器執行。 此外,該文也提出 [work stealing](https://en.wikipedia.org/wiki/Work_stealing) 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。 [afn_threadpool.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool.c) 使用 lock-free queue 實作上述功能,達到以下需求: - 建立與處理器核數 (physical,而非 logical) 相同數量的 threads 接收任務,並使用 `pthread_setaffinity_np()` 固定在不同的處理器上。 - 提供方法將任務排進 private queue 或 shared queue - thread 每執行 32 個任務即交換 private queue 和 shared queue,以防 starvation。 [afn_threadpool_pi.c](https://github.com/93i7xo2/sysprog2021q1/blob/master/quiz4/afn_threadpool_pi.c) 採用 afn_threadpool.c 來計算 pi。 ```c int main(int argc, char **argv) { ... threadpool_t *tp; if (!tp_init(&tp, nthreads)) exit(EXIT_FAILURE); tpool_future_t *futures[PRECISION + 1]; for (int i = 0; i <= PRECISION; i++) { bpp_args[i] = i; futures[i] = tp_queue(tp, bpp, (void *)&bpp_args[i]); } for (int i = 0; i <= PRECISION; i++) { if (!futures[i]) continue; double *result = tpool_future_get(futures[i], time_limit); if (result) { bpp_sum += *result; free(result); tpool_future_destroy(futures[i]); } } tp_join(tp); tp_destroy(tp); ... } ``` ### 效能比較 比較 3 種 thread pool 計算 $\pi$ (PRECISION=1000) 所需時間 1. mutex protected queue (orginal) - `./threadpool_pi` 2. lock-free queue + affinity-based thread pool - `./afn_threadpool_pi` - w/o `pthread_setaffinity_np()` 3. lock-free queue - `./afn_threadpool_pi_v2` - w/ `pthread_setaffinity_np()` [原始程式碼](https://github.com/93i7xo2/sysprog2021q1/tree/master/quiz4) 與其執行方式: ```bash $ make benchmark && make plot ``` - [ ] 實驗結果 I - [Linode Dedicated CPU Instances](https://www.linode.com/blog/linode/introducing-linode-dedicated-cpu-instances/) (32 cores/64G RAM) ``` $ lscpu Model name: AMD EPYC 7501 32-Core Processor Stepping: 2 CPU MHz: 1999.998 BogoMIPS: 3999.99 Hypervisor vendor: KVM Virtualization type: full L1d cache: 2 MiB L1i cache: 2 MiB L2 cache: 16 MiB L3 cache: 512 MiB $ uname -r 5.4.0-72-generic $ gcc --version gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0 ``` 使用 lock-free 明顯減少執行時間,可惜隨著 thread 增加 throughtput 反而下降。 ![](https://i.imgur.com/FDMvZdh.png) 接下來參考 [eecheng87](https://hackmd.io/@eecheng/BJpGSJWq8) 的作法使用 `sched_yield()`,在 dequeue 失敗時讓出 CPU,這樣的好處是如果單一處理器上有多個 thread,執行任務的優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降,本來就是單個 thread 獨占處理器的則不影響。 ```diff /* afn_thradpool.c */ - !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))); + !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))){ + sched_yield(); +} ``` ![](https://i.imgur.com/cTcsNHq.png) - [ ] 實驗結果 II - AMD Ryzen 7 3800XT & Intel i5-6200U 由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。 [likwid-topology](https://github.com/RRZE-HPC/likwid/wiki/likwid-topology) 是一套列出 SMT thread 、快取與處理器核階層關係的工具。`HWThread` 代表在 linux 出現的 CPU 編號,也就是 htop 看到的 CPU0、CPU1...;`Thread` 則是處理器核上的 SMT Thread 編號;`Core` 代表 physical core,如下列所示。 實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 `HWThread=0` 循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。 ```shell $ likwid-topology -g -------------------------------------------------------------------------------- CPU name: AMD Ryzen 7 3800XT 8-Core Processor CPU type: nil CPU stepping: 0 ******************************************************************************** Hardware Thread Topology ******************************************************************************** Sockets: 1 Cores per socket: 8 Threads per core: 2 -------------------------------------------------------------------------------- HWThread Thread Core Socket Available 0 0 0 0 * 1 0 1 0 * 2 0 2 0 * 3 0 3 0 * 4 0 4 0 * 5 0 5 0 * 6 0 6 0 * 7 0 7 0 * 8 1 0 0 * 9 1 1 0 * 10 1 2 0 * 11 1 3 0 * 12 1 4 0 * 13 1 5 0 * 14 1 6 0 * 15 1 7 0 * -------------------------------------------------------------------------------- ``` ```c /* afn_threadpool_pi.c */ CpuTopology_t topo = get_cpuTopology(); int numSockets = topo->numSockets, numCoresPerSocket = topo->numCoresPerSocket, numHWThreads = topo->numHWThreads, cpulist[topo->numHWThreads], idx = 0; for (int socket = 0; socket < numSockets; ++socket) { for (int core = 0; core < numCoresPerSocket; ++core) { for (int i = 0; i < numHWThreads; ++i) { int threadId = topo->threadPool[i].threadId, coreId = topo->threadPool[i].coreId, packageId = topo->threadPool[i].packageId, apicId = topo->threadPool[i].apicId; if (packageId == socket && coreId == core) { cpulist[idx + threadId * (numCoresPerSocket * numSockets)] = apicId; } } idx++; } } topology_finalize(); ``` 實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutuex protected 兩者實作的差異,將 thread pool 初始化和釋放排除在測量時間外([`b3cd6`](https://github.com/93i7xo2/sysprog2021q1/commit/b3cd6764f44441c75da20a3d77e58533ed445d39)),結果發現時間大幅縮短,簡短的在 i5-6200U 上測試建立及釋放時間: ![](https://i.imgur.com/rES7enO.png) ![](https://i.imgur.com/YtEXftT.png) ![](https://i.imgur.com/KHpydVm.png) 和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間: - AMD R7-3800XT ![](https://i.imgur.com/XXWPQTx.png) - Intel i5-6200U ![](https://i.imgur.com/irLSMol.png) 在 lock-free 的測試中,執行時間有 pinned 與否沒差多少。而隨著 thread count 增加,lock-free 實作始終優於原始版本。 ## Pthread: Cancellation point > 取自 man-pages: - Creation: [`pthread_create()`](https://man7.org/linux/man-pages/man3/pthread_create.3.html) ```c int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void *), void *restrict arg); ``` `thread`: 指向新建的 thread ID `start_routine`: thread 建立後執行的函式,參數為 `arg` ```c // Creating a new thread pthread_create(&ptid, NULL, &func, NULL); ``` - Cancellation clean-up handlers 建立方式是由 `pthread_cleanup_push()` 將將函式逐一推進堆疊中,執行時從堆疊最上方依序執行 > [`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html) > Any clean-up handlers established by `pthread_cleanup_push(3)` that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed. Clean-up handlers 作用是在執行緒結束前釋放資源,包括 - mutex - condition variables - semaphores - file descriptor 等不會在執行緒結束時釋放的資源。觸發情境有幾種: 1. 執行緒即將結束時呼叫 `pthread_exit()`,執行堆疊中所有 handlers 2. 由其他執行緒呼叫 `pthread_cancel()` 發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlers 3. 執行緒呼叫 `pthread_cleanup_pop()` 從堆疊上取出最上層的 handler,可選擇執行與否,與 `pthread_cleanup_push()` 搭配使用 值得注意的是,結束執行緒若使用 `return`, handlers 將不會被呼叫,而 `return val` 意同 `pthread_exit(val)`。 > Clean-up handlers are not called if the thread terminates by performing a return from the thread start function. > Performing a return from the start function of any thread other than the main thread results in an implicit call to `pthread_exit()`, using the function's return value as the thread's exit status. - Cancellation point - 是執行緒用來檢查是否取消的時間點。"取消"一詞指的是執行緒的終止,被請求或是正常執行到最後,最終釋放所有資源,雖然 `pthread_kill()` 也能做到執行緒的終止,但不會釋放資源。 - Cancellation point 可由 `pthread_testcancel()` 進行設置,其他執行緒以 [`pthread_cancel()`](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) 送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。 - 由 [`pthread_setcancelstate()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 進行設置 cancelability 決定觸發與否,預設是 `ENABLE` ```c pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) ``` - 由 [`pthread_setcanceltype()`](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) 設置 cancelability tpye,預設是 `DEFERRED`,意思是請求會被推遲到下一個 cancellation point,`ASYNCHRONOUS` 則是接收到請求後立刻取消。 ```c pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype) ``` 當執行緒會保留/釋放資源時,設置 `ASYNCHRONOUS`,會讓執行緒收到請求後立刻處理,無法確立資源狀態是釋放前還是釋放後,使得 clean-up handler 無法正確的處理,不建議使用。而在文件中也註明只有 compute-bound loop 的執行緒或是下列 async-cancel-safe functions 適合用 `ASYNCHRONOUS`。 - pthread_cancel() - pthread_setcancelstate() - pthread_setcanceltype() - [pthreads](https://man7.org/linux/man-pages/man7/pthreads.7.html) 明確定義哪些函式必須是/可能是 cancellation point - 必須是 - pthread_cond_timedwait() - pthread_cond_wait() - pthread_testcancel() [`pthread_cond_wait()`/`pthread_cond_timewait()`](https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html) 為何是 cancellation point? 是為了防止 indefinite wait,例如等待輸入的資料從未被送出(e.g. `read()`),仍可以取消執行緒的執行。當接收到 cancellation request,如同 unblocked thread 一樣重新取得 mutex,但不是返回呼叫 `pthread_cond_wait()`/`pthread_cond_timedwait()` 的地方而是執行 clean-up handlers. - [`pthread_join`](https://man7.org/linux/man-pages/man3/pthread_join.3.html)/[`pthread_exit()`](https://man7.org/linux/man-pages/man3/pthread_exit.3.html) ```c void pthread_exit(void *retval); int pthread_join(pthread_t thread, void **retval); ``` `pthread_exit()` 於執行緒結束時呼叫,返回的數值 `retval` 供其他呼叫 `pthread_join()` 的執行緒取得。`retval` 不可存在於執行緒的 stack 上,否則產生未定義行為。 `pthread_join()` 負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 `PTHREAD_CANCELED` 到指定地址。 ```c /* example */ void *app1(void *x) { pthread_exit(20); } int main() { int ret; pthread_t t1; pthread_create(&t1, NULL, app1, NULL); pthread_join(t1, &ret); return 0; } ``` ## 參考資料 - [Geoff Langdale - Lock-Free Programming](https://www.cs.cmu.edu/~410-s05/lectures/L31_LockFree.pdf) - [Acquire and Release Semantics](https://preshing.com/20120913/acquire-and-release-semantics/) - [Where does the wait queue for threads lies in POSIX pthread mutex lock and unlock?](https://stackoverflow.com/questions/25419225/where-does-the-wait-queue-for-threads-lies-in-posix-pthread-mutex-lock-and-unloc) - [Memory Reordering Caught in the Act](https://preshing.com/20120515/memory-reordering-caught-in-the-act/) - [Memory Barriers Are Like Source Control Operations](https://preshing.com/20120710/memory-barriers-are-like-source-control-operations/) - [Weak vs. Strong Memory Models](https://preshing.com/20120930/weak-vs-strong-memory-models/) - [This Is Why They Call It a Weakly-Ordered CPU](https://preshing.com/20121019/this-is-why-they-call-it-a-weakly-ordered-cpu/)