# 4-2 Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka https://365nthu-my.sharepoint.com/personal/112062562_office365_nthu_edu_tw/_layouts/15/doc.aspx?sourcedoc={9747953e-d055-4f10-be8a-e47e193e1dfe}&action=edit ## ABSTRACT ## 1. Introduction :::danger Apache Kafka在Stream Processing的應用 ::: #### 背景 * 數據串流處理興起,作為實現即時數據驅動應用程式的新程式設計範式正在引起業界的極大關注 * 串流處理的異步性質基於傳遞訊息的事件驅動,而不需緊密協調同步來更新本地狀態 * 最大挑戰為正確性的保證,不論是面臨故障或是數據亂序,都要產生無誤的結果 #### 一致性(Consistency) & 完整性(Completeness) :::warning * 一致性: 從故障中能恢復一致的狀態,不會有重複項或遺失資料 * 完整性: 能夠處理和輸出所有的input data而沒有丟失數據片段。即使沒有照順序到達也能反映已接收到完整數據 ::: ![image](https://hackmd.io/_uploads/SyO1KnjgR.png) * 圖a中的處理器個單一輸入流和單一輸出流 * 圖b中處理完11後,還沒有確認後便故障。在恢復後,又重新處理了一次11,導致重複更新,如圖c,造成**結果不一致**。 圖d則是11和13都被處理了並且輸出。之後又收到12(更早的紀錄),這條紀錄為無序的到達,表之前發出的結果並未包含12,即為**結果不完整** #### 主要貢獻 * Apache Kafka利用其持久化的log架構,結合串流處理功能,通過有序的transaction log追加和重放來簡化一致性和完整性的挑戰 * 通過**Idempotent**和**transaction write protocal**來確保一次性語義 * 通過**revision**為基礎的推測處理來處理無序數據。 ## 2. STREAMING CORRECTNESS CHALLENGES ### 2.1 Fault Tolerance and Consistency :::danger 故障種類以及容錯機制 ::: #### 故障場景: * storage engine failure * 故障可能使部分狀態丟失,導致需要從頭開始重新計算 * 這期間新的數據不會被處理,影響可用性 * stream processing failure * 若處理完一條紀錄並更新狀態後,還沒確認接收該紀錄就故障,再恢復時可能會再次處理同個紀錄,造成重複問題 * inter-processor RPC failure * 串流處理器間通過訊息傳遞,若未收到ack不一定代表未成功傳送 #### 容錯機制 * Checkpoint * 設置檢查點,定期檢查狀態,作為故障時回退的snapshot。 * 在pipeline中對多個狀態檢查點做對齊,會在數據流中插入marker作為同步屏障 * 輸出已提交問題 * 若數據在故障前已提交,而故障狀態回復到之前的檢查點,重新處理可能導致重複提交。 * 透過唯一id或timestamp來分辨以去除重複 ### 2.2 Out-of-order Handling and Completeness :::danger 無序數據流 ::: #### 無序數據的原因 * 最常見因素是時鐘偏差或是網路延遲,導致先到的數據有較大的timestamp * 多個輸入流合成一個輸入流也可能使順序混論 #### 解決方法 * 基於checkpoint * 用來阻止結果的輸出直到某時間點前確認所有事件都完成 * micro-batching * 將數據流分解成幾個batch * 當batch被完成後,將被同步觸發並將更新後的狀態提交到外部儲存 ## 3. STREAM PROCESSING IN APACHE KAFKA ### 3.1 Partitions and Timestamped Records #### Kafka的分區機制 * 消費者客戶端可以訂閱一個或多個topic partition,並按附加順序從這些partition中的log中讀取record * 具有共同訂閱的多個consumer client形成一個comsumer group,其會處理任務分配、由於成員資格更改而重新平衡 #### record儲存 * 內容: * key * value * offset * timestamp * record用key-value對來存在partition之中 * 每個record被分配一個offset來唯一標示其在log中位置 * offset順序不代表確切邏輯順序,故另外也附帶一個timestamp來記錄事件發生時間 * 示意圖: ![image](https://hackmd.io/_uploads/Bk3WlRhlR.png) ### 3.2 Streams DSL and Operator Topology #### Kafka Stream * Kafka Stream是一個Java library,包含在Apache Kafka,允許用戶建構即時狀態串流應用 * 提供High-level DSL讓client指定topic,透過數據流不斷地從source topic讀取數據,將輸入流轉換為新的流並不斷更新table,最後將結果流通過pipeline傳回sink topic。 * ??作為ksqulDB的併行運行基礎,一個事件流資料庫。 * ??ksqlDB取得儲存在Kafka中的輸入資料流,並連續查詢來衍生新的資料流或物化視圖 #### Topology * Kafka Streams將應用邏輯翻譯成由連接的資料轉換運算子組成的拓撲 * 拓樸由多個連續數據處理步驟組成,如 filter, map, aggregation, etc * 拓撲被細分為子拓撲,其中每個子拓撲由連續的運算子組成,子拓墣中的運算子之間不需要透過網路而是直接在同個節點上完成 * Example ![image](https://hackmd.io/_uploads/B1FnyzhlA.png) * Example in Kafka Streams DSL ![image](https://hackmd.io/_uploads/BkGj-WhgA.png) * 說明: * stream():在輸入流中找出在pageview-events的record * filter():中找出view.period >= 30s的record * map():給他們新的key value * groupByKey():將相同key的record放到同一組 * windowedBy():五秒內取一個window * count():數window內有幾筆record * toStream():輸出結果到sink topic * filter以及map為同個子拓墣,因為其不會使partition做shuffle * 而map和count則會存在不同子拓墣,因為map會變更partition key,而count需要所有相同key值的數據存在於同個partition,故會需要做repartition :::info **Repartition** * 有時候數據操作會需要相同key的數據位於同個partition以確保正確進行計算,要在partition間移動。 * 有個特殊的topic叫做repartition topic,是用來做repartition的特別topic ::: #### State Store * 一些涉及狀態管理的操作稱為stateful operator(如aggregation,會依賴先前的數據),其通過state store來維護以及管理狀態,這state會被寫入Changelog topc * 有分三種: * built-in persistent state stores * in-memory state stores * custom implementations from the supplied state store APIs #### Changelog topic * 預設情況下,處理完數後會將狀態更新會寫入到一個特定的地方叫做Changlog topic * 在key一樣但有更高的offset則會刪除過去的record * 用於記錄每次狀態變更,若碰到故障則可以從這些狀態來恢復 :::info Repartition topic和Changelog topic都是internal data stream,無法被使用者看到 ::: ### 3.3 Data-Parallelism and Tasks * 子拓樸可被分解成一個或多個task,一個source topic對應一個任務 * task是Kafka Streams中並行工作的最小單元,可以獨立運行, * task按照處理來自source topic partition的record,透過遍歷子拓樸、更新state store以及生成輸出紀錄到changlog topic以及sink topic #### Deployment and Migration * Kafka Stream應用可以部署在多個計算節點上 * 當新實例啟動或是現有實例故障時,task會自動在實例中重新分配已平衡工作負載 * 若stateful operator的task需要遷移到新的實例,通過重放相對應的changlog topic的狀態來恢復精確的狀態 #### Example ![image](https://hackmd.io/_uploads/B1FnyzhlA.png) * 第一個拓樸包含filter, map, groupByKey三個操作,主要負責重新分配數據,確保同key數據被分配到同一個分區 * 第二個拓樸包含windowBy, aggregation操作。windowBy操作根據重分區主題讀取輸入紀錄,分配到對應窗口。aggregation對每個窗口中記錄進行計數 #### 故障處理與保證一致性 * Kafka Stream使用嵌入式producer client將輸出紀錄發送到broker,如果超時則生產者retry,但這可能導致record重複 * 在正常處理期間,Kafka Streams在將傳出記錄到sink topic以及狀態儲存的更新後,會定期提交其在來source topic上的當前處理位置。 * 如果在這些操作之間發生故障,可以在從changlog top狀態復原處理狀態後,重新取得相同的source topic的record來重新處理一次,即是種at-least-once stream processing ## 4. EXACTLY-ONCE IN KAFKA STREAMS * broker故障時會找新的broker接替 * n個broker,Kafka可以容忍n-1個故障 ### 4.1 Idempotent Writes Per Partition * Idempotent operation可以執行多次,其效果會與執行一次的結果相同 * producer client可以多次將相同record傳到partition log,並且在log上只會發生一次追加,可以消除由於各種故障場景而導致重複記錄的可能性 * 類似於TCP都有一個單調遞增的序列號,用唯一識別來消除重複紀錄 #### batch優化處理 * 生產者發送一批數據時,只需為batch中第一筆紀錄指令序列號。因為是單調遞增,故後面的都可以推出來。減少了儲存空間 #### broker序列號管理 * 每個broker會緩存每個producer最新序列號,並在broker關閉時將序列號永久保存。為了在故障或重啟時能夠恢復正確狀態 * broker故障,其原本的topic會由其他broker接管,這些新的領導副本會察看存在log的序列號已確定接收到的數據是否正確 ### 4.2 Transactional Writes Across Partitions * Exactly-once stream processing即在碰到故障時也要確保只被執行一次 * 其所有操作需要有原子性,如 * output record加到sink topic * state store更新,透過加到changelog topic * source topic的offset提交 * 原子性即表示操作完全成功或完全不成功 * ??比起傳統兩階段提交協議要將資料寫入兩次(一次寫入log另一次寫入資料),Kafka’s transactional protocol只需要寫入一次到log,並利用offset來避免暴露中止向client發送資料 * Kafka透過transaction log使transaction coordinator有很高可用性,從而避免單點故障 #### Transaction Coordinator and Transaction Log * Transaction Coordinator * 在producer一個write後,會被賦予一個唯一的transaction id,在重啟時分辨相同的transaction producer * 另外有個遞增的epoch,在重啟後會獲得一個較高的epoch,Kafka透過epoch來判斷新舊transaction,接受新的拒絕舊的 * 每個Kafka broker都有一個transaction coordinator module,用於管理分配所有transaction producer的給他的metadata(如transaction狀態) * Transaction Log :::info **Transaction Metadata**: 儲存目前transaction state * Ongoing * PrepareCommit * Completed ::: * transaction log作為一個internal topic,用於儲存transaction的metadata的更新,每個協調器都有一些transaction log中分區的一些子集。 * transaction log只會存最新的metadata,而不儲存實際的record * transaction coordinator是唯一能讀寫transaction log的元素 * 在擁有一個或多個transaction log partition的coordinator故障時,會選擇新的作為新的coordinator來接管,透過重放transaction log中的metadata來重建狀態 * 所有producer transaction id透過hash function對應到一個特定的transaction log partition。即表示transactional producer只會與一個transaction coordinator做溝通 #### 流程 ![截圖 2024-04-17 下午3.51.38](https://hackmd.io/_uploads/Skgn1WTlC.png) 1. 註冊id: transaction producer啟動,先向transaction coordinator註冊id 2. 檢查未完成的transaction:檢查id並看是否有狀態是PrepareCommit,有的話則將其提交。其他狀態的則終止 3. 增加epoch: 以防有zombie producer(已過時) 4. 在註冊被確認後,producer初始化完畢可以開始送record #### Two-Phase Transactional Commit * 一個生產者在時間內最多同時有一個正在進行的transaction * producer要送record到新的topic partition時,會先向transaction coordinator註冊其partition * 生產者送出所有record後,會清空所有write,並等待broker的ack * 送出另一個請求給transaction coordinator來進入commit流程 ##### 兩階段Commit: * 第一階段 * 將state更新為PrepareCommit並且在transaction log紀錄其變化 * 這更新被視為sychronization barrier,即不可回退 * 第二階段(coordinator非同步進行) * coordinator會在被註冊的partition上進行標記transaction commit marker * 此標記表示在該標記前的所有record都已經被提交 * 再所有marker都被coordinator確認,將會update其state為CompleteCommit,允許該producer開始其他transaction ##### Abort * 中間有error發生,producer也可以送出目前transaction的abort請求給coordinator * 在coordinator超時也會終止該transaction * Abort步驟: 1. 將state設成PrepareAbort 2. 設定abort marker,表是該marker前的所有record都被捨棄 3. 在所有marker被ack後,把state設成CompleteAbort #### Reading Transactional Record(trivial) * consumer client只讀已提交的數據 * transaction被提交才被下游可見,終止則record不會被回傳 * offset只有在transaction提交才更新,被終止則會被刪除,並回復上個提交的transaction狀態 ### 4.3 Performance Implications ![截圖 2024-04-17 下午5.12.14](https://hackmd.io/_uploads/H1SYzM6g0.png) * 從at-least-once變成exaty-once造成的performance影響 * 會需要producer和coordinator中額外的通訊 * throughput:會相對降低10~20%相對較小,主要原因是要附加一些數值,但開銷也很小 * delay:是由於partition數量多,因為transaction marker和partition數量成正比 * 提交間隔:更長間隔導致更大的transaction,從而攤平成本,但也增加端點到端點延遲 ## 5. KAFKA STREAMS REVISION PROCESSING * 前面提到的Idempotent和transaction只保證exactly-once,但是並不依賴此機制來處理無序數據 * 當無序紀錄到達,會發出糾正的結果來使前面發出的結果都無效 * stream以及table的操作: * stream以追加record的方式,table通過changelog來表示 * 處理無序數據 * stateless operator(如filter或mapValue):不用管record順序 * stateful operator(如aggregation或join):依賴先前收到的record來確定當前處理結果,對順序敏感 * 對於stream來說,因先前結果錯誤,後面的收到正確的結果,前面的錯誤結果已經沒有辦法撤回了,故會需要一段寬限期來持續傳送結果直到寬限期過了。該期間由使用者來設定 * 對於time-evolving table來說,因為後面的結果(修正revision)會覆蓋掉先前的結果,故不影響正確性 以下為例: ![image](https://hackmd.io/_uploads/BkCROvplR.png) > `window-by` 運算子依照每個輸入 record 的 timestamp 分配到 5 秒週期的 window。 > aggregation 運算子的寬限期被設為 10 秒。 在一開始,一個 timestamp 12 的 record 被處裡,然後更新 state store 的值 window$[10,15)$ 為 1,然後輸出到 output stream (上圖 (a))。 然後 timestamp 16 的 record 到達, state store 更新 window$[15,20)$ 的值為 1,然後輸出到 output stream (上圖(b))。 之後,一個 out-of-order 的 timestamp 14 的 record 到達,並且因為時間還在寬限期內而被接受。這導致了 window$[10,15)$ 被更新為 2,所以需要發出值為 2 的糾正。 > 這個糾正是透過跟之前同個 topic partition 送出,所以也會被同一個 downstream task 收到、處理。(上圖\(c)) > 在 Streams DSL 中,用戶需要提供相對應的 accumulations 與 retractions 實作以在收到修正時進一步修正 processing state。 最後有 timestamp 23 的 record 被收到。除了輸出相對應的值到 output stream,Kafka Streams 也會垃圾回收 window$[10,15)$ ,因其已經超過寬限期(上圖(d))。 之後再有 out-of-order 的 timestamp 12 的 record 送達,但會被拋棄掉。 注意這邊的寬限期僅僅只是告訴 Kafka Stream 要維持多久的 state 來應付 out-of-order 資料,<font color=red>而非指示為了保證完整性要延遲多久才輸出</font>。 ## 6. LARGE-SCALE DEPLOYMENTS ### 6.1 Bloomberg Real-time Pricing Platform ### 6.2 Expedia Real-time Conversation Platform ## 7. RELATED WORK ## 8. CONCLUSION AND FUTURE WORK