第16章 恐れるな!並行性
2020/11/25 原山和之
- スレッドを生成して、複数のコードを同時に走らせる方法
- チャンネルがスレッド間でメッセージを送るメッセージ受け渡し並行性
- 複数のスレッドが何らかのデータにアクセスする状態共有並行性
- 標準ライブラリが提供する型だけでなく、ユーザが定義した型に対してもRustの並行性の安全保証を拡張するSyncとSendトレイト
- スレッドがデータやリソースに矛盾した順番でアクセスする競合状態
- 2つのスレッドがお互いにもう一方が持っているリソースを使用し終わるのを待ち、両者が継続するのを防ぐデッドロック
- 特定の状況でのみ起き、確実な再現や修正が困難なバグ
プログラミング言語によってスレッドはいくつかの方法で実装されています。多くのOSで、新規スレッドを生成するAPIが提供されています。 言語がOSのAPIを呼び出してスレッドを生成するこのモデルを時に1:1と呼びます。
グリーンスレッドを使用する言語は、それを異なる数のOSスレッドの文脈で実行します。 このため、グリーンスレッドのモデルはM:Nモデルと呼ばれます。
M個のグリーンスレッドに対して、 N個のOSスレッドがあり、MとNは必ずしも同じ数字ではありません。
- 言語によって全てのバイナリに含まれるコードのことを意味します。
- 口語的に誰かが「ノーランタイム」と言ったら、「小さいランタイム」のことを意味することがしばしばあります。 ランタイムが小さいと機能も少ないですが、バイナリのサイズも小さくなるという利点があります。
- 多くの言語では、 より多くの機能と引き換えにランタイムのサイズが膨れ上がるのは、受け入れられることですが、 Rustにはほとんどゼロのランタイムが必要でパフォーマンスを維持するためにCコードを呼び出せることを妥協できないのです。
- Rustの標準ライブラリは、1:1スレッドの実装のみを提供しています。
- M:Nのグリーンスレッドモデルは、スレッドを管理するのにより大きな言語ランタイムが必要です。
- M:Nのグリーンスレッドモデルを実装したクレートがある。
| use std::thread; |
| use std::time::Duration; |
| |
| fn main() { |
| thread::spawn(|| { |
| for i in 1..10 { |
| |
| println!("hi number {} from the spawned thread!", i); |
| thread::sleep(Duration::from_millis(1)); |
| } |
| }); |
| |
| for i in 1..5 { |
| |
| println!("hi number {} from the main thread!", i); |
| thread::sleep(Duration::from_millis(1)); |
| } |
| } |
| hi number 1 from the main thread! |
| hi number 1 from the spawned thread! |
| hi number 2 from the main thread! |
| hi number 2 from the spawned thread! |
| hi number 3 from the main thread! |
| hi number 3 from the spawned thread! |
| hi number 4 from the main thread! |
| hi number 4 from the spawned thread! |
| hi number 5 from the spawned thread! |
| use std::thread; |
| use std::time::Duration; |
| |
| fn main() { |
| let handle = thread::spawn(|| { |
| for i in 1..10 { |
| println!("hi number {} from the spawned thread!", i); |
| thread::sleep(Duration::from_millis(1)); |
| } |
| }); |
| |
| for i in 1..5 { |
| println!("hi number {} from the main thread!", i); |
| thread::sleep(Duration::from_millis(1)); |
| } |
| |
| handle.join().unwrap(); |
| } |
| hi number 1 from the spawned thread! |
| hi number 2 from the spawned thread! |
| hi number 3 from the spawned thread! |
| hi number 4 from the spawned thread! |
| hi number 5 from the spawned thread! |
| hi number 6 from the spawned thread! |
| hi number 7 from the spawned thread! |
| hi number 8 from the spawned thread! |
| hi number 9 from the spawned thread! |
| hi number 1 from the main thread! |
| hi number 2 from the main thread! |
| hi number 3 from the main thread! |
| hi number 4 from the main thread! |
| |
- moveクロージャは、thread::spawnとともによく使用されます。 あるスレッドのデータを別のスレッドで使用できるようになるからです。
| use std::thread; |
| |
| fn main() { |
| let v = vec![1, 2, 3]; |
| |
| let handle = thread::spawn(move || { |
| println!("Here's a vector: {:?}", v); |
| }); |
| |
| handle.join().unwrap(); |
| } |
- プログラミングにおけるチャンネルは、2分割できます: 転送機と受信機です。転送機はアヒルのおもちゃを川に置く上流になり、 受信機は、アヒルのおもちゃが行き着く下流になります…
| use std::sync::mpsc; |
| |
| fn main() { |
| let (tx, rx) = mpsc::channel(); |
| } |
- mpscはmultiple producer, single consumerを表しています。
- 1つのチャンネルが値を生成する複数の送信側と、 その値を消費するたった1つの受信側を持つことができるということを意味します。
- mpsc::channel関数はタプルを返し、1つ目の要素は、送信側、2つ目の要素は受信側になります。
- txとrxという略称は、多くの分野で伝統的に転送機と受信機
- let文を使うと、 mp
- mpsc::channelで返ってくるタプルの部品を抽出するのが便利になります。
| use std::thread; |
| use std::sync::mpsc; |
| |
| fn main() { |
| let (tx, rx) = mpsc::channel(); |
| |
| thread::spawn(move || { |
| let val = String::from("hi"); |
| tx.send(val).unwrap(); |
| }); |
| |
| let received = rx.recv().unwrap(); |
| |
| println!("Got: {}", received); |
| } |
| use std::thread; |
| use std::sync::mpsc; |
| |
| fn main() { |
| let (tx, rx) = mpsc::channel(); |
| |
| thread::spawn(move || { |
| let val = String::from("hi"); |
| tx.send(val).unwrap(); |
| |
| println!("val is {}", val); |
| }); |
| |
| let received = rx.recv().unwrap(); |
| println!("Got: {}", received); |
| } |
| use std::thread; |
| use std::sync::mpsc; |
| use std::time::Duration; |
| |
| fn main() { |
| let (tx, rx) = mpsc::channel(); |
| |
| thread::spawn(move || { |
| |
| let vals = vec![ |
| String::from("hi"), |
| String::from("from"), |
| String::from("the"), |
| String::from("thread"), |
| ]; |
| |
| for val in vals { |
| tx.send(val).unwrap(); |
| thread::sleep(Duration::from_secs(1)); |
| } |
| }); |
| |
| for received in rx { |
| println!("Got: {}", received); |
| } |
| } |
| let (tx, rx) = mpsc::channel(); |
| |
| let tx1 = mpsc::Sender::clone(&tx); |
| thread::spawn(move || { |
| let vals = vec![ |
| String::from("hi"), |
| String::from("from"), |
| String::from("the"), |
| String::from("thread"), |
| ]; |
| |
| for val in vals { |
| tx1.send(val).unwrap(); |
| thread::sleep(Duration::from_secs(1)); |
| } |
| }); |
| |
| thread::spawn(move || { |
| |
| let vals = vec![ |
| String::from("more"), |
| String::from("messages"), |
| String::from("for"), |
| String::from("you"), |
| ]; |
| |
| for val in vals { |
| tx.send(val).unwrap(); |
| thread::sleep(Duration::from_secs(1)); |
| } |
| }); |
| |
| for received in rx { |
| println!("Got: {}", received); |
| } |
- メモリ共有並行性は、複数の所有権に似ています: 複数のスレッドが同時に同じメモリ位置にアクセスできるのです。
どんな時も1つのスレッドにしかなんらかのデータへのアクセスを許可しないというように、 "mutual exclusion"(相互排他)の省略形です。
ミューテックスにあるデータにアクセスする = ミューテックスのロック
ミューテックスのロック = データを死守する(guarding)
- データを使用する前にロックの獲得を試みなければならない。
- ミューテックスが死守しているデータの使用が終わったら、他のスレッドがロックを獲得できるように、 データをアンロックしなければならない。
| use std::sync::Mutex; |
| |
| fn main() { |
| let m = Mutex::new(5); |
| |
| { |
| let mut num = m.lock().unwrap(); |
| *num = 6; |
| } |
| |
| println!("m = {:?}", m); |
| } |
| use std::sync::Mutex; |
| use std::thread; |
| |
| fn main() { |
| let counter = Mutex::new(0); |
| let mut handles = vec![]; |
| |
| for _ in 0..10 { |
| let handle = thread::spawn(move || { |
| let mut num = counter.lock().unwrap(); |
| |
| *num += 1; |
| }); |
| handles.push(handle); |
| } |
| |
| for handle in handles { |
| handle.join().unwrap(); |
| } |
| |
| println!("Result: {}", *counter.lock().unwrap()); |
| } |
| use std::sync::Mutex; |
| use std::thread; |
| |
| fn main() { |
| let counter = Mutex::new(0); |
| let mut handles = vec![]; |
| |
| let handle = thread::spawn(move || { |
| let mut num = counter.lock().unwrap(); |
| |
| *num += 1; |
| }); |
| handles.push(handle); |
| |
| let handle2 = thread::spawn(move || { |
| let mut num2 = counter.lock().unwrap(); |
| |
| *num2 += 1; |
| }); |
| handles.push(handle2); |
| |
| for handle in handles { |
| handle.join().unwrap(); |
| } |
| |
| println!("Result: {}", *counter.lock().unwrap()); |
| } |
さらに修正 … Rc<T>使う
| use std::rc::Rc; |
| use std::sync::Mutex; |
| use std::thread; |
| |
| fn main() { |
| let counter = Rc::new(Mutex::new(0)); |
| let mut handles = vec![]; |
| |
| for _ in 0..10 { |
| let counter = Rc::clone(&counter); |
| let handle = thread::spawn(move || { |
| let mut num = counter.lock().unwrap(); |
| |
| *num += 1; |
| }); |
| handles.push(handle); |
| } |
| |
| for handle in handles { |
| handle.join().unwrap(); |
| } |
| |
| println!("Result: {}", *counter.lock().unwrap()); |
| } |
| use std::sync::{Mutex, Arc}; |
| use std::thread; |
| |
| fn main() { |
| let counter = Arc::new(Mutex::new(0)); |
| let mut handles = vec![]; |
| |
| for _ in 0..10 { |
| let counter = Arc::clone(&counter); |
| let handle = thread::spawn(move || { |
| let mut num = counter.lock().unwrap(); |
| |
| *num += 1; |
| }); |
| handles.push(handle); |
| } |
| |
| for handle in handles { |
| handle.join().unwrap(); |
| } |
| |
| println!("Result: {}", *counter.lock().unwrap()); |
| } |
counterは不変なのに、その内部にある値への可変参照を得ることができたことに気付いたでしょうか; つまり、Mutex<T>は、Cell系のように内部可変性を提供するわけです。 第15章でRefCell<T>を使用してRc<T>の内容を可変化できるようにしたのと同様に、 Mutex<T>を使用してArc<T>の内容を可変化しているのです。
- Rust言語には、寡少な並行性機能があります。
- Sendでスレッド間の所有権の転送を許可する。
- Syncで複数のスレッドからのアクセスを許可する。
- SendとSyncを手動で実装するのは非安全である。
