# Install kafka on CentOS 7
{%hackmd BJrTq20hE %}
## 一、Java 1.8 安裝 & 下載kafka
```
yum update
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
curl -O https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
```
下載kafka_2.12-2.7.0.tgz (不要載到src) 到 home 目錄解壓縮
-----------------------------------------------------------------------------------------------------------------------------------
## 二、 環境建置
### Kafka 環境配置
```
vi /home/kafka_2.12-2.7.0/config/server.properties
```
1. broker.id = 0 (第一台)
ex. (若為cluster)
broker.id = 1 (第二台)
broker.id = 2 (第三台)
4. zookeeper.connect=host1:2181,host2:2181,host3:2181

4. listeners=PLAINTEXT://localhost:9092 --> //localhost依對應Ip修改
5. advertised.listeners=PLAINTEXT://localhost:9092 --> //localhost依對應Ip修改
6. log.dirs=/home/kafka/kafka-logs 設定kafka log路徑 (視需求更改)
7. num.partitions=3 設定要將資料要切成幾個partitions (視需求更改)
8. offsets.topic.replication.factor=1 設定備份數量 (視需求更改)
9. log.retention.hours=8 設定資料留存時間 視需求更改)
10. delete.topic.enable=true 設定可刪除topic(刪除後重啟才會生效) -->此項需自行添加

---
### zookeeper設定配置
```
vi /home/kafka_2.12-2.7.0/config/zookeeper.properties
```
1. dataDir=/home/kafka/zookeeper/data
- - 存放資料的位置,可依照需求進行修改。
2. server.1=kafka00:2888:3888
- server.x,x數字部分要依照 kafka 的 broker.id。
- kafka00部分則依照寫在 /etc/hosts 的 kafka 的 IP 去對應。
若為多台可設定多個server.id
ex.

3. initLimit=5
- - 節點之間的確認存活時間
4. syncLimit=2
- 傳輸資料的時間

5. tickTime=2000
. 多台kafka啟動時設定的timeout時間
-------------------------------------------------------------------------------------------------------------------------------------
### Service建置
Create zookeeper.service 以及 kafka.service 並將其放進 /etc/systemd/system
檢查各service檔內配置路徑是否相符
```
vim /etc/systemd/system/kafka.service
vim /etc/systemd/system/zookeeper.service
```
輸入以下內容
在kafka.service直接輸入以下內容: (請注意修改Execstart/stop路徑)
``` yam=
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=root
ExecStart=/bin/sh -c '/home/kafka_2.12-2.7.0/bin/kafka-server-start.sh /home/kafka_2.12-2.7.0/config/server.properties > /home/kafka_2.12-2.7.0/kafka.log 2>&1'
ExecStop=/home/kafka_2.12-2.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
```
在zookeeper.service直接輸入以下內容:(請注意修改Execstart/stop路徑)
```yam=
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=root
ExecStart=/home/kafka_2.12-2.7.0/bin/zookeeper-server-start.sh /home/kafka_2.12-2.7.0/config/zookeeper.properties
ExecStop=/home/kafka_2.12-2.7.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
```
設定user=root 確定user有權限可以開啟.sh

```
systemctl daemon-reload
```
---
### 啟動zookeeper & kafka
kafka依賴於zookeeper,故需先啟動zookeeper,再啟動kafka
*注意:若為多台kafka需同時啟動,以讓cluster同時啟動,若不是同時啟用,可以到 `/home/kafka/kafka/kafka-logs/meta.properties` 中手動修改cluster id 使多台的id保持一致
```
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka
```
```
(mkdir /home/kafka/zookeeper/data)
vi /home/kafka/zookeeper/data/myid
```
(Required!!)
設定第幾台zookerper編號,依照broker id 設定
ex.
broker.id=0 => myid 設定為0
broker.id=1 => myid 設定為1
...

