--- title: '響應式編程-概念' disqus: hackmd --- # 1. 概念 * **Servlet Stack**: * Spring MVC * 同步、阻塞式架構(synchronous blocking I/O) * follow 一個請求一條線程的模型(one-request-pre-thread model) * **Reactive Stack**: * Spring Webflux * 非同步架構(non-blocking web framwork) * 支持高並發、高吞吐量 * **比較** | | Reactive Stack | Servlet Stack | | -------- | -------- | -------- | | 底層 |Netty, Servlet 3.1(開始支持異步) + Containers|Servlet Containers| |底層規範|Reactive Stream Adapters|Servlet API| |SpringBoot 套件對應|Spring Security Reactive|Spring Security| | |Spring Webflux|Spring MVC| | |Spring Data Reactive Repostiories|Spring DataRepostiories| * **常識**: * **高並發**:緩存(緩沖區)+異步+排序機制 * **高可用**:分片(數據如何分佈)+複製(數據複製機制,並免數據遺失)+選領導 * **非阻塞圖**: ![非阻塞原理圖](https://hackmd.io/_uploads/Sk-7o5lZJg.png) * **Netty概念圖**:『`少量線程持續運行` **優於** `大量線程等待切換`』 (`非同步核心概念`)![Netty概念圖](https://hackmd.io/_uploads/B11_oqgZJx.png) ## 1.1 Reactive Stream 規範 * 底層機制 * 發布訂閱 * 四大組件 ## 1.2 Reactor 框架(響應式庫) * Mono/Flux * 流操作符 * 線程調度 * 錯誤處理 * 高級特性 ## 1.3 Spring Webflux 響應式應用 * **說明**:**響應式的Web框架** * **對標Spring MVC** * **內容**: * HttpServer(底層的響應式機制) * DispatcherHandler原理 * SEE功能 * 新版Filter * Spring MVC 組件對比 ### 1.3.1 Spring Data R2DBC (Reactive Relational Database Connectivity) * **說明**:針對關聯數據庫的**響應式 API 標準** * **內容**: * R2dbc 驅動 * DatabaseClient API * 自定義Repository * 自定義Converter * 關聯查詢 * 最佳實踐 ### 1.3.2 Spring Security Reactive * **內容**: * RBAC權限模型 * FilterChain配置 * ReactiveUserDetailsService * PasswordEncoder * 授權認證配置 # 2. Reactive Stream 規範 * **Reactive Stream **是JVM面向**響應式**(`流`)的庫的**標準**和**規範** * 處理不限數量的元素 * 有序 * 組件之間**異步傳遞元素** * 強制性**非阻塞**、**背壓模式**(`Backpressure`) * **正向壓力**(`正壓`):數據的生產者給消費者壓力 * **背壓模式**:**數據消費者**告知**數據生產者**消費的速度,從而實現生產和消費的速度協調 * **非阻塞原理**:緩沖區+**回調**(`是一種在程式碼運行過程中將一個方法傳遞給另一個方法`) * **目標**:通過**全異步的方式**,加**緩衝區**,構建一個`實時的數據流系統` * **概念**:**Tomcat**發過來的請求,**SpringBoot**這邊會先進入緩衝區(`數據`),**處理的線程**持續的從**緩衝區中取出數據**(`消費數據`),所以Tomcat可以持續的**發送請求**(`產生數據`),不需等待結果返回 * Kafka、MQ能建構出**大型分散式的響應式系統**,但**缺少本地化的訊息系統解決方案**: * 讓所有的異步線程能互相監聽訊息,處理訊息,建立即時訊息處理串流 * **核心**:**消息傳遞**(`響應式構建的依據`) * 基於異步、消息事件的全事件回調系統(響應式系統) * 圖:![響應式系統圖](https://hackmd.io/_uploads/HJyMLBAxkl.png) * **總結**: * **底層**:基於**數據緩沖隊列**+**消息驅動模型**+**異步回調機制** * 編碼:流式編程+鏈式調用+聲明式API * **效果**:**優雅全異步**+**消息實時處理**+**高吞吐量**+佔用少量資源 * **痛點**: * **以前**:開發一個**高並發系統**:快取、非同步、排隊機制,**手動控制整個邏輯** * **現在**:**全自動控制**整個邏輯(`天生高並發`) ## 2.1 Reactive Stream 規範-四大核心接口 * **java.util.concurrent.Flow類**:**共有四大函數接口**(`響應式使用`),構建出響應式系統 * **Processor\<T, R>**:處理器 * 同時實現**訂閱者**、**發佈者**的接口的組件(`是訂閱者也是發佈者`) * 接收一個來自發佈者的數據,並進行處理,再發佈給下一個訂閱者 * 在Reactor中擔任中間環節,**表示一個處理階段** * **Subscription**:訂閱關係 * **訂閱者**與**發佈者**之間的關鍵接口 * 訂閱者通過訂閱表示對發佈者所產生的數據有興趣,可以請求一定數量的元素 * 訂閱者也可以取消訂閱 * **Subscriber\<T>**:訂閱者 * 消費數據流 * **Publisher\<T>**:發佈者 * 產生數據流 * **管道圖**: ![響應式數據管道圖](https://hackmd.io/_uploads/rJ492HCl1x.png) * **綁定操作**:就是發布者,記住了所有訂閱者都有誰,有數據後,給所有訂閱者把數據推送過去 * **鏈表的形式綁定了責任關係** ## 2.2 Reactive Stream 規範-發佈\訂閱數據 * **概念圖**: ![發佈與訂閱數據概念圖](https://hackmd.io/_uploads/H1xeQORgJe.png) * **代碼**: ```java= public class FlowDemo { public static void main(String[] args) throws InterruptedException { //1. 定義一個發佈者:發佈數據 //1.1 使用已 實現的發佈者 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //2. 定義一個訂閱者:訂閱數據 Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() { private Flow.Subscription subscription; //在訂閱時,onXxxx:表示Xxxx事件發生時,會執行這個回調 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println(Thread.currentThread().getName() + " - 訂閱開始: " + subscription); this.subscription = subscription; //訂閱建立後,立刻從上游請求一個數據 subscription.request(1); } //在下一個元素到達時,執行這個回調方法,接收新的數據 @Override public void onNext(String s) { System.out.println(Thread.currentThread().getName() + " - 接收數據: " + s); if (s.equals("p-7")) { //到達目標後就取消訂閱 subscription.cancel(); } else { //每次消費完一個新元素後,再請求一個新的元素 subscription.request(1); } } //在錯誤發生時(生產數據出現錯誤),執行這個回調方法 @Override public void onError(Throwable throwable) { System.out.println(Thread.currentThread().getName() + " - 接收到錯誤: " + throwable); } //在完成時 @Override public void onComplete() { System.out.println(Thread.currentThread().getName() + " - 接收到完成"); } }; //3. 綁定發佈者與訂閱者 publisher.subscribe(subscriber1); //4 發佈數據:發佈10數據 // for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName()); if (i > 96) { //異常中斷數據推送,訂閱者會回調onError(),目前不好處理,等到Reactor框架再處理 publisher.closeExceptionally(new Exception("asd")); } else { publisher.submit("p-" + i); } //發佈數據:是給某一個隊列存入數據 => 寫入緩衝區 } //5. 發佈者關閉通道,訂閱者接受到關閉信號,會回調onComplete() publisher.close(); Thread.sleep(1000); } } ``` ## 2.3 Reactive Stream 規範-中間處理 Processor * **Processor是『訂閱者』也是『發佈者』** * **圖**: ![發佈與訂閱與處理數據概念圖](https://hackmd.io/_uploads/rJh8Xoybyx.png) * **代碼**: ```java= public class FlowDemo2 { //定義流中間操作,添加一個haha前綴 //一個訂閱者,也是一個發佈者 //只寫訂閱者的接口 static class Processor1 extends SubmissionPublisher<String> implements Flow.Processor<String, String> { private Flow.Subscription subscription;//保存綁定關係 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("Proccess 訂閱:" + subscription); this.subscription = subscription; //要一個新數據 subscription.request(1); } @Override public void onNext(String s) { System.out.println("Proccess 拿到數據:" + s); //加工 s += "haha"; //發送加工後數據 submit(s);//SubmissionPublisher類的方法 //要一個新數據 subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("Proccess: " + Thread.currentThread().getName() + " - 接收到錯誤: " + throwable); } @Override public void onComplete() { System.out.println("Proccess: " + Thread.currentThread().getName() + " - 接收到完成"); } } public static void main(String[] args) throws InterruptedException { //1. 定義一個發佈者:發佈數據 //1.1 使用已 實現的發佈者 SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //2. 定義中間操作:添加一個haha前綴 Processor1 processor1 = new Processor1(); //3. 定義一個訂閱者:訂閱數據 Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() { private Flow.Subscription subscription; //在訂閱時,onXxxx:表示Xxxx事件發生時,會執行這個回調 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println(Thread.currentThread().getName() + " - 訂閱開始: " + subscription); this.subscription = subscription; //訂閱建立後,立刻從上游請求一個數據 subscription.request(1); } //在下一個元素到達時,執行這個回調方法,接收新的數據 @Override public void onNext(String s) { System.out.println(Thread.currentThread().getName() + " - 接收數據: " + s); //每次消費完一個新元素後,再請求一個新的元素 subscription.request(1); } //在錯誤發生時(生產數據出現錯誤),執行這個回調方法 @Override public void onError(Throwable throwable) { System.out.println(Thread.currentThread().getName() + " - 接收到錯誤: " + throwable); } //在完成時 @Override public void onComplete() { System.out.println(Thread.currentThread().getName() + " - 接收到完成"); } }; //3.綁定 //3.1 發佈者綁定處理器 publisher.subscribe(processor1); //3.2 處理器再綁定訂閱者 processor1.subscribe(subscriber1); //4 發佈數據:發佈10數據 for (int i = 0; i < 10; i++) { publisher.submit("p-" + i); //發佈數據:是給某一個隊列存入數據 => 寫入緩衝區 } //5. 發佈者關閉通道,訂閱者接受到關閉信號,會回調onComplete() publisher.close(); Thread.sleep(1000); } } ``` # 3. Reactor 響應式框架 * **基本流程**:產生數據 => 中間操作 => 消費數據 * **概念**: 1. 數據流:數據源頭 2. 變化傳播:數據操作(中間操作) 3. 異步編程模式:底層控制異步 * 數據流:每個元素從流的源頭,開始源源不斷的,自己往下滾動 * onNext:當某個元素到達後,我們可以定義它的處理邏輯 * onError:異常結束,觸發事件 * onComplete:處理結束,觸發事件 * 命令式編程如下:響應式是將命令式編程的概念,轉換成響應式對應的方法 ```java= public void execute(List<Order> orderList){ //處理訂單 //異常處理 //正常返回 } ``` * **流**:**數據** + **信息**(`完成/異常`) * 數據流**經過中間操作**後,會產生**新的數據流** * 所有**操作符**(`中間操作`)基於發佈者流 ## 3.1 Reactor基礎 ### 3.1.1 發佈流包裝類 * **Flux**\<T>:一個流中**0~N**個元素 * **Mono**\<T>:一個流中**0~1**個元素 * 代碼: ```java= public class FluxDemo { public static void main(String[] args) throws IOException { mono(); flux(); } public static void mono() throws IOException { Mono<Integer> just = Mono.just(1); just.subscribe(e -> System.out.println("mono : " + e)); } public static void flux() throws IOException { //1. 數據流:多元素 (流沒有被消費,就不會被啟用) Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); //2. 訂閱(消費)流 just.subscribe(e -> System.out.println("e1 = " + e)); //一個流可以有多個消費者 just.subscribe(e -> System.out.println("e2 = " + e)); //對每個消費者來說流都是一樣 => 廣播模式 System.out.println("======"); Flux<Long> interval = Flux.interval(Duration.ofSeconds(1L));//每秒產生一個數字,從0開始 interval.subscribe(System.out::println); //異步的,所以避免主線程結束 System.in.read();//輸入一個數據後,主線程結束 } } ``` ### 3.1.2 事件感知API - doOnXxx * **說明**:當流發生什麼事情的時候,觸發一個doOnXxx * **觸發一個回調**(`Hook 勾子函數`) * **設定在發佈者身上** * **注意**:如果該流有執行**中間操作**,那該**事件感知**是寫在該流的後面,該事件感知是感知該新的流 ```java public static void fluxOnEx() throws IOException, InterruptedException { Flux<Integer> flux = Flux.range(1, 10) //當前是 感知flux這個最一開始的流 .doOnNext(integer -> System.out.println("流1 doOnNext " + integer)) //中間操作 Processor .map(integer -> integer * integer) //當前是 感知map處理後的流 .doOnNext(integer -> System.out.println("流2 doOnNext " + integer)) //中間操作 Processor .delayElements(Duration.ofSeconds(1))//延遲元素傳輸,產生新流 //當前是 感知delayElements處理後的流 .doOnNext(integer -> System.out.println("流3 doOnNext " + integer)); flux.subscribe(System.out::println); } ``` * **doOnXxx** * doOnComplete:流**正常結束**時,觸發 * doOnCancel:流**被取消**時,觸發 * doOnError:流**有錯誤/異常**時,觸發 * doOnNext:流**有『數據』到達**時,觸發 * SignalType.**ON_NEXT** * doOnEach:流**有『元素』到達**時,觸發 * **元素**:流的數據、**信號** * **reactor.core.publisher.SignalType**類:**信號** * SUBSCRIBE:被訂閱時 * REQUEST:請求N個元素時 * CANCEL:流被取消訂閱時 * ON_SUBSCRIBE:在訂閱時 * ON_NEXT:在數據到達時 * ON_ERROR:在流錯誤時 * ON_COMPLETE:在流正常完成時 * AFTER_TERMINATE:在流被中斷以後 * CURRENT_CONTEXT:當前上下文 * ON_CONTEXT:感知上下文 * doOnSubscribe:流**被訂閱**時,觸發 * doOnRequest:當**消費者請求元素**時,觸發 * 默認請求數據量為**Long.MAX_VALUE** * doOnTerminate:流**被取消**、**有錯誤/異常**被中斷時,觸發 * doOnDiscard:流中**有元素被忽略**時,觸發 * **代碼**: ```java= public static void fluxOnAPI() throws IOException, InterruptedException { Flux<Integer> flux = Flux.range(1, 10) //中間操作 Processor .delayElements(Duration.ofSeconds(1))//延遲元素傳輸,產生新流 //事件感知 //當前是 感知delayElements處理後的流 .doOnCancel(() -> System.out.println("流 被取消")) .doOnError(throwable -> System.out.println("流 異常錯誤 " + throwable)) .doOnNext(integer -> System.out.println("流 doOnNext " + integer)) .doOnEach(signal -> { if (signal.isOnNext()) { System.out.println("流 doOnEach get doOnNext: " + signal.get()); } else if (signal.isOnComplete()) { System.out.println("流 doOnEach get doOnComplete"); } else if (signal.isOnError()) { System.out.println("流 doOnEach doOnError " + signal.getThrowable()); } }) .doOnRequest(e -> System.out.println("流 doOnRequest " + e)) .doOnSubscribe(subscription -> System.out.println("流 被訂閱 " + subscription + " aa.")) .doOnComplete(() -> System.out.println("流 正常結束")) .doOnTerminate(() -> System.out.println("流 異常結束 ")); flux.subscribe(System.out::println); } ``` * **正常完成**: 1. 流 被訂閱(`doOnSubscribe`) 2. 流 doOnRequest(`doOnRequest`) 3. 流 doOnNext 1(`doOnNext`) 4. 流 doOnEach get doOnNext:1(`doOnEach`) 5. 1(`subscribe`) 6. **持續3~5步驟** 7. 流 doOnEach get doOnComplete(`doOnEach`) 8. 流 正常結束(`doOnComplete`) 9. 流 異常結束(`doOnTerminate`) ### 3.1.3 響應式流日誌 * log():打印流的完整日誌 * 代碼: ```java= public static void fluxLog() throws IOException, InterruptedException { Flux.range(1, 7) .log()//日誌 .filter(integer -> integer > 3) .log()//日誌 .map(i -> "haha " + i) .log()//日誌 .subscribe(System.out::println); } ``` * 輸出: ```txt= [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscriptionConditional) [ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) [ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | request(unbounded) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) [ INFO] (main) | onNext(2) [ INFO] (main) | onNext(3) [ INFO] (main) | onNext(4) [ INFO] (main) | onNext(4) [ INFO] (main) | onNext(haha 4) haha 4 [ INFO] (main) | onNext(5) [ INFO] (main) | onNext(5) [ INFO] (main) | onNext(haha 5) haha 5 [ INFO] (main) | onNext(6) [ INFO] (main) | onNext(6) [ INFO] (main) | onNext(haha 6) haha 6 [ INFO] (main) | onNext(7) [ INFO] (main) | onNext(7) [ INFO] (main) | onNext(haha 7) haha 7 [ INFO] (main) | onComplete() [ INFO] (main) | onComplete() [ INFO] (main) | onComplete() ``` ## 3.2 Reactor 核心 * 序列 * 線程調度 * 異常 * 操作符 ### 3.2.1 訂閱 subscribe * 使用**subscribe()**,傳入消費者的回調 * 常用三種subscribe: ```java= public static void fluxSubscribe() throws IOException, InterruptedException { Flux<Integer> range = Flux.range(1, 12).map(i -> { if (i == 9) { i = 10 / (9 - i); } return i; }); //基本訂閱:元素消費者 range.subscribe(System.out::println); //指定訂閱規則:元素消費者,異常結束消費者 range.subscribe( System.out::println,//元素消費者 throwable -> System.out.println(throwable)//異常結束消費者 ); //指定訂閱規則:元素消費者,異常結束消費者,正常結束時消費者 range.subscribe( System.out::println,//元素消費者 throwable -> System.out.println(throwable),//異常結束消費者 ()-> System.out.println("End")//正常結束消費者 ); } ``` ### 3.2.2 自定義消費者 * **流的生命週期的鉤子可以傳播給訂閱者** | 類型 | 發生時機 | 用途 | 常見場景 | | ----------------------------- | --------------------------- | --------------------------------------- | ----------------------------------------- | | **`doOnXxx`**(Publisher階段) | 📍**事件即將發生時**(流仍繼續往下走) | **Side-effect(副作用)通知** — 用來「觀察」事件但不改變資料 | 記錄 log、統計、監控、debug | | **`onXxx`**(中間操作) | ⚙️**事件發生後立即執行動作**(通常改變流或資料) | **主動操作/轉換行為** | 改寫資料流內容,如 `map`、`filter`、`onErrorResume` | | **`hookOnXxx`**(Subscriber階段) | ✅**事件完成後(下游消費者收到)** | **最終回調**,你可以拿到結果、狀態、錯誤等資訊 | 在 `BaseSubscriber` 裡覆寫,用來控制背壓、監聽完成事件、錯誤處理 | * 繼承**BaseSubscriber\<T>類**(`推薦使用`) * 重寫如下方法 ```java= //流的生命週期的鉤子可以傳播給訂閱者 //doOnXxx:發生這個事件的時候,產生一個回調,通知你 //OnXxx:發生這個事件後,執行一個動作 //hockOnXxx:執行完這個事件後,產生一個回調,通知你 public static void fluxConsumer() { Flux<Integer> flux = Flux.range(1, 12) //定義在發佈者身上 .doOnNext((i) -> System.out.println("doOnNext " + i)); //自定一個消費者 flux.subscribe(new BaseSubscriber<Integer>() { //生命週期鉤子1:訂閱關係 綁定時觸發 @Override protected void hookOnSubscribe(Subscription subscription) { //流被訂閱時觸發 System.out.println("綁定了 " + subscription); //找生產要數據 request(1);//要一個數據 // requestUnbounded();//要無限數據 } //生命週期鉤子2:訂閱關係 綁定時觸發 //每個到達元素都會觸發 @Override protected void hookOnNext(Integer value) { System.out.println("數據到達時觸發 " + value); if (value == 5) { cancel();//取消 流 } request(1);//請求數據一次 } @Override protected void hookOnComplete() { System.out.println("流正常結束"); } @Override protected void hookOnError(Throwable throwable) { System.out.println("流異常結束 ,error = " + throwable); } @Override protected void hookOnCancel() { System.out.println("流被取消時執行"); } @Override protected void hookFinally(SignalType type) { System.out.println("最終回調,一定會執行"); } @Override public Context currentContext() { return super.currentContext(); } }); } ``` * BaseSubscriber中的方法: * cancel():取消流 * request():請求數據 * request(1):請求數據一次 * requestUnbounded():請求所有元素 ### 3.2.3 背壓(Backpressure) * 訂閱者向發佈者**請求數據** * 訂閱者**沒有**向發佈者請求數據,發佈者發佈數據,`該數據會存在發佈者的緩衝區中,不會觸發任何線程進行阻塞`,除非訂閱者向發佈者請求數據 ### 3.2.4 請求重塑(Reshape Requests) - 批次處理 * **buffer()**:給數據加一個緩衝區(`緩沖`),**一次最多只能消費?數據** * 消費者可以**批次處理** * **代碼**: ```java= public static void buffer() { Flux<List<Integer>> flux = Flux.range(1, 10)//原始流是10個數據 .buffer(3);//緩沖區:緩沖設定為3,消費一次最多可以獲取3個數據 //消費者共請求4次 => 10/3 => 3 + 1 flux.subscribe(System.out::println); } ``` * **輸出結果** ``` [1, 2, 3] [4, 5, 6] [7, 8, 9] [10] ``` * **limitRate()**:限制**請求數據**(`請求頻率`、`限流`) * 限制每次從上游生產者請求的資料量來**防止資料生產速度過快**,以**避免消費者處理不過來** * **75%策略**:第一次抓取30個數據,如果75%的元素已經處理好了,繼續抓取新的75%元素 * 代碼: ```java= public static void limitRate(){ Flux<Integer> flux = Flux.range(1, 100) .log()//限流觸發,看上游是怎麼限流取得資料的 .limitRate(30);//一次預取30個元素,另外有75%策略 //75%策略 //第一次抓取30個數據,如果75%的元素已經處理好了,繼續抓取新的75%元素 flux.subscribe(System.out::println); } ``` ### 3.2.5 創建序列-Sink 動態地按需生成數據 * **作用**:**動態地按需生成數據**,而不是一開始就有一組數據 * **一開始就有一組數據**: * `Flux.rang()` * `Flux.just()` * **Sink**: * Sink.**next**(元素):**發送數據** * Sink.**complete**():**流完成** * **generate()**:同步調用的情況下創建**序列**(`流`) * 參數1:設定初始值 * 參數2:Consumer<SynchronousSink\<T>> generator * Sink: 接收器、水槽、通道 * **代碼**: ```java= //編程方式創建序列 public static void seqGenerat() { //參數1:設定初始值 //參數2:Consumer<SynchronousSink<T>> generator Flux<Object> generate = Flux.generate( () -> 0,//設定原始數據 => 建議使用AtomicLong 傳遞原子操作,多線程安全 //基於狀態(state)的generate (state, sink) -> { sink.next("ha " + state);//傳遞數據:可能會拋出『運行時異常、編譯時異常』 if (state > 9) { sink.complete();//傳遞數據完成 } if (state == 7) { sink.error(new Exception("WEEOR AA")); } return state + 1;//返回新的迭代state值 }) .log(); generate.subscribe(); } ``` * **create()**:**非同步調用**(`異步、多線程`)的情況下創建序列 * 代碼: ```java= public void seqCreate() throws InterruptedException { Flux<Object> flux = Flux.create(fluxSink -> { MyListener myListener = new MyListener(fluxSink); for (int i = 0; i < 10; i++) { myListener.online("aw " + i); } }).log(); flux.subscribe(); } class MyListener { FluxSink<Object> sink; public MyListener(FluxSink<Object> sink) { this.sink = sink; } //用戶登陸,觸發online監聽 public void online(String userName) { System.out.println("login " + userName); //sink:發出數據 sink.next("login " + userName); } } ``` ### 3.2.6 自定義元素處理 * **handle()**:複雜的邏輯處理,可以**轉換成多種不同類型的元素** => **filter + map** * **條件處理** * 自定義流中元素的處理規則 * **相似的map()**:是將一個類型轉換成另一個類型 * **轉換操作** * **代碼**: ```java= public void handler() { Flux<Object> handle = Flux.range(1, 4) .handle((value, sink) -> { if (value % 2 == 0) { sink.next(value * 2); } else { sink.next("a" + value);//向下發送數據的通道 } }) .log(); handle.subscribe(); } ``` * 日誌: ```txt= [ INFO] (main) | onSubscribe([Fuseable] FluxHandleFuseable.HandleFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(a1) [ INFO] (main) | onNext(4) [ INFO] (main) | onNext(a3) [ INFO] (main) | onNext(8) [ INFO] (main) | onComplete() ``` ### 3.2.7 自定義線程調度規則 * 響應式編程:全異步、消息、事件回調 * **『不指定線程池,默認使用發佈者的線程』**(`因為訂閱流才會動`) * **流的發佈**、**中間操作**,默認使用**當前線程**(`訂閱者線程`) * **目的**:**改變**發佈者、訂閱者**所在的線程池**,進而使執行的線程有所不同 * **方法**: * **publishOn()**:改變**發佈者**的所在的線程池 * **subscribeOn()**:改變**訂閱者**的所在的線程池 * **參數**:Scheduler類 * 使用**Schedulers類**創建(`設定線程池`) * **重點**:方法執行後的中間操作,才會開始改用新的線程池中的線程 * **設定其中一個就可以**,publishOn 或 subscribeOn * 常用**Schedulers類**:**調度器**(`線程池`) * Schedulers.immediate(): * **默認** * 無執行上下文,**當前線程運行所有操作** * Schedulers.single():使用固定的一個單線程(`固定使用該線程池中某一個線程`) * Schedulers.boundedElastic():有界(`線程池有線程數限制`),彈性調度線程 * 默認線程數:10 \* CPU核心 * 默認隊列:100K * 作用:管理和存儲待處理的任務 * Schedulers.parallel():**並發池** * **全局共享** * **基本使用**: ```java= public void scheduler() { Flux.range(1, 10) //改變發佈者的所在的線程池 .publishOn(Schedulers.parallel())//在哪個線程池把這個流的數據和操作執行 //改變訂閱者的所在的線程池 .subscribeOn(Schedulers.immediate())//在哪個線程池把這個流的讀取執行 .subscribe(); } ``` * **代碼2**:**主線程訂閱流** ```java= public void scheduler() { Flux.range(1, 3) .log() .publishOn(Schedulers.single())//在哪個線程池把這個流的數據和操作執行 .map(i -> i * i) .log() .subscribe(); } ``` * **map處理是改用其他線程池** * **log**: ```txt= [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) [ INFO] (main) | request(unbounded) [ INFO] (main) | request(256) [ INFO] (main) | onNext(1) [ INFO] (main) | onNext(2) [ INFO] (single-1) | onNext(1) [ INFO] (main) | onNext(3) [ INFO] (single-1) | onNext(4) [ INFO] (single-1) | onNext(9) [ INFO] (main) | onComplete() [ INFO] (single-1) | onComplete() ``` ## 3.3 常用操作 * **是一個發佈者也是一個訂閱者** * 訂閱上一個流 ### 3.3.1 filter - 過濾 * **代碼**: ```java= /** * onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) => 流被訂閱(filter中間操作被訂閱) * request(unbounded) => 請求無線數據(requestUnbounded()) * onNext()數據到達 */ @Test void filter() { Flux.range(1, 4)//流的發佈者 .log()//顯示過濾前的數據 .filter(i -> i % 2 == 0) // .log()//顯示過濾後的數據 .subscribe(); } ``` * subscribe()是訂閱**filter這個流** ### 3.3.2 flatMap - 扁平化 * **作用**:**扁平化** * **無序** * **原因**:有一個concurrency參數 * 默認:256,表示同時處理256件任務 * **說明**:將多個流,變成一個流 * **代碼**: ```java= //扁平化,無序 @Test void flatMap() { Flux<String> log = Flux.just("Aasd adds", "uljk lkjl") //將多個流,變成一個流 .flatMap(s -> Flux.fromArray(s.split(" ")) )//將Array變成一個流的Flux靜態方法 .log(); log.subscribe(); } ``` ### 3.3.3 concat - 連接 * **concat()**:連接多個流 * 靜態方法 * **concatWith()**:連接老流 * 注意:兩個流類型需要一致 * **concatMap()**:**連接映射** * 一個元素可以變成多個,類型不受限制 * **按順序的異步操作**(`數據庫查詢、網絡請求、文件讀取`),速度較慢,需要等待前一個元素處理完才換下一個,**如果不用按順序處理可以改用flatMap** #### 3.3.3.1 concat() * **代碼**: ```java= @Test void concat() { Flux.concat(Flux.range(1, 4), Flux.just("As", "asd")) .log() .subscribe(); } ``` #### 3.3.3.2 concatWith() * **代碼**: ```java= //流連接(中間操作),運行中 連接一個新流 //限制:類型需要與舊流的元素類型相同 @Test void concatWith() { Flux.range(1, 2) .map(i -> i * i) .concatWith(Flux.just(4, 5)) .log() .subscribe(); } ``` #### 3.3.3.3 concatMap() * **代碼**: ```java= //連接映射:一個元素可以變成單個或多個元素,每個元素按順序完後,彙整為一個流 //按順序的異步操作(數據庫查詢、網絡請求、文件讀取),速度較慢,需要等待前一個元素處理完才換下一個,如果不用按順序處理可以改用flatMap @Test void concatMap() { Flux.just(1, 2) //使用一個Flux或Mono包裝,Function接口 .concatMap(s -> Flux.just(s + "-a", s + "-b")) .log() .subscribe(); } ``` * **log()** * onNext(1-a) * onNext(1-b) * onNext(2-a) * onNext(2-b) ### 3.3.4 transform - 狀態轉換 * **作用**:**把流變成新數據** * **transform()**:**無狀態轉換** * 訂閱流 `不會共享 外部變量的值` * 無論多少訂閱者,transform**只執行一次** * **transformDeferred()**:**有狀態轉換** * 訂閱流 `會共享 外部變量的值` * 無論多少訂閱者,**每一個訂閱者**transform**都會執行一次** #### 3.3.4.1 transform() - 無狀態轉換 * **代碼**: ```java= @Test void transform() { //原子 AtomicInteger atomicInteger = new AtomicInteger(); Flux<String> transform = Flux.just("s", "b") .transform(value -> { if (atomicInteger.incrementAndGet() == 1) { //第一次調用 return value.map(String::toUpperCase); } else { //不是第一次調用 return value; } }); transform.subscribe(v -> System.out.println("訂閱者1 " + v)); transform.subscribe(v -> System.out.println("訂閱者2 " + v)); } ``` * **輸出**: * 訂閱者1 S * 訂閱者1 B * 訂閱者2 S * 訂閱者2 B #### 3.3.4.2 transformDeferred() - 有狀態轉換 * **代碼**: ```java= @Test void transformDeferred() { //原子 AtomicInteger atomicInteger = new AtomicInteger(); Flux<String> transform = Flux.just("s", "b") .transformDeferred(value -> { if (atomicInteger.incrementAndGet() == 1) { //第一次調用 return value.map(String::toUpperCase); } else { //不是第一次調用 return value; } }); transform.subscribe(v -> System.out.println("訂閱者1 " + v)); transform.subscribe(v -> System.out.println("訂閱者2 " + v)); } ``` * **輸出**: * 訂閱者1 S * 訂閱者1 B * 訂閱者2 s * 訂閱者2 b ### 3.3.5 empty - 流為空處理 * **defaultIfEmpty()**:**靜態**兜底調用 * 如果發佈者數據為空使用默認值,否則使用發佈者的值 * **switchIfEmpty()**:空轉換,調用**動態**兜底調用 * 如果發佈者數據為空使用默認值,否則使用發佈者的值 * **流為空說明**: * **Mono.just(null)**:流裡面**有一個null值的元素** * **Mono.empty()**:流裡面**沒有的元素**,只有完成信號 #### 3.3.5.1 defaultIfEmpty() * **代碼**: ```java= @Test void empty() { getMono() .defaultIfEmpty("X")//如果發佈者數據為空使用默認值,否則使用發佈者的值 .log() .subscribe(); } ``` #### 3.3.5.2 switchIfEmpty() * **代碼**: ```java= @Test void switchIfEmpty() { getMono() .switchIfEmpty(Mono.just("AB"))//如果發佈者數據為空使用默認值,否則使用發佈者的值 .log() .subscribe(); } ``` ### 3.3.6 merge - 合併 * **作用**:**合併流** * **merge()**:靜態調用 * A流元素、B流元素,**按元素產生時間順序排列**(`與流的順序無關`) * **mergeWith()**:方法調用 * **mergeSequential()**:靜態調用 * **按流順序**,排列元素(`與concat相似`) #### 3.3.6.1 merge() * **代碼**: ```java= @Test void merge() throws IOException { //concat:連接,A流元素+B流元素 //merge:合併,A流元素、B流元素 按產生時間順序排列 Flux.merge( Flux.just(1, 2, 4).delayElements(Duration.ofSeconds(1L)),//每1秒發一個元素 Flux.just("Sd", "kkk").delayElements(Duration.ofMillis(1500L)), //每1.5秒發一個元素 Flux.just("sasd", "l;lj").delayElements(Duration.ofMillis(800L))//每0.8秒發一個元素 ) .log().subscribe(); //delayElements:為異步發送,需要主線程不要結束 System.in.read(); } ``` * **log()** 1. onNext(sasd) 2. onNext(1) 3. onNext(Sd) 4. onNext(l;lj) 5. onNext(2) 6. onNext(kkk) 7. onNext(4) #### 3.3.6.2 mergeSequential() * **代碼**: ```java= @Test void mergeSequential() throws IOException { //按流順序,排列元素(與concat相似) Flux.mergeSequential( Flux.just(1, 2, 4).delayElements(Duration.ofSeconds(1L)), Flux.just("Sd", "kkk").delayElements(Duration.ofMillis(1500L)), Flux.just("sasd", "l;lj").delayElements(Duration.ofMillis(800L)) ) .log() .subscribe(); System.in.read(); } ``` * **log()**: * onNext(1) * onNext(2) * onNext(4) * onNext(Sd) * onNext(kkk) * onNext(sasd) * onNext(l;lj) * onComplete() ### 3.3.7 zip - 壓縮 * **作用**:將**多個流**(`上限壓縮8個流`),**一ㄧ對應**的元素變成**多個元組**(`Tuple`) * **注意**:**無法結對**的元素會`被忽略` * **zip()**:靜態調用 * **zipWith()**:方法調用 #### 3.3.7.1 zip() * **代碼**:壓縮兩個流 ```java= @Test void zip() { //靜態調用 Flux.zip(Flux.just(1, 2, 4), Flux.just("A", "B", "C")) .log() .subscribe(); } ``` * **log()** * onNext(\[1,A]) * onNext(\[2,B]) * onNext(\[4,C]) #### 3.3.7.2 zipWith() * **代碼**:壓縮兩個流 ```java= @Test void zipWith() { Flux.just(1, 2, 3, 0) .zipWith(Flux.just(4, 5, 6)) .log() .subscribe(); } ``` * **log()** * onNext(\[1,4]) * onNext(\[2,5]) * onNext(\[3,6]) ## 3.4 錯誤處理 * **概念**: * 錯誤**默認**是一個**中斷行為** * **subscribe**:消費者可以感知 正常元素try 和 流發生的錯誤catch * **方法**: * **onErrorReturn()**:返回默認值 * **onErrorResume()**:調用一個有返回值的方法 * **onErrorMap()**:拋出一個**非業務邏輯**的錯誤(`已處理原本錯誤後拋出自定錯誤`) * **doOnError()**:發佈者感知**錯誤** * **doFinaly()**:發佈者感知**流結束**(`不論正常或異常結束`) * **onErrorContinue()**:忽略當前異常,**僅通知紀錄**(`該異常可以做一些處理`),繼續推進 * **onErrorComplete()**:將錯誤結束信號,替換成正常結束信號 * **onErrorStop()**:在流中遇到錯誤時**停止流**,**不會進行任何其他處理**(`包括重試或繼續處理流中的後續數據`) ### 3.4.1 Catch and return a static default value - 捕獲異常返回一個靜態默認值 * 命令式編程: ```java= try { return doSomethingDangerous(10); } catch (Throwable error) { return "RECOVERED"; } ``` * 響應式編程:onErrorReturn() ```java= Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn("RECOVERED"); ``` * **onErrorReturn()** * 發佈者處理掉異常,消費者不會感知到異常 * **返回默認值** * 正常完成 * **代碼**: ```java= @Test void error() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("ha") //.onErrorReturn(ArithmeticException.class, "ASDSD") //方式二:斷言錯誤類型 .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 100 / 1 = 100 2. 100 / 2 = 50 3. `ASDSD` 4. 流 結束 ### 3.4.2 Catch and execute an alternative path with a fallback method. - 捕獲異常並執行一個回退方法 * 命令式編程: ```java= String v1; try { v1 = callExternalService("key1"); } catch (Throwable error) { v1 = getFromCache("key1"); } ``` * 響應式編程:**onErrorResume()** ```java= Flux.just("key1", "key2") .flatMap(k -> callExternalService(k) .onErrorResume(e -> getFromCache(k)) ); ``` * onErrorResume() * 發佈者處理掉異常,消費者不會感知到異常 * **調用一個方法** * 正常完成 * **代碼**: ```java= @Test void onErrorResume() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(err -> Mono.just("ja")) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()** 1. 100 / 1 = 100 2. 100 / 2 = 50 3. `ja` 4. 流 結束 ### 3.5.3 Catch and dynamically compute a fallback value. - 捕獲異常並動態計算返回值 * 命令式編程: ```java= try { Value v = erroringMethod(); return MyWrapper.fromValue(v); } catch (Throwable error) { return MyWrapper.fromError(error); } ``` * **響應式編程**:**onErrorResume()** ```java= erroringFlux.onErrorResume(error -> Mono.just( MyWrapper.fromError(error) )); ``` * 代碼: ```java= @Test void onErrorResume() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(throwable -> processError(throwable)) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } Mono<String> processError(Throwable throwable) { if (throwable instanceof ArithmeticException) { return Mono.just("Zero"); } if (throwable instanceof NullPointerException) { return Mono.just("Null"); } return Mono.just("HAHA"); } ``` * **log()**: 1. 100 / 1 = 100 2. 100 / 2 = 50 3. `Zero` 4. 流 結束 ### 3.5.4 Catch, wrap to a BusinessException, and re-throw. - 捕獲異常後,包裝成一個業務異常並拋出 #### 3.5.4.1 onErrorResume() * **包裝重新拋出異常**: * 發佈者處理掉異常,**訂閱者有感知** * **拋出新異常** 或 **Mon.empty() 吃掉異常** * 流**異常結束** * 命令式編程: ```java= try { return callExternalService(k); } catch (Throwable error) { throw new BusinessException("oops, SLA exceeded", error); } ``` * **響應式編程1**:**onErrorResume()** ```java= Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorResume(original -> Flux.error( new BusinessException("oops, SLA exceeded", original)) ); ``` * **代碼1**: ```java= @Test void catchAndThrow() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(throwable -> Flux.error(new BusinessException(throwable.getMessage() + " = Bone"))) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 100 / 1 = 100 2. 100 / 2 = 50 3. `error: / by zero = Bone` #### 3.5.4.2 onErrorMap() * **響應式編程2**:**onErrorMap()** ```java= Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original)); ``` * **onErrorMap()**:將一個異常**映射**成另一個異常 * **代碼2**: ```java= @Test void onErrorMap() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorMap(throwable -> new BusinessException(throwable.getMessage())) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` ### 3.5.5 Catch, log an error-specific message, and re-throw. - 捕獲異常,紀錄特殊的錯誤日誌,再拋出重新異常 * 捕獲異常,並執行動作,**且不影像異常繼續順者流水線傳播** * **doOnError()**: * 發佈者不處理掉異常,**只在異常發生的時候做一件事**(`發佈者感知`),**消費者有感知** * 命令式編程: ```java= try { return callExternalService(k); } catch (RuntimeException error) { //make a record of the error log("uh oh, falling back, service failed for key " + k); throw error; } ``` * **響應式編程**:**doOnError()**(`事件感知`) ```java= LongAdder failureStat = new LongAdder(); Flux<String> flux = Flux.just("unknown") .flatMap(k -> callExternalService(k) .doOnError(e -> { failureStat.increment(); log("uh oh, falling back, service failed for key " + k); }) ); ``` * **代碼**: ```java= @Test void doOnError() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .doOnError(throwable -> { System.out.println("doOnError: " + throwable.getMessage()); }) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 100 / 1 = 100 2. 100 / 2 = 50 3. doOnError: / by zero 4. error: / by zero ### 3.5.6 Use the finally block to clean up resources or a Java 7 "try-with-resource" construct - 執行finally * **doFinaly()**: * 發佈者**感知流完成**時(`不論是否中斷或正常結束都會執行`),執行操作 * 命令式編程: * 例1: ```java= Stats stats = new Stats(); stats.startTimer(); try { doSomethingDangerous(); } finally { stats.stopTimerAndRecordTiming(); } ``` * 例2: ```java= try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString(); } ``` * **響應式編程**:**doFinaly()**(`事件感知`) ```java= Stats stats = new Stats(); LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doOnSubscribe(s -> stats.startTimer()) .doFinally(type -> { stats.stopTimerAndRecordTiming(); if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1); ``` * **代碼**:**流中斷結束** ```java= @Test void doOnError() { Flux.just(1, 2, 0, 4) .map(i -> "100 / " + i + " = " + (100 / i)) .doOnError(throwable -> { System.out.println("doOnError: " + throwable.getMessage()); }) .doFinally(signalType -> System.out.println("流完: " + signalType)) .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 100 / 1 = 100 2. 100 / 2 = 50 3. doOnError: / by zero 4. error: / by zero 5. `流完: onError` ### 3.5.7 忽略當前異常,僅通知紀錄,繼續推進 * **onErrorContinue()**: * 發佈者**處理掉異常**,消費者不會感知 * **代碼**: ```java= @Test void jumpError() { Flux.just(1, 2, 0, 4) .map(i -> 10 / i) //發生錯誤,繼續前進 .onErrorContinue((throwable, val) -> { System.out.println(throwable); System.out.println("val = " + val); } ) .subscribe(v -> System.out.println("v = " + v), error -> System.out.println("e = " + error) ); } ``` * **log()**: 1. v = 10 2. v = 5 3. `java.lang.ArithmeticException: / by zero` 4. `val = 0` 5. v = 2 ### 3.5.8 結束處理 * **onErrorComplete()**:將錯誤結束信號,替換成正常結束信號 ```java= @Test void onErrorComplete() { Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .onErrorComplete()//將錯誤結束信號,替換成正常結束信號 .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 10 2. 5 3. `流 結束` * **onErrorStop()**:在流中遇到錯誤時**停止流**,**不會進行任何其他處理**(`包括重試或繼續處理流中的後續數據`) ```java= @Test void onErrorStop() { Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .onErrorStop()//錯誤後,停止流,所有監聽者全部結束 .subscribe(System.out::println, error -> System.out.println("error: " + error.getMessage()), () -> System.out.println("流 結束")//感知正常結束 ); } ``` * **log()**: 1. 10 2. 5 3. `error: / by zero` ## 3.6 重試、超時 * **retry()**:把流**從頭到尾**從新請求 * **timeout()**:設定**超時的時間** * 超時拋出`java.util.concurrent.TimeoutException ` * **代碼**:使用**超時方法**(`timeout()`)模擬,重試設定**ㄧ次** ```java= @Test void retryAndTimeOut() throws IOException { Flux.just(1, 2, 3) .log() .delayElements(Duration.ofSeconds(3)) .log() .timeout(Duration.ofSeconds(2))//獲取元素超時設定 .retry(1)//把流從頭到尾從新請求 .map(i -> i + " ha") .subscribe(); } ``` * log() 1. 第一次 1. onSubscribe(\[Synchronous Fuseable] FluxArray.ArraySubscription) 2. onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber) 3. request(unbounded) 4. request(1) 5. onNext(1) 6. cancel() 7. cancel() 2. **重試**:將整個流**從頭訂閱** 1. onSubscribe(\[Synchronous Fuseable] FluxArray.ArraySubscription) 2. onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber) 3. request(unbounded) 4. request(1) 5. onNext(1) 6. cancel() 7. cancel() 4. 超時錯誤拋出 ## 3.7 Sinks 工具類 - 數據管道 * **概念**:發送數據的**管道** * 所有數據順著這個管道往下走 * 訂閱者默認**從訂閱的當下開始接收據** * **onBackpressureBuffer()**:背壓(Backpressure)處理操作符,設定緩衝區大小 * **作用**:將溢出的數據存放到一個緩衝區中,讓**消費者在有空時再處理**這些數據。它確保數據不會丟失,但**需要有足夠的內存來存放這些數據**。 ### 3.7.1 單播(單一訂閱者) * **概念**:該**數據管道**(`Sinks`)只能綁定單個訂閱者 * **多個訂閱者的錯誤訊息**: `java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber` * **方法**:**Sinks.many().unicast()** * **代碼**: ```java= @Test void sink() throws InterruptedException, IOException { //從訂閱時,才會開始消費 //1. 建立數據管道 Sinks.Many<Object> many = Sinks.many() .unicast()//單播 .onBackpressureBuffer(new LinkedBlockingDeque<>(5));//背壓對列,該隊列最多存放(緩存)5個元素 new Thread(() -> { for (int i = 0; i < 5; i++) { //2. 發送數據 many.tryEmitNext("a-" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); //3. 訂閱數據 many.asFlux().subscribe(System.out::println); System.in.read(); } ``` ### 3.7.2 多播(無限訂閱者) * **概念**:該**數據管道**(`Sinks`)綁頂**多個**訂閱者 * **方法**:**Sinks.many().multicast()** * **代碼**: ```java= @Test void sinkMulticast() throws InterruptedException, IOException { //從訂閱時,才會開始消費 Sinks.Many<Object> many = Sinks.many() .multicast()//多播 .onBackpressureBuffer(); new Thread(() -> { for (int i = 0; i < 5; i++) { //發送數據 many.tryEmitNext("a-" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); many.asFlux().subscribe(v -> System.out.println("v1 = " + v)); Thread.sleep(1500); //延後訂閱 many.asFlux().subscribe(v -> System.out.println("v2 = " + v)); System.in.read(); } ``` * **log()**: 1. v1 = a-0 2. v1 = a-1 3. v1 = a-2 4. v2 = a-2 5. v1 = a-3 6. v2 = a-3 7. v1 = a-4 8. v2 = a-4 * **問題**:缺失之前未訂閱時,所產生的數據? * 默認情況 ### 3.7.3 數據重放 * **概念**:這個管道能**重放元素**,**後來的訂閱者**也會`接收到未訂閱前的元素` * **使用一組隊列進行數據緩存**,來達到重放 * **方法**:**Sinks.many().replay();** * **limit()**:**限制重放元素** * **代碼**:重放**1個**元素(`訂閱當下的前一個元素`) ```java= @Test void sinksReplay() throws InterruptedException, IOException { //重放,這個管道能重放元素,後來的訂閱者也會接收到未訂閱前的元素 Sinks.Many<Object> replay = Sinks.many().replay() .limit(1);//重放三個元素 new Thread(() -> { for (int i = 0; i < 5; i++) { //發送數據 replay.tryEmitNext("a-" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }).start(); replay.asFlux().subscribe(v -> System.out.println("v1 = " + v)); Thread.sleep(3000); replay.asFlux().subscribe(v -> System.out.println("v2 = " + v)); System.in.read(); } ``` * **log()** * v1 = a-0 * v1 = a-1 * `v1 = a-2`(`重放`) * v1 = a-3 * `v2 = a-2`(`重放`) * v2 = a-3 * v1 = a-4 * v2 = a-4 ### 3.7.4 Flux的數據重放 * **cache()**:設定**緩存數據** * **默認下是緩存所有數據**(`不設定緩存大小`),但可能會**導致OOM** * **代碼**:緩存**三個**數據 ```java= @Test void noSinksReplay() throws InterruptedException { Flux<Integer> cache = Flux.range(1, 4) .delayElements(Duration.ofSeconds(1)) .cache(3);//緩存三個數據 cache.subscribe(v->System.out.println("V1 = "+v)); Thread.sleep(5000); cache.subscribe(v->System.out.println("V2 = "+v)); } ``` * **log()**: 1. V1 = 1 2. `V1 = 2` 3. `V1 = 3` 4. `V1 = 4` 5. V2 = 2 6. V2 = 3 7. V2 = 4 ## 3.8 阻塞API - block() * **概念**:**不使用subscribe下,從Flux獲取數據** * 只能從流中獲取**單個數據** * 如果是一個Flux的數據,將此Flux -> Mono後,再獲取數據 * **也是一種訂閱者**(`BlockingMonoSubscriber`) * **方法**: * **block()**:獲取數據 * **blockLast()**: 獲取流**最後一個數據** * **blockFirst()**:獲取流**第一個數據** * **代碼**:獲取**流**的**最後一個數據** ```java= @Test void blockLast() { Integer integer = Flux.just(1, 2, 4) .map(i -> i + 10) .blockLast(); System.out.println(integer); } ``` * 輸出:`14` * **代碼**:獲取**流**的**所有數據** ```java= @Test void blockList() { Mono<List<Integer>> mono = Flux.just(1, 2, 4) .map(i -> i + 10) .collectList(); List<Integer> block = mono.block();//也是一種訂閱者,BlockingMonoSubscriber System.out.println(block); } ``` * 輸出:`[11, 12, 14]` * collectList():收集流中的數據,變成單一個數據 ## 3.9 ParalleFlux 並發流 * **parallel()**:設定**並發數** * 與 **runOn()** 混合使用(`設定後續動作要運行的線程池`) * **代碼**:**百萬數據,分成8個線程,分批處理** ```java= @Test void paralleFlux() { //情景:百萬數據,分成8個線程,分批處理 Flux.range(1, 5000) .buffer(100)//Flux<List<Integer>>,緩衝區每次消費100個數據 .parallel(8)//設定8個線程並發處理,所以會有8次訂閱上面的流 .runOn(Schedulers.newParallel("YY"))//後續的操作運作在這一個新創建的線程池 .log() .subscribe(v -> System.out.println("v = " + v)); } ``` * **buffer(100)**:**緩衝區每次消費100個數據**(`批次處理`) ## 3.10 Context(上下文)API * **文檔**:https://projectreactor.io/docs/core/snapshot/reference/advancedFeatures/context.html#context.api * **解決**:資料**流期間共享數據**(`多線程處理下`) * **ThreadLocal**在響應式程式設計中無法使用。 * 作用:用來在多線程環境下為**每個線程維護各自的變量副本**(`一個ThreadLocal 變量的值在每個線程中都是獨立的,互不影響`) * **方法**: * **Context**:讀寫 * **ContextView**:只讀 * **注意**: * 上游能獲取下游**最近一次的數據** * **Context**為**不可變**(`解決多線程的數據競爭問題`) * **重點**:**變量是由下游傳遞到上游** * **響應式**:dao(`產生數據`) -> service -> controller * **變量**從下游**傳播**給上游,**上游不要知道數據條件**,**數據產生者**為`上游`,條件是由下游所擁有 * 代碼: ```java= @Test void threadLocal() { Flux.just(1, 2, 3) //支持Context的中間操作 .transformDeferredContextual((flux, contextView) -> flux .map(i -> i + " " + (String) contextView.get("prefix"))) //上游能獲取下游最近一次的數據 // .contextWrite(Context.of("a", "As")) .contextWrite(c -> c.put("prefix", "haha")) //在ThreadLocal中共享數據,上游的所有人能夠看到,Context是由下游傳播給上遊 .subscribe(System.out::println); //命令式 controller -> service -> dao //響應式 dao -> service -> controller == 所以是從下游傳播給上游,上游不要知道獲取數據條件,數據產生者為上游,條件是由下游所擁有 //Context不可變,解決多線程的數據競爭問題 } ``` * **contextWrite()**:創建context,兩種創建方式 ###### tags:`Reactive Stream`