contributed by <vic85821>
先來搞懂posix thread,了解thread 同步的方法機制
pthread_cond_t
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;
pthread_cond_init()
pthread_cond_init (&count_threshold_cv, NULL);
pthread_mutex_lock
, pthread_mutex_unlock
來達到synchronize
pthread_mutex_lock
:鎖定mutex,避免其他thread同時取得
pthread_mutex_unlock
:釋放mutex

為了讓各個thread可以load-balance,透過thread pool來實作,同樣的,先來弄清楚thread pool的運作
thread pool中的job/task,讓thread知道該做什麼事
/* One unit of work held in the pool's queue: a callback together with the
 * single argument a worker passes to it. */
typedef struct {
/* Routine the worker thread invokes to carry out the task. */
void (*function)(void *);
/* Opaque pointer passed unchanged to function when it is called. */
void *argument;
} threadpool_task_t;
/* State of a thread pool: its synchronization objects, its task queue
 * (addressed by head/tail indices) and bookkeeping counters. */
struct threadpool_t {
/* Mutex a worker holds while it inspects or updates the queue state. */
pthread_mutex_t lock;
/* Condition variable workers wait on while the queue is empty and no
 * shutdown has been requested. */
pthread_cond_t notify;
/* Presumably the array of worker thread handles - not used in the visible
 * worker loop; confirm against the pool creation code. */
pthread_t *threads;
/* Task storage; workers read the entry at index head. */
threadpool_task_t *queue;
/* Presumably the number of entries in threads - TODO confirm. */
int thread_count;
/* Number of slots in queue; head wraps back to 0 when it reaches this. */
int queue_size;
/* Index of the next task a worker will take. */
int head;
/* Presumably the index where the next task is stored - the producer side
 * is not shown here; confirm. */
int tail;
/* Number of tasks currently waiting; decremented each time a worker takes one. */
int count;
/* Zero while running; workers compare it against immediate_shutdown and
 * graceful_shutdown to decide when to leave their loop. */
int shutdown;
/* Decremented by each worker just before it exits. */
int started;
};
/* Worker loop (the enclosing function header is outside this excerpt):
 * repeatedly take one task from the queue under the lock, release the lock,
 * and run the task. The loop ends when a shutdown condition is met. */
for(;;) {
/* Lock must be taken to wait on conditional variable */
pthread_mutex_lock(&(pool->lock));
/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the
lock. */
while((pool->count == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
/* Leave at once on immediate shutdown; on graceful shutdown leave only
 * after the queue has been emptied. The break happens with the lock
 * still held. */
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->count == 0))) {
break;
}
/* Grab our task */
task.function = pool->queue[pool->head].function;
task.argument = pool->queue[pool->head].argument;
/* Advance head, wrapping to slot 0 at the end of the queue array. */
pool->head += 1;
pool->head =
(pool->head == pool->queue_size) ? 0 : pool->head;
pool->count -= 1;
/* Unlock */
pthread_mutex_unlock(&(pool->lock));
/* Get to work */
/* The task runs outside the lock, so other workers can dequeue meanwhile. */
(*(task.function))(task.argument);
}
/* Reached only through the break above, so the lock is still held here:
 * update the counter under it, then release it. */
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
/* Placed after pthread_exit(); kept so the function has a return statement. */
return(NULL);
}
Lock對效能的影響顯著,因此不希望thread是透過lock來實作。原本的condition variable功能可以透過semaphore來取代。改進方向大致上分為這些
/* Template for waiting on a condition with signals instead of a lock.
 * SIGXX and CONDITION are placeholders for the signal of interest and the
 * predicate that the signal's handler is expected to make true. */
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigaddset(&newmask, SIGXX);
/* Block SIGXX so it cannot be delivered between testing CONDITION and
 * going to sleep; the previous mask is saved in oldmask. */
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
while (!CONDITION) {
    /* Atomically install the empty mask and sleep until a signal handler
     * has run; on return the blocking mask is back in effect. */
    sigsuspend(&zeromask);
}
/* Restore the signal mask that was in effect before this sequence. */
sigprocmask(SIG_SETMASK, &oldmask, NULL);
是起步慢嗎? jserv
需要#include <signal.h>
sigemptyset()
: initializes the signal set given by set to empty, with all signals excluded from the set.
sigfillset()
: initializes set to full, including all signals.
sigaddset() and sigdelset()
: add and delete, respectively, signal signum from set.
int sigprocmask(int how, const sigset_t *set, sigset_t *oldset);
: used to fetch and/or change the signal mask of the calling thread.
int sigsuspend(const sigset_t *mask);
: wait for a signal.

