# Source/Sink Connector
### Argo Events - eventsource-webhook-connector.yaml
```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook-connector
spec:
  service:
    ports:
      - port: 12001
        targetPort: 12001
  webhook:
    # an event source can run multiple HTTP servers; define a unique port to start a new one
    connector-delete:
      # port to run the HTTP server on
      port: "12001"
      # endpoint to listen on
      endpoint: /connector/delete
      # HTTP request method to allow; here only POST requests are accepted
      method: POST
    connector-source-create:
      port: "12001"
      endpoint: /connector/source/create
      method: POST
    connector-source-update:
      port: "12001"
      endpoint: /connector/source/update
      method: POST
```
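Once the EventSource is running, each endpoint accepts a JSON POST. A minimal smoke test, assuming the generated service (typically `webhook-connector-eventsource-svc`) has been port-forwarded to `localhost:12001`; the connector name is a made-up example:
```python
import requests

# Assumes: kubectl port-forward svc/webhook-connector-eventsource-svc 12001:12001
# "name" is the field the sensor's delete trigger reads as body.name.
resp = requests.post(
    "http://localhost:12001/connector/delete",
    json={"name": "mysql-source-demo"},  # hypothetical connector name
)
print(resp.status_code, resp.text)
```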
### Argo Events - webhook-sensor-connector.yaml
```yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: webhook-connector
spec:
  template:
    serviceAccountName: argo-server
  dependencies:
    - name: delete
      eventSourceName: webhook-connector
      eventName: connector-delete
    - name: source-create
      eventSourceName: webhook-connector
      eventName: connector-source-create
    - name: source-update
      eventSourceName: webhook-connector
      eventName: connector-source-update
  triggers:
    - template:
        conditions: "delete"
        name: delete
        http:
          url: http://10.52.52.164:8083/connectors/
          method: DELETE
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
      # delete operation: the user supplies the connector name; the sensor reads
      # the JSON value at "body.name" and appends it to the trigger URL
      parameters:
        - src:
            dependencyName: delete
            dataKey: body.name
          dest: http.url
          operation: append
    - template:
        conditions: "source-update"
        name: source-update
        http:
          url: http://10.106.161.246:8080/async-function/connector-update
          method: POST
          payload:
            - src:
                dependencyName: source-update
                dataKey: body.config
              dest: config
            - src:
                dependencyName: source-update
                dataKey: body.name
              dest: name
          headers:
            # callback URL for inspecting the async-function result (response log)
            X-Callback-Url: https://webhook.site/b0d707f0-bccb-4438-aa1f-8d27ec39c68f
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
    - template:
        conditions: "source-create"
        name: source-create
        http:
          url: http://10.106.161.246:8080/async-function/connector-create
          method: POST
          payload:
            - src:
                dependencyName: source-create
                dataKey: body.name
              dest: name
            # "value" is the default used when the dataKey is missing from the body
            - src:
                dependencyName: source-create
                dataKey: body.type
                value: mysql
              dest: type
            - src:
                dependencyName: source-create
                dataKey: body.db
                value: mysql
              dest: db
            - src:
                dependencyName: source-create
                dataKey: body.table
                value: null
              dest: table
            - src:
                dependencyName: source-create
                dataKey: body.user
              dest: user
            - src:
                dependencyName: source-create
                dataKey: body.password
              dest: password
            - src:
                dependencyName: source-create
                dataKey: body.topic
                value: "-"
              dest: topic
            - src:
                dependencyName: source-create
                dataKey: body.mode
                value: bulk
              dest: mode
            - src:
                dependencyName: source-create
                dataKey: body.poll-interval-ms
                value: "3600000"
              dest: poll-interval-ms
            - src:
                dependencyName: source-create
                dataKey: body.timestamp-column-name
                value: null
              dest: timestamp-column-name
            - src:
                dependencyName: source-create
                dataKey: body.incrementing-column-name
                value: null
              dest: incrementing-column-name
          headers:
            # OpenFaaS posts the async result to this callback, i.e. the Kafka Connect REST API
            X-Callback-Url: http://10.52.52.164:8083/connectors
            Content-Type: application/json
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
```
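The create and update endpoints work the same way: the request body keys must match the `dataKey`s above, and omitted keys fall back to the `value` defaults declared in the sensor (e.g. `mode: bulk`). A sketch of a `source-create` request, again assuming the port-forward from the earlier example and using made-up connection details:
```python
import requests

# Hypothetical payload; every key mirrors a dataKey in the source-create trigger.
body = {
    "name": "mysql-source-demo",
    "type": "mysql",
    "db": "inventory",
    "table": "orders",
    "user": "connect",
    "password": "secret",
    "topic": "inventory",
    "mode": "bulk",
    "poll-interval-ms": "3600000",
}
resp = requests.post("http://localhost:12001/connector/source/create", json=body)
print(resp.status_code)
```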
:::success
:warning: **In OpenFaaS, list the Python packages each function needs (e.g. requests) in its requirements.txt.**
:::
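For connector-update (the only handler below that imports a third-party package) that means a one-line requirements.txt next to handler.py:
```
requests
```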
### OpenFaaS - connector-update/handler.py
```python
import json

import requests


def handle(req):
    data = json.loads(req)
    url = 'http://10.52.52.164:8083/connectors/' + data['name'] + '/config'
    # fetch the connector's current config from the Kafka Connect REST API
    r = requests.get(url)
    configs = r.json()
    # the sensor delivers "config" as a JSON string, so decode it first
    data["config"] = json.loads(data["config"])
    # overwrite only the keys that already exist in the current config
    for key in data["config"]:
        if key in configs:
            configs[key] = data["config"][key]
    headers = {'content-type': 'application/json'}
    result = requests.put(url, data=json.dumps(configs), headers=headers)
    # an OpenFaaS handler must return a string, not a (Response, dict) tuple
    return json.dumps({"status": result.status_code, "config": configs})
```
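The function expects the same shape the sensor sends: `name` plus `config` as a JSON *string* (the raw `body.config` value), which is why the handler re-parses it. A hypothetical invocation payload:
```python
import json

# What the source-update trigger delivers: "config" is a JSON-encoded string.
req = json.dumps({
    "name": "mysql-source-demo",  # hypothetical connector
    "config": json.dumps({"poll.interval.ms": "60000"}),
})
# handle(req) would GET the current config from the Kafka Connect REST API,
# overwrite poll.interval.ms, and PUT the merged config back.
```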
### OpenFaaS - connector-create/handler.py
```python
import json


def handle(req):
    data = json.loads(req)
    # turn the database type into a JDBC connection URL
    if data["type"] == "mysql":
        data["type"] = "jdbc:mysql://127.0.0.1:3306/" + data["db"]
    elif data["type"] == "postgres":
        data["type"] = "jdbc:postgresql://127.0.0.1:5432/" + data["db"]
    elif data["type"] == "mssql":
        data["type"] = "jdbc:sqlserver://127.0.0.1:1433;databaseName=" + data["db"]
    configs = {
        "name": data["name"],
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": data["type"],
            "connection.user": data["user"],
            "connection.password": data["password"],
            "topic.prefix": data["topic"] + "-",
            "mode": data["mode"],
            "poll.interval.ms": data["poll-interval-ms"],
            "catalog.pattern": data["db"],
            "table.whitelist": data["table"],
            "timestamp.column.name": data["timestamp-column-name"],
            "incrementing.column.name": data["incrementing-column-name"],
            "validate.non.null": "false"
        }
    }
    # drop the options that do not apply to the chosen mode
    if data["mode"] == "bulk":
        del configs["config"]["timestamp.column.name"]
        del configs["config"]["validate.non.null"]
    elif data["mode"] == "timestamp":
        del configs["config"]["poll.interval.ms"]
        del configs["config"]["incrementing.column.name"]
    elif data["mode"] == "incrementing":
        del configs["config"]["poll.interval.ms"]
        del configs["config"]["timestamp.column.name"]
    # an empty table name means no whitelist: read every table in the catalog
    if data["table"] == '':
        del configs["config"]["table.whitelist"]
    return json.dumps(configs)
```
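connector-create makes no network calls itself, so it can be tried locally; the JSON it returns is what OpenFaaS posts to the X-Callback-Url, i.e. the Kafka Connect `POST /connectors` endpoint that actually creates the connector. A sketch with made-up inputs:
```python
import json

from handler import handle  # assumes handler.py is on the import path

sample = {
    "name": "mysql-source-demo",
    "type": "mysql",
    "db": "inventory",
    "table": "orders",
    "user": "connect",
    "password": "secret",
    "topic": "inventory",
    "mode": "bulk",
    "poll-interval-ms": "3600000",
    "timestamp-column-name": None,
    "incrementing-column-name": None,
}
print(handle(json.dumps(sample)))
# e.g. {"name": "mysql-source-demo", "config": {"connector.class": "...JdbcSourceConnector",
#       "connection.url": "jdbc:mysql://127.0.0.1:3306/inventory",
#       "topic.prefix": "inventory-", "mode": "bulk", ...}}
```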
[Connector ref](https://www.ignite-service.cn/confluent/Kafka-ConvertersSerialization.html#%E9%85%8D%E7%BD%AE%E8%BD%AC%E6%8D%A2%E5%99%A8)