contributed by <RayPan
>
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 4
On-line CPU(s) list: 0-3
Thread(s) per core: 2
Core(s) per socket: 2
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 60
Model name: Intel(R) Core(TM) i5-4200H CPU @ 2.80GHz
Stepping: 3
CPU MHz: 2869.015
CPU max MHz: 3400.0000
CPU min MHz: 800.0000
BogoMIPS: 5587.49
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 3072K
NUMA node0 CPU(s): 0-3
參考資料: Getting Started With POSIX Threads、Pthread程式撰寫
、冼鏡光的並行計算投影片、POSIX threads explained
一個執行的程式(Process)可以分成幾條執行緒(thread),每一條執行緒處理賦予它的工作;ie,一個行程可以分成三條執行緒,一條負責讀取各種輸入、第二條處理視窗的輸出、第三條從事計算的工作,這三條執行線彼此之間相互協調配合、完成該程式的工作。
POSIX標準中支援的執行緒函式庫稱為pthread,我們可以透過pthread結構與pthread_create() 函數執行某個函數指標,以建立新的執行緒。
建立一個thread並執行附帶的function
pthread_t mythread;
pthread_attr_t mythread_attribute;
void thread_function(void *argument);
char *some_argument;
int pthread_create(&mythread, mythread_attribute,
(void *)&thread_function,(void *)&some_argument);
1.參數mythread為pthread的指標,在使用Thread之前必須要先宣告一個pthread_t的變數。
2.參數mythread_attribute為該Thread的屬性,預設是NULL,如果沒有其他特殊的需求直接填入NULL即可。
3.參數thread_function)(void *)為Function pointer,這邊要放入你要執行的Function。
4.參數void *some_argument為Function pointer所要帶的參數。
回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。
暫停目前執行pthread_join的Thread,等到目標Thread執行完畢之後目前執行pthread_join的Thread才會繼續執行。
int pthread_join (pthread_t thread, void **value_ptr)
1.參數:pthread_t thread為要等待的目標Thread。
2.參數:void **value_ptr用來取得目標Thread的回傳值。
回傳值: 如果執行成功則回傳0,如果執行失敗則回傳錯誤代碼。
/*thread1.c*/
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
void *thread_function(void *arg) {
int i;
for ( i=0; i<20; i++ ) {
printf("Thread says hi!\n");
sleep(1);
}
return NULL;
}
int main(void) {
pthread_t mythread;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
exit(0);
}
主程式中本身會產生一條執行緒,thread_create又建立一條新執行緒,所以這個程式包含兩條執行緒。
主程式的執行緒繼續執行下一個if(pthread_join…)的判斷
新建的執行緒結束時,會停止並等待被併入或加入其他執行緒。
pthread_create() 會分裂成兩條執行緒,pthread_join() 則會將兩條執行緒合併為一。
在 thread_function() 完成時,主程式的執行緒已經呼叫pthread_join(),當這個情況發生時,主程式的執行緒會 block住(go to sleep) 並等待 thread_function() 完成。
當thread_fuction()完成時,pthread_join()會回傳值,所以現在我們的程式又有一條主執行緒,所以必須好好處理每條新建的執行緒,否則達到系統的最大值執行緒數量限制會造成pthread_create錯誤。
/*thread2.c*/
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++ ) {
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
myglobal=myglobal+1;
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("\nmyglobal equals %d\n",myglobal);
exit(0);
}
執行結果
o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo.
myglobal equals 21
myglobal起始值為0,主執行緒與新執行緒各將myglobal增加20,照理說應該是40,但輸出21原因為何?
因為兩條執行緒同時執行,當thread_fucntion把j值寫回myglobal,他會複寫主執行緒中的修改。
解決上述問題
加入互斥鎖(mutex)
/*thread3.c*/
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER;
void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++ ) {
pthread_mutex_lock(&mymutex);
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
pthread_mutex_unlock(&mymutex);
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
pthread_mutex_lock(&mymutex);
myglobal=myglobal+1;
pthread_mutex_unlock(&mymutex);
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("\nmyglobal equals %d\n",myglobal);
exit(0);
}
兩條執行緒不能同時被一個互斥鎖鎖住,如果執行緒A想要鎖一個mutex然而執行緒B已經鎖住該mutex,A會sleep。
當B釋出這個mutex(透過pthread_mutex_unlock()),A即可鎖住該mutex。
亦即當一個mutex被鎖住,所有想要鎖的執行緒會被queue up。
pthread_mutex_lock()以及pthread_mutex_unlock()確保一個資料結構同時只能被一個執行緒存取,用來保護該資料結構。
參考資料:記憶體映射函數 mmap
把文件內容映射到一段記憶體上(準確說是虛擬記憶體上),通過對這段記憶體的讀取和修改,實現對文件的讀取和修改。
void *mmap(void *start,size_t length, int prot,
int flags, int fd, off_t offsize);
1.參數start:指向欲映射的核心起始位址,通常設為NULL,代表讓系統自動選定位址,核心會自己在行程位址空間中選擇合適的位址建立映射。
映射成功後返回該位址。如果不是NULL,則給核心一個提示,應該從什麼位址開始映射,核心會選擇start之上的某個合適的位址開始映射。
建立映射後,真正的映射位址通過返回值可以得到。
2.參數length:代表映射的大小。將文件的多大長度映射到記憶體。
3.參數prot:映射區域的保護方式。
可以為以下幾種方式的組合:
PROT_EXEC
映射區域可被執行
PROT_READ
映射區域可被讀取
PROT_WRITE
映射區域可被寫入
PROT_NONE
映射區域不能存取
4.參數flags:影響映射區域的各種特性。在調用mmap()時必須要指定MAP_SHARED 或MAP_PRIVATE。
MAP_FIXED 如果參數start所指的位址無法成功建立映射時,則放棄映射,不對位址做修正。通常不鼓勵用此旗標。
MAP_SHARED 允許其他映射該文件的行程共享,對映射區域的寫入數據會複製回文件。
MAP_PRIVATE 不允許其他映射該文件的行程共享,對映射區域的寫入操作會產生一個映射的複製(copy-on-write),對此區域所做的修改不會寫回原文件。
MAP_ANONYMOUS 建立匿名映射。此時會忽略參數fd,不涉及文件,而且映射區域無法和其他進程共享。
MAP_DENYWRITE 只允許對映射區域的寫入操作,其他對文件直接寫入的操作將會被拒絕。
MAP_LOCKED 將映射區域鎖定住,這表示該區域不會被置換(swap)。
5.參數fd:由open返回的文件描述符,代表要映射到核心中的文件。如果使用匿名核心映射時,即flags中設置了MAP_ANONYMOUS,fd設為-1。
有些系統不支持匿名核心映射,則可以使用fopen打開/dev/zero文件,然後對該文件進行映射,可以同樣達到匿名核心映射的效果。
6.參數offset:從文件映射開始處的偏移量,通常為0,代表從文件最前方開始映射。
offset必須是分頁大小的整數倍(在32位體系統結構上通常是4K)。
1.用open系統調用打開文件, 並返回描述符fd.
2.用mmap建立記憶體映射, 並返回映射開始的地址指標start.
3.對映射(文件)進行各種操作
4.用munmap(void *start, size_t lenght)關閉記憶體映射.
5.用close系統調用關閉文件fd.
首先將原始字典檔DICT_FILE
輸出為一個「每筆資料長度為MAX_LAST_NAME_SIZE,剩餘補\0
」的align.txt檔。
利用open函式打開文件檔align.txt,存取到fd
int fd = open(ALIGN_FILE, O_RDONLY | O_NONBLOCK);
O_RDONLY:以只能讀取方式打開文件
O_NONBLOCK 以不可阻斷的方式打開文件,也就是無論有無數據讀取或等待,都會立刻返回行程之中。
利用mmap函式將fd映射到指標map所指到的記憶體空間。
char *map = mmap(NULL, fs, PROT_READ, MAP_SHARED, fd, 0);
由於程式切成THREAD_NUM個執行緒執行
以下將他們接起來
for (int i = 0; i < THREAD_NUM; i++) {
if (i == 0) {
pHead = app[i]->pHead->pNext;
dprintf("Connect %d head string %s %p\n", i,
app[i]->pHead->pNext->lastName, app[i]->ptr);
} else {
etmp->pNext = app[i]->pHead->pNext;
dprintf("Connect %d head string %s %p\n", i,
app[i]->pHead->pNext->lastName, app[i]->ptr);
}
etmp = app[i]->pLast;
dprintf("Connect %d tail string %s %p\n", i,
app[i]->pLast->lastName, app[i]->ptr);
dprintf("round %d\n", i);
}
assert(argc == 4 && "./main orgfName modfName Padded")
:若符合條件式則程式可以繼續執行;否則會秀出維護錯誤訊息的字串,並結束程式。
參考資料:基於pthread實現的簡單threadpool、wikipedia
有一組預先生成的thread,有個管理員來管理和調度這些thread,方法為管理員管理一個任務佇列(job queue),如果接收到新的任務就將其加到佇列尾,每個thread盯著佇列,如果佇列不是空的,就去佇列頭拿一個任務來處理(每個任務只能被一個thread拿到),處理完了就繼續去佇列取任務。
如果沒有任務,thread就會休眠,直到任務佇列不是空的。
如果這個管理員夠聰明,他可能會在沒有任務或任務少的時候,減少thread數,任務過多時增加thread數,實現資源的動態管理。
Thread Pool 本身,他必須存放所有的 Thread 與任務佇列。
在建立threadpool的時候,預先指定thread的數量,然後去job queue取添加進來的task進行處理就好。
主要定義兩個結構
1.threadpool_task_t
(in threadpool.c)
用於保存一個等待執行的task,包含要執行的函數及其參數。
typedef struct {
void (*function)(void *);
void *argument;
} threadpool_task_t;
2.threadpool_t
(in threadpool.c)
struct threadpool_t {
pthread_mutex_t lock; //互斥鎖
pthread_cond_t notify; //條件變數
pthread_t *threads; //執行續陣列的起始指標
threadpool_task_t *queue; //任務佇列的起始指標
int thread_count; //執行緒數量
int queue_size; //任務佇列長度
int head; //目前任務佇列頭
int tail; //目前任務佇列尾
int count; //目前等待執行的任務數
int shutdown; //threadpool當前狀態是否為關閉
int started; //正在執行的thread數
};
函數
threadpool_create
:建立threadpool。
threadpool_t *threadpool_create(int thread_count, int queue_size,
int flags)
threadpool_add
:添加需要執行的任務。第二個參數為對應函數指標,第三個函數為對應函數參數。
int threadpool_add(threadpool_t *pool, void (*function)(void *),
void *argument, int flags)
threadpool_destory
:銷毀存在的threadpool,若flad=1 為graceful結束,即等待全部做完才結束;若flag=0為immediate結束,即立即結束。int threadpool_destroy(threadpool_t *pool, int flags)
threadpool_free
:釋放threadpool所申請的記憶體空間。int threadpool_free(threadpool_t *pool)
threadpool_thread
:thread pool每個thread所執行的函數。static void *threadpool_thread(void *threadpool)
程式執行後效能比較圖
參考資料: Concurrency系列
包括value computation:對一串運算式計算之結果;side effect:對物件修改的狀態。
對同一個thread下,求值順序關係的描述。
* 若A sequenced before B: 代表A求值會先完成才會對B求值。
* i++
後置運算子,value computation會先於side effect。
* ++i
前置運算子,則相反。
發生在兩個不同thread間的同步行為,當A synchronized-with B的時,代表A對記憶體操作的效果,對於B是可見的。而A和B是兩個不同的thread的某個操作。
1.對於每個獨立的處理單元,執行時都維持程式的順序(Program Order)
2.整個程式以某種順序在所有處理器上執行
作業詳細資訊A06:phonebook-concurrent
RayPan
A06:phonebook-concurrent