# 牛年自強計畫 Week 6 - SpringBoot - 番外篇 Java 執行緒 CompletableFuture
## 【前言】
要來講 Java 8 最後一個關於 Future 更動 -- CompletableFuture 啦!
被譽為 Java 8 三神器之一的 CompletableFuture 從出現就為執行緒的應用帶來諸多改變。
下面就為大家介紹介紹~
> Java 8 三神器: Stream API, Optional API, CompletableFuture.
## 【CompletableFuture】
做為 Future 的實作,自然保有了 Future 所有的特性,並提供了四種可快速執行非同步動作的方法
| 方法名稱 | 說明 |
| ---- | ---- |
| **runAsync** (Runnable runnable) | 執行一個沒有回傳值的非同步動作,並在預設的 Thread Pool 中執行,預設為 ForkJoinPool.commonPool()|
| **runAsync** (Runnable runnable, Executor executor) | 執行一個沒有回傳值的非同步動作,並在指定的 Thread Pool 中執行|
| **supplyAsync** (Supplier supplier) | 執行一個有回傳值的非同步動作,並在預設的 Thread Pool 中執行,預設為 ForkJoinPool.commonPool()|
| **supplyAsync** (Supplier supplier, Executor executor) | 執行一個有回傳值的非同步動作,並在指定的 Thread Pool 中執行|
同時也實作了 **CompletionStage** 這個介面,讓其有了類似於 JS Promise 的功能、整合連續流程架構,以及多分支合併結果的方法等。
Kai 會用下面的名詞去介紹這三種特性:
- **Listenable**: 類 JS Promise 功能 (CallBack)
- **Composible**: 連續流程
- **Combinable**: 多分支
### Listenable
有在寫 JS 的開發人員相信對 CallBack 指令不陌生,這是針對非同步處理有結果的時候,進行結果處理的程序。
透過 CallBack 方式,開發人員不需要在意程式何時會取得結果,而只需要專注於取得結果後的業務邏輯實現。
以往的 Future 只有透過 get() 的方式作取得的結果,且程序會停在該點上,直到取得結果為止。
而 CompletableFuture 則新加入了幾種方法讓我們能夠作出類似 CallBack 一樣的效果。
只不過無法像 JS CallBack 可以完全非同步,CompleteFuture 仍會停在 get() 方法那,差別在於當完成後,程式會自動執行 **whenComplete()** 或 **handle()** 的部分。
| 方法名稱 | 說明 |
| ---- | ---- |
| **whenComplete** ((result, throwable) -> {}) | 當非同步處理完成後,將結果或拋出帶到 CallBack function 中|
| **handle** ((result, throwable) -> { return result;}) | 當非同步處理完成後,將結果或拋出帶到 CallBack function 中,最後回傳結果|
#### 範例
```java=
@DisplayName("Test CompletableFuture: whenComplete() ")
@Test
public void testCase1() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World!";
}).whenComplete((result, throwable)->{
System.out.println(result);
test();
});
future.get();
}
@DisplayName("Test CompletableFuture: handle() ")
@Test
public void testCase2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World!";
}).handle((result, throwable)->{
test();
return result;
});
String getString = future.get();
System.out.println(getString);
}
public void test(){
System.out.println("Finish!!");
}
```
#### 範例結果
```java=
/*** whenComplete() ***/
Test Start
Hello World!
Finish!!
Test End
/*** handle() ***/
Test Start
Finish!!
Hello World!
Test End
```
### Composible
有的時候,我們會需要在一件非同步處理完成後接續執行下一步的動作,且這個動作跟主流程無關,為一串非同步的連續流程。
若在前頭透過 whenComplete() 的方法中加入第二步的處理,那樣的結構看起來還尚可閱讀,但如果需要第三層? 第四層? 那便會讓整段程式碼不堪入目的複雜。
因此針對這種連續流程,CompletableFuture 提供符合使用情境的方法如下:
| 方法名稱 | 說明 |
| ---- | ---- |
| **thenRun** (Runnable runnable) | 接續執行新的非同步任務,無回傳值,預設使用上一個 Runnable 的 Thread|
| **thenRunAsync** (Runnable runnable) | 接續執行新的非同步任務,無回傳值,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況|
| **thenRunAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,無回傳值,預設使用指定的 Thread Pool|
| **thenApply** (Runnable runnable) | 接續執行新的非同步任務,有回傳值,預設使用上一個 Runnable 的 Thread|
| **thenApplyAsync** (Runnable runnable) | 接續執行新的非同步任務,有回傳值,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況|
| **thenApplyAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,有回傳值,預設使用指定的 Thread Pool|
| **thenAccept** (Runnable runnable) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果|
| **thenAcceptAsync** (Runnable runnable) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果。預設使用 ForkJoinPool.commonPool() |
| **thenAcceptAsync** (Runnable runnable, Executor executor) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果。使用指定的 Thread Pool|
| **thenCompose** (Runnable runnable) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用上一個 Runnable 的 Thread|
| **thenComposeAsync** (Runnable runnable) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況|
| **thenComposeAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用指定的 Thread Pool|
簡單來說:
- **Run** 和 **Accept** 無回傳值;**Apply** 有回傳值
- **Apply** 回傳**Object**;**Compose** 回傳 **CompletableFuture**
- **thenXXX ()** 和 **thenXXXAsync (..., Eexcutor)** 的執行狀況會比 **thenXXXAsync ()** 穩定
#### 範例
```java=
@DisplayName("Test CompletableFuture: thenRun() and thenApply() ")
@Test
public void testCase3() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.runAsync(() -> {
try {
sleep(1000);
System.out.println("Stop at First Runnable");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).thenRun(()->{
try {
sleep(1000);
System.out.println("Stop at Second Runnable");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).thenApply((result)->{
return "Hello World!";
}).whenComplete((result, throwable)->{
System.out.println(result);
test();
});
future.get();
}
@DisplayName("Test CompletableFuture: thenAccept() and thenCompose() ")
@Test
public void testCase4() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenCompose(result -> {
return CompletableFuture.supplyAsync(() -> {return result+" World!";});
}).thenAccept(result -> {
System.out.println("Got Result:" +result);
});
future.get();
}
```
#### 範例結果
```java=
/*** thenRun() and thenApply() ***/
Test Start
Stop at First Runnable
Stop at Second Runnable
Hello World!
Finish!!
Test End
/*** thenAccept() and thenCompose() ***/
Test Start
Got Result:Hello World!
Test End
```
### Combinable
非同步的任務,有時候並不會只開出一條分支出去,甚至一條分支出去後還可能岔出更多分支出來,這種類似遞迴的情況常出現在爬蟲等操作上。
CompletableFuture 提供對於兩個 CompletableFuture 類的整合處理,針對兩個以上的複數時,則再透過兩兩配對的方式逐一縮合為一。
方法又分成兩者皆須完成以及,兩者其中之一完成即可 (未結束的一方仍會完成任務)。
兩者皆完成方法:
| 方法名稱 | 說明 |
| ---- | ---- |
| **runAfterBoth** (CompletionStage, Runnable runnable) | 整合兩個無回傳值的 CompletableFuture 類|
| **thenAcceptBoth** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,但不能再回傳結果|
| **thenCombine** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,須再回傳結果|
#### 範例
```java=
@DisplayName("Test CompletableFuture: runAfterBoth()")
@Test
public void testCase5() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("Run 1");
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("Run 2");
});
CompletableFuture<Void> future3 = future1.runAfterBoth(future2, ()->{
System.out.println("Run 3");
}).whenComplete((Void, throwable)->{
System.out.println("Finished future3.");
});
future3.get();
}
@DisplayName("Test CompletableFuture: thenAcceptBoth()")
@Test
public void testCase6() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 200;
});
CompletableFuture<Void> future3 = future1.thenAcceptBoth(future2, (result1, result2)->{
System.out.println(result1 + result2);
});
future3.get();
}
@DisplayName("Test CompletableFuture: thenCombine()")
@Test
public void testCase7() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 200;
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (result1, result2)->{
return result1 + result2;
});
Integer getInt = future3.get();
System.out.println(getInt);
}
```
#### 範例結果
```java=
/*** runAfterBoth() ***/
Test Start
Run 1
Run 2
Run 3
Finished future3.
Test End
/*** thenAcceptBoth() ***/
Test Start
300
Test End
/*** thenCombine() ***/
Test Start
300
Test End
```
兩者其中一個完成方法 (未結束的一方會在主程序結束前盡力完成任務):
| 方法名稱 | 說明 |
| ---- | ---- |
| **runAfterEither** (CompletionStage, Runnable runnable) | 整合兩個無回傳值的 CompletableFuture 類。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全|
| **acceptEither** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,但不能再回傳結果。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全|
| **applyToEither** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,須再回傳結果。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全|
#### 範例
```java=
@DisplayName("Test CompletableFuture: runAfterEither()")
@Test
public void testCase8() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Run 1");
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Run 2");
});
CompletableFuture<Void> future3 = future1.runAfterEither(future2, ()->{
System.out.println("Run 3");
}).whenComplete((Void, throwable)->{
/*** if the deadline bigger than the rest Future, it can finish its task. ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished future3.");
});
future3.get();
}
@DisplayName("Test CompletableFuture: acceptEither()")
@Test
public void testCase9() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
CompletableFuture<Void> future3 = future1.acceptEither(future2, (result)->{
/*** if the deadline bigger than the rest Future, it can finish its task. ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(result);
});
future3.get();
}
@DisplayName("Test CompletableFuture: applyToEither()")
@Test
public void testCase10() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
/*** the fastest would be the output ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
CompletableFuture<Integer> future3 = future1.applyToEither(future2, (result)->{
return result;
});
Integer getInt = future3.get();
System.out.println(getInt);
/*** if the deadline bigger than the rest Future, it can finish its task. ***/
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
#### 結果
```java=
/*** runAfterEither() ***/
Test Start
Run 1
Run 3
Run 2
Finished future3.
Test End
/*** acceptEither() ***/
Test Start
100
Test End
/*** applyToEither() ***/
Test Start
100
Test End
```
## 【結語】
執行緒類的學習,將在這篇文章後告個段落,算是為以前沒有好好理解這東西添上一個學習的里程碑了!
接下來的內容應該會優先去補一些以前沒注意的細節為主。都是運用在開發上後開始遇到的部分,例如說 Java 程式碼的優化、Clean Code、最早的 Docker 和 SpringBoot JPA 都有可以補充的地方,甚至是作專案的一些工具、心態、方式等等。
請大家繼續指教了!
首頁 [Kai 個人技術 Hackmd](/2G-RoB0QTrKzkftH2uLueA)
###### tags: `Spring Boot`