# 4-3. Apache Flink_ - Stream and Batch Processing in a Single Engine
## Introduction
* Data-stream processing
* 複雜事件處理系統(data stream 無限)
* 主要服務於特定應用
* static data processing
* batch
* data stream 是有限的,且記錄的順序和時間並不重要
* 佔據大部分應用
* data processing
* 需要處理隨著時間產生的 data
* 當今通常忽略資料產生的連續性和及時性。資料記錄通常被 batch 成靜態資料集(例如,每小時、每日或每月分塊),然後以與時間無關的方式進行處理
* Data collection tools + workflow managers + schedulers 協調 batches 的創建和處理
* Apache Flink
* real-time analysis + continuous streams + batch processing
* durable message queue
* 允許對 data steam 進行幾乎任意的重播
* 3種不同類型的計算 => start their processing at different points in the durable stream
* 高度靈活的窗口機制,Flink 程式可以在同一操作中計算早期和大約的結果,以及延遲和精確的結果
* 支持不同的時間概念(event-time, ingestion-time, processing-time)
* 提供了一個專門的 API 來處理 static data set
* 用於圖形分析和機器學習的 library
## System Architecture
Flink 的 API stack: deployment + core + APIs + libraries

a Flink runtime program 是一個由 data streams + 有狀態 operators 形成的有向無環圖(DAG)
* core
* distributed dataflow engine, executes dataflow programs
* 執行 DataSet 和 DataStream API 創建的 runtime program
* 作為抽象有界(batch)和無界(stream)處理的共同基礎
* core API
* DataSet API
* 用於處理有限的 data set (batch processing)
* DataStream API
* 用於處理潛在無限的 data stream (stream processing)
* 生成 runtime program
* 特定領域的 library 和 API
* 在 core API 之上
* 生成 DataSet 和 DataStream API 程式
a Flink cluster: client + Job Manager + at least one Task Manager

* client
* 轉換階段: 接受 code,將其轉換為 data flow 圖,並提交給 Job Manager
* 檢查 operator 之間交換的 data 的 data type,並創建序列化 program 和其他類型/模式特定的 code
* DataSet program 包含一個基於成本的查詢優化階段
* Job Manager
* 協調 data stream 的分布式執行
* 追蹤每個 operator 和 data stream 的狀態和進度,安排新的 operator,並協調和恢復 checkpoint
* 在高可用性設置中,Job Manager 在每個 checkpoint 保存一組最小的 metadata 到 fault-tolerant storage,使得備用 Job Manager 可以重建 checkpoint 並從那裡恢復 data stream 執行
* Task Manager
* 實際的 data 處理,執行一個或多個產生 data stream 的 operator
* 向 Job Manager 報告其狀態
* 維護 buffer pool 以 buffer 或實體化 data stream,並維護網絡以在 operator 之間交換 data stream
## The Common Fabric: Streaming Dataflows
使用者可以使用多種 API 編寫 Flink 程式,但最終都會編譯成一個共同的表示形式: data flow graph,由 Flink 的引擎執行
### Dataflow Graphs

* data flow graph
* 一個有向無環圖(DAG)
* 有狀態的 operator + 由 operator 產生且供 operator 使用的 data flow
* 以 data 並行方式執行
* operator 被並行化為一個或多個 *subtasks* 的並行
* data stream 被劃分為一個或多個 *stream partitions*( one subtask one partition)
* operator 實現所有處理邏輯(例如,filters, hash joins, stream window functions)
* data stream 以各種模式在 producing 和 consuming operator 之間分配數據(例如, point-to-point, broadcast, re-partition, fan-out, and merge)
### Data Exchange through Intermediate Data Streams
++Flink 的 stream 透過 data 交換機制 + 不同類型的Control Events 來通信++
* Intermediate Data Streams
* operator 之間 data 交換的 core abstraction
* logical handle: the data 被 produced by an operator 可以被 consumed by one or more operators
* 指向的 data 可能會也可能不會在 disk 上實體化
* data stream 的特定行為由 Flink 中的更高層參數化(例如,由 DataSet API 使用的 program optimizer)
#### Pipelined and Blocking Data Exchange
* Pipelined intermediate streams
* 在同時運行的 producers 和 consumers 之間交換 data
* intermediate buffer pool: 將 producer 的 back pressure(回饋) 傳遞給 consumer(防止 producer 速度超過 consumer ),以彌補短期 throughput fluctuations
* 用於 continuous streaming programs、many parts of batch dataflow,以盡可能避免 data 實體化(儲存)
* Blocking streams
* 將所有 produce operator 的 data buffer 起來,然後再使其可供 consumer,從而將 produce 和 consume operator 分隔到不同的執行階段
* 需要更多的記憶體,經常溢出到secondary storage,並不傳遞 back pressure
* 用於 bounded data streams、隔離連續的 operator、在計劃中包含可能導致分布式 deadlock 的 pipelined operator(例如,sort-merge連接)
#### Balancing Latency and Throughput
Flink 的 data 交換機制圍繞 buffer 的交換來實施
* 當 producer 的 data record 準備就緒時,它被序列化並分割成一個或多個 buffer (一個 buffer 可以容納多條 record),這些 buffer 可以轉發給 consumer
* buffer 發送給 consumer 的時機為
* buffer 滿時
* 達到 timeout 條件時
* 通過將 buffer 的大小設置為較高值來實現高 throughput
* 通過將 buffer timeout 設置為較低值來實現 low latency
* 隨著 buffer timeout 的增加,latency 與 throughput 的增加,直到達到 full throughput(即 buffer 填充速度快於 timeout)

