# 4. Kafka Consumers: Reading Data from Kafka
## Kafka Consumer Concepts
### Consumers and Consumer Groups
#### consumer
#### Consumer group
- 1 組 Consumer group 共同合作處理一些 topics,也就是一組consumers 分擔多個 partitions 的 loading
- 透過增減 member 的數量來拓展處理 messages 的能力,因此topic擁有數量較多的partitions,在 scale 上較有彈性
- 1 個 partition 只會被同個 group 內的一個 member consume,因此 member 的數量超過 partition 的數量時,就會有 consumer 會閒置

### Consumer Protocol
#### 名詞認識
- group coordinator: 在 broker 端會有一個 broker 來擔任(不同的 group 會有不同的 broker),coordinator 負責處理 consumer heartbeat、指派 group leader 以及 consumer 加入&離開等事務
- group leader: 第一個加入 group 的 consumer 成為group leader
#### startup, consume, shutdown
1. Startup
- consumer 起來時先問 broker 使用的 api version
- 接著透過 metadata request 詢問 cluster information(e.q. broker address、partition 數量以及 partition leader等)
- consumer 接著會尋找 coordinator 並送 JoinGroup reuqest 給coordinator 要求加入group,第一個加入 group 的會成為 group leader
- group leader 會從 coordinator 得知 group 中所有 members 的資訊(資訊存在zookeeper),group leader 負責分配 partitions 並將名單送給 coordinator
- 接著 members 詢問 coordinator 得知自己的 assignment

2. Consumption
- consumer 開始拉取 message 時要先知道要從哪裡開始,可以透過 fetch offset request 得知該 partition 上次處理到哪個 offset
- fetch offset request 不是必要的,只要 consumer 已經知道offset,那只要在 fetch reques t時可以聲明即可
- 隨著 fetch request 不停地拉取 messages,consumer 還必須適時的向 broker 更新 offset 以及向 coordinator 發送 heartbeat

3. Shutdown
- consumer 發送 leave group request 以 gracefully shutdown

### Poll loop
- consumer poll api 封裝大部分的動作,包含 partition rebalance、送 heatbeat 及 data fetching,這樣的設計讓application 只要處理資料就好
- consumer必須要在一定的時間內送出 heartbeat,否則會被認為 not alive
- 因此處理資料的時間必需短於 session timeout 的時間
- poll api 回傳 messages,每筆message包含key、value、partition、offset 和 topic
- poll 可以設定參數,控制 block 多久時間來等待 consumer buffer 裡有資料,時間長短端看 application 想要多快拿回控制
#### Thread Safety
- 同一格群組裡,無法讓一個 thread 運行多個 consumer
- 也無法讓多個 thread 安全的共享一個 consumer
- 按照規則:one consumer one thread
### Rebalance
某些情況發生時,consumer group 內的 partitions 需要重新分配,好讓每個 partition 都有被處理以及盡可能地平均分配
#### 發生 rebalance 的情況:
- consumer 加入 group
- consumer 離開 group
- topic 有新增 partition
- broker failure,該 broker leader partition 轉讓
#### rebalance 歷程:
- consumer 會定時送 heartbeat 給 coordinator
- 當 coordinator 一段時間沒收到 heartbeat 時便認定 consumer 已經退出 group,觸發 rebalance
- 在執行 rebalance 的期間,整個 group 不會 consume message
- 在 coordinator 尚未發現 consumer 已經退出(e.q. consumer crash)的這段時間,會使 partition 的 messages 暫停被 consume,直到 heartbeat session timeout(應該就是等待一下 確認 consumer 是真的死,才觸發 rebalance)
- consumer 明確地告知 group coordinator 退出,可使 group coordinator 立即觸發 rebalance 以便降低無法處理 partition的gap

