--- tags: concurrency --- # [並行程式設計](https://hackmd.io/@sysprog/concurrency): POSIX Thread ## Process vs. Thread vs. Coroutines * With threads, the operating system switches running tasks **preemptively** according to its scheduling algorithm. * With coroutines, the programmer chooses, meaning tasks are cooperatively multitasked by pausing and resuming functions at set points. * coroutine switches are cooperative, meaning the programmer controls when a switch will happen. * The kernel is not involved in coroutine switches. 一圖勝千語: ![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_K6DJ0ZtiecH_p.537916_1460615185290_undefined) 將函式呼叫從原本的面貌: [ [source](http://www.csl.mtu.edu/cs4411.ck/www/NOTES/non-local-goto/coroutine.html) ] ![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_K6DJ0ZtiecH_p.537916_1460615014454_undefined) 轉換成以下: ![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_K6DJ0ZtiecH_p.537916_1460615044111_undefined) C 程式實作 coroutines 的手法相當多,像是: * [setjmp / longjmp](http://descent-incoming.blogspot.tw/2014/02/coroutine.html) ### Thread & Process 取自[Programming Parallel Machines Spring 2019](https://engineering.purdue.edu/~smidkiff/ece563/)第7周教材 ![](https://i.imgur.com/QW1YWsC.png) ![](https://i.imgur.com/gUF3Vz9.png) A thread can possess an independent flow of control and be schedulable because it maintains its own: - Stack pointer - Registers - Scheduling properties (such as policy or priority) - Set of pending and blocked signals - Thread specific data. ### PThread (POSIX threads) ![](https://i.imgur.com/0yeKpoT.png) #### Example ```c #include <stdio.h> #include <stdlib.h> #include <pthread.h> void printMsg(char *msg) { int *status = malloc(sizeof(int)); *status = 1; printf("%s\n", msg); pthread_exit(status); } int main(int argc, char **argv) { pthread_t thrdID; int *status = malloc(sizeof(int)); printf("creating a new thread\n"); pthread_create(&thrdID, NULL, (void *) printMsg, argv[1]); printf("created thread %lu\n", thrdID); pthread_join(thrdID, (void**) &status); printf("Thread %lu exited with status %d\n", thrdID, *status); return 0; } ``` 執行結果 ```shell $ ./a.out "Hello world!" creating a new thread created thread 219465472 Hello world! Thread 219465472 exited with status 1 ``` 延伸閱讀: [POSIX Threads Programming](https://hpc-tutorials.llnl.gov/posix/) ### Synchronizing Threads 3 basic synchronization primitives - mutex locks - condition variables - semaphores 取自 Ching-Kuang Shene (冼鏡光) 教授的教材:[Part IV Other Systems: IIIPthreads: A Brief Review](http://pages.mtu.edu/~shene/FORUM/Taiwan-Forum/ComputerScience/004-Concurrency/WWW/SLIDES/15-Pthreads.pdf) :::info 「[冼](https://zh.wikipedia.org/wiki/%E5%86%BC%E5%A7%93)」音同「顯」 ::: ### mutex locks ```cpp pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr); int pthread_mutex_destroy(pthread_mutex_t *mutex); int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); ``` ![](https://i.imgur.com/mE4l7n1.png) - Only the ==owner== can unlock a mutex. Since mutexes cannot be copied, use pointers. - If `pthread_mutex_trylock()` returns `EBUSY`, the lock is already locked. Otherwise, the calling thread becomes the owner of this lock. - With `pthread_mutexattr_settype()`, the type of a mutex can be set to allow recursive locking or report deadlock if the owner locks again ```cpp pthread_mutex_t bufLock; void producer(char* buf1) { char* buf = (char*) buf1; for(;;) { while(count == MAX_SIZE); pthread_mutex_lock(&bufLock); buf[count] = (char) ((int)('a')+count); count++; printf("%d ", count); pthread_mutex_unlock(&bufLock); } } void consumer(void* buf1) { char* buf = (char*) buf1; for(;;) { while(count == 0); pthread_mutex_lock(&bufLock); count--; printf("%d ", count); pthread_mutex_unlock(&bufLock); } } ``` ### condition variables ```cpp int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond); // all threads waiting on a condition need to be woken up ``` - Condition variables allow a thread to block until a specific condition becomes true - blocked thread goes to wait queue for condition - When the condition becomes true, some other thread signals the blocked thread(s) ![](https://i.imgur.com/9gRzRDG.png) - Conditions in Pthreads are usually used with a mutex to enforce mutual exclusion. - the `wait` call should occur under the protection of a mutex ```cpp pthread_mutex_t *lock; pthread_cond_t *notFull, *notEmpty; void consumer(char* buf) { for(;;) { pthread_mutex_lock(lock); while(count == 0) pthread_cond_wait(notEmpty, lock); useChar(buf[count-1]); count--; pthread_cond_signal(notFull); pthread_mutex_unlock(lock); } } ``` ### semaphores ```cpp sem_t semaphore; int sem_init(sem_t *sem, int pshared, unsigned int value); int sem_wait(sem_t *sem); int sem_post(sem_t *sem); ``` - Can do increments and decrements of semaphore value - Semaphore can be initialized to any value - Thread blocks if semaphore value is less than or equal to zero when a decrement is attempted - As soon as semaphore value is greater than zero, one of the blocked threads wakes up and continues - no guarantees as to which thread this might be ```cpp sem_t empty, full; void producer(char* buf) { int in = 0; for(;;) { sem_wait(&empty); buf[in] = getChar(); in = (in + 1) % MAX_SIZE; sem_post(&full); } } void consumer(char* buf) { int out = 0; for(;;) { sem_wait(&full); useChar(buf[out]); out = (out + 1) % MAX_SIZE; sem_post(&empty); } } ``` ## POSIX Threads ![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_xBRCF9BsC50_p.537916_1457976043696_fork-join.jpg) ## POSIX Thread 實例: 光線追蹤 [2016q1:Homework2](http://wiki.csie.ncku.edu.tw/embedded/2016q1h2) 提到 raytracing 程式,[UCLA Computer Science 35L, Winter 2016 Software Construction](http://web.cs.ucla.edu/classes/winter16/cs35L/) 課程有個作業值得參考:**[S35L_Assign8_Multithreading](https://github.com/maxwyb/CS35L_Assign8_Multithreading)** 編譯與測試 ```shell $ git clone https://github.com/maxwyb/CS35L_Assign8_Multithreading.git raytracing-threads $ cd raytracing-threads $ make clean all $ ./srt 4 > out.ppm $ diff -u out.ppm baseline.ppm ``` 預期會得到下圖: ![](https://hackpad-attachments.s3.amazonaws.com/embedded2016.hackpad.com_xBRCF9BsC50_p.537916_1457975632540_out.png) 將 `srt` 後面的數字換成 1, 2, 4, 8 來測試執行時間 [ [main.c](https://github.com/maxwyb/CS35L_Assign8_Multithreading/blob/master/main.c) ] ```Cpp #include <pthread.h> pthread_t* threadID = malloc(nthreads * sizeof(pthread_t)); int res = pthread_create(&threadID[t], 0, pixelProcessing, (void *)&intervals[t]); int res = pthread_join(threadID[t], &retVal); ``` ## POSIX Thread - [ ] [POSIX Threads Programming](https://hpc-tutorials.llnl.gov/posix/) * Condition variables provide yet another way for threads to synchronize. While mutexes implement synchronization by controlling thread access to data, ==condition variables allow threads to synchronize based upon the actual value of data.== * Condition Variable * 當有兩個執行緒 A 跟 B。其中 A 需要等到一個變數的狀態轉爲 true 時,才能繼續執行,而這個變數狀態卻掌握在 B 的手中,那 B 將變數狀態改爲 true 後,就能用 signal 來喚醒在等待中的 A * 如果不這樣作,那麼 A 就會需要一直判斷變數狀態是否改爲true 了,這會損失很多效能成本,且不能保證狀態是否還掌握在其它執行緒手中 * condition variable 總是搭配 mutex lock 使用 ![](http://i.cmpnet.com/ddj/blogs/2011/06/inheritance.png) Condition variables must be declared with type pthread_cond_t, and must be initialized before they can be used. There are two ways to initialize a condition variable: 1. Statically, when it is declared. For example:  **pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;** 1. Dynamically, with the pthread_cond_init() routine. The ID of the created condition variable is returned to the calling thread through the _condition_ parameter. This method permits setting condition variable object attributes, _attr_. ## Synchronization [Intro to Computer Systems](http://www.cs.cmu.edu/~213/schedule.html) 對應的教科書 CS:APP [Concurrent Programming](https://www.cs.cmu.edu/~213/lectures/23-concprog.pdf), Page 40-56 ![](https://i.imgur.com/54C4mj9.png) CS:APP [Synchronization: Advanced](https://www.cs.cmu.edu/~213/lectures/25-sync-advanced.pdf), Page 40-49 ![](https://i.imgur.com/8Uhm2TA.png) 計數器的程式範例: ```cpp // 全域變數 volatile long cnt = 0; // 計數器 void *thread(void *vargp) { long niters = *((long *)vargp); for (long i = 0; i < niters; i++) cnt++; return NULL; } int main(int argc, char **argv) { long niters; pthread_t tid1, tid2; niters = atoi(argv[1]); pthread_create(&tid1, NULL, thread, &niters); pthread_create(&tid2, NULL, thread, &niters); pthread_join(tid1, NULL); pthread_join(tid2, NULL); if (cnt != (2 * niters)) printf("Wrong! cnt=%ld\n", cnt); else printf("Correct! cnt=%ld\n", cnt); exit(0); } ``` 運行的結果每次不一樣: ``` $ ./badcnt 10000 Correct! cnt=20000 $ ./badcnt 10000 Wrong! cnt=10458 ``` 把 `cnt` 變數操作的部分抽出來看: 執行緒函式的迴圈程式碼: ```cpp for (long i = 0; i < niters; i++) cnt++; ``` 對應的組合語言程式碼: ``` # 以下四句為 Head 部分,記為 H movq (%rdi), %rcx testq %rcx, %rcx jle .L2 movl $0, %eax .L3: movq cnt(%rip), %rdx # 讀取 cnt,記為 L addq $1, %rdx # 更新 cnt,記為 U movq %rdx, cnt(%rip) # 寫入 cnt,記為 S # 以下為 Tail 部分,記為 T addq $1, %rax cmpq %rcx, %rax jne .L3 .L2: ``` cnt 使用 `volatile` 關鍵字聲明,意思是避免編譯器產生的程式碼中,透過暫存器來保存數值,無論是讀取還是寫入,都在主記憶體操作。 細部的步驟分成 5 步:`H` -> `L` -> `U` -> `S` -> `T`,尤其要注意 LUS 這三個操作,這三個操作必須在一次執行中完成,一旦次序打亂,就會出現問題,不同執行緒拿到的值就不一定是最新的。 也就是說該函式的正確執行和指令的執行順序有關 ![](https://i.imgur.com/EU3fibV.png) 將 n 個 concurrent 執行緒的執行模型描述為 n 維空間中的軌跡線,每條軸 k 對應 k 的進度。下圖展示對於 H1, L1, U1, H2, L2, S1, T1, U2, S2, T2 的軌跡線的進度圖: ![](https://i.imgur.com/rADk08t.png) 對於執行緒編號 `i`,操作共享變數 `cnt` 內含值的指令 (,Li,Ui,Si) 構成了一個 CS,每個 CS 不應該和其他執行緒的 CS 交替執行。換言之,需要對共享變數進行互斥的存取: * 當執行緒繞過不安全區,叫做安全軌跡線; * 反之,為不安全軌跡線; ![](https://i.imgur.com/vx3U1qg.png) mutual exclusion ([互斥](http://terms.naer.edu.tw/detail/50571/)) 手段的選擇,不是根據 CS 的大小,而是根據 CS 的性質,以及有哪些部分的程式碼,也就是,仰賴於核心內部的執行路徑。 semaphore 和 spinlock 屬於不同層次的互斥手段,前者的實現仰賴於後者,可類比於 HTTP 和 TCP/IP 的關係,儘管都算是網路通訊協定,但層次截然不同: * [POSIX semaphore](http://www.csc.villanova.edu/~mdamian/threads/posixsem.html) 用於多個 process 之間對資源的互斥 * 雖然也是在核心中,但是該核心執行路徑是以 process 的身份,代表 process 來爭奪資源 * 如果 競爭不上,會有 context switch,process 可以去sleep,但 CPU 不會停下來,會接著運作其他的執行路徑 * 從概念上說,這和單核 CPU 或多核 CPU 沒有直接的關係,只是在 semaphore 本身的實作層面上,為了保證 semaphore 的 atomics,在多核 CPU 中需要 spinlock 來互斥 * 在多核 CPU 中, 加上了其他 CPU 的干擾,因此需要 spinlock 來幫助。一旦 CPU 進入 spinlock,就不會做別的事,直到鎖定成功為止。因此,這就決定了被 spinlock 鎖住的 CS 不能停 spinlock 的本意和目的就是保證資料修改的 atomics,因此也沒有理由在 spinlock 鎖住的 CS 中停留。 + [Part 1: Mutex Locks](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-1:-Mutex-Locks) + [Part 2: Counting Semaphores](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-2:-Counting-Semaphores) + [Part 3: Working with Mutexes And Semaphores](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-3:-Working-with-Mutexes-And-Semaphores) + [Part 4: The Critical Section Problem](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-4:-The-Critical-Section-Problem) + [Part 5: Condition Variables](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-5:-Condition-Variables) + [Part 6: Implementing a barrier](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-6:-Implementing-a-barrier) + [Part 7: The Reader Writer Problem](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-7:-The-Reader-Writer-Problem) + [Part 8: Ring Buffer Example](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-8:-Ring-Buffer-Example) + [Part 9: Synchronization Across Processes](https://github.com/angrave/SystemProgramming/wiki/Synchronization,-Part-9:-Synchronization-Across-Processes) 案例: [Mutex and Semaphore](https://hackmd.io/@nKngvyhpQdGagg1V6GKLwA/Skh9Z2DpX) (你今天用對 mutex 了嗎?) [Priority Inversion on Mars](http://wiki.csie.ncku.edu.tw/embedded/priority-inversion-on-Mars.pdf) ![](https://i.imgur.com/JosQbfn.png) > 優先權: Task~1~(H) > Task~2~(M) > Task ~3~(L) * 針對上圖解析 * `(1)` Task~3~ 正在執行,而 Task~1~ 和 Task~2~ 正等待特定事件發生,例如 timer interrupt * `(2)` Task~3~ 為了存取共用資源,必須先獲取 lock * `(3)` Task~3~ 使用共用資源 (灰色區域)。 * `(4)` Task~1~ 等待的事件發生 (可以是 "delay n ticks",此時 delay 結束),作業系統核心暫停 Task~3~ 改執行 Task~1~,因為 Task~1~ 有更高的優先權 * `(5)` `(6)` Task~1~ 執行一段時間直到試圖存取與 Task~3~ 共用的資源,但 lock 已被 Task~3~ 取走了,Task~1~ 只能等待 Task~3~ 釋出 lock * `(7)` `(8)` Task~3~ 恢復執行,直到 Task~2~ 因特定事件被喚醒,如上述 `(4)` 的 Task~3~ * `(9)` `(10)` Task~2~ 執行結束後,將 CPU 資源讓給 Task~3~ * `(11)` `(12)` Task~3~ 用完共享資源後釋放 lock。作業系統核心知道尚有個更高優先權的任務 (即 Task~1~) 正在等待此 lock,執行 context switch 切換到 Task~1~ * `(13)` Task~1~ 取得 lock,開始存取共用資源。 Task~1~ 原本該有最高優先權,但現在卻降到 Task~3~ 的等級,於是反到是 Task~2 和 Task~3~ 有較高的優先權,優先權順序跟預期不同,這就是 Priority Inversion(優先權反轉)。 priority inheritance (PI) 是其中一種 Priority Inversion 解法: ![](https://i.imgur.com/pDYc0iL.png) * 針對上圖解析 * `(1)` `(2)` 同上例,Task~3~ 取得 lock * `(3)` `(4)` `(5)` Task~3~ 存取資源時被 Task ~1~ 搶佔 * `(6)` Task~1~ 試圖取得 lock。作業系統核心發現 Task~3~ 持有 lock,但 Task~3~ 優先權卻低於 Task~1~,於是作業系統核心將 Task~3~ 優先權提升到與 Task~1~ 的等級 * `(7)` 作業系統核心讓 Task~3~ 恢復執行,Task~1~ 則繼續等待 * `(8)` Task~3~ 用畢資源,釋放 lock,作業系統核心將 Task~3~ 恢復成原本的優先權,恢復 Task~1~ 的執行 * `(9)` `(10)` Task~1~ 用畢資源,釋放 lock * `(11)` Task~1~ 釋出 CPU,Task~2~ 取得 CPU 控制權。在這個解法中,Task~2~ 不會導致 priority inversion * [Mutex vs. Semaphores – Part 1: Semaphores](https://blog.feabhas.com/2009/09/mutex-vs-semaphores-%E2%80%93-part-1-semaphores/) * [Edsger Dijkstra](https://en.wikipedia.org/wiki/Edsger_W._Dijkstra) 在 1965 年提出 binary semaphore 的構想,嘗試去解決在 concurrent programs 中可能發生的 race conditions。想法便是透過兩個 function calls 存取系統資源 Semaphore,來標明要進入或離開 critical region。 * 使用 Semaphore 擁有的風險 * Accidental release * This problem arises mainly due to a bug fix, product enhancement or cut-and-paste mistake. * Recursive deadlock * 某一個 task 重複地想要 lock 它自己已經鎖住的 Semaphore,通常發生在 libraries 或 recursive functions。 * 某一個 task 其持有的 Semaphore 已經被終止或發生錯誤。 * Priority inversion * Semaphore as a signal * **Synchronization** between tasks is where, typically, one task waits to be notified by another task before it can continue execution (unilateral rendezvous). A variant of this is either task may wait, called the bidirectional rendezvous. **Quite different to mutual exclusion, which is a protection mechanism.** * 使用 Semaphore 作為處理 Synchronization 的方法容易造成 Debug 困難及增加 "accidental release" 類性的問題。 * [Mutex vs. Semaphores – Part 2: The Mutex](https://blog.feabhas.com/2009/09/mutex-vs-semaphores-%E2%80%93-part-2-the-mutex/) * 作業系統實作中,需要引入的機制。"The major use of the term mutex appears to have been driven through the development of the common programming specification for UNIX based systems." * 儘管 Mutex 及 Binary Semaphore 的概念極為相似,但有一個最重要的差異:**the principle of ownership**。 * 而 Ownership 的概念可以解決上一章所討論的五個風險 * Accidental release * Mutex 若遭遇 Accidental release 將會直接發出 Error Message,因為它在當下並不是 owner。 * Recursive Deadlock * 由於 Ownership,Mutex 可以支援 relocking 相同的 Mutex 多次,只要它被 released 相同的次數。 * Priority Inversion * Priority Inheritance Protocol * [Priority Ceiling Protocol](https://en.wikipedia.org/wiki/Priority_ceiling_protocol) * [投影片](https://tams.informatik.uni-hamburg.de/lehre/2016ss/vorlesung/es/doc/cs.lth.se-RTP-F6b.pdf): 介紹 PCP,並實際推導兩個例子。 * Death Detection * 如果某 task 因為某些原因而中止,則 RTOS 會去檢查此 task 是否擁有 Mutex ;若有, RTOS 則會負責通知所有 waiting 此 Mutex 的 tasks 們。最後再依據這些 tasks 們的情況,有不同的處理方式。 * All tasks readied with error condition * 所有 tasks 假設 critical region 為未定義狀態,必需重新初始化。 * Only one task readied * Ownership 交給此 readied tesk,並確保 the integrity of critical region,之後便正常執行。 * Semaphore as a signal * Mutex 不會用來處理 Synchronization。 * [Mutex vs. Semaphores – Part 3: Mutual Exclusion Problems](https://blog.feabhas.com/2009/10/mutex-vs-semaphores-%E2%80%93-part-3-final-part-mutual-exclusion-problems/) * 但還是有一些問題是無法使用 mutex 解決的。 * Circular deadlock * One task is blocked waiting on a mutex owned by another task. That other task is also block waiting on a mutex held by the first task. * Necessary Conditions * Mutual Exclusion * Hold and Wait * Circular Waiting * No preemption * Solution: Priority Ceiling Protocol * PCP 的設計可以讓低優先權的 task 提升至 ceiling value,讓其中一個必要條件 "Hold and Wait" 不會成立。 * Non-cooperation * Most important aspect of mutual exclusion covered in these ramblings relies on one founding principle: **we have to rely on all tasks to access critical regions using mutual exclusion primitives**. * Unfortunately this is dependent on the design of the software and cannot be detected by the run-time system. * Solution: Monitor * Not typically supplied by the RTOS * A monitor simply encapsulates the shared resource and the locking mechanism into a single construct. * Access to the shared resource is through a controlled interface which cannot be bypassed. * [Mutexes and Semaphores Demystified](http://www.barrgroup.com/Embedded-Systems/How-To/RTOS-Mutex-Semaphore) * Myth: Mutexes and Semaphores are Interchangeable * Reality: 即使 Mutexes 跟 Semaphores 在實作上有極為相似的地方,但他們通常被用在不同的情境。 * The correct use of a semaphore is for **signaling** from one task to another. * A mutex is meant to be taken and released, always in that order, by each task that uses the shared resource it protects. * 還有一個重要的差別在於,即便是正確地使用 mutex 去存取共用資源,仍有可能造成危險的副作用。 * 任兩個擁有不同優先權的 RTOS task 存取同一個 mutex,可能會創造 **Priority Inversion**。 * Definition: The real trouble arises at run-time, when a medium-priority task preempts a lower-priority task using a shared resource on which the higher-priority task is pending. If the higher-priority task is otherwise ready to run, but a medium-priority task is currently running instead, a priority inversion is said to occur. [[Source]](https://barrgroup.com/Embedded-Systems/How-To/RTOS-Priority-Inversion) * 但幸運的是,Priority Inversion 的風險可以透過修改作業系統裡 mutex 的實作而減少,其中會增加 acquiring and releasing muetexs 時的 overhead。 * 當 Semaphores 被用作 signaling 時,是不會造成 Priority Inversion 的,也因此是沒有必要去跟 Mutexes 一樣去更改 Semaphores 的實作。 * This is a second important reason for having distinct APIs for these two very different RTOS primitives.