# Monitoring Apache Kafka with Prometheus
###### tags: `Apache Kafka` `Prometheus`
## Installation Takeaways
1. The Java agent approach is recommended (a minimal launch sketch follows this list).
2. ZooKeeper, the brokers, Connect, etc. each expose their own metrics, so on company-managed VMs you should plan in advance which ports Prometheus will scrape.
3. Older versions of the JMX exporter can run into a "Too many open files" problem; see below.
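A minimal sketch of the Java agent approach. The jar path, the scrape port 7071, the exporter config file name, and the Kafka install path are placeholders for illustration, not values from this setup:
```bash=
# Placeholder paths/port: adjust to where the agent jar and exporter config actually live,
# and plan one scrape port per component (broker, connect, zookeeper, ...).
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent-0.18.0.jar=7071:/opt/jmx_exporter/kafka-broker.yml"
# Start the broker with the agent attached (kafka-run-class.sh picks up KAFKA_OPTS).
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
```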
## Diagnosing "Too many open files" with older JMX exporter versions
First, find the PID:
```bash=
ps -ef | grep javaagent
```
If the process has already crashed, check the server log to confirm whether it hit "Too many open files".
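For example, a quick grep of the server log (the log path below is only an assumption; use your installation's log directory):
```bash=
# server.log location varies by installation; this path is only an example.
grep -i "too many open files" /opt/kafka/logs/server.log | tail -n 5
```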
If it is still alive, you can watch the number of open files with the following command.
```bash=
ls -l /proc/[PID]/fd | wc -l
```
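To judge whether that count is actually close to the limit, compare it against the per-process limit (same `[PID]` placeholder as above):
```bash=
# Shows the soft/hard "Max open files" limit for the process.
grep "open files" /proc/[PID]/limits
```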
Then use the following command to see what kinds of files are open.
```bash=
ls -l /proc/[PID]/fd
```
You will find that a very large share of the open files are network sockets. With the following command you can check the state of the sockets on the port the JMX exporter listens on; many of them are stuck in CLOSE_WAIT and never drained.
```bash=
ss -tpn
```
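To narrow this down, `ss` filters can show only the CLOSE-WAIT sockets on the exporter's port (7071 below is an assumed port; substitute whatever port your agent listens on):
```bash=
# Show only CLOSE-WAIT sockets whose local port is the (assumed) exporter port 7071.
ss -tnp state close-wait '( sport = :7071 )'
```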
In my experience, no matter how you tune the timeouts of the Sun HttpServer it never fully solves the problem. At the time I simply replaced the 0.15.0 jar used in the blog post with the then-latest 0.18.0, and the problem never came back.
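If you want to do the same swap, one way is to pull a newer agent jar and point the `-javaagent` flag at it. The URL below follows the Maven Central layout for the `io.prometheus.jmx` group; double-check the current version against the jmx_exporter README:
```bash=
# Download a newer agent jar (0.18.0 here); verify the latest version on the jmx_exporter page.
wget -O /opt/jmx_exporter/jmx_prometheus_javaagent-0.18.0.jar \
  https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.18.0/jmx_prometheus_javaagent-0.18.0.jar
# Then update KAFKA_OPTS to reference the new jar and restart the broker.
```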
## Confluent - JMX Monitoring stacks Grafana Kafka broker dashboard
### Throughput In/Out
#### Metrics and per-instance Prometheus text exposition format examples
* kafka_server_brokertopicmetrics_messagesinpersec
```
# HELP kafka_server_brokertopicmetrics_messagesinpersec Attribute exposed for management kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_messagesinpersec untyped
kafka_server_brokertopicmetrics_messagesinpersec{topic="__consumer_offsets",} 23.0
kafka_server_brokertopicmetrics_messagesinpersec{topic="product.project-management.project.v0",} 54.0
kafka_server_brokertopicmetrics_messagesinpersec{topic="test.testtopic.v0",} 1.0
kafka_server_brokertopicmetrics_messagesinpersec 78.0
```
* kafka_server_brokertopicmetrics_bytesinpersec
```
# HELP kafka_server_brokertopicmetrics_bytesinpersec Attribute exposed for management kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_bytesinpersec untyped
kafka_server_brokertopicmetrics_bytesinpersec{topic="__consumer_offsets",} 70419.0
kafka_server_brokertopicmetrics_bytesinpersec{topic="product.project-management.project.v0",} 28183.0
kafka_server_brokertopicmetrics_bytesinpersec{topic="test.testtopic.v0",} 78.0
kafka_server_brokertopicmetrics_bytesinpersec 98680.0
```
* kafka_server_brokertopicmetrics_bytesoutpersec
```
# HELP kafka_server_brokertopicmetrics_bytesoutpersec Attribute exposed for management kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_bytesoutpersec untyped
kafka_server_brokertopicmetrics_bytesoutpersec{topic="test.node-red.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mgt.offset",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.syncerp_qas_db.dbo.ytbrct",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="product.project-management.project.activity-change.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="test.testtopic.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="human-resource.employee.resignation.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.bi_etl_qas.dbo.dim_delta_org_first_line",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.syncerp_qas_db.dbo.hq_t077x",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.product-line-unblock.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.product-line-block.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.sfcs.c_std_mfg_plant_t",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.std_mes.c_std_line_desc_t",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.hris_hrm_qas_db.dbo.temp_iorg_businessunit",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.sfcs.c_std_mfg_site_t",} 0.0
```
#### `rate()` function and `sum()` aggregator
`rate()` computes the per-second rate of increase of a time series over a time window, so it takes a range vector: a time series plus a time range. Every query in this panel uses [5m].
`sum()` is an aggregator; combined with `without()` it drops the labels you do not need before aggregating.
```
sum without(device, fstype, mountpoint)(node_filesystem_free_bytes)
```
Given the following data:
```
node_filesystem_free_bytes{device="/dev/sda1",fstype="vfat",
instance="localhost:9100",job="node",mountpoint="/boot/efi"} 70300672
node_filesystem_free_bytes{device="/dev/sda5",fstype="ext4",
instance="localhost:9100",job="node",mountpoint="/"} 30791843840
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs",
instance="localhost:9100",job="node",mountpoint="/run"} 817094656
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs",
instance="localhost:9100",job="node",mountpoint="/run/lock"} 5238784
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs",
instance="localhost:9100",job="node",mountpoint="/run/user/1000"} 826912768
```
the result is:
```
{instance="localhost:9100",job="node"} 32511390720
```
#### Message In
```PromQL=
sum without(instance,topic)
(rate(kafka_server_brokertopicmetrics_messagesinpersec{
job="kafka-broker",
env="$env",
topic!=""}[5m]))
```
#### Message In Per Broker
```PromQL=
sum without(topic)
(rate(kafka_server_brokertopicmetrics_messagesinpersec{
job="kafka-broker",
env="$env",
instance=~"$instance",
topic!=""}[5m]))
```
#### Byte In
```PromQL=
sum without(instance,topic)
(rate(kafka_server_brokertopicmetrics_bytesinpersec{
job="kafka-broker",
env="$env",
topic!=""}[5m]))
```
#### Byte In Per Broker
```PromQL=
sum without(topic)
(rate(kafka_server_brokertopicmetrics_bytesinpersec{
job="kafka-broker",
env="$env",
instance=~"$instance",
topic!=""}[5m]))
```
#### Byte Out
```PromQL=
sum without(instance,topic)
(rate(kafka_server_brokertopicmetrics_bytesoutpersec{
job="kafka-broker",
env="$env",
topic!=""}[5m]))
```
#### Byte Out Per Broker
```PromQL=
sum without(topic)
(rate(kafka_server_brokertopicmetrics_bytesoutpersec{
job="kafka-broker",
env="$env",
instance=~"$instance",
topic!=""}[5m]))
```
### Network Processor / Request Handler Idle Percent
MBean names and the corresponding exported metrics:
```
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
kafka_network_processor_idlepercent

kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total
```
Example queries:
```
min(kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total{env="prd"})
```
```
avg by (instance) (kafka_network_processor_idlepercent{env="prd"})
```
### Under Replicated Partitions (ISR)
```
kafka_server_replicamanager_underreplicatedpartitions{env="stg"}
```
```
rate(kafka_server_replicamanager_underreplicatedpartitions{instance="cndgnkfkstg02.deltaos.corp:9998"}[10m])
```
This tells you which broker is affected, but not which topics; the per-partition metric below can be used for that:
```
kafka_cluster_partition_underreplicated{instance='twtpesqmskfkstg05.deltaos.corp:9998'}
```
Skip topics whose partition count is already >= the number of brokers; for topics configured with <= 3 partitions, run a rebalance (partition reassignment).
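To list the affected topic/partitions from the Kafka side as well (complementing the query above), the stock CLI can do it; the bootstrap server address below is an assumption:
```bash=
# List every partition that is currently under-replicated.
# Replace the bootstrap server with one of your own brokers.
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions
```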
### JVM
#### Heap Usage
```
jvm_memory_bytes_used{job="kafka-broker", env="$env", area="heap"} / jvm_memory_bytes_max{job="kafka-broker", env="$env", area="heap"} * 100
```
## Reference
* [Monitoring Your Event Streams: Integrating Confluent with Prometheus and Grafana](https://www.confluent.io/blog/monitor-kafka-clusters-with-prometheus-grafana-and-confluent/)
* [Prometheus - JMX Exporter](https://github.com/prometheus/jmx_exporter)
* [Confluent - JMX Monitoring stacks](https://github.com/confluentinc/jmx-monitoring-stacks)
* [Confluent - Monitoring Kafka with JMX](https://docs.confluent.io/platform/current/kafka/monitoring.html#broker-metrics)
* [Prometheus - Query - Function - rate()](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate)
* [Throttling Kafka Consumption - Dispelling Myths](https://www.linkedin.com/pulse/throttling-kafka-consumption-dispelling-myths-ganesh-ghag/)