#### 為人詬病點:
因為 rebalance 過程會觸發 stop-the-world(STW),此時對應 topic 的資源都會處於不可用的狀態。小規模的集群還好,如果是大規模的集群,比如幾百個節點的 consumer 或 kafka connect 等,就是一場災難。
所以 kafka2.4的時候,社區推出兩個新feature來解決重平衡過程中STW的問題:
* Incremental Rebalance Protocol(以下簡稱cooperative協議):改進了eager協議(即舊重平衡協議)的問題,避免STW的發生
* static Group membership:避免重起或暫時離開的消費者觸發重平衡
### Static Group Membership
#### why
* 目前的 Rebalance 設計中,消費者組下的每個實例都會被Coordinator 分配一個成員 member.id,這個 member ID 是Kafka 自動生成的
* member.id 規則是 client.id-UUID,client.id 就是 Consumer 端參數 client.id 的值,而且這個 ID 會隨著每輪Rebalance 發生變化的
* 因為 Coordinator 無法記住每個成員都是誰,制約 Rebalance 時所有成員必須強制重新加入
* 源代碼可以發現:每次 Client 重啟回來發送 JoinGroup 時,它會封裝一個 UNKNOWN_MEMBER_ID 的空串,沒有任何有意義的信息給到Broker端,Coordinator 接收到後只能把它當做是一個全新的成員
* 如果 member.id 能夠被記住,那麽 Coordinator 就可以容忍它短暫的離線而不開啟 Rebalance,從而縮短消費者組整體不可用的時間窗口
#### how
* 2.3和2.4版本引入了靜態成員(Static Member)的概念以及一個新的 Consumer 端參數:group.instance.id。
* 一旦配置了該參數,成員將自動成為靜態成員,否則的話和以前一樣依然被視為是動態成員
* 你可以認為這個新參數是一個『要被持久化的新 member.id』
* 它依然不能由用戶指定,構建規則是 group.instsance.id-UUID
* 和 member.id 不同的是,每次成員重啟回來後,其靜態成員 ID 值是不變的,因此之前分配給該成員的所有分區也是不變的,而且在沒有超時前靜態成員重啟回來是不會觸發 Rebalance 的
#### 好處:
- 其靜態成員ID值是不變的,因此之前分配給該成員的所有分區也是不變的
- 假設一個成員掛掉,在沒有超時前,靜態成員重啟回來是不會觸發 Rebalance 的(超時時間為 session.timeout.ms,默認10 sec)
- 在靜態成員掛掉這段時間,broker 會一直為該消費者保存狀態(offset),直到超時或靜態成員重新連接
#### 使用了 static membership 功能後,觸發 rebalance 的條件如下:
- 新成員加入組:這個條件依然不變。當有新成員加入時肯定會觸發 Rebalance 重新分配分區
- Leader 成員重新加入組:比如主題分配方案發生變更
- 現有成員離組時間超過了 session.timeout.ms 超時時間:即使它是靜態成員,Coordinator 也不會無限期地等待它。一旦超過了 session.timeout.ms 時間依然會觸發 Rebalance
- Coordinator 接收到 LeaveGroup 請求:成員主動通知 Coordinator 永久離組
#### 使用 static membership 的兩個條件是:
1. consumer 客戶端添加配置:props.put("group.instance.id", "con1");
2. 設置 session.timeout.ms 為一個合理的時間,這個參數受限於group.min.session.timeout.ms(6 sec)和 group.max.session.timeout.ms(30 min),即大小不能超過這個上下限。但是調的過大也可能造成 broker不斷等待掛掉的消費者客戶端的情況

