# Asynchronous Event/Task Processing
## Context
## Example Diagram
https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiZ3JhcGggVEQ7XG4gICAgQ29tcGxldGVkQXBwIC0tPiBBZGRSZXNwb25zZUZvcm1zXG4gICAgQWRkUmVzcG9uc2VGb3JtcyAtLT4gQ2FsY3VsYXRlQW5kQWRkTVFMU2NvcmVcbiAgICBDYWxjdWxhdGVBbmRBZGRNUUxTY29yZSAtLT58UXVhbGlmaWVkfCBDb252ZXJ0VG9Db250YWN0XG5cdENhbGN1bGF0ZUFuZEFkZE1RTFNjb3JlIC0tPnxVbnF1YWxpZmllZHwgRW5kXG4gICAgQ29udmVydFRvQ29udGFjdCAtLT4gQ3JlYXRlT3Bwb3J0dW5pdHlcbiAgICBDcmVhdGVPcHBvcnR1bml0eSAtLT4gQ3JlYXRlQ29udGFjdEluRGlhbHBhZFxuICAgIENyZWF0ZUNvbnRhY3RJbkRpYWxwYWQgLS0-IENyZWF0ZUFjdGl2aXR5SW5TYWxlc2ZvcmNlQ29udGFjdENyZWF0ZWRcbiAgICBDcmVhdGVPcHBvcnR1bml0eSAtLT4gQXNzb2NpYXRlT3Bwb3J0dW5pdHlUb0NhbXBhaWduXG4gICAgQ3JlYXRlT3Bwb3J0dW5pdHkgLS0-IEFzc2lnbk93bmVyT2ZPcHBvcnR1bml0eVxuICAgIENyZWF0ZUNvbnRhY3RJbkRpYWxwYWQgLS0-IFNlbmRTTVNUb0RpYWxwYWRDb250YWN0XG4gICAgU2VuZFNNU1RvRGlhbHBhZENvbnRhY3QgLS0-IENyZWF0ZUFjdGl2aXR5SW5TYWxlc2ZvcmNlU01TU2VudFxuICAgIEFzc2lnbk93bmVyT2ZPcHBvcnR1bml0eSAtLT4gRW5kXG4gICAgQ3JlYXRlQWN0aXZpdHlJblNhbGVzZm9yY2VTTVNTZW50IC0tPiBFbmRcbiAgICBBc3NvY2lhdGVPcHBvcnR1bml0eVRvQ2FtcGFpZ24gLS0-IEVuZFxuICAgIENyZWF0ZUFjdGl2aXR5SW5TYWxlc2ZvcmNlQ29udGFjdENyZWF0ZWQgLS0-IEVuZCIsIm1lcm1haWQiOnsidGhlbWUiOiJkZWZhdWx0In0sInVwZGF0ZUVkaXRvciI6ZmFsc2V9
## Notes
- Decompose work into smaller units
- Fault tolerant in the right way: retry failed steps without repeating side effects that already succeeded
- Debuggable
- Replayable
  - Even successful tasks should be replayable, in case our code was incorrect
- Idempotent
- Management dashboard
- Integrate with Zapier for extensibility (notifications)
- Pass minimum amount of state/context to future tasks
## Questions
- Events or tasks?
- Dependency management: we shouldn't record a "Welcome SMS Sent" activity in Salesforce before the Dialpad SMS has actually been sent, but we also shouldn't re-send the SMS if recording the activity fails
- We would need some way to track task state and outcomes. I don't think Dialpad provides a way to mark that a welcome SMS has been sent, and we don't want to accidentally send it twice.
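One possible answer to the two tracking questions above is a small ledger that stores each task's successful result under a key made of the workflow run and task name, and consults it before performing a side effect. This is a minimal sketch: `TaskLedger`, `run_once`, and the task functions are hypothetical, and the in-memory dict stands in for a durable store such as a database table.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass
class TaskLedger:
    """Hypothetical record of successful task outcomes, keyed by (run id, task name).

    The in-memory dict stands in for a durable store (e.g. a database table)
    that would survive worker restarts.
    """
    results: Dict[Tuple[str, str], Any] = field(default_factory=dict)

    def run_once(self, run_id: str, task_name: str, fn: Callable[[], Any]) -> Any:
        key = (run_id, task_name)
        if key in self.results:
            # Succeeded in an earlier attempt: reuse the stored result instead of
            # repeating the side effect (e.g. sending the SMS again).
            return self.results[key]
        result = fn()  # if this raises, nothing is recorded and it can be retried
        self.results[key] = result
        return result


# Demo with stub tasks: the first attempt fails at the Salesforce step.
ledger = TaskLedger()
sms_sent = []
salesforce_calls = {"count": 0}


def send_welcome_sms() -> Dict[str, Any]:
    sms_sent.append("Welcome!")  # stands in for the Dialpad API call
    return {"success": True}


def record_sms_sent_activity() -> Dict[str, Any]:
    salesforce_calls["count"] += 1
    if salesforce_calls["count"] == 1:
        raise RuntimeError("Salesforce unavailable")
    return {"success": True, "record_id": "activity-1"}


def welcome_workflow(run_id: str) -> Dict[str, Any]:
    ledger.run_once(run_id, "send_welcome_sms", send_welcome_sms)
    # Only reached once the SMS step has succeeded (now or in an earlier attempt).
    return ledger.run_once(run_id, "record_sms_sent_activity", record_sms_sent_activity)


try:
    welcome_workflow("run-1")
except RuntimeError:
    pass  # first attempt: the SMS goes out, recording the activity fails

welcome_workflow("run-1")  # replay: the activity is recorded, no second SMS
print(len(sms_sent))  # 1
```

This narrows the gap but does not close it: if a worker crashes after the SMS is sent but before the ledger write, a replay would still re-send it. Closing that window would require the external API to accept an idempotency key or expose the message's delivery status.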
# Brainstorming
## Architecture
- We need some form of ingress to handle incoming requests.
- The ingress should also act as a dispatcher: it should read the request data and transform it into the appropriate tasks, with task dependencies defined.
- We can (potentially) cache data between functions with something like Redis, to improve performance
- How will we manage dependencies in a monorepo? In a JavaScript stack we could use Lerna.
### Flow
When a new request arrives, an ingress function will read the request data and extract the arguments. The function will determine which workflow to run (based on the endpoint? something else?) and construct a tree of subtasks representing the actions in that workflow. Once the subtask tree is built, a master task representing the entire workflow will be created to execute it.
This allows us to monitor the progress of the request as a whole, while also allowing us to monitor and replay any portion of the workflow that may fail.
Transitions between subtasks will occur through callbacks, ideally using a mechanism built into the task queue we select. Callbacks are non-blocking: rather than a worker waiting on a running task's result, workers are free to pick up other tasks in the meantime.
Once a subtask is complete, it will invoke the next subtask in the tree. Output data from the subtask can be passed to the next subtask to allow for more efficient state sharing (while still being mindful of our requirements to be able to run subtasks manually).
Each external service call should be represented by an individual subtask. This will allow us to track the result of each execution, replay a specific service call (with its parameters already set) in the event of failure, and pause execution of dependent subtasks until that call succeeds.
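A minimal, framework-free sketch of the flow above, assuming each subtask is a plain function that receives its parent's output dict. The master task records the status, input, and output of every subtask; when a subtask finishes, its completion "callback" schedules its children with that output, and a failed subtask can be replayed with the exact input it received. `Subtask`, `MasterTask`, and the in-process `deque` (standing in for a real task queue, where each append would be an enqueue) are hypothetical.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Deque, Dict, List, Tuple

Data = Dict[str, Any]


@dataclass
class Subtask:
    name: str
    fn: Callable[[Data], Data]  # one external service call
    children: List["Subtask"] = field(default_factory=list)


@dataclass
class MasterTask:
    """Runs a subtask tree and tracks the status, input, and output of each node."""
    root: Subtask
    status: Dict[str, str] = field(default_factory=dict)
    inputs: Dict[str, Data] = field(default_factory=dict)
    outputs: Dict[str, Data] = field(default_factory=dict)

    def run(self, data: Data) -> None:
        self._execute(self.root, data)

    def replay(self, task: Subtask) -> None:
        # Re-run one subtask (and its subtree) with the input it received before.
        self._execute(task, self.inputs[task.name])

    def _execute(self, start: Subtask, start_input: Data) -> None:
        # The deque stands in for the task queue.
        queue: Deque[Tuple[Subtask, Data]] = deque([(start, start_input)])
        while queue:
            task, task_input = queue.popleft()
            self.inputs[task.name] = task_input
            try:
                output = task.fn(task_input)
            except Exception as exc:
                # Children of a failed subtask are never scheduled.
                self.status[task.name] = f"failed: {exc}"
                continue
            self.status[task.name] = "succeeded"
            self.outputs[task.name] = output
            # Completion "callback": hand this subtask's output to its children.
            for child in task.children:
                queue.append((child, output))


# Demo: the "SMS sent" activity fails on the first run, then is replayed.
salesforce_down = {"value": True}


def record_sms_sent(inp: Data) -> Data:
    if salesforce_down["value"]:
        raise RuntimeError("Salesforce timeout")
    return {"success": True}


record_sms = Subtask("record_sms_sent", record_sms_sent)
send_sms = Subtask("send_sms", lambda inp: {"phone": inp["phone"], "success": True}, [record_sms])
tree = Subtask(
    "create_dialpad_contact",
    lambda inp: {"contact_id": "dp-1", "phone": inp["phone"]},
    [Subtask("record_contact_created", lambda inp: {"success": True}), send_sms],
)

master = MasterTask(tree)
master.run({"phone": "+15550100"})
print(master.status["record_sms_sent"])  # failed: Salesforce timeout
salesforce_down["value"] = False
master.replay(record_sms)  # send_sms is not re-run, so no second SMS
print(master.status["record_sms_sent"])  # succeeded
```

Because children only run after their parent succeeds, the "SMS sent" activity can never be recorded before the SMS goes out, and replaying just that subtask avoids re-sending the SMS.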
### Considerations
- If we want to invoke subtasks manually, we should find a way to define groups of tasks. If each external service call is its own task, then a single "action" (from the end-user point of view) can consist of multiple tasks. For example, sending a welcome SMS consists of four tasks:
  - Create contact in Dialpad
  - Record "Contact Created" activity in Salesforce
  - Send welcome SMS to user
  - Record "SMS Sent" activity in Salesforce
To represent this, we can define a task group, "Send Welcome", which builds a tree consisting of those four subtasks. The master task will then represent that group of tasks, and allow us to monitor the execution of the group as a whole.
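The "Send Welcome" group can also be written as a dependency graph instead of a hand-built tree. This sketch uses Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+) to compute which subtasks are ready to run together at each step. The task names are hypothetical; the dependencies follow the example diagram, where the SMS is sent to the newly created Dialpad contact and each activity is recorded only after its step.

```python
from graphlib import TopologicalSorter

# Each key lists the subtasks it depends on (its predecessors).
SEND_WELCOME = {
    "create_dialpad_contact": set(),
    "record_contact_created_activity": {"create_dialpad_contact"},
    "send_welcome_sms": {"create_dialpad_contact"},
    "record_sms_sent_activity": {"send_welcome_sms"},
}


def execution_batches(graph):
    """Return lists of subtasks that could run concurrently, in dependency order."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for deterministic output
        batches.append(ready)
        # A real scheduler would call done() from each task's completion callback;
        # here every ready task is assumed to succeed immediately.
        sorter.done(*ready)
    return batches


for batch in execution_batches(SEND_WELCOME):
    print(batch)
```

In a real scheduler a failed subtask would simply never be marked `done()`, so everything downstream of it (such as the "SMS Sent" activity) stays blocked until it is replayed.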
### Task Structure
Individual tasks should aim to be self-contained and do one thing. Each should return structured output that can then be transformed into the input for another task.
> Make each program do one thing well.
>
> Expect the output of every program to become the input to another, as yet unknown, program.
A process can then be composed of individual tasks. This gives us the flexibility to reorganize the process quickly and to insert or remove tasks at arbitrary points in the pipeline.
#### Example Tasks
- Record Salesforce Activity

| Input       | Output    |
| ----------- | --------- |
| contact_id  | success   |
| subject     | record_id |
| description |           |

- Send SMS

| Input   | Output  |
| ------- | ------- |
| phone   | success |
| message |         |

- Get Salesforce Contact

| Input      | Output     |
| ---------- | ---------- |
| contact_id | first_name |
|            | last_name  |
|            | email      |
|            | phone      |
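
The example tasks above can be expressed as functions with typed input and output records, which makes it explicit how one task's output is transformed into the next task's input. A sketch using standard-library dataclasses; the field names mirror the tables, while the function bodies are stubs standing in for real Salesforce and Dialpad API calls.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class GetSalesforceContactInput:
    contact_id: str


@dataclass(frozen=True)
class GetSalesforceContactOutput:
    first_name: str
    last_name: str
    email: str
    phone: str


@dataclass(frozen=True)
class SendSmsInput:
    phone: str
    message: str


@dataclass(frozen=True)
class SendSmsOutput:
    success: bool


@dataclass(frozen=True)
class RecordActivityInput:
    contact_id: str
    subject: str
    description: str


@dataclass(frozen=True)
class RecordActivityOutput:
    success: bool
    record_id: str


def get_salesforce_contact(inp: GetSalesforceContactInput) -> GetSalesforceContactOutput:
    # Stub: a real task would query the Salesforce API for inp.contact_id.
    return GetSalesforceContactOutput("Test", "Last", "test@example.com", "+15550100")


def send_sms(inp: SendSmsInput) -> SendSmsOutput:
    # Stub: a real task would send inp.message to inp.phone through Dialpad.
    return SendSmsOutput(success=True)


def record_salesforce_activity(inp: RecordActivityInput) -> RecordActivityOutput:
    # Stub: a real task would create an activity on the Salesforce contact.
    return RecordActivityOutput(success=True, record_id="activity-1")


# Each task's structured output is transformed into the next task's input.
contact = get_salesforce_contact(GetSalesforceContactInput(contact_id="abc"))
sms = send_sms(SendSmsInput(phone=contact.phone, message=f"Welcome, {contact.first_name}!"))
if sms.success:
    activity = record_salesforce_activity(
        RecordActivityInput(contact_id="abc", subject="SMS sent", description="Welcome SMS sent")
    )
    print(asdict(activity))  # {'success': True, 'record_id': 'activity-1'}
```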
### Task Composition
Once task inputs and outputs have been defined, we can declaratively define a pipeline composed of individual tasks. It would work similarly to how Zapier works, but fine-tuned for our business process with far more control over individual execution.
#### Task Composition Example
In this example, `data` refers to the JSON payload passed into the ingress.
```yaml
workflow:
  steps:
    - task: get_salesforce_contact
      id: salesforce_contact
      input:
        contact_id: data.salesforce_contact_id
    - task: create_dialpad_contact
      id: create_dialpad_contact
      input:
        first_name: salesforce_contact.first_name
        last_name: salesforce_contact.last_name
        phone: salesforce_contact.phone
    - task: record_salesforce_activity
      id: record_contact_creation
      input:
        contact_id: data.salesforce_contact_id
        subject: Contact created
        description: Contact has been created in Dialpad
    - task: send_sms
      id: send_sms
      input:
        phone: salesforce_contact.phone
        message: "Welcome, {{ salesforce_contact.first_name }}, your application has been submitted!"
    - task: record_salesforce_activity
      id: record_welcome_sms
      input:
        contact_id: data.salesforce_contact_id
        subject: Introduction SMS sent
        description: An introductory SMS has been sent
      zapier:
        zap: write_to_spreadsheet
        input:
          event: Introduction SMS sent
          first_name: salesforce_contact.first_name
          last_name: salesforce_contact.last_name
  output:
    first_name: salesforce_contact.first_name
    last_name: salesforce_contact.last_name
```
With an input of:
```json
{
"salesforce_contact_id": "abc"
}
```
The output will be:
```json
{
"first_name": "Test",
"last_name": "Last"
}
```
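To pin down the semantics of the example, here is a minimal interpreter for that workflow definition. It assumes the YAML has already been parsed (e.g. with PyYAML) and the `workflow` mapping extracted, and that a string like `salesforce_contact.first_name` means "field `first_name` of the output of the step with `id: salesforce_contact`". Other strings are passed through literally, with `{{ ... }}` placeholders filled in from the same lookup. The task registry holds stubs, none of these names come from an existing library, and the `zapier` hook is omitted.

```python
import re
from typing import Any, Callable, Dict

Data = Dict[str, Any]
REF = re.compile(r"^(\w+)\.(\w+)$")                      # e.g. salesforce_contact.phone
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\.(\w+)\s*\}\}")  # e.g. {{ salesforce_contact.first_name }}

# Hypothetical task registry with stubs in place of Salesforce/Dialpad calls.
TASKS: Dict[str, Callable[..., Data]] = {
    "get_salesforce_contact": lambda contact_id: {
        "first_name": "Test", "last_name": "Last", "phone": "+15550100"},
    "create_dialpad_contact": lambda first_name, last_name, phone: {"success": True},
    "record_salesforce_activity": lambda contact_id, subject, description: {
        "success": True, "record_id": "activity-1"},
    "send_sms": lambda phone, message: {"success": True, "message": message},
}


def resolve(value: Any, context: Dict[str, Data]) -> Any:
    """Resolve a `step_id.field` reference or fill `{{ step_id.field }}` placeholders."""
    if not isinstance(value, str):
        return value
    ref = REF.match(value)
    if ref and ref.group(1) in context:
        return context[ref.group(1)][ref.group(2)]
    # Anything else is a literal, possibly containing placeholders.
    return PLACEHOLDER.sub(lambda m: str(context[m.group(1)][m.group(2)]), value)


def run_workflow(workflow: Data, data: Data) -> Data:
    context: Dict[str, Data] = {"data": data}  # each step's output is stored under its id
    for step in workflow["steps"]:
        kwargs = {name: resolve(value, context) for name, value in step["input"].items()}
        context[step["id"]] = TASKS[step["task"]](**kwargs)
    return {name: resolve(value, context) for name, value in workflow["output"].items()}


# Abridged parsed form of the `workflow` mapping (zapier hook omitted).
WORKFLOW: Data = {
    "steps": [
        {"task": "get_salesforce_contact", "id": "salesforce_contact",
         "input": {"contact_id": "data.salesforce_contact_id"}},
        {"task": "create_dialpad_contact", "id": "create_dialpad_contact",
         "input": {"first_name": "salesforce_contact.first_name",
                   "last_name": "salesforce_contact.last_name",
                   "phone": "salesforce_contact.phone"}},
        {"task": "record_salesforce_activity", "id": "record_contact_creation",
         "input": {"contact_id": "data.salesforce_contact_id",
                   "subject": "Contact created",
                   "description": "Contact has been created in Dialpad"}},
        {"task": "send_sms", "id": "send_sms",
         "input": {"phone": "salesforce_contact.phone",
                   "message": "Welcome, {{ salesforce_contact.first_name }}, "
                              "your application has been submitted!"}},
    ],
    "output": {"first_name": "salesforce_contact.first_name",
               "last_name": "salesforce_contact.last_name"},
}

print(run_workflow(WORKFLOW, {"salesforce_contact_id": "abc"}))
# {'first_name': 'Test', 'last_name': 'Last'}
```

One design caveat this surfaces: a literal string that happens to look like `step_id.field` for a known step id would be treated as a reference. Requiring `{{ ... }}` for every reference would remove that ambiguity.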
## Endpoint Routing
### Ingress
Our current ingress uses Google Cloud Functions to process requests. We can continue using that, but there are some downsides:
- Each endpoint needs to be predefined with a deployment specific to that endpoint.
- Deployment of functions takes a few minutes. As we increase the number of deployments, the time to deploy will increase linearly.
#### Custom Ingress
Moving to a more custom ingress (such as with Flask/Django) will allow for dynamic endpoint creation, and much faster deployment times (as there would only be a single deployment).
Endpoint and functions documentation.
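With a single custom ingress, adding an endpoint becomes a registry entry rather than a new deployment. A framework-agnostic sketch: a Flask or Django view bound to a route such as `/workflows/<name>` would call `dispatch()` with the URL segment and the JSON body. `WORKFLOWS`, `register`, `dispatch`, and the `completed-application` workflow are hypothetical.

```python
from typing import Any, Callable, Dict, List

Payload = Dict[str, Any]
TaskSpec = Dict[str, Any]

# Hypothetical registry: endpoint name -> builder that turns a payload into tasks.
WORKFLOWS: Dict[str, Callable[[Payload], List[TaskSpec]]] = {}


def register(name: str) -> Callable:
    """Expose a workflow builder under a new endpoint without a new deployment."""
    def decorator(builder: Callable[[Payload], List[TaskSpec]]):
        WORKFLOWS[name] = builder
        return builder
    return decorator


@register("completed-application")
def completed_application(payload: Payload) -> List[TaskSpec]:
    contact_id = payload["salesforce_contact_id"]
    return [
        {"task": "get_salesforce_contact", "input": {"contact_id": contact_id}},
        {"task": "send_sms", "depends_on": ["get_salesforce_contact"]},
    ]


def dispatch(name: str, payload: Payload) -> List[TaskSpec]:
    """Single entry point: look up the workflow for an endpoint and build its tasks."""
    builder = WORKFLOWS.get(name)
    if builder is None:
        raise KeyError(f"no workflow registered for endpoint {name!r}")
    return builder(payload)


tasks = dispatch("completed-application", {"salesforce_contact_id": "abc"})
print([task["task"] for task in tasks])  # ['get_salesforce_contact', 'send_sms']
```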
### Task Scheduling
If we decide to use a non-cloud-based task scheduler, we will most likely need to set up some form of VM or container infrastructure to run individual tasks.
## Dashboard Options
(in no particular order)
- Grafana + Prometheus for fine-grained views of process metrics (how many SMSs were sent, how many applications were processed, how many Salesforce API requests were made). Can also monitor the health of our task framework (e.g. https://grafana.com/grafana/dashboards/10026)
- [Celery Dashboard](https://github.com/mehdigmira/celery-dashboard)
- [Flower](https://flower.readthedocs.io/en/latest/) (Celery monitoring dashboard)
## Options
Decision by Wednesday
### Drag-and-Drop Automation Engines
- Zapier
- Automate.io
- Workato
### Task Queues
#### Celery
- Supports task chaining (a linear sequence of tasks), plus groups and chords for building larger dependency graphs of tasks: https://docs.celeryproject.org/en/latest/userguide/canvas.html#canvas-chain
- Configurable automatic retries
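Celery configures automatic retries through task options such as `autoretry_for`, `retry_backoff`, and `max_retries`. To show the behaviour those options control without needing a broker, here is a standard-library sketch of retry with exponential backoff; `retry` and its parameters are hypothetical and are not Celery's API.

```python
import functools
import time
from typing import Any, Callable, Tuple, Type


def retry(max_retries: int = 3, backoff: float = 1.0,
          retry_on: Tuple[Type[BaseException], ...] = (Exception,),
          sleep: Callable[[float], None] = time.sleep) -> Callable:
    """Retry on the given exceptions, doubling the delay after each failed attempt."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            attempt = 0
            while True:
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt >= max_retries:
                        raise  # give up and surface the failure (e.g. for manual replay)
                    sleep(backoff * 2 ** attempt)
                    attempt += 1
        return wrapper
    return decorator


# Demo: a flaky Dialpad call that succeeds on the third attempt.
# `sleep` is replaced by list.append so the delays are recorded instead of waited.
delays = []
calls = {"count": 0}


@retry(max_retries=3, backoff=0.5, retry_on=(ConnectionError,), sleep=delays.append)
def create_dialpad_contact() -> dict:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("Dialpad unavailable")
    return {"success": True}


print(create_dialpad_contact(), delays)  # {'success': True} [0.5, 1.0]
```

Automatic retries are only safe for idempotent tasks: retrying a call that already performed its side effect before timing out (for example, an SMS that was actually sent) would repeat it.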
#### RQ
- Does not support job dependencies: https://github.com/rq/rq/issues/260
#### GCloud tasks service
- Cloud based
- https://cloud.google.com/tasks
#### AWS SQS + AWS Step functions
- Cloud based
- This would require moving everything to AWS :/
- Worth taking a look at
- https://aws.amazon.com/getting-started/hands-on/orchestrate-microservices-with-message-queues-on-step-functions/
### Open Source Workflow Engines
- Prefect
- Airflow (has some issues)
- https://docs.prefect.io/core/getting_started/why-not-airflow.html#overview
### GitHub Actions
- Set up a GitHub Action to deploy the website and functions to separate instances whenever something is merged to master.
## Future
- Salesforce API has a request limit per day. While we aren't close to hitting it now, we should keep that in mind as we scale. Currently we make >10 requests per submitted application.
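Since we make more than 10 requests per application, dividing the daily limit by 10 gives an upper bound on how many applications we can process per day. A small helper for tracking that headroom as the per-application request count changes; the 100,000 limit below is a placeholder, not our actual allocation, which depends on our Salesforce edition and licenses.

```python
def max_applications_per_day(daily_request_limit: int, requests_per_application: int) -> int:
    """Applications we can process per day before exhausting the daily API allocation."""
    return daily_request_limit // requests_per_application


# Placeholder limit (hypothetical): check the org's real allocation in Salesforce.
for requests_per_application in (10, 12, 15):
    print(requests_per_application, max_applications_per_day(100_000, requests_per_application))
```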