---
title: 'SpringBoot3 Webflux - 響應式編程 '
disqus: hackmd
---
# 1. WebFlux
* **WebFlux**:**底層**完全基於 **netty** + **Reactor** + **Springweb** 完成一個全異步非阻塞Web響應式框架
* 底層:異步+訊息隊列(內存)+事件回調機制 => 整套系統
* 優點:使用少量資源處理大量請求
* **官網**:https://docs.spring.io/spring-framework/reference/web/webflux.html
## 1.1 組件對比
| API 功能 | Servlet| WebFlux |
| -------- | -------- | -------- |
| 前端控制器 | DispatcherServlet| DispatcherHandler|
| 處理器 | Controller | WebHandler/Controller |
| 請求、響應 | **ServletRequest**<br>**ServletReponse**| **ServerWebExchange**|
| 過濾器 |Filter(HttpFilter)| WebFilter |
| 異常處理器|HandlerExceptionReslover|DispatcherExceptionHandler|
| Web配置 | @EnableWebMVC | @EnableWebFlux |
| 自定義配置 | WebMvcConfigurer | WebFluxConfigurer |
| 返回結果 |任意 | **Mono**、**Flux**、任意 |
| 發送REST請求 | RestTemplate | WebClinet |
## 1.2 引入WebFlux
* **pom.xml**:
```xml=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```
* **Context響應式上下文數據傳遞**:由下游**傳播**給上游
* 阻塞式:瀏覽器 --> Controller --> Service --> DAO
* 響應式:**DAO**(`數據源查詢對象『數據發佈者』`) --> Service --> Controller --> 瀏覽器
# 2. Reactor核心-HttpHandler原生API
* **代碼**:使用HttpHandler、HttpServer**原生Reactor Netty API**編寫一**個伺服器**
```java=
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Webflux1Application {
public static void main(String[] args) throws IOException {
//1.創建一個處理Http請求的處理器,參數:請求、響應,返回值 Mono<Void>:代碼處理完成的信號
HttpHandler httpHandler = (ServerHttpRequest request, ServerHttpResponse response) -> {
System.out.println(Thread.currentThread().getName() + " 請求進來:" + request.getURI());
//編寫請求的處理業務
// response.getHeaders();//獲取Header
// response.getStatusCode();//獲取響應狀態碼
// response.getCookies();//獲取Cookie
// response.bufferFactory();//buffer工廠
// response.writeWith();//把Xxx寫出
// response.setComplete();//響應結束
//數據的發佈者:Mono<DataBuffer>\Flux<DataBuffer>
//1. 創建 響應數據的DataBuffer
DataBufferFactory dataBufferFactory = response.bufferFactory();
//2. 寫數據,數據Buffer
DataBuffer dataBuffer = dataBufferFactory.wrap((request.getPath() + " hello! ").getBytes(StandardCharsets.UTF_8));
//3. 需要一個DataBuffer的發佈者
return response.writeWith(Mono.just(dataBuffer));
};
//2、起動一台伺服器,監聽8080端口,接收數據,拿到資料交給 HttpHandler 進行請求處理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
//3. 啟動Netty伺服器
HttpServer.create()
.host("localhost")
.port(8080)
.handle(adapter)//指定一個處理器
.bindNow();//現在綁定
System.out.println("啟動,監聽8080,接收請求");
System.in.read();
System.out.println("End");
}
}
```
# 3. DispatcherHandler 原理
* **SpringMVC**:DispatcherServlet
* **SpringWebFlux**:**DispatcherHandler**
## 3.1 請求處理流程
* **三個屬性**:
* HandlerMapping:請求映射處理器
* 保存每個請求由哪個方法進行處理
* HandlerAdapter:處理器的適配器
* 反射執行目標方法
* HandlerResultHandler:處理結果的處理器
* **源碼**:
```java=
@Nullable
private List<HandlerMapping> handlerMappings;
@Nullable
private List<HandlerAdapter> handlerAdapters;
@Nullable
private List<HandlerResultHandler> resultHandlers;
```
* DispatcherServlet有一個**doDispatch()**,來處理所有請求
* DispatcherHandler有一個**handle()**,來處理所有請求
```java=
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings)//拿到所有的handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange))//找每一個mapping,看誰能夠處理這個請求的handlerAdapter
.next()//直接處發獲取元素,拿到流的第一個符合的元素
.switchIfEmpty(createNotFoundError())//找不到元素,觸發404錯誤
.onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex))//異常處理,一旦前面發生異常,調用handleResultMono()處理
.flatMap(handler -> handleRequestWith(exchange, handler));//調用方法處理請求,得到響應結果
}
```
1. **請求**和**響應**都封裝在**ServerWebExchange**中,由**handle方法**進行處理
2. 如果**沒有任何請求映射處理器**,直接返回一個**Mono.error** (`創建一個未找到的錯誤`) 的終結流
```java=
private <R> Mono<R> createNotFoundError() {
return Mono.defer(() -> {
Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);
return Mono.error(ex);
});
}
```
* Mono.defer():該流有訂閱,且流被激活後,動態調用
* 延遲創建 Mono
* 每次被訂閱時調用,重新執行邏輯,創建新的Mono
4. **跨域工具**,是否跨域請求,跨域請求檢查是否複雜跨域,需要預檢請求;
* CorsUtils.isPreFlightRequest
6. **Flux串流操作**
1. 找到**HandlerMapping**
2. 獲取**handlerAdapter**
3. Adapter**處理請求**
* **期間的的錯誤**由 **onErrorResume()** 觸發回調感知
* **源碼兩個核心方法**:
* **handleRequestWith**:編寫了**handlerAdapter**怎麼**處理清求**
* **handleResult**:處理返回值
# 4. 註解式Controller
* **說明**:SpringWebFlux**向下兼容**原來SpringMVC`大多數注解和API`
## 4.1 基本範例
* **代碼**:
```java=
@RestController
public class HelloController {
//WebFlux:向下兼容原來SpringMVC大多數注解和API
@GetMapping("/hello")
public String hello(@RequestParam(value = "key", defaultValue = "AA") String key) {
return "hello world! " + key;
}
//推薦方式
//1. 返回單個數據Mono
//2. 返回多個數據Flux
@GetMapping("/haha")
public ResponseEntity<Mono<String>> haha() {
return ResponseEntity.ok(Mono.just("haha"));
}
@GetMapping(value = "/huhu", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Flux<String>> huhu() {
return ResponseEntity.ok(Flux.just("huhu1", "huhu2"));
}
}
```
## 4.2 Server-Sent Events 服務器推送
* **說明**:**服務器推送**
* **服務器推送的專用類**:org.springframework.http.codec.**ServerSentEvent**
* SSE **的核心數據結構包括**:
* 數據 (data):每個事件的主要內容。
* 事件類型 (event):標識事件類型的名稱。
* ID (id):唯一標識該事件的標識符。
* 重試時間 (retry):通知客戶端在連接斷開時重試的間隔時間。
* 備註(comment)
* **代碼**:每500毫秒,推送一個數據到前端
```java=
//3. 配合Flux,完成SSE(Server Send Event)服務端事件推送
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<Flux<ServerSentEvent<String>>> sse() {
return ResponseEntity.ok(Flux.range(1, 10)
.map(i ->
//構建一個SSE對象
ServerSentEvent.builder("ha " + i)
.id(i + " id")//數據編號
.comment("her-" + i)//備註
.event("haha-" + i)//表示那一個事件推送
.build()
)
.delayElements(Duration.ofMillis(500))
);
}
```
* **指定返回的內容類型**為 **text/event-stream** (`SSE 的標準 MIME 類型`)
* SSE與WebScoket的區別:
* SSE:單工
* 請求過去以後,等待**服務端源源不絕的數據**
* WebScoket:雙工
* 連接建立後,可以任意交互
## 4.3 檔案上傳
* **文檔**:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods/multipart-forms.html
* 使用**FilePart**,`MultipartFile也可以使用`
* FilePart的效率更好,底層採用`零拷貝`
* **代碼**:
```java=
@GetMapping("/upload")
public String uploadMVC(MultipartFile也可以使用 multipartFile) {
return "OK";
}
@GetMapping("/upload2")
public String uploadFlux(FilePart filePart) {
filePart.filename();
filePart.transferTo(Path.of("/"));//高速,零拷貝
return "OK";
}
```
# 5. 錯誤響應-全局異常處理
* **@ControllerAdvice方式** 和 **ErrorAttributes + AbstractErrorWebExceptionHandler** 只會有**其中一個生效**
* **ErrorAttributes + AbstractErrorWebExceptionHandler**是`預設`
* **@ControllerAdvice方式** 是用`配置`開啟**預設處理類** 或 **自行使用該註解**
## 5.1 @ControllerAdvice方式-應用層
* **說明**:SpringWebFlux**向下兼容**原來SpringMVC`大多數注解和API`
* **方式**:基於**流**的**回調機制**
* MCV:是基於AOP
* 開啟**默認**的 **@ControllerAdvice** 的處理類
* **配置**:`spring.webflux.prombledetails.enabled=true`
* **默認的處理類**(**ResponseEntityExceptionHandler**)
* 建議參考**ResponseEntityExceptionHandler**
* 自動建立 ProblemDetail
* 自動設定 status, title, detail, instance
* 自動解析 MessageSource 進行 i18n
* 自動對應常見 WebFlux 例外(像 MissingRequestValueException, UnsupportedMediaTypeStatusException 等)
* **代碼**:**使用方式不變**
```java=
//@ResponseBody//這個類別中的所有方法預設都會回傳 HTTP 回應的正文(Response Body)
//@ControllerAdvice
@RestControllerAdvice// @ControllerAdvice + @ResponseBody
public class GloabExceptionHandler {
@ExceptionHandler(ArithmeticException.class)
public String handlerArithmeticException(ArithmeticException exception) {
System.out.println("數學運算異常 " + exception.getMessage());
return "a";
}
}
```
* **@RestControllerAdvice** = **@ControllerAdvice** + **@ResponseBody**
* **@ExceptionHandler**:捕獲異常
* **ErrorResponse**:快速構建錯誤響應(`底層為ProblemDetail`)
* **ProblemDetail**:自定義PD返回
## 5.2 ErrorAttributes + AbstractErrorWebExceptionHandler - Reactor 層(最低層)
* spring webflux預設
* 攔截所有未被 ControllerAdvice 捕獲的錯誤
* 實做ErrorAttributes和AbstractErrorWebExceptionHandler
* ErrorAttributes:產生Error的Response
* 默認的DefaultErrorAttributes需要,注意ErrorAttributeOptions,他會移除回傳Map中的某些Key
* **ErrorProperties**:可以調整返回的Key的配置
* **參數**:
* server.error.include-binding-errors:返回@Vaild的錯誤結果
* server.error.include-message:返回錯誤的Msg
* server.error.include-stacktrace:返回stacktrace
* **參數值**:
| 參數值 | 行為說明 | 常見用途 |
| :--------- | :------------ | :--------- |
| `NEVER` | 永不輸出該屬性 | 生產環境 |
| `ALWAYS` | 永遠輸出 | 開發、除錯用 |
| `ON_PARAM` | 只有在請求帶特定參數時輸出 | 兼顧安全與調試靈活性 |
* AbstractErrorWebExceptionHandler:將訊息回傳
# 6. 註解開發
## 6.1 目標方法參數
* **文檔**:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods.html
* **ServerWebExchange**:封裝了請求和回應對象的對象,獲取自定義數據,自定義響應
* **ServerHttpRequest**:請求對象
* **ServerHttpResponse**:響應對象
* **WebSession**:訪問Session對象
* org.springframework.http.HttpMethod:請求方式
* java.util.Locale:國際化
* java.util.TimeZone + java.time.ZoneId:時區
* @PathVariable:路徑變量
* @MatrixVariable:矩陣變量
* @RequestParam:請求參數
* @RequestHeader:請求頭
* @CookieValue:獲取Cookie
* @RequestBody:獲取請求体
* HttpEntity\<B>:封裝後的請求對象
* @RequestPart:form-data
* Errors or BindingResult:數據校驗
* @RequestAttribute:轉發請求的請求域數據
* 其他:所有對象都能作為參數
* 基本類型,等於標註@RequestParam
* 對象類型,等於標註@ModelAttribute
## 6.2 返回值
* @ResponseBody:把響應數據寫出,如果是對象自動轉換成Json
* **ResponseEntity\<B>**:支持快捷**自定義響應內容**
* HttpHeaders:沒有響應內容,只有響應頭
* **ErrorResponse**:快速構**建錯誤響應**
* **ProblemDetail**:SpringBoot3
* String:與之前使用規則一樣
* View:與之前使用規則一樣:與之前使用規則一樣
* java.util.Map \ org.springframework.ui.Model:與之前使用規則一樣
* @ModelAttribute:與之前使用規則一樣
* **Rendering**:一種view對象
* 新版的**頁面跳轉API**(`不能標註@ResponseBody`)
```java=
//類要標註@Contoller,
@GetMapping
public Rendering rendering() {
Rendering.redirectTo("/ad");//重定向到目前項目根路徑下的ad
return Rendering.redirectTo("https://www.google.com").build();//重定向到google
}
```
* void:代表響應完成信號
* **Flux\<ServerSentEvent>**:使用`text/event-stream`完成SSE效果
* 其他值:未在上述列表的返回值,當成頁面的數據
# 7. RequestContext
* **作用**:儲存和管理與**目前 HTTP 請求相關的上下文資訊的機制**。它可以在`請求處理的各個階段共享數據`
* **SpringMVC**:**RequestContextHolder** 一種靜態方法存取目前請求的上下文資訊:
* 它使用 **ThreadLocal** 儲存請求上下文,**確保資料與當前線程綁定**。
* 適合傳統的基於 Servlet 的 Web 應用
* **獲取當前上下文**:
```java=
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
```
* **SpringWebFlux**:Reactor的**ContextView**(`參考響應式-概念`)
* 在整個**反應式鏈中傳遞共享資料**(類似於 ThreadLocal)
* 可儲存使用者資訊、設定資料等,與每個 Mono 或 Flux 的執行綁定。
# 8. WebFlux配置
* **@EnableWebFlux**:開啟WebFlux自定義
* 不推薦使用(`因為所有WebFlux的默認配置全部失效,全部自定義`)
* WebFluxAutoConfiguration:WebFlux的自動配置
## 8.1 自定義WebFlux-修改某些配置
* 注入**WebFluxConfigurer**組件,並**重寫需要調整的底層邏輯**
* **擴展預設配置,而不會完全覆蓋**
* **代碼**:**修改默認的跨域配置**
```java=
@Configuration
public class MyConfiguration {
//配置底層
@Bean
public WebFluxConfigurer configurer1() {
return new WebFluxConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")//允許所有路徑
.allowedHeaders("*")//允許所有請求頭
.allowedMethods("*")//允許所有方法
.allowedOrigins("localhost");
}
};
}
}
```
# 9. Filter
* **使用WebFilter**
* **代碼**:
```java=
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
public class MyWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
System.out.println("===請求放行之前===");
Mono<Void> filter = chain
.filter(exchange)//放行
.doOnError(throwable -> System.out.println(throwable))//目標方法異常後
.doFinally(signalType -> System.out.println("=目標方法執行之後="));//目標方法執行之後
return filter;//注意返回的流
}
}
```
# 10. R2DBC
* 響應式的關聯式ORM
## 10.1 依賴
* 導入
* 驅動
* 連接池
* pom.xml
```xml=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
```
## 10.2 基本範例-R2DBC
* 代碼:
```java=
// 思想:
// 1. 有了r2dbc,我們的應用在資料庫層面 天然支援高並發、高吞吐量。
// 2. 並不能提升開發效率
@Test
void getConnection() throws IOException {
// r2dbc基於全異步、響應式、訊息驅動
// r2dbc:postgresql://localhost:5432/postgres
//0. postgresql配置
PostgresqlConnectionConfiguration postgresqlConnectionConfiguration = PostgresqlConnectionConfiguration.builder()
.addHost("localhost", 5432)
.database("postgres")
.username("nicolas")
.build();
//1. 獲取連接工廠,使用Postgresql
PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(postgresqlConnectionConfiguration);
//2. 獲取連接,發送sql
//數據發佈者
Publisher<? extends Connection> publisher = connectionFactory.create();
//Mono.from 通常用於從某個 Publisher 中提取數據,並將其包裝為一個 Mon
Mono.from(connectionFactory.create())
//flatMap 改用flatMapMany,因為會返回很多數據
.flatMapMany(connection -> connection
.createStatement("SELECT * from student WHERE age > $1 and age < $2")
.bind(0, 9)
.bind(1, 123)
.execute()
)
//使用flatMap,上面的流,可能是Flux\Mono,map的前提是上面的流與下面的流要相同
.flatMap(result -> result
.map((row, rowmedata) ->
row.get("id", String.class)))
.take(5)//取出5筆數據
.subscribe(System.out::println);
//卡住主線程
System.in.read();
}
```
* take():取出5筆數據
* 背壓:不用回傳所有東西,基於請求量回傳
## 10.3 SpringData-R2DBC
* **配置**:
```properties=
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/postgres
spring.r2dbc.username=nicolas
spring.r2dbc.password
```
### 10.3.1 SpringBoot對r2dbc的自動配置
* **R2dbcAutoConfiguration**:主要配置**連接工廠**、**連接池**(`非DB線程池,不使用DB線程池`)
* R2dbcDataAutoConfiguration:
* **R2abcEntityTemplate**:操作資料庫的響應式客戶端(XxxTemplate)
* 提供 CRUD API
* RelationalManagedTypes:描述應用程序中與數據庫交互的**受管理的實體類型**
* 使用注解如 **@Entity** 或 **@Table** 標記的類
* R2dbcMappingContext:數據類型映射關係
* MappingR2dbcConverter:轉換器
* R2dbcCustomConversions:自定義轉換器
* 自定數據類型轉換功能
* **R2dbcRepositoriesAutoconfiguration**:開啟SpringData聲明式介面方式的CRUD
* R2dbcTransactionManagerAutoConfiguration:事務管理
### 10.3.2 DatabaseClient & R2dbcEntityTemplate
* 兩個R2DBC核心API
* DatabaseClient
* R2dbcEntityTemplate
#### 10.3.2.1 R2dbcEntityTemplate
* **作用**:提供CRUD API
* **映射對象**:
```java=
@Table(name = "student")
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class StudentData {
@Id
@Column("id")
private String id;
@Column("age")
private Integer age;
@Column("name")
private String name;
@Column("nickname")
private String nickName;
}
```
* 代碼:
```java=
@Autowired//join很難處理
R2dbcEntityTemplate r2dbcEntityTemplate;//CRUD API
@Test
void r2dbcEntityTemplate() throws IOException {
//1. 構造查詢條件 id = '1'
Criteria criteria = Criteria.empty()
.and("id").is("1");
//2. 封裝為Query對象
Query query = Query.query(criteria);
r2dbcEntityTemplate.select(query, StudentData.class)
.subscribe(System.out::println);
//卡住主線程
System.in.read();
}
```
#### 10.3.2.2 DatabaseClient
* **處理複雜的SQL**
* **代碼**:
```java=
@Autowired //貼近底層,寫原生SQL
DatabaseClient databaseClient;//數據庫客戶端
@Test
void databaseClient() throws IOException {
databaseClient.sql("select * from student where id = $1")
.bind(0, "1")
//抓取數據
.fetch()
//返回所有數據
.all()
.map(row -> {
return StudentData.builder()
.id(row.get("id").toString())
.age(Integer.parseInt(row.get("age").toString()))
.name(row.get("name").toString())
.nickName(row.get("nickname").toString())
.build();
})
.subscribe(System.out::println);
System.in.read();
}
```
* fetch():抓取數據
* all():返回所有數據
* first():返回第一筆
* one():返回一筆
### 10.3.3 手動創建連接池
* 繼承**AbstractR2dbcConfiguration**,重寫**connectionFactory()**
* 創建一個連接池
* **@EnableR2dbcRepositories**:開啟R2dbc的倉庫功能
* basePackages:設定Data的位置
* entityOperationsRef:指定使用的數據庫引擎
* 多數據源時建議配置
* 建立**ReactiveTransactionManager**:**開啟事物機制**
* 方法上貼`@Transactional`:
* @Transactional 的本質是 **「聲明一個帶有事務 Context 的數據流」**
*** 執行時**會**到當前的Context**中尋找事務
* **差異**:
* **傳統** (Imperative):依靠 `ThreadLocal<Connection>`。一旦切換線程,事務就丟失。
* **響應式** (Reactive):依靠 `Subscriber Context`。事務狀態就像隨身攜帶的「背包」,隨著數據流(Stream)傳遞,無論線程如何切換,**R2dbcTransactionManager** 都能從 Context 中找回同一個連接。
* **代碼**:
```java=
//開啟R2dbc的倉庫功能:JPA
@EnableR2dbcRepositories(basePackages = "com.nicolas.pulse.adapter.repository",
entityOperationsRef = "MainR2dbcEntityOperations")
@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
@Bean("R2dbcConfigurationDB")
@Override
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory = ConnectionFactories.get(builder()
.option(DRIVER, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(USER, "nicolas")
.option(DATABASE, "postgres")
.build()
);
ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
.initialSize(5)
.maxSize(20)
.maxIdleTime(java.time.Duration.ofMinutes(30))
.build();
return new ConnectionPool(poolConfig);
}
@Primary
@Bean("MainR2dbcEntityOperations")
public R2dbcEntityOperations mainR2dbcEntityOperations(@Qualifier("MainConnectionFactory") ConnectionFactory connectionFactory) {
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE);
}
// 設定事務
@Primary
@Bean
public ReactiveTransactionManager transactionManager(@Qualifier("MainConnectionFactory") ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
```
### 10.3.4 數據封裝
* **與JPA配置方式相同**
* **schema**
```sql=
CREATE TABLE public.student (
id varchar NOT NULL,
"name" varchar NULL,
age int4 NULL,
nickname varchar NULL,
createtime timestamp NULL,
CONSTRAINT student_pk PRIMARY KEY (id)
);
```
* **代碼**:
```java=
@Table(name = "student")
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class StudentData {
@Id
@Column("id")
private String id;
@Column("age")
private Integer age;
@Column("name")
private String name;
@Column("nickname")
private String nickName;
@Column("createtime")
private Instant createTime; //響應式中日期的映射用 Instant 或 LocalXxx
}
```
* 注意:響應式中**日期的映射**用 `Instant` 或 `LocalXxx`
* **時間類型的不能使用Date**
### 10.3.5 CRUD 接口
* 繼承 **R2dbcRepository\<T, ID>**
* **代碼**
```java=
import org.springframework.data.r2dbc.repository.R2dbcRepository;
//最底層是 ReactiveCrudRepository
public interface StudentRepository extends R2dbcRepository<StudentData, Long> {
}
```
### 10.3.6 自定轉換器
* **用於Enum轉換**,代替**JPA**的`@Enumerated`
1. 創建轉換器:實現Converter
```java=
import io.r2dbc.spi.Row;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.ReadingConverter;
@ReadingConverter
public class StudentConvert implements Converter<Row, StudentData> {
@Override
public StudentData convert(Row source) {
return StudentData.builder()
.id(source.get("id", String.class))
.name(source.get("name", String.class))
.age(source.get("age", Integer.class))
.nickName(source.get("nickname", String.class))
.createTime(source.get("createtime", LocalDateTime.class))
.teacherData(TeacherData.builder()
.id(source.get("teacher_id", String.class))
.name(source.get("teacher_name", String.class))
.build())
.build();
}
}
```
* `@ReadingConverter`:語意標記
* 不會自動註冊 Bean
* 告訴 Spring Data R2DBC 這個 Converter 是用來把 DB 讀出來的資料轉成 Entity 層型別的
2. 將轉換器**添加至轉換器列表中** 或者 `@Component`聲明為**Bean**
```java=
@EnableR2dbcRepositories()//開啟R2dbc的倉庫功能:JPA
@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public R2dbcCustomConversions r2dbcCustomConversions() {
return R2dbcCustomConversions.of(PostgresDialect.INSTANCE, new StudentConvert());
}
}
```
* **R2dbcCustomConversions**:**管理**和**註冊**數據庫與應用程序之間的**自定義數據類型轉換器**
## 10.4 最佳實踐
* **SpringDataR2DBC**,基礎的CRUD用R2dbcRepository提供好了
* **自訂複雜的SQL**(`單表`):**@Query**
* **多表查詢**複雜結果集:
* **DatabaseClient**自訂SQL及結果封裝
* @Query+自訂Converter實現結果封裝
* **1-1**、**1-N**關聯關係的封裝都需要**自訂結果集**的方式
* SpringDataR2DBC:
* 自訂Converter指定結果封裝
* **DatabaseClient**(`貼近底層的操作`)
### 10.4.1 一對多
* 使用 **DatabaseClient**(`貼近底層的操作`)
* **代碼**:
```java=
@Autowired //貼近底層,寫原生SQL
DatabaseClient databaseClient;//數據庫客戶端
@Test
void oneToMany() throws IOException {
teacherRepositoryPeer.findAll().subscribe(System.out::println);
databaseClient.sql("""
select t.id tid,t.name tname,s.* from teacher t
left join student s on t.id =s.teacher_id
order by t.id
""")
.fetch()
.all()
//按條件將獲取批次數據
//如果下一個值與上一個值的判定不同,就會開啟新的buffer => 注意:數據需要先被排序過
.bufferUntilChanged(a -> a.get("tid").toString())
.map(listMap -> TeacherData.builder()
.id(listMap.get(0).get("tid").toString())
.name(listMap.get(0).get("tname").toString())
.studentData(listMap.stream().map(map -> StudentData.builder()
.id(Optional.of(map.get("id")).map(Object::toString).orElse(null))
.name(map.get("name").toString())
.nickName(map.get("nickname").toString())
.createTime(LocalDateTime.parse(map.get("createtime").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME))
.age(Integer.parseInt(map.get("age").toString()))
.build())
.toList())
.build())
.subscribe(System.out::println);
System.in.read();
}
```
* 使用 **bufferUntilChanged(條件)**:**按條件將獲取批次數據**
* **重點**:**如果下一個值與上一個值的判定不同**,就會開啟新的buffer(`新批次`)
* **效果**:**獲取批次數據**
* **注意**:數據需要`先排序`
## 10.5 審計
* `@EnableR2dbcAuditing`:**開啟審計功能**
* 實現**ReactiveAuditorAware\<T>**:用於`@CreatedBy`、`@LastModifiedBy`
* 建議繼承**Persistable\<T>**:判斷是不是新Row
* **預設判斷**:是存入的數據有無ID
* 有ID:舊數據
* 無ID:舊數據
* JPA是**存入前**會去DB查有ID是否存在
* @LastModifiedDate、 @CreatedDate,只有支援下列類型:
* java.time.LocalDateTime, java.time.LocalDate, java.time.LocalTime, java.time.Instant, java.util.Date, java.lang.Long, long
### 10.5.1 繼承Persistable\<T>
* **代碼**:account
```java=
@Table(value = DbMeta.AccountData.TABLE_NAME)
public class AccountData implements Persistable<String> {
@Id
@Column(DbMeta.AccountData.COL_ID)
private String id;
@Column(DbMeta.AccountData.COL_NAME)
private String name;
@Column(DbMeta.AccountData.COL_SHOW_NAME)
private String showName;
@Column(DbMeta.AccountData.COL_PASSWORD)
private String password;
@Column(DbMeta.AccountData.COL_IS_ACTIVE)
private boolean isActive;
@CreatedBy
@Column(DbMeta.AccountData.COL_CREATED_BY)
private String createdBy;
@LastModifiedBy
@Column(DbMeta.AccountData.COL_UPDATED_BY)
private String updatedBy;
@CreatedDate
@Column(DbMeta.AccountData.COL_CREATED_AT)
private Instant createdAt;
@LastModifiedDate
@Column(DbMeta.AccountData.COL_UPDATED_AT)
private Instant updatedAt;
@Column(DbMeta.AccountData.COL_REMARK)
private String remark;
@Override
public boolean isNew() {
return !StringUtils.hasText(createdBy)
&& !StringUtils.hasText(updatedBy)
&& ObjectUtils.isEmpty(createdAt)
&& ObjectUtils.isEmpty(updatedAt);
}
}
```
# 11. Spring Security
* pom.xml
```xml=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
```
## 11.1 默認配置
* 默認行為:所有請求都需要**登錄**(`認證、Authenticated`)才能夠訪問
* **SecurityAutoConfiguration**:**非響應式**
* 導入 SecurityFilterChain 組件:
* 默認所有請求都要登陸才可以
* 默認登錄頁面
* 需要添加`@EnableWebSecurity`,才可以自定配置(`響應式默認導入類似註解,可以不需要添加`)
* SecurityFilterAutoConfiguration
* MethodSecurityAspectJAutoProxyRegistrar
* **ReactiveSecurityAutoConfiguration**:**響應式**
* 導入 **EnableWebFluxSecurityConfiguration** 配置
* **@EnableWebFluxSecurity**註解 `默認導入`
* ServerHttpSecurityConfiguration
* WebFluxSecurityConfiguration
* ReactiveOAuth2ClientImportSelector
## 11.2 配置SecurityFilterChain
* 使用 **ServerHttpSecurityConfiguration** 中創建的 **ServerHttpSecurity**,來`配置SecurityFilterChain`
* 作用:對Security中的某些事項進行配置_
* 禁用 csrf:沒有禁用,會去檢查csrf_token
* .csrf(ServerHttpSecurity.CsrfSpec::disable)
* 禁用默認的表單登陸
* .formLogin(ServerHttpSecurity.FormLoginSpec::disable)
* 路徑設定:哪些要認證、哪些要授權的路徑配置
* .authorizeExchange((authorize)->{})
* 設定 Basic Auth 登入
* .httpBasic(Customizer.withDefaults())
* 自定認證規則
* .authenticationManager()
* 代碼:
```java
@Configuration
public class SecurityConfiguration {
private final ReactiveUserDetailsService reactiveUserDetailsService;
public SecurityConfiguration(@Qualifier("ReactiveUserDetailsServiceImpl") ReactiveUserDetailsService reactiveUserDetailsService) {
this.reactiveUserDetailsService = reactiveUserDetailsService;
}
@Bean
SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
return http
//定義哪些請求需要認證、角色
.authorizeExchange((authorize) -> {
//該路經所有人都可以訪問
authorize.pathMatchers("/hello").permitAll();
//剩下路徑都需要認證(登陸)
authorize.anyExchange().authenticated();
}
)
//Basic Auth 登陸
.httpBasic(Customizer.withDefaults())
//X. 禁用默認的表單登陸
.formLogin(ServerHttpSecurity.FormLoginSpec::disable)
//3. 安全控制:禁用csrf(跨站請求偽造)防護,當前不需要使用,改用JWT或OAuth2.0
.csrf(ServerHttpSecurity.CsrfSpec::disable)
//4. 配置認證規則(自定認證器):從DB中獲取用戶
// 使用 ReactiveAuthenticationManager 去查詢用戶信息
// UserDetailsRepositoryReactiveAuthenticationManager 去DB中獲取用戶信息
// 需要 ReactiveUserDetailsService
// 結論:實現一個 ReactiveUserDetailsService:響應式的用戶查詢服務 (對應 UserDetailsService)
//實現ReactiveUserDetailsService,就不會有默認密碼
.authenticationManager(getUserDetailsRepositoryReactiveAuthenticationManager())
.build();
}
@Bean
public UserDetailsRepositoryReactiveAuthenticationManager getUserDetailsRepositoryReactiveAuthenticationManager() {
UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
new UserDetailsRepositoryReactiveAuthenticationManager(reactiveUserDetailsService);
authenticationManager.setPasswordEncoder(passwordEncoder());
return authenticationManager;
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
}
```
### 11.2.1 自定認證規則-說明
* **設定在ServerHttpSecurity.authenticationManager(ReactiveAuthenticationManager manager)**
* 使用 **ReactiveAuthenticationManager**:查詢用戶信息
* **UserDetailsRepositoryReactiveAuthenticationManager**(`ReactiveAuthenticationManager的實現類`):去DB中獲取用戶信息
* 獲取用戶信息的部分:需要實現**ReactiveUserDetailsService**中**findByUsername**,返回一個Mono\<UserDetails>(`需要將Account的資訊在轉換成UserDetails`)
### 11.2.2 自定認證規則-從DB獲取用戶信息
* DTO
* Account
```java=
public class AccountData {
@Id
@Column("id")
private String id;
@Column("name")
private String name;
@Column("password")
private String password;
@Column("age")
private Integer age;
@Column("nickname")
private String nickName;
@Column("createtime")
private LocalDateTime createTime;
@Transient
@Builder.Default
private List<RoleData> roleDataList = new ArrayList<>();
}
```
* Role
```java=
public class RoleData {
@Id
@Column("id")
private String id;
@Column("name")
private String name;
@Column("privileges")
private String privileges;
@Transient
@Builder.Default
private List<AccountData> accountDataList = new ArrayList<>();
}
```
* Entity
* Account
```java=
public class Account {
private String id;
private String name;
private String password;
private Integer age;
private String nickName;
private LocalDateTime createTime;
private List<Role> roleList;
}
```
* Role
```java=
public class Role {
private String id;
private String name;
private List<String> privileges;
}
```
* 代碼:需JPA的概念不同,需要**一次取回所有信息**(`帳號、角色、權限`)
```java=
@Repository("AccountRepositoryImpl")
public class AccountRepositoryImpl implements AccountRepository {
private final AccountRepositoryPeer accountRepositoryPeer;
private final DatabaseClient databaseClient;
public AccountRepositoryImpl(AccountRepositoryPeer accountRepositoryPeer, DatabaseClient databaseClient) {
this.accountRepositoryPeer = accountRepositoryPeer;
this.databaseClient = databaseClient;
}
@Override
public Mono<Account> findByName(String name) {
return databaseClient.sql("""
select a.id a_id,a."name" a_name,a.password,a.age ,a.nickname ,a.createtime ,r.id r_id,r."name" r_name,r."privileges" from account a
left join account_role ar on a.id = ar.account_id
left join "role" r on r.id= ar.role_id
Where a.name = $1
order by a.id
""")
.bind(0, name)
//獲取數據
.fetch()
//返回所有數據
.all()
.collectList()
.map(this::toAccountData)
.map(AccountDataMapper::dataToDomain);
}
private AccountData toAccountData(List<Map<String, Object>> rowList) {
List<RoleData> roleDataList = rowList.stream().filter(row -> Objects.nonNull(row.get("r_id")))
.map(row -> RoleData.builder()
.id(row.get("r_id").toString())
.name(row.get("r_name").toString())
.privileges(row.get("privileges").toString())
.build())
.toList();
return AccountData.builder()
.id(rowList.get(0).get("a_id").toString())
.name(rowList.get(0).get("a_name").toString())
.password(rowList.get(0).get("password").toString())
.age(Integer.parseInt(rowList.get(0).get("age").toString()))
.nickName(rowList.get(0).get("nickname").toString())
.createTime(LocalDateTime.parse(rowList.get(0).get("createtime").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME))
.roleDataList(roleDataList)
.build();
}
}
```
### 11.2.3 自定認證規則-實現 ReactiveUserDetailsService
* **作用**:**SpringSecurity獲取用戶信息的接口**
* 使用**SpringSecurity**提供的**User**創建**UserDetails**
* `import org.springframework.security.core.userdetails.User;`
* `import org.springframework.security.core.userdetails.UserDetails;`
* **代碼**:
```java=
import com.nicolas.webflux1.entity.Role;
import com.nicolas.webflux1.service.AccountRepository;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import reactor.core.publisher.Mono;
public class ReactiveUserDetailsServiceImpl implements ReactiveUserDetailsService {
private final AccountRepository accountRepository;
public ReactiveUserDetailsServiceImpl(AccountRepository accountRepository) {
this.accountRepository = accountRepository;
}
@Override
public Mono<UserDetails> findByUsername(String username) {
return accountRepository.findByName(username)
.switchIfEmpty(Mono.error(new UsernameNotFoundException("User name not found, name '%s'".formatted(username))))
.map(account -> User.builder()
.username(account.getName())
.password(account.getPassword())
.roles(account.getRoleList().stream().map(Role::getName).toArray(String[]::new))
.authorities(account.getRoleList().stream().flatMap(role -> role.getPrivileges().stream()).distinct().toArray(String[]::new))
.build());
}
}
```
### 11.2.4 自定認證規則-創建 UserDetailsRepositoryReactiveAuthenticationManager
* 創建**自定認證器**
* **代碼**:配置**獲取User**、**加密器**
```java=
@Bean
public UserDetailsRepositoryReactiveAuthenticationManager getUserDetailsRepositoryReactiveAuthenticationManager() {
UserDetailsRepositoryReactiveAuthenticationManager authenticationManager =
new UserDetailsRepositoryReactiveAuthenticationManager(reactiveUserDetailsService);
//配置加密器
authenticationManager.setPasswordEncoder(passwordEncoder());
return authenticationManager;
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
```
* 對應MVC的DaoAuthenticationProvider
## 11.3 方法註解式權限驗證
* **開啟功能**:添加 **@EnableReactiveMethodSecurity**
* **非響應式**:
* SpringBoot2:@EnableGlobalMethodSecurity
* SpringBoot3:@EnableMethodSecurity
* **代碼**:
```java=
@EnableReactiveMethodSecurity// 開啟方法權限驗證
@Configuration
public class SecurityConfiguration {
@Bean
SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
return http.build();
}
}
```
* 方法添加 @PreAuthorize
* 支持複雜的SpEL
* `@PreAuthorize("@mySecurityExpression.hasAuthority('test')")`
* mySecurityExpression:Bean名稱
* hasAuthority:該Bean中的方法
* 使用 **SecurityExpressionOperations** 接口中的方法,方法:
* **hasRole()**
* **hasAuthority()**
* **isAuthenticated()**
* **範例**:
* @PreAuthorize("hasRole('ADMIN')"):
* 角色需要有 ADMIN
* @PreAuthorize("hasAuthority('READ_PRIVILEGE')")
* 權限需要有 READ_PRIVILEGE
* @PreAuthorize("isAuthenticated()")
* 是否已登入
* @PreAuthorize("#username == authentication.name")
* 使用 # 獲取方法參數或檢查 authentication 內容。
* 它是 **Authentication** 對象的 getName() 方法的返回值。
* @PreAuthorize("hasRole('USER') and #id == principal.id")
* 使用 # 獲取方法參數或檢查 authentication 內容。
* **principal** 是 **Authentication** 對象中的主體(Principal)部分
## 11.4 自定義過濾器
* 把自定義過濾器**加入過濾鏈**的話,建議 **自行 new** 自定義過濾器
* 自定義過濾器使用 **@Component**,就會**被自動加入**到過濾鏈(`避免重複加入`)
* 使用`addFilterAt`將自定義過濾器加入到**過濾鏈**
* `addFilterBefore` 沒有生效
## 11.5 使用JWT或Oauth2
* 使用http`.oauth2ResourceServer()`
* 使用Oauth2的話,建立一個**ReactiveJwtDecoder**的Bean,有兩的實現類可以用,當中就可以設定**金鑰位置**(`url\檔案路徑`)或**金鑰字串**
### 11.5.1 使用JWT
* **方式**ㄧ:**實作ReactiveAuthenticationManager**,用於處理**自行產生的JWT**
* 參考oauth2ResourceServer中**ReactiveAuthenticationManager**實作 => **JwtReactiveAuthenticationManager**
* **核心項目**:
* 驗證Token:`.jwtDecoder()`
* 對應的介面**ReactiveJwtDecoder**
* 將Token轉換成Authentication:`.jwtAuthenticationConverter()`
* **配置**:
```java
return http
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.authenticationManager(customerJwtReactiveAuthenticationManager))
)
```
* **方式二**:對應上述**兩個核心項**目,都可以在`jwt`後進行**鏈式複寫**
* `.jwtDecoder()`:**驗證Token**
* `.jwtAuthenticationConverter()`:**將Token轉換成Authentication**
### 11.5.2 JWT 異常處理
* **現象描述**:在配置 SecurityWebFilterChain 時,僅在全域 `.exceptionHandling()` 配置處理器對 JWT 認證失敗**無**,必須在 `.oauth2ResourceServer().jwt()` 內部**重複配置**
* **核心原因**:**職責分離與就近原則**
* Spring Security 的過濾器鏈 (Filter Chain) 採用模組化設計:
* **OAuth2 專屬過濾器**:當請求進入 JWT 驗證邏輯時,它**擁有自己的異常處理上下文**
* **優先權**:`jwt()` 內部的 `authenticationEntryPoint` 優先級高於全域配置。如果內部未指定,它**會觸發 OAuth2 的預設行為**(`僅回傳 401 狀態碼`),而**不會「向上拋出」給全域處理器**
* **完全配置模板**:確保所有安全漏洞(無效 Token、過期 Token、權限不足、匿名存取)都能統一口徑回傳 JSON
```java=
return http
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt
.authenticationManager(...)
.authenticationEntryPoint(securityExceptionHandler)
)
)
.exceptionHandling(e -> e
.accessDeniedHandler(securityExceptionHandler)
.authenticationEntryPoint(securityExceptionHandler)
)
.build();
```
# 12. AOP
* **Spring AOP 在 WebFlux 與在傳統 Spring MVC / Web 的行為是一樣的**
## 12.1 核心相同點
* 攔截機制:都是基於 Spring 代理(Proxy)或 AspectJ,**攔截 Spring Bean 方法**
* 觸發時機:方法呼叫瞬間(entry / exit)
* 可得資訊:方法反射、Class、參數、annotation
* 使用方式:@Around, @Before, @After,切點表達式一樣
* 換句話說,AOP 的**運作「方法攔截」邏輯**跟 **WebFlux / Web 無關**,都是代理 + 反射 + 方法呼
## 12.2 不同點(只在外層環境)
| 差異點 | WebFlux | Spring MVC / Web |
| -------------- | ---------------------------------------------------- | ----------------------------- |
| 返回值 | Mono / Flux / 非阻塞 Publisher | 普通物件 / Servlet response |
| Thread 模型 | 非阻塞,可能 thread hopping | 一個 request 對應一條 thread |
| ThreadLocal 依賴 | 不可靠,因 pipeline 會切換 thread | 可可靠使用 ThreadLocal |
| pipeline 內的操作符 | 不能被 AOP 切,需用 `.doOnSubscribe` / `.doFinally` 或 Hooks | 不存在類似 pipeline,方法內程式可直接切 |
| 事件流管理 | Reactive pipeline / non-blocking | blocking / thread-per-request |
## 12.3 實務結論
* 方法層級的 log / metrics / annotation 檢查 → WebFlux 與 Web 完全一致
* 事務 / ThreadLocal / pipeline lambda → WebFlux 需額外處理(Reactive Context / Hooks / 包裝 pipeline)
* **ThreadLocal**:MVC的**事務管理**採用ThreadLocal
# 13. ResponseEntity
* 參考網址:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods/responseentity.html
## 13.1 表格
| 程式碼結構 | 狀態碼 & 標頭 (控制時機) | 回應體 (Body 傳輸方式) | 核心應用場景 | 控制力與優勢 |
| :--- | :--- | :--- | :--- | :--- |
| **`Mono<T>` 或 `Flux<T>`** | 預設為 `200 OK`,由框架自動處理。 | 數據準備好後非同步發送。 | **最簡潔的寫法**。適用於不需自定義狀態碼的 GET 請求。 | **控制力最低**;但程式碼最簡單、最直接。 |
| **`Mono<ResponseEntity<T>>`** | **非同步**:**等待操作結果後**,一次性發送。 | **非同步**:等待操作結果後,一次性發送。 | **單一資源操作**:建立、更新、查詢單一資源,且需要根據結果動態返回 `201`、`404` 等狀態碼。 | **控制力最高**:基於非同步結果決定**所有**回應細節。 |
| **`ResponseEntity<Mono<T>>` 或 `ResponseEntity<Flux<T>>`** | **立即**發送 (`200 OK` 或預設值)。 | **稍後**非同步串流發送。 | **大型數據串流**:需要立即建立連線並開始傳輸數據,**而不在乎後續狀態碼變化**。 | **串流優勢**:回應速度快,但狀態碼固定。 |
| **`Mono<ResponseEntity<List<T>>>`** | **非同步**:等待 `Flux` 收集完畢後,一次性發送。 | **非同步**:作為單一 `List` 傳輸。 | **列表後置控制**:需要等待整個列表結果,以便處理列表為空時的狀態碼(如 `204 No Content`)。 | **次高控制力**:犧牲串流,換取對完整列表狀態碼的控制。 |
## 13.2 實際開發
* 查詢單個:使用 Mono\<ResponseEntity\<T>> (處理 404)。
* 建立/修改:使用 Mono\<ResponseEntity\<T>> (返回 201/200)
* **高效串流**或**查詢多個**:使用 Flux\<T> 或 ResponseEntity\<Flux\<T>>
* 即時通知/SSE:使用 Flux\<T> 配合 MediaType.TEXT_EVENT_STREAM_VALUE。
# X. MDC
* MDC是基於ThreadLocal,在MVC下,每一個Request都在同一Thread開始與結束
* 在WebFlux,每一個Request不會在同一Thread開始與結束,需要解決此問題
* **設定該次Request的traceId**:
* 使用Reactor的contexView,用於在整個Request中可以獲取到traceId
* **代碼**:使用WebFilter
```java=
@Component
@Order(-1)
public class MdcFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String traceId = exchange.getRequest().getId();
return chain.filter(exchange)
.contextWrite(context -> context.put("traceId", traceId));
}
}
```
* traceId來自每次Request時,webflux為該次Request設定的Id
* 引入 **context-propagation**,處理ThreadLoacl問題
* Micrometer 的 **context-propagation** 透過註冊 **ThreadLocalAccessor**,讓 Reactor Context 自動同步到 MDC,並由Hooks.enableAutomaticContextPropagation() 全域啟用自動傳遞。
* reactor整合**context-propagation**
* **整合類**:**ContextPropagationSupport**
* **引入**
```xml=
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
</dependency>
```
* **代碼**:處理ThreadLoacl
```java=
@Configuration
public class PropagationConfiguration {
@PostConstruct
public void registerMdc() {
ContextRegistry.getInstance().registerThreadLocalAccessor(
mdcProperties.getTraceId(), // 為這個 ThreadLocal accessor 命名(key)
() -> MDC.get(mdcProperties.getTraceId()), // 定義如何讀取 ThreadLocal
traceId -> MDC.put(mdcProperties.getTraceId(), traceId), // 定義如何寫入 ThreadLocal
() -> MDC.remove(mdcProperties.getTraceId()) // 定義如何清除 ThreadLocal
);
Hooks.enableAutomaticContextPropagation();
}
}
```
* 當 Reactor Context 裡有 `"traceId"` 時,請在需要的 thread 也幫我放進 MDC(ThreadLocal)
###### tags:`SpringWebflux` `r2dbc` `SpringSecurity` `Reactive Stream`