###### tags: `Quarkus` `MicroService`
近期因工作上需求,需要接觸新的後端框架...Quarkus,號稱有出色的啟動速度和低記憶體占用....因寫法上與一般Web API有點不同特別做個紀錄。
## 一、RESTful Web API 阻塞式、非阻塞式、響應式與併發差異性
一般API常見的有四種寫法,分別為阻塞式、非阻塞式、響應式與併發的寫法。他們的差異如下
- 阻塞式 (Blocking) : 每個請求都必須等待前一個請求完成後才能繼續進行。
- 非阻塞式 (Non-blocking) : 後續的請求不必等前一個請求完成,就可繼續進行。簡單來說就是允許系統在等待某個任務完成的同時,去做其他事情。
- 響應式 (Reactive) : 基立於Non-blocking(Thread Pool)、事件驅動(Event-Driven)和觀察者模式(Observer Pattern),相較於Non-blocking更能輕易做數據流的整合與操作,能更優雅的處理好的反壓(back-pressure)和資源彈性(resource elasticity)。
- 併發 (Concurrency) : 基本上併發是一種概念,簡單來說只要涉及同時間有多個請求或操作在進行就可以叫併發? 而Non-blocking與Reactive就是一種併發的實現方式。
### a.Blocking
下圖為Blocking Flow,這Flow很簡單,簡單來說HTTP GET/user/joe打出去後,Client要等到HTTP Response,後端服務(Container)才可以接受處理另一個HTTP Request。

Java範例程式碼如下
```java=
@GetMapping("/blocking")
public String blocking() throws InterruptedException {
// Simulate a long-running operation
Thread.sleep(5000);
return "Finished blocking operation";
}
```
### b.Non-Blocking
Non-Blocking Flow如下,可以看到一個Key Point,在Container往Controller呼叫getUser()當下立即返回一個Future HTTP Result。這個Future HTTP Result會告知Container可以去處理其他事情,已收到任務命令。Container就可以接著去接收下一個Request。
詳細流程如下,基礎原理在Thread Pool與callback的應用實作
- 客戶端到容器(Container)
- Step1 : Client發起HTTP GET請求,當客戶端(Client)發起一個HTTP GET到 /user/joe,這個請求首先會被送到容器(Container)。
- 容器到Controller
- Step1 : HTTP請求到達Controller,容器會將這個HTTP請求轉交給Controller對應的getUser()方法。
- Step2 : Controller回應Future HTTP Result,這裡的"Future HTTP Result"是一種占位符,告訴容器這個請求會被異步處理。Controller會立即回應,這樣容器就知道可以去處理其他請求。
- Controller到Service
- Step1 : Service方法開啟新執行緒,Controller會調用Service的getUserByName()方法。Service層會開啟一個新的執行緒(通常來自一個執行緒池(Thread Pool))來處理這個請求。
- Step2 : 釋放原始執行緒,一旦新的執行緒開始運行,用來處理原始HTTP請求的執行緒就會被釋放,返回到執行緒池中。
- Service到Repository到Database
- Step1 : 非同步操作和回調(Callbacks),Service會進一步調用Repository的findAllByLastName()方法,然後Repository會非同步地與Database進行交互。
- Step2 : Repository回應給Service,Repository會回傳一個CompletableFuture給Service。
- Service到Controller到容器
- Step1 : Service回應給Controller,Service會將CompletableFuture回傳給Controller。
- Step2 : 回調由不同的執行緒執行:這個CompletableFuture一旦完成,它的回調會被一個不同的執行緒(也來自執行緒池)執行。
- Step3 : 返回HTTP響應:最後,Controller會生成最終的HTTP響應(通常是CompletableFuture的實際內容)並回傳給容器,然後容器再回傳給客戶端。

