# 牛年自強計畫 Week 6 - SpringBoot - 番外篇 Java 執行緒 CompletableFuture ## 【前言】 要來講 Java 8 最後一個關於 Future 更動 -- CompletableFuture 啦! 被譽為 Java 8 三神器之一的 CompletableFuture 從出現就為執行緒的應用帶來諸多改變。 下面就為大家介紹介紹~ > Java 8 三神器: Stream API, Optional API, CompletableFuture. ## 【CompletableFuture】 做為 Future 的實作,自然保有了 Future 所有的特性,並提供了四種可快速執行非同步動作的方法 | 方法名稱 | 說明 | | ---- | ---- | | **runAsync** (Runnable runnable) | 執行一個沒有回傳值的非同步動作,並在預設的 Thread Pool 中執行,預設為 ForkJoinPool.commonPool()| | **runAsync** (Runnable runnable, Executor executor) | 執行一個沒有回傳值的非同步動作,並在指定的 Thread Pool 中執行| | **supplyAsync** (Supplier supplier) | 執行一個有回傳值的非同步動作,並在預設的 Thread Pool 中執行,預設為 ForkJoinPool.commonPool()| | **supplyAsync** (Supplier supplier, Executor executor) | 執行一個有回傳值的非同步動作,並在指定的 Thread Pool 中執行| 同時也實作了 **CompletionStage** 這個介面,讓其有了類似於 JS Promise 的功能、整合連續流程架構,以及多分支合併結果的方法等。 Kai 會用下面的名詞去介紹這三種特性: - **Listenable**: 類 JS Promise 功能 (CallBack) - **Composible**: 連續流程 - **Combinable**: 多分支 ### Listenable 有在寫 JS 的開發人員相信對 CallBack 指令不陌生,這是針對非同步處理有結果的時候,進行結果處理的程序。 透過 CallBack 方式,開發人員不需要在意程式何時會取得結果,而只需要專注於取得結果後的業務邏輯實現。 以往的 Future 只有透過 get() 的方式作取得的結果,且程序會停在該點上,直到取得結果為止。 而 CompletableFuture 則新加入了幾種方法讓我們能夠作出類似 CallBack 一樣的效果。 只不過無法像 JS CallBack 可以完全非同步,CompleteFuture 仍會停在 get() 方法那,差別在於當完成後,程式會自動執行 **whenComplete()** 或 **handle()** 的部分。 | 方法名稱 | 說明 | | ---- | ---- | | **whenComplete** ((result, throwable) -> {}) | 當非同步處理完成後,將結果或拋出帶到 CallBack function 中| | **handle** ((result, throwable) -> { return result;}) | 當非同步處理完成後,將結果或拋出帶到 CallBack function 中,最後回傳結果| #### 範例 ```java= @DisplayName("Test CompletableFuture: whenComplete() ") @Test public void testCase1() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello World!"; }).whenComplete((result, throwable)->{ System.out.println(result); test(); }); future.get(); } @DisplayName("Test CompletableFuture: handle() ") @Test public void testCase2() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello World!"; }).handle((result, throwable)->{ test(); return result; }); String getString = future.get(); System.out.println(getString); } public void test(){ System.out.println("Finish!!"); } ``` #### 範例結果 ```java= /*** whenComplete() ***/ Test Start Hello World! Finish!! Test End /*** handle() ***/ Test Start Finish!! Hello World! Test End ``` ### Composible 有的時候,我們會需要在一件非同步處理完成後接續執行下一步的動作,且這個動作跟主流程無關,為一串非同步的連續流程。 若在前頭透過 whenComplete() 的方法中加入第二步的處理,那樣的結構看起來還尚可閱讀,但如果需要第三層? 第四層? 那便會讓整段程式碼不堪入目的複雜。 因此針對這種連續流程,CompletableFuture 提供符合使用情境的方法如下: | 方法名稱 | 說明 | | ---- | ---- | | **thenRun** (Runnable runnable) | 接續執行新的非同步任務,無回傳值,預設使用上一個 Runnable 的 Thread| | **thenRunAsync** (Runnable runnable) | 接續執行新的非同步任務,無回傳值,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況| | **thenRunAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,無回傳值,預設使用指定的 Thread Pool| | **thenApply** (Runnable runnable) | 接續執行新的非同步任務,有回傳值,預設使用上一個 Runnable 的 Thread| | **thenApplyAsync** (Runnable runnable) | 接續執行新的非同步任務,有回傳值,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況| | **thenApplyAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,有回傳值,預設使用指定的 Thread Pool| | **thenAccept** (Runnable runnable) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果| | **thenAcceptAsync** (Runnable runnable) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果。預設使用 ForkJoinPool.commonPool() | | **thenAcceptAsync** (Runnable runnable, Executor executor) | 接續動作,透過 Consumer function 去接回傳值,並且不再傳出任何結果。使用指定的 Thread Pool| | **thenCompose** (Runnable runnable) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用上一個 Runnable 的 Thread| | **thenComposeAsync** (Runnable runnable) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用 ForkJoinPool.commonPool(),也就是說可能發生排隊的狀況| | **thenComposeAsync** (Runnable runnable, Executor executor) | 接續執行新的非同步任務,回傳一個 CompletableFuture,預設使用指定的 Thread Pool| 簡單來說: - **Run** 和 **Accept** 無回傳值;**Apply** 有回傳值 - **Apply** 回傳**Object**;**Compose** 回傳 **CompletableFuture** - **thenXXX ()** 和 **thenXXXAsync (..., Eexcutor)** 的執行狀況會比 **thenXXXAsync ()** 穩定 #### 範例 ```java= @DisplayName("Test CompletableFuture: thenRun() and thenApply() ") @Test public void testCase3() throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.runAsync(() -> { try { sleep(1000); System.out.println("Stop at First Runnable"); } catch (InterruptedException e) { e.printStackTrace(); } }).thenRun(()->{ try { sleep(1000); System.out.println("Stop at Second Runnable"); } catch (InterruptedException e) { e.printStackTrace(); } }).thenApply((result)->{ return "Hello World!"; }).whenComplete((result, throwable)->{ System.out.println(result); test(); }); future.get(); } @DisplayName("Test CompletableFuture: thenAccept() and thenCompose() ") @Test public void testCase4() throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "Hello"; }).thenCompose(result -> { return CompletableFuture.supplyAsync(() -> {return result+" World!";}); }).thenAccept(result -> { System.out.println("Got Result:" +result); }); future.get(); } ``` #### 範例結果 ```java= /*** thenRun() and thenApply() ***/ Test Start Stop at First Runnable Stop at Second Runnable Hello World! Finish!! Test End /*** thenAccept() and thenCompose() ***/ Test Start Got Result:Hello World! Test End ``` ### Combinable 非同步的任務,有時候並不會只開出一條分支出去,甚至一條分支出去後還可能岔出更多分支出來,這種類似遞迴的情況常出現在爬蟲等操作上。 CompletableFuture 提供對於兩個 CompletableFuture 類的整合處理,針對兩個以上的複數時,則再透過兩兩配對的方式逐一縮合為一。 方法又分成兩者皆須完成以及,兩者其中之一完成即可 (未結束的一方仍會完成任務)。 兩者皆完成方法: | 方法名稱 | 說明 | | ---- | ---- | | **runAfterBoth** (CompletionStage, Runnable runnable) | 整合兩個無回傳值的 CompletableFuture 類| | **thenAcceptBoth** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,但不能再回傳結果| | **thenCombine** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,須再回傳結果| #### 範例 ```java= @DisplayName("Test CompletableFuture: runAfterBoth()") @Test public void testCase5() throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("Run 1"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { System.out.println("Run 2"); }); CompletableFuture<Void> future3 = future1.runAfterBoth(future2, ()->{ System.out.println("Run 3"); }).whenComplete((Void, throwable)->{ System.out.println("Finished future3."); }); future3.get(); } @DisplayName("Test CompletableFuture: thenAcceptBoth()") @Test public void testCase6() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { return 100; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { return 200; }); CompletableFuture<Void> future3 = future1.thenAcceptBoth(future2, (result1, result2)->{ System.out.println(result1 + result2); }); future3.get(); } @DisplayName("Test CompletableFuture: thenCombine()") @Test public void testCase7() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { return 100; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { return 200; }); CompletableFuture<Integer> future3 = future1.thenCombine(future2, (result1, result2)->{ return result1 + result2; }); Integer getInt = future3.get(); System.out.println(getInt); } ``` #### 範例結果 ```java= /*** runAfterBoth() ***/ Test Start Run 1 Run 2 Run 3 Finished future3. Test End /*** thenAcceptBoth() ***/ Test Start 300 Test End /*** thenCombine() ***/ Test Start 300 Test End ``` 兩者其中一個完成方法 (未結束的一方會在主程序結束前盡力完成任務): | 方法名稱 | 說明 | | ---- | ---- | | **runAfterEither** (CompletionStage, Runnable runnable) | 整合兩個無回傳值的 CompletableFuture 類。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全| | **acceptEither** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,但不能再回傳結果。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全| | **applyToEither** (CompletionStage, Runnable runnable) | 整合兩個有回傳值的 CompletableFuture 類,須再回傳結果。先結束的一方會讓流程提早進入 runAfterEither(),但未結束的一方在時限內,有機會可以執行完全| #### 範例 ```java= @DisplayName("Test CompletableFuture: runAfterEither()") @Test public void testCase8() throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { /*** the fastest would be the output ***/ try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Run 1"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { /*** the fastest would be the output ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Run 2"); }); CompletableFuture<Void> future3 = future1.runAfterEither(future2, ()->{ System.out.println("Run 3"); }).whenComplete((Void, throwable)->{ /*** if the deadline bigger than the rest Future, it can finish its task. ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Finished future3."); }); future3.get(); } @DisplayName("Test CompletableFuture: acceptEither()") @Test public void testCase9() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { /*** the fastest would be the output ***/ try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { /*** the fastest would be the output ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 200; }); CompletableFuture<Void> future3 = future1.acceptEither(future2, (result)->{ /*** if the deadline bigger than the rest Future, it can finish its task. ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(result); }); future3.get(); } @DisplayName("Test CompletableFuture: applyToEither()") @Test public void testCase10() throws ExecutionException, InterruptedException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { /*** the fastest would be the output ***/ try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { /*** the fastest would be the output ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 200; }); CompletableFuture<Integer> future3 = future1.applyToEither(future2, (result)->{ return result; }); Integer getInt = future3.get(); System.out.println(getInt); /*** if the deadline bigger than the rest Future, it can finish its task. ***/ try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } ``` #### 結果 ```java= /*** runAfterEither() ***/ Test Start Run 1 Run 3 Run 2 Finished future3. Test End /*** acceptEither() ***/ Test Start 100 Test End /*** applyToEither() ***/ Test Start 100 Test End ``` ## 【結語】 執行緒類的學習,將在這篇文章後告個段落,算是為以前沒有好好理解這東西添上一個學習的里程碑了! 接下來的內容應該會優先去補一些以前沒注意的細節為主。都是運用在開發上後開始遇到的部分,例如說 Java 程式碼的優化、Clean Code、最早的 Docker 和 SpringBoot JPA 都有可以補充的地方,甚至是作專案的一些工具、心態、方式等等。 請大家繼續指教了! 首頁 [Kai 個人技術 Hackmd](/2G-RoB0QTrKzkftH2uLueA) ###### tags: `Spring Boot`