andy19950
>pthread_mutexattr_init()
:設定 mutex 的屬性,由下列兩個參數來決定:
PTHREAD_PROCESS_PRIVATE
: 只能被該 process 使用PTHREAD_PROCESS_SHARED
: 可以被不同 process 使用pthread_mutex_init()
新增 mutex lock,必須傳入兩個參數:
pthread_mutex_t mp
:要使用的 mutex lock,通常帶入函式庫定義好的變數 PTHREAD_MUTEX_INITIALIZER
pthread_mutexattr_t mattr
:必須先使用 pthread_mutexattr_init()
來初始化 attribute 或 NULL
使用函式庫的 default attributepthread_mutex_lock()
:將 mutex lock 鎖起來,要傳入 mutex lock 的記憶體位置,也就是剛剛宣告的 &mp
pthread_mutex_unlock()
:將 mutex lock 解鎖,用法跟上面一樣傳入 &mp
pthread_mutex_t mp = PTHREAD_MUTEX_INITIALIZER;
pthread_mutexattr_t mattr = PTHREAD_PROCESS_SHARED;
pthread_mutexattr_init(&mattr); //設定 mutex lock 屬性
pthread_mutex_init(&mp, &mattr); //初始化 mutex lock
pthread_mutex_lock(&mp); //鎖起來
..
critical section
..
pthread_mutex_unlock(&mp); //解鎖
pthread_condatrr_init()
:設定條件的屬性,跟 mutex 一樣有兩種參數,用法也一樣。pthread_cond_init()
: 新增 condition variable 必須傳入兩個參數:
pthread_cond_t cv
:要使用的 condition variable,通常帶入函式庫定義的變數 PTHREAD_COND_INITIALIZER
pthread_condattr_t cattr
:必須先使用 pthread_condattr_init()
來初始化 attribute 或 NULL
使用函式庫的 default attributepthread_cond_wait()
: 若條件不符,就把 thread block 起來,必須傳入 condition variabel(&cv)
以及 mutex lock(&mp)
pthread_cond_signal()
: 將 blocked thread 叫醒讓它繼續執行。
pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count;
Consumer() {
pthread_mutex_lock(&count_lock);
while (count == 0) //如果沒有工作了,就把 thread block 起來
pthread_cond_wait(&count_nonzero, &count_lock);
count = count - 1;
pthread_mutex_unlock(&count_lock);
}
Producer() {
pthread_mutex_lock(&count_lock);
if (count == 0) //增加工作後把 consumer 叫醒
pthread_cond_signal(&count_nonzero);
count = count + 1;
pthread_mutex_unlock(&count_lock);
}
sem_init(sem_t *sem, int pshared, int value)
sem
:semaphore object 指標pshared
:決定 semaphore 能不能被其他 fork process 使用value
: 決定 semaphore 的數量限制sem_wait(sem_t *sem)
:如果 semaphore value 為負數,代表已經達到數量上限,該 thread 就會被 block 起來。sem_post(sem_t *sem)
:把 semaphore value +1 然後把 blocked thread 叫醒繼續工作。./sort [thread_num] [data_num]
,並且要手動輸入數字,我們要把它改成讀 input.txt 當作輸入。output.txt
char *
,如果想實做各種資料型態都可以 sort 的話,可以把型態改成 void *
,然後比照 qsort 讓使用者傳入比較函式 compare()。typedef intptr_t val_t
typedef char *val_t
#define MAX_LAST_NAME_SIZE 16
typedef struct node {
//必須給定資料空間或者每次都 malloc 空間給它
val_t data[MAX_LAST_NAME_SIZE];
struct node *next;
} node_t;
./sort [thread_num] [input_file]
\n -> \0
while(fgets(line, sizeof(line), fp) != NULL) {
line[strlen(line)-1] = '\0';
list_add(the_list, line);
}
merge_list()
裡面有一個比較 a & b 的地方,這邊的作法是:small = a * (..省略..) + b * (..省略..);
small = a*1 + b*0 = a
,反之 small = b
,這是一個很聰明的方法可以避免使用 if else 所產生的 branch。
int cmp = strcmp(a->head->data, b->head->data);
llist_t *small = (llist_t *)
((intptr_t *) a * (cmp <= 0) +
(intptr_t *) b * (cmp > 0));
list_print()
把每個節點走訪一次然後印出來,為了我們驗證方便,我把它輸出的型態改為 "%s\n"
這樣比較好跟原始資料做比較。output.txt
裡面。data_correctness()
void data_correctness()
{
FILE *result, *answer;
result = fopen("output.txt", "r");
answer = fopen("words.txt", "r");
char a[BUF_SIZ], b[BUF_SIZ];
while(fgets(a, sizeof(a), answer) != NULL){
fgets(b, sizeof(b), result);
assert(!strcmp(a, b) && "ERROR : Some name is not in the linked-list!!");
}
printf("CORRECT : Every name is in the linked-list!!\n");
}
input unsorted data line-by-line
sorted results in output file: output.txt
CORRECT : Every name is in the linked-list!!
thread | cache miss | cache reference | miss rate |
---|---|---|---|
1 | 27,821,458 | 48,255,510 | 63% |
2 | 27,742,811 | 47,788,466 | 61% |
4 | 28,409,123 | 109,737,179 | 30% |
8 | 28,358,122 | 138,450,096 | 23% |
tpool_init()
內給定the_pool->queue
空間大小。tqueue_init()
初始化每個 work queue。pthread_create()
的第四個參數(原本是NULL
),讓每個 thread 可以知道自己的 queue 的位置。
the_pool->queue = (tqueue_t *) malloc(sizeof(tqueue_t) * tcount);
for (uint32_t i = 0; i < tcount; ++i) {
tqueue_init(the_pool->queue + i);
pthread_create(&(the_pool->threads[i]), &attr, func,
(void *)(the_pool->queue + i));
}
task_run()
時讓每個 thread 都跟自己的 queue 拿工作pthread_create()
時有把每個 thread 的 queue 的位置傳進來,所以在這邊可以把 void *data
轉成原本的tqueue_t *_queue
來取得指定的 queueNULL
的 func 為止。
static void *task_run(void *data)
{
tqueue_t *_queue = (tqueue_t *) data;
...
}
因為原本只有一個 queue 所以在 merge 的最後只有給一次 NULL
的 func 到唯一的 queue 裡面,但這邊我們使用多個 queue 所以在 merge 的最後也要使用 for loop 來給每個 queue NULL
的 func 才能讓每個 thread 結束執行,否則整支程式會因為某個 thread 不斷執行這個 while 而卡住。
else { //進入這個 else 代表 linked-list 已經排序完畢,要結束所有 thread 並印出結果。
the_list = _list;
task_t *_task = (task_t *) malloc(sizeof(task_t));
_task->func = NULL;
for(int i=0; i<pool->count; i++)
tqueue_push(pool->queue + i, _task);
list_print(_list);
}
tqueue_push
指派工作到不同的 work queuetqueue_push()
來指派工作到 queue 裡面,但因為我們實做出多個 queue,所以必須要讓每次呼叫 tqueue_push()
時把工作指派到不同的 queue。static uint32_t now = 0
,剛開始的時候指派到第一個 queue。tqueue_push(pool->queue + now++%thread_count, _task);
uint32_t
的關係所以不會有 integer overflow 的問題,就算同時有不同的 thread 使用這行頂多只是分派到同一個 work queue 不會有太大的問題。這句解讀的方式很奇怪,沒說錯嗎? jserv
其實我想說的是,跟上面的表格比較起來看似 miss rate 提高了,但其實是因為 cache reference 次數減少,實際上 cache miss 次數差不多 吳庭安
thread | cache miss | cache reference | miss rate |
---|---|---|---|
1 | 27,043,638 | 46,433,299 | 58.42% |
2 | 26,789,642 | 46,633,605 | 57.44% |
4 | 26,811,962 | 46,982,814 | 57.21% |
8 | 27,058,363 | 47,194,774 | 57.33% |
cut_func merge
,但真正花時間的 merge_sort merge_list
卻沒有切成小的工作單位給每個 queue 去執行。為何不用 perf 確認呢?
可一併分析 gprof 輸出 jserv
要先改善 lock contention 的問題,否則執行緒數量和處理器核心增加也無助於效能 jserv