## Kafka connector
Kafka connectors are currently available mainly from two providers:
- Confluent
- Lenses
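Most of Confluent's connectors are not open source, and many of them require a paid license.
Lenses' connectors are open source, but their documentation is incomplete; see [stream-reactor](https://github.com/lensesio/stream-reactor).
There is also a CDC (Change Data Capture) tool, [Debezium](https://debezium.io/), which provides its own connectors.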
## Usage
The following is a brief overview of how to run a Kafka connector.
### Preparation
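In the Kafka directory, find `connect-standalone.properties` and `connect-standalone.sh` (in the Apache Kafka distribution they are usually under `config/` and `bin/` respectively).
`connect-standalone.sh` starts a Kafka Connect worker in standalone mode, which runs the connectors.
`connect-standalone.properties` is the worker configuration (Kafka brokers, converters, plugin path, and so on).
First edit `plugin.path` in `connect-standalone.properties` and add the path of each connector.
Separate multiple paths with `,`.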
```config=
plugin.path=/bitnami/kafka/libs/kafka-connect-mqtt-assembly-6.2.0.jar,/bitnami/kafka/libs/kafka-connect-influxdb-assembly-6.3.0.jar,/bitnami/kafka/libs/kafka-connect-redis-assembly-6.3.0.jar
```
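Listing every JAR by hand gets tedious as more connectors are added. A minimal sketch that builds the line from a directory, assuming all connector JARs sit in one directory and follow the stream-reactor `*-assembly-*.jar` naming used above; `build_plugin_path` is a hypothetical helper, not part of Kafka:

```bash
#!/usr/bin/env bash
# Hypothetical helper: join every connector JAR in a directory into one
# comma-separated plugin.path line. The "*-assembly-*.jar" pattern is an
# assumption based on the stream-reactor file names; adjust it as needed.
build_plugin_path() {
  local dir="$1" jar joined=""
  for jar in "$dir"/*-assembly-*.jar; do
    [ -e "$jar" ] || continue          # the glob matched nothing
    joined="${joined:+$joined,}$jar"   # comma before every entry except the first
  done
  printf 'plugin.path=%s\n' "$joined"
}
```

For example, `build_plugin_path /bitnami/kafka/libs` prints a line that can replace the existing `plugin.path` entry in `connect-standalone.properties`.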
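If needed, a connector's settings can also be kept in its own properties file; see each connector's documentation for the available options.
- `connect-mqtt-source.properties`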
```
name=mqtt-source
connector.class=io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector
# Contains the MQTT connection end points.
connect.mqtt.hosts=tcp://mqtt:1883
# Contains the MQTT connection user name.
connect.mqtt.username=test
# Contains the MQTT connection password.
connect.mqtt.password=test
# Specifies the MQTT quality of service.
connect.mqtt.service.quality=2
# Provides the time interval to establish the MQTT connection.
connect.mqtt.timeout=3000
# Specifies whether the MQTT connection should be clean or not.
connect.mqtt.clean=true
# The keep alive interval in milliseconds for the MQTT connection.
connect.mqtt.keep.alive=5000
# Contains the MQTT session client id.
#connect.mqtt.client.id=my_client_id
# Specifies the action to be taken if an error occurs while inserting the data.
# Options: NOOP, THROW, RETRY
connect.mqtt.error.policy=THROW
# The time in milliseconds between retries.
connect.mqtt.retry.interval=60000
# The maximum number of times to try the write again.
connect.mqtt.max.retries=20
# Specifies the MQTT retained flag.
connect.mqtt.retained.messages=false
# Determines if conversion exceptions should be thrown or not.
connect.mqtt.converter.throw.on.error=false
# Specifies the Avro Schema for the AvroConverter.
# Format: $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
# or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA
#connect.converter.avro.schemas=my_mqtt_topic=/path/to/avro_schema.avsc
# Logs received MQTT messages.
connect.mqtt.log.message=false
# Contains the Kafka Connect Query Language describing the sourced MQTT source and the target Kafka topics.
connect.mqtt.kcql=INSERT INTO `$` SELECT * FROM `#` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`
# Provides the timeout to poll incoming messages.
connect.mqtt.polling.timeout=1000
# Replicates the shared subscriptions to all tasks instead of distributing them.
connect.mqtt.share.replicate=false
# Enables the output for how many records have been processed.
connect.progress.enabled=false
# Provides the path to the CA certificate file to use with the MQTT connection.
#connect.mqtt.ssl.ca.cert=/path/to/ca_cert.pem
# Provides the path to the certificate file to use with the MQTT connection.
#connect.mqtt.ssl.cert=/path/to/client_cert.pem
# Provides the path to the certificate private key file to use with the MQTT connection.
#connect.mqtt.ssl.key=/path/to/client_key.pem
# Process duplicate messages.
connect.mqtt.process.duplicates=false
```
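The same settings can also be registered through the Connect REST API (see the curl example in the Running section), which expects JSON instead of a properties file. A minimal conversion sketch; `props_to_connector_json` is a hypothetical helper, and it assumes simple one-line `key=value` entries with no whitespace around `=`, no `:` separators, no line continuations and no `\uXXXX` escapes:

```bash
#!/usr/bin/env bash
# Hypothetical helper: convert a connector .properties file into the JSON body
# for POST /connectors. Assumes simple one-line "key=value" entries only.
props_to_connector_json() {
  awk '
    # Escape backslashes and double quotes so each value is a valid JSON string.
    function esc(s,    out, c, k) {
      out = ""
      for (k = 1; k <= length(s); k++) {
        c = substr(s, k, 1)
        if (c == "\\" || c == "\"") out = out "\\"
        out = out c
      }
      return out
    }
    /^[ \t]*$/    { next }  # skip blank lines
    /^[ \t]*[#!]/ { next }  # skip comment lines
    {
      i = index($0, "=")    # split on the first "=" only (KCQL values contain "=")
      if (i == 0) next
      key = substr($0, 1, i - 1)
      val = substr($0, i + 1)
      if (key == "name") { name = val; next }  # the name goes at the top level
      n++
      keys[n] = key
      vals[n] = val
    }
    END {
      printf "{\"name\": \"%s\", \"config\": {", esc(name)
      for (j = 1; j <= n; j++)
        printf "%s\"%s\": \"%s\"", (j > 1 ? ", " : ""), esc(keys[j]), esc(vals[j])
      print "}}"
    }
  ' "$1"
}
```

For example, `curl -X POST -H "Content-Type: application/json" -d "$(props_to_connector_json connect-mqtt-source.properties)" redis:8083/connectors` would register the connector described in the file. Every value is emitted as a JSON string, the same way the config is written in the curl example.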
### Running
- Start only the Connect worker
```bash=
connect-standalone.sh connect-standalone.properties
```
- Start the worker together with a connector config file
```bash=
connect-standalone.sh connect-standalone.properties connect-mqtt-source.properties
```
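Once the worker is running, its REST API listens on port `8083`. Connectors are then created by sending requests with curl. The alternative, listing every connector config file on the command line at startup, is less flexible because adding or changing a connector means restarting the worker.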
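When the worker is started from a script, a curl request sent right after launch may fail because the REST server is not up yet. A minimal readiness check, assuming `curl` is installed; `wait_for_connect` is a hypothetical helper, and its default address `redis:8083` only mirrors the host used in the curl example. It polls `GET /`, which the Connect REST API answers with version information once it is ready:

```bash
#!/usr/bin/env bash
# Hypothetical helper: poll the worker root endpoint (GET /) until it answers.
# Usage: wait_for_connect [url] [max_tries]
wait_for_connect() {
  local url="${1:-redis:8083}" max_tries="${2:-30}" i=1
  while [ "$i" -le "$max_tries" ]; do
    # -f: non-zero exit on HTTP errors, -s: no progress output
    if curl -sf "$url/" > /dev/null; then
      return 0        # the worker answered
    fi
    sleep 1
    i=$((i + 1))
  done
  return 1            # gave up after max_tries attempts
}
```

For example, `connect-standalone.sh connect-standalone.properties & wait_for_connect localhost:8083 60 && echo ready` starts the worker in the background and waits up to about a minute for its REST API.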
- Create a connector with curl
```bash
curl -X POST -H "Content-Type: application/json" -d '{
"name": "mqtt-source",
"config": {
"connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
"connect.mqtt.hosts": "tcp://mqtt:1883",
"connect.mqtt.username": "test",
"connect.mqtt.password": "test",
"connect.mqtt.service.quality": "2",
"connect.mqtt.timeout": "3000",
"connect.mqtt.clean": "true",
"connect.mqtt.keep.alive": "5000",
"connect.mqtt.error.policy": "THROW",
"connect.mqtt.retry.interval": "60000",
"connect.mqtt.max.retries": "20",
"connect.mqtt.retained.messages": "false",
"connect.mqtt.converter.throw.on.error": "false",
"connect.mqtt.log.message": "false",
"connect.mqtt.kcql": "INSERT INTO `$` SELECT payload FROM `#` WITHCONVERTER=`io.lenses.streamreactor.connect.converters.source.JsonSimpleConverter`",
"connect.mqtt.polling.timeout": "1000",
"connect.mqtt.share.replicate": "false",
"connect.progress.enabled": "false",
"connect.mqtt.process.duplicates": "false"
}
}' redis:8083/connectors
```
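If the config is valid, the request returns the connector's name and config, essentially the same payload that was sent, which means the connector was created successfully. Besides POSTing a config, the REST API offers many other ways to control connectors; see the [Confluent REST API](https://docs.confluent.io/platform/current/connect/references/restapi.html) reference.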
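The most common calls from that reference can be wrapped in small shell functions. A minimal sketch: the function names and the `CONNECT_URL` variable are hypothetical, and the default `redis:8083` simply mirrors the host used in the curl example; the endpoints themselves (`/connector-plugins`, `/connectors`, `/connectors/<name>/status`, `/config`, `/pause`, `/resume`, `/restart`) are part of the Kafka Connect REST API.

```bash
#!/usr/bin/env bash
# Hypothetical wrappers around the Kafka Connect REST API.
# CONNECT_URL is an assumption; the default mirrors the host used above.
CONNECT_URL="${CONNECT_URL:-redis:8083}"

# List installed connector plugins (useful to check plugin.path).
connect_plugins() { curl -s "$CONNECT_URL/connector-plugins"; }

# List the names of all deployed connectors.
connect_list() { curl -s "$CONNECT_URL/connectors"; }

# Show the state of a connector and its tasks (RUNNING, FAILED, ...).
connect_status() { curl -s "$CONNECT_URL/connectors/$1/status"; }

# Create or update a connector; $2 is the flat config map as JSON
# (the "config" object only, without the outer "name" wrapper).
connect_update() {
  curl -s -X PUT -H "Content-Type: application/json" \
    -d "$2" "$CONNECT_URL/connectors/$1/config"
}

# Pause / resume a connector without removing its config.
connect_pause() { curl -s -X PUT "$CONNECT_URL/connectors/$1/pause"; }
connect_resume() { curl -s -X PUT "$CONNECT_URL/connectors/$1/resume"; }

# Restart the connector.
connect_restart() { curl -s -X POST "$CONNECT_URL/connectors/$1/restart"; }

# Delete the connector and stop its tasks.
connect_delete() { curl -s -X DELETE "$CONNECT_URL/connectors/$1"; }
```

For example, `connect_plugins` is a quick way to confirm that the JARs listed in `plugin.path` were picked up, and `connect_status mqtt-source` shows whether the connector and its tasks are running or have failed.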