Try   HackMD

2021q1 Homework4 (quiz4)

contributed by < ccs100203 >

tags: linux2021

第 4 週測驗題

測驗 1

此測驗利用 pthread 實作了一個 thread pool 程式,並使Gregory-Leibniz 級數 來計算圓周率。

Pthread API 解釋

pthread_create

int pthread_create(pthread_t *restrict thread,
                          const pthread_attr_t *restrict attr,
                          void *(*start_routine)(void *),
                          void *restrict arg);

建立新的 thread 來執行 start_routine,而 argstart_routine 的參數。

根據 man page

The new thread terminates in one of the following ways:
1. It calls pthread_exit(3), specifying an exit status value that
is available to another thread in the same process that calls
pthread_join(3).
2. It returns from start_routine(). This is equivalent to calling
pthread_exit(3) with the value supplied in the return
statement.
3. It is canceled (see pthread_cancel(3)).
4. Any of the threads in the process calls exit(3), or the main
thread performs a return from main(). This causes the
termination of all threads in the process.

建立出的 thread 會在下述情況時終止:

  1. 呼叫 pthread_exit(),並且可以藉由 pthread_join() 得知其 status
  2. 從 start_routine() 中 return
  3. 該 thread 被 cancel
  4. 任一 process 中的 thread 呼叫 exit()

pthread_mutex_init

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
           const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

初始化一個 mutex,attr 為 NULL 的話會設定為 default mutex。

pthread_mutex_destroy

int pthread_mutex_destroy(pthread_mutex_t *mutex);

使一個 mutex 回到未被初始化的狀態。

根據 man page

It shall be safe to destroy an initialized mutex that is unlocked. Attempting to destroy a locked mutex, or a mutex that another thread is attempting to lock, or a mutex that is being used in a pthread_cond_timedwait() or pthread_cond_wait() call by another thread, results in undefined behavior.

嘗試 destroy 一個 unlocked 的 mutex 應該是安全的,但是如果 destroy 一個 locked mutex 或是已經被 pthread_cond_timedwait() 或 pthread_cond_wait() 等待的 mutex,是一個 undefined behavior。

pthread_cond_init && pthread_cond_destroy

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
   const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

與上面的 mutex 操作相似,對一個 condition variable 進行初始化或是回到未被初始化的狀態。
同樣要注意的是如果嘗試 destroy 一個被其他 thread block 的 condition variable,也屬於 undefined behavior。

pthread_condattr_init && pthread_condattr_destroy

int pthread_condattr_destroy(pthread_condattr_t *attr);
int pthread_condattr_init(pthread_condattr_t *attr);

對 condition variable attribute 進行初始化或是回到未被初始化的狀態。

Results are undefined if pthread_condattr_init() is called specifying an already initialized attr attributes object.

如果對一個已經 initialized 的 attr 做 pthread_condattr_init() 會是 undefined behavior,亦即不能夠 double initialize。

pthread_mutex_lock && pthread_mutex_unlock && pthread_mutex_trylock

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

lock 與 trylock 最大的差別在於,lock 一個 locked mutex,該 thread 會 block,一直到取得 lock。而 trylock 一個 locked mutex,該 thread 會立即回傳而不會 block。

pthread_cancel && pthread_setcancelstate && pthread_setcanceltype

int pthread_cancel(pthread_t thread);
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);

呼叫 cancel 時的行為會根據 state 跟 type 而有所差別:

  • state
    • PTHREAD_CANCEL_ENABLE: 此為預設值,代表此 thread 為 cancelable。
    • PTHREAD_CANCEL_DISABLE: 代表此 thread 為 uncancelable,如果收到 cancel 的請求,會將該 thread block 直到可以被 cancel。
  • type
    • PTHREAD_CANCEL_DEFERRED: 此為預設值,cancel 的請求會推遲到下一個 cancellation point 才會執行。
    • PTHREAD_CANCEL_ASYNCHRONOUS: 理論上系統會立即 cancel 該 thread,但此行為不被系統 guarantee。

