# Proposal to Implement a General Task Queue for the Console
## Executive Summary
We deal with long-running tasks and regularly-scheduled tasks by passing them off to celery, which is a management system separate from the website that the operators see.
If a task is interrupted it can be hard to restart or resume it later, but sometimes interruptions are unavoidable.
In addition, particularly in the ingest process, we have a number of tasks that depend on one another. Celery does have the notion of a chain of tasks, but our dependencies are more complex than that. We've worked around this so far by creating tasks that check whether they can run and, if not, resubmit themselves after a few seconds. That largely works, although under certain circumstances the resubmissions can run away.
We have also had problems monitoring the running, completed, or failed tasks because the current celery setup does not keep an easily accessible record of these things across restarts.
We have also had problems with our automated unit tests, which ensure the robustness of the whole system. Because celery is a separate process it's not straightforward to test things in the normal way and we've had to resort to some trickery to try to work around that.
We'd like to build a queue system, stored in our database, that will allow us to manage the celery tasks more effectively. This would be a new layer between our system and the existing celery. This will make resuming or restarting easier, it will make error tracing easier, and it will allow us to present a clearer picture of what's going on and what has happened in the past. In addition it would allow us to pause one task and to prioritise another one. Having a queue will also mean that our automated testing of the tasks, and how they are initiated, can become much more robust because we can test the construction of the queue independently of testing each type of task.
## The Problems
### Restart of Failed Operations such as Ingest
We have a common problem cycle of failure, repair, then ad hoc fiddling to resubmit the existing task. This is because the various operations all have distinct control flows: sometimes you have to hand-submit a celery task; sometimes you need to adjust some outside state first (such as the step position of some process); sometimes you have to start again from scratch, but first delete all the partial work already done; and sometimes you also need to disable self-requeuing tasks in the celery queue.
The restart procedure is entirely specific to the operation being resumed, so putting a simple "restart" button in various places is an open-ended task requiring a custom solution for each such button.
### Tasks which fail often need to be recreated
Most tasks we submit to celery are a single atomic operation: create a database tree, make a link tree for processing, etc. So redoing them should normally be just fine. However, we don’t have a common “resubmit” button we can just push after some repair.
### Tasks in celery vanish over a celery restart
Celery tasks do not survive a restart of celery (such as happens during a deploy). Having tasks in a Django model would preserve their state.
### Tasks in celery do not support dependencies
Celery tasks have no notion of dependencies, so a task cannot be submitted to block until another task completes. As a result, several tasks have an "are we there yet? if not, resubmit ourselves after a delay" step. The test condition is peculiar to each task, and requires the task to know how to resubmit itself.
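To make that control flow concrete, here is a minimal pure-Python simulation of the self-requeuing pattern, with a plain `deque` standing in for the celery broker; `ready` and `work` are illustrative stand-ins, not functions from our codebase:

```python
from collections import deque

# Minimal simulation of the current self-requeuing pattern, using a
# plain deque in place of the celery broker. `ready` and `work` are
# illustrative stand-ins, not functions from our codebase.
def make_self_requeuing_task(ready, work):
    def task(queue, item):
        if not ready(item):
            # Not ready yet: resubmit ourselves (celery would do this
            # with a delayed retry rather than an immediate requeue).
            queue.append(item)
            return "requeued"
        work(item)
        return "done"
    return task

# The item only becomes ready on the third check.
state = {"checks": 0}
def ready(item):
    state["checks"] += 1
    return state["checks"] >= 3

done = []
task = make_self_requeuing_task(ready, done.append)
queue = deque(["ingest-1"])
results = []
while queue:
    results.append(task(queue, queue.popleft()))
```

Note how the requeue condition lives inside the task itself, which is exactly the coupling the proposed queue removes.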
### There is no clean shutdown process for deployments
We cannot request tasks to stop, except individually.
We cannot prevent new tasks from being queued.
We have to ask users to avoid submitting long tasks such as ingests if an important update needs to be deployed.
### Deployments require celery to be idle, at least as far as ingests go
Since there's no clean global "please shut down" process
we cannot stop or interrupt running tasks;
instead we must wait for the current tasks to complete
and ask users not to submit new ones.
### There is no priority system for tasks
If PMs want to indicate that something should take priority
over other tasks, there is no feature for this.
Relatedly, we cannot pause tasks.
### There is little visibility of task state, pending tasks, or dependencies
Having explicit dependencies would let us see why a task has not happened yet.
A view of the task graph would show what is running (and, where possible, its progress), and also why a task is blocked (waiting for a prerequisite task).
Failed tasks awaiting repair or cancellation would also be visible.
## The Proposed Queue
We propose to maintain a queue of task entries
with an associated worker to dispatch ready to run entries
in priority order as capacity becomes available.
These are regular Django Models and persist across application restart.
Every action currently run as a celery task
will get an entry in the queue, initially `PENDING`.
Entries may depend on any number of other entries
completing before they become available for dispatch.
These dependencies are arranged when a complex set of operations is prepared;
the task functions themselves do not need to know these relationships
and have no hand in honouring them.
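As a sketch of the dispatch rule described above, with plain dicts standing in for `TaskEntry` instances; the ordering shown (lower priority value first) is an assumption, since the exact ordering is an open design point:

```python
# Sketch of the dispatch rule, with plain dicts standing in for
# TaskEntry instances. The ordering shown (lower priority value first)
# is an assumption; the exact ordering is an open design point.
def next_to_dispatch(entries, capacity):
    ready = [e for e in entries
             if e["status"] == "PENDING" and e["blocker_count"] == 0]
    ready.sort(key=lambda e: (e["priority"], e["creation_time"]))
    return ready[:capacity]

# "b" outranks "a" on priority; "c" is still blocked by a prerequisite.
entries = [
    {"id": "a", "status": "PENDING", "blocker_count": 0, "priority": 1, "creation_time": 1},
    {"id": "b", "status": "PENDING", "blocker_count": 0, "priority": 0, "creation_time": 2},
    {"id": "c", "status": "PENDING", "blocker_count": 1, "priority": 0, "creation_time": 0},
]
dispatch_order = [e["id"] for e in next_to_dispatch(entries, 2)]
```

The key point is that readiness is a property of the entry (`blocker_count == 0`), not of any logic inside the task function.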
We can present a visualisation of the tasks queue and also of the tasks associated with things like ingests.
Because task entries transition to `FAILED` on failure,
we're free to repair fixable issues and requeue the entry,
or to abort the task and the larger operation
if things really need to be redone.
### Example CAN Ingest Setup
For example, when a CAN ingest is committed several things must happen:
1. an Asset tree of folders and files must be made from the chosen preingest assets
2. a link tree of files must be made for the tape ingest
3. a link tree of files must be made for the metadata pass
4. a link tree of files must be made for the proxy process
5. the ingest link tree must be ingested into BlackPearl
6. the metadata link tree must be scanned for metadata
7. the ingest link tree must be cleaned up
8. the metadata link tree must be cleaned up
Steps 2, 3 and 4 need step 1 to be complete because they use the Asset information.
Step 5 needs step 2 to be complete.
Step 6 needs step 3 to be complete.
Step 7 needs step 5 to be complete.
Step 8 needs step 6 to be complete.
Currently this is held together using custom logic in each celery task definition.
In the new queue
the CAN ingest setup process will make all these task entries in state `PREPARE`
and specify which entries need to wait for other entries.
When ready, each entry is set to `PENDING`,
ready to be dispatched when all its prerequisites
are complete and capacity is available.
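An illustrative sketch of that setup step, building the eight entries in `PREPARE` and wiring the dependencies listed above; the names and the dict representation are hypothetical stand-ins for `TaskEntry` instances:

```python
# Illustrative sketch of the CAN ingest setup building its entries in
# PREPARE and wiring the dependencies listed above. Names and the dict
# representation are hypothetical stand-ins for TaskEntry instances.
def make_entry(entries, name, requires=()):
    entries[name] = {"name": name, "status": "PREPARE",
                     "required": list(requires),
                     "blocker_count": len(requires)}

entries = {}
make_entry(entries, "asset_tree")                             # step 1
make_entry(entries, "ingest_links", ["asset_tree"])           # step 2
make_entry(entries, "metadata_links", ["asset_tree"])         # step 3
make_entry(entries, "proxy_links", ["asset_tree"])            # step 4
make_entry(entries, "blackpearl_ingest", ["ingest_links"])    # step 5
make_entry(entries, "metadata_scan", ["metadata_links"])      # step 6
make_entry(entries, "ingest_cleanup", ["blackpearl_ingest"])  # step 7
make_entry(entries, "metadata_cleanup", ["metadata_scan"])    # step 8

# When setup is done, flip everything to PENDING; entries with a
# nonzero blocker_count remain blocked until prerequisites complete.
for e in entries.values():
    e["status"] = "PENDING"
```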
### Task Entry state transitions
When an entry from the queue is queued to celery
the entry is marked as `RUNNING`.
A running task can finish in one of four ways:
- completion/success
- failure
- cancellation
- abort
If the celery task runs to completion,
the entry is marked as `COMPLETED`
and dependent entries are no longer blocked by this entry.
If the celery task fails,
the entry is marked as `FAILED`.
Dependent tasks remain blocked
so that a repair can be done and the entry reset to `PENDING`,
ready for being run again.
Tasks are "cancelled" by actions such as shutdown
(for example, before an update/deploy).
The entry is marked as `CANCELLED`.
Dependent tasks remain blocked
so that the cancelled entry can be reset to `PENDING`,
ready to be rerun.
Tasks are "aborted" by terminating the celery task if running and marking the entry as `ABORTED`.
Dependent tasks are also marked as `ABORTED`,
abandoning the entire chain of entries
in a cascade.
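The completion and abort transitions above can be sketched as follows, again with dicts standing in for `TaskEntry` instances:

```python
# Sketch of the COMPLETED and ABORTED transitions described above,
# again with dicts standing in for TaskEntry instances.
def dependents(entries, name):
    return [e for e in entries.values() if name in e["required"]]

def complete(entries, name):
    entries[name]["status"] = "COMPLETED"
    for dep in dependents(entries, name):
        dep["blocker_count"] -= 1  # may now be free to dispatch

def abort(entries, name):
    # Abort cascades: dependents are abandoned along with the entry.
    entries[name]["status"] = "ABORTED"
    for dep in dependents(entries, name):
        if dep["status"] != "ABORTED":
            abort(entries, dep["name"])

# A small chain a -> b -> c: completing a unblocks b; aborting b
# cascades to c but leaves a completed.
entries = {
    "a": {"name": "a", "status": "RUNNING", "required": [], "blocker_count": 0},
    "b": {"name": "b", "status": "PENDING", "required": ["a"], "blocker_count": 1},
    "c": {"name": "c", "status": "PENDING", "required": ["b"], "blocker_count": 1},
}
complete(entries, "a")
abort(entries, "b")
```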
## Tech
### Models
Celery task state may be tracked and stored persistently by `django-celery`.
A `TaskEntry` Model with the following fields:
- `uuid`: the primary key
- `creation_time` and `update_time`: like other Models
- `priority`: an integer, default `0`; ready-to-run entries are dispatched in `(priority,creation-time?)` order capacity permitting
- `description`: descriptive string
- `audit_context`: UUID of the context `AuditLog` entry
- `task_callable`: the task function in `module:funcname` form
- `task_arguments`: a JSON blob with a list of positional parameter values
- `app_state`: a generic relation to a Model instance for use by the task itself
- `status`: one of `PREPARE`, `PENDING`, `RUNNING`, `COMPLETED`, `FAILED`, `CANCELLED`, `ABORTED`
- `required`: a relation to other `TaskEntry` instances prerequisite to this task entry
- `blocker_count`: a nonnegative integer holding the count of prerequisites which still block this task entry, essentially the number of incomplete prerequisites
- `start_time`: when the entry was most recently moved to `RUNNING` status
- `end_time`: when the entry was most recently moved from `RUNNING` status
- `progress`: a relation to any number of `TaskProgress` instances representing progress of some kind; these will have at least `title`, `start`, `end`, `position`
- `celery_state`: a relation to any number of celery task records as maintained by `django-celery`; only one can be "live" at any time
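The hypothetical shape of the record, sketched as a dataclass so the field set is easy to read; the real implementation would be a Django Model with equivalent fields (relations, audit and progress fields omitted here):

```python
from dataclasses import dataclass, field
import uuid as uuidlib

# Hypothetical shape of the proposed record, sketched as a dataclass so
# the field set is easy to read; the real implementation would be a
# Django Model with equivalent fields (relations, audit and progress
# fields omitted here).
@dataclass
class TaskEntry:
    description: str
    task_callable: str        # "module:funcname"
    task_arguments: list      # JSON-serialisable positional arguments
    uuid: str = field(default_factory=lambda: str(uuidlib.uuid4()))
    priority: int = 0
    status: str = "PREPARE"
    required: list = field(default_factory=list)  # prerequisite uuids
    blocker_count: int = 0
    start_time: object = None
    end_time: object = None

# Hypothetical usage; the callable path is illustrative only.
entry = TaskEntry("make asset tree", "ingest.tasks:make_asset_tree", ["can-1"])
```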
### Task Function Specification
A `@TaskEntry.task` decorator will provide boilerplate and administration logic
with the following effects:
- the following positional parameters will be prepended to the function arguments: the `TaskEntry` `uuid`, and a `runstate:RunState` to mediate cancellation and record start/stop times
- directly calling the function will create a new `TaskEntry` and dispatch the function immediately in the current `Thread` like an ordinary call, with the expected status transitions
- celery `@task`-like `.apply` and maybe `.defer` methods, maybe also `.bg` method to dispatch the task via a `Thread` instead of via the Celery queue process
- create a context `AuditLog` instance with the entry `audit_context` as its own context
- create a `RunState` for the `runstate` argument, check it on exit, transition to `CANCELLED`
- catch exceptions and transition to `FAILED`
- a signal handler for `SIGTERM` (Celery's cancellation mechanism) which sets `runstate.cancelled`
- `AuditLog` action entries for task start and stop
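A minimal sketch of the decorator's core transitions under the assumptions above: prepend the entry `uuid` and a `RunState`, catch exceptions into `FAILED`, and honour cancellation into `CANCELLED`. Audit logging, `SIGTERM` handling and celery dispatch are omitted; `RunState` and the dict entry are stand-ins:

```python
import functools

# Minimal sketch of the decorator's core transitions: prepend the entry
# uuid and a RunState, catch exceptions into FAILED, honour cancellation
# into CANCELLED. Audit logging, SIGTERM handling and celery dispatch
# are omitted; RunState and the dict entry are stand-ins.
class RunState:
    def __init__(self):
        self.cancelled = False

def task(entry):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            runstate = RunState()
            entry["status"] = "RUNNING"
            try:
                result = func(entry["uuid"], runstate, *args, **kwargs)
            except Exception:
                entry["status"] = "FAILED"
                raise
            entry["status"] = "CANCELLED" if runstate.cancelled else "COMPLETED"
            return result
        return wrapper
    return decorator

# Direct call behaves like an ordinary function call, with the
# expected status transitions.
entry = {"uuid": "u1", "status": "PENDING"}

@task(entry)
def double(entry_uuid, runstate, x):
    return x * 2

result = double(21)
```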