---
tags: CERN, Invenio
---

# Invenio-WTFlow :question:

## Why?

Run workflows idempotently on the server side upon or during a submission process, giving the user visibility of the status and also the possibility to interact with the tasks, i.e. Start/Stop/Restart.

## Why not Celery?

The result reporting Celery does is [ephemeral](https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires) and can cause problems. If it is set to persist on the DB instead, it can cause quite an overhead. On top of that, Celery is not very user friendly when it comes to reporting workflow status.

## Why not a workflow engine like Airflow?

Integrating any existing workflow engine with Invenio is complicated. They usually require their own DB, workers, etc., which would in turn introduce new services to manage and possibly unnecessary service duplication.

## Proposed solution

Use a combination of Celery workers and tasks together with a couple of custom objects: **Workflow** and **Task**.

```mermaid
classDiagram
    CeleryTask <|-- Task: extends
    Task "1..*" o.. Workflow
    class Workflow {
        - UUID id
        - str name
        - dict args_kwargs: default args
        + status()
        + start(*args, **kwargs)
        + stop(*args, **kwargs)
        + restart(*args, **kwargs)
    }
    class Task {
        - UUID id
        - UUID workflow_id
        - UUID _id: internal, celery task ID
        - dict args_kwargs
        - str name
        - str status
        - str message
        ~ start(workflow_id, id_=None)
        + status()
        + stop(*args, **kwargs)
        + restart(*args, **kwargs)
        + update_state()
        + on_failure()
        + on_success()
    }
```

A new decorator, `@workflow_task`, will be implemented. It wraps the one offered by Celery and, among other things, sets the base class to the new `Task` class and sets the task name to be used in the future.

**NOTES**:

- Perhaps both models storing these classes should use *SQLAlchemy-Continuum* to save the different "runs" of the workflow and the tasks, if there is more than one.
- The default workflow class can handle most cases. If we want to group tasks differently than by name, then we need to subclass it and define our own `status` property.
- Should we make them both records :question:

### Configuration and tasks definition

**NOTE**: Celery tasks still need to be registered with entry points like before.

```python
# config.py
****_WORKFLOWS = {
    'video-processing': {
        'workflow_class': 'cds.modules.deposit.workflow:VideoProcessing',
        'permission_factory': '...'  # we can split by action here
    },
    'pdf-plot-extraction': {
        'workflow_definition': 'cds.modules.deposit.workflow:pdf_plot_extraction'
    }
}

****_PERMISSION_FACTORY = 'cds.modules.permissions.workflow_permission_factory:permission_factory'

# workflow_permission_factory.py
def permission_factory(workflow, action):
    # Check whatever you need.
    # `action` might be things like: start/stop/restart-workflow/task
    ...

# workflow.py
from celery import chain, group

from .workflow_tasks import task1, task2, task3

# Partial signatures (.s()); the arguments are supplied when the workflow starts.
pdf_plot_extraction = chain(task1.s(), task2.s(), task3.s())


class VideoProcessing(Workflow):  # Workflow: the base class from the class diagram
    workflow = chain(group(task1.s(), task2.s()), task3.s())

    @property
    def status(self):
        # Calculate the status in a custom manner out of the task list.
        ...

# workflow_tasks.py
from invenio_****.api import workflow_task

@workflow_task
def task1(param1, param2):
    # Do whatever
    ...
```

### Workflow flow

```mermaid
sequenceDiagram
    participant Client
    participant REST API
    participant Workflow API

    Client->>REST API: /api/workflow/ POST {name}
    Note right of REST API: Maybe add optional<br/>start parameter?
    REST API->>Workflow API: Create new Workflow
    Workflow API->>Workflow API: Save new instance to DB
    Workflow API-->>REST API: {uuid}
    REST API-->>Client: {uuid}

    Client->>REST API: /api/workflow/<:w_id:> POST {data}
    REST API->>Workflow API: Workflow start
    Workflow API->>Workflow API: Locate instance on DB
    Workflow API->>Workflow API: Find class/workflow from config
    Workflow API->>Workflow API: Apply Async with default parameters + {data}
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}

    Client->>REST API: /api/workflow/<:w_id:> GET
    REST API->>Workflow API: Workflow status
    Workflow API->>Workflow API: Locate instance on DB
    Workflow API->>Workflow API: Find class/workflow from config if any
    Workflow API->>Workflow API: Calculate status from tasks
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}

    Client->>REST API: /api/workflow/<:w_id:> PUT {data}
    REST API->>Workflow API: Workflow restart
    Workflow API->>Workflow API: Locate instance on DB
    Workflow API->>Workflow API: Stop all running tasks
    Workflow API->>Workflow API: Reset all task statuses
    Workflow API->>Workflow API: Apply Async with default parameters + {data}
    Note right of Workflow API: Same UUID
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}

    Client->>REST API: /api/workflow/<:w_id:> DELETE
    REST API->>Workflow API: Workflow stop
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}
```

### Tasks flow

```mermaid
sequenceDiagram
    participant Client
    participant REST API
    participant Workflow API

    Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> GET
    REST API->>Workflow API: Workflow single task status
    Workflow API->>Workflow API: Locate instance on DB
    Workflow API->>Workflow API: Find class/workflow from config if any
    Workflow API->>Workflow API: Get task status
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}

    Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> PUT {data}
    REST API->>Workflow API: Workflow task restart
    Workflow API->>Workflow API: Locate workflow instance on DB
    Workflow API->>Workflow API: Stop running task with ID if any
    Workflow API->>Workflow API: Reset task status
    Workflow API->>Workflow API: Apply Async the task with default parameters + {data}
    Note right of Workflow API: Same UUID
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}

    Client->>REST API: /api/workflow/<:w_id:>/task/<:task_id:> DELETE
    REST API->>Workflow API: Workflow task stop
    Workflow API-->>REST API: {status}
    REST API-->>Client: {status}
```

TODO:

- Pending tasks don't have an ID in the DB, so they don't appear in the status calculation (I think we can get this) :exclamation:
- Restarting a single task won't trigger the rest of the tasks down the line by default.
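The class diagram gives `Workflow.status()` the job of computing a single status out of its tasks. The following is a minimal, stdlib-only sketch of one way that could work, not the module's API.

- `TaskRecord` and `workflow_status` are hypothetical names.
- The state strings are Celery's built-in ones.
- The precedence is an assumption: failure, then revoked, then all-success, then all-pending, otherwise running.
- The optional `expected` list of task names, taken from the workflow definition, is one possible way to count tasks that have no DB row yet (the first TODO item).

```python
from dataclasses import dataclass
from typing import Iterable

# Celery's built-in state names (celery.states uses the same strings).
PENDING = "PENDING"
STARTED = "STARTED"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
REVOKED = "REVOKED"


@dataclass
class TaskRecord:
    """Hypothetical stand-in for one row of the Task model."""
    name: str
    status: str


def workflow_status(tasks: Iterable[TaskRecord], expected: Iterable[str] = ()) -> str:
    """Aggregate task states into a single workflow state (assumed precedence)."""
    # Keyed by task name, like the default grouping described in the notes.
    statuses = {task.name: task.status for task in tasks}
    # Assumption: tasks from the definition with no DB row yet count as PENDING.
    for name in expected:
        statuses.setdefault(name, PENDING)
    values = set(statuses.values())
    if not values or values == {PENDING}:
        return PENDING
    if FAILURE in values:
        return FAILURE
    if REVOKED in values:
        return REVOKED
    if values == {SUCCESS}:
        return SUCCESS
    # Anything else (some done, some running, retrying or waiting) is in progress.
    return STARTED


rows = [TaskRecord("task1", SUCCESS), TaskRecord("task2", STARTED)]
print(workflow_status(rows, expected=["task1", "task2", "task3"]))  # STARTED
```

A custom workflow class, such as `VideoProcessing`, would replace this aggregation with its own `status` property.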
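For the second TODO item, the set of tasks to re-run after restarting a single task could be derived from the workflow definition. This sketch assumes the definition is flattened into a hypothetical list of stages. Each stage is the set of task names run in parallel (a `group`), and the stages run in order (a `chain`). `downstream_tasks` is a hypothetical helper, not part of Celery or Invenio.

```python
from typing import List, Sequence, Set


def downstream_tasks(stages: Sequence[Set[str]], name: str) -> List[str]:
    """Return the tasks to re-run after restarting `name` (hypothetical helper).

    `stages` is a chain of groups, e.g.
    chain(group(task1, task2), task3) -> [{"task1", "task2"}, {"task3"}].
    """
    for index, stage in enumerate(stages):
        if name in stage:
            # Only later stages depend on `name`; siblings in the same group
            # ran in parallel and are left alone.
            return [task for later in stages[index + 1:] for task in sorted(later)]
    raise KeyError(f"unknown task: {name}")


video_processing = [{"task1", "task2"}, {"task3"}]
print(downstream_tasks(video_processing, "task1"))  # ['task3']
```

Take `chain(group(task1, task2), task3)` from the `VideoProcessing` example. Restarting `task1` would also re-run `task3` but not its sibling `task2`.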