# Spark秒上手 #

###### tags: `bigdata` `spark`

## 安裝 ##

以Ubuntu為例

**Step1:** 下載安裝

```bash
# 安裝scala
sudo apt-get install scala

# 安裝spark
wget https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.3-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1
```

**Step2:** 編輯 ~/.bashrc

```bash
export JAVA_HOME="/path/to/java/home"
export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
```

註:Mac為`.bash_profile`

**Step3:** 環境變數

在 `/usr/local/spark/conf` 由 `spark-env.sh.template` 複製出 `spark-env.sh`:

```bash
# 指定encoding
export PYTHONIOENCODING=utf8

# 指定python版本
export PYSPARK_PYTHON=python3.6
export PYSPARK_DRIVER_PYTHON=python3.6

# 指定spark-submit需要的變數
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
```

**Step4:** 執行

```bash
# 引入設定
source ~/.bashrc

# 互動式介面
spark-shell
# 或
env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 spark-shell
```

## 文檔範例: users.csv

```
0001,john
0002,peter
0003,mary
0004,albee
0005,tom
```

## 入門 ##

```python
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row

# 建立spark context
# 註:.master('local[*]') 本機執行才需要!
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("my app") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

# 讀為RDD檔
file_rdd = sc.textFile("/path/to/users.csv")

# RDD轉DataFrame
csv_rdd = file_rdd.map(lambda line: line.split(","))
rows = csv_rdd.map(lambda cols: Row(id=cols[0], name=cols[1]))
df = spark.createDataFrame(rows)

# 資料
df.count()
df.show()
```

### DataFrame ###

```python
from pyspark.sql.types import StructType

# 建空的dataframe
emp_df = spark.createDataFrame([], StructType([]))

# == 以2D-Array建立
df = spark.createDataFrame([ (0, "John"), (1, "Mary") ], ["id", "name"])
df = spark.createDataFrame([ [0, "John"], [1, "Mary"] ], ["id", "name"]) # 也可以
df = spark.createDataFrame([ (0, "John"), (1, "Mary") ]) # 不定義schema也可以,則使用預設值_1, _2, _3
df.show()
+---+----+
| id|name|
+---+----+
|  0|John|
|  1|Mary|
+---+----+

# == 加欄位
import pyspark.sql.functions as F
from pyspark.sql.types import *
df = df.withColumn("email", F.udf(lambda name: name+"@mail.com")(F.col('name')))
# --或
df = df.withColumn("email", F.udf(lambda name: name+"@mail.com", StringType())(F.col('name')))
df.show()
+---+----+-------------+
| id|name|        email|
+---+----+-------------+
|  0|John|John@mail.com|
|  1|Mary|Mary@mail.com|
+---+----+-------------+

# == Join
gender_df = spark.createDataFrame([ (0, "M"), (1, "F") ], ["id", "gender"])
join_df = df.join(gender_df, df.id == gender_df.id, how='left')
+---+----+---+------+
| id|name| id|gender|
+---+----+---+------+
|  0|John|  0|     M|
|  1|Mary|  1|     F|
+---+----+---+------+

# == 選欄位
A = df.alias('A')
G = gender_df.alias('G')
join_df = A.join(G, A.id == G.id, how='left').selectExpr("A.id AS id", "A.name AS name", "G.gender AS gender")
+---+----+------+
| id|name|gender|
+---+----+------+
|  0|John|     M|
|  1|Mary|     F|
+---+----+------+
# 或
join_df = df.join(gender_df, df.id == gender_df.id, how='left')
join_df = join_df.select(df.id, df.name, gender_df.gender)

# == 改值
join_df.withColumn('gender', F.when(F.col('gender')=='M', "1").otherwise("0")).show()
```

### One Column DataFrame ###

```python
spark.createDataFrame([("1",), ("2",), ("3",)], ['id']) # 逗號一定要
+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+
```

### RDD ###

```python
# == inner join
>>> rdd1 = sc.parallelize([('cat', 2), ('cat', 5), ('book', 4), ('cat', 12)])
>>> rdd2 = sc.parallelize([('cat', 2), ('cup', 5), ('mouse', 4),('cat', 12)])
>>> rdd1.join(rdd2).collect()
[('cat', (2, 2)), ('cat', (2, 12)), ('cat', (5, 2)), ('cat', (5, 12)), ('cat', (12, 2)), ('cat', (12, 12))]
# 相當於
+----+---+----+---+
|name|cnt|name|cnt|
+----+---+----+---+
| cat|  2| cat|  2|
| cat|  2| cat| 12|
| cat|  5| cat|  2|
| cat|  5| cat| 12|
| cat| 12| cat|  2|
| cat| 12| cat| 12|
+----+---+----+---+

# == 交集
>>> rdd1=sc.parallelize(['1001','1002','1003'])
>>> rdd2=sc.parallelize(['1003','1004'])
>>> rdd1.intersection(rdd2).collect()
['1003']

# == 差集
>>> rdd1.subtract(rdd2).collect()
['1001', '1002']

# == 聯集
>>> rdd1.union(rdd2).collect()
['1001', '1002', '1003', '1003', '1004']

# == 組合
>>> rdd1.cartesian(rdd2).collect()
[('1001', '1003'), ('1001', '1004'), ('1002', '1003'), ('1002', '1004'), ('1003', '1003'), ('1003', '1004')]
```

