# MQTT+Kafka+MariaDB
[TOC]
## MQTT
### Install MQTT
* Install mosquitto
`sudo yum -y install epel-release`
`sudo yum -y install mosquitto`
* Start mosquitto
`sudo systemctl start mosquitto`
* Enable mosquitto at boot
`sudo systemctl enable mosquitto`
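* To confirm the broker is actually up, a quick local smoke test (assuming the EPEL `mosquitto` package also installed the `mosquitto_sub`/`mosquitto_pub` client tools):
```bash=1
# Terminal 1: subscribe to a test topic
mosquitto_sub -h localhost -t test/topic

# Terminal 2: publish to it; "hello" should appear in terminal 1
mosquitto_pub -h localhost -t test/topic -m "hello"
```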
### Configure MQTT
* Open the mosquitto configuration file
`sudo vim /etc/mosquitto/mosquitto.conf`
* Add the following around line 234
```=1
listener 1883 {host IP}
protocol mqtt
listener 9001
protocol websockets
```
* Restart mosquitto to apply the settings
`sudo systemctl restart mosquitto`
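* If clients on other machines cannot connect, the CentOS firewall may be blocking the ports. A quick check-and-open sketch, assuming firewalld is in use:
```bash=1
# Confirm mosquitto is listening on both ports
ss -tlnp | grep -E '1883|9001'

# Open the MQTT and websocket ports (assumption: firewalld is active)
sudo firewall-cmd --permanent --add-port=1883/tcp
sudo firewall-cmd --permanent --add-port=9001/tcp
sudo firewall-cmd --reload
```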
## Install Kafka and the MQTT connector
### [Install using the method from a senior's shared notes](https://hackmd.io/_w1tfOYZRHm5DU0UrHezjg?view#%E5%AE%89%E8%A3%9Ddocker)
* When the containers are created, one container runs Kafka and another runs the mqtt-connector
### Install Docker
* Install the required utilities
`sudo yum install -y yum-utils`
* Add Docker's official stable repository
`sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo`
* Install Docker
`sudo yum install -y docker-ce docker-ce-cli containerd.io`
* Enable Docker at boot
`sudo systemctl enable docker`
* Start Docker
`sudo systemctl start docker`
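* A quick sanity check that the daemon is running and can pull images (`hello-world` is Docker's standard test image):
```bash=1
sudo docker version                # client and server versions should both appear
sudo docker run --rm hello-world   # pulls and runs Docker's test image
```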
### Define the containers to create and their contents
* Create docker-compose.yml
`vim docker-compose.yml`
> Some of these services probably won't be needed for now, but I install everything here anyway
```yml=1
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.1.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.1.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.1.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:7.1.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.1.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.1.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.1.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.1.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
```
* Install the docker-compose command
`sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose`
* If the curl command is missing
`sudo yum install curl`
* Make docker-compose executable
`sudo chmod +x /usr/local/bin/docker-compose`
* Create a symlink so the shell can find the executable
`sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose`
* Confirm the installation succeeded
`sudo docker-compose --version`

* Start all the containers
`sudo docker-compose up -d`
* Check the running containers
`sudo docker ps`
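* The full stack takes a minute or two to come up. Two optional checks (Control Center's web UI is served on port 9021, per the compose file above):
```bash=1
# All services defined in docker-compose.yml should show State "Up"
sudo docker-compose ps

# Control Center UI; or open http://{host IP}:9021 in a browser instead
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:9021
```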
### Install the mqtt-connector
* Enter the connect container
`sudo docker exec -it connect bash`
* Install the mqtt-connector
`confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:latest`
* Leave the container
`exit`
### Write the connector's connection config file
* Create the config file
`vim mqtt.json`
* Config file contents:
```json=1
{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": 1,
    "mqtt.server.uri": "tcp://{MQTT host IP}:1883",
    "mqtt.topics": "{MQTT topic name}",
    "kafka.topic": "{Kafka topic name}",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "confluent.topic.replication.factor": 1
  }
}
```
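> `ByteArrayConverter` passes the raw MQTT payload bytes into Kafka unchanged, which is why the PHP consumer later sees exactly the string that was published with `mosquitto_pub`.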
* Restart the connect container
`sudo docker restart connect`
* Register the mqtt.json config with the Connect REST API
`curl -s -X POST -H 'Content-Type: application/json' --data @mqtt.json http://localhost:8083/connectors`
* Confirm the connector was created successfully
`curl localhost:8083/connectors`
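* `curl localhost:8083/connectors` only lists connector names; the status endpoint shows whether the connector and its task are actually running:
```bash=1
# Both "connector" and "tasks" should report state RUNNING
curl -s http://localhost:8083/connectors/mqtt-source/status
```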

:::info
* Delete an existing connector
`curl -X DELETE http://localhost:8083/connectors/mqtt-source`
* If a `connection reset by peer` error appears, wait a while, or restart and then wait again, and it should return to normal
* Restart the connect container
`sudo docker restart connect`
:::
## Install MariaDB
### Install MariaDB
* Install mariadb
`sudo dnf install mariadb-server`
* Start mariadb
`sudo systemctl start mariadb`
* Check mariadb's status
`sudo systemctl status mariadb`
* Enable mariadb at boot
`sudo systemctl enable mariadb`
* Log in as root to create a user and grant privileges
`sudo mysql -u root`
* Create the user
`CREATE USER 'my_user'@'localhost' IDENTIFIED BY 'my_password';`
* Grant privileges
`GRANT ALL PRIVILEGES ON *.* TO 'my_user'@'localhost';`
* Reload the privilege tables
`FLUSH PRIVILEGES;`
### Create the database and table
* Create the database test
`create database test;`
* Switch to the test database
`use test;`
* Create the table kafkaMessage
```mysql=1
CREATE TABLE `kafkaMessage` (
  `id` int(10) NOT NULL PRIMARY KEY AUTO_INCREMENT,
  `message` varchar(50) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
* Exit MariaDB
`exit`
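* Back at the shell, you can confirm the new user, database, and table in one shot (assuming the `my_user`/`my_password` credentials created above):
```bash=1
# -e runs the statement and exits; kafkaMessage should be listed
mysql -u my_user -pmy_password -e 'SHOW TABLES FROM test;'
```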
## Write PHP code that subscribes to Kafka and stores messages in MariaDB
### Install rdkafka, the PHP extension for connecting to Kafka
* I've forgotten exactly how I installed it at the time
> by chatGPT
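* A plausible install path on CentOS, via PECL; this is an assumption since the original steps were lost, and the package names may differ on your system:
```bash=1
# Build dependencies for the extension (assumption: EPEL provides librdkafka-devel)
sudo yum install -y php-devel php-pear librdkafka-devel gcc make

# Build and install the rdkafka extension, then enable it
sudo pecl install rdkafka
echo "extension=rdkafka.so" | sudo tee /etc/php.d/30-rdkafka.ini

# Verify the extension is loaded
php -m | grep rdkafka
```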
### Write the PHP code
* Create the PHP file
`vim getKafka.php`
* File contents:
```php=1
<?php
// Low-level rdkafka consumer: read from partition 0 of the topic
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('localhost:9092');
$topic = $consumer->newTopic('{kafka topic}');
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

// MariaDB connection
$servername = "localhost";
$username = "{MariaDB user name}";
$password = "{MariaDB user password}";
$dbname = "{database name}";
$conn = new mysqli($servername, $username, $password, $dbname);

while (true) {
    // Poll partition 0 with a 1000 ms timeout; returns null on timeout
    $message = $topic->consume(0, 1000);
    if ($message && $message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo $message->payload . "\n";
        // Write the payload to MariaDB; a prepared statement avoids SQL injection
        $thisMessage = $message->payload;
        $stmt = $conn->prepare("INSERT INTO `kafkaMessage` (`message`) VALUES (?)");
        $stmt->bind_param("s", $thisMessage);
        $stmt->execute();
        $stmt->close();
    }
}
// Unreachable while the loop runs forever; kept for completeness
$conn->close();
```
* Run getKafka.php
`php getKafka.php`
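* The script blocks in its consume loop, so it holds the terminal. If it should keep running after you log out, one common approach (an example, not the only option):
```bash=1
# Run in the background and append output to a log file
nohup php getKafka.php >> getKafka.log 2>&1 &
```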
## Test the pipeline
### Publish an MQTT message
* The example MQTT topic is app/room2
`mosquitto_pub -h {MQTT host IP} -t app/room2 -m "hello"`
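* Before involving PHP and MariaDB, you can confirm the message made it through the connector into Kafka by consuming the topic inside the broker container (substitute the `{Kafka topic name}` from mqtt.json):
```bash=1
# "hello" should be printed once the connector has forwarded the MQTT message
sudo docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:29092 \
  --topic {Kafka topic name} \
  --from-beginning
```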
### Run getKafka.php to store the MQTT -> Kafka messages in MariaDB
`php getKafka.php`
### Check the result in MariaDB
`mysql -u {user name} -p{password}` (no space between -p and the password)
`use test;`
`select * from kafkaMessage;`
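If every link in the chain worked, the query should return at least one row whose `message` column contains the `hello` string published earlier.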