Try   HackMD

Crust of Rust : async/await

直播錄影

  • 主機資訊
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~/CrustOfRust> neofetch --stdout
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx 
    ​​​​----------------------------------------------- 
    ​​​​OS: Ubuntu 22.04.3 LTS x86_64 
    ​​​​Host: HP Pavilion Plus Laptop 14-eh0xxx 
    ​​​​Kernel: 6.2.0-37-generic 
    ​​​​Uptime: 22 mins 
    ​​​​Packages: 2367 (dpkg), 11 (snap) 
    ​​​​Shell: bash 5.1.16 
    ​​​​Resolution: 2880x1800 
    ​​​​DE: GNOME 42.9 
    ​​​​WM: Mutter 
    ​​​​WM Theme: Adwaita 
    ​​​​Theme: Yaru-dark [GTK2/3] 
    ​​​​Icons: Yaru [GTK2/3] 
    ​​​​Terminal: gnome-terminal 
    ​​​​CPU: 12th Gen Intel i5-12500H (16) @ 4.500GHz 
    ​​​​GPU: Intel Alder Lake-P 
    ​​​​Memory: 1995MiB / 15695MiB 
    
  • Rust 編譯器版本 :
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~/CrustOfRust> rustc --version
    ​​​​rustc 1.70.0 (90c541806 2023-05-31) (built from a source tarball)
    

Introduction

0:00:00

Finally, we tackle the topic of async/await in Rust, and specifically looking at how to use and think about async/await more so than how it works under the hood. My hope with this video is to convey the mental model and intuition you should use when using async/await in your own code, without getting bogged down in the details of Future, Pin, and Wakers.

Jon 為了 async 寫了 Rust for Rustaceans,因為在 The Rust Programming Language 並沒有提到 async。Asynchronous Programming in Rust 的編排沒有很好,還在調整中,直播也會談到該書遺漏的地方。另外推薦瀏覽 mini-redis,它實作了 redis client 和 server,雖然實作不完全,但它向你展示如何建立非同步應用程式、client-side 和 server-side,它有很完善的文件,你可以閱讀它來了解發生的情況並了解其中的一些設計原則和思想。

雖然非同步程式碼有很多不同的 Executor,但 Jon 通常會關注 tokio,因為它是使用最廣泛的 Executor,也是最積極維護的一個。等等將要討論的大部分內容都與 Executor 無關,但不討論 tokio 的具體細節,因為這與你如何思考非同步程式碼並不相關。

Fixing the video title

0:04:49

What is async fn?

0:05:28

開始建置 Rust 專案 :

$ cargo new --bin patience
$ cd patience
$ vim src/main.rs

先定義一個 async 函式的 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world!"); // let x = foo1(); // 可以編譯 let x: usize = foo1(); // 無法編譯 } // async keyword 編譯器會讓 foo1 de-sugaring 成 foo2 async fn foo1() -> usize { 0 } // Future trait 表示尚未準備好的值,但最終會回傳 usize。 // 我不會告訴你什麼時候回傳 usize,只是在未來的某個時間點。 fn foo2() -> impl Future<Output = usize> { async { 0 } }
目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world!"); // let x = foo1(); let x: usize = foo1(); } async fn foo1() -> usize { 0 } fn foo2() -> impl Future<Output = usize> { async { 0 } }

如果你宣告 xusize,編譯器會提示以下錯誤訊息 :

cargo run ... --> src\main.rs:8:20 | 8 | let x: usize = foo1(); | ----- ^^^^^^ expected `usize`, found future | | | expected due to this | note: calling an async function returns a future --> src\main.rs:8:20 | 8 | let x: usize = foo1(); | ^^^^^^ help: consider `await`ing on the `Future` | 8 | let x: usize = foo1().await; | ++++++

await 表示在實際 resolve 為其輸出型別之前,請勿執行以下指令清單。

當你嘗試想在 foo1 裡印出東西,會印不出來 :

