#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

int myglobal;

void *thread_function(void *arg)
{
    for (int i = 0; i < 20; i++) {
        int j = myglobal;
        j = j + 1;
        printf(".");
        fflush(stdout);
        sleep(1);
        myglobal = j;
    }
    return NULL;
}

int main(void)
{
    pthread_t mythread;

    if (pthread_create(&mythread, NULL,
                       thread_function, NULL)) {
        printf("error creating thread.");
        abort();
    }
    for (int i = 0; i < 20; i++) {
        myglobal = myglobal + 1;
        printf("o");
        fflush(stdout);
        sleep(1);
    }
    if (pthread_join(mythread, NULL)) {
        printf("error joining thread.");
        abort();
    }
    printf("\nmyglobal equals %d\n",myglobal);
    exit(0);
}

執行結果

o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo.
myglobal equals 21

myglobal 起始值為 0,主執行緒與新執行緒各將 myglobal 增加 20,照理說應該是 40,但輸出 21 原因為何?
因為兩條執行緒同時執行,當 thread_fucntion 把 j 值寫回 myglobal,他會複寫主執行緒中的修改。
此便為程式同步執行所造成的競爭危害問題。


將 concurrent / concurrency 統一翻譯為「並行」 jserv

  • 競爭危害(race conditon)

    兩個工作同時存記憶體的過程中,並不會即時更新記憶體資料,造成最後用舊的資料計算,產生錯誤結果。

  • 臨界區域(critical section)

    共用資源被並行存取可能造成不可預期的錯誤,所以有些程式共用資源的存取會被保護,這些保護區域就稱為臨界區域。其只能被一個 process 執行。

  • 解決 critical section 必須滿足3個條件:

    • Mutual Exclusion(互斥鎖,簡稱 Mutex)

      Mutex 是一種並行操作(concurrency control),用在多執行緒程式設計中,目的為避免競爭危害(race conditions),不會有兩個 processes 同時進入同一個臨界區域(critical section)。
    • Progress

      (1)當沒有 process 在 critical section 內執行時,不能阻擋其他請求進入 critical section 的 processes 進入。
      (2)從那些想進入 critical section 之 processes 決定一個 process 可以進入 critical section 的決定時間是有限的。(No Deadlock)
    • Bounded Waiting

      Process 要求進入 critical section 有個等待時間的bound、一個限制,也就是一個 process 不能一直佔住 critical section。i.e. 若有 n 個 processes 皆想進入 critical section,則任一 process 至多等 n-1 次後即可進入。(No Starvation)
  • 解決 critical section 的方法:

    • Software Based Solution - Peterson's Solution

    • Synchronization Hardware

      以硬體方式解決,藉由 lock 來保護 critical section。

      • Atomic Operation 參考:makelinux-Atomic Operation

        原子操作 (Atomic Operation) 意思是不可被中斷的操作,如同原子一般是不可再分割的。
        • Test and Set (TAS)

        • Compare and Swap (CAS)

          • 為一種在多執行緒系統中實現同步的原子操作,操作過程虛擬碼如下
          ​​​​​​​​​​​​​​​​CAS(*位址, 舊值, 新值){
          ​​​​​​​​​​​​​​​​    if(*位址!=舊值) 
          ​​​​​​​​​​​​​​​​        return 0 //失敗
          ​​​​​​​​​​​​​​​​    *位址 = 新值
          ​​​​​​​​​​​​​​​​        return 1 //成功
          ​​​​​​​​​​​​​​​​}
          
          使用上會記錄某塊記憶體的舊值,如果這塊記憶體在期間內沒被修改過,舊值會與記憶體位址中的值相同,這時CAS操作會成功執行並將該記憶體位址存的值變為新值;反之,若在期間被修改過,則記憶體位址中的值會與舊值不同,CAS操作失敗,新值不被寫入。
    • Semaphore

      • mutex 與 semaphore 的差異? 參考:Mutexes VS Semaphores 大揭秘Semaphore 和 Mutex 的差異

        Mutex 和 Semphore 都是在解決同步問題的機制,用在保護 critical section,確保多個 process 同時存取資源時,執行結果不會因為執行程序的時間先後的影響而導致錯誤。然而其差異為:
        • semaphore 是一個 counter 的概念,可以設定有多少個 process 存取資源,如果存取的 process 數量到達上限,其他要求存取資源的 process 會被送到 wait queue 中,等待有 process 釋放資源,再繼續執行。
        • Mutex 就像是一把鑰匙,會記錄擁有這把鑰匙的 owner 是哪個 process ,process 需要取得鑰匙後才能進到 critical section 存取資源,等到存取完成後才釋放出這把鑰匙的擁有權,達到 mutual exclusion。
        • mutex 與 binary semapohre 差異為何?
          mutex 會導致 priority inversion。
        • 優先權倒置 (Priority Inverse) 參考:Jserv's blog-Priority inversion 簡介

          在此情形下,高優先權的 task 會被低優先權的搶先(preempted),使得兩個 task 的相對優先權被顛倒。
          如下圖所示,當具備中度優先權的 task (簡稱 M) 搶先 (preempt) 一個原本享有 resource 的低優先權的 task (簡稱 L),而該 resource 又是一個高優先權的 task (簡稱 H) 所等待。問題就在於,H 與 L 共享 resource,當 L 被 preempt 時,就該放下 resource,這是合理的行為,而原本 H 就在等待 resource 的釋放,因為隨後就會使用到。但問題是,這段 latency 中,M 把這個規則打破,先行 preempt 了 L,也就把 L 的 resource 給「搶走」,這下有趣的事情就發生了。原本 L 與 H 具備相對高低的優先權差異,但因為 M 的介入,造成延遲,這樣的過程可能就讓 H 發生超出 deadline 的情況。我們稱這樣逆轉 L 與 H 的優先權順序的現象為 priority inversion。
  • 經典同步問題

    • Bounded-Buffer Problem

      • 描述
        兩個行程 (生產者、消費者) 共享 buffer(由容量可分為 Bounded (有限的) 及 Unbounded (無限的) ,此討論有限情況),生產者負責產生資料並放入 buffer,然後重複這個過程;同時消費者從 buffer 消耗資料。本問題的關鍵在於,確保 buffer 滿時生產者不再放入資料;buffer 空時消費者不再消耗資料。
      • 解決方法
        生產者在 buffer 滿時進入睡眠 (sleep) 或放棄資料,當消費者消耗資料時再喚醒生產者放資料進 buffer;同理,消費者在 buffer 空時進入睡眠,當生產者放入資料時再喚醒消費者消費資料。可以透過 semapohres 達成,然而不完整解法可能造成死結 (deadlock),即兩個行程都進入睡眠等待被喚醒。
    • Readers and Writers Problem

      • 描述
        多個執行緒想要同時存取 (access) 同一個共用資源 (shared resource),有些想要讀 (read) 稱 reader 、有些想要寫 (write) 稱 writer,共用資源可以同時被讀但不能同時被寫。
        • First readers-writers problem
          writers 可能 starve
        • Second readers-writers problem
          readers 可能 starve
      • 解決方法
        Readers-Writer lock
    • Dining-Philosophers Problem

      • 描述
        五個哲學家只能思考和吃,他們圍著中間放有一碗飯的圓桌就座,每個哲學家左右手各有一支筷子,當一個哲學家餓了就會試著拿起左右各一支筷子吃飯,哲學家彼此無法得知其他人要在吃或在思考。問題的關鍵在於如何不讓哲學家 starve,如果哲學家都拿左邊的筷子,就會永遠等待右邊的筷子造成 Deadlock。
      • 解決方法
        Represent each chopstick with a semaphore

