<style type='text/css'> h2:before { content:"¶ "; color:lightgray } .underline { text-decoration: underline; text-decoration-color: red; } </style> **August 2021** | **v1.0** Bofry/flink/plugins/DataBroker 快速入門指南 ================================== ###### tags: `flink-databroker` `tutorial` ---------------------------------------------------------------- #### 目錄 - [簡介](#簡介) - [環境要求](#環境要求) - [快速入門](#快速入門) - [進階說明](#進階說明) ---------------------------------------------------------------- ## 簡介 ⠿ **Bofry/flink/plugins/DataBroker** 是使用 [*flink/DataStream API*](https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/overview/) 作為主體,專門提供開發者快速建置 *Archiver* 應用服務為目的而製作的插件。主要特色如下: 1. <span class="underline">**非全功能的插件**</span>,主要目的是快速開發 *Archiver* 服務。 2. 插件遵循 <span class="underline">[**DataStream Connectors**](https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/overview/) **標準**</span>來作為應用程式開發配置的原則。 3. *Bofry/flink/plugins/DataBroker* 所支援的內建功能: 1. 通用的應用程式設定(*config*)處理;支援解析環境變數、*JSON*、*YAML* 以及命令列參數。 2. 使用*URL*、*Path*來提供,應用程式設定(*config*)。 3. 提供資料儲存 *Model* 的 3-1. 來源名稱定義 (*source*) 3-2. 型別 (*type*) 3-3. 輸出名稱定義 (*name*) 3-4. 檢核驗證 (*tag*) 3-5. 預設值 (*default*) [🔝回目錄](#%e7%9b%ae%e9%8c%84) ---------------- ## 環境要求 1. Apache Flink 1.13.0 以上。 kakfa 5.3.1 以上。 >註:可使用Docker,下載 [Docker-compose](#下載) 2. 啟動 Docker。 3. DataBroker Jar 下載。 >註:下載 [DataBroker.jar](#下載) [🔝回目錄](#%e7%9b%ae%e9%8c%84) ---------------- ## 快速入門 ⠿ 本節提供快速建構 **DataBroker** 服務的基本步驟。 - **步驟一:建立yaml設定檔** ⠿ 於啟動 Docker-compose 路徑的 /shard/fink/ 目錄內,建立名稱為 **KafkaToStdout.yaml** 的檔案。 ```bash= $ cd ${docker-compose}/shared/flink $ touch KafkaToStdout.yaml ``` - **步驟二:編輯yaml設定檔內容** ⠿ 編輯 **KafkaToStdout.yaml** 檔,將下方內容複製進入並儲存。 ```bash= $ vi KafkaToStdout.yaml ``` ⠿ 設定檔內容: ```yaml= namespace: com.bcow source: - type: kafka config: topic: 'source-topic-dev' properties: bootstrap.servers: 'kafka:9091' group.id: com.bcow.test auto.offset.reset: earliest max.poll.records: 500 sink: - type: stdout config: prefix: 'test1 Say:' mapping: - name: player type: string tag: - 'required' - 'non_empty' source: player default: jj - name: vendor type: string tag: - 'required' source: provider default: pp - name: bet type: decimal tag: - 'required' - 'non_zero' - 'non_negative_integer' source: bet default: 999.99 ``` - **步驟三:上傳 DataBroker Jar 至 Flink** 1. 瀏覽器開啟`Apache Flink Web Dashboard` ```yaml= # default url http://localhost:8081 ``` 2. 點擊左側功能清單的`Submit New Job`。 3. 點擊右方的`+ Add NEW`按鈕。 4. 選擇`DataBroker-0.1.jar`檔案進行上傳。 - **步驟四:啟動 DataBroker Job** 1. 進入`Submit New Job` 頁面。 2. 點選展開`DataBroker-0.1.jar`Job。 3. 於`Program Arguments`輸入參數。 ```yaml= --conf /shared/KafkaToStdout.yaml ``` 4. 點擊`Submit`啟動。 5. 頁面自動跳轉`Flink Streaming Job`,確認狀態是在`Running`。 - **步驟五:寫入來源 kafka 的 example 資料** ⠿ 使用以下內容發送至**kafka topic** :`source-topic-dev`內。 ```yaml= 123:{"player":"KK","provider":"10547","bet":666.66} ``` - **步驟六:確認 DataBroker Job 正常輸出至 Stdout** ⠿ 使用`Docker logs`觀看`Stdout`輸出內容。 ```bash $ docker logs taskmanager-1 ``` ⠿ 輸出內容: ```bash test1 Say:{"player":"KK","vendor":"10547","bet":666.66} ``` >備註: >快速入門情境為:kafka -> DataBroker Job -> stdout >```yaml= >#用語定義: >source : 指資料來源,通常是流。(如: kafka,...) >sink : 指資料接收器,可能是另一個流,或是儲存體。(如: kafka, stdout, mysql,...) > >``` [🔝回目錄](#%e7%9b%ae%e9%8c%84) ---------------- ## 進階說明