---
tags: Rust, System Software
---

# Rust: Rayon

> [Rayon](https://github.com/rayon-rs/rayon)
>
> This note is part of an effort to learn practical techniques for Rust and systems software. It records the process of tracing the code of a Rust project, and tries to point out (and, ~~if I manage to~~, fix) possible shortcomings of the project. Because it also serves as a future reference, links to documentation on the trickier syntax and concepts are included along the way.

## Introduction

Rayon is a data-parallelism library for Rust. It provides very convenient APIs for turning sequential computations into parallel ones, iterators in particular. Thanks to Rust's strict ownership checking, the library also guarantees that data races are avoided.

For the origins of the project and a more detailed introduction, see the original author's article [Rayon: data parallelism in Rust](https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/).

## `par_iter`

Let's start with the most basic usage shown by the project. The following function computes the sum of the squares of all elements of a slice:

```rust
use rayon::prelude::*; // brings `par_iter` and the parallel iterator traits into scope

fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- just change that!
         .map(|&i| i * i)
         .sum()
}
```

### `IntoParallelRefIterator`

Because Rayon implements the `IntoParallelRefIterator` trait for slices, the `par_iter` method becomes available on them. The trait, together with the closely related `IntoParallelIterator`, is defined as follows:

```rust
pub trait IntoParallelIterator {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send;

    fn into_par_iter(self) -> Self::Iter;
}

pub trait IntoParallelRefIterator<'data> {
    type Iter: ParallelIterator<Item = Self::Item>;
    type Item: Send + 'data;

    fn par_iter(&'data self) -> Self::Iter;
}
```

The `type` keyword declares the trait's *associated types*: every type implementing the trait must specify a concrete type for each of them.

> * [Advanced Traits](https://doc.rust-lang.org/book/ch19-03-advanced-traits.html)
> * [Associated Type](http://web.mit.edu/rust-lang_v1.25/arch/amd64_ubuntu1404/share/doc/rust/html/book/first-edition/associated-types.html)
> * [Associated items](https://doc.rust-lang.org/rust-by-example/generics/assoc_items.html)

Note that the type returned by `par_iter` must implement the [`ParallelIterator`](https://github.com/rayon-rs/rayon/blob/3153b48/src/iter/mod.rs#L348) trait. This is the trait Rayon defines for iterators meant for parallel use. Its methods mirror those of the [standard library's iterator](https://doc.rust-lang.org/std/iter/trait.Iterator.html), and we will look at it in more depth later.

> * [Matching a generic parameter to an associated type in an impl](https://stackoverflow.com/questions/29345708/matching-a-generic-parameter-to-an-associated-type-in-an-impl) (I haven't found the official term for this syntax; a more detailed explanation is still TODO.)

```rust
impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] {
    type Item = &'data T;
    type Iter = Iter<'data, T>;

    fn into_par_iter(self) -> Self::Iter {
        Iter { slice: self }
    }
}

impl<'data, I: 'data + ?Sized> IntoParallelRefIterator<'data> for I
where
    &'data I: IntoParallelIterator,
{
    type Iter = <&'data I as IntoParallelIterator>::Iter;
    type Item = <&'data I as IntoParallelIterator>::Item;

    fn par_iter(&'data self) -> Self::Iter {
        self.into_par_iter()
    }
}
```

The code above shows how Rayon implements these traits for slice references. The second `impl` is a blanket implementation: any type `I` whose reference `&I` implements `IntoParallelIterator` automatically gets `par_iter`, which simply forwards to `into_par_iter`. So whether we call `input.par_iter()` or `input.into_par_iter()`, this slice reference ends up as the same [immutable slice iterator](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L545).

:::info
:notes: In the statement `type Iter = Iter<'data, T>`, the first `Iter` is the name of the associated type, while the second [`Iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L545) is the slice iterator struct defined by Rayon. They are two different things.
:::

### `map`

Next, `Iter`'s `map` method is called (it comes from the `ParallelIterator` trait):

```rust
fn map<F, R>(self, map_op: F) -> Map<Self, F>
where
    F: Fn(Self::Item) -> R + Sync + Send,
    R: Send,
{
    Map::new(self, map_op)
}
```

All it does is build a [`Map`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L15) struct, which holds the `Iter` created earlier together with the [closure](https://doc.rust-lang.org/book/ch13-01-closures.html). Nothing is computed at this point.

### `sum`

```rust
fn sum<S>(self) -> S
where
    S: Send + Sum<Self::Item> + Sum<S>,
{
    sum::sum(self)
}

pub(super) fn sum<PI, S>(pi: PI) -> S
where
    PI: ParallelIterator,
    S: Send + Sum<PI::Item> + Sum,
{
    pi.drive_unindexed(SumConsumer::new())
}
```

The total is then computed by the `sum` method (`Map` also implements `ParallelIterator`). `sum` hands a `SumConsumer` to `drive_unindexed`. `Map`'s [`drive_unindexed`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L44) wraps that consumer in a `MapConsumer` and passes it on to `drive_unindexed` of the underlying `Iter`.
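To make the "build now, compute later" structure concrete, here is a minimal, sequential sketch of the same shape. The names `Drive`, `Consumer`, `SliceIter`, `Map`, `MapConsumer` and `SumConsumer` are hypothetical simplifications loosely modeled on Rayon's plumbing, not Rayon's actual API, and nothing here runs in parallel. The point is only that `map` merely wraps the base iterator and the closure, and the closure runs once `sum` drives the chain with a consumer (which `Map` wraps in a `MapConsumer`).

```rust
/// Hypothetical consumer: receives items one by one, then produces a result
/// (a heavily simplified, sequential stand-in for Rayon's Consumer/Folder pair).
trait Consumer<T> {
    type Result;
    fn consume(&mut self, item: T);
    fn complete(self) -> Self::Result;
}

/// Hypothetical iterator trait with a single "drive" entry point.
trait Drive: Sized {
    type Item;

    /// Pushes every item into `consumer`; this is where the work happens.
    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result;

    /// Like Rayon's `map`: only wraps `self` and the closure, computes nothing.
    fn map<F, R>(self, f: F) -> Map<Self, F>
    where
        F: Fn(Self::Item) -> R,
    {
        Map { base: self, f }
    }
}

/// Source "iterator" over a borrowed slice (plays the role of the slice `Iter`).
struct SliceIter<'a> {
    slice: &'a [i32],
}

impl<'a> Drive for SliceIter<'a> {
    type Item = &'a i32;

    fn drive<C: Consumer<Self::Item>>(self, mut consumer: C) -> C::Result {
        for item in self.slice {
            consumer.consume(item);
        }
        consumer.complete()
    }
}

/// The adapter returned by `map`: just the base iterator plus the closure.
struct Map<I, F> {
    base: I,
    f: F,
}

/// Applies `f` to each item before forwarding it to the wrapped consumer.
struct MapConsumer<C, F> {
    base: C,
    f: F,
}

impl<T, R, C, F> Consumer<T> for MapConsumer<C, F>
where
    C: Consumer<R>,
    F: Fn(T) -> R,
{
    type Result = C::Result;
    fn consume(&mut self, item: T) {
        let mapped = (self.f)(item);
        self.base.consume(mapped);
    }
    fn complete(self) -> C::Result {
        self.base.complete()
    }
}

impl<I, F, R> Drive for Map<I, F>
where
    I: Drive,
    F: Fn(I::Item) -> R,
{
    type Item = R;

    // Mirrors `Map::drive_unindexed`: wrap the consumer, then drive the base.
    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
        self.base.drive(MapConsumer { base: consumer, f: self.f })
    }
}

/// Keeps a running total, like a sequential `SumConsumer`.
struct SumConsumer(i32);

impl Consumer<i32> for SumConsumer {
    type Result = i32;
    fn consume(&mut self, item: i32) {
        self.0 += item;
    }
    fn complete(self) -> i32 {
        self.0
    }
}

/// Like `sum`: hands a fresh `SumConsumer` to the chain and drives it.
fn sum<I: Drive<Item = i32>>(iter: I) -> i32 {
    iter.drive(SumConsumer(0))
}
```

Counting closure calls (for example with a `Cell`) shows zero calls right after `map` and one call per element only after `sum`, which is the lazy behavior the trace above describes.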
### `bridge`

The slice `Iter`'s own [`drive_unindexed`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L555) simply calls `bridge`, so in the end `sum` reaches `bridge`. The consumer argument is a [`MapConsumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L167), which wraps the actual [`SumConsumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L19) together with the closure.

```rust
pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
where
    I: IndexedParallelIterator,
    C: Consumer<I::Item>,
{
    let len = par_iter.len();
    return par_iter.with_producer(Callback { len, consumer });

    struct Callback<C> {
        len: usize,
        consumer: C,
    }

    impl<C, I> ProducerCallback<I> for Callback<C>
    where
        C: Consumer<I>,
    {
        type Output = C::Result;
        fn callback<P>(self, producer: P) -> C::Result
        where
            P: Producer<Item = I>,
        {
            bridge_producer_consumer(self.len, producer, self.consumer)
        }
    }
}
```

Since `Iter` also implements the [`IndexedParallelIterator`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/mod.rs#L2244) trait, it provides the [3 required methods](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L570) shown below:

```rust
impl<'data, T: Sync + 'data> IndexedParallelIterator for Iter<'data, T> {
    fn drive<C>(self, consumer: C) -> C::Result
    where
        C: Consumer<Self::Item>,
    {
        bridge(self, consumer)
    }

    fn len(&self) -> usize {
        self.slice.len()
    }

    fn with_producer<CB>(self, callback: CB) -> CB::Output
    where
        CB: ProducerCallback<Self::Item>,
    {
        callback.callback(IterProducer { slice: self.slice })
    }
}
```

Putting it together: [`bridge`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L351) packs the slice length `len` and the incoming `MapConsumer` into a `Callback` struct and passes it to `with_producer`. `with_producer` creates an [`IterProducer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L590) and hands it to `callback.callback`, which finally calls `bridge_producer_consumer` with all of these arguments.
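One plausible reading of the `Callback` indirection is that `with_producer` creates the producer itself and passes it to the callback through a *generic* method, so `bridge` never has to name or return the concrete producer type. The sketch below shows only that shape, under heavy simplification. `Producer`, `ProducerCallback`, `SliceProducer`, `SliceIter` and `bridge_sum` are hypothetical, trimmed-down stand-ins rather than Rayon's real traits. Instead of calling `bridge_producer_consumer`, the callback just splits once and sums both halves sequentially.

```rust
/// Hypothetical, trimmed-down producer: can be split and folded sequentially.
trait Producer: Sized {
    type Item;
    fn split_at(self, index: usize) -> (Self, Self);
    fn fold_with<A, F>(self, init: A, f: F) -> A
    where
        F: FnMut(A, Self::Item) -> A;
}

/// The callback receives the producer through a generic method, so whoever
/// builds the callback never names the concrete producer type.
trait ProducerCallback<T> {
    type Output;
    fn callback<P: Producer<Item = T>>(self, producer: P) -> Self::Output;
}

/// Producer over a borrowed slice (in the spirit of `IterProducer`).
struct SliceProducer<'a> {
    slice: &'a [i64],
}

impl<'a> Producer for SliceProducer<'a> {
    type Item = i64;

    fn split_at(self, index: usize) -> (Self, Self) {
        let (left, right) = self.slice.split_at(index);
        (SliceProducer { slice: left }, SliceProducer { slice: right })
    }

    fn fold_with<A, F>(self, init: A, f: F) -> A
    where
        F: FnMut(A, Self::Item) -> A,
    {
        self.slice.iter().copied().fold(init, f)
    }
}

/// Stand-in for the slice `Iter` with its `len` / `with_producer` methods.
struct SliceIter<'a> {
    slice: &'a [i64],
}

impl<'a> SliceIter<'a> {
    fn len(&self) -> usize {
        self.slice.len()
    }

    fn with_producer<CB: ProducerCallback<i64>>(self, callback: CB) -> CB::Output {
        callback.callback(SliceProducer { slice: self.slice })
    }
}

/// Shaped like `bridge`: pack `len` into a `Callback`, let `with_producer` call it.
fn bridge_sum(iter: SliceIter<'_>) -> i64 {
    let len = iter.len();
    return iter.with_producer(Callback { len });

    struct Callback {
        len: usize,
    }

    impl ProducerCallback<i64> for Callback {
        type Output = i64;

        fn callback<P: Producer<Item = i64>>(self, producer: P) -> i64 {
            // Simplified stand-in for `bridge_producer_consumer`: one split, two folds.
            let (left, right) = producer.split_at(self.len / 2);
            left.fold_with(0, |acc, x| acc + x) + right.fold_with(0, |acc, x| acc + x)
        }
    }
}
```

As in Rayon's `bridge`, the `Callback` struct and its impl are declared after the `return`; items inside a function body are visible throughout the block, so this is purely a stylistic choice.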
### `bridge_producer_consumer`

With the concrete types of our example filled in, the resulting [`bridge_producer_consumer`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L391) call is, schematically:

```rust
// Schematic only: the argument types are written in place of the actual values.
bridge_producer_consumer(len, IterProducer, MapConsumer)
```

> * [Producer](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L59) trait
> * [Consumer](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L128) trait

```rust=
pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
where
    P: Producer,
    C: Consumer<P::Item>,
{
    let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
    return helper(len, false, splitter, producer, consumer);

    fn helper<P, C>(
        len: usize,
        migrated: bool,
        mut splitter: LengthSplitter,
        producer: P,
        consumer: C,
    ) -> C::Result
    where
        P: Producer,
        C: Consumer<P::Item>,
    {
        if consumer.full() {
            consumer.into_folder().complete()
        }
        ...
```

`bridge_producer_consumer` creates a [`LengthSplitter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L302) and then does the core work in `helper`. The `LengthSplitter` records `min`, the piece length below which it stops splitting, and (through its inner `Splitter`) `splits`, a budget limiting how many more times the work may be split.

* For `SumConsumer`, `consumer.full()` is always `false`, so the first branch is never taken here.

```rust=24
        else if splitter.try_split(len, migrated) {
            let mid = len / 2;
            let (left_producer, right_producer) = producer.split_at(mid);
            let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
            let (left_result, right_result) = join_context(
                |context| {
                    helper(
                        mid,
                        context.migrated(),
                        splitter,
                        left_producer,
                        left_consumer,
                    )
                },
                |context| {
                    helper(
                        len - mid,
                        context.migrated(),
                        splitter,
                        right_producer,
                        right_consumer,
                    )
                },
            );
            reducer.reduce(left_result, right_result)
        }
```

`try_split` decides whether to split the work further. If so, the `producer`'s [`split_at`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/slice/mod.rs#L602) cuts its slice at the middle index, creating two new producers. For `MapConsumer`, the consumer's [`split_at`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L188) turns one consumer into three values: a left consumer, a right consumer, and a reducer.

[`join_context`](https://github.com/rayon-rs/rayon/blob/3c7489e16816b9a90404d47cb5f5601bdda193e4/rayon-core/src/join/mod.rs#L115) is the heart of the whole Rayon library, and a later section explores it in depth. Overall, `helper` recursively splits the task (divide), and `join_context` lets the two halves run, potentially in parallel. Finally, the `reducer`'s [`reduce`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L73) method adds up the sums returned by the two `helper` calls, combining the results of the two subtasks (conquer).

```rust=50
        else {
            producer.fold_with(consumer.into_folder()).complete()
        }
    }
}
```

If the work is not divided any further, `consumer.into_folder()` produces a [`MapFolder`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L224), which in turn wraps a [`SumFolder`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L78).
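The recursion in `helper` (split while `try_split` allows it, reduce the two partial results, otherwise fold the piece sequentially) can be modeled with std alone. This is a sketch under stated assumptions, not Rayon's implementation. `Splitter`, `helper` and `map_sum` are hypothetical names. The splitter only mimics the `min` / `splits` rule described above, and it ignores the budget refresh Rayon applies when a job has migrated. `std::thread::scope` replaces `join_context`, so every split really spawns a thread instead of pushing one half onto a work-stealing deque.

```rust
use std::thread;

/// Hypothetical, simplified stand-in for `LengthSplitter`: keep splitting while
/// the piece is at least `2 * min` long and the `splits` budget is not used up.
#[derive(Clone, Copy)]
struct Splitter {
    splits: usize,
    min: usize,
}

impl Splitter {
    fn try_split(&mut self, len: usize) -> bool {
        if len / 2 >= self.min && self.splits > 0 {
            self.splits /= 2; // both halves inherit half of the remaining budget
            true
        } else {
            false
        }
    }
}

/// Shaped like `bridge_producer_consumer::helper` for a "map then sum" pipeline.
fn helper<F>(data: &[i64], mut splitter: Splitter, f: &F) -> i64
where
    F: Fn(i64) -> i64 + Sync,
{
    if splitter.try_split(data.len()) {
        // Divide: split at the middle index, like `producer.split_at(mid)`.
        let mid = data.len() / 2;
        let (left, right) = data.split_at(mid);
        // Stand-in for `join_context`: a scoped thread runs the left half
        // while the current thread runs the right half.
        let (left_sum, right_sum) = thread::scope(|s| {
            let handle = s.spawn(|| helper(left, splitter, f));
            let right_sum = helper(right, splitter, f);
            (handle.join().unwrap(), right_sum)
        });
        // Conquer: the "reducer" just adds the two partial sums.
        left_sum + right_sum
    } else {
        // Leaf: sequential fold, like `producer.fold_with(folder).complete()`.
        data.iter().map(|&x| f(x)).sum()
    }
}

/// Entry point resembling `par_iter().map(f).sum()` with an initial split budget.
fn map_sum<F>(data: &[i64], f: F, splits: usize) -> i64
where
    F: Fn(i64) -> i64 + Sync,
{
    helper(data, Splitter { splits, min: 1 }, &f)
}
```

With a budget of 8, the slice is split four levels deep (8, 4, 2, 1, then 0), giving 16 leaves. With a budget of 0, it degenerates into a single sequential fold, and the result is the same either way.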
On this leaf path, [`producer.fold_with`](https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/mod.rs#L106) uses `MapFolder`'s [`consume_iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/map.rs#L244) to apply the closure to every element of the producer's slice. `SumFolder`'s [`consume_iter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L94) adds the results up one by one, and the [`complete`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/sum.rs#L103) method finally returns the sum.

Beyond the mechanics, the implementation also shows that the iterator's `map` is not computed immediately; it only runs once `sum` is called. This matches the [lazy evaluation](https://en.wikipedia.org/wiki/Lazy_evaluation) property of [functional programming](https://en.wikipedia.org/wiki/Functional_programming).

## Rayon's basic environment

Before looking at `join`, Rayon's core primitive, one key question remains. Nothing in the code traced so far shows where Rayon sets up the thread pool, or anything else that lets it distribute work across CPU cores.

Note that the example never has to start an environment explicitly. Instead, Rayon internally calls [`global_registry`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L167) to obtain the `Registry` struct it needs for distributing tasks. The first time `global_registry` is called, it initializes this struct and thereby sets up the basic environment Rayon needs.

In the `par_iter` example above, the initialization happens while the `LengthSplitter` is being created. `LengthSplitter` internally creates a [`Splitter`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/src/iter/plumbing/mod.rs#L256). The `Splitter` calls [`current_num_threads`](https://github.com/rayon-rs/rayon/blob/3c7489e16816b9a90404d47cb5f5601bdda193e4/rayon-core/src/lib.rs#L108), which ends up in the `Registry`'s own [`current_num_threads`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L281). That is where `global_registry` builds Rayon's core data structures.

### `global_registry`

```rust
pub(super) fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));

    THE_REGISTRY_SET.call_once(|| {
        result = registry()
            .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
    });

    result
}
```

The key points of `global_registry` are:

1. [`THE_REGISTRY_SET`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L162) is a [`Once`](http://web.mit.edu/rust-lang_v1.25/arch/amd64_ubuntu1404/share/doc/rust/html/std/sync/struct.Once.html), a primitive for one-time global initialization: the closure passed to `call_once` runs only on the first call.
2. On that first call, `ThreadPoolBuilder::new()` creates a thread pool builder with default settings (no threads exist yet). `Registry::new(ThreadPoolBuilder::new())` then uses it to build the registry, which gets stored in [`THE_REGISTRY`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L161).
3. If `call_once` does not run the closure (initialization has already happened), `result` keeps its `GlobalPoolAlreadyInitialized` error, and `or_else` falls back to `THE_REGISTRY`. This is how the code checks whether the first initialization succeeded. If it failed, `THE_REGISTRY` is still `None`, so the error reaches `expect`.

:::info
Related `Result` / `Option` methods:
* [`or_else`](https://doc.rust-lang.org/std/result/enum.Result.html#method.or_else) (`Result`)
* [`ok_or`](https://doc.rust-lang.org/std/option/enum.Option.html#method.ok_or) (`Option`)
* [`get_or_insert`](https://doc.rust-lang.org/std/option/enum.Option.html#method.get_or_insert) (`Option`)
:::

### [`Registry::new`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L211)

```rust=
pub(super) fn new<S>(
    mut builder: ThreadPoolBuilder<S>,
) -> Result<Arc<Self>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    let n_threads = builder.get_num_threads();
    let breadth_first = builder.get_breadth_first();

    let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
        .map(|_| {
            let worker = if breadth_first {
                Worker::new_fifo()
            } else {
                Worker::new_lifo()
            };

            let stealer = worker.stealer();
            (worker, stealer)
        })
        .unzip();
```

* [`get_num_threads`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L400) determines and returns the number of threads in the pool, based on the number of CPUs or an environment variable.
* `get_broadth_first` decides from which end of its work queue a worker takes tasks (FIFO vs. LIFO); see the [comment](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L509).
* One worker/stealer pair is created per thread; see [`crossbeam-deque`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/).

```rust=23
    let logger = Logger::new(n_threads);
    let registry = Arc::new(Registry {
        logger: logger.clone(),
        thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
        sleep: Sleep::new(logger, n_threads),
        injected_jobs: Injector::new(),
        terminate_count: AtomicUsize::new(1),
        panic_handler: builder.take_panic_handler(),
        start_handler: builder.take_start_handler(),
        exit_handler: builder.take_exit_handler(),
    });

    // If we return early or panic, make sure to terminate existing threads.
    let t1000 = Terminator(&registry);
```

This builds a new `Registry`:

* [`logger`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/log.rs#L109): records events, for debugging.
* [`thread_infos`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L553): every thread has its own `stealer`, which can take tasks from that thread's work queue. Stealers can be shared between threads, so one thread can use another thread's `stealer` to take tasks from that thread's queue.
* [`sleep`](https://github.com/rayon-rs/rayon/tree/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/sleep): puts worker threads to sleep and wakes them up.
* [`injected_jobs`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/): the global FIFO queue (`Injector`) from [`crossbeam-deque`](https://docs.rs/crossbeam-deque/0.8.1/crossbeam_deque/).
* [`terminate_count`](https://doc.rust-lang.org/std/sync/atomic/struct.AtomicUsize.html): an atomic counter; when it drops to zero, the registry's worker threads are told to terminate.
* `panic_handler` / `start_handler` / `exit_handler`: closures that handle special events (a panic, a thread starting, a thread exiting).

`Terminator` is a guard that shuts down the threads spawned so far if `new` returns early (for example because spawning a later thread fails) or panics; see [Rayon#288](https://github.com/rayon-rs/rayon/commit/a29cb5ce05739b4e0b7238843c605d18ae09152e#diff-23de7a86ae4545fe32d8594074c28c5e529381161232fdcb8184d454e5d599d0).

```rust=38
    for (index, worker) in workers.into_iter().enumerate() {
        let thread = ThreadBuilder {
            name: builder.get_thread_name(index),
            stack_size: builder.get_stack_size(),
            registry: registry.clone(),
            worker,
            index,
        };
        if let Err(e) = builder.get_spawn_handler().spawn(thread) {
            return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
        }
    }

    // Returning normally now, without termination.
    mem::forget(t1000);

    Ok(registry.clone())
}
```

A `ThreadBuilder` is created for each `worker`. The builder's `spawn_handler` (by default [`DefaultSpawn`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L89)) uses it to spawn an OS thread that runs [`main_loop`](https://github.com/rayon-rs/rayon/blob/6a01573a180c89d853b7d22d20bd2f123f2df5b4/rayon-core/src/registry.rs#L799). `main_loop` sets the thread up as a worker thread, which then keeps picking tasks from the work queues and executing them.

If every thread is spawned successfully, [`mem::forget`](https://doc.rust-lang.org/std/mem/fn.forget.html) takes `t1000` without running its destructor, so the `Terminator` does not shut down the threads that were just created. With that, the global `THE_REGISTRY` can be successfully initialized.

## `join_context`

[`join_context`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/join/mod.rs#L115) takes two closures, `oper_a` and `oper_b`, as input. It also defines `call_a` and `call_b`, the functions that will run them. The two differ in where the `migrated` member of the [`FnContext`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/lib.rs#L758) passed to the closure comes from. `call_a` receives it (as `injected`) as one of its own arguments and returns a closure with no inputs. `call_b` instead returns a closure that takes `migrated` as an argument when it is eventually called.

```rust=
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce(FnContext) -> RA + Send,
    B: FnOnce(FnContext) -> RB + Send,
    RA: Send,
    RB: Send,
{
    #[inline]
    fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
        move || f(FnContext::new(injected))
    }

    #[inline]
    fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
        move |migrated| f(FnContext::new(migrated))
    }
    ...
```

:::info
:notes:
1. A closure that only implements `FnOnce` may consume (move out of) the variables it captured from its environment, so it can be called at most once.
2. Adding `move` forces the closure to take ownership of the variables it captures.
:::

```rust
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}
```

`join_context` runs a closure through [`in_worker`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L864). `in_worker` first checks whether the current thread is a worker thread by calling `WorkerThread::current`, which reads [`WORKER_THREAD_STATE`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L611). This state is null when a thread starts; only worker threads set it, via [`set_current`](https://github.com/rayon-rs/rayon/blob/3153b486c7161d5f47858e8fa8207172cc5b359b/rayon-core/src/registry.rs#L635). On a worker thread, the closure runs directly. Otherwise, `in_worker` uses `global_registry()` to reach the `Injector` and pushes the task onto the global injector queue.

```rust=19
    registry::in_worker(|worker_thread, injected| unsafe {
        // Create virtual wrapper for task b; this all has to be
        // done here so that the stack frame can keep it all live
        // long enough.
        let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
        let job_b_ref = job_b.as_job_ref();
        worker_thread.push(job_b_ref);
```

The closure passed to `in_worker` first wraps job B in a `StackJob` and pushes a reference to it onto the current worker's local queue.

```rust=27
        // Execute task a; hopefully b gets stolen in the meantime.
        let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
        let result_a = match status_a {
            Ok(v) => v,
            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
        };
```

Job A is then executed directly on the current thread. `unwind::halt_unwinding` runs the closure through [`catch_unwind`](https://doc.rust-lang.org/std/panic/fn.catch_unwind.html), so a panic in job A is caught and handed to `join_recover_from_panic` instead of unwinding straight out of `join_context`.

:::info
* [`AssertUnwindSafe`](https://doc.rust-lang.org/std/panic/struct.AssertUnwindSafe.html)
* [What is unwind safety?](https://doc.rust-lang.org/stable/std/panic/trait.UnwindSafe.html#what-is-unwind-safety)
:::

```rust=35
        // Now that task A has finished, try to pop job B from the
        // local stack.  It may already have been popped by job A; it
        // may also have been stolen. There may also be some tasks
        // pushed on top of it in the stack, and we will have to pop
        // those off to get to it.
        while !job_b.latch.probe() {
            if let Some(job) = worker_thread.take_local_job() {
                if job == job_b_ref {
                    // Found it! Let's run it.
                    //
                    // Note that this could panic, but it's ok if we unwind here.
                    let result_b = job_b.run_inline(injected);
                    return (result_a, result_b);
                } else {
                    worker_thread.execute(job);
                }
            } else {
                // Local deque is empty. Time to steal from other
                // threads.
                worker_thread.wait_until(&job_b.latch);
                debug_assert!(job_b.latch.probe());
                break;
            }
        }

        (result_a, job_b.into_result())
```

Finally, job B must also be completed before `join_context` can return. There are the following cases:

1. The thread pops job B from its own work queue and runs it. If other tasks were pushed on top of job B in the meantime, the thread pops and executes those first.
2. Another thread has stolen job B but has not finished it yet. While waiting in `wait_until`, this thread may steal other tasks and execute them in the meantime. If job B has already finished, its latch is set and the loop ends immediately.
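The control flow above (publish job B, run job A, then either reclaim B and run it inline or wait on its latch) can be modeled with std alone. This is a simplified sketch, not Rayon's code. `join` here is a hypothetical function. A `Mutex<Option<B>>` stands in for the local deque entry, and a `Mutex` + `Condvar` pair stands in for `job_b.latch`. The "thief" is a freshly spawned scoped thread rather than a pool worker stealing from a deque, and panics are not handled.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

/// Hypothetical, simplified `join`: job B is published where a thief can take
/// it, job A runs on the current thread, then B is either reclaimed and run
/// inline or awaited through a latch.
fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB + Send,
    RB: Send,
{
    // Stand-in for pushing `job_b_ref` onto the local deque.
    let job_b: Mutex<Option<B>> = Mutex::new(Some(oper_b));
    // Stand-in for `job_b.latch`: holds B's result once a thief has run it.
    let latch: (Mutex<Option<RB>>, Condvar) = (Mutex::new(None), Condvar::new());

    thread::scope(|s| {
        // The "thief": takes B if it is still there, runs it, sets the latch.
        s.spawn(|| {
            let stolen = job_b.lock().unwrap().take();
            if let Some(b) = stolen {
                let result = b();
                *latch.0.lock().unwrap() = Some(result);
                latch.1.notify_one();
            }
        });

        // Execute job A directly on the current thread.
        let result_a = oper_a();

        // Try to take B back (like `take_local_job` returning `job_b_ref`).
        let reclaimed = job_b.lock().unwrap().take();
        let result_b = match reclaimed {
            // Not stolen: run it inline.
            Some(b) => b(),
            // Stolen: block until the thief has stored B's result.
            None => {
                let mut slot = latch.0.lock().unwrap();
                while slot.is_none() {
                    slot = latch.1.wait(slot).unwrap();
                }
                slot.take().unwrap()
            }
        };
        (result_a, result_b)
    })
}
```

Both outcomes of the race yield the same `(RA, RB)` pair. That property lets a recursive divide-and-conquer function call `join` without caring which thread ended up running job B.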