# Source/Sink Connector

### Argo Events - eventsource-webhook-connector.yaml

```
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook-connector
spec:
  service:
    ports:
      - port: 12001
        targetPort: 12001
  webhook:
    # event-source can run multiple HTTP servers. Simply define a unique port to start a new HTTP server
    connector-delete:
      # port to run HTTP server on
      port: "12001"
      # endpoint to listen to
      endpoint: /connector/delete
      # HTTP request method to allow. In this case, only POST requests are accepted
      method: POST
    connector-source-create:
      port: "12001"
      endpoint: /connector/source/create
      method: POST
    connector-source-update:
      port: "12001"
      endpoint: /connector/source/update
      method: POST
```

### Argo Events - webhook-sensor-connector.yaml

```
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: webhook-connector
spec:
  template:
    serviceAccountName: argo-server
  dependencies:
    - name: delete
      eventSourceName: webhook-connector
      eventName: connector-delete
    - name: source-create
      eventSourceName: webhook-connector
      eventName: connector-source-create
    - name: source-update
      eventSourceName: webhook-connector
      eventName: connector-source-update
  triggers:
    - template:
        conditions: "delete"
        name: delete
        http:
          url: http://10.52.52.164:8083/connectors/
          method: DELETE
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
      # delete operation: the caller sends the connector name in the request body (JSON key "name");
      # that value is appended to the URL above
      parameters:
        - src:
            dependencyName: delete
            dataKey: body.name
          dest: http.url
          operation: append
    - template:
        conditions: "source-update"
        name: source-update
        http:
          url: http://10.106.161.246:8080/async-function/connector-update
          method: POST
          payload:
            - src:
                dependencyName: source-update
                dataKey: body.config
              dest: config
            - src:
                dependencyName: source-update
                dataKey: body.name
              dest: name
          headers:
            ## check log
            X-Callback-Url: https://webhook.site/b0d707f0-bccb-4438-aa1f-8d27ec39c68f
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
    - template:
        conditions: "source-create"
        name: source-create
        http:
          url: http://10.106.161.246:8080/async-function/connector-create
          method: POST
          payload:
            - src:
                dependencyName: source-create
                dataKey: body.name
              dest: name
            - src:
                dependencyName: source-create
                dataKey: body.type
                value: mysql
              dest: type
            - src:
                dependencyName: source-create
                dataKey: body.db
                value: mysql
              dest: db
            - src:
                dependencyName: source-create
                dataKey: body.table
                value: null
              dest: table
            - src:
                dependencyName: source-create
                dataKey: body.user
              dest: user
            - src:
                dependencyName: source-create
                dataKey: body.password
              dest: password
            - src:
                dependencyName: source-create
                dataKey: body.topic
                value: "-"
              dest: topic
            - src:
                dependencyName: source-create
                dataKey: body.mode
                value: bulk
              dest: mode
            - src:
                dependencyName: source-create
                dataKey: body.poll-interval-ms
                value: "3600000"
              dest: poll-interval-ms
            - src:
                dependencyName: source-create
                dataKey: body.timestamp-column-name
                value: null
              dest: timestamp-column-name
            - src:
                dependencyName: source-create
                dataKey: body.incrementing-column-name
                value: null
              dest: incrementing-column-name
          headers:
            X-Callback-Url: http://10.52.52.164:8083/connectors
            Content-Type: application/json
      retryStrategy:
        steps: 3
        duration: 3s
      policy:
        status:
          allow:
            - 200
            - 201
```

:::success
:warning: **OpenFaaS 裡面的 requirements.txt 放置 python 安裝套件 e.g.
requests**
:::

### OpenFaaS - connector-update/handler.py

