---
title: Apache Flume 和 kafka 串接
tags: Apache, Flume, Kafka
description: Apache Flume 和 kafka 串接
---
# Apache Flume 和 kafka 串接
## Apache Flume 和 Kafka 集成原因
***注意:您的本地環境必須先安裝好 Flume 和 kafka***
Flume 為日誌採集系統,線上資料一般透過 local 或是 socket 的方式傳輸給另外一個系統,再大量各式各樣的線上系統下很難分別去向 kafka 寫入資料,因此此時會在 kafka 前加入 flume 架構幫助系統傳輸資料。
flume 可以當作數據傳輸的通道,而在高峰時間,當輸入資料大於輸出資料,會導致來不及寫入資料,導致資料流失,因此會導入 kafka,kafka 在日誌上會有 cache 來儲存資料,因此當系統重啟、資料來不及寫入等等,kafka 會有日誌 log 所以資料不會遺失。
數據 -> flume -> kafka -> hdfs -> 離線處理
數據 -> flume -> kafka -> storm
## Apache Flume 設定
先進入 flume 資料夾,在 conf 目錄下新增 kafka-flume-test01.conf 檔案
```
$ cd flume
$ nano conf/kafka-flume-test01.conf
```
在 kafka-flume-test01.conf 內容新增
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sk1.topic = kafkatest
a1.sinks.sk1.kafka.bootstrap.servers = localhost:9092
a1.sinks.sk1.serializer.class = kafka.serializer.StringEncoder
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
啟動 flume 監聽端口
```
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/kafka-flume-test01.conf -Dflume.root.logger=INFO,console
```
## Kafka 設定
創建一個kafka Topic 測試
```
$ cd kafka_2.13-3.0.0
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 5 --partitions 1 --topic kafkatest
```
啟動 kafka 監聽端口
```
$ bin/kafka-console-consumer.sh --topic kafkatest --from-beginning --bootstrap-server localhost:9092
```
## 測試
```
$ nc localhost 44444
```
從44444端口輸入數據測試
```
> test_flume2kafka01
OK
> test_flume2kafka02
OK
```
flume 監聽 44444 端口並且以 producer 角色將數據流向 kafka

kafka consumer 端口監聽內容如下

---
## 參考
* https://www.gushiciku.cn/pl/g4FT/zh-tw
* https://zhuanlan.zhihu.com/p/354689744
## Thank you! :dash:
You can find me on
- GitHub: https://github.com/shaung08
- Email: a2369875@gmail.com