
2017q1 Homework4 (phonebook-concurrent)

contributed by <Chihsiang>
Assignment description
Reference lab notes

Remember that the link posted in the assignment area must be the short link generated after "publishing" the note!
Course TA

Program analysis

  • mmap + text_align

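    Judging from the motivation described in the shared notes, the goal of this approach is to speed up reading the file. Every string is padded to the same length, the file is mapped into memory, and the records are then located by moving a fixed offset through the mapping while the data structure is built.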

    • text align

        text_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
        int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
        off_t file_size = fsize(ALIGN_FILE);

    The aligned file is opened for reading with O_NONBLOCK.
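
To make the fixed-width idea concrete, here is a minimal sketch of padding names into equal-sized slots and then locating a record by offset alone. It is not the repository's `text_align`; `RECORD_SIZE`, `pad_record`, `record_at`, and `aligned_demo` are hypothetical names used only for illustration, and the sketch works on an in-memory buffer rather than a file.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define RECORD_SIZE 16 /* assumed width; the original uses MAX_LAST_NAME_SIZE */

/* Copy `name` into a zero-padded slot of RECORD_SIZE bytes.
 * Returns 0 on success, -1 if the name plus its terminator does not fit. */
int pad_record(char *slot, const char *name)
{
    size_t len = strlen(name);
    if (len >= RECORD_SIZE)
        return -1;
    memset(slot, '\0', RECORD_SIZE);
    memcpy(slot, name, len);
    return 0;
}

/* With fixed-width records, record i starts at a computable offset. */
const char *record_at(const char *base, size_t i)
{
    return base + i * RECORD_SIZE;
}

/* Build a small aligned buffer and return a pointer to record 2. */
const char *aligned_demo(void)
{
    static char buf[3 * RECORD_SIZE];
    const char *names[] = { "alice", "bob", "carol" };
    for (size_t i = 0; i < 3; i++)
        pad_record(buf + i * RECORD_SIZE, names[i]);
    return record_at(buf, 2);
}
```

Because every slot has the same size, no scanning for newlines is needed to find the start of a record, which is what lets several threads jump straight to their own records.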

    • mmap

        char *map;
        map = mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);

    Reference: CheHsuan

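    mmap maps the contents of a file into a range of virtual memory addresses, so the file is accessed through ordinary memory reads and writes. Compared with traditional read / write, this improves I/O efficiency because mmap avoids the overhead of loading the data into an intermediate buffer.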

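    About the MAP_PRIVATE flag: when the mapped content is modified, the change is neither applied to the original file nor written back to it later; the modification only lands in a private copy of the mapping.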
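
The MAP_PRIVATE behavior described above can be checked with a small experiment. This is only a sketch under assumptions: `private_mapping_demo` is a hypothetical helper, and it uses a scratch file under `/tmp` instead of the phonebook data. It writes "hello" to the file, changes the first byte through a private mapping, and then reads the first byte back from the file itself.

```c
#define _POSIX_C_SOURCE 200809L /* for mkstemp() and pread(); must precede the includes */
#include <assert.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

/* Returns the first byte of the file as seen through pread() after the
 * private mapping has been modified, or -1 on any error. */
int private_mapping_demo(void)
{
    char path[] = "/tmp/map_private_XXXXXX"; /* assumed scratch location */
    const char text[] = "hello";
    int result = -1;
    int fd = mkstemp(path);
    if (fd < 0)
        return -1;
    if (write(fd, text, sizeof(text)) == (ssize_t) sizeof(text)) {
        char *map = mmap(NULL, sizeof(text), PROT_READ | PROT_WRITE,
                         MAP_PRIVATE, fd, 0);
        if (map != MAP_FAILED) {
            char on_disk = 0;
            map[0] = 'J'; /* changes only this process's private copy */
            if (map[0] == 'J' && pread(fd, &on_disk, 1, 0) == 1)
                result = on_disk; /* the file content is untouched */
            munmap(map, sizeof(text));
        }
    }
    close(fd);
    unlink(path);
    return result;
}
```

The mapping shows `'J'` while the file still starts with `'h'`, which is why the phonebook code can map the aligned file with PROT_WRITE without risking changes to it.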

    • memory pool

        entry_pool = (entry *)malloc(sizeof(entry) *
                                     file_size / MAX_LAST_NAME_SIZE);

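    This part allocates, in a single call, enough memory to hold every record; the data structure is then built out of this pool instead of allocating each entry separately.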
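
The sizing of the pool follows directly from the fixed record width: the number of entries is the file size divided by the record size. The sketch below only illustrates that arithmetic; `RECORD_SIZE`, the simplified `entry` struct, and `alloc_entry_pool` are assumptions for illustration, not the repository's definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define RECORD_SIZE 16 /* assumed width; the original uses MAX_LAST_NAME_SIZE */

/* Simplified stand-in for the phonebook entry. */
typedef struct entry {
    const char *lastName;
    struct entry *pNext;
} entry;

/* Allocate one zeroed entry per fixed-width record in a file of
 * `file_size` bytes. Stores the entry count in *count and returns the
 * pool, or NULL if the file holds no complete record or malloc fails. */
entry *alloc_entry_pool(size_t file_size, size_t *count)
{
    *count = file_size / RECORD_SIZE;
    if (*count == 0)
        return NULL;
    return calloc(*count, sizeof(entry));
}
```

Entry `i` of the pool then belongs to record `i` of the mapped file, so no per-record allocation is needed while the list is being built.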

    • A few open questions:

      • Text alignment itself spends extra resources on padding every line. Is the overall result still a net gain?
  • pthread

    • pthread_setconcurrency

        pthread_setconcurrency(THREAD_NUM + 1);

    • Main program

        // main.c#101

        pthread_setconcurrency(THREAD_NUM + 1);
        for (int i = 0; i < THREAD_NUM; i++)
            // Created by malloc, remember to free them.
            thread_args[i] = createThread_arg(map + MAX_LAST_NAME_SIZE * i, map + file_size, i,
                                              THREAD_NUM, entry_pool + i);
        /* Deliver the jobs to all threads and wait for completing */
        for (int i = 0; i < THREAD_NUM; i++)
            pthread_create(&threads[i], NULL, (void *)&append, (void *)thread_args[i]);
        for (int i = 0; i < THREAD_NUM; i++)
            pthread_join(threads[i], NULL);
        /* Connect the linked list of each thread */
        for (int i = 0; i < THREAD_NUM; i++) {
            if (i == 0) {
                pHead = thread_args[i]->lEntry_head->pNext;
            } else {
                e->pNext = thread_args[i]->lEntry_head->pNext;
            }
            e = thread_args[i]->lEntry_tail;
        }

        // phonebook_opt.c

        void append(void *arg)
        {
            thread_arg *t_arg = (thread_arg *) arg;
            int count = 0;
            entry *j = t_arg->lEntryPool_begin;
            for (char *i = t_arg->data_begin; i < t_arg->data_end;
                    i += MAX_LAST_NAME_SIZE * t_arg->numOfThread,
                    j += t_arg->numOfThread, count++) {
                /* Append the new at the end of the local linked list */
                t_arg->lEntry_tail->pNext = j;
                t_arg->lEntry_tail = t_arg->lEntry_tail->pNext;
                t_arg->lEntry_tail->lastName = i;
                t_arg->lEntry_tail->pNext = NULL;
                t_arg->lEntry_tail->dtl = NULL;
            }
            pthread_exit(NULL);
        }

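    After studying this for quite a while, I started from pthread_create: according to the configured number of threads, append is handed to each thread. Thread i starts at record i and advances by THREAD_NUM records per step, so the threads never touch the same record or the same pool entry. pthread_join then waits for every thread to finish before the main thread continues, and the last and most important part links the per-thread lists together into one list.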
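
The interleaved split and the final splice can be sketched in isolation. This is a simplified model under assumptions, not the repository code: records are plain integers, and `N_THREADS`, `N_RECORDS`, `node`, `worker_arg`, `build_local_list`, and `build_and_merge` are hypothetical names. Unlike the original merge loop, the sketch also skips a thread whose local list is empty.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define N_THREADS 4  /* assumed; the original uses THREAD_NUM */
#define N_RECORDS 10 /* assumed record count for the sketch */

typedef struct node {
    int record;        /* index of the record this node represents */
    struct node *next;
} node;

typedef struct {
    int first;         /* first record index handled by this thread */
    node *pool;        /* shared, preallocated node pool */
    node head;         /* dummy head of the thread-local list */
    node *tail;        /* last node of the thread-local list */
} worker_arg;

/* Thread i links records i, i + N_THREADS, i + 2 * N_THREADS, ... */
static void *build_local_list(void *p)
{
    worker_arg *arg = p;
    for (int r = arg->first; r < N_RECORDS; r += N_THREADS) {
        node *n = &arg->pool[r];
        n->record = r;
        n->next = NULL;
        arg->tail->next = n;
        arg->tail = n;
    }
    return NULL;
}

/* Build the local lists in parallel, splice them together, and return the
 * length of the merged list (or -1 if a thread could not be created). */
int build_and_merge(void)
{
    static node pool[N_RECORDS];
    pthread_t threads[N_THREADS];
    worker_arg args[N_THREADS];
    int created = 0;

    for (int i = 0; i < N_THREADS; i++) {
        args[i].first = i;
        args[i].pool = pool;
        args[i].head.next = NULL;
        args[i].tail = &args[i].head;
        if (pthread_create(&threads[i], NULL, build_local_list, &args[i]) != 0)
            break;
        created++;
    }
    for (int i = 0; i < created; i++)
        pthread_join(threads[i], NULL);
    if (created < N_THREADS)
        return -1;

    /* Splice: the tail of one local list points at the head of the next. */
    node merged = { 0, NULL };
    node *last = &merged;
    for (int i = 0; i < N_THREADS; i++) {
        if (args[i].head.next == NULL)
            continue; /* this thread had no records */
        last->next = args[i].head.next;
        last = args[i].tail;
    }

    int length = 0;
    for (node *n = merged.next; n != NULL; n = n->next)
        length++;
    return length;
}
```

No lock is needed while the lists are built, because each thread writes only to its own pool slots and its own local list; the only sequential step is the short splice after the joins.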

Planned modifications

  • Thread pool definitions & declarations

    The concept follows the Wiki and the C pool notes.
    The rough steps are:

    1. Design and define the thread pool structure

          typedef struct _THREAD_POOL_ threadpool_t;
          struct _THREAD_POOL_ {
              pthread_t *threads;       /* array, allocated in threadpool_init */
              threadpool_task_t *queue;
              pthread_mutex_t lock;
              pthread_cond_t notify;
              int thread_count;
              int queue_size;
              int head;
              int tail;
              int shutdown;
              int started;
              int count;
          };

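      Explanation:

      • threads: array holding all created threads
      • queue: the task queue (allocated as an array in threadpool_init and indexed as a circular buffer through head and tail, rather than a linked list)
      • lock: the mutex of the thread pool
      • notify: the condition variable of the thread pool
      • thread_count: total number of threads
      • queue_size: size of the task queue
      • head: index of the first element
      • tail: index of the next free element
      • count: number of pending tasks
      • started: number of threads that have been started and are still running
      • shutdown: flag indicating whether the pool is shutting down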
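
How head, tail, and count cooperate is easiest to see without any threads involved. The following is a minimal sketch of the same index arithmetic; `ring_t`, `ring_push`, `ring_pop`, and `QUEUE_SIZE` are hypothetical names, and the queue stores plain integers instead of tasks.

```c
#include <assert.h>

#define QUEUE_SIZE 4 /* assumed capacity for the sketch */

typedef struct {
    int items[QUEUE_SIZE];
    int head;  /* index of the oldest element */
    int tail;  /* index of the next free slot */
    int count; /* number of stored elements */
} ring_t;

/* Store a value at tail; returns -1 when the queue is full. */
int ring_push(ring_t *q, int value)
{
    if (q->count == QUEUE_SIZE)
        return -1;
    q->items[q->tail] = value;
    q->tail = (q->tail + 1) % QUEUE_SIZE; /* wrap around at the end */
    q->count++;
    return 0;
}

/* Remove the value at head; returns -1 when the queue is empty. */
int ring_pop(ring_t *q, int *out)
{
    if (q->count == 0)
        return -1;
    *out = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_SIZE;
    q->count--;
    return 0;
}
```

Keeping a separate count is what distinguishes a full queue from an empty one, since head and tail are equal in both cases.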
    2. Define the task queue structure

          typedef struct _THREAD_TASK_ {
              void (*func)(void *);
              void *argument;
          } threadpool_task_t;

      Explanation:

      • void (*func)(void *): address of the function to execute (function pointer)
      • void *argument: pointer to the arguments (the arguments are packed into a struct)
    3. Create the thread pool; the parameters are the number of threads, the queue size, and flags

          threadpool_t *threadpool_init(int thread_count, int queue_size, int flags);
    4. Add a task to the queue

          int threadpool_add_task(threadpool_t *pool, void (*routine)(void *),
                                  void *arg, int flags);
    5. Destroy the thread pool

          int threadpool_destroy(threadpool_t *pool, int flags);
  • Thread pool implementation

    • thread init

          threadpool_t *threadpool_init(int thread_count, int queue_size, int flags)
          {
              /* #0 malloc pool */
              threadpool_t *pool = (threadpool_t *) malloc(sizeof(threadpool_t));
              int i;
              assert("threadpool_init()" && thread_count > 0 && queue_size > 0);
              if (pool == NULL)
                  return NULL;
              /* #1 Initialize */
              pool->thread_count = 0;
              pool->queue_size = queue_size;
              pool->head = pool->tail = pool->count = 0;
              pool->shutdown = pool->started = 0;
              /* #2 Allocate thread and task queue */
              pool->threads = (pthread_t *) malloc(sizeof(pthread_t) * thread_count);
              pool->queue = (threadpool_task_t *) malloc(sizeof(threadpool_task_t) * queue_size);
              /* #3 Initialize mutex and conditional variable first */
              pthread_mutex_init(&(pool->lock), NULL);
              pthread_cond_init(&(pool->notify), NULL);
              /* #4 Start worker threads */
              for (i = 0; i < thread_count; i++) {
                  if (pthread_create(&(pool->threads[i]), NULL,
                                     threadpool_thread, (void *)pool) != 0) {
                      threadpool_destroy(pool, 0);
                      return NULL;
                  }
                  pool->thread_count++;
                  pool->started++;
              }
              return pool;
          }

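      Step-by-step explanation:

      • #0 Allocate memory for the threadpool
      • #1 Initialize the fields of the thread pool struct
      • #2 Allocate memory for the threads array and for the whole task queue
      • #3 Initialize the mutex and the condition variable
      • #4 Create the pthreads and store them in threadpool->threads; thread_count and started record how many were created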
    • thread add task

          int threadpool_add_task(threadpool_t *pool, void (*func)(void *), void *arg, int flags)
          {
              int err = 0;
              int next;
              /* Check argument pool & function */
              if (pool == NULL || func == NULL)
                  return threadpool_invalid;
              /* Check if we get lock ? */
              if (pthread_mutex_lock(&(pool->lock)) != 0)
                  return threadpool_lock_failure;
              next = (pool->tail + 1) % pool->queue_size;
              /* loop do add task */
              do {
                  /* Are task full ? */
                  if (pool->count == pool->queue_size) {
                      err = threadpool_queue_full;
                      break;
                  }
                  /* Are we shutting down */
                  if (pool->shutdown) {
                      err = threadpool_shutdown;
                      break;
                  }
                  /* Add task to queue */
                  pool->queue[pool->tail].func = func;
                  pool->queue[pool->tail].argument = arg;
                  pool->tail = next;
                  pool->count += 1;
                  /* Wake up one waiting worker (pthread_cond_signal) */
                  if (pthread_cond_signal(&(pool->notify)) != 0) {
                      err = threadpool_lock_failure;
                      break;
                  }
              } while (0);
              if (pthread_mutex_unlock(&(pool->lock)) != 0) {
                  err = threadpool_lock_failure;
              }
              return err;
          }
    • thread destroy

          int threadpool_destroy(threadpool_t *pool, int flags)
          {
              int i, err = 0;
              if (pool == NULL) {
                  return threadpool_invalid;
              }
              if (pthread_mutex_lock(&(pool->lock)) != 0) {
                  return threadpool_lock_failure;
              }
              do {
                  /* Already shutting down */
                  if (pool->shutdown) {
                      err = threadpool_shutdown;
                      break;
                  }
                  pool->shutdown = (flags & threadpool_graceful) ?
                                   graceful_shutdown : immediate_shutdown;
                  /* Wake up all worker threads */
                  if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
                      (pthread_mutex_unlock(&(pool->lock)) != 0)) {
                      err = threadpool_lock_failure;
                      break;
                  }
                  /* Join all worker thread */
                  for (i = 0; i < pool->thread_count; i++) {
                      if (pthread_join(pool->threads[i], NULL) != 0) {
                          err = threadpool_thread_failure;
                      }
                  }
              } while (0);
              /* Only if everything went well do we deallocate the pool */
              if (!err) {
                  threadpool_free(pool);
              }
              return err;
          }
    • threadpool free

          int threadpool_free(threadpool_t *pool)
          {
              /* Refuse to free a missing pool or one whose workers still run */
              if (pool == NULL || pool->started > 0) {
                  return -1;
              }
              /* Did we manage to allocate ? */
              if (pool->threads) {
                  free(pool->threads);
                  free(pool->queue);
                  /* Every worker has exited, so nobody holds the mutex. It is
                     destroyed without locking it first, because destroying a
                     locked mutex is undefined behavior in POSIX. */
                  pthread_mutex_destroy(&(pool->lock));
                  pthread_cond_destroy(&(pool->notify));
              }

              free(pool);
              return 0;
          }
    • The main function that every worker thread executes

          static void *threadpool_thread(void *thread_pool)
          {
              threadpool_t *pool = (threadpool_t *)thread_pool;
              threadpool_task_t task;
              for (;;) {
                  /* Lock must be taken to wait on conditional variable */
                  pthread_mutex_lock(&(pool->lock));
                  /* Wait on condition variable, check for spurious wakeups.
                     When returning from pthread_cond_wait(), we own the lock. */
                  while ((pool->count == 0) && (!pool->shutdown)) {
                      pthread_cond_wait(&(pool->notify), &(pool->lock));
                  }
                  if ((pool->shutdown == immediate_shutdown) ||
                      ((pool->shutdown == graceful_shutdown) &&
                       (pool->count == 0))) {
                      break;
                  }
                  /* Grab our task */
                  task.func = pool->queue[pool->head].func;
                  task.argument = pool->queue[pool->head].argument;
                  pool->head = (pool->head + 1) % pool->queue_size;
                  pool->count -= 1;
                  /* Unlock */
                  pthread_mutex_unlock(&(pool->lock));
                  /* Get to work */
                  (*(task.func))(task.argument);
              }
              /* The loop is left with the lock still held */
              pool->started--;
              pthread_mutex_unlock(&(pool->lock));
              pthread_exit(NULL);

              return (NULL);
          }
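
To see the whole cycle (submit, wait on the condition variable, run, graceful shutdown) in one place, here is a compact self-contained sketch. It is a simplified model rather than the implementation above: the queue is a fixed array, only graceful shutdown is supported, and `mini_pool`, `worker`, `submit`, `add_one`, `run_mini_pool`, `WORKERS`, and `SLOTS` are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define WORKERS 3 /* assumed worker count */
#define SLOTS 8   /* assumed queue capacity */

typedef struct {
    void (*func)(void *);
    void *arg;
} task_t;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    task_t queue[SLOTS];
    int head, tail, count;
    int shutdown; /* 1 = drain the queue, then let the workers exit */
} mini_pool;

static void *worker(void *p)
{
    mini_pool *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        /* Loop on the predicate to tolerate spurious wakeups. */
        while (pool->count == 0 && !pool->shutdown)
            pthread_cond_wait(&pool->notify, &pool->lock);
        if (pool->count == 0) { /* shutting down and nothing left to run */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        task_t task = pool->queue[pool->head];
        pool->head = (pool->head + 1) % SLOTS;
        pool->count--;
        pthread_mutex_unlock(&pool->lock);
        task.func(task.arg); /* run the task without holding the lock */
    }
}

static int submit(mini_pool *pool, void (*func)(void *), void *arg)
{
    int err = -1; /* queue full or pool shutting down */
    pthread_mutex_lock(&pool->lock);
    if (pool->count < SLOTS && !pool->shutdown) {
        pool->queue[pool->tail].func = func;
        pool->queue[pool->tail].arg = arg;
        pool->tail = (pool->tail + 1) % SLOTS;
        pool->count++;
        pthread_cond_signal(&pool->notify);
        err = 0;
    }
    pthread_mutex_unlock(&pool->lock);
    return err;
}

static pthread_mutex_t sum_lock = PTHREAD_MUTEX_INITIALIZER;

/* Example task: increment a shared counter under its own mutex. */
static void add_one(void *arg)
{
    pthread_mutex_lock(&sum_lock);
    ++*(int *) arg;
    pthread_mutex_unlock(&sum_lock);
}

/* Queue SLOTS tasks, shut down gracefully, and return the final counter
 * (or -1 if no worker started or a task was lost). */
int run_mini_pool(void)
{
    mini_pool pool = { .head = 0 }; /* remaining fields start at zero */
    pthread_t threads[WORKERS];
    int counter = 0, accepted = 0, started = 0;

    pthread_mutex_init(&pool.lock, NULL);
    pthread_cond_init(&pool.notify, NULL);
    for (int i = 0; i < WORKERS; i++)
        if (pthread_create(&threads[started], NULL, worker, &pool) == 0)
            started++;
    for (int i = 0; i < SLOTS; i++)
        if (submit(&pool, add_one, &counter) == 0)
            accepted++;

    pthread_mutex_lock(&pool.lock);
    pool.shutdown = 1;
    pthread_cond_broadcast(&pool.notify);
    pthread_mutex_unlock(&pool.lock);
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    pthread_mutex_destroy(&pool.lock);
    pthread_cond_destroy(&pool.notify);
    return (started > 0 && counter == accepted) ? counter : -1;
}
```

Calling `run_mini_pool()` from a `main` is enough to exercise it. The design point worth noting is that the task runs after the pool lock is released, so a slow task never blocks other workers from dequeuing.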