# 6-1. Advanced Usage of Events - MA
- For more details on searching and emitting events, please refer to ++[Agent List](https://hackmd.io/Uj-tC7l9Q82VyGr8R5PL2Q?view#Agent-List)++
- You may also want a reference table for how field names differ between searching and emitting events: ++[Event Fields Comparison Table](https://hackmd.io/N2Km6-8kRI2tMR5X4tRHeQ?view)++
---
In the previous course, we used the Lakeside Oasis Café use case to demonstrate how to emit and search events, and showed some practical uses of events in LOC. In this course, we build on that and introduce a more advanced usage of events.
In this topic, we extend the "MA" use case from the previous topics to help you master events. One of the merits of LOC as a framework is that developers can easily maintain highly readable events while implementing operations or services. As a result, integration and auditable lineage can be designed and developed in one framework, where in the past this might have required two or more software systems or tools.
We will reuse the MA example and extend its data process and logics to achieve the following:
- search events
- read API header
- set "Async" mode API
- verify the basic authentication in REST API
:::spoiler **<font color=indigo>Prologue Before LOC Setup</font>**
## Before LOC Setup

Earlier, when we covered the whole DP design, we deliberately separated the application design from the event design and skipped the "Reference" part, so that we could first become familiar with logic coding and the deployment process.
This time, we take a deep dive into **Reference (Events)**, which is the design of events in LOC. Through this course, you will learn how to use the BA/SA process to analyse which important values should be kept in a DP, and then design and implement them in logics, so that users can understand the data usage, flow, context and lineage in a more readable way.
These important parts of event design will be discussed further in the "Event Design" topic. For now, let's assume that we have completed the BA/SA process and have the requirements and spec needed to start development.
Additionally, we would like to emphasise the **Reference (Events)** part that we skipped before. Since LOC can run logic operations and emit events at the same time, users can freely decide during the BA/SA design stage which events to leave for later use (such as aggregation, new services, auditing, etc.).
Therefore, with **Reference (Events)**, users can address the data issues in digital transformation that we mentioned in the previous courses, because events preserve both the flexibility of data usage and the data lineage in sequence.
:::
### Preparation: Emit MA as event
In the data process that queries MA, we additionally call the emit event function so that each calculation leaves a corresponding event, recording which stock's MA was calculated and keeping the MA value in `meta`.
In this example, we imagine that when users want to query the MA index of a particular stock, especially when time-series information is required, the MA information can be aggregated by querying events, if they exist. In this way, later queries do not need to hit the DB every time, which separates reading from writing.
This design maps back to the CQRS pattern in LOC that we mentioned before. By using events as indicators, users do not need to read the database repeatedly when querying MA data, which can effectively relieve the load on the original system.
**++Flow Analysis++**

Therefore, whenever we calculate MA, we leave an event to keep the MA index. The event schema is designed as follows:
```
SourceDID: [MA Window (20,50 or 100)]
TargetDID: [Stock Code]
LabelName: [Calculate MA]
Meta: [Details of MA]
```
Here we have four variables to set: "sourceDID", "targetDID", "labelName" and "meta". In addition, "type" can be used as a categorical field to group events for maintenance.
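To make the schema concrete, here is a minimal sketch of how one MA result could be turned into an event object with these fields. The helper name `buildMaEvent` is hypothetical and not part of LOC; the field values follow the schema above.

```javascript
// Hypothetical helper (not part of LOC): builds the event object for one MA result.
function buildMaEvent(days, stockCode, ma) {
  return {
    sourceDID: `${days}MA`,                  // MA window: 20, 50 or 100
    targetDID: `Stock_Code: ${stockCode}`,   // the stock the MA belongs to
    labelName: `calculate_${days}MA`,        // what happened
    meta: JSON.stringify(ma),                // meta is kept as a string
    type: "default",                         // categorical field for grouping
  };
}

const maEvent = buildMaEvent(50, "1301", 123.45);
console.log(maEvent);
```

Serialising `meta` with `JSON.stringify` means the search side can read it back with `JSON.parse` without guessing the format.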
:::spoiler **<font color=indigo>Details of Emitting_MA_event Logic</font>**
```javascript=
export async function run(ctx) {
  // check that the input comes from the HTTP API
  if ("event" in ctx.payload || "messageQueue" in ctx.payload) {
    throw new Error("this logic accepts http payload only");
  }
  // a function that transforms a byte array to a string
  function UTF8ArrToStr(aBytes) {
    let utf8decoder = new TextDecoder();
    return utf8decoder.decode(new Uint8Array(aBytes));
  }
  // parse the payload body and check that the payload is as defined
  const getHttpBodyJson = JSON.parse(UTF8ArrToStr(ctx.payload.http.body));
  const data_payload = getHttpBodyJson;
  const days = parseInt(data_payload.days ?? "0");
  const stock_code = data_payload.stock_code ?? "";
  // throw Error objects so that handleError can read error.message
  if (stock_code == "") {
    throw new Error("please enter the correct stock code.");
  }
  if (days !== 20 && days !== 50 && days !== 100) {
    throw new Error("days should be 20 | 50 | 100");
  }
  // parse the Basic authorization header if it exists; otherwise the account is "Unknown"
  const headers = ctx.payload.http.headers;
  const authorization = headers["authorization"];
  let decoded_auth = "";
  let account = "";
  if (authorization !== undefined && authorization.startsWith("Basic")) {
    decoded_auth = atob(authorization.substring(6));
    account = decoded_auth.split(":")[0];
    await ctx.agents.sessionStorage.putJson("authorization", {
      account: account,
    });
  } else {
    account = "Unknown";
  }
  ctx.agents.logging.info(`Account: ${account}`);
  // MySQL authorized access
  const database = process.env.db1_database;
  const table = process.env.db1_table4;
  let connection = new Saffron.Database.MySqlParameters({
    host: process.env.db1_host,
    port: parseInt(process.env.db1_port ?? "3306"),
    database,
    username: process.env.db1_username,
    password: process.env.db1_password,
  });
  let db = await ctx.agents.database?.connect({
    databaseDriver: Saffron.Database.Driver.MySql,
    connection,
  });
  // SQL query; stock_code comes from the request, so it is passed as a
  // query parameter (MySQL "?" placeholder) instead of being interpolated
  let sql = `SELECT stock_code, close, volume, date \
    FROM ${table} \
    WHERE stock_code = ? \
    ORDER BY date DESC \
    LIMIT ${days};`;
  // calculate MA
  let resp = await db?.query(sql, [stock_code]);
  let data = [];
  let numerator_ma = 0;
  let denominator_ma = days;
  resp?.rows.forEach((row) => {
    numerator_ma += Number(row["close"]);
  });
  let ma = Number(numerator_ma) / Number(denominator_ma);
  // push data and put it into the result
  data.push({ stock_code: stock_code, MA: ma, Numerator: numerator_ma, Denominator: denominator_ma, user: account });
  await ctx.agents.sessionStorage.putJson("result", data);
  // emit an event after the calculation succeeds
  await ctx.agents.eventStore.emit([
    {
      sourceDID: `${days}MA`,
      targetDID: `Stock_Code: ${stock_code}`,
      labelName: `calculate_${days}MA`,
      meta: `${ma}`,
      type: `default`,
    },
  ]);
}
export async function handleError(ctx, error) {
  ctx.agents.logging.error(error.message);
}
```
:::
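The logic above divides the sum of closing prices by `days`, so if the table holds fewer than `days` rows for a stock, the resulting MA is understated. As a sketch of one possible alternative, the hypothetical helper below (not part of LOC) divides by the number of rows actually returned and reports `null` when there is nothing to average. Whether a partial window should produce an MA at all is a design decision for your own use case.

```javascript
// Hypothetical helper: averages the "close" field of the rows returned by the query.
function simpleMovingAverage(rows) {
  if (rows.length === 0) {
    return null; // nothing to average
  }
  const sum = rows.reduce((acc, row) => acc + Number(row.close), 0);
  return sum / rows.length;
}

// Rows shaped like the query result, with "close" possibly arriving as a string.
const sampleRows = [{ close: "10" }, { close: "20" }, { close: "30" }];
console.log(simpleMovingAverage(sampleRows)); // 20
```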
Here, we can deploy its API in ++**Async**++ mode. Since the next data process ("Query MA") reads its data from events, we do not need to wait for this part to finish before moving on to the next step.
:::spoiler **<font color=indigo>API Configuration of Emitting MA Event</font>**
```yaml=
method: POST
mode: Async
encapsulation: true
name: emitMAevent
path: /workshop/emitMAevent
dataProcessPids:
  - pid: 00000000-0000-0000-0000-000000000000
    revision: latest
```
:::
This time, we are going to create a new data process that searches these emitted events and aggregates them into the response we need.
**++Data Process Analysis++**

## Step 1. Initiate New Data Process Template File
Let us start by logging in:
```
loc login
```
Then create a new data process template:
```
loc new [template name]
```
:::info
Once again, please use a special/unique template name for yourselves,
such as `John_Search_MA`.
:::
## Step 2. Edit API Route Yaml File
After designing and planning, we know the API path (shown in the graph below), so we can set it up in `api-route-config.yaml` first.
We design the API so that users send a Basic authentication token when calling it. In HTTP API design, authentication credentials are conventionally sent in a request header (here, the `Authorization` header).

- API Route: You need to
(1) change the method to **POST**, so that the payload can be passed to the generic logic for parsing;
(2) use **Sync** mode;
(3) rename the API route as you like, such as **emitUserQueryMA**;
(4) create your own path, such as **/workshop/searchMA**;
and leave the rest unchanged.
:::spoiler **<font color=indigo>Details of api-route-config.yaml</font>**
```yaml=
method: POST
mode: Sync
encapsulation: true
name: emitUserQueryMA
path: /workshop/searchMA
dataProcessPids:
  - pid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    revision: latest
```
:::
## Step 3. Create Logics
### Generic Logic 1: Parse API Payload
First, we need to parse the API payload, which drives the entire search-MA data process.
We parse "days" (20, 50 or 100) and "stock_code" from the request body and check that both values are valid.
**Additionally, we also parse the "Basic Auth" information to see which user calls the API to get MA.**
:::spoiler **<font color=indigo>1st Generic Logic</font>**
```javascript=
export async function run(ctx) {
  // check that the input comes from the HTTP API
  if ("event" in ctx.payload || "messageQueue" in ctx.payload) {
    throw new Error("This logic accepts http payload only.");
  }
  // a function that transforms a byte array to a string
  function UTF8ArrToStr(aBytes) {
    let utf8decoder = new TextDecoder();
    return utf8decoder.decode(new Uint8Array(aBytes));
  }
  // parse the payload body and check that the payload is as defined
  const getHttpBodyJson = JSON.parse(UTF8ArrToStr(ctx.payload.http.body));
  const data_payload = getHttpBodyJson;
  const days = parseInt(data_payload.days ?? "0");
  const stock_code = data_payload.stock_code ?? "";
  // throw Error objects so that handleError can read error.message
  if (stock_code == "") {
    throw new Error("Please enter a correct stock code.");
  }
  if (days !== 20 && days !== 50 && days !== 100) {
    throw new Error("days should be 20 | 50 | 100");
  }
  // parse the Basic authorization header if it exists; otherwise the account is "Unknown"
  const headers = ctx.payload.http.headers;
  const authorization = headers["authorization"];
  let decoded_auth = "";
  let account = "";
  if (authorization !== undefined && authorization.startsWith("Basic")) {
    decoded_auth = atob(authorization.substring(6));
    account = decoded_auth.split(":")[0];
    await ctx.agents.sessionStorage.putJson("authorization", {
      account: account,
    });
  } else {
    account = "Unknown";
  }
  ctx.agents.logging.info(`Account: ${account}`);
  // save the parsed values for the following logics
  await ctx.agents.sessionStorage.putJson("data_payload", data_payload);
  await ctx.agents.sessionStorage.putJson("days", days);
  await ctx.agents.sessionStorage.putJson("stock_code", stock_code);
  await ctx.agents.sessionStorage.putJson("account", account);
}
export async function handleError(ctx, error) {
  ctx.agents.logging.error(error.message);
}
```
:::
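The header handling in the logic above can also be looked at in isolation. The sketch below is a hypothetical stand-alone helper (the name `parseBasicAuthAccount` is ours, not LOC's) that extracts the account name from a Basic `Authorization` header and falls back to `"Unknown"` when the header is missing, uses another scheme, or cannot be decoded. It assumes header names arrive in lower case, as in the logic above, and that `atob`/`btoa` are available in the runtime.

```javascript
// Hypothetical helper: returns the account name from a Basic Authorization header,
// or "Unknown" when the header is missing or malformed.
function parseBasicAuthAccount(headers) {
  const authorization = headers?.["authorization"];
  if (typeof authorization !== "string" || !authorization.startsWith("Basic ")) {
    return "Unknown";
  }
  try {
    // The decoded value has the form "account:password".
    const decoded = atob(authorization.substring(6));
    const account = decoded.split(":")[0];
    return account === "" ? "Unknown" : account;
  } catch (e) {
    return "Unknown"; // the value was not valid Base64
  }
}

const sampleHeader = "Basic " + btoa("alice:secret");
console.log(parseBasicAuthAccount({ authorization: sampleHeader })); // alice
console.log(parseBasicAuthAccount({})); // Unknown
```

Note that this only reads the account name for logging purposes; it does not check the password against anything.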
### Generic Logic 2: Search MA Events
Now, we're going to search MA information from the event store.
:::warning
Please bear in mind that MA events can only be searched after they have been emitted to the event store. For how to emit MA events, please refer to our earlier section [Before LOC Setup](https://hackmd.io/TMq4QqMbSZqIrUzcwE0S9w?view#Before-LOC-Setup).
:::
According to this graph, the search keywords for events come from the API payload. By using the search event function
`ctx.agents.eventStore.search()`,
we can query the MA results accumulated in the event store.

:::spoiler **<font color=indigo>2nd Generic Logic</font>**
```javascript=
export async function run(ctx) {
  // get data from session storage
  const days = await ctx.agents.sessionStorage.get("days");
  const stock_code = await ctx.agents.sessionStorage.get("stock_code");
  // search for events in which every listed field matches its value
  let search_event = {
    queries: [
      {
        Match: {
          field: "label_name",
          value: `calculate_${days}MA`,
        },
      },
      {
        Match: {
          field: "target_digital_identity",
          value: `Stock_Code: ${stock_code}`,
        },
      },
    ],
    excludes: [],
    filters: [],
    from: 0,
    size: 1000,
    sorts: [],
  };
  let event_resp = await ctx.agents.eventStore.search(search_event);
  // fall back to an empty array if the search returns no events
  let get_events = event_resp?.events ?? [];
  // read the MA value from each event's meta and save it in the ma_data array
  let ma_data = [];
  get_events.forEach((maEvent) => {
    const maItem = JSON.parse(maEvent.meta);
    ma_data.push(maItem);
  });
  // write the MA data into session storage
  await ctx.agents.sessionStorage.putJson("ma_data", ma_data);
}
export async function handleError(ctx, error) {
  ctx.agents.logging.error(error.message);
}
```
:::
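In the logic above, a single event whose `meta` is not valid JSON makes `JSON.parse` throw and aborts the whole search. If you would rather skip such events, the parsing step could look like the sketch below. The helper name `extractMaValues` is hypothetical, and the sample objects only mimic the `meta` field of a search result.

```javascript
// Hypothetical helper: parses the meta string of each event and skips unreadable ones.
function extractMaValues(events) {
  const values = [];
  for (const event of events ?? []) {
    try {
      values.push(JSON.parse(event.meta));
    } catch (e) {
      // meta was not valid JSON; skip this event
    }
  }
  return values;
}

const sampleEvents = [{ meta: "101.5" }, { meta: "not json" }, { meta: "99.25" }];
console.log(extractMaValues(sampleEvents)); // [ 101.5, 99.25 ]
```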
### Generic Logic 3: Emit Authentication Event
In this generic logic, we record in the event store who requested which MA data.
The requester information we need comes from the auth information in the HTTP request header.
In the previous courses, we have repeatedly mentioned that the event schema contains these major fields: `Source DID / Target DID / Label Name / Meta / Timestamp`
So here is the **event schema** we will use:
```
SourceDID: [Stock Code]
TargetDID: [User Info/ID]
LabelName: [Query MA Index]
Meta: [MA Data]
```
With the event schema in mind, let's apply it in this generic logic.
:::spoiler **<font color=indigo>3rd Generic Logic</font>**
```javascript=
export async function run(ctx) {
  // get data from session storage
  const days = await ctx.agents.sessionStorage.get("days");
  const stock_code = await ctx.agents.sessionStorage.get("stock_code");
  const account = await ctx.agents.sessionStorage.get("account");
  const ma_data = await ctx.agents.sessionStorage.get("ma_data");
  // emit an event recording who queried which MA data
  await ctx.agents.eventStore.emit([
    {
      sourceDID: `Stock_Code: ${stock_code}`,
      targetDID: `User: ${account}`,
      labelName: `query_${days}MA`,
      // serialise the array explicitly; a template literal would drop the brackets
      meta: JSON.stringify(ma_data),
      type: 'default',
    }
  ]);
}
export async function handleError(ctx, error) {
  ctx.agents.logging.error(error.message);
}
```
:::
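Because `meta` is a string, an array of MA values has to be serialised before it is emitted. Interpolating an array into a template literal joins the elements with commas and drops the brackets, so the result can no longer be read back as an array. The sketch below shows the difference with a hypothetical helper `buildQueryEvent` (not part of LOC) that follows the schema above.

```javascript
// Hypothetical helper: builds the "who queried what" event from the schema above.
function buildQueryEvent(stockCode, account, days, maData) {
  return {
    sourceDID: `Stock_Code: ${stockCode}`,
    targetDID: `User: ${account}`,
    labelName: `query_${days}MA`,
    meta: JSON.stringify(maData), // keeps the array structure
    type: "default",
  };
}

const maValues = [101.5, 99.25];
const queryEvent = buildQueryEvent("1301", "alice", 50, maValues);
console.log(queryEvent.meta);  // [101.5,99.25]
console.log(`${maValues}`);    // 101.5,99.25
```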
### Aggregator Logic
Now we compile the result/output in the aggregator logic. The data we need was either retrieved from the event store or passed in through the API payload, and the earlier logics have saved it in session storage. All we need to do next is use `ctx.agents.sessionStorage` to get the data back.
:::spoiler **<font color=indigo>Details of Aggregator Logic</font>**
```javascript=
export async function run(ctx) {
  // get data from session storage
  const stock_code = await ctx.agents.sessionStorage.get("stock_code");
  const ma_data = await ctx.agents.sessionStorage.get("ma_data");
  let result = {
    Status: 200,
    Message: 'Query for MA Successful!',
    Stock_Code: stock_code,
    MA_Data: ma_data,
  };
  ctx.agents.result.finalize(result);
}
export async function handleError(ctx, error) {
  ctx.agents.logging.error(error.message);
  let result = {
    status: 500,
    errorMessage: `An error occurs when calling API. Error: ${error.message}`,
  };
  ctx.agents.result.finalize(result);
}
```
:::
### Edit Configuration
In this example, we have three generic logics, but the original config.yaml only lists two, so we need to edit config.yaml to add one more.
:::info
In fact, we can have more generic logics to decouple the code for easier maintenance. Just remember to make the corresponding settings in config.yaml.
:::
- Configuration: Edit `config.yaml`
(1) rename the configuration as you like, such as **emitUserQueryMA**;
(2) rename the logics as you like, such as **generic-*-emitUserQueryMA.js**;
(3) add a new entry under **"genericLogics"**.
:::spoiler **<font color=indigo>Details of config.yaml</font>**
```yaml=
version: 0.1.0
name: emitUserQueryMA
description: description
timeoutSeconds: 180
aggregatorLogic:
  name: aggregator-emitUserQueryMA
  file: aggregator-logic.js
genericLogics:
  - name: generic-1-emitUserQueryMA
    file: 1.js
  - name: generic-2-emitUserQueryMA
    file: 2.js
  - name: generic-3-emitUserQueryMA
    file: 3.js
```
:::
## Step 4. Deploy Data Process and API Route
Once the settings above are complete, we can deploy the "emitUserQueryMA" ++data process++ and ++API route++ at the same time with the command:
`loc deploy [template name] -ar`.
Here we use
`loc deploy emitUserQueryMA -ar`.


Now they have been successfully deployed to LOC. When users query MA through the API, LOC will record each access as an event in the event store.
## Step 5. Trigger API Route with Payload
Now we can get MA data by triggering the [API route we set up previously](#Step-2-Edit-API-Route-yaml-file).
Just like before, here we use ++Postman++ to demonstrate.
- Select ```POST``` and enter the domain URL followed by the endpoint path we set in ```api-route-config.yaml```.
Afterwards, use the JSON payload below to get the data that we need:
++**payload**++
```json=
{
"days":50,
"stock_code":"1301"
}
```
- Switch to `Authorization`, select **Basic Auth**, and enter your username and password.

- Click `Send`, and you can expect a response containing the `MA` index for `stock_code: 1301`, along with other additional information.
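If you prefer to call the API from code rather than Postman, the same request can be sketched as follows. The helper name `buildSearchMaRequest`, the domain `https://loc.example.com` and the credentials are placeholders, and the sketch assumes the route path is appended directly to your domain URL; adjust these to your own environment.

```javascript
// Hypothetical helper: builds the URL and fetch options for the search-MA API route.
function buildSearchMaRequest(baseUrl, username, password, payload) {
  return {
    url: `${baseUrl}/workshop/searchMA`,
    options: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // Basic Auth: Base64 of "username:password"
        Authorization: "Basic " + btoa(`${username}:${password}`),
      },
      body: JSON.stringify(payload),
    },
  };
}

// Placeholder domain and credentials, for illustration only.
const searchRequest = buildSearchMaRequest("https://loc.example.com", "alice", "secret", {
  days: 50,
  stock_code: "1301",
});
console.log(searchRequest.url);
// To actually send it:
//   const response = await fetch(searchRequest.url, searchRequest.options);
```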

:::info
ADDITIONAL
:::spoiler **<font color=Indigo>If API in **Async** Mode</font>**

Even if this data process has executed successfully and emitted the search event, the API response will be **202** (Accepted), meaning *the request has been accepted for processing, and you can examine the event results to check whether the process has completed.*
:::
By looking into the events (shown in the [next section](#Demonstration-of-Events-Visualisation)), we can confirm that users successfully obtained the information they asked for, namely the MA index of a specific stock_code.
In addition to returning MA data, we simultaneously record the whole querying process as events.
These events can then be extended to more applications for further use.
## Demonstration of Events Visualisation
In this example, we design two kinds of events:
- MA data of a specific stock_code,
- the record when a user queries the MA.
Events that store relevant MA information in the event store help us build a time-series graph/dataset with suitable tools for further analysis, auditing or other uses.
Here, we use Neo4j and Kibana as examples to demonstrate how to visualise events.
### Neo4j
With graph visualisation tools such as Neo4j, we can grasp the context of each event graphically and accurately filter out the key events we are interested in.
Through this graph, we can see
(1) which stock_code has been used to calculate MA, and
(2) which users have accessed the information.
Hence, when admins/users intend to audit or review the access log of MA data, they can filter the events to detect abnormal or suspicious activity in a highly readable way.
**++Relationship between Events:++**

**++Particular Events:++**

### Kibana
With BI or data discovery tools such as Kibana, we can also explore the event details and build the dashboards we want to analyse.
Through the graph, we can see
(1) the event details that we keep in LOC, where we use "query_50MA" as the keyword to display the matching events;
(2) the dashboard that summarises which users queried the 50MA of a particular stock.
**++Events Discovery:++**

With these data, we can get more insights from the results. For example, we can see that there are many **Unknown users, that is, callers who access MA data without valid credentials (Basic Auth)**, and thus we can follow up on these unknown records for further tracking and auditing.
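The same kind of tally can also be sketched in code once the query events have been retrieved. The helper `countQueriesByUser` below is hypothetical, and the field name `targetDID` on each event is an assumption for illustration; check the Event Fields Comparison Table for the field names your search results actually use.

```javascript
// Hypothetical helper: counts query events per user, based on the target DID.
// The field name "targetDID" is an assumption; adjust it to your search results.
function countQueriesByUser(events) {
  const counts = {};
  for (const event of events) {
    const user = event.targetDID.replace(/^User: /, "");
    counts[user] = (counts[user] ?? 0) + 1;
  }
  return counts;
}

const queryEvents = [
  { targetDID: "User: alice" },
  { targetDID: "User: Unknown" },
  { targetDID: "User: Unknown" },
];
console.log(countQueriesByUser(queryEvents)); // { alice: 1, Unknown: 2 }
```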
**++Event Analysis Dashboard:++**

---
###### tags: `Workshop`