Try   HackMD

[Legato] Asynchronous Server

https://docs.legato.io/latest/apiFilesC.html#apiFilesC_asyncServer
https://docs.legato.io/latest/defFilesCdef.html#defFilesCdef_providesApiAsync

Legato 預設上,提供 API 的 server 端是同步的 server,也就是 client 端呼叫了 API,server 端會依序處理其接收到的 client 端的 API 請求。

若 API 的執行會花很多時間,且要 return 執行的結果給 client 端,這樣會花很多時間。

倘若將 server 端設定成非同步,則當 server 的 main thread 每接收到 client 端的一個 API 請求,我們就可以把這個 API 請求轉給 worker thread 執行,待 worker thread 完成後,再通知 main thread,main thread 再把執行結果回傳給 client 端。

這樣當 worker thread 在執行 API 時,main thread 可以繼續接收其他 client 端的 API 請求、並再轉發給其他 worker thread 處理,而不會被卡住。

下圖是簡單的示意圖,有兩個 client,分別是 client1 和 client2。client1 先呼叫 API1,之後 client2 才呼叫 API2,而 API1 的執行時間會比 API2 的執行時間長許多。
若是 sync server,client2 的 API2 請求必須等到 client1 的 API1 請求做完,才會執行 API2;但若是 async server 的話,client2 很快就可以得到 API2 執行的結果。

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

PS. 使用 async server 的話,client 端呼叫 API 後仍會卡在那邊直到該 API return 喔。

https://docs.legato.io/latest/apiFilesC.html#apiFilesC_asyncServer

There are two alternatives to implement the server-side functionality.

The default case is where each server-side function has the same interface as the client-side function. The server-side function takes the IN parameters, and returns the OUT parameters and function result when the function exits.

In the async-server case, the server-side function doesn't necessarily return the OUT parameters and function result when it exits. Instead, there's a separate Respond function for each server-side function. The OUT parameters and function result are returned by passing these values to the Respond function. The Respond function can be called at any time, normally after the server-side function has exited.

Regardless of how the server-side functions are implemented, the client-side function waits until the OUT parameters and function result are returned.
無論 server 端是用哪種方式實作,client 端都會等到 OUT 參數或 function return

The async-server functionality is not enabled by default. Enable it by using the .cdef provides [async].

https://docs.legato.io/latest/defFilesCdef.html#defFilesCdef_providesApiAsync

The server of a service can also implement the functions as if they were called directly by the client (even though the client may be running inside another process). When the client calls an API function, the server's API function gets called, and when the server returns from the function, the function returns in the client process.

Sometimes the server needs to hold onto the client request and do other things (like handing requests from other clients in the meantime) before sending a response back. This is called asynchronous mode, and is enabled using the [async] keyword on the end of the api section entry:

provides:
{
    api:
    {
        bar.api [async]
    }
}

When asynchronous mode is enabled for a server-side interface, the generated code changes as follows:

  • commandRef parameter is added to the beginning of all the API functions' parameter lists.
  • return value is removed from every API function.
  • Respond() function is generated for every API function.

In async mode, the server responds to the client's call to API function F() by calling the associated FRespond() function.

The Respond functions all take the commandRef as their first parameter. If an API function has a return value, that return value is sent to the client through the second parameter of the Respond function. Any output parameters defined in the API function are also passed as parameters to the Respond function.

See API Files for more information, or try it and have a look at the generated header files.

設定 Asynchronouns Server

若我們想將某個 .api 設定成非同步,則需要在 server 的 Component.cdefprovides section 的 api section 將該 API 標記成 [async],如下:

provides:
{
    api:
    {
        bar.api [async]
    }
}

[async] 的效果

接下來來看將 API 標示成 [async] 後,對其所產生的 C 語言的 function prototype 會有什麼變化~

假設 .api 裡面定義了以下兩個 API 的 interface:

FUNCTION function_a
(
    int32 a IN,
    int32 b IN,
    int32 result OUT
);

FUNCTION int32 function_b
(
    int32 in_parameter IN,
    int32 out_parameter OUT
);

function_a 有三個參數,ab 都是 IN 參數,result 則是 OUT 參數(也就是 result 是 pass by address 的方式傳到 function 內,並在 function 內修改 result 的值),然後 return void

function_b 則有兩個參數,in_parameterIN 參數,out_parameter 則是 OUT 參數,最後還會 return 一個 int_32

若沒設定為非同步 server 的話,預期在 C 語言的 function prototype 會是如下:

void <interfaceName>_function_a(int32_t a, int32_t b, int32_t *result);

int32_t <interfaceName>_function_b(int32_t in_parameter, int32_t *out_parameter);

但使用 [async] 後,function prototype 會有所變動。

