## Kafka connector

Two organizations currently publish Kafka connectors:

- Confluent
- Lenses

Most of Confluent's connectors are closed source and require a paid license.
Lenses' connectors are open source, but their documentation is incomplete; see [stream-reactor](https://github.com/lensesio/stream-reactor).

There is also a CDC (Change Data Capture) tool, [Debezium](https://debezium.io/), which provides connectors as well.

## Usage

The following is a brief introduction to using Kafka connectors.

### Preparation

In the Kafka directory, find `connect-standalone.properties` and `connect-standalone.sh`.
`connect-standalone.sh` is used to run connectors (it starts a standalone Connect worker), and `connect-standalone.properties` is the worker's configuration.

First, edit `plugin.path` in `connect-standalone.properties` and add the connector paths. Separate multiple paths with `,`:

```config=
plugin.path=/bitnami/kafka/libs/kafka-connect-mqtt-assembly-6.2.0.jar,/bitnami/kafka/libs/kafka-connect-influxdb-assembly-6.3.0.jar,/bitnami/kafka/libs/kafka-connect-redis-assembly-6.3.0.jar
```

If needed, you can also put each connector's (plugin's) configuration in its own file. For the available settings, see each connector's documentation.

- `connect-mqtt-source.properties`

```
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
# Contains the MQTT connection end points.
connect.mqtt.hosts=tcp://mqtt:1883
# Contains the MQTT connection user name.
connect.mqtt.username=test
# Contains the MQTT connection password.
connect.mqtt.password=test
# Specifies the MQTT quality of service.
connect.mqtt.service.quality=2
# Provides the time interval to establish the MQTT connection.
connect.mqtt.timeout=3000
# Specifies whether the MQTT connection should be clean or not.
connect.mqtt.clean=true
# The keep alive interval in milliseconds for the MQTT connection.
connect.mqtt.keep.alive=5000
# Contains the MQTT session client id.
#connect.mqtt.client.id=my_client_id
# Specifies the action to be taken if an error occurs while inserting the data.
# Options: NOOP, THROW, RETRY
connect.mqtt.error.policy=THROW
# The time in milliseconds between retries.
connect.mqtt.retry.interval=60000
# The maximum number of times to try the write again.
connect.mqtt.max.retries=20
# Specifies the MQTT retained flag.
connect.mqtt.retained.messages=false
# Determines if conversion exceptions should be thrown or not.
connect.mqtt.converter.throw.on.error=false
# Specifies the Avro Schema for the AvroConverter.
# Format: $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
# or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA
#connect.converter.avro.schemas=my_mqtt_topic=/path/to/avro_schema.avsc
# Logs received MQTT messages.
connect.mqtt.log.message=false
# Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics.
connect.mqtt.kcql=INSERT INTO `$` SELECT * FROM `#` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
# Provides the timeout to poll incoming messages.
connect.mqtt.polling.timeout=1000
# Replicates the shared subscriptions to all tasks instead of distributing them.
connect.mqtt.share.replicate=false
# Enables the output for how many records have been processed.
connect.progress.enabled=false
# Provides the path to the CA certificate file to use with the MQTT connection.
#connect.mqtt.ssl.ca.cert=/path/to/ca_cert.pem
# Provides the path to the certificate file to use with the MQTT connection.
#connect.mqtt.ssl.cert=/path/to/client_cert.pem
# Provides the path to the certificate private key file to use with the MQTT connection.
#connect.mqtt.ssl.key=/path/to/client_key.pem
# Process duplicate messages.
connect.mqtt.process.duplicates=false
```

### Running

- Start the Connect worker only

```bash=
connect-standalone.sh connect-standalone.properties
```

- Start the worker together with a connector

```bash=
connect-standalone.sh connect-standalone.properties connect-mqtt-source.properties
```

Once it is running, the worker listens on port `8083` for its REST API. You can then start connectors by sending requests to it with curl, or append each connector's config file to the launch command, though the latter is less flexible.

- Start a connector with curl

```bash
curl -X POST -H "Content-Type: application/json" -d '{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "tcp://mqtt:1883",
    "connect.mqtt.username": "test",
    "connect.mqtt.password": "test",
    "connect.mqtt.service.quality": "2",
    "connect.mqtt.timeout": "3000",
    "connect.mqtt.clean": "true",
    "connect.mqtt.keep.alive": "5000",
    "connect.mqtt.error.policy": "THROW",
    "connect.mqtt.retry.interval": "60000",
    "connect.mqtt.max.retries": "20",
    "connect.mqtt.retained.messages": "false",
    "connect.mqtt.converter.throw.on.error": "false",
    "connect.mqtt.log.message": "false",
    "connect.mqtt.kcql": "INSERT INTO `$` SELECT payload FROM `#` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`",
    "connect.mqtt.polling.timeout": "1000",
    "connect.mqtt.share.replicate": "false",
    "connect.progress.enabled": "false",
    "connect.mqtt.process.duplicates": "false"
  }
}' redis:8083/connectors
```

If the config is valid, the response echoes back the connector's name and config, which tells you it started successfully. Besides POSTing a config, the REST API offers many other ways to control connectors; see the [Confluent REST API](https://docs.confluent.io/platform/current/connect/references/restapi.html) reference.
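The note shows two ways of supplying a connector config: a `.properties` file and a JSON body sent with curl. The shell sketch below bridges them by generating the JSON body from the properties file and wrapping a few REST calls. It rests on several assumptions: the helper names (`props_to_json`, `create_connector`, `connector_status`, and so on) are hypothetical and not part of Kafka or stream-reactor; the worker URL defaults to `http://localhost:8083` (override `CONNECT_URL`, e.g. `http://redis:8083` for the setup above); and the parser only understands plain `key=value` lines, not the full Java properties syntax (line continuations, `:` separators, escapes). The paths it calls (`/connectors`, `/connectors/{name}/status`, `/pause`, `/resume`, and `DELETE /connectors/{name}`) are the standard Kafka Connect REST endpoints.

```bash
#!/usr/bin/env bash
# Sketch only: helper names are hypothetical, not part of Kafka or stream-reactor.
# Assumes the Connect worker is reachable at CONNECT_URL and that the
# properties file uses plain "key=value" lines (no continuations or escapes).

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# props_to_json FILE
# Prints {"name": ..., "config": {...}} for POST /connectors.
# The "name" key becomes the connector name; every other key goes into "config".
props_to_json() {
  awk '
    # JSON-escape backslashes and double quotes, character by character
    function esc(s,    out, i, c) {
      out = ""
      for (i = 1; i <= length(s); i++) {
        c = substr(s, i, 1)
        if (c == "\\" || c == "\"") out = out "\\"
        out = out c
      }
      return out
    }
    /^[ \t]*$/    { next }   # skip blank lines
    /^[ \t]*[#!]/ { next }   # skip comment lines
    {
      i = index($0, "=")
      if (i == 0) next       # ignore lines without a key=value pair
      key = substr($0, 1, i - 1)
      val = substr($0, i + 1)
      sub(/^[ \t]+/, "", key); sub(/[ \t]+$/, "", key)
      sub(/^[ \t]+/, "", val); sub(/[ \t]+$/, "", val)
      if (key == "name") { name = val; next }
      pairs = pairs sep "\"" esc(key) "\": \"" esc(val) "\""
      sep = ", "
    }
    END { printf "{\"name\": \"%s\", \"config\": {%s}}\n", esc(name), pairs }
  ' "$1"
}

# create_connector FILE: POST the generated JSON to the worker.
create_connector() {
  props_to_json "$1" | curl -sS -X POST -H "Content-Type: application/json" \
    --data-binary @- "$CONNECT_URL/connectors"
}

# list_connectors: GET /connectors returns a JSON array of connector names.
list_connectors() { curl -sS "$CONNECT_URL/connectors"; }

# connector_status NAME: state of the connector and each of its tasks.
connector_status() { curl -sS "$CONNECT_URL/connectors/$1/status"; }

# pause_connector / resume_connector NAME: suspend or resume processing.
pause_connector()  { curl -sS -X PUT "$CONNECT_URL/connectors/$1/pause"; }
resume_connector() { curl -sS -X PUT "$CONNECT_URL/connectors/$1/resume"; }

# delete_connector NAME: stop the connector and remove its config.
delete_connector() { curl -sS -X DELETE "$CONNECT_URL/connectors/$1"; }
```

Usage, assuming the helpers are saved to a file you `source`: run `export CONNECT_URL=http://redis:8083`, then `create_connector connect-mqtt-source.properties` and `connector_status mqtt-source`. Generating the JSON from the same `.properties` file keeps the file-based and REST-based setups from drifting apart. If you want an idempotent create-or-update call instead, the REST API also provides `PUT /connectors/{name}/config`, which takes the flat config map without the `name`/`config` wrapper.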