# Pyspark 筆記
## pyspark 是什麼
**Python + Spark**
用 Python 呼叫 Spark
Spark 是 distribute compute engine
在 PySpark 我們把資料切割成 RDD/DataFrame 來進行處理
DataFrame 其實是一個對資料的索引列表,可以用 SQL 的方式去操作,表內的資料分散在各個 partition 之內,所以對列表內的欄位做操作時很自然的可以把每個操作分別派給不同 partition 去操作。
相對的 sync 的成本就高,所以要避免用純 python 的角度去思考,比如 collect 回來成 list 再用 python 操作就是很低效的做法
每個 partition 內可能會有多筆資料,partition 內的資料處理是 Serial 的
## pyspark api 文件去哪裡找
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html
這絕對會是你的好朋友,記得 url 要針對現在用的版號修改,搜尋引擎可能會找到舊的
## 如何看版本
`spark.version`
## DataFrame v.s RDD
參考 https://www.infoq.cn/article/three-apache-spark-apis-rdds-dataframes-and-datasets
因為我接下來要處理的資料結構全部都是字串,所以我選擇用 DataFrame 處理
DataFrame 也比較省空間,而且強型態
## udf 是什麼
userDefineFunction
效率不太好的自訂 row data 處理 function
效率不好是因為要把 python code 一路轉成 java 能跑的結果再轉回來
![](https://i.imgur.com/yFUEME5.png)
## DataFrame 基本操作
DataFrame 裡面的結構是 tuple,是不能修改的,所以要新刪改其實都是創造一個新的 dataframe
基本常用操作有
`withColumn`, `select`
而他們其實都是回傳一個 dataframe ,所以如果一次要刪除一個 column 再新增一個可以這樣串
e.g.
`df_mapped = df_result.withColumn('listing', udf('DNSResult')).select(df_mapped.value, df_mapped.listing)`
一行代表下面這兩句
```
df_mapped = df_result.withColumn('listing', udf('DNSResult'))
df_mapped = df_mapped.select(df_mapped.value, df_mapped.listing)
```
1. `df_result.withColumn('listing', udf('DNSResult'))`
* withColumn 是 create new column,`listing` 是名稱,用把 `DNSResult` 這個 column 丟給 udf 來獲得的新 column
2. `.select(df_mapped.value, df_mapped.listing)`
* select 會取出指定的 df_mapped.value 和 df_mapped.listing 來組成新的 Dataframe
## iterate column example
不能用 Python 本身的,要用 Pyspark 的來處理才能平行化
`df.foreach(f)`
而且因為 DataFrame 裡面的結構是 tuple,所以我不能用賦值蓋回去
然而我發現叫下去沒用 =_=
後來改用 udf(UserDefinedFunction)
大概長這樣
```python
udf = UserDefinedFunction(lambda x: print(x) x + 'newValue', StringType())
new_df = df.select(*[udf(value) for value in df.columns])
```
但是 lambda 只能有一行,不符合需求(Python 規定 lambda 只能有一行,不然就寫成 Function)
所以開始找文件[原廠文件在這](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html)
所以改成這樣
https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html
```python
from pyspark.sql.functions import UserDefinedFunction
import re
IPv4_REGEX = r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})"
patt = re.compile(IPv4_REGEX) # IPv4_REGEX = r"(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})"
def get_reversed_ip(ip): # 1把 IP 反過來,這樣在 zonefile 裡面的結構比較容易查詢(原理為何?) -> zonefile 是從後往前查的
groups = patt.match(ip)
reversedIP = "%s.%s.%s.%s" % (groups.group(4), groups.group(3), groups.group(2), groups.group(1))
return reversedIP
udf = UserDefinedFunction(get_reversed_ip, StringType())
new_df = df.select(*[udf(value) for value in df.columns])
new_df.show()
```
## Pyspark 的多重回傳值展開
用 explode
但展開之後組不太回去就是
:::info
可以用 agg + collect set 組回去
但這樣做效能跟記憶體上都不好
:::
## 環境配置
我決定在 windows 的 anaconda3 上面跑
參考教學 https://zhuanlan.zhihu.com/p/37617055
## DataFrame to list
用 `collect`,但每次做這個操作的時候都會把 `collect` 的 data sync 到同一台,所以效能不太好。
## 打 DB
Dataframe 可以用 write function 直接打進 table,但 Dataframe 本質上就是一個 table,所以雖然有提供 append 模式,但在寫入的時候 Dataframe 必須完全長的跟 table 一樣才行。
## poc 測試指令
`python spark_submit.py /Users/rance_jen/Documents/ers_project/App-pyspark/app/LSSPreprocess/LSSPreprocessor.py`
## boto3
用 pip install --user boto3 來裝
反正把 error 拿去喂 google 就能拿到解答
## ers_s3
放在 `Library-aws-utilities`,是 submodule 之一,在裡面打一下
```shell
git submodule init
git submodule update --recursive
```
## sc 是什麼
SparkContext 的 instance
由於在 spark shell 中會自動被初始化,所以官方範例他媽的都直接用,完全不解釋這是什麼
用指令在 jupyter notebook 起的話是這樣寫
`sc = SparkContext("local", "First App")`
在 pyspark2.0 之後通常建議用 SparkSession
所以如果起了 SparkSession 之後又嘗試起 SparkContext 就會跳出
> "ValueError: Cannot run multiple SparkContexts at once".
## Spark 中的 Application、SparkSession、SparkContext 關係為何
簡單一張圖
![](https://i.imgur.com/rhJ6lde.png)
那我在 SparkSession 內要用 SparkContext 怎麼辦?
`spark.sparkContext`
範例
```python
spark=SparkSession \
.builder \
.appName('LSS_POC') \
.getOrCreate()
ips_sequence = {"127.0.0.1":"ABAB", "127.0.0.2":"BABA"}
title = Row("ipv4", "ip_sequence")
df_file = spark.sparkContext.parallelize([title(key, ips_sequence[key]) for key in ips_sequence]).toDF()
```
## persist & cache 的差別
兩者皆是設定 storage level
persist 的 level 是 disk&memory
cache 的 level 是 memory
## 一次創建多個 column 的作法
第一種 for Dict:先組成 RDD 再轉 DF
```python
ips_sequence = {'127.0.0.1':'ABAB', '127.0.0.2':'BANB'}
# FileRDD == df_file
title = Row('ipv4', 'ip_sequence')
df_file = spark.sparkContext.parallelize([title(key, ips_sequence[key]) for key in ips_sequence]).toDF()
+---------+-----------+
| ipv4|ip_sequence|
+---------+-----------+
|127.0.0.1| ABAB|
|127.0.0.2| BANB|
+---------+-----------+
```
第二種 for list with tuple:直接塞
```python
v = sqlContext.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
], ["id", "name", "age"])
+---+-------+---+
| id| name|age|
+---+-------+---+
| a| Alice| 34|
| b| Bob| 36|
| c|Charlie| 30|
+---+-------+---+
```
## filter
裡面塞 sql stament,如果要用 regular 的話記得要用 rlike 不是 like
```python
# CharRDD = df_char
df_char = df_file.select(explode(split(df_file.ip_sequence, '')).alias('char'))\
.filter("char rlike '[a-z]'")\
.withColumn('count', lit(1))
df_char.show()
```
## agg
避免使用 agg ,本質上是 collect 會有 OOM 的風險
尤其在 data 相當傾斜 (skewed) 的時候特別容易炸,可以將 agg 放在最後一步資料最少的時候再來處理
## 多重參數進 udf
主要用 struct
```python
from pyspark.sql.functions import struct
def get_suffix(row):
return row.char
udf = UserDefinedFunction(get_suffix, StringType())
test = df_suffix.withColumn('new_prefix', udf(struct('char', 'fren')))
test.show()
```
## 判斷 dataframe 是否為空
`df_fren_unqualified.head(1)`
會回傳一個 list
直接拿 list 轉 bool 就可以
## append row
用 union(不過限定 column 數量要一樣)
## join 的條件寫法
有兩種,可以寫
`df.join(df2, df.name == df2.name, 'left').drop(df.name)`
或是寫
`df.join(df2, 'name', 'left')` 都可以
對不上的部份會是空的,可以用 `.na.drop()` 來清掉
## udf 裡面嘗試 reference 其他 dataframe 的資料
並不能,完全不能,只能換一種方法寫。
udf 是打給 executor 的指令,但整個 dataframe 的 reference 是紀錄在 driver 的
executor 並沒有能力去 reference 到 dataframe
[詳細解釋參考我](https://stackoverflow.com/questions/47509249/how-to-pass-dataframe-as-input-to-spark-udf)
executor 能存取的只有傳進去跟 broadcast 出去的資料
這種情況只能換位要用這個 row 到底要被怎麼處理的角度去思考。
## return type 的坑
StringType() 寫成 ArrayType(StringType()) 還是可以跑
只是出來都是 null ...
## 在 pyspark 裡面讀取 S3 的檔案
因為要從 S3 拉資料要走特定 port
不能直接用 open 要用 `smart_open.open`
## 寫檔案上 S3
```python
df_result.coalesce(1).write.format("json").option("header", "false").mode("append").save("s3://coretech-ers-test-tzuan/rance_poc.txt")
```
:::warning
空的 dataframe 嘗試寫入 S3 會噴 exception
:::
## graphframes tutorial
https://hackmd.io/@Rance/Sy-37xYjr
## 除了 broadcast 以外偷渡 value 的方式
[範例參考](https://stackoverflow.com/questions/47906697/pyspark-pass-list-as-parameter-to-udf?rq=1)
創出一個 udf 的 obj 時就把創出 cate 的 function obj 並 bind 兩個參數,一個是 label_list,一個是 lambda 的 place holder,後面再帶入 col("distances") 給 lambda 填上之前空的 place holder,這樣 driver 把 udf 傳給 executor 的時候 label_list 就會偷渡過去,因為 label_list 已經被紀錄在 lambda 內的 cate function obj 內。
> provided by Vincent
>
## 讀取 CSV
因為 spark 在存檔時沒辦法指定檔名,只能指定資料夾,故相對的在讀取時也指定資料夾就好
spark 預設的行為就是把下面所有檔案按照給定格式 load 近來
```python
schema = StructType([StructField('ipv4', StringType(), True)])
df_ipv4 = spark.read.csv(full_path, schema=schema)
```
## date 偏移的計算
```python
target_date = datetime.datetime.strptime(CONST.DATE, "%Y%m%d")
print(target_date)
sIP_source = set()
for shift_day in range(1, 8):
log_date = (target_date - datetime.timedelta(days = shift_day)).strftime("%Y%2m%2d")
full_path = '/'.join([CONST.S3_PREFIX, CONST.S3_ARQLOG_BUCKET, CONST.S3_ARQIPS_PATH]) + log_date
```
## 用到 python utility 怎麼辦
~~整份複製貼上~~
可以用 --pyfile 帶上 EMR 之後 import 進來用即可
## 要讀取 DBPASSWORD 這種敏感資料怎麼辦
用 SSM(AWS System manager 的 Parameter store)
佈署方式在 CloudFormation 的 parameter-store.yml 會負責配置
但 parameter-store.yml 裡面也不能真的直接寫出 password
所以裡面會再去吃 circleCI 的 env value 來帶上去。
## sqlite3
狀況如下:
![](https://i.imgur.com/CfuJ0tB.png)
[參考這裡](https://stackoverflow.com/questions/21154180/frequent-operationalerror-unable-to-open-database-file-with-in-memory-sqlite3)
Emr 上面沒有寫檔案的權限,所以開 `con = sqlite3.Connection('local_query_volume')` 會直接陣亡
最後是用 in-memory DB 解掉的,語法是 `conn = sqlite3.connect(":memory:")`
## mysql 的雷
1. dataframe 可以直接讀一個「table」,所以如果要拿 select 出來的資料必須包起來變成一個 table 像是 `(select xxx from ooo) newTable`
2. 欄位名不可以包含特殊字元像是 `SELECT CONCAT(INET_NTOA(snet), "/32") from xxx`,因為這樣出來的 column 名稱會包含特殊字元,要改成 `SELECT CONCAT(INET_NTOA(snet), "/32") as result from xxx`
## history
http://emr-user.test.ers.a1q7.net:18080/
這裡可以看到所有 spark 運作紀錄
點進指定的 job 之後可以案右上方 sql 來看 get input 跟 output 的端點
## 看 hadoop 的狀況
http://emr-user.test.ers.a1q7.net:8088/cluster/apps/RUNNING
配合 http://emr-user.prod.ers.a1q7.net:8998/batches?size=100000000 可以查到所有目前正在跑得 livy log
## read as dataframe 的雷
spark 本身是 java,所以儲存方式也是 java
如果把 intip 直接讀進去, class A 超過 127 的會直接 overflow
在 dataframe 中用負數表示,就此消失 query 不到 XDDD
要用 LongType 去接....
## step function debug
去 step function 執行
參數去 cloudwatch event 看
https://us-west-2.console.aws.amazon.com/cloudwatch/home?region=us-west-2#rules:
選定之後右上角案 edit
> date 的格式會是 `2018-01-15T 11:00:00Z`
執行後去 step function 看執行狀況
如果 fail 的話去 cloudwatch 看 log
https://us-west-2.console.aws.amazon.com/cloudwatch/home?region=us-west-2#logStream:group=LogGroup-test-ers-ETL-failure
看 livy log 的話要去
http://emr-user.prod.ers.a1q7.net:8998/ui/batch/83830/log
83830 改成這個 job id, job id 在 hadoop 看得到
## RecursionError !?
```python
Traceback (most recent call last):
File "/mnt/tmp/spark-dc3c6e02-c2b4-437c-b47d-2bae381807f7/LSSBuildClustering.py", line 184, in <module>
val_ac_sprefixs = spark.sparkContext.broadcast(AhoTrie(sprefixs))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 874, in broadcast
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 91, in __init__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 120, in dump
_pickle.PicklingError: Could not serialize broadcast: RecursionError: maximum recursion depth exceeded while calling a Python object
```
dump 的意思是 serialize 一個 object,在 serialize 的過程中會用 recursive 的方式去走一個 object
然後我傳入一個 trie 就讓他走爆了。
比較簡單的解決方案是用 singleton 去做(在 flow 上處理) or 用 DoubleArrayACTrie 去做(在 Data Struct 上處理)
## singleton
```python
class Singleton(type):
_instances = {}
_cur_id = -1
def __call__(cls, *args, **kwargs):
if cls in cls._instances and cls._cur_id == id(args[0]):
return cls._instances[cls]
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
cls._cur_id = id(args[0])
class AhoTrie(metaclass=Singleton):
```
這是實測可用的寫法
但這會導致在 global 的等級 construct 一次之後就不再更新內容
今天我是寫在一個 while 裡面想更西內容時就會改不了
這個時候可以取 construct list 的 id 判斷,~~這個 id 是 memory address(中文亂翻)~~
> This is an integer (or long integer) which is guaranteed to be unique and constant for this object
## unpersist
~~放在 code 最下面依然會爆炸
~~搞不是很懂~~
~~開 blocking 一樣掛~~
~~掛在 udf 裡面~~
只有 destroy 不能用
## 強制 spark gc 的寫法
```python
def notify_spark_gc(spark):
spark.sparkContext._jvm.System.gc()
```
## 強制 python gc 的寫法
```python
gc.collect()
```
## Parallel 後面不能接 job
I don't know why either
沒人嘗試在 Parallel 後面接 job 過,我在整個 core-tech github 搜尋過
大家頂多就是 Parallel 後面接 End
嘗試接就會跳出 runtime error
![](https://i.imgur.com/QydD1RQ.png)
我嘗試過把順序反過來就可以正常運行了
## Parallel 會搶 memory
我嘗試開四個 Parallel 的 job 就一直得到
![](https://i.imgur.com/7gI82mv.png)
算一下它是要不到多少記憶體
![](https://i.imgur.com/oJa12jU.png)
連要 600 mb 都可以 OOM !?
可能因為我們 Master 開太廢 XD
![](https://i.imgur.com/kjsJ45O.png)
這配備比我的小米筆記本還慘
相對的 prod 的配置好多了
![](https://i.imgur.com/uhcN8cf.png)
## 額外 config
https://aws.amazon.com/tw/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
config 懶人包
```python
CONFIG = SparkConf().setAll([
('spark.executor.memoryOverhead', '4096'),
("spark.memory.fraction", "0.8"),
("spark.memory.storageFraction", "0.4"),
("spark.scheduler.listenerbus.eventqueue.capacity", "100000"),
("spark.executor.memory", "8g"),
("spark.driver.memory", "8g"),
# Recommand by aws doc https://aws.amazon.com/tw/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'"),
("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'")
])
```
注意
```
spark.executor.memory
spark.driver.memory
```
只是 spark 「可以 alloc 的上限」,並不代表設了就一定可以拿到,就算設很大如果那一台 master 已經沒 memory 了,要 600 mb 也可以當場暴斃
## ERS_S3
不支援 Python3
重寫了一個
```python
def S3_upload_obj(fd, output_bucket, output_path, sns_arn):
RETURN_STATUS_SUCCESS = 0
def normalize(value):
return value.rstrip().rstrip("/").lstrip("/")
s3_client = boto3.client('s3')
res = s3_client.put_object(Bucket = normalize(output_bucket),
Key = normalize(output_path),
Body = fd,
ServerSideEncryption = "AES256")
return res
```
## livy api
`http://emr-user.prod.ers.a1q7.net:8998/batches?size=100000000`
## 雞尾酒
Don't upload zero dataframe
Broadcast before join
2 paralle
cut sequence
sum+join before agg
![](https://i.imgur.com/EPmp3Z2.png)
![](https://i.imgur.com/fjGvaN7.png)
![](https://i.imgur.com/7vL1TXY.png)
## memory allocate
spark 的初始 memory allocate = executor * spark.executor.memory + spark.driver.memory + N(一些誤差)
初始大小 allocate 不到的話會一直待機,待機超過一定的時間會 timeout
初始大小不夠用的話會自動往上 allocate
有一部份的 buffer 似乎一定會 dynamic allocate,因此如果整個 hadoop 已經滿了就會跳 oom
lock 沒寫好 log 會亂跳 RDD no replica 之類的狂刷
## dealing data craw
直接 agg 太大量的資料會掛掉,有以下幾種解決方法
1. pre filter
很直觀,把資料量變少問題就變小
2. 2 way reduce
分兩次做 reduceByKey,先做一次 local 再做一次 global
ex. `[[hello, 5], [hello, 6], [hello, 2], [hello, 3]]`
先隨機上個 random tag
-> `[[hello, 5, 2], [hello, 6, 1], [hello, 2, 2], [hello, 3, 1]]`
-> `[[hello, 7, 2], [hello, 9, 1]`
對每個 executor 都做完這種事情之後再做 glocal reduceByKey
沒有解決 data craw 的本質但是減少了 shuffle 的運算量
3. 用 broadcast 躲過 shuffle
dataframe 需要 shuffle 是因為在 join 時不知道每個 row 的確切位置要打到那,那如果今天我 join 的表可以切成一大一小的話。
只要把小的 broadcast 到所有的 executor 上就可以去除 shuffle 的必要性,因為每個 executor 都已經有完整的資訊可以做 join 了
4. 切開分別做 join
大表切小表,分開 join
最後再 union
有相依性的可以先套上一個 key 之後切開再 join 再 join
基本上就是第二種方法的延伸。
## What my problem
OOM 但有多種原因同時造成
### Boss 1
#### 症狀:Aggregate 太大的資料
我一開始把 suffix 整個 explode 之後全部做 agg
![](https://i.imgur.com/qcGrWk5.png)
特徵:有時候會過,有時候不會過,因為每次執行 spark job 時 partition 的分區都不同,可能 data 會歪,可能不會,歪太多 memory 放不下就會 oom,反正就是會隨機爆炸。
#### 解決方案:把 memory 開大
看到 oom ,大家的第一個反應都是這樣
`spark.executor.memory", "16g`
#### 後遺症:沒有解決 Aggregate 時 data craw 的問題
### Boss 2
#### 症狀:parallel 走不動
Spark 起機器時 allocate memory 的計算公式如下
`executor * spark.executor.memory + spark.driver.memory + N(一些 buffer)`
如果太長時間要不到,就會 timeout。
特徵:因為 step function 打 livy 到 hadoop 接受的順序沒有保證,因此上去 Hadoop metrics 看會比較準。
#### 解決方案:把 parallel 砍掉
我把四個砍成兩個,能動了,暫時。
### Boss 1 再臨
#### 症狀:Aggregate 時還是有機率炸掉
特徵:因為在 Agg 時 memory 使用量會突然增加,當兩個 job 剛好把 emr 的 memory 吃滿的時候,其中一個 job 要就會 oom
#### 解決方案:先 reduceByKey 再 filter 之後才 Aggregate
大幅減少需要 aggregate 的資料量,從幾十萬筆 -> 幾千筆,差了 100 倍
因為步驟多了,速度其實變慢了,所以我又把 parallel 開起來。
### Boss 2 再臨
#### 症狀:EMR 被吃太滿常常會有一個 job 被撞到 oom
兩個起來總共會卡住三百多快四百的 memory,後面排太多的時候會有一個 job 被撞到 oom
#### 解決方案:把 parallel 拔掉,並且改成動態的 allocate memory
現在剛啟動時只會吃 100 多 g 了
### Boss 3
#### 症狀:LSS 跑比較久,當後面需要大量 memory 的 job 排太多,LSS 會被撞下去
Spark 會動態的分配記憶體,如果其他 job 都跑完了就多分一點給 LSS,沒有的話 LSS 靠一百多 G 也能正常跑,然而 LSS 要跑 256 個 Class A 需要 2~3 個小時的時間才能跑完。當後面累積的 job 越來越多,LSS 會被其他人撞下去。
:::info
或是 EMR 會掛掉......
:::
#### 車禍現場
![](https://i.imgur.com/dWJfZLy.png)
## PySpark 潛規則
我們沒有設定 spark 搶資源的上限,基本上就是閒置多少搶多少,所以我們 spark 最好都跑一些很快就能處理掉的 task
就算不能很快處理掉也要切成小塊的避免長佔資源,長佔資源導致其他 job 累積太多的話 emr 有可能直接掛掉,或是 emr 直接幹掉這個 job。
### How to verify Clouformation template
1. Go to AWS Cloudformation stack console
![](https://i.imgur.com/r7K6aRb.png)
2. Choose Previous console and select the stack which you want to update
![](https://i.imgur.com/c5mFM7r.png)
3. Select update stack and pass you yaml file
![](https://i.imgur.com/CM6FT3m.png)
4. You will get the result about is syntx validate or not
![](https://i.imgur.com/ITgEptJ.png)
### How to get the reason why CloudFormation deployment failed
1. Go to AWS Cloudformation stack console
![](https://i.imgur.com/r7K6aRb.png)
2. Choose Previous console and select the stack which you want to know
![](https://i.imgur.com/c5mFM7r.png)
3. You will find the error message like below
![](https://i.imgur.com/3JBoZNB.png)
#### If you want full error message
0. install AWS-CLI on your machine
1. Set AWS STS on your machine
2. `aws cloudformation describe-stack-events --stack-name [STACKNAME] --max-items 1000 > cf.log`
3. `cf.log` will contain full log like below
![](https://i.imgur.com/CX6ZOek.png)
### If your InputTransformer reach the length limit
[doc](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-events-rule-inputtransformer.html)
![](https://i.imgur.com/hw8xjzW.png)
Then split your InputTransformer to diffrent Target like bellow
![](https://i.imgur.com/AioUxcw.png)
### StateMachine bug
StateMachine 的定義寫錯 deploy 並不會失敗
只有跑到最後面的時候會掛掉,還有流程圖會畫不出來
上去 stepfunction 案 edit 之後刷新流程圖就可以看到定義哪裡寫錯
或是直接丟給 json formatter 也可以找出來
## column name 有 . 怎麼辦?
```
"`tmase_result.total_score`"
```
用這種格式去拿
## 限制 executor 數量
`('spark.dynamicAllocation.maxExecutors', '8'),`
預設是無限制,無限制在 shuffle 或是 upload 的時候特別容易 oom
因為會 allocate 一大堆 uploader
## pyspark 沒 log
應更改執行時的 input
`"spark.submit.deployMode": "cluster"` 改成 client 即可
```json
{
"FromAddrCount": {
"env": "test",
"region": "us-west-2",
"s3_script_bucket": "coretech-ers-test-emr-us-west-2/app",
"proj_name": "EurekaPreprocessedLogFromAddrCount",
"wait_time": 30,
"file": "EurekaPreprocessedLogFromAddrCount.py",
"args": [
"--dtime",
"2020-10-13T06:15:00Z",
"--env",
"test",
"--region",
"us-west-2"
],
"conf": {
"spark.submit.deployMode": "client"
},
"py3": true
}
}
```
用比喻好了 Spark 是一人力派遣公司 裡面有兩種人
(老闆) Master Node - (員工) Worker Nodes
當接到了一個案子 Spark Job, 老闆要選出一個工頭(Driver)和一些工人(Excutors)幫忙做事
deploy mode: client, 老闆(Master node)自己當工頭(Driver)
deploy mode: cluster, 老闆(Master node)選一個員工(worker node)當工頭
所以cluster mode就是老闆不會被操爆