# 用 Docker 實作 Spark 讀寫 Cassandra 資料 ###### tags: `Data Engineering` ###### 更新日期: 2025-09-10 ## 任務說明 在本機(自己的電腦上) 啟兩個 docker, 一個是 cassandra,一個是 spark, 然後用 spark 把資料存入 cassandra 之後, 再用 spark 再撈出來 ## (0) 開啟一個資料夾並新增 docker-compose.yml ``` # docker-compose.yml (Cassandra + Bitnami Spark) services: cassandra: image: cassandra:latest container_name: cassandra ports: - "9042:9042" # CQL environment: - MAX_HEAP_SIZE=1G - HEAP_NEWSIZE=256M volumes: - cassandra_data:/var/lib/cassandra healthcheck: test: ["CMD-SHELL", "cqlsh -e 'DESCRIBE KEYSPACES' 2>/dev/null || exit 1"] interval: 20s timeout: 5s retries: 30 spark: image: bitnami/spark:3.5.1 container_name: spark depends_on: cassandra: condition: service_healthy environment: - SPARK_MODE=client # 互動用 # 可選:如果你要用 Python 3.11 以上,Bitnami 已內建,不需另設 stdin_open: true tty: true command: ["/bin/bash","-lc","sleep infinity"] # 讓容器常駐 volumes: - ./work:/work # 腳本路徑 (Windows 可用相對路徑) volumes: cassandra_data: ``` ## (1) 啟動 cassandra 和 spark cd 到 docker-compose.yml 的資料夾 ➱ docker compose up -d ➱ docker compose ps ## (2) 進 Cassandra 建 keyspace/表 ➱ docker compose exec cassandra bash ➱ cqlsh 在 cqlsh> 中執行: -- 建 keyspace(SimpleStrategy 先測就好) ``` CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; ``` ``` USE demo; ``` -- 建表(注意 Cassandra 沒有 AUTO_INCREMENT,要自己決定主鍵) ``` CREATE TABLE exchange_rate_settle ( currency text, currency_settle text, rate decimal, ts timestamp, PRIMARY KEY ((currency, currency_settle), ts) -- 複合分區鍵 + 排序鍵 ); ``` -- 插入幾筆資料 ``` INSERT INTO exchange_rate_settle (currency, currency_settle, rate, ts) VALUES ('USD', 'TWD', 32.5, toTimestamp(now())); INSERT INTO exchange_rate_settle (currency, currency_settle, rate, ts) VALUES ('USD', 'TWD', 32.8, toTimestamp(now())); SELECT * FROM exchange_rate_settle WHERE currency='USD' AND currency_settle='TWD'; ``` ## (3) 用 Spark 寫入/讀出 Cassandra ### 進到 Spark 容器 ➱ docker compose exec spark bash ------------------------------------ ### (a) 找到 py4j 檔名(版本號可能稍有不同) ``` ls /opt/bitnami/spark/python/lib/py4j*-src.zip ``` #### 假設看到 py4j-0.10.9.7-src.zip,就設置 PYTHONPATH 與 PYTHONSTARTUP ``` export PYTHONPATH=/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip:/opt/bitnami/spark/python/:/opt/bitnami/spark/python/ export PYTHONSTARTUP=/opt/bitnami/spark/python/pyspark/shell.py ``` ### (b) 直接呼叫 spark-submit 啟動 PySpark 殼(而非呼叫 pyspark wrapper) ``` /opt/bitnami/spark/bin/spark-submit \ --master local[*] \ --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 \ --conf spark.cassandra.connection.host=cassandra \ pyspark-shell-main ``` ### 啟動後快速驗證 ``` spark.version df = spark.read.format("org.apache.spark.sql.cassandra") \ .options(keyspace="demo", table="exchange_rate_settle").load() df.limit(5).show() ``` ## Spark 寫入與讀取 Cassandra ``` from pyspark.sql import functions as F from pyspark.sql import Row from datetime import datetime # 寫入 3 筆資料 from decimal import Decimal from datetime import datetime, timedelta from pyspark.sql import Row base = datetime.utcnow() rows = [ Row(currency="USD", currency_settle="TWD", rate=Decimal("30.28"), ts=base), Row(currency="USD", currency_settle="TWD", rate=Decimal("30.295"), ts=base + timedelta(milliseconds=1)), Row(currency="USD", currency_settle="TWD", rate=Decimal("30.279"), ts=base + timedelta(milliseconds=2)), ] spark.createDataFrame(rows).write.format("org.apache.spark.sql.cassandra") \ .mode("append").options(keyspace="demo", table="exchange_rate_settle").save() # 讀整張表 (把資料 load 進 df) df = spark.read.format("org.apache.spark.sql.cassandra") \ .options(keyspace="demo", table="exchange_rate_settle") \ .load() # 依照時間排序後再看資料 df.orderBy("currency","currency_settle","ts").show(df.count(), truncate=False) ``` ## (4) 收尾 ➱ exit() # 退出 pyspark ➱ exit # 退出 Spark 容器 bash ➱ docker compose down