--- title: TOOLS- kettle tags: JAVA description: LEARNING KETTLE. --- # [THREAD] JAVA執行緒 :::info :bulb: Before the 1:1 meeting, ask the manager and employee to fill in the answer. You can design questions according to your needs. ::: ## :white_check_mark: 何謂執行緒 ### 1. 前言介紹 - program 程式: code的集合,尚未load進記憶體,可以被執行多次(有多個process),靜態的程式碼 - process 程序: 是被電腦執行的程式實體(instance),包含有code、系統資源(像是memory, file, i/o devices),變成可被執行狀態,例如我們點開應用程式的行為,就是在將program化為process,包含有memory space跟thread(s),可以藉由inter-process communication(IPC) mechanisms相互溝通。 - thread 執行緒: 程序的次單位,每個thread有自己的program counter, stack及Local變數,CPU調度級分派的基本單位,包含在程序當中,共享程序當中的資源及memory space,各負責一個功能,可以在單一的執行緒當中,執行多個tasks,也就是說執行緒是一連串的指示,在同一個程式當中,各個執行緒能獨立並行執行tasks. thread溝通方式,可以透過shared memory, synchronization機制(LOCK)或SEMAPHORES(旗號)<!--特定數值的號誌,根據情境做加減,當數值為0時為unsignaled 大於0時為signaled,依情況管理執行緒對資源的操作,不過無法保證進入順序-->或condition變數 ## :heavy_check_mark: 單執行緒與多執行緒 <!--需包含比較--> ### :flags: 單執行緒 程式在同一個時間只會執行一個thread,執行需按照特定順序,前面完成後面才能執行,也就是task不能獨立執行,所以像是遇到blocking的時候,就會延宕進入等待,直到完成後才會恢復執行,表示有synchronous操作。 - 適用於簡單的tasks,不需要負責的計算或平行處理(同時間做多件事),也不需要在意響應時間,事件之間有相依關係,常用於點對點服務,通常效率較多執行緒低一點,如:計算機輸入計算輸出 - 優勢為容易撰寫及debugging、較低的開支(overhead)、系統穩定、擴展性強 ### :flags: 多執行緒 程式在同一個時間,執行多個執行緒,多個執行緒之間彼此獨立,相互不影響,能同步執行,有異步asynchronous操作 - 適用於較複雜的tasks,需要平行處理(後台有些處理需求)或大量的計算操作,需求高併發(concurrency),如:網頁瀏覽器、遊戲(怪物、背景、音樂)、DB、 - 優勢為提升響應性(responsiveness),優化圖形介面,增加使用者體驗、較好的效能,改善CPU使用效率、優先秩序安排(prioritization)、改善結構,將複雜邏輯分離開,易於理解與修改 - 多執行緒後同步: 等待所有thread都完成程式再執行下一個step,這些結果與下個步驟有關,例如:有一個計算需統整來自不同項目的資料,才能產生最終的結果(output) - 多執行緒後異步: 不需等待所有thread完成,即可執行下一個步驟,這些thread的結果與下個步驟無關,不牽涉到同步或是協調的問題,例如一些背景程式(logging or monitoring),在此種情況下要注意各thread是獨立的,沒有互動性 ## :heavy_check_mark: Thread生命週期 1. new: 被建立出來,但尚未被start() 2. ready: 執行start()進入排程器等待,而且可以被run(),還沒被分配到cpu資源(runnable) 3. running: 執行run(),執行狀態,被調度、分配到資源(runnable) 4. blocked: 阻塞狀態,需等待一段不確定的硬體輸入、輸出的 I/O 時間,像是等待monitor lock釋放以繼續執行(non-runnable) 5. wating: 等待特定情況完成以繼續執行(non-runnable),執行 wait() 方法即移出執行狀態,透過被 notify()、notifyAll() 方法呼叫,回到預備狀態 6. timed waiting: 跟waiting相同,不過是在特定的時間限制內(non-runnable),sleep(long milliseconds) 7. terminated: 完成執行,可能是完成執行項目(從run()中解脫)或是提早終止(產生exception或手動終止) :::warning :warning: 使用多執行緒時,要注意**Race condition**、**Deadlock**及其他併行問題、沒有exception只能try catch ::: ### Deadlock死結: 程式卡住無動作,不會拋異常或提示 1. Mutual exclusion互斥,資源每次只能給一個process使用 2. Hold and Wait: 取得並等待另一個資源 3. No preemption: 資源只能由自己釋放,不得由其他人奪取 4. Circular wait: 每個process握有另一個process請求的資源,你我,我等他,他等你 -> 資源分配,由系統安排只會是safe state的request -> thread synchronization同步鎖機制(隱式): 當多個thread使用同一物件時,限定同一時間只能有一個人取用,A使用時,加上鎖讓別人不能使用,任務完成後釋放,讓別人使用 - 同步方法的鎖: 靜態方法(CLASS.class)、非靜態方法(this) - 同步區塊,自己指定,同常也是用this或CLASS.class ※注意多個執行緒都用同一把鎖,否則無法確定資源安全;靜態方法共用同一把鎖,非靜態方法用同一把鎖(?) :star: **處理方法** 1. 明確找出多執行緒的code 2. 確定有無共享資源 3. 確定是否有多個邏輯操作共享數據 > 解: 也就是上述上鎖,並讓操作的邏輯都是在同步範圍內(範圍過大則降低多執行緒優勢,範圍過小則不夠安全) ## :heavy_check_mark: Spring boot 實作 進行多執行緒專案,為了有效果管理執行緒,選擇使用執行緒池,配置java.util.concurrent.TaskExecutor或其子類的bean,減少執行緒的創建和銷毀次數,提高系統的性能和效率。 - annotation:@EnableAsync(開啟非同步操作)、@Async(管理的對象)<!--執行新的thread--> - 獲取返回值: CompletableFuture (強化版future),可以將多個非同步任務進行複雜組合 1. 執行緒池配置(也可以放在properties) ```java @Configuration @EnableAsync public class SpringAsyncConfig { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); // 初始化的執行緒數量 executor.setMaxPoolSize(2); // 最大的執行緒數, 緩衝滿了之後會再新增 executor.setQueueCapacity(100); // 緩衝佇列 executor.setKeepAliveSeconds(60); // 超出核心數量的執行緒,閒置多久後銷毀 executor.setThreadNamePrefix("async-thread-"); // 執行緒前綴 executor.initialize(); return executor; } ``` 2. 註解多執行緒操作: public method only、不可非同步調用類內部的方法,返回值只能是void或future的子類,需注意此非spring的 bean ```java @Async("executor") public class taskOne { String task1secAsync = "task 1sec async"; Thread.sleep(1000L); System.out.println("getInsurance: " + Thread.currentThread().getName()); return CompletableFuture.completedFuture(task1secAsync); } @Async("executor") public class taskTwo { String task1secAsync = "task 1sec async"; Thread.sleep(3000L); System.out.println("getInsurance: " + Thread.currentThread().getName()); return CompletableFuture.completedFuture(task1secAsync); } ``` Future: 非同步操作,交付給thread pool執行後需要取回執行結果 -> 應對更多元的情境,future不足以使用 - 合併非同步處理的結果,運算獨立,但同時結果具有一定相依性 - 等待所有及和的任務完成 - 只等待最快的任務完成 - 完成任務後,依結果進行下一步操作,不會單純阻塞等待 **CompletableFuture** (java8新增): 實現future及CompletionStage interface,擁有callback功能,相較future能主動設置結果,在block的情況下能主動終止等待(而不是等待或直到超時),在某個行為完成後可以繼續進行下個行為,內部封裝thread pool,可以將請求或處理過程做異步操作,常用的方法: - runAsync: 異步操作,無返回值 - supplyAsync: 異步操作,有返回值 - anyOf: 任意一個完成即可進行下一步 - allOf: 全部完成才可以進行下一步 --- - join: 完成後回傳結果,只會抛出unchecked異常 - get: 完成後回傳結果,可以增加超時時間,get會拋出具體的異常 - getNow: 如果完成或產生異常,則返回;否則回傳valueIfAbsent的值 - isCancelled - isCompletedExceptionally - isDone --- - complete - completeExceptionally - cancel ## REFERENCE - [Java和Springboot中的多线程](http://bingerambo.com/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/#threadpoolexecutor%E6%A6%82%E8%BF%B0) - [模組21-22 執行緒與多執行緒設計(Thread、Multithreading、Runnable、Life Cycle of Thread)](/OisZ0l--R9Ofc4p1rrnR4Q) - [Java實現多執行緒的四種方式](https://tw511.com/a/01/45663.html) - [單執行緒](https://www.jendow.com.tw/wiki/%E5%96%AE%E5%9F%B7%E8%A1%8C%E7%B7%92) - [Java提高班(一)Thread详解](https://www.cnblogs.com/vipstone/p/9762360.html) - [【恐龍】理解 Process & Thread](https://medium.com/erens-tech-book/%E7%90%86%E8%A7%A3-process-thread-94a40721b492) - [Multi-Threaded Application vs. Single Threaded Application](https://dzone.com/articles/multi-threaded-application-vs) - [Single-thread or multithread operations](https://www.ibm.com/docs/en/zos/2.4.0?topic=program-single-thread-multithread-operations) - [Java 的多執行緒,由基礎開始認識 Threading](https://litotom.com/java-threading-basic/) - [牛年自強計畫 Week 1 - SpringBoot - 番外篇 Java 執行緒 Thread/Runnable/Callable](https://hackmd.io/@KaiChen/B1J-x9Akd) - [三、Queue 的應用(3) - Semaphore、Mutex的同步](https://ithelp.ithome.com.tw/articles/10219642) - [尚硅谷_Java零基础教程(多线程)-- 学习笔记](https://blog.csdn.net/JMW1407/article/details/119869538) - [Class Thread](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Thread.html) - [Java實現多執行緒的四種方式](https://tw511.com/a/01/45663.html) - [ExecutorService And Thread Pool](https://jamesqi.medium.com/executorservice-and-thread-pool-4612359b74ae) - [JAVA多執行緒的基本知識](https://popcornylu.gitbooks.io/java_multithread/content/) - [一文秒懂 Java ExecutorService](https://www.twle.cn/c/yufei/javatm/javatm-basic-executorservice.html) - [Java 四種線程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor](https://www-cnblogs-com.translate.goog/zhujiabin/p/5404771.html?_x_tr_sl=zh-CN&_x_tr_tl=zh-TW&_x_tr_hl=zh-TW&_x_tr_pto=sc) - [CompletableFuture使用大全,简单易懂](https://juejin.cn/post/6844904195162636295) - [奇淫巧技,CompletableFuture 异步多线程是真的优雅](https://zhuanlan.zhihu.com/p/554266793) - [ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)](https://segmentfault.com/a/1190000039267451) --- ## 其他 - 單核CPU: 假的多執行緒,只是因為切換很快,所以像是單執行緒 - 多核CPU: 真的多執行緒 - 並行: 同時多個CPU多個任務 - 併發: 一個CPU多個任務,像是多個人做同一件事 ### **Table: process vs thread** | | 程序 process | 執行緒 thread | | ------------ | ----------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------ | | 維度 | 裝載執行緒的容器 | 在程序內部 | | 資源與記憶體 | 不共享 | 同程序內的執行緒們共享 | | 優勢 | 相互獨立, 所以安全性較高 | 同步執行task的效率較高 | | 適用場景 | 高隔離/安全性,低容錯的情境,如: 伺服器app或作業系統,能透過在不同電腦或cluster執行改善擴展性及可靠性 | 多核系統併發程式,可同時在不同程序執行,也會使用異步程式,分別在不同執行緒同步運算及IO操作 | ## :heavy_check_mark: JAVA建立thread的方法: 繼承或實現介面 **執行緒種類:** - 背景(daemon thread): garbage collector - 主執行緒(main thread): 執行main方法的程序 - 一般的執行緒thread,用戶的執行緒與背景執行緒(服務用戶執行緒)相對應 在Java啟動時,擁有一個main執行緒(main執行時所產生),若要使用多執行緒,可以使用`java.lang.Thread`,向OS取得額外的執行緒,提供建立啟動終止暫停恢復等方法,chatGPT說有兩種建立方法(繼承THREAD或實現RUNNABLE介面) **==1. 繼承Thread 類別==:** 受限於單一繼承特性 1. 需要建立一個類別 extends Thread 類別 2. override 它的 run() 方法,來定義THREAD要執行的TASKS 3. 此 Thread 的衍生類別產生物件實體後,以 start() 啟動執行緒,調用run()方法。 **==2. Runnable 介面==:** 沒有返回結果 1. 需要建立一個類別 implements Runnable interface,並實作它的方法 run() 2. 此實作 Runnable 的類別產生物件實體後,作為參數傳入 Thread 建構子產生物件 3. 以 start() 調用Runnable介面的實作過的 run() 方法。 :::info :information_source: 在jdk5.0以前所使用的以上兩種方法,都需要重寫run(),一個是在thread內的run(),一個則是在介面子類的run(),定義要執行的邏輯在內,一個是在thread內,優先會選擇runnable的這種方式,消弭單繼承的侷限性,多個執行緒可以共享同介面的對象;jdk8後有lambda方法使用 ::: **==3. 實現Callable介面==:** 有返回結果的多執行緒(有返回值),可以拋出異常,支持泛型的返回值,可以看成RUNNABLE的延伸 1. 需要建立一個類別 implements Callable interface,並實作它的方法 run() 2. 此實作 Runnable 的類別產生物件實體後,作為參數傳入FutureTask(future interface的唯一實現類)來產生物件 3. 由FutureTask包裝(以獲取處理結果),作為參數傳入 Thread 建構子產生物件 4. 以 start()調用Callable的call()方法。 **==4. Thread pool執行緒池(executorService、executors)==:** 分離創建及執行角色,提前建立幾個thread放在pool,負責執行thread,能取用並重複使用thread,一般情況下建議手動調用shutdown()關閉 ### 基本款thread pool 1. 以ExecutorService(繼承Executors)建立pool:利用Executors的工廠方法創建ExecutorService,預配置幾種thread pool可以直接使用,newCacheThreadPool(靈活回收資源再使用或建立執行緒)、newFixThreadPool(固定數量的執行緒,超出的會在queue中等待)、newSingleThreadExecutor(只有一個執行緒,確保是FIFO或LIFO的順序)、newScheduledThreadPool(按照排程定期或週期性執行)、newSingleThreadScheduledExecutor(單一執行緒按照排程執行) 3. 遞交Runnable或實現Callable介面子類執行,常用方法: - Future<> submit(): 執行,回傳future(runnable或callable) - void execute():執行,無返回值 (來自executor介面) - invokeAny(): 執行一組task,返回任一成功結果 - invokeAll(): 執行一組task,返回全部全部結果 - shutdown(): 停止submit新任務,不會中斷現在等待或執行中的任務,對於定期任務而言會因失去 interrupted 狀態而繼續執行造成錯誤,需要針對任務本身的 ScheduledFuture 作 cancel(true) 的處理 - shutdownNow(): 同上,不過會取消等待中的任務,並且嘗試中斷執行中的任務 - isShutdown() - isTerminating(): 判斷是否為正在超時等待狀況 - isTerminated(): 判斷是否結束超時等待狀況 ### ForkJoin Pool 每個thread擁有自己的雙向queue(相對基本款的Pool是大家共用一個queue),適用於task非獨立,交由系統去執行(切分並合併),在task不可分割並且處理時間差異很大時不適用。 - [ ] Future (jdk5 接收Asynchronous Result的一個interface),可以選擇阻塞執行緒,以get()結果,或先別動 - [ ] CompletableFuture - supplyAsync(): 執行task,有返回值 - runAsync(): 執行task,無返回值 - get(): 獲取結果,會拋異常ExecutionException - join(): 獲取結果,方法不會拋異常,執行結果拋CompletionException - thenRun/thenRunAsync: 不依賴上個task結果,無參數及返回值 - thenAccept/thenAcceptAsync: 依賴上個task結果,有傳參數及但沒有返回值 - thenApply/thenApplyAsync: 依賴上個task結果,有傳參數且有返回值 構造方法的參數 | 名稱 | 類型 | 意義 | | --------------- | ------------------------ | ------------------ | | corePoolSize | int | 核心執行緒池大小 | | maximumPoolSize | int | 最大執行緒池大小 | | keepAliveTime | long | 執行緒最大空閒時間 | | unit | TimeUnit | 時間單位 | | workQueue | BlockingQueue | 等待佇列 | | threadFactory | ThreadFactory | 執行緒創建工廠 | | handler | RejectedExecutionHandler | 拒絕策略 | ```JAVA publicclass TestFixedThreadPool { publicstaticvoid main(String[] args) { // 創建一個可重用固定線程數的線程池 ExecutorService pool = Executors.newFixedThreadPool( 2 ); Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); //將線程放入池中進行執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); //關閉線程池 pool.shutdown(); } } ``` ## :heavy_check_mark: 常用的Thread方法 - `void start()`:啟動執行緒,執行 Thread 物件的 run 方法,重複使用start()會拋異常“IllegalThreadStateException” - `void run()`:如果直接調用,就會以一般非執行緒的方式執行,沒有啟動多執行緒,屬於Thread 物件的 run 方法,包含有要做的tasks,通常users會@override來自定義行為 - `static Thread currentThread()`: 取得目前thread物件,常用於main thread及runnable實現的class,將回傳該執行緒物件。 - `String getName()`: 取得執行緒的名稱 - `void setName(String name)`::給執行緒一個名字,多個不同執行緒時方便掌握 --- - `static void yield()`:使目前正在執行的執行緒讓出執行權(相同優先級有效),不會釋放鎖。 - `static void join() throws InterruptedException`:等待被呼叫的執行緒終止,呼叫的(正在執行的)執行緒才會繼續執行,,例如在a執行緒b.join(), 是a要等b完成 - `static void sleep(long milliseconds) throws InterruptedException`:使執行緒休眠至指定毫秒後。 - `boolean isAlive()`: 確認thread當下是執行或終止狀態 - `void setPriority(int newPriority)`:設定執行緒的優先權([範圍](https://hackmd.io/@czyue/SkrBq9FEO#setPriority)) - `int getPriority()`: 獲取優先級 - `setPriority(int newPriority)`:設定優先級,1~5(default)~10,優先權高的較有"可能"被盡早執行,相等時任選其一執行 - Thread.MIN_PRIORITY:相當於最小值 1。 - Thread.NORM_PRIORITY:相當於預設值 5。 - Thread.MAX_PRIORITY:相當於最大值 10。 調度方法 1. 同優先層級為先進先出,時間片策略 2. 高優先層級,搶佔式策略 ※ 低優先表示獲得調度的"機率"低,不是說一定會排在高優先級之後 --- - `void setDaemon(boolean on)`:在start()之前使用,可以將其設為背景執行緒(守護執行緒),例如gc執行緒。 - `boolean isDeamon()`:該執行緒是否為背景執行緒。 - `boolean isInterrupted()`: 確認thread是否被中斷(interrupted) --- - `void wait()`: 放棄所有,要等到notify()或notifyAll()才能重新執行 - `void notify()`: 喚醒queue當中優先層級最高者結束等待 - `void notifyAll()`: 喚醒所有排隊的人結束等待 > 只能在synchronized範圍下使用,否則會有ava.lang.IllegalMonitorStateException ### Deprecated method - void stop(): 強制終止執行緒,不建議使用,可能不太安全 - void suspend(): 不會釋放鎖,要盡量避免用suspend和resume()控制,可能導致deadlock - void resume(): 和suspend相搭配,可能導致deadlock ### 類似方法區別: sleep()和wait() <!--兩者都會讓thread暫停一會兒--> | | sleep | wait | |---|-----|-----| |方法| thread class的靜態方法 | object類實體的方法 | |使用| 可以在任意thread使用 | 只能在monitor有用此實體時使用| |條件| 沉睡特定時間| 等待特定condition為true| |鎖釋放|不釋放,所以其他thread此時無法獲取物件|釋放,別人可以獲取物件,存在改變等待狀態thread的等待議題| |恢復|沉睡情況可以被其他thread中斷或是其他方法喚醒|從等待狀態甦醒,表示某種情況為true| |恢復方法|interreput()|notify()/ notifyAll()| |調用次數限制|能多次使用|只能一次,如果要在不同條件都等待,那就要使用Loop在不同的情境下使用| ## Java example :beginner:**實作創建執行緒** ```java= public class ThreadDemo { public static void main(String[] args) throws Exception { System.out.println("Current thread: " + Thread.currentThread().getName()); // 1. Extends Thread MyThreadExtends myThreadExtends = new MyThreadExtends(); myThreadExtends.start(); // 2. Implements Runnable MyThreadRunnable myThreadRunnable = new MyThreadRunnable(); Thread threadRunnable = new Thread(myThreadRunnable); threadRunnable.start(); // 3. Implements Runnable Callable MyThreadCallable myThreadCallable = new MyThreadCallable(); FutureTask<String> future = new FutureTask<>(myThreadCallable); Thread threadCallable = new Thread(future); threadCallable.start(); System.out.println(future.get()); // 4. Thread pool: Executors and ExecutorService ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new MyThreadRunnable()); executor.submit(new MyThreadRunnable()); executor.submit(new MyThreadCallable()); executor.submit(new MyThreadCallable()); executor.submit(new MyThreadCallable()); } // 4. Thread pool: forkjoin pool ForkJoinPool forkJoinPool = new ForkJoinPool(); Integer result = forkJoinPool.invoke(new Fibonacci(10)); System.out.println("Fibonacci 的结果是: " + result); forkJoinPool.shutdown(); } } class MyThreadExtends extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": Run method executed by extending Thread"); } } class MyThreadRunnable implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + ": Run method executed by implementing Runnable"); } } class MyThreadCallable implements Callable<String> { String message = "Callable interface return value"; public String call() throws Exception { System.out.println(Thread.currentThread().getName() + ": Run method executed by implementing Callable"); return message; } } class Fibonacci extends RecursiveTask<Integer> { /** * */ private static final long serialVersionUID = 1L; final int n; Fibonacci(int n) { this.n = n; } @Override public Integer compute() { if (n <= 1) { // 任務小到不需要拆分, 直接計算獲得結果 return n; } // 進行fork調用子任務計算,join合併計算結果 System.out.println(Thread.currentThread().getName()); Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } } ``` :beginner:**TODO: 實作鎖定synchronized、lock** jdk5.0顯式定義同步鎖,不同於synchronized,Lock只有程式碼區塊鎖,在訪問共享資源前,要先有Lock instance ```java class A { private final ReentrantLock lock = new ReenTrantLock(); public void m(){ lock.lock(); try{ } finally { lock.unlock(); } } } ``` ### 怎麼判斷需求的thread數量? - CPU intensive tasks: 最多為CPU核數量 - Network or I/O intensive tasks: 可以多於CPU核的數量,因為cores不是限制因素