# Apache Flink - DataBroker 資料代理 ###### tags: `data-stream` `data-broker` ## DataBroker 簡介 <!-- TODO 他來做什麼? --> DataBroker 透過管道的方式設計介接各式服務的資料流,提供資料路由以及各式服務的連接器,能夠提供彈性的資料閘道服務設計,輕易整合至既有系統中。並且使用容易理解的`YAML`格式設定,來實現設定即程式碼的目的。 ![](https://images2.imgbox.com/f8/36/ZYnb9AL0_o.png) ### 產品特色 - 提供多種輸入資料流與輸出資料流。 - 支援資料流路由,容易改造資料流。 - 提供資料過濾機制。 - 提供資料欄位映射,輕易轉換與驗證。 - 提供失敗重送與失敗處理機制。 - 設定即程式碼(Configuration as Code),易使用。 ## 設定檔結構 設定文件中提供的三個全域設定元素:`namespace`、`source`和`sink`,分別對應基本設定命名項、資料輸入流項,以及資料輸出流相關的設定項。 下面提供的一個摘要設定文件: ```yaml= namespace: foo.demo source: - type: kafka route: foo sink: - type: mysql route: !regex { pattern: '^[a-zA-Z0-9._.-]+'} filter: "ctx.bar == 'baz'" on_retry: ... on_failure: ... ``` ### namespace 區段 > 命名該設定的名稱。命名規範與Java package的一致;長度限制為`128`。 ### source 區段 > 輸入流的相關設定。 - type: 資料輸入流的類別。 - route: 路由名稱。[參考:資料路由設定](#資料路由設定) - config: 資料輸入流的相關設定。 - properties: 設定檔。 - metadata: 資料輸入欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定) ### sink 區段 > 輸出流的相關設定。 - type: 資料輸出流的類別。 - route: 路由匹配得設定。[參考:資料路由設定](#資料路由設定) - filter: 資料過濾設定。[參考:資料欄位檢核設定](#資料欄位檢核設定) - config: 資料輸出流的相關設定。 - properties: 設定檔。 - mapping: 資料輸出欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定) - on_retry: 重送機制設定。[參考:啟用 retry 或 failure 設定](#啟用-retry-或-failure-設定) > 註:目前只支援`elasticsearch`。 - on_failure: 失敗機制設定。 ## 資料路由設定 路由的設定是成對的設定,`source`定義路由名稱,`sink`定義匹配規則。 下面範例定義了兩個輸入流及兩個輸出流,其中`source-1`定義路由名稱`foo`,`source-2`定義路由名稱`bar`;其中`sink-1`接收路由名稱為`foo`與`baz`的輸入流,故能接收來自`source-1`的資料,但不會接收來自`source-2`的資料;而`sink-2`會接收到全部資料。 ```yaml= source: - type: source-1 route: foo - type: source-2 route: bar sink: - type: sink-1 route: !regex { pattern: '^(foo||baz)$'} - type: sink-2 route: !regex { pattern: '^[a-zA-Z0-9._.-]+$'} ``` ## 資料欄位映射設定 資料欄位映射設定將資料結構化,依照欄位的型別進行資料轉換與目的資料結構相容,且提供欄位檢核或預設值。 欄位映射是依照`source`的`metadata`屬性與`sink`的`mapping`屬性成對設定。`source`的`metadata`屬性可省略。 - metadata: 資料輸入欄位設定 - name: 名稱 - type: 型別 - precision: 精度 > 註:適用於 timestamp 類型,可定義 sec, msec。 - mapping: 資料輸出欄位設定 - name: 名稱 - type: 型別 - precision: 精度 > 註:適用於 timestamp 類型,可定義 sec, msec。 - tag: 欄位檢核 - source: 資料輸入欄位名稱 - default: 預設值 下面範例定義了資料輸出的欄位其名稱為`value`,型別為`string`,來源欄位名稱`value`,預設值為`baz`。 ```yaml= source: - type: kafka config: ... sink: - type: elasticsearch config: ... mapping: - name: value type: string source: value default: baz ``` 下面範例定義了資料輸出至`elasticsearch`sink,其中欄位`create_date`精度為`msec`的`timestamp`型別,將寫入`create_date_src`的值,並轉換其值的精度由`sec`轉換為`msec`。 ```yaml= source: - type: kafka config: ... metadata: - name: create_date_src type: timestamp precision: sec sink: - type: elasticsearch config: ... mapping: - name: create_date type: timestamp precision: msec source: create_date_src ``` ## 資料欄位檢核設定 針對來源欄位的基礎檢核,於`tag`設定,可組合多個檢核。 - required: - 必填。 > 註:支援所有欄位型別。 - non_empty: - 不為空,允許為null。 > 註:僅支援 string 型別。 - non_zero: - 數值不允許為0,允許正、負數、null。 > 註:僅支援 number 欄位。 - non_negative_integer: - 數值不允許為負數,允許正數、null。 > 註:僅支援 number 欄位。 下面範例定義了名稱為`id`的欄位,型別為`long`,其欄位為必填,且不可為零和負數。 ```yaml mapping: - name: id type: long tag: - 'required' - 'non_zero' - 'non_negative_integer' source: id ``` ## 資料過濾設定 資料過濾的設定,於`sink`下,針對映射的欄位做邏輯判斷(`Ruby script`),決定是否處理該筆資料。 下面範例定義了兩個映射欄位,分別為`value`、`create_data`;資料過濾對`value`內的值使用判斷等於`baz`才進行處理,否則忽略該筆資料。 ```yaml= sink: - type: elasticsearch filter: "ctx.value == 'baz'" config: ... mapping: - name: value type: string source: value - name: create_date type: timestamp source: create_date ``` ## 資料查詢佔位符表示方式 查詢語句支援佔位符標籤,用於替換參數值,可以使用欄位名稱替換個別欄位,亦可使用`*`號替換整個文件。 佔位符表示式: 1. 替換個別欄位,使用欄位名稱表示;下面將替換為`value`欄位的值。 ```java= %{value} ``` 2. 替換整個文件,使用`*`表示。 ```java= %{*} ``` 3. 替換整個文件並使用json格式化。 ```java= %{*:json} ``` 範例一:下面使用`elasticsearch`的DSL舉例,`%{*}`會被替換成`{"value":"baz"}`。 > 註:目前`elasticsearch`僅支援`document`,不支援`handler`。 ```yaml= sink: - type: elasticsearch config: properties: ... dsl: | { "script": { "source": "ctx._source.value = params.value;", "lang": "painless", "params": %{*} }, "upsert": %{*} } mapping: - name: value type: string source: value ``` 範例二:下面使用`postgresql`的SQL舉例,`%{*:json}`會被替換成`{"value":"baz"}`。 ```yaml= sink: - type: postgresql config: properties: ... sql: 'CALL stored_procedures(%{*:json}::json, NULL)' mapping: - name: value type: string source: value ``` 範例三:下面使用`mysql`的SQL舉例,`%{value}`會被替換成`'baz'`。 > 註:內部使用`ColumnParameter`方式帶入,避免`SQL Injection`安全問題。 ```yaml= sink: - type: mysql config: properties: ... sql: 'INSERT INTO `foodb`.`bar` (`value`) VALUES (%{value});' mapping: - name: value type: string source: value ``` ## 使用環境變數 下面範例,從環境變數取得名稱`TOPIC_NAME`、`KAFKA_BOOTSTRAP`與`ELASTICSEARCH_HOST`的值分別替換`${TOPIC_NAME}`、`${KAFKA_BOOTSTRAP}`與`${ELASTICSEARCH_HOST}`。 ```yaml= namespace: foo.demo source: - type: kafka config: topic: '${TOPIC_NAME}' properties: bootstrap.servers: '${KAFKA_BOOTSTRAP}' ... sink: - type: elasticsearch config: properties: host: '${ELASTICSEARCH_HOST}' ``` ## 啟用 retry 或 failure 設定 當sink發生失敗的操作時,僅會將資料寫入log。如需進一步的失敗處理,可啟用`on_retry`或`on_failure`設定,指示當發生失敗的操作時,要進行何種處置。以下列出目前支援的處置方式: - on_retry: 當發生可重試的失敗操作,將資料寫入指定的寫入器。 - 例:操作逾時、連線中斷...等。 - on_failure: 當發生不可重試的失敗操作,將資料寫入指定的寫入器。 - 例:語法錯誤、型別錯誤...等。 > 註:目前的寫入器僅支援kafka與flink log。 下面範例啟用`on_retry`與`on_failure`,當發生操作失敗時,會將其資料寫入kafka。 ```yaml= sink: - type: elasticsearch config: ... mapping: ... on_retry: type: kafka config: topic: 'retry-topic' properties: bootstrap.servers: 'localhost:9092' ... on_failure: type: kafka config: topic: 'failure-topic' properties: bootstrap.servers: 'localhost:9092' ... ``` ## Cassandra sink 相關設定 資料輸出類別設定為`cassandra`,設定`properties`提供`host`、`port`、`keyspace`、`table`。 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp - map - Array型別 - string[] - short[] - integer[] - long[] - float[] - double[] - boolean[] - decimal[] ```yaml= sink: - type: cassandra config: properties: host: 'localhost' port: 9042 keyspace: 'test' table: 'test_table' mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ``` ## Elasticsearch sink 相關設定 資料輸出類別設定為`elasticsearch`,設定`properties`提供`host`、`user`、`password`,以及下面設定: - method: Elasticsearch REST API 的 method。 - use_bulk: 是否使用 Bulk API 作為寫入。(true/false, 預設值`false`) - bulk_flush_interval: 觸發寫入的間隔時間。(單位:秒) - bulk_flush_capacity: 觸發寫入的上限筆數。 - bulk_flush_max_size: 寫入的資料位元上限。 - action: Elasticsearch REST API 的 action。 > 註:目前提供`create`, `delete`, `index`, `update` - index: 寫入資料的index名稱。 > 註:支援Ruby script - document_id: 寫入資料的document id。 > 註:支援Ruby script - dsl: 寫入資料的Elasticsearch DSL。[參考:資料查詢的設定](#資料查詢的設定) > 註:僅支援 document。 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp ```yaml= sink: - type: elasticsearch config: properties: host: 'http://localhost:9200' user: 'admin' password: 'admin' method: 'POST' use_bulk: true bulk_flush_interval: 10 bulk_flush_capacity: 50 bulk_flush_max_size: 2097152 action: 'update' index: "data-store-index-#{ctx.value[0..5]}" document_id: "id:#{ctx.value}" dsl: | { "script": { "source": "ctx._source.value = params.value;", "lang": "painless", "params": %{*} }, "upsert": %{*} } mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ``` ## Kafka source 相關設定 資料輸入類別設定`kafka`,設定資料輸入的來源`topic`,`properties`提供`bootstrap.servers`、`group.id`...等等。 > 註:依據 apache kafka 提供的設定自行增減。 > 註:目前僅支援`json`格式輸入。 - metadata: 資料輸入欄位的設定。[參考:資料欄位映射設定](#資料欄位映射設定) ```yaml= source: - type: kafka config: topic: 'input-topic' properties: bootstrap.servers: 'localhost:9092' group.id: foo-group auto.offset.reset: earliest max.poll.records: 50 max.poll.interval.ms: 86400000 metadata: - name: create_date type: timestamp precision: sec ``` ## Kafka sink 相關設定 資料輸出類別設定`kafka`,設定資料輸入的來源`topic`,`properties`提供`bootstrap.servers`、`acks`...等等。 - message_key: 寫入資料的message key。 > 註:支援Ruby script 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp ```yaml= sink: - type: kafka config: topic: 'output-topic' message_key: "key-#{ctx.value[0..5]}" properties: bootstrap.servers: 'localhost:9092' acks: '-1' security.protocol: 'PLAINTEXT' mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ``` ## MySQL sink 相關設定 資料輸出類別設定`mysql`,設定`properties`提供`host`、`user`、`password`。 - sql: 寫入資料的MySQL SQL。 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp ```yaml= sink: - type: mysql config: properties: host: 'jdbc:mysql://localhost:3306/foodb?allowMultiQueries=true' user: 'root' password: 'root' sql: 'INSERT INTO `foodb`.`bar` (`value`, `create_date`) VALUES (%{value}, %{create_date});' mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ``` ## PostgreSQL sink 相關設定 資料輸出類別設定`postgresql`,設定`properties`提供`host`、`user`、`password`。 - sql: 寫入資料的PostgreSQL SQL。 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp - json - Array型別 - string[] - short[] - integer[] - long[] - float[] - double[] - boolean[] ```yaml= sink: - type: postgresql config: properties: host: 'jdbc:postgresql://127.0.0.1:5432/foodb' user: 'root' password: 'root' sql: 'INSERT INTO bar (value, create_date) VALUES (%{value}, %{create_date});' mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ``` ## SQLite sink 相關設定 資料輸出類別設定`sqlite`,設定`properties`提供`host`、`user`、`password`。 - sql: 寫入資料的SQLite SQL。 支援資料欄位型別為: - boolean - string - integer - long - decimal - date - time - timestamp ```yaml= sink: - type: sqlite config: properties: host: 'jdbc:sqlite:/tmp/Test-#{ctx.create_date.to_s[0..5]}.db' user: 'sqlite' password: 'sqlite' sql: 'CREATE TABLE IF NOT EXISTS bar (value TEXT, create_date INTEGER);INSERT INTO bar (value, create_date) VALUES( %{value}, %{create_date});' mapping: - name: value type: string tag: - 'required' source: value - name: create_date type: timestamp precision: msec tag: - 'required' source: create_date ```