## **【Spark + Iceberg:本地端 Memory Tuning、Join 與資料傾斜處理】ft. DataExpert 課程實作**
:::info
- 什麼是 Apache Spark?
- Spark 的三大核心概念
- Spark 記憶體調整(Memory Tuning)
- Join 策略
- 是否該使用 Broadcast Hash Join?
- Notebook 模式 vs Spark Server
- Spark 暫存與快取機制比較
- Parquet(檔案格式與排序)
- 如何處理資料傾斜(某個partiton過大)? 兩種常見解法(AQE vs. Salting)
- 解法 1:Adaptive Query Execution(AQE,自適應查詢執行)
- 解法 2:Salting the GROUP BY(群組鹽值法)
- 安裝 docker
- 實作
- 補充:暫存視圖(Temporary View)
:::
<br/>
這篇是照著 [DataExpert Youtube : Spark + Iceberg in 1 Hour - Memory Tuning, Joins, Partition - Week 3 Day 1 - DataExpert.io Boot Camp](https://www.youtube.com/watch?v=3R-SLYK-P_0) 課程實作
內容包含:
- Spark 效能調校: 掌握 Memory Tuning、高效能 Join 策略,並實作 DataFrame 與 Dataset 的應用
- 資料傾斜處理: 學習並應用 Salting 等技巧,解決分散式運算中的效能瓶頸
- 資料湖整合: 結合 Iceberg Table Format,管理並操作本地端的分區資料。
如果想要認證放在 Linkedin,要到官網完成所有課程和繳交作業
<br/>
## 什麼是 Apache Spark?
Apache Spark 是一個分散式計算框架,旨在高效地處理非常大量的資料。它被視為 Hadoop 和 Java MapReduce 等早期大數據技術的後繼者
可以把它想像成一個擁有許多工人(Executor)的工頭(Driver),能夠將一個複雜的任務(例如分析 100TB 的資料)分解成許多小任務,並分配給所有工人同時執行,大幅提升效率
Spark 的核心優勢在於它能將運算過程儲存在記憶體中,比傳統的 MapReduce 寫入硬碟的方式快很多
### Spark 的三大核心概念
1. RDD vs. DataFrame (資料處理的演進)
| 特性 | **RDD (Resilient Distributed Datasets)** | **DataFrame** |
| :--- | :--- | :--- |
| **資料結構** | 無結構,類似於分散式的 Java/Scala/Python 物件集合 | 結構化,類似於帶有欄位名稱的資料表 |
| **優點** | 靈活度高,可以處理任何格式的資料(包含非結構化的純文字、圖片) | 效能高,因為有 **Catalyst 優化器**會自動最佳化查詢 |
| **缺點** | 效能較低,因為 Spark 無法理解資料的內部結構,無法進行優化 | 僅限於結構化與半結構化資料 |
| **API** | 提供了低階轉換(`map`、`filter`)和動作(`collect`、`reduce`) | 提供了高階 API(如 `select`、`where`、`groupBy`、`join`),類似 SQL |
| **類型安全** | 編譯時型別安全(在 Scala/Java 中) | 弱類型,主要靠欄位名稱 |
| **依賴與聚合** | - **窄依賴 (Narrow dependency)**:每個 parent partition 對應少數 child(例:`map`、`filter`)<br> - **寬依賴 (Wide dependency)**:child partition 需要 shuffle(例:`groupByKey`、`reduceByKey`),會觸發 shuffle → 成本高。<br> - **GroupByKey**:所有同 key 的資料都要 shuffle 到同一個節點 → 成本高、容易 OOM。<br> - **ReduceByKey**:先在本地 combine,再 shuffle → 更高效。 | Catalyst 自動判斷 query plan,減少 shuffle,並選擇最佳執行方式 |
| **分區控制** | - **coalesce(n)**:減少分區數,**不 shuffle**,快但可能不均勻,常用於輸出前避免小檔案。<br> - **repartition(n)**:增加或減少分區數,**一定 shuffle**,資料更平均但成本較高。 | DataFrame 同樣支援 `coalesce()` 和 `repartition()`,用法與效果一致 |
| **資料讀取** | `sc.textFile()`、`wholeTextFiles()` | `spark.read.csv/json/parquet/orc/jdbc`;支援 HDFS、S3、GCS 等儲存系統 |
| **Schema 管理** | 無 schema,需要自行解析字串或物件 | - **Schema Enforcement**:用 `StructType` 明確指定欄位型別,避免推斷錯誤<br> - **Read Modes**:`PERMISSIVE`(預設)、`DROPMALFORMED`、`FAILFAST`,控制壞資料處理方式 |
| **Transformation / Action** | `map`、`filter`、`flatMap`(transformation);`collect`、`count`(action) | `select`、`where`、`withColumn`(transformation);`show`、`count`、`write.save`(action) |
| **資料寫入** | RDD 轉文字檔 `saveAsTextFile()` | `df.write.mode("append/overwrite").format("parquet").partitionBy("dt")`;常用 Parquet/ORC |
| **操作與效能** | 需要手動優化(如調整分區、避免 shuffle) | Catalyst + Tungsten 自動優化;支援 `explain()` 查看 query plan |
| **資料型別處理** | 基本型別(字串、數字);需手動處理 JSON/結構化 | - `cast()` 轉型<br> - `fillna()/dropna()` 缺值處理<br> - 複合型別:`ArrayType`、`MapType`、`StructType`<br> - JSON 處理:`from_json()/to_json()`<br> - 拆解:`explode()` |
| **日期與時間** | 沒有內建支援,需要自己 parse | - `to_date()`、`to_timestamp()`、`date_format()`<br> - `datediff()`、`add_months()`、`last_day()`<br> - 時區設定:`spark.sql.session.timeZone` |
| **快取與暫存** | - `cache()` = `persist(MEMORY_ONLY)`,適合重複查詢<br> - `persist()` 可選多種層級(MEMORY、DISK、SER)<br> 下面解說 | DataFrame 與 Table 都支援 `cache()` / `persist()` / `unpersist()`;也可用 `spark.catalog.cacheTable("table")` 快取整張表 |
| **適用場景** | 需要對資料進行**低階、客製化**操作,或處理**非結構化資料**時 | 絕大多數的 ETL、批次分析、機器學習等**結構化資料處理**任務 |
| **演進** | Spark 1.0 版本的主要核心 | Spark 2.0 版本開始推廣,並成為主流 |
2. Driver vs. Executor (工頭與工人的關係)
Spark 應用程式由一個 Driver 和多個 Executor 組成,這就是分散式運算的基礎
| 角色 | **Driver (驅動程式)** | **Executor (執行器)** |
| :--- | :--- | :--- |
| **任務** | 負責**統籌規劃**,將任務分解成多個階段和任務,並分配給 Executor 執行 | 負責**實際執行運算**,處理由 Driver 分配的小任務。 |
| **比喻** | 建築工地的**工頭**,規劃建築藍圖並分配工作給工人 | 建築工地的**工人**,負責砌磚、搬運等實際工作 |
| **運作** | 當你提交一個 Spark 應用程式時,Driver 就會啟動,與叢集管理器(如 YARN 或 Kubernetes)協調,以獲得 Executor 的資源 | 每個 Executor 都有自己的 CPU 核心和記憶體,獨立執行任務並回報結果給 Driver |
3. 分散式運算原理 (協同工作的核心流程)
提交一個 Spark 任務時,背後會發生以下流程:
| 階段 | **核心動作** | **說明** |
| :--- | :--- | :--- |
| **1. 任務提交** | 啟動 Driver | 使用 `spark-submit` 啟動 Spark 應用程式,並啟動作為工頭的 **Driver** |
| **2. 資源協調** | 申請 Executor | **Driver** 與叢集管理器溝通,申請所需的 **Executor** (工人) 資源 |
| **3. 任務分解** | 建立執行計畫 | **Driver** 將複雜的任務(如 JOIN 或 GROUP BY)拆解成多個**階段 (Stages)** 和更小的**任務 (Tasks)** |
| **4. 任務執行** | 分配與運算 | **Driver** 將任務發送到每個可用的 **Executor**,由它們平行處理分散在不同節點上的資料 |
| **5. 結果回傳** | 收集結果 | **Executor** 完成任務後,將結果回傳給 **Driver**,或將中間結果暫存在記憶體/硬碟中,供後續階段使用
<br/>
假設我要做一個資料管道,可能會是
json file → Kafka → Spark 批次處理資料 → GCS 檔案 → BigQuery → windows 分析後存入 postgresql
<br/>
下面課程示範為「Spark 批次處理資料」階段:(本地端 docker 容器)建立 SparkSession -> 讀表 → 加鹽聚合 → 寫回 Iceberg 表 -> 觀察處理是否真的改善
<br/>
## Spark 記憶體調整(Memory Tuning)
| 參數名稱 | 中文說明 | 詳細解釋 |
| ------------------------------------- | ------------------------ | ------------------------------------------------------------------------------------------------- |
| `spark.executor.memory` | 每個 Executor 可使用的記憶體大小 | 設定太小會導致 Spark 必須將資料「spill to disk」(寫到磁碟),導致速度大幅變慢。講師建議透過嘗試不同值(例如 2、4、6、8GB),找到能穩定運行且最小的值,以避免記憶體浪費|
| `spark.executor.cores` | 每個 Executor 可使用的 CPU 核心數 | 控制每個 Executor 同時能執行多少 task。預設是 4,講師建議不超過 6,避免資源爭用 |
| `spark.executor.memoryOverheadFactor` | Executor 記憶體的額外保留比例 | 這是保留給非 heap 記憶體用途(例如 shuffle buffer、序列化、UDF 中 native 操作等),預設是 10%。如果 job 很複雜,或使用大量 UDF,講師建議加大這個比例 |
<br/>
## Join 策略
Spark 主要支援三種類型的連接操作
| Join 類型 | 適用條件 | 原理 | 效能特性 | 限制 / 注意事項 | 案例 |
|-----------|----------|------|----------|-----------------|------|
| **Shuffle Sort Merge Join**<br>(洗牌排序合併連接) | 幾乎所有情況 | 依連接鍵 Shuffle 資料到各執行器 → 每個執行器內排序與合併 | 最不高效,但最通用 | Shuffle 是 Spark 擴展性瓶頸,大量資料(>20-30TB/天)成本極高 | — |
| **Broadcast Hash Join**<br>(廣播雜湊連接) | 其中一側資料集很小(~8-10GB 以內) | 小表廣播到所有執行器,避免 Shuffle | 非常高效,節省網路與磁碟 I/O | 小表過大(例如 IPv6 查找表)時無法廣播 | Netflix:處理 100TB/h 流程中廣播數 GB 的 IP 查找表 |
| **Bucket Join**<br>(分桶連接) | 兩表均已按連接鍵分桶 | 預先按鍵分桶 → 相同桶號直接匹配 → 無 Shuffle | 巨大效能提升 | 建桶需成本,適用多次 Join/聚合場景;桶數建議 2 的冪次方;避免過多空桶 | Facebook:10TB 與 50TB 資料,1024 桶,將 Shuffle Join 轉為高效 Bucket Join |
<br/>
## 是否該使用 Broadcast Hash Join?
小表被很多任務重複廣播且經常變動,導致網路壓力大,這時可考慮 cache 小表,或落地成維度表

```=
spark.conf.set("spark.sql.adaptive.enabled", "true")
# 調整自動廣播門檻
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "256MB")
```
<br/>
## Notebook 模式 vs Spark Server
| Notebook 模式 | Spark Server 模式 |
| --------------------------- | ----------------------- |
| Session 一直開著,變數和 cache 會一直留 | 每次 run 都是新的 Application |
| 容易忘記 `unpersist()` 導致記憶體佔滿 | 不會留舊 cache,乾淨環境 |
| 適合探索資料(EDA) | 適合正式任務、批次處理、測試 |
<br/>
## Spark 暫存與快取機制比較
| Spark 行為 | 說明 |
| -------------- | ---------------------------- |
| Temporary View | 儲存查詢邏輯,每次都重新計算 |
| Cache (MEMORY) | 把資料結果暫存在記憶體,重複使用時不用重算 |
| Cache (DISK) | 把 DataFrame 序列化後,檔案寫到磁碟,下次用的時候直接從磁碟讀 |
| unpersist() | 在 Notebook 模式(像 Jupyter、Databricks Notebook),快取資料會一直佔記憶體,要手動處理 |
<br/>
## Parquet(檔案格式與排序)
| 功能 / 建議 | 說明 | Pandas 對照理解 |
| -------------------------------- | ------------------------------------- | -------------------------------------------------------- |
| **Parquet 格式** | 支援壓縮(Run-length encoding),非常適合大資料儲存 | Pandas `to_parquet()`,比 `to_csv()` 更省空間、更快讀寫 |
| **不要用 `.sort()` 全域排序** | 全域排序會非常慢,因為 Spark 要在多節點間移動資料(Shuffle) | Pandas `sort_values()` 對超大 DataFrame 在單機很慢一樣,但這裡還要跨機器傳資料 |
| **改用 `.sortWithinPartitions()`** | 只在每個分區內排序,能並行處理,速度快 | Pandas `groupby(..., sort=False)` + 再對每組內排序,避免全表排序 |
<br/>
## 如何處理資料傾斜(某個partiton過大)? 兩種常見解法(AQE vs. Salting)
當某個分區(或 Executor)獲得的資料量遠多於其他分區時,就會發生資料傾斜 (Data Skew),這會導致效能瓶頸。我們可以採用兩種常見策略來應對:
- 解法 1: AQE (自適應查詢執行) — 這是自動化的智慧優化
- 解法 2: Salting (群組鹽值法) — 這是手動的程式碼調整
<br/>
### 解法 1:Adaptive Query Execution(AQE,自適應查詢執行)
AQE 是 Spark 2.x 版之後內建的智慧功能,能自動偵測並優化執行計畫
當開啟功能後,Spark 在執行 GROUP BY 或 JOIN 時,若發現資料傾斜,它會:
- 動態切分大分區:自動將資料量異常龐大的分區(例如某個 user_id)切分成多個小分區
- 動態調整分區數:根據資料的實際大小(例如 10MB),自動調整 Shuffle 之後的分區數量,確保每個執行器的工作量更平衡
啟用 AQE 總開關
```=
spark.conf.set("spark.sql.adaptive.enabled", "true")
```
處理 Join/Aggregate 資料傾斜,自動判斷切分 _1、_2...
```=
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```
設定每個分區的最佳大小,讓 Spark 自動調整分區數量
這裡設定為 64MB,意味著 Spark 會盡量讓每個分區的輸出大小接近 64MB
```=
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB")
```
當執行以下 SQL 時,Spark 會在後台自動判斷哪個 user_id 存在資料傾斜,並進行分片處理,我們不需要修改程式碼
```=
SELECT user_id, COUNT(*) AS interaction_count
FROM social_media_events
GROUP BY user_id
```
<br/>
### 解法 2:Salting the GROUP BY(群組鹽值法)
Salting 是一種手動解決資料傾斜的方法。核心思想是:透過引入一個隨機值(salt),將原本傾斜的單一鍵值,分散到多個分區上進行平行處理,然後再進行第二次聚合
假設要對 user_id 做 groupBy 或 join,但某個用戶(例如 Beyoncé)資料多得不合理,導致 Spark 卡在那一個 partition,會造成資料傾斜
- 為傾斜的 user_id 欄位加上一個隨機數(例如 0-9),形成一個新的複合鍵 (user_id_salt)
```=
# df.withColumn("salt", (rand() * 10).cast("int"))
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
df_salted.createOrReplaceTempView("salted_events")
```
- 對 salted key groupBy 並做第一次 aggregate,將工作分散到多個分區
```=
# df.groupBy("user_id", "salt").agg(...)
CREATE OR REPLACE TEMP VIEW salted_agg AS
SELECT user_id, salt, COUNT(*) AS partial_count
FROM salted_events
GROUP BY user_id, salt
```
- 對第一次聚合的結果,再進行第二次聚合,但這次只使用原始的 user_id,將所有相同 user_id 的部分結果合併
```=
# df.groupBy("user_id").agg(...)
SELECT user_id, SUM(partial_count) AS total_count
FROM salted_agg
GROUP BY user_id
```
如果是 AVG,就要先算 SUM 和 COUNT,最後再除
```=
SELECT user_id, SUM(total_sum) / SUM(total_count) AS avg_value
FROM (
SELECT user_id, salt, SUM(metric) AS total_sum, COUNT(*) AS total_count
FROM salted_events
GROUP BY user_id, salt
) tmp
GROUP BY user_id
```
<br/>
## 安裝 docker
docker-compose.yaml
- Spark(整合 Iceberg connector)
- REST API
- 模擬 S3 的物件儲存系統
- 容器啟動後自動初始化 MinIO bucket(warehouse),設定權限
```=
version: "3"
services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
- ./data:/home/iceberg/data
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
- 4040-4042:4040-4042
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:
```
執行
```=
docker-compose up -d
```

<br/>
## 實作
```=
應用程式 → PostgreSQL (OLTP),這裡沒連接S3,用 csv 模擬資料示範
↓ (CDC/ETL)
S3 + Iceberg (Data Lake)
↓
Spark 大數據分析 → 結果存回 S3/PostgreSQL/csv
↓
BI 工具/儀表板
```
Iceberg 的作用是作為 資料湖的表格式 (Table Format),讓 S3 這種物件儲存變得像資料庫一樣好用
確認 pyspark 環境配置
```=
import pyspark
print("pyspark version:", pyspark.__version__)
print("pyspark path :", inspect.getfile(pyspark))
```

設定 JAVA_HOME (下載路徑) + 設定環境變數
我自己有很多python環境,指定目前這個
```=
import os, sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-arm64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# 讓 driver / executor 都用現在這個 Python
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
print("JAVA_HOME =", os.environ["JAVA_HOME"])
```

Iceberg JAR 設定
```=
ICEBERG_JAR = "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.8.1.jar"
# 檢查 JAR 檔案是否存在
if not os.path.exists(ICEBERG_JAR):
print(f"Iceberg JAR 檔案不存在: {ICEBERG_JAR}")
print("請再次確認")
else:
print(f"找到 Iceberg JAR: {ICEBERG_JAR}")
# 設置 Spark submit 參數
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {ICEBERG_JAR} pyspark-shell"
print(f"PYSPARK_SUBMIT_ARGS = {os.environ['PYSPARK_SUBMIT_ARGS']}")
```
啟動 Spark + Iceberg
假設兩個表,一個100MB、一個260MB
- spark.sql.autoBroadcastJoinThreshold 門檻=256MB
- Spark 的判斷:Spark 會看到 100MB 的表小於 256MB 的門檻值,而 260MB 的表超過門檻
- Spark 的行動:Spark 會選擇最有效率的 Broadcast Hash Join。它會將 100MB 的表廣播到所有執行器上,並在這些執行器上與 260MB 的表進行連接
- spark.sql.autoBroadcastJoinThreshold 門檻=50MB
- Spark 的判斷:Spark 會看到兩個表的大小(260MB 和 100MB)都超過了 50MB 的門檻值
- Spark 的行動:Spark 會放棄 Broadcast Join,退而求其次,改用 Shuffle Sort Merge Join。它會對兩個表都進行資料洗牌(Shuffle)和排序(Sort),這個過程會產生大量的網路傳輸和磁碟 I/O,導致任務效能下降。之後可以手動廣播,或一開始就使用 Bucket Join(分桶連接)
```=
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, expr, col
spark = (
# 配置 Iceberg
SparkSession.builder
.appName("Jupyter-Iceberg-REST")
# --- 效能調校設定 ---
# 啟用 AQE(Adaptive Query Execution),讓 Spark 能動態優化
.config("spark.sql.adaptive.enabled", "true")
# 設定自動 Broadcast Join 的門檻,預設是 10MB
.config("spark.sql.autoBroadcastJoinThreshold", "256MB")
# 啟用 AQE 的資料傾斜處理
.config("spark.sql.adaptive.skewJoin.enabled", "true")
# --- 調整 memory ---
.config("spark.executor.memory", "4g")
.config("spark.executor.cores", "2")
# --- Iceberg 與 Catalog 設定 ---
.config("spark.sql.defaultCatalog", "spark_catalog")
# 加載 Iceberg 的 Spark 擴充功能
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# REST catalog 配置(不使用 S3FileIO)
# .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
# .config("spark.sql.catalog.rest.type", "rest")
# .config("spark.sql.catalog.rest.uri", "http://iceberg-rest:8181")
# *** S3 相關配置,讓 REST 服務處理存儲 ***
# .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
# .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000")
# .config("spark.sql.catalog.rest.s3.path-style-access", "true")
# 啟用本地 spark_catalog
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg-warehouse")
# 開啟 AQE(Adaptive Query Execution)
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
)
#
all_conf = dict(spark.sparkContext.getConf().getAll())
catalog_conf = {k: v for k, v in all_conf.items() if "catalog" in k.lower()}
print("\n=== Catalog 配置 ===")
for k, v in catalog_conf.items():
print(f"{k}: {v}")
# 測試兩個 catalog
print("\n=== 測試 Catalogs ===")
try:
catalogs = spark.sql("SHOW CATALOGS").collect()
print("可用的 catalogs:")
for catalog in catalogs:
print(f" - {catalog.catalog}")
except Exception as e:
print(f"列出 catalogs 失敗: {e}")
```
成功的話,會看到
+--------------+
| catalog |
+--------------+
| spark_catalog|
+--------------+
這裡沒連sql,載入 CSV 模擬資料
```=
events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
devices = spark.read.option("header","true").csv("/home/iceberg/data/devices.csv")
df = events.join(devices,on="device_id",how="left")
df = df.withColumnsRenamed({'browser_type': 'browser_family', 'os_type': 'os_family'})
df.show()
```

```=
sorted = df.repartition(10, col("event_date"))\
.sortWithinPartitions(col("event_date"), col("host"))\
.withColumn("event_time", col("event_time").cast("timestamp"))
sortedTwo = df.repartition(10, col("event_date"))\
.sort(col("event_date"), col("host"))\
.withColumn("event_time", col("event_time").cast("timestamp"))
sorted.show()
sortedTwo.show()
```

建立 Iceberg 表
```=sql
%%sql
SHOW CATALOGS;
```

```=sql
%%sql
-- 使用本地 spark_catalog 創建 DB
CREATE NAMESPACE IF NOT EXISTS spark_catalog.test;
```
```=sql
%%sql
SHOW NAMESPACES IN spark_catalog;
```

```=sql
%%sql
DROP TABLE IF EXISTS spark_catalog.test.events;
```
```=sql
%%sql
DROP TABLE IF EXISTS spark_catalog.test.events_sorted;
```
```=sql
%%sql
CREATE TABLE IF NOT EXISTS spark_catalog.test.events (
url STRING,
referrer STRING,
browser_family STRING,
os_family STRING,
device_family STRING,
host STRING,
event_time TIMESTAMP,
event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);
```
```=sql
%%sql
CREATE TABLE IF NOT EXISTS spark_catalog.test.events_sorted (
url STRING,
referrer STRING,
browser_family STRING,
os_family STRING,
device_family STRING,
host STRING,
event_time TIMESTAMP,
event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);
```
```=sql
%%sql
SELECT * FROM spark_catalog.test.events;
```
```=sql
%%sql
SELECT * FROM spark_catalog.test.events_sorted
```

Spark 排序 + Iceberg 表寫入比較
```=
from pyspark.sql.functions import col, to_timestamp
# 1
start_df = (df
.withColumn("event_time", to_timestamp("event_time"))
.withColumn("event_date", col("event_time").cast("date"))
.withColumnRenamed("device_type", "device_family") # ← 對齊表的欄位名
)
# 2 只挑表中需要的欄位
cols = ["url","referrer","browser_family","os_family","device_family","host","event_time","event_date"]
df_out = start_df.select(*[col(c) for c in cols])
# 3 分區後寫入(先分別排序,避免記憶體壓力)
df_out = df_out.repartition(10, col("event_date"))
df_out.writeTo("spark_catalog.test.events").overwritePartitions()
# check schema 一樣
first_sort_df = df_out.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))
first_sort_df.writeTo("spark_catalog.test.events_sorted").overwritePartitions()
```
```=sql
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted'
FROM spark_catalog.test.events_sorted.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted'
FROM spark_catalog.test.events.files
```
```=sql
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files
FROM spark_catalog.test.events.files;
```

實際觀察 Spark 執行計劃
```=sql
%%sql
EXPLAIN SELECT count(*)
FROM spark_catalog.test.events
WHERE event_date < DATE '2022-01-01';
```

| 區塊 | 說明 |
| ------------------------------------- | ------------------------------------------------------- |
| `AdaptiveSparkPlan` | 啟用了 AQE(Adaptive Query Execution),表示 Spark 會依據實際資料調整計劃 |
| `BatchScan spark_catalog.test.events` | 使用 Iceberg 的 V2 DataSource 進行批次掃描 |
| `filters=...` | `event_date = ...` 的查詢條件**已成功被下推**(Filter Pushdown) |
| `1754870400000000` | 是 `2025-08-11` 的微秒時間戳(Iceberg 內部格式) |
| `Filter(...)` | Spark 還會在執行時再做一次檢查(保險) |
| `groupedBy=[]` | 無 group by 聚合,因此不需推入 Iceberg |
| `RuntimeFilters: []` | 未使用動態分區過濾(如 join filter) |
確認有多少不同partition(event_date)
```=
%%sql
SELECT DISTINCT partition
FROM spark_catalog.test.events.files;
```

查看
```=
%%sql
SELECT *
FROM spark_catalog.test.events
WHERE event_date = DATE '2021-02-06';
```

<br/>
## 補充:暫存視圖(Temporary View)
若是今天想要 建立暫存視圖(Temporary View),可以註冊一個臨時的表,在後面引用
Temporary View
```=
devices.createOrReplaceTempView("devices")
events.createOrReplaceTempView("events")
```
三種方式過濾掉 user_id 或 device_id NULL 的資料 (Dataset API vs DataFrame API vs Spark SQL)
```=
// Dataset API
val filteredViaDataset = events.filter(event => event.user_id.isDefined && event.device_id.isDefined)
// DataFrame API
val filteredViaDataFrame = events.toDF().where($"user_id".isNotNull && $"device_id".isNotNull)
// Spark SQL
val filteredViaSparkSql = sparkSession.sql("SELECT * FROM events WHERE user_id IS NOT NULL AND device_id IS NOT NULL")
```
Dataset API
```=
val combinedViaDatasets = filteredViaDataset
.joinWith(devices, events("device_id") === devices("device_id"), "inner")
.map{ case (event: Event, device: Device) => EventWithDeviceInfo(
user_id=event.user_id.get,
device_id=device.device_id,
browser_type=device.browser_type,
os_type=device.os_type,
device_type=device.device_type,
referrer=event.referrer.getOrElse("unknow"),
host=event.host,
url=event.url,
event_time=event.event_time
) }
.map { eventWithDevice =>
// Convert browser_type to uppercase while maintaining immutability
eventWithDevice.copy(browser_type = eventWithDevice.browser_type.toUpperCase)
}
combinedViaDatasets.show(5)
```

DataFrame API(不需要定義 case class,常用於動態 schema 或不確定欄位時)
```=
val combinedViaDataFrames = filteredViaDataFrame.as("e")
.join(devices.as("d"), $"e.device_id" === $"d.device_id", "inner")
.select(
$"e.user_id",
$"d.device_id",
$"d.browser_type",
$"d.os_type",
$"d.device_type",
$"e.referrer",
$"e.host",
$"e.url",
$"e.event_time"
)
val rows= combinedViaDatasets.take(5)
rows.foreach(println)
combinedViaDataFrames.show(5)
```

Spark SQL (直觀,容易閱讀與維護)
```=
filteredViaSparkSql.createOrReplaceTempView("filtered_events")
val combinedViaSparkSQL = spark.sql(f"""
SELECT
fe.user_id,
d.device_id,
d.browser_type,
d.os_type,
d.device_type,
fe. referrer,
fe.host,
fe.url,
fe.event_time
FROM filtered_events fe
JOIN devices d ON fe.device_id = d.device_id
""")
combinedViaSparkSQL.show(5)
```

| 觀點 | Dataset API | DataFrame API | Spark SQL |
| ------------------- | ---------------------------------- | ------------------- | ------------ |
| **型別安全** | ✅ 有 (`Option[T]` 提供編譯期檢查) | ❌ 無 | ❌ 無 |
| **IntelliSense 支援** | ✅ 完整(case class 提供提示) | ❌ 弱(只靠欄位名稱) | ❌ 幾乎無 |
| **可讀性** | 中(有 `.map`、`.joinWith` 等 Scala 語法) | 中(類似 SQL,但欄位需加 `$`) | ✅ 高(最接近 SQL) |
| **維護性** | ✅ 高(compile-time check) | ❌ 易寫錯欄位名不報錯 | 中 |
| **靈活度** | ✅ 最佳(可搭配函數式編程) | ✅ 不錯 | ❌ 受限於 SQL 語法 |
| **實務適用時機** | 有明確 schema、穩定資料結構 | 欄位變動大、來源動態 | 給分析師、偏查詢導向情境 |