# 從 SQL 仔到資料工程師:打造第一個 ETL/ELT 工作流 :::info - Databaase vs Data Warehouse vs Data Lake - ETL vs ELT - 工作流介紹(以GCP為例) - 範例 ::: ![截圖 2025-08-22 14.34.32](https://hackmd.io/_uploads/B1xrl5Htex.png) ![截圖 2025-08-22 14.35.00](https://hackmd.io/_uploads/ByeBx5rYll.png) 會sql=會資料工程嗎?當初的我想得太簡單了 SQL 側重在資料查詢與分析,而資料工程則處理資料規模過大或需要即時處理的挑戰 當我們收集到資料後,往往需要借助排程與流程管理工具(如 Airflow)、資料管線平台(如 Kafka、Hadoop)、分散式處理工具(如 Spark、Flink),確保資料能被高效收集、轉換與儲存。最終,這些資料會進入分析資料庫,並透過 BI 工具(如 Tableau、Looker Studio)提供可視化與決策支援 <br/> ## Databaase vs Data Warehouse vs Data Lake Database:活的資料庫,重即時交易 Data Lake:大水庫,先存全部原始資料,重探索 & ML Data Warehouse:乾淨整理好的歷史資料,重報表決策 通常的流程為 : Database → Data Lake → Data Warehouse → BI/ML | Feature | Database | Data Lake | Data Warehouse | | ------------------------- | -------------------- | --------------------------------- | ---------------- | | **Purpose (用途)** | 即時交易 (例如:POS、App 記錄) | 原始資料儲存,支援多種應用 | 歷史資料分析 (報表、OLAP) | | **Data Structure (資料型態)** | 結構化 (表格) | 結構化 + 半結構化 + 非結構化 (表格、JSON、圖片、影片) | 結構化 (乾淨的表格) | | **Speed (速度)** | 小查詢很快 (高 TPS) | 不一定,要看處理方式 (批次/即時) | 為分析查詢最佳化 (大查詢快) | | **Use Case (應用)** | 作業系統 (例如 ATM、購物網站交易) | 進階分析、機器學習、資料科學 | 商業智慧 (BI 報表、決策) | | **Scalability (擴展性)** | 有限 (單機/垂直擴充) | 高度可擴展 (S3、HDFS 幾乎無上限) | 中等 (雲端水平擴展有限) | <br/> ## ETL vs ELT ### ETL - Extract:資料多樣 table (結構化資料) image (非結構化) log file (半結構化) - Transform:在進入倉儲前就先處理,清理、標準化、定義 schema - Load:將乾淨資料存進 Data Warehouse,例如:BigQuery、Snowflake、Redshift - 用途:商業智慧 (BI)、報表、決策支持 (OLAP 分析) ### ELT - Extract:來源多樣 table (結構化資料) image (非結構化) log file (半結構化) - Load:原始資料直接存進 Data Lake,例如:S3、HDFS - Transform:需要時再轉換 Schema-on-read,依應用再做清理 - 用途:多團隊共用,機器學習 (ML)、資料科學、探索性分析 <br/> ## 工作流介紹 (以GCP為例) 假設今天網站資料會使用 GCP BigQuery,有可能的做法為: | 方案 | Data Sources | Data Processing (流程管理工具) | Data Processing (管線平台) | Data Processing (分散式處理工具) | Data Storage | 說明 | | --------------- | ---------------------- | ------------------------ | ---------------------- | ---------------------------- | ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | **A 批次分析** | BigQuery | Airflow / Cloud Composer | | | BigQuery (整理後) | **流程**:BigQuery 先存網站流量 → Airflow / Composer 定時觸發 SQL 或 ETL job → 整理後再存回 BigQuery <br> **適用**:每日或每小時的批次報表 <br> **優缺點**:邏輯簡單、成本低,但無法即時 | | **B 分散式處理** | BigQuery → GCS Export | | | Spark / Cloud Dataproc | BigQuery / CloudSQL | **流程**:BigQuery 匯出大批量資料到 GCS → Spark / Cloud Dataproc 做清洗、轉換、ML 前處理 → 再寫回 BigQuery / CloudSQL <br> **適用**:TB 級以上資料,複雜轉換或 ML pipeline <br> **優缺點**:彈性大、功能強,但需維運叢集 | | **C 即時流處理** | 網站事件 (JSON) | 流式處理是一直跑(24/7)<br> 不需要 Airflow | Kafka / Pub/Sub | Flink / Spark Streaming / Cloud Dataflow | BigQuery / Elasticsearch | **流程**:網站事件(JSON)直接寫進 Kafka / Pub/Sub → Flink / Spark Streaming / Cloud Dataflow 即時處理(sessionization、filter、aggregation)→ 寫入 BigQuery / Elasticsearch <br> **適用**:秒級即時分析(流量監控、詐欺偵測) <br> **優缺點**:即時性佳,但需處理系統穩定性與高吞吐量 | | **D Lambda 架構** | 網站事件 + BigQuery Export | Spark 批次部分需要 Airflow 觸發(如每天 1AM 跑一次)<br>Flink 流部分則是(24/7),不靠 Airflow | Kafka / Pub/Sub | Flink (即時) + Spark / Cloud Dataproc (批次) | BigQuery (RT+Historical) | **流程**:Flink 處理即時事件 → Spark / Cloud Dataproc 批次處理歷史完整數據 → 最後合併查詢 <br> **適用**:需要同時兼顧「完整歷史」+「最新即時」的場景 <br> **優缺點**:即時與歷史兼顧,但需維護兩條管線,複雜度高 | | **E 雲端無伺服器** | 網站流量 (GCP) | Dataflow 可以直接由 PubSub 事件觸發,完全不需要 Airflow | Pub/Sub | Cloud Dataflow | BigQuery | **流程**:網站流量直接進 Pub/Sub → Dataflow 即時或批次處理 → 寫入 BigQuery <br> **適用**:不想維運基礎設施的團隊 <br> **優缺點**:完全託管,依事件量付費,開發快但可能有雲端鎖定風險 | PS 補充 | 功能 | Cloud Composer (類似 Apache Airflow) | Cloud Dataflow (類似 Apache Beam) | | ------ | ------------------------ | --------------------- | | 定位 | Orchestration(排程、監控) | Processing(資料處理) | | 是否處理資料 | ❌ 不處理資料 | ✅ 負責轉換/聚合 | | 使用場景 | 每日跑 ETL、依賴管理 | 即時流式處理、批次運算 | <br/> | 特點 / 工具 | **Cloud Dataproc** (類似 Apache Spark/Hadoop) | **Cloud Dataflow** (類似 Apache Beam, Serverless) | **Flink 自建** (自己維運) | | -------- | ------------------------------ | ---------------------------------- | ------------------- | | **定位** | 雲端批次處理 | 雲端即時/批次全託管 | 超低延遲流處理 | | **適合場景** | TB 級大批量 ETL、ML 前處理 | Pub/Sub → BigQuery 即時分析、批次流並存 | 毫秒級事件處理、CEP、IoT | | **優點** | Spark 生態完整、功能強 | 免維運、即時+批次都行、按量付費 | 延遲最低、彈性最大 | | **缺點** | 要管叢集、不是即時 | 綁 Google 生態、抽象高 | 維運麻煩、學習曲線高 | <br/> ## 範例 ### **json file → Kafka → Flink SQL → GCS 檔案 → BigQuery → windows 分析後存入 postgresql** 如果做 windows 分析 要「即時」(tumbling/hopping/session) → 放在 Flink SQL 做,邊吃 Kafka 邊算,結果直接寫 GCS/BigQuery 要「批次/回溯/重算」window 分析(移動平均、留存、回補缺漏)→ 放在 BigQuery SQL 做,用排程(DTS / Composer)每日或每小時跑 --- 假設有 JSON 檔代表使用者點擊網站的事件,每天都會有一份 event_20250822.json event_20250823.json event_20250824.json ⋯⋯ ```= {"ip":"1.2.3.4","url":"/home","referrer":"google","host":"example.com","event_time":"2025-08-20T12:34:56Z"} ``` 用 Kafka 當成「即時事件資料管道」 DAG1 : 每天自動抓取 json 檔案 → 建 Kafka topic,用 console producer 送 json file 進去 ```= from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="send_json_to_kafka_daily", start_date=datetime(2025, 8, 1), schedule_interval="0 0 * * *", # 每天凌晨 0 點 catchup=False, ) as dag: send_to_kafka = BashOperator( task_id="send_json_to_kafka", bash_command=( "kafka-console-producer.sh " "--bootstrap-server localhost:9092 " "--topic web-events < /data/event_{{ ds_nodash }}.json" ) ) ``` 用 Flink SQL 接 Kafka(前面設好的 web-events) 做資料處理 → 傳到 GCS job.sql ```=sql -- 來源:Kafka,讓 Flink 能「持續讀取 Kafka 裡的事件」 CREATE TABLE web_events ( ip STRING, url STRING, referrer STRING, host STRING, event_time STRING, ts AS TO_TIMESTAMP_LTZ(CAST(DATE_FORMAT(event_time, 'yyyy-MM-dd''T''HH:mm:ss''Z') AS STRING), 0), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'web-events', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'flink-web-consumer', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); -- Parquet 作為輸出表(儲存在 GCS) CREATE TABLE events_parquet ( ip STRING, host STRING, url STRING, referrer STRING, ts TIMESTAMP_LTZ(3) ) PARTITIONED BY (`dt`) WITH ( 'connector' = 'filesystem', 'path' = 'gs://my-web-events-bucket/web-events/raw/', 'format' = 'parquet', 'sink.partition-commit.policy.kind' = 'success-file', 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '1 min', 'partition.time-extractor.timestamp-pattern' = '$dt', 'partition.time-extractor.class' = 'org.apache.flink.table.partition.TimeExtractor', 'partition.time-extractor.timestamp-formatter' = 'yyyyMMdd' ); -- 寫入Flink 一旦啟動就會一直從 Kafka 拉資料,會在叢集上變成「24/7 的 job」 -- 加上 partition 欄位 dt(用事件時間轉字串) INSERT INTO events_parquet SELECT ip, host, url, referrer, ts, DATE_FORMAT(ts, 'yyyyMMdd') AS dt FROM web_events; ``` 執行 job.sql ```= # 開叢集 ./bin/start-cluster.sh # 打開 SQL Client # -f 會一次把整份檔案送去執行 ./bin/sql-client.sh -f /path/to/job.sql ``` check sql ```= # 開叢集 ./bin/start-cluster.sh # 打開 SQL Client ./bin/sql-client.sh # check Kafka 是否有真的寫入 SELECT * FROM web_events; ``` DAG2 : 每小時自動載入 GCS → BigQuery ```= from datetime import datetime from airflow import DAG from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator with DAG( dag_id="gcs_parquet_to_bq_hourly", start_date=datetime(2025, 8, 1), schedule_interval="0 * * * *", # 每小時 catchup=False, tags=["gcs_to_bq"], ) as dag: load = GCSToBigQueryOperator( task_id="load_parquet", bucket="my-web-events-bucket", source_objects=["web-events/raw/{{ ds_nodash }}/*.parquet"] destination_project_dataset_table="my_analytics.web_events", source_format="PARQUET", write_disposition="WRITE_APPEND", # 或選 WRITE_TRUNCATE(每日重寫) autodetect=True, gcp_conn_id="google_cloud_default", # 確保 Airflow UI 有這個 connection ) ``` 從 BigQuery 每天做 Window 分析(session 分析),結果寫入 OLTP 資料庫,供讀取分析 先建 PostgreSQL 表格(只做一次) ```=sql CREATE TABLE daily_sessions ( ip TEXT, host TEXT, event_date DATE, total_events INT, first_event TIMESTAMP, last_event TIMESTAMP, session_duration_seconds INT ); ``` SQL 檔,或放在 Scheduled Query daily_sessions_today.sql ```=sql # 計算每個 IP + Host 的每日瀏覽行為與停留時間 CREATE OR REPLACE TABLE my_analytics.daily_sessions AS SELECT ip, host, DATE(ts) AS event_date, COUNT(*) AS total_events, MIN(ts) AS first_event, MAX(ts) AS last_event, TIMESTAMP_DIFF(MAX(ts), MIN(ts), SECOND) AS session_duration_seconds FROM my_analytics.web_events GROUP BY ip, host, DATE(ts); ``` DAG3 : 每天從 BigQuery 抽資料 → 寫入 PostgreSQL ```= from airflow import DAG from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator from datetime import datetime from airflow.operators.python import PythonOperator import os with DAG( dag_id="bq_to_postgres_daily", start_date=datetime(2025, 8, 1), schedule_interval="0 2 * * *", # 每天凌晨 2 點 catchup=False, ) as dag: # bq_to_pg = BigQueryToPostgresOperator( task_id="bq_to_pg_daily_sessions", sql="SELECT * FROM `my_analytics.daily_sessions` WHERE event_date = CURRENT_DATE()", postgres_table="daily_sessions", replace=False, truncate=False, write_disposition="WRITE_APPEND", gcp_conn_id="google_cloud_default", postgres_conn_id="my_postgres", ) ```