# 4-1. Kafka: a Distributed Messaging System for Log Processing :::info **Kafka:** 用於收集和傳遞大批量的日誌數據,並且具有低延遲的分散式訊息傳遞系統。 這篇論文是LinkedIn與2011年發表的關於Kafka的論文,從中可以了解Kafka最源頭的設計理念。 ::: ## 1. Introduction 任何一家大型網路公司都會產生大量的"log"資料,資料內容包含: 1. user activity events (登入、頁面瀏覽、點擊等) 2. operational metrics (呼叫延遲、錯誤,系統指標,如CPU、記憶體、網路或磁碟利用率等) 然而最近網路應用的趨勢使得使用者在網站上的行為和互動數據,成為了生產數據管線的一部分,並且直接應用於網站的各種功能,如個性化推薦、使用者行為分析等,而這些資料量比真實資料大好幾個數量級,許多**早期處理這類資料的系統都是依靠從生產伺服器上實際收集日誌檔案進行分析**(也就是使用外部存儲裝置從服務器上直接複製數據,或是透過軟體工具自動化地完成數據提取)。 **近年來也有一些專門的分散式日誌聚合器**,這些系統主要是為了收集日誌數據,並將日誌資料載入到資料倉儲或Hadoop 中**進行離線處理**。但LinkedIn發現除了傳統的離線分析之外,還需要以極小的延遲來支援上述大部分即時應用,因此他們開發了**Kafka**,一種新型的日誌處理的訊息傳遞系統。具有**分散式、可擴展性、高吞吐量,並提供了類似於訊息傳遞系統的API,允許應用程式即時消耗日誌事件**,可以利用一個單一軟體來在線上和離線處理各種類型的日誌資料。 ## 2. Related Work 傳統企業的message system常常無法很好的處理log,主要原因如下: >**message system:** 允許software application的不同部分或不同應用城市之間交換數據,**支持異步通訊**,發送者和接收者不需要直接連接或知道對方的實現細節就可以通信。 1. 在收集日誌數據的情況往往**過度專注於提供很好的交付保證**,導致系統過載。 2. 許多系統在其設計的主要限制因素上,並**不是以吞吐量作為首要目標**,使用TCP作為訊息投遞的通訊方式(TCP不適用於高吞吐量為目標的通訊方式)。 3. **在分散式支援方面比較弱**,沒有簡單的方法可以在多台機器上對訊息進行分區和儲存。 4. 訊息傳遞是在訊息即時處理的假設上,而**缺少對於大量訊息持久化儲存的能力**,所以如果出現訊息累積時,像是做周期性大量加載而非持續處理的離線數據處理系統(如數據倉儲應用)時,性能會顯著下降。 在近幾年海量日誌處理的場景下產生一些**日誌聚合工具**,大多是為**離線消耗日誌資料**而建構,大多採用push模式。 >push模式: Broker將資料轉發給數據處理系統。 * Scribe: Facebook用於收集日誌資料的中間件,它基於Socket來收集日誌數據,並透過日誌聚合定期寫入HDFS,是用來解決日誌寫入HDFS效能不足的一種手段 >**HDFS(Hadoop Distributed File System)** : Hadoop 生態系統的核心儲存組件,為存儲大量數據而設計的分佈式檔案系統。 >**Hadhoop:** 一個開源軟件框架,主要基於 HDFS(Hadoop 分佈式檔案系統)來存儲數據,通常用於非結構化或半結構化數據的大規模處理,適合不需要實時響應的系統。 * Flume: Cloudera開發的日誌聚合中間件,支援pipes與sinks的靈活擴展以及分布式的支援。 * HedWig: Yahoo開發的一個分散式的發布訂閱系統,主要用於記錄訊息的消費記錄,有高度的可擴展性和可用性,並提供了強大的持久性保證,但主要是用於儲存資料庫(data store)的提交日誌。 而Linkedin發現**pull模式**更適合我們的應用,因為**每個數據處理系統都能以自己能承受的最大速率檢索到消息**,避免被推播的訊息淹沒在比自己能承受的速度更快的訊息中,也可以讓數據處理系統更容易回溯到之前的狀態。 ## 3. Kafka Architecture and Design Principles Kafka引入了Broker用於訊息的儲存,為了實現大量資料的存儲,Kafka叢集一般包含多個Broker;為了實現負載平衡,每一個Topic會被分成多個Partition,這些Partiton會被打散到多台Broker儲存,每一個Broker會儲存一個或多個Partion,多個生產者和消費者可以同時發布和消費訊息,架構如下圖*Figure 1*。 ![image](https://hackmd.io/_uploads/BkNQPE4eC.png) >* **topic:** A stream of messages of a particular type(一個特定類型的訊息流) >* **producer:** 可以向一個topic發布訊息,發布的訊息被儲存在一組稱為Broker的伺服器上,為了提高效率,生產者可以在一次發布請求中發送一組訊息。 >* **consumer:** 可以從Broker訂閱一個或多個主題,並透過從Broker pull data來consume訂閱的訊息。 :::info consumer要訂閱一組主題,首先要為該主題建立一個或多個訊息流(理解為分區),發佈到該主題的訊息將被平均分配到這些子訊息流(分區),每個訊息流在持續產生的訊息流上提供了一個迭代器介面,消費者對訊息流中的每個訊息進行迭代,並處理訊息的內容。 >**與傳統迭代差別:** >訊息流迭代器永遠不會終止,如果目前沒有更多的消息要消費,迭代器就會阻塞,直到新的訊息發佈到主題上。 ::: ### 3.1 Efficiency on a Single Partition * **Simple storage:** 一個主題的每個Partition對應一個logical log,這個log在是由一系列segment file實現的。每個segment file大約有相同的大小,每當生產者向分區發布訊息時,Broker只需將訊息附加到最後一個segment file中。 >為了實現高效能,Kafka先將訊息存放在內存中的頁面快取(page cache)裡,再flush到磁碟(consumer才能看到),flush策略: >1. 記憶體快取到**指定數量**的消息後自動flush磁碟。 >2. **固定時間**閾值自動flush磁碟。 > >**會引發的問題 :** Broker突然崩潰將導致數據的一致性問題,所以在最新版本的Kafka透過ISR機制最大程度的減少這種問題的發生。 >**和典型的message system不同的地方:** >1. message id 的處理方式: Kafka中每個訊息都是**透過其在日誌中的logic offset來定址**,而不是用特地id來標示,避免大量查找操作,減少系統開銷。 >2. id 特性: **message id是遞增但非連續的**,為了計算下一條message的ID,系統必須將當前message的長度加到它的ID上,也就是說message ID是根據message的大小**動態計算**的,而不是簡單地按照順序分配。 由於每個message都通過其在日誌中的位置(偏移量)來定位,而這個位置實質上反映了message的順序和相對大小,這使得message ID和偏移量之間存在著直接的關聯,因此在討論或處理Kafka message時,提到message ID或偏移量本質上是在指同一件事,都是指message在日誌中的位置。 Kafka維護了一個offset的in-memory index,記錄了每个Segment内第一個message的offset,如*Figure 2*。 ![image](https://hackmd.io/_uploads/S1QVv44lC.png) * **Efficient transfer:** 1. 批量發送與pull message >減少請求次數。 3. 在Kafka 應用程序處理消息的邏輯層面中(Kafka layer)避免在記憶體cache messsage,而是依賴底層檔案系統的page cache。 >避免了雙重緩衝,訊息只會緩存在頁面快取中,除此之外,即使在broker process重啟的時候,也能保留熱緩存(warm cache)。 4. 利用sendfile API來有效的將log segment file中的bytes從broker傳遞給consumer >sendfile API可以直接將bytes從檔案通道傳輸到socket通道。 >相比於典型的將bytes從本機檔案傳送到遠端socket,可以減少2次copy和1次system call。 * **Stateless broker:** 每個consumer已consume的資訊不是由broker維護,而是由consumer本身維護。 * **優點:** 減少複雜性和Broker的開銷。 * **缺點:** Broker不知道是否所有的使用者都consume了這個訊息,不知道怎麼刪除。 * **解決方法:** 使用簡單的基於時間的SLA保留策略,如果一則訊息在broker中保留的時間超過一定的時間(通常是7天),則會自動刪除。 >由於Kafka 的效能不會隨著資料量的增加而降低,所以這種長時間保留的方案是可行的。 **附帶的好處:** (pull model更容易實現)consumer可以故意回到一個舊的offset,重新consume數據,也就是說當consumer應用程序邏輯中存在錯誤時,應用程序可以在錯誤修復後重新播放某些消息,在consumer崩潰時,consumer可以記錄未刷新消息的最小偏移量,並在重啟時從該偏移量重新consume。 :::info **為什麼Kafka 的效能不會隨著資料量的增加而降低?** 1. Kafka 在物理存儲上採用**順序寫入**的方式,效率遠高於隨機寫入,所以即便資料量增加,寫入操作的效能也不會顯著下降。 2. 利用操作系統的**Zero-copy**技術將消息從disk直接發送到網路,避免了將資料在user space和kernel space之間來回copy的開銷,減少了CPU的使用和系統的上下文切換。 3. **將topic分區,允許資料被平行處理**,即使資料量增加,只要合理地增加分區數量和服務器,就能保持系統的高效能。 4. **依賴底層檔案系統的頁面緩存來緩存資料**,不是自己維護一個獨立的緩存層,這利用了現代操作系統對於檔案系統緩存的優化,使得即使資料集增大,資料的讀取效能也不會受到太大影響 ::: ### 3.2 Distributed Coordination * Producer在分散式環境中的執行方式: 發布訊息時,依照隨機選擇的分區或通過分區鍵和分區函數選擇分區,將訊息投遞到Broker。 * Consumer在分散式環境中的執行方式: >目的: **在不引入過多的協調開銷的情況下,將儲存在Broker 中的訊息平均分配給群組中的所有consumer。** 1. 在消費訊息時會有一個Consumer Group的概念,共同consume一組被訂閱的主題。 2. 不同的Consumer Group各自獨立consume全套訂閱的訊息,不需要跨Consumer Group的協調機制。 3. 使主題內的一個partition成為平行處理的最小單位,一個topic的其中**一個partition只會分配給一個Consumer Group中的一個consumer**(一個consumer可能會被分配到多個partition)。 >可以平行處理整個topic的消息,隨著Consumer Group中的consumer的數量增加,可以通過增加分區的數量來平行增加處理能力,從而達到橫向擴展。 >**避免了同一分區內的消息處理需要在多個consumer之間進行協調和同步的情況** 4. 不設立中心化的主節點,讓consumer以去中心化的方式自行協調。 >Consumer Group內各Consumer分配消費任務的過程稱為**rebalance**,為了方便協調,採用了一個高度可用的共識服務**Zookeeper**。 >Kafka 使用Zookeeper 完成以下任務: >1. 檢測Broker 和消費者的新增和刪除,當事件發生時在每個消費者中觸發一個再平衡過程。 >2. 維護消費關係,並追蹤每個分區的消費偏移 >一些註冊表存的東西: >1. Broker 註冊表: 包含Broker的主機名稱和port,以及儲存在其上的topic和partition。 >2. consumer註冊表: 包括consumer所屬的consumer group,以及它所訂閱的topic集合。 >3. 所有權註冊表: 對每個訂閱的分區都有一個路徑,路徑值是目前從這個分區consume的consumer id。 >4. 偏移註冊表: 為每個訂閱的partition儲存了該分區中最後一個被consume的訊息的偏移量。 > >每個Consumer Group都與Zookeeper中的一個所有權註冊表和一個偏移註冊表相關聯。 :::info **rebalance過程**: 1. Consumer監聽Zookeeper是否有Consumer/Broker的新增或刪除,若有觸發rebalance。 2. 從所有權註冊表中移除由該consumer擁有的partition。 3. Consumer從Zookeeper取得待consume的partition 註冊表和待分配任務的consumer 註冊表,分別對他們進行排序。 4. 假設partition 註冊表為N份,consumer 註冊表有M個,將N份照順序均分給M個consumer。如將1,2,3,4,5,6分成2份(即1,2,3和4,5,6)。 5. 當前consumer由上一步計算得知它分配到的partition列表,在所有權註冊表中寫入自己作為該分區的新所有者。 6. consumer啟動pull資料執行緒開始拉partition的資料consume,偏移量從儲存在偏移註冊表中的記錄值開始。 7. 如果partition是否被其他consumer佔據了消費權,若是則釋放其目前擁有的所有分區,等待一段時間後嘗試rebalance,這種情況一般是Zookeeper消息延遲導致的。 ::: ### 3.3 Delivery Guarantees Kafka 只保證**at-least-once delivery**,**不使用2 phase commit(2PC)**,大多情況下一個訊息會準確地傳遞給每個consumer group一次,當一個consumer process崩潰而沒有乾淨關閉的情況下,新接管的consumer process在最後一次offset成功提交給zookeeper 後可能會得到一些重複的訊息,導致訊息被重複消費,consumer端需要自行容錯這類問題。 >處理同步問題 >1. 透過返回給consumer的偏移量。 >2. 使用訊息中的一些唯一密鑰。 > >不管是哪一個都比2PC更cost-effective。 保證來自單一分區的訊息會依序傳遞給消費者,但不保證對於來自不同分區的訊息的順序。 為了避免日誌損壞,Kafka 在日誌中為每個訊息儲存一個CRC,如果Broker上有任何I/O 錯誤,Kafka會執行一個復原程序來刪除那些具有不一致CRC 的訊息,除此之外,在訊息層級擁有CRC也允許我們在訊息產生或consume後檢查網路錯誤。 目前如果一個Broker 宕機或永久損壞,那麼儲存在其上的任何未被消費的資訊都將不可用或永久遺失,所以未來Kafka中希望能新增複製功能,以便在多個Broker上儲存每一則訊息。 ## 4. Kafka Usage at LinkedIn 在主要數據中心,有多個前端服務(frontend)通過Load balancer將數據發布到Kafka broker,這些數據隨後被同一數據中心內的realtime service consume。此外,還有一個獨立的 Kafka 集群部署在分析數據中心,這個集群的 Kafka broker從主數據中心的實例拉取數據到數據倉庫(DWH)和 Hadoop 系統,進行進一步的數據加載和分析工作。 >**什麼是data warehouse(DWH)和Hadoop?** >是兩種不同類型的大數據存儲和處理系統。 >* **data warehouse:** 通常是關係型數據庫的一種特殊形式,專門設計來優化查詢和分析操作而不是事務處理,通常用於結構化數據的商業智能(BI)處理,尤其是需要快速、複雜的查詢時。 >* **Hadhoop:** 一個開源軟件框架,主要基於 HDFS(Hadoop 分佈式檔案系統)來存儲數據,通常用於非結構化或半結構化數據的大規模處理,適合不需要實時響應的系統。 ![image](https://hackmd.io/_uploads/HyVHwVNeR.png) 載入到Hadoop 叢集中是透過實作一種特殊的Kafka 輸入格式來完成的,該格式允許MapReduce 作業直接從Kafka中讀取資料,MapReduce 作業會載入原始數據,然後將其分組和壓縮,以便將來進行高效處理。 因為stateless broker和 client-side storage of message offsets的特性,MapReduce 任務管理允許任務失敗和重新啟動,而不會在任務重新啟動時重複或遺失訊息,只有在任務成功完成後,資料和偏移量才會儲存在HDFS 中。 * 使用Avro 作為序列化協議。 >**什麼是Avro?** >Apache軟件基金會開發的開源序列化框架,被設計用來支援快速、高效的序列化操作,並支援跨語言的數據交換。 * 將每個訊息Avro模式的id和序列化的bytes儲存在有效payload中,允許執行一個約定,以確保資料生產者和消費者之間的相容性。 * 使用一個輕量級的schema註冊服務來將schema id對應到實際的schema。 * 當consumer得到一個訊息時,它會在schema註冊表中查找,以檢索該schema,該schema被用來將bytes解碼成物件(只需要對每個模式進行一次,因為值是不可更改的)。 ## 5. Experimental Results 比較**Kafka**與**Apache ActiveMQ v5.4**(一種流行的JMS 開源實作)和以效能著稱的訊息系統**RabbitMQ v2.4**,比較目的是為了說明specialized system可以實現的潛在性能的提升,而不是要表明其他消息系統比 Kafka 差,因為ActiveMQ 和 RabbitMQ 擁有比 Kafka 更多的功能。 * **實驗環境:** * 使用ActiveMQ 的 KahaDB 作為持久化消息儲存的機制。 * 實驗在配置有 8 個 2GHz 核心、16GB 內存、6 個 RAID 10 磁盤的 2 台 Linux 機器上進行。 * 兩台機器通過 1Gb 的網絡連接相連,一台機器作為代理(broker),另一台機器作為生產者或消費者使用。 * **Producer test** Kafka producer被設置為以batch size為1和50發送message,ActiveMQ 和 RabbitMQ 假設使用batch size為1,Kafka 在批量大小為50時,吞吐量幾乎使1Gb網絡鏈路飽和,遠高於 ActiveMQ 和 RabbitMQ,**因為Kafka producer不等待broker確認,明顯能增加吞吐量**,除此之外,Kafka 的存儲格式更高效,**每條消息的開銷遠低於 ActiveMQ**。 ![image](https://hackmd.io/_uploads/BkrIDENlA.png) * **Consumer test** 在所有系統中,使用單一consumer來檢索總共1000萬條消息,每次拉取請求預取的數據量大致相同——最多1000條消息或大約200KB,都是從底層檔案系統的頁面緩存或某些內存緩衝中提供數據。 從*Figure 5*可以看出Kafka 消費速率遠高於 ActiveMQ 和 RabbitMQ,原因如下: 1. Kafka 擁有更高效的存儲格式,從代理到消費者傳輸的字節數更少 2. ActiveMQ 和 RabbitMQ 的broker必須維護每條消息的傳遞狀態,但Kafka不用。 3. Kafka沒有寫入disk的動作。 4. Kafka使用sendfile API,可以減少傳輸開銷。 ![image](https://hackmd.io/_uploads/S11vwVElA.png) ## 6. Conclusion and Future Works 本篇提出了一個名為Kafka的新型系統,用於處理大量的日誌資料流,與普通訊息傳遞系統一樣採用了一種基於pull的consume模型,允許應用程式以自己的速度consume數據,並在需要的時候隨時rewind consume。 **透過專注於日誌處理應用,Kafka 可以實現了比傳統訊息系統更高的吞吐量,也可以進行擴展**。 未來方向: 1. 在多個Broker間添加內建的訊息複製功能,即使在機器故障無法恢復的情況下,也可以提供持久化和數據可用性保證。 2. 同時支援非同步和同步複製模型,允許在生產者延遲和所提供的保證強度之間進行一些權衡。 3. 一個應用可以根據自己對持久化、可用性和吞吐量的要求,選擇合適的冗餘等級。 4. 增加streaming處理能力 * 當實時應用程式從Kafka取得訊息後,它們通常會進行一些常見的操作,例如計算一段時間內的訊息數量(窗口計數)或是將收到的訊息與其他資料庫中的資料或另一條訊息流中的訊息結合起來,為了讓這些操作能夠順利進行,當訊息被發送出去的時候,系統會根據一個特定的關鍵字(聯接鍵)來分類這些訊息,確保所有帶有相同關鍵字的訊息都被送到同一個分區。這樣一來,這些訊息就會被送到同一個處理這個分區的消費者進程,能提高對streaming數據處理的能力和效率,為在多台消費者機器上處理分散的訊息流提供了可能。 ## Reference [翻譯] https://juejin.cn/post/6844904161159413774 [筆記] https://medium.com/@vikashsahu4/kafka-a-distributed-messaging-system-for-log-processing-ce62e396626c [筆記] https://distributed-computing-musings.com/2022/03/paper-notes-kafka-a-distributed-messaging-system-for-log-processing/ [筆記] https://zhangjunjia.github.io/2020/06/04/kafka-paper/