#### Control Events
* Control Events
* 由 operator 在 data stream 中注入的特殊 event,並按順序與 stream partition 內的所有其他 data record 和 event 一起傳遞
* receiver operator 在這些 event 到達時執行特定動作以作出反應
* checkpoint barriers: 通過將 stream 分為 pre-checkpoint 和 post-checkpoint 來協調 checkpoint
* watermarks: 用來表示 stream partition 內事件時間的進度
* iteration barriers: 用來表示 stream partition 已達到 superstep 的結束(在 cyclic dataflows 上的 Bulk/StaleSynchronous-Parallel iterative 算法中)
* 需要假設 stream partition 保持 record 的順序
* Flink 保證 consume 單一 stream partition 的一元 operator record 的 FIFO 順序
* receive 多個 stream partition 的 operator 按到達順序合併 stream,以跟上 stream 的速率並避免 back pressure
* Flink 中的 streaming dataflows 在任何形式的 repartition 或 broadcast 之後都不提供排序保證,處理無序 record 的責任留給 operator
* 大多數 operator 不需要順序(例如,hash-joins、maps),而需要彌補無序到達的 operator(例如,event-time windows),可以作為operator邏輯的一部分更有效地進行
### Fault Tolerance
Flink 提供2個保證
* reliable execution(嚴格的 exactly-once-processing consistency)
* 處理故障(通過 checkpoint 和部分 re-execution)
系統為了有效提供保證
* 一般假設為 data source 是 persistent 且 replayable(包括 files 和 durable message queues(例如,Apache Kafka))
* 在實際中,non-persistent source 也可以通過在 source operators 的狀態中保留預寫 log 來納入
Apache Flink 的 checkpointing mechanism
* distributed consistent snapshots 以實現 exactly-once-processing 保證
* data stream 的 unbounded 性質 -> 不可能實作恢復時的重新計算,因為可能需要 replay 幾個月的計算來完成長期運行的作業
* 限制恢復時間 -> 定期取得 operators 狀態的 snapshots,包括 input stream 的當前位置
* 挑戰在於在不停止執行的情況下,為所有平行 operators 拍攝一個一致的 snapshots
* 本質上,所有 operators 的 snapshots 應該參照計算中的相同邏輯時間
* Flink 使用的機制稱為 Asynchronous Barrier Snapshotting(ABS)
* Barriers 是 control records,注入到對應於邏輯時間的 input streams 中,並在邏輯上將 stream 分為將包含在當前 snapshot 中的部分和稍後將 snapshot 的部分
1. operator 從 upstream 接收 barry,進行 alignment 階段,確保已接收來自所有輸入的 barriers
2. operator 將其狀態(例如,sliding window 的內容或自定義 data structure)寫入 durable storage(例如,外部系統 HDFS)
3. 一旦狀態被備份,operator 就將 barry 向 downstream 轉發
4. 最終,所有 operator 將登記其狀態的 snapshot,並完成一個 global snapshot

