# 2016q3 Homework2 (phonebook-concurrent) contributed by <`RayPan`> ## 開發環境 * Ubuntu 16.04 LTS * CPU ``` Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Byte Order: Little Endian CPU(s): 4 On-line CPU(s) list: 0-3 Thread(s) per core: 2 Core(s) per socket: 2 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family: 6 Model: 60 Model name: Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz Stepping: 3 CPU MHz: 2869.015 CPU max MHz: 3400.0000 CPU min MHz: 800.0000 BogoMIPS: 5587.49 Virtualization: VT-x L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 3072K NUMA node0 CPU(s): 0-3 ``` --- ## 開發目標 * 學習第二週提及的 concurrency 程式設計 * 學習 POSIX Thread * 學習效能分析工具 * code refactoring 練習 * 探索 clz 的應用 --- ## POSIX Thread 參考資料: [Getting Started With POSIX Threads](http://www.csie.ntu.edu.tw/~r92094/c++/pthread.txt)、[Pthread程式撰寫](http://blog.xuite.net/nikoung/wretch/154071878-Pthread+%E7%A8%8B%E5%BC%8F%E6%92%B0%E5%AF%AB) 、[冼鏡光的並行計算投影片](http://blog.dcview.com/article.php?a=BTgBYw1qUWIAaQ%3D%3D)、[POSIX threads explained](http://www.ibm.com/developerworks/library/l-posix1/index.html) * #### Parallel vs. Concurrent 1. parallel : 兩件事齊頭並進 2. concurrent:電腦系統中,執行的程式數目通常比CPU(加上它們的核心,core)的數目大很多,因此作業系統的做法是讓這個程式使用CPU很短一段時間、再讓另一個程式使用很短一段時間等等(交錯執行)。因爲每一個程式佔用CPU的時間很短,很快又輪回同一個程式,所以對使用人而言可能完全沒有程式在斷斷續續執行的感覺,我們會覺得動作是連續的。 * #### Process/Thread 一個執行的程式(Process)可以分成幾條執行緒(thread),每一條執行緒處理賦予它的工作;ie,一個行程可以分成三條執行緒,一條負責讀取各種輸入、第二條處理視窗的輸出、第三條從事計算的工作,這三條執行線彼此之間相互協調配合、完成該程式的工作。 ![](https://i.imgur.com/TQq21Bb.jpg) ###### [圖片來源](https://hackpad.com/ep/pad/static/5TOjUJI2rKu) POSIX標準中支援的執行緒函式庫稱為pthread,我們可以透過pthread結構與pthread_create() 函數執行某個函數指標,以建立新的執行緒。 * #### pthread_create 建立一個thread並執行附帶的function ```clike pthread_t mythread; pthread_attr_t mythread_attribute; void thread_function(void *argument); char *some_argument; int pthread_create(&mythread, mythread_attribute, (void *)&thread_function,(void *)&some_argument); ``` 1.參數mythread為pthread的指標,在使用Thread之前必須要先宣告一個pthread_t的變數。 2.參數mythread_attribute為該Thread的屬性,預設是NULL,如果沒有其他特殊的需求直接填入NULL即可。 3.參數thread_function)(void *)為Function pointer,這邊要放入你要執行的Function。 4.參數void *some_argument為Function pointer所要帶的參數。 回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。 * #### pthread_join 暫停目前執行pthread_join的Thread,等到目標Thread執行完畢之後目前執行pthread_join的Thread才會繼續執行。 ```clike int pthread_join (pthread_t thread, void **value_ptr) ``` 1.參數:pthread_t thread為要等待的目標Thread。 2.參數:void **value_ptr用來取得目標Thread的回傳值。 回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。 * 簡單範例程式 ```c=1 /*thread1.c*/ #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <unistd.h> void *thread_function(void *arg) { int i; for ( i=0; i<20; i++ ) { printf("Thread says hi!\n"); sleep(1); } return NULL; } int main(void) { pthread_t mythread; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } exit(0); } ``` 主程式中本身會產生一條執行緒,thread_create又建立一條新執行緒,所以這個程式包含兩條執行緒。 主程式的執行緒繼續執行下一個if(pthread_join....)的判斷 新建的執行緒結束時,會停止並等待被併入或加入其他執行緒。 pthread_create() 會分裂成兩條執行緒,pthread_join() 則會將兩條執行緒合併為一。 在 thread_function() 完成時,主程式的執行緒已經呼叫pthread_join(),當這個情況發生時,主程式的執行緒會 block住(go to sleep) 並等待 thread_function() 完成。 當thread_fuction()完成時,pthread_join()會回傳值,所以現在我們的程式又有一條主執行緒,所以必須好好處理每條新建的執行緒,否則達到系統的最大值執行緒數量限制會造成pthread_create錯誤。 ```clike=1 /*thread2.c*/ #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> int myglobal; void *thread_function(void *arg) { int i,j; for ( i=0; i<20; i++ ) { j=myglobal; j=j+1; printf("."); fflush(stdout); sleep(1); myglobal=j; } return NULL; } int main(void) { pthread_t mythread; int i; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } for ( i=0; i<20; i++) { myglobal=myglobal+1; printf("o"); fflush(stdout); sleep(1); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } printf("\nmyglobal equals %d\n",myglobal); exit(0); } ``` 執行結果 ``` o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo. myglobal equals 21 ``` myglobal起始值為0,主執行緒與新執行緒各將myglobal增加20,照理說應該是40,但輸出21原因為何? 因為兩條執行緒同時執行,當thread_fucntion把j值寫回myglobal,他會複寫主執行緒中的修改。 解決上述問題 加入互斥鎖(mutex) ```c=1 /*thread3.c*/ #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> int myglobal; pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER; void *thread_function(void *arg) { int i,j; for ( i=0; i<20; i++ ) { pthread_mutex_lock(&mymutex); j=myglobal; j=j+1; printf("."); fflush(stdout); sleep(1); myglobal=j; pthread_mutex_unlock(&mymutex); } return NULL; } int main(void) { pthread_t mythread; int i; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } for ( i=0; i<20; i++) { pthread_mutex_lock(&mymutex); myglobal=myglobal+1; pthread_mutex_unlock(&mymutex); printf("o"); fflush(stdout); sleep(1); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } printf("\nmyglobal equals %d\n",myglobal); exit(0); } ``` 兩條執行緒不能同時被一個互斥鎖鎖住,如果執行緒A想要鎖一個mutex然而執行緒B已經鎖住該mutex,A會sleep。 當B釋出這個mutex(透過pthread_mutex_unlock()),A即可鎖住該mutex。 亦即當一個mutex被鎖住,所有想要鎖的執行緒會被queue up。 pthread_mutex_lock()以及pthread_mutex_unlock()確保一個資料結構同時只能被一個執行緒存取,用來保護該資料結構。 * fflush(stdout) : forces a write of all user-space buffered data for the given output or update stream via the stream's underlying write function. The open status of the stream is unaffected. stdout是標準輸出。有時候,我們輸出到stdout的內容不能及時輸出,因為stdout的緩衝區沒有滿或者其他原因,fflush(stdout)就是強迫把stdout內容輸出並清空stdout。 --- ## 記憶體映射函數 mmap 參考資料:[記憶體映射函數 mmap](http://welkinchen.pixnet.net/blog/post/41312211-%E8%A8%98%E6%86%B6%E9%AB%94%E6%98%A0%E5%B0%84%E5%87%BD%E6%95%B8-mmap-%E7%9A%84%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95) * #### 簡介 把文件內容映射到一段記憶體上(準確說是虛擬記憶體上),通過對這段記憶體的讀取和修改,實現對文件的讀取和修改。 * #### 函數 ```clike void *mmap(void *start,size_t length, int prot, int flags, int fd, off_t offsize); ``` 1.參數start:指向欲映射的核心起始位址,通常設為NULL,代表讓系統自動選定位址,核心會自己在行程位址空間中選擇合適的位址建立映射。 映射成功後返回該位址。如果不是NULL,則給核心一個提示,應該從什麼位址開始映射,核心會選擇start之上的某個合適的位址開始映射。 建立映射後,真正的映射位址通過返回值可以得到。 2.參數length:代表映射的大小。將文件的多大長度映射到記憶體。 3.參數prot:映射區域的保護方式。 可以為以下幾種方式的組合: `PROT_EXEC` 映射區域可被執行 `PROT_READ` 映射區域可被讀取 `PROT_WRITE` 映射區域可被寫入 `PROT_NONE` 映射區域不能存取 4.參數flags:影響映射區域的各種特性。在調用mmap()時必須要指定MAP_SHARED 或MAP_PRIVATE。 MAP_FIXED 如果參數start所指的位址無法成功建立映射時,則放棄映射,不對位址做修正。通常不鼓勵用此旗標。 MAP_SHARED 允許其他映射該文件的行程共享,對映射區域的寫入數據會複製回文件。 MAP_PRIVATE 不允許其他映射該文件的行程共享,對映射區域的寫入操作會產生一個映射的複製(copy-on-write),對此區域所做的修改不會寫回原文件。 MAP_ANONYMOUS 建立匿名映射。此時會忽略參數fd,不涉及文件,而且映射區域無法和其他進程共享。 MAP_DENYWRITE 只允許對映射區域的寫入操作,其他對文件直接寫入的操作將會被拒絕。 MAP_LOCKED 將映射區域鎖定住,這表示該區域不會被置換(swap)。 5.參數fd:由open返回的文件描述符,代表要映射到核心中的文件。如果使用匿名核心映射時,即flags中設置了MAP_ANONYMOUS,fd設為-1。 有些系統不支持匿名核心映射,則可以使用fopen打開/dev/zero文件,然後對該文件進行映射,可以同樣達到匿名核心映射的效果。 6.參數offset:從文件映射開始處的偏移量,通常為0,代表從文件最前方開始映射。 offset必須是分頁大小的整數倍(在32位體系統結構上通常是4K)。 * #### 步驟 1.用open系統調用打開文件, 並返回描述符fd. 2.用mmap建立記憶體映射, 並返回映射開始的地址指標start. 3.對映射(文件)進行各種操作 4.用munmap(void *start, size_t lenght)關閉記憶體映射. 5.用close系統調用關閉文件fd. --- ## 程式理解 首先將原始字典檔`DICT_FILE`輸出為一個「每筆資料長度為MAX_LAST_NAME_SIZE,剩餘補`\0`」的align.txt檔。 利用open函式打開文件檔align.txt,存取到fd ```c=55 int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK); ``` O_RDONLY:以只能讀取方式打開文件 O_NONBLOCK 以不可阻斷的方式打開文件,也就是無論有無數據讀取或等待,都會立刻返回行程之中。 利用mmap函式將fd映射到指標map所指到的記憶體空間。 ```c=78 char *map = mmap(NULL, fs, PROT_READ, MAP_SHARED, fd, 0); ``` 由於程式切成THREAD_NUM個執行緒執行 以下將他們接起來 ```c=114 for (int i = 0; i < THREAD_NUM; i++) { if (i == 0) { pHead = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } else { etmp->pNext = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } etmp = app[i]->pLast; dprintf("Connect %d tail string %s %p\n", i, app[i]->pLast->lastName, app[i]->ptr); dprintf("round %d\n", i); } ``` `assert(argc == 4 && "./main orgfName modfName Padded")`:若符合條件式則程式可以繼續執行;否則會秀出維護錯誤訊息的字串,並結束程式。 --- ## 使用Thread Pool達到效能改善 參考資料:[基於pthread實現的簡單threadpool](http://blog.csdn.net/jcjc918/article/details/50395528)、[wikipedia](https://en.wikipedia.org/wiki/Thread_pool) * ### Threadpool 有一組預先生成的thread,有個管理員來管理和調度這些thread,方法為管理員管理一個任務佇列(job queue),如果接收到新的任務就將其加到佇列尾,每個thread盯著佇列,如果佇列不是空的,就去佇列頭拿一個任務來處理(每個任務只能被一個thread拿到),處理完了就繼續去佇列取任務。 如果沒有任務,thread就會休眠,直到任務佇列不是空的。 如果這個管理員夠聰明,他可能會在沒有任務或任務少的時候,減少thread數,任務過多時增加thread數,實現資源的動態管理。 ![](https://i.imgur.com/P3M4JPO.jpg) Thread Pool 本身,他必須存放所有的 Thread 與任務佇列。 ### Thread pool 簡單實現 在建立threadpool的時候,預先指定thread的數量,然後去job queue取添加進來的task進行處理就好。 主要定義兩個結構 1.`threadpool_task_t` (in threadpool.c) 用於保存一個等待執行的task,包含要執行的函數及其參數。 ```c=53 typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; ``` 2.`threadpool_t` (in threadpool.c) ```c=73 struct threadpool_t { pthread_mutex_t lock; //互斥鎖 pthread_cond_t notify; //條件變數 pthread_t *threads; //執行續陣列的起始指標 threadpool_task_t *queue; //任務佇列的起始指標 int thread_count; //執行緒數量 int queue_size; //任務佇列長度 int head; //目前任務佇列頭 int tail; //目前任務佇列尾 int count; //目前等待執行的任務數 int shutdown; //threadpool當前狀態是否為關閉 int started; //正在執行的thread數 }; ``` 函數 ###### 對外接口 * `threadpool_create` :建立threadpool。 ```c=91 threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) ``` * `threadpool_add` :添加需要執行的任務。第二個參數為對應函數指標,第三個函數為對應函數參數。 ```c=149 int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) ``` * `threadpool_destory` :銷毀存在的threadpool,若flad=1 為graceful結束,即等待全部做完才結束;若flag=0為immediate結束,即立即結束。 ```c=199 int threadpool_destroy(threadpool_t *pool, int flags) ``` ###### 內部輔助函數 * `threadpool_free`:釋放threadpool所申請的記憶體空間。 ```c=243 int threadpool_free(threadpool_t *pool) ``` * `threadpool_thread`:thread pool每個thread所執行的函數。 ```c=266 static void *threadpool_thread(void *threadpool) ``` --- 程式執行後效能比較圖 ![](https://i.imgur.com/JEJhUPJ.png) --- ## 筆記 參考資料: [Concurrency系列](http://enginechang.logdown.com/posts/784600-concurrency-series-1-the-road-to-understand-concurrency) * #### Evaluation求值 包括value computation:對一串運算式計算之結果;side effect:對物件修改的狀態。 * #### Sequenced-before 對同一個thread下,求值順序關係的描述。 * 若A sequenced before B: 代表A求值會先完成才會對B求值。 * `i++`後置運算子,value computation會先於side effect。 * `++i`前置運算子,則相反。 * #### Synchronizes-With 發生在兩個不同thread間的同步行為,當A synchronized-with B的時,代表A對記憶體操作的效果,對於B是可見的。而A和B是兩個不同的thread的某個操作。 * #### Sequential Consistency 1.對於每個獨立的處理單元,執行時都維持程式的順序(Program Order) 2.整個程式以某種順序在所有處理器上執行 --- 作業詳細資訊[A06:phonebook-concurrent](https://hackmd.io/s/rJsgh0na) ###### tags `RayPan` `A06:phonebook-concurrent`