## Creating a Kafka Consumer
- 在讀取消息之前,需要先創建一個 Consumer object
- 創建 Consumer object,把想要傳給 Consumer 的屬性放在 Properties 里
- 需要使用 3 個必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer
#### python code
```
from confluent_kafka import Consumer
conf = {'bootstrap.servers': 'host1:9092,host2:9092',
'group.id': 'foo',
'auto.offset.reset': 'smallest'}
consumer = Consumer(conf)
```
## Subscribing to Topics
- subscribe() 可以傳入一個正則表達式,正則表達式可以匹配多個主題如果有人創建了新的主題,並且主題名與正則表達式匹配,那麽會立即觸發一次再均衡,消費者就可以讀取新添加的主題
#### python code
```
running = True
def basic_consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
msg_process(msg)
finally:
# Close down consumer to commit final offsets.
consumer.close()
def shutdown():
running = False
```
備註: 很重要!!最後要執行 Consumer.close(),這樣才能確保 active sockets 關掉,同時也將觸發 group rebalance,將 partition 做 re-assigned,如果沒有正確關閉,就是在 session timeout 後做 rebalance
## Configuring Consumers
* fetch.min.bytes
* 控制 broker 最小回傳 messages 的 size,如果可取得的 messages size 不夠,broker 會 block 直到 size 夠才回傳
* 可以提高 fetch.min.bytes 來降低 RTT(來回通訊延遲),並且在 consumer 數量很多的時候可以降低 brokers 的 loading
* fetch.max.wait.ms
* 當 messages size 不足 fetch.min.bytes 的要求時,broker 最多 block 多久就要回應 request
* fetch.max.bytes
* 一次拉取 messages 最大返回的 size,默認為1M,如果一個分區的第一批消息大小大於該值也會返回,為了保證 consume 得程序可以一直進行
* max.poll.records
* 該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量
* max.partition.fetch.bytes
* 每個 partition 每次回傳給 consumer 的最大 size,避免 run out of consumer memory
* 需注意設置太大時,可能使 data processing 的時間過久,而無法在 session timeout 之前送 heartbeat
* 這個設置是 per partition 的,因此要跟 consumer 總共處理partitions 數量一起計算是否超過 consumer memory
* 假設有 20 個 partitions、5 個 consumers 且max.partition.fetch.byte 是 1MB,則每個 consumer 就需要大於 4MB 的可用 memory。但實際上會需要更多的 memory,因為 group 裡的 consumer 可能會離開
* session.timeout.ms
* heartbeats 有效時間
* 較低的 timeout 時間可以快速地偵測不預期退出的 consumer,但也較容易誤判造成 rebalance
* 較高的 timeout 會使恢復 partition consume 的 gap 較大,但也較不易誤判造成 rebalance
* heartbeat.interval.ms
- 多久送 heartbeats,通常與 session.timeout.ms 一起調整
* max.poll.interval.ms
* 兩次 poll 方法調用的最大間隔時間,單位毫秒,默認為 5 分鐘
* 如果 consumer 在該間隔內沒有發起 poll 操作,該 consumer 將被踢除,觸發 rebalance,將該 consumer 分配的 partition 分配給其他 consumer
* default.api.timeout.ms
* 指定客戶端 api 的超時時間,default: 1 min
* 此配置用於「未指定超時參數」的所有客戶端操作的默認超時
* request.timeout.ms
* 請求的超時時間,與 Broker 端的網絡通訊的請求超時時間,default: 30s
* auto.offset.reset
* 當 consumer 提供的 offset 不合法(offset 已經被刪除或者 offset 不存在)時,要從哪裡(latest or earliest)開始 consume
* value: earliest(or smallest), latest(or largest)(default)
* enable.auto.commit
* 當設為 true 時,會根據 auto.commit.interval.ms 時間自動 commit offset
* partition.assignment.strategy
* PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者
* 分區方法
* Range: 會把主題的若干個連續的分區分配給消費者。假設悄費者 C1 和消費者 C2 同時 訂閱了主題 T1 和主題 T2,且每個主題有 3 個分區。那麽消費者 C1 有可能分配到這兩個主題的分區 0 和分區 1,而消費者 C2 分配到這兩個主題的分區 2。因為每個主題 擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最後分配到比第二個消費者更多的分區。只要使用了 Range 策略,而且分區數量無法被消費者數量整除,就會出現這種情況
* RoundRobin:把主題的所有分區逐個分配給消費者。假設給消費者 C1 和消費者 C2 分配分區,那麽消費者 C1 將分到主題 T1 的分區 0 和分區 2 以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 l 以及主題T2 的分區 0 和分區 2。一般 來說,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin策略會給所 有消費者分配相同數量的分區(或最多就差一個分區)。
* Sticky: 保證客戶端,比如 consumer 消費者在 rebalance 後能夠維持原本的分配方案,可惜的是這個分配策略依舊是在eager協議的框架之下,rebalance 仍然需要每個 consumer 都先放棄當前持有的資源(分區)
* Cooperative Sticky: 將一次全局重平衡,改成每次小規模重平衡,直至最終收斂平衡的過程
**eager:**