```
systemctl restart zookeeper
```
打開防火牆 2888, 3888, 9092, 2181 port
ex.
```
firewall-cmd --add-port=2888/tcp --permanent
firewall-cmd --add-port=3888/tcp --permanent
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload
```
Done!
---
# kafka & logstash input & output 範例
Kafka 運作原理:
1. Data output 到 kafka queue
2. 由另一端logstash 收 kafka queue裡的data
範例如下
==================================================================================
logstash+kafka 端:
```java=
input{}
filter{}
output{
kafka {
topic_id => "kafkatest_01" //注意input與output的topic plugin名稱不一樣
bootstrap_servers => "10.6.91.153:9092,10.6.91.154:9092,10.6.91.155:9092"
max_request_size => 10485760
}
}
```
> [time=Wed, Mar 24, 2022 3:27 PM]
-----------------------------------------------------------------------------------------------------------------------------------
CLM logstash端:
```java=
input {
kafka {
id => "kafka01"
topics => ["kafkatest_01"] //注意input與output的topic plugin名稱不一樣
consumer_threads => 16
bootstrap_servers => "10.6.91.153:9092,10.6.91.154:9092,10.6.91.155:9092" //若為多台 => ex. ["host1:port","host2:port","host3:port"]
codec => json
}
filter{}
output{ES}
```
# kafka 命令操作
> script命令位於路徑/home/kafka_[version]/bin/
<font color="#C10066">**創建Topic**</font>
```java=
kafka-topics.sh --create --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --replication-factor 2 --partitions 3 --topic test
```
> --zookeeper:定義zookeeper集群地址,如果有多個IP地址使用逗號分割,一般使用一個IP即可
> --replication-factor:定義分區副本數,1代表單個副本,建議為2
> --partitions:定義分區數
> --topic:定義topic名稱
--------------------
<font color="#C10066">**測試發佈資訊**</font>
```java=
kafka-console-producer.sh --broker-list 192.168.80.11:9092,192.168.80.12:9092,192.168.80.13:9092 --topic test
```
ex.

-----------------------------------
<font color="#C10066">**查看當前消费資訊** </font>
```java=
kafka-console-consumer.sh --bootstrap-server 192.168.80.11:9092,192.168.80.12:9092,192.168.80.13:9092 --topic test --from-beginning
```
> --from-beginning:會把topic中以往所有的資訊都讀取出來
ex.

-----------------------------------
<font color="#C10066">**列出當前所有topic**</font>
```java=
kafka-topics.sh --list --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181
```
ex.

-----------------------------------
<font color="#C10066">**查看某個topic的設定详情**</font>
```java=
kafka-topics.sh --describe --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181
```
ex.

-----------------------------------
<font color="#C10066">**列出所有group**</font>
```java=
kafka-consumer-groups.sh --bootstrap-server 192.168.2.149:9092 --list
```
呈現範例
```java=
group-1234
group-5677
```
<font color="#C10066">**查看某個group的消費详情**</font>
```java=
kafka-consumer-groups.sh --bootstrap-server 192.168.2.149:9092 --group group_1234 --describe
```
呈現範例
```java=
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_topic 0 641 641 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /192.168.2.149 consumer-1
test_topic 1 632 632 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /192.168.2.149 consumer-1
test_topic 2 699 699 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /192.168.2.149 consumer-1
```
ex.
> TOPIC:該group裡消費的topic名稱
> PARTITION:分區編號
> CURRENT-OFFSET:該分區消費到的offset
> LOG-END-OFFSET:該分區當前的latest offset
> LAG:消費滯留區間,看消費速度和生產者速度,一般數值過大則可能出現消費跟不上,需要注意
> CONSUMER-ID:server端給該分區分配的consumer編號
> CLIENT-ID:消費者id
-----------------------------------
<font color="#C10066">**删除 topic**</font>
```java=
kafka-topics.sh --delete --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --topic test
```
ex.

-----------------------------------
<font color="#C10066">**修改分區數**</font>
```java=
kafka-topics.sh --zookeeper 192.168.80.11:2181,192.168.80.12:2181,192.168.80.13:2181 --alter --topic test --partitions 6
```
可以使用以下命令先確認目前設定值
```java=
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic`
```
-----------------------------------
> [time=Wed, Apr 6, 2022 3:25 PM]
###### tags: `Other`
made by Mario
first made:
2021/04/16