# Ch. 19: Concurrency ### Programming Rust, 2nd Edition 導讀 ###### tags: `Rust` `concurrency` `Talk` `Programming Rust` --- <!-- .slide: style="font-size: 32px;" --> ## Concurrency Comes Safe & Fast - Before Rust: Programs that use threads well are full of unwritten rules. - Now with Rust: The unwritten rules are ==written down== and ==forced by the compilers==. --- <!-- .slide: style="font-size: 32px;" --> ## 3 Ways to Use Threads - Fork-join parallelism - Channels - Shared mutable state --- <!-- .slide: style="font-size: 32px;" --> ## Fork-join parallelism ![](https://hackmd.io/_uploads/BJhVZCN1n.png) ---- <!-- .slide: style="font-size: 32px;" --> ### `std::thread::spawn()` ```rust pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static, ``` - `F: FnOnce () -> T` → captures 會被 move 進 closure。 - `F: Send` & `T: Send` → 因為 closure 和回傳值都要在 threads 間傳遞。 - F: \`static & T: \`static → 因為 children threads 可能活得比 parent thread 長。 ---- <!-- .slide: style="font-size: 32px;" --> ### `std::thread::JoinHandle::join()` ```rust pub fn join(self) -> Result<T> ``` ```rust match thread_handle.join() { Ok(t) => { // t 為 closure 的回傳值。 } Err => { // 表示 thread 發生 panic。 // Children thread panic 不會(直接)害 parent panic, // 讓 parent 有機會處理。 } } ``` - 如果 thread handle 在 join 之前就被 drop 掉,就會變成 detached thread。若此時 `main()` 也結束的話,threads 就會直接被 kill 掉。 ---- <!-- .slide: style="font-size: 32px;" --> ### Sharing Immutable Data Across Threads - 不能直接把 ref 傳給 children threads,因為 parent 的 lifetime 不一定比 children 長。 - Use `std::sync::Arc` (atomic reference counting pointer): ```rust let glossary = Arc::new(GigabyteMap::new()); let glossary_for_child = glossary.clone(); let handle = spawn(move || process_files(worklist, &glossary_for_child)); ``` ---- <!-- .slide: style="font-size: 32px;" --> ### Rayon ```rust use rayon::prelude::*; fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap) -> io::Result<()> { filenames .par_iter() // iterator 瞬間變成 parallel iterator! .map(|filename| process_file(filename, glossary)) // 不需要 Arc 了! .reduce_with(|r1, r1| { // 順序不固定;和 fold 不同。 if r1.is_err() { r1 } else { r2 } }) .unwrap_or(Ok(())) } ``` - 另外,Rayon 會自動依據系統硬體能力進行 load balancing。 --- <!-- .slide: style="font-size: 32px;" --> ## Channels ![](https://hackmd.io/_uploads/r1q-xJrJ3.png) ---- <!-- .slide: style="font-size: 32px;" --> - 把資料從一個 thread 送到另一個 thread 的單行道 -- thread-safe queue。 - Channel 的兩端各被不同的 threads 擁有。Sender 只有寫入的權限,receiver 只有移出的權限。 - Value 的 ownership 透過 channel 從 sender 傳送給 receiver。 - 如果 channel 中沒有資料,receive 端就會被 block 住。 ---- <!-- .slide: style="font-size: 28px;" --> ```rust= use std::sync::mpsc; // multi-producer, single consumer let (sender, receiver) = mpsc::channel(); // 同時產生 sender & receiver let handle = std::thread::spawn(move || { // sender 的 ownership 會被 move 進這個 closure。 // 若希望在多個 thread 中使用 sender,必須要 clone() for filename in documents { let text = std::fs::read_to_string(filename)?; // 若 receiver 被 dropped,send() 會回傳 Err if sender.send(text).is_err() { break; } } Ok(()) }); while let Ok(text) = receiver.recv() { /*...*/ } // 若 sender dropped,會回傳 Err // 或用 iterator 方式使用 receiver for text in receiver { /*...*/ } ``` ---- <!-- .slide: style="font-size: 32px;" --> ### Performance Concerns - Channel 會依照使用的狀況,動態調整使用的資源,因此不會 suffer 太多的 overhead。 - 當 sender 速度比 receiver 速度快很多時,執行效率會很差。 - Synchronise channel: `send()` might be blocked. - 書中的 pipeline model,瓶頸會發生在最慢的 stage。 - Option 1: 拆成更細的 stages - Option 2: 用多個 threads 跑那個 stage。 ---- ### Traits `Send` and `Sync` `Send` : 此型別的資料可以在 threads 間轉移。 `Sync` : 此型別的資料可以用 immutable ref 的方式在 threads 間共享。 - 是 Sync 就一定是 Send,但反之不然。像 `Receiver<T>` 就只能 Send,但不能 Sync。 --- ## Shared Mutable State ---- <!-- .slide: style="font-size: 32px;" --> ### Mutex - 強制 threads 輪流存取相同的一筆資料:同一時間只能有一個 thread 取得 lock。 1. 防止 data racing 2. 防止來自不同 threads 的操作穿插出現 3. 實作 invariant(內部正確性?) - 在 Rust 中,mutex (lock) 和 data 是綁在一起的,可以避免錯誤使用。 ---- <!-- .slide: style="font-size: 32px;" --> ### `Mutex<T>` ```rust= // Mutex::new() 用起來雖然和 smart pointers 很像,但 Mutex 不管 memory 在哪配置, // 也不管怎麼分配給不同 threads 參考。因此常會搭配 Arc 使用。 let waiting_list = Arc::new(Mutex::new(vec![])); { let mut guard = waiting_list.lock().unwrap(); // critical section 開始 // 雖然 waiting_list 是 immutable,但因為 Mutex 具有 iterior mutability, // 因此可以 lock 為 mutable. guard.push(player); } // guard 被 dropped -> mutex unlocked. critical section 結束 ``` ---- <!-- .slide: style="font-size: 32px;" --> ### Deadlock - 在同一個 thread 中,試著去 lock 一個已經 locked 的 mutex 就會發生 deadlock。 - 應該儘量讓 critical section 越小越好。 ### Poisoned Mutex - 如果某個 thread 在 lock mutex 的情況下發生 panic,那麼 mutex 就會變成 "poisoned" 狀態。 - Mutex poisoned 的話,其他的 threads 呼叫 `lock()` 就會回傳 `Err`。 - 其他的 threads 還是有辦法把 poisoned mutex 內部的資料取出,但必須了解:此時的資料正確性可能有問題。 ---- ### Mutex 不是萬靈丹 - Mutex 無法防止 data racing 以外的 race conditions,例如:存取資料的 timing/順序。 - 太過依賴 mutex 可能會設計出太過肥大的資料結構,造成管理/執行效率的問題。(← OOP 的通病) - Mutex 仍可能發生 deadlock 或 poisoned mutex 等的問題。 ---- <!-- .slide: style="font-size: 28px;" --> ### Multiconsumer Channels Using Mutexes ```rust let (sender, receiver) = shared_channel(); let handles = (0..5) .map(|idx|{ let worker_receiver = receiver.clone(); thread::spawn(move || { let v = worker_receiver.into_iter().collect::<Vec<_>>(); println!("Thread {} got {:?}", idx, v); }) }) .collect::<Vec<_>>(); for idx in 0..20 { sender.send(idx).unwrap(); } drop(sender); // sender 沒有結束的話,receivers 會一直等待.... for h in handles { h.join().unwrap() } ``` --- <!-- .slide: style="font-size: 28px;" --> ## Other Tools in `std::sync` - `RwLock<T>` (Read/Write Locks) 同時間可以有多個 read-lock 或一個 write-lock。 適合「多讀、少寫」的應用。 - `Condvar` (Condition Variables) 用來 block thread 執行,直到條件成立為止。 - Atomics 可以保証存取動作不會被打斷的資料型別(及相關 operations)。 和 mutex 相比,overhead 最小、不使用 system calls。 可能只需單一的 CPU instruction 即完成操作。 --- <!-- .slide: style="font-size: 28px;" --> ## Global Variables - 全域變數可能被多個 threads 存取,因此必須 thread-safe。 - Global `static mut` variables 只能使用 `unsafe` 存取。 - 可以使用 immutable atomic integer. (Interior mutable) 但能用的型別就被限制。 - 可以使用 immutable `Mutex<T>`、`RwLock<T>` 等。 但 `T` 的初始化函式必須為 `const` -- 可在編譯時算出初始值。 - 可以使用 crate `lazy_static` 的 `lazy_static!`,讓 `T` 在第一次被使用到的地方呼叫 non-`const` 函式進行初始化。 --- ## Thank You
{"metaMigratedAt":"2023-06-17T22:53:33.720Z","metaMigratedFrom":"YAML","title":"Ch. 19: Concurrency","breaks":true,"slideOptions":"{\"transition\":\"slide\",\"theme\":\"moon\"}","contributors":"[{\"id\":\"eabb56da-9f98-45a8-859d-d4bc46846c02\",\"add\":7620,\"del\":803}]"}
    174 views