# 2016q3 Homework2 (phonebook-concurrent)
contributed by `TempoJiJi`

## Goals
* Learn the concurrency programming introduced in week 2
* Learn POSIX Thread
* Learn performance-analysis tools
* Practice code refactoring
* Explore applications of clz

## Development environment
* Ubuntu 16.04 LTS
* L1d cache: 32K
* L1i cache: 32K
* L2 cache: 256K
* L3 cache: 3072K
* Architecture: x86_64
* CPU op-mode(s): 32-bit, 64-bit
* Byte Order: Little Endian
* CPU(s): 4
* Model name: Intel(R) Core(TM) i5-4200U CPU @ 1.60GHz

---

First, test the program's performance and correctness:

```shell
size of entry : 24 bytes
execution time of append() : 0.006030 sec
execution time of findName() : 0.003554 sec
```

Verify correctness:

```C=
#ifdef OPT
    show_entry(e);
#endif
```

Redirect the output to a file:

```shell
$ ./phonebook_opt > output
```

It turns out `output` does not match `words.txt` under `dictionary`: a few words are missing. A quick `checker.c` shows that the first 4 words are gone.

Look at the place in main.c where the 4 lists are linked together:

```c
for (int i = 0; i < THREAD_NUM; i++) {
    if (i == 0) {
        pHead = app[i]->pHead->pNext;
        dprintf("Connect %d head string %s %p\n", i,
                app[i]->pHead->pNext->lastName, app[i]->ptr);
    } else {
        etmp->pNext = app[i]->pHead->pNext;
        dprintf("Connect %d head string %s %p\n", i,
                app[i]->pHead->pNext->lastName, app[i]->ptr);
    }
    etmp = app[i]->pLast;
    dprintf("Connect %d tail string %s %p\n", i,
            app[i]->pLast->lastName, app[i]->ptr);
    dprintf("round %d\n", i);
}
```

Here `pHead = app[i]->pHead->pNext` and `etmp->pNext = app[i]->pHead->pNext` link from `head->next` instead of `head`, so the 4 head entries are lost. Changing them to `pHead = app[i]->pHead` and `etmp->pNext = app[i]->pHead` fixes the result.

---

## Code Refactoring

I split the `for` in `append` into a while loop, because the original for loop carried too many expressions, and removed the `nthread` variable in favor of `THREAD_NUM`, which also shrinks the structure:

```C=
char *i = app->ptr;
entry *j = app->entryStart;

while (i < app->eptr) {
    app->pLast->pNext = j;
    app->pLast = app->pLast->pNext;

    app->pLast->lastName = i;
    app->pLast->pNext = NULL;

    i += MAX_LAST_NAME_SIZE * THREAD_NUM;
    j += THREAD_NUM;
}
```

The `pthread_create` in main.c can also be merged into the same loop as `new_append_a`:

```C=
for (int i = 0; i < THREAD_NUM; i++) {
    app[i] = new_append_a(map + MAX_LAST_NAME_SIZE * i,
                          map + fs, i, entry_pool + i);
    pthread_create(&tid[i], NULL, (void *)
                   &append, (void *) app[i]);
}
```

This way each thread can be created as soon as its `app[i]` is ready, instead of waiting until all `app[i]` are prepared. `new_append_a` takes many arguments, so I split the call across 2 lines.

Finally, the part of main.c that links all the lists can handle `app[0]` separately, removing one branch from the for loop:

```C=
entry *etmp;
pHead = app[0]->pHead;
etmp = app[0]->pLast;

for (int i = 1; i < THREAD_NUM; i++) {
    etmp->pNext = app[i]->pHead;
    etmp = app[i]->pLast;
}
```

I am not sure whether all the `dprintf` calls can simply be deleted here; they only print debugging information and do not affect program behavior.

main.c also has many scattered initializations; gathering them in one place makes the whole program tidier:

```C=
#ifndef OPT
...
#else
#include "file.c"
#include "debug.h"
#include <fcntl.h>
#define ALIGN_FILE "align.txt"

file_align(DICT_FILE, ALIGN_FILE, MAX_LAST_NAME_SIZE);
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
off_t fs = fsize(ALIGN_FILE);
#endif
```

For example, the block above can be moved right before `append`.

---

## Notes on Important Reading

### [An Introduction to Lock-Free Programming](http://preshing.com/20120612/an-introduction-to-lock-free-programming)

