# RT-Thread IPC Message Queue
- 特性:可接受不固定長度的訊息
## 結構
:::success
**File:** rtdef.h
:::
```c=663
#ifdef RT_USING_MESSAGEQUEUE
/**
* message queue structure
*/
struct rt_messagequeue
{
struct rt_ipc_object parent; /**< inherit from ipc_object */
void *msg_pool; /**< start address of message queue */
rt_uint16_t msg_size; /**< message size of each message */
rt_uint16_t max_msgs; /**< max number of messages */
rt_uint16_t entry; /**< index of messages in the queue */
void *msg_queue_head; /**< list head */
void *msg_queue_tail; /**< list tail */
void *msg_queue_free; /**< pointer indicated the free node of queue */
};
typedef struct rt_messagequeue *rt_mq_t;
#endif
```
---
## 建立 message queue
### 動態記憶體管理
#### `rt_mq_create`
| 功能 | 回傳值 |
| --- | ------ |
| 建立 message queue | message queue |
| `*name` | `msg_size` | `max_msgs` | `flag` |
| ------- | ---------- | ---------- | ------ |
| 名字 | 一封訊息的大小 | 訊息數上限 | FIFO / PRIO |
```c=1852
/**
* This function will create a message queue object from system resource
*
* @param name the name of message queue
* @param msg_size the size of message
* @param max_msgs the maximum number of message in queue
* @param flag the flag of message queue
*
* @return the created message queue, RT_NULL on error happen
*/
rt_mq_t rt_mq_create(const char *name,
rt_size_t msg_size,
rt_size_t max_msgs,
rt_uint8_t flag)
{
struct rt_messagequeue *mq;
struct rt_mq_message *head;
register rt_base_t temp;
RT_DEBUG_NOT_IN_INTERRUPT;
/* allocate object */
mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
if (mq == RT_NULL)
return mq;
/* set parent */
mq->parent.parent.flag = flag;
/* init ipc object */
rt_ipc_object_init(&(mq->parent));
```
- 首先要一塊物件給 message queue,並同時填入 flag 及初始化
```c=+
/* init message queue */
/* get correct message size */
mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
mq->max_msgs = max_msgs;
```
- 設定訊息的大小,與訊息數量的上限
:::info
`RT_ALGIN` 目的在對齊訊息的大小,根據不同板子所定義不同的 `RT_ALIGN_SIZE` 會有所差別
`#define RT_ALIGN(size, align) (((size) + (align) - 1) & ~((align) - 1))`
- 如傳進來的是 `RT_ALGIN(7,8)` 則結果是 8
- 如傳進來的是 `RT_ALGIN(13,4)` 則結果是 16
- 即結果為大於後值的**最小倍數**
:::
```c=+
/* allocate message pool */
mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
if (mq->msg_pool == RT_NULL)
{
rt_mq_delete(mq);
return RT_NULL;
}
```
- 接著需 allocate 適當的記憶體存放訊息
```c=+
/* init message list */
mq->msg_queue_head = RT_NULL;
mq->msg_queue_tail = RT_NULL;
/* init message empty list */
mq->msg_queue_free = RT_NULL;
for (temp = 0; temp < mq->max_msgs; temp ++)
{
head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
temp * (mq->msg_size + sizeof(struct rt_mq_message)));
head->next = mq->msg_queue_free;
mq->msg_queue_free = head;
}
```
- 先將頭尾設為空,再一塊一塊的將 `msg_pool` 插在 free list 的第一顆
```c=+
/* the initial entry is zero */
mq->entry = 0;
return mq;
}
RTM_EXPORT(rt_mq_create);
```
- 最後設定 `entry` 為 0
---
### 靜態記憶體管理
#### `rt_mq_init`
| 功能 | 回傳值 |
| --- | ------ |
| 初始化 message queue | `RT_EOK` |
| `mq` | `*name` | `*msgpool` | `msg_size` | `pool_size` | `flag` |
| ---- | ------- | ---------- | ---------- | ----------- | ------ |
| message queue 本體 | 名字 | 存放訊息的位址 | 一封訊息的大小 | 存放訊息的大小 | FIFO / PRIO |
```c=1764
/**
* This function will initialize a message queue and put it under control of
* resource management.
*
* @param mq the message object
* @param name the name of message queue
* @param msgpool the beginning address of buffer to save messages
* @param msg_size the maximum size of message
* @param pool_size the size of buffer to save messages
* @param flag the flag of message queue
*
* @return the operation status, RT_EOK on successful
*/
rt_err_t rt_mq_init(rt_mq_t mq,
const char *name,
void *msgpool,
rt_size_t msg_size,
rt_size_t pool_size,
rt_uint8_t flag)
{
struct rt_mq_message *head;
register rt_base_t temp;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
/* init object */
rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name);
/* set parent flag */
mq->parent.parent.flag = flag;
/* init ipc object */
rt_ipc_object_init(&(mq->parent));
```
- 這裡就不需要去要一塊物件,直接拿來用即可
```c=+
/* set messasge pool */
mq->msg_pool = msgpool;
/* get correct message size */
mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message));
/* init message list */
mq->msg_queue_head = RT_NULL;
mq->msg_queue_tail = RT_NULL;
/* init message empty list */
mq->msg_queue_free = RT_NULL;
for (temp = 0; temp < mq->max_msgs; temp ++)
{
head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
temp * (mq->msg_size + sizeof(struct rt_mq_message)));
head->next = mq->msg_queue_free;
mq->msg_queue_free = head;
}
/* the initial entry is zero */
mq->entry = 0;
return RT_EOK;
}
RTM_EXPORT(rt_mq_init);
```
- 其餘的動作皆與上面相同
---
## 刪除 message queue
### 動態記憶體管理
#### `rt_mq_delete`
| 功能 | 回傳值 |
| --- | ------ |
| 刪除 message queue | `RT_EOK` |
| `mq` |
| ---- |
| 欲刪除的 message queue |
```c=1920
/**
* This function will delete a message queue object and release the memory
*
* @param mq the message queue object
*
* @return the error code
*/
rt_err_t rt_mq_delete(rt_mq_t mq)
{
RT_DEBUG_NOT_IN_INTERRUPT;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE);
/* resume all suspended thread */
rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
```
- 先把正在等待收訊息的 thread 叫醒
```c=+
/* free message queue pool */
RT_KERNEL_FREE(mq->msg_pool);
/* delete message queue object */
rt_object_delete(&(mq->parent.parent));
return RT_EOK;
}
RTM_EXPORT(rt_mq_delete);
```
- 接著 free `msg_pool`,並刪除物件
---
### 靜態記憶體管理
#### `rt_mq_detach`
| 功能 | 回傳值 |
| --- | ------ |
| 刪除 message queue | `RT_EOK` |
| `mq` |
| ---- |
| 欲刪除的 message queue |
```c=1827
/**
* This function will detach a message queue object from resource management
*
* @param mq the message queue object
*
* @return the operation status, RT_EOK on successful
*/
rt_err_t rt_mq_detach(rt_mq_t mq)
{
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent));
/* resume all suspended thread */
rt_ipc_list_resume_all(&mq->parent.suspend_thread);
/* detach message queue object */
rt_object_detach(&(mq->parent.parent));
return RT_EOK;
}
RTM_EXPORT(rt_mq_detach);
```
- 這裡的 `msg_pool` 就不需要 free
---
## 傳送訊息
### `rt_mq_send`
| 功能 | 回傳值 |
| --- | ------ |
| 傳送訊息 | `RT_EOK`|
| `mq` | `*buffer` | `size` |
| ---- | --------- | ------ |
| 欲傳送的 message queue | 訊息資料 | 訊息大小 |
```c=1950
/**
* This function will send a message to message queue object, if there are
* threads suspended on message queue object, it will be waked up.
*
* @param mq the message queue object
* @param buffer the message
* @param size the size of buffer
*
* @return the error code
*/
rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size)
{
register rt_ubase_t temp;
struct rt_mq_message *msg;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* greater than one message size */
if (size > mq->msg_size)
return -RT_ERROR;
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free;
/* message queue is full */
if (msg == RT_NULL)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
```
- 首先確定 message queue 沒滿(即 free list 不為空)
- 如果滿了,回傳 `FULL`
```c=+
/* move free list pointer */
mq->msg_queue_free = msg->next;
```
- 接著 free list 往下一顆走
```c=+
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* the msg is the new tailer of list, the next shall be NULL */
msg->next = RT_NULL;
/* copy buffer */
rt_memcpy(msg + 1, buffer, size);
```
- 將訊息填入從 free list 拿的一顆(`msg`),這顆待會是新的尾巴(設定 `next = NULL`)
```c=+
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* link msg to message queue */
if (mq->msg_queue_tail != RT_NULL)
{
/* if the tail exists, */
((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
}
```
- 如果尾巴不為空(也就是 message queue 有東西),將原本的尾巴指向 `msg`
```c=+
/* set new tail */
mq->msg_queue_tail = msg;
/* if the head is empty, set head */
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;
```
- 設定新的尾巴
- 如果頭為空(也就是 message queue 為空),設定新的頭
```c=+
/* increase message entry */
mq->entry ++;
```
- 最後更新 `entry`
```c=+
/* resume suspended thread */
if (!rt_list_isempty(&mq->parent.suspend_thread))
{
rt_ipc_list_resume(&(mq->parent.suspend_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return RT_EOK;
}
RTM_EXPORT(rt_mq_send);
```
- 如果有人在等待接收訊息,叫醒他
---
### 傳送緊急訊息
- 與上面不同的是:這裡將新訊息插入**第一顆**
#### `rt_mq_urgent`
| 功能 | 回傳值 |
| --- | ------ |
| 傳送緊急訊息 | `RT_EOK`|
| `mq` | `*buffer` | `size` |
| ---- | --------- | ------ |
| 欲傳送的 message queue | 訊息資料 | 訊息大小 |
```c=2039
/**
* This function will send an urgent message to message queue object, which
* means the message will be inserted to the head of message queue. If there
* are threads suspended on message queue object, it will be waked up.
*
* @param mq the message queue object
* @param buffer the message
* @param size the size of buffer
*
* @return the error code
*/
rt_err_t rt_mq_urgent(rt_mq_t mq, void *buffer, rt_size_t size)
{
register rt_ubase_t temp;
struct rt_mq_message *msg;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* greater than one message size */
if (size > mq->msg_size)
return -RT_ERROR;
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free;
/* message queue is full */
if (msg == RT_NULL)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
}
/* move free list pointer */
mq->msg_queue_free = msg->next;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* copy buffer */
rt_memcpy(msg + 1, buffer, size);
```
- 因為要插在第一顆,`next` 就不用設定為空了
```c=+
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* link msg to the beginning of message queue */
msg->next = mq->msg_queue_head;
mq->msg_queue_head = msg;
```
- 這裡就將新訊息插在第一顆
```c=+
/* if there is no tail */
if (mq->msg_queue_tail == RT_NULL)
mq->msg_queue_tail = msg;
```
- 如果原本的 message queue 為空,設定新的尾巴
```c=+
/* increase message entry */
mq->entry ++;
/* resume suspended thread */
if (!rt_list_isempty(&mq->parent.suspend_thread))
{
rt_ipc_list_resume(&(mq->parent.suspend_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return RT_EOK;
}
RTM_EXPORT(rt_mq_urgent);
```
- 其他的動作皆相同
---
## 接收訊息
### `rt_mq_recv`
| 功能 | 回傳值 |
| --- | ------ |
| 接收訊息| `RT_EOK` |
| `mq` | `*buffer` | `size` | `timeout` |
| ---- | --------- | ------ | --------- |
| 欲訊息 message queue | 訊息存放處 | 訊息存放處大小 | 等待時間(如果需要)|
```c=2123
/**
* This function will receive a message from message queue object, if there is
* no message in message queue object, the thread shall wait for a specified
* time.
*
* @param mq the message queue object
* @param buffer the received message will be saved in
* @param size the size of buffer
* @param timeout the waiting time
*
* @return the error code
*/
rt_err_t rt_mq_recv(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t timeout)
{
struct rt_thread *thread;
register rt_ubase_t temp;
struct rt_mq_message *msg;
rt_uint32_t tick_delta;
/* parameter check */
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* initialize delta tick */
tick_delta = 0;
/* get current thread */
thread = rt_thread_self();
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* for non-blocking call */
if (mq->entry == 0 && timeout == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
/* message queue is empty */
while (mq->entry == 0)
{
RT_DEBUG_IN_THREAD_CONTEXT;
/* reset error number in thread */
thread->error = RT_EOK;
/* no waiting, return timeout */
if (timeout == 0)
{
/* enable interrupt */
rt_hw_interrupt_enable(temp);
thread->error = -RT_ETIMEOUT;
return -RT_ETIMEOUT;
}
```
- 如果 message queue 為空,且不等待,回傳 `TIMEOUT`
```c=+
/* suspend current thread */
rt_ipc_list_suspend(&(mq->parent.suspend_thread),
thread,
mq->parent.parent.flag);
```
- 如要等待,將 thread 掛在等待鏈上
```c=+
/* has waiting time, start thread timer */
if (timeout > 0)
{
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
}
```
- 並啟動一個 timer
```c=+
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* re-schedule */
rt_schedule();
```
- 開始等待,做一次調度
```c=+
/* recv message */
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
if (timeout > 0)
{
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
}
}
```
- 如跳回來,重新計算 `timeout`
```c=+
/* get message from queue */
msg = (struct rt_mq_message *)mq->msg_queue_head;
/* move message queue head */
mq->msg_queue_head = msg->next;
/* reach queue tail, set to NULL */
if (mq->msg_queue_tail == msg)
mq->msg_queue_tail = RT_NULL;
```
- 如果 message queue 有資料,拿第一顆,同時更新 head(tail,如果需要)
```c=+
/* decrease message entry */
mq->entry --;
```
- 更新 entry
```c=+
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* copy message */
rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);
```
- 接著複製找到的訊息
```c=+
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* put message to free list */
msg->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = msg;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
return RT_EOK;
}
RTM_EXPORT(rt_mq_recv);
```
- 最後將 `msg` 插入 free list 的頭
###### tags: `RT-Thread` `kernel` `ipc` `message queue`