changed 4 years ago
Published Linked with GitHub

第16回

第16章 恐れるな!並行性

2020/11/25 原山和之


本日やること

  • スレッドを生成して、複数のコードを同時に走らせる方法
  • チャンネルがスレッド間でメッセージを送るメッセージ受け渡し並行性
  • 複数のスレッドが何らかのデータにアクセスする状態共有並行性
  • 標準ライブラリが提供する型だけでなく、ユーザが定義した型に対してもRustの並行性の安全保証を拡張するSyncとSendトレイト

スレッドを使用してコードを同時に走らせる

  • スレッドがデータやリソースに矛盾した順番でアクセスする競合状態
  • 2つのスレッドがお互いにもう一方が持っているリソースを使用し終わるのを待ち、両者が継続するのを防ぐデッドロック
  • 特定の状況でのみ起き、確実な再現や修正が困難なバグ

1つのOSスレッドに対して1つの言語スレッドは

プログラミング言語によってスレッドはいくつかの方法で実装されています。多くのOSで、新規スレッドを生成するAPIが提供されています。 言語がOSのAPIを呼び出してスレッドを生成するこのモデルを時に1:1と呼びます。


グリーンスレッド

グリーンスレッドを使用する言語は、それを異なる数のOSスレッドの文脈で実行します。 このため、グリーンスレッドのモデルはM:Nモデルと呼ばれます。
M個のグリーンスレッドに対して、 N個のOSスレッドがあり、MとNは必ずしも同じ数字ではありません。


ランタイムの意味

  • 言語によって全てのバイナリに含まれるコードのことを意味します。
    • 口語的に誰かが「ノーランタイム」と言ったら、「小さいランタイム」のことを意味することがしばしばあります。 ランタイムが小さいと機能も少ないですが、バイナリのサイズも小さくなるという利点があります。
    • 多くの言語では、 より多くの機能と引き換えにランタイムのサイズが膨れ上がるのは、受け入れられることですが、 Rustにはほとんどゼロのランタイムが必要でパフォーマンスを維持するためにCコードを呼び出せることを妥協できないのです。

Rustでは

  • Rustの標準ライブラリは、1:1スレッドの実装のみを提供しています。
    • M:Nのグリーンスレッドモデルは、スレッドを管理するのにより大きな言語ランタイムが必要です。
    • M:Nのグリーンスレッドモデルを実装したクレートがある。

新規スレッドを生成例

use std::thread; use std::time::Duration; fn main() { thread::spawn(|| { for i in 1..10 { // やあ!立ち上げたスレッドから数字{}だよ! println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { // メインスレッドから数字{}だよ! println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } }

生成結果

hi number 1 from the main thread! hi number 1 from the spawned thread! hi number 2 from the main thread! hi number 2 from the spawned thread! hi number 3 from the main thread! hi number 3 from the spawned thread! hi number 4 from the main thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread!

joinハンドルで全スレッドの終了を待つ

use std::thread; use std::time::Duration; fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } handle.join().unwrap(); }

Joinの結果

hi number 1 from the spawned thread! hi number 2 from the spawned thread! hi number 3 from the spawned thread! hi number 4 from the spawned thread! hi number 5 from the spawned thread! hi number 6 from the spawned thread! hi number 7 from the spawned thread! hi number 8 from the spawned thread! hi number 9 from the spawned thread! hi number 1 from the main thread! hi number 2 from the main thread! hi number 3 from the main thread! hi number 4 from the main thread!

スレッドでmoveクロージャを使用する

  • moveクロージャは、thread::spawnとともによく使用されます。 あるスレッドのデータを別のスレッドで使用できるようになるからです。
use std::thread; fn main() { let v = vec![1, 2, 3]; let handle = thread::spawn(move || { println!("Here's a vector: {:?}", v); }); handle.join().unwrap(); }

メッセージ受け渡しを使ってスレッド間でデータを転送する

  • プログラミングにおけるチャンネルは、2分割できます: 転送機と受信機です。転送機はアヒルのおもちゃを川に置く上流になり、 受信機は、アヒルのおもちゃが行き着く下流になります
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); }

mpsc::channel関数

  • mpscはmultiple producer, single consumerを表しています。
  • 1つのチャンネルが値を生成する複数の送信側と、 その値を消費するたった1つの受信側を持つことができるということを意味します。
  • mpsc::channel関数はタプルを返し、1つ目の要素は、送信側、2つ目の要素は受信側になります。
  • txとrxという略称は、多くの分野で伝統的に転送機と受信機
  • let文を使うと、 mp
  • mpsc::channelで返ってくるタプルの部品を抽出するのが便利になります。

やりとり例

use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); // 値は{}です println!("Got: {}", received); }

チャンネルと所有権の転送

use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); // valは{} println!("val is {}", val); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }

複数の値を送信し、受信側が待機するのを確かめる

use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { // スレッドからやあ(hi from the thread) let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }

転送機をクローンして複数の生成器を作成する

let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { // 君のためにもっとメッセージを(more messages for you) let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); }

状態共有並行性

  • メモリ共有並行性は、複数の所有権に似ています: 複数のスレッドが同時に同じメモリ位置にアクセスできるのです。

ミューテックス

  • どんな時も1つのスレッドにしかなんらかのデータへのアクセスを許可しないというように、 "mutual exclusion"(相互排他)の省略形です。

  • ミューテックスにあるデータにアクセスする = ミューテックスのロック

  • ミューテックスのロック = データを死守する(guarding)


ミューテックスは、2つの規則

  • データを使用する前にロックの獲得を試みなければならない。
  • ミューテックスが死守しているデータの使用が終わったら、他のスレッドがロックを獲得できるように、 データをアンロックしなければならない。

Mutex<T>のAPI

  • まずはシングルスレッド
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println!("m = {:?}", m); }

複数のスレッド間でMutex<T>を共有する

  • 10個のスレッドエラーになる
use std::sync::Mutex; use std::thread; fn main() { let counter = Mutex::new(0); let mut handles = vec![]; for _ in 0..10 { let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }

修正

  • 2つのスレッドを作成してやってみるエラー
use std::sync::Mutex; use std::thread; fn main() { let counter = Mutex::new(0); let mut handles = vec![]; let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); let handle2 = thread::spawn(move || { let mut num2 = counter.lock().unwrap(); *num2 += 1; }); handles.push(handle2); for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }

さらに修正 Rc<T>使う

  • 複数のスレッドで複数の所有権
use std::rc::Rc; use std::sync::Mutex; use std::thread; fn main() { let counter = Rc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Rc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }

Arc<T>で原子的な参照カウント

use std::sync::{Mutex, Arc}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }

RefCell<T>/Rc<T>とMutex<T>/Arc<T>の類似性

counterは不変なのに、その内部にある値への可変参照を得ることができたことに気付いたでしょうか; つまり、Mutex<T>は、Cell系のように内部可変性を提供するわけです。 第15章でRefCell<T>を使用してRc<T>の内容を可変化できるようにしたのと同様に、 Mutex<T>を使用してArc<T>の内容を可変化しているのです。


SyncとSendトレイトで拡張可能な並行性

  • Rust言語には、寡少な並行性機能があります。
  • Sendでスレッド間の所有権の転送を許可する。
  • Syncで複数のスレッドからのアクセスを許可する。
  • SendとSyncを手動で実装するのは非安全である。

Select a repo