--- title: 'RTOS - Thread 同步、通訊' disqus: kyleAlien --- RTOS - Thread 同步、通訊 === ## OverView of Content 如果要 Mutli Thread 共同協作,那 Thread 同步、通訊概念就相當重要 :::danger **Thread 的同步、通訊是兩種不同的概念** (雖然很常一起說,不過我們還要是清楚分清這兩者的差異) ::: [TOC] ## Thread 同步 * 最常用的 3 種 Thread 同步方案如下 1. **Semaphore 信號量** 2. **Mutex 互斥量** 3. **Event 事件** ### Semaphore 信號量 :::info **Semaphore 之前是用在航海中的船隻做通訊,所有原來也有 ++旗語++ 的意思** ::: * Semaphore 包括了 ^1^ 信號量(`Semaphore`)、^2^ Operation P(`Wait`)、^3^ Operation V(`Single`),**信號量代表個 ++共用的資源++**,P 表示消耗資源,V 回歸資源 * 要注意幾個點,就能了解 1. 對 S 的操作都是 **原子操作**,也就是不可再切割得更小操作 2. P 在資源不足時會等待 3. V 在資源足夠時會去喚醒等待的 P 操作 > P 是 Consumer、V 是 Producer > > ![](https://i.imgur.com/JTAT5ra.png) * 信號量可看做一個計時器,用來控制多個進程訪問 **同一資源**(符合定義),常用來作為一種同步手段 * 信號量注意點 1. **死鎖**:如果 ThreadA 一直等不到 ThreadB 通知,而 ThreadB 又在等待 ThreadA 那兩者都無法執行,這種情況稱為 死鎖 :::danger 信號量是基礎的 Thread 同步方案,如果沒有適時的釋放訊號,可能就會導致死鎖 ::: > ![](https://i.imgur.com/C6jIJJc.png) 2. **Thread 優先級失效**:因為鎖的關係導致優先級改變 :::warning 可能高優先度的 ThreadA 在等待信號,而低優先度的 ThreadB 就先進行任務,導致低優先度的 ThreadB 先完成任務 ::: > ![](https://i.imgur.com/0vExbCJ.png) | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 創建 | `rt_sem_create()` | 創建一個信號量 | | 初始化 | `rt_sem_init()` | 隊指定信號量做初始化 | | 獲取 | `rt_sem_take()` | 獲取信號量,如果 **沒有獲取道則會掛起 (wait)** | | 獲取 | `rt_sem_trytake()` | 直接獲取,獲取不到也不會等待 | | 釋放 | `rt_release()` | 釋放信號量,信號量 + 1 | | 刪除 | `rt_sem_delete()`/`rt_sem_detach()` | delete 是靜態刪除、detach 是動態刪除 | ### Semaphore 信號量 - 二值信號量 * 二值信號量是一種簡化版本的 Semaphore,其值就只有 0、1 (就像是一個 boolean),**資源只有佔用 (lcoked) 跟 被佔用中 (unlocked)**,本質上並沒有差異 > ![](https://i.imgur.com/jGmfYfG.png) ### Mutex 互斥量 :::info 特點是 Mutex 具有 **==所有權==** 的概念,**只有所有者可以釋放 Mutex 資源,信號量則是任何 Thread 都可以釋放** ::: * Mutex 是一種 **特殊的二值信號量**,它具有 **排他性**,其特點如下 (與 Semaphore 差異點) 1. **所有者概念:** Thread 只要獲取 Mutex 資源,該 Mutex 資源就屬於該 Thread (類似有一個標記),在獲取者釋放之前其他 Thread 無法獲取 > ![](https://i.imgur.com/iLK5niL.png) :::info * 獲取該 Mutex 資源者可以反覆獲取,不用擔心遞迴操作把自身鎖死 ```c= // 概念程式 struct info_t info get_information(struct info_t info, int target) { // 不會把自身鎖死 rt_mutex_take(...); if(info.target == target) { return info; } get_information(info.next, target); // 遞歸操作 } ``` ::: 2. **解決優先權改變問題:** 當有另外一個優先度較高的 Thread 在等待 Mutex 資源時,**OS 會將正在處理 Mutex 的 Thread 暫時提高到跟等待者一樣高的優先級,讓 CPU 優先處理** > ![](https://i.imgur.com/7C6SzSO.png) | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 創建 | `rt_mutex_create()` | 創建一個 mutex | | 初始化 | `rt_mutex_init()` | 隊指定 mutex 做初始化 | | 獲取 | `rt_mutex_take()` | 指定時間內 獲取 mutex,如果 **沒有獲取道則會掛起 (wait)** | | 釋放 | `rt_mutex_release()` | 釋放 mutex | | 刪除 | `rt_mutex_delete()`/`rt_mutex_detach()` | delete 是靜態刪除、detach 是動態刪除 | ### Mutex 互斥量 - 實驗 1. 創建 `mutex.h` 檔案:該檔案主要宣告函數給 main 使用 ```c= // mutex.h /* defined the LED2 pin: PD13 */ #define LED2_PIN GET_PIN(D, 13) /* defined the LED3 pin: PD14 */ #define LED3_PIN GET_PIN(D, 14) /* defined the LED4 pin: PD15 */ #define LED4_PIN GET_PIN(D, 15) extern int mutex_creaet(void); extern int thread_create(void); extern void main_mutex_test(void); ``` 2. 創建一個 `mutex_test.c` 檔案:該文件專門處理 mutex 相關測試函數;創建 2 個 thread 不斷開關 LED (**開關前必須獲得 Mutex 鎖**) ```c= // mutex_test.c #include <rtthread.h> #include "drv_common.h" #define LED2_PRIO 8 #define LED2_STACK_SIZE 256 #define LED2_TICKS 2 #define LED3_PRIO 9 #define LED3_STACK_SIZE 256 #define LED3_TICKS 2 static rt_mutex_t led_2_mutex = RT_NULL; static rt_mutex_t led_3_mutex = RT_NULL; typedef void (*entry)(void *parameter); // take mutex 的模板 void take_mutex(rt_mutex_t * const t, rt_base_t pin, int delay_time, const char *name) { uint16_t count; rt_mutex_take(*t, RT_WAITING_FOREVER); for(count = 0; count < 3; count++) { rt_kprintf("Thread name: %s, time: %d\n", name, count); rt_pin_write(pin, PIN_LOW); rt_thread_mdelay(delay_time); rt_pin_write(pin, PIN_HIGH); rt_thread_mdelay(delay_time); } rt_mutex_release(*t); } // 創建 mutex 的模板 static int _mutex_create(rt_mutex_t * const t, const char *name, rt_uint8_t flag) { *t = rt_mutex_create(name, flag); if(*t == RT_NULL) { rt_kprintf("Create mutex %s fail", name); return 0; } return 1; } int mutex_creaet(void) { int res_1 = _mutex_create(&led_2_mutex, "LED_2", RT_IPC_FLAG_FIFO); int res_2 = _mutex_create(&led_3_mutex, "LED_3", RT_IPC_FLAG_FIFO); return res_1 && res_2; } // 創建 thread 的模板 static int _thread_create(const char *name, entry func, const int size, const int prio, const int tick) { rt_thread_t tid; tid = rt_thread_create(name, func, RT_NULL, size, prio, tick ); if(tid == RT_NULL) { rt_kprintf("Create mutex %s fail", name); return 0; } rt_thread_startup(tid); return 1; } static void led_2(void *parameter){ while(1) { take_mutex(&led_2_mutex, LED2_PIN, 50, "led_2_mutex LED_2_THREAD"); } } static void led_3(void *parameter){ while(1) { take_mutex(&led_3_mutex, LED3_PIN, 50, "led_3_mutex LED_3_THREAD"); } } // 創建兩個 Thread int thread_create() { int res_1 = _thread_create("LED_2_THREAD", led_2, LED2_STACK_SIZE, LED2_PRIO, LED2_TICKS); int res_2 = _thread_create("LED_3_THREAD", led_3, LED3_STACK_SIZE, LED3_PRIO, LED3_TICKS); return res_1 && res_2; } void main_mutex_test(void){ while(1) { take_mutex(&led_2_mutex, LED2_PIN, 500, "led_2_mutex Main"); take_mutex(&led_3_mutex, LED3_PIN, 500, "led_3_mutex Main"); } } ``` 3. main 使用 ```c= // main.c #include <rtthread.h> #include <rtdevice.h> #include "drv_common.h" #include "drv_gpio.h" #include "mutex.h" int main(void) { mutex_creaet(); thread_create(); while (1) { main_mutex_test(); } } ``` 下圖結果為 Main Thread 取得 `led_2_mutex`:所以同樣有 `led_2_mutex` 的 LED_2_Thread 會進行休眠等待,而 LED_3_Thread 正常運行 > ![](https://i.imgur.com/cU0O7kJ.png) :::info * 如果需要 Thread 對 LED 輪流閃爍只需要修改 mutex 的目標 ```c= static void led_2(void *parameter){ while(1) { // 使用同樣的 mutex 鎖 take_mutex(&led_2_mutex, LED2_PIN, 50, "led_2_mutex LED_2_THREAD"); } } static void led_3(void *parameter){ while(1) { // 使用同樣的 mutex 鎖 take_mutex(&led_2_mutex, LED3_PIN, 50, "led_3_mutex LED_3_THREAD"); } } ``` ::: ### Event 事件 * Evnet 事件是一個標記,**標註了某一個行為被觸發,觸發後會 ++通知所有等待該事件的 Thread++,而具體要使用哪個 Thread 來處理事件要看使用者處理** > ![](https://i.imgur.com/l0KgHN1.png) * 使用 Event 同步的特性如下 1. **可以 `1 對 1`、`1 對多`** 2. **事件沒有緩存對列**:當下獲取到的就是最新事件 :::info * RTT 的事件有 32 Bit 可以設定,每個 bit 代表了一個事件,需要多事件時可以同時寫入這個位元組 ::: | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 創建 | rt_event_create() | 創建一個 event | | 初始化 | rt_event_init() | 隊指定 event 做初始化 | | 獲取 | rt_event_recv() | 指定時間內 主動獲取 event,如果 **沒有獲取道則會掛起 (wait)** | | 傳遞 | rt_event_send() | 發送 event 事件 | | 刪除 | rt_event_delete()/rt_event_detach() | delete 是靜態刪除、detach 是動態刪除 | ### 硬體中斷 & 同步機制 * 這裡有一個重點是,**不管哪種同步機制,都不可以在中斷時進行有關 wait (堵塞) 的行為**,發出 release 訊號還是可以的,但是獲取則不行 :::warning **在中斷時 Mutex 更是 ++不能獲取++,++也不能釋放++**,**因為進入中斷時不一定是 Mutex 資源所屬的 Thread**,這會導致中斷被卡死 ::: ## Thread 通訊 同步機制簡單來說就是傳遞一個標記,不能傳遞更多的訊息 ### Mailbox 郵箱 * Mailbox 機制是最常見的通訊方式,OS 是充當 **郵局** 的腳色;Mailbox 有以下特色 1. 郵箱中的信 **一般是按照 FIFO 的規則** (可使用 flag 參數決定不同數據結構) 2. 郵箱 **有大小限制** 3. **每個 Thread 都有一個專用郵箱** > ![](https://i.imgur.com/wJWu8NW.png) * RTT 的郵箱規定:其內容只能是 4 Byte,**這個大小剛好可以傳遞一個指針** :::info 這個限制主要是為了效率,限制每個郵件內容都為 4 個 Byte 代表了 **相同的處理時間 (這時間是可以準確計算),這對 RTOS 相當重要 !** ::: * RTT 的郵箱也有分為靜態、動態創建,^1.^ 靜態對象的內存空間是永遠被占用、^2.^ 動態空間的內存對象是可以被釋放的 | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 創建 | rt_mb_create() | 創建一個 mb | | 初始化 | rt_mb_init() | 隊指定 mb 做初始化 | | 獲取 | rt_mb_recv() | 指定時間內主動獲取 mb | | 傳遞 | rt_mb_send()/rt_mb_send_wait() | 發送 mb 事件,多了 wait 關鍵字則會等待固定 Tick 時間,如果 **沒有發送則會掛起 (wait)** | | 刪除 | rt_mb_delete()/rt_mb_detach() | delete 是靜態刪除、detach 是動態刪除 | :::warning * 這裡較為不同的是,如果 **郵箱已滿 Send 可能也會導致 Wait (掛起) 狀態** ::: ### MailBox - 實驗 1. 創建 `box_mail.h`:宣告 main 會使用到的函數 ```c= // box_mail.h /* defined the LED2 pin: PD13 */ #define LED2_PIN GET_PIN(D, 13) /* defined the LED3 pin: PD14 */ #define LED3_PIN GET_PIN(D, 14) /* defined the LED4 pin: PD15 */ #define LED4_PIN GET_PIN(D, 15) extern void main_msg_test(void); extern int create_mail(void); extern int thread_create_for_mail(void); ``` 2. 創建 `box_mail_test.c` 檔案:該檔案負責創建 2 個 Thread 來開關 LED (**透過收到的訊息來決定要開還是關**) ```c= // box_mail_test.c #include <rtthread.h> #include "drv_common.h" #include "box_mail.h" #define LED2_PRIO 8 #define LED2_STACK_SIZE 256 #define LED2_TICKS 2 #define LED2_ACK 0x01 #define LED3_PRIO 9 #define LED3_STACK_SIZE 256 #define LED3_TICKS 2 #define LED3_ACK 0x02 #define MAX_MSG_NUM 32 static rt_mq_t mq_led_2 = RT_NULL; static rt_mq_t mq_led_3 = RT_NULL; static rt_mailbox_t box_led2 = RT_NULL; static rt_mailbox_t box_led3 = RT_NULL; static char LED_ON[] = "On"; static char LED_OFF[] = "Off"; enum THREAD_ID { MAIN = 1, LED_2_THREAD, LED_3_THREAD, UNKNOW, }; struct led_msg_t { enum THREAD_ID target_thread; char info[4]; rt_mailbox_t ack; }; typedef void (*entry)(void *parameter); // 創建 queue 的模板 static int _create_queue(rt_mq_t * const queue, const char *name, int flag) { *queue = rt_mq_create(name, sizeof(struct led_msg_t), MAX_MSG_NUM, flag); if(*queue == RT_NULL) { rt_kprintf("Create queue %s fail\n", name); return 0; } return 1; } // 創建 mail 的模板 static int _create_mail(rt_mailbox_t * const box, const char *name, int flag) { *box = rt_mb_create(name, MAX_MSG_NUM * 4, flag); if(*box == RT_NULL) { rt_kprintf("Create mail %s fail\n", name); return 0; } return 1; } int create_mail(void) { int res_1 = _create_queue(&mq_led_2, "LED_2_QUEUE", RT_IPC_FLAG_FIFO); int res_2 = _create_queue(&mq_led_3, "LED_3_QUEUE", RT_IPC_FLAG_FIFO); if(!res_1 || !res_2) { return 1; } res_1 = _create_mail(&box_led2, "LED_2_MAIL_BOX", RT_IPC_FLAG_FIFO); res_2 = _create_mail(&box_led3, "LED_3_MAIL_BOX", RT_IPC_FLAG_FIFO); return res_1 && res_2; } // 創建 thread 的模板 static int _msg_thread_create(const char *name, entry func, const int size, const int prio, const int tick) { rt_thread_t tid; tid = rt_thread_create(name, func, RT_NULL, size, prio, tick ); if(tid == RT_NULL) { rt_kprintf("Create mutex %s fail\n", name); return 0; } rt_thread_startup(tid); return 1; } // 接收 mail's msg 的模板 static void receive_msg(const rt_mq_t * const queue, enum THREAD_ID thread_id, rt_base_t pin, int ack) { struct led_msg_t msg; // 等待指定 queue 的 msg 訊息 rt_mq_recv(*queue, &msg, sizeof(struct led_msg_t), RT_WAITING_FOREVER); // 判斷是否屬於該 thread if(msg.target_thread != thread_id) { rt_kprintf("Error thread: %d\n", thread_id); return; } if(rt_strcmp(msg.info, LED_ON)) { rt_pin_write(pin, PIN_LOW); rt_thread_mdelay(500); rt_mb_send(msg.ack, ack); rt_kprintf("THREAD_ID: %d open led\n", thread_id); } else if(rt_strcmp(msg.info, LED_OFF)) { rt_pin_write(pin, PIN_HIGH); rt_thread_mdelay(500); rt_mb_send(msg.ack, ack); rt_kprintf("THREAD_ID: %d close led\n", thread_id); } else { rt_kprintf("Unknow msg: %s\n", msg.info); } } static void led_2_thread_recv(void) { while(1) { receive_msg(&mq_led_2, LED_2_THREAD, LED2_PIN, LED2_ACK); } } static void led_3_thread_recv(void) { while(1) { receive_msg(&mq_led_3, LED_3_THREAD, LED3_PIN, LED3_ACK); } } // 創建 mail int thread_create_for_mail(void) { int res_1 = _msg_thread_create("LED_2_THREAD_MSG", (void *)led_2_thread_recv, LED2_STACK_SIZE, LED2_PRIO, LED2_TICKS); int res_2 = _msg_thread_create("LED_3_THREAD_MSG", (void *)led_3_thread_recv, LED3_STACK_SIZE, LED3_PRIO, LED3_TICKS); return res_1 && res_2; } // 透過 main 來發送訊息給 Mail 來控制 LED static void send_msg_to_led(enum THREAD_ID target_thread, rt_mailbox_t *reply_ack, rt_mq_t *queue, int target_ack) { uint16_t count; struct led_msg_t msg = {0}; for(count = 0; count < 3; count++) { rt_ubase_t value; // Open LED msg.target_thread = target_thread; rt_strcpy(msg.info, LED_ON); msg.ack = *reply_ack; // 對指定 queue 傳送訊息 ! rt_mq_send(*queue, &msg, sizeof(struct led_msg_t)); // 設定接收的 mailbox rt_mb_recv(*reply_ack, &value, RT_WAITING_FOREVER); if(value != target_ack) { rt_kprintf("Thread %d get err msg\n", target_thread); continue; } // Close LED msg.target_thread = target_thread; rt_strcpy(msg.info, LED_OFF); msg.ack = *reply_ack; rt_mq_send(*queue, &msg, sizeof(struct led_msg_t)); rt_mb_recv(*reply_ack, &value, RT_WAITING_FOREVER); } } void main_msg_test(void) { send_msg_to_led(LED_2_THREAD, &box_led2 ,&mq_led_2, LED2_ACK); send_msg_to_led(LED_3_THREAD, &box_led3 ,&mq_led_3, LED3_ACK); } ``` 3. Main 使用呼叫測試 ```c= // main.c #include <rtthread.h> #include <rtdevice.h> #include "drv_common.h" #include "drv_gpio.h" #include "mutex.h" #include "box_mail.h" int main(void) { rt_pin_mode(LED2_PIN, PIN_MODE_OUTPUT); rt_pin_mode(LED3_PIN, PIN_MODE_OUTPUT); rt_pin_mode(LED4_PIN, PIN_MODE_OUTPUT); rt_pin_write(LED4_PIN, PIN_HIGH); create_mail(); thread_create_for_mail(); while (1) { main_msg_test(); } } ``` > ![](https://i.imgur.com/QN8PNSw.png) :::info * **Mail Box 看似 thread 在等待 msg,但其實 Thread 是在等待 OS 內核的某個 Queue 中的訊息** 消息對列 (以這邊來說就是 `rt_mq_t`) **由使用者創建,在這之後就是由 OS 來維護**,其他 Thread 只要將消息丟入 消息對列,OS 就會將該消息轉發 > OS 並不會在意是哪個 Thread 對 Queue 填充消息,所以需要注意判斷 ::: ### MessageQueue 消息隊列 * MessageQueue 消息隊列與 Mb 郵件最不一樣的是,**MessageQueue 發送的內容是不定長度的信息 (Message)**,再將這些 Message 組成隊列 1. MessageQueue 遵從 Queue 的數據結構特性 FIFO 2. 同樣有大小空間限制 3. 需指定 Msg 最大長度 > ![](https://i.imgur.com/RQkF0gd.png) :::success **Queue 可以插入 ==緊急消息== 到隊列最前方** ::: * MessageQueue & OS 運行的概念如下 1. 鏈結一個空閒列表,當需要發送訊息時,就會取出一個空的消息 (Message) 2. 將數據 copy 進 Msg 空間 3. 將 Msg 放置到另外一個待處理的隊列中 4. 當有 Thread 要取出數據時,再將數據 copy 進目標 Thread 的空間 (兩次拷貝) > ![](https://i.imgur.com/kghy5UD.png) | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 創建 | rt_mq_create() | 創建一個 mq | | 初始化 | rt_mq_init() | 隊指定 mq 做初始化 | | 獲取 | rt_mq_recv() | 指定時間內主動獲取 mq | | 傳遞 | rt_mq_send()/**rt_mq_send_urgent**() | 發送 mq 事件,多了 **urgent 關鍵字則會將消息插入到隊列最前方** | | 刪除 | rt_mq_delete()/rt_mq_detach() | delete 是靜態刪除、detach 是動態刪除 | ### 信號 :::warning **請不要與 Semaphore 信號量 搞混 !!** ::: * 這裡的信號是模擬了一個硬體的中斷,通常實現方法是使用新片提供的 **軟體中斷指令,所以其本質上也是一個中斷** 1. **安裝處理 Function:** 既然是像中斷,那就跟中斷的使用方式類似,中斷是在運行前先設定中斷向量,所以信號則是在 Thread **安裝好要處理的函數後**,自動觸發 > 每個 Thread 都可以有自己的信號處理函數 :::info POSIX 的表準用戶 Thread 只能接收到兩個信號 **SIGUSR1(10)**、**SIGUSR2(12)**,所以 RTT OS 有提供對應的 API ::: 2. **不存在 wait 問題:** 如同中斷一樣,只是有沒有被觸發,一般來說 Thread 仍可繼續運行 (但仍有 Function 可讓 Thread 等待) 3. 信號與互斥量一樣,有 **==所有者==** 的概念 !! **Thread 在接收到信號後,處理資料的地方就是該 Thread 私有的 Stack,所以要特別注意 Stack 的大小** * RTT 預設不開啟信號機制 | 功能 | RTT_API | 說明 | | -------- | -------- | -------- | | 安裝 | rt_signal_install() | 安裝信號被觸發時要處理的函數,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** | | 堵塞 | rt_singal_mask() | 堵塞,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** | | 解除堵塞 | rt_singal_mask() | 解除堵塞,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** | | 發送 | rt_thread_kill() | 接收信號的 Thread 句柄,信號只有 **SIGUSR1(10)**、**SIGUSR2(12)** | | 等待 | rt_singal_wait() | 指定等待的訊號 | ## Appendix & FAQ :::info ::: ###### tags: `RTOS`