執行完 mkapp -t simulation yourApp.adef 後,查看 _build_yourApp/simulation/api/5362441aefbd3b1670a884ba65439c29/async_server/<interface>_server.h(以下僅節錄部份):

//--------------------------------------------------------------------------------------------------
/**
 * Server-side respond function for server_interface_function_a
 */
//--------------------------------------------------------------------------------------------------
void server_interface_function_aRespond
(
    server_interface_ServerCmdRef_t _cmdRef,
    int32_t result
);

//--------------------------------------------------------------------------------------------------
/**
 * Prototype for server-side async interface function
 */
//--------------------------------------------------------------------------------------------------
void server_interface_function_a
(
    server_interface_ServerCmdRef_t _cmdRef,
    int32_t a,
    int32_t b
);

//--------------------------------------------------------------------------------------------------
/**
 * Server-side respond function for server_interface_function_b
 */
//--------------------------------------------------------------------------------------------------
void server_interface_function_bRespond
(
    server_interface_ServerCmdRef_t _cmdRef,
    int32_t _result,
    int32_t out_parameter
);

//--------------------------------------------------------------------------------------------------
/**
 * Prototype for server-side async interface function
 */
//--------------------------------------------------------------------------------------------------
void server_interface_function_b
(
    server_interface_ServerCmdRef_t _cmdRef,
    int32_t in_parameter
);

可以看到實際的 function prototype 有了以下的變化:

  • return type 的部分,可以看到即使像 function_b 原本設定 return int32,但全部的 function 都變成 return void
  • 每個 function 都多出了一個各自的 Respond function,像是 function_a 就多出了 function_aRespondfunction_b 就多出了 function_bRespond
    • 當 server 完成某個 API 請求、要 return 給 client 了,這時候就是呼叫該 API 的 Respond function
    • 若有 return value,則 return value 會是 Respond function 的第二個參數,後面則依序是被設定為 OUT 的參數。
  • 每個 function 的第一個參數都變成 <interface>_ServerCmdRef_t _cmdRef

所以 sync server 跟 async server 最大的差別在於:async server 的 API 是透過呼叫 Respond function 來 return,代表我們可以把事情丟給一個 worker thread 來專門處理這個 API 的事情,等到 worker thread 做完之後,再呼叫 Respond function 來 return。

PS. 但是要注意的是 Respond function 只能由 main thread 呼叫,不可由 worker thread 呼叫,否則執行時會出現錯誤訊息:

Sep  5 06:20:29 simulation user.emerg TelAF: *EMR* | asyncServer[8234]/framework T=FuncBWorkerThread | messagingSession.c msgSession_SendMessage() 1637 | Attempt to send by thread that doesn't own session 'server_interface_alias_name'.

所以我們可以讓 worker thread 做完事情後發出 event,然後 main thread 執行這個 event 的 handler,並且在該 handler 裡面呼叫該 API 的 Respond function。

sync server 如果沒有要 return 的話,應該也是可以丟給 worker thread 處理事情;但如果要 return 的話,main thread 基本上就一定要等待 worker thread 做完事情(像是使用 le_thread_Join 等 API),才能取得 worker thread 的結果,最後才 return。這樣 main thread 仍會被卡在那邊,沒辦法趁 worker thread 在做事的時候先去做其他事。

My Async Server Example

以下此例子,server 提供兩個 API,function_afunction_b:

  • function_a 會睡 10 秒。
  • function_b 不會睡覺。
  • 並且開兩個 worker threads 分別處理這兩個 API 做的事情。

client1 先呼叫 function_a,然後 client2 才呼叫 function_b

sync server 的狀況,由於 client1 先呼叫了 function_a,所以一定會睡完 10 秒,結束後才會執行 function_b

async server,把 function_a 睡十秒的事情丟到 worker thread 裡面處理,所以 worker thread 睡 10 秒。把事情丟給 worker thread 後,main thread 就可以繼續做事,當 main thread 接收到 client2 使用 function_b 的請求,main thread 就把事情丟給專門處理 function_b 的 worker thread。所以預期 client2 可以先得到結果,client1 則必須等 10 秒才能得到結果。

Project Structure

cpt1020@Ubuntu1804:~/asyncServer$ tree
.
├── asyncServer
│   ├── asyncServerApp.adef
│   ├── asyncServerComp
│   │   ├── Component.cdef
│   │   └── server.c
│   └── server_api.api
├── client1
│   ├── asyncServer_client1App.adef
│   └── client1Comp
│       ├── client1.c
│       └── Component.cdef
└── client2
    ├── asyncServer_client2App.adef
    └── client2Comp
        ├── client2.c
        └── Component.cdef

6 directories, 10 files

Async Server App

asyncServer/asyncServerApp.adef

executables:
{
    asyncServer = ( asyncServerComp )
}
 
processes:
{
    run:
    {
        ( asyncServer )
    }
}
 
