---
title: 'Dart 異步:Event-Loop、Future、Stream'
disqus: kyleAlien
---
Dart 異步:Event-Loop、Future、Stream
===
## Overview of Content
:::success
* 如果喜歡讀更好看一點的網頁版本,可以到我新做的網站 [**DevTech Ascendancy Hub**](https://devtechascendancy.com/)
本篇文章對應的是 [**深入探索 Dart 的併發與異步處理:從 Isolate 到 Event Loop 的全面指南 | Future、Stream**](https://devtechascendancy.com/dart-concurrency-async-guide_future-stream/)
:::
參考文章: [**Stream 參考**](https://juejin.cn/post/6844903686737494023)、[**線上編譯 Dart**](https://dartpad.dev/)
[TOC]
## Dart 的併發
我們之前有提到 Dart 是單執行緒(線程)的語言,但是單執行緒運作在耗時操作(像是網路請求、IO 操作)是相當不符合要求的,它會照成應用的卡頓…
所以 Dart 使採用併發(`Concurrency`)的機制,但是其特點與 Java 併發不同,兩者併發的差異如下表~
| 語言 | 資源隔離性 | 介紹 |
| -------- | -------- | -------- |
| Java | 資源共享 | 由於資源共享,所以要注意資源的同步問題 |
| Dart | **資源隔離,記憶體不共享** | 特性像是 Java 的進程(`Process`),但是並不是進程,必須把它看成 Java Thread,但是它是記憶體安全模型 |
:::info
* 關於多執行緒其實還有「並行」、「併發」概念,兩者個差異的請點擊 [**Thread & Process 的差異:併發 & 並行概念**](https://devtechascendancy.com/process-multithread-guide/#Thread_Process_%E7%9A%84%E5%B7%AE%E7%95%B0%EF%BC%9A%E4%BD%B5%E7%99%BC_%E4%B8%A6%E8%A1%8C%E6%A6%82%E5%BF%B5) 了解
:::
### 證明 Dart 單執行緒
* 用以下程式用來證明 Dart 是單執行緒
```dart=
import 'dart:io';
void main() {
bool finish = false;
print("ready read file($finish)");
new File(r"app_entry.dart").readAsString().then((value) {
finish = true;
print("already read done($finish)");
});
while(!finish) { } // 會卡在這,在 Event - Loop 會解釋
// 永遠不會執行到 "read file finish",因為主執行序不會等待 File 的 IO
print("read file finish");
}
```
> 
我們可以發現以上程式永遠不會結束,這是因為 Dart 的單執行緒…
1. 執行到讀取檔案時,Dart 會把 IO 耗時操作放置到事件循環內等待處理,然後繼續執行後續代碼
2. 執行到 `while(!finish)` 之後,就進入了無限循環,並且這個循環會一直佔有 CPU 資源,導致沒有機會去處理 IO 事件,這個測試無法結束
> 事件循環(`Event-Loop`)之後的小節會說明
```mermaid
sequenceDiagram
participant MainThread
participant FileIO
participant EventLoop
MainThread->>MainThread: print("start read file(false)")
MainThread->>FileIO: File.readAsString()
FileIO-->>MainThread: 返回 Future,往下執行
MainThread->>MainThread: 進入 while(!finish) 循環
MainThread->>MainThread: 無限循環阻塞
Note right of EventLoop: 事件循環被塞住,無法被處理
FileIO->>EventLoop: 讀取文件完成,加入事件隊列
EventLoop-->>MainThread: 回調帶隊列中,無法被執行(因為主執行緒被卡住)
```
### isolate 類:開啟資源隔離執行緒
* 我們前面有提及「資源隔離」,那這裡我們就來使用 Dart 的資源隔離類
:::warning
要注意它與 Java Thread 最大的不同在資源的隔離不共享 (若需要資源必須透過傳遞)… 在 Java 中,執行序內的資源是不隔離的所以可以共享,而不須透過傳遞
:::
首先 import `isolate` 類,並使用 `isolate#spawn` 開啟新的 isolate,然後我們做以下實驗來證明一些事情
* **開啟新執行序**:
透過 `isolate#spawn` 方法可以開啟一個新的執行序
```dart=
import 'dart:isolate';
int? data; // int 是類,所有 default 是 null
void main() {
data = 10;
// 開新線程,spawn 是泛型
// 原型 : external static Future<Isolate> spawn<T> (方法, ,{...})
Isolate.spawn(entryPoint, "Alien");
print("Finish main isolate, data: $data"); // 判斷是否會排隊進行任務 and 數據是否隔離
}
void entryPoint(String message) { // 可以省去 (message)
print("In child isolate, Params: $message, data: $data"); // 證明數據是隔離的
}
```
從執行結果來看我們可以知道
1. `spawn` 方法確實可以開啟一個新執行序,因為主執行緒正常會先結束,而開啟的執行序在之後才結束
2. **`isolate#spawn` 方法開啟的執行序是預設不守護主執行緒的**,因為主執行緒結束後,透過 `spawn` 開啟的執行緒也不會結束
> 
* **證明資源的隔離**:
要正名資源的隔離也相當容易,讓不同的執行序持有相同資源的引用,如果資源被改變則是資源共享,而資源沒有被改變則是資源不共享!(也就是資源被隔離)
```dart=
import 'dart:isolate';
int i = -1;
void main() {
i = 20;
Isolate.spawn(entryPoint, "Test"); // Java 可想成 new Thread
print("Main $i");
}
void entryPoint(String msg) {
print("Isolate $i");
}
```
從結果可以看到,兩者所取得的資源並不相同,`Isolate.spawn` 開啟的執行序仍舊是保持初始化的 -1(因為開啟執行緒後執行的 `entryPoint` 函數,它是取全域的 `i` 資源)
:::warning
* 如果是資源共享的情況下,會變怎樣?
資源 `i` 如果是共享的,那代表主執行緒內將 `i` 修改為 20,而後 `Isolate.spawn` 開啟的執行序也會讀取到 `i` 為 20
:::
> 
### Isolate 通訊:ReceivePort & SendPort
* 由於 isolate 開啟的執行序,其資源是相互隔離的,若需要傳遞資源則必須透過「某種方法傳遞」,在 Dart 中,這種方法就是 `ReceivePort` & `SendPort`
> ReceivePort & SendPort,就像是 Android 的 Handler (不同執行緒之間的通訊)
每個 isolate 內都可以設定 ReceivePort & SendPort 物件,可以透過這兩個物件來發送與接收資源,參考以下的概念圖
> 
:::info
* ReceivePort 物件接收資源時,是否會被切換到別的執行緒中?
你在哪個執行緒內創建,就會回傳到該執行緒內中,並不用特別設定
:::
* 使用 ReceivePort & SendPort 的範例如下,該範例會讓主執行緒與 isolate 執行緒通訊
1. 在主執行緒(`Main Thread`)中創建 ReceivePort 物件,並把 ReceivePort 物件內的 SendPort 物件傳遞給 Isolate 創建出的執行序(這樣 Isolate 就可以透過它發送資源給主執行緒)
```dart=
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
// 創建 isolate 執行緒
Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);
// receive 監聽
// 原型: StreamSubscription listen(void onData(var message), {...});
receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息
print("Main~~~~~~ finish"); // Main 並不會等待 isolate !
}
```
2. 主執行緒監聽 isolate 執行序發送的訊息
```dart=
// main isolate 監聽子 isolate
void mainListenIsolate(var message) {
if(message is SendPort) {
print("I am Main, I get your send Port");
message.send("Main get"); // 2.
} else {
// 接收子 isolate 訊息
print("Main get Message from isolate: $message");
}
}
```
3. isolate 執行序接收主執行序的 `SendPort` 物件,並在 isolate 執行緒內創建 ReceivePort 物件,並將其傳給主執行緒(這樣主執行緒就可以透過它傳遞資源給 isolate 執行緒)
```dart=
// isolate 執行序
void isolateFunc(SendPort sendPort) {
print('isolate created');
// 創建自己的接收器
ReceivePort receivePort = ReceivePort();
// 透過主執行緒給予的 SendPort 物件發送把 isolate 內的發送器(`SendPort`)發送給主執行緒
sendPort.send(receivePort.sendPort);
// 監聽 Main
receivePort.listen((var message) {
print("Child Isolate get Message from Main: $message");
});
sendPort.send("I am isolate"); // 透過主執行緒給予的 SendPort 物件發送資料給主執行緒
}
```
> 
:::warning
* 以下做了個有趣的測試,可以用來加強 Dart 是單執行緒的觀念… 一樣是這個小節的範例,在這裡我們在主行緒休眠了 5s 中,那會怎樣呢?
```dart=
import 'dart:io';
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
Isolate.spawn<SendPort>(isolateFunc, receivePort.sendPort);
// receive 監聽
// 原型: StreamSubscription listen(void onData(var message), {...});
receivePort.listen(mainListenIsolate); // main 監聽 子 isolate 訊息
// 添加休眠 5 秒
sleep(Duration(seconds:5));
print("Main~~~~~~ finish"); // 證明 Main 並不會等待 isolate !
}
```
> 
在這個範例中,我們在主執行緒中進行了 5 秒的休眠(`sleep`)。儘管 Isolate 會在 `Isolate.spawn` 後立即啟動並執行 `isolateFunc` 函數,但由於主執行緒被 sleep 阻塞,主執行緒無法立即處理來自 Isolate 的訊息
這意味著雖然 Isolate 已經獨立運行,但**主執行緒必須等待 sleep 結束後,才會繼續處理剩餘的程式碼和非同步事件**
> 因此,在主執行緒恢復後,你會看到 Isolate 發送的訊息被處理,並繼續執行剩餘程式碼
> 
:::
:::success
* 有些人可能會發現,為什麼這個範例程式不會結束(要自己按 `Ctrl+C` 才會結束)
而範例程式碼不會結束的原因是因為主執行緒的 `ReceivePort` 一直在監聽來自 Isolate 的訊息,這是導致程式持續運行的原因
:::
### Isolate 併發證明
* 以下程式碼展示了 Dart 中 Isolate 的併發性,它允許我們在獨立的執行緒中執行代碼
```dart=
import 'dart:isolate';
void main() {
// 在新的兩個 isolate 中創建兩個任務,這兩個任務做 dowhile
Isolate.spawn(isoMain1, "start---1");
Isolate.spawn(isoMain2, "start------2");
while(true) {}
}
void isoMain1(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
void isoMain2(String str) {
Future.doWhile(() {
print(str);
return true;
});
}
```
如果不是併發程序的話,就會是單一輸出結果
> 
## 認識 Event Loop
Dart Event Loop 與 Android Handler 機制類似(都是以「**事件驅動**」模式設計),通過從 Loop 中不斷獲取消息
像是 isolate 發送的消息就是通過 Event Loop 處理後,由單執行緒的 Dart 應用接收
```mermaid
graph LR
subgraph Dart Event Loop
待處理任務
subgraph queue
EventQueue
MicrotaskQueue
end
待處理任務 --> queue
end
耗時任務 --> |給予待處理的任務| 待處理任務
queue -.-> |空閒時| d(dart 執行序處理)
```
### Dart Event Loop 特性:EventQueue、MicrotaskQueue
:::info
這個小節我們先撇除併發概念,先單獨來觀察 `Dart Event Loop` 的特性
:::
* 首先我們要知道,在 Dart 中一個執行緒(線程)會對應一個事件循環(`Event-loop`)
* `Dart Event Loop` 與 Android Handler 不同的是…
1. Android 每個 Thread 都有一個 `Event-loop`,並對應 **單獨的 MessageQueue**,再藉由 Handler 處理消息
```mermaid
graph LR
MainThread <-.-> Event-loop <-.-> MessageQueue
```
:::success
但是與 Dart 不同的是,Android 天生屬於多執行序
:::
2. **Dart 則是一個 Event-Looper 對應兩個 Queue,儲存不同等級的工作**,兩個 Queue 的如下表所述
```mermaid
graph LR
MainThread <-.-> Event-loop
Event-loop <-.-> MessageQueue
Event-loop <-.-> MicrotaskQueue
```
| Dart 事件類型 | 介紹 | 特色 | 範例 |
| -------- | -------- | -------- | - |
| EventQueue | 普通事件 | **++每次執行完都會檢查 微事件++** | Future.delayed(Duration(seconds: 1)); |
| MicrotaskQueue | 微事件 | **==優先等級最高==** | Future.microtask() |
> 黃色區塊為子執行序,**要注意 Main 函數會正常執行,並 ++併發執行 Event-Loop++**
> 每次 Event-loop 在執行 `EventQueue` 任務前,都會檢查 `MicrotaskQueue` 是否有需要執行的微任務,若 `MicrotaskQueue` 有任務要執行會先執行,之後才接續執行 `EventQueue` 任務
> 
:::info
* **`Event-loop` 的啟動**:
Event-loop 在 Dart 應用中是自動啟動的,**不需要等 main 完全執行完畢才開始運行**… 實際上,Event-loop 是一個「**持續運行的機制**」,從應用啟動之後就開始運行
:::
### EventQueue、MicrotaskQueue 驗證執行順序
* **在 Dart Event Loop 中插入任務的使用範例如下**
* **插入任務至 `EventQueue`**:
`then` 函數是在 Event-Loop 檢查 EventQueue 時才執行,只要 EventQueue 內有任務就會被執行(以下任務是 IO)
```dart=
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
print(value);
});
print("read file finish");
}
```
> 
* **插入任務至 `MicrotaskQueue`**:
這裡我們來驗證將任務插入至 `MicrotaskQueue` 是否會比起把任務插入 `EventQueue` 還要快被執行
插入任務至 `MicrotaskQueue` 需要使用 `Future.microtask` 函數
```java=
import 'dart:io';
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("執行 Task");
});
// 原型 : factory Future.microtask(FutureOr<T> computation()) {
Future.microtask(() {
print("執行 micro Task");
});
print("Main finish");
}
```
如下結果,我們可以看到 **微任務的執行會在一般任務之前**!
> 
### Micro 微任務:任務插隊
* 現在來證明 Micro 微任務是可以插隊一般任務的 (每次執行一般 Event 都會檢查有沒有需要執行的微任務),驗證程式如下:
我們在主執行序中創建 `ReceivePort` 物件並監聽事件,當有事件進入主執行序時「插入微任務」,並在微任務執行時打印,來驗證是否微任務不論順序,都會執行在一般任務之前
```java=
import 'dart:isolate';
void main() {
ReceivePort receivePort = ReceivePort();
receivePort.listen((message) {
print(message);
Future.microtask(() => print("微任務插隊~"));
});
// 一般 Event
receivePort.sendPort.send("傳送 Message 1");
// 一般 Event
receivePort.sendPort.send("傳送 Message 2");
// 一般 Event
receivePort.sendPort.send("傳送 Message 3");
print("Main function finish");
}
```
> 
### 事件循環與協程:Dart vs. Kotlin
* 個人熟悉的協程是 [**Kotlin 的智能協程**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/),所以以下透過 Kotlin 協程與 Dart 事件循環做比較
* 以下範例是最一開始我們看的「Dart 單執行緒的證明」,在這裡我們插入協程的概念會如何?
如果我改成在 `while` 判斷中修眠,那 Dart 事件循環是否可以像是 Kotlin 中提供的協程一樣,在主執行緒空閒時幫我去處理任務呢?
```dart=
void main() {
...
new File(r"app_entry.dart").readAsString().then((value) {
// 會不會有機會處理到這裡?
});
while(!finish) {
print("test...");
sleep(const Duration(seconds: 1));
}
...
}
```
**答案是否定的**!就算是這樣修改,程式也不會有機會執行到以上的 IO 任務,因為 Dart 是採用事件循環機制,與 [**Kotlin 的智能協程**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/) 就有本質上的不同
* 兩者個差異關鍵為:
* **Dart 事件循環**:當主執行緒被阻塞時(例如透過 `sleep`),事件循環無法繼續,非同步任務的回呼也無法執行
* **Kotlin 協程**:Kotlin 的協程能夠在任務之間智慧切換,避免主執行緒阻塞,能夠繼續處理其他任務
:::success
所以以效能來說的話 Kotlin 的協程能帶來更高的效能,不過相對的撰寫起來需要更多的概念,複雜度也會更高
:::
### 結合 Dart 事件循環、執行序
* 首先我們仍要抱有一個觀念 Dart 是單執行序的!那它是怎麼(或是說何時)去執行異步任務的呢?這裡需要再澄清兩個觀念
* 前面小節我們所說的「**Dart 事件循環**」,是在說明 Dart 的 `Event-loop` 特性與內部結構,這時與執行序尚未產生關聯
* **`Event-loop` 機制與異步任務關聯**:
`Event-loop` 是用來掃描其內部的兩個 Queue 事件的機制,而我們撰寫的 **異步任務就會放置到 Queue 中**
* **`Event-loop` 機制與執行序產生關聯**
那放置在兩個 Queue 的任務何時被執行呢?
Dart 不會另外起一執行序來執行,仍舊保值單一執行序執行,而這個執行是採用「**協作式**」的方式執行… 也就是說「**異步任務會在主執行序空閒時被執行**」
## 認識 Future
在 Dart 庫中隨處可見 Future 物件… 並且 **通常異步函數返回的物件就是一個 Future** (像是 `Isolate.spawn` 方法返回就是一個 Future 物件)
```java=
// Isolate.dart
// spawn 方法,返回值就是 Future
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
@Since("2.3") String debugName});
```
以下介紹幾個 Future 常用的 API,如下表所示
| Future API | 介紹 |
| -------- | -------- |
| then(...) | 當任務執行完畢後 |
| wait(...) | 等待以上 Future 任務都執行完後再一起做結尾 |
| catchError(...) | 抓取異步錯誤 |
### then 函數:異步的回調、異步串接處理
* **異步的回調**:
在呼叫異步函數時會返回一個 `Future` 物件,然後我們可以執行 `then` 函數,這個 `then` 函數就是異步的回調函數(`Callback Function`),以下是個簡單的使用範例
```dart=
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
});
}
```
> 
* **異步串接處理**:
Future#`then` 函數返回的也是一個 `Future` 物件,所以可以接續使用
:::info
如同 Java 的 Builder 建構者模式,這在做網路 API 請求的時候非常好用
:::
範例如下:
```dart=
void main() {
new File(r"app_entry.dart").readAsString().then((value) {
print("Read file finish");
return value.length;
}).then((length) {
print("File length: $length");
});
}
```
> 
### wait 函數:結合異步任務
* **結合異步任務**:
透過 Future#`wait` 函數,我們可以用來等待多個 Future 都執行完後在做統一處理,可以用來做異步任務協作)
範例如下所示
```dart=
void main() {
// 異步任務一
Future readFile = new File(r"app_entry.dart").readAsString().then((value) {
return value.length;
});
// 異步任務二
Future delay = Future.delayed(const Duration(seconds: 3));
// 等待兩任務都結束
Future.wait([readFile, delay]).then((resultArray) {
print("第一個 Future : ${resultArray[0]}");
print("第二個 Future : ${resultArray[1] ??= "null"}");
});
print("main function finish");
}
```
如下圖所見,我們會看到 `wait` 函數確實會等待兩個任務都執行完畢才會被執行
> 
### catchError 函數:捕捉異常
* **捕捉異步任務異常**:
使用 file#`readAsString` 切換為異步讀取資源 (該函數返回 Future),這時就可以使用 Future#`catchError` 函數來捕捉錯誤
以下範例會讀取一個不存在的檔案
```dart=
import 'dart:io';
void throwError() {
// readAsString 原型 : Future<String> readAsString({Encoding encoding: utf8});
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e,s) { // catchError 原型 Future<T> catchError(Function onError, {bool test(Object error)});
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
});
}
void main() {
throwError();
}
```
如下圖所見,我們會捕捉到 `PathNoFoundException` 異常
> 
:::warning
如果沒有捕捉錯誤,則會產生 `Unhandled exception` 異常,對於 Dart 異常不熟悉的可以看 [**Dart 的捕捉異常限制**](https://devtechascendancy.com/dart-java-compare_exception_function-methods/#Dart_%E7%95%B0%E5%B8%B8%E3%80%81%E6%96%B7%E8%A8%80) 這篇文章
:::
* Future#`catchError` 也接受捕捉特定類型異常,只要透設定第二個參數就可以指定異常類型,範例如下
```dart=
import 'dart:io';
void throwError() {
new File(r"123.txt").readAsString().then((value) {
// 讀取不存在文件
print(value);
}).catchError((e, s) {
print("----> Caught an exception: $e");
print("----> Stack trace: $s");
}, test: (e) => e is PathNotFoundException);
}
void main() {
throwError();
}
```
> 
## 認識 Stream
Stream 表示的也是異步數據,它是 Dart 中處理異步事件「**流**」的 API
它與 Future 的差異在,**Future 表示 ++一次性++ 任務的返回**,而 **Stream 可以 ++分多次++ 異步任務**,這樣的好處有
1. 可以減少一次性記憶體的消耗量(因為 Stream 可以分批回傳)
2. 響應速度較快;就像是加載較大的網頁時,Stream 可以先回傳已經加載好的部分,這樣可以有比較好的使用體驗(而 Future 則是全部加載完畢後再一次返回)
### 測試 Stream 與 Future 的不同
* Stream 有監聽函數可以監聽到目前執行到哪,像是 `onDone`、`onError`、`resume`、`pause`...等等函數可以使用(如下表所示),與 Future 最大不同在於 Stream 不會單次讀取完成
| 監聽方法 | 說明 |
| -------- | -------- |
| **onData**(必須參數) | 收到 Data 時會被觸發 |
| onError | 收到 Error 時觸發 |
| onDone | 結束 Stream 流時觸發 |
| unsubscribeOnError | 當第一次收到 onError 時是否取消 Stream 流 |
以下範例中我們來讀取大一點的檔案([`linux-5.6.4.tar.xz`](https://ftp.ntu.edu.tw/pub/linux/kernel/v5.x/linux-5.6.4.tar.xz)),這樣就可以明顯看出兩者個不同
```dart=
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
// 讀取次數
const fileName = '/Users/user/Downloads/linux-5.6.4.tar.xz';
int times = 0;
void main() {
Isolate.spawn(isolateFuture, "start read File By Future");
Isolate.spawn(isolateStream, "start read File By Stream");
while(true) {}
}
void isolateFuture(String str) {
print(str);
new File(fileName).readAsBytes().then((value) {
print("Future : ${times++}");
});
print("Future Finish\n\n");
}
void isolateStream(String str) {
print(str);
// 多次
StreamSubscription<List<int>> listen = new File(fileName).openRead().listen((event) {
print("Stream : ${times++}");
});
// 結束通知閉包,還有 onError 等等...
listen.onDone(() {
print("onDone");
});
// stream 也可以暫停
listen.pause();
listen.resume();
print("Stream Finish");
}
```
看到結果知道 Stream 會多次讀取資料,而 Future 是一次性的全部讀取進記憶體中
> 
### 創建 Stream 物件
* 來看看幾個比較常見的 Stream 建構方法,如下表所示
| Factory 建構 | 功能 |
| -------- | -------- |
| Stream.fromFuture | 當 future 完成時將觸發一個數據或錯誤,然後立即關閉這個流 |
| Stream.fromFutures(多了個 `s`) | 每個 future 都有自己的數據(`onData`)或錯誤(`onError`)事件,當整個 future 完成後,流將會關閉,如果 future 為空,流將會立即關閉 |
| Stream.fromIterable | 從集合中戶取數據的單訂閱流 |
1. **`Stream.fromFuture` 範例**:
```dart=
import 'dart:async';
void main() {
Future<String> myFuture = Future.delayed(
Duration(seconds: 2),
() => 'Hello from Future!');
Stream<String> streamFromFuture = Stream.fromFuture(myFuture);
streamFromFuture.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
```
2. **`Stream.fromFutures` 範例**:跟 `Stream.fromFuture` 差異不大,只是它會同時管理多個 `Future`
```dart=
import 'dart:async';
void main() {
Future<String> future1 = Future.delayed(Duration(seconds: 1), () => 'First Future');
Future<String> future2 = Future.delayed(Duration(seconds: 2), () => 'Second Future');
Future<String> future3 = Future.delayed(Duration(seconds: 3), () => 'Third Future');
Stream<String> streamFromFutures = Stream.fromFutures([future1, future2, future3]);
streamFromFutures.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
```
3. **`Stream.fromIterable` 範例**:
```dart=
import 'dart:async';
void main() {
List<int> myIterable = [1, 2, 3, 4, 5];
Stream<int> streamFromIterable = Stream.fromIterable(myIterable);
streamFromIterable.listen(
(data) {
print('Data: $data');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream closed.');
},
);
}
```
* 如果沒有額外設定,**==Stream 預設是單監聽模式==,如果監聽數量大於一個以上的監聽則會拋出錯誤**
```dart=
void main() {
var stream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead();
// stream 流讀取
stream.listen((List<int> bytes) {
print("stream listen 1");
});
// 錯誤! 只能有一個監聽者
stream.listen((List<int> bytes){
print("stream listen 2");
});
}
```
> 
### Stream 轉廣播模式:多個監聽者
* Stream 預設是單監聽模式,若要讓 Stream 可以讓多個用戶監聽(允許多個用戶監聽),就要將其轉為「**廣播模式**」
透過 **Broadcast#`asBroadcastStream` 方法,就可以將 Stream 轉為可多個監聽者**
```dart=
import 'dart:io';
void main() {
// Stream 轉為 Broadcast 就可以多個監聽
var broadcastStream = new File(r"/Users/user/Downloads/linux-5.6.4.tar.xz").openRead().asBroadcastStream();
broadcastStream.listen((event){
print("廣播訂閱者 - 1");
});
broadcastStream.listen((event){
print("廣播訂閱者 - 2");
});
print("Main Finish");
}
```
> 
### StreamController:可動態添加數據
* StreamController 是流管理器,可透過 factory 建構函數的 `broadcast` 創建 StreamController,並且它的多訂閱模式是「**熱訂閱**」
我們可以從以下範例了解到 `StreamController` 如何使用:
1. 普通的 Stream 流是一個「**單訂閱的不可變流**」,它不可以臨時添加數據
```dart=
void main() {
var stream = Stream.fromIterable([1, 2, 3]);
// 3 秒後添加訂閱者,Timer 延遲 (時間,callback)
Timer(const Duration(seconds: 3),
() => stream.listen((Object o) => print("延遲三秒: $o")));
}
```
2. StreamController 則是一個可以動態添加數據的流,但是它屬於「熱流」(**當沒接收到數據就會流失掉**)
```dart=
void main() {
//創建一個 StreamController
var streamController = StreamController.broadcast(); // factory 構造器
// 在以後微任務中獲得事件
streamController.add("Hello"); // 無法接收,因為尚未有 listener
//訂閱事件
streamController.stream.listen((i){
print("StreamController broadcast: $i");
});
streamController.add("World"); // 可以被接收,因為已有 listener
// 記得關閉流 !!!
streamController.close();
}
```
如下圖所見,在 `listen` 之前的 `Hello` 數據是無法被 StreamController 監聽到的,只有在 `listen` 之後的 `World` 可被監聽到
> 
### 通過 `async*-yield` 生成 stream
* 使用 **async\*** 需要搭配上 **`yield`** 這個關鍵字一起使用,可以讓數據異步返回 (可執行判斷到一半就返回)
範例如下
```dart=
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i; // 每接收到一個數據就直接拋出異步訊息 (該拋出訊息不會中斷程式流程)
}
}
```
使用方式如下:
1. 使用 `await-for` 來消費 Stream 流
```dart=
void main() async {
// 使用 countStream 函數生成一個 Stream
await for (int value in countStream(5)) {
print('Received: $value');
}
print('Stream processing completed.');
}
```
> 
2. 使用 `listen` 方法來監聽流
```dart=
void main() {
// 使用 countStream 函數生成一個 Stream
final stream = countStream(5);
// 訂閱這個 Stream 並監聽數據
stream.listen((value) {
print('Received: $value');
},
onDone: () {
print('Stream processing completed.');
},
);
}
```
> 
### 用 Stream 做簡易 EventBus
* 使用 Dart 的廣播機制創建 Android 的第三方庫 `EventBus`,其功能類似於「[**觀察者模式**](https://devtechascendancy.com/observer-jdk-android-framework-listview/)」,主要的行為有 ^1.^ 訂閱、^2.^ 通知
* 我們可以透過 `StreamController` 物件來設定、過濾泛型類型,以 **達到指定類型的註冊與通知**
範例如下:
* **EventBus 類實作**:
```dart=
import 'dart:async';
class EventBus {
static EventBus _instance;
static StreamController _streamController;
EventBus._internal() {
_streamController = StreamController.broadcast();
}
factory EventBus.getDefault() {
// 單例模式
return _instance ??= EventBus._internal();
}
StreamSubscription<T> register<T>(void onData(T event)) {
if(T == dynamic) { // 未指定類型
return _streamController.stream.listen(onData);
} else {
// 傳送指定類型
return _streamController.stream.where((type) => type is T) // 判斷
.cast<T>() // 強制轉型
.listen(onData);
}
}
// 傳送廣播資訊,多廣播
void post<T>(T msg) {
_streamController.add(msg);
}
// 關閉廣播流
void close() {
_streamController.close();
}
}
```
* **使用自製的 EventBus 類**:
```dart=
import 'eventBus.dart';
void main() {
EventBus.getDefault().register((event) {
print("Receive All Message, from EventBus: $event");
});
EventBus.getDefault().register<String> ((event) {
print("--- Receive String Message, from EventBus: $event");
});
EventBus.getDefault().register<int>((event) {
print("--- --- Receive int Message, from EventBus: $event");
});
EventBus.getDefault().register<double>((event) {
print("--- --- --- Receive double Message, from EventBus: $event");
});
EventBus.getDefault().post(1111);
EventBus.getDefault().post("Hello World");
EventBus.getDefault().post(765.123);
EventBus.getDefault().close();
}
```
> 
## async / await 關鍵字
使用 **`async` 關鍵字描述的函數是 ++異步代碼塊++ (概念類似新開執行序)**
使用 **`await` 用來描述呼叫的函數,可以用來 ++等待異步執行的任務完畢++**
### async 異步函數
* **被 async 描述的函數只能返回 ==`void`== 或是 ==`Future`== 類(因為是異步)**
> 
使用範例如下:
```dart=
import 'dart:io';
void main() {
var string = asyncReadFile();
string.then((value) => print("Read Finish"));
}
// 異步函數
Future asyncReadFile() async { // 使用 async 描述
var file = new File(r"D:\mingw64\build-info.txt").readAsString();
return file;
}
```
> 
### await 等待函數
* **==await 必須要配合 async 使用==,無法單獨使用 `await` 描述呼叫的函數**
> 
* **await 可以有規律的順序執行,它會等待上一個異步函數結束才往下執行**
我們可以透過以下範例了解 `await` 的功能、好處:
1. 尚未使用 `await` 描述異步任務,那異步任務的讀取順序將會不同(每次執行都可能是不同的結果)
```dart=
void noSync() {
// 讀取順序不同,File1(linux-5.6.4.tar.xz) 較大的會讀得比較慢,File2(build.gradle) 會先讀完
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("file1 read finish"));
new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("file2 read finish"));
}
```
2. 以往我們可能會使用瘋狂回調的方式,這容易進入回調地獄(`callback hell`)
```dart=
import 'dart:io';
// 多重回調,容易混亂
void oldStyle() {
new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) {
print("then --- file1 read finish");
new File(r"D:\mingw64\build.gradle").readAsString().then((value) {
print("then --- file2 read finish");
});
});
}
```
3. 使用 `await` 關鍵字描述後,就可以擺脫這種回調地獄問題,可以讓函數按照預期順序執行,並且可讀性也高
```dart=
import 'dart:io';
Future orderReadFile() async {
// await 異步中的同步
await new File(r"D:\Linux Code\linux-5.6.4.tar.xz").readAsBytes().then((value) => print("async --- file1 read finish"));
var file2 = await new File(r"D:\mingw64\build.gradle").readAsString().then((value) => print("async --- file1 read finish"));
return file2;
}
```
> 
## Appendix & FAQ
:::info
:::
###### tags: `Flutter`