# Airflow migration blueprint
## Airflow entities
- DAG https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
- Tasks https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
- Operator https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
- TaskFlow API https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
- Scheduler https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html
- Listener: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/listeners.html
- XCom Backend https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
- airflow metadata database https://airflow.apache.org/docs/apache-airflow/stable/database-erd-ref.html
- Deferrable Operator: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html
- Triggerer: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html
- Sensor https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html
## Airflow concepts
Airflow DAG can be seen as an equivalent to AiiDA's Process class. In contrast to aiida, airflow is designed to create and customize DAGs much more easily than in AiiDA. An airflow DAG consists of multiple tasks tasks. Dependencies are expressed by data dependency or explicit dependency operations using `<<` in the DAG definition. A DAG consists of tasks. Airflow offers multiple ways to define tasks. One can define the logic for a task by writing a custom Operator, a python class, or use the TaskFlow API that allows a pythonic construction of tasks from functions. Airflow communicates data between tasks through their XCom backend that can transport any JSON serializable data. The communicated data is stored in the airflow metadata database. Airflow provides a scheduler that controls the number workers and tasks that are active. Operators can define logic to be put on hold so they stop execution until some time or an event is triggered. They stop occupying a task slot in this case and are rescheduled. This can happen inside the operator by rescheduling in (Sensors) or deferring in (deferrable Operators). reschedule the task once an event is triggered. In the case of deferring one can put an trigger into the triggerer event loop that yields an event once the trigger is executed. The triggerer is a separate process that executes an event loop where all triggers are executed. Triggerers and schedulers communicate over the metadata database.
## Strategy
We will split the development in two phases, both ending with a production-ready code base. In the first phase we will focus the effort on replacing the existing plumpy+kiwipy engine, creating a backwards-compatible layer for existing CalcJob's and WorkChain's. We further want to implement a XComBackend and Listener to track the inputs and outputs of airflow tasks so they can be tracked in the aiida database. With these changes existing aiida workflows can be extended with airflow Tasks that are tracked by the AiiDA database for provenance.
In the second phase we will analyze how we can improve the existing CalcJob and WorkChain API with the flexible DAG API that airflow provides. We will create a version that uses the AiiDA database optionally relying on our the XCom backend and the Listeners to push data to the AiiDA database only when the database is present.
### Phase 1 - Backwards compatible layer
We will implement a drop-in replacement for the CalcJob for WorkChain. For the CalcJob backwards compatible layer we will reimplement the CalcJob class in from of a airflow `TaskGroup`. That `TaskGroup` uses the information in the spec to pass input data and define the logic of the tasks. We will reuse the `engine/calcjob/tasks.py` function to implement most of the logic. For the WorkChain we will follow a similar strategy. We will create custom TaskGroups to represent the while, if logic of a WorkChain that are used when translating the outline into tasks. An advantage of airflow is that we can make the WorkChain nonblocking when submitting another WorkChain by putting it on hold using the TriggerDagRunOperator.
We will use the triggerer to run the CalcJob logic in an asynchronous event loop. The CalcJob airflow Operators will defer to a trigger that contains the actual logic. The number of triggerers and the number of triggers a triggerer can host can be controlled in the airflow configuration. Each triggerer will run its own TransportQueue to not overexert the number of SSH connections to the HPCs Triggerers work similar as AiiDA's worker. They can run multiple CalcJobs at the same time and control their own TransportQueue.
In airflow one can add Listeners that are executed once a certain event is executed. This allows us to attache a hook when a DAGRun is started and thus store the inputs in the aiida database and associate them with the DAGRun ID. Furthermore, airflow allows us to listen to the XCom communication and thus we can retrieve any data that is communicated between two tasks and store it in the aiida database. Since we have the IDs of the DAGRun available we can associate it to the same workflow object in the database. Airflow only allows to communicate JSON serializable data so any data that is communicated can be also stored in the aiida database.
On the aiida side we need to still assess the solution with the best UX. An argument in the AiiDA CLI that controls which CalcJob and WorkChain is used would be ideal but has not been analyzed for feasibility. We will not be able to hide airflow from the user so an exposure to the airflow CLI and the airflow configuration is inevitable. Airflow allows external developers to develop provider packages. These provider packages can automatically extend functionality of airflow adding CLI endpoints, DAGs as well as plugins like Operators, Listener and UI extensions.
### Phase 2 - API redesign
Airflow in contrast to AiiDA provides a more pluggable interface for defining processes. This enables us to design the API of CalcJob in a way that relies less on user-specified metadata options during process submission and instead allows for greater customizability through operators that extend process functionality. Such an approach could significantly simplify the existing CalcJob API, since the significant amount of options required to customize a submission script can be pushed to the workflow developer leading to a substantial reduction in both code complexity and maintenance overhead within AiiDA. Another goal in this phase is to make the database integration entirely optional via the choice of the XCom backend. These developments should be seen as an extension of the effort of phase 1 for a simpler API for workflow developer for more rapid prototyping while still offering essential features for high-throughput calculations as the transport and job scheduler logic layers.
Since in airflow processes can be more put together more flexible airflow provides a very flexible interface to write in creating process classes for this new API, this new API will be not backwards-compatible with existing aiida workflows. Existing workflows will be extendable by constructs of the new API (e.g. concatenating a CalcJob in the old and the new API). This effort should not replace the effort of phase 1 but rather should represent an effort in improving the aiida API that is the reason of a lot of usability issues.
## Management assessment
### Cons of switching to airflow
Integrating with Airflow means giving up some flexibility in how we develop the engine. Even though we can contribute to Airflow and have some influence on its direction, we would still lose a degree of control. Depending on how deeply we integrate with the airflow code base, we might end up relying on parts of airflow’s code that change in future versions, forcing us to pin specific releases or continuously adapt our code.
We would also become dependent on a much larger external codebase. Even though fixing issues in our current codebase is time consuming and extensively, moving to a larger code base might worsen this state even further making any issue unfixable for us. In this case case we entirely rely on airflow developers to fix the bugs in our interest. If we rely on Airflow internals, we could hit bugs that aren’t covered by their public API and therefore aren’t guaranteed to be fixed. Since our use case differs from the industry focus of Airflow, and we are not a paying customer, our issues might not get much attention.
Switching to Airflow would also change the user experience significantly in contrast to aiida-workgraph and the aiida-gui. Users would need to learn a new interface, since airflow’s architecture — with its API server, scheduler, triggerers, and DAG processor — is more complex than AiiDA’s simpler daemon-worker plus message queue system setup. That added complexity could make things feel heavier and introduce friction where the current system is simpler.
It might be a better use of time and effort to keep improving the current AiiDA engine instead of investing heavily in a migration that has a risk of never reaching a stable, production-ready state.
People need to relearn where to find certain things, since airflow works differently under the hood. Airflow is with its api-server, scheduler, triggerers and dag processor is more complex than aiida with its daemon and workers. This additional complexity could introduce more friction than aiida currently does with its message queue system.
The integration of aiida into airflow will to a great part only replace parts of the code base with airflow equivalents. This code will be similarly hard to maintain once the group will have shrinked in the next year. It might be a better investment of time resources to just improve the quality of the existing code base rather than performing a drastic engine change that might not end up in a production-ready state.
### Pros of switching to airflow
Although the engine code only contributes to a relatively small amount of portion of the overall code base, it introduces a disproportionately large maintenance burden. Fixing bugs requires a deep understanding of the underlying layers of plumpy and kiwipy, as well as a profound knowledge of AiiDA to make any meaningful change. These layers are sparsely documented, which makes development heavily dependent on trial and error exploration. The concurrent nature of the engine’s execution further complicates the logic and makes debugging difficult. From our experience, it takes about a year to gain a solid understanding of AiiDA’s engine. The current outlook suggests that we will not have additional human resources to work on or improve the engine once our contract ends next year. It also appears that funding is easier to obtain for visible improvements on AiiDAlab and Materials Cloud rather than for engine-related work. Since aiida-workgraph builds a significant amount of logic on top of the existing engine, we expect to encounter similar issues as we did with plumpy after Martin and Sebastiaan left as soon as Xing leaves the group.
With Apache behind airflow we have a well-established nonprofit organization developing open-source software already for a long time with a great track record. The discontinuation of airflow by Apache appears highly unlikely, as it is one of the leading players in the workflow orchestration market, offering an unmatched level of flexibility and documentation. The developer team of airflow is much larger and by that can offer a more robust and polished product that we can do with 1-2 developers in the past years. Considering the timeframe of the last month, airflow has 9 developers with above 20 commits. We also think that contributors of airflow have stronger background in software engineering than contributors on scientific software packages and thus produce software with overall better quality.
Due to the wide use of airflow we can rely on a larger community of people to help us and new users in case of questions and problems. Since the community is larger there is larger dataset of contributions such that LLMs can perform better in answering and fixing user questions.
The rabbitmq message queue system will be replaced by the poll-system with heartbeats and pipes for communication in airflow. This new system of airflow does not require an installation of a system service and increases the portability of the workflows. While we could perform a similar change in the existing plumpy code, airflow already provides building blocks to perform such a change.
With the switch to airflow we see the potential that our workflows can be exposed to a wider community due the popularity of airflow. With phase 2 we hope that the new CalcJob and WorkChain API allows an easier translation of aiida workflows to other distributed computing infrastructures (celery, kubernetes) again potentially attracting fundings.
### Lines of codebase analysis
This section analyzes the parts of the codebase that can largely be replaced by functionality already provided by airflow, thereby eliminating the need for us to maintain that code in the future. Before proceeding with the analysis, it is important to acknowledge some caveats. While the number of lines of code can offer a rough estimate of potential maintenance cost reduction, it does not accurately reflect the true maintenance effort. For example, the ORM submodule in aiida-core contains is largest amount of lines of code, yet it is relatively straightforward to understand and maintain in comparison to the engine. Changes in the ORM layer tend to be repetitive and can often be automated with modern tooling. In contrast, even a single line of code in the engine can be complex to modify, requiring deep technical expertise and careful reasoning. Furthermore, it is difficult to precisely estimate how much of the codebase will ultimately be removed. In this analysis, we have listed only the modules where we expect that the majority of the code can be eliminated. However, other modules not explicitly mentioned will also be indirectly affected by this reduction For instance, once we switch to using the airflow CLI and REST API, we will no longer need to maintain equivalent functionality in our own CLI and REST interfaces related to engine logic.
In Figure Ref.~@fig:aiida-core-line-count we can see the total line count of the aiida-core modules and in Figure @fig:aiida-core-line-count we can see a total count of lines for the modules we will most likely replace by existing airflow code. We did not count empty lines in the code base, which explain discrepancies to other estimations as on GitHub or by using bash tools like `wc`. We only consider modules with a significant reduction of code. These includes the modules engine, broker, plumpy and kiwipy. We did not include the engine module in aiida-core, since equivalent logic has to be created in the airflow provider for backwards-compatibility. It is important to note regarding the engine module in aiida that for backwards-compatibility we still need to recreate equivalent abstractions that emulate the `CalcJob`, `WorkChain` and `BaseRestartWorkChain` in airflow. Furthermore, for the `CalcJob` replacement we will reuse most of the code in `engine/processes/calcjobs/tasks.py` to fill the logic of airflow CalcJob.
With airflows TaskFlow API most API proposal by workgraph are covered. Some functionalities like the while loop need an equivalent implementation, if they are needed. The airflow UI covers most of the functionality of aiida-gui. Any functionality that is not already covered can be added to the UI by an airflow UI plugin. Furthermore, airflow has its own scheduler that is configurable which makes the aiida-submission-controller obsolete.
| **Component** | **Lines of Code** |
|----------------------------------------|------------------:|
| kiwipy v0.8.5 | 1 918 |
| plumpy v0.25.0 | 5 154 |
| aiida-core v2.7.1 (broker) | 298 |
| aiida-workgraph v1.0.0b3 | 5 223 |
| node-graph v0.3.13 | 3 499 |
| aiida-gui v0.2.4 (python) | 1 183 |
| aiida-gui v0.2.4 (frontend) | 2 810 |
| aiida-submission-controller v0.0.3 | 283 |
| **Total:** | **20 268** |
*Figure 1: Lines of code for modules/projects that can be mostly removed by an Airflow migration.*
| **Module** | **Lines of Code** |
|----------------------|------------------:|
| aiida/orm | 20 310 |
| aiida/storage | 15 773 |
| aiida/tools | 14 067 |
| aiida/cmdline | 12 108 |
| aiida/engine | 7 540 |
| aiida/transports | 5 496 |
| aiida/restapi | 3 942 |
| aiida/schedulers | 3 722 |
| aiida/manage | 3 603 |
| aiida/common | 2 829 |
| aiida/repository | 1 011 |
| aiida/plugins | 763 |
| aiida/calculations | 621 |
| aiida/parsers | 322 |
| aiida/brokers | 298 |
| aiida/sphinxext | 252 |
| aiida | 101 |
| aiida/workflows | 91 |
| **Total:** | **92 849** |
*Figure 2: Lines of code of aiida-core 2.7.1.*
## Implementation strategy
In the prototyping phase we identified strategies to implement aiida's event loop to pool multiple aiida procesess. This is an essential feature for aiida for several reasons. First, aiida mostly is performing I/O and network operation, asynchronous I/O is therefore essential for performance gains in the aiida engine. Furthermore, aiida uses this architecture to share resources like the TransportQueue between multiple aiida Processes. A deviation from this architecture would need to implement alternatives for this cases.
### Triggerer approach
In airflow, one task runs exactly on one python process. This means that for each task a new python process has to be initialized that occupies one CPU core due to GIL. In contrast, a worker process in aiida-core, also occupies one CPU core, but it can handle multiple AiiDA Processes (e.g., CalcJobs) at the same time. The execute function of an AiiDA-process is asynchronous, thereby a worker can handle multiple AiiDA-processes concurrently at different steps. This is important for speed improvements since AiiDA mainly performs blocking I/O and network calls. To implement this similarly in Airflow we need to move the processing logic to Airflow triggers. An Airflow trigger can also run asynchronously while an Airflow task cannot. A trigger can be executed by deferring within an Airflow task. Here a simplified example how this would be implemented.
```py
class CalcJobSubmitOperator(BaseOperator):
#...
def execute(self, context: Context):
self.defer(
trigger=CalcJobSubmitTrigger(
machine=self.machine,
local_workdir=self.local_workdir,
remote_workdir=self.remote_workdir,
submission_script=self.submission_script
),
method_name="execute_complete",
)
class CalcJobSubmitTrigger(BaseTrigger)
# ...
async def run(self):
# logic of submit here
```
In this case the trigger is executed by a triggerer while the `CalcJobSubmitOperator` task is sleeping. The `CalcJobSubmitOperator` task is put on hold stopping the occupation of a CPU core. Each triggerer is run on a separate process with an event loop. There is no coordinated load-balancing happening between the triggerers but once a triggerer is occupied with a trigger, it automatically will not be able to to fetch further triggers. Multiple triggerers have to be spawned so we can support multiple event loops. Airflow will automatically distribute among the triggers once the capacity of a triggerer is full (by default 1000).
Here is a full example of how the submit task can be implemented with a trigger.
```python
class CalcJobSubmit(BaseOperator):
def execute(self, context):
node_uuid = retrieve_uuid(context)
self.defer(
trigger=SubmitTrigger(node_uuid=node_uuid),
method_name="resume",
)
```
*Figure 3: A simplified version of the airflow Operator code for a upload step in a function. The Operator blocks one CPU core while being executed but defers immediately to the trigger `SubmitCalcJobTrigger` unblocking the CPU core.*
```python
from aiida_core.engine.calcjob.tasks \
import task_upload_job
class SubmitTrigger(BaseTrigger):
def __init__(self, node_uuid):
super().__init__()
self.node_uuid = node_uuid
async def run(self):
response = await task_submit_job(
self.node_uuid)
yield TriggerEvent(
{"response": response,
"node_uuid": self.node_uuid}
)
```
*Figure 4: A simplified version of the airflow Trigger code for the upload step in a function. It mainly executes aiida-cores task definition. The trigger is executed on one of the triggerers that can accumulate a configurable amount of triggers in their event loop.*
The communication within the airflow systems during the DAG run of three CalcJobs can be seen in Fig. 1 and 2. The CalcJobs are referred by their IDs 1, 2, 3. We assume in this example that 2 triggerers run. Each triggerer is assigned to a port that are only important for the logging information. Each communication of airflow is put into sequential order by numbering them from 1 to 8. The database serves as central authority allowing the scheduler and triggerers to implicitly communicate with each other by polling regularly from the database.

