# [Legato] Event Loop > https://docs.legato.io/latest/c_eventLoop.html https://docs.legato.io/latest/le__eventLoop_8h.html ## Legato Event Loop 基本介紹 Legato Event Loop API 支援 event-driven programming。 每個 Legato thread 都會有一個 Event Loop,而每個 Event Loop 裡面都會有一個 Event Queue。當 Event Queue 裡面的 Event 到達了 Queue 的 front,就會處理該 Event(也就是執行其 handler function)。 Legato 的 event 是用 Event ID(`le_event_Id_t`)來識別每一個 event,我們要使用 `le_event_CreateId()` 來建立 event。 若要發起 event,則使用 `le_event_Report()`。 ## Legato Event Loop 使用方式 Legato Event Loop 有以下幾種使用方式: - [Publish-Subscribe Events](https://docs.legato.io/latest/c_eventLoop.html#c_event_publishSubscribe) - 訂閱 event,當訂閱的 event 被發佈,就執行 handler - [Queue a Function to a Thread's Event Queue](https://docs.legato.io/latest/c_eventLoop.html#c_event_deferredFunctionCalls): - 將 function 丟到自己或別的 thread 的 Event Queue,當到達 Event Queue 的 front 就執行該 function - [Layered Publish-Subscribe Handlers](https://docs.legato.io/latest/c_eventLoop.html#c_event_layeredPublishSubscribe) - 用在 server 和 client 的模式,讓 client 可以因 server 的某個 event 的發生,而去執行 handler。 - 當 Server 發佈某個 event,server 會先執行第一層 handler,且會在第一層 handler 裡面呼叫第二層 handler,而第二層 handler 就是由 client 執行。 - 細節可參閱 [[Legato] Layered Publish-Subscribe Handlers](/7TboYqtmSKKPFHmC4TqVbg) 後面會介紹 Publish-Subscribe Events、Queue a Function to a Thread's Event Queue、以及如何設定 Event Data 和 Handler Context。 ## 啟動 Event Loop ### Worker Thread - 經由 `le_thread_Create()` 所建立的 Legato worker thraed 並不會自動執行它的 Event Loop,worker thread 必須要呼叫 `le_event_RunLoop()` 才會開始跑它的 Event Loop。 - 若 worker thread 沒有呼叫 `le_event_RunLoop()`,則放到其 Event Queue 的 function 和訂閱的 event handler 都不會被執行。 - Worker thread 一旦執行 `le_event_RunLoop()` 後就不會自己結束(除非執行 `le_thread_Exit()` 或被其他人執行 `le_thread_Cancel()`),此時若有其他 thread 呼叫了 `le_thread_Join()` 在等該 worker thread 結束,會永遠都等不到。 ### Main Thread main thread 執行完 `COMPONENT_INIT` 的事情後,就會主動去跑它的 Event Loop,所以 main thread 不用也不可執行 `le_event_RunLoop()`。 ## Publish-Subscribe Events ### 基本使用流程 1. 使用 [`le_event_CreateId()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a41a96eb3affb07184b519164cf54e213) 建立 event。 2. Subscriber thread 使用 [`le_event_AddHandler()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae65a65b4111618f47d7e6d57a48289e5) 訂閱該 event,並指定該 event 發生後欲執行的 handler function。 - [`le_event_AddHandler()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae65a65b4111618f47d7e6d57a48289e5) 此 API 的其中兩個參數是 Event ID 和 handler function。 - 呼叫此 API 的 thread 會訂閱該 event,當該 event 發生了,該 event 就會被放到有訂閱此 event 的 thread 的 Event Queue,待 event 到達 Event Queue 的 front,就會執行該 thread 指定的 handler function。 3. Subscriber thread 執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff) 開始跑自己的 Event Loop。 - 若無執行此 API,則即使訂閱的 event 被 report 了,也不會去執行 handler function。 4. Publisher thread 使用 [`le_event_Report()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae3ffe6990b70fb572b4eef06739b4f54) 發布某個 event 的發生。 - 一旦某個 event 被 report,則所有有訂閱該 event 的 thread 的 Event Queue 都會被放入該 event,一旦 event 到達 Event Queue 的 front,就會執行該 thread 為該 event 指定的 handler function。 ### 其他注意事項 - 若一個 event 已被加入 Event Queue,但尚未到達 Queue 的 front,結果就被 `le_event_RemoveHandler` 將該 handler 移除,則該 handler 就不會被呼叫。 一旦執行 `le_event_AddHandler()` 訂閱了 Event,則不論是否已執行 `le_event_RunLoop()`,只要該 Event 有被 report,就會被放到 Event Queue,待 `le_event_RunLoop()` 被執行後就會開始執行其 Event Loop。 ## Queue a Function to a Thread's Event Queue ### 使用方式 - [`le_event_QueueFunction()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a6dcc88f96060c5bc107a81a978132f38) - 將指定的 function 放到呼叫此 API 的 thread 的 Event Queue - [`le_event_QueueFunctionToThread()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a228da2d1f53ffa74517f108b0dcfa4d9) - 將指定的 function 放到指定的 thread 的 Event Queue - 被丟到 Event Queue 的 function 必須符合以下的 function prototype: - [`typedef void (*le_event_DeferredFunc_t) (void *param1Ptr, void *param2Ptr)`](https://docs.legato.io/latest/le__eventLoop_8h.html#a6f86581d4d2bf24647d5efaff1046d5d) ### 其他注意事項: - 被放 function 到 Event Queue 的 thread,一定也要執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff) 才會去跑他的 Event Loop - 若沒有執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff),則不論是 queued function 或訂閱的 event,都不會執行 以下是 Publish-Subscribe Event 和 Queue Function to Event Queue 大致的使用流程: ![legato_event_loop](https://hackmd.io/_uploads/SymdrYHRC.png) ### Example 以下例子,欲測試以下幾件事: - 3 個 Events,分別在以下時機點被 report: - Event1:worker thread 訂閱 event handler 之前,且是在 worker thread 執行 `le_event_RunLoop` 之前 - Event2:worker thread 訂閱 event handler 之後,但在 worker thread 執行 `le_event_RunLoop` 之前 - Event3:worker thread 執行完 `le_event_RunLoop`(使用 semaphore 確保 Event3 在 worker thread 執行完 `le_event_RunLoop` 之後才被執行) - 在執行 `le_thread_Start` 之前和之後分別各 queue 一個 function 到 worker thread 的 Event Queue,測試在 worker thread 被啟動前所 queue 的 function 是否會被執行。 - main thread 也 queue 一個 function 到自己的 Event Queue,看是不是等到所有 `COMPONENT_INIT` 的 function 都 return 後 main thread 才會執行自己的 Event Loop。 #### Project Structure ``` simpleEventLoop ├── eventLoopComp │ ├── Component.cdef │ └── eventLoop.c └── simpleEventLoopApp.adef ``` #### `simpleEventLoopApp.adef` ```adef executables: { eventLoopExec = ( eventLoopComp ) } processes: { run: { eventLoopProc = ( eventLoopExec ) } } ``` #### `Component.cdef` ```cdef sources: { eventLoop.c } ``` #### `eventLoop.c` ```cpp #include "legato.h" #include "interfaces.h" #define EVENT_NUM (3) static le_thread_Ref_t threadRef = NULL; static le_event_Id_t event_id[EVENT_NUM]; static char name[20]; static le_sem_Ref_t semaphore = NULL; static void threadDestructor(void *context) { LE_INFO("[%s] inside thread destructor", le_thread_GetMyName()); } static void handlerFunc1(void *reportPtr) { LE_INFO("[%s] inside event1 handler function", le_thread_GetMyName()); } static void handlerFunc2(void *reportPtr) { LE_INFO("[%s] inside event2 handler function", le_thread_GetMyName()); } static void handlerFunc3(void *reportPtr) { LE_INFO("[%s] inside event3 handler function", le_thread_GetMyName()); le_thread_Exit(NULL); } static void postSemaphore() { le_sem_Post(semaphore); LE_INFO("[%s] just posted the semaphore", le_thread_GetMyName()); } static void *threadMainFunc(void *context) { LE_INFO("[%s] threadMainFunc executed", le_thread_GetMyName()); le_thread_AddDestructor(threadDestructor, context); le_event_Report(event_id[0], NULL, 0); LE_INFO("[%s] reported event id 1", le_thread_GetMyName()); le_event_AddHandler("MyEvent handler1", event_id[0], handlerFunc1); le_event_AddHandler("MyEvent handler2", event_id[1], handlerFunc2); le_event_AddHandler("MyEvent handler3", event_id[2], handlerFunc3); LE_INFO("[%s] just registered event handler 1~3", le_thread_GetMyName()); le_event_Report(event_id[1], NULL, 0); LE_INFO("[%s] reported event id 2", le_thread_GetMyName()); le_event_QueueFunction((le_event_DeferredFunc_t) postSemaphore, NULL, NULL); // queue the function that execute le_sem_Post() to worker thread's Event Queue // to make sure the semaphore will be posted after the le_event_RunLoop() had been executed le_event_RunLoop(); return NULL; } static void deferredFunc(void *param1Ptr, void *param2Ptr) { LE_INFO("[%s] inside deferredFunc, Msg: %s", le_thread_GetMyName(), (char *) param1Ptr); } COMPONENT_INIT { semaphore = le_sem_Create("MySemaphore", 0); for (int i = 0; i < EVENT_NUM; ++i) { snprintf(name, sizeof(name), "MyEvent%d", i + 1); event_id[i] = le_event_CreateId(name, 0); } threadRef = le_thread_Create("WorkerThread", threadMainFunc, NULL); le_thread_SetJoinable(threadRef); le_event_QueueFunctionToThread(threadRef, (le_event_DeferredFunc_t) deferredFunc, "Queued function1 before calling le_thread_Start", NULL); LE_INFO("[%s] just queued function1 to WorkerThread", le_thread_GetMyName()); le_thread_Start(threadRef); LE_INFO("[%s] just called le_thread_Start", le_thread_GetMyName()); le_event_QueueFunctionToThread(threadRef, (le_event_DeferredFunc_t) deferredFunc, "Queued funtion2 after calling le_thread_Start", NULL); LE_INFO("[%s] just queued function2 to WorkerThread", le_thread_GetMyName()); le_event_QueueFunction((le_event_DeferredFunc_t) deferredFunc, "Queued function3 to main thread to see when to be executed", NULL); LE_INFO("[%s] just queued function3 to itself", le_thread_GetMyName()); LE_INFO("[%s] is about to wait for a semaphore to make sure the worker thread finish calling le_event_AddHandler", le_thread_GetMyName()); le_sem_Wait(semaphore); LE_INFO("[%s] just finished waiting for the semaphore", le_thread_GetMyName()); le_sem_Delete(semaphore); le_event_Report(event_id[2], NULL, 0); LE_INFO("[%s] reported event id 3", le_thread_GetMyName()); LE_INFO("[%s] before calling le_thread_Join", le_thread_GetMyName()); le_thread_Join(threadRef, NULL); } ``` #### Result ![image2024-9-13_17-18-37](https://hackmd.io/_uploads/Sy5R8YH0R.png) 從執行結果可以看到: - Event 2 和 Event 3 的 handler function 都有被執行,所以只要是訂閱完 event handler 之後,不論是否已執行 `le_event_RunLoop`,若 event 有被 report,則之後執行了 `le_event_RunLoop` 後這些 event 的 handler 都會被執行。 - 只要 worker thread 被 create,不論是否已執行 `le_thread_Start`,只要將 function 放到該 worker thread 的 Event Queue,則執行 `le_event_RunLoop` 後都會執行這些 queued functions。 - 放到 main thread Event Queue 的 function 在最後才被執行,所以 main thread 會等到 `COMPONENT_INIT` 內的 function 都 return 後才會執行自己的 Event Loop。 此外,若將 worker thread 內執行 le_event_RunLoop 這段刪掉,則執行結果如下: ![image2024-9-13_17-20-45](https://hackmd.io/_uploads/rJjjwKH0R.png) 可以看到 workerThread 執行完 `threadMainFunc` 後就直接呼叫 destructor 了,無論是 queued function 或 event handler 都沒有被執行。 所以若想要 worker thread 處理 Event Queue 的東西,一定要呼叫 `le_event_RunLoop`。 ## 設定 Event Data 以及 Handler Context 我們可以設定: - 有時候當一個 Event 發生時,可能會有一些跟這個 Event 有關的 data 要被處理,我們可以自己定義這個 Event 被 report 時有哪些 data,然後將這些 data 傳給 handler function 處理。 - 我們也可以傳送 context 物件到 event handler 裡面使用。 :::info Legato 官方文件中,使用動詞的 report 來表示發佈某個 Event,也用名詞的 report 來表示要被傳給 handler function 處理的 data,我覺得用名詞的 report 來表示傳給 handler function 處理的 data 有點容易混淆,所以我這裡都是用 Event Data 來表示這個要傳給 handler function 處理的 data ::: ### 注意事項 - Event data 是跟著 event。 - 當 event 被 report,就會將 event data ++**COPY**++ 一份傳給 handler function,所以 handler function 內對 event data 做的事都++不會++影響到原本的 event data 物件。 - ++這個被 copy 而傳給 handler function 的 event data,其生命週期僅在 handler function 內有效,當 handler function return 後,它就會消失++。 - ++所以若要 keep 這個 data 的話,要自己在 handler function 內複製一份++。 - Handler context 是跟著 handler。 - Handler context 並++不會++被 copy 一份傳到 handler function 內,++而是直接 reference 到原本的 context 物件++,++所以在 handler function 內對 context 做的修改,都會影響到原本的 context 物件++。 ![legato_eventLoop_eventData_handlerContext](https://hackmd.io/_uploads/ByiuOYrAC.png) ### Example 以下例子,有兩個 Event,這兩個 Event 的 report data 都有 `int32_t n1` 和 `int32_t n2`。 另外,有兩個 handler function,一個 handler function 會將 `n1` 和 `n2` 相加,並儲存到 handler `context` 的 `int32_t res` 變數裡面,最後在 handler function 內將 `n1` 和 `n2` 都設定為 0。 另一個 handler function 則是將 `n1` 和 `n2` 相減,並將結果儲存到其 `context` 的 `int32_t res` 變數裡面,最後也會在 handler function 內將 `n1` 和 `n2` 都設定為 0。 #### Project Structure ``` . ├── eventLoop2App.adef └── eventLoopComp ├── Component.cdef └── eventLoop.c ``` #### `Component.cdef` ```cdef sources: { eventLoop.c } ``` #### `eventLoop.c` ```cpp #include "legato.h" #include "interfaces.h" #define EVENTDATA_EVENTID_SIZE (10) #define CONTEXT_HANDLERNAME_SIZE (10) #define CONTEXT_MSG_SIZE (100) static le_thread_Ref_t workerThreadRef = NULL; static le_thread_DestructorRef_t workerThreadDestructorRef = NULL; static le_event_Id_t event1, event2; static le_event_HandlerRef_t handler1Ref = NULL, handler2Ref = NULL; static le_sem_Ref_t semaphore[2] = {NULL}; typedef struct { int32_t n1; int32_t n2; char *event_id; } MyEventData_t; typedef struct { int32_t res; int32_t called_time; char *handler_name; char *msg; } MyHandlerContext_t; void handler1 (void *reportPtr) { MyHandlerContext_t *myHandlerContextPtr = (MyHandlerContext_t *) le_event_GetContextPtr(); MyEventData_t *myEventDataPtr = (MyEventData_t *) reportPtr; LE_INFO("[%s] inside %s with %s reported", le_thread_GetMyName(), myHandlerContextPtr->handler_name, myEventDataPtr->event_id); LE_INFO(" Address of the Event Data: %p", (void *) myEventDataPtr); LE_INFO(" Address of the Handler Context: %p", (void *) myHandlerContextPtr); myHandlerContextPtr->res = (myEventDataPtr->n1 + myEventDataPtr->n2); ++myHandlerContextPtr->called_time; free(myHandlerContextPtr->msg); myHandlerContextPtr->msg = (char *) malloc(sizeof(char) * CONTEXT_MSG_SIZE); snprintf(myHandlerContextPtr->msg, (sizeof(char) * CONTEXT_MSG_SIZE), "event ID: %s, n1(%d) + n2(%d) = result(%d)", myEventDataPtr->event_id, myEventDataPtr->n1, myEventDataPtr->n2, myHandlerContextPtr->res); myEventDataPtr->n1 = myEventDataPtr->n2 = 0; } void handler2 (void *reportPtr) { MyHandlerContext_t *myHandlerContextPtr = (MyHandlerContext_t *) le_event_GetContextPtr(); MyEventData_t *myEventDataPtr = (MyEventData_t *) reportPtr; LE_INFO("[%s] inside %s with %s reported", le_thread_GetMyName(), myHandlerContextPtr->handler_name, myEventDataPtr->event_id); LE_INFO(" Address of the Event Data %p", (void *) myEventDataPtr); LE_INFO(" Address of the Handler Context: %p", (void *) myHandlerContextPtr); myHandlerContextPtr->res = (myEventDataPtr->n1 - myEventDataPtr->n2); ++myHandlerContextPtr->called_time; free(myHandlerContextPtr->msg); myHandlerContextPtr->msg = (char *) malloc(sizeof(char) * CONTEXT_MSG_SIZE); snprintf(myHandlerContextPtr->msg, (sizeof(char) * CONTEXT_MSG_SIZE), "event ID: %s, n1(%d) - n2(%d) = result(%d)", myEventDataPtr->event_id, myEventDataPtr->n1, myEventDataPtr->n2, myHandlerContextPtr->res); myEventDataPtr->n1 = myEventDataPtr->n2 = 0; le_sem_Post(semaphore[1]); } void workerThreadDestructor(void *context) { LE_INFO("[%s] inside worker thread destructor", le_thread_GetMyName()); } void *workerThreadFunc(void *context) { // set event and its event handler, also making worker thread execute the event handler when the event is reported handler1Ref = le_event_AddHandler("handler1Ref", event1, handler1); handler2Ref = le_event_AddHandler("handler2Ref", event2, handler2); // add a destructor to this worker thread workerThreadDestructorRef = le_thread_AddDestructor(workerThreadDestructor, NULL); le_sem_Post(semaphore[0]); le_event_RunLoop(); return NULL; } COMPONENT_INIT { semaphore[0] = le_sem_Create("MySemaphore1", 0); semaphore[1] = le_sem_Create("MySemaphore2", 0); // create events event1 = le_event_CreateId("event1", sizeof(MyEventData_t)); event2 = le_event_CreateId("event2", sizeof(MyEventData_t)); // set event data for event1 and event2 MyEventData_t event1_data = {.n1 = 300, .n2 = 200, .event_id = NULL}; event1_data.event_id = (char *) malloc(sizeof(char) * EVENTDATA_EVENTID_SIZE); strcpy(event1_data.event_id, "Event1"); MyEventData_t event2_data = {.n1 = 500, .n2 = 100, .event_id = NULL}; event2_data.event_id = (char *) malloc(sizeof(char) * EVENTDATA_EVENTID_SIZE); strcpy(event2_data.event_id, "Event2"); LE_INFO("[%s] address of event1_data: %p", le_thread_GetMyName(), (void *) &event1_data); LE_INFO("[%s] address of event2_data: %p", le_thread_GetMyName(), (void *) &event2_data); // create thread and set its thread function workerThreadRef = le_thread_Create("WorkerThread", workerThreadFunc, NULL); // start threads le_thread_Start(workerThreadRef); // wait for the worker thread main function to finish le_sem_Wait(semaphore[0]); // set the context of event handler1 and handler 2 MyHandlerContext_t handler1_context = {.res = 0, .called_time = 0, .handler_name = NULL, .msg = NULL}; handler1_context.handler_name = (char *) malloc(sizeof(char) * CONTEXT_HANDLERNAME_SIZE); strcpy(handler1_context.handler_name, "Handler1"); handler1_context.msg = (char *) malloc(sizeof(char) * 50); strcpy(handler1_context.msg, "This is the message of handler1 context"); MyHandlerContext_t handler2_context = {.res = 0, .called_time = 0, .handler_name = NULL, .msg = NULL}; handler2_context.handler_name = (char *) malloc(sizeof(char) * CONTEXT_HANDLERNAME_SIZE); strcpy(handler2_context.handler_name, "Handler2"); handler2_context.msg = (char *) malloc(sizeof(char) * 50); strcpy(handler2_context.msg, "This is the message of handler2 context"); LE_INFO("[%s] address of handler1_context: %p", le_thread_GetMyName(), &handler1_context); LE_INFO("[%s] address of handler2_context: %p", le_thread_GetMyName(), &handler2_context); le_event_SetContextPtr(handler1Ref, (void *) &handler1_context); le_event_SetContextPtr(handler2Ref, (void *) &handler2_context); // le_event_Report will MAKE A COPY of the event data to the handler le_event_Report(event1, &event1_data, sizeof(event1_data)); le_event_Report(event2, &event2_data, sizeof(event2_data)); // wait for the worker thread to finish the event handler le_sem_Wait(semaphore[1]); // check the final value of event1_data and event2_data LE_INFO("[%s] checking final values of Event Data and Handler Context", le_thread_GetMyName()); LE_INFO("[%s] address of event1_data: %p, event1_data.n1: %d, event1_data.n2: %d", le_thread_GetMyName(), (void *) &event1_data, event1_data.n1, event1_data.n2); LE_INFO("[%s] address of event2_data: %p, event2_data.n1: %d, event2_data.n2: %d", le_thread_GetMyName(), (void *) &event2_data, event2_data.n1, event2_data.n2); // check the final value of context LE_INFO("[%s] address of handler1_context: %p, handler1_context.res: %d, handler1_context.called_time: %d, handler1_context.msg: \'%s\'", le_thread_GetMyName(), (void *) &handler1_context, handler1_context.res, handler1_context.called_time, handler1_context.msg); LE_INFO("[%s] address of handler2_context: %p, handler2_context.res: %d, handler2_context.called_time: %d, handler2_context.msg: \'%s\'", le_thread_GetMyName(), (void *) &handler2_context, handler2_context.res, handler2_context.called_time, handler2_context.msg); // release dynamically allocated resources free(event1_data.event_id); free(event2_data.event_id); free(handler1_context.handler_name); free(handler1_context.msg); free(handler2_context.handler_name); free(handler2_context.msg); // terminate the worker thread le_thread_Cancel(workerThreadRef); } ``` #### Result ![image2024-9-18_10-36-54](https://hackmd.io/_uploads/SJjutYrRR.png) 從結果可以看到: - handler function 內印出的 event data 的記憶體位址跟 main 裡面印出來的結果不一樣,且 handler function 裡面將 n1 和 n2 都設定為 0,但並不影響到 main 裡面的 event data 的值。 - 在 handler context 的部份,handler function 內印出的 context 的記憶體位址就跟 main 裡面印出來的位址一樣,且在 handler function 內對 context 做的修改,在 main 裡面也會看到相同的變動。