Try   HackMD

Crust of Rust : Channels

直播錄影

  • 主機資訊
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~> neofetch --stdout
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx 
    ​​​​----------------------------------------------- 
    ​​​​OS: Ubuntu 22.04.3 LTS x86_64 
    ​​​​Host: HP Pavilion Plus Laptop 14-eh0xxx 
    ​​​​Kernel: 6.2.0-39-generic 
    ​​​​Uptime: 5 mins 
    ​​​​Packages: 2373 (dpkg), 13 (snap) 
    ​​​​Shell: bash 5.1.16 
    ​​​​Resolution: 3840x2160 
    ​​​​DE: GNOME 42.9 
    ​​​​WM: Mutter 
    ​​​​WM Theme: Adwaita 
    ​​​​Theme: Yaru-dark [GTK2/3] 
    ​​​​Icons: Yaru [GTK2/3] 
    ​​​​Terminal: gnome-terminal 
    ​​​​CPU: 12th Gen Intel i5-12500H (16) @ 4.500GHz 
    ​​​​GPU: Intel Alder Lake-P 
    ​​​​Memory: 4688MiB / 15695MiB
    
  • Rust 編譯器版本 :
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~/CrustOfRust> rustc --version
    ​​​​rustc 1.70.0 (90c541806 2023-05-31) (built from a source tarball)
    

Introduction

0:00:00

In this (fifth) Crust of Rust video, we cover multi-produce/single-consumer (mpsc) channels, by re-implementing some of the std::sync::mpsc types from the standard library. As part of that, we cover what channels are used for, how they work at a high level, different common channel variants, and common channel implementations. In the process, we go over some common Rust concurrency primitives like Mutex and Condvar.

Module std::sync::mpsc

MPSC(Multiple Producer Single Consumer : many-to-one channel) 是標準函式庫的 channel 實作,channel 是從 A 地傳送 (send) 資料,然後從 B 地接收 (receive) 資料。

crate 提供 Receiver 型別, Sender 型別, SyncSender 型別, 等等。

channel 是單向的,Sender 只能傳送,Receiver 只能接收。你可以 clone Sender,但你不可以 clone Receiver,因為現在是 MPSC。

Q : what does crossbeam do differently to the std lib? I forget
A : 等實作完了之後再來講解。

Q : what kind of data can I send through a channel?
A : 看到標準函式庫的 channel 函式簽章以及 Sender 結構簽章 :

pub fn channel<T>() -> (Sender<T>, Receiver<T>) pub struct Sender<T> { /* private fields */ }

當你創一個 channel 時,你可以透過 channel 傳送任意 T 型別。

Q : The data has to be Send though, right?
A : 並不完全是,假設你現在創建一個 channel,但你永遠不會將Sender 或 Receiver 交給不同的執行緒,然後 T 正在同一執行緒上傳送,因此實際上 T 不用是 Send,因為你不會將 Sender 或 Receiver 跨執行緒邊界移動,但若如果你讓它們跨執行緒邊界的話,那就必須要是 Send。
標準函式庫有做出 Sender 是不是 Send 的區別 :

impl<T: Send> Send for Sender<T> // Sender 是 `Send` 如果 T 是 `Send`

Q : Are there any constraints on what kinds of types you can send through the channel though? Does it have to be Send?
A : 沒有其他 constraint,至於要不要是 Send,請看上一題。

Q : does the data need to be sized or can it be dyn?
A : 它必須要是 Sized。前面的直播有提到,Sized trait 是 auto trait,但它也是 auto bound,除非你用 ?Sized 來表示某東西不需要是 Sized,不然其他情況的 T 都必須要是 Sized。

Q : While you go, can you point out any differences between other implementations of channels, if you happen to know?
A : 等等會看到。

Q : How is the sender thread distinguished from the reciever? Thanks!
A : 它們有不同的型別。當你創建 channel 時,你會拿到 Sender 型別以及 Receiver 型別。

Q : can the data be non-'static?
A : 可以,但你必須擁有它。

Q : What's the point of a channel if you're not going between threads
A : 在一些情況下你可能不會在執行緒之間切換,但你可能想要concurrently 執行,但不要 parallel 執行。你可能有一個執行緒,像是 event 迴圈或其他東西,它最終可能會傳送給自己,並且您可能仍然需要 Sender 和 Receiver abstraction。

Q : who owns the data in the channel? the channel object?
A : channel 型別本身擁有 T,所以當你在 channel 上 T,但是你沒有接收到 T,你卸除 Sender 和 Receiver,channel 會確保 T 被卸除。

Q : What is the performance impact of the channel
A : 等我們實作的時候會去談到。

Q : how does it do backpreasure?
A : 等我們實作的時候會去談到。

Initial structure

0:08:20

開始建置 Rust 專案 :

$ cargo new --lib panama
$ cd panama
$ vim src/lib.rs

先宣告基本的結構和函式 :

pub struct Sender<T> {} pub struct Receiver<T> {} pub fn channel<T>() -> (Sender<T>, Receiver<T>) {} // 一般回傳的 tuple 值都是 Sender 擺前面,Receiver 擺後面

本次實作的 channel 會是相當直覺的版本,程式碼會用到一些有用的並行原始物件 (Mutex, Arc, Condvar),但不會是效能最好的,因為實作效能非常好的 channel 需要更多的細節和技巧。

Mutex 相關函式簽章:

impl<T> Mutex<T> pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> // 回傳 MutexGuard // 當你擁有 MutexGuard,你被保證是唯一可以存取 T 的人,T 受 mutex 保護。 // 當有多個執行緒想爭奪 lock 時,只有 1 個執行緒能取得 lock, // 並且對 T 進行修改。沒有搶到 lock 的其他執行緒都會被 block。

我們可以使用 Arc (Atomically Reference Counted) 來跨越執行緒邊界。我們想要實作的 chaanel,它在單個線程上並非沒有用處,但我們當然希望它能夠跨執行緒邊界工作,這也是我們使用 mutex 而不是像 refcell 這種東西的原因。

Condvar 是一種向不同執行緒宣布您已經更改了它所關心的東西的方法。假設有一個 Receiver 正在等待,因為還沒有資料可以接收,接著 Sender 傳送了資料,Sender 必須喚醒正在 sleep 的 Receiver,跟 Receiver 說你有資料可以讀了。

接著用常常在 Rust 定義看見的結構,這個結構會有很多東西指向該結構且該結構持有的值是共享的,我們會將它宣告成 Inner :

struct Inner<T> { queue: Vec<T>, }

接著定義 Sender 和 Receiver 的成員 :

use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Mutex<Inner<T>>>, // 參考網站 : // https://rust-lang.tw/book-tw/ch16-03-shared-state.html // 為什麼 Mutex 最外層要用 Arc 包起來 ?, // 因為 Mutex 本身的所有權是不能移至多個執行緒中, // 必須使用到多個所有權分配到多個執行緒的方法,也就是 Arc。 // 至於不能用 Rc 的原因是 Rc 不是 thread safety。 } pub struct Receiver<T> { inner: Arc<Mutex<Inner<T>>>, }

接著實作 channel :

pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Vec::new() }; let inner = Arc::new(Mutex::new(inner)); ( Sender { // 增加參考計數 inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) }
目前程式碼
use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Mutex<Inner<T>>>, } pub struct Receiver<T> { inner: Arc<Mutex<Inner<T>>>, } struct Inner<T> { queue: Vec<T>, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Vec::new() }; let inner = Arc::new(Mutex::new(inner)); ( Sender { inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) }

Structure Q&A

0:14:27

Q : refcell does runtime borrow checking right?
A : 是的。Mutex 在某種意義上來說也是,但 Mutex 不是 borrow check,所以更多的是 borrow 和 force。如果有兩個執行緒在同一個時間想存取同一個東西,Mutex 會 block 其中一個執行緒;RefCell 則告訴你 "你現在無法得到這個 mutably"。

Q : Why does the Condvar always need Mutex guard?
A : 當我們的實作用到 Condvar 的時候再來解釋。

Q : why not make it a linkedlist?
A : 我們將討論替代實作。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
0:15:30
Q : Is it possible to specialize the struct so that if T is not send, the Arc Mutex would just be a RC
A : 沒辦法很簡單的做到,你無法 specialize 定義。你必須要擁有 unsynced 或是 unsend,Jon 認為有些用例不如執行緒 channel 那樣清晰。一般來說,用於 channel 的東西是 Send。

Q : I've seen a lot of people using Mutexes from parking lot and channels from crossbeam. Does condvar have a similar 'better' implementation you know of?
A : parking lot 也提供 parking 和 notification (這就是 Condvar 給你的),所以你也可以使用 parking lot。有些對話提到,嘗試要將 parking lot 實作的東西,像是 Mutex 和 Condvar,使它們成為標準函式庫,某天可能會發生。

Q : Why would you not put the Mutex in Inner?
A : 我們可以這麼做,如下:

use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Inner<T>>, } pub struct Receiver<T> { inner: Arc<Inner<T>>, } struct Inner<T> { queue: Mutex<Vec<T>>, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Mutex::default() }; let inner = Arc::new(inner); ( Sender { inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) }

Q : Why does the receiver type need to have the Arc protected by a mutex if the channel may only have a single consumer thread?
A : 因為傳送和接收有可能同時發生,但它們之間也必須 mutual exclusive。

Q : is there a difference between an Arc<Mutex<T>> and a boolean semaphore?
A : Mutex 從表層看的話其實是 boolean semaphore,所以單看表層的話沒有差異的,但底層的機制卻有所不同。Jon 認為沒有理由在 Mutex 的實作上使用 boolean semaphore。特別是,Mutex 為你帶來的是它與作業系統實作的 parking 機制和 user mode futex 整合在一起。boolean semaphore 基本上是你檢查和 atomically 更新的 boolean flag。問題是如果當前設置了該 flag,表示某個某個持有 lock 的執行緒在 critical section,如果你使用的是 boolean semaphore,那麼你要做的就是使用必須 spin,也就是反覆去檢查 flag 直到 flag 可用;而如果你使用的是 Mutex,作業系統可以將執行緒置於 sleep 狀態,並在 Mutex 可用時喚醒它,這通常更高效,儘管會增加一點 latency。

Q : why arc is needed ?
A : 如果改成以下這樣 :

pub struct Sender<T> { inner: Inner<T>, } pub struct Receiver<T> { inner: Inner<T>, }

那麼 Sender 和 Receiver 擁有的會是兩個不同的 inner 的 instance,如果它們的 inner 不同的話就無法溝通了。

send and recv

0:19:20

開始實作 Sender :

impl<T> Sender<T> { // 先不回傳值看看等等會遇到什麼問題 pub fn send(&mut self, t: T) { // 本次實作直接 unwrap,不考慮 panic 發生的情形 // Jon 一開始忘記加 mut,我自己先加了。 let mut queue = self.inner.queue.lock().unwrap(); queue.push(t); } } impl<T> Receiver<T> { pub fn recv(&mut self) -> T { // Jon 一開始忘記加 mut,我自己先加了。 let mut queue = self.inner.queue.lock().unwrap(); // 回傳 Option 型態,因為 Vec 可能沒資料。 // 編譯器告訴我們回傳型態與函式簽章不吻合。 // 你可以實作 try_recv 來接收值, // 但我們等等要實作的是 blocking 版本的 recv, // 也就是使用到 Condvar。 queue.pop() } }

Line 7 : lock 函式回傳 LockResult 型別,假設最後一個執行緒拿到 lock,當它持有 lock 的時候因為在更新東西的過程中導致 panic,使得 lock 現在處於某種不太不一致的狀態。lock 進行溝通的方式是,當執行緒發生 panic 時,它會釋放 lock,但它也會在其中設置一個小 flag 來表示最後一個執行緒在存取 lock 發生 panic,如下 :

pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;

LockResult 不是 Guard 就是 PoisonError,它可以讓你知道其他執行緒在持有 lock 的過程中發生 panic,你之後才知道可以做出對應的處置,當然,你也可以直接忽略 panic 的發生。

目前的 queue 實際上操作起來是 stack,所以要改用 VecDeque,它會持續追蹤指標指向的起始位置以及結束位置,當你 push back 值的時候,指向結束位置的指標會向後移動;當你 pop front 的值的時候起,它會移除起始位置的資料,並將指向起始位置的指標向後移動。指標在移動的過程中可能會發生環繞 :

+use std::collections::VecDeque; ... impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut queue = self.inner.queue.lock().unwrap(); - queue.push(t); + queue.push_back(t); } } impl<T> Receiver<T> { pub fn recv(&mut self) -> T { let mut queue = self.inner.queue.lock().unwrap(); - queue.pop() + queue.pop_front() } } struct Inner<T> { - queue: Mutex<Vec<T>>, + queue: Mutex<VecDeq<T>>, } ...

接著加入 Condvar 成員來完成 block 版本的 recv :

struct Inner<T> { queue: Mutex<VecDeque<T>>, + available: Condvar, }

Condvar 必須在 Mutex 外面,不然會發生以下情形 :
被喚醒的執行緒 B 必須持有 lock,但喚醒別人的執行緒 A 的目前持有 lock,接著 B 又因為 A 持有 lock,又變回 sleep 的狀態,接著 A 又釋放了 lock ,導致沒有執行緒是 wake 的狀態,最終造成 deadlock - 沒有執行緒有進展,即時應該要有進展,所以 Condvar 必須在 Mutex 外面。

當 A 執行緒通知 B 執行緒之後必須釋放 lock,這就回答了之前的問題 "為什麼 Condvar 總是需要用到 MutexGuard",因為你必須證明你現在有 lock,lock 會確保 atomically 完成步驟。

channel 的 inner 也要指派值給 Condvar 成員 :
(Jon 一開始忘記要指派值給 Condvar 成員,我自己先加了。)

pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { - queue: Vec::new() + queue: Mutex::default(), + available: Condvar::new(), } ... }

實作 block 版本的 recv :

