contributed by <Chihsiang>
作業說明
參考實驗共筆
在作業區上的連結記得是貼「發佈」後的短連結喔!
課程助教
mmap + text_align
從共筆的動機分析來看,這個做法的目的是加速讀檔:先把每筆字串補足成相同長度,再把檔案內容對應 (mmap) 到 memory,之後以固定 offset 移動就能取得每一筆資料,並據此建立資料結構。
/* Produce ALIGN_FILE from DICT_FILE; presumably every record is padded to
 * MAX_LAST_NAME_SIZE bytes (text_align is defined elsewhere -- confirm). */
text_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
/* Open the aligned file read-only, with O_NONBLOCK also requested. */
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
/* Size of the aligned file, presumably in bytes (fsize is defined elsewhere --
 * confirm); used later as the mapping length and to size the entry pool. */
off_t file_size = fsize(ALIGN_FILE);
在讀取檔案的部分使用O_NONBLOCK
/* Map the whole aligned file into memory. The mapping is MAP_PRIVATE, so
 * stores through `map` are not carried through to the underlying file. */
char *map = mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
參考 CheHsuan
mmap 是將檔案內容映射到一段 virtual memory address,之後直接存取這段記憶體就等同於存取檔案內容;相對於傳統的 read / write,mmap 減少了把資料 load 到緩衝區的 overhead,因此可以提高 I/O 的效率。
關於其中的 flags MAP_PRIVATE:修改映射內容時,變更只會寫到這個 process 私有的複本 (copy-on-write),不會直接修改原始檔案,也不會更新回原本的檔案。
entry_pool = (entry *)malloc(sizeof(entry) *
file_size / MAX_LAST_NAME_SIZE);
這部分直接分配足夠存入所有資料的記憶體,給建立資料結構所使用
有幾點疑問:
pthread
pthread_setconcurrency(THREAD_NUM + 1);
//main.c#101
pthread_setconcurrency(THREAD_NUM + 1);

/* Build one argument record per worker: worker i starts at record i of the
 * mapped data and at slot i of the entry pool. The per-worker stride
 * (THREAD_NUM records / slots) is applied inside append(). */
for (int i = 0; i < THREAD_NUM; i++) {
    // Created by malloc, remember to free them.
    thread_args[i] = createThread_arg(map + MAX_LAST_NAME_SIZE * i,
                                      map + file_size, i, THREAD_NUM,
                                      entry_pool + i);
}

/* Deliver the jobs to all threads and wait for completing */
for (int i = 0; i < THREAD_NUM; i++) {
    pthread_create(&threads[i], NULL, (void *)&append, (void *)thread_args[i]);
}
for (int i = 0; i < THREAD_NUM; i++) {
    pthread_join(threads[i], NULL);
}

/* Connect the linked list of each thread: the first worker's list supplies
 * pHead, every later list is attached after the previous worker's tail. */
for (int i = 0; i < THREAD_NUM; i++) {
    if (i != 0) {
        e->pNext = thread_args[i]->lEntry_head->pNext;
    } else {
        pHead = thread_args[i]->lEntry_head->pNext;
    }
    e = thread_args[i]->lEntry_tail;
}
// phonebook_opt.c
void append(void *arg)
{
thread_arg *t_arg = (thread_arg *) arg;
int count = 0;
entry *j = t_arg->lEntryPool_begin;
for (char *i = t_arg->data_begin; i < t_arg->data_end;
i += MAX_LAST_NAME_SIZE * t_arg->numOfThread,
j += t_arg->numOfThread, count++) {
/* Append the new at the end of the local linked list */
t_arg->lEntry_tail->pNext = j;
t_arg->lEntry_tail = t_arg->lEntry_tail->pNext;
t_arg->lEntry_tail->lastName = i;
t_arg->lEntry_tail->pNext = NULL;
t_arg->lEntry_tail->dtl = NULL;
}
pthread_exit(NULL);
}
研究許久,先從 pthread_create 的部份看起:根據指定的 Thread 數量,把 append 分配給不同的 thread 處理;pthread_join 的部份則是等待它們處理完之後回到主 Thread;最後比較主要的一部分,是把各個 thread 處理好的資料連結在一起。
Thread Pool 定義&宣告
概念上參考Wiki,以及C pool 筆記
簡略步驟大概是:
設計定義 Thread Pool 結構
typedef struct _THREAD_POOL_ threadpool_t;

/* State of one thread pool. The queue indices and counters are read and
 * written with `lock` held. */
