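# Install Kafka on CentOS 7
{%hackmd BJrTq20hE %}

## 1. Install Java 1.8 & download Kafka

```
yum update
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
curl -O https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
# Extract under /home so the paths used below (/home/kafka_2.12-2.7.0) exist
tar -xzf kafka_2.12-2.7.0.tgz -C /home
```

Download kafka_2.12-2.7.0.tgz (the binary package, not the src package) and extract it under /home. If the download URL no longer resolves, older releases are kept at https://archive.apache.org/dist/kafka/.

---

## 2. Environment setup

### Kafka configuration

```
vi /home/kafka_2.12-2.7.0/config/server.properties
```

1. broker.id=0 (first node)
   For a cluster, give every node a unique id, e.g.
   broker.id=1 (second node)
   broker.id=2 (third node)
2. zookeeper.connect=host1:2181,host2:2181,host3:2181

![](https://i.imgur.com/gNFZ1rs.png)

3. listeners=PLAINTEXT://localhost:9092 (replace localhost with this host's IP)
4. advertised.listeners=PLAINTEXT://localhost:9092 (replace localhost with this host's IP)
5. log.dirs=/home/kafka/kafka-logs
   Directory where Kafka stores its log data (change as needed).
6. num.partitions=3
   Default number of partitions for new topics (change as needed).
7. offsets.topic.replication.factor=1
   Number of replicas kept for the internal offsets topic (change as needed).
8. log.retention.hours=8
   How long data is retained (change as needed).
9. delete.topic.enable=true
   Allows topics to be deleted (takes effect after a restart). This line is not in the file and must be added manually.

![](https://i.imgur.com/ZBL9Bsf.png)

---

### ZooKeeper configuration

```
vi /home/kafka_2.12-2.7.0/config/zookeeper.properties
```

1. dataDir=/home/kafka/zookeeper/data
   - Where ZooKeeper stores its data; change as needed.
2. server.1=kafka00:2888:3888
   - In server.x, the number x follows the Kafka broker.id and must match the myid file on that host.
   - kafka00 is the host name mapped to the Kafka node's IP in /etc/hosts.
   - For multiple nodes, add one server.x line per node, e.g.

![](https://i.imgur.com/ReoP7sN.png)

3. initLimit=5
   - Number of ticks a follower may take to connect to and sync with the leader at startup.
4. syncLimit=2
   - Number of ticks a follower may fall behind the leader when exchanging data.

![](https://i.imgur.com/HgJ3eZs.png)

5. tickTime=2000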
   - Basic time unit in milliseconds. initLimit and syncLimit are counted in ticks, so this value determines the timeouts that apply when multiple nodes start up.

---

### Service setup

Create zookeeper.service and kafka.service in /etc/systemd/system, and check that the paths inside each service file match your installation.

```
vim /etc/systemd/system/kafka.service
vim /etc/systemd/system/zookeeper.service
```

Put the following in kafka.service (adjust the ExecStart/ExecStop paths):

```ini=
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/bin/sh -c '/home/kafka_2.12-2.7.0/bin/kafka-server-start.sh /home/kafka_2.12-2.7.0/config/server.properties > /home/kafka_2.12-2.7.0/kafka.log 2>&1'
ExecStop=/home/kafka_2.12-2.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```

Put the following in zookeeper.service (adjust the ExecStart/ExecStop paths):

```ini=
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=root
ExecStart=/home/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh /home/kafka_2.12-2.7.0/config/zookeeper.properties
ExecStop=/home/kafka_2.12-2.7.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```

User=root is set here; whichever user you choose must have permission to execute the .sh scripts.

![](https://i.imgur.com/6IRkC9k.png)

```
systemctl daemon-reload
```

---

### Start ZooKeeper & Kafka

Kafka depends on ZooKeeper, so start ZooKeeper first and then Kafka.

*Note: in a multi-node setup, start all Kafka nodes at the same time so the cluster comes up together. If they were not started together, you can manually edit the cluster id in `meta.properties` under the Kafka log directory (`/home/kafka/kafka/kafka-logs/meta.properties` in this setup; it lives in whatever `log.dirs` points to) so that every node has the same id.

```
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka
```

```
# Create the data directory first if it does not exist yet
mkdir -p /home/kafka/zookeeper/data
vi /home/kafka/zookeeper/data/myid
```

(Required!) The myid file holds this ZooKeeper node's number. Set it according to the broker id, and make sure it equals the x of this host's server.x line in zookeeper.properties, e.g.
broker.id=0 => myid is 0
broker.id=1 => myid is 1
...

![](https://i.imgur.com/PoddAmy.png)

```
systemctl restart zookeeper
```

Open ports 2888, 3888, 9092 and 2181 in the firewall, e.g.
```
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload
```

Done!

---

# Kafka & Logstash input & output example

How the Kafka pipeline works:
1. Data is output to the Kafka queue.
2. A Logstash instance on the other end consumes the data from the Kafka queue.

Examples follow.

---

Logstash + Kafka side:

```java=
input{}
filter{}
output{
  kafka {
    # Note: the topic option is named differently in the input and output plugins
    topic_id => "kafkatest_01"
    bootstrap_servers => "10.6.91.153:9092,10.6.91.154:9092,10.6.91.155:9092"
    max_request_size => 10485760
  }
}
```

> [time=Wed, Mar 24, 2022 3:27 PM]

---

CLM Logstash side:

```java=
input {
  kafka {
    id => "kafka01"
    # Note: the topic option is named differently in the input and output plugins
    topics => ["kafkatest_01"]
    consumer_threads => 16
    # For multiple brokers, list every host:port separated by commas
    bootstrap_servers => "10.6.91.153:9092,10.6.91.154:9092,10.6.91.155:9092"
    codec => json
  }
}
filter{}
output{}  # send to Elasticsearch (ES) here
```

# Kafka command-line operations

> The command scripts are located in /home/kafka_[version]/bin/

<font color="#C10066">**Create a topic**</font>

```shell=
kafka-topics.sh --create --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --replication-factor 2 --partitions 3 --topic test
```

> --zookeeper: address of the ZooKeeper cluster; separate multiple addresses with commas. One address is usually enough. (Still accepted in 2.7.0; removed in Kafka 3.0 in favor of --bootstrap-server.)
> --replication-factor: number of replicas per partition; 1 means a single copy, 2 is recommended
> --partitions: number of partitions
> --topic: topic name

---

<font color="#C10066">**Test publishing messages**</font>

```shell=
kafka-console-producer.sh --broker-list 192.168.80.11:9092,192.168.80.12:9092,192.168.80.13:9092 --topic test
```

Example:
![](https://i.imgur.com/mpbaYbd.png)

---

<font color="#C10066">**View consumed messages**</font>

```shell=
kafka-console-consumer.sh --bootstrap-server 192.168.80.11:9092,192.168.80.12:9092,192.168.80.13:9092 --topic test --from-beginning
```

> --from-beginning: reads every message already stored in the topic, not only new ones

Example:

![](https://i.imgur.com/o0z9z0B.png)

---

<font color="#C10066">**List all topics**</font>

```shell=
kafka-topics.sh --list --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181
```

Example:

![](https://i.imgur.com/GAslZOT.png)

---

<font color="#C10066">**Show the configuration details of a topic**</font>

```shell=
kafka-topics.sh --describe --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --topic test
```

Omit `--topic` to describe every topic.

Example:

![](https://i.imgur.com/3DYcH5e.png)

---

<font color="#C10066">**List all consumer groups**</font>

```shell=
kafka-consumer-groups.sh --bootstrap-server 192.168.2.149:9092 --list
```

Sample output:

```
group-1234
group-5677
```

<font color="#C10066">**Show the consumption details of a group**</font>

```shell=
kafka-consumer-groups.sh --bootstrap-server 192.168.2.149:9092 --group group-1234 --describe
```

Sample output:

```
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST            CLIENT-ID
test_topic  0          641             641             0    consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc  /192.168.2.149  consumer-1
test_topic  1          632             632             0    consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc  /192.168.2.149  consumer-1
test_topic  2          699             699             0    consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc  /192.168.2.149  consumer-1
```

Column meanings:

> TOPIC: name of the topic consumed by the group
> PARTITION: partition number
> CURRENT-OFFSET: offset the group has consumed up to in this partition
> LOG-END-OFFSET: current latest offset of this partition
> LAG: how far consumption trails production (LOG-END-OFFSET minus CURRENT-OFFSET). A persistently large value means the consumer may not be keeping up and needs attention.
> CONSUMER-ID: consumer id the server assigned to this partition
> HOST: address of the consumer
> CLIENT-ID: client id of the consumer

---

<font color="#C10066">**Delete a topic**</font>

```shell=
kafka-topics.sh --delete --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --topic test
```

Example:
![](https://i.imgur.com/hU7bQCt.png)

---

<font color="#C10066">**Change the number of partitions**</font>

```shell=
kafka-topics.sh --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --alter --topic test --partitions 6
```

The partition count can only be increased. Use the following command first to check the current setting:

```shell=
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic
```

---

> [time=Wed, Apr 6, 2022 3:25 PM]

###### tags: `Other`

made by Mario
first made: 2021/04/16
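
The LAG column from `kafka-consumer-groups.sh --describe` (see the group details section) is easier to watch when it is summed per topic. The following is only a sketch: `total_lag` is a hypothetical helper name, not part of Kafka, and it assumes the output has a header row containing `TOPIC` and `LAG`, as in the sample output in this note. The sample rows are shortened copies of that output.

```shell
#!/bin/sh
# total_lag (hypothetical helper): sum the LAG column per topic.
# It locates the TOPIC and LAG columns from the header row, so it does not
# depend on a fixed column position.
total_lag() {
  awk '
    # Until the header is seen, look for the TOPIC and LAG column indexes.
    !found {
      for (i = 1; i <= NF; i++) {
        if ($i == "TOPIC") t = i
        if ($i == "LAG") l = i
      }
      if (t && l) found = 1
      next
    }
    # Data rows: only add numeric lag values ("-" rows are skipped).
    $l ~ /^[0-9]+$/ { lag[$t] += $l }
    END { for (k in lag) print k, lag[k] }
  '
}

# Sample rows based on the example output in this note.
sample='TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_topic 0 641 641 0 consumer-1 /192.168.2.149 consumer-1
test_topic 1 632 632 0 consumer-1 /192.168.2.149 consumer-1
test_topic 2 699 699 0 consumer-1 /192.168.2.149 consumer-1'

printf '%s\n' "$sample" | total_lag   # prints: test_topic 0
```

Against a live cluster the same function could be fed from the real command, for example `kafka-consumer-groups.sh --bootstrap-server 192.168.2.149:9092 --group group-1234 --describe | total_lag`.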