# 12. Administering Kafka ## 12.4 Producing and Consuming - `kafka-console-consumer.sh` 呼叫 console consumer - `kafka-console-producer.sh` 呼叫 console producer 這兩個 command 可以啟動 console producer 以及 console consumer。透過 console producer 以及 console consumer 可以手動執行 produce 或是 consume 的動作,來測試跟 Kafka 相依的 application。 但要注意 console producer 以及 console consumer 只是簡易的測試方式,不建議使用在正式的 application ### Console Producer 目的:用來將訊息寫入 Kafka topic 預設的行為: - 訊息按行讀取 - 用 tab character 來分割 key-value,若沒有 tab character 則默認 key 為 null - 使用 default serializer (DefaultEncoder) 讀取 console 的輸入並 produce raw bytes 進 Kafka topic 啟動 Console Producer 需要的參數 - `--bootstrap-server` - `--topic` 範例 ``` kafka-console-producer.sh --bootstrap-server <connection-string>:<port> --topic <topic-name> >Message 1 >Test Message 2 >Test Message 3 >Message 4 ``` #### 設定 producer config 可以在啟動 console producer 時,給定一般的 producer config。 取決於config 數量以及偏好的方式 方法一: - 使用 `--producer.config <config-file>` 指定 config file - `<config-file>` 是包含 config 設定文件的完整路徑 方法二: - 命令行參數 - 使用 `--producer-property <key>=<value>` 形式來指定 config 選項 - `<key>` 是配置的 config 名稱,`<value>` 是要設置的值 常見的 config 選項: - `--batch-size`:指定在單個批次中發送的訊息數量(如果不是同步發送的情況下) - `--timeout`:如果生產者在異步模式下運行,此選項提供了在生產前等待批次的最大時間,以避免在低生產量的 topic 上長時間等待 - `--compression-codec <string>`:指定生產訊息時使用的壓縮類型。有效類型可以是 none、gzip、snappy、zstd 或 lz4。default 為 gzip - `--sync`:同步生產訊息,等待每條訊息被確認後再發送下一條 ##### Line-reader `kafka.tools.ConsoleProducer$LineMessageReader` class 負責讀取標準輸入並創建生產者記錄 - `ignore.error`:設置為 false 時,而當 `parse.key` 設置為 true 且沒有鍵分隔符時會拋出異常。默認值為 true。 - `parse.key`:設置為 false 時,始終將鍵設置為 null。默認值為 true。 - `key.separator`:指定在讀取時訊息鍵和訊息值之間使用的分隔符字元。默認為 tab character ### Console Consumer 用途:從 Kafka cluster 中的一個或多個 topic 中 consume 訊息 預設行為 - 訊息以 new line 分隔的形式輸出到 console - 輸出訊息中的 raw byte - 沒有 key,沒有formating,則會使用 DefaultFormatter 啟動 Console Producer 需要的參數 - `--bootstrap-server` - `topic` - `--topic` 指定單個要消費的 topic - `--whitelist` 使用正則表達式匹配所有要消費的topic - `timeframe` - `--from-beginning` 從第一條訊息開始消費訊息 - `--offset` 指定從某個特定的 offset 開始消費 - `--time` 根據時間戳指定從某個特定的時間點開始消費訊息 範例 ``` kafka-console-consumer.sh --bootstrap-server <connection-string>:<port> --topic <topic-name> --<timeframe> Message 1 Test Message 2 Test Message 3 Message 4 ``` ``` kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'my.*' --from-beginning Message 1 Test Message 2 Test Message 3 Message 4 ``` #### 設定 consumer config ## 12.5 Partition Management - Kafka 預設安裝包含一些管理分區的腳本工具 - 允許重新選舉領導副本 - 將分區分配給特定 broker - 這些工具在需要手動調整以平衡集群中的消息流量時非常有用 ### Preferred Replica Election 分區的多副本和領導者: - 每個分區有多個副本,但同一時間只有一個領導者 - 所有生產和消費操作都在領導者 broker 上進行 領導者平衡的重要性: - 必須保持分區領導權在不同代理上的平衡,以確保負載均衡 - 領導權轉移需要同步副本接管 自動領導者平衡: - 建議啟用 automatic leader balancing - 使用如 [Cruise Control](https://github.com/linkedin/cruise-control) 等工具 如果發現 Kafka 集群負載不均衡,可以執行一個輕量級且通常無影響的操作,稱為 Preferred Replica Election,"優先領導者選舉"。這會告訴集群控制器為分區選擇理想的領導者。客戶端可以自動跟蹤領導權變更,從而能夠移動到新轉移領導權的代理上 優先領導者選舉: - 可以執行優先領導者選舉來手動調整領導者平衡 - 使用 `kafka-leader-election.sh` 工具啟動優先領導者選舉,舊版為`kafka- preferred-replica-election.sh` 並準備 deprecate - 可以在所有主題或指定的分區和主題上啟動選舉 範例 啟動整個集群中所有主題的優先領導者選舉 ``` kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --all-topic-partitions ``` 在特定分區或主題上啟動選舉 ``` # partitions.json { "partitions": [ { "partition": 1, "topic": "my-topic" }, { "partition": 2, "topic": "foo" } ] } ``` ``` kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --path-to-json-file partitions.json ``` #### Changing a Partition’s Replicas 有時可能需要手動更改分區的副本分配: - broker 的負載不均衡,且領導自動分配無法正確處理 - 如果 broker 下線且分區副本數不足 - 如果添加了新的 broker 並希望更快地將新分區分配到它上面 - 想要調整 topic 的副本 使用 `kafka-reassign-partitions.sh` 執行此操作,但這種手動改變分區副本分配的作法是一個多步驟的過程 - generate a move set 生成遷移的集合 - execute on the provided move set proposal 執行生成提案 範例情境 原本有一個 4 個 broker 的 Kafka 集群,後面新增了 2 個 broker,達到總數為 6 個,希望將原本的兩個 topic 移轉到後面新增的 5 號,6 號 broker 上 必須要指定被移動的 topic,可以透過 json file,指定要移動的 topic ``` # topics.json { "topics": [ { "topic": "foo1" }, { "topic": "foo2" } ], "version": 1 } ``` 透過 topics.json 指定的 topic,可以生成要遷移的集合結果 ``` kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics.json --broker-list 5,6 --generate ``` 遷移提案 ``` {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, } # {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] ``` 執行完,可以將輸出的遷移提案可以存成 2 個不同名稱的 json file - revert-reassignment.json:用來復原原本的狀態 - expand-cluster-reassignment.json:用來下一步真正執行分配 透過 expand-cluster-reassignment.json 文件執行分區重新分配 ``` kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute ``` controller 會通過將新的副本添加到每個分區的副本列表中來執行重新分配,新副本會從當前領導者複製所有的消息,當新副本複製完成後,會將舊的副本移除掉 檢查分區移動的狀況,會需要在執行步驟中使用的 json file ``` kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify ``` 範例結果 ``` Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully ``` #### Dumping Log Segments 情境:有時可能需要讀取消息的具體內容,例如,當你的 topic 中出現 poison pill 消息(損壞的消息)導致消費者無法處理時 `kafka-dump-log.sh` 可用於 decode 分區的日誌段,可以查看單個消息而無需消費和 decode 它們 查找日誌段的目錄將是 `/tmp/kafka-logs/<topic-name>-<partition>` 範例 `/tmp/kafka-logs/my-topic-0/` ``` # kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log Dumping /tmp/kafka-logs/my-topic-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1623034799990 size: 77 magic: 2 compresscodec: NONE crc: 1773642166 isvalid: true baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77 CreateTime: 1623034803631 size: 82 magic: 2 compresscodec: NONE crc: 1638234280 isvalid: true baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 159 CreateTime: 1623034808233 size: 82 magic: 2 compresscodec: NONE crc: 4143814684 isvalid: true baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 241 CreateTime: 1623034811837 size: 77 magic: 2 compresscodec: NONE crc: 3096928182 isvalid: true # ``` `--print-data-log` 選項 ``` # kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log --print-data-log Dumping /tmp/kafka-logs/my-topic-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1623034799990 size: 77 magic: 2 compresscodec: NONE crc: 1773642166 isvalid: true | offset: 0 CreateTime: 1623034799990 keysize: -1 valuesize: 9 sequence: -1 headerKeys: [] payload: Message 1 baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77 CreateTime: 1623034803631 size: 82 magic: 2 compresscodec: NONE crc: 1638234280 isvalid: true | offset: 1 CreateTime: 1623034803631 keysize: -1 valuesize: 14 sequence: -1 headerKeys: [] payload: Test Message 2 baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 159 CreateTime: 1623034808233 size: 82 magic: 2 compresscodec: NONE crc: 4143814684 isvalid: true | offset: 2 CreateTime: 1623034808233 keysize: -1 valuesize: 14 sequence: -1 headerKeys: [] payload: Test Message 3 baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 241 CreateTime: 1623034811837 size: 77 magic: 2 compresscodec: NONE crc: 3096928182 isvalid: true | offset: 3 CreateTime: 1623034811837 keysize: -1 valuesize: 9 sequence: -1 headerKeys: [] payload: Message 4 # ``` #### Replica Verification 分區複製類似於常規的 Kafka 消費者客戶端 - 從最舊的 offset 開始複製,並定期將當前 offset 數值檢查點保存到disk 上。當複製停止而後重新開始時,副本會從最後的檢查點繼續 要驗證 topic 分區的副本在整個集群中是否一致,可以使用 `kafka-replica-verification.sh` 工具進行驗證 - 將從給定的一組 topic 分區的所有副本中提取消息 - 檢查所有副本上是否存在所有消息,並顯示給定分區的最大 lag 數 - 此過程將在一個循環中連續運行,直到被取消掉 範例: 驗證在 Kafka broker 1 和 broker 2 上,以 "my" 開頭的 topic ``` # kafka-replica-verification.sh --broker-list kafka.host1.domain.com:9092,kafka.host2.domain.com:9092 --topic-white-list 'my.*' 2021-06-07 03:28:21,829: verification process is started. 2021-06-07 03:28:51,949: max lag is 0 for partition my-topic-0 at offset 4 among 1 partitions 2021-06-07 03:29:22,039: max lag is 0 for partition my-topic-0 at offset 4 among 1 partitions ... # ``` ## 12.6 Other Tools for more: https://kafka.apache.org ### Client ACLs Access Control List (ACL) 可以用於管理和控制對 Kafka 的資源(topic、consumer group、cluster)的訪問權限等等 操作範例 讓 User Alice 可以讀取 my-topic ``` kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \ --add --allow-principal User:Alice \ --operation Read --topic my-topic ``` ### Lightweight MirrorMaker ### Testing tools ## 12.7 Unsafe Operations 技術上可行的管理任務,但除非在極端情況下,否則不應該嘗試這些任務 ### Moving the Cluster Controller Controller - 每個 Kafka 集群有一個 broker 作為 controller - controller 有一個特殊線程負責監督 Kafka 集群的操作,一般的 broker 不會執行這樣的特殊線程 Controller election - 透過 Zookeeper 自動產生 - 當 controller 故障,會從群集中的其他 broker 選一個新的 controller 出來 強制移動 controller 的情況 - 集群或 broker 故障時,可能需要強制移動 controller,而不需關閉整個群集 - 現任 controller 遭遇異常或無法正常工作時,可考慮移動 controller 移動控制器的風險和頻率 - 通常風險不大,但不應經常執行 移動 controller 的方法 - 手動刪除 /admin/controller 下的 ZooKeeper znode - 集群會隨機選出新的 controller - 目前無法指定特定 broker 為 controller ### Removing Topics to Be Deleted 在正常情況下,當在 Kafka 中刪除 topic 時,ZooKeeper 節點會發出刪除請求。當每個副本完成 topic 刪除且確認刪除完成後,znode 就會被移除。 有以下情境可能會有刪除請求卡住的情況: - 請求者無法知道集群中是否啟用了刪除 topic 的功能,並可能發出刪除請求在一個禁用了刪除功能的集群中的 topic 上 - 請求刪除一個非常大的 topic,但集群在處理刪除請求之前,一個或多個副本因為硬體故障而下線,導致刪除無法完成,因為 controller 無法確認刪除成功 解決方法: - 刪除 `/admin/delete_topic/<topic> znode` 移除待處理的刪除請求 - 如果刪除請求 requeue,需要強制移動 controller,確保沒有待處理的 cache request ### Deleting Topics Manually 如果在一個禁用了 topic 刪除功能的集群中,或者需要在正常操作流程之外刪除一些 topic,可以「手動」從集群中刪除它們。 這需要完全關閉集群中的所有 broker,並且不能在集群中任何 broker 運行時進行 Modifying the cluster metadata in ZooKeeper when the cluster is online is a very dangerous operation and can put the cluster into an unstable state 如果真的必須刪除某些 topic,操作順序如下 - 關閉集群中的所有 broker - 從 Kafka 集群路徑中刪除 ZooKeeper 路徑 `/brokers/topics/<topic>` - 每個 broker 的日誌目錄中刪除 partition directory。這些 directory 被命名為 `<topic>-<int>`,其中 `<int>` 是 partition ID - 重啟所有 broker # Summary