struct _THREAD_POOL_ {
    pthread_t *threads;        /* heap array of worker thread ids */
    threadpool_task_t *queue;  /* heap array of queue_size task slots, used as a ring */
    pthread_mutex_t lock;      /* taken around every access to the queue state */
    pthread_cond_t notify;     /* signalled when a task is added, broadcast on shutdown */
    int thread_count;          /* number of workers successfully created */
    int queue_size;            /* capacity of `queue` */
    int head;                  /* index of the next task a worker will take */
    int tail;                  /* index where the next added task is stored */
    int shutdown;              /* 0 while running, otherwise the requested shutdown mode */
    int started;               /* workers created and not yet exited */
    int count;                 /* number of tasks currently queued */
};
解析:
建立 Task queue 結構
/* One unit of work for the pool: a worker calls func(argument). */
struct _THREAD_TASK_ {
    void (*func)(void *);   /* routine to run */
    void *argument;         /* passed unchanged to func */
};

typedef struct _THREAD_TASK_ threadpool_task_t;
解析:
建立 Thread pool,參數為 thread 數量、queue 大小、flags
/**
 * Create a thread pool and start its worker threads.
 *
 * @param thread_count  number of worker threads to start; must be > 0
 * @param queue_size    capacity of the task queue; must be > 0
 * @param flags         not used by the implementation shown in this file
 * @return the new pool, or NULL on failure
 */
threadpool_t *threadpool_init(int thread_count, int queue_size, int flags);
新增 Task to Queue
/**
 * Queue one task for the pool's workers.
 *
 * @param pool     pool to add the task to
 * @param routine  function a worker will call
 * @param arg      value passed to routine
 * @param flags    not used by the implementation shown in this file
 * @return 0 on success; otherwise threadpool_invalid (NULL pool or routine),
 *         threadpool_lock_failure, threadpool_queue_full or
 *         threadpool_shutdown
 */
int threadpool_add_task(threadpool_t *pool, void(*routine)(void *),
void *arg, int flags);
刪除 Thread pool
/**
 * Stop the pool's workers and release the pool.
 *
 * @param pool   pool to shut down
 * @param flags  when the threadpool_graceful bit is set, workers exit only
 *               once the queue is empty; otherwise they exit without taking
 *               further queued tasks
 * @return 0 on success, otherwise an error code; the pool is freed only on
 *         success
 */
int threadpool_destroy(threadpool_t *pool, int flags);
Thread pool 實作
thread init
/**
 * Create a thread pool and start its worker threads.
 *
 * @param thread_count  number of worker threads to start; must be > 0
 * @param queue_size    capacity of the task queue; must be > 0
 * @param flags         currently unused
 * @return the new pool, or NULL if the arguments are invalid (when asserts
 *         are compiled out), an allocation or synchronisation-object
 *         initialisation fails, or a worker thread cannot be created
 */
