# 0. Getting started with Airflow
## Airflow
- We only cover v1. The latest version of Airflow is v2, but we're running on v1 and migrating requires significant effort on our end. Find out [what has changed between v1 and v2](https://wiki.grab.com/display/OLYMPUS/04.+Airflow+Service:+Migration+from+Airflow+v1+to+Airflow+v2).
- Familiarize yourself with the high-level [Airflow concepts](https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html)
### Folder structure
`/src/main/python/dags/`
contains all our DAGs
`/src/main/python/helpers/`
- common Python scripts that you want to reuse across many DAGs (see the sensor sketch below)
- operators - a template for a predefined Task
- sensors - a special type of Operator that is designed to wait for something to occur
- hooks - a high-level interface to external services like databases
~~`/src/main/python/plugins/`~~
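For example, a reusable sensor would live under `helpers/sensors/` and be imported from any DAG that needs it. A minimal sketch (the module, class, and table names here are hypothetical, using the stock `PrestoHook` purely for illustration):
```python
# helpers/sensors/partition_sensor.py  (hypothetical example)
from airflow.hooks.presto_hook import PrestoHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class PartitionSensor(BaseSensorOperator):
    """Waits until a partition for the given rundate appears in a table."""

    @apply_defaults
    def __init__(self, table, rundate, *args, **kwargs):
        super(PartitionSensor, self).__init__(*args, **kwargs)
        self.table = table
        self.rundate = rundate

    def poke(self, context):
        # Airflow calls poke() every poke_interval until it returns True
        # (or the sensor times out).
        row = PrestoHook().get_first(
            "SELECT count(*) FROM {} WHERE rundate = DATE '{}'".format(
                self.table, self.rundate))
        return row[0] > 0
```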
### Naming conventions and best practices
- snake_case variables and DAG IDs by default (e.g. `snake_case`, not `CamelCase`)
- TODO: team naming conventions & subdirectory structure (e.g. `perf.table_name`)
- Follow the [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html)
## [Not covered] Running examples locally with Docker & docker-compose
The easiest and fastest way to get hands-on with Airflow is to run an instance locally.
Setting up a local environment for development requires [installing Docker and optionally docker-compose](https://docs.docker.com/compose/install/).
With docker-compose, simply run `docker-compose up` using the following `docker-compose.yml`:
```yaml=docker-compose.yml
version: '3'
services:
  airflow:
    image: puckel/docker-airflow
    ports:
      - "8080:8080"
    volumes:
      - './:/usr/local/airflow/dags'
```
or with docker:
```bash
docker run -d -p 8080:8080 -v "$(pwd)":/usr/local/airflow/dags puckel/docker-airflow
docker ps                    # find the container ID
docker exec -it b2a71e bash  # replace b2a71e with your container ID
```
then visit [http://localhost:8080](http://localhost:8080) for the UI.
## 1. Example: How to schedule a Presto job
### Creating the table
The usual workbench SQL method.
```sql
CREATE TABLE IF NOT EXISTS schema.table (
  name varchar,
  rundate date
)
WITH (partitioned_by = ARRAY['rundate']);
```
You can also use an operator (see the sketch below). Note that external tables need to be created via QuickSilver.
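A hedged sketch of the operator route, assuming the internal `PrestoOperator` used in the DAG below also passes DDL statements in its `queries` list straight through to Presto:
```python
from operators.presto_operator import PrestoOperator

# Sketch only: creates the table as a task inside a DAG,
# assuming PrestoOperator accepts DDL as well as DML.
create_table = PrestoOperator(
    task_id='create_table',
    queries=[
        '''
        CREATE TABLE IF NOT EXISTS schema.table (
          name varchar,
          rundate date
        )
        WITH (partitioned_by = ARRAY['rundate'])
        '''
    ])
```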
### Airflow DAG
```python
from airflow import DAG
from operators.presto_operator import PrestoOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'kornesh.kanan',
    'start_date': datetime(2021, 7, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
    'depends_on_past': True
}

with DAG(
        'hourly_update',  # DAG id
        schedule_interval='@hourly',
        default_args=default_args,
        catchup=False,
        max_active_runs=1) as dag:

    update = PrestoOperator(
        task_id='hourly_update',
        queries=[
            # delete-then-insert keeps the task idempotent on retries
            '''
            DELETE FROM schema.table WHERE rundate = DATE '2022-01-01'
            ''',
            '''
            INSERT INTO schema.table (name, rundate)
            VALUES
              ('A', DATE '2022-01-01'),
              ('B', DATE '2022-01-01')
            '''
        ])

    update  # single task, no upstream/downstream dependencies
```
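Note that the run date is hardcoded above. In practice you would usually let Airflow template it with the execution-date macro, assuming `queries` is a templated field on the internal `PrestoOperator` (a sketch, not verified against the operator's implementation):
```python
update = PrestoOperator(
    task_id='hourly_update',
    queries=[
        # {{ ds }} renders to the execution date, e.g. 2022-01-01
        "DELETE FROM schema.table WHERE rundate = DATE '{{ ds }}'",
        '''
        INSERT INTO schema.table (name, rundate)
        VALUES
          ('A', DATE '{{ ds }}'),
          ('B', DATE '{{ ds }}')
        '''
    ])
```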
### Code review & deployment
Create a git branch and commit changes
```bash
git checkout -b 'airflow-example'    # Create a branch
git add dags/example.py              # Add the files
git commit -m "Add Airflow example"  # Commit changes
git push -u origin airflow-example   # Push the branch to origin
```
Copy-paste the MR link printed by `git push` into a browser to create the merge request. Once merged, it will be automatically deployed to our production Airflow instance.
## 2. Reboot Campaign queries
A quick ~~demo~~ overview of how we can migrate a Slide job into an Airflow job.
Copy-paste the following SQL queries into the Airflow template from example 1 above (see the skeleton after the links).
[Q: Should we embed this code?]
- [sfmc_kyc_rejection_daily](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1371)
- [sfmc_kyc_rejection_cumulative](https://one-de.prod.data-engineering.myteksi.net/slide/crm/jobs/1372)
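A hedged skeleton of how the two jobs could slot into the `PrestoOperator` template (placeholders only; the actual SQL lives in the Slide jobs linked above, and the daily-before-cumulative ordering is an assumption):
```python
daily = PrestoOperator(
    task_id='sfmc_kyc_rejection_daily',
    queries=[
        '...',  # paste the sfmc_kyc_rejection_daily SQL here
    ])

cumulative = PrestoOperator(
    task_id='sfmc_kyc_rejection_cumulative',
    queries=[
        '...',  # paste the sfmc_kyc_rejection_cumulative SQL here
    ])

daily >> cumulative  # assumption: the cumulative table builds on the daily one
```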
Update `src/main/scripts/sfmc/daily_s3_to_sftp_automations.yaml`:
```yaml
gfg:
  owner: vaibhav.vij
  notify_cc:
    - yongseng.goh
  sftp_conn_id: sftp_grabtaxi
  schema: crm
  partition_column: rundate
  sftp_path: /Import/gfg_automations/
  start_date: 2020-03-27
  end_date:
  encrypt: yes
  platform: sfmc
  automations:
    pax_crm_all_all_gp_kyc-rejection_all:
      table_name: sfmc_kyc_rejection_daily
```
### 2.2 How to move from YAML to DAGs
Based on [crm.eater.id_package_ceria_subscription](https://gitlab.myteksi.net/data-science/marketing/airflow/-/blob/master/src/main/python/dags/campaigns/eater/id_gf_subscription.py)
```python
from sfmc.sftp_upload_helper import create_upload_tasks
...
with DAG(...):
    check_snoozetime_limit, count_check, s3_to_sftp_upload, signal = \
        create_upload_tasks(
            table_name='sfmc_kyc_rejection_daily',
            notify_cc=['vaibhav.vij'],
            sftp_path='/Import/gfg_automations/',
            journey_name='pax_crm_id_all_gf_subscription-exp_all',  # TODO: What does this refer to?
            sftp_conn_id='sftp_grabtaxi',
            **default_args)  # TODO: Why default args here?
```
## 3. Data Quality Check [WIP]
Before we insert data into a table, we should ideally write it into a staging table and run some data quality checks, e.g. check that the rows are unique. Only then do we load it into the final table. We should be doing this for important tables to ensure data integrity and avoid human errors (e.g. the 90% control / 10% test split).
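A rough sketch of the pattern, reusing the `PrestoOperator` template from example 1 (again assuming its `queries` are templated) together with the stock `PrestoCheckOperator` that ships with Airflow 1.10; the staging table `schema.table_staging` is hypothetical:
```python
from airflow import DAG
from airflow.operators.presto_check_operator import PrestoCheckOperator
from operators.presto_operator import PrestoOperator

with DAG('hourly_update_with_dq',  # reuses default_args from example 1
         schedule_interval='@hourly',
         default_args=default_args,
         catchup=False,
         max_active_runs=1) as dag:

    # 1. Write the new partition into a staging table first
    load_staging = PrestoOperator(
        task_id='load_staging',
        queries=[
            "DELETE FROM schema.table_staging WHERE rundate = DATE '{{ ds }}'",
            '''
            INSERT INTO schema.table_staging (name, rundate)
            VALUES
              ('A', DATE '{{ ds }}'),
              ('B', DATE '{{ ds }}')
            '''
        ])

    # 2. Fail the run if the staging partition has duplicate names.
    #    PrestoCheckOperator fails the task when the first row of the
    #    query result contains a falsy value.
    quality_check = PrestoCheckOperator(
        task_id='quality_check',
        sql='''
            SELECT count(*) = count(DISTINCT name)
            FROM schema.table_staging
            WHERE rundate = DATE '{{ ds }}'
        ''')

    # 3. Only then copy the checked partition into the final table
    load_final = PrestoOperator(
        task_id='load_final',
        queries=[
            "DELETE FROM schema.table WHERE rundate = DATE '{{ ds }}'",
            '''
            INSERT INTO schema.table
            SELECT * FROM schema.table_staging
            WHERE rundate = DATE '{{ ds }}'
            '''
        ])

    load_staging >> quality_check >> load_final
```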