---
title: 'RTOS - Thread 同步、通訊'
disqus: kyleAlien
---
RTOS - Thread 同步、通訊
===
## OverView of Content
如果要 Mutli Thread 共同協作,那 Thread 同步、通訊概念就相當重要
:::danger
**Thread 的同步、通訊是兩種不同的概念** (雖然很常一起說,不過我們還要是清楚分清這兩者的差異)
:::
[TOC]
## Thread 同步
* 最常用的 3 種 Thread 同步方案如下
1. **Semaphore 信號量**
2. **Mutex 互斥量**
3. **Event 事件**
### Semaphore 信號量
:::info
**Semaphore 之前是用在航海中的船隻做通訊,所有原來也有 ++旗語++ 的意思**
:::
* Semaphore 包括了 ^1^ 信號量(`Semaphore`)、^2^ Operation P(`Wait`)、^3^ Operation V(`Single`),**信號量代表個 ++共用的資源++**,P 表示消耗資源,V 回歸資源
* 要注意幾個點,就能了解
1. 對 S 的操作都是 **原子操作**,也就是不可再切割得更小操作
2. P 在資源不足時會等待
3. V 在資源足夠時會去喚醒等待的 P 操作
> P 是 Consumer、V 是 Producer
>
> 
* 信號量可看做一個計時器,用來控制多個進程訪問 **同一資源**(符合定義),常用來作為一種同步手段
* 信號量注意點
1. **死鎖**:如果 ThreadA 一直等不到 ThreadB 通知,而 ThreadB 又在等待 ThreadA 那兩者都無法執行,這種情況稱為 死鎖
:::danger
信號量是基礎的 Thread 同步方案,如果沒有適時的釋放訊號,可能就會導致死鎖
:::
> 
2. **Thread 優先級失效**:因為鎖的關係導致優先級改變
:::warning
可能高優先度的 ThreadA 在等待信號,而低優先度的 ThreadB 就先進行任務,導致低優先度的 ThreadB 先完成任務
:::
> 
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 創建 | `rt_sem_create()` | 創建一個信號量 |
| 初始化 | `rt_sem_init()` | 隊指定信號量做初始化 |
| 獲取 | `rt_sem_take()` | 獲取信號量,如果 **沒有獲取道則會掛起 (wait)** |
| 獲取 | `rt_sem_trytake()` | 直接獲取,獲取不到也不會等待 |
| 釋放 | `rt_release()` | 釋放信號量,信號量 + 1 |
| 刪除 | `rt_sem_delete()`/`rt_sem_detach()` | delete 是靜態刪除、detach 是動態刪除 |
### Semaphore 信號量 - 二值信號量
* 二值信號量是一種簡化版本的 Semaphore,其值就只有 0、1 (就像是一個 boolean),**資源只有佔用 (lcoked) 跟 被佔用中 (unlocked)**,本質上並沒有差異
> 
### Mutex 互斥量
:::info
特點是 Mutex 具有 **==所有權==** 的概念,**只有所有者可以釋放 Mutex 資源,信號量則是任何 Thread 都可以釋放**
:::
* Mutex 是一種 **特殊的二值信號量**,它具有 **排他性**,其特點如下 (與 Semaphore 差異點)
1. **所有者概念:** Thread 只要獲取 Mutex 資源,該 Mutex 資源就屬於該 Thread (類似有一個標記),在獲取者釋放之前其他 Thread 無法獲取
> 
:::info
* 獲取該 Mutex 資源者可以反覆獲取,不用擔心遞迴操作把自身鎖死
```c=
// 概念程式
struct info_t info get_information(struct info_t info, int target) {
// 不會把自身鎖死
rt_mutex_take(...);
if(info.target == target) {
return info;
}
get_information(info.next, target); // 遞歸操作
}
```
:::
2. **解決優先權改變問題:** 當有另外一個優先度較高的 Thread 在等待 Mutex 資源時,**OS 會將正在處理 Mutex 的 Thread 暫時提高到跟等待者一樣高的優先級,讓 CPU 優先處理**
> 
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 創建 | `rt_mutex_create()` | 創建一個 mutex |
| 初始化 | `rt_mutex_init()` | 隊指定 mutex 做初始化 |
| 獲取 | `rt_mutex_take()` | 指定時間內 獲取 mutex,如果 **沒有獲取道則會掛起 (wait)** |
| 釋放 | `rt_mutex_release()` | 釋放 mutex |
| 刪除 | `rt_mutex_delete()`/`rt_mutex_detach()` | delete 是靜態刪除、detach 是動態刪除 |
### Mutex 互斥量 - 實驗
1. 創建 `mutex.h` 檔案:該檔案主要宣告函數給 main 使用
```c=
// mutex.h
/* defined the LED2 pin: PD13 */
#define LED2_PIN GET_PIN(D, 13)
/* defined the LED3 pin: PD14 */
#define LED3_PIN GET_PIN(D, 14)
/* defined the LED4 pin: PD15 */
#define LED4_PIN GET_PIN(D, 15)
extern int mutex_creaet(void);
extern int thread_create(void);
extern void main_mutex_test(void);
```
2. 創建一個 `mutex_test.c` 檔案:該文件專門處理 mutex 相關測試函數;創建 2 個 thread 不斷開關 LED (**開關前必須獲得 Mutex 鎖**)
```c=
// mutex_test.c
#include <rtthread.h>
#include "drv_common.h"
#define LED2_PRIO 8
#define LED2_STACK_SIZE 256
#define LED2_TICKS 2
#define LED3_PRIO 9
#define LED3_STACK_SIZE 256
#define LED3_TICKS 2
static rt_mutex_t led_2_mutex = RT_NULL;
static rt_mutex_t led_3_mutex = RT_NULL;
typedef void (*entry)(void *parameter);
// take mutex 的模板
void take_mutex(rt_mutex_t * const t, rt_base_t pin, int delay_time, const char *name) {
uint16_t count;
rt_mutex_take(*t, RT_WAITING_FOREVER);
for(count = 0; count < 3; count++) {
rt_kprintf("Thread name: %s, time: %d\n", name, count);
rt_pin_write(pin, PIN_LOW);
rt_thread_mdelay(delay_time);
rt_pin_write(pin, PIN_HIGH);
rt_thread_mdelay(delay_time);
}
rt_mutex_release(*t);
}
// 創建 mutex 的模板
static int _mutex_create(rt_mutex_t * const t, const char *name, rt_uint8_t flag) {
*t = rt_mutex_create(name, flag);
if(*t == RT_NULL) {
rt_kprintf("Create mutex %s fail", name);
return 0;
}
return 1;
}
int mutex_creaet(void) {
int res_1 = _mutex_create(&led_2_mutex, "LED_2", RT_IPC_FLAG_FIFO);
int res_2 = _mutex_create(&led_3_mutex, "LED_3", RT_IPC_FLAG_FIFO);
return res_1 && res_2;
}
// 創建 thread 的模板
static int _thread_create(const char *name,
entry func,
const int size,
const int prio,
const int tick) {
rt_thread_t tid;
tid = rt_thread_create(name,
func,
RT_NULL,
size,
prio,
tick
);
if(tid == RT_NULL) {
rt_kprintf("Create mutex %s fail", name);
return 0;
}
rt_thread_startup(tid);
return 1;
}
static void led_2(void *parameter){
while(1) {
take_mutex(&led_2_mutex, LED2_PIN, 50, "led_2_mutex LED_2_THREAD");
}
}
static void led_3(void *parameter){
while(1) {
take_mutex(&led_3_mutex, LED3_PIN, 50, "led_3_mutex LED_3_THREAD");
}
}
// 創建兩個 Thread
int thread_create() {
int res_1 = _thread_create("LED_2_THREAD",
led_2,
LED2_STACK_SIZE,
LED2_PRIO,
LED2_TICKS);
int res_2 = _thread_create("LED_3_THREAD",
led_3,
LED3_STACK_SIZE,
LED3_PRIO,
LED3_TICKS);
return res_1 && res_2;
}
void main_mutex_test(void){
while(1) {
take_mutex(&led_2_mutex, LED2_PIN, 500, "led_2_mutex Main");
take_mutex(&led_3_mutex, LED3_PIN, 500, "led_3_mutex Main");
}
}
```
3. main 使用
```c=
// main.c
#include <rtthread.h>
#include <rtdevice.h>
#include "drv_common.h"
#include "drv_gpio.h"
#include "mutex.h"
int main(void)
{
mutex_creaet();
thread_create();
while (1)
{
main_mutex_test();
}
}
```
下圖結果為 Main Thread 取得 `led_2_mutex`:所以同樣有 `led_2_mutex` 的 LED_2_Thread 會進行休眠等待,而 LED_3_Thread 正常運行
> 
:::info
* 如果需要 Thread 對 LED 輪流閃爍只需要修改 mutex 的目標
```c=
static void led_2(void *parameter){
while(1) {
// 使用同樣的 mutex 鎖
take_mutex(&led_2_mutex, LED2_PIN, 50, "led_2_mutex LED_2_THREAD");
}
}
static void led_3(void *parameter){
while(1) {
// 使用同樣的 mutex 鎖
take_mutex(&led_2_mutex, LED3_PIN, 50, "led_3_mutex LED_3_THREAD");
}
}
```
:::
### Event 事件
* Evnet 事件是一個標記,**標註了某一個行為被觸發,觸發後會 ++通知所有等待該事件的 Thread++,而具體要使用哪個 Thread 來處理事件要看使用者處理**
> 
* 使用 Event 同步的特性如下
1. **可以 `1 對 1`、`1 對多`**
2. **事件沒有緩存對列**:當下獲取到的就是最新事件
:::info
* RTT 的事件有 32 Bit 可以設定,每個 bit 代表了一個事件,需要多事件時可以同時寫入這個位元組
:::
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 創建 | rt_event_create() | 創建一個 event |
| 初始化 | rt_event_init() | 隊指定 event 做初始化 |
| 獲取 | rt_event_recv() | 指定時間內 主動獲取 event,如果 **沒有獲取道則會掛起 (wait)** |
| 傳遞 | rt_event_send() | 發送 event 事件 |
| 刪除 | rt_event_delete()/rt_event_detach() | delete 是靜態刪除、detach 是動態刪除 |
### 硬體中斷 & 同步機制
* 這裡有一個重點是,**不管哪種同步機制,都不可以在中斷時進行有關 wait (堵塞) 的行為**,發出 release 訊號還是可以的,但是獲取則不行
:::warning
**在中斷時 Mutex 更是 ++不能獲取++,++也不能釋放++**,**因為進入中斷時不一定是 Mutex 資源所屬的 Thread**,這會導致中斷被卡死
:::
## Thread 通訊
同步機制簡單來說就是傳遞一個標記,不能傳遞更多的訊息
### Mailbox 郵箱
* Mailbox 機制是最常見的通訊方式,OS 是充當 **郵局** 的腳色;Mailbox 有以下特色
1. 郵箱中的信 **一般是按照 FIFO 的規則** (可使用 flag 參數決定不同數據結構)
2. 郵箱 **有大小限制**
3. **每個 Thread 都有一個專用郵箱**
> 
* RTT 的郵箱規定:其內容只能是 4 Byte,**這個大小剛好可以傳遞一個指針**
:::info
這個限制主要是為了效率,限制每個郵件內容都為 4 個 Byte 代表了 **相同的處理時間 (這時間是可以準確計算),這對 RTOS 相當重要 !**
:::
* RTT 的郵箱也有分為靜態、動態創建,^1.^ 靜態對象的內存空間是永遠被占用、^2.^ 動態空間的內存對象是可以被釋放的
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 創建 | rt_mb_create() | 創建一個 mb |
| 初始化 | rt_mb_init() | 隊指定 mb 做初始化 |
| 獲取 | rt_mb_recv() | 指定時間內主動獲取 mb |
| 傳遞 | rt_mb_send()/rt_mb_send_wait() | 發送 mb 事件,多了 wait 關鍵字則會等待固定 Tick 時間,如果 **沒有發送則會掛起 (wait)** |
| 刪除 | rt_mb_delete()/rt_mb_detach() | delete 是靜態刪除、detach 是動態刪除 |
:::warning
* 這裡較為不同的是,如果 **郵箱已滿 Send 可能也會導致 Wait (掛起) 狀態**
:::
### MailBox - 實驗
1. 創建 `box_mail.h`:宣告 main 會使用到的函數
```c=
// box_mail.h
/* defined the LED2 pin: PD13 */
#define LED2_PIN GET_PIN(D, 13)
/* defined the LED3 pin: PD14 */
#define LED3_PIN GET_PIN(D, 14)
/* defined the LED4 pin: PD15 */
#define LED4_PIN GET_PIN(D, 15)
extern void main_msg_test(void);
extern int create_mail(void);
extern int thread_create_for_mail(void);
```
2. 創建 `box_mail_test.c` 檔案:該檔案負責創建 2 個 Thread 來開關 LED (**透過收到的訊息來決定要開還是關**)
```c=
// box_mail_test.c
#include <rtthread.h>
#include "drv_common.h"
#include "box_mail.h"
#define LED2_PRIO 8
#define LED2_STACK_SIZE 256
#define LED2_TICKS 2
#define LED2_ACK 0x01
#define LED3_PRIO 9
#define LED3_STACK_SIZE 256
#define LED3_TICKS 2
#define LED3_ACK 0x02
#define MAX_MSG_NUM 32
static rt_mq_t mq_led_2 = RT_NULL;
static rt_mq_t mq_led_3 = RT_NULL;
static rt_mailbox_t box_led2 = RT_NULL;
static rt_mailbox_t box_led3 = RT_NULL;
static char LED_ON[] = "On";
static char LED_OFF[] = "Off";
enum THREAD_ID {
MAIN = 1,
LED_2_THREAD,
LED_3_THREAD,
UNKNOW,
};
struct led_msg_t {
enum THREAD_ID target_thread;
char info[4];
rt_mailbox_t ack;
};
typedef void (*entry)(void *parameter);
// 創建 queue 的模板
static int _create_queue(rt_mq_t * const queue, const char *name, int flag) {
*queue = rt_mq_create(name,
sizeof(struct led_msg_t),
MAX_MSG_NUM,
flag);
if(*queue == RT_NULL) {
rt_kprintf("Create queue %s fail\n", name);
return 0;
}
return 1;
}
// 創建 mail 的模板
static int _create_mail(rt_mailbox_t * const box, const char *name, int flag) {
*box = rt_mb_create(name, MAX_MSG_NUM * 4, flag);
if(*box == RT_NULL) {
rt_kprintf("Create mail %s fail\n", name);
return 0;
}
return 1;
}
int create_mail(void) {
int res_1 = _create_queue(&mq_led_2, "LED_2_QUEUE", RT_IPC_FLAG_FIFO);
int res_2 = _create_queue(&mq_led_3, "LED_3_QUEUE", RT_IPC_FLAG_FIFO);
if(!res_1 || !res_2) {
return 1;
}
res_1 = _create_mail(&box_led2, "LED_2_MAIL_BOX", RT_IPC_FLAG_FIFO);
res_2 = _create_mail(&box_led3, "LED_3_MAIL_BOX", RT_IPC_FLAG_FIFO);
return res_1 && res_2;
}
// 創建 thread 的模板
static int _msg_thread_create(const char *name,
entry func,
const int size,
const int prio,
const int tick) {
rt_thread_t tid;
tid = rt_thread_create(name,
func,
RT_NULL,
size,
prio,
tick
);
if(tid == RT_NULL) {
rt_kprintf("Create mutex %s fail\n", name);
return 0;
}
rt_thread_startup(tid);
return 1;
}
// 接收 mail's msg 的模板
static void receive_msg(const rt_mq_t * const queue,
enum THREAD_ID thread_id,
rt_base_t pin,
int ack) {
struct led_msg_t msg;
// 等待指定 queue 的 msg 訊息
rt_mq_recv(*queue, &msg, sizeof(struct led_msg_t), RT_WAITING_FOREVER);
// 判斷是否屬於該 thread
if(msg.target_thread != thread_id) {
rt_kprintf("Error thread: %d\n", thread_id);
return;
}
if(rt_strcmp(msg.info, LED_ON)) {
rt_pin_write(pin, PIN_LOW);
rt_thread_mdelay(500);
rt_mb_send(msg.ack, ack);
rt_kprintf("THREAD_ID: %d open led\n", thread_id);
} else if(rt_strcmp(msg.info, LED_OFF)) {
rt_pin_write(pin, PIN_HIGH);
rt_thread_mdelay(500);
rt_mb_send(msg.ack, ack);
rt_kprintf("THREAD_ID: %d close led\n", thread_id);
} else {
rt_kprintf("Unknow msg: %s\n", msg.info);
}
}
static void led_2_thread_recv(void) {
while(1) {
receive_msg(&mq_led_2, LED_2_THREAD, LED2_PIN, LED2_ACK);
}
}
static void led_3_thread_recv(void) {
while(1) {
receive_msg(&mq_led_3, LED_3_THREAD, LED3_PIN, LED3_ACK);
}
}
// 創建 mail
int thread_create_for_mail(void) {
int res_1 = _msg_thread_create("LED_2_THREAD_MSG",
(void *)led_2_thread_recv,
LED2_STACK_SIZE,
LED2_PRIO,
LED2_TICKS);
int res_2 = _msg_thread_create("LED_3_THREAD_MSG",
(void *)led_3_thread_recv,
LED3_STACK_SIZE,
LED3_PRIO,
LED3_TICKS);
return res_1 && res_2;
}
// 透過 main 來發送訊息給 Mail 來控制 LED
static void send_msg_to_led(enum THREAD_ID target_thread,
rt_mailbox_t *reply_ack,
rt_mq_t *queue,
int target_ack) {
uint16_t count;
struct led_msg_t msg = {0};
for(count = 0; count < 3; count++) {
rt_ubase_t value;
// Open LED
msg.target_thread = target_thread;
rt_strcpy(msg.info, LED_ON);
msg.ack = *reply_ack;
// 對指定 queue 傳送訊息 !
rt_mq_send(*queue, &msg, sizeof(struct led_msg_t));
// 設定接收的 mailbox
rt_mb_recv(*reply_ack, &value, RT_WAITING_FOREVER);
if(value != target_ack) {
rt_kprintf("Thread %d get err msg\n", target_thread);
continue;
}
// Close LED
msg.target_thread = target_thread;
rt_strcpy(msg.info, LED_OFF);
msg.ack = *reply_ack;
rt_mq_send(*queue, &msg, sizeof(struct led_msg_t));
rt_mb_recv(*reply_ack, &value, RT_WAITING_FOREVER);
}
}
void main_msg_test(void) {
send_msg_to_led(LED_2_THREAD, &box_led2 ,&mq_led_2, LED2_ACK);
send_msg_to_led(LED_3_THREAD, &box_led3 ,&mq_led_3, LED3_ACK);
}
```
3. Main 使用呼叫測試
```c=
// main.c
#include <rtthread.h>
#include <rtdevice.h>
#include "drv_common.h"
#include "drv_gpio.h"
#include "mutex.h"
#include "box_mail.h"
int main(void)
{
rt_pin_mode(LED2_PIN, PIN_MODE_OUTPUT);
rt_pin_mode(LED3_PIN, PIN_MODE_OUTPUT);
rt_pin_mode(LED4_PIN, PIN_MODE_OUTPUT);
rt_pin_write(LED4_PIN, PIN_HIGH);
create_mail();
thread_create_for_mail();
while (1)
{
main_msg_test();
}
}
```
> 
:::info
* **Mail Box 看似 thread 在等待 msg,但其實 Thread 是在等待 OS 內核的某個 Queue 中的訊息**
消息對列 (以這邊來說就是 `rt_mq_t`) **由使用者創建,在這之後就是由 OS 來維護**,其他 Thread 只要將消息丟入 消息對列,OS 就會將該消息轉發
> OS 並不會在意是哪個 Thread 對 Queue 填充消息,所以需要注意判斷
:::
### MessageQueue 消息隊列
* MessageQueue 消息隊列與 Mb 郵件最不一樣的是,**MessageQueue 發送的內容是不定長度的信息 (Message)**,再將這些 Message 組成隊列
1. MessageQueue 遵從 Queue 的數據結構特性 FIFO
2. 同樣有大小空間限制
3. 需指定 Msg 最大長度
> 
:::success
**Queue 可以插入 ==緊急消息== 到隊列最前方**
:::
* MessageQueue & OS 運行的概念如下
1. 鏈結一個空閒列表,當需要發送訊息時,就會取出一個空的消息 (Message)
2. 將數據 copy 進 Msg 空間
3. 將 Msg 放置到另外一個待處理的隊列中
4. 當有 Thread 要取出數據時,再將數據 copy 進目標 Thread 的空間 (兩次拷貝)
> 
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 創建 | rt_mq_create() | 創建一個 mq |
| 初始化 | rt_mq_init() | 隊指定 mq 做初始化 |
| 獲取 | rt_mq_recv() | 指定時間內主動獲取 mq |
| 傳遞 | rt_mq_send()/**rt_mq_send_urgent**() | 發送 mq 事件,多了 **urgent 關鍵字則會將消息插入到隊列最前方** |
| 刪除 | rt_mq_delete()/rt_mq_detach() | delete 是靜態刪除、detach 是動態刪除 |
### 信號
:::warning
**請不要與 Semaphore 信號量 搞混 !!**
:::
* 這裡的信號是模擬了一個硬體的中斷,通常實現方法是使用新片提供的 **軟體中斷指令,所以其本質上也是一個中斷**
1. **安裝處理 Function:** 既然是像中斷,那就跟中斷的使用方式類似,中斷是在運行前先設定中斷向量,所以信號則是在 Thread **安裝好要處理的函數後**,自動觸發
> 每個 Thread 都可以有自己的信號處理函數
:::info
POSIX 的表準用戶 Thread 只能接收到兩個信號 **SIGUSR1(10)**、**SIGUSR2(12)**,所以 RTT OS 有提供對應的 API
:::
2. **不存在 wait 問題:** 如同中斷一樣,只是有沒有被觸發,一般來說 Thread 仍可繼續運行 (但仍有 Function 可讓 Thread 等待)
3. 信號與互斥量一樣,有 **==所有者==** 的概念 !! **Thread 在接收到信號後,處理資料的地方就是該 Thread 私有的 Stack,所以要特別注意 Stack 的大小**
* RTT 預設不開啟信號機制
| 功能 | RTT_API | 說明 |
| -------- | -------- | -------- |
| 安裝 | rt_signal_install() | 安裝信號被觸發時要處理的函數,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** |
| 堵塞 | rt_singal_mask() | 堵塞,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** |
| 解除堵塞 | rt_singal_mask() | 解除堵塞,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** |
| 發送 | rt_thread_kill() | 接收信號的 Thread 句柄,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** |
| 等待 | rt_singal_wait() | 指定等待的訊號 |
## Appendix & FAQ
:::info
:::
###### tags: `RTOS`