當一個 cancel 請求執行時,會執行下列的指令:

  1. Cancellation clean-up handlers are popped (in the reverse of
    the order in which they were pushed) and called. (See pthread_cleanup_push(3).)
    2. Thread-specific data destructors are called, in an unspecified order. (See pthread_key_create(3).)
    3. The thread is terminated. (See pthread_exit(3).
  1. 會去 pop cancellation clean-up handlers
  2. 以不特定的順序呼叫 Thread-specific data destructors
  3. 將 thread 終止

而 pthread_join 是唯一的方法去得知一個 cancellation 有被完成。

pthread_testcancel

void pthread_testcancel(void);

會建立一個 cancellation point,對應到上述的 DEFER state,就可以讓原本還不能 cancel 的請求在此時執行。

pthread_cleanup_push && pthread_cleanup_pop

void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);

These functions manipulate the calling thread's stack of thread-cancellation clean-up handlers. A clean-up handler is a function that is automatically executed when a thread is canceled

這兩個函式可用來操作一個叫做 clean-up handler 的 stack,裡面的函式會在 thread 被 cancel 時自動執行。

  • pthread_cleanup_push
    會放一個 routine 進入 stack,而 arg 就是該 routine 執行時的參數。
  • pthread_cleanup_pop
    會從 stack 中 pop 一個 routine,如果 execute 不為 0 的話就會執行該 routine。

要注意如果是使用 pthread_exit() 來終止 thread,則他的 clean-up handlers 會全部執行,但如果是用 return 的方式終止則不會呼叫 clean-up handlers。

pthread_cond_wait && pthread_cond_timedwait

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
   pthread_mutex_t *restrict mutex,
   const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
   pthread_mutex_t *restrict mutex);

這兩個函式會將傳入的 mutex 在 atomic 的情況下釋放,並使 thread block 在該 cond。注意 mutex 需要已經被呼叫的 thread lock,函式才能正常運作。

而 timewait 則是多了一個 abstime,如果 thread 被 block 超過此時間的話就會回傳 error。

pthread_cond_broadcast && - pthread_cond_signal

  • pthread_cond_broadcast
    會 unblock 所有被傳入的 condition variable 所 block 的 threads
  • pthread_cond_signal
    會 unblock 至少一個被傳入的 condition variable 所 block 的 threads

同樣注意 mutex 需要已經被呼叫的 thread lock,函式才能正常運作。

pthread_join

int pthread_join(pthread_t thread, void **retval);

會一直等待傳入的 thread 直到其 terminate。
如果 retval 是 NULL,會將 pthread_exit 中的 retval 複製進這裡的 retval,如果傳入的 thread 被 cancel 了,則會在 retval 放入 PTHREAD_CANCELED,這也對應到上面所述 只有 pthread_join 能夠知道 cancel 是否完成

pthread_exit

noreturn void pthread_exit(void *retval);

終止呼叫此函式的 thread,並執行在 clean-up handlers 內的 routines,而傳入的retval 可利用 pthread_join 取得。

TODO
what's ROBUST mutex

Thread pool 程式原理

Structure







struct



jobqueue_fetch
jobqueue_fetch()



bpp
bpp()



bpp2
bpp()



__threadpool

__threadpool

count

workers

jobqueue



__threadpool:l1->jobqueue_fetch:w





jobqueue_t

jobqueue_t

*head

*tail

cond_noempty

rwlock



__threadpool:l2->jobqueue_t:ll





threadtask_t

threadtask_t

*func

*arg

*future

*next



jobqueue_t:l0->threadtask_t:ll





threadtask_t1

threadtask_t

*func

*arg

*future

*next



jobqueue_t:l1->threadtask_t1:ll





threadtask_t:l0->bpp:w





threadtask_t:l3->threadtask_t1:ll





__tpool_future

__tpool_future

flag

*result

mutex

cond_finished



threadtask_t:l2->__tpool_future:ll





threadtask_t1:l0->bpp2:w





__tpool_future1

__tpool_future

flag

*result

mutex

cond_finished



threadtask_t1:l2->__tpool_future1:ll





上圖是 thread pool 的架構簡圖

  • __threadpool
    • 整個架構最外層的包裝,worker 會存放數量為 count 的 pthread,且 start routine 為jobqueue_fetch()
    • jobqueue 則是對應到 jobqueue_t
  • jobqueue_t
    • 藉由 *head*tail 維持一條 task 的 linked list,會存放目前有什麼 task 需要給 pthread 執行
    • cond_noempty 是代表 task 不為空的 condition variable。
    • rwlock 是用來保證對 lisk 操作的正確性
  • threadtask_t
    • *funcarg 為存放的 task,這邊就是存放 bpp()
    • *future 用來指向一個 __tpool_future,表示此 task 當前的狀態
    • *next 則是用來指向下一個 task 的指標
  • __tpool_future
    • flag 為 future 的當前狀態
    • *result 存放執行結果
    • mutex 確保操作的正確性,對其餘 member 進行操作時都要取得 mutex
    • cond_finished 是代表運算完成的 condition variable

