# Apache Flink - DataBroker 資料代理
###### tags: `data-stream` `data-broker`
## DataBroker 簡介
<!-- TODO 他來做什麼? -->
DataBroker 透過管道的方式設計介接各式服務的資料流,提供資料路由以及各式服務的連接器,能夠提供彈性的資料閘道服務設計,輕易整合至既有系統中。並且使用容易理解的`YAML`格式設定,來實現設定即程式碼的目的。

### 產品特色
- 提供多種輸入資料流與輸出資料流。
- 支援資料流路由,容易改造資料流。
- 提供資料過濾機制。
- 提供資料欄位映射,輕易轉換與驗證。
- 提供失敗重送與失敗處理機制。
- 設定即程式碼(Configuration as Code),易使用。
## 設定檔結構
設定文件中提供的三個全域設定元素:`namespace`、`source`和`sink`,分別對應基本設定命名項、資料輸入流項,以及資料輸出流相關的設定項。
下面提供的一個摘要設定文件:
```yaml=
namespace: foo.demo
source:
- type: kafka
route: foo
sink:
- type: mysql
route: !regex { pattern: '^[a-zA-Z0-9._.-]+'}
filter: "ctx.bar == 'baz'"
on_retry:
...
on_failure:
...
```
### namespace 區段
> 命名該設定的名稱。命名規範與Java package的一致;長度限制為`128`。
### source 區段
> 輸入流的相關設定。
- type: 資料輸入流的類別。
- route: 路由名稱。[參考:資料路由設定](#資料路由設定)
- config: 資料輸入流的相關設定。
- properties: 設定檔。
- metadata: 資料輸入欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定)
### sink 區段
> 輸出流的相關設定。
- type: 資料輸出流的類別。
- route: 路由匹配得設定。[參考:資料路由設定](#資料路由設定)
- filter: 資料過濾設定。[參考:資料欄位檢核設定](#資料欄位檢核設定)
- config: 資料輸出流的相關設定。
- properties: 設定檔。
- mapping: 資料輸出欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定)
- on_retry: 重送機制設定。[參考:啟用 retry 或 failure 設定](#啟用-retry-或-failure-設定)
> 註:目前只支援`elasticsearch`。
- on_failure: 失敗機制設定。
## 資料路由設定
路由的設定是成對的設定,`source`定義路由名稱,`sink`定義匹配規則。
下面範例定義了兩個輸入流及兩個輸出流,其中`source-1`定義路由名稱`foo`,`source-2`定義路由名稱`bar`;其中`sink-1`接收路由名稱為`foo`與`baz`的輸入流,故能接收來自`source-1`的資料,但不會接收來自`source-2`的資料;而`sink-2`會接收到全部資料。
```yaml=
source:
- type: source-1
route: foo
- type: source-2
route: bar
sink:
- type: sink-1
route: !regex { pattern: '^(foo||baz)$'}
- type: sink-2
route: !regex { pattern: '^[a-zA-Z0-9._.-]+$'}
```
## 資料欄位映射設定
資料欄位映射設定將資料結構化,依照欄位的型別進行資料轉換與目的資料結構相容,且提供欄位檢核或預設值。
欄位映射是依照`source`的`metadata`屬性與`sink`的`mapping`屬性成對設定。`source`的`metadata`屬性可省略。
- metadata: 資料輸入欄位設定
- name: 名稱
- type: 型別
- precision: 精度
> 註:適用於 timestamp 類型,可定義 sec, msec。
- mapping: 資料輸出欄位設定
- name: 名稱
- type: 型別
- precision: 精度
> 註:適用於 timestamp 類型,可定義 sec, msec。
- tag: 欄位檢核
- source: 資料輸入欄位名稱
- default: 預設值
下面範例定義了資料輸出的欄位其名稱為`value`,型別為`string`,來源欄位名稱`value`,預設值為`baz`。
```yaml=
source:
- type: kafka
config:
...
sink:
- type: elasticsearch
config:
...
mapping:
- name: value
type: string
source: value
default: baz
```
下面範例定義了資料輸出至`elasticsearch`sink,其中欄位`create_date`精度為`msec`的`timestamp`型別,將寫入`create_date_src`的值,並轉換其值的精度由`sec`轉換為`msec`。
```yaml=
source:
- type: kafka
config:
...
metadata:
- name: create_date_src
type: timestamp
precision: sec
sink:
- type: elasticsearch
config:
...
mapping:
- name: create_date
type: timestamp
precision: msec
source: create_date_src
```
## 資料欄位檢核設定
針對來源欄位的基礎檢核,於`tag`設定,可組合多個檢核。
- required:
- 必填。
> 註:支援所有欄位型別。
- non_empty:
- 不為空,允許為null。
> 註:僅支援 string 型別。
- non_zero:
- 數值不允許為0,允許正、負數、null。
> 註:僅支援 number 欄位。
- non_negative_integer:
- 數值不允許為負數,允許正數、null。
> 註:僅支援 number 欄位。
下面範例定義了名稱為`id`的欄位,型別為`long`,其欄位為必填,且不可為零和負數。
```yaml
mapping:
- name: id
type: long
tag:
- 'required'
- 'non_zero'
- 'non_negative_integer'
source: id
```
## 資料過濾設定
資料過濾的設定,於`sink`下,針對映射的欄位做邏輯判斷(`Ruby script`),決定是否處理該筆資料。
下面範例定義了兩個映射欄位,分別為`value`、`create_data`;資料過濾對`value`內的值使用判斷等於`baz`才進行處理,否則忽略該筆資料。
```yaml=
sink:
- type: elasticsearch
filter: "ctx.value == 'baz'"
config:
...
mapping:
- name: value
type: string
source: value
- name: create_date
type: timestamp
source: create_date
```
## 資料查詢佔位符表示方式
查詢語句支援佔位符標籤,用於替換參數值,可以使用欄位名稱替換個別欄位,亦可使用`*`號替換整個文件。
佔位符表示式:
1. 替換個別欄位,使用欄位名稱表示;下面將替換為`value`欄位的值。
```java=
%{value}
```
2. 替換整個文件,使用`*`表示。
```java=
%{*}
```
3. 替換整個文件並使用json格式化。
```java=
%{*:json}
```
範例一:下面使用`elasticsearch`的DSL舉例,`%{*}`會被替換成`{"value":"baz"}`。
> 註:目前`elasticsearch`僅支援`document`,不支援`handler`。
```yaml=
sink:
- type: elasticsearch
config:
properties:
...
dsl: |
{
"script": {
"source": "ctx._source.value = params.value;",
"lang": "painless",
"params": %{*}
},
"upsert": %{*}
}
mapping:
- name: value
type: string
source: value
```
範例二:下面使用`postgresql`的SQL舉例,`%{*:json}`會被替換成`{"value":"baz"}`。
```yaml=
sink:
- type: postgresql
config:
properties:
...
sql: 'CALL stored_procedures(%{*:json}::json, NULL)'
mapping:
- name: value
type: string
source: value
```
範例三:下面使用`mysql`的SQL舉例,`%{value}`會被替換成`'baz'`。
> 註:內部使用`ColumnParameter`方式帶入,避免`SQL Injection`安全問題。
```yaml=
sink:
- type: mysql
config:
properties:
...
sql: 'INSERT INTO `foodb`.`bar` (`value`) VALUES (%{value});'
mapping:
- name: value
type: string
source: value
```
## 使用環境變數
下面範例,從環境變數取得名稱`TOPIC_NAME`、`KAFKA_BOOTSTRAP`與`ELASTICSEARCH_HOST`的值分別替換`${TOPIC_NAME}`、`${KAFKA_BOOTSTRAP}`與`${ELASTICSEARCH_HOST}`。
```yaml=
namespace: foo.demo
source:
- type: kafka
config:
topic: '${TOPIC_NAME}'
properties:
bootstrap.servers: '${KAFKA_BOOTSTRAP}'
...
sink:
- type: elasticsearch
config:
properties:
host: '${ELASTICSEARCH_HOST}'
```
## 啟用 retry 或 failure 設定
當sink發生失敗的操作時,僅會將資料寫入log。如需進一步的失敗處理,可啟用`on_retry`或`on_failure`設定,指示當發生失敗的操作時,要進行何種處置。以下列出目前支援的處置方式:
- on_retry: 當發生可重試的失敗操作,將資料寫入指定的寫入器。
- 例:操作逾時、連線中斷...等。
- on_failure: 當發生不可重試的失敗操作,將資料寫入指定的寫入器。
- 例:語法錯誤、型別錯誤...等。
> 註:目前的寫入器僅支援kafka與flink log。
下面範例啟用`on_retry`與`on_failure`,當發生操作失敗時,會將其資料寫入kafka。
```yaml=
sink:
- type: elasticsearch
config:
...
mapping:
...
on_retry:
type: kafka
config:
topic: 'retry-topic'
properties:
bootstrap.servers: 'localhost:9092'
...
on_failure:
type: kafka
config:
topic: 'failure-topic'
properties:
bootstrap.servers: 'localhost:9092'
...
```
## Cassandra sink 相關設定
資料輸出類別設定為`cassandra`,設定`properties`提供`host`、`port`、`keyspace`、`table`。
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
- map
- Array型別
- string[]
- short[]
- integer[]
- long[]
- float[]
- double[]
- boolean[]
- decimal[]
```yaml=
sink:
- type: cassandra
config:
properties:
host: 'localhost'
port: 9042
keyspace: 'test'
table: 'test_table'
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```
## Elasticsearch sink 相關設定
資料輸出類別設定為`elasticsearch`,設定`properties`提供`host`、`user`、`password`,以及下面設定:
- method: Elasticsearch REST API 的 method。
- use_bulk: 是否使用 Bulk API 作為寫入。(true/false, 預設值`false`)
- bulk_flush_interval: 觸發寫入的間隔時間。(單位:秒)
- bulk_flush_capacity: 觸發寫入的上限筆數。
- bulk_flush_max_size: 寫入的資料位元上限。
- action: Elasticsearch REST API 的 action。
> 註:目前提供`create`, `delete`, `index`, `update`
- index: 寫入資料的index名稱。
> 註:支援Ruby script
- document_id: 寫入資料的document id。
> 註:支援Ruby script
- dsl: 寫入資料的Elasticsearch DSL。[參考:資料查詢的設定](#資料查詢的設定)
> 註:僅支援 document。
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
```yaml=
sink:
- type: elasticsearch
config:
properties:
host: 'http://localhost:9200'
user: 'admin'
password: 'admin'
method: 'POST'
use_bulk: true
bulk_flush_interval: 10
bulk_flush_capacity: 50
bulk_flush_max_size: 2097152
action: 'update'
index: "data-store-index-#{ctx.value[0..5]}"
document_id: "id:#{ctx.value}"
dsl: |
{
"script": {
"source": "ctx._source.value = params.value;",
"lang": "painless",
"params": %{*}
},
"upsert": %{*}
}
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```
## Kafka source 相關設定
資料輸入類別設定`kafka`,設定資料輸入的來源`topic`,`properties`提供`bootstrap.servers`、`group.id`...等等。
> 註:依據 apache kafka 提供的設定自行增減。
> 註:目前僅支援`json`格式輸入。
- metadata: 資料輸入欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定)
```yaml=
source:
- type: kafka
config:
topic: 'input-topic'
properties:
bootstrap.servers: 'localhost:9092'
group.id: foo-group
auto.offset.reset: earliest
max.poll.records: 50
max.poll.interval.ms: 86400000
metadata:
- name: create_date
type: timestamp
precision: sec
```
## Kafka sink 相關設定
資料輸出類別設定`kafka`,設定資料輸入的來源`topic`,`properties`提供`bootstrap.servers`、`acks`...等等。
- message_key: 寫入資料的message key。
> 註:支援Ruby script
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
```yaml=
sink:
- type: kafka
config:
topic: 'output-topic'
message_key: "key-#{ctx.value[0..5]}"
properties:
bootstrap.servers: 'localhost:9092'
acks: '-1'
security.protocol: 'PLAINTEXT'
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```
## MySQL sink 相關設定
資料輸出類別設定`mysql`,設定`properties`提供`host`、`user`、`password`。
- sql: 寫入資料的MySQL SQL。
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
```yaml=
sink:
- type: mysql
config:
properties:
host: 'jdbc:mysql://localhost:3306/foodb?allowMultiQueries=true'
user: 'root'
password: 'root'
sql: 'INSERT INTO `foodb`.`bar` (`value`, `create_date`) VALUES (%{value}, %{create_date});'
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```
## PostgreSQL sink 相關設定
資料輸出類別設定`postgresql`,設定`properties`提供`host`、`user`、`password`。
- sql: 寫入資料的PostgreSQL SQL。
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
- json
- Array型別
- string[]
- short[]
- integer[]
- long[]
- float[]
- double[]
- boolean[]
```yaml=
sink:
- type: postgresql
config:
properties:
host: 'jdbc:postgresql://127.0.0.1:5432/foodb'
user: 'root'
password: 'root'
sql: 'INSERT INTO bar (value, create_date) VALUES (%{value}, %{create_date});'
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```
## SQLite sink 相關設定
資料輸出類別設定`sqlite`,設定`properties`提供`host`、`user`、`password`。
- sql: 寫入資料的SQLite SQL。
支援資料欄位型別為:
- boolean
- string
- integer
- long
- decimal
- date
- time
- timestamp
```yaml=
sink:
- type: sqlite
config:
properties:
host: 'jdbc:sqlite:/tmp/Test-#{ctx.create_date.to_s[0..5]}.db'
user: 'sqlite'
password: 'sqlite'
sql: 'CREATE TABLE IF NOT EXISTS bar (value TEXT, create_date INTEGER);INSERT INTO bar (value, create_date) VALUES( %{value}, %{create_date});'
mapping:
- name: value
type: string
tag:
- 'required'
source: value
- name: create_date
type: timestamp
precision: msec
tag:
- 'required'
source: create_date
```