論文研讀:A Pragmatic Implementation Of Non-Blocking Linked-Lists

This paper present a novel implementation of linked-lists which is non-blocking, linearizable and which is based on the compare-and-swap(CAS)operation found on contemporary processoers.
問題:若是在移除節點(10)的同時,插入新節點(20),可能會造成新插入節點的遺失

解決:作者提出使用兩個分開的 CAS (compare and swap) 分兩階段移除節點

  1. Logically deleted : 先 mark 欲刪除節點(10)的 next 指標
  2. Physically deleted : 刪除該節點(10)
    所以假設要並行插入新節點(20)時,程式會觀察(10)是否已被 logically deleted,若有則嘗試 logically delete 它再插入新節點。

Thread Pool實作原理

在 Thread-Per-Message 模式中,當每次請求來到,就建立一個新的執行緒,用完就不再使用,然而執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統效能的降低。

搭配閱讀: Thread pools and work queues jserv

若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。
示意圖如下:

digraph hierarchy {
    nodesep=0.5 // increases the separation between nodes
    node [color=black,fontname=Courier,shape=box] //All nodes will this shape and colour
    edge [color=black, style=dashed] // All the lines look like this
    Manager->{Worker1 Worker2 Worker3}
}

為了分配工作,在 worker thread 實作 Task 的機制。每個主要的操作會先被放進 task queue 裡頭,空閒的 thread 再從 task queue 裡頭提取 task 執行,如下面的步驟:

