contributed by <carolc0708>
Requirements:
- Modify the merge sort implementation so it accepts the 350,000-entry input data file from phonebook-concurrent (shuffle the data with `sort -R` first).
- Study how the thread pool manages its worker threads, point out shortcomings of the implementation, and, referring to concurrent-ll, propose a lock-free implementation.
- Learn the scalability analysis used by concurrent-ll (a concurrent linked-list implementation) and plot with gnuplot to compare merge sort performance across different thread counts.
- Also try to refactor the given code so the program is easier to read and maintain. Following A05: introspect, constructive and thorough criticism should not stay in the notes as text only; it should be reflected in changes to the code.
- Rewrite with C11 `_Atomic`. References:
- Compute the space complexity of merge sort and build a memory pool for it: after the analysis, allocate the maximum space up front. Since two merges may run at the same time, guard against overlap by splitting the memory into segments, and handle the final data movement (look at memory pool implementations). With malloc, the reclaimed, sorted data is not contiguous in memory (look at linked-list cache techniques). A sketch of the memory-pool idea follows.
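A minimal sketch of that memory-pool idea, with hypothetical names and without the per-merge segment bookkeeping (each merge task would get its own disjoint sub-range of the pool to prevent overlap):

#include <stdlib.h>

/* Hypothetical fixed-size pool: merge sort on n nodes never needs more
 * than the nodes themselves plus temporary list headers, so one up-front
 * allocation can replace per-node malloc(). */
typedef struct {
    char *base;      /* start of the pre-allocated block */
    size_t unit;     /* size of one object, e.g. sizeof(node_t) */
    size_t used;     /* objects handed out so far */
    size_t capacity; /* total number of objects */
} mempool_t;

static int mempool_init(mempool_t *p, size_t unit, size_t capacity)
{
    p->base = malloc(unit * capacity);
    p->unit = unit;
    p->used = 0;
    p->capacity = capacity;
    return p->base ? 0 : -1;
}

/* bump allocation: no per-object free; the whole pool is released at once */
static void *mempool_alloc(mempool_t *p)
{
    if (p->used == p->capacity) return NULL;
    return p->base + p->unit * p->used++;
}

static void mempool_destroy(mempool_t *p)
{
    free(p->base);
}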
The boss/worker model: the boss thread creates a new worker thread for each incoming request.

main() /* The boss */
{
forever {
get a request
switch request
case X : pthread_create( ... taskX)
case Y : pthread_create( ... taskY)
.
.
.
}
}
taskX() /* Workers processing requests of type X */
{
perform the task, synchronize as needed if accessing shared resources
done
}
taskY() /* Workers processing requests of type Y */
{
perform the task, synchronize as needed if accessing shared resources
done
}
The boss/worker model with a thread pool: the workers are created up front and sleep until the boss queues work for them.

main() /* The boss */
{
for the number of workers
pthread_create( ... pool_base )
forever {
get a request
place request in work queue
signal sleeping threads that work is available
}
}
pool_base() /* All workers */
{
forever {
sleep until awoken by boss
dequeue a work request
switch
case request X: taskX()
case request Y: taskY()
.
.
.
}
}
The peer model: main() creates all the threads up front, and each thread works on its own portion of the task.

main()
{
pthread_create( ... thread1 ... task1 )
pthread_create( ... thread2 ... task2 )
.
.
.
signal all workers to start
wait for all workers to finish
do any clean up
}
task1()
{
wait for start
perform task, synchronize as needed if accessing shared resources
done
}
task2()
{
wait for start
perform task, synchronize as needed if accessing shared resources
done
}
The pipeline model assumes a long stream of input that passes through a series of processing stages, where each stage can work on a different unit of input at the same time. Applications in which the pipeline might be useful are image processing and text processing, or any application that can be broken down into a series of filter steps over a stream of input.
main()
{
pthread_create( ... stage1 )
pthread_create( ... stage2 )
.
.
.
wait for all pipeline threads to finish
do any clean up
}
stage1()
{
forever {
get next input for the program
do stage 1 processing of the input
pass result to next thread in pipeline
}
}
stage2()
{
forever {
get input from previous thread in pipeline
do stage 2 processing of the input
pass result to next thread in pipeline
}
}
stageN()
{
forever {
get input from previous thread in pipeline
do stage N processing of the input
pass result to program output
}
}
Lock contention: this occurs whenever one process or thread attempts to acquire a lock held by another process or thread. The more fine-grained the available locks, the less likely one process/thread will request a lock held by the other. (For example, locking a row rather than the entire table, or locking a cell rather than the entire row.)
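To make lock granularity concrete, here is a sketch (not from this codebase) of a chained hash table that keeps one mutex per bucket instead of a single table-wide mutex, so two threads contend only when they touch the same bucket:

#include <pthread.h>
#include <stdlib.h>

#define NBUCKETS 64

/* hypothetical chained hash table with one lock per bucket */
typedef struct entry {
    int key, value;
    struct entry *next;
} entry_t;

typedef struct {
    entry_t *bucket[NBUCKETS];
    pthread_mutex_t lock[NBUCKETS]; /* instead of one table-wide mutex */
} table_t;

static void table_init(table_t *t)
{
    for (int i = 0; i < NBUCKETS; i++) {
        t->bucket[i] = NULL;
        pthread_mutex_init(&t->lock[i], NULL);
    }
}

static void table_put(table_t *t, int key, int value)
{
    unsigned b = (unsigned) key % NBUCKETS;
    pthread_mutex_lock(&t->lock[b]); /* serializes only this bucket */
    entry_t *e = malloc(sizeof(entry_t));
    e->key = key;
    e->value = value;
    e->next = t->bucket[b];
    t->bucket[b] = e;
    pthread_mutex_unlock(&t->lock[b]);
}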
mutrace vs. valgrind/drd

In contrast to valgrind/drd it does not virtualize the CPU instruction set, making it a lot faster. In fact, the hooks mutrace relies on to profile mutex operations should only minimally influence application runtime. mutrace is not useful for finding synchronization bugs; it is solely useful for profiling locks.

About mutrace:
- Build with `-rdynamic` to make the backtraces mutrace generates useful.
- `-r` | `--track-rt`: checks on each mutex operation whether it is executed by a realtime thread or not.
- mutrace ships with `matrace`, which can be used to track down memory allocation operations in realtime threads.

Grabbing the source reveals more details worth reading:

$ git clone http://git.0pointer.net/clone/mutrace.git

`mutrace.in` shows what the other flags do; `matrace` is documented there as well.

Install mutrace:

$ sudo apt-get install mutrace
Run the mergesort-concurrent program under mutrace:

$ (for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8

The results:
mutrace: 0.2 sucessfully initialized for process sort (pid 4814).
input unsorted data line-by-line
sorted results:
[9602] [10669] [11911] [16147] [19714] [22776] [22870] [32443]
mutrace: Showing statistics for process sort (pid 4814).
mutrace: 3 mutexes used.
Mutex #1 (0x0x216c9b0) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7f5a530484b2]
./sort(tqueue_init+0x38) [0x401277]
./sort(tpool_init+0x6a) [0x4014cc]
./sort(main+0x161) [0x401c74]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7f5a52a7f830]
Mutex #0 (0x0x7f5a5064e860) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_lock+0x49) [0x7f5a530486b9]
/lib/x86_64-linux-gnu/libgcc_s.so.1(_Unwind_Find_FDE+0x2c) [0x7f5a5044afec]
[(nil)]
mutrace: Showing 2 most contended mutexes:
 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       1       62        5        1        0.005        0.000        0.001 Mx.--.
       0       20        3        0        0.002        0.000        0.000 M-.--.
                                                                           ||||||
                                                                           /|||||
               Object:                                  M = Mutex, W = RWLock /||||
                State:                              x = dead, ! = inconsistent /|||
                  Use:                              R = used in realtime thread /||
           Mutex Type:              r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
       Mutex Protocol:                                   i = INHERIT, p = PROTECT /
          RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
mutrace: Total runtime is 0.375 ms.
mutrace: Results for SMP with 4 processors.
Meaning of the table columns:
- `Locked`: how often the mutex was locked during the entire runtime.
- `Changed`: how often the owning thread of the mutex changed.
- `Cont.`: how often the lock was already taken when we tried to take it and we had to wait.
- `tot.Time[ms]`: for how long during the entire runtime the lock was locked.
- `avg.Time[ms]`: the average lock time.
- `max.Time[ms]`: the longest time the lock was held.
- `Flags`: what kind of mutex this is (recursive, normal or otherwise).

`addr2line` can map these addresses back to actual source lines. Build with the `-g` flag to make sure the executable is produced with debug info. For the address `(tqueue_init+0x38) [0x401277]`, the corresponding source line is:

$ addr2line -e sort 0x401277
/home/carol/mergesort-concurrent/threadpool.c:15

When I use `0x38` instead, the result is `??:0`. (Carol Chen) (`addr2line` expects an absolute address within the executable; `0x38` is only the offset inside `tqueue_init`, so it cannot be resolved by itself.)
list.h
typedef intptr_t val_t;
`val_t` is defined as `intptr_t` so that a node's data can hold either a pointer type or an ordinary integer type.

When/why to use `intptr_t` in type-casting: when we convert a pointer to an integer, the pointer width differs between 32-bit and 64-bit machines. `intptr_t` is used because it can achieve the following:
#ifdef MACHINE_64
typedef long intptr;   /* pointers are 64-bit */
#else  /* MACHINE_32 */
typedef int intptr;    /* pointers are 32-bit */
#endif
From then on we only need to `#include <stdint.h>` to get the above, sparing us tedious type conversions. It is also the conservative way to make sure conversions between pointer and integer do not go wrong.

From the Stack Overflow discussion of `intptr_t`: compared with `void *`, an `intptr_t` value can be used in bitwise operations, while a `void *` cannot. However, for bitwise operations `uintptr_t` is a better choice than `intptr_t`.
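A small example of such a bitwise use, as a sketch: tagging the low bit of a pointer, the same trick Harris-style lock-free lists use to mark logically deleted nodes. It assumes allocations are at least 2-byte aligned, so bit 0 of a valid pointer is always 0.

#include <stdint.h>

/* Tag the lowest bit of a pointer, e.g. to mark a node as logically
 * deleted.  A void * cannot be OR'd directly, hence the round trip
 * through uintptr_t. */
static inline void *tag_pointer(void *p)
{
    return (void *) ((uintptr_t) p | (uintptr_t) 1);
}

static inline void *untag_pointer(void *p)
{
    return (void *) ((uintptr_t) p & ~(uintptr_t) 1);
}

static inline int is_tagged(void *p)
{
    return (int) ((uintptr_t) p & (uintptr_t) 1);
}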
typedef struct node {
val_t data;
struct node *next;
} node_t;
An ordinary linked-list node type: each node stores its own `data` and a pointer `next` to the following node.
typedef struct llist {
node_t *head;
uint32_t size;
} llist_t;
A linked list must record the position of `head`; here the list length `size` is also packed into the structure.
list.c
// put a new node in front of next, and return a pointer to that node
static node_t *node_new(val_t val, node_t *next);
// initialize a new linked list
llist_t *list_new();
// append a new node at the tail of the linked list
int list_add(llist_t *the_list, val_t val);

An added check for whether the value already exists:

// check if the value already exists
node_t *cur = list->head;
while (cur) {
    if (cur->data == val) return list;
    cur = cur->next;
}
Always returning 0 from `list_add` is not very meaningful; the return value should be adjusted to reflect what happened, for example by returning the `llist` pointer instead. A sketch follows.
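A hypothetical revision along those lines, combining the duplicate check with an `llist_t *` return value (NULL on error, the list itself on success); this also matches the later benchmark code, which assigns the result of `list_add`:

/* return the list on success (including when val is already present,
 * in which case nothing is added), or NULL on error */
llist_t *list_add(llist_t *list, val_t val)
{
    if (!list) return NULL;
    for (node_t *cur = list->head; cur; cur = cur->next)
        if (cur->data == val) return list;  /* value already exists */
    node_t *n = node_new(val, NULL);
    if (!n) return NULL;
    if (!list->head) {
        list->head = n;
    } else {
        node_t *tail = list->head;
        while (tail->next)
            tail = tail->next;
        tail->next = n;                     /* append at the tail */
    }
    list->size++;
    return list;
}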
// return a pointer to the node at position index in the linked list
node_t *list_nth(llist_t *the_list, uint32_t index);
Fixes to `list_nth`:
- Bound the index by `list->size - 1`: `if (idx > list->size - 1)`
- Changed `head` to `cur`: `if (idx == 0) return cur;`

A combined sketch follows.
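Putting the two fixes together, `list_nth` might look like this sketch (the bound is written as `idx >= list->size` so that an empty list does not wrap around in the unsigned subtraction):

node_t *list_nth(llist_t *list, uint32_t idx)
{
    /* idx >= size also guards the empty list, where size - 1 would
     * wrap around as an unsigned value */
    if (idx >= list->size) return NULL;
    node_t *cur = list->head;
    while (idx--)
        cur = cur->next;
    return cur; /* for idx == 0 this returns cur directly, not head */
}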
// print the contents of the entire linked list
void list_print(llist_t *the_list);
threadpool.h
typedef struct _task {
void (*func)(void *);
void *arg;
struct _task *next, *last;
} task_t;
Defines the task type: essentially the work function `*func` and its argument `*arg`. The `*next` and `*last` pointers that link a single task into the task queue are also defined here.
typedef struct {
task_t *head, *tail;
pthread_mutex_t mutex;
pthread_cond_t cond;
uint32_t size;
} tqueue_t;
The task queue type: it records the `*head` and `*tail` tasks, the queue size `size`, a `mutex`, and a condition variable `cond`.
typedef struct {
pthread_t *threads;
uint32_t count;
tqueue_t *queue;
} tpool_t;
The thread pool type: it holds a pointer `*threads` to all the pthreads, the number of threads `count`, and a pointer `*queue` to the task queue.
threadpool.c
// free the memory previously allocated for a single task
int task_free(task_t *the_task);
// initialize the task queue
int tqueue_init(tqueue_t *the_queue);
Sets up the memory for the `tqueue_t`.
// take a single task out of the task queue to dispatch to a thread
task_t *tqueue_pop(tqueue_t *the_queue);
Changed the pop side from the `tail` to the `head`:
task_t *tqueue_pop(tqueue_t *the_queue)
{
    task_t *ret;
    pthread_mutex_lock(&(the_queue->mutex));
    ret = the_queue->head;
    if (ret) {
        the_queue->head = ret->next; /* if it is NULL, then let it be */
        if (the_queue->head) {
            the_queue->head->last = NULL;
        } else {
            the_queue->tail = NULL;  /* queue is now empty */
        }
        the_queue->size--;
    }
    pthread_mutex_unlock(&(the_queue->mutex));
    return ret;
}
// return the current size of the task queue
uint32_t tqueue_size(tqueue_t *the_queue);
// add a new task to the task queue
int tqueue_push(tqueue_t *the_queue, task_t *task);
Changed the push side from the `head` to the `tail`:
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
    if (task == NULL) return -1;
    pthread_mutex_lock(&(the_queue->mutex));
    task->next = NULL;
    task->last = the_queue->tail;
    if (the_queue->tail)
        the_queue->tail->next = task;
    the_queue->tail = task;
    if (the_queue->size++ == 0)
        the_queue->head = task; /* queue was empty: the task is also the head */
    pthread_mutex_unlock(&(the_queue->mutex));
    return 0;
}
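A quick single-threaded check of the intended FIFO behaviour, assuming the declarations above (error handling elided):

#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

static void demo_fifo(void)
{
    tqueue_t q;
    tqueue_init(&q);
    for (int i = 0; i < 3; i++) {
        task_t *t = calloc(1, sizeof(task_t));
        t->arg = (void *) (intptr_t) i;
        tqueue_push(&q, t);               /* pushed at the tail */
    }
    for (int i = 0; i < 3; i++) {
        task_t *t = tqueue_pop(&q);       /* popped from the head */
        assert((intptr_t) t->arg == i);   /* comes out in submission order */
        free(t);
    }
    tqueue_free(&q);
}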
// free the allocated task queue memory
int tqueue_free(tqueue_t *the_queue);
Added a check for whether `the_queue` exists; otherwise return -1 and do not run the freeing code below, including `pthread_cond_destroy(&(the_queue->cond));`.

In the phonebook-concurrent assignment, following mbrossard/threadpool, the mutex is locked before destroying both of them. I am not sure why that is needed; what happens without it? Here the lock is not taken first. (A likely reason: POSIX makes destroying a locked mutex undefined, so taking the lock first is one way to be sure no other thread still holds it.)
// initialize the thread pool
int tpool_init(tpool_t *the_pool, uint32_t count, void *(*func)(void *));
// free the thread pool memory
int tpool_free(tpool_t *the_pool);
Added a check that `the_pool` exists, and a check that each `pthread_join()` succeeds:
int tpool_free(tpool_t *the_pool)
{
if(the_pool == NULL) return -1;
for (uint32_t i = 0; i < the_pool->count; ++i)
if(pthread_join(the_pool->threads[i], NULL) != 0) return -1;
free(the_pool->threads);
tqueue_free(the_pool->queue);
return 0;
}
Summary of threadpool issues
main.c
It took a long while, together with LanKuDot's notes, to understand the flow:
- `cut_func()`: recursively cuts the list into left and right halves.
- `merge_sort()`: recursively cuts into left and right lists and merges them back.

// dispatch the division task to the queue
void cut_func(void *data);
- The first thing `main()` does after `tpool_init()`.
- `main()` defines `max_cut = MIN(thread_count, data_count) - 1`.
- While `cut_count` has not reached `max_cut` and `list->size > 1`, the list is cut in half, and the left and right lists together with `cut_func()` are pushed onto the task queue as a task's `arg` and `func`.
- Otherwise it calls `merge_sort()` and hands the result to `merge()`.
- Renamed `_list` and `list` to `llist` and `rlist` for readability, and fixed the comments in which the left and right lists were swapped.

// merge lists across levels
void merge(void *data);
- If the merged result is not yet the full data set, check `tmp_list`: when it is empty, stash the list there; when it already holds a partner, push a new task onto the task queue with
  - `arg`: `merge_list(left, right)`
  - `func`: `merge()`
- Once the fully merged list is obtained, record it in `the_list` and print it with `list_print()`. A task whose `func` is NULL is then pushed onto the task queue. I don't quite understand the point of doing that. (Carol Chen) (Judging from `task_run()` below, the NULL-`func` task acts as a shutdown signal: each worker pushes it back before exiting, so every thread eventually sees it.)

A sketch of this control flow follows.
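A sketch of `merge()` reconstructed from the notes above; the real main.c may differ in details, and in particular the locking around `tmp_list` is omitted here:

/* sketch only: tmp_list, the_list, pool, and data_count are the globals
 * mentioned in the notes; the real code protects tmp_list with a mutex */
void merge(void *data)
{
    llist_t *list = (llist_t *) data;
    if (list->size < (uint32_t) data_count) { /* not the full data set yet */
        llist_t *partner = tmp_list;
        if (!partner) {
            tmp_list = list;                  /* stash and wait for a partner */
        } else {
            tmp_list = NULL;
            task_t *task = malloc(sizeof(task_t));
            task->func = merge;                    /* func: merge() */
            task->arg = merge_list(list, partner); /* arg: merged left/right */
            tqueue_push(pool->queue, task);
        }
    } else {                                  /* the fully merged list */
        the_list = list;
        list_print(the_list);
        task_t *task = malloc(sizeof(task_t));
        task->func = NULL;                    /* NULL func: shutdown signal */
        task->arg = NULL;
        tqueue_push(pool->queue, task);
    }
}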
//cut list to left & right and then merge back
llist_t *merge_sort(llist_t *list);
Recursively cuts `list` into left and right halves until `list->size < 2`, then returns the result merged back together by `merge_list()`.

// merge left & right list
llist_t *merge_list(llist_t *a, llist_t *b);

Merges the two lists `*a` and `*b`.

// grab one task from the task queue and execute it
static void *task_run(void *data);

Corresponds to `threadpool_thread()` in tpool.c: takes one task out of the task queue and executes it on the calling thread.
中int main(int argc, char const *argv[]);
- Reads in `thread_count` and `data_count`, and computes `max_cut`.
- Initializes `list`, `tpool`, and the other global variables.
- Pushes `cut_func()` onto the task queue as the first task.

Following Tempojiji's notes on designing an automated test, create `input_generator.c` as the random-number generator:
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
int MAX_SIZE = atoi(argv[1]);
FILE *fp = fopen("input","w+");
srand((long int)time(NULL));
for(int i = 0; i < MAX_SIZE; i++)
fprintf(fp, "%u\n", rand() / 10000);
fclose(fp);
}
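For example, `./input_generator 800` writes 800 random numbers, one per line, into a file named `input` in the current directory, which the benchmark code below then reads.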
In `main.c`, use `clock_gettime()` to measure the elapsed time, and add this section:
# if defined(BENCH)
FILE *fp = fopen("input","r");
long int data;
while((fscanf(fp, "%ld\n", &data)) != EOF){
e = list_add(e,data);
}
fclose(fp);
# endif
.
.
.
#if defined(BENCH)
fp = fopen("output","a+");
if(thread_count == 1) fprintf(fp, "%d,", data_count);
fprintf(fp, "%lf", cpu_time);
if(thread_count == 64) fprintf(fp, "\n");
else fprintf(fp,",");
fclose(fp);
printf("%d %d %lf\n", data_count, thread_count, cpu_time);
#endif
Add to the Makefile:

bench:
	for i in `seq 100 100 800`; do \
		./input_generator $$i; \
		for j in 1 2 4 8 16 32 64; do \
			./sort $$j $$i; \
		done; \
	done
plot: bench
gnuplot runtime.gp
`runtime.gp` is the gnuplot plotting script. From the plot, the larger `thread_num` is, the more the measured time fluctuates.

Not sure why a sort of more than 10,000 numbers never finishes; the run just sits there for a long time… (Carol Chen)
scalability
For the definition, see the Wikipedia entry on scalability. From the discussion on Stack Overflow:
Scalability is the ability of a program to scale. For example, if you can do something on a small database (say less than 1000 records), a program that is highly scalable would work well on a small set as well as working well on a large set (say millions, or billions of records).
It would have a linear growth of resource requirements. Look up Big-O notation for more details about how programs can require more computation the larger the data input gets. Something parabolic like Big-O(x^2) is far less efficient with large x inputs than something linear like Big-O(x).
LanKuDot's notes also summarize the scalability scripts; here are his points on `scalability1.sh`:
- The first argument is the `core` count to test; `shift` moves all arguments one position left (the original second argument becomes the first).
- It sources the `config` and `lock_exec` scripts.
- The next argument becomes `prog`, and the remaining arguments go into `params`.
- It runs the test for each `core` count and prints each result compared with the 1-core run.
- Finally it runs the `unlock_exec` script.

`scalability1.sh` and `scalability2.sh` differ in how many programs they can run at a time: one or two.
差在一次可執行的程式數為 1 個 或 2 個carol@carol-PC:~/concurrent-ll$ scripts/scalability1.sh all ./out/test-lock -i128
## Use 4 as the number of cores.
## If not correct, change it in scripts/config
#cores throughput %linear scalability
1 309985 100.00 1
2 186428 30.07 0.60
3 195184 20.99 0.63
4 2493 0.20 0.01
carol@carol-PC:~/concurrent-ll$ scripts/scalability2.sh all ./out/test-lock ./out/test-lockfree -i100
## Use 4 as the number of cores.
## If not correct, change it in scripts/config
# ./out/test-lock ./out/test-lockfree
#cores throughput %linear scalability throughput %linear scalability
1 310796 100.00 1 464000 100.00 1
2 185595 29.86 0.60 736644 79.38 1.59
3 192483 20.64 0.62 1033290 74.23 2.23
4 8355 0.67 0.03 1039714 56.02 2.24
What do the `-i128` and `-i100` arguments at the end mean? (Carol Chen)
How can I use this to test my own program? (Carol Chen)
(Judging from concurrent-ll's test program options, `-i` should set the initial number of elements in the list before the measurement starts, but this is worth confirming in the option parsing of `test-lock`/`test-lockfree`.)
The thread pool is then rewritten with a condition variable (guarded by the `MONITOR` macro) so that idle workers sleep instead of spinning on an empty queue. The modified `tqueue_pop()`:
task_t *tqueue_pop(tqueue_t *the_queue)
{
    task_t *ret;
#ifndef MONITOR
    /* with MONITOR, the caller (task_run) already holds the mutex */
    pthread_mutex_lock(&(the_queue->mutex));
#endif
    ret = the_queue->head;
    if (ret) {
        the_queue->head = ret->next; /* if it is NULL, then let it be */
        if (the_queue->head) {
            the_queue->head->last = NULL;
        } else {
            the_queue->tail = NULL;
        }
        the_queue->size--;
    }
#ifndef MONITOR
    pthread_mutex_unlock(&(the_queue->mutex));
#endif
    return ret;
}
The modified `tqueue_push()`, which now signals a sleeping worker after enqueueing:
int tqueue_push(tqueue_t *the_queue, task_t *task)
{
#ifndef MONITOR
    if (task == NULL) return -1;
#endif
    pthread_mutex_lock(&(the_queue->mutex));
    task->next = NULL;
    task->last = the_queue->tail;
    if (the_queue->tail)
        the_queue->tail->next = task;
    the_queue->tail = task;
    if (the_queue->size++ == 0)
        the_queue->head = task; /* queue was empty: the task is also the head */
#if defined(MONITOR)
    pthread_cond_signal(&(the_queue->cond)); /* wake one sleeping worker */
#endif
    pthread_mutex_unlock(&(the_queue->mutex));
    return 0;
}
The modified `tqueue_free()`:
int tqueue_free(tqueue_t *the_queue)
{
    if (the_queue == NULL) return -1;
    task_t *cur = the_queue->head;
    while (cur) {
        the_queue->head = the_queue->head->next;
        free(cur);
        cur = the_queue->head;
    }
#if defined(MONITOR)
    /* take and release the lock so no worker still holds the mutex:
     * destroying a locked mutex is undefined behavior */
    pthread_mutex_lock(&(the_queue->mutex));
    pthread_mutex_unlock(&(the_queue->mutex));
#endif
    pthread_mutex_destroy(&(the_queue->mutex));
    pthread_cond_destroy(&(the_queue->cond));
    return 0;
}
The modified `tpool_free()`:
int tpool_free(tpool_t *the_pool)
{
    if (the_pool == NULL) return -1;
#if defined(MONITOR)
    /* wake every sleeping worker so they can observe shutdown;
     * unlocking a mutex this thread does not own is undefined,
     * so no unlock is done here */
    pthread_cond_broadcast(&(the_pool->queue->cond));
#endif
    for (uint32_t i = 0; i < the_pool->count; ++i)
        if (pthread_join(the_pool->threads[i], NULL) != 0) return -1;
    free(the_pool->threads);
    tqueue_free(the_pool->queue);
    return 0;
}
The modified `task_run()`:
static void *task_run(void *data)
{
    (void) data;
    while (1) {
#if defined(MONITOR)
        pthread_mutex_lock(&(pool->queue->mutex));
        while (pool->queue->size == 0) {
            /* sleep until tqueue_push() signals that work is available */
            pthread_cond_wait(&(pool->queue->cond), &(pool->queue->mutex));
        }
#endif
        /* under MONITOR the mutex is still held here, so the pop is safe */
        task_t *_task = tqueue_pop(pool->queue);
#if defined(MONITOR)
        pthread_mutex_unlock(&(pool->queue->mutex));
#endif
        if (_task) {
            if (!_task->func) {
                /* NULL func is the shutdown signal: push it back so the
                 * other workers see it too, then exit */
                tqueue_push(pool->queue, _task);
                break;
            } else {
                _task->func(_task->arg); /* execute the task */
                free(_task);
            }
        }
    }
    pthread_exit(NULL);
}
Much faster than before: the highest point on the y-axis is only 0.014 seconds.
Group 20's notes change the queue into a ring structure and compare the resulting performance.