Hadoop + Spark 集群
===
Hadoop 集群 (Hadoop cluster) 由**17台主機共136個核心數、1TB記憶體組成**。
<!-- 本平台提供的開發環境為 RStudio ,使用R語言來撰寫程式:
* [RStudio](https://ba.cm.nsysu.edu.tw:8787/) -->
以下我們提供R語言使用 Hadoop+Spark 的幾項簡易教學:
* [Hadoop HDFS 教學](#Hadoo-HDFS-教學)
* [Spark 連線設定](#Spark-連線設定)
* [Spark 實作巨量資料範例](#Spark-實作巨量資料範例)
# Hadoop HDFS 教學
Hadoop 是什麼呢?
簡單來講,想像如果有個檔案大小超過你的個人電腦可以儲存的容量,那絕對無法被放進你的電腦對吧?這時候我們就需要 Hadoop 了!
**Hadoop 是一個能儲存並管理大量資料的雲端平台**,除了能儲存超過一個伺服器所能容納的檔案外,還**能夠同時儲存、處理、分析**幾千萬份這種超大的檔案,所以每當講到大數據,便會提到 Hadoop 這套技術。
這段影音及文字教學,目的是要讓使用者可以連線上管院伺服器上的 Hadoop,並對 HDFS(Hadoop File System) 上的檔案做些簡單的操作。
## 影音及文字教學
{%youtube 8CgHTT_Mhbc %}
---
如果使用資料集過大無法自行上傳到RStudio時,請寄信聯絡我們幫忙上傳: baadmin@cm.nsysu.edu.tw
1. **打開終端機**
首先我們要開啟Terminal。
進入RStudio之後,點選Terminal
3. **使用終端機與HDFS系統互動**
以下是基本的HDFS操作:
- **hdfs dfs -put [欲上傳文件位置] [上傳目的地]**: 寫入檔案至HDFS。
- **hdfs dfs -get [欲下載文件位置] [下載目的地]**:下載HDFS上的檔案。
```
# 將mtcars.csv資料集上傳至hdfs檔案處理系統
hdfs dfs -put ./mtcars.csv /data/yourAccountName/mtcars.csv
# 從hdfs上你的資料夾中下載mtcars.csv到本地端
hdfs dfs -get /data/yourAccountName/mtcars.csv ./mtcars.csv
```
- **hdfs dfs -ls [資料夾]**:列出HDFS上某資料夾中所有檔案。
- **hdfs dfs -rm [欲刪除檔案位置]**:刪除HDFS上的檔案。
```
#列出你資料夾下的檔案
hdfs dfs -ls /data/yourAccountName
#刪除你資料夾下的mtcars.csv
hdfs dfs -rm /data/yourAccountName/mtcars.csv
```
上述語法的 **/yourAccountName/** 部分,請使用者自行將其改成自己的目錄名稱。
mtcars.csv 也請改成你要分析的資料集
**以上就是對於Hadoop HDFS互動的介紹。**
除了用接下來教學的Spark,使用者也可以透過"MapRudeuce"的方法去對Hadoop上的資料做操作。
---
# Spark 連線設定
既然處理大數據我們有了Hadoop這個強大的工具了,那為何還有Spark的出現呢?而Spark與Hadoop又有什麼不同呢?
Hadoop在執行MapReduce的運算時,會將中間產生的數據,存儲在硬碟中,也就說任何的資料存取都是在執行I/O,而I/O往往是效能的瓶頸,因此會有讀寫資料延遲的問題。但Spark比Hadoop晚4年問世,卻能以100倍快的速度執行MapReduce是為什麼呢?
**Spark是基於記憶體**內的運算框架,在運算時,會將中間產生的數據暫存在記憶體中,因此可以大大地加快運算速度,尤其是反覆執行越多次時,所需讀取的資料量就越大,越能看出Spark的效能。而Spark同時也與Hadoop相容,所以同樣可以透過HDFS存儲檔案。
## 影音及文字教學
{%youtube 8GPJPELHEBU %}
---
1. **連線上HDFS**
- 請加在程式碼最前面,確保有執行
```R
Sys.setenv("HADOOP_CMD" = "/opt/hadoop/bin/hadoop")
```
2. **設定Spark的連線資訊**
- 由於Spark是基於記憶體的運算,所以在連線前必須了解連線的伺服器提供多大的資源,並設定合理的資源消耗,例如使用多少的記憶體、核心數等等。
```R
# the following code are fixed setting
library(dplyr)
library(sparklyr)
Sys.setenv(SPARK_HOME="/opt/spark")
config <- spark_config()
config$spark.executor.memory = "20G"
config$spark.cores.max = "10"
config$spark.driver.memory = "20G"
config$spark.yarn.executor.memoryOverhead = "4096"
# create a connection to Spark
sc <- spark_connect(master = "spark://192.168.1.100:7077", config = config)
```
執行完以上的程式後就可以連線上Spark了。
3. **在HDFS上存取檔案**
- 先前提到過,Spark跟HDFS是相容的,所以連線上Spark時也可以對HDFS上的檔案做存取。
- 底下程式碼示範如何從HDFS上讀出檔案和把R的物件儲存至HDFS。
```R
# 此處path參數中的yourAccountName請改為使用者自己的路徑
mySDF = spark_read_csv(sc, name = "mtcars", path = "hdfs://192.168.1.100:9000/data/yourAccountName/mtcars.csv", header = T)
# 也可以把 R dataframe 用Spark來讀取.
movies_sdf = copy_to(sc, df = ggplot2movies::movies, name = "movies",overwrite = T)
```
4. **中斷連線**
- 若使用者在執行完畢Spark的操作卻沒有中斷連線的話,會導致集群上環的運算資源被特定使用者佔用住,導致其他使用者無法使用群集上資源,所以,當使用完Spark時務必要養成斷線的好習慣。
```R
spark_disconnect(sc)
```
---
# Spark 實作巨量資料範例
接下來的範例會示範如何在Spark上操作巨量資料。
範例中我們會用到 babynames 資料集,資料集紀錄一百八十多萬的新生兒姓名資料。選用這個資料集的目的是要讓使用者感受在操作巨量資料時,Spark的運算能力有多強大。
1. **設定連線資訊&讀取資料**
- Spark 連線
```R
# the following code are fixed setting
Sys.setenv("HADOOP_CMD" = "/opt/hadoop/bin/hadoop")
library(dplyr)
library(sparklyr)
Sys.setenv(SPARK_HOME="/opt/spark")
config <- spark_config()
config$spark.executor.memory = "32G"
config$spark.cores.max = "50"
config$spark.driver.memory = "16G"
config$spark.yarn.executor.memoryOverhead = "4096"
# create a connection to Spark
sc <- spark_connect(master = "spark://192.168.1.100:7077", config = config)
```
- 資料集讀取
- 上述的資料集已經放在叫做 **sample** 資料夾裡面了,透過底下的程式碼就可以將資料讀出來了。
- babynames是美國新生兒的姓名資料,資料筆數大約有 **一百八十多萬** 多筆。
```R
data_file_1 = "hdfs:/home/sample/babynames.csv"
# 透過spark_read_csv 至 hdfs上 讀取babynames 資料集
baby = spark_read_csv(sc, "babynames", data_file_1)
# babynames 資料集總共有一百八十多萬筆
baby %>% summarise(n = n())
```
2. **簡單查詢句:以 babynames 資料集為例**
使用者可以想像當資料筆數超級多的時候,一句查詢句至少要搜尋過全部資料一次,那會需要花多少時間去執行這個查詢句,況且實務上我們不會只做單一條件的查詢句,也就是說我們的條件可能會很複雜包含不只一個變數,那表示一定會搜尋全部的資料不少次,可想而知,這必定是很耗資源的一段程式。但透過Spark,執行底下的程式最多只要花5~7秒的時間就可以完成查詢,這就是Spark強大的地方。
- 以babynames資料為例,底下查詢句用來查詢各欄位中 **不重複的資料** 有幾筆。
- 執行結果可以看出prop欄位的不重複資料有二十多萬多筆,卻能夠很快就完成查詢句。
```R
# 設定起始時間
start_time <- proc.time()
baby %>%
summarise(n_year = n_distinct(year),
n_sex = n_distinct(sex),
n_name = n_distinct(name),
n_prop = n_distinct(percent)) %>%
data.frame
# n_year n_sex n_name n_prop
# 129 2 6782 11545
# 用程式執行完的時間減去起始時間
proc.time() - start_time
# user system elapsed
# 0.059 0.025 0.769
```
- 如果是直接在執行相同的查詢句,而不是透過spark做運算的話,其執行速度沒有辦法來得像spark這麼快。
```R
# 設定起始時間
start_time <- proc.time()
babynames::babynames %>%
summarise(n_year = n_distinct(year),
n_sex = n_distinct(sex),
n_name = n_distinct(name),
n_n = n_distinct(n),
n_prop = n_distinct(prop)) %>%
data.frame
# n_year n_sex n_name n_n n_prop
# 136 2 95025 13604 162480
proc.time() - start_time
# user system elapsed
# 0.578 0.013 0.658
```
3. **資料視覺化:以 babynames 資料集為例**
babynames這個資料集紀錄從1880年開始,在美國每年出生的新生兒取哪些名字,每個名字總共被用了幾次等資訊,在資料集裡面共有5個columns,分別是年份、性別、姓名、總數、佔總出生人數的比重。
在範例中,寫了一個函式用來查詢特定一年中,哪些名字最常被使用來命名。
- 載入會用到的packages
```R
library(ggplot2) #The following are the tools for graphing
library(dygraphs)
library(rbokeh)
library(RColorBrewer)
library(plotly)
```
- 先找出1986以後且被用來命名過超過一千次的資料,列出姓名跟性別這個兩個變數,並以topNames這個變數儲存起來。
```R
topNames = baby %>%
filter(year > 1986) %>%
group_by(name, sex) %>%
summarize(count = sum(n)) %>%
filter(count > 1000) %>%
select(name, sex)
```
- 接著用原始資料對topNames做inner_join,並且group_by年份這個變數,表示出來的結果會是各年中最代表的姓名
```R
yearlyName = baby %>%
filter(year > 1986) %>%
inner_join(topNames) %>%
group_by(year, name, sex) %>%
summarize(count = sum(n)) %>%
sdf_copy_to(sc, ., "yearlyname", T, overwrite=T)
```
- 再來我們將上面做過的事情,寫成一個函式,以便未來想要做查詢的時候,不需要重複打這麼多程式碼,只要傳入年份變數就可以得到我們想要的結果,還可以把結果以圖形化的方式呈現出來。
```R
MostPopularNames <- function(year) {
topNames <- baby %>%
filter(year >= 1986) %>%
group_by(name, sex) %>%
summarize(count = sum(n)) %>%
filter(count > 1000) %>%
select(name, sex)
yearlyName <- baby %>%
filter(year >= 1986) %>%
inner_join(topNames) %>%
group_by(year, name, sex) %>%
summarize(count = sum(n)) %>%
sdf_copy_to(sc, ., "yearlyname", T, overwrite=T)
TopNm <- yearlyName %>%
filter(year == year) %>%
group_by(name,sex) %>%
summarize(count=sum(count)) %>%
group_by(sex) %>%
mutate(rank = min_rank(desc(count))) %>%
filter(rank < 5) %>%
arrange(sex, rank) %>%
select(name,sex,rank) %>%
sdf_copy_to(sc,.,"topNames",T,overwrite=TRUE)
topNamesYearly <- yearlyName %>%
inner_join(select(TopNm, sex, name)) %>%
collect
#兩種畫法可以選一種畫就好
#第一種圖
ggplot(topNamesYearly,
aes(year, count, color = name)) +
facet_grid(~sex) +
geom_line() +
ggtitle(paste0("Most Popular Names of ", year))
#第二種圖
# 拿出名字欄位之後要做factor化
names <- TopNm %>%
select(name) %>%
collect %>%
as.data.frame
names <- names[, 1, T]
topNamesYearly$name <- factor(topNamesYearly$name, levels = names, labels = names)
p = ggplot(topNamesYearly, aes(year, count, color=name)) +
theme_light() +
facet_wrap(~sex, nrow=2) +
geom_vline(xintercept = year, col='yellow', lwd=1.2) +
geom_line() +
ggtitle(sprintf('Most Popular Names of %d',year)) +
scale_colour_brewer(palette = "Paired")
plotly_build(p)
}
```
- 透過函式給定特定年份,就可以得到該年中最常被使用的姓名是哪個。
```R
MostPopularNames(2015)
MostPopularNames(2000)
```

- 將男生姓名及女生姓名在資料中的分佈狀況畫出來。
```R
sharedName = baby %>%
mutate(male=ifelse(sex == "M", n, 0),
female=ifelse(sex == "F", n, 0)) %>%
group_by(name) %>%
summarize(Male = sum(male),
Female = sum(female),
count = sum(n),
AvgYear = round(sum(year * n) / sum(n),0)) %>%
filter(Male > 10000 & Female > 10000) %>% collect
figure(width = NULL, height = NULL,
xlab = "Log10 Number of Males",
ylab = "Log10 Number of Females",
title = "Top shared names (1880 - 2014)") %>%
ly_points(log10(Male), log10(Female), data = sharedName,
color = AvgYear, size = scale(sqrt(count)),
hover = list(name, Male, Female, AvgYear), legend = FALSE)
```

4. **使用完Spark後,如果暫時沒有要再使用,務必要將其中斷連線**
```R
spark_disconnect(sc)
```