# Apache Kafka ## kafka 解決的問題 **1.** 做為數據資料的緩衝區並分類,避免一瞬間大量資料塞爆 server **2.** 簡化 server 的程式碼,數據資料來源可能有很多種(MySQL、Flume、網路port...),統一送到kafka統一格式,可以避免 server 寫多種不同格式的資料程式,減少開發成本。 ![image](https://hackmd.io/_uploads/HJACHrcDA.png) ## kafka 兩種模式 ### 1. 點對點 > consumer 拉取數據後將數據從 kafka queue 上刪掉 ![image](https://hackmd.io/_uploads/r1gzaHcv0.png) ### 2. 發布/訂閱模式(主流) > 多個 Topics,consumer 拉取數據後不刪除 kafka queue 上的數據,其他 consumers 仍可獲取相同的數據 > 可以處理較複雜的場景 ![image](https://hackmd.io/_uploads/H1SmaSqwR.png) ## kafka 基礎架構 * Producer & Consumer 的操作都是對 leader 去操作,除非 leader 掛掉才由 follwer 接手 ![image](https://hackmd.io/_uploads/rJupC9jwR.png) ## kafka 部署 ### 基本配置 1. server.properties ```shell= ## 在個別節點中的 kafka 修改以下配置 borker_id = 0 ## 不同節點中的 broker_id 需不同 zookeeper.connect=[node1]:2181,[node2]:2181,[node3]:2181/kafka log.dirs=/home/yangyo/kafka_2.13-3.7.0/datas ``` * 啟動 & 停止 kafka 集群指令 ```shell= ## start kafka-server-start.sh [-daemon] [server.properties path] ## stop kafka-server-stop.sh ``` * 啟動 & 停止 zookeeper 指令 ```shell= ## start zookeeper-server-start.sh [zookeeper.properties path] ## stop zookeeper-server-stop.sh ``` ### 集群啟動腳本 :::info 使用腳本前需要先對集群進行 ssh 免密登入 ::: 1. 啟動 zookeeper ```bash= #!/bin/bash case $1 in "start") for i in node1 node2 node3 do echo "------------- START $i -------------" ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/zookeeper-server-start.sh /home/yangyo/kafka_2.13-3.7.0/config/zookeeper.properties" done ;; "stop") for i in node1 node2 node3 do echo "------------- STOP $i -------------" ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/zookeeper-server-stop.sh" done ;; *) echo "Invalid option. Please use 'start' or 'stop'." ;; esac ``` 2. 啟動 kafka ```bash= #!/bin/bash case $1 in "start") for i in node1 node2 node3 do echo "------------- START $i -------------" ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/kafka-server-start.sh /home/yangyo/kafka_2.13-3.7.0/config/server.properties" done ;; "stop") for i in node1 node2 node3 do echo "------------- STOP $i -------------" ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/kafka-server-stop.sh" done ;; *) echo "Invalid option. Please use 'start' or 'stop'." ;; esac ``` ## kafka 三大腳本 ### 1. kafka-topics.sh ![image](https://hackmd.io/_uploads/HyvLHl6wC.png) #### 連接上某台 kafka broker 主機名稱跟端口號 > 只要連接上,集群中其他節點的 topics 都看得到,所以此動作相當於連上集群 ```bash= ## 可以多寫幾個主機端口避免某台掛掉連不上 ## 列出集群topics kafka-topics.sh --bootstrap-server node1:9092, node2:9092 --list ``` #### 創建 topics ```bash= kafka-topics.sh --boostrap-server node1:9092 --topic first_topic --create --partitions 1 --replication-factor 3 ``` # Week1 ## Kafka數據結構 ![](https://i.imgur.com/CZRIiAm.png) Kafka會透過序列化模組將key、data value 轉化成二進制儲存到Kafka Partition中(加密),取出時也會透過序列化模組將二進制轉乘原本的key、data value(解密) ## Consumer and Producer ### Producer 外部傳進來Kafka的數據 ![](https://i.imgur.com/rfcwC7N.png) 當Producer將資料送給kafka需要kafka回傳確認資料確實有送到Kafka中,而acks將決定何時回傳 * acks = 0 資料送出去即可不用等待確認(資料很可能loss) * acks = 1 資料送出去寫入leader後回傳(資料有機會在更新複製Partition時loss) * acks = 2 資料送出去寫入leader、複製Partition後回傳(資料不會loss) #### Record(kafka中的一筆訊息) > 訊息可以一筆筆的傳送,但是在大數量的情境下會消耗掉很大量的網路傳輸成本,因此 Kafka 是批次寫入的,但是批次寫入一定會造成寫入的延遲性,這必須視情況下去考量,看使用場景是 I/O 重要、還是低延遲比較重要。 ![](https://i.imgur.com/w6cRTWx.png) 訊息一律 **key:** 用來分辨要將資料傳到哪個partition,若未設定及採RR分配 **timestamp:** 可根據時間戳抓取某個record的offset從中斷地方繼續讀取 **value:** 通常是JSON格式 ### Consumer 消費Kafka的數據 ![](https://i.imgur.com/3PB7A39.png) ![](https://i.imgur.com/EMtsXVV.png) 當Consumer group死掉,kafka可根據offset來決定該從哪筆資料重新開始讀 ## Topics(分類主題) ### Partition 每個topics中又可切分成多個partition,每個partition中的資料會不停增長,Partition的特性包括 1. 不可修改刪除 2. 經過一段時間(Default: one week)會被自動刪除 3. Partiotions之間互相獨立 4. Partition數目不受限制 ![](https://i.imgur.com/8bTc2D8.png) ## Brokers Brokers即server Cluster > Brokers > Topics > Partitions ![](https://i.imgur.com/bQre0DI.png) 每個Topics的Partitions可被平行分散在各個Broker ![](https://i.imgur.com/vIX4Vmx.png) Kafka Client 只要向其中一個Broker發連線請求,就可以得到其他Brokers的資訊並和其他Brokers連線 ## Zookeeper(最新版本將不會使用,因為不安全) 1. 用來管理Brokers 2. 當Broker掛掉時用來選出新的leader 3. 傳送消息給Kafka(ex:新的topics、新broker、broker down......) ## Kafka CLI 1. 創建Topic `kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --create` 2. 創建Topic並自訂partition數量、複製partion數量(注意不能大於broker數量) `kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --create --partitions <數量> --replications factor <數量>` 3. 查看topics list `kafka-topics.sh --bootstrap-server localhost:9092 --list` 4. 查看topics詳細內容 `kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --describe` 5. 刪除topics `kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --delete` ## Producer CLI 1. 傳送訊息給kafka `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name>` 2. 傳送訊息給kafka增強性質確保資料不流失在leader、replication `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name> --producer-property ack=all` 3. 傳送訊息with key `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name> --property parse.key=true --property key.separator=:` **:的左邊為key name,右邊為要傳送之內容** 4. 用RR(輪流)將資料送給各個partition `kafka-console-producer.sh --bootstrap-server localhost:9092 producer-property partitioner.class=org.apache.kafka.client.producer.RoundRobinPartitioner --topic <topic's name>` ## Consumer CLI 1. 向kafka讀取資料 `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name>` ![](https://i.imgur.com/tNCjPlF.png) 2. 從頭讀(根據partition順序) `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --from-beginning` ![](https://i.imgur.com/1s6AvdS.png) 3. 打印每筆資料各種屬性(時間、key、value、存在的partition number) `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning` ![](https://i.imgur.com/qu327jo.png) ## Consumer Group CLI ![](https://i.imgur.com/y6PuYhM.png) 1. 告訴kafka我們正在使用consumer group (多開幾個Terminal Tab run 相同指令,即會產生多個consumer在相同group中) `kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --group <group's name>` 2. 管理不同的Consumer groups * List all consumer groups `kafka-consumer-groups.sh --command.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list` * 查看consumer group 細節 `kafka-consumer-groups.sh --command.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --group <group's name>` * reset offset ` kafka-consumer-groups.sh --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --group my-first-application --reset-offsets --to-earliest --topic third_topic --dry-run ` ` kafka-consumer-groups.sh --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --group my-first-application --reset-offsets --to-earliest --topic third_topic --execute # Week2 ## Kafka test 1. **原始配置** * topics name : test-topic * partition 數量 : 1 * broker數量 : 1 ![](https://i.imgur.com/ng0B3eV.png) **producer測試結果** **avg latency = 136.65 ms** * 發送1萬條消息 * 每條1024byte * producer acks = 1 * 不限流 ![](https://i.imgur.com/gBUyTlm.png) 2. **在同一broker上將partition數量變更為3個、acks=0、bactch size從16KB改為32KB** 想法:acks=0producer不必等待leader確認收到訊息回應便可持續送下一筆訊息(缺點為訊息可能在傳送過程中流失),partition改為3,可讓producer平行傳送訊息減少傳遞時間、增加producer一次發送消息的數量,減少發送次數(因為牽扯到I/O) * topics name : test-topic * partition 數量 : 3 * broker數量 : 1 ![](https://i.imgur.com/iuI6j72.png) **producer測試結果** **avg latency = 80.59 ms** * 發送1萬條消息 * 每條1024byte * producer acks = 0 * batch size = 32KB * 不限流 ![](https://i.imgur.com/nbYLwsD.png) 3. **linger.ms改為50ms** **想法**:雖然配置3的batch size提高可以減少producer的I/O次數,但預設的linger.ms為0即Producer會立即將消息發送到Broker,也許一次送不到32KB反而還是無法降低傳送次數,將linger.ms設為50ms,讓更多的消息累積到一個batch中再送,才能真正有效降低傳送次數、I/O次數 * topics name : test-topic * partition 數量 : 3 * broker數量 : 1 **producer測試結果** **avg latency = 42.23 ms** * 發送1萬條消息 * 每條1024byte * producer acks = 0 * batch size = 32KB * linger.ms = 50ms * 不限流 ![](https://i.imgur.com/O8qoXng.png) 4. **更改batch.size 、 linger.ms、compression.type** * topics name : test-topic * partition 數量 : 3 * broker數量 : 1 **producer測試結果** **avg latency = 11.82 ms** * 發送1萬條消息 * 每條1024byte * producer acks = 0 * batch size = 64KB * linger.ms = 500ms * compression type = gzip * 不限流 ![](https://i.imgur.com/p8vxwhc.png) 5. **更改 linger .ms** * topics name : test-topic * partition 數量 : 3 * broker數量 : 1 **producer測試結果** **avg latency = 6.34 ms** * 發送1萬條消息 * 每條1024byte * producer acks = 0 * batch size = 64KB * linger.ms = 1000ms * compression type = gzip * 不限流 ![](https://i.imgur.com/OSxme8t.png) # Week3 ## Kafka ### 優點 1. 擴展性: 資料不會因為被consumer消費而消失,可以隨時增加新的consumer來處理資料 2. 穩定性: 可應付瞬間大流量 3. 保證順序: 保證每個partition中資料讀取是按照順序的(磁碟按照順序讀取比較快),但不保證同時讀取多個partitions的順序 4. 持久性: 資料的持久性,有多個replication可以避免資料流失 ### 缺點 1. 成本高: 部署、維運成本較高,吞吐量小不適用 2. 訊息可能亂序: 當broker掛掉時會導致訊息順序亂掉 3. 訊息非即時: 訊息採批次傳送,不絕對即時 ### Kafka Connect(資料庫) source connector : Producer sink connector : Consumer 要先下載 JDBC Connector * 需要修改相對應的設定檔 `connect-distributed.properties` ```java bootstrap.servers=127.0.0.1:9092 // Broker Server 的 IP 位子 group.id=connect-cluster rest.port=8083 // REST 介面監聽的 port,預設是8083,若是用 connect 的 standalone 模式,預設 port 是 8084。 plugin.path=/usr/local/etc/kafkaConnect // JDBC Connector 所在的絕對路徑 ``` * 新增source connector ```java $ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \ --data \ '{ "name":"test-upload-source-mysql", "config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=資料庫密碼", "table.whitelist":"source_users", "incrementing.column.name": "id", "mode":"incrementing", "topic.prefix": "test-mysql-"} }' ``` **name**:指定新增 connector 的名稱 **config**:指定 connector 的設定資訊 **connector.class**:使用哪個 connector 類別 **connection.url**:連結 Mysql 的 url **table.whitelist**:下載哪些表格 **incrementing.column .name**:增長的欄位名稱 **mode**:指定 connector 的模式 **topic.prefix**:Kafka會新增一個 Topic,這邊是指令該 Topic 的前綴,最後產生的名稱會是前綴加上表格名稱,Ex. test-mysql-source_users * 新增Sink Connector ```java curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \ --data \ '{"name":"test-download-to-mysql","config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/target_database", "connection.user":"root", "connection.password":"ifalo.net", "topics":"test-mysql-source_users", "auto.create":"false", "insert.mode": "upsert", "pk.mode":"record_value", "pk.fields":"id", "table.name.format": "target_users"}}' ``` **name**:指定新增 connector 的名稱 **config**:新增 connector 的設定資訊 **connector.class**:使用哪個 connector 類別 **connection.url**:Mysql 連接的 url **topics**:從哪個 topic 讀取資料 **auto.create**:是否自動新建表格 **insert.mode**:寫入的模式,這邊選用 upsert **pk.mode**:選擇主鍵模式 record_value 從訊息的 value 中取得資料 **pk.fields**:pk 欄位名稱 **table.name.format**:指定輸出到資料庫哪個表格 # KafkaJs ### 安裝KafkaJs套件 ```javascript npm install kafkajs ``` ### 基本用法 * 創建一個或多個kafka broker(s) ```javascript const { Kafka } = require('kafkajs') const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'], }) ``` * 創建生產者並傳送訊息 ```javascript const producer = kafka.producer() //這裡的kafka為上面創建之kafka broker await producer.connect() await producer.send({ topic: 'test-topic', messages: [ { value: 'Hello KafkaJS user!' }, ], }) await producer.disconnect() ``` * 若要一次發送多條訊息給多個topics則使用sendBatch ```javascript const topicMessages = [ { topic: 'topic-a', messages: [{ key: 'key', value: 'hello topic-a' }], }, { topic: 'topic-b', messages: [{ key: 'key', value: 'hello topic-b' }], }, { topic: 'topic-c', messages: [ { key: 'key', value: 'hello topic-c', headers: { 'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67', }, } ], } ] await producer.sendBatch({ topicMessages }) ``` * 創建消費者並收訊息 ```javascript const consumer = kafka.consumer({ groupId: 'test-group' }) await consumer.connect() await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { console.log({ value: message.value.toString(), }) }, }) ``` # Producer到Consumer之bottleneck ### 思考 **producer到consumer之間必須經過的部分為何?** 1.網路 2.kafka broker **可能的瓶頸為何?** 1.網路頻寬 2.硬體資源(CPU、Memory、disk的速度計算能力) 3.kafka 對資料儲存、查找的速度、穩定度 **思考方向** 如何加速kafka broker本身在資料的儲存與查詢,使用的方法為何? ## Kafka與傳統application讀寫查詢方法的不同 1. **讀取、查詢** ![](https://i.imgur.com/ui5m7hk.png) Kafka在讀取資料時省略了應用層和OS的資料傳輸,當大量的資料在做讀取時可以加快速度 **另外Kafka在儲存data時採用的是分段的形式,當log檔案到達一定的數量,會產生另一個log檔繼續存放,此時若要查詢某個offset的資料可以直接根據log檔名到該檔案找尋(讀的時候也會將附近的Data一併讀,有spatial locallity的效果),不必每個檔案都讀節省時間** 2. **寫入** ![](https://i.imgur.com/mAiCotY.png) 傳統的必須寫兩次,而kafka只須寫一次即可,後面那次是OS在Disk空閒時會自己將Cache中的內容寫入 # Week4 ## Consumer Group 如何取數據(大方向) ### 獲取數據 ![](https://i.imgur.com/NjZNUGI.png) 各個broker的offset會存在Cluster中,Consumer可以直接和broker連接根據offset資訊去讀訊息,而broker在做數據傳送時也是採批次傳送 ### 再平衡機制 ![](https://i.imgur.com/LXGWR3m.png) 必須先至Zookeeper中取得Cluster中的Partition數量再連至broker去取數據(任何Consumer Group 第一次要進行消費時都必須至Zookeeper去讀Cluster相關資料) ## Kafka 如何分配partition至consumer **1. Range Strategy** **方法:** * 以topic角度出發 * 先將partitions、consumers根據編號順序排好 * partition數/consumer數 = 一個consumer需配到多少個partitions * 若無法整除,則根據consumer順序每個consumer多分一個partition **例子:** 有10個partitions、3個topics(每個topics的partitions數分別為334)、3個consumers(皆訂閱3個topics) partitions按順序排為: [P0 P1 P2] 、 [P3 P4 P5] 、 [P6 P7 P8 P9] consumers按順序排為: C1 C2 C3 根據topic中有多少partitions、多少consumer訂閱分配,結果為: C1:P0 P3 P4 P7 C2:P1 P5 P8 C3:P2 P6 P9 ![](https://i.imgur.com/nP0jyuQ.png) **缺點:** 在所有consumer都訂閱相同topics的情況下,若各個topics中partitions數量不同會有負載不均的情形 ![](https://i.imgur.com/gCNq0gg.png) **2. RoundRobin Strategy** **此方法可以解決Range Strategy負載過重的問題** **方法:** * 以partition角度出發 * 利用hashcode將partitions、consumers排序 * 再利用RR的方式分配partition **例子:** 有10個partitions、3個consumers 利用hashcode排序結果如下: partitions按順序排為: P0 P1 P2 P3 P4 P5 consumers按順序排為: C2 C1 C0 C0:P2 P5 C1:P1 P4 C2:P0 P3 ![](https://i.imgur.com/dY8b5pM.png) **缺點:** 當每個consumer訂閱的topic不同時則無法均勻分配 **3. Sticky Strategy** * 以consumer角度出發 * 比Range、RR來的複雜 **目的:** * partition的分配盡量平均 * 當需要rebalance時盡量保持原本的分配,盡量不讓已經被分配的partition分配給別人 * 若兩者衝突則優先保持分配平均 **例子:** 3個consumers(C0 C1 C2)訂閱4個topics(T0 T1 T2 T3)每個topics有2個partitions(P0 P1) 假設目前分配為: C0: T0-0 T1-1 T3-0 C1: T0-1 T2-0 T3-1 C2: T1-0 T2-1 此時shutdown C1,使用RR法、Sticky法rebalance結果為: **RR:** C0: T0-0 T1-0 T2-0 T3-0 C2: T0-1 T1-1 T2-1 T3-1 **Sticky:** C0: T0-0 T1-1 T3-0 T2-0 C2: T1-0 T2-1 T0-1 T3-1 **保留原本在C0、C2上的partition** ## 去除掉Zookeeper之新版kafka 用Quorum Controller取代原本用來和zookeeper溝通之單一controller,Quorum中的每個controller都會通過Raft機制來備份元數據 ### 傳統controller Kafka集群中會有一個broker被選舉為Controller負責跟Zookeeper進行交互,它負責管理整個Kafka集群中所有分區和副本的狀態。其他broker監聽Controller節點的數據變化。 **舉例:** 比如當某個分區的leader出現故障時,Controller會為該分區選舉新的leader。當檢測到分區的ISR集合發生變化時,Controller會通知所有broker更新元數據。當某個topic增加分區時,Controller會負責重新分配分區。 ![](https://i.imgur.com/lREKaZ9.png) * Controller選舉成功後,會從Zookeeper集群中拉取一份完整的元數據初始化ControllerContext,這些元數據緩存在Controller節點。當集群發生變化時,比如增加topic分區,Controller不僅需要變更本地的緩存數據,還需要將這些變更信息同步到其他Broker。 * Controller監聽到Zookeeper事件、定時任務事件和其他事件後,將這些事件按照先後順序暫存到LinkedBlockingQueue中,由事件處理線程按順序處理,這些處理多數需要跟Zookeeper交互,Controller則需要更新自己的元數據。 **缺點:** 當分區數增加時,Zookeeper保存的元數據變多,Zookeeper集群壓力變大,達到一定程度後,監聽延遲增加,給Kafaka的工作帶來了影響。 ### KRAFT協議 KRaft是一種共識協議,可以直接在Kafka中管理元數據。元數據的管理被整合到了Kafka當中,不需要使用Zookeeper,大大簡化了Kafka的架構,提高了partitions的可伸縮性和彈性。 ![](https://i.imgur.com/OHgvcYD.png) 將metadata直接存在Raft日誌當中,取消原本只有一個controller的機制,利用多個controller形成一個集合,在leader broker掛掉之後,剩餘多個controller重新選出一個leader負責更新metadata,而其他(稱之為follower)負責接收leader的操作並確保它們的本地日誌和leader的日誌保持一致,去掉了還需從zookeeper獲取metadata的距離,節省很多時間成本、管理成本。 ## 新版Kafka如何讓consumer消費距離最近的replica **方法:** 1. 讓consumer自行根據從broker蒐集到的metadata(rackId, host information)來決定 ```java client.rack=<rack-ID> ``` 2. 由broker根據consumer資訊決定該分配哪個replica給哪個consumer ```java replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector broker.rack=<region> ``` **在Kafka中,當一個Consumer向Broker註冊時,Broker會根據Consumer的註冊信息進行分區的分配。Broker會首先查詢所有的分區副本所在的Broker的網絡位置,然後根據Consumer的網絡位置和分區副本的網絡位置進行比較,選擇距離最近的分區副本分配給Consumer。在這個過程中,Broker會考慮多種因素,例如分區的領導者、副本的分佈情況、Consumer的網絡位置等,如果Consumer和Broker在同一rack內,它們之間的網絡延遲將會比跨rack的網絡延遲要小,因此可以提高消費者的消費速度和效率。**