contributed by < 93i7xo2
>
Source: 2021q1 第 4 週測驗題
count
紀錄 thread pool 內 worker thread (即 workers
) 的數量__jobqueue
抓取被封裝的任務 threadtask_t
執行,以 FIFO 方式取出任務,tail
指向下一個待取出的任務。__jobqueue
必須保證為 exclusive use,若無任務 worker 則以 pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
等待新的任務func
執行的結果放在共享變數 future
,而同時可能有其他執行緒存取 future
取得運算結果,同樣需要保護。
__threadtask
與 __tpool_future
成對建立__FUTURE_RUNNING
) 或完成 (__FUTURE_FINISHED
)__FUTURE_CANCELLED
__FUTURE_DESTROYED
tpool_create()
: 建立一定數量的 worker 於 pool,worker 執行 jobqueue_fetch()
抓取 job queue 內的任務tpool_apply()
: 將新建立的 threadtask_t
推進 job queue,每個 threadtask_t
各指向一個 __tpool_future
用以儲存運算結果。同時以 pthread_cond_broadcast()
喚醒等待 job queue 有新任務的 workertpool_future_get()
和 tpool_future_destroy()
: 從 __tpool_future
取回運算結果後釋放tpool_join()
: 呼叫 tpool_apply
新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 pool 和 jobjobqueue_fetch()
__FUTURE_CANCELLED
與 __FUTURE_DESTROYED
的使用情境
在 tpool_future_destroy()
內,若 future->flag
為 __FUTURE_FINISHED
或__FUTURE_CANCELLED
兩者之一,表示 __threadtask
已被釋放無法被 worker 拿到,故在此釋放資源。
若非以上情況則說明對應的 __threadtask
仍可存取該 future
,故將 future->tag
設置為 __FUTURE_DESTROYED
之後由 worker 來釋放 future 的資源。
jobqueue_destroy()
也有必要依據 __FUTURE_DESTROYED
來釋放 future使用到 pthread_cancel()
不代表設置 __FUTURE_CANCELLED
。pthread_cancel()
僅在 tpool_create()
建立 worker 失敗時使用,向先前建立的 worker 發送 cancellation request
jobqueue_destroy()
是唯一會將 flag 設置為 __FUTURE_CANCELLED
,猜測使用情境是想中斷所有的 worker,摧毀所有 job queue 內的任務但保留 __tpool_future
自行處理 (但並沒有使用到關鍵的 rwlock
)。
目前 jobqueue_destroy()
只有用在 tpool_join()
或是用在 tpool_create()
失敗時摧毀空 queue
總之,沒有一種情況是同時有 worker 在運行。
pthread_cond_timedwait()
的 expire time 使用 CLOCK_MONOTONIC
pthread_cond_timedwait
The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument.
pthread_condattr_setclock
The default value of the clock attribute shall refer to the system clock.
因為 pthread_cond_timedwait()
需要 condition variable 具有 clock attribute,而預設的 system clock 用 pthread_condattr_getclock()
取得得到 CLOCK_REALTIME
,非預期的 CLOCK_MONOTONIC
。
CLOCK_REALTIME
會跟 NTP 校正時間,可能往前或往後;而 CLOCK_MONOTONIC
則保證單調遞增,從特定點開始計數,clock_getres 說明在 Liunx 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。
在 8dc72 實現 timeout 處理,不使用 __FUTURE_CANCELLED
和 __FUTURE_DESTROYED
達成。
TODO: 改用 Linux eventfd/timerfd 來實作 timeout 機制
jserv
使用 tpool_future_get()
取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future:
__FUTURE_RUNNING
,若有代表 worker 已取得對應的 task 進行計算,現行版本尚不支援中斷進行計算中的 worker,於是等待計算結果,結果等同
__FUTURE_RUNNING
,表示對應的 task 仍存於 jobqueue,將其連同 future 移除並釋放資源。試著從 futures[i]
最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 futures[0]
開始至 futures[PRECISION]
,一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 pthread_cond_broadcast()
,因此從輸出可見一些 future 被移除。
基於 atomic_threadpool 改寫
最初的想法是每個 task 各維護 eventfd 產生的 file descriptor (efd
),執行完成時透過 write()
通知 main thread 該 task 完成,這樣做的缺點是 efd
數量一多容易到達上限:
所以改成 worker thread 各共用一個 epoll file descriptor (epfd
),而 task 完成時 worker thread 向 epfd
註冊新的 efd
,其內部 counter 值為 task 內部的 id (>0),以便 main thread 辨識。當然 counter 一開始初始化為 0,寫入 counter 的同時觸發 EPOLLIN event:
改用 eventfd 還能拿掉麻煩的 mutex!
接下來思考 main thread 的 Timeout 機制實作方式。
首先移除作為 worker thread 和 main thread 溝通的工具-tpool_future_t
,以 eventfd 通知 main thread,並從 main thread 通知 worker thread 取消 task。取消 task 的方式:
pthread_cancel()
發送,但這樣需要從 task queue 拿出特定 task,也需要重建 worker thread,由實驗可知建立 thread 成本相當巨大(相對於 task)。而等待 eventfd event 的同時等待 timerfd event。接收到 timerfd event 則依序檢查 task 是否完成,若無,透過 tp_task_cancel
向 worker thread 通知取消;接收到 eventfd event 則將其運算結果累加至 bpp_sum
。
整合上述幾點的實作 xxxxx,測試時發現 -O3
會發生預期外的結果,索幸用 -O0
進行測試:
於 i5-6200U 上測試,統一設定 timeout = 1ms,與未改用 eventfd 的版本相比效能大減。
atomic_threadpool 是一套使用 lock-free FIFO queue (lfqueue_deq
) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。
首先看較重要的函式 at_thpool_worker
,功能等同 jobqueue_fetch
,旨在從 job queue 抓取 task,task 由函式 task->do_work
及參數 task->arg
組成,由於沒有需要存入運算結果, task 運算結束後返回 TASK_PENDING
抓取下一個 task。
使用 tp->nrunning
來表示進行中的 worker 數量,設置 tp->is_running=0
能立即終止尚未執行 task 的 worker。
當使用 lfqueue_deq
從 job queue 從尾端取得 task 時,理想狀況其他執行緒可同時使用 lfqueue_enq
插入新的 task 而不衝突,而 lfqueue 也確實做到。
lfqueue
atomic_threadpool 所使用到的 lfqueue
API,共同操作同一個 lfqueue_t 型態的 lfqueue
:
結構內有兩條佇列
head
/tail
: 分別指向佇列前端/尾端,head
同時也指向上一個取出的節點,root_free
/move_free
: root_free
分別指向佇列前端/尾端,紀錄等待被釋放的節點。每個 lfqueue_cas_node_t
型態的節點都帶有 lfq_time_t 型態的 _deactivate_tm
紀錄進入佇列的時間。
__lfq_check_free
第 9 行表明佇列至少存在一個節點,因此 move_free
不存在指向 NULL 的可能性,好處是插入時不用判斷是否指向 NULL
缺點是針對程式起始、結束要特別處理多出來的一個節點,另一條佇列同樣情況。
__lfq_recycle_free
__lfq_check_free
稍早已提到功能是清除以 root_free 為首的佇列,而 __lfq_recycle_free
便是將節點推入佇列。
move_free->nextfree = NULL
。lfqueue_cas_node_t
但非用 next
而是以 nextfree
進行連結。nextfree = NULL
,因此 freenode->nextfree = NULL
。__LFQ_BOOL_COMPARE_AND_SWAP
插入節點 freenode
至 move_free
後端,即限制了直到第 10 結束都沒有其他執行緒可執行下去,形成 critical section。lfqueue_enq
插入新節點
觀察 linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation,令人不解的是大部份函式都有 full barrier 的作用,這裡特地引進 __sync_synchronize
(full barrier)。
__sync
開頭的 builtins 已被 gcc 標註為 deprecated,可改用 gcc atomics builtins 或 C11 Atomics。 jserv
lfqueue_deq
從佇列前端移去節點,要判斷的情況較多。改寫部份程式碼和搭配註解。
head
後,原有的 head 指向的節點被其他執行緒釋放導致 head->next
產生未定義行為。實際上釋放會在 2 秒後由 __lfq_check_free
執行,發生未定義行為的可能性極微,即使加了 CAS 也不能避免 8~9 行間 __lfq_check_free
釋放掉 head
。使用 gdb 觀察 31 行發生當下:
gdb 顯示 next != head->next
,明顯與上方程式碼矛盾。表明 7~10 行即使通過 __LFQ_BOOL_COMPARE_AND_SWAP
,head
可能經由其他執行緒的 enqueue,導致 next
指向其他新的節點。
不會採用此方式實作 thread pool,因為沒有看到解決 ABA problem 的方法。
The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations.
In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - An Introduction to Lock-Free Programming
廣泛的說,lock-free 並非指不使用 mutex (lock) 而是指鎖住整個 process 的可能性,下方程式碼雖然沒有使用到 mutex,但若兩個 thread 執行同樣的程式碼,在特定執行順序下永遠無法離開 loop。
而使用 mutex 的 thread 在最差情況下,所使用的 mutex 尚未被其他 thread 所釋放,而停滯不前,自然不算在 lock-free 範疇。
lock-free 所使用到的 do{...}while(CAS)
雖然是 spinlock,但能節省 mutex 所需的 syscall,一般認為效能更好。
參考 RZHuangJeff 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 mmap 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。
設計上只有一個 producer,使用 count
來紀錄存入的資料數,consumer 依照 count
決定是否往下執行,避開判斷邊界(如下)。由於 head
, tail
都是 atomic,依據邊界來判斷需於一個 CAS 指令內對兩個變數進行操作,較為不便。
為避免 ABA problem,將用來存 offset 的變數前 32-bit 放置要存入 buffer 的資料的一小部份,例如 (void*) 型態的資料為 8 byte,假設資料前 4 byte 相同不具特徵(e.g.連續記憶體位置的指標),將後 4 byte 併入 offset 的高 4 byte,這麼做即使在 CAS 判斷 offset 相同,也能透過前 4 byte 得知資料更動。
實作 lock-free 時找到期末專題提及一篇參考資料-Thread safety with affine thread pools,便一並實作。該文提到 thread 需要為在處理器核 (core) 切換間付出 context switching 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器執行。
此外,也提出 work stealing 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。
afn_threadpool.c 使用 lock-free queue 實作上述功能,達到以下需求:
pthread_setaffinity_np()
固定在不同的處理器上。afn_threadpool_pi.c 採用 afn_threadpool.c 來計算 pi。
比較 3 種 thread pool 計算 (PRECISION=1000) 所需時間
./threadpool_pi
./afn_threadpool_pi
pthread_setaffinity_np()
./afn_threadpool_pi_v2
pthread_setaffinity_np()
執行方式
實驗結果 I - Linode Dedicated CPU Instances (32 cores/64G RAM)
使用 lock-free 明顯減少執行時間,可惜隨著 thread 增加 throughtput 反而下降。
接下來參考 eecheng87 的作法使用 sched_yield()
,在 dequeue 失敗時讓出 CPU,這樣的好處是如果單一處理器上有多個 thread,執行任務的優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降,本來就是單個 thread 獨占處理器的則不影響。
實驗結果 II - AMD Ryzen 7 3800XT & Intel i5-6200U
由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。
likwid-topology 是一套列出 SMT thread 、快取與處理器核階層關係的工具。HWThread
代表在 linux 出現的 CPU 編號,也就是 htop 看到的 CPU0、CPU1…;Thread
則是處理器核上的 SMT Thread 編號;Core
代表 physical core,如下列所示。
實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 HWThread=0
循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。
實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutuex protected 兩者實作的差異,將 thread pool 初始化和釋放排除在測量時間外(b3cd6
),結果發現時間大幅縮短,簡短的在 i5-6200U 上測試建立及釋放時間:
和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間:
AMD R7-3800XT
Intel i5-6200U
在 lock-free 的測試中,執行時間有 pinned 與否沒差多少。而隨著 thread count 增加,lock-free 實作始終優於原始版本。
取自 man-pages:
Creation: pthread_create()
thread
: 指向新建的 thread ID
start_routine
: thread 建立後執行的函式,參數為 arg
Cancellation clean-up handlers
建立方式是由 pthread_cleanup_push()
將將函式逐一推進堆疊中,執行時從堆疊最上方依序執行
pthread_exit()
Any clean-up handlers established bypthread_cleanup_push(3)
that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed.
Clean-up handlers 作用是在執行緒結束前釋放資源,包括
等不會在執行緒結束時釋放的資源。觸發情境有幾種:
pthread_exit()
,執行堆疊中所有 handlerspthread_cancel()
發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlerspthread_cleanup_pop()
從堆疊上取出最上層的 handler,可選擇執行與否,與 pthread_cleanup_push()
搭配使用值得注意的是,結束執行緒若使用 return
, handlers 將不會被呼叫,而 return val
意同 pthread_exit(val)
。
Clean-up handlers are not called if the thread terminates by performing a return from the thread start function.
Performing a return from the start function of any thread other than the main thread results in an implicit call to
pthread_exit()
, using the function's return value as the thread's exit status.
Cancellation point
是執行緒用來檢查是否取消的時間點。"取消"一詞指的是執行緒的終止,被請求或是正常執行到最後,最終釋放所有資源,雖然 pthread_kill()
也能做到執行緒的終止,但不會釋放資源。
Cancellation point 可由 pthread_testcancel()
進行設置,其他執行緒以 pthread_cancel()
送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。
由 pthread_setcancelstate()
進行設置 cancelability 決定觸發與否,預設是 ENABLE
由 pthread_setcanceltype()
設置 cancelability tpye,預設是 DEFERRED
,意思是請求會被推遲到下一個 cancellation point,ASYNCHRONOUS
則是接收到請求後立刻取消。
當執行緒會保留/釋放資源時,設置 ASYNCHRONOUS
,會讓執行緒收到請求後立刻處理,無法確立資源狀態是釋放前還是釋放後,使得 clean-up handler 無法正確的處理,不建議使用。而在文件中也註明只有 compute-bound loop 的執行緒或是下列 async-cancel-safe functions 適合用 ASYNCHRONOUS
。
pthreads 明確定義哪些函式必須是/可能是 cancellation point
pthread_cond_wait()
/pthread_cond_timewait()
為何是 cancellation point? 是為了防止 indefinite wait,例如等待輸入的資料從未被送出(e.g. read()
),仍可以取消執行緒的執行。當接收到 cancellation request,如同 unblocked thread 一樣重新取得 mutex,但不是返回呼叫 pthread_cond_wait()
/pthread_cond_timedwait()
的地方而是執行 clean-up handlers.
pthread_exit()
於執行緒結束時呼叫,返回的數值 retval
供其他呼叫 pthread_join()
的執行緒取得。retval
不可存在於執行緒的 stack 上,否則產生未定義行為。
pthread_join()
負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 PTHREAD_CANCELED
到指定地址。
epoll_ctl
This system call is used to add, modify, or remove entries in the
interest list of the epoll(7) instance referred to by the file
descriptor epfd. It requests that the operation op be performed
for the target file descriptor, fd.
Valid values for the op argument are:
EPOLL_CTL_ADD
Add an entry to the interest list of the epoll file
descriptor, epfd. The entry includes the file descriptor,
fd, a reference to the corresponding open file description
(see epoll(7) and open(2)), and the settings specified in
event.
eventfd
eventfd() creates an "eventfd object" that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (
uint64_t
) counter that is maintained by the kernel. This counter is initialized with the value specified in the argumentinitval
.
The file descriptor is readable (the select(2) readfds argument; the poll(2) POLLIN flag) if the counter has a value greater than 0.
pthread_exit(NULL)
會造成 valgrind 誤判資源未釋放,可改用 return NULL