Try   HackMD

RT-Thread IPC Message Queue

  • 特性:可接受不固定長度的訊息

結構

File: rtdef.h

#ifdef RT_USING_MESSAGEQUEUE
/**
 * Message queue control block.
 *
 * Pending messages and unused message slots are kept on two singly linked
 * lists; the nodes of both lists are carved out of msg_pool.
 */
struct rt_messagequeue
{
    struct rt_ipc_object parent;   /**< inherited IPC object (provides the suspended-thread list) */

    void *msg_pool;                /**< start address of the buffer that holds every message slot */

    rt_uint16_t msg_size;          /**< size of one message payload, rounded up with RT_ALIGN */
    rt_uint16_t max_msgs;          /**< maximum number of messages the pool can hold */

    rt_uint16_t entry;             /**< number of messages currently waiting in the queue */

    void *msg_queue_head;          /**< first pending message (the next one to be received) */
    void *msg_queue_tail;          /**< last pending message */
    void *msg_queue_free;          /**< head of the list of unused message slots */
};
typedef struct rt_messagequeue *rt_mq_t;   /* handle type: pointer to a message queue */
#endif

建立 message queue

動態記憶體管理

rt_mq_create

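| 功能 | 回傳值 |
| --- | --- |
| 建立 message queue | 建立成功回傳 message queue;失敗回傳 RT_NULL |

| `*name` | `msg_size` | `max_msgs` | `flag` |
| --- | --- | --- | --- |
| 名字 | 一封訊息的大小 | 訊息數上限 | FIFO / PRIO |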
/** * This function will create a message queue object from system resource * * @param name the name of message queue * @param msg_size the size of message * @param max_msgs the maximum number of message in queue * @param flag the flag of message queue * * @return the created message queue, RT_NULL on error happen */ rt_mq_t rt_mq_create(const char *name, rt_size_t msg_size, rt_size_t max_msgs, rt_uint8_t flag) { struct rt_messagequeue *mq; struct rt_mq_message *head; register rt_base_t temp; RT_DEBUG_NOT_IN_INTERRUPT; /* allocate object */ mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name); if (mq == RT_NULL) return mq; /* set parent */ mq->parent.parent.flag = flag; /* init ipc object */ rt_ipc_object_init(&(mq->parent));
  • 首先要一塊物件給 message queue,並同時填入 flag 及初始化
/* init message queue */ /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = max_msgs;
  • 設定訊息的大小,與訊息數量的上限

RT_ALIGN 目的在對齊訊息的大小,根據不同板子所定義不同的 RT_ALIGN_SIZE 會有所差別
/*
 * RT_ALIGN(size, align): round `size` up to the nearest multiple of `align`.
 * The result is computed by masking off the low bits, so `align` has to be a
 * power of two. `align` is expanded twice; do not pass an expression with
 * side effects.
 */
#define RT_ALIGN(size, align) \
    (((size) + ((align) - 1)) & ~((align) - 1))

  • 如傳進來的是 RT_ALIGN(7,8) 則結果是 8
  • 如傳進來的是 RT_ALIGN(13,4) 則結果是 16
  • 即結果為大於或等於 size 的最小 align 倍數(align 須為 2 的次方)
/* allocate message pool */ mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs); if (mq->msg_pool == RT_NULL) { rt_mq_delete(mq); return RT_NULL; }
  • 接著需 allocate 適當的記憶體存放訊息
/* init message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = mq->msg_queue_free; mq->msg_queue_free = head; }
  • 先將頭尾設為空,再一塊一塊的將 msg_pool 插在 free list 的第一顆
/* the initial entry is zero */ mq->entry = 0; return mq; } RTM_EXPORT(rt_mq_create);
  • 最後設定 entry 為 0

靜態記憶體管理

rt_mq_init

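| 功能 | 回傳值 |
| --- | --- |
| 初始化 message queue | RT_EOK |

| `mq` | `*name` | `*msgpool` | `msg_size` | `pool_size` | `flag` |
| --- | --- | --- | --- | --- | --- |
| message queue 本體 | 名字 | 存放訊息的位址 | 一封訊息的大小 | 存放訊息的緩衝區大小 | FIFO / PRIO |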
/** * This function will initialize a message queue and put it under control of * resource management. * * @param mq the message object * @param name the name of message queue * @param msgpool the beginning address of buffer to save messages * @param msg_size the maximum size of message * @param pool_size the size of buffer to save messages * @param flag the flag of message queue * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_init(rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_size, rt_size_t pool_size, rt_uint8_t flag) { struct rt_mq_message *head; register rt_base_t temp; /* parameter check */ RT_ASSERT(mq != RT_NULL); /* init object */ rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name); /* set parent flag */ mq->parent.parent.flag = flag; /* init ipc object */ rt_ipc_object_init(&(mq->parent));
  • 這裡就不需要去要一塊物件,直接拿來用即可
/* set messasge pool */ mq->msg_pool = msgpool; /* get correct message size */ mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE); mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message)); /* init message list */ mq->msg_queue_head = RT_NULL; mq->msg_queue_tail = RT_NULL; /* init message empty list */ mq->msg_queue_free = RT_NULL; for (temp = 0; temp < mq->max_msgs; temp ++) { head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool + temp * (mq->msg_size + sizeof(struct rt_mq_message))); head->next = mq->msg_queue_free; mq->msg_queue_free = head; } /* the initial entry is zero */ mq->entry = 0; return RT_EOK; } RTM_EXPORT(rt_mq_init);
  • 其餘的動作皆與上面相同

刪除 message queue

動態記憶體管理

rt_mq_delete

| 功能 | 回傳值 |
| --- | --- |
| 刪除 message queue | RT_EOK |

| `mq` |
| --- |
| 欲刪除的 message queue |
/** * This function will delete a message queue object and release the memory * * @param mq the message queue object * * @return the error code */ rt_err_t rt_mq_delete(rt_mq_t mq) { RT_DEBUG_NOT_IN_INTERRUPT; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE); /* resume all suspended thread */ rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
  • 先把正在等待收訊息的 thread 叫醒
/* free message queue pool */ RT_KERNEL_FREE(mq->msg_pool); /* delete message queue object */ rt_object_delete(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_delete);
  • 接著 free msg_pool,並刪除物件

靜態記憶體管理

rt_mq_detach

| 功能 | 回傳值 |
| --- | --- |
| 刪除 message queue | RT_EOK |

| `mq` |
| --- |
| 欲刪除的 message queue |
/** * This function will detach a message queue object from resource management * * @param mq the message queue object * * @return the operation status, RT_EOK on successful */ rt_err_t rt_mq_detach(rt_mq_t mq) { /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent)); /* resume all suspended thread */ rt_ipc_list_resume_all(&mq->parent.suspend_thread); /* detach message queue object */ rt_object_detach(&(mq->parent.parent)); return RT_EOK; } RTM_EXPORT(rt_mq_detach);
  • 這裡的 msg_pool 就不需要 free

傳送訊息

rt_mq_send

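| 功能 | 回傳值 |
| --- | --- |
| 傳送訊息 | 成功回傳 RT_EOK;訊息大於 msg_size 回傳 -RT_ERROR;message queue 已滿回傳 -RT_EFULL |

| `mq` | `*buffer` | `size` |
| --- | --- | --- |
| 欲傳送的 message queue | 訊息資料 | 訊息大小 |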
/** * This function will send a message to message queue object, if there are * threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size) { register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; }
  • 首先確定 message queue 沒滿(即 free list 不為空)
  • 如果滿了,回傳 FULL
/* move free list pointer */ mq->msg_queue_free = msg->next;
  • 接著 free list 往下一顆走
/* enable interrupt */ rt_hw_interrupt_enable(temp); /* the msg is the new tailer of list, the next shall be NULL */ msg->next = RT_NULL; /* copy buffer */ rt_memcpy(msg + 1, buffer, size);
  • 將訊息填入從 free list 拿的一顆(msg),這顆待會是新的尾巴(設定 next = NULL)
/* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to message queue */ if (mq->msg_queue_tail != RT_NULL) { /* if the tail exists, */ ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg; }
  • 如果尾巴不為空(也就是 message queue 有東西),將原本的尾巴指向 msg
/* set new tail */ mq->msg_queue_tail = msg; /* if the head is empty, set head */ if (mq->msg_queue_head == RT_NULL) mq->msg_queue_head = msg;
  • 設定新的尾巴
  • 如果頭為空(也就是 message queue 為空),設定新的頭
/* increase message entry */ mq->entry ++;
  • 最後更新 entry
/* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_send);
  • 如果有人在等待接收訊息,叫醒他

傳送緊急訊息

  • 與上面不同的是:這裡將新訊息插入第一顆

rt_mq_urgent

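| 功能 | 回傳值 |
| --- | --- |
| 傳送緊急訊息 | 成功回傳 RT_EOK;訊息大於 msg_size 回傳 -RT_ERROR;message queue 已滿回傳 -RT_EFULL |

| `mq` | `*buffer` | `size` |
| --- | --- | --- |
| 欲傳送的 message queue | 訊息資料 | 訊息大小 |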
/** * This function will send an urgent message to message queue object, which * means the message will be inserted to the head of message queue. If there * are threads suspended on message queue object, it will be waked up. * * @param mq the message queue object * @param buffer the message * @param size the size of buffer * * @return the error code */ rt_err_t rt_mq_urgent(rt_mq_t mq, void *buffer, rt_size_t size) { register rt_ubase_t temp; struct rt_mq_message *msg; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* greater than one message size */ if (size > mq->msg_size) return -RT_ERROR; RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* get a free list, there must be an empty item */ msg = (struct rt_mq_message *)mq->msg_queue_free; /* message queue is full */ if (msg == RT_NULL) { /* enable interrupt */ rt_hw_interrupt_enable(temp); return -RT_EFULL; } /* move free list pointer */ mq->msg_queue_free = msg->next; /* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy buffer */ rt_memcpy(msg + 1, buffer, size);
  • 因為要插在第一顆,next 就不用設定為空了
/* disable interrupt */ temp = rt_hw_interrupt_disable(); /* link msg to the beginning of message queue */ msg->next = mq->msg_queue_head; mq->msg_queue_head = msg;
  • 這裡就將新訊息插在第一顆
/* if there is no tail */ if (mq->msg_queue_tail == RT_NULL) mq->msg_queue_tail = msg;
  • 如果原本的 message queue 為空,設定新的尾巴
/* increase message entry */ mq->entry ++; /* resume suspended thread */ if (!rt_list_isempty(&mq->parent.suspend_thread)) { rt_ipc_list_resume(&(mq->parent.suspend_thread)); /* enable interrupt */ rt_hw_interrupt_enable(temp); rt_schedule(); return RT_EOK; } /* enable interrupt */ rt_hw_interrupt_enable(temp); return RT_EOK; } RTM_EXPORT(rt_mq_urgent);
  • 其他的動作皆相同

接收訊息

rt_mq_recv

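| 功能 | 回傳值 |
| --- | --- |
| 接收訊息 | 成功回傳 RT_EOK;message queue 為空且 timeout 為 0 時回傳 -RT_ETIMEOUT;等待後被喚醒但 thread->error 不為 RT_EOK 時回傳 thread->error |

| `mq` | `*buffer` | `size` | `timeout` |
| --- | --- | --- | --- |
| 欲接收訊息的 message queue | 訊息存放處 | 訊息存放處大小 | 等待時間(如果需要) |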
/** * This function will receive a message from message queue object, if there is * no message in message queue object, the thread shall wait for a specified * time. * * @param mq the message queue object * @param buffer the received message will be saved in * @param size the size of buffer * @param timeout the waiting time * * @return the error code */ rt_err_t rt_mq_recv(rt_mq_t mq, void *buffer, rt_size_t size, rt_int32_t timeout) { struct rt_thread *thread; register rt_ubase_t temp; struct rt_mq_message *msg; rt_uint32_t tick_delta; /* parameter check */ RT_ASSERT(mq != RT_NULL); RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue); RT_ASSERT(buffer != RT_NULL); RT_ASSERT(size != 0); /* initialize delta tick */ tick_delta = 0; /* get current thread */ thread = rt_thread_self(); RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent))); /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* for non-blocking call */ if (mq->entry == 0 && timeout == 0) { rt_hw_interrupt_enable(temp); return -RT_ETIMEOUT; } /* message queue is empty */ while (mq->entry == 0) { RT_DEBUG_IN_THREAD_CONTEXT; /* reset error number in thread */ thread->error = RT_EOK; /* no waiting, return timeout */ if (timeout == 0) { /* enable interrupt */ rt_hw_interrupt_enable(temp); thread->error = -RT_ETIMEOUT; return -RT_ETIMEOUT; }
  • 如果 message queue 為空,且不等待,回傳 TIMEOUT
/* suspend current thread */ rt_ipc_list_suspend(&(mq->parent.suspend_thread), thread, mq->parent.parent.flag);
  • 如要等待,將 thread 掛在等待鏈上
/* has waiting time, start thread timer */ if (timeout > 0) { /* get the start tick of timer */ tick_delta = rt_tick_get(); RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n", thread->name)); /* reset the timeout of thread timer and start it */ rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &timeout); rt_timer_start(&(thread->thread_timer)); }
  • 若 timeout > 0,重設 thread 的 timer 並啟動它(timeout 不大於 0 時不會啟動 timer)
/* enable interrupt */ rt_hw_interrupt_enable(temp); /* re-schedule */ rt_schedule();
  • 開始等待,做一次調度
/* recv message */ if (thread->error != RT_EOK) { /* return error */ return thread->error; } /* disable interrupt */ temp = rt_hw_interrupt_disable(); /* if it's not waiting forever and then re-calculate timeout tick */ if (timeout > 0) { tick_delta = rt_tick_get() - tick_delta; timeout -= tick_delta; if (timeout < 0) timeout = 0; } }
  • 如跳回來,重新計算 timeout
/* get message from queue */ msg = (struct rt_mq_message *)mq->msg_queue_head; /* move message queue head */ mq->msg_queue_head = msg->next; /* reach queue tail, set to NULL */ if (mq->msg_queue_tail == msg) mq->msg_queue_tail = RT_NULL;
  • 如果 message queue 有資料,拿第一顆,同時更新 head(tail,如果需要)
/* decrease message entry */ mq->entry --;
  • 更新 entry
/* enable interrupt */ rt_hw_interrupt_enable(temp); /* copy message */ rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);
  • 接著複製找到的訊息
/* disable interrupt */ temp = rt_hw_interrupt_disable(); /* put message to free list */ msg->next = (struct rt_mq_message *)mq->msg_queue_free; mq->msg_queue_free = msg; /* enable interrupt */ rt_hw_interrupt_enable(temp); RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent))); return RT_EOK; } RTM_EXPORT(rt_mq_recv);
  • 最後將 msg 插入 free list 的頭
tags: RT-Thread kernel ipc message queue