owned this note
owned this note
Published
Linked with GitHub
# [mergesort-concurrent](https://hackmd.io/s/rJ-GWtJ0)
[github](https://github.com/diana0651/mergesort-concurrent) contributed by <`Diana Ho`>
###### tags: `d0651` `sys`
## 案例分析
#### 對單向 Linked List
### 兩大重點
- 排序的對象是 singly-linked list
- 利用 POSIX Thread 處理,需要一併操作 synchronization object
### 預期目標
- [ ] 作為 [concurrency](/s/H10MXXoT) 的展示案例
- [ ] 學習 POSIX Thread Programming,特別是 [synchronization object](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html)
- [ ] 為日後效能分析和 scalability 研究建構基礎建設
- [ ] 學習程式品質分析和相關的開發工具
### 作業要求
- [ ] 將 merge sort 的實做改為可接受 [phonebook-concurrent](https://github.com/sysprog21/phonebook-concurrent) 的 35 萬筆資料輸入的資料檔
* 字典檔資料需要事先用 `sort -R` 處理過
* 思考如何得到均勻分佈的亂數排列,並且設計自動測試的機制
- [ ] 研究 thread pool 管理 worker thread 的實做,提出實做層面的不足,並且參照 [concurrent-ll](https://github.com/jserv/concurrent-ll),提出 lock-free 的實做
- [ ] 學習 [concurrent-ll](https://github.com/jserv/concurrent-ll) (concurrent linked-list 實作) 的 scalability 分析方式,透過 gnuplot 製圖比較 merge sort 在不同執行緒數量操作的效能
* 注意到 linked list 每個節點配置的記憶體往往是不連續,思考這對效能分析的影響
- [ ] 一併嘗試重構 (refactor) 給定的程式碼,使得程式更容易閱讀和維護。延續 [A05: introspect](/s/BkhIF92p),不只是在共筆上用文字提出良性詳盡的批評,也該反映在程式碼的變革
- [ ] 共筆的內容儘量用 GraphViz 製作
---
## POSIX Thread Programming
>>[參考概念](https://hackmd.io/AwTgzARgJhBMBmBaAbPAxgU0QFmWiiEAHAIYiKwDsoyRVEAjGsEA)
### Model
>[Designing Threaded Programs](http://maxim.int.ru/bookshelf/PthreadsProgram/htm/r_19.html)
* Boss/Worker Model
Boss 接收到一個新要求時,動態建立一個 Worker 完成該項要求,適用於伺服器上。可於程式開始時先建立好一定數量的執行緒(Thread Pool),減少執行時期的負擔。
* Peer Model (Workcrew model)
相較於 Boss/Worker 模式,此模式的每執行緒擁有自己的輸入,適用於矩陣運算、資料庫搜尋等。
* Pipeline Model
執行緒由上一階段取得輸入資料,並將結果傳至下一階段的執行緒。整體輸出受制於執行最久的階段,須注意每一階段的工作負擔要盡可能平均。
>>[參考概念](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===)
### manager-worker
[架構](http://maxim.int.ru/bookshelf/PthreadsProgram/htm/r_19.html)
```graphviz
digraph hierarchy {
nodesep=0.5 // increases the separation between nodes
node [color=black,fontname=Courier,shape=box] //All nodes will this shape and colour
edge [color=black, style=dashed] // All the lines look like this
Manager->{Worker1 Worker2 Worker3}
}
```
為了分配工作,在 worker thread 實作 Task 的機制。每個主要的操作會先被放進 task queue 裡頭,空閒的 thread 再從 task queue 裡頭提取 task 執行,只要把我們的操作寫成 task,就能順利執行。
```graphviz
digraph {
開始Process[shape="box", style=rounded];
是否為結束Task[shape="diamond"];
結束Process[shape="box", style=rounded];
開始Process->提取Task->是否為結束Task
是否為結束Task->重發結束Task[label="是"]
重發結束Task->結束Process
是否為結束Task->執行Task[label="否"]
執行Task->提取Task;
}
```
>>[參考圖片](https://hackmd.io/s/rJ-GWtJ0)
### Synchronization
[synchronization object](https://docs.oracle.com/cd/E19683-01/806-6867/6jfpgdcnd/index.html)
* pthread_join: 某執行緒暫停執行直到另一直行緒結束
* Mutex: 同一時間內只有一執行緒可以保有該 lock 及保護資料的存取權
* Initialize
* Static:```PTHREAD_MUTEX_INITIALIZER```
* Dynamic:```pthread_mutex_init()```
* Lock
* ```pthread_mutex_lock()```
* Release
* ```pthread_mutex_unlock()```
* Condition variable: 將事件實質化,並提供函式喚醒等待該事件的執行緒
* Initialize
* Static:```PTHREAD_COND_INITIALIZER```
* Dynamic:```pthread_cond_init()```
* Waits for condition variables
* 等待直到喚醒:```pthread_cond_wait()```
* 等待特定時間:```pthread_cond_timewait()```
* 喚醒等待中執行緒
* 喚醒其中之一:```pthread_cond_signal()```
* 喚醒所有等待中執行緒:```pthread_cond_broadcast()```
* 為避免 [Spurious Wake Ups](https://en.wikipedia.org/wiki/Spurious_wakeup) 或其他先被喚醒的執行緒執行導致條件變數再次不成立,一般會以迴圈判斷條件是否成立,不成立則再次進入等待
* pthread_once: 保證初始函式被許多直行緒呼叫時僅執行一次
* 宣告 pthread_once_t 並靜態初始為```PTHREAD_ONCE_INIT```
* 以 pthread_once() 與 once 區域呼叫目標函式
* 不容許傳遞參數至 once 所保護的函式
### Pthreads Management
#### Key
一種指標,將資料和執行緒進行關聯
#### Thread cancellation
* States:
* ```PTHREAD_CANCEL_DISABLE```
* Type: ignored
* ```PTHREAD_CANCEL_ENABLE```
* Types:
* ```PTHREAD_CANCEL_ASYNCHRONOUS```:立刻取消
* ```PTHREAD_CANCEL_DEFERRED```:當執行到 Cancellation-point 時發生
* Cancellation-point
* ```pthread_cond_wait(), pthread_cond_timewait(), pthread_join()```
* 使用者定義```pthread_testcancel()```
* Asynchronous Cancellation: 當執行緒在此狀態時,即使在執行函式庫呼叫或是系統呼叫時也能夠被取消,除非被定義為 cancellation-safe 否則應防止該情形發生
### mutex contention
mutrace 可用來偵測 [lock contention](https://en.wikipedia.org/wiki/Lock_(computer_science)#Granularity),使用方便,不需要重新編譯程式碼。
>>[參考概念](https://hackmd.io/CwQwxmCsAcBGAMBaaAzeTgEYDMAmRC2miA7NufAJzqwiwCmQA===?both)
* **Lock contention**: this occurs whenever **one process or thread attempts to acquire a lock held by another process or thread.** The more **fine-grained** the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.)
* mutrace v.s. valgrind/drd
In contrast to valgrind/drd it **does not virtualize the CPU instruction set**, making it a lot faster. In fact, the hooks mutrace relies on to profile mutex operations should only minimally influence application runtime. Mutrace is **not useful for finding synchronizations bugs**, it is solely useful for profiling locks.
* About mutrace
* mutrace is implemented entirely in **userspace**.
* build your application with `-rdynamic` to make the backtraces mutrace generates useful.
* `-r`| --track-rt: checks on each mutex operation wheter it is executed by a realtime thread or not.
=>use this to track down which mutexes are good candidates for **priority inheritance**.
* `matrace` that can be used to track down **memory allocation** operations in realtime threads.
> [Measuring Lock Contention](http://0pointer.de/blog/projects/mutrace.html)
> [More Mutrace](http://0pointer.net/blog/projects/mutrace2.html)
---
## 閱讀程式碼
### list.[ch]
#### list.h
* node:儲存實際的資料
```clike=
typedef struct node {
val_t data;
struct node *next;
} node_t;
```
* linked list:建立 linked list 順便紀錄長度,所以在分割 list 的時候只需要花 $O(N)$ 的時間就好,不需要重新計算長度。
```clike=
typedef struct llist {
node_t *head;
uint32_t size;
} llist_t;
```
:::info
#### intptr_t 資料型態
>>[When/Why to use intptr_t for type-casting in C?](http://stackoverflow.com/questions/6326338/why-when-to-use-intptr-t-for-type-casting-in-c)
>>[What is the use of intptr_t?](http://stackoverflow.com/questions/35071200/what-is-the-use-of-intptr-t)
* 功能
- 可pointer轉換成integer型態,做位元運算與加減法。
相較於 `void *`,`intptr_t` 的位址可以做 bitwise operation 而 `void *` 不能。做位元運算,無號的`uintptr_t`會比較好。
```clike=
/* Types for `void *' pointers. */
#if __WORDSIZE == 64
# ifndef __intptr_t_defined
typedef long int intptr_t;
# define __intptr_t_defined
# endif
typedef unsigned long int uintptr_t;
#else
# ifndef __intptr_t_defined
typedef int intptr_t;
# define __intptr_t_defined
# endif
typedef unsigned int uintptr_t;
#endif
```
- pointer 轉成 integer 時,32 bits 或 64 bits 的 machine 會有 pointer 長度的差別。若使用到 `intptr_t`,往後我們只需 `#include <stdint>` 就能免除麻煩的型態轉換。除此之外,也是確保型態轉換上不會出問題的保守作法。
```clike=
#ifdef MACHINE 64
typedef long intptr;
#else // MACHINE 32
typedef int intptr;
#endif
```
* 目的
- 讓 node 中的資料可以是指標型態,也可以是一般資料型態。
- 為了相容不同平台所設計。
:::
#### list.c
- node_new:在 next 前方放入一個新的 node,並回傳指向該 node 之指標
- list_new:初始化一串新的 linked-list
- list_add:在 linked-list 的尾端加入一個新的 node
### threadpool.[ch]
#### threadpool.h
* task:讓 thread 知道自己要做什麼
採用的 doubly linked list。紀錄前一個 task 的用意在於,當從 queue 中拿出工作後,可以在 $O(1)$ 的時間將 queue 的 tail 指向應有的地方。
```clike=
typedef struct _task {
void (*func)(void *);
void *arg;
struct _task *next, *last;
} task_t;
```
* queue:裡面放置著等待要被處理的 task 們
利用 mutex 來確保不會有 race condition 的發生。
而 condition variable 在觀看的時候發現只有被初始化而已,沒有其他地方使用到。
```clike=
typedef struct {
task_t *head, *tail;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint32_t size;
} tqueue_t;
```
* thread pool:worker thread 的管理員,裡面放置了所有 thread 和 job queue
```clike=
typedef struct {
pthread_t *threads;
uint32_t count;
tqueue_t *queue;
} tpool_t;
```
#### threadpool.c
- task_free:釋放單一 task 先前所配置的記憶體空間
- tqueue_init:初始化 queue
- tqueue_pop:將 queue 中的單一 task 取出,派遣給 thread
- tqueue_size:回傳 queue 當前的大小
- tqueue_push:將新的 task 加入 queue 中
- tqueue_free:釋放已配置的 queue 空間
- tpool_init:初始化 threadpool
- tpool_free:釋放 threadpool 記憶體空間
### main.c
* `merge_list()` 傳入兩個list,然後去比較2個list最小的那個再一一挑出,放到新的list裡面,直到其中一個list的數被挑完,還有數字的list就接到已排序好的list後面。
* `merge_sort()` 如果傳入的list size小於2直接回傳該list,否則就將他從中間分成2個list之後丟進 `merge_list()` 做排序並回傳排序好的list。
* `merge` 裡面去判斷現在的list總共串了幾個,如果小於輸入的資料數,就繼續做排序串起來,不然就設置終止task然後印出排序好的list。
* ==發現cut_func()跟merge_sort()都是再做切割list,而cut_func裡面判斷cut_count是否小於max_cut其實沒有必要,所以把cut_count等相關變數刪掉,也把merge_sort()刪掉,留下cut_func()就好。==
* `task_run` 將我們要做的事寫成task,放進job queue裡,空閒的thread會從queue裡拿task出來做。
### Makefile
>[參考](https://gcc.gnu.org/onlinedocs/gcc/Preprocessor-Options.html)
```
-M
生成文件關聯的信息。包含目標文件所依賴的所有源代碼
-MM
和-M一樣,但是它將忽略由#include<file>;造成的依賴關係。
-MD
和-M相同,但是輸出將導入到.d的文件裡面
-MMD
和-MM相同,但是輸出將導入到.d的文件裡面
-MF
指定輸出到某個檔案,否則預設是原始檔案檔名.d
```
[參考](https://gcc.gnu.org/onlinedocs/gcc/Link-Options.html)
```
-rdynamic:
指定 linker 將所有 label 寫入到 dynamic symbol table。
```
## 程式實驗
### mutrace
```clike=
$ (for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8
mutrace: 0.2 sucessfully initialized for process sort (pid 11679).
input unsorted data line-by-line
sorted results:
[300] [2022] [6225] [10989] [17303] [23622] [32251] [32551]
mutrace: Showing statistics for process sort (pid 11679).
mutrace: 3 mutexes used.
mutrace: Showing 3 most contended mutexes:
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
0 145 40 30 0.023 0.000 0.001 Mx.--.
1 13 10 1 0.002 0.000 0.000 M-.--.
2 20 3 0 0.006 0.000 0.001 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
mutrace: Total runtime is 0.883 ms.
mutrace: Results for SMP with 8 processors.
```
* 表格:
* `Locked`: how often the mutex was locked during the entire runtime.
* `Changed`: how often the owning thread of the mutex changed.
=>If the number is high this means the risk of contention is also high.
* `Cont.`: how often the lock was already taken when we tried to take it and we had to wait.
==> Mutex #1 is the most contended lock
* `tot.Time[ms]`: for how long during the entire runtime the lock was locked
* `avg.Time[ms]`: the average lock time
* `max.Time[ms]`: the longest time the lock was held.
* `Flags`: what kind of mutex this is (recursive, normal or otherwise).
按照作業要求中所教的使用 `addr2line` 找到實際對應的程式行數,卻顯示 `??:0`, 目前還在尋找原因
```clike=
$ addr2line -e sort 0x38
??:0
```
### UNIX 指令組合
將 [phonebook-concurrent](https://github.com/sysprog21/phonebook-concurrent) 裡的 `dictionary/words.txt` 透過 UNIX 指令打亂順序,之後重新導向到另一個檔案 `input.txt`,變成merge sort新的資料輸入
```clike=
$ uniq words.txt | sort -R > input.txt
```
* `sort`:將輸入資料排序
* `-R` flag:隨機排序
* `uniq`:可以用來將重複的行刪除而只顯示一個
* `-c` flag:顯示重複字詞的出現次數
* `wc`:列出檔案有多少行,多少單字,多少字元
* `-l/w/m`:僅列出行、單字、字元
### refactor
* list.[ch]
- list_add
* return 永遠是 0。
==> return list head 回傳建好後新的 `llist` 指標,使用更彈性。
* 註解提到:若傳入的 val 已經存在,則不再重新建立新的 node,原程式碼中並沒有相對應的檢查機制。
```clike=
llist_t list_add(llist_t *list, val_t val)
{
//check if value already exist
node_t *cur = list->head;
while(cur) {
if(cur-> data == val) return list;
cur = cur->next;
}
//add to the head of the list
node_t *e = node_new(val, NULL);
e->next = list->head;
list->head = e;
list->size++;
return list;
}
```
- list_nth
- index out of range 條件判斷有誤,把 nth 的 n 當作 array 的 index,排序為 0 ~ list->size-1
==> 判斷 out of range 條件改為 `if(idx > list->size-1)`
==> 多加一個判斷`if(idx == 0) return cur;`
```clike=
node_t *list_nth(llist_t *list, uint32_t idx)
{
if (idx > list->size-1)
return NULL;
node_t *head = list->head;
if(idx == 0)
return head;
while (idx--)
head = head->next;
return head;
}
```
* threadpool.[ch]
- tqueue_init:
* 未進行所傳入 `tqueue_t` 的記憶體空間配置
==> 直接在函式中用 malloc 配置空間,成功回傳 0,失敗回傳 -1
```clike=
int tqueue_init(tqueue_t *the_queue)
{
// if((the_queue = (tqueue_t *)malloc(sizeof(tqueue_t)) )== NULL)
// return -1;
the_queue->head = NULL;
the_queue->tail = NULL;
pthread_mutex_init(&(the_queue->mutex), NULL);
pthread_cond_init(&(the_queue->cond), NULL);
the_queue->size = 0;
return 0;
}
```
==> 後來把 `if((the_queue = (tqueue_t *)malloc(sizeof(tqueue_t)) )== NULL) return -1;` 註解掉,才不會發生 segmentation fault(core dumped)
- tqueue_pop:
* Queue 應該是 FIFO,原來程式碼 pop 出來的不是頭而是尾
==> 把 `tail` 改成 `head`
```clike=
task_t *tqueue_pop(tqueue_t *the_queue)
{
task_t *ret;
pthread_mutex_lock(&(the_queue->mutex));
ret = the_queue->head;
if (ret) {
the_queue->head = ret->next;
if (the_queue->head) {
the_queue->head->last = NULL;
} else {
the_queue->tail = NULL;
}
the_queue->size--;
}
pthread_mutex_unlock(&(the_queue->mutex));
return ret;
}
```
- tqueue_push:
* 回傳值沒有處理錯誤情況
==> 若 task 不存在則回傳 -1,成功 push 則回傳 0
* 新加入的 task 應該置於 queue 的尾端,這裡放在頭
==> 將 `head` 改為 `tail`
```clike=
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
if(task == NULL) return -1;
pthread_mutex_lock(&(the_queue->mutex));
task->next = NULL;
task->last = the_queue->tail;
if (the_queue->tail)
the_queue->tail->next = task;
the_queue->tail = task;
if (the_queue->size++ == 0)
the_queue->head = task;
pthread_mutex_unlock(&(the_queue->mutex));
return 0;
}
```
- tqueue_free:
* 回傳值無意義
==> 判斷 `the_queue` 是否存在的情況,否則回傳 -1,並且不在執行下面釋放記憶體部分
* condition variable 有被初始,但是沒有被 destroy
==>`thread_cond_detroy(&(the_queue->cond));`
```clike=
int tqueue_free(tqueue_t *the_queue)
{
if(the_queue == NULL) return -1;
task_t *cur = the_queue->head;
while (cur) {
the_queue->head = the_queue->head->next;
free(cur);
cur = the_queue->head;
}
pthread_mutex_destroy(&(the_queue->mutex));
pthread_cond_destroy(&(the_queue->cond));
return 0;
}
```
- tpool_free:
* 此函式回傳值無意義
==> 增加 `the_pool` 是否存在的判斷,以及`pthread_join()` 是否成功的判斷
```clike=
int tpool_free(tpool_t *the_pool)
{
if(the_pool == NULL) return -1;
for (uint32_t i = 0; i < the_pool->count; ++i)
if(pthread_join(the_pool->threads[i], NULL) != 0) return -1;
free(the_pool->threads);
tqueue_free(the_pool->queue);
return 0;
}
```
>>[參考](https://hackmd.io/MwQwxqCmDsBsC00AMATF8AsBOD14iQDMBGeMSAIxSUljDAoCZYg=)
>>[參考](https://hackmd.io/MwQwRgpiAcAMBmBaA7MALMxaBsAmArItLsNIgJwjACMI2yu02wEQA===)
>>[參考](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===)
>>[參考](https://hackmd.io/CwQwxmCsAcBGAMBaaAzeTgEYDMAmRC2miA7NufAJzqwiwCmQA===)
### thread pool
[concurrent-ll](https://github.com/jserv/concurrent-ll)
lock-free
### 設計自動測試實驗
在 Makefile 中加入規則,利用系統內建的工具來隨機產生資料以及比較結果。
>>[參考實作](https://hackmd.io/MwQwxqCmDsBsC00AMATF8AsBOD14iQDMBGeMSAIxSUljDAoCZYg=)
```clike=
check: all
sort -R words.txt | ./sort $(THREAD_NUM) $(shell wc -l words.txt) > sorted.txt
diff -u words.txt sorted.txt && echo "Verified!" || echo "Failed!"
```
#### 其他方法
>>[參考實作](https://hackmd.io/s/B1-Kytv0)
### Scalability
[Scalability](https://en.wikipedia.org/wiki/Scalability)
* Horizontal scalar (scale out/in):新增/減少節點到系統中。例如:分散式運算,由多台電腦共同執行任務
* Vertical scalar (scale up/down):增強/減弱單台電腦的性能。例如:使用多核心
#### 研究 [concurrent-II](https://github.com/jserv/concurrent-ll) 如何計算 scalabilty
>>[參考實作](https://hackmd.io/IYTgzAJhBGEKwFo4jgDgQFgAwgEwOgDZV8AzMAUyJA2gEZxSg===?both)
---
### Git Hooks
[source](http://tech.mozilla.com.tw/posts/5306)
>[Customizing Git - Git Hooks](https://git-scm.com/book/zh-tw/v2/Customizing-Git-Git-Hooks)
---
<style>
h2.part {color: red;}
h3.part {color: green;}
h4.part {color: blue;}
h5.part {color: black;}
</style>