Tpool 程式流程

一開始會先用 tpool_create() 創造一個 thread pool。再來用 tpool_apply() 將參數中傳入的 func 放入 task 的 linked list 內,以便讓 thread pool 中的 thread 提取來做。接著呼叫 tpool_future_get 來提取運算的結果,最後會利用 tpool_join 在 destroy 掉 thread pool 之前確保所有 task 都不是處於 pending。

函式解釋

  • jobqueue_create && tpool_create
static jobqueue_t *jobqueue_create(void) { jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t)); if (jobqueue) { jobqueue->head = jobqueue->tail = NULL; pthread_cond_init(&jobqueue->cond_nonempty, NULL); pthread_mutex_init(&jobqueue->rwlock, NULL); } return jobqueue; } struct __threadpool *tpool_create(size_t count) { jobqueue_t *jobqueue = jobqueue_create(); struct __threadpool *pool = malloc(sizeof(struct __threadpool)); if (!jobqueue || !pool) { if (jobqueue) jobqueue_destroy(jobqueue); free(pool); return NULL; } pool->count = count, pool->jobqueue = jobqueue; if ((pool->workers = malloc(count * sizeof(pthread_t)))) { for (int i = 0; i < count; i++) { if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch, (void *) jobqueue)) { for (int j = 0; j < i; j++) pthread_cancel(pool->workers[j]); for (int j = 0; j < i; j++) pthread_join(pool->workers[j], NULL); free(pool->workers); jobqueue_destroy(jobqueue); free(pool); return NULL; } } return pool; } jobqueue_destroy(jobqueue); free(pool); return NULL; }

jobqueue_create 會建立一個 jobqueue_t 並對裡頭的 mutex, condition variable 做初始化。
tpool_create 會建立一個 thread pool。

第 24 行的 if block,根據 count 去配置 pthread 所需要的空間並且同時做檢查,再來會逐一的呼叫 pthread_create 將 jubqueue_fetch 放入 thread 的 start routine。
此時同樣利用 if 去做檢查,因為 pthread_create 的 return value 為 0 時代表成功。所以 if 通過的話代表 create 出錯,此時會利用 pthread_cancel 取消前面已經 create 的 thread,從文章一開始的 api 解釋中可以找到這是其中一種 terminate thread 的方法,因為 pthread_cancel 會去自動處理 clean-up handlers 內的 routine,所以可以藉此來一起釋放 lock (因為 jobqueue_fetch 內會取得 rwlock)。

再來用 pthread_join 確保所有的 cancel 已經完成 (唯一的方式),在將其餘空間釋放掉即可。

  • jobqueue_fetch
static void *jobqueue_fetch(void *queue) { jobqueue_t *jobqueue = (jobqueue_t *) queue; threadtask_t *task; int old_state; pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock); while (1) { pthread_mutex_lock(&jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); pthread_testcancel(); while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state); if (jobqueue->head == jobqueue->tail) { task = jobqueue->tail; jobqueue->head = jobqueue->tail = NULL; } else { threadtask_t *tmp; for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next) ; task = tmp->next; tmp->next = NULL; jobqueue->tail = tmp; } pthread_mutex_unlock(&jobqueue->rwlock); if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; } } pthread_cleanup_pop(0); pthread_exit(NULL); }
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

就是上面的 create 使用 pthread_cancel 而不是 pthread_exit 的原因,因為可以在 terminate 之前先將 thread 所拿的 rwlock 也一併釋放掉。

pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();

上面兩行就是為了配合 pthread_cancel 的使用,所以加入的程式碼。

while (!jobqueue->tail) 
        pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);

這裡可以看到在 task 的 linked list 還屬於 empty 的狀態時,這些 thread 會釋放掉 rwlock 並等待 cond_nonempty,一直到被喚醒。而利用 while (!jobqueue->tail) 的原因我認為是因為如果同時有多個 thread 在等待 cond_nonempty,在放入一個 task 之後會喚醒所有的 thread,但只有一個 thread 能拿到 task 並執行,其餘的需要重新去等待又空了的 task list。