假設某段鐵路是單線的,因此,一次只允許一列火車通過;semaphore用於協調同步通過該軌道的火車,火車在進入單一軌道之前必須等待信號燈變為允許通行的狀態,火車進入軌道後,必須改變信號燈狀態,防止其他火車進入該軌道;火車離開這段軌道時,必須再次更改信號燈的狀態,以便允許其他火車進入軌道。
Semaphore可以用於紀錄某一特定資源剩下多少數目可使用;process或thread透過semaphore可以安全的使用共享資源,若特定資源已使用完時,會需要等待資源被釋放。
Semaphore包含兩種:binary semaphore(二進位信號)和counting semaphore(計數信號)。
binary semaphore
counting semaphore
semaphore有兩種操作方式,為atomic operation(all or nothing)
sem_post()
執行V operation,
sem_wait()
執行P operation。
#define _POSIX_SEM_VALUE_MAX 32767
/* POSIX unnamed-semaphore API from <semaphore.h>. */
/* Initialize *sem with the starting count value; per POSIX, pshared selects
 * sharing between threads of one process (0) or between processes (non-zero). */
int sem_init(sem_t *sem, int pshared, unsigned int value);
/* V operation: increment the semaphore. */
int sem_post(sem_t *sem);
/* P operation: decrement the semaphore, waiting if it cannot be decremented. */
int sem_wait(sem_t *sem);
/* Per POSIX, like sem_wait() but returns an error instead of waiting. */
int sem_trywait(sem_t *sem);
/* Destroy a semaphore previously set up with sem_init(). */
int sem_destroy(sem_t *sem);
recursive bubble sort
/* Node of a singly linked list of integers. */
typedef struct list {
    int data;
    struct list *next;
} LIST;

/* Declared before sort() so the call there sees the real prototype rather
 * than an implicit int-returning declaration. */
LIST *move(LIST *x);

/*
 * Sort a singly linked list into ascending order of data.
 *
 * The tail of the list is sorted recursively first; the head node is then
 * inserted into its place within that sorted tail. Nodes are relinked in
 * place and nothing is allocated.
 *
 * start: first node of the list, or NULL for an empty list.
 * Returns the first node of the sorted list (NULL when start is NULL).
 *
 * Note: recursion depth equals the length of the list.
 */
LIST *sort(LIST *start)
{
    if (start == NULL)
        return NULL;

    start->next = sort(start->next);

    /* The head only needs relocating when it is larger than its successor. */
    if (start->next != NULL && start->data > start->next->data)
        start = move(start);

    return start;
}

/*
 * Relocate the head node x into the sorted list that follows it.
 *
 * x: head node; x->next must be non-NULL and already sorted, and the caller
 *    is expected to have checked x->data > x->next->data.
 * Returns the new head of the list, which is the node that followed x.
 */
