--- title: 'Flow & RxJava' disqus: kyleAlien --- Flow & RxJava === ## Overview of Content 我們在 Java 中常用的響應式編程模型就是 RxJava;而 Flow 就是 Koltin 結合 Coroutine 響應式編程的產物 :::info 響應式編程:簡單來想就是依照上一步反應而觸發不同的行為編程;最常見的就是 API 請求,由 API 請求的結果來驅動 APP 內的邏輯,讓它們之間產生對應的反應(像是很多的監聽者) ::: [TOC] ## Flow 基本使用 Flow 使用起來類似於 RxJava,其函數的對應如無下 | 功能 | Flow | RxJava | | ------------ | ---- | ------ | | 對流發送資料 | emit() | onNext() | | 接收資料 | collect() | subscribe() | Flow 是一個介面 ```kotlin= // Flow 源碼 public interface Flow<out T> { public suspend fun collect(collector: FlowCollector<T>) } ``` ### flow 基本使用 * **Flow 可以返回(或是說發射 `emit`)多個異步計算的結果**,**並用 `collect` 接收結果**;FlowBuilder 有幾種使用方式,常用的方式如下 1. **使用頂層函數 `flow{ }`**:可以直接使用 `flow{ }` 創建 SafeFlow ```kotlin= // 源碼 public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) ``` **`flow{ }` 的使用範例如下**: ```kotlin= // 範例 suspend fun sampleUse01() = coroutineScope { flow<Int> { for (i in 1..5) { val res = measureTimeMillis { delay(1000L * i) emit(i) } println("Use time: $res") } }.collect { println("res: $it") } } ``` > ![](https://i.imgur.com/4ul40Pv.png) 2. **使用頂層函數 `flowOf{ }`**:其實內部就是透過 flow 並呼叫 `emit` 函數 ```kotlin= // 源碼 public fun <T> flowOf(vararg elements: T): Flow<T> = flow { for (element in elements) { emit(element) } } ``` **`flowOf{ }` 使用範例如下**: ```kotlin= // 範例 suspend fun sampleUse02() = coroutineScope { flowOf("A", "B", "C", "D") .onEach { // onEach 每次發送前執行 delay(100) } .collect { println("it each: $it") } } ``` > ![](https://i.imgur.com/QmXSThT.png) 3. **使用頂層函數 `asFlow`**:它是一個 `Iterable` 類的拓展泛型函數,同樣透過 `emit` 發送 ```kotlin= // 源碼 public fun <T> Iterable<T>.asFlow(): Flow<T> = flow { forEach { value -> emit(value) } } ``` **`asFlow` 的使用範例如下**: ```kotlin= // 範例 suspend fun sampleUse03() = coroutineScope { listOf(1.1, 2.2, 3.3, 4.4, 5.5).asFlow() .onEach { // onEach 每次發送前執行 delay(100) } .collect { println("it each: $it") } } ``` 4. 使用頂層函數 `channelFlow`,跟上面的 **差別在於,它透過 channel 的 send 發送**(Channel 的特點是可以與異步產生掛勾) ```kotlin= // 源碼 public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = ChannelFlowBuilder(block) ``` **`channelFlow` 的使用範例如下**: ```kotlin= // 範例 suspend fun sampleUse04() = coroutineScope { channelFlow { for (i in 1..5) { val res = measureTimeMillis { delay(1000L * i) send(i) } println("Use time: $res") } }.collect { println("channel res: $it") } } ``` > ![](https://i.imgur.com/wQ13lAd.png) ### Channel Flow 特點 * Channel Flow 與一般 Flow 兩個都是 **生產者跟消費者模型**,但是還是有差異,其差異如下 | Flow 種類 | 是否同步 | 特點 | | --------- | -------- | ---- | | Flow | 是 | 非掛起(非阻塞,CPU 沒有讓出資源) | | ChannelFlow | 否、**異步** | **可切換上下文(不同 `Coroutine` 間通訊)** | 1. **Flow 使用範例**: 一般的 Flow 是「**同步**」的方式傳遞訊息,**必須等待 `collect` 處理完畢才可以進行下一次 `emit` 行為** ```kotlin= suspend fun normalFlow_Sync() = coroutineScope { val totalTime = measureTimeMillis { flow { // 與使用 `flowOf` 則相同 for (it in 1 .. 5) { // 1 ~ 5 delay(100) println("Hello, emit now: $it") emit(it) } }.collect{ delay(500) println("World, I get normal flow item: $it") } } // println("Flow with sync, total use time: $totalTime") } ``` > ![](https://i.imgur.com/DqV6X68.png) 概念圖如下,每次都必須傳輸(耗時 100ms)、收集(耗時 500ms)結束才往下一個事件走 ```mermaid graph TB 1 --> |100ms + 500ms| 2 --> |100ms + 500ms| 3 --> |100ms + 500ms| 4 --> |100ms + 500ms| 5 ``` 2. **ChannelFlow 使用範例**: ChannelFlow 是「**異步**」傳送訊息,**不需等待 `collect` 處理完就可以繼續 `send` 訊息**(collect 仍需要等待) ```kotlin= suspend fun channelFlow_Async() = coroutineScope { val totalTime = measureTimeMillis { channelFlow { for (it in 1 .. 5) { delay(100) println("Hello, send now: $it") send(it) // 可以一直發送,不用等待 } }.collect{ delay(500) // 這裡必須等待 println("World, I get channel flow item: $it") } } println("Flow with async, total use time: $totalTime") } ``` > ![](https://i.imgur.com/zLM0r1y.png) ### Flow 切換 Thread - flowOn * 在 Flow 中要切換 Thread 就需要使用到 `flowOn` 關鍵字,**`flowOn` 影響到的範圍是在 ++collect++ 之前** :::info 類似 RxJava 中切換 Thread 的 `observeOn`、`subscribeOn` 方法 ::: > 以下範例將 `flow`、`map` 運作在 IO 執行序,`collect `接收在 Main 執行序接收! ```kotlin= suspend fun flowSwitchThread() = coroutineScope { flow{ for (i in 1..3) { println("---Flow thread: ${Thread.currentThread().name}=$i") emit(i) } } .map { println("------Map thread: ${Thread.currentThread().name}=$it") it * it } .flowOn(Dispatchers.IO) // 讓 flow、map 運作在 IO 上 .collect { // collect 仍運作在 main println("Collect thread: ${Thread.currentThread().name}=$it") } } ``` > ![](https://i.imgur.com/X4tfQes.png) :::warning * 我們在使用 Coroutine 時是使用 `withContext` 函數做切換,但 **在 Flow 中不可以使用 `withContext` 切換**! ::: * **Flow#`collect` 運作在哪個執行序是由 flow 在哪個執行序啟動為準**(以下指定運作 Flow 的執行序) ```kotlin= suspend fun flowSwitchThread2() = coroutineScope { withContext(newSingleThreadContext("HelloWorld")) { flow{ for (i in 1..3) { println("---Flow thread: ${Thread.currentThread().name}=$i") emit(i) } } .map { println("------Map thread: ${Thread.currentThread().name}=$it") it * it } .flowOn(Dispatchers.IO) .collect { // 切換到 HelloWorld Thread println("Collect thread: ${Thread.currentThread().name}=$it") } } } ``` > ![](https://i.imgur.com/70U8sFT.png) * `flowOn` 是 **影響上游的操作**,所以一個 Flow 中可以多次切換 flowOn 去指定業務邏輯要在哪個不同 Thread 的運作;請注意以下 Thread 切換操作 ```kotlin= suspend fun flowSwitchThread3() = coroutineScope { val customerDispatcher = newSingleThreadContext("HelloWorld") flow{ for (i in 1..3) { println("---Flow thread: ${Thread.currentThread().name}=$i") emit(i) } } .map { // map 1 println("------Map on customerDispatcher: ${Thread.currentThread().name}=$it") it * it } .flowOn(customerDispatcher) // 影響上游操作: 影響 flow、map 1 兩個操作 .map { // map 2 println("------Map on IO: ${Thread.currentThread().name}=$it") it * it } .flowOn(Dispatchers.IO) // 只影響 map 2 操作 .collect { println("Collect thread: ${Thread.currentThread().name}=$it") } } ``` > ![](https://i.imgur.com/PqksYyq.png) ### 取消、關閉 Flow 流 * Flow 是 **可以被關閉、取消**,範例如下… ```kotlin= fun main() : Unit = runBlocking { withTimeoutOrNull(2000) { flow{ for (i in 1..5) { delay(400) emit(i) println("---Flow=$i") } }.collect { delay(400) println("Collect get: $it") } } println("Main done.") } ``` > ![](https://i.imgur.com/gh1a6Np.png) ### 監聽 Flow 流 * 在 Flow 的運行中可以知道 Flow 的開始 `onStart`、結束 `onCompletion` :::info 類似 RxJava 中的 `do` 函數 ::: ```kotlin= fun main() : Unit = runBlocking { flow { for (i in 1..5) { emit(i) println("---Flow=$i") } }.onStart { println("onStart ~~") }.onCompletion { println("onCompletion ~~") }.collect { println("Collect get: $it") } } ``` > ![](https://i.imgur.com/BER4Bjh.png) ### Flow 的協程 & Sequences 的同步 * Flow 是按照順序執行,而 Sequences 也可以達到相同效果,但兩者仍有差異 | 順序類 | 發送數據的方法 | 差異 | | - | - | - | | `Flow` | `emit` | 同步,但是發送資料時內部是使用協程的機制,並不堵塞執行序 | | `Sequences` | `yield` | 同步,內部 **不支援 suspend function**(會堵塞執行序) | 1. **Flow 非同步**: 以下範例中,Flow 在發送 (emit) 後資料後,會把 MainThread 讓出 (`delay` 函數) 讓其他需要 MainThread 的函數去使用 > 以下的測試將只會在 MainThread 執行 **測試的目的是為了確認 `Flow` 在執行 `suspend function` 時是否會堵塞 MainThread**,如果會堵塞則以下的 `launch` Lambda 將無法被運行 ```kotlin= suspend fun flowUse() = coroutineScope { launch { for(i in 1 .. 5) { delay(100) println("Launch item=${i}, Thread: ${Thread.currentThread().name}") } } flow { for (i in 1 .. 5) { delay(100) emit(i) println("flow item=${i}, Thread: ${Thread.currentThread().name}") } }.collect { println("collect get=(${it}), Thread: ${Thread.currentThread().name}") } println("Done") } ``` > 從結果來看 `flow` 是不對堵塞 CPU 的 > > ![](https://i.imgur.com/aS3SjRG.png) :::info * 由上圖結果可知,`flow` 在運行 `suspend function` 時不會堵塞 MainThread,它會讓出 Thread 的使用權給其他需要的函數使用 > 從這裡我們也可以看出 `flow` 是使用了「協程, Coroutine」技術,才可以達到不堵塞單一執行序,而執行異步的行為! ::: 2. **Sequences 同步**: **sequences 會堵塞 Mainthread 的使用(正確點來說是,當前運行的 Thread)** 直到它執行完畢 以下案例,同樣的程式,不過我們將 `flow` 換成 `sequence`,來觀察它是否會堵塞 MainThread 的執行 ```kotlin= suspend fun sequencesUse() = coroutineScope { launch { // sequences 會堵占 main 的使用 for(i in 1 .. 5) { delay(100) println("Launch item=${i}, Thread: ${Thread.currentThread().name}") } } sequence { for (i in 1 .. 5) { Thread.sleep(100) yield(i) // 等同 emit 的意思 println("sequence item=${i}, Thread: ${Thread.currentThread().name}") } }.forEach { println("forEach get=(${it}), Thread: ${Thread.currentThread().name}") } println("Done") } ``` > 可以看到 Sequence 佔用了當前 MainThread 的執行 > > ![](https://i.imgur.com/iiYHJ6a.png) :::info * 在特別強調一次: **sequences 堵塞的是當前執行序**,而不是只堵塞 Mainthread ```kotlin= // 創建指定 Thread 測試 fun main() : Unit = runBlocking(newSingleThreadContext("Hello")) { // 堵塞當前 thread sequencesUse() } ``` > ![](https://i.imgur.com/hhSzZwo.png) ::: ### 監看 Flow 結束:就算異常也要看到結束 * Flow 是一個 Suspend function,如果需要在 Flow 結束時 (不管是正常結束、還是拋出錯誤),通知使用者進行操作 > 這裡只說明如何判斷 Flow 的結束,並 **不包含 Flow 的錯誤捕捉** * 可以通過以下兩種方式 1. **imperative**:用一個大的 try/finally 包裹,**透過 finally 通知使用者 Flow 已經完成**; > 沒有 catch 的話就不能捕捉 Exception,只能通過 finally 知道最終結果 ```kotlin= suspend fun flowTryFinally() = coroutineScope { try { flowOf(1, 2, 3) .map { it * it } .collect { if (it == 4) { throw Exception("Test throw. $it") } } } finally { println("Try finally flow finish.") } } fun main() : Unit = runBlocking { flowTryFinally() // 不會執行到 println("Main finish.") } ``` > ![](https://i.imgur.com/9gkYhoW.png) 2. **declatative**:透過 `onCompletion` 函數,就可以達到上面 `finally` 的相同效果! ```kotlin= suspend fun flowOnCompletion() = coroutineScope { flowOf(1, 2, 3) .map { it * it } .onCompletion { println("onCompletion flow finish.") } .collect { if (it == 9) { throw Exception("Test throw. $it") } } } fun main() : Unit = runBlocking { // flowTryFinally() flowOnCompletion() // 不會執行到 println("Main finish.") } ``` > ![](https://i.imgur.com/VvM9WW2.png) ### Flow 異常處理 / 重試 * 上面我們說了 Flow 的結束,但並沒有說如何處理異常;而這邊我們就特別來說說 Flow 是如何處理異常的;**Flow 處理異常有兩種方案** 1. 使用傳統的 `try/catch` 處理 ```kotlin= suspend fun traditionTryCatch() { try { flowOf(1, 2, 3) .map { it * it } .onCompletion { println("onCompletion flow finish.") } .collect { if (it == 9) { throw Exception("Test throw. $it") } } } catch (e : Exception) { println("flow get exception: $e") } } fun main() : Unit = runBlocking { traditionTryCatch() println("Main finish") } ``` 2. **使用 `catch` 操作符**:catch 操作符可以捕捉 **上游** 的操作錯誤 ```kotlin= suspend fun flowCatch() { flowOf(1, 2, 3) .map { if (it == 3) { throw Exception("Test throw. $it") } it * it } .catch { // 捕捉上游 println("flow get exception: $it") } .onCompletion { // 不影響下游 println("onCompletion flow finish. e: $it") } .collect { println("Flow catch: $it.") } } ``` > ![](https://i.imgur.com/YgddRAy.png) :::warning * 何謂上游錯誤 ? 就是在設定 `catch` 操作符之前的錯誤 ```kotlin= suspend fun flowCatch2() { flowOf(1, 2, 3) .catch { // 這時就無法捕捉 map 中的錯誤 println("flow get exception: $it") } .map { if (it == 3) { throw Exception("Test throw. $it") } it * it } .onCompletion { println("onCompletion flow finish. e: $it") } .collect { println("Flow catch: $it.") } } ``` 由此我們也 **可以知道 `catch` 是無法處理 `collect` 中的錯誤的** > ![](https://i.imgur.com/0uzaMaN.png) ::: * Flow 在發生錯誤時可以透過 `retry`、`retryWhen` 操作符對上游做重試 :::info * **`retry` 重試次數是都是從 1 開始計算;`retryWhen` 重試次數是都是從 0 開始通知** ::: 1. `retry` 操作符可以透過 return Boolean 來決定是否再次重試;Return true 代表重試,否則不重試 ```kotlin= suspend fun flowCatchRetry() { flowOf(1, 2, 3) .onEach { println("Current number=($it)") if (it == 2) { throw Exception("Test throw=($it)") } } .retry(2) { // 重試兩次 if (it.message == "Test throw=(2)") { println("Handle exception.") return@retry true } false } .onCompletion { println("onCompletion flow finish.") } .collect { if (it == 9) { throw Exception("Test throw. $it") } } } ``` > ![reference link](https://i.imgur.com/R8wzs9I.png) 2. `retryWhen` 可以達到跟上述一樣的效果;`retryWhen` 會不斷地重試,並且它多了一個當前重試次數給使用者判斷 ```kotlin= suspend fun flowCatchRetryWhen() { flowOf(1, 2, 3) .onEach { println("Current number=($it)") if (it == 2) { throw Exception("Test throw=($it)") } } .retryWhen {e, times -> println("Current try times=($times), e=($e)") if(e is Exception) { // 同 `retry`,返回 true 代表同意重試 return@retryWhen times < 2 } false } .onCompletion { println("onCompletion flow finish.") } .collect { if (it == 9) { throw Exception("Test throw. $it") } } } ``` > ![](https://i.imgur.com/xmCSJCb.png) ## Flow & RxJava Koltin 協程可以透過一些類的組合來達到等同於 RxJava 的效果,如下表 | RxJava | Corotines | | - | - | | `Single<T>` | `Defered<T>` | | `Maybe<T>` | `Defered<T>` | | `Completable` | `Job` | | `Observable<T>` | `Channel<T>`、`Flow<T>` | | `Flowable<T>` | `Channel<T>`、`Flow<T>` | ### Cold Stream 冷流:Flow * Cold Stream 如同上面的範例一樣,**在呼叫 Flow#`collect` 後才開始運行 Flow 的流程** > 如同 RxJava 使用 subscribe 函數才開始運行 1. 實驗一:延遲 `collect`,觀察是否會少收到 Flow 的資料 ```kotlin= @Test fun testColdStream() { var flow: Flow<String>? = null CoroutineScope(Dispatchers.IO).launch { println("start flow: ${Thread.currentThread().name}") flow = flowOf("A", "B", "C", "D", "E", "F", "G") .onEach { // onEach 每次發送前執行 delay(100) println("${Thread.currentThread().name}, send: $it") } } println("start") runBlocking { // 延遲 `collect` delay(300) flow!!.collect { println("${Thread.currentThread().name}, receive: $it") } } println("done") } ``` > 從結果可以看出來,我其實已經延遲 `collect`,但是 Flow 仍是等到有人 `collect` 才發送資料 > > ![image](https://hackmd.io/_uploads/B1CyL1xP6.png) 2. 實驗二:創建兩個 `collect`,並延遲 `collect`,觀察兩個是否會少收到 Flow 的資料 ```kotlin= @Test fun testColdStream() { var flow: Flow<String>? = null CoroutineScope(Dispatchers.IO).launch { println("start flow: ${Thread.currentThread().name}") flow = flowOf("A", "B", "C", "D", "E", "F", "G") .onEach { // onEach 每次發送前執行 delay(100) println("${Thread.currentThread().name}, send: $it") } } println("start") runBlocking { delay(300) flow!!.collect { println("1111 ${Thread.currentThread().name}, receive: $it") } delay(100) flow!!.collect { println("2222 ${Thread.currentThread().name}, receive: $it") } } println("done") } ``` > 從結果可以看到,第二個 `collect` **仍會完整地收到全部的資料** > > ![image](https://hackmd.io/_uploads/HkDtLgxD6.png) ### Hot Stream 熱流:MutableSharedFlow * **Hot Stream 如同直播,數據要即時擷取否則過了就不會再見到**;Kotlin 則可以透過 `MutableSharedFlow` 的協助來到相同的效果 ```kotlin= @Test fun testHotStream() { // 創建發射器,熱流發射器 val flow: MutableSharedFlow<String> = MutableSharedFlow() CoroutineScope(Dispatchers.IO).launch { println("start flow: ${Thread.currentThread().name}") flowOf("A", "B", "C", "D", "E", "F", "G") .onEach { delay(100) println("${Thread.currentThread().name}, send: $it") flow.emit(it) } .onCompletion { println("Flow completed") } .launchIn(this) // 啟動 Flow,這裡是使用 launchIn } println("start") runBlocking { delay(300) // 只會收到當下的數據! flow.collect { println("${Thread.currentThread().name}, receive: $it") } } println("done") } ``` > ![image](https://hackmd.io/_uploads/Sk-qOkgDT.png) ## Backpressure 背壓 首先先來介紹一下 **何謂 Backpressure** ? 我們知道 Flow 就像是一個生產者消費者模型,而 Backpressure 的情況則是 **生產者的產量遠遠大於消費者** :::danger Backpressure 產生後如果沒有策略處理則可能導致應用崩潰 ::: RxJava 有對 Backpressure 有相對應的策略,反映在 Flow 中也有相同的策略,請見下表 | RxJava | Flow | 說明 | | - | - | - | | `BUFFER` | buffer() | Buffer 用來儲存尚未處理的數據,沒有固定 Buffer 大小,有可能導致 OOM | | `DROP` | - | Flow 緩衝池滿了,則拋棄準備進入緩衝池的新數據 | | `LATEST` | conflate() | 行為同 `DROP`,但 `LETEST` 會強制將最後一個數據放入緩衝池中 | ### Flow 實現 BUFFER 策略 * 首先先來看看沒有 Buffer 時,Flow 面對消費者/生產者時間差,會有甚麼反應 ```kotlin= suspend fun flowWithoutBuffer() = coroutineScope { fun curTime() = System.currentTimeMillis() var startTimeStamp : Long = 0 flowOf(1, 2, 3, 4, 5) .onStart { startTimeStamp = curTime() }.onEach { println("Supplier $it (${curTime() - startTimeStamp} ms).") } .collect { // 一個一個處理 println("Consumer $it start(${curTime() - startTimeStamp} ms).") delay(500) // 延緩消費者 println("Consumer $it finish(${curTime() - startTimeStamp} ms).") } } ``` 如下圖,我們可以看到沒有 Buffer 機制,那 flow 則要等待 `collect` 處理完,才能發送下一個 `emit` 數據 > ![](https://i.imgur.com/Joeqou9.png) * 以下使用 flow 來達成 RxJava `BUFFER` 的功能 (重點其實就是加了一個 `buffer` 函數) ```kotlin= suspend fun flowWithBuffer() = coroutineScope { fun curTime() = System.currentTimeMillis() var startTimeStamp : Long = 0 flowOf(1, 2, 3, 4, 5) .onStart { startTimeStamp = curTime() }.onEach { println("Supplier $it (${curTime() - startTimeStamp} ms).") } .buffer() // 不限定則 Buffer 無限大 .collect { // 一個一個處理 println("Consumer $it start(${curTime() - startTimeStamp} ms).") delay(500) // 延緩消費者 println("Consumer $it finish(${curTime() - startTimeStamp} ms).") } } ``` 故意延遲 Comsumer 消耗,但這些仍在 Buffer 中,就不必等待 `collect` 處理完就可以 emit 下一個數據 > ![](https://i.imgur.com/mQ8ZjTh.png) * Flow Buffer 如果沒有給定 `capacity` 限制則無限大,**假設有給定 `capacity` 數值,則須加設預設的 2 個容量** ```kotlin= suspend fun flowBuffer_2() { fun curTime() = System.currentTimeMillis() var startTimeStamp : Long = 0 flowOf(1, 2, 3, 4, 5) .onStart { startTimeStamp = curTime() }.onEach { println("Supplier $it (${curTime() - startTimeStamp} ms).") } .buffer(1) // 它預設有 2 個 capacity .collect { // 一個一個處理 println("Consumer $it start(${curTime() - startTimeStamp} ms).") delay(500) // 延緩消費者 println("Consumer $it finish(${curTime() - startTimeStamp} ms).") } } ``` 可以看到 buffer 明明設定為 1,不過 flow 在 emit 時直到 3 才真正堵塞,由此可見 **`buffer` 預設有 2 個 capacity** > ![](https://i.imgur.com/THnPIoI.png) ### BUFFER 策略:異步併發 * Flow 使用 `buffer` 操作就可以達到併發的效果 (如果 Buffer 尚未滿的情況下) 1. 首先我們知道一般 **非 Channel 的 Flow 是一個同步操作**,必須要等待 collect 操作完才可以執行下一個步驟; ```kotlin= suspend fun flowNoBuffer() { val uesTimes = measureTimeMillis { flowOf(1, 2, 3, 4, 5) .onEach { delay(100) } .collect { delay(500) println("$it") } } println("Without buffer=($uesTimes ms)") } ``` > ![](https://i.imgur.com/AIHZhUI.png) 2. 這時如果多了 `buffer` 情況就會如同「**ChannelFlow**」,**不需等待 collect 結束就可以執行下一個操作**;可以達到類似 ChannelFlow 的效果 ```kotlin= suspend fun flowBufferAsChannel() { val uesTimes = measureTimeMillis { flowOf(1, 2, 3, 4, 5) .onEach { delay(100) } .buffer() .collect { delay(500) println("$it") } } println("Without buffer=($uesTimes ms)") } ``` > ![](https://i.imgur.com/OIoplpV.png) ### Flow 實現 LATEST 策略 * 使用 Flow 實現 RxJava 中的 `LATEST` (`LATEST` 的特色是會保存最後一個數據,我們就檢查最後一個數據是否有被保存) ```kotlin= suspend fun flowLatest() { fun curTime() = System.currentTimeMillis() var startTimeStamp : Long = 0 flowOf(1, 2, 3, 4, 5) .onStart { startTimeStamp = curTime() }.onEach { println("Supplier $it (${curTime() - startTimeStamp} ms).") } .conflate() // LATEST 策略 .collect { // 一個一個處理 println("Consumer $it start(${curTime() - startTimeStamp} ms).") delay(500) // 延緩消費者 println("Consumer $it finish(${curTime() - startTimeStamp} ms).") } } ``` > ![](https://i.imgur.com/jyUg6Pl.png) ## Flow 其他操作 ### 轉換 transform * 在 `transform` 操作符中幾個特點:可以多次 emit 數據、**emit 數據沒有限制** 1. 多次 emit ```kotlin= suspend fun transformMultiEmit() = coroutineScope { val startTimeStamp = System.currentTimeMillis() (1..3).asFlow() .transform { println("Transform --- $it") emit(it * 2) // 多次 emit delay(100) emit(it * 4) } .collect{ println("Collect($it), time=(${System.currentTimeMillis() - startTimeStamp})") } } ``` > ![](https://i.imgur.com/1Ala79P.png) 2. emit 數據沒有限制:一般 Flow 是有限制 emit 數據類型,其類型必須與 Flow 相同,而 `transform` 內的 emit 則沒有限制 ```kotlin= suspend fun transformEmitOtherType() = coroutineScope { val startTimeStamp = System.currentTimeMillis() (1..3).asFlow() .transform { println("Transform --- $it") emit(it) // 多次 emit delay(100) emit("Hello: $it") } .collect{ println("Collect=($it), time=(${System.currentTimeMillis() - startTimeStamp})") } } ``` > ![](https://i.imgur.com/Wteex8T.png) ### 限制取用 take * 一般的 Flow 在發射數據 (emit) 時都沒有限制,這時 **如果你要限制數據的發射接收數量,就可以使用 `take` 操作符** ```kotlin= fun main() : Unit = runBlocking { flowOf(1, 2, 3, 4, 5) .take(3) // 限制接收數量 .collect { println("Flow with take=($it)") } println("Main finish.") } ``` > ![](https://i.imgur.com/Wqwerwi.png) ### Flow 計算結果 * Flow 除了 emit 以外我們還可以透過兩個操作符來聚集所有 Flow 的結果 1. **`reduce` 操作符**:獲取上一次的結果,返回下一個結果 ```kotlin= suspend fun flowReduce() = coroutineScope { val res = (1..5) .asFlow() .reduce {lastValue, curValue -> println("lastValue=($lastValue), curValue=($curValue)") lastValue + curValue } println("Reduce=($res)") } ``` > ![](https://i.imgur.com/eugiCgF.png) 2. **`fold` 操作符**:跟 `reduce` 操作符很像,不過它可以設定初始值,透過初始值開始計算 ```kotlin= suspend fun flowFold() = coroutineScope { val res = (1..5) .asFlow() // 初始值設定為 3,從 3 開始計算 .fold(3) {lastValue, curValue -> println("lastValue=($lastValue), curValue=($curValue)") lastValue + curValue } println("Reduce=($res)") } ``` > ![](https://i.imgur.com/CPmNxZs.png) ### 合併操作符 * **Flow 也有合併操作符,可以合併兩個不同的 Flow** 1. **`zip` 操作符**:兩個不同的 Flow ```kotlin= suspend fun flowZip() = coroutineScope { val flowA = flowOf(1, 2, 3, 4, 5) val flowB = flowOf("A", "B", "C", "D", "E") flowA.zip(flowB) { a, b -> val tmp = "Zip flowA=($a), flowB=($b)" println(tmp) tmp }.collect { println("Collect=($it)") } } ``` > ![](https://i.imgur.com/CFQi0w0.png) :::info * 當兩個 Flow 的數據量不同時,會以最少數據量的 Flow 為準 ```kotlin= suspend fun flowZip2() = coroutineScope { // 數量差異 val flowA = flowOf(1, 2, 3, 4) val flowB = flowOf("A", "B", "C", "D", "E") flowA.zip(flowB) { a, b -> val tmp = "Zip flowA=($a), flowB=($b)" println(tmp) tmp }.collect { println("Collect=($it)") } } ``` > ![](https://i.imgur.com/W2zJ4hw.png) ::: 2. **`combine` 操作符**:與 `zip` 類似 (但不同);**當兩個 Flow 數量不同時**,不足處會取用最後一個數據來合併 ```kotlin= suspend fun flowCombine() = coroutineScope { val flowA = flowOf(1, 2) val flowB = flowOf("A", "B", "C", "D", "E") flowA.combine(flowB) { a, b -> val tmp = "Combine flowA=($a), flowB=($b)" println(tmp) tmp }.collect { println("Collect=($it)") } } ``` > ![](https://i.imgur.com/zdwZKDk.png) * 上面兩個操作符會將 Flow 合併成一個 Flow (算是合併、並聯);操作符 `flatMerge` 則是 **==串聯 Flow==,將數據會合成一個流** ```kotlin= suspend fun flowConcat() = coroutineScope { val flowA = flowOf(1, 2, 3, 4) val flowB = flowOf("A", "B", "C", "D", "E") flowOf(flowA, flowB) .flattenConcat() .collect { println("Collect=($it)") } } ``` > ![](https://i.imgur.com/QvoYeJP.png) ### Nest Flow:Flat 鋪平 * 如果有使用到巢狀 (Nest) Flow 並控制其行為的狀況就可以使用 Flat 相關操作符 | Flat 相關操作符 | 特色 | | -------- | -------- | | flatMapConcat | 等待 Flat 內部完成才通知 Collect | | flatMapMerge | 併發操作,不會等待 Flat 內部就直接通知 Collect | | flatMapLatest | 當有第二次數據發送 (emit) 時,就會停止 Collect 接收 | 1. **`flatMapConcat` 操作符**:等待 Flat 內部完成才通知 Collect ```kotlin= suspend fun flatConcat() { var startTime : Long = 0 (1..5) .asFlow() .onStart { startTime = System.currentTimeMillis() } .flatMapConcat { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)") } } ``` > ![](https://i.imgur.com/Qi5PmIO.png) 2. **`flatMapMerge` 操作符**:併發操作,不會等待 Flat 內部就直接通知 Collect ```kotlin= suspend fun flatMerge() { var startTime : Long = 0 (1..5) .asFlow() .onStart { startTime = System.currentTimeMillis() } .flatMapMerge { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)") } } ``` > ![](https://i.imgur.com/SPlkuWw.png) 3. **`flatMapLatest` 操作符**:當有第二次數據發送 (emit) 時,就會停止 Collect 接收 ```kotlin= suspend fun flatLatest() { var startTime : Long = 0 (1..5) .asFlow() .onStart { startTime = System.currentTimeMillis() } .flatMapLatest { flow { emit("$it: First") delay(500) emit("$it: Second") } } .collect { println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)") } } ``` > ![](https://i.imgur.com/ciVxlDl.png) ## 更多的 Kotlin 語言相關文章 在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容! ### Kotlin 語言基礎 * **Kotlin 語言基礎**:想要建立堅實的 Kotlin 基礎?以下這些文章將帶你深入探索 Kotlin 的關鍵基礎和概念,幫你打造更堅固的 Kotlin 語言基礎 :::info * [**Kotlin 函數、類、屬性 | DataClass、Sealed、Object 關鍵字 | Enum、Companion、NPE**](https://devtechascendancy.com/kotlin-functions_oop_dataclass_sealed_object/) * [**深入探究 Kotlin 與 Java 泛型:擦除、取得泛型類型、型變、投影 | 協變、逆變**](https://devtechascendancy.com/explore-kotlin-java-generics_type_erasure/) * [**深入 Kotlin 函數特性:Inline、擴展、標準函數全解析 | 提升程式碼效能與可讀性**](https://devtechascendancy.com/kotlin_inline_extensions_standards-func/) * [**Kotlin DSL、操作符、中綴表達式 Infix | DSL 詳解 | DSL 設計與應用**](https://devtechascendancy.com/kotlin-dsl-operators-infix-explained/) ::: ### Kotlin 特性、特點 * **Kotlin 特性、特點**:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用 :::warning * [**Kotlin 代理與懶加載機制:使用、lazy 深度解析**](https://devtechascendancy.com/kotlin-delegate_java-proxy_lateinit_lazy/) * [**Kotlin Lambda 編程 & Bytecode | Array & Collections 集合 | 集合函數式 API**](https://devtechascendancy.com/kotlin-lambda-bytecode-array-collections-functional/) * [**深入理解 Kotlin:智能推斷與 Contact 規則**](https://devtechascendancy.com/kotlin-smart-inference-contract-rules-guide/) ::: ### Kotlin 進階:協程、響應式、異步 * **Kotlin 進階:協程、響應式、異步**:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章 :::danger * [**應用 Kotlin 協程:對比 Thread、創建協程、任務掛起 | Dispatcher、CoroutineContext、CoroutineScope**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/) * [**Kotlin Channel 使用介紹 | Select、Actor | 生產者消費者**](https://devtechascendancy.com/kotlin-channel_select_actor_cs/) * [**探索 Kotlin Flow:基本使用、RxJava 對比、背壓機制 | Flow 細節**](https://devtechascendancy.com/kotlin-flow-usage_compare-rx_backpressure/) ::: ## Appendix & FAQ :::info ::: ###### tags: `Kotlin`