# introduction Spark
## outline
1. Introduction to Hadoop
* hdfs
* HBase
2. spark 概念、核心架構
* spark SQL
* spark streaming
* structured streaming
* spark ML
## Introduction to Hadoop
Hadoop是一個能夠對大量資料進行分散式處理的軟體框架,它具有以下幾個方面的特性:
* 高效能: 採用分散式儲存(HDFS)與分散式處理(mapReduce)兩大核心技術,具有處理PB等級資料的能力。
* 成本低: 可以用在由一般PC所架設的集群環境內。
* 高可靠性: 採用冗餘資料儲存,當一個副本發生故障,可由其它副本資料正常對外服務。
* 高容錯性: 採用冗餘資料儲存,當某個節點發生錯誤,系統能即時自動的取得副本資料以及重新佈署運算資源與任務。
* 運行在Linux平台
* 支援多種程式設計語言(java,R,python)
* 高可擴展性(Scale Out擴充能力)
**Hadoop Ecosystem**

---
### HDFS(Hadoop Distributed File System)
**HDFS的設計理念:**
* 將分散式的檔案環境,模擬成單一的檔案目錄系統。
* 異地備份:
1. 每個檔案被分割成許多的區塊(Block),每個區塊的大小通常為64MB
或128MB。
2. 系統會將每個區塊複製成許多複本,並分散儲存於不同的資料節點
上。預設是3個複本,用戶可自行調整。
* 具備硬體錯誤容忍能力
1. 由於是採用一般PC或伺服器,故硬體易出狀況,而HDFS採用複製
資料以因應硬體的故障。
2. 當偵測到錯誤時,迅速地從複製的資料執行資料回復工作。
* 處理大規模資料集
1. 支援超過1萬個節點、Perabytes等級的資料量空間需求。
* 資料存取特性
1. Write-once-read-many存取模式,一次寫入,多次存取。
2. 檔案一旦建立、寫入,就不允許修改。
* 在地運算
1. 移動運算到資料端比移動資料到運算端來得成本低。
2. 由於資料的所在位置有被考慮,因此運算工作可以移至距離客戶端最近的資料節點之複本。
**HDFS的結構:**
分散式檔案系統在實體結構上是由電腦集群中的多個節點(node)構成的,這些節點分為兩類:
* 一類為主節點(Master Node)或稱名稱節點(NameNode)
* 另一類為從節點(SlaveNode)或稱資料節點(DataNode)
* 資料以區塊(Block)型式儲存


* 問題:
1. 不適合低延遲資料訪問(HDFS即時性不佳)
2. 無法高效儲存大量小檔(檔案太小、索引多/大)
---
### HBase
**HBase簡介:**
* HBase是一個高可靠、高性能、以欄為主、伸縮性佳的分散式資料庫系統,主要用來儲存非結構化和半結構化的鬆散資料。
* HBase的目標是處理非常龐大的表,可以透過橫向擴展的方式,利用廉價電腦集群處理由超過10億列資料和數百萬欄元素組成的資料表
**HBase與傳統關聯式資料庫的比較分析:**
* 資料型態:關聯式資料庫採用二維關聯模型,具有豐富的資料型態和儲存方式。HBase則採用了更加簡單的資料模型,它把資料儲存為未經解釋的字串,由應用程式的開發人員來解釋資料型態。
* 資料操作:關聯式資料庫中包含了豐富的操作,其中會涉及複雜的多表連接。HBase操作則不存在複雜的表與表之間的關係,只有簡單的插入、查詢、刪除、清空等,因為HBase在設計上就已經避免了複雜的表和表之間的關係,它僅靠一張或少數幾張表儲存所有資料。
* 儲存模式:關聯式資料庫是基於列模式儲存的。HBase是基於欄儲存的。
* 資料索引:關聯式資料庫通常可以針對不同欄建構複雜的多個索引,以提高資料連繫性能。HBase只有一個索引—列鍵,透過列鍵連繫或者掃描,從而使得整個系統不會慢下來
* 資料維護:在關聯式資料庫中,更新操作會用最新的當前值去替換記錄中原來的舊值,舊值被覆蓋後就不會存在。而在HBase中執行更新操作時,並不會刪除資料舊的版本,而是生成一個新的版本,舊有的版本仍然保留,經一段時間後,在後台進行清理時,將之刪除
* 可伸縮性:關聯式資料庫很難實現橫向擴展,縱向擴展的空間也比較有限。相反,HBase和BigTable這些分散式資料庫就是為了實現靈活的橫向擴展而開發的,能夠輕易地透過在集群中增加或者減少硬體數量來實現性能的伸縮
**舉例:學生資料庫系統**

-HBase資料模式

