Try   HackMD

2016q3 Homework2 (phonebook-concurrent)

contributed by <RayPan>

開發環境

  • Ubuntu 16.04 LTS
  • CPU
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    2
Core(s) per socket:    2
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 60
Model name:            Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz
Stepping:              3
CPU MHz:               2869.015
CPU max MHz:           3400.0000
CPU min MHz:           800.0000
BogoMIPS:              5587.49
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              3072K
NUMA node0 CPU(s):     0-3

開發目標

  • 學習第二週提及的 concurrency 程式設計
  • 學習 POSIX Thread
  • 學習效能分析工具
  • code refactoring 練習
  • 探索 clz 的應用

POSIX Thread

參考資料: Getting Started With POSIX ThreadsPthread程式撰寫
冼鏡光的並行計算投影片POSIX threads explained

  • Parallel vs. Concurrent

  1. parallel : 兩件事齊頭並進
  2. concurrent:電腦系統中,執行的程式數目通常比CPU(加上它們的核心,core)的數目大很多,因此作業系統的做法是讓這個程式使用CPU很短一段時間、再讓另一個程式使用很短一段時間等等(交錯執行)。因爲每一個程式佔用CPU的時間很短,很快又輪回同一個程式,所以對使用人而言可能完全沒有程式在斷斷續續執行的感覺,我們會覺得動作是連續的。
  • Process/Thread

一個執行的程式(Process)可以分成幾條執行緒(thread),每一條執行緒處理賦予它的工作;ie,一個行程可以分成三條執行緒,一條負責讀取各種輸入、第二條處理視窗的輸出、第三條從事計算的工作,這三條執行線彼此之間相互協調配合、完成該程式的工作。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

圖片來源

POSIX標準中支援的執行緒函式庫稱為pthread,我們可以透過pthread結構與pthread_create() 函數執行某個函數指標,以建立新的執行緒。

  • pthread_create

建立一個thread並執行附帶的function

pthread_t         mythread;
pthread_attr_t    mythread_attribute;
void              thread_function(void *argument);
char              *some_argument;
int pthread_create(&mythread, mythread_attribute,
                  (void *)&thread_function,(void *)&some_argument);

1.參數mythread為pthread的指標,在使用Thread之前必須要先宣告一個pthread_t的變數。
2.參數mythread_attribute為該Thread的屬性,預設是NULL,如果沒有其他特殊的需求直接填入NULL即可。
3.參數thread_function)(void *)為Function pointer,這邊要放入你要執行的Function。
4.參數void *some_argument為Function pointer所要帶的參數。
回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。

  • pthread_join

暫停目前執行pthread_join的Thread,等到目標Thread執行完畢之後目前執行pthread_join的Thread才會繼續執行。

int pthread_join (pthread_t thread, void  **value_ptr)

1.參數:pthread_t thread為要等待的目標Thread。
2.參數:void **value_ptr用來取得目標Thread的回傳值。
回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。

  • 簡單範例程式
/*thread1.c*/ #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <unistd.h> void *thread_function(void *arg) { int i; for ( i=0; i<20; i++ ) { printf("Thread says hi!\n"); sleep(1); } return NULL; } int main(void) { pthread_t mythread; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } exit(0); }

主程式中本身會產生一條執行緒,thread_create又建立一條新執行緒,所以這個程式包含兩條執行緒。
主程式的執行緒繼續執行下一個if(pthread_join)的判斷
新建的執行緒結束時,會停止並等待被併入或加入其他執行緒。
pthread_create() 會分裂成兩條執行緒,pthread_join() 則會將兩條執行緒合併為一。
在 thread_function() 完成時,主程式的執行緒已經呼叫pthread_join(),當這個情況發生時,主程式的執行緒會 block住(go to sleep) 並等待 thread_function() 完成。
當thread_fuction()完成時,pthread_join()會回傳值,所以現在我們的程式又有一條主執行緒,所以必須好好處理每條新建的執行緒,否則達到系統的最大值執行緒數量限制會造成pthread_create錯誤。

/*thread2.c*/ #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> int myglobal; void *thread_function(void *arg) { int i,j; for ( i=0; i<20; i++ ) { j=myglobal; j=j+1; printf("."); fflush(stdout); sleep(1); myglobal=j; } return NULL; } int main(void) { pthread_t mythread; int i; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } for ( i=0; i<20; i++) { myglobal=myglobal+1; printf("o"); fflush(stdout); sleep(1); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } printf("\nmyglobal equals %d\n",myglobal); exit(0); }

執行結果

o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo.
myglobal equals 21

