# 0. Getting started with Airflow

## Airflow

- We only cover v1. The latest version of Airflow is v2, but we are running v1, and migrating to v2 requires significant effort on our end. Find out [what has changed between v1 and v2](https://wiki.grab.com/display/OLYMPUS/04.+Airflow+Service:+Migration+from+Airflow+v1+to+Airflow+v2).
- Familiarize yourself with the high-level [Airflow concepts](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html).

### Folder structure

`/src/main/python/dags/` contains all our DAGs.

`/src/main/python/helpers/` - common Python scripts that you want to use across many DAGs

- operators - a template for a predefined Task
- sensors - a special type of Operator that is designed to wait for something to occur
- hooks - a high-level interface to external services such as databases

~~/src/main/python/plugins/~~

### Naming conventions and best practices

- Use snake case for variables and DAG IDs by default (e.g. `snake_case` vs `CamelCase`)
- TODO: team naming conventions & subdirectory structure (e.g. `perf.table_name`)
- Follow [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html)

## [Not covered] Running examples locally with Docker & docker-compose

The easiest and fastest way to get hands-on with Airflow is to run an instance locally. Setting up a local development environment requires [installing Docker and, optionally, docker-compose](https://docs.docker.com/compose/install/).

With docker-compose, run `docker-compose up` with the following file:

```yaml=docker-compose.yml
version: '3'
services:
  airflow:
    image: puckel/docker-airflow
    ports:
      - "8080:8080"
    volumes:
      - './:/usr/local/airflow/dags'
```

or with docker:

```bash
# Mount the current directory as the DAGs folder (absolute path works on all Docker versions)
docker run -d -p 8080:8080 -v "$(pwd)":/usr/local/airflow/dags puckel/docker-airflow
docker ps                    # Find the container ID
docker exec -it b2a71e bash  # Replace b2a71e with your container ID
```

then visit [http://localhost:8080](http://localhost:8080) for the UI.

## 1. Example: How to schedule a Presto job

### Creating the table

The usual workbench SQL method.

```sql
CREATE TABLE IF NOT EXISTS schema.table (
    name varchar,
    rundate date
)
WITH (partitioned_by = ARRAY['rundate']);
```

You can also use an operator. External tables need to be created via QuickSilver.

### Airflow DAG

```python
from datetime import datetime, timedelta

from airflow import DAG
from operators.presto_operator import PrestoOperator

default_args = {
    'owner': 'kornesh.kanan',
    'start_date': datetime(2021, 7, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
    'depends_on_past': True
}

with DAG(
        'hourly_update',  # DAG ID (job id)
        schedule_interval='@hourly',
        default_args=default_args,
        catchup=False,
        max_active_runs=1) as dag:

    # Delete-then-insert keeps the task safe to rerun for the same rundate
    update = PrestoOperator(task_id='hourly_update', queries=[
        '''
        DELETE FROM schema.table WHERE rundate = '2022-01-01'
        ''',
        '''
        INSERT INTO schema.table (name, rundate)
        VALUES ('A', '2022-01-01'), ('B', '2022-01-01')
        '''
    ])

    update
```

### Code review & deployment

Create a git branch and commit your changes:

```bash
git checkout -b 'airflow-example'      # Create a branch
git add dags/example.py                # Add the files
git commit -m "Add Airflow example"    # Commit changes
git push -u origin airflow-example     # Push the branch to origin
```

The push output includes a merge request (MR) link that you can copy into a browser. Once merged, the change is automatically deployed to our production Airflow instance.

## 2. Reboot Campaign queries

A quick ~~demo~~ overview of how to migrate a Slide job into an Airflow job.

Copy the SQL from the following jobs into the Airflow template from the example above. [Q: Should we embed this code?]

- [sfmc_kyc_rejection_daily](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1371)
- [sfmc_kyc_rejection_cumulative](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1372)

Update `src/main/scripts/sfmc/daily_s3_to_sftp_automations.yaml`:

```yaml
gfg:
  owner: vaibhav.vij
  notify_cc:
    - yongseng.goh
  sftp_conn_id: sftp_grabtaxi
  schema: crm
  partition_column: rundate
  sftp_path: /Import/gfg_automations/
  start_date: 2020-03-27
  end_date:
  encrypt: yes
  platform: sfmc
  automations:
    pax_crm_all_all_gp_kyc-rejection_all:
      table_name: sfmc_kyc_rejection_daily
```

### 2.2 How to move from YML to DAGs

Based on [crm.eater.id_package_ceria_subscription](https://gitlab.myteksi.net/data-science/marketing/airflow/-/blob/master/src/main/python/dags/campaigns/eater/id_gf_subscription.py)

```python
from sfmc.sftp_upload_helper import create_upload_tasks
...
with DAG(...):
    check_snoozetime_limit, count_check, s3_to_sftp_upload, signal = \
        create_upload_tasks(
            table_name='sfmc_kyc_rejection_daily',
            notify_cc=['vaibhav.vij'],
            sftp_path='/Import/gfg_automations/',
            journey_name='pax_crm_id_all_gf_subscription-exp_all',  # TODO: What does it refer to?
            sftp_conn_id='sftp_grabtaxi',
            **default_args)  # TODO: Why default args here?
```

## 3. Data Quality Check [WIP]

Before inserting data into a table, we should ideally write it to a staging table and run some data quality checks, e.g. check that the rows are unique. Only then do we load it into the final table.

We should do this for important tables to ensure data integrity and avoid human errors (e.g. the 90% control 10% test).
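
The staging-then-load flow described above could be sketched roughly as follows. This is only an illustration under stated assumptions: it uses Python's built-in `sqlite3` as a stand-in for Presto so that it runs anywhere, and the names `staging_table`, `final_table` and `load_with_uniqueness_check` are hypothetical, not part of our codebase. In a real DAG the same three steps (stage, check, load) would more likely be separate tasks.

```python
import sqlite3


def load_with_uniqueness_check(conn, names, rundate):
    """Stage rows, check that names are unique, then load the final table.

    Hypothetical helper: sqlite3 stands in for Presto, and the table
    names are made up for this sketch.
    """
    cur = conn.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS staging_table (name TEXT, rundate TEXT)")
    cur.execute("CREATE TABLE IF NOT EXISTS final_table (name TEXT, rundate TEXT)")

    # 1. Write the new rows into an empty staging table.
    cur.execute("DELETE FROM staging_table")
    cur.executemany(
        "INSERT INTO staging_table (name, rundate) VALUES (?, ?)",
        [(name, rundate) for name in names],
    )

    # 2. Quality check: list every name that appears more than once.
    duplicates = cur.execute(
        "SELECT name, COUNT(*) FROM staging_table "
        "GROUP BY name HAVING COUNT(*) > 1"
    ).fetchall()
    if duplicates:
        conn.rollback()  # Discard the staged rows; the final table is untouched.
        raise ValueError(f"duplicate rows in staging: {duplicates}")

    # 3. Load: delete-then-insert for this rundate, so reruns do not duplicate data.
    cur.execute("DELETE FROM final_table WHERE rundate = ?", (rundate,))
    cur.execute("INSERT INTO final_table SELECT name, rundate FROM staging_table")
    conn.commit()

    count = cur.execute(
        "SELECT COUNT(*) FROM final_table WHERE rundate = ?", (rundate,)
    ).fetchone()[0]
    return count
```

Calling it with an in-memory database, e.g. `load_with_uniqueness_check(sqlite3.connect(":memory:"), ["A", "B"], "2022-01-01")`, loads both rows, while a list containing a repeated name raises `ValueError` before the final table is modified.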