# Airflow Dag 撰寫
###### tags: `系統`
```
專案: Airflow 自動化流程管理系統
更新日期: 2021-09-27
版本: V1
作者: Ya-Sheng Chen (Rock)
```
## 序
`Airflow`是一個工作流分配管理系統,通過有向非迴圈圖的方式管理任務流程,設定任務依賴關係和時間排程,在資料工程中,常會有繁雜的資料處理,有序且多樣條件的運行,Bash 雖然可以快速撰寫並部署自動化,但當數量變多且繁雜時,維護及除錯變得複雜繁瑣且難控管,利用Airflow 可視化介面讓控管及除錯更方便。
本篇說明如何撰寫Dags 腳本,部署自動化程式。
如未安裝Airflow 可參考 [Airflow 安裝流程](https://hackmd.io/tpALjmehRJK7s9Qk64oIdw)
## 啟動
- 於`local install`
- `airflow webserver -p 8080`
- `airflow scheduler`
- 使用`docker-compose` 安裝
- `cd docker-compose`
- `docker compose up`
- 使用 [GUI介面管理](http://localhost:8080/) Airflow
## 建立腳本Dags
`DAG`-有向無環圖 `(Directed Acyclic Graph) ` 是類似氣流式的有序且相依的流程圖,並表示應該如何運行,故以此為Airflow 每項任務的流程名稱。
`DAG 腳本`主要以下列部分組成:
- 設定預設參數
- 建立 `DAG`
- 建立 `task` 並套用`Dag`
- 建立 `task` 執行順序
範例:
```
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
# 設定 Dags預設參數
default_args = {
'owner': 'rock',
'depends_on_past': False,
'start_date': datetime(2021,9,2),
'email': ['rock@example.com'],
'email_on_failure': True, # Error send email
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# 建立dag 物件
GenPros_dag = DAG(dag_id = 'General_Process_Yearly',
description = 'General Yearly Process',
default_args = default_args,
schedule_interval = '0 0 31 12 *', # 每月1日 00am
tags = ['Year','Maintain','Stock'])
# cmd 指令組建
sour_act = f'source {venv_path}/venv/bin/activate'
gbe_cmd = f'cd {basic_path} && scrapy crawl GB_ECG_Spider'
idc_cmd = f'cd {basic_path} && scrapy crawl TW_IDC_Spider'
# 建立執行task
start = DummyOperator(task_id='start', start_date=startDate, dag=GenPros_dag)
end = DummyOperator(task_id='end', start_date=startDate, dag=GenPros_dag)
gbe_crawl = BashOperator(task_id='GB_ECG_Spider', bash_command=gbe_cmd, dag=GenPros_dag)
idc_crawl = BashOperator(task_id='TW_IDC_Spider', bash_command=idc_cmd, dag=GenPros_dag)
# 建立task 有序執行
start >> [gbe_crawl, idc_crawl] >> end
```
### 預設參數
- `parameter`:
- **`owner`**:DAG 擁有者的名稱,如上一篇說明的,通常是負責實作這個 DAG 的人員名稱
- **`depends_on_past`**: 每一次執行的 Task 是否會依賴於上次執行的 Task,`False`,代表上次的 Task 如果執行失敗,這次的 Task 就不會繼續執行
- **`start_date`**: Task 從哪個日期後開始可以被 Scheduler 排入排程
- **`email`**: 如果 Task 執行失敗的話,要寄信給哪些人的 email
- **`email_on_failure`**: 如果 Task 執行失敗的話,是否寄信
- **`email_on_retry`**: 如果 Task 重試的話,是否寄信
- **`retries`**: 最多重試的次數
- **`retry_delay`**: 每次重試中間的間隔
- **`end_date`**: Task 從哪個日期後,開始不被 Scheduler 放入排程
- **`execution_timeout`** : Task 執行時間的上限
- **`on_failure_callback`**: Task 執行失敗時,呼叫的 function
- **`on_success_callback`**: Task 執行成功時,呼叫的 function
- **`on_retry_callback`**: Task 重試時,呼叫的 function
```
default_args = {
'owner': 'rock',
'depends_on_past': False,
'start_date': datetime(2021, 09, 10),
'email': ['rock@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'end_date': datetime(2020, 2, 29),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
}
```
#### Dags object 創建
- 使用With
```
with DAG(dag_id = v['dag_id'],
description = v['description'],
default_args = default_args,
schedule_interval = v['schedule_interval'],
tags = v['tags']
) as dag:
<do some task & workflow>
```
- create variable
```
dag1 = DAG(dag_id = v['dag_id'],
description = v['description'],
default_args = default_args,
schedule_interval = v['schedule_interval'],
tags = v['tags']
)
```
#### 建立 Task
- 自動化 `Task` 主要分為以下操作(`Dags Operators`):
- `BashOperator` - 執行bash 指令
- `PythonOperator` - 執行python function
- `EmailOperator` - 寄Email
- `SimpleHttpOperator` - 請求http request
- `MySqlOperator`, `SqliteOperator`, `PostgresOperator`, `MsSqlOperator`, `OracleOperator`, `JdbcOperator`, etc. - 執行 sql 語法
- `Sensor` - 等待 certain time, file, database row, S3 key, etc…回覆
>更多請參考 [官方文件](https://airflow.apache.org/docs/apache-airflow/1.10.1/concepts.html#operators)
範例:
```
task1 = DummyOperator(task_id='skip', start_date=startDate, dag=dag1)
task2 = BashOperator( task_id='test1', bash_command=f'echo process Start', dag=dag1)
```
#### 建立Task 執行順序
`>>` 代表有向順序執行
```
op1 >> op2
```
使用`[]` 代表同層執行
```
op1 >> [op2, op3] >> op4
```
#### 建立DAG 實體參數`schedule_interval`
| preset | meaning | cron |
| ------ | -------------------------------------------------- | ---- |
| `None` | 不排程時間,利用事件`externally triggered`觸發執行 | - |
|`@once` | 排程一次,僅執行一次|-|
|`@hourly` |每小時執行| `0 * * * *`|
|`@daily` |每日 `00:00` 執行 |`0 0 * * *`|
|`@weekly` |每週日 `00:00` 執行|`0 0 * * 0`
|`@monthly` |每月1號 `00:00`執行|`0 0 1 * *`|
|`@yearly`| 每年1/1號 `00:00`執行|`0 0 1 1 *`|
>note: Use:`schedule_interval=None`,Not use`schedule_interval='None'`
參考文件:
更多關於 scheduler 請參考 [pypi doc](https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html)
快速建立cron 格式時間可使用 [cron monitor](https://crontab.guru/) 產生相關`cron` 時間語法。
##### 父子Dag 相依性
當Dags 相互有執行關係時,dag2 要等待 dags1 完全執行完後才執行,使用`ExternalTaskSensor`
`from airflow.operators.sensors import ExternalTaskSensor`
```
wait_for_dinner = ExternalTaskSensor(
task_id='wait_for_dinner',
external_dag_id='Parent_dag',
external_task_id='cook_dinner',
start_date=datetime(2020, 4, 29),
execution_delta=timedelta(hours=1),
timeout=3600,
)
```
更多請參考 [trigger dag on sucess](https://stackoverflow.com/questions/61514887/how-to-trigger-a-dag-on-the-success-of-a-another-dag-in-airflow-using-python)
#### `airflow.cfg` 設定
##### Not showing example
```
load_examples = False
```
##### setting alarm email
```
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_EMAIL_ADDRESS
smtp_password = 16_DIGIT_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_EMAIL_ADDRESS
```
> how to gmail application password ? 參考 附件1 `取得應用程式使用密碼`
##### setting timezone
```
[core]
# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = Asia/Taipei
[webserver]
# Default timezone to display all dates in the UI, can be UTC, system, or
# any IANA timezone string (e.g. Europe/Amsterdam). If left empty the
# default value of core/default_timezone will be used
# Example: default_ui_timezone = America/New_York
default_ui_timezone = Asia/Taipei
```
> 另Dag 須將`default_args`內的`Start_Date`參數加上`tzinfo`,以表示顯示時區格式
範例:
```
import pendulum
local_tz = pendulum.timezone("Asia/Taipei")
# 設定 Dags預設參數
default_args = {
...
'start_date': datetime(2021, 9, 15,tzinfo=local_tz),
...
}
```
### Dag 執行延遲一日解決方式
情境: 如果今日是2022-01-11,使用shedule_interval 設定cron `0 0 * * * *` 預計每日00點執行腳本,預期執行日為 2022-01-11 00:00:00,因airflow 設計關係,實際執行為2022-01-12執行,可以理解為 運行11日腳本=11~12 日之間的工作故會等,11日實際過完才執行腳本。
[可參考說明](https://towardsdatascience.com/airflow-schedule-interval-101-bbdda31cc463)
[可參考說明2](https://forum.astronomer.io/t/airflow-pro-tip-scheduler-will-run-your-job-one-schedule-interval-after-the-start-date/315)
## 參考資源
- [Airflow 官方文件](https://airflow.apache.org/docs/apache-airflow/stable/index.html)
- [Airflow pypi 文件](https://airflow.apache.org/docs/apache-airflow/1.10.1/index.html)
- [一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載](https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html)
- [資料科學家 L 的奇幻旅程 Vol.1 新人不得不問的 2 個問題](https://leemeng.tw/journey-of-data-scientist-L-part-1-two-must-ask-questions-when-on-board.html#%E5%84%80%E8%A1%A8%E6%9D%BF%E4%B8%8A%E7%9A%84-KPI-%E6%98%AF%E6%80%8E%E9%BA%BC%E7%94%A2%E7%94%9F%E7%9A%84%EF%BC%9F)
- [Airflow動手玩](https://zh-tw.coderbridge.com/series/c012cc1c8f9846359bb9b8940d4c10a8/posts/96bfcd7cfbc241b19f38248dac4b826e)
## 附件1 Google帳戶 `取得應用程式使用密碼`
| 步驟 | 圖示 |
| ---- | ------------------------ |
| 右上角`應用程式icon`|<img src="https://i.imgur.com/Bfw2UUe.png" width="30"> |
|google 帳戶設定<img src="https://i.imgur.com/3rsgMPR.png" width="50"> | <img src="https://i.imgur.com/79U8qun.png" width="200">|
| `安全性設定`<br>>`登入google`|<img src="https://i.imgur.com/ukg9jNU.png" width="200"><img src="https://i.imgur.com/beG2qAP.png" width="300">|
| 開啟完`兩步驟驗證`後<br>>`應用程式密碼` ||
| 建立應用程式密碼`選取應用程式`<br>>`其他(自訂名稱)` |<img src="https://i.imgur.com/MA6IR9U.png" width="300"><img src="https://i.imgur.com/DJwDnxx.png" width="300">|