# 2021q1 Homework4 (quiz4)
contributed by < [Julian-Chu](https://github.com/Julian-Chu) >

###### tags: `linux2021`

> [GitHub](https://github.com/Julian-Chu/linux2021-quiz4)

## How the code works

### struct threadpool

Four main objects are defined: the thread pool, the job queue, tasks, and futures.

![](https://i.imgur.com/ZQQ6pcF.png)

### thread pool: the outermost object

It holds the job queue and the workers (pthreads).

```cpp
struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};
```

#### Creating the pool

Creating the pool also creates the job queue and the workers.

```cpp
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            /* pthread_create() returns 0 on success, so the if body is
             * skipped. On failure it returns a nonzero error code, and
             * every pthread created so far is cancelled and joined. */
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
```

The point to watch in the code above is the use of `pthread_cancel`: it merely issues a cancellation request, and `pthread_join` is still required to make sure the cancellation has actually completed.

`man pthread_cancel(3)`:
> After a canceled thread has terminated, a join with that thread using pthread_join(3) obtains PTHREAD_CANCELED as the thread's exit status. (Joining with a thread is the only way to know that cancellation has completed.)

`man pthread_cancel(3)` also spells out the clean-up steps a pthread performs once a cancellation request is acted on:
> When a cancellation request is acted on, the following steps occur for thread (in this order):
> 1. Cancellation clean-up handlers are popped (in the reverse of the order in which they were pushed) and called. (See pthread_cleanup_push(3).)
> 2. Thread-specific data destructors are called, in an unspecified order. (See pthread_key_create(3).)
> 3. The thread is terminated. (See pthread_exit(3).)
>
> The above steps happen asynchronously with respect to the pthread_cancel() call; the return status of pthread_cancel() merely informs the caller whether the cancellation request was successfully queued.

The clean-up handlers from step 1, registered with `pthread_cleanup_push`, can be used to release resources held inside the pthread, e.g. for a graceful shutdown.
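As a minimal sketch of that idea (the `worker` function and its buffer below are hypothetical, not part of quiz4): a cleanup handler guarantees a per-thread allocation is released whether the thread is cancelled, calls `pthread_exit`, or pops the handler explicitly.

```cpp
#include <pthread.h>
#include <stdlib.h>

static void free_buffer(void *arg)
{
    free(arg); /* runs on cancellation, pthread_exit(), or explicit pop */
}

static void *worker(void *arg)
{
    char *buf = malloc(4096);
    if (!buf)
        return NULL;
    pthread_cleanup_push(free_buffer, buf);
    /* ... long-running work containing cancellation points ... */
    pthread_cleanup_pop(1); /* pop the handler and run free_buffer(buf) */
    return NULL;
}
```

Note that `pthread_cleanup_push` and `pthread_cleanup_pop` may be implemented as macros expanding to a matching `{`/`}` pair (they are on glibc), so the two calls must appear in the same lexical scope.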
#### Submitting a task to the pool

```cpp
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    /* Success: put the task into the job queue. */
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            /* The job queue was empty: after inserting the task, use
             * pthread_cond_broadcast to wake the workers blocked on the
             * empty job queue so they resume working. */
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
        /* Creating the task or the future failed: release whatever was
         * allocated and return NULL. */
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
```

Pay attention to `pthread_cond_broadcast(&jobqueue->cond_nonempty)`:
> These functions shall unblock threads blocked on a condition variable.
> The pthread_cond_broadcast() function shall unblock all threads currently blocked on the specified condition variable cond.

Note `&jobqueue->cond_nonempty`: in `jobqueue_fetch`, this condition variable is what blocks the `while (1)` loop, so the loop does not spin and burn CPU while there is no pending task. As soon as a task is enqueued here, `jobqueue_fetch` is notified that it can start fetching tasks (a minimal sketch of this wait/notify pattern follows after the `tpool_join` discussion below).

#### Waiting for all pthreads to finish and releasing all resources

```cpp
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
```

Note `tpool_apply(pool, NULL, NULL)`: a task with a NULL function pointer is submitted as the last task for every pthread, so each worker reaches the `jobqueue_fetch` block below, breaks out of its while loop, and the worker (`jobqueue_fetch`) terminates.

```cpp
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
```

As before, mind the use of `pthread_join` to make sure each pthread has terminated:
> After a successful call to pthread_join(), the caller is guaranteed that the target thread has terminated. The caller may then choose to do any clean-up that is required after termination of the thread (e.g., freeing memory or other resources that were allocated to the target thread).
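Returning to the `cond_nonempty` pattern above, here is a minimal, self-contained sketch of the wait/notify idiom (the job queue is reduced to a counter; all names are illustrative only):

```cpp
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
static int items = 0; /* stand-in for the job queue length */

/* Consumer side: the predicate is re-checked in a loop because
 * pthread_cond_wait() may wake spuriously, and because another
 * woken thread may have consumed the item first. */
void consume(void)
{
    pthread_mutex_lock(&lock);
    while (items == 0)
        pthread_cond_wait(&nonempty, &lock);
    items--;
    pthread_mutex_unlock(&lock);
}

/* Producer side: like quiz4, notify only on the empty -> non-empty
 * transition, while holding the mutex. */
void produce(void)
{
    pthread_mutex_lock(&lock);
    if (items++ == 0)
        pthread_cond_broadcast(&nonempty);
    pthread_mutex_unlock(&lock);
}
```

Since each `tpool_apply` call makes exactly one task available, waking a single waiter with `pthread_cond_signal` would arguably suffice; `pthread_cond_broadcast` wakes every blocked worker, and all but one of them will find the queue empty again and go back to sleep.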
### job queue

Think of it as a FIFO to-do list: it manages the pending tasks and the locking needed under concurrency.

```cpp
typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;
```

#### Creating the job queue

```cpp
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}
```

Mind the initialization steps for the condition variable and the mutex, and whether a special mutex type is needed.

#### destroy job queue

```cpp
static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
```

#### fetch job from job queue

The main work function: every worker runs it to obtain a task and compute the result.

```cpp
/* Used together with pthread_cleanup_push/pthread_cleanup_pop to make
 * sure the lock is released when the pthread is cancelled or terminated,
 * so no deadlock is left behind. */
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}

static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED;
                task->future->result = ret_value;
                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
```

Points to note:
- fetching, executing, and destroying a task
- taking a task out of the job queue requires the job queue lock, so no task is ever fetched twice
- use of the task's (future's) lock
- use of the cleanup handler
- pthread_exit
- pthread_setcancelstate
- transitions of the future's state

`pthread_setcancelstate`
> The pthread_setcancelstate() sets the cancelability state of the calling thread to the value given in state. The previous cancelability state of the thread is returned in the buffer pointed to by oldstate. The state argument must have one of the following values:
>
> PTHREAD_CANCEL_ENABLE: The thread is cancelable. This is the default cancelability state in all new threads.
>
> PTHREAD_CANCEL_DISABLE: The thread is not cancelable. If a cancellation request is received, it is blocked until cancelability is enabled.
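A stripped-down sketch of the cancellation window used in `jobqueue_fetch` (illustrative only, not quiz4 code): cancellation is enabled only around an explicit cancellation point, so the worker can never be cancelled in the middle of mutating shared state. In `jobqueue_fetch` the enabled window also covers `pthread_cond_wait`, itself a cancellation point that re-acquires the mutex before cancellation proceeds, which is exactly why the cleanup handler that unlocks `rwlock` is needed.

```cpp
#include <pthread.h>

void *cancellable_loop(void *arg)
{
    int old_state;
    while (1) {
        /* Window in which a pending pthread_cancel() may take effect. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel(); /* explicit cancellation point */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);

        /* Work done here cannot be interrupted by cancellation, so no
         * half-updated shared state is ever left behind. */
    }
    return NULL;
}
```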
### task

The unit of work dispatched to a worker: the function to execute and its argument, the future that will carry the result, and a pointer to the next task.

```cpp
typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;
```

### future

Tracks the execution state and the return value of the corresponding task.

```cpp
struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};
```

```cpp
static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_cond_init(&future->cond_finished, &attr);
        pthread_condattr_destroy(&attr);
    }
    return future;
}
```

Mind the initialization of the condition variable and the mutex here: `attr` is initialized but no clock is ever set on it, while `tpool_future_get` below builds its deadline with `CLOCK_MONOTONIC`. Without `pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)`, `pthread_cond_timedwait` interprets the absolute time against the default `CLOCK_REALTIME`, so the timeout can fire at the wrong moment.

```cpp
int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}
```

```cpp
void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }

    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
```

The `if (seconds)` branch is not exercised by this example; the design presumably targets long-running or IO-bound jobs. In the `main.c` example, `double *result = tpool_future_get(futures[i], 0 /* blocking wait */);` blocks until the future's result is ready. To actually exploit the timeout, one could keep the futures in an extra queue, poll them with a timeout, and put the ones that time out back onto the queue for a later pass (a sketch follows `main.c` below).

#### `main.c`

```cpp
#include <math.h>
#include <stdio.h>
#include "threadpool.h"

#define PRECISION 100 /* upper bound in BPP sum */

/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}

int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }
    tpool_join(pool);

    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}
```
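A sketch of that timeout-driven design (everything here is hypothetical and not part of quiz4: the `pending` array, `collect_all`, and the 1-second timeout are made up for illustration):

```cpp
#include <stdlib.h>
#include "threadpool.h"

/* Poll every outstanding future with a short timeout; futures that
 * time out are simply revisited on the next pass. */
void collect_all(tpool_future_t *pending, int n)
{
    int remaining = n;
    while (remaining > 0) {
        for (int i = 0; i < n; i++) {
            if (!pending[i])
                continue; /* already collected */
            double *result = tpool_future_get(pending[i], 1 /* 1 s timeout */);
            if (!result)
                continue; /* timed out: try again on the next pass */
            /* ... consume *result ... */
            free(result);
            tpool_future_destroy(pending[i]);
            pending[i] = NULL;
            remaining--;
        }
    }
}
```

A robust version would still need a way to tell a timeout apart from a task whose function legitimately returned NULL, e.g. by exposing the `__FUTURE_TIMEOUT` flag through the API.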
## Improvement

### Setting up a test environment

A test environment was set up for the original implementation: run the computation 1000 times and record the elapsed time of each run.

![](https://i.imgur.com/ycc67I5.png)

Individual samples vary too much, so applying the outlier treatment learned in fibdrv: measure 100 runs, drop the outliers, take the mean, and repeat this 1000 times.

![](https://i.imgur.com/YTeSR2N.png)

Test code (two fixes over the first draft: `sd` is a `double` so the `sqrt` result is not truncated, and the outlier filter uses `||`, since the original `&&` condition could never be true):

```cpp==
#define NANOSEC 1000000000LL /* ns per second */

static double run(tpool_t pool)
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }
    return bpp_sum;
}

int main()
{
    const int size = 1000;
    const int size_sampling = 100;
    struct timespec t_start, t_end;
    long long elapsed_time[size];
    double bpp[size];
    tpool_t pool = tpool_create(4);

    for (int i = 0; i < size; i++) {
        elapsed_time[i] = 0;
        long long elapsed_time_tmp[size_sampling];
        for (int j = 0; j < size_sampling; j++) {
            clock_gettime(CLOCK_MONOTONIC, &t_start);
            bpp[i] = run(pool);
            clock_gettime(CLOCK_MONOTONIC, &t_end);
            elapsed_time_tmp[j] = (t_end.tv_sec * NANOSEC + t_end.tv_nsec) -
                                  (t_start.tv_sec * NANOSEC + t_start.tv_nsec);
        }

        long long sum = 0;
        for (int k = 0; k < size_sampling; k++)
            sum += elapsed_time_tmp[k];

        long long mean = sum / size_sampling;
        double sd = 0.0;
        for (int k = 0; k < size_sampling; k++)
            sd += (elapsed_time_tmp[k] - mean) * (elapsed_time_tmp[k] - mean);
        sd = sqrt(sd / (size_sampling - 1));

        int count = 0;
        for (int k = 0; k < size_sampling; k++) {
            /* drop samples outside mean ± 2 * sd */
            if (elapsed_time_tmp[k] < (mean - 2 * sd) ||
                elapsed_time_tmp[k] > (mean + 2 * sd))
                continue;
            elapsed_time[i] += elapsed_time_tmp[k];
            count++;
        }
        elapsed_time[i] = elapsed_time[i] / count;
    }
    tpool_join(pool);

    for (int i = 0; i < size; i++)
        printf("%d %lld %.15f\n", i, elapsed_time[i], bpp[i]);
    return 0;
}
```

### Implementation

- [ ] Improving job fetch: lines 6-7 of the `jobqueue_fetch` excerpt below use an *O(n)* traversal to locate the tail and its predecessor. This can be fixed together with a change to lines 5-6 of `tpool_apply`.

`jobqueue_fetch`
```cpp==
if (jobqueue->head == jobqueue->tail) {
    task = jobqueue->tail;
    jobqueue->head = jobqueue->tail = NULL;
} else {
    threadtask_t *tmp;
    for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
         tmp = tmp->next)
        ;
    task = tmp->next;
    tmp->next = NULL;
    jobqueue->tail = tmp;
}
```

`tpool_apply`
```cpp==
if (new_head && future) {
    new_head->func = func, new_head->arg = arg, new_head->future = future;
    pthread_mutex_lock(&jobqueue->rwlock);
    if (jobqueue->head) {
        new_head->next = jobqueue->head;
        jobqueue->head = new_head;
    } else {
        jobqueue->head = jobqueue->tail = new_head;
        pthread_cond_broadcast(&jobqueue->cond_nonempty);
    }
    pthread_mutex_unlock(&jobqueue->rwlock);
```

Let `tpool_apply` append new tasks at `jobqueue->tail`, and let the fetch side take tasks directly from `jobqueue->head`; both insertion and fetching then become *O(1)*.

`jobqueue_fetch`
```cpp==
if (jobqueue->head == jobqueue->tail) {
    task = jobqueue->tail;
    jobqueue->head = jobqueue->tail = NULL;
} else {
    task = jobqueue->head;
    jobqueue->head = jobqueue->head->next;
}
```

`tpool_apply`
```cpp==
if (jobqueue->head) {
    jobqueue->tail->next = new_tail;
    jobqueue->tail = jobqueue->tail->next;
} else {
    jobqueue->head = jobqueue->tail = new_tail;
    pthread_cond_broadcast(&jobqueue->cond_nonempty);
}
```

[GitHub](https://github.com/Julian-Chu/linux2021-quiz4/tree/feature/replace-enumerating-jobqueue)

Slightly better than the original version; with more tasks the gap should widen.

![](https://i.imgur.com/MB5rmqD.png)

- [ ] Looking at where the lock sits, all workers contend for the same job queue lock. Each worker could instead be given its own job queue, with tasks placed into the matching queue by consistent hashing, eliminating the workers' contention on a shared lock. The caveat is that workers may process tasks at different speeds, so a worker can go idle after draining its own queue; an extra mechanism is needed to rebalance tasks or let an idle worker fetch tasks from other job queues (a sketch of such a stealing path follows below).

![](https://i.imgur.com/BNl9KmL.png)

[GitHub](https://github.com/Julian-Chu/linux2021-quiz4/tree/feature/multi-jobqueue)
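A sketch of the stealing path under this per-worker-queue design (the `sharded_pool_t` layout and the `try_dequeue` helper are hypothetical, made up for illustration): an idle worker first drains its own queue, then probes the other queues once each before deciding to block or yield.

```cpp
/* Hypothetical layout: one locked job queue per worker. */
typedef struct {
    jobqueue_t **queues; /* one queue per worker */
    size_t nqueues;
} sharded_pool_t;

/* Assumed helper: lock the queue, pop one task or return NULL. */
threadtask_t *try_dequeue(jobqueue_t *q);

/* Fetch from our own queue first; if it is empty, probe every other
 * queue once before the caller decides to block or yield. */
threadtask_t *fetch_or_steal(sharded_pool_t *p, size_t self)
{
    threadtask_t *task = try_dequeue(p->queues[self]);
    if (task)
        return task;
    for (size_t i = 1; i < p->nqueues; i++) {
        size_t victim = (self + i) % p->nqueues;
        if ((task = try_dequeue(p->queues[victim])))
            return task; /* stolen from a busier worker */
    }
    return NULL; /* nothing anywhere */
}
```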
Contrary to expectations, performance went down instead. A guess: with more locks, the CPU spent checking them exceeds the cost of contending for a single lock, or the implementation itself is flawed (to be confirmed).

![](https://i.imgur.com/yoACYZ7.png)

## Studying atomic_threadpool

*The Windows-specific parts of the implementation are ignored in this study.*

### thread pool

- The object design is similar to quiz4's, but there is no future to carry results; more of the operations are encapsulated inside `lfqueue`, the task is decoupled from lfqueue, and the atomic operations live mainly in `lfqueue`.
- The synchronization differs from quiz4 (mainly in lfqueue): CAS loops are used instead of a mutex lock.

:::warning
Question: is a CAS loop a spin lock? (See the sketch after the `sched_yield` notes below.)
:::

```cpp=
typedef struct {
    void (*do_work)(void *);
    void *args;
} at_thtask_t;

struct at_thpool_s {
    pthread_t *threads;
    lfqueue_t taskqueue;
    size_t nrunning;
    int is_running;
};
```

#### create

```cpp
at_thpool_t * at_thpool_create(size_t nthreads)
{
    size_t i;
    if (nthreads > MAX_THREADS) {
        fprintf(stderr,
                "The nthreads is > %d, over max thread will affect the "
                "system scalability, it might not scale well\n",
                MAX_THREADS);
    }

    at_thpool_t *tp;
    tp = (at_thpool_t *) AT_THPOOL_MALLOC(sizeof(at_thpool_t));
    if (tp == NULL) {
        AT_THPOOL_ERROR("malloc");
        return NULL;
    }
    tp->threads = (pthread_t *) AT_THPOOL_MALLOC(sizeof(pthread_t) * nthreads);
    tp->nrunning = 0;
    if (tp->threads == NULL) {
        AT_THPOOL_ERROR("malloc");
        AT_THPOOL_FREE(tp);
        return NULL;
    }
    if (lfqueue_init(&tp->taskqueue) < 0) {
        AT_THPOOL_ERROR("malloc");
        AT_THPOOL_FREE(tp->threads);
        AT_THPOOL_FREE(tp);
        return NULL;
    }
    tp->is_running = 1;

    for (i = 0; i < nthreads; i++) {
        if (pthread_create(&(tp->threads[i]), NULL, at_thpool_worker,
                           (void *) tp)) {
            if (i != 0) {
                fprintf(stderr, "maximum thread has reached %zu \n", i);
                break;
            } else {
                AT_THPOOL_ERROR("Failed to establish thread pool");
                at_thpool_immediate_shutdown(tp);
            }
        }
        pthread_detach(tp->threads[i]);
    }
    return tp;
}
```

#### worker

```cpp=
void *at_thpool_worker(void *_tp)
{
    at_thpool_t *tp = (at_thpool_t *) _tp;
    AT_THPOOL_INC(&tp->nrunning);

    at_thtask_t *task;
    void *_task;
    lfqueue_t *tq = &tp->taskqueue;

TASK_PENDING:
    while (tp->is_running) {
        if ((_task = lfqueue_deq(tq))) {
            goto HANDLE_TASK;
        }
        lfqueue_sleep(1);
    }
    AT_THPOOL_DEC(&tp->nrunning);
    return NULL;

HANDLE_TASK:
    task = (at_thtask_t *) _task;
    task->do_work(task->args);
    AT_THPOOL_FREE(task);
    goto TASK_PENDING;
}
```

Note the `TASK_PENDING` block: it behaves much like a spin lock, a loop combined with atomic operations (inside `lfqueue_deq`) and a very short sleep, replacing quiz4's mutex lock and condition variable.

#### gracefully_shutdown

```cpp
void at_thpool_gracefully_shutdown(at_thpool_t *tp)
{
    tp->is_running = 0;
    int i, ncycle = DEF_SPIN_LOCK_CYCLE;
    for (;;) {
        for (i = 0; i < ncycle; i++) {
            if (tp->nrunning == 0) {
                goto SHUTDOWN;
            }
        }
        AT_THPOOL_SHEDYIELD();
    }
SHUTDOWN:
    lfqueue_destroy(&tp->taskqueue);
    lfqueue_sleep(100);
    AT_THPOOL_FREE(tp->threads);
    AT_THPOOL_FREE(tp);
}

#define DEF_SPIN_LOCK_CYCLE 2048
#define AT_THPOOL_SHEDYIELD sched_yield
```

Note how graceful shutdown is implemented: it spins through `ncycle` loop iterations in a spin-lock-like fashion, and if workers are still running after that, it calls `AT_THPOOL_SHEDYIELD` so the current thread relinquishes the CPU and other threads can run. Think of it as spinning over short waits to avoid a context switch, while still releasing the CPU over long waits, balancing resource usage across the two situations.

`man sched_yield(2)`
> sched_yield() causes the calling thread to relinquish the CPU. The thread is moved to the end of the queue for its static priority and a new thread gets to run.
>
> Note:
> If the calling thread is the only thread in the highest priority list at that time, it will continue to run after a call to sched_yield().
>
> POSIX systems on which sched_yield() is available define _POSIX_PRIORITY_SCHEDULING in <unistd.h>.
>
> Strategic calls to sched_yield() can improve performance by giving other threads or processes a chance to run when (heavily) contended resources (e.g., mutexes) have been released by the caller. Avoid calling sched_yield() unnecessarily or inappropriately (e.g., when resources needed by other schedulable threads are still held by the caller), since doing so will result in unnecessary context switches, which will degrade system performance.
>
> sched_yield() is intended for use with real-time scheduling policies (i.e., SCHED_FIFO or SCHED_RR). Use of sched_yield() with nondeterministic scheduling policies such as SCHED_OTHER is unspecified and very likely means your application design is broken.
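Back to the question raised above (is a CAS loop a spin lock?): the two idioms look alike but differ in what a retry means. A spin lock uses CAS to acquire exclusive ownership, and every other thread makes no progress until the holder releases it; in a lock-free CAS loop there is no owner, a failed CAS means another thread already completed its operation, and the retry restarts from the new state. A minimal sketch (illustrative only) with the same GCC `__sync` builtins the library uses:

```cpp
/* Spin lock: CAS used to take ownership; contenders busy-wait. */
static volatile int lock_word = 0;

void spin_lock(void)
{
    while (!__sync_bool_compare_and_swap(&lock_word, 0, 1))
        ; /* burn CPU until the holder releases */
}

void spin_unlock(void)
{
    __sync_lock_release(&lock_word); /* store 0 with release semantics */
}

/* Lock-free CAS loop: no ownership; a failed CAS means some other
 * thread succeeded, so the retry starts from the *new* state. */
void lockfree_increment(volatile long *counter)
{
    long old;
    do {
        old = *counter;
    } while (!__sync_bool_compare_and_swap(counter, old, old + 1));
}
```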
### lfqueue (lock-free queue)

```cpp=
typedef struct {
    lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
    volatile size_t size;
    volatile lfq_bool_t in_free_mode;
    lfqueue_malloc_fn _malloc;
    lfqueue_free_fn _free;
    void *pl;
} lfqueue_t;
```

Noteworthy design points:
- Because nodes are freed inside lfqueue itself, the extra `root_free` and `move_free` pointers hold nodes that still need to be released after a dequeue.
- A dummy head (base) is installed at initialization to avoid a NULL head; on dequeue, `head->next` is CASed in as the new dummy head, so what is actually dequeued each time is `head->next`, not `head`.
- `_malloc`, `_free`, and `pl` can be customized for extra requirements, adding flexibility.

:::warning
Question: `volatile` keeps the compiler from optimizing the accesses and from reading a stale value during CAS, but why do the `lfqueue_cas_node_t` pointers (head/tail/etc.) not need `volatile`?
:::

```cpp=
struct lfqueue_cas_node_s {
    void *value;
    struct lfqueue_cas_node_s *next, *nextfree;
    lfq_time_t _deactivate_tm;
};
```

Note `_deactivate_tm`: during CAS, two nodes may compare equal even though they are logically different (the ABA problem). The `_deactivate_tm` timestamp helps distinguish nodes from different points in time at CAS time, and `__lfq_check_free` uses it to decide whether a node may be freed.

```cpp
#define __LFQ_SYNC_MEMORY __sync_synchronize
```

> __sync_synchronize (...)
> This built-in function issues a full memory barrier.

`enqueue`
```cpp=
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch

int lfqueue_enq(lfqueue_t *lfqueue, void *value)
{
    if (_enqueue(lfqueue, value)) {
        return -1;
    }
    __LFQ_ADD_AND_FETCH(&lfqueue->size, 1);
    return 0;
}

static int _enqueue(lfqueue_t *lfqueue, void *value)
{
    lfqueue_cas_node_t *tail, *node;
    node = (lfqueue_cas_node_t *) lfqueue->_malloc(lfqueue->pl,
                                                   sizeof(lfqueue_cas_node_t));
    if (node == NULL) {
        perror("malloc");
        return errno;
    }
    node->value = value;
    node->next = NULL;
    node->nextfree = NULL;
    for (;;) {
        __LFQ_SYNC_MEMORY();
        tail = lfqueue->tail;
        if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
            // compulsory swap as tail->next is no NULL anymore,
            // it has fenced on other thread
            __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
            __lfq_check_free(lfqueue);
            return 0;
        }
    }
    /* It never be here */
    return -1;
}
```

`dequeue`
```cpp=
void *lfqueue_deq(lfqueue_t *lfqueue)
{
    void *v;
    if (//__LFQ_ADD_AND_FETCH(&lfqueue->size, 0) &&
        (v = _dequeue(lfqueue))) {
        __LFQ_FETCH_AND_ADD(&lfqueue->size, -1);
        return v;
    }
    return NULL;
}

static void *_dequeue(lfqueue_t *lfqueue)
{
    lfqueue_cas_node_t *head, *next;
    void *val;

    for (;;) {
        head = lfqueue->head;
        if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) {
            next = head->next;
            if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) {
                if (next == NULL) {
                    val = NULL;
                    goto _done;
                }
            } else {
                if (next) {
                    val = next->value;
                    if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head,
                                                    next)) {
                        break;
                    }
                } else {
                    val = NULL;
                    goto _done;
                }
            }
        }
    }

    __lfq_recycle_free(lfqueue, head);
_done:
    // __asm volatile("" ::: "memory");
    __LFQ_SYNC_MEMORY();
    __lfq_check_free(lfqueue);
    return val;
}
```

Both enqueue and dequeue are implemented as CAS loops, paired with `__LFQ_SYNC_MEMORY()` to keep compiler optimization or stale reads from feeding incorrect data into the CAS decision.
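To make the ABA discussion concrete, here is the textbook hazard in a minimal Treiber-stack pop (a sketch, not lfqueue code): between reading the head and the CAS, another thread may pop the node, free it, and push a recycled allocation that lands at the same address; the CAS then succeeds against a head that compares equal but is logically different, and `next` is stale. Deferring reclamation through `root_free`/`move_free` and the `_deactivate_tm` grace period is lfqueue's way of avoiding exactly this immediate-free scenario.

```cpp
#include <stdlib.h>

struct node {
    void *value;
    struct node *next;
};

static struct node *top; /* shared stack head */

void *pop(void)
{
    struct node *old, *next;
    do {
        old = top;
        if (!old)
            return NULL;
        next = old->next; /* may become stale before the CAS below */
        /* ABA hazard: if another thread pops `old`, frees it, and a
         * recycled allocation at the same address becomes the new top,
         * this CAS still succeeds and installs garbage as the head. */
    } while (!__sync_bool_compare_and_swap(&top, old, next));
    void *v = old->value;
    free(old); /* immediate free is exactly what makes ABA possible */
    return v;
}
```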
```cpp
static void __lfq_recycle_free(lfqueue_t *lfqueue,
                               lfqueue_cas_node_t *freenode)
{
    lfqueue_cas_node_t *freed;
    do {
        freed = lfqueue->move_free;
    } while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode));

    lfq_get_curr_time(&freenode->_deactivate_tm);

    __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode);
}

static void __lfq_check_free(lfqueue_t *lfqueue)
{
    lfq_time_t curr_time;
    if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) {
        lfq_get_curr_time(&curr_time);
        lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree;
        while (rtfree && (rtfree != lfqueue->move_free)) {
            nextfree = rtfree->nextfree;
            if (lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) {
                //	printf("%p\n", rtfree);
                lfqueue->_free(lfqueue->pl, rtfree);
                rtfree = nextfree;
            } else {
                break;
            }
        }
        lfqueue->root_free = rtfree;
        __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0);
    }
    __LFQ_SYNC_MEMORY();
}
```

:::warning
Question: about `if (lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2)` in the code above, why this expiry window? Would freeing immediately cause problems, or is it a performance consideration?
:::

### lock free

In atomic_threadpool, CAS loops let each thread (worker or main thread) decide for itself when to enqueue or dequeue, with no shared mutex lock or condition variable. Nothing ever blocks, which guarantees that at any moment some thread is making progress.

:::warning
Question: how to judge when a spin lock is appropriate? Spinning for a long time wastes CPU; how should the trade-off be struck? (A back-off sketch follows.)
:::
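One common answer is bounded spinning with progressive back-off, essentially what `at_thpool_gracefully_shutdown` above does by hand: spin briefly (cheap when the wait is short), then yield, and finally sleep so a long wait stops burning a core. A sketch of the pattern (the thresholds are arbitrary tuning knobs, not measured values):

```cpp
#include <sched.h>
#include <time.h>

/* Wait until *flag becomes nonzero, escalating from spinning to
 * yielding to sleeping as the wait grows longer. */
void wait_flag(volatile int *flag)
{
    for (int spins = 0; !*flag; spins++) {
        if (spins < 1024) {
            continue;             /* pure spin: no syscall, lowest latency */
        } else if (spins < 8192) {
            sched_yield();        /* let other runnable threads in */
        } else {
            struct timespec ts = {0, 1000000}; /* 1 ms */
            nanosleep(&ts, NULL); /* long wait: stop burning the core */
        }
    }
}
```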