# 4-3. Apache Flink_ - Stream and Batch Processing in a Single Engine ## Introduction * Data-stream processing * 複雜事件處理系統(data stream 無限) * 主要服務於特定應用 * static data processing * batch * data stream 是有限的,且記錄的順序和時間並不重要 * 佔據大部分應用 * data processing * 需要處理隨著時間產生的 data * 當今通常忽略資料產生的連續性和及時性。資料記錄通常被 batch 成靜態資料集(例如,每小時、每日或每月分塊),然後以與時間無關的方式進行處理 * Data collection tools + workflow managers + schedulers 協調 batches 的創建和處理 * Apache Flink * real-time analysis + continuous streams + batch processing * durable message queue * 允許對 data steam 進行幾乎任意的重播 * 3種不同類型的計算 => start their processing at different points in the durable stream * 高度靈活的窗口機制,Flink 程式可以在同一操作中計算早期和大約的結果,以及延遲和精確的結果 * 支持不同的時間概念(event-time, ingestion-time, processing-time) * 提供了一個專門的 API 來處理 static data set * 用於圖形分析和機器學習的 library ## System Architecture Flink 的 API stack: deployment + core + APIs + libraries ![image](https://hackmd.io/_uploads/H1JHLkal0.png) a Flink runtime program 是一個由 data streams + 有狀態 operators 形成的有向無環圖(DAG) * core * distributed dataflow engine, executes dataflow programs * 執行 DataSet 和 DataStream API 創建的 runtime program * 作為抽象有界(batch)和無界(stream)處理的共同基礎 * core API * DataSet API * 用於處理有限的 data set (batch processing) * DataStream API * 用於處理潛在無限的 data stream (stream processing) * 生成 runtime program * 特定領域的 library 和 API * 在 core API 之上 * 生成 DataSet 和 DataStream API 程式 a Flink cluster: client + Job Manager + at least one Task Manager ![image](https://hackmd.io/_uploads/SJNw5kpl0.png) * client * 轉換階段: 接受 code,將其轉換為 data flow 圖,並提交給 Job Manager * 檢查 operator 之間交換的 data 的 data type,並創建序列化 program 和其他類型/模式特定的 code * DataSet program 包含一個基於成本的查詢優化階段 * Job Manager * 協調 data stream 的分布式執行 * 追蹤每個 operator 和 data stream 的狀態和進度,安排新的 operator,並協調和恢復 checkpoint * 在高可用性設置中,Job Manager 在每個 checkpoint 保存一組最小的 metadata 到 fault-tolerant storage,使得備用 Job Manager 可以重建 checkpoint 並從那裡恢復 data stream 執行 * Task Manager * 實際的 data 處理,執行一個或多個產生 data stream 的 operator * 向 Job Manager 報告其狀態 * 維護 buffer pool 以 buffer 或實體化 data stream,並維護網絡以在 operator 之間交換 data stream ## The Common Fabric: Streaming Dataflows 使用者可以使用多種 API 編寫 Flink 程式,但最終都會編譯成一個共同的表示形式: data flow graph,由 Flink 的引擎執行 ### Dataflow Graphs ![image](https://hackmd.io/_uploads/ByPPMIpeA.png) * data flow graph * 一個有向無環圖(DAG) * 有狀態的 operator + 由 operator 產生且供 operator 使用的 data flow * 以 data 並行方式執行 * operator 被並行化為一個或多個 *subtasks* 的並行 * data stream 被劃分為一個或多個 *stream partitions*( one subtask one partition) * operator 實現所有處理邏輯(例如,filters, hash joins, stream window functions) * data stream 以各種模式在 producing 和 consuming operator 之間分配數據(例如, point-to-point, broadcast, re-partition, fan-out, and merge) ### Data Exchange through Intermediate Data Streams ++Flink 的 stream 透過 data 交換機制 + 不同類型的Control Events 來通信++ * Intermediate Data Streams * operator 之間 data 交換的 core abstraction * logical handle: the data 被 produced by an operator 可以被 consumed by one or more operators * 指向的 data 可能會也可能不會在 disk 上實體化 * data stream 的特定行為由 Flink 中的更高層參數化(例如,由 DataSet API 使用的 program optimizer) #### Pipelined and Blocking Data Exchange * Pipelined intermediate streams * 在同時運行的 producers 和 consumers 之間交換 data * intermediate buffer pool: 將 producer 的 back pressure(回饋) 傳遞給 consumer(防止 producer 速度超過 consumer ),以彌補短期 throughput fluctuations * 用於 continuous streaming programs、many parts of batch dataflow,以盡可能避免 data 實體化(儲存) * Blocking streams * 將所有 produce operator 的 data buffer 起來,然後再使其可供 consumer,從而將 produce 和 consume operator 分隔到不同的執行階段 * 需要更多的記憶體,經常溢出到secondary storage,並不傳遞 back pressure * 用於 bounded data streams、隔離連續的 operator、在計劃中包含可能導致分布式 deadlock 的 pipelined operator(例如,sort-merge連接) #### Balancing Latency and Throughput Flink 的 data 交換機制圍繞 buffer 的交換來實施 * 當 producer 的 data record 準備就緒時,它被序列化並分割成一個或多個 buffer (一個 buffer 可以容納多條 record),這些 buffer 可以轉發給 consumer * buffer 發送給 consumer 的時機為 * buffer 滿時 * 達到 timeout 條件時 * 通過將 buffer 的大小設置為較高值來實現高 throughput * 通過將 buffer timeout 設置為較低值來實現 low latency * 隨著 buffer timeout 的增加,latency 與 throughput 的增加,直到達到 full throughput(即 buffer 填充速度快於 timeout) ![image = alt](https://hackmd.io/_uploads/HyUFxZAxC.png =400x) #### Control Events * Control Events * 由 operator 在 data stream 中注入的特殊 event,並按順序與 stream partition 內的所有其他 data record 和 event 一起傳遞 * receiver operator 在這些 event 到達時執行特定動作以作出反應 * checkpoint barriers: 通過將 stream 分為 pre-checkpoint 和 post-checkpoint 來協調 checkpoint * watermarks: 用來表示 stream partition 內事件時間的進度 * iteration barriers: 用來表示 stream partition 已達到 superstep 的結束(在 cyclic dataflows 上的 Bulk/StaleSynchronous-Parallel iterative 算法中) * 需要假設 stream partition 保持 record 的順序 * Flink 保證 consume 單一 stream partition 的一元 operator record 的 FIFO 順序 * receive 多個 stream partition 的 operator 按到達順序合併 stream,以跟上 stream 的速率並避免 back pressure * Flink 中的 streaming dataflows 在任何形式的 repartition 或 broadcast 之後都不提供排序保證,處理無序 record 的責任留給 operator * 大多數 operator 不需要順序(例如,hash-joins、maps),而需要彌補無序到達的 operator(例如,event-time windows),可以作為operator邏輯的一部分更有效地進行 ### Fault Tolerance Flink 提供2個保證 * reliable execution(嚴格的 exactly-once-processing consistency) * 處理故障(通過 checkpoint 和部分 re-execution) 系統為了有效提供保證 * 一般假設為 data source 是 persistent 且 replayable(包括 files 和 durable message queues(例如,Apache Kafka)) * 在實際中,non-persistent source 也可以通過在 source operators 的狀態中保留預寫 log 來納入 Apache Flink 的 checkpointing mechanism * distributed consistent snapshots 以實現 exactly-once-processing 保證 * data stream 的 unbounded 性質 -> 不可能實作恢復時的重新計算,因為可能需要 replay 幾個月的計算來完成長期運行的作業 * 限制恢復時間 -> 定期取得 operators 狀態的 snapshots,包括 input stream 的當前位置 * 挑戰在於在不停止執行的情況下,為所有平行 operators 拍攝一個一致的 snapshots * 本質上,所有 operators 的 snapshots 應該參照計算中的相同邏輯時間 * Flink 使用的機制稱為 Asynchronous Barrier Snapshotting(ABS) * Barriers 是 control records,注入到對應於邏輯時間的 input streams 中,並在邏輯上將 stream 分為將包含在當前 snapshot 中的部分和稍後將 snapshot 的部分 1. operator 從 upstream 接收 barry,進行 alignment 階段,確保已接收來自所有輸入的 barriers 2. operator 將其狀態(例如,sliding window 的內容或自定義 data structure)寫入 durable storage(例如,外部系統 HDFS) 3. 一旦狀態被備份,operator 就將 barry 向 downstream 轉發 4. 最終,所有 operator 將登記其狀態的 snapshot,並完成一個 global snapshot ![image](https://hackmd.io/_uploads/S1GCN4AxA.png) (snapshot t2 包含所有 operator 狀態,這些狀態是在 t2 barry 之前消耗所有 record 的結果) 由於 DAG 結構,ABS 不需要對正在傳輸的 record 進行 checkpoint,而是只依賴於 alignment 階段來將所有產生的結果應用到 operator 狀態。這保證了需要寫入存儲的數據保持在理論最小量(即僅 operator 的 current state) 故障恢復 1. 將所有 operator 狀態恢復到從最後一個成功 snapshot 取得的各自狀態 2. 從存在 snapshot 的最新 barry 開始重新啟動 input stream * 恢復所需的重新計算量限於兩個連續 barry 之間的input records * 此外,通過 replay 直接的 upstream subtask 中被 buffer 的未處理 records,可以部分恢復失敗的 subtask ABS 提供了幾個好處 * 保證了 exactly-once state update,而不會暫停計算 * 它與其他形式的 control message 完全分離(例如,觸發 windows 計算的 event,從而不將 windows 機制限制於 checkpoint 間隔的倍數) * 它與用於 reliable storage 的機制完全分離,允許將狀態備份到文件系統、databases 等,具體取決於 Flink 使用多大的環境 ### Iterative Dataflows Incremental processing 和 iterations 對於應用程序至關重要(例如圖形處理和機器學習) * 在 data-parallel 處理平台中支持 iterations * 依賴於為每次 iteration 提交一個 new job * 向運行中的 DAG 添加額外的 node 或 feedback edge * 在 Flink 中,迭代被實現為 *iteration steps* ![image](https://hackmd.io/_uploads/B1Ss0EAx0.png) * head task 和 tail task 與 feedback edges 連接 * head task 和 tail task 的目的是建立一個 active feedback channel 到 iteration steps,並為在此 feedback channel 內傳輸中的 data record 提供協調 * 實作任何類型的 structured parallel iteration model(例如,Bulk Synchronous Parallel(BSP)model)都需要協調,且使用control event來實現 ## Stream Analytics on Top of Dataflows DataStream API 在 Flink 運行時實現了一個完整的串流分析框架: 管理時間的機制(例如 out of order event 處理、定義 window)、維護和更新用戶定義的狀態 串流 API 基於 DataStream 的概念,即(可能是無界的)給定類型元素的不可變集合 Flink 的運行時已經支持 pipelined data 傳輸、持續的 stateful operator、consistency state update 的容錯機制 * 在其上層叠加一個流處理器本質上歸結為實現一個 window system 和一個 state interface * 這些對運行時是不可見的,運行時只將 window 視為 stateful operator 的一種實現 ### The Notion of Time Flink 區分兩種時間概念 * event-time,即 event 開始的時間(data 被傳入的時間) * 提供最可靠的 semantics,但可能由於 event-time 和 processing-time 的延遲而表現出延遲 * processing-time,即數據被實際處理數據的時間 * 依賴於 local machine clock,因此為較不可靠的時間觀念,這可能導致恢復時重播不一致 * 更低的延遲 * ingestion-time,即 event 進入 Flink 的時間 * 比 event-time 實現了更低的處理延遲,並與 processing-time 相比產生更精確的結果 在分佈式系統中,event-time 和 processing-time 之間存在 arbitrary skew * 從基於 event-time semantics(允許系統根據事件發生的實際時間來處理數據,而不是依賴於數據到達處理系統的時間) 獲取答案有任意延遲 * 為了避免任意延遲,這些系統定期插入 low watermarks 的特殊 event,以標記 grobal 進度 watermarks * 包括一個時間屬性 t,表明所有低於 t 的事件已經進入了某個 operator * 幫助 execution engine 以正確的 event order 處理事件並序列化操作,如透過時間進度中的統一的方法進行 window 計算 * 流程 1. 開始於 topology 的開頭,在這裡確定未來元素中時間訊息 2. 從開頭傳播到 data stream 的其他 operator 3. operator 決定它們如何對 watermarks 作出反應 * 簡單的操作,如 map 或 filter,只是轉發接收到的 watermark, * 複雜的基於 watermarks 進行計算的operator(例如,event time window)首先計算由 watermarks 觸發的結果,然後再轉發它 * 如果一個 operate 有多於一個輸入,系統只向 operator 轉發最小的進入 watermark,從而確保正確的結果 ### Stateful Stream Processing stateful operator * 簡單: counter 或 sum * 複雜: classification tree 或在機器學習應用中常用的 a large sparse matrix * Stream windows: 將 records 指派給記憶體中持續更新的 buckets(operator state 的一部分) state 如何整合到 API 中: * 使用 operator interfaces 或註解在 operator scope 內靜態宣告明確的 local variable * 用於 partitioned key-value states 及其相關操作的 operator-state abstraction * 系統提供的 StateBackend abstraction,配置state的存儲和 checkpoint * 允許在 stream 應用中進行高度靈活的自定義 state 管理 :::success checkpoint 機制保證任何註冊的 state 都是持久的,具有 exactly-once update semantics ::: ### Stream Windows window: logical views(查看 data 的方式)(在 unbounded stream 上的 incremental computation 通常是在不斷演進的logical views 上進行評估) * Flink 在一個 stateful operator 中整合了 window 功能 * 通過 declaration 進行配置 * declaration = window assigner + 可選的 trigger + evictor * 三個功能選擇: 在一組 predefined 實作(例如,sliding time window)中 / 由用戶明確 defined * assigner * 負責將每條 record 分配到 logical window * 例如,這個決定可以基於 record 的 timestamp、在sliding window的情況下,一個元素可以屬於多個 logical window * trigger * 定義與 window 定義相關聯的操作何時執行 * evictor * 確定在每個 window 中保留哪些 record * window 分配過程能夠涵蓋所有已知的 window 類型 * window 功能整合了無序處理 * 可用於實現多種 window: 如果 stream 在進行 window 操作之前已按 key 進行 partition,則 window 操作是 loacl,因此不需要 worker 之間的協調 window 定義 example: 1. ![image](https://hackmd.io/_uploads/S1oNlUCgA.png) * assigner: 範圍為6秒,每2秒滑動一次 * trigger: window 結果在 watermark 通過 window 的結尾時計算 2. ![image](https://hackmd.io/_uploads/Hy2ux80eA.png =200x) * assigner: 定義了一個 grobal window * trigger: window 在每1000個 event 上觸發操作 * evictor: 保留最後100個元素 ### Asynchronous Stream Iterations Loops in streams * feedback loops 不需要協調 * Asynchronous iteration 滿足了 stream 應用的通信需求,並且與基於有限數據的結構化 iterations 的並行優化問題不同 * Flink * 當沒有啟用 iteration 控制機制時,執行模型已經涵蓋了 Asynchronous iteration * 為了符合容錯保證 * feedback stream 被視為 implicit-iteration head operator 內的 operator state * global snapshot的一部分 * DataStream API 允許對 feedback stream 進行 explicit 定義,並包含對串流上 structured loop 以及 progress tracking support ## Batch Analytics on Top of Dataflows 將所有輸入數據插入 window 的 stream program 可以形成一個 batch program,而 batch processing 應完全被 Flink 的上述功能所涵蓋 But, * syntax(即 batch 計算的 API)可以簡化(例如,不需要人為的 global window 定義) * 處理有界 dataset 的程式由額外的優化、更高效的容錯記帳、階段性調度獲得 Flink 對 batch processing 的方法如下: * batch 計算與 stream 計算由相同的 runtime 執行 * 可在 runtime 執行的檔案會使用 blocked data stream 進行參數化,將大型計算分解成隔離的 stages(依次進行排程) * 當周期性 snapshot 的開銷很高時,會關閉周期性 snapshot * 故障恢復可以通過從最新的實體化 intermediate stream(可能是 source)重播丟失的 stream partition 來實現 * Blocking operators(例如,sorts)僅是 block operator,此狀態會直到消耗完所有 input * runtime 不知道 operator 是否 block,這些 operator 使用 Flink 提供的管理內存,如果其輸入超出此記憶體限制,可以溢出到 disk * 一個專用的 DataSet API 為 batch 計算提供abstraction,即具有界容錯的 DataSet t data structure 和對 DataSets 的變換(例如,joins, aggregations, iterations) 查詢優化層將一個 DataSet 程式轉換成一個高效的可執行檔。 ### Query Optimization Flink 優化器 * 借鑑了並行 database 系統的技術: plan equivalence, cost modeling, interesting-property propagation * operator 對優化器隱藏了它們的 semantics * 根據 interesting-property propagation,使用基於成本的方法在 physical plans 中進行選擇 * 成本包括 network、disk I/O 、CPU 成本 * 使用程序員提供的提示克服存在 UDF 時的cardinality 估計問題 ### Memory Management * 基於 database 技術,Flink 將數據序列化到 memory segments 中,而不是在 JVM heap 中分配 object 來表示 buffer 中的使用中的 data record * sorting 和 joining 等操作盡可能直接在 binary data 上進行,將 serialization 和 deserialization 的開銷保持在最低限度,並在需要時將部分 dara 溢出到 disk * 為了處理 arbitrary objects,使用 type inference 和自定義 serialization 機制 通過保持數據處理在 binary representation 和 off-heap,Flink 能夠減少 garbage collection 的開銷,並使用 cache-efficient、robust 的算法,在 memory pressure 擴展 ### Batch Iterations * 通過使用iteration-control events,執行模型允許在其上實現任何類型的 structured iteration logic * 引入了如 delta iterations,利用 sparse 計算 dependencies ## Conclusion Apache Flink,實現了通用 dataflow engine 的平台,設計用來執行 stream and batch 分析 * dataflow engine 將 operator state 和 logical intermediate 由 batch 和 data stream API 使用不同的參數來應用 * stream API 提供了保持 recoverable state 狀態、partition、transform、aggregate data stream window 的方法 * batch 計算特別對待: * Flink 通過使用查詢優化器優化 batch 計算的執行 * 實現在記憶體不足時溢出到 disk 的 block operator