# 4-2: Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka ## INTRODUCTION * 系統處理資料的方式總共分成以下三種: * 服務(可以想像成real time system) * 每收到一個請求就馬上處理該請求並返回結果。 * 例子: 網站 * 批處理 (batch processing) * 收到某一群輸入資料後,才統一處理該群請求。通常需要花費相對較長的時間去處理資料,但在處理時可以視為不會再有新的資料加入進來。 * 例子: 分析資料 * 流處理 (stream processing) * 介於上述兩個處理方式之間,request進來之後不一定會被馬上處理,但會在一段時間之後被處理,且在處理的過程中允許加入新的資料。 * 例子: stdin, stdout, tcp socket * 這篇主要在探討利用 apache kafka 當成底層系統,在流處理上要如何保證其的 consistency 與 completeness * consistency * 與之前探討的 consistency 不太一樣,他代表的更像是 fault tolerance,也就是在發生錯誤之後,做完 error recovery 後資料完全不受到影響。 * 下面那張圖的 (a)、(b)、\(c) 就代表當node發生錯誤時,有可能會導致state出現重複。 * 這同時也是 kafka 原論文的缺點,就是kafka系統實際上也可能會處理同一個資料兩次以上,那篇論文只保證資料會被 process at least once,但不是 exactly once。 * completeness * 代表當資料以不正確的順序到達時,系統仍然可以保證其資料處理的正確性。 * 下面那張圖的 (a)、(b)、\(d) 就代表當資料不是以正確的order到達時,就有可能會導致處理出來的資料不正確。 * 除了要保證以上兩點外,作者希望在 performance(throughput or latency) 上不要下降太多。  ## STREAMING CORRECTNESS CHALLENGES ### Fault Tolerance and Consistency * 一致性的保證: * 僅一次 (Exactly once): * 定義: > for each record from the input data stream, its processing results will be reflected exactly once even under failures. Results are reflected in two ways: result records in the output data streams, but also internal state updates in stateful stream processing operators. * 簡單來說就是每筆輸入都只產生對應的一筆輸出,且系統內部的狀態不受到影響。 * 可能遇到的問題: * 儲存系統損壞: * 系統將無法保證其內部的狀態正確。 * 系統需要原本的輸入重新計算輸出。 * 如果輸入包含 nondeterministic 的 operation 會導致故障前後的結果不一致。 * processor 故障: * 有可能會重複處理同樣的資料。 * 可能會發生當一個 processor 暫時斷網,但其實沒有故障,可是系統已經認為其故障並重新生成一個新的 processor,這時候原本的 processor 就變成 zombie instance,且系統會有 duplicate processor 的問題。 * request 可能會故障: * 分散式系統的master通過 ack 來確定 worker 是否有收到訊息並處理資料,但沒收到 ack 有分許多種情況,有可能是 request 完全沒送到 worker、worker 故障、worker 送回 ack 的過程中出錯,這些在 master 看來都是 ack 沒有送到。 * 因此如果 master 只是在沒有收到 ack 時 resend request,有可能會導致 worker 收到duplicate 的 request 進而導致系統內部的狀態錯誤。 * 目前的處理方式: * 仰賴在 stable file system 做 checkpoint。 * 但有可能發生 output commit problem。也就是系統在故障之前已經發送出處理結果,但 checkpoint 沒有記錄到這件事,導致同一筆輸入產生 duplicate 的輸出。 ### Out-of-order Handling and Completeness * Stream processing 必須保證處理順序必須依照資料的 timestamp,否則可能會產生語意上的錯誤。 * 資料 Out-of-order 的原因: * clock skewness * network delay * read multiple input * multiple processor * 目前解決方式: * micro-batching techniques * 將 stream 拆成一個個的 batch,每個 processor 會在確認該 batch 所包含的所有資料都已經到達才開始處理資料。 ## STREAM PROCESSING IN APACHE KAFKA ### Partitions and Timestamped Records 回顧 [kafka](https://hackmd.io/9IO9TrebTAy_lvwF_V_5Cw)。 ### Streams DSL and Operator Topology * 介紹 kafka stream library 與其中的 DSL (Domain Specific Language),可以把 DSL 想像成是SQL 那樣的語言,只是 SQL 是與資料庫互動的介面,而 DSL 是定義 stream processing 具體要怎麼做的介面。 * 以下是使用 DSL 定義 stream 的範例:  kafka stream library會自動將裡面的操作轉換成拓樸圖,拓樸中包含多個子拓樸,library會將可以一起做的操作聚合在同一個子拓樸中。以下是該段DSL轉換成的拓樸圖。  被分在同一子拓樸的意思是不需要使用網路傳輸資料,我的理解是都在同一台 worker 上處理,當資料流從一個子拓樸到另一個子拓樸時,需要做repartition,也就是對子拓樸產生出的資料分配到不同的partition上,而接下來的子拓樸會將repartition後的資料當成新的input,然後做該子拓樸包含的操作。 * 當下游的子拓樸處理完資料後,它會通知kafka去repartition刪除已被處理完的資料,以保證空間可以被其他的操作所利用。 ### Data-Parallelism and Tasks * 在 kafka stream 中,一個 task 是最小的並行工作單位。 * 每個子拓樸裡可以被分配多個任務,任務的數量與 parition 的數量相等。可以藉由設定repartition 的數量來決定每一個子拓樸 task 的數量。 * Kafka stream 會作為producer的角色將輸出紀錄發到 kafka broker,如果在指定時間沒有收到ack,將會重新發送一模一樣的訊息給 broker,這會導致同一個 message 有可能被 produce 不只一次。 * > 此外,在正常處理期間,Kafka Streams會在將發出的記錄刷新到目標主題以及對狀態存儲的更新之後,定期提交其當前處理位置到源主題上。但如果在這些操作之間發生故障,那麼在故障切換後,可以在從更改日誌中恢復處理狀態後重新獲取相同的源主題記錄。在這種情況下,我們將重新處理這些記錄,而處理結果已經被持久化到目標主題和狀態存儲中,導致至少一次的流處理場景。 ## EXACTLY-ONCE IN KAFKA STREAM * 為了克服前面說明 kafka 是 at least once 而不是 exactly once 的問題,這個章節提出了解決方法。 * > Kafka架構背後的一個關鍵設計原則是在broker上持久保存流式記錄,使其具有耐用性和不可變性,這種設計選擇帶來了幾個好處。 * 被寫入 partition 裡的每個紀錄都將透過複製讓其持久化的存儲在 $n$ 個不同的 broker 上,對於某個 partition,有一個 broker 會被選為 master,負責接收 producer 產生的 record並將其複製到其他 broker 上,當 master broker crash 時,它承載的每個主導副本都將從該分區的從屬副本中選出一個新的 master,這個設計可以容忍 $n-1$ 個 broker 故障。 :::info 關於可能出現的中間狀態,Kafka的數據複製協議保證一旦記錄成功追加到主導副本,它將被複製到所有可用的副本。 ::: * 這個設計保證當下游任務在處理時暫時變慢時,上游任務不會出現任何回壓問題,因為紀錄被暫時buffer 在 broker 上。 * > 降低了恢復故障流處理實例的複雜性,並使任務故障切換和恢復相對輕量級 * kafka 裡會有一個 topic 是儲存 record 的處理記錄的,也就代表實際的資料存儲相對沒那麼重要,因為當前的狀態可以透過系統之前某一個 moment 的 snapshot 加上 reexecute change log topic 裡的資訊來回復當前的狀態。 * > 因此,我們可以將容錯狀態管理和輸出提交問題縮小到關於哪些輸入記錄被視為已處理以及在各種故障情況下哪些輸出記錄可見的合約。在Kafka中,此合約被描述為冪等和事務性的記錄追加。 ### Idempotent Writes Per Partition * idempotent operation: 代表一個操作被執行多次跟只執行一次是等價的,如果 producer 做的操作是idempotent operation,那就算重複發送了某一筆 record,在 broker 上存儲的 log 還是只會有一筆。 * 實作 idempotent producer 的方式類似於 TCP,producer 會對每個 record 維護一個單調遞增的 id,在 producer 將 record 傳送給 broker時,附上該筆 record 的 id 與 producer 的 id,broker 就可以利用這些資訊達到去除重複 record 的目的。 * 與 TCP 不同的是,record id 會與 record 本身一起被持久保存到複製的日誌中,該資訊可以讓 broker 做 error recovery 時更容易。 ### Transactional Writes Across Partitions > Exactly-Once 語意具體來看,要做到以下幾項操作原子完成: 1. 輸出結果記錄到sink topic,並且對於下游conumser 可見。 2. 計算依賴的state store 更新。 3. 輸入資料topic 的消費位置(offset)移動並完成commit。 > 怎麼將上述寫多個Kafka topic 變成一次事務? 傳統的二階段提交協議,需要寫兩次,一次log,另一次數據。 Kafka 實作的交易協定只寫一次log,利用追加的offset 遞增特性,可以將需要終止處理的資料對下游消費者不可見(未commit 就是隱藏)。 如圖4 完整示範了一次事務操作: 1. 4.b 步驟是註冊producer 到transaction Coordinator。 transaction coordinator 會對指定transaction-id 註冊上來的producer 做epoch 遞增。考慮到使用相同transaction-id 的殭屍實例情況,較老epoch 的producer 寫入資料會忽略。註冊在得到transaction coordinator 回覆後,producer 完成了交易的初始化操作。 2. 緊接著,兩階段事務提交,一個producer 同時只能有一個進行中的事務。 producer 在發送資料之前,先在transaction Coordinator 上註冊pattition(步驟4.c),完成註冊後,producer 可以準備發送資料到patition。 3. 提交事務 * 首先prodcuer flush 所有寫入請求,等待kafka broker 的回覆收到。 * producer 發送另一個請求到transaction coordinator 來初始化提交操作,透過二階段提交: * 第一階段:coordinator 更新事務狀態為PrepareCommit,記錄狀態到transaction-log topic。這個更新可以認為是事務的同步屏障(barrier),狀態更新寫入transaction log後,事務就不會停止了(failover 時PreapreCommit 狀態會做狀態恢復並繼續事務)。 * 第二階段:非同步地將提交交易的標記寫入事務所註冊的多個partition。步驟4.f 在所有交易標記都被partition leader ack 以後,coordinator 更新交易狀態為CompleteCommit,之後允許producer 開啟下一個新的交易。 > 如何終止一個進行中的事務? 1. 首先將事務狀態更新為PrepareAbort,事務終止的標記會被寫入事務註冊的多個partition,表示透過transaction-id 寫入的在該標記先前的資料都被終止且對下游消費者不可見。 2. 當所有事務標記partition leader ack 後,事務終止並更新事務狀態為CompleteAbort。 與寫相配合的是讀的部分: * 下游消費者被設定只讀取上游commited 資料。 * task 的commited offset 只在事務提交以後才會整整更新到到來源topic 上。  ### Performance Implications > 事務的代價與partition 數相關度高,因為事務標記被寫到各個partition,隨著partition 數目增加,事務的成本也變高。 實現上影響事務提交吞吐、延遲的主要因素是提交間隔(commit interval)。增加提交間隔,可以提升吞吐,代價是增加一些延遲。論文提到一些最佳化,例如當多個task 被分配到一個實例上,多次patitinon 操作可以在一次註冊操作重完成,相關寫入處理也可以合併為一個進行中的事務。 圖5 是論文給出的效能比較:Exactly-Once 語意比At-Least-Once 語意吞吐能力上下降10%-20%。  ## KAFKA STREAMS REVISION PROCESSING > 首先,Kafka 在儲存上有stream/table 的二象性。stream 和table 二象性透過互轉體現: * table as streamtable 可以看作是某個時刻的snapshot,記錄stream 中每個key 在目前的最新值。遍歷table 中的每個key-value 就可以得到一個stream。 * stream as tablestream 是追加寫入的changelog,對stream 從頭到尾重播可以建構出一個table。另外,對stream 做聚合操作也會得到一個table。 在亂序資料場景下,對operator 類型有一個區分: * 無狀態operator:例如filter/mapValue,不需要引入重排(造成業務延遲的原因),在接收、處理新資料記錄時可以直接發出(emit)處理結果。 * 有狀態operator:例如join/aggergation,依賴前一批接收到的輸入資料來確定目前的處理結果,是順序敏感的。 在Kafka Streams 中,可以對時序敏感的運算子設定參數(算子級grace period,不是系統級的全域watermark 參數),遲到的資料如果延遲在參數閾值以內,還是會被接收、處理。同時Kafka Streams 檢查每個運算子的輸出類型,決定是否可以發出(emit)推測性(speculatively)的結果: * 下游是append-only 類型的stream,系統就需要累積資料並直到資料完整後再發出。 * 下游是table 類型,系統可以即時發出推測性結果,即使在後續收到遲到資料(但滿足grace period 要求),下游算子可以把上游算子輸出的changelog stream 作為輸入,基於該輸入實現修正處理狀態(撤回舊的處理結果,並重新累積新的計算結果)。  > 在DSL 機制中,使用者需要為累積(accumulations)和撤回(retractions)提供對應的實現,知道怎樣修正結果,例如join 派產生新的table,那麼修正會從當前子拓撲往後傳播。 圖6 展示了Kafka Streams 怎樣處理遲到資料。視窗聚合週期5 秒,grace peroid 10 秒: 1. 亂序事件14(時間戳記)在6.b 到達,因此6.c 中舊視窗[10,15) 結果被更新,修改的結果w10:2 以changelog 形式被發出。 3. 6.d 在grace period 間隔達到後,[10,15) 視窗結果被回收(舊state GC),對應該時間戳視窗內的事件也不再被處理。 效能上,如果每次修正都直接穿透到下游,會產生高網路、cpu消耗(對應大量的累積、撤回)。 Kafka Streams 的因應是,DSL 上設計了壓制(suppress),可以緩衝一批修正操作再發出到下游,尤其是對於相同key 的緩衝,多筆記錄會被壓縮到1 個記錄。 ## LARGE-SCALE DEPLOYMENTS ## RELATED WORK ## CONCLUSION AND FUTURE WORK > 在生產應用時,可以配置交易提交間隔、算子層級的延時容忍參數,適配不同業務對於效能、正確性的取捨要求。論文舉了兩個生產部署的例子: * Bloomberg Real-time Pricing Platform:不同業務複雜度有差別,效能分佈在10K-25K EPS左右,Exactly-Once 效能損失在6%-10% 左右。 * Expedia Real-time Conversation Platform:簡單場景下commit interval 100ms。對於複雜的對話-視圖聚合服務,commit interval 配置為1500ms,並且開啟壓制以降低磁碟、網路IO 開銷。 > 論文展望部分主要特性提及: 1. 降端到端的延遲,應對故障場景,使用cascading rollback 演算法處理未提交的輸入資料。 2. 跨多個Kafka Streams 應用程式支援一致的狀態查詢服務。 3. 基於對齊的交易邊界支援即時控制策略,包括:auto scaling、應用程式更新、設定更新。 ## Reference * [論文原文](https://dl.acm.org/doi/10.1145/3448016.3457556) * [導讀](https://developer.aliyun.com/article/790839)
×
Sign in
Email
Password
Forgot password
or
By clicking below, you agree to our
terms of service
.
Sign in via Facebook
Sign in via Twitter
Sign in via GitHub
Sign in via Dropbox
Sign in with Wallet
Wallet (
)
Connect another wallet
New to HackMD?
Sign up