# Practices

https://hackmd.io/NJZ3r30ST46wzwzGnngY5A
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/pipeline.html#

# Overview

Airflow represents workflows as DAGs (directed acyclic graphs) of tasks:

* Complex workflows and task dependencies
* Dynamic and code-based: pipelines are defined in Python, with Jinja templating
* Scalability: handles workflows at any scale, from small to enterprise-level
* Extensibility: integrates with a wide range of tools and cloud services, and allows you to run custom scripts
* Observability: the Airflow UI provides an immediate overview of all data pipelines

# Airflow Architecture

![airflow1](https://hackmd.io/_uploads/B16mBZ2Xyg.png)

A minimal Airflow installation consists of the following components:

* A **scheduler**, which handles both triggering scheduled workflows and submitting tasks to the executor to run. The executor is a configuration property of the scheduler, not a separate component, and it runs within the scheduler process. There are several executors available out of the box, and you can also write your own.
* A **webserver**, which presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.
* A **folder of DAG files**, which is read by the scheduler to figure out what tasks to run and when to run them.
* A **metadata database**, which Airflow components use to store the state of workflows and tasks. Setting up a metadata database is described in "Set up a Database Backend" and is required for Airflow to work.

***Optional components***

Some Airflow components are optional and can enable better extensibility, scalability, and performance in your Airflow installation:

* Optional **worker**, which executes the tasks given to it by the scheduler. In a basic installation the worker might be part of the scheduler rather than a separate component. It can be run as a long-running process with the CeleryExecutor, or as a pod with the KubernetesExecutor.
* Optional **triggerer**, which executes deferred tasks in an asyncio event loop.
In a basic installation where deferred tasks are not used, a triggerer is not necessary. More about deferring tasks can be found in the "Deferrable Operators & Triggers" documentation.
* Optional **dag processor**, which parses DAG files and serializes them into the metadata database. By default, the dag processor is part of the scheduler, but it can be run as a separate component for scalability and security reasons. If a dag processor is present, the scheduler does not need to read the DAG files directly. More about processing DAG files can be found in "DAG File Processing".
* Optional folder of **plugins**. Plugins are a way to extend Airflow’s functionality (similar to installed packages). Plugins are read by the scheduler, dag processor, triggerer and webserver. More about plugins can be found in "Plugins".

![apache_airflow_architecture](https://hackmd.io/_uploads/Byx9zDo7Jg.jpg)

For a more animated version, see https://datageeklab.com/apache-airflow-architecture-key-components/

# Web UI

The web UI is used for monitoring and troubleshooting data pipelines.

![dags](https://hackmd.io/_uploads/rktCqwsXkl.png)

# Workflow DAGs

Airflow lets you represent workflows as DAGs of tasks.

![edge_label_example](https://hackmd.io/_uploads/HyVuqDiX1g.png)

DAG code example:

```
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()
```

DagRun: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html

# Tasks

The basic unit of execution in Airflow.

**Task Execution flow (CeleryExecutor):**

![run_task_on_celery_executor](https://hackmd.io/_uploads/rJ_F--hQ1e.png)

* [1] *SchedulerProcess* processes the tasks and, when it finds a task that needs to be done, sends it to the QueueBroker.
* [2] *SchedulerProcess* also begins to periodically query ResultBackend for the status of the task.
* [3] *QueueBroker*, when it becomes aware of the task, sends information about it to one WorkerProcess.
* [4] *WorkerProcess* assigns a single task to one WorkerChildProcess.
* [5] *WorkerChildProcess* performs the proper task handling functions, i.e. execute_command(). It creates a new process, LocalTaskJobProcess.
* [6] *LocalTaskJobProcess* logic is described by the LocalTaskJob class. It starts a new process using TaskRunner.
* [7][8] The *RawTaskProcess* and LocalTaskJobProcess processes stop when they have finished their work.
* [10][12] *WorkerChildProcess* notifies the main process, WorkerProcess, about the end of the task and the availability of subsequent tasks.
* [11] *WorkerProcess* saves status information in ResultBackend.
* [13] When *SchedulerProcess* asks ResultBackend again about the status, it will get information about the status of the task.

**Task lifecycle/status**

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html

![task_lifecycle_diagram](https://hackmd.io/_uploads/HyraLPj7Jg.png)

```
* none: The Task has not yet been queued for execution (its dependencies are not yet met)
* scheduled: The scheduler has determined the Task’s dependencies are met and it should run
* queued: The task has been assigned to an Executor and is awaiting a worker
* running: The task is running on a worker (or on a local/synchronous executor)
* success: The task finished running without errors
* restarting: The task was externally requested to restart when it was running
* failed: The task had an error during execution and failed to run
* skipped: The task was skipped due to branching, LatestOnly, or similar.
* upstream_failed: An upstream task failed and the Trigger Rule says we needed it
* up_for_retry: The task failed, but has retry attempts left and will be rescheduled.
* up_for_reschedule: The task is a Sensor that is in reschedule mode
* deferred: The task has been deferred to a trigger
* removed: The task has vanished from the DAG since the run started
```

Trigger rules other than the default `all_success` (e.g. `all_done`, `one_success`, etc.) are described at https://airflow.apache.org/docs/apache-airflow/2.10.3/core-concepts/dags.html

# Core Concepts

* **Operators**: predefined templates for tasks (PythonOperator, BashOperator, PostgresOperator, HttpOperator, etc.)
* **Sensors**: a type of task that waits for something to happen (SqlSensor, HttpSensor, etc.); a sensor runs in one of two modes, `poke` or `reschedule`
* **Data interval**: execution_date/logical_date/data_interval_start (@daily, @monthly, cron, etc.)
* **Hooks**: interfaces to external platforms that users can re-use; they are often the building blocks that Operators are built out of
* **Connections**: a connection to an external system (usually with user credentials such as username/password)
* **XComs**: communication between tasks (push/pull a job's metadata or small data, e.g. result file path, result table name, dynamic parameters)
* **Variables**: Airflow runtime configuration values (e.g. AIRFLOW_VAR_MY_VAR=MY_VALUE)

# DAG Best Practices

Best practices for designing and maintaining Airflow DAGs include:

* Keep Tasks Atomic: Ideally each task accomplishes one specific action. DAGs designed with atomic tasks give you fine-grained observability into events in your data pipeline and the ability to rerun from the point of failure.
* Design idempotent DAGs and Tasks: When backfilling DAGs for earlier dates, it is crucial that the same input for individual tasks and DAGs produces the same output, so that you benefit from reliable, reproducible orchestration.
* Modularize your code: Airflow is built on Python code and allows you to modularize commonly used actions in Python functions to be reused in several tasks across DAGs. As in all software engineering, avoid repeating yourself: if you write the same code twice, consider turning it into a reusable module or even creating your own custom Airflow operator.
* Implement Automated Testing: Write unit tests for custom functions and operators, as well as integration and validation tests for your DAGs. Leverage development environments for automatic testing before pushing your code to production.
* Handle Failures: Implement automatic retries and alerts for task failures.
* Use Version Control: Like all code, DAGs and supporting code like SQL scripts should be stored in version control and be governed by a CI/CD process.
* More: https://www.astronomer.io/docs/learn/dag-best-practices

# Performance Tuning

A few key settings for tuning Airflow at the environment, DAG, and task level.

**Core settings:**

* **parallelism**: The maximum number of tasks that can run concurrently on **each scheduler** within a single Airflow environment. For example, if this setting is set to 32 and there are two schedulers, then no more than 64 tasks can be in a **running** or **queued** state at once across all DAGs. If your tasks remain in a scheduled state for an extended period, you might want to increase this value. The default value is 32.
* **max_active_tasks_per_dag** (formerly dag_concurrency): The maximum number of tasks that can be scheduled at the same time across all runs of a DAG. Use this setting to prevent any one DAG from taking up too many of the available slots from parallelism or your pools. The default value is 16.
* **max_active_runs_per_dag**: The maximum number of active DAG runs (per DAG) that the Airflow scheduler can create at a time.

**Scheduler settings:**

These control how the scheduler parses DAG files and creates DAG runs.

* **min_file_process_interval**: The frequency that each DAG file is parsed, in seconds. Updates to DAGs are reflected after this interval.
* **dag_dir_list_interval**: The frequency that the DAGs directory is scanned for new files, in seconds.
* **parsing_processes**: How many processes the scheduler can run in parallel to parse DAGs (rule of thumb: 2 x vCPUs).

**Task-level settings:**

* **max_active_tis_per_dag** (formerly task_concurrency): The maximum number of times that the same task can run concurrently across all DAG runs. For instance, if a task pulls from an external resource, such as a data table, that should not be modified by multiple tasks at once, then you can set this value to 1.
* **pool**: The name of the pool the task is assigned to. Pools are a way to limit the number of concurrent instances of an arbitrary group of tasks. This setting is useful if you have a lot of workers or DAG runs in parallel, but you want to avoid an API rate limit or otherwise don't want to **overwhelm a data source or destination**.

**Celery Executor:**

Scaling with the Celery executor involves choosing both the number and size of the workers available to Airflow. The more workers you have available in your environment, or the larger your workers are, the more capacity you have to run tasks concurrently. You can also tune **worker_concurrency**, which determines how many tasks each Celery worker can run at any given time (default 16). It can be set for each worker independently.
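
How these limits interact can be sketched as simple arithmetic. The snippet below is a rough back-of-the-envelope estimate, not Airflow's actual scheduling logic. The helper names `estimate_task_capacity` and `dag_task_limit` are hypothetical, and the sketch assumes that the environment-wide cap is `parallelism` times the number of schedulers (as described above) and that the Celery workers offer `workers * worker_concurrency` slots in total.

```python
def estimate_task_capacity(parallelism, schedulers, workers, worker_concurrency):
    """Rough upper bound on concurrently running tasks (hypothetical helper).

    Assumption: the tighter of the scheduler-side cap and the total
    number of Celery worker slots is the effective bottleneck.
    """
    scheduler_cap = parallelism * schedulers      # e.g. 32 * 2 = 64
    worker_slots = workers * worker_concurrency   # e.g. 3 * 16 = 48
    return min(scheduler_cap, worker_slots)


def dag_task_limit(env_capacity, max_active_tasks_per_dag):
    """Tasks a single DAG can run at once, given the environment capacity."""
    return min(env_capacity, max_active_tasks_per_dag)


# Example values chosen for illustration only.
capacity = estimate_task_capacity(
    parallelism=32, schedulers=2, workers=3, worker_concurrency=16
)
print(capacity)                      # 48: the workers are the bottleneck
print(dag_task_limit(capacity, 16))  # 16: capped by max_active_tasks_per_dag
```

In this illustration, raising `parallelism` would not help, because the three workers provide only 48 slots; adding workers or raising `worker_concurrency` would. Pools and `max_active_tis_per_dag` can reduce the numbers further for individual tasks.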
# Cross-DAG dependencies

https://www.astronomer.io/docs/learn/cross-dag-dependencies

# More resources

* https://airflow.apache.org/docs/apache-airflow/2.10.3/core-concepts/overview.html
* https://airflow.apache.org/docs/apache-airflow/stable/faq.html
* https://www.astronomer.io/docs/learn/
* https://geniusee.com/single-blog/apache-airflow-overview-what-it-is-and-how-it-works
* https://datageeklab.com/apache-airflow-architecture-key-components/
* https://airflow.apache.org/docs/apache-airflow/2.10.3/core-concepts/executor/index.html
* https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/celery_executor.html