digraph {
開始Process[shape="box", style=rounded];
是否為結束Task[shape="diamond"];
結束Process[shape="box", style=rounded];
開始Process->提取Task->是否為結束Task
是否為結束Task->重發結束Task[label="是"]
重發結束Task->結束Process
是否為結束Task->執行Task[label="否"]
執行Task->提取Task;
}

大多數 thread pool 實做都離不開 lock 的使用,如 pthread_mutex 結合 (condition variable) pthread_cond。一般來說,lock 的使用對於程式效能影響較大,雖然現有的 pthread_mutex 在 lock 的取得 (acquire) 與釋放,已在 Linux 核心和對應函式庫進行效能提昇,但我們仍會希望有不仰賴 lock 的 thread pool 的實做。

如上圖所示,workqueue (工作佇列) 由 main thread (主執行緒) 和 worker thread 共享,main thread 將任務放進 workqueue,worker thread 從 workqueue 中取出任務執行。要注意到,共享 workqueue 的操作必須在 mutex 的保護下安全進行,main thread 將任務放進 workqueue 時,若偵測到目前待執行的工作數目小於 worker thread 總數,則要透過 condition variable 喚醒可能處於等待狀態的 worker thread。

Lock-Free Thread Pool 實作原理


為解決 lock-free 所面臨的議題,我們一定要避免共享資源的競爭 (contention),因此將共享 workqueue 加以拆分成每 worker thread 一個 workqueue 的方式,如上圖。

沒有顯著進展!你們到底做了什麼? jserv

未修改前

  • sort 1 thread mutrace: Total runtime is 0.734 ms.
  • sort 2 thread mutrace: Total runtime is 1.097 ms.

發現thread開越多時間越長儂偉

參考mbrossard 完整的 ThreadPool 原始碼 去優化threadPool

未優化(4thread)

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1      177       43       27        0.048        0.000        0.002 Mx.--.
       0       20       15        9        0.011        0.001        0.001 M-.--.
       2       13        8        0        0.004        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC 

mutrace: Note that the flags column R is only valid in --track-rt mode!

需要詳述優化策略,以及實作 fine-grained locks 和接著做到 lock-free jserv

著手修改 task_run

pthread_mutex_lock(&(pool->mutex)); while ( pool->size == 0) { pthread_cond_wait(&(pool->cond), &(pool->mutex)); } if (pool->size == 0) break; pthread_mutex_unlock(&(pool->mutex)); task_t *_task = tqueue_pop(pool);

優化後(thread)

Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1       20       17        5        0.009        0.000        0.002 M-.--.
       0       13        9        2        0.020        0.002        0.007 Mx.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC 

mutrace: Note that the flags column R is only valid in --track-rt mode!

發現lock數量大大減少儂偉

./sort 1 8
input unsorted data line-by-line

sorted results:
[8] [7] [6] [5] [4] [3] [2] [1] 

sorted results:
[1] [2] [3] [4] [5] [6] [7] [8] 

./sort 2 8
input unsorted data line-by-line

sorted results:
[8] [7] [6] [5] [4] [3] [2] [1] 

只要開超過2個thread就執行失敗,待解決儂偉
發現原因mbrossard 的 ThreadPool 直接在cut套用會產生鎖兩次 造成死鎖,重新動手改原本的thread_pool儂偉

thread_pool 加入pthread_cond_t機制 參考mbrossard 的 ThreadPool

typedef struct { pthread_t *threads; uint32_t count; task_t *head, *tail; pthread_mutex_t mutex; pthread_cond_t cond; uint32_t size; } tpool_t;
  • 將tqueue與tpool_t合併為tpool_t,使pool結構更簡單

新增bench

bench:intput.txt for i in 1 2 4 8 16 32 64; do \ while read line;do \ #逐行讀取 echo $$line ; \ done < intput.txt | ./sort $$i $ 10000 ;\ done # '<':將原本需要由鍵盤輸入的資料,改由檔案內容('input.txt')來取代 # '|':管線命令。僅能處理經由前面一個指令傳來的正確資訊,也就是 standard output 的資訊,對於 stdandard error 並沒有直接處理的能力。

