# introduction Spark ## outline 1. Introduction to Hadoop * hdfs * HBase 2. spark 概念、核心架構 * spark SQL * spark streaming * structured streaming * spark ML ## Introduction to Hadoop Hadoop是一個能夠對大量資料進行分散式處理的軟體框架,它具有以下幾個方面的特性: * 高效能: 採用分散式儲存(HDFS)與分散式處理(mapReduce)兩大核心技術,具有處理PB等級資料的能力。 * 成本低: 可以用在由一般PC所架設的集群環境內。 * 高可靠性: 採用冗餘資料儲存,當一個副本發生故障,可由其它副本資料正常對外服務。 * 高容錯性: 採用冗餘資料儲存,當某個節點發生錯誤,系統能即時自動的取得副本資料以及重新佈署運算資源與任務。 * 運行在Linux平台 * 支援多種程式設計語言(java,R,python) * 高可擴展性(Scale Out擴充能力) **Hadoop Ecosystem** ![Hadoop Ecosystem](https://i.imgur.com/IrxBqZ2.png) --- ### HDFS(Hadoop Distributed File System) **HDFS的設計理念:** * 將分散式的檔案環境,模擬成單一的檔案目錄系統。 * 異地備份: 1. 每個檔案被分割成許多的區塊(Block),每個區塊的大小通常為64MB 或128MB。 2. 系統會將每個區塊複製成許多複本,並分散儲存於不同的資料節點 上。預設是3個複本,用戶可自行調整。 * 具備硬體錯誤容忍能力 1. 由於是採用一般PC或伺服器,故硬體易出狀況,而HDFS採用複製 資料以因應硬體的故障。 2. 當偵測到錯誤時,迅速地從複製的資料執行資料回復工作。 * 處理大規模資料集 1. 支援超過1萬個節點、Perabytes等級的資料量空間需求。 * 資料存取特性 1. Write-once-read-many存取模式,一次寫入,多次存取。 2. 檔案一旦建立、寫入,就不允許修改。 * 在地運算 1. 移動運算到資料端比移動資料到運算端來得成本低。 2.  由於資料的所在位置有被考慮,因此運算工作可以移至距離客戶端最近的資料節點之複本。 **HDFS的結構:** 分散式檔案系統在實體結構上是由電腦集群中的多個節點(node)構成的,這些節點分為兩類: * 一類為主節點(Master Node)或稱名稱節點(NameNode) * 另一類為從節點(SlaveNode)或稱資料節點(DataNode) * 資料以區塊(Block)型式儲存 ![](https://i.imgur.com/TaFWNnm.png) ![](https://i.imgur.com/z67UecV.png) * 問題: 1. 不適合低延遲資料訪問(HDFS即時性不佳) 2. 無法高效儲存大量小檔(檔案太小、索引多/大) --- ### HBase **HBase簡介:** * HBase是一個高可靠、高性能、以欄為主、伸縮性佳的分散式資料庫系統,主要用來儲存非結構化和半結構化的鬆散資料。 * HBase的目標是處理非常龐大的表,可以透過橫向擴展的方式,利用廉價電腦集群處理由超過10億列資料和數百萬欄元素組成的資料表 **HBase與傳統關聯式資料庫的比較分析:** * 資料型態:關聯式資料庫採用二維關聯模型,具有豐富的資料型態和儲存方式。HBase則採用了更加簡單的資料模型,它把資料儲存為未經解釋的字串,由應用程式的開發人員來解釋資料型態。 * 資料操作:關聯式資料庫中包含了豐富的操作,其中會涉及複雜的多表連接。HBase操作則不存在複雜的表與表之間的關係,只有簡單的插入、查詢、刪除、清空等,因為HBase在設計上就已經避免了複雜的表和表之間的關係,它僅靠一張或少數幾張表儲存所有資料。 * 儲存模式:關聯式資料庫是基於列模式儲存的。HBase是基於欄儲存的。 * 資料索引:關聯式資料庫通常可以針對不同欄建構複雜的多個索引,以提高資料連繫性能。HBase只有一個索引—列鍵,透過列鍵連繫或者掃描,從而使得整個系統不會慢下來 * 資料維護:在關聯式資料庫中,更新操作會用最新的當前值去替換記錄中原來的舊值,舊值被覆蓋後就不會存在。而在HBase中執行更新操作時,並不會刪除資料舊的版本,而是生成一個新的版本,舊有的版本仍然保留,經一段時間後,在後台進行清理時,將之刪除 * 可伸縮性:關聯式資料庫很難實現橫向擴展,縱向擴展的空間也比較有限。相反,HBase和BigTable這些分散式資料庫就是為了實現靈活的橫向擴展而開發的,能夠輕易地透過在集群中增加或者減少硬體數量來實現性能的伸縮 **舉例:學生資料庫系統** ![](https://i.imgur.com/MKOkNVZ.png) -HBase資料模式 ![](https://i.imgur.com/yaU0uts.png) **HBase表格架構** * 列鍵(Row Key):在HBase的表格中,每筆記錄用列鍵(Row Key)作為唯一標示,用來檢索記錄,可視為其主。資料儲存時,會按照Row key的順序排列。 * 欄族 (Column Family): 1. 每一列裡的資料是以欄族 (Column Family) 來分組。每個Column Family可有無限數量的 Columns。這裡的欄(Column)又可稱為欄限定詞(Column Qualifier)。 2. Column Family是HBase表格的Schema之一部份,而Column不是。因此,在實際操作每個欄的資料時,欄名之前都要以欄族名稱做為前置詞。格式為 <column family name>:<qualifier> * 單元 (Cell): 1. 儲存在裡面的資料稱為單元值 (Cell’s value) ,同一個Cell內可能存有不同時間戳記的單元值。 2. 而**Row Key+Column Family+Column Qualifier+Time Stamp**組合起來可決定一個單元值。 * 時間戳記 (Time Stamp): 1. 對HBase的表格進行資料的插入、修改、刪除,其結果皆會被記載於表格中,並以時間戳記區分成不同的版本(Version)。因此,這些操作對HBase的表格來說皆為資料插入。 2. 進行資料查詢與處理時,若無指定版本,則以 “時間戳記最新” 的版本的值為基礎,愈新的資料排愈前面。 3. HBase會保留一定數量的版本,而預設數量是3個,此數量可自行設置。當HBase進行表格合併工作時,時間戳記較舊的資料會被刪除。時間戳記可以由HBase自動給定,或是使用者自行設定。 **HBase資料座標** HBase中需要根據列鍵、欄族、欄限定詞和時間戳記來確定一個儲存格。因此,可以視為一個“四維座標”,即[列鍵, 欄族, 欄限定詞, 時間戳記] ![](https://i.imgur.com/ct7qwGO.png) --- ## spark 是基於記憶體計算的巨量資料平行計算框架,可用於建構大型的、低延遲的資料分析應用 ### 基本概念: [concepts](https://www.readfog.com/a/1632012897572982784) * RDD:是 Resilient Distributed Dataset(彈性分佈式數據集)的簡稱,是分佈式內存的一個抽象概念,提供了一種高度受限的共享內存模型。 * DAG:是 Directed Acyclic Graph(有向無環圖)的簡稱,反映 RDD 之間的依賴關係。 * Executor:是運行在工作節點(WorkerNode)的一個進程,負責運行 Task。 * Application:用戶編寫的 Spark 應用程序。 * Task:運行在 Executor 上的工作單元。 * Job:一個 Job 包含多個 RDD 及作用於相應 RDD 上的各種操作。 * Stage:是 Job 的基本調度單位,一個 Job 會分爲多組 Task,每組 Task 被稱爲 Stage,或者也被稱爲 TaskSet,代表了一組關聯的、相互之間沒有 Shuffle 依賴關係的任務組成的任務集。 ### Spark執行架構包括: * 集群資源管理器(Cluster Manager):對整個Spark應用程式進行資源的 分配和管理調度。可以是自帶的CM、Mesos或YARN。 * 工作節點(WorkerNode):用來執行Task的機器 * 任務控制節點(Driver):透過SparkContext這個class讓應用程式與Spark集群對接並進行控管,諸如:程式提交後產生有向無環圖DAG、對DAG分成多個階段、對多個階段進行任務拆解、把拆解後的任務分配到相關的Executor執行。 * 執行行程(Executor):每個工作節點上負責具體任務執行的行程 ![](https://i.imgur.com/uSrUbHB.png) ### RDD概念 RDD本質上是一個唯讀、不可改變的分區記錄集合。 ![](https://i.imgur.com/M63mmd7.png) RDD API 類型: 1. Transformation 2. Action ![](https://i.imgur.com/hnygV4b.png) ```python= from pyspark import SparkContext def main(): with SparkContext(appName='wordcount') with sc: # transformation data = sc.textFile('/input/a.txt').map(lambda x: x[0])\ .flatMap(lambda x: x.split(' '))\ .map(lambda x: (x, 1)).reduceByKey(lambda a, b: (a+b)) # action result = data.collect() for (word, count) in result: print("word:", word, "count:", count) if __name__ == '__main__': main() ``` * Spark的運作核心為RDD 。有許多不同的RDD API * RDD API的局限性 * 沒有Schema * 因為沒有Schema,所以使用者需自已最佳化程式 * 從不同的資料來源讀取資料非常困難 * 合併多個資料來源的資料也很困難 --- ## spark SQL * Spark SQL是Spark中用於結構化資料處理的元件 * 採用DataFrame資料模型 * 帶有Schema的RDD * 可連結多種不同類型的資料來源 * Hive, Avro, Parquet, JSON, JDBC, … * 支援多種程式語言之撰寫 * Scala, Java, Python,SQl-92 … ![](https://i.imgur.com/J3iQofe.png) * 可以透過JDBC連接外部的關聯式資料庫 * Spark SQL是佈署在Spark RDD API與運算框架之上,有兩層: * Catalyst最佳化引擎:根據資料的分佈與特點,用以最佳化SQL指令 * 兩種編寫應用程式的方法:SQL與DataFrame/Dataset API ![](https://i.imgur.com/kj6Kvk3.png) ```python= import org.apache.spark.sql.SparkSession import spark.implicits._ /* * SparkSession在Spark 2.0提出,未來計畫取代SparkContext。 * SparkSession包含了Saprk SQL、Streaming等等元件的進入點,並且支援查詢Hive包含HiveQL或UDF等等。 */ val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() //建立一個DataFrame val df = spark.read.json("examples/src/main/resources/people.json") // 顯示DataFrame資料內容: df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // 印出DataFrame的schema結構 df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 只顯示"name"欄位 df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // 選出"name"、"age",並且將age欄位的值+1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // 選出age欄位值大於25的資料 df.filter($"age" > 25).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // 以name分群並計算數量 df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ //使用Spark SQL進行查詢 // 首先註冊一個DataFrame當作SQL語法會用到的暫存veiw:"employee" df.createOrReplaceTempView("employee") val sqlDF = spark.sql("SELECT * FROM employee") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ ``` --- ## spark streaming ### 串流計算概念 * 串流計算:即時獲取來自不同資料來源的海量資料,經過即時分析處理,獲得有價值的資訊 * 串流計算秉承一個基本理念,即資料的價值隨著時間的流逝而降低,如用戶點擊流。 * 對於一個串流計算系統來說,它應達到如下需求: * 高性能:處理巨量資料的基本要求,如:每秒處理幾十萬條資料 * 海量式:支持TB級甚至是PB級的資料規模 * 即時性:保證較低的延遲時間,達到秒級別,甚至是毫秒級別 * 分散式:支援巨量資料的基本架構,即:必須能夠水準擴展與平行處理 * 易用性:能夠快速進行開發和部署集群及應用 * 可靠性:能可靠地處理串流資料 * 串流計算處理流程 1. 料即時收集 2. 資料即時計算 3. 即時查詢服務 ![](https://i.imgur.com/Us5GaTV.png) * 應用情境: 串流計算適合於需要處理持續到達的串流資料、對資料處理有較 高即時性要求的場景 ### Spark Streaming設計 * Spark Streaming可整合多種輸入資料來源,如Kafka、Flume、HDFS,甚至是普通的CP通訊端。經處理後的資料可儲存至檔案系統、資料庫,或顯示在儀錶板裡 ![](https://i.imgur.com/6ssDPY8.png) * Spark Streaming的基本原理是將即時輸入資料流程以時間片段 (秒級) 為單位進行拆解,然後經Spark引擎以類似批次處理的方式處理每個時間片段資料 ![](https://i.imgur.com/uyiwUw2.png) ### Spark Streaming工作機制 * 在Spark Streaming中,會有一個元件Receiver,作為長期執行的task跑在一個Executor上 * 每個Receiver都會負責一個input Dstream (例如:從檔案、通訊埠、或是從Kafka中讀取資料流…等) * Spark Streaming透過input DStream與不同的外部資料來源連接,以讀取相關資料 ![](https://i.imgur.com/KthiRCN.png) --- ## structured streaming structured streaming = spark streaming + spark SQL * 是將即時資料流視為一個不斷增長、無邊界的輸入表格(Unbounded Input Table),新流進來的資料被持續不斷地添加到表格的末端 * 串流計算可視為在一個靜態表格(DataFrame/DataSet)上的批次處理,Spark會在此無邊界輸入表格上執行計算 * 和Spark SQL 共用大部分API,用戶可以使用Spark SQL 所提供的相關函式(function),來對串流資料進行即時查詢處理。 ![](https://i.imgur.com/rDpQxvy.png) --- ## spark ML ### 巨量資料的機器學習 * 傳統的機器學習演算法,由於技術和單機儲存的限制,只能運用在少量資料的議題上,並依賴於資料抽樣。巨量資料的出現,可以支援在資料母體上進行機器學習 * 機器學習演算法涉及大量反覆運算(迭代計算) * 基於記憶體的Spark比較適合進行大量反覆運算 ### ML pipeline ![](https://i.imgur.com/I7PjfWb.png) ### 基本概念: * DataFrame: * Spark ML是以DataFrame資料集作為處理對象,用它來儲存資料,同時模型的預測結果也是以DataFrame型式表示。 * DataFrame可以儲存各種型態的資料、文字、特徵向量、類別標籤和預測標籤等。 * Estimator: 學習評估器,待訓練的機器學習演算法 * 有很多的學習評估器可使用,如:LogisticRegression(邏輯斯迴歸分類器)、DecisionTreeClassifier(決策樹分類器)、RandomForestClassifier(隨機森林分類器)、NaiveBayes(單純貝氏分類器)、KMeans(K-means聚類器)…等物件。 * 可以利用每個學習評估器中的fit()方法,輸入DataFrame型式的訓練資料,透過對演算法內參數的調整,訓練出符合訓練資料特徵的學習後模型 (即:PipelineModel)。 * Transformer:轉換器,DataFrame轉換為另一個新的DataFrame。 * 特徵轉換器,而Estimator經訓練後的學習後模型,也可被視為Transformer。 * 在spark.ml.feature中有很多不同類型的轉換器可使用,例如: * HashingTF(雜湊轉換器)、StringIndexer(字串索引器)、OneHotEncoder轉換器…等物件,可依不同需求,將某DataFrame資料集轉換成合乎需要的新DataFrame資料集。 * Parameter: * 學習到適合的Parameter主要是用來設置Transformer或Estimator以執行資料分析的任務。 * PipeLine: * 將多個階段(Stage/PipelineStage,即:前述所指的各項轉換器和學習評估器)以形成機器學習的工作流程。 ![](https://i.imgur.com/Dz99bL1.png)