contributed by < YOYOPIG
>
linux2021
在第一次研究程式碼時,future 相關程式碼造成理解的障礙,由於習慣程式一行一行執行 (就算是 multithread 也該一行一行執行完結束…吧?),突然要理解 future 這個既存在又其實還沒做完的概念,花上不少時間。因此,在完整讀完程式前,我決定先研究一下使用 future 在程式設計中扮演的角色。這裡參考 Philipp Oppermann 的文章中的說明,整理出 multitasking 的用法與目的大致可以分為兩種:
Future 的概念,就常常用在 Cooperative Multitasking 中。當預期進入長時間任務時,與其等到完全執行完再回傳結果,不如先傳回一個暫時的結果 (當然,要設計能辨別結果完成與否的機制),讓其他 task 可以繼續執行,等到真的需要用到回傳值時,再等待該任務完成。
如上圖所示,下面的方法顯然較省時,在等待 file system I/O 時,先回傳 future file,讓 main 得以繼續執行,假設 foo() 暫時還不會用到 file,那就可以省下等待 file 的時間。這一類的 async/await 操作在一些較新的程式語言裡都有內建好了,如老師之前上課提到的 Rust,以及之前自己練習寫手機 app 時用的 Dart 等。回想當初明明有用過,卻不知道其背後原理,還以為那是 Dart 特有的酷東西,不免對自己的孤陋寡聞感到難過
這份 threadpool 實作融合了上述兩種概念。由於 C 沒有內建 async 操作,這裡透過自定義的 struct 實現:
遇到需要長時間執行的 task 時,不做等待直接回傳 tpool_future
指標,同時把實際的任務交給 thread pool 中的 thread 做計算。而真正的結果,則會透過結構中的 void *
型態指標 (適度轉型後方可 dereference) 來儲存。為了確定 task 目前的狀態,另外加入 flag 作為判斷依據,並透過 mutex lock 及 condition variable 保護 critical section。同時,為了避免上面提到 Cooperative Multitasking 可能會有因設計不良導致單一任務執行過久的問題,這裡透過 preemption 機制解決,詳見 timeout 處理。
完成了 future 的實作後,回來看到在 task 定義中的應用方式。
這裡一個 __threadtask
除了定義基本的 func
及 arg
作為應該執行什麼工作的依據外,也直接包含一個指向 __tpool_future
的指標 *future
,以便隨時回傳一個暫時的結果,之後可以再使用它的 flag 及 result 得到正確的結果。最後,作為 job queue 中的一個節點,需要 *next
指向 job queue 中的下一個 task,如此一來之後的 __jobqueue
結構只需要如往常一樣儲存 queue 的頭尾即可。由於 thread pool 中的各個 thread 會分別來尋找下一個任務,因此這裡另外加上 condition variable 監控 queue 是否為空以及 mutex lock 保護讀寫的 critical section。
最後,定義 threadpool 主體:
為了避免單一任務執行過久或是因程式撰寫不良,導致非預期的 blocking,這裡不使用一般常用的 pthread_cond_wait()
而選擇了 pthread_cond_timedwait()
,除了成功等待到 condition variable 外,也會因為設定的時間而結束等待,詳見 pthread_cond_wait(3)
在 tpool_future_get
函式中,有著對 timeout 的處理機制。在工作尚未完成時,透過計時的方式決定每個 thread 的執行時間,直到 task 完成讓 pthread_cond_timedwait 成功等待到 cond_finished,或是時間到被 preempt,把 flag 設成 __FUTURE_TIMEOUT
。同時,如果使用者未限制執行時間,則正常等待至 task 完成。因此,原先 FFF
處應填 pthread_cond_wait(&future->cond_finished, &future->mutex)
接著看有三處填空的 jobqueue_fetch
:
可以看出,這裡主要透過 while loop 持續執行,從 job queue 中找出下一個任務轉交給 thread pool 中的 thread。因此,在 loop 開頭處應檢查 queue 是否還有元素,若無則應該停下等待,GGG
應選 while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
。如果 queue 不為空,則找出下一個 task 開始執行。確認該 task 沒有因故被取消後,就可以把 flag 設為 __FUTURE_RUNNING
,靜待其在多執行緒環境中自動執行。在執行完畢後,該將 flag 設為已完成,即 __FUTURE_FINISHED
,交由後續的處理機制將其釋放。
因此,KKK
應選 (b) __FUTURE_FINISHED
。同時,除了將最後結果存下來,也該透過 broadcast 機制告知該 task 已完成。因此,LLL
選 (a) pthread_cond_broadcast(&task->future->cond_finished)
。
最後一個空格在 tpool_apply()
中:
由於先前的 jobqueue_fetch
會等待 jobqueue 非空的訊號,因此在 HHH
的地方需在新增 jobqueue 中第一個元素後,透過 broadcast 告知,應選 pthread_cond_broadcast(&jobqueue->cond_nonempty)
原始程式碼: Taymindis/atomic_threadpool
Taymindis/atomic_threadpool 的作者使用了 atomic 操作,實現了 lock free 版本的 thread pool。回顧這次小考的程式碼,會發現 mutex lock 主要出現在兩個地方:
比較 lock free 版本的實作:
不難看出,兩種方法其實相當相似,沒有上述兩大 critical section 的主要原因為:
at_thtask_t
預設不會回傳數值,因此並沒有實作紀錄各 thread 狀態及結果的變數,自然也沒有 critical section。為了確保 queue 的存取不會有 race condition,這裡使用了 gcc 支援的 built-in atomic operation,可以在一開始 macro definition 的地方找到:
__sync_
開頭的內建函式已標註為 legacy,應該用 Memory Model Aware Atomic Operations
接著,在插入與取出資料時,使用這些定義好的 atomic operation 取代原本的操作:
下方程式碼縮排改用 4 個空白,讓共筆的視覺效果更緊湊
這裡使用 __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)
來檢查 queue 中是否為空,__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)
來實際進行取出第一個元素並把 head 更新至下一個元素的動作。最後,透過 __LFQ_SYNC_MEMORY
(實際上就是 __sync_synchronize
) 作為 memory barrier,確保前後的指令不會在編譯器最佳化或處理器指令順序重組過程中被打亂。
這裡一樣是透過 __LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)
來插入新元素至尾端,再更新 tail 位置。
有了這樣的 atomic operation,就可以確保每個 thread 在讀寫 job queue 時,不會發生未預期的執行順序進而產生錯誤的結果。基於這樣的 lock free queue,就可以實現一樣是 lock free 的 thread pool 了。
gcc 支援的 built-in atomic operation,手冊提到:
The following builtins are intended to be compatible with those described in the Intel Itanium Processor-specific Application Binary Interface, section 7.4. As such, they depart from the normal GCC practice of using the "
__builtin_
"" prefix, and further that they are overloaded such that they work on multiple types.
…
Not all operations are supported by all target processors.
這樣一來,這些 atomic operation 的使用似乎會和硬體限制有關? 不確定在 AMD 或其他處理器的平台是否能有同樣的結果,之後再設計實驗確認
TODO