# Ch. 19: Concurrency
### Programming Rust, 2nd Edition 導讀
###### tags: `Rust` `concurrency` `Talk` `Programming Rust`
---
<!-- .slide: style="font-size: 32px;" -->
## Concurrency Comes Safe & Fast
- Before Rust: Programs that use threads well are full of unwritten rules.
- Now with Rust: The unwritten rules are ==written down== and ==forced by the compilers==.
---
<!-- .slide: style="font-size: 32px;" -->
## 3 Ways to Use Threads
- Fork-join parallelism
- Channels
- Shared mutable state
---
<!-- .slide: style="font-size: 32px;" -->
## Fork-join parallelism

----
<!-- .slide: style="font-size: 32px;" -->
### `std::thread::spawn()`
```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
```
- `F: FnOnce () -> T` → captures 會被 move 進 closure。
- `F: Send` & `T: Send` → 因為 closure 和回傳值都要在 threads 間傳遞。
- F: \`static & T: \`static → 因為 children threads 可能活得比 parent thread 長。
----
<!-- .slide: style="font-size: 32px;" -->
### `std::thread::JoinHandle::join()`
```rust
pub fn join(self) -> Result<T>
```
```rust
match thread_handle.join() {
Ok(t) => {
// t 為 closure 的回傳值。
}
Err => {
// 表示 thread 發生 panic。
// Children thread panic 不會(直接)害 parent panic,
// 讓 parent 有機會處理。
}
}
```
- 如果 thread handle 在 join 之前就被 drop 掉,就會變成 detached thread。若此時 `main()` 也結束的話,threads 就會直接被 kill 掉。
----
<!-- .slide: style="font-size: 32px;" -->
### Sharing Immutable Data Across Threads
- 不能直接把 ref 傳給 children threads,因為 parent 的 lifetime 不一定比 children 長。
- Use `std::sync::Arc` (atomic reference counting pointer):
```rust
let glossary = Arc::new(GigabyteMap::new());
let glossary_for_child = glossary.clone();
let handle =
spawn(move || process_files(worklist, &glossary_for_child));
```
----
<!-- .slide: style="font-size: 32px;" -->
### Rayon
```rust
use rayon::prelude::*;
fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
-> io::Result<()>
{
filenames
.par_iter() // iterator 瞬間變成 parallel iterator!
.map(|filename| process_file(filename, glossary)) // 不需要 Arc 了!
.reduce_with(|r1, r1| { // 順序不固定;和 fold 不同。
if r1.is_err() { r1 } else { r2 }
})
.unwrap_or(Ok(()))
}
```
- 另外,Rayon 會自動依據系統硬體能力進行 load balancing。
---
<!-- .slide: style="font-size: 32px;" -->
## Channels

----
<!-- .slide: style="font-size: 32px;" -->
- 把資料從一個 thread 送到另一個 thread 的單行道 -- thread-safe queue。
- Channel 的兩端各被不同的 threads 擁有。Sender 只有寫入的權限,receiver 只有移出的權限。
- Value 的 ownership 透過 channel 從 sender 傳送給 receiver。
- 如果 channel 中沒有資料,receive 端就會被 block 住。
----
<!-- .slide: style="font-size: 28px;" -->
```rust=
use std::sync::mpsc; // multi-producer, single consumer
let (sender, receiver) = mpsc::channel(); // 同時產生 sender & receiver
let handle = std::thread::spawn(move || {
// sender 的 ownership 會被 move 進這個 closure。
// 若希望在多個 thread 中使用 sender,必須要 clone()
for filename in documents {
let text = std::fs::read_to_string(filename)?;
// 若 receiver 被 dropped,send() 會回傳 Err
if sender.send(text).is_err() { break; }
}
Ok(())
});
while let Ok(text) = receiver.recv() { /*...*/ } // 若 sender dropped,會回傳 Err
// 或用 iterator 方式使用 receiver
for text in receiver { /*...*/ }
```
----
<!-- .slide: style="font-size: 32px;" -->
### Performance Concerns
- Channel 會依照使用的狀況,動態調整使用的資源,因此不會 suffer 太多的 overhead。
- 當 sender 速度比 receiver 速度快很多時,執行效率會很差。
- Synchronise channel: `send()` might be blocked.
- 書中的 pipeline model,瓶頸會發生在最慢的 stage。
- Option 1: 拆成更細的 stages
- Option 2: 用多個 threads 跑那個 stage。
----
### Traits `Send` and `Sync`
`Send`
: 此型別的資料可以在 threads 間轉移。
`Sync`
: 此型別的資料可以用 immutable ref 的方式在 threads 間共享。
- 是 Sync 就一定是 Send,但反之不然。像 `Receiver<T>` 就只能 Send,但不能 Sync。
---
## Shared Mutable State
----
<!-- .slide: style="font-size: 32px;" -->
### Mutex
- 強制 threads 輪流存取相同的一筆資料:同一時間只能有一個 thread 取得 lock。
1. 防止 data racing
2. 防止來自不同 threads 的操作穿插出現
3. 實作 invariant(內部正確性?)
- 在 Rust 中,mutex (lock) 和 data 是綁在一起的,可以避免錯誤使用。
----
<!-- .slide: style="font-size: 32px;" -->
### `Mutex<T>`
```rust=
// Mutex::new() 用起來雖然和 smart pointers 很像,但 Mutex 不管 memory 在哪配置,
// 也不管怎麼分配給不同 threads 參考。因此常會搭配 Arc 使用。
let waiting_list = Arc::new(Mutex::new(vec![]));
{
let mut guard = waiting_list.lock().unwrap(); // critical section 開始
// 雖然 waiting_list 是 immutable,但因為 Mutex 具有 iterior mutability,
// 因此可以 lock 為 mutable.
guard.push(player);
} // guard 被 dropped -> mutex unlocked. critical section 結束
```
----
<!-- .slide: style="font-size: 32px;" -->
### Deadlock
- 在同一個 thread 中,試著去 lock 一個已經 locked 的 mutex 就會發生 deadlock。
- 應該儘量讓 critical section 越小越好。
### Poisoned Mutex
- 如果某個 thread 在 lock mutex 的情況下發生 panic,那麼 mutex 就會變成 "poisoned" 狀態。
- Mutex poisoned 的話,其他的 threads 呼叫 `lock()` 就會回傳 `Err`。
- 其他的 threads 還是有辦法把 poisoned mutex 內部的資料取出,但必須了解:此時的資料正確性可能有問題。
----
### Mutex 不是萬靈丹
- Mutex 無法防止 data racing 以外的 race conditions,例如:存取資料的 timing/順序。
- 太過依賴 mutex 可能會設計出太過肥大的資料結構,造成管理/執行效率的問題。(← OOP 的通病)
- Mutex 仍可能發生 deadlock 或 poisoned mutex 等的問題。
----
<!-- .slide: style="font-size: 28px;" -->
### Multiconsumer Channels Using Mutexes
```rust
let (sender, receiver) = shared_channel();
let handles = (0..5)
.map(|idx|{
let worker_receiver = receiver.clone();
thread::spawn(move || {
let v = worker_receiver.into_iter().collect::<Vec<_>>();
println!("Thread {} got {:?}", idx, v);
})
})
.collect::<Vec<_>>();
for idx in 0..20 {
sender.send(idx).unwrap();
}
drop(sender); // sender 沒有結束的話,receivers 會一直等待....
for h in handles {
h.join().unwrap()
}
```
---
<!-- .slide: style="font-size: 28px;" -->
## Other Tools in `std::sync`
- `RwLock<T>` (Read/Write Locks)
同時間可以有多個 read-lock 或一個 write-lock。
適合「多讀、少寫」的應用。
- `Condvar` (Condition Variables)
用來 block thread 執行,直到條件成立為止。
- Atomics
可以保証存取動作不會被打斷的資料型別(及相關 operations)。
和 mutex 相比,overhead 最小、不使用 system calls。
可能只需單一的 CPU instruction 即完成操作。
---
<!-- .slide: style="font-size: 28px;" -->
## Global Variables
- 全域變數可能被多個 threads 存取,因此必須 thread-safe。
- Global `static mut` variables 只能使用 `unsafe` 存取。
- 可以使用 immutable atomic integer. (Interior mutable)
但能用的型別就被限制。
- 可以使用 immutable `Mutex<T>`、`RwLock<T>` 等。
但 `T` 的初始化函式必須為 `const` -- 可在編譯時算出初始值。
- 可以使用 crate `lazy_static` 的 `lazy_static!`,讓 `T` 在第一次被使用到的地方呼叫 non-`const` 函式進行初始化。
---
## Thank You
{"metaMigratedAt":"2023-06-17T22:53:33.720Z","metaMigratedFrom":"YAML","title":"Ch. 19: Concurrency","breaks":true,"slideOptions":"{\"transition\":\"slide\",\"theme\":\"moon\"}","contributors":"[{\"id\":\"eabb56da-9f98-45a8-859d-d4bc46846c02\",\"add\":7620,\"del\":803}]"}