## **【Apache Airflow 工作流】** :::info - 什麼是 Apache Airflow? - Airflow One Node Architecture、Airflow Multi Node Architecture - 安裝Airflow (ft.Docker) - 什麼是DAG? - 撰寫DAG - 執行DAG - DAG 設置文件輸入/輸出 - 設置時間 - 寄email通知 ::: ### 什麼是 Airflow? Airflow,可以做到任務的排程、執行和監控 - 定義的 DAG 調度和執行 ETL 任務 - 定義任務之間的依賴性,確保在執行任務時的順序和條件 - 監控任務的執行狀態、錯誤信息 - 整合各種數據庫、資料湖、API 和其他數據源,成為一個通用的工作流自動化平台 在 Apache Airflow 中,有幾個重要的元素一起協同工作,形成一個完整的工作流自動化系統 : Web Server(提供用戶UI界面,能夠查看 DAG(有向無環圖)、觸發任務執行、查看運行記錄等) -> Operator(具體的任務,代表了要執行的單元,可能是action operators、transfer data、wait for a condition...) -> Scheduler -> Metastore(通常指的是Airflow的元數據庫,SQLite、MySQL、PostgreSQL) -> Triggerer(Celery 或其他分佈式任務佇列系統) (Kubernetes(簡稱 K8s)自動化部署、擴展和操作應用程序容器) -> Executor (定義任務的執行方式) <br/> ### 安裝Airflow #### docker 先安裝docker 到官網下載 [docker installer](https://www.docker.com/products/docker-desktop/) 設一個新資料夾,在終端機cd過去,輸入指令 ```= cd /Users/catalinakuo/Downloads/macbook_gpu/airflow_docker echo -e "AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1\nAIRFLOW_UID=50000" > .env mkdir -p dags logs plugins config curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml' ``` :+1: PS Vscode要先 (Cmd + Shift + P) Shell Command: Install 'code' command in PATH ![截圖 2025-05-25 11.16.55](https://hackmd.io/_uploads/HJ_nsbgzgx.png) 這時候進vscode查看 ![截圖 2025-05-25 11.17.35](https://hackmd.io/_uploads/SkR1n-lfgx.png) 確認一下【.env】 ```= AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1 AIRFLOW_UID=50000 ``` ![截圖 2025-05-25 09.57.50](https://hackmd.io/_uploads/BJU4teezxx.png) 初使化AIRFLOW+啟動 ```= docker compose up airflow-init docker compose up -d ``` ![截圖 2025-05-25 11.19.35](https://hackmd.io/_uploads/H1PLn-gGee.png) 打開安裝好的應用程式 ![截圖 2025-05-25 12.45.06](https://hackmd.io/_uploads/HywDemeMee.png) ![螢幕擷取畫面 2023-11-25 234904](https://hackmd.io/_uploads/rkYKD5ySa.png) 在chrome輸入localhost:8080,打開ui介面,帳號密碼都是airflow ![截圖 2025-05-25 11.44.32](https://hackmd.io/_uploads/rJ4VGzefgl.png) ![螢幕擷取畫面 2023-11-25 003927](https://hackmd.io/_uploads/S12R-IA46.png) :+1: PS 如果無法成功,可能port號被佔據,在終端機清除docker volume 並重新啟動 ```= docker volume prune -f docker-compose down --volumes --remove-orphans lsof -i :5432 sudo kill -9 <port_number> docker-compose up -d ``` ![截圖 2025-05-25 11.43.55](https://hackmd.io/_uploads/r1XMMGlMxg.png) <br/> 點DAG進去後,上方可以查看 ```= - Grid (網格): 顯示各個任務的狀態和相關資訊 - Graph (圖表): 架構圖,顯示任務之間的依賴關係 - Calendar (行事曆): 以日曆形式顯示工作流程的執行計劃 - Task Duration (任務持續時間): 顯示每個任務執行所需的時間 - Task Tries (任務嘗試次數): 顯示任務執行失敗後的重試次數 - Landing Times (著陸時間): 顯示任務成功完成的時間 - Gantt (甘特圖): 顯示工作流程的執行時間表,會更直觀地了解任務之間的時間關係 - Details (詳細資訊): 查看工作流程執行的詳細資訊,日誌、輸入參數等 - Code (程式碼): 查看工作流程的程式碼 - Audit Log (審計日誌): 有關工作流程執行的審計日誌,可以用在追踪和分析系統的使用和行為 ``` ![螢幕擷取畫面 2023-11-25 121153](https://hackmd.io/_uploads/HJ9DNlJSp.png) <br/> ### 什麼是 DAG (Directed Acyclic Graph 有向無環圖) ? DAG 是一種數學結構,其中節點(任務Task)之間有方向性的連接,不存在循環 在 Airflow 中,指工作流程中的任務按照指定的順序執行,不會形成迴圈 任務 (Task): 每個任務代表工作流程中的一個單元,例如執行程式、複製數據、發送郵件... :+1: PS 清理數據、執行程式,只要是不同操作,分不同的task,不要寫在一起! 會比較好管理 依賴關係 (Dependency): 一個任務可能依賴於其他一個或多個任務的完成 執行順序 (Execution Order): DAG 保證任務按照它們的依賴順序來執行,類似於if...,elif...,else、if not... (網路圖) ![R](https://hackmd.io/_uploads/BJzSqeJHa.png) <br/> ### 撰寫DAG 進到dags資料夾下 建立一個檔案 ![截圖 2025-05-25 11.51.02](https://hackmd.io/_uploads/rJv3XMefxx.png) 設置DAG ```= # id 唯一性 # catchup 默認true,會“追趕”執行之前未執行的任務 with DAG('user_processing', start_date=datetime(2023/11/25), schedule_interval='@daily', catchup=False) as dag: ``` 設置TASK - 連接postgresSQL ```= from airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator from datetime import datetime # 定義 DAG with DAG( dag_id='user_processing', # DAG 的唯一識別 ID start_date=datetime(2023, 11, 25), # 正確格式:datetime(year, month, day) schedule_interval='@daily', # 每天執行一次 catchup=False, # 不補跑過去的未執行任務 tags=['example'], # 可加可不加,用來分類 ) as dag: # 定義一個 PostgresOperator 來建立資料表 create_table = PostgresOperator( task_id='create_user_table', # 任務 ID(每個 DAG 內唯一) postgres_conn_id='postgres', # Airflow 中的 connection ID sql=""" CREATE TABLE IF NOT EXISTS users ( firstname TEXT NOT NULL, lastname TEXT NOT NULL, country TEXT NOT NULL, username TEXT NOT NULL, password TEXT NOT NULL, email TEXT NOT NULL ); """, ) insert_data = PostgresOperator( task_id='insert_fake_user', postgres_conn_id='postgres', sql=""" INSERT INTO users (firstname, lastname, country, username, password, email) VALUES ('Tony', 'Stark', 'USA', 'ironman', 'jarvis123', 'tony@stark.com'); """ ) create_table >> insert_data ``` 進入Admin connections 設一個連接 ![截圖 2025-05-25 11.58.45](https://hackmd.io/_uploads/rJ8m8fxGgx.png) <br/> ### 執行DAG 可以終端機輸入確定一下狀態 啟動 ```= docker ps | grep scheduler docker-compose ps ``` ![截圖 2025-05-25 12.03.33](https://hackmd.io/_uploads/BJBj8Gezlg.png) bash 互動模式 可以像在 Linux 終端機一樣操作該容器內部環境 ```= docker exec -it materials-airflow-scheduler-1 /bin/bash ``` 會跳到airflow ![螢幕擷取畫面 2023-11-25 134839](https://hackmd.io/_uploads/SyiT9bkHp.png) 可以看到有哪些主要指令可以使用 ```= airflow -h ``` ![截圖 2025-05-25 12.05.34](https://hackmd.io/_uploads/S10Gwzxzex.png) 可以確認DAG是否已被載入 ```= docker exec -it airflow_docker-airflow-scheduler-1 bash #會進入一個容器內的 Linux 環境,像這樣 root@984cb96ecc62:/opt/airflow# airflow dags list ``` ![截圖 2025-05-25 12.10.40](https://hackmd.io/_uploads/Hy-I_fxMex.png) UI介面也會出現 ![截圖 2025-05-25 12.11.05](https://hackmd.io/_uploads/BkSdOGlfle.png) 可以確認DAG中指令 ```= airflow tasks test user_processing create_user_table 2023-11-25 ``` ![截圖 2025-05-25 12.13.28](https://hackmd.io/_uploads/SkagYMxMgx.png) terminal進到docker postgresql,查看是否建立成功 ```= docker exec -it airflow_docker-postgres-1 bash psql -U airflow -d airflow SELECT * FROM users; ``` ![截圖 2025-05-25 12.24.34](https://hackmd.io/_uploads/SyLcsMxzge.png) 也可以下載 DBeaver (支援多種資料庫的圖形界面)查看 yaml檔要先新增port號才會連接成功 ```= ports: - "5432:5432" ``` ```= # 停掉所有服務 docker-compose down # 重新啟動並套用新設定 docker-compose up -d ``` ![截圖 2025-05-25 12.30.28](https://hackmd.io/_uploads/r1GVXmxMeg.png) ![截圖 2025-05-25 12.42.04](https://hackmd.io/_uploads/ry6h1Qxzxl.png) ![截圖 2025-05-25 12.42.49](https://hackmd.io/_uploads/HyFC1QxGel.png) ![截圖 2025-05-25 12.43.10](https://hackmd.io/_uploads/BJZelXxzgl.png) <br/> ### DAG 設置文件輸入/輸出 yaml檔要新增/tmp:/tmp才會連接成功 ```= - /tmp:/tmp ``` ![截圖 2025-05-25 15.25.37](https://hackmd.io/_uploads/SyHbIBxMlg.png) 輸入,每天自動更新文件 ```= # producer.py from airflow import DAG, Dataset from airflow.decorators import task from datetime import datetime # 定義兩個 Dataset my_file = Dataset("/tmp/my_file.txt") her_file = Dataset("/tmp/her_file.txt") # Producer DAG with DAG( dag_id="producer", schedule="@daily", start_date=datetime(2023, 11, 26), catchup=False, ): # 輸出 my_file.txt @task(outlets=[my_file]) def update_dataset(): with open(my_file.uri, "a+") as f: f.write("my_file updated\n") # 輸出 her_file.txt @task(outlets=[her_file]) def update_dataset_2(): with open(her_file.uri, "a+") as f: f.write("her_file updated\n") update_dataset() >> update_dataset_2() ``` 輸出,偵測到文件更新,自動觸發DAG,去讀取Dataset內容 ```= # consumer.py from airflow import DAG, Dataset from airflow.decorators import task from datetime import datetime my_file = Dataset("/tmp/my_file.txt") her_file = Dataset("/tmp/her_file.txt") with DAG( dag_id="consumer", schedule=[my_file, her_file], # 多 Dataset trigger 條件 start_date=datetime(2023, 11, 26), catchup=False ): @task def read_dataset(): print("=== Reading my_file.txt ===") with open(my_file.uri, "r") as f: print(f.read()) print("=== Reading her_file.txt ===") with open(her_file.uri, "r") as f: print(f.read()) read_dataset() ``` ```= # 停掉所有服務 docker-compose down # 重新啟動並套用新設定 docker-compose up -d ``` 如果還沒觸發,點進ui 介面的 producer,右上角trigger即可 從ui介面就會看到關係圖了 ![截圖 2025-05-25 15.52.47](https://hackmd.io/_uploads/SkBDnrxGgx.png) 多檔案 當任一 Dataset 被更新,就會觸發 DAG 輸入 ```= from airflow import DAG, Dataset from airflow.decorators import task from datetime import datetime my_file = Dataset("/tmp/my_file.txt") her_file = Dataset("/tmp/her_file.txt") with DAG( dag_id="producer", schedule="@daily", start_date=datetime(2023, 11 ,26), catchup=False ): @task(outlets=[my_file]) def update_dataset(): with open(my_file.uri, "a+") as f: f.write("producer update") @task(outlets=[her_file]) def update_dataset_2(): with open(her_file.uri, "a+") as f: f.write("producer update") update_dataset() >> update_dataset_2() ``` 輸出 ```= from airflow import DAG, Dataset from airflow.decorators import task from datetime import datetime my_file = Dataset("/tmp/my_file.txt") her_file = Dataset("/tmp/her_file.txt") with DAG( dag_id="consumer", schedule=[my_file, her_file], start_date=datetime(2023, 11, 26), catchup=False ): @task def read_dataset(): with open(my_file.uri, "r") as f: print(f.read()) read_dataset() ``` ```= # 停掉所有服務 docker-compose down # 重新啟動並套用新設定 docker-compose up -d ``` 如果還沒觸發,點進ui 介面的 producer,右上角trigger即可 ![截圖 2025-05-25 17.59.46](https://hackmd.io/_uploads/B1SD9DeMee.png) 從ui介面就會看到關係圖了 ![截圖 2025-05-25 17.52.47](https://hackmd.io/_uploads/ByVYODgGxl.png) >PS 如範例,只要 my_file 成功執行,會直接觸發 consumer.py,her_file.txt 不管有沒有成功都不會影響 <br/> ### DAG 設置時間 前面設置過的時間start_date=datetime(2023/11/25) schedule_interval='@daily' 參考 [官網 Cron Presets](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html) 參考 [Apache Airflow(四) start_date & schedule_interval](https://medium.com/sq-catch-and-note/apache-airflow-%E5%9B%9B-start-date-schedule-interval-3ce4a47656ff) - start_date - schedule_interval - end_date ![螢幕擷取畫面 2023-11-26 002318](https://hackmd.io/_uploads/S1lc1j1Sp.png) <br/> ### 寄email通知 設一個測試的dag ```= # test_email.py from airflow import DAG from airflow.operators.email import EmailOperator from datetime import datetime from pendulum import timezone with DAG( dag_id="test_email", start_date=datetime(2023, 11, 26, tzinfo=timezone("Asia/Taipei")), schedule="0 21 * * *", # 每天晚上 9 點 catchup=False ) as dag: send_email = EmailOperator( task_id="test_email_task", to="我的email@gmail.com", subject="Airflow Email Test", html_content="<p>This is a test from Airflow SMTP</p>" trigger_rule=TriggerRule.ALL_DONE # 不管 read_dataset 成功/失敗 都會跑 ) ``` ![截圖 2025-05-25 18.43.21](https://hackmd.io/_uploads/SkfoVOlfxe.png) yaml檔要加入email設定 ```= # airflow-common-env 下加入 # ✉️ Email SMTP settings AIRFLOW__EMAIL__EMAIL_BACKEND: airflow.utils.email.send_email_smtp AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com AIRFLOW__SMTP__SMTP_STARTTLS: True AIRFLOW__SMTP__SMTP_SSL: False AIRFLOW__SMTP__SMTP_USER: 我的email@gmail.com AIRFLOW__SMTP__SMTP_PASSWORD: 申請到的應用程式密碼 AIRFLOW__SMTP__SMTP_PORT: 587 AIRFLOW__SMTP__SMTP_MAIL_FROM: 我的email@gmail.com ``` ![1748170857880](https://hackmd.io/_uploads/S1KNtOeMxl.jpg) PS 申請應用程式密碼,以gmail為例 進到 [google後台](https://myaccount.google.com/security?rapt=AEjHL4PDIuW09gQJOhsAoZJxlGGxrAy6GPVtgsG7U_Rh81dUrS8sS7G2H4rILYKztt9axBqVRIE1k2Ujevj7Z0ilj5_PCpGUD5CqvXlfF3VAhEWw5s8v4zI),安全性認證 -> 兩步驟驗證 (gmail要有開才能申請) ![截圖 2025-05-25 19.10.02](https://hackmd.io/_uploads/BJqvo_xfxx.png) 滑到頁面最下面的 應用程式密碼 -> 輸入自己的gmail -> 會出現一組16位數的密碼 xxxx xxxx xxxx xxxx -> 貼到yaml檔即可 ![1748171523793](https://hackmd.io/_uploads/SkwujOxMeg.jpg) ```= # 停掉所有服務 docker-compose down # 重新啟動並套用新設定 docker-compose up -d ``` 設置成功就會收到email了 ![S__73900034](https://hackmd.io/_uploads/Skn6iOlzxe.jpg)