# RT-Thread IPC Message Queue - 特性:可接受不固定長度的訊息 ## 結構 :::success **File:** rtdef.h ::: ```c=663 #ifdef RT_USING_MESSAGEQUEUE /** * message queue structure */ struct rt_messagequeue { struct rt_ipc_object parent; /**< inherit from ipc_object */ void *msg_pool; /**< start address of message queue */ rt_uint16_t msg_size; /**< message size of each message */ rt_uint16_t max_msgs; /**< max number of messages */ rt_uint16_t entry; /**< index of messages in the queue */ void *msg_queue_head; /**< list head */ void *msg_queue_tail; /**< list tail */ void *msg_queue_free; /**< pointer indicated the free node of queue */ }; typedef struct rt_messagequeue *rt_mq_t; #endif ``` --- ## 建立 message queue ### 動態記憶體管理 #### `rt_mq_create` | 功能 | 回傳值 | | --- | ------ | | 建立 message queue | message queue | | `*name` | `msg_size` | `max_msgs` | `flag` | | ------- | ---------- | ---------- | ------ | | 名字 | 一封訊息的大小 | 訊息數上限 | FIFO / PRIO | ```c=1852 /** * This function will create a message queue object from system resource * * @param name the name of message queue * @param msg_size the size of message * @param max_msgs the maximum number of message in queue * @param flag the flag of message queue * * @return the created message queue, RT_NULL on error happen */ rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag) { struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq; /* set parent */ mq->parent.parent.flag = flag; /* init ipc object */ rt_ipc_object_init(&(mq->parent)); ``` - 首先要一塊物件給 message queue,並同時填入 flag 及初始化 ```c=+ /* init message queue */ /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs; ``` - 設定訊息的大小,與訊息數量的上限 :::info `RT_ALGIN` 目的在對齊訊息的大小,根據不同板子所定義不同的 `RT_ALIGN_SIZE` 會有所差別 `#define RT_ALIGN(size, align) (((size) + (align) - 1) & ~((align) - 1))` - 如傳進來的是 `RT_ALGIN(7,8)` 則結果是 8 - 如傳進來的是 `RT_ALGIN(13,4)` 則結果是 16 - 即結果為大於後值的**最小倍數** ::: ```c=+ /* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq); return RT_NULL; } ``` - 接著需 allocate 適當的記憶體存放訊息 ```c=+ /* init message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = mq->msg_queue_free; mq->msg_queue_free = head; } ``` - 先將頭尾設為空,再一塊一塊的將 `msg_pool` 插在 free list 的第一顆 ```c=+ /* the initial entry is zero */ mq->entry = 0; return mq; } RTM_EXPORT(rt_mq_create); ``` - 最後設定 `entry` 為 0 --- ### 靜態記憶體管理 #### `rt_mq_init` | 功能 | 回傳值 | | --- | ------ | | 初始化 message queue | `RT_EOK` | | `mq` | `*name` | `*msgpool` | `msg_size` | `pool_size` | `flag` | | ---- | ------- | ---------- | ---------- | ----------- | ------ | | message queue 本體 | 名字 | 存放訊息的位址 | 一封訊息的大小 | 存放訊息的大小 | FIFO / PRIO | ```c=1764 /** * This function will initialize a message queue and put it under control of * resource management. * * @param mq the message object * @param name the name of message queue * @param msgpool the beginning address of buffer to save messages * @param msg_size the maximum size of message * @param pool_size the size of buffer to save messages * @param flag the flag of message queue * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_init(rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_size, rt_size_t pool_size, rt_uint8_t flag) { struct rt_mq_message *head; register rt_base_t temp; /* parameter check */ RT_ASSERT(mq != RT_NULL); /* init object */ rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name); /* set parent flag */ mq->parent.parent.flag = flag; /* init ipc object */ rt_ipc_object_init(&(mq->parent)); ``` - 這裡就不需要去要一塊物件,直接拿來用即可 ```c=+ /* set messasge pool */ mq->msg_pool = msgpool; /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message)); /* init message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; return RT_EOK; } RTM_EXPORT(rt_mq_init); ``` - 其餘的動作皆與上面相同 --- ## 刪除 message queue ### 動態記憶體管理 #### `rt_mq_delete` | 功能 | 回傳值 | | --- | ------ | | 刪除 message queue | `RT_EOK` | | `mq` | | ---- | | 欲刪除的 message queue | ```c=1920 /** * This function will delete a message queue object and release the memory * * @param mq the message queue object * * @return the error code */ rt_err_t rt_mq_delete(rt_mq_t mq) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE); /* resume all suspended thread */ rt_ipc_list_resume_all(&(mq->parent.suspend_thread)); ``` - 先把正在等待收訊息的 thread 叫醒 ```c=+ /* free message queue pool */ RT_KERNEL_FREE(mq->msg_pool); /* delete message queue object */ rt_object_delete(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_delete); ``` - 接著 free `msg_pool`,並刪除物件 --- ### 靜態記憶體管理 #### `rt_mq_detach` | 功能 | 回傳值 | | --- | ------ | | 刪除 message queue | `RT_EOK` | | `mq` | | ---- | | 欲刪除的 message queue | ```c=1827 /** * This function will detach a message queue object from resource management * * @param mq the message queue object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_detach(rt_mq_t mq) { /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent)); /* resume all suspended thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* detach message queue object */ rt_object_detach(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_detach); ``` - 這裡的 `msg_pool` 就不需要 free --- ## 傳送訊息 ### `rt_mq_send` | 功能 | 回傳值 | | --- | ------ | | 傳送訊息 | `RT_EOK`| | `mq` | `*buffer` | `size` | | ---- | --------- | ------ | | 欲傳送的 message queue | 訊息資料 | 訊息大小 | ```c=1950 /** * This function will send a message to message queue object, if there are * threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size) { register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } ``` - 首先確定 message queue 沒滿(即 free list 不為空) - 如果滿了,回傳 `FULL` ```c=+ /* move free list pointer */ mq->msg_queue_free = msg->next; ``` - 接著 free list 往下一顆走 ```c=+ /* enable interrupt */ rt_hw_interrupt_enable(temp); /* the msg is the new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer */ rt_memcpy(msg + 1, buffer, size); ``` - 將訊息填入從 free list 拿的一顆(`msg`),這顆待會是新的尾巴(設定 `next = NULL`) ```c=+ /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; } ``` - 如果尾巴不為空(也就是 message queue 有東西),將原本的尾巴指向 `msg` ```c=+ /* set new tail */ mq->msg_queue_tail = msg; /* if the head is empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg; ``` - 設定新的尾巴 - 如果頭為空(也就是 message queue 為空),設定新的頭 ```c=+ /* increase message entry */ mq->entry ++; ``` - 最後更新 `entry` ```c=+ /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_send); ``` - 如果有人在等待接收訊息,叫醒他 --- ### 傳送緊急訊息 - 與上面不同的是:這裡將新訊息插入**第一顆** #### `rt_mq_urgent` | 功能 | 回傳值 | | --- | ------ | | 傳送緊急訊息 | `RT_EOK`| | `mq` | `*buffer` | `size` | | ---- | --------- | ------ | | 欲傳送的 message queue | 訊息資料 | 訊息大小 | ```c=2039 /** * This function will send an urgent message to message queue object, which * means the message will be inserted to the head of message queue. If there * are threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_urgent(rt_mq_t mq, void *buffer, rt_size_t size) { register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy buffer */ rt_memcpy(msg + 1, buffer, size); ``` - 因為要插在第一顆,`next` 就不用設定為空了 ```c=+ /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to the beginning of message queue */ msg->next = mq->msg_queue_head; mq->msg_queue_head = msg; ``` - 這裡就將新訊息插在第一顆 ```c=+ /* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg; ``` - 如果原本的 message queue 為空,設定新的尾巴 ```c=+ /* increase message entry */ mq->entry ++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_urgent); ``` - 其他的動作皆相同 --- ## 接收訊息 ### `rt_mq_recv` | 功能 | 回傳值 | | --- | ------ | | 接收訊息| `RT_EOK` | | `mq` | `*buffer` | `size` | `timeout` | | ---- | --------- | ------ | --------- | | 欲訊息 message queue | 訊息存放處 | 訊息存放處大小 | 等待時間(如果需要)| ```c=2123 /** * This function will receive a message from message queue object, if there is * no message in message queue object, the thread shall wait for a specified * time. * * @param mq the message queue object * @param buffer the received message will be saved in * @param size the size of buffer * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout) { struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT; /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; } ``` - 如果 message queue 為空,且不等待,回傳 `TIMEOUT` ```c=+ /* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag); ``` - 如要等待,將 thread 掛在等待鏈上 ```c=+ /* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); } ``` - 並啟動一個 timer ```c=+ /* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule(); ``` - 開始等待,做一次調度 ```c=+ /* recv message */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } } ``` - 如跳回來,重新計算 `timeout` ```c=+ /* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL; ``` - 如果 message queue 有資料,拿第一顆,同時更新 head(tail,如果需要) ```c=+ /* decrease message entry */ mq->entry --; ``` - 更新 entry ```c=+ /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy message */ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size); ``` - 接著複製找到的訊息 ```c=+ /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_mq_recv); ``` - 最後將 `msg` 插入 free list 的頭 ###### tags: `RT-Thread` `kernel` `ipc` `message queue`