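# 2016q3 Homework3 (mergesort-concurrent)
contributed by <`carolc0708`>

* [Assignment description A07: mergesort-concurrent](/s/rJ-GWtJ0)
* [WEEK3 lecture notes](/s/r1ddPnNC)

### Goals
* Serve as a showcase for [concurrency](/s/H10MXXoT)
* Learn POSIX Thread programming, especially [synchronization object](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html)
* Build the infrastructure for later performance analysis and scalability studies
* Learn program-quality analysis and the related development tools

### Requirements
- [ ] Modify the merge sort implementation so that it accepts the 350,000-entry data file used as input by [phonebook-concurrent](https://github.com/sysprog21/phonebook-concurrent)
    * The dictionary data must first be shuffled with `sort -R`
    * Think about how to obtain a uniformly distributed random permutation, and design an automated testing mechanism
- [ ] Study how the thread pool manages worker threads, point out the shortcomings of the implementation, and propose a lock-free implementation with reference to [concurrent-ll](https://github.com/jserv/concurrent-ll)
- [ ] Learn the scalability analysis method of [concurrent-ll](https://github.com/jserv/concurrent-ll) (a concurrent linked-list implementation), and use gnuplot to plot and compare merge sort performance with different numbers of threads
    * Note that the memory allocated for linked-list nodes is often non-contiguous; consider how this affects the performance analysis
- [ ] Also try to refactor the given code so that it is easier to read and maintain. Following [A05: introspect](/s/BkhIF92p), detailed and constructive criticism should be written in the shared notes and should also be reflected in changes to the code
    * Draw the diagrams in the shared notes with GraphViz wherever possible
* Deadline:
    * No later than 08:00 AM, Oct 7, 2016
    * The earlier there is activity on GitHub and the earlier the code goes through code review, the higher the grade

### Challenge
* Use C11's own `threads.h` for multithreading and rewrite the code with `_Atomic`. References:
    * [Atomics in C programming](https://www2.informatik.hu-berlin.de/~keil/docs/keil_-_c11_atomics_20140202.pdf)
    * [You Can Do Any Kind of Atomic Read-Modify-Write Operation](http://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/)
    * [Boost atomic examples](http://www.boost.org/doc/libs/1_61_0/doc/html/atomic/usage_examples.html)

### Hint
Compute the space complexity of merge sort and build a memory pool for merge sort to use: after the analysis, allocate the maximum amount of space needed. Two merges may be in progress at the same time, so split the memory into several segments to prevent overlap. For the final data movement, look for memory pool implementations. Data sorted in `malloc`-allocated nodes is not contiguous in memory, so look into linked-list caches.

## [Thread Models](http://maxim.int.ru/bookshelf/PthreadsProgram/htm/r_19.html)
#### The boss/worker model
![](https://i.imgur.com/ipAz6Jd.gif)
* The boss creates each worker thread, assigns it tasks, and, if necessary, waits for it to finish.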
* In the **pthread_create** call it uses to create each worker thread, the boss **specifies the task-related routine** the thread will execute.
* After creating each worker, the boss returns to the top of its loop to process the next request.
* Once finished, each worker thread can be made responsible for any output resulting from its task, or it can **synchronize with the boss** and let it handle its output.

```
main()  /* The boss */
{
    forever {
        get a request
        switch request
            case X : pthread_create( ... taskX)
            case Y : pthread_create( ... taskY)
            .
            .
            .
    }
}

taskX()  /* Workers processing requests of type X */
{
    perform the task, synchronize as needed if accessing shared resources
    done
}

taskY()  /* Workers processing requests of type Y */
{
    perform the task, synchronize as needed if accessing shared resources
    done
}
```

* Alternatively, the boss could save some run-time overhead by **creating all worker threads up front.** ==> thread pool

```
main()  /* The boss */
{
    for the number of workers
        pthread_create( ... pool_base )
    forever {
        get a request
        place request in work queue
        signal sleeping threads that work is available
    }
}

pool_base()  /* All workers */
{
    forever {
        sleep until awoken by boss
        dequeue a work request
        switch
            case request X: taskX()
            case request Y: taskY()
            .
            .
            .
    }
}
```

* It is important that you **minimize the frequency with which the boss and workers communicate**.
* You can't create too many interdependencies among the workers, or all workers will **suffer a slowdown**.

#### The peer model (workcrew model)
![](https://i.imgur.com/NiU9afg.gif)
* All threads work concurrently on their tasks without a specific leader.
* One thread must create all the other peer threads when the program starts; this thread then **acts as just another peer thread** that processes requests, or **suspends itself waiting** for the other peers to finish.
* This model makes each thread responsible for its own input: a peer knows its own input ahead of time, has its own private way of obtaining its input, or shares a single point of input with other peers.

```
main()
{
    pthread_create( ... thread1 ... task1 )
    pthread_create( ... thread2 ... task2 )
    .
    .
    .
    signal all workers to start
    wait for all workers to finish
    do any clean up
}

task1()
{
    wait for start
    perform task, synchronize as needed if accessing shared resources
    done
}

task2()
{
    wait for start
    perform task, synchronize as needed if accessing shared resources
    done
}
```

* Suitable for applications that have **a fixed or well-defined set of inputs**, such as matrix multipliers, parallel database search engines, and prime number generators. ==> e.g. divide a region into many blocks and hand the work of each block to a different thread
* Because there is no boss, peers themselves must **synchronize their access to any common sources of input**.
* However, like workers in the boss/worker model, peers can also slow down if they must frequently synchronize to access shared resources.

#### The pipeline model
![](https://i.imgur.com/nzwEu9c.gif)
* The pipeline model assumes:
    * A long stream of input
    * A series of suboperations (known as stages or filters) through which every unit of input must be processed
    * Each processing stage can handle a different unit of input at a time
* Applications in which the pipeline might be useful are image processing and text processing, or any application that can be broken down into a series of filter steps on a stream of input.

```
main()
{
    pthread_create( ... stage1 )
    pthread_create( ... stage2 )
    .
    .
    .
    wait for all pipeline threads to finish
    do any clean up
}

stage1()
{
    forever {
        get next input for the program
        do stage 1 processing of the input
        pass result to next thread in pipeline
    }
}

stage2()
{
    forever {
        get input from previous thread in pipeline
        do stage 2 processing of the input
        pass result to next thread in pipeline
    }
}

stageN()
{
    forever {
        get input from previous thread in pipeline
        do stage N processing to the input
        pass result to program output
    }
}
```

* We could add **multiplexing or demultiplexing** to this pipeline, allowing multiple threads to work in parallel on a particular stage.
* We could also dynamically configure the pipeline at run time, having it **create and terminate stages** (and the threads to service them) as needed.
* The overall throughput of a pipeline is limited by **the thread that processes its slowest stage.**
* When designing a pipeline, the programmer should aim at **balancing the work** to be performed across all stages; that is, all stages should take about the same amount of time to complete.

## Mutrace: analyzing mutex contention
* **Lock contention**: this occurs whenever **one process or thread attempts to acquire a lock held by another process or thread.** The more **fine-grained** the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.)
* mutrace vs. valgrind/drd
    * In contrast to valgrind/drd, mutrace **does not virtualize the CPU instruction set**, which makes it a lot faster. In fact, the hooks mutrace relies on to profile mutex operations should only minimally influence application runtime. Mutrace is **not useful for finding synchronization bugs**; it is solely useful for profiling locks.
* About mutrace
    * mutrace is implemented entirely in **userspace**.
    * Build your application with `-rdynamic` to make the backtraces mutrace generates useful.
    * `-r` | `--track-rt`: checks on each mutex operation whether it is executed by a realtime thread or not. => Use this to track down which mutexes are good candidates for **priority inheritance**.
    * `matrace` can be used to track down **memory allocation** operations in realtime threads.
* The source code has more details:
```
$ git clone http://git.0pointer.net/clone/mutrace.git
```
* In the source tree, `mutrace.in` shows what the other flags do, and the `matrace` sources are included as well
* Install mutrace:
```
$ sudo apt-get install mutrace
```
* Run it on `mergesort-concurrent`:
```
(for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8
```

The result:

```
mutrace: 0.2 sucessfully initialized for process sort (pid 4814).
input unsorted data line-by-line
sorted results:
[9602] [10669] [11911] [16147] [19714] [22776] [22870] [32443]

mutrace: Showing statistics for process sort (pid 4814).
mutrace: 3 mutexes used.

Mutex #1 (0x0x216c9b0) first referenced by:
	/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7f5a530484b2]
	./sort(tqueue_init+0x38) [0x401277]
	./sort(tpool_init+0x6a) [0x4014cc]
	./sort(main+0x161) [0x401c74]
	/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f5a52a7f830]

Mutex #0 (0x0x7f5a5064e860) first referenced by:
	/usr/lib/mutrace/libmutrace.so(pthread_mutex_lock+0x49) [0x7f5a530486b9]
	/lib/x86_64-linux-gnu/libgcc_s.so.1(_Unwind_Find_FDE+0x2c) [0x7f5a5044afec]
	[(nil)]

mutrace: Showing 2 most contended mutexes:

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1       62        5        1        0.005        0.000        0.001 Mx.--.
       0       20        3        0        0.002        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC

mutrace: Note that the flags column R is only valid in --track-rt mode!

mutrace: Total runtime is 0.375 ms.

mutrace: Results for SMP with 4 processors.
```

* Meaning of the table columns:
    * `Locked`: how often the mutex was locked during the entire runtime.
    * `Changed`: how often the owning thread of the mutex changed. => If the number is high, the risk of contention is also high.
    * `Cont.`: how often the lock was already taken when we tried to take it and we had to wait. ==> Mutex #1 is the most contended lock
    * `tot.Time[ms]`: for how long during the entire runtime the lock was locked
    * `avg.Time[ms]`: the average lock time
    * `max.Time[ms]`: the longest time the lock was held
    * `Flags`: what kind of mutex this is (recursive, normal or otherwise)

:::info
`addr2line` can be used to find the source line that corresponds to an address:
* Note: compile with `-g` so that the executable contains debug info. For the address in `(tqueue_init+0x38) [0x401277]`, the corresponding source line is:
```
$ addr2line -e sort 0x401277
/home/carol/mergesort-concurrent/threadpool.c:15
```
:::
> When I used 0x38 instead, the result was `??:0` [name=Carol Chen]

## W2-QA: implementing recursive merge sort with a linked list
* In [Linked List Bubble Sort](http://faculty.salina.k-state.edu/tim/CMST302/study_guide/topic7/bubble.html), replace the linked-list sorting algorithm (recursive bubble sort) with recursive merge sort

---

* Reference: [Merge Sort for Linked Lists](http://www.geeksforgeeks.org/merge-sort-for-linked-list/)

:::info
**MergeSort(headRef)**
1) If head is NULL or there is only one element in the Linked List then return.
2) Else divide the linked list into two halves.
   `FrontBackSplit(head, &a, &b); /* a and b are two halves */`
3) Sort the two halves a and b.
   `MergeSort(a);`
   `MergeSort(b);`
4) Merge the sorted a and b (using `SortedMerge()`)
   and update the head pointer using headRef.
   `*headRef = SortedMerge(a, b);`
:::

No lock is used here yet.

```C
// initialize the thread pool
int tpool_init(tpool_t *the_pool, uint32_t count, void *(*func)(void *));
```

```C
// free the memory used by the thread pool
int tpool_free(tpool_t *the_pool);
```
* The return value of this function carries no meaning ==> add a check that `the_pool` exists, and a check that `pthread_join()` succeeds

```C=
int tpool_free(tpool_t *the_pool)
{
    if (the_pool == NULL)
        return -1;
    for (uint32_t i = 0; i < the_pool->count; ++i)
        if (pthread_join(the_pool->threads[i], NULL) != 0)
            return -1;
    free(the_pool->threads);
    tqueue_free(the_pool->queue);
    return 0;
}
```

:::info
**Summary of the thread pool's problems**
1. A condition variable is defined but never used anywhere
2. The shutdown policy is not defined (shut down immediately, or shut down after the pending work is done)
:::

---

### `main.c`
* I referred to classmate `LanKuDot`'s [shared notes](https://hackmd.io/s/S1ezGhIA); it took me a long time to understand the code
* The program has two main parts:
    * Before `max_cut` is reached: `cut_func()` recursively cuts the list into left and right lists
    * After `max_cut` is reached: `merge_sort()` recursively cuts the list into left and right lists and merges them back

```C
// dispatch the division task to the queue
void cut_func(void *data);
```
* This is the **first thing** `main()` does after `tpool_init()`
* `main()` defines `max_cut` = MIN(`thread_count`, `data_count`) - 1
* Until `cut_count` reaches `max_cut` or `list->size<1` holds, this function cuts the list in half and pushes tasks into `t_queue()`, with the left and right lists as the task `arg` and `cut_func()` as the task `func`
* Once that condition is met, wherever the cutting has got to, the list passed in goes straight to `merge_sort()` and the result is passed to `merge()`
* The critical section is protected by a lock
* Here I renamed the variables `_list` and `list` to `llist` and `rlist` for readability, and fixed the comments that had left and right swapped.

```C
// merge the cross-level list
void merge(void *data);
```
* The critical section is protected by a lock
* If `_list->size < data_count`: if the list from the other side has not arrived yet, the function records itself in `tmp_list`; if it has arrived, a task is pushed into `t_queue()`
    * task `arg`: `merge_list(left, right)`
    * task `func`: `merge()`
* If `_list->size == data_count`: this is the termination condition. Everything is sorted, so the result is stored in the global variable `the_list` and printed with `print_list()`
    * ==It also pushes a task with an empty `func` into `t_queue()`==
> I don't quite understand the purpose of this.
> [name=Carol Chen]

```C
// cut the list into left & right lists and then merge them back
llist_t *merge_sort(llist_t *list);
```
* A recursive function
* Recursively cuts the given `list` into left and right lists until `list->size<2`
* Returns the result merged by `merge_list()`

```C
// merge the left & right lists
llist_t *merge_list(llist_t *a, llist_t *b);
```
* Merges the lists `*a` and `*b` by comparing their elements

```C
// grab one task from the task queue and execute it
static void *task_run(void *data);
```
* Corresponds to `threadpool_thread()` in [mbrossard/threadpool](https://github.com/mbrossard/threadpool/blob/master/src/threadpool.c): it takes one task out of `t_queue` and has the thread execute it
* Structurally, I think it belongs in `tpool.c`
* No lock or condition variable is used here

```C
int main(int argc, char const *argv[]);
```
1. Read `thread_count` and `data_count`, and compute `max_cut`
2. Read the input numbers and build `list`
3. Initialize `tpool` and the other global variables
4. Push `cut_func()` into `t_queue()` as the first task

## Designing an automated test experiment
* Following the design of the automated test program in classmate `Tempojiji`'s [shared notes](https://hackmd.io/s/B1-Kytv0), I created `input_generator.c` to generate random numbers:

```C=
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main(int argc, char *argv[])
{
    if (argc < 2) {
        fprintf(stderr, "usage: %s <count>\n", argv[0]);
        return 1;
    }
    int MAX_SIZE = atoi(argv[1]);
    FILE *fp = fopen("input", "w+");
    if (fp == NULL)
        return 1;
    srand((unsigned) time(NULL));
    for (int i = 0; i < MAX_SIZE; i++)
        fprintf(fp, "%d\n", rand() / 10000); /* rand() returns int, so print with %d */
    fclose(fp);
    return 0;
}
```

* In `main.c`, measure the time with `clock_gettime()` and add the following:

```C=
#if defined(BENCH)
    FILE *fp = fopen("input", "r");
    long int data;
    /* stop at EOF or at the first token that is not a number */
    while (fscanf(fp, "%ld\n", &data) == 1) {
        e = list_add(e, data);
    }
    fclose(fp);
#endif
    .
    .
    .
#if defined(BENCH)
    fp = fopen("output", "a+");
    if (thread_count == 1)
        fprintf(fp, "%d,", data_count);
    fprintf(fp, "%lf", cpu_time);
    if (thread_count == 64)
        fprintf(fp, "\n");
    else
        fprintf(fp, ",");
    fclose(fp);
    printf("%d %d %lf\n", data_count, thread_count, cpu_time);
#endif
```

* Then modify the `Makefile`:

```makefile
bench:
	for i in `seq 100 100 800`; do \
		./input_generator $$i; \
		for j in 1 2 4 8 16 32 64; do \
			./sort $$j $$i; \
		done; \
	done

plot: bench
	gnuplot runtime.gp
```

* A matching gnuplot script, `runtime.gp`, also has to be written
* Current test results:

![](https://i.imgur.com/mNP67Qf.png)

The larger `thread_num` is, the more the running time fluctuates.
> I don't know why the sort cannot handle more than 10,000 numbers;
> it just hangs there for a long time... [name=Carol Chen]

## Scalability analysis
:::info
Definition of **scalability** on Wikipedia:
* An algorithm, design, networking protocol, program, or other system is said to **scale** if it is **suitably efficient and practical when applied to large situations** (e.g. a large input data set, a large number of outputs or users, or a large number of participating nodes in the case of a distributed system). If the design or system fails when a quantity increases, it does not scale.
* In practice, if there are a large number of things (n) that affect scaling, then resource requirements (for example, algorithmic time-complexity) must grow less than $n^2$ as n increases. An example is a search engine, which scales not only for the number of users, but also for the number of objects it indexes.
* Scalability refers to the ability of a site to increase in size as demand warrants.

Discussion on Stack Overflow:
* Scalability is **the ability of a program to scale**. For example, if you can do something on a small database (say less than 1000 records), a program that is highly scalable would **work well on a small set as well as working well on a large set** (say millions, or billions of records).
* It would have a **linear growth of resource requirements**. Look up Big-O notation for more details about how programs can require more computation the larger the data input gets. Something parabolic like Big-O(x^2) is far less efficient with large x inputs than something linear like Big-O(x).
:::

---

* Below are classmate `LanKuDot`'s notes on scalability, from his [shared notes](https://hackmd.io/s/S1ezGhIA):

#### How it works
1. The script must be run with arguments. The first argument specifies `core`; `shift` then moves all arguments one position to the left (the original second argument becomes the first)
2. Run the `config` and `lock_exec` scripts
3. Read the name of the program to run into `prog`, and the remaining arguments into `params`
4. Run the test on 1 to `core` cores, and output the results **compared with the result on 1 core**
5. ==Four values are output (subscript N is the number of cores used):==
    * #cores: the number of cores used
    * throughput: the amount of work done per unit of time
    * %linear: $\left(1-\frac{N-\text{scalability}}{N}\right)\times 100\% = \frac{\text{scalability}}{N}\times 100\%$, i.e. the average scalability per core, in percent
    * scalability: $\frac{\text{throughput}_N}{\text{throughput}_1}$, the throughput speed-up
6. Run the `unlock_exec` script

---

#### Scalability analysis in [concurrent-ll](https://github.com/jserv/concurrent-ll)
#### `scalability.sh`
* `scalability1.sh` and `scalability2.sh` differ in the number of programs run at a time: 1 or 2
* A test run:

```
carol@carol-PC:~/concurrent-ll$ scripts/scalability1.sh all ./out/test-lock -i128
## Use 4 as the number of cores.
## If not correct, change it in scripts/config
#cores  throughput  %linear  scalability
1       309985      100.00   1
2       186428      30.07    0.60
3       195184      20.99    0.63
4       2493        0.20     0.01
```

```
carol@carol-PC:~/concurrent-ll$ scripts/scalability2.sh all ./out/test-lock ./out/test-lockfree -i100
## Use 4 as the number of cores.
## If not correct, change it in scripts/config
#       ./out/test-lock                   ./out/test-lockfree
#cores  throughput  %linear  scalability  throughput  %linear  scalability
1       310796      100.00   1            464000      100.00   1
2       185595      29.86    0.60         736644      79.38    1.59
3       192483      20.64    0.62         1033290     74.23    2.23
4       8355        0.67     0.03         1039714     56.02    2.24
```

> What do the trailing `-i128` and `-i100` mean? [name=Carol Chen]

> How can I use it to test my own program?
> [name=Carol Chen]

## Modifying the implementation to accept the phonebook data file

## Using the condition variable in the thread pool
* The original thread pool defines a condition variable but never uses it
* The changes follow the design of [mbrossard/threadpool](https://github.com/mbrossard/threadpool/blob/master/src/threadpool.c)

---

* `tqueue_pop()`: when `MONITOR` is defined, the lock here has to be removed, because `task_run()` already holds the queue mutex when it calls `tqueue_pop()`.

```C=
task_t *tqueue_pop(tqueue_t *the_queue)
{
    task_t *ret;
    /* with MONITOR, the caller (task_run()) already holds the mutex */
#ifndef MONITOR
    pthread_mutex_lock(&(the_queue->mutex));
#endif
    ret = the_queue->head;
    if (ret) {
        the_queue->head = ret->next; // if it is NULL, then let it be
        if (the_queue->head) {
            the_queue->head->last = NULL;
        } else {
            the_queue->tail = NULL;
        }
        the_queue->size--;
    }
#ifndef MONITOR
    pthread_mutex_unlock(&(the_queue->mutex));
#endif
    return ret;
}
```

* `tqueue_push()`

```C=
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
    if (task == NULL) // reject NULL tasks in every build
        return -1;
    pthread_mutex_lock(&(the_queue->mutex));
    task->next = NULL;
    task->last = the_queue->tail;
    if (the_queue->tail)
        the_queue->tail->next = task;
    the_queue->tail = task;
    if (the_queue->size++ == 0)
        the_queue->head = task; // only one task in queue
#if defined(MONITOR)
    pthread_cond_signal(&(the_queue->cond)); // wake one worker waiting in task_run()
#endif
    pthread_mutex_unlock(&(the_queue->mutex));
    return 0;
}
```

* `tqueue_free()`

```C=
int tqueue_free(tqueue_t *the_queue)
{
    if (the_queue == NULL)
        return -1;
    task_t *cur = the_queue->head;
    while (cur) {
        the_queue->head = the_queue->head->next;
        free(cur);
        cur = the_queue->head;
    }
#if defined(MONITOR)
    pthread_mutex_lock(&(the_queue->mutex));
    /* release before destroying: destroying a locked mutex is undefined behavior */
    pthread_mutex_unlock(&(the_queue->mutex));
#endif
    pthread_mutex_destroy(&(the_queue->mutex));
    pthread_cond_destroy(&(the_queue->cond));
    return 0;
}
```

* `tpool_free()`

```C=
int tpool_free(tpool_t *the_pool)
{
    if (the_pool == NULL) // check before the_pool is dereferenced below
        return -1;
#if defined(MONITOR)
    pthread_cond_broadcast(&(the_pool->queue->cond));
    /* only valid if the calling thread holds the mutex at this point */
    pthread_mutex_unlock(&(the_pool->queue->mutex));
#endif
    for (uint32_t i = 0; i < the_pool->count; ++i)
        if (pthread_join(the_pool->threads[i], NULL) != 0)
            return -1;
    free(the_pool->threads);
    tqueue_free(the_pool->queue);
    return 0;
}
```
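In this version, the workers still stop only when they pop the task with an empty `func` that `merge()` pushes. `task_run()` pushes that task back before exiting, so the next worker sees it too. The thread-pool summary earlier also notes that no shutdown policy is defined.

For comparison, below is a minimal sketch of a more conventional condition-variable design. It assumes a `shutdown` flag protected by the queue mutex, similar in spirit to the shutdown flag in mbrossard/threadpool:

* Workers sleep in `pthread_cond_wait()` inside a `while` loop while the queue is empty. The predicate is re-checked after every wakeup, because POSIX allows spurious wakeups.
* A worker exits only after shutdown is requested *and* the queue has drained, which makes this a graceful shutdown.

All names (`pool_t`, `pool_init`, `pool_submit`, `pool_shutdown`, `count_job`) are hypothetical and do not match the homework's `tpool_t`/`tqueue_t` API.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical API (pool_t, pool_submit, ...); not the homework's tpool_t. */
typedef struct job {
    void (*func)(void *);
    void *arg;
    struct job *next; /* jobs form a singly linked FIFO */
} job_t;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t notify; /* signalled when a job arrives or on shutdown */
    job_t *head, *tail;
    bool shutdown;         /* graceful: drain queued jobs, then exit */
    pthread_t *threads;
    int nthreads;          /* number of threads actually started */
} pool_t;

static void *worker(void *p)
{
    pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        /* while, not if: re-check the predicate after every (spurious) wakeup */
        while (pool->head == NULL && !pool->shutdown)
            pthread_cond_wait(&pool->notify, &pool->lock);
        if (pool->head == NULL) { /* shutdown requested and queue drained */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        job_t *job = pool->head;  /* pop while holding the lock */
        pool->head = job->next;
        if (pool->head == NULL)
            pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);
        job->func(job->arg);      /* run the job outside the lock */
        free(job);
    }
}

int pool_init(pool_t *pool, int nthreads)
{
    pool->head = pool->tail = NULL;
    pool->shutdown = false;
    pool->nthreads = 0;
    pool->threads = malloc(sizeof(pthread_t) * (size_t) nthreads);
    if (pool->threads == NULL)
        return -1;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->notify, NULL);
    /* pool->nthreads counts only successfully created threads */
    for (int i = 0; i < nthreads; i++, pool->nthreads++)
        if (pthread_create(&pool->threads[i], NULL, worker, pool) != 0)
            return -1; /* pool_shutdown() still joins the started threads */
    return 0;
}

int pool_submit(pool_t *pool, void (*func)(void *), void *arg)
{
    job_t *job = malloc(sizeof(*job));
    if (job == NULL)
        return -1;
    *job = (job_t) { .func = func, .arg = arg, .next = NULL };
    pthread_mutex_lock(&pool->lock);
    if (pool->shutdown) { /* refuse new work once shutdown has started */
        pthread_mutex_unlock(&pool->lock);
        free(job);
        return -1;
    }
    if (pool->tail)
        pool->tail->next = job;
    else
        pool->head = job;
    pool->tail = job;
    pthread_cond_signal(&pool->notify); /* wake one sleeping worker */
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

/* Graceful shutdown: jobs already queued still run, then every worker exits. */
void pool_shutdown(pool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->notify); /* wake every sleeping worker */
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->nthreads; i++)
        pthread_join(pool->threads[i], NULL);
    free(pool->threads);
    pthread_mutex_destroy(&pool->lock); /* safe: no thread holds it any more */
    pthread_cond_destroy(&pool->notify);
}

/* Demo job for the usage example: atomically increments an _Atomic long. */
static void count_job(void *arg)
{
    atomic_fetch_add((_Atomic long *) arg, 1);
}
```

For example, call `pool_init(&pool, 4)`, submit 1000 `count_job` tasks that share one `_Atomic long` counter, then call `pool_shutdown(&pool)`. The counter ends at 1000, because the graceful shutdown lets the workers drain the queue before they are joined. An immediate-shutdown variant would instead exit as soon as `shutdown` is set, and would then have to free the jobs still left in the queue.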
* `task_run()`

```C=
static void *task_run(void *data)
{
    (void) data;
    while (1) {
#if defined(MONITOR)
        pthread_mutex_lock(&(pool->queue->mutex));
        while (pool->queue->size == 0) {
            pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex));
        }
        if (pool->queue->size == 0) // never true: the loop above exits only when size > 0
            break;
#endif
        task_t *_task = tqueue_pop(pool->queue); // with MONITOR, called with the mutex held
#if defined(MONITOR)
        pthread_mutex_unlock(&(pool->queue->mutex));
#endif
        if (_task) {
            if (!_task->func) {
                // no function: push the task back to the queue so that
                // the other workers also see it, then exit
                tqueue_push(pool->queue, _task);
                break;
            } else {
                _task->func(_task->arg); // execute the task
                free(_task);
            }
        }
    }
    pthread_exit(NULL);
}
```

---

* Results:

![](https://i.imgur.com/jKkaw3A.png)

This is much faster than before: the y-axis tops out at only 0.014 seconds.
* Comparison with the original version:

![](https://i.imgur.com/V3yAGxy.png)

## Making the thread pool lock-free
* Refer to the lock-free techniques in [concurrent-ll](https://github.com/jserv/concurrent-ll) for a lock-free thread pool
* The lock-free queue approach in [hungry-birds](https://github.com/jserv/hungry-birds)
* [Lock-free linked-list slides](http://www.slideshare.net/anniyappa/linked-lists-28793865)

## Changing the queue structure and comparing performance
* Following the [shared notes](https://hackmd.io/s/SJUfbulyx) of the students in Group 20, change the queue into a ring structure and compare the performance of the two designs
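As an illustration of the ring layout together with the C11 `_Atomic` types from the challenge section, here is a minimal sketch of a fixed-capacity, power-of-two ring. Its `head`/`tail` indices are published with release stores and read with acquire loads.

It assumes exactly **one** producer thread and **one** consumer thread, which is a strong simplification. The thread pool in this homework has several workers popping concurrently, and this single-producer/single-consumer design does not support that. So it is not a drop-in replacement for `tqueue_t`, and it is not the design from Group 20's notes.

All names (`spsc_ring_t`, `ring_push`, `ring_pop`, `ring_demo_sum`) are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define RING_CAP 1024 /* number of slots */
static_assert((RING_CAP & (RING_CAP - 1)) == 0, "RING_CAP must be a power of two");

/* Hypothetical single-producer / single-consumer ring of void pointers. */
typedef struct {
    void *slot[RING_CAP];
    _Atomic size_t head; /* next slot to read; written only by the consumer */
    _Atomic size_t tail; /* next slot to write; written only by the producer */
} spsc_ring_t;

void ring_init(spsc_ring_t *r)
{
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer side only. Returns false if the ring is full. */
bool ring_push(spsc_ring_t *r, void *item)
{
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAP) /* free-running indices: unsigned wrap is fine */
        return false;
    r->slot[tail & (RING_CAP - 1)] = item;
    /* release: the slot write is visible to the consumer before the new tail */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side only. Returns false if the ring is empty. */
bool ring_pop(spsc_ring_t *r, void **item)
{
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return false;
    *item = r->slot[head & (RING_CAP - 1)];
    /* release: the slot read happens before the producer can reuse the slot */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}

/* Demo: one producer thread pushes 1..n; the calling thread pops and sums. */
typedef struct {
    spsc_ring_t *ring;
    uintptr_t n;
} producer_arg_t;

static void *producer(void *p)
{
    producer_arg_t *a = p;
    for (uintptr_t i = 1; i <= a->n; i++)
        while (!ring_push(a->ring, (void *) i))
            ; /* ring full: spin until the consumer catches up */
    return NULL;
}

uintptr_t ring_demo_sum(uintptr_t n)
{
    spsc_ring_t ring;
    ring_init(&ring);
    producer_arg_t arg = { &ring, n };
    pthread_t t;
    if (pthread_create(&t, NULL, producer, &arg) != 0)
        return 0;
    uintptr_t sum = 0;
    for (uintptr_t got = 0; got < n;) {
        void *item;
        if (!ring_pop(&ring, &item))
            continue; /* ring empty: spin and retry */
        sum += (uintptr_t) item;
        got++;
    }
    pthread_join(t, NULL);
    return sum;
}
```

`ring_demo_sum(10000)` returns 50005000 (1 + 2 + ... + 10000) even though the 1024-slot ring wraps around many times. This works because the free-running `size_t` indices are masked with `RING_CAP - 1` only when a slot is accessed. Supporting several concurrent consumers, as the thread pool's workers would need, calls for a different design, such as a bounded multi-producer/multi-consumer queue, or the mutex-protected `tqueue_t` above.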