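# 2017q1 Homework4 (phonebook-concurrent)
contributed by <`Chihsiang`>

[Assignment description](https://hackmd.io/s/rkOExTOoe)
Reference: [experiment notes](https://hackmd.io/s/BkN1JNQp)

>> Remember that the link you post in the homework area should be the short link you get after pressing "Publish"!
>> [name=Course TA][color=red]

## Program analysis

* mmap + text_align

Judging from the motivation analyzed in the shared notes, the goal of this approach is to speed up reading the file: every string is padded to the same length, the file is mapped into memory, and the data structure is built by stepping through the mapping at fixed offsets.

  - text align

```c=
text_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
off_t file_size = fsize(ALIGN_FILE);
```

The file is opened with ==O_NONBLOCK==. Note, however, that on Linux this flag has no effect on regular files (see `open(2)`), so it does not make reading the aligned file non-blocking.

  - mmap

```c=
char *map = mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
```

Following [CheHsuan](https://hackmd.io/s/rJPYFYRa): mmap maps the file's contents into a range of virtual memory addresses, so the file can be read and modified through ordinary memory accesses. Compared with traditional read/write, this improves I/O efficiency by avoiding the overhead of copying the data into a user-space buffer.

With the ==MAP_PRIVATE== flag, modifications made through the mapping are never written back to the original file; they only affect a private (copy-on-write) copy of the mapped pages.

* memory pool

```c=
entry_pool = (entry *) malloc(sizeof(entry) * file_size / MAX_LAST_NAME_SIZE);
```

This allocates, up front, enough memory for one `entry` per record (`file_size / MAX_LAST_NAME_SIZE` records, since every record has been padded to the same length); the data structure is then built from this pool.

  - Open questions:
    - Is it worth spending extra resources on padding the strings during text align?

* pthread
  - pthread_setconcurrency

```c=
pthread_setconcurrency(THREAD_NUM + 1);
```

`pthread_setconcurrency` only gives the implementation a hint about the desired concurrency level; on Linux (NPTL, a 1:1 threading model) it has no practical effect.

  - Main program

```c=
// main.c#101
pthread_setconcurrency(THREAD_NUM + 1);

for (int i = 0; i < THREAD_NUM; i++)
    // Created by malloc, remember to free them.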
    thread_args[i] = createThread_arg(map + MAX_LAST_NAME_SIZE * i, map + file_size, i,
                                      THREAD_NUM, entry_pool + i);

/* Deliver the jobs to all threads and wait for them to complete */
for (int i = 0; i < THREAD_NUM; i++)
    pthread_create(&threads[i], NULL, append, thread_args[i]);

for (int i = 0; i < THREAD_NUM; i++)
    pthread_join(threads[i], NULL);

/* Connect the linked lists of all threads */
for (int i = 0; i < THREAD_NUM; i++) {
    if (i == 0) {
        pHead = thread_args[i]->lEntry_head->pNext;
    } else {
        e->pNext = thread_args[i]->lEntry_head->pNext;
    }
    e = thread_args[i]->lEntry_tail;
}
```

```c=
// phonebook_opt.c
/* Thread entry point: pthread_create() expects a void *(*)(void *) */
void *append(void *arg)
{
    thread_arg *t_arg = (thread_arg *) arg;
    int count = 0;

    /* Start at this thread's first record and its own pool slot, then skip
     * over the records handled by the other threads */
    entry *j = t_arg->lEntryPool_begin;
    for (char *i = t_arg->data_begin; i < t_arg->data_end;
         i += MAX_LAST_NAME_SIZE * t_arg->numOfThread,
         j += t_arg->numOfThread, count++) {
        /* Append the new entry at the end of the local linked list */
        t_arg->lEntry_tail->pNext = j;
        t_arg->lEntry_tail = t_arg->lEntry_tail->pNext;

        t_arg->lEntry_tail->lastName = i;
        t_arg->lEntry_tail->pNext = NULL;
        t_arg->lEntry_tail->dtl = NULL;
    }
    return NULL;
}
```

After studying it for a while, I started from `pthread_create`: the `append` work is split across the specified number of threads. Thread `i` starts at record `i` of the mapping (`map + MAX_LAST_NAME_SIZE * i`) and at pool slot `entry_pool + i`, and on every step both advance by `THREAD_NUM` records, so the threads handle interleaved, non-overlapping records and need no locking. `pthread_join` then waits for every thread to finish before the main thread continues, and the last and most important part links the per-thread lists together into one list.

## Planned modifications

* Thread pool definition & declaration

Conceptually based on [Wiki](https://en.wikipedia.org/wiki/Thread_pool) and the [C thread pool notes](http://swind.code-life.info/posts/c-thread-pool.html).

Roughly, the steps are:

1.
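   Design and define the thread pool structure

   ```c=
   /* Error codes and flags used by the implementation below.
    * Assumed definitions: the implementation uses these names without defining them. */
   typedef enum {
       threadpool_invalid        = -1,
       threadpool_lock_failure   = -2,
       threadpool_queue_full     = -3,
       threadpool_shutdown       = -4,
       threadpool_thread_failure = -5
   } threadpool_error_t;

   typedef enum { threadpool_graceful = 1 } threadpool_destroy_flags_t;
   typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t;

   typedef struct _THREAD_POOL_ threadpool_t;

   struct _THREAD_POOL_ {
       pthread_t *threads;
       threadpool_task_t *queue;
       pthread_mutex_t lock;
       pthread_cond_t notify;
       int thread_count;
       int queue_size;
       int head;
       int tail;
       int shutdown;
       int started;
       int count;
   };
   ```

   Fields:
   - threads: array holding every created thread
   - queue: the task queue (an array used as a circular buffer)
   - lock: mutex protecting the thread pool
   - notify: condition variable used to wake up worker threads
   - thread_count: total number of threads
   - queue_size: capacity of the task queue
   - head: index of the first (oldest) task
   - tail: index where the next task will be stored
   - count: number of pending tasks
   - started: number of started (still running) worker threads
   - shutdown: flag indicating whether the pool is shutting down

2. Build the task queue structure

   ```c=
   typedef struct _THREAD_TASK_ {
       void (*func)(void *);
       void *argument;
   } threadpool_task_t;
   ```

   Fields:
   - `void (*func)(void *)`: pointer to the function to execute (function pointer)
   - `void *argument`: pointer to the function's argument (multiple arguments are packed into a struct)

3. Create the thread pool (parameters: number of threads, queue size, and flags)

   ```c=
   threadpool_t *threadpool_init(int thread_count, int queue_size, int flags);
   ```

4. Add a task to the queue

   ```c=
   int threadpool_add_task(threadpool_t *pool, void (*routine)(void *), void *arg, int flags);
   ```

5.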
   Destroy the thread pool

   ```c=
   int threadpool_destroy(threadpool_t *pool, int flags);
   ```

* Thread pool implementation
  - thread init

```c=
int threadpool_free(threadpool_t *pool);
int threadpool_destroy(threadpool_t *pool, int flags);
static void *threadpool_thread(void *thread_pool);

threadpool_t *threadpool_init(int thread_count, int queue_size, int flags)
{
    (void) flags; /* currently unused */
    assert("threadpool_init()" && thread_count > 0 && queue_size > 0);

    /* #0 malloc pool */
    threadpool_t *pool = (threadpool_t *) malloc(sizeof(threadpool_t));
    if (pool == NULL)
        return NULL;

    /* #1 Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;

    /* #2 Allocate thread and task queue */
    pool->threads = (pthread_t *) malloc(sizeof(pthread_t) * thread_count);
    pool->queue = (threadpool_task_t *) malloc(sizeof(threadpool_task_t) * queue_size);
    if (pool->threads == NULL || pool->queue == NULL) {
        free(pool->threads);
        free(pool->queue);
        free(pool);
        return NULL;
    }

    /* #3 Initialize mutex and conditional variable first */
    pthread_mutex_init(&(pool->lock), NULL);
    pthread_cond_init(&(pool->notify), NULL);

    /* #4 Start worker threads */
    for (int i = 0; i < thread_count; i++) {
        if (pthread_create(&(pool->threads[i]), NULL,
                           threadpool_thread, (void *) pool) != 0) {
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
}
```

Step-by-step explanation:
- #0 Allocate the `threadpool_t` itself
- #1 Initialize the fields of the thread pool struct
- #2 Allocate the thread array and the task queue (returning `NULL` if either allocation fails)
- #3 Initialize the ==mutex== and the ==condition variable==
- #4 Create the pthreads, store them in `pool->threads`, and record how many were created in `thread_count` and `started`

  - thread add task

```c=
int threadpool_add_task(threadpool_t *pool, void (*func)(void *), void *arg, int flags)
{
    int err = 0;
    int next;
    (void) flags; /* currently unused */

    /* Check arguments: pool & function */
    if (pool == NULL || func == NULL)
        return threadpool_invalid;

    /* Make sure we get the lock */
    if (pthread_mutex_lock(&(pool->lock)) != 0)
        return threadpool_lock_failure;

    next = (pool->tail + 1) % pool->queue_size;

    /* do { ... } while (0) lets every error path break out to the single unlock */
    do {
        /* Is the task queue full?
         */
        if (pool->count == pool->queue_size) {
            err = threadpool_queue_full;
            break;
        }
        /* Are we shutting down? */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }
        /* Add the task to the queue */
        pool->queue[pool->tail].func = func;
        pool->queue[pool->tail].argument = arg;
        pool->tail = next;
        pool->count += 1;

        /* Wake up one waiting worker */
        if (pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while (0);

    if (pthread_mutex_unlock(&(pool->lock)) != 0)
        err = threadpool_lock_failure;

    return err;
}
```

  - thread destroy

```c=
int threadpool_destroy(threadpool_t *pool, int flags)
{
    int err = 0;

    if (pool == NULL)
        return threadpool_invalid;

    if (pthread_mutex_lock(&(pool->lock)) != 0)
        return threadpool_lock_failure;

    do {
        /* Already shutting down: release the lock before bailing out */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            pthread_mutex_unlock(&(pool->lock));
            break;
        }

        pool->shutdown = (flags & threadpool_graceful) ?
                         graceful_shutdown : immediate_shutdown;

        /* Wake up all worker threads, and always release the lock */
        int bc = pthread_cond_broadcast(&(pool->notify));
        if (pthread_mutex_unlock(&(pool->lock)) != 0 || bc != 0) {
            err = threadpool_lock_failure;
            break;
        }

        /* Join all worker threads */
        for (int i = 0; i < pool->thread_count; i++) {
            if (pthread_join(pool->threads[i], NULL) != 0)
                err = threadpool_thread_failure;
        }
    } while (0);

    /* Only if everything went well do we deallocate the pool */
    if (!err)
        threadpool_free(pool);

    return err;
}
```

  - threadpool free

```c=
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL || pool->started > 0)
        return -1;

    /* Did we manage to allocate?
     */
    if (pool->threads) {
        free(pool->threads);
        free(pool->queue);
        /* Destroying a locked mutex is undefined behavior, so do not lock it first */
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }
    free(pool);
    return 0;
}
```

  - The most important part: the function that every worker thread executes

Each worker waits on `notify` until a task is queued or a shutdown is requested, takes the task at `head` while holding the lock, and runs it after releasing the lock so that other workers can dequeue in parallel.

```c=
static void *threadpool_thread(void *thread_pool)
{
    threadpool_t *pool = (threadpool_t *) thread_pool;
    threadpool_task_t task;

    for (;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        if ((pool->shutdown == immediate_shutdown) ||
            ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) {
            break; /* leave the loop still holding the lock */
        }

        /* Grab our task */
        task.func = pool->queue[pool->head].func;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));

        /* Get to work */
        (*(task.func))(task.argument);
    }

    pool->started--;
    pthread_mutex_unlock(&(pool->lock));
    return NULL;
}
```
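To check the ==MAP_PRIVATE== behavior described in the program analysis, here is a small sketch. The function name `map_private_is_isolated` and the temporary-file setup are hypothetical, not part of the phonebook code. It maps a file with the same `PROT_READ | PROT_WRITE` and `MAP_PRIVATE` arguments used in `main.c`, writes through the mapping, and then reads the file itself back with `pread`; assuming a POSIX system with a writable `/tmp`, the file should still hold the original bytes.

```c=
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

/* Returns 1 if a write through a MAP_PRIVATE mapping stays out of the file,
 * 0 if the file changed, -1 on any setup error. */
int map_private_is_isolated(void)
{
    static const char text[] = "hello";
    const size_t len = sizeof(text) - 1;

    char path[] = "/tmp/map_private_XXXXXX"; /* hypothetical temporary file */
    int fd = mkstemp(path);
    if (fd < 0)
        return -1;
    unlink(path); /* the file is removed once fd is closed */

    if (write(fd, text, len) != (ssize_t) len) {
        close(fd);
        return -1;
    }

    /* Same protection and flags as in main.c */
    char *map = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (map == MAP_FAILED) {
        close(fd);
        return -1;
    }
    map[0] = 'J'; /* triggers copy-on-write: only our private page changes */
    int mapping_changed = (map[0] == 'J');

    char buf[sizeof(text)] = {0};
    ssize_t n = pread(fd, buf, len, 0); /* read the file itself, not the mapping */
    munmap(map, len);
    close(fd);
    if (n != (ssize_t) len)
        return -1;

    return mapping_changed && strcmp(buf, text) == 0;
}
```

The function should return 1: the mapping sees `"Jello"` while the file still contains `"hello"`. On Linux, switching the flag to `MAP_SHARED` would instead make the write visible through `pread`.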
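The interleaved split performed by `append` and the list concatenation in `main.c` can be sketched in isolation as follows. Assumptions: `REC_SIZE` and `NTHREADS` are hypothetical stand-ins for `MAX_LAST_NAME_SIZE` and `THREAD_NUM`, `node` is a trimmed-down `entry`, and the records live in an in-memory buffer instead of an `mmap`ed file. Record indices are used instead of pointer strides so that no pointer is ever advanced past the end of the buffer.

```c=
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define REC_SIZE 16 /* hypothetical stand-in for MAX_LAST_NAME_SIZE */
#define NTHREADS 4  /* hypothetical stand-in for THREAD_NUM */

/* Trimmed-down version of the phonebook entry */
typedef struct node {
    const char *lastName; /* points into the record buffer, as in append() */
    struct node *pNext;
} node;

typedef struct {
    const char *buf; /* fixed-width records, REC_SIZE bytes each */
    size_t nrec;     /* number of records */
    size_t first;    /* index of this thread's first record */
    node *pool;      /* preallocated nodes, one per record */
    node head;       /* dummy head of this thread's local list */
    node *tail;      /* last node of the local list */
} worker_arg;

/* Thread `first` handles records first, first + NTHREADS, first + 2*NTHREADS, ...
 * and uses pool slot r for record r, so no two threads touch the same memory. */
static void *build_local_list(void *p)
{
    worker_arg *a = p;
    a->head.pNext = NULL;
    a->tail = &a->head;
    for (size_t r = a->first; r < a->nrec; r += NTHREADS) {
        node *slot = &a->pool[r];
        slot->lastName = a->buf + r * REC_SIZE;
        slot->pNext = NULL;
        a->tail->pNext = slot;
        a->tail = slot;
    }
    return NULL;
}

/* Builds the per-thread lists in parallel, then links them in thread order
 * the way main.c does. Returns the head of the combined list. */
node *build_list(const char *buf, size_t nrec, node *pool)
{
    assert(buf != NULL && pool != NULL);
    pthread_t tid[NTHREADS];
    worker_arg args[NTHREADS];
    int created[NTHREADS];

    for (int i = 0; i < NTHREADS; i++) {
        args[i] = (worker_arg){ .buf = buf, .nrec = nrec, .first = (size_t) i, .pool = pool };
        created[i] = pthread_create(&tid[i], NULL, build_local_list, &args[i]) == 0;
        if (!created[i])
            build_local_list(&args[i]); /* fall back to doing this share inline */
    }
    for (int i = 0; i < NTHREADS; i++)
        if (created[i])
            pthread_join(tid[i], NULL);

    node *head = NULL, *tail = NULL;
    for (int i = 0; i < NTHREADS; i++) {
        node *first = args[i].head.pNext;
        if (first == NULL)
            continue; /* this thread received no records */
        if (head == NULL)
            head = first;
        else
            tail->pNext = first;
        tail = args[i].tail;
    }
    return head;
}
```

With 10 records and 4 threads, thread 0 takes records 0, 4, 8, thread 1 takes 1, 5, 9, and so on, so the combined list visits the records in the order 0, 4, 8, 1, 5, 9, 2, 6, 3, 7 rather than in file order. The sketch also skips threads that received no records, which can happen when there are fewer records than threads.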
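The thread pool design walked through above can be condensed into a compact, self-contained sketch. It keeps the same ideas (a fixed-size circular task queue, one mutex, one condition variable, and a worker loop that re-checks its condition after every wakeup) but, as a simplifying assumption, supports only graceful shutdown and reports errors as `-1` instead of the `threadpool_*` error codes. All names here (`pool_t`, `pool_create`, `pool_add`, `pool_destroy`, `worker`, `increment`) are hypothetical.

```c=
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct { void (*func)(void *); void *arg; } task_t;

typedef struct {
    pthread_mutex_t lock;  /* protects every field below */
    pthread_cond_t notify; /* signalled when a task arrives or on shutdown */
    pthread_t *threads;
    task_t *queue;         /* circular buffer with queue_size slots */
    int nthreads, queue_size;
    int head, tail, count; /* next task to run, next free slot, pending tasks */
    int shutdown;          /* 1 = run the queued tasks, then exit */
} pool_t;

static void *worker(void *p)
{
    pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->count == 0 && !pool->shutdown) /* re-check: spurious wakeups */
            pthread_cond_wait(&pool->notify, &pool->lock);
        if (pool->count == 0) { /* empty queue here means we are shutting down */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        task_t t = pool->queue[pool->head];
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count--;
        pthread_mutex_unlock(&pool->lock);
        t.func(t.arg); /* run the task without holding the lock */
    }
}

/* Graceful shutdown: wake all workers, let them drain the queue, then free. */
void pool_destroy(pool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 1;
    pthread_cond_broadcast(&pool->notify);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->nthreads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->notify);
    free(pool->threads); free(pool->queue); free(pool);
}

pool_t *pool_create(int nthreads, int queue_size)
{
    assert(nthreads > 0 && queue_size > 0);
    pool_t *pool = calloc(1, sizeof(*pool)); /* zeroes indices, counters, flag */
    if (pool == NULL)
        return NULL;
    pool->threads = malloc(sizeof(pthread_t) * nthreads);
    pool->queue = malloc(sizeof(task_t) * queue_size);
    if (pool->threads == NULL || pool->queue == NULL) {
        free(pool->threads); free(pool->queue); free(pool);
        return NULL;
    }
    pool->queue_size = queue_size;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->notify, NULL);
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&pool->threads[i], NULL, worker, pool) != 0) {
            pool_destroy(pool); /* joins the workers started so far */
            return NULL;
        }
        pool->nthreads++;
    }
    return pool;
}

/* Returns 0 on success, -1 if the queue is full or the pool is shutting down. */
int pool_add(pool_t *pool, void (*func)(void *), void *arg)
{
    int err = -1;
    pthread_mutex_lock(&pool->lock);
    if (pool->count < pool->queue_size && !pool->shutdown) {
        pool->queue[pool->tail] = (task_t){ func, arg };
        pool->tail = (pool->tail + 1) % pool->queue_size;
        pool->count++;
        pthread_cond_signal(&pool->notify); /* wake one idle worker */
        err = 0;
    }
    pthread_mutex_unlock(&pool->lock);
    return err;
}

static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static void increment(void *arg) /* example task: bump a shared int */
{
    pthread_mutex_lock(&counter_lock);
    ++*(int *) arg;
    pthread_mutex_unlock(&counter_lock);
}
```

For example, creating a pool with 4 workers and a 128-slot queue, submitting 100 `increment` tasks that share one `int` counter, and then calling `pool_destroy` leaves the counter at 100, because graceful shutdown lets the workers drain the queue before they exit and `pthread_join` makes their updates visible to the caller.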