**HBase表格架構**
* 列鍵(Row Key):在HBase的表格中,每筆記錄用列鍵(Row Key)作為唯一標示,用來檢索記錄,可視為其主。資料儲存時,會按照Row key的順序排列。
* 欄族 (Column Family):
1. 每一列裡的資料是以欄族 (Column Family) 來分組。每個Column Family可有無限數量的 Columns。這裡的欄(Column)又可稱為欄限定詞(Column Qualifier)。
2. Column Family是HBase表格的Schema之一部份,而Column不是。因此,在實際操作每個欄的資料時,欄名之前都要以欄族名稱做為前置詞。格式為 <column family name>:<qualifier>
* 單元 (Cell):
1. 儲存在裡面的資料稱為單元值 (Cell’s value) ,同一個Cell內可能存有不同時間戳記的單元值。
2. 而**Row Key+Column Family+Column Qualifier+Time Stamp**組合起來可決定一個單元值。
* 時間戳記 (Time Stamp):
1. 對HBase的表格進行資料的插入、修改、刪除,其結果皆會被記載於表格中,並以時間戳記區分成不同的版本(Version)。因此,這些操作對HBase的表格來說皆為資料插入。
2. 進行資料查詢與處理時,若無指定版本,則以 “時間戳記最新” 的版本的值為基礎,愈新的資料排愈前面。
3. HBase會保留一定數量的版本,而預設數量是3個,此數量可自行設置。當HBase進行表格合併工作時,時間戳記較舊的資料會被刪除。時間戳記可以由HBase自動給定,或是使用者自行設定。
**HBase資料座標**
HBase中需要根據列鍵、欄族、欄限定詞和時間戳記來確定一個儲存格。因此,可以視為一個“四維座標”,即[列鍵, 欄族, 欄限定詞, 時間戳記]

