contributed by < ccs100203
>
linux2021
此測驗利用 pthread 實作了一個 thread pool 程式,並使用 Gregory-Leibniz 級數 來計算圓周率。
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void *),
void *restrict arg);
建立新的 thread 來執行 start_routine
,而 arg
是 start_routine
的參數。
根據 man page
The new thread terminates in one of the following ways:
1. It calls pthread_exit(3), specifying an exit status value that
is available to another thread in the same process that calls
pthread_join(3).
2. It returns from start_routine(). This is equivalent to calling
pthread_exit(3) with the value supplied in the return
statement.
3. It is canceled (see pthread_cancel(3)).
4. Any of the threads in the process calls exit(3), or the main
thread performs a return from main(). This causes the
termination of all threads in the process.
建立出的 thread 會在下述情況時終止:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
初始化一個 mutex,attr
為 NULL 的話會設定為 default mutex。
int pthread_mutex_destroy(pthread_mutex_t *mutex);
使一個 mutex 回到未被初始化的狀態。
根據 man page
It shall be safe to destroy an initialized mutex that is unlocked. Attempting to destroy a locked mutex, or a mutex that another thread is attempting to lock, or a mutex that is being used in a pthread_cond_timedwait() or pthread_cond_wait() call by another thread, results in undefined behavior.
嘗試 destroy 一個 unlocked 的 mutex 應該是安全的,但是如果 destroy 一個 locked mutex 或是已經被 pthread_cond_timedwait() 或 pthread_cond_wait() 等待的 mutex,是一個 undefined behavior。
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
與上面的 mutex 操作相似,對一個 condition variable 進行初始化或是回到未被初始化的狀態。
同樣要注意的是如果嘗試 destroy 一個被其他 thread block 的 condition variable,也屬於 undefined behavior。
int pthread_condattr_destroy(pthread_condattr_t *attr);
int pthread_condattr_init(pthread_condattr_t *attr);
對 condition variable attribute 進行初始化或是回到未被初始化的狀態。
Results are undefined if pthread_condattr_init() is called specifying an already initialized attr attributes object.
如果對一個已經 initialized 的 attr 做 pthread_condattr_init() 會是 undefined behavior,亦即不能夠 double initialize。
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
lock 與 trylock 最大的差別在於,lock 一個 locked mutex,該 thread 會 block,一直到取得 lock。而 trylock 一個 locked mutex,該 thread 會立即回傳而不會 block。
int pthread_cancel(pthread_t thread);
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);
呼叫 cancel 時的行為會根據 state 跟 type 而有所差別:
當一個 cancel 請求執行時,會執行下列的指令:
- Cancellation clean-up handlers are popped (in the reverse of
the order in which they were pushed) and called. (See pthread_cleanup_push(3).)
2. Thread-specific data destructors are called, in an unspecified order. (See pthread_key_create(3).)
3. The thread is terminated. (See pthread_exit(3).
而 pthread_join 是唯一的方法去得知一個 cancellation 有被完成。
void pthread_testcancel(void);
會建立一個 cancellation point,對應到上述的 DEFER state,就可以讓原本還不能 cancel 的請求在此時執行。
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);
These functions manipulate the calling thread's stack of thread-cancellation clean-up handlers. A clean-up handler is a function that is automatically executed when a thread is canceled
這兩個函式可用來操作一個叫做 clean-up handler 的 stack,裡面的函式會在 thread 被 cancel 時自動執行。
arg
就是該 routine 執行時的參數。execute
不為 0 的話就會執行該 routine。要注意如果是使用 pthread_exit() 來終止 thread,則他的 clean-up handlers 會全部執行,但如果是用 return 的方式終止則不會呼叫 clean-up handlers。
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
這兩個函式會將傳入的 mutex 在 atomic 的情況下釋放,並使 thread block 在該 cond
。注意 mutex
需要已經被呼叫的 thread lock,函式才能正常運作。
而 timewait 則是多了一個 abstime
,如果 thread 被 block 超過此時間的話就會回傳 error。
同樣注意 mutex
需要已經被呼叫的 thread lock,函式才能正常運作。
int pthread_join(pthread_t thread, void **retval);
會一直等待傳入的 thread 直到其 terminate。
如果 retval
是 NULL,會將 pthread_exit 中的 retval
複製進這裡的 retval
,如果傳入的 thread
被 cancel 了,則會在 retval
放入 PTHREAD_CANCELED,這也對應到上面所述 只有 pthread_join 能夠知道 cancel 是否完成。
noreturn void pthread_exit(void *retval);
終止呼叫此函式的 thread,並執行在 clean-up handlers 內的 routines,而傳入的retval
可利用 pthread_join 取得。
TODO
what's ROBUST mutex
上圖是 thread pool 的架構簡圖
worker
會存放數量為 count
的 pthread,且 start routine 為jobqueue_fetch()
jobqueue
則是對應到 jobqueue_t
*head
與 *tail
維持一條 task 的 linked list,會存放目前有什麼 task 需要給 pthread 執行cond_noempty
是代表 task 不為空的 condition variable。rwlock
是用來保證對 lisk 操作的正確性*func
與 arg
為存放的 task,這邊就是存放 bpp()
*future
用來指向一個 __tpool_future
,表示此 task 當前的狀態*next
則是用來指向下一個 task 的指標flag
為 future 的當前狀態*result
存放執行結果mutex
確保操作的正確性,對其餘 member 進行操作時都要取得 mutexcond_finished
是代表運算完成的 condition variable一開始會先用 tpool_create()
創造一個 thread pool。再來用 tpool_apply()
將參數中傳入的 func
放入 task 的 linked list 內,以便讓 thread pool 中的 thread 提取來做。接著呼叫 tpool_future_get
來提取運算的結果,最後會利用 tpool_join
在 destroy 掉 thread pool 之前確保所有 task 都不是處於 pending。
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
jobqueue_create
會建立一個 jobqueue_t
並對裡頭的 mutex, condition variable 做初始化。
而 tpool_create
會建立一個 thread pool。
第 24 行的 if block,根據 count
去配置 pthread 所需要的空間並且同時做檢查,再來會逐一的呼叫 pthread_create 將 jubqueue_fetch 放入 thread 的 start routine。
此時同樣利用 if 去做檢查,因為 pthread_create 的 return value 為 0 時代表成功。所以 if 通過的話代表 create 出錯,此時會利用 pthread_cancel
取消前面已經 create 的 thread,從文章一開始的 api 解釋中可以找到這是其中一種 terminate thread 的方法,因為 pthread_cancel
會去自動處理 clean-up handlers 內的 routine,所以可以藉此來一起釋放 lock (因為 jobqueue_fetch
內會取得 rwlock)。
再來用 pthread_join
確保所有的 cancel 已經完成 (唯一的方式),在將其餘空間釋放掉即可。
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
就是上面的 create 使用 pthread_cancel
而不是 pthread_exit
的原因,因為可以在 terminate 之前先將 thread 所拿的 rwlock 也一併釋放掉。
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
上面兩行就是為了配合 pthread_cancel
的使用,所以加入的程式碼。
while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
這裡可以看到在 task 的 linked list 還屬於 empty 的狀態時,這些 thread 會釋放掉 rwlock 並等待 cond_nonempty
,一直到被喚醒。而利用 while (!jobqueue->tail)
的原因我認為是因為如果同時有多個 thread 在等待 cond_nonempty
,在放入一個 task 之後會喚醒所有的 thread,但只有一個 thread 能拿到 task 並執行,其餘的需要重新去等待又空了的 task list。
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
會從 linked list 的最後面拿取 task,由於越先插入的 task 會在越後面,所以這是一種 FIFO 的設計。如果已經拿到最後一個 task 的話要將 head
跟 tail
都指向 NULL。
由於對 linked list 的操作已經結束了,所以這時就可以將 rwlock
釋放。
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= __FUTURE_FINISHED;
task->future->result = ret_value;
pthread_cond_broadcast(&task->future->cond_finished);
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
如果 func
不是 NULL 代表需要執行他,要注意如果需要讀寫 flag
時都需要先取得 mutex
,一開始會先判斷該 task 是否已經為__FUTURE_CANCELLED
,是的話就會將 task 釋放,不是的話就會為他加上 __FUTURE_RUNNING
的狀態。
會在第 12 行執行該 func
。
再來會根據 flag
是 destroyed 或 finished 而有不同的操作
func
已經算出結果,會將結果放到 result
中,並通過 pthread_cond_broadcast
讓正在等待的 thread 知道計算已經完成。如果 func
是 NULL,這邊是配合著 tpool_join
使用,會藉此將 future
所佔用的資源釋放並跳出迴圈。
pthread_cleanup_pop(0);
pthread_exit(NULL);
上面的兩行程式碼是在 while loop 的外部,只有在 func
為 NULL 時,才能夠藉由最後的 break
而執行到。
pthread_cleanup_pop
是用來將一開始 push 進去 handler 的 routine pop 掉 (並不會執行),因為一開始 push 進去是為了在 create 的階段如果出錯,會藉著 pthread_cancel
將 rwlock 釋放掉。如今 rwlock 已經在正常的執行下釋放了,所以就要在這裡把 routine 移除掉。
最後在利用 pthread_exit
將該 thread terminate
此函式的目的在於將傳入的 func
接上 task linked list,就像是作為提供給 thread 工作的 producer。
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
在第 8 行確保 threadtask_t
與 __tpool_future
的創建都沒問題後,會逐一的將參數放置進 struct 內的位置。要注意到若是要對 task linked list 進行操作,都需要先拿取 rwlock
。
第 11 行時已取得 rwlock
,而此時就要將新的 task 插入 list 的 head,唯一要注意的是如果原本的 task 為 empty,那要進行 pthread_cond_broadcast(&jobqueue->cond_nonempty);
來通知 pool 內正在等待的 thread 可以拿取 task 來做了。
此函式用來拿取 task 的執行結果
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
pthread_cond_wait(&future->cond_finished, &future->mutex);
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
此程式會在 __FUTURE_FINISHED
set 前不斷等待執行結果,除非程式已經超過 seconds
所設定的秒數而 timeout。
如果沒有設定 seconds
,會藉由 pthread_cond_wait(&future->cond_finished, &future->mutex);
一直等到結果出來。
如果有設定 seconds
,則會改由使用 pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time);
,在超時後將 __FUTURE_TIMEOUT
set,並回傳 NULL。
(發現其餘程式並未處理 timeout 後的情況)
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_FINISHED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
用來將 future 釋放掉,可以注意到他分為兩種情況
__FUTURE_DESTROYED
set,代表此 future 不是在正常結束的狀況,在將其設為 destroyed 後,就會在之後的 tpool_join
中利用 jobqueue_destroy
或是 jobqueue_fetch
將其釋放掉。(好奇並不是立即釋放,以及 timeout 並未判斷)
等到所有的 thread 都執行結束後,將佔用的空間都釋放掉
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
tpool_apply(pool, NULL, NULL);
: 前面有提過,利用 NULL func
可以配合 jobqueue_fetch
將程式佔用的資源釋放,並且做 terminate。
pthread_join(pool->workers[i], NULL);
: 確保所有的 terminate 都已經完成。
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 1);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_future_get
的回傳會是 NULL,這樣提取 *result
就會發生 segmentation fault。if(result)
bpp_sum += *result;
這個問題可以加個判斷式解決,但理論上得到的結果會是錯的數字。
我認為可以印一個 timeout 的訊息告知使用者結果錯誤之類的。
目前想用 cancel 處理,但還沒想到合適的 cancel 方法,有參考其他同學的方法,但貌似不太正確,尚在研究。
TODO
在 tpool_future_destroy 拿pid/tid(?),然後做 cancel,然後重啟 thread,之類的方法。
還在思考
27 at 5/25