# 7. Reliable Data Delivery ## 7.1 Reliability Guarantees 1. 在討論可靠性時,一般使用 Guarantee 這個詞,Guarantee 指的是: 確保系統在各種不同的環境下能夠發生一致的行為 2. Kafka 提供的基礎保障可以用來構建可靠的系統,卻無法保證完全可靠,需要在可靠性和吞吐之間做取捨: * Kafka 在 partition 上提供了消息的順序保證 * 生產的消息在寫入到所有的同步分區(partition on all its in-sync replicas)上後被認為是 commited (不需要刷到硬盤). producer 可以選擇在消息提交完成後接收 broker 的確認, 或是寫入 leader 之後, 或者所有的副本 * 只要有一個 replica 存在, 提交的消息就不會丟失 * consumer 只能讀取到已提交的消息 ## 7.2 Replication 1. kafka 複製機制和 partition 有多個 replicas 架構,是 kafka 可靠性保證的核心 2. 把消息寫入多個 replicas 可以使 kafka 在發生崩潰時仍能保證消息的持久性 3. partition:kafka 存儲數據的基本單位,存儲在單個 disk 上,一個 topic 的數據被分到多個 partition 進行存儲,kafka 可以保證 partition 內的數據是有序的 4. partition replicas:每個 partition 有多個 replicas;producer, consumer 只與 leader replica 交互;follower replica 只需要即時從 leader 複製最新事件,以與 leader 保持同步;當 leader 不可用時,其中一個 in-sync replica 選舉為新 leader;(注意: in-sync replica 才可以選舉為新首領) 5. follower replica 成為 in-sync replica 的條件: * 保持到zk的連接會話: 每隔 6s 向 zk 發送心跳,時間可配置 * 每隔 10s 向 leader 拉取消息,時間可配置 * 過去 10s 內從 leader 拉取最新的消息,保持不間斷的從 leader 獲取消息是不夠的, 必須保證幾乎沒有延遲 #### 非同步副本問題 1. 如果一個 replica 在 in-sync 與 out-of-sync 狀態間頻繁切換,說明集群內部出現了問題,通常是由於 jvm 不恰當 gc 導致的,需要優化系統性能 2. 一個滯後的 in-sync replica 會導致 consumer, producer 變慢 因為消息被認為 committed 前,client 會等待所有 in-sync replica 接收消息 - 當一個 replica 不再同步了,我們就不需要關心它是否接收到消息 - 更少的 in-sync replica,意味著更低的有效複製系統,發生宕機時丟失數據的風險更大 ## 7.3 Broker Configuration ### Replication Factor * replication.factor 為 topic 級別的副本數設置 * default.replication.factor 為 broker 級別的副本數設置,default = 3 * 通過這個配置來控制自動創建的 topic 的副本數: 為 N 的時候,也就是每個 partition 總共被 N 個不同的 broker 複製 N 次,可以容忍失去 N-1 個 replicas,保證 topic 的可讀寫 * replication.factor = N,則需要至少 N 個 broker,會佔用 N 倍的磁碟空間 * N 的選擇是『可用性』、『存儲硬件』之間的權衡 * 可以透過 broker.rake 來表示 broker 所在的機架,kafka 可保證將 partition 的 replicas 分佈在多個機架上,獲得更高可用性 ![image](https://hackmd.io/_uploads/rkNgfSkvp.png) 設定上的建議: * 需要可用性的場景,建議設定 N=3 * broker 重啟導致 topic 不可用若是可接受的,可以設定 N=1 ### Unclean Leader Election * unclean.leader.election.enable 為 broker 級別 * 0.11.0.0 之前的版本,默認為 true,之後的版本默認為 false * clean leader election: 選舉過程中,commited data 都有同時存在 in-sync replica,那麼這個選舉就是 clean 的! * 控制 out-of-sync replica 能否參與 leader 的選舉: - 設置為 true,當沒有 in-sync replica 可用的時候,out-of-sync replica 會成為 leader,意味著有數據丟失 - 設置為 false,則意味不會有 commited data 丟失,但在 leader 恢復之前,系統會處於不可用的狀態 - 需要在可用性和一致性之間做取捨 設定上的建議: * 數據質量與一致性要求高的,建議設定 false,ex: 銀行交易事務 * 可用性要求高的,建議設定 true,ex: 實時點擊流分析系統 圖解: ![image](https://hackmd.io/_uploads/ry16VSyDa.png) When unclean leader election is enabled the following scenario results in message loss: ![image](https://hackmd.io/_uploads/rkpRVr1Dp.png) ### Minimum In-Sync Replicas ``` 前情提要,為何需要此參數: 1. 默認情況下,一條消息被寫入所有 in-sync replica,才被認為是 committed,才可以繼續向 partition 寫入和消費下一條數據 2. 如果所有 in-sync replica 只剩下 1 個,並且它變成不可用了,此時數據就丟失!!! ``` * min.insync.replicas 設置可以作用於 broker 和 topic 級別 * min.insync.replicas=N: 至少要存在 N 個 in-sync replica,才能像 partition 寫入數據 * 假如 broker 數為 3,min.insync.replicas=2. 當 2 個 in-sync replica 中的 1 個出現問題,集群便不會再接受 producer 的發送消息請求. 客戶端會收到 NotEnoughReplicasException * NotEnoughReplicasException 時, consumer 還可以繼續讀取存在的數據,唯一的 in-sync replica 變成只提供讀 * 可以避免再發生 unclean leader election 時,數據的寫入與讀取有非預期不一制的狀況發生 圖解: ![image](https://hackmd.io/_uploads/B12o7rkP6.png) ### Keeping Replicas In Sync * replica 有兩個情況會變成 out of sync: 1. loses connectivity to ZooKeeper 2. fails to keep up with the leader and builds up a replication lag * zookeeper.session.timeout.ms - 定義: ZooKeeper 會話超時時間。如果 Broker 在此時間內未向 ZooKeeper 上報心跳,則被認為失效,並且被踢出 cluster - v2.5.0, zookeeper.session.timeout.ms 從 6s 增加到 18s, 為了在雲端增加 Kafka clusters 的穩定性,因為雲端 network latencies 可能會高一點 - 此值的設定需要一些權衡: 1. high enough to avoid random flapping caused by garbage collection or network conditions 2. low enough to make sure brokers that are actually frozen will be detected in a timely manner * replica.lag.time.max.ms - 定義:Kafka 依據 replica.lag.time.max.ms 參數來判定一個 Replica 是否仍屬於 ISR,如果 Follower 較長時間未與 Leader 保持同步,則 Leader 會考慮將這個 Follower 從 ISR 中踢走 - v2.5.0, 此值從 10s 增加至 30s,改善 resilience of the cluster 和避免 unnecessary flapping ### Persisting to Disk Kafka will flush messages to disk when rotating segments (by default 1 GB in size) and before restarts but will otherwise rely on Linux page cache to flush messages when it becomes full. 以下 2 個參數可以讓存入 disk 變得更彈性 * flush.message:最多可以有幾則 msg 沒存進去 disk,如果值=1,則每個訊息都要直接存進去 disk * flush.ms: 訊息存進去 disk 的頻率 ## 7.4 Using Producers in a Reliable System ``` 情況1 broker配置了3個副本,禁用不完全首領選舉;生產者發送消息設置acks=1; 生產者發送消息A給首領,首領成功寫入,並告訴生產者成功寫入,但跟隨者副本還沒有收到這個消息;這時首領崩潰了,而此時,消息還沒有被跟隨者副本覆制過去。 結果: 另外兩個副本仍然被認為是同步的(畢竟判斷一個副本不同步需要一段時間),而其中一個副本稱為新首領。 1.對於生產者來說,消息A成功寫入了 2.對於消費者來說,因為消息A沒有被複製到所有副本,即不會認為已提交,所以消費者是看不到消息A的,它認為數據是一致的,沒有丟失消息 小結:從生產者角度來講,實際上就是丟失了一條消息;即便kafka系統看起來數據是一致的; ``` ``` 情況2 broker配置了3個副本,禁用不完全首領選舉;生產者發送消息設置acks=all; 生產者往kafka發送消息,分區首領剛好崩潰了,新首領正在選舉中,kafka會向生產者返回首領不可用的響應;這個時候,若生產者沒有正確處理錯誤或沒有重試直到發送成功,則消息就有可能丟失; 結果: 1. 這不算broker可靠性問題,因為broker沒有收到這個消息; 2. 這也不是不一致性問題,因為消費者沒有讀到這個消息; 問題在於, 生產者沒有正確處理錯誤,弄丟消息的是它自己 ``` **如何避免上述兩種問題?** 1. 根據可靠性需求,配置恰當acks值 2. 在參數配置和代碼裡,正確處理錯誤 ### Send Acknowledgments acks 可選 0, 1 或者 all,設置影響吞吐和一致性 * acks=0 - 消息發送出去後就認為是成功寫入 topic - producer 只管發不管發送成功與否,不需要等待服務器的確認,這時 retries 設置無效 - unclean.leader.election.enable=true 情況下,正在新首領選舉時,producer 並不會知道原來 leader 已經不可用了!因此會丟失訊息 - 延遲低,容易丟失數據 * acks=1 - 發送後 leader 寫入成功(但是並沒有刷新到磁盤)並且向 producer 回應,就認為成功 - unclean.leader.election.enable=true 情況下,正在新首領選舉時,producer 會收到:LeaderNotAvailableException,要針對此錯誤來 retry! - 延遲中等,一旦 leader 掛了,且消息還沒同步到 in-sync replica,就會丟失數據 - 丟失訊息情境說明: producer 發送數據到 leader,leader 寫本地日誌成功,返回客戶端成功;此時 ISR 中的 replica 還沒有來得及拉取該消息,leader 就死了,那麼此次發送的消息就會丟失 ![image](https://hackmd.io/_uploads/Skn-hfdP6.png) * acks=all - 發送後等待所有 in-sync replicas 寫入後確認 - all 設定值等同於 -1 - 可以透過 min.insync.replicas,來改變『所有』 in-sync replicas 寫入後才算成功的這個條件! - 最保險,但是延遲會最多的做法,可通過異步與大批次來加快速度 - 不會丟失訊息情境說明: 數據發送到 leader, ISR 的 follower 全部完成數據同步後,leader 此時掛掉,那麼會選舉出新的 leader,數據不會丟失 ![image](https://hackmd.io/_uploads/SkgGazOv6.png) - 重複訊息情境說明: 數據發送到 leader 後 ,部分 ISR 的 replica 同步,leader 此時掛掉。follower1 和 follower2 都有可能變成新的 leader, producer 端會得到返回異常,producer 端會重新發送數據,數據可能會重複 ![image](https://hackmd.io/_uploads/HkqOafODa.png) (補充: 當然上圖中如果在 leader crash 的時候,follower2 還沒有同步到任何數據,且 follower2 被選舉爲新的leader的話,這樣消息就不會重複) ### Configuring Producer Retries #### producer 需要處理的錯誤 - 自動處理: 可通過重試來解決.若 broker 返回的錯誤,可以通過重試來解決,則生產者自動處理這些錯誤 - 手動處理: 難以透過重試來解決 #### retry 1. 可重試 vs 不可重試 - 重試之後可以解決的:如首領不可用錯誤(LEADER_NOT_AVAILABLE),重試幾次,首領選舉完成,消息成功寫入 - 重試之後無法解決的:如配置錯誤(INVALID-CONFIG),消息大小超過閾值 2. retry 次數設定: - 次數設高:想抓住異常多重試幾次 - 次數設低:延遲是不可接受的 - 次數設 0:想把消息保存到某個地方(某 topic)之後再來重試 - 無限重試:MirrorMaker **注意:** * 重試發送一個已經失敗的消息會帶來風險,因為如果兩個消息都寫入成功,則消息會重複,這需要消費者在處理消息時保證冪等性 * retries 和錯誤處理,就是 Kafka 消息的 at least once 保證 * 在0.11.0.0之後, 提供了冪等的特性,保證消息的 exactly one: enable.idempotence,可以把重複訊息 skip #### 冪等性 - 服務器對先後多次相同客戶端請求的回應是相同的,如轉賬 ### Additional Error Handling 1. 不可重試的 broker 錯誤,如消息大小超長錯誤,認證錯誤 2. 消息發送前的錯誤,如序列化錯誤 3. producer 達到 retry 次數上限時或消息占用內存達到上限時的錯誤 ## 7.5 Using Consumers in a Reliable System ### Important Consumer Configuration Properties for Reliable Processing - group.id - 兩個有相同 group.id 並且訂閱同一個 topic 的 consumer, 會分配到 topic 下不同的 partition,如果希望 1 個 consumer 要看到全部 message,就要設定不同的 group.id - auto.offset.reset - 這個參數控制當 broker 端沒有發現任何提交的 offset 的時候, consumer 應該從什麽位置開始讀取消息 - 有 earliest 和 latest 兩種設置,earliest 意思是會從 offset = 0 開始讀取, 而 latest 意思是從最末尾開始 - enable.auto.commit - 按照時間計劃 commit offset 或者代碼中手動 commit,對 consumer 來說這是一個重大的決定 - 自動 commit 會保證只提交循環中已經處理的數據, 但是有可能會在下次提交始前系統崩潰,導致已經被處理的消息的 offset 沒有提交到 broker,下次拉取的時候(consumer 重新上線或者 rebalance 時候由其他 consumer 處理該 partition)會重新拉取已經處理過的消息, 重覆消費 - 如果將拉取的消息交由其他的線程處理, 那自動提交可能會消息還沒處理完畢,就已經 commit offset - auto.commit.interval.ms - 設置自動 commit offset 的頻率;默認值是每 5 秒自動提交一次 - 當 enable.auto.commit 設置為 true 的時候, 通過這個配置控制自動提交的時間間隔 - 越大吞吐就越大, 一致性就越低, 越小則會增加提交的次數, 影響吞吐, 但是會提高一致性 ### Explicitly Committing Offsets in Consumers 1. 總是提交已經處理過得消息 假如你是在 poll 循環中處理所有的消息, 並且不需要維護跨多次 poll 的狀態, 會比較容易實現. 可以使用自動提交, 或者在 poll 結束時進行偏移量提交. 2. 提交頻率是性能和系統崩潰時重覆的消息數量間的取舍 一次 poll 循環中可以進行多次 commit offset, 甚至每處理一條提交一次. 或者幾個 poll 提交一次. 提交會有性能上的開銷, 類似生產者的acks=all 3. 保證你清楚的了解將要提交什麽偏移量 常見的一個陷阱就是一次 poll 循環中的 offset 提交了讀到的最大 ofset, 而不是已經處理過得最大 offset,會導致消息丟失 4. 再平衡 設計應用程式前要注意: consumer 在 partition 被撤銷之前要 commit offset,並在分配到新 partition 時,清理之前的狀態 5. 消費者可能需要重試 - 情況1:在遇到可重試錯誤時,提交最後一個處理成功的 offset,然後把沒有處理好的消息保存到緩存里(下一個 poll 就不會把它覆蓋掉),調用 consumer 的 pause() 來確保其他 poll 不會返回數據;如果重試成功或重試次數達到上限,把錯誤消息丟棄,調用 resume() 讓 consumer 從 poll 重新獲取新數據 - 情況2:在遇到可重試錯誤時,把錯誤消息寫入另外的 topic-B(解耦);由 topic-B 的 consumer 來處理錯誤;類似於MQ的死信隊列; 6. 消費者可能需要維護狀態 - 某些場景下, 需要在多個輪詢間存在聚合運算,例如:想計算 message 的移動平均數,有一個方法是,在 commit offset 時,順便把剛算好的平均數寫到一個 result-topic - 但 kafka 並不支援 transaction,萬一平均數寫入 result-topic,此時卻來不及 commit offset 就崩潰了,那就亂掉了!!! - 建議嘗試使用 kafkaStreams 類庫,為聚合,連接,時間窗和其他覆雜分析提供了高級的dsl api ## 7.6 Validating System Reliability ### Validating Configuration - Kafka 在 org.apacha.kafka.tools 下有兩個 class 作為驗證工具,可以通過命令行運行,也可以在各種測試框架中使用: - VerifiableProducer - VerifiableConsumer - VerifiableProducer - 可以按照我們指定的參數來發送一定數量的消息,消息內容為從 1 開始遞增的數字,參數包括 acks,重試次數和發送速率等等,運行時會打印每條消息發送成功或失敗 ![image](https://hackmd.io/_uploads/BJ_r8WmOp.png) - VerifiableConsumer - 消費 VerifiableProducer 生產的消息,按照消費順序打印消息內容,並且打印提交 offset 和分區重分配的消息 ![image](https://hackmd.io/_uploads/H1L8UWXuT.png) - 建議可以設定一些故障的場景: * leader 選舉: 關掉 leader 所在的 broker,producer 和 consumer 需要多長時間恢復? * controller 選舉: 重啟 controller,整個系統需要多長時間恢覆? * 滾動重啟: 一台一台的重啟 broker,能否做到一條消息都不丟失? * unclean leader 選舉: 當發生了unclean leader 選舉時,producer 和 consumer 會發生什麽,能否接受後果? ### Validating Applications - producer 和 consumer 替換成了自己開發的應用代碼,保持 Kafka 的配置不變,啟動應用中的 producer 和 consumer,在構建的場景中測試一些情境: * producer 和 consumer 與 Kafka 集群斷開網絡 * 發生了 leader 選舉 * broker 進行滾動重啟 * consumer 進行滾動重啟 * producer 進行滾動重啟 ### Monitoring Reliability in Production 這一步非常重要,因為萬一前兩步有所疏漏,或者來不及做,監控可以確保及時發現問題,避免損失。 監控的內容可以包括:JMX、日志以及其它更覆雜的自定義的指標。 #### JMX 監控 - Kafka 自帶了 JMX 監控,對於 broker, producer 和 consumer,分別有不同的指標可以關注。 - 對於 broker,值得監控的指標很多,比如達不到 ISR 最小副本數的 partition 個數,正在同步的分區副本數,下線分區數,controller 數量,失敗的生產請求數,leader 選舉次數和時間等等 - 對於 producer,兩個和可靠性相關的指標是每條消息的平均 error-rate 和平均 retry-rate,這兩個指標如果上升了,表明系統肯定是出了問題,producer 發送訊息的 error 是 warn 等級 - 對於 consumer,最重要的指標是 consumer-lag,它表明了這個 consumer 當前消費到的位置落後於這個 topic 的各個 partition 最新消息有多遠,理想情況是在 0 和一個很小的值之間波動,因為 consumer 有可能一次 poll 很多訊息,需要消化一下!如果 consumer-lag 增大到一定的閾值,則需要進行處理 - burrow 是一個可以監控 consumer-lag 的工具! 考慮一下吧~ #### Confluent Control Center - 是一個對整個產品進行管理的控制中心,最主要的功能對Kafka cluster 的各個 producer 和 consumer 進行性能監控(ex: 比較producer 和 consumer 個別處理的消息數量),同時可以很容易地管理 Kafka 的連接、創建、編輯和管理與其它系統的連接 - Confluent Control Center 只在 Confluent Kafka 企業版提供支持 ![image](https://hackmd.io/_uploads/BJtRGfm_6.png) ## ref [筆記1](https://atbug.com/kafka-reliable-data-delivery/) [筆記2](https://blog.csdn.net/PacosonSWJTU/article/details/121757987?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170295638116800180613753%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=170295638116800180613753&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-12-121757987-null-null.142^v96^pc_search_result_base1&utm_term=Kafka%E7%9A%84%E6%B6%88%E6%81%AF%E5%8F%AF%E9%9D%A0%E4%BC%A0%E9%80%92&spm=1018.2226.3001.4187)