# [mergesort-concurrent](https://hackmd.io/s/rJ-GWtJ0) [github](https://github.com/diana0651/mergesort-concurrent) contributed by <`Diana Ho`> ###### tags: `d0651` `sys` ## 案例分析 #### 對單向 Linked List ### 兩大重點 - 排序的對象是 singly-linked list - 利用 POSIX Thread 處理,需要一併操作 synchronization object ### 預期目標 - [ ] 作為 [concurrency](/s/H10MXXoT) 的展示案例 - [ ] 學習 POSIX Thread Programming,特別是 [synchronization object](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html) - [ ] 為日後效能分析和 scalability 研究建構基礎建設 - [ ] 學習程式品質分析和相關的開發工具 ### 作業要求 - [ ] 將 merge sort 的實做改為可接受 [phonebook-concurrent](https://github.com/sysprog21/phonebook-concurrent) 的 35 萬筆資料輸入的資料檔 * 字典檔資料需要事先用 `sort -R` 處理過 * 思考如何得到均勻分佈的亂數排列,並且設計自動測試的機制 - [ ] 研究 thread pool 管理 worker thread 的實做,提出實做層面的不足,並且參照 [concurrent-ll](https://github.com/jserv/concurrent-ll),提出 lock-free 的實做 - [ ] 學習 [concurrent-ll](https://github.com/jserv/concurrent-ll) (concurrent linked-list 實作) 的 scalability 分析方式,透過 gnuplot 製圖比較 merge sort 在不同執行緒數量操作的效能 * 注意到 linked list 每個節點配置的記憶體往往是不連續,思考這對效能分析的影響 - [ ] 一併嘗試重構 (refactor) 給定的程式碼,使得程式更容易閱讀和維護。延續 [A05: introspect](/s/BkhIF92p),不只是在共筆上用文字提出良性詳盡的批評,也該反映在程式碼的變革 - [ ] 共筆的內容儘量用 GraphViz 製作 --- ## POSIX Thread Programming >>[參考概念](https://hackmd.io/AwTgzARgJhBMBmBaAbPAxgU0QFmWiiEAHAIYiKwDsoyRVEAjGsEA) ### Model >[Designing Threaded Programs](http://maxim.int.ru/bookshelf/PthreadsProgram/htm/r_19.html) * Boss/Worker Model Boss 接收到一個新要求時,動態建立一個 Worker 完成該項要求,適用於伺服器上。可於程式開始時先建立好一定數量的執行緒(Thread Pool),減少執行時期的負擔。 * Peer Model (Workcrew model) 相較於 Boss/Worker 模式,此模式的每執行緒擁有自己的輸入,適用於矩陣運算、資料庫搜尋等。 * Pipeline Model 執行緒由上一階段取得輸入資料,並將結果傳至下一階段的執行緒。整體輸出受制於執行最久的階段,須注意每一階段的工作負擔要盡可能平均。 >>[參考概念](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===) ### manager-worker [架構](http://maxim.int.ru/bookshelf/PthreadsProgram/htm/r_19.html) ```graphviz digraph hierarchy { nodesep=0.5 // increases the separation between nodes node [color=black,fontname=Courier,shape=box] //All nodes will this shape and colour edge [color=black, style=dashed] // All the lines look like this Manager->{Worker1 Worker2 Worker3} } ``` 為了分配工作,在 worker thread 實作 Task 的機制。每個主要的操作會先被放進 task queue 裡頭,空閒的 thread 再從 task queue 裡頭提取 task 執行,只要把我們的操作寫成 task,就能順利執行。 ```graphviz digraph { 開始Process[shape="box", style=rounded]; 是否為結束Task[shape="diamond"]; 結束Process[shape="box", style=rounded]; 開始Process->提取Task->是否為結束Task 是否為結束Task->重發結束Task[label="是"] 重發結束Task->結束Process 是否為結束Task->執行Task[label="否"] 執行Task->提取Task; } ``` >>[參考圖片](https://hackmd.io/s/rJ-GWtJ0) ### Synchronization [synchronization object](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html) * pthread_join: 某執行緒暫停執行直到另一直行緒結束 * Mutex: 同一時間內只有一執行緒可以保有該 lock 及保護資料的存取權 * Initialize * Static:```PTHREAD_MUTEX_INITIALIZER``` * Dynamic:```pthread_mutex_init()``` * Lock * ```pthread_mutex_lock()``` * Release * ```pthread_mutex_unlock()``` * Condition variable: 將事件實質化,並提供函式喚醒等待該事件的執行緒 * Initialize * Static:```PTHREAD_COND_INITIALIZER``` * Dynamic:```pthread_cond_init()``` * Waits for condition variables * 等待直到喚醒:```pthread_cond_wait()``` * 等待特定時間:```pthread_cond_timewait()``` * 喚醒等待中執行緒 * 喚醒其中之一:```pthread_cond_signal()``` * 喚醒所有等待中執行緒:```pthread_cond_broadcast()``` * 為避免 [Spurious Wake Ups](https://en.wikipedia.org/wiki/Spurious_wakeup) 或其他先被喚醒的執行緒執行導致條件變數再次不成立,一般會以迴圈判斷條件是否成立,不成立則再次進入等待 * pthread_once: 保證初始函式被許多直行緒呼叫時僅執行一次 * 宣告 pthread_once_t 並靜態初始為```PTHREAD_ONCE_INIT``` * 以 pthread_once() 與 once 區域呼叫目標函式 * 不容許傳遞參數至 once 所保護的函式 ### Pthreads Management #### Key 一種指標,將資料和執行緒進行關聯 #### Thread cancellation * States: * ```PTHREAD_CANCEL_DISABLE``` * Type: ignored * ```PTHREAD_CANCEL_ENABLE``` * Types: * ```PTHREAD_CANCEL_ASYNCHRONOUS```:立刻取消 * ```PTHREAD_CANCEL_DEFERRED```:當執行到 Cancellation-point 時發生 * Cancellation-point * ```pthread_cond_wait(), pthread_cond_timewait(), pthread_join()``` * 使用者定義```pthread_testcancel()``` * Asynchronous Cancellation: 當執行緒在此狀態時,即使在執行函式庫呼叫或是系統呼叫時也能夠被取消,除非被定義為 cancellation-safe 否則應防止該情形發生 ### mutex contention mutrace 可用來偵測 [lock contention](https://en.wikipedia.org/wiki/Lock_(computer_science)#Granularity),使用方便,不需要重新編譯程式碼。 >>[參考概念](https://hackmd.io/CwQwxmCsAcBGAMBaaAzeTgEYDMAmRC2miA7NufAJzqwiwCmQA===?both) * **Lock contention**: this occurs whenever **one process or thread attempts to acquire a lock held by another process or thread.** The more **fine-grained** the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.) * mutrace v.s. valgrind/drd In contrast to valgrind/drd it **does not virtualize the CPU instruction set**, making it a lot faster. In fact, the hooks mutrace relies on to profile mutex operations should only minimally influence application runtime. Mutrace is **not useful for finding synchronizations bugs**, it is solely useful for profiling locks. * About mutrace * mutrace is implemented entirely in **userspace**. * build your application with `-rdynamic` to make the backtraces mutrace generates useful. * `-r`| --track-rt: checks on each mutex operation wheter it is executed by a realtime thread or not. =>use this to track down which mutexes are good candidates for **priority inheritance**. * `matrace` that can be used to track down **memory allocation** operations in realtime threads. > [Measuring Lock Contention](http://0pointer.de/blog/projects/mutrace.html) > [More Mutrace](http://0pointer.net/blog/projects/mutrace2.html) --- ## 閱讀程式碼 ### list.[ch] #### list.h * node:儲存實際的資料 ```clike= typedef struct node { val_t data; struct node *next; } node_t; ``` * linked list:建立 linked list 順便紀錄長度,所以在分割 list 的時候只需要花 $O(N)$ 的時間就好,不需要重新計算長度。 ```clike= typedef struct llist { node_t *head; uint32_t size; } llist_t; ``` :::info #### intptr_t 資料型態 >>[When/Why to use intptr_t for type-casting in C?](http://stackoverflow.com/questions/6326338/why-when-to-use-intptr-t-for-type-casting-in-c) >>[What is the use of intptr_t?](http://stackoverflow.com/questions/35071200/what-is-the-use-of-intptr-t) * 功能 - 可pointer轉換成integer型態,做位元運算與加減法。 相較於 `void *`,`intptr_t` 的位址可以做 bitwise operation 而 `void *` 不能。做位元運算,無號的`uintptr_t`會比較好。 ```clike= /* Types for `void *' pointers. */ #if __WORDSIZE == 64 # ifndef __intptr_t_defined typedef long int intptr_t; # define __intptr_t_defined # endif typedef unsigned long int uintptr_t; #else # ifndef __intptr_t_defined typedef int intptr_t; # define __intptr_t_defined # endif typedef unsigned int uintptr_t; #endif ``` - pointer 轉成 integer 時,32 bits 或 64 bits 的 machine 會有 pointer 長度的差別。若使用到 `intptr_t`,往後我們只需 `#include <stdint>` 就能免除麻煩的型態轉換。除此之外,也是確保型態轉換上不會出問題的保守作法。 ```clike= #ifdef MACHINE 64 typedef long intptr; #else // MACHINE 32 typedef int intptr; #endif ``` * 目的 - 讓 node 中的資料可以是指標型態,也可以是一般資料型態。 - 為了相容不同平台所設計。 ::: #### list.c - node_new:在 next 前方放入一個新的 node,並回傳指向該 node 之指標 - list_new:初始化一串新的 linked-list - list_add:在 linked-list 的尾端加入一個新的 node ### threadpool.[ch] #### threadpool.h * task:讓 thread 知道自己要做什麼 採用的 doubly linked list。紀錄前一個 task 的用意在於,當從 queue 中拿出工作後,可以在 $O(1)$ 的時間將 queue 的 tail 指向應有的地方。 ```clike= typedef struct _task { void (*func)(void *); void *arg; struct _task *next, *last; } task_t; ``` * queue:裡面放置著等待要被處理的 task 們 利用 mutex 來確保不會有 race condition 的發生。 而 condition variable 在觀看的時候發現只有被初始化而已,沒有其他地方使用到。 ```clike= typedef struct { task_t *head, *tail; pthread_mutex_t mutex; pthread_cond_t cond; uint32_t size; } tqueue_t; ``` * thread pool:worker thread 的管理員,裡面放置了所有 thread 和 job queue ```clike= typedef struct { pthread_t *threads; uint32_t count; tqueue_t *queue; } tpool_t; ``` #### threadpool.c - task_free:釋放單一 task 先前所配置的記憶體空間 - tqueue_init:初始化 queue - tqueue_pop:將 queue 中的單一 task 取出,派遣給 thread - tqueue_size:回傳 queue 當前的大小 - tqueue_push:將新的 task 加入 queue 中 - tqueue_free:釋放已配置的 queue 空間 - tpool_init:初始化 threadpool - tpool_free:釋放 threadpool 記憶體空間 ### main.c * `merge_list()` 傳入兩個list,然後去比較2個list最小的那個再一一挑出,放到新的list裡面,直到其中一個list的數被挑完,還有數字的list就接到已排序好的list後面。 * `merge_sort()` 如果傳入的list size小於2直接回傳該list,否則就將他從中間分成2個list之後丟進 `merge_list()` 做排序並回傳排序好的list。 * `merge` 裡面去判斷現在的list總共串了幾個,如果小於輸入的資料數,就繼續做排序串起來,不然就設置終止task然後印出排序好的list。 * ==發現cut_func()跟merge_sort()都是再做切割list,而cut_func裡面判斷cut_count是否小於max_cut其實沒有必要,所以把cut_count等相關變數刪掉,也把merge_sort()刪掉,留下cut_func()就好。== * `task_run` 將我們要做的事寫成task,放進job queue裡,空閒的thread會從queue裡拿task出來做。 ### Makefile >[參考](https://gcc.gnu.org/onlinedocs/gcc/Preprocessor-Options.html) ``` -M 生成文件關聯的信息。包含目標文件所依賴的所有源代碼 -MM 和-M一樣,但是它將忽略由#include<file>;造成的依賴關係。 -MD 和-M相同,但是輸出將導入到.d的文件裡面 -MMD 和-MM相同,但是輸出將導入到.d的文件裡面 -MF 指定輸出到某個檔案,否則預設是原始檔案檔名.d ``` [參考](https://gcc.gnu.org/onlinedocs/gcc/Link-Options.html) ``` -rdynamic: 指定 linker 將所有 label 寫入到 dynamic symbol table。 ``` ## 程式實驗 ### mutrace ```clike= $ (for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8 mutrace: 0.2 sucessfully initialized for process sort (pid 11679). input unsorted data line-by-line sorted results: [300] [2022] [6225] [10989] [17303] [23622] [32251] [32551] mutrace: Showing statistics for process sort (pid 11679). mutrace: 3 mutexes used. mutrace: Showing 3 most contended mutexes: Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags 0 145 40 30 0.023 0.000 0.001 Mx.--. 1 13 10 1 0.002 0.000 0.000 M-.--. 2 20 3 0 0.006 0.000 0.001 M-.--. |||||| /||||| Object: M = Mutex, W = RWLock /|||| State: x = dead, ! = inconsistent /||| Use: R = used in realtime thread /|| Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /| Mutex Protocol: i = INHERIT, p = PROTECT / RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC mutrace: Note that the flags column R is only valid in --track-rt mode! mutrace: Total runtime is 0.883 ms. mutrace: Results for SMP with 8 processors. ``` * 表格: * `Locked`: how often the mutex was locked during the entire runtime. * `Changed`: how often the owning thread of the mutex changed. =>If the number is high this means the risk of contention is also high. * `Cont.`: how often the lock was already taken when we tried to take it and we had to wait. ==> Mutex #1 is the most contended lock * `tot.Time[ms]`: for how long during the entire runtime the lock was locked * `avg.Time[ms]`: the average lock time * `max.Time[ms]`: the longest time the lock was held. * `Flags`: what kind of mutex this is (recursive, normal or otherwise). 按照作業要求中所教的使用 `addr2line` 找到實際對應的程式行數,卻顯示 `??:0`, 目前還在尋找原因 ```clike= $ addr2line -e sort 0x38 ??:0 ``` ### UNIX 指令組合 將 [phonebook-concurrent](https://github.com/sysprog21/phonebook-concurrent) 裡的 `dictionary/words.txt` 透過 UNIX 指令打亂順序,之後重新導向到另一個檔案 `input.txt`,變成merge sort新的資料輸入 ```clike= $ uniq words.txt | sort -R > input.txt ``` * `sort`:將輸入資料排序 * `-R` flag:隨機排序 * `uniq`:可以用來將重複的行刪除而只顯示一個 * `-c` flag:顯示重複字詞的出現次數 * `wc`:列出檔案有多少行,多少單字,多少字元 * `-l/w/m`:僅列出行、單字、字元 ### refactor * list.[ch] - list_add * return 永遠是 0。 ==> return list head 回傳建好後新的 `llist` 指標,使用更彈性。 * 註解提到:若傳入的 val 已經存在,則不再重新建立新的 node,原程式碼中並沒有相對應的檢查機制。 ```clike= llist_t list_add(llist_t *list, val_t val) { //check if value already exist node_t *cur = list->head; while(cur) { if(cur-> data == val) return list; cur = cur->next; } //add to the head of the list node_t *e = node_new(val, NULL); e->next = list->head; list->head = e; list->size++; return list; } ``` - list_nth - index out of range 條件判斷有誤,把 nth 的 n 當作 array 的 index,排序為 0 ~ list->size-1 ==> 判斷 out of range 條件改為 `if(idx > list->size-1)` ==> 多加一個判斷`if(idx == 0) return cur;` ```clike= node_t *list_nth(llist_t *list, uint32_t idx) { if (idx > list->size-1) return NULL; node_t *head = list->head; if(idx == 0) return head; while (idx--) head = head->next; return head; } ``` * threadpool.[ch] - tqueue_init: * 未進行所傳入 `tqueue_t` 的記憶體空間配置 ==> 直接在函式中用 malloc 配置空間,成功回傳 0,失敗回傳 -1 ```clike= int tqueue_init(tqueue_t *the_queue) { // if((the_queue = (tqueue_t *)malloc(sizeof(tqueue_t)) )== NULL) // return -1; the_queue->head = NULL; the_queue->tail = NULL; pthread_mutex_init(&(the_queue->mutex), NULL); pthread_cond_init(&(the_queue->cond), NULL); the_queue->size = 0; return 0; } ``` ==> 後來把 `if((the_queue = (tqueue_t *)malloc(sizeof(tqueue_t)) )== NULL) return -1;` 註解掉,才不會發生 segmentation fault(core dumped) - tqueue_pop: * Queue 應該是 FIFO,原來程式碼 pop 出來的不是頭而是尾 ==> 把 `tail` 改成 `head` ```clike= task_t *tqueue_pop(tqueue_t *the_queue) { task_t *ret; pthread_mutex_lock(&(the_queue->mutex)); ret = the_queue->head; if (ret) { the_queue->head = ret->next; if (the_queue->head) { the_queue->head->last = NULL; } else { the_queue->tail = NULL; } the_queue->size--; } pthread_mutex_unlock(&(the_queue->mutex)); return ret; } ``` - tqueue_push: * 回傳值沒有處理錯誤情況 ==> 若 task 不存在則回傳 -1,成功 push 則回傳 0 * 新加入的 task 應該置於 queue 的尾端,這裡放在頭 ==> 將 `head` 改為 `tail` ```clike= int tqueue_push(tqueue_t *the_queue, task_t *task) { if(task == NULL) return -1; pthread_mutex_lock(&(the_queue->mutex)); task->next = NULL; task->last = the_queue->tail; if (the_queue->tail) the_queue->tail->next = task; the_queue->tail = task; if (the_queue->size++ == 0) the_queue->head = task; pthread_mutex_unlock(&(the_queue->mutex)); return 0; } ``` - tqueue_free: * 回傳值無意義 ==> 判斷 `the_queue` 是否存在的情況,否則回傳 -1,並且不在執行下面釋放記憶體部分 * condition variable 有被初始,但是沒有被 destroy ==>`thread_cond_detroy(&(the_queue->cond));` ```clike= int tqueue_free(tqueue_t *the_queue) { if(the_queue == NULL) return -1; task_t *cur = the_queue->head; while (cur) { the_queue->head = the_queue->head->next; free(cur); cur = the_queue->head; } pthread_mutex_destroy(&(the_queue->mutex)); pthread_cond_destroy(&(the_queue->cond)); return 0; } ``` - tpool_free: * 此函式回傳值無意義 ==> 增加 `the_pool` 是否存在的判斷,以及`pthread_join()` 是否成功的判斷 ```clike= int tpool_free(tpool_t *the_pool) { if(the_pool == NULL) return -1; for (uint32_t i = 0; i < the_pool->count; ++i) if(pthread_join(the_pool->threads[i], NULL) != 0) return -1; free(the_pool->threads); tqueue_free(the_pool->queue); return 0; } ``` >>[參考](https://hackmd.io/MwQwxqCmDsBsC00AMATF8AsBOD14iQDMBGeMSAIxSUljDAoCZYg=) >>[參考](https://hackmd.io/MwQwRgpiAcAMBmBaA7MALMxaBsAmArItLsNIgJwjACMI2yu02wEQA===) >>[參考](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===) >>[參考](https://hackmd.io/CwQwxmCsAcBGAMBaaAzeTgEYDMAmRC2miA7NufAJzqwiwCmQA===) ### thread pool [concurrent-ll](https://github.com/jserv/concurrent-ll) lock-free ### 設計自動測試實驗 在 Makefile 中加入規則,利用系統內建的工具來隨機產生資料以及比較結果。 >>[參考實作](https://hackmd.io/MwQwxqCmDsBsC00AMATF8AsBOD14iQDMBGeMSAIxSUljDAoCZYg=) ```clike= check: all sort -R words.txt | ./sort $(THREAD_NUM) $(shell wc -l words.txt) > sorted.txt diff -u words.txt sorted.txt && echo "Verified!" || echo "Failed!" ``` #### 其他方法 >>[參考實作](https://hackmd.io/s/B1-Kytv0) ### Scalability [Scalability](https://en.wikipedia.org/wiki/Scalability) * Horizontal scalar (scale out/in):新增/減少節點到系統中。例如:分散式運算,由多台電腦共同執行任務 * Vertical scalar (scale up/down):增強/減弱單台電腦的性能。例如:使用多核心 #### 研究 [concurrent-II](https://github.com/jserv/concurrent-ll) 如何計算 scalabilty >>[參考實作](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===?both) --- ### Git Hooks [source](http://tech.mozilla.com.tw/posts/5306) >[Customizing Git - Git Hooks](https://git-scm.com/book/zh-tw/v2/Customizing-Git-Git-Hooks) --- <style> h2.part {color: red;} h3.part {color: green;} h4.part {color: blue;} h5.part {color: black;} </style>