產生測資

  • makefile
random_num: random_num.c $(CC) $(CFLAGS) $^ -o $@ intput.txt: random_num ./random_num $ 10000
  • random_num.c 利用洗牌來達到均勻分布的目的
/*random_num.c*/ srand(time(NULL)); int num_count, i; FILE *input = fopen("input.txt", "w"); num_count = atoi(argv[1]); int *arr; int pos , tmp; arr = (int *) malloc(sizeof(int) * num_count); for (i = 0; i < num_count; i++) //產生num_count筆數據 arr[i] = i + 1; for (i = 0; i < num_count; i++) { //進行洗牌 pos = rand() % num_count; tmp = arr[i]; arr[i] = arr [pos]; arr[pos] = tmp; //進行arr[i]及arr[pos]交換 } for (i = 0; i < num_count; i++) fprintf(input, "%d\n", arr[i]); free(arr); fclose(input);

程式碼縮排為 4 個空白,不是 Tab,請修正 jserv
已修正儂偉

比較圖

由於實驗用CPU為4thread,所以4thread最快,並且超過4的時間就會越長儂偉


Lock contention

  • 閱讀Lock contention
    • lock overhead : 使用 lock 的額外資源,像是記憶體空間、初始化和摧毀 lock 的 CPU 時間等等。
    • lock contention : 這會發生在一個程序或是執行緒要去請求一個正在被使用中的 lock 時。
    • deadlock : 當兩個 task 正在搶奪一個正被另一個 task 使用爭的 lock。除非有什麼完成了,否則這兩個 task 會一直等待。
    • granularity : 指的是 lock 中所保護的 data 數量。如果選擇少量的 lock 去保護大量的資料的話,會有較少的 lock overhead 但是較差的 concurrency 表現,這是因為會有 lock contention 的問題產生。 The more coarse the lock (這個不知道怎麼翻), lock 越容易去停止不相關的多工請求。

coarse 和 fine-grained 互為反義字,前者表示較為粗糙的切割和處理方式,而後者則較為細緻,以 lock 來說,就是「大而粗略」和「細緻且多處」的差異。 jserv

做實驗了嗎?你們設計哪些實驗?不要只是「看」,這樣無從驗證自己的認知 jserv

提醒中英文關鍵字間要以空白區隔喔!課程助教

task_run未加入pthread_cond_t機制

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1      177       43       27        0.048        0.000        0.002 Mx.--.
       0       20       15        9        0.011        0.001        0.001 M-.--.
       2       13        8        0        0.004        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC 

mutrace: Note that the flags column R is only valid in --track-rt mode!

task_run加入pthread_cond_t機制

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1       53       30       11        0.070        0.001        0.016 M-.--.
       0       20       12        3        0.009        0.000        0.002 M-.--.
       2       13        5        0        0.004        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC 

mutrace: Note that the flags column R is only valid in --track-rt mode!

發現lock數量跟contend都大幅降低儂偉


Lock-free

  • 嘗試透過 atomic 操作來實作

說好的程式碼呢? jserv

static void *task_run(void *data) { (void) data; while (1) { task_t *ret , *ret_last, *ret_next, *the_pool_head, *tmp; ret = tmp = pool->tail; ret_last = ret->last; ret_next = ret->next; the_pool_head = pool->head; if (ret != NULL && !is_marked_ref(ret)) { if (CAS_PTR(&pool->tail, ret, ret_last) == ret) { ret = get_unmarked_ref(ret_last); ret_last = ret->last; ret_next = ret->next; if (ret && !is_marked_ref(ret_next)) { if (CAS_PTR(&ret->next, ret_next, NULL) == ret_next) { ret->next = NULL; FAD_U32(&(pool->size)); task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } else if (!ret && !is_marked_ref(the_pool_head)) { if (CAS_PTR(&pool->head, the_pool_head, NULL) == the_pool_head) { FAD_U32(&(pool->size)); task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } } } else if (ret == NULL) { task_t *_task = tmp; if (_task) { if (!_task->func) { tqueue_push(pool, _task); break; } else { _task->func(_task->arg); free(_task); } } } } pthread_exit(NULL); }

目前 mark 部份仍有問題 解決中


讀書:


參考資料

Select a repo