... fn main() { println!("Hello, world!"); - let x: usize = foo1(); + let x = foo1(); + // 只是為了展示 x 不能宣告為 usize } async fn foo1() -> usize { + println!("foo"); 0 } fn foo2() -> impl Future<Output = usize> { + println!("foo"); async { 0 } }
目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world!"); let x = foo1(); } async fn foo1() -> usize { println!("foo"); 0 } fn foo2() -> impl Future<Output = usize> { async { println!("foo"); 0 } }

編譯執行的結果只印出主程式的 println!() :

$ cargo run ... Hello, world!

之所以沒有印出 "foo" 是因為在 await 之前,Future 不做任何事。Future 只是描述將在未來某個時刻執行的一系列步驟。等等會討論到讓 foo()println!() 被執行。

繼續看到 foo2 函式 :

fn foo2() -> impl Future<Output = usize> { async { println!("foo"); 0 } }

Future 第一次 await 會執行 Line 3,並立刻回傳 0 並 resolve。

awaiting futures

0:10:40

想像 foo1read_to_string("file") 的其中一部分功能 :

... fn main() { println!("Hello, world!"); - let x = foo1(); + let x = foo2(); } ... fn foo2() -> impl Future<Output = usize> { async { - println!("foo"); + println!("foo1"); + read_to_string("file").await; + println!("foo2"); 0 } }

foo1() 函式的操作確實就會花一些時間在處理 I/O,那 Line 14 會等 Line 15 resolve 完之後才會印出 "foo1"。

如果讓 foo1() 函式不要 await :

fn foo2() -> impl Future<Output = usize> { async { println!("foo1"); - read_to_string("file").await; + read_to_string("file"); println!("foo2"); 0 } }

那麼 Line 6 就會直接印出 "foo2",因為 Line 5 只會回傳 Future,因為我們沒告訴 foo1() 函式要 await,所以沒有實際的工作發生。

你可以想成非同步區塊的方式實際上是它們以區塊的形式執行,將程式碼複製來說明 :

fn foo2() -> impl Future<Output = usize> { async { // First time: (表示 Line 6 完成) println!("foo1"); read_to_string("file1").await; // Wait here // Second time: (表示 Line 9 完成) println!("foo1"); read_to_string("file2").await; println!("foo1"); read_to_string("file3").await; println!("foo1"); read_to_string("file4").await; println!("foo2"); 0 } }

第一次會執行 Line 5 與 Line 6,然後就不會做任何事情了,它會 yield back。

你可以把上面程式碼想成做以下的事情 (實際上不是這樣),它就像在 spin 和 yield :

fn foo2() -> impl Future<Output = usize> { async { // First time: println!("foo1"); - read_to_string("file1").await; // Wait here + let fut = read_to_string("file1").await; + while !fut.is_ready() { + std::thread::yield_now(); + fut.try_complete(); + } + let result = fut.take_result(); // Second time: println!("foo1"); read_to_string("file2").await; println!("foo1"); read_to_string("file3").await; println!("foo1"); read_to_string("file4").await; println!("foo2"); 0 } }

程式將檢查 fut 是否在完成方面取得了進展。為 Future 取得進展意味著到達下一個等待點 (Line 15)。因此,當 Future 確實開始執行時,程式將從上次暫停的地方繼續執行,直到下一個可能無法繼續進展的地方。

繼續用原程式說明 :

fn foo2() -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); - read_to_string("file3").await; + let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); + expensive_function(x); - read_to_string("file4").await; + /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

因為在 Line 12 沒有使用 await,一旦 Line 10 的 await resolves,我們就可以開始執行。而且我們可以假設沒有非同步。我們只執行指令,其中可能包括呼叫所有 Line 12 這些 expensive_function() ,我們可能會做各種事情,而且我們不會被中斷。

儘管此時我們處於標準執行緒模型中,但其他事物不會在我們的位置上執行。我們的執行緒可能會被中斷,另一個執行緒可能會開始執行,但除此之外,在這一點上,與非同步相關的魔法是不存在的。我們將繼續執行,就好像我們是一個正常的函式,一直到我們創建另一個 Futurue,然後等待該 Futurue。在那時,我們會再次進入等待狀態,等待該 Futurue 完成。一旦 await (Line 10),我們就必須執行 Line 11 - Line 12,因為這兩行沒有使用 await,並且我們一直執行直到下一個等待點 (Line 14)。

Yielding

0:17:00

另一種 de-sugaring await :

fn foo2() -> impl Future<Output = usize> { async { // let x = read_to_string("file").await; let fut = read_to_string("file"); let x = loop { if let Some(result) = fut.try_check_completed() { break result; } else { fut.try_make_progress(); yield; } }; println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

你現在可以把 Line 13 的 yield 想成 thread yield,但它具有一個額外的特性,即你實際上會一直返回到首次 Future awaited 的地方。你可以把它想成東西的 stack 在彼此等待 :

fn main() { println!("Hello, world!"); let x = foo2(); } async fn foo1() -> usize { println!("foo"); 0 } fn foo2() -> impl Future<Output = usize> { async { // let x = read_to_string("file").await; let fut = read_to_string("file"); let x = loop { if let Some(result) = fut.try_check_completed() { break result; } else { fut.try_make_progress(); yield; } }; ... } }

main() 函式等待 foo2() 函式,foo2 等待 read_to_string() 函式,read_to_string() 函式等待 foo1() 函式。每當你 yield 時,你實際上會返回到該呼叫 stack 的頂部,並重新回到那裡。但下一次,當某事呼叫類似 try_check_completed()try_make_progress() 時,它將從先前呼叫 let 的最底部項目的 yield point 繼續。這是一種思考 await 的方式,它是一個迴圈,每當它無法進展時就會 yield。

目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world!"); let x = foo2(); } async fn foo1() -> usize { println!("foo"); 0 } fn foo2() -> impl Future<Output = usize> { async { // let x = read_to_string("file").await; let fut = read_to_string("file"); let x = loop { if let Some(result) = fut.try_check_completed() { break result; } else { fut.try_make_progress(); yield; } }; println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

Awaiting one of multiple futures

0:20:03

為什麼我們想要使用 await ? 假設你有很多 Future,一個 Future 是從 terminal 讀取,你會等待使用者寫入東西到 terminal,另一個 Future 會從 network 讀取,你也會等待一個新的網路連線。你無法控制使用者何時在 terminal 中輸入內容,也無法控制某些外部程式何時連線。所以你想同時等待這兩個事件,而你並不真正關心哪一個會先發生 :

fn main() { println!("Hello, world!"); let read_from_terminal = std::thread::spawn(move || { let x = std::io::stdin(); for line in x.lines() { // do something on user input } }); let read_from_network = std::thread::spawn(move || { let mut x = std::net::TcpListener::bind("0.0.0.0:8080").unwrap(); while let Ok(stream) = x.accept() { // do something on stream } }); }

如果你在執行緒系統,我們會有一個執行緒用於處理 terminal 讀取,另一個執行緒用於接受連線。但如果我們必須為我們必須執行的每一項操作都使用一個執行緒,情況會變得更糟。甚至不考慮性能,僅僅是思考起來就變得很煩人。想像一下你上面的程式碼要管理的網路狀況如下面程式碼 :

fn main() { ... let read_from_network = std::thread::spawn(move || { let mut x = std::net::TcpListener::bind("0.0.0.0:8080").unwrap(); while let Ok(stream) = x.accept() { // do something on stream handle_connection(stream); } }); }

現在我們有一個單獨的執行緒來管理所有的連線。假設一大群使用者正在連線下載一些大文件之類的東西。你真的很想能夠使用多個執行緒,或者至少,如果一個執行緒連線已滿,你希望能夠在另一個執行緒上進行寫入。如果只有一個執行緒,這就會變得非常奇怪,因為在某個時刻,handle_connection() 必須呼叫 stream.write() :

fn main() { ... let read_from_network = std::thread::spawn(move || { let mut x = std::net::TcpListener::bind("0.0.0.0:8080").unwrap(); while let Ok(stream) = x.accept() { // do something on stream handle_connection(stream); + stream.write(); } }); }

假設一下其中一個 client 的速度非常慢。我們需要為每個 stream 建立一個執行緒,這樣你才能夠向所有這些快速的 clients 寫入而不會被困在向最慢的 client 寫入 :

fn main() { ... let read_from_network = std::thread::spawn(move || { let mut x = std::net::TcpListener::bind("0.0.0.0:8080").unwrap(); while let Ok(stream) = x.accept() { // do something on stream - handle_connection(stream); - stream.write(); + let handle = std::thread::spawn(move || { + handle_connection(stream); + }); } }); }

現在我們得到了一個執行緒 handle (Line 9)。我們需要對此做一些事情,以便在嘗試關閉伺服器時記得等待執行緒。現在我們有所有這些執行緒都在執行。像這樣編寫程式是可能的,而且很多大型程式都是這樣寫的,但不知怎的感覺很奇怪。

非同步模型實際上可以使處理網路以及 terminal 的過程更簡單。非同步模型可讓你以更符合你的想法的方式對此進行建模 :

fn main() { let network = read_from_network(); let terminal = read_from_terminal(); select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } }; }

select! 巨集在很多函式庫都有,像是 tokio,futures,等等。select 的作用是等待多個 Future,並告訴你哪個先完成。

select 將嘗試在網路上取得進展,如果它確實在網路上取得了進展,那麼它會給你一個 stream 並呼叫 Line 7 的程式碼;如果它在網路上沒有取得進展,那麼它會嘗試在 terminal 上取得進展。如果它在 terminal 上取得進展,就像 terminal 現在有新 line 一樣,那麼它將用它得到的 line 來執行 Line 10 的程式碼。如果網路以及 terminal 都沒有取得進展,那麼它就會 yield。然後在未來的某個時候,它會重試,並嘗試再次檢查進度。

在實際操作中,select! 實際上比這更聰明。在這裡底層真正發生的是,當你 yield 時,你不僅僅是說 yield,你說的是要 yield,直到發生這件事情。例如,對於網路 socket,你 yield 的是直到在這個網路 socket 上發生某事。在 terminal 上,你說的是要 yield,直到這發生某事。

在幕後,無論是執行 Future 的任何東西,並且具有 try_check_completed()try_make_progress() 等迴圈,select! 實際上會在底層使用一些作業系統原始機制來更聰明地進行重試,它實際上不是在執行像 spin 等待之類的迴圈,它只是說,我要試試 Line 6,我要試試 Line 9。如果兩者都失敗了,我會讓自己進入睡眠狀態,然後一旦作業系統通知我值得重試,我就會再次嘗試select! 甚至可以更聰明,它甚至可以意識到,Line 6 這個網路 socket 沒有任何改變,所以甚至不必嘗試在這方面取得進展。但 Line 9 的 terminal 狀態確實發生了變化,所以我要重試這個操作。

這就是為什麼 Future 對於這種情況很方便的其中一個原因,因為你有這種優雅的方式來同時嘗試多個事情。因為 Future 不是一次性執行所有程式碼,而是這種分段操作,同時也描述了如何稍後再試的方式,這使得它非常適合進行任何主要涉及 I/O 的編程,無論是網路、磁碟還是計時器,它並不僅僅是在計算上 spin,在這種情況下,非同步並不能真正增加多少效能。非同步允許你放棄執行緒的時間,比如,如果有一個需要從磁碟讀取的操作,當它在等待磁碟時,非同步允許其他事情代替它執行。

另一種思考非同步等待的方式是把它看作是一個大型的狀態機。你處於 Future 的當前狀態,並且有多條前進的路徑。一個前進的路徑是網路 socket 上有東西可用。另一個前進的路徑是 terminal上有東西可用。然後你按照狀態機中的適當 edge 前進,然後執行下一個狀態中的程式碼。

整理一下程式碼 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } }; } async fn foo1() -> usize { println!("foo"); 0 } +async fn read_to_string(_: &str) {} +fn expensive_function(_: ()) {} fn foo2() -> impl Future<Output = usize> { async { - // let x = read_to_string("file").await; - let fut = read_to_string("file"); - let x = loop { - if let Some(result) = fut.try_check_completed() { - break result; - } else { - fut.try_make_progress(); - yield; - } - }; println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }
目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } }; } async fn foo1() -> usize { println!("foo"); 0 } async fn read_to_string(_: &str) {} fn expensive_function(_: ()) {} fn foo2() -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

select! 巨集增加工作 :

fn main() { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); let mut foo = foo2(); loop { select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } foo <- foo.await => { } // 如果 network 以及 terminal 都沒事做,那麼 foo2 變成 await, // foo2 就會開始執行 (Line 31) 到不能執行 (Line 32)為止,它就會 yield。 // 控制流回到 stack,在這種情況下流向 select!, // 並且選擇再次重試,看看 network 或 terminal 有沒有東西。 // 如果沒有的話,且 Line 32 已完成, // 那麼就會從 Line 32 的 yield point 繼續往下執行。 }; } } ... fn foo2() -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

現在假設一下,foo 函式開始執行一段時間,它讀取 file 1,讀取 file 2,正在讀取 file 3,但是它 yield。在那一刻,網路上有一個新的 stream。所以我們執行 Line 11,

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
無論 Line 11 發生了什麼,都會被執行,我們甚至不會再檢查 foo
Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
。因為 foo 不會再做任何事情,因為它沒有被 poll,它不會在後台執行,它是協作式調度的,它依賴於它的親代,也就是 select!,實際上繼續等待它

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
foo 不能獨立執行,擁有 foo 的人負責再次等待它
Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
。在這種情況下,如果有一個新的 stream 返回,我們最終完成執行這個 Line 10 到 Line 12 的程式碼塊,那麼在那一刻,我們退出 select,再次走訪迴圈,然後再次進行選擇。現在有新的串流嗎?假設沒有,terminal 沒有新的 line,所以現在我們繼續等待 foo。所以現在也許 file 3 完成讀取了,foo 再次等待,但這次我在讀取 file 4,所以它得以進展。然後也許 terminal 有新的 line,我們再次進入迴圈,依此類推。因此,思考這個問題的方式實際上是協作式調度的概念,如果我不執行,我會讓在我上面的人決定下一個誰執行。

Cancellation

0:34:46

如果你不希望 Future 繼續做它正在做的事情,你可以卸除它。訣竅是,如果你不等待它,它就不會做任何事情。等待是推動其進步的動力。

以下的直執行緒式碼中,你實際上沒有一種取消的方式 :

fn foo2() -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); read_to_string("file2").await; println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

因為一旦你再 Line 5 呼叫了 .await,你就無法再控制執行了。你告訴 Rust 的是,我不想繼續執行,直到你從這個 Future 中獲得了承諾的值。當那發生時,執行下一行。因此,你在這裡實際上沒有取消的方式。

但如果你想取消這個操作,你會做類似以下的事情 :

-fn foo2() -> impl Future<Output = usize> +fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); - read_to_string("file2").await; + select! { + done <- read_to_string("file2").await => { + // continue; fall-through to println below + } + cancel <- cancel.await => { + return 0; + } + } println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

對於 Line 9 的 :select" 另一個名稱有人提議叫 "race"。這是一種很好的思考方式。有點像 done 跟 cancel 在相互競爭,無論哪個先完成,哪個 .await 能夠首先產生其結果,就可以執行,而另一個則不行。它在這一點上被中斷了,除非你迴圈並嘗試更多。

目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); let mut foo = foo2(); loop { select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } foo <- foo.await => { } }; } } async fn foo1() -> usize { println!("foo"); 0 } async fn read_to_string(_: &str) {} fn expensive_function(_: ()) {} fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); select! { done <- read_to_string("file2").await => { // continue; fall-through to println below } cancel <- cancel.await => { return 0; } } println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

Executing futures

0:37:58

繼續講解程式碼 :

fn main() { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); let mut foo = foo2(); loop { select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } foo <- foo.await => { } }; } }

從網路讀取 (Line 4) 並等待 (Line 10) 這一步驟,最終會導致一個系統呼叫 (非常耗時)。它會向作業系統發出讀取系統呼叫,要求給我這些 bytes,然後作業系統會回應說,我沒有這些 bytes。那你要怎麼辦呢?這就是 Executor 的概念介入的地方。Executor 的基本前提是只允許你在非同步函式和非同步區塊中等待。所以這實際上不行編譯。fn main() 改為 async fn main() 時才可以編譯。

但是你會看到編譯器報錯,因為 main() 函式不允許是非同步的。因為最終在你的程式中的某個地方,你會有這個巨大的 Future,它掌握著整個程式的執行,但它是一個 Future,所以還沒有任何執行,所以你有這個描述整個應用程式控制流的頂級 Future,某件事必須執行它在一個迴圈中,必須有 Line 8 - Line 19 的那種東西,試著看它是否完成,如果它沒有完成,那麼怎麼辦?它不能 yield,因為 main 函式的上面沒有東西,如果它不能取得進展,因為 main 函式不能 yueld,那該怎麼辦 ? 你可以想像它只是在迴圈中 spin,但在實際操作中,這並不是我們想要做的事情。因此,一個原始的 Executor,就是一個僅僅在迴圈中 poll Future,沒有其他操作的東西。它只是持續地積極地重試。

實際上,在實踐中,Executor crate (tokio 就是 Executor crate 的一個例子) 提供了最低層的資源,如網路 socket 和 timer,同時也提供了頂層的 Executor 迴圈。這兩者在幕後是相互連接的。

想像一下,如果你從網路中讀取並呼叫 .await。實際上會發生的是,tokio 的 TCP stream 將從網路中讀取,意識到我現在無法讀取,然後它將其 file descriptor 儲存起來,將 socket ID 儲存到 Executor 的內部狀態中,並說,下次你去睡覺時,注意這個 file descriptor 是否改變了其狀態。我會告訴作業系統,如果這個 file descriptor 的任何事情發生變化,請喚醒你。然後在某個時候,當我們一直 yield 直到返回到 main 函式、返回到主 Executor,而不是只是在迴圈中 spin,它會將所有它知道需要等待的資源都發送到作業系統,並說,讓我睡覺,但是如果這些中的任何一個發生變化,請喚醒我。如果其中任何一個的狀態發生了變化,請喚醒我,因為我需要做更多的工作。這種外部 Executor 迴圈在不同的作業系統上有不同的實作 (Linux : epollm Mac : kqueue),以最佳地利用底層機制。

實際上,在實踐中,當你使用 tokio 時,你所做的是類似於這樣的事情,它允許你寫 async fn main() :

#[tokio::main] async fn main() { ... }

但實際上,這只是一個程序式巨集,它稍微重組了你的程式碼,將它轉換為 fn main(),然後執行以下操作:

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let network = read_from_network(); let terminal = read_from_terminal(); let mut foo = foo2(); ... }); }

因此,你的 main 不是非同步的,即使你寫的是 async fn main,程序式巨集將其轉換回一個 non-async 的 fn main(),因為當 Linux 執行你的二進位檔時,它只需要一個要函式去呼叫一個常規函式,Lunux 對 Rust 的非同步沒有任何了解。然後在 main 中所做的事情是創建一個 runtime,這是一個 Executor,這個大迴圈會試圖 poll 經過 block_on 的 Future。如果無法在該 Future 上取得進展,就會轉向作業系統並說,如果這些事情中的任何一個發生變化,就叫醒我,然後迴圈直到未來 resolve 完成,直到未來沒有更多的任務可執行。

繼續觀察無窮迴圈 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... loop { select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } foo <- foo.await => { } }; } }); }

這個非同步永遠不會完成,它永遠不會 resolve 為最終值。

但是你可以想像一下,例如,當我們從 terminal 獲取某些內容時,我們就會 break :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... loop { select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line + break; } foo <- foo.await => { } }; } }); }