if (jobqueue->head == jobqueue->tail) {
    task = jobqueue->tail;
    jobqueue->head = jobqueue->tail = NULL;
} else {
    threadtask_t *tmp;
    for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
         tmp = tmp->next)
        ;
    task = tmp->next;
    tmp->next = NULL;
    jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);

會從 linked list 的最後面拿取 task,由於越先插入的 task 會在越後面,所以這是一種 FIFO 的設計。如果已經拿到最後一個 task 的話要將 headtail 都指向 NULL。
由於對 linked list 的操作已經結束了,所以這時就可以將 rwlock 釋放。

if (task->func) { pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_CANCELLED) { pthread_mutex_unlock(&task->future->mutex); free(task); continue; } else { task->future->flag |= __FUTURE_RUNNING; pthread_mutex_unlock(&task->future->mutex); } void *ret_value = task->func(task->arg); pthread_mutex_lock(&task->future->mutex); if (task->future->flag & __FUTURE_DESTROYED) { pthread_mutex_unlock(&task->future->mutex); pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); } else { task->future->flag |= __FUTURE_FINISHED; task->future->result = ret_value; pthread_cond_broadcast(&task->future->cond_finished); pthread_mutex_unlock(&task->future->mutex); } free(task); } else { pthread_mutex_destroy(&task->future->mutex); pthread_cond_destroy(&task->future->cond_finished); free(task->future); free(task); break; }

如果 func 不是 NULL 代表需要執行他,要注意如果需要讀寫 flag 時都需要先取得 mutex,一開始會先判斷該 task 是否已經為__FUTURE_CANCELLED,是的話就會將 task 釋放,不是的話就會為他加上 __FUTURE_RUNNING 的狀態。

會在第 12 行執行該 func

再來會根據 flag 是 destroyed 或 finished 而有不同的操作

  • destroyed
    代表此 future 需要 destroyed,會將其內部的 mutex 與 condition variable 都銷毀,並釋放掉 future。
  • finished
    代表 func 已經算出結果,會將結果放到 result 中,並通過 pthread_cond_broadcast 讓正在等待的 thread 知道計算已經完成。

如果 func 是 NULL,這邊是配合著 tpool_join 使用,會藉此將 future 所佔用的資源釋放並跳出迴圈。

pthread_cleanup_pop(0);
pthread_exit(NULL);

上面的兩行程式碼是在 while loop 的外部,只有在 func 為 NULL 時,才能夠藉由最後的 break 而執行到。
pthread_cleanup_pop 是用來將一開始 push 進去 handler 的 routine pop 掉 (並不會執行),因為一開始 push 進去是為了在 create 的階段如果出錯,會藉著 pthread_cancel 將 rwlock 釋放掉。如今 rwlock 已經在正常的執行下釋放了,所以就要在這裡把 routine 移除掉。
最後在利用 pthread_exit 將該 thread terminate

  • tpool_apply

此函式的目的在於將傳入的 func 接上 task linked list,就像是作為提供給 thread 工作的 producer。

struct __tpool_future *tpool_apply(struct __threadpool *pool, void *(*func)(void *), void *arg) { jobqueue_t *jobqueue = pool->jobqueue; threadtask_t *new_head = malloc(sizeof(threadtask_t)); struct __tpool_future *future = tpool_future_create(); if (new_head && future) { new_head->func = func, new_head->arg = arg, new_head->future = future; pthread_mutex_lock(&jobqueue->rwlock); if (jobqueue->head) { new_head->next = jobqueue->head; jobqueue->head = new_head; } else { jobqueue->head = jobqueue->tail = new_head; pthread_cond_broadcast(&jobqueue->cond_nonempty); } pthread_mutex_unlock(&jobqueue->rwlock); } else if (new_head) { free(new_head); return NULL; } else if (future) { tpool_future_destroy(future); return NULL; } return future; }

在第 8 行確保 threadtask_t__tpool_future 的創建都沒問題後,會逐一的將參數放置進 struct 內的位置。要注意到若是要對 task linked list 進行操作,都需要先拿取 rwlock

第 11 行時已取得 rwlock,而此時就要將新的 task 插入 list 的 head,唯一要注意的是如果原本的 task 為 empty,那要進行 pthread_cond_broadcast(&jobqueue->cond_nonempty); 來通知 pool 內正在等待的 thread 可以拿取 task 來做了。

  • tpool_future_get

