#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
void *thread_function(void *arg)
{
for (int i = 0; i < 20; i++) {
int j = myglobal;
j = j + 1;
printf(".");
fflush(stdout);
sleep(1);
myglobal = j;
}
return NULL;
}
int main(void)
{
pthread_t mythread;
if (pthread_create(&mythread, NULL,
thread_function, NULL)) {
printf("error creating thread.");
abort();
}
for (int i = 0; i < 20; i++) {
myglobal = myglobal + 1;
printf("o");
fflush(stdout);
sleep(1);
}
if (pthread_join(mythread, NULL)) {
printf("error joining thread.");
abort();
}
printf("\nmyglobal equals %d\n",myglobal);
exit(0);
}
執行結果
o.o.o.o.o..o.o.o.o.o.o.o.oo..oo.o..o.oo.
myglobal equals 21
myglobal 起始值為 0,主執行緒與新執行緒各將 myglobal 增加 20,照理說應該是 40,但輸出 21 原因為何?
因為兩條執行緒同時執行,當 thread_fucntion 把 j 值寫回 myglobal,他會複寫主執行緒中的修改。
此便為程式同步執行所造成的競爭危害問題。
將 concurrent / concurrency 統一翻譯為「並行」 jserv
兩個工作同時存記憶體的過程中,並不會即時更新記憶體資料,造成最後用舊的資料計算,產生錯誤結果。
共用資源被並行存取可能造成不可預期的錯誤,所以有些程式共用資源的存取會被保護,這些保護區域就稱為臨界區域。其只能被一個 process 執行。
解決 critical section 必須滿足3個條件:
解決 critical section 的方法:
以硬體方式解決,藉由 lock 來保護 critical section。
CAS(*位址, 舊值, 新值){
if(*位址!=舊值)
return 0 //失敗
*位址 = 新值
return 1 //成功
}
論文研讀:A Pragmatic Implementation Of Non-Blocking Linked-Lists
This paper present a novel implementation of linked-lists which is non-blocking, linearizable and which is based on the compare-and-swap(CAS)operation found on contemporary processoers.
問題:若是在移除節點(10)的同時,插入新節點(20),可能會造成新插入節點的遺失
解決:作者提出使用兩個分開的 CAS (compare and swap) 分兩階段移除節點
在 Thread-Per-Message 模式中,當每次請求來到,就建立一個新的執行緒,用完就不再使用,然而執行緒的建立需要系統資源,對於一個接受許多請求的情況,不斷的建立新執行緒,會導致系統效能的降低。
搭配閱讀: Thread pools and work queues jserv
若能重複使用所建立的執行緒,而不是用完就丟,可以有效的重複利用資源。在 Worker Thread 模式的範例中,預先建立好執行緒,當請求佇列有新請求時,通知等待的執行緒取出請求進行處理,其實就是一種重複使用執行緒的方式。
示意圖如下:
digraph hierarchy {
nodesep=0.5 // increases the separation between nodes
node [color=black,fontname=Courier,shape=box] //All nodes will this shape and colour
edge [color=black, style=dashed] // All the lines look like this
Manager->{Worker1 Worker2 Worker3}
}
為了分配工作,在 worker thread 實作 Task 的機制。每個主要的操作會先被放進 task queue 裡頭,空閒的 thread 再從 task queue 裡頭提取 task 執行,如下面的步驟:
digraph {
開始Process[shape="box", style=rounded];
是否為結束Task[shape="diamond"];
結束Process[shape="box", style=rounded];
開始Process->提取Task->是否為結束Task
是否為結束Task->重發結束Task[label="是"]
重發結束Task->結束Process
是否為結束Task->執行Task[label="否"]
執行Task->提取Task;
}
大多數 thread pool 實做都離不開 lock 的使用,如 pthread_mutex 結合 (condition variable) pthread_cond。一般來說,lock 的使用對於程式效能影響較大,雖然現有的 pthread_mutex 在 lock 的取得 (acquire) 與釋放,已在 Linux 核心和對應函式庫進行效能提昇,但我們仍會希望有不仰賴 lock 的 thread pool 的實做。
如上圖所示,workqueue (工作佇列) 由 main thread (主執行緒) 和 worker thread 共享,main thread 將任務放進 workqueue,worker thread 從 workqueue 中取出任務執行。要注意到,共享 workqueue 的操作必須在 mutex 的保護下安全進行,main thread 將任務放進 workqueue 時,若偵測到目前待執行的工作數目小於 worker thread 總數,則要透過 condition variable 喚醒可能處於等待狀態的 worker thread。
為解決 lock-free 所面臨的議題,我們一定要避免共享資源的競爭 (contention),因此將共享 workqueue 加以拆分成每 worker thread 一個 workqueue 的方式,如上圖。
沒有顯著進展!你們到底做了什麼? jserv
mutrace: Total runtime is 0.734 ms.
mutrace: Total runtime is 1.097 ms.
發現thread開越多時間越長儂偉
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
1 177 43 27 0.048 0.000 0.002 Mx.--.
0 20 15 9 0.011 0.001 0.001 M-.--.
2 13 8 0 0.004 0.000 0.000 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
需要詳述優化策略,以及實作 fine-grained locks 和接著做到 lock-free jserv
著手修改 task_run
pthread_mutex_lock(&(pool->mutex));
while ( pool->size == 0) {
pthread_cond_wait(&(pool->cond), &(pool->mutex));
}
if (pool->size == 0)
break;
pthread_mutex_unlock(&(pool->mutex));
task_t *_task = tqueue_pop(pool);
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
1 20 17 5 0.009 0.000 0.002 M-.--.
0 13 9 2 0.020 0.002 0.007 Mx.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
發現lock數量大大減少儂偉
./sort 1 8
input unsorted data line-by-line
sorted results:
[8] [7] [6] [5] [4] [3] [2] [1]
sorted results:
[1] [2] [3] [4] [5] [6] [7] [8]
./sort 2 8
input unsorted data line-by-line
sorted results:
[8] [7] [6] [5] [4] [3] [2] [1]
只要開超過2個thread就執行失敗,待解決儂偉
發現原因mbrossard 的 ThreadPool 直接在cut套用會產生鎖兩次 造成死鎖,重新動手改原本的thread_pool儂偉
typedef struct {
pthread_t *threads;
uint32_t count;
task_t *head, *tail;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint32_t size;
} tpool_t;
bench:intput.txt
for i in 1 2 4 8 16 32 64; do \
while read line;do \ #逐行讀取
echo $$line ; \
done < intput.txt | ./sort $$i $ 10000 ;\
done # '<':將原本需要由鍵盤輸入的資料,改由檔案內容('input.txt')來取代
# '|':管線命令。僅能處理經由前面一個指令傳來的正確資訊,也就是 standard output 的資訊,對於 stdandard error 並沒有直接處理的能力。
random_num: random_num.c
$(CC) $(CFLAGS) $^ -o $@
intput.txt: random_num
./random_num $ 10000
/*random_num.c*/
srand(time(NULL));
int num_count, i;
FILE *input = fopen("input.txt", "w");
num_count = atoi(argv[1]);
int *arr;
int pos , tmp;
arr = (int *) malloc(sizeof(int) * num_count);
for (i = 0; i < num_count; i++) //產生num_count筆數據
arr[i] = i + 1;
for (i = 0; i < num_count; i++) { //進行洗牌
pos = rand() % num_count;
tmp = arr[i];
arr[i] = arr [pos];
arr[pos] = tmp; //進行arr[i]及arr[pos]交換
}
for (i = 0; i < num_count; i++)
fprintf(input, "%d\n", arr[i]);
free(arr);
fclose(input);
程式碼縮排為 4 個空白,不是 Tab,請修正 jserv
已修正儂偉
由於實驗用CPU為4thread,所以4thread最快,並且超過4的時間就會越長儂偉
coarse 和 fine-grained 互為反義字,前者表示較為粗糙的切割和處理方式,而後者則較為細緻,以 lock 來說,就是「大而粗略」和「細緻且多處」的差異。 jserv
做實驗了嗎?你們設計哪些實驗?不要只是「看」,這樣無從驗證自己的認知 jserv
提醒中英文關鍵字間要以空白區隔喔!課程助教
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
1 177 43 27 0.048 0.000 0.002 Mx.--.
0 20 15 9 0.011 0.001 0.001 M-.--.
2 13 8 0 0.004 0.000 0.000 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
1 53 30 11 0.070 0.001 0.016 M-.--.
0 20 12 3 0.009 0.000 0.002 M-.--.
2 13 5 0 0.004 0.000 0.000 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
發現lock數量跟contend都大幅降低儂偉
說好的程式碼呢? jserv
static void *task_run(void *data)
{
(void) data;
while (1) {
task_t *ret , *ret_last, *ret_next, *the_pool_head, *tmp;
ret = tmp = pool->tail;
ret_last = ret->last;
ret_next = ret->next;
the_pool_head = pool->head;
if (ret != NULL && !is_marked_ref(ret)) {
if (CAS_PTR(&pool->tail, ret, ret_last) == ret) {
ret = get_unmarked_ref(ret_last);
ret_last = ret->last;
ret_next = ret->next;
if (ret && !is_marked_ref(ret_next)) {
if (CAS_PTR(&ret->next, ret_next, NULL) == ret_next) {
ret->next = NULL;
FAD_U32(&(pool->size));
task_t *_task = tmp;
if (_task) {
if (!_task->func) {
tqueue_push(pool, _task);
break;
} else {
_task->func(_task->arg);
free(_task);
}
}
}
}
else if (!ret && !is_marked_ref(the_pool_head)) {
if (CAS_PTR(&pool->head, the_pool_head, NULL) == the_pool_head) {
FAD_U32(&(pool->size));
task_t *_task = tmp;
if (_task) {
if (!_task->func) {
tqueue_push(pool, _task);
break;
} else {
_task->func(_task->arg);
free(_task);
}
}
}
}
}
}
else if (ret == NULL) {
task_t *_task = tmp;
if (_task) {
if (!_task->func) {
tqueue_push(pool, _task);
break;
} else {
_task->func(_task->arg);
free(_task);
}
}
}
}
pthread_exit(NULL);
}
目前 mark 部份仍有問題 解決中…
讀書:
參考資料