--- title: 'Kotlin Channel 機制 - Select、Actor' disqus: kyleAlien --- Kotlin Channel 機制 - Select、Actor === ## Overview of Content :::success * 如果喜歡讀更好看一點的網頁版本,可以到我新做的網站 [**DevTech Ascendancy Hub**](https://devtechascendancy.com/) 本篇文章對應的是 [**Kotlin Channel 介紹使用 | Select、Actor | 生產者消費者**](https://devtechascendancy.com/kotlin-channel_select_actor_cs/) ::: [TOC] ## Channel 介紹 Kotlin 的 Channel 機制 **生產者 & 消費者 機制** ,這與 Java 中的 **BlockingQueue 類似** | 機制 | 入隊函數 | 出隊函數 | 等待方式 | 補充 | | - | - | - | - | - | | Kotlin Channel | send | receive | **掛起**(執行序讓出 CPU, 協程則繼續給其他地方使用) | 可主動關閉 | | Java BlockingQueue | put | take | **堵塞**(執行序讓出 CPU) | - | :::success * **掛起 & 堵塞** 差異 ? 其實兩者差異不大,都是讓出 CPU 的時間片,但是在 API 的設計上確有不同的語意: - **堵塞是 讓出 CPU 時間,並在滿足條件後「被動喚醒」,如果條件不滿足就一直堵塞**;如同 `Object#wait()` 方法 - **掛起是 讓出 CPU 時間,並在一定時間後「主動喚醒」去檢查條件是否滿足**;如同 `Thread#sleep()` 方法 ::: ### 生產者 / 消費者模式:Channel 基礎使用 * 以下做一個經典的生產者消費者(`Consumer`/`Supplier`)模式,這個模式下就是分為兩個腳色,一個負責生產資料,一個負責消耗資料; * 在這裡我們透過 Channel 類的基本使用來實現生產者/消費者;範例如下 > 以下範例中可以體現出 `BlockingQueue` 的堵塞功能 (讓消費者、生產者休眠不同的時間) ```kotlin= fun main() { runBlocking { println("Thread: ${Thread.currentThread().name}") testChannel(this) } println("Main finish") } suspend fun testChannel(scope : CoroutineScope) { val channel = Channel<Int>() scope.launch(Dispatchers.Default) { repeat(5) { delay(100) // 生產者休眠 100ms println("Channel sender, data: $it, time: ${Date().time % 10_000}") // supplier // send 是一個 suspend 函數 channel.send(it) } } repeat(5) { delay(150) // 消費者休眠 150ms // consumer // receive 是一個 suspend 函數 println("Channel receive: ${channel.receive()}, time: ${Date().time % 10_000}") } } ``` > ![image](https://hackmd.io/_uploads/S17hu8uaa.png) ### 生產者 / 消費者模式:Channel 中斷任務 * 這裡我們仍實做一個生產者/消費者模式,不過這裡我們在執行到一半時中斷 Channel,突顯出 Channel 可中斷任務的特點;範例如下… ```kotlin= fun main() { val channel = Channel<Int>() runBlocking { println("Thread: ${Thread.currentThread().name}") this.launch(Dispatchers.Default) { repeat(5) { delay(100) println("Channel sender, data: $it, time: ${Date().time % 10_000}") // supplier // send 是一個 suspend 函數 channel.send(it) } } this.launch(Dispatchers.Default) { repeat(5) { if (it == 2) { // 中斷 Channel channel.cancel() } delay(150) // consumer // receive 是一個 suspend 函數 println("Channel receive: ${channel.receive()}, time: ${ java.util.Date().time % 10_000}}") } } delay(1_000) } println("Main finish") } ``` > ![image](https://hackmd.io/_uploads/BJoFKLOTp.png) ### Channel 特點:緩衝 * Channel 類的另一特點在於 **可以設定緩衝區大小**;在這裡我們可以做一個簡單的例子,這個緩衝區相當於座位的概念,當座位滿時程可自然只能等待座位空出後才可以進入,否則就只能站著 > 以下範例中,**當緩衝區大小滿的時候 `send` 函數就會被掛起** ```kotlin= fun main() : Unit = runBlocking { val channel = Channel<Int>(capacity = 2) launch(coroutineContext) { repeat(4) { delay(50) channel.send(it) println("Channel sending: $it") } } launch(coroutineContext) { repeat(4) { delay(500) println("--- Channel receive: ${channel.receive()}") } } delay(3000) } ``` 可以看到 `send` 函數在 Buffer 滿了之後就無法再放入,導致 `send` 函數掛起等待空間空閑次才能再放入 > ![](https://i.imgur.com/rdv0wiU.png) ### ReceiveChannel Pipe 機制:傳遞結果 * **Pipe 是 Linux 中的進程通訊機制之一,可以將上一個處理的結果傳遞給下一個使用;而 `Coroutine` 也有設計相同的效果** :::info Pipe 機制也可以通過 OOP 設計中的 **責任鏈模式** 達成 ::: * **Coroutine 使用 `produce` 來產生一個 `ReceiveChannel` 物件**,這個物件可以用來接收 `SendChannel` 類透過 `send` 發出的運算結果 接下來的範例中,我們來將計算的步驟透過 `ReceiveChannel` 串接,計算的步驟串接概念如下圖 ```mermaid graph LR rc_1(發送要計算的數) rc_2(將數平方) rc_3(將數 + 1) rc_1 --> rc_2 --> rc_3 ``` ```kotlin= val coroutine = CoroutineScope(Job()) // 將結果透過 produce 傳出 fun produce1() = coroutine.produce(Dispatchers.Default) { // 上下文為 ProducerScope,而 ProducerScope 就繼承自 SendChannel repeat(3) { // 傳輸結果 send(it) } } // 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數的平方」 fun produce2(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) { // 透過 ReceiveChannel 接收結果 rec.toList().forEach { // 傳輸結果 send(it * it) } } // 再開啟一個 ReceiveChannel 物件,這個 ReceiveChannel 是負責做「數 + 1」 fun produce3(rec: ReceiveChannel<Int>) = coroutine.produce(Dispatchers.Default) { for (x in rec) { // 傳輸結果 send(x + 1) } } fun main() = runBlocking { val num = produce1() val squ = produce2(num) val add = produce3(squ) // 從最後一個 ReceiveChannel 消耗結果 add.consumeEach { println("add it: $it") // 消耗結果 } num.cancel() squ.cancel() add.cancel() delay(1000) } ``` 上面範例運行的過程如下(請由上至下去看) | produce\number | 0 | 1 | 2 | 3 | 4 | | -------- | -------- | -------- | - | - | - | | produce1 (傳送數) | 0 | 1 | 2 | 3 | 4 | | produce2 (平方) | 0 | 1 | 4 | 9 | 16 | | produce3 (加一)| 1 | 2 | 5 | 10 | 17 | :::warning 被消費(consume)過後的元素就不會往下傳遞 ::: > ![](https://i.imgur.com/0GGhGDJ.png) ## Select 表達 select 可以用來等待多個 suspend function 可以用來併發多個 `suspend` 函數,**並可取得其中一個先回覆的任務作為結果** ### Select 使用範例 * 接下來的範例中,我們創建兩個 `ReceiveChannel`,再透過 Select 來選中第一個返回的結果作為函數的結果! ```kotlin= val coroutine = CoroutineScope(Job()) fun produce1() = coroutine.produce<String>(Dispatchers.Default) { while (true) { delay(400) send("Kyle") } } fun produce2() = coroutine.produce<String>(Dispatchers.Default) { while (true) { delay(200) send("Pan") } } suspend fun selectSample(channel1: ReceiveChannel<String>, channel2: ReceiveChannel<String>) : String { // select 函數會堵塞 val sRes = select<String> { channel1.onReceive { "channel 1 -> $it" } channel2.onReceive { "channel 2 -> $it" } } println("Select finish") return sRes } fun main() : Unit = runBlocking { val produce1 = produce1() val produce2 = produce2() // 重複呼叫 4 次,看誰先處理完就會做為結果 repeat(5) { println("------Final result: ${selectSample(produce1, produce2)}") } delay(3000) produce1.cancel() produce2.cancel() } ``` > ![](https://i.imgur.com/A5yPjve.png) ## Actor 表達 **actor 其本身就是一個協程**,**它的內部包含 channel 成員**,所有我們可以透過對 actor 發送訊息,讓它傳遞訊息,也可以用來作為不同的協程通訊 ### Actor 使用範例 1. actor 返回的類型是 `SendChannel<E>`,主要可以用它來發送資料,而它並不像 channel 一樣有 `receive` 函數 2. actor 的最後一個 Lambda 參數的上下文為 ActorScope,而它的內部有 `channel` 成員 > ![image](https://hackmd.io/_uploads/ByKG4wOpp.png) ```kotlin= fun main(): Unit = runBlocking { val actor = actor<Int>(coroutineContext) { // 上下文為 ActorScope var sum = 0 // channel 是 ActorScope 中的成員 for (i in channel) { sum += i println("actor sum: $sum") } } repeat(5) { actor.send(it) } actor.close() } ``` > ![](https://i.imgur.com/SpDYBFg.png) ## 更多的 Kotlin 語言相關文章 在這裡,我們提供了一系列豐富且深入的 Kotlin 語言相關文章,涵蓋了從基礎到進階的各個方面。讓我們一起來探索這些精彩內容! ### Kotlin 語言基礎 * **Kotlin 語言基礎**:想要建立堅實的 Kotlin 基礎?以下這些文章將帶你深入探索 Kotlin 的關鍵基礎和概念,幫你打造更堅固的 Kotlin 語言基礎 :::info * [**Kotlin 函數、類、屬性 | DataClass、Sealed、Object 關鍵字 | Enum、Companion、NPE**](https://devtechascendancy.com/kotlin-functions_oop_dataclass_sealed_object/) * [**深入探究 Kotlin 與 Java 泛型:擦除、取得泛型類型、型變、投影 | 協變、逆變**](https://devtechascendancy.com/explore-kotlin-java-generics_type_erasure/) * [**深入 Kotlin 函數特性:Inline、擴展、標準函數全解析 | 提升程式碼效能與可讀性**](https://devtechascendancy.com/kotlin_inline_extensions_standards-func/) * [**Kotlin DSL、操作符、中綴表達式 Infix | DSL 詳解 | DSL 設計與應用**](https://devtechascendancy.com/kotlin-dsl-operators-infix-explained/) ::: ### Kotlin 特性、特點 * **Kotlin 特性、特點**:探索 Kotlin 的獨特特性和功能,加深對 Kotlin 語言的理解,並增強對於語言特性的應用 :::warning * [**Kotlin 代理與懶加載機制:使用、lazy 深度解析**](https://devtechascendancy.com/kotlin-delegate_java-proxy_lateinit_lazy/) * [**Kotlin Lambda 編程 & Bytecode | Array & Collections 集合 | 集合函數式 API**](https://devtechascendancy.com/kotlin-lambda-bytecode-array-collections-functional/) * [**深入理解 Kotlin:智能推斷與 Contact 規則**](https://devtechascendancy.com/kotlin-smart-inference-contract-rules-guide/) ::: ### Kotlin 進階:協程、響應式、異步 * **Kotlin 進階:協程、響應式、異步**:若想深入學習 Kotlin 的進階主題,包括協程應用、Channel 使用、以及 Flow 的探索,請查看以下文章 :::danger * [**應用 Kotlin 協程:對比 Thread、創建協程、任務掛起 | Dispatcher、CoroutineContext、CoroutineScope**](https://devtechascendancy.com/applied-kotlin-coroutines-in-depth-guide/) * [**Kotlin Channel 使用介紹 | Select、Actor | 生產者消費者**](https://devtechascendancy.com/kotlin-channel_select_actor_cs/) * [**探索 Kotlin Flow:基本使用、RxJava 對比、背壓機制 | Flow 細節**](https://devtechascendancy.com/kotlin-flow-usage_compare-rx_backpressure/) ::: ## Appendix & FAQ :::info ::: ###### tags: `Kotlin`