TODO: 〈並行和多執行緒程式設計〉系列講座並紀錄問題和提出改進 ## 〈[概念](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-concepts)〉 **多工處理** - 循序式程式(sequential program) - 時間的演變與程式的執行同一方向,在一特定時間,僅有一項工作正在進行。 :::info 若在執行工作期間,該任務發生錯誤,是否就會導致整個程式沒法中止? ::: - 前景/背景式程式(foreground / background program) - 具備中斷 - 須確保資料的一致性問題。例如若中斷在使用者修改控制器參數的途中到來,則可能計算出不正確的控制訊號。因此對前景(中斷處理; ISR )與背景共用的資料區必需加以保護。 - 多工程式 (multi-tasking program) - 多工程式設計、並行成式、多重程式 - TODO:https://www.geeksforgeeks.org/difference-between-multitasking-multithreading-and-multiprocessing/ - 並行程式設計,在系統中若包含多個 CPU 或不對等 (heterogeneous) 的執行單元時,必須將工作分散至各執行單元同時執行。 - 理想情況上若各子工作可以完全獨立運行,則系統將可達到其最高效率。但各執行單元間彼此仍需共用系統資料(如記憶體、I/O 等)。 | | Multiprogramming 多重程式| Multitasking 多工| Multithreading 多執行緒| Multiprocessing 多重處理| | -------- | -------- | -------- | - | - | | 定義 | 在單一 CPU 上執行多個程式 | 在單一 CPU 上執行多個工作 | 在單一工作上運行多個執行緒 | 在多個 CPU 上運行多個行程 | | 共享資源 | 資源(CPU、記憶體)在程式之間共享 | 資源(CPU、記憶體)在工作之間共享 | 資源(CPU、記憶體)在執行緒之間共享 | 每個行程都有自己一組的資源(CPU、記憶體) | | 排程 | 使用循環或基於優先權的排程為程式分配 CPU 時間 | 使用基於優先權或時間分割的調度為工作分配 CPU 時間 | 使用基於優先權或時間分割的調度為執行緒分配 CPU 時間 | 每個行程可以有自己的排程演算法 | | 記憶體管理 | 每個程式都有自己的記憶體空間 | 每個工作都有自己的記憶體空間 | 執行緒在工作內共享記憶體空間| 每個行程都有自己的記憶體空間 | | 工作切換 | 需要工作切換才能在程式之間切換 | 需要工作切換才能在工作之間切換 | 需要工作切換才能在執行緒之間切換 | 需要工作切換才能在行程之間切換 | | Inter-Process Communication (IPC) | 使用訊息傳遞或共享記憶體進行 IPC | 使用訊息傳遞或共享記憶體進行 IPC | 使用 IPC 的執行緒同步機制(例如 lock、semaphores) | 使用 IPC 的進程間通訊機制(例如 pipes、sockets) | :::info 不對等 (heterogeneous) 的執行單元,代表的是什麼意思?為何在系統中會有不對等的情況? ::: 並行程式設計對單一 CPU 來說,同一時間只執行一個任務,但利用快速切換的方式,看似一次在執行多的任務。 ![visualization](https://hackmd.io/_uploads/HJCirEYXC.png) ![visualization (1)](https://hackmd.io/_uploads/BkXhSEKXA.png) 多工程式與循序式與前景/背景式程式,有著顯著差異: 1. 系統設計者完全以工作的性質來區分程式,不再遵循程式執行順序的限制 2. 各工作佔用系統資源的情況,可進行嚴密地管理與時間的分配 3. 多工的機制較符合現實,執行主要任務的期間,伴隨著會進行其他工作一起進行。 :::info 這樣快速切換的方法,在單一 CPU 上是否會付出更多的成本,在速度上與循序式相比是否真的能更快? ::: **task/thread** 在多個 CPU 的環境中,並行處理的具體作法即是將一個循序執行的程式打散在個別 CPU 中,而每一 CPU 所執行的部份稱為一個執行緒。為了達成某個目標,可能需要執行多個任務,而每個任務被分割出多個執行緒(即多工多執行緒系統(multi-tasking multi-threading systems))。 **工作切換(context switch)** 多工作業系統核心其中一項關鍵功能是,切換工作時必須保證被停止的工作可在未來繼續執行。 當工作切換發生時,系統必須記錄切換前的下一次執行地址,當工作切換回復時, CPU 將繼續執行切換前的動作與狀態,以保證該工作擁有的資料區不受污染或破壞 (corruption)。 **排程器(Scheduler)** 用作決定 CPU 下一個要執行的任務。 經典的演算法有: - 優先順序式 (priority scheduling) 特別針對即時系統 (real-time system; RTS),優先順序式排程是相當重要的特徵,通常硬即時 (hard real-time) 的工作,其優先順序較高。 - 時間分割式 (round-robin scheduling 或 time slicing) 時間分割式的排程法,則針對重要性相同的工作,以切割 CPU 的執行時間來達到並行處理的幻覺 (illusion)。 圖(a)當某特定工作執行時間很長,將導致 CPU 的其他工作進入漫長的等待,因此時間分割法將 CPU 的執行時間做切割,各工作均能分配一定的資源與時間,其中時間切割 (time slice)的分配可以式動態的,系統可針對不同的狀況在執行時對分割做動態 (dynamic) 改變。 ![visualization (2)](https://hackmd.io/_uploads/ryoGzStXA.png) ![visualization (3)](https://hackmd.io/_uploads/BkpMzBYm0.png) **搶佔式與非搶佔式核心** 差別在於工作本身對 CPU 使用權的交出是強制達成或是自願性的。在非搶佔式的作業系統中,各工作的程式碼中必須包含交出 CPU 使用權的動作,為達並行的需求,該動作的頻率必須夠高,否則會讓使用者感受到明顯的等待。 ![visualization](https://hackmd.io/_uploads/Bk5xK3FQR.png) ![visualization (1)](https://hackmd.io/_uploads/ry3gKhKQC.png) 工作 1,2,3 的優先順序分別為 $Task_1>Task_2>Task_3$。 一開始工作 2 正在執行,當排程器(B)觸發,使得工作 1 與工作 3 排定執行。在非搶占式的作業系統中,必須等待工作 2 自動放棄 CPU 使用權時才能執行工作 1、3,因此反應速度難以估計,無法滿足許多即時系統應用的需求。在搶佔式的作業系統中,當工作 1 就緒時,工作 2 由於優先順序較低,將被迫放棄 CPU 使用權而交由工作 1 使用。由這個比較可知時間分割式的排程法 (time slicing),必須使用搶佔式的核心方可落實。 > 須將工作進行時間分割,才能落實將優先度較高的工作進行搶占。 在設計上,搶佔式的作法最大優點為反應快速,但也大幅增加實作難度,同時也必須注意程式碼的再進入性 (reentrancy) 與保護共用資料區等。 **經典的 Fork-Join 模型** 分成三部分: 1. fork:準備進入要平行化的區域時,主執行緒產生一組執行緒 ![image](https://hackmd.io/_uploads/HyUObaFQC.png) 2. parallel region: 這些執行緒合力執行給定的工作 ![image](https://hackmd.io/_uploads/Hyf5WaFQC.png) 3. join: 工作完成之後, 又回到 fork 之前, 單一執行緒的狀態 ![image](https://hackmd.io/_uploads/BJHc-6tQC.png) fork-join 模型就是不斷重複上述過程。 **Concurrency(並行) vs. Parallelism (平行)** - Concurrency 是指程式架構,將程式拆開成多個可獨立運作的工作,但不須要平行化。案例: 裝置驅動程式 - 拆開多個工作不一定要同時運行 - 多個工作在單核 CPU 上運行 - Parallelism 是指程式執行,同時執行多個程式。Concurrency 可能會用到 parallelism,但不一定要用 parallelism 才能實現 concurrency。案例: 向量內積計算 - 程式會同時執行(例如:分支後,同時執行,再收集結果) - 一個工作在多核 CPU 上運行 ![image](https://hackmd.io/_uploads/Sk_vDpF7A.png) > 向量內積可以使用迴圈的方式來完成運算(沒有 Parallelism ),也可以使用平行的方式來同時執行每個相對應位置的乘法。因此平行化只是一種選擇,並非一定。 **如何確保並行執行結果正確** ![image](https://hackmd.io/_uploads/BJyug0YQ0.png) - Split a program: 將燒書過程中,切割好各地鼠負責的工作。 - Synchronization: 確保地鼠在交接工作時,能夠有效率且正確的溝通。 ## 〈[排程器原理](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-sched#%E4%B8%A6%E8%A1%8C%E7%A8%8B%E5%BC%8F%E8%A8%AD%E8%A8%88-%E6%8E%92%E7%A8%8B%E5%99%A8%E5%8E%9F%E7%90%86)〉 **探究排程器** - 在 Linux 核心中要處理多個客戶端,想到的可能是為何不為每個客戶端建立一個 thread ? 但在 Linux 中我們建立 thread 是有限制的 ```c $ cat /proc/sys/kernel/threads-max 63704 # Threads in linux are light weight processes (LWP). $ cat /proc/sys/kernel/pid_max 131072 # Maximum number of VMAs a process can own. $ cat /proc/sys/vm/max_map_count 65530 ``` - 除了行程與執行緒的資源限制,也有 I/O 的限制 - 在 I/O 中可以將 socket 標記為 non-blocking ,在等待 I/O 的時候仍繼續尋找下個客戶端。 - 作業系統提供用作多工的 API -> select (取自數位邏輯的多工器),在眾多的連線中追蹤誰已經 ready **coroutine** - 非搶佔式的多工(non-preemptive multitasking),又稱協同式多工。 函式呼叫,主程式呼叫到 B(),結束時回到主程式 A()。 ![image](https://hackmd.io/_uploads/H1mnhZcXR.png) coroutine 還是函式呼叫的概念,但不用回到 A(),順序完全取決於期待。 ![image](https://hackmd.io/_uploads/HJdbabq7A.png) - setjmp 和 longjmp 可以看作是用於函式之間跳轉的 goto。當發生內文切換時,setjmp 會將當前的執行環境(如寄存器和堆疊指標)保存到一個 jmp_buf 結構中,並標示一個目標位置(跳躍的目的地)。隨後,longjmp 可以恢復到最近一次 setjmp() 呼叫的位置,並繼續執行該位置後的程式碼。 **coroutine 實作** [coro](https://github.com/sysprog21/concurrent-programs/tree/master/coro) : 一百多行的 code 實現簡單的協同是多工,使用 `setjmp` 與 `longjmp` 來切換任務。 **neco** :::info 課程影片提到 buffered I/O 解決 coroutine 中間狀態造成的事件 ? 我不了解 buffer 在 coroutine 中扮演的角色,及做了什麼? ::: **混合使用者和核心層級執行緒的模式** user space 與 kernel space 的數量不一樣,user space 必須自己察覺到這件事 (M : N) ![image](https://hackmd.io/_uploads/ByAHA0aXR.png) 上圖 user space 執行緒數量 -> 4 ; kernel space 執行緒數量 -> 3 - 何時會不適用 1 : 1 的機制 ex: 網頁伺服器 在網頁伺服器中,M : N 機制的 user space 先將 Thread D 、 Thread C 進行排序,相較以往 1 : 1 需要大量的執行緒與 lock 來確保資源一致。 - 為何不能將所有事情交給 kernel 只要有一個系統單元被系統呼叫或 lock 所拖延,那就無法實現到低延遲。 - futex - 用於在 user space 實現快速的互斥鎖。 - 僅有 4 個系統呼叫 - FUTEX_WAKE : 通知即將離開,要使用共享資源的準備進入 - FUTEX_WAIT : 即將進入,其他人洗洗睡 - thread pool - 在資源有限(CPU、 main memory)的情況下 - 當系統有 n 個 CPU ,建立 n 個行程 / 執行緒,分別對應到不同 CPU 上。 **應用 coroutine 到事件驅動** 目的 : coroutine 讓切換成本很小,趕緊把任務完成 - nginx ![image](https://hackmd.io/_uploads/HyRz8XyVC.png) 當事件發生,Taskqueue 會接收由 worker process 來的工作,在 Thread pool 中將任務分配給不同的 worker ,在前有提到 Thread pool 會將任務綁定給特定 CPU ,才能充分使用到 CPU 上的資源。 - [事件驅動伺服器:原理和實例](https://hackmd.io/@sysprog/event-driven-server) - Blocking I\O Model 在 kernel 等待資料就緒是件非常耗費時間的事 ![image](https://hackmd.io/_uploads/B1axYmyEA.png) - Nonblocking I/O Model 在等待的過程中,可以一直得去查看資料是就緒,這段等待的期間就能去執行其他的工作。(Nonblocking 並非完全沒有 Block ,只是在等待的期間能繼續執行其他任務) ![image](https://hackmd.io/_uploads/BJWFtX1ER.png) - Asynchronous I/O Model `aio_read` 去幫你監控任務是否完成,有個地方要注意, `aio_read` 要與主程式在不同的 CPU 上,不然可能會發生衝突。 ![image](https://hackmd.io/_uploads/HkBWpmkEC.png) - I/O Multiplexing Model 如同名子的意思,同時監控多個等待資料,當有資料完成時通知某資料已完成,就如同數位電路中的多工器,(在這不需要用到多核方式,本質上還是基於 Blocking) ![image](https://hackmd.io/_uploads/S1nfyNyV0.png) :::info 問題 在 I/O Multiplexing Model 有提到等到完成會去通知資料已完成,但為何 Blocking I\O Model 中不能實現在通知完成以前去執行其他的工作,必須使用到 Nonblocking 的作法一直去檢查是否完成 ? ::: ## 〈[執行順序](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-ordering)〉 ## 〈[Atomics 操作](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-atomics)〉 在並行程式設計中,常用 lock 避免多執行續的情況下修改同一資料,造成 race condition 情況。 - race condtion 假設兩執行緒各自將全域變數的值 +1 | Thread 1 | Thread 2 | | value | | -------- | -------- | -------- |-------- | | | | |0 | | read | | <- |0 | | increase | | |0 | | write back | | -> |1 | | | read | <- |1 | | | increase | |1 | | | write back | -> |2 | 若沒有使用 lock 或是同步機制,可能遇到下面問題 | Thread 1 | Thread 2 | | value | | -------- | -------- | -------- |-------- | | | | |0 | | read | | <- |0 | | | read | <- | | | increase | | |0 | | | increase | |0 | | write back | | -> |1 | | | write back | -> |1 | 因此結果並非預期的 2 而是 1。 **阻塞式同步機制** 當多執行緒或行程會用到共享資源時,需要阻塞 (blocking) 其他執行緒來讓某執行緒能夠符合預期的執行。 缺點: 1. 若藉由阻塞來達到同步,被阻塞的執行緒就必須等待,無法充分運用硬體能力。 2. 當持有 lock 的執行緒發生非預期的錯誤,將導致 lock 無法釋放,導致使用者認為的卡住,但處理器仍然在運作中。 3. 互斥鎖在特定情況下的代價很大,例如 priority inversion 的議題,還需要一些驗證機制以防止死鎖、priority inversion 的情況發生。 - priority inversion (優先權反轉) 考慮三種優先級的任務 : 高優先級 H、中優先級 M (不需要共享資源)、 低優先級 L 在多執行緒的情況下,H 被迫等待 L 釋放鎖,而 L 又會被 M 給搶佔,導致 L 無法及時釋放共享資源,進而導致高優先級的 H 無法即時執行。 - 解決方法: Priority Inheritance : 當 L 持有鎖時 H 正在等待,此時 L 會臨時繼承 H 的優先級,直到釋放鎖,這樣可以避免中優先級的任務佔用 CPU Priority Ceiling : 設定一個最高優先級,當任務獲取鎖時,他的優先級會臨時提升到最高優先級,以防止其他任務搶佔。 若不希望因為單一執行緒而拖延到整個進程,就必須考慮到阻塞式同步機制以外的方案。 補充: > 在 〈POSIX Thread〉 中有舉例優先權反轉的問題及解決方法 ![image](https://hackmd.io/_uploads/BJr3LzNVA.png) > 斜線面表示使用到共用資源,因此被鎖住。 上圖中可以看到當 Task 1(H) 高優先權任務進來後,因使用到共用資源,但 Task 3 (L) 還沒有釋放鎖,必須先回到 Task 3 (L) 直到鎖被釋放,此時 Task 2 (M) 可能因特定事件被喚醒( Task 2 (M) 並不會用到共用資源),因此改為執行 Task 2 (M),等 Task 2 (M) 結束時又會回到 Task 3 (L) 直到鎖釋放,才會執行 Task 1 (H)。 ![image](https://hackmd.io/_uploads/Sy70tzVER.png) 上圖為 priority inheritance (PI) 是其中一種 Priority Inversion 解法。其解法為將 Task 3 (L) 拿到鎖後的優先權變成跟 Task 1 (H) 一樣高,這樣才不會被 Task 2 (M) 搶佔,符合 H -> M -> L 的優先權順序。 **Atomic 操作:非阻塞式同步的基石** 在 c 語言實現 compare-and-swap (CAS) ```c int CAS(int *reg, int old, int new) { ATOMIC(); int tmp = *reg; if(tmp == old) *reg = new; END_ATOMIC(); return tmp; } ``` 在多執行緒下,必須使用 atomic 操作確保不會被中斷。 :::info atomic 指令是否會像 mutex 一樣有什麼成本 ? mutex 阻塞其他執行緒來確保資料的一致性,atomic 指令為何能夠達到取代 mutex 的效果? ::: TODO:測試 CAS 函式操作 **多核處理器之間的 cache coherence** 在同一時間的同一個變數在不同的 CPU 可能會有不同的值。 舉例 : 當 CPU0 剛修改記憶體某處的資料,最新的數值是先寫入到 CPU0 的 cache,等待 cache line 被標註為 invalidate,才會寫入到主記憶體,若此時另一個 CPU1 打算存取主記憶體中同一位置資料, CPU1 將存取到主記憶體中舊的資料。因此我們需要一個機制來確保 cache 在不同 CPU 之間的一致性。 在設計的概念上,使用 cache 時,確保個別 CPU 看到一致的資料。而非讓 cache 與 memory 保持同步。維護這點的協定統稱為 cache-coherence protocol。 實作 cache-coherence protocol 時,設計者需要定義 CPU 對自己 cache 的請求,及 CPU 對其他 CPU 的 cache 的請求,定義如下: - Read Hit: CPU 要讀取資料時,發現自己的 cache 中含有這份資料 - Write Hit: CPU 要寫入資料時,發現自己的 cache 中含有這份資料 - Read Miss: CPU 要讀取資料時,發現自己的 cache 中沒有這份資料 - Write Miss: CPU 要讀取資料時,發現自己的 cache 中沒有這份資料 - Probe Read: 某一 CPU 要讀取資料時,先去看其他 CPU 中的 cache 有沒有更新這份資料 - Probe Write: 某一 CPU 要寫入資料時,去看其他 CPU 中的 cache 是否存有這份資料 - Probe Read Hit: 其他 CPU 中的 cache 打算讀取這份資料,且該 CPU 含有這份資料 - Probe Write Hit: 其他 CPU 中的 cache 打算寫入這份資料,且該 CPU 含有這份資料 **MESI protocol** 從 MESI protocol 中了解 CPU 之間的資料一致的關係 | | M | E | S | I | | --- |:------------------:|:------------------:|:------------------:|:------------------:| | M | :x: | :x: | :x: | :heavy_check_mark: | | E | :x: | :x: | :x: | :heavy_check_mark: | | S | :x: | :x: | :heavy_check_mark: | :heavy_check_mark: | | I | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | - Modified (M) - 在 cache 中為最新的資料,與主記憶體的資料不一致 - 資料僅存在該 CPU 的 cache 中(獨佔) - 若將該資料從 cache 中驅逐,需寫回主記憶體 - Exclusive (E) - 資料在該 CPU 的 cache 與主記憶體的資料一致 ex: 第一次寫入 - 資料僅存在該 CPU 的 cache 中(獨佔) - 如需將該資料從 cache 中驅逐,不須寫回主記憶體 - Shared (S) - 資料在該 CPU 的 cache 與主記憶體的資料一致 - 可能存在多個 cache 中 - 如需將該資料從 cache 中驅逐,不須寫回主記憶體 - Invaild (I) - cache 中存有的這份資料是過時的 舉例說明 : 1. CPU0 對主記憶體中的 Block 0 進行讀取,因 CPU0 為第一個且第一次讀取,因此該 cacheline 狀態設定為 `Exclusive` 2. CPU1 想對 Block 0 進行讀取,因此發送 Probe read 給 CPU0,接著 CPU0 收到 Probe read hit,因 CPU0 的 cache 中有 Block 0 的資料。此時,CPU1 從 CPU0 讀取資料,並將其 cacheline 狀態設定成 `Shared` 3. CPU0 想對 Block 0 進行寫入,CPU0 含有 Block 0 的 cacheline 被設定成 Modified,他必須發 Probe Write 給 CPU1,去看 CPU1 的 cache 中是否含有 Block 0。接著 CPU1 會收到 Probe write hit,因為其 cache 中有 Block 0 的資料。此時,CPU1 含有 Block 0 的 cacheline 也會被設定成 `Invalid`,因為其資料已過時。 4. CPU1 想對 Block 0 進行讀取,他會發 Probe read 給 CPU0,接著 CPU0 會收到 Probe read hit,因為其 cache 中有 Block 0 的資料。因為這個 cacheline 的狀態是 Modified,當收到 Probe read hit 時,會將這份資料寫回主記憶體中,並將狀態改為 Shared,接著 CPU1 從 CPU0 讀取資料,並將其 cacheline 狀態設定成 `Shared` 在實作上用 3 個 bit 來表示 cacheline 的狀態 | | Valid bit | Dirty bit | Shared bit | | -------- | -------- | -------- | - | | Modified | 1 | 1 | 0 | | Exclusive | 1 | 0 | 0 | | Shared | 1 | 0 | 1 | | Invalid | 0 | X | X | **MOESI protocol** 為避免將資料寫回主記憶體, MOESI protocol 加入 Owner ,並略為修改 Shared 狀態 - Owner (O) - 其他 CPU 的 cache 可能含有這份資料 - 擁有最新的資料,指主記憶體中的不一致 - 其他 CPU 的 cache 發 probe read 時,會將資料提供給他且不會寫回 memory - Shared (S) - 其他 CPU 的 cache 必定含有同一份資料 - 在主記憶體中的這份資料和 cache 中存有的這份資料可能是一樣的 | | Valid bit | Dirty bit | Shared bit | | -------- | -------- | -------- | - | | Modified | 1 | 1 | 0 | | Owner | 1 | 1 | 1 | | Exclusive | 1 | 0 | 0 | | Shared | 1 | 0 | 1 | | Invalid | 0 | X | X | 舉例: 延續上個例子且略為修改 1. CPU1 想對 Block 0 進行讀取,他會發 Probe read 給 CPU0,接著 CPU0 會收到 Probe read hit,因為其 cache 中有 Block 0 的資料,CPU1 直接從 CPU0 讀取資料,並將其 cacheline 狀態設定成 `Shared`,而 CPU0 上的 cacheline 狀態則會從 `Modified` 變為 `Owner`。 2. CPU0 想對 Block 0 進行寫入,CPU0 含有 Block 0 的 cacheline 從 `Owner` 被設定成 `Modified`,他必須發 Probe Write 給 CPU1,去看 CPU1 的 cache 中是否含有 Block 0。接著 CPU1 會收到 Probe write hit,因為其 cache 中有 Block 0 的資料。此時,CPU1 含有 Block 0 的 cacheline 也會被設定成 `Invalid`,因為其資料已過時。 在 MESI 協議中,當一個 CPU 修改資料後,如果其他處理器需要讀取該資料,則修改過的資料必須先寫回主記憶體,然後再提供給請求者。而在 MOESI 協議中,Owner 狀態允許資料持有者直接提供資料給請求者,而不需要寫回主記憶體,這樣可以減少主記憶體的存取次數,提高系統性能。 **false sharing** ![image](https://hackmd.io/_uploads/rJXhjPfNR.png) 圖中有兩執行緒,當 CPU0 不斷的改變變數 `X` , CPU1 的 cache 該變數會變為無效,即使他並沒有修改此變量。這會導致頻繁的高速緩存無效和重新載入操作,從而降低性能。 :::info 如何做到抑制 false sharing ::: **[Memory Ordering](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-atomics#Memory-Ordering-%E5%92%8C-Barrier)** 在多核環境下,對記憶體操作的順序進行重新排列的技術。這種技術的目的是提高性能和效率,但是可能會導致一些複雜性,特別是在並行程序設計。 - In-order processors (循序) - Out-of-order processors (亂序) 討論 cache coherence 時,MESI 與 MOESI 皆須等待另個 CPU 做出回應才能夠確保資料的一致性,但付出的代價是寫入 cache 的速度較慢,讓 CPU 閒置。 ![image](https://hackmd.io/_uploads/B1qKsufVC.png) 假設 CPU0 打算寫入資料到位置 X,CPU1 的 cache 有 X 的內含值。存取延遲 (stall) 的原因如下: - CPU0 要等 CPU1 回應 invalidate ack - CPU1 的 cache 可能太忙而拖慢了回覆時間 (比方同時從 cache 大量的讀寫資料,或是短時間收到大量 invalidate ack)。 減少 CPU 閒置方法: 1. store buffer :::info 在範例中我並不能理解 store buffer 是如何縮短 CPU 的閒置時間? ::: 2. invalidate :::info ::: TODO: 硬體實作策略 TODO: 軟體觀點 TODO: 易於誤用 `volatile` 關鍵字 TODO: 處理器架構和其 Memory Order ## 〈[POSIX Thread](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fposix-threads)〉 ### Thread Management **創造執行緒 `pthread_create`** 建立一個新執行緒並使其可執行。 ```c pthread_create(thread, attr, start_routine, arg) ``` 這裡的輸入參數需要注意: - thrad : 指向執行緒名稱的指針 - attr : 設置執行緒的屬性,設 `NULL` 表示為預設屬性 - start_routine : 執行緒創建後將執行的 C routine - arg : 傳遞給 `start_routine` 的單一參數。這個函數必須接受一個 `void *` 類型的參數,並返回一個 `void *` 類型的結果。。如果不傳遞任何參數,則可以使用 `NULL` 。 **中止執行緒與 `pthread_exit(status)`** 執行緒可以通過多種方式終止: 1. 執行緒正常返回其起始例程,表示其工作已完成 2. 執行緒調用 `pthread_exit` 子例程,不論其工作是否完成 3. 執行緒被另一個執行緒透過 `pthread_cancel` 例程取消 4. 整個行程 (process) 終止,例如調用 `exec()` 或` exit()` 5. 如果 `main()` 先完成,無需 `pthread_exit` 明確呼叫自身 - 在線程函數中調用 `pthread_exit` 執行緒在完成其工作後可以調用 `pthread_exit` 終止自己,並返回結果給其他執行緒。 - 在 main 函數中調用 `pthread_exit` - 如果主執行緒(通常是 main 函數所屬的執行緒)在其他執行緒結束之前終止,則整個行程會終止,並且所有執行緒都會被強制終止。 - 調用 `pthread_exit` 可以使主執行緒結束執行,而不會終止整個行程,允許其他執行緒繼續運行直到它們自己終止。 實作: > 參照 [POSIX Threads Programming ](https://hpc-tutorials.llnl.gov/posix/) ```c #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREAD 4 void *print(void *thread_id) { long tid; tid = (long)thread_id; printf("Hello World!, By thread #%ld\n", tid); pthread_exit(NULL); } int main(int argc, char *argv[]) { // Create NUM_THREADS threads pthread_t threads[NUM_THREAD]; // Receive the return value of the pthread_create function int rc; long t; for(t = 0; t < NUM_THREAD; t++){ printf("Create thread %ld\n",t); rc = pthread_create(&threads[t], NULL, print, (void *)t); // If pthread_create successfully creates a thread, the return value rc will be 0. if (rc){ printf("ERROR\n"); exit(-1); } } // Prevent the whole process from terminating due to main() ending first. pthread_exit(NULL); } ``` 結果: ``` Create thread 0 Create thread 1 Create thread 2 Hello World!, By thread #0 Create thread 3 Hello World!, By thread #1 Hello World!, By thread #2 Hello World!, By thread #3 ``` **連接和分離線程** join 是實現執行緒之間"同步"的一種方法 ![image](https://hackmd.io/_uploads/B1HIysO4C.png) - Joinable or Not? - 在建立執行緒時,其屬性之一的定義是可連接或是可分離的。只有創建可連接的執行緒時該執行緒才能進行連接。如過執行緒被創建為分離的,則它永遠不能連接。 - 若要明確建立可連接或可分離的執行緒,需使用到 `attr` 的參數 - pthread_create() 的 4 步驟 : - `pthread_attr_t` 聲明資料類型的 pthread 屬性變數 - 初始化屬性變數`pthread_attr_init()` - 設定屬性分離狀態`pthread_attr_setdetachstate()` - 完成後,釋放屬性`pthread_attr_destroy()` ::: info 若明確知道一執行緒永遠不需與另一個執行緒連接請考慮以分離狀態建立它,因為這可能會減少開銷。 ::: :::info 我該從何得知預設屬性為何? 例如,在上個範例中 `pthread_create(thread, attr, start_routine, arg)` attr 設為 NULL 時即為預設屬性,但我並不知道預設屬性是? ::: 實作可加入的執行緒: > 參考 [Joining and Detaching Threads](https://hpc-tutorials.llnl.gov/posix/joining_and_detaching/) > 將上個範例改寫成可加入執行緒的情況 ```c #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 3 void *print(void *thread_id) { long tid = (long) thread_id; printf("Hello World!, By thread #%ld\n", tid); pthread_exit((void *) tid); } int main(int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; /* Declare a pthread attribute variable of the pthread_attr_t data type */ pthread_attr_t attr; int rc; long t; void *status; /* Initialize and set thread detached attribute */ /* PTHREAD_CREATE_JOINABLE is joinable */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (t = 0; t < NUM_THREADS; t++) { printf("Creating thread %ld\n", t); rc = pthread_create(&threads[t], &attr, print, (void *)t); if (rc) { printf("ERROR\n"); exit(-1); } } /* Free attribute and wait for the other threads */ pthread_attr_destroy(&attr); for (t = 0; t < NUM_THREADS; t++) { rc = pthread_join(threads[t], &status); if (rc) { printf("ERROR\n"); exit(-1); } printf("Completed join with thread %ld having status %ld\n", t, (long)status); } printf("Main thread completed.\n"); pthread_exit(NULL); } ``` 在 `print` 函式中的 `pthread_exit((void *) tid);` 返回一個狀態值 (status) 給其他等待這個執行緒的執行緒。 結果 ``` Creating thread 0 Creating thread 1 Hello World!, By thread #0 Creating thread 2 Hello World!, By thread #1 Hello World!, By thread #2 Completed join with thread 0 having status 0 Completed join with thread 1 having status 1 Completed join with thread 2 having status 2 Main thread completed. ``` **Stack Management** 學習如何正確地設置和獲取線程的 Stack 大小,以適應需要大量堆棧空間的任務。可以避免 Stack 溢出,並確保線程能夠正確執行。 > [Stack Management](https://hpc-tutorials.llnl.gov/posix/stack_management/) 中的範例用多個執行緒並進行矩陣計算。 ```c #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NTHREADS 4 #define N 1000 #define MEGEXTRA 1000000 pthread_attr_t attr; void *dowork(void *threadid) { double A[N][N]; int i, j; long tid; size_t mystacksize; tid = (long)threadid; pthread_attr_getstacksize(&attr, &mystacksize); printf("Thread %ld: stack size = %li bytes \n", tid, mystacksize); for (i = 0; i < N; i++) { for (j = 0; j < N; j++) { A[i][j] = ((i * j) / 3.452) + (N - i); } } pthread_exit(NULL); } int main(int argc, char *argv[]) { pthread_t threads[NTHREADS]; size_t stacksize; int rc; long t; pthread_attr_init(&attr); pthread_attr_getstacksize(&attr, &stacksize); printf("Default stack size = %li\n", stacksize); stacksize = sizeof(double)*N*N+MEGEXTRA; printf("Amount of stack needed per thread = %li\n", stacksize); pthread_attr_setstacksize (&attr, stacksize); printf("Creating threads with stack size = %li bytes\n", stacksize); for(t=0; t<NTHREADS; t++){ rc = pthread_create(&threads[t], &attr, dowork, (void *)t); if (rc){ printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); } } printf("Created %ld threads.\n", t); pthread_exit(NULL); } ``` - `pthread_attr_getstacksize(attr, stacksize)` 獲取當前 Stack 大小 - `pthread_attr_setstacksize(attr, stacksize)`設置 Stack 大小 結果 ``` Default stack size = 8388608 Amount of stack needed per thread = 9000000 Creating threads with stack size = 9000000 bytes Created 4 threads. Thread 1: stack size = 9000000 bytes Thread 0: stack size = 9000000 bytes Thread 3: stack size = 9000000 bytes Thread 2: stack size = 9000000 bytes ``` ### Synchronizing Threads 3 basic synchronization primitives: 1. 互斥鎖 (Mutex) 2. 條件變數 (Condition Variable) 3. 信號量 (Semaphore) **互斥鎖 (Mutex)** 創建互斥鎖 - 在使用前必須先初始化互斥變數 ```c //Statically pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; //Dinamically, using pthread_mutex_init(). pthread_mutex_t mymutex; pthread_mutex_init(&mymutex, NULL); ``` 銷毀互斥鎖 `pthread_mutex_destroy`: 銷毀不再需要互斥的變數 ```c int pthread_mutex_destroy(pthread_mutex_t *mutex); ``` 鎖定和解鎖互斥鎖 `pthread_mutex_lock`: 執行緒使用此函數來取得互斥變數的鎖。如果該互斥鎖已經被其他執行緒鎖定,則此調用會阻塞當前執行緒直到互斥鎖被解鎖。 ```c int pthread_mutex_lock(pthread_mutex_t *mutex); ``` `pthread_mutex_trylock`: 嘗試取得互斥鎖。如果互斥鎖已經被鎖定,則此函數會立即返回並且返回一個“忙碌”錯誤代碼。 ```c int pthread_mutex_trylock(pthread_mutex_t *mutex); ``` `pthread_mutex_unlock`: 解鎖一個由當前執行緒擁有的互斥鎖。若出現以下情況,則回傳錯誤: 1. 互斥鎖已被解鎖 2. 互斥鎖被其他執行緒擁有 ```c int pthread_mutex_unlock(pthread_mutex_t *mutex); ``` 案例 : [實作兩個向量的乘積和 (dot product)](https://hpc-tutorials.llnl.gov/posix/example_using_mutexes/) 結果: 每個執行緒計算部分向量的點積,並使用互斥鎖保護全域變數的更新。 ``` Now it is thread 0 , Sum = 100.000000 Now it is thread 1 , Sum = 200.000000 Now it is thread 2 , Sum = 300.000000 Now it is thread 3 , Sum = 400.000000 Sum = 400.000000 ``` 在案例中將多的參數定義在結構體當中 ```c typedef struct { double *a; double *b; double sum; int veclen; } DOTDATA; ``` - 指標指向兩個向量 `a` 、 `b` - 內積總和 `sum` - 向量長度 `veclen` 在 `dotprod` 函式中進行向量的計算,當中使用互斥鎖 `mutexsum` 來保護 `dotstr.sum` 的更新,以避免 race condition 。 ```c pthread_mutex_lock(&mutexsum); dotstr.sum += mysum; pthread_mutex_unlock(&mutexsum); ``` 在主函式中要注意的是主執行緒必須等待其他執行緒確實完成才能繼續, `pthread_join` 暫時阻塞主執行緒,等待所有執行緒完成繼續。 ```c /* Wait on the other threads */ for(i = 0; i < NUMTHRDS; i++) { pthread_join(callThd[i], &status); printf("Thread %ld completed with status %ld\n", i, (long)status); } ``` 其他 API - `pthread_self()` 取得呼叫執行緒ID - `pthread_equal(thread1, thread2)` 比較兩個線程 ID。如果兩個 ID 不同,則傳回 0,否則傳回非零值。 **條件變量 (Condition Variable)** 條件變數允許執行緒阻塞,直到特定條件成立,阻塞的執行緒進入一個等待的佇列,等到條件成立時,其他執行緒會向阻塞的執行緒發出信號。 `pthread_cond_init` 用來初始化一個條件變數 `cond` , `attr` 用來指定條件變數的屬性。如果 `attr` 是 `NULL` ,則使用預設。 ```c int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); ``` `pthread_cond_destroy` 顧名思義,消除條件變數 `cond` 。 ```c int pthread_cond_destroy(pthread_cond_t *cond); ``` `pthread_cond_wait` ```c int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); ``` 阻塞當前線程,直到條件變數 cond 被信號喚醒。等待時會自動釋放互斥鎖 mutex,被喚醒後會重新獲取互斥鎖。 `pthread_cond_signal` 喚醒等待條件變數 `cond` 的一個執行緒。 ```c int pthread_cond_signal(pthread_cond_t *cond); ``` `pthread_cond_broadcast` 喚醒等待條件變數 `cond` 的所有執行緒。 ```c int pthread_cond_broadcast(pthread_cond_t *cond); ``` 範例 [Using Condition Variables](https://hpc-tutorials.llnl.gov/posix/example_using_cond_vars/) - 主函式建立三個執行緒 - 其中兩個執行緒執行工作並更新 `count` 變數 - 第三個執行緒等待,直到計數變數達到指定值。 結果 ``` Starting watch_count(): thread 1 inc_count(): thread 2, count = 1 -- unlocking mutex watch_count(): thread 1 Count= 1. Going into wait... inc_count(): thread 3, count = 2 -- unlocking mutex inc_count(): thread 2, count = 3 -- unlocking mutex inc_count(): thread 3, count = 4 -- unlocking mutex inc_count(): thread 2, count = 5 -- unlocking mutex inc_count(): thread 3, count = 6 -- unlocking mutex inc_count(): thread 2, count = 7 -- unlocking mutex inc_count(): thread 3, count = 8 -- unlocking mutex inc_count(): thread 2, count = 9 -- unlocking mutex inc_count(): thread 3, count = 10 -- unlocking mutex inc_count(): thread 2, count = 11 -- unlocking mutex inc_count(): thread 3, count = 12 -- threshold reached.Just sent signal. inc_count(): thread 3, count = 12 -- unlocking mutex watch_count(): thread 1 Condition signal received. Count= 12 watch_count(): thread 1 Updating the value of count... watch_count(): thread 1 count now = 137. watch_count(): thread 1 Unlocking mutex. inc_count(): thread 2, count = 138 -- unlocking mutex inc_count(): thread 3, count = 139 -- unlocking mutex inc_count(): thread 2, count = 140 -- unlocking mutex inc_count(): thread 3, count = 141 -- unlocking mutex inc_count(): thread 2, count = 142 -- unlocking mutex inc_count(): thread 3, count = 143 -- unlocking mutex inc_count(): thread 2, count = 144 -- unlocking mutex inc_count(): thread 3, count = 145 -- unlocking mutex Main(): Waited and joined with 3 threads. Final value of count = 145. Done. ``` 流程: - 首先定義 thread 1 用作監視 count 值, thread 2、3 用作更新 count 值 - 函式 `watch_count` 一開始對 count 進行上鎖,但 `count < COUNT_LIMIT` ,因此進入等待並釋放鎖,等到條件變數達成又會自動獲取鎖 (`pthread_cond_wait(&count_threshold_cv, &count_mutex);`) ```c while (count < COUNT_LIMIT) { printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id,count); pthread_cond_wait(&count_threshold_cv, &count_mutex); printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,count); } ``` - 執行緒 2、3 分別在函式 `inc_count` 中更新 count 的值,但因為有 mutex 的存在,當前執行緒更新 count 時阻塞另個執行緒,一直到釋放鎖,來防止 race condition。 - 當 `count == COUNT_LIMIT` 喚醒正在等待的執行緒 1 ,並將 count 加上 125 ```c if (count == COUNT_LIMIT) { printf("inc_count(): thread %ld, count = %d -- threshold reached.", my_id, count); pthread_cond_signal(&count_threshold_cv); printf("Just sent signal.\n"); } ``` Overview - 條件變數為執行緒同步提供了另一種方式。 mutex 透過控制執行緒對資料的存取來實現同步,而條件變數允許執行緒根據資料的實際值進行同步。 - 如果沒有條件變數,程式設計需要讓執行緒不斷輪詢(可能在關鍵部分),以檢查條件是否滿足。這可能會非常消耗資源,因為執行緒在此活動中會持續忙碌。條件變數是一種無需輪詢即可實現相同目標的方法。 - 條件變數必須與互斥鎖結合使用。 下面顯示了使用條件變數的代表性順序。 | Main Thread | | | -------- | -------- | | Declare and initialize global data/variables which require synchronization (such as "count") Declare and initialize a condition variable object Declare and initialize an associated mutex Create threads A and B to do work | | |**Thread A**| **Thread B**| | Do work up to the point where a certain condition must occur (such as "count" must reach a specified value)|Do work| Lock associated mutex and check value of a global variable|Lock associated mutex| Call `pthread_cond_wait()` to perform a blocking wait for signal from Thread-B. Note that a call to `pthread_cond_wait()` automatically and atomically unlocks the associated mutex variable so that it can be used by Thread-B.|Change the value of the global variable that Thread-A is waiting upon.| When signalled, wake up. Mutex is automatically and atomically locked.|Check value of the global Thread-A wait variable. If it fulfills the desired condition, signal Thread-A.| Explicitly unlock mutex|Unlock mutex.| Continue |Continue| |**Main Thread**|| |Join / Continue|| **semaphores** ```c sem_t semaphore; ``` 定義訊號量變數 ```c int sem_init(sem_t *sem, int pshared, unsigned int value); ``` - `sem` 指向要初始化的訊號量 - `pshared` :若設置為非 0 ,則訊號量在多個行程間共享,若設置為 0 ,則訊號量僅在單一行程內的多執行緒間使用 - `value` 訊號量的初始值 - 若成功返回 `0` ,反之失敗為 `-1` ```c int sem_wait(sem_t *sem); ``` - 若訊號量的值大於零,則減少該值並立即返回;若訊號量的值為零,則阻塞該執行緒直到訊號量的值變為正數。 ```c int sem_post(sem_t *sem); ``` - 這將增加訊號量的值,若有任何執行緒因等待信號量而被阻塞,則將其中一個執行續被喚醒。 - 不能保證是哪條執行緒被喚醒 實作: ```c #include <stdio.h> #include <pthread.h> #include <semaphore.h> #include <unistd.h> sem_t mutex; void* thread(void* arg) { //wait sem_wait(&mutex); printf("\nEntered..\n"); //critical section sleep(1); //signal printf("\nJust Exiting...\n"); sem_post(&mutex); } int main() { sem_init(&mutex, 0, 1); pthread_t t1,t2; pthread_create(&t1,NULL,thread,NULL); pthread_create(&t2,NULL,thread,NULL); pthread_join(t1,NULL); pthread_join(t2,NULL); sem_destroy(&mutex); return 0; } ``` ## [〈實作輕量級的 Mutex Lock〉](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-mutex) ### Futex 全名 Fast Userspace Mutex ,Futex 可用於實作使用者空間之同步機制。 由於在 Linux 上 futex 沒有直接的 C 語言函式介面,因此需藉由 `syscall` 來進行。 ```c long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec *timeout, /* or: uint32_t val2 */ uint32_t *uaddr2, uint32_t val3); ``` ### 實作 Mutex lock 在了解 mutex_trylock 之前必須先看 [mutex/atomic.h](https://github.com/sysprog21/concurrent-programs/blob/master/mutex/atomic.h) ```c #define load(obj, order) atomic_load_explicit(obj, memory_order_##order) #define fetch_or(obj, arg, order) \ atomic_fetch_or_explicit(obj, arg, memory_order_##order) #define thread_fence(obj, order) atomic_thread_fence(memory_order_##order #define exchange(obj, value, order) \ atomic_exchange_explicit(obj, value, memory_order_##order) ``` `mutex_trylock` ```c static bool mutex_trylock(mutex_t *mutex) { int state = load(&mutex->state, relaxed); if (state & MUTEX_LOCKED) return false; state = fetch_or(&mutex->state, MUTEX_LOCKED, relaxed); if (state & MUTEX_LOCKED) return false; thread_fence(&mutex->state, acquire); return true; } ``` - 使用 `atomic_load_explicit` 以 relaxed 記憶體順序從 mutex->state 讀取當前狀態,若被上鎖返回 `false` 。 - 使用 `atomic_fetch_or_explicit ` 將 `MUTEX_LOCK` bitflag 設置在 mutex->state 上,並獲取此操作之前的 state ,如果此狀態包含 `MUTEX_LOCKED`,則說明在嘗試鎖定時鎖已被其他執行緒持有,返回 false 。 - 使用 `atomic_thread_fence` 以 acquire 記憶體順序設置屏障,確保此操作之後的讀寫操作不會被重排序到此操作之前。 `memory_order_relaxed` : 對其他讀取或寫入沒有同步或排序約束 `memory_order_acquire` : 在此載入之前,當前執行緒中的任何讀取或寫入都不能重新排序 `mutex_lock` ```c static inline void mutex_lock(mutex_t *mutex) { #define MUTEX_SPINS 128 for (int i = 0; i < MUTEX_SPINS; ++i) { if (mutex_trylock(mutex)) return; spin_hint(); } int state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); while (state & MUTEX_LOCKED) { futex_wait(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING); state = exchange(&mutex->state, MUTEX_LOCKED | MUTEX_SLEEPING, relaxed); } thread_fence(&mutex->state, acquire); } ``` - 嘗試使用 `mutex_trylock` 來獲取鎖,若在 `for` 迴圈的這段時間都得不到 lock ,改藉由 `atomic_exchange_explicit` 將 state 更改成 `MUTEX_LOCKED | MUTEX_SLEEPING` 的方式來持有,並返回舊的 state - `MUTEX_LOCKED | MUTEX_SLEEPING` 是什麼? 其中額外的 MUTEX_SLEEPING 表示執行緒不僅要試著 lock mutex,且若它未能即時得到,會用 futex_wait 把自己加入到 wait queue 中。因此日後要 unlock 時就需要辨別該 flag 以 futex_wake 才能正確喚醒之。 - 若進入 `while (state & MUTEX_LOCKED)` 迴圈表示鎖已被其他執行緒所持有,並使用 `futex_wait` 進入睡眠,等待鎖釋放,當被喚醒後再次嘗試設置狀態並檢查 state 是否還被其他執行緒持有鎖。 :::info 在 `for` 迴圈中的 `spin_hint` 有何作用? ```c #if defined(__i386__) || defined(__x86_64__) #define spin_hint() __builtin_ia32_pause() #elif defined(__aarch64__) #define spin_hint() __asm__ __volatile__("isb\n") #else #define spin_hint() ((void) 0) #endif ``` 在同步機制中,經常需要以 spin-wait loop 方式檢測 lock 的狀態,以嘗試持有 lock。但這些頻繁的檢測將導致 pipeline 上都是對 lock 狀態的讀操作,那麼當另一個執行緒對 lock 進行寫操作時,因為相依關係 pipeline 就必須重排,這導致性能的下降和更多的耗電。 在其他執行緒寫入後使用 PAUSE 指令,使自旋鎖稍微延遲等待 pipeline 重整,再繼續以 spin-wait loop 方式檢測 lock 的狀態。(像是計算機結構中加入 `NOP`) ![image](https://hackmd.io/_uploads/Bk2Xo73NR.png) ::: ### 實作 Condition variable ### 測試和驗證 模擬情境: 給定 16 個工作節點 $n_0$~$n_{16}$ ,他們彼此間具備從屬關係的: $n_k$ 的親代節點是 $n_{k-1}$ 。每個節點需要都等待親代節點就緒 (ready) 方可進行下一階段的工作 ( $n_0$ 沒有親代節點,因此無此限制)。 16 個節點共享一個 `clock`,`clock` 從 tick = 1 開始累計,節點在每個 tick 都有必須完成的事:在偶數 tick 時想像成是完成階段任務,可改變自身狀態為就緒並通知其子節點繼續後續任務,而在奇數 tick 時則推動時間讓 tick += 1。 結構體 ```c struct clock { mutex_t mutex; cond_t cond; int ticks; }; struct node { struct clock *clock; // Each node points to a shared clock struct node *parent; // Point to the parent node mutex_t mutex; cond_t cond; bool ready; // Proceed to the next node only when ready is true }; ``` - `clock` 中的 `ticks` 決定是否繼續下一步驟,同步上使用 condition variable + mutex 機制去通知此事件,且通知應該是用 broadcast 方式讓所有正在等待的執行緒得知 > 在 POSIX_Thread 中 condition variable 的實作中因只有一個執行緒在等待,使用 `pthread_cond_signal` - `node` 中 `ready` 主要讓結點能進行下個工作,因此當親代節點就緒時,主動通知此事。同樣需要 condition variable + mutex 機制,但以 signal 方式通知子節點就好。 **`thread_func(void *ptr)`** ```c static void *thread_func(void *ptr) { struct node *self = ptr; bool bit = false; for (int i = 1; clock_wait(self->clock, i); ++i) { printf("Thread [%c] | i : %d\n", self->name, i); if (self->parent){ printf("Thread [%c] wait parent\n", self->name); node_wait(self->parent); } if (bit) { printf("Thread [%c] send signal\n", self->name); node_signal(self); } else { printf("Thread [%c] trigger clock\n", self->name); clock_tick(self->clock); } bit = !bit; } node_signal(self); return NULL; } ``` **`main(void)`** ```c int main(void) { struct clock clock; clock_init(&clock); #define N_NODES 3 struct node nodes[N_NODES]; node_init(&clock, NULL, &nodes[0]); for (int i = 1; i < N_NODES; ++i) node_init(&clock, &nodes[i - 1], &nodes[i]); printf("\nParent releationship : NULL "); for(int i = 0; i < N_NODES; ++i) printf(" -> [%c] ", nodes[i].name); printf("\n"); pthread_t threads[N_NODES]; for (int i = 0; i < N_NODES; ++i) { if (pthread_create(&threads[i], NULL, thread_func, &nodes[i]) != 0) return EXIT_FAILURE; } printf("Tick start~\n"); clock_tick(&clock); clock_wait(&clock, 1 << N_NODES); clock_stop(&clock); printf("\nTick stop~\n"); for (int i = 0; i < N_NODES; ++i) { if (pthread_join(threads[i], NULL) != 0) return EXIT_FAILURE; } return EXIT_SUCCESS; } ``` `main` 執行緒呼叫第一個 `clock_tick` 來讓 `tick` 變為 `1`,這樣其他執行緒就可以開始根據 `tick` 逐步進行。而這裡的 `clock_wait` 會一直等待 `tick` 到 `1 << N_NODES` 之後再用 `clock_stop` 來讓節點的執行緒得以結束。 稍微改寫 [main.c](https://github.com/jeremy90307/Posix_thread/blob/master/mutex/example/main.c) 使其更容易了解每個執行緒的進行過程,以下為輸出結果: :::info 在執行程式時會出現下面錯誤,想請問為何會有這樣的問題? ``` FATAL: ThreadSanitizer: unexpected memory mapping 0x5c9bd4d2b000-0x5c9bd4d4b000 ``` 輸入 `sudo sysctl vm.mmap_rnd_bits=28` 來解決問題 > 參考 [FATAL: ThreadSanitizer: unexpected memory mapping when running on Linux Kernels 6.6+](https://stackoverflow.com/questions/77850769/fatal-threadsanitizer-unexpected-memory-mapping-when-running-on-linux-kernels) ::: ``` Parent releationship : NULL -> [A] -> [B] -> [C] Tick start~ ============clock_tick() tick : 1============ Thread [C] | i : 1 Thread [C] wait parent Thread [B] | i : 1 Thread [B] wait parent Thread [A] | i : 1 Thread [A] trigger clock ============clock_tick() tick : 2============ Thread [A] | i : 2 Thread [A] send signal Thread [A] becomes not ready. Thread [B] trigger clock ============clock_tick() tick : 3============ Thread [B] | i : 2 Thread [B] wait parent Thread [A] | i : 3 Thread [A] trigger clock ============clock_tick() tick : 4============ Thread [A] | i : 4 Thread [A] send signal Thread [A] becomes not ready. Thread [B] send signal Thread [B] | i : 3 Thread [B] becomes not ready. Thread [C] trigger clock ============clock_tick() tick : 5============ Thread [B] wait parent Thread [A] | i : 5 Thread [A] trigger clock Thread [C] | i : 2 Thread [C] wait parent ============clock_tick() tick : 6============ Thread [A] | i : 6 Thread [A] send signal Thread [A] becomes not ready. Thread [B] trigger clock ============clock_tick() tick : 7============ Thread [B] | i : 4 Thread [B] wait parent Thread [A] | i : 7 Thread [A] trigger clock ============clock_tick() tick : 8============ Thread [A] | i : 8 Thread [A] send signal Tick stop~ Thread [A] becomes not ready. Thread [B] send signal Thread [B] becomes not ready. Thread [C] send signal ``` - 一開始 main 執行緒 `clock_tick` 先將 tick 增加 1 接著進入 `clock_wait` 中進行等待,一直到 `clock->ticks` 等於 $2^{N_{NODES}}$ - 假設 $N_{NODES}$ 為 3 ,分別有 `Thread [A]`、`Thread [B]`、`Thread [C]` 來執行這三個 node ,且他們之間的關係為 `NULL -> [A] -> [B] -> [C] ` - 接著每個 node 進入 `thread_func` ,且預設的 `bit` 為 `false` ,每次迭代都會進行反轉,使其執行不同工作內容 - Thread [A] 因沒有親代節點關係,跳過等待親代節點 ready 的過程,也因此在每次 tick 增加後都能看 Thread [A] 不斷的執行 ``` ============clock_tick() tick : 2============ Thread [A] | i : 2 Thread [A] send signal Thread [A] becomes not ready. Thread [B] trigger clock ``` 在 tick : 2 中 - `Thread [A]` 發送節點的訊號喚醒正在等待的 `Thread [B]` (喚醒時 `Thread [A]` 轉變為 ready 因此 `Thread [B]` 才能執行工作,此時 `Thread [A]` 交棒給 `Thread [B]` 後再次轉變為 not ready) - 此時的 `Thread [B]` 因第一次執行工作 `bit = false` ,所以進行 tick 加一的工作 ``` ============clock_tick() tick : 3============ Thread [B] | i : 2 Thread [B] wait parent Thread [A] | i : 3 Thread [A] trigger clock ``` 在 tick : 3 中 - `Thread [A]` 尚未 ready ,因此 `Thread [B]` 再次進入等待親代節點的狀態 - `Thread [A]` 不受親代節點影響繼續執行工作 ``` ============clock_tick() tick : 4============ Thread [A] | i : 4 Thread [A] send signal Thread [A] becomes not ready. Thread [B] send signal Thread [B] | i : 3 Thread [B] becomes not ready. Thread [C] trigger clock ``` 在 tick : 4 中 - `Thread [A]` 發送節點的訊號喚醒正在等待的 `Thread [B]` (喚醒時 `Thread [A]` 轉變為 ready 因此 `Thread [B]` 才能執行工作,此時 `Thread [A]` 交棒給 `Thread [B]` 後再次轉變為 not ready) - `Thread [B]` 因為 `bit` 的關係工作內容變成發送節點訊號喚醒正在等待的 `Thread [C]` 後續都是相似的步驟,因此不再贅述 最後在 tick : 8 中, `clock->ticks` 等於 $2^{N_{NODES}}$ 喚醒 main 執行緒,接著進入 `clock_stop(&clock)` 將 ticks 的值設為 `-1`,導致在執行緒函式中 `for` 迴圈中止。 :::info 為何在 `thread_func` 最後需要在進行 `node_signal(self)` ? ::: ### 實作 Priority-inheritance mutex 在 [mutex/pi-test/main.c](https://github.com/sysprog21/concurrent-programs/blob/master/mutex/pi-test/main.c) 函式中主要建立一個 Priority inversion 的情況 定義一個具集 `TRY(f)` 用來檢視錯誤,其中 `#f` 將參數 `f` 轉換為字符串,這樣可以在錯誤訊息中顯示出具體是哪個函式失敗。 ```c #define TRY(f) \ do { \ int __r; \ if ((__r = (f != 0))) { \ fprintf(stderr, "Run function %s = %d\n", #f, __r); \ return __r; \ } \ } while (0) ``` `ctx_init` 函式中的 `mutexattr_setprotocol` 設置了優先級繼承,這樣可以避免優先級反轉。 ```c static void ctx_init(struct ctx *ctx) { mutexattr_t mattr; mutexattr_setprotocol(&mattr, PRIO_INHERIT); mutex_init(&ctx->m0, &mattr); } ``` ```c #include <pthread.h> int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol); ``` `attr` : 指向互斥鎖屬性的指標 `protocol` : 協議屬性 - `PTHREAD_PRIO_INHERIT` : 使用優先級繼承 - `PTHREAD_PRIO_PROTECT` : 使用優先級上限 - `PTHREAD_PRIO_NONE` : 不使用優先級繼承或上限的策略 **執行** - 必須透過 `sudo` 執行,因為牽涉對執行緒優先權的調整,需要 root 權限 - 必須使用 taskset 將程式鎖定在單一 CPU 上,否則多 CPU 資源的情況下,就沒辦法讓 middle 佔據唯一的 CPU。 ``` sudo taskset -c 1 ``` :::info 問題 1 在研讀的過程中 `pthread_create_with_prio` 函式主要用作創建一個具有優先及的執行緒,當中 `sched_param` 的結構體又是從何而來?在完整程式碼中看起來沒有引用到排程器相關標頭檔。 部份程式碼 ```c static int pthread_create_with_prio(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg, int prio) { struct sched_param param; param.sched_priority = prio; TRY(pthread_attr_setschedparam(attr, &param)); TRY(pthread_create(thread, attr, start_routine, arg)); return 0; } ``` ::: 在 [pthread_setschedparam(3) — Linux manual page](https://www.man7.org/linux/man-pages/man3/pthread_setschedparam.3.html) 找到 `sched_param` 結構體 ```c struct sched_param { int sched_priority; /* Scheduling priority */ }; ``` :::info `pthread_attr_setschedpolicy(&attr, SCHED_FIFO)` `pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)` 理解這兩 API 代表意思 ::: ```c int pthread_attr_setinheritsched(pthread_attr_t *attr,int inheritsched); ``` `attr` : 指向執行緒屬性的指標 `inheritsched` : 是否繼承負執行緒的排程屬性 - `PTHREAD_EXPLICIT_SCHED` : 不繼承,新執行緒使用由 `pthread_attr_setschedparam` 和 `pthread_attr_setschedpolicy` 設置的排程屬性,執行緒 - `PTHREAD_INHERIT_SCHED` : 繼承父執行緒的排程屬性 返回值:成功返回 `0` ,失敗返回非 0 值 ```c int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy); ``` `attr` : 指向執行緒屬性的指標 `policy` : 排程屬性 - `SCHED_FIFO` : 搶佔式排程,即會被優先即更高的執行緒搶佔 - `SCHED_RR` : 輪詢式排程,擁有同樣優先權的執行緒會以一固定時間量,又稱「時間配量」(time slice),用輪詢方式執行。 - `SCHED_OTHER` : 為 Linux 中默認。僅用於靜態優先級為 0 的執行緒(i.e., threads under real-time policies always have priority over `SCHED_OTHER` processes),從靜態優先級 0 列表中選擇要運行的執行緒,由動態優先級決定(即 nice 值)。 每個時間段中,當執行緒準備好運行但被排程器拒絕時,動態優先級會增加。這確保了所有 `SCHED_OTHER` 執行緒之間的公平進展。 返回值:成功返回 `0` ,失敗返回非 0 值 > [sched(7) — Linux manual page](https://www.man7.org/linux/man-pages/man7/sched.7.html) ```c int pthread_attr_setschedparam(pthread_attr_t *restrict attr, const struct sched_param *restrict param); ``` 返回值:成功返回 `0` ,失敗返回非 0 的值 :::info 問題 2 請問若沒有給定優先權的級別,系統的判定如何決定下個執行的執行緒? ::: TODO : 理解 nice 值、CFS **測試** 完整測試程式 -> [mutex/test_priority](https://github.com/jeremy90307/Posix_thread/tree/master/mutex/test_priority) 基於問題 3 進行相關測試。設定三個執行緒,Thread 1 與 Thread 2 將優先級設置在 1 (其中優先級設置範圍 1 ~ 99),Thread 使用預設屬性(即 `SCHED_OTHER`),並進行兩種排程屬性:`SCHED_FIFO`、`SCHED_RR` 測試 - `SCHED_FIFO` ``` $ make run sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 ``` 在搶佔式的排程中,Thread 1 並沒有讓出資源給 Thread 2 執行。 :::info 問題 3 當這種靜態優先級與動態優先級同時出現時,如何決定動態優先級的執行緒何時能執行?在 `SCHED_FIFO` 的結果中看到 Thread 3 並沒有成功執行。 ::: - `SCHED_RR` ``` $ make run sudo taskset -c 1 /home/jeremy/posix_thread/mutex/test_priority/test_priority 1 1 1 1 1 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 ``` 可以看到 Thread 1 與 Thread 2 將資源的平均分配給不同執行緒,Thread 3 的非即時執行緒根據動態優先級偶爾也能分配到資源。 ## [〈建立相容於 POSIX Thread 的實作〉](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-thread-package) ### 實作 priority protection mutex **priority protection mutex 設定** `muthread_mutexattr_setprioceiling`: 設定 mutex 的 `prioceiling` ```c muthread_mutexattr_setprotocol(&mattr, TBTHREAD_PRIO_INHERIT); muthread_mutexattr_setprioceiling(&mattr, 25); muthread_mutex_init(&mutex_normal, &mattr); muthread_mutex_init(&mutex_normal_2, &mattr); ``` :::info 在[測驗程式 Tests/test-04-priority-inversion.c](https://github.com/qwe661234/MuThreadPackage/blob/main/Tests/test-04-priority-inversion.c) 中,我們在已經設定好會發生 priority inversion 的情況下去實作出 priority protection mutex 是否會過於不實際,若面對到更複雜的情況下使用 PP 是否對效能真的有更多提升?該如何驗證? ::: ## [〈Lock-Free Programming〉](https://hackmd.io/@sysprog/concurrency-lockfree) 何謂 - ABA? - Read–modify–write? - blocking 與 non-blocking - Compiler Barriers 的具體定義 > [浅谈Memory Reordering](https://web.archive.org/web/20220331124643/http://dreamrunner.org/blog/2014/06/28/qian-tan-memory-reordering/) -