```cpp #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> int myglobal; void *thread_function(void *arg) { for (int i = 0; i < 20; i++) { int j = myglobal; j = j + 1; printf("."); fflush(stdout); sleep(1); myglobal = j; } return NULL; } int main(void) { pthread_t mythread; if (pthread_create(&mythread, NULL, thread_function, NULL)) { printf("error creating thread."); abort(); } for (int i = 0; i < 20; i++) { myglobal = myglobal + 1; printf("o"); fflush(stdout); sleep(1); } if (pthread_join(mythread, NULL)) { printf("error joining thread."); abort(); } printf("\nmyglobal equals %d\n",myglobal); exit(0); } ``` 執行結果 ``` o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo. myglobal equals 21 ``` myglobal 起始值為 0,主執行緒與新執行緒各將 myglobal 增加 20,照理說應該是 40,但輸出 21 原因為何? 因為兩條執行緒同時執行,當 thread_fucntion 把 j 值寫回 myglobal,他會複寫主執行緒中的修改。 此便為程式同步執行所造成的競爭危害問題。 --- >> 將 concurrent / concurrency 統一翻譯為「並行」 [name=jserv] * ### 競爭危害(race conditon) 兩個工作同時存記憶體的過程中,並不會即時更新記憶體資料,造成最後用舊的資料計算,產生錯誤結果。 * ### 臨界區域(critical section) 共用資源被並行存取可能造成不可預期的錯誤,所以有些程式共用資源的存取會被保護,這些保護區域就稱為臨界區域。其只能被一個 process 執行。 * 解決 critical section 必須滿足3個條件: * #### Mutual Exclusion(互斥鎖,簡稱 Mutex) Mutex 是一種並行操作(concurrency control),用在多執行緒程式設計中,目的為避免競爭危害(race conditions),不會有兩個 processes 同時進入同一個臨界區域(critical section)。 * #### Progress (1)當沒有 process 在 critical section 內執行時,不能阻擋其他請求進入 critical section 的 processes 進入。 (2)從那些想進入 critical section 之 processes 決定一個 process 可以進入 critical section 的決定時間是有限的。(No Deadlock) * #### Bounded Waiting Process 要求進入 critical section 有個等待時間的bound、一個限制,也就是一個 process 不能一直佔住 critical section。i.e. 若有 n 個 processes 皆想進入 critical section,則任一 process 至多等 n-1 次後即可進入。(No Starvation) * 解決 critical section 的方法: * #### Software Based Solution - Peterson's Solution * #### Synchronization Hardware 以硬體方式解決,藉由 lock 來保護 critical section。 * #### Atomic Operation [參考:makelinux-Atomic Operation](http://www.makelinux.net/books/lkd2/ch09lev1sec1) 原子操作 (Atomic Operation) 意思是不可被中斷的操作,如同原子一般是不可再分割的。 * #### Test and Set (TAS) * #### Compare and Swap (CAS) * 為一種在多執行緒系統中實現同步的原子操作,操作過程虛擬碼如下 ```clike CAS(*位址, 舊值, 新值){ if(*位址!=舊值) return 0 //失敗 *位址 = 新值 return 1 //成功 } ``` 使用上會記錄某塊記憶體的舊值,如果這塊記憶體在期間內沒被修改過,舊值會與記憶體位址中的值相同,這時CAS操作會成功執行並將該記憶體位址存的值變為新值;反之,若在期間被修改過,則記憶體位址中的值會與舊值不同,CAS操作失敗,新值不被寫入。 * #### Semaphore * #### mutex 與 semaphore 的差異? 參考:[Mutexes VS Semaphores 大揭秘](http://swem.logdown.com/posts/177651-mutexes-and-semaphores-demystified)、[Semaphore 和 Mutex 的差異](https://hackpad.com/ep/pad/static/5TOjUJI2rKu) Mutex 和 Semphore 都是在解決同步問題的機制,用在保護 critical section,確保多個 process 同時存取資源時,執行結果不會因為執行程序的時間先後的影響而導致錯誤。然而其差異為: * semaphore 是一個 counter 的概念,可以設定有多少個 process 存取資源,如果存取的 process 數量到達上限,其他要求存取資源的 process 會被送到 wait queue 中,等待有 process 釋放資源,再繼續執行。 * Mutex 就像是一把鑰匙,會記錄擁有這把鑰匙的 owner 是哪個 process ,process 需要取得鑰匙後才能進到 critical section 存取資源,等到存取完成後才釋放出這把鑰匙的擁有權,達到 mutual exclusion。 * mutex 與 binary semapohre 差異為何? mutex 會導致 priority inversion。 * #### 優先權倒置 (Priority Inverse) [參考:Jserv's blog-Priority inversion 簡介](http://blog.linux.org.tw/~jserv/archives/001299.html) 在此情形下,高優先權的 task 會被低優先權的搶先(preempted),使得兩個 task 的相對優先權被顛倒。 如下圖所示,當具備中度優先權的 task (簡稱 M) 搶先 (preempt) 一個原本享有 resource 的低優先權的 task (簡稱 L),而該 resource 又是一個高優先權的 task (簡稱 H) 所等待。問題就在於,H 與 L 共享 resource,當 L 被 preempt 時,就該放下 resource,這是合理的行為,而原本 H 就在等待 resource 的釋放,因為隨後就會使用到。但問題是,這段 latency 中,M 把這個規則打破,先行 preempt 了 L,也就把 L 的 resource 給「搶走」,這下有趣的事情就發生了。原本 L 與 H 具備相對高低的優先權差異,但因為 M 的介入,造成延遲,這樣的過程可能就讓 H 發生超出 deadline 的情況。我們稱這樣逆轉 L 與 H 的優先權順序的現象為 priority inversion。 ![](https://i.imgur.com/Wy9kRk2.gif) * ### 經典同步問題 * #### Bounded-Buffer Problem * 描述 兩個行程 (生產者、消費者) 共享 buffer(由容量可分為 Bounded (有限的) 及 Unbounded (無限的) ,此討論有限情況),生產者負責產生資料並放入 buffer,然後重複這個過程;同時消費者從 buffer 消耗資料。本問題的關鍵在於,確保 buffer 滿時生產者不再放入資料;buffer 空時消費者不再消耗資料。 * 解決方法 生產者在 buffer 滿時進入睡眠 (sleep) 或放棄資料,當消費者消耗資料時再喚醒生產者放資料進 buffer;同理,消費者在 buffer 空時進入睡眠,當生產者放入資料時再喚醒消費者消費資料。可以透過 semapohres 達成,然而不完整解法可能造成死結 (deadlock),即兩個行程都進入睡眠等待被喚醒。 * #### Readers and Writers Problem * 描述 多個執行緒想要同時存取 (access) 同一個共用資源 (shared resource),有些想要讀 (read) 稱 reader 、有些想要寫 (write) 稱 writer,共用資源可以同時被讀但不能同時被寫。 * First readers-writers problem writers 可能 starve * Second readers-writers problem readers 可能 starve * 解決方法 Readers-Writer lock * #### Dining-Philosophers Problem ![](https://i.imgur.com/L6hsfYm.jpg) * 描述 五個哲學家只能思考和吃,他們圍著中間放有一碗飯的圓桌就座,每個哲學家左右手各有一支筷子,當一個哲學家餓了就會試著拿起左右各一支筷子吃飯,哲學家彼此無法得知其他人要在吃或在思考。問題的關鍵在於如何不讓哲學家 starve,如果哲學家都拿左邊的筷子,就會永遠等待右邊的筷子造成 Deadlock。 * 解決方法 Represent each chopstick with a semaphore --- 論文研讀:[A Pragmatic Implementation Of Non-Blocking Linked-Lists](https://timharris.uk/papers/2001-disc.pdf) ` This paper present a novel implementation of linked-lists which is non-blocking, linearizable and which is based on the compare-and-swap(CAS)operation found on contemporary processoers. ` 問題:若是在移除節點(10)的同時,插入新節點(20),可能會造成新插入節點的遺失 ![](https://i.imgur.com/MAjVTbn.png) 解決:作者提出使用兩個分開的 CAS (compare and swap) 分兩階段移除節點 1. Logically deleted : 先 mark 欲刪除節點(10)的 next 指標 2. Physically deleted : 刪除該節點(10) 所以假設要並行插入新節點(20)時,程式會觀察(10)是否已被 logically deleted,若有則嘗試 logically delete 它再插入新節點。 --- ### Thread Pool實作原理 在 Thread-Per-Message 模式中,當每次請求來到,就建立一個新的執行緒,用完就不再使用,然而執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統效能的降低。 >> 搭配閱讀: [Thread pools and work queues](http://www.ibm.com/developerworks/java/library/j-jtp0730/index.html) [name=jserv] 若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。 示意圖如下: ```graphviz digraph hierarchy { nodesep=0.5 // increases the separation between nodes node [color=black,fontname=Courier,shape=box] //All nodes will this shape and colour edge [color=black, style=dashed] // All the lines look like this Manager->{Worker1 Worker2 Worker3} } ``` 為了分配工作,在 worker thread 實作 Task 的機制。每個主要的操作會先被放進 task queue 裡頭,空閒的 thread 再從 task queue 裡頭提取 task 執行,如下面的步驟: ```graphviz digraph { 開始Process[shape="box", style=rounded]; 是否為結束Task[shape="diamond"]; 結束Process[shape="box", style=rounded]; 開始Process->提取Task->是否為結束Task 是否為結束Task->重發結束Task[label="是"] 重發結束Task->結束Process 是否為結束Task->執行Task[label="否"] 執行Task->提取Task; } ``` 大多數 thread pool 實做都離不開 lock 的使用,如 pthread_mutex 結合 (condition variable) pthread_cond。一般來說,lock 的使用對於程式效能影響較大,雖然現有的 pthread_mutex 在 lock 的取得 (acquire) 與釋放,已在 Linux 核心和對應函式庫進行效能提昇,但我們仍會希望有不仰賴 lock 的 thread pool 的實做。 ![](https://i.imgur.com/ktljGJC.png) 如上圖所示,workqueue (工作佇列) 由 main thread (主執行緒) 和 worker thread 共享,main thread 將任務放進 workqueue,worker thread 從 workqueue 中取出任務執行。要注意到,共享 workqueue 的操作必須在 mutex 的保護下安全進行,main thread 將任務放進 workqueue 時,若偵測到目前待執行的工作數目小於 worker thread 總數,則要透過 condition variable 喚醒可能處於等待狀態的 worker thread。 ### Lock-Free Thread Pool 實作原理 ![](https://i.imgur.com/GjNMAjd.png) 為解決 lock-free 所面臨的議題,我們一定要避免共享資源的競爭 (contention),因此將共享 workqueue 加以拆分成每 worker thread 一個 workqueue 的方式,如上圖。 >> 沒有顯著進展!你們到底做了什麼? [name=jserv] #### 未修改前 * sort 1 thread `mutrace: Total runtime is 0.734 ms.` * sort 2 thread `mutrace: Total runtime is 1.097 ms.` >發現thread開越多時間越長[name=儂偉] #### 參考[mbrossard 完整的 ThreadPool](https://github.com/mbrossard/threadpool/blob/master/src/threadpool.c) 原始碼 去優化threadPool #### 未優化(4thread) ``` Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags 1 177 43 27 0.048 0.000 0.002 Mx.--. 0 20 15 9 0.011 0.001 0.001 M-.--. 2 13 8 0 0.004 0.000 0.000 M-.--. |||||| /||||| Object: M = Mutex, W = RWLock /|||| State: x = dead, ! = inconsistent /||| Use: R = used in realtime thread /|| Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /| Mutex Protocol: i = INHERIT, p = PROTECT / RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC mutrace: Note that the flags column R is only valid in --track-rt mode! ``` >> 需要詳述優化策略,以及實作 fine-grained locks 和接著做到 lock-free [name=jserv] 著手修改 task_run ```c= pthread_mutex_lock(&(pool->mutex)); while ( pool->size == 0) { pthread_cond_wait(&(pool->cond), &(pool->mutex)); } if (pool->size == 0) break; pthread_mutex_unlock(&(pool->mutex)); task_t *_task = tqueue_pop(pool); ``` #### 優化後(thread) ``` Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags 1 20 17 5 0.009 0.000 0.002 M-.--. 0 13 9 2 0.020 0.002 0.007 Mx.--. |||||| /||||| Object: M = Mutex, W = RWLock /|||| State: x = dead, ! = inconsistent /||| Use: R = used in realtime thread /|| Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /| Mutex Protocol: i = INHERIT, p = PROTECT / RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC mutrace: Note that the flags column R is only valid in --track-rt mode! ``` >>發現lock數量大大減少[name=儂偉] ``` ./sort 1 8 input unsorted data line-by-line sorted results: [8] [7] [6] [5] [4] [3] [2] [1] sorted results: [1] [2] [3] [4] [5] [6] [7] [8] ./sort 2 8 input unsorted data line-by-line sorted results: [8] [7] [6] [5] [4] [3] [2] [1] ``` >> 只要開超過2個thread就執行失敗,待解決[name=儂偉] >>發現原因[mbrossard 的 ThreadPool](https://github.com/mbrossard/threadpool/blob/master/src/threadpool.c) 直接在cut套用會產生鎖兩次 造成死鎖,重新動手改原本的thread_pool[name=儂偉] #### thread_pool 加入pthread_cond_t機制 參考[mbrossard 的 ThreadPool](https://github.com/mbrossard/threadpool/blob/master/src/threadpool.c) ```c= typedef struct { pthread_t *threads; uint32_t count; task_t *head, *tail; pthread_mutex_t mutex; pthread_cond_t cond; uint32_t size; } tpool_t; ``` * 將tqueue與tpool_t合併為tpool_t,使pool結構更簡單 #### 新增bench ```shell= bench:intput.txt for i in 1 2 4 8 16 32 64; do \ while read line;do \ #逐行讀取 echo $$line ; \ done < intput.txt | ./sort $$i $ 10000 ;\ done # '<':將原本需要由鍵盤輸入的資料,改由檔案內容('input.txt')來取代 # '|':管線命令。僅能處理經由前面一個指令傳來的正確資訊,也就是 standard output 的資訊,對於 stdandard error 並沒有直接處理的能力。 ``` #### 產生測資 * makefile ```shell= random_num: random_num.c $(CC) $(CFLAGS) $^ -o $@ intput.txt: random_num ./random_num $ 10000 ``` * random_num.c 利用洗牌來達到均勻分布的目的 ```c= /*random_num.c*/ srand(time(NULL)); int num_count, i; FILE *input = fopen("input.txt", "w"); num_count = atoi(argv[1]); int *arr; int pos , tmp; arr = (int *) malloc(sizeof(int) * num_count); for (i = 0; i < num_count; i++) //產生num_count筆數據 arr[i] = i + 1; for (i = 0; i < num_count; i++) { //進行洗牌 pos = rand() % num_count; tmp = arr[i]; arr[i] = arr [pos]; arr[pos] = tmp; //進行arr[i]及arr[pos]交換 } for (i = 0; i < num_count; i++) fprintf(input, "%d\n", arr[i]); free(arr); fclose(input); ``` >> 程式碼縮排為 4 個空白,不是 Tab,請修正 [name=jserv] >> 已修正[name=儂偉] #### 比較圖 ![](https://i.imgur.com/k5e2FsT.png) >>由於實驗用CPU為4thread,所以4thread最快,並且超過4的時間就會越長[name=儂偉] --- ## Lock contention * 閱讀[Lock contention](https://en.wikipedia.org/wiki/Lock_(computer_science)#Granularity) * lock overhead : 使用 lock 的額外資源,像是記憶體空間、初始化和摧毀 lock 的 CPU 時間等等。 * lock contention : 這會發生在一個程序或是執行緒要去請求一個正在被使用中的 lock 時。 * deadlock : 當兩個 task 正在搶奪一個正被另一個 task 使用爭的 lock。除非有什麼完成了,否則這兩個 task 會一直等待。 * granularity : 指的是 lock 中所保護的 data 數量。如果選擇少量的 lock 去保護大量的資料的話,會有較少的 lock overhead 但是較差的 concurrency 表現,這是因為會有 lock contention 的問題產生。 The more coarse the lock (這個不知道怎麼翻), lock 越容易去停止不相關的多工請求。 >> coarse 和 fine-grained 互為反義字,前者表示較為粗糙的切割和處理方式,而後者則較為細緻,以 lock 來說,就是「大而粗略」和「細緻且多處」的差異。 [name=jserv] * 閱讀[Measuring Lock Contention](http://0pointer.de/blog/projects/mutrace.html) >> 做實驗了嗎?你們設計哪些實驗?不要只是「看」,這樣無從驗證自己的認知 [name=jserv] >> 提醒中英文關鍵字間要以空白區隔喔![color=red][name=課程助教] #### task_run未加入pthread_cond_t機制 ``` Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags 1 177 43 27 0.048 0.000 0.002 Mx.--. 0 20 15 9 0.011 0.001 0.001 M-.--. 2 13 8 0 0.004 0.000 0.000 M-.--. |||||| /||||| Object: M = Mutex, W = RWLock /|||| State: x = dead, ! = inconsistent /||| Use: R = used in realtime thread /|| Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /| Mutex Protocol: i = INHERIT, p = PROTECT / RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC mutrace: Note that the flags column R is only valid in --track-rt mode! ``` #### task_run加入pthread_cond_t機制 ``` Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags 1 53 30 11 0.070 0.001 0.016 M-.--. 0 20 12 3 0.009 0.000 0.002 M-.--. 2 13 5 0 0.004 0.000 0.000 M-.--. |||||| /||||| Object: M = Mutex, W = RWLock /|||| State: x = dead, ! = inconsistent /||| Use: R = used in realtime thread /|| Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /| Mutex Protocol: i = INHERIT, p = PROTECT / RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC mutrace: Note that the flags column R is only valid in --track-rt mode! ``` >> 發現lock數量跟contend都大幅降低[name=儂偉] --- ### Lock-free * 嘗試透過 atomic 操作來實作 >> 說好的程式碼呢? [name=jserv] ```c= static void *task_run(void *data) { (void) data; while (1) { task_t *ret , *ret_last, *ret_next, *the_pool_head, *tmp; ret = tmp = pool->tail; ret_last = ret->last; ret_next = ret->next; the_pool_head = pool->head; if (ret != NULL && !is_marked_ref(ret)) { if (CAS_PTR(&pool->tail, ret, ret_last) == ret) { ret = get_unmarked_ref(ret_last); ret_last = ret->last; ret_next = ret->next; if (ret && !is_marked_ref(ret_next)) { if (CAS_PTR(&ret->next, ret_next, NULL) == ret_next) { ret->next = NULL; FAD_U32(&(pool->size)); task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } else if (!ret && !is_marked_ref(the_pool_head)) { if (CAS_PTR(&pool->head, the_pool_head, NULL) == the_pool_head) { FAD_U32(&(pool->size)); task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } } } else if (ret == NULL) { task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } pthread_exit(NULL); } ``` >> 目前 mark 部份仍有問題 解決中... --- 讀書: * [Common threads: POSIX threads explained](http://www.ibm.com/developerworks/library/l-posix2/) * [Measuring Lock Contention](http://0pointer.de/blog/projects/mutrace.html) * [More Mutrace](http://0pointer.net/blog/projects/mutrace2.html) * [POSIX Threads for Windows](https://www.sourceware.org/pthreads-win32/manual/pthread_mutex_init.html) --- 參考資料 * [A07: mergesort-concurrent](https://hackmd.io/s/rJ-GWtJ0) * [宅學習-作業系統](http://sls.weco.net/node/21326) * [Thread Pool 模式](http://openhome.cc/Gossip/DesignPattern/ThreadPool.htm) * [Toward Concurrency](https://hackmd.io/s/H10MXXoT) * [Wikipedia-CAS](https://en.wikipedia.org/wiki/Compare-and-swap) * [Wikipedia-FAI](https://en.wikipedia.org/wiki/Fetch-and-add) * [gcc Atomic-Builtins doc](https://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html)