# AIRFLOW DOCUMENTATION
<img src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQ8FTpOXKsKFbmwZS28VI4JbAvi2cWgUlWQjJMw0yd1p9ikpMVJsHKsa_1v-5imPz0cHKE&usqp=CAU" height=70px/>
###### tags: `Documentation` `Airflow`
[TOC]
## What Is Airflow?
:::info
**Apache Airflow** is an open-source workflow management platform for data engineering pipelines. It started at Airbnb in October 2014 as a solution to manage the company's increasingly complex workflows.
:::
### Dependencies
**Operating system**
Airflow is developed and tested on Linux, with Debian and Red Hat being the main platforms.
**Python**
Airflow is written in Python and runs on Python 3; the minimum supported minor version depends on the Airflow release.
**Redis**
Redis is not required by Airflow itself, but it is the broker most commonly used with the CeleryExecutor, which distributes task execution across worker nodes. Any other broker supported by Celery (for example RabbitMQ) can be used instead.
**PostgreSQL**
Airflow uses PostgreSQL as its metadata database: it stores persistent state such as DAG runs, task instances, connections, and variables, and it is shared by the scheduler, webserver, and workers.
### Airflow Scope
#### Workflow
**Sequential**
<img src="https://australiansynchrotron.github.io/lightflow/img/example_sequential.svg" width="70%" style="margin-bottom: 20px;" />
>The sequential workflow pattern is one of the most basic patterns. A number of tasks are executed in order, starting with the first task.
>In this example, a counter is stored in value and initialized in task_1. Each task increments the counter, prints its value and passes it on to the next task.
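
A minimal sketch of this pattern as an Airflow DAG, assuming Airflow 2.x; the DAG id, task names, and counter logic mirror the figure but are otherwise illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def init_counter():
    # The return value is stored as an XCom and picked up by the next task.
    return 0


def increment(ti, upstream):
    value = ti.xcom_pull(task_ids=upstream) + 1
    print("counter:", value)
    return value


with DAG(
    dag_id="sequential_example",          # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@once",
    catchup=False,
) as dag:
    task_1 = PythonOperator(task_id="task_1", python_callable=init_counter)
    task_2 = PythonOperator(task_id="task_2", python_callable=increment,
                            op_kwargs={"upstream": "task_1"})
    task_3 = PythonOperator(task_id="task_3", python_callable=increment,
                            op_kwargs={"upstream": "task_2"})

    # Chain the tasks so they run strictly one after another.
    task_1 >> task_2 >> task_3
```
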
**Parallel**
<img src="https://australiansynchrotron.github.io/lightflow/img/example_parallel.svg" width="70%" style="margin-bottom: 20px;" />
>Tasks that do not depend on each other can be executed by Airflow in parallel.
>This example starts with the branch_task that fans out into three simple tasks. Once all three tasks have completed running, the join_task is executed.
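
A hedged sketch of the fan-out/fan-in shape from the figure, assuming Airflow 2.x; task ids and commands are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="parallel_example",            # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@once",
    catchup=False,
) as dag:
    branch_task = BashOperator(task_id="branch_task", bash_command="echo fan-out")
    lane_tasks = [
        BashOperator(task_id=f"lane_task_{i}", bash_command=f"echo lane {i}")
        for i in range(1, 4)
    ]
    join_task = BashOperator(task_id="join_task", bash_command="echo fan-in")

    # branch_task fans out into three independent tasks; join_task waits for
    # all of them (trigger_rule defaults to "all_success") before it runs.
    branch_task >> lane_tasks
    lane_tasks >> join_task
```
Whether the three middle tasks actually run at the same time depends on the executor and parallelism settings of the deployment.
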
**Data Flow**
<img src="https://australiansynchrotron.github.io/lightflow/img/example_dataflow.svg" width="70%" style="margin-bottom: 20px;" />
>Almost arbitrary data can flow between tasks. If multiple tasks send data to the same task downstream, the data can be labeled with an alias in order for the downstream task to be able to pick the right dataset.
>In this example, the put_task stores 5 into value and passes it on to the print_task_1, square_task, multiply_task and subtract_task. The square_task squares the value, prints it and passes it on to the multiply_task. The input to the multiply_task are two datasets. Since multiplication is a commutative operation, the multiply_task will simply multiply both datasets regardless of their order. The result of the multiplication is passed on to the subtract_task. Since the order of the input for the subtract_task matters, the data passed from the put_task is given the alias first, while the data from the multiply_task is labelled second.
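
In Airflow, task-to-task data is exchanged through XComs rather than a dedicated data-flow channel, and there is no built-in alias mechanism; a downstream task simply pulls each upstream result by task id. A sketch of the figure's put/square/multiply/subtract chain, with illustrative names:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def put():
    return 5                               # stored as an XCom


def square(ti):
    return ti.xcom_pull(task_ids="put_task") ** 2


def multiply(ti):
    # Multiplication is commutative, so the order of the two inputs is irrelevant.
    a = ti.xcom_pull(task_ids="put_task")
    b = ti.xcom_pull(task_ids="square_task")
    return a * b


def subtract(ti):
    # Order matters here, so the operands are pulled by explicit task id.
    first = ti.xcom_pull(task_ids="put_task")
    second = ti.xcom_pull(task_ids="multiply_task")
    return first - second


with DAG(
    dag_id="dataflow_example",            # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@once",
    catchup=False,
) as dag:
    put_task = PythonOperator(task_id="put_task", python_callable=put)
    square_task = PythonOperator(task_id="square_task", python_callable=square)
    multiply_task = PythonOperator(task_id="multiply_task", python_callable=multiply)
    subtract_task = PythonOperator(task_id="subtract_task", python_callable=subtract)

    put_task >> [square_task, multiply_task, subtract_task]
    square_task >> multiply_task >> subtract_task
```
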
**Sub DAG**
<img src="https://australiansynchrotron.github.io/lightflow/img/example_subdag.svg" width="70%" style="margin-bottom: 20px;" />
>Airflow allows more than one DAG to be defined and lets tasks start other DAGs dynamically, which makes dynamically changing workflows possible.
>In this example two DAGs are defined: the main_dag, which is executed at the start of the workflow, and the sub_dag, which is started dynamically. The call_dag_task of the main_dag starts 5 sub_dag instances, passing a payload (here a numpy array) along with each request.
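
The closest Airflow equivalent to starting a DAG from within another DAG is the TriggerDagRunOperator (the older SubDagOperator exists but is discouraged). A hedged sketch, assuming a separately defined DAG with dag_id "sub_dag"; names and the conf payload are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="main_dag",                    # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@once",
    catchup=False,
) as main_dag:
    # Start five runs of sub_dag, each with its own payload in conf.
    call_dag_tasks = [
        TriggerDagRunOperator(
            task_id=f"call_dag_task_{i}",
            trigger_dag_id="sub_dag",     # the DAG being started dynamically
            conf={"index": i},            # data passed to the triggered run
        )
        for i in range(5)
    ]
```
The conf dictionary is available to the triggered run as its run configuration.
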
**Data Store**
<img src="https://australiansynchrotron.github.io/lightflow/img/example_store.svg" width="70%" style="margin-bottom: 20px;" />
>Data that should be kept during a workflow run can be saved into the persistent data store. This data is deleted as soon as the workflow run ends, but is available to all tasks during the lifetime of the workflow.
>The data store provides methods to store and retrieve single values or append values to a list. This can even be done asynchronously from different tasks at the same time. The key under which the data is being stored supports a hierarchical structure using a dot notation.
>This example workflow stores different types of data in the persistent data store and modifies them.
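
Airflow has no per-run data store like the one described above; the closest built-in mechanisms are XComs (per-run values exchanged between tasks) and Variables (key-value pairs persisted in the metadata database until explicitly deleted). A small sketch using Variables; the keys are illustrative, and the dots are simply part of the key string rather than a real hierarchy:

```python
from airflow.models import Variable

# Store and read back a simple value (persisted in the metadata database).
Variable.set("tracking.last_run_date", "2022-10-19")        # illustrative key
last_run = Variable.get("tracking.last_run_date")

# Dict/list values can be serialized to JSON and deserialized on read.
Variable.set("tracking.settings", {"chunksize": 1000}, serialize_json=True)
settings = Variable.get("tracking.settings", deserialize_json=True)
```
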
**Trigger**
>A DAG run can be started independently of its schedule: manually from the web UI, from the CLI (`airflow dags trigger <dag_id>`), through the REST API, or by another DAG.
**Scheduler**
>The Airflow scheduler continuously monitors all DAGs and creates DAG runs according to each DAG's schedule_interval (see the sketch below).
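
A short sketch of how the two relate, assuming Airflow 2.x; the cron expression and DAG id are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="scheduled_example",           # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="0 2 * * *",        # the scheduler starts a run daily at 02:00
    catchup=False,                        # do not backfill missed intervals
) as dag:
    BashOperator(task_id="run", bash_command="echo scheduled run")

# The same DAG can also be triggered manually, e.g.:
#   airflow dags trigger scheduled_example
```
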
### Connectivity
- Python
- Bash
- SSH
- FTP/SFTP
- PostgreSQL
- SQL Server
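
Each item above maps to operators or hooks shipped with Airflow or its provider packages (apache-airflow-providers-ssh, -sftp, -postgres, -microsoft-mssql). A hedged sketch, assuming Airflow 2.x with those providers installed; the connection ids, file paths, SQL, and table name are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


def copy_rows():
    # Read from SQL Server and write into PostgreSQL using hooks.
    rows = MsSqlHook(mssql_conn_id="mssql_sql_cls").get_records("SELECT 1 AS dummy")
    PostgresHook(postgres_conn_id="pg_extdwh").insert_rows("target_table", rows)


with DAG(
    dag_id="connectivity_example",        # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@once",
    catchup=False,
) as dag:
    run_python = PythonOperator(task_id="copy_rows", python_callable=copy_rows)
    run_bash = BashOperator(task_id="run_bash", bash_command="echo hello")
    run_ssh = SSHOperator(task_id="run_ssh", ssh_conn_id="ssh_default",
                          command="uptime")
    upload_sftp = SFTPOperator(task_id="upload_sftp", ssh_conn_id="sftp_default",
                               local_filepath="/tmp/report.csv",
                               remote_filepath="/incoming/report.csv",
                               operation="put")
```
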
## How Airflow Works
```sequence
Title: Bulk Data Inquiry to Cekrekening.id \n
participant CEKREKENING.ID
participant PROXY
participant OCP
participant AIRFLOW
participant FDS
participant EXTDWH
FDS->AIRFLOW: Register \nJob Scheduler
AIRFLOW->CEKREKENING.ID: Account Data Inquiry
AIRFLOW-->OCP: Call API
OCP-->PROXY: Tunnel
PROXY-->CEKREKENING.ID: Passing Request
CEKREKENING.ID->AIRFLOW: Response result
CEKREKENING.ID-->PROXY: Passing Response
PROXY-->OCP: Receive
OCP-->AIRFLOW: By Pass
AIRFLOW->FDS: OK
AIRFLOW-->EXTDWH: Store Result
AIRFLOW->FDS: NOK
AIRFLOW-->KILL: Skip
```
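
A hedged sketch of the sequence above as a DAG: call the external Cekrekening.id API (through whatever proxy/OCP route is configured), branch on the response, store the result in EXTDWH on OK, and skip on NOK. The endpoint URL, connection id, table, and task names are placeholders, not taken from this document.

```python
from datetime import datetime

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

API_URL = "https://example.invalid/cekrekening/inquiry"   # placeholder endpoint


def inquiry(ti):
    # Call the external service (in the diagram this request goes through OCP and a proxy).
    response = requests.post(API_URL, json={"accounts": []}, timeout=30)
    ti.xcom_push(key="status_code", value=response.status_code)
    ti.xcom_push(key="payload", value=response.json())


def choose_branch(ti):
    # OK -> store the result, NOK -> skip the remaining tasks.
    ok = ti.xcom_pull(task_ids="call_inquiry_api", key="status_code") == 200
    return "store_result" if ok else "skip"


def store_result(ti):
    payload = ti.xcom_pull(task_ids="call_inquiry_api", key="payload")
    PostgresHook(postgres_conn_id="extdwh").insert_rows(      # placeholder conn id
        "cekrekening_result", [(str(payload),)], target_fields=["raw_response"]
    )


with DAG(
    dag_id="cekrekening_inquiry",         # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    call_api = PythonOperator(task_id="call_inquiry_api", python_callable=inquiry)
    branch = BranchPythonOperator(task_id="check_response", python_callable=choose_branch)
    store = PythonOperator(task_id="store_result", python_callable=store_result)
    skip = PythonOperator(task_id="skip", python_callable=lambda: None)

    call_api >> branch >> [store, skip]
```
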
## Use Case
:::info
:::
### Create DAG
This use case follows the same Cekrekening.id inquiry sequence shown under "How Airflow Works" above.
### Running DAG
### Monitoring DAG
```sequence
Title: Tracking Data Extraction
participant ONMOBILE
participant FIREBASE
participant GOOGLE ANALYTICS
participant BIG QUERY
participant AIRFLOW
participant EXTDWH
participant BIG DATA
ONMOBILE->FIREBASE:
FIREBASE-->GOOGLE ANALYTICS:
GOOGLE ANALYTICS->AIRFLOW: Using API
GOOGLE ANALYTICS-->BIG QUERY: Scheduler \nBy Google
BIG QUERY->AIRFLOW: Direct Connect
AIRFLOW->EXTDWH: Store PostgreSQL
AIRFLOW->BIG DATA: Store Big Data
```
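
A hedged sketch of the Airflow side of this pipeline, assuming the google and postgres provider packages are installed; the query, project/dataset, connection ids, and target table are placeholders. The Big Data store branch is omitted because its technology is not specified here.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.postgres.hooks.postgres import PostgresHook


def bigquery_to_postgres():
    # Pull Google Analytics events exported to BigQuery (placeholder query) ...
    bq = BigQueryHook(gcp_conn_id="google_cloud_default", use_legacy_sql=False)
    df = bq.get_pandas_df("SELECT event_name, event_date FROM `project.dataset.events`")
    # ... and load them into the EXTDWH PostgreSQL database.
    engine = PostgresHook(postgres_conn_id="extdwh").get_sqlalchemy_engine()
    df.to_sql("ga_events", engine, if_exists="append", index=False)


with DAG(
    dag_id="tracking_data_extraction",    # illustrative name
    start_date=datetime(2022, 10, 19),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="bigquery_to_postgres", python_callable=bigquery_to_postgres)
```
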
The DAG file below builds a workflow with the internal `create_custom_dags` helper and registers the result in the module namespace so the Airflow scheduler can discover it:
```python
import os
from datetime import datetime, timezone, timedelta

from airflow import DAG
from bigdata.core.replication.BaseDags import create_custom_dags

# Derive the workflow name and tags from where this file lives on disk.
FILE_PATH = os.path.dirname(os.path.realpath(__file__))
WORKFLOW_NAME = FILE_PATH.split('/')[-1]
TAGS = FILE_PATH.split('/')[-3:-1]
print("workflow_name: ", WORKFLOW_NAME)

# Database-related arguments passed through to the replication tasks.
kwargs_db = {
    "workflow_name": WORKFLOW_NAME,
    "connection_id": "mssql_sql_cls",
    "chunksize": 1000,
    "file_path": FILE_PATH
}

tags = ["bigdata"]
tags.extend(TAGS)

# Today's date string (ds) in UTC+7.
today = datetime.today()
tz = timezone(timedelta(hours=7))
today = today.astimezone(tz)
ds = today.strftime('%Y-%m-%d')

config = {
    "workflow_name": WORKFLOW_NAME,
    "owner": "aris.purnomo",
    "t0_name": "x_t0_sqlserver",
    "t1_name": "x_t1_dev",
    "table_name": "ClsFacilityWOCORecap_TR",
    "start_date": "2022-10-19",
    "salt_key": "development",
    "schedule_interval": "@once",
    "tags": tags,
    "file_path": FILE_PATH,
    "ds": ds,
    "kwargs_db": kwargs_db
}

# Build the DAG(s) and expose them at module level so the scheduler picks them up.
dags = create_custom_dags(config)
globals()[WORKFLOW_NAME] = dags
```