Java範例程式碼如下
```java=
@RestController
public class NonBlockingController {
@GetMapping("/non-blocking")
public CompletableFuture<String> nonBlocking() {
return CompletableFuture.supplyAsync(() -> {
try {
// Simulate a long-running operation
Thread.sleep(5000);
} catch (InterruptedException e) {
// Handle exception
}
return "Finished non-blocking operation";
});
}
}
```
### c.Reactive
響應式是基立於Non-blocking(Thread Pool)的設計是上,並套用事件驅動(Event-Driven)和觀察者模式(Observer Pattern)設計模式。可以輕易達到資料流串接處理、back-pressure與resource elasticity。
這邊以flow圖去解釋要想個情境會稍微麻煩一些,故這小節紀錄Reactive,我想說直接以Srouce Code去解釋。
#### 資料流串接處理
響應式寫法可以輕易的讓你客製資料流串接,例如三支服務處理完後,再去做事情。
舉個例子,假設你的系統需要從多個服務提供者(例如,不同的API或數據庫)中獲取數據,並且希望這些操作能夠非阻塞和高效地進行。
```java=
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@RestController
public class ReactiveController {
// 模擬從一個遠程API或數據庫中獲取單一數據
private Mono<String> getDataFromProvider1() {
return Mono.just("Data from Provider 1")
.delayElement(Duration.ofSeconds(2));
}
// 模擬從另一個遠程API或數據庫中獲取流式數據
private Flux<String> getDataFromProvider2() {
return Flux.just("Data 1 from Provider 2", "Data 2 from Provider 2")
.delayElements(Duration.ofSeconds(1));
}
@GetMapping("/aggregate")
public Flux<String> aggregateData() {
// 合併兩個數據來源並返回
return Flux.concat(getDataFromProvider1(), getDataFromProvider2());
}
}
```
如果用一般寫法要達到同效果,寫法如下,相對起來會較於複雜,使用Reactive我們可以輕易達到當兩個數據都收到資料時,在實際做下一步動作.
```java=
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RestController
public class NonReactiveController {
// 模擬從一個遠程API或數據庫中獲取單一數據
private String getDataFromProvider1() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Data from Provider 1";
}
// 模擬從另一個遠程API或數據庫中獲取流式數據
private List<String> getDataFromProvider2() {
List<String> data = new ArrayList<>();
data.add("Data 1 from Provider 2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
data.add("Data 2 from Provider 2");
return data;
}
@GetMapping("/aggregate")
public List<String> aggregateData() {
List<String> aggregatedData = new ArrayList<>();
// 從第一個數據提供者獲取數據
aggregatedData.add(getDataFromProvider1());
// 從第二個數據提供者獲取數據
aggregatedData.addAll(getDataFromProvider2());
return aggregatedData;
}
}
```
#### back-pressure
back-pressure為一種流量控制手段,當系統中的一個部分(通常是消費者)無法跟上另一個部分(通常是生產者)的速度時,它會發出一個信號,要求生產者減慢速度。換句話說,當系統過載時,響應式架構允許系統向上游(發送請求的來源)發出信號,告訴它「慢下來!」。
舉個簡單例子,假設我們的後端API需要從多個數據源拉取數據,然後進行一些耗時的計算或轉換,最後將結果返回給客戶端。使用Reactive方法程式碼如下,
這個例子中,我們創建了一個Flowable流,用於模擬一個快速生成數據的數據源。這個流生成0到999的整數。然後,我們用BackpressureStrategy.BUFFER來指定背壓策略,這樣如果後續的操作(例如,存儲到數據庫或進行計算)無法跟上數據生成的速度,這些數據會被緩存起來。
我們使用.observeOn(Schedulers.io())來指定後續操作應該在I/O線程上執行,以避免阻塞主線程。
這個.blockingSubscribe()方法會阻塞當前線程直到流被完全消費。這裡只是為了簡單示範,實際應用中應避免使用阻塞調用。
```java=
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RxJavaBackPressureExampleController {
@GetMapping("/rxjava-backpressure-example")
public String backPressureExample() {
Flowable<Integer> flowable = Flowable.create(emitter -> {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
StringBuilder result = new StringBuilder();
flowable.observeOn(Schedulers.io())
.blockingSubscribe(
item -> {
// 模擬耗時操作
Thread.sleep(100);
result.append(item).append(", ");
},
throwable -> result.append("Error: ").append(throwable),
() -> result.append("Done")
);
return result.toString();
}
}
```
#### resource elasticity
指一個系統能夠根據需求自動地調整其資源使用(例如,CPU、記憶體、帶寬等)。這邊舉個例子,
一個圖片轉換服務為例。這個服務需要大量的CPU資源來進行圖片處理。
情境描述
- 用戶會上傳圖片進行格式轉換(例如,從JPEG轉為PNG)。
- 在普通工作時間,只有少數請求。
- 在晚上或促銷活動期間,請求數量會劇增。
- 我們希望系統能夠根據請求的量動態調整資源。
程式碼如下,在這個例子中,我們使用了 Reactor 的 Elastic Scheduler。這個 Scheduler 會根據需要動態地創建或銷毀執行緒,從而達到資源彈性。
```java=
// Controller
@RestController
public class ImageController {
@Autowired
private ImageService imageService;
@PostMapping("/convert")
public Mono<Void> convert(@RequestBody Flux<Image> images) {
return imageService.convertImages(images).then();
}
}
// Service
@Service
public class ImageService {
public Flux<Image> convertImages(Flux<Image> images) {
return images.parallel()
.runOn(Schedulers.elastic()) // 使用Elastic Scheduler
.map(this::heavyImageProcessing)
.sequential();
}
private Image heavyImageProcessing(Image image) {
// 耗CPU的圖片處理
return new Image(); // 返回轉換後的圖片
}
}
```
再舉個例子,你可以使用 Schedulers.io()如下 來自動地管理一個無界的線程池。根據需要創建更多的線程,並在完成後自動釋放,從而實現一種資源彈性。
```java=
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onComplete();
});
observable
.observeOn(Schedulers.io())
.subscribe(item -> {
// 這個操作將在 io Scheduler 上運行,具有資源彈性
System.out.println(item);
});
```
### d.Reactive補充
在純後端的情境中,非阻塞(Non-blocking)的實現方式通常已經足夠應對大多數的需求,特別是在處理 I/O 等待、資料庫查詢或網絡請求等方面。但Reactive還是有其特定的應用場景,最常見的就是事件驅動架構及流量控制(Back-Pressure)。針對這兩個應用再給個情境。
- 流量控制(Back-Pressure)
假設你有一個後端系統,它負責處理來自多個客戶端的圖片上傳。每個客戶端上傳的速度可能會不一樣。如果一個客戶端上傳速度非常快,而你的後端處理速度跟不上,這會造成資源過度使用或者其他客戶端的請求被延遲。
在這種情況下,你可以使用 Reactive 的 Back-Pressure 機制來控制接收速度。即,後端會告訴每個客戶端它目前可以接受的速度,這樣就不會因為一兩個過快的客戶端而影響整個系統。
在Java的世界中,這可以使用 Project Reactor 或者 RxJava 等庫來實現。以 Project Reactor 為例:
```java=
Flux.create(sink -> {
// 生產數據的邏輯,可能是讀取一個快速的數據源
})
.onBackpressureBuffer() // 這裡就是使用了背壓機制,當速度過快時會緩存數據
.subscribe(data -> {
// 處理數據的邏輯,可能是寫入一個慢速的數據源
});
```
- 事件驅動架構 (這範例有點爛...事件驅動架構是一個很大的Scope....)
在一個購物網站的後端,當用戶完成付款後,會有多個步驟需要同時進行,例如更新庫存、通知用戶、生成發票等。這些步驟之間相互獨立,但都是由「用戶完成付款」這一個事件觸發的。
在這種情況下,你可以使用事件驅動架構,當一個事件發生時,會通知(publish)給多個事件消費者(subscriber)。
例如,使用 Java 的 Spring Framework,你可以這樣做:
```java=
// 事件發布者
public class PaymentService {
@Autowired
private ApplicationEventPublisher publisher;
public void completePayment() {
// ...付款邏輯
publisher.publishEvent(new PaymentCompletedEvent(this));
}
}
// 事件消費者
public class StockService implements ApplicationListener<PaymentCompletedEvent> {
@Override
public void onApplicationEvent(PaymentCompletedEvent event) {
// ...更新庫存邏輯
}
}
```
每當 PaymentService 發布一個 PaymentCompletedEvent,所有訂閱了這個事件的服務(例如 StockService)都會收到通知,並執行相應的邏輯。
- 非阻塞(Non-blocking)
- 一般是基於執行緒池(Thread Pool)來實現的。
- 主要目的是讓執行緒可以在等待某個操作(如I/O操作或數據庫查詢)完成的期間,被釋放去做其他事情。
- 使用Future、CompletableFuture或其他異步機制來表示未來會完成的結果。
- 響應式(Reactive)
- 基於事件驅動(Event-Driven)和觀察者模式(Observer Pattern)。
- 更加關注數據流和變化的傳播,通常用於實現更為複雜的數據交互和流控制。
- 使用Reactive Streams API或者相關的庫(如RxJava、Project Reactor等)。
- 可以更好地處理背壓(Back-Pressure),即在高負載情況下控制數據流。
## 二、Quarkus 響應式與阻塞式寫法差異
一般來說Rest都是使用Java EE提供的Servlet的這個類別來實踐,傳統的Rest服務皆是如此。
而在Quarkus從底層就採用了新的引擎Vert.x來實作,而Vert.x又別於Servlet的不同是,他基於netty進行實作,也就是他是基於異步高併發的事件驅動的應用程式框架,也代表說為什麼在使用Quarkus要以響應式編程來實作你的程式。

