# 4. Kafka Consumers: Reading Data from Kafka ## Kafka Consumer Concepts ### Consumers and Consumer Groups #### consumer #### Consumer group - 1 組 Consumer group 共同合作處理一些 topics,也就是一組consumers 分擔多個 partitions 的 loading - 透過增減 member 的數量來拓展處理 messages 的能力,因此topic擁有數量較多的partitions,在 scale 上較有彈性 - 1 個 partition 只會被同個 group 內的一個 member consume,因此 member 的數量超過 partition 的數量時,就會有 consumer 會閒置 ![image.png](https://hackmd.io/_uploads/S1tyfwIQ6.png) ### Consumer Protocol #### 名詞認識 - group coordinator: 在 broker 端會有一個 broker 來擔任(不同的 group 會有不同的 broker),coordinator 負責處理 consumer heartbeat、指派 group leader 以及 consumer 加入&離開等事務 - group leader: 第一個加入 group 的 consumer 成為group leader #### startup, consume, shutdown 1. Startup - consumer 起來時先問 broker 使用的 api version - 接著透過 metadata request 詢問 cluster information(e.q. broker address、partition 數量以及 partition leader等) - consumer 接著會尋找 coordinator 並送 JoinGroup reuqest 給coordinator 要求加入group,第一個加入 group 的會成為 group leader - group leader 會從 coordinator 得知 group 中所有 members 的資訊(資訊存在zookeeper),group leader 負責分配 partitions 並將名單送給 coordinator - 接著 members 詢問 coordinator 得知自己的 assignment ![image.png](https://hackmd.io/_uploads/S1_L5OUXT.png) 2. Consumption - consumer 開始拉取 message 時要先知道要從哪裡開始,可以透過 fetch offset request 得知該 partition 上次處理到哪個 offset - fetch offset request 不是必要的,只要 consumer 已經知道offset,那只要在 fetch reques t時可以聲明即可 - 隨著 fetch request 不停地拉取 messages,consumer 還必須適時的向 broker 更新 offset 以及向 coordinator 發送 heartbeat ![image.png](https://hackmd.io/_uploads/r1E29OU7p.png) 3. Shutdown - consumer 發送 leave group request 以 gracefully shutdown ![image.png](https://hackmd.io/_uploads/BJeRcuIQT.png) ### Poll loop - consumer poll api 封裝大部分的動作,包含 partition rebalance、送 heatbeat 及 data fetching,這樣的設計讓application 只要處理資料就好 - consumer必須要在一定的時間內送出 heartbeat,否則會被認為 not alive - 因此處理資料的時間必需短於 session timeout 的時間 - poll api 回傳 messages,每筆message包含key、value、partition、offset 和 topic - poll 可以設定參數,控制 block 多久時間來等待 consumer buffer 裡有資料,時間長短端看 application 想要多快拿回控制 #### Thread Safety - 同一格群組裡,無法讓一個 thread 運行多個 consumer - 也無法讓多個 thread 安全的共享一個 consumer - 按照規則:one consumer one thread ### Rebalance 某些情況發生時,consumer group 內的 partitions 需要重新分配,好讓每個 partition 都有被處理以及盡可能地平均分配 #### 發生 rebalance 的情況: - consumer 加入 group - consumer 離開 group - topic 有新增 partition - broker failure,該 broker leader partition 轉讓 #### rebalance 歷程: - consumer 會定時送 heartbeat 給 coordinator - 當 coordinator 一段時間沒收到 heartbeat 時便認定 consumer 已經退出 group,觸發 rebalance - 在執行 rebalance 的期間,整個 group 不會 consume message - 在 coordinator 尚未發現 consumer 已經退出(e.q. consumer crash)的這段時間,會使 partition 的 messages 暫停被 consume,直到 heartbeat session timeout(應該就是等待一下 確認 consumer 是真的死,才觸發 rebalance) - consumer 明確地告知 group coordinator 退出,可使 group coordinator 立即觸發 rebalance 以便降低無法處理 partition的gap ![image.png](https://hackmd.io/_uploads/ryz80KcQT.png) #### 為人詬病點: 因為 rebalance 過程會觸發 stop-the-world(STW),此時對應 topic 的資源都會處於不可用的狀態。小規模的集群還好,如果是大規模的集群,比如幾百個節點的 consumer 或 kafka connect 等,就是一場災難。 所以 kafka2.4的時候,社區推出兩個新feature來解決重平衡過程中STW的問題: * Incremental Rebalance Protocol(以下簡稱cooperative協議):改進了eager協議(即舊重平衡協議)的問題,避免STW的發生 * static Group membership:避免重起或暫時離開的消費者觸發重平衡 ### Static Group Membership #### why * 目前的 Rebalance 設計中,消費者組下的每個實例都會被Coordinator 分配一個成員 member.id,這個 member ID 是Kafka 自動生成的 * member.id 規則是 client.id-UUID,client.id 就是 Consumer 端參數 client.id 的值,而且這個 ID 會隨著每輪Rebalance 發生變化的 * 因為 Coordinator 無法記住每個成員都是誰,制約 Rebalance 時所有成員必須強制重新加入 * 源代碼可以發現:每次 Client 重啟回來發送 JoinGroup 時,它會封裝一個 UNKNOWN_MEMBER_ID 的空串,沒有任何有意義的信息給到Broker端,Coordinator 接收到後只能把它當做是一個全新的成員 * 如果 member.id 能夠被記住,那麽 Coordinator 就可以容忍它短暫的離線而不開啟 Rebalance,從而縮短消費者組整體不可用的時間窗口 #### how * 2.3和2.4版本引入了靜態成員(Static Member)的概念以及一個新的 Consumer 端參數:group.instance.id。 * 一旦配置了該參數,成員將自動成為靜態成員,否則的話和以前一樣依然被視為是動態成員 * 你可以認為這個新參數是一個『要被持久化的新 member.id』 * 它依然不能由用戶指定,構建規則是 group.instsance.id-UUID * 和 member.id 不同的是,每次成員重啟回來後,其靜態成員 ID 值是不變的,因此之前分配給該成員的所有分區也是不變的,而且在沒有超時前靜態成員重啟回來是不會觸發 Rebalance 的 #### 好處: - 其靜態成員ID值是不變的,因此之前分配給該成員的所有分區也是不變的 - 假設一個成員掛掉,在沒有超時前,靜態成員重啟回來是不會觸發 Rebalance 的(超時時間為 session.timeout.ms,默認10 sec) - 在靜態成員掛掉這段時間,broker 會一直為該消費者保存狀態(offset),直到超時或靜態成員重新連接 #### 使用了 static membership 功能後,觸發 rebalance 的條件如下: - 新成員加入組:這個條件依然不變。當有新成員加入時肯定會觸發 Rebalance 重新分配分區 - Leader 成員重新加入組:比如主題分配方案發生變更 - 現有成員離組時間超過了 session.timeout.ms 超時時間:即使它是靜態成員,Coordinator 也不會無限期地等待它。一旦超過了 session.timeout.ms 時間依然會觸發 Rebalance - Coordinator 接收到 LeaveGroup 請求:成員主動通知 Coordinator 永久離組 #### 使用 static membership 的兩個條件是: 1. consumer 客戶端添加配置:props.put("group.instance.id", "con1"); 2. 設置 session.timeout.ms 為一個合理的時間,這個參數受限於group.min.session.timeout.ms(6 sec)和 group.max.session.timeout.ms(30 min),即大小不能超過這個上下限。但是調的過大也可能造成 broker不斷等待掛掉的消費者客戶端的情況 ![image.png](https://hackmd.io/_uploads/B1jXZYLmp.png) ## Creating a Kafka Consumer - 在讀取消息之前,需要先創建一個 Consumer object - 創建 Consumer object,把想要傳給 Consumer 的屬性放在 Properties 里 - 需要使用 3 個必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer #### python code ``` from confluent_kafka import Consumer conf = {'bootstrap.servers': 'host1:9092,host2:9092', 'group.id': 'foo', 'auto.offset.reset': 'smallest'} consumer = Consumer(conf) ``` ## Subscribing to Topics - subscribe() 可以傳入一個正則表達式,正則表達式可以匹配多個主題如果有人創建了新的主題,並且主題名與正則表達式匹配,那麽會立即觸發一次再均衡,消費者就可以讀取新添加的主題 #### python code ``` running = True def basic_consume_loop(consumer, topics): try: consumer.subscribe(topics) while running: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset())) elif msg.error(): raise KafkaException(msg.error()) else: msg_process(msg) finally: # Close down consumer to commit final offsets. consumer.close() def shutdown(): running = False ``` 備註: 很重要!!最後要執行 Consumer.close(),這樣才能確保 active sockets 關掉,同時也將觸發 group rebalance,將 partition 做 re-assigned,如果沒有正確關閉,就是在 session timeout 後做 rebalance ## Configuring Consumers * fetch.min.bytes * 控制 broker 最小回傳 messages 的 size,如果可取得的 messages size 不夠,broker 會 block 直到 size 夠才回傳 * 可以提高 fetch.min.bytes 來降低 RTT(來回通訊延遲),並且在 consumer 數量很多的時候可以降低 brokers 的 loading * fetch.max.wait.ms * 當 messages size 不足 fetch.min.bytes 的要求時,broker 最多 block 多久就要回應 request * fetch.max.bytes * 一次拉取 messages 最大返回的 size,默認為1M,如果一個分區的第一批消息大小大於該值也會返回,為了保證 consume 得程序可以一直進行 * max.poll.records * 該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量 * max.partition.fetch.bytes * 每個 partition 每次回傳給 consumer 的最大 size,避免 run out of consumer memory * 需注意設置太大時,可能使 data processing 的時間過久,而無法在 session timeout 之前送 heartbeat * 這個設置是 per partition 的,因此要跟 consumer 總共處理partitions 數量一起計算是否超過 consumer memory * 假設有 20 個 partitions、5 個 consumers 且max.partition.fetch.byte 是 1MB,則每個 consumer 就需要大於 4MB 的可用 memory。但實際上會需要更多的 memory,因為 group 裡的 consumer 可能會離開 * session.timeout.ms * heartbeats 有效時間 * 較低的 timeout 時間可以快速地偵測不預期退出的 consumer,但也較容易誤判造成 rebalance * 較高的 timeout 會使恢復 partition consume 的 gap 較大,但也較不易誤判造成 rebalance * heartbeat.interval.ms - 多久送 heartbeats,通常與 session.timeout.ms 一起調整 * max.poll.interval.ms * 兩次 poll 方法調用的最大間隔時間,單位毫秒,默認為 5 分鐘 * 如果 consumer 在該間隔內沒有發起 poll 操作,該 consumer 將被踢除,觸發 rebalance,將該 consumer 分配的 partition 分配給其他 consumer * default.api.timeout.ms * 指定客戶端 api 的超時時間,default: 1 min * 此配置用於「未指定超時參數」的所有客戶端操作的默認超時 * request.timeout.ms * 請求的超時時間,與 Broker 端的網絡通訊的請求超時時間,default: 30s * auto.offset.reset * 當 consumer 提供的 offset 不合法(offset 已經被刪除或者 offset 不存在)時,要從哪裡(latest or earliest)開始 consume * value: earliest(or smallest), latest(or largest)(default) * enable.auto.commit * 當設為 true 時,會根據 auto.commit.interval.ms 時間自動 commit offset * partition.assignment.strategy * PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者 * 分區方法 * Range: 會把主題的若干個連續的分區分配給消費者。假設悄費者 C1 和消費者 C2 同時 訂閱了主題 T1 和主題 T2,且每個主題有 3 個分區。那麽消費者 C1 有可能分配到這兩個主題的分區 0 和分區 1,而消費者 C2 分配到這兩個主題的分區 2。因為每個主題 擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最後分配到比第二個消費者更多的分區。只要使用了 Range 策略,而且分區數量無法被消費者數量整除,就會出現這種情況 * RoundRobin:把主題的所有分區逐個分配給消費者。假設給消費者 C1 和消費者 C2 分配分區,那麽消費者 C1 將分到主題 T1 的分區 0 和分區 2 以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 l 以及主題T2 的分區 0 和分區 2。一般 來說,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin策略會給所 有消費者分配相同數量的分區(或最多就差一個分區)。 * Sticky: 保證客戶端,比如 consumer 消費者在 rebalance 後能夠維持原本的分配方案,可惜的是這個分配策略依舊是在eager協議的框架之下,rebalance 仍然需要每個 consumer 都先放棄當前持有的資源(分區) * Cooperative Sticky: 將一次全局重平衡,改成每次小規模重平衡,直至最終收斂平衡的過程 **eager:** ![image.png](https://hackmd.io/_uploads/SyybjCO7T.png) 圖解: - 最開始的時候,c1 c2 各自發送 hearbeat 給到 group coordinator - 這時候 group coordinator 收到一個 join group 的 request請求,group coordinator 知道有新成員加入組了 - 在下一個心跳中 group coordinator 通知 c1 和 c2,準備rebalance - c1 和 c2 放棄(revoke)各自的 partition,然後發送joingroup 的 request 給 group coordinator - group coordinator 處理好分配方案(交給 leader consumer 分配的),發送 sync group request 給 c1 c2 c3,附帶新的分配方案 - c1 c2 c3 接收到分配方案後,重新開始消費 **cooperative:** ![image.png](https://hackmd.io/_uploads/SkSyjCdXT.png) 圖解: - 最開始的時候c1 c2各自發送hearbeat心跳信息給到group coordinator - 這時候group coordinator收到一個join group的request請求,group coordinator知道有新成員加入組了 - 在下一個心跳中 group coordinator 通知 c1 和 c2 ,準備 rebalance - c1 和 c2 發送 joingroup 的 request 給group coordinator,但不需要 revoke 其所擁有的 partition,而是將其擁有的分區編碼後一並發送給 group coordinator,即 {c1->p1, p2},{c2->p3} - group coordinator 從元數據中獲取當前的分區信息(這個稱為assigned-partitions),再從 c1 c2 的 joingroup request 中獲取分配的分區(這個稱為 owned-partitions),通過assigned-partitions 和 owned-partitions 知曉當前分配情況,決定取消 c1 一個分區 p2 的消費權,然後發送 sync group request({c1->p1},{c2->p3})給c1 c2,讓它們繼續消費p1 p2 - c1 c2 接收到分配方案後,重新開始消費,一次 rebalance 完成,當然這時候 p2 處於無人消費狀態 - 再次觸發rebalance,重覆上述流程,不過這次的目的是把 p2 分配給c3(通過 assigned-partitions 和 owned-partitions 獲取分區分配狀態) * client.id * 該屬性可以是任意字符串,broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里 * client.rack * 此客戶端的機架標識符,這可以是任何字符串值,指示此客戶端的物理位置 * 它與 broker 配置 broker.rack 相對應 * group.instance.id * 用戶為 consumer 指定的一個 consumer ID * 每個 consumer group 底下的 consumer 的 ID 必須是唯一的 * 一旦設置了該 ID,consumer 就會被視為是一個靜態成員(Static Member) * Static Member 配以較大的 session timeout 設置,能夠避免因臨時不可用(比如重啟)而引發的 Rebalance * receive.buffer.bytes and send.buffer.bytes * socket 在『讀寫數據』時用到的 TCP 緩沖區大小 * 如果它們被設為-1,就使用操作系統的默認值 * 如果生產者或消費者與 broker處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬 * offsets.retention.minutes * 記錄 topic 的偏移量日志的保留時長 * default: 7 days * 比如我們將 offsets.retention.minutes 設為10,即十分鐘。然後最後一次主題 A 的消費偏移量是100,十分鐘內繼續消費,會從 101 開始繼續消費,但是十分鐘內我們沒有繼續消費,該記錄主題A的消費偏移量100 的日志將會被清除,也就是下次繼續消費主題 A 的消息時,我們不知道上一次消費哪里了(注意,主題A所存儲的消息依舊在broker上,並沒有被刪除), 在這種情況下,將會根據 auto.offset.reset 的設置,讀取最早(earliest)/最晚(latest)的消息。 ## Commits and Offsets 1. commit offset < 正在處理的 offset => 那麽處於兩個 offset 之間的消息就會被重覆處理 ![image](https://hackmd.io/_uploads/By_WOSxVa.png) 2. commit offset > 正在處理的 offset => 那麽處於兩個 offset 之間的消息將會丟失 ![image](https://hackmd.io/_uploads/Syd4_SgN6.png) ### Automatic Commit * application 不用處理 commit,commit 會在 poll api 裡自動處理 * 當呼叫 poll 時,會檢查是否超過上次 commit 加上 auto.commit.interval.ms 的時間(默認 5s),是的話就 commit 上一次 poll 的最大 offset * 使用這種方式,容易出現提交的 offset 小於客戶端處理的最後一個消息的offset。假設使用默認的 5s commit.interval,在最近一次提交之後的 3s 發生了 rebalance,rebalance 之後,消費者從最後一次提交的offset 位置開始讀取消息。這個時候 offset 已經落後了 3s(因為沒有達到5s的時限,並沒有提交偏移量),所以在這 3s 的數據將會被重覆處理 * 在調用 close() 方法之前也會進行 auto commit (offset 提交 但還沒 consume 完的悲劇) ![image](https://hackmd.io/_uploads/ByPNRQZVa.png) ### Manual Commit * 大部分開發者透過控制 offset 提交時間,來消除丟失消息的可能性,並在發生 rebalance 時減少重覆消息的數量 * 消費者 API 提供了另一種 commit offset 的方式,開發者可以在必要的時候提交當前 offset,而不是基於時間間隔 * 需要把 auto.commit.offset 設為 false,讓應用程序決定何時提交offset #### synchronous Commit * 使用 commitSync() 提交 offset 最簡單也最可靠 * 這個 API 會提交由 poll() 方法返回的最新 offset,提交成功後馬上返回,如果提交失敗就拋出異常 * 在處理完所有記錄後要確保調用了 commitSync(),否則還是會有丟失消息的風險 * 如果發生 rebalance,從最近一批消息到發生 rebalance 之間的所有消息都將被重覆處理 * 同時在這個程序中,只要沒有發生不可恢覆的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日志 * synchronous Commit 有一個不足之處: 在 broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。可以通過降低提交頻率來提升吞吐量,但如果發生 rebalance,會增加重覆消息的數量 #### Asynchronous Commit * application 自己掌控 commit 時機,但不需要立刻知道 commit 結果 * commitAsync api 不等 broker response,而是用 callback 的方式等到 broker 回應後才處理 commit 結果 * commitAsync 失敗後不會自動 retry,因為自動 retry 可能會造成問題(e.q.假設一開始commit offset 2000 時遇到短暫的 connection 問題,而隨後的 offset 3000 commit 成功了,這時 offset 2000 要 retry 並且成功就可能會把 offset commit 較小的位置) * 在成功提交或碰到無法恢覆的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。 * commitAsync() 也支持 callback,在 broker 作出響應時會執行callback。callback 經常被用於記錄提交錯誤或生成度量指標。如果要用它來進行重試,則一定要注意提交的順序 ##### Retrying Async Commits * A simple pattern to get the commit order right for asynchronous retries is to use a monotonically increasing sequence number. * Increase the sequence number every time you commit, and add the sequence number at the time of the commit to the commitAsync callback. * When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable * if it is, there was no newer commit and it is safe to retry. * If the instance sequence number is higher, don’t retry because a newer commit was already sent. #### Combining Synchronous and Asynchronous Commits * 一般情況下,偶爾出現提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麽後續的提交總會有成功的 * 如果這是發生在關閉 consumer 或 rebalance 前的最後一次提交,就要確保能夠提交成功,在這種情況下,我們應該考慮使用 Combining Commits 的方法 * 正常運行: 使用 commitAsync 方法來進行提交,這樣的運行速度更快,而且就算當前提交失敗,下次提交成功也可以 * 非正常運行: 如果直接關閉 consumer,就沒有所謂的 “下一次提交”,因為不會再調用 poll() 方法,所以要使用 commitSync 方法一直重試,直到提交成功或發生無法恢覆的錯誤 #### Committing a Specified Offset * 無法單用 commitSync() 或 commitAsync() 來實現,因為它們只會提交最後一個偏移量 * 如果 poll() 方法返回一大批數據,為了避免因 rebalance 引起的重覆處理整批消息,想要在批次中間 commit offset,可以在調用 commitSync() 和 commitAsync() 方法時傳進去希望提交的 partition 和 offset 的 map * consumer 可能不只讀取一個 partition,你需要跟蹤所有 partition 的 offset,所以在這個層面上控制 offset 的提交會讓代碼變覆雜 ##### python code ``` import json import logging import pymysql from confluent_kafka import Consumer logger = logging.getLogger() # kafka 消費者 settings = { 'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'SCRAM-SHA-256', 'sasl.username': 'myname', 'sasl.password': 'mypasswd', 'group.id': 'myid', 'enable.auto.commit': False } c = Consumer(settings, logger=logger) c.subscribe(['customers']) # 消息消費和處理流程 done_msg = None counts = 0 try: while True: msg = c.poll(timeout=1.0) if msg is None: continue elif msg.error(): logger.error(msg.error()) else: print(msg) logger.info('Message done: %s', msg) done_msg = msg counts += 1 if (counts % 50) == 0: c.commit(message=done_msg, asynchronous=True) # 異步提交 offset except KeyboardInterrupt: pass except Exception as e: logger.error(e, exc_info=True) # 關閉 conn finally: try: c.commit(message=done_msg, asynchronous=False) # 同步提交 offset finally: c.close() ### 解釋 c.commit() 如何指定 offset? 1. parameter offsets: explicitly list the offsets for each assigned topic partition 2. parameter message : will commit offsets relative to a Message object returned by poll() ### ``` ## Rebalance Listeners * 在 partition rebalance 前,如果 consumer 知道它即將不再負責某個 partition,那麽它可能需要將已經處理過的消息 offset 進行 commit,甚至需要關閉 file handles, database connection (cleanup work) * Kafka API 允許我們在 consumer 新增 partition 或者失去 partition 時進行處理,我們只需要在調用 subscribe() 方法時傳入ConsumerRebalanceListener 對象 ConsumerRebalanceListener 對象有 3 個方法: 1. public void onPartitionsAssigned(Collection<TopicPartition> partitions) 此方法在 partition 分配給 consumer 後,在 consumer 開始讀取消息前調用 2. public void onPartitionsRevoked(Collection<TopicPartition> partitions) 此方法會在 consumer 停止消費消費後,在 rebalance 開始前調用 3. public void onPartitionsLost(Collection<TopicPartition> partitions) 只有在 cooperative rebalancing 會用到,而且只有在特殊情況會發生: c1 的 partition 要給 c2 處理,但 c1 卻沒有先 revoke(正常來說都要先 revoke) c2 可能已經有自己的狀態,所以要小心 conflict 如果沒有用此方法 onPartitionsRevoked() 會被調用 ### python code: on_assign, on_revoke ``` from confluent_kafka import Consumer, KafkaError conf = { 'bootstrap.servers': 'your_bootstrap_servers', 'group.id': 'your_consumer_group', 'auto.offset.reset': 'earliest' } def on_assign(consumer, partitions): # 在重新平衡之前調用,可以在這里執行一些邏輯 print('Rebalance: Assigning partitions:', partitions) # 在這里設置初始偏移量,如果你有需要的話 for partition in partitions: partition.offset = 0 # 提交初始偏移量 consumer.assign(partitions) def on_revoke(consumer, partitions): # 在重新平衡之後調用,可以在這里執行一些邏輯 print('Rebalance: Revoking partitions:', partitions) # 在這里提交最新的偏移量 consumer.commit(offsets={partition.topic: {partition.partition: partition.offset} for partition in partitions}) # 取消分配分區 consumer.unassign() consumer = Consumer(conf) # 设置 設置 Rebalance 回調函數 consumer.subscribe(['your_topic'], on_assign=on_assign, on_revoke=on_revoke) # 消費msg 省略囉~ ``` ## Consuming Records with Specific Offsets * 在此之前,我們使用 poll() 來從 last committed offset 開始消費,但我們也可以從一個指定的 offest 開始消費 * 如果想從 beginning of the partition 重新開始消費,那麽可以使用seekToBeginning(TopicPartition tp); * 如果想從 end of the partition 消費最新的消息,那麽可以使用seekToEnd(TopicPartition tp) * Kafka 還支持我們從 specific offset 開始消費,且應用場景有很多,其中最典型的一個是:offset 存在其他系統(例如數據庫)中,並且以其他系統的 offset 為準 ### python code: 指定 specific offset ``` from confluent_kafka import Consumer, KafkaError c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest'}) c.subscribe(['mytopic']) # seek()方法將分區的偏移量設置為0,意味著消費者將從分區的開頭開始消費消息 c.seek(TopicPartition('mytopic', 0), 0) ``` ## But How Do We Exit 1. 在一般情況下,我們會在一個主線程中循環 poll 消息並進行處理 2. 當需要退出 poll 循環時,我們可以使用另一個線程調用 consumer.wakeup(),調用此方法會使得 poll() 拋出WakeupException,如果 consumer poll loop 在主線程跑,則可以調用 ShutdownHook 來達成 3. 如果調用 wakup 時,主線程正在處理消息,那麽在下一次主線程調用 poll 時會拋出異常: WakeUpException 4. 主線程在拋出 WakeUpException 後,需要調用 consumer.close(),此方法會提交 offset,同時發送一個退出 consumer group 的消息到Kafka 的 group coordinator 5. group coordinator 收到消息後會立即進行 rebalance(而無需等待此consumer session timeout) (python 沒有查到相對應的 function) ## Deserializers 反序列化 * Kafka producer 負責將 object 序列化成 bytes arrays 並發送到 Kafka,consumer 則需要將 bytes arrays 轉換成 object,這就是反序列化做的事情 * 序列化與反序列化需要匹配,因此作為開發者,需要關注寫入到主題使用的是什麽序列化格式,並且保證寫入的數據能夠被 consumer 反序列化成功 * 使用 Avro 與 Schema Registry 來序列化與反序列化,那麽事情會輕鬆許多,因為 AvroSerializer 會保證所有寫入的數據都是結構兼容的,並且能夠被反序列化出來 ### Custom Deserializers * 提醒!!不推薦實現自定義的序列化與反序列化,因為往往這些方案並不成熟,難以維護和升級,而且容易出錯 * 可以使用JSON、Thrift、Protobuf或者Avro的成熟的解決方案 ### Using Avro Deserialization with Kafka Consumer #### python code ``` pip install confluent-kafka[avro] ``` ``` from confluent_kafka import KafkaError, Consumer from confluent_kafka.avro import AvroConsumer # 配置 conf = { 'bootstrap.servers': 'broker1:9092,broker2:9092', 'group.id': 'CountryCounter', 'key.deserializer': 'io.confluent.kafka.serializers.StringDeserializer', 'value.deserializer': 'io.confluent.kafka.serializers.KafkaAvroDeserializer', 'schema.registry.url': 'your_schema_registry_url', # 替換為你的 Schema Registry URL 不一定要有! } # 創建 AvroConsumer consumer = AvroConsumer(conf) # 訂閱主題 topic = 'customerContacts' consumer.subscribe([topic]) print(f'Reading topic: {topic}') try: while True: # 拉取消息 records = consumer.poll(1000) for record in records: if record.error(): if record.error().code() == KafkaError._PARTITION_EOF: # End of partition event continue else: print(record.error()) break # 處理消息 value = record.value() if value is not None: print(f'Current customer name is: {value["name"]}') # 替換為你的 Avro 消息中的字段名 # 同步提交偏移量 consumer.commit() except KeyboardInterrupt: pass finally: consumer.close() ``` ## Standalone Consumer: Why and How to Use a Consumer Without a Group * 一般情況下我們都是 consumer group(即便只有一個 consumer)來消費消息,因為這樣可以在增加或減少 consumer 時自動進行 rebalance,這種方式是推薦的 * 在知道 topic 和 partition 的情況下,也可以使用 Standalone Consumer 來進行消費,需要自己給 consumer 分配消費 partition,而不是讓 consumer 訂閱(成為消費組)topic * Standalone Consumer 除了需要主動獲取 partition 以及沒有 rebalance,其他的處理邏輯都是一樣的 * 需要注意: 如果添加了新的 partition,這個 consumer 是感知不到的,需要通過 consumer.partitionsFor() 來重新獲取 partition。 #### python code ``` from confluent_kafka import Consumer, KafkaError, TopicPartition conf = { 'bootstrap.servers': 'your_bootstrap_servers', 'group.id': 'your_consumer_group', 'auto.offset.reset': 'earliest', # OR 'latest' } consumer = Consumer(conf) # 獲取主題的分區信息 topic = 'your_topic' partition_infos = consumer.list_topics(topic=topic).topics[topic].partitions.values() # 構建 TopicPartition 對象的列表 partitions = [TopicPartition(topic, partition.partition) for partition in partition_infos] # 手動分配分區 consumer.assign(partitions) try: while True: # 處理消息 ...... # 同步提交偏移量 consumer.commit() except KeyboardInterrupt: pass finally: consumer.close() ``` ## ref [kafka 參數說明](https://www.cnblogs.com/hello-/articles/15577525.html ) [rebalance 解說](https://zhuanlan.zhihu.com/p/337469858) [讀書筆記1](https://arronhong.github.io/2021/05/25/Introduction%20to%20Kafka/#more ) [讀書筆記2](https://blog.csdn.net/skycanf/article/details/81668584?spm=1001.2101.3001.6650.7&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-81668584-blog-121654059.235%5Ev38%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-81668584-blog-121654059.235%5Ev38%5Epc_relevant_default_base3&utm_relevant_index=13) [offset commit](https://zhuanlan.zhihu.com/p/577059966) [offset commit 2](https://www.cnblogs.com/fnlingnzb-learner/p/13429705.html) [kafka python code](https://blog.51cto.com/quantfabric/2504495?u_atoken=0b0f4c57-ac73-4739-979c-d0c51e77380d&u_asession=01V1-F_gi0UGcAOSTkXA2DxcAyyCuA5WIJgGj8TMXiA9LdlceKqQklU4xBcbi1G1vMPObCD4f2uBDuabLWwX9PDdsq8AL43dpOnCClYrgFm6o&u_asig=05CVKgncGS6zO5dOpiej7tL7BzLemFYDd8kxsMfD1iB_vY9PSSjBHxjiUhl-HRrnEpnLnstK_XrR0KmV_bYXbUoLf2xiU3uClTzl0R5l7JaZogFJfZno2402tRCHR2UeUncnBP3-n_jkhHRfZ9w8ceZqwk7vdM6WwoyaGMxdSYBBKABXu60eqbqNPNMBIK8bMVksmHjM0JOodanL5-M1Qs1XMoGEX5Mht9a1BRcKgahZHyh7ikxj749pp0REMktVBy1mn-1zbfsI-O5V0v-zruxEalWuX0Cj2gkXnO6VeaKeTY94r_LXIIil3Y3aVPRGAe&u_aref=HHl08ZObiVmDJ7YRGf5SxOI6R78%3D)