# Feature Lake
[TOC]
## Brain Function
> These are BrainOS functions that take a set of input features and produce output features. Each function belongs to one of four categories: AIX API, Rule Engine, Optimization Engine, or ML/DL models.
### Two kinds of functions
#### 1. Interpretation functions
* Used for feature extraction, primarily from text, images, video feeds, and audio input
* Examples: a tweet, an image of a crop, a feed from a security camera, etc.
* Called from the ingestion layer
* Primarily deep learning functions
* Generate features as output

#### 2. Inference functions
* Used for inference, prediction, decision-making, and optimization
* Called from the feature lake
* Can be any of the following:
  * Deep learning functions
  * Machine learning functions
  * Rules functions
  * Optimization functions
#### Naming convention
* /telecom/function/inference/customer_estimated_usage
* /telecom/function/inference/churn_prediction
* /commons/function/interpretation/text/tweet_sentiment
* /commons/function/interpretation/speech/speech_to_text
The responsibility of a BrainOS function is to act as a bridge: take a BrainEvent from the BrainOS world, convert it into the desired input parameters for a model, and invoke that model.
All functions will be onboarded from Brain Workbench and have the following config:
```protobuf
message BrainFunctionSchema {
  jio.brain.proto.base.BrainToken function_token = 1;
  jio.brain.proto.base.BrainFunctionType function_type = 2; // ML, DL, INTERPRETATION, RULE
  string model_id = 3;
  InvocationConfig invocation_config = 4;
  string input_topic = 5;
  string output_topic = 6;
  FunctionParameterSet input = 7;
  FunctionParameterSet output = 8;
  map<string, string> function_context = 9;
}

message InvocationConfig {
  string invocation_type = 1; // REST, GRPC, ..
  oneof config {
    GrpcConfig grpc = 2;
    RestConfig rest = 3;
  }
}

message GrpcConfig {
  string host = 1;
  string port = 2;
  map<string, string> feature_request_mapping = 3;
  map<string, string> feature_response_mapping = 4;
}

message RestConfig {
  string endpoint = 1;
  map<string, string> feature_request_mapping = 2;
  map<string, string> feature_response_mapping = 3;
}

// Might not be needed going forward.
message FunctionParameterSet {
  map<string, string> entities = 1;
  repeated string features = 2;
  repeated string knowledge_attributes = 3;
}
```
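As a sketch, a hypothetical onboarding config for the churn-prediction function could look like the following protobuf text-format instance. The host, port, topic names, mapping entries, and model id are illustrative assumptions, not actual settings; `function_token` internals are omitted because `BrainToken` is defined outside this section.

```protobuf
# Illustrative text-format instance of BrainFunctionSchema
function_type: ML                        # assumed enum value
model_id: "churn_prediction_v1"          # hypothetical model id
invocation_config {
  invocation_type: "GRPC"
  grpc {
    host: "churn-model.models.svc"       # hypothetical service host
    port: "9090"
    feature_request_mapping {
      key: "/telecom/feature/lsr/call_duration"
      value: "call_duration"             # model-side parameter name (assumed)
    }
  }
}
input_topic: "topic_function_churn_in"   # hypothetical topic names
output_topic: "topic_function_churn_out"
```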
| API | schema/function/create |
| -------- | -------- |
| API name | add_function |
| Description | create a function config and save it to the db |
| Input | BrainFunctionSchema |
| Output | Success or Error (already exists, bad request) |

| API | schema/function/get/{function_key} |
| -------- | -------- |
| API name | get_function |
| Description | get a function config from its key |
| Input | /telecom/function/inference/churn_rate_prediction |
| Output | BrainFunctionSchema or Error (not found) |

| API | schema/function/getFeatures/{function_key} |
| -------- | -------- |
| API name | get_all_features |
| Description | get all input features for a function config from its key |
| Input | /telecom/function/inference/churn_rate_prediction |
| Output | FunctionParameterSet or Error (not found) |

| API | schema/feature/getFunctions/{feature_key} |
| -------- | -------- |
| API name | get_all_functions |
| Description | get all functions that take a given feature as input in their function config |
| Input | feature_key (e.g. /telecom/feature/churn_rate_score) |
| Output | Set<FunctionConfig> or Error (not found) |
#### Deployment
Functions will be deployed as running containers on Kubernetes, similar to the other services in the DAG.
```protobuf
message BrainProcessSchema {
  jio.brain.proto.base.BrainToken token = 1; // function_name
  string input_topic = 2; // output of ingestion
  jio.brain.proto.event.BrainEventSchema input_event_schema = 3;
  map<string, jio.brain.proto.event.BrainEventSchema> output_event_schema = 4; // key = output_topic, value = event key in dictionary
  oneof config {
    BrainEnrichProcessSchema enrich = 5;
    BrainFilterProcessSchema filter = 6;
    BrainPivotProcessSchema pivot = 7;
    BrainReduceProcessSchema reduce = 8;
    BrainIngestProcessSchema ingest = 9;
    BrainQuantizeProcessSchema quantize = 10;
    BrainComputeProcessSchema compute = 11;
    BrainFunctionProcessSchema function = 12;
  }
}
```
## Media Quantity
```protobuf
message BrainQuantity {
  oneof quantity_is_one_of {
    BrainAtomicQuantity atomic = 1;
    BrainCompoundQuantity compound = 2;
    BrainTemporalQuantity temporal = 3;
    BrainSpatialQuantity spatial = 4;
    BrainMediaQuantity media = 5;
  }
}

message BrainMediaQuantity {
  oneof media_is_one_of {
    BrainTextQuantity text = 1;
    BrainVideoQuantity video = 2;
    BrainAudioQuantity audio = 3;
    BrainImageQuantity image = 4;
  }
}

message BrainTextQuantity {
  //string encoding = 1;
  //BrainLanguageType language = 2; // en, hn, sp
  //BrainTextType text_type = 3; // tweet, review comment, chat
  string text = 4; // actual content
}

message BrainPayload {
  string source_uri = 1;
  oneof payload_is_one_of {
    string uri = 2;
    bytes payload = 3;
    string base64encoded = 4;
  }
}

message BrainVideoQuantity {
  BrainVideoFormat video_format = 1; // mpeg4, mov
  BrainPayload payload = 2; // url or bytes
}

message BrainAudioQuantity {
  BrainAudioFormat audio_format = 1; // mp3, wav
  BrainPayload payload = 2; // url or bytes
}

message BrainImageQuantity {
  BrainImageFormat format = 1; // png, jpeg
  BrainPayload payload = 2; // url or bytes
}

message BrainMediaQuantitySchema {
  oneof media_is_one_of {
    BrainTextQuantitySchema text = 1;
    BrainVideoQuantitySchema video = 2;
    BrainAudioQuantitySchema audio = 3;
    BrainImageQuantitySchema image = 4;
  }
}

message BrainTextQuantitySchema {
  BrainTextType default_text_type = 1; // tweet/chat/email
  BrainLanguageType default_language = 2; // english/hindi
  BrainTextEncoding default_encoding = 3; // utf16, utf8, ascii
}

message BrainVideoQuantitySchema {
  BrainVideoFormat default_video_format = 1; // mpeg, mov
  BrainPayloadType default_payload_type = 2;
}

message BrainAudioQuantitySchema {
  BrainAudioFormat default_audio_format = 1; // wav, mp3
  BrainPayloadType payload_type = 2;
}

message BrainImageQuantitySchema {
  BrainImageFormat image_format = 1;
  BrainPayloadType payload_type = 2;
}
```
> For the rest, see the Feature Lake section below.
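The media quantities above can be instantiated, for example, as follows. Each snippet is a separate text-format sketch of a `BrainQuantity`; the tweet text and image URI are made-up illustrative values.

```protobuf
# A text quantity carrying a tweet (illustrative)
media {
  text {
    text: "Network has been down in my area since morning"
  }
}

# An image quantity referencing a stored crop photo by URI (illustrative)
media {
  image {
    payload {
      uri: "https://example.com/crops/field_42.jpg"
    }
  }
}
```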
## Brain Feature
### What are features?
Features are transactional/time-series data points which are input to ML models, rule engines, and optimization engines. Features are atomic in nature, mostly numeric, but they can also be strings.
In BrainOS, features are either the output of a pivot or reduce process, or the output of the interpretation layer.
>This proto represents the format in which features are saved to the feature lake.
```protobuf
message BrainFeature {
  uint32 feature = 1;
  string feature_key = 2;
  google.protobuf.Any feature_value = 3; // primitive type: int, string, map
}
```
> Similar to functions, features will also be standardised and common to AIX, Rule Engine, Optimization Engine & ML/DL.
### Naming convention
* /telecom/feature/customer/daily_mute_call
* /agriculture/feature/crop/crop_disease
* /retail/feature/invoice/store_monthly_sales
* /commons/feature/sensor/text/sentiment
### How will features be onboarded?
#### 1. Output of DAG
BrainOS will treat the output properties of the DAG's leaf nodes as features. For example, if a reduce process is a leaf node of the DAG, then all its brain events will be treated as features by default.
#### 2. Output of AIX, Rule, Optimization engines
While defining the function config, the features are also defined if they don't already exist. To define a feature you need a name and a quantity type.
```protobuf
message BrainEventFeatureSchema {
  jio.brain.proto.base.BrainToken token = 1;
  jio.brain.proto.quantity.BrainQuantitySchema quantity = 2;
  //optional jio.brain.proto.base.BrainToken unit = 3;
}
```
Using these two sources, we have a list of features as below:

| Feature | Process | Topic | Source | Action |
| -------- | -------- | -------- | -------- | -------- |
| telecom/feature/lsr/call_experience | reduce_001 | topic_001 | telecom/property/lsr/call_experience | edit/delete |
| telecom/feature/lsr/call_duration | reduce_001 | topic_002 | telecom/property/lsr/call_duration | edit/delete |
| telecom/feature/lsr/churn_score | churn_prediction_model | topic_003 | --- | edit/delete |
```protobuf
message FeatureConfig {
  string feature = 1;
  string process = 2;
  string topic = 3;
  string source = 4;
  jio.brain.proto.quantity.BrainQuantitySchema quantity = 5;
}
```
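As a sketch, the first row of the table above would correspond to a FeatureConfig like this in text format (the quantity sub-message is omitted for brevity):

```protobuf
feature: "telecom/feature/lsr/call_experience"
process: "reduce_001"
topic: "topic_001"
source: "telecom/property/lsr/call_experience"
```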
### BrainEvent for features
:::info
**Features will be part of the BrainEvent itself, but as a separate field.**
:::
```protobuf
message BrainFeatureEventKeyValue {
  BrainEventKey feature_key = 1;
  BrainEventFeatureSetValue feature_value = 2;
}

message BrainEventFeatureSetValue {
  BrainFeatureSetStore features = 1;
  BrainEntityAttributeStore attributes = 2;
}

message BrainFeatureSetStore {
  oneof feature {
    jio.brain.proto.feature.BrainFeatureSet features = 1;
    jio.brain.proto.feature.BrainFeatureFamily feature_family = 2; // TBD for usage in feature lake
  }
}

/*
{ "telecom/feature/call_experience" : {
    "telecom/feature/call_duration" : 100,
    "telecom/feature/call_drop" : 4365,
    "telecom/feature/call_noise" : 834 }
}
*/
```
## Feature Lake
The feature lake is a time-series database that maps each key (entity × context) to all of its related features in time-series form.
### Brain Key
* It is the same key as used by the aggregation/pivot process in the BrainOS DAG
* It uniquely defines one row in the feature lake. Features will be calculated/stored against this key.
* This key will also have time-series values associated with the same features
```protobuf
message BrainEventKey {
  BrainEntityStore entity_store = 1; // customer, cell-id
  BrainQuantityStore context_store = 2; // time-stamp, lat-long
  BrainAttributeStore attribute_store = 3; // only symbolic, binned, e.g. number of dairy products sold in store
  BrainEntityPredicateStore predicate_store = 4; // call time between Rajan and Shailesh TODO ??
  BrainTimeWindowType window_type = 5; // hour, day, week, weekend etc.
}
```
> The window type above in BrainEventKey is part of the feature: for example, the same quantity aggregated daily vs weekly yields two distinct features.
### Key generation from BrainKey
The above BrainEventKey will be transformed to a SHA3-256 hex string, which is the primary key in the feature lake.
```java
// DigestUtils comes from Apache Commons Codec (org.apache.commons.codec.digest.DigestUtils)
String sha3Key = new DigestUtils("SHA3-256").digestAsHex(brainEventKey.toByteArray());
```
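If Apache Commons Codec is not on the classpath, the same digest can be computed with only the JDK, since `MessageDigest` supports SHA3-256 from Java 9 onward. This is an illustrative sketch; the class and method names are assumptions, and the real input would be `brainEventKey.toByteArray()`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class FeatureKeyDemo {
    // JDK-only equivalent of: new DigestUtils("SHA3-256").digestAsHex(bytes)
    static String sha3Hex(byte[] bytes) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA3-256").digest(bytes);
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // lowercase hex, two chars per byte
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA3-256 not available", e);
        }
    }

    public static void main(String[] args) {
        // Hashing an empty byte array here just demonstrates the call;
        // in the real flow the input would be the serialized BrainEventKey.
        System.out.println(sha3Hex(new byte[0]));
        // prints a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a
    }
}
```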
The key will also be stored in human-readable form in the feature lake, and a UI will be built over it to look inside the key.
### Feature Lake Services
* Brain Function / Interpretation functions / Inference functions
* Repository Router
* Feature / Knowledge Repository
* Delta Service
* Scheduler Service
* Function Resolver
* Feature Resolver
* Function Router

## Repository Router
The responsibility of the repository router is to create feature and attribute update request protos and send them to the corresponding topic.
Feature update commands:
```protobuf
message FeatureCommand {
  oneof command_is_one_of {
    FeatureCreateCommand create = 1;
    FeatureUpdateCommand update = 2; // TODO do we need it
    FeatureDeleteCommand delete = 3;
  }
}

message FeatureCreateCommand {
  google.protobuf.Timestamp window_end_time = 1; // get from brain event metadata
  jio.brain.proto.event.BrainEventKey event_key = 2; // get from brain event
  map<uint32, jio.brain.proto.quantity.BrainQuantity> feature_set = 3;
}

message FeatureUpdateCommand {
  google.protobuf.Timestamp window_end_time = 1; // get from brain event metadata
  jio.brain.proto.event.BrainEventKey event_key = 2; // get from brain event
  map<uint32, jio.brain.proto.quantity.BrainQuantity> feature_set = 3;
}

message FeatureDeleteCommand {
  jio.brain.proto.event.BrainEventKey event_key = 1;
  repeated uint32 id_set = 2;
}
```
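For illustration, a populated FeatureCreateCommand might look like the following text-format sketch. The timestamp and feature id are made-up, and the event_key is omitted since BrainEventKey is shown in the Feature Lake section.

```protobuf
window_end_time { seconds: 1672531200 }          # window end, from brain event metadata
feature_set {
  key: 42                                        # hypothetical numeric id for a text sentiment feature
  value { media { text { text: "positive" } } }  # BrainQuantity carrying a text value
}
```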
Knowledge update commands:
```protobuf
message EntityCommand {
  google.protobuf.Timestamp created_time = 1;
  uint32 entity_type = 2;
  oneof command_is_one_of {
    NewEntityCommand new_entity = 4;
    DeleteEntityCommand delete_entity = 5;
    AddOrUpdateAttributeCommand attribute_set = 6;
    DeleteAttributeCommand delete_attribute = 7;
    AddOrUpdateAttributeQualifierCommand add_or_update_attribute_qualifier = 8;
    DeleteAttributeQualifierCommand delete_attribute_qualifier = 9;
    NewEntityPredicateCommand new_predicate = 10;
    DeleteEntityPredicateCommand delete_predicate = 11;
    AddOrUpdatePredicateQualifierCommand add_or_update_predicate_qualifier = 12;
    DeletePredicateQualifierCommand delete_predicate_qualifier = 13;
  }
}

message AddOrUpdateAttributeCommand {
  uint64 entity_id = 1;
  map<uint32, jio.brain.proto.quantity.BrainQuantity> attribute_set = 2;
}
```
### Feature Repository
Upon receiving a feature update proto from the topic, the feature repository will do the following:
1. Validate the request proto.
2. Translate it to a primitive type.
3. Save it to the time-series database in a time-series manner.
4. Send it to the topic the delta service is listening on.
```protobuf
message BrainFeature {
  uint32 feature = 1;
  string feature_key = 2;
  google.protobuf.Any feature_value = 3; // primitive type: int, string, map
}
```
```protobuf
message KeyFeatureMap {
  string key = 1;
  repeated string features = 2;
}
```
```protobuf
message FeatureCrudRequest {
  FeatureCommand command = 1;
}

message FeatureCrudResponse {
  jio.brain.proto.base.BrainStatus brainStatus = 1;
}
```
| API | CreateFeature |
| -------- | -------- |
| Description | Create, update, or delete features based on the command |
| Input | FeatureCrudRequest |
| Output | FeatureCrudResponse |
```protobuf
message FeatureKeyRequest {
  jio.brain.proto.event.BrainEventKey event_key = 1;
}

message FeatureKeyResponse {
  string feature_key = 1;
  jio.brain.proto.base.BrainStatus brainStatus = 2;
}
```
| API | GenerateEventKey |
| -------- | -------- |
| Description | generate a SHA3-256 key from the BrainEventKey proto |
| Input | FeatureKeyRequest |
| Output | FeatureKeyResponse |
```protobuf
message FeatureKeyComponentsRequest {
  string feature_key = 1;
}

message FeatureKeyComponentResponse {
  jio.brain.proto.event.BrainEventKey event_key = 1;
  jio.brain.proto.base.BrainStatus brainStatus = 2;
}
```
| API | keyComponents |
| -------- | -------- |
| Description | get the components of a feature key |
| Input | FeatureKeyComponentsRequest |
| Output | FeatureKeyComponentResponse |
```protobuf
rpc get(FeatureGetRequest) returns (FeatureGetResponse);

message FeatureGetRequest {
  string brain_key = 1;
  uint32 feature_key = 2;
  jio.brain.proto.base.BrainTimeWindowType window_time_type = 3;
}

message FeatureGetResponse {
  uint32 feature_set = 1;
  jio.brain.proto.quantity.BrainQuantity feature_values = 2;
  google.protobuf.Timestamp window_time = 3;
  jio.brain.proto.base.BrainStatus brainStatus = 4;
}

rpc getFeatureList(MultipleFeatureGetRequest) returns (MultipleFeatureGetResponse);

message MultipleFeatureGetRequest {
  string feature_key = 1;
  repeated uint32 feature_name_set = 2;
  jio.brain.proto.base.BrainTimeWindowType window_time_type = 3;
  google.protobuf.Timestamp window_start_time = 4;
  google.protobuf.Timestamp window_end_time = 5;
  int32 count = 6;
}

message MultipleFeatureGetResponse {
  repeated FeaturesData feature_values = 1;
  jio.brain.proto.base.BrainStatus brainStatus = 2;
}

message FeaturesData {
  uint32 feature_key = 1;
  repeated FeatureTimeSeriesValue feature_time_series_values = 2;
}

message FeatureTimeSeriesValue {
  google.protobuf.Timestamp window_time = 1;
  jio.brain.proto.quantity.BrainQuantity feature_value = 2;
}
```
| API | getFeatures |
| -------- | -------- |
| Description | get features based on the feature key |
| Input | FeatureGetRequest |
| Output | FeatureGetResponse |
## Delta Service
This service will consume data from feature updates / attribute updates against a key and save it in a cache. It will also provide getAllData and deleteAllData APIs.
```protobuf
message KeyFeatureMap {
  string key = 1;
  repeated string features = 2;
}
```
| API | Kafka Consumer |
| -------- | -------- |
| API name | Consume Feature_Update_Topic and Attribute_Update_Topic |
| Description | kafka consumers to consume keys and feature sets from the topics |
| Input | **DeltaCacheInput** |
| Output | BrainStatus |

| API | Add to Delta Cache |
| -------- | -------- |
| API name | addDeltaToCache |
| Description | Save the key and feature set to a cache db (redis or mongo) where keys are unique |
| Input | **DeltaCacheDto** |
| Output | **DeltaCache** |

| API | Get All Delta |
| -------- | -------- |
| API name | getAllUnprocessedFeatures |
| Description | A 15-minute scheduler which gets all the keys and feature sets |
| Input | |
| Output | List<**DeltaCache**> |

| API | Delete All Processed Delta |
| -------- | -------- |
| API name | deleteAllProcessed |
| Description | All the feature sets which have been processed should get deleted |
| Input | DeltaCacheInput |
| Output | true/false |
## Scheduler Service + Function Resolver
The scheduler will run every 15 minutes, fetch all unprocessed data from the cache, and delete the cached entries. It will then process the fetched data: for each key it will find which functions need to be executed (i.e. those where an updated feature is configured as an input feature). Finally it will publish the Key_Function data to the Feature Resolver.
```protobuf
message KeyFunction {
  repeated KeyFunctionMap keyFunctionMap = 1;
}

message KeyFunctionMap {
  uint32 key = 1;
  repeated string function_name = 2;
}
```
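The function-resolution step described above amounts to an inverted index from input feature to functions. The following sketch illustrates the idea; the class, map contents, and method names are assumptions for illustration, not the actual implementation, and in the real system the index would be built from the stored function configs.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class FunctionResolverSketch {
    // Inverted index: input feature name -> functions configured with it as input.
    // These entries are hypothetical examples.
    static final Map<String, List<String>> FEATURE_TO_FUNCTIONS = Map.of(
        "telecom/feature/lsr/call_experience", List.of("churn_prediction"),
        "telecom/feature/lsr/call_duration",
            List.of("churn_prediction", "customer_estimated_usage"));

    // For one key's updated features, collect the distinct functions to execute.
    static Set<String> resolve(Collection<String> updatedFeatures) {
        Set<String> functions = new TreeSet<>();
        for (String feature : updatedFeatures) {
            functions.addAll(FEATURE_TO_FUNCTIONS.getOrDefault(feature, List.of()));
        }
        return functions;
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of("telecom/feature/lsr/call_duration")));
        // prints [churn_prediction, customer_estimated_usage]
    }
}
```

Each resolved set would then be packed into a KeyFunctionMap entry and published to the Feature Resolver.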
| API | 15 Min Scheduler |
| -------- | -------- |
| API name | scheduler |
| Description | Runs every 15 min and processes the delta cache data |
| Input | |
| Output | |

| API | Get all DeltaCache Unprocessed Data |
| -------- | -------- |
| API name | getAllDeltaCacheData |
| Description | Call the delta cache service to get all unprocessed cache data |
| Input | |
| Output | List<**DeltaCache**> |

| API | Delete all DeltaCache Processed Data |
| -------- | -------- |
| API name | deleteDeltaCacheProcessedData |
| Description | Call the delta cache service to delete all processed cache data |
| Input | |
| Output | true/false |

| API | Function Resolver |
| -------- | -------- |
| API name | functionResolver |
| Description | For each key, find all functions required to be executed (where the updated feature is configured as an input feature) |
| Input | List<**DeltaCache**> |
| Output | **KeyFunction** |

| API | Publish To Feature Resolver |
| -------- | -------- |
| API name | publishKeyFunctionData |
| Description | Publish the key-function map to the Feature Resolver |
| Input | **KeyFunction** |
| Output | BrainStatus |
## Feature Resolver
This will consume Feature_Resolver data. For each key it will fetch all unique input features/attributes defined for the function, fetching feature data from the feature repository and attribute data from the knowledge repository. For each function it will create a BrainEvent and publish key_function_event data on the topic.
```protobuf
message KeyFunctionEventSet {
  repeated KeyFunctionEvent keyFunctionEvent = 1;
}

message KeyFunctionEvent {
  uint32 key = 1;
  repeated FunctionEvent functionEvent = 2;
}

message FunctionEvent {
  string function = 1;
  BrainEvent brainEvent = 2;
}
```
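A populated KeyFunctionEventSet might look like the following text-format sketch. The key and function name are illustrative, and the BrainEvent body is omitted because its schema is defined outside this section.

```protobuf
keyFunctionEvent {
  key: 1001
  functionEvent {
    function: "/telecom/function/inference/churn_prediction"
  }
}
```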
| API | Kafka Consumer |
| -------- | -------- |
| API name | Consume keyFunctionMap from Feature_Resolver_Topic |
| Description | |
| Input | **KeyFunction** |
| Output | BrainStatus |

| API | Get Features for Function |
| -------- | -------- |
| API name | getFunctionFeatureMap |
| Description | |
| Input | String functionName |
| Output | List<String> featureNameList |

| API | Get Input Feature and Attribute for Function |
| -------- | -------- |
| API name | getFunctionFeatureAttribute |
| Description | |
| Input | String functionName |
| Output | List<String> featureNameList, List<String> attributeNameList |

| API | Get Feature value for key |
| -------- | -------- |
| API name | getFeatureForKey |
| Description | Get from the feature repository |
| Input | String key, String featureName |
| Output | BrainQuantity |

| API | Get Entity Attribute value for key |
| -------- | -------- |
| API name | getAttributeForKey |
| Description | Get from the knowledge repository |
| Input | String key, String attributeName, entityId |
| Output | BrainQuantity |

| API | Generate BrainEvent |
| -------- | -------- |
| API name | createBrainEvent |
| Description | Convert the data into a BrainEvent for each function for the given key |
| Input | String key, String functionName, BrainQuantity value |
| Output | BrainEvent |

| API | Publish To Function Router |
| -------- | -------- |
| API name | publish data to the Function_Executor topic |
| Description | Publish the KeyFunctionEventSet to the Function Router |
| Input | **KeyFunctionEventSet** |
| Output | BrainStatus |
## Function Router
This will consume key_function_event data. It will read the function config and send the event for the key to the input topic of the function.
| API | Kafka Consumer |
| -------- | -------- |
| API name | Consume KeyFunctionEventSet from Function_Executor |
| Description | |
| Input | **KeyFunctionEventSet** |
| Output | BrainStatus |

| API | Get Function Config |
| -------- | -------- |
| API name | getFunctionConfig |
| Description | |
| Input | String functionName |
| Output | FunctionConfig |

| API | Execute Function |
| -------- | -------- |
| API name | executeFunction |
| Description | For each key, execute all the functions (publish the BrainEvent on the input topic of that function) |
| Input | |
| Output | BrainStatus |