---
title: TOOLS- kettle
tags: JAVA
description: LEARNING KETTLE.
---
# [THREAD] JAVA執行緒
:::info
:bulb: Before the 1:1 meeting, ask the manager and employee to fill in the answer. You can design questions according to your needs.
:::
## :white_check_mark: 何謂執行緒
### 1. 前言介紹
- program 程式: code的集合,尚未load進記憶體,可以被執行多次(有多個process),靜態的程式碼
- process 程序: 是被電腦執行的程式實體(instance),包含有code、系統資源(像是memory, file, i/o devices),變成可被執行狀態,例如我們點開應用程式的行為,就是在將program化為process,包含有memory space跟thread(s),可以藉由inter-process communication(IPC) mechanisms相互溝通。
- thread 執行緒: 程序的次單位,每個thread有自己的program counter, stack及Local變數,CPU調度級分派的基本單位,包含在程序當中,共享程序當中的資源及memory space,各負責一個功能,可以在單一的執行緒當中,執行多個tasks,也就是說執行緒是一連串的指示,在同一個程式當中,各個執行緒能獨立並行執行tasks. thread溝通方式,可以透過shared memory, synchronization機制(LOCK)或SEMAPHORES(旗號)<!--特定數值的號誌,根據情境做加減,當數值為0時為unsignaled 大於0時為signaled,依情況管理執行緒對資源的操作,不過無法保證進入順序-->或condition變數
## :heavy_check_mark: 單執行緒與多執行緒 <!--需包含比較-->
### :flags: 單執行緒
程式在同一個時間只會執行一個thread,執行需按照特定順序,前面完成後面才能執行,也就是task不能獨立執行,所以像是遇到blocking的時候,就會延宕進入等待,直到完成後才會恢復執行,表示有synchronous操作。
- 適用於簡單的tasks,不需要負責的計算或平行處理(同時間做多件事),也不需要在意響應時間,事件之間有相依關係,常用於點對點服務,通常效率較多執行緒低一點,如:計算機輸入計算輸出
- 優勢為容易撰寫及debugging、較低的開支(overhead)、系統穩定、擴展性強
### :flags: 多執行緒
程式在同一個時間,執行多個執行緒,多個執行緒之間彼此獨立,相互不影響,能同步執行,有異步asynchronous操作
- 適用於較複雜的tasks,需要平行處理(後台有些處理需求)或大量的計算操作,需求高併發(concurrency),如:網頁瀏覽器、遊戲(怪物、背景、音樂)、DB、
- 優勢為提升響應性(responsiveness),優化圖形介面,增加使用者體驗、較好的效能,改善CPU使用效率、優先秩序安排(prioritization)、改善結構,將複雜邏輯分離開,易於理解與修改
- 多執行緒後同步: 等待所有thread都完成程式再執行下一個step,這些結果與下個步驟有關,例如:有一個計算需統整來自不同項目的資料,才能產生最終的結果(output)
- 多執行緒後異步: 不需等待所有thread完成,即可執行下一個步驟,這些thread的結果與下個步驟無關,不牽涉到同步或是協調的問題,例如一些背景程式(logging or monitoring),在此種情況下要注意各thread是獨立的,沒有互動性
## :heavy_check_mark: Thread生命週期
1. new: 被建立出來,但尚未被start()
2. ready: 執行start()進入排程器等待,而且可以被run(),還沒被分配到cpu資源(runnable)
3. running: 執行run(),執行狀態,被調度、分配到資源(runnable)
4. blocked: 阻塞狀態,需等待一段不確定的硬體輸入、輸出的 I/O 時間,像是等待monitor lock釋放以繼續執行(non-runnable)
5. wating: 等待特定情況完成以繼續執行(non-runnable),執行 wait() 方法即移出執行狀態,透過被 notify()、notifyAll() 方法呼叫,回到預備狀態
6. timed waiting: 跟waiting相同,不過是在特定的時間限制內(non-runnable),sleep(long milliseconds)
7. terminated: 完成執行,可能是完成執行項目(從run()中解脫)或是提早終止(產生exception或手動終止)
:::warning
:warning: 使用多執行緒時,要注意**Race condition**、**Deadlock**及其他併行問題、沒有exception只能try catch
:::
### Deadlock死結: 程式卡住無動作,不會拋異常或提示
1. Mutual exclusion互斥,資源每次只能給一個process使用
2. Hold and Wait: 取得並等待另一個資源
3. No preemption: 資源只能由自己釋放,不得由其他人奪取
4. Circular wait: 每個process握有另一個process請求的資源,你我,我等他,他等你
-> 資源分配,由系統安排只會是safe state的request
-> thread synchronization同步鎖機制(隱式): 當多個thread使用同一物件時,限定同一時間只能有一個人取用,A使用時,加上鎖讓別人不能使用,任務完成後釋放,讓別人使用
- 同步方法的鎖: 靜態方法(CLASS.class)、非靜態方法(this)
- 同步區塊,自己指定,同常也是用this或CLASS.class
※注意多個執行緒都用同一把鎖,否則無法確定資源安全;靜態方法共用同一把鎖,非靜態方法用同一把鎖(?)
:star: **處理方法**
1. 明確找出多執行緒的code
2. 確定有無共享資源
3. 確定是否有多個邏輯操作共享數據
> 解: 也就是上述上鎖,並讓操作的邏輯都是在同步範圍內(範圍過大則降低多執行緒優勢,範圍過小則不夠安全)
## :heavy_check_mark: Spring boot 實作
進行多執行緒專案,為了有效果管理執行緒,選擇使用執行緒池,配置java.util.concurrent.TaskExecutor或其子類的bean,減少執行緒的創建和銷毀次數,提高系統的性能和效率。
- annotation:@EnableAsync(開啟非同步操作)、@Async(管理的對象)<!--執行新的thread-->
- 獲取返回值: CompletableFuture (強化版future),可以將多個非同步任務進行複雜組合
1. 執行緒池配置(也可以放在properties)
```java
@Configuration
@EnableAsync
public class SpringAsyncConfig {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 初始化的執行緒數量
executor.setMaxPoolSize(2); // 最大的執行緒數, 緩衝滿了之後會再新增
executor.setQueueCapacity(100); // 緩衝佇列
executor.setKeepAliveSeconds(60); // 超出核心數量的執行緒,閒置多久後銷毀
executor.setThreadNamePrefix("async-thread-"); // 執行緒前綴
executor.initialize();
return executor;
}
```
2. 註解多執行緒操作: public method only、不可非同步調用類內部的方法,返回值只能是void或future的子類,需注意此非spring的 bean
```java
@Async("executor")
public class taskOne {
String task1secAsync = "task 1sec async";
Thread.sleep(1000L);
System.out.println("getInsurance: " + Thread.currentThread().getName());
return CompletableFuture.completedFuture(task1secAsync);
}
@Async("executor")
public class taskTwo {
String task1secAsync = "task 1sec async";
Thread.sleep(3000L);
System.out.println("getInsurance: " + Thread.currentThread().getName());
return CompletableFuture.completedFuture(task1secAsync);
}
```
Future: 非同步操作,交付給thread pool執行後需要取回執行結果
-> 應對更多元的情境,future不足以使用
- 合併非同步處理的結果,運算獨立,但同時結果具有一定相依性
- 等待所有及和的任務完成
- 只等待最快的任務完成
- 完成任務後,依結果進行下一步操作,不會單純阻塞等待
**CompletableFuture** (java8新增): 實現future及CompletionStage interface,擁有callback功能,相較future能主動設置結果,在block的情況下能主動終止等待(而不是等待或直到超時),在某個行為完成後可以繼續進行下個行為,內部封裝thread pool,可以將請求或處理過程做異步操作,常用的方法:
- runAsync: 異步操作,無返回值
- supplyAsync: 異步操作,有返回值
- anyOf: 任意一個完成即可進行下一步
- allOf: 全部完成才可以進行下一步
---
- join: 完成後回傳結果,只會抛出unchecked異常
- get: 完成後回傳結果,可以增加超時時間,get會拋出具體的異常
- getNow: 如果完成或產生異常,則返回;否則回傳valueIfAbsent的值
- isCancelled
- isCompletedExceptionally
- isDone
---
- complete
- completeExceptionally
- cancel
## REFERENCE
- [Java和Springboot中的多线程](http://bingerambo.com/posts/2021/11/java%E5%92%8Cspringboot%E4%B8%AD%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8B/#threadpoolexecutor%E6%A6%82%E8%BF%B0)
- [模組21-22 執行緒與多執行緒設計(Thread、Multithreading、Runnable、Life Cycle of Thread)](/OisZ0l--R9Ofc4p1rrnR4Q)
- [Java實現多執行緒的四種方式](https://tw511.com/a/01/45663.html)
- [單執行緒](https://www.jendow.com.tw/wiki/%E5%96%AE%E5%9F%B7%E8%A1%8C%E7%B7%92)
- [Java提高班(一)Thread详解](https://www.cnblogs.com/vipstone/p/9762360.html)
- [【恐龍】理解 Process & Thread](https://medium.com/erens-tech-book/%E7%90%86%E8%A7%A3-process-thread-94a40721b492)
- [Multi-Threaded Application vs. Single Threaded Application](https://dzone.com/articles/multi-threaded-application-vs)
- [Single-thread or multithread operations](https://www.ibm.com/docs/en/zos/2.4.0?topic=program-single-thread-multithread-operations)
- [Java 的多執行緒,由基礎開始認識 Threading](https://litotom.com/java-threading-basic/)
- [牛年自強計畫 Week 1 - SpringBoot - 番外篇 Java 執行緒 Thread/Runnable/Callable](https://hackmd.io/@KaiChen/B1J-x9Akd)
- [三、Queue 的應用(3) - Semaphore、Mutex的同步](https://ithelp.ithome.com.tw/articles/10219642)
- [尚硅谷_Java零基础教程(多线程)-- 学习笔记](https://blog.csdn.net/JMW1407/article/details/119869538)
- [Class Thread](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Thread.html)
- [Java實現多執行緒的四種方式](https://tw511.com/a/01/45663.html)
- [ExecutorService And Thread Pool](https://jamesqi.medium.com/executorservice-and-thread-pool-4612359b74ae)
- [JAVA多執行緒的基本知識](https://popcornylu.gitbooks.io/java_multithread/content/)
- [一文秒懂 Java ExecutorService](https://www.twle.cn/c/yufei/javatm/javatm-basic-executorservice.html)
- [Java 四種線程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor](https://www-cnblogs-com.translate.goog/zhujiabin/p/5404771.html?_x_tr_sl=zh-CN&_x_tr_tl=zh-TW&_x_tr_hl=zh-TW&_x_tr_pto=sc)
- [CompletableFuture使用大全,简单易懂](https://juejin.cn/post/6844904195162636295)
- [奇淫巧技,CompletableFuture 异步多线程是真的优雅](https://zhuanlan.zhihu.com/p/554266793)
- [ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)](https://segmentfault.com/a/1190000039267451)
---
## 其他
- 單核CPU: 假的多執行緒,只是因為切換很快,所以像是單執行緒
- 多核CPU: 真的多執行緒
- 並行: 同時多個CPU多個任務
- 併發: 一個CPU多個任務,像是多個人做同一件事
### **Table: process vs thread**
| | 程序 process | 執行緒 thread |
| ------------ | ----------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------ |
| 維度 | 裝載執行緒的容器 | 在程序內部 |
| 資源與記憶體 | 不共享 | 同程序內的執行緒們共享 |
| 優勢 | 相互獨立, 所以安全性較高 | 同步執行task的效率較高 |
| 適用場景 | 高隔離/安全性,低容錯的情境,如: 伺服器app或作業系統,能透過在不同電腦或cluster執行改善擴展性及可靠性 | 多核系統併發程式,可同時在不同程序執行,也會使用異步程式,分別在不同執行緒同步運算及IO操作 |
## :heavy_check_mark: JAVA建立thread的方法: 繼承或實現介面
**執行緒種類:**
- 背景(daemon thread): garbage collector
- 主執行緒(main thread): 執行main方法的程序
- 一般的執行緒thread,用戶的執行緒與背景執行緒(服務用戶執行緒)相對應
在Java啟動時,擁有一個main執行緒(main執行時所產生),若要使用多執行緒,可以使用`java.lang.Thread`,向OS取得額外的執行緒,提供建立啟動終止暫停恢復等方法,chatGPT說有兩種建立方法(繼承THREAD或實現RUNNABLE介面)
**==1. 繼承Thread 類別==:** 受限於單一繼承特性
1. 需要建立一個類別 extends Thread 類別
2. override 它的 run() 方法,來定義THREAD要執行的TASKS
3. 此 Thread 的衍生類別產生物件實體後,以 start() 啟動執行緒,調用run()方法。
**==2. Runnable 介面==:** 沒有返回結果
1. 需要建立一個類別 implements Runnable interface,並實作它的方法 run()
2. 此實作 Runnable 的類別產生物件實體後,作為參數傳入 Thread 建構子產生物件
3. 以 start() 調用Runnable介面的實作過的 run() 方法。
:::info
:information_source: 在jdk5.0以前所使用的以上兩種方法,都需要重寫run(),一個是在thread內的run(),一個則是在介面子類的run(),定義要執行的邏輯在內,一個是在thread內,優先會選擇runnable的這種方式,消弭單繼承的侷限性,多個執行緒可以共享同介面的對象;jdk8後有lambda方法使用
:::
**==3. 實現Callable介面==:** 有返回結果的多執行緒(有返回值),可以拋出異常,支持泛型的返回值,可以看成RUNNABLE的延伸
1. 需要建立一個類別 implements Callable interface,並實作它的方法 run()
2. 此實作 Runnable 的類別產生物件實體後,作為參數傳入FutureTask(future interface的唯一實現類)來產生物件
3. 由FutureTask包裝(以獲取處理結果),作為參數傳入 Thread 建構子產生物件
4. 以 start()調用Callable的call()方法。
**==4. Thread pool執行緒池(executorService、executors)==:** 分離創建及執行角色,提前建立幾個thread放在pool,負責執行thread,能取用並重複使用thread,一般情況下建議手動調用shutdown()關閉
### 基本款thread pool
1. 以ExecutorService(繼承Executors)建立pool:利用Executors的工廠方法創建ExecutorService,預配置幾種thread pool可以直接使用,newCacheThreadPool(靈活回收資源再使用或建立執行緒)、newFixThreadPool(固定數量的執行緒,超出的會在queue中等待)、newSingleThreadExecutor(只有一個執行緒,確保是FIFO或LIFO的順序)、newScheduledThreadPool(按照排程定期或週期性執行)、newSingleThreadScheduledExecutor(單一執行緒按照排程執行)
3. 遞交Runnable或實現Callable介面子類執行,常用方法:
- Future<> submit(): 執行,回傳future(runnable或callable)
- void execute():執行,無返回值 (來自executor介面)
- invokeAny(): 執行一組task,返回任一成功結果
- invokeAll(): 執行一組task,返回全部全部結果
- shutdown(): 停止submit新任務,不會中斷現在等待或執行中的任務,對於定期任務而言會因失去 interrupted 狀態而繼續執行造成錯誤,需要針對任務本身的 ScheduledFuture 作 cancel(true) 的處理
- shutdownNow(): 同上,不過會取消等待中的任務,並且嘗試中斷執行中的任務
- isShutdown()
- isTerminating(): 判斷是否為正在超時等待狀況
- isTerminated(): 判斷是否結束超時等待狀況
### ForkJoin Pool
每個thread擁有自己的雙向queue(相對基本款的Pool是大家共用一個queue),適用於task非獨立,交由系統去執行(切分並合併),在task不可分割並且處理時間差異很大時不適用。
- [ ] Future (jdk5 接收Asynchronous Result的一個interface),可以選擇阻塞執行緒,以get()結果,或先別動
- [ ] CompletableFuture
- supplyAsync(): 執行task,有返回值
- runAsync(): 執行task,無返回值
- get(): 獲取結果,會拋異常ExecutionException
- join(): 獲取結果,方法不會拋異常,執行結果拋CompletionException
- thenRun/thenRunAsync: 不依賴上個task結果,無參數及返回值
- thenAccept/thenAcceptAsync: 依賴上個task結果,有傳參數及但沒有返回值
- thenApply/thenApplyAsync: 依賴上個task結果,有傳參數且有返回值
構造方法的參數
| 名稱 | 類型 | 意義 |
| --------------- | ------------------------ | ------------------ |
| corePoolSize | int | 核心執行緒池大小 |
| maximumPoolSize | int | 最大執行緒池大小 |
| keepAliveTime | long | 執行緒最大空閒時間 |
| unit | TimeUnit | 時間單位 |
| workQueue | BlockingQueue | 等待佇列 |
| threadFactory | ThreadFactory | 執行緒創建工廠 |
| handler | RejectedExecutionHandler | 拒絕策略 |
```JAVA
publicclass TestFixedThreadPool {
publicstaticvoid main(String[] args) {
// 創建一個可重用固定線程數的線程池
ExecutorService pool = Executors.newFixedThreadPool( 2 );
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉線程池
pool.shutdown();
}
}
```
## :heavy_check_mark: 常用的Thread方法
- `void start()`:啟動執行緒,執行 Thread 物件的 run 方法,重複使用start()會拋異常“IllegalThreadStateException”
- `void run()`:如果直接調用,就會以一般非執行緒的方式執行,沒有啟動多執行緒,屬於Thread 物件的 run 方法,包含有要做的tasks,通常users會@override來自定義行為
- `static Thread currentThread()`: 取得目前thread物件,常用於main thread及runnable實現的class,將回傳該執行緒物件。
- `String getName()`: 取得執行緒的名稱
- `void setName(String name)`::給執行緒一個名字,多個不同執行緒時方便掌握
---
- `static void yield()`:使目前正在執行的執行緒讓出執行權(相同優先級有效),不會釋放鎖。
- `static void join() throws InterruptedException`:等待被呼叫的執行緒終止,呼叫的(正在執行的)執行緒才會繼續執行,,例如在a執行緒b.join(), 是a要等b完成
- `static void sleep(long milliseconds) throws InterruptedException`:使執行緒休眠至指定毫秒後。
- `boolean isAlive()`: 確認thread當下是執行或終止狀態
- `void setPriority(int newPriority)`:設定執行緒的優先權([範圍](https://hackmd.io/@czyue/SkrBq9FEO#setPriority))
- `int getPriority()`: 獲取優先級
- `setPriority(int newPriority)`:設定優先級,1~5(default)~10,優先權高的較有"可能"被盡早執行,相等時任選其一執行
- Thread.MIN_PRIORITY:相當於最小值 1。
- Thread.NORM_PRIORITY:相當於預設值 5。
- Thread.MAX_PRIORITY:相當於最大值 10。
調度方法
1. 同優先層級為先進先出,時間片策略
2. 高優先層級,搶佔式策略
※ 低優先表示獲得調度的"機率"低,不是說一定會排在高優先級之後
---
- `void setDaemon(boolean on)`:在start()之前使用,可以將其設為背景執行緒(守護執行緒),例如gc執行緒。
- `boolean isDeamon()`:該執行緒是否為背景執行緒。
- `boolean isInterrupted()`: 確認thread是否被中斷(interrupted)
---
- `void wait()`: 放棄所有,要等到notify()或notifyAll()才能重新執行
- `void notify()`: 喚醒queue當中優先層級最高者結束等待
- `void notifyAll()`: 喚醒所有排隊的人結束等待
> 只能在synchronized範圍下使用,否則會有ava.lang.IllegalMonitorStateException
### Deprecated method
- void stop(): 強制終止執行緒,不建議使用,可能不太安全
- void suspend(): 不會釋放鎖,要盡量避免用suspend和resume()控制,可能導致deadlock
- void resume(): 和suspend相搭配,可能導致deadlock
### 類似方法區別: sleep()和wait() <!--兩者都會讓thread暫停一會兒-->
| | sleep | wait |
|---|-----|-----|
|方法| thread class的靜態方法 | object類實體的方法 |
|使用| 可以在任意thread使用 | 只能在monitor有用此實體時使用|
|條件| 沉睡特定時間| 等待特定condition為true|
|鎖釋放|不釋放,所以其他thread此時無法獲取物件|釋放,別人可以獲取物件,存在改變等待狀態thread的等待議題|
|恢復|沉睡情況可以被其他thread中斷或是其他方法喚醒|從等待狀態甦醒,表示某種情況為true|
|恢復方法|interreput()|notify()/ notifyAll()|
|調用次數限制|能多次使用|只能一次,如果要在不同條件都等待,那就要使用Loop在不同的情境下使用|
## Java example
:beginner:**實作創建執行緒**
```java=
public class ThreadDemo {
public static void main(String[] args) throws Exception {
System.out.println("Current thread: " + Thread.currentThread().getName());
// 1. Extends Thread
MyThreadExtends myThreadExtends = new MyThreadExtends();
myThreadExtends.start();
// 2. Implements Runnable
MyThreadRunnable myThreadRunnable = new MyThreadRunnable();
Thread threadRunnable = new Thread(myThreadRunnable);
threadRunnable.start();
// 3. Implements Runnable Callable
MyThreadCallable myThreadCallable = new MyThreadCallable();
FutureTask<String> future = new FutureTask<>(myThreadCallable);
Thread threadCallable = new Thread(future);
threadCallable.start();
System.out.println(future.get());
// 4. Thread pool: Executors and ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new MyThreadRunnable());
executor.submit(new MyThreadRunnable());
executor.submit(new MyThreadCallable());
executor.submit(new MyThreadCallable());
executor.submit(new MyThreadCallable());
}
// 4. Thread pool: forkjoin pool
ForkJoinPool forkJoinPool = new ForkJoinPool();
Integer result = forkJoinPool.invoke(new Fibonacci(10));
System.out.println("Fibonacci 的结果是: " + result);
forkJoinPool.shutdown();
}
}
class MyThreadExtends extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Run method executed by extending Thread");
}
}
class MyThreadRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Run method executed by implementing Runnable");
}
}
class MyThreadCallable implements Callable<String> {
String message = "Callable interface return value";
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + ": Run method executed by implementing Callable");
return message;
}
}
class Fibonacci extends RecursiveTask<Integer> {
/**
*
*/
private static final long serialVersionUID = 1L;
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
if (n <= 1) { // 任務小到不需要拆分, 直接計算獲得結果
return n;
}
// 進行fork調用子任務計算,join合併計算結果
System.out.println(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
```
:beginner:**TODO: 實作鎖定synchronized、lock**
jdk5.0顯式定義同步鎖,不同於synchronized,Lock只有程式碼區塊鎖,在訪問共享資源前,要先有Lock instance
```java
class A {
private final ReentrantLock lock = new ReenTrantLock();
public void m(){
lock.lock();
try{
}
finally {
lock.unlock();
}
}
}
```
### 怎麼判斷需求的thread數量?
- CPU intensive tasks: 最多為CPU核數量
- Network or I/O intensive tasks: 可以多於CPU核的數量,因為cores不是限制因素