(snapshot t2 包含所有 operator 狀態,這些狀態是在 t2 barry 之前消耗所有 record 的結果)
由於 DAG 結構,ABS 不需要對正在傳輸的 record 進行 checkpoint,而是只依賴於 alignment 階段來將所有產生的結果應用到 operator 狀態。這保證了需要寫入存儲的數據保持在理論最小量(即僅 operator 的 current state)
故障恢復
1. 將所有 operator 狀態恢復到從最後一個成功 snapshot 取得的各自狀態
2. 從存在 snapshot 的最新 barry 開始重新啟動 input stream
* 恢復所需的重新計算量限於兩個連續 barry 之間的input records
* 此外,通過 replay 直接的 upstream subtask 中被 buffer 的未處理 records,可以部分恢復失敗的 subtask
ABS 提供了幾個好處
* 保證了 exactly-once state update,而不會暫停計算
* 它與其他形式的 control message 完全分離(例如,觸發 windows 計算的 event,從而不將 windows 機制限制於 checkpoint 間隔的倍數)
* 它與用於 reliable storage 的機制完全分離,允許將狀態備份到文件系統、databases 等,具體取決於 Flink 使用多大的環境
### Iterative Dataflows
Incremental processing 和 iterations 對於應用程序至關重要(例如圖形處理和機器學習)
* 在 data-parallel 處理平台中支持 iterations
* 依賴於為每次 iteration 提交一個 new job
* 向運行中的 DAG 添加額外的 node 或 feedback edge
* 在 Flink 中,迭代被實現為 *iteration steps*

* head task 和 tail task 與 feedback edges 連接
* head task 和 tail task 的目的是建立一個 active feedback channel 到 iteration steps,並為在此 feedback channel 內傳輸中的 data record 提供協調
* 實作任何類型的 structured parallel iteration model(例如,Bulk Synchronous Parallel(BSP)model)都需要協調,且使用control event來實現
## Stream Analytics on Top of Dataflows
DataStream API 在 Flink 運行時實現了一個完整的串流分析框架: 管理時間的機制(例如 out of order event 處理、定義 window)、維護和更新用戶定義的狀態
串流 API 基於 DataStream 的概念,即(可能是無界的)給定類型元素的不可變集合
Flink 的運行時已經支持 pipelined data 傳輸、持續的 stateful operator、consistency state update 的容錯機制
* 在其上層叠加一個流處理器本質上歸結為實現一個 window system 和一個 state interface
* 這些對運行時是不可見的,運行時只將 window 視為 stateful operator 的一種實現
### The Notion of Time
Flink 區分兩種時間概念
* event-time,即 event 開始的時間(data 被傳入的時間)
* 提供最可靠的 semantics,但可能由於 event-time 和 processing-time 的延遲而表現出延遲
* processing-time,即數據被實際處理數據的時間
* 依賴於 local machine clock,因此為較不可靠的時間觀念,這可能導致恢復時重播不一致
* 更低的延遲
* ingestion-time,即 event 進入 Flink 的時間
* 比 event-time 實現了更低的處理延遲,並與 processing-time 相比產生更精確的結果
在分佈式系統中,event-time 和 processing-time 之間存在 arbitrary skew
* 從基於 event-time semantics(允許系統根據事件發生的實際時間來處理數據,而不是依賴於數據到達處理系統的時間) 獲取答案有任意延遲
* 為了避免任意延遲,這些系統定期插入 low watermarks 的特殊 event,以標記 grobal 進度
watermarks
* 包括一個時間屬性 t,表明所有低於 t 的事件已經進入了某個 operator
* 幫助 execution engine 以正確的 event order 處理事件並序列化操作,如透過時間進度中的統一的方法進行 window 計算
* 流程
1. 開始於 topology 的開頭,在這裡確定未來元素中時間訊息
2. 從開頭傳播到 data stream 的其他 operator
3. operator 決定它們如何對 watermarks 作出反應
* 簡單的操作,如 map 或 filter,只是轉發接收到的 watermark,
* 複雜的基於 watermarks 進行計算的operator(例如,event time window)首先計算由 watermarks 觸發的結果,然後再轉發它
* 如果一個 operate 有多於一個輸入,系統只向 operator 轉發最小的進入 watermark,從而確保正確的結果
### Stateful Stream Processing
stateful operator
* 簡單: counter 或 sum
* 複雜: classification tree 或在機器學習應用中常用的 a large sparse matrix
* Stream windows: 將 records 指派給記憶體中持續更新的 buckets(operator state 的一部分)
state 如何整合到 API 中:
* 使用 operator interfaces 或註解在 operator scope 內靜態宣告明確的 local variable
* 用於 partitioned key-value states 及其相關操作的 operator-state abstraction
* 系統提供的 StateBackend abstraction,配置state的存儲和 checkpoint
* 允許在 stream 應用中進行高度靈活的自定義 state 管理
:::success
checkpoint 機制保證任何註冊的 state 都是持久的,具有 exactly-once update semantics
:::
### Stream Windows
window: logical views(查看 data 的方式)(在 unbounded stream 上的 incremental computation 通常是在不斷演進的logical views 上進行評估)
* Flink 在一個 stateful operator 中整合了 window 功能
* 通過 declaration 進行配置
* declaration = window assigner + 可選的 trigger + evictor
* 三個功能選擇: 在一組 predefined 實作(例如,sliding time window)中 / 由用戶明確 defined
* assigner
* 負責將每條 record 分配到 logical window
* 例如,這個決定可以基於 record 的 timestamp、在sliding window的情況下,一個元素可以屬於多個 logical window
* trigger
* 定義與 window 定義相關聯的操作何時執行
* evictor
* 確定在每個 window 中保留哪些 record
* window 分配過程能夠涵蓋所有已知的 window 類型
* window 功能整合了無序處理
* 可用於實現多種 window: 如果 stream 在進行 window 操作之前已按 key 進行 partition,則 window 操作是 loacl,因此不需要 worker 之間的協調
window 定義 example:
1. 
* assigner: 範圍為6秒,每2秒滑動一次
* trigger: window 結果在 watermark 通過 window 的結尾時計算
2. 
* assigner: 定義了一個 grobal window
* trigger: window 在每1000個 event 上觸發操作
* evictor: 保留最後100個元素
### Asynchronous Stream Iterations
Loops in streams
* feedback loops 不需要協調
* Asynchronous iteration 滿足了 stream 應用的通信需求,並且與基於有限數據的結構化 iterations 的並行優化問題不同
* Flink
* 當沒有啟用 iteration 控制機制時,執行模型已經涵蓋了 Asynchronous iteration
* 為了符合容錯保證
* feedback stream 被視為 implicit-iteration head operator 內的 operator state
* global snapshot的一部分
* DataStream API 允許對 feedback stream 進行 explicit 定義,並包含對串流上 structured loop 以及 progress tracking support
## Batch Analytics on Top of Dataflows
將所有輸入數據插入 window 的 stream program 可以形成一個 batch program,而 batch processing 應完全被 Flink 的上述功能所涵蓋
But,
* syntax(即 batch 計算的 API)可以簡化(例如,不需要人為的 global window 定義)
* 處理有界 dataset 的程式由額外的優化、更高效的容錯記帳、階段性調度獲得
Flink 對 batch processing 的方法如下:
* batch 計算與 stream 計算由相同的 runtime 執行
* 可在 runtime 執行的檔案會使用 blocked data stream 進行參數化,將大型計算分解成隔離的 stages(依次進行排程)
* 當周期性 snapshot 的開銷很高時,會關閉周期性 snapshot
* 故障恢復可以通過從最新的實體化 intermediate stream(可能是 source)重播丟失的 stream partition 來實現
* Blocking operators(例如,sorts)僅是 block operator,此狀態會直到消耗完所有 input
* runtime 不知道 operator 是否 block,這些 operator 使用 Flink 提供的管理內存,如果其輸入超出此記憶體限制,可以溢出到 disk
* 一個專用的 DataSet API 為 batch 計算提供abstraction,即具有界容錯的 DataSet t data structure 和對 DataSets 的變換(例如,joins, aggregations, iterations)
查詢優化層將一個 DataSet 程式轉換成一個高效的可執行檔。
### Query Optimization
Flink 優化器
* 借鑑了並行 database 系統的技術: plan equivalence, cost modeling, interesting-property propagation
* operator 對優化器隱藏了它們的 semantics
* 根據 interesting-property propagation,使用基於成本的方法在 physical plans 中進行選擇
* 成本包括 network、disk I/O 、CPU 成本
* 使用程序員提供的提示克服存在 UDF 時的cardinality 估計問題
### Memory Management
* 基於 database 技術,Flink 將數據序列化到 memory segments 中,而不是在 JVM heap 中分配 object 來表示 buffer 中的使用中的 data record
* sorting 和 joining 等操作盡可能直接在 binary data 上進行,將 serialization 和 deserialization 的開銷保持在最低限度,並在需要時將部分 dara 溢出到 disk
* 為了處理 arbitrary objects,使用 type inference 和自定義 serialization 機制
通過保持數據處理在 binary representation 和 off-heap,Flink 能夠減少 garbage collection 的開銷,並使用 cache-efficient、robust 的算法,在 memory pressure 擴展
### Batch Iterations
* 通過使用iteration-control events,執行模型允許在其上實現任何類型的 structured iteration logic
* 引入了如 delta iterations,利用 sparse 計算 dependencies
## Conclusion
Apache Flink,實現了通用 dataflow engine 的平台,設計用來執行 stream and batch 分析
* dataflow engine 將 operator state 和 logical intermediate 由 batch 和 data stream API 使用不同的參數來應用
* stream API 提供了保持 recoverable state 狀態、partition、transform、aggregate data stream window 的方法
* batch 計算特別對待:
* Flink 通過使用查詢優化器優化 batch 計算的執行
* 實現在記憶體不足時溢出到 disk 的 block operator