此函式用來拿取 task 的執行結果

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds) { pthread_mutex_lock(&future->mutex); /* turn off the timeout bit set previously */ future->flag &= ~__FUTURE_TIMEOUT; while ((future->flag & __FUTURE_FINISHED) == 0) { if (seconds) { struct timespec expire_time; clock_gettime(CLOCK_MONOTONIC, &expire_time); expire_time.tv_sec += seconds; int status = pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time); if (status == ETIMEDOUT) { future->flag |= __FUTURE_TIMEOUT; pthread_mutex_unlock(&future->mutex); return NULL; } } else pthread_cond_wait(&future->cond_finished, &future->mutex); } pthread_mutex_unlock(&future->mutex); return future->result; }

此程式會在 __FUTURE_FINISHED set 前不斷等待執行結果,除非程式已經超過 seconds 所設定的秒數而 timeout。
如果沒有設定 seconds,會藉由 pthread_cond_wait(&future->cond_finished, &future->mutex); 一直等到結果出來。
如果有設定 seconds,則會改由使用 pthread_cond_timedwait(&future->cond_finished, &future->mutex, &expire_time);,在超時後將 __FUTURE_TIMEOUT set,並回傳 NULL。
(發現其餘程式並未處理 timeout 後的情況)

  • tpool_future_destroy
if (future) { pthread_mutex_lock(&future->mutex); if (future->flag & __FUTURE_FINISHED || future->flag & __FUTURE_FINISHED) { pthread_mutex_unlock(&future->mutex); pthread_mutex_destroy(&future->mutex); pthread_cond_destroy(&future->cond_finished); free(future); } else { future->flag |= __FUTURE_DESTROYED; pthread_mutex_unlock(&future->mutex); } }

用來將 future 釋放掉,可以注意到他分為兩種情況

  1. __FUTURE_FINISHED 或 __FUTURE_FINISHED
    程式正常的執行結束或是被取消了,那麼就會將資源正常的釋放掉並結束
  2. 其他情況
    會將 __FUTURE_DESTROYED set,代表此 future 不是在正常結束的狀況,在將其設為 destroyed 後,就會在之後的 tpool_join 中利用 jobqueue_destroy 或是 jobqueue_fetch 將其釋放掉。

(好奇並不是立即釋放,以及 timeout 並未判斷)

  • tpool_join

等到所有的 thread 都執行結束後,將佔用的空間都釋放掉

int tpool_join(struct __threadpool *pool) { size_t num_threads = pool->count; for (int i = 0; i < num_threads; i++) tpool_apply(pool, NULL, NULL); for (int i = 0; i < num_threads; i++) pthread_join(pool->workers[i], NULL); free(pool->workers); jobqueue_destroy(pool->jobqueue); free(pool); return 0; }

tpool_apply(pool, NULL, NULL);: 前面有提過,利用 NULL func 可以配合 jobqueue_fetch 將程式佔用的資源釋放,並且做 terminate。

pthread_join(pool->workers[i], NULL);: 確保所有的 terminate 都已經完成。

指出改進空間並實作

程式 timeout 後並未處理

for (int i = 0; i <= PRECISION; i++) {
    double *result = tpool_future_get(futures[i], 1);
    bpp_sum += *result;
    tpool_future_destroy(futures[i]);
    free(result);
}
  1. 假使程式 timeout,那麼 tpool_future_get 的回傳會是 NULL,這樣提取 *result 就會發生 segmentation fault。
if(result)
    bpp_sum += *result;

這個問題可以加個判斷式解決,但理論上得到的結果會是錯的數字。
我認為可以印一個 timeout 的訊息告知使用者結果錯誤之類的。

  1. 也發現另一個問題,程式即使 timeout,也會等到該 task 結束後才進行處理。換句話說如果在 task 放一個無窮迴圈,那就永遠不會結束了,所以這情況也要處理。

目前想用 cancel 處理,但還沒想到合適的 cancel 方法,有參考其他同學的方法,但貌似不太正確,尚在研究。
TODO

在 tpool_future_destroy 拿pid/tid(?),然後做 cancel,然後重啟 thread,之類的方法。
還在思考

27 at 5/25

研讀 atomic_threadpool,指出其 atomic 操作的運用方式,並說明該 lock-free 的手法

嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability