---
tags: Rust, System Software
---
# Rust: Rayon
> [Rayon](https://github.com/rayon-rs/rayon)
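The goal of this article is to learn Rust and practical techniques for systems software. It records my process of tracing the code of a Rust project, and it tries to point out possible shortcomings in the project and fix them (~~if possible~~). Because the notes are meant for later reference, documentation on harder syntax and concepts is linked along the way.
## Introduction
Rayon is a data-parallelism library for Rust. It provides very convenient APIs for turning sequential computations into parallel ones, especially for iterators. Thanks to Rust's strict ownership checking, the library also guarantees that data races are avoided.
For the origin of the project and a more detailed introduction, see the original author's post [Rayon: data parallelism in Rust](https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/).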
## `par_iter`
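Let's start with the most basic usage shown by the project. The following function computes the sum of the squares of the elements of a slice:
```rust
use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- just change that!
         .map(|&i| i * i)
         .sum()
}
```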
### `IntoParallelRefIterator`
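Rayon implements the `IntoParallelRefIterator` trait for slices, so the `par_iter` method becomes available on them. The trait, together with `IntoParallelIterator` on which it builds, is defined as follows:
```rust
pub trait IntoParallelIterator {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;
    fn into_par_iter(self) -> Self::Iter;
}

pub trait IntoParallelRefIterator<'data> {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send + 'data;
    fn par_iter(&'data self) -> Self::Iter;
}
```
The `type` items declare the trait's associated types. Each type that implements the trait picks the concrete types for them: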
type 關鍵字是用來標注為每個結構定義 trait 時的 `associated types`:
> * [Advanced Traits](https://doc.rust-lang.org/book/ch19-03-advanced-traits.html)
> * [Associated Type](http://web.mit.edu/rust-lang_v1.25/arch/amd64_ubuntu1404/share/doc/rust/html/book/first-edition/associated-types.html)
> * [Associated items](https://doc.rust-lang.org/rust-by-example/generics/assoc_items.html)
The type returned by `par_iter` must implement the [`ParallelIterator`](https://github.com/rayon-rs/rayon/blob/3153b48/src/iter/mod.rs#L348) trait. Rayon defines this trait for iterators meant for parallel use. It offers methods that mirror those of the [standard library's iterator](https://doc.rust-lang.org/std/iter/trait.Iterator.html), and we will look at it in more depth later.
> * [Matching a generic parameter to an associated type in an impl](https://stackoverflow.com/questions/29345708/matching-a-generic-parameter-to-an-associated-type-in-an-impl) (the `ParallelIterator<Item = Self::Item>` syntax is commonly called an associated type binding; a more detailed explanation is still to be added)
```rust
impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] {
    type Item = &'data T;
    type Iter = Iter<'data, T>;
    fn into_par_iter(self) -> Self::Iter {
        Iter { slice: self }
    }
}

impl<'data, I: 'data + ?Sized> IntoParallelRefIterator<'data> for I
where
    &'data I: IntoParallelIterator,
{
    type Iter = <&'data I as IntoParallelIterator>::Iter;
    type Item = <&'data I as IntoParallelIterator>::Item;
    fn par_iter(&'data self) -> Self::Iter {
        self.into_par_iter()
    }
}
```
The code above shows how Rayon implements these traits for slice references. Whether you call `input.par_iter()` or `input.into_par_iter()`, this slice reference ends up producing the same [immutable slice iterator](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L545).
:::info
:notes: In the statement `type Iter = Iter<'data, T>`, the first `Iter` is the name of the associated type declared by the trait. The second [`Iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L545) is the slice iterator struct defined by Rayon. They are two different things.
:::
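A dependency-free sketch of the same blanket-impl pattern may help. It assumes hypothetical traits `IntoMyIterator` and `IntoMyRefIterator`, which return a std `Iterator` instead of a Rayon `ParallelIterator`. Only `&[T]` gets a concrete impl. The blanket impl then gives `[T]` a `my_iter` method, in the same way that `par_iter` becomes available on slices.

```rust
/// Hypothetical stand-in for `IntoParallelIterator` (std `Iterator` instead of
/// a parallel iterator, so the example needs no dependencies).
pub trait IntoMyIterator {
    // Associated type binding: `Iter`'s items must be exactly `Self::Item`.
    type Iter: Iterator<Item = Self::Item>;
    type Item;
    fn into_my_iter(self) -> Self::Iter;
}

/// Hypothetical stand-in for `IntoParallelRefIterator`.
pub trait IntoMyRefIterator<'data> {
    type Iter: Iterator<Item = Self::Item>;
    type Item: 'data;
    fn my_iter(&'data self) -> Self::Iter;
}

// Concrete impl for slice references, mirroring `impl IntoParallelIterator for &'data [T]`.
impl<'data, T: 'data> IntoMyIterator for &'data [T] {
    type Item = &'data T;
    type Iter = std::slice::Iter<'data, T>;
    fn into_my_iter(self) -> Self::Iter {
        self.iter()
    }
}

// Blanket impl: any `I` whose shared reference is `IntoMyIterator` gets `my_iter`.
impl<'data, I: 'data + ?Sized> IntoMyRefIterator<'data> for I
where
    &'data I: IntoMyIterator,
{
    type Iter = <&'data I as IntoMyIterator>::Iter;
    type Item = <&'data I as IntoMyIterator>::Item;
    fn my_iter(&'data self) -> Self::Iter {
        self.into_my_iter()
    }
}

pub fn sum_of_squares(input: &[i32]) -> i32 {
    // Resolves through the blanket impl with `I = [i32]`.
    input.my_iter().map(|&i| i * i).sum()
}
```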
### `map`
Next, the `map` method of `Iter` is called. The method comes from the `ParallelIterator` trait:
```rust
fn map<F, R>(self, map_op: F) -> Map<Self, F>
where
    F: Fn(Self::Item) -> R + Sync + Send,
    R: Send,
{
    Map::new(self, map_op)
}
```
All this does is construct a [`Map`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L15) adapter. The `Map` holds the `Iter` created earlier together with the [closure](https://doc.rust-lang.org/book/ch13-01-closures.html).
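A minimal sequential sketch makes the "store now, compute later" behavior concrete. `LazyMap` is a hypothetical std-`Iterator` stand-in. Rayon's `Map` is a different type, driven by consumers rather than by `next`. The call counter shows that building the adapter runs the closure zero times, and only consuming it runs the closure.

```rust
use std::cell::Cell;

/// Hypothetical sequential stand-in for Rayon's `Map`: it only stores the
/// base iterator and the closure.
pub struct LazyMap<I, F> {
    base: I,
    map_op: F,
}

impl<I, F> LazyMap<I, F> {
    pub fn new(base: I, map_op: F) -> Self {
        LazyMap { base, map_op }
    }
}

impl<I, F, R> Iterator for LazyMap<I, F>
where
    I: Iterator,
    F: FnMut(I::Item) -> R,
{
    type Item = R;
    fn next(&mut self) -> Option<R> {
        // The closure runs only when an element is actually pulled.
        self.base.next().map(&mut self.map_op)
    }
}

/// Returns (closure calls after construction, calls after summing, sum).
pub fn calls_before_and_after_sum(input: &[i32]) -> (usize, usize, i32) {
    let calls = Cell::new(0);
    let mapped = LazyMap::new(input.iter(), |&i| {
        calls.set(calls.get() + 1);
        i * i
    });
    let before = calls.get(); // construction alone runs nothing
    let total: i32 = mapped.sum();
    (before, calls.get(), total)
}
```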
### `sum`
```rust
// `ParallelIterator::sum` (src/iter/mod.rs)
fn sum<S>(self) -> S
where
    S: Send + Sum<Self::Item> + Sum<S>,
{
    sum::sum(self)
}

// `sum::sum` (src/iter/sum.rs)
pub(super) fn sum<PI, S>(pi: PI) -> S
where
    PI: ParallelIterator,
    S: Send + Sum<PI::Item> + Sum,
{
    pi.drive_unindexed(SumConsumer::new())
}
```
The total is then computed by the `sum` method, since `Map` also implements `ParallelIterator`. `sum` relies on `drive_unindexed`. `Map`'s [`drive_unindexed`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L44) wraps the `SumConsumer` it receives in a `MapConsumer`. It then calls the [`drive_unindexed`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L555) implementation of the underlying `Iter`, which in turn calls `bridge`.
### `bridge`
```rust
pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
where
    I: IndexedParallelIterator,
    C: Consumer<I::Item>,
{
    let len = par_iter.len();
    return par_iter.with_producer(Callback { len, consumer });

    struct Callback<C> {
        len: usize,
        consumer: C,
    }

    impl<C, I> ProducerCallback<I> for Callback<C>
    where
        C: Consumer<I>,
    {
        type Output = C::Result;
        fn callback<P>(self, producer: P) -> C::Result
        where
            P: Producer<Item = I>,
        {
            bridge_producer_consumer(self.len, producer, self.consumer)
        }
    }
}
```
Eventually, `sum` reaches `bridge`. Its consumer argument is a [`MapConsumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L167), which wraps the actual [`SumConsumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L19) together with the closure.
`Iter` also implements the [`IndexedParallelIterator`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/mod.rs#L2244) trait, so it has to implement the [3 methods](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L570) shown below:
```rust
impl<'data, T: Sync + 'data> IndexedParallelIterator for Iter<'data, T> {
    fn drive<C>(self, consumer: C) -> C::Result
    where
        C: Consumer<Self::Item>,
    {
        bridge(self, consumer)
    }

    fn len(&self) -> usize {
        self.slice.len()
    }

    fn with_producer<CB>(self, callback: CB) -> CB::Output
    where
        CB: ProducerCallback<Self::Item>,
    {
        callback.callback(IterProducer { slice: self.slice })
    }
}
```
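The `Callback` indirection is continuation-passing style. `with_producer` does not return the producer. Instead it calls `callback` with it, and the generic method `callback<P>` lets the code that follows work with whatever concrete producer type the iterator chose. Below is a simplified sketch of that shape. `Producer` and `ProducerCallback` here are trimmed-down hypothetical versions of Rayon's traits, and the recursion is sequential where Rayon would use `join_context`.

```rust
/// Hypothetical, trimmed-down `Producer`: splittable, and convertible into a
/// sequential iterator at the leaves.
pub trait Producer: Sized {
    type Item;
    type IntoIter: Iterator<Item = Self::Item>;
    fn into_iter(self) -> Self::IntoIter;
    fn split_at(self, index: usize) -> (Self, Self);
}

/// Hypothetical, trimmed-down `ProducerCallback`: the generic method receives
/// a producer whose concrete type the caller never has to name.
pub trait ProducerCallback<T> {
    type Output;
    fn callback<P: Producer<Item = T>>(self, producer: P) -> Self::Output;
}

/// Producer over a borrowed slice of `Copy` items (compare `IterProducer`).
pub struct SliceProducer<'a, T> {
    slice: &'a [T],
}

impl<'a, T: Copy> Producer for SliceProducer<'a, T> {
    type Item = T;
    type IntoIter = std::iter::Copied<std::slice::Iter<'a, T>>;
    fn into_iter(self) -> Self::IntoIter {
        self.slice.iter().copied()
    }
    fn split_at(self, index: usize) -> (Self, Self) {
        let (left, right) = self.slice.split_at(index);
        (SliceProducer { slice: left }, SliceProducer { slice: right })
    }
}

/// Plays the role of `with_producer`: build the producer, then call back.
fn with_producer<CB: ProducerCallback<i64>>(data: &[i64], callback: CB) -> CB::Output {
    callback.callback(SliceProducer { slice: data })
}

/// Sequential divide-and-conquer over any producer.
fn sum_producer<P: Producer<Item = i64>>(len: usize, producer: P) -> i64 {
    if len <= 2 {
        producer.into_iter().sum()
    } else {
        let mid = len / 2;
        let (left, right) = producer.split_at(mid);
        sum_producer(mid, left) + sum_producer(len - mid, right)
    }
}

/// Mirrors the shape of `bridge`: stash `len` in a `Callback`, get called back.
pub fn bridge_sum(data: &[i64]) -> i64 {
    struct Callback {
        len: usize,
    }
    impl ProducerCallback<i64> for Callback {
        type Output = i64;
        fn callback<P: Producer<Item = i64>>(self, producer: P) -> i64 {
            sum_producer(self.len, producer)
        }
    }
    with_producer(data, Callback { len: data.len() })
}
```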
Putting it together, [`bridge`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L351) packs the slice length `len` and the incoming `MapConsumer` into a `Callback` struct and passes it to `with_producer`. `with_producer` creates an [`IterProducer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L590) and hands it to the callback, so all of these arguments end up in a call to [`bridge_producer_consumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L391):
### `bridge_producer_consumer`
```rust
bridge_producer_consumer(len, IterProducer, MapConsumer)
```
> * [Producer](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L59) trait
> * [Consumer](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L128) trait
```rust=
pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
where
    P: Producer,
    C: Consumer<P::Item>,
{
    let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
    return helper(len, false, splitter, producer, consumer);
    fn helper<P, C>(
        len: usize,
        migrated: bool,
        mut splitter: LengthSplitter,
        producer: P,
        consumer: C,
    ) -> C::Result
    where
        P: Producer,
        C: Consumer<P::Item>,
    {
        if consumer.full() {
            consumer.into_folder().complete()
        }
        ...
```
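`bridge_producer_consumer` creates a [`LengthSplitter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L302) and then performs the core computation in `helper`. `LengthSplitter` records `min`, the length below which splitting stops. It also records how many more times the work may be split at most.
* For `SumConsumer`, `consumer.full()` is always `false`, and so is it for the `MapConsumer` that wraps it. This first branch is therefore never taken here.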
```rust=24
        else if splitter.try_split(len, migrated) {
            let mid = len / 2;
            let (left_producer, right_producer) = producer.split_at(mid);
            let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
            let (left_result, right_result) = join_context(
                |context| {
                    helper(
                        mid,
                        context.migrated(),
                        splitter,
                        left_producer,
                        left_consumer,
                    )
                },
                |context| {
                    helper(
                        len - mid,
                        context.migrated(),
                        splitter,
                        right_producer,
                        right_consumer,
                    )
                },
            );
            reducer.reduce(left_result, right_result)
        }
```
`try_split` decides whether the current range should be split further. If it should, [`split_at`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L602) is called on `producer`. It cuts the slice at the middle index and creates two new producers. For `MapConsumer`, the consumer's [`split_at`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L188) turns one consumer into three values: a left consumer, a right consumer and a reducer.
[`join_context`](https://github.com/rayon-rs/rayon/blob/3c7489e16816b9a90404d47cb5f5601bdda193e4/rayon-core/src/join/mod.rs#L115) is the most central design in the whole Rayon library, and we will look at it in depth in a later section. In short, it runs the two recursive `helper` calls, potentially in parallel, so the recursion keeps dividing the task (divide). Finally, the `reducer`'s [`reduce`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L73) method adds up the sums returned by the two `helper` calls. This combines the results of the two subtasks (conquer).
```rust=50
        else {
            producer.fold_with(consumer.into_folder()).complete()
        }
    }
}
```
If the range is not divided any further, `consumer.into_folder()` produces a [`MapFolder`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L224), which wraps a [`SumFolder`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L78). [`producer.fold_with`](https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/mod.rs#L106) then passes the elements of the producer's slice to the `MapFolder`'s [`consume_iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L244), which applies the closure to each element. The results go on to the `SumFolder`'s [`consume_iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L94), which adds them up one by one. Finally, the [`complete`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L103) method returns the resulting sum.
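The consumer, folder and reducer roles above can be sketched with std only. The traits below are simplified, hypothetical versions of Rayon's plumbing: there are no `Send`/`Sync` bounds, no `full()`, and the sums are fixed to `i64`. `drive` is a sequential stand-in for `helper`. The sketch shows three things: how `MapConsumer` wraps `SumConsumer`, how `split_at` yields two consumers plus a reducer, and how `MapFolder` maps elements before handing them to `SumFolder`.

```rust
/// Hypothetical, trimmed-down plumbing traits (no Send/Sync, no `full()`).
pub trait Folder<T>: Sized {
    type Result;
    fn consume_iter<I: IntoIterator<Item = T>>(self, iter: I) -> Self;
    fn complete(self) -> Self::Result;
}
pub trait Reducer<R> {
    fn reduce(self, left: R, right: R) -> R;
}
pub trait Consumer<T>: Sized {
    type Folder: Folder<T, Result = Self::Result>;
    type Reducer: Reducer<Self::Result>;
    type Result;
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
    fn into_folder(self) -> Self::Folder;
}

/// Sums `i64`s; like Rayon's `SumConsumer`, it is also its own reducer.
pub struct SumConsumer;
pub struct SumFolder { sum: i64 }
impl Folder<i64> for SumFolder {
    type Result = i64;
    fn consume_iter<I: IntoIterator<Item = i64>>(mut self, iter: I) -> Self {
        self.sum += iter.into_iter().sum::<i64>();
        self
    }
    fn complete(self) -> i64 {
        self.sum
    }
}
impl Reducer<i64> for SumConsumer {
    fn reduce(self, left: i64, right: i64) -> i64 {
        left + right
    }
}
impl Consumer<i64> for SumConsumer {
    type Folder = SumFolder;
    type Reducer = SumConsumer;
    type Result = i64;
    fn split_at(self, _index: usize) -> (Self, Self, Self) {
        (SumConsumer, SumConsumer, SumConsumer)
    }
    fn into_folder(self) -> SumFolder {
        SumFolder { sum: 0 }
    }
}

/// Wrap a base consumer/folder and apply `map_op` before passing items on.
pub struct MapConsumer<'f, C, F> { base: C, map_op: &'f F }
pub struct MapFolder<'f, C, F> { base: C, map_op: &'f F }
impl<'f, T, R, C: Consumer<R>, F: Fn(T) -> R> Consumer<T> for MapConsumer<'f, C, F> {
    type Folder = MapFolder<'f, C::Folder, F>;
    type Reducer = C::Reducer; // the reducer comes straight from the base consumer
    type Result = C::Result;
    fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
        let (left, right, reducer) = self.base.split_at(index);
        let map_op = self.map_op;
        (MapConsumer { base: left, map_op }, MapConsumer { base: right, map_op }, reducer)
    }
    fn into_folder(self) -> Self::Folder {
        MapFolder { base: self.base.into_folder(), map_op: self.map_op }
    }
}
impl<'f, T, R, C: Folder<R>, F: Fn(T) -> R> Folder<T> for MapFolder<'f, C, F> {
    type Result = C::Result;
    fn consume_iter<I: IntoIterator<Item = T>>(self, iter: I) -> Self {
        let map_op = self.map_op;
        // Map each element first, then let the base folder (e.g. SumFolder) consume it.
        let base = self.base.consume_iter(iter.into_iter().map(map_op));
        MapFolder { base, map_op }
    }
    fn complete(self) -> C::Result {
        self.base.complete()
    }
}

/// Sequential stand-in for `helper`: split while longer than `min_len`,
/// then fold each leaf and reduce the two partial results.
fn drive<T: Copy, C: Consumer<T>>(slice: &[T], consumer: C, min_len: usize) -> C::Result {
    if slice.len() > min_len.max(1) {
        let mid = slice.len() / 2;
        let (left, right) = slice.split_at(mid);
        let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
        reducer.reduce(drive(left, left_consumer, min_len), drive(right, right_consumer, min_len))
    } else {
        consumer.into_folder().consume_iter(slice.iter().copied()).complete()
    }
}

pub fn sum_of_squares(input: &[i64]) -> i64 {
    let square = |i: i64| i * i;
    drive(input, MapConsumer { base: SumConsumer, map_op: &square }, 2)
}
```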
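The implementation also shows that the `map` operation on the iterator is not computed immediately. Nothing is evaluated until the later `sum` drives it, which matches the [lazy evaluation](https://en.wikipedia.org/wiki/Lazy_evaluation) property known from [functional programming](https://en.wikipedia.org/wiki/Functional_programming).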
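The divide-and-conquer steps can be put together in a small sketch of `helper`'s control flow. `ToySplitter` is a hypothetical, loose simplification of `LengthSplitter`: it keeps a `min` length and a split budget that halves on every split, and it ignores the `migrated` reset. `std::thread::scope` stands in for `join_context`, so each split spawns a real OS thread instead of pushing a job onto a work-stealing queue.

```rust
use std::thread;

/// Hypothetical simplified splitter: a minimum length plus a split budget.
#[derive(Clone, Copy)]
struct ToySplitter {
    splits: usize,
    min: usize,
}

impl ToySplitter {
    fn try_split(&mut self, len: usize) -> bool {
        // Split only if each half keeps at least `min` items and budget remains.
        if len / 2 >= self.min && self.splits > 0 {
            self.splits /= 2;
            true
        } else {
            false
        }
    }
}

fn helper(slice: &[i64], mut splitter: ToySplitter) -> i64 {
    if splitter.try_split(slice.len()) {
        let mid = slice.len() / 2;
        let (left, right) = slice.split_at(mid);
        // `thread::scope` stands in for `join_context`: left half on a new
        // thread, right half on the current one. Both get the halved budget.
        let (l, r) = thread::scope(|s| {
            let handle = s.spawn(move || helper(left, splitter));
            let r = helper(right, splitter);
            (handle.join().unwrap(), r)
        });
        l + r // the "reduce" step
    } else {
        slice.iter().map(|&i| i * i).sum() // the sequential "fold" at a leaf
    }
}

pub fn par_sum_of_squares(input: &[i64]) -> i64 {
    let threads = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    helper(input, ToySplitter { splits: threads, min: 1 })
}
```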
## Rayon's basic environment
Before we explore Rayon's core `join` primitive, one key question remains. Nowhere in the code traced so far have we seen where the thread pool and the other machinery that let Rayon distribute work across CPU cores are set up.
Notice that the example never has to start the runtime explicitly. Internally, Rayon calls [`global_registry`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L167) to obtain the `Registry` structure it needs to distribute tasks. The first time `global_registry` is called, it initializes this structure, which sets up Rayon's basic environment.
In the `par_iter` example above, this initialization happens as part of creating the `LengthSplitter`. `LengthSplitter` internally creates a [`Splitter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L256), which calls [`current_num_threads`](https://github.com/rayon-rs/rayon/blob/3c7489e16816b9a90404d47cb5f5601bdda193e4/rayon-core/src/lib.rs#L108). That call eventually reaches `Registry`'s own [`current_num_threads`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L281). This is where Rayon's core data structure gets built via `global_registry`.
### `global_registry`
```rust
pub(super) fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));
    THE_REGISTRY_SET.call_once(|| {
        result = registry()
            .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
    });
    result
}
```
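The key points of `global_registry` are:
1. [`THE_REGISTRY_SET`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L162) is a [`Once`](http://web.mit.edu/rust-lang_v1.25/arch/amd64_ubuntu1404/share/doc/rust/html/std/sync/struct.Once.html), i.e. a one-time global initialization. The closure passed to `call_once` runs only on the first call, and later calls skip it.
2. On the first call, `ThreadPoolBuilder::new()` creates a thread pool builder with default settings (nothing has been started yet). `Registry::new(ThreadPoolBuilder::new())` then builds the registry from it, and the resulting `Arc<Registry>` is stored in [`THE_REGISTRY`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L161) through `get_or_insert`.
3. If `call_once` does not run the closure because the registry has already been set up, `result` stays `Err(GlobalPoolAlreadyInitialized)`. `global_registry` then uses `or_else` to fall back to `THE_REGISTRY`. That value is `Some` only if the first initialization succeeded. Otherwise the error is kept and `expect` panics.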
:::info
Some related operations on `Result` and `Option`:
* [`or_else`](https://doc.rust-lang.org/std/result/enum.Result.html#method.or_else)
* [`ok_or`](https://doc.rust-lang.org/std/option/enum.Option.html#method.ok_or)
* [`get_or_insert`](https://doc.rust-lang.org/std/option/enum.Option.html#method.get_or_insert)
:::
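The same idea, initializing exactly once and then handing out a `&'static` reference, can be sketched with `std::sync::OnceLock`. Here `OnceLock` replaces Rayon's `Once` + `static mut Option` pair. `ToyRegistry` and `INIT_CALLS` are hypothetical. Unlike Rayon's version, this sketch does not model a failing initialization.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for Rayon's `Registry`.
pub struct ToyRegistry {
    pub num_threads: usize,
}

static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);
static THE_REGISTRY: OnceLock<Arc<ToyRegistry>> = OnceLock::new();

/// Builds the global registry on first use; concurrent callers wait for that
/// single initialization and then all get the same `&'static` reference.
pub fn global_registry() -> &'static Arc<ToyRegistry> {
    THE_REGISTRY.get_or_init(|| {
        INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        let n = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        Arc::new(ToyRegistry { num_threads: n })
    })
}

pub fn init_calls() -> usize {
    INIT_CALLS.load(Ordering::SeqCst)
}
```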
### [`Registry::new`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L211)
```rust=
pub(super) fn new<S>(
    mut builder: ThreadPoolBuilder<S>,
) -> Result<Arc<Self>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    let n_threads = builder.get_num_threads();
    let breadth_first = builder.get_breadth_first();
    let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
        .map(|_| {
            let worker = if breadth_first {
                Worker::new_fifo()
            } else {
                Worker::new_lifo()
            };
            let stealer = worker.stealer();
            (worker, stealer)
        })
        .unzip();
```
* [`get_num_threads`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L400) determines the number of threads in the thread pool, based on the actual CPUs or an environment variable
* `get_breadth_first` decides the order in which each worker takes tasks from its own work queue (FIFO when breadth-first, LIFO otherwise); see the [comment](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L509)
* A worker/stealer pair is created for each thread; see [`crossbeam-deque`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/)
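A lock-based toy of the worker/stealer split shows what "direction" means here. It assumes the behavior documented by `crossbeam-deque`: tasks are pushed at the back, a FIFO worker pops from the front, a LIFO worker pops from the back, and stealers take from the end opposite to where tasks are pushed. The real deques are lock-free. `ToyWorker` and `ToyStealer` are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical lock-based toy of crossbeam-deque's `Worker`.
pub struct ToyWorker<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
    fifo: bool,
}

/// Hypothetical toy `Stealer`: a cloneable handle other threads can use.
pub struct ToyStealer<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Clone for ToyStealer<T> {
    fn clone(&self) -> Self {
        ToyStealer { queue: Arc::clone(&self.queue) }
    }
}

impl<T> ToyWorker<T> {
    pub fn new_fifo() -> Self {
        ToyWorker { queue: Arc::default(), fifo: true }
    }
    pub fn new_lifo() -> Self {
        ToyWorker { queue: Arc::default(), fifo: false }
    }
    pub fn stealer(&self) -> ToyStealer<T> {
        ToyStealer { queue: Arc::clone(&self.queue) }
    }
    /// Tasks are always pushed at the back.
    pub fn push(&self, task: T) {
        self.queue.lock().unwrap().push_back(task);
    }
    /// FIFO pops the oldest task (front); LIFO pops the newest one (back).
    pub fn pop(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        if self.fifo { queue.pop_front() } else { queue.pop_back() }
    }
}

impl<T> ToyStealer<T> {
    /// Stealing takes from the front, opposite to where tasks are pushed.
    pub fn steal(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

In Rayon's default (depth-first) mode each worker is LIFO. It keeps working on the task it pushed most recently, while thieves take the oldest task from the other end.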
```rust=23
    let logger = Logger::new(n_threads);
    let registry = Arc::new(Registry {
        logger: logger.clone(),
        thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
        sleep: Sleep::new(logger, n_threads),
        injected_jobs: Injector::new(),
        terminate_count: AtomicUsize::new(1),
        panic_handler: builder.take_panic_handler(),
        start_handler: builder.take_start_handler(),
        exit_handler: builder.take_exit_handler(),
    });
    // If we return early or panic, make sure to terminate existing threads.
    let t1000 = Terminator(&registry);
```
A new `Registry` structure is created:
* [`logger`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/log.rs#L109): records events (for debugging)
* [`thread_infos`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L553): each thread has a corresponding `stealer` that can take tasks from that thread's work queue. A `stealer` can be shared between threads, so one thread can use another thread's `stealer` to take tasks from that thread's queue.
* [`sleep`](https://github.com/rayon-rs/rayon/tree/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/sleep): puts worker threads to sleep and wakes them up
* [`injected_jobs`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/): the global FIFO queue (`Injector`) from [`crossbeam-deque`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/)
* [`terminate_count`](https://doc.rust-lang.org/std/sync/atomic/struct.AtomicUsize.html): an atomic reference count, starting at 1; when it drops to 0, the registry's worker threads are told to terminate
* `*_handler`: closures that handle special events (panics, thread start and thread exit)

`Terminator` makes sure the threads that were already started are terminated properly if spawning a later thread fails or the function panics; see [Rayon#288](https://github.com/rayon-rs/rayon/commit/a29cb5ce05739b4e0b7238843c605d18ae09152e#diff-23de7a86ae4545fe32d8594074c28c5e529381161232fdcb8184d454e5d599d0).
```rust=38
    for (index, worker) in workers.into_iter().enumerate() {
        let thread = ThreadBuilder {
            name: builder.get_thread_name(index),
            stack_size: builder.get_stack_size(),
            registry: registry.clone(),
            worker,
            index,
        };
        if let Err(e) = builder.get_spawn_handler().spawn(thread) {
            return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
        }
    }
    // Returning normally now, without termination.
    mem::forget(t1000);
    Ok(registry.clone())
}
```
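A `ThreadBuilder` is created for each `worker`. The `ThreadBuilder` is handed to the `spawn_handler` in `builder` (by default [`DefaultSpawn`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L89)), which spawns a new worker thread. That thread then runs [`main_loop`](https://github.com/rayon-rs/rayon/blob/6a01573a180c89d853b7d22d20bd2f123f2df5b4/rayon-core/src/registry.rs#L799), where it picks tasks from the work queues and executes them.
If every thread is created successfully, [`mem::forget`](https://doc.rust-lang.org/std/mem/fn.forget.html) takes ownership of `t1000` without running its destructor. The `Terminator`'s cleanup is therefore skipped, and since it only holds a reference, nothing is leaked. Once `Registry::new` returns, `set_global_registry` stores the result, and from then on the global `THE_REGISTRY` is initialized.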
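The `Terminator` + `mem::forget` combination is the common drop-guard pattern. The minimal sketch below uses a hypothetical `ToyTerminator` that only counts cleanups. An early return drops the guard and runs the cleanup, while the success path disarms the guard with `mem::forget`.

```rust
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for `Terminator`: runs cleanup unless disarmed.
struct ToyTerminator<'a> {
    cleanups: &'a AtomicUsize,
}

impl Drop for ToyTerminator<'_> {
    fn drop(&mut self) {
        // In Rayon this would tell the already-started threads to terminate.
        self.cleanups.fetch_add(1, Ordering::SeqCst);
    }
}

/// Pretends to spawn `n` threads, failing at index `fail_at` if given.
pub fn build(n: usize, fail_at: Option<usize>, cleanups: &AtomicUsize) -> Result<usize, String> {
    let guard = ToyTerminator { cleanups };
    for index in 0..n {
        if Some(index) == fail_at {
            // Early return: `guard` is dropped here, so the cleanup runs.
            return Err(format!("failed to spawn thread {index}"));
        }
    }
    // Success: disarm the guard so its destructor never runs.
    mem::forget(guard);
    Ok(n)
}
```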
## `join_context`
[`join_context`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/join/mod.rs#L115) takes two closures, `oper_a` and `oper_b`, as input. It first defines `call_a` and `call_b`, the functions through which the two closures are invoked. The two differ in where the `migrated` member of the closures' [`FnContext`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L758) argument comes from. `call_a` receives the value as an argument of the wrapper itself and returns a closure with no inputs. The closure returned by `call_b` takes `migrated` as its input, which is supplied when the closure is eventually called.
```rust=
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce(FnContext) -> RA + Send,
    B: FnOnce(FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
    #[inline]
    fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
        move || f(FnContext::new(injected))
    }
    #[inline]
    fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
        move |migrated| f(FnContext::new(migrated))
    }
    ...
```
:::info
:notes:
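1. An `FnOnce` bound means the closure is called at most once. Such a closure may consume (move out) the variables it captured from the surrounding scope, so those variables are no longer available after the call.
2. Adding `move` forces the closure to take ownership of the variables it captures.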
:::
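Here is a self-contained version of the two wrappers. It uses a hypothetical `FnContext` struct with a public `migrated` field, whereas Rayon's own type is built with `FnContext::new`. `call_a` bakes the flag in when the closure is wrapped. `call_b` leaves it as a parameter for whoever eventually runs job B.

```rust
/// Hypothetical stand-in for Rayon's `FnContext`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FnContext {
    pub migrated: bool,
}

/// A runs right away, so its `migrated` value is fixed at wrapping time.
fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
    move || f(FnContext { migrated: injected })
}

/// B may be stolen, so `migrated` is supplied later by whoever runs it.
fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
    move |migrated| f(FnContext { migrated })
}

pub fn demo() -> (bool, bool) {
    let a = call_a(|ctx| ctx.migrated, false);
    let b = call_b(|ctx| ctx.migrated);
    (a(), b(true)) // B is run "as if stolen"
}
```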
```rust
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}
```
`join_context` runs a closure through [`in_worker`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L864). `in_worker` first checks whether the thread executing it is a worker thread. It does so by calling `WorkerThread::current`, which reads [`WORKER_THREAD_STATE`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L611). This thread-local state is null when a thread starts and is set through [`set_current`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L635) if the thread is a worker thread. On a worker thread the closure is executed directly. Otherwise, `global_registry()` is used to reach the `Injector`, the task is pushed onto the global injector queue, and the calling thread blocks until a worker has finished it.
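The worker check can be modeled with a thread-local flag. In this sketch `IS_WORKER`, `spawn_worker` and `Job` are hypothetical, and a channel stands in for the `Injector`. Jobs must be `'static` because they are boxed, a restriction that Rayon's `StackJob` avoids. A call made on a worker runs inline with `injected = false`. Any other thread sends the job over and blocks on the result.

```rust
use std::cell::Cell;
use std::sync::mpsc;
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

thread_local! {
    // Hypothetical stand-in for WORKER_THREAD_STATE: false until a thread
    // registers itself as a worker.
    static IS_WORKER: Cell<bool> = Cell::new(false);
}

/// Spawns one worker thread that marks itself and runs injected jobs.
pub fn spawn_worker() -> mpsc::Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        IS_WORKER.with(|w| w.set(true));
        for job in rx {
            job();
        }
    });
    tx
}

/// Runs `op` inline on a worker thread; otherwise injects it and blocks.
pub fn in_worker<R: Send + 'static>(
    injector: &mpsc::Sender<Job>,
    op: impl FnOnce(bool) -> R + Send + 'static,
) -> R {
    if IS_WORKER.with(|w| w.get()) {
        op(false) // already on a worker: not injected
    } else {
        let (result_tx, result_rx) = mpsc::channel();
        injector
            .send(Box::new(move || {
                let _ = result_tx.send(op(true)); // `true`: this job was injected
            }))
            .expect("worker thread is gone");
        result_rx.recv().expect("job did not complete")
    }
}
```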
```rust=19
    registry::in_worker(|worker_thread, injected| unsafe {
        // Create virtual wrapper for task b; this all has to be
        // done here so that the stack frame can keep it all live
        // long enough.
        let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
        let job_b_ref = job_b.as_job_ref();
        worker_thread.push(job_b_ref);
```
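The closure passed to `in_worker` first wraps job B in a `StackJob` that lives on the current stack frame. It then pushes a reference to it (`job_b_ref`) onto the worker thread's local work queue, where other threads can steal it.
```rust=27
        // Execute task a; hopefully b gets stolen in the meantime.
        let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
        let result_a = match status_a {
            Ok(v) => v,
            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
        };
```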
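Job A is then executed directly on the current thread. `unwind::halt_unwinding` runs the closure inside [`catch_unwind`](https://doc.rust-lang.org/std/panic/fn.catch_unwind.html). If A panics, `join_recover_from_panic` first waits for job B to finish, because B lives on this stack frame, and then resumes the panic.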
:::info
* [`AssertUnwindSafe`](https://doc.rust-lang.org/std/panic/struct.AssertUnwindSafe.html)
* [What is unwind safety?](https://doc.rust-lang.org/stable/std/panic/trait.UnwindSafe.html#what-is-unwind-safety)
:::
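The panic handling can be sketched with `std::thread::scope`. In the hypothetical `join_catching_panics` below, B runs on a scoped thread and A runs inline under `catch_unwind`. If A panics, the function still waits for B before resuming the unwind. This mirrors why `join_recover_from_panic` waits on B's latch: B may borrow data from the current stack frame.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::thread;

/// Hypothetical join: B on a scoped thread, A inline under `catch_unwind`.
pub fn join_catching_panics<RA, RB: Send>(
    a: impl FnOnce() -> RA,
    b: impl FnOnce() -> RB + Send,
) -> (RA, RB) {
    thread::scope(|s| {
        let handle_b = s.spawn(b);
        match panic::catch_unwind(AssertUnwindSafe(a)) {
            Ok(result_a) => {
                // Propagate a panic from B, if any.
                let result_b = handle_b.join().unwrap_or_else(|e| panic::resume_unwind(e));
                (result_a, result_b)
            }
            Err(err) => {
                // A panicked: wait for B first, then re-raise A's panic.
                let _ = handle_b.join();
                panic::resume_unwind(err)
            }
        }
    })
}
```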
```rust=35
        // Now that task A has finished, try to pop job B from the
        // local stack. It may already have been popped by job A; it
        // may also have been stolen. There may also be some tasks
        // pushed on top of it in the stack, and we will have to pop
        // those off to get to it.
        while !job_b.latch.probe() {
            if let Some(job) = worker_thread.take_local_job() {
                if job == job_b_ref {
                    // Found it! Let's run it.
                    //
                    // Note that this could panic, but it's ok if we unwind here.
                    let result_b = job_b.run_inline(injected);
                    return (result_a, result_b);
                } else {
                    worker_thread.execute(job);
                }
            } else {
                // Local deque is empty. Time to steal from other
                // threads.
                worker_thread.wait_until(&job_b.latch);
                debug_assert!(job_b.latch.probe());
                break;
            }
        }
        (result_a, job_b.into_result())
```
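Finally, job B must also have been executed before `join_context` can return. There are several cases:
1. The thread tries to pop job B from its own work queue and run it. If other jobs (not B) were pushed on top of B in the meantime, they are popped and executed first.
2. Job B was stolen by another thread but has not finished yet. The local queue is then empty, and while waiting (`wait_until`) the thread can steal other tasks and execute them.
3. Job B has already finished, so its latch is set. The loop is skipped or exits, and the result is taken with `job_b.into_result()`.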
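These cases can be reproduced in a small model. `toy_join`, `Job` and `Deque` are hypothetical. Job B is boxed and pushed onto a mutex-protected deque that another thread may steal from at the front. The owner runs A and then pops from the back until B's latch (an `AtomicBool`) is set. When the deque is empty the owner just yields, rather than stealing other work as `wait_until` does.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// A type-erased job; the deque is shared so another thread may steal from it.
pub type Job<'a> = Box<dyn FnOnce() + Send + 'a>;
pub type Deque<'a> = Mutex<VecDeque<Job<'a>>>;

/// Hypothetical simplified join: push B, run A inline, then pop jobs until
/// B's latch is set.
pub fn toy_join<'a, RA, RB: Send + 'a>(
    deque: &Deque<'a>,
    a: impl FnOnce() -> RA,
    b: impl FnOnce() -> RB + Send + 'a,
) -> (RA, RB) {
    let latch = Arc::new(AtomicBool::new(false)); // plays the role of SpinLatch
    let slot = Arc::new(Mutex::new(None::<RB>));
    let (job_latch, job_slot) = (Arc::clone(&latch), Arc::clone(&slot));
    deque.lock().unwrap().push_back(Box::new(move || {
        *job_slot.lock().unwrap() = Some(b());
        job_latch.store(true, Ordering::Release);
    }));

    let result_a = a(); // run A inline; B may be stolen meanwhile
    while !latch.load(Ordering::Acquire) {
        let job = deque.lock().unwrap().pop_back(); // LIFO: newest job first
        match job {
            // Case 1: B itself, or a job pushed on top of it; run it.
            Some(job) => job(),
            // Case 2: deque empty, so B was stolen; wait for the thief.
            None => thread::yield_now(),
        }
    }
    // Case 3: the latch is set, so B's result is ready.
    let result_b = slot.lock().unwrap().take().expect("latch set without result");
    (result_a, result_b)
}
```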