# Pyspark 筆記 ## pyspark 是什麼 **Python + Spark** 用 Python 呼叫 Spark Spark 是 distribute compute engine 在 PySpark 我們把資料切割成 RDD/DataFrame 來進行處理 DataFrame 其實是一個對資料的索引列表,可以用 SQL 的方式去操作,表內的資料分散在各個 partition 之內,所以對列表內的欄位做操作時很自然的可以把每個操作分別派給不同 partition 去操作。 相對的 sync 的成本就高,所以要避免用純 python 的角度去思考,比如 collect 回來成 list 再用 python 操作就是很低效的做法 每個 partition 內可能會有多筆資料,partition 內的資料處理是 Serial 的 ## pyspark api 文件去哪裡找 https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html 這絕對會是你的好朋友,記得 url 要針對現在用的版號修改,搜尋引擎可能會找到舊的 ## 如何看版本 `spark.version` ## DataFrame v.s RDD 參考 https://www.infoq.cn/article/three-apache-spark-apis-rdds-dataframes-and-datasets 因為我接下來要處理的資料結構全部都是字串,所以我選擇用 DataFrame 處理 DataFrame 也比較省空間,而且強型態 ## udf 是什麼 userDefineFunction 效率不太好的自訂 row data 處理 function 效率不好是因為要把 python code 一路轉成 java 能跑的結果再轉回來 ![](https://i.imgur.com/yFUEME5.png) ## DataFrame 基本操作 DataFrame 裡面的結構是 tuple,是不能修改的,所以要新刪改其實都是創造一個新的 dataframe 基本常用操作有 `withColumn`, `select` 而他們其實都是回傳一個 dataframe ,所以如果一次要刪除一個 column 再新增一個可以這樣串 e.g. `df_mapped = df_result.withColumn('listing', udf('DNSResult')).select(df_mapped.value, df_mapped.listing)` 一行代表下面這兩句 ``` df_mapped = df_result.withColumn('listing', udf('DNSResult')) df_mapped = df_mapped.select(df_mapped.value, df_mapped.listing) ``` 1. `df_result.withColumn('listing', udf('DNSResult'))` * withColumn 是 create new column,`listing` 是名稱,用把 `DNSResult` 這個 column 丟給 udf 來獲得的新 column 2. `.select(df_mapped.value, df_mapped.listing)` * select 會取出指定的 df_mapped.value 和 df_mapped.listing 來組成新的 Dataframe ## iterate column example 不能用 Python 本身的,要用 Pyspark 的來處理才能平行化 `df.foreach(f)` 而且因為 DataFrame 裡面的結構是 tuple,所以我不能用賦值蓋回去 然而我發現叫下去沒用 =_= 後來改用 udf(UserDefinedFunction) 大概長這樣 ```python udf = UserDefinedFunction(lambda x: print(x) x + 'newValue', StringType()) new_df = df.select(*[udf(value) for value in df.columns]) ``` 但是 lambda 只能有一行,不符合需求(Python 規定 lambda 只能有一行,不然就寫成 Function) 所以開始找文件[原廠文件在這](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html) 所以改成這樣 https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html ```python from pyspark.sql.functions import UserDefinedFunction import re IPv4_REGEX = r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})" patt = re.compile(IPv4_REGEX) # IPv4_REGEX = r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})" def get_reversed_ip(ip): # 1把 IP 反過來,這樣在 zonefile 裡面的結構比較容易查詢(原理為何?) -> zonefile 是從後往前查的 groups = patt.match(ip) reversedIP = "%s.%s.%s.%s" % (groups.group(4), groups.group(3), groups.group(2), groups.group(1)) return reversedIP udf = UserDefinedFunction(get_reversed_ip, StringType()) new_df = df.select(*[udf(value) for value in df.columns]) new_df.show() ``` ## Pyspark 的多重回傳值展開 用 explode 但展開之後組不太回去就是 :::info 可以用 agg + collect set 組回去 但這樣做效能跟記憶體上都不好 ::: ## 環境配置 我決定在 windows 的 anaconda3 上面跑 參考教學 https://zhuanlan.zhihu.com/p/37617055 ## DataFrame to list 用 `collect`,但每次做這個操作的時候都會把 `collect` 的 data sync 到同一台,所以效能不太好。 ## 打 DB Dataframe 可以用 write function 直接打進 table,但 Dataframe 本質上就是一個 table,所以雖然有提供 append 模式,但在寫入的時候 Dataframe 必須完全長的跟 table 一樣才行。 ## poc 測試指令 `python spark_submit.py /Users/rance_jen/Documents/ers_project/App-pyspark/app/LSSPreprocess/LSSPreprocessor.py` ## boto3 用 pip install --user boto3 來裝 反正把 error 拿去喂 google 就能拿到解答 ## ers_s3 放在 `Library-aws-utilities`,是 submodule 之一,在裡面打一下 ```shell git submodule init git submodule update --recursive ``` ## sc 是什麼 SparkContext 的 instance 由於在 spark shell 中會自動被初始化,所以官方範例他媽的都直接用,完全不解釋這是什麼 用指令在 jupyter notebook 起的話是這樣寫 `sc = SparkContext("local", "First App")` 在 pyspark2.0 之後通常建議用 SparkSession 所以如果起了 SparkSession 之後又嘗試起 SparkContext 就會跳出 > "ValueError: Cannot run multiple SparkContexts at once". ## Spark 中的 Application、SparkSession、SparkContext 關係為何 簡單一張圖 ![](https://i.imgur.com/rhJ6lde.png) 那我在 SparkSession 內要用 SparkContext 怎麼辦? `spark.sparkContext` 範例 ```python spark=SparkSession \ .builder \ .appName('LSS_POC') \ .getOrCreate() ips_sequence = {"127.0.0.1":"ABAB", "127.0.0.2":"BABA"} title = Row("ipv4", "ip_sequence") df_file = spark.sparkContext.parallelize([title(key, ips_sequence[key]) for key in ips_sequence]).toDF() ``` ## persist & cache 的差別 兩者皆是設定 storage level persist 的 level 是 disk&memory cache 的 level 是 memory ## 一次創建多個 column 的作法 第一種 for Dict:先組成 RDD 再轉 DF ```python ips_sequence = {'127.0.0.1':'ABAB', '127.0.0.2':'BANB'} # FileRDD == df_file title = Row('ipv4', 'ip_sequence') df_file = spark.sparkContext.parallelize([title(key, ips_sequence[key]) for key in ips_sequence]).toDF() +---------+-----------+ | ipv4|ip_sequence| +---------+-----------+ |127.0.0.1| ABAB| |127.0.0.2| BANB| +---------+-----------+ ``` 第二種 for list with tuple:直接塞 ```python v = sqlContext.createDataFrame([ ("a", "Alice", 34), ("b", "Bob", 36), ("c", "Charlie", 30), ], ["id", "name", "age"]) +---+-------+---+ | id| name|age| +---+-------+---+ | a| Alice| 34| | b| Bob| 36| | c|Charlie| 30| +---+-------+---+ ``` ## filter 裡面塞 sql stament,如果要用 regular 的話記得要用 rlike 不是 like ```python # CharRDD = df_char df_char = df_file.select(explode(split(df_file.ip_sequence, '')).alias('char'))\ .filter("char rlike '[a-z]'")\ .withColumn('count', lit(1)) df_char.show() ``` ## agg 避免使用 agg ,本質上是 collect 會有 OOM 的風險 尤其在 data 相當傾斜 (skewed) 的時候特別容易炸,可以將 agg 放在最後一步資料最少的時候再來處理 ## 多重參數進 udf 主要用 struct ```python from pyspark.sql.functions import struct def get_suffix(row): return row.char udf = UserDefinedFunction(get_suffix, StringType()) test = df_suffix.withColumn('new_prefix', udf(struct('char', 'fren'))) test.show() ``` ## 判斷 dataframe 是否為空 `df_fren_unqualified.head(1)` 會回傳一個 list 直接拿 list 轉 bool 就可以 ## append row 用 union(不過限定 column 數量要一樣) ## join 的條件寫法 有兩種,可以寫 `df.join(df2, df.name == df2.name, 'left').drop(df.name)` 或是寫 `df.join(df2, 'name', 'left')` 都可以 對不上的部份會是空的,可以用 `.na.drop()` 來清掉 ## udf 裡面嘗試 reference 其他 dataframe 的資料 並不能,完全不能,只能換一種方法寫。 udf 是打給 executor 的指令,但整個 dataframe 的 reference 是紀錄在 driver 的 executor 並沒有能力去 reference 到 dataframe [詳細解釋參考我](https://stackoverflow.com/questions/47509249/how-to-pass-dataframe-as-input-to-spark-udf) executor 能存取的只有傳進去跟 broadcast 出去的資料 這種情況只能換位要用這個 row 到底要被怎麼處理的角度去思考。 ## return type 的坑 StringType() 寫成 ArrayType(StringType()) 還是可以跑 只是出來都是 null ... ## 在 pyspark 裡面讀取 S3 的檔案 因為要從 S3 拉資料要走特定 port 不能直接用 open 要用 `smart_open.open` ## 寫檔案上 S3 ```python df_result.coalesce(1).write.format("json").option("header", "false").mode("append").save("s3://coretech-ers-test-tzuan/rance_poc.txt") ``` :::warning 空的 dataframe 嘗試寫入 S3 會噴 exception ::: ## graphframes tutorial https://hackmd.io/@Rance/Sy-37xYjr ## 除了 broadcast 以外偷渡 value 的方式 [範例參考](https://stackoverflow.com/questions/47906697/pyspark-pass-list-as-parameter-to-udf?rq=1) 創出一個 udf 的 obj 時就把創出 cate 的 function obj 並 bind 兩個參數,一個是 label_list,一個是 lambda 的 place holder,後面再帶入 col("distances") 給 lambda 填上之前空的 place holder,這樣 driver 把 udf 傳給 executor 的時候 label_list 就會偷渡過去,因為 label_list 已經被紀錄在 lambda 內的 cate function obj 內。 > provided by Vincent > ## 讀取 CSV 因為 spark 在存檔時沒辦法指定檔名,只能指定資料夾,故相對的在讀取時也指定資料夾就好 spark 預設的行為就是把下面所有檔案按照給定格式 load 近來 ```python schema = StructType([StructField('ipv4', StringType(), True)]) df_ipv4 = spark.read.csv(full_path, schema=schema) ``` ## date 偏移的計算 ```python target_date = datetime.datetime.strptime(CONST.DATE, "%Y%m%d") print(target_date) sIP_source = set() for shift_day in range(1, 8): log_date = (target_date - datetime.timedelta(days = shift_day)).strftime("%Y%2m%2d") full_path = '/'.join([CONST.S3_PREFIX, CONST.S3_ARQLOG_BUCKET, CONST.S3_ARQIPS_PATH]) + log_date ``` ## 用到 python utility 怎麼辦 ~~整份複製貼上~~ 可以用 --pyfile 帶上 EMR 之後 import 進來用即可 ## 要讀取 DBPASSWORD 這種敏感資料怎麼辦 用 SSM(AWS System manager 的 Parameter store) 佈署方式在 CloudFormation 的 parameter-store.yml 會負責配置 但 parameter-store.yml 裡面也不能真的直接寫出 password 所以裡面會再去吃 circleCI 的 env value 來帶上去。 ## sqlite3 狀況如下: ![](https://i.imgur.com/CfuJ0tB.png) [參考這裡](https://stackoverflow.com/questions/21154180/frequent-operationalerror-unable-to-open-database-file-with-in-memory-sqlite3) Emr 上面沒有寫檔案的權限,所以開 `con = sqlite3.Connection('local_query_volume')` 會直接陣亡 最後是用 in-memory DB 解掉的,語法是 `conn = sqlite3.connect(":memory:")` ## mysql 的雷 1. dataframe 可以直接讀一個「table」,所以如果要拿 select 出來的資料必須包起來變成一個 table 像是 `(select xxx from ooo) newTable` 2. 欄位名不可以包含特殊字元像是 `SELECT CONCAT(INET_NTOA(snet), "/32") from xxx`,因為這樣出來的 column 名稱會包含特殊字元,要改成 `SELECT CONCAT(INET_NTOA(snet), "/32") as result from xxx` ## history http://emr-user.test.ers.a1q7.net:18080/ 這裡可以看到所有 spark 運作紀錄 點進指定的 job 之後可以案右上方 sql 來看 get input 跟 output 的端點 ## 看 hadoop 的狀況 http://emr-user.test.ers.a1q7.net:8088/cluster/apps/RUNNING 配合 http://emr-user.prod.ers.a1q7.net:8998/batches?size=100000000 可以查到所有目前正在跑得 livy log ## read as dataframe 的雷 spark 本身是 java,所以儲存方式也是 java 如果把 intip 直接讀進去, class A 超過 127 的會直接 overflow 在 dataframe 中用負數表示,就此消失 query 不到 XDDD 要用 LongType 去接.... ## step function debug 去 step function 執行 參數去 cloudwatch event 看 https://us-west-2.console.aws.amazon.com/cloudwatch/home?region=us-west-2#rules: 選定之後右上角案 edit > date 的格式會是 `2018-01-15T 11:00:00Z` 執行後去 step function 看執行狀況 如果 fail 的話去 cloudwatch 看 log https://us-west-2.console.aws.amazon.com/cloudwatch/home?region=us-west-2#logStream:group=LogGroup-test-ers-ETL-failure 看 livy log 的話要去 http://emr-user.prod.ers.a1q7.net:8998/ui/batch/83830/log 83830 改成這個 job id, job id 在 hadoop 看得到 ## RecursionError !? ```python Traceback (most recent call last): File "/mnt/tmp/spark-dc3c6e02-c2b4-437c-b47d-2bae381807f7/LSSBuildClustering.py", line 184, in <module> val_ac_sprefixs = spark.sparkContext.broadcast(AhoTrie(sprefixs)) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 874, in broadcast File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 91, in __init__ File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 120, in dump _pickle.PicklingError: Could not serialize broadcast: RecursionError: maximum recursion depth exceeded while calling a Python object ``` dump 的意思是 serialize 一個 object,在 serialize 的過程中會用 recursive 的方式去走一個 object 然後我傳入一個 trie 就讓他走爆了。 比較簡單的解決方案是用 singleton 去做(在 flow 上處理) or 用 DoubleArrayACTrie 去做(在 Data Struct 上處理) ## singleton ```python class Singleton(type): _instances = {} _cur_id = -1 def __call__(cls, *args, **kwargs): if cls in cls._instances and cls._cur_id == id(args[0]): return cls._instances[cls] cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) cls._cur_id = id(args[0]) class AhoTrie(metaclass=Singleton): ``` 這是實測可用的寫法 但這會導致在 global 的等級 construct 一次之後就不再更新內容 今天我是寫在一個 while 裡面想更西內容時就會改不了 這個時候可以取 construct list 的 id 判斷,~~這個 id 是 memory address(中文亂翻)~~ > This is an integer (or long integer) which is guaranteed to be unique and constant for this object ## unpersist ~~放在 code 最下面依然會爆炸 ~~搞不是很懂~~ ~~開 blocking 一樣掛~~ ~~掛在 udf 裡面~~ 只有 destroy 不能用 ## 強制 spark gc 的寫法 ```python def notify_spark_gc(spark): spark.sparkContext._jvm.System.gc() ``` ## 強制 python gc 的寫法 ```python gc.collect() ``` ## Parallel 後面不能接 job I don't know why either 沒人嘗試在 Parallel 後面接 job 過,我在整個 core-tech github 搜尋過 大家頂多就是 Parallel 後面接 End 嘗試接就會跳出 runtime error ![](https://i.imgur.com/QydD1RQ.png) 我嘗試過把順序反過來就可以正常運行了 ## Parallel 會搶 memory 我嘗試開四個 Parallel 的 job 就一直得到 ![](https://i.imgur.com/7gI82mv.png) 算一下它是要不到多少記憶體 ![](https://i.imgur.com/oJa12jU.png) 連要 600 mb 都可以 OOM !? 可能因為我們 Master 開太廢 XD ![](https://i.imgur.com/kjsJ45O.png) 這配備比我的小米筆記本還慘 相對的 prod 的配置好多了 ![](https://i.imgur.com/uhcN8cf.png) ## 額外 config https://aws.amazon.com/tw/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ config 懶人包 ```python CONFIG = SparkConf().setAll([ ('spark.executor.memoryOverhead', '4096'), ("spark.memory.fraction", "0.8"), ("spark.memory.storageFraction", "0.4"), ("spark.scheduler.listenerbus.eventqueue.capacity", "100000"), ("spark.executor.memory", "8g"), ("spark.driver.memory", "8g"), # Recommand by aws doc https://aws.amazon.com/tw/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ ("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'"), ("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'") ]) ``` 注意 ``` spark.executor.memory spark.driver.memory ``` 只是 spark 「可以 alloc 的上限」,並不代表設了就一定可以拿到,就算設很大如果那一台 master 已經沒 memory 了,要 600 mb 也可以當場暴斃 ## ERS_S3 不支援 Python3 重寫了一個 ```python def S3_upload_obj(fd, output_bucket, output_path, sns_arn): RETURN_STATUS_SUCCESS = 0 def normalize(value): return value.rstrip().rstrip("/").lstrip("/") s3_client = boto3.client('s3') res = s3_client.put_object(Bucket = normalize(output_bucket), Key = normalize(output_path), Body = fd, ServerSideEncryption = "AES256") return res ``` ## livy api `http://emr-user.prod.ers.a1q7.net:8998/batches?size=100000000` ## 雞尾酒 Don't upload zero dataframe Broadcast before join 2 paralle cut sequence sum+join before agg ![](https://i.imgur.com/EPmp3Z2.png) ![](https://i.imgur.com/fjGvaN7.png) ![](https://i.imgur.com/7vL1TXY.png) ## memory allocate spark 的初始 memory allocate = executor * spark.executor.memory + spark.driver.memory + N(一些誤差) 初始大小 allocate 不到的話會一直待機,待機超過一定的時間會 timeout 初始大小不夠用的話會自動往上 allocate 有一部份的 buffer 似乎一定會 dynamic allocate,因此如果整個 hadoop 已經滿了就會跳 oom lock 沒寫好 log 會亂跳 RDD no replica 之類的狂刷 ## dealing data craw 直接 agg 太大量的資料會掛掉,有以下幾種解決方法 1. pre filter 很直觀,把資料量變少問題就變小 2. 2 way reduce 分兩次做 reduceByKey,先做一次 local 再做一次 global ex. `[[hello, 5], [hello, 6], [hello, 2], [hello, 3]]` 先隨機上個 random tag -> `[[hello, 5, 2], [hello, 6, 1], [hello, 2, 2], [hello, 3, 1]]` -> `[[hello, 7, 2], [hello, 9, 1]` 對每個 executor 都做完這種事情之後再做 glocal reduceByKey 沒有解決 data craw 的本質但是減少了 shuffle 的運算量 3. 用 broadcast 躲過 shuffle dataframe 需要 shuffle 是因為在 join 時不知道每個 row 的確切位置要打到那,那如果今天我 join 的表可以切成一大一小的話。 只要把小的 broadcast 到所有的 executor 上就可以去除 shuffle 的必要性,因為每個 executor 都已經有完整的資訊可以做 join 了 4. 切開分別做 join 大表切小表,分開 join 最後再 union 有相依性的可以先套上一個 key 之後切開再 join 再 join 基本上就是第二種方法的延伸。 ## What my problem OOM 但有多種原因同時造成 ### Boss 1 #### 症狀:Aggregate 太大的資料 我一開始把 suffix 整個 explode 之後全部做 agg ![](https://i.imgur.com/qcGrWk5.png) 特徵:有時候會過,有時候不會過,因為每次執行 spark job 時 partition 的分區都不同,可能 data 會歪,可能不會,歪太多 memory 放不下就會 oom,反正就是會隨機爆炸。 #### 解決方案:把 memory 開大 看到 oom ,大家的第一個反應都是這樣 `spark.executor.memory", "16g` #### 後遺症:沒有解決 Aggregate 時 data craw 的問題 ### Boss 2 #### 症狀:parallel 走不動 Spark 起機器時 allocate memory 的計算公式如下 `executor * spark.executor.memory + spark.driver.memory + N(一些 buffer)` 如果太長時間要不到,就會 timeout。 特徵:因為 step function 打 livy 到 hadoop 接受的順序沒有保證,因此上去 Hadoop metrics 看會比較準。 #### 解決方案:把 parallel 砍掉 我把四個砍成兩個,能動了,暫時。 ### Boss 1 再臨 #### 症狀:Aggregate 時還是有機率炸掉 特徵:因為在 Agg 時 memory 使用量會突然增加,當兩個 job 剛好把 emr 的 memory 吃滿的時候,其中一個 job 要就會 oom #### 解決方案:先 reduceByKey 再 filter 之後才 Aggregate 大幅減少需要 aggregate 的資料量,從幾十萬筆 -> 幾千筆,差了 100 倍 因為步驟多了,速度其實變慢了,所以我又把 parallel 開起來。 ### Boss 2 再臨 #### 症狀:EMR 被吃太滿常常會有一個 job 被撞到 oom 兩個起來總共會卡住三百多快四百的 memory,後面排太多的時候會有一個 job 被撞到 oom #### 解決方案:把 parallel 拔掉,並且改成動態的 allocate memory 現在剛啟動時只會吃 100 多 g 了 ### Boss 3 #### 症狀:LSS 跑比較久,當後面需要大量 memory 的 job 排太多,LSS 會被撞下去 Spark 會動態的分配記憶體,如果其他 job 都跑完了就多分一點給 LSS,沒有的話 LSS 靠一百多 G 也能正常跑,然而 LSS 要跑 256 個 Class A 需要 2~3 個小時的時間才能跑完。當後面累積的 job 越來越多,LSS 會被其他人撞下去。 :::info 或是 EMR 會掛掉...... ::: #### 車禍現場 ![](https://i.imgur.com/dWJfZLy.png) ## PySpark 潛規則 我們沒有設定 spark 搶資源的上限,基本上就是閒置多少搶多少,所以我們 spark 最好都跑一些很快就能處理掉的 task 就算不能很快處理掉也要切成小塊的避免長佔資源,長佔資源導致其他 job 累積太多的話 emr 有可能直接掛掉,或是 emr 直接幹掉這個 job。 ### How to verify Clouformation template 1. Go to AWS Cloudformation stack console ![](https://i.imgur.com/r7K6aRb.png) 2. Choose Previous console and select the stack which you want to update ![](https://i.imgur.com/c5mFM7r.png) 3. Select update stack and pass you yaml file ![](https://i.imgur.com/CM6FT3m.png) 4. You will get the result about is syntx validate or not ![](https://i.imgur.com/ITgEptJ.png) ### How to get the reason why CloudFormation deployment failed 1. Go to AWS Cloudformation stack console ![](https://i.imgur.com/r7K6aRb.png) 2. Choose Previous console and select the stack which you want to know ![](https://i.imgur.com/c5mFM7r.png) 3. You will find the error message like below ![](https://i.imgur.com/3JBoZNB.png) #### If you want full error message 0. install AWS-CLI on your machine 1. Set AWS STS on your machine 2. `aws cloudformation describe-stack-events --stack-name [STACKNAME] --max-items 1000 > cf.log` 3. `cf.log` will contain full log like below ![](https://i.imgur.com/CX6ZOek.png) ### If your InputTransformer reach the length limit [doc](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-events-rule-inputtransformer.html) ![](https://i.imgur.com/hw8xjzW.png) Then split your InputTransformer to diffrent Target like bellow ![](https://i.imgur.com/AioUxcw.png) ### StateMachine bug StateMachine 的定義寫錯 deploy 並不會失敗 只有跑到最後面的時候會掛掉,還有流程圖會畫不出來 上去 stepfunction 案 edit 之後刷新流程圖就可以看到定義哪裡寫錯 或是直接丟給 json formatter 也可以找出來 ## column name 有 . 怎麼辦? ``` "`tmase_result.total_score`" ``` 用這種格式去拿 ## 限制 executor 數量 `('spark.dynamicAllocation.maxExecutors', '8'),` 預設是無限制,無限制在 shuffle 或是 upload 的時候特別容易 oom 因為會 allocate 一大堆 uploader ## pyspark 沒 log 應更改執行時的 input `"spark.submit.deployMode": "cluster"` 改成 client 即可 ```json { "FromAddrCount": { "env": "test", "region": "us-west-2", "s3_script_bucket": "coretech-ers-test-emr-us-west-2/app", "proj_name": "EurekaPreprocessedLogFromAddrCount", "wait_time": 30, "file": "EurekaPreprocessedLogFromAddrCount.py", "args": [ "--dtime", "2020-10-13T06:15:00Z", "--env", "test", "--region", "us-west-2" ], "conf": { "spark.submit.deployMode": "client" }, "py3": true } } ``` 用比喻好了 Spark 是一人力派遣公司 裡面有兩種人 (老闆) Master Node - (員工) Worker Nodes 當接到了一個案子 Spark Job, 老闆要選出一個工頭(Driver)和一些工人(Excutors)幫忙做事 deploy mode: client, 老闆(Master node)自己當工頭(Driver) deploy mode: cluster, 老闆(Master node)選一個員工(worker node)當工頭 所以cluster mode就是老闆不會被操爆