Reference: [Lock-Free 编程是什么?](http://www.cnblogs.com/gaochundong/p/lock_free_programming.html)

![](https://i.imgur.com/QtQfwON.png)

A part of a program is lock-free when it satisfies all of the following:
* multiple threads
* shared memory
* threads cannot block each other: even if one thread is suspended or dies, the rest of the program can still make progress

==Lock-Free Programming Techniques==:

![](https://i.imgur.com/FniTgyR.png)

To satisfy the non-blocking condition of lock-free programming, a family of techniques is available: atomic operations, memory barriers, avoiding the ABA problem, and so on. The diagram above shows which technique applies in which situation.

* Atomic Read-Modify-Write Operations
  * Atomics are a synchronization mechanism that keeps shared variables consistent without an explicit lock (such as a mutex)
  * "Atomic operations are ones which manipulate memory in a way that appears indivisible. No thread can observe the operation half-complete."
  * [Read-modify-write (RMW)](https://en.wikipedia.org/wiki/Read-modify-write) operations allow you to perform more complex transactions atomically. They're especially useful when a lock-free algorithm must support multiple writers, because when multiple threads attempt an RMW on the same address, they'll effectively line up in a row and execute those operations one-at-a-time.
* Compare-And-Swap Loops
  * "Often, programmers perform compare-and-swap in a loop to repeatedly attempt a transaction." A CAS loop retries one transaction in three steps:
    1. copy the shared variable into a local variable
    2. do some speculative work to compute the new value
    3. if the memory location still holds the original value, publish the new value into it; otherwise repeat

```C=
void LockFreeQueue::push(Node* newHead)
{
    for (;;) {
        // Copy a shared variable (m_Head) to a local.
        Node* oldHead = m_Head;

        // Do some speculative work, not yet visible to other threads.
        newHead->next = oldHead;

        // Next, attempt to publish our changes to the shared variable.
        // If the shared variable hasn't changed, the CAS succeeds and we return.
        // Otherwise, repeat.
        if (_InterlockedCompareExchange(&m_Head, newHead, oldHead) == oldHead)
            return;
    }
}
```

### Sequential Consistency

* "Sequential consistency means that all threads agree on the order in which memory operations occurred, and that order is consistent with the order of operations in the program source code." In other words, every thread observes memory operations in a single global order that matches program order.

### Concurrency vs. Parallelism

* Concurrency: two or more tasks make progress interleaved on one CPU, and each task's share of execution time differs
  * "Concurrency solves the problem of having scarce CPU resources and many tasks. So, you create threads or independent paths of execution through code in order to share time on the scarce resource. Up until recently, concurrency has dominated the discussion because of CPU availability."
  * The scarce resource is time-shared among threads (tasks), each of which runs independently; execution times differ because each thread gets a different slice of CPU time
* Parallelism: one problem is split into parts that run simultaneously on multiple CPUs; think of multiple threads starting and running at the same time
  * "Parallelism solves the problem of finding enough tasks and appropriate tasks (ones that can be split apart correctly) and distributing them over plentiful CPU resources. Parallelism has always been around of course, but it's coming to the forefront because multi-core processors are so cheap."
### Condition Variables

* Suppose thread A must wait until a variable `cond` becomes true before it can continue, and `cond` is controlled by thread B. After B sets `cond` to true, it can signal the waiting A to wake it up.
* Without condition variables, A would have to keep polling `cond`, which wastes a lot of CPU time, and there is no guarantee about which thread currently controls `cond`.

![](https://i.imgur.com/ChpEMkM.png)

---

## Modifying OPT to Implement a Thread Pool

References:
* [wiki](https://en.wikipedia.org/wiki/Thread_pool)
* [Multithreaded Programming Guide](http://docs.oracle.com/cd/E19253-01/816-5137/ggedn/index.html)
* [C 的 Thread Pool 筆記](http://swind.code-life.info/posts/c-thread-pool.html)
* [source](http://people.clarkson.edu/~jmatthew/cs644.archive/cs644.fa2001/proj/locksmith/code/ExampleTest/threadpool.c)

Start with a basic thread pool (not lock-free), since I am not yet familiar with thread pools.

First, study how [threadpool-mbrossard](https://github.com/mbrossard/threadpool) implements it:

```C=
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    ...
    /* Initialize pool */
    ...
    /* Start worker threads */
    for (i = 0; i < thread_count; i++) {
        if (pthread_create(&(pool->threads[i]), NULL,
                           threadpool_thread, (void*)pool) != 0) {
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }
    return pool;
    ...
}
```

This builds the thread pool: after some initialization, worker threads are created and enter `threadpool_thread` to wait for tasks.

```C=
static void *threadpool_thread(void *threadpool)
{
    ...
    for (;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        if ((pool->shutdown == immediate_shutdown) ||
            ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) {
            break;
        }

        /* Grab our task */
        task.function = pool->queue[pool->head].function;
        task.argument = pool->queue[pool->head].argument;
        pool->head = (pool->head + 1) % pool->queue_size;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));

        /* Get to work */
        (*(task.function))(task.argument);
    }
    ...
    ...
```

A worker first acquires the lock, then waits on the condition variable for work (the signal is sent by `threadpool_add`); once woken up, it dequeues a task and goes off to execute it.

```C=
int threadpool_add(threadpool_t *pool, void (*function)(void *),
                   void *argument, int flags)
{
    ...
    do {
        ...
        /* Add task to queue */
        pool->queue[pool->tail].function = function;
        pool->queue[pool->tail].argument = argument;
        pool->tail = next;
        pool->count += 1;

        /* pthread_cond_broadcast */
        if (pthread_cond_signal(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while (0);
    ...
```

This is where the signal is sent to the threads waiting for tasks in `threadpool_thread`.

---

With the flow of the thread pool understood, start the implementation. Create the thread pool in main.c:

```C=
threadpool_t *pool = threadpool_create(THREAD_NUM, POOL_SIZE, 0);

for (int i = 0; i < THREAD_NUM; i++) {
    app[i] = new_append_a(map + MAX_LAST_NAME_SIZE * i,
                          map + fs, entry_pool + i, i);
    threadpool_add(pool, &append, (void *) app[i], 0);
}

threadpool_destroy(pool, 0);
```

Things did not go as smoothly as imagined: the program hit a `Segmentation fault`. Use gdb to locate the problem:

```shell
Thread 1 "phonebook_opt" received signal SIGSEGV, Segmentation fault.
__strncasecmp_l_avx () at ../sysdeps/x86_64/multiarch/strcmp-sse42.S:165
165     ../sysdeps/x86_64/multiarch/strcmp-sse42.S: No such file or directory.
(gdb) backtrace
#0  __strncasecmp_l_avx () at ../sysdeps/x86_64/multiarch/strcmp-sse42.S:165
#1  0x00000000004019d7 in findName (lastname=0x7fffffffdc20 "zyxel", pHead=0x7ffff6a97028) at phonebook_opt.c:14
#2  0x000000000040177a in main (argc=1, argv=0x7fffffffdd38) at main.c:144
(gdb) frame 1
#1  0x00000000004019d7 in findName (lastname=0x7fffffffdc20 "zyxel", pHead=0x7ffff6a97028) at phonebook_opt.c:14
14          if (strncasecmp(lastname, pHead->lastName, len) == 0
(gdb) print pHead->lastName
$1 = 0x0
```

Apparently `append` did not build the lists properly. After some investigation, the problem turned out to be in destroying the thread pool. Look at `threadpool_destroy`:

```C=
pool->shutdown = (flags & threadpool_graceful) ?
        graceful_shutdown : immediate_shutdown;

/* Wake up all worker threads */
if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
    (pthread_mutex_unlock(&(pool->lock)) != 0)) {
    err = threadpool_lock_failure;
    break;
}

/* Join all worker thread */
for (i = 0; i < pool->thread_count; i++) {
    if (pthread_join(pool->threads[i], NULL) != 0) {
        err = threadpool_thread_failure;
    }
}
```

If `pool->shutdown` is set to `immediate_shutdown` here, the check in `threadpool_thread` breaks out right away, meaning threads still waiting for work get destroyed immediately:

```C=
static void *threadpool_thread(void *threadpool)
{
    ...
    if ((pool->shutdown == immediate_shutdown) ||
        ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) {
        break;
    }
    ...
```

Setting `pool->shutdown` to `graceful_shutdown` therefore no longer kills threads that are still waiting for work.

Comparison across thread counts:

![](https://i.imgur.com/tj9Jocc.png)

The cache-miss rate at Thread Num = 8 is surprisingly high, so here is an extra cache-miss comparison. The thread count affects the size of the pool structure, which in turn affects performance slightly:

![](https://i.imgur.com/9oCnP2y.png)

---

## Lock-Free Thread Pool

### Lock-Free

Lock-freedom guarantees that at least one thread in the program is always making progress, so one stalled process cannot prevent the others from continuing. For example, if a process that holds a lock dies without releasing it, no other process can enter the critical section, and the whole program blocks.

Next, the idea behind a lock-free thread pool:

![](https://i.imgur.com/sOiq2Xx.png)

The previous thread pool has a single work queue, so all threads compete for the shared resource (for example, workers all waiting on the same queue for jobs). Splitting it into multiple queues puts different threads on different work queues, avoiding some of that contention.

Study the code of [xhjcehust/LFTPool](https://github.com/xhjcehust/LFTPool), starting with the structures:

```c=
typedef struct tpool_work {
    void (*routine)(void *);
    void *arg;
    struct tpool_work *next;
} tpool_work_t;

/* Structure for each thread */
typedef struct {
    pthread_t id;
    int shutdown;
#ifdef DEBUG
    int num_works_done;
#endif
    unsigned int in;  /* offset from start of work_queue where to put work next */
    unsigned int out; /* offset from start of work_queue where to get work next */
    tpool_work_t work_queue[WORK_QUEUE_SIZE];
} thread_t;

/* thread pool */
typedef struct tpool tpool_t;
typedef thread_t* (*schedule_thread_func)(tpool_t *tpool);
struct tpool {
    int num_threads;
    thread_t threads[MAX_THREAD_NUM];
    schedule_thread_func
        schedule_thread;
};
```

* `tpool_work_t` is the task each worker executes
* `thread_t` is the per-thread structure
* `tpool_t` is the thread pool; `schedule_thread_func` decides which strategy is used to put work into the queues

### Program flow

```c=
void *tpool_init(int num_threads)
```

* Initializes the thread pool
* The initialization chooses how work is put into the queues; here the `ring-buffer` scheme is used to enqueue work

```c=
static int wait_for_thread_registration(int num_expected)
```

* Makes sure all threads have registered and are ready

```c=
int tpool_add_work(void *pool, void(*routine)(void *), void *arg)
```

* Together with `dispatch`, adds a task to a work queue (put work)

```c=
static void *tpool_thread(void *arg)
```

* This is where workers receive their tasks; the signal handling here still needs more study
* No mutex lock is used anywhere, and no critical section seems to exist, so even if a thread dies here (or anywhere else), the other processes are not blocked

```c=
static tpool_work_t *get_work_concurrently(thread_t *thread)
```

* `get_work_concurrently` uses CAS so that each thread obtains its own task, ensuring the decrement of `out` is synchronized when a task leaves the work queue

Finally, destroy: it checks that every queue is empty, making sure all workers have finished their work.

---

Results with the lock-free thread pool above:

```shell
size of entry : 24 bytes
execution time of append() : 0.097705 sec
execution time of findName() : 0.003397 sec
```

Performance is worse than expected, even worse than the mutex-based version.

Since the signal-related part is still not well understood, and that code seems to have room for improvement, the comparison across thread counts is postponed for now.

---

## Understanding Signals

Linux defines 64 signals, in two classes, reliable and unreliable:

* Unreliable signals: no queueing, so signals can be lost; if the same signal is sent several times, the process may receive it only once
* Reliable signals: queued, never lost; delivered as many times as they are sent

```c=
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGXX);

sigprocmask(SIG_BLOCK, &newmask, &oldmask);

while (!CONDITION)
    sigsuspend(&zeromask);

sigprocmask(SIG_SETMASK, &oldmask, NULL);
```

References:
* [高效线程池之无锁化实现(Linux C)](http://blog.csdn.net/xhjcehust/article/details/45844901)
* [Non-blocking algorithm](https://en.wikipedia.org/wiki/Non-blocking_algorithm)
* [Linux信号(signal)机制](http://gityuan.com/2015/12/20/signal/#section-5zh)
* [阻塞信号](http://docs.linuxtone.org/ebooks/C&CPP/c/ch33s03.html)
* [lock free的理解](http://www.isnowfy.com/understand-to-lock-free/)

###### tags: `TempoJiJi` `phonebook-concurrent` `sysprog21`