共筆貢獻者: jserv, 93i7xo2, ccs100203, linD026, bakudr18
thread pool 的設計考量如下:
用醫院來比喻:
示意圖:
3 月 14 日是圓周率日,這天也是愛因斯坦的生日,求圓周率近似值的討論可見:
Gregory-Leibniz 級數可優雅地計算圓周率,參考 Leibniz's Formula for Pi。從下面的 Madhava–Leibniz series 開始推導:
首先積分下列數列
從
先看
又因為
依據夾擠定理 (squeeze theorem,也稱為 sandwich theorem),當
而且
此時將
以下是對應實作:
double compute_pi_leibniz(size_t N)
{
double pi = 0.0;
for (size_t i = 0; i < N; i++) {
double tmp = (i & 1) ? (-1) : 1;
pi += tmp / (2 * i + 1);
}
return pi * 4.0;
}
比較單執行緒、多執行緒和 SIMD 版本的表現:
1995 年提出的 Bailey–Borwein–Plouffe formula,可用這三位發表者的姓氏開頭簡稱為 BBP 公式,最初的公式如下:
BBP 公式跳脫典型的圓周率的演算法,可計算圓周率的任意第
典型的圓周率演算法必須計算前面的
以及
來說,因為每一項的計算都只跟當下的 k 值有關,所以每個獨立的項都可以平行計算,但利用公式
實作的其中一種方式如下:
double pi(size_t N) {
double pi = 1;
double n = 1;
for (size_t j = 1;j <= N; j++, n++) {
if (j & 1 == 0) {
pi *= (n / (n + 1));
} else {
pi *= ((n + 1) / n);
}
}
return 2 * pi;
}
因為算出目前項之前需要先知道前面連乘得到的值 pi
,所以無法對每一項做並行運算。以下採用 BBP 公式搭配 Thread Pool 進行驗證。
以下是一個 thread pool 實作: tpool.c
預期執行輸出:
PI calculated with 101 terms: 3.141592653589793
程式架構示意:
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
count
紀錄 thread pool 內 worker thread (即 workers
) 的數量__jobqueue
抓取被封裝的任務 threadtask_t
執行,以 FIFO 方式取出任務,tail
指向下一個待取出的任務。__jobqueue
必須保證為 exclusive use,若無任務 worker 則以 pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
等待新的任務typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
func
執行的結果放在共享變數 future
,而同時可能有其他執行緒存取 future
取得運算結果,同樣需要保護。
__threadtask
與 __tpool_future
成對建立__FUTURE_RUNNING
) 或完成 (__FUTURE_FINISHED
)__FUTURE_CANCELLED
__FUTURE_DESTROYED
tpool_create()
: 建立一定數量的 worker 於 pool,worker 執行 jobqueue_fetch()
抓取 job queue 內的任務tpool_apply()
: 將新建立的 threadtask_t
推進 job queue,每個 threadtask_t
各指向一個 __tpool_future
用以儲存運算結果。同時以 pthread_cond_broadcast()
喚醒等待 job queue 有新任務的 workertpool_future_get()
和 tpool_future_destroy()
: 從 __tpool_future
取回運算結果後釋放tpool_join()
: 呼叫 tpool_apply
新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 pool 和 jobjobqueue_fetch()
__FUTURE_CANCELLED
與 __FUTURE_DESTROYED
的使用情境
在 tpool_future_destroy()
內,若 future->flag
為 __FUTURE_FINISHED
或__FUTURE_CANCELLED
兩者之一,表示 __threadtask
已被釋放無法被 worker 拿到,故在此釋放資源。
若非以上情況則說明對應的 __threadtask
仍可存取該 future
,故將 future->flag
設置為 __FUTURE_DESTROYED
之後由 worker 來釋放 future 的資源。
jobqueue_destroy()
也有必要依據 __FUTURE_DESTROYED
來釋放 future使用到 pthread_cancel()
不代表設置 __FUTURE_CANCELLED
。pthread_cancel()
僅在 tpool_create()
建立 worker 失敗時使用,向先前建立的 worker 發送 cancellation request
/* jobqueue_fetch */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
jobqueue_destroy()
是唯一會將 flag 設置為 __FUTURE_CANCELLED
,猜測使用情境是想中斷所有的 worker,摧毀所有 job queue 內的任務但保留 __tpool_future
自行處理 (但並沒有使用到關鍵的 rwlock
)。
目前 jobqueue_destroy()
只有用在 tpool_join()
或是用在 tpool_create()
失敗時摧毀空 queue
int tpool_join(struct __threadpool *pool) {
...
jobqueue_destroy(pool->jobqueue);
...
}
struct __threadpool *tpool_create(size_t count) {
...
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
static void jobqueue_destroy(jobqueue_t *jobqueue) {
threadtask_t *tmp = jobqueue->head;
while (tmp) {
/* Never executed */
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
總之,沒有一種情況是同時有 worker 在運行。
pthread_cond_timedwait()
的 expire time 使用 CLOCK_MONOTONIC
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
pthread_cond_timedwait
The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument.
pthread_condattr_setclock
The default value of the clock attribute shall refer to the system clock.
因為 pthread_cond_timedwait()
需要 condition variable 具有 clock attribute,而預設的 system clock 用 pthread_condattr_getclock()
取得得到 CLOCK_REALTIME
,非預期的 CLOCK_MONOTONIC
。
CLOCK_REALTIME
會跟 NTP 校正時間,可能往前或往後;而 CLOCK_MONOTONIC
則保證單調遞增,從特定點開始計數,clock_getres 說明在 Linux 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。
static struct __tpool_future *tpool_future_create(void) {
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
在 8dc72 實現 timeout 處理,不使用 __FUTURE_CANCELLED
和 __FUTURE_DESTROYED
達成。
使用 tpool_future_get()
取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future:
__FUTURE_RUNNING
,若有代表 worker 已取得對應的 task 進行計算,現行版本尚不支援中斷進行計算中的 worker,於是等待計算結果,結果等同
tpool_future_get(futures[i], 0)
__FUTURE_RUNNING
,表示對應的 task 仍存於 jobqueue,將其連同 future 移除並釋放資源。試著從 futures[i]
最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 futures[0]
開始至 futures[PRECISION]
,一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 pthread_cond_broadcast()
,因此從輸出可見一些 future 被移除。
- for (int i = 0; i <= PRECISION; i++) {
+ for (int i = PRECISION; i >= 0; i--) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit, pool->jobqueue);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
DEBUG_PRINT(("Future[%d] completed!\n", i));
} else
DEBUG_PRINT(("Cannot get future[%d], timeout after %d milliseconds.\n", i,
time_limit));
}
$ ./pi 4 1
Thread count: 4
Time limit: 1 ms
Cannot get future[100], timeout after 1 milliseconds. /* future[100] is removed */
Future[99] completed!
...
Future[0] completed!
Elapsed time: 11592788 ns
PI calculated with 101 terms: 3.141592653589793
原實作中 pop task 的方式是走訪整個 鏈結串列取出 tail ,其 time complexity 為 sudo taskset 0xF0 ./pi
以 4 core CPU 測量 PRECISION = 100000
時的執行時間,多執行幾次可發現,雖然大多數執行時間落在 160~200 milliseconds ,但偶爾會有超過 100 seconds 或更久的執行時間,以 perf record 紀錄超過 100 seconds 的 process,發現有約 83% 的時間在執行 for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next);
,而越是減少可用的 CPU 此情況會更嚴重(因為當 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 jobqueue_fetch
消化 task,鏈結串列會增長非常快)。
因此,為了使 pop task 達到 jobqueue
改成雙向鏈結串列,後來更是直接引入 linux/list.h 以環狀雙向鏈結串列取代原本的單向鏈結串列,如此不但使 pop task 達到 jobqueue->head == jobqueue->tail
的 if statement ,詳細程式碼可見 commit f8761 ,而修改後以 4 core 執行的平均時間約為 138 milliseconds,且沒有特別的 outliers 。
typedef struct __threadtask {
void *(*func)(void *); /* the function that task should execute */
void *arg; /* argument passed to func */
struct __tpool_future
*future; /* A structure to store task status and result */
- struct __threadtask *next; /* pointer to next task */
+ struct list_head list; /* linked list of task structure */
} threadtask_t;
typedef struct __jobqueue {
- threadtask_t *head, *tail; /* store head and tail of queue */
+ struct list_head head; /* list head of task */
pthread_cond_t
cond_nonempty; /* condition variable to check if queue is non-empty */
pthread_mutex_t rwlock; /* lock share resources like head and tail */
} jobqueue_t;
atomic_threadpool 是一套使用 lock-free FIFO queue (lfqueue_deq
) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。
首先看較重要的函式 at_thpool_worker
,功能等同 jobqueue_fetch
,旨在從 job queue 抓取 task,task 由函式 task->do_work
及參數 task->arg
組成,由於沒有需要存入運算結果, task 運算結束後返回 TASK_PENDING
抓取下一個 task。
使用 tp->nrunning
來表示進行中的 worker 數量,設置 tp->is_running=0
能立即終止尚未執行 task 的 worker。
void *at_thpool_worker(void *_tp)
{
at_thpool_t *tp = (at_thpool_t *) _tp;
AT_THPOOL_INC(&tp->nrunning); // __sync_fetch_and_add(v, 1)
at_thtask_t *task;
void *_task;
lfqueue_t *tq = &tp->taskqueue;
TASK_PENDING:
while (tp->is_running) {
if ((_task = lfqueue_deq(tq)))
goto HANDLE_TASK;
lfqueue_sleep(1);
}
AT_THPOOL_DEC(&tp->nrunning); // __sync_fetch_and_dec(v, 1)
return NULL;
HANDLE_TASK:
task = (at_thtask_t*) _task;
task->do_work(task->args);
AT_THPOOL_FREE(task);
goto TASK_PENDING;
}
當使用 lfqueue_deq
從 job queue 從尾端取得 task 時,理想狀況其他執行緒可同時使用 lfqueue_enq
插入新的 task 而不衝突,而 lfqueue 也確實做到。
while (tp->is_running) {
if ((_task = lfqueue_deq(tq)))
goto HANDLE_TASK;
lfqueue_sleep(1);
}
lfqueue
atomic_threadpool 所使用到的 lfqueue
API,共同操作同一個 lfqueue_t 型態的 lfqueue
:
extern int lfqueue_init(lfqueue_t *lfqueue);
extern int lfqueue_enq(lfqueue_t *lfqueue, void *value);
extern void* lfqueue_deq(lfqueue_t *lfqueue);
extern void lfqueue_destroy(lfqueue_t *lfqueue);
typedef struct {
lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
volatile size_t size;
volatile lfq_bool_t in_free_mode;
lfqueue_malloc_fn _malloc;
lfqueue_free_fn _free;
void *pl;
} lfqueue_t;
結構內有兩條佇列
head
/tail
: 分別指向佇列前端/尾端,head
同時也指向上一個取出的節點,root_free
/move_free
: root_free
分別指向佇列前端/尾端,紀錄等待被釋放的節點。每個 lfqueue_cas_node_t
型態的節點都帶有 lfq_time_t 型態的 _deactivate_tm
紀錄進入佇列的時間。
__lfq_check_free
static void
__lfq_check_free(lfqueue_t *lfqueue) {
lfq_time_t curr_time;
// 限制只有一條執行緒能夠執行
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) {
// 以 rootfree 為起始刪除後面節點
// 並限制離 _deactivate_tm 超過 2s
lfq_get_curr_time(&curr_time);
lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree;
while ( rtfree && (rtfree != lfqueue->move_free) ) {
nextfree = rtfree->nextfree;
if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) {
// printf("%p\n", rtfree);
lfqueue->_free(lfqueue->pl, rtfree);
rtfree = nextfree;
} else {
break;
}
}
lfqueue->root_free = rtfree;
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0);
}
__LFQ_SYNC_MEMORY();
}
第 9 行表明佇列至少存在一個節點,因此 move_free
不存在指向 NULL 的可能性,好處是插入時不用判斷是否指向 NULL
move_free->next = new_node;
缺點是針對程式起始、結束要特別處理多出來的一個節點,另一條佇列同樣情況。
int
lfqueue_init_mf(lfqueue_t *lfqueue, void* pl, lfqueue_malloc_fn lfqueue_malloc, lfqueue_free_fn lfqueue_free) {
...
freebase->value = NULL;
freebase->next = NULL;
freebase->nextfree = NULL;
freebase->_deactivate_tm = 0;
lfqueue->head = lfqueue->tail = base; // Not yet to be free for first node only
lfqueue->root_free = lfqueue->move_free = freebase; // Not yet to be free for first node only
...
}
void
lfqueue_destroy(lfqueue_t *lfqueue) {
...
if (rtfree) { // rtfree will never be NULL
lfqueue->_free(lfqueue->pl, rtfree);
}
...
}
__lfq_recycle_free
__lfq_check_free
稍早已提到功能是清除以 root_free 為首的佇列,而 __lfq_recycle_free
便是將節點推入佇列。因為在多執行緒下,如果直接把節點的資源釋放掉可能會在之後的操作造成錯誤。
static void
__lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) {
lfqueue_cas_node_t *freed;
do {
freed = lfqueue->move_free;
} while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) );
lfq_get_curr_time(&freenode->_deactivate_tm);
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode);
}
move_free->nextfree = NULL
。lfqueue_cas_node_t
但非用 next
而是以 nextfree
進行連結。nextfree = NULL
,因此 freenode->nextfree = NULL
。__LFQ_BOOL_COMPARE_AND_SWAP
插入節點 freenode
至 move_free
後端,即限制了直到第 10 結束都沒有其他執行緒可執行下去,形成 critical section。lfqueue_enq
插入新節點
static int
_enqueue(lfqueue_t *lfqueue, void* value) {
lfqueue_cas_node_t *tail, *node;
node = (lfqueue_cas_node_t*) lfqueue->_malloc(lfqueue->pl, sizeof(lfqueue_cas_node_t));
if (node == NULL) {
perror("malloc");
return errno;
}
node->value = value;
node->next = NULL;
node->nextfree = NULL;
for (;;) {
__LFQ_SYNC_MEMORY(); // 沒有 memory barrier 會因為編譯器最佳化或是因為 out-of-order execution 無法拿到最新的 tail
tail = lfqueue->tail; // 不可能發生 tail = NULL
if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
/* 由於使用 atomic operation
* 同一時間只會有一個執行緒更新 tail->next
* 接下來才會將 tail 指向新的尾端完成整個插入流程
* 另一個執行緒所拿到的 tail->next 才會是 NULL
*/
// compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
__lfq_check_free(lfqueue); // cleanup
return 0;
}
}
/*It never be here*/
return -1;
}
觀察 linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation,令人不解的是大部份函式都有 full barrier 的作用,這裡特地引進 __sync_synchronize
(full barrier)。
#define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap
#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add // skip
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch
#define __LFQ_YIELD_THREAD sched_yield
#define __LFQ_SYNC_MEMORY __sync_synchronize
lfqueue_deq
從佇列前端移去節點,要判斷的情況較多。改寫部份程式碼和搭配註解。
head
後,原有的 head 指向的節點被其他執行緒釋放導致 head->next
產生未定義行為。實際上釋放會在 2 秒後由 __lfq_check_free
執行,發生未定義行為的可能性極微,即使加了 CAS 也不能避免 8~9 行間 __lfq_check_free
釋放掉 head
。
static void *
_dequeue(lfqueue_t *lfqueue) {
lfqueue_cas_node_t *head, *next;
void *val;
for (;;) {
head = lfqueue->head;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
next = head->next;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
/* head == tail 代表無可取出節點,此時 next 必為 NULL
* 這時可能有 `lfqueue_enq` 執行插入節點產生 `next != NULL` 的情況
* 故需做判斷
*/
if (next == NULL) {
/* 遇到這種情況重新取值即可,但一些函式需要回傳值判斷 queue 為空 */
val = NULL;
goto _done;
}
}
else {
/* 目的是排除 next = NULL 的狀況,以免 dereference 出錯
* 正常情況下 next = NULL 只有在 head = tail 情況成立
*/
if (next) {
val = next->value;
if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) {
break;
}
} else {
val = NULL;
goto _done;
}
}
}
}
__lfq_recycle_free(lfqueue, head);
_done:
// __asm volatile("" ::: "memory");
__LFQ_SYNC_MEMORY();
__lfq_check_free(lfqueue);
return val;
}
使用 gdb 觀察 31 行發生當下:
(gdb) p next
$14 = (lfqueue_cas_node_t *) 0x0
(gdb) p head->next
$15 = (struct lfqueue_cas_node_s *) 0x55555bf83db0
(gdb) p head
$17 = (lfqueue_cas_node_t *) 0x55555bf83d80
(gdb) p *head->next
$18 = {value = 0x555555c57490, next = 0x55555bf83de0, nextfree = 0x55555bf83de0, _deactivate_tm = 1623174704}
head = lfqueue->head;
next = head->next;
gdb 顯示 next != head->next
,明顯與上方程式碼矛盾。表明 7~10 行即使通過 __LFQ_BOOL_COMPARE_AND_SWAP
,head
可能經由其他執行緒的 enqueue,導致 next
指向其他新的節點。
此實作並未解決 ABA problem
The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations.
In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - An Introduction to Lock-Free Programming
廣泛的說,lock-free 並非指不使用 mutex (lock) 而是指鎖住整個 process 的可能性,下方程式碼雖然沒有使用到 mutex,但若兩個 thread 執行同樣的程式碼,在特定執行順序下永遠無法離開 loop。
while (X == 0)
{
X = 1 - X;
}
而使用 mutex 的 thread 在最差情況下,所使用的 mutex 尚未被其他 thread 所釋放,而停滯不前,甚至導致 lock 之間的競爭,自然不算在 lock-free 範疇。
參考 RZHuangJeff 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 mmap 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。
設計上只有一個 producer,使用 count
來紀錄存入的資料數,consumer 依照 count
決定是否往下執行,避開判斷邊界(如下)。由於 head
, tail
都是 atomic,依據邊界來判斷需於一個 CAS 指令內對兩個變數進行操作,較為不便。
// is full
head == (tail ^ rbf->size*2)
// is empty
head == tail
為避免 ABA problem,將用來存 offset 的變數前 32-bit 放置要存入 buffer 的資料的一小部份,例如 (void *)
型態的資料為 8 byte,假設資料前 4 byte 相同不具特徵(e.g. 連續記憶體位置的指標),將後 4 byte 併入 offset 的高 4 byte,這麼做即使在 CAS 判斷 offset 相同,也能透過前 4 byte 得知資料更動。
/* producer */
bool enqueue(ringbuffer_t *rb, void **src) {
uint64_t _read, _write;
_read = atomic_load(&rb->read_offset) & RB_OFF_MASK;
_write = atomic_load(&rb->write_offset) & RB_OFF_MASK;
if (_read == (_write ^ rb->size))
return false;
memcpy(&rb->buffer[_write], src, sizeof(void *));
_write = (_write + sizeof(void *)) & rb->mask;
_write |= ((uint64_t)*src << 32);
atomic_store_explicit(&rb->write_offset, _write, memory_order_release);
atomic_fetch_add_explicit(&rb->count, 1, memory_order_release);
return true;
}
/* consumer */
bool dequeue(ringbuffer_t *rb, void **dst) {
int64_t count, new_count;
do {
count = atomic_load(&rb->count);
new_count = count - 1;
if (__builtin_expect((new_count < 0), 1))
return false;
} while (!atomic_compare_exchange_weak(&rb->count, &count, new_count));
uint64_t _read, new_read;
do {
_read = atomic_load(&rb->read_offset);
new_read = (((_read & RB_OFF_MASK) + sizeof(void *)) & rb->mask);
memcpy(dst, &rb->buffer[_read & RB_OFF_MASK], sizeof(void *));
} while (!atomic_compare_exchange_weak(&rb->read_offset, &_read, new_read));
return true;
}
Thread safety with affine thread pools 一文提到執行緒需要為在處理器核 (core) 切換間付出 context switching 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器執行。
此外,該文也提出 work stealing 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。
afn_threadpool.c 使用 lock-free queue 實作上述功能,達到以下需求:
pthread_setaffinity_np()
固定在不同的處理器上。afn_threadpool_pi.c 採用 afn_threadpool.c 來計算 pi。
int main(int argc, char **argv) {
...
threadpool_t *tp;
if (!tp_init(&tp, nthreads))
exit(EXIT_FAILURE);
tpool_future_t *futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tp_queue(tp, bpp, (void *)&bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
if (!futures[i])
continue;
double *result = tpool_future_get(futures[i], time_limit);
if (result) {
bpp_sum += *result;
free(result);
tpool_future_destroy(futures[i]);
}
}
tp_join(tp);
tp_destroy(tp);
...
}
比較 3 種 thread pool 計算
./threadpool_pi
./afn_threadpool_pi
pthread_setaffinity_np()
./afn_threadpool_pi_v2
pthread_setaffinity_np()
原始程式碼 與其執行方式:
$ make benchmark && make plot
$ lscpu
Model name: AMD EPYC 7501 32-Core Processor
Stepping: 2
CPU MHz: 1999.998
BogoMIPS: 3999.99
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 2 MiB
L1i cache: 2 MiB
L2 cache: 16 MiB
L3 cache: 512 MiB
$ uname -r
5.4.0-72-generic
$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
使用 lock-free 明顯減少執行時間,可惜隨著 thread 增加 throughtput 反而下降。
接下來參考 eecheng87 的作法使用 sched_yield()
,在 dequeue 失敗時讓出 CPU,這樣的好處是如果單一處理器上有多個 thread,執行任務的優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降,本來就是單個 thread 獨占處理器的則不影響。
/* afn_thradpool.c */
- !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task)));
+ !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))){
+ sched_yield();
+}
由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。
likwid-topology 是一套列出 SMT thread 、快取與處理器核階層關係的工具。HWThread
代表在 linux 出現的 CPU 編號,也就是 htop 看到的 CPU0、CPU1…;Thread
則是處理器核上的 SMT Thread 編號;Core
代表 physical core,如下列所示。
實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 HWThread=0
循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。
$ likwid-topology -g
--------------------------------------------------------------------------------
CPU name: AMD Ryzen 7 3800XT 8-Core Processor
CPU type: nil
CPU stepping: 0
********************************************************************************
Hardware Thread Topology
********************************************************************************
Sockets: 1
Cores per socket: 8
Threads per core: 2
--------------------------------------------------------------------------------
HWThread Thread Core Socket Available
0 0 0 0 *
1 0 1 0 *
2 0 2 0 *
3 0 3 0 *
4 0 4 0 *
5 0 5 0 *
6 0 6 0 *
7 0 7 0 *
8 1 0 0 *
9 1 1 0 *
10 1 2 0 *
11 1 3 0 *
12 1 4 0 *
13 1 5 0 *
14 1 6 0 *
15 1 7 0 *
--------------------------------------------------------------------------------
/* afn_threadpool_pi.c */
CpuTopology_t topo = get_cpuTopology();
int numSockets = topo->numSockets,
numCoresPerSocket = topo->numCoresPerSocket,
numHWThreads = topo->numHWThreads, cpulist[topo->numHWThreads], idx = 0;
for (int socket = 0; socket < numSockets; ++socket) {
for (int core = 0; core < numCoresPerSocket; ++core) {
for (int i = 0; i < numHWThreads; ++i) {
int threadId = topo->threadPool[i].threadId,
coreId = topo->threadPool[i].coreId,
packageId = topo->threadPool[i].packageId,
apicId = topo->threadPool[i].apicId;
if (packageId == socket && coreId == core) {
cpulist[idx + threadId * (numCoresPerSocket * numSockets)] = apicId;
}
}
idx++;
}
}
topology_finalize();
實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutuex protected 兩者實作的差異,將 thread pool 初始化和釋放排除在測量時間外(b3cd6
),結果發現時間大幅縮短,簡短的在 i5-6200U 上測試建立及釋放時間:
和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間:
AMD R7-3800XT
Intel i5-6200U
在 lock-free 的測試中,執行時間有 pinned 與否沒差多少。而隨著 thread count 增加,lock-free 實作始終優於原始版本。
取自 man-pages:
Creation: pthread_create()
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
thread
: 指向新建的 thread ID
start_routine
: thread 建立後執行的函式,參數為 arg
// Creating a new thread
pthread_create(&ptid, NULL, &func, NULL);
Cancellation clean-up handlers
建立方式是由 pthread_cleanup_push()
將將函式逐一推進堆疊中,執行時從堆疊最上方依序執行
pthread_exit()
Any clean-up handlers established bypthread_cleanup_push(3)
that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed.
Clean-up handlers 作用是在執行緒結束前釋放資源,包括
等不會在執行緒結束時釋放的資源。觸發情境有幾種:
pthread_exit()
,執行堆疊中所有 handlerspthread_cancel()
發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlerspthread_cleanup_pop()
從堆疊上取出最上層的 handler,可選擇執行與否,與 pthread_cleanup_push()
搭配使用值得注意的是,結束執行緒若使用 return
, handlers 將不會被呼叫,而 return val
意同 pthread_exit(val)
。
Clean-up handlers are not called if the thread terminates by performing a return from the thread start function.
Performing a return from the start function of any thread other than the main thread results in an implicit call to
pthread_exit()
, using the function's return value as the thread's exit status.
Cancellation point
是執行緒用來檢查是否取消的時間點。"取消"一詞指的是執行緒的終止,被請求或是正常執行到最後,最終釋放所有資源,雖然 pthread_kill()
也能做到執行緒的終止,但不會釋放資源。
Cancellation point 可由 pthread_testcancel()
進行設置,其他執行緒以 pthread_cancel()
送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。
由 pthread_setcancelstate()
進行設置 cancelability 決定觸發與否,預設是 ENABLE
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate)
由 pthread_setcanceltype()
設置 cancelability tpye,預設是 DEFERRED
,意思是請求會被推遲到下一個 cancellation point,ASYNCHRONOUS
則是接收到請求後立刻取消。
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype)
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype)
當執行緒會保留/釋放資源時,設置 ASYNCHRONOUS
,會讓執行緒收到請求後立刻處理,無法確立資源狀態是釋放前還是釋放後,使得 clean-up handler 無法正確的處理,不建議使用。而在文件中也註明只有 compute-bound loop 的執行緒或是下列 async-cancel-safe functions 適合用 ASYNCHRONOUS
。
pthreads 明確定義哪些函式必須是/可能是 cancellation point
pthread_cond_wait()
/pthread_cond_timewait()
為何是 cancellation point? 是為了防止 indefinite wait,例如等待輸入的資料從未被送出(e.g. read()
),仍可以取消執行緒的執行。當接收到 cancellation request,如同 unblocked thread 一樣重新取得 mutex,但不是返回呼叫 pthread_cond_wait()
/pthread_cond_timedwait()
的地方而是執行 clean-up handlers.
void pthread_exit(void *retval);
int pthread_join(pthread_t thread, void **retval);
pthread_exit()
於執行緒結束時呼叫,返回的數值 retval
供其他呼叫 pthread_join()
的執行緒取得。retval
不可存在於執行緒的 stack 上,否則產生未定義行為。
pthread_join()
負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 PTHREAD_CANCELED
到指定地址。
/* example */
void *app1(void *x)
{
pthread_exit(20);
}
int main()
{
int ret;
pthread_t t1;
pthread_create(&t1, NULL, app1, NULL);
pthread_join(t1, &ret);
return 0;
}