圖解:
- 最開始的時候,c1 c2 各自發送 hearbeat 給到 group coordinator
- 這時候 group coordinator 收到一個 join group 的 request請求,group coordinator 知道有新成員加入組了
- 在下一個心跳中 group coordinator 通知 c1 和 c2,準備rebalance
- c1 和 c2 放棄(revoke)各自的 partition,然後發送joingroup 的 request 給 group coordinator
- group coordinator 處理好分配方案(交給 leader consumer 分配的),發送 sync group request 給 c1 c2 c3,附帶新的分配方案
- c1 c2 c3 接收到分配方案後,重新開始消費
**cooperative:**

圖解:
- 最開始的時候c1 c2各自發送hearbeat心跳信息給到group coordinator
- 這時候group coordinator收到一個join group的request請求,group coordinator知道有新成員加入組了
- 在下一個心跳中 group coordinator 通知 c1 和 c2 ,準備 rebalance
- c1 和 c2 發送 joingroup 的 request 給group coordinator,但不需要 revoke 其所擁有的 partition,而是將其擁有的分區編碼後一並發送給 group coordinator,即 {c1->p1, p2},{c2->p3}
- group coordinator 從元數據中獲取當前的分區信息(這個稱為assigned-partitions),再從 c1 c2 的 joingroup request 中獲取分配的分區(這個稱為 owned-partitions),通過assigned-partitions 和 owned-partitions 知曉當前分配情況,決定取消 c1 一個分區 p2 的消費權,然後發送 sync group request({c1->p1},{c2->p3})給c1 c2,讓它們繼續消費p1 p2
- c1 c2 接收到分配方案後,重新開始消費,一次 rebalance 完成,當然這時候 p2 處於無人消費狀態
- 再次觸發rebalance,重覆上述流程,不過這次的目的是把 p2 分配給c3(通過 assigned-partitions 和 owned-partitions 獲取分區分配狀態)
* client.id
* 該屬性可以是任意字符串,broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里
* client.rack
* 此客戶端的機架標識符,這可以是任何字符串值,指示此客戶端的物理位置
* 它與 broker 配置 broker.rack 相對應
* group.instance.id
* 用戶為 consumer 指定的一個 consumer ID
* 每個 consumer group 底下的 consumer 的 ID 必須是唯一的
* 一旦設置了該 ID,consumer 就會被視為是一個靜態成員(Static Member)
* Static Member 配以較大的 session timeout 設置,能夠避免因臨時不可用(比如重啟)而引發的 Rebalance
* receive.buffer.bytes and send.buffer.bytes
* socket 在『讀寫數據』時用到的 TCP 緩沖區大小
* 如果它們被設為-1,就使用操作系統的默認值
* 如果生產者或消費者與 broker處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬
* offsets.retention.minutes
* 記錄 topic 的偏移量日志的保留時長
* default: 7 days
* 比如我們將 offsets.retention.minutes 設為10,即十分鐘。然後最後一次主題 A 的消費偏移量是100,十分鐘內繼續消費,會從 101 開始繼續消費,但是十分鐘內我們沒有繼續消費,該記錄主題A的消費偏移量100 的日志將會被清除,也就是下次繼續消費主題 A 的消息時,我們不知道上一次消費哪里了(注意,主題A所存儲的消息依舊在broker上,並沒有被刪除), 在這種情況下,將會根據 auto.offset.reset 的設置,讀取最早(earliest)/最晚(latest)的消息。
## Commits and Offsets
1. commit offset < 正在處理的 offset
=> 那麽處於兩個 offset 之間的消息就會被重覆處理

