---
title: '響應式編程-概念'
disqus: hackmd
---
# 1. 概念
* **Servlet Stack**:
* Spring MVC
* 同步、阻塞式架構(synchronous blocking I/O)
* follow 一個請求一條線程的模型(one-request-pre-thread model)
* **Reactive Stack**:
* Spring Webflux
* 非同步架構(non-blocking web framwork)
* 支持高並發、高吞吐量
* **比較**
| | Reactive Stack | Servlet Stack |
| -------- | -------- | -------- |
| 底層 |Netty, Servlet 3.1(開始支持異步) + Containers|Servlet Containers|
|底層規範|Reactive Stream Adapters|Servlet API|
|SpringBoot 套件對應|Spring Security Reactive|Spring Security|
| |Spring Webflux|Spring MVC|
| |Spring Data Reactive Repostiories|Spring DataRepostiories|
* **常識**:
* **高並發**:緩存(緩沖區)+異步+排序機制
* **高可用**:分片(數據如何分佈)+複製(數據複製機制,並免數據遺失)+選領導
* **非阻塞圖**:

* **Netty概念圖**:『`少量線程持續運行` **優於** `大量線程等待切換`』 (`非同步核心概念`)
## 1.1 Reactive Stream 規範
* 底層機制
* 發布訂閱
* 四大組件
## 1.2 Reactor 框架(響應式庫)
* Mono/Flux
* 流操作符
* 線程調度
* 錯誤處理
* 高級特性
## 1.3 Spring Webflux 響應式應用
* **說明**:**響應式的Web框架**
* **對標Spring MVC**
* **內容**:
* HttpServer(底層的響應式機制)
* DispatcherHandler原理
* SEE功能
* 新版Filter
* Spring MVC 組件對比
### 1.3.1 Spring Data R2DBC (Reactive Relational Database Connectivity)
* **說明**:針對關聯數據庫的**響應式 API 標準**
* **內容**:
* R2dbc 驅動
* DatabaseClient API
* 自定義Repository
* 自定義Converter
* 關聯查詢
* 最佳實踐
### 1.3.2 Spring Security Reactive
* **內容**:
* RBAC權限模型
* FilterChain配置
* ReactiveUserDetailsService
* PasswordEncoder
* 授權認證配置
# 2. Reactive Stream 規範
* **Reactive Stream **是JVM面向**響應式**(`流`)的庫的**標準**和**規範**
* 處理不限數量的元素
* 有序
* 組件之間**異步傳遞元素**
* 強制性**非阻塞**、**背壓模式**(`Backpressure`)
* **正向壓力**(`正壓`):數據的生產者給消費者壓力
* **背壓模式**:**數據消費者**告知**數據生產者**消費的速度,從而實現生產和消費的速度協調
* **非阻塞原理**:緩沖區+**回調**(`是一種在程式碼運行過程中將一個方法傳遞給另一個方法`)
* **目標**:通過**全異步的方式**,加**緩衝區**,構建一個`實時的數據流系統`
* **概念**:**Tomcat**發過來的請求,**SpringBoot**這邊會先進入緩衝區(`數據`),**處理的線程**持續的從**緩衝區中取出數據**(`消費數據`),所以Tomcat可以持續的**發送請求**(`產生數據`),不需等待結果返回
* Kafka、MQ能建構出**大型分散式的響應式系統**,但**缺少本地化的訊息系統解決方案**:
* 讓所有的異步線程能互相監聽訊息,處理訊息,建立即時訊息處理串流
* **核心**:**消息傳遞**(`響應式構建的依據`)
* 基於異步、消息事件的全事件回調系統(響應式系統)
* 圖:
* **總結**:
* **底層**:基於**數據緩沖隊列**+**消息驅動模型**+**異步回調機制**
* 編碼:流式編程+鏈式調用+聲明式API
* **效果**:**優雅全異步**+**消息實時處理**+**高吞吐量**+佔用少量資源
* **痛點**:
* **以前**:開發一個**高並發系統**:快取、非同步、排隊機制,**手動控制整個邏輯**
* **現在**:**全自動控制**整個邏輯(`天生高並發`)
## 2.1 Reactive Stream 規範-四大核心接口
* **java.util.concurrent.Flow類**:**共有四大函數接口**(`響應式使用`),構建出響應式系統
* **Processor\<T, R>**:處理器
* 同時實現**訂閱者**、**發佈者**的接口的組件(`是訂閱者也是發佈者`)
* 接收一個來自發佈者的數據,並進行處理,再發佈給下一個訂閱者
* 在Reactor中擔任中間環節,**表示一個處理階段**
* **Subscription**:訂閱關係
* **訂閱者**與**發佈者**之間的關鍵接口
* 訂閱者通過訂閱表示對發佈者所產生的數據有興趣,可以請求一定數量的元素
* 訂閱者也可以取消訂閱
* **Subscriber\<T>**:訂閱者
* 消費數據流
* **Publisher\<T>**:發佈者
* 產生數據流
* **管道圖**:

* **綁定操作**:就是發布者,記住了所有訂閱者都有誰,有數據後,給所有訂閱者把數據推送過去
* **鏈表的形式綁定了責任關係**
## 2.2 Reactive Stream 規範-發佈\訂閱數據
* **概念圖**:

* **代碼**:
```java=
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
//1. 定義一個發佈者:發佈數據
//1.1 使用已 實現的發佈者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//2. 定義一個訂閱者:訂閱數據
Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
//在訂閱時,onXxxx:表示Xxxx事件發生時,會執行這個回調
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " - 訂閱開始: " + subscription);
this.subscription = subscription;
//訂閱建立後,立刻從上游請求一個數據
subscription.request(1);
}
//在下一個元素到達時,執行這個回調方法,接收新的數據
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " - 接收數據: " + s);
if (s.equals("p-7")) {
//到達目標後就取消訂閱
subscription.cancel();
} else {
//每次消費完一個新元素後,再請求一個新的元素
subscription.request(1);
}
}
//在錯誤發生時(生產數據出現錯誤),執行這個回調方法
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - 接收到錯誤: " + throwable);
}
//在完成時
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " - 接收到完成");
}
};
//3. 綁定發佈者與訂閱者
publisher.subscribe(subscriber1);
//4 發佈數據:發佈10數據
//
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName());
if (i > 96) {
//異常中斷數據推送,訂閱者會回調onError(),目前不好處理,等到Reactor框架再處理
publisher.closeExceptionally(new Exception("asd"));
} else {
publisher.submit("p-" + i);
}
//發佈數據:是給某一個隊列存入數據 => 寫入緩衝區
}
//5. 發佈者關閉通道,訂閱者接受到關閉信號,會回調onComplete()
publisher.close();
Thread.sleep(1000);
}
}
```
## 2.3 Reactive Stream 規範-中間處理 Processor
* **Processor是『訂閱者』也是『發佈者』**
* **圖**:

