contributed by < hankluo6
>
1
參考解答
FFF = ?
(a) pthread_cond_wait(&future->cond_finished, &future->mutex)
GGG = ?
(c) while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
HHH = ?
(b) pthread_cond_broadcast(&jobqueue->cond_nonempty)
KKK = ?
(b) __FUTURE_FINISHED
LLL = ?
(a) pthread_cond_broadcast(&task->future->cond_finished)
建立新的執行緒,運行 start_routine()
,並將 arg
傳入給 start_routine()
中。
結束正在執行的執行緒,並將要迴傳給 process 的值放置在 retval
。
等在 thread
的終止,並將 retval
設置為呼叫 pthread_exit
時給予的 retval
。
送出 cancellation 訊號給 thread
執行緒,而 thread
會根據 state
以及 type
來決定如何回應。透過 pthread_setcancelstate
以及 pthread_setcanceltype
設置 state
與 type
:
state
PTHREAD_CANCEL_ENABLE
預設行為,表示此執行緒為可被取消的 (cancelable)。PTHREAD_CANCEL_DISABLE
表示此執行緒無法被取消,當呼叫 pthread_cancel
時,process 會被 block 住,直到此執行緒自行中止或改變狀態。type
PTHREAD_CANCEL_DEFERRED
預測行為,當接收到 cancellation 訊號時,執行緒並不會馬上中止,而是會等待執行緒執行到 cancellation point
(可見下方 cancellation point 欄位) 時才會停止。PTHREAD_CANCEL_ASYNCHRONOUS
當收到 cancellation 訊號時會立刻中止。設定 function handler,在執行緒結束或某些情況下 (pthread_cancel
or pthread_exit
or pthread_cleanup_pop(no zeror)
),使用 arg
參數呼叫 routine
function。
pthread_mutex_init
以及 pthread_mutex_destroy
用來初始化與刪除互斥鎖 (mutex),pthread_mutex_lock
以及 pthread_mutex_unlock
分別對 mutex 上鎖與解鎖。
pthread_cond_init
以及 pthread_cond_destroy
用來初始化與刪除條件變數 (condition variable),而 pthread_condattr_init
與 pthread_condattr_destroy
則用來設置 condition variable 的屬性 attr
。
pthread_cond_timedwait
與 pthread_cond_wait
將互斥鎖釋放,並阻塞住該執行緒,等待其它 process 送出信號觸發此 condition variable,在收到信號時返回並重新獲得互斥鎖。其中前者 timedwait
中的參數 abstime
表示可接受阻塞的最長時間。
pthread_cond_broadcast
則送出信號喚醒所有持有條件變數 cond
的執行緒。
透過 tpool_create
建立 thread pool,並將每個執行緒放入 __threadpool *pool
中。而 jobqueue_t *jobqueue
將每個待執行的 task,以 linked list 的方式連接,當有任務要執行時,從 jobqueue
中 pop 一個任務出來運行,每個任務在程式內是以 threadtask_t
結構封裝。
從 tpool_create
建立出來的執行緒,會運行 jobqueue_fetch
函式,在 jobqueue
為空時,每個 thread 會在 pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
內被 block 住,等待有新的 task 加入。透過 tpool_apply
將新的 task 加入到 jobqueue
中,如果此時 jobqueue
為空,則透過 pthread_cond_broadcast(&jobqueue->cond_nonempty);
送出信號給 cond_nonempty
條件變數,此時 jobqueue_fetch
內的其中一個執行緒中的 pthread_cond_wait
將會通過,並在下方取得 jobqueue
的 tail
任務。
執行完給定的 function 後,會將相關結果存放在 struct __tpool_future
中,__tpool_future
用來當作使用者可見的執行緒,透過 tpool_future_get
取得 thread 執行完的結果。如果 tpool_future_get
時任務尚未完成,將會被條件變數 future->cond_finished
給 block 住,等待任務執行完畢。
現有程式使用 pthread_cond_timedwait
處理 thread 執行過久的問題,並將 future->flag
設置為 __FUTURE_TIMEOUT
表示該任務超時,但在 tpool_join
沒有相對應的處理。當有執行緒無法完成時,將會產生問題,如以下程式碼:
所以我們需要有方法在無法退出 function 時正確離開 thread。故我在 jobqueue_fetch
內添加以下程式:
將 thread 的 cancel type 改成 asynchronous,此時便不需要經過 cancellation point 也能直接退出執行緒。
在移除 future
時也要正確處理 timeout 時的情形:
最後在 join 的時候,將所有未完成的 task 所在的執行緒皆終止:
我們在執行過程中不能知道哪些執行緒會無法退出,所以當有執行緒進入無窮迴圈時,該執行緒會一直被佔用,無法被重新利用,直到執行 tpool_join
後才能正確被釋放。
atomic_threadpool
atomic_threadppol
使用 lock-free queue 來取代此程式內的 job queue。thread pool 的實現方式與此程式一樣,透過 at_thpool_create
建立足夠的 thread 使用,每個 thread 內部執行 at_thpool_worker
並等待 queue 中有任務產生 (相當於 jobqueue_fetch
),利用 at_thpool_newtask
建立任務 (相當於 tpool_apply
) 並使用 lfqueue_enq
將任務放入 queue 當中。at_thpool_worker
利用 lfqueue_deq
將待處理任務讀出,並執行對應的函式。處理完任務的節點 (lfqueue_cas_node_t
) 會透過 __lfq_recycle_free
以及 __lfq_check_free
釋放其空間。唯一不同的是此實作的 thread pool 無法回傳值給使用者。
只要是跟 queue 有關的操作,都必須為 thread safe,防止多個執行緒同時對 queue 操作而造成錯誤,在 atomic_threadpool
中使用 gcc 的 builtin atomic functions 實現:
size
時,使用 __sync_add_and_fetch()
執行加法,便能保證每次更新的值都保持為最新狀態。tail
時,使用 __sync_bool_compare_and_swap
讓 tail->next
對於同一個 tail
只更新一次,而 lfqueue->tail
則保持在最新值。head
時,使用 __sync_bool_compare_and_swap
讓 lfqueue->head
保持在最新值。lfqueue->tail
,一樣使用 __sync_bool_compare_and_swap
實現。__LFQ_BOOL_COMPARE_AND_SWAP
設置要移除的節點,才不會重複釋放同個記憶體,並使用 __LFQ_BOOL_COMPARE_AND_SWAP
指示現在是否為 in_free_mode
,防止多個執行緒同時進行移除。TODO
linux2021