# **【PySpark 巨量資料處理:Azure Databricks 設置】**
:::info
- 什麼是 Apache Spark?
- 什麼是Microsoft Azure?
- 申請帳戶
- Azure Databricks 設置
- 創建 計算池(Compute Pools)
- 創建 計算策略(Compute Policy)
- Azure Databricks 基本操作
- 設置付費提醒
- 從 Azure Databricks 訪問 Data Lake Storage(ADLS)
- Access Keys
- Microsoft Azure Storage Explorer APP
- SAS Token
- Service Principal
- 管理 Secrets、Secret Scope
- Access Keys
- SAS Token
- Service Principal
- Spark
- DBFS 讀取資料
- 外部存儲系統(例如Azure Blob Storage、Amazon S3)掛載到 DBFS 中,之後直接從 databricks 讀取
- 掛載多個
- 寫回 Azure storge account 儲存體帳戶
- Workflow
- 嵌入其他筆記本
- 執行多個筆記本,結果存儲在變數中
- JOIN
- Databricks Jobs
- Delta Lake
- Azure Data Factory
- Azure Machine Learning
- 視覺化BI連接
:::
<br/>
## 什麼是 Apache Spark?
看這裏
[【Spark + Iceberg:本地端 Memory Tuning、Join 與資料傾斜處理】](https://hackmd.io/@workcata/S1StkgX9el)
<br/>
## 什麼是 Microsoft Azure Databricks ?
Microsoft Azure Databricks 結合 Apache Spark 和 Azure
Azure Databricks 提供 Apache Spark 分析引擎,支持分佈式數據處理,來處理大規模的數據集
可以掛載外部儲存體帳戶(例如 Azure Blob Storage、Azure Data Lake Storage,從Databricks讀取並分析,再寫回外部儲存體帳戶
- 雲的優勢
Azure Databricks 能有效地處理大規模的數據集,用戶可以根據實際需求按需擴展或縮小計算和存儲資源
團隊中用戶也可以在同一項目協作,共享代碼、數據、視覺化
<br/>
## 申請帳戶

免費帳戶可以使用12個月,會有200美金的額度
中間會預刷1美金的預授權來開通

開通後登入

>如果是學生,且email有.edu,可以申請學生帳戶,100美金的額度,功能會比較少
>google "Start building the future with Azure for Students" 註冊
<br/>
### Azure Databricks 設置
Azure Databricks 是 Microsoft Azure 提供的基於 Apache Spark 的大數據分析平台
搜尋

建立






需要等一小段時間
成功後,點 前往資源

啟動工作區


支援多種數據

創cpmpute


等一段時間

#### 創建 計算池(Compute Pools)
Compute Pools是一種VM資源配置和管理概念,它會照我們設的條件自動配置不同類型和大小的VM

創建


#### 創建 計算策略(Compute Policy)
Compute Policy 是一種自動調整Databricks集群的工作節點數量的設定
創建



<br/>
## Azure Databricks 基本操作
- 打開資料夾,一切都很像jupyter notebook * shift+enter 運行



>可以更改連接
>
<br/>
>"H" 查看所有快捷鍵
>
<br/>
>雖然我選python,但可以使用"%"轉換
>
<br/>
>"%fs
>ls /databricks-datasets" 查看內建資料集
>
<br/>
>"dbutils.fs.ls('/databricks-datasets/')" 列出指定路徑 /databricks-datasets/ 下的文件和目錄
>* 如果要確保答案會出來,可以用display(包住)
<br/>
## 設置付費提醒
進到 成本管理 - 預算
上方加入



<br/>
## 從 Azure Databricks 訪問 Data Lake Storage(ADLS)
- Access Keys、Microsoft Azure Storage Explorer APP
優點:簡單、快速,只需使用存取金鑰即可訪問 ADLS
缺點:需要管理金鑰、缺乏細粒度的權限控制,所有使用該金鑰的人都有相同的權限
選storge account 儲存體帳戶

建立



創建好後

新增容器並上傳資料


可以編輯並預覽


>假設要一次查看所有的,下載 Microsoft Azure Storage Explorer APP (類似mysql workbench)
>
>選download now
>
>
>使用azure登入
>
>
>選擇訂閱
>
>
>
>
>
查看資料
databicks,key從ui介面查看

```=
spark.conf.set(
"fs.azure.account.key.001dl.dfs.core.windows.net",
"key")
```
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```

```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```

- SAS Token
優點:提供有限時間的臨時授權,增加安全性
缺點: 需要管理 SAS Token、有效期限需要謹慎設置

最下面會產生token,複製

用 Storge Explorer

```=
spark.conf.set("fs.azure.account.auth.type.001dl.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.001dl.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.001dl.dfs.core.windows.net", "剛剛複製的token")
```
後面方式一樣
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```
```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```
- Service Principal
優點:使用 Azure Active Directory (AAD) 身份驗證,更安全、可以有自己的權限,實現更細粒度的存取控制
缺點:設置較複雜,需要設定 Service Principal 以及相應的權限、需要定期更新 Service Principal 的密碼或使用者憑證
在Azure設定使用者

新增註冊

應用程式 (用戶端) 識別碼
目錄 (租用戶) 識別碼
複製

新增一個密鑰,複製 值 secret value

回到Storage Account,設定IAM




貼上前面複製的
```=
client_id = "@@@"
tenant_id = "@@@"
client_secret = "@@@"
```
```=
spark.conf.set("fs.azure.account.auth.type.001dl.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.001dl.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.001dl.dfs.core.windows.net", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.001dl.dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.001dl.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
```
後面方式一樣
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```
```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```
<br/>
### 管理 Secrets、Secret Scope
可以將機密信息,例如資料庫密碼、API 金鑰存到 Secrets,防止機密資料外洩
Secret Scopes 用來將 Secrets 組織起來,再分配給特定的用戶、群組或工作區
#### Access Keys
- Secrets
建立資源 key vault


將Access Keys 貼到 secret value 祕密值


<br/>
>PS 如果出現權限不夠,去角色指派將自己加入管理員
>
>
<br/>
- Secret Scope
databrick主頁url加上 "/secrets/createScope"

到剛剛 D001dl-key-vault - properties 屬性複製貼過去


列出所有的 Secret Scopes
```=
dbutils.secrets.listScopes()
```

列出指定 Secret Scope 中的所有 Secrets
```=
dbutils.secrets.list(scope = 'D001dl-secret-scope')
```

指定 Secret Scope 名稱是 'D001dl-secret-scope'
指定的 key 名稱是 'D001dl-account-key'
```=
dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-account-key')
```

<br/>
>PS 如果出現權限不夠,去角色指派將AzureDatabricks 加入secrets/readMetadata 權限
- 讀取
設定變量
```=
formula1dl_account_key = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-account-key')
```
設定spark訪問
```=
spark.conf.set(
"fs.azure.account.key.001dl.dfs.core.windows.net",
formula1dl_account_key)
```
列出 Azure Data Lake Storage Gen2 中指定路徑(/test)的文件和文件夾
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```

讀取 Azure Data Lake Storage Gen2 中的 CSV 文件
```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```
<br/>
### SAS Token
- Secrets
key value 如果有建過,不用重建
把前面複製過的sas token 貼到 secret

- Secret Scope
前面如果有建過,不用重建
- 讀取
設定變量
```=
D001dl_teest_sas_token = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-sas-token')
```
設定spark訪問
```=
spark.conf.set("fs.azure.account.auth.type.001dl.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.001dl.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.001dl.dfs.core.windows.net", D001dl_teest_sas_token)
```
列出 Azure Data Lake Storage Gen2 中指定路徑(/test)的文件和文件夾
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```

讀取 Azure Data Lake Storage Gen2 中的 CSV 文件
```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```
<br/>
### 管理 Service Principal
- Secrets
key value 如果有建過,不用重建
把前面複製過的 client_id、 tenant_id、client_secret 貼到 secret

- Secret Scope
前面如果有建過,不用重建
- 讀取
設定變量
```=
client_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-id')
tenant_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-tenant-id')
client_secret = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-secret')
```
設定spark訪問
```=
spark.conf.set("fs.azure.account.auth.type.001dl.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.001dl.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.001dl.dfs.core.windows.net", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret.001dl.dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.001dl.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
```

列出 Azure Data Lake Storage Gen2 中指定路徑(/test)的文件和文件夾
```=
display(dbutils.fs.ls("abfss://test@001dl.dfs.core.windows.net"))
```
讀取 Azure Data Lake Storage Gen2 中的 CSV 文件
```=
display(spark.read.csv("abfss://test@001dl.dfs.core.windows.net/file_name.csv"))
```
<br/>
## Spark 指令
前面都是直接使用 Spark 配置,分別列出和讀取 Azure Data Lake Storage Gen2 中的文件
優點: 簡單直接,對於少量的數據讀取操作比較方便
缺點: 不夠靈活,無法在數據讀取之前,進行額外的授權控制和設定
除了這個方法外,更簡單可以直接上傳databricks的DBFS。或是將 Azure Data Lake Storage Gen2 掛載到 DBFS,然後顯示掛載點的內容,並讀取文件
- DBFS 讀取資料
右上方帳號 管理員設置 - 高級 - DBFS File Browser 打開
之後就可以直接上傳檔案

列出根目錄 (/) 下的所有文件
```=
display(dbutils.fs.ls('/'))
```

列出 FileStore 目錄下的所有文件和文件夾
```=
display(dbutils.fs.ls('/FileStore'))
```

列出 fileStore 目錄下的 CSV 文件
```=
display(spark.read.csv('/FileStore/file_name.csv'))
```

<br/>
- 外部存儲系統(例如Azure Blob Storage、Amazon S3)掛載到 DBFS 中,之後直接從 databricks 讀取
可以從 Azure storge account 儲存體帳戶 上傳 raw data

>也可以使用 Microsoft Azure Storage Explorer

<br/>
設定變量
```=
client_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-id')
tenant_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-tenant-id')
client_secret = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-secret')
```
配置 Azure Data Lake Storage Gen2 的授權信息
```=
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": client_id,
"fs.azure.account.oauth2.client.secret": client_secret,
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}
```
掛載 Azure Data Lake Storage Gen2
```=
dbutils.fs.mount(
source = "abfss://test@001dl.dfs.core.windows.net/",
mount_point = "/mnt/001dl/test",
extra_configs = configs)
```
顯示掛載點內容
```=
display(dbutils.fs.ls("/mnt/001dl/test"))
# display(dbutils.fs.mounts())
# %fs
# ls /mnt/001dl/test
```

讀取文件
```=
display(spark.read.csv("/mnt/001dl/test/file_name.csv"))
```
設成 dataframe
```=
df = spark.read.csv("/mnt/001dl/test/file_name.csv")
```
印出df
```=
df.show()
```
卸載 Azure Data Lake Storage Gen2
```=
dbutils.fs.unmount('/mnt/001dl/test')
```
- 掛載多個
```=
def mounts(storage_account_name, container_name):
client_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-id')
tenant_id = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-tenant-id')
client_secret = dbutils.secrets.get(scope = 'D001dl-secret-scope', key = 'D001dl-app-client-secret')
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": client_id,
"fs.azure.account.oauth2.client.secret": client_secret,
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}
if any(mount.mountPoint == f"/mnt/{storage_account_name}/{container_name}" for mount in dbutils.fs.mounts()):
dbutils.fs.unmount(f"/mnt/{storage_account_name}/{container_name}")
dbutils.fs.mount(
source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
mount_point = f"/mnt/{storage_account_name}/{container_name}",
extra_configs = configs)
display(dbutils.fs.mounts())
```
```=
mounts('001dl', 'test')
```
```=
mounts('001dl', 'raw-data')
```
<br/>
## 寫回 Azure storge account 儲存體帳戶
做完 Spark 資料整理後,(在有掛載的情況下)可以把資料寫回去
```=
df.write.mode("overwrite").parquet("/mnt/001dl/test/file_name")
```
.partitionBy 分不同資料夾
```=
df.write.mode('overwrite').partitionBy('signin_year').parquet('/mnt/001dl/test/project_1')
```
查看確認
```=
%fs
ls /mnt/001dl/test/file_name
```
```=
df = spark.read.parquet("/mnt/001dl/test/file_name")
display(df)
```
假設資料處理後,再存進新的table
```=
# database: test
%python
df_1.write.format('parquet').saveAsTable('test.df_1_python')
# 指定路徑資料夾
df_1.write.format('parquet').option("path", f"{folder_name_path}/file_name").saveAsTable('test.df_1_python')
# 指定路徑資料夾
df_1.write.format('parquet').saveAsTable('test.df_1_python', path=f"/mnt/001dl/{folder_name_path}/file_name")
# 指定路徑資料夾
df_1.write.format("delta").mode("overwrite").save("/mnt/001dl/test/df_1")
%sql
DESC EXTENDED df_1_python;
```
>PS
```=
%sql
CREATE DATABASE IF NOT EXISTS test_2
LOCATION "/mnt/001dl/test"
```
<br/>
## Workflow
- 嵌入其他筆記本
```=
%run "../folder_name/notebook_name"
```
```=
df_1 = spark.read.parquet(f"{foldername_path}/file_name).filter("year" == "2023")
display(df)
```
```=
df_2 = spark.read.parquet(f"{foldername_path}/file_name")
df_2.createTempView("df_2_temp")
%sql
SELECT * FROM df_2_temp WHERE year = 2000
```
- 執行多個筆記本,結果存儲在變數中
```=
result_1 = dbutils.notebook.run("filename_1", 0, {"p_data_source": "Ergast API"})
result_1
result_2 = dbutils.notebook.run("filename_2", 0, {"p_data_source": "Ergast API"})
result_2
```
- JOIN DataFrames
```=
df_inner = df_1.join(df_2, df_1.userid == df_2.userid, "inner")
df_outer = df_1.join(df_2, df_1.userid == df_2.userid, "outer")
df_left = df_1.join(df_2, df_1.userid == df_2.userid, "left")
df_right = df_1.join(df_2, df_1.userid == df_2.userid, "right")
df_semijoin = df_1.join(df_2, df_1.userid == df_2.userid, "semi")
# 所有可能的行的組合
df_crossjoin = df_1.crossJoin(df_2)
```
- Databricks Jobs
Databricks Jobs 可以包含一系列的程式碼,在指定的時間進行定期執行或以其他觸發條件執行

<br/>
## Delta Lake
資料寫入 Delta Lake 格式
Delta Lake 是一個構建在 Apache Spark 上的開源存儲層,支持事務性數據湖
>參考 [Delta Lake documentation](https://docs.delta.io/latest/index.html)

>SQL大部分操作跟mysql差不多
>例如
>
>SQL VS Python寫法
```=
# SQL
%sql
UPDATE demo.df_1_python_3
SET fastestLap = fastestLap + 1
WHERE fastestLap <=2
#PYTHON
%python
from delta.tables import DeltaTable
deltaTablePeople = DeltaTable.forPath(spark, '/mnt/001dl/test/df_1')
deltaTablePeople.update("fastestLap <= 2", {"fastestLap": "fastestLap + 1"})
```
<br/>
寫進sql
```=
df_1.write.format("delta").mode("overwrite").partitionBy("col_name").saveAsTable('demo.df_1_python')
%sql
select * from demo.df_1_python
%sql
SHOW PARTITIONS demo.df_1_python
```
寫進儲存體帳戶指定資料夾
```=
df_1.write.format("delta").mode("overwrite").save("/mnt/001dl/test/df_1")
%python
df_1 = spark.read.format("delta").load("/mnt/001dl/test/df_1")
display(df_1)
```
```
# 從指定路徑資料夾寫進sql
%sql
CREATE TABLE demo.df_1
USING delta
LOCATION '/mnt/001dl/test/df_1'
%sql
select * from demo.df_1
```
<br/>
## Azure Data Factory
Azure Data Factory (ADF) 用於構建、部署和管理數據管道的 Azure 服務


啟動工作室



<br/>
## Azure Machine Learning


啟動工作室


<br/>
## 視覺化BI連接

複製cluster資料貼過去