threadpool_t *threadpool_init(int thread_count, int queue_size, int flags)
{
    (void) flags;

    /* Validate before allocating anything. */
    assert("threadpool_init()" && thread_count > 0 && queue_size > 0);
    if (thread_count <= 0 || queue_size <= 0) {
        return NULL;
    }

    /* #0 malloc pool */
    threadpool_t *pool = malloc(sizeof(threadpool_t));
    if (pool == NULL) {
        return NULL;
    }

    /* #1 Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;

    /* #2 Allocate thread and task queue */
    pool->threads = malloc(sizeof(pthread_t) * (size_t) thread_count);
    pool->queue = malloc(sizeof(threadpool_task_t) * (size_t) queue_size);
    if (pool->threads == NULL || pool->queue == NULL) {
        goto err_free;
    }

    /* #3 Initialize mutex and conditional variable first */
    if (pthread_mutex_init(&(pool->lock), NULL) != 0) {
        goto err_free;
    }
    if (pthread_cond_init(&(pool->notify), NULL) != 0) {
        pthread_mutex_destroy(&(pool->lock));
        goto err_free;
    }

    /* #4 Start worker threads */
    for (int i = 0; i < thread_count; i++) {
        if (pthread_create(&(pool->threads[i]), NULL,
                           threadpool_thread, (void *) pool) != 0) {
            /* Workers already running are stopped and joined by destroy. */
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;

err_free:
    /* No worker exists yet, so plain frees are enough (free(NULL) is a no-op). */
    free(pool->threads);
    free(pool->queue);
    free(pool);
    return NULL;
}
解析步驟:
thread add task
/**
 * Queue one task for the pool's workers.
 *
 * @param pool   pool to add the task to
 * @param func   function a worker will call
 * @param arg    value passed to func
 * @param flags  currently unused
 * @return 0 on success; threadpool_invalid if pool or func is NULL;
 *         threadpool_queue_full if every queue slot is occupied;
 *         threadpool_shutdown if the pool is shutting down;
 *         threadpool_lock_failure if a mutex or condition-variable call fails
 */
int threadpool_add_task(threadpool_t *pool, void(*func)(void *),void *arg, int flags)
{
    int err = 0;

    (void) flags;

    /* Check argument pool & function */
    if (pool == NULL || func == NULL) {
        return threadpool_invalid;
    }

    /* Check if we get lock ? */
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    if (pool->count == pool->queue_size) {
        /* Are task full ? */
        err = threadpool_queue_full;
    } else if (pool->shutdown) {
        /* Are we shutting down */
        err = threadpool_shutdown;
    } else {
        /* Add task to queue */
        pool->queue[pool->tail].func = func;
        pool->queue[pool->tail].argument = arg;
        pool->tail = (pool->tail + 1) % pool->queue_size;
        pool->count += 1;

        /* Wake one waiting worker. The task stays queued even if this fails. */
        if (pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
        }
    }

    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }
    return err;
}
thread destroy
/**
 * Stop the pool's workers, join them, and release the pool.
 *
 * @param pool   pool to shut down
 * @param flags  when the threadpool_graceful bit is set the pool is marked
 *               graceful_shutdown, otherwise immediate_shutdown
 * @return 0 on success; threadpool_invalid if pool is NULL;
 *         threadpool_shutdown if a shutdown was already requested;
 *         threadpool_lock_failure if a mutex or condition-variable call
 *         fails; threadpool_thread_failure if a worker cannot be joined.
 *         The pool is freed only when 0 is returned.
 */
int threadpool_destroy(threadpool_t *pool, int flags)
{
    int err = 0;

    if (pool == NULL) {
        return threadpool_invalid;
    }
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    /* Already shutting down: release the lock before reporting it. */
    if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->lock));
        return threadpool_shutdown;
    }

    pool->shutdown = (flags & threadpool_graceful) ?
                     graceful_shutdown : immediate_shutdown;

    /* Wake up all worker threads. The unlock is attempted even when the
     * broadcast fails, so the mutex is never left held on return. */
    if (pthread_cond_broadcast(&(pool->notify)) != 0) {
        err = threadpool_lock_failure;
    }
    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }
    if (err) {
        return err;
    }

    /* Join all worker thread */
    for (int i = 0; i < pool->thread_count; i++) {
        if (pthread_join(pool->threads[i], NULL) != 0) {
            err = threadpool_thread_failure;
        }
    }

    /* Only if everything went well do we deallocate the pool */
    if (!err) {
        threadpool_free(pool);
    }
    return err;
}
threadpool free
/**
 * Release the memory and synchronisation objects owned by a pool.
 *
 * @param pool  pool to release; may be NULL
 * @return 0 once the pool has been freed; -1 if pool is NULL or workers are
 *         still running (pool->started > 0), in which case nothing is
 *         released
 */
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL || pool->started > 0) {
        return -1;
    }

    /* Did we manage to allocate ? */
    if (pool->threads) {
        free(pool->threads);
        free(pool->queue);

        /* No worker is running (started == 0), so nothing can hold the lock.
         * The mutex is destroyed unlocked: destroying a locked mutex is
         * undefined behaviour. */
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }
    free(pool);
    return 0;
}
最主要指定每個 thread 要執行的 Function
/**
 * Body of every worker thread: repeatedly take one task from the pool's
 * queue and run it, until the pool is shut down.
 *
 * @param thread_pool  the threadpool_t this worker belongs to
 * @return never returns normally; the thread ends through pthread_exit(NULL)
 */
static void *threadpool_thread(void *thread_pool)
{
    threadpool_t *pool = (threadpool_t *) thread_pool;
    threadpool_task_t task;

    for (;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        /* Immediate shutdown leaves at once; graceful shutdown leaves only
           when no task is left. The lock is still held after the break. */
        if ((pool->shutdown == immediate_shutdown) ||
            ((pool->shutdown == graceful_shutdown) &&
             (pool->count == 0))) {
            break;
        }

        /* Grab our task */
        task.func = pool->queue[pool->head].func;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));

        /* Get to work, outside the lock */
        (*(task.func))(task.argument);
    }

    /* Reached only through the break, with the lock held. */
    pool->started--;
    pthread_mutex_unlock(&(pool->lock));
    pthread_exit(NULL);
    return NULL; /* not reached */
}