# Flink Archiver Plugin Review
###### tags: `data-stream` `tutorial`
## 緣起
盛哥的願望:希望可以有方案來解決「資料存取層寫入」問題。
- 背景說明
- 目前使用微服務架構,配合流式處理大量資料。
- 面臨問題
- 大量資料需經由流式處理來反序列化;因此需要額外的程式碼做欄位驗證、型別轉換...等。
- 擴充缺乏彈性。若增加欄位,必須修改原程式碼、建立單元測試與部署。
- 針對不同的來源與輸出資料庫,則必須要建立新的服務處理。
- 需要花費較多時間撰寫單元測試,面對較多欄位的資料模型,需要花費更多時間處理。
- 預期目的
- 減少開發時因人為錯誤造成的疏失。
- 減少開發的時間成本。(2day -> 0.5day)
- 減低測試需求。(不需單元測試,直接 end to end)
- 減少服務的數量,方便管理。
- 容易改造資料流,不需要增加新的服務。
## Flink 簡介
Apache Flink是由Apache軟體基金會開發的開源流處理框架,其核心是用Java和Scala編寫的分布式流數據流引擎。Flink以數據並行和管道方式執行任意流數據程序,Flink的流水線運行時系統可以執行批處理和流處理程序。此外,Flink的運行時本身也支持疊代算法的執行。
Flink提供高吞吐量、低延遲的流數據引擎以及對事件-時間處理和狀態管理的支持。Flink應用程式在發生機器故障時具有容錯能力,並且支持exactly-once語義。程序可以用Java、Scala、Python和SQL等語言編寫,並自動編譯和優化到在集群或雲環境中運行的數據流程序。
Flink並不提供自己的數據存儲系統,但為Amazon Kinesis、Apache Kafka、Alluxio、HDFS、Apache Cassandra和Elasticsearch等系統提供了數據源和接收器。
## 用語定義
- **source** : 指資料來源,通常是流。(如: kafka,...)
- **sink** : 指資料接收器,可能是另一個流,或是儲存體。(如: kafka, stdout, mysql,...)
## Demo
- Archiver job program arguments:
```bat
--conf http://192.168.0.108/01.yaml
--conf http://192.168.0.108/02.yaml
--conf http://192.168.0.108/03.yaml
--conf http://192.168.0.108/04.yaml
```
### 情境一:(初步的 Demo archiver plugin 的效果)
- source: kafka
```json
bcow:{"player":"JJ","provider":"10666"}
```
- 設定檔
```yaml
namespace: com.bcow
source:
- type: kafka
config:
topic: 'source-topic-dev'
properties:
bootstrap.servers: 'kafka:9091'
group.id: com.bcow.test
sink:
- type: stdout
config:
prefix: 'test1 Say:'
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: vendor
type: string
source: provider
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 10547
```
- sink: stdout
```json
test1 Say:{"player":"JJ","vendor":"10666"}
```
### 情境二:(設定檔支援環境變數的使用)
- source: kafka
```json
bcow:{"player":"KK","provider":"10777"}
```
- 設定檔
```yaml
namespace: com.bcow
source:
- type: kafka
config:
topic: 'source-topic-${Environment}'
properties:
bootstrap.servers: 'kafka:9091'
group.id: com.bcow.test
sink:
- type: stdout
config:
prefix: 'test2 Say:'
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: vendor
type: string
source: provider
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 10547
```
- sink: stdout
```json
test2 Say:{"player":"KK","vendor":"10777"}
```
### 情境三:(新增欄位)
- source: kafka
```json
bcow:{"player":"LL","bet":"9999.99","provider":"10888"}
```
- 設定檔
```yaml
namespace: com.bcow
source:
- type: kafka
config:
topic: 'source-topic-${Environment}'
properties:
bootstrap.servers: 'kafka:9091'
group.id: com.bcow.test
sink:
- type: stdout
config:
prefix: 'test3 Say:'
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: vendor
type: string
source: provider
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 10547
- name: bet
type: string
source: bet
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 9999.99
```
- sink: stdout
```json
test3 Say:{"player":"LL","bet":"9999.99","vendor":"10888"}
```
### 情境四:(支援同時多個輸出位置)
- source: kafka
```json
bcow:{"player":"MM","bet":"9999.99","provider":"10888","timestamp":1628880299123}
```
- sink: stdout, mysql, elasticsearch
- 設定檔
```yaml
namespace: com.bcow
source:
- type: kafka
config:
topic: 'source-topic-${Environment}'
properties:
bootstrap.servers: 'kafka:9091'
group.id: com.bcow.test
sink:
- type: stdout
config:
prefix: 'test4 Say:'
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: vendor
type: string
source: provider
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 10547
- type: mysql
config:
properties:
host: 'jdbc:mysql://mysql:3306/testdb?allowMultiQueries=true'
user: 'root'
password: 'root'
sql: 'INSERT INTO `testdb`.`bet` (`player`, `bet`, `create_date`) VALUES (%{player}, %{bet}, %{create_date});'
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: bet
type: string
source: bet
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 9999.99
- name: create_date
type: timestamp
source: timestamp
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 1628880299123
- type: elasticsearch
config:
properties:
host: 'http://elasticsearch:9200'
user: 'bcow'
password: 'bcow'
method: 'GET'
index: 'test'
partition: '210816' # TODO script
action: '/_update'
doc_id: 1 #TODO script
dsl: |
{
"script": {
"source": "ctx._source.vendor = params.vendor;ctx._source.player = params.player;",
"lang": "painless",
"params": %s
},
"upsert": %s
}
mapping:
- name: player
type: string
source: player
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: JJ
- name: vendor
type: string
source: provider
# tag: '{"required","notEmpty","notNull","notZero"}'
# _default: 10547
```
- sink: stdout
```json
test4 Say:{"player":"MM","vendor":"10888"}
```
- sink: mysql
```json
player | bet | create_date
--------+-----------+---------------------
MM | 9999.99 | 2021-08-13 18:44:59
```
- sink: elasticsearch
```json
"hits" : [
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"player" : "MM",
"vendor" : "10888"
}
}
]
```
## 上版步驟
### 步驟 1 - plugin 上傳
- 開啟`Flink job manage`頁面。
- 點擊左側功能清單的`Submit New Job`。
- 點擊右方的`+ Add NEW`按鈕。
- 選擇`plugin.jar`檔案上傳。
### 步驟 2 - 新增Job
- 開啟`Flink job manage`頁面。
- 點擊左側功能清單的`Submit New Job`。
- 點選要執行的`plugin`選項。
- 輸入參數設定。
- 點擊`Submit`按鈕,新增`job`。
------
## TODO
- [ ] 加入 scripting 支援。
- [ ] 實作 **mapping** 區段中 `tag` 支援。
- [ ] 實作 **mapping** 區段中 `default` 支援。
- [ ] 實作 ConfigMap 環境變數支援。
- [ ] 實作 Apache Cassandra sink 支援。
- [ ] 實作 SQLite sink 支援。
- [ ] 實作 Postgres sink 支援。
- [ ] 實作 Apache Kafka sink 支援。
- [ ] 實作 Nats source 支援。