# **【Kafka × Airflow 3:資料流處理與自動化】** :::info - 雲端 MongoDB Atlas 安裝 - Airflow 3 + Dags 設置 (Docker) - Kafka 資料串流處理實作 - 為什麼要使用 Kafka? - 測試 - 實作:log 自動更新入 PostgreSQL ::: 其他資料流相關筆記: [【Apache Airflow 工作流】](https://hackmd.io/@workcata/rkPNA2TE6) [【Spark + Iceberg:本地端 Memory Tuning、Join 與資料傾斜處理】ft. DataExpert 課程實作](https://hackmd.io/@workcata/S1StkgX9el) [【PySpark 巨量資料處理:Azure Databricks 設置】](https://hackmd.io/@workcata/HythIKGu6) [【從 SQL 仔到資料工程師:打造第一個 ETL/ELT 工作流】](https://hackmd.io/@workcata/ryLXecrYgl) <br/> 這篇記錄練習資料流與自動化的小專案:透過 Airflow 3 做排程 → 測試把資料存進 PostgreSQL / SQLite / MongoDB → 再嘗試 Kafka 串流處理 在 Airflow 架構裡,本體必須依賴一個 Metadata Database 來存放 DAG、Task、Log 等狀態 官方支援的後端資料庫只有:SQLite(僅限開發測試)、PostgreSQL、MySQL/MariaDB MongoDB 不行,它不是 RDBMS、沒有 SQL / Transaction / Schema 特性,Airflow Metadata DB 需要用 SQLAlchemy ORM 產生 schema(create tables, migrations…),但 MongoDB 沒有相容的 dialect 我想實作看看,所以用 Docker 架了 Airflow(搭配 PostgreSQL、SQLite 當後端),讓 DAG 可以正常寫入。MongoDB 的部分則改用雲端的 MongoDB Atlas,把它當成 DAG 任務中的資料來源與目標 <br/> ## 雲端 MongoDB Atlas 安裝 PS Window 安裝看這篇 [【MongoDB 是什麼?Mongodb 優勢、安裝/指令】](https://hackmd.io/@workcata/SksSRXEmn) 進到 [MongoDB Community Server](https://www.mongodb.com/try/download/community),選自己的版本,用 Terminal 載 ```= brew install mongodb-altas altas setup ``` ![截圖 2025-09-02 17.04.16](https://hackmd.io/_uploads/S17m7E49xx.png) 會自動跳到網頁,輸入 Terminal 出現的驗證碼 ![1756803981218](https://hackmd.io/_uploads/HkHCXE4cll.jpg) ![截圖 2025-09-02 17.06.48](https://hackmd.io/_uploads/Bk26Q4Nqlg.png) 回到 Terminal,啟動 MongoDB ```= mongosh ``` ![截圖 2025-09-02 17.10.40](https://hackmd.io/_uploads/HkZsVVN5ll.png) 這裡我遇到一個問題,因為是免費版,帳號的叢集額度不夠,如果以前用過,要去把舊的叢集刪除 進到 [MongoDB Atlas](https://cloud.mongodb.com/v2/644282f992773f5088469ecc#/clusters) ![截圖 2025-09-02 17.18.14](https://hackmd.io/_uploads/rJJYLE4cxg.png) 刪除後 Create 一個新的 ![截圖 2025-09-02 17.30.43](https://hackmd.io/_uploads/SJE8YE4qlg.png) 預設不是 Free,需要手動改 ![截圖 2025-09-02 17.20.53](https://hackmd.io/_uploads/HyPIwNV5xl.png) ![截圖 2025-09-02 17.21.37](https://hackmd.io/_uploads/BknID44clx.png) 點選上方 Connect -> Shell ![截圖 2025-09-02 17.24.42](https://hackmd.io/_uploads/rJlF_4Vqlg.png) 設置 username + password ![截圖 2025-09-02 17.31.20](https://hackmd.io/_uploads/B1ZCKN49xg.png) ![截圖 2025-09-02 17.24.52](https://hackmd.io/_uploads/SJ2FuEV9lx.png) 照著指令在 Terminal 操作 ```= brew install mongosh mongosh "mongodb+srv://cluster0.szfovfp.mongodb.net/" --apiVersion 1 --username <db_username> ``` 它會要求輸入密碼,剛才在 Atlas 建 Database User 設定的(不是 Atlas 登入密碼) ![截圖 2025-09-04 13.43.55](https://hackmd.io/_uploads/BJcmwj8cgl.png) 查看資料庫 ```= show dbs ``` 建立自己的資料庫 & collection ```= use mydb db.mycollection.insertOne({name: "hello", value: 123}) ``` 查詢剛剛放進 mycollection 的資料 ``` db.mycollection.find() ``` 看有哪些 collections ```= show collections ``` ![截圖 2025-09-02 17.37.43](https://hackmd.io/_uploads/rJ7ziVV9ee.png) ![截圖 2025-09-04 14.42.50](https://hackmd.io/_uploads/HyceB3L5ex.png) 開啟 Vscode 測試,是否有連上 MongoDB Atlas ```= !pip install "pymongo[srv]" !pip install dnspython from pymongo import MongoClient import pymongo import pandas as pd ``` ![截圖 2025-09-02 18.14.36](https://hackmd.io/_uploads/ByNsQrN9el.png) 測試連線 ```= url = "mongodb+srv://<username>:<password>@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority" client = MongoClient(url, serverSelectionTimeoutMS=10000) # 測試連線 print(client.admin.command("ping")) # 成功會回 {'ok': 1} ``` ![截圖 2025-09-02 17.55.38](https://hackmd.io/_uploads/r1nX1SNqxg.png) 連到前面在 Terminal 創建的資料庫,插入一筆新資料 ```= db = client["mydb"] collection = db["mycollection"] # 插入資料 collection.insert_one({"name": "Focus", "skill": "machine learning"}) # 查詢 All for doc in collection.find(): print(doc) ``` ![截圖 2025-09-02 17.56.47](https://hackmd.io/_uploads/ByV_1r4qlx.png) 確定可以寫入資料後,接著架設 Airflow 3 和 Dags 任務 <br/> ## Airflow 3 + Dags 設置 (Docker) 這裡透過 Docker Airflow 3 來安裝 Kafka、PostgreSQL、SQLite Airflow 3 與舊版在架構上有一些差異: - Airflow 2.x Scheduler 是單一進程,直接負責排程與下發任務 - Airflow 3.x Scheduler 內部新增了一個 Internal API(預設監聽在 8793),由 Gunicorn 啟動一個輕量 Web 服務 這個 Internal API 的作用,是讓其他元件可以透過 HTTP 與 Scheduler 溝通,例如: - LocalExecutor / KubernetesExecutor 的 worker → 回報任務狀態、接收調度 - DAG Processor → 上報 DAG 解析結果 - Triggerer → 傳遞 deferrable 任務事件 <br/> 開一個 Vscode 資料夾,我取名為 airflow_dbs 資料結構會呈現 ```= airflow_dbs/ ← 專案根目錄 ├── dags/ ← 放所有 Airflow DAG 的資料夾 │ ├── sqlite_daily_dump.py ← DAG:每天寫入 SQLite │ ├── postgresql_daily_dump.py ← DAG:每天寫入 PostgreSQL │ └── mongodb_daily_dump.py ← DAG:每天寫入 MongoDB (用 MongoHook 連線 Atlas) ├── .env ├── docker-compose.yml ├── logs ├── plugins ``` PS 拿 gmail app 密碼 因為 Google 已經關閉「允許低安全性應用程式」,不能再用 Gmail 的真實登入密碼寄信,因此需要在 Google 帳號裡開啟 應用程式密碼 開啟 [Google 帳號安全性設定](https://myaccount.google.com/security?utm_source=chatgpt.com) 安全性 -> 開啟兩步驟驗證 ![截圖 2025-09-04 18.09.44](https://hackmd.io/_uploads/HkPuSJw5gg.png) ![截圖 2025-09-04 18.13.39](https://hackmd.io/_uploads/S1md8kwclg.png) 回到上一頁,上方搜詢 應用程式密碼 ![截圖 2025-09-04 18.17.26](https://hackmd.io/_uploads/S1vUv1w9ge.png) ![截圖 2025-09-04 18.18.13](https://hackmd.io/_uploads/Sy9dDywclg.png) 會出現一組16位數的密碼 xxxx xxxx xxxx xxxx,複製下來,中間空格刪掉,直接貼到 .env ![1756981126994](https://hackmd.io/_uploads/ByXCPJPqxl.jpg) 申請 Airflow 的加密金鑰(Fernet key) 需要注意,每次重建容器時都換 key ```= python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" ``` .env gmail、應用程式密碼、加密金鑰 改成自己的 ```= # Docker 裡,程式預設用 root 身份,主機看到都是 root 權限,會遇到 Permission denied,自己帳號打不開、不能刪 # macOS 上帳號 501 / 0 是 root 群組 id,設定後,Airflow 會假裝用本機帳號在寫檔,主機就能正常讀寫 AIRFLOW_UID=501 AIRFLOW_GID=0 # SQLAlchemy DSN,Airflow 會使用到,自動註冊一個 conn_id=pg_target 的連線 PG_TARGET_URI=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow MONGO_ATLAS_URI=mongodb+srv://://<username>:<password>@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority # SMTP 連線(供 EmailOperator 使用) # 使用 Gmail App Password,連線到 465 並啟用 SSL,關閉 STARTTLS AIRFLOW_CONN_SMTP_DEFAULT=smtp+ssl://<gmail>%40gmail.com:<應用程式密碼>@smtp.gmail.com:465?starttls=false AIRFLOW__CORE__FERNET_KEY=<加密金鑰> AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com AIRFLOW__SMTP__SMTP_PORT=465 AIRFLOW__SMTP__SMTP_USER=://<gmail> AIRFLOW__SMTP__SMTP_PASSWORD=:<應用程式密碼> AIRFLOW__SMTP__SMTP_MAIL_FROM=<gmail> AIRFLOW__SMTP__SMTP_STARTTLS=False AIRFLOW__SMTP__SMTP_SSL=True ``` docker-compose.yml ```= services: # Kafka (KRaft 單節點) kafka: image: bitnami/kafka:3.7 container_name: kafka ports: ["29092:29092"] environment: - KAFKA_ENABLE_KRAFT=yes - KAFKA_CFG_NODE_ID=1 - KAFKA_CFG_PROCESS_ROLES=controller,broker # 宣告連線位址 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:29092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrst # Kafka 資料持久化 volumes: - kafka_data:/bitnami/kafka # 掛了自動重啟 restart: unless-stopped # Postgres(Airflow 後端) postgres: image: postgres:16-alpine container_name: airflow-postgres environment: - POSTGRES_USER=airflow - POSTGRES_PASSWORD=airflow - POSTGRES_DB=airflow # 我指定 5433,正常沒裝過用 5432 即可 ports: ["5433:5432"] # 資料持久化 volumes: - pg_data:/var/lib/postgresql/data # DB ready 後,才讓依賴服務啟動 healthcheck: test: ["CMD-SHELL","pg_isready -U airflow -d airflow || exit 1"] interval: 5s timeout: 3s retries: 30 restart: unless-stopped # Airflow DB migrate(一次性) # 把 Airflow 的 Metadata DB(存在 Postgres)升/建立 schema airflow-init: image: apache/airflow:3.0.6-python3.12 container_name: airflow-init depends_on: postgres: condition: service_healthy env_file: .env environment: - AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__CORE__LOAD_EXAMPLES=False # 指定上方的 postgresql - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow - AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Taipei - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True # .env 建立 connection id=pg_target - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI} - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI} # SQLite DAG 用來輸出的掛載路徑 - SQLITE_EXPORT_DIR=/opt/airflow/sqlite - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python # SMTP config is provided via .env - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY} - POSTGRES_CONN_ID=pg_target - LOGS_DIR=/opt/airflow/logs_test user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" # 本機的資料夾 掛載到容器裡 volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./sqlite:/opt/airflow/sqlite - ./logs_test:/opt/airflow/logs_test # 執行 Airflow 的資料庫 Migration,連到 postgres command: ["bash","-lc","airflow db migrate && airflow users create --role Admin --username admin --password admin --firstname Admin --lastname User --email admin@example.com || true"] restart: "no" # Airflow Webserver and Scheduler # Airflow 2.x Webserver(UI)跟 Scheduler(排程引擎)是 獨立的兩個服務 # Airflow 3 process 可以同時跑 scheduler + api-server airflow-webserver-scheduler: image: apache/airflow:3.0.6-python3.12 container_name: airflow-webserver-scheduler # Postgres 資料庫變成「健康」才會啟動 depends_on: postgres: condition: service_healthy airflow-init: condition: service_completed_successfully # 把 .env 檔裡的變數讀進來 env_file: .env environment: # 多進程 - AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__CORE__LOAD_EXAMPLES=False - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow - AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE=Asia/Taipei # 所有使用者都是 admin - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True # 開啟 Internal API,scheduler 內部靠 gunicorn web 服務互相通訊 - AIRFLOW__CORE__INTERNAL_API_ENABLED=True # 讓 Internal API 綁在 8793 port,支援 IPv4/IPv6 - GUNICORN_CMD_ARGS=--bind 0.0.0.0:8793 --bind [::]:8793 - AIRFLOW__API__HOST=0.0.0.0 - AIRFLOW__API__PORT=8793 - AIRFLOW__API__WORKERS=2 - AIRFLOW__API__WORKER_TIMEOUT=60 - AIRFLOW__CORE__PARALLELISM=1 # Scheduler 每次查詢 DB 時最多拿 1 個 TaskInstance - AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY=1 - AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120 - AIRFLOW__DAG_PROCESSOR__DAG_FILE_PROCESSOR_TIMEOUT=120 - AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT=600 - SQLITE_EXPORT_DIR=/opt/airflow/sqlite - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI} - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI} # SMTP config is provided via .env - POSTGRES_CONN_ID=pg_target - LOGS_DIR=/opt/airflow/logs_test user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" ports: ["8081:8080"] volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./sqlite:/opt/airflow/sqlite - ./logs_test:/opt/airflow/logs_test # 啟動後要跑的指令(Airflow 3 使用 api-server) command: ["bash","-lc","airflow scheduler & airflow api-server --port 8080 --host 0.0.0.0"] # Airflow 3 API server exposes /live and /ready endpoints healthcheck: test: ["CMD", "python", "-c", "import urllib.request,sys;sys.exit(0 if urllib.request.urlopen('http://localhost:8080/live').status==200 else 1)"] interval: 10s timeout: 5s retries: 10 restart: unless-stopped # Airflow DAG Processor(解析 DAG → serialized_dag) airflow-dag-processor: image: apache/airflow:3.0.6-python3.12 container_name: airflow-dag-processor depends_on: postgres: condition: service_healthy airflow-init: condition: service_completed_successfully env_file: .env environment: - AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True - SQLITE_EXPORT_DIR=/opt/airflow/sqlite - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI} - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI} # SMTP config is provided via .env - POSTGRES_CONN_ID=pg_target - LOGS_DIR=/opt/airflow/logs_test user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./sqlite:/opt/airflow/sqlite - ./logs_test:/opt/airflow/logs_test # 只執行 dag-processor(專門解析 DAG,不負責執行) command: ["bash","-lc","airflow dag-processor"] restart: unless-stopped # Airflow Triggerer(處理 deferrable 等待) airflow-triggerer: image: apache/airflow:3.0.6-python3.12 container_name: airflow-triggerer depends_on: postgres: condition: service_healthy airflow-init: condition: service_completed_successfully env_file: .env environment: - AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow - AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS=True - SQLITE_EXPORT_DIR=/opt/airflow/sqlite - _PIP_ADDITIONAL_REQUIREMENTS=pandas sqlalchemy psycopg2-binary apache-airflow-providers-smtp pymongo kafka-python - AIRFLOW_CONN_PG_TARGET=${PG_TARGET_URI} - AIRFLOW_CONN_MONGO_ATLAS=${MONGO_ATLAS_URI} # SMTP config is provided via .env - POSTGRES_CONN_ID=pg_target - LOGS_DIR=/opt/airflow/logs_test user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}" volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./sqlite:/opt/airflow/sqlite - ./logs_test:/opt/airflow/logs_test # 專門監聽 deferrable operator 狀態 command: ["bash","-lc","airflow triggerer"] restart: unless-stopped volumes: kafka_data: driver: local pg_data: driver: local ``` 資料夾下再開一個資料夾,命名為 dags,裡面放置我所有要跑的資料,這裡用每天寫入一個日期檔案測試 sqlite_daily_dump.py ```= from __future__ import annotations import os import sqlite3 from datetime import datetime import pytz from airflow import DAG from airflow.operators.python import PythonOperator import smtplib import ssl from airflow.utils.trigger_rule import TriggerRule EXPORT_DIR = os.environ.get("SQLITE_EXPORT_DIR", "/opt/airflow/sqlite") DB_PATH = os.path.join(EXPORT_DIR, "daily.db") LOCAL_TZ = os.environ.get("LOCAL_TZ", "Asia/Taipei") # 寫入資料,sqlite 寫入要指定時區 def write_today(**context): os.makedirs(EXPORT_DIR, exist_ok=True) tz = pytz.timezone(LOCAL_TZ) now = datetime.now(tz) now_iso = now.isoformat(timespec="seconds") ds = context.get("ds") or now.date().isoformat() ds_nodash = context.get("ds_nodash") or ds.replace("-", "") tbl = f"d{ds_nodash}" conn = sqlite3.connect(DB_PATH) cur = conn.cursor() cur.execute(f''' CREATE TABLE IF NOT EXISTS "{tbl}" ( id INTEGER PRIMARY KEY AUTOINCREMENT, today TEXT NOT NULL, created_at TEXT NOT NULL ) ''') cur.execute(f'INSERT INTO "{tbl}" (today, created_at) VALUES (?, ?)', (ds, now_iso)) conn.commit() conn.close() print(f'[INFO] Wrote table "{tbl}" into {DB_PATH}') print(f'[DEBUG] LOCAL_TZ={LOCAL_TZ} ds={ds} created_at={now_iso}') # 啟用時間、每天執行 with DAG( dag_id="sqlite_daily_dump", start_date=datetime(2025, 9, 1), schedule="@daily", catchup=False, tags=["sqlite", "daily"], ) as dag: write_task = PythonOperator( task_id="write_today_table", python_callable=write_today, ) # 寄送 gmail def _send_email(subject: str, html: str): host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com") port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465")) user = os.getenv("AIRFLOW__SMTP__SMTP_USER") password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD") sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user) to = "catalinakuowork@gmail.com" if not user or not password: raise RuntimeError("SMTP credentials not set in environment") msg = (f"From: {sender}\r\n" f"To: {to}\r\n" f"Subject: {subject}\r\n" f"Content-Type: text/html; charset=utf-8\r\n\r\n" f"{html}") if port == 465: context = ssl.create_default_context() with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server: server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) else: with smtplib.SMTP(host, port, timeout=30) as server: server.ehlo() server.starttls(context=ssl.create_default_context()) server.ehlo() server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) # 成功時通知 def _notify_success(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] SQLite daily dump - 成功", html=f"<p>表 <code>d{ds_nodash}</code> 寫入成功</p>", ) # 失敗時通知 def _notify_failure(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] SQLite daily dump - 失敗", html=f"<p>表 <code>d{ds_nodash}</code> 寫入失敗,請檢查日誌。</p>", ) notify_success = PythonOperator( task_id="notify_on_success", python_callable=_notify_success, trigger_rule=TriggerRule.ALL_SUCCESS, ) notify_failure = PythonOperator( task_id="notify_on_failure", python_callable=_notify_failure, trigger_rule=TriggerRule.ONE_FAILED, ) write_task >> [notify_success, notify_failure] ``` 之後 Trigger 執行,本機端資料庫查看,確實有寫入 ```=sql SELECT * FROM sqlite_master WHERE type IN ('table', 'view') ORDER BY name; SELECT * FROM "d20250906"; ``` ![截圖 2025-09-06 14.25.03](https://hackmd.io/_uploads/BkW1ELFqxg.png) postgres_daily_dump.py ```= from __future__ import annotations from datetime import datetime import os import smtplib import ssl from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook from psycopg2 import sql # 組動態表名 from airflow.utils.trigger_rule import TriggerRule # 寫入資料,postgres 有時區感知,會自動轉換為 PostgreSQL 的 server 時區 def write_today_pg(**context): """ 在 Postgres 建立表 dYYYYMMDD,並插入今天日期。 需要 Airflow 連線 conn_id='pg_target'。 """ ds = context.get("ds") if not ds: ds = datetime.utcnow().date().isoformat() ds_nodash = context.get("ds_nodash") or ds.replace("-", "") tbl = f"d{ds_nodash}" hook = PostgresHook(postgres_conn_id="pg_target") conn = hook.get_conn() conn.autocommit = True with conn.cursor() as cur: # 建表 cur.execute( sql.SQL(""" CREATE TABLE IF NOT EXISTS {tbl} ( id SERIAL PRIMARY KEY, today DATE NOT NULL, created_at TIMESTAMPTZ NOT NULL ) """).format(tbl=sql.Identifier(tbl)) ) # 寫入 cur.execute( sql.SQL("INSERT INTO {tbl} (today, created_at) VALUES (%s, NOW())") .format(tbl=sql.Identifier(tbl)), [ds], ) conn.close() # 啟用時間、每天執行 with DAG( dag_id="postgres_daily_dump", start_date=datetime(2025, 9, 1), schedule="@daily", catchup=False, tags=["postgres", "daily"], ) as dag: write_task = PythonOperator( task_id="write_today_table_pg", python_callable=write_today_pg, ) # 寄送 gmail def _send_email(subject: str, html: str): host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com") port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465")) user = os.getenv("AIRFLOW__SMTP__SMTP_USER") password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD") sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user) to = "catalinakuowork@gmail.com" if not user or not password: raise RuntimeError("SMTP credentials not set in environment") msg = (f"From: {sender}\r\n" f"To: {to}\r\n" f"Subject: {subject}\r\n" f"Content-Type: text/html; charset=utf-8\r\n\r\n" f"{html}") if port == 465: context = ssl.create_default_context() with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server: server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) else: with smtplib.SMTP(host, port, timeout=30) as server: server.ehlo() server.starttls(context=ssl.create_default_context()) server.ehlo() server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) # 成功時通知 def _notify_success(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] Postgres daily dump - 成功", html=f"<p>表 <code>d{ds_nodash}</code> 寫入成功</p>", ) # 失敗時通知 def _notify_failure(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] Postgres daily dump - 失敗", html=f"<p>表 <code>d{ds_nodash}</code> 寫入失敗,請檢查日誌。</p>", ) notify_success = PythonOperator( task_id="notify_on_success", python_callable=_notify_success, trigger_rule=TriggerRule.ALL_SUCCESS, ) notify_failure = PythonOperator( task_id="notify_on_failure", python_callable=_notify_failure, trigger_rule=TriggerRule.ONE_FAILED, ) write_task >> [notify_success, notify_failure] ``` 之後 Trigger 執行,本機端資料庫查看,確實有寫入 ```=sql SELECT current_database(); SELECT table_schema, table_name FROM information_schema.tables; SELECT * FROM "d20250906" ORDER BY id DESC LIMIT 10; ``` ![截圖 2025-09-06 15.38.08](https://hackmd.io/_uploads/B1kxrPK5gl.png) mongodb_daily_dump.py ```= from __future__ import annotations import os from datetime import datetime import pytz from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule import smtplib import ssl from airflow.hooks.base import BaseHook MONGODB_URI_ENV = os.getenv("MONGODB_URI") MONGODB_DB = os.getenv("MONGODB_DB", "daily_db") LOCAL_TZ = os.environ.get("LOCAL_TZ", "Asia/Taipei") def _resolve_mongo_uri() -> str: # 優先使用 Airflow connection: mongo_atlas(由 AIRFLOW_CONN_MONGO_ATLAS 提供) try: conn = BaseHook.get_connection("mongo_atlas") uri = conn.get_uri() # 若 schema 缺省導致成為 mongo://,但 .env 已提供 srv URI,直接用 get_uri() 即可 if uri: return uri except Exception: pass # 退回使用環境變數 MONGODB_URI 或 .env 的 MONGO_ATLAS_URI if MONGODB_URI_ENV: return MONGODB_URI_ENV alt = os.getenv("MONGO_ATLAS_URI") if alt: return alt # 最後的保底(使用者提供的預設測試帳密) return "mongodb+srv://test123:test123@cluster0.szfovfp.mongodb.net/?retryWrites=true&w=majority" # 寫入資料 def write_today_mongo(**context): # 延後載入,避免 DAG 匯入期因套件未安裝而失敗 from pymongo import MongoClient tz = pytz.timezone(LOCAL_TZ) now = datetime.now(tz) now_iso = now.isoformat(timespec="seconds") ds = context.get("ds") or now.date().isoformat() ds_nodash = context.get("ds_nodash") or ds.replace("-", "") coll_name = f"d{ds_nodash}" mongo_uri = _resolve_mongo_uri() client = MongoClient(mongo_uri, serverSelectionTimeoutMS=10000) db = client[MONGODB_DB] coll = db[coll_name] doc = { "today": ds, "created_at": now_iso, } result = coll.insert_one(doc) print(f"[INFO] Inserted _id={result.inserted_id} into {MONGODB_DB}.{coll_name}") print(f"[DEBUG] LOCAL_TZ={LOCAL_TZ} ds={ds} created_at={now_iso}") # 寄送 gmail def _send_email(subject: str, html: str): host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com") port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "465")) user = os.getenv("AIRFLOW__SMTP__SMTP_USER") password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD") sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user) to = "catalinakuowork@gmail.com" if not user or not password: raise RuntimeError("SMTP credentials not set in environment") msg = ( f"From: {sender}\r\n" f"To: {to}\r\n" f"Subject: {subject}\r\n" f"Content-Type: text/html; charset=utf-8\r\n\r\n" f"{html}" ) if port == 465: context = ssl.create_default_context() with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server: server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) else: with smtplib.SMTP(host, port, timeout=30) as server: server.ehlo() server.starttls(context=ssl.create_default_context()) server.ehlo() server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) # 啟用時間、每天執行 with DAG( dag_id="mongodb_daily_dump", start_date=datetime(2025, 9, 1), schedule="@daily", catchup=False, tags=["mongodb", "daily"], ) as dag: write_task = PythonOperator( task_id="write_today_collection", python_callable=write_today_mongo, ) def _notify_success(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] MongoDB daily dump - 成功", html=f"<p>集合 <code>d{ds_nodash}</code> 寫入成功</p>", ) def _notify_failure(**context): ds = context.get("ds") ds_nodash = context.get("ds_nodash") or (ds.replace("-", "") if ds else "") _send_email( subject="[Airflow] MongoDB daily dump - 失敗", html=f"<p>集合 <code>d{ds_nodash}</code> 寫入失敗,請檢查日誌。</p>", ) notify_success = PythonOperator( task_id="notify_on_success", python_callable=_notify_success, trigger_rule=TriggerRule.ALL_SUCCESS, ) notify_failure = PythonOperator( task_id="notify_on_failure", python_callable=_notify_failure, trigger_rule=TriggerRule.ONE_FAILED, ) write_task >> [notify_success, notify_failure] ``` 之後 Trigger 執行,本機端資料庫查看,確實有寫入 ```= show dbs use <db_name> show collections db.<collection_name>.find().pretty() ``` ![截圖 2025-09-06 15.51.54](https://hackmd.io/_uploads/SJat_wt5gl.png) 上面檔案都放入dags,終端機 cd 到主要資料夾後 ```= docker compose up airflow-init docker compose up -d # 確認 kafka 容器在跑 docker ps ``` ![截圖 2025-09-04 16.24.10](https://hackmd.io/_uploads/Skp3hpU9ex.png) 檢查埠與狀態 ```= docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" # airflow-web 應該看到 0.0.0.0:8081->8080/tcp # airflow-postgres 應該看到 0.0.0.0:5433->5432/tcp ``` ![截圖 2025-09-04 16.24.42](https://hackmd.io/_uploads/ryf1apU9lg.png) UI打開,就會看到了 ![截圖 2025-09-02 19.03.14](https://hackmd.io/_uploads/HkSGJU49ex.png) 手動觸發 Trigger ![截圖 2025-09-06 15.54.27](https://hackmd.io/_uploads/S14aOvt9lg.png) ![截圖 2025-09-06 15.57.59](https://hackmd.io/_uploads/SkMsYvKcgx.png) <br/> ## Kafka 資料串流處理 ### 為什麼要使用 Kafka? Kafka 是設計來處理「即時、高頻、連續」的資料流 - Airflow 是「我安排好時間你再跑」 - Kafka 是「一發生事情就通知你跑」 當資料來源是即時推播、想讓系統是事件驅動(資料來 → 自動觸發處理),才會用 Kafka + Airflow 整合 <br/> ### 測試 首先要建立 Topic、Producer、Consumer Topic 是 Producer & Consumer 溝通的橋樑 - Producer 把訊息丟到某個 topic - Consumer 訂閱該 topic 才能收到訊息 - topic,為「訊息存放區」 一個 Kafka 可以有多個 topic不同的系統可以只訂閱自己需要的 topic 例如: orders → 存訂單訊息 payments → 存付款訊息 logs → 存應用程式 log 建立 topic ```= docker exec -it kafka kafka-topics.sh \ --bootstrap-server localhost:9092 \ --create --topic test --partitions 1 --replication-factor 1 ``` 確認建立 ```= docker exec -it kafka kafka-topics.sh \ --bootstrap-server localhost:9092 --list ``` ![截圖 2025-09-02 18.31.26](https://hackmd.io/_uploads/BkpFDrN5ge.png) 開一個 新的 Terminal 視窗,執行 Consumer,它會掛在那裡等待訊息 ```= docker exec -it kafka kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic test --from-beginning ``` ![截圖 2025-09-02 18.35.49](https://hackmd.io/_uploads/ryrquBN9xg.png) 再開一個 新的 Terminal 視窗,執行 Producer,進入後輸入幾行文字(例如 Hello World),按 Enter,會看到 Consumer 同步輸出這些訊息 ```= docker exec -it kafka kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic test ``` ![截圖 2025-09-02 18.35.17](https://hackmd.io/_uploads/rJtyYSVcgx.png) 回到 Consumer ![截圖 2025-09-02 18.35.26](https://hackmd.io/_uploads/ryAauB4qel.png) <br/> ## 實作:log 自動更新入 PostgreSQL 假設我的網站每天都有 log 檔,要讓 kafka 自動抓取新檔案,觸發寫入 PostgreSQL。再每天觸發一次,確保資料都有完全進去資料庫 log 檔案夾 /opt/airflow/logs_test → Producer 轉 JSON 丟 Kafka log_topic → Consumer 拉取寫入 Postgres logs_table - dags/kafka_postgres_daily_dump.py:每日 DAG 排程跑一次 - dags/kafka_postgres_watch.py : 每分鐘偵測檔案變動(mtime),有變動就立即觸發主 DAG - 偏移與去重:以 .offsets.json 記錄每個檔案已處理「行數」,只送新增行;watch DAG 以 Variable 記錄「檔案 mtime 快照」 這裡用個log檔案 ./logs_test/20250909.log ```= 2025-09-09 12:00:00 INFO Started 2025-09-09 13:01:00 WARN Slow IO 2025-09-09 14:02:00 ERROR Disk full code=ENOSPC 2025-09-09 15:28:52 INFO user=alice action=login ``` ![截圖 2025-09-07 19.31.02](https://hackmd.io/_uploads/SycZT1s9xg.png) log_producer.py 掃描 log 資料夾,有新行就送到 Kafka topic ```= #!/usr/bin/env python3 # Unified log producer: scans a directory, sends only new lines to Kafka as JSON. import json import os from datetime import datetime, timezone from typing import Dict, Iterable, Tuple DEFAULT_LOG_DIR = os.environ.get("LOG_DIR", "/opt/airflow/logs_test") DEFAULT_OFFSETS_PATH = os.environ.get("OFFSETS_PATH", os.path.join(DEFAULT_LOG_DIR, ".offsets.json")) DEFAULT_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", os.environ.get("BOOTSTRAP_SERVERS", "kafka:9092")) DEFAULT_TOPIC = os.environ.get("KAFKA_TOPIC", os.environ.get("TOPIC", "log_topic")) # 目錄裡的 log 檔,Offset 載入 def _load_offsets(offsets_path: str) -> Dict[str, int]: try: with open(offsets_path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): return {str(k): int(v) for k, v in data.items()} return {} except FileNotFoundError: return {} except Exception: return {} # 寫入 offset 更新結果 def _save_offsets(offsets_path: str, offsets: Dict[str, int]) -> None: os.makedirs(os.path.dirname(offsets_path), exist_ok=True) with open(offsets_path, "w", encoding="utf-8") as f: json.dump(offsets, f, ensure_ascii=False, indent=2) essential_skip = {".DS_Store"} # 掃描新行 def scan_new_lines(log_dir: str, offsets_path: str) -> Tuple[Iterable[dict], Dict[str, int]]: offsets = _load_offsets(offsets_path) records = [] if not os.path.isdir(log_dir): return records, offsets for entry in sorted(os.listdir(log_dir)): # Skip hidden files and the offsets file itself and trivial OS files if entry.startswith('.') or entry == os.path.basename(offsets_path) or entry in essential_skip: continue file_path = os.path.join(log_dir, entry) if not os.path.isfile(file_path): continue prev_count = int(offsets.get(entry, 0)) try: with open(file_path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() except Exception: # Skip files we cannot read continue total = len(lines) if total > prev_count: for idx, line in enumerate(lines[prev_count:], start=prev_count + 1): records.append( { "filename": entry, "line_number": idx, "message": line.rstrip("\n"), "ingested_at": datetime.now(timezone.utc).isoformat(), } ) offsets[entry] = total return records, offsets # 將資料送到 Kafka def produce_records(bootstrap_servers: str, topic: str, records: Iterable[dict]) -> int: from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"), linger_ms=50, acks="all", retries=3, ) count = 0 for rec in records: producer.send(topic, value=rec) count += 1 producer.flush() producer.close() return count def main(): log_dir = DEFAULT_LOG_DIR offsets_path = DEFAULT_OFFSETS_PATH bootstrap = DEFAULT_BOOTSTRAP topic = DEFAULT_TOPIC print(f"Scanning dir={log_dir} offsets={offsets_path} topic={topic} bootstrap={bootstrap}") records, updated_offsets = scan_new_lines(log_dir, offsets_path) if not records: print("No new lines found. Nothing to send.") return sent = produce_records(bootstrap, topic, records) _save_offsets(offsets_path, updated_offsets) print(f"Sent {sent} messages. Files tracked: {len(updated_offsets)}") if __name__ == "__main__": main() ``` ./dags/kafka_postgres_daily_dump.py Airflow DAG,先 produce logs → consume 存入 Postgres ```= from __future__ import annotations import os import json from datetime import datetime import logging from pendulum import timezone from airflow import DAG from airflow.decorators import task from airflow.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule import smtplib import ssl from airflow.providers.postgres.hooks.postgres import PostgresHook # 嘗試從 utils/log_producer.py 匯入 log 掃描與 Kafka 發送函式 # 若路徑錯誤,就動態將 當前目錄加入 sys.path 再 import try: from utils.log_producer import scan_new_lines, produce_records except ModuleNotFoundError: import sys sys.path.insert(0, os.path.dirname(__file__)) from utils.log_producer import scan_new_lines, produce_records DAG_ID = "kafka_postgres_daily_dump" # Config LOG_DIR = os.environ.get("LOGS_DIR", "/opt/airflow/logs_test") OFFSETS_PATH = os.environ.get("LOGS_OFFSETS_PATH", os.path.join(LOG_DIR, ".offsets.json")) KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092") KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "log_topic") POSTGRES_CONN_ID = os.environ.get("POSTGRES_CONN_ID", "postgres") POSTGRES_TABLE = os.environ.get("POSTGRES_LOGS_TABLE", "logs_table") default_args = { "owner": "airflow", } # 執行時間 with DAG( dag_id=DAG_ID, start_date=datetime(2025, 9, 1, tzinfo=timezone("Asia/Taipei")), schedule="@daily", catchup=False, default_args=default_args, tags=["kafka", "postgres", "logs"], ): # Producer 把找到的新行送進 Kafka topic @task(task_id="produce_logs_to_kafka") def produce_logs_to_kafka(): # 監控資料夾是否有新檔案 records, updated_offsets = scan_new_lines(LOG_DIR, OFFSETS_PATH) # 掃描 log_dir 找到新行 sent = 0 if records: sent = produce_records(KAFKA_BOOTSTRAP, KAFKA_TOPIC, records) # 把找到的新行送進 Kafka topic if records: try: os.makedirs(os.path.dirname(OFFSETS_PATH), exist_ok=True) with open(OFFSETS_PATH, "w", encoding="utf-8") as f: json.dump(updated_offsets, f, ensure_ascii=False, indent=2) except Exception as e: # Fail the task if we cannot persist progress to avoid duplicates logging.exception("[producer] failed to persist offsets to %s", OFFSETS_PATH) raise logging.info( "[producer] dir=%s files_tracked=%s new_records=%s sent=%s", LOG_DIR, len(updated_offsets), len(records), sent, ) if records: sample = records[:3] logging.info("[producer] sample records: %s", json.dumps(sample, ensure_ascii=False)) return {"sent": sent, "files_tracked": len(updated_offsets)} # Consumer 訊息寫入 PostgreSQL @task(task_id="consume_kafka_to_postgres") def consume_kafka_to_postgres(): from kafka import KafkaConsumer consumer = KafkaConsumer( KAFKA_TOPIC, bootstrap_servers=KAFKA_BOOTSTRAP, group_id="airflow_logs_consumer", enable_auto_commit=True, auto_offset_reset="earliest", value_deserializer=lambda v: json.loads(v.decode("utf-8")), consumer_timeout_ms=5000, # stop after idle max_poll_records=1000, ) rows = [] for msg in consumer: rec = msg.value rows.append( ( rec.get("filename"), int(rec.get("line_number")) if rec.get("line_number") is not None else None, rec.get("message"), rec.get("ingested_at"), int(msg.offset), int(msg.partition), ) ) # 每一條訊息解析後塞進 rows 清單 consumer.close() logging.info("[consumer] polled messages: %s", len(rows)) # 關閉 Kafka consumer hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID) create_sql = f""" CREATE TABLE IF NOT EXISTS {POSTGRES_TABLE} ( filename TEXT, line_number INTEGER, message TEXT, ingested_at TIMESTAMPTZ, kafka_offset BIGINT, kafka_partition INTEGER ); """ insert_sql = f""" INSERT INTO {POSTGRES_TABLE} (filename, line_number, message, ingested_at, kafka_offset, kafka_partition) VALUES (%s, %s, %s, %s, %s, %s); """ # 用 Airflow conn_id 建立 DB 連線 with hook.get_conn() as conn: with conn.cursor() as cur: cur.execute(create_sql) if rows: cur.executemany(insert_sql, rows) conn.commit() logging.info( "[consumer] inserted rows: %s into table=%s (conn_id=%s)", len(rows), POSTGRES_TABLE, POSTGRES_CONN_ID, ) return {"inserted": len(rows)} # Gmail def _send_email(subject: str, html: str): host = os.getenv("AIRFLOW__SMTP__SMTP_HOST", "smtp.gmail.com") port = int(os.getenv("AIRFLOW__SMTP__SMTP_PORT", "587")) user = os.getenv("AIRFLOW__SMTP__SMTP_USER") password = os.getenv("AIRFLOW__SMTP__SMTP_PASSWORD") sender = os.getenv("AIRFLOW__SMTP__SMTP_MAIL_FROM", user) to = os.getenv("NOTIFY_EMAIL_TO", "catalinakuowork@gmail.com") if not user or not password: raise RuntimeError("SMTP credentials not set in environment") msg = (f"From: {sender}\r\n" f"To: {to}\r\n" f"Subject: {subject}\r\n" f"Content-Type: text/html; charset=utf-8\r\n\r\n" f"{html}") if port == 465: context = ssl.create_default_context() with smtplib.SMTP_SSL(host, port, context=context, timeout=30) as server: server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) else: with smtplib.SMTP(host, port, timeout=30) as server: server.ehlo() server.starttls(context=ssl.create_default_context()) server.ehlo() server.login(user, password) server.sendmail(sender, [to], msg.encode("utf-8")) def _notify_success(**context): ti = context["ti"] inserted = ti.xcom_pull(task_ids="consume_kafka_to_postgres", key="return_value") or {} sent = ti.xcom_pull(task_ids="produce_logs_to_kafka", key="return_value") or {} _send_email( subject="[Airflow] Kafka→Postgres daily dump - 成功", html=( f"<p>Kafka→Postgres 完成。</p>" f"<p>Producer sent: <b>{sent.get('sent', 0)}</b> 行,追蹤檔案數:{sent.get('files_tracked', 0)}</p>" f"<p>Consumer inserted: <b>{inserted.get('inserted', 0)}</b> 筆</p>" ), ) def _notify_failure(**context): dag_run = context.get("dag_run") err = "請檢查任務日誌" _send_email( subject="[Airflow] Kafka→Postgres daily dump - 失敗", html=( f"<p>任務失敗:{dag_run.run_id if dag_run else ''}</p>" f"<p>{err}</p>" ), ) notify_success = PythonOperator( task_id="notify_on_success", python_callable=_notify_success, trigger_rule=TriggerRule.ALL_SUCCESS, ) notify_failure = PythonOperator( task_id="notify_on_failure", python_callable=_notify_failure, trigger_rule=TriggerRule.ONE_FAILED, ) p = produce_logs_to_kafka() c = consume_kafka_to_postgres() # 成功:兩個任務都成功後通知;失敗:任一任務失敗就通知 p >> c >> notify_success [p, c] >> notify_failure ``` ./dags/kafka_postgres_watch.py 監控log資料夾是否有新檔案,如果有新檔案,立即觸發另一個 DAG(kafka_postgres_daily_dump),寫入 Postgresql ```= from __future__ import annotations import os import json from datetime import datetime import logging from pendulum import timezone from airflow import DAG from airflow.decorators import task # Airflow 3:使用 SDK 的 Variable,避免在任務中直接 ORM 存取 from airflow.sdk import Variable from airflow.providers.standard.operators.python import ShortCircuitOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator # 嘗試匯入與 daily_dump 相同的工具,偵測是否有「新行」 try: from utils.log_producer import scan_new_lines except ModuleNotFoundError: import sys sys.path.insert(0, os.path.dirname(__file__)) from utils.log_producer import scan_new_lines DAG_ID = "kafka_postgres_watch" LOG_DIR = os.environ.get("LOGS_DIR", "/opt/airflow/logs_test") # 用 Variable 紀錄「上一版行數快照」,避免重複觸發 OFFSETS_VAR = os.environ.get("LOGS_OFFSETS_VAR", "kafka_logs_watch_offsets") # 與 daily_dump 相同的 offsets 路徑(僅用於讀取先前進度,不在此寫回檔案) OFFSETS_PATH = os.environ.get("LOGS_OFFSETS_PATH", os.path.join(LOG_DIR, ".offsets.json")) # 執行時間 with DAG( dag_id=DAG_ID, start_date=datetime(2025, 9, 1, tzinfo=timezone("Asia/Taipei")), schedule="*/1 * * * *", catchup=False, tags=["kafka", "watch"], ): # 監控資料夾是否有「新行」而非僅新檔案 @task(task_id="check_new_lines") def check_new_lines(): # 讀取上一版快照(檔名->行數)。若不存在則視為空 try: prev_snapshot = json.loads(Variable.get(OFFSETS_VAR)) if not isinstance(prev_snapshot, dict): prev_snapshot = {} except Exception: prev_snapshot = {} # 掃描當前狀態(不在此寫入 .offsets.json) records, updated_offsets = scan_new_lines(LOG_DIR, OFFSETS_PATH) # 計算相對於 Variable 快照的增量行數,避免 .offsets.json 尚未更新導致重複觸發 delta_by_file = {} for fname, total in updated_offsets.items(): prev = int(prev_snapshot.get(fname, 0)) if str(prev_snapshot.get(fname, 0)).isdigit() else prev_snapshot.get(fname, 0) or 0 try: prev = int(prev) except Exception: prev = 0 inc = max(int(total) - prev, 0) if inc > 0: delta_by_file[fname] = inc delta_total = sum(delta_by_file.values()) # 實務上,以 records_found 作為保險:即使 Variable 快照已等於目前 totals,但若 offsets.json 仍落後,仍要觸發 has_new_lines = (delta_total > 0) or bool(records) # 更新快照到 Airflow Variable,避免下一輪重複觸發(失敗不致命) try: # 若 SDK Variable 在任務時段不被允許,也不阻塞流程 Variable.set(OFFSETS_VAR, json.dumps(updated_offsets, ensure_ascii=False, sort_keys=True)) except Exception: pass # 紀錄偵測資訊 try: logging.info( "[watch] dir=%s files_tracked=%s records_found=%s delta_total=%s", LOG_DIR, len(updated_offsets), len(records), delta_total, ) if delta_by_file: logging.info("[watch] per-file delta: %s", json.dumps(delta_by_file, ensure_ascii=False)) except Exception: pass # 回傳供後續 ShortCircuit 與 TriggerDagRunOperator 使用 conf = { "reason": "new_lines_detected" if has_new_lines else "no_changes", "stats": { "files": len(updated_offsets), "records": len(records), "delta_records": delta_total, }, } return {"should_trigger": has_new_lines, "conf": conf} check = check_new_lines() @task(task_id="build_conf") def build_conf(payload: dict) -> dict: return (payload or {}).get("conf", {}) conf_payload = build_conf(check) # 依據檢查結果決定是否繼續往下游觸發 gate = ShortCircuitOperator( task_id="should_trigger", python_callable=lambda ti: (ti.xcom_pull(task_ids="check_new_lines") or {}).get("should_trigger", False), ) trigger = TriggerDagRunOperator( task_id="trigger_daily_dump", trigger_dag_id="kafka_postgres_daily_dump", conf=conf_payload, reset_dag_run=False, wait_for_completion=False, skip_when_already_exists=True, fail_when_dag_is_paused=False, ) check >> gate >> trigger ``` ./log_test/20250909.log 新增一筆 手動觸發 watch dag 後,json 檔自動更新、PostgreSQL 也有成功寫入 .offsets.json ![截圖 2025-09-08 00.02.15](https://hackmd.io/_uploads/By99nXs9ge.png)