# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) Week 4 Quiz

###### tags: `linux2021`

:::info
Objective: assess students' understanding of **[bitwise operations](https://hackmd.io/@sysprog/c-bitwise)** and **[concurrent and multithreaded programming](https://hackmd.io/@sysprog/concurrency)**
:::

==[Answer form](https://docs.google.com/forms/d/e/1FAIpQLScPK4tAN4a_lBfxQzE4lCXm9MDIhVI3deEtBH6IuYcC3tu26g/viewform)==

### Quiz `1`

March 14 is [Pi Day](https://en.wikipedia.org/wiki/Pi_Day), which is also Albert Einstein's birthday. For discussions on approximating π, see:
* video: [Beyond the polygon (circle-cutting) method: another way to compute π](https://youtu.be/BkDbVypDgSs)
* video: [How did the ancients compute π?](https://youtu.be/AvMaNDh_R0w)
* video: [How to compute 100 million digits of π?](https://youtu.be/BkDbVypDgSs)

The [Gregory-Leibniz series](https://mathworld.wolfram.com/GregorySeries.html) offers an elegant way to compute π; see [Leibniz's Formula for Pi](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi). Start the derivation from the _Madhava–Leibniz series_ below:
$$
\arctan(1) = \dfrac{\pi}{4} = 1 - \dfrac{1}{3} + \dfrac{1}{5} - \dfrac{1}{7} +\ ...
$$

First, integrate the following [series identity](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi/Lemma)
$$
\dfrac{1}{1+t^2} = 1 - t^2 + t^4 - t^6 + t^8 - \cdots + t^{4n} - \frac{t^{4n+2}}{1+t^2}
$$
from $0$ to $x$, where $0\leq{x}\leq1$:
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt=x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+...+\frac{x^{4n+1}}{4n+1}-R_n(x) \\where\ R_n(x)=\int_{0}^{x}\dfrac{t^{4n+2}}{1+t^2}dt
$$

Consider $R_n(x)$ first. Since $1\leq1+t^2$, we get
$$
0\leq{R_n(x)}\leq\int_{0}^{x}t^{4n+2}dt=\frac{x^{4n+3}}{4n+3}
$$
and furthermore
$$
\frac{x^{4n+3}}{4n+3}\leq\frac{1}{4n+3}, 0\leq{x}\leq1
$$

By the [squeeze theorem](https://zh.wikipedia.org/wiki/%E5%A4%BE%E6%93%A0%E5%AE%9A%E7%90%86) (also known as the sandwich theorem), as $n\rightarrow\infty$, $\frac{1}{4n+3}\rightarrow0$, so $R_n(x)\rightarrow0$, and we obtain
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$

Moreover, since $\frac{d}{dx}\arctan(x)=\frac{1}{1+x^2}$,
$$
\arctan(x) = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$

Substituting $x = 1$ then yields $\frac{\pi}{4}$. The corresponding implementation:

```cpp
double compute_pi_leibniz(size_t N)
{
    double pi = 0.0;
    for (size_t i = 0; i < N; i++) {
        double tmp = (i & 1) ?
                               (-1) : 1;
        pi += tmp / (2 * i + 1);
    }
    return pi * 4.0;
}
```

Comparing the performance of the single-threaded, multithreaded, and SIMD versions:
![](https://i.imgur.com/x4gz1oE.png)

The [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula), proposed in 1995 and commonly abbreviated BBP after the surnames of its three authors, originally reads:

$\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$

The BBP formula departs from typical π algorithms: it can compute the $n$-th digit of π directly, without first computing the preceding $n - 1$ digits, which makes it well suited to approximating π with parallel computation.

[Concurrent and multithreaded programming](https://hackmd.io/@sysprog/concurrency) introduces the [thread pool](https://en.wikipedia.org/wiki/Thread_pool), whose design rationale is:
1. In an environment with many threads, the overhead of creating and destroying thread objects is considerable, and frequent creation and destruction adds extra latency to highly concurrent applications
2. Since the number of effective hardware processors is limited, a thread pool regulates how many threads are dispatched to the processors

A hospital analogy:
* Without a thread pool: the hospital faces thousands of patients every day. Whenever a patient seeks treatment, a doctor is summoned to handle that one patient, and the doctor leaves right after the visit. When visits are short, the doctors' travel time back and forth dominates
* A first thread pool: the hospital sets up an outpatient clinic and stations all of its doctors there. Patients must register and queue before being seen, and the doctors handle patients in queue order, eliminating the doctors' travel time. But if there are few patients and too many doctors, many doctors sit idle, wasting medical resources
* An improved thread pool: the clinic initially stations only some of the doctors, but adds a coordinator (in practice, a nurse). Patients still queue for treatment, and the coordinator schedules the medical resources: when there are many patients and the doctors cannot keep up, the coordinator calls in more doctors; when there are few patients and too many doctors, the coordinator sends some doctors off to stand by, avoiding waste

Illustration:
![](https://i.imgur.com/GedtciF.png)

Below is a [thread pool](https://en.wikipedia.org/wiki/Thread_pool) implementation:

```cpp
#include <stddef.h>

typedef struct __tpool_future *tpool_future_t;
typedef struct __threadpool *tpool_t;

/**
 * Create a thread pool containing specified number of threads.
 * If successful, the thread pool is returned. Otherwise, it
 * returns NULL.
 */
tpool_t tpool_create(size_t count);

/**
 * Schedules the specific function to be executed.
 * If successful, a future object representing the execution of
 * the task is returned. Otherwise, it returns NULL.
 */
tpool_future_t tpool_apply(tpool_t pool, void *(*func)(void *), void *arg);

/**
 * Wait for all pending tasks to complete before destroying the thread pool.
 */
int tpool_join(tpool_t pool);

/**
 * Return the result when it becomes available.
 * If @seconds is non-zero and the result does not arrive within specified time,
 * NULL is returned.
 * Each tpool_future_get() resets the timeout status on @future.
 */
void *tpool_future_get(tpool_future_t future, unsigned int seconds);

/**
 * Destroy the future object and free resources once it is no longer used.
 * It is an error to refer to a destroyed future object. Note that destroying
 * a future object does not prevent a pending task from being executed.
 */
int tpool_future_destroy(tpool_future_t future);

#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};

typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;

typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};

static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_cond_init(&future->cond_finished, &attr);
        pthread_condattr_destroy(&attr);
    }
    return future;
}

int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            FFF;
    }
    pthread_mutex_unlock(&future->mutex);
    return future->result;
}

static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}

static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }
    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}

static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}

static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        GGG;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if
            (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= KKK;
                task->future->result = ret_value;
                LLL;
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}

struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}

struct __tpool_future *tpool_apply(struct __threadpool
                                       *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            HHH;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}

int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```

Using the [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) and the thread pool above, we write a program that approximates π:

```cpp
#include <math.h>
#include <stdio.h>
#include <stdlib.h>

#define PRECISION 100 /* upper bound in BBP sum */

/* Use the Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}

int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }
    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }
    tpool_join(pool);

    printf("PI calculated with %d "
           "terms: %.15f\n",
           PRECISION + 1, bpp_sum);
    return 0;
}
```

Expected output:
```
PI calculated with 101 terms: 3.141592653589793
```

Complete the code so that the execution results match the expectation.

==Answer area==

FFF = ?
* `(a)` `pthread_cond_wait(&future->cond_finished, &future->mutex)`
* `(b)` `pthread_cond_wait(&future->mutex, &future->cond_finished)`
* `(c)` `pthread_cond_broadcast(&future->cond_finished, &future->mutex)`
* `(d)` `return NULL`

GGG = ?
* `(a)` `while (!jobqueue->tail) /* wait */`
* `(b)` `/* no operation */`
* `(c)` `while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
* `(d)` `while (!jobqueue->tail) pthread_cond_broadcast(&jobqueue->cond_nonempty)`

HHH = ?
* `(a)` `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
* `(b)` `pthread_cond_broadcast(&jobqueue->cond_nonempty)`

KKK = ?
* `(a)` `__FUTURE_RUNNING`
* `(b)` `__FUTURE_FINISHED`
* `(c)` `__FUTURE_TIMEOUT`
* `(d)` `__FUTURE_CANCELLED`
* `(e)` `__FUTURE_DESTROYED`

LLL = ?
* `(a)` `pthread_cond_broadcast(&task->future->cond_finished)`
* `(b)` `pthread_cond_wait(&task->future->cond_finished, &future->mutex)`

:::success
Extension problems:
1. Explain how the code above works, including the timeout-handling mechanism; point out room for improvement and implement it
2. Study [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool), identify how it applies atomic operations, and explain its lock-free technique
3. Rewrite the code above with C11 Atomics for better scalability
:::