# 牛年自強計畫 Week 2 - SpringBoot - 番外篇 Java 執行緒 Thread Pool ## 【前言】 前面一篇介紹過了 Thread 後,在使用上幫助了工程師解決需要同時處理的事情,但卻衍生一個問題,那就是開出了大量的 Thread 狀況下會不易管理進度,更甚會有漏掉尚在執行中的 Thread 便將程式結束的窘況。 又另一種情境,當有重複性的、大量的工作,需要開 Thread 去處理,但總共要開多少個? 假如某一段時間內湧入了 1000 份工作,要開出 1000 個 Thread 應對? 一個工作就開一新的 Thread 的話容易造成系統過度負擔而降低了處理效能,最常見的就是 Out Of Memory 問題啦~ 是否有一個東西;一個物件;一個類,可以幫助我們更好、更方便的、更快速地管理這些開出來的 Thread 且避免掉上述兩種情境呢? 這一篇將要介紹到的 **Thread Pool** 就要來幫助工程師解決這些困境。 ## 【Thread Pool 介紹】 **Thread Pool** 是一個讓工程師們可以集合 Threads,使其有序地、有限制地逐一處理,並提供許多方法讓工程師得以確認、檢視、甚至定時。 類似 DB Connection Pool,**Thread Pool** 同樣提供設定最大執行量的管理,所有提交進來的 Thread 會被妥善分配在 Queue 當中,並等到有空位的時候執行,結束後的空位會留給排列 Queue 的下一個 Thread 使用。 引用 Baeldung 的文章 ***Introduction to Thread Pools in Java*** 的圖講解  圖片來源: https://www.baeldung.com/thread-pool-java-and-guava ExecutorService 中會有專門集合工作的 Queue,並依照最大上限的 Thread 數量去執行依序的工作,並蒐集這些結果,開發師可以透過呼叫方法,一次性 or 需求性的取得這些結果的回傳集合,接續後續的處理動作。 而這 ExecutorService 又是什麼呢? ## 【Executor 類家族介紹】 Java Executor 類能夠幫助開發者將 Task 和 Thread 做分離,達到任務歸任務類、執行歸執行類的方法進行解耦,下面是 Kai 做的關係圖,帶大家快速認識這幾個類的繼承、實作邏輯~  **【Executor】** 是 Java 用來實作 Thread Pool 的 Interface,在這 Interface 中訂定了 execute() 最基本的執行方法,剛方法。 **【ExecutorService】** 繼承了 **Executor**,並訂定了基本完整的 Thread Pool 操作方法,包含執行、停止、取得結果等,為後續衍生的 **Executor 類**奠定了運行處理 Thread Pool 的基礎。 方法使用說明如下: |方法|使用說明| | ---- | ---- | | **submit** | 送入 Runnable or Callable 類並執行,不論有無回傳值,都會在完成後回傳一個 Future 類。代入參數部分可用 Lambda 方式處理為 ()->Function(Object) 形式,將 "把類別處理為 Runnable" 的流程交給 Lambda 糖果包解決| | **invokeAll** | 一次執行所有的 Task,回傳所有接收的 Future 類 | | **invokeAny** | 一次執行所有的 Task,回傳最快接收的 Future 類 | | **shutdown** | 阻止 submit 新的任務,不會中斷等待中的任務以及執行中的任務;對於定期任務而言會因失去 interrupted 狀態而繼續執行造成錯誤,需要針對任務本身的 **ScheduledFuture** 作 **cancel(true)** 的處理 | | **shutdownNow** | 阻止 submit 新的任務、取消等待中的任務、嘗試中斷執行中的任務;對於定期任務而言會因失去 interrupted 狀態而繼續執行造成錯誤,需要針對任務本身的 **ScheduledFuture** 作 **cancel(true)** 的處理 | | **isShutdown** | 判斷是否為 shutdown 狀況 | | **isTerminating** | 判斷是否為正在超時等待狀況 | | **isTerminated** | 判斷是否結束超時等待狀況 | | **awaitTermination** | 設定等待機制,將會偵測是否仍有執行中的 Task,若**有**則啟動超時等待且返回 **false**;若**無**則會接續執行後續程式且返回 **true**。通常編寫在執行 **shutdown** 和 **shutdownNow** 之後,作為時間限制的最後一道防線使用 | > shutdown 和 shutdownNow 無法阻止定期任務持續運作的問題請洽這篇文章 [Java线程池的shutdownnow()方法为什么不能停止运行的任务?](https://blog.csdn.net/mucaoyx/article/details/109153721) **【AbstractExecutorService】** 實作了 submit() 方法,並在其中使用會回傳 **RunnableFuture** 的 newTaskFor() 方法,然後接續 execute() 進行執行的動作。其餘則有不少新的參數和方法進行定義,並交由 **ThreadPoolExecutor** 進行實現。 方法使用說明如下: |方法|使用說明| | ---- | ---- | | **newTaskFor** | 在 submit() 中執行包裝 Runnable or Callable 並回傳 RunnableFuture 類,交由 executor() 執行 | **【ThreadPoolExecutor】** 實作了 **Queue** 的執行流程,並額外提供了諸多用於確認 ThreadPool 目前狀態、參數的方法,是最完整、最常被使用的 **Executor 類**。 |方法|使用說明| | ---- | ---- | | **newFixedThreadPool** | 建構一個全新的 ThreadPool 並給予最大可同時執行 Thread 數量的限制 | | **getActivityCount** | 返回目前執行中的 Thread 數量,最大上限為 Thread 允許數量 | | **getPoolSize** | 返回設定的最大可同時執行 Thread 數量值 | | **getQueue** | 返回存放 Tasks 的 Queue 物件,可透過 getQueue().size() 取得等待值行的 Tasks 數量 | | **getCompletedTaskCount** | 返回已完成的 Tasks 數量 | **【ScheduledExecutorService】** 另外一種為了實現時間控制功能而衍生的 Executor 類,其實作與定義的方法與 AbstractExecutorService 有諸多不同,但因實務面上甚少使用,只需要了解即可。 方法使用說明如下: |方法|使用說明| | ---- | ---- | | **schedule** | 放入一個 work、時間參數、時間單位,Task 會在經過 時間參數 * 時間單位後,將 Task 放入空的 Thread 執行,若當下沒有空的 Thread,則進行等待,直到有了之後馬上被分配執行| **【ScheduledThreadPoolExecutor】** 實作了 **ScheduledExecutorService** 以及繼承 **ThreadPoolExecutor**,可當作處理執行時間和頻率的 **ThreadPoolExecutor** 強化版,但因功能面相的不同,因此不會在不需要強調時間控制的部分去使用這個類別。 方法使用說明如下: |方法|使用說明| | ---- | ---- | | **scheduleAtFixedRate** | 設定執行頻率。※ 若前一次 Task 尚未結束 | | **scheduleWithFixedDelay**| 設定每一次執行後,多久會再次執行。| ## 【Executors】 **Executors** 類,是**獨立**於上述所有類的存在,其本身提供許多建立 ThreadPool 的 Static 方法,幫助開發者可以簡單的去建立各式各樣的 ThreadPool 類實體。 方法使用說明如下: |方法|使用說明| | ---- | ---- | | **newFixedThreadPool** | 建立一個有最大上限的 Thread Pool,並回傳 | | **newCachedThreadPool**| 建立一個帶有緩存的 Thread Pool,並回傳| | **newSingleThreadExecutor** | 建立一個僅有單一實體的 Thread Pool,並回傳 | | **newScheduledThreadPool**| 建立一個可以安排時程處理的 Thread Pool,並回傳| Kai 馬上運用了 Executors 類在下面的範例中。 ## 【範例】 **build.gradle** ```xml= plugins { id 'org.springframework.boot' version '2.3.3.RELEASE' id 'io.spring.dependency-management' version '1.0.8.RELEASE' id 'java' } group 'org.example' version '1.0-SNAPSHOT' repositories { mavenCentral() } configurations.all { exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging' } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.junit.jupiter:junit-jupiter-api:5.6.0' implementation 'org.junit.jupiter:junit-jupiter-engine:5.6.0' implementation 'org.springframework.boot:spring-boot-starter-log4j2' } test { useJUnitPlatform() } ``` **log4j.properties** ```xml= appender.console.type = Console appender.console.name = STDOUT appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n rootLogger.level = DEBUG rootLogger.appenderRefs = stdout rootLogger.appenderRef.stdout.ref = STDOUT ``` **test.java** ```java= package Thread.ThreadPool; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class test { static Logger logger = LoggerFactory.getLogger(test.class); @DisplayName("Test ThreadPool : Test Case 1 : Normal Loop Check") @Test public void testCase1() { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(9000); return null; }); while(threadPoolExecutor.getActiveCount() > 0){ logger.debug("Pool Size: " + threadPoolExecutor.getPoolSize()); logger.debug("Activity Threads: " + threadPoolExecutor.getActiveCount()); logger.debug("Queue Size: " + threadPoolExecutor.getQueue().size()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } @DisplayName("Test ThreadPool : Test Case 2 : awaitTermination Check") @Test public void testCase2() { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(3000); return null; }); threadPoolExecutor.submit(() -> { Thread.sleep(9000); return null; }); /* ThreadPoolExecutor before using shutdown(), all false */ logger.debug("isTerminated: " + threadPoolExecutor.isTerminated()); logger.debug("isTerminating: " + threadPoolExecutor.isTerminating()); threadPoolExecutor.shutdown(); /* ThreadPoolExecutor after using shutdown(), * isShutdown() => true; isTerminated() => false; isTerminating() => false */ logger.debug("isShutdown: " + threadPoolExecutor.isShutdown()); logger.debug("isTerminated: " + threadPoolExecutor.isTerminated()); logger.debug("isTerminating: " + threadPoolExecutor.isTerminating()); try { /* awaitTermination encounter executing task, return false */ logger.debug("awaitTermination: "+threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); } /* ThreadPoolExecutor after using shutdown(), * isTerminated() => false; isTerminating() => true */ logger.debug("isTerminated: " + threadPoolExecutor.isTerminated()); logger.debug("isTerminating: " + threadPoolExecutor.isTerminating()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } try { /* all tasks finished, return true */ logger.debug("awaitTermination: "+threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); } /* ThreadPoolExecutor after using shutdown(), * isTerminated() => true; isTerminating() => false */ logger.debug("isTerminated: " + threadPoolExecutor.isTerminated()); logger.debug("isTerminating: " + threadPoolExecutor.isTerminating()); } @BeforeAll public static void BeforeAll(){ logger.debug("Test Start"); } @AfterAll public static void AfterAll(){ logger.debug("Test End"); } } ``` 範例中設計了 testCase1 和 testCase2 分別展示了使用一般方式處理判定 ThreadPool 執行的狀況以及透過 **awaitTermination** 方式的處理。 testCase2 則很明白的呈現了 **isShutdown()**、**isTerminated()**、**isTerminating()** 的使用差別。 > 別忘了,**shutdown()** 或 **shutdonwNow()** 幾乎都會搭配 **awaitTermination()** 一起使用。 而不論 **isShutdown()**、**isTerminated()**、**isTerminating()**,都無法用來作為判定 ThreadPool 是否已完成所有 Tasks 的參數,因此還是要使用到 **getActivityCount()**,通常沒有正在執行的任務時,就可以視為已完成所有任務,若任務因各種原因 Delay、暫緩、無回應則應該要有各被執行任務的一些 Feedback 去處理,而不是在 ThreadPool 這邊加功夫。 首頁 [Kai 個人技術 Hackmd](/2G-RoB0QTrKzkftH2uLueA) ###### tags: `Spring Boot`
×
Sign in
Email
Password
Forgot password
or
By clicking below, you agree to our
terms of service
.
Sign in via Facebook
Sign in via Twitter
Sign in via GitHub
Sign in via Dropbox
Sign in with Wallet
Wallet (
)
Connect another wallet
New to HackMD?
Sign up