# 12. Administering Kafka
## 12.1 Topic Operations
### Creating a New Topic
在集群中創建 topic 的時候需要三個參數,這三個參數是必須的,不過這些參數有些在 broker 上有默認值
- Topic Name: 你需要創建 Topic 的名稱
- Replication Factor: 集群中 topic 的每個分區的副本數
- Partitions: 分區數,topic 由這些分區組成
#### Specifying Topic Configurations
- 可在創建的時候設置 topic 的副本,或者設置配置參數對 topic 的配置進行覆蓋,可以用 kafka-topics.sh 通過 --config 來實現
#### topic 命名
- Topic的名稱可以包含字母、數字、字符及下劃線、破折號和點號
- topic的命名,不建議的使用:
1. 以兩個下劃線(_)開頭的 topic 名稱,這種形式的 topic 被視為集群內部的topic,ex: comsumer group offset 存儲的 topic 是__consumer_offsets
2. 在單個集群中同時使用句號(.)和下劃線(_),因為在內部統計 topic 時,句號被改為了下劃線,ex: topic.1在統計中將變成topic_1
#### kafka-topics.sh 使用範例
創建指定名稱、partition 數量、replica 數量的 topic
如果集群設置為支持機架的 replica 分配,那麽每個 partition 的 replica 將位於單獨的機架中,如果不需要機架支持,那麽可以通過–disable-rack-aware 關閉
```
[old version]
kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string>
--replication-factor <integer> --partitions <integer>
[new version]
kafka-topics.sh --bootstrap-server <connection-string>:<port> --create -- topic <string>
--replication-factor <integer> --partitions <integer>
```
```
# 創建名為 my-topic 的topic,包含 8 個 partition,每個 partition 有 2 個 replica
# --if-not-exists: 如果 Topic 已經存在,則不會返回一個錯誤
[old version]
kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8 --if-not-exists
Created topic "my-topic".
[new version]
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 2 --partitions 8 Created topic "my-topic".
```
### Listing All Topics in a Cluster
(列表格式為每行一個topic,沒有特定的順序)
```
[old version]
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
#
[new version]
kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
my-topic
other-topic
```
### Describing Topic Details
可以獲得集群中一個或多個topic的詳細信息,輸出包括:PartitionCount,topic 配置覆蓋以及每個分區及其副本分配的清單
(通過向命令行提供一個 topic 參數,可以將此限制為單個 topic 主題)
```
[old version]
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --topic other-topic
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1
# Replicas: 0,1 表示這個分區有兩個副本,分別位於 broker 0 和 broker 1
# Isr: 0,1 表示這個分區的同步副本是 broker 0 和 broker 1
[new version]
# kafka-topics.sh --boostrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic PartitionCount: 8
ment.bytes=1073741824
Topic: my-topic Partition: 0
Topic: my-topic Partition: 1
Topic: my-topic Partition: 2
Topic: my-topic Partition: 3
Topic: my-topic Partition: 4
Topic: my-topic Partition: 5
Topic: my-topic Partition: 6
Topic: my-topic Partition: 7
```
describe 命令還有幾個用於過濾輸出的選項
- --topics-with-overrides:
只會列出包含了與集群不同配置的topic
- --under-replicated-partitions:
將顯示一個或者多個 replica 與 leader 不同步的所有partitions
- -–unavailable-partitions:
顯示沒有leader的所有不同步的所有partitions。這是一種更嚴重的情況,意味著該分區目前處於脫機狀態,不能用於客戶端生產或者使用
- --exclude-internal:
這個選項會在描述主題時排除 Kafka 的內部主題(如 __consumer_offsets 等)。這些內部主題主要用於 Kafka 自己的內部管理和運行,因此通常不需要在描述主題時顯示出來。
- --at-min-isr-partitions;
列出那些當前在最小 ISR 數量上的分區。如果分區的 ISR 剛好等於配置的最小 ISR(min.insync.replicas),這意味著如果再失去一個副本,該分區將不能再保證數據一致性
- --under-min-isr-partitions
列出那些當前低於最小 ISR 數量的分區。表明這些分區處於不健康狀態,可能會影響到數據的一致性和可用性
### Adding Partitions
有時需要增加 topic 的 Partitions 數量,最常見的原因:
- 是為了進一步擴展 topic,或者降低單個 Partitions 的吞吐量
- 如果想在單個 consumer group 裡擴展更多的 consumer,那麼也需新增 Partitions 數量,因為 Partitions 只能由 consumer group 中的單個 consumer 使用
使用 key 來控制的 topic 很難添加 Partitions:
- Kafka 使用 key 的 hash 值來確定消息應該被分配到哪個分區。這意味著隨著分區數量的變化,hash 值的分佈也會改變,從而導致相同 key 的消息可能會被分配到不同的分區
- 建議在創建 topic 的時候設置一次包含 key 控制消息的topic 的 Partitions 數量,並避免調整 topic 的大小
跳過不存在 topic 的錯誤:
- 雖然 –alter 命令提供了一個 –if-exists 參數,但是不建議使用它
- 如果正在修改的 topic 不存在,使用此參數將導致命令不返回錯誤,這可能會掩蓋本應該創建 topic 的 topic 不存在問題
- ex: 將 my-topic 的 Partitions 增加到16個:
```
[old version]
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[new version]
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16
```
### Reducing Partitions
無分減少 Partitions 的數量:
- 從 topic 中刪除的 Partitions 將導致該 topic 的部分數據也被刪除,從消費者的角度來看,這是不一致的
- 嘗試將數據重新分發到剩余的 Partitions 也會很困難,會導致消息亂序
- 如果需要減少 Partitions 的數量,則需要刪除 topic 並重新創建它
### Deleting a Topic
* 即使沒有消息的 topic 也會使用集群的資源,包括磁盤空間, open filehandles 和 memory,如果 topic 不再需要,可以刪除它並釋放這些資源
* 為了執行此操作,集群中的 broker 必須配置 delete.topic.enable 選項為 true,如果這個選項設置為 false,則這個操作將被忽略
* 刪除數據之前需要注意:刪除一個 topic 也會刪除它的全部消息,這是不可逆的操作
#### 指令
刪除 my-topic topic:
```
[old version]
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete --topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
[new version]
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
```
## 12.2 Consumer Groups
舊版 consumer group: 信息存儲在zookeeper中,使用 –zookeeper參數指定 zookeeper 的地址
新版 consumer group: 信息存儲在kafka中的特定topic中,使用 –bootstrap-server 指定 kafka broker 的ip和port
### List and Describe Groups
old version:
```
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
#
```
new version
```
# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-95554
console-consumer-9581
my-consumer
```
查看一個 consumer group 的情況:
用 –describe 替換 –list 之後加上 –group 參數, 將列出當前指定的 consumer group 正在使用的 topic 以及每個 topic 的 offset
```
[old version]
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --group myconsumer
GROUP TOPIC PARTITION
CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
myconsumer my-topic 0
1688 1688 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 1
1418 1418 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 2
1314 1315 1
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 3
2012 2012 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 4
1089 1089 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 5
1429 1432 3
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 6
1634 1634 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 7
2261 2261 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
#
[new version]
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer
```
| field | description |
| -------- | -------- |
| GROUP | consumer group 的名稱 |
| TOPIC | 被消費的topic的名稱 |
| PARTITION | 被消費的分區ID |
| CURRENT-OFFSET | consumer group 為這個 topic partition提交的最後一個offset,這是 consumer 在 partition提交的最後一個offset 中的位置 |
| LOG-END-OFFSET | Topic的當前高水位線的offset,這是 producer 提交到 consumer group 被確認的最後一條消息的 offset |
| LAG | 此Topic分區的 consumer 當前的 offset 和 broker 中水位線的差異|
| OWNER | 當前使用此topic的 partition提交的最後一個offset 的consumer group 的組成員,這是消費者組成員提供的任意ID,不一定包括消費者的主機名 |
### Delete Group
old version:
將從 zookeeper 中刪除整個組, 包括該組正在使用的所有topic的所有被存儲的offset
先關閉組中的consumer:
如果沒有首先關閉所有consumer,則可能會導致消費者出現不可預測行為,因為 group 的元數據已經從 zookeeper 上被刪除。
delete consumer group: testgroup
```
[old version]
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
[new version]
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-consumer
Deletion of requested consumer groups ('my-consumer') was successful.
```
還可以使用相同的命令刪除正在使用的 topic 的 offset,而不是刪除整個組, 在執行操作之前要停止 consumer。或不要讓他們讀取即將被刪除的topic
從 testgroup 中刪除 my-topic 的 offset:
```
[old version]
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
--topic my-topic
Deleted consumer group information for group testgroup topic
my-topic in zoo keeper.
[new version]
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group testgroup --topic my-topic
```
### Offset Management
除了可以顯示和刪除 consumer group 的 offset之外,還可以獲取offset, 並以 batch 的方式存儲新的 offset,這對於在出現需要重新讀取消息的問題時,使用重置 offset 非常有用。或者消費者無法正常處理消息,需要跳過 offset 時,可以做 offset 重置
#### Export Offsets
沒有腳本來導出 offset, 但是我們可以使用 kafka-run-class.sh 在適當的時候通過其底層的java類來執行該工具。導出offset,將生成一個文件, 每個 partition在文件裡就是一行,格式:
/consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET。
```
[old version]
# kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
[new version]
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group my-consumer --topic my-topic --reset-offsets --to-current --dry-run > offsets.csv
# cat offsets.csv
my-topic,0,8905
my-topic,1,8915
my-topic,2,9845
my-topic,3,8072
my-topic,4,8008
my-topic,5,8319
my-topic,6,8102
my-topic,7,12739
```
#### Import Offsets
導入 offset 的工具與導出相反,它獲取導出 offset 生成的文件,並使用該文件設置 consumer group 的當前 offset。一種常見的做法是對export offset 的文件進行複製,並編輯該副本,修改裡面的 offset 值。
注意,對於 import 命令,不需 –group 選項,因為文件中已經有 consumer group
```
[old version]
# kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
[new version]
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-consumer
--from-file offsets.csv --execute
#
TOPIC PARTITION NEW-OFFSET
my-topic 0 8905
my-topic 1 8915
my-topic 2 9845
my-topic 3 8072
my-topic 4 8008
my-topic 5 8319
my-topic 6 8102
my-topic 7 12739
```
在執行 Import Offsets 之前,必須停止 consumer group 中的所有consumer:如果 consumer group 處於活躍狀態,則不會讀取這些offset,反而消費者會將這些寫入的 offset 覆蓋。
## 12.3 Dynamic Configuration Changes
* 在集群運行的過程中,可以對 topic 和 client(consumer, producer) 的 config 進行覆蓋
* 這些更改被放在一個單獨的命令行工具 kafka-config.sh 中
* 一旦設置好,這些 config 對於集群就是永遠生效,存儲在 zookeeper 中,並且 broker 在啟動時會讀取這些 config
### Overriding Topic Configuration Defaults
* 為了滿足不同場景,topic的很多參數都可以進行單獨設置
* 大多數 config 在 broker 層級下都有默認值,沒覆蓋情況都使用默認值
* 更改topic config 例子
```
[old version]
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
[new version]
kafka-configs.sh --bootstrap-server localhost:9092
--alter --entity-type topics --entity-name <topic-name>
--add-config <key>=<value>[,<key>=<value>...]
```
* 將 my-topic 的 topic 的用戶留存時間設置為1小時,3600萬ms:
```
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
[new version]
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000
Updated config for topic: "my-topic".
```
### Overriding Client and User Configuration Defaults
* 對於 kafka 客戶端唯一可以配置的是 consumer 和 producer 的 config,他們都是 bytes/sec rates, 表示客戶端在每個 broker的生產和消費速率
* 如果集群中有 5 個broker,並且客戶端指定10M/s的生產者配額,那麽總速率為50MB/s
* 客戶端ID與消費者組:
客戶端ID可以與 consumer group 的名稱不同,consumer 可以設置自己的ID,不同組的consumer可能會有相同的客戶端ID,最佳的方法是使用能夠表明他們所屬 consumer group 的標示來命名,這樣可以更容易在日志中確定是哪個組負責請求
* 更改客戶端 config 例子
```
[old version]
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
[new version]
# change the controller mutation rate for both a user and client
kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "controller_mutations_rate=10" --entity-type clients --entity-name <client ID> --entity-type users --entity-name <user ID>
```
### Overriding Broker Configuration Defaults
More than 80 overrides can be altered with kafka-configs.sh for brokers.
* min.insync.replicas
* unclean.leader.election.enable
* max.connections
### Describing Configuration Overrides
* 可以使用命令行工具列出所有被覆蓋的 config,從而檢查topic, 客戶端的配置
* 通過 –describe 命令即可:
```
[old version]
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
[new version]
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000
```
* describe 只能顯示覆蓋的 config,不包括集群默認的 config
### Removing Configuration Overrides
* 可以完全刪除動態配置,讓集群恢覆到默認值,可以使用alter命令和delete-config命令來刪除 Configuration Overrides
* 刪除 my-topic 的 retention.ms 動態配置
```
[old version]
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter -- entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
[new version]
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
Updated config for topic: "my-topic".
```