Try   HackMD

2016q3 Homework3(mergesort-concurrent)

contributed by <andy19950>

預期目標

  • 作為 concurrency 的展示案例
  • 學習 POSIX Thread Programming,特別是 synchronization object
  • 為日後效能分析和 scalability 研究建構基礎建設
  • 學習程式品質分析和相關的開發工具

POSIX THread Synchronization

  • Mutual Exclusion (Mutex) Locks
  • Condition Variables
  • Semaphores

Mutex Locks

  • 確保每次只能有一個 thread 使用 critical section
  • 就像是進入密室,進去的人必須要把門鎖起來,離開也要記得解鎖。
  • pthread_mutexattr_init():設定 mutex 的屬性,由下列兩個參數來決定:
    • PTHREAD_PROCESS_PRIVATE: 只能被該 process 使用
    • PTHREAD_PROCESS_SHARED: 可以被不同 process 使用
  • pthread_mutex_init() 新增 mutex lock,必須傳入兩個參數:
    1. pthread_mutex_t mp:要使用的 mutex lock,通常帶入函式庫定義好的變數 PTHREAD_MUTEX_INITIALIZER
    2. pthread_mutexattr_t mattr:必須先使用 pthread_mutexattr_init() 來初始化 attribute 或 NULL 使用函式庫的 default attribute
  • pthread_mutex_lock():將 mutex lock 鎖起來,要傳入 mutex lock 的記憶體位置,也就是剛剛宣告的 &mp
  • pthread_mutex_unlock():將 mutex lock 解鎖,用法跟上面一樣傳入 &mp
pthread_mutex_t mp = PTHREAD_MUTEX_INITIALIZER; pthread_mutexattr_t mattr = PTHREAD_PROCESS_SHARED; pthread_mutexattr_init(&mattr); //設定 mutex lock 屬性 pthread_mutex_init(&mp, &mattr); //初始化 mutex lock pthread_mutex_lock(&mp); //鎖起來 .. critical section .. pthread_mutex_unlock(&mp); //解鎖

Condition Variable

  • 若條件不符(condition variabe = false),thread 就會被block 起來直到條件符合為止,必須跟 mutex lock 一起使用!!
  • 進入密室後,如果不符合條件,必須離開密室去睡覺,直到有人叫醒你為止。
  • pthread_condatrr_init():設定條件的屬性,跟 mutex 一樣有兩種參數,用法也一樣。
  • pthread_cond_init(): 新增 condition variable 必須傳入兩個參數:
    1. pthread_cond_t cv:要使用的 condition variable,通常帶入函式庫定義的變數 PTHREAD_COND_INITIALIZER
    2. pthread_condattr_t cattr:必須先使用 pthread_condattr_init() 來初始化 attribute 或 NULL 使用函式庫的 default attribute
  • pthread_cond_wait(): 若條件不符,就把 thread block 起來,必須傳入 condition variabel(&cv) 以及 mutex lock(&mp)
  • pthread_cond_signal(): 將 blocked thread 叫醒讓它繼續執行。
  • 下面是一個簡單的 Producer-Consumer Problem