impl<T> Receiver<T> { pub fn recv(&mut self) -> T { - let mut queue = self.inner.queue.lock().unwrap(); - queue.pop_front() + // 編譯器會告訴你必須要 loop, + // 不然被喚醒之後會直接結束該函式,一樣接收不到值。 + loop { + let mut queue = self.inner.queue.lock().unwrap(); + match queue.pop_front() { + Some(t) => return t, + None => { + self.inner.available.wait(queue); + } + } + } } }

看到 wait 函式簽章 :

// 當執行緒 B 被執行緒 A 喚醒之後, // 會自動取得 lock (MutexGuard),因為 A 會將 lock 交給 B。 // // 注意事項 : // 實作的程式碼要自己 drop lock, // 因為 A 並不會在通知 B 的時候自動 drop lock。 pub fn wait<'a, T>( &self, guard: MutexGuard<'a, T> ) -> LockResult<MutexGuard<'a, T>>

根據 wait 函式簽章的回傳值再把 recv 函式做修正 :

impl<T> Receiver<T> { pub fn recv(&mut self) -> T { + // `let mut queue = ...` 移到 loop 外 + let mut queue = self.inner.queue.lock().unwrap(); loop { - let mut queue = self.inner.queue.lock().unwrap(); match queue.pop_front() { Some(t) => return t, None => { - self.inner.available.wait(queue); + queue = self.inner.available.wait(queue).unwrap(); } } } } }

接著讓 send 函式有喚醒正在 sleep 狀態的執行緒的功能 :

impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut queue = self.inner.queue.lock().unwrap(); queue.push_back(t); + // 通知別人前,要記得`手動`先釋放 lock。 + + // 為什麼是用 drop 不是 unlock : + // unlock 此函式等效於對 guard 呼叫 drop, + // 但更具有 self-documenting 的特性。 + // 或者,當 guard 超出作用域時,它將被自動 drop。 + // 呼叫 drop(queue), + // 之後離開函式不會有 double free 的問題。 + drop(queue); + // 只通知一個正在 sleep 的執行緒 + self.inner.available.notify_one(); } }
目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Inner<T>>, } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut queue = self.inner.queue.lock().unwrap(); queue.push_back(t); drop(queue); self.inner.available.notify_one(); } } pub struct Receiver<T> { inner: Arc<Inner<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> T { let mut queue = self.inner.queue.lock().unwrap(); loop { match queue.pop_front() { Some(t) => return t, None => { queue = self.inner.available.wait(queue).unwrap(); } } } } } struct Inner<T> { queue: Mutex<VecDeque<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Mutex::default(), available: Condvar::new(), }; let inner = Arc::new(inner); ( Sender { inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) }

send/recv Q&A

0:29:03

Q : vector double ended queue is a VecDeq
A : 基本上是。VecDeq 就是有 head 和 tail index 的 vector。

Q : Isn't that kind of loop the raison d'être of async ?
A : 並不全然是,因為我們實作的 recv() 函式並不是 spin 迴圈,而且通常當你的程式是 I/O bound 而不是 CPU bound 時,你才需要 async await。而我們的 recv() 並不是 I/O bound,所以不需要用到 async await。

注意到 recv 函式的其中用到 wait 函式的那一行 :

queue = self.inner.available.wait(queue).unwrap();

當執行緒呼叫 wait 函式的時候,如果這時候執行緒持有 lock,wait 函式會先讓執行緒釋放 lock,再將它 block。notify_one 則不一樣,在喚醒別的執行緒之前要程式設計師自己釋放 lock。

Q : How it's protected from cond var spurious wakeup?
A : 當你呼叫 wait 的時候,作業系統不保證如果你無事可做,你就不會被喚醒,所以我們的 recv 函式才會使用到 loop。假設你現在有一個被喚醒的執行緒,該執行緒因為一些原因被喚醒 (Sender 通知 Receiver 有東西可以取的原因除外),作業系統並不能告訴被喚醒的執行緒是什麼原因喚醒它的,所以我們能做的事就是用 loop 迴圈再次檢查 VecDeq 現在有沒有值可以取,如果沒有值的話就再回到 sleep 的狀態。

Q : But how can someone send? We have the mutex locked now when we're receiving, so it blocks insertions?
A : 所以剛剛才說,呼叫 wait 的執行緒會在 sleep 之前釋放 lock,這樣子 sender 就可以繼續進行了。

Q : wouldn't the lock be dropped after the notify?
A : 先 drop 的原因是因為被喚醒的執行緒可以直接執行。

Q : how does it know which thread notify ?
A : 它不知道。它只知道要喚醒其中一個有用到那個 Condvar 的執行緒。

Q : wouldn't it be nicer to use brackets around let queue = .. and send, instead of drop?
A : 你也可以這樣做,如下 (看你要不要改,本文維持使用 drop ) :

impl<T> Sender<T> { pub fn send(&mut self, t: T) { - let mut queue = self.inner.queue.lock().unwrap(); - queue.push_back(t); + { + let mut queue = self.inner.queue.lock().unwrap(); + queue.push_back(t); + } - drop(queue); self.inner.available.notify_one(); } }

但是 Jon 比較喜歡 explicit 的方式去釋放 lock。

Q : Does we not need to take the lock in the loop than? Since we give it up in previous iteration?
A : notify_one 函式會幫即將要被喚醒的執行緒取得 lock,在 wake 狀態的執行緒不需要自己去取得 lock。

Q : There is a wait with timeout, right?
A : 是的,而且它也會回傳 MutexGuard 型別。

Q : Woudn't only one thread empty the entire queue and only allow other threads again once its empty?
A : 當我們的 recv 執行了 Some(t) => return t, 這行之後就離開函式了,離開函式就會自動釋放 lock 了。

Q : Is there a notification variant which takes the guard to drop it for you?
A : 據 Jon 所知並非如此。

Q : if n threads are waiting, one of them is randomly chosen to be woken up?
A : 是的,另外還有 notify_all 函式可以使用。

Q : is it possible for the queue to be locked between the drop and when the receiver locks, from another sender?
A : 可能有另一個 Sender 也設法將資料 push 到 queue,但這並不會造成問題,Receiver 最終仍然會獲得執行機會。

目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Inner<T>>, } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut queue = self.inner.queue.lock().unwrap(); queue.push_back(t); drop(queue); self.inner.available.notify_one(); } } pub struct Receiver<T> { inner: Arc<Inner<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> T { let mut queue = self.inner.queue.lock().unwrap(); loop { match queue.pop_front() { Some(t) => return t, None => { queue = self.inner.available.wait(queue).unwrap(); } } } } } struct Inner<T> { queue: Mutex<VecDeque<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Mutex::default(), available: Condvar::new(), }; let inner = Arc::new(inner); ( Sender { inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) }

Does it work?

0:34:36

接著我們要確保 Sender 是 clonable :

#[derive(Clone)] pub struct Sender<T> { inner: Arc<Inner<T>>, }

如果我們這樣做的話,de-suger 之後會變成以下函式 :

