contributed by TempoJiJi
First, test the program's performance and correctness:
size of entry : 24 bytes
execution time of append() : 0.006030 sec
execution time of findName() : 0.003554 sec
Verifying correctness:
#ifdef OPT
show_entry(e);
#endif
Redirect the output to a file:
$ ./phonebook_opt > output
It turns out that output disagrees with words.txt under dictionary/: output is missing a few words. I wrote a simple checker.c, and it showed that the first 4 words were missing.
Looking at the place in main.c where the 4 lists are connected:
for (int i = 0; i < THREAD_NUM; i++) {
    if (i == 0) {
        pHead = app[i]->pHead->pNext;
        dprintf("Connect %d head string %s %p\n", i,
                app[i]->pHead->pNext->lastName, app[i]->ptr);
    } else {
        etmp->pNext = app[i]->pHead->pNext;
        dprintf("Connect %d head string %s %p\n", i,
                app[i]->pHead->pNext->lastName, app[i]->ptr);
    }
    etmp = app[i]->pLast;
    dprintf("Connect %d tail string %s %p\n", i, app[i]->pLast->lastName, app[i]->ptr);
    dprintf("round %d\n", i);
}
Here we can see that both pHead = app[i]->pHead->pNext and etmp->pNext = app[i]->pHead->pNext start the connection from head->next rather than from head, so the entry at each of the 4 heads is dropped. Changing them to pHead = app[i]->pHead and etmp->pNext = app[i]->pHead gives the correct result.
I rewrote the for loop in append as a while loop, since the original for loop juggled too many variables. I also removed the nthread field and used THREAD_NUM instead, which shrinks the structure as well:
char *i = app->ptr;
entry *j = app->entryStart;
while (i < app->eptr) {
    app->pLast->pNext = j;
    app->pLast = app->pLast->pNext;
    app->pLast->lastName = i;
    app->pLast->pNext = NULL;
    i += MAX_LAST_NAME_SIZE * THREAD_NUM;
    j += THREAD_NUM;
}
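The stride in the loop above means thread t handles names t, t + THREAD_NUM, t + 2*THREAD_NUM, … of the aligned file. A tiny standalone model of this interleaved partition (the helper names are illustrative, not from the project):

```c
#define THREAD_NUM 4

/* Which thread owns the name at a given index, under the interleaved
 * partition used by append(): consecutive names go to different threads. */
int owner_of(int index)
{
    return index % THREAD_NUM;
}

/* How many names a given thread t ends up appending, walking the
 * same stride as the while loop above. */
int names_owned(int total, int t)
{
    int n = 0;
    for (int i = t; i < total; i += THREAD_NUM)
        n++;
    return n;
}
```

Every name has exactly one owner, so the per-thread lists partition the dictionary without overlap.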
The pthread_create in main.c can also be merged with the new_append_a call above:
for (int i = 0; i < THREAD_NUM; i++) {
    app[i] = new_append_a(map + MAX_LAST_NAME_SIZE * i,
                          map + fs, i, entry_pool + i);
    pthread_create(&tid[i], NULL, (void *) &append, (void *) app[i]);
}
This way, as soon as each new_append_a finishes, its pthread_create can run immediately, instead of waiting until every app[i] has been computed. Since new_append_a takes quite a few arguments, I split the call across 2 lines.
Finally, in the part of main.c that connects all the lists, app[0] can be handled separately, which removes one branch from the for loop:
entry *etmp;
pHead = app[0]->pHead;
etmp = app[0]->pLast;
for (int i = 1; i < THREAD_NUM; i++) {
    etmp->pNext = app[i]->pHead;
    etmp = app[i]->pLast;
}
Actually, I'm not sure whether the dprintf calls here can all be removed. They only print debugging information and don't affect the program's behavior…
main.c has many scattered initializations; gathering them in one place makes the whole program tidier:
#ifndef OPT
...
#else
#include "file.c"
#include "debug.h"
#include <fcntl.h>
#define ALIGN_FILE "align.txt"
file_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
off_t fs = fsize(ALIGN_FILE);
#endif
The block above, for example, can be moved directly to just before append.
Reference: Lock-Free 编程是什么? (What is lock-free programming?)
A part of a program is lock-free when it satisfies the following requirement: no matter how the threads are scheduled, at least one of them can still make progress.
Lock-Free Programming Techniques:
To satisfy the non-blocking condition of lock-free programming, a range of techniques is available, such as atomic operations, memory barriers, and avoiding the ABA problem. Which technique to use in which situation can be decided with the flowchart in the referenced article.
Atomic Read-Modify-Write Operations
Compare-And-Swap Loops
void LockFreeQueue::push(Node* newHead)
{
    for (;;) {
        // Copy a shared variable (m_Head) to a local.
        Node* oldHead = m_Head;
        // Do some speculative work, not yet
        // visible to other threads.
        newHead->next = oldHead;
        // Next, attempt to publish our changes to the shared variable.
        // If the shared variable hasn't changed, the CAS succeeds and we return.
        // Otherwise, repeat.
        if (_InterlockedCompareExchange(&m_Head, newHead, oldHead) == oldHead)
            return;
    }
}
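_InterlockedCompareExchange is a Windows intrinsic; the same retry loop written against C11 atomics (the node type and names here are illustrative) might look like:

```c
#include <stdatomic.h>

typedef struct node {
    struct node *next;
    int value;
} node_t;

/* Lock-free stack push: retry the CAS until m_head has not changed
 * between the read and the publish. */
void push(_Atomic(node_t *) *m_head, node_t *new_head)
{
    node_t *old_head = atomic_load(m_head);
    do {
        /* Speculative work, not yet visible to other threads. */
        new_head->next = old_head;
        /* On CAS failure, old_head is reloaded with the current
         * value of *m_head, and we try again. */
    } while (!atomic_compare_exchange_weak(m_head, &old_head, new_head));
}
```

atomic_compare_exchange_weak may fail spuriously, which is fine inside a retry loop and can compile to cheaper code than the strong variant.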
Concurrency: running two or more tasks on one CPU, with the tasks' execution interleaved over overlapping periods of time.
Parallelism: splitting one problem into parts that run simultaneously on multiple CPUs; think of it as multiple threads starting and executing at the same time.
References:
wiki
Multithreaded Programming Guide
C 的 Thread Pool 筆記
source
Here I first build a basic thread pool (not lock-free), since I'm not yet very familiar with thread pools.
Let's start from the threadpool-mbrossard code to see how it is implemented:
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    ...
    ... /* Initialize pool */
    ...
    /* Start worker threads */
    for (i = 0; i < thread_count; i++) {
        if (pthread_create(&(pool->threads[i]), NULL,
                           threadpool_thread, (void *) pool) != 0) {
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
    ...
}
This creates the thread pool. It is mostly initialization, after which the worker threads are created and wait for tasks inside threadpool_thread.
static void *threadpool_thread(void *threadpool)
{
    ...
    ...
    for (;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));
        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }
        if ((pool->shutdown == immediate_shutdown) ||
            ((pool->shutdown == graceful_shutdown) &&
             (pool->count == 0))) {
            break;
        }
        /* Grab our task */
        task.function = pool->queue[pool->head].function;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;
        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));
        /* Get to work */
        (*(task.function))(task.argument);
    }
    ...
    ...
Here a thread first grabs the lock; once it has it, it calls cond_wait to wait for work (the signal is sent by threadpool_add). After being woken by the signal, it does a little bookkeeping and then executes the task.
int threadpool_add(threadpool_t *pool, void (*function)(void *),
                   void *argument, int flags)
{
    ...
    do {
        ...
        ...
        /* Add task to queue */
        pool->queue[pool->tail].function = function;
        pool->queue[pool->tail].argument = argument;
        pool->tail = next;
        pool->count += 1;
        /* pthread_cond_broadcast */
        if (pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while (0);
    ...
This sends the signal to the threads waiting for tasks in threadpool_thread.
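To make the lock / cond_wait / cond_signal handshake concrete, here is a self-contained toy version: one worker, a mutex-protected pending-task counter standing in for the queue, and a graceful shutdown flag. This is a simplified model, not mbrossard's actual code:

```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t notify = PTHREAD_COND_INITIALIZER;
static int count = 0;         /* pending tasks (stands in for the queue) */
static int shutdown_flag = 0; /* graceful shutdown requested */
static int done = 0;          /* tasks executed */

static void *worker(void *arg)
{
    (void) arg;
    for (;;) {
        pthread_mutex_lock(&lock);
        /* Re-check the condition on wakeup: guards against
         * spurious wakeups. */
        while (count == 0 && !shutdown_flag)
            pthread_cond_wait(&notify, &lock);
        if (shutdown_flag && count == 0) {  /* queue drained: exit */
            pthread_mutex_unlock(&lock);
            break;
        }
        count--;                            /* grab one task */
        pthread_mutex_unlock(&lock);
        done++;  /* "run" the task; single worker, so no race */
    }
    return NULL;
}

int run_pool(int tasks)
{
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    for (int i = 0; i < tasks; i++) {
        pthread_mutex_lock(&lock);
        count++;
        pthread_cond_signal(&notify);       /* wake a waiting worker */
        pthread_mutex_unlock(&lock);
    }
    pthread_mutex_lock(&lock);
    shutdown_flag = 1;                      /* graceful: drain first */
    pthread_cond_broadcast(&notify);
    pthread_mutex_unlock(&lock);
    pthread_join(tid, NULL);
    return done;
}
```

Because count is only updated under the lock and re-checked in the while loop, a signal sent before the worker reaches cond_wait is never lost.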
Having studied the code and gotten a rough picture of the thread-pool workflow, I started implementing.
Create the thread pool in main.c:
threadpool_t *pool = threadpool_create(THREAD_NUM, POOL_SIZE, 0);
for (int i = 0; i < THREAD_NUM; i++) {
    app[i] = new_append_a(map + MAX_LAST_NAME_SIZE * i,
                          map + fs, entry_pool + i, i);
    threadpool_add(pool, &append, (void *) app[i], 0);
}
threadpool_destroy(pool, 0);
Running it, the program did not go as smoothly as I imagined; it produced a Segmentation fault…
Using gdb to locate the problem:
Thread 1 "phonebook_opt" received signal SIGSEGV, Segmentation fault.
__strncasecmp_l_avx () at ../sysdeps/x86_64/multiarch/strcmp-sse42.S:165
165 ../sysdeps/x86_64/multiarch/strcmp-sse42.S: No such file or directory.
(gdb) backtrace
#0 __strncasecmp_l_avx () at ../sysdeps/x86_64/multiarch/strcmp-sse42.S:165
#1 0x00000000004019d7 in findName (lastname=0x7fffffffdc20 "zyxel", pHead=0x7ffff6a97028)
at phonebook_opt.c:14
#2 0x000000000040177a in main (argc=1, argv=0x7fffffffdd38) at main.c:144
(gdb) frame 1
#1 0x00000000004019d7 in findName (lastname=0x7fffffffdc20 "zyxel", pHead=0x7ffff6a97028)
at phonebook_opt.c:14
14 if (strncasecmp(lastname, pHead->lastName, len) == 0
(gdb) print pHead->lastName
$1 = 0x0
It looks like append did not build the lists properly. After digging for a while, I found that the problem lies in destroying the thread pool. Looking at threadpool_destroy:
pool->shutdown = (flags & threadpool_graceful) ?
    graceful_shutdown : immediate_shutdown;

/* Wake up all worker threads */
if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
    (pthread_mutex_unlock(&(pool->lock)) != 0)) {
    err = threadpool_lock_failure;
    break;
}

/* Join all worker thread */
for (i = 0; i < pool->thread_count; i++) {
    if (pthread_join(pool->threads[i], NULL) != 0) {
        err = threadpool_thread_failure;
    }
}
Here we can see that if pool->shutdown is set to immediate_shutdown, the check below in threadpool_thread breaks out of the loop right away; in other words, threads that are still waiting for work are destroyed immediately:
static void *threadpool_thread(void *threadpool)
{
    ...
    if ((pool->shutdown == immediate_shutdown) ||
        ((pool->shutdown == graceful_shutdown) &&
         (pool->count == 0))) {
        break;
    }
    ...
So setting pool->shutdown to graceful_shutdown leaves the threads that are still waiting for work unaffected: they exit only once the queue is empty.
Comparison across different numbers of threads:
Since the cache-miss rate at Thread Num = 8 was surprisingly high here, I added an extra cache-miss comparison chart. The number of threads affects the size of the pool structure, so it has a small effect on performance:
Lock-free guarantees that at least one thread in the program is always making progress, so one thread dying cannot stop the others from continuing. For example, if a thread that grabbed a lock dies without releasing it, no other thread can ever enter the critical section, and the whole program blocks.
Next, on to the idea behind a lock-free thread pool:
The thread pool used above has a single work queue, so all threads compete for the shared resources (e.g. every worker waiting for jobs on the same queue). Splitting it into multiple queues puts different threads on different work queues, avoiding some of that contention over shared resources.
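The multiple-queue idea can be modeled in a few lines: one ring buffer per worker plus a round-robin dispatcher, so producers for different workers touch disjoint memory. The sizes and names below are toy values loosely modeled on LFTPool, not its actual code:

```c
#define NUM_THREADS 4
#define QUEUE_SIZE 8

typedef struct {
    int in, out;             /* ring-buffer put/get offsets */
    int queue[QUEUE_SIZE];   /* this worker's private work queue */
} worker_t;

typedef struct {
    int next;                /* round-robin dispatch cursor */
    worker_t workers[NUM_THREADS];
} pool_t;

/* Dispatch: each task goes to exactly one thread's queue, so two
 * dispatches to different workers never contend on the same slots. */
int add_work(pool_t *p, int task)
{
    worker_t *w = &p->workers[p->next];
    p->next = (p->next + 1) % NUM_THREADS;  /* schedule round-robin */
    if (w->in - w->out == QUEUE_SIZE)
        return -1;                          /* this worker's queue full */
    w->queue[w->in % QUEUE_SIZE] = task;
    w->in++;
    return 0;
}
```

With a single dispatcher thread and one consumer per queue, in is written only by the dispatcher and out only by the owner, which is what makes the lock-free single-producer/single-consumer design possible.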
Referring to the xhjcehust/LFTPool code, start with the structures:
typedef struct tpool_work {
    void (*routine)(void *);
    void *arg;
    struct tpool_work *next;
} tpool_work_t;

/* Structure for each thread */
typedef struct {
    pthread_t id;
    int shutdown;
#ifdef DEBUG
    int num_works_done;
#endif
    unsigned int in;   /* offset from start of work_queue where to put work next */
    unsigned int out;  /* offset from start of work_queue where to get work next */
    tpool_work_t work_queue[WORK_QUEUE_SIZE];
} thread_t;

/* thread pool */
typedef struct tpool tpool_t;
typedef thread_t* (*schedule_thread_func)(tpool_t *tpool);
struct tpool {
    int num_threads;
    thread_t threads[MAX_THREAD_NUM];
    schedule_thread_func schedule_thread;
};
The main functions:
- void *tpool_init(int num_threads) — creates the pool and its worker threads; each worker's work_queue is used as a ring buffer
- static int wait_for_thread_registration(int num_expected) — waits until the expected number of workers have registered
- int tpool_add_work(void *pool, void(*routine)(void *), void *arg) — dispatches a task into one worker's work queue (put work)
- static void *tpool_thread(void *arg) — the per-thread worker loop (get work)
- static tpool_work_t *get_work_concurrently(thread_t *thread) — uses CAS so that each thread takes its own task, ensuring the update of out stays synchronized as work leaves the queue
Last comes destroy, which checks that every queue is empty, making sure all workers have finished their work.
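The consumer-side CAS can be sketched with C11 atomics: a worker claims a slot by atomically bumping out, retrying if another thread (e.g. a work stealer) advanced it first. This is a simplification of get_work_concurrently, not the original code:

```c
#include <stdatomic.h>
#include <stdbool.h>

#define WORK_QUEUE_SIZE 8

typedef struct {
    _Atomic unsigned in;   /* producer offset */
    _Atomic unsigned out;  /* consumer offset */
    int work_queue[WORK_QUEUE_SIZE];
} thread_queue_t;

/* Claim one task: CAS out -> out + 1. If another thread advanced out
 * first, the CAS fails, out is reloaded, and we retry. */
bool get_work(thread_queue_t *q, int *task)
{
    unsigned out = atomic_load(&q->out);
    do {
        if (out == atomic_load(&q->in))
            return false;                    /* queue empty */
        *task = q->work_queue[out % WORK_QUEUE_SIZE];
    } while (!atomic_compare_exchange_weak(&q->out, &out, out + 1));
    return true;
}
```

The task must be read before the CAS: only the thread whose CAS succeeds may use the value it copied, since the slot is logically freed the moment out advances.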
Results with the Lock-Free Thread Pool above:
size of entry : 24 bytes
execution time of append() : 0.097705 sec
execution time of findName() : 0.003397 sec
The performance is not as good as expected; it is actually worse than the earlier mutex version.
Finally, since I don't yet really understand the signal-related part, and I think that code still has room for improvement, I'll hold off on the comparison across different thread counts.
Linux defines 64 signals, divided into two classes: reliable signals and unreliable signals. The classic pattern for blocking until a condition is signaled:
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGXX);

/* Block SIGXX so it cannot be delivered between testing CONDITION
   and calling sigsuspend (no lost wakeup) */
sigprocmask(SIG_BLOCK, &newmask, &oldmask);

while (!CONDITION)
    sigsuspend(&zeromask);  /* atomically unblock SIGXX and wait */

sigprocmask(SIG_SETMASK, &oldmask, NULL);
References:
TempoJiJi
phonebook-concurrent
sysprog21