extern:
{
    server_interface_alias_name = asyncServer.asyncServerComp.server_interface
}

asyncServer/server_api.api

FUNCTION function_a
(
    int32 a IN,
    int32 b IN,
    int32 result OUT
);

FUNCTION function_b
(
    int32 a IN,
    int32 b IN,
    int32 result OUT
);

asyncServer/asyncServerComp/server.c

#include "legato.h"
#include "interfaces.h"

static le_thread_Ref_t       funcAWorkerThreadRef  = NULL;
static le_thread_Ref_t       funcBWorkerThreadRef  = NULL;
static le_event_Id_t         funcAReturnEvent;
static le_event_Id_t         funcBReturnEvent;
static le_event_HandlerRef_t funcAReturnHandlerRef = NULL;
static le_event_HandlerRef_t funcBReturnHandlerRef = NULL;

typedef struct
{
    int32_t n1;
    int32_t n2;
    server_interface_ServerCmdRef_t _cmdRef;
} FunctionData_t;

static FunctionData_t funcA_data = {.n1 = 0, .n2 = 0, ._cmdRef = NULL};
static FunctionData_t funcB_data = {.n1 = 0, .n2 = 0, ._cmdRef = NULL};

void function_a_handler(void *funcA_data_ptr, void *paramPtr)
{
    LE_INFO("inside function_a_handler, executed by %s, ready to sleep for 10 seconds", le_thread_GetMyName());
    sleep(10);
    LE_INFO("after sleeping for 10 seconds");
    le_event_Report(funcAReturnEvent, funcA_data_ptr, sizeof(funcA_data));
}

void function_a_mainThreadReturnEventHandler(void *reportPtr)
{
    LE_INFO("inside function_a_mainThreadReturnEventHandler, executed by %s", le_thread_GetMyName());
    FunctionData_t *functionDataRef = (FunctionData_t *) reportPtr;
    LE_INFO("ready to return function_a");
    server_interface_function_aRespond(functionDataRef->_cmdRef, functionDataRef->n1 + functionDataRef->n2);
}

void server_interface_function_a(server_interface_ServerCmdRef_t _cmdRef, int32_t a, int32_t b)
{
    LE_INFO("inside function_a, thread name: %s", le_thread_GetMyName());
    funcA_data._cmdRef = _cmdRef;
    funcA_data.n1 = a;
    funcA_data.n2 = b;
    le_event_QueueFunctionToThread(funcAWorkerThreadRef, function_a_handler, &funcA_data, NULL);
}

void function_b_handler(void *funcB_data_ptr, void *paramPtr)
{
    LE_INFO("inside function_b_handler, executed by %s", le_thread_GetMyName());
    le_event_Report(funcBReturnEvent, funcB_data_ptr, sizeof(funcB_data));
}

void function_b_mainThreadReturnEventHandler(void *reportPtr)
{
    LE_INFO("inside function_b_mainThreadReturnEventHandler, executed by %s", le_thread_GetMyName());
    FunctionData_t *functionDataRef = (FunctionData_t *) reportPtr;
    LE_INFO("ready to return function_b");

    server_interface_function_bRespond(functionDataRef->_cmdRef, functionDataRef->n1 + functionDataRef->n2);
}

void server_interface_function_b(server_interface_ServerCmdRef_t _cmdRef, int32_t a, int32_t b)
{
    LE_INFO("inside function_b, thread name: %s", le_thread_GetMyName());
    funcB_data._cmdRef = _cmdRef;
    funcB_data.n1 = a;
    funcB_data.n2 = b;
    le_event_QueueFunctionToThread(funcBWorkerThreadRef, function_b_handler, &funcB_data, NULL);
}

void *funcAWorkerThreadFunc(void *context)
{
    le_event_RunLoop(); 
    return NULL;
}

void *funcBWorkerThreadFunc(void *context)
{
    le_event_RunLoop(); 
    return NULL;
}

COMPONENT_INIT
{
    // creating events
    funcAReturnEvent = le_event_CreateId("funcAReturnEvent", sizeof(funcA_data));
    funcBReturnEvent = le_event_CreateId("funcBReturnEvent", sizeof(funcB_data));
    
    // setting events and their event handlers, also making main thread execute the event handler when the event is reported
    funcAReturnHandlerRef = le_event_AddHandler("funcAReturnHandler", funcAReturnEvent, function_a_mainThreadReturnEventHandler);
    funcBReturnHandlerRef = le_event_AddHandler("funcBReturnHandler", funcBReturnEvent, function_b_mainThreadReturnEventHandler);

    // creating worker threads and setting their thread functions
    funcAWorkerThreadRef = le_thread_Create("FuncAWorkerThread", funcAWorkerThreadFunc, NULL);
    funcBWorkerThreadRef = le_thread_Create("FuncBWorkerThread", funcBWorkerThreadFunc, NULL);

    // starting worker threads
    le_thread_Start(funcAWorkerThreadRef);
    le_thread_Start(funcBWorkerThreadRef);
}

