# ABSTRACT # 1. Introduction 每個夠大的網路公司都會有巨量的 log 資料。 這些資料通常組成為: 1. 用戶活動如登入、瀏覽、點擊、按讚、分享、留言、搜尋; 2. 工作指標如服務呼叫堆疊(service call stack)、呼叫延遲(call latency)、錯誤(errors)、系統指標如 CPU, memory, network, 或硬碟使用率。 然而,近期網路應用的趨勢已經使活動數據成為生產數據管道的一部分,直接用於網站功能。 這些功能包括: 1. 搜尋關聯(search relevance) 2. 基於物品熱門度或 activity stream 共現(co-occurence)的推薦 3. 目標式廣告(ad targeting) 4. 安全應用於防範濫發(spam)或未經授權的資料抓取(data scraping) 5. 關於朋友的近期動態總覽報告 這種實時運用 log 資料創造了新的挑戰,因為其容量大於實際資料好幾個數量及。 <!-- # 2. Related Work --> # 3. Kafka Architecture and Design Principles 首先介紹 Kafka 的基本概念。 一串特定類型的資料流定義為 ==*topic(主題)*==。 一個生產者(producer)可以推送資料到一個 *topic*。 推送的資料被存在一組伺服器叫 ==*brokers(中介)*==。 一個消費者(consumer)可以從 *broker* 訂閱一個或多個 *topic*,並從 broker 拉取(pulling) 資料來消費。 以下舉例使用 Kafka API 的範例程式碼。 一個訊息被定義為只酬載位元組的資料(contain just a payload of bytes)。 為了效率,生產者可以在一個 request 裡一次送好幾組訊息。 生產者範例程式碼: ``` producer = new Producer(…); message = new Message(“test message str”.getBytes()); set = new MessageSet(message); producer.send(“topic1”, set); ``` 要訂閱一組訊息,消費者首先要創造一個或多個該 *topic* 的訊息流(message stream)。 發布到 *topic* 的訊息會被平均的分配到這些子訊息流(sub-streams)。 每個訊息流提供了迭代器(iterator)的介面,消費者迭代(iterate)每個訊息並處理。 與傳統迭代器不同的地方在它不會終止,如果目前沒有更多訊息可以消費,那就會阻擋直到有新的訊息。 消費者範例程式碼 ``` streams[] = Consumer.createMessageStreams(“topic1”, 1) for (message : streams[0]) { bytes = message.payload(); // do something with the bytes } ``` Kafka 架構總覽如圖1所示。 因為 Kafka 是分散式的,所以一個 Kafka 叢集通常由數個 *broker* 組成。 為了平衡負載,一個 *topic* 會被分為多個分區(partition)然後每個 broker 存一或多個分區。 複數個生產者消費者可以同時發布與檢索訊息。  ## 3.1 Efficiency on a Single Partition 我們在 Kafka 中做了幾個增進系統性能的決定。 ### Simple storage: Kafka 有著非常簡單的儲存布局。每個 *topic* 的分區對應到一個 logical log。物理上,每個 log 被實作為一組相同大小的片段檔案(segment file)(例如 1 GB)。 每當生產者發布一個訊息到分區時, *broker* 就將其附加到最後一個片段檔案。 為了更好的效能,只會在一定數目的訊息被發布或者是過一定時間後才會沖(flush)到硬碟裡面。 一個訊息只會在被沖完後才會被消費者看到。 不像傳統訊息系統,一個存在 Kafka 裡面的訊息沒有明顯的訊息 id(message id)。取而代之,每個訊息都以log 中的邏輯位址偏移(logical offset)表示。注意 id 為遞增但非連續。 一個消費者永遠以循序的方式從一個分區消費。 如果一個消費者得知一個訊息的偏移(offset),那暗示了該消費者已經接收過更以前的訊息了。 每個 *broker* 記錄著一列偏移,包含著每個片段檔案的第一個訊息的偏移量。 一個消費者在收到訊息之後,會計算下一個訊息的偏移量,並用於下一次的請求。  ### Efficient transfer: 另一個非傳統的抉擇為避免顯性地快取訊息於 Kafka 層。 反之,我們單純用潛在的檔案系統的 page cache。這有助於避免二次緩衝——資料只存在 page cache 中。 這還可以得益於當 *broker* 重啟的時候還可以保留有效的 cache。因為 Kafka 不會快取訊息,所以 garbage collection 的花費很少。 最後,因為生產者消費者都是循序地操作,且生產者通常會落後消費者,普遍的 OS 快取方式非常有效。 我們另外也優化消費者的網路存取。 Kafka 是多訂閱者系統(multi-subscriber system)而且一個訊息可能會重複被多個消費者消費多次。 一個從本地檔案傳送資料到 remote socket 的典型作法包括以下步驟: 1. 從儲存媒體讀取到 OS 的 page cache 2. 從 page cache 複製資料到 application buffer 3. 複製 application buffer 到 kernel buffer 4. 傳送 kernel buffer 到 socket 以上總共 4 次複製 2 次 system call。 在 Linux 與其他 Unix 作業系統中存在一個傳送檔案 API (sendfile API),其可以直接從 file channel 傳送位元組到 socket channel。這通常避免了上面步驟(2)(3)中的 2 次複製與 1 次 system call。 ### Stateless broker: 不像其他訊息系統, Kafka 中關於消費者消費了多少的資料並非由 *broker* 管理,而是由消費者本身管理。 但是,這樣也讓 *broker* 難以刪除訊息,因為它不知道是否所有消費者都消費了該則訊息。 Kafka 用基於時間的SLA(time-based SLA(Service-Level Agreement))來執行保留政策(retention policy)。 一個訊息會在待在 *broker* 中一定週期後自動刪除,通常是 7 天。 這有個重要的連帶效果。一個消費者可以故意回捲(rewind)到舊的偏移並重新消費資料。 例如,當有個生產者發生錯誤,該應用可以在錯誤修復後重新撥放(re-play)特定訊息。 ## 3.2 Distributed Coordination Kafka 有個概念叫做 *consumer group*。每個 *consumer group* 為一或多個消費者共同消費一組訂閱的 *topic*。 i.e. 每則訊息只會傳到 *consumer group* 底下的一個消費者。 不同 *consumer group* 獨立地消費一組 *topic* ,不需要 group 間的協調。 我們的目標是把存在 broker 裡面的訊息平均的分配到消費者。 我們的決定是讓==一個 *topic* 裡面的分區為平行化的最小單位。== 這意味著來自同個分區的訊息只會同時被一個 *consumer group* 裡的消費者消費。如果我們允許多個消費者同時消費一個分區,那就會需要 locking 跟狀態維護成本。 相對的,我們只需要在消費者重新平衡附載(rebalance load)的時候重新協調。 為了讓附載真正平衡,我們要求一個 *topic* 底下的分區要多於每個 group 中的消費者數量。 第二個決定是讓消費者不要有一個中心的主節點(master node),而是讓它們去中心化地彼此協調。 為了達成協調性,這邊用了一個廣為使用的共識服務 ==Zookeeper==。 Zookeeper 有非常簡單的類檔案系統 API。其可以建立路徑(path),設定路徑的值,從路徑讀取值,刪除路徑,條列路徑的子路徑。 他還可以做以下幾件事情: 1. 可以登記一個 watch 以在子路徑或路徑的值發生變化時收到通知 2. 路徑可以被登記為短暫的(ephemeral)(相對於持久的(persistent)),意即如果創建路徑的用戶端沒了,那路徑也會跟著沒了 3. Zookeeper 會備份資料到多個伺服器,令資料有高可靠性與可得性。 Kafka 用 Zookeeper 來達成以下幾件任務: 1. 偵測 *broker* 與消費者的新增與刪除 2. 當上面事件發生時,觸發消費者之間的重新平衡 3. 維持消費關係(consumption relationship)與關切每個分區消費過的偏移量。 更準確地說,當 *broker* 或消費者啟動時,會將資訊存在 Zookeeper 的 *broker* 或消費者註冊表(registry)。 - ***broker* 註冊表**存了 *broker* 的 host name 跟 port,以及一套 topic 跟分區。 - **消費者註冊表**存了它屬於哪個 consumer group 以及它訂閱的一套主題。 每個 consumer group 都在 Zookeeper 關聯了一個所有權(ownership)註冊表以及偏移量註冊表。 - **所有權註冊表**對於每個被訂閱的分區都有一個路徑(path),路徑的值為目前消費此分區的消費者 id (我們將此表示為消費者擁有此分區) - **偏移量註冊表**存了每個被訂閱分區最後一個被消費的訊息的偏移量。 Zookeeper 對於 ***broker* 註冊表**、**消費者註冊表**、**所有權註冊表**產生的路徑為短暫的;==對於**偏移量註冊表**為持久的==。 - 如果一個 broker 掛了,所有分區都會被 **broker 註冊表**移除。 - 如果一個消費者掛了,會失去所有**消費者註冊表**項目以及於**所有權註冊表**擁有的分區。 每個消費者都會向 Zookeeper 登記一個 watcher 來監控 ***broker* 註冊表**與**消費者註冊表**,然後會在 *broker* 組或 consumer group 發生變化時收到通知。 當消費者第一次啟動或者收到關於 *broker*/消費者更動的通知時,消費者會啟動一套重新平衡的機制來決定他該由哪個分區的子集合(subset of partitions)來消費。 這套流程如下演算法一:  從 Zookeeper 的 *broker* 與消費者註冊表讀取後,消費者會首先計算對於每個訂閱的 *topic* $T$ 中可用的分區集合($P_T$),還有訂閱 $T$ 的消費者集合($C_T$)。 然後範圍分割(range-partitions) $P_T$ 成 $|C_T|$ 個區塊(chunk),然後選擇一個區塊來擁有。 消費者會對於他選的每個區塊在**所有權註冊表**中標記為新的擁有者。 最後,消費者開始從各個擁有的分區,依照存在**偏移量註冊表**中的偏移量來拉取資料。在消費者拉取資料的同時也會更新**偏移量註冊表**中的偏移量。 當一個 group 裡面有多個消費者時,每個消費者都會收到 *broker* 或消費者的更動消息。有可能消息傳到消費者的時序會有歧異,有可能發生一個消費者試圖擁有某個仍在被另一個消費者使用的分區。 發生這樣的情況時,第一個消費者只需要釋放所有它目前擁有分區,等一下下,再重啟重新平衡機制。通常多試幾次之後就會趨於穩定了。 當一個新的 consumer group 被創立時,**偏移量註冊表**中會沒有可用的偏移量。 於此,消費者可以從最小或最大的偏移量開始消費(取決於設定),這可透過我們在 *broker* 中提供的 API 來達成。 ## 3.3 Delivery Guarantees 通常來說, Kafka 只保證至少一次傳輸(at-least-once delivery)。 大多時候訊息只會被傳輸一次到各 consumer group。 但是當有個消費者崩潰且為不乾淨關機(unclean shutdown),接管崩潰消費者的另一個消費者有可能會收到多個重複的訊息,重複訊息為自最後一個成功提交到 Zookeeper 的偏移量之後的訊息。 如果一個應用對重複很在乎,那它就必須要自行添加除重複的邏輯。 Kafka 保證了從一個分區傳送到消費者的訊息是循序的。 但是不保證從不同分區的訊息是循序的。 為了避免 log 汙染, Kafka 會在 log 中替每個訊息存 CRC(Cyclic Redundancy Check?)。 如果有 I/O 錯誤,Kafka 會執行復原流程以移除有不一致 CRC 的訊息。 在訊息層有 CRC 就可以讓我們檢查訊息被產生或消費後有無網路錯誤。 如果 *broker* 掛了,那所有儲存其中未被消費的訊息會無法存取。 如果 *broker* 的儲存系統掛了,那麼所有未被消費的訊息就永遠丟失了。 <!-- ### 4. Kafka Usage at LinkedIn ### 5. Experimental Results ### 6. Conclusion and Future Works -->
×
Sign in
Email
Password
Forgot password
or
By clicking below, you agree to our
terms of service
.
Sign in via Facebook
Sign in via Twitter
Sign in via GitHub
Sign in via Dropbox
Sign in with Wallet
Wallet (
)
Connect another wallet
New to HackMD?
Sign up