
2016q3 Homework3 (mergesort-concurrent)

contributed by <carolc0708>

Objectives

  • Serve as a demonstration case for concurrency
  • Learn POSIX thread programming, especially synchronization objects
  • Build the infrastructure for later performance analysis and scalability studies
  • Learn code-quality analysis and the related development tools

Requirements

  • Change the merge sort implementation so it accepts the 350,000-entry input data file from phonebook-concurrent

    • The dictionary data must be preprocessed with sort -R
    • Think about how to obtain a uniformly distributed random permutation, and design an automated test mechanism
  • [ ] Study how the thread pool implementation manages worker threads, point out its implementation-level shortcomings, and, referring to concurrent-ll, propose a lock-free implementation

  • Learn the scalability analysis approach of concurrent-ll (a concurrent linked-list implementation), and use gnuplot to chart and compare merge sort performance at different thread counts

    • Note that the memory allocated for each linked-list node is often non-contiguous; think about how this affects performance analysis
  • Also try to refactor the given code to make it easier to read and maintain. Following A05: introspect, constructive and thorough criticism should not only appear as prose in the shared notes but also be reflected in changes to the code

  • Use GraphViz for the diagrams in the shared notes wherever possible
  • Deadline:
    • before 08:00 AM Oct 7, 2016 (inclusive)
    • The earlier you show activity on GitHub and undergo code review, the higher the score

Challenge

Hint

Compute the space complexity of merge sort and build a memory pool for it to use: after the analysis, allocate the maximum space up front. Since two merges may be running at the same time, guard against overlap by splitting the memory into several segments. For the final data movement, study existing memory-pool implementations; and because malloc'd sort data ends up non-contiguous in memory, also look into linked-list cache techniques. A minimal pool sketch follows.
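As a starting point, a minimal sketch of a fixed-size, thread-safe bump allocator; it assumes the pool size comes from the space-complexity analysis above, and the names pool_t, pool_init, and pool_alloc are hypothetical, not part of the assignment:

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct {
    uint8_t *base;        /* one big allocation made up front */
    size_t size, offset;  /* capacity and current bump position */
    pthread_mutex_t lock; /* two merges may allocate concurrently */
} pool_t;

int pool_init(pool_t *p, size_t size)
{
    p->base = malloc(size);
    if (!p->base)
        return -1;
    p->size = size;
    p->offset = 0;
    pthread_mutex_init(&p->lock, NULL);
    return 0;
}

/* Hand out the next free segment; there is no per-object free */
void *pool_alloc(pool_t *p, size_t n)
{
    void *ret = NULL;
    pthread_mutex_lock(&p->lock);
    if (p->offset + n <= p->size) {
        ret = p->base + p->offset;
        p->offset += n;
    }
    pthread_mutex_unlock(&p->lock);
    return ret;
}
```

A bump allocator never frees individual objects, which suits merge sort: every node buffer can be released at once (free(p->base)) when sorting finishes, and carving a separate segment per concurrent merge avoids the overlap problem.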

Thread Models

The boss/worker model

  • The boss creates each worker thread, assigns it tasks, and, if necessary, waits for it to finish.
  • In the pthread_create call it uses to create each worker thread, the boss specifies the task-related routine the thread will execute.
  • After creating each worker, the boss returns to the top of its loop to process the next request.
  • Once finished, each worker thread can be made responsible for any output resulting from its task, or it can synchronize with the boss and let the boss handle the output.
```
main() /* The boss */
{
    forever {
        get a request
        switch request
            case X: pthread_create( ... taskX)
            case Y: pthread_create( ... taskY)
            ...
    }
}

taskX() /* Workers processing requests of type X */
{
    perform the task, synchronize as needed if accessing shared resources
    done
}

taskY() /* Workers processing requests of type Y */
{
    perform the task, synchronize as needed if accessing shared resources
    done
}
```
  • Alternatively, the boss could save some run-time overhead by creating all worker threads up front. ==> thread pool
```
main() /* The boss */
{
    for the number of workers
        pthread_create( ... pool_base )
    forever {
        get a request
        place request in work queue
        signal sleeping threads that work is available
    }
}

pool_base() /* All workers */
{
    forever {
        sleep until awoken by boss
        dequeue a work request
        switch
            case request X: taskX()
            case request Y: taskY()
            ...
    }
}
```
  • It is important to minimize the frequency with which the boss and workers communicate.
  • Don't create too many interdependencies among the workers, or all of them will suffer a slowdown.

The peer model (workcrew model)

  • All threads work concurrently on their tasks without a specific leader.
  • One thread must create all the other peer threads when the program starts; this thread then acts as just another peer that processes requests, or suspends itself waiting for the other peers to finish.
  • This model makes each thread responsible for its own input: a peer knows its own input ahead of time, has its own private way of obtaining it, or shares a single point of input with the other peers.
```
main()
{
    pthread_create( ... thread1 ... task1 )
    pthread_create( ... thread2 ... task2 )
    ...
    signal all workers to start
    wait for all workers to finish
    do any clean up
}

task1()
{
    wait for start
    perform task, synchronize as needed if accessing shared resources
    done
}

task2()
{
    wait for start
    perform task, synchronize as needed if accessing shared resources
    done
}
```
  • Suitable for applications that have a fixed or well-defined set of inputs, such as matrix multipliers, parallel database search engines, and prime number generators.
    ==> e.g., a region divided into many blocks, with each block's work handed to a different thread
  • Because there is no boss, peers themselves must synchronize their access to any common sources of input.
  • However, like workers in the boss/worker model, peers can also slow down if they must frequently synchronize to access shared resources. (A minimal runnable sketch follows.)
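A minimal runnable sketch of the peer model in C, using a pthread_barrier_t for the "signal all workers to start" step (the worker body is a placeholder; build with gcc -pthread):

```c
#include <pthread.h>
#include <stdio.h>

#define NUM_PEERS 4

static pthread_barrier_t start;

static void *peer(void *arg)
{
    long id = (long) arg;
    pthread_barrier_wait(&start);     /* wait for start */
    printf("peer %ld working\n", id); /* perform task */
    return NULL;
}

int main(void)
{
    pthread_t tid[NUM_PEERS];
    /* The main thread participates in the barrier too, hence +1 */
    pthread_barrier_init(&start, NULL, NUM_PEERS + 1);
    for (long i = 0; i < NUM_PEERS; i++)
        pthread_create(&tid[i], NULL, peer, (void *) i);
    pthread_barrier_wait(&start);     /* signal all workers to start */
    for (int i = 0; i < NUM_PEERS; i++)
        pthread_join(tid[i], NULL);   /* wait for all workers to finish */
    pthread_barrier_destroy(&start);
    return 0;
}
```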

The pipeline model

  • The pipeline model assumes:

    • A long stream of input
    • A series of suboperations (known as stages or filters) through which every unit of input must be processed
    • Each processing stage can handle a different unit of input at a time
  • The pipeline might be useful in image processing, text processing, or any application that can be broken down into a series of filter steps on a stream of input.

```
main()
{
    pthread_create( ... stage1 )
    pthread_create( ... stage2 )
    ...
    wait for all pipeline threads to finish
    do any clean up
}

stage1()
{
    forever {
        get next input for the program
        do stage 1 processing of the input
        pass result to next thread in pipeline
    }
}

stage2()
{
    forever {
        get input from previous thread in pipeline
        do stage 2 processing of the input
        pass result to next thread in pipeline
    }
}

stageN()
{
    forever {
        get input from previous thread in pipeline
        do stage N processing of the input
        pass result to program output
    }
}
```
  • We could add multiplexing or demultiplexing to this pipeline, allowing multiple threads to work in parallel on a particular stage.
  • We could also dynamically configure the pipeline at run time, having it create and terminate stages (and the threads to service them) as needed.
  • The overall throughput of a pipeline is limited by the thread that processes its slowest stage.
  • When designing a pipeline, the programmer should aim to balance the work across all stages; that is, all stages should take about the same amount of time to complete. (A minimal two-stage sketch follows.)
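A minimal two-stage pipeline sketch; the one-slot mailbox handoff (one mutex plus one condition variable) is an illustrative assumption rather than code from the original text:

```c
#include <pthread.h>
#include <stdio.h>

/* One-slot mailbox between stage 1 and stage 2 */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int slot, full; /* full == 1 when slot holds a value */

static void *stage1(void *arg)
{
    for (int i = 1; i <= 5; i++) {
        int out = i * i; /* stage 1 processing */
        pthread_mutex_lock(&lock);
        while (full)
            pthread_cond_wait(&cond, &lock);
        slot = out; /* pass result to next stage */
        full = 1;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
    }
    return NULL;
}

static void *stage2(void *arg)
{
    for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&lock);
        while (!full)
            pthread_cond_wait(&cond, &lock);
        int in = slot; /* get input from previous stage */
        full = 0;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&lock);
        printf("stage2 got %d\n", in); /* stage 2 processing */
    }
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, stage1, NULL);
    pthread_create(&t2, NULL, stage2, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}
```

Note how the slowest stage paces the whole pipeline: if stage2 were slower, stage1 would spend its time blocked on the full mailbox.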

Mutrace: analyzing mutex contention

  • Lock contention: this occurs whenever one process or thread attempts to acquire a lock held by another process or thread. The more fine-grained the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.)

  • mutrace vs. valgrind/drd

    In contrast to valgrind/drd, it does not virtualize the CPU instruction set, making it a lot faster. In fact, the hooks mutrace relies on to profile mutex operations should only minimally influence application runtime. mutrace is not useful for finding synchronization bugs; it is solely useful for profiling locks.

  • About mutrace

    • mutrace is implemented entirely in userspace.
    • Build your application with -rdynamic to make the backtraces mutrace generates useful.
    • -r/--track-rt: checks on each mutex operation whether it is executed by a realtime thread.
      => Use this to track down which mutexes are good candidates for priority inheritance.
    • A companion tool, matrace, can be used to track down memory allocation operations in realtime threads.
  • Cloning the source provides more detail to refer to:

    $ git clone http://git.0pointer.net/clone/mutrace.git

    • mutrace.in shows what the other flags do
    • The matrace source is also included for reference
  • Installing mutrace:

$ sudo apt-get install mutrace
  • Try running the mergesort-concurrent program under it:
(for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8

The results:

mutrace: 0.2 sucessfully initialized for process sort (pid 4814).
input unsorted data line-by-line

sorted results:
[9602] [10669] [11911] [16147] [19714] [22776] [22870] [32443]

mutrace: Showing statistics for process sort (pid 4814).
mutrace: 3 mutexes used.

Mutex #1 (0x0x216c9b0) first referenced by:
        /usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7f5a530484b2]
        ./sort(tqueue_init+0x38) [0x401277]
        ./sort(tpool_init+0x6a) [0x4014cc]
        ./sort(main+0x161) [0x401c74]
        /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f5a52a7f830]

Mutex #0 (0x0x7f5a5064e860) first referenced by:
        /usr/lib/mutrace/libmutrace.so(pthread_mutex_lock+0x49) [0x7f5a530486b9]
        /lib/x86_64-linux-gnu/libgcc_s.so.1(_Unwind_Find_FDE+0x2c) [0x7f5a5044afec]
        [(nil)]

mutrace: Showing 2 most contended mutexes:

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1       62        5        1        0.005        0.000        0.001 Mx.--.
       0       20        3        0        0.002        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC

mutrace: Note that the flags column R is only valid in --track-rt mode!

mutrace: Total runtime is 0.375 ms.

mutrace: Results for SMP with 4 processors.
  • What the table means:

    • Locked: how often the mutex was locked during the entire runtime.
    • Changed: how often the owning thread of the mutex changed.
      => A high number means the risk of contention is also high.
    • Cont.: how often the lock was already taken when we tried to take it, forcing a wait.
      ==> Mutex #1 is the most contended lock
    • tot.Time[ms]: how long the lock was held during the entire runtime.
    • avg.Time[ms]: the average lock time.
    • max.Time[ms]: the longest time the lock was held.
    • Flags: what kind of mutex this is (recursive, normal, or otherwise).

addr2line can map these addresses back to source lines:

  • Note: compile with -g to make sure an executable with debug info is produced. Taking the address (tqueue_init+0x38) [0x401277] as an example, the corresponding source line is:
$ addr2line -e sort 0x401277
/home/carol/mergesort-concurrent/threadpool.c:15

Using 0x38 instead gives ??:0. — Carol Chen

W2-QA: implementing recursive merge sort on a linked list

  • In Linked List Bubble Sort, the linked-list sorting algorithm is switched from recursive bubble sort to recursive merge sort

MergeSort(headRef)

  1. If head is NULL or there is only one element in the Linked List
    then return.
  2. Else divide the linked list into two halves.
    FrontBackSplit(head, &a, &b); /* a and b are two halves */
  3. Sort the two halves a and b.
    MergeSort(a);
    MergeSort(b);
  4. Merge the sorted a and b (using SortedMerge() discussed here)
    and update the head pointer using headRef.
    *headRef = SortedMerge(a, b);
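A C sketch of this algorithm over the node_t type analyzed below; FrontBackSplit uses the classic slow/fast-pointer split and SortedMerge the recursive merge from the same reference (illustrative, not the repository's code):

```c
/* Split the list after its midpoint using slow/fast pointers */
static void FrontBackSplit(node_t *head, node_t **front, node_t **back)
{
    node_t *slow = head, *fast = head->next;
    while (fast) {
        fast = fast->next;
        if (fast) {
            slow = slow->next;
            fast = fast->next;
        }
    }
    *front = head;
    *back = slow->next;
    slow->next = NULL;
}

/* Merge two sorted lists into one sorted list */
static node_t *SortedMerge(node_t *a, node_t *b)
{
    if (!a) return b;
    if (!b) return a;
    if (a->data <= b->data) {
        a->next = SortedMerge(a->next, b);
        return a;
    }
    b->next = SortedMerge(a, b->next);
    return b;
}

void MergeSort(node_t **headRef)
{
    node_t *head = *headRef, *a, *b;
    if (!head || !head->next) /* 0 or 1 element: already sorted */
        return;
    FrontBackSplit(head, &a, &b);
    MergeSort(&a);
    MergeSort(&b);
    *headRef = SortedMerge(a, b);
}
```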

mergesort-concurrent code analysis & refactoring

Data structure: linked list

list.h

typedef intptr_t val_t;
  • intptr_t is defined here so that a node's data can hold either a pointer or an ordinary integer value.

  • When/why to use intptr_t in type-casting: when we cast a pointer to an integer, the pointer width differs between 32-bit and 64-bit machines. intptr_t is used because it effectively does the following:

    #ifdef MACHINE_64
      typedef long intptr;
    #else /* MACHINE_32 */
      typedef int intptr;
    #endif

    From then on we only need #include <stdint.h> to get this, sparing us tedious type conversions. It is also the conservative way to make sure such casts cannot go wrong.

  • See the Stack Overflow discussion of intptr_t: compared with void *, an intptr_t value supports bitwise operations while a void * does not. For bitwise operations, however, uintptr_t is a better choice than intptr_t (a small example follows).
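For instance, a minimal alignment check made possible by the integer view (is_aligned is a hypothetical helper):

```c
#include <stdbool.h>
#include <stdint.h>

/* The pointer itself cannot be masked, but its uintptr_t image can */
static bool is_aligned(const void *p, uintptr_t align)
{
    return ((uintptr_t) p & (align - 1)) == 0;
}
```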

typedef struct node {
    val_t data;
    struct node *next;
} node_t;

An ordinary linked-list node type: each node stores its own data and a pointer next to the following node.

typedef struct llist {
    node_t *head;
    uint32_t size;
} llist_t;

A linked list must record the position of its head; here the list length size is packed into the structure as well.

list.c

/* Insert a new node in front of next and return a pointer to it */
static node_t *node_new(val_t val, node_t *next);
/* Initialize a new linked list */
llist_t *list_new();
/* Append a new node at the tail of the linked list */
int list_add(llist_t *the_list, val_t val);
  • The comment says that if the given val already exists, no new node should be created, but the original code has no such check.
    ==> Design one:

    /* check whether the value already exists */
    node_t *cur = list->head;
    while (cur) {
        if (cur->data == val)
            return list;
        cur = cur->next;
    }
  • Returning 0 from list_add carries no meaning; the return value should reflect the outcome.
    ==> Change it to return a pointer to the updated llist (a combined sketch follows)
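A sketch of list_add with both changes applied (duplicate check plus pointer return); the repository may structure it differently:

```c
llist_t *list_add(llist_t *the_list, val_t val)
{
    /* Skip duplicates: return the list unchanged if val already exists */
    for (node_t *cur = the_list->head; cur; cur = cur->next)
        if (cur->data == val)
            return the_list;

    node_t *e = node_new(val, NULL);
    if (!the_list->head) {
        the_list->head = e;
    } else {
        node_t *tail = the_list->head;
        while (tail->next)
            tail = tail->next;
        tail->next = e; /* append at the tail */
    }
    the_list->size++;
    return the_list;
}
```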
/* Return a pointer to the index-th node of the linked list */
node_t *list_nth(llist_t *the_list, uint32_t index);
  • The out-of-range check is wrong. Here, by convention, the n of nth is treated as an array index, ranging over 0 ~ list->size-1
    ==> Change the out-of-range condition to if(idx > list->size-1)
    ==> Rename the moving pointer from head to cur (personal preference)
    ==> Add the line if(idx == 0) return cur; (a sketch of the result follows)
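Combined, a sketch of the revised list_nth:

```c
/* Return the node at position idx (0-based); NULL if out of range */
node_t *list_nth(llist_t *the_list, uint32_t idx)
{
    if (the_list->size == 0 || idx > the_list->size - 1)
        return NULL; /* valid range is 0 ~ size-1 */
    node_t *cur = the_list->head;
    if (idx == 0)
        return cur;
    while (idx--)
        cur = cur->next;
    return cur;
}
```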
/* Print the entire linked list */
void list_print(llist_t *the_list);
  • The FIXME notes that the list should be verified as sorted before printing

Threadpool

threadpool.h

typedef struct _task {
    void (*func)(void *);
    void *arg;
    struct _task *next, *last;
} task_t;

Defines the task type, which basically contains the work function *func and its argument *arg. The per-task queue links *next and *last are defined here as well.

typedef struct {
    task_t *head, *tail;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    uint32_t size;
} tqueue_t;

The task-queue type: it records the *head and *tail tasks, the queue size size, a mutex, and a condition variable cond.

typedef struct {
    pthread_t *threads;
    uint32_t count;
    tqueue_t *queue;
} tpool_t;

The thread-pool type, defining a pointer *threads to all the pthreads, the thread count count, and a pointer *queue to the task queue.

threadpool.c

/* Free the memory previously allocated for a single task */
int task_free(task_t *the_task);
  • The return value is meaningless; it should reflect how the free went
  • There is no guarantee the task's memory was allocated by malloc, so free may misbehave
/* Initialize the task queue */
int tqueue_init(tqueue_t *the_queue);
  • The return value is meaningless; it should reflect how the initialization went
  • It does not allocate memory for the tqueue_t passed in
    • Either allocate the space before calling and pass in the pointer
    • Or allocate directly inside the function
      ==> malloc a new block; return 0 on success and -1 on failure
/* Pop a single task off the task queue to dispatch to a thread */
task_t *tqueue_pop(tqueue_t *the_queue);
  • A queue should be FIFO, but this pops from the tail rather than the head
    ==> Change tail to head
```c
task_t *tqueue_pop(tqueue_t *the_queue)
{
    task_t *ret;
    pthread_mutex_lock(&(the_queue->mutex));
    ret = the_queue->head;
    if (ret) {
        the_queue->head = ret->last; /* if it is NULL, then let it be */
        if (the_queue->head)
            the_queue->head->next = NULL;
        else
            the_queue->tail = NULL;
        the_queue->size--;
    }
    pthread_mutex_unlock(&(the_queue->mutex));
    return ret;
}
```
/* Return the current size of the task queue */
uint32_t tqueue_size(tqueue_t *the_queue);
  • This function is never called anywhere in the program and can be removed.
/* Push a new task onto the task queue */
int tqueue_push(tqueue_t *the_queue, task_t *task);
  • The return value does not handle error cases
    ==> Return -1 if the task does not exist, 0 on a successful push
  • A newly added task should go to the tail of the queue, but here it is placed at the head
    ==> Change head to tail
```c
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
    if (task == NULL)
        return -1;
    pthread_mutex_lock(&(the_queue->mutex));
    task->next = NULL;
    task->last = the_queue->tail;
    if (the_queue->tail)
        the_queue->tail->next = task;
    the_queue->tail = task;
    if (the_queue->size++ == 0) /* only one task in queue */
        the_queue->head = task;
    pthread_mutex_unlock(&(the_queue->mutex));
    return 0;
}
```
/* Free the allocated task-queue memory */
int tqueue_free(tqueue_t *the_queue);
  • The return value is meaningless
    ==> Add a check that the_queue exists; if not, return -1 and skip the freeing below
  • The condition variable is never destroyed (anything initialized must be destroyed)
    ==> pthread_cond_destroy(&(the_queue->cond)); (a sketch with both fixes follows)
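A sketch of tqueue_free with both fixes applied (a MONITOR-aware version appears later in this note):

```c
int tqueue_free(tqueue_t *the_queue)
{
    if (the_queue == NULL)
        return -1; /* nothing to free */
    task_t *cur = the_queue->head;
    while (cur) { /* release any tasks still queued */
        the_queue->head = the_queue->head->next;
        free(cur);
        cur = the_queue->head;
    }
    pthread_mutex_destroy(&(the_queue->mutex));
    pthread_cond_destroy(&(the_queue->cond)); /* destroy what was initialized */
    return 0;
}
```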

In the phonebook-concurrent assignment, following mbrossard/threadpool, both were locked before being destroyed. I am not sure why that is needed, or what happens otherwise; here no lock is taken first.

/* Initialize the thread pool */
int tpool_init(tpool_t *the_pool, uint32_t count, void *(*func)(void *));
/* Free the thread-pool memory */
int tpool_free(tpool_t *the_pool);
  • The return value is meaningless
    ==> Add a check that the_pool exists, and check whether each pthread_join() succeeds
```c
int tpool_free(tpool_t *the_pool)
{
    if (the_pool == NULL)
        return -1;
    for (uint32_t i = 0; i < the_pool->count; ++i)
        if (pthread_join(the_pool->threads[i], NULL) != 0)
            return -1;
    free(the_pool->threads);
    tqueue_free(the_pool->queue);
    return 0;
}
```

Threadpool issues, summarized

  1. A condition variable is defined but never used anywhere
  2. No shutdown policy is defined (shut down immediately, or finish pending work first)

main.c

  • Based on classmate LanKuDot's notes; it took a long while to understand
  • The program splits into two phases:
    • Before max_cut is reached: cut_func() recursively cuts the list into left and right halves
    • After max_cut: merge_sort() recursively cuts left/right and merges the halves back
/* Dispatch a division task to the queue */
void cut_func(void *data);
  • The first task main() dispatches after init_tpool()
  • main() defines max_cut = MIN(thread_count, data_count) - 1
  • Until cut_count reaches max_cut or list->size drops below 1, this function cuts the list in half and pushes tasks onto t_queue() whose arg is the left/right list and whose func is cut_func()
  • Once the condition is met, whatever sublist arrives is fed straight into merge_sort() and the result is passed to merge()
  • The critical section is protected by a lock
  • Renamed variables for readability: _list → list, llist → rlist; also fixed the comments where left and right were swapped (a sketch of the flow follows)
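A hedged sketch of the control flow described above; cut_lock, cut_count, max_cut, pool, and task_new() stand in for the globals and helpers in main.c and are not the repository's exact identifiers:

```c
static void cut_func(void *data)
{
    llist_t *list = (llist_t *) data;

    pthread_mutex_lock(&cut_lock); /* critical section */
    int cut = (cut_count < max_cut) && (list->size > 1);
    if (cut)
        cut_count++;
    pthread_mutex_unlock(&cut_lock);

    if (cut) {
        /* Halve the list and queue both halves as new cut tasks */
        int mid = list->size / 2;
        llist_t *rlist = list_new();
        rlist->head = list_nth(list, mid);
        rlist->size = list->size - mid;
        list_nth(list, mid - 1)->next = NULL;
        list->size = mid;
        tqueue_push(pool->queue, task_new(cut_func, list));
        tqueue_push(pool->queue, task_new(cut_func, rlist));
    } else {
        /* Deep enough: sort this sublist, then hand it to merge() */
        merge(merge_sort(list));
    }
}
```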
/* Merge the cross-level lists */
void merge(void *data);
  • The critical section is locked
  • If _list->size < data_count: if the other half has not arrived yet, record this one in tmp_list; once it has, push a task onto t_queue()
    • The task's arg: merge_list(left, right)
    • The task's func: merge()
  • If _list->size == data_count: the termination condition. Everything is sorted; record the result in the global the_list and print it with print_list()
    • An empty func is also pushed onto t_queue() here

At first I did not understand the point of this. — Carol Chen (Seen with task_run() below, a task whose func is NULL is pushed back onto the queue and makes each worker break out of its loop, so the empty task acts as the shutdown signal for all workers.)

/* Cut the list into left & right halves, then merge them back */
llist_t *merge_sort(llist_t *list);
  • A recursive function
  • Recursively cuts the given list into left and right until list->size < 2
  • Returns the result merged via merge_list()
/* Merge the left & right lists */
llist_t *merge_list(llist_t *a, llist_t *b);
  • Merges lists *a and *b by comparing values
/* Grab one task from the task queue and execute it */
static void *task_run(void *data);
  • Equivalent to threadpool_thread() in mbrossard/threadpool: takes one task out of t_queue and runs it on the thread
  • Structurally this should probably live in tpool.c
  • No lock or condition variable is used here
int main(int argc, char const *argv[]);
  1. Read thread_count and data_count; compute max_cut
  2. Read the input numbers and build the list
  3. Initialize tpool and the other globals
  4. Push cut_func() onto t_queue() as the first task

Designing automated tests

  • Following classmate Tempojiji's notes on automated test design, create input_generator.c to produce random input:
```c
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main(int argc, char *argv[])
{
    int MAX_SIZE = atoi(argv[1]);
    FILE *fp = fopen("input", "w+");
    srand((long int) time(NULL));
    for (int i = 0; i < MAX_SIZE; i++)
        fprintf(fp, "%u\n", rand() / 10000);
    fclose(fp);
}
```
  • In main.c, measure time with clock_gettime() and add this section:
```c
#if defined(BENCH)
    FILE *fp = fopen("input", "r");
    long int data;
    while ((fscanf(fp, "%ld\n", &data)) != EOF)
        e = list_add(e, data);
    fclose(fp);
#endif

    /* ... */

#if defined(BENCH)
    fp = fopen("output", "a+");
    if (thread_count == 1)
        fprintf(fp, "%d,", data_count);
    fprintf(fp, "%lf", cpu_time);
    if (thread_count == 64)
        fprintf(fp, "\n");
    else
        fprintf(fp, ",");
    fclose(fp);
    printf("%d %d %lf\n", data_count, thread_count, cpu_time);
#endif
```
  • Then modify the Makefile:
bench:
	for i in `seq 100 100 800`; do \
	    ./input_generator $$i; \
	    for j in 1 2 4 8 16 32 64; do \
	        ./sort $$j $$i; \
	    done; \
	done

plot: bench
	gnuplot runtime.gp
  • A matching runtime.gp gnuplot script is needed as well
  • Current test results:

The larger thread_num gets, the more visibly the timings jump around.

Not sure why sorting more than 10,000 numbers won't run;
it just hangs there for a long time. — Carol Chen

Scalability analysis

scalability

The definition on Wikipedia:

  • An algorithm, design, networking protocol, program, or other system is said to scale if it is suitably efficient and practical when applied to large situations (e.g. a large input data set, a large number of outputs or users, or a large number of participating nodes in the case of a distributed system). If the design or system fails when a quantity increases, it does not scale.
  • In practice, if there are a large number of things (n) that affect scaling, then resource requirements (for example, algorithmic time-complexity) must grow less than n^2 as n increases. An example is a search engine, which scales not only for the number of users, but also for the number of objects it indexes.
  • Scalability refers to the ability of a site to increase in size as demand warrants.

From discussions on Stack Overflow:

  • Scalability is the ability of a program to scale. For example, if you can do something on a small database (say less than 1000 records), a program that is highly scalable would work well on a small set as well as working well on a large set (say millions, or billions of records).

  • It would have a linear growth of resource requirements. Look up Big-O notation for more details about how programs can require more computation the larger the data input gets. Something parabolic like Big-O(x^2) is far less efficient with large x inputs than something linear like Big-O(x).


  • Referring to classmate LanKuDot's summary of scalability; the following are his notes:

Operation

  1. The script must be run with arguments; the first specifies core, and shift moves the remaining arguments left (the original second argument becomes the first)
  2. Run the config and lock_exec scripts
  3. Read the name of the program to execute into prog and the remaining arguments into params
  4. Run the test on 1~core cores and print each result compared against the 1-core run
  5. Four pieces of information are printed (subscript N denotes the number of cores used)
    a. #cores: the number of cores used
    b. throughput: the number of operations completed per unit time
    c. %linear: $\frac{\text{scalability}_N}{N} \times 100\%$, the average per-core scalability in percent
    d. scalability: $\frac{\text{throughput}_N}{\text{throughput}_1}$, the throughput gain ratio
  6. Run the unlock_exec script

concurrent-ll's scalability analysis

scalability.sh

  • scalability1.sh and scalability2.sh differ in whether they run one program or two at a time
  • Try running them:
carol@carol-PC:~/concurrent-ll$ scripts/scalability1.sh all ./out/test-lock -i128

## Use 4 as the number of cores.
## If not correct, change it in scripts/config
#cores  throughput  %linear scalability
1       309985      100.00  1
2       186428      30.07   0.60
3       195184      20.99   0.63
4       2493        0.20    0.01
carol@carol-PC:~/concurrent-ll$ scripts/scalability2.sh all ./out/test-lock ./out/test-lockfree -i100

## Use 4 as the number of cores.
## If not correct, change it in scripts/config
#       ./out/test-lock                 ./out/test-lockfree
#cores  throughput  %linear scalability throughput  %linear scalability
1       310796      100.00  1           464000      100.00  1
2       185595      29.86   0.60        736644      79.38   1.59
3       192483      20.64   0.62        1033290     74.23   2.23
4       8355        0.67    0.03        1039714     56.02   2.24

What does the trailing -i128 / -i100 mean? — Carol Chen
How do I use this to test my own program? — Carol Chen

Making the implementation accept the phonebook data file

Using the condition variable in the threadpool

  • The original threadpool defines a condition variable but never uses it
  • Follow the design of mbrossard/threadpool

  • tqueue_pop():
    If MONITOR is defined, the locking here must be dropped, since the caller in task_run() already holds the mutex around the pop.
```c
task_t *tqueue_pop(tqueue_t *the_queue)
{
    task_t *ret;
#ifndef MONITOR
    pthread_mutex_lock(&(the_queue->mutex));
#endif
    ret = the_queue->head;
    if (ret) {
        the_queue->head = ret->next; /* if it is NULL, then let it be */
        if (the_queue->head)
            the_queue->head->last = NULL;
        else
            the_queue->tail = NULL;
        the_queue->size--;
    }
#ifndef MONITOR
    pthread_mutex_unlock(&(the_queue->mutex));
#endif
    return ret;
}
```
  • tqueue_push()
```c
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
#ifndef MONITOR
    if (task == NULL)
        return -1;
#endif
    pthread_mutex_lock(&(the_queue->mutex));
    task->next = NULL;
    task->last = the_queue->tail;
    if (the_queue->tail)
        the_queue->tail->next = task;
    the_queue->tail = task;
    if (the_queue->size++ == 0) /* only one task in queue */
        the_queue->head = task;
#if defined(MONITOR)
    pthread_cond_signal(&(the_queue->cond));
#endif
    pthread_mutex_unlock(&(the_queue->mutex));
    return 0;
}
```
  • tqueue_free()
```c
int tqueue_free(tqueue_t *the_queue)
{
    if (the_queue == NULL)
        return -1;
    task_t *cur = the_queue->head;
    while (cur) {
        the_queue->head = the_queue->head->next;
        free(cur);
        cur = the_queue->head;
    }
#if defined(MONITOR)
    pthread_mutex_lock(&(the_queue->mutex));
#endif
    pthread_mutex_destroy(&(the_queue->mutex));
    pthread_cond_destroy(&(the_queue->cond));
    return 0;
}
```
  • tpool_free()
```c
int tpool_free(tpool_t *the_pool)
{
    if (the_pool == NULL) /* check before dereferencing the_pool below */
        return -1;
#if defined(MONITOR)
    pthread_cond_broadcast(&(the_pool->queue->cond));
    pthread_mutex_unlock(&(the_pool->queue->mutex));
#endif
    for (uint32_t i = 0; i < the_pool->count; ++i)
        if (pthread_join(the_pool->threads[i], NULL) != 0)
            return -1;
    free(the_pool->threads);
    tqueue_free(the_pool->queue);
    return 0;
}
```
  • task_run()
```c
static void *task_run(void *data)
{
    (void) data;
    while (1) {
#if defined(MONITOR)
        pthread_mutex_lock(&(pool->queue->mutex));
        while (pool->queue->size == 0)
            pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex));
#endif
        task_t *_task = tqueue_pop(pool->queue);
#if defined(MONITOR)
        pthread_mutex_unlock(&(pool->queue->mutex));
#endif
        if (_task) {
            if (!_task->func) {
                /* NULL func: push the task back for the other workers
                 * and exit; this is the shutdown signal */
                tqueue_push(pool->queue, _task);
                break;
            } else {
                _task->func(_task->arg); /* execute the task */
                free(_task);
            }
        }
    }
    pthread_exit(NULL);
}
```

  • The results:

Much faster than before; the y-axis now peaks at only 0.014 seconds.

  • Comparison with the original version:

Improving the threadpool into a lock-free version

Changing the queue structure and comparing performance

  • Following the notes of Group 20, change the queue into a ring structure and compare the performance