```
import json
from urllib.parse import quote

import requests

# Base URL of the Kafka Connect REST API that owns the connectors.
CONNECT_URL = 'http://10.52.52.164:8083/connectors/'
# Seconds to wait on Kafka Connect before giving up instead of hanging the function.
REQUEST_TIMEOUT = 10


def handle(req):
    """Merge new values into an existing connector's config and PUT it back.

    Args:
        req: JSON text with "name" (the connector name) and "config" (a JSON
            object, or a string containing one, holding the settings to change).

    Returns:
        JSON text with the HTTP status code of the PUT ("status") and the
        config that was sent ("config").

    Raises:
        requests.HTTPError: if the current config cannot be fetched.
    """
    data = json.loads(req)
    # Quote the name so characters such as "/" or spaces cannot alter the path.
    url = CONNECT_URL + quote(data['name'], safe='') + '/config'

    current = requests.get(url, timeout=REQUEST_TIMEOUT)
    # Without this check an error body (e.g. a 404) would be treated as the
    # connector config and sent back in the PUT below.
    current.raise_for_status()
    configs = current.json()

    changes = data['config']
    if isinstance(changes, str):
        # The config may arrive as a JSON-encoded string rather than an object.
        changes = json.loads(changes)

    # Only keys the connector already has are overwritten; others are ignored.
    for key, value in changes.items():
        if key in configs:
            configs[key] = value

    headers = {'content-type': 'application/json'}
    result = requests.put(url, data=json.dumps(configs), headers=headers,
                          timeout=REQUEST_TIMEOUT)
    # Return serializable text instead of a (Response, dict) tuple.
    return json.dumps({'status': result.status_code, 'config': configs})
```

### OpenFaaS - connector-create/handler.py

```
import json

# JDBC URL prefix per supported database type; the database name is appended.
_JDBC_URL_PREFIX = {
    "mysql": "jdbc:mysql://127.0.0.1:3306/",
    "postgres": "jdbc:postgresql://127.0.0.1:5432/",
    "mssql": "jdbc:sqlserver://127.0.0.1:1433;databaseName=",
}

# Config keys that are removed from the request for each connector mode.
_REMOVED_BY_MODE = {
    "bulk": ("timestamp.column.name", "incrementing.column.name", "validate.non.null"),
    "timestamp": ("poll.interval.ms", "incrementing.column.name"),
    "incrementing": ("poll.interval.ms", "timestamp.column.name"),
}


def handle(req):
    """Build the JSON body that creates a JDBC source connector.

    Args:
        req: JSON text with the keys "name", "type", "db", "user", "password",
            "topic" and "mode", plus "poll-interval-ms", and optionally "table",
            "timestamp-column-name" and "incrementing-column-name".

    Returns:
        JSON text of the form {"name": ..., "config": {...}}.

    Raises:
        KeyError: if one of the required keys is missing from the request.
    """
    data = json.loads(req)

    # A known type becomes a JDBC URL; any other value is passed through as-is.
    db_type = data["type"]
    if db_type in _JDBC_URL_PREFIX:
        connection_url = _JDBC_URL_PREFIX[db_type] + data["db"]
    else:
        connection_url = db_type

    table = data.get("table")
    config = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": connection_url,
        "connection.user": data["user"],
        "connection.password": data["password"],
        "topic.prefix": data["topic"] + "-",
        "mode": data["mode"],
        "poll.interval.ms": data["poll-interval-ms"],
        "catalog.pattern": data["db"],
        "table.whitelist": table,
        "timestamp.column.name": data.get("timestamp-column-name"),
        "incrementing.column.name": data.get("incrementing-column-name"),
        "validate.non.null": "false",
    }

    # Bulk mode now drops both column-name keys, matching the other two modes.
    for key in _REMOVED_BY_MODE.get(data["mode"], ()):
        config.pop(key, None)

    # An empty or null table name means "no whitelist", so omit the key.
    if not table:
        config.pop("table.whitelist", None)

    return json.dumps({"name": data["name"], "config": config})
```

[Connector
ref](https://www.ignite-service.cn/confluent/Kafka-ConvertersSerialization.html#%E9%85%8D%E7%BD%AE%E8%BD%AC%E6%8D%A2%E5%99%A8)