myglobal起始值為0,主執行緒與新執行緒各將myglobal增加20,照理說應該是40,但輸出21原因為何?
因為兩條執行緒同時執行,當thread_fucntion把j值寫回myglobal,他會複寫主執行緒中的修改。

解決上述問題
加入互斥鎖(mutex)

/*thread3.c*/ #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> int myglobal; pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER; void *thread_function(void *arg) { int i,j; for ( i=0; i<20; i++ ) { pthread_mutex_lock(&mymutex); j=myglobal; j=j+1; printf("."); fflush(stdout); sleep(1); myglobal=j; pthread_mutex_unlock(&mymutex); } return NULL; } int main(void) { pthread_t mythread; int i; if ( pthread_create( &mythread, NULL, thread_function, NULL) ) { printf("error creating thread."); abort(); } for ( i=0; i<20; i++) { pthread_mutex_lock(&mymutex); myglobal=myglobal+1; pthread_mutex_unlock(&mymutex); printf("o"); fflush(stdout); sleep(1); } if ( pthread_join ( mythread, NULL ) ) { printf("error joining thread."); abort(); } printf("\nmyglobal equals %d\n",myglobal); exit(0); }

兩條執行緒不能同時被一個互斥鎖鎖住,如果執行緒A想要鎖一個mutex然而執行緒B已經鎖住該mutex,A會sleep。
當B釋出這個mutex(透過pthread_mutex_unlock()),A即可鎖住該mutex。
亦即當一個mutex被鎖住,所有想要鎖的執行緒會被queue up。
pthread_mutex_lock()以及pthread_mutex_unlock()確保一個資料結構同時只能被一個執行緒存取,用來保護該資料結構。

  • fflush(stdout) : forces a write of all user-space buffered data for the given output or update stream via the stream's underlying write function. The open status of the stream is unaffected.
    stdout是標準輸出。有時候,我們輸出到stdout的內容不能及時輸出,因為stdout的緩衝區沒有滿或者其他原因,fflush(stdout)就是強迫把stdout內容輸出並清空stdout。

記憶體映射函數 mmap

參考資料:記憶體映射函數 mmap

  • 簡介

把文件內容映射到一段記憶體上(準確說是虛擬記憶體上),通過對這段記憶體的讀取和修改,實現對文件的讀取和修改。

  • 函數

void *mmap(void *start,size_t length, int prot, 
           int flags, int fd, off_t offsize);

1.參數start:指向欲映射的核心起始位址,通常設為NULL,代表讓系統自動選定位址,核心會自己在行程位址空間中選擇合適的位址建立映射。
映射成功後返回該位址。如果不是NULL,則給核心一個提示,應該從什麼位址開始映射,核心會選擇start之上的某個合適的位址開始映射。
建立映射後,真正的映射位址通過返回值可以得到。

2.參數length:代表映射的大小。將文件的多大長度映射到記憶體。

3.參數prot:映射區域的保護方式。
可以為以下幾種方式的組合:
PROT_EXEC 映射區域可被執行
PROT_READ 映射區域可被讀取
PROT_WRITE 映射區域可被寫入
PROT_NONE 映射區域不能存取

4.參數flags:影響映射區域的各種特性。在調用mmap()時必須要指定MAP_SHARED 或MAP_PRIVATE。
MAP_FIXED 如果參數start所指的位址無法成功建立映射時,則放棄映射,不對位址做修正。通常不鼓勵用此旗標。
MAP_SHARED 允許其他映射該文件的行程共享,對映射區域的寫入數據會複製回文件。
MAP_PRIVATE 不允許其他映射該文件的行程共享,對映射區域的寫入操作會產生一個映射的複製(copy-on-write),對此區域所做的修改不會寫回原文件。
MAP_ANONYMOUS 建立匿名映射。此時會忽略參數fd,不涉及文件,而且映射區域無法和其他進程共享。
MAP_DENYWRITE 只允許對映射區域的寫入操作,其他對文件直接寫入的操作將會被拒絕。
MAP_LOCKED 將映射區域鎖定住,這表示該區域不會被置換(swap)。

5.參數fd:由open返回的文件描述符,代表要映射到核心中的文件。如果使用匿名核心映射時,即flags中設置了MAP_ANONYMOUS,fd設為-1。
有些系統不支持匿名核心映射,則可以使用fopen打開/dev/zero文件,然後對該文件進行映射,可以同樣達到匿名核心映射的效果。

6.參數offset:從文件映射開始處的偏移量,通常為0,代表從文件最前方開始映射。
offset必須是分頁大小的整數倍(在32位體系統結構上通常是4K)。

  • 步驟

