--- title: 'SpringBoot3 Webflux - 響應式編程 ' disqus: hackmd --- # 1. WebFlux * **WebFlux**:**底層**完全基於 **netty** + **Reactor** + **Springweb** 完成一個全異步非阻塞Web響應式框架 * 底層:異步+訊息隊列(內存)+事件回調機制 => 整套系統 * 優點:使用少量資源處理大量請求 * **官網**:https://docs.spring.io/spring-framework/reference/web/webflux.html ## 1.1 組件對比 | API 功能 | Servlet| WebFlux | | -------- | -------- | -------- | | 前端控制器 | DispatcherServlet| DispatcherHandler| | 處理器 | Controller | WebHandler/Controller | | 請求、響應 | **ServletRequest**<br>**ServletReponse**| **ServerWebExchange**| | 過濾器 |Filter(HttpFilter)| WebFilter | | 異常處理器|HandlerExceptionReslover|DispatcherExceptionHandler| | Web配置 | @EnableWebMVC | @EnableWebFlux | | 自定義配置 | WebMvcConfigurer | WebFluxConfigurer | | 返回結果 |任意 | **Mono**、**Flux**、任意 | | 發送REST請求 | RestTemplate | WebClinet | ## 1.2 引入WebFlux * **pom.xml**: ```xml= <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> ``` * **Context響應式上下文數據傳遞**:由下游**傳播**給上游 * 阻塞式:瀏覽器 --> Controller --> Service --> DAO * 響應式:**DAO**(`數據源查詢對象『數據發佈者』`) --> Service --> Controller --> 瀏覽器 # 2. Reactor核心-HttpHandler原生API * **代碼**:使用HttpHandler、HttpServer**原生Reactor Netty API**編寫一**個伺服器** ```java= import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import reactor.core.publisher.Mono; import reactor.netty.http.server.HttpServer; import java.io.IOException; import java.nio.charset.StandardCharsets; public class Webflux1Application { public static void main(String[] args) throws IOException { //1.創建一個處理Http請求的處理器,參數:請求、響應,返回值 Mono<Void>:代碼處理完成的信號 HttpHandler httpHandler = (ServerHttpRequest request, ServerHttpResponse response) -> { System.out.println(Thread.currentThread().getName() + " 請求進來:" + request.getURI()); //編寫請求的處理業務 // response.getHeaders();//獲取Header // response.getStatusCode();//獲取響應狀態碼 // response.getCookies();//獲取Cookie // response.bufferFactory();//buffer工廠 // response.writeWith();//把Xxx寫出 // response.setComplete();//響應結束 //數據的發佈者:Mono<DataBuffer>\Flux<DataBuffer> //1. 創建 響應數據的DataBuffer DataBufferFactory dataBufferFactory = response.bufferFactory(); //2. 寫數據,數據Buffer DataBuffer dataBuffer = dataBufferFactory.wrap((request.getPath() + " hello! ").getBytes(StandardCharsets.UTF_8)); //3. 需要一個DataBuffer的發佈者 return response.writeWith(Mono.just(dataBuffer)); }; //2、起動一台伺服器,監聽8080端口,接收數據,拿到資料交給 HttpHandler 進行請求處理 ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler); //3. 啟動Netty伺服器 HttpServer.create() .host("localhost") .port(8080) .handle(adapter)//指定一個處理器 .bindNow();//現在綁定 System.out.println("啟動,監聽8080,接收請求"); System.in.read(); System.out.println("End"); } } ``` # 3. DispatcherHandler 原理 * **SpringMVC**:DispatcherServlet * **SpringWebFlux**:**DispatcherHandler** ## 3.1 請求處理流程 * **三個屬性**: * HandlerMapping:請求映射處理器 * 保存每個請求由哪個方法進行處理 * HandlerAdapter:處理器的適配器 * 反射執行目標方法 * HandlerResultHandler:處理結果的處理器 * **源碼**: ```java= @Nullable private List<HandlerMapping> handlerMappings; @Nullable private List<HandlerAdapter> handlerAdapters; @Nullable private List<HandlerResultHandler> resultHandlers; ``` * DispatcherServlet有一個**doDispatch()**,來處理所有請求 * DispatcherHandler有一個**handle()**,來處理所有請求 ```java= @Override public Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) { return createNotFoundError(); } if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { return handlePreFlight(exchange); } return Flux.fromIterable(this.handlerMappings)//拿到所有的handlerMappings .concatMap(mapping -> mapping.getHandler(exchange))//找每一個mapping,看誰能夠處理這個請求的handlerAdapter .next()//直接處發獲取元素,拿到流的第一個符合的元素 .switchIfEmpty(createNotFoundError())//找不到元素,觸發404錯誤 .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex))//異常處理,一旦前面發生異常,調用handleResultMono()處理 .flatMap(handler -> handleRequestWith(exchange, handler));//調用方法處理請求,得到響應結果 } ``` 1. **請求**和**響應**都封裝在**ServerWebExchange**中,由**handle方法**進行處理 2. 如果**沒有任何請求映射處理器**,直接返回一個**Mono.error** (`創建一個未找到的錯誤`) 的終結流 ```java= private <R> Mono<R> createNotFoundError() { return Mono.defer(() -> { Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND); return Mono.error(ex); }); } ``` * Mono.defer():該流有訂閱,且流被激活後,動態調用 * 延遲創建 Mono * 每次被訂閱時調用,重新執行邏輯,創建新的Mono 4. **跨域工具**,是否跨域請求,跨域請求檢查是否複雜跨域,需要預檢請求; * CorsUtils.isPreFlightRequest 6. **Flux串流操作** 1. 找到**HandlerMapping** 2. 獲取**handlerAdapter** 3. Adapter**處理請求** * **期間的的錯誤**由 **onErrorResume()** 觸發回調感知 * **源碼兩個核心方法**: * **handleRequestWith**:編寫了**handlerAdapter**怎麼**處理清求** * **handleResult**:處理返回值 # 4. 註解式Controller * **說明**:SpringWebFlux**向下兼容**原來SpringMVC`大多數注解和API` ## 4.1 基本範例 * **代碼**: ```java= @RestController public class HelloController { //WebFlux:向下兼容原來SpringMVC大多數注解和API @GetMapping("/hello") public String hello(@RequestParam(value = "key", defaultValue = "AA") String key) { return "hello world! " + key; } //推薦方式 //1. 返回單個數據Mono //2. 返回多個數據Flux @GetMapping("/haha") public ResponseEntity<Mono<String>> haha() { return ResponseEntity.ok(Mono.just("haha")); } @GetMapping(value = "/huhu", produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<Flux<String>> huhu() { return ResponseEntity.ok(Flux.just("huhu1", "huhu2")); } } ``` ## 4.2 Server-Sent Events 服務器推送 * **說明**:**服務器推送** * **服務器推送的專用類**:org.springframework.http.codec.**ServerSentEvent** * SSE **的核心數據結構包括**: * 數據 (data):每個事件的主要內容。 * 事件類型 (event):標識事件類型的名稱。 * ID (id):唯一標識該事件的標識符。 * 重試時間 (retry):通知客戶端在連接斷開時重試的間隔時間。 * 備註(comment) * **代碼**:每500毫秒,推送一個數據到前端 ```java= //3. 配合Flux,完成SSE(Server Send Event)服務端事件推送 @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public ResponseEntity<Flux<ServerSentEvent<String>>> sse() { return ResponseEntity.ok(Flux.range(1, 10) .map(i -> //構建一個SSE對象 ServerSentEvent.builder("ha " + i) .id(i + " id")//數據編號 .comment("her-" + i)//備註 .event("haha-" + i)//表示那一個事件推送 .build() ) .delayElements(Duration.ofMillis(500)) ); } ``` * **指定返回的內容類型**為 **text/event-stream** (`SSE 的標準 MIME 類型`) * SSE與WebScoket的區別: * SSE:單工 * 請求過去以後,等待**服務端源源不絕的數據** * WebScoket:雙工 * 連接建立後,可以任意交互 ## 4.3 檔案上傳 * **文檔**:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods/multipart-forms.html * 使用**FilePart**,`MultipartFile也可以使用` * FilePart的效率更好,底層採用`零拷貝` * **代碼**: ```java= @GetMapping("/upload") public String uploadMVC(MultipartFile也可以使用 multipartFile) { return "OK"; } @GetMapping("/upload2") public String uploadFlux(FilePart filePart) { filePart.filename(); filePart.transferTo(Path.of("/"));//高速,零拷貝 return "OK"; } ``` # 5. 錯誤響應-全局異常處理 * **@ControllerAdvice方式** 和 **ErrorAttributes + AbstractErrorWebExceptionHandler** 只會有**其中一個生效** * **ErrorAttributes + AbstractErrorWebExceptionHandler**是`預設` * **@ControllerAdvice方式** 是用`配置`開啟**預設處理類** 或 **自行使用該註解** ## 5.1 @ControllerAdvice方式-應用層 * **說明**:SpringWebFlux**向下兼容**原來SpringMVC`大多數注解和API` * **方式**:基於**流**的**回調機制** * MCV:是基於AOP * 開啟**默認**的 **@ControllerAdvice** 的處理類 * **配置**:`spring.webflux.prombledetails.enabled=true` * **默認的處理類**(**ResponseEntityExceptionHandler**) * 建議參考**ResponseEntityExceptionHandler** * 自動建立 ProblemDetail * 自動設定 status, title, detail, instance * 自動解析 MessageSource 進行 i18n * 自動對應常見 WebFlux 例外(像 MissingRequestValueException, UnsupportedMediaTypeStatusException 等) * **代碼**:**使用方式不變** ```java= //@ResponseBody//這個類別中的所有方法預設都會回傳 HTTP 回應的正文(Response Body) //@ControllerAdvice @RestControllerAdvice// @ControllerAdvice + @ResponseBody public class GloabExceptionHandler { @ExceptionHandler(ArithmeticException.class) public String handlerArithmeticException(ArithmeticException exception) { System.out.println("數學運算異常 " + exception.getMessage()); return "a"; } } ``` * **@RestControllerAdvice** = **@ControllerAdvice** + **@ResponseBody** * **@ExceptionHandler**:捕獲異常 * **ErrorResponse**:快速構建錯誤響應(`底層為ProblemDetail`) * **ProblemDetail**:自定義PD返回 ## 5.2 ErrorAttributes + AbstractErrorWebExceptionHandler - Reactor 層(最低層) * spring webflux預設 * 攔截所有未被 ControllerAdvice 捕獲的錯誤 * 實做ErrorAttributes和AbstractErrorWebExceptionHandler * ErrorAttributes:產生Error的Response * 默認的DefaultErrorAttributes需要,注意ErrorAttributeOptions,他會移除回傳Map中的某些Key * **ErrorProperties**:可以調整返回的Key的配置 * **參數**: * server.error.include-binding-errors:返回@Vaild的錯誤結果 * server.error.include-message:返回錯誤的Msg * server.error.include-stacktrace:返回stacktrace * **參數值**: | 參數值 | 行為說明 | 常見用途 | | :--------- | :------------ | :--------- | | `NEVER` | 永不輸出該屬性 | 生產環境 | | `ALWAYS` | 永遠輸出 | 開發、除錯用 | | `ON_PARAM` | 只有在請求帶特定參數時輸出 | 兼顧安全與調試靈活性 | * AbstractErrorWebExceptionHandler:將訊息回傳 # 6. 註解開發 ## 6.1 目標方法參數 * **文檔**:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods.html * **ServerWebExchange**:封裝了請求和回應對象的對象,獲取自定義數據,自定義響應 * **ServerHttpRequest**:請求對象 * **ServerHttpResponse**:響應對象 * **WebSession**:訪問Session對象 * org.springframework.http.HttpMethod:請求方式 * java.util.Locale:國際化 * java.util.TimeZone + java.time.ZoneId:時區 * @PathVariable:路徑變量 * @MatrixVariable:矩陣變量 * @RequestParam:請求參數 * @RequestHeader:請求頭 * @CookieValue:獲取Cookie * @RequestBody:獲取請求体 * HttpEntity\<B>:封裝後的請求對象 * @RequestPart:form-data * Errors or BindingResult:數據校驗 * @RequestAttribute:轉發請求的請求域數據 * 其他:所有對象都能作為參數 * 基本類型,等於標註@RequestParam * 對象類型,等於標註@ModelAttribute ## 6.2 返回值 * @ResponseBody:把響應數據寫出,如果是對象自動轉換成Json * **ResponseEntity\<B>**:支持快捷**自定義響應內容** * HttpHeaders:沒有響應內容,只有響應頭 * **ErrorResponse**:快速構**建錯誤響應** * **ProblemDetail**:SpringBoot3 * String:與之前使用規則一樣 * View:與之前使用規則一樣:與之前使用規則一樣 * java.util.Map \ org.springframework.ui.Model:與之前使用規則一樣 * @ModelAttribute:與之前使用規則一樣 * **Rendering**:一種view對象 * 新版的**頁面跳轉API**(`不能標註@ResponseBody`) ```java= //類要標註@Contoller, @GetMapping public Rendering rendering() { Rendering.redirectTo("/ad");//重定向到目前項目根路徑下的ad return Rendering.redirectTo("https://www.google.com").build();//重定向到google } ``` * void:代表響應完成信號 * **Flux\<ServerSentEvent>**:使用`text/event-stream`完成SSE效果 * 其他值:未在上述列表的返回值,當成頁面的數據 # 7. RequestContext * **作用**:儲存和管理與**目前 HTTP 請求相關的上下文資訊的機制**。它可以在`請求處理的各個階段共享數據` * **SpringMVC**:**RequestContextHolder** 一種靜態方法存取目前請求的上下文資訊: * 它使用 **ThreadLocal** 儲存請求上下文,**確保資料與當前線程綁定**。 * 適合傳統的基於 Servlet 的 Web 應用 * **獲取當前上下文**: ```java= ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); ``` * **SpringWebFlux**:Reactor的**ContextView**(`參考響應式-概念`) * 在整個**反應式鏈中傳遞共享資料**(類似於 ThreadLocal) * 可儲存使用者資訊、設定資料等,與每個 Mono 或 Flux 的執行綁定。 # 8. WebFlux配置 * **@EnableWebFlux**:開啟WebFlux自定義 * 不推薦使用(`因為所有WebFlux的默認配置全部失效,全部自定義`) * WebFluxAutoConfiguration:WebFlux的自動配置 ## 8.1 自定義WebFlux-修改某些配置 * 注入**WebFluxConfigurer**組件,並**重寫需要調整的底層邏輯** * **擴展預設配置,而不會完全覆蓋** * **代碼**:**修改默認的跨域配置** ```java= @Configuration public class MyConfiguration { //配置底層 @Bean public WebFluxConfigurer configurer1() { return new WebFluxConfigurer() { @Override public void addCorsMappings(CorsRegistry registry) { registry.addMapping("/**")//允許所有路徑 .allowedHeaders("*")//允許所有請求頭 .allowedMethods("*")//允許所有方法 .allowedOrigins("localhost"); } }; } } ``` # 9. Filter * **使用WebFilter** * **代碼**: ```java= import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; public class MyWebFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); System.out.println("===請求放行之前==="); Mono<Void> filter = chain .filter(exchange)//放行 .doOnError(throwable -> System.out.println(throwable))//目標方法異常後 .doFinally(signalType -> System.out.println("=目標方法執行之後="));//目標方法執行之後 return filter;//注意返回的流 } } ``` # 10. R2DBC * 響應式的關聯式ORM ## 10.1 依賴 * 導入 * 驅動 * 連接池 * pom.xml ```xml= <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <scope>runtime</scope> </dependency> ``` ## 10.2 基本範例-R2DBC * 代碼: ```java= // 思想: // 1. 有了r2dbc,我們的應用在資料庫層面 天然支援高並發、高吞吐量。 // 2. 並不能提升開發效率 @Test void getConnection() throws IOException { // r2dbc基於全異步、響應式、訊息驅動 // r2dbc:postgresql://localhost:5432/postgres //0. postgresql配置 PostgresqlConnectionConfiguration postgresqlConnectionConfiguration = PostgresqlConnectionConfiguration.builder() .addHost("localhost", 5432) .database("postgres") .username("nicolas") .build(); //1. 獲取連接工廠,使用Postgresql PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(postgresqlConnectionConfiguration); //2. 獲取連接,發送sql //數據發佈者 Publisher<? extends Connection> publisher = connectionFactory.create(); //Mono.from 通常用於從某個 Publisher 中提取數據,並將其包裝為一個 Mon Mono.from(connectionFactory.create()) //flatMap 改用flatMapMany,因為會返回很多數據 .flatMapMany(connection -> connection .createStatement("SELECT * from student WHERE age > $1 and age < $2") .bind(0, 9) .bind(1, 123) .execute() ) //使用flatMap,上面的流,可能是Flux\Mono,map的前提是上面的流與下面的流要相同 .flatMap(result -> result .map((row, rowmedata) -> row.get("id", String.class))) .take(5)//取出5筆數據 .subscribe(System.out::println); //卡住主線程 System.in.read(); } ``` * take():取出5筆數據 * 背壓:不用回傳所有東西,基於請求量回傳 ## 10.3 SpringData-R2DBC * **配置**: ```properties= spring.r2dbc.url=r2dbc:postgresql://localhost:5432/postgres spring.r2dbc.username=nicolas spring.r2dbc.password ``` ### 10.3.1 SpringBoot對r2dbc的自動配置 * **R2dbcAutoConfiguration**:主要配置**連接工廠**、**連接池**(`非DB線程池,不使用DB線程池`) * R2dbcDataAutoConfiguration: * **R2abcEntityTemplate**:操作資料庫的響應式客戶端(XxxTemplate) * 提供 CRUD API * RelationalManagedTypes:描述應用程序中與數據庫交互的**受管理的實體類型** * 使用注解如 **@Entity** 或 **@Table** 標記的類 * R2dbcMappingContext:數據類型映射關係 * MappingR2dbcConverter:轉換器 * R2dbcCustomConversions:自定義轉換器 * 自定數據類型轉換功能 * **R2dbcRepositoriesAutoconfiguration**:開啟SpringData聲明式介面方式的CRUD * R2dbcTransactionManagerAutoConfiguration:事務管理 ### 10.3.2 DatabaseClient & R2dbcEntityTemplate * 兩個R2DBC核心API * DatabaseClient * R2dbcEntityTemplate #### 10.3.2.1 R2dbcEntityTemplate * **作用**:提供CRUD API * **映射對象**: ```java= @Table(name = "student") @Data @Builder @AllArgsConstructor @NoArgsConstructor public class StudentData { @Id @Column("id") private String id; @Column("age") private Integer age; @Column("name") private String name; @Column("nickname") private String nickName; } ``` * 代碼: ```java= @Autowired//join很難處理 R2dbcEntityTemplate r2dbcEntityTemplate;//CRUD API @Test void r2dbcEntityTemplate() throws IOException { //1. 構造查詢條件 id = '1' Criteria criteria = Criteria.empty() .and("id").is("1"); //2. 封裝為Query對象 Query query = Query.query(criteria); r2dbcEntityTemplate.select(query, StudentData.class) .subscribe(System.out::println); //卡住主線程 System.in.read(); } ``` #### 10.3.2.2 DatabaseClient * **處理複雜的SQL** * **代碼**: ```java= @Autowired //貼近底層,寫原生SQL DatabaseClient databaseClient;//數據庫客戶端 @Test void databaseClient() throws IOException { databaseClient.sql("select * from student where id = $1") .bind(0, "1") //抓取數據 .fetch() //返回所有數據 .all() .map(row -> { return StudentData.builder() .id(row.get("id").toString()) .age(Integer.parseInt(row.get("age").toString())) .name(row.get("name").toString()) .nickName(row.get("nickname").toString()) .build(); }) .subscribe(System.out::println); System.in.read(); } ``` * fetch():抓取數據 * all():返回所有數據 * first():返回第一筆 * one():返回一筆 ### 10.3.3 手動創建連接池 * 繼承**AbstractR2dbcConfiguration**,重寫**connectionFactory()** * 創建一個連接池 * **@EnableR2dbcRepositories**:開啟R2dbc的倉庫功能 * basePackages:設定Data的位置 * entityOperationsRef:指定使用的數據庫引擎 * 多數據源時建議配置 * 建立**ReactiveTransactionManager**:**開啟事物機制** * 方法上貼`@Transactional`: * @Transactional 的本質是 **「聲明一個帶有事務 Context 的數據流」** *** 執行時**會**到當前的Context**中尋找事務 * **差異**: * **傳統** (Imperative):依靠 `ThreadLocal<Connection>`。一旦切換線程,事務就丟失。 * **響應式** (Reactive):依靠 `Subscriber Context`。事務狀態就像隨身攜帶的「背包」,隨著數據流(Stream)傳遞,無論線程如何切換,**R2dbcTransactionManager** 都能從 Context 中找回同一個連接。 * **代碼**: ```java= //開啟R2dbc的倉庫功能:JPA @EnableR2dbcRepositories(basePackages = "com.nicolas.pulse.adapter.repository", entityOperationsRef = "MainR2dbcEntityOperations") @Configuration public class R2dbcConfiguration extends AbstractR2dbcConfiguration { @Bean("R2dbcConfigurationDB") @Override public ConnectionFactory connectionFactory() { ConnectionFactory connectionFactory = ConnectionFactories.get(builder() .option(DRIVER, "postgresql") .option(HOST, "localhost") .option(PORT, 5432) .option(USER, "nicolas") .option(DATABASE, "postgres") .build() ); ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder(connectionFactory) .initialSize(5) .maxSize(20) .maxIdleTime(java.time.Duration.ofMinutes(30)) .build(); return new ConnectionPool(poolConfig); } @Primary @Bean("MainR2dbcEntityOperations") public R2dbcEntityOperations mainR2dbcEntityOperations(@Qualifier("MainConnectionFactory") ConnectionFactory connectionFactory) { DatabaseClient databaseClient = DatabaseClient.create(connectionFactory); return new R2dbcEntityTemplate(databaseClient, PostgresDialect.INSTANCE); } // 設定事務 @Primary @Bean public ReactiveTransactionManager transactionManager(@Qualifier("MainConnectionFactory") ConnectionFactory connectionFactory) { return new R2dbcTransactionManager(connectionFactory); } } ``` ### 10.3.4 數據封裝 * **與JPA配置方式相同** * **schema** ```sql= CREATE TABLE public.student ( id varchar NOT NULL, "name" varchar NULL, age int4 NULL, nickname varchar NULL, createtime timestamp NULL, CONSTRAINT student_pk PRIMARY KEY (id) ); ``` * **代碼**: ```java= @Table(name = "student") @Data @Builder @AllArgsConstructor @NoArgsConstructor public class StudentData { @Id @Column("id") private String id; @Column("age") private Integer age; @Column("name") private String name; @Column("nickname") private String nickName; @Column("createtime") private Instant createTime; //響應式中日期的映射用 Instant 或 LocalXxx } ``` * 注意:響應式中**日期的映射**用 `Instant` 或 `LocalXxx` * **時間類型的不能使用Date** ### 10.3.5 CRUD 接口 * 繼承 **R2dbcRepository\<T, ID>** * **代碼** ```java= import org.springframework.data.r2dbc.repository.R2dbcRepository; //最底層是 ReactiveCrudRepository public interface StudentRepository extends R2dbcRepository<StudentData, Long> { } ``` ### 10.3.6 自定轉換器 * **用於Enum轉換**,代替**JPA**的`@Enumerated` 1. 創建轉換器:實現Converter ```java= import io.r2dbc.spi.Row; import org.springframework.core.convert.converter.Converter; import org.springframework.data.convert.ReadingConverter; @ReadingConverter public class StudentConvert implements Converter<Row, StudentData> { @Override public StudentData convert(Row source) { return StudentData.builder() .id(source.get("id", String.class)) .name(source.get("name", String.class)) .age(source.get("age", Integer.class)) .nickName(source.get("nickname", String.class)) .createTime(source.get("createtime", LocalDateTime.class)) .teacherData(TeacherData.builder() .id(source.get("teacher_id", String.class)) .name(source.get("teacher_name", String.class)) .build()) .build(); } } ``` * `@ReadingConverter`:語意標記 * 不會自動註冊 Bean * 告訴 Spring Data R2DBC 這個 Converter 是用來把 DB 讀出來的資料轉成 Entity 層型別的 2. 將轉換器**添加至轉換器列表中** 或者 `@Component`聲明為**Bean** ```java= @EnableR2dbcRepositories()//開啟R2dbc的倉庫功能:JPA @Configuration public class R2dbcConfiguration extends AbstractR2dbcConfiguration { @Bean @Override public R2dbcCustomConversions r2dbcCustomConversions() { return R2dbcCustomConversions.of(PostgresDialect.INSTANCE, new StudentConvert()); } } ``` * **R2dbcCustomConversions**:**管理**和**註冊**數據庫與應用程序之間的**自定義數據類型轉換器** ## 10.4 最佳實踐 * **SpringDataR2DBC**,基礎的CRUD用R2dbcRepository提供好了 * **自訂複雜的SQL**(`單表`):**@Query** * **多表查詢**複雜結果集: * **DatabaseClient**自訂SQL及結果封裝 * @Query+自訂Converter實現結果封裝 * **1-1**、**1-N**關聯關係的封裝都需要**自訂結果集**的方式 * SpringDataR2DBC: * 自訂Converter指定結果封裝 * **DatabaseClient**(`貼近底層的操作`) ### 10.4.1 一對多 * 使用 **DatabaseClient**(`貼近底層的操作`) * **代碼**: ```java= @Autowired //貼近底層,寫原生SQL DatabaseClient databaseClient;//數據庫客戶端 @Test void oneToMany() throws IOException { teacherRepositoryPeer.findAll().subscribe(System.out::println); databaseClient.sql(""" select t.id tid,t.name tname,s.* from teacher t left join student s on t.id =s.teacher_id order by t.id """) .fetch() .all() //按條件將獲取批次數據 //如果下一個值與上一個值的判定不同,就會開啟新的buffer => 注意:數據需要先被排序過 .bufferUntilChanged(a -> a.get("tid").toString()) .map(listMap -> TeacherData.builder() .id(listMap.get(0).get("tid").toString()) .name(listMap.get(0).get("tname").toString()) .studentData(listMap.stream().map(map -> StudentData.builder() .id(Optional.of(map.get("id")).map(Object::toString).orElse(null)) .name(map.get("name").toString()) .nickName(map.get("nickname").toString()) .createTime(LocalDateTime.parse(map.get("createtime").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)) .age(Integer.parseInt(map.get("age").toString())) .build()) .toList()) .build()) .subscribe(System.out::println); System.in.read(); } ``` * 使用 **bufferUntilChanged(條件)**:**按條件將獲取批次數據** * **重點**:**如果下一個值與上一個值的判定不同**,就會開啟新的buffer(`新批次`) * **效果**:**獲取批次數據** * **注意**:數據需要`先排序` ## 10.5 審計 * `@EnableR2dbcAuditing`:**開啟審計功能** * 實現**ReactiveAuditorAware\<T>**:用於`@CreatedBy`、`@LastModifiedBy` * 建議繼承**Persistable\<T>**:判斷是不是新Row * **預設判斷**:是存入的數據有無ID * 有ID:舊數據 * 無ID:舊數據 * JPA是**存入前**會去DB查有ID是否存在 * @LastModifiedDate、 @CreatedDate,只有支援下列類型: * java.time.LocalDateTime, java.time.LocalDate, java.time.LocalTime, java.time.Instant, java.util.Date, java.lang.Long, long ### 10.5.1 繼承Persistable\<T> * **代碼**:account ```java= @Table(value = DbMeta.AccountData.TABLE_NAME) public class AccountData implements Persistable<String> { @Id @Column(DbMeta.AccountData.COL_ID) private String id; @Column(DbMeta.AccountData.COL_NAME) private String name; @Column(DbMeta.AccountData.COL_SHOW_NAME) private String showName; @Column(DbMeta.AccountData.COL_PASSWORD) private String password; @Column(DbMeta.AccountData.COL_IS_ACTIVE) private boolean isActive; @CreatedBy @Column(DbMeta.AccountData.COL_CREATED_BY) private String createdBy; @LastModifiedBy @Column(DbMeta.AccountData.COL_UPDATED_BY) private String updatedBy; @CreatedDate @Column(DbMeta.AccountData.COL_CREATED_AT) private Instant createdAt; @LastModifiedDate @Column(DbMeta.AccountData.COL_UPDATED_AT) private Instant updatedAt; @Column(DbMeta.AccountData.COL_REMARK) private String remark; @Override public boolean isNew() { return !StringUtils.hasText(createdBy) && !StringUtils.hasText(updatedBy) && ObjectUtils.isEmpty(createdAt) && ObjectUtils.isEmpty(updatedAt); } } ``` # 11. Spring Security * pom.xml ```xml= <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency> ``` ## 11.1 默認配置 * 默認行為:所有請求都需要**登錄**(`認證、Authenticated`)才能夠訪問 * **SecurityAutoConfiguration**:**非響應式** * 導入 SecurityFilterChain 組件: * 默認所有請求都要登陸才可以 * 默認登錄頁面 * 需要添加`@EnableWebSecurity`,才可以自定配置(`響應式默認導入類似註解,可以不需要添加`) * SecurityFilterAutoConfiguration * MethodSecurityAspectJAutoProxyRegistrar * **ReactiveSecurityAutoConfiguration**:**響應式** * 導入 **EnableWebFluxSecurityConfiguration** 配置 * **@EnableWebFluxSecurity**註解 `默認導入` * ServerHttpSecurityConfiguration * WebFluxSecurityConfiguration * ReactiveOAuth2ClientImportSelector ## 11.2 配置SecurityFilterChain * 使用 **ServerHttpSecurityConfiguration** 中創建的 **ServerHttpSecurity**,來`配置SecurityFilterChain` * 作用:對Security中的某些事項進行配置_ * 禁用 csrf:沒有禁用,會去檢查csrf_token * .csrf(ServerHttpSecurity.CsrfSpec::disable) * 禁用默認的表單登陸 * .formLogin(ServerHttpSecurity.FormLoginSpec::disable) * 路徑設定:哪些要認證、哪些要授權的路徑配置 * .authorizeExchange((authorize)->{}) * 設定 Basic Auth 登入 * .httpBasic(Customizer.withDefaults()) * 自定認證規則 * .authenticationManager() * 代碼: ```java @Configuration public class SecurityConfiguration { private final ReactiveUserDetailsService reactiveUserDetailsService; public SecurityConfiguration(@Qualifier("ReactiveUserDetailsServiceImpl") ReactiveUserDetailsService reactiveUserDetailsService) { this.reactiveUserDetailsService = reactiveUserDetailsService; } @Bean SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { return http //定義哪些請求需要認證、角色 .authorizeExchange((authorize) -> { //該路經所有人都可以訪問 authorize.pathMatchers("/hello").permitAll(); //剩下路徑都需要認證(登陸) authorize.anyExchange().authenticated(); } ) //Basic Auth 登陸 .httpBasic(Customizer.withDefaults()) //X. 禁用默認的表單登陸 .formLogin(ServerHttpSecurity.FormLoginSpec::disable) //3. 安全控制:禁用csrf(跨站請求偽造)防護,當前不需要使用,改用JWT或OAuth2.0 .csrf(ServerHttpSecurity.CsrfSpec::disable) //4. 配置認證規則(自定認證器):從DB中獲取用戶 // 使用 ReactiveAuthenticationManager 去查詢用戶信息 // UserDetailsRepositoryReactiveAuthenticationManager 去DB中獲取用戶信息 // 需要 ReactiveUserDetailsService // 結論:實現一個 ReactiveUserDetailsService:響應式的用戶查詢服務 (對應 UserDetailsService) //實現ReactiveUserDetailsService,就不會有默認密碼 .authenticationManager(getUserDetailsRepositoryReactiveAuthenticationManager()) .build(); } @Bean public UserDetailsRepositoryReactiveAuthenticationManager getUserDetailsRepositoryReactiveAuthenticationManager() { UserDetailsRepositoryReactiveAuthenticationManager authenticationManager = new UserDetailsRepositoryReactiveAuthenticationManager(reactiveUserDetailsService); authenticationManager.setPasswordEncoder(passwordEncoder()); return authenticationManager; } @Bean public PasswordEncoder passwordEncoder() { return new BCryptPasswordEncoder(); } } ``` ### 11.2.1 自定認證規則-說明 * **設定在ServerHttpSecurity.authenticationManager(ReactiveAuthenticationManager manager)** * 使用 **ReactiveAuthenticationManager**:查詢用戶信息 * **UserDetailsRepositoryReactiveAuthenticationManager**(`ReactiveAuthenticationManager的實現類`):去DB中獲取用戶信息 * 獲取用戶信息的部分:需要實現**ReactiveUserDetailsService**中**findByUsername**,返回一個Mono\<UserDetails>(`需要將Account的資訊在轉換成UserDetails`) ### 11.2.2 自定認證規則-從DB獲取用戶信息 * DTO * Account ```java= public class AccountData { @Id @Column("id") private String id; @Column("name") private String name; @Column("password") private String password; @Column("age") private Integer age; @Column("nickname") private String nickName; @Column("createtime") private LocalDateTime createTime; @Transient @Builder.Default private List<RoleData> roleDataList = new ArrayList<>(); } ``` * Role ```java= public class RoleData { @Id @Column("id") private String id; @Column("name") private String name; @Column("privileges") private String privileges; @Transient @Builder.Default private List<AccountData> accountDataList = new ArrayList<>(); } ``` * Entity * Account ```java= public class Account { private String id; private String name; private String password; private Integer age; private String nickName; private LocalDateTime createTime; private List<Role> roleList; } ``` * Role ```java= public class Role { private String id; private String name; private List<String> privileges; } ``` * 代碼:需JPA的概念不同,需要**一次取回所有信息**(`帳號、角色、權限`) ```java= @Repository("AccountRepositoryImpl") public class AccountRepositoryImpl implements AccountRepository { private final AccountRepositoryPeer accountRepositoryPeer; private final DatabaseClient databaseClient; public AccountRepositoryImpl(AccountRepositoryPeer accountRepositoryPeer, DatabaseClient databaseClient) { this.accountRepositoryPeer = accountRepositoryPeer; this.databaseClient = databaseClient; } @Override public Mono<Account> findByName(String name) { return databaseClient.sql(""" select a.id a_id,a."name" a_name,a.password,a.age ,a.nickname ,a.createtime ,r.id r_id,r."name" r_name,r."privileges" from account a left join account_role ar on a.id = ar.account_id left join "role" r on r.id= ar.role_id Where a.name = $1 order by a.id """) .bind(0, name) //獲取數據 .fetch() //返回所有數據 .all() .collectList() .map(this::toAccountData) .map(AccountDataMapper::dataToDomain); } private AccountData toAccountData(List<Map<String, Object>> rowList) { List<RoleData> roleDataList = rowList.stream().filter(row -> Objects.nonNull(row.get("r_id"))) .map(row -> RoleData.builder() .id(row.get("r_id").toString()) .name(row.get("r_name").toString()) .privileges(row.get("privileges").toString()) .build()) .toList(); return AccountData.builder() .id(rowList.get(0).get("a_id").toString()) .name(rowList.get(0).get("a_name").toString()) .password(rowList.get(0).get("password").toString()) .age(Integer.parseInt(rowList.get(0).get("age").toString())) .nickName(rowList.get(0).get("nickname").toString()) .createTime(LocalDateTime.parse(rowList.get(0).get("createtime").toString(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)) .roleDataList(roleDataList) .build(); } } ``` ### 11.2.3 自定認證規則-實現 ReactiveUserDetailsService * **作用**:**SpringSecurity獲取用戶信息的接口** * 使用**SpringSecurity**提供的**User**創建**UserDetails** * `import org.springframework.security.core.userdetails.User;` * `import org.springframework.security.core.userdetails.UserDetails;` * **代碼**: ```java= import com.nicolas.webflux1.entity.Role; import com.nicolas.webflux1.service.AccountRepository; import org.springframework.security.core.userdetails.ReactiveUserDetailsService; import org.springframework.security.core.userdetails.User; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.security.core.userdetails.UsernameNotFoundException; import reactor.core.publisher.Mono; public class ReactiveUserDetailsServiceImpl implements ReactiveUserDetailsService { private final AccountRepository accountRepository; public ReactiveUserDetailsServiceImpl(AccountRepository accountRepository) { this.accountRepository = accountRepository; } @Override public Mono<UserDetails> findByUsername(String username) { return accountRepository.findByName(username) .switchIfEmpty(Mono.error(new UsernameNotFoundException("User name not found, name '%s'".formatted(username)))) .map(account -> User.builder() .username(account.getName()) .password(account.getPassword()) .roles(account.getRoleList().stream().map(Role::getName).toArray(String[]::new)) .authorities(account.getRoleList().stream().flatMap(role -> role.getPrivileges().stream()).distinct().toArray(String[]::new)) .build()); } } ``` ### 11.2.4 自定認證規則-創建 UserDetailsRepositoryReactiveAuthenticationManager * 創建**自定認證器** * **代碼**:配置**獲取User**、**加密器** ```java= @Bean public UserDetailsRepositoryReactiveAuthenticationManager getUserDetailsRepositoryReactiveAuthenticationManager() { UserDetailsRepositoryReactiveAuthenticationManager authenticationManager = new UserDetailsRepositoryReactiveAuthenticationManager(reactiveUserDetailsService); //配置加密器 authenticationManager.setPasswordEncoder(passwordEncoder()); return authenticationManager; } @Bean public PasswordEncoder passwordEncoder() { return new BCryptPasswordEncoder(); } ``` * 對應MVC的DaoAuthenticationProvider ## 11.3 方法註解式權限驗證 * **開啟功能**:添加 **@EnableReactiveMethodSecurity** * **非響應式**: * SpringBoot2:@EnableGlobalMethodSecurity * SpringBoot3:@EnableMethodSecurity * **代碼**: ```java= @EnableReactiveMethodSecurity// 開啟方法權限驗證 @Configuration public class SecurityConfiguration { @Bean SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { return http.build(); } } ``` * 方法添加 @PreAuthorize * 支持複雜的SpEL * `@PreAuthorize("@mySecurityExpression.hasAuthority('test')")` * mySecurityExpression:Bean名稱 * hasAuthority:該Bean中的方法 * 使用 **SecurityExpressionOperations** 接口中的方法,方法: * **hasRole()** * **hasAuthority()** * **isAuthenticated()** * **範例**: * @PreAuthorize("hasRole('ADMIN')"): * 角色需要有 ADMIN * @PreAuthorize("hasAuthority('READ_PRIVILEGE')") * 權限需要有 READ_PRIVILEGE * @PreAuthorize("isAuthenticated()") * 是否已登入 * @PreAuthorize("#username == authentication.name") * 使用 # 獲取方法參數或檢查 authentication 內容。 * 它是 **Authentication** 對象的 getName() 方法的返回值。 * @PreAuthorize("hasRole('USER') and #id == principal.id") * 使用 # 獲取方法參數或檢查 authentication 內容。 * **principal** 是 **Authentication** 對象中的主體(Principal)部分 ## 11.4 自定義過濾器 * 把自定義過濾器**加入過濾鏈**的話,建議 **自行 new** 自定義過濾器 * 自定義過濾器使用 **@Component**,就會**被自動加入**到過濾鏈(`避免重複加入`) * 使用`addFilterAt`將自定義過濾器加入到**過濾鏈** * `addFilterBefore` 沒有生效 ## 11.5 使用JWT或Oauth2 * 使用http`.oauth2ResourceServer()` * 使用Oauth2的話,建立一個**ReactiveJwtDecoder**的Bean,有兩的實現類可以用,當中就可以設定**金鑰位置**(`url\檔案路徑`)或**金鑰字串** ### 11.5.1 使用JWT * **方式**ㄧ:**實作ReactiveAuthenticationManager**,用於處理**自行產生的JWT** * 參考oauth2ResourceServer中**ReactiveAuthenticationManager**實作 => **JwtReactiveAuthenticationManager** * **核心項目**: * 驗證Token:`.jwtDecoder()` * 對應的介面**ReactiveJwtDecoder** * 將Token轉換成Authentication:`.jwtAuthenticationConverter()` * **配置**: ```java return http .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> jwt.authenticationManager(customerJwtReactiveAuthenticationManager)) ) ``` * **方式二**:對應上述**兩個核心項**目,都可以在`jwt`後進行**鏈式複寫** * `.jwtDecoder()`:**驗證Token** * `.jwtAuthenticationConverter()`:**將Token轉換成Authentication** ### 11.5.2 JWT 異常處理 * **現象描述**:在配置 SecurityWebFilterChain 時,僅在全域 `.exceptionHandling()` 配置處理器對 JWT 認證失敗**無**,必須在 `.oauth2ResourceServer().jwt()` 內部**重複配置** * **核心原因**:**職責分離與就近原則** * Spring Security 的過濾器鏈 (Filter Chain) 採用模組化設計: * **OAuth2 專屬過濾器**:當請求進入 JWT 驗證邏輯時,它**擁有自己的異常處理上下文** * **優先權**:`jwt()` 內部的 `authenticationEntryPoint` 優先級高於全域配置。如果內部未指定,它**會觸發 OAuth2 的預設行為**(`僅回傳 401 狀態碼`),而**不會「向上拋出」給全域處理器** * **完全配置模板**:確保所有安全漏洞(無效 Token、過期 Token、權限不足、匿名存取)都能統一口徑回傳 JSON ```java= return http .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> jwt .authenticationManager(...) .authenticationEntryPoint(securityExceptionHandler) ) ) .exceptionHandling(e -> e .accessDeniedHandler(securityExceptionHandler) .authenticationEntryPoint(securityExceptionHandler) ) .build(); ``` # 12. AOP * **Spring AOP 在 WebFlux 與在傳統 Spring MVC / Web 的行為是一樣的** ## 12.1 核心相同點 * 攔截機制:都是基於 Spring 代理(Proxy)或 AspectJ,**攔截 Spring Bean 方法** * 觸發時機:方法呼叫瞬間(entry / exit) * 可得資訊:方法反射、Class、參數、annotation * 使用方式:@Around, @Before, @After,切點表達式一樣 * 換句話說,AOP 的**運作「方法攔截」邏輯**跟 **WebFlux / Web 無關**,都是代理 + 反射 + 方法呼 ## 12.2 不同點(只在外層環境) | 差異點 | WebFlux | Spring MVC / Web | | -------------- | ---------------------------------------------------- | ----------------------------- | | 返回值 | Mono / Flux / 非阻塞 Publisher | 普通物件 / Servlet response | | Thread 模型 | 非阻塞,可能 thread hopping | 一個 request 對應一條 thread | | ThreadLocal 依賴 | 不可靠,因 pipeline 會切換 thread | 可可靠使用 ThreadLocal | | pipeline 內的操作符 | 不能被 AOP 切,需用 `.doOnSubscribe` / `.doFinally` 或 Hooks | 不存在類似 pipeline,方法內程式可直接切 | | 事件流管理 | Reactive pipeline / non-blocking | blocking / thread-per-request | ## 12.3 實務結論 * 方法層級的 log / metrics / annotation 檢查 → WebFlux 與 Web 完全一致 * 事務 / ThreadLocal / pipeline lambda → WebFlux 需額外處理(Reactive Context / Hooks / 包裝 pipeline) * **ThreadLocal**:MVC的**事務管理**採用ThreadLocal # 13. ResponseEntity * 參考網址:https://docs.spring.io/spring-framework/reference/web/webflux/controller/ann-methods/responseentity.html ## 13.1 表格 | 程式碼結構 | 狀態碼 & 標頭 (控制時機) | 回應體 (Body 傳輸方式) | 核心應用場景 | 控制力與優勢 | | :--- | :--- | :--- | :--- | :--- | | **`Mono<T>` 或 `Flux<T>`** | 預設為 `200 OK`,由框架自動處理。 | 數據準備好後非同步發送。 | **最簡潔的寫法**。適用於不需自定義狀態碼的 GET 請求。 | **控制力最低**;但程式碼最簡單、最直接。 | | **`Mono<ResponseEntity<T>>`** | **非同步**:**等待操作結果後**,一次性發送。 | **非同步**:等待操作結果後,一次性發送。 | **單一資源操作**:建立、更新、查詢單一資源,且需要根據結果動態返回 `201`、`404` 等狀態碼。 | **控制力最高**:基於非同步結果決定**所有**回應細節。 | | **`ResponseEntity<Mono<T>>` 或 `ResponseEntity<Flux<T>>`** | **立即**發送 (`200 OK` 或預設值)。 | **稍後**非同步串流發送。 | **大型數據串流**:需要立即建立連線並開始傳輸數據,**而不在乎後續狀態碼變化**。 | **串流優勢**:回應速度快,但狀態碼固定。 | | **`Mono<ResponseEntity<List<T>>>`** | **非同步**:等待 `Flux` 收集完畢後,一次性發送。 | **非同步**:作為單一 `List` 傳輸。 | **列表後置控制**:需要等待整個列表結果,以便處理列表為空時的狀態碼(如 `204 No Content`)。 | **次高控制力**:犧牲串流,換取對完整列表狀態碼的控制。 | ## 13.2 實際開發 * 查詢單個:使用 Mono\<ResponseEntity\<T>> (處理 404)。 * 建立/修改:使用 Mono\<ResponseEntity\<T>> (返回 201/200) * **高效串流**或**查詢多個**:使用 Flux\<T> 或 ResponseEntity\<Flux\<T>> * 即時通知/SSE:使用 Flux\<T> 配合 MediaType.TEXT_EVENT_STREAM_VALUE。 # X. MDC * MDC是基於ThreadLocal,在MVC下,每一個Request都在同一Thread開始與結束 * 在WebFlux,每一個Request不會在同一Thread開始與結束,需要解決此問題 * **設定該次Request的traceId**: * 使用Reactor的contexView,用於在整個Request中可以獲取到traceId * **代碼**:使用WebFilter ```java= @Component @Order(-1) public class MdcFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { String traceId = exchange.getRequest().getId(); return chain.filter(exchange) .contextWrite(context -> context.put("traceId", traceId)); } } ``` * traceId來自每次Request時,webflux為該次Request設定的Id * 引入 **context-propagation**,處理ThreadLoacl問題 * Micrometer 的 **context-propagation** 透過註冊 **ThreadLocalAccessor**,讓 Reactor Context 自動同步到 MDC,並由Hooks.enableAutomaticContextPropagation() 全域啟用自動傳遞。 * reactor整合**context-propagation** * **整合類**:**ContextPropagationSupport** * **引入** ```xml= <dependency> <groupId>io.micrometer</groupId> <artifactId>context-propagation</artifactId> </dependency> ``` * **代碼**:處理ThreadLoacl ```java= @Configuration public class PropagationConfiguration { @PostConstruct public void registerMdc() { ContextRegistry.getInstance().registerThreadLocalAccessor( mdcProperties.getTraceId(), // 為這個 ThreadLocal accessor 命名(key) () -> MDC.get(mdcProperties.getTraceId()), // 定義如何讀取 ThreadLocal traceId -> MDC.put(mdcProperties.getTraceId(), traceId), // 定義如何寫入 ThreadLocal () -> MDC.remove(mdcProperties.getTraceId()) // 定義如何清除 ThreadLocal ); Hooks.enableAutomaticContextPropagation(); } } ``` * 當 Reactor Context 裡有 `"traceId"` 時,請在需要的 thread 也幫我放進 MDC(ThreadLocal) ###### tags:`SpringWebflux` `r2dbc` `SpringSecurity` `Reactive Stream`