pthread_mutex_t count_lock; pthread_cond_t count_nonzero; unsigned count; Consumer() { pthread_mutex_lock(&count_lock); while (count == 0) //如果沒有工作了,就把 thread block 起來 pthread_cond_wait(&count_nonzero, &count_lock); count = count - 1; pthread_mutex_unlock(&count_lock); } Producer() { pthread_mutex_lock(&count_lock); if (count == 0) //增加工作後把 consumer 叫醒 pthread_cond_signal(&count_nonzero); count = count + 1; pthread_mutex_unlock(&count_lock); }

POSIX Semaphores

  • 允許多人一起進入 critical section 但是有人數限制。
  • 就像銀行有多個櫃台,每個人都要領號碼牌等待,叫到你的號碼就可以到櫃台去處理事情。
  • sem_init(sem_t *sem, int pshared, int value)
    • sem:semaphore object 指標
    • pshared:決定 semaphore 能不能被其他 fork process 使用
    • value: 決定 semaphore 的數量限制
  • sem_wait(sem_t *sem):如果 semaphore value 為負數,代表已經達到數量上限,該 thread 就會被 block 起來。
  • sem_post(sem_t *sem):把 semaphore value +1 然後把 blocked thread 叫醒繼續工作。

常見的 Sychronization algorithm


延續上一次 Phonebook 作業

  • 在資料正確性的部份,我在 main.c 裡面有寫驗證資料的方法,只是它需要O(n2),差不多 30 分鐘左右來驗證。
  • 若使用 linked-list mergesort 來把 phonebook 裡面的 linked-list sort 一次後就可以跟 原始資料一對一筆對看看是否正確。
  • 為了讓 mergesort 能夠更快的排序完成,老師讓我們使用 concurrent mergesort,把 mergesort 分成許多不同的 work 用多執行緒同時執行來加速排序。

遇到的問題

  • mergesort-concurrent 的輸入是 ./sort [thread_num] [data_num],並且要手動輸入數字,我們要把它改成讀 input.txt 當作輸入。
  • mergesort-concurrent 目前只能 sort 數字,要把它改成可以 sort 英文字串。
  • 輸入資料太龐大,output 格式不適合用 stdout 的方式,改成輸出到output.txt
  • 剩下的在新增

更改資料結構

  • 把 data 的型態改成 char *,如果想實做各種資料型態都可以 sort 的話,可以把型態改成 void *,然後比照 qsort 讓使用者傳入比較函式 compare()。
  • 在 list.h 內更改資料型態:
    • typedef intptr_t val_t
    • typedef char *val_t
#define MAX_LAST_NAME_SIZE 16 typedef struct node { //必須給定資料空間或者每次都 malloc 空間給它 val_t data[MAX_LAST_NAME_SIZE]; struct node *next; } node_t;

更改資料讀入的方式

  • 將輸入的形式改為 ./sort [thread_num] [input_file]
  • 把 for 迴圈的 scanf 手動輸入資料改為, while 迴圈的 fgets 自動讀檔,並且把 \n -> \0
while(fgets(line, sizeof(line), fp) != NULL) { line[strlen(line)-1] = '\0'; list_add(the_list, line); }

更改比較大小的方式

  • merge_list() 裡面有一個比較 a & b 的地方,這邊的作法是:
small = a * (..省略..) + b * (..省略..);
  • 其中省略的地方是一個比較的動作,它會回傳 0 or 1,所以如果 a 的資料筆 b 小,則 a 後面括號的值會 = 1,b 後面的值會 = 0
  • 如此一來 small = a*1 + b*0 = a,反之 small = b,這是一個很聰明的方法可以避免使用 if else 所產生的 branch。
  • 而我們的作法是將比較的部份改為呼叫 strcmp 來替我們比較大小
  • strcmp 會比較兩字串的大小(採 alphabetical order),比較小的回傳 -1 一樣回傳 0 比較大的回傳 1
  • 所以我們可以把它改成:
int cmp = strcmp(a->head->data, b->head->data); llist_t *small = (llist_t *) ((intptr_t *) a * (cmp <= 0) + (intptr_t *) b * (cmp > 0));

更改輸出方式

  • 原本是利用 list_print() 把每個節點走訪一次然後印出來,為了我們驗證方便,我把它輸出的型態改為 "%s\n" 這樣比較好跟原始資料做比較。
  • 而且我用開關檔的方式讓答案直接輸出在 output.txt 裡面。

增加驗證的函式 data_correctness()

void data_correctness() { FILE *result, *answer; result = fopen("output.txt", "r"); answer = fopen("words.txt", "r"); char a[BUF_SIZ], b[BUF_SIZ]; while(fgets(a, sizeof(a), answer) != NULL){ fgets(b, sizeof(b), result); assert(!strcmp(a, b) && "ERROR : Some name is not in the linked-list!!"); } printf("CORRECT : Every name is in the linked-list!!\n"); }

結果

input unsorted data line-by-line

sorted results in output file: output.txt
CORRECT : Every name is in the linked-list!!
  • 確認排序沒有問題後就來看看不同 thread 數量跑起來的效果如何吧。

cache information

thread cache miss cache reference miss rate
1 27,821,458 48,255,510 63%
2 27,742,811 47,788,466 61%
4 28,409,123 109,737,179 30%
8 28,358,122 138,450,096 23%

修改成 lock-free threadpool

讓每個 thread 都有自己的 work queue。

  • 依照 thread 的數量在tpool_init()內給定the_pool->queue 空間大小。
  • 在創造 thread 前使用tqueue_init()初始化每個 work queue。
  • 利用 pthread_create() 的第四個參數(原本是NULL),讓每個 thread 可以知道自己的 queue 的位置。
the_pool->queue = (tqueue_t *) malloc(sizeof(tqueue_t) * tcount); for (uint32_t i = 0; i < tcount; ++i) { tqueue_init(the_pool->queue + i); pthread_create(&(the_pool->threads[i]), &attr, func, (void *)(the_pool->queue + i)); }

task_run() 時讓每個 thread 都跟自己的 queue 拿工作

  • 因為我們在使用 pthread_create()時有把每個 thread 的 queue 的位置傳進來,所以在這邊可以把 void *data轉成原本的tqueue_t *_queue來取得指定的 queue
  • 如此一來每個 thread 會使用這裡的 while 迴圈不斷取的自己 queue 裡面的工作,直到收到 NULL 的 func 為止。
static void *task_run(void *data) { tqueue_t *_queue = (tqueue_t *) data; ... }
  • 然後就可以把全部的 mutex lock 拿掉!!!
  • 重點提醒

因為原本只有一個 queue 所以在 merge 的最後只有給一次 NULL 的 func 到唯一的 queue 裡面,但這邊我們使用多個 queue 所以在 merge 的最後也要使用 for loop 來給每個 queue NULL 的 func 才能讓每個 thread 結束執行,否則整支程式會因為某個 thread 不斷執行這個 while 而卡住。

else { //進入這個 else 代表 linked-list 已經排序完畢,要結束所有 thread 並印出結果。 the_list = _list; task_t *_task = (task_t *) malloc(sizeof(task_t)); _task->func = NULL; for(int i=0; i<pool->count; i++) tqueue_push(pool->queue + i, _task); list_print(_list); }

使用 tqueue_push 指派工作到不同的 work queue

  • 在 main 裡面會有許多次呼叫到 tqueue_push() 來指派工作到 queue 裡面,但因為我們實做出多個 queue,所以必須要讓每次呼叫 tqueue_push()時把工作指派到不同的 queue。
  • 我在 main 裡面宣告了全域變數 static uint32_t now = 0,剛開始的時候指派到第一個 queue。
  • 利用下面這一行使用 round-robin 方法來指派工作
tqueue_push(pool->queue + now++%thread_count, _task);
  • 因為型態是uint32_t 的關係所以不會有 integer overflow 的問題,就算同時有不同的 thread 使用這行頂多只是分派到同一個 work queue 不會有太大的問題。

結果

  • 對於 1~32 threads 其實沒有明顯的改變,但是更多 threads 可以發現執行時間會變長。
  • 但是可以從下面的表格發現 4/8 threads 的 cache reference 次數降低滿多的,也因此 miss rate 提高了。

這句解讀的方式很奇怪,沒說錯嗎? jserv
其實我想說的是,跟上面的表格比較起來看似 miss rate 提高了,但其實是因為 cache reference 次數減少,實際上 cache miss 次數差不多 吳庭安

cache information

thread cache miss cache reference miss rate
1 27,043,638 46,433,299 58.42%
2 26,789,642 46,633,605 57.44%
4 26,811,962 46,982,814 57.21%
8 27,058,363 47,194,774 57.33%

問題

  • 思考了很久為什麼改成 lock-free 卻沒有增加效能,在重新跟著程式碼的邏輯走之後,我發現其實分配給 work queue 的工作只有 cut_func merge,但真正花時間的 merge_sort merge_list 卻沒有切成小的工作單位給每個 queue 去執行。
  • 但我也不確定這是不是真正的原因,請各位大大幫我解惑QQ

為何不用 perf 確認呢?
可一併分析 gprof 輸出 jserv


Future work

  1. 研究 concurrent ll 程式碼
  2. 學習 graphViz
  3. 嘗試減少 cache miss

要先改善 lock contention 的問題,否則執行緒數量和處理器核心增加也無助於效能 jserv


Reference

  1. Further Threads Programming : Synchronization
  2. POSIX Semaphores