# Monitoring Apache Kafka with Prometheus
###### tags: `Apache Kafka` `Prometheus`

## Installation takeaways
1. Running the JMX exporter as a Java agent is recommended.
2. ZooKeeper, brokers, Connect, and so on each expose their own metrics, so on company-managed VMs plan in advance which ports Prometheus will scrape.
3. Older versions of the JMX exporter may run into "Too many open files"; see below.

## Diagnosing "Too many open files" on an older JMX exporter
Find the PID:
```bash=
ps -ef | grep javaagent
```
If the process has already crashed, check the server log to confirm whether "Too many open files" was the cause.
If it is still alive, use the command below to watch the number of open files (replace `[PID]` with the process ID found above).
```bash=
ls /proc/[PID]/fd | wc -l
```
Next, use the following command to see what kinds of files are open:
```bash=
ls -l /proc/[PID]/fd
```
You will find that a large share of them are sockets opened for network connections. The following command then shows the state of the sockets on the port used by the JMX Exporter; many of them are stuck in CLOSE_WAIT and never get cleaned up.
```bash=
ss -tpn
```
In my experience, no matter how you tune the timeouts of Sun's HTTP server, the problem never fully goes away. I simply upgraded from 0.15.0, the version used in the blog post, to 0.18.0, the latest at the time, and the problem did not recur.

## Confluent - JMX Monitoring stacks Grafana Kafka broker dashboard
### Throughput In/Out
#### Metrics and per-instance examples in the Prometheus text exposition format
* kafka_server_brokertopicmetrics_messagesinpersec
```
# HELP kafka_server_brokertopicmetrics_messagesinpersec Attribute exposed for management kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_messagesinpersec untyped
kafka_server_brokertopicmetrics_messagesinpersec{topic="__consumer_offsets",} 23.0
kafka_server_brokertopicmetrics_messagesinpersec{topic="product.project-management.project.v0",} 54.0
kafka_server_brokertopicmetrics_messagesinpersec{topic="test.testtopic.v0",} 1.0
kafka_server_brokertopicmetrics_messagesinpersec 78.0
```
* kafka_server_brokertopicmetrics_bytesinpersec
```
# HELP kafka_server_brokertopicmetrics_bytesinpersec Attribute exposed for management kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_bytesinpersec untyped
kafka_server_brokertopicmetrics_bytesinpersec{topic="__consumer_offsets",} 70419.0
kafka_server_brokertopicmetrics_bytesinpersec{topic="product.project-management.project.v0",} 28183.0
kafka_server_brokertopicmetrics_bytesinpersec{topic="test.testtopic.v0",} 78.0
kafka_server_brokertopicmetrics_bytesinpersec 98680.0
```
* kafka_server_brokertopicmetrics_bytesoutpersec
```
# HELP kafka_server_brokertopicmetrics_bytesoutpersec Attribute exposed for management kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_bytesoutpersec untyped
kafka_server_brokertopicmetrics_bytesoutpersec{topic="test.node-red.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mgt.offset",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.syncerp_qas_db.dbo.ytbrct",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="product.project-management.project.activity-change.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="test.testtopic.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="human-resource.employee.resignation.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.bi_etl_qas.dbo.dim_delta_org_first_line",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.syncerp_qas_db.dbo.hq_t077x",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.product-line-unblock.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.product-line-block.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.sfcs.c_std_mfg_plant_t",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.std_mes.c_std_line_desc_t",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.hris_hrm_qas_db.dbo.temp_iorg_businessunit",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="quality.quality-alert.v0",} 0.0
kafka_server_brokertopicmetrics_bytesoutpersec{topic="datagov.pipeline.mes_cdm_qas_db.sfcs.c_std_mfg_site_t",} 0.0
```
#### `rate()` function and `sum()` aggregator
`rate()` computes the per-second average rate of increase of a time series over a time window. It therefore needs two things: a time series selector and a time range, which together form a range vector. Every query in this panel uses `[5m]`.

`sum()` is an aggregator. Combined with `without()`, it drops the labels you do not need and then aggregates over the ones that remain.
```
sum without(device, fstype, mountpoint)(node_filesystem_free_bytes)
```
Given the following data
```
node_filesystem_free_bytes{device="/dev/sda1",fstype="vfat", instance="localhost:9100",job="node",mountpoint="/boot/efi"} 70300672
node_filesystem_free_bytes{device="/dev/sda5",fstype="ext4", instance="localhost:9100",job="node",mountpoint="/"} 30791843840
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs", instance="localhost:9100",job="node",mountpoint="/run"} 817094656
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs", instance="localhost:9100",job="node",mountpoint="/run/lock"} 5238784
node_filesystem_free_bytes{device="tmpfs",fstype="tmpfs", instance="localhost:9100",job="node",mountpoint="/run/user/1000"} 826912768
```
the result is
```
{instance="localhost:9100",job="node"} 32511390720
```
#### Message In
```PromQL=
sum without(instance,topic) (rate(kafka_server_brokertopicmetrics_messagesinpersec{job="kafka-broker", env="$env", topic!=""}[5m]))
```
#### Message In Per Broker
```PromQL=
sum without(topic) (rate(kafka_server_brokertopicmetrics_messagesinpersec{job="kafka-broker", env="$env", instance=~"$instance", topic!=""}[5m]))
```
#### Byte In
```PromQL=
sum without(instance,topic) (rate(kafka_server_brokertopicmetrics_bytesinpersec{job="kafka-broker", env="$env", topic!=""}[5m]))
```
#### Byte In Per Broker
```PromQL=
sum without(topic) (rate(kafka_server_brokertopicmetrics_bytesinpersec{job="kafka-broker", env="$env", instance=~"$instance", topic!=""}[5m]))
```
#### Byte Out
```PromQL=
sum without(instance,topic) (rate(kafka_server_brokertopicmetrics_bytesoutpersec{job="kafka-broker", env="$env", topic!=""}[5m]))
```
#### Byte Out Per Broker
```PromQL=
sum without(topic) (rate(kafka_server_brokertopicmetrics_bytesoutpersec{job="kafka-broker", env="$env", instance=~"$instance", topic!=""}[5m]))
```
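To make the `rate()` and `sum without()` behavior used in the queries above more concrete, here is a minimal Python sketch. It is not how Prometheus is implemented: `sum_without` and `simple_rate` are hypothetical helper names, `simple_rate` ignores counter resets and the extrapolation that the real `rate()` applies, and the counter points passed to it are made-up values. The filesystem samples are the ones from the `node_filesystem_free_bytes` example.

```python
def sum_without(samples, drop):
    """Mimic `sum without(<drop>)`: remove the listed labels, then sum
    the values of all samples that share the remaining label set."""
    totals = {}
    for labels, value in samples:
        key = tuple(sorted((k, v) for k, v in labels.items() if k not in drop))
        totals[key] = totals.get(key, 0) + value
    return totals


def simple_rate(points):
    """Simplified stand-in for `rate()`: per-second increase between the
    first and last (timestamp_seconds, value) point in the window.
    Assumption: no counter resets and no extrapolation."""
    (t0, v0), (t1, v1) = points[0], points[-1]
    return (v1 - v0) / (t1 - t0)


common = {"instance": "localhost:9100", "job": "node"}
samples = [
    ({**common, "device": "/dev/sda1", "fstype": "vfat", "mountpoint": "/boot/efi"}, 70300672),
    ({**common, "device": "/dev/sda5", "fstype": "ext4", "mountpoint": "/"}, 30791843840),
    ({**common, "device": "tmpfs", "fstype": "tmpfs", "mountpoint": "/run"}, 817094656),
    ({**common, "device": "tmpfs", "fstype": "tmpfs", "mountpoint": "/run/lock"}, 5238784),
    ({**common, "device": "tmpfs", "fstype": "tmpfs", "mountpoint": "/run/user/1000"}, 826912768),
]

# Only instance and job remain, so all five samples collapse into one group.
result = sum_without(samples, {"device", "fstype", "mountpoint"})
print(result)  # single group with value 32511390720

# Hypothetical counter: 100 at t=0s, 400 at t=300s (a 5-minute window).
per_second = simple_rate([(0, 100.0), (300, 400.0)])
print(per_second)  # 1.0
```

The dashboard queries apply the same two steps in order: `rate(...[5m])` turns each per-topic counter into a per-second rate, and `sum without(topic)` or `sum without(instance,topic)` then collapses those rates per broker or for the whole cluster.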
Idle-percent MBeans and related exported metrics:
```
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
kafka_network_processor_idlepercent
```
```
kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total
```
```
min(kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total{env="prd"})
```
```
avg by (instance) (kafka_network_processor_idlepercent{env="prd"})
```
Under-replicated partitions (ISR):
```
kafka_server_replicamanager_underreplicatedpartitions{instance="stg"}
```
```
rate(kafka_server_replicamanager_underreplicatedpartitions{instance="cndgnkfkstg02.deltaos.corp:9998"}[10m])
```
The broker is known, but which topics are affected is not:
```
kafka_cluster_partition_underreplicated{instance='twtpesqmskfkstg05.deltaos.corp:9998'}
```
Skip topics whose partition count is >= the number of brokers; rebalance the topics configured with <= 3 partitions.
### JVM
#### Heap Usage
```
jvm_memory_bytes_used{job="kafka-broker", env="$env", area="heap"}
  / jvm_memory_bytes_max{job="kafka-broker", env="$env", area="heap"} * 100
```
## Reference
* [Monitoring Your Event Streams: Integrating Confluent with Prometheus and Grafana](https://www.confluent.io/blog/monitor-kafka-clusters-with-prometheus-grafana-and-confluent/)
* [Prometheus - JMX Exporter](https://github.com/prometheus/jmx_exporter)
* [Confluent - JMX Monitoring stacks](https://github.com/confluentinc/jmx-monitoring-stacks)
* [Confluent - Monitoring Kafka with JMX](https://docs.confluent.io/platform/current/kafka/monitoring.html#broker-metrics)
* [Prometheus - Query - Function - rate()](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate)
* [Throttling Kafka Consumption - Dispelling Myths](https://www.linkedin.com/pulse/throttling-kafka-consumption-dispelling-myths-ganesh-ghag/)