# Kotlin coroutines - 實務 ###### tags: `Kotlin` `coroutines` # 2025/03/11 檢磅資訊中有等級, 我想把相同等級的檢磅資訊歸納成一群, 然後列表會有兩層, 第一層是 summary, 只顯示等級和數量, 第二層才是明細 item ``` interface SaGradeWeightDetailDelegate: CloneByUpdatedAtDelegate<SaGradeWeightDetailDelegate>, UpdatedAtDelegate { val theGradeWeightDetailId: String val theGradeWeightId: String val theGwdWeighingNo: String val theGwdReferenceNo: String val theGwdFarmerId: String val theGwdFarmerCode: String val theGwdFarmerName: String val theGwdProductId: String val theGwdProductCode: String val theGwdProductName: String val theGwdFarmerTeamId: String val theGwdFarmerTeamCode: String val theGwdFarmerTeamName: String val theGwdFarmerTeamLevelId: String /** 等級代碼, 6 */ val theGwdFarmerTeamLevelCode: String /** 等級名稃, 良上 */ val theGwdFarmerTeamLevelName: String /** 實際重量 */ val theGwdWeightActual: Float /** 扣重 */ val theGwdWeightDeduction: Float /** 計價重量 */ val theGwdWeightPricing: Float /** 磅秤編號 */ val theGwdScaleNo: Int val theGwdReceiptDate: String } interface SaGradeWeightDetailItemDelegate : TextLabelWrapperDelegate<SaGradeWeightDetailDelegate> data class SaGradeWeightDetailSummary( val theGwdFarmerTeamLevelCode: String, val theGwdFarmerTeamLevelName: String, val itemCount: Int, val items: List<SaGradeWeightDetailItemDelegate> ) ``` Kotlin 的 ``Iterable<T>.groupBy`` 可以先把原本 ``List<T>`` 依 ``T`` 中的 property 來當作 key, 建成 ``Map<K, List<T>>`` 這樣 檢磅資訊 List 就依不同等級分群了 然後 ``Map<out K, R>.map()`` 再把中間生成的 ``Map`` 轉成 summary List ``List<R>`` ``` val groupedItems = itemList .groupBy { Pair(it.theDataObject.theGwdFarmerTeamLevelCode, it.theDataObject.theGwdFarmerTeamLevelName) } .map { (key, items) -> SaGradeWeightDetailSummary( theGwdFarmerTeamLevelCode = key.first, theGwdFarmerTeamLevelName = key.second, itemCount = items.size, items = items ) } ``` # 2025/03/04 以下程式碼, ``launch`` 會建立兩個並行的 coroutine, 所以不會循序執行! kotlin: ``` private fun compoundTasks() { mixUiAndIoWork1() mixUiAndIoWork2() } private fun mixUiAndIoWork1 () { launch { val otherItem: SaItem? = withContext(Dispatchers.IO) { findOtherItem() } if (null != otherItem) { // update UI } else { Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork1 - otherItem is null!!") } } } private fun mixUiAndIoWork2 () { launch { val otherBuyer: SaBuyer? = withContext(Dispatchers.IO) { findOtherBuyer() } if (null != otherBuyer) { // update UI } else { Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork2 - otherBuyer is null!!") } } } ``` 改成使用 ``suspend fun``, 並在一個地方統一使用 ``launch`` 才能確保循序執行 ``` private fun compoundTasks() { launch { mixUiAndIoWork1() mixUiAndIoWork2() } } private suspend fun mixUiAndIoWork1 () { val otherItem: SaItem? = withContext(Dispatchers.IO) { findOtherItem() } if (null != otherItem) { // update UI } else { Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork1 - otherItem is null!!") } } private suspend fun mixUiAndIoWork2 () { val otherBuyer: SaBuyer? = withContext(Dispatchers.IO) { findOtherBuyer() } if (null != otherBuyer) { // update UI } else { Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork2 - otherBuyer is null!!") } } ``` --- # 2024/05/09 * [Calling Kotlin Suspending Functions from Java](https://www.baeldung.com/kotlin/suspend-functions-from-java) --- # 2024/04/30 ``async{}`` and ``await()`` 要依序執行 ## 錯誤: ``` fun triggerAsyncAwait() { triggerSyncInstantlyResultLiveData.value = null var syncResult = -1 val gallerySync: Deferred<FmApiResult<Int>> = async { Logger.getLogger(getLogTag()).log(Level.INFO,"triggerAsyncAwait() - gallerySync start") delay(10 * 1000L) Logger.getLogger(getLogTag()).log(Level.WARNING,"triggerSyncInstantly() - gallerySync end") FmApiResult.Success<Int>(0) } val galleryApplySync: Deferred<FmApiResult<Int>> = async { Logger.getLogger(getLogTag()).log(Level.INFO,"triggerAsyncAwait() - galleryApplySync start") delay(10 * 1000L) Logger.getLogger(getLogTag()).log(Level.WARNING,"triggerSyncInstantly() - galleryApplySync end") FmApiResult.Success<Int>(0) } launch (Dispatchers.IO) { when(val gallerySyncResult = gallerySync.await()) { is FmApiResult.Success -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - gallerySync success - on Thread: [${Thread.currentThread().name}]") } is FmApiResult.Error -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - gallerySync error - on Thread: [${Thread.currentThread().name}]", gallerySyncResult.cause) } } when(val galleryApplySyncResult = galleryApplySync.await()) { is FmApiResult.Success -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - galleryApplySync success - on Thread: [${Thread.currentThread().name}]") syncResult = 0 } is FmApiResult.Error -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - galleryApplySync error - on Thread: [${Thread.currentThread().name}]", galleryApplySyncResult.cause) } } triggerSyncInstantlyResultLiveData.postValue(syncResult) } } ``` 這樣會同時時執行 ``gallerySync``和 ``galleryApplySync`` ## 解法 參照這篇文章, 使用 ``for``, 在真正要執行時, 才引用 ``awaint {}.awit()`` 來執行背景任務, 這樣才會依序執行; 不然就是在 ``launch{}`` 中依序執行程式碼 * [How To Make Sequential Background Tasks with Kotlin Coroutines](https://medium.com/@manuchekhrdev/how-to-make-sequential-background-tasks-with-kotlin-coroutines-e94ccf9f45c1) ``` suspend fun delayFor(time: Long, tag: String): FmApiResult<Int> { Logger.getLogger(getLogTag()).log(Level.WARNING,"delayFor() - [$tag] start") delay(time) Logger.getLogger(getLogTag()).log(Level.WARNING,"delayFor() - [$tag] end") return FmApiResult.Success<Int>(0) } fun triggerAsyncAwait() { triggerSyncInstantlyResultLiveData.value = null var syncResult = -1 launch (Dispatchers.IO) { val list = listOf("gallerySync", "galleryApplySync") list.forEach { tag -> val result = async { delayFor(10 * 1000L, tag) }.await() when(result) { is FmApiResult.Success -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - [$it] success - on Thread: [${Thread.currentThread().name}]") } is FmApiResult.Error -> { Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - [$it] error - on Thread: [${Thread.currentThread().name}]", result.cause) } } if (tag == "galleryApplySync") { syncResult = 1} } triggerSyncInstantlyResultLiveData.postValue(syncResult) } } ``` # 2023/03/21 New [Kotlin Arrow Kt Eval Monad: A Beginner’s Guide](https://proandroiddev.com/kotlin-arrow-kt-eval-monad-a-beginners-guide-bff50e49d07c) [Exploring the Power of Kotlin Contracts for Better Code Quality](https://oguzhanaslann.medium.com/exploring-the-power-of-kotlin-contracts-for-better-code-quality-80bb279d7d2d) [Should you use getters and setters in Python?](https://python.plainenglish.io/should-you-use-getters-and-setters-in-python-d4db9a892878) # 2021/10/20 How to make "inappropriate blocking method call" appropriate? ### Google "okhttp inappropriate blocking method call" ## [False positive: "Inappropriate blocking method call" with coroutines and Dispatchers.IO](https://youtrack.jetbrains.com/issue/KTIJ-838) See this example (Android target): ``` @Throws(IOException::class) fun blockingFunc(): String{ Thread.sleep(4000) return "12" } private fun test() { GlobalScope.launch(Dispatchers.Main) { val v = withContext(Dispatchers.IO) { blockingFunc() // << Warning: "Inappropriate blocking method call" } println("v: $v") val v1 = async(Dispatchers.IO) { blockingFunc() // << no warning }.await() println("v: $v1") } } ``` The ``blockingFunc`` is marked as throwing ``IOException`` (that's how IDE detects "blocking calls"). But the function is __called in ``Dispatchers.IO`` dedicated to such calls, so doesn't block main thread (or calling coroutine)__. ``withContext(Dispatchers.IO)`` __behaves similar as calling__ ``delay()``, __it suspends coroutine, does not block__. In contrast, later call with ``async(Dispatchers.IO){}.await()`` __does similar thing - switches context and waits for result, yet there is not this warning__. It looks to me that this inspection is poorly documented and doesn't work properly. It guesses "blocking function" from ``@Throws`` annotation (which is not needed at all in Kotlin), so it may only guess in limited cases. Also blocking call may be anything else besides I/O, e.g. compute factorial. Finally, the warning is shown inconsistently and in my opinion it's wrong in this case. --- ## [How to make "inappropriate blocking method call" appropriate?](https://newbedev.com/how-to-make-inappropriate-blocking-method-call-appropriate) * [How to make "inappropriate blocking method call" appropriate?](https://stackoverflow.com/questions/58680028/how-to-make-inappropriate-blocking-method-call-appropriate) The warning is about methods that __block current thread and coroutine cannot be properly suspended__. This way, you __lose all benefits of coroutines and downgrade to one job per thread again__. Each case should be handled in a different way. For suspendable http calls you can use __Ktor http client__. But sometimes there is no library for your case, so you can either write your own solution or ignore this warning. Edit: ``withContext(Dispatchers.IO)`` or some custom dispatcher can be used to workaround the problem. Thanks for the comments. 結論, 使用 ``withContext(Dispatchers.IO)`` 去包住 blocking method calls 是一個可行的方式!! You also get this warning when calling a suspending function that is annotated with @Throws(IOException::class) (Kotlin 1.3.61). Not sure if that is intended or not. Anyway, you can suppress this warning by removing that annotation or changing it to Exception class. __Wrap__ the "inappropriate blocking method call" code in __another context__ using ``withContext``. That is to say (for example): If you are doing a read/write blocking method call: ``` val objects = withContext(Dispatchers.IO) { dao.getAll() } ``` If you are performing a blocking network request (using Retrofit): ``` val response = withContext(Dispatchers.IO) { call.execute() } ``` Or if you are performing a CPU intensive blocking task: ``` val sortedUsers = withContext(Dispatchers.Default) { users.sortByName() } ``` This will __suspend the current coroutine, then execute the "inappropriate blocking call" on a different thread__ (from either the Dispatchers.IO or Dispatchers.Default pools), thereby __not blocking the thread your coroutine is executing on__. --- * 利用 Coroutine 實作 Long-running Task - [Kotlin coroutines - Long-running Task](https://hackmd.io/2mlMN-wtScCQTiF7IQAfHA) 不知能否用 ``Flow`` 來實作? # 2021/05/26 針對 Flow 1. 與 suspend functions 相比, 是 Flow 可以 emit 多個 sequential values 2. 具有 Stream of data 的概念 (``Iterator``) 3. 以 Produce / Consume data 是以 asynchronous 方式執行 (suspend function, e.g., ``emit()``, ``collect()``) 4. Google 以 Producer, Intermediary 和 Consumer 三個角色來看它的 Operator 4.1. Producer class: 利用 ``Flow`` 來 ``emit()`` 4.2. Consumer class: 利用 ``Flow`` 來 ``collect()`` ## 建立 Flow 建立 ``Flow<Type>`` 可以使用 Flow builder ``flow()``, 傳入一個 suspend block: ``` public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block) ``` e.g., ``` val latestNews: Flow<List<ArticleHeadline>> = flow { while(continueFlag) { showLog(Log.INFO, getLogTag(), "NewsRemoteDataSource - flow block") val latestNews = newsApi.fetchLatestNews() emit(latestNews) // Emits the result of the request to the flow delay(refreshIntervalMs) // Suspends the coroutine for some time } } ``` ``` val timeStampFlow: Flow<Long> = flow { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}") for (i in 1..10) { delay(250) val timeStamp = System.currentTimeMillis() emit(timeStamp) showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}") } } ``` 1. ``emit()`` 為 suspend function, 為 Producer 的概念 2. Flow 在 coroutine 中運行, 當執行到 suspend function, Flow 就暫停直到 suspend function 執行完成 3. 使用 flow builder ``flow{}`` 建立的 ``Flow<Type>`` 為一個 __cold ``Flow``__ 4. 本身跑在 coroutine 中 (由當下 ``CoroutineContext``決定決定) --> 所以無法在其它 ``CoroutineContext`` 下進行 ``emit()`` 也就是巢狀利用 ``launch`` 或 ``withContext`` 只能有 ``suspend functions`` --- 仔細看, Flow builder ``flow()`` 和 ``launch()`` 不同, 並沒有傳入 ``CoroutineContext``!! 並且後來有推出 ``flowOn()`` 可以指定 ``Dispatchers.IO`` (類似 Rx 的 ``subscribeOn()``) 預設, flow 會跑在 terminal operator 的 coroutine 的 coroutineContext 上, ``flowOn()`` 會把 upstream flow 的 CoroutineContext 改掉 但不會影響到 terminal operator 的 coroutineContext 這樣的方式像 Rx 精神類似 , 把 Producer 和 Intermediary 都串好: ``` class FlowChainTest01 { fun buildFlowChain(): Flow<Long> { val timeStampFlow: Flow<Long> = flow { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}") for (i in 1..10) { delay(250) val timeStamp = System.currentTimeMillis() emit(timeStamp) showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}") } } return timeStampFlow .map { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - map - timeStamp: [$it], on Thread: ${Thread.currentThread().name}") it % 10L } .onEach { val value = it.toInt() showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - onEach - value: [$value], on Thread: ${Thread.currentThread().name}") } .flowOn(Dispatchers.IO) } } ``` 最後再給 ``launch()`` 去 Consume Data!! ``` class FlowTerminalTest01 { private val supervisorJob = SupervisorJob() private var currentTaskJob: Job? = null val context: CoroutineContext = Job() + Dispatchers.Main //private val coroutineScope = CoroutineScope(supervisorJob) private val coroutineScope = CoroutineScope(context) fun cancelCurrentJobIfNeeded() { if (null != currentTaskJob) { showLog(Log.WARN, "FlowTerminalTest01", "cancelCurrentJobIfNeeded - currentTaskJob - isActive: ${currentTaskJob!!.isActive}, isCancelled: ${currentTaskJob!!.isCancelled}, on Thread: ${Thread.currentThread().name}") if ((currentTaskJob!!.isActive) && (!(currentTaskJob!!.isCancelled)) ) { currentTaskJob!!.cancel() showLog(Log.WARN, this@FlowTerminalTest01.getLogTag(), "[!! cancel current task job !!], on Thread: ${Thread.currentThread().name}") } } } fun testFlowByLaunch() { cancelCurrentJobIfNeeded() currentTaskJob = CoroutineScope(context).launch { //observeSomething showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}") try { FlowChainTest01().buildFlowChain().collect { showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [$it], on Thread: ${Thread.currentThread().name}") } } catch (cause: Throwable) { showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", cause) } } } fun testFlowByLaunchIn() { cancelCurrentJobIfNeeded() currentTaskJob = FlowChainTest01().buildFlowChain() .onStart { showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunchIn - onStart, on Thread: ${Thread.currentThread().name}") } .launchIn(coroutineScope) } } ``` ``` I/FlowTerminalTest01: testFlowByLaunch - launch, on Thread: main I/FlowChainTest01: buildFlowChain - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994677734], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [4], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994677734], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [4], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994677993], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [3], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994677993], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [3], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678250], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [0], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994678250], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [0], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678507], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [7], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994678507], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [7], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678763], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [3], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994678763], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [3], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679022], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [2], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994679022], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [2], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679281], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [1], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994679281], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [1], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679538], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [8], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994679538], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [8], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679794], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [4], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994679794], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [4], on Thread: main I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994680052], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [2], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621994680052], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [2], on Thread: main ``` flow 在遇到 Exception 時, 就會中止了! ``` I/FlowTerminalTest01: testFlowByLaunchIn - onStart, on Thread: main I/FlowChainTest01: buildFlowChain - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621996311958], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - onEach - value: [8], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - emit [1621996311958], on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621996312210], on Thread: DefaultDispatcher-worker-1 E/FlowTerminalTest01: testFlowByLaunchIn - catch, on Thread: main tw.com.goglobal.project.baseframework.apis.IeRuntimeException: Throw error deliberately ``` 若不想終止, 要另外用其它法處理, 例如, 每次都回傳 ``IeApiResponse`` ``` data class IeApiResponse<T>( val result: T?, val error: IeRuntimeException? ) { override fun toString() = "IeApiResponse {result: " + (result ?: "Null") + ", error: " + (error?: "Null") + "}" } ``` ``` fun buildFlowChain2(): Flow<IeApiResponse<Long>> { val timeStampFlow: Flow<IeApiResponse<Long>> = flow { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}") for (i in 1..10) { delay(250) val timeStamp = System.currentTimeMillis() emit(IeApiResponse<Long>(timeStamp, null)) showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}") } } return timeStampFlow .map { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - map - timeStamp: [$it], on Thread: ${Thread.currentThread().name}") if (null != it.result) { val result = it.result!! % 10L if(result == 0L) { IeApiResponse<Long>(null, IeRuntimeException("Throw error deliberately", "00000")) } else { IeApiResponse<Long>(result, null) } } else { showLog(Log.ERROR, "FlowChainTest01", "buildFlowChain - map - it.result is null!!") it } } .onEach { if (null != it.result) { val value = it.result!!.toInt() showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - onEach - value: [$value], on Thread: ${Thread.currentThread().name}") } else { showLog(Log.ERROR, "FlowChainTest01", "buildFlowChain - onEach - it.result is null!!") } } .flowOn(Dispatchers.IO) } ``` ``` fun testFlowByLaunch2() { cancelCurrentJobIfNeeded() currentTaskJob = CoroutineScope(context).launch { showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}") FlowChainTest01().buildFlowChain2().collect { if (null != it.result) { showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [${it.result!!}], on Thread: ${Thread.currentThread().name}") } else if (null != it.error) { showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", it.error!!) } } } } ``` --- ``launchIn()`` 為 Terminal flow operator, 可以指定 ``coroutineScope`` 實際上是 ``coroutineScope.launch { flow.collect() }`` 的重新包裝 常與 ``onEach`, ``onCompletion`` ``catch`` 作搭配: 使用 ``launchIn()``, 在記得加上 ``catch()`` 去處理 Exception, 不然程式會 crashed e.g., ``` flow .onEach { value -> updateUi(value) } .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") } .catch { cause -> LOG.error("Exception: $cause") } .launchIn(uiScope) ``` --- ### [Differences in methods of collecting Kotlin Flows](https://itnext.io/differences-in-methods-of-collecting-kotlin-flows-3d1d4efd1c2) ``.collect()`` 與 ``.launchIn()`` 之間的區別 1. ``.collect()`` 是 suspending method ``.launchIn()`` 不是 suspending method 所以, ``.collect()`` 只能在其它 suspending method 中使用 但 ``.launchIn()`` 可以在所有 method 中使用 2. ``.launchIn()`` 還會回傳 ``Job`` Instance, 所以可以用來取消 coroutine 3. 測試與觀看原始碼, 會發現在某些情況下使用 ``.collect()`` 整個 coroutine (``{...}`` 中) 會被暫停, 所以 ``runBlocking{}`` 中會整個暫停 但使用 ``.launchIn()`` 時, 它會使用所傳入的 ``CoroutineScope`` 來建立另一個 coroutine, 然後去進行 ``.collect()``, 所以不會影響到原本的 coroutine 作者建議使用 ``.launchIn()``! 然後小心使用 ``.collect()`` --- ### Transform / Intermediary operators: ``map{}`` ``onEach{}`` 類似 Rx 的 ``doOnNext()`` intermediate operators 像 map(), onEach() map() 的 block crossinline transform: suspend (value: T) -> R 有回傳值, onEach()的 block action: suspend (T) -> Unit 沒有回傳值, ### Terminal / Consumer operators ``collect``, ``toList``, ``first``, ``single`` --- source code 有這段註解: ``` * Creates a _cold_ flow from the given suspendable [block]. * The flow being _cold_ means that the [block] is called every time a terminal operator is applied to the resulting flow. * * Example of usage: * * ``` * fun fibonacci(): Flow<BigInteger> = flow { * var x = BigInteger.ZERO * var y = BigInteger.ONE * while (true) { * emit(x) * x = y.also { * y += x * } * } * } * * fibonacci().take(100).collect { println(it) } * ``` * * Emissions from [flow] builder are [cancellable] by default &mdash; each call to [emit][FlowCollector.emit] * also calls [ensureActive][CoroutineContext.ensureActive]. * * `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context. * For example, the following code will result in an [IllegalStateException]: * * ``` * flow { * emit(1) // Ok * withContext(Dispatcher.IO) { * emit(2) // Will fail with ISE * } * } * ``` * * If you want to switch the context of execution of a flow, use the [flowOn] operator. ``` --- # 2021/06/14 這段測試 Flow 的 Code, Producer block 會依序 emit 10個 timestamp 然後 中間兩個 intermediate operator 會先後將 timestamp 轉成 Date, 再把 Date 轉成 string 最後被 consumer (collect) 處理 ``` class FlowChainTest01 { fun buildFlowChain(): Flow<IeApiResponse<String>> { val timeStampFlow: Flow<IeApiResponse<Long>> = flow { showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}") for (i in 1..10) { delay(250) val timeStamp = System.currentTimeMillis() emit(IeApiResponse<Long>(timeStamp, null)) showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}") } } return timeStampFlow .map { apiResponseLongToDate(it) //apiResponseLongToDate2(it) } .map { //apiResponseDateToString(it) apiResponseDateToString2(it) } .onEach { apiResponseStringPrint(it) } .flowOn(Dispatchers.IO) } private fun apiResponseLongToDate(longApiResponse: IeApiResponse<Long>): IeApiResponse<Date> { showLog(Log.INFO, "FlowChainTest01", "apiResponseLongToDate - on Thread: ${Thread.currentThread().name}") return if (null != longApiResponse.result) { showLog(Log.INFO, "FlowChainTest01", "apiResponseLongToDate - timeStamp: {${longApiResponse.result}}") val theDate :Date = Date() IeApiResponse<Date>(theDate, null) } else { showLog(Log.ERROR, "FlowChainTest01", "apiResponseLongToDate - longApiResponse.result is null!!") val error = IeRuntimeException("Timestamp is null", BaseErrorCodes.ILLEGAL_ARGUMENT_ERROR) IeApiResponse<Date>(null, error) } } private val apiResponseLongToDate2: (IeApiResponse<Long>) -> IeApiResponse<Date> = { response -> apiResponseLongToDate(response) } private fun apiResponseDateToString(dateApiResponse: IeApiResponse<Date>): IeApiResponse<String> { showLog(Log.INFO, "FlowChainTest01", "apiResponseDateToString - on Thread: ${Thread.currentThread().name}") return if (null != dateApiResponse.result) { showLog(Log.INFO, "FlowChainTest01", "apiResponseDateToString - timeStamp: {${dateApiResponse.result!!.time}}") val theDateString = dateApiResponse.result!!.toDateFormatString() IeApiResponse<String>(theDateString, null) } else { showLog(Log.ERROR, "FlowChainTest01", "apiResponseDateToString - dateApiResponse.result is null!!") val error = IeRuntimeException("Date is null", BaseErrorCodes.ILLEGAL_ARGUMENT_ERROR) IeApiResponse<String>(null, error) } } private val apiResponseDateToString2: (IeApiResponse<Date>) -> IeApiResponse<String> = { response -> apiResponseDateToString(response) } private fun apiResponseStringPrint(stringApiResponse: IeApiResponse<String>) { showLog(Log.INFO, "FlowChainTest01", "apiResponseStringPrint - on Thread: ${Thread.currentThread().name}") if (null != stringApiResponse.result) { showLog(Log.INFO, "FlowChainTest01", "apiResponseStringPrint - dateString: {${stringApiResponse.result!!}}") } else { showLog(Log.ERROR, "FlowChainTest01", "apiResponseDateToString - stringApiResponse.result is null!!") } } } ``` ``` class FlowTerminalTest01 { private val supervisorJob = SupervisorJob() private var currentTaskJob: Job? = null val context: CoroutineContext = Job() + Dispatchers.Main //private val coroutineScope = CoroutineScope(supervisorJob) private val coroutineScope = CoroutineScope(context) fun cancelCurrentJobIfNeeded() { if (null != currentTaskJob) { showLog(Log.WARN, "FlowTerminalTest01", "cancelCurrentJobIfNeeded - currentTaskJob - isActive: ${currentTaskJob!!.isActive}, isCancelled: ${currentTaskJob!!.isCancelled}, on Thread: ${Thread.currentThread().name}") if ((currentTaskJob!!.isActive) && (!(currentTaskJob!!.isCancelled)) ) { currentTaskJob!!.cancel() showLog(Log.WARN, "FlowTerminalTest01", "[!! cancel current task job !!], on Thread: ${Thread.currentThread().name}") } } } fun testFlowByLaunch() { cancelCurrentJobIfNeeded() currentTaskJob = CoroutineScope(context).launch { //observeSomething showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}") FlowChainTest01().buildFlowChain().collect { if (null != it.result) { showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [${it.result!!}], on Thread: ${Thread.currentThread().name}") } else if (null != it.error) { showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", it.error!!) } } } } } ``` 可以發現 Flow 的 Log, 每一 Round 是從 Producer -> intermediate operators 處理完後, 才會emit 下一個值 而 consumer 因為在不同 Thread, 會再每一 round 的 最後一個 intermediate operator 處理完後才接收到值 還有因為我 log 是印在 emit 之後, 結果是``apiResponseStringPrint`` 跑完後才出現 ``buildFlowChain2 - emit [1623682021202]`` ``` I/FlowChainTest01: buildFlowChain2 - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021202} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021204} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701} I/FlowChainTest01: buildFlowChain2 - emit [1623682021202], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021462} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021463} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701} I/FlowChainTest01: buildFlowChain2 - emit [1623682021462], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021721} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021722} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701} I/FlowChainTest01: buildFlowChain2 - emit [1623682021721], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021980} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021981} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701} I/FlowChainTest01: buildFlowChain2 - emit [1623682021980], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022240} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022241} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702} I/FlowChainTest01: buildFlowChain2 - emit [1623682022240], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022501} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022501} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702} I/FlowChainTest01: buildFlowChain2 - emit [1623682022501], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022761} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022762} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702} I/FlowChainTest01: buildFlowChain2 - emit [1623682022761], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023023} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023024} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703} I/FlowChainTest01: buildFlowChain2 - emit [1623682023023], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023297} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023298} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703} I/FlowChainTest01: buildFlowChain2 - emit [1623682023297], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023569} I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023571} I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1 I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703} I/FlowChainTest01: buildFlowChain2 - emit [1623682023569], on Thread: DefaultDispatcher-worker-1 I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main ``` --- # 2021/06/24 使用 ``launchWhenCreated()``, ``launchWhenStarted()``, ``launchWhenResumed()`` 會在某些 LifeCycle 區間內才 collect value 以 ``launchWhenStarted()`` 為例, ``Activity`` / ``Fragment`` 只有在 ``onStart`` 到 ``onStop`` 的週期內才會 ``collect`` 若單純使用 ``launch``, 那會直到 Job 被取消, 或 ``viewLifecycleOwner.lifecycleScope`` 結束為止 從下面的 Log 可以看見 ``StateFlow`` coroutine 持續在 ``TestFlowAndLiveDataViewModel1`` 中運行, 每秒更新一次 ``StateFlow``的 ``value`` 但 ``TestFlowFragment`` 只有在 ``onStart`` 到 ``onStop`` 的週期才會 ``collect`` ``` I/System.out: 09:46:02.262 [main] INFO TestFlowAndLiveDataViewModel1 - startIncrementCounterStateFlowPeriodically I/System.out: 09:46:02.263 [main] INFO TestFlowAndLiveDataViewModel1 - modifyIsRunningPeriodicalJobFlag - result: [true] I/System.out: 09:46:03.276 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499163276] I/System.out: 09:46:03.281 [main] INFO TestFlowFragment - collect from StateFlow: [1s}] I/System.out: 09:46:04.290 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499164290] I/System.out: 09:46:04.293 [main] INFO TestFlowFragment - collect from StateFlow: [2s}] I/System.out: 09:46:05.302 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499165302] [main] INFO TestFlowFragment - collect from StateFlow: [3s}] ... I/System.out: 09:46:21.610 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499181609] I/System.out: 09:46:21.619 [main] INFO TestFlowFragment - collect from StateFlow: [19s}] I/System.out: 09:46:22.225 [main] INFO TestFlowFragment - onPause I/System.out: 09:46:22.271 [main] INFO TestFlowFragment - onStop I/System.out: 09:46:23.644 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499183644] I/System.out: 09:46:24.660 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499184660] I/System.out: 09:46:25.677 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499185676] ... I/System.out: 09:46:59.174 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499219174] I/System.out: 09:46:59.880 [main] INFO TestFlowFragment - onStart I/System.out: 09:46:59.882 [main] INFO TestFlowFragment - collect from StateFlow: [56s}] I/System.out: 09:46:59.894 [main] INFO TestFlowFragment - onResume I/System.out: 09:47:00.181 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499220181] I/System.out: 09:47:00.183 [main] INFO TestFlowFragment - collect from StateFlow: [57s}] I/System.out: 09:47:01.189 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499221189] I/System.out: 09:47:01.194 [main] INFO TestFlowFragment - collect from StateFlow: [58s}] I/System.out: 09:47:02.210 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499222209] I/System.out: 09:47:02.219 [main] INFO TestFlowFragment - collect from StateFlow: [59s}] ``` --- # [Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html) 建立 ``Flow`` 乃是使用 ``flow { ... }`` builder ``Flow`` 是 Cold, 要有使引用 ``collect()`` 或 ``launchIn()``,它才會開始運作 另外,``launchIn()`` 會回傳 ``Job`` 物件,讓外部可以取消它; 使用 ``collect()`` 是無法取消它的 (因為它沒有回傳任何 ``Job`` 物件) ### __``collect``__: ``` /** * Terminal flow operator that collects the given flow with a provided [action]. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. * * Example of use: * * ``` * val flow = getMyEvents() * try { * flow.collect { value -> * println("Received $value") * } * println("My events are consumed successfully") * } catch (e: Throwable) { * println("Exception from the flow: $e") * } * ``` */ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) }) ``` __Sample__: ``` fun foo(): Flow<Int> = flow { println("Flow started") for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { println("Calling foo...") val flow = foo() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } } ``` --- 像 RxJava 一樣, 1. 有 __``take``__ operator 來取前幾個物件 ### ``take`` ``` /** * Returns a flow that contains first [count] elements. * When [count] elements are consumed, the original flow is cancelled. * Throws [IllegalArgumentException] if [count] is not positive. */ public fun <T> Flow<T>.take(count: Int): Flow<T> { require(count > 0) { "Requested element count $count should be positive" } return flow { var consumed = 0 try { collect { value -> if (++consumed < count) { return@collect emit(value) } else { return@collect emitAbort(value) } } } catch (e: AbortFlowException) { e.checkOwnership(owner = this) } } } ``` ``` fun numbers(): Flow<Int> = flow { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally in numbers") } } fun main() = runBlocking<Unit> { numbers() .take(2) // take only the first two .collect { value -> println(value) } } ``` 2. 可以進行 Transform ``` /** * Returns a flow containing the results of applying the given [transform] function to each value of the original flow. */ public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value -> return@transform emit(transform(value)) } ``` ``` suspend fun performRequest(request: Int): String { delay(1000) // imitate long-running asynchronous work return "response $request" } // map() fun main() = runBlocking<Unit> { (1..3).asFlow() // a flow of requests .map { request -> performRequest(request) } .collect { response -> println(response) } } ``transform()`` fun main() = runBlocking<Unit> { (1..3).asFlow() // a flow of requests .transform { request -> emit("Making request $request") emit(performRequest(request)) } .collect { response -> println(response) } } ``` 3. Conversion to various collections like ``toList`` and ``toSet``. Operators to get the ``first`` value and to ensure that a flow emits a single value. Reducing a flow to a value with ``reduce`` and ``fold``. --- 資料發送,要看最末端的 ``collect`` 接收後,才會再送出下一個 ``` (1..5).asFlow() .filter { println("Filter $it") it % 2 == 0 } .map { println("Map $it") "string $it" }.collect { println("Collect $it") } ``` ``` Filter 1 Filter 2 Map 2 Collect string 2 Filter 3 Filter 4 Map 4 Collect string 4 Filter 5 ``` --- Flow 可使用``flowOn()`` 來切換 Thread! 使用 ``withContext()`` 時要小心別弄錯方式 ``` public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } ``` ### 正確方式 ``` fun foo(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(100) // pretend we are computing it in CPU-consuming way log("Emitting $i") emit(i) // emit next value } }.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder fun main() = runBlocking<Unit> { foo().collect { value -> log("Collected $value") } } ``` ``` withContext(context) { foo.collect { value -> println(value) // run in the specified context } } ``` ### 錯誤方式 ``` fun foo(): Flow<Int> = flow { // The WRONG way to change context for CPU-consuming code in flow builder kotlinx.coroutines.withContext(Dispatchers.Default) { for (i in 1..3) { Thread.sleep(100) // pretend we are computing it in CPU-consuming way emit(i) // emit next value } } } fun main() = runBlocking<Unit> { foo().collect { value -> println(value) } } ``` --- ``` public interface FlowCollector<in T> { /** * Collects the value emitted by the upstream. * This method is not thread-safe and should not be invoked concurrently. */ public suspend fun emit(value: T) } ``` --- 只要使用``runBlocking`` 目前使用的Thread就會被 Block住, 直到內部 Block (``{}``)裡面的所有 Coroutine 都執行完成為止 使用``launch``用來啟動一個新的 Coroutine 使用``withContext``來將目前的工作排到某一個Thread執行 ``async + await`` 目前實作的次數不足,無法深刻體會... 下面這個程式片段, 為每個 for loop 中的工作使用``launch``用來啟動一個新的 Coroutine, 然後每個 Coroutine 都使用``withContext``來將目前的工作排到某一個Thread執行。 因為我們需要每個 Coroutine 若有找到結果,就寫入``subnetDeviceMap`` 當所有 Coroutine 都執行完後,就把結果``subnetDeviceMap``通知給``delegate`` 但這樣的方法,無法得知為每個 Coroutine 到底執行完成了沒? 因此只好利用``runBlocking``來等所有 Coroutine 執行完成, 才把結果``subnetDeviceMap``通知給``delegate`` 只是``runBlocking``會把目前使用的Thread就會被 Block住, 若是從 Main Thread 呼叫,就會卡住 Main Thread, 所以特別宣告 ``` val coroutineContext: CoroutineContext = Dispatchers.IO ``` 並將它當 Parameter 傳入 ``runBlocking``,才不會卡住 Main Thread ``` val coroutineContext: CoroutineContext = Dispatchers.IO val finalSubnetDeviceMap = runBlocking<Map<String, String>>(coroutineContext) { val subnetDeviceMap = hashMapOf<String, String>() errorMap.clear() for (ipAddress in ipAddressList) { launch { try { val serialNumber = withContext(Dispatchers.IO) { findSubnetDevice(ipAddress) } Log.i(getLogTag(), "^_^Y find serialNumber $serialNumber for ipAddress: $ipAddress") subnetDeviceMap[serialNumber] = ipAddress } catch(cause: IeSocketException) { errorMap[ipAddress] = cause } } } subnetDeviceMap } delegate.invoke(IeApiResult.Success(finalSubnetDeviceMap)) ``` 根據 [Parallel Map in Kotlin](https://jivimberg.io/blog/2018/05/04/parallel-map-in-kotlin/),上述使用 ``runBlocking`` 並不是一個好做法! 後來參考 [RxJava to Kotlin coroutines by Chris Banes](https://medium.com/androiddevelopers/rxjava-to-kotlin-coroutines-1204c896a700) 與 [CoroutineExtensions.kt](https://github.com/chrisbanes/tivi/blob/master/base/src/main/java/app/tivi/extensions/CoroutineExtensions.kt) 使用 ``Collection<A>.parallelMap``: ``` suspend fun <A, B> Collection<A>.parallelMap( concurrency: Int = defaultConcurrency, block: suspend (A) -> B ): List<B> = coroutineScope { val semaphore = Channel<Unit>(concurrency) map { item -> async { semaphore.send(Unit) // Acquire concurrency permit try { block(item) } finally { semaphore.receive() // Release concurrency permit } } }.awaitAll() } ``` 並將原本的程式片段改成,這樣就可以達成類似 ``RxJava`` 中 ``flatMap``的效果, 可以同時開許多個 Thread 執行 ``Collection`` 中元件的任務! 要注意的是,在 ``RxJava`` 中使用 ``flatMap``時, 也要``subscribeOn``將``Collection`` 中元件的任務安排到 Thread執行; 同理,在 Coroutine 中要使用 ``withContext``將``Collection`` 中元件的任務安排到 Thread執行! ``` launch { val resultList: List<SubnetPingResult<String>> = ipAddressList.parallelMap(getCustomizedThreadCount()) { ipAddress -> try { val serialNumber = withContext(Dispatchers.IO) { findSubnetDevice(ipAddress) } Log.i(getLogTag(), "^_^Y find serialNumber $serialNumber for ipAddress: $ipAddress - on Thread: ${Thread.currentThread().name}") SubnetPingResult<String>(ipAddress, result = serialNumber) } catch(cause: IeSocketException) { SubnetPingResult<String>(ipAddress, cause = cause) } } delegate.invoke(IeApiResult.Success(resultList)) } ```