impl<T: Clone> Clone for Sender<T> { fn clone(&self) -> Self { // ... } }

但這樣做有一個問題是,現在的 T 被加上 Clone bound,但我們要 clone 的東西明明就是 Arc,並不需要有 Clone bound,所以要自己實作一個 clone 來用 :

impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Sender { // 通常不這麼寫,原因請看詳細說明 // inner: self.inner.clone(), // 通常這麼寫 inner: Arc::clone(&self.inner), } } }

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
詳細說明
inner: self.inner.clone() 在技術上是合法的,但通常不是你想要寫的。假設你現在的 inner 也實作了 clone,Rust 不知道 .clone() 是要 clone Arc 亦或是 Arc inner 的值,因為 Arc 解參考至 inner 型別,. 運算子遞迴至 inner Deref。

Q : couldnt lock() block?
A : 是的,這就是 lock 的意義。

Q : I don't understand why you're talking about waking up multiple receivers, while implementing MPSC.
A : 你是對的,現在只有一個 Receiver,所以只要用到 send 函式只要使用 notify_one 函式即可。

Q : will there be a max size on the queue? that will need senders to wait
A : 等等會談論到。

Q : It's also easier to read that its a trivial clone that way
A : 是的,透過移除 Clone bound。

Q : Is there a way to disable autoderef
A : 不要使用 . 運算子,改使用統一的方法/呼叫語法即可。

Q : Why does rust bubble EINTR or whatever OS equivalent up into the .wait() function?
A : 它常常別無選擇。wait 函式的實作基本上最終只是作業系統實作,Rust 開發人員希望在除了 poisoning 之外添加盡可能少的內容。Jon 認為 wait 函式特別說它並不能保證你不會得到虛假的喚醒。

開始實作 test case :

#[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), 42); } }

順利通過測試 :

$ cargo test ... running 1 test test tests::ping_pong ... ok ...

Q : Do you think tx and rx are good names for channels? I know std docs use that but I've always hated it
A : 端看個人喜好,Jon 自己很喜歡。

目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { inner: Arc<Inner<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Sender { inner: Arc::clone(&self.inner), } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut queue = self.inner.queue.lock().unwrap(); queue.push_back(t); drop(queue); self.inner.available.notify_one(); } } pub struct Receiver<T> { inner: Arc<Inner<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> T { let mut queue = self.inner.queue.lock().unwrap(); loop { match queue.pop_front() { Some(t) => return t, None => { queue = self.inner.available.wait(queue).unwrap(); } } } } } struct Inner<T> { queue: Mutex<VecDeque<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: Mutex::default(), available: Condvar::new(), }; let inner = Arc::new(inner); ( Sender { inner: inner.clone(), }, Receiver { inner: inner.clone(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), 42); } }

Zero senders

0:40:28

如果我們添加新的測試 :

#[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); let _ = tx; let _ = rx.recv(); }

這個測試會因為 Receiver 在等待永遠等不到的資料而卡住。所以我們要實作一個功能告訴 Receiver 已經沒有 Sender 了。

先做命名的變動,將 Inner 改為 Shared,inner 改為 shared :

... impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Sender { - inner: Arc::clone(&self.inner), + shared: Arc::clone(&self.shared), } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { - let mut queue = self.shared.queue.lock().unwrap(); + let mut inner = self.shared.inner.lock().unwrap(); - queue.push_back(t); + inner.queue.push_back(t); - drop(queue); + drop(inner); - let mut queue = self.shared.available.notify_one(); + self.shared.available.notify_one(); } } ... impl<T> Receiver<T> { pub fn recv(&mut self) -> T { - let mut queue = self.shared.queue.lock().unwrap(); + let mut inner = self.shared.inner.lock().unwrap(); loop { - match queue.pop_front() { + match inner.queue.pop_front() { Some(t) => return t, None => { - queue = self.shared.available.wait(queue).unwrap(); + inner = self.shared.available.wait(inner).unwrap(); } } } } } // 之所以做這個改動是因為我們想要有額外的資料被 Mutex 保護。 // 這個額外的就是指 sender 的數量。 struct Inner<T> { - queue: Mutex<VecDeque<T>>, + queue: VecDeque<T>, - available: Condvar, + senders: usize, } struct Shared<T> { - queue: Mutex<VecDeque<T>>, + inner: Mutex<Inner<T>>, + available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Inner { + queue: VecDeque::default(), + senders: 1, + }; let shared = Shared { - queue: Mutex::default(), + inner: Mutex::new(inner), available: Condvar::new(), }; ... } ...

接下來我們希望在 clone Sender 的時候,會增加 senders 的值 :

impl<T> Clone for Sender<T> { fn clone(&self) -> Self { + let mut inner = self.shared.inner.lock().unwrap(); + inner.senders += 1; + drop(inner); Sender { - inner: Arc::clone(&self.inner), + shared: Arc::clone(&self.shared), } } }

同樣的我們希望在 drop Sender 的時候,會減少 senders 的值,為 Senders 實作 drop 函式 :

impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); // 如果是最後一個 Sender, // 表示還有一個 Receiver 被 block,需要被喚醒。 // 不然它將永遠不會被喚醒。 if was_last { self.shared.available.notify_one(); } } }

recv 函式也要做修改 :

impl<T> Receiver<T> { + // 回傳型態改成 Option<T> + // 因為 channel 可能因為早就沒有 Sender 而永遠是空的, + // 這時候得 recv 要回傳的是 None 值。 pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { - Some(t) => return t, + Some(t) => return Some(t), + // 檢查 sender 的數量 + None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } }

測試也要改寫 :

#[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); - assert_eq!(rx.recv(), 42); + assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); let _ = tx; + // 預期這次就不會 hang 住了 - let _ = rx.recv(); + assert_eq!(rx.recv(), None); } }

不幸的是,程式碼經過修改後,在測試的時候竟然還是 hang 住!

目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { shared: Arc<Shared<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let mut inner = self.shared.inner.lock().unwrap(); inner.senders += 1; drop(inner); Sender { shared: Arc::clone(&self.shared), } } } impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut inner = self.shared.inner.lock().unwrap(); inner.queue.push_back(t); drop(inner); self.shared.available.notify_one(); } } pub struct Receiver<T> { shared: Arc<Shared<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => return Some(t), None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } } struct Inner<T> { queue: VecDeque<T>, senders: usize, } struct Shared<T> { inner: Mutex<Inner<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), senders: 1, }; let shared = Shared { inner: Mutex::new(inner), available: Condvar::new(), }; let shared = Arc::new(shared); ( Sender { shared: shared.clone(), }, Receiver { shared: shared.clone(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); let _ = tx; assert_eq!(rx.recv(), None); } }

Q&A

0:46:27


Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
GitHub comment
Q : Since using Arc between Sender and Receiver, why not use Arc::strong_count to check if there's Sender alive? It's a mpsc channel, if strong_count == 1 checking by Receiver, we know there's only one Receiver alive and no other Sender.
A : I actually already talked through that approach in the video :)

Q : Can't the receiver check if shared is unique?
A : 不行。如果將程式碼做以下調整會有 was_last 無法判斷的情形發生 :

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
本程式碼只展示不能這樣做,不要照著這樣做)

