# 2016q3 Homework3 (mergesort-concurrent)
contributed by `<vic85821>`

## Goals
* Serve as a showcase for [concurrency](/s/H10MXXoT)
* Learn POSIX Thread programming, especially [synchronization objects](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html)
* Build the infrastructure for later performance analysis and scalability studies
* Learn code-quality analysis and the related development tools

## POSIX Thread
First, get a solid grasp of POSIX threads and of the mechanisms for synchronizing them.
* condition variable
    * allows threads to synchronize based on the actual value of data
    * always used together with a mutex lock
    * declared as `pthread_cond_t`
    * must be initialized before use
        * static: `pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;`
        * dynamic: via `pthread_cond_init()`
            * example: `pthread_cond_init(&count_threshold_cv, NULL);`
* mutex
    * implements synchronization by controlling thread access to data
    * synchronization is done with `pthread_mutex_lock` and `pthread_mutex_unlock`
        * before using the shared resource, call `pthread_mutex_lock` so that no other thread can acquire it at the same time
        * when finished, release it with `pthread_mutex_unlock`

![](http://i.cmpnet.com/ddj/blogs/2011/06/inheritance.png)

## Thread Pool
To balance the load across threads, the program is built on a thread pool. As before, let's first work out how a thread pool operates.

### task
A job/task in the thread pool tells a thread what work to do.
```c==
typedef struct {
    void (*function)(void *);
    void *argument;
} threadpool_task_t;
```
* function pointer: the work the thread should perform
* argument: the argument passed to `function`

### thread pool
![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_VJmq0R0ILi6_p.537916_1459234875775_001.PNG)
```c==
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    pthread_t *threads;
    threadpool_task_t *queue;
    int thread_count;
    int queue_size;
    int head;
    int tail;
    int count;
    int shutdown;
    int started;
};
```
* threads: pointer to the array of `pthread_t` that records every worker thread
* head, tail: indices of the front and back of the task queue, which is used as a ring buffer
* queue: holds the tasks waiting to be executed
* count: number of tasks currently waiting in the queue
* started: number of worker threads still running

```c==
/* Worker thread entry point */
static void *threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    for(;;) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from `pthread_cond_wait`(), we own the lock.
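         */
        while((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        if((pool->shutdown == immediate_shutdown) ||
           ((pool->shutdown == graceful_shutdown) &&
            (pool->count == 0))) {
            break;
        }

        /* Grab our task */
        task.function = pool->queue[pool->head].function;
        task.argument = pool->queue[pool->head].argument;
        pool->head += 1;
        pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock(&(pool->lock));

        /* Get to work */
        (*(task.function))(task.argument);
    }

    pool->started--;

    pthread_mutex_unlock(&(pool->lock));
    pthread_exit(NULL);
    return(NULL);
}
```
* Each thread first competes for the pool's mutex; the thread that gets it holds the lock so that no other thread can touch the queue at the same time.
* If the queue has no task, the thread waits until a task arrives or the pool is shut down (`pthread_cond_wait` releases the mutex while waiting and re-acquires it before returning).
* shutdown
    * immediate_shutdown: stop the thread pool right away
    * graceful_shutdown: stop only after the task queue has been drained
* Once the queue is non-empty, the thread takes the task at `head`, releases the mutex, and only then runs the task, so other workers can dequeue while it works.

## Lock-free Thread Pool
Locks have a significant performance cost, so we would rather not coordinate the threads with locks. The role of the condition variable can be taken over by a semaphore or by signals. The main directions for improvement are:
* avoid sharing resources, so that no lock is needed
    * give each thread its own task queue
* the contention between the main thread adding tasks and a worker thread removing them can be resolved with a **ring buffer**
* a condition variable exists essentially to solve inter-thread communication; it can be implemented with a semaphore, or with signals as in the snippet below

```c==
/* SIGXX and CONDITION are placeholders for the notification signal and
   the awaited predicate. In a multithreaded program POSIX leaves
   sigprocmask() unspecified; pthread_sigmask() takes the same arguments. */
sigset_t zeromask, newmask, oldmask;

sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGXX);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);  /* block SIGXX before testing CONDITION */
while (!CONDITION)
    sigsuspend(&zeromask);                   /* atomically unblock signals and sleep */
sigprocmask(SIG_SETMASK, &oldmask, NULL);    /* restore the original mask */
```
>> Off to a slow start? [name=jserv]

**Requires `#include <signal.h>`**
* `sigemptyset()`: initializes the signal set given by `set` to empty, with all signals excluded from the set.
* `sigfillset()`: initializes `set` to full, including all signals.
* `sigaddset()` and `sigdelset()`: add and delete signal `signum` from `set`, respectively.
* `int sigprocmask(int how, const sigset_t *set, sigset_t *oldset);`: fetches and/or changes the signal mask of the calling thread.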
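* `int sigsuspend(const sigset_t *mask);`: temporarily replaces the signal mask with `mask` and waits for a signal.

[reference](https://linux.die.net/man/3/sigemptyset)

## Semaphore
Picture a single-track stretch of railway on which only one train may run at a time. A semaphore coordinates the trains that want to use this track: a train must wait until the signal shows "clear" before entering; once on the track it must switch the signal to "stop" so that no other train enters; and when it leaves it must switch the signal back so that the next train may enter.

A semaphore can record how many units of a particular resource are still available. Processes or threads use it to access shared resources safely; when the resource is exhausted, they must wait until some unit is released.

There are two kinds of semaphore: binary semaphores and counting semaphores.
* binary semaphore
    * can only be 0 or 1; conceptually similar to a mutex
        * a mutex has an owner: other threads can acquire it only after the owner releases it
        * a binary semaphore likewise admits only one process/thread at a time, but has no notion of ownership
* counting semaphore
    * its upper bound is `SEM_VALUE_MAX` (semvmx)
    * also called a general semaphore

A semaphore supports two operations, each of them atomic (all or nothing):
* V operation
    * V() increments the semaphore by 1; also called signal, `sem_post()` in POSIX
    * releases a resource
* P operation
    * P() decrements the semaphore by 1; also called wait, `sem_wait()` in POSIX
    * acquires a resource
    * the value must be positive before it is decremented (otherwise the caller blocks), so it never becomes negative

### Usage
* A POSIX semaphore is a standard counting semaphore: `sem_post()` performs the V operation and `sem_wait()` the P operation.
* `limits.h` defines the maximum value `SEM_VALUE_MAX`; POSIX only guarantees that it is at least `_POSIX_SEM_VALUE_MAX`
    * `#define _POSIX_SEM_VALUE_MAX 32767`
* A semaphore is not owned by any single thread
    * one thread may wait while another signals, but the OS implementation lets only one of these operations take effect at a time, which keeps the semaphore consistent
* The initial value must be set when a semaphore is created
* Semaphore calls return 0 on success and -1 on failure (setting `errno`)
* sem_init(): initializes a semaphore
    * `int sem_init(sem_t *sem, int pshared, unsigned int value);`
        * first argument: address of the semaphore
        * second argument: whether the semaphore is shared between processes (0 means it is shared only among the threads of the calling process)
        * third argument: the initial value
* sem_post(): increments the semaphore by 1
    * `int sem_post(sem_t *sem);`
* sem_wait(): decrements the semaphore by 1, blocking while it is 0
    * `int sem_wait(sem_t *sem);`
* sem_trywait(): tries to decrement the semaphore by 1; instead of blocking when it is 0, it fails with `EAGAIN`
    * `int sem_trywait(sem_t *sem);`
* sem_destroy(): destroys a semaphore
    * `int sem_destroy(sem_t *sem);`

[reference](http://www.syscom.com.tw/ePaper_Content_EPArticledetail.aspx?id=213&EPID=176&j=5&HeaderName=%E6%8A%80%E8%A1%93%E5%88%86%E4%BA%AB)

## Sorting a Linked List
**recursive bubble sort**
```c==
typedef struct list {
    int data;
    struct list *next;
} LIST;

/* sort() calls move() before its definition, so declare it first */
LIST *move( LIST *x );

LIST *sort( LIST *start )
{
    if( start == NULL ) return NULL;

    start->next = sort(start->next);
    if( start->next != NULL &&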
        start->data > start->next->data ) {
        start = move( start );
    }
    return start;
}

LIST *move( LIST *x )
{
    LIST *n, *p, *ret;

    p = x;
    n = x->next;
    ret = n;
    while( n != NULL && x->data > n->data ) {
        p = n;
        n = n->next;
    }
    /* we now move the top item between p and n */
    p->next = x;
    x->next = n;
    return ret;
}
```
`sort()` recurses all the way to the end of the list, so sorting starts from the tail. As each call returns, the head of the current sublist is moved to its place in the already-sorted remainder, until the whole list is sorted. Because each step inserts one node into a sorted tail, the procedure effectively behaves like an insertion sort.

[reference](http://faculty.salina.k-state.edu/tim/CMST302/study_guide/topic7/bubble.html)

## Case Study
### Tools
* **Mutrace**
    * install: `sudo apt-get install mutrace`
    * e.g.: `(for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8`

```
mutrace: Showing statistics for process sort (pid 6081).
mutrace: 3 mutexes used.

Mutex #0 (0x0x6ad9b0) first referenced by:
        /usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fa70f9ea4b2]
        ./sort(tqueue_init+0x38) [0x401277]
        ./sort(tpool_init+0x6a) [0x4014cc]
        ./sort(main+0x161) [0x401c74]
        /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fa70f421830]

Mutex #1 (0x0x7fa70cff0860) first referenced by:
        /usr/lib/mutrace/libmutrace.so(pthread_mutex_lock+0x49) [0x7fa70f9ea6b9]
        /lib/x86_64-linux-gnu/libgcc_s.so.1(_Unwind_Find_FDE+0x2c) [0x7fa70cdecfec]
        [(nil)]

Mutex #2 (0x0x603120) first referenced by:
        /usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fa70f9ea4b2]
        ./sort(main+0x11d) [0x401c30]
        /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fa70f421830]

mutrace: Showing 3 most contended mutexes:

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       0       76       14        7        0.023        0.000        0.002 Mx.--.
       1       20       10        4        0.008        0.000        0.002 M-.--.
       2       13        4        0        0.004        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC

mutrace: Note that the flags column R is only valid in --track-rt mode!

mutrace: Total runtime is 1.006 ms.
```

* **GraphViz**
    * a drawing tool driven by a textual description. Although it is called a drawing tool, what it draws are not the cartoons or logos one might imagine, but graphs in the mathematical sense
    * [node shapes reference](http://www.graphviz.org/doc/info/shapes.html)

e.g.
```graphviz
digraph {
    "Start process"[shape="box", style=rounded];
    "Is it the end task?"[shape="diamond"];
    "End process"[shape="box", style=rounded];

    "Start process"->"Fetch task"->"Is it the end task?"
    "Is it the end task?"->"Re-send end task"[label="yes"]
    "Re-send end task"->"End process"
    "Is it the end task?"->"Execute task"[label="no"]
    "Execute task"->"Fetch task";
}
```

### Modifications
* Turn the program into a mergesort on strings
    * `val_t` becomes a char array whose length follows the phonebook assignment's `MAX_LAST_NAME_SIZE` (named `MAX_LAST_NAME_LEN` in the code)

```c==
typedef char val_t[MAX_LAST_NAME_LEN];
```
merge_list()
```c==
/* needs <string.h> for strcmp() and <stdint.h> for intptr_t */
while (a->size && b->size) {
    /* pick the list whose head sorts first (ties go to b) without a branch */
    int a_first = strcmp(a->head->data, b->head->data) < 0;
    llist_t *small = (llist_t *) ((intptr_t) a * a_first +
                                  (intptr_t) b * !a_first);
    if (current) {
        current->next = small->head;
        current = current->next;
    } else {
        _list->head = small->head;
        current = _list->head;
    }
    small->head = small->head->next;
    --small->size;
    ++_list->size;
    current->next = NULL;
}
```
* Design an automated test

```==
CHECK_CFLAGS = -std=gnu99 -Wall -g -pthread -DTEST

check_right:
	$(CC) $(CHECK_CFLAGS) -o list.o -MMD -MF .list.o.d -c list.c
	$(CC) $(CHECK_CFLAGS) -o threadpool.o -MMD -MF .threadpool.o.d -c threadpool.c
	$(CC) $(CHECK_CFLAGS) -o main.o -MMD -MF .main.o.d -c main.c
	$(CC) $(CFLAGS) -o $@ $(OBJS) -rdynamic
	uniq words.txt | sort -R > input.txt
	./check_right 4 349900 < input.txt > output.txt
	diff words.txt output.txt
```
`diff` reports no difference between the two files, which shows that the sort result is correct.
* Measure performance with different numbers of threads

###### tags: `mergesort` `thread pool`
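The branch-free list selection in `merge_list()` is easier to follow in isolation. Below is a minimal, self-contained sketch of the same merge step on strings; the `node_t`/`llist_t` types (a `head` pointer plus a `size` count, modelled on the fields `merge_list()` touches), the `merge_strings()` name and the length of 16 for `MAX_LAST_NAME_LEN` are all hypothetical, chosen for illustration rather than taken from the project.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical length, for illustration only. */
#define MAX_LAST_NAME_LEN 16

typedef char val_t[MAX_LAST_NAME_LEN];

/* Hypothetical node and list types, modelled on what merge_list() uses. */
typedef struct node {
    val_t data;
    struct node *next;
} node_t;

typedef struct {
    node_t *head;
    size_t size;
} llist_t;

/* Merge two sorted lists into *out; both inputs end up empty. */
static void merge_strings(llist_t *a, llist_t *b, llist_t *out)
{
    node_t *current = NULL;

    out->head = NULL;
    out->size = 0;

    while (a->size && b->size) {
        /* take_a is 1 if a's head sorts first, else 0 (ties go to b). */
        intptr_t take_a = strcmp(a->head->data, b->head->data) < 0;
        /* Branch-free select: only one of the two terms is kept. */
        llist_t *small = (llist_t *) ((intptr_t) a * take_a +
                                      (intptr_t) b * (1 - take_a));

        if (current) {
            current->next = small->head;
            current = current->next;
        } else {
            out->head = small->head;
            current = out->head;
        }
        small->head = small->head->next;
        --small->size;
        ++out->size;
        current->next = NULL;
    }

    /* One list is exhausted; append the rest of the other in one step. */
    llist_t *rest = a->size ? a : b;
    if (rest->size) {
        if (current)
            current->next = rest->head;
        else
            out->head = rest->head;
        out->size += rest->size;
        rest->head = NULL;
        rest->size = 0;
    }
}
```

Given `apple -> kiwi -> plum` and `banana -> cherry`, it links the nodes into `apple, banana, cherry, kiwi, plum`. A plain ternary on the `strcmp()` result would select the same list; the arithmetic form is kept here only to mirror `merge_list()`.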