# Setup Airflow Setup a production-level Airlow cluster using Docker-compose. ![Screenshot 2024-12-01 at 17-22-43 Airflow Architecture A Deep Dive into Data Pipeline Orchestration by Bageshwar Kumar Medium](https://hackmd.io/_uploads/BkNMKhFmyx.png) (source internet) # Code struct ``` ├── app │   ├── .env │   ├── dev.env │   ├── docker-compose-flower.yaml │   ├── docker-compose-init.yaml │   ├── docker-compose-worker.yaml │   └── docker-compose.yaml └── build ├── Dockerfile ├── README.md └── libs ├── instantclient-basic-linux.x64-21.16.0.0.0dbru.zip └── requirements-local.txt ``` # Build Airflow custom image Extend an Airflow image for working with Dbt, Redshit, Oracle DB. * requirements-local.txt ``` airflow-dbt==0.4.0 dbt-redshift==1.7.0 dbt-core==1.7.1 oracledb==2.5.0 apache-airflow-providers-common-sql apache-airflow-providers-oracle[common.sql] flower==2.0.1 gunicorn==20.1.0 itsdangerous==2.0.1 Flask-OpenID==1.3.0 flask-oidc==1.4.0 astronomer-cosmos==1.6.0 ``` * Dockerfile: ``` FROM apache/airflow:2.9.3 USER root # Install Oracle binary # https://www.oracle.com/vn/database/technologies/instant-client/linux-x86-64-downloads.html RUN apt update && apt install unzip libaio1 RUN mkdir -p /opt/oracle && chown -R airflow: /opt/oracle COPY ./libs/instantclient-basic-linux.x64-21.16.0.0.0dbru.zip /opt/oracle/ RUN cd /opt/oracle && unzip instantclient-basic-linux.x64-21.16.0.0.0dbru.zip RUN echo /opt/oracle/instantclient_21_16 > /etc/ld.so.conf.d/oracle-instantclient.conf RUN ldconfig # Install Python requirements COPY --chown=airflow:root ./libs/requirements-local.txt ./requirements/ USER airflow RUN pip install -r requirements/requirements-local.txt ``` * Build image: ``` docker build -t my-com/airflow:2.9.3 -f Dockerfile . ``` Wait for build to success and check images by `docker images list | grep airflow` # Setup databases ## Metadata database Run a PostgreSQL on your host machine: ``` docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=t0b3changed \ --restart unless-stopped \ -e PGDATA=/var/lib/postgresql/data/pgdata -v /path/to/postgres/data/:/var/lib/postgresql/data -d postgres:15 ``` `/path/to/postgres/data/` path to a folder on your host for Postgres to persist data. * Create database and user for Airflow: ``` create database airflow; create user airflow with password 'my_airflow_pwd'; # Select/switch to airflow db \c airflow GRANT ALL ON SCHEMA public TO airflow; GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow; GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO airflow; ``` ## Task queue Run a Redis on your host machine: ``` docker run -d --name redis -p 6379:6379 redis:7.4.1 --save 60 1 --loglevel warning ``` `--save 60 1` allow Redis to persist data every 60s # Deployment configs ## Environment Update env vars (.env) for deployment ``` # User CeleryExecutor AIRFLOW__CORE__EXECUTOR='CeleryExecutor' # Connection to metadata server (PostgreSQL), queue (Redis) AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='postgresql+psycopg2://airflow:airflow_pwd@host_name/airflow' AIRFLOW__CELERY__RESULT_BACKEND='db+postgresql://airflow:airflow_pwd@host_name/airflow' AIRFLOW__CELERY__BROKER_URL='redis://host_name:6379/1' AIRFLOW__CORE__FERNET_KEY='_SWDqtSH05c-yrITFolOXMPDmSJE-aRU6LcEb1ffMwo=' AIRFLOW__CORE__DEFAULT_TIMEZONE='Asia/Ho_Chi_Minh' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION='true' AIRFLOW__CORE__LOAD_EXAMPLES='false' AIRFLOW__API__AUTH_BACKENDS='airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK='true' # AIRFLOW_CONFIG='/opt/airflow/config/airflow.cfg' ``` ## Docker-compose configs * docker-compose.yaml: Base components of Airflow cluster (webserver, scheduler, trigger) ``` x-airflow-common: &airflow-common image: my-com/airflow:2.9.3 env_file: .env volumes: - /path/to/airflow/data/dags:/opt/airflow/dags - /path/to/airflow/data/logs:/opt/airflow/logs - /path/to/airflow/data/config:/opt/airflow/config - /path/to/airflow/data/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" services: airflow-webserver: <<: *airflow-common command: webserver ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always airflow-scheduler: <<: *airflow-common command: scheduler healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always airflow-triggerer: <<: *airflow-common command: triggerer healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always ``` * docker-compose-worker.yaml: Deploy workers node (replicas 3) ``` x-airflow-common: &airflow-common image: my-com/airflow:2.9.3 env_file: .env volumes: &volumes - /path/to/airflow/data/dags:/opt/airflow/dags - /path/to/airflow/data/logs:/opt/airflow/logs - /path/to/airflow/data/config:/opt/airflow/config - /path/to/airflow/data/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" services: airflow-worker: <<: *airflow-common command: celery worker healthcheck: # yamllint disable rule:line-length test: - "CMD-SHELL" - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' interval: 30s timeout: 10s retries: 5 start_period: 30s environment: # Required to handle warm shutdown of the celery workers properly # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation DUMB_INIT_SETSID: "0" restart: always deploy: mode: replicated replicas: 3 ``` * docker-compose-init.yaml: Init Airflow data, configs ``` x-airflow-common: &airflow-common image: my-com/airflow:2.9.3 env_file: .env volumes: &volumes - /path/to/airflow/data/dags:/opt/airflow/dags - /path/to/airflow/data/logs:/opt/airflow/logs - /path/to/airflow/data/config:/opt/airflow/config - /path/to/airflow/data/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" services: airflow-init: <<: *airflow-common entrypoint: /bin/bash # yamllint disable rule:line-length command: - -c - | if [[ -z "${AIRFLOW_UID}" ]]; then echo echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" echo "If you are on Linux, you SHOULD follow the instructions below to set " echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." echo "For other operating systems you can get rid of the warning with manually created .env file:" echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" echo fi one_meg=1048576 mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) disk_available=$$(df / | tail -1 | awk '{print $$4}') warning_resources="false" if (( mem_available < 4000 )) ; then echo echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" echo warning_resources="true" fi if (( cpus_available < 2 )); then echo echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" echo "At least 2 CPUs recommended. You have $${cpus_available}" echo warning_resources="true" fi if (( disk_available < one_meg * 10 )); then echo echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" echo warning_resources="true" fi if [[ $${warning_resources} == "true" ]]; then echo echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" echo "Please follow the instructions to increase amount of resources available:" echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" echo fi mkdir -p /sources/logs /sources/dags /sources/plugins chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} exec /entrypoint airflow version # yamllint enable rule:line-length environment: _AIRFLOW_DB_MIGRATE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} _PIP_ADDITIONAL_REQUIREMENTS: '' user: "0:0" volumes: - ${AIRFLOW_PROJ_DIR:-.}:/sources ``` * docker-compose-flower.yaml: Flower service for monitoring workers (Celery workers) ``` x-airflow-common: &airflow-common image: my-com/airflow:2.9.3 env_file: .env volumes: &volumes - /path/to/airflow/data/dags:/opt/airflow/dags - /path/to/airflow/data/logs:/opt/airflow/logs - /path/to/airflow/data/config:/opt/airflow/config - /path/to/airflow/data/plugins:/opt/airflow/plugins user: "${AIRFLOW_UID:-50000}:0" services: flower: <<: *airflow-common command: celery flower ports: - "8080:5555" healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:5555/"] interval: 30s timeout: 10s retries: 5 start_period: 30s restart: always ``` # Launch Make sure database instances are available and run: ``` docker compose -f docker-compose.yaml \ -f docker-compose-init.yaml \ -f docker-compose-worker.yaml \ -f docker-compose-flower.yaml up -d ``` We can selectively deploy components on diferrent physical servers (e.g. deploy more workers): ``` docker compose -f docker-compose-flower.yaml up -d ``` Airflow should be running now! Login with `admin/admin` at `http://localhost:8080` # Dags Start to deploy your dags to `/path/to/airflow/data/dags` # Docker Best practices * Pull an image: ``` docker pull postgres:15 docker pull quay.io/wildfly/wildfly ``` * Build an image: `docker build -t my-app:latest -f Dockerfile .` * Write a Dockerfile ``` FROM apache/airflow:2.9.3 RUN apt-get update && apt-get install -y git USER airflow RUN pip install airflow-dbt==0.4.0 dbt-core==1.7.1 dbt-redshift==1.7.0 ``` * Run a container, stop, start: ``` docker run -d --name my-app ... docker stop my-app docker start my-app ``` * Mount a volume (e.g. shared data folder), map ports from host to container: ``` docker run -d --name my-app -p 8081:8080 -v /home/ubuntu/appdata/:/opt/myapp/data/ ... ``` * Pass env vars to containers: ``` docker run -d --name my-app -e MYVAR=myvalue \ --env-file=/home/ubuntu/myapp.env my-app ``` * Restart policy (e.g. auto start containers on host restarts) ``` docker run -d --name my-app --restart unless-stopped my-app ... ``` * Run with a specific user (usually set in the Dockerfile): ``` docker run -d --name my-app --user=airflow ``` * Limit resources for a container (e.g. for performance test): ``` docker run --cpus=0.5 --memory=1g my-app ... ``` * Exec into a running container(e.g. for debug): ``` docker exec -it my-app /bin/bash` ``` * List images, containers on host: ``` docker container list --all docker image list --all ``` * Create a network and assign containers to it: ``` docker network create my_network Containers on the same network can see each other by name: docker run -d --name my-app --network my-network docker run -d --name postgres_db --network my-network ``` * Monitoring processes, resources: ``` Show running containers: docker ps Show resouces usage (mem, cpu) by containers: docker stats Show running processes inside a container: docker top my-app ```