---
tags: CERN, Invenio
---
# Invenio-WTFlow :question:
## Why?
Run workflows idempotently on the server side, during or after a submission process, giving the user visibility of their status and the possibility to interact with the tasks, i.e. Start/Stop/Restart.
## Why not Celery?
The result reporting Celery does is [ephemeral](https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires) and can cause problems, and if it is set to persist in the DB it can add quite an overhead.
On top of that, Celery is not very user-friendly when it comes to reporting workflow status.
## Why not a workflow engine like Airflow?
Integrating any existing workflow engine with Invenio is complicated. They usually require their own DB, workers, etc., which in turn introduces new services to manage and possibly unnecessary service duplication.
## Proposed solution
Use a combination of celery workers and tasks together with a couple of custom objects: **Workflow** and **Task**.
```mermaid
classDiagram
CeleryTask <|-- Task: extends
Task "1..*" o.. Workflow
class Workflow {
- UUID id
- str name
- dict args_kwargs: default args
+ status()
+ start(*args, **kwargs)
+ stop(*args, **kwargs)
+ restart(*args, **kwargs)
}
class Task {
- UUID id
- UUID workflow_id
- UUID _id: internal, celery task ID
- dict args_kwargs
- str name
- str status
- str message
~ start(workflow_id, id_=None)
+ status()
+ stop(*args, **kwargs)
+ restart(*args, **kwargs)
+ update_state()
+ on_failure()
+ on_success()
}
```
A new decorator, `@workflow_task`, will be implemented, wrapping the one offered by Celery. Among other things, it sets the base class to the new `Task` class and sets the task name that will be used later on.
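
A minimal sketch of the wrapping pattern described above, written without Celery so it runs anywhere. The `Task` class here and `stand_in_task_decorator` are hypothetical stand-ins, not the real implementation; in practice Celery's own task decorator (which accepts `base` and `name` options) would take the place of the stand-in.

```python
class Task:
    """Stand-in for the proposed Task base class (hypothetical, no Celery)."""

    def __init__(self, func, name):
        self.func = func
        self.name = name
        self.status = "PENDING"

    def __call__(self, *args, **kwargs):
        # Track the status around the wrapped function call.
        self.status = "STARTED"
        try:
            result = self.func(*args, **kwargs)
        except Exception:
            self.status = "FAILURE"
            raise
        self.status = "SUCCESS"
        return result


def stand_in_task_decorator(base, name):
    """Plays the role of Celery's task decorator in this sketch."""
    def decorate(func):
        return base(func, name)
    return decorate


def workflow_task(func=None, *, name=None):
    """Wrap the underlying task decorator, forcing base class and task name."""
    def decorate(f):
        task_name = name or f"{f.__module__}.{f.__name__}"
        return stand_in_task_decorator(base=Task, name=task_name)(f)
    # Support both @workflow_task and @workflow_task(name="...")
    return decorate(func) if func is not None else decorate


@workflow_task
def task1(param1, param2):
    return param1 + param2


@workflow_task(name="video.extract_frames")  # hypothetical task name
def task2(path):
    return f"frames:{path}"
```

Both call styles produce an instance of the base class, so the workflow code can rely on every task exposing `name` and `status`.
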
**NOTES**:
- Perhaps both models storing these classes should use *SQLAlchemy-Continuum* to save the different "runs" of the workflow and of the tasks, if there is more than one.
- The default workflow class can handle most cases. If we want to group tasks differently than by name, then we need to override this class and create our own status property.
- Should we make them both records :question:
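
As an illustration of what a default status calculation grouped by task name could look like, here is a small sketch. The state names are Celery's, but the precedence order and the `compute_status` helper are assumptions made for this example, not a decided design.

```python
# Celery state names; the precedence order is an assumption of this sketch.
PRECEDENCE = ("FAILURE", "REVOKED", "STARTED", "RETRY", "PENDING", "SUCCESS")


def worst(statuses):
    """Return the status that ranks first in PRECEDENCE."""
    return min(statuses, key=PRECEDENCE.index)


def compute_status(tasks):
    """Aggregate (name, status) pairs, grouping tasks by task name."""
    by_name = {}
    for name, status in tasks:
        by_name.setdefault(name, []).append(status)
    # One status per task name, then one status for the whole workflow.
    per_task = {name: worst(runs) for name, runs in by_name.items()}
    overall = worst(per_task.values()) if per_task else "PENDING"
    return {"status": overall, "tasks": per_task}
```

With this ordering the workflow only reports `SUCCESS` once every task has succeeded, and a single failed task marks the whole workflow as `FAILURE`. A custom workflow class would replace this logic in its own status property.
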
### Configuration and tasks definition
**NOTE**: Celery tasks still need to be registered with entry points like before.
```python
# config.py
****_WORKFLOWS = {
    'video-processing': {
        'workflow_class': 'cds.modules.deposit.workflow:VideoProcessing',
        'permission_factory': '...'  # we can split by action here
    },
    'pdf-plot-extraction': {
        'workflow_definition': 'cds.modules.deposit.workflow:pdf_plot_extraction'
    }
}
****_PERMISSION_FACTORY = 'cds.modules.permissions.workflow_permission_factory:permission_factory'

# workflow_permission_factory.py
def permission_factory(workflow, action):
    # Check whatever you need
    # action might be things like: start/stop/restart-workflow/task
    ...

# workflow.py
from celery import chain, group

pdf_plot_extraction = chain(task1, task2, task3)

class VideoProcessing(Workflow):
    workflow = chain(group(task1, task2), task3)

    @property
    def status(self):
        # Calculate the status in a custom manner out of the task list
        ...

# workflow_tasks.py
from invenio_****.api import workflow_task

@workflow_task
def task1(param1, param2):
    # Do whatever
    ...
```
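
The configuration above refers to classes and permission factories by `module:attribute` strings, with a per-workflow `permission_factory` and a global default. A possible way to resolve them is sketched below; the helper names `obj_or_import_string` and `get_permission_factory`, the example factories, and the `"status"` action name are all assumptions for illustration.

```python
from importlib import import_module


def obj_or_import_string(value):
    """Return `value` as is, or resolve a 'package.module:attribute' string."""
    if isinstance(value, str):
        module_name, _, attribute = value.partition(":")
        return getattr(import_module(module_name), attribute)
    return value


def get_permission_factory(workflows, default_factory, name):
    """Per-workflow 'permission_factory' wins; otherwise use the default."""
    entry = workflows[name]
    return obj_or_import_string(entry.get("permission_factory", default_factory))


def allow_all(workflow, action):
    return True


def read_only(workflow, action):
    # "status" is a hypothetical action name used only for this example.
    return action == "status"


# Callables are used directly here so the sketch is self-contained;
# 'module:attribute' strings are resolved the same way.
workflows = {
    "video-processing": {"permission_factory": read_only},
    "pdf-plot-extraction": {},
}

can_start = get_permission_factory(workflows, allow_all, "video-processing")
print(can_start(None, "start"))
```

Accepting either an object or an import string keeps the config file free of imports while still allowing tests to pass callables directly.
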
### Workflow flow
```mermaid
sequenceDiagram
participant Client
participant REST API
participant Workflow API
Client->>REST API: /api/workflow/ POST {name}
Note right of REST API: Maybe add optional<br/>start parameter?
REST API->>Workflow API: Create new Workflow
Workflow API->>Workflow API: Save new instance to DB
Workflow API-->> REST API: {uuid}
REST API-->> Client: {uuid}
Client->>REST API: /api/workflow/<:w_id:> POST {data}
REST API->>Workflow API: Workflow start
Workflow API->>Workflow API: Locate instance on DB
Workflow API->>Workflow API: Find class/workflow from config
Workflow API->>Workflow API: Apply Async with default parameters + {data}
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
Client->>REST API: /api/workflow/<:w_id:> GET
REST API->>Workflow API: Workflow status
Workflow API->>Workflow API: Locate instance on DB
Workflow API->>Workflow API: Find class/workflow from config if any
Workflow API->>Workflow API: Calculate status from tasks
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
Client->>REST API: /api/workflow/<:w_id:> PUT {data}
REST API->>Workflow API: Workflow restart
Workflow API->>Workflow API: Locate instance on DB
Workflow API->>Workflow API: Stop all running tasks
Workflow API->>Workflow API: Reset all task statuses
Workflow API->>Workflow API: Apply Async with default parameters + {data}
Note right of Workflow API: Same UUID
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
Client->>REST API: /api/workflow/<:w_id:> DELETE
REST API->>Workflow API: Workflow stop
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
```
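
The lifecycle in the diagram can be modelled in a few lines. This is only a sketch under stated assumptions: a dict replaces the DB, no Celery call is made, and the `WorkflowAPI` class name, the status strings, and the `quality` parameter are hypothetical.

```python
import uuid


class WorkflowAPI:
    """In-memory stand-in for the Workflow API; a dict replaces the DB."""

    def __init__(self, defaults):
        self.defaults = defaults   # workflow name -> default kwargs
        self.instances = {}        # workflow UUID -> stored state

    def create(self, name):
        """POST with {name}: save a new instance and return its UUID."""
        if name not in self.defaults:
            raise KeyError(f"unknown workflow: {name}")
        w_id = str(uuid.uuid4())
        self.instances[w_id] = {"name": name, "status": "PENDING", "kwargs": {}}
        return w_id

    def start(self, w_id, data=None):
        """POST with {data}: run with default parameters + {data}."""
        instance = self.instances[w_id]
        instance["kwargs"] = {**self.defaults[instance["name"]], **(data or {})}
        instance["status"] = "STARTED"  # real code would apply the tasks async
        return instance["status"]

    def status(self, w_id):
        """GET: report the stored status."""
        return self.instances[w_id]["status"]

    def stop(self, w_id):
        """DELETE: stop the workflow."""
        self.instances[w_id]["status"] = "REVOKED"
        return self.instances[w_id]["status"]

    def restart(self, w_id, data=None):
        """PUT with {data}: stop, reset and start again under the same UUID."""
        self.stop(w_id)
        self.instances[w_id]["status"] = "PENDING"
        return self.start(w_id, data)


api = WorkflowAPI({"video-processing": {"quality": "720p"}})
w_id = api.create("video-processing")
print(api.start(w_id, {"quality": "1080p"}), api.restart(w_id))
```

Note that `restart` never creates a new entry, which mirrors the "Same UUID" note in the diagram.
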
### Tasks flow
```mermaid
sequenceDiagram
participant Client
participant REST API
participant Workflow API
Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> GET
REST API->>Workflow API: Workflow single task status
Workflow API->>Workflow API: Locate instance on DB
Workflow API->>Workflow API: Find class/workflow from config if any
Workflow API->>Workflow API: Get task status
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> PUT {data}
REST API->>Workflow API: Workflow task restart
Workflow API->>Workflow API: Locate workflow instance on DB
Workflow API->>Workflow API: Stop running task with ID if any
Workflow API->>Workflow API: Reset task status
Workflow API->>Workflow API: Apply Async the task with default parameters + {data}
Note right of Workflow API: Same UUID
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> DELETE
REST API->>Workflow API: Workflow task stop
Workflow API-->> REST API: {status}
REST API-->> Client: {status}
```
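
A rough sketch of the single-task restart from the diagram, reusing the task's UUID. The `WorkflowRun` class, the stage-based model of a chain of groups, and the optional `cascade` flag are assumptions for illustration only; by default nothing downstream is touched.

```python
import uuid


class WorkflowRun:
    """Tasks of one finished workflow run, ordered by stage (a stage models a group)."""

    def __init__(self, stages):
        # stages: list of lists of task names, e.g. [["task1", "task2"], ["task3"]]
        self.tasks = {}
        for index, names in enumerate(stages):
            for name in names:
                task_id = str(uuid.uuid4())
                self.tasks[task_id] = {"name": name, "stage": index, "status": "SUCCESS"}

    def task_id(self, name):
        """Look up the UUID of the task with the given name."""
        return next(t_id for t_id, t in self.tasks.items() if t["name"] == name)

    def restart_task(self, task_id, cascade=False):
        """Reset one task; with cascade=True also reset every later stage."""
        stage = self.tasks[task_id]["stage"]
        restarted = [task_id]
        if cascade:
            restarted += [t_id for t_id, t in self.tasks.items() if t["stage"] > stage]
        for t_id in restarted:
            self.tasks[t_id]["status"] = "PENDING"  # the same UUID is reused
        return [self.tasks[t_id]["name"] for t_id in restarted]


run = WorkflowRun([["task1", "task2"], ["task3"]])
print(run.restart_task(run.task_id("task1")))
```

Keeping the stage index next to each task is one way to know which tasks sit "down the line" if a cascading restart is wanted later.
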
TODO:
- Pending tasks don't have an ID inside the DB, so they don't appear in the status calculation (I think we can get this) :exclamation:
- Restarting a single task won't trigger the rest of the tasks down the line by default.