... impl<T> Clone for Sender<T> { fn clone(&self) -> Self { - let mut inner = self.shared.inner.lock().unwrap(); - inner.senders += 1; - drop(inner); ... } } impl<T> Drop for Sender<T> { fn drop(&mut self) { - // let mut inner = self.shared.inner.lock().unwrap(); - // inner.senders -= 1; - // let was_last = inner.senders == 0; - // drop(inner); + // 移除了 `Inner::senders` 的話, + // 當 Sender 被 drop 時, + // Sender 就沒辦法知道自己是不是最後一個 Sender, + // Sender 也就不知道是否要通知 Receiver 了。 if was_last { self.shared.available.notify_one(); } } } ... impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => return Some(t), - None if inner.senders == 0 => return None, + None if Arc::strong_count(&self.shared) == 1 => return None, ... } } } } } struct Inner<T> { queue: VecDeque<T>, - senders: usize, } ... pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), - senders: 1, }; ... } ...

Q : Could you use an AtomicUsize in Shared rather than creating Inner?
A : 可以,儘管獲取 mutex 的那一刻,AtomicUsize 並沒有多大價值,好處只有你不用在 drop 和 clone 的實作 acquire lock 的功能,但 drop 跟 clone 應該相對較少發生,且我們的 crtical section 已經很短了,使用 lock 應該也很快。

Q : wouldn't you want notify all for drop?
A : 不。當最後一個 Sender 離開之後,表示現在只剩一個 Receiver,所以最多只有一個執行緒在 block 的狀態。

Q : can you overflow sender count? Probably not feasible I guess
A : 理論上不太可能。

Q : Is there any immediate benefit to adding it to the mutex rather than using an AtomicUsize?
A : 剛剛已經提到一些了。

Q : What's the difference betweeen VecDeque::new() and VecDeque::default()?
A : 沒有區別。

Q : I think the error was initializing senders to 1 in the constructor and then calling clone on the sender we return ??
A : 我們 channel 函式的 clone 是 clone Shared,而不是 clone Sender,所以 channel 函式的 clone 並不會增加 senders 值。

Q : could you get false sharing inbetween the vecdeque and the sender count?
A : 有可能,但它們在 Mutex 之下,所以沒什麼問題。

Q : Can't you just use the lack of guarantee of no spurious wakeups and just notify every time a sender is dropped?
A : 可以,但會造成一堆額外的喚醒。你會想要避免虛假喚醒,因為虛假喚醒也會帶來效能衝擊。

Why does it hang?

0:50:53

看看卡在哪裡 :

#[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); let _ = tx; + eprintln!("X"); assert_eq!(rx.recv(), None); }

接著跑測試 :

$cargo test -- --test-threads=1 --nocapture
...
test tests::closed_tx ... X

發現是在呼叫 recv 的時候 hang 住了。

檢查為什麼會卡住,先從 recv 開始檢查 :

impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => return Some(t), - None if inner.senders == 0 => return None, + // 印出 inner.senders 的值 + None if dbg!(inner.senders) == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } }

接著測試,發現 senders 數量不為 0 :

$ cargo test -- --test-threads=1 --nocapture ... running 2 tests test tests::closed_tx ... [src\lib.rs:61] inner.senders = 1

接著檢查 drop 函式 :

impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); + eprint!("drop sender, count was {} ", inner.senders); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } }

再跑一次測試,發現 senders 竟然沒有 drop :

$ cargo test -- --test-threads=1 --nocapture ... running 2 tests test tests::closed_tx ... [src\lib.rs:61] inner.senders = 1

再來改一下 test case 看看 :

#[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); - let _ = tx; - eprintln!("X"); + drop(tx); assert_eq!(rx.recv(), None); }

再跑一次測試,這次就沒有 hang 住了 :

$ cargo test -- --test-threads=1 --nocapture ... running 2 tests test tests::closed_tx ... drop sender, count was 1 [src\lib.rs:61] inner.senders = 0 ok test tests::ping_pong ... drop sender, count was 1 ok

Jon 的印象是指派給 _ 會立即卸除值,但經過實驗之後,發現這可能不是真的。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
What happens when assigning to the underscore pattern?
_ means "don't bind this value". When you create a new value on the spot, this means the value is immediately dropped, like you said, since there is no binding to own it.

When you use it with something that's already bound to a variable, the value is not moved, which means the variable retains ownership. So this code works.

let guard = Guard; let _ = guard; let _a = guard; // `guard` still has ownership
目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { shared: Arc<Shared<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let mut inner = self.shared.inner.lock().unwrap(); inner.senders += 1; drop(inner); Sender { shared: Arc::clone(&self.shared), } } } impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut inner = self.shared.inner.lock().unwrap(); inner.queue.push_back(t); drop(inner); self.shared.available.notify_one(); } } pub struct Receiver<T> { shared: Arc<Shared<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => return Some(t), None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } } struct Inner<T> { queue: VecDeque<T>, senders: usize, } struct Shared<T> { inner: Mutex<Inner<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), senders: 1, }; let shared = Shared { inner: Mutex::new(inner), available: Condvar::new(), }; let shared = Arc::new(shared); ( Sender { shared: shared.clone(), }, Receiver { shared: shared.clone(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); drop(tx); assert_eq!(rx.recv(), None); } }

Implementation Q&A

0:53:08

Q : Why not use AtomicUSize instead of Mutex ?
A : 我們需要 Mutex 來控制 queue。因為我們現在的實作有 Mutex,所以 AtomicUSize 並沒有任何幫助。

Q : Do you ever use gdb to debug Rust programs?
A : Jon 有。Jon 發現 print debugging 比較容易用在小程式上面,例如現在我們正在實作的這個程式。

這次我們新增卸除 rx 值的 test case :

#[test] fn closed_rx() { let (mut tx, rx) = channel(); drop(rx); tx.send(42); }

可以通過測試 :

$ cargo test ... running 3 tests test tests::closed_rx ... ok test tests::closed_tx ... ok test tests::ping_pong ... ok

但這樣對嗎? 或許我們要跟 Sender 說已經沒有 Receiver 了,而不是盲目地傳送成功? 但這其實並沒有正確解答。傳送是否總是成功,亦或是可能以某種方式傳送失敗都是設計的選擇但這裡的實作我們先暫時這樣,如果想要有傳送失敗的功能,當使用者傳送某值失敗時,使用者必須能夠取的回某值,記錄到 log 是其中一種方法,這樣使用者才可以嘗試傳送到別的地方。稍微再提一下傳送失敗的功能可能還需要加什麼才能完成,你可能需將 Inner 結構再加一個 bool closed_flag,如果 Receiver 卸除時,將 closed_flag 設為 True,不用呼叫 notify_all 函式,因為目前的實作並沒有 block Sender 的機制。要傳送資料之前先檢查 closed_flag 是否為 False,否的話則回傳 Err

Q : can we resurrect dropped channel?
A : 不行。如果 Sender 離開了,你將沒辦法再傳送資料了。在我們的實作中,Sender 和 Receiver 具有相同的實作。理論上,我們可以加入一個方法,讓你從 Receiver 建構 Sender。大多數的 Sender 和 Receiver 實作並不像我們現在的實作一樣完全對稱,你無法輕鬆地從別人實作的 Receiver 建立 Sender。在我們的實作,你可以從 Sender 那裡得到一個 Receiver,但我們不提供這個功能,因為這將導致使用者可能創建多個 Receiver,雖然我們的實作可以支援 MPMC。

你可能注意到了,我們的實作的每個操作都都要 acquire lock,導致效能低下。如果你想要高效能的 channel,你有一堆的 send 在彼此競爭 lock,這時你可能不希望 send 彼此爭用 lock 的情形發生。假設你現在有 10 個執行緒同時嘗試想要傳送資料,實際上你也許可以實作來允許它們做到這一點,唯一真正需要同步的是 Sender 與 Receiver,而不是 Sender 彼此之間同步,想要高效能的話,不應該像我們目前的實作 lock 所有 Sender,我們等等會稍微提到要怎麼實作高效能的 channel。

Synchronous channels

0:58:37

標準函式庫的 Sender 有兩種 : Sender 以及 SyncSender,當你建構這兩種不同的 Sender (asynchronous 版本以及 synchronous 版本),你皆得到相同的 Receiver。注意到,asynchronousasync 是不同的東西。

  • synchronous 不論 Sender 比 Receiver 快多少,由於 channel 的 capcity 有大小限制,所以當 buffer 沒有空間時,Sender 會被 block。
  • asynchronous (我們的實作) : 由於 channel 的 capcity 可以增長,所以 Sender 不會因為有 buffer 滿就被 block 的情形發生。