LIST *move(LIST *x)
{
    LIST *n, *p, *ret;

    p = x;
    n = x->next;
    ret = n;

    /* Walk past every node smaller than x; p trails one node behind n. */
    while (n != NULL && x->data > n->data) {
        p = n;
        n = n->next;
    }

    /* we now move the top item between p and n */
    p->next = x;
    x->next = n;

    return ret;
}
透過一層一層地遞迴呼叫sort,從linked list的最末端開始排序,直到整個list都排序完成
sudo apt-get install mutrace
e.g. : (for i in {1..8}; do echo $RANDOM; done) | mutrace ./sort 4 8
mutrace: Showing statistics for process sort (pid 6081).
mutrace: 3 mutexes used.
Mutex #0 (0x0x6ad9b0) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fa70f9ea4b2]
./sort(tqueue_init+0x38) [0x401277]
./sort(tpool_init+0x6a) [0x4014cc]
./sort(main+0x161) [0x401c74]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fa70f421830]
Mutex #1 (0x0x7fa70cff0860) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_lock+0x49) [0x7fa70f9ea6b9]
/lib/x86_64-linux-gnu/libgcc_s.so.1(_Unwind_Find_FDE+0x2c) [0x7fa70cdecfec]
[(nil)]
Mutex #2 (0x0x603120) first referenced by:
/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fa70f9ea4b2]
./sort(main+0x11d) [0x401c30]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fa70f421830]
mutrace: Showing 3 most contended mutexes:
Mutex # Locked Changed Cont. tot.Time[ms] avg.Time[ms] max.Time[ms] Flags
0 76 14 7 0.023 0.000 0.002 Mx.--.
1 20 10 4 0.008 0.000 0.002 M-.--.
2 13 4 0 0.004 0.000 0.000 M-.--.
||||||
/|||||
Object: M = Mutex, W = RWLock /||||
State: x = dead, ! = inconsistent /|||
Use: R = used in realtime thread /||
Mutex Type: r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
Mutex Protocol: i = INHERIT, p = PROTECT /
RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC
mutrace: Note that the flags column R is only valid in --track-rt mode!
mutrace: Total runtime is 1.006 ms.
e.g.
val_t改為char array(大小是根據phonebook的MAX_LAST_NAME_LEN)
/* String value type: a char array of MAX_LAST_NAME_LEN elements
 * (MAX_LAST_NAME_LEN is defined outside this excerpt). */
typedef char val_t[MAX_LAST_NAME_LEN];
merge_list()
/* Merge step: while both input lists still hold nodes, detach the head node
 * of whichever list compares smaller and append it to _list. */
while (a->size && b->size) {
    /* a is chosen only when its head string sorts strictly before b's;
     * equal strings are taken from b. */
    int take_a = strcmp(a->head->data, b->head->data) < 0;
    llist_t *small = take_a ? a : b;

    if (!current) {
        /* First node merged: it becomes the head of the result. */
        _list->head = small->head;
        current = _list->head;
    } else {
        current->next = small->head;
        current = current->next;
    }

    /* Advance the source list before cutting the moved node loose. */
    small->head = small->head->next;
    small->size -= 1;
    _list->size += 1;
    current->next = NULL;
}
# Compiler flags for the correctness-check build; -DTEST defines the TEST
# macro when compiling the sources.
CHECK_CFLAGS = -std=gnu99 -Wall -g -pthread -DTEST
# Build the check_right binary: compile list.c, threadpool.c and main.c with
# CHECK_CFLAGS (writing dependency files via -MMD -MF), then link.
check_right :
$(CC) $(CHECK_CFLAGS) -o list.o -MMD -MF .list.o.d -c list.c
$(CC) $(CHECK_CFLAGS) -o threadpool.o -MMD -MF .threadpool.o.d -c threadpool.c
$(CC) $(CHECK_CFLAGS) -o main.o -MMD -MF .main.o.d -c main.c
# NOTE(review): the link step uses $(CFLAGS), not $(CHECK_CFLAGS); CFLAGS and
# OBJS are defined outside this excerpt - confirm this is intended.
$(CC) $(CFLAGS) -o $@ $(OBJS) -rdynamic
# Drop adjacent duplicate lines from words.txt and shuffle the result into input.txt.
uniq words.txt | sort -R > input.txt
# Run the check build on the shuffled input; the two numbers are presumably
# the thread count and the number of input lines - confirm against main.c.
./check_right 4 349900 < input.txt > output.txt
# No output from diff means the sorted result matches the original word list.
diff words.txt output.txt
diff的結果兩個檔案內容一致,可以知道sort的結果正確
mergesort
thread pool