# Kotlin coroutines - In Practice
###### tags: `Kotlin` `coroutines`
# 2025/03/11
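The weighing records carry a grade, and I want to group records with the same grade together so that the list has two levels:
the first level is a summary that shows only the grade and the item count,
the second level holds the detail items.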
```
interface SaGradeWeightDetailDelegate:
CloneByUpdatedAtDelegate<SaGradeWeightDetailDelegate>, UpdatedAtDelegate
{
val theGradeWeightDetailId: String
val theGradeWeightId: String
val theGwdWeighingNo: String
val theGwdReferenceNo: String
val theGwdFarmerId: String
val theGwdFarmerCode: String
val theGwdFarmerName: String
val theGwdProductId: String
val theGwdProductCode: String
val theGwdProductName: String
val theGwdFarmerTeamId: String
val theGwdFarmerTeamCode: String
val theGwdFarmerTeamName: String
val theGwdFarmerTeamLevelId: String
/** Grade code, e.g. 6 */
val theGwdFarmerTeamLevelCode: String
/** Grade name, e.g. 良上 */
val theGwdFarmerTeamLevelName: String
/** Actual weight */
val theGwdWeightActual: Float
/** Deducted weight */
val theGwdWeightDeduction: Float
/** Pricing weight */
val theGwdWeightPricing: Float
/** Scale number */
val theGwdScaleNo: Int
val theGwdReceiptDate: String
}
interface SaGradeWeightDetailItemDelegate : TextLabelWrapperDelegate<SaGradeWeightDetailDelegate>
data class SaGradeWeightDetailSummary(
val theGwdFarmerTeamLevelCode: String,
val theGwdFarmerTeamLevelName: String,
val itemCount: Int,
val items: List<SaGradeWeightDetailItemDelegate>
)
```
Kotlin's ``Iterable<T>.groupBy`` first turns the original
``List<T>`` into a ``Map<K, List<T>>``, using a property of ``T`` as the key.
That groups the weighing-record list by grade.
Then ``Map<out K, V>.map()`` converts the intermediate ``Map`` into the summary list ``List<R>``.
```
val groupedItems = itemList
.groupBy {
Pair(it.theDataObject.theGwdFarmerTeamLevelCode, it.theDataObject.theGwdFarmerTeamLevelName)
}
.map { (key, items) ->
SaGradeWeightDetailSummary(
theGwdFarmerTeamLevelCode = key.first,
theGwdFarmerTeamLevelName = key.second,
itemCount = items.size,
items = items
)
}
```
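The same grouping can be tried outside the project with simplified types. This is only a sketch: ``WeightDetail``, ``GradeSummary`` and ``summarize`` are hypothetical stand-ins for the real delegate classes, and the sample data is made up.

```kotlin
// Hypothetical, simplified stand-ins for the real delegate types.
data class WeightDetail(val gradeCode: String, val gradeName: String, val weight: Float)

data class GradeSummary(
    val gradeCode: String,
    val gradeName: String,
    val itemCount: Int,
    val items: List<WeightDetail>
)

// Group by (code, name), then turn each map entry into one summary row.
fun summarize(details: List<WeightDetail>): List<GradeSummary> =
    details
        .groupBy { it.gradeCode to it.gradeName }
        .map { (key, items) ->
            GradeSummary(key.first, key.second, items.size, items)
        }

fun main() {
    // Made-up sample data.
    val details = listOf(
        WeightDetail("6", "良上", 10.5f),
        WeightDetail("5", "Sample", 8.0f),
        WeightDetail("6", "良上", 12.0f)
    )
    summarize(details).forEach {
        println("${it.gradeCode} ${it.gradeName}: ${it.itemCount} item(s)")
    }
}
```

``groupBy`` keeps the keys in first-seen order, so the summaries appear in the order in which each grade first occurs in the input list.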
# 2025/03/04
In the code below, each ``launch`` creates its own coroutine, so the two tasks run concurrently rather than sequentially!
kotlin:
```
private fun compoundTasks() {
mixUiAndIoWork1()
mixUiAndIoWork2()
}
private fun mixUiAndIoWork1 () {
launch {
val otherItem: SaItem? = withContext(Dispatchers.IO) {
findOtherItem()
}
if (null != otherItem) {
// update UI
}
else {
Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork1 - otherItem is null!!")
}
}
}
private fun mixUiAndIoWork2 () {
launch {
val otherBuyer: SaBuyer? = withContext(Dispatchers.IO) {
findOtherBuyer()
}
if (null != otherBuyer) {
// update UI
}
else {
Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork2 - otherBuyer is null!!")
}
}
}
```
Changing them to ``suspend fun`` and calling ``launch`` in a single place is what guarantees sequential execution
```
private fun compoundTasks() {
launch {
mixUiAndIoWork1()
mixUiAndIoWork2()
}
}
private suspend fun mixUiAndIoWork1 () {
val otherItem: SaItem? = withContext(Dispatchers.IO) {
findOtherItem()
}
if (null != otherItem) {
// update UI
}
else {
Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork1 - otherItem is null!!")
}
}
private suspend fun mixUiAndIoWork2 () {
val otherBuyer: SaBuyer? = withContext(Dispatchers.IO) {
findOtherBuyer()
}
if (null != otherBuyer) {
// update UI
}
else {
Logger.getLogger(getLogTag()).log(Level.WARNING, "mixUiAndIoWork2 - otherBuyer is null!!")
}
}
```
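The difference in ordering can be observed in a small standalone sketch. It assumes ``kotlinx-coroutines-core`` is on the classpath; ``task1``, ``task2`` and the delays are hypothetical stand-ins for the IO work above.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical tasks: the delays stand in for the IO work in the notes.
suspend fun task1(log: MutableList<String>) { delay(100); log += "task1" }
suspend fun task2(log: MutableList<String>) { delay(10); log += "task2" }

// One launch per task: two coroutines, so the shorter task finishes first.
fun runConcurrently(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        launch { task1(log) }
        launch { task2(log) }
    } // runBlocking returns only after both children complete
    return log
}

// A single launch calling both suspend functions: strictly one after the other.
fun runSequentially(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        launch {
            task1(log)
            task2(log)
        }
    }
    return log
}

fun main() {
    println(runConcurrently())
    println(runSequentially())
}
```

With these delays, the concurrent variant records ``task2`` before ``task1``, while the sequential variant keeps the call order.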
---
# 2024/05/09
* [Calling Kotlin Suspending Functions from Java](https://www.baeldung.com/kotlin/suspend-functions-from-java)
---
# 2024/04/30
``async{}`` and ``await()`` must run in sequence
## Wrong:
```
fun triggerAsyncAwait() {
triggerSyncInstantlyResultLiveData.value = null
var syncResult = -1
val gallerySync: Deferred<FmApiResult<Int>> = async {
Logger.getLogger(getLogTag()).log(Level.INFO,"triggerAsyncAwait() - gallerySync start")
delay(10 * 1000L)
Logger.getLogger(getLogTag()).log(Level.WARNING,"triggerSyncInstantly() - gallerySync end")
FmApiResult.Success<Int>(0)
}
val galleryApplySync: Deferred<FmApiResult<Int>> = async {
Logger.getLogger(getLogTag()).log(Level.INFO,"triggerAsyncAwait() - galleryApplySync start")
delay(10 * 1000L)
Logger.getLogger(getLogTag()).log(Level.WARNING,"triggerSyncInstantly() - galleryApplySync end")
FmApiResult.Success<Int>(0)
}
launch (Dispatchers.IO) {
when(val gallerySyncResult = gallerySync.await()) {
is FmApiResult.Success -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - gallerySync success - on Thread: [${Thread.currentThread().name}]")
}
is FmApiResult.Error -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - gallerySync error - on Thread: [${Thread.currentThread().name}]", gallerySyncResult.cause)
}
}
when(val galleryApplySyncResult = galleryApplySync.await()) {
is FmApiResult.Success -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - galleryApplySync success - on Thread: [${Thread.currentThread().name}]")
syncResult = 0
}
is FmApiResult.Error -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - galleryApplySync error - on Thread: [${Thread.currentThread().name}]", galleryApplySyncResult.cause)
}
}
triggerSyncInstantlyResultLiveData.postValue(syncResult)
}
}
```
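This runs ``gallerySync`` and ``galleryApplySync`` at the same time, because ``async`` starts each coroutine as soon as it is called.
## Solution
Following the article below, iterate over the tasks (``forEach``) and call
``async {}.await()`` only at the point where each background task should actually run, so that they execute in sequence;
alternatively, simply run the code sequentially inside ``launch{}``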
* [How To Make Sequential Background Tasks with Kotlin Coroutines](https://medium.com/@manuchekhrdev/how-to-make-sequential-background-tasks-with-kotlin-coroutines-e94ccf9f45c1)
```
suspend fun delayFor(time: Long, tag: String): FmApiResult<Int> {
Logger.getLogger(getLogTag()).log(Level.WARNING,"delayFor() - [$tag] start")
delay(time)
Logger.getLogger(getLogTag()).log(Level.WARNING,"delayFor() - [$tag] end")
return FmApiResult.Success<Int>(0)
}
fun triggerAsyncAwait() {
triggerSyncInstantlyResultLiveData.value = null
var syncResult = -1
launch (Dispatchers.IO) {
val list = listOf("gallerySync", "galleryApplySync")
list.forEach { tag ->
val result = async { delayFor(10 * 1000L, tag) }.await()
when(result) {
is FmApiResult.Success -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - [$tag] success - on Thread: [${Thread.currentThread().name}]")
}
is FmApiResult.Error -> {
Logger.getLogger(getLogTag()).log(Level.SEVERE,"triggerSyncInstantly() - [$tag] error - on Thread: [${Thread.currentThread().name}]", result.cause)
}
}
if (tag == "galleryApplySync") { syncResult = 1}
}
triggerSyncInstantlyResultLiveData.postValue(syncResult)
}
}
```
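A smaller sketch of the same point, assuming ``kotlinx-coroutines-core`` is available; ``step`` is a hypothetical stand-in for the sync tasks. Starting both ``async`` blocks before awaiting lets them overlap, whereas calling the suspend function directly (which has the same effect as an immediate ``async {}.await()``) keeps the steps in order.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical background step that records when it starts and ends.
suspend fun step(name: String, log: MutableList<String>): Int {
    log += "$name start"
    delay(50)
    log += "$name end"
    return 0
}

// Both async blocks are started before either is awaited, so they overlap.
fun eager(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        val a = async { step("a", log) }
        val b = async { step("b", log) }
        a.await()
        b.await()
    }
    return log
}

// Each step is started only when the previous one has finished.
fun sequential(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        step("a", log)
        step("b", log)
    }
    return log
}

fun main() {
    println(eager())
    println(sequential())
}
```

In the eager variant ``b start`` is logged before ``a end``; in the sequential variant ``a end`` always comes first.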
# 2023/03/21
New
[Kotlin Arrow Kt Eval Monad: A Beginner’s Guide](https://proandroiddev.com/kotlin-arrow-kt-eval-monad-a-beginners-guide-bff50e49d07c)
[Exploring the Power of Kotlin Contracts for Better Code Quality](https://oguzhanaslann.medium.com/exploring-the-power-of-kotlin-contracts-for-better-code-quality-80bb279d7d2d)
[Should you use getters and setters in Python?](https://python.plainenglish.io/should-you-use-getters-and-setters-in-python-d4db9a892878)
# 2021/10/20
How to make "inappropriate blocking method call" appropriate?
### Google "okhttp inappropriate blocking method call"
## [False positive: "Inappropriate blocking method call" with coroutines and Dispatchers.IO](https://youtrack.jetbrains.com/issue/KTIJ-838)
See this example (Android target):
```
@Throws(IOException::class)
fun blockingFunc(): String{
Thread.sleep(4000)
return "12"
}
private fun test() {
GlobalScope.launch(Dispatchers.Main) {
val v = withContext(Dispatchers.IO) {
blockingFunc() // << Warning: "Inappropriate blocking method call"
}
println("v: $v")
val v1 = async(Dispatchers.IO) {
blockingFunc() // << no warning
}.await()
println("v: $v1")
}
}
```
The ``blockingFunc`` is marked as throwing ``IOException``
(that's how IDE detects "blocking calls").
But the function is __called in ``Dispatchers.IO`` dedicated to such calls,
so doesn't block main thread (or calling coroutine)__.
``withContext(Dispatchers.IO)`` __behaves similar as calling__ ``delay()``,
__it suspends coroutine, does not block__.
In contrast, later call with ``async(Dispatchers.IO){}.await()`` __does similar thing -
switches context and waits for result, yet there is not this warning__.
It looks to me that this inspection is poorly documented
and doesn't work properly. It guesses "blocking function"
from ``@Throws`` annotation (which is not needed at all in Kotlin),
so it may only guess in limited cases.
Also blocking call may be anything else besides I/O,
e.g. compute factorial.
Finally, the warning is shown inconsistently and in my opinion it's wrong in this case.
---
## [How to make "inappropriate blocking method call" appropriate?](https://newbedev.com/how-to-make-inappropriate-blocking-method-call-appropriate)
* [How to make "inappropriate blocking method call" appropriate?](https://stackoverflow.com/questions/58680028/how-to-make-inappropriate-blocking-method-call-appropriate)
The warning is about methods that
__block current thread and coroutine cannot be properly suspended__.
This way, you __lose all benefits of coroutines and downgrade to one job per thread again__.
Each case should be handled in a different way.
For suspendable http calls you can use __Ktor http client__.
But sometimes there is no library for your case,
so you can either write your own solution or ignore this warning.
Edit: ``withContext(Dispatchers.IO)`` or some custom dispatcher
can be used to workaround the problem. Thanks for the comments.
Conclusion: wrapping blocking method calls in ``withContext(Dispatchers.IO)`` is a workable approach!!
You also get this warning when calling a suspending function that is annotated with @Throws(IOException::class) (Kotlin 1.3.61). Not sure if that is intended or not. Anyway, you can suppress this warning by removing that annotation or changing it to Exception class.
__Wrap__ the "inappropriate blocking method call" code in __another context__ using ``withContext``.
That is to say (for example):
If you are doing a read/write blocking method call:
```
val objects = withContext(Dispatchers.IO) { dao.getAll() }
```
If you are performing a blocking network request (using Retrofit):
```
val response = withContext(Dispatchers.IO) { call.execute() }
```
Or if you are performing a CPU intensive blocking task:
```
val sortedUsers = withContext(Dispatchers.Default) { users.sortByName() }
```
This will __suspend the current coroutine,
then execute the "inappropriate blocking call" on a different thread__
(from either the Dispatchers.IO or Dispatchers.Default pools),
thereby __not blocking the thread your coroutine is executing on__.
---
* Implementing a long-running task with coroutines - [Kotlin coroutines - Long-running Task](https://hackmd.io/2mlMN-wtScCQTiF7IQAfHA)
Could this be implemented with ``Flow``?
# 2021/05/26
About Flow
1. Compared with suspend functions, a Flow can emit multiple sequential values
2. It embodies the idea of a stream of data (like ``Iterator``)
3. Producing and consuming data happen asynchronously (through suspend functions, e.g., ``emit()``, ``collect()``)
4. Google describes its operators in terms of three roles: Producer, Intermediary, and Consumer
4.1. Producer class: uses ``Flow`` to ``emit()``
4.2. Consumer class: uses ``Flow`` to ``collect()``
## 建立 Flow
A ``Flow<Type>`` can be created with the flow builder ``flow()``, which takes a suspend block:
```
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
```
e.g.,
```
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(continueFlag) {
showLog(Log.INFO, getLogTag(), "NewsRemoteDataSource - flow block")
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
```
```
val timeStampFlow: Flow<Long> = flow {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}")
for (i in 1..10) {
delay(250)
val timeStamp = System.currentTimeMillis()
emit(timeStamp)
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}")
}
}
```
1. ``emit()`` is a suspend function and represents the Producer role
2. A Flow runs inside a coroutine; when it reaches a suspend function,
the Flow pauses until that suspend function completes
3. A ``Flow<Type>`` created with the flow builder ``flow{}`` is a __cold ``Flow``__
4. It runs in a coroutine (determined by the current ``CoroutineContext``)
--> so ``emit()`` cannot be called under a different ``CoroutineContext``;
that is, a nested ``launch`` or ``withContext`` may contain only other ``suspend functions``, not ``emit()``
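What "cold" means in point 3 can be checked with a short sketch (assuming ``kotlinx-coroutines-core``; ``coldFlowDemo`` and ``blockRuns`` are hypothetical names): the builder block does not run when the flow is created, and it runs again for every terminal operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns how many times the builder block ran after two collections.
fun coldFlowDemo(): Int {
    var blockRuns = 0
    val numbers: Flow<Int> = flow {
        blockRuns++               // counts each start of the block
        for (i in 1..3) emit(i)
    }
    check(blockRuns == 0)         // creating the flow runs nothing
    runBlocking {
        numbers.toList()          // first terminal operator: block runs
        numbers.toList()          // second terminal operator: block runs again
    }
    return blockRuns
}

fun main() {
    println(coldFlowDemo())
}
```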
---
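Look closely: unlike ``launch()``, the flow builder ``flow()`` does not take a ``CoroutineContext``!!
``flowOn()`` was introduced later so that a dispatcher such as ``Dispatchers.IO`` can be specified (similar to Rx's ``subscribeOn()``)
By default, a flow runs in the coroutineContext of the coroutine that calls the terminal operator;
``flowOn()`` changes the CoroutineContext of the upstream flow
but does not affect the coroutineContext of the terminal operator
This is similar in spirit to Rx: chain the Producer and the Intermediary together first: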
```
class FlowChainTest01 {
fun buildFlowChain(): Flow<Long> {
val timeStampFlow: Flow<Long> = flow {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}")
for (i in 1..10) {
delay(250)
val timeStamp = System.currentTimeMillis()
emit(timeStamp)
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}")
}
}
return timeStampFlow
.map {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - map - timeStamp: [$it], on Thread: ${Thread.currentThread().name}")
it % 10L
}
.onEach {
val value = it.toInt()
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - onEach - value: [$value], on Thread: ${Thread.currentThread().name}")
}
.flowOn(Dispatchers.IO)
}
}
```
Finally, hand it to ``launch()`` to consume the data!!
```
class FlowTerminalTest01 {
private val supervisorJob = SupervisorJob()
private var currentTaskJob: Job? = null
val context: CoroutineContext = Job() + Dispatchers.Main
//private val coroutineScope = CoroutineScope(supervisorJob)
private val coroutineScope = CoroutineScope(context)
fun cancelCurrentJobIfNeeded() {
if (null != currentTaskJob) {
showLog(Log.WARN, "FlowTerminalTest01",
"cancelCurrentJobIfNeeded - currentTaskJob - isActive: ${currentTaskJob!!.isActive}, isCancelled: ${currentTaskJob!!.isCancelled}, on Thread: ${Thread.currentThread().name}")
if ((currentTaskJob!!.isActive) && (!(currentTaskJob!!.isCancelled)) ) {
currentTaskJob!!.cancel()
showLog(Log.WARN, this@FlowTerminalTest01.getLogTag(), "[!! cancel current task job !!], on Thread: ${Thread.currentThread().name}")
}
}
}
fun testFlowByLaunch() {
cancelCurrentJobIfNeeded()
currentTaskJob = CoroutineScope(context).launch {
//observeSomething
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}")
try {
FlowChainTest01().buildFlowChain().collect {
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [$it], on Thread: ${Thread.currentThread().name}")
}
}
catch (cause: Throwable) {
showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", cause)
}
}
}
fun testFlowByLaunchIn() {
cancelCurrentJobIfNeeded()
currentTaskJob = FlowChainTest01().buildFlowChain()
.onStart {
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunchIn - onStart, on Thread: ${Thread.currentThread().name}")
}
.launchIn(coroutineScope)
}
}
```
```
I/FlowTerminalTest01: testFlowByLaunch - launch, on Thread: main
I/FlowChainTest01: buildFlowChain - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994677734], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [4], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994677734], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [4], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994677993], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [3], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994677993], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [3], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678250], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [0], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994678250], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [0], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678507], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [7], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994678507], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [7], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994678763], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [3], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994678763], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [3], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679022], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [2], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994679022], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [2], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679281], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [1], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994679281], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [1], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679538], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [8], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994679538], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [8], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994679794], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [4], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994679794], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [4], on Thread: main
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621994680052], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [2], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621994680052], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [2], on Thread: main
```
A flow terminates as soon as it hits an Exception!
```
I/FlowTerminalTest01: testFlowByLaunchIn - onStart, on Thread: main
I/FlowChainTest01: buildFlowChain - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621996311958], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - onEach - value: [8], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - emit [1621996311958], on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: buildFlowChain - map - timeStamp: [1621996312210], on Thread: DefaultDispatcher-worker-1
E/FlowTerminalTest01: testFlowByLaunchIn - catch, on Thread: main
tw.com.goglobal.project.baseframework.apis.IeRuntimeException: Throw error deliberately
```
If you don't want it to terminate, handle errors some other way, for example by always returning an ``IeApiResponse``
```
data class IeApiResponse<T>(
val result: T?,
val error: IeRuntimeException?
) {
override fun toString() = "IeApiResponse {result: " + (result ?: "Null") + ", error: " + (error?: "Null") + "}"
}
```
```
fun buildFlowChain2(): Flow<IeApiResponse<Long>> {
val timeStampFlow: Flow<IeApiResponse<Long>> = flow {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}")
for (i in 1..10) {
delay(250)
val timeStamp = System.currentTimeMillis()
emit(IeApiResponse<Long>(timeStamp, null))
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}")
}
}
return timeStampFlow
.map {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - map - timeStamp: [$it], on Thread: ${Thread.currentThread().name}")
if (null != it.result) {
val result = it.result!! % 10L
if(result == 0L) {
IeApiResponse<Long>(null, IeRuntimeException("Throw error deliberately", "00000"))
}
else {
IeApiResponse<Long>(result, null)
}
}
else {
showLog(Log.ERROR, "FlowChainTest01", "buildFlowChain - map - it.result is null!!")
it
}
}
.onEach {
if (null != it.result) {
val value = it.result!!.toInt()
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - onEach - value: [$value], on Thread: ${Thread.currentThread().name}")
}
else {
showLog(Log.ERROR, "FlowChainTest01", "buildFlowChain - onEach - it.result is null!!")
}
}
.flowOn(Dispatchers.IO)
}
```
```
fun testFlowByLaunch2() {
cancelCurrentJobIfNeeded()
currentTaskJob = CoroutineScope(context).launch {
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}")
FlowChainTest01().buildFlowChain2().collect {
if (null != it.result) {
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [${it.result!!}], on Thread: ${Thread.currentThread().name}")
}
else if (null != it.error) {
showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", it.error!!)
}
}
}
}
```
---
``launchIn()`` is a terminal flow operator that takes a ``coroutineScope``
It is essentially a wrapper around ``coroutineScope.launch { flow.collect() }``
It is often combined with ``onEach``, ``onCompletion``, and ``catch``:
When using ``launchIn()``, remember to add ``catch()`` to handle Exceptions, otherwise the app will crash
e.g.,
```
flow
.onEach { value -> updateUi(value) }
.onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
.catch { cause -> LOG.error("Exception: $cause") }
.launchIn(uiScope)
```
---
### [Differences in methods of collecting Kotlin Flows](https://itnext.io/differences-in-methods-of-collecting-kotlin-flows-3d1d4efd1c2)
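Differences between ``.collect()`` and ``.launchIn()``
1.
``.collect()`` is a suspending method
``.launchIn()`` is not a suspending method
So ``.collect()`` can only be used inside another suspending method,
while ``.launchIn()`` can be used in any method
2.
``.launchIn()`` also returns a ``Job`` instance, which can be used to cancel the coroutine
3. Testing and reading the source show that ``.collect()``
suspends the calling coroutine (the code inside ``{...}``) until the flow completes, so the code after it inside ``runBlocking{}`` waits as well
With ``.launchIn()``, the passed-in ``CoroutineScope`` is used
to create another coroutine that performs ``.collect()``, so the original coroutine is not affected
The author recommends ``.launchIn()``! Use ``.collect()`` with care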
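The third difference can be reproduced with a minimal sketch, assuming ``kotlinx-coroutines-core`` 1.6 or later (where ``collect { }`` needs no extra import); ``ticks``, ``withCollect`` and ``withLaunchIn`` are hypothetical names.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical flow that emits two values with a short pause before each.
fun ticks(): Flow<Int> = flow {
    for (i in 1..2) {
        delay(20)
        emit(i)
    }
}

// collect suspends the caller: the line after it runs only when the flow is done.
fun withCollect(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        ticks().collect { log += "value $it" }
        log += "after collect"
    }
    return log
}

// launchIn starts a separate coroutine and returns its Job immediately.
fun withLaunchIn(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        val job = ticks().onEach { log += "value $it" }.launchIn(this)
        log += "after launchIn"
        job.join() // wait here only so the sketch can inspect the log
    }
    return log
}

fun main() {
    println(withCollect())
    println(withLaunchIn())
}
```

The returned ``Job`` is also what ``job.cancel()`` would be called on to stop the collection early.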
---
### Transform / Intermediary operators:
``map{}``
``onEach{}`` is similar to Rx's ``doOnNext()``
Intermediate operators include ``map()`` and ``onEach()``:
the block of ``map()``, ``crossinline transform: suspend (value: T) -> R``, returns a value,
while the block of ``onEach()``, ``action: suspend (T) -> Unit``, returns nothing.
### Terminal / Consumer operators
``collect``, ``toList``, ``first``, ``single``
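A brief sketch of these operators together, assuming ``kotlinx-coroutines-core``; ``operatorDemo`` is a hypothetical name. ``map`` changes the values, ``onEach`` only observes them, and each terminal operator triggers its own collection.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun operatorDemo(): Triple<List<Int>, Int, Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val doubled = flowOf(1, 2, 3)
        .map { it * 2 }        // transform: returns a new value
        .onEach { seen += it } // side effect only: the value passes through unchanged
    Triple(
        doubled.toList(),      // collects every value into a List
        doubled.first(),       // takes the first value and stops collecting
        flowOf(42).single()    // expects exactly one value, throws otherwise
    )
}

fun main() {
    println(operatorDemo())
}
```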
---
The source code has this comment:
```
* Creates a _cold_ flow from the given suspendable [block].
* The flow being _cold_ means that the [block] is called every time a terminal operator is applied to the resulting flow.
*
* Example of usage:
*
* ```
* fun fibonacci(): Flow<BigInteger> = flow {
* var x = BigInteger.ZERO
* var y = BigInteger.ONE
* while (true) {
* emit(x)
* x = y.also {
* y += x
* }
* }
* }
*
* fibonacci().take(100).collect { println(it) }
* ```
*
* Emissions from [flow] builder are [cancellable] by default — each call to [emit][FlowCollector.emit]
* also calls [ensureActive][CoroutineContext.ensureActive].
*
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
* For example, the following code will result in an [IllegalStateException]:
*
* ```
* flow {
* emit(1) // Ok
* withContext(Dispatcher.IO) {
* emit(2) // Will fail with ISE
* }
* }
* ```
*
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
```
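The rule in that comment can be tried out with a small sketch, assuming ``kotlinx-coroutines-core``; ``broken``, ``fixed`` and the two helper functions are hypothetical names. ``Dispatchers.Default`` is used here only to keep the sketch free of Android specifics.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emits from inside withContext, which violates the flow context.
val broken: Flow<Int> = flow<Int> {
    emit(1)
    withContext(Dispatchers.Default) {
        emit(2) // fails with IllegalStateException when collected
    }
}

// Same values, but the context switch is expressed with flowOn instead.
val fixed: Flow<Int> = flow<Int> {
    emit(1)
    emit(2)
}.flowOn(Dispatchers.Default)

// True if collecting the broken flow throws IllegalStateException.
fun brokenFails(): Boolean = runBlocking {
    try {
        broken.toList()
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun fixedValues(): List<Int> = runBlocking { fixed.toList() }

fun main() {
    println(brokenFails())
    println(fixedValues())
}
```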
---
# 2021/06/14
In this Flow test code, the Producer block emits 10 timestamps in sequence.
Two intermediate operators then convert each timestamp to a Date
and the Date to a string, which is finally handled by the consumer (collect)
```
class FlowChainTest01 {
fun buildFlowChain(): Flow<IeApiResponse<String>> {
val timeStampFlow: Flow<IeApiResponse<Long>> = flow {
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - timeStampFlow - start, on Thread: ${Thread.currentThread().name}")
for (i in 1..10) {
delay(250)
val timeStamp = System.currentTimeMillis()
emit(IeApiResponse<Long>(timeStamp, null))
showLog(Log.INFO, "FlowChainTest01", "buildFlowChain - emit [$timeStamp], on Thread: ${Thread.currentThread().name}")
}
}
return timeStampFlow
.map {
apiResponseLongToDate(it)
//apiResponseLongToDate2(it)
}
.map {
//apiResponseDateToString(it)
apiResponseDateToString2(it)
}
.onEach {
apiResponseStringPrint(it)
}
.flowOn(Dispatchers.IO)
}
private fun apiResponseLongToDate(longApiResponse: IeApiResponse<Long>): IeApiResponse<Date> {
showLog(Log.INFO, "FlowChainTest01", "apiResponseLongToDate - on Thread: ${Thread.currentThread().name}")
return if (null != longApiResponse.result) {
showLog(Log.INFO, "FlowChainTest01", "apiResponseLongToDate - timeStamp: {${longApiResponse.result}}")
val theDate :Date = Date()
IeApiResponse<Date>(theDate, null)
}
else {
showLog(Log.ERROR, "FlowChainTest01", "apiResponseLongToDate - longApiResponse.result is null!!")
val error = IeRuntimeException("Timestamp is null", BaseErrorCodes.ILLEGAL_ARGUMENT_ERROR)
IeApiResponse<Date>(null, error)
}
}
private val apiResponseLongToDate2: (IeApiResponse<Long>) -> IeApiResponse<Date>
= { response -> apiResponseLongToDate(response) }
private fun apiResponseDateToString(dateApiResponse: IeApiResponse<Date>): IeApiResponse<String> {
showLog(Log.INFO, "FlowChainTest01", "apiResponseDateToString - on Thread: ${Thread.currentThread().name}")
return if (null != dateApiResponse.result) {
showLog(Log.INFO, "FlowChainTest01", "apiResponseDateToString - timeStamp: {${dateApiResponse.result!!.time}}")
val theDateString = dateApiResponse.result!!.toDateFormatString()
IeApiResponse<String>(theDateString, null)
}
else {
showLog(Log.ERROR, "FlowChainTest01", "apiResponseDateToString - dateApiResponse.result is null!!")
val error = IeRuntimeException("Date is null", BaseErrorCodes.ILLEGAL_ARGUMENT_ERROR)
IeApiResponse<String>(null, error)
}
}
private val apiResponseDateToString2: (IeApiResponse<Date>) -> IeApiResponse<String>
= { response -> apiResponseDateToString(response) }
private fun apiResponseStringPrint(stringApiResponse: IeApiResponse<String>) {
showLog(Log.INFO, "FlowChainTest01", "apiResponseStringPrint - on Thread: ${Thread.currentThread().name}")
if (null != stringApiResponse.result) {
showLog(Log.INFO, "FlowChainTest01", "apiResponseStringPrint - dateString: {${stringApiResponse.result!!}}")
}
else {
showLog(Log.ERROR, "FlowChainTest01", "apiResponseDateToString - stringApiResponse.result is null!!")
}
}
}
```
```
class FlowTerminalTest01 {
private val supervisorJob = SupervisorJob()
private var currentTaskJob: Job? = null
val context: CoroutineContext = Job() + Dispatchers.Main
//private val coroutineScope = CoroutineScope(supervisorJob)
private val coroutineScope = CoroutineScope(context)
fun cancelCurrentJobIfNeeded() {
if (null != currentTaskJob) {
showLog(Log.WARN, "FlowTerminalTest01",
"cancelCurrentJobIfNeeded - currentTaskJob - isActive: ${currentTaskJob!!.isActive}, isCancelled: ${currentTaskJob!!.isCancelled}, on Thread: ${Thread.currentThread().name}")
if ((currentTaskJob!!.isActive) && (!(currentTaskJob!!.isCancelled)) ) {
currentTaskJob!!.cancel()
showLog(Log.WARN, "FlowTerminalTest01", "[!! cancel current task job !!], on Thread: ${Thread.currentThread().name}")
}
}
}
fun testFlowByLaunch() {
cancelCurrentJobIfNeeded()
currentTaskJob = CoroutineScope(context).launch {
//observeSomething
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - launch, on Thread: ${Thread.currentThread().name}")
FlowChainTest01().buildFlowChain().collect {
if (null != it.result) {
showLog(Log.INFO, "FlowTerminalTest01", "testFlowByLaunch - collect: [${it.result!!}], on Thread: ${Thread.currentThread().name}")
}
else if (null != it.error) {
showLog(Log.ERROR, "FlowTerminalTest01", "testFlowByLaunch - exception on launch, on Thread: ${Thread.currentThread().name}", it.error!!)
}
}
}
}
}
```
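The Flow log shows that in each round
the value goes from the Producer through the intermediate operators before the next value is emitted,
and the consumer, running on a different Thread, receives the value only after
the last intermediate operator of that round has finished.
Also, because my log statement comes after emit,
``buildFlowChain2 - emit [1623682021202]`` appears only after
``apiResponseStringPrint`` has run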
```
I/FlowChainTest01: buildFlowChain2 - timeStampFlow - start, on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021202}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021204}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701}
I/FlowChainTest01: buildFlowChain2 - emit [1623682021202], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021462}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021463}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701}
I/FlowChainTest01: buildFlowChain2 - emit [1623682021462], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021721}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021722}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701}
I/FlowChainTest01: buildFlowChain2 - emit [1623682021721], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682021980}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682021981}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224701}
I/FlowChainTest01: buildFlowChain2 - emit [1623682021980], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224701], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022240}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022241}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702}
I/FlowChainTest01: buildFlowChain2 - emit [1623682022240], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022501}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022501}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702}
I/FlowChainTest01: buildFlowChain2 - emit [1623682022501], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682022761}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682022762}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224702}
I/FlowChainTest01: buildFlowChain2 - emit [1623682022761], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224702], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023023}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023024}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703}
I/FlowChainTest01: buildFlowChain2 - emit [1623682023023], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023297}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023298}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703}
I/FlowChainTest01: buildFlowChain2 - emit [1623682023297], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main
I/FlowChainTest01: apiResponseLongToDate - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseLongToDate - timeStamp: {1623682023569}
I/FlowChainTest01: apiResponseDateToString - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseDateToString - timeStamp: {1623682023571}
I/FlowChainTest01: apiResponseStringPrint - on Thread: DefaultDispatcher-worker-1
I/FlowChainTest01: apiResponseStringPrint - dateString: {20210614_224703}
I/FlowChainTest01: buildFlowChain2 - emit [1623682023569], on Thread: DefaultDispatcher-worker-1
I/FlowTerminalTest01: testFlowByLaunch - collect: [20210614_224703], on Thread: main
```
---
# 2021/06/24
``launchWhenCreated()``, ``launchWhenStarted()``, and ``launchWhenResumed()``
collect values only while the Lifecycle is within a certain range.
Taking ``launchWhenStarted()`` as an example,
the ``Activity`` / ``Fragment`` only ``collect``s during the period from ``onStart`` to ``onStop``.
With a plain ``launch``, collection continues until the Job is cancelled or ``viewLifecycleOwner.lifecycleScope`` ends.
The log below shows the ``StateFlow`` coroutine running continuously in ``TestFlowAndLiveDataViewModel1``,
updating the ``value`` of the ``StateFlow`` once per second,
while ``TestFlowFragment`` only ``collect``s during the period from ``onStart`` to ``onStop``.
```
I/System.out: 09:46:02.262 [main] INFO TestFlowAndLiveDataViewModel1 - startIncrementCounterStateFlowPeriodically
I/System.out: 09:46:02.263 [main] INFO TestFlowAndLiveDataViewModel1 - modifyIsRunningPeriodicalJobFlag - result: [true]
I/System.out: 09:46:03.276 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499163276]
I/System.out: 09:46:03.281 [main] INFO TestFlowFragment - collect from StateFlow: [1s}]
I/System.out: 09:46:04.290 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499164290]
I/System.out: 09:46:04.293 [main] INFO TestFlowFragment - collect from StateFlow: [2s}]
I/System.out: 09:46:05.302 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499165302]
[main] INFO TestFlowFragment - collect from StateFlow: [3s}]
...
I/System.out: 09:46:21.610 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499181609]
I/System.out: 09:46:21.619 [main] INFO TestFlowFragment - collect from StateFlow: [19s}]
I/System.out: 09:46:22.225 [main] INFO TestFlowFragment - onPause
I/System.out: 09:46:22.271 [main] INFO TestFlowFragment - onStop
I/System.out: 09:46:23.644 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499183644]
I/System.out: 09:46:24.660 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499184660]
I/System.out: 09:46:25.677 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499185676]
...
I/System.out: 09:46:59.174 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499219174]
I/System.out: 09:46:59.880 [main] INFO TestFlowFragment - onStart
I/System.out: 09:46:59.882 [main] INFO TestFlowFragment - collect from StateFlow: [56s}]
I/System.out: 09:46:59.894 [main] INFO TestFlowFragment - onResume
I/System.out: 09:47:00.181 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499220181]
I/System.out: 09:47:00.183 [main] INFO TestFlowFragment - collect from StateFlow: [57s}]
I/System.out: 09:47:01.189 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499221189]
I/System.out: 09:47:01.194 [main] INFO TestFlowFragment - collect from StateFlow: [58s}]
I/System.out: 09:47:02.210 [pool-10-thread-1] INFO TestFlowAndLiveDataViewModel1 - IncrementCounterStateFlowPeriodically - time: [1624499222209]
I/System.out: 09:47:02.219 [main] INFO TestFlowFragment - collect from StateFlow: [59s}]
```
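The ``[56s}]`` collected right after ``onStart`` in the log suggests that a collector which was paused receives the current value of the ``StateFlow`` rather than a replay of every update it missed. A minimal plain-Kotlin sketch of that behaviour, with no Android Lifecycle involved (``latestValueAfterPause`` is a hypothetical name, and the ``repeat`` loop merely stands in for the periodic job in the ViewModel):

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: updates a counter while nobody is collecting,
// then returns the first value a newly started collector receives.
fun latestValueAfterPause(updatesWhilePaused: Int): Int = runBlocking {
    val counter = MutableStateFlow(0)
    // Updates made while there is no active collector
    repeat(updatesWhilePaused) { counter.value += 1 }
    // A collector that starts now sees only the latest value
    counter.first()
}
```

Calling ``latestValueAfterPause(56)`` mirrors the gap between ``onStop`` and ``onStart`` in the log: the intermediate values are never delivered.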
---
# [Flow](https://kotlinlang.org/docs/reference/coroutines/flow.html)
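A ``Flow`` is created with the ``flow { ... }`` builder.
A ``Flow`` is cold: it does not start running until a terminal operator such as ``collect()`` or ``launchIn()`` is called.
In addition, ``launchIn()`` returns a ``Job`` object, so outside code can cancel it directly;
``collect()`` returns no ``Job``, so it can only be cancelled by cancelling the coroutine that calls it.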
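As a rough illustration of cancelling through the ``Job`` returned by ``launchIn()``, here is a small sketch. The function name ``collectThenCancel`` and the timing values are assumptions made for the example; how many values arrive before cancellation depends on timing.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects an endless flow via launchIn(),
// then stops it from outside through the returned Job.
fun collectThenCancel(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val ticks = flow {
        var i = 0
        while (true) {          // never completes on its own
            emit(i++)
            delay(50)
        }
    }
    // launchIn() starts collecting in this scope and returns a Job
    val job = ticks.onEach { received.add(it) }.launchIn(this)
    delay(120)
    job.cancelAndJoin()         // stop collection from the outside
    received
}
```

Without the ``cancelAndJoin()`` call, ``runBlocking`` would never return here, because the flow is endless.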
### __``collect``__:
```
/**
 * Terminal flow operator that collects the given flow with a provided [action].
 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
 *
 * Example of use:
 *
 * ```
 * val flow = getMyEvents()
 * try {
 *     flow.collect { value ->
 *         println("Received $value")
 *     }
 *     println("My events are consumed successfully")
 * } catch (e: Throwable) {
 *     println("Exception from the flow: $e")
 * }
 * ```
 */
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
```
__Sample__:
```
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } // the flow body runs on every collect
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
```
---
As in RxJava:
1. There is a __``take``__ operator for taking the first few elements
### ``take``
```
/**
 * Returns a flow that contains first [count] elements.
 * When [count] elements are consumed, the original flow is cancelled.
 * Throws [IllegalArgumentException] if [count] is not positive.
 */
public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count > 0) { "Requested element count $count should be positive" }
    return flow {
        var consumed = 0
        try {
            collect { value ->
                if (++consumed < count) {
                    return@collect emit(value)
                } else {
                    return@collect emitAbort(value)
                }
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this)
        }
    }
}
```
```
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
```
2. Values can be transformed
```
/**
 * Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
 */
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}
```
```
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

// map()
fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

// transform() (an alternative main; keep only one of the two when compiling)
fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}
```
3. Conversion to various collections like ``toList`` and ``toSet``.
Operators to get the ``first`` value and to ensure that a flow emits a single value.
Reducing a flow to a value with ``reduce`` and ``fold``.
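The terminal operators listed in item 3 could be exercised roughly as follows. This is only a sketch: ``TerminalResults`` and ``runTerminalOperators`` are hypothetical names introduced for the example, and ``single()`` is used as one way to ensure that a flow emits exactly one value.

```kotlin
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the results of each terminal operator
data class TerminalResults(
    val asList: List<Int>,
    val asSet: Set<Int>,
    val first: Int,
    val single: Int,
    val reduced: Int,
    val folded: Int
)

fun runTerminalOperators(): TerminalResults = runBlocking {
    val numbers = flowOf(1, 2, 2, 3)   // cold: re-run for every terminal operator
    TerminalResults(
        asList = numbers.toList(),     // keeps duplicates and order
        asSet = numbers.toSet(),       // drops the duplicate 2
        first = numbers.first(),       // stops after the first value
        single = flowOf(42).single(),  // throws if the flow has zero or several values
        reduced = numbers.reduce { acc, value -> acc + value },  // no initial value
        folded = numbers.fold(10) { acc, value -> acc + value }  // starts from 10
    )
}
```

Because the flow is cold, each operator above triggers its own independent collection of ``numbers``.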
---
Emission is sequential: the next value is sent only after the terminal ``collect`` has received the current one
```
(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0
    }
    .map {
        println("Map $it")
        "string $it"
    }.collect {
        println("Collect $it")
    }
```
```
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```
---
A Flow can switch threads with ``flowOn()``! Take care not to use ``withContext()`` the wrong way for this
```
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}
```
### Correct way
```
// Helper that prefixes each message with the current thread name
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}
```
```
withContext(context) {
    foo().collect { value ->
        println(value) // run in the specified context
    }
}
```
### Wrong way
```
fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value; throws IllegalStateException (flow invariant is violated)
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) }
}
```
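To make the difference between the two approaches observable, the following sketch collects one flow built each way and reports what happens. The function names are hypothetical; the sketch relies on the ``flow { ... }`` builder rejecting an ``emit`` made from a different context with an ``IllegalStateException``.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Wrong: emits from inside withContext, i.e. from a different context
fun emitInsideWithContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// Right: the builder emits normally and flowOn moves the upstream
fun emitWithFlowOn(): Flow<Int> = flow { emit(1) }.flowOn(Dispatchers.Default)

// Hypothetical helper: returns the collected values as text,
// or the exception type name when the flow invariant is violated.
fun collectOrDescribeFailure(source: Flow<Int>): String = runBlocking {
    try {
        source.toList().toString()
    } catch (e: IllegalStateException) {
        "IllegalStateException"
    }
}
```

With this helper, ``emitWithFlowOn()`` collects normally, while ``emitInsideWithContext()`` fails on its first ``emit``.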
---
```
public interface FlowCollector<in T> {
    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}
```
---
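Whenever ``runBlocking`` is used, the current thread is blocked
until every coroutine inside its block (``{}``) has completed.
``launch`` starts a new coroutine.
``withContext`` moves the current work onto the thread(s) of a given dispatcher.
``async + await``: I have not implemented these often enough yet to have a deep feel for them...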
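As a small reference for ``async + await``, here is a sketch in which two computations run concurrently and their results are combined. ``slowSquare`` and ``sumOfSquares`` are hypothetical names, and the ``delay`` only imitates slow work.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical slow computation
suspend fun slowSquare(n: Int): Int {
    delay(100) // imitate slow work
    return n * n
}

fun sumOfSquares(a: Int, b: Int): Int = runBlocking {
    // Both async blocks start immediately and overlap in time
    val first = async { slowSquare(a) }
    val second = async { slowSquare(b) }
    // await() suspends until each result is ready
    first.await() + second.await()
}
```

Unlike ``launch``, which returns a plain ``Job``, ``async`` returns a ``Deferred`` that carries the result of its block.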
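The code fragment below
uses ``launch`` to start a new coroutine for each piece of work in the for loop,
and each coroutine uses ``withContext`` to move its work onto a given thread.
Each coroutine must write into ``subnetDeviceMap`` when it finds a result,
and once all coroutines have finished, the resulting ``subnetDeviceMap`` is passed to ``delegate``.
With this approach, however, it was not obvious how to tell whether every coroutine had finished.
So I resorted to ``runBlocking`` to wait for all coroutines to complete
before passing the resulting ``subnetDeviceMap`` to ``delegate``.
But ``runBlocking`` blocks the thread that calls it;
if it is called from the Main Thread, the Main Thread stalls,
so I specifically declare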
```
val coroutineContext: CoroutineContext = Dispatchers.IO
```
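and pass it as a parameter to ``runBlocking`` so that the work runs on IO threads. Note that the thread calling ``runBlocking`` still waits until the block completes, so a Main Thread caller remains blocked for that time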
```
val coroutineContext: CoroutineContext = Dispatchers.IO
val finalSubnetDeviceMap = runBlocking<Map<String, String>>(coroutineContext) {
    // Child coroutines run in parallel on Dispatchers.IO, so the shared map must be thread-safe
    val subnetDeviceMap = java.util.concurrent.ConcurrentHashMap<String, String>()
    errorMap.clear()
    for (ipAddress in ipAddressList) {
        launch {
            try {
                val serialNumber = withContext(Dispatchers.IO) { findSubnetDevice(ipAddress) }
                Log.i(getLogTag(), "^_^Y find serialNumber $serialNumber for ipAddress: $ipAddress")
                subnetDeviceMap[serialNumber] = ipAddress
            } catch (cause: IeSocketException) {
                errorMap[ipAddress] = cause
            }
        }
    }
    subnetDeviceMap
}
delegate.invoke(IeApiResult.Success(finalSubnetDeviceMap))
```
According to
[Parallel Map in Kotlin](https://jivimberg.io/blog/2018/05/04/parallel-map-in-kotlin/), using ``runBlocking`` as above is not good practice!
I later followed
[RxJava to Kotlin coroutines by Chris Banes](https://medium.com/androiddevelopers/rxjava-to-kotlin-coroutines-1204c896a700)
and
[CoroutineExtensions.kt](https://github.com/chrisbanes/tivi/blob/master/base/src/main/java/app/tivi/extensions/CoroutineExtensions.kt)
and used ``Collection<A>.parallelMap``:
```
suspend fun <A, B> Collection<A>.parallelMap(
    concurrency: Int = defaultConcurrency,
    block: suspend (A) -> B
): List<B> = coroutineScope {
    val semaphore = Channel<Unit>(concurrency)
    map { item ->
        async {
            semaphore.send(Unit) // Acquire concurrency permit
            try {
                block(item)
            } finally {
                semaphore.receive() // Release concurrency permit
            }
        }
    }.awaitAll()
}
```
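The original code fragment then becomes the following, which achieves an effect similar to ``flatMap`` in ``RxJava``:
the tasks for the elements of the ``Collection`` can run concurrently on many threads!
Note that when using ``flatMap`` in ``RxJava``,
``subscribeOn`` is also needed to schedule each element's task onto a thread;
likewise, in a coroutine, ``withContext`` is needed to schedule each element's task onto a thread!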
```
launch {
    val resultList: List<SubnetPingResult<String>> =
        ipAddressList.parallelMap(getCustomizedThreadCount()) { ipAddress ->
            try {
                val serialNumber = withContext(Dispatchers.IO) { findSubnetDevice(ipAddress) }
                Log.i(getLogTag(), "^_^Y find serialNumber $serialNumber for ipAddress: $ipAddress - on Thread: ${Thread.currentThread().name}")
                SubnetPingResult<String>(ipAddress, result = serialNumber)
            } catch (cause: IeSocketException) {
                SubnetPingResult<String>(ipAddress, cause = cause)
            }
        }
    delegate.invoke(IeApiResult.Success(resultList))
}
```
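For experimenting outside the app, the same bounded-concurrency idea can be sketched in a self-contained form. This version swaps the ``Channel`` used as a permit pool for ``Semaphore`` from ``kotlinx.coroutines.sync``. ``boundedParallelMap``, ``fakeLookup`` and ``lookupAll`` are hypothetical names, and ``fakeLookup`` is only a stand-in for ``findSubnetDevice``.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext

// Hypothetical variant of parallelMap that limits concurrency with a Semaphore
suspend fun <A, B> Collection<A>.boundedParallelMap(
    concurrency: Int,
    block: suspend (A) -> B
): List<B> = coroutineScope {
    val permits = Semaphore(concurrency)
    // One async per item; withPermit caps how many blocks run at once
    map { item -> async { permits.withPermit { block(item) } } }.awaitAll()
}

// Hypothetical stand-in for findSubnetDevice: derives a fake serial number
suspend fun fakeLookup(ipAddress: String): String = withContext(Dispatchers.IO) {
    delay(50) // imitate network latency
    "SN-" + ipAddress.substringAfterLast('.')
}

fun lookupAll(ipAddresses: List<String>): List<String> = runBlocking {
    ipAddresses.boundedParallelMap(concurrency = 2) { fakeLookup(it) }
}
```

``awaitAll()`` returns results in the order of the input collection, regardless of which lookup finishes first, so each result can still be matched to its IP address by position.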