### FileSystem ###

```python
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(Configuration())
fs.exists(Path('/path/to/filename'))
statuses = fs.listStatus(Path('/path/to/dir'))
statuses = fs.globStatus(Path('/path/to/dir/*.tsv'))
for status in statuses:
    print(status.getPath())
```

更多請參考[Hadoop FileSystem API](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)

### Json ###

```python
from pyspark.sql import SQLContext
df = spark.read.json('/path/to/json')
df = spark.read.json(sc.parallelize([jsonStr])) # 由JSON字串建立

# Read
sqlctx = SQLContext(sc)
df = sqlctx.read.json('/path/to/json')
df.show()
df.printSchema()
```

### Parquet ###

```python
from pyspark.sql import SQLContext

# Save as parquet
df.write.parquet('/path/to/parquet_root',
    mode='overwrite')

# Read parquet
sqlctx = SQLContext(sc)
df = sqlctx.read.parquet('/path/to/parquet_root')
```

### Spark SQL ###

```python
# 註冊Schema RDD
df.registerTempTable('users')
sqlctx.sql("SELECT * FROM users where name = 'peter'").show()
```

### Query ###

```python
# 取前n筆
df.select("id").take(n)
df.where(df.id < "0003").take(n)

# query
df.select("id") # 選某欄
df.select(df.id) # 選某欄
df.where(df.id < "0003") # 條件查詢
df.where(df.id == "0003")
df.filter("id == '0003'")

# 轉陣列
ary = df.where(df.id < "0003").collect()

# 某一筆轉Dict
ary[0].asDict()
```

### Sort ###

```python
df.rdd.sortBy(lambda row: row.name, ascending=False)
```

### GroupBy ###

```python
import pyspark.sql.functions as F

>>> tb1 = spark.createDataFrame([("dx001", 3), ("dx002", 6), ("dx003", 2), ("dx004", 1)], ["id", "pv"])
>>> tb2 = spark.createDataFrame([("dx001", 30), ("dx002", 60), ("dx003", 20), ("dx004", 10)], ["id", "pv"])
>>> tb12 = tb1.union(tb2)
>>> tb12.show()
+-----+---+
|   id| pv|
+-----+---+
|dx001|  3|
|dx002|  6|
|dx003|  2|
|dx004|  1|
|dx001| 30|
|dx002| 60|
|dx003| 20|
|dx004| 10|
+-----+---+

### group後計算列數
>>> tb12.groupBy("id").count().show()
+-----+-----+
|   id|count|
+-----+-----+
|dx002|    2|
|dx001|    2|
|dx004|    2|
|dx003|    2|
+-----+-----+

### group加總欄位
>>> tb12.groupBy("id").sum("pv").show()
# 或
>>> tb12.groupBy("id").agg({"pv":"sum"}).show()
# 或
>>> tb12.groupBy("id").agg(F.sum("pv").alias("sum(pv)")).show()
+-----+-------+
|   id|sum(pv)|
+-----+-------+
|dx002|     66|
|dx001|     33|
|dx004|     11|
|dx003|     22|
+-----+-------+

### 加總某欄位
>>> tb12.agg({'pv':'sum'}).show()
+-------+
|sum(pv)|
+-------+
|    132|
+-------+

# # # 例二:會員購買商品 # #
>>> order_df = spark.createDataFrame([
    ("oid1", 'u1', 'item1'),
    ("oid2", 'u2', 'item2'),
    ("oid2", 'u3', 'item3'),
    ("oid3", 'u1', 'item4'),
    ("oid4", 'u2', 'item5'),
    ("oid5", 'u2', 'item5') # 刻意重複
], ["oid", "uid", "iid"])

### 會員的購買商品清單
>>> order_df.groupBy('uid').agg(F.concat_ws(",", F.collect_list("iid")).alias('items')).show()
+---+-----------------+
|uid|            items|
+---+-----------------+
| u3|            item3|
| u1|      item1,item4|
| u2|item2,item5,item5|
+---+-----------------+

>>> order_df.groupBy('uid').agg(F.concat_ws(",", F.collect_set("iid")).alias('items')).show()
+---+-----------+
|uid|      items|
+---+-----------+
| u3|      item3|
| u1|item4,item1|
| u2|item5,item2|
+---+-----------+

### explode
>>> ui_df = order_df.groupBy('uid').agg(F.collect_set("iid").alias('items'))
+---+--------------+
|uid|         items|
+---+--------------+
| u3|       [item3]|
| u1|[item4, item1]|
| u2|[item5, item2]|
+---+--------------+

>>> ui_df.select(ui_df.uid, F.explode(ui_df.items).alias('item'))
+---+-----+
|uid| item|
+---+-----+
| u3|item3|
| u1|item4|
| u1|item1|
| u2|item5|
| u2|item2|
+---+-----+
```