這意味著當有人在 terminal 上寫入內容時,該迴圈就會退出。我們將放棄 network Future,不再做更多的工作。我們將放棄 foo Future 並且不再做更多的工作。此時,我們已到達非同步區塊 (Line 3 - Line 18)的末端。所以這個非同步塊 resolve 為一個 (),block_on 也就完成了,因為給定的 Future 已經 resolved。在那一點上,我們處於執行的結尾,然後 main 退出。這就是這些特性首先如何執行的較高級別流程。

Q : I'm pretty new to this. I only know about how the JS event loop works. Where would event loop fit in this? Like a comparison
A: 你可以將外部 Executor 迴圈視為 event loop。它的工作是繼續執行可用的 Futures,直到沒有更多的 Futures 可以執行。到那時,一切就完成了,
現在,JavaScript 中的 event loop 和這裡的區別在於你可以選擇自己的 event loop。你不必使用 tokio。事實上,tokio 有多個 runtime 的 variants 供你選擇使用。而且可以說,你也可以編寫自己的 event loop。這個 loop { select } 就是一個 event loop。它是你在更大的 event loop 上下文中的自己的 event loop。

Q : You talked about the futures yielding. How does this yielding look like in code?
A : 一般來說,你不需要自己擔心 yield。每當你使用 .await,implicitly,將 .await 想像成執行 yield 的語法糖。你實際上不能直接在你自己的程式碼中使用 yield 機制。如果你試圖自己實作一個 Future,比如通過實作該 trait,你就沒有使用 yield keyword 的權限。相反,你必須基本上手動實作狀態機器,這就是在 async await 出現之前 Rust 中的未來生態系統的狀況。相信我,這樣做更好。但一般來說,現在很少需要編寫自己的 Future 實作。

Q : What is the best way to pass data to now tokio::spawn and retrieve? What are the best practices to handle errors in async? When we call tokio::spawn?
A : 等等會談到。

Q : why is [tokio::main] a macro and not a simple function?

#[tokio::main] async fn main() { ... }

A : 因為很好寫。

Q : so does tokio re-implement kqueue sort of thing? or libuv
A : tokio 使用的是 Mayo。 Mayo 是一個對 kqueue、epoll 進行抽象的 crate。基本上,它提供了類似於 libUV 的東西,基本上是一個作業系統 event loop,或者甚至不是 event loop,而是事件註冊。你可以說,我想睡覺,直到發生其中任何一個事件。然後你就進入睡眠狀態。然後作業系統,通過 Mayo 選擇的任何機制,當其中一個事件發生時,將喚醒你,你就可以取得進展。

現在,你可能會想 :

fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); select! { done <- read_to_string("file2").await => { // continue; fall-through to println below } cancel <- cancel.await => { return 0; } } println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

如果我們在 Line 11 等待 cancel,cancel 是一個 channel Receiver (Line 1)。如果你在 channel receiver 上等待,就沒有 file descriptor 可以給作業系統。作業系統對於像 Receiver 這樣的東西一無所知。在這些情況下,底層可能會有更多的事情在進行中。

實際上,你應該相信 Executor 知道如何喚醒自己的 Future。在這種情況下,如果你使用 tokio runtime 時,並使用 tokio channel,它知道如何處理。事實上,在這裡使用的部件在很大程度上是執行時獨立的,例如,你可以使用一個叫做 futures 的 crate,它也有很多為 futures 提供的實用機制。Jon 認為它們之中有一個是 channel。你可以使用 futures channel 與 tokio,因為它們使用的是 Rust 語言提供的相同的底層機制。因此,Executor 必須包含處理非基於作業系統的事件的機制。

Q : so in select! it tries to run network.await and if nothing happens then it tries terminal.await and if nothing happens then it tries foo.await? and if network.await runs and someth…
A : select! 的工作方式是在所有給定選項中進行選擇。它不記得過去執行過的任何內容。實際上,如果你想重新執行某些操作,你可以這樣做 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); - let network = read_from_network(); + let mut network = read_from_network(); - let terminal = read_from_terminal(); + let mut terminal = read_from_terminal(); let mut foo = foo2(); loop { select! { - stream <- network.await => { + stream <- (&mut network).await => { // do something on stream } - line <- terminal.await => { + line <- (&mut terminal).await => { // do something with line } - foo <- foo.await => { + foo <- (&mut foo).await => { } }; } }); }

borrow checker 會對此發出警告。如果我實際上匯入了 select! 巨集,它會告訴我必須這樣做。因為否則,在迴圈的第一次走訪時,network 的所有權會被傳遞給了 await。因此,在下一次迴圈時,borrow checker 會認為 network 已經被移動,你不能再次使用它。因此,在實際操作中,你會做一些像這樣的事情,這允許它在迴圈的多次迭代中被重用。但是,是的,它會考慮所有情況。它並不會記住過去的嘗試。

Select arms with side-effects

0:50:50

Q : I think it's the same question as above, but what happens if an abandoned select!-arm has side effects?
A : 複製檔案為例子 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let mut network = read_from_network(); let mut terminal = read_from_terminal(); let mut foo = foo2(); // 使用了 tokio::fs 而不是 std::fs。 // 這是因為如果你對 std::fs 進行操作,它沒有 await,也沒有 async 函式, // 因為標準函式庫一般不定義 async 函式,因為它們依賴於與 Executor 的集成, // 你需要這個集成才能獲得協作調度和我們談到的智能喚醒。 // 因此,你實際上需要使用 IO 資源的非同步版本。 let mut f1 = tokio::fs::File::open("foo"); // async read let mut f2 = tokio::fs::File::create("bar"); // async write let copy = tokio::io::copy(&mut f1, &mut f2); select! { stream <- (&mut network).await => { // do something on stream } line <- (&mut terminal).await => { // do something with line } foo <- (&mut foo).await => { } _ <- copy.await }; // 假設 select 檢查 Line 20 沒事發生,Line 23 沒事發生,Line 26 沒事發生, // 接著 Line 28 寫入 MB,它必須等待磁碟,它不能取得進展,也還沒完成寫入, // 接著回到 Line 20,假設 stream 完成了,我們會離開 select! // _some_ bytes have been copied from foo to bar, but not all // copy.await // 你可以這麼做,但很容易忘記 }); }

它在執行過程中達到了某個 yield point,但這意味著 Future 的尾部還沒有被執行。在這一點上,如果你丟棄了 Future,它就無法完成。它無法完成其餘的工作。它在那一刻就被終止了。因此,現在你需要考慮到你的程式可能處於中間狀態。這是一個特定的情況,我不想稱之為問題,但這只會真正影響到 select!。因為如果你考慮到以下情況 :

fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { ... println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

它實際上不可能發生 select! 的那種情形,你沒有辦法在操作中途取消操作。事實上,唯一一種使得這個操作或者你在其上呼叫 .await 的任何操作不會被完成、會被中斷的方式是,如果鏈上有一個 select。因此,一般來說,當你寫 select! 時,你必須小心考慮,如果一個分支執行了,然後另一個分支完成了,那會發生什麼。因此,第一個分支並沒有執行到完成,但第二個分支卻完成了。那麼,這對你來說意味著什麼?這是你在使用 select! 時需要關注的一個錯誤案例。

Cooperative scheduling

0:55:35

Q : As you present it the executor is making the assumption that the future are not "greedy" : how do you avoid a future not taking all of the compute time and grind the all async to a halt?
A : 這也是一個很好的觀察,我很高興你提出了這一點,因為這意味著人們理解了這裡的思維模式,即這是協作調度的,這意味著其他事情可以執行,只要正在執行的事情偶爾 yield。如果一個 Future 只是一個忙碌的 spin 迴圈,或者使用標準 I/O 並且呼叫像是在一個巨大的東西上進行讀取,或者在一個永遠不會給出任何 byte 的網路 socket 上進行讀取,那麼該執行緒將被 block。

如果使用的是非同步 TCP I/O stream,那麼如果無法完成,讀取操作將會 yield。但是,如果使用的是標準 TCP I/O stream,它不知道任何有關非同步的訊息,當你進行讀取時,它的實作是 block 執行緒,不做其他事情,絕對不會 yield,只是 block 直到讀取完成,這可能永遠不會發生,也許這個特定的 TCP channel 永遠不會再發出任何 byte。然後你就陷入了困境。

這是一個非常糟糕的情況,因為現在你的其他未來都無法取得任何進展。它們不再被 poll,因為其他 Future 無法執行,因為正在執行的事情,正在 block執行者的事情不會 yield。因此,非常重要的是你要使用這個,你要投入到這個協作調度的世界中。這也是為什麼在非同步上下文中使用 block 操作時,你必須非常小心地使用不知道非同步產生的 block 操作或計算密集型操作。

有一些機制可以改善這種情況,例如 : Function tokio::task::spawn_blocking 以及 Function tokio::task::block_in_place。如果你在非同步上下文中,需要執行一些你知道可能會 block 很長時間的操作,無論是因為計算還是因為進行系統呼叫等,你可以使用這些方法告訴 Executor,嘿,我即將 block,確保其他任務得以執行。它們的工作原理本次直播不會完全探討,這有點太技術性了。但建議你閱讀這些方法的文件,以了解它們之間的區別和各自的優缺點。但是有一些方法可以發出 signal,表明你將要 block 並且 Executor 需要採取適當的步驟來減少問題。

Selects with many arms

0:58:38

Q : does that mean that select with a huge number of cases will potentially be slowed down by trying all options out every time?
A : 你可能會這樣認為。答案是取決於 select! 的實作方式。一般來說,如果你有一個有百萬個 case 的 select,那似乎是一個問題。但是,考慮到 select 實際上強迫你將它們寫出來,這似乎在第一次出現時就不太可能。

現在,你可能會擁有如此龐大的 select,例如,你可以想像你有一個程式碼產生 pipeline。在這種情況下,大多數 select 實作都是針對少數情況而不是許多情況進行最佳化的。但是,select 可能是智能的。智能的意思是只 poll 可能取得進展的那些,我們不會深入探討它們如何做到這一點,但基本上有一種方式讓 select 能夠與 Rust 處理 Future 和喚醒的機制進行集成。當 Future 通過某種機制變得可執行,比如一個 file descriptor 已經就緒,或者在記憶體通道上發送了一個訊息,有一種方式讓 select 巨集能夠注入自身到這個 Future 已經就緒的通知中,並在發生時得到更新自身狀態的 signal。

因此,你可以想像,select 在其所有分支上保持著幾乎像 bit mask 一樣的狀態。當通知到來時,它會為相應的分支翻轉 bit。然後,在下一次 select 到來或下一次等待 select 時,它只會檢查那些位已經設置的分支。在實踐中,一般而言,select 不會這樣做,因為這涉及到一堆機制,而大多數 select 只有少數分支。如果你有許多分支,你可以想像寫一個 select 巨集只在有多個分支時才使用這個技巧。但目前沒有哪個實作這樣做。

Q : is select! macro fair? As in can it happen that only one branch will run forever (in theory). Also this stream is about async/await and not hazard pointers right?
A : 這取決於實作。看一下 futures crate :

select Polls multiple futures and streams simultaneously, executing the branch for the future that finishes first. If multiple futures are ready, one will be pseudo-randomly selected at runtime. Futures directly passed to select! must be Unpin and implement FusedFuture.

select_biased Polls multiple futures and streams simultaneously, executing the branch for the future that finishes first. Unlike select!, if multiple futures are ready, one will be selected in order of declaration. Futures directly passed to select_biased! must be Unpin and implement FusedFuture.

Fused futures

1:01:51

Q : when do you use fuse with select?
A :

Fuse 是一種用於表示安全地 poll Future 的方式,安全地等待一個 Future,即使這個 Future 在過去已經完成過。看到以下的例子 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... loop { select! { stream <- (&mut network).await => { // do something on stream } line <- (&mut terminal).await => { // do something with line } foo <- (&mut foo).await => { } _ <- copy.await }; } }); }

當我們進行選擇時,假設網路和 terminal 都準備就緒。所以 select 會檢查它們兩個。並且它們兩個都說:「我準備好了。」那麼 select 會怎麼做呢?實際上,它不會同時 poll 它們兩個,而是會依次 poll。所以它先 poll 網路,網路說:「我完成了,這是值。」然後它會說:「太好了,謝謝,我要執行這個分支。」我們再次進入迴圈。現在 select 仍然包括這個分支,即使這個 Future 已經完成了。但是 select 不會記住它已經完成,也不會記住它不需要再次檢查這個分支,因為 select 只知道被呼叫了一次。迴圈不是 select 的一部分。因此,即使它已經產生了值,這個 Future 需要再次安全地 poll,而這就是「Fuse」的描述。

Q : How would you continue a half-completed-then-abandoned arm? You can await the same future after the select!?
A : 你可以在選擇後等待相同的 Future。這取決於你如何在其上進行選擇,但如果你像 Line 7 進行選擇 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... loop { select! { stream <- (&mut network).await => { // do something on stream } line <- (&mut terminal).await => { // do something with line } foo <- (&mut foo).await => { } _ <- copy.await }; } }); }

那麼這將等待該 Future 的 mutable borrow。因此,在迴圈之後,你仍然可以等待該值,這是一般情況下,你將希望選擇 Future 的 mutable 參考而不是 Future 本身的原因之一,因為如果你這樣做,那只有在這不是一個迴圈的情況下才能正常工作 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... select! { stream <- network.await => { // do something on stream } line <- terminal.await => { // do something with line } foo <- foo.await => { } _ <- copy.await }; }); }

那麼網路將移入 select 中的 Line 6 的 await 中,並且你將無法稍後 await 它,它將在 select 結束時被卸除。但是,如果你只是等待對其的 mutable 參考,那麼該 mutable 參考會在迴圈結束時結束,因此你稍後將能夠再次 await 它。

Overhead of async

1:04:35

Q : How bad of a performance hit is to use async when you don't call any async stuff down the line?
A : 非同步並不會為執行程式碼增加任何開銷。假設你有矩陣乘法的函式,並在前面加了 async keyword :

async fn matrix_multiply() {}

實際上,執行的程式碼還是一樣的。只是它被包裝在 Future 型別中,你需要 await 以獲取結果。但 await 不會做任何事情,它不會以任何有意義的方式改變產生的程式碼。因此,將某些東西標記為非同步並不會增加任何開銷。

也許你問的類似問題是 asynchronous I/O 讀取與 synchronous I/O 讀取之間的開銷是多少?確實有一些開銷,因為現在你需要與 Executor 機制進行結合,需要做更多的工作。但一般來說,額外的系統呼叫會在所有 Future 或所有資源上分攤。因此,實際上並不會增加太多開銷。通常從不需要產生成千上萬個執行緒或更現實地說是數十萬個執行緒的好處,通常 Executor 執行只需較少的執行緒,然後由它們進行協作調度,往往會更快。

這其中有幾個原因。一個常見的原因是你不需要經常跨越作業系統邊界。如果有很多執行緒,實際的作業系統執行緒,如果一個執行緒必須在讀取時 block,那麼進行系統呼叫,作業系統必須切換到不同的執行緒,這是不免費的,然後執行該執行緒。使用非同步,如果讀取失敗,作業系統返回到相同的執行緒並說,我無法取得任何進展。它 yield 控制權, Executor 在同一執行緒上繼續執行,因此不存在上下文切換,然後只是 poll 不同的 Future,因此,一般來說,這最終會更有效率。

實際上,很難說哪一種是客觀上更快的,因為它真的取決於你的用例,但總的來說,如果你正在進行的工作是 I/O bound,比如網路伺服器,非同步可能會導致更有效地使用你的資源。而且,非同步的程式碼更容易閱讀,也許不一定更容易推理,但更容易閱讀。

Q : Are these like protothreads? I remember using that in a embedded OS
A : 是的,可以將它們視為 userspace 的執行緒。

Is select a future?

1:07:56

Q : what would select! return? Another future to be awaited?
A : select 會為你產生一個 Future,然後對該 Future 呼叫 await。一般來說,你也可以手動建構 select,這通常會更麻煩一些,但你可以為 block 自己建構 select Future,這並不完全正確。select 擴展基本上是一個帶有 match 或者是一堆 if 的迴圈,它實際上並不產生一個 Future,但你可以通過將其包裝在 async 區塊中並將其分配給變數來使其成為 Future。select 實際上並不擴展為一種型別,它擴展為執行適當操作使其工作的 Rust 程式碼。但如果你希望的話,可以將其製作為 Future,不過它除了 Future 之外並不會有實質的回傳值。

Awaiting multiple futures

1:09:20

所以我們談到了 select 是一種 branch 控制流的方式,它可以說,做這個或做那個,哪一個先完成就採取哪個。另一種操作是 join,它是說,等到所有這些 future 完成。舉例來說,假設你想要讀取 3 個檔案 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... let files = (0..10).map(|i| tokio::fs::File::read_to_string(format!("file{}", i))); // 在繼續執行我的程式之前,我想等待它們全部完成。 // 想像一下,我可能在串接它們,或者在所有的 bytes 上計算 hash 值之類的操作。 // 所以在這種情況下,就會使用 join 操作。 }); }

使用 Macro futures::join,這種 join 只適合少數 I/O 操作,因為你要列舉所有名稱 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... let files: Vec<_> = (0..3) .map(|i| tokio::fs::File::read_to_string(format!("file{}", i))) .collect(); // compare (sequential 讀取檔案) // 這種讀取方法,作業系統只能就一個一個讀。 let file1 = files[0].await; let file2 = files[1].await; let file3 = files[2].await; // to this (並行讀取檔案) // 這種讀取方法,作業系統才有資源調度的空間, let (file1, file2, file3) = join!(files[0], files[1], files[2]); }); }

看到 try_join_all 的函式簽章 :

pub fn try_join_all<I>(iter: I) -> TryJoinAll<<I as IntoIterator>::Item> where I: IntoIterator, <I as IntoIterator>::Item: TryFuture, // TryFuture 的輸出為 Result

try_join_all 將嘗試 join 給定迭代器中的所有內容。join space 之所以有點奇怪是因為一般來說,你可能關心輸出結果順序這一事實,你可以映射回輸入順序 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { ... let files: Vec<_> = (0..3) .map(|i| tokio::fs::File::read_to_string(format!("file{}", i))) .collect(); // compare let file1 = files[0].await; let file2 = files[1].await; let file3 = files[2].await; // to this let file_bytes = try_join_all!(files); // 我想確保 file_bytes == files[0], // 事實上,try_join_all 會幫你保證順序不變,即便它們 out-of-order 完成, // 最後它會幫你重排,但重排並不是免費的 ! }); }
目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let mut network = read_from_network(); let mut terminal = read_from_terminal(); let mut foo = foo2(); loop { select! { stream <- (&mut network).await => { // do something on stream } line <- (&mut terminal).await => { // do something with line } foo <- (&mut foo).await => { } _ <- copy.await }; let files: Vec<_> = (0..3) .map(|i| tokio::fs::File::read_to_string(format!("file{}", i))) .collect(); // compare let file1 = files[0].await; let file2 = files[1].await; let file3 = files[2].await; // to this let file_bytes = join_all!(files); } }); } async fn foo1() -> usize { println!("foo"); 0 } async fn read_to_string(_: &str) {} fn expensive_function(_: ()) {} fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); select! { done <- read_to_string("file2").await => { // continue; fall-through to println below } cancel <- cancel.await => { return 0; } } println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

如果不想因為重排順序而損失效能,可以使用 Struct futures::stream::FuturesUnordered :

pub fn new() -> FuturesUnordered<Fut> pub fn push(&self, future: Fut) impl<'a, Fut> IntoIterator for &'a FuturesUnordered<Fut> where Fut: Unpin,

這種結構的使用情境是,結果包含你關心的所有訊息,並且你不一定關心輸入,因為輸出描述了它來自哪個輸入。這樣可以更有效率。

Q : So the multiple await is cascading the same as it would in JS pretty much?
A : 是的,sequential 讀取檔案也的 pseudo 語法也可以這樣寫 :

files[0].then(files[1]).then(files[2])

Q : and join is kinda Promise.all
A : 你可以這樣想。

Q : there was also join_all
A : 是的,如下

-let file_bytes = try_join_all!(files); +let file_bytes = join_all!(files);

前面提到過 select 不一定聰明地確保只檢查可能準備好的 Future,但 join 是的,因為它知道你可能會加入很多 Future。比如說,想像你正在下載 cargo 專案的所有依賴項,可能會有成千上萬個,你想要平行下載它們,或者至少某些子集平行下載,然後有很多分支,你希望確保你不必每次都檢查所有的 Future,你只想檢查那些確實已經取得進展的 Future。一般來說,像 FuturesUnordered 這樣的聯合操作將在執行時系統中實現這個類似 hook 的功能,以確保它只檢查那些已經收到了作業系統事件的 ready 狀態的 future。

Q : join_all and try_join_all now actually use FuturesOrdered under the hood, as of the latest release (yesterday), so it's very efficient
A : 是的,join_alltry_join_all 在內部使用的是 FuturesOrdered,而不是 FuturesUnordered。

Parallelism and spawning

1:19:17

join 很棒,select 很棒,但它們只是允許事情並行執行。這並不意味著它們可以平行執行,這是一個重要的區別,在處理 Future 時通常會被忽視。

假設 runtime 只有一個執行緒。因此,當你呼叫 block_on 時,你給它一個 Future。它不知道該 Future 包含了一堆其他 Futures,它只知道有一個,像 runtime.block_on() 函式,是 tokio 中一個型別的方法,它只得到一個 Future,它可以 await 該 Future,它它不能查看內部的程式碼。它不知道其中是否有 await, join 或 FuturesOrdered 之類的東西。它只得到一個 Future。

因此,它唯一能做的就是執行該 Future,嘗試看看該 Future 是否能完成,如果不能,那麼就發出系統呼叫,進入睡眠狀態,等待事件發生。然後嘗試在返回時在該 Future 上取得進展,當它這樣做時,該 Future 內部檢查其內部的 Futures,可能位於 stack 下方,或者,如果它是一個 select,它會檢查所有包含的 Futures。但最終,在頂層只有一個執行進入點。

這有點不幸,因為這意味著只有一個 Future,擁有多個執行緒是毫無好處的,因為其他執行緒沒有其他事情可做,沒有其他的 Future,只有一個。因此,即使你有 10 個執行緒,仍然只有一個 Future。因此,只有一個執行緒可以在任何時候等待該 Future,因為等待一個 Future 需要對它進行 exclusive 參考,多個執行緒無法這樣做。

即使多個執行緒能夠這樣做,也沒有意義,因為只有一個程式碼片段,你不希望多次執行相同的程式碼。這意味著,即使你進行類似 join 的操作,你正在並行地進行所有操作,它們仍然在一個執行緒上同時進行,這意味著這可能實際上並不是你想要的。

假設你正在編寫一個 web server,你有一個迴圈來接受 TCP 連線。對於每個 TCP 連線,你都會得到一個處理該連線的 Future。你將它放入類似於 FuturesUnordered 的結構中,然後使用 await 在 FuturesUnordered 上。

直接看到以下 Tcp 的例子的問題 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let mut accept = tokio::net::TcpListener::bind("0.0.0.0:8000"); let mut connections = futures:future::FuturesUnordered::new(); loop { select! { stream <- (&mut accept).await => { connection.push(handle_connection(stream)); } _ <- (&mut connection).await => {} } } } } async fn handle_connection(_: TcpStream) { todo!()}

我們需要在 Line 14 這個 branch 上 await 所有的 Futures,因為除非 Future 被 await,否則 Futures 不會執行,所以如果我們沒有 Line 14,這意味著沒有任何東西在等待 FuturesUnordered,這意味著沒有任何東西在等待裡面的 Futures,這意味著沒有任何東西正在等待任何 client 連線,這意味著沒有任何 client 連線正在被服務。所以我們確實需要等待連線,但我們也想看看是否有新的連線進來。所以我們需要 Line 11 等待 accept,這就是為什麼我們需要 select!。你可以把這個 FuturesUnordered 想像成一個 join,基本上,我希望所有這些都並行執行。這個程式碼不能編譯,因為這裡 Line 13 跟 Line 14 的 mutable 參考被同時使用,但我們先忽略這一點。

問題 : block_on 的區塊仍只有一個 top level Future,這意味著,即使執行時擁有與 CPU 核心數相同的執行緒,一次只有一個執行緒能執行。它將在所有不同的連線之間進行 multiplex,但想像一下,如果有 100,000 個連線,那麼這個執行緒將完全忙於處理所有這些連線。它沒有浪費任何時間,它不會休眠或其他任何事情,它在工作,只是工作量比它自己能處理的要多,但是所有其他執行緒無法幫助。

因為只有一個 Future,所以你可以解決這個問題並引入平行,而不僅僅是並行 :

fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let mut accept = tokio::net::TcpListener::bind("0.0.0.0:8000"); - let mut connections = futures:future::FuturesUnordered::new(); - loop { + while let Ok(stream) = accept.wait { + tokio::spawn(handle_connection(stream)); - select! { - stream <- (&mut accept).await => { - connection.push(handle_connection(stream)); } - _ <- (&mut connection).await => {} - } } } } async fn handle_connection(_: TcpStream) { todo!()}

spawn 方法提供了一個介面,可以將 Future 的任務交給 Executor,無論該的任務交給 Executor 是什麼。你在 Line 11 給它一個 Future,它將整個 Future move 到 Executor。這就好像你將它交給了 runtime 時。所以現在 runtime 有兩個 Future。它有 Line 4 的 Future,還有 Line 11 的 Future,這意味著有兩個獨立的 Future,這意味著兩個執行緒可以同時執行它們。使用這個 spawn 方法,如果一個執行緒正在忙於處理接收連線,那麼另一個執行緒可以處理特定連線的 Future。

Line 11 不是 thread spawn。這些執行緒由 runtime 管理,並且有一個“固定”的數量。這就是為什麼 spawn 通常要求你傳入的未來是 Send 的,因為否則它就無法發送到另一個執行緒上進行處理。雖然有 spawn local,但我們這裡不會深入討論。

目前程式碼
#![allow(dead_code, unused_variables)] use std::future::Future; fn main() { let runtime = tokio::runtime::Runtime::new(); runtime.block_on(async { println!("Hello, world"); let mut accept = tokio::net::TcpListener::bind("0.0.0.0:8000"); while let Ok(stream) = accept.wait { tokio::spawn(handle_connection(stream)); } }); } async fn handle_connection(_: TcpStream) { todo!()} async fn foo1() -> usize { println!("foo"); 0 } async fn read_to_string(_: &str) {} fn expensive_function(_: ()) {} fn foo2(cancel: tokio::sync::mpsc::Receiver<()>) -> impl Future<Output = usize> { async { println!("foo1"); read_to_string("file1").await; println!("foo1"); select! { done <- read_to_string("file2").await => { // continue; fall-through to println below } cancel <- cancel.await => { return 0; } } println!("foo1"); let x = /* waiting on */ read_to_string("file3").await; println!("foo1"); expensive_function(x); /* yield again and wait on */read_to_string("file4").await; println!("foo2"); 0 } }

一般來說,spawn 也要求你傳入的 Future 是 static 的 :

async fn handle_connection(_: TcpStream) { tokio::spawn(async { // whatever }); }

因為它不知道 runtime 的生命周期。事實上,async handle_connection 函式這個外部的非同步部分可能完成,但是 Line 3 spawn 的非同步 Future 仍然需要執行,因此它需要具有 static 的生命周期。它不能與 handle_connection 的生命周期相關聯。

想像你有以下變數 :

async fn handle_connection(_: TcpStream) { + let x = vec![]; tokio::spawn(async { + &x // whatever }); }

如果 handle_connection 返回,但 Line 3 的 Future 仍然嘗試執行,x 將被卸除,但 Line 3 的 Future 有對 x 的參考,因此這是不合法的。這就是為什麼 spawn 需要 static 的原因。

這就是在非同步程式中引入平行的方式,你需要將可以並行執行的 Future 與 Executor 進行通訊。可能 Executor 沒有那麼多執行緒。例如,對於 tokio runtime,你可以將工作執行緒設置為一個,這樣就只有一個執行緒。因此,這些 Futures 是否可以平行執行並不重要,因為只有一個執行緒,所以沒有平行。但通常情況下,你會使用這種模式,以便這些 Futures 不僅可以在一個執行緒上並行執行,還可以在多個執行緒上平行執行

這就是為什麼要記住要執行 spawn 的原因,通常當不太熟悉非同步 await 的人開始編寫非同步 await 時,他們發現他們的程式性能大幅下降,這是因為他們沒有執行 spawn。因此,他們的整個應用程式都在一個執行緒上執行。當整個應用程式都在一個執行緒上執行時,當然會比有多個執行緒執行的時候慢,因為沒有任何東西可以平行執行。

Sharing across spawn

1:30:37

Q : What is the best way to pass data to now tokio::spawn and retrieve? What are the best practices to handle errors in async when we call tokio::spawn? < :)
A : spawn 有點奇怪,因為就像執行緒的 spawn 一樣,你不能 implicitly 與另一個東西通訊。它只是在別處執行,你對它沒有控制。因此,你需要應用與多執行緒程式相同的技術,如果你想在這個東西和那個東西之間共享資料,或者假設我有兩個我想要執行的東西,我希望它們共享對某個 vector 的存取權限,需要透過 Arc 溝通 (或者 channel) :

async fn handle_connection(_: TcpStream) { let x = Arc::new(Mutex::new(vec![])); let x1 = Arc::clone(&x); tokio::spawn(async move { x1.lock() // whatever }); let x2 = Arc::clone(&x); tokio::spawn(async move { x2.lock() // whatever }); }

就像你擁有與在多執行緒程式中一樣可用的所有相同技術,你應該以同樣的方式思考它。

有一個例外,那就是如果你有一個,如果你產生了一些東西,並且你想將其結果傳達給 spawn 它的東西,以 tokio 為例,有一個 join_handle :

async fn handle_connection(_: TcpStream) { let x = Arc::new(Mutex::new(vec![])); let x1 = Arc::clone(&x); let join_handle = tokio::spawn(async move { x1.lock() // whatever 0 // ends with zero }); assert_eq!(join_handle.await, 0); let x2 = Arc::clone(&x); tokio::spawn(async move { x2.lock() // whatever }); }

如果你不等待 join_handle,而是只是卸除它,這就像卸除 thread_handle 一樣,它不會終止執行緒或其他任何操作,你只是無法獲取其結果值。這是將生成的操作的結果通知給 caller 的一種方法。

需要記住的一點是,如果你執行 spawn,就像執行緒 spwan 一樣,如果你 spawn Future,並且裡面發生錯誤 :

async fn handle_connection(_: TcpStream) { let x = Arc::new(Mutex::new(vec![])); let x1 = Arc::clone(&x); let join_handle = tokio::spawn(async move { x1.lock() + let x: Result<_, _> = definitely.errors(); // whatever 0 // ends with zero }); assert_eq!(join_handle.await, 0); let x2 = Arc::clone(&x); tokio::spawn(async move { x2.lock() // whatever }); }

對於錯誤該怎麼處理呢?你沒有一個地方可以必然地將它傳達。因為你沒有一個方法與 caller 溝通。

你可以將錯誤輸出到標準輸出,或者記錄到文件中,也可以使用一些日誌框架(如tracing)來發出錯誤事件,然後在其他地方進行處理。一般來說,Jon 更傾向於這種方法,即如果遇到無法進一步傳播的錯誤,則使用事件分發工具(如tracing)來解耦事件的產生和訂閱,這是一個不錯的方法。

Q : is there any benefit on calling tokio::spawn and immediately awaiting on it?
A : 這確實有點不太常見。這樣做的好處是,你可以讓當前直營續上的其他操作繼續執行,而不必等待該操作在其他地方完成。
假設你有一個需要進行類似 deserialization 的操作。這個操作需要一些 CPU 資源,但仍然屬於 I/O 型別。因此,你可能希望在非同步上下文中執行它,而不是在 block 上下文中執行 :

async fn handle_connection(_: TcpStream) { let x = Arc::new(Mutex::new(vec![])); let x1 = Arc::clone(&x); let join_handle = tokio::spawn(async move { deserialize() }); join_handle.await; let x2 = Arc::clone(&x); tokio::spawn(async move { x2.lock() // whatever }); }

Line 6 會立即 yield,因為 Line 5 還在 deserialization,尚未回傳。假設它可能在 select 或 join 中。當前執行緒上的其他任務(例如,正在執行 handle_connection 的執行緒)可以繼續執行。然後,這個反序列化操作可以在不同的執行緒上執行,進行這個需要大量 CPU 資源的操作。因此,它可以並行地與其他任務一起進行,從而使整個任務可以繼續進展。這可能是一種情況,你會希望啟動然後立即等待的情況,儘管這種情況並不常見。

Runtime discovery

1:36:23

Q : How is the tokio::spawn connected to the runtime instance created above?
A : Runtime::new 只是建立一個普通值,沒有什麼特別的。block_on 會在 Executor 中設定一些特殊的執行緒區域變數。因此,當你呼叫 tokio::spawn 時,它會檢查這些執行緒區域變數,以找到當前的 runtime,然後在那裡 spawn。類似地,當 runtime 最終執行傳入的 future 時,它也會設定相同的執行緒區域變數。因此,當該 Future 呼叫 tokio::spawn 時,它可以找到 Executor 等。它不是一個 singleton,所以你可以有多個 runtime。如果在一個 runtime 的上下文中呼叫 tokio::spawn,它將在那個 runtime 上 spawn,而不是在其他的 runtime 上。

這是很有價值的。假設你有控制平面流量和資料平面流量,你希望確保控制平面消息始終被處理。因此,你可以建立兩個 runtime,一個有兩個執行緒,另一個有剩餘核心數量的執行緒。你將所有的控制平面操作都 spawn 在具有兩個執行緒的 runtime 上。你將所有的資料平面操作都 spawn 在另一個 runtime 上。然後,這兩個 runtime 都可以繼續執行。但是你知道兩個核心被保留用於控制平面流量。如果你有一個 singleton runtime,你就無法做到這一點。

你可以想像 Executor 本身支援優先順序和其他功能,這會變得有些複雜,因為它還需要與作業系統、runtime 控制和優先順序控制整合。能夠明確地做到這一點是很好的,這方面有一些優勢,儘管這可能會更難發現。

Q : What should I do if there is expensive function (e.g. hash password) that I don't want to block async execute thread
A : 這就是你使用諸如 spawn_blockingblock_in_place 之類的東西的時候。

Q : what happens if you Tokyo spawn before creating any runtime?
A: 會 panic,它會說沒有 runtime

Rust Future 不依賴於執行緒區域變數,這是一個重要的區別。在 Rust 語言或標準庫的非同步支援中,沒有任何東西依賴於執行緒區域變數。Tokio 使用執行緒區域變數,還有其他一些 Executor 也這樣做,為了使介面稍微更加友好。否則,想像一下,如果沒有執行緒區域變數。spawn 就不能是一個 free function,你必須做一些像 runtime.spawn 這樣的事情。但這意味著你現在需要在整個應用程式中傳遞 runtime 的控制程式碼,以便在 stack 的深處的任何地方都能 spawn。你可以這樣 implicitly 表達出來。事實上,tokio 允許你這麼做 :

fn main() { let runtime = tokio::runtime::Runtime::new(); + let handle= runtime.handle(); runtime.block_on(async { println!("Hello, world"); let mut accept = tokio::net::TcpListener::bind("0.0.0.0:8000"); while let Ok(stream) = accept.wait { - tokio::spawn(handle_connection(stream)); + handle.spawn(handle_connection(stream)); } }); }

這種做法如此常見,因此使用執行緒區域變數進行操作,因為這樣介面更加人性化。當然,缺點是這意味著 tokio 在嵌入式上下文中不太適用,因為在那裡可能沒有執行緒區域變數。但是,Rust 的非同步 primitives 或語言支援中並沒有這方面的要求。

Q : how much memory process will allocate then u creating runtime? will this memory reuse for different runtime same version?
A : 建立 runtime 時,並不會分配大量記憶體。它可能會有一些控制資料結構,但通常並不是很大。因此,runtime 本身並不會產生太多額外開銷。

Stack variables in async

1:42:05

Future 到底是什麼 ?

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} async fn foo() { // chunk 1 { let x = [0; 1024]; let fut = tokio::fs::read_into("file.dat", &mut x[..]); } // fut.await; <--- edge between state 1 and state 2 yield; //really: return // chunk 2 { let n = fut.output(); println!("{:?}", x[..n]); } }

Line 8 的 x 儲存在哪裡 ? 是 stack 嗎 ? 可是執行到 Line 13 yield 之後,x 不就消失了嗎 ? 這樣 Line 9 要怎麼在 yield 的時候繼續參考 x ? 實際上,編譯器會產生有點像狀態機的 enum :

enum StateMachine { Chunk1 { x: [u8; 1024, fut: tokio::fs::ReadIntoFuture<'x>]}, // 因為 fut 保存了對 x 的參考。這是一種自我參考, // 這也是你實際上無法以任何有意義的方式自己編寫此內容的另一個原因。 Chunk2 {}, // 沒有區域變數需要保存 }

這裡的狀態實際上意味著需要跨 await 保存的任何區域變數。

以下的 z 就是不需要被保存,因為從未給出過對 vec 的參考,就像 z 永遠不會在以後的 chunk 中使用一樣 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} async fn foo() { // chunk 1 { let x = [0; 1024]; + let z = vec![]; let fut = tokio::fs::read_into("file.dat", &mut x[..]); } yield; //really: return ... }

async fn 會轉成以下 :

-async fn foo() +fn foo() -> impl Future<Output = ()> /* StateMachine */ +// foo 真正回傳的是一個狀態機 { // chunk 1 { let x = [0; 1024]; let fut = tokio::fs::read_into("file.dat", &mut x[..]); } // fut.await; <--- edge between state 1 and state 2 yield; //really: return // chunk 2 { - let n = fut.output(); + let n = self.fut.output(); - println!("{:?}", x[..n]); + println!("{:?}", self.x[..n]); } }

每次我們嘗試 await 這個 Future,每次我們 continue 它,我們真正做的就是在狀態機型別中 continue 。我們在狀態機型別上呼叫一個方法,該方法會取得 exclusive 參考,以便它可以存取 Future,以便它可以 continue await,存取任何內部區域變數等。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
impl Future 是一種型別。impl Trait 意味著一個有名字的型別,但我不會告訴你這個名字是什麼。

接著看到 foo 函式的回傳型別 :

#[tokio::main] async fn main() { let mut x: StateMachine = foo(); StateMachine::await(&mut x); // 這個 StateMachine 就保存了狀態機的值 }

如果將狀態機傳入函式,而該狀態機又存很多值,會涉及很多記憶體複製 :

#[tokio::main] async fn main() { let mut x: StateMachine = foo(); bar(x); } fn bar(_: impl Future) {}

如果 foo() 函式用到別的 Future,foo() 會傳的狀態機也要記錄該 Future 的資訊 :

fn foo() -> impl Future<Output = ()> /* StateMachine */ { // chunk 1 { let x = [0; 1024]; let fut = tokio::fs::read_into("file.dat", &mut x[..]); } // fut.await; <--- edge between state 1 and state 2 yield; //really: return // chunk 2 { let n = fut.output(); println!("{:?}", x[..n]); + some_library::execute().await; } }

如果我們有 select! 或 join!,就要儲存所有的 Futures,這都牽涉到記憶體複製,所以在做 profiling 就會看到 memcpy

想要減少記憶體複製,可以將它們裝到 Box 裡面 :

#[tokio::main] async fn main() { let mut x: StateMachine = Box::pin(foo()); bar(x); // 傳遞指向 heap 的指標,避免 StateMachine 的記憶體複製。 }

這是使用像 tokio::spawn 這樣的東西的另一個原因,因為 spawn 將該 future 放入 Executor 中,然後只保持對該 future 的指標。這就是擁有 spawn_handle 實際上是有用的原因,因為你可以儲存它,然後像這樣以下這樣做 :

fn foo() -> impl Future<Output = ()> /* StateMachine */ { ... // chunk 2 { let n = fut.output(); println!("{:?}", x[..n]); - some_library::execute().await; + tokio::spawn(some_library::execute()).await; } }

那麼現在就不必儲存 some_library::execute() 這個 Future 的狀態機,我們只需儲存指向它的指標 (這樣就可以避免狀態機的記憶體複製),然後可以等待它。

Q : wasnt that an union of with the size of the largest chunk state?
A : 可以這麼想。

Q : Why can futures assigned on the stack be moved Don't they need to be pin?
A : 我們不會過多地討論 pin,但基本上沒有什麼可以阻止你移動一個 Future。實際上,在 Rust 語言中沒有任何東西可以阻止你移動任何值。但是一旦你開始 await 一個 Future,除非它被 pin 住,否則你就不能再移動它了。但是,一個 Future 本身並沒有任何內在的特性意味著你不能移動它。

Q : Can you have an async function that internally creates a Vec of futures from itself, recursively? I know recursion requires indirection, so in other words, does putting it in a Vec count as indirection ?
A : Jon 應該能夠做到這一點。儘管你必須使用 join,而不是 vec,才能實際上 select 或 await 一組 futures。

Async fn in traits

1:55:45

async trait 沒有被支援 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} struct Request; struct Response; trait Service { async fn call(_: Request) -> Response; // de-sugar // fn call(_: Request) -> impl Future<Output = Response>; }

編譯錯誤 :

$ cargo check ... error[E0706]: functions in traits cannot be declared `async` --> src\main.rs:10:5 | 10 | async fn call(_: Request) -> Response; | -----^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | | | `async` because of this | ...

de-sugar 版本的編譯錯誤 :

$ cargo check error[E0562]: `impl Trait` only allowed in function and inherent method return types, not in trait method return types --> src\main.rs:10:29 | 10 | fn call(_: Request) -> impl Future<Output = Response>; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |

de-sugar 版本的編譯錯誤分析 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} struct Request; struct Response; trait Service { - fn call(_: Request) -> impl Future<Output = Response>; + fn call(&mut self, _: Request) -> impl Future<Output = Response>; } +struct X; +impl Service for X { + async call(&mut self, _: Request) -> Response + { + Response + } +} +struct Y; +impl Service for Y { + async call(&mut self, _: Request) -> Response + { + let z = [0; 1024]; + tokio::time::sleep(100).await; + drop(z); + Response + } +} +fn foo(x: &mut dyn Service) +{ + let fut = x.call(Request); +}

Line 34 的 fut 多大 ? fut 的大小取決於 Future 使用的非同步區塊中的 stack 變數。因此,fut 的大小是未知的。因此,編譯器不知道在這裡產生什麼程式碼,因為它不知道 fut 的大小。

如果傳入型別是以下,也許編譯器會知道,但現在這是一種你無法命名的型別 :

-fn foo(x: &mut dyn Service) +fn foo(x: impl Service) { let fut = x.call(Request); }

Future 的名字不知道如何命名,以下這樣寫也不對 :

struct FooCallM<F>(F); -fn foo(x: impl Service) +fn foo<S: Service>(x: S) -> <typeof S::call> { let fut = x.call(Request); + FooCall(fut); }

在我們到目前為止所描述的內容中,並沒有真正好的方法來處理像這樣的 asyncfn 呼叫。因為它產生的東西的型別在任何地方都是未知的,它沒有被寫在任何地方。

復原 Service 程式碼 :

trait Service { - fn call(&mut self, _: Request) -> impl Future<Output = Response>; + async fn call(&mut self, _: Request) -> Response; }

改使用 Pin :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} struct Request; struct Response; trait Service { - async fn call(&mut self, _: Request) -> Response; + async fn call(&mut self, _: Request) -> Pin<Box<dyn Future<Output = Response>>>; } struct X; impl Service for X { - async call(&mut self, _: Request) -> Response + async fn call(&mut self, _: Request) -> Pin<Box<dyn Future<Output = Response>>> { - Response + Box::pin(async move { Response }) } } struct Y; +#[async_trait] impl Service for Y { async call(&mut self, _: Request) -> Response { let z = [0; 1024]; tokio::time::sleep(100).await; drop(z); Response } } struct FooCall<F>(F); fn foo(x: &mut dyn Service) { let fut = x.call(Request); FooCall(fut); }

Pin<Box<dyn Future<Output = Response>>> 大小是已知的,它是 heap 配置 dynamically dispatched Future,編譯器可以推論出。但這樣做的問題是都將 Futures 配置在 heap 上,因為現在是 dynamically dispatched,所以不能有 monomorphizations 和一些最佳化,這樣做也會給記憶體配置器更多壓力,你會有間接所有的 Futures。假設你現在有 AsyncRead :

trait AsyncRead { fn call(&mut self, _: Request) -> Pin<Box<dyn Future<Output = Response>>>; }

只要你做 AsyncRead 操作,你都要配置 heap 的記憶體以及額外的指標間接,所以操作可能會變很昂貴。這就是為什麼 async_trait 對於較高層次的事物非常有效,因為 trait 通常是如此。但是,如果你必須在 stack 底部使用它,效果可能不佳,或者說效果可能不是很好。始終先進行測量,然後查看是否存在問題。

另外一個方法是 :

#![allow(dead_code, unused_variables)] use std::future::Future; fn main() {} struct Request; struct Response; trait Service { + type CallFuture: Future<Output = Response>; + fn call(&mut self, _: Request) -> Self::CallFuture; } struct X; impl Service for X { + type CallFuture = Pin<Box<dyn Future<Output = Response>>>; - fn call(&mut self, _: Request) -> Response + fn call(&mut self, _: Request) -> Self::CallFuture { - Box::pin(async move { Response }) + async { Response } } }

Rust 知道如何將 associated type 傳遞給 caller,因此他們現在可以知道實際型別的大小。你有一種方式來命名返回型別,以便你可以在其他結構和其他地方使用它。問題在於當你實作這個 trait 時,並不清楚如何命名除了 Pin<Box<dyn ...>> 之外的任何型別。

編譯器理論上可以將以下程式碼轉換 :

trait Service { async fn call(&mut self, _: Request) -> Response; // type CallFuture: Future<Output = Response>; // fn call(&mut self, _: Request) -> Self::CallFuture; } struct X; impl Service for X { async fn call(&mut self, _: Request) -> Response where Self: Sized { Response } }

Asynchronous Mutexes

2:08:06

同步 Mutex vs. 非同步 Mutex :

#![allow(dead_code, unused_variables)] use std::future::Future; use std::sync::{Arc, Mutex}; pub mod tokio { pub async fn spawn(_: impl Future) {} pub mod sync { pub struct Mutex; } } use tokio::sync::Mutex as TMutex; async fn main() { let x = Arc::new(Mutex::new(0)); let x1 = Arc::clone(&x); tokio::spawn(async move { loop { let x = x1.lock(); tokio::fs::read_to_string("file").await; *x1 += 1; } }); let x2 = Arc::clone(&x); tokio::spawn(async move { loop { *x2.lock() += 1; } }); }

同步 Mutex : Line 23 拿到 lock,接著 Line 24 讀取字串且呼叫 .await,接著 yield,換到 Line 31 執行時,lock 卻在 Line 23 就被拿走了,Line 31 會 block 目前執行緒,但這意味著執行緒的 Executor 被 block 了,這表示它無法繼續從字串中讀取,因為這需要繼續執行該 Future。這意味著 Line 21 的 Future 永遠不會放棄它的 lock guard,這意味著 lock 永遠不會被釋放,這意味著 Line 31 的 lock 永遠不會完成,因此我們陷入了 deadlock。

非同步 Mutex : Line 23 拿到 lock,接著 Line 24 讀取字串且呼叫 .await,接著 yield,換到 Line 31 執行時,lock 卻在 Line 23 就被拿走了,Line 31 會 yield,Line 24 會再次執行 (I/O 最終會完成),最後執行 Line 25 並釋放 lock,接著又回到 Line 31,這次沒有 deadlock。

非同步 Mutex 的一個缺點是它們速度較慢。這是因為它們需要更多的機制,以便能夠在需要時進行類似的 yield 控制,並知道何時喚醒每個事物。它們內部所需執行的操作相當複雜。因此,一般建議是,只要你的 critical section 很短且不包含任何 await pointyield point,你實際上應該使用同步 Mutex。以下是沒有 await point 且 critical section 很短的程式 :

#![allow(dead_code, unused_variables)] use std::future::Future; use std::sync::{Arc, Mutex}; pub mod tokio { pub async fn spawn(_: impl Future) {} pub mod sync { pub struct Mutex; } } use tokio::sync::Mutex as TMutex; async fn main() { let x = Arc::new(Mutex::new(0)); let x1 = Arc::clone(&x); tokio::spawn(async move { loop { let x = x1.lock(); *x1 += 1; } }); let x2 = Arc::clone(&x); tokio::spawn(async move { loop { *x2.lock() += 1; } }); }

critical section 必須很短(很快就做完)的原因是,如果 Executor A 的 critical section 進行一個巨大的矩陣乘法,那麼當然,雖然沒有 await point,但你仍然阻礙了該執行緒,不讓其他在 Executor B 上的 Futures 的工作進行。因此,這有點類似於任何其他長期操作,但稍微更糟,因為你還持有 Mutex 和矩陣,這可能會以相同的方式 block 其他執行緒上的其他 Future,就像它們可能無法取得進展一樣。所以 Executor B instance 你可能會想要用非同步 Mutex,當它拿不到 lock 不是被 block,而是讓給別的 Future 繼續執行。

目前沒有很好的工具去檢查 blocking ,執行很長的時間或是得到非預期的 cancellation。
目前最好的工具是 tokio-rs/console,它會 hook 到 Executor,它會計算自某個 Future yield 以來的時間有多長?上次重試 Future 以來的時間有多長?因此,有一些監控功能可以指出這些問題。它還不完善,而且也不是真正與 tokio 綁定的東西。這個工具是為 tokio 編寫的,但你可以想像以相同的方式對任何 Executor 進行 instrumenting,以發現特定模式何時出現,比如一個 Future 超過一秒未 yield,這看起來似乎是一個問題。你可以向使用者突出顯示這一點。

Q : Can you elaborate what is the conceptual difference between a thread:spawn, and a tokio:spawn ?
A : 一個 thread:spawn 將其傳遞的 Future 交給 Executor,在 Executor 希望的時候執行,與其他 Future 並行。一個 thread:spawn 會 spawn 一個新的作業系統執行緒,該執行緒將與程式中的所有其他內容並行執行,並且不受 Executor 控制。 thread:spawn 也不接受 Future ,而是接受一個 closure。因此,如果你想要在 spawn 的執行緒內等待 Future,你需要在其中創建自己的 Executor。相反,如果你 tokio:spawn 某些東西,則不能保證你擁有自己的執行緒。因此,你必須合作進行排程。如果你使用 tokio:spawn,你必須有 yield point,因為否則可能會 block Executor 。如果你 thread:spawn,那麼這不是一個問題,因為作業系統能夠主動中斷你。基本上可以將執行緒視為不合作排程的,因此它們可以執行 blcok 操作。而 tokio:spawn 或者僅是 spawn 的 Future 任務是合作排程的,因此需要 yield 以讓系統中的所有 Future 繼續執行。

Do futures stay on a thread?

2:21:03

Q : When I await a inside a future, will the current future get scheduled on the same thread when it resumes to start progress and it can get scheduled on another thread ?
A : 當一個 Future yield 時,它只是 yield 給任何 await 它的東西。如下程式碼 :

async fn b(mut f: impl Future) { f.await; }

如果在 f 中使用了 await,然後最終進行了 yield,你就回到了 b,此時由 b 決定接下來發生什麼。在這種情況下,b 選擇了 await,所以它將保持 await,因為既然無法取得進展,它也無法取得進展,所以它會 yield。最終,這個 yield會一直傳遞直到 Executor。在那一點上,Executor 只是將該 Future 放回到 Future 的 work queue 中,這個 work queue 通常由所有的 worker threads 處理,至少在像 tokio 的預設多執行緒 Executor 中是這樣。但這取決於你的 Executor。有些 Executor 是單執行緒的。如果你有一個單執行緒的 Executor,它會在同一個執行緒上執行,因為沒有其他執行緒。在 tokio 中,不能保證是同一個執行緒。你可以想像有方法來指定在不同的 Executor 中僅在一個執行緒上執行。在 tokio 中,例如,有一個 spawn local,它提供了一些這樣的保證。

看到以下程式 :

async fn b(mut f: impl Future) { loop { f.poll(); // poll 是一種檢查 Future 進展的方式。 } }

b 實際上根本不使用 await keyword。因此,它永遠不會 yield,它只是將你困在一個忙碌的迴圈中。如果是這樣,那麼 f 將立即再次執行,並且將在同一個執行緒上執行,因為這裡沒有 yield。因此,Executor 沒有機會將您重新安排到不同的執行緒上。但一般情況下,情況並非如此。因此,通常在整個 stack 中一直往上,它都將是一個 await,一直到 Executor ,Executor 有可能將您重新安排到不同的執行緒上。

一般來說,至少 tokio 嘗試將 Future 保持在同一個執行緒上,因為這通常有助於 cache 的 locality 等問題,但它並不保證。如果你真的希望一個 Future 不被跨執行緒傳遞,你只需不為其實作 Send。當然,這樣做會使得與該 Future 一起工作變得更加困難,但這將是你 statically 強制執行這一點的方式。

Async stack traces

2:24:11

Q : What would be your favorite method to get an async "stack trace", i.e. to show the async call graph up to a certain point?
A : Jon 沒有一個很好的方法來解決這個問題。確實,如果你在一個 Future 中只是印出一個常規的 stack trace,你將得到呼叫 stack trace,一直到 Executor。通常這樣可以幫助,但問題真正的根源來自於 spawn。看到以下程式 :

fn main() {} async fn foo() { tokio::spawn(async { panic!(); }) }

Line 6 的 panic 不會 include foo,因為這是一個 Future,被放置到 Executor 的工作佇列中作為一個 top 的 Future 來 await。然後 Executor 的一個工作執行緒將會選擇該 Future 並 await 它。在這一點上,foo 不再參與其中,foo 所做的只是將 Future 放到了佇列中,但是當 Executor 執行緒 await 這個 Future 並引發 panic 時,該 panic 的回溯只會說明 Executor poll 了這個 Future 。因此,它會指向 Line 5 的 Future,但該 trace 將不包括 foo。Jon 沒有一個很好的解決這個問題的方法。

不過你可以使用 instrument 的解決方法,Crate tracing 只是一種日誌系統,可讓你發出事件,然後讓訂閱者接收這些事件。存在的一件非常巧妙的事情就是追蹤未來,而且有一個非常棒的東西存在,那就是 Crate tracing_futures :

my_future .instrument(tracing::info_span!("my_future")) .await

在剛剛的例子使用 instrument :

fn main() {} // 語法不正確,只是想說明想法 async fn foo() { tokio::spawn(async { tracing::error!("oops"); // 會 include Line 8 的 event path panic!(); }.instrument("in foo")); }

目前沒有 non-insturmented 的解決方法。

Q : Best practices to call async code from sync? :)
A : 盡量不要這樣做。這很難做到正確。你會遇到一些問題,比如一個特定的 Future 可能正在使用 tokio 的 I/O 資源,而 Future 的 Executor 不知道如何執行。這樣你就會遇到一個 runtime 異常,如果你盲目地嘗試 block Future ,這是一種情況。另一個問題是你沒有給 caller 或你的函式庫的使用者對執行的控制權。基本上,你強制將非同步 runtime 強加給他們,而不是讓他們選擇 runtime,這往往會讓人感到沮喪。

也許一個好的方法是想像一下,我正在編寫一個非同步應用程式,我的非同步應用程式呼叫了你的同步函式庫。我使用像spawn_blocking 這樣的東西。然後你的同步函式庫在內部創建一個非同步 runtime 來 block 一個 Future。現在我有了一個嵌套的非同步 runtime。這引起了各種問題。有時是 runtime 異常,有時兩者不相容,例如其他 runtime 異常,有時只是一個性能問題,現在你 spawn 了兩倍數量的執行緒,它們不真正合作地互相調度。這真是一場噩夢。如果你內部有非同步操作,就把它們公開為非同步的,然後讓使用者選擇如何使它們同步。

Q : So for me to understand threads are heavy guns, and futures slimmer ? Then are there any real use-cases where to use threads instead of futures?
A : 有的。對於任何計算密集型的任務,非同步並不會增加太多價值,反而往往會變得麻煩,因為所有的操作都必須標記為 block 的。這樣做只會讓編碼變得更加煩瑣,因為你需要將它們作為 closure 傳遞。在這種情況下,如果任務非常計算密集,而且不涉及 I/O 操作,Jon 會使用 Crate rayon

如果你的程式基本上不涉及I/O操作,那麼就沒有真正的理由使用非同步。確實,一般來說,編寫非非同步程式碼往往會使得至少簡單程式碼、流程優化的程式碼或單一執行的程式碼更易讀。它為你提供了更好的編譯器錯誤資訊。如果你不處於非同步上下文中,borrow checker 通常工作得更好。backtace 資訊也更好。因此,如果你只是在編寫直線程式碼,比如你在編寫一個 non 非同步的程式碼,你就不需要擔心這些問題。對於一個簡單的命令列工具或者轉換器之類的東西,Jon 可能不會去考慮非同步,因為它確實會讓你的任務變得更加困難。如果你正在編寫的東西可能在 Future 會涉及大量的I/O操作,或者你可能想要處理多種不同類型的事件,那麼 select! 是非常有用的,而在多執行緒程式碼中模擬相同的模式會有些麻煩,那麼 Jon 會傾向於遠離多執行緒。但如果你真的不需要非同步提供的機制,那就使用普通的多執行緒。

Q : what if I call tokio::spawn in not tokio runtime then?
A : panic。

Wrapping up

2:33:04

待整理

  1. 自己看 tokio
  2. 為什麼需要 async_trait ?