框架的底層架構就先大概知道這裡就好了,詳細的有機會再去更深入的了解,目前我們需要專注的是怎麼寫好Quarkus的程序,像是常聽到的Reactive的相關套件,在Java和JavaScript最有名的就是RX,像是Spring Flux就是基於rxJava去實作的框架,但Quarkus選用的非rxJava而是Mutiny這個一個套件。
### a. 阻塞 VS 非阻塞(響應式寫法)
我現在有一個需求會有兩個資料原,並且要將兩個資料原彙整後再回傳,以下分別用阻塞和非阻塞的寫法來說明。
#### 阻塞
在以下的寫法必須先取得Data1後才能去發起請求取得Data2,因為此目的是要進行彙整,這樣我就會多了一個等待時間,導致程序在處理的速度較慢。
```java
public Data aggregateData() {
Data data1 = fetchDataFromDatabase(); // 請求資料庫,此時執行緒被阻塞
Data data2 = fetchDataFromExternalAPI(); // 請求外部 API,此時執行緒再次被阻塞
return combine(data1, data2); // 整合兩部分的資料
}
```
#### 非阻塞(Mutiny)
而非阻塞再進行撰寫的時候,Data1和Data2是同時發出請求查詢資料,然後再進行彙整,這樣就可以減少要等待Data1查詢回來後的時間,這樣在程序處理的效能會比較快速。
```java
public Uni<Data> aggregateData() {
Uni<Data> data1Uni = fetchDataFromDatabaseAsync(); // 非阻塞地請求資料庫
Uni<Data> data2Uni = fetchDataFromExternalAPIAsync(); // 非阻塞地請求外部 API
return Uni.combine().all().unis(data1Uni, data2Uni) // 同時等待兩個 Uni 完成
.asTuple()
.onItem().transform(tuple -> combine(tuple.getItem1(), tuple.getItem2()));
}
```
### b. Mutiny
在這個Mutiny主要提供了兩種介面來實踐Reactive如下列說明:
1. Uni<?>
> 提供0個或一個執行結果的異步操作,可被做資料串流整合,通常單獨使用只有onItem(onSucess)與onFailure,兩種事件。
2. Multi<?>
> 提供多個結果的異步操作,可以做資料串流整合,onItem與onFailure。
在這裡還是要提醒一下,Reactive不管是rxJava還是Mutiny都是基於Functional Programing (函數式程式設計),如果有寫過前端框架如Angular,Rest,Vue的人應該會相對熟悉。
#### Uni 提供0個或1個執行結果的異步操作
Uni如果以Java原生的套件可以相當等於Future和CompletionStage。
##### Create Uni
```java
//建立一個直接回傳值的Uni
Uni<String> uniFromValue = Uni.createFrom().item("Hello, Mutiny!");
//創建一個有邏輯處理過後的Uni
Uni<String> uniFromSupplier = Uni.createFrom().item(() -> {
// dosomething
return "Async result";
});
```
##### 將CompletionStage轉換為Uni
```java
Uni<String> uniFromCompletionStage = Uni.createFrom().completionStage(this::asyncMethod);
```
##### 組合多個Uni
```java
Uni<String> uni1 = Uni.createFrom().item("Hello");
Uni<String> uni2 = Uni.createFrom().item("Mutiny");
Uni<String> combined = uni1.onItem()
.combine()
.with(uni2)
.by((item1, item2) -> item1 + " " + item2);
```
##### 錯誤處理
```java
Uni<String> recovered = uniFromValue
.onFailure()
.recoverWithItem("Fallback item in case of failure");
```
##### 非阻塞異步轉換為阻塞同步 (這樣寫就變成同步了要注意)
```java
String result = uniFromValue.await().indefinitely();
System.out.println(result); // 輸出:Hello, Mutiny!
```
#### Multi 提供多個結果的異步操作
與 Uni 不同的是,Multi 可以表示多筆資料項目,例如來自資料庫的多筆記錄或者一系列的事件
##### Create Multi
```java
// 創建一個集合的Multi
Multi<String> names = Multi.createFrom().items("Alice", "Bob", "Charlie");
```
##### 過濾和轉換
```java
Multi<Integer> lengths = names
.filter(name -> !name.equals("Bob")) // 過濾掉 "Bob"
.onItem().transform(String::length); // 將名稱轉換為其長度
```
##### 錯誤處理
```java
Multi<String> processed = names
.onItem().transform(name -> {
if (name.equals("Bob")) {
throw new RuntimeException("Bob not allowed!");
}
return name;
})
.onFailure().recoverWithMulti(Multi.createFrom().item("Error occurred"));
```
##### 合併
```java
Multi<String> multi1 = Multi.createFrom().items("A", "B");
Multi<String> multi2 = Multi.createFrom().items("C", "D");
Multi<String> merged = Multi.createBy().merging().streams(multi1, multi2); // 輸出:A, B, C, D
```
##### 連結
```java
Multi<String> multi1 = Multi.createFrom().items("A", "B");
Multi<String> multi2 = Multi.createFrom().items("C", "D");
Multi<String> concatenated = Multi.createBy().concatenating().streams(multi1, multi2); // 輸出:A, B (等待完成), C, D
```
### c. Infrastructure撰寫注意
實作過程串接有遇到一些問題,實測串接MongoDB段,Quarkus有提供一個響應式DB物件叫ReactiveMongoClient。如果直接使用這個物件,他基底基本上就是使用Mutiny去包MongoDB官方的MogoClient物件,預設就是響應式寫法。但實際用起來有很大的問題,在資料MongoDB Collection資料量破千後,使用ReactiveMongoClient寫法實際跑起來會比阻塞的寫法還慢。稍微查一下,感覺這個[Issue](https://github.com/quarkusio/quarkus/issues/36177)目前還是Open。所以建議寫法,還是自己使用Mutiny去包DB物件。
### d. K6.io測試結果
[範例點我](https://github.com/KCDC-Cloud-Dev/Research-Dummy/tree/feature-web-service-sample/quarkus-dummy/web-service-mongodb)
針對範例程式去使用k6.io測試,模擬1000個user同時打API,並執行1000次結果如下,數據部分可以看到響應式寫法比阻塞式寫法在connecting時間差了7倍,成功率也比較高。這為簡單以極端例子去測試,做個簡易參考。

註:實際測試checks應該要達100%,通常有幾個原因,系統乘載量真的太大(極端模擬硬體Support不足),不然就是程式碼真的寫得有問題。
## Reference
https://quarkus.pro/guides/getting-started-reactive.html
https://blog.csdn.net/Tong_Rui/article/details/110683703
https://zhuanlan.zhihu.com/p/63228618
https://quarkus.io/blog/mutiny-back-pressure/
https://dassum.medium.com/building-a-reactive-restful-web-service-using-spring-boot-and-postgres-c8e157dbc81d
http://blog.nostratech.com/2018/12/building-reactive-rest-api-using-spring.html
https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/
https://engineering.linecorp.com/en/blog/reactive-streams-armeria-2
https://smallrye.io/smallrye-mutiny/1.6.0/tutorials/creating-uni-pipelines/