# 2017q1 Homework4 (phonebook-concurrent)
contributed by < `Chihsiang`>
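[Assignment description](https://hackmd.io/s/rkOExTOoe)
Reference: [shared lab notes](https://hackmd.io/s/BkN1JNQp)
>>Remember that the link posted in the homework area must be the short link produced after clicking "Publish"!
>>[name=Course TA][color=red]
## Program analysis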
* mmap + text_align
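According to the motivation given in the shared notes, this approach aims to speed up file reads: every string is padded to the same length, the file is mapped into memory, and entries are built by stepping through the mapping with fixed offsets.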
- text align
```c=
text_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
off_t file_size = fsize(ALIGN_FILE);
```
The file is opened with ==O_NONBLOCK==.
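To make the padding idea concrete, here is a minimal sketch of fixed-width records. `pad_record`, `record_at`, and `RECORD_SIZE` are hypothetical names introduced for illustration, and the width of 16 is an assumption; this is not the repository's `text_align` implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define RECORD_SIZE 16 /* assumed stand-in for MAX_LAST_NAME_SIZE */

/* Copy `name` into a fixed-width record, padding the rest with '\0'.
 * Returns 0 on success, -1 if the name does not fit. */
int pad_record(const char *name, char out[RECORD_SIZE])
{
    size_t len = strlen(name);
    if (len >= RECORD_SIZE)
        return -1;
    memset(out, 0, RECORD_SIZE);
    memcpy(out, name, len);
    return 0;
}

/* With fixed-width records, the k-th name starts at a computable offset,
 * so no scanning for line breaks is needed. */
const char *record_at(const char *base, size_t k)
{
    return base + k * RECORD_SIZE;
}
```

Once every record has the same width, any thread can jump straight to record `k` with pointer arithmetic, which is what makes the later per-thread split possible.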
- mmap
```c=
char *map;
map = mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
```
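Reference: [CheHsuan](https://hackmd.io/s/rJPYFYRa)
mmap maps the file contents into a range of virtual memory addresses, so the file can be accessed with ordinary memory operations. Compared with traditional read / write, mmap reduces the overhead of copying data into an intermediate buffer, which improves I/O efficiency.
As for the ==MAP_PRIVATE== flag: modifications made through the mapping are not written to the original file and are never carried back to it; they only affect a private copy-on-write copy of the mapped pages.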
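The MAP_PRIVATE behaviour can be checked with a small experiment. The sketch below is an illustration only: the helper name `private_mapping_leaves_file_intact` and the temporary file path are assumptions, not part of the homework code. It writes a file, modifies it through a private mapping, and reads the file back.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

/* Write `text` to a temp file, modify it through a MAP_PRIVATE mapping,
 * then re-read the file. Returns 1 if the file is unchanged while the
 * mapping shows the modification, 0 otherwise, -1 on error. */
int private_mapping_leaves_file_intact(const char *text)
{
    char path[] = "/tmp/map_private_XXXXXX"; /* hypothetical location */
    size_t len = strlen(text);
    int fd = mkstemp(path);
    if (fd < 0)
        return -1;
    unlink(path); /* the file lives on until fd is closed */
    if (len == 0 || write(fd, text, len) != (ssize_t) len) {
        close(fd);
        return -1;
    }

    char *map = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (map == MAP_FAILED) {
        close(fd);
        return -1;
    }
    map[0] = '#'; /* copy-on-write: only the private page changes */

    char *back = malloc(len);
    int result = -1;
    if (back && pread(fd, back, len, 0) == (ssize_t) len)
        result = (memcmp(back, text, len) == 0 && map[0] == '#');
    free(back);
    munmap(map, len);
    close(fd);
    return result;
}
```

This is also why the homework code can request `PROT_WRITE` on a descriptor opened with `O_RDONLY`: a private mapping only needs read access to the file, because writes never reach it.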
* memory pool
```c=
entry_pool = (entry *)malloc(sizeof(entry) *
                             file_size / MAX_LAST_NAME_SIZE);
```
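This allocates, in a single call, enough memory to hold every entry, for use when building the data structure.
- A few open questions:
- Is it worth spending extra resources on padding during text align?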
* pthread
- pthread_setconcurrency
```c=
pthread_setconcurrency(THREAD_NUM + 1);
```
- Main program
```c=
//main.c#101
pthread_setconcurrency(THREAD_NUM + 1);
for (int i = 0; i < THREAD_NUM; i++)
    // Created by malloc, remember to free them.
    thread_args[i] = createThread_arg(map + MAX_LAST_NAME_SIZE * i, map + file_size, i,
                                      THREAD_NUM, entry_pool + i);
/* Deliver the jobs to all threads and wait for completing */
for (int i = 0; i < THREAD_NUM; i++)
    pthread_create(&threads[i], NULL, (void *)&append, (void *)thread_args[i]);
for (int i = 0; i < THREAD_NUM; i++)
    pthread_join(threads[i], NULL);
/* Connect the linked list of each thread */
for (int i = 0; i < THREAD_NUM; i++) {
    if (i == 0) {
        pHead = thread_args[i]->lEntry_head->pNext;
    } else {
        e->pNext = thread_args[i]->lEntry_head->pNext;
    }
    e = thread_args[i]->lEntry_tail;
}
```
```c=
// phonebook_opt.c
void append(void *arg)
{
    thread_arg *t_arg = (thread_arg *) arg;
    int count = 0;
    entry *j = t_arg->lEntryPool_begin;
    for (char *i = t_arg->data_begin; i < t_arg->data_end;
         i += MAX_LAST_NAME_SIZE * t_arg->numOfThread,
         j += t_arg->numOfThread, count++) {
        /* Append the new at the end of the local linked list */
        t_arg->lEntry_tail->pNext = j;
        t_arg->lEntry_tail = t_arg->lEntry_tail->pNext;
        t_arg->lEntry_tail->lastName = i;
        t_arg->lEntry_tail->pNext = NULL;
        t_arg->lEntry_tail->dtl = NULL;
    }
    pthread_exit(NULL);
}
```
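After studying this for a while, starting from `pthread_create`: `append` is handed to the specified number of threads, and each thread processes its own share of the data. `pthread_join` then waits for all of them to finish before the main thread continues. The last and most important part links the per-thread lists together into a single list.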
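The interleaved split and the final stitching can be sketched without any threads at all. In the sketch below, `build_slice` plays the role of what one worker does and `join_slices` the role of the last loop in main; `node`, `slice`, `REC`, and both function names are hypothetical, and unlike the homework code the lists here have no dummy head node.

```c
#include <assert.h>
#include <stddef.h>

#define REC 16 /* assumed record width, standing in for MAX_LAST_NAME_SIZE */

typedef struct node {
    const char *last_name;
    struct node *next;
} node;

typedef struct {
    node *head, *tail; /* local list owned by one worker */
} slice;

/* Worker `id` of `n` links records id, id+n, id+2n, ... and uses the pool
 * slots with the same indices, so no two workers touch the same slot. */
slice build_slice(const char *data, size_t total, node *pool,
                  size_t id, size_t n)
{
    slice s = { NULL, NULL };
    for (size_t k = id; k < total; k += n) {
        node *e = &pool[k];
        e->last_name = data + k * REC;
        e->next = NULL;
        if (s.tail)
            s.tail->next = e;
        else
            s.head = e;
        s.tail = e;
    }
    return s;
}

/* Stitch the per-worker lists into one list and return its length. */
size_t join_slices(slice *s, size_t n, node **out_head)
{
    node *tail = NULL;
    size_t len = 0;
    *out_head = NULL;
    for (size_t i = 0; i < n; i++) {
        if (!s[i].head)
            continue; /* this worker had no records */
        if (tail)
            tail->next = s[i].head;
        else
            *out_head = s[i].head;
        tail = s[i].tail;
    }
    for (node *p = *out_head; p; p = p->next)
        len++;
    return len;
}
```

Because each worker only writes to its own records and pool slots, the build phase needs no locking; the only sequential step is the stitching after all workers have finished.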
## Planned changes
* Thread pool definition & declaration
The concept follows [Wiki](https://en.wikipedia.org/wiki/Thread_pool) and these [C thread pool notes](http://swind.code-life.info/posts/c-thread-pool.html)
The rough steps are:
1. Define the thread pool structure
```c=
typedef struct _THREAD_POOL_ threadpool_t;
struct _THREAD_POOL_ {
    pthread_t *threads;       /* array of worker threads */
    threadpool_task_t *queue; /* array used as a circular task queue */
    pthread_mutex_t lock;
    pthread_cond_t notify;
    int thread_count;
    int queue_size;
    int head;
    int tail;
    int shutdown;
    int started;
    int count;
};
```
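Explanation:
- threads: array holding all created threads
- queue: the task queue (an array used as a circular buffer, not a linked list)
- lock: mutex protecting the thread pool
- notify: condition variable of the thread pool
- thread_count: total number of threads
- queue_size: capacity of the task queue
- head: index of the first element
- tail: index of the next free slot
- count: number of pending tasks
- started: number of threads that have been started
- shutdown: flag indicating whether the pool is shutting down
2. Define the task queue structure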
```c=
typedef struct _THREAD_TASK_ {
    void (*func)(void *);
    void *argument;
} threadpool_task_t;
```
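Explanation:
- `void (*func)(void *)`: address of the function to run (function pointer)
- `void *argument`: pointer to the arguments (packed into a struct)
3. Create the thread pool; the parameters are the number of threads, the queue size, and flags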
```c=
threadpool_t *threadpool_init(int thread_count, int queue_size, int flags);
```
4. Add a task to the queue
```c=
int threadpool_add_task(threadpool_t *pool, void(*routine)(void *),
void *arg, int flags);
```
5. Destroy the thread pool
```c=
int threadpool_destroy(threadpool_t *pool, int flags);
```
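The way `head`, `tail`, and `count` cooperate in the structure above is the usual circular-buffer scheme. Below is a minimal sketch that stores plain integers instead of tasks; `ring`, `ring_push`, `ring_pop`, and `QUEUE_SIZE` are hypothetical names and the capacity of 4 is an assumption.

```c
#include <assert.h>

#define QUEUE_SIZE 4 /* assumed capacity for the illustration */

typedef struct {
    int items[QUEUE_SIZE];
    int head;  /* index of the oldest item */
    int tail;  /* index of the next free slot */
    int count; /* number of stored items */
} ring;

/* Returns 0 on success, -1 when the queue is full. */
int ring_push(ring *q, int v)
{
    if (q->count == QUEUE_SIZE)
        return -1;
    q->items[q->tail] = v;
    q->tail = (q->tail + 1) % QUEUE_SIZE; /* wrap around at the end */
    q->count++;
    return 0;
}

/* Returns 0 on success, -1 when the queue is empty. */
int ring_pop(ring *q, int *out)
{
    if (q->count == 0)
        return -1;
    *out = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_SIZE;
    q->count--;
    return 0;
}
```

Keeping a separate `count` lets the queue use every slot: with only `head` and `tail`, a full queue and an empty queue would both have `head == tail`.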
* Thread pool implementation
- thread init
```c=
threadpool_t *threadpool_init(int thread_count, int queue_size, int flags)
{
    int i;
    (void) flags; /* currently unused */
    assert("threadpool_init()" && thread_count > 0 && queue_size > 0);
    /* #0 malloc pool */
    threadpool_t *pool = (threadpool_t *) malloc(sizeof(threadpool_t));
    if (pool == NULL)
        return NULL;
    /* #1 Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;
    /* #2 Allocate thread and task queue */
    pool->threads = (pthread_t *) malloc(sizeof(pthread_t) * thread_count);
    pool->queue = (threadpool_task_t *) malloc(sizeof(threadpool_task_t) * queue_size);
    if (pool->threads == NULL || pool->queue == NULL) {
        free(pool->threads);
        free(pool->queue);
        free(pool);
        return NULL;
    }
    /* #3 Initialize mutex and condition variable first */
    pthread_mutex_init(&(pool->lock), NULL);
    pthread_cond_init(&(pool->notify), NULL);
    /* #4 Start worker threads */
    for (i = 0; i < thread_count; i++) {
        if (pthread_create(&(pool->threads[i]), NULL,
                           threadpool_thread, (void *) pool) != 0) {
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
}
```
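Steps:
- #0 Allocate memory for the threadpool
- #1 Initialize the fields of the thread pool struct
- #2 Allocate memory for the threads & the task queue
- #3 Initialize the ==mutex== & ==condition variable==
- #4 Create the pthreads and store them in threadpool->threads; thread_count & started record how many were created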
- thread add task
```c=
int threadpool_add_task(threadpool_t *pool, void (*func)(void *), void *arg, int flags)
{
    int err = 0;
    int next;
    /* Check argument pool & function */
    if (pool == NULL || func == NULL)
        return threadpool_invalid;
    /* Check if we get lock ? */
    if (pthread_mutex_lock(&(pool->lock)) != 0)
        return threadpool_lock_failure;
    next = (pool->tail + 1) % pool->queue_size;
    /* do { ... } while (0) lets every error path break to the unlock */
    do {
        /* Is the task queue full ? */
        if (pool->count == pool->queue_size) {
            err = threadpool_queue_full;
            break;
        }
        /* Are we shutting down */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }
        /* Add task to queue */
        pool->queue[pool->tail].func = func;
        pool->queue[pool->tail].argument = arg;
        pool->tail = next;
        pool->count += 1;
        /* Wake up one waiting worker */
        if (pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while (0);
    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }
    return err;
}
```
- thread destroy
```c=
int threadpool_destroy(threadpool_t *pool, int flags)
{
    int i, err = 0;
    if (pool == NULL) {
        return threadpool_invalid;
    }
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }
    do {
        /* Already shutting down */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            /* Release the lock so this path does not leave it held */
            pthread_mutex_unlock(&(pool->lock));
            break;
        }
        pool->shutdown = (flags & threadpool_graceful) ?
                         graceful_shutdown : immediate_shutdown;
        /* Wake up all worker threads */
        if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
            (pthread_mutex_unlock(&(pool->lock)) != 0)) {
            err = threadpool_lock_failure;
            break;
        }
        /* Join all worker thread */
        for (i = 0; i < pool->thread_count; i++) {
            if (pthread_join(pool->threads[i], NULL) != 0) {
                err = threadpool_thread_failure;
            }
        }
    } while (0);
    /* Only if everything went well do we deallocate the pool */
    if (!err) {
        threadpool_free(pool);
    }
    return err;
}
```
* threadpool free
```c=
int threadpool_free(threadpool_t *pool)
{
    /* Use ==, not =: the original assignment set pool to NULL */
    if (pool == NULL || pool->started > 0) {
        return -1;
    }
    /* Did we manage to allocate ? */
    if (pool->threads) {
        free(pool->threads);
        free(pool->queue);
        /* The mutex must be unlocked when it is destroyed:
           destroying a locked mutex is undefined behavior in POSIX */
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }
    free(pool);
    return 0;
}
```
* The main function executed by each worker thread
```c=
static void *threadpool_thread(void *thread_pool)
{
    threadpool_t *pool = (threadpool_t *) thread_pool;
    threadpool_task_t task;
    for (;;) {
        /* Lock must be taken to wait on condition variable */
        pthread_mutex_lock(&(pool->lock));
        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }
        if ((pool->shutdown == immediate_shutdown) ||
            ((pool->shutdown == graceful_shutdown) &&
             (pool->count == 0))) {
            break;
        }
        /* Grab our task */
        task.func = pool->queue[pool->head].func;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;
        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));
        /* Get to work */
        (*(task.func))(task.argument);
    }
    /* We still hold the lock here because of the break above */
    pool->started--;
    pthread_mutex_unlock(&(pool->lock));
    pthread_exit(NULL);
    return NULL;
}
```
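The wait loop and the graceful shutdown condition are the subtle parts of the worker function, so here is a stripped-down, self-contained sketch of just that pattern. It is a simplification under stated assumptions: `mini_pool`, `mini_worker`, and `run_tasks` are hypothetical names, a "task" is only a counter increment, and there is no task queue. It has to be built with pthread support (for example `-pthread` with gcc).

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    int pending;  /* tasks waiting to be taken */
    int shutdown; /* set once no more tasks will arrive */
    long done;    /* tasks completed so far */
} mini_pool;

static void *mini_worker(void *arg)
{
    mini_pool *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        /* Re-check the predicate in a loop to tolerate spurious wakeups. */
        while (p->pending == 0 && !p->shutdown)
            pthread_cond_wait(&p->notify, &p->lock);
        if (p->pending == 0) { /* shutdown requested and nothing left */
            pthread_mutex_unlock(&p->lock);
            return NULL;
        }
        p->pending--;
        pthread_mutex_unlock(&p->lock);

        /* A real task would run here, outside the lock. */
        pthread_mutex_lock(&p->lock);
        p->done++;
        pthread_mutex_unlock(&p->lock);
    }
}

/* Run `tasks` dummy tasks on `nthreads` workers (1..16).
 * Returns the number of completed tasks, or -1 on error. */
long run_tasks(int nthreads, int tasks)
{
    enum { MAX_THREADS = 16 };
    pthread_t tid[MAX_THREADS];
    mini_pool p = { .pending = 0, .shutdown = 0, .done = 0 };
    int started = 0;

    if (nthreads < 1 || nthreads > MAX_THREADS || tasks < 0)
        return -1;
    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.notify, NULL);
    for (; started < nthreads; started++)
        if (pthread_create(&tid[started], NULL, mini_worker, &p) != 0)
            break;

    for (int i = 0; i < tasks; i++) {
        pthread_mutex_lock(&p.lock);
        p.pending++;
        pthread_cond_signal(&p.notify); /* wake one worker */
        pthread_mutex_unlock(&p.lock);
    }

    pthread_mutex_lock(&p.lock);
    p.shutdown = 1; /* graceful: workers drain pending tasks first */
    pthread_cond_broadcast(&p.notify);
    pthread_mutex_unlock(&p.lock);

    for (int i = 0; i < started; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_destroy(&p.lock);
    pthread_cond_destroy(&p.notify);
    return started == nthreads ? p.done : -1;
}
```

The worker only exits when shutdown has been requested and no task is pending, which mirrors the graceful branch of the condition in `threadpool_thread`; an immediate shutdown would exit as soon as the flag is seen, even with tasks left.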