2. commit offset > 正在處理的 offset
=> 那麽處於兩個 offset 之間的消息將會丟失

### Automatic Commit
* application 不用處理 commit,commit 會在 poll api 裡自動處理
* 當呼叫 poll 時,會檢查是否超過上次 commit 加上 auto.commit.interval.ms 的時間(默認 5s),是的話就 commit 上一次 poll 的最大 offset
* 使用這種方式,容易出現提交的 offset 小於客戶端處理的最後一個消息的offset。假設使用默認的 5s commit.interval,在最近一次提交之後的 3s 發生了 rebalance,rebalance 之後,消費者從最後一次提交的offset 位置開始讀取消息。這個時候 offset 已經落後了 3s(因為沒有達到5s的時限,並沒有提交偏移量),所以在這 3s 的數據將會被重覆處理
* 在調用 close() 方法之前也會進行 auto commit
(offset 提交 但還沒 consume 完的悲劇)

### Manual Commit
* 大部分開發者透過控制 offset 提交時間,來消除丟失消息的可能性,並在發生 rebalance 時減少重覆消息的數量
* 消費者 API 提供了另一種 commit offset 的方式,開發者可以在必要的時候提交當前 offset,而不是基於時間間隔
* 需要把 auto.commit.offset 設為 false,讓應用程序決定何時提交offset
#### synchronous Commit
* 使用 commitSync() 提交 offset 最簡單也最可靠
* 這個 API 會提交由 poll() 方法返回的最新 offset,提交成功後馬上返回,如果提交失敗就拋出異常
* 在處理完所有記錄後要確保調用了 commitSync(),否則還是會有丟失消息的風險
* 如果發生 rebalance,從最近一批消息到發生 rebalance 之間的所有消息都將被重覆處理
* 同時在這個程序中,只要沒有發生不可恢覆的錯誤,commitSync() 方法會一直嘗試直至提交成功。如果提交失敗,我們也只能把異常記錄到錯誤日志
* synchronous Commit 有一個不足之處: 在 broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。可以通過降低提交頻率來提升吞吐量,但如果發生 rebalance,會增加重覆消息的數量
#### Asynchronous Commit
* application 自己掌控 commit 時機,但不需要立刻知道 commit 結果
* commitAsync api 不等 broker response,而是用 callback 的方式等到 broker 回應後才處理 commit 結果
* commitAsync 失敗後不會自動 retry,因為自動 retry 可能會造成問題(e.q.假設一開始commit offset 2000 時遇到短暫的 connection 問題,而隨後的 offset 3000 commit 成功了,這時 offset 2000 要 retry 並且成功就可能會把 offset commit 較小的位置)
* 在成功提交或碰到無法恢覆的錯誤之前,commitSync() 會一直重試,但是 commitAsync() 不會,這也是 commitAsync() 不好的一個地方。
* commitAsync() 也支持 callback,在 broker 作出響應時會執行callback。callback 經常被用於記錄提交錯誤或生成度量指標。如果要用它來進行重試,則一定要注意提交的順序
##### Retrying Async Commits
* A simple pattern to get the commit order right for asynchronous retries is to use a monotonically increasing sequence number.
* Increase the sequence number every time you commit, and add the sequence number at the time of the commit to the commitAsync callback.
* When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable
* if it is, there was no newer commit and it is safe to retry.
* If the instance sequence number is higher, don’t retry because a newer commit was already sent.
#### Combining Synchronous and Asynchronous Commits
* 一般情況下,偶爾出現提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麽後續的提交總會有成功的
* 如果這是發生在關閉 consumer 或 rebalance 前的最後一次提交,就要確保能夠提交成功,在這種情況下,我們應該考慮使用 Combining Commits 的方法
* 正常運行: 使用 commitAsync 方法來進行提交,這樣的運行速度更快,而且就算當前提交失敗,下次提交成功也可以
* 非正常運行: 如果直接關閉 consumer,就沒有所謂的 “下一次提交”,因為不會再調用 poll() 方法,所以要使用 commitSync 方法一直重試,直到提交成功或發生無法恢覆的錯誤
#### Committing a Specified Offset
* 無法單用 commitSync() 或 commitAsync() 來實現,因為它們只會提交最後一個偏移量
* 如果 poll() 方法返回一大批數據,為了避免因 rebalance 引起的重覆處理整批消息,想要在批次中間 commit offset,可以在調用 commitSync() 和 commitAsync() 方法時傳進去希望提交的 partition 和 offset 的 map
* consumer 可能不只讀取一個 partition,你需要跟蹤所有 partition 的 offset,所以在這個層面上控制 offset 的提交會讓代碼變覆雜
##### python code
```
import json
import logging
import pymysql
from confluent_kafka import Consumer
logger = logging.getLogger()
# kafka 消費者
settings = {
'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256',
'sasl.username': 'myname',
'sasl.password': 'mypasswd',
'group.id': 'myid',
'enable.auto.commit': False
}
c = Consumer(settings, logger=logger)
c.subscribe(['customers'])
# 消息消費和處理流程
done_msg = None
counts = 0
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
elif msg.error():
logger.error(msg.error())
else:
print(msg)
logger.info('Message done: %s', msg)
done_msg = msg
counts += 1
if (counts % 50) == 0:
c.commit(message=done_msg, asynchronous=True) # 異步提交 offset
except KeyboardInterrupt:
pass
except Exception as e:
logger.error(e, exc_info=True)
# 關閉 conn
finally:
try:
c.commit(message=done_msg, asynchronous=False) # 同步提交 offset
finally:
c.close()
### 解釋
c.commit() 如何指定 offset?
1. parameter offsets: explicitly list the offsets for each assigned topic partition
2. parameter message : will commit offsets relative to a Message object returned by poll()
###
```
## Rebalance Listeners
* 在 partition rebalance 前,如果 consumer 知道它即將不再負責某個 partition,那麽它可能需要將已經處理過的消息 offset 進行 commit,甚至需要關閉 file handles, database connection (cleanup work)
* Kafka API 允許我們在 consumer 新增 partition 或者失去 partition 時進行處理,我們只需要在調用 subscribe() 方法時傳入ConsumerRebalanceListener 對象
ConsumerRebalanceListener 對象有 3 個方法:
1. public void onPartitionsAssigned(Collection<TopicPartition> partitions)
此方法在 partition 分配給 consumer 後,在 consumer 開始讀取消息前調用
2. public void onPartitionsRevoked(Collection<TopicPartition> partitions)
此方法會在 consumer 停止消費消費後,在 rebalance 開始前調用
3. public void onPartitionsLost(Collection<TopicPartition> partitions)
只有在 cooperative rebalancing 會用到,而且只有在特殊情況會發生: c1 的 partition 要給 c2 處理,但 c1 卻沒有先 revoke(正常來說都要先 revoke) c2 可能已經有自己的狀態,所以要小心 conflict
如果沒有用此方法 onPartitionsRevoked() 會被調用
### python code: on_assign, on_revoke
```
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'your_bootstrap_servers',
'group.id': 'your_consumer_group',
'auto.offset.reset': 'earliest'
}
def on_assign(consumer, partitions):
# 在重新平衡之前調用,可以在這里執行一些邏輯
print('Rebalance: Assigning partitions:', partitions)
# 在這里設置初始偏移量,如果你有需要的話
for partition in partitions:
partition.offset = 0
# 提交初始偏移量
consumer.assign(partitions)
def on_revoke(consumer, partitions):
# 在重新平衡之後調用,可以在這里執行一些邏輯
print('Rebalance: Revoking partitions:', partitions)
# 在這里提交最新的偏移量
consumer.commit(offsets={partition.topic: {partition.partition: partition.offset} for partition in partitions})
# 取消分配分區
consumer.unassign()
consumer = Consumer(conf)
# 设置 設置 Rebalance 回調函數
consumer.subscribe(['your_topic'], on_assign=on_assign, on_revoke=on_revoke)
# 消費msg 省略囉~
```
## Consuming Records with Specific Offsets
* 在此之前,我們使用 poll() 來從 last committed offset 開始消費,但我們也可以從一個指定的 offest 開始消費
* 如果想從 beginning of the partition 重新開始消費,那麽可以使用seekToBeginning(TopicPartition tp);
* 如果想從 end of the partition 消費最新的消息,那麽可以使用seekToEnd(TopicPartition tp)
* Kafka 還支持我們從 specific offset 開始消費,且應用場景有很多,其中最典型的一個是:offset 存在其他系統(例如數據庫)中,並且以其他系統的 offset 為準
### python code: 指定 specific offset
```
from confluent_kafka import Consumer, KafkaError
c = Consumer({'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'})
c.subscribe(['mytopic'])
# seek()方法將分區的偏移量設置為0,意味著消費者將從分區的開頭開始消費消息
c.seek(TopicPartition('mytopic', 0), 0)
```
## But How Do We Exit
1. 在一般情況下,我們會在一個主線程中循環 poll 消息並進行處理
2. 當需要退出 poll 循環時,我們可以使用另一個線程調用 consumer.wakeup(),調用此方法會使得 poll() 拋出WakeupException,如果 consumer poll loop 在主線程跑,則可以調用 ShutdownHook 來達成
3. 如果調用 wakup 時,主線程正在處理消息,那麽在下一次主線程調用 poll 時會拋出異常: WakeUpException
4. 主線程在拋出 WakeUpException 後,需要調用 consumer.close(),此方法會提交 offset,同時發送一個退出 consumer group 的消息到Kafka 的 group coordinator
5. group coordinator 收到消息後會立即進行 rebalance(而無需等待此consumer session timeout)
(python 沒有查到相對應的 function)
## Deserializers 反序列化
* Kafka producer 負責將 object 序列化成 bytes arrays 並發送到 Kafka,consumer 則需要將 bytes arrays 轉換成 object,這就是反序列化做的事情
* 序列化與反序列化需要匹配,因此作為開發者,需要關注寫入到主題使用的是什麽序列化格式,並且保證寫入的數據能夠被 consumer 反序列化成功
* 使用 Avro 與 Schema Registry 來序列化與反序列化,那麽事情會輕鬆許多,因為 AvroSerializer 會保證所有寫入的數據都是結構兼容的,並且能夠被反序列化出來
### Custom Deserializers
* 提醒!!不推薦實現自定義的序列化與反序列化,因為往往這些方案並不成熟,難以維護和升級,而且容易出錯
* 可以使用JSON、Thrift、Protobuf或者Avro的成熟的解決方案
### Using Avro Deserialization with Kafka Consumer
#### python code
```
pip install confluent-kafka[avro]
```
```
from confluent_kafka import KafkaError, Consumer
from confluent_kafka.avro import AvroConsumer
# 配置
conf = {
'bootstrap.servers': 'broker1:9092,broker2:9092',
'group.id': 'CountryCounter',
'key.deserializer': 'io.confluent.kafka.serializers.StringDeserializer',
'value.deserializer': 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
'schema.registry.url': 'your_schema_registry_url', # 替換為你的 Schema Registry URL 不一定要有!
}
# 創建 AvroConsumer
consumer = AvroConsumer(conf)
# 訂閱主題
topic = 'customerContacts'
consumer.subscribe([topic])
print(f'Reading topic: {topic}')
try:
while True:
# 拉取消息
records = consumer.poll(1000)
for record in records:
if record.error():
if record.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
continue
else:
print(record.error())
break
# 處理消息
value = record.value()
if value is not None:
print(f'Current customer name is: {value["name"]}') # 替換為你的 Avro 消息中的字段名
# 同步提交偏移量
consumer.commit()
except KeyboardInterrupt:
pass
finally:
consumer.close()
```
## Standalone Consumer: Why and How to Use a Consumer Without a Group
* 一般情況下我們都是 consumer group(即便只有一個 consumer)來消費消息,因為這樣可以在增加或減少 consumer 時自動進行 rebalance,這種方式是推薦的
* 在知道 topic 和 partition 的情況下,也可以使用 Standalone Consumer 來進行消費,需要自己給 consumer 分配消費 partition,而不是讓 consumer 訂閱(成為消費組)topic
* Standalone Consumer 除了需要主動獲取 partition 以及沒有 rebalance,其他的處理邏輯都是一樣的
* 需要注意: 如果添加了新的 partition,這個 consumer 是感知不到的,需要通過 consumer.partitionsFor() 來重新獲取 partition。
#### python code
```
from confluent_kafka import Consumer, KafkaError, TopicPartition
conf = {
'bootstrap.servers': 'your_bootstrap_servers',
'group.id': 'your_consumer_group',
'auto.offset.reset': 'earliest', # OR 'latest'
}
consumer = Consumer(conf)
# 獲取主題的分區信息
topic = 'your_topic'
partition_infos = consumer.list_topics(topic=topic).topics[topic].partitions.values()
# 構建 TopicPartition 對象的列表
partitions = [TopicPartition(topic, partition.partition) for partition in partition_infos]
# 手動分配分區
consumer.assign(partitions)
try:
while True:
# 處理消息
......
# 同步提交偏移量
consumer.commit()
except KeyboardInterrupt:
pass
finally:
consumer.close()
```
## ref
[kafka 參數說明](https://www.cnblogs.com/hello-/articles/15577525.html
)
[rebalance 解說](https://zhuanlan.zhihu.com/p/337469858)
[讀書筆記1](https://arronhong.github.io/2021/05/25/Introduction%20to%20Kafka/#more
)
[讀書筆記2](https://blog.csdn.net/skycanf/article/details/81668584?spm=1001.2101.3001.6650.7&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-81668584-blog-121654059.235%5Ev38%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-81668584-blog-121654059.235%5Ev38%5Epc_relevant_default_base3&utm_relevant_index=13)
[offset commit](https://zhuanlan.zhihu.com/p/577059966)
[offset commit 2](https://www.cnblogs.com/fnlingnzb-learner/p/13429705.html)
[kafka python code](https://blog.51cto.com/quantfabric/2504495?u_atoken=0b0f4c57-ac73-4739-979c-d0c51e77380d&u_asession=01V1-F_gi0UGcAOSTkXA2DxcAyyCuA5WIJgGj8TMXiA9LdlceKqQklU4xBcbi1G1vMPObCD4f2uBDuabLWwX9PDdsq8AL43dpOnCClYrgFm6o&u_asig=05CVKgncGS6zO5dOpiej7tL7BzLemFYDd8kxsMfD1iB_vY9PSSjBHxjiUhl-HRrnEpnLnstK_XrR0KmV_bYXbUoLf2xiU3uClTzl0R5l7JaZogFJfZno2402tRCHR2UeUncnBP3-n_jkhHRfZ9w8ceZqwk7vdM6WwoyaGMxdSYBBKABXu60eqbqNPNMBIK8bMVksmHjM0JOodanL5-M1Qs1XMoGEX5Mht9a1BRcKgahZHyh7ikxj749pp0REMktVBy1mn-1zbfsI-O5V0v-zruxEalWuX0Cj2gkXnO6VeaKeTY94r_LXIIil3Y3aVPRGAe&u_aref=HHl08ZObiVmDJ7YRGf5SxOI6R78%3D)