---
## spark
是基於記憶體計算的巨量資料平行計算框架,可用於建構大型的、低延遲的資料分析應用
### 基本概念:
[concepts](https://www.readfog.com/a/1632012897572982784)
* RDD:是 Resilient Distributed Dataset(彈性分佈式數據集)的簡稱,是分佈式內存的一個抽象概念,提供了一種高度受限的共享內存模型。
* DAG:是 Directed Acyclic Graph(有向無環圖)的簡稱,反映 RDD 之間的依賴關係。
* Executor:是運行在工作節點(WorkerNode)的一個進程,負責運行 Task。
* Application:用戶編寫的 Spark 應用程序。
* Task:運行在 Executor 上的工作單元。
* Job:一個 Job 包含多個 RDD 及作用於相應 RDD 上的各種操作。
* Stage:是 Job 的基本調度單位,一個 Job 會分爲多組 Task,每組 Task 被稱爲 Stage,或者也被稱爲 TaskSet,代表了一組關聯的、相互之間沒有 Shuffle 依賴關係的任務組成的任務集。
### Spark執行架構包括:
* 集群資源管理器(Cluster Manager):對整個Spark應用程式進行資源的
分配和管理調度。可以是自帶的CM、Mesos或YARN。
* 工作節點(WorkerNode):用來執行Task的機器
* 任務控制節點(Driver):透過SparkContext這個class讓應用程式與Spark集群對接並進行控管,諸如:程式提交後產生有向無環圖DAG、對DAG分成多個階段、對多個階段進行任務拆解、把拆解後的任務分配到相關的Executor執行。
* 執行行程(Executor):每個工作節點上負責具體任務執行的行程

### RDD概念
RDD本質上是一個唯讀、不可改變的分區記錄集合。

RDD API 類型:
1. Transformation
2. Action

```python=
from pyspark import SparkContext
def main():
with SparkContext(appName='wordcount') with sc:
# transformation
data = sc.textFile('/input/a.txt').map(lambda x: x[0])\
.flatMap(lambda x: x.split(' '))\
.map(lambda x: (x, 1)).reduceByKey(lambda a, b: (a+b))
# action
result = data.collect()
for (word, count) in result:
print("word:", word, "count:", count)
if __name__ == '__main__':
main()
```
* Spark的運作核心為RDD 。有許多不同的RDD API
* RDD API的局限性
* 沒有Schema
* 因為沒有Schema,所以使用者需自已最佳化程式
* 從不同的資料來源讀取資料非常困難
* 合併多個資料來源的資料也很困難
---
## spark SQL
* Spark SQL是Spark中用於結構化資料處理的元件
* 採用DataFrame資料模型
* 帶有Schema的RDD
* 可連結多種不同類型的資料來源
* Hive, Avro, Parquet, JSON, JDBC, …
* 支援多種程式語言之撰寫
* Scala, Java, Python,SQl-92 …

* 可以透過JDBC連接外部的關聯式資料庫
* Spark SQL是佈署在Spark RDD API與運算框架之上,有兩層:
* Catalyst最佳化引擎:根據資料的分佈與特點,用以最佳化SQL指令
* 兩種編寫應用程式的方法:SQL與DataFrame/Dataset API

```python=
import org.apache.spark.sql.SparkSession
import spark.implicits._
/*
* SparkSession在Spark 2.0提出,未來計畫取代SparkContext。
* SparkSession包含了Saprk SQL、Streaming等等元件的進入點,並且支援查詢Hive包含HiveQL或UDF等等。
*/
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
//建立一個DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")
// 顯示DataFrame資料內容:
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 印出DataFrame的schema結構
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 只顯示"name"欄位
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// 選出"name"、"age",並且將age欄位的值+1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// 選出age欄位值大於25的資料
df.filter($"age" > 25).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 以name分群並計算數量
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
//使用Spark SQL進行查詢
// 首先註冊一個DataFrame當作SQL語法會用到的暫存veiw:"employee"
df.createOrReplaceTempView("employee")
val sqlDF = spark.sql("SELECT * FROM employee")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
```
---
## spark streaming
### 串流計算概念
* 串流計算:即時獲取來自不同資料來源的海量資料,經過即時分析處理,獲得有價值的資訊
* 串流計算秉承一個基本理念,即資料的價值隨著時間的流逝而降低,如用戶點擊流。
* 對於一個串流計算系統來說,它應達到如下需求:
* 高性能:處理巨量資料的基本要求,如:每秒處理幾十萬條資料
* 海量式:支持TB級甚至是PB級的資料規模
* 即時性:保證較低的延遲時間,達到秒級別,甚至是毫秒級別
* 分散式:支援巨量資料的基本架構,即:必須能夠水準擴展與平行處理
* 易用性:能夠快速進行開發和部署集群及應用
* 可靠性:能可靠地處理串流資料
* 串流計算處理流程
1. 料即時收集
2. 資料即時計算
3. 即時查詢服務

* 應用情境: 串流計算適合於需要處理持續到達的串流資料、對資料處理有較
高即時性要求的場景
### Spark Streaming設計
* Spark Streaming可整合多種輸入資料來源,如Kafka、Flume、HDFS,甚至是普通的CP通訊端。經處理後的資料可儲存至檔案系統、資料庫,或顯示在儀錶板裡

* Spark Streaming的基本原理是將即時輸入資料流程以時間片段 (秒級) 為單位進行拆解,然後經Spark引擎以類似批次處理的方式處理每個時間片段資料

### Spark Streaming工作機制
* 在Spark Streaming中,會有一個元件Receiver,作為長期執行的task跑在一個Executor上
* 每個Receiver都會負責一個input Dstream (例如:從檔案、通訊埠、或是從Kafka中讀取資料流…等)
* Spark Streaming透過input DStream與不同的外部資料來源連接,以讀取相關資料

---
## structured streaming
structured streaming = spark streaming + spark SQL
* 是將即時資料流視為一個不斷增長、無邊界的輸入表格(Unbounded Input Table),新流進來的資料被持續不斷地添加到表格的末端
* 串流計算可視為在一個靜態表格(DataFrame/DataSet)上的批次處理,Spark會在此無邊界輸入表格上執行計算
* 和Spark SQL 共用大部分API,用戶可以使用Spark SQL 所提供的相關函式(function),來對串流資料進行即時查詢處理。

---
## spark ML
### 巨量資料的機器學習
* 傳統的機器學習演算法,由於技術和單機儲存的限制,只能運用在少量資料的議題上,並依賴於資料抽樣。巨量資料的出現,可以支援在資料母體上進行機器學習
* 機器學習演算法涉及大量反覆運算(迭代計算)
* 基於記憶體的Spark比較適合進行大量反覆運算
### ML pipeline

### 基本概念:
* DataFrame:
* Spark ML是以DataFrame資料集作為處理對象,用它來儲存資料,同時模型的預測結果也是以DataFrame型式表示。
* DataFrame可以儲存各種型態的資料、文字、特徵向量、類別標籤和預測標籤等。
* Estimator: 學習評估器,待訓練的機器學習演算法
* 有很多的學習評估器可使用,如:LogisticRegression(邏輯斯迴歸分類器)、DecisionTreeClassifier(決策樹分類器)、RandomForestClassifier(隨機森林分類器)、NaiveBayes(單純貝氏分類器)、KMeans(K-means聚類器)…等物件。
* 可以利用每個學習評估器中的fit()方法,輸入DataFrame型式的訓練資料,透過對演算法內參數的調整,訓練出符合訓練資料特徵的學習後模型 (即:PipelineModel)。
* Transformer:轉換器,DataFrame轉換為另一個新的DataFrame。
* 特徵轉換器,而Estimator經訓練後的學習後模型,也可被視為Transformer。
* 在spark.ml.feature中有很多不同類型的轉換器可使用,例如:
* HashingTF(雜湊轉換器)、StringIndexer(字串索引器)、OneHotEncoder轉換器…等物件,可依不同需求,將某DataFrame資料集轉換成合乎需要的新DataFrame資料集。
* Parameter:
* 學習到適合的Parameter主要是用來設置Transformer或Estimator以執行資料分析的任務。
* PipeLine:
* 將多個階段(Stage/PipelineStage,即:前述所指的各項轉換器和學習評估器)以形成機器學習的工作流程。