結論是 channel 的版本是由 Sender 是否會被 block 決定。雖然 asynchronous 版本聽起來很棒,但由於它沒有 back pressure ( back pressure : 生產者速度太快,消費者跟不上的壓力),使得 buffer 可以一直增長,最後導致系統承受不住。

如果你想要實作 synchronous 版本的 channel,你需要有 block Sender 的機制,且 Receiver 需要通知處於 sleep 狀態的 Sender 可以繼續傳送資料了。根據我們現有的實作,你需要新增一個 Condvar 給 Sender。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
1:02:04
Q : Why not have senders use Weak? If Sender::send tries to deref the Weak and returns none, then fail. And if the weak count is 0, then you can know that Receiver::recv will fail.

A : 可以。weak 是 Arc 的版本,weak 不會增加參考計數。如果參考計數尚未歸零,你有方法可以試著去增加參考計數。Sender 會試著 upgrade 它們的 Sender,如果 upgrade 成功的話,它們將知道 Receiver 還在。不過 weak 有個缺點是,你必須 atomically 增加/減少參考計數,這樣增加額外的開銷。

Q : Is there a way to have a CondVar without a Mutex?
A : 沒有,Condvar 都一定要搭配 Mutex 使用。

Q : wouldn't send technically block if a send caused a vec resize
A : 它不是 block Sender 的機制,只是在 resize 的時候,傳送需要花比較久的時間而已。如果想要避免偶爾因為 resize 而導致傳送時間不穩定的問題,可以不要使用 VecDeq。

Q : Given the implementation you currently have, how hard would it be to write an Iterator implementation which consumes values from the channel until all senders are gone and then ends with None ?
A : 非常簡單,將下面程式碼加到我們的實作 :

impl<T> Iterator for Receiver<T> { type Item = T; fn next(&mut self) -> Option<Self::Item> { self.recv() } }

Q : Is there a good way to send multiple items at once?
A : 理論上很容易加這功能,新增 send_many 函式,該函式一樣需要 Mutex。

