# Airflow Dag 撰寫 ###### tags: `系統` ``` 專案: Airflow 自動化流程管理系統 更新日期: 2021-09-27 版本: V1 作者: Ya-Sheng Chen (Rock) ``` ## 序 `Airflow`是一個工作流分配管理系統,通過有向非迴圈圖的方式管理任務流程,設定任務依賴關係和時間排程,在資料工程中,常會有繁雜的資料處理,有序且多樣條件的運行,Bash 雖然可以快速撰寫並部署自動化,但當數量變多且繁雜時,維護及除錯變得複雜繁瑣且難控管,利用Airflow 可視化介面讓控管及除錯更方便。 本篇說明如何撰寫Dags 腳本,部署自動化程式。 如未安裝Airflow 可參考 [Airflow 安裝流程](https://hackmd.io/tpALjmehRJK7s9Qk64oIdw) ## 啟動 - 於`local install` - `airflow webserver -p 8080` - `airflow scheduler` - 使用`docker-compose` 安裝 - `cd docker-compose` - `docker compose up` - 使用 [GUI介面管理](http://localhost:8080/) Airflow ## 建立腳本Dags `DAG`-有向無環圖 `(Directed Acyclic Graph) ` 是類似氣流式的有序且相依的流程圖,並表示應該如何運行,故以此為Airflow 每項任務的流程名稱。 `DAG 腳本`主要以下列部分組成: - 設定預設參數 - 建立 `DAG` - 建立 `task` 並套用`Dag` - 建立 `task` 執行順序 範例: ``` from airflow import DAG from datetime import datetime, timedelta from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.sensors.external_task_sensor import ExternalTaskSensor from airflow.operators.python_operator import BranchPythonOperator, PythonOperator # 設定 Dags預設參數 default_args = { 'owner': 'rock', 'depends_on_past': False, 'start_date': datetime(2021,9,2), 'email': ['rock@example.com'], 'email_on_failure': True, # Error send email 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } # 建立dag 物件 GenPros_dag = DAG(dag_id = 'General_Process_Yearly', description = 'General Yearly Process', default_args = default_args, schedule_interval = '0 0 31 12 *', # 每月1日 00am tags = ['Year','Maintain','Stock']) # cmd 指令組建 sour_act = f'source {venv_path}/venv/bin/activate' gbe_cmd = f'cd {basic_path} && scrapy crawl GB_ECG_Spider' idc_cmd = f'cd {basic_path} && scrapy crawl TW_IDC_Spider' # 建立執行task start = DummyOperator(task_id='start', start_date=startDate, dag=GenPros_dag) end = DummyOperator(task_id='end', start_date=startDate, dag=GenPros_dag) gbe_crawl = BashOperator(task_id='GB_ECG_Spider', bash_command=gbe_cmd, dag=GenPros_dag) idc_crawl = BashOperator(task_id='TW_IDC_Spider', bash_command=idc_cmd, dag=GenPros_dag) # 建立task 有序執行 start >> [gbe_crawl, idc_crawl] >> end ``` ### 預設參數 - `parameter`: - **`owner`**:DAG 擁有者的名稱,如上一篇說明的,通常是負責實作這個 DAG 的人員名稱 - **`depends_on_past`**: 每一次執行的 Task 是否會依賴於上次執行的 Task,`False`,代表上次的 Task 如果執行失敗,這次的 Task 就不會繼續執行 - **`start_date`**: Task 從哪個日期後開始可以被 Scheduler 排入排程 - **`email`**: 如果 Task 執行失敗的話,要寄信給哪些人的 email - **`email_on_failure`**: 如果 Task 執行失敗的話,是否寄信 - **`email_on_retry`**: 如果 Task 重試的話,是否寄信 - **`retries`**: 最多重試的次數 - **`retry_delay`**: 每次重試中間的間隔 - **`end_date`**: Task 從哪個日期後,開始不被 Scheduler 放入排程 - **`execution_timeout`** : Task 執行時間的上限 - **`on_failure_callback`**: Task 執行失敗時,呼叫的 function - **`on_success_callback`**: Task 執行成功時,呼叫的 function - **`on_retry_callback`**: Task 重試時,呼叫的 function ``` default_args = { 'owner': 'rock', 'depends_on_past': False, 'start_date': datetime(2021, 09, 10), 'email': ['rock@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'end_date': datetime(2020, 2, 29), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, } ``` #### Dags object 創建 - 使用With ``` with DAG(dag_id = v['dag_id'], description = v['description'], default_args = default_args, schedule_interval = v['schedule_interval'], tags = v['tags'] ) as dag: <do some task & workflow> ``` - create variable ``` dag1 = DAG(dag_id = v['dag_id'], description = v['description'], default_args = default_args, schedule_interval = v['schedule_interval'], tags = v['tags'] ) ``` #### 建立 Task - 自動化 `Task` 主要分為以下操作(`Dags Operators`): - `BashOperator` - 執行bash 指令 - `PythonOperator` - 執行python function - `EmailOperator` - 寄Email - `SimpleHttpOperator` - 請求http request - `MySqlOperator`, `SqliteOperator`, `PostgresOperator`, `MsSqlOperator`, `OracleOperator`, `JdbcOperator`, etc. - 執行 sql 語法 - `Sensor` - 等待 certain time, file, database row, S3 key, etc…回覆 >更多請參考 [官方文件](https://airflow.apache.org/docs/apache-airflow/1.10.1/concepts.html#operators) 範例: ``` task1 = DummyOperator(task_id='skip', start_date=startDate, dag=dag1) task2 = BashOperator( task_id='test1', bash_command=f'echo process Start', dag=dag1) ``` #### 建立Task 執行順序 `>>` 代表有向順序執行 ``` op1 >> op2 ``` 使用`[]` 代表同層執行 ``` op1 >> [op2, op3] >> op4 ``` #### 建立DAG 實體參數`schedule_interval` | preset | meaning | cron | | ------ | -------------------------------------------------- | ---- | | `None` | 不排程時間,利用事件`externally triggered`觸發執行 | - | |`@once` | 排程一次,僅執行一次|-| |`@hourly` |每小時執行| `0 * * * *`| |`@daily` |每日 `00:00` 執行 |`0 0 * * *`| |`@weekly` |每週日 `00:00` 執行|`0 0 * * 0` |`@monthly` |每月1號 `00:00`執行|`0 0 1 * *`| |`@yearly`| 每年1/1號 `00:00`執行|`0 0 1 1 *`| >note: Use:`schedule_interval=None`,Not use`schedule_interval='None'` 參考文件: 更多關於 scheduler 請參考 [pypi doc](https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html) 快速建立cron 格式時間可使用 [cron monitor](https://crontab.guru/) 產生相關`cron` 時間語法。 ##### 父子Dag 相依性 當Dags 相互有執行關係時,dag2 要等待 dags1 完全執行完後才執行,使用`ExternalTaskSensor` `from airflow.operators.sensors import ExternalTaskSensor` ``` wait_for_dinner = ExternalTaskSensor( task_id='wait_for_dinner', external_dag_id='Parent_dag', external_task_id='cook_dinner', start_date=datetime(2020, 4, 29), execution_delta=timedelta(hours=1), timeout=3600, ) ``` 更多請參考 [trigger dag on sucess](https://stackoverflow.com/questions/61514887/how-to-trigger-a-dag-on-the-success-of-a-another-dag-in-airflow-using-python) #### `airflow.cfg` 設定 ##### Not showing example ``` load_examples = False ``` ##### setting alarm email ``` [smtp] smtp_host = smtp.gmail.com smtp_starttls = True smtp_ssl = False smtp_user = YOUR_EMAIL_ADDRESS smtp_password = 16_DIGIT_APP_PASSWORD smtp_port = 587 smtp_mail_from = YOUR_EMAIL_ADDRESS ``` > how to gmail application password ? 參考 附件1 `取得應用程式使用密碼` ##### setting timezone ``` [core] # Default timezone in case supplied date times are naive # can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam) default_timezone = Asia/Taipei [webserver] # Default timezone to display all dates in the UI, can be UTC, system, or # any IANA timezone string (e.g. Europe/Amsterdam). If left empty the # default value of core/default_timezone will be used # Example: default_ui_timezone = America/New_York default_ui_timezone = Asia/Taipei ``` > 另Dag 須將`default_args`內的`Start_Date`參數加上`tzinfo`,以表示顯示時區格式 範例: ``` import pendulum local_tz = pendulum.timezone("Asia/Taipei") # 設定 Dags預設參數 default_args = { ... 'start_date': datetime(2021, 9, 15,tzinfo=local_tz), ... } ``` ### Dag 執行延遲一日解決方式 情境: 如果今日是2022-01-11,使用shedule_interval 設定cron `0 0 * * * *` 預計每日00點執行腳本,預期執行日為 2022-01-11 00:00:00,因airflow 設計關係,實際執行為2022-01-12執行,可以理解為 運行11日腳本=11~12 日之間的工作故會等,11日實際過完才執行腳本。 [可參考說明](https://towardsdatascience.com/airflow-schedule-interval-101-bbdda31cc463) [可參考說明2](https://forum.astronomer.io/t/airflow-pro-tip-scheduler-will-run-your-job-one-schedule-interval-after-the-start-date/315) ## 參考資源 - [Airflow 官方文件](https://airflow.apache.org/docs/apache-airflow/stable/index.html) - [Airflow pypi 文件](https://airflow.apache.org/docs/apache-airflow/1.10.1/index.html) - [一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載](https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html) - [資料科學家 L 的奇幻旅程 Vol.1 新人不得不問的 2 個問題](https://leemeng.tw/journey-of-data-scientist-L-part-1-two-must-ask-questions-when-on-board.html#%E5%84%80%E8%A1%A8%E6%9D%BF%E4%B8%8A%E7%9A%84-KPI-%E6%98%AF%E6%80%8E%E9%BA%BC%E7%94%A2%E7%94%9F%E7%9A%84%EF%BC%9F) - [Airflow動手玩](https://zh-tw.coderbridge.com/series/c012cc1c8f9846359bb9b8940d4c10a8/posts/96bfcd7cfbc241b19f38248dac4b826e) ## 附件1 Google帳戶 `取得應用程式使用密碼` | 步驟 | 圖示 | | ---- | ------------------------ | | 右上角`應用程式icon`|<img src="https://i.imgur.com/Bfw2UUe.png" width="30"> | |google 帳戶設定<img src="https://i.imgur.com/3rsgMPR.png" width="50"> | <img src="https://i.imgur.com/79U8qun.png" width="200">| | `安全性設定`<br>>`登入google`|<img src="https://i.imgur.com/ukg9jNU.png" width="200"><img src="https://i.imgur.com/beG2qAP.png" width="300">| | 開啟完`兩步驟驗證`後<br>>`應用程式密碼` || | 建立應用程式密碼`選取應用程式`<br>>`其他(自訂名稱)` |<img src="https://i.imgur.com/MA6IR9U.png" width="300"><img src="https://i.imgur.com/DJwDnxx.png" width="300">|