### Merge ###

```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3')], ['id', 'name'])
df2 = spark.createDataFrame([('1', 'n1a'), ('2', 'n2a'), ('4', 'n4')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
|  1|  n1|
|  2|  n2|
|  3|  n3|
+---+----+
>>> df2.show()
+---+----+
| id|name|
+---+----+
|  1| n1a|
|  2| n2a|
|  4|  n4|
+---+----+

# 先union取聯集的id
id_df = df1.union(df2).select('id').distinct()
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df = id_df.join(df1, ['id'], how='left').join(df2, ['id'], how='left')
df = df.selectExpr('id', 'df1.name AS n1', 'df2.name AS n2')
df = df.withColumn('name', F.udf(lambda n1,n2: n2 if n2 is not None else n1)(F.col('n1'), F.col('n2')))
df = df.selectExpr('id', 'name')
df.show()
+---+----+
| id|name|
+---+----+
|  3|  n3|
|  1| n1a|
|  4|  n4|
|  2| n2a|
+---+----+
```

### Distinct某欄 ###

```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3')], ['id', 'name'])
df3 = spark.createDataFrame([('1', 'n1a'), ('2', 'n2a'), ('1', 'n1b')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
|  1|  n1|
|  2|  n2|
|  3|  n3|
+---+----+
>>> df3.show()
+---+----+
| id|name|
+---+----+
|  1| n1a|
|  2| n2a|
|  1| n1b|
+---+----+
>>> df3prime = df3.groupBy('id').agg(F.concat_ws(" ",
    F.collect_set("name")).alias('name'))
+---+-------+
| id|   name|
+---+-------+
|  1|n1a n1b|
|  2|    n2a|
+---+-------+
>>> df1.join(df3, ['id'], how='left').show()
+---+----+----+
| id|name|name|
+---+----+----+
|  3|  n3|null|
|  1|  n1| n1a|
|  1|  n1| n1b| # 會有重複id
|  2|  n2| n2a|
+---+----+----+
>>> df1.join(df3prime, ['id'], how='left').show()
+---+----+-------+
| id|name|   name|
+---+----+-------+
|  3|  n3|   null|
|  1|  n1|n1a n1b|
|  2|  n2|    n2a|
+---+----+-------+
```

### Join ###

```python
df1 = spark.createDataFrame([('1', 'n1'), ('2', 'n2'), ('3', 'n3'), ('4', 'n4'), ('5', 'n5')], ['id', 'name'])
df2 = spark.createDataFrame([('3', 'n3a'), ('4', 'n4a')], ['id', 'name'])
df3 = spark.createDataFrame([('6', 'n6'), ('4', 'n4a')], ['id', 'name'])
>>> df1.show()
+---+----+
| id|name|
+---+----+
|  1|  n1|
|  2|  n2|
|  3|  n3|
|  4|  n4|
|  5|  n5|
+---+----+
>>> df2.show()
+---+----+
| id|name|
+---+----+
|  3| n3a|
|  4| n4a|
+---+----+
>>> df1.join(df2, ['id'], how='inner').show()
+---+----+----+
| id|name|name|
+---+----+----+
|  3|  n3| n3a|
|  4|  n4| n4a|
+---+----+----+
>>> df1.join(df2, ['id'], how='left_anti').show()
+---+----+
| id|name|
+---+----+
|  5|  n5|
|  1|  n1|
|  2|  n2|
+---+----+
>>> df1.join(df2, ['id'], how='left_semi').show()
+---+----+
| id|name|
+---+----+
|  3|  n3|
|  4|  n4|
+---+----+
>>> df1.join(df3, ['id'], how='left').show()
+---+----+----+
| id|name|name|
+---+----+----+
|  3|  n3|null|
|  5|  n5|null|
|  1|  n1|null|
|  4|  n4| n4a|
|  2|  n2|null|
+---+----+----+
>>> df1.join(df3, ['id'], how='outer').show()
+---+----+----+
| id|name|name|
+---+----+----+
|  3|  n3|null|
|  5|  n5|null|
|  6|null|  n6|
|  1|  n1|null|
|  4|  n4| n4a|
|  2|  n2|null|
+---+----+----+
```

## jupyter可跑pyspark

編輯`.bashrc`

```bash
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
```