owned this note
owned this note
Published
Linked with GitHub
## **【Apache Airflow 工作流】**
:::info
- 什麼是 Apache Airflow?
- Airflow One Node Architecture、Airflow Multi Node Architecture
- 安裝Airflow (ft.Docker)
- 什麼是DAG?
- 撰寫DAG
- 執行DAG
- DAG 設置文件輸入/輸出
- 設置時間
- 寄email通知
:::
### 什麼是 Airflow?
Airflow,可以做到任務的排程、執行和監控
- 定義的 DAG 調度和執行 ETL 任務
- 定義任務之間的依賴性,確保在執行任務時的順序和條件
- 監控任務的執行狀態、錯誤信息
- 整合各種數據庫、資料湖、API 和其他數據源,成為一個通用的工作流自動化平台
在 Apache Airflow 中,有幾個重要的元素一起協同工作,形成一個完整的工作流自動化系統 :
Web Server(提供用戶UI界面,能夠查看 DAG(有向無環圖)、觸發任務執行、查看運行記錄等)
-> Operator(具體的任務,代表了要執行的單元,可能是action operators、transfer data、wait for a condition...)
-> Scheduler
-> Metastore(通常指的是Airflow的元數據庫,SQLite、MySQL、PostgreSQL)
-> Triggerer(Celery 或其他分佈式任務佇列系統) (Kubernetes(簡稱 K8s)自動化部署、擴展和操作應用程序容器)
-> Executor (定義任務的執行方式)
<br/>
### 安裝Airflow
#### docker
先安裝docker
到官網下載 [docker installer](https://www.docker.com/products/docker-desktop/)
設一個新資料夾,在終端機cd過去,輸入指令
```=
cd /Users/catalinakuo/Downloads/macbook_gpu/airflow_docker
echo -e "AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1\nAIRFLOW_UID=50000" > .env
mkdir -p dags logs plugins config
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'
```
:+1: PS Vscode要先 (Cmd + Shift + P)
Shell Command: Install 'code' command in PATH

這時候進vscode查看

確認一下【.env】
```=
AIRFLOW_IMAGE_NAME=apache/airflow:2.8.1
AIRFLOW_UID=50000
```

初使化AIRFLOW+啟動
```=
docker compose up airflow-init
docker compose up -d
```

打開安裝好的應用程式


在chrome輸入localhost:8080,打開ui介面,帳號密碼都是airflow


:+1: PS 如果無法成功,可能port號被佔據,在終端機清除docker volume 並重新啟動
```=
docker volume prune -f
docker-compose down --volumes --remove-orphans
lsof -i :5432
sudo kill -9 <port_number>
docker-compose up -d
```

<br/>
點DAG進去後,上方可以查看
```=
- Grid (網格): 顯示各個任務的狀態和相關資訊
- Graph (圖表): 架構圖,顯示任務之間的依賴關係
- Calendar (行事曆): 以日曆形式顯示工作流程的執行計劃
- Task Duration (任務持續時間): 顯示每個任務執行所需的時間
- Task Tries (任務嘗試次數): 顯示任務執行失敗後的重試次數
- Landing Times (著陸時間): 顯示任務成功完成的時間
- Gantt (甘特圖): 顯示工作流程的執行時間表,會更直觀地了解任務之間的時間關係
- Details (詳細資訊): 查看工作流程執行的詳細資訊,日誌、輸入參數等
- Code (程式碼): 查看工作流程的程式碼
- Audit Log (審計日誌): 有關工作流程執行的審計日誌,可以用在追踪和分析系統的使用和行為
```

<br/>
### 什麼是 DAG (Directed Acyclic Graph 有向無環圖) ?
DAG 是一種數學結構,其中節點(任務Task)之間有方向性的連接,不存在循環
在 Airflow 中,指工作流程中的任務按照指定的順序執行,不會形成迴圈
任務 (Task): 每個任務代表工作流程中的一個單元,例如執行程式、複製數據、發送郵件...
:+1: PS 清理數據、執行程式,只要是不同操作,分不同的task,不要寫在一起! 會比較好管理
依賴關係 (Dependency): 一個任務可能依賴於其他一個或多個任務的完成
執行順序 (Execution Order): DAG 保證任務按照它們的依賴順序來執行,類似於if...,elif...,else、if not...
(網路圖)

<br/>
### 撰寫DAG
進到dags資料夾下
建立一個檔案

設置DAG
```=
# id 唯一性 # catchup 默認true,會“追趕”執行之前未執行的任務
with DAG('user_processing', start_date=datetime(2023/11/25),
schedule_interval='@daily', catchup=False) as dag:
```
設置TASK - 連接postgresSQL
```=
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
# 定義 DAG
with DAG(
dag_id='user_processing', # DAG 的唯一識別 ID
start_date=datetime(2023, 11, 25), # 正確格式:datetime(year, month, day)
schedule_interval='@daily', # 每天執行一次
catchup=False, # 不補跑過去的未執行任務
tags=['example'], # 可加可不加,用來分類
) as dag:
# 定義一個 PostgresOperator 來建立資料表
create_table = PostgresOperator(
task_id='create_user_table', # 任務 ID(每個 DAG 內唯一)
postgres_conn_id='postgres', # Airflow 中的 connection ID
sql="""
CREATE TABLE IF NOT EXISTS users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL
);
""",
)
insert_data = PostgresOperator(
task_id='insert_fake_user',
postgres_conn_id='postgres',
sql="""
INSERT INTO users (firstname, lastname, country, username, password, email)
VALUES ('Tony', 'Stark', 'USA', 'ironman', 'jarvis123', 'tony@stark.com');
"""
)
create_table >> insert_data
```
進入Admin connections 設一個連接

<br/>
### 執行DAG
可以終端機輸入確定一下狀態 啟動
```=
docker ps | grep scheduler
docker-compose ps
```

bash 互動模式
可以像在 Linux 終端機一樣操作該容器內部環境
```=
docker exec -it materials-airflow-scheduler-1 /bin/bash
```
會跳到airflow

可以看到有哪些主要指令可以使用
```=
airflow -h
```

可以確認DAG是否已被載入
```=
docker exec -it airflow_docker-airflow-scheduler-1 bash
#會進入一個容器內的 Linux 環境,像這樣
root@984cb96ecc62:/opt/airflow#
airflow dags list
```

UI介面也會出現

可以確認DAG中指令
```=
airflow tasks test user_processing create_user_table 2023-11-25
```

terminal進到docker postgresql,查看是否建立成功
```=
docker exec -it airflow_docker-postgres-1 bash
psql -U airflow -d airflow
SELECT * FROM users;
```

也可以下載 DBeaver (支援多種資料庫的圖形界面)查看
yaml檔要先新增port號才會連接成功
```=
ports:
- "5432:5432"
```
```=
# 停掉所有服務
docker-compose down
# 重新啟動並套用新設定
docker-compose up -d
```




<br/>
### DAG 設置文件輸入/輸出
yaml檔要新增/tmp:/tmp才會連接成功
```=
- /tmp:/tmp
```

輸入,每天自動更新文件
```=
# producer.py
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime
# 定義兩個 Dataset
my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")
# Producer DAG
with DAG(
dag_id="producer",
schedule="@daily",
start_date=datetime(2023, 11, 26),
catchup=False,
):
# 輸出 my_file.txt
@task(outlets=[my_file])
def update_dataset():
with open(my_file.uri, "a+") as f:
f.write("my_file updated\n")
# 輸出 her_file.txt
@task(outlets=[her_file])
def update_dataset_2():
with open(her_file.uri, "a+") as f:
f.write("her_file updated\n")
update_dataset() >> update_dataset_2()
```
輸出,偵測到文件更新,自動觸發DAG,去讀取Dataset內容
```=
# consumer.py
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime
my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")
with DAG(
dag_id="consumer",
schedule=[my_file, her_file], # 多 Dataset trigger 條件
start_date=datetime(2023, 11, 26),
catchup=False
):
@task
def read_dataset():
print("=== Reading my_file.txt ===")
with open(my_file.uri, "r") as f:
print(f.read())
print("=== Reading her_file.txt ===")
with open(her_file.uri, "r") as f:
print(f.read())
read_dataset()
```
```=
# 停掉所有服務
docker-compose down
# 重新啟動並套用新設定
docker-compose up -d
```
如果還沒觸發,點進ui 介面的 producer,右上角trigger即可
從ui介面就會看到關係圖了

多檔案
當任一 Dataset 被更新,就會觸發 DAG
輸入
```=
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime
my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")
with DAG(
dag_id="producer",
schedule="@daily",
start_date=datetime(2023, 11 ,26),
catchup=False
):
@task(outlets=[my_file])
def update_dataset():
with open(my_file.uri, "a+") as f:
f.write("producer update")
@task(outlets=[her_file])
def update_dataset_2():
with open(her_file.uri, "a+") as f:
f.write("producer update")
update_dataset() >> update_dataset_2()
```
輸出
```=
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime
my_file = Dataset("/tmp/my_file.txt")
her_file = Dataset("/tmp/her_file.txt")
with DAG(
dag_id="consumer",
schedule=[my_file, her_file],
start_date=datetime(2023, 11, 26),
catchup=False
):
@task
def read_dataset():
with open(my_file.uri, "r") as f:
print(f.read())
read_dataset()
```
```=
# 停掉所有服務
docker-compose down
# 重新啟動並套用新設定
docker-compose up -d
```
如果還沒觸發,點進ui 介面的 producer,右上角trigger即可

從ui介面就會看到關係圖了

>PS 如範例,只要 my_file 成功執行,會直接觸發 consumer.py,her_file.txt 不管有沒有成功都不會影響
<br/>
### DAG 設置時間
前面設置過的時間start_date=datetime(2023/11/25)
schedule_interval='@daily'
參考 [官網 Cron Presets](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html)
參考 [Apache Airflow(四) start_date & schedule_interval](https://medium.com/sq-catch-and-note/apache-airflow-%E5%9B%9B-start-date-schedule-interval-3ce4a47656ff)
- start_date
- schedule_interval
- end_date

<br/>
### 寄email通知
設一個測試的dag
```=
# test_email.py
from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime
from pendulum import timezone
with DAG(
dag_id="test_email",
start_date=datetime(2023, 11, 26, tzinfo=timezone("Asia/Taipei")),
schedule="0 21 * * *", # 每天晚上 9 點
catchup=False
) as dag:
send_email = EmailOperator(
task_id="test_email_task",
to="我的email@gmail.com",
subject="Airflow Email Test",
html_content="<p>This is a test from Airflow SMTP</p>"
trigger_rule=TriggerRule.ALL_DONE # 不管 read_dataset 成功/失敗 都會跑
)
```

yaml檔要加入email設定
```=
# airflow-common-env 下加入
# ✉️ Email SMTP settings
AIRFLOW__EMAIL__EMAIL_BACKEND: airflow.utils.email.send_email_smtp
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_STARTTLS: True
AIRFLOW__SMTP__SMTP_SSL: False
AIRFLOW__SMTP__SMTP_USER: 我的email@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: 申請到的應用程式密碼
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: 我的email@gmail.com
```

PS 申請應用程式密碼,以gmail為例
進到 [google後台](https://myaccount.google.com/security?rapt=AEjHL4PDIuW09gQJOhsAoZJxlGGxrAy6GPVtgsG7U_Rh81dUrS8sS7G2H4rILYKztt9axBqVRIE1k2Ujevj7Z0ilj5_PCpGUD5CqvXlfF3VAhEWw5s8v4zI),安全性認證 -> 兩步驟驗證 (gmail要有開才能申請)

滑到頁面最下面的 應用程式密碼 -> 輸入自己的gmail -> 會出現一組16位數的密碼 xxxx xxxx xxxx xxxx -> 貼到yaml檔即可

```=
# 停掉所有服務
docker-compose down
# 重新啟動並套用新設定
docker-compose up -d
```
設置成功就會收到email了
