# 12. Administering Kafka ## 12.1 Topic Operations ### Creating a New Topic 在集群中創建 topic 的時候需要三個參數,這三個參數是必須的,不過這些參數有些在 broker 上有默認值 - Topic Name: 你需要創建 Topic 的名稱 - Replication Factor: 集群中 topic 的每個分區的副本數 - Partitions: 分區數,topic 由這些分區組成 #### Specifying Topic Configurations - 可在創建的時候設置 topic 的副本,或者設置配置參數對 topic 的配置進行覆蓋,可以用 kafka-topics.sh 通過 --config 來實現 #### topic 命名 - Topic的名稱可以包含字母、數字、字符及下劃線、破折號和點號 - topic的命名,不建議的使用: 1. 以兩個下劃線(_)開頭的 topic 名稱,這種形式的 topic 被視為集群內部的topic,ex: comsumer group offset 存儲的 topic 是__consumer_offsets 2. 在單個集群中同時使用句號(.)和下劃線(_),因為在內部統計 topic 時,句號被改為了下劃線,ex: topic.1在統計中將變成topic_1 #### kafka-topics.sh 使用範例 創建指定名稱、partition 數量、replica 數量的 topic 如果集群設置為支持機架的 replica 分配,那麽每個 partition 的 replica 將位於單獨的機架中,如果不需要機架支持,那麽可以通過–disable-rack-aware 關閉 ``` [old version] kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer> [new version] kafka-topics.sh --bootstrap-server <connection-string>:<port> --create -- topic <string> --replication-factor <integer> --partitions <integer> ``` ``` # 創建名為 my-topic 的topic,包含 8 個 partition,每個 partition 有 2 個 replica # --if-not-exists: 如果 Topic 已經存在,則不會返回一個錯誤 [old version] kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create --topic my-topic --replication-factor 2 --partitions 8 --if-not-exists Created topic "my-topic". [new version] kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 2 --partitions 8 Created topic "my-topic". ``` ### Listing All Topics in a Cluster (列表格式為每行一個topic,沒有特定的順序) ``` [old version] # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list my-topic - marked for deletion other-topic # [new version] kafka-topics.sh --bootstrap-server localhost:9092 --list __consumer_offsets my-topic other-topic ``` ### Describing Topic Details 可以獲得集群中一個或多個topic的詳細信息,輸出包括:PartitionCount,topic 配置覆蓋以及每個分區及其副本分配的清單 (通過向命令行提供一個 topic 參數,可以將此限制為單個 topic 主題) ``` [old version] # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --topic other-topic Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs: Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1 # Replicas: 0,1 表示這個分區有兩個副本,分別位於 broker 0 和 broker 1 # Isr: 0,1 表示這個分區的同步副本是 broker 0 和 broker 1 [new version] # kafka-topics.sh --boostrap-server localhost:9092 --describe --topic my-topic Topic: my-topic PartitionCount: 8 ment.bytes=1073741824 Topic: my-topic Partition: 0 Topic: my-topic Partition: 1 Topic: my-topic Partition: 2 Topic: my-topic Partition: 3 Topic: my-topic Partition: 4 Topic: my-topic Partition: 5 Topic: my-topic Partition: 6 Topic: my-topic Partition: 7 ``` describe 命令還有幾個用於過濾輸出的選項 - --topics-with-overrides: 只會列出包含了與集群不同配置的topic - --under-replicated-partitions: 將顯示一個或者多個 replica 與 leader 不同步的所有partitions - -–unavailable-partitions: 顯示沒有leader的所有不同步的所有partitions。這是一種更嚴重的情況,意味著該分區目前處於脫機狀態,不能用於客戶端生產或者使用 - --exclude-internal: 這個選項會在描述主題時排除 Kafka 的內部主題(如 __consumer_offsets 等)。這些內部主題主要用於 Kafka 自己的內部管理和運行,因此通常不需要在描述主題時顯示出來。 - --at-min-isr-partitions; 列出那些當前在最小 ISR 數量上的分區。如果分區的 ISR 剛好等於配置的最小 ISR(min.insync.replicas),這意味著如果再失去一個副本,該分區將不能再保證數據一致性 - --under-min-isr-partitions 列出那些當前低於最小 ISR 數量的分區。表明這些分區處於不健康狀態,可能會影響到數據的一致性和可用性 ### Adding Partitions 有時需要增加 topic 的 Partitions 數量,最常見的原因: - 是為了進一步擴展 topic,或者降低單個 Partitions 的吞吐量 - 如果想在單個 consumer group 裡擴展更多的 consumer,那麼也需新增 Partitions 數量,因為 Partitions 只能由 consumer group 中的單個 consumer 使用 使用 key 來控制的 topic 很難添加 Partitions: - Kafka 使用 key 的 hash 值來確定消息應該被分配到哪個分區。這意味著隨著分區數量的變化,hash 值的分佈也會改變,從而導致相同 key 的消息可能會被分配到不同的分區 - 建議在創建 topic 的時候設置一次包含 key 控制消息的topic 的 Partitions 數量,並避免調整 topic 的大小 跳過不存在 topic 的錯誤: - 雖然 –alter 命令提供了一個 –if-exists 參數,但是不建議使用它 - 如果正在修改的 topic 不存在,使用此參數將導致命令不返回錯誤,這可能會掩蓋本應該創建 topic 的 topic 不存在問題 - ex: 將 my-topic 的 Partitions 增加到16個: ``` [old version] # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --topic my-topic --partitions 16 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [new version] kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16 ``` ### Reducing Partitions 無分減少 Partitions 的數量: - 從 topic 中刪除的 Partitions 將導致該 topic 的部分數據也被刪除,從消費者的角度來看,這是不一致的 - 嘗試將數據重新分發到剩余的 Partitions 也會很困難,會導致消息亂序 - 如果需要減少 Partitions 的數量,則需要刪除 topic 並重新創建它 ### Deleting a Topic * 即使沒有消息的 topic 也會使用集群的資源,包括磁盤空間, open filehandles 和 memory,如果 topic 不再需要,可以刪除它並釋放這些資源 * 為了執行此操作,集群中的 broker 必須配置 delete.topic.enable 選項為 true,如果這個選項設置為 false,則這個操作將被忽略 * 刪除數據之前需要注意:刪除一個 topic 也會刪除它的全部消息,這是不可逆的操作 #### 指令 刪除 my-topic topic: ``` [old version] # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --topic my-topic Topic my-topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. [new version] kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic ``` ## 12.2 Consumer Groups 舊版 consumer group: 信息存儲在zookeeper中,使用 –zookeeper參數指定 zookeeper 的地址 新版 consumer group: 信息存儲在kafka中的特定topic中,使用 –bootstrap-server 指定 kafka broker 的ip和port ### List and Describe Groups old version: ``` # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list console-consumer-79697 myconsumer # ``` new version ``` # kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list console-consumer-95554 console-consumer-9581 my-consumer ``` 查看一個 consumer group 的情況: 用 –describe 替換 –list 之後加上 –group 參數, 將列出當前指定的 consumer group 正在使用的 topic 以及每個 topic 的 offset ``` [old version] # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --group myconsumer GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER myconsumer my-topic 0 1688 1688 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 1 1418 1418 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 2 1314 1315 1 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 3 2012 2012 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 4 1089 1089 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 5 1429 1432 3 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 6 1634 1634 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 7 2261 2261 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 # [new version] kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer ``` | field | description | | -------- | -------- | | GROUP | consumer group 的名稱 | | TOPIC | 被消費的topic的名稱 | | PARTITION | 被消費的分區ID | | CURRENT-OFFSET | consumer group 為這個 topic partition提交的最後一個offset,這是 consumer 在 partition提交的最後一個offset 中的位置 | | LOG-END-OFFSET | Topic的當前高水位線的offset,這是 producer 提交到 consumer group 被確認的最後一條消息的 offset | | LAG | 此Topic分區的 consumer 當前的 offset 和 broker 中水位線的差異| | OWNER | 當前使用此topic的 partition提交的最後一個offset 的consumer group 的組成員,這是消費者組成員提供的任意ID,不一定包括消費者的主機名 | ### Delete Group old version: 將從 zookeeper 中刪除整個組, 包括該組正在使用的所有topic的所有被存儲的offset 先關閉組中的consumer: 如果沒有首先關閉所有consumer,則可能會導致消費者出現不可預測行為,因為 group 的元數據已經從 zookeeper 上被刪除。 delete consumer group: testgroup ``` [old version] # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup Deleted all consumer group information for group testgroup in zookeeper. [new version] kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-consumer Deletion of requested consumer groups ('my-consumer') was successful. ``` 還可以使用相同的命令刪除正在使用的 topic 的 offset,而不是刪除整個組, 在執行操作之前要停止 consumer。或不要讓他們讀取即將被刪除的topic 從 testgroup 中刪除 my-topic 的 offset: ``` [old version] # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup --topic my-topic Deleted consumer group information for group testgroup topic my-topic in zoo keeper. [new version] kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group testgroup --topic my-topic ``` ### Offset Management 除了可以顯示和刪除 consumer group 的 offset之外,還可以獲取offset, 並以 batch 的方式存儲新的 offset,這對於在出現需要重新讀取消息的問題時,使用重置 offset 非常有用。或者消費者無法正常處理消息,需要跳過 offset 時,可以做 offset 重置 #### Export Offsets 沒有腳本來導出 offset, 但是我們可以使用 kafka-run-class.sh 在適當的時候通過其底層的java類來執行該工具。導出offset,將生成一個文件, 每個 partition在文件裡就是一行,格式: /consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET。 ``` [old version] # kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup --output-file offsets # cat offsets /consumers/testgroup/offsets/my-topic/0:8905 /consumers/testgroup/offsets/my-topic/1:8915 /consumers/testgroup/offsets/my-topic/2:9845 /consumers/testgroup/offsets/my-topic/3:8072 /consumers/testgroup/offsets/my-topic/4:8008 /consumers/testgroup/offsets/my-topic/5:8319 /consumers/testgroup/offsets/my-topic/6:8102 /consumers/testgroup/offsets/my-topic/7:12739 [new version] kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group my-consumer --topic my-topic --reset-offsets --to-current --dry-run > offsets.csv # cat offsets.csv my-topic,0,8905 my-topic,1,8915 my-topic,2,9845 my-topic,3,8072 my-topic,4,8008 my-topic,5,8319 my-topic,6,8102 my-topic,7,12739 ``` #### Import Offsets 導入 offset 的工具與導出相反,它獲取導出 offset 生成的文件,並使用該文件設置 consumer group 的當前 offset。一種常見的做法是對export offset 的文件進行複製,並編輯該副本,修改裡面的 offset 值。 注意,對於 import 命令,不需 –group 選項,因為文件中已經有 consumer group ``` [old version] # kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --input-file offsets [new version] kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-consumer --from-file offsets.csv --execute # TOPIC PARTITION NEW-OFFSET my-topic 0 8905 my-topic 1 8915 my-topic 2 9845 my-topic 3 8072 my-topic 4 8008 my-topic 5 8319 my-topic 6 8102 my-topic 7 12739 ``` 在執行 Import Offsets 之前,必須停止 consumer group 中的所有consumer:如果 consumer group 處於活躍狀態,則不會讀取這些offset,反而消費者會將這些寫入的 offset 覆蓋。 ## 12.3 Dynamic Configuration Changes * 在集群運行的過程中,可以對 topic 和 client(consumer, producer) 的 config 進行覆蓋 * 這些更改被放在一個單獨的命令行工具 kafka-config.sh 中 * 一旦設置好,這些 config 對於集群就是永遠生效,存儲在 zookeeper 中,並且 broker 在啟動時會讀取這些 config ### Overriding Topic Configuration Defaults * 為了滿足不同場景,topic的很多參數都可以進行單獨設置 * 大多數 config 在 broker 層級下都有默認值,沒覆蓋情況都使用默認值 * 更改topic config 例子 ``` [old version] kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --entity-type topics --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>...] [new version] kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name <topic-name> --add-config <key>=<value>[,<key>=<value>...] ``` * 將 my-topic 的 topic 的用戶留存時間設置為1小時,3600萬ms: ``` # kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000 Updated config for topic: "my-topic". [new version] kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000 Updated config for topic: "my-topic". ``` ### Overriding Client and User Configuration Defaults * 對於 kafka 客戶端唯一可以配置的是 consumer 和 producer 的 config,他們都是 bytes/sec rates, 表示客戶端在每個 broker的生產和消費速率 * 如果集群中有 5 個broker,並且客戶端指定10M/s的生產者配額,那麽總速率為50MB/s * 客戶端ID與消費者組: 客戶端ID可以與 consumer group 的名稱不同,consumer 可以設置自己的ID,不同組的consumer可能會有相同的客戶端ID,最佳的方法是使用能夠表明他們所屬 consumer group 的標示來命名,這樣可以更容易在日志中確定是哪個組負責請求 * 更改客戶端 config 例子 ``` [old version] kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter -- entity-type clients --entity-name <client ID> --add-config <key>=<value>[,<key>=<value>...] [new version] # change the controller mutation rate for both a user and client kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "controller_mutations_rate=10" --entity-type clients --entity-name <client ID> --entity-type users --entity-name <user ID> ``` ### Overriding Broker Configuration Defaults More than 80 overrides can be altered with kafka-configs.sh for brokers. * min.insync.replicas * unclean.leader.election.enable * max.connections ### Describing Configuration Overrides * 可以使用命令行工具列出所有被覆蓋的 config,從而檢查topic, 客戶端的配置 * 通過 –describe 命令即可: ``` [old version] # kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --entity-type topics --entity-name my-topic Configs for topics:my-topic are retention.ms=3600000,segment.ms=3600000 [new version] kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name my-topic Configs for topics:my-topic are retention.ms=3600000 ``` * describe 只能顯示覆蓋的 config,不包括集群默認的 config ### Removing Configuration Overrides * 可以完全刪除動態配置,讓集群恢覆到默認值,可以使用alter命令和delete-config命令來刪除 Configuration Overrides * 刪除 my-topic 的 retention.ms 動態配置 ``` [old version] # kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter -- entity-type topics --entity-name my-topic --delete-config retention.ms Updated config for topic: "my-topic". [new version] kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms Updated config for topic: "my-topic". ```