* **代碼**:
```java=
public class FlowDemo2 {
//定義流中間操作,添加一個haha前綴
//一個訂閱者,也是一個發佈者
//只寫訂閱者的接口
static class Processor1 extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;//保存綁定關係
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Proccess 訂閱:" + subscription);
this.subscription = subscription;
//要一個新數據
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("Proccess 拿到數據:" + s);
//加工
s += "haha";
//發送加工後數據
submit(s);//SubmissionPublisher類的方法
//要一個新數據
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Proccess: " + Thread.currentThread().getName() + " - 接收到錯誤: " + throwable);
}
@Override
public void onComplete() {
System.out.println("Proccess: " + Thread.currentThread().getName() + " - 接收到完成");
}
}
public static void main(String[] args) throws InterruptedException {
//1. 定義一個發佈者:發佈數據
//1.1 使用已 實現的發佈者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//2. 定義中間操作:添加一個haha前綴
Processor1 processor1 = new Processor1();
//3. 定義一個訂閱者:訂閱數據
Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
//在訂閱時,onXxxx:表示Xxxx事件發生時,會執行這個回調
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " - 訂閱開始: " + subscription);
this.subscription = subscription;
//訂閱建立後,立刻從上游請求一個數據
subscription.request(1);
}
//在下一個元素到達時,執行這個回調方法,接收新的數據
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " - 接收數據: " + s);
//每次消費完一個新元素後,再請求一個新的元素
subscription.request(1);
}
//在錯誤發生時(生產數據出現錯誤),執行這個回調方法
@Override
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - 接收到錯誤: " + throwable);
}
//在完成時
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " - 接收到完成");
}
};
//3.綁定
//3.1 發佈者綁定處理器
publisher.subscribe(processor1);
//3.2 處理器再綁定訂閱者
processor1.subscribe(subscriber1);
//4 發佈數據:發佈10數據
for (int i = 0; i < 10; i++) {
publisher.submit("p-" + i);
//發佈數據:是給某一個隊列存入數據 => 寫入緩衝區
}
//5. 發佈者關閉通道,訂閱者接受到關閉信號,會回調onComplete()
publisher.close();
Thread.sleep(1000);
}
}
```
# 3. Reactor 響應式框架
* **基本流程**:產生數據 => 中間操作 => 消費數據
* **概念**:
1. 數據流:數據源頭
2. 變化傳播:數據操作(中間操作)
3. 異步編程模式:底層控制異步
* 數據流:每個元素從流的源頭,開始源源不斷的,自己往下滾動
* onNext:當某個元素到達後,我們可以定義它的處理邏輯
* onError:異常結束,觸發事件
* onComplete:處理結束,觸發事件
* 命令式編程如下:響應式是將命令式編程的概念,轉換成響應式對應的方法
```java=
public void execute(List<Order> orderList){
//處理訂單
//異常處理
//正常返回
}
```
* **流**:**數據** + **信息**(`完成/異常`)
* 數據流**經過中間操作**後,會產生**新的數據流**
* 所有**操作符**(`中間操作`)基於發佈者流
## 3.1 Reactor基礎
### 3.1.1 發佈流包裝類
* **Flux**\<T>:一個流中**0~N**個元素
* **Mono**\<T>:一個流中**0~1**個元素
* 代碼:
```java=
public class FluxDemo {
public static void main(String[] args) throws IOException {
mono();
flux();
}
public static void mono() throws IOException {
Mono<Integer> just = Mono.just(1);
just.subscribe(e -> System.out.println("mono : " + e));
}
public static void flux() throws IOException {
//1. 數據流:多元素 (流沒有被消費,就不會被啟用)
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);
//2. 訂閱(消費)流
just.subscribe(e -> System.out.println("e1 = " + e));
//一個流可以有多個消費者
just.subscribe(e -> System.out.println("e2 = " + e));
//對每個消費者來說流都是一樣 => 廣播模式
System.out.println("======");
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1L));//每秒產生一個數字,從0開始
interval.subscribe(System.out::println);
//異步的,所以避免主線程結束
System.in.read();//輸入一個數據後,主線程結束
}
}
```
### 3.1.2 事件感知API - doOnXxx
* **說明**:當流發生什麼事情的時候,觸發一個doOnXxx
* **觸發一個回調**(`Hook 勾子函數`)
* **設定在發佈者身上**
* **注意**:如果該流有執行**中間操作**,那該**事件感知**是寫在該流的後面,該事件感知是感知該新的流
```java
public static void fluxOnEx() throws IOException, InterruptedException {
Flux<Integer> flux = Flux.range(1, 10)
//當前是 感知flux這個最一開始的流
.doOnNext(integer -> System.out.println("流1 doOnNext " + integer))
//中間操作 Processor
.map(integer -> integer * integer)
//當前是 感知map處理後的流
.doOnNext(integer -> System.out.println("流2 doOnNext " + integer))
//中間操作 Processor
.delayElements(Duration.ofSeconds(1))//延遲元素傳輸,產生新流
//當前是 感知delayElements處理後的流
.doOnNext(integer -> System.out.println("流3 doOnNext " + integer));
flux.subscribe(System.out::println);
}
```
* **doOnXxx**
* doOnComplete:流**正常結束**時,觸發
* doOnCancel:流**被取消**時,觸發
* doOnError:流**有錯誤/異常**時,觸發
* doOnNext:流**有『數據』到達**時,觸發
* SignalType.**ON_NEXT**
* doOnEach:流**有『元素』到達**時,觸發
* **元素**:流的數據、**信號**
* **reactor.core.publisher.SignalType**類:**信號**
* SUBSCRIBE:被訂閱時
* REQUEST:請求N個元素時
* CANCEL:流被取消訂閱時
* ON_SUBSCRIBE:在訂閱時
* ON_NEXT:在數據到達時
* ON_ERROR:在流錯誤時
* ON_COMPLETE:在流正常完成時
* AFTER_TERMINATE:在流被中斷以後
* CURRENT_CONTEXT:當前上下文
* ON_CONTEXT:感知上下文
* doOnSubscribe:流**被訂閱**時,觸發
* doOnRequest:當**消費者請求元素**時,觸發
* 默認請求數據量為**Long.MAX_VALUE**
* doOnTerminate:流**被取消**、**有錯誤/異常**被中斷時,觸發
* doOnDiscard:流中**有元素被忽略**時,觸發
* **代碼**:
```java=
public static void fluxOnAPI() throws IOException, InterruptedException {
Flux<Integer> flux = Flux.range(1, 10)
//中間操作 Processor
.delayElements(Duration.ofSeconds(1))//延遲元素傳輸,產生新流
//事件感知
//當前是 感知delayElements處理後的流
.doOnCancel(() -> System.out.println("流 被取消"))
.doOnError(throwable -> System.out.println("流 異常錯誤 " + throwable))
.doOnNext(integer -> System.out.println("流 doOnNext " + integer))
.doOnEach(signal -> {
if (signal.isOnNext()) {
System.out.println("流 doOnEach get doOnNext: " + signal.get());
} else if (signal.isOnComplete()) {
System.out.println("流 doOnEach get doOnComplete");
} else if (signal.isOnError()) {
System.out.println("流 doOnEach doOnError " + signal.getThrowable());
}
})
.doOnRequest(e -> System.out.println("流 doOnRequest " + e))
.doOnSubscribe(subscription -> System.out.println("流 被訂閱 " + subscription + " aa."))
.doOnComplete(() -> System.out.println("流 正常結束"))
.doOnTerminate(() -> System.out.println("流 異常結束 "));
flux.subscribe(System.out::println);
}
```
* **正常完成**:
1. 流 被訂閱(`doOnSubscribe`)
2. 流 doOnRequest(`doOnRequest`)
3. 流 doOnNext 1(`doOnNext`)
4. 流 doOnEach get doOnNext:1(`doOnEach`)
5. 1(`subscribe`)
6. **持續3~5步驟**
7. 流 doOnEach get doOnComplete(`doOnEach`)
8. 流 正常結束(`doOnComplete`)
9. 流 異常結束(`doOnTerminate`)
### 3.1.3 響應式流日誌
* log():打印流的完整日誌
* 代碼:
```java=
public static void fluxLog() throws IOException, InterruptedException {
Flux.range(1, 7)
.log()//日誌
.filter(integer -> integer > 3)
.log()//日誌
.map(i -> "haha " + i)
.log()//日誌
.subscribe(System.out::println);
}
```
* 輸出:
```txt=
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscriptionConditional)
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(haha 4)
haha 4
[ INFO] (main) | onNext(5)
[ INFO] (main) | onNext(5)
[ INFO] (main) | onNext(haha 5)
haha 5
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(6)
[ INFO] (main) | onNext(haha 6)
haha 6
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(7)
[ INFO] (main) | onNext(haha 7)
haha 7
[ INFO] (main) | onComplete()
[ INFO] (main) | onComplete()
[ INFO] (main) | onComplete()
```
## 3.2 Reactor 核心
* 序列
* 線程調度
* 異常
* 操作符
### 3.2.1 訂閱 subscribe
* 使用**subscribe()**,傳入消費者的回調
* 常用三種subscribe:
```java=
public static void fluxSubscribe() throws IOException, InterruptedException {
Flux<Integer> range = Flux.range(1, 12).map(i -> {
if (i == 9) {
i = 10 / (9 - i);
}
return i;
});
//基本訂閱:元素消費者
range.subscribe(System.out::println);
//指定訂閱規則:元素消費者,異常結束消費者
range.subscribe(
System.out::println,//元素消費者
throwable -> System.out.println(throwable)//異常結束消費者
);
//指定訂閱規則:元素消費者,異常結束消費者,正常結束時消費者
range.subscribe(
System.out::println,//元素消費者
throwable -> System.out.println(throwable),//異常結束消費者
()-> System.out.println("End")//正常結束消費者
);
}
```
### 3.2.2 自定義消費者
* **流的生命週期的鉤子可以傳播給訂閱者**
| 類型 | 發生時機 | 用途 | 常見場景 |
| ----------------------------- | --------------------------- | --------------------------------------- | ----------------------------------------- |
| **`doOnXxx`**(Publisher階段) | 📍**事件即將發生時**(流仍繼續往下走) | **Side-effect(副作用)通知** — 用來「觀察」事件但不改變資料 | 記錄 log、統計、監控、debug |
| **`onXxx`**(中間操作) | ⚙️**事件發生後立即執行動作**(通常改變流或資料) | **主動操作/轉換行為** | 改寫資料流內容,如 `map`、`filter`、`onErrorResume` |
| **`hookOnXxx`**(Subscriber階段) | ✅**事件完成後(下游消費者收到)** | **最終回調**,你可以拿到結果、狀態、錯誤等資訊 | 在 `BaseSubscriber` 裡覆寫,用來控制背壓、監聽完成事件、錯誤處理 |
* 繼承**BaseSubscriber\<T>類**(`推薦使用`)
* 重寫如下方法
```java=
//流的生命週期的鉤子可以傳播給訂閱者
//doOnXxx:發生這個事件的時候,產生一個回調,通知你
//OnXxx:發生這個事件後,執行一個動作
//hockOnXxx:執行完這個事件後,產生一個回調,通知你
public static void fluxConsumer() {
Flux<Integer> flux = Flux.range(1, 12)
//定義在發佈者身上
.doOnNext((i) -> System.out.println("doOnNext " + i));
//自定一個消費者
flux.subscribe(new BaseSubscriber<Integer>() {
//生命週期鉤子1:訂閱關係 綁定時觸發
@Override
protected void hookOnSubscribe(Subscription subscription) {
//流被訂閱時觸發
System.out.println("綁定了 " + subscription);
//找生產要數據
request(1);//要一個數據
// requestUnbounded();//要無限數據
}
//生命週期鉤子2:訂閱關係 綁定時觸發
//每個到達元素都會觸發
@Override
protected void hookOnNext(Integer value) {
System.out.println("數據到達時觸發 " + value);
if (value == 5) {
cancel();//取消 流
}
request(1);//請求數據一次
}
@Override
protected void hookOnComplete() {
System.out.println("流正常結束");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流異常結束 ,error = " + throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消時執行");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最終回調,一定會執行");
}
@Override
public Context currentContext() {
return super.currentContext();
}
});
}
```
* BaseSubscriber中的方法:
* cancel():取消流
* request():請求數據
* request(1):請求數據一次
* requestUnbounded():請求所有元素
### 3.2.3 背壓(Backpressure)
* 訂閱者向發佈者**請求數據**
* 訂閱者**沒有**向發佈者請求數據,發佈者發佈數據,`該數據會存在發佈者的緩衝區中,不會觸發任何線程進行阻塞`,除非訂閱者向發佈者請求數據
### 3.2.4 請求重塑(Reshape Requests) - 批次處理
* **buffer()**:給數據加一個緩衝區(`緩沖`),**一次最多只能消費?數據**
* 消費者可以**批次處理**
* **代碼**:
```java=
public static void buffer() {
Flux<List<Integer>> flux = Flux.range(1, 10)//原始流是10個數據
.buffer(3);//緩沖區:緩沖設定為3,消費一次最多可以獲取3個數據
//消費者共請求4次 => 10/3 => 3 + 1
flux.subscribe(System.out::println);
}
```
* **輸出結果**
```
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
```
* **limitRate()**:限制**請求數據**(`請求頻率`、`限流`)
* 限制每次從上游生產者請求的資料量來**防止資料生產速度過快**,以**避免消費者處理不過來**
* **75%策略**:第一次抓取30個數據,如果75%的元素已經處理好了,繼續抓取新的75%元素
* 代碼:
```java=
public static void limitRate(){
Flux<Integer> flux = Flux.range(1, 100)
.log()//限流觸發,看上游是怎麼限流取得資料的
.limitRate(30);//一次預取30個元素,另外有75%策略
//75%策略
//第一次抓取30個數據,如果75%的元素已經處理好了,繼續抓取新的75%元素
flux.subscribe(System.out::println);
}
```
### 3.2.5 創建序列-Sink 動態地按需生成數據
* **作用**:**動態地按需生成數據**,而不是一開始就有一組數據
* **一開始就有一組數據**:
* `Flux.rang()`
* `Flux.just()`
* **Sink**:
* Sink.**next**(元素):**發送數據**
* Sink.**complete**():**流完成**
* **generate()**:同步調用的情況下創建**序列**(`流`)
* 參數1:設定初始值
* 參數2:Consumer<SynchronousSink\<T>> generator
* Sink: 接收器、水槽、通道
* **代碼**:
```java=
//編程方式創建序列
public static void seqGenerat() {
//參數1:設定初始值
//參數2:Consumer<SynchronousSink<T>> generator
Flux<Object> generate = Flux.generate(
() -> 0,//設定原始數據 => 建議使用AtomicLong 傳遞原子操作,多線程安全
//基於狀態(state)的generate
(state, sink) -> {
sink.next("ha " + state);//傳遞數據:可能會拋出『運行時異常、編譯時異常』
if (state > 9) {
sink.complete();//傳遞數據完成
}
if (state == 7) {
sink.error(new Exception("WEEOR AA"));
}
return state + 1;//返回新的迭代state值
})
.log();
generate.subscribe();
}
```
* **create()**:**非同步調用**(`異步、多線程`)的情況下創建序列
* 代碼:
```java=
public void seqCreate() throws InterruptedException {
Flux<Object> flux = Flux.create(fluxSink -> {
MyListener myListener = new MyListener(fluxSink);
for (int i = 0; i < 10; i++) {
myListener.online("aw " + i);
}
}).log();
flux.subscribe();
}
class MyListener {
FluxSink<Object> sink;
public MyListener(FluxSink<Object> sink) {
this.sink = sink;
}
//用戶登陸,觸發online監聽
public void online(String userName) {
System.out.println("login " + userName);
//sink:發出數據
sink.next("login " + userName);
}
}
```
### 3.2.6 自定義元素處理
* **handle()**:複雜的邏輯處理,可以**轉換成多種不同類型的元素** => **filter + map**
* **條件處理**
* 自定義流中元素的處理規則
* **相似的map()**:是將一個類型轉換成另一個類型
* **轉換操作**
* **代碼**:
```java=
public void handler() {
Flux<Object> handle = Flux.range(1, 4)
.handle((value, sink) -> {
if (value % 2 == 0) {
sink.next(value * 2);
} else {
sink.next("a" + value);//向下發送數據的通道
}
})
.log();
handle.subscribe();
}
```
* 日誌:
```txt=
[ INFO] (main) | onSubscribe([Fuseable] FluxHandleFuseable.HandleFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(a1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onNext(a3)
[ INFO] (main) | onNext(8)
[ INFO] (main) | onComplete()
```
### 3.2.7 自定義線程調度規則
* 響應式編程:全異步、消息、事件回調
* **『不指定線程池,默認使用發佈者的線程』**(`因為訂閱流才會動`)
* **流的發佈**、**中間操作**,默認使用**當前線程**(`訂閱者線程`)
* **目的**:**改變**發佈者、訂閱者**所在的線程池**,進而使執行的線程有所不同
* **方法**:
* **publishOn()**:改變**發佈者**的所在的線程池
* **subscribeOn()**:改變**訂閱者**的所在的線程池
* **參數**:Scheduler類
* 使用**Schedulers類**創建(`設定線程池`)
* **重點**:方法執行後的中間操作,才會開始改用新的線程池中的線程
* **設定其中一個就可以**,publishOn 或 subscribeOn
* 常用**Schedulers類**:**調度器**(`線程池`)
* Schedulers.immediate():
* **默認**
* 無執行上下文,**當前線程運行所有操作**
* Schedulers.single():使用固定的一個單線程(`固定使用該線程池中某一個線程`)
* Schedulers.boundedElastic():有界(`線程池有線程數限制`),彈性調度線程
* 默認線程數:10 \* CPU核心
* 默認隊列:100K
* 作用:管理和存儲待處理的任務
* Schedulers.parallel():**並發池**
* **全局共享**
* **基本使用**:
```java=
public void scheduler() {
Flux.range(1, 10)
//改變發佈者的所在的線程池
.publishOn(Schedulers.parallel())//在哪個線程池把這個流的數據和操作執行
//改變訂閱者的所在的線程池
.subscribeOn(Schedulers.immediate())//在哪個線程池把這個流的讀取執行
.subscribe();
}
```
* **代碼2**:**主線程訂閱流**
```java=
public void scheduler() {
Flux.range(1, 3)
.log()
.publishOn(Schedulers.single())//在哪個線程池把這個流的數據和操作執行
.map(i -> i * i)
.log()
.subscribe();
}
```
* **map處理是改用其他線程池**
* **log**:
```txt=
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (single-1) | onNext(1)
[ INFO] (main) | onNext(3)
[ INFO] (single-1) | onNext(4)
[ INFO] (single-1) | onNext(9)
[ INFO] (main) | onComplete()
[ INFO] (single-1) | onComplete()
```
## 3.3 常用操作
* **是一個發佈者也是一個訂閱者**
* 訂閱上一個流
### 3.3.1 filter - 過濾
* **代碼**:
```java=
/**
* onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber) => 流被訂閱(filter中間操作被訂閱)
* request(unbounded) => 請求無線數據(requestUnbounded())
* onNext()數據到達
*/
@Test
void filter() {
Flux.range(1, 4)//流的發佈者
.log()//顯示過濾前的數據
.filter(i -> i % 2 == 0)
// .log()//顯示過濾後的數據
.subscribe();
}
```
* subscribe()是訂閱**filter這個流**
### 3.3.2 flatMap - 扁平化
* **作用**:**扁平化**
* **無序**
* **原因**:有一個concurrency參數
* 默認:256,表示同時處理256件任務
* **說明**:將多個流,變成一個流
* **代碼**:
```java=
//扁平化,無序
@Test
void flatMap() {
Flux<String> log = Flux.just("Aasd adds", "uljk lkjl")
//將多個流,變成一個流
.flatMap(s -> Flux.fromArray(s.split(" ")) )//將Array變成一個流的Flux靜態方法
.log();
log.subscribe();
}
```
### 3.3.3 concat - 連接
* **concat()**:連接多個流
* 靜態方法
* **concatWith()**:連接老流
* 注意:兩個流類型需要一致
* **concatMap()**:**連接映射**
* 一個元素可以變成多個,類型不受限制
* **按順序的異步操作**(`數據庫查詢、網絡請求、文件讀取`),速度較慢,需要等待前一個元素處理完才換下一個,**如果不用按順序處理可以改用flatMap**
#### 3.3.3.1 concat()
* **代碼**:
```java=
@Test
void concat() {
Flux.concat(Flux.range(1, 4),
Flux.just("As", "asd"))
.log()
.subscribe();
}
```
#### 3.3.3.2 concatWith()
* **代碼**:
```java=
//流連接(中間操作),運行中 連接一個新流
//限制:類型需要與舊流的元素類型相同
@Test
void concatWith() {
Flux.range(1, 2)
.map(i -> i * i)
.concatWith(Flux.just(4, 5))
.log()
.subscribe();
}
```
#### 3.3.3.3 concatMap()
* **代碼**:
```java=
//連接映射:一個元素可以變成單個或多個元素,每個元素按順序完後,彙整為一個流
//按順序的異步操作(數據庫查詢、網絡請求、文件讀取),速度較慢,需要等待前一個元素處理完才換下一個,如果不用按順序處理可以改用flatMap
@Test
void concatMap() {
Flux.just(1, 2)
//使用一個Flux或Mono包裝,Function接口
.concatMap(s -> Flux.just(s + "-a", s + "-b"))
.log()
.subscribe();
}
```
* **log()**
* onNext(1-a)
* onNext(1-b)
* onNext(2-a)
* onNext(2-b)
### 3.3.4 transform - 狀態轉換
* **作用**:**把流變成新數據**
* **transform()**:**無狀態轉換**
* 訂閱流 `不會共享 外部變量的值`
* 無論多少訂閱者,transform**只執行一次**
* **transformDeferred()**:**有狀態轉換**
* 訂閱流 `會共享 外部變量的值`
* 無論多少訂閱者,**每一個訂閱者**transform**都會執行一次**
#### 3.3.4.1 transform() - 無狀態轉換
* **代碼**:
```java=
@Test
void transform() {
//原子
AtomicInteger atomicInteger = new AtomicInteger();
Flux<String> transform = Flux.just("s", "b")
.transform(value -> {
if (atomicInteger.incrementAndGet() == 1) {
//第一次調用
return value.map(String::toUpperCase);
} else {
//不是第一次調用
return value;
}
});
transform.subscribe(v -> System.out.println("訂閱者1 " + v));
transform.subscribe(v -> System.out.println("訂閱者2 " + v));
}
```
* **輸出**:
* 訂閱者1 S
* 訂閱者1 B
* 訂閱者2 S
* 訂閱者2 B
#### 3.3.4.2 transformDeferred() - 有狀態轉換
* **代碼**:
```java=
@Test
void transformDeferred() {
//原子
AtomicInteger atomicInteger = new AtomicInteger();
Flux<String> transform = Flux.just("s", "b")
.transformDeferred(value -> {
if (atomicInteger.incrementAndGet() == 1) {
//第一次調用
return value.map(String::toUpperCase);
} else {
//不是第一次調用
return value;
}
});
transform.subscribe(v -> System.out.println("訂閱者1 " + v));
transform.subscribe(v -> System.out.println("訂閱者2 " + v));
}
```
* **輸出**:
* 訂閱者1 S
* 訂閱者1 B
* 訂閱者2 s
* 訂閱者2 b
### 3.3.5 empty - 流為空處理
* **defaultIfEmpty()**:**靜態**兜底調用
* 如果發佈者數據為空使用默認值,否則使用發佈者的值
* **switchIfEmpty()**:空轉換,調用**動態**兜底調用
* 如果發佈者數據為空使用默認值,否則使用發佈者的值
* **流為空說明**:
* **Mono.just(null)**:流裡面**有一個null值的元素**
* **Mono.empty()**:流裡面**沒有的元素**,只有完成信號
#### 3.3.5.1 defaultIfEmpty()
* **代碼**:
```java=
@Test
void empty() {
getMono()
.defaultIfEmpty("X")//如果發佈者數據為空使用默認值,否則使用發佈者的值
.log()
.subscribe();
}
```
#### 3.3.5.2 switchIfEmpty()
* **代碼**:
```java=
@Test
void switchIfEmpty() {
getMono()
.switchIfEmpty(Mono.just("AB"))//如果發佈者數據為空使用默認值,否則使用發佈者的值
.log()
.subscribe();
}
```
### 3.3.6 merge - 合併
* **作用**:**合併流**
* **merge()**:靜態調用
* A流元素、B流元素,**按元素產生時間順序排列**(`與流的順序無關`)
* **mergeWith()**:方法調用
* **mergeSequential()**:靜態調用
* **按流順序**,排列元素(`與concat相似`)
#### 3.3.6.1 merge()
* **代碼**:
```java=
@Test
void merge() throws IOException {
//concat:連接,A流元素+B流元素
//merge:合併,A流元素、B流元素 按產生時間順序排列
Flux.merge(
Flux.just(1, 2, 4).delayElements(Duration.ofSeconds(1L)),//每1秒發一個元素
Flux.just("Sd", "kkk").delayElements(Duration.ofMillis(1500L)), //每1.5秒發一個元素
Flux.just("sasd", "l;lj").delayElements(Duration.ofMillis(800L))//每0.8秒發一個元素
)
.log().subscribe();
//delayElements:為異步發送,需要主線程不要結束
System.in.read();
}
```
* **log()**
1. onNext(sasd)
2. onNext(1)
3. onNext(Sd)
4. onNext(l;lj)
5. onNext(2)
6. onNext(kkk)
7. onNext(4)
#### 3.3.6.2 mergeSequential()
* **代碼**:
```java=
@Test
void mergeSequential() throws IOException {
//按流順序,排列元素(與concat相似)
Flux.mergeSequential(
Flux.just(1, 2, 4).delayElements(Duration.ofSeconds(1L)),
Flux.just("Sd", "kkk").delayElements(Duration.ofMillis(1500L)),
Flux.just("sasd", "l;lj").delayElements(Duration.ofMillis(800L))
)
.log()
.subscribe();
System.in.read();
}
```
* **log()**:
* onNext(1)
* onNext(2)
* onNext(4)
* onNext(Sd)
* onNext(kkk)
* onNext(sasd)
* onNext(l;lj)
* onComplete()
### 3.3.7 zip - 壓縮
* **作用**:將**多個流**(`上限壓縮8個流`),**一ㄧ對應**的元素變成**多個元組**(`Tuple`)
* **注意**:**無法結對**的元素會`被忽略`
* **zip()**:靜態調用
* **zipWith()**:方法調用
#### 3.3.7.1 zip()
* **代碼**:壓縮兩個流
```java=
@Test
void zip() {
//靜態調用
Flux.zip(Flux.just(1, 2, 4),
Flux.just("A", "B", "C"))
.log()
.subscribe();
}
```
* **log()**
* onNext(\[1,A])
* onNext(\[2,B])
* onNext(\[4,C])
#### 3.3.7.2 zipWith()
* **代碼**:壓縮兩個流
```java=
@Test
void zipWith() {
Flux.just(1, 2, 3, 0)
.zipWith(Flux.just(4, 5, 6))
.log()
.subscribe();
}
```
* **log()**
* onNext(\[1,4])
* onNext(\[2,5])
* onNext(\[3,6])
## 3.4 錯誤處理
* **概念**:
* 錯誤**默認**是一個**中斷行為**
* **subscribe**:消費者可以感知 正常元素try 和 流發生的錯誤catch
* **方法**:
* **onErrorReturn()**:返回默認值
* **onErrorResume()**:調用一個有返回值的方法
* **onErrorMap()**:拋出一個**非業務邏輯**的錯誤(`已處理原本錯誤後拋出自定錯誤`)
* **doOnError()**:發佈者感知**錯誤**
* **doFinaly()**:發佈者感知**流結束**(`不論正常或異常結束`)
* **onErrorContinue()**:忽略當前異常,**僅通知紀錄**(`該異常可以做一些處理`),繼續推進
* **onErrorComplete()**:將錯誤結束信號,替換成正常結束信號
* **onErrorStop()**:在流中遇到錯誤時**停止流**,**不會進行任何其他處理**(`包括重試或繼續處理流中的後續數據`)
### 3.4.1 Catch and return a static default value - 捕獲異常返回一個靜態默認值
* 命令式編程:
```java=
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
```
* 響應式編程:onErrorReturn()
```java=
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
```
* **onErrorReturn()**
* 發佈者處理掉異常,消費者不會感知到異常
* **返回默認值**
* 正常完成
* **代碼**:
```java=
@Test
void error() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorReturn("ha")
//.onErrorReturn(ArithmeticException.class, "ASDSD") //方式二:斷言錯誤類型
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 100 / 1 = 100
2. 100 / 2 = 50
3. `ASDSD`
4. 流 結束
### 3.4.2 Catch and execute an alternative path with a fallback method. - 捕獲異常並執行一個回退方法
* 命令式編程:
```java=
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
```
* 響應式編程:**onErrorResume()**
```java=
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(e -> getFromCache(k))
);
```
* onErrorResume()
* 發佈者處理掉異常,消費者不會感知到異常
* **調用一個方法**
* 正常完成
* **代碼**:
```java=
@Test
void onErrorResume() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(err -> Mono.just("ja"))
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**
1. 100 / 1 = 100
2. 100 / 2 = 50
3. `ja`
4. 流 結束
### 3.5.3 Catch and dynamically compute a fallback value. - 捕獲異常並動態計算返回值
* 命令式編程:
```java=
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
```
* **響應式編程**:**onErrorResume()**
```java=
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
```
* 代碼:
```java=
@Test
void onErrorResume() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(throwable -> processError(throwable))
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
Mono<String> processError(Throwable throwable) {
if (throwable instanceof ArithmeticException) {
return Mono.just("Zero");
}
if (throwable instanceof NullPointerException) {
return Mono.just("Null");
}
return Mono.just("HAHA");
}
```
* **log()**:
1. 100 / 1 = 100
2. 100 / 2 = 50
3. `Zero`
4. 流 結束
### 3.5.4 Catch, wrap to a BusinessException, and re-throw. - 捕獲異常後,包裝成一個業務異常並拋出
#### 3.5.4.1 onErrorResume()
* **包裝重新拋出異常**:
* 發佈者處理掉異常,**訂閱者有感知**
* **拋出新異常** 或 **Mon.empty() 吃掉異常**
* 流**異常結束**
* 命令式編程:
```java=
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
```
* **響應式編程1**:**onErrorResume()**
```java=
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
```
* **代碼1**:
```java=
@Test
void catchAndThrow() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(throwable -> Flux.error(new BusinessException(throwable.getMessage() + " = Bone")))
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 100 / 1 = 100
2. 100 / 2 = 50
3. `error: / by zero = Bone`
#### 3.5.4.2 onErrorMap()
* **響應式編程2**:**onErrorMap()**
```java=
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
```
* **onErrorMap()**:將一個異常**映射**成另一個異常
* **代碼2**:
```java=
@Test
void onErrorMap() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorMap(throwable -> new BusinessException(throwable.getMessage()))
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
### 3.5.5 Catch, log an error-specific message, and re-throw. - 捕獲異常,紀錄特殊的錯誤日誌,再拋出重新異常
* 捕獲異常,並執行動作,**且不影像異常繼續順者流水線傳播**
* **doOnError()**:
* 發佈者不處理掉異常,**只在異常發生的時候做一件事**(`發佈者感知`),**消費者有感知**
* 命令式編程:
```java=
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
```
* **響應式編程**:**doOnError()**(`事件感知`)
```java=
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
);
```
* **代碼**:
```java=
@Test
void doOnError() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(throwable -> {
System.out.println("doOnError: " + throwable.getMessage());
})
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 100 / 1 = 100
2. 100 / 2 = 50
3. doOnError: / by zero
4. error: / by zero
### 3.5.6 Use the finally block to clean up resources or a Java 7 "try-with-resource" construct - 執行finally
* **doFinaly()**:
* 發佈者**感知流完成**時(`不論是否中斷或正常結束都會執行`),執行操作
* 命令式編程:
* 例1:
```java=
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
```
* 例2:
```java=
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
```
* **響應式編程**:**doFinaly()**(`事件感知`)
```java=
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux<String> flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
```
* **代碼**:**流中斷結束**
```java=
@Test
void doOnError() {
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(throwable -> {
System.out.println("doOnError: " + throwable.getMessage());
})
.doFinally(signalType -> System.out.println("流完: " + signalType))
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 100 / 1 = 100
2. 100 / 2 = 50
3. doOnError: / by zero
4. error: / by zero
5. `流完: onError`
### 3.5.7 忽略當前異常,僅通知紀錄,繼續推進
* **onErrorContinue()**:
* 發佈者**處理掉異常**,消費者不會感知
* **代碼**:
```java=
@Test
void jumpError() {
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
//發生錯誤,繼續前進
.onErrorContinue((throwable, val) -> {
System.out.println(throwable);
System.out.println("val = " + val);
}
)
.subscribe(v -> System.out.println("v = " + v),
error -> System.out.println("e = " + error)
);
}
```
* **log()**:
1. v = 10
2. v = 5
3. `java.lang.ArithmeticException: / by zero`
4. `val = 0`
5. v = 2
### 3.5.8 結束處理
* **onErrorComplete()**:將錯誤結束信號,替換成正常結束信號
```java=
@Test
void onErrorComplete() {
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorComplete()//將錯誤結束信號,替換成正常結束信號
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 10
2. 5
3. `流 結束`
* **onErrorStop()**:在流中遇到錯誤時**停止流**,**不會進行任何其他處理**(`包括重試或繼續處理流中的後續數據`)
```java=
@Test
void onErrorStop() {
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorStop()//錯誤後,停止流,所有監聽者全部結束
.subscribe(System.out::println,
error -> System.out.println("error: " + error.getMessage()),
() -> System.out.println("流 結束")//感知正常結束
);
}
```
* **log()**:
1. 10
2. 5
3. `error: / by zero`
## 3.6 重試、超時
* **retry()**:把流**從頭到尾**從新請求
* **timeout()**:設定**超時的時間**
* 超時拋出`java.util.concurrent.TimeoutException `
* **代碼**:使用**超時方法**(`timeout()`)模擬,重試設定**ㄧ次**
```java=
@Test
void retryAndTimeOut() throws IOException {
Flux.just(1, 2, 3)
.log()
.delayElements(Duration.ofSeconds(3))
.log()
.timeout(Duration.ofSeconds(2))//獲取元素超時設定
.retry(1)//把流從頭到尾從新請求
.map(i -> i + " ha")
.subscribe();
}
```
* log()
1. 第一次
1. onSubscribe(\[Synchronous Fuseable] FluxArray.ArraySubscription)
2. onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
3. request(unbounded)
4. request(1)
5. onNext(1)
6. cancel()
7. cancel()
2. **重試**:將整個流**從頭訂閱**
1. onSubscribe(\[Synchronous Fuseable] FluxArray.ArraySubscription)
2. onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
3. request(unbounded)
4. request(1)
5. onNext(1)
6. cancel()
7. cancel()
4. 超時錯誤拋出
## 3.7 Sinks 工具類 - 數據管道
* **概念**:發送數據的**管道**
* 所有數據順著這個管道往下走
* 訂閱者默認**從訂閱的當下開始接收據**
* **onBackpressureBuffer()**:背壓(Backpressure)處理操作符,設定緩衝區大小
* **作用**:將溢出的數據存放到一個緩衝區中,讓**消費者在有空時再處理**這些數據。它確保數據不會丟失,但**需要有足夠的內存來存放這些數據**。
### 3.7.1 單播(單一訂閱者)
* **概念**:該**數據管道**(`Sinks`)只能綁定單個訂閱者
* **多個訂閱者的錯誤訊息**: `java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber`
* **方法**:**Sinks.many().unicast()**
* **代碼**:
```java=
@Test
void sink() throws InterruptedException, IOException {
//從訂閱時,才會開始消費
//1. 建立數據管道
Sinks.Many<Object> many = Sinks.many()
.unicast()//單播
.onBackpressureBuffer(new LinkedBlockingDeque<>(5));//背壓對列,該隊列最多存放(緩存)5個元素
new Thread(() -> {
for (int i = 0; i < 5; i++) {
//2. 發送數據
many.tryEmitNext("a-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
//3. 訂閱數據
many.asFlux().subscribe(System.out::println);
System.in.read();
}
```
### 3.7.2 多播(無限訂閱者)
* **概念**:該**數據管道**(`Sinks`)綁頂**多個**訂閱者
* **方法**:**Sinks.many().multicast()**
* **代碼**:
```java=
@Test
void sinkMulticast() throws InterruptedException, IOException {
//從訂閱時,才會開始消費
Sinks.Many<Object> many = Sinks.many()
.multicast()//多播
.onBackpressureBuffer();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
//發送數據
many.tryEmitNext("a-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
many.asFlux().subscribe(v -> System.out.println("v1 = " + v));
Thread.sleep(1500);
//延後訂閱
many.asFlux().subscribe(v -> System.out.println("v2 = " + v));
System.in.read();
}
```
* **log()**:
1. v1 = a-0
2. v1 = a-1
3. v1 = a-2
4. v2 = a-2
5. v1 = a-3
6. v2 = a-3
7. v1 = a-4
8. v2 = a-4
* **問題**:缺失之前未訂閱時,所產生的數據?
* 默認情況
### 3.7.3 數據重放
* **概念**:這個管道能**重放元素**,**後來的訂閱者**也會`接收到未訂閱前的元素`
* **使用一組隊列進行數據緩存**,來達到重放
* **方法**:**Sinks.many().replay();**
* **limit()**:**限制重放元素**
* **代碼**:重放**1個**元素(`訂閱當下的前一個元素`)
```java=
@Test
void sinksReplay() throws InterruptedException, IOException {
//重放,這個管道能重放元素,後來的訂閱者也會接收到未訂閱前的元素
Sinks.Many<Object> replay = Sinks.many().replay()
.limit(1);//重放三個元素
new Thread(() -> {
for (int i = 0; i < 5; i++) {
//發送數據
replay.tryEmitNext("a-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
replay.asFlux().subscribe(v -> System.out.println("v1 = " + v));
Thread.sleep(3000);
replay.asFlux().subscribe(v -> System.out.println("v2 = " + v));
System.in.read();
}
```
* **log()**
* v1 = a-0
* v1 = a-1
* `v1 = a-2`(`重放`)
* v1 = a-3
* `v2 = a-2`(`重放`)
* v2 = a-3
* v1 = a-4
* v2 = a-4
### 3.7.4 Flux的數據重放
* **cache()**:設定**緩存數據**
* **默認下是緩存所有數據**(`不設定緩存大小`),但可能會**導致OOM**
* **代碼**:緩存**三個**數據
```java=
@Test
void noSinksReplay() throws InterruptedException {
Flux<Integer> cache = Flux.range(1, 4)
.delayElements(Duration.ofSeconds(1))
.cache(3);//緩存三個數據
cache.subscribe(v->System.out.println("V1 = "+v));
Thread.sleep(5000);
cache.subscribe(v->System.out.println("V2 = "+v));
}
```
* **log()**:
1. V1 = 1
2. `V1 = 2`
3. `V1 = 3`
4. `V1 = 4`
5. V2 = 2
6. V2 = 3
7. V2 = 4
## 3.8 阻塞API - block()
* **概念**:**不使用subscribe下,從Flux獲取數據**
* 只能從流中獲取**單個數據**
* 如果是一個Flux的數據,將此Flux -> Mono後,再獲取數據
* **也是一種訂閱者**(`BlockingMonoSubscriber`)
* **方法**:
* **block()**:獲取數據
* **blockLast()**: 獲取流**最後一個數據**
* **blockFirst()**:獲取流**第一個數據**
* **代碼**:獲取**流**的**最後一個數據**
```java=
@Test
void blockLast() {
Integer integer = Flux.just(1, 2, 4)
.map(i -> i + 10)
.blockLast();
System.out.println(integer);
}
```
* 輸出:`14`
* **代碼**:獲取**流**的**所有數據**
```java=
@Test
void blockList() {
Mono<List<Integer>> mono = Flux.just(1, 2, 4)
.map(i -> i + 10)
.collectList();
List<Integer> block = mono.block();//也是一種訂閱者,BlockingMonoSubscriber
System.out.println(block);
}
```
* 輸出:`[11, 12, 14]`
* collectList():收集流中的數據,變成單一個數據
## 3.9 ParalleFlux 並發流
* **parallel()**:設定**並發數**
* 與 **runOn()** 混合使用(`設定後續動作要運行的線程池`)
* **代碼**:**百萬數據,分成8個線程,分批處理**
```java=
@Test
void paralleFlux() {
//情景:百萬數據,分成8個線程,分批處理
Flux.range(1, 5000)
.buffer(100)//Flux<List<Integer>>,緩衝區每次消費100個數據
.parallel(8)//設定8個線程並發處理,所以會有8次訂閱上面的流
.runOn(Schedulers.newParallel("YY"))//後續的操作運作在這一個新創建的線程池
.log()
.subscribe(v -> System.out.println("v = " + v));
}
```
* **buffer(100)**:**緩衝區每次消費100個數據**(`批次處理`)
## 3.10 Context(上下文)API
* **文檔**:https://projectreactor.io/docs/core/snapshot/reference/advancedFeatures/context.html#context.api
* **解決**:資料**流期間共享數據**(`多線程處理下`)
* **ThreadLocal**在響應式程式設計中無法使用。
* 作用:用來在多線程環境下為**每個線程維護各自的變量副本**(`一個ThreadLocal 變量的值在每個線程中都是獨立的,互不影響`)
* **方法**:
* **Context**:讀寫
* **ContextView**:只讀
* **注意**:
* 上游能獲取下游**最近一次的數據**
* **Context**為**不可變**(`解決多線程的數據競爭問題`)
* **重點**:**變量是由下游傳遞到上游**
* **響應式**:dao(`產生數據`) -> service -> controller
* **變量**從下游**傳播**給上游,**上游不要知道數據條件**,**數據產生者**為`上游`,條件是由下游所擁有
* 代碼:
```java=
@Test
void threadLocal() {
Flux.just(1, 2, 3)
//支持Context的中間操作
.transformDeferredContextual((flux, contextView) -> flux
.map(i -> i + " " + (String) contextView.get("prefix")))
//上游能獲取下游最近一次的數據
// .contextWrite(Context.of("a", "As"))
.contextWrite(c -> c.put("prefix", "haha"))
//在ThreadLocal中共享數據,上游的所有人能夠看到,Context是由下游傳播給上遊
.subscribe(System.out::println);
//命令式 controller -> service -> dao
//響應式 dao -> service -> controller == 所以是從下游傳播給上游,上游不要知道獲取數據條件,數據產生者為上游,條件是由下游所擁有
//Context不可變,解決多線程的數據競爭問題
}
```
* **contextWrite()**:創建context,兩種創建方式
###### tags:`Reactive Stream`