1.用open系統調用打開文件, 並返回描述符fd.
2.用mmap建立記憶體映射, 並返回映射開始的地址指標start.
3.對映射(文件)進行各種操作
4.用munmap(void *start, size_t lenght)關閉記憶體映射.
5.用close系統調用關閉文件fd.


程式理解

首先將原始字典檔DICT_FILE輸出為一個「每筆資料長度為MAX_LAST_NAME_SIZE,剩餘補\0」的align.txt檔。

利用open函式打開文件檔align.txt,存取到fd

int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);

O_RDONLY:以只能讀取方式打開文件
O_NONBLOCK 以不可阻斷的方式打開文件,也就是無論有無數據讀取或等待,都會立刻返回行程之中。

利用mmap函式將fd映射到指標map所指到的記憶體空間。

char *map = mmap(NULL, fs, PROT_READ, MAP_SHARED, fd, 0);

由於程式切成THREAD_NUM個執行緒執行
以下將他們接起來

for (int i = 0; i < THREAD_NUM; i++) { if (i == 0) { pHead = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } else { etmp->pNext = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } etmp = app[i]->pLast; dprintf("Connect %d tail string %s %p\n", i, app[i]->pLast->lastName, app[i]->ptr); dprintf("round %d\n", i); }

assert(argc == 4 && "./main orgfName modfName Padded"):若符合條件式則程式可以繼續執行;否則會秀出維護錯誤訊息的字串,並結束程式。


使用Thread Pool達到效能改善

參考資料:基於pthread實現的簡單threadpoolwikipedia

  • Threadpool

有一組預先生成的thread,有個管理員來管理和調度這些thread,方法為管理員管理一個任務佇列(job queue),如果接收到新的任務就將其加到佇列尾,每個thread盯著佇列,如果佇列不是空的,就去佇列頭拿一個任務來處理(每個任務只能被一個thread拿到),處理完了就繼續去佇列取任務。
如果沒有任務,thread就會休眠,直到任務佇列不是空的。
如果這個管理員夠聰明,他可能會在沒有任務或任務少的時候,減少thread數,任務過多時增加thread數,實現資源的動態管理。

Thread Pool 本身,他必須存放所有的 Thread 與任務佇列。

Thread pool 簡單實現

在建立threadpool的時候,預先指定thread的數量,然後去job queue取添加進來的task進行處理就好。

主要定義兩個結構
1.threadpool_task_t (in threadpool.c)
用於保存一個等待執行的task,包含要執行的函數及其參數。

typedef struct { void (*function)(void *); void *argument; } threadpool_task_t;

2.threadpool_t (in threadpool.c)

struct threadpool_t { pthread_mutex_t lock; //互斥鎖 pthread_cond_t notify; //條件變數 pthread_t *threads; //執行續陣列的起始指標 threadpool_task_t *queue; //任務佇列的起始指標 int thread_count; //執行緒數量 int queue_size; //任務佇列長度 int head; //目前任務佇列頭 int tail; //目前任務佇列尾 int count; //目前等待執行的任務數 int shutdown; //threadpool當前狀態是否為關閉 int started; //正在執行的thread數 };

函數

對外接口
  • threadpool_create :建立threadpool。
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
  • threadpool_add :添加需要執行的任務。第二個參數為對應函數指標,第三個函數為對應函數參數。
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags)
  • threadpool_destory :銷毀存在的threadpool,若flad=1 為graceful結束,即等待全部做完才結束;若flag=0為immediate結束,即立即結束。
int threadpool_destroy(threadpool_t *pool, int flags)
內部輔助函數
  • threadpool_free:釋放threadpool所申請的記憶體空間。
int threadpool_free(threadpool_t *pool)
  • threadpool_thread:thread pool每個thread所執行的函數。
static void *threadpool_thread(void *threadpool)

程式執行後效能比較圖


筆記

參考資料: Concurrency系列

  • Evaluation求值

包括value computation:對一串運算式計算之結果;side effect:對物件修改的狀態。

  • Sequenced-before

對同一個thread下,求值順序關係的描述。
* 若A sequenced before B: 代表A求值會先完成才會對B求值。
* i++後置運算子,value computation會先於side effect。
* ++i前置運算子,則相反。

  • Synchronizes-With

發生在兩個不同thread間的同步行為,當A synchronized-with B的時,代表A對記憶體操作的效果,對於B是可見的。而A和B是兩個不同的thread的某個操作。

  • Sequential Consistency

1.對於每個獨立的處理單元,執行時都維持程式的順序(Program Order)
2.整個程式以某種順序在所有處理器上執行


作業詳細資訊A06:phonebook-concurrent

tags RayPan A06:phonebook-concurrent