# Windows下用pyspark連接mysql資料庫 ###### tags: `Python` > pyspark是Spark对Python的api接口,可以在Python环境中通过调用pyspark模块来操作spark,完成大数据框架下的数据分析与挖掘。其中,数据的读写是基础操作,pyspark的子模块pyspark.sql 可以完成大部分类型的数据读写。文本介绍在pyspark中读写Mysql数据库。 ## Window 10 PySpark 安裝 詳細可參考:[用 pip 在 Windows 上安裝單機版 pyspark](https://louis925.wordpress.com/2018/07/16/%E7%94%A8-pip-%E5%9C%A8-windows-%E4%B8%8A%E5%AE%89%E8%A3%9D%E5%96%AE%E6%A9%9F%E7%89%88-pyspark-%E4%B8%A6%E9%96%8B%E5%95%9F-jupyter-notebook/) 1. 先確定已安裝 Python (建議到 Python 官網下載安裝 64 bits https://www.python.org/downloads/windows/ ) 2. 安裝 Java https://www.java.com/ ,建議安裝到 C:\Java\ 下。若 Java 安裝路徑有空白,pyspark 執行會出現錯誤。 3. 新增 JAVA_HOME 到環境變數,例如: ``` JAVA_HOME = C:\Java\jre1.8.0_241 ``` ![](https://i.imgur.com/XeUqnFv.png) 4. 在命令提示字元輸入 ``` pip3 install pyspark ``` 5. 在命令提示字元輸入 pyspark 即可啟動 但是在 Windows 上還需要手動下載 winutils.exe 讓 Hadoop 在 Windows 上正常運作 7. 到 https://github.com/steveloughran/winutils/ 下載對應版本的 winutils.exe (在 bin 資料夾內,例如: https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/winutils.exe) 8. 將 winutils.exe 放到 C:\winutils\bin\ 下 9. 新增 Windows 環境變數: ``` HADOOP_HOME = C:\winutils ``` 接著還會遇到暫存資料夾 tmp\hive 權限的問題 10. 在你要開啟專案的硬碟的根目錄 (如 E:\) 建立資料夾 \tmp\hive (假如 ipynb 在 E:\abc\def\code.ipynb 內,就建立兩個資料夾在 E:\tmp\hive,一般此資料夾會自動在執行 pyspark 時建立,但權限會有問題,需手動修改) 11. 用 winutils.exe 改變該暫存資料夾的權限 ``` %HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive ``` 12. 檢查該資料夾權限 ``` %HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive ``` 應該要為 drwxrwxrwx 這樣應該就可以正常使用 pyspark 了 Reference: https://blogs.msdn.microsoft.com/arsen/2016/02/09/resolving-spark-1-6-0-java-lang-nullpointerexception-not-found-value-sqlcontext-error-when-running-spark-shell-on-windows-10-64-bit/ ## PySpark Mysql環境配置 詳細可參考: [pyspark对Mysql数据库进行读写 ](https://zhuanlan.zhihu.com/p/136777424) pyspark连接Mysql是通过java实现的,所以需要下载连接Mysql的jar包。 [下载地址](https://dev.mysql.com/downloads/connector/j/) ![](https://i.imgur.com/cfpmkGl.png) 选择下载Connector/J,然后选择操作系统为Platform Independent,下载压缩包到本地。 ![](https://i.imgur.com/c7y9wvk.png) 然後因為是直接通過 pip3 install pyspark 的方式安裝 PySpark,可以先透過以下程式碼查詢 PySpark路徑:(參考:[PySpark 連線 MySQL 示例 ](https://www.mdeditor.tw/pl/pZup/zh-tw)) ``` from pyspark import find_spark_home print(find_spark_home._find_spark_home()) ``` 然后解压文件,将其中的jar包mysql-connector-java-8.0.22.jar放入spark的安装目录下的jars資料夾 ![](https://i.imgur.com/kqLND6J.png) ## 讀取 MySql 下方程式碼參考:[PySpark 连接 MySQL 示例 ](https://juejin.im/post/6847902219627397127)的Spark 代码示例 ```python= from pyspark import SparkContext from pyspark.sql import SQLContext if __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local', appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'root', 'password': 'rootroot', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC' # 读取表 data = spark.read.jdbc(url=url, table='user', properties=prop) # 打印data数据类型 print(type(data)) # 展示数据 data.show() # 关闭spark会话 sc.stop() ``` 注意點: 1. prop参数需要根据实际情况修改,文中用户名和密码用xxx代替了,driver参数也可以不需要; 2. url参数需要根据实际情况修改,格式为jdbc:mysql://主机:端口/数据库; 3. 通过调用方法read.jdbc进行读取,返回的数据类型为spark DataFrame; 輸出畫面如下: ![](https://i.imgur.com/Uh2D2TL.png) ### 解決使用JDBC連接 MySql 時,時區跟編碼的錯誤 參考:[使用JDBC连接MySql时出现...](https://www.cnblogs.com/EasonJim/p/6906713.html) 在连接字符串后面加上?serverTimezone=UTC 其中UTC是统一标准世界时间。 完整的连接字符串示例: `jdbc:mysql://localhost:3306/test?serverTimezone=UTC` 或者还有另一种选择: `jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8`,这个是解决中文乱码输入问题,当然也可以和上面的一起结合:`jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC` ## 寫入 MySql 下方程式碼參考:[pyspark对Mysql数据库进行读写 ](https://zhuanlan.zhihu.com/p/136777424)的**4 写入Mysql** ```python= import pandas as pd from pyspark import SparkContext from pyspark.sql import SQLContext, Row if __name__ == '__main__': # spark 初始化 sc = SparkContext(master='local', appName='sql') spark = SQLContext(sc) # mysql 配置(需要修改) prop = {'user': 'root', 'password': 'rootroot', 'driver': 'com.mysql.cj.jdbc.Driver'} # database 地址(需要修改) url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC' # 创建spark DataFrame # 方式1:list转spark DataFrame l = [(1, 12), (2, 22)] # 创建并指定列名 list_df = spark.createDataFrame(l, schema=['id', 'value']) # 方式2:rdd转spark DataFrame rdd = sc.parallelize(l) # rdd col_names = Row('id', 'value') # 列名 tmp = rdd.map(lambda x: col_names(*x)) # 设置列名 rdd_df = spark.createDataFrame(tmp) # 方式3:pandas dataFrame 转spark DataFrame df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]}) pd_df = spark.createDataFrame(df) # 写入数据库 pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop) # 关闭spark会话 sc.stop() ``` 效果如下: ![](https://i.imgur.com/HnBLxao.png) 注意點: 1. prop和url参数同样需要根据实际情况修改; 2. 写入数据库要求的对象类型是spark DataFrame,提供了三种常见数据类型转spark DataFrame的方法; 3. 通过调用write.jdbc方法进行写入,其中的model参数控制写入数据的行为。 ![](https://i.imgur.com/NiGE5lu.png) 当数据库无写入的表时,这四种模式都会根据设定的表名称自动创建表,无需在Mysql里先建表。