目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { shared: Arc<Shared<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let mut inner = self.shared.inner.lock().unwrap(); inner.senders += 1; drop(inner); Sender { shared: Arc::clone(&self.shared), } } } impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut inner = self.shared.inner.lock().unwrap(); inner.queue.push_back(t); drop(inner); self.shared.available.notify_one(); } } pub struct Receiver<T> { shared: Arc<Shared<T>>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => return Some(t), None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } } impl<T> Iterator for Receiver<T> { type Item = T; fn next(&mut self) -> Option<Self::Item> { self.recv() } } struct Inner<T> { queue: VecDeque<T>, senders: usize, } struct Shared<T> { inner: Mutex<Inner<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), senders: 1, }; let shared = Shared { inner: Mutex::new(inner), available: Condvar::new(), }; let shared = Arc::new(shared); ( Sender { shared: shared.clone(), }, Receiver { shared: shared.clone(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); drop(tx); assert_eq!(rx.recv(), None); } #[test] fn closed_rx() { let (mut tx, rx) = channel(); drop(rx); tx.send(42); } }

Batch recv optimization

1:05:55

因為我們只有一個 Receiver,我們其實可以不用在每次呼叫 recv 時都去 acqurie lock,這樣 Sender 在 acquire lock 就可以更快了,因為 Receiver acquire lock 次數降低了。

先新增 buffer 成員到 Receiver :

pub struct Receiver<T> { shared: Arc<Shared<T>>, + buffer: VecDeque<T>, }

再修改 recv 函式 :

impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { + // 先檢查 buffer 有沒有東西,如果有的話直接從這裡拿就好 + // 這裡不需要加 lock 了,將提升效能 + if let Some(t) = self.buffer.pop_front() { + return Some(t); + } + let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { - Some(t) => return Some(t), + Some(t) => { + // 1. 之所以這樣做是因為可能 Senders 已經讓 queue 累積一些資料了 + // 2. 因為現在我們有 lock,所以在 swap queue 跟 buffer 時, + // 不會有 race condition 的問題。 + if !inner.queue.is_empty() { + std::mem::swap(&mut self.buffer, &mut inner.queue); + } + // 上一步驟只是做 swap, + // 仍須回傳 t 給 Receiver + return Some(t); + }, None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } }

channel 函式的 Receiver 也要初始化 buffer :

pub fn channel<T>() -> (Sender<T>, Receiver<T>) { ... let shared = Arc::new(shared); ( ... Receiver { shared: shared.clone(), + buffer: Default::default(), } ) }

Q : Won't the Receiver buffer optimization trigger a lot of extra memory allocator activity?
A : 最多就兩倍。resize 也可以 amortized。

Q : How did you come to that optimization? Looking at implementations of channels? Got it from somewhere else?
A : 如果你去看更多最佳化的實作,你會發現這是很常見的技巧。

Q : It's important that swap() is used, rather than just discarding the local buffer each time.
A : 如果分配新的 VecDeq 將會讓程式降低效率 :

self.buffer = std::mem::take(&mut inner.queue);
// std::mem::swap(&mut self.buffer, &mut inner.queue);

Q : Do you think it might be faster without the branch for the swap?
A : 沒有 branch 可能比較快,但如果量很大的話,現在的 branch predictor 預測的正確率相當高。當你的 queue 總是非空,它會傾向下次猜不跳,當你的 queue 總是空,它會傾向下次猜跳。
將實作改成無論 queue 是否為空都 swap :

Some(t) => { - if !inner.queue.is_empty() { - std::mem::swap(&mut self.buffer, &mut inner.queue); + std::mem::swap(&mut self.buffer, &mut inner.queue); - } return Some(t); },

Q : branch predictor?
A : CPU 內部的元件,用來幫助你判斷 branch 猜跳還是不跳來降低 branch hazard。

Q : or maybe recv can just return "list of values" as a result??
A : 是可以,但像我們的實作只回傳 Option 值會更好,然後將回傳值當 Iteraotr 使用會比你的方法快。如果你回傳 list,將會導致額外的記憶體開銷,因為回傳時每次都要給 list 配置記憶體。

Q : What about extending buffer with inner.queue.drain(..)? This would save memory but will probably be a lot slower
A : 它事實上沒有省下記憶體,你最終仍然會發現 buffer 跟 queue 都有 capacity。

目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; pub struct Sender<T> { shared: Arc<Shared<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let mut inner = self.shared.inner.lock().unwrap(); inner.senders += 1; drop(inner); Sender { shared: Arc::clone(&self.shared), } } } impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut inner = self.shared.inner.lock().unwrap(); inner.queue.push_back(t); drop(inner); self.shared.available.notify_one(); } } pub struct Receiver<T> { shared: Arc<Shared<T>>, buffer: VecDeque<T>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { // 先檢查 buffer 有沒有東西,如果有的話直接從這裡拿就好 // 這裡不需要加 lock 了,將提升效能 if let Some(t) = self.buffer.pop_front() { return Some(t); } let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => { std::mem::swap(&mut self.buffer, &mut inner.queue); return Some(t); }, None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } } impl<T> Iterator for Receiver<T> { type Item = T; fn next(&mut self) -> Option<Self::Item> { self.recv() } } struct Inner<T> { queue: VecDeque<T>, senders: usize, } struct Shared<T> { inner: Mutex<Inner<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), senders: 1, }; let shared = Shared { inner: Mutex::new(inner), available: Condvar::new(), }; let shared = Arc::new(shared); ( Sender { shared: shared.clone(), }, Receiver { shared: shared.clone(), buffer: Default::default(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); drop(tx); assert_eq!(rx.recv(), None); } #[test] fn closed_rx() { let (mut tx, rx) = channel(); drop(rx); tx.send(42); } }

Channel flavors

1:13:23

// Flavors: // - Synchronous channels: Channel where send() can block. Limited capacity. // - Asynchronous channels: Channel where send() cannot block. Unbounded. // - Rendezvous channels: Synchronous with capacity = 0. Used for thread synchronization. // - Oneshot channels: Any capacity. In practice, only one call to send().

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
補充
Rendezvous channels 是一個容量為零的同步channel。因此,在這裡的想法是 rendezvous channel 實際上是,它不允許你發送東西。通常,它是一個僅用於同步兩方的 channel。很常見的是,rendezvous channel 的 T 類型被設置為像 unit (empty tuple)一樣。

Rendezvous channel 主要用於執行緒同步。如果透過某執行緒 A 去啟動執行緒 B,但執行緒 A 實際上沒有向執行緒 B 傳送任何內容。現在只是想要執行緒 B 做一些事情。這就是 rendezvous channel 的用途。

容量為零的 channel 意思表示只有在當前有一個 block 的 Receiver 時才能進行傳送,因為你無法將任何東西儲存在 channel 本身。因此可以傳送的唯一方式是將它交給當前正在等待的 Receiver。這基本上意味著一個執行緒必須到達其 receive point,而另一個執行緒則必須到達其 send point,現在它們會合了,它們都在一個已知的位置,然後它們可以從那裡繼續往下執行。這通常是透過 Barrier 來實現的,但你也可以選擇使用 channel 版本來實現,channel 版本最終是 two-way 同步,因為 Receiver 在 Sender 到達之前也無法繼續。

Flavor Q&A

1:18:48

Q : I've heard the term 'bounded' and 'unbounded' for your 'synchronous' and 'asynchronous' too - in case others are wondering
A : synchronous channel 通常被叫做 bounded channel,asynchronous 通常被叫做 unbounded channel。之所以要這麼叫是因為要跟 async 做區分。

Q : So basically what you should use if you need a Convar, but don't have a Mutex with locked data?
A : Rendezvous 不是 Mutex,它並不保證 mutual exclusion,它有點像 Condvar 讓你用來喚醒其它執行緒,但它沒有給你 guard 資料的方法。

Q : more like UNIX pipes
A : 所有 channel 都像 UNIX pipes。

Q : can rendezvous channels actually send anything useful?
A : 你可以在 rendezvous channel 內傳送/接收資料,它仍然有 T 型別。但可以傳送/接收資料的前提是,Sender 只能在 Receiver 當前存在的情況下傳送,並且 Receiver 只有在 Sender 當前存在的情況下才能接收。

Q : Seems Rust has a lot of specific impl/design of channels while Go just have simple channels impl.
A : go 有我們剛剛提到所有 flavors 的 channel 實作。具體來說,go 以及 Rust 的 channel 只有一種類型,剛剛提到的不同 flavors 並非不同的 channel 型別,而是在執行時期選擇的不同實作。
Oneshot channels 如果有額外的傳送資料發生,該 channel 將 upgrade 成不同的 channel,這也表示第一次在傳送資料的速度快於後面傳送資料的速度。你判斷出 channel 是 Rendezvous channels 是因為它的 capacity 為 0。而分辨出 Asynchronous channel 以及 Synchronous channel 的方法則是看它們的 capacity 是否為 bounded。

Q : Would a sync channel where T=() be a Rendezvous channel?
A : 是的。

Q : Kinda like the baton pass in a relay race 😀
A : 是的。

Other implementations

1:22:32

// Flavors: // - Synchronous channels: Channel where send() can block. Limited capacity. // - Mutex + Condvar + VecDeque // - Atomic VecDeque (atomic queue) + thread::park + thread::Thread::notify // - Asynchronous channels: Channel where send() cannot block. Unbounded. // - Mutex + Condvar + VecDeque // - Mutex + Condvar + LinkedList // - Atomic linked list, linked list of T // - Atomic block linked list, linked list of atomic VecDeque<T> // - Rendezvous channels: Synchronous with capacity = 0. Used for thread synchronization. // - Oneshot channels: Any capacity. In practice, only one call to send(). // async

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
補充

  1. Atomic VecDeque 是固定大小的陣列,但你要 atomically 追蹤 head 指標以及 tail 指標。
    • flume 使用比我們得實作更聰明的方法使用 Mutex。
    • crossbeam ArrayQueue/SegQueue 使用到 head 指標以及 tail 指標。
  2. LinkedList 可以省去額外配置記憶體的需求(優於 VecDeq),但通常會用到的是雙向鏈結串鍊。每次呼叫 send 都將資料放到雙向鏈結串列的頭即可,而呼叫 recv 時只要取得 head 指標的記憶體位置,並且將別的 scope 的 head 設為 NULL 讓別人不能 access,然後再反向走訪(單向鏈結串列做不到)雙向鏈結串列取出節點資料即可。
  3. Atomic block (區塊) linked list 不是每次 push 都是一個將新 item append 到串列的 atomic operation,你所做的是將 atomic VecDeque<T> 與 atomic block linked list 混合。只有偶爾你才需要實際 append 到 linked list,而並行 append item 到 linked list 是一個有問題的操作,因為想像你有兩個 Senders 同時都想要傳送資料,使用 linked list 的話,其中一個 Sender 將成功更新 linked list 的 tail 指標,但另一個 Sender 會失敗,失敗的那一個必須 retry。如果你有 T blocks 的串列,則執行緒實際上只偶爾需要更新 tail 指標。你可以使用 fetch_add 並行執行來 increment tail 指標。因此,這種 atomic block linked list 在實踐中效率更高。
    實作參考 : Struct crossbeam::queue::SegQueue

Q : how do async/await channels differ in implementation from blocking ones?
A : 等等會看到。

Q : Doesn't a LinkedList guarantee to always end up doing an allocation/deallocation on each push/pop ?
A : 是的。每次進行 push 都會配置記憶體,pop 都會釋放記憶體。這也顯示了 block linked list 變體的另一個優勢。通常,記憶體配置系統不是你程式的瓶頸,尤其是如果你使用像 jemallocator 這樣的東西,基本上是使用 thread local allocation,所以還可以接受。但這其實有一個缺點,當你在測量 channel 的效能的時候,實際上會測量到記憶體開銷與記憶體配置器效能。

Q : wouldn't a smart list just keep spare nodes around?
A : 你可以這麼做。建立一個 reusing pool 給這些節點並重複使用這些節點,但問題是你現在要 atomically 管理 pool,同步原始物件的使用仍無法避免。Jon 不清楚你是否可以編寫比記憶體配置器的配置和釋放記憶體更好的 reusing pool 實作。如果你想要使用實作 reusing pool,你可能會想要使用 arena allocator

Future-aware channels

1:32:24

// async/await

要實作出同時滿足以下兩個機制 : async/await(如 futures) 世界的 channel 以及 blocking 執行緒世界的 channel 非常困難,因為這些基本機制有些不同。

如果你這樣做,假設你做了傳送/接收的動作,並且 channel 此時已經滿了,那麼在 async/await 世界中,你不希望 block 的情形發生,而是希望 yield 值給 parent Future,最終 yield 值給 Executor,然後在將來的某個時候,你將被喚醒以再次輪詢。這聽起來有點像等待 Condvar,但實際上不完全相同,因為實際上在 async/await 世界中,需要回傳值而無法在當前函式中停留,並且 async/await 和 blocking 的通知機制略有不同,儘管 async/await 確實具有與你通知 Waker 相同的風格,但這將導致其他事物再次被輪詢

Future/Executor
圖片來源 : Future

而程式難寫的地方在於讓內部知道它是在 async context (例如 : future context) 中使用的實作,或在 blocking context 中,而不將這些資訊揭露給使用者。很多時候,你可能會得到一些額外的類型參數,比如 Waker 或 SignalingMechanism,這會讓結構變得非常難看 :

pub struct Sender<T, Waker> pub struct Sender<T, SignalingMechanism>

雖然實作不容易,但還是有其他人實作出來,例如 : flume 和 crossbeam,都有 blocking 和 async/await 版本,你可以去看看它們是怎麼實作的。基本上需要更多的 bookkeeping。你必須對類型要求更加挑剔。而通常你最終得到的是看起來很類似但實際上有些不同的 channel,在執行時期,它基本上最終會分化成管理底層資料儲存或底層資料結構的不同方式,具體取決於您是否在 blocking 世界中。例如,如果您在 blocking 世界中,你可以進行一些在 async 世界做不到的最佳化,反之亦然。但在實際應用中,使用的資料結構無論是使用 VecDeq 還是 Atomic Linked List 或類似的任何東西,都是相當類似的,存在的風格也是相當類似的。

Q : You could probably beat the allocator because we always need allocations of the same size
A : 記憶體配置器很擅長利用重複模式,所以實際上你的實作還是不容易贏過記憶體配置器。而且想要實作出高效的重複使用的機制(類似 GC) 其實相當困難。你可以使用 bump allocator 或類似的東西。

Where next?

1:36:27

看看文件以及如何實作的 :

  • MPSC
  • crossbeam : 適用於 high contention 的情形,因為它沒使用 Mutex,而是使用到了 atomical operation。
  • flume : 適用於 low contention 的情形,因為爭奪 lock 情形發生較少,所以 Mutex 不會造成過多的額外開銷。

Channels Q&A

1:38:24

Q : any thoughts on benchmarking channel impls?
A : chan crate,性能評估包含 go channels, 標準函式庫 channels, flume, crossbeam channels,不過這個 crate 已經過時了。
一般來說,你在做 channel 的效能評估時,你會評估所有 flavor 的 channels,因為它們確實代表了真實的用例。
測量項目包含 :

  • 單次傳送大資料以及小資料的效能比較
  • Sender 數量多寡 (SPSC vs. MPSC) 的效能比較
  • bounded channel 是否有易滿或易空的傾向

Q : rendezvous channels are like default go channels ? (0 capacity)
A : 是的。

Q : A bump allocator would probably be really good since you could likely allocate memory atomically
A : 很有可能,而且也因為您不需要卸除任何東西,在這種情況下,因為記憶體已經被移交。所以 drop 實作是一個 no-op。所以你也許可以使用像 bumpalow 這樣的酷東西。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
1:40:40
Q : how do they support async without tying it to specific executor like Tokio?
A : 當前在 async/await 生態系統中存在不協調的主要原因是圍繞著 I/O trait,如 async read/async write,以及 spawn feature,即能夠在背景執行 future。對於實作一個 channel,你不需要這兩者。你只需要標準函式庫提供的基本機制像是 Waker trait、yield、sleep 和 wake 等功能。這些都是相同的。它們來自標準函式庫的 std::task。因此,您可以獨立於 Executor 使用這些,這就是為什麼 channel 在跨 Executor 方面相對輕鬆的原因。


Q : If you have a sleeping sender thread, and you'd like it to wake if the Receiver is dropped (so it can free up resources), is there a standard way to do that? Just have it wake few seconds to check
A : 不用每幾秒去喚醒 Senders 一次。你只要在我們實作的 Receiver 加一個 Drop 實作,該實作要有 notify_all 去通知所有 Senders,Senders 再做可能像是釋放記憶體的處置之類的就好。
Q : I meant if the sender is sleeping on something else (i.e. a network socket), not sleeping in send().
A : 這基本上就是你需要 async/await,因為你不容易有辦法在 synchronous 程式碼中等待多個不同事件中的任何一個。


目前程式碼
use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; // Flavors: // - Synchronous channels: Channel where send() can block. Limited capacity. // - Mutex + Condvar + VecDeque // - Atomic VecDeque (atomic queue) + thread::park + thread::Thread::notify // - Asynchronous channels: Channel where send() cannot block. Unbounded. // - Mutex + Condvar + VecDeque // - Mutex + Condvar + LinkedList // - Atomic linked list, linked list of T // - Atomic block linked list, linked list of atomic VecDeque<T> // - Rendezvous channels: Synchronous with capacity = 0. Used for thread synchronization. // - Oneshot channels: Any capacity. In practice, only one call to send(). // async pub struct Sender<T> { shared: Arc<Shared<T>>, } impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let mut inner = self.shared.inner.lock().unwrap(); inner.senders += 1; drop(inner); Sender { shared: Arc::clone(&self.shared), } } } impl<T> Drop for Sender<T> { fn drop(&mut self) { let mut inner = self.shared.inner.lock().unwrap(); inner.senders -= 1; let was_last = inner.senders == 0; drop(inner); if was_last { self.shared.available.notify_one(); } } } impl<T> Sender<T> { pub fn send(&mut self, t: T) { let mut inner = self.shared.inner.lock().unwrap(); inner.queue.push_back(t); drop(inner); self.shared.available.notify_one(); } } pub struct Receiver<T> { shared: Arc<Shared<T>>, buffer: VecDeque<T>, } impl<T> Receiver<T> { pub fn recv(&mut self) -> Option<T> { if let Some(t) = self.buffer.pop_front() { return Some(t); } let mut inner = self.shared.inner.lock().unwrap(); loop { match inner.queue.pop_front() { Some(t) => { std::mem::swap(&mut self.buffer, &mut inner.queue); return Some(t); }, None if inner.senders == 0 => return None, None => { inner = self.shared.available.wait(inner).unwrap(); } } } } } impl<T> Iterator for Receiver<T> { type Item = T; fn next(&mut self) -> Option<Self::Item> { self.recv() } } struct Inner<T> { queue: VecDeque<T>, senders: usize, } struct Shared<T> { inner: Mutex<Inner<T>>, available: Condvar, } pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let inner = Inner { queue: VecDeque::default(), senders: 1, }; let shared = Shared { inner: Mutex::new(inner), available: Condvar::new(), }; let shared = Arc::new(shared); ( Sender { shared: shared.clone(), }, Receiver { shared: shared.clone(), buffer: Default::default(), } ) } #[cfg(test)] mod tests { use super::*; #[test] fn ping_pong() { let (mut tx, mut rx) = channel(); tx.send(42); assert_eq!(rx.recv(), Some(42)); } #[test] fn closed_tx() { let (tx, mut rx) = channel::<()>(); drop(tx); assert_eq!(rx.recv(), None); } #[test] fn closed_rx() { let (mut tx, rx) = channel(); drop(rx); tx.send(42); } }

待整理

  1. 0:15:30
  2. 1:02:04
  3. Future-aware channels 整個小節
  4. 1:40:40