# Flink Archiver Plugin Review ###### tags: `data-stream` `tutorial` ## 緣起 盛哥的願望:希望可以有方案來解決「資料存取層寫入」問題。 - 背景說明 - 目前使用微服務架構,配合流式處理大量資料。 - 面臨問題 - 大量資料需經由流式處理來反序列化;因此需要額外的程式碼做欄位驗證、型別轉換...等。 - 擴充缺乏彈性。若增加欄位,必須修改原程式碼、建立單元測試與部署。 - 針對不同的來源與輸出資料庫,則必須要建立新的服務處理。 - 需要花費較多時間撰寫單元測試,面對較多欄位的資料模型,需要花費更多時間處理。 - 預期目的 - 減少開發時因人為錯誤造成的疏失。 - 減少開發的時間成本。(2day -> 0.5day) - 減低測試需求。(不需單元測試,直接 end to end) - 減少服務的數量,方便管理。 - 容易改造資料流,不需要增加新的服務。 ## Flink 簡介 Apache Flink是由Apache軟體基金會開發的開源流處理框架,其核心是用Java和Scala編寫的分布式流數據流引擎。Flink以數據並行和管道方式執行任意流數據程序,Flink的流水線運行時系統可以執行批處理和流處理程序。此外,Flink的運行時本身也支持疊代算法的執行。 Flink提供高吞吐量、低延遲的流數據引擎以及對事件-時間處理和狀態管理的支持。Flink應用程式在發生機器故障時具有容錯能力,並且支持exactly-once語義。程序可以用Java、Scala、Python和SQL等語言編寫,並自動編譯和優化到在集群或雲環境中運行的數據流程序。 Flink並不提供自己的數據存儲系統,但為Amazon Kinesis、Apache Kafka、Alluxio、HDFS、Apache Cassandra和Elasticsearch等系統提供了數據源和接收器。 ## 用語定義 - **source** : 指資料來源,通常是流。(如: kafka,...) - **sink** : 指資料接收器,可能是另一個流,或是儲存體。(如: kafka, stdout, mysql,...) ## Demo - Archiver job program arguments: ```bat --conf http://192.168.0.108/01.yaml --conf http://192.168.0.108/02.yaml --conf http://192.168.0.108/03.yaml --conf http://192.168.0.108/04.yaml ``` ### 情境一:(初步的 Demo archiver plugin 的效果) - source: kafka ```json bcow:{"player":"JJ","provider":"10666"} ``` - 設定檔 ```yaml namespace: com.bcow source: - type: kafka config: topic: 'source-topic-dev' properties: bootstrap.servers: 'kafka:9091' group.id: com.bcow.test sink: - type: stdout config: prefix: 'test1 Say:' mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: vendor type: string source: provider # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 10547 ``` - sink: stdout ```json test1 Say:{"player":"JJ","vendor":"10666"} ``` ### 情境二:(設定檔支援環境變數的使用) - source: kafka ```json bcow:{"player":"KK","provider":"10777"} ``` - 設定檔 ```yaml namespace: com.bcow source: - type: kafka config: topic: 'source-topic-${Environment}' properties: bootstrap.servers: 'kafka:9091' group.id: com.bcow.test sink: - type: stdout config: prefix: 'test2 Say:' mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: vendor type: string source: provider # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 10547 ``` - sink: stdout ```json test2 Say:{"player":"KK","vendor":"10777"} ``` ### 情境三:(新增欄位) - source: kafka ```json bcow:{"player":"LL","bet":"9999.99","provider":"10888"} ``` - 設定檔 ```yaml namespace: com.bcow source: - type: kafka config: topic: 'source-topic-${Environment}' properties: bootstrap.servers: 'kafka:9091' group.id: com.bcow.test sink: - type: stdout config: prefix: 'test3 Say:' mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: vendor type: string source: provider # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 10547 - name: bet type: string source: bet # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 9999.99 ``` - sink: stdout ```json test3 Say:{"player":"LL","bet":"9999.99","vendor":"10888"} ``` ### 情境四:(支援同時多個輸出位置) - source: kafka ```json bcow:{"player":"MM","bet":"9999.99","provider":"10888","timestamp":1628880299123} ``` - sink: stdout, mysql, elasticsearch - 設定檔 ```yaml namespace: com.bcow source: - type: kafka config: topic: 'source-topic-${Environment}' properties: bootstrap.servers: 'kafka:9091' group.id: com.bcow.test sink: - type: stdout config: prefix: 'test4 Say:' mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: vendor type: string source: provider # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 10547 - type: mysql config: properties: host: 'jdbc:mysql://mysql:3306/testdb?allowMultiQueries=true' user: 'root' password: 'root' sql: 'INSERT INTO `testdb`.`bet` (`player`, `bet`, `create_date`) VALUES (%{player}, %{bet}, %{create_date});' mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: bet type: string source: bet # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 9999.99 - name: create_date type: timestamp source: timestamp # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 1628880299123 - type: elasticsearch config: properties: host: 'http://elasticsearch:9200' user: 'bcow' password: 'bcow' method: 'GET' index: 'test' partition: '210816' # TODO script action: '/_update' doc_id: 1 #TODO script dsl: | { "script": { "source": "ctx._source.vendor = params.vendor;ctx._source.player = params.player;", "lang": "painless", "params": %s }, "upsert": %s } mapping: - name: player type: string source: player # tag: '{"required","notEmpty","notNull","notZero"}' # _default: JJ - name: vendor type: string source: provider # tag: '{"required","notEmpty","notNull","notZero"}' # _default: 10547 ``` - sink: stdout ```json test4 Say:{"player":"MM","vendor":"10888"} ``` - sink: mysql ```json player | bet | create_date --------+-----------+--------------------- MM | 9999.99 | 2021-08-13 18:44:59 ``` - sink: elasticsearch ```json "hits" : [ { "_index" : "test", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "player" : "MM", "vendor" : "10888" } } ] ``` ## 上版步驟 ### 步驟 1 - plugin 上傳 - 開啟`Flink job manage`頁面。 - 點擊左側功能清單的`Submit New Job`。 - 點擊右方的`+ Add NEW`按鈕。 - 選擇`plugin.jar`檔案上傳。 ### 步驟 2 - 新增Job - 開啟`Flink job manage`頁面。 - 點擊左側功能清單的`Submit New Job`。 - 點選要執行的`plugin`選項。 - 輸入參數設定。 - 點擊`Submit`按鈕,新增`job`。 ------ ## TODO - [ ] 加入 scripting 支援。 - [ ] 實作 **mapping** 區段中 `tag` 支援。 - [ ] 實作 **mapping** 區段中 `default` 支援。 - [ ] 實作 ConfigMap 環境變數支援。 - [ ] 實作 Apache Cassandra sink 支援。 - [ ] 實作 SQLite sink 支援。 - [ ] 實作 Postgres sink 支援。 - [ ] 實作 Apache Kafka sink 支援。 - [ ] 實作 Nats source 支援。