> Show the airflow job example.
> For this talk I'm assuming no prior Airflow knowledge, so before we begin let's go over some Airflow concepts.
> Each box here is a task, and you chain these tasks to form a complete Job or DAG (same thing).
> Operators are just Task templates with some parameters.
> Sensors: if we want to wait for something, like upstream tables, we use this special type of operator.
> Hooks are just a simplified interface to connect to external services like a DB.
A brief overview of common, high-level Airflow concepts:
- operators - a template for a predefined Task
- sensors - a special type of Operator designed to wait for something to occur
- hooks - a high-level interface to external services like databases
[Airflow concepts](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html)
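To make these concrete, here is a minimal sketch using stock Airflow classes rather than our internal operators (the exact import paths differ slightly between Airflow 1.10 and 2.x; the file path and DAG id are made up for illustration):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator     # operator: a task template
from airflow.sensors.filesystem import FileSensor   # sensor: waits for something to occur

with DAG('concepts_demo',                            # illustrative DAG id
         start_date=datetime(2021, 7, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    # Sensor: keep poking until the file shows up, then let downstream tasks run
    wait_for_file = FileSensor(task_id='wait_for_file', filepath='/tmp/ready.flag')

    # Operator + parameters = a concrete task
    say_hello = BashOperator(task_id='say_hello', bash_command='echo hello')

    # Hooks usually live inside operators; e.g. they look up a stored connection:
    #   airflow.hooks.base.BaseHook.get_connection('my_db')  # host, port, credentials

    wait_for_file >> say_hello   # chaining tasks forms the DAG
```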
> Maybe show them the UI first so they know what the end result looks like: [AppsFlyer Activity Event](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=perf.f_appsflyer_activity_events), [AppsFlyer First Booking](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=appsflyer.s2s_first_bookings.v0) or [ID GF Subscription](https://airflow-crm.data-engineering.myteksi.net/admin/airflow/tree?dag_id=crm.eater.id_package_ceria_subscription).
## 1. Example: How to schedule a Presto job
### Creating the table
The usual workbench SQL method:
```sql
CREATE TABLE IF NOT EXISTS schema.table (
    name varchar,
    rundate date
)
WITH (partitioned_by = ARRAY['rundate']);
```
### Airflow DAG
#### Sensors
If your job needs to wait for upstream tables, use the Lighthouse sensor; the details go into the DAG below. You run the sensor task first, then the rest of the job.
Every job should, at the end, create a Lighthouse signal; we can extend the Presto job to include that signal. And if your job depends on another job, that job needs to have created a Lighthouse signal for our sensor to wait on.
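Schematically the wait-then-signal pattern looks like the sketch below, reusing the operators from the full example further down. `LightHouseSignalOperator` is only a placeholder name for whatever internal task actually publishes the signal, so treat this as a shape, not the real API:

```python
# Hypothetical sketch: 'LightHouseSignalOperator' is a placeholder for the
# internal task that publishes this job's own Lighthouse signal.
wait_upstream = LightHouseJobDependenciesSensor(...)             # wait for upstream jobs' signals
transform = PrestoOperator(task_id='transform', queries=[...])   # do the actual work
publish_signal = LightHouseSignalOperator(task_id='publish')     # announce that we're done

wait_upstream >> transform >> publish_signal
```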
#### Cluster tag
We are now running in AWS: magic-presto -> magic-presto-az.
You can specify this on the PrestoOperator.
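Roughly like so; the `cluster` argument name below is a guess, so verify it against our internal PrestoOperator's actual signature:

```python
# Hypothetical: the exact argument name for the cluster tag may differ on our
# internal PrestoOperator -- check the operator's code before copying this.
daily = PrestoOperator(task_id='daily_update',
                       cluster='magic-presto-az',  # run on the AWS Presto cluster
                       queries=[...])
```

Putting everything together, the full DAG looks like this: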
```python
from airflow import DAG
from operators.presto_operator import PrestoOperator
from datetime import datetime, timedelta
from hooks.slack_hook import send_slack_notification
from airflow.operators.light_house_job_dependencies_sensor import LightHouseJobDependenciesSensor

default_args = {
    'owner': 'kornesh.kanan',
    'start_date': datetime(2021, 7, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
    'on_failure_callback': send_slack_notification(
        status='Failure',
        username='Perf Update',
        extra_text='cc @vaibhav.vij',
    )
}

with DAG(
        'crm.examples.perf_update',  # job id
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False,
        max_active_runs=1) as dag:

    # Wait for the upstream Lighthouse signals before touching our table
    sensor = LightHouseJobDependenciesSensor(...)

    # Delete then re-insert the rows for the run date
    daily = PrestoOperator(task_id='daily_update',
                           queries=[
                               '''
                               DELETE FROM schema.table WHERE rundate = '2022-01-01'
                               ''',
                               '''
                               INSERT INTO schema.table (name, rundate)
                               VALUES
                                   ('A', '2022-01-01'),
                                   ('B', '2022-01-01')
                               '''
                           ])

    sensor >> daily
```
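The example above hard-codes `'2022-01-01'`; in practice you would usually let Airflow fill in the run date via Jinja templating. A sketch, assuming our PrestoOperator declares `queries` as a templated field (most operators do, but worth verifying):

```python
# Assumes `queries` is in PrestoOperator's template_fields; `{{ ds }}` renders
# to the logical run date as 'YYYY-MM-DD', e.g. '2022-01-01'.
daily = PrestoOperator(task_id='daily_update',
                       queries=[
                           '''DELETE FROM schema.table WHERE rundate = '{{ ds }}' ''',
                           '''
                           INSERT INTO schema.table (name, rundate)
                           VALUES ('A', '{{ ds }}'), ('B', '{{ ds }}')
                           '''
                       ])
```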
### Code review & deployment
Create a git branch and commit changes
```bash
git checkout -b 'airflow-example' # Create a branch
git add dags/example.py # Add the files
git commit -m "Add Airflow example" # Commit changes
git push # Push commit to origin
```
Copy and paste the MR link into a browser to open the merge request. Once it is merged, it will be automatically deployed to our production Airflow instance.
You can then view the job in the [Airflow UI](https://airflow-crm.data-engineering.myteksi.net/admin/).
## 2. Reboot Campaign queries [TYPE]
A quick ~~demo~~ overview of how we can migrate a Slide job into an Airflow job:
- [sfmc_kyc_rejection_daily](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1371)
- [sfmc_kyc_rejection_cumulative](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1372)
Copy and paste the following lines into the example above:
```python
...
with DAG(...) as dag:
    daily = PrestoOperator(task_id='daily_update',
                           queries=[...])

    cumulative = PrestoOperator(task_id='cumulative_update',
                                queries=[
                                    '''DELETE FROM ...''',
                                    '''INSERT INTO ...'''
                                ])

    sensor >> daily >> cumulative
```
In Slide we'd update `src/main/scripts/sfmc/daily_s3_to_sftp_automations.yaml`:
```yaml
gfg:
  owner: vaibhav.vij
  notify_cc:
    - yongseng.goh
  sftp_conn_id: sftp_grabtaxi
  schema: crm
  partition_column: rundate
  sftp_path: /Import/gfg_automations/
  start_date: 2020-03-27
  end_date:
  encrypt: yes
  platform: sfmc
  automations:
    pax_crm_all_all_gp_kyc-rejection_all:
      table_name: sfmc_kyc_rejection_daily
### 2.2 How to move from YAML to DAGs
This YAML can become part of the DAG itself. Today the upload starts daily at 8am, and the only way to connect it to the data job is through the YAML; the upload doesn't sense or wait for anything to complete. Once it lives in the DAG, everything runs immediately after the upstream tasks finish.
Based on [crm.eater.id_package_ceria_subscription](https://gitlab.myteksi.net/data-science/marketing/airflow/-/blob/master/src/main/python/dags/campaigns/eater/id_gf_subscription.py)
```python
from sfmc.sftp_upload_helper import create_upload_tasks
...

with DAG(...):
    check_snoozetime_limit, count_check, s3_to_sftp_upload, signal = \
        create_upload_tasks(
            table_name='sfmc_kyc_rejection_daily',
            notify_cc=['vaibhav.vij'],
            sftp_path='/Import/gfg_automations/',
            journey_name='pax_crm_id_all_gf_subscription-exp_all',
            sftp_conn_id='sftp_grabtaxi',
            **default_args)

    sensor >> daily >> check_snoozetime_limit >> count_check >> s3_to_sftp_upload >> signal
    daily >> cumulative
```
## 3. Data Quality Check (new file; copy the sensor + daily example as a template)
We need some guardrails to ensure data integrity and to avoid human errors (e.g. the 90% control / 10% test).
Before we insert data into a production table, we should ideally write it into a staging table and run some data quality checks there, e.g. check that the rows are unique. Only after this should we load it into the final table. At the very least, we should be doing this for important tables.
### The _daily_ task splits into two tasks: staging and production
```python
...
with DAG(...) as dag:
    sensor = LightHouseJobDependenciesSensor(...)

    # Write the fresh data into a staging table first
    daily_stg = PrestoOperator(task_id='daily_stg',
                               queries=[
                                   '''DROP TABLE IF EXISTS campaign_stg''',
                                   '''CREATE TABLE campaign_stg AS (...)'''])

    # Validate the staging data before touching production
    dq_check = DataQualityCheck(...)  # e.g. UniquenessCheck

    # Only load into the production table once the checks pass
    daily_prod = PrestoOperator(task_id='daily_prod',
                                queries=[
                                    '''DELETE FROM campaign WHERE rundate = ...''',
                                    '''INSERT INTO campaign SELECT * FROM campaign_stg ...'''])

    sensor >> daily_stg >> dq_check >> daily_prod
```
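If a ready-made `DataQualityCheck` / `UniquenessCheck` operator isn't available, a uniqueness check can be rolled by hand by comparing total vs. distinct counts on the staging table. A rough sketch; the connection id and key column are assumptions to adapt to our setup, and the import paths shown are the Airflow 1.10 ones:

```python
from airflow.hooks.presto_hook import PrestoHook              # Airflow 1.10 path
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 path

def check_uniqueness():
    # Assumed connection id and key column -- adjust to the real ones.
    hook = PrestoHook(presto_conn_id='presto_default')
    total, distinct = hook.get_first(
        'SELECT count(*), count(DISTINCT name) FROM campaign_stg')
    if total != distinct:
        raise ValueError(
            f'campaign_stg is not unique: {total} rows vs {distinct} distinct keys')

dq_check = PythonOperator(task_id='dq_check', python_callable=check_uniqueness)
```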