一圖勝千語:
將函式呼叫從原本的面貌: [ source ]
轉換成以下:
C 程式實作 coroutines 的手法相當多,像是:
取自Programming Parallel Machines Spring 2019第7周教材
A thread can possess an independent flow of control and be schedulable because it maintains its own:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
void printMsg(char *msg)
{
int *status = malloc(sizeof(int));
*status = 1;
printf("%s\n", msg);
pthread_exit(status);
}
int main(int argc, char **argv)
{
pthread_t thrdID;
int *status = malloc(sizeof(int));
printf("creating a new thread\n");
pthread_create(&thrdID, NULL, (void *) printMsg, argv[1]);
printf("created thread %lu\n", thrdID);
pthread_join(thrdID, (void**) &status);
printf("Thread %lu exited with status %d\n", thrdID, *status);
return 0;
}
執行結果
$ ./a.out "Hello world!"
creating a new thread
created thread 219465472
Hello world!
Thread 219465472 exited with status 1
延伸閱讀: POSIX Threads Programming
3 basic synchronization primitives
取自 Ching-Kuang Shene (冼鏡光) 教授的教材:Part IV Other Systems: IIIPthreads: A Brief Review
「冼」音同「顯」
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
pthread_mutex_trylock()
returns EBUSY
, the lock is already locked. Otherwise, the calling thread becomes the owner of this lock.pthread_mutexattr_settype()
, the type of a mutex can be set to allow recursive locking or report deadlock if the owner locks againpthread_mutex_t bufLock;
void producer(char* buf1) {
char* buf = (char*) buf1;
for(;;) {
while(count == MAX_SIZE);
pthread_mutex_lock(&bufLock);
buf[count] = (char) ((int)('a')+count);
count++;
printf("%d ", count);
pthread_mutex_unlock(&bufLock);
}
}
void consumer(void* buf1) {
char* buf = (char*) buf1;
for(;;) {
while(count == 0);
pthread_mutex_lock(&bufLock);
count--;
printf("%d ", count);
pthread_mutex_unlock(&bufLock);
}
}
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); // all threads waiting on a condition need to be woken up
wait
call should occur under the protection of a mutexpthread_mutex_t *lock;
pthread_cond_t *notFull, *notEmpty;
void consumer(char* buf) {
for(;;) {
pthread_mutex_lock(lock);
while(count == 0)
pthread_cond_wait(notEmpty, lock);
useChar(buf[count-1]);
count--;
pthread_cond_signal(notFull);
pthread_mutex_unlock(lock);
}
}
sem_t semaphore;
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
sem_t empty, full;
void producer(char* buf) {
int in = 0;
for(;;) {
sem_wait(&empty);
buf[in] = getChar();
in = (in + 1) % MAX_SIZE;
sem_post(&full);
}
}
void consumer(char* buf) {
int out = 0;
for(;;) {
sem_wait(&full);
useChar(buf[out]);
out = (out + 1) % MAX_SIZE;
sem_post(&empty);
}
}
2016q1:Homework2 提到 raytracing 程式,UCLA Computer Science 35L, Winter 2016 Software Construction 課程有個作業值得參考:S35L_Assign8_Multithreading
編譯與測試
$ git clone https://github.com/maxwyb/CS35L_Assign8_Multithreading.git raytracing-threads
$ cd raytracing-threads
$ make clean all
$ ./srt 4 > out.ppm
$ diff -u out.ppm baseline.ppm
預期會得到下圖:
將 srt
後面的數字換成 1, 2, 4, 8 來測試執行時間
[ main.c ]
#include <pthread.h>
pthread_t* threadID = malloc(nthreads * sizeof(pthread_t));
int res = pthread_create(&threadID[t], 0, pixelProcessing, (void *)&intervals[t]);
int res = pthread_join(threadID[t], &retVal);
Condition variables must be declared with type pthread_cond_t, and must be initialized before they can be used. There are two ways to initialize a condition variable:
Statically, when it is declared. For example:
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;
Dynamically, with the pthread_cond_init() routine. The ID of the created condition variable is returned to the calling thread through the condition parameter. This method permits setting condition variable object attributes, attr.
Introduction to Computer Systems 對應的教科書 CS:APP Concurrent Programming, Page 40-56
CS:APP Synchronization: Advanced, Page 40-49
計數器的程式範例:
// 全域變數
volatile long cnt = 0; // 計數器
void *thread(void *vargp) {
long niters = *((long *)vargp);
for (long i = 0; i < niters; i++)
cnt++;
return NULL;
}
int main(int argc, char **argv) {
long niters;
pthread_t tid1, tid2;
niters = atoi(argv[1]);
pthread_create(&tid1, NULL, thread, &niters);
pthread_create(&tid2, NULL, thread, &niters);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
if (cnt != (2 * niters))
printf("Wrong! cnt=%ld\n", cnt);
else
printf("Correct! cnt=%ld\n", cnt);
exit(0);
}
運行的結果每次不一樣:
$ ./badcnt 10000
Correct! cnt=20000
$ ./badcnt 10000
Wrong! cnt=10458
把 cnt
變數操作的部分抽出來看:
執行緒函式的迴圈程式碼:
for (long i = 0; i < niters; i++)
cnt++;
對應的組合語言程式碼:
# 以下四句為 Head 部分,記為 H
movq (%rdi), %rcx
testq %rcx, %rcx
jle .L2
movl $0, %eax
.L3:
movq cnt(%rip), %rdx # 讀取 cnt,記為 L
addq $1, %rdx # 更新 cnt,記為 U
movq %rdx, cnt(%rip) # 寫入 cnt,記為 S
# 以下為 Tail 部分,記為 T
addq $1, %rax
cmpq %rcx, %rax
jne .L3
.L2:
cnt 使用 volatile
關鍵字聲明,意思是避免編譯器產生的程式碼中,透過暫存器來保存數值,無論是讀取還是寫入,都在主記憶體操作。
細部的步驟分成 5 步:H
-> L
-> U
-> S
-> T
,尤其要注意 LUS 這三個操作,這三個操作必須在一次執行中完成,一旦次序打亂,就會出現問題,不同執行緒拿到的值就不一定是最新的。
也就是說該函式的正確執行和指令的執行順序有關
將 n 個 concurrent 執行緒的執行模型描述為 n 維空間中的軌跡線,每條軸 k 對應 k 的進度。下圖展示對於 H1, L1, U1, H2, L2, S1, T1, U2, S2, T2 的軌跡線的進度圖:
對於執行緒編號 i
,操作共享變數 cnt
內含值的指令 (,Li,Ui,Si) 構成了一個 CS,每個 CS 不應該和其他執行緒的 CS 交替執行。換言之,需要對共享變數進行互斥的存取:
mutual exclusion (互斥) 手段的選擇,不是根據 CS 的大小,而是根據 CS 的性質,以及有哪些部分的程式碼,也就是,仰賴於核心內部的執行路徑。
semaphore 和 spinlock 屬於不同層次的互斥手段,前者的實現仰賴於後者,可類比於 HTTP 和 TCP/IP 的關係,儘管都算是網路通訊協定,但層次截然不同:
spinlock 的本意和目的就是保證資料修改的 atomics,因此也沒有理由在 spinlock 鎖住的 CS 中停留。
案例: Mutex and Semaphore (你今天用對 mutex 了嗎?)
優先權: Task1(H) > Task2(M) > Task 3(L)
(1)
Task3 正在執行,而 Task1 和 Task2 正等待特定事件發生,例如 timer interrupt(2)
Task3 為了存取共用資源,必須先獲取 lock(3)
Task3 使用共用資源 (灰色區域)。(4)
Task1 等待的事件發生 (可以是 "delay n ticks",此時 delay 結束),作業系統核心暫停 Task3 改執行 Task1,因為 Task1 有更高的優先權(5)
(6)
Task1 執行一段時間直到試圖存取與 Task3 共用的資源,但 lock 已被 Task3 取走了,Task1 只能等待 Task3 釋出 lock(7)
(8)
Task3 恢復執行,直到 Task2 因特定事件被喚醒,如上述 (4)
的 Task3(9)
(10)
Task2 執行結束後,將 CPU 資源讓給 Task3(11)
(12)
Task3 用完共享資源後釋放 lock。作業系統核心知道尚有個更高優先權的任務 (即 Task1) 正在等待此 lock,執行 context switch 切換到 Task1(13)
Task1 取得 lock,開始存取共用資源。Task1 原本該有最高優先權,但現在卻降到 Task3 的等級,於是反到是 Task~2 和 Task3 有較高的優先權,優先權順序跟預期不同,這就是 Priority Inversion(優先權反轉)。
priority inheritance (PI) 是其中一種 Priority Inversion 解法:
針對上圖解析
(1)
(2)
同上例,Task3 取得 lock(3)
(4)
(5)
Task3 存取資源時被 Task 1 搶佔(6)
Task1 試圖取得 lock。作業系統核心發現 Task3 持有 lock,但 Task3 優先權卻低於 Task1,於是作業系統核心將 Task3 優先權提升到與 Task1 的等級(7)
作業系統核心讓 Task3 恢復執行,Task1 則繼續等待(8)
Task3 用畢資源,釋放 lock,作業系統核心將 Task3 恢復成原本的優先權,恢復 Task1 的執行(9)
(10)
Task1 用畢資源,釋放 lock(11)
Task1 釋出 CPU,Task2 取得 CPU 控制權。在這個解法中,Task2 不會導致 priority inversionMutex vs. Semaphores – Part 1: Semaphores
Mutex vs. Semaphores – Part 2: The Mutex
Mutex vs. Semaphores – Part 3: Mutual Exclusion Problems
Mutexes and Semaphores Demystified