# [2021q1](http://wiki.csie.ncku.edu.tw/linux/schedule) Week 4 Quiz
###### tags: `linux2021`
:::info
Goal: assess participants' understanding of **[bitwise operation](https://hackmd.io/@sysprog/c-bitwise)** and **[concurrent and multithreaded programming](https://hackmd.io/@sysprog/concurrency)**
:::
==[Answer form](https://docs.google.com/forms/d/e/1FAIpQLScPK4tAN4a_lBfxQzE4lCXm9MDIhVI3deEtBH6IuYcC3tu26g/viewform)==
### Quiz `1`
March 14 is [Pi Day](https://en.wikipedia.org/wiki/Pi_Day), which also happens to be Einstein's birthday. For discussions on approximating π, see:
* video: [Besides the circle-cutting method, here is another way to compute π](https://youtu.be/BkDbVypDgSs)
* video: [How did the ancients compute π?](https://youtu.be/AvMaNDh_R0w)
* video: [How do you compute 100 million digits of π?](https://youtu.be/BkDbVypDgSs)
The [Gregory-Leibniz series](https://mathworld.wolfram.com/GregorySeries.html) computes π elegantly; see [Leibniz's Formula for Pi](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi). We start the derivation from the _Madhava–Leibniz series_ below:
$$
\arctan(1) = \dfrac{\pi}{4} = 1 - \dfrac{1}{3} + \dfrac{1}{5} - \dfrac{1}{7} +\ ...
$$
First, integrate the following [series](https://proofwiki.org/wiki/Leibniz%27s_Formula_for_Pi/Lemma)
$$
\dfrac{1}{1+t^2} = 1 - t^2 + t^4 - t^6 + t^8 + ... + t^{4n} - \frac{t^{4n+2}}{1+t^2}
$$
from $0$ to $x$, with $0\leq{x}\leq1$:
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt=x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+...+\frac{x^{4n+1}}{4n+1}-R_n(x)
\\ \text{where } R_n(x)=\int_{0}^{x}\dfrac{t^{4n+2}}{1+t^2}dt
$$
Consider $R_n(x)$ first. Since $1\leq1+t^2$, we get
$$
0\leq{R_n(x)}\leq\int_{0}^{x}t^{4n+2}dt=\frac{x^{4n+3}}{4n+3}
$$
Moreover,
$$
\frac{x^{4n+3}}{4n+3}\leq\frac{1}{4n+3}, 0\leq{x}\leq1
$$
By the [squeeze theorem](https://zh.wikipedia.org/wiki/%E5%A4%BE%E6%93%A0%E5%AE%9A%E7%90%86) (also known as the sandwich theorem), $\frac{1}{4n+3}\rightarrow0$ as $n\rightarrow\infty$, so we arrive at
$$
\int_{0}^{x}\dfrac{1}{1+t^2}dt = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$
and since $\frac{d}{dx}\arctan(x)=\frac{1}{1+x^2}$, the left-hand side is $\arctan(x)$:
$$
\arctan(x) = x-\frac{x^3}{3}+\frac{x^5}{5}-\frac{x^7}{7}+\ ...
$$
Substituting $x = 1$ then yields $\frac{\pi}{4}$.
A corresponding implementation:
```cpp
double compute_pi_leibniz(size_t N)
{
double pi = 0.0;
for (size_t i = 0; i < N; i++) {
double tmp = (i & 1) ? (-1) : 1;
pi += tmp / (2 * i + 1);
}
return pi * 4.0;
}
```
Comparing the single-threaded, multithreaded, and SIMD versions:
![](https://i.imgur.com/x4gz1oE.png)
The [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula), published in 1995 and abbreviated as the BBP formula after the surnames of its three authors, originally reads:
$\pi=\displaystyle\sum_{k=0}^{\infty}\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$
The BBP formula departs from typical π algorithms: it can compute the $n$-th (hexadecimal) digit of π directly, without computing the preceding $n - 1$ digits first, which makes it well suited to approximating π with parallel computation.
[Concurrent and multithreaded programming](https://hackmd.io/@sysprog/concurrency) introduces the [thread pool](https://en.wikipedia.org/wiki/Thread_pool), whose design rationale is:
1. In an environment with many threads, the overhead of creating and destroying thread objects is considerable, and frequent creation and destruction adds extra latency to highly concurrent applications
2. Since the number of effective hardware processors is limited, a thread pool regulates how many threads get dispatched onto the processors
A hospital analogy:
* Without a thread pool: the hospital faces thousands of patients every day; whenever a patient arrives, a doctor is summoned to treat them, and the doctor leaves after the visit. When visits are short, the time doctors spend coming and going dominates
* A first thread pool: the hospital opens an outpatient clinic and stations all its doctors there; patients must register and queue before being seen, and doctors treat patients in queue order, eliminating the doctors' travel time. But if patients are few and doctors many, most doctors sit idle and medical resources are wasted
* An improved thread pool: the clinic initially stations only some of the doctors and adds a coordinator (in practice, a nurse); patients still queue, and the coordinator schedules the medical resources. When patients are many and the doctors are overloaded, the coordinator calls in more doctors; when patients are few and doctors too many, the coordinator puts some doctors on standby, avoiding waste
Illustration:
![](https://i.imgur.com/GedtciF.png)
Below is a [thread pool](https://en.wikipedia.org/wiki/Thread_pool) implementation:
```cpp
#include <stddef.h>
typedef struct __tpool_future *tpool_future_t;
typedef struct __threadpool *tpool_t;
/**
* Create a thread pool containing specified number of threads.
* If successful, the thread pool is returned. Otherwise, it
* returns NULL.
*/
tpool_t tpool_create(size_t count);
/**
* Schedules the specific function to be executed.
* If successful, a future object representing the execution of
* the task is returned. Otherwise, it returns NULL.
*/
tpool_future_t tpool_apply(tpool_t pool, void *(*func)(void *), void *arg);
/**
* Wait for all pending tasks to complete before destroying the thread pool.
*/
int tpool_join(tpool_t pool);
/**
* Return the result when it becomes available.
* If @seconds is non-zero and the result does not arrive within specified time,
* NULL is returned. Each tpool_future_get() resets the timeout status on
* @future.
*/
void *tpool_future_get(tpool_future_t future, unsigned int seconds);
/**
* Destroy the future object and free resources once it is no longer used.
* It is an error to refer to a destroyed future object. Note that destroying
* a future object does not prevent a pending task from being executed.
*/
int tpool_future_destroy(tpool_future_t future);
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
enum __future_flags {
__FUTURE_RUNNING = 01,
__FUTURE_FINISHED = 02,
__FUTURE_TIMEOUT = 04,
__FUTURE_CANCELLED = 010,
__FUTURE_DESTROYED = 020,
};
typedef struct __threadtask {
void *(*func)(void *);
void *arg;
struct __tpool_future *future;
struct __threadtask *next;
} threadtask_t;
typedef struct __jobqueue {
threadtask_t *head, *tail;
pthread_cond_t cond_nonempty;
pthread_mutex_t rwlock;
} jobqueue_t;
struct __tpool_future {
int flag;
void *result;
pthread_mutex_t mutex;
pthread_cond_t cond_finished;
};
struct __threadpool {
size_t count;
pthread_t *workers;
jobqueue_t *jobqueue;
};
static struct __tpool_future *tpool_future_create(void)
{
struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
if (future) {
future->flag = 0;
future->result = NULL;
pthread_mutex_init(&future->mutex, NULL);
pthread_condattr_t attr;
pthread_condattr_init(&attr);
pthread_cond_init(&future->cond_finished, &attr);
pthread_condattr_destroy(&attr);
}
return future;
}
int tpool_future_destroy(struct __tpool_future *future)
{
if (future) {
pthread_mutex_lock(&future->mutex);
if (future->flag & __FUTURE_FINISHED ||
future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&future->mutex);
pthread_mutex_destroy(&future->mutex);
pthread_cond_destroy(&future->cond_finished);
free(future);
} else {
future->flag |= __FUTURE_DESTROYED;
pthread_mutex_unlock(&future->mutex);
}
}
return 0;
}
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
FFF;
}
pthread_mutex_unlock(&future->mutex);
return future->result;
}
static jobqueue_t *jobqueue_create(void)
{
jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
if (jobqueue) {
jobqueue->head = jobqueue->tail = NULL;
pthread_cond_init(&jobqueue->cond_nonempty, NULL);
pthread_mutex_init(&jobqueue->rwlock, NULL);
}
return jobqueue;
}
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
threadtask_t *tmp = jobqueue->head;
while (tmp) {
jobqueue->head = jobqueue->head->next;
pthread_mutex_lock(&tmp->future->mutex);
if (tmp->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&tmp->future->mutex);
pthread_mutex_destroy(&tmp->future->mutex);
pthread_cond_destroy(&tmp->future->cond_finished);
free(tmp->future);
} else {
tmp->future->flag |= __FUTURE_CANCELLED;
pthread_mutex_unlock(&tmp->future->mutex);
}
free(tmp);
tmp = jobqueue->head;
}
pthread_mutex_destroy(&jobqueue->rwlock);
pthread_cond_destroy(&jobqueue->cond_nonempty);
free(jobqueue);
}
static void __jobqueue_fetch_cleanup(void *arg)
{
pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
pthread_mutex_unlock(mutex);
}
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
GGG;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= KKK;
task->future->result = ret_value;
LLL;
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
struct __threadpool *tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
struct __tpool_future *tpool_apply(struct __threadpool *pool,
void *(*func)(void *),
void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
HHH;
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
int tpool_join(struct __threadpool *pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
Using the [Bailey–Borwein–Plouffe formula](https://en.wikipedia.org/wiki/Bailey%E2%80%93Borwein%E2%80%93Plouffe_formula) together with the thread pool above, we write a program that approximates π:
```cpp
#include <math.h>
#include <stdio.h>
#include <stdlib.h> /* for malloc()/free() */
#define PRECISION 100 /* upper bound in BBP sum */
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
```
Expected output:
```
PI calculated with 101 terms: 3.141592653589793
```
Complete the code so that the execution result matches the expected output.
==Answer area==
FFF = ?
* `(a)` `pthread_cond_wait(&future->cond_finished, &future->mutex)`
* `(b)` `pthread_cond_wait(&future->mutex, &future->cond_finished)`
* `(c)` `pthread_cond_broadcast(&future->cond_finished, &future->mutex)`
* `(d)` `return NULL`
GGG = ?
* `(a)` `while (!jobqueue->tail) /* wait */`
* `(b)` `/* no operation */`
* `(c)` `while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
* `(d)` `while (!jobqueue->tail) pthread_cond_broadcast(&jobqueue->cond_nonempty)`
HHH = ?
* `(a)` `pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)`
* `(b)` `pthread_cond_broadcast(&jobqueue->cond_nonempty)`
KKK = ?
* `(a)` `__FUTURE_RUNNING`
* `(b)` `__FUTURE_FINISHED`
* `(c)` `__FUTURE_TIMEOUT`
* `(d)` `__FUTURE_CANCELLED`
* `(e)` `__FUTURE_DESTROYED`
LLL = ?
* `(a)` `pthread_cond_broadcast(&task->future->cond_finished)`
* `(b)` `pthread_cond_wait(&task->future->cond_finished, &future->mutex)`
:::success
Extension problems:
1. Explain how the code above works, including the timeout handling mechanism; identify room for improvement and implement it
2. Study [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool), point out how it applies atomic operations, and explain its lock-free technique
3. Try rewriting the code above with C11 Atomics to achieve better scalability
:::