---
title: 'Java 阻塞隊列、生產者 / 消費者'
disqus: kyleAlien
---
Java 阻塞隊列、生產者 / 消費者
===
## Overview of Content
如有引用參考請詳註出處,感謝 :cat:
:::success
* 如果喜歡讀更好看一點的網頁版本,可以到我新做的網站 [**DevTech Ascendancy Hub**](https://devtechascendancy.com/)
本篇文章對應的是 [**高效使用阻塞隊列:生產者-消費者模型的最佳實踐 | 阻塞隊列 | 3 種實現**](https://devtechascendancy.com/producer-consumer-blocking/)
:::
[TOC]
## 認識生產者、消費者
生產者-消費者模型(Producer-Consumer Model)是一種常見的多執行緒佈局,這裡我們先捨去「多執行序概念」,先了解這兩個角色
* **生產者**(`Producer`):負責生成數據或任務
* **消費者**(`Consumer`):從隊列中取出數據、任務並進行處理
而這兩種通常會一同處理同一個事物,目標是加速處理資料的效率… 那為了達到這個目標,我們需要什麼呢?請繼續往下看…
### 生產者、消費者模型:所需元素
* 要製作生產者、消費者模型我們需要以下幾個元素
* **通訊容器**:這個容器就是隊列(`Queue`)
需要有一個橋樑將生產者、消費者連接起來,這個橋樑就是「隊列」,這樣兩種才可以通訊(相互傳遞是否已經處理完畢,或是生產出需要處理的物件)
:::info
* 為什麼隊列?不是其他數據結構
因為隊列(`Queue`)這種數據結構本身就具有 FIFO 的特性,這個特性擁有「**排隊**」的功能,能夠「**有順序性**」的處理事件
:::
```mermaid
graph TD
subgraph Queue
Task1 --> Task2 --> Task3
end
Producer -.-> |生成數據或任務| Queue
Queue -.-> |取出數據、任務| Consumer
subgraph 生產者-消費者模型
Producer["生產者 (Producer)"]
Queue["隊列 (Queue)"]
Consumer["消費者 (Consumer)"]
end
```
* **這個通訊容器「可阻塞」**:
阻塞意味著「等待」,因為不管是生產者還是消費者,雙方往往不會有資源對等的情況,更常碰到的是其中一方缺乏資源,而另外一方不斷搜尋(`Pulling`)的情況
這意味著資源的耗費,所以 生產者、消費者模型 往往需要一個可阻塞的容器
:::warning
* **為什麼需要阻塞**?
阻塞在執行序來說就是休眠,意味這讓出 CPU 資源,這種特性運作在隊列中代表的含義就是等待任務(並且不耗費 CPU 資源),可以大大提升應用的性能
:::
```mermaid
graph TD
Producer -.->|生成數據或任務| Queue
Queue -.->|取出數據、任務| Consumer
subgraph 生產者-消費者模型
Producer["生產者 (Producer)"]
Queue["阻塞隊列 (Blocking Queue)"]
Consumer["消費者 (Consumer)"]
end
classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px;
Queue:::queueClass
```
* 再來,我們需要 **這個通訊容器「執行序安全」**:也就同步機制
在這裡我們再加入多執行序(`Multi-Thread`)的特性,我們知道多執行序在運行時沒有任何的鎖 🔒 那就是一種非安全操作,所以我們需要這個通訊容器內擁有鎖 🔒
> 而怎麼鎖,幾把鎖,又該如何挑選鎖… 可以點擊以下連結去了解鎖,而這篇文章底下會說明如何實現「生產者、消費者模型」
:::info
對鎖 🔒 不清楚的讀者,可以點擊 [**全面解析多執行緒與同步技術:SYNC、CAS、ThreadLocal | 公平鎖、可重入鎖、樂觀鎖**](https://devtechascendancy.com/multithread-sync-cas-thread-local-guide/) 參考
:::
```mermaid
graph TD
Producer -.->|生成數據或任務| Queue
Queue -.->|取出數據、任務| Consumer
subgraph " 🔒 🔒 🔒 生產者-消費者模型 🔒 🔒 🔒 "
Producer["生產者 (Producer)"]
Queue["阻塞隊列 (Blocking Queue)"]
Consumer["消費者 (Consumer)"]
end
classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px;
Queue:::queueClass
```
### 阻塞隊列特性
* **堵塞**:執行緒在阻塞時會讓出 CPU 資源,再讓出 CPU 資源後,系統就會將 CPU 運算資源讓給其他執行序
* **阻塞隊列**:阻塞隊列是「產生者、消費者模式」中的一個元素,在不符合條件時就會進行阻塞動作(可以想成等待),其操作必須符合 **兩個條件**
1. 作為「消費者」角色,當 **==隊列為空時==**,消費者要獲取元素,會呈現執行序 **等待狀態**(阻塞),直到有數據放入佇列(通常帶有通知功能)
```mermaid
graph TD
Producer -->|生成數據或任務| Queue
Queue -->|取出數據、任務| Consumer
subgraph 生產者-消費者模型
Producer["生產者 (Producer)"]
Queue["阻塞隊列 (Blocking Queue)"]
Consumer["消費者 (Consumer)"]
end
classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px;
Queue:::queueClass
subgraph 阻塞特性
阻塞-->讓出CPU資源
讓出CPU資源-->其他執行緒獲取CPU
end
Consumer -.-> |1. 無元素可消耗| 阻塞
其他執行緒獲取CPU -.-> |2. 通知| Producer
```
2. 作為「生產者」角色,當 **==隊列滿時==**,生產者要加入元素,也會呈現**等待狀態**(阻塞)等佇列有空間,並同時讓出 CPU 資源(通常帶有通知功能)
```mermaid
graph TD
Producer -->|生成數據或任務| Queue
Queue -->|取出數據、任務| Consumer
subgraph 生產者-消費者模型
Producer["生產者 (Producer)"]
Queue["阻塞隊列 (Blocking Queue)"]
Consumer["消費者 (Consumer)"]
end
classDef queueClass fill:#f9f,stroke:#333,stroke-width:2px;
Queue:::queueClass
subgraph 阻塞特性
阻塞-->讓出CPU資源
讓出CPU資源-->其他執行緒獲取CPU
end
Producer -.-> |1. 隊列滿| 阻塞
其他執行緒獲取CPU -.-> |2. 通知| Consumer
```
### 生產者、消費者模型:應用場景
* **資源管理**:
生產者-消費者模型可以「**有效、安全**」地管理和分配系統資源,避免資源的競爭和衝突
* **異步處理**:
生產者和消費者可以在不同的執行緒中執行,實現任務的異步處理,提高系統的響應速度
* **多執行緒處理**:
在多執行緒環境中,生產者生成數據,消費者處理數據,這樣可以充分利用多核處理器的性能,提高應用的效率(**提升利用 CPU 的吞吐量**)
併發編程中使用生產者 & 消費者模式能夠解決絕大多數的併發問題,如果發生生產 & 消費兩者效率不同時就必須使用一個容器去解決,**該容器就是隊列,++隊列會產生一種緩衝的功能++**
## 堵塞隊列:Blocking-Queue
我們前面已經說明了為何生產者、消費者模型要使用隊列,那接著我們就來看看 Java 內置的阻塞隊列有哪些?這些隊列又哪有些特性、特色
### [BlockingQueue](https://developer.android.com/reference/java/util/concurrent/BlockingQueue) 界面:阻塞隊列核心方法
* Java [**BlockingQueue**](https://cs.android.com/android/platform/superproject/main/+/main:libcore/ojluni/src/main/java/java/util/concurrent/BlockingQueue.java) 是一個界面(`interface`)它定義了阻塞隊列的核心方法,下表是 `BlockingQueue` 類的重點方法,**其中 ==只有 put、take 會有阻塞== 現象**
* 對列表「**添加**」元素
| Name | Params | return | Func |
| ------- | --------------------------- | ------- | ----------------------------------------------------------------------------------------- |
| `add` | E(泛型) | boolean | 如果不違反容量限制則回傳 true,沒空間拋出 IllegalStateException |
| `offer` | E | boolean | 就像是 `add(E)` 方法,但並**不會拋出異常** |
| `offer` | E, time, unit | boolean | **限定時間內要放入數據** |
| **`put`**(會阻塞) | E | void | 將數據加入 BlockingQueue 裡面,**如果沒有空間,則調用此方法的執行序被阻斷**,直至有空間放入 |
* 對列表「移除」元素
| Name | Params | return | Func |
| ------- | --------------------------- | ------- | ----------------------------------------------------------------------------------------- |
| `poll` | time | E | 搜尋 & 移除 Header 數據 |
| `poll` | time, unit | E | **限定時間內要取出數據** |
| **`take`**(會阻塞) | void | E | 執行序**一直等待**直到取出任務 |
| `drainTo` | Collection<\? super E> | int | 一次性**取出所有任務**,添加任務到指定集合,返回任務數量 |
| `drainTo` | Collection<\? super E>, max | int | 一次性取出**限制數量任務**,返回任務數量 |
* 「提交任務」到列表
| Name | Params | return | Func |
| ------- | --------------------------- | ------- | ----------------------------------------------------------------------------------------- |
| execute | Runnable | void | **提交任務**去給執行序池執行 |
* 下表則是比較 BlockingQueue 相同目的之下(以下分為插入、移除、檢查),所擁有的不同方法,這些方法也會有不同的反應
| 方法目的 | 操作失敗時拋出異常 | 操作時不拋出,使用返回值 | 操作失敗時一直堵塞 |
| ---- | --------- | -------- | -------- |
| 插入 | `add(E)` | `offer(E)` | `put()` |
| 移除 | `remove()` | `poll()` | `take()` |
| 檢查 | `element()` | `peek()` | --- |
### BlockingQueue 實現:各種阻塞隊列特性
* 我們上面說過 BlockingQueue 是一個界面不能直接實例化,所以這裡就要來介紹有哪些類實作了 BlockingQueue 界面,不同的類個別都會有不同特性,我們可以根據自身的業務需求去做選擇…
首先,我們再來重新複習一下「隊列」的功能:隊列是 **在生產者、消費者之間所需要的 ++容器++,目的是為了 ==平衡兩方生產、消耗不均的問題==**,否則沒辦法協作;
> **生產者、消費者之間不需要知道對方,不會直接產生關係 (就像是生產的流水線關係)**
* Java BlockingQueue 有幾種實現類 (要注意※ **各有不同特性**),以下我們將阻塞隊列分為「**有界**」、「**無界**」來看
:::danger
* **有界 & 無界 ?**
既然是排隊就會有所謂的上限,**而有界就是有上限,無界就是無上限**
**而無界的特性就是,==放入不會被堵塞,++取得會堵塞++==**,當寫入空間不足時會不斷地擴容,值到超出上限發生了 OOM,系會被把這個進程給 kill
:::
* **有界隊列**:
1. **`ArrayBlockingQueue` 隊列** : 由數據結構靜態 Queue 所組成的 **「有界」** 堵塞隊列
它的特性是 FIFO,默認 **不保證執行序(`Thread`)公平** 的訪問對列,但仍可透過參數設定調整為公平(如下圖)
> 
:::info
* **什麼是公平的訪問**?
如果是不公平訪問,意味著 **無論一個執行緒在隊列前面等待了多長時間,新的執行緒仍然可能在它之前獲得訪問權**(效率高)
而公平訪問則是 **按照阻塞的先後順序訪問隊列**(效率較低)
:::
2. **`LinkedBlockingQueue` 隊列** : 由數據結構動態 Queue 所組成的 **「有界」** 堵塞隊列
它的特性是 FIFO,能高效的處理數據主要是因為,**對於生產者消費者使用了個別「獨立」的鎖**,意味著可以分開等待時間,使用高並發進型數據處理
Queue 的容量預設為 `Integer#MAX_VALUE` 的數量,但也可透過參數自己設定
> 
3. **`LinkedBlockingDeque` 隊列** : 由**數據結構雙向 Queue** 所組成的 **「雙向」** 堵塞隊列
> **多執行序同時入隊時,競爭少一半所以速度較快,==雙端可取可放==**
:::info
透過設定值可以影響到它是有界或是無界
:::
* **無界隊列**:
1. **`PriorityBlockingQueue` 隊列** : 支持**優先即排**序的 **「無界」** 堵塞隊列
其特色是插入隊列時無堵塞,而從隊列中取出元素就有堵塞行為
> 默認初始容量為 11,**可自定 `compareTo` 方法,自訂比較器**
2. **`LinkedTransferQueue` 隊列** : 由數據結構 Queue 所組成的 **「無界」** 堵塞隊列,實現了重要界面 [TransferQueue](https://developer.android.com/reference/java/util/concurrent/TransferQueue.html)
特性是提供比 `LinkedBlockingQueue` 更高效的轉移操作
除了擁有 `LinkedBlockingQueue` 的所有功能外,它還提供了一個 `transfer` 方法,該方法允許將一個元素直接轉移給消費者,而不是將其排入隊列
> 該隊列特別適合在生產者和消費者的數量相差不大且需要快速傳遞的場景中使用。它也能在生產者和消費者數量不平衡的情況下,通過等待策略來實現高效運行
3. **`DelayQueue` 隊列** : 使用**優先即排序**的「**無界**」堵塞隊列,剩餘時間越短越快取出,並且時間到後才能取出
> DelayQueue 是一個泛型,泛型有規範必須實現 Delayed 界面,並且該界面實現 `Comparable<Delayed>` 界面,支持可延遲獲取元素
4. **`SynchroniusQueue` 隊列** : **不儲存元素**的隊列,解偶生產者 & 消費者
> **okHttp 有使用**,目的是添加元素一失敗,馬上創建新的執行序去訪問網路資源
### ArrayBlockingQueue & LinkedBlockingQueue:鎖的差別 🔒
:::info
如果不清楚「鎖 🔒」的概念,可以先去了解 [**全面解析多執行緒與同步技術:SYNC、CAS、ThreadLocal | 公平鎖、可重入鎖、樂觀鎖**](https://devtechascendancy.com/multithread-sync-cas-thread-local-guide/)
:::
* **`ArrayBlockingQueue`、`LinkedBlockingQueue` 兩者都是使用 ReentrantLock 顯式可重入鎖,但是兩者 ++鎖的實現有差別++**
* ArrayBlockingQueue 實現的隊列,**生產、消費是使用「同把鎖」,效率較低**,因為等待生產者與消費者之間必須相互等待,無法獨立作業
> 
* LinkedBlockingQueue **鎖是分離**的,**生產使用的是 putLock,消費使用的是 takeLock,可分開做等待、喚醒的動作,加強的效率**
> 
2. **生產 & 消費時的操作不同**
* `ArrayBlockingQueue` **使用 Array 陣列**,所以在取值時速度較快
* `LinkedBlockingQueue` **使用 Linked 數據結構**,再取值時需要再 `new Node<E>` 進行插入、移除,**所以 Linked 效率會低一點**
3. **初始化大小**
* `Array` 必須指定初始化陣列的數值,用此數值來創建陣列大小
* `Linked` 在創建時不需要指定大小,因為它是使用串列,有頭指標
### SynchronousQueue
* [**SynchronousQueue**](https://cs.android.com/android/platform/superproject/main/+/main:libcore/ojluni/src/main/java/java/util/concurrent/SynchronousQueue.java?q=SynchronousQueue&ss=android%2Fplatform%2Fsuperproject%2Fmain) **其特色是它只會儲存當前元素** (只存一個元素,但是有稱為隊列),每一個 put 操作都必須等待一個 take 消費,否則不能添加元素
```java=
// SynchronousQueue.java
// 內部並沒有等待
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
```
### LinkedTransferQueue
* [**LinkedTransferQueue**](https://cs.android.com/android/platform/superproject/main/+/main:libcore/ojluni/src/main/java/java/util/concurrent/LinkedTransferQueue.java) 特色主要在於 transfer 方法
| 方法名 | 功能 |
| - | - |
| transfer | 當數據還在等待時間 (時間到才能被消費),生產者一個立即給消費者; **當沒有消費者時如同 put 會做等待** |
| tryTransfer | 該方法用來試探傳入元素是否能直接給消費者,如果沒有消費者則返回 false,**不做等待,算是一種試探** |
### LinkedBlockingDequeue
* 其使用的方式是雙端隊列,雙端隊列可以增加數據的吞吐量
> 
## 實現生產者、消費者模型
接下來我們就透過 Java 中已有的一些共能(鎖、阻塞隊列)來實現產生產者與消費者模型
### ArrayBlockingQueue 實現模型
* ArrayBlockingQueue 內部就實現了堵塞,如果我們想要單純的使用 Java 內置的阻塞隊列來實現生產者、消費者模型也是可以的,範例如下
* 使用 ArrayBlockingQueue 阻塞隊列
以下簡單的使用靜態資源(`static member`)
```java=
abstract class abCommonClass {
protected static final int SIZE = 10;
protected static ArrayBlockingQueue<String> a = new ArrayBlockingQueue<>(SIZE, true);
}
```
:::success
* 為什麼要使用靜態資源?
其實只是為個簡單示範而已,這樣我在外部就不需要創建隊列… 而使用靜態(`static`)成員的原因是因為,「生產者、消費者」的目標容器要相同,否則就無法協作!
:::
* **消費者**(`consumer`):
首先消費者進入無限循環,不斷的取出元素,直到元素都被取完就「阻塞」(`poll` 方法的特性,如果沒有元素可取 `poll` 就會阻塞當前執行緒)
```java=
class abConsumer extends abCommonClass implements Runnable {
@Override
public void run() {
while(true) {
try {
TimeUnit.SECONDS.sleep(3);
a.poll();
System.out.println("Poll Data " + a.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```
* **生產者**(`producer`):
相同,生產者進入無限循環,不斷的放入元素,直到隊列被填滿就「阻塞」(`offer` 方法的特性,如果隊列滿 `offer` 就會阻塞當前執行緒)
```java=
class abProducer extends abCommonClass implements Runnable {
@Override
public void run() {
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
a.offer("String");
System.out.println("Offer Data " + a.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```
* 測試自實現的生產者-消費者模型
```java=
public class testArrayBlockingQueue {
public static void main(String[] args) {
abConsumer c = new abConsumer();
abProducer p = new abProducer();
new Thread(c).start();
new Thread(p).start();
}
}
```
**--實做--**
> 
### PriorityQueue、Synchronized 實現模型
* 以下我們自己去實現實現生產者、消費者模型
* **鎖** 🔒:鎖是為了要顧慮到生產者、消費者模型的執行緒安全
這邊簡單的使用靜態資源(成員 `p`)作為鎖
```java=
abstract class commonKey {
// 限制 Queue 的大小
protected static final int SIZE = 10;
// Java 的阻塞隊列
protected static PriorityQueue<String> p = new PriorityQueue<>(SIZE);
}
```
:::info
* Java 的阻塞隊列內就已經有鎖了,為什麼還要鎖?
這是因為阻塞隊列的鎖是針對隊列內的操作元素,而不是外部的操作,我們現在的鎖是針對 **生產者、消費者操作的鎖**
:::
* **消費者**(`consumer`):
首先消費者開始運行後,^1.^進入 while 無限循環,不斷判斷是否有元素要消耗,並且在 ^2.^ 判斷元素時使用鎖 🔒 保障資源的安全消耗
> 鎖配合 `synchronized` 關鍵字
如果 ^3.^ 「沒有元素」要消耗則執行序進入 `wait()` 阻塞休眠,^4.^ 消耗則使用 `poll()` 方法,並呼叫 `notify` 通知生產這繼續生產
```java=
class consumer extends commonKey implements Runnable {
@Override
public void run() {
while(true) {
synchronized (p) {
while(p.size() == 0) {
try {
System.out.println("Queue Empty can not Poll");
p.wait(); // Object's method
} catch (InterruptedException e) {
e.printStackTrace();
p.notify();
}
}
try {
//TimeUnit.SECONDS.sleep(1);
System.out.println("Poll Data " + p.size());
p.poll();
p.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
* **生產者**(`producer`):
同樣生產者者開始運行後,^1.^進入 while 無限循環,不斷判斷是否有元素需要消耗,並且在 ^2.^ 判斷元素時使用鎖 🔒 保障資源的安全消耗
> 鎖配合 `synchronized` 關鍵字
如果 ^3.^ 「元素超過隊列限制」則生產者執行序進入 `wait()` 阻塞休眠,^4.^ 生產則使用 `offer()` 方法,並呼叫 `notify` 通知消費者繼續消費
```java=
class producer extends commonKey implements Runnable {
@Override
public void run() {
while(true) {
synchronized (p) {
while(p.size() == SIZE) {
try {
System.out.println("Queue Full can not offer anymore");
p.wait();
} catch (InterruptedException e) {
e.printStackTrace();
p.notify();
}
}
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("producer " + p.size());
p.offer("Hello");
p.notify();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
* 測試自實現的生產者-消費者模型
```java=
public class testBlockingQueue {
public static void main(String[] args) {
consumer c = new consumer();
producer p = new producer();
Thread t1 = new Thread(c);
Thread t2 = new Thread(p);
t1.start();
t2.start();
}
}
```
> 
### PriorityQueue、ReentrantLock 實現模型:更高效能
* 接著,如同上個小節的範例,上個小節的範例在使用鎖時使用 `synchronized` 關鍵字配合「同把鎖」來達到生產者、消費者
這種實現雖然可以完成模型,不過效能不夠好,所以這裡我們透過鎖的優化(使用 `ReentrantLock`),來達成 **生產者、消費者使用「不同條件來操作鎖 🔒」**
* **鎖** 🔒:鎖是為了要顧慮到生產者、消費者模型的執行緒安全
這邊進階改成 `ReentrantLock` 作為鎖,並分別出兩個條件分別給生產者(使用 `con` 成員)、消費者控制(使用 `pro` 成員)
```java=
class rCommonReentrant {
// 隊列大小限制
protected static final int SIZE = 10;
// Java 的阻塞隊列
protected static final PriorityQueue<String> p = new PriorityQueue<>(SIZE);
// Java 的可重入鎖
protected static ReentrantLock r = new ReentrantLock();
// 創建兩個條件去解鎖
protected static Condition con = r.newCondition();
protected static Condition pro = r.newCondition();
}
```
* **消費者**(`Consumer`):
這裡我們只說明不同點,首先 ^1.^ 使用鎖不再是透過 `synchronized` 關鍵字,而是使用 `lock`、`unlock` 操作,再來 ^2.^ 鎖的等待使用消費者自身的 Condition#`await` 方法
在消耗完元素後使用生產者的 Condition#`signal` 喚醒生產者
```java=
class rConsumer extends rCommonReentrant implements Runnable {
@Override
public void run() {
while(true) {
try {
r.lock();
while(p.size() == 0) {
System.out.println("Queue Empty can not Poll");
con.await();
}
//TimeUnit.SECONDS.sleep(1);
p.poll();
System.out.println("Poll Data " + p.size());
// 緩醒生產者
pro.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
r.unlock();
}
}
}
}
```
* **生產者**(`Producer`):
這裡我們只說明不同點,首先 ^1.^ 使用鎖不再是透過 `synchronized` 關鍵字,而是使用 `lock`、`unlock` 操作,再來 ^2.^ 鎖的等待使用生產者自身的 Condition#`await` 方法
在生產完元素後使用消費者的 Condition#`signal` 喚醒消費者
```java=
class rProducer extends rCommonReentrant implements Runnable {
@Override
public void run() {
while(true) {
try {
r.lock();
while(p.size() == 10) {
System.out.println("Queue Full can not offer anymore");
pro.await();
}
// 故意拖延生產
TimeUnit.SECONDS.sleep(1);
p.offer("Hello");
System.out.println("offer Data " + p.size());
// 喚醒消費者
con.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
r.unlock();
}
}
}
}
```
* 測試自實現的生產者-消費者模型
```java=
public class reenBlocking {
public static void main(String[] args) {
rConsumer c = new rConsumer();
rProducer p = new rProducer();
new Thread(c).start();
new Thread(p).start();
}
}
```
> 
## Appendix & FAQ
:::info
:::
###### tags: `Java 基礎進階`