TODO: 〈並行和多執行緒程式設計〉系列講座並紀錄問題和提出改進
多工處理
若在執行工作期間,該任務發生錯誤,是否就會導致整個程式沒法中止?
Multiprogramming 多重程式 | Multitasking 多工 | Multithreading 多執行緒 | Multiprocessing 多重處理 | |
---|---|---|---|---|
定義 | 在單一 CPU 上執行多個程式 | 在單一 CPU 上執行多個工作 | 在單一工作上運行多個執行緒 | 在多個 CPU 上運行多個行程 |
共享資源 | 資源(CPU、記憶體)在程式之間共享 | 資源(CPU、記憶體)在工作之間共享 | 資源(CPU、記憶體)在執行緒之間共享 | 每個行程都有自己一組的資源(CPU、記憶體) |
排程 | 使用循環或基於優先權的排程為程式分配 CPU 時間 | 使用基於優先權或時間分割的調度為工作分配 CPU 時間 | 使用基於優先權或時間分割的調度為執行緒分配 CPU 時間 | 每個行程可以有自己的排程演算法 |
記憶體管理 | 每個程式都有自己的記憶體空間 | 每個工作都有自己的記憶體空間 | 執行緒在工作內共享記憶體空間 | 每個行程都有自己的記憶體空間 |
工作切換 | 需要工作切換才能在程式之間切換 | 需要工作切換才能在工作之間切換 | 需要工作切換才能在執行緒之間切換 | 需要工作切換才能在行程之間切換 |
Inter-Process Communication (IPC) | 使用訊息傳遞或共享記憶體進行 IPC | 使用訊息傳遞或共享記憶體進行 IPC | 使用 IPC 的執行緒同步機制(例如 lock、semaphores) | 使用 IPC 的進程間通訊機制(例如 pipes、sockets) |
不對等 (heterogeneous) 的執行單元,代表的是什麼意思?為何在系統中會有不對等的情況?
並行程式設計對單一 CPU 來說,同一時間只執行一個任務,但利用快速切換的方式,看似一次在執行多的任務。
這樣快速切換的方法,在單一 CPU 上是否會付出更多的成本,在速度上與循序式相比是否真的能更快?
task/thread
在多個 CPU 的環境中,並行處理的具體作法即是將一個循序執行的程式打散在個別 CPU 中,而每一 CPU 所執行的部份稱為一個執行緒。為了達成某個目標,可能需要執行多個任務,而每個任務被分割出多個執行緒(即多工多執行緒系統(multi-tasking multi-threading systems))。
工作切換(context switch)
多工作業系統核心其中一項關鍵功能是,切換工作時必須保證被停止的工作可在未來繼續執行。
當工作切換發生時,系統必須記錄切換前的下一次執行地址,當工作切換回復時, CPU 將繼續執行切換前的動作與狀態,以保證該工作擁有的資料區不受污染或破壞 (corruption)。
排程器(Scheduler)
用作決定 CPU 下一個要執行的任務。
經典的演算法有:
搶佔式與非搶佔式核心
差別在於工作本身對 CPU 使用權的交出是強制達成或是自願性的。在非搶佔式的作業系統中,各工作的程式碼中必須包含交出 CPU 使用權的動作,為達並行的需求,該動作的頻率必須夠高,否則會讓使用者感受到明顯的等待。
須將工作進行時間分割,才能落實將優先度較高的工作進行搶占。
在設計上,搶佔式的作法最大優點為反應快速,但也大幅增加實作難度,同時也必須注意程式碼的再進入性 (reentrancy) 與保護共用資料區等。
經典的 Fork-Join 模型
分成三部分:
Concurrency(並行) vs. Parallelism (平行)
向量內積可以使用迴圈的方式來完成運算(沒有 Parallelism ),也可以使用平行的方式來同時執行每個相對應位置的乘法。因此平行化只是一種選擇,並非一定。
如何確保並行執行結果正確
探究排程器
$ cat /proc/sys/kernel/threads-max
63704
# Threads in linux are light weight processes (LWP).
$ cat /proc/sys/kernel/pid_max
131072
# Maximum number of VMAs a process can own.
$ cat /proc/sys/vm/max_map_count
65530
coroutine
函式呼叫,主程式呼叫到 B(),結束時回到主程式 A()。
coroutine 還是函式呼叫的概念,但不用回到 A(),順序完全取決於期待。
coroutine 實作
coro : 一百多行的 code 實現簡單的協同是多工,使用 setjmp
與 longjmp
來切換任務。
neco
課程影片提到
buffered I/O 解決 coroutine 中間狀態造成的事件 ?
我不了解 buffer 在 coroutine 中扮演的角色,及做了什麼?
混合使用者和核心層級執行緒的模式
user space 與 kernel space 的數量不一樣,user space 必須自己察覺到這件事 (M : N)
何時會不適用 1 : 1 的機制 ex: 網頁伺服器
在網頁伺服器中,M : N 機制的 user space 先將 Thread D 、 Thread C 進行排序,相較以往 1 : 1 需要大量的執行緒與 lock 來確保資源一致。
為何不能將所有事情交給 kernel
只要有一個系統單元被系統呼叫或 lock 所拖延,那就無法實現到低延遲。
futex
thread pool
應用 coroutine 到事件驅動
目的 : coroutine 讓切換成本很小,趕緊把任務完成
aio_read
去幫你監控任務是否完成,有個地方要注意, aio_read
要與主程式在不同的 CPU 上,不然可能會發生衝突。問題
在 I/O Multiplexing Model 有提到等到完成會去通知資料已完成,但為何 Blocking I\O Model 中不能實現在通知完成以前去執行其他的工作,必須使用到 Nonblocking 的作法一直去檢查是否完成 ?
在並行程式設計中,常用 lock 避免多執行續的情況下修改同一資料,造成 race condition 情況。
假設兩執行緒各自將全域變數的值 +1
Thread 1 | Thread 2 | value | |
---|---|---|---|
0 | |||
read | <- | 0 | |
increase | 0 | ||
write back | -> | 1 | |
read | <- | 1 | |
increase | 1 | ||
write back | -> | 2 |
若沒有使用 lock 或是同步機制,可能遇到下面問題
Thread 1 | Thread 2 | value | |
---|---|---|---|
0 | |||
read | <- | 0 | |
read | <- | ||
increase | 0 | ||
increase | 0 | ||
write back | -> | 1 | |
write back | -> | 1 |
因此結果並非預期的 2 而是 1。
阻塞式同步機制
當多執行緒或行程會用到共享資源時,需要阻塞 (blocking) 其他執行緒來讓某執行緒能夠符合預期的執行。
缺點:
priority inversion (優先權反轉)
考慮三種優先級的任務 : 高優先級 H、中優先級 M (不需要共享資源)、 低優先級 L
在多執行緒的情況下,H 被迫等待 L 釋放鎖,而 L 又會被 M 給搶佔,導致 L 無法及時釋放共享資源,進而導致高優先級的 H 無法即時執行。
若不希望因為單一執行緒而拖延到整個進程,就必須考慮到阻塞式同步機制以外的方案。
補充:
在 〈POSIX Thread〉 中有舉例優先權反轉的問題及解決方法
斜線面表示使用到共用資源,因此被鎖住。
上圖中可以看到當 Task 1(H) 高優先權任務進來後,因使用到共用資源,但 Task 3 (L) 還沒有釋放鎖,必須先回到 Task 3 (L) 直到鎖被釋放,此時 Task 2 (M) 可能因特定事件被喚醒( Task 2 (M) 並不會用到共用資源),因此改為執行 Task 2 (M),等 Task 2 (M) 結束時又會回到 Task 3 (L) 直到鎖釋放,才會執行 Task 1 (H)。
上圖為 priority inheritance (PI) 是其中一種 Priority Inversion 解法。其解法為將 Task 3 (L) 拿到鎖後的優先權變成跟 Task 1 (H) 一樣高,這樣才不會被 Task 2 (M) 搶佔,符合 H -> M -> L 的優先權順序。
Atomic 操作:非阻塞式同步的基石
在 c 語言實現 compare-and-swap (CAS)
int CAS(int *reg, int old, int new)
{
ATOMIC();
int tmp = *reg;
if(tmp == old)
*reg = new;
END_ATOMIC();
return tmp;
}
在多執行緒下,必須使用 atomic 操作確保不會被中斷。
atomic 指令是否會像 mutex 一樣有什麼成本 ?
mutex 阻塞其他執行緒來確保資料的一致性,atomic 指令為何能夠達到取代 mutex 的效果?
TODO:測試 CAS 函式操作
多核處理器之間的 cache coherence
在同一時間的同一個變數在不同的 CPU 可能會有不同的值。
舉例 : 當 CPU0 剛修改記憶體某處的資料,最新的數值是先寫入到 CPU0 的 cache,等待 cache line 被標註為 invalidate,才會寫入到主記憶體,若此時另一個 CPU1 打算存取主記憶體中同一位置資料, CPU1 將存取到主記憶體中舊的資料。因此我們需要一個機制來確保 cache 在不同 CPU 之間的一致性。
在設計的概念上,使用 cache 時,確保個別 CPU 看到一致的資料。而非讓 cache 與 memory 保持同步。維護這點的協定統稱為 cache-coherence protocol。
實作 cache-coherence protocol 時,設計者需要定義 CPU 對自己 cache 的請求,及 CPU 對其他 CPU 的 cache 的請求,定義如下:
MESI protocol
從 MESI protocol 中了解 CPU 之間的資料一致的關係
M | E | S | I | |
---|---|---|---|---|
M |
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
E |
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
S |
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
I |
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
Image Not Showing
Possible Reasons
|
舉例說明 :
Exclusive
Shared
Invalid
,因為其資料已過時。Shared
在實作上用 3 個 bit 來表示 cacheline 的狀態
Valid bit | Dirty bit | Shared bit | |
---|---|---|---|
Modified | 1 | 1 | 0 |
Exclusive | 1 | 0 | 0 |
Shared | 1 | 0 | 1 |
Invalid | 0 | X | X |
MOESI protocol
為避免將資料寫回主記憶體, MOESI protocol 加入 Owner ,並略為修改 Shared 狀態
Valid bit | Dirty bit | Shared bit | |
---|---|---|---|
Modified | 1 | 1 | 0 |
Owner | 1 | 1 | 1 |
Exclusive | 1 | 0 | 0 |
Shared | 1 | 0 | 1 |
Invalid | 0 | X | X |
舉例:
延續上個例子且略為修改
Shared
,而 CPU0 上的 cacheline 狀態則會從 Modified
變為 Owner
。Owner
被設定成 Modified
,他必須發 Probe Write 給 CPU1,去看 CPU1 的 cache 中是否含有 Block 0。接著 CPU1 會收到 Probe write hit,因為其 cache 中有 Block 0 的資料。此時,CPU1 含有 Block 0 的 cacheline 也會被設定成 Invalid
,因為其資料已過時。在 MESI 協議中,當一個 CPU 修改資料後,如果其他處理器需要讀取該資料,則修改過的資料必須先寫回主記憶體,然後再提供給請求者。而在 MOESI 協議中,Owner 狀態允許資料持有者直接提供資料給請求者,而不需要寫回主記憶體,這樣可以減少主記憶體的存取次數,提高系統性能。
false sharing
圖中有兩執行緒,當 CPU0 不斷的改變變數 X
, CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能。
如何做到抑制 false sharing
Memory Ordering
在多核環境下,對記憶體操作的順序進行重新排列的技術。這種技術的目的是提高性能和效率,但是可能會導致一些複雜性,特別是在並行程序設計。
討論 cache coherence 時,MESI 與 MOESI 皆須等待另個 CPU 做出回應才能夠確保資料的一致性,但付出的代價是寫入 cache 的速度較慢,讓 CPU 閒置。
假設 CPU0 打算寫入資料到位置 X,CPU1 的 cache 有 X 的內含值。存取延遲 (stall) 的原因如下:
減少 CPU 閒置方法:
在範例中我並不能理解 store buffer 是如何縮短 CPU 的閒置時間?
TODO: 硬體實作策略
TODO: 軟體觀點
TODO: 易於誤用 volatile
關鍵字
TODO: 處理器架構和其 Memory Order
創造執行緒 pthread_create
建立一個新執行緒並使其可執行。
pthread_create(thread, attr, start_routine, arg)
這裡的輸入參數需要注意:
NULL
表示為預設屬性start_routine
的單一參數。這個函數必須接受一個 void *
類型的參數,並返回一個 void *
類型的結果。。如果不傳遞任何參數,則可以使用 NULL
。中止執行緒與 pthread_exit(status)
執行緒可以通過多種方式終止:
pthread_exit
子例程,不論其工作是否完成pthread_cancel
例程取消exec()
或 exit()
main()
先完成,無需 pthread_exit
明確呼叫自身pthread_exit
pthread_exit
終止自己,並返回結果給其他執行緒。pthread_exit
pthread_exit
可以使主執行緒結束執行,而不會終止整個行程,允許其他執行緒繼續運行直到它們自己終止。實作:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NUM_THREAD 4
void *print(void *thread_id)
{
long tid;
tid = (long)thread_id;
printf("Hello World!, By thread #%ld\n", tid);
pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
// Create NUM_THREADS threads
pthread_t threads[NUM_THREAD];
// Receive the return value of the pthread_create function
int rc;
long t;
for(t = 0; t < NUM_THREAD; t++){
printf("Create thread %ld\n",t);
rc = pthread_create(&threads[t], NULL, print, (void *)t);
// If pthread_create successfully creates a thread, the return value rc will be 0.
if (rc){
printf("ERROR\n");
exit(-1);
}
}
// Prevent the whole process from terminating due to main() ending first.
pthread_exit(NULL);
}
結果:
Create thread 0
Create thread 1
Create thread 2
Hello World!, By thread #0
Create thread 3
Hello World!, By thread #1
Hello World!, By thread #2
Hello World!, By thread #3
連接和分離線程
join 是實現執行緒之間"同步"的一種方法
attr
的參數pthread_attr_t
聲明資料類型的 pthread 屬性變數pthread_attr_init()
pthread_attr_setdetachstate()
pthread_attr_destroy()
若明確知道一執行緒永遠不需與另一個執行緒連接請考慮以分離狀態建立它,因為這可能會減少開銷。
我該從何得知預設屬性為何?
例如,在上個範例中 pthread_create(thread, attr, start_routine, arg)
attr 設為 NULL 時即為預設屬性,但我並不知道預設屬性是?
實作可加入的執行緒:
參考 Joining and Detaching Threads
將上個範例改寫成可加入執行緒的情況
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NUM_THREADS 3
void *print(void *thread_id) {
long tid = (long) thread_id;
printf("Hello World!, By thread #%ld\n", tid);
pthread_exit((void *) tid);
}
int main(int argc, char *argv[]) {
pthread_t threads[NUM_THREADS];
/* Declare a pthread attribute variable of the pthread_attr_t data type */
pthread_attr_t attr;
int rc;
long t;
void *status;
/* Initialize and set thread detached attribute */
/* PTHREAD_CREATE_JOINABLE is joinable */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (t = 0; t < NUM_THREADS; t++) {
printf("Creating thread %ld\n", t);
rc = pthread_create(&threads[t], &attr, print, (void *)t);
if (rc) {
printf("ERROR\n");
exit(-1);
}
}
/* Free attribute and wait for the other threads */
pthread_attr_destroy(&attr);
for (t = 0; t < NUM_THREADS; t++) {
rc = pthread_join(threads[t], &status);
if (rc) {
printf("ERROR\n");
exit(-1);
}
printf("Completed join with thread %ld having status %ld\n", t, (long)status);
}
printf("Main thread completed.\n");
pthread_exit(NULL);
}
在 print
函式中的 pthread_exit((void *) tid);
返回一個狀態值 (status) 給其他等待這個執行緒的執行緒。
結果
Creating thread 0
Creating thread 1
Hello World!, By thread #0
Creating thread 2
Hello World!, By thread #1
Hello World!, By thread #2
Completed join with thread 0 having status 0
Completed join with thread 1 having status 1
Completed join with thread 2 having status 2
Main thread completed.
Stack Management
學習如何正確地設置和獲取線程的 Stack 大小,以適應需要大量堆棧空間的任務。可以避免 Stack 溢出,並確保線程能夠正確執行。
Stack Management 中的範例用多個執行緒並進行矩陣計算。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NTHREADS 4
#define N 1000
#define MEGEXTRA 1000000
pthread_attr_t attr;
void *dowork(void *threadid)
{
double A[N][N];
int i, j;
long tid;
size_t mystacksize;
tid = (long)threadid;
pthread_attr_getstacksize(&attr, &mystacksize);
printf("Thread %ld: stack size = %li bytes \n", tid, mystacksize);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
A[i][j] = ((i * j) / 3.452) + (N - i);
}
}
pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
pthread_t threads[NTHREADS];
size_t stacksize;
int rc;
long t;
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr, &stacksize);
printf("Default stack size = %li\n", stacksize);
stacksize = sizeof(double)*N*N+MEGEXTRA;
printf("Amount of stack needed per thread = %li\n", stacksize);
pthread_attr_setstacksize (&attr, stacksize);
printf("Creating threads with stack size = %li bytes\n", stacksize);
for(t=0; t<NTHREADS; t++){
rc = pthread_create(&threads[t], &attr, dowork, (void *)t);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
printf("Created %ld threads.\n", t);
pthread_exit(NULL);
}
pthread_attr_getstacksize(attr, stacksize)
獲取當前 Stack 大小pthread_attr_setstacksize(attr, stacksize)
設置 Stack 大小結果
Default stack size = 8388608
Amount of stack needed per thread = 9000000
Creating threads with stack size = 9000000 bytes
Created 4 threads.
Thread 1: stack size = 9000000 bytes
Thread 0: stack size = 9000000 bytes
Thread 3: stack size = 9000000 bytes
Thread 2: stack size = 9000000 bytes
3 basic synchronization primitives:
互斥鎖 (Mutex)
創建互斥鎖
//Statically
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
//Dinamically, using pthread_mutex_init().
pthread_mutex_t mymutex;
pthread_mutex_init(&mymutex, NULL);
銷毀互斥鎖
pthread_mutex_destroy
:
銷毀不再需要互斥的變數
int pthread_mutex_destroy(pthread_mutex_t *mutex);
鎖定和解鎖互斥鎖
pthread_mutex_lock
:
執行緒使用此函數來取得互斥變數的鎖。如果該互斥鎖已經被其他執行緒鎖定,則此調用會阻塞當前執行緒直到互斥鎖被解鎖。
int pthread_mutex_lock(pthread_mutex_t *mutex);
pthread_mutex_trylock
:
嘗試取得互斥鎖。如果互斥鎖已經被鎖定,則此函數會立即返回並且返回一個“忙碌”錯誤代碼。
int pthread_mutex_trylock(pthread_mutex_t *mutex);
pthread_mutex_unlock
:
解鎖一個由當前執行緒擁有的互斥鎖。若出現以下情況,則回傳錯誤:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
結果:
每個執行緒計算部分向量的點積,並使用互斥鎖保護全域變數的更新。
Now it is thread 0 , Sum = 100.000000
Now it is thread 1 , Sum = 200.000000
Now it is thread 2 , Sum = 300.000000
Now it is thread 3 , Sum = 400.000000
Sum = 400.000000
在案例中將多的參數定義在結構體當中
typedef struct
{
double *a;
double *b;
double sum;
int veclen;
} DOTDATA;
a
、 b
sum
veclen
在 dotprod
函式中進行向量的計算,當中使用互斥鎖 mutexsum
來保護 dotstr.sum
的更新,以避免 race condition 。
pthread_mutex_lock(&mutexsum);
dotstr.sum += mysum;
pthread_mutex_unlock(&mutexsum);
在主函式中要注意的是主執行緒必須等待其他執行緒確實完成才能繼續, pthread_join
暫時阻塞主執行緒,等待所有執行緒完成繼續。
/* Wait on the other threads */
for(i = 0; i < NUMTHRDS; i++) {
pthread_join(callThd[i], &status);
printf("Thread %ld completed with status %ld\n", i, (long)status);
}
其他 API
pthread_self()
取得呼叫執行緒IDpthread_equal(thread1, thread2)
比較兩個線程 ID。如果兩個 ID 不同,則傳回 0,否則傳回非零值。條件變量 (Condition Variable)
條件變數允許執行緒阻塞,直到特定條件成立,阻塞的執行緒進入一個等待的佇列,等到條件成立時,其他執行緒會向阻塞的執行緒發出信號。
pthread_cond_init
用來初始化一個條件變數 cond
, attr
用來指定條件變數的屬性。如果 attr
是 NULL
,則使用預設。
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
pthread_cond_destroy
顧名思義,消除條件變數 cond
。
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_wait
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
阻塞當前線程,直到條件變數 cond 被信號喚醒。等待時會自動釋放互斥鎖 mutex,被喚醒後會重新獲取互斥鎖。
pthread_cond_signal
喚醒等待條件變數 cond
的一個執行緒。
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_broadcast
喚醒等待條件變數 cond
的所有執行緒。
int pthread_cond_broadcast(pthread_cond_t *cond);
count
變數結果
Starting watch_count(): thread 1
inc_count(): thread 2, count = 1 -- unlocking mutex
watch_count(): thread 1 Count= 1. Going into wait...
inc_count(): thread 3, count = 2 -- unlocking mutex
inc_count(): thread 2, count = 3 -- unlocking mutex
inc_count(): thread 3, count = 4 -- unlocking mutex
inc_count(): thread 2, count = 5 -- unlocking mutex
inc_count(): thread 3, count = 6 -- unlocking mutex
inc_count(): thread 2, count = 7 -- unlocking mutex
inc_count(): thread 3, count = 8 -- unlocking mutex
inc_count(): thread 2, count = 9 -- unlocking mutex
inc_count(): thread 3, count = 10 -- unlocking mutex
inc_count(): thread 2, count = 11 -- unlocking mutex
inc_count(): thread 3, count = 12 -- threshold reached.Just sent signal.
inc_count(): thread 3, count = 12 -- unlocking mutex
watch_count(): thread 1 Condition signal received. Count= 12
watch_count(): thread 1 Updating the value of count...
watch_count(): thread 1 count now = 137.
watch_count(): thread 1 Unlocking mutex.
inc_count(): thread 2, count = 138 -- unlocking mutex
inc_count(): thread 3, count = 139 -- unlocking mutex
inc_count(): thread 2, count = 140 -- unlocking mutex
inc_count(): thread 3, count = 141 -- unlocking mutex
inc_count(): thread 2, count = 142 -- unlocking mutex
inc_count(): thread 3, count = 143 -- unlocking mutex
inc_count(): thread 2, count = 144 -- unlocking mutex
inc_count(): thread 3, count = 145 -- unlocking mutex
Main(): Waited and joined with 3 threads. Final value of count = 145. Done.
流程:
watch_count
一開始對 count 進行上鎖,但 count < COUNT_LIMIT
,因此進入等待並釋放鎖,等到條件變數達成又會自動獲取鎖 (pthread_cond_wait(&count_threshold_cv, &count_mutex);
) while (count < COUNT_LIMIT) {
printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id,count);
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,count);
}
inc_count
中更新 count 的值,但因為有 mutex 的存在,當前執行緒更新 count 時阻塞另個執行緒,一直到釋放鎖,來防止 race condition。count == COUNT_LIMIT
喚醒正在等待的執行緒 1 ,並將 count 加上 125 if (count == COUNT_LIMIT) {
printf("inc_count(): thread %ld, count = %d -- threshold reached.",
my_id, count);
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
Overview
條件變數為執行緒同步提供了另一種方式。 mutex 透過控制執行緒對資料的存取來實現同步,而條件變數允許執行緒根據資料的實際值進行同步。
如果沒有條件變數,程式設計需要讓執行緒不斷輪詢(可能在關鍵部分),以檢查條件是否滿足。這可能會非常消耗資源,因為執行緒在此活動中會持續忙碌。條件變數是一種無需輪詢即可實現相同目標的方法。
條件變數必須與互斥鎖結合使用。
下面顯示了使用條件變數的代表性順序。
Main Thread | |
---|---|
Declare and initialize global data/variables which require synchronization (such as "count") | |
Declare and initialize a condition variable object | |
Declare and initialize an associated mutex | |
Create threads A and B to do work | |
Thread A | Thread B |
Do work up to the point where a certain condition must occur (such as "count" must reach a specified value) | Do work |
Lock associated mutex and check value of a global variable | Lock associated mutex |
Call pthread_cond_wait() to perform a blocking wait for signal from Thread-B. Note that a call to pthread_cond_wait() automatically and atomically unlocks the associated mutex variable so that it can be used by Thread-B. |
Change the value of the global variable that Thread-A is waiting upon. |
When signalled, wake up. Mutex is automatically and atomically locked. | Check value of the global Thread-A wait variable. If it fulfills the desired condition, signal Thread-A. |
Explicitly unlock mutex | Unlock mutex. |
Continue | Continue |
Main Thread | |
Join / Continue |
semaphores
sem_t semaphore;
定義訊號量變數
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
指向要初始化的訊號量pshared
:若設置為非 0 ,則訊號量在多個行程間共享,若設置為 0 ,則訊號量僅在單一行程內的多執行緒間使用value
訊號量的初始值0
,反之失敗為 -1
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
實作:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
sem_t mutex;
void* thread(void* arg)
{
//wait
sem_wait(&mutex);
printf("\nEntered..\n");
//critical section
sleep(1);
//signal
printf("\nJust Exiting...\n");
sem_post(&mutex);
}
int main()
{
sem_init(&mutex, 0, 1);
pthread_t t1,t2;
pthread_create(&t1,NULL,thread,NULL);
pthread_create(&t2,NULL,thread,NULL);
pthread_join(t1,NULL);
pthread_join(t2,NULL);
sem_destroy(&mutex);
return 0;
}
全名 Fast Userspace Mutex ,Futex 可用於實作使用者空間之同步機制。
由於在 Linux 上 futex 沒有直接的 C 語言函式介面,因此需藉由 syscall
來進行。
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);
在了解 mutex_trylock 之前必須先看 mutex/atomic.h
#define load(obj, order) atomic_load_explicit(obj, memory_order_##order)
#define fetch_or(obj, arg, order) \
atomic_fetch_or_explicit(obj, arg, memory_order_##order)
#define thread_fence(obj, order) atomic_thread_fence(memory_order_##order
#define exchange(obj, value, order) \
atomic_exchange_explicit(obj, value, memory_order_##order)
mutex_trylock
static bool mutex_trylock(mutex_t *mutex)
{
int state = load(&mutex->state, relaxed);
if (state & MUTEX_LOCKED)
return false;
state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed);
if (state & MUTEX_LOCKED)
return false;
thread_fence(&mutex->state, acquire);
return true;
}
atomic_load_explicit
以 relaxed 記憶體順序從 mutex->state 讀取當前狀態,若被上鎖返回 false
。atomic_fetch_or_explicit
將 MUTEX_LOCK
bitflag 設置在 mutex->state 上,並獲取此操作之前的 state ,如果此狀態包含 MUTEX_LOCKED
,則說明在嘗試鎖定時鎖已被其他執行緒持有,返回 false 。atomic_thread_fence
以 acquire 記憶體順序設置屏障,確保此操作之後的讀寫操作不會被重排序到此操作之前。memory_order_relaxed
: 對其他讀取或寫入沒有同步或排序約束
memory_order_acquire
: 在此載入之前,當前執行緒中的任何讀取或寫入都不能重新排序
mutex_lock
static inline void mutex_lock(mutex_t *mutex)
{
#define MUTEX_SPINS 128
for (int i = 0; i < MUTEX_SPINS; ++i) {
if (mutex_trylock(mutex))
return;
spin_hint();
}
int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
while (state & MUTEX_LOCKED) {
futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING);
state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed);
}
thread_fence(&mutex->state, acquire);
}
mutex_trylock
來獲取鎖,若在 for
迴圈的這段時間都得不到 lock ,改藉由 atomic_exchange_explicit
將 state 更改成 MUTEX_LOCKED | MUTEX_SLEEPING
的方式來持有,並返回舊的 stateMUTEX_LOCKED | MUTEX_SLEEPING
是什麼?while (state & MUTEX_LOCKED)
迴圈表示鎖已被其他執行緒所持有,並使用 futex_wait
進入睡眠,等待鎖釋放,當被喚醒後再次嘗試設置狀態並檢查 state 是否還被其他執行緒持有鎖。在 for
迴圈中的 spin_hint
有何作用?
#if defined(__i386__) || defined(__x86_64__)
#define spin_hint() __builtin_ia32_pause()
#elif defined(__aarch64__)
#define spin_hint() __asm__ __volatile__("isb\n")
#else
#define spin_hint() ((void) 0)
#endif
在同步機制中,經常需要以 spin-wait loop 方式檢測 lock 的狀態,以嘗試持有 lock。但這些頻繁的檢測將導致 pipeline 上都是對 lock 狀態的讀操作,那麼當另一個執行緒對 lock 進行寫操作時,因為相依關係 pipeline 就必須重排,這導致性能的下降和更多的耗電。
在其他執行緒寫入後使用 PAUSE 指令,使自旋鎖稍微延遲等待 pipeline 重整,再繼續以 spin-wait loop 方式檢測 lock 的狀態。(像是計算機結構中加入 NOP
)
模擬情境:
給定 16 個工作節點
16 個節點共享一個 clock
,clock
從 tick = 1 開始累計,節點在每個 tick 都有必須完成的事:在偶數 tick 時想像成是完成階段任務,可改變自身狀態為就緒並通知其子節點繼續後續任務,而在奇數 tick 時則推動時間讓 tick += 1。
結構體
struct clock {
mutex_t mutex;
cond_t cond;
int ticks;
};
struct node {
struct clock *clock; // Each node points to a shared clock
struct node *parent; // Point to the parent node
mutex_t mutex;
cond_t cond;
bool ready; // Proceed to the next node only when ready is true
};
clock
中的 ticks
決定是否繼續下一步驟,同步上使用 condition variable + mutex 機制去通知此事件,且通知應該是用 broadcast 方式讓所有正在等待的執行緒得知在 POSIX_Thread 中 condition variable 的實作中因只有一個執行緒在等待,使用
pthread_cond_signal
node
中 ready
主要讓結點能進行下個工作,因此當親代節點就緒時,主動通知此事。同樣需要 condition variable + mutex 機制,但以 signal 方式通知子節點就好。thread_func(void *ptr)
static void *thread_func(void *ptr)
{
struct node *self = ptr;
bool bit = false;
for (int i = 1; clock_wait(self->clock, i); ++i) {
printf("Thread [%c] | i : %d\n", self->name, i);
if (self->parent){
printf("Thread [%c] wait parent\n", self->name);
node_wait(self->parent);
}
if (bit) {
printf("Thread [%c] send signal\n", self->name);
node_signal(self);
} else {
printf("Thread [%c] trigger clock\n", self->name);
clock_tick(self->clock);
}
bit = !bit;
}
node_signal(self);
return NULL;
}
main(void)
int main(void)
{
struct clock clock;
clock_init(&clock);
#define N_NODES 3
struct node nodes[N_NODES];
node_init(&clock, NULL, &nodes[0]);
for (int i = 1; i < N_NODES; ++i)
node_init(&clock, &nodes[i - 1], &nodes[i]);
printf("\nParent releationship : NULL ");
for(int i = 0; i < N_NODES; ++i)
printf(" -> [%c] ", nodes[i].name);
printf("\n");
pthread_t threads[N_NODES];
for (int i = 0; i < N_NODES; ++i) {
if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0)
return EXIT_FAILURE;
}
printf("Tick start~\n");
clock_tick(&clock);
clock_wait(&clock, 1 << N_NODES);
clock_stop(&clock);
printf("\nTick stop~\n");
for (int i = 0; i < N_NODES; ++i) {
if (pthread_join(threads[i], NULL) != 0)
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
main
執行緒呼叫第一個 clock_tick
來讓 tick
變為 1
,這樣其他執行緒就可以開始根據 tick
逐步進行。而這裡的 clock_wait
會一直等待 tick
到 1 << N_NODES
之後再用 clock_stop
來讓節點的執行緒得以結束。
稍微改寫 main.c 使其更容易了解每個執行緒的進行過程,以下為輸出結果:
在執行程式時會出現下面錯誤,想請問為何會有這樣的問題?
FATAL: ThreadSanitizer: unexpected memory mapping 0x5c9bd4d2b000-0x5c9bd4d4b000
輸入 sudo sysctl vm.mmap_rnd_bits=28
來解決問題
參考 FATAL: ThreadSanitizer: unexpected memory mapping when running on Linux Kernels 6.6+
Parent releationship : NULL -> [A] -> [B] -> [C]
Tick start~
============clock_tick() tick : 1============
Thread [C] | i : 1
Thread [C] wait parent
Thread [B] | i : 1
Thread [B] wait parent
Thread [A] | i : 1
Thread [A] trigger clock
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
============clock_tick() tick : 5============
Thread [B] wait parent
Thread [A] | i : 5
Thread [A] trigger clock
Thread [C] | i : 2
Thread [C] wait parent
============clock_tick() tick : 6============
Thread [A] | i : 6
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
============clock_tick() tick : 7============
Thread [B] | i : 4
Thread [B] wait parent
Thread [A] | i : 7
Thread [A] trigger clock
============clock_tick() tick : 8============
Thread [A] | i : 8
Thread [A] send signal
Tick stop~
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] becomes not ready.
Thread [C] send signal
clock_tick
先將 tick 增加 1 接著進入 clock_wait
中進行等待,一直到 clock->ticks
等於 Thread [A]
、Thread [B]
、Thread [C]
來執行這三個 node ,且他們之間的關係為 NULL -> [A] -> [B] -> [C]
thread_func
,且預設的 bit
為 false
,每次迭代都會進行反轉,使其執行不同工作內容
============clock_tick() tick : 2============
Thread [A] | i : 2
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] trigger clock
在 tick : 2 中
Thread [A]
發送節點的訊號喚醒正在等待的 Thread [B]
(喚醒時 Thread [A]
轉變為 ready 因此 Thread [B]
才能執行工作,此時 Thread [A]
交棒給 Thread [B]
後再次轉變為 not ready)Thread [B]
因第一次執行工作 bit = false
,所以進行 tick 加一的工作============clock_tick() tick : 3============
Thread [B] | i : 2
Thread [B] wait parent
Thread [A] | i : 3
Thread [A] trigger clock
在 tick : 3 中
Thread [A]
尚未 ready ,因此 Thread [B]
再次進入等待親代節點的狀態Thread [A]
不受親代節點影響繼續執行工作============clock_tick() tick : 4============
Thread [A] | i : 4
Thread [A] send signal
Thread [A] becomes not ready.
Thread [B] send signal
Thread [B] | i : 3
Thread [B] becomes not ready.
Thread [C] trigger clock
在 tick : 4 中
Thread [A]
發送節點的訊號喚醒正在等待的 Thread [B]
(喚醒時 Thread [A]
轉變為 ready 因此 Thread [B]
才能執行工作,此時 Thread [A]
交棒給 Thread [B]
後再次轉變為 not ready)Thread [B]
因為 bit
的關係工作內容變成發送節點訊號喚醒正在等待的 Thread [C]
後續都是相似的步驟,因此不再贅述
最後在 tick : 8 中, clock->ticks
等於 clock_stop(&clock)
將 ticks 的值設為 -1
,導致在執行緒函式中 for
迴圈中止。
為何在 thread_func
最後需要在進行 node_signal(self)
?
在 mutex/pi-test/main.c 函式中主要建立一個 Priority inversion 的情況
定義一個具集 TRY(f)
用來檢視錯誤,其中 #f
將參數 f
轉換為字符串,這樣可以在錯誤訊息中顯示出具體是哪個函式失敗。
#define TRY(f) \
do { \
int __r; \
if ((__r = (f != 0))) { \
fprintf(stderr, "Run function %s = %d\n", #f, __r); \
return __r; \
} \
} while (0)
ctx_init
函式中的 mutexattr_setprotocol
設置了優先級繼承,這樣可以避免優先級反轉。
static void ctx_init(struct ctx *ctx)
{
mutexattr_t mattr;
mutexattr_setprotocol(&mattr, PRIO_INHERIT);
mutex_init(&ctx->m0, &mattr);
}
#include <pthread.h>
int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol);
attr
: 指向互斥鎖屬性的指標
protocol
: 協議屬性
PTHREAD_PRIO_INHERIT
: 使用優先級繼承PTHREAD_PRIO_PROTECT
: 使用優先級上限PTHREAD_PRIO_NONE
: 不使用優先級繼承或上限的策略執行
sudo
執行,因為牽涉對執行緒優先權的調整,需要 root 權限sudo taskset -c 1
問題 1
在研讀的過程中 pthread_create_with_prio
函式主要用作創建一個具有優先及的執行緒,當中 sched_param
的結構體又是從何而來?在完整程式碼中看起來沒有引用到排程器相關標頭檔。
部份程式碼
static int pthread_create_with_prio(pthread_t *thread,
pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg,
int prio)
{
struct sched_param param;
param.sched_priority = prio;
TRY(pthread_attr_setschedparam(attr, ¶m));
TRY(pthread_create(thread, attr, start_routine, arg));
return 0;
}
在 pthread_setschedparam(3) — Linux manual page 找到 sched_param
結構體
struct sched_param {
int sched_priority; /* Scheduling priority */
};
pthread_attr_setschedpolicy(&attr, SCHED_FIFO)
pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)
理解這兩 API 代表意思
int pthread_attr_setinheritsched(pthread_attr_t *attr,int inheritsched);
attr
: 指向執行緒屬性的指標
inheritsched
: 是否繼承負執行緒的排程屬性
PTHREAD_EXPLICIT_SCHED
: 不繼承,新執行緒使用由 pthread_attr_setschedparam
和 pthread_attr_setschedpolicy
設置的排程屬性,執行緒PTHREAD_INHERIT_SCHED
: 繼承父執行緒的排程屬性返回值:成功返回 0
,失敗返回非 0 值
int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
attr
: 指向執行緒屬性的指標
policy
: 排程屬性
SCHED_FIFO
: 搶佔式排程,即會被優先即更高的執行緒搶佔SCHED_RR
: 輪詢式排程,擁有同樣優先權的執行緒會以一固定時間量,又稱「時間配量」(time slice),用輪詢方式執行。SCHED_OTHER
: 為 Linux 中默認。僅用於靜態優先級為 0 的執行緒(i.e., threads under real-time policies always have priority over SCHED_OTHER
processes),從靜態優先級 0 列表中選擇要運行的執行緒,由動態優先級決定(即 nice 值)。SCHED_OTHER
執行緒之間的公平進展。返回值:成功返回 0
,失敗返回非 0 值
int pthread_attr_setschedparam(pthread_attr_t *restrict attr,
const struct sched_param *restrict param);
返回值:成功返回 0
,失敗返回非 0 的值
問題 2
請問若沒有給定優先權的級別,系統的判定如何決定下個執行的執行緒?
TODO : 理解 nice 值、CFS
測試
完整測試程式 -> mutex/test_priority
基於問題 3 進行相關測試。設定三個執行緒,Thread 1 與 Thread 2 將優先級設置在 1 (其中優先級設置範圍 1 ~ 99),Thread 使用預設屬性(即 SCHED_OTHER
),並進行兩種排程屬性:SCHED_FIFO
、SCHED_RR
測試
SCHED_FIFO
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
在搶佔式的排程中,Thread 1 並沒有讓出資源給 Thread 2 執行。
問題 3
當這種靜態優先級與動態優先級同時出現時,如何決定動態優先級的執行緒何時能執行?在 SCHED_FIFO
的結果中看到 Thread 3 並沒有成功執行。
SCHED_RR
$ make run
sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority
1 1 1 1 1 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2
可以看到 Thread 1 與 Thread 2 將資源的平均分配給不同執行緒,Thread 3 的非即時執行緒根據動態優先級偶爾也能分配到資源。
priority protection mutex 設定
muthread_mutexattr_setprioceiling
: 設定 mutex 的 prioceiling
muthread_mutexattr_setprotocol(&mattr, TBTHREAD_PRIO_INHERIT);
muthread_mutexattr_setprioceiling(&mattr, 25);
muthread_mutex_init(&mutex_normal, &mattr);
muthread_mutex_init(&mutex_normal_2, &mattr);
在測驗程式 Tests/test-04-priority-inversion.c 中,我們在已經設定好會發生 priority inversion 的情況下去實作出 priority protection mutex 是否會過於不實際,若面對到更複雜的情況下使用 PP 是否對效能真的有更多提升?該如何驗證?
何謂