---
title: 'Flow & RxJava'
disqus: kyleAlien
---
Flow & RxJava
===
## Overview of Content
我們在 Java 中常用的響應式編程模型就是 RxJava;而 Flow 就是 Koltin 結合 Coroutine 響應式編程的產物
:::info
響應式編程:簡單來想就是依照上一步反應而觸發不同的行為編程;最常見的就是 API 請求,由 API 請求的結果來驅動 APP 內的邏輯,讓它們之間產生對應的反應(像是很多的監聽者)
:::
[TOC]
## Flow 基本使用
Flow 使用起來類似於 RxJava,其函數的對應如無下
| 功能 | Flow | RxJava |
| ------------ | ---- | ------ |
| 對流發送資料 | emit() | onNext() |
| 接收資料 | collect() | subscribe() |
Flow 是一個介面
```kotlin=
// Flow 源碼
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
```
### flow 基本使用
* **Flow 可以返回(或是說發射 `emit`)多個異步計算的結果**,**並用 `collect` 接收結果**;FlowBuilder 有幾種使用方式,常用的方式如下
1. **使用頂層函數 `flow{ }`**:可以直接使用 `flow{ }` 創建 SafeFlow
```kotlin=
// 源碼
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
```
**`flow{ }` 的使用範例如下**:
```kotlin=
// 範例
suspend fun sampleUse01() = coroutineScope {
flow<Int> {
for (i in 1..5) {
val res = measureTimeMillis {
delay(1000L * i)
emit(i)
}
println("Use time: $res")
}
}.collect {
println("res: $it")
}
}
```
> 
2. **使用頂層函數 `flowOf{ }`**:其實內部就是透過 flow 並呼叫 `emit` 函數
```kotlin=
// 源碼
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
```
**`flowOf{ }` 使用範例如下**:
```kotlin=
// 範例
suspend fun sampleUse02() = coroutineScope {
flowOf("A", "B", "C", "D")
.onEach { // onEach 每次發送前執行
delay(100)
}
.collect {
println("it each: $it")
}
}
```
> 
3. **使用頂層函數 `asFlow`**:它是一個 `Iterable` 類的拓展泛型函數,同樣透過 `emit` 發送
```kotlin=
// 源碼
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
```
**`asFlow` 的使用範例如下**:
```kotlin=
// 範例
suspend fun sampleUse03() = coroutineScope {
listOf(1.1, 2.2, 3.3, 4.4, 5.5).asFlow()
.onEach { // onEach 每次發送前執行
delay(100)
}
.collect {
println("it each: $it")
}
}
```
4. 使用頂層函數 `channelFlow`,跟上面的 **差別在於,它透過 channel 的 send 發送**(Channel 的特點是可以與異步產生掛勾)
```kotlin=
// 源碼
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
```
**`channelFlow` 的使用範例如下**:
```kotlin=
// 範例
suspend fun sampleUse04() = coroutineScope {
channelFlow {
for (i in 1..5) {
val res = measureTimeMillis {
delay(1000L * i)
send(i)
}
println("Use time: $res")
}
}.collect {
println("channel res: $it")
}
}
```
> 
### Channel Flow 特點
* Channel Flow 與一般 Flow 兩個都是 **生產者跟消費者模型**,但是還是有差異,其差異如下
| Flow 種類 | 是否同步 | 特點 |
| --------- | -------- | ---- |
| Flow | 是 | 非掛起(非阻塞,CPU 沒有讓出資源) |
| ChannelFlow | 否、**異步** | **可切換上下文(不同 `Coroutine` 間通訊)** |
1. **Flow 使用範例**:
一般的 Flow 是「**同步**」的方式傳遞訊息,**必須等待 `collect` 處理完畢才可以進行下一次 `emit` 行為**
```kotlin=
suspend fun normalFlow_Sync() = coroutineScope {
val totalTime = measureTimeMillis {
flow { // 與使用 `flowOf` 則相同
for (it in 1 .. 5) { // 1 ~ 5
delay(100)
println("Hello, emit now: $it")
emit(it)
}
}.collect{
delay(500)
println("World, I get normal flow item: $it")
}
}
//
println("Flow with sync, total use time: $totalTime")
}
```
> 
概念圖如下,每次都必須傳輸(耗時 100ms)、收集(耗時 500ms)結束才往下一個事件走
```mermaid
graph TB
1 --> |100ms + 500ms| 2 --> |100ms + 500ms| 3 --> |100ms + 500ms| 4 --> |100ms + 500ms| 5
```
2. **ChannelFlow 使用範例**:
ChannelFlow 是「**異步**」傳送訊息,**不需等待 `collect` 處理完就可以繼續 `send` 訊息**(collect 仍需要等待)
```kotlin=
suspend fun channelFlow_Async() = coroutineScope {
val totalTime = measureTimeMillis {
channelFlow {
for (it in 1 .. 5) {
delay(100)
println("Hello, send now: $it")
send(it) // 可以一直發送,不用等待
}
}.collect{
delay(500) // 這裡必須等待
println("World, I get channel flow item: $it")
}
}
println("Flow with async, total use time: $totalTime")
}
```
> 
### Flow 切換 Thread - flowOn
* 在 Flow 中要切換 Thread 就需要使用到 `flowOn` 關鍵字,**`flowOn` 影響到的範圍是在 ++collect++ 之前**
:::info
類似 RxJava 中切換 Thread 的 `observeOn`、`subscribeOn` 方法
:::
> 以下範例將 `flow`、`map` 運作在 IO 執行序,`collect `接收在 Main 執行序接收!
```kotlin=
suspend fun flowSwitchThread() = coroutineScope {
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map {
println("------Map thread: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO) // 讓 flow、map 運作在 IO 上
.collect {
// collect 仍運作在 main
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
```
> 
:::warning
* 我們在使用 Coroutine 時是使用 `withContext` 函數做切換,但 **在 Flow 中不可以使用 `withContext` 切換**!
:::
* **Flow#`collect` 運作在哪個執行序是由 flow 在哪個執行序啟動為準**(以下指定運作 Flow 的執行序)
```kotlin=
suspend fun flowSwitchThread2() = coroutineScope {
withContext(newSingleThreadContext("HelloWorld")) {
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map {
println("------Map thread: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO)
.collect {
// 切換到 HelloWorld Thread
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
}
```
> 
* `flowOn` 是 **影響上游的操作**,所以一個 Flow 中可以多次切換 flowOn 去指定業務邏輯要在哪個不同 Thread 的運作;請注意以下 Thread 切換操作
```kotlin=
suspend fun flowSwitchThread3() = coroutineScope {
val customerDispatcher = newSingleThreadContext("HelloWorld")
flow{
for (i in 1..3) {
println("---Flow thread: ${Thread.currentThread().name}=$i")
emit(i)
}
}
.map { // map 1
println("------Map on customerDispatcher: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(customerDispatcher) // 影響上游操作: 影響 flow、map 1 兩個操作
.map { // map 2
println("------Map on IO: ${Thread.currentThread().name}=$it")
it * it
}
.flowOn(Dispatchers.IO) // 只影響 map 2 操作
.collect {
println("Collect thread: ${Thread.currentThread().name}=$it")
}
}
```
> 
### 取消、關閉 Flow 流
* Flow 是 **可以被關閉、取消**,範例如下…
```kotlin=
fun main() : Unit = runBlocking {
withTimeoutOrNull(2000) {
flow{
for (i in 1..5) {
delay(400)
emit(i)
println("---Flow=$i")
}
}.collect {
delay(400)
println("Collect get: $it")
}
}
println("Main done.")
}
```
> 
### 監聽 Flow 流
* 在 Flow 的運行中可以知道 Flow 的開始 `onStart`、結束 `onCompletion`
:::info
類似 RxJava 中的 `do` 函數
:::
```kotlin=
fun main() : Unit = runBlocking {
flow {
for (i in 1..5) {
emit(i)
println("---Flow=$i")
}
}.onStart {
println("onStart ~~")
}.onCompletion {
println("onCompletion ~~")
}.collect {
println("Collect get: $it")
}
}
```
> 
### Flow 的協程 & Sequences 的同步
* Flow 是按照順序執行,而 Sequences 也可以達到相同效果,但兩者仍有差異
| 順序類 | 發送數據的方法 | 差異 |
| - | - | - |
| `Flow` | `emit` | 同步,但是發送資料時內部是使用協程的機制,並不堵塞執行序 |
| `Sequences` | `yield` | 同步,內部 **不支援 suspend function**(會堵塞執行序) |
1. **Flow 非同步**:
以下範例中,Flow 在發送 (emit) 後資料後,會把 MainThread 讓出 (`delay` 函數) 讓其他需要 MainThread 的函數去使用
> 以下的測試將只會在 MainThread 執行
**測試的目的是為了確認 `Flow` 在執行 `suspend function` 時是否會堵塞 MainThread**,如果會堵塞則以下的 `launch` Lambda 將無法被運行
```kotlin=
suspend fun flowUse() = coroutineScope {
launch {
for(i in 1 .. 5) {
delay(100)
println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
}
}
flow {
for (i in 1 .. 5) {
delay(100)
emit(i)
println("flow item=${i}, Thread: ${Thread.currentThread().name}")
}
}.collect {
println("collect get=(${it}), Thread: ${Thread.currentThread().name}")
}
println("Done")
}
```
> 從結果來看 `flow` 是不對堵塞 CPU 的
>
> 
:::info
* 由上圖結果可知,`flow` 在運行 `suspend function` 時不會堵塞 MainThread,它會讓出 Thread 的使用權給其他需要的函數使用
> 從這裡我們也可以看出 `flow` 是使用了「協程, Coroutine」技術,才可以達到不堵塞單一執行序,而執行異步的行為!
:::
2. **Sequences 同步**:
**sequences 會堵塞 Mainthread 的使用(正確點來說是,當前運行的 Thread)** 直到它執行完畢
以下案例,同樣的程式,不過我們將 `flow` 換成 `sequence`,來觀察它是否會堵塞 MainThread 的執行
```kotlin=
suspend fun sequencesUse() = coroutineScope {
launch {
// sequences 會堵占 main 的使用
for(i in 1 .. 5) {
delay(100)
println("Launch item=${i}, Thread: ${Thread.currentThread().name}")
}
}
sequence {
for (i in 1 .. 5) {
Thread.sleep(100)
yield(i) // 等同 emit 的意思
println("sequence item=${i}, Thread: ${Thread.currentThread().name}")
}
}.forEach {
println("forEach get=(${it}), Thread: ${Thread.currentThread().name}")
}
println("Done")
}
```
> 可以看到 Sequence 佔用了當前 MainThread 的執行
>
> 
:::info
* 在特別強調一次:
**sequences 堵塞的是當前執行序**,而不是只堵塞 Mainthread
```kotlin=
// 創建指定 Thread 測試
fun main() : Unit = runBlocking(newSingleThreadContext("Hello")) { // 堵塞當前 thread
sequencesUse()
}
```
> 
:::
### 監看 Flow 結束:就算異常也要看到結束
* Flow 是一個 Suspend function,如果需要在 Flow 結束時 (不管是正常結束、還是拋出錯誤),通知使用者進行操作
> 這裡只說明如何判斷 Flow 的結束,並 **不包含 Flow 的錯誤捕捉**
* 可以通過以下兩種方式
1. **imperative**:用一個大的 try/finally 包裹,**透過 finally 通知使用者 Flow 已經完成**;
> 沒有 catch 的話就不能捕捉 Exception,只能通過 finally 知道最終結果
```kotlin=
suspend fun flowTryFinally() = coroutineScope {
try {
flowOf(1, 2, 3)
.map {
it * it
}
.collect {
if (it == 4) {
throw Exception("Test throw. $it")
}
}
} finally {
println("Try finally flow finish.")
}
}
fun main() : Unit = runBlocking {
flowTryFinally()
// 不會執行到
println("Main finish.")
}
```
> 
2. **declatative**:透過 `onCompletion` 函數,就可以達到上面 `finally` 的相同效果!
```kotlin=
suspend fun flowOnCompletion() = coroutineScope {
flowOf(1, 2, 3)
.map {
it * it
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
fun main() : Unit = runBlocking {
// flowTryFinally()
flowOnCompletion()
// 不會執行到
println("Main finish.")
}
```
> 
### Flow 異常處理 / 重試
* 上面我們說了 Flow 的結束,但並沒有說如何處理異常;而這邊我們就特別來說說 Flow 是如何處理異常的;**Flow 處理異常有兩種方案**
1. 使用傳統的 `try/catch` 處理
```kotlin=
suspend fun traditionTryCatch() {
try {
flowOf(1, 2, 3)
.map {
it * it
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
} catch (e : Exception) {
println("flow get exception: $e")
}
}
fun main() : Unit = runBlocking {
traditionTryCatch()
println("Main finish")
}
```
2. **使用 `catch` 操作符**:catch 操作符可以捕捉 **上游** 的操作錯誤
```kotlin=
suspend fun flowCatch() {
flowOf(1, 2, 3)
.map {
if (it == 3) {
throw Exception("Test throw. $it")
}
it * it
}
.catch {
// 捕捉上游
println("flow get exception: $it")
}
.onCompletion {
// 不影響下游
println("onCompletion flow finish. e: $it")
}
.collect {
println("Flow catch: $it.")
}
}
```
> 
:::warning
* 何謂上游錯誤 ? 就是在設定 `catch` 操作符之前的錯誤
```kotlin=
suspend fun flowCatch2() {
flowOf(1, 2, 3)
.catch {
// 這時就無法捕捉 map 中的錯誤
println("flow get exception: $it")
}
.map {
if (it == 3) {
throw Exception("Test throw. $it")
}
it * it
}
.onCompletion {
println("onCompletion flow finish. e: $it")
}
.collect {
println("Flow catch: $it.")
}
}
```
由此我們也 **可以知道 `catch` 是無法處理 `collect` 中的錯誤的**
> 
:::
* Flow 在發生錯誤時可以透過 `retry`、`retryWhen` 操作符對上游做重試
:::info
* **`retry` 重試次數是都是從 1 開始計算;`retryWhen` 重試次數是都是從 0 開始通知**
:::
1. `retry` 操作符可以透過 return Boolean 來決定是否再次重試;Return true 代表重試,否則不重試
```kotlin=
suspend fun flowCatchRetry() {
flowOf(1, 2, 3)
.onEach {
println("Current number=($it)")
if (it == 2) {
throw Exception("Test throw=($it)")
}
}
.retry(2) { // 重試兩次
if (it.message == "Test throw=(2)") {
println("Handle exception.")
return@retry true
}
false
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
```
> 
2. `retryWhen` 可以達到跟上述一樣的效果;`retryWhen` 會不斷地重試,並且它多了一個當前重試次數給使用者判斷
```kotlin=
suspend fun flowCatchRetryWhen() {
flowOf(1, 2, 3)
.onEach {
println("Current number=($it)")
if (it == 2) {
throw Exception("Test throw=($it)")
}
}
.retryWhen {e, times ->
println("Current try times=($times), e=($e)")
if(e is Exception) {
// 同 `retry`,返回 true 代表同意重試
return@retryWhen times < 2
}
false
}
.onCompletion {
println("onCompletion flow finish.")
}
.collect {
if (it == 9) {
throw Exception("Test throw. $it")
}
}
}
```
> 
## Flow & RxJava
Koltin 協程可以透過一些類的組合來達到等同於 RxJava 的效果,如下表
| RxJava | Corotines |
| - | - |
| `Single<T>` | `Defered<T>` |
| `Maybe<T>` | `Defered<T>` |
| `Completable` | `Job` |
| `Observable<T>` | `Channel<T>`、`Flow<T>` |
| `Flowable<T>` | `Channel<T>`、`Flow<T>` |
### Cold Stream 冷流:Flow
* Cold Stream 如同上面的範例一樣,**在呼叫 Flow#`collect` 後才開始運行 Flow 的流程**
> 如同 RxJava 使用 subscribe 函數才開始運行
1. 實驗一:延遲 `collect`,觀察是否會少收到 Flow 的資料
```kotlin=
@Test
fun testColdStream() {
var flow: Flow<String>? = null
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flow = flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach { // onEach 每次發送前執行
delay(100)
println("${Thread.currentThread().name}, send: $it")
}
}
println("start")
runBlocking {
// 延遲 `collect`
delay(300)
flow!!.collect {
println("${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
```
> 從結果可以看出來,我其實已經延遲 `collect`,但是 Flow 仍是等到有人 `collect` 才發送資料
>
> 
2. 實驗二:創建兩個 `collect`,並延遲 `collect`,觀察兩個是否會少收到 Flow 的資料
```kotlin=
@Test
fun testColdStream() {
var flow: Flow<String>? = null
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flow = flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach { // onEach 每次發送前執行
delay(100)
println("${Thread.currentThread().name}, send: $it")
}
}
println("start")
runBlocking {
delay(300)
flow!!.collect {
println("1111 ${Thread.currentThread().name}, receive: $it")
}
delay(100)
flow!!.collect {
println("2222 ${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
```
> 從結果可以看到,第二個 `collect` **仍會完整地收到全部的資料**
>
> 
### Hot Stream 熱流:MutableSharedFlow
* **Hot Stream 如同直播,數據要即時擷取否則過了就不會再見到**;Kotlin 則可以透過 `MutableSharedFlow` 的協助來到相同的效果
```kotlin=
@Test
fun testHotStream() {
// 創建發射器,熱流發射器
val flow: MutableSharedFlow<String> = MutableSharedFlow()
CoroutineScope(Dispatchers.IO).launch {
println("start flow: ${Thread.currentThread().name}")
flowOf("A", "B", "C", "D", "E", "F", "G")
.onEach {
delay(100)
println("${Thread.currentThread().name}, send: $it")
flow.emit(it)
}
.onCompletion {
println("Flow completed")
}
.launchIn(this) // 啟動 Flow,這裡是使用 launchIn
}
println("start")
runBlocking {
delay(300)
// 只會收到當下的數據!
flow.collect {
println("${Thread.currentThread().name}, receive: $it")
}
}
println("done")
}
```
> 
## Backpressure 背壓
首先先來介紹一下 **何謂 Backpressure** ? 我們知道 Flow 就像是一個生產者消費者模型,而 Backpressure 的情況則是 **生產者的產量遠遠大於消費者**
:::danger
Backpressure 產生後如果沒有策略處理則可能導致應用崩潰
:::
RxJava 有對 Backpressure 有相對應的策略,反映在 Flow 中也有相同的策略,請見下表
| RxJava | Flow | 說明 |
| - | - | - |
| `BUFFER` | buffer() | Buffer 用來儲存尚未處理的數據,沒有固定 Buffer 大小,有可能導致 OOM |
| `DROP` | - | Flow 緩衝池滿了,則拋棄準備進入緩衝池的新數據 |
| `LATEST` | conflate() | 行為同 `DROP`,但 `LETEST` 會強制將最後一個數據放入緩衝池中 |
### Flow 實現 BUFFER 策略
* 首先先來看看沒有 Buffer 時,Flow 面對消費者/生產者時間差,會有甚麼反應
```kotlin=
suspend fun flowWithoutBuffer() = coroutineScope {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
```
如下圖,我們可以看到沒有 Buffer 機制,那 flow 則要等待 `collect` 處理完,才能發送下一個 `emit` 數據
> 
* 以下使用 flow 來達成 RxJava `BUFFER` 的功能 (重點其實就是加了一個 `buffer` 函數)
```kotlin=
suspend fun flowWithBuffer() = coroutineScope {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.buffer() // 不限定則 Buffer 無限大
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
```
故意延遲 Comsumer 消耗,但這些仍在 Buffer 中,就不必等待 `collect` 處理完就可以 emit 下一個數據
> 
* Flow Buffer 如果沒有給定 `capacity` 限制則無限大,**假設有給定 `capacity` 數值,則須加設預設的 2 個容量**
```kotlin=
suspend fun flowBuffer_2() {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.buffer(1) // 它預設有 2 個 capacity
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
```
可以看到 buffer 明明設定為 1,不過 flow 在 emit 時直到 3 才真正堵塞,由此可見 **`buffer` 預設有 2 個 capacity**
> 
### BUFFER 策略:異步併發
* Flow 使用 `buffer` 操作就可以達到併發的效果 (如果 Buffer 尚未滿的情況下)
1. 首先我們知道一般 **非 Channel 的 Flow 是一個同步操作**,必須要等待 collect 操作完才可以執行下一個步驟;
```kotlin=
suspend fun flowNoBuffer() {
val uesTimes = measureTimeMillis {
flowOf(1, 2, 3, 4, 5)
.onEach {
delay(100)
}
.collect {
delay(500)
println("$it")
}
}
println("Without buffer=($uesTimes ms)")
}
```
> 
2. 這時如果多了 `buffer` 情況就會如同「**ChannelFlow**」,**不需等待 collect 結束就可以執行下一個操作**;可以達到類似 ChannelFlow 的效果
```kotlin=
suspend fun flowBufferAsChannel() {
val uesTimes = measureTimeMillis {
flowOf(1, 2, 3, 4, 5)
.onEach {
delay(100)
}
.buffer()
.collect {
delay(500)
println("$it")
}
}
println("Without buffer=($uesTimes ms)")
}
```
> 
### Flow 實現 LATEST 策略
* 使用 Flow 實現 RxJava 中的 `LATEST` (`LATEST` 的特色是會保存最後一個數據,我們就檢查最後一個數據是否有被保存)
```kotlin=
suspend fun flowLatest() {
fun curTime() = System.currentTimeMillis()
var startTimeStamp : Long = 0
flowOf(1, 2, 3, 4, 5)
.onStart {
startTimeStamp = curTime()
}.onEach {
println("Supplier $it (${curTime() - startTimeStamp} ms).")
}
.conflate() // LATEST 策略
.collect {
// 一個一個處理
println("Consumer $it start(${curTime() - startTimeStamp} ms).")
delay(500) // 延緩消費者
println("Consumer $it finish(${curTime() - startTimeStamp} ms).")
}
}
```
> 
## Flow 其他操作
### 轉換 transform
* 在 `transform` 操作符中幾個特點:可以多次 emit 數據、**emit 數據沒有限制**
1. 多次 emit
```kotlin=
suspend fun transformMultiEmit() = coroutineScope {
val startTimeStamp = System.currentTimeMillis()
(1..3).asFlow()
.transform {
println("Transform --- $it")
emit(it * 2) // 多次 emit
delay(100)
emit(it * 4)
}
.collect{
println("Collect($it), time=(${System.currentTimeMillis() - startTimeStamp})")
}
}
```
> 
2. emit 數據沒有限制:一般 Flow 是有限制 emit 數據類型,其類型必須與 Flow 相同,而 `transform` 內的 emit 則沒有限制
```kotlin=
suspend fun transformEmitOtherType() = coroutineScope {
val startTimeStamp = System.currentTimeMillis()
(1..3).asFlow()
.transform {
println("Transform --- $it")
emit(it) // 多次 emit
delay(100)
emit("Hello: $it")
}
.collect{
println("Collect=($it), time=(${System.currentTimeMillis() - startTimeStamp})")
}
}
```
> 
### 限制取用 take
* 一般的 Flow 在發射數據 (emit) 時都沒有限制,這時 **如果你要限制數據的發射接收數量,就可以使用 `take` 操作符**
```kotlin=
fun main() : Unit = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 限制接收數量
.collect {
println("Flow with take=($it)")
}
println("Main finish.")
}
```
> 
### Flow 計算結果
* Flow 除了 emit 以外我們還可以透過兩個操作符來聚集所有 Flow 的結果
1. **`reduce` 操作符**:獲取上一次的結果,返回下一個結果
```kotlin=
suspend fun flowReduce() = coroutineScope {
val res = (1..5)
.asFlow()
.reduce {lastValue, curValue ->
println("lastValue=($lastValue), curValue=($curValue)")
lastValue + curValue
}
println("Reduce=($res)")
}
```
> 
2. **`fold` 操作符**:跟 `reduce` 操作符很像,不過它可以設定初始值,透過初始值開始計算
```kotlin=
suspend fun flowFold() = coroutineScope {
val res = (1..5)
.asFlow()
// 初始值設定為 3,從 3 開始計算
.fold(3) {lastValue, curValue ->
println("lastValue=($lastValue), curValue=($curValue)")
lastValue + curValue
}
println("Reduce=($res)")
}
```
> 
### 合併操作符
* **Flow 也有合併操作符,可以合併兩個不同的 Flow**
1. **`zip` 操作符**:兩個不同的 Flow
```kotlin=
suspend fun flowZip() = coroutineScope {
val flowA = flowOf(1, 2, 3, 4, 5)
val flowB = flowOf("A", "B", "C", "D", "E")
flowA.zip(flowB) {
a, b ->
val tmp = "Zip flowA=($a), flowB=($b)"
println(tmp)
tmp
}.collect {
println("Collect=($it)")
}
}
```
> 
:::info
* 當兩個 Flow 的數據量不同時,會以最少數據量的 Flow 為準
```kotlin=
suspend fun flowZip2() = coroutineScope {
// 數量差異
val flowA = flowOf(1, 2, 3, 4)
val flowB = flowOf("A", "B", "C", "D", "E")
flowA.zip(flowB) {
a, b ->
val tmp = "Zip flowA=($a), flowB=($b)"
println(tmp)
tmp
}.collect {
println("Collect=($it)")
}
}
```
> 
:::
2. **`combine` 操作符**:與 `zip` 類似 (但不同);**當兩個 Flow 數量不同時**,不足處會取用最後一個數據來合併
```kotlin=
suspend fun flowCombine() = coroutineScope {
val flowA = flowOf(1, 2)
val flowB = flowOf("A", "B", "C", "D", "E")
flowA.combine(flowB) {
a, b ->
val tmp = "Combine flowA=($a), flowB=($b)"
println(tmp)
tmp
}.collect {
println("Collect=($it)")
}
}
```
> 
* 上面兩個操作符會將 Flow 合併成一個 Flow (算是合併、並聯);操作符 `flatMerge` 則是 **==串聯 Flow==,將數據會合成一個流**
```kotlin=
suspend fun flowConcat() = coroutineScope {
val flowA = flowOf(1, 2, 3, 4)
val flowB = flowOf("A", "B", "C", "D", "E")
flowOf(flowA, flowB)
.flattenConcat()
.collect {
println("Collect=($it)")
}
}
```
> 
### Nest Flow:Flat 鋪平
* 如果有使用到巢狀 (Nest) Flow 並控制其行為的狀況就可以使用 Flat 相關操作符
| Flat 相關操作符 | 特色 |
| -------- | -------- |
| flatMapConcat | 等待 Flat 內部完成才通知 Collect |
| flatMapMerge | 併發操作,不會等待 Flat 內部就直接通知 Collect |
| flatMapLatest | 當有第二次數據發送 (emit) 時,就會停止 Collect 接收 |
1. **`flatMapConcat` 操作符**:等待 Flat 內部完成才通知 Collect
```kotlin=
suspend fun flatConcat() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapConcat {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
```
> 
2. **`flatMapMerge` 操作符**:併發操作,不會等待 Flat 內部就直接通知 Collect
```kotlin=
suspend fun flatMerge() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapMerge {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
```
> 
3. **`flatMapLatest` 操作符**:當有第二次數據發送 (emit) 時,就會停止 Collect 接收
```kotlin=
suspend fun flatLatest() {
var startTime : Long = 0
(1..5)
.asFlow()
.onStart { startTime = System.currentTimeMillis() }
.flatMapLatest {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("Collect=($it), time=(${System.currentTimeMillis() - startTime} ms)")
}
}
```
> 
## 更多的 Kotlin 語言相關文章
在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容!
### Kotlin 語言基礎
* **Kotlin 語言基礎**:想要建立堅實的 Kotlin 基礎?以下這些文章將帶你深入探索 Kotlin 的關鍵基礎和概念,幫你打造更堅固的 Kotlin 語言基礎
:::info
* [**Kotlin 函數、類、屬性 | DataClass、Sealed、Object 關鍵字 | Enum、Companion、NPE**](https://devtechascendancy.com/kotlin-functions_oop_dataclass_sealed_object/)
* [**深入探究 Kotlin 與 Java 泛型:擦除、取得泛型類型、型變、投影 | 協變、逆變**](https://devtechascendancy.com/explore-kotlin-java-generics_type_erasure/)
* [**深入 Kotlin 函數特性:Inline、擴展、標準函數全解析 | 提升程式碼效能與可讀性**](https://devtechascendancy.com/kotlin_inline_extensions_standards-func/)
* [**Kotlin DSL、操作符、中綴表達式 Infix | DSL 詳解 | DSL 設計與應用**](https://devtechascendancy.com/kotlin-dsl-operators-infix-explained/)
:::
### Kotlin 特性、特點
* **Kotlin 特性、特點**:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用
:::warning
* [**Kotlin 代理與懶加載機制:使用、lazy 深度解析**](https://devtechascendancy.com/kotlin-delegate_java-proxy_lateinit_lazy/)
* [**Kotlin Lambda 編程 & Bytecode | Array & Collections 集合 | 集合函數式 API**](https://devtechascendancy.com/kotlin-lambda-bytecode-array-collections-functional/)
* [**深入理解 Kotlin:智能推斷與 Contact 規則**](https://devtechascendancy.com/kotlin-smart-inference-contract-rules-guide/)
:::
### Kotlin 進階:協程、響應式、異步
* **Kotlin 進階:協程、響應式、異步**:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章
:::danger
* [**應用 Kotlin 協程:對比 Thread、創建協程、任務掛起 | Dispatcher、CoroutineContext、CoroutineScope**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/)
* [**Kotlin Channel 使用介紹 | Select、Actor | 生產者消費者**](https://devtechascendancy.com/kotlin-channel_select_actor_cs/)
* [**探索 Kotlin Flow:基本使用、RxJava 對比、背壓機制 | Flow 細節**](https://devtechascendancy.com/kotlin-flow-usage_compare-rx_backpressure/)
:::
## Appendix & FAQ
:::info
:::
###### tags: `Kotlin`