> Show the Airflow job example.
>
> For this talk I'm assuming no prior Airflow knowledge, so before we begin let's familiarize ourselves with some Airflow concepts. Each box here is a task, and you chain these tasks together to form a complete job (Job and DAG mean the same thing). Operators are just templates for a Task, with some parameters. Sensors are a special type of operator we use when we want to wait for something, like upstream tables. Hooks are just a simplified interface for connecting to external services like a DB.

A brief overview of common, high-level Airflow concepts:

- Operators - a template for a predefined Task
- Sensors - a special type of Operator designed to wait for something to occur
- Hooks - a high-level interface to an external service, like a database

[Airflow concepts](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html)

Maybe show them the UI first so they know what the end result looks like: [AppsFlyer Activity Event](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=perf.f_appsflyer_activity_events), [First Booking](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=appsflyer.s2s_first_bookings.v0) or [ID GF Subscription](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=crm.eater.id_package_ceria_subscription).

## 1. Example: How to schedule a Presto job

### Creating the table

The usual workbench SQL method:

```sql
CREATE TABLE IF NOT EXISTS schema.table (
    name varchar,
    rundate date
) WITH (partitioned_by = ARRAY['rundate']);
```

### Airflow DAG

#### Sensors

If we want to wait for upstream tables, we use a Lighthouse sensor (we'll go into the details later): you run the sensor task first, and only then do the actual work. Every job should also create a Lighthouse signal at the end, because if another job depends on yours it needs a signal to wait on; we can extend the Presto task to emit that signal.

#### Cluster tag

We are now running in AWS, so the cluster changed from `magic-presto` to `magic-presto-az`; you can specify this on the PrestoOperator.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.light_house_job_dependencies_sensor import LightHouseJobDependenciesSensor
from hooks.slack_hook import send_slack_notification
from operators.presto_operator import PrestoOperator

default_args = {
    'owner': 'kornesh.kanan',
    'start_date': datetime(2021, 7, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
    'on_failure_callback': send_slack_notification(
        status='Failure',
        username='Perf Update',
        extra_text='cc @vaibhav.vij',
    ),
}

with DAG(
        'crm.examples.perf_update',  # job id
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False,
        max_active_runs=1) as dag:

    # Wait for the upstream Lighthouse signals before running our own queries
    sensor = LightHouseJobDependenciesSensor(...)

    daily = PrestoOperator(task_id='daily_update', queries=[
        '''
        DELETE FROM schema.table
        WHERE rundate = DATE '2022-01-01'
        ''',
        '''
        INSERT INTO schema.table (name, rundate)
        VALUES ('A', DATE '2022-01-01'), ('B', DATE '2022-01-01')
        ''',
    ])

    sensor >> daily
```

### Code review & deployment

Create a git branch and commit the changes:

```bash
git checkout -b 'airflow-example'    # Create a branch
git add dags/example.py              # Add the files
git commit -m "Add Airflow example"  # Commit changes
git push                             # Push commit to origin
```

You can copy-paste the MR link into a browser to open the merge request. Once merged, it will be automatically deployed to our production Airflow instance, and you can view the job in the [Airflow UI](https://airflow-crm.data-engineering.myteksi.net/admin/).
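One note on the `daily_update` task above: the rundate is hardcoded to `2022-01-01` to keep the example short. For a real scheduled job you would normally template the date instead. Below is a minimal sketch, assuming the internal `PrestoOperator` renders its `queries` through Airflow's Jinja templating (as standard Airflow operators do for their templated fields); `{{ ds }}` is Airflow's built-in macro for the execution date in `YYYY-MM-DD` format.

```python
# A minimal sketch, not the exact PrestoOperator contract: it assumes `queries`
# is a templated field, so Airflow renders {{ ds }} (the execution date,
# YYYY-MM-DD) before the queries run. Each daily run then (re)loads only its
# own partition. Drop-in replacement for the daily task inside the DAG above.
daily = PrestoOperator(task_id='daily_update', queries=[
    '''
    DELETE FROM schema.table
    WHERE rundate = DATE '{{ ds }}'
    ''',
    '''
    INSERT INTO schema.table (name, rundate)
    VALUES ('A', DATE '{{ ds }}'), ('B', DATE '{{ ds }}')
    ''',
])
```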
## 2. Reboot Campaign queries [TYPE]

A quick ~~demo~~ overview of how we can migrate a Slide job into an Airflow job.

- [sfmc_kyc_rejection_daily](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1371)
- [sfmc_kyc_rejection_cumulative](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1372)

Copy-paste the following lines into the example above.

```python
...

with DAG(...) as dag:
    daily = PrestoOperator(task_id='daily_update', queries=[...])

    cumulative = PrestoOperator(task_id='cumulative_update', queries=[
        '''DELETE FROM ...''',
        '''INSERT INTO ...''',
    ])

    sensor >> daily >> cumulative
```

In Slide we'd update `src/main/scripts/sfmc/daily_s3_to_sftp_automations.yaml`:

```yaml
gfg:
  owner: vaibhav.vij
  notify_cc:
    - yongseng.goh
  sftp_conn_id: sftp_grabtaxi
  schema: crm
  partition_column: rundate
  sftp_path: /Import/gfg_automations/
  start_date: 2020-03-27
  end_date:
  encrypt: yes
  platform: sfmc
  automations:
    pax_crm_all_all_gp_kyc-rejection_all:
      table_name: sfmc_kyc_rejection_daily
```

### 2.2 How to move from YAML to DAGs

This YAML can become part of the DAG itself. Today the upload starts daily at 8 am, and the only way to connect it to the queries is through this YAML; the upload can't sense or wait for anything to complete. Once it lives in the DAG, everything runs immediately after its upstream tasks finish.

Based on [crm.eater.id_package_ceria_subscription](https://gitlab.myteksi.net/data-science/marketing/airflow/-/blob/master/src/main/python/dags/campaigns/eater/id_gf_subscription.py):

```python
from sfmc.sftp_upload_helper import create_upload_tasks
...

with DAG(...):
    check_snoozetime_limit, count_check, s3_to_sftp_upload, signal = \
        create_upload_tasks(
            table_name='sfmc_kyc_rejection_daily',
            notify_cc=['vaibhav.vij'],
            sftp_path='/Import/gfg_automations/',
            journey_name='pax_crm_id_all_gf_subscription-exp_all',
            sftp_conn_id='sftp_grabtaxi',
            **default_args)

    sensor >> daily >> check_snoozetime_limit >> count_check >> s3_to_sftp_upload >> signal
    daily >> cumulative
```

## 3. Data Quality Check

> New file. Copy the sensor + daily example as a template.

We need some guardrails to ensure data integrity and to avoid human errors (e.g. getting the 90% control / 10% test split wrong). Before we insert data into a production table, we should ideally write it into a staging table and run some data quality checks, e.g. check that the rows are unique. Only after that should we load it into the final table. At the very least, we should be doing this for our important tables.

### The _daily_ task splits into two tasks: staging and production

```python
...

with DAG(...) as dag:
    sensor = LightHouseJobDependenciesSensor(...)

    daily_stg = PrestoOperator(task_id='daily_stg', queries=[
        '''DROP TABLE IF EXISTS campaign_stg''',
        '''CREATE TABLE campaign_stg AS (...)''',
    ])

    dq_check = DataQualityCheck(...)  # e.g. UniquenessCheck

    daily_prod = PrestoOperator(task_id='daily_prod', queries=[
        '''DELETE FROM campaign WHERE rundate = ...''',
        '''INSERT INTO campaign SELECT * FROM campaign_stg ...''',
    ])

    sensor >> daily_stg >> dq_check >> daily_prod
```
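The `DataQualityCheck` / `UniquenessCheck` operators above are repo-specific, so their exact parameters aren't shown here. As a rough illustration of the idea only: a uniqueness check usually boils down to a query that should return zero rows, plus a step that fails when it doesn't. The query and helper below are hypothetical, written against the `campaign_stg` table from the example.

```python
# A minimal sketch of the idea behind a uniqueness check; the real
# DataQualityCheck / UniquenessCheck operators in this repo may look different.
# The check query should return no rows when the staging data is unique.
UNIQUENESS_CHECK_QUERY = '''
SELECT name, rundate, count(*) AS dup_count
FROM campaign_stg
GROUP BY name, rundate
HAVING count(*) > 1
'''


def assert_unique(rows):
    """Fail the task (by raising) if the check query returned any duplicate keys."""
    if rows:
        raise ValueError(
            f'{len(rows)} duplicated (name, rundate) keys found in campaign_stg')
```

In the DAG this sits where `dq_check` is, between `daily_stg` and `daily_prod`, so bad data never reaches the production table.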