--- title: 'Dart 異步:Event-Loop、Future、Stream' disqus: kyleAlien --- Dart 異步:Event-Loop、Future、Stream === ## Overview of Content :::success * 如果喜歡讀更好看一點的網頁版本,可以到我新做的網站 [**DevTech Ascendancy Hub**](https://devtechascendancy.com/) 本篇文章對應的是 [**深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream**](https://devtechascendancy.com/dart-concurrency-async-guide_future-stream/) ::: 參考文章: [**Stream 參考**](https://juejin.cn/post/6844903686737494023)、[**線上編譯 Dart**](https://dartpad.dev/) [TOC] ## Dart 的併發 我們之前有提到 Dart 是單執行緒(線程)的語言,但是單執行緒運作在耗時操作(像是網路請求、IO 操作)是相當不符合要求的,它會照成應用的卡頓… 所以 Dart 使採用併發(`Concurrency`)的機制,但是其特點與 Java 併發不同,兩者併發的差異如下表~ | 語言 | 資源隔離性 | 介紹 | | -------- | -------- | -------- | | Java | 資源共享 | 由於資源共享,所以要注意資源的同步問題 | | Dart | **資源隔離,記憶體不共享** | 特性像是 Java 的進程(`Process`),但是並不是進程,必須把它看成 Java Thread,但是它是記憶體安全模型 | :::info * 關於多執行緒其實還有「並行」、「併發」概念,兩者個差異的請點擊 [**Thread & Process 的差異:併發 & 並行概念**](https://devtechascendancy.com/process-multithread-guide/#Thread_Process_%E7%9A%84%E5%B7%AE%E7%95%B0%EF%BC%9A%E4%BD%B5%E7%99%BC_%E4%B8%A6%E8%A1%8C%E6%A6%82%E5%BF%B5) 了解 ::: ### 證明 Dart 單執行緒 * 用以下程式用來證明 Dart 是單執行緒 ```dart= import 'dart:io'; void main() { bool finish = false; print("ready read file($finish)"); new File(r"app_entry.dart").readAsString().then((value) { finish = true; print("already read done($finish)"); }); while(!finish) { } // 會卡在這,在 Event - Loop 會解釋 // 永遠不會執行到 "read file finish",因為主執行序不會等待 File 的 IO print("read file finish"); } ``` > ![image](https://hackmd.io/_uploads/H1NV4cbo0.png) 我們可以發現以上程式永遠不會結束,這是因為 Dart 的單執行緒… 1. 執行到讀取檔案時,Dart 會把 IO 耗時操作放置到事件循環內等待處理,然後繼續執行後續代碼 2. 執行到 `while(!finish)` 之後,就進入了無限循環,並且這個循環會一直佔有 CPU 資源,導致沒有機會去處理 IO 事件,這個測試無法結束 > 事件循環(`Event-Loop`)之後的小節會說明 ```mermaid sequenceDiagram participant MainThread participant FileIO participant EventLoop MainThread->>MainThread: print("start read file(false)") MainThread->>FileIO: File.readAsString() FileIO-->>MainThread: 返回 Future,往下執行 MainThread->>MainThread: 進入 while(!finish) 循環 MainThread->>MainThread: 無限循環阻塞 Note right of EventLoop: 事件循環被塞住,無法被處理 FileIO->>EventLoop: 讀取文件完成,加入事件隊列 EventLoop-->>MainThread: 回調帶隊列中,無法被執行(因為主執行緒被卡住) ``` ### isolate 類:開啟資源隔離執行緒 * 我們前面有提及「資源隔離」,那這裡我們就來使用 Dart 的資源隔離類 :::warning 要注意它與 Java Thread 最大的不同在資源的隔離不共享 (若需要資源必須透過傳遞)… 在 Java 中,執行序內的資源是不隔離的所以可以共享,而不須透過傳遞 ::: 首先 import `isolate` 類,並使用 `isolate#spawn` 開啟新的 isolate,然後我們做以下實驗來證明一些事情 * **開啟新執行序**: 透過 `isolate#spawn` 方法可以開啟一個新的執行序 ```dart= import 'dart:isolate'; int? data; // int 是類,所有 default 是 null void main() { data = 10; // 開新線程,spawn 是泛型 // 原型 : external static Future<Isolate> spawn<T> (方法, ,{...}) Isolate.spawn(entryPoint, "Alien"); print("Finish main isolate, data: $data"); // 判斷是否會排隊進行任務 and 數據是否隔離 } void entryPoint(String message) { // 可以省去 (message) print("In child isolate, Params: $message, data: $data"); // 證明數據是隔離的 } ``` 從執行結果來看我們可以知道 1. `spawn` 方法確實可以開啟一個新執行序,因為主執行緒正常會先結束,而開啟的執行序在之後才結束 2. **`isolate#spawn` 方法開啟的執行序是預設不守護主執行緒的**,因為主執行緒結束後,透過 `spawn` 開啟的執行緒也不會結束 > ![image](https://hackmd.io/_uploads/B1FDzKZiC.png) * **證明資源的隔離**: 要正名資源的隔離也相當容易,讓不同的執行序持有相同資源的引用,如果資源被改變則是資源共享,而資源沒有被改變則是資源不共享!(也就是資源被隔離) ```dart= import 'dart:isolate'; int i = -1; void main() { i = 20; Isolate.spawn(entryPoint, "Test"); // Java 可想成 new Thread print("Main $i"); } void entryPoint(String msg) { print("Isolate $i"); } ``` 從結果可以看到,兩者所取得的資源並不相同,`Isolate.spawn` 開啟的執行序仍舊是保持初始化的 -1(因為開啟執行緒後執行的 `entryPoint` 函數,它是取全域的 `i` 資源) :::warning * 如果是資源共享的情況下,會變怎樣? 資源 `i` 如果是共享的,那代表主執行緒內將 `i` 修改為 20,而後 `Isolate.spawn` 開啟的執行序也會讀取到 `i` 為 20 ::: > ![image](https://hackmd.io/_uploads/r1yVEF-iC.png) ### Isolate 通訊:ReceivePort & SendPort * 由於 isolate 開啟的執行序,其資源是相互隔離的,若需要傳遞資源則必須透過「某種方法傳遞」,在 Dart 中,這種方法就是 `ReceivePort` & `SendPort` > ReceivePort & SendPort,就像是 Android 的 Handler (不同執行緒之間的通訊) 每個 isolate 內都可以設定 ReceivePort & SendPort 物件,可以透過這兩個物件來發送與接收資源,參考以下的概念圖 > ![](https://i.imgur.com/iRcAIw0.png) :::info * ReceivePort 物件接收資源時,是否會被切換到別的執行緒中? 你在哪個執行緒內創建,就會回傳到該執行緒內中,並不用特別設定 ::: * 使用 ReceivePort & SendPort 的範例如下,該範例會讓主執行緒與 isolate 執行緒通訊 1. 在主執行緒(`Main Thread`)中創建 ReceivePort 物件,並把 ReceivePort 物件內的 SendPort 物件傳遞給 Isolate 創建出的執行序(這樣 Isolate 就可以透過它發送資源給主執行緒) ```dart= import 'dart:isolate'; void main() { ReceivePort receivePort = ReceivePort(); // 創建 isolate 執行緒 Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort); // receive 監聽 // 原型: StreamSubscription listen(void onData(var message), {...}); receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息 print("Main~~~~~~ finish"); // Main 並不會等待 isolate ! } ``` 2. 主執行緒監聽 isolate 執行序發送的訊息 ```dart= // main isolate 監聽子 isolate void mainListenIsolate(var message) { if(message is SendPort) { print("I am Main, I get your send Port"); message.send("Main get"); // 2. } else { // 接收子 isolate 訊息 print("Main get Message from isolate: $message"); } } ``` 3. isolate 執行序接收主執行序的 `SendPort` 物件,並在 isolate 執行緒內創建 ReceivePort 物件,並將其傳給主執行緒(這樣主執行緒就可以透過它傳遞資源給 isolate 執行緒) ```dart= // isolate 執行序 void isolateFunc(SendPort sendPort) { print('isolate created'); // 創建自己的接收器 ReceivePort receivePort = ReceivePort(); // 透過主執行緒給予的 SendPort 物件發送把 isolate 內的發送器(`SendPort`)發送給主執行緒 sendPort.send(receivePort.sendPort); // 監聽 Main receivePort.listen((var message) { print("Child Isolate get Message from Main: $message"); }); sendPort.send("I am isolate"); // 透過主執行緒給予的 SendPort 物件發送資料給主執行緒 } ``` > ![image](https://hackmd.io/_uploads/By5w6YbiC.png) :::warning * 以下做了個有趣的測試,可以用來加強 Dart 是單執行緒的觀念… 一樣是這個小節的範例,在這裡我們在主行緒休眠了 5s 中,那會怎樣呢? ```dart= import 'dart:io'; import 'dart:isolate'; void main() { ReceivePort receivePort = ReceivePort(); Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort); // receive 監聽 // 原型: StreamSubscription listen(void onData(var message), {...}); receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息 // 添加休眠 5 秒 sleep(Duration(seconds:5)); print("Main~~~~~~ finish"); // 證明 Main 並不會等待 isolate ! } ``` > ![image](https://hackmd.io/_uploads/SJRt0Kbi0.png) 在這個範例中,我們在主執行緒中進行了 5 秒的休眠(`sleep`)。儘管 Isolate 會在 `Isolate.spawn` 後立即啟動並執行 `isolateFunc` 函數,但由於主執行緒被 sleep 阻塞,主執行緒無法立即處理來自 Isolate 的訊息 這意味著雖然 Isolate 已經獨立運行,但**主執行緒必須等待 sleep 結束後,才會繼續處理剩餘的程式碼和非同步事件** > 因此,在主執行緒恢復後,你會看到 Isolate 發送的訊息被處理,並繼續執行剩餘程式碼 > ![image](https://hackmd.io/_uploads/SJj90tWsA.png) ::: :::success * 有些人可能會發現,為什麼這個範例程式不會結束(要自己按 `Ctrl+C` 才會結束) 而範例程式碼不會結束的原因是因為主執行緒的 `ReceivePort` 一直在監聽來自 Isolate 的訊息,這是導致程式持續運行的原因 ::: ### Isolate 併發證明 * 以下程式碼展示了 Dart 中 Isolate 的併發性,它允許我們在獨立的執行緒中執行代碼 ```dart= import 'dart:isolate'; void main() { // 在新的兩個 isolate 中創建兩個任務,這兩個任務做 dowhile Isolate.spawn(isoMain1, "start---1"); Isolate.spawn(isoMain2, "start------2"); while(true) {} } void isoMain1(String str) { Future.doWhile(() { print(str); return true; }); } void isoMain2(String str) { Future.doWhile(() { print(str); return true; }); } ``` 如果不是併發程序的話,就會是單一輸出結果 > ![](https://i.imgur.com/mTT0XJR.png) ## 認識 Event Loop Dart Event Loop 與 Android Handler 機制類似(都是以「**事件驅動**」模式設計),通過從 Loop 中不斷獲取消息 像是 isolate 發送的消息就是通過 Event Loop 處理後,由單執行緒的 Dart 應用接收 ```mermaid graph LR subgraph Dart Event Loop 待處理任務 subgraph queue EventQueue MicrotaskQueue end 待處理任務 --> queue end 耗時任務 --> |給予待處理的任務| 待處理任務 queue -.-> |空閒時| d(dart 執行序處理) ``` ### Dart Event Loop 特性:EventQueue、MicrotaskQueue :::info 這個小節我們先撇除併發概念,先單獨來觀察 `Dart Event Loop` 的特性 ::: * 首先我們要知道,在 Dart 中一個執行緒(線程)會對應一個事件循環(`Event-loop`) * `Dart Event Loop` 與 Android Handler 不同的是… 1. Android 每個 Thread 都有一個 `Event-loop`,並對應 **單獨的 MessageQueue**,再藉由 Handler 處理消息 ```mermaid graph LR MainThread <-.-> Event-loop <-.-> MessageQueue ``` :::success 但是與 Dart 不同的是,Android 天生屬於多執行序 ::: 2. **Dart 則是一個 Event-Looper 對應兩個 Queue,儲存不同等級的工作**,兩個 Queue 的如下表所述 ```mermaid graph LR MainThread <-.-> Event-loop Event-loop <-.-> MessageQueue Event-loop <-.-> MicrotaskQueue ``` | Dart 事件類型 | 介紹 | 特色 | 範例 | | -------- | -------- | -------- | - | | EventQueue | 普通事件 | **++每次執行完都會檢查 微事件++** | Future.delayed(Duration(seconds: 1)); | | MicrotaskQueue | 微事件 | **==優先等級最高==** | Future.microtask() | > 黃色區塊為子執行序,**要注意 Main 函數會正常執行,並 ++併發執行 Event-Loop++** > 每次 Event-loop 在執行 `EventQueue` 任務前,都會檢查 `MicrotaskQueue` 是否有需要執行的微任務,若 `MicrotaskQueue` 有任務要執行會先執行,之後才接續執行 `EventQueue` 任務 > ![](https://i.imgur.com/oYq1D7v.png) :::info * **`Event-loop` 的啟動**: Event-loop 在 Dart 應用中是自動啟動的,**不需要等 main 完全執行完畢才開始運行**… 實際上,Event-loop 是一個「**持續運行的機制**」,從應用啟動之後就開始運行 ::: ### EventQueue、MicrotaskQueue 驗證執行順序 * **在 Dart Event Loop 中插入任務的使用範例如下** * **插入任務至 `EventQueue`**: `then` 函數是在 Event-Loop 檢查 EventQueue 時才執行,只要 EventQueue 內有任務就會被執行(以下任務是 IO) ```dart= import 'dart:io'; void main() { new File(r"app_entry.dart").readAsString().then((value) { print("執行 Task"); print(value); }); print("read file finish"); } ``` > ![image](https://hackmd.io/_uploads/rJvFT5WoR.png) * **插入任務至 `MicrotaskQueue`**: 這裡我們來驗證將任務插入至 `MicrotaskQueue` 是否會比起把任務插入 `EventQueue` 還要快被執行 插入任務至 `MicrotaskQueue` 需要使用 `Future.microtask` 函數 ```java= import 'dart:io'; void main() { new File(r"app_entry.dart").readAsString().then((value) { print("執行 Task"); }); // 原型 : factory Future.microtask(FutureOr<T> computation()) { Future.microtask(() { print("執行 micro Task"); }); print("Main finish"); } ``` 如下結果,我們可以看到 **微任務的執行會在一般任務之前**! > ![image](https://hackmd.io/_uploads/ByuNR5-oR.png) ### Micro 微任務:任務插隊 * 現在來證明 Micro 微任務是可以插隊一般任務的 (每次執行一般 Event 都會檢查有沒有需要執行的微任務),驗證程式如下: 我們在主執行序中創建 `ReceivePort` 物件並監聽事件,當有事件進入主執行序時「插入微任務」,並在微任務執行時打印,來驗證是否微任務不論順序,都會執行在一般任務之前 ```java= import 'dart:isolate'; void main() { ReceivePort receivePort = ReceivePort(); receivePort.listen((message) { print(message); Future.microtask(() => print("微任務插隊~")); }); // 一般 Event receivePort.sendPort.send("傳送 Message 1"); // 一般 Event receivePort.sendPort.send("傳送 Message 2"); // 一般 Event receivePort.sendPort.send("傳送 Message 3"); print("Main function finish"); } ``` > ![](https://i.imgur.com/kJ3Q5Iy.png) ### 事件循環與協程:Dart vs. Kotlin * 個人熟悉的協程是 [**Kotlin 的智能協程**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/),所以以下透過 Kotlin 協程與 Dart 事件循環做比較 * 以下範例是最一開始我們看的「Dart 單執行緒的證明」,在這裡我們插入協程的概念會如何? 如果我改成在 `while` 判斷中修眠,那 Dart 事件循環是否可以像是 Kotlin 中提供的協程一樣,在主執行緒空閒時幫我去處理任務呢? ```dart= void main() { ... new File(r"app_entry.dart").readAsString().then((value) { // 會不會有機會處理到這裡? }); while(!finish) { print("test..."); sleep(const Duration(seconds: 1)); } ... } ``` **答案是否定的**!就算是這樣修改,程式也不會有機會執行到以上的 IO 任務,因為 Dart 是採用事件循環機制,與 [**Kotlin 的智能協程**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/) 就有本質上的不同 * 兩者個差異關鍵為: * **Dart 事件循環**:當主執行緒被阻塞時(例如透過 `sleep`),事件循環無法繼續,非同步任務的回呼也無法執行 * **Kotlin 協程**:Kotlin 的協程能夠在任務之間智慧切換,避免主執行緒阻塞,能夠繼續處理其他任務 :::success 所以以效能來說的話 Kotlin 的協程能帶來更高的效能,不過相對的撰寫起來需要更多的概念,複雜度也會更高 ::: ### 結合 Dart 事件循環、執行序 * 首先我們仍要抱有一個觀念 Dart 是單執行序的!那它是怎麼(或是說何時)去執行異步任務的呢?這裡需要再澄清兩個觀念 * 前面小節我們所說的「**Dart 事件循環**」,是在說明 Dart 的 `Event-loop` 特性與內部結構,這時與執行序尚未產生關聯 * **`Event-loop` 機制與異步任務關聯**: `Event-loop` 是用來掃描其內部的兩個 Queue 事件的機制,而我們撰寫的 **異步任務就會放置到 Queue 中** * **`Event-loop` 機制與執行序產生關聯** 那放置在兩個 Queue 的任務何時被執行呢? Dart 不會另外起一執行序來執行,仍舊保值單一執行序執行,而這個執行是採用「**協作式**」的方式執行… 也就是說「**異步任務會在主執行序空閒時被執行**」 ## 認識 Future 在 Dart 庫中隨處可見 Future 物件… 並且 **通常異步函數返回的物件就是一個 Future** (像是 `Isolate.spawn` 方法返回就是一個 Future 物件) ```java= // Isolate.dart // spawn 方法,返回值就是 Future external static Future<Isolate> spawn<T>( void entryPoint(T message), T message, {bool paused: false, bool errorsAreFatal, SendPort onExit, SendPort onError, @Since("2.3") String debugName}); ``` 以下介紹幾個 Future 常用的 API,如下表所示 | Future API | 介紹 | | -------- | -------- | | then(...) | 當任務執行完畢後 | | wait(...) | 等待以上 Future 任務都執行完後再一起做結尾 | | catchError(...) | 抓取異步錯誤 | ### then 函數:異步的回調、異步串接處理 * **異步的回調**: 在呼叫異步函數時會返回一個 `Future` 物件,然後我們可以執行 `then` 函數,這個 `then` 函數就是異步的回調函數(`Callback Function`),以下是個簡單的使用範例 ```dart= void main() { new File(r"app_entry.dart").readAsString().then((value) { print("Read file finish"); }); } ``` > ![image](https://hackmd.io/_uploads/ryGqk-miA.png) * **異步串接處理**: Future#`then` 函數返回的也是一個 `Future` 物件,所以可以接續使用 :::info 如同 Java 的 Builder 建構者模式,這在做網路 API 請求的時候非常好用 ::: 範例如下: ```dart= void main() { new File(r"app_entry.dart").readAsString().then((value) { print("Read file finish"); return value.length; }).then((length) { print("File length: $length"); }); } ``` > ![image](https://hackmd.io/_uploads/HJf21ZmiA.png) ### wait 函數:結合異步任務 * **結合異步任務**: 透過 Future#`wait` 函數,我們可以用來等待多個 Future 都執行完後在做統一處理,可以用來做異步任務協作) 範例如下所示 ```dart= void main() { // 異步任務一 Future readFile = new File(r"app_entry.dart").readAsString().then((value) { return value.length; }); // 異步任務二 Future delay = Future.delayed(const Duration(seconds: 3)); // 等待兩任務都結束 Future.wait([readFile, delay]).then((resultArray) { print("第一個 Future : ${resultArray[0]}"); print("第二個 Future : ${resultArray[1] ??= "null"}"); }); print("main function finish"); } ``` 如下圖所見,我們會看到 `wait` 函數確實會等待兩個任務都執行完畢才會被執行 > ![image](https://hackmd.io/_uploads/HJjfWWXj0.png) ### catchError 函數:捕捉異常 * **捕捉異步任務異常**: 使用 file#`readAsString` 切換為異步讀取資源 (該函數返回 Future),這時就可以使用 Future#`catchError` 函數來捕捉錯誤 以下範例會讀取一個不存在的檔案 ```dart= import 'dart:io'; void throwError() { // readAsString 原型 : Future<String> readAsString({Encoding encoding: utf8}); new File(r"123.txt").readAsString().then((value) { // 讀取不存在文件 print(value); }).catchError((e,s) { // catchError 原型 Future<T> catchError(Function onError, {bool test(Object error)}); print("----> Caught an exception: $e"); print("----> Stack trace: $s"); }); } void main() { throwError(); } ``` 如下圖所見,我們會捕捉到 `PathNoFoundException` 異常 > ![image](https://hackmd.io/_uploads/rJAO7ZXoA.png) :::warning 如果沒有捕捉錯誤,則會產生 `Unhandled exception` 異常,對於 Dart 異常不熟悉的可以看 [**Dart 的捕捉異常限制**](https://devtechascendancy.com/dart-java-compare_exception_function-methods/#Dart_%E7%95%B0%E5%B8%B8%E3%80%81%E6%96%B7%E8%A8%80) 這篇文章 ::: * Future#`catchError` 也接受捕捉特定類型異常,只要透設定第二個參數就可以指定異常類型,範例如下 ```dart= import 'dart:io'; void throwError() { new File(r"123.txt").readAsString().then((value) { // 讀取不存在文件 print(value); }).catchError((e, s) { print("----> Caught an exception: $e"); print("----> Stack trace: $s"); }, test: (e) => e is PathNotFoundException); } void main() { throwError(); } ``` > ![image](https://hackmd.io/_uploads/B1t4X-7o0.png) ## 認識 Stream Stream 表示的也是異步數據,它是 Dart 中處理異步事件「**流**」的 API 它與 Future 的差異在,**Future 表示 ++一次性++ 任務的返回**,而 **Stream 可以 ++分多次++ 異步任務**,這樣的好處有 1. 可以減少一次性記憶體的消耗量(因為 Stream 可以分批回傳) 2. 響應速度較快;就像是加載較大的網頁時,Stream 可以先回傳已經加載好的部分,這樣可以有比較好的使用體驗(而 Future 則是全部加載完畢後再一次返回) ### 測試 Stream 與 Future 的不同 * Stream 有監聽函數可以監聽到目前執行到哪,像是 `onDone`、`onError`、`resume`、`pause`...等等函數可以使用(如下表所示),與 Future 最大不同在於 Stream 不會單次讀取完成 | 監聽方法 | 說明 | | -------- | -------- | | **onData**(必須參數) | 收到 Data 時會被觸發 | | onError | 收到 Error 時觸發 | | onDone | 結束 Stream 流時觸發 | | unsubscribeOnError | 當第一次收到 onError 時是否取消 Stream 流 | 以下範例中我們來讀取大一點的檔案([`linux-5.6.4.tar.xz`](https://ftp.ntu.edu.tw/pub/linux/kernel/v5.x/linux-5.6.4.tar.xz)),這樣就可以明顯看出兩者個不同 ```dart= import 'dart:async'; import 'dart:io'; import 'dart:isolate'; // 讀取次數 const fileName = '/Users/user/Downloads/linux-5.6.4.tar.xz'; int times = 0; void main() { Isolate.spawn(isolateFuture, "start read File By Future"); Isolate.spawn(isolateStream, "start read File By Stream"); while(true) {} } void isolateFuture(String str) { print(str); new File(fileName).readAsBytes().then((value) { print("Future : ${times++}"); }); print("Future Finish\n\n"); } void isolateStream(String str) { print(str); // 多次 StreamSubscription<List<int>> listen = new File(fileName).openRead().listen((event) { print("Stream : ${times++}"); }); // 結束通知閉包,還有 onError 等等... listen.onDone(() { print("onDone"); }); // stream 也可以暫停 listen.pause(); listen.resume(); print("Stream Finish"); } ``` 看到結果知道 Stream 會多次讀取資料,而 Future 是一次性的全部讀取進記憶體中 > ![image](https://hackmd.io/_uploads/HkjTIbQiC.png) ### 創建 Stream 物件 * 來看看幾個比較常見的 Stream 建構方法,如下表所示 | Factory 建構 | 功能 | | -------- | -------- | | Stream.fromFuture | 當 future 完成時將觸發一個數據或錯誤,然後立即關閉這個流 | | Stream.fromFutures(多了個 `s`) | 每個 future 都有自己的數據(`onData`)或錯誤(`onError`)事件,當整個 future 完成後,流將會關閉,如果 future 為空,流將會立即關閉 | | Stream.fromIterable | 從集合中戶取數據的單訂閱流 | 1. **`Stream.fromFuture` 範例**: ```dart= import 'dart:async'; void main() { Future<String> myFuture = Future.delayed( Duration(seconds: 2), () => 'Hello from Future!'); Stream<String> streamFromFuture = Stream.fromFuture(myFuture); streamFromFuture.listen( (data) { print('Data: $data'); }, onError: (error) { print('Error: $error'); }, onDone: () { print('Stream closed.'); }, ); } ``` 2. **`Stream.fromFutures` 範例**:跟 `Stream.fromFuture` 差異不大,只是它會同時管理多個 `Future` ```dart= import 'dart:async'; void main() { Future<String> future1 = Future.delayed(Duration(seconds: 1), () => 'First Future'); Future<String> future2 = Future.delayed(Duration(seconds: 2), () => 'Second Future'); Future<String> future3 = Future.delayed(Duration(seconds: 3), () => 'Third Future'); Stream<String> streamFromFutures = Stream.fromFutures([future1, future2, future3]); streamFromFutures.listen( (data) { print('Data: $data'); }, onError: (error) { print('Error: $error'); }, onDone: () { print('Stream closed.'); }, ); } ``` 3. **`Stream.fromIterable` 範例**: ```dart= import 'dart:async'; void main() { List<int> myIterable = [1, 2, 3, 4, 5]; Stream<int> streamFromIterable = Stream.fromIterable(myIterable); streamFromIterable.listen( (data) { print('Data: $data'); }, onError: (error) { print('Error: $error'); }, onDone: () { print('Stream closed.'); }, ); } ``` * 如果沒有額外設定,**==Stream 預設是單監聽模式==,如果監聽數量大於一個以上的監聽則會拋出錯誤** ```dart= void main() { var stream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead(); // stream 流讀取 stream.listen((List<int> bytes) { print("stream listen 1"); }); // 錯誤! 只能有一個監聽者 stream.listen((List<int> bytes){ print("stream listen 2"); }); } ``` > ![](https://i.imgur.com/CaMVkjb.png) ### Stream 轉廣播模式:多個監聽者 * Stream 預設是單監聽模式,若要讓 Stream 可以讓多個用戶監聽(允許多個用戶監聽),就要將其轉為「**廣播模式**」 透過 **Broadcast#`asBroadcastStream` 方法,就可以將 Stream 轉為可多個監聽者** ```dart= import 'dart:io'; void main() { // Stream 轉為 Broadcast 就可以多個監聽 var broadcastStream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead().asBroadcastStream(); broadcastStream.listen((event){ print("廣播訂閱者 - 1"); }); broadcastStream.listen((event){ print("廣播訂閱者 - 2"); }); print("Main Finish"); } ``` > ![](https://i.imgur.com/S28gSCV.png) ### StreamController:可動態添加數據 * StreamController 是流管理器,可透過 factory 建構函數的 `broadcast` 創建 StreamController,並且它的多訂閱模式是「**熱訂閱**」 我們可以從以下範例了解到 `StreamController` 如何使用: 1. 普通的 Stream 流是一個「**單訂閱的不可變流**」,它不可以臨時添加數據 ```dart= void main() { var stream = Stream.fromIterable([1, 2, 3]); // 3 秒後添加訂閱者,Timer 延遲 (時間,callback) Timer(const Duration(seconds: 3), () => stream.listen((Object o) => print("延遲三秒: $o"))); } ``` 2. StreamController 則是一個可以動態添加數據的流,但是它屬於「熱流」(**當沒接收到數據就會流失掉**) ```dart= void main() { //創建一個 StreamController var streamController = StreamController.broadcast(); // factory 構造器 // 在以後微任務中獲得事件 streamController.add("Hello"); // 無法接收,因為尚未有 listener //訂閱事件 streamController.stream.listen((i){ print("StreamController broadcast: $i"); }); streamController.add("World"); // 可以被接收,因為已有 listener // 記得關閉流 !!! streamController.close(); } ``` 如下圖所見,在 `listen` 之前的 `Hello` 數據是無法被 StreamController 監聽到的,只有在 `listen` 之後的 `World` 可被監聽到 > ![image](https://hackmd.io/_uploads/SkVZ6ZQsA.png) ### 通過 `async*-yield` 生成 stream * 使用 **async\*** 需要搭配上 **`yield`** 這個關鍵字一起使用,可以讓數據異步返回 (可執行判斷到一半就返回) 範例如下 ```dart= Stream<int> countStream(int to) async* { for (int i = 1; i <= to; i++) { yield i; // 每接收到一個數據就直接拋出異步訊息 (該拋出訊息不會中斷程式流程) } } ``` 使用方式如下: 1. 使用 `await-for` 來消費 Stream 流 ```dart= void main() async { // 使用 countStream 函數生成一個 Stream await for (int value in countStream(5)) { print('Received: $value'); } print('Stream processing completed.'); } ``` > ![image](https://hackmd.io/_uploads/HkNBlzmiA.png) 2. 使用 `listen` 方法來監聽流 ```dart= void main() { // 使用 countStream 函數生成一個 Stream final stream = countStream(5); // 訂閱這個 Stream 並監聽數據 stream.listen((value) { print('Received: $value'); }, onDone: () { print('Stream processing completed.'); }, ); } ``` > ![image](https://hackmd.io/_uploads/HkNBlzmiA.png) ### 用 Stream 做簡易 EventBus * 使用 Dart 的廣播機制創建 Android 的第三方庫 `EventBus`,其功能類似於「[**觀察者模式**](https://devtechascendancy.com/observer-jdk-android-framework-listview/)」,主要的行為有 ^1.^ 訂閱、^2.^ 通知 * 我們可以透過 `StreamController` 物件來設定、過濾泛型類型,以 **達到指定類型的註冊與通知** 範例如下: * **EventBus 類實作**: ```dart= import 'dart:async'; class EventBus { static EventBus _instance; static StreamController _streamController; EventBus._internal() { _streamController = StreamController.broadcast(); } factory EventBus.getDefault() { // 單例模式 return _instance ??= EventBus._internal(); } StreamSubscription<T> register<T>(void onData(T event)) { if(T == dynamic) { // 未指定類型 return _streamController.stream.listen(onData); } else { // 傳送指定類型 return _streamController.stream.where((type) => type is T) // 判斷 .cast<T>() // 強制轉型 .listen(onData); } } // 傳送廣播資訊,多廣播 void post<T>(T msg) { _streamController.add(msg); } // 關閉廣播流 void close() { _streamController.close(); } } ``` * **使用自製的 EventBus 類**: ```dart= import 'eventBus.dart'; void main() { EventBus.getDefault().register((event) { print("Receive All Message, from EventBus: $event"); }); EventBus.getDefault().register<String> ((event) { print("--- Receive String Message, from EventBus: $event"); }); EventBus.getDefault().register<int>((event) { print("--- --- Receive int Message, from EventBus: $event"); }); EventBus.getDefault().register<double>((event) { print("--- --- --- Receive double Message, from EventBus: $event"); }); EventBus.getDefault().post(1111); EventBus.getDefault().post("Hello World"); EventBus.getDefault().post(765.123); EventBus.getDefault().close(); } ``` > ![](https://i.imgur.com/K0bbfuc.png) ## async / await 關鍵字 使用 **`async` 關鍵字描述的函數是 ++異步代碼塊++ (概念類似新開執行序)** 使用 **`await` 用來描述呼叫的函數,可以用來 ++等待異步執行的任務完畢++** ### async 異步函數 * **被 async 描述的函數只能返回 ==`void`== 或是 ==`Future`== 類(因為是異步)** > ![](https://i.imgur.com/uKhiCU1.png) 使用範例如下: ```dart= import 'dart:io'; void main() { var string = asyncReadFile(); string.then((value) => print("Read Finish")); } // 異步函數 Future asyncReadFile() async { // 使用 async 描述 var file = new File(r"D:\mingw64\build-info.txt").readAsString(); return file; } ``` > ![](https://i.imgur.com/0lwjdTj.png) ### await 等待函數 * **==await 必須要配合 async 使用==,無法單獨使用 `await` 描述呼叫的函數** > ![](https://i.imgur.com/oNSoKxg.png) * **await 可以有規律的順序執行,它會等待上一個異步函數結束才往下執行** 我們可以透過以下範例了解 `await` 的功能、好處: 1. 尚未使用 `await` 描述異步任務,那異步任務的讀取順序將會不同(每次執行都可能是不同的結果) ```dart= void noSync() { // 讀取順序不同,File1(linux-5.6.4.tar.xz) 較大的會讀得比較慢,File2(build.gradle) 會先讀完 new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("file1 read finish")); new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("file2 read finish")); } ``` 2. 以往我們可能會使用瘋狂回調的方式,這容易進入回調地獄(`callback hell`) ```dart= import 'dart:io'; // 多重回調,容易混亂 void oldStyle() { new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) { print("then --- file1 read finish"); new File(r"D:\mingw64\build.gradle").readAsString().then((value) { print("then --- file2 read finish"); }); }); } ``` 3. 使用 `await` 關鍵字描述後,就可以擺脫這種回調地獄問題,可以讓函數按照預期順序執行,並且可讀性也高 ```dart= import 'dart:io'; Future orderReadFile() async { // await 異步中的同步 await new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("async --- file1 read finish")); var file2 = await new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("async --- file1 read finish")); return file2; } ``` > ![](https://i.imgur.com/9gq4uwW.png) ## Appendix & FAQ :::info ::: ###### tags: `Flutter`