asyncServer/asyncServerComp/Component.cdef

provides:
{
    api:
    {
        server_interface = server_api.api [async]
    }
}
sources:
{
    server.c
}

Client1 App

client1/asyncServer_client1App.adef

executables:
{
    client1 = ( client1Comp )
}
 
processes:
{
    run:
    {
        ( client1 )
    }
}
 
bindings:
{
    client1.client1Comp.client_interface -> asyncServerApp.server_interface_alias_name   
}

client1/client1Comp/client1.c

#include "legato.h"
#include "interfaces.h"

COMPONENT_INIT
{
    LE_INFO("client1 calling server function_a");
    int32_t sum = 0;
    LE_INFO("client1 send 10 and 5 to function_a");
    client_interface_function_a(10, 5, &sum);
    LE_INFO("client1 received sum from function_a: %d", sum);
}

client1/client1Comp/Component.cdef

requires:
{
    api:
    {
        client_interface = server_api.api       
    }
}
sources:
{
    client1.c
}

Client2 App

client2/asyncServer_client2App.adef

executables:
{
    client2 = ( client2Comp )
}
 
processes:
{
    run:
    {
        ( client2 )
    }
}
 
bindings:
{
    client2.client2Comp.client_interface -> asyncServerApp.server_interface_alias_name   
}

client2/client2Comp/client2.c

#include "legato.h"
#include "interfaces.h"

COMPONENT_INIT
{
    LE_INFO("client2 calling server function_b");
    int32_t sum = 0;
    LE_INFO("client2 send 20 and 10 to function_b");
    client_interface_function_b(20, 10, &sum);
    LE_INFO("client2 received sum from function_b: %d", sum);
}

client2/client2Comp/Component.cdef

requires:
{
    api:
    {
        client_interface = server_api.api       
    }
}
sources:
{
    client2.c
}

Result

以下僅列出重要的 log:

# client1 在 06:21:49 時先呼叫了 function_a,並且開始睡覺
Sep  5 06:21:49 simulation user.info TelAF:  INFO | client1[8374]/client1Comp T=main | client1.c _client1Comp_COMPONENT_INIT() 6 | client1 calling server function_a
Sep  5 06:21:49 simulation user.info TelAF:  INFO | client1[8374]/client1Comp T=main | client1.c _client1Comp_COMPONENT_INIT() 8 | client1 send 10 and 5 to function_a
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c server_interface_function_a() 39 | inside function_a, thread name: main
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=FuncAWorkerThread | server.c function_a_handler() 23 | inside function_a_handler, executed by FuncAWorkerThread, ready to sleep for 10 seconds

# client2 在 06:21:49 時呼叫了 function_b,並在同一秒內就得到了結果
Sep  5 06:21:49 simulation user.info TelAF:  INFO | client2[8394]/client2Comp T=main | client2.c _client2Comp_COMPONENT_INIT() 6 | client2 calling server function_b
Sep  5 06:21:49 simulation user.info TelAF:  INFO | client2[8394]/client2Comp T=main | client2.c _client2Comp_COMPONENT_INIT() 8 | client2 send 20 and 10 to function_b
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c server_interface_function_b() 63 | inside function_b, thread name: main
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=FuncBWorkerThread | server.c function_b_handler() 48 | inside function_b_handler, executed by FuncBWorkerThread
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c function_b_mainThreadReturnEventHandler() 54 | inside function_b_mainThreadReturnEventHandler, executed by main
Sep  5 06:21:49 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c function_b_mainThreadReturnEventHandler() 56 | ready to return function_b
Sep  5 06:21:49 simulation user.info TelAF:  INFO | client2[8394]/client2Comp T=main | client2.c _client2Comp_COMPONENT_INIT() 10 | client2 received sum from function_b: 30

# 處理 function_a 的 worker thread 在 06:21:59 起床了
# 並且 client1 在 06:21:59 收到結果
Sep  5 06:21:59 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=FuncAWorkerThread | server.c function_a_handler() 25 | after sleeping for 10 seconds
Sep  5 06:21:59 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c function_a_mainThreadReturnEventHandler() 31 | inside function_a_mainThreadReturnEventHandler, executed by main
Sep  5 06:21:59 simulation user.info TelAF:  INFO | asyncServer[8352]/asyncServerComp T=main | server.c function_a_mainThreadReturnEventHandler() 33 | ready to return function_a
Sep  5 06:21:59 simulation user.info TelAF:  INFO | client1[8374]/client1Comp T=main | client1.c _client1Comp_COMPONENT_INIT() 10 | client1 received sum from function_a: 15