*Figure 5: A simplified version of the communication order within airflow system for the CalcJob submit task starting from the airflow scheduler perspective.*

*Figure 6: A simplified version of the communication order within airflow system for the CalcJob submit task starting from the airflow triggerer perspective.*
### Executor approach
Because the triggerer approach seems to introduce overhead for each executed task most likely due to the hopping of the task to the database, scheduler, database, triggerer and back, we started to investigate an alternative execution model by implementing our own executor of airflow that directly spawns workers with event pools in the scheduler, and maps tasks to one of these workers. This design differs with the LocalExecutor of airflow that spawns for each task its own process. In this approach we need to map the task id to the worker pid so that airflow can correctly send messages to the Task's process the through the UNIX pipes. Airflow provides the `supervise` function in `airflow.sdk.execution_time.supervisor` to allow workers to execute tasks. The supervise functions takes the current process and forks it. It sets a UNIX pipes for redirecting the stdoud, stderr and requests. These pipes are used to communicate between the scheduler and the task instances. We will use the same approach as done in the LocalExecutor of airflow with the difference that instead of forking the current process, we create a coroutine out of the task execution and put it into the workers asynchronous event loop. This brings back more code back from plumpy and this provides less maintenance benefit. An open question is if we need `nest_asyncio` in this approach. Reusing this approach would be a nogo, since this library is already deprecated for a long time and should not be reused if we switch to a different engine.
```
┌──────────────────────────────┐
│ Airflow Scheduler │
└──────────────┬───────────────┘
│
▼
┌────────────────────────────────┐
│ Custom Async Executor │
│ ───────────────────────────────│
│ Supervisor w/ Virtual Mapping │
│ ┌──────────────────────────┐ │
│ │ Worker Pool │ │
│ │ ┌────────────────────┐ │ │
│ │ │ Async Event Loop 1 │ │ │
│ │ │ ├─ Task A │ │ │
│ │ │ ├─ Task B │ │ │
│ │ └────────────────────┘ │ │
│ │ ┌────────────────────┐ │ │
│ │ │ Async Event Loop 2 │ │ │
│ │ │ ├─ Task C │ │ │
│ │ └────────────────────┘ │ │
│ └──────────────────────────┘ │
└────────────────────────────────┘
▲
│ UNIX pipes / IPC
▼
Airflow Metadata Database
```
*Figure 7: Architecture of the custom executor that would spawn a fixed set of workers that also directly execute the tasks as coroutines in their event loops instead of forking the current process to execute the task.*
### TransportQueue server approach
TODO