owned this note
owned this note
Published
Linked with GitHub
# 2021q1 Homework4 (quiz4)
contributed by < `RainbowEye0486` >
> [題目](https://hackmd.io/@sysprog/linux2021-quiz4)
## 測驗一
### 解釋上述程式碼運作原理,包含 timeout 處理機制,指出改進空間並實作
對於 leibniz 法得到 π 的程式碼解釋
```cpp=
double compute_pi_leibniz(size_t N)
{
double pi = 0.0;
for (size_t i = 0; i < N; i++) {
double tmp = (i & 1) ? (-1) : 1;
pi += tmp / (2 * i + 1);
}
return pi * 4.0;
}
```
根據 $$arctan(1)=\frac{π}{4}=1−\frac{1}{3}+\frac{1}{5}−\frac{1}{7}+ ...$$ 我們可以知道想要得到 π 可以透過等差數列和求得,其中 N 代表此數列共有幾項,當然 N 愈大得到的數字就愈接近 π 。
- **line5**: `tmp` 一開始的時候根據此項是奇數項或是偶數項來決定分子是 +1 或是 -1
- **line6**: `tmp` 除分母1、3、5、7.....,再把每一項總和,可以得到 $\frac{π}{4}$ 的近似值。
$$\sum_{k=1}^{\infty} \frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$$
:::info
題目中提到典型的圓周率演算法必須計算前面的 n-1 位才能夠計算,代表了數列每一項之間具相依性。以公式 $$arctan(1)=\frac{π}{4}=1−\frac{1}{3}+\frac{1}{5}−\frac{1}{7}+ ...$$ 以及 $$\sum_{k=1}^{\infty} \frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$$
來說,因為每一項的計算都只跟當下的 k 值有關,所以每個獨立的項都可以平行計算,但是利用公式$$\frac{\pi}{2}=\frac{2}{1}×\frac{2}{3}×\frac{4}{3}×\frac{4}{5}×\frac{6}{5}×...$$
實做的其中一種方式如下:
```cpp
double pi(size_t N)
{
double pi = 1;
double n = 1;
for(size_t j = 1;j <= N; j++, n++){
if(j & 1 == 0){
pi *= (n / (n + 1));
}else{
pi *= ((n + 1) / n);
}
}
return 2*pi;
}
```
因為算出當前項之前需要先知道前面連乘得到的值 `pi` ,所以無法對每一項做並行運算
:::
#### 程式碼解讀
在討論 thread pool 的實做之前,大致上先看 bpp function 是如何實做並行處理的計算 pi。
```cpp
#define PRECISION 100
```
- 迭代次數,決定了精度
```cpp
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
```
- arg 是一個指向 void 的指標,先將其當做指向 int 的指標的方式解讀(轉型),再用 `*` 加在前面的方式取值。
- `sum` 根據 bpp 公式計算出當前項 $\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6}$
- `product` 回傳一個 void pointer ,內容是計算過的 $\frac{1}{16^k}(\frac{4}{8k+1}-\frac{2}{8k+4}-\frac{1}{8k+5}-\frac{1}{8k+6})$ ,變數可以是其他型態的時候,我們可以先將這個變數設定為 pointer to void 的型態,所以在不確定型態的時候用 pointer to void 是比較彈性的作法。
:::warning
用 pointer to void (注意書寫方式) 不只是「比較有彈性」,而是要求 dereference 時,務必「先轉型 (cast) 再取值 (dereference)」,這樣可避免不正確的轉型而導致存取錯誤。
:notes: jserv
:::
```cpp
int main()
{
int bpp_args[PRECISION + 1];
double bpp_sum = 0;
tpool_t pool = tpool_create(4);
tpool_future_t futures[PRECISION + 1];
for (int i = 0; i <= PRECISION; i++) {
bpp_args[i] = i;
futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
}
for (int i = 0; i <= PRECISION; i++) {
double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
bpp_sum += *result;
tpool_future_destroy(futures[i]);
free(result);
}
tpool_join(pool);
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
return 0;
}
```
到了 main 用到許多 thread pool 的操作,但是我們現在還不知道有關 thread pool 的函式定義,所以先猜測做了什麼事。 `bpp_args[]` 代表當前處理的是第 "k" 項的 bbp ,接著將這個 k 跟 bpp function 一起丟入 tpool 中分配,做完的 thread 會把結果丟到 `future[]` 中,這邊猜測是誰先做完就先放,沒有順序的差別,而 `bpp_sum` 會一直累加直到 `tpool_future` 裡面已經運算完的項都已經被加上去了,這時候會等待直到新的項運算完畢或是全部項都算完。
口說無憑,回去看 thread pool function 的實做是不是跟預期一樣
```cpp
/**
* Create a thread pool containing specified number of threads.
* If successful, the thread pool is returned. Otherwise, it
* returns NULL.
*/
tpool_t tpool_create(size_t count)
{
jobqueue_t *jobqueue = jobqueue_create();
struct __threadpool *pool = malloc(sizeof(struct __threadpool));
if (!jobqueue || !pool) {
if (jobqueue)
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
pool->count = count, pool->jobqueue = jobqueue;
if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
for (int i = 0; i < count; i++) {
if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
(void *) jobqueue)) {
for (int j = 0; j < i; j++)
pthread_cancel(pool->workers[j]);
for (int j = 0; j < i; j++)
pthread_join(pool->workers[j], NULL);
free(pool->workers);
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
}
return pool;
}
jobqueue_destroy(jobqueue);
free(pool);
return NULL;
}
```
- 利用 `jobqueue_create` 創建一個新的 jobqueue ,並建立新的 threadpool
- 如果建立失敗則刪除 `jobqueue` 、 `pool` ,回傳 NULL
- 在 pool 中總共會有 `count` 個 pthread ,這邊把它叫做 workers
- `pthread_create` : 在當前的 process 中開啟一個新的 thread ,這邊我們在 attr 參數給了 NULL ,表示使用預設的方式初始化,並且我們將 `jobqueue_fetch` 當作 start_routine ,表示 thread 一啟動就會執行這個函式。
- `pthread_cancel` : 對 thread 發出取消的信號,thead 的回應基於 `state` 和 `type` 參數對他的控制
- `pthread_join` : 等待 thread 中止後才會回傳,回傳0成功/ error number 失敗
- 如果 `pthread_create` 建立成功,回傳0,任何一次的 pthread 建立失敗都會取消掉所有 thread 並回傳 NULL
```cpp
/**
* Schedules the specific function to be executed.
* If successful, a future object representing the execution of
* the task is returned. Otherwise, it returns NULL.
*/
tpool_future_t tpool_apply(tpool_t pool, void *(*func)(void *), void *arg)
{
jobqueue_t *jobqueue = pool->jobqueue;
threadtask_t *new_head = malloc(sizeof(threadtask_t));
struct __tpool_future *future = tpool_future_create();
if (new_head && future) {
new_head->func = func, new_head->arg = arg, new_head->future = future;
pthread_mutex_lock(&jobqueue->rwlock);
if (jobqueue->head) {
new_head->next = jobqueue->head;
jobqueue->head = new_head;
} else {
jobqueue->head = jobqueue->tail = new_head;
HHH;
}
pthread_mutex_unlock(&jobqueue->rwlock);
} else if (new_head) {
free(new_head);
return NULL;
} else if (future) {
tpool_future_destroy(future);
return NULL;
}
return future;
}
```
- 傳入 pool, \*(\*func)(void \*) **(宣告一個指標指向一個輸入參數型態是 void * ,回傳型態也是 void * 的函式)**、 `*arg`
- 如果 `new_head` ( threadtask_t ) `future` ( \_\_tpool_future ) 建立成功,`new_head` 設定好參數(函式名字、當下的 k 值以及 future )後用 mutex lock 進入 `jobqueqe` 的 critical section
- 將這個初始化後的 `new_head` 加入 `jobqueqe` 中
- ==**HHH = pthread_cond_broadcast(&jobqueue->cond_nonempty);**== : 如果當下的 `jobqueqe` 是空的,除了要把 head 、 tail 都設置成 `new_head` 之外,還要跟大家宣告現在 jobqueue 已經不是空的,此時設置成功不該回傳 NULL ,也不是等候的狀況
```cpp
/**
* Wait for all pending tasks to complete before destroying the thread pool.
*/
int tpool_join(tpool_t pool)
{
size_t num_threads = pool->count;
for (int i = 0; i < num_threads; i++)
tpool_apply(pool, NULL, NULL);
for (int i = 0; i < num_threads; i++)
pthread_join(pool->workers[i], NULL);
free(pool->workers);
jobqueue_destroy(pool->jobqueue);
free(pool);
return 0;
}
```
- 以程式碼為例,這邊的 pool 一共建立 4 條 thread
- 先將 thread 執行的工作變成 NULL
- 等待 thread 中止後 free 掉 thread 、 pool 跟 jobqueue
```cpp
/**
* Return the result when it becomes available.
* If @seconds is non-zero and the result does not arrive within specified time,
* NULL is returned. Each tpool_future_get() resets the timeout status on
* @future.
*/
void *tpool_future_get(tpool_future_t future, unsigned int seconds)
{
pthread_mutex_lock(&future->mutex);
/* turn off the timeout bit set previously */
future->flag &= ~__FUTURE_TIMEOUT;
while ((future->flag & __FUTURE_FINISHED) == 0) {
if (seconds) {
struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
if (status == ETIMEDOUT) {
future->flag |= __FUTURE_TIMEOUT;
pthread_mutex_unlock(&future->mutex);
return NULL;
}
} else
FFF;
}
```
- 傳入 future 指標以及一個表示超時限制的秒數 `second`
- 一開始先將 flag 中 `__FUTURE_TIMEOUT` 相關的 set bit (bit 2) 透過遮罩設置成 0 ,flag 中的其他 bit 保留原樣,代表清除之前的 timeout 狀態。
- 可以注意到在 main 的實做中,我們傳入的 second 是 0,這代表著程式只會執行 else block 的 ==**FFF=pthread_cond_wait(&future->cond_finished, &future->mutex);**== 這段而已,並不會執行 if(seconds) 裡面的內容,換句話說,無論如何我們都要等到 result 計算出來後才會回傳,不會用到 timeout 機制
- [clock_gettime](https://linux.die.net/man/3/clock_gettime) : 根據 [linux man page](https://linux.die.net/man/3/clock_gettime) ,傳入兩個參數— `clockid_t clk_id` , `struct timespec *tp` ,前者根據不同的條件設定 Epoch ,可以解釋為開始計算時間的參考點,這邊使用的是 `CLOCK_MONOTONIC` ,起始時間點不能夠被確定(概念類似相對參考點),但是針對系統不連續的時間跳動(通常是管理員為了同步之類的目的調快/慢時間)不會影響
> CLOCK_REALTIME
> System-wide clock that measures real (i.e., wall-clock) time.
> Setting this clock requires appropriate privileges. This clock is affected by discontinuous jumps in the system time (e.g., if the system administrator manually changes the clock), and by the incremental adjustments performed by adjtime(3) and NTP.
>
> CLOCK_MONOTONIC
> Clock that cannot be set and represents monotonic time since some unspecified starting point. This clock is not affected by discontinuous jumps in the system time (e.g., if the system administrator manually changes the clock), but is affected by the incremental adjustments performed by adjtime(3) and NTP.
- `expire_time` : 屬於 [time.h](https://man7.org/linux/man-pages/man0/time.h.0p.html) 中的 `struct timespec` ,可以紀錄秒數或是奈秒兩種數量級,在這裡用來表示能夠接受的 deadline ,加上 `seconds` 後代表從 `gettime` 之後過了 `seconds` 秒後過期。
- [pthread_cond_timedwait](https://linux.die.net/man/3/pthread_cond_timedwait) / [pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait) : pthread_cond_timedwait 有三個輸入參數,分別是 `pthread_cond_t *restrict cond` 、 `pthread_mutex_t *restrict mutex` 、 `const struct timespec *restrict abstime` ,根據 man page :
> The **pthread_cond_timedwait()** function shall be equivalent to **pthread_cond_wait()**, except that an error is returned if the absolute time specified by abstime passes (that is, system time equals or exceeds abstime) before the condition cond is signaled or broadcasted, or if the absolute time specified by abstime has already been passed at the time of the call.
我們可以知道兩者的差別是時間的限制,如果 system time 已經到了 `expire_time` 的時間,而 cond 狀態仍沒有完成,就回傳 error `ETIMEDOUT`
```cpp
static void *jobqueue_fetch(void *queue)
{
jobqueue_t *jobqueue = (jobqueue_t *) queue;
threadtask_t *task;
int old_state;
pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);
while (1) {
pthread_mutex_lock(&jobqueue->rwlock);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
pthread_testcancel();
GGG;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
if (jobqueue->head == jobqueue->tail) {
task = jobqueue->tail;
jobqueue->head = jobqueue->tail = NULL;
} else {
threadtask_t *tmp;
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
task = tmp->next;
tmp->next = NULL;
jobqueue->tail = tmp;
}
pthread_mutex_unlock(&jobqueue->rwlock);
if (task->func) {
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_CANCELLED) {
pthread_mutex_unlock(&task->future->mutex);
free(task);
continue;
} else {
task->future->flag |= __FUTURE_RUNNING;
pthread_mutex_unlock(&task->future->mutex);
}
void *ret_value = task->func(task->arg);
pthread_mutex_lock(&task->future->mutex);
if (task->future->flag & __FUTURE_DESTROYED) {
pthread_mutex_unlock(&task->future->mutex);
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
} else {
task->future->flag |= KKK;
task->future->result = ret_value;
LLL;
pthread_mutex_unlock(&task->future->mutex);
}
free(task);
} else {
pthread_mutex_destroy(&task->future->mutex);
pthread_cond_destroy(&task->future->cond_finished);
free(task->future);
free(task);
break;
}
}
pthread_cleanup_pop(0);
pthread_exit(NULL);
}
```
- 從 jobqueue 尾端抓取一個 task
- [pthread_cleanup_push](https://man7.org/linux/man-pages/man3/adjtime.3.html) : 將指定的 routine push 進去 thread 的 clean-up handlers stack 中處理當 thread 取消或是中止的時候,並以 `arg` 作為傳入參數,當被 pop 出來的時候,以下三種情形決定是否執行 handler:
1. thread cancelled
2. thread 呼叫 `pthread_exit` 中止的時候,但是不適用於主動執行 return 的情形
3. 如果被 pop 出來的時候傳入參數 `int execute` 不為零,則直接執行。
- 鎖上 mutex lock
- [pthread_setcancelstate](https://man7.org/linux/man-pages/man3/pthread_setcancelstate.3.html) : 第一個傳入參數 `state` 決定當前 thread 可不可以被取消(設定為可以),並且把舊的狀態儲存在 buffer `oldstate` 中。
- [pthread_testcancel](https://man7.org/linux/man-pages/man3/pthread_testcancel.3.html) : 在 thread 中創建一個取消點,原本沒有取消點的 thread 便可以被取消,此函式必定執行成功。
- ==**GGG = while (!jobqueue->tail)
pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);**== ,在 jobqueue 空的情況下,會先等待新的 task 被塞進來,才會執行接下來的動作
- 確定要執行任務後,先讓 pthread 不能夠被取消,因為這樣事情做到一半可能出現非預期結果。
- 如果 `head` == `tail` ,代表 jobqueue 只剩下最後一個 task , pop 後 jobqueue empty ,其他情形從 tail 取出
- 對 task 狀態進行處理,將 `flag` 設置為 RUNNING 的狀態,然後執行 `func(task->arg)` ,並將結果儲存進 `re_value` 中
- 如果 task->future 的狀態不是已經被銷毀,表示計算出來的結果是可以被儲存的,因此根據 flag 的性質,==**KKK = __FUTURE_FINISHED**== ,表示回傳值已經計算完畢並儲存,隨時可以將裡面的值取走; ==**LLL = pthread_cond_broadcast(&task->future->cond_finished);**== 則對應到 `tpool_future_get` 中的
```cpp
status = pthread_cond_timedwait(&future->cond_finished,
&future->mutex, &expire_time);
```
告訴 `tpool_future_get` 這個阻塞的 thread 已經可以取出計算完的值了
:::info
最後,按照順序釐清一下整段程式碼的運作原理。
1. 建立一個 thread pool ,裡面有四條執行緒跟一個 jobqueue ,每條執行緒做的事情都一樣,就是從 jobqueue 的尾端抓一個 task 出來做
2. `tpool_apply` 做了兩件事, push 一個 task 進 jobqueue 跟回傳儲存 future
3. 兩個情形下 thread 會阻塞等待條件滿足,第一個是如果 jobqueue 裡面還沒有 task 的話等待,以及如果當前 future 的值還沒有算出來的話,也是等待
:::
編譯的時候注意根據 pthread 跟 `pow` 的使用需要額外 link
```shell
$ gcc -pthread -o bbp bbp.c -lm
```
才能編譯過
#### bpp 乘法化簡
原先程式碼:
```cpp
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
```
公式中的乘法皆可改為左移的操作
```cpp
static void *bpp(void *arg)
{
int k = *(int *) arg;
double sum = (4.0 / ((k << 3)+ 1)) - (2.0 / ((k << 3) + 4)) -
(1.0 / ((k << 3) + 5)) - (1.0 / ((k << 3) + 6));
double *product = malloc(sizeof(double));
if (product)
*product = 1 / pow(16, k) * sum;
return (void *) product;
}
```
#### jobqueue pop task 的改進
從 `jobqueue_fetch` pop task 的方式
```cpp
for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
tmp = tmp->next)
;
```
可以發現,如果整個 jobqueue 超級長,可能會讓這個 for 迴圈跑很久,因此利用 doubly-linked list 的特性,在 `threadtask_t` 結構中加上 `*prev` ,並且替換掉 for 迴圈
```cpp
tmp = jobqueue->tail->prev;
```
#### 改變 `tpool_future_get` 的等待方式
目前的實做的等待方式是從 future[0] 開始等待,算出答案之後累加才繼續看下一個 future 有沒有做完,但是實際上我們並不知道哪個 future 的值先做好了可以拿,做實驗驗證發現 future 完成順序
```
0 1 4 2 5 8 9 10 11 12 13 14 15 16 7 6 3 17 18 19 20 23 24 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
42 43 25 21 22 44 47 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
...
...
...
99 100 48 87 46
```
可以發現雖然我們是按照順序將 bbp 的每一項 push 進 jobqueue 中,但是執行卻不完全按照順序,甚至有些項完成的時間非常晚 (如 k=46) 假如我們想要讓精度提高,設定 `PRECISION=500` ,則發生第 387 項最後才完成,導致後面的 100 多項都必須等待。
```
496 497 498 499 500 479 476 387
```
因此,考慮利用一個 readqueue 來決定讀取順序,原本利用 `tpool_future_get` 來取得當前這個 future 裡面的值,能不能取用取決於
```cpp
pthread_cond_wait(&future->cond_finished, &future->mutex);
```
阻塞 thread 直到條件滿足,也就是 future 完成
我的想法是只要4條 pthread 的其中一個將 future 值算出來後,就獲取 readqueue 的 mutex lock 並且將這個 future push 進去,讀取的時候一樣從尾端抓取 future 裡面的 `value` 並累加,好處是我們永遠可以從已經算好的
嘗試實做得到錯誤:
```shell
free?Fetch task : 499
free?Fetch task : 500
free?Fetch task : 393
free?Fetch task : 419
free?Fetch task : 495
double free or corruption (out)
```
遇到了重複釋放記憶體空間的問題。
利用 Valgrind 分析記憶體配置情形後,發現在原本 jobqueue 計算完 future value 後會把 task 刪掉,但是由於我在 readqueue 會再刪掉一次,所以造成了雙重釋放,令一個原因是由於多線程並行的關係,的確有可能 jobqueue 想要刪掉 task 時,獲取到的只是 task 的指標,但是內容已經被 readqueue free 掉了
:::info
解決方式: jobqueue 這邊先不釋放記憶體,等到 readqueue 讀取完畢後一併釋放
:::
接著,遇到另一個問題。有時候程式在執行到一半的時候會直接卡住,但是偶爾能夠全部執行完畢,錯誤訊息是
```
__lll_lock_wait()
```
原因是獲取 lock 的時候失敗了,查詢 [pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)](https://linux.die.net/man/3/pthread_cond_wait) 函式的定義中 :
> The pthread_cond_timedwait() and pthread_cond_wait() functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results.
>
> These functions atomically release mutex and cause the calling thread to block on the condition variable cond; atomically here means "atomically with respect to access by another thread to the mutex and then the condition variable". That is, if another thread is able to acquire the mutex after the about-to-block thread has released it, then a subsequent call to pthread_cond_broadcast() or pthread_cond_signal() in that thread shall behave as if it were issued after the about-to-block thread has blocked.
`pthread_cond_wait()` 本身有兩個傳入參數:
1. 條件變數,也就是說當前的 thread 會持續阻塞直到其他 thread 調用 broadcast 或是 signal 告訴 `pthread_cond_wait()` 不用再等待了
2. 傳入的互斥鎖,調用 `pthread_cond_wait()` 會讓原本已經獲得的互斥鎖被釋放給其他 thread 使用,直到條件變數被滿足後才會重新嘗試取得該互斥鎖
檢查自己寫的原始碼後,發現是因為對 function 的理解錯誤而在重寫的時候把順序變成了
```cpp
while (!readqueue->tail){
pthread_cond_wait(&readqueue->cond_nonempty, &readqueue->rwlock);
}
pthread_mutex_lock(&readqueue->rwlock);
```
因此發生 deadlock ,原本順序應該是
```cpp
pthread_mutex_lock(&readqueue->rwlock);/*獲得讀寫互斥鎖*/
while (!readqueue->tail){/*如果沒有進迴圈( queue 不為空),則仍然保有互斥鎖*/
pthread_cond_wait(&readqueue->cond_nonempty, &readqueue->rwlock);
/*需要等待新的 task 進來,此時釋放互斥鎖,並等待條件變數更新,更新後重新獲得互斥鎖*/
...
/*從 queue 尾端抓取新的 task */
}
```
:::info
由此可見,對於多執行緒和並行的程式碼,不能單純只是用中斷點或是印出訊息來測試,因為執行順序的不同導致排除問題變難,應了解原理才能分析出問題所在
:::
#### 減少記憶體開銷
實做過的新方式不需要用到 `tpool_future` 中的 `pthread_cond_t cond_finished` ,原本這個條件變量是要在讀取 future 時判定內部計算的值是否計算完成,如果完成才會加到 result 中,否則等待直到 thread 計算完畢並 broadcast ,實做的程式碼確認這個 task 執行完畢,也就是內部的值已經計算完成後才加入 readqueue ,因此等到主執行緒讀取時 future->value 已經計算完成
#### 效能分析
==**經由 Valgrind 分析記憶體容量變化**==
設定
```
PRECISION = 5000
```
原本程式碼 :
![](https://i.imgur.com/RcG4RTn.png)
實做後的程式碼 :
![](https://i.imgur.com/Omc2eMu.png)
節省 22% 記憶體消耗
在 `tool_future_create()` 花費的記憶體容量降低,原因是移除 `pthread_cond_t cond_finished` , `tpool_apply()` 增加少量記憶體,原因是在結構 `threadtask_t` 中新增一個紀錄上一個 task 的指標,且總記憶體用量是下降的
:::info
**補充**
後來查了一下為甚麼可以省下如此多的記憶體,因為一個指標的大小也就只有8 byte 而已,而互斥鎖系列的 `mutex_cond_t` 卻足足用了 48 byte !這其實也很合理,因為在實際實作上的互斥鎖十分複雜,內部不僅有個特化過的 spinlock ,還有自己的 wait queue 跟休眠機制,難怪如此佔記憶體空間。
:::
==**利用 perf 分析效能**==
```shell
$ sudo perf stat ./bbpModefy
PI calculated with 101 terms: 3.141592653589793
Performance counter stats for './bbpModefy':
1.41 msec task-clock # 0.771 CPUs utilized
10 context-switches # 0.007 M/sec
0 cpu-migrations # 0.000 K/sec
101 page-faults # 0.072 M/sec
<not supported> cycles
<not supported> instructions
<not supported> branches
<not supported> branch-misses
0.001822187 seconds time elapsed
0.001663000 seconds user
0.000000000 seconds sys
$ sudo perf stat ./bbp
PI calculated with 101 terms: 3.141592653589793
Performance counter stats for './bbp':
1.72 msec task-clock # 0.805 CPUs utilized
192 context-switches # 0.112 M/sec
0 cpu-migrations # 0.000 K/sec
98 page-faults # 0.057 M/sec
<not supported> cycles
<not supported> instructions
<not supported> branches
<not supported> branch-misses
0.002136373 seconds time elapsed
0.000000000 seconds user
0.001949000 seconds sys
```
執行速度約快14.7%,其中
:::success
**TODO**
1. 弄清楚 `product` 為何要先用 `malloc` 分配記憶體而 `sum` 卻不用
2. 為何函式名字是 bpp 而不是 bbp 呢?
3. 使用 gnplot 視覺化呈現
:::
> 函式名稱是筆誤,請提交 pull request
> :notes: jserv
---
### 研讀 [atomic_threadpool](https://github.com/Taymindis/atomic_threadpool),指出其 atomic 操作的運用方式,並說明該 lock-free 的手法
### 嘗試使用 C11 Atomics 改寫上述程式碼,使其有更好的 scalability
###### tags: `linux2021`