# [Legato] Event Loop
> https://docs.legato.io/latest/c_eventLoop.html
https://docs.legato.io/latest/le__eventLoop_8h.html
## Legato Event Loop 基本介紹
Legato Event Loop API 支援 event-driven programming。
每個 Legato thread 都會有一個 Event Loop,而每個 Event Loop 裡面都會有一個 Event Queue。當 Event Queue 裡面的 Event 到達了 Queue 的 front,就會處理該 Event(也就是執行其 handler function)。
Legato 的 event 是用 Event ID(`le_event_Id_t`)來識別每一個 event,我們要使用 `le_event_CreateId()` 來建立 event。
若要發起 event,則使用 `le_event_Report()`。
## Legato Event Loop 使用方式
Legato Event Loop 有以下幾種使用方式:
- [Publish-Subscribe Events](https://docs.legato.io/latest/c_eventLoop.html#c_event_publishSubscribe)
- 訂閱 event,當訂閱的 event 被發佈,就執行 handler
- [Queue a Function to a Thread's Event Queue](https://docs.legato.io/latest/c_eventLoop.html#c_event_deferredFunctionCalls):
- 將 function 丟到自己或別的 thread 的 Event Queue,當到達 Event Queue 的 front 就執行該 function
- [Layered Publish-Subscribe Handlers](https://docs.legato.io/latest/c_eventLoop.html#c_event_layeredPublishSubscribe)
- 用在 server 和 client 的模式,讓 client 可以因 server 的某個 event 的發生,而去執行 handler。
- 當 Server 發佈某個 event,server 會先執行第一層 handler,且會在第一層 handler 裡面呼叫第二層 handler,而第二層 handler 就是由 client 執行。
- 細節可參閱 [[Legato] Layered Publish-Subscribe Handlers](/7TboYqtmSKKPFHmC4TqVbg)
後面會介紹 Publish-Subscribe Events、Queue a Function to a Thread's Event Queue、以及如何設定 Event Data 和 Handler Context。
## 啟動 Event Loop
### Worker Thread
- 經由 `le_thread_Create()` 所建立的 Legato worker thraed 並不會自動執行它的 Event Loop,worker thread 必須要呼叫 `le_event_RunLoop()` 才會開始跑它的 Event Loop。
- 若 worker thread 沒有呼叫 `le_event_RunLoop()`,則放到其 Event Queue 的 function 和訂閱的 event handler 都不會被執行。
- Worker thread 一旦執行 `le_event_RunLoop()` 後就不會自己結束(除非執行 `le_thread_Exit()` 或被其他人執行 `le_thread_Cancel()`),此時若有其他 thread 呼叫了 `le_thread_Join()` 在等該 worker thread 結束,會永遠都等不到。
### Main Thread
main thread 執行完 `COMPONENT_INIT` 的事情後,就會主動去跑它的 Event Loop,所以 main thread 不用也不可執行 `le_event_RunLoop()`。
## Publish-Subscribe Events
### 基本使用流程
1. 使用 [`le_event_CreateId()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a41a96eb3affb07184b519164cf54e213) 建立 event。
2. Subscriber thread 使用 [`le_event_AddHandler()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae65a65b4111618f47d7e6d57a48289e5) 訂閱該 event,並指定該 event 發生後欲執行的 handler function。
- [`le_event_AddHandler()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae65a65b4111618f47d7e6d57a48289e5) 此 API 的其中兩個參數是 Event ID 和 handler function。
- 呼叫此 API 的 thread 會訂閱該 event,當該 event 發生了,該 event 就會被放到有訂閱此 event 的 thread 的 Event Queue,待 event 到達 Event Queue 的 front,就會執行該 thread 指定的 handler function。
3. Subscriber thread 執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff) 開始跑自己的 Event Loop。
- 若無執行此 API,則即使訂閱的 event 被 report 了,也不會去執行 handler function。
4. Publisher thread 使用 [`le_event_Report()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae3ffe6990b70fb572b4eef06739b4f54) 發布某個 event 的發生。
- 一旦某個 event 被 report,則所有有訂閱該 event 的 thread 的 Event Queue 都會被放入該 event,一旦 event 到達 Event Queue 的 front,就會執行該 thread 為該 event 指定的 handler function。
### 其他注意事項
- 若一個 event 已被加入 Event Queue,但尚未到達 Queue 的 front,結果就被 `le_event_RemoveHandler` 將該 handler 移除,則該 handler 就不會被呼叫。
一旦執行 `le_event_AddHandler()` 訂閱了 Event,則不論是否已執行 `le_event_RunLoop()`,只要該 Event 有被 report,就會被放到 Event Queue,待 `le_event_RunLoop()` 被執行後就會開始執行其 Event Loop。
## Queue a Function to a Thread's Event Queue
### 使用方式
- [`le_event_QueueFunction()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a6dcc88f96060c5bc107a81a978132f38)
- 將指定的 function 放到呼叫此 API 的 thread 的 Event Queue
- [`le_event_QueueFunctionToThread()`](https://docs.legato.io/latest/le__eventLoop_8h.html#a228da2d1f53ffa74517f108b0dcfa4d9)
- 將指定的 function 放到指定的 thread 的 Event Queue
- 被丟到 Event Queue 的 function 必須符合以下的 function prototype:
- [`typedef void (*le_event_DeferredFunc_t) (void *param1Ptr, void *param2Ptr)`](https://docs.legato.io/latest/le__eventLoop_8h.html#a6f86581d4d2bf24647d5efaff1046d5d)
### 其他注意事項:
- 被放 function 到 Event Queue 的 thread,一定也要執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff) 才會去跑他的 Event Loop
- 若沒有執行 [`le_event_RunLoop()`](https://docs.legato.io/latest/le__eventLoop_8h.html#ae313b457994371c658be9fe0494a01ff),則不論是 queued function 或訂閱的 event,都不會執行
以下是 Publish-Subscribe Event 和 Queue Function to Event Queue 大致的使用流程:

### Example
以下例子,欲測試以下幾件事:
- 3 個 Events,分別在以下時機點被 report:
- Event1:worker thread 訂閱 event handler 之前,且是在 worker thread 執行 `le_event_RunLoop` 之前
- Event2:worker thread 訂閱 event handler 之後,但在 worker thread 執行 `le_event_RunLoop` 之前
- Event3:worker thread 執行完 `le_event_RunLoop`(使用 semaphore 確保 Event3 在 worker thread 執行完 `le_event_RunLoop` 之後才被執行)
- 在執行 `le_thread_Start` 之前和之後分別各 queue 一個 function 到 worker thread 的 Event Queue,測試在 worker thread 被啟動前所 queue 的 function 是否會被執行。
- main thread 也 queue 一個 function 到自己的 Event Queue,看是不是等到所有 `COMPONENT_INIT` 的 function 都 return 後 main thread 才會執行自己的 Event Loop。
#### Project Structure
```
simpleEventLoop
├── eventLoopComp
│ ├── Component.cdef
│ └── eventLoop.c
└── simpleEventLoopApp.adef
```
#### `simpleEventLoopApp.adef`
```adef
executables:
{
eventLoopExec = ( eventLoopComp )
}
processes:
{
run:
{
eventLoopProc = ( eventLoopExec )
}
}
```
#### `Component.cdef`
```cdef
sources:
{
eventLoop.c
}
```
#### `eventLoop.c`
```cpp
#include "legato.h"
#include "interfaces.h"
#define EVENT_NUM (3)
static le_thread_Ref_t threadRef = NULL;
static le_event_Id_t event_id[EVENT_NUM];
static char name[20];
static le_sem_Ref_t semaphore = NULL;
static void threadDestructor(void *context)
{
LE_INFO("[%s] inside thread destructor", le_thread_GetMyName());
}
static void handlerFunc1(void *reportPtr)
{
LE_INFO("[%s] inside event1 handler function", le_thread_GetMyName());
}
static void handlerFunc2(void *reportPtr)
{
LE_INFO("[%s] inside event2 handler function", le_thread_GetMyName());
}
static void handlerFunc3(void *reportPtr)
{
LE_INFO("[%s] inside event3 handler function", le_thread_GetMyName());
le_thread_Exit(NULL);
}
static void postSemaphore()
{
le_sem_Post(semaphore);
LE_INFO("[%s] just posted the semaphore", le_thread_GetMyName());
}
static void *threadMainFunc(void *context)
{
LE_INFO("[%s] threadMainFunc executed", le_thread_GetMyName());
le_thread_AddDestructor(threadDestructor, context);
le_event_Report(event_id[0], NULL, 0);
LE_INFO("[%s] reported event id 1", le_thread_GetMyName());
le_event_AddHandler("MyEvent handler1", event_id[0], handlerFunc1);
le_event_AddHandler("MyEvent handler2", event_id[1], handlerFunc2);
le_event_AddHandler("MyEvent handler3", event_id[2], handlerFunc3);
LE_INFO("[%s] just registered event handler 1~3", le_thread_GetMyName());
le_event_Report(event_id[1], NULL, 0);
LE_INFO("[%s] reported event id 2", le_thread_GetMyName());
le_event_QueueFunction((le_event_DeferredFunc_t) postSemaphore, NULL, NULL);
// queue the function that execute le_sem_Post() to worker thread's Event Queue
// to make sure the semaphore will be posted after the le_event_RunLoop() had been executed
le_event_RunLoop();
return NULL;
}
static void deferredFunc(void *param1Ptr, void *param2Ptr)
{
LE_INFO("[%s] inside deferredFunc, Msg: %s", le_thread_GetMyName(), (char *) param1Ptr);
}
COMPONENT_INIT
{
semaphore = le_sem_Create("MySemaphore", 0);
for (int i = 0; i < EVENT_NUM; ++i)
{
snprintf(name, sizeof(name), "MyEvent%d", i + 1);
event_id[i] = le_event_CreateId(name, 0);
}
threadRef = le_thread_Create("WorkerThread", threadMainFunc, NULL);
le_thread_SetJoinable(threadRef);
le_event_QueueFunctionToThread(threadRef, (le_event_DeferredFunc_t) deferredFunc, "Queued function1 before calling le_thread_Start", NULL);
LE_INFO("[%s] just queued function1 to WorkerThread", le_thread_GetMyName());
le_thread_Start(threadRef);
LE_INFO("[%s] just called le_thread_Start", le_thread_GetMyName());
le_event_QueueFunctionToThread(threadRef, (le_event_DeferredFunc_t) deferredFunc, "Queued funtion2 after calling le_thread_Start", NULL);
LE_INFO("[%s] just queued function2 to WorkerThread", le_thread_GetMyName());
le_event_QueueFunction((le_event_DeferredFunc_t) deferredFunc, "Queued function3 to main thread to see when to be executed", NULL);
LE_INFO("[%s] just queued function3 to itself", le_thread_GetMyName());
LE_INFO("[%s] is about to wait for a semaphore to make sure the worker thread finish calling le_event_AddHandler", le_thread_GetMyName());
le_sem_Wait(semaphore);
LE_INFO("[%s] just finished waiting for the semaphore", le_thread_GetMyName());
le_sem_Delete(semaphore);
le_event_Report(event_id[2], NULL, 0);
LE_INFO("[%s] reported event id 3", le_thread_GetMyName());
LE_INFO("[%s] before calling le_thread_Join", le_thread_GetMyName());
le_thread_Join(threadRef, NULL);
}
```
#### Result

從執行結果可以看到:
- Event 2 和 Event 3 的 handler function 都有被執行,所以只要是訂閱完 event handler 之後,不論是否已執行 `le_event_RunLoop`,若 event 有被 report,則之後執行了 `le_event_RunLoop` 後這些 event 的 handler 都會被執行。
- 只要 worker thread 被 create,不論是否已執行 `le_thread_Start`,只要將 function 放到該 worker thread 的 Event Queue,則執行 `le_event_RunLoop` 後都會執行這些 queued functions。
- 放到 main thread Event Queue 的 function 在最後才被執行,所以 main thread 會等到 `COMPONENT_INIT` 內的 function 都 return 後才會執行自己的 Event Loop。
此外,若將 worker thread 內執行 le_event_RunLoop 這段刪掉,則執行結果如下:

可以看到 workerThread 執行完 `threadMainFunc` 後就直接呼叫 destructor 了,無論是 queued function 或 event handler 都沒有被執行。
所以若想要 worker thread 處理 Event Queue 的東西,一定要呼叫 `le_event_RunLoop`。
## 設定 Event Data 以及 Handler Context
我們可以設定:
- 有時候當一個 Event 發生時,可能會有一些跟這個 Event 有關的 data 要被處理,我們可以自己定義這個 Event 被 report 時有哪些 data,然後將這些 data 傳給 handler function 處理。
- 我們也可以傳送 context 物件到 event handler 裡面使用。
:::info
Legato 官方文件中,使用動詞的 report 來表示發佈某個 Event,也用名詞的 report 來表示要被傳給 handler function 處理的 data,我覺得用名詞的 report 來表示傳給 handler function 處理的 data 有點容易混淆,所以我這裡都是用 Event Data 來表示這個要傳給 handler function 處理的 data
:::
### 注意事項
- Event data 是跟著 event。
- 當 event 被 report,就會將 event data ++**COPY**++ 一份傳給 handler function,所以 handler function 內對 event data 做的事都++不會++影響到原本的 event data 物件。
- ++這個被 copy 而傳給 handler function 的 event data,其生命週期僅在 handler function 內有效,當 handler function return 後,它就會消失++。
- ++所以若要 keep 這個 data 的話,要自己在 handler function 內複製一份++。
- Handler context 是跟著 handler。
- Handler context 並++不會++被 copy 一份傳到 handler function 內,++而是直接 reference 到原本的 context 物件++,++所以在 handler function 內對 context 做的修改,都會影響到原本的 context 物件++。

### Example
以下例子,有兩個 Event,這兩個 Event 的 report data 都有 `int32_t n1` 和 `int32_t n2`。
另外,有兩個 handler function,一個 handler function 會將 `n1` 和 `n2` 相加,並儲存到 handler `context` 的 `int32_t res` 變數裡面,最後在 handler function 內將 `n1` 和 `n2` 都設定為 0。
另一個 handler function 則是將 `n1` 和 `n2` 相減,並將結果儲存到其 `context` 的 `int32_t res` 變數裡面,最後也會在 handler function 內將 `n1` 和 `n2` 都設定為 0。
#### Project Structure
```
.
├── eventLoop2App.adef
└── eventLoopComp
├── Component.cdef
└── eventLoop.c
```
#### `Component.cdef`
```cdef
sources:
{
eventLoop.c
}
```
#### `eventLoop.c`
```cpp
#include "legato.h"
#include "interfaces.h"
#define EVENTDATA_EVENTID_SIZE (10)
#define CONTEXT_HANDLERNAME_SIZE (10)
#define CONTEXT_MSG_SIZE (100)
static le_thread_Ref_t workerThreadRef = NULL;
static le_thread_DestructorRef_t workerThreadDestructorRef = NULL;
static le_event_Id_t event1, event2;
static le_event_HandlerRef_t handler1Ref = NULL,
handler2Ref = NULL;
static le_sem_Ref_t semaphore[2] = {NULL};
typedef struct
{
int32_t n1;
int32_t n2;
char *event_id;
} MyEventData_t;
typedef struct
{
int32_t res;
int32_t called_time;
char *handler_name;
char *msg;
} MyHandlerContext_t;
void handler1 (void *reportPtr)
{
MyHandlerContext_t *myHandlerContextPtr = (MyHandlerContext_t *) le_event_GetContextPtr();
MyEventData_t *myEventDataPtr = (MyEventData_t *) reportPtr;
LE_INFO("[%s] inside %s with %s reported", le_thread_GetMyName(), myHandlerContextPtr->handler_name, myEventDataPtr->event_id);
LE_INFO(" Address of the Event Data: %p", (void *) myEventDataPtr);
LE_INFO(" Address of the Handler Context: %p", (void *) myHandlerContextPtr);
myHandlerContextPtr->res = (myEventDataPtr->n1 + myEventDataPtr->n2);
++myHandlerContextPtr->called_time;
free(myHandlerContextPtr->msg);
myHandlerContextPtr->msg = (char *) malloc(sizeof(char) * CONTEXT_MSG_SIZE);
snprintf(myHandlerContextPtr->msg, (sizeof(char) * CONTEXT_MSG_SIZE), "event ID: %s, n1(%d) + n2(%d) = result(%d)",
myEventDataPtr->event_id, myEventDataPtr->n1, myEventDataPtr->n2, myHandlerContextPtr->res);
myEventDataPtr->n1 = myEventDataPtr->n2 = 0;
}
void handler2 (void *reportPtr)
{
MyHandlerContext_t *myHandlerContextPtr = (MyHandlerContext_t *) le_event_GetContextPtr();
MyEventData_t *myEventDataPtr = (MyEventData_t *) reportPtr;
LE_INFO("[%s] inside %s with %s reported", le_thread_GetMyName(), myHandlerContextPtr->handler_name, myEventDataPtr->event_id);
LE_INFO(" Address of the Event Data %p", (void *) myEventDataPtr);
LE_INFO(" Address of the Handler Context: %p", (void *) myHandlerContextPtr);
myHandlerContextPtr->res = (myEventDataPtr->n1 - myEventDataPtr->n2);
++myHandlerContextPtr->called_time;
free(myHandlerContextPtr->msg);
myHandlerContextPtr->msg = (char *) malloc(sizeof(char) * CONTEXT_MSG_SIZE);
snprintf(myHandlerContextPtr->msg, (sizeof(char) * CONTEXT_MSG_SIZE), "event ID: %s, n1(%d) - n2(%d) = result(%d)",
myEventDataPtr->event_id, myEventDataPtr->n1, myEventDataPtr->n2, myHandlerContextPtr->res);
myEventDataPtr->n1 = myEventDataPtr->n2 = 0;
le_sem_Post(semaphore[1]);
}
void workerThreadDestructor(void *context)
{
LE_INFO("[%s] inside worker thread destructor", le_thread_GetMyName());
}
void *workerThreadFunc(void *context)
{
// set event and its event handler, also making worker thread execute the event handler when the event is reported
handler1Ref = le_event_AddHandler("handler1Ref", event1, handler1);
handler2Ref = le_event_AddHandler("handler2Ref", event2, handler2);
// add a destructor to this worker thread
workerThreadDestructorRef = le_thread_AddDestructor(workerThreadDestructor, NULL);
le_sem_Post(semaphore[0]);
le_event_RunLoop();
return NULL;
}
COMPONENT_INIT
{
semaphore[0] = le_sem_Create("MySemaphore1", 0);
semaphore[1] = le_sem_Create("MySemaphore2", 0);
// create events
event1 = le_event_CreateId("event1", sizeof(MyEventData_t));
event2 = le_event_CreateId("event2", sizeof(MyEventData_t));
// set event data for event1 and event2
MyEventData_t event1_data = {.n1 = 300, .n2 = 200, .event_id = NULL};
event1_data.event_id = (char *) malloc(sizeof(char) * EVENTDATA_EVENTID_SIZE);
strcpy(event1_data.event_id, "Event1");
MyEventData_t event2_data = {.n1 = 500, .n2 = 100, .event_id = NULL};
event2_data.event_id = (char *) malloc(sizeof(char) * EVENTDATA_EVENTID_SIZE);
strcpy(event2_data.event_id, "Event2");
LE_INFO("[%s] address of event1_data: %p", le_thread_GetMyName(), (void *) &event1_data);
LE_INFO("[%s] address of event2_data: %p", le_thread_GetMyName(), (void *) &event2_data);
// create thread and set its thread function
workerThreadRef = le_thread_Create("WorkerThread", workerThreadFunc, NULL);
// start threads
le_thread_Start(workerThreadRef);
// wait for the worker thread main function to finish
le_sem_Wait(semaphore[0]);
// set the context of event handler1 and handler 2
MyHandlerContext_t handler1_context = {.res = 0, .called_time = 0, .handler_name = NULL, .msg = NULL};
handler1_context.handler_name = (char *) malloc(sizeof(char) * CONTEXT_HANDLERNAME_SIZE);
strcpy(handler1_context.handler_name, "Handler1");
handler1_context.msg = (char *) malloc(sizeof(char) * 50);
strcpy(handler1_context.msg, "This is the message of handler1 context");
MyHandlerContext_t handler2_context = {.res = 0, .called_time = 0, .handler_name = NULL, .msg = NULL};
handler2_context.handler_name = (char *) malloc(sizeof(char) * CONTEXT_HANDLERNAME_SIZE);
strcpy(handler2_context.handler_name, "Handler2");
handler2_context.msg = (char *) malloc(sizeof(char) * 50);
strcpy(handler2_context.msg, "This is the message of handler2 context");
LE_INFO("[%s] address of handler1_context: %p", le_thread_GetMyName(), &handler1_context);
LE_INFO("[%s] address of handler2_context: %p", le_thread_GetMyName(), &handler2_context);
le_event_SetContextPtr(handler1Ref, (void *) &handler1_context);
le_event_SetContextPtr(handler2Ref, (void *) &handler2_context);
// le_event_Report will MAKE A COPY of the event data to the handler
le_event_Report(event1, &event1_data, sizeof(event1_data));
le_event_Report(event2, &event2_data, sizeof(event2_data));
// wait for the worker thread to finish the event handler
le_sem_Wait(semaphore[1]);
// check the final value of event1_data and event2_data
LE_INFO("[%s] checking final values of Event Data and Handler Context", le_thread_GetMyName());
LE_INFO("[%s] address of event1_data: %p, event1_data.n1: %d, event1_data.n2: %d", le_thread_GetMyName(), (void *) &event1_data, event1_data.n1, event1_data.n2);
LE_INFO("[%s] address of event2_data: %p, event2_data.n1: %d, event2_data.n2: %d", le_thread_GetMyName(), (void *) &event2_data, event2_data.n1, event2_data.n2);
// check the final value of context
LE_INFO("[%s] address of handler1_context: %p, handler1_context.res: %d, handler1_context.called_time: %d, handler1_context.msg: \'%s\'", le_thread_GetMyName(), (void *) &handler1_context, handler1_context.res, handler1_context.called_time, handler1_context.msg);
LE_INFO("[%s] address of handler2_context: %p, handler2_context.res: %d, handler2_context.called_time: %d, handler2_context.msg: \'%s\'", le_thread_GetMyName(), (void *) &handler2_context, handler2_context.res, handler2_context.called_time, handler2_context.msg);
// release dynamically allocated resources
free(event1_data.event_id);
free(event2_data.event_id);
free(handler1_context.handler_name);
free(handler1_context.msg);
free(handler2_context.handler_name);
free(handler2_context.msg);
// terminate the worker thread
le_thread_Cancel(workerThreadRef);
}
```
#### Result

從結果可以看到:
- handler function 內印出的 event data 的記憶體位址跟 main 裡面印出來的結果不一樣,且 handler function 裡面將 n1 和 n2 都設定為 0,但並不影響到 main 裡面的 event data 的值。
- 在 handler context 的部份,handler function 內印出的 context 的記憶體位址就跟 main 裡面印出來的位址一樣,且在 handler function 內對 context 做的修改,在 main 裡面也會看到相同的變動。