# Mqtt+Kafaka+MariaDB [TOC] ## Mqtt ### 安裝 Mqtt * 安裝 mosquitto `sudo yum -y install epel-release` `sudo yum -y install mosquitto` * 啟動 mosquitto `sudo systemctl start mosquitto` * 設定開機啟動 mosquitto `sudo systemctl enable mosquitto` ### 設定 Mqtt * 打開 mosquitto 的設定檔 `sudo vim /etc/mosquitto/mosquitto.conf` * 在 234 行加上這些 ```=1 listenner 1883 {主機IP} protocol mqtt listenner 9001 protocol websockets ``` * 重啟 mosuitto 套用設定 `sudo systemctl retstart mosquitto` ## 安裝 kafka 和 mqtt-connector ### [用學姐共筆的方法安裝](https://hackmd.io/_w1tfOYZRHm5DU0UrHezjg?view#%E5%AE%89%E8%A3%9Ddocker) * 建立 docker 時會有一個 docker 安裝 kafka,一個 docker 安裝 mqtt-connector ### 安裝 docker * 安裝一些必要套件 `sudo yum install -y yum-utils` * add repository(新增 Docker 官方的 stable 套件庫(repository)) `sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo` * 安裝 docker `sudo yum install -y docker-ce docker-ce-cli containerd.io` * 設定 docker 開機啟動 `sudo systemctl enable docker` * 啟動 docker `sudo systemctl start docker` ### 設定要建立的 docker 和各自的內容 * 建立 docker-compose.yml `vim docker-compose.yml` > 有些目前應該不會用到,但我這裡還是全部都安裝 ```yml=1 --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.1.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-server:7.1.1 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' schema-registry: image: confluentinc/cp-schema-registry:7.1.1 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0 hostname: connect container_name: connect depends_on: - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 # CLASSPATH required due to CC-2422 CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.1.1.jar CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR control-center: image: confluentinc/cp-enterprise-control-center:7.1.1 hostname: control-center container_name: control-center depends_on: - broker - schema-registry - connect - ksqldb-server ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_REPLICATION_FACTOR: 1 CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 CONFLUENT_METRICS_TOPIC_REPLICATION: 1 PORT: 9021 ksqldb-server: image: confluentinc/cp-ksqldb-server:7.1.1 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - connect ports: - "8088:8088" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.1.1 container_name: ksqldb-cli depends_on: - broker - connect - ksqldb-server entrypoint: /bin/sh tty: true ksql-datagen: image: confluentinc/ksqldb-examples:7.1.1 hostname: ksql-datagen container_name: ksql-datagen depends_on: - ksqldb-server - broker - schema-registry - connect command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b broker:29092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: broker:29092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 rest-proxy: image: confluentinc/cp-kafka-rest:7.1.1 depends_on: - broker - schema-registry ports: - 8082:8082 hostname: rest-proxy container_name: rest-proxy environment: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' ``` * 安裝 docker-compose 指令 `sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose` * 如果沒有 curl 指令 `sudo yum install curl` * 修改 docker-compose 權限 `sudo chmod +x /usr/local/bin/docker-compose` * 建立軟連結,讓指令找得到執行檔 `sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose` * 確認是否安裝完成 `sudo docker-compose --version` ![](https://hackmd.io/_uploads/BykvjsUEh.png) * 啟動所有 docker `sudo docker-compose up -d` * 檢查執行中的 docker `sudo docker ps` ### 安裝 mqtt-connector * 進入 connect 這個 docker `sudo docker exec -it connect bash` * 安裝 mqtt-connecter `confluent-hub install --no-prompt confluentinc/kafka-connect-mqtt:latest ` * 離開 container `exit` ### 設定 connector 連線的設定檔 * 建立設定檔 `vim mqtt.json` * 設定檔內容: ```json=1 { "name": "mqtt-source", "config": { "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max": 1, "mqtt.server.uri": "tcp://{Mqtt主機IP}:1883", "mqtt.topics": "{Mqtt topic名稱}", "kafka.topic": "{kafka topic名稱}", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "confluent.topic.bootstrap.servers": "broker:29092", "confluent.topic.replication.factor": 1 } } ``` * 重啟 connect 這個 docker `sudo docker restart connect` * 把設定檔 mqtt.json 掛到 connector 上執行 `curl -s -X POST -H 'Content-Type: application/json' --data @mqtt.json http://localhost:8083/connectors` * 確認 connector 是否建立成功 `curl localhost:8083/connectors` ![](https://hackmd.io/_uploads/BJZ06oI4h.png) :::info * 刪掉已建立的連線 `curl -X DELETE http://localhost:8083/connectors/mqtt-source` * 報錯 connection reset by peer 可能要等待一下或重啟後再等待一下就會正常 * 重啟 connect 這個 docker `sudo docker restart connect` ::: ## 安裝 MariaDB ### 安裝 MariDB * 安裝 mariadb `sudo dnf install mariadb-server` * 啟動 mariadb `sudo systemctl start mariadb` * 確認 mariadb 狀態 `sudo systemctl status mariadb` * 設定開機啟動 mariadb `sudo systemctl enable mariadb` * 用 root 登入並建立使用者和給予權限 `sudo mysql -u root` * 建立使用者 `CREATE USER 'my_user'@'localhost' IDENTIFIED BY 'my_password';` * 給予權限 `GRANT ALL PRIVILEGES ON *.* TO 'my_user'@'localhost';` * 刷新權限 `flush privileges;` ### 建立資料庫+資料表 * 建立資料庫 test `create database test;` * 使用 test 這個資料庫 `use test;` * 建立資料表 kafkMessage ```mysql=1 CREATE TABLE `kafkaMessage` (`id` int(10) NOT NULL PRIMARY KEY auto_increment,`message` varchar(50) NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` * 退出 MariaDB `exit` ## 建立 php 程式碼訂閱 kafka 並存到 MariaDB ### 安裝 php 連接 kafka 的套件 rdkafka * 有點忘記當時是怎麼裝的了 > by chatGPT ### 建立 php 程式碼 * 建立 php 檔案 `vim getKafka.php` * 檔案內容: ```php=1 <?php $conf = new RdKafka\Conf(); $conf -> set('group.id','myConsumerGroup'); $consumer = new Rdkafka\Consumer($conf); $consumer -> addBrokers('localhost:9092'); $topic = $consumer->newTopic('{kafka topic}'); $topic -> consumeStart(0,RD_KAFKA_OFFSET_BEGINNING); $servername = "localhost"; $username = "{mariadb使用者名稱}"; $password = "{mariadb使用者密碼}"; $dbname = "{資料庫名稱}"; $conn = new mysqli($servername,$username,$password,$dbname); while (true) { $message = $topic->consume(0,1000); if ($message) { echo $message -> payload . "\n"; // write to mariadb $thisMessage = $message->payload; //echo $thisMessage; //$message = "test"; $sql = "insert into `kafkaMessage`(message)values("; $sql .= "'".$thisMessage."')"; $conn->query($sql); } } $conn.close(); ``` * 執行 getKafka.php `php getKafka.php` ## 測試連線 ### mqtt 發送訊息 * 範例 mqtt 的 topic 為 app/room2 `mosquitto_pub -h {mqtt主機IP} -t app/room2 -m "hello"` ### 執行 getKafka.php 將 mqtt -> kafka的訊息存到 MariaDB `php getKafka.php` ### 進 MariaDB 檢查結果 `mysql -u root 使用者名稱 -p 使用者密碼` `use test;` `select * from kafkaMessage;`