# Windows下用pyspark連接mysql資料庫
###### tags: `Python`
> pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,完成大数据框架下的数据分析与挖掘。其中,数据的读写是基础操作,pyspark的子模块pyspark.sql 可以完成大部分类型的数据读写。文本介绍在pyspark中读写Mysql数据库。
## Window 10 PySpark 安裝
詳細可參考:[用 pip 在 Windows 上安裝單機版 pyspark](https://louis925.wordpress.com/2018/07/16/%E7%94%A8-pip-%E5%9C%A8-windows-%E4%B8%8A%E5%AE%89%E8%A3%9D%E5%96%AE%E6%A9%9F%E7%89%88-pyspark-%E4%B8%A6%E9%96%8B%E5%95%9F-jupyter-notebook/)
1. 先確定已安裝 Python (建議到 Python 官網下載安裝 64 bits https://www.python.org/downloads/windows/ )
2. 安裝 Java https://www.java.com/ ,建議安裝到 C:\Java\ 下。若 Java 安裝路徑有空白,pyspark 執行會出現錯誤。
3. 新增 JAVA_HOME 到環境變數,例如:
```
JAVA_HOME = C:\Java\jre1.8.0_241
```

4. 在命令提示字元輸入
```
pip3 install pyspark
```
5. 在命令提示字元輸入 pyspark 即可啟動
但是在 Windows 上還需要手動下載 winutils.exe 讓 Hadoop 在 Windows 上正常運作
7. 到 https://github.com/steveloughran/winutils/ 下載對應版本的 winutils.exe (在 bin 資料夾內,例如: https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/winutils.exe)
8. 將 winutils.exe 放到 C:\winutils\bin\ 下
9. 新增 Windows 環境變數:
```
HADOOP_HOME = C:\winutils
```
接著還會遇到暫存資料夾 tmp\hive 權限的問題
10. 在你要開啟專案的硬碟的根目錄 (如 E:\) 建立資料夾 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 內,就建立兩個資料夾在 E:\tmp\hive,一般此資料夾會自動在執行 pyspark 時建立,但權限會有問題,需手動修改)
11. 用 winutils.exe 改變該暫存資料夾的權限
```
%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
```
12. 檢查該資料夾權限
```
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
```
應該要為 drwxrwxrwx
這樣應該就可以正常使用 pyspark 了
Reference: https://blogs.msdn.microsoft.com/arsen/2016/02/09/resolving-spark-1-6-0-java-lang-nullpointerexception-not-found-value-sqlcontext-error-when-running-spark-shell-on-windows-10-64-bit/
## PySpark Mysql環境配置
詳細可參考:
[pyspark对Mysql数据库进行读写
](https://zhuanlan.zhihu.com/p/136777424)
pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。
[下载地址](https://dev.mysql.com/downloads/connector/j/)

选择下载Connector/J,然后选择操作系统为Platform Independent,下载压缩包到本地。

然後因為是直接通過 pip3 install pyspark 的方式安裝 PySpark,可以先透過以下程式碼查詢 PySpark路徑:(參考:[PySpark 連線 MySQL 示例
](https://www.mdeditor.tw/pl/pZup/zh-tw))
```
from pyspark import find_spark_home
print(find_spark_home._find_spark_home())
```
然后解压文件,将其中的jar包mysql-connector-java-8.0.22.jar放入spark的安装目录下的jars資料夾

## 讀取 MySql
下方程式碼參考:[PySpark 连接 MySQL 示例
](https://juejin.im/post/6847902219627397127)的Spark 代码示例
```python=
from pyspark import SparkContext
from pyspark.sql import SQLContext
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 读取表
data = spark.read.jdbc(url=url, table='user', properties=prop)
# 打印data数据类型
print(type(data))
# 展示数据
data.show()
# 关闭spark会话
sc.stop()
```
注意點:
1. prop参数需要根据实际情况修改,文中用户名和密码用xxx代替了,driver参数也可以不需要;
2. url参数需要根据实际情况修改,格式为jdbc:mysql://主机:端口/数据库;
3. 通过调用方法read.jdbc进行读取,返回的数据类型为spark DataFrame;
輸出畫面如下:

### 解決使用JDBC連接 MySql 時,時區跟編碼的錯誤
參考:[使用JDBC连接MySql时出现...](https://www.cnblogs.com/EasonJim/p/6906713.html)
在连接字符串后面加上?serverTimezone=UTC
其中UTC是统一标准世界时间。
完整的连接字符串示例:
`jdbc:mysql://localhost:3306/test?serverTimezone=UTC`
或者还有另一种选择:
`jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8`,这个是解决中文乱码输入问题,当然也可以和上面的一起结合:`jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC`
## 寫入 MySql
下方程式碼參考:[pyspark对Mysql数据库进行读写
](https://zhuanlan.zhihu.com/p/136777424)的**4 写入Mysql**
```python=
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 创建spark DataFrame
# 方式1:list转spark DataFrame
l = [(1, 12), (2, 22)]
# 创建并指定列名
list_df = spark.createDataFrame(l, schema=['id', 'value'])
# 方式2:rdd转spark DataFrame
rdd = sc.parallelize(l) # rdd
col_names = Row('id', 'value') # 列名
tmp = rdd.map(lambda x: col_names(*x)) # 设置列名
rdd_df = spark.createDataFrame(tmp)
# 方式3:pandas dataFrame 转spark DataFrame
df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
pd_df = spark.createDataFrame(df)
# 写入数据库
pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
# 关闭spark会话
sc.stop()
```
效果如下:

注意點:
1. prop和url参数同样需要根据实际情况修改;
2. 写入数据库要求的对象类型是spark DataFrame,提供了三种常见数据类型转spark DataFrame的方法;
3. 通过调用write.jdbc方法进行写入,其中的model参数控制写入数据的行为。

当数据库无写入的表时,这四种模式都会根据设定的表名称自动创建表,无需在Mysql里先建表。