# Reading and Writing Cassandra Data with Spark, Using Docker
###### tags: `Data Engineering`
###### Last updated: 2025-09-10
## Task
Start two Docker containers on your local machine, one running Cassandra and one running Spark. Then use Spark to write data into Cassandra, and read it back out with Spark.
## (0) Create a folder and add a docker-compose.yml
```
# docker-compose.yml (Cassandra + Bitnami Spark)
services:
  cassandra:
    image: cassandra:latest
    container_name: cassandra
    ports:
      - "9042:9042"   # CQL
    environment:
      - MAX_HEAP_SIZE=1G
      - HEAP_NEWSIZE=256M
    volumes:
      - cassandra_data:/var/lib/cassandra
    healthcheck:
      test: ["CMD-SHELL", "cqlsh -e 'DESCRIBE KEYSPACES' 2>/dev/null || exit 1"]
      interval: 20s
      timeout: 5s
      retries: 30

  spark:
    image: bitnami/spark:3.5.1
    container_name: spark
    depends_on:
      cassandra:
        condition: service_healthy
    environment:
      - SPARK_MODE=client   # for interactive use
      # Optional: if you need Python 3.11+, the Bitnami image already bundles it; nothing extra to set
    stdin_open: true
    tty: true
    command: ["/bin/bash", "-lc", "sleep infinity"]   # keep the container running
    volumes:
      - ./work:/work   # script folder (a relative path also works on Windows)

volumes:
  cassandra_data:
```
## (1) Start cassandra and spark
cd into the folder containing docker-compose.yml
➱ docker compose up -d
➱ docker compose ps
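Optionally, since port 9042 is published to the host, you can sanity-check from your own machine that Cassandra is accepting CQL connections. A minimal sketch in Python, assuming you have installed the DataStax driver (`pip install cassandra-driver`):
```
# Host-side connectivity check (assumes: pip install cassandra-driver)
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"], port=9042)  # 9042 is the port mapped in docker-compose.yml
session = cluster.connect()
row = session.execute("SELECT release_version FROM system.local").one()
print("Cassandra release:", row.release_version)
cluster.shutdown()
```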
## (2) Create the keyspace/table in Cassandra
➱ docker compose exec cassandra bash
➱ cqlsh
Run the following at the cqlsh> prompt:
-- Create the keyspace (SimpleStrategy is fine for local testing)
```
CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```
```
USE demo;
```
-- Create the table (note: Cassandra has no AUTO_INCREMENT; you must choose the primary key yourself)
```
CREATE TABLE exchange_rate_settle (
    currency text,
    currency_settle text,
    rate decimal,
    ts timestamp,
    PRIMARY KEY ((currency, currency_settle), ts)  -- composite partition key + clustering key
);
```
-- Insert a few rows
```
INSERT INTO exchange_rate_settle (currency, currency_settle, rate, ts)
    VALUES ('USD', 'TWD', 32.5, toTimestamp(now()));
INSERT INTO exchange_rate_settle (currency, currency_settle, rate, ts)
    VALUES ('USD', 'TWD', 32.8, toTimestamp(now()));
SELECT * FROM exchange_rate_settle WHERE currency='USD' AND currency_settle='TWD';
```
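If you prefer to run the same check from Python instead of cqlsh, the driver `session` from the earlier host-side sketch works too. Note that the WHERE clause must supply the full composite partition key:
```
# Reads should supply both partition-key columns (currency, currency_settle);
# rows within a partition come back ordered by the clustering column ts.
rows = session.execute(
    "SELECT rate, ts FROM demo.exchange_rate_settle "
    "WHERE currency=%s AND currency_settle=%s",
    ("USD", "TWD"),
)
for r in rows:
    print(r.ts, r.rate)
```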
## (3) Write to / read from Cassandra with Spark
### Enter the Spark container
➱ docker compose exec spark bash
### (a) Find the py4j file name (the version number may differ slightly)
```
ls /opt/bitnami/spark/python/lib/py4j*-src.zip
```
#### Assuming you see py4j-0.10.9.7-src.zip, set PYTHONPATH and PYTHONSTARTUP
```
export PYTHONPATH=/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip:/opt/bitnami/spark/python:$PYTHONPATH
export PYTHONSTARTUP=/opt/bitnami/spark/python/pyspark/shell.py
```
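With PYTHONPATH set, pyspark should now be importable by the container's system Python. A quick check (run `python3` inside the container):
```
# If PYTHONPATH is correct, this imports without error
import pyspark
print(pyspark.__version__)  # expect 3.5.1 on this image
```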
### (b) Launch the PySpark shell by calling spark-submit directly (instead of the pyspark wrapper)
Passing `pyspark-shell-main` to spark-submit, together with the PYTHONSTARTUP set above, reproduces what the `pyspark` wrapper script does internally.
```
/opt/bitnami/spark/bin/spark-submit \
    --master local[*] \
    --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 \
    --conf spark.cassandra.connection.host=cassandra \
    pyspark-shell-main
```
### Quick verification after startup
```
spark.version
df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(keyspace="demo", table="exchange_rate_settle").load()
df.limit(5).show()
```
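You can also confirm how the connector mapped the Cassandra schema, and that partition-key filters are handled efficiently. A sketch using the `df` loaded above (the connector can push equality filters on the full partition key down to Cassandra):
```
# Cassandra types are mapped to Spark types (decimal -> DecimalType, timestamp -> TimestampType)
df.printSchema()

# Filtering on both partition-key columns lets the connector push the
# predicate down to Cassandra instead of scanning the whole table.
usd_twd = df.filter((df.currency == "USD") & (df.currency_settle == "TWD"))
usd_twd.explain()  # look for the pushed filters in the Cassandra scan
usd_twd.orderBy("ts", ascending=False).show(truncate=False)
```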
## Writing to and reading from Cassandra with Spark
```
from decimal import Decimal
from datetime import datetime, timedelta
from pyspark.sql import Row

# Write 3 rows
base = datetime.utcnow()
rows = [
    Row(currency="USD", currency_settle="TWD", rate=Decimal("30.28"), ts=base),
    Row(currency="USD", currency_settle="TWD", rate=Decimal("30.295"), ts=base + timedelta(milliseconds=1)),
    Row(currency="USD", currency_settle="TWD", rate=Decimal("30.279"), ts=base + timedelta(milliseconds=2)),
]
spark.createDataFrame(rows).write.format("org.apache.spark.sql.cassandra") \
    .mode("append").options(keyspace="demo", table="exchange_rate_settle").save()

# Read the whole table back (load the data into df)
df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(keyspace="demo", table="exchange_rate_settle") \
    .load()

# Sort by key and time before inspecting the data
df.orderBy("currency", "currency_settle", "ts").show(df.count(), truncate=False)
```
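As a small follow-up on the loaded data, here is one way (plain PySpark, nothing Cassandra-specific) to keep only the most recent rate per currency pair with a window function:
```
# Keep the newest row per (currency, currency_settle) pair
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("currency", "currency_settle").orderBy(F.col("ts").desc())
latest = (df.withColumn("rn", F.row_number().over(w))
            .filter(F.col("rn") == 1)
            .drop("rn"))
latest.show(truncate=False)
```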
## (4) Cleanup
➱ exit()  # leave pyspark
➱ exit    # leave the Spark container's bash
➱ docker compose down  # add -v if you also want to remove the cassandra_data volume