# Spark Quick Start #
###### tags: `bigdata` `spark`
## Installation ##
Using Ubuntu as the example.
**Step 1:** Download and install
```bash
# Install Scala
sudo apt-get install scala
# Install Spark
wget https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
# Extract into /usr/local/spark so it matches the SPARK_HOME set below
sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.3-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1
```
**Step 2:** Edit `~/.bashrc`
```bash
export JAVA_HOME="/path/to/java/home"
export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
```
Note: on macOS, edit `.bash_profile` instead.
**Step 3:** Environment variables
In `/usr/local/spark/conf`, copy `spark-env.sh.template` to `spark-env.sh` and add:
```bash
# force UTF-8 I/O encoding
export PYTHONIOENCODING=utf8
# pin the Python version used by PySpark
export PYSPARK_PYTHON=python3.6
export PYSPARK_DRIVER_PYTHON=python3.6
# variables needed by spark-submit on YARN
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
```
**Step 4:** Run
```bash
# load the settings
source ~/.bashrc
# interactive shell (Scala)
spark-shell
# or, with JAVA_HOME given explicitly
env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 spark-shell
```
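The commands above start the Scala shell; for this note's PySpark examples, the Python shell works the same way. A quick sanity check, assuming the PATH settings from Step 2:
```bash
# Python interactive shell (used for the PySpark examples below)
pyspark
# confirm the installed version
spark-submit --version
```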
## Sample Data: users.csv ##
```
0001,john
0002,peter
0003,mary
0004,albee
0005,tom
```
## Getting Started ##
```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
# create the SparkSession (and from it the SparkContext)
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("my app") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()  # .master('local[*]') is only needed for local runs
sc = spark.sparkContext
# read as an RDD of lines
file_rdd = sc.textFile("/path/to/users.csv")
# RDD -> DataFrame
csv_rdd = file_rdd.map(lambda line: line.split(","))
rows = csv_rdd.map(lambda cols: Row(id=cols[0], name=cols[1]))
df = spark.createDataFrame(rows)
# inspect the data
df.count()
df.show()
```
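Spark 2.x can also load the CSV straight into a DataFrame without going through an RDD first; a minimal sketch, assuming the same header-less users.csv path as above:
```python
# read the CSV directly; users.csv has no header row, so name the columns explicitly
df = spark.read.csv("/path/to/users.csv").toDF("id", "name")
df.show()
```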
### DataFrame ###
```python
from pyspark.sql.types import StructType
# build an empty DataFrame with an empty schema
emp_df = spark.createDataFrame([], StructType([]))
# == build from a 2D array
df = spark.createDataFrame([
(0, "John"),
(1, "Mary")
], ["id", "name"])
df = spark.createDataFrame([
[0, "John"],
[1, "Mary"]
], ["id", "name"]) # 也可以
df = spark.createDataFrame([
(0, "John"),
(1, "Mary")
])  # the schema is optional; column names default to _1, _2, ...
df.show()
+---+----+
| id|name|
+---+----+
| 0|John|
| 1|Mary|
+---+----+
# == add a column
import pyspark.sql.functions as F
from pyspark.sql.types import *
df = df.withColumn("email", F.udf(lambda name: name+"@mail.com")(F.col('name')))
# -- or, with an explicit return type
df = df.withColumn("email", F.udf(lambda name: name+"@mail.com", StringType())(F.col('name')))
df.show()
+---+----+-------------+
| id|name| email|
+---+----+-------------+
| 0|John|John@mail.com|
| 1|Mary|Mary@mail.com|
+---+----+-------------+
# == Join
gender_df = spark.createDataFrame([
(0, "M"),
(1, "F")
], ["id", "gender"])
join_df = df.join(gender_df, df.id == gender_df.id, how='left')
join_df.show()
+---+----+---+------+
| id|name| id|gender|
+---+----+---+------+
| 0|John| 0| M|
| 1|Mary| 1| F|
+---+----+---+------+
# == pick columns from each side
A = df.alias('A')
G = gender_df.alias('G')
join_df = A.join(G, A.id == G.id, how='left').selectExpr("A.id AS id", "A.name AS name", "G.gender AS gender")
join_df.show()
+---+----+------+
| id|name|gender|
+---+----+------+
| 0|John| M|
| 1|Mary| F|
+---+----+------+
# or
join_df = df.join(gender_df, df.id == gender_df.id, how='left')
join_df = join_df.select(df.id, df.name, gender_df.gender)
# == update values
join_df.withColumn('gender', F.when(F.col('gender')=='M', "1").otherwise("0")).show()
```
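When the column types matter, a schema can be supplied up front instead of letting Spark infer it; a small sketch reusing the id/name shape from above (the `typed_df` name, types, and nullable flags are chosen for illustration):
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
])
typed_df = spark.createDataFrame([(0, "John"), (1, "Mary")], schema)
typed_df.printSchema()
```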
### One Column DataFrame ###
```python
spark.createDataFrame([("1",), ("2",), ("3",)], ['id']).show()  # the trailing comma is required
+---+
| id|
+---+
| 1|
| 2|
| 3|
+---+
```
### RDD ###
```python
# == inner join
>>> rdd1 = sc.parallelize([('cat', 2), ('cat', 5), ('book', 4), ('cat', 12)])
>>> rdd2 = sc.parallelize([('cat', 2), ('cup', 5), ('mouse', 4),('cat', 12)])
>>> rdd1.join(rdd2).collect()
[('cat', (2, 2)), ('cat', (2, 12)), ('cat', (5, 2)), ('cat', (5, 12)), ('cat', (12, 2)), ('cat', (12, 12))]
# equivalent to:
+----+---+----+---+
|name|cnt|name|cnt|
+----+---+----+---+
| cat| 2| cat| 2|
| cat| 2| cat| 12|
| cat| 5| cat| 2|
| cat| 5| cat| 12|
| cat| 12| cat| 2|
| cat| 12| cat| 12|
+----+---+----+---+
# == intersection
>>> rdd1=sc.parallelize(['1001','1002','1003'])
>>> rdd2=sc.parallelize(['1003','1004'])
>>> rdd1.intersection(rdd2).collect()
['1003']
# == difference
>>> rdd1.subtract(rdd2).collect()
['1001', '1002']
# == union
>>> rdd1.union(rdd2).collect()
['1001', '1002', '1003', '1003', '1004']
# == Cartesian product
>>> rdd1.cartesian(rdd2).collect()
[('1001', '1003'), ('1001', '1004'), ('1002', '1003'), ('1002', '1004'), ('1003', '1003'), ('1003', '1004')]
```
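The table shown for the inner join can be reproduced by flattening the joined pair RDD into a DataFrame; a sketch under that assumption, with the pair RDDs rebuilt here (the `pairs1`/`pairs2`/`flat` names and the `name1`/`cnt1`/`name2`/`cnt2` columns are made up for illustration):
```python
# the pair RDDs from the inner-join example above
pairs1 = sc.parallelize([('cat', 2), ('cat', 5), ('book', 4), ('cat', 12)])
pairs2 = sc.parallelize([('cat', 2), ('cup', 5), ('mouse', 4), ('cat', 12)])
# flatten (key, (left, right)) tuples and build a DataFrame matching the table above
flat = pairs1.join(pairs2).map(lambda kv: (kv[0], kv[1][0], kv[0], kv[1][1]))
spark.createDataFrame(flat, ["name1", "cnt1", "name2", "cnt2"]).show()
```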
### FileSystem ###
```python
# reach the Hadoop FileSystem API through the py4j JVM gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(Configuration())
# or reuse the context's Hadoop configuration:
# fs = FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(Path('/path/to/filename'))
statuses = fs.listStatus(Path('/path/to/dir'))
statuses = fs.globStatus(Path('/path/to/dir/*.tsv'))
for status in statuses:
    print(status.getPath())
```
See the [Hadoop FileSystem API](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html) for the full set of operations.
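Beyond `exists`/`listStatus`/`globStatus`, the same `fs` handle exposes the usual file operations; a short sketch (all paths are placeholders):
```python
# create a directory, rename a path, and delete one recursively
fs.mkdirs(Path('/path/to/newdir'))
fs.rename(Path('/path/to/src'), Path('/path/to/dst'))
fs.delete(Path('/path/to/olddir'), True)  # True = recursive
```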
### Json ###
```python
# read JSON files from a path
df = spark.read.json('/path/to/json')
# or read an in-memory JSON string through an RDD
jsonStr = '{"id": "0001", "name": "john"}'
df = spark.read.json(sc.parallelize([jsonStr]))
df.show()
df.printSchema()
```
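Writing a DataFrame back out as JSON goes through the same writer interface; a minimal sketch (the output path is a placeholder):
```python
# one JSON object per line, overwriting any existing output
df.write.json('/path/to/json_out', mode='overwrite')
```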
### Parquet ###
```python
from pyspark.sql import SQLContext
# Save as parquet
df.write.parquet('/path/to/parquet_root', mode='overwrite')
# Read parquet (SQLContext is the pre-2.0 entry point; spark.read.parquet works the same way)
sqlctx = SQLContext(sc)
df = sqlctx.read.parquet('/path/to/parquet_root')
```
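Parquet output can also be partitioned by a column, which keeps later filtered reads cheap; a sketch assuming the `id` column from the examples above:
```python
# writes one sub-directory per distinct id value
df.write.partitionBy('id').parquet('/path/to/parquet_root', mode='overwrite')
```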
### Spark SQL ###
```python
# register the DataFrame as a temp view (registerTempTable is the deprecated pre-2.0 name)
df.createOrReplaceTempView('users')
spark.sql("SELECT * FROM users WHERE name = 'peter'").show()
```
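Any SQL statement can run against a registered view; a couple more sketches over the same `users` view:
```python
# aggregate in SQL
spark.sql("SELECT COUNT(*) AS cnt FROM users").show()
# order and limit in SQL
spark.sql("SELECT * FROM users ORDER BY name DESC LIMIT 3").show()
```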
### Query ###
```python
# take the first n rows
df.select("id").take(n)
df.where(df.id < "0003").take(n)
# query
df.select("id")            # select a column by name
df.select(df.id)           # select a column by reference
df.where(df.id < "0003")   # filter by condition
df.where(df.id == "0003")
df.filter("id == '0003'")
# collect to a list of Row objects
ary = df.where(df.id < "0003").collect()
# convert one Row to a dict
ary[0].asDict()
```
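For small results it is often handier to pull rows to the driver as a pandas DataFrame rather than a list of Row objects; a sketch, assuming pandas is installed on the driver (the `pdf` name is arbitrary):
```python
# collect() returns Row objects; toPandas() gives a pandas.DataFrame
pdf = df.where(df.id < "0003").toPandas()
print(pdf)
```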
### Sort ###
```python
# sort through the underlying RDD (returns an RDD of Row objects)
df.rdd.sortBy(lambda row: row.name, ascending=False)
```
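The same ordering can be expressed on the DataFrame itself without dropping to the RDD; a sketch:
```python
import pyspark.sql.functions as F

# descending sort expressed as a DataFrame operation
df.orderBy(F.col('name').desc()).show()
# equivalent shorthand
df.sort('name', ascending=False).show()
```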
### GroupBy ###
```python
import pyspark.sql.functions as F
>>> tb1 = spark.createDataFrame([("dx001", 3), ("dx002", 6), ("dx003", 2), ("dx004", 1)], ["id", "pv"])
>>> tb2 = spark.createDataFrame([("dx001", 30), ("dx002", 60), ("dx003", 20), ("dx004", 10)], ["id", "pv"])
>>> tb12 = tb1.union(tb2)
>>> tb12.show()
+-----+---+
| id| pv|
+-----+---+
|dx001| 3|
|dx002| 6|
|dx003| 2|
|dx004| 1|
|dx001| 30|
|dx002| 60|
|dx003| 20|
|dx004| 10|
+-----+---+
### count rows per group
>>> tb12.groupBy("id").count().show()
+-----+-----+
| id|count|
+-----+-----+
|dx002| 2|
|dx001| 2|
|dx004| 2|
|dx003| 2|
+-----+-----+
### sum a column per group
>>> tb12.groupBy("id").sum("pv").show()  # or
>>> tb12.groupBy("id").agg({"pv": "sum"}).show()  # or
>>> tb12.groupBy("id").agg(F.sum("pv").alias("sum(pv)")).show()
+-----+-------+
| id|sum(pv)|
+-----+-------+
|dx002| 66|
|dx001| 33|
|dx004| 11|
|dx003| 22|
+-----+-------+
### sum a column over the whole table
>>> tb12.agg({'pv':'sum'}).show()
+-------+
|sum(pv)|
+-------+
| 132|
+-------+
#
#
# Example 2: items purchased per member
#
#
>>> order_df = spark.createDataFrame([
("oid1", 'u1', 'item1'),
("oid2", 'u2', 'item2'),
("oid2", 'u3', 'item3'),
("oid3", 'u1', 'item4'),
("oid4", 'u2', 'item5'),
("oid5", 'u2', 'item5') # 刻意重複
], ["oid", "uid", "iid"])
### items purchased by each member, as a list
>>> order_df.groupBy('uid').agg(F.concat_ws(",", F.collect_list("iid")).alias('items')).show()
+---+-----------------+
|uid| items|
+---+-----------------+
| u3| item3|
| u1| item1,item4|
| u2|item2,item5,item5|
+---+-----------------+
>>> order_df.groupBy('uid').agg(F.concat_ws(",", F.collect_set("iid")).alias('items')).show()
+---+-----------+
|uid| items|
+---+-----------+
| u3| item3|
| u1|item4,item1|
| u2|item5,item2|
+---+-----------+
### explode
>>> ui_df = order_df.groupBy('uid').agg(F.collect_set("iid").alias('items'))
>>> ui_df.show()
+---+--------------+
|uid| items|
+---+--------------+
| u3| [item3]|
| u1|[item4, item1]|
| u2|[item5, item2]|
+---+--------------+
>>> ui_df.select(ui_df.uid, F.explode(ui_df.items).alias('item')).show()
+---+-----+
|uid| item|
+---+-----+
| u3|item3|
| u1|item4|
| u1|item1|
| u2|item5|
| u2|item2|
+---+-----+
```
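Several aggregates can also be computed in one pass with `agg`; a sketch over the `tb12` table from above (the `pv_sum`/`pv_avg`/`rows` aliases are made up for illustration):
```python
import pyspark.sql.functions as F

# several aggregates in a single groupBy pass
tb12.groupBy("id").agg(
    F.sum("pv").alias("pv_sum"),
    F.avg("pv").alias("pv_avg"),
    F.count("pv").alias("rows"),
).show()
```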
### Merge ###
```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3')], ['id', 'name'])
df2 = spark.createDataFrame([('1', 'n1a'), ('2', 'n2a'), ('4', 'n4')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
| 1| n1|
| 2| n2|
| 3| n3|
+---+----+
>>> df2.show()
+---+----+
| id|name|
+---+----+
| 1| n1a|
| 2| n2a|
| 4| n4|
+---+----+
# union first to get the distinct set of ids from both sides
id_df = df1.union(df2).select('id').distinct()
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df = id_df.join(df1, ['id'], how='left').join(df2, ['id'], how='left')
df = df.selectExpr('id', 'df1.name AS n1', 'df2.name AS n2')
df = df.withColumn('name', F.udf(lambda n1, n2: n2 if n2 is not None else n1)(F.col('n1'), F.col('n2')))
df = df.selectExpr('id', 'name')
df.show()
+---+----+
| id|name|
+---+----+
| 3| n3|
| 1| n1a|
| 4| n4|
| 2| n2a|
+---+----+
```
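The UDF above just picks the first non-null of the two name columns; the built-in `F.coalesce` does the same without a Python UDF. A sketch reusing `id_df`, `df1`, and `df2` from the block above (the `merged` name is arbitrary):
```python
import pyspark.sql.functions as F

# coalesce picks the first non-null value per row, preferring df2's name
merged = id_df.join(df1, ['id'], how='left').join(df2, ['id'], how='left')
merged.select('id', F.coalesce(df2['name'], df1['name']).alias('name')).show()
```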
### Distinct on a Column ###
```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3')], ['id', 'name'])
df3 = spark.createDataFrame([('1', 'n1a'), ('2', 'n2a'), ('1', 'n1b')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
| 1| n1|
| 2| n2|
| 3| n3|
+---+----+
>>> df3.show()
+---+----+
| id|name|
+---+----+
| 1| n1a|
| 2| n2a|
| 1| n1b|
+---+----+
>>> df3prime = df3.groupBy('id').agg(F.concat_ws(" ", F.collect_set("name")).alias('name'))
>>> df3prime.show()
+---+-------+
| id| name|
+---+-------+
| 1|n1a n1b|
| 2| n2a|
+---+-------+
>>> df1.join(df3, ['id'], how='left').show()
+---+----+----+
| id|name|name|
+---+----+----+
| 3| n3|null|
| 1| n1| n1a|
| 1| n1| n1b| # duplicate ids appear
| 2| n2| n2a|
+---+----+----+
>>> df1.join(df3prime, ['id'], how='left').show()
+---+----+-------+
| id|name| name|
+---+----+-------+
| 3| n3| null|
| 1| n1|n1a n1b|
| 2| n2| n2a|
+---+----+-------+
```
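If a single row per id is enough, and concatenating the names is not required, `dropDuplicates` keeps one (arbitrary) row per key; a sketch on `df3`:
```python
# keep a single (arbitrary) row for each distinct id
df3.dropDuplicates(['id']).show()
```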
### Join ###
```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3'), ('4', 'n4'), ('5', 'n5')], ['id', 'name'])
df2 = spark.createDataFrame([('3', 'n3a'), ('4', 'n4a')], ['id', 'name'])
df3 = spark.createDataFrame([('6', 'n6'), ('4', 'n4a')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
| 1| n1|
| 2| n2|
| 3| n3|
| 4| n4|
| 5| n5|
+---+----+
>>> df2.show()
+---+----+
| id|name|
+---+----+
| 3| n3a|
| 4| n4a|
+---+----+
>>> df1.join(df2, ['id'], how='inner').show()
+---+----+----+
| id|name|name|
+---+----+----+
| 3| n3| n3a|
| 4| n4| n4a|
+---+----+----+
>>> df1.join(df2, ['id'], how='left_anti').show()
+---+----+
| id|name|
+---+----+
| 5| n5|
| 1| n1|
| 2| n2|
+---+----+
>>> df1.join(df2, ['id'], how='left_semi').show()
+---+----+
| id|name|
+---+----+
| 3| n3|
| 4| n4|
+---+----+
>>> df1.join(df3, ['id'], how='left').show()
+---+----+----+
| id|name|name|
+---+----+----+
| 3| n3|null|
| 5| n5|null|
| 1| n1|null|
| 4| n4| n4a|
| 2| n2|null|
+---+----+----+
>>> df1.join(df3, ['id'], how='outer').show()
+---+----+----+
| id|name|name|
+---+----+----+
| 3| n3|null|
| 5| n5|null|
| 6|null| n6|
| 1| n1|null|
| 4| n4| n4a|
| 2| n2|null|
+---+----+----+
```
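When both sides carry a `name` column, as in the outputs above, renaming one side before the join avoids the ambiguous duplicate column; a sketch (the `name3` column name is made up for illustration):
```python
# rename df3's column so the joined result has unambiguous column names
df1.join(df3.withColumnRenamed('name', 'name3'), ['id'], how='left').show()
```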
## Running PySpark in Jupyter ##
Edit `.bashrc`:
```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
```
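With those two variables set, launching `pyspark` should open a notebook with `spark` and `sc` already defined; a quick check, assuming the settings above:
```bash
source ~/.bashrc
# pyspark now starts a Jupyter notebook server instead of the plain Python shell
pyspark
```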