# 4-3. Apache FlinkTM: Stream and Batch Processing in a Single Engine
Apache Flink 是一個處理 streaming data 和 batch data 的開源系統。
Flink 的設計哲學是很多資料處理應用
- real-time analytics
- continuous data pipelines
- historic data processing (batch)
- iterative algorithms (machine learning, graph analysis)
都能用 pipelined fault-tolerant 的 dataflows 表達、執行。
本文介紹 Flink 的架構,並說明如何在單一執行模型下統一(看似多樣的)使用案例。
## 1 Introduction
傳統上,資料流處理(data-stream processing)和靜態(批)資料處理(static/batch data processing)被認為是兩種不同的應用類型,前者用於專門的應用,後者適用大部分的使用案例、資料規模和市場。
然而,如今許多大規模資料處理案例實際上處理的是隨時間持續產生的資料,例如:網站日誌、應用程式日誌、感應器,或是資料庫的交易日誌記錄。現在的系統沒有將資料流視為流動的資料,而是將資料紀錄(通常是人為地)批次化成靜態的資料集,沒有考慮時間特性。這些方法都存在高延遲(因為批次)、高複雜度(連接和協調多個系統,並實現業務邏輯兩次),且存在不準確性,因為應用程式未明確處理時間維度。
Apache Flink 將資料流處理作為實時分析、持續資料流和批處理的統一模型。Flink與允許任意重播(quasi-arbitrary replay)資料流的持久化訊息佇列(如 Apache Kafka 或 Amazon Kinesis)結合,不同類型的計算(即時處理最新事件、在big window中定期連續聚合資料,或處理TB級的歷史資料)都是從持久流中的不同點開始處理,並在計算過程中保持不同形式的狀態。通過高度靈活的視窗機制(windowing mechanism),Flink 可以計算++迅速但近似++以及++延遲但準確++的兩種結果,從而無需為這兩個使用者案例組合不同的系統。此外,Flink 支援不同的時間概念(事件時間event-time、攝取時間ingestion-time、處理時間processing-time),以便為程式設計師在定義事件如何關聯方面提供高度彈性。
同時,Flink承認不論是現在還是將來,專門的批處理仍然是有需求的。針對靜態資料的複雜查詢仍然適合批處理抽象化。此外,在串流使用案例的傳統實作中仍然需要批處理,且目前尚無對這種串流資料處理做分析的有效演算法。
批次程式是串流程式的特例,串流是finite的,且record的順序和時間不重要(所有record隱含地屬於一個all-compassing window)。然而,為了更好的支援批次的使用案例,Flink 提供了專門的 API 來處理靜態資料集,使用專門的資料結構和演算法來處理批次版本的操作,例如join或grouping,並且使用專門的調度策略。因此,Flink 在串流執行時具有完整且高效的批處理功能,包括用於圖形分析和機器學習的函式庫。
:::info
An all-encompassing window, also known as a global window or tumbling window, encompasses all the data within a fixed duration or size, and once the window reaches its boundary, it is closed, and processing occurs on the contents of that window. (by chatpgt)
:::
本文的貢獻如下:
- 為統一串流和批次資料處理的架構提供使用案例,包括僅針對靜態資料集相關的特定最佳化
- 展示了如何將串流、批次、迭代和互動式分析表示為容錯的串流資料流 (fault-tolerant streaming dataflows) [(3)](#3-The-Common-Fabric-Streaming-Dataflows)
- 透過展示如何將串流、批次、迭代和互動式分析表示為串流資料流,我們討論如何在這些資料流的基礎上通過靈活的視窗機制(windowing mechanism)[(4)](#4-Analytics-on-Top-of-Dataflows)建構成熟的串流分析系統及批次處理器[(4.1)](#41-The-Notion-of-Time)。
## 2 System Architecture
:::success
Four main layers
- deployment
- core
- APIs
- libraries
:::

Flink 的核心是分散式資料流引擎(distributed dataflow engine),用於執行資料流程式(dataflow programs)。運行時,程式是一個由stateful operators組成的DAG,由資料流連接。
兩個核心API:
- DataSet API: 處理 finite 資料集 (batch processing)
- DataStream API: 處理可能 unbounded 的資料流 (stream processing)
Flink 核心執行時引擎可以看作是一個串流資料流引擎, DataSet 和 DataStream API 都生成 runtime programs executable,所以是通用的架構。在核心 API 上,Flink 綁定了 domain-specific 的 libraries 和 APIs,來產生 DataSet 和 DataStream API 程式。

一個 Flink cluster 包含三種行程:
- **Client**: 接收程式碼,轉成資料流程圖(dataflow graph),提交給Job Manager
- 轉換時檢查operator間交換的資料的data type (schema)
- 建立serializer和其它的type/schema specific程式碼
- DataSet程式會經過一個額外的cost-based的query最佳化階段,類似關聯式查詢optimizer執行的physical optimzations
- **Job Manager**: 協調資料流的分散式執行
- 追蹤每個operator和stream的state和progress
- schedule新的operator
- coordinate checkpoint
- recovery
- HA配置下,會把每個checkpoint的最小metadata set存到容錯的儲存空間,以便重建checkpoint並從其還原dataflow execution
- **Task Manager**(至少一個): 做實際的資料處理
- 執行一或多個operator
- operator產生stream,向Job Manager回報自己的狀態
- 維護buffer pool,用以buffer或實現stream
- 維護網路連接,以便在operator間交換資料流
## 3 The Common Fabric: Streaming Dataflows
雖然使用者可以用多樣的API撰寫Flink程式,但最終都會被編譯成一個通用的表示形式:資料流程圖。
### 3.1 Dataflow Graphs

資料流程圖是一個有向無環圖(DAG)
- **有狀態的運算子 (stateful operators)**
- 特殊情況下可以是無狀態的
- 實作所有處理邏輯,例如filter、hash joins和stream window function)
- 通常是常見演算法的教科書版實作
- **資料流**
- 由運算子產生的資料
- 可被運算子消費
- 分配資料的模式有很多種,像是point-to-point、broadcast、re-partition、fan-out和merge
因為資料流程圖是以data-parallel的方式執行
- 運算子被平行化為一個或多個稱為 *subtask* 的平行執行實例
- 流被分為一或多個 *stream partitions*
每個 producing subtask 對應一個 partition。
### 3.2 Data Exchange through Intermediate Data Streams
Flink的中間資料流(Intermediate data stream)是運算子間資料交換的核心抽象,代表由運算子產生並可供一個或多個運算子消費的資料的邏輯處理方式。中間資料流在某種意義上是邏輯概念的,它們指向的資料可能是在磁碟上,也可能不在磁碟上。資料流的特定行為由flink的更高層參數指定(例如DataSet API使用的program optimizer)。
#### Pipelined and Blocking Data
**管線化中間流(Pipelined intermediate stream)**
- 在同時運作的生產者和消費者之間交換資料,實現管線化執行
- 透過intermediate buffer pool的彈性調節,從消費者向生產者傳播回壓(backpressure),以補償短期吞吐量波動
- 用於連續流程式,及批次資料流程的許多部分,以避免可能的實體化
**阻塞流(Blocking stream)**
- 適用於有界的資料流
- 先buffer生產運算子的所有資料才提供消費,將生產運算子和消費運算子分隔到不同的執行階段
- 需要更多記憶體,經常溢出到次要儲存,且不傳播回壓
- 阻塞流在需要時可用於
- 將連續運算子(successive operator)互相隔離
- 需要管線破壞運算子(pipline-breaking operator)時
- 例如sort-merge join可能導致distibuted deadlock的情況
#### Balancing Latency and Throughput
Flink 的資料交換機制是圍繞著**buffer的交換**而設計的。當生產者端準備好一個record時,該record會被序列化並拆分為一個或多個buffer(一個buffer也可以容納多個record),然後轉發給消費者。
Buffer發送給消費者的時機:
- Buffer滿了
- 達成timeout條件
因此
- 大的 buffer size -> 高 throughput
- 小的 buffer timeout -> 低 latency

隨著buffer timeout時間的增加,latency隨著throughput的增長變大,直到達成full throughput(buffer填滿的速度比timeout expire的速度快)
#### Control Events
除了交換資料,Flink的stream也傳達不同類型的控制事件。運算子在資料流注入特別事件,這些事件也和其他的資料record及event一樣,在stream partition中照順序傳遞。
Flink 使用很多特別型態的控制事件:
- ***checkpoint barriers***: 透過將stream分成pre-checkpoint和post-checkpoint,協調checkpoint [(3.3)](#33-Fault-Tolerance)
- ***watermarks***: 表示stream partition內的事件時間進度 [(4.1)](#41-The-Notion-of-Time)
- ***iteration barriers***: 表示在循環資料流的 Bulk/Stale-Synchronous-Parallel 迭代算法中,stream partition已到superstep結尾 [(5.3)](#53-Batch-Iterations)
控制事件假設stream partition保留了record的順序。
- 僅消費單一stream partition的一元運算子保證record的FIFO順序
- 接收不只一個stream partition的運算子會用抵達順序merge收到的stream,跟上流的速率,避免回壓
因此,在任何形式的repartition或broadcast後,Flink中的串流資料流不提供順序性的保證。
**處理record無序的問題交由運算子去實作**
這個安排是最有效率的設計,大部分的運算子不要求確定的順序(e.g., hash joins, maps),而那些需要對無序抵達做出補償的運算子(e.g., event-time windows)可以比把這個問題考慮進運算子邏輯中更有效率的完成。
### 3.3 Fault Tolerance
Flink 保證
- 可靠的執行:嚴格的exactly-once-processing一致性
- 故障處理:checkpoint和部分重新執行
讓系統有效保證這些的前提:
- persistent且可重播的data source
- files
- durable message queues (e.g., Apache Kafka)
實務上,non-persistent的來源也可以透過在來源運算子(source operator)的狀態中記錄write-ahead log納入保證。
**Checkpoint 機制**
建立在在分散式一致性快照上,達成exactly-once-processing的保證。
資料流的無界特性讓還原並重新計算的作法變的不切實際...
- 可能需要為長期運行的任務重播數月的資料,並重新計算
為了限制復原的時長,Flink **對運算子狀態做快照,定期儲存目前輸入流的位置**。
核心挑戰:如何在不停止任務執行的情況下,製作所有平行運算子的一致性快照?讓所有運算子的快照應該指向計算中的相同邏輯時間。
**Asynchronous Barrier Snapshotting (ABS)**
**Barrier**: 注入輸入流的控制 record
- 有對應的邏輯時間
- 邏輯上將stream分成
- 會影響到當前快照的部分
- 後續快照才會包含的部分
運算子從上游收到barrier:
1. 進入alignment階段,確保已接收來自所有上游的barrier
2. 將自己的狀態寫入至持久性儲存
- sliding window內容
- 客製的資料結構
3. 將barrier轉發給下游
最終,所有運算子會註冊他們狀態的快照,完成一個全域的快照。

**故障復原**:將所有運算子的狀態還原到成功的最後一次快照,並重啟輸入流到有快照的最新barrier。
- 最大的重新計算量:兩個連續barrier之間的record量。
- 恢復失敗的子任務
- 額外重播直接上游子任務buffer的未處理的記錄
ABS 帶來的好處:
- exactly-once state updates,且不需要暫停計算
- 與其他形式的控制消息完全解耦
- 例:透過觸發window計算的事件,從而不將window機制限制在檢查點間隔的倍數範圍內
- 與用於可靠存儲的機制完全解耦,允許狀態備份到文件系統、資料庫等
### 3.4 Iterative Dataflows
增量處理(incremental processing)和迭代對應用(e.g., graph processing and ML)來說是至關重要的。
資料並行計算平台對於迭代計算的支持,通常是靠
- 為每個iteration提交一個新的job
- 在運行中的DAG加上額外的節點或回饋邊(feedback edge)
***Iteration steps***: Flink中的迭代實作,特殊的運算子本身就包含一個execution graph。

Flink允許迭代的Head和Tail隱性的用回饋邊連結
- 目的:維護DAG-based runtime和scheduler
- 作用:為iteration step建立一個主動回饋通道,並為處理該回饋通道內傳輸的資料記錄提供協調
- 實現任何類型的結構化並行迭代模型(例如Bulk Synchronous Parallel (BSP) model)都需要協調,並且使用控制事件
## 4 Stream Analytics on Top of Dataflows
Flink 的 DataStream API 在 Flink runtime 上實現了完整的流分析框架,包括管理時間的機制,例如:
- 處理無序事件
- 定義window
- 維護和更新用戶定義的狀態
由於 Flink runtime 已經支持管線化的資料傳輸、連續的有狀態的運算子,以及用於一致狀態更新的容錯機制,因此在其上覆蓋流處理器基本上就是實現一個windowing system和state interface。這些在runtime是不可見的,它們將window視為有狀態的運算子的一種實現。
### 4.1 The Notion of Time
Flink有兩種時間的概念
- **事件時間(event-time)**: 事件產生的時間
- **處理時間(processing-time)**: 機器處理資料的實際時間
在分散式系統中,事件時間和處理時間之間存在著任意的偏移,導致任意的延遲。
***low watermarks***
- 系統定期插入的特殊事件,標記global progress
- 目的:避免任意的延遲
- 舉例:time attribute *t* 代表所有小於t時間的事件都已進入運算子
- 透過統一的進度衡量,協助執行引擎按照正確的順序處理事件,並對window computation等操作進行序列化
運算子決定如何對水印(watermarks)作出反應
- 簡單的操作 (map/filter):轉發
- 以水印進行計算的複雜運算子 (event-time windows):計算由水印觸發的結果,再轉發
如果一個運算子有多個輸入,系統只會轉發接收到的水印最小值,從而確保正確的結果。
兩種時間概念的特性
- **處理時間(processing-time)**
- local machine clocks
- 對時間的認知不可靠
- 恢復時可能導致不一致的replay
- 較低的延遲
- **事件時間(event-time)**
- 提供最可靠的語義
- 因為event-time-processing-time lag而延遲
事件時間的特例
- **攝入時間(ingestion-time)**
- 事件進入 Flink 的時間
- 比事件時間具有更低的處理延遲
- 可以獲得比處理時間更準確的結果
### 4.2 Stateful Stream Processing
在Flink中,狀態是明確的,並透過提供以下方式合併到API中:
- 運算子介面或註解,用於在運算子範圍內靜態註冊明確局部變數
- 運算子狀態抽象化,用於宣告分區key-value states及其相關運算
使用者也可以使用系統提供的 StateBackend 抽象來配置狀態的儲存和checkpoint方式,從而在串流應用中實現靈活的自訂狀態管理。Flink 的檢查點機制[(3.3)](#33-Fault-Tolerance)保證任何註冊狀態都具有一次性更新語義。
### 4.3 Stream Windows
Flink將windowing合併到有狀態的運算子,透過三個核心函式進行靈活宣告組成:
- assigner: 將每個record分配到logical window
- trigger: 定義何時執行和window定義相關的操作
- evictor: 決定每個window要保留哪些reocrds
其中只有assigner是必要的。
Flink的window分配過程是獨特的,能涵蓋所有已知的視窗類型
- periodic time windows
- count windows
- punctuation windows
- landmark windows
- session windows
- delta windows
Flink 的視窗功能無縫地結合了無序處理
:::info
```
stream
.window(SlidingTimeWindows.of(Time.of(6, SECONDS), Time.of(2, SECONDS))
.trigger(EventTimeTrigger.create())
```
- assigner: 6s的window,每2s滑動
- trigger: 一旦水印通過window的末端時,window的結果被計算出來
:::
:::info
A global window creates a single logical group.
```
stream
.window(GlobalWindow.create())
.trigger(Count.of(1000))
.evict(Count.of(100))
```
- assigner: global window
- trigger: 每 1000 個事件呼叫操作
- evictor: 保留最後 100 個元素
:::
注意:若stream已按key分區,那麼上述的window操作就是local的,不需要worker間的協調。這個機制可用來實作多種windowing功能。
### 4.4 Asynchronous Stream Iterations
- 大多數情況下,回饋迴圈(feedback loops)不需要協調
- 非同步迭代涵蓋了串流應用的通訊需求,與基於有限資料的結構化迭代的並行最佳化問題有所不同
- Flink的執行模型已經涵蓋了非同步迭代,在沒有啟用迭代控制機制的情況下如 [(3.4) 圖六](#34-Iterative-Dataflows) 所示
- 為了遵守容錯保證,feedback streams被視為implicit-iteration head運算子內的運算子狀態,同時也是全域快照的一部分
- DataStream API 允許對feedback streams做明確的定義,並且可以輕鬆支援對流進行結構化迴圈的操作,同時支持進度追蹤
## 5 Batch Analytics on Top of Dataflows
有界資料集是無界資料流的特例。批次程式(batch program)可由在一個window中插入所有輸入資料的串流程式(stream program)形成,而批處理(batch processing)應被上述所提到的Flink特性涵蓋。
然而,
1. 語法可被簡化 (batch計算的API)
- 不需要人為的global window定義
2. 處理有界的資料集的程式可以做額外的最佳化
- 為容錯提供更有效的記錄
- staged scheduling
Flink 的批處理方式如下:
- 批次計算和串流計算都是由同樣的runtime執行。Runtime執行檔可使用阻塞資料流進行參數化,以將大型計算分解為連續排程的isolated stages。
- 當overhead大時,關閉定期快照。錯誤恢復轉而使用重播丟失的stream partitions,從最新的materialized intermediate stream開始(可能是資料來源)
- 阻塞運算子(例如sort)會阻塞直到消費完他們全部的輸入為止。Runtime部會察覺他們是否阻塞。這些運算子使用Flink提供的managed memory,若超過記憶體界線則外溢到磁碟。
- 專用的DataSet API提供批次計算熟悉的的抽象,也就是一個有界的容錯DataSet資料結構和DataSet轉換,例如join、aggregation、iterations。
- Query最佳化層將DataSet程式轉換成高效的可執行檔。
### 5.1 Query Optimization
Flink 的最佳化器建立在平行資料庫系統的技術上
- plan equivalence
- cost modeling
- interesting-property propagation
然而,因為運算子對最佳化器隱藏了他們的語義
- 構成 Flink 資料流程式的任意 UDF-heavy DAG 不允許傳統最佳化器使用開箱即用的資料庫技術
- cardinality 和 cost-estimation 方法同樣難以使用
Flink 的 runtime 支援多樣的執行策略,包含
- repartition
- broadcast data transfer
- sort-based grouping
- sort- and hash-based join implementations
Flink 的最佳化器
- 基於 interesting properties propagation 的概念列舉不同的 physical plan
- 使用基於成本的方法進行選擇
- 成本包含網路、磁碟IO、CPU
- 可以使用程式設計師提供的提示
- 克服 UDF 存在時的cardinality estimation問題
### 5.2 Memory Management
- 基於資料庫技術,Flink 將資料序列化到記憶體段中,而不是在 JVM heap 中 allocate 物件來表示 buffered 的運行中資料記錄。
- 像是 sorting 和 joining 的操作會盡可能地直接按二進位資料操作,保持序列化與反序列化的開銷在最小值,需要時則溢出到磁碟。
- 為了處理任意的物件,Flink 使用 type inference 和自訂序列化的機制。
- 透過**將資料處理保持在二進位表示和 heap 外**,Flink 設法**減少garbage collection的開銷,使用 cache-efficient 和健全演算法**,在記憶體壓力下優雅地擴展。
### 5.3 Batch Iterations
- 過去,迭代圖分析、平行梯度下降和最佳化技術已在Bulk Synchronous Parallel (BSP)和Stale Synchronous Parallel (SSP)等模型之上實施
- Flink 的執行模型允許透過使用iteration-control event在上面實現任何類型的結構化迭代邏輯
- 例如:在 BSP 執行中,iteration-control event標記迭代計算中superstep的開始和結束
- Flink 引入了更多新穎的最佳化技術
- 例如:Delta 迭代的概念,可利用sparse computational dependencies
## Conclusions
- 介紹 Apache Flink,一個實現通用資料流引擎的平台,旨在執行stream analytics和batch analytics。
- Flink 的資料流引擎將運算子狀態和logical intermediate result視為一等公民,並由具有不同參數的批次和資料流 API 使用。
- 建構在 Flink streaming 資料流引擎之上的 streaming API 提供了保持可恢復狀態以及分區、轉換和聚合資料流window的方法。
- 雖然理論上批量計算是串流計算的一種特例,但 Flink 對它們進行了特殊處理,透過使用Query最佳化器對其執行最佳化,並實現在沒有記憶體的情況下優雅地溢出到磁碟的阻塞運算子。
## References
本文
https://www.diva-portal.org/smash/get/diva2:1059537/FULLTEXT01.pdf
翻譯
https://www.victorchu.info/posts/f9aa8120/