# Apache Kafka
## kafka 解決的問題
**1.** 做為數據資料的緩衝區並分類,避免一瞬間大量資料塞爆 server
**2.** 簡化 server 的程式碼,數據資料來源可能有很多種(MySQL、Flume、網路port...),統一送到kafka統一格式,可以避免 server 寫多種不同格式的資料程式,減少開發成本。

## kafka 兩種模式
### 1. 點對點
> consumer 拉取數據後將數據從 kafka queue 上刪掉

### 2. 發布/訂閱模式(主流)
> 多個 Topics,consumer 拉取數據後不刪除 kafka queue 上的數據,其他 consumers 仍可獲取相同的數據
> 可以處理較複雜的場景

## kafka 基礎架構
* Producer & Consumer 的操作都是對 leader 去操作,除非 leader 掛掉才由 follwer 接手

## kafka 部署
### 基本配置
1. server.properties
```shell=
## 在個別節點中的 kafka 修改以下配置
borker_id = 0 ## 不同節點中的 broker_id 需不同
zookeeper.connect=[node1]:2181,[node2]:2181,[node3]:2181/kafka
log.dirs=/home/yangyo/kafka_2.13-3.7.0/datas
```
* 啟動 & 停止 kafka 集群指令
```shell=
## start
kafka-server-start.sh [-daemon] [server.properties path]
## stop
kafka-server-stop.sh
```
* 啟動 & 停止 zookeeper 指令
```shell=
## start
zookeeper-server-start.sh [zookeeper.properties path]
## stop
zookeeper-server-stop.sh
```
### 集群啟動腳本
:::info
使用腳本前需要先對集群進行 ssh 免密登入
:::
1. 啟動 zookeeper
```bash=
#!/bin/bash
case $1 in
"start")
for i in node1 node2 node3
do
echo "------------- START $i -------------"
ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/zookeeper-server-start.sh /home/yangyo/kafka_2.13-3.7.0/config/zookeeper.properties"
done
;;
"stop")
for i in node1 node2 node3
do
echo "------------- STOP $i -------------"
ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/zookeeper-server-stop.sh"
done
;;
*)
echo "Invalid option. Please use 'start' or 'stop'."
;;
esac
```
2. 啟動 kafka
```bash=
#!/bin/bash
case $1 in
"start")
for i in node1 node2 node3
do
echo "------------- START $i -------------"
ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/kafka-server-start.sh /home/yangyo/kafka_2.13-3.7.0/config/server.properties"
done
;;
"stop")
for i in node1 node2 node3
do
echo "------------- STOP $i -------------"
ssh yangyo@$i "/home/yangyo/kafka_2.13-3.7.0/bin/kafka-server-stop.sh"
done
;;
*)
echo "Invalid option. Please use 'start' or 'stop'."
;;
esac
```
## kafka 三大腳本
### 1. kafka-topics.sh

#### 連接上某台 kafka broker 主機名稱跟端口號
> 只要連接上,集群中其他節點的 topics 都看得到,所以此動作相當於連上集群
```bash=
## 可以多寫幾個主機端口避免某台掛掉連不上
## 列出集群topics
kafka-topics.sh --bootstrap-server node1:9092, node2:9092 --list
```
#### 創建 topics
```bash=
kafka-topics.sh --boostrap-server node1:9092 --topic first_topic --create --partitions 1 --replication-factor 3
```
# Week1
## Kafka數據結構

Kafka會透過序列化模組將key、data value 轉化成二進制儲存到Kafka Partition中(加密),取出時也會透過序列化模組將二進制轉乘原本的key、data value(解密)
## Consumer and Producer
### Producer
外部傳進來Kafka的數據

當Producer將資料送給kafka需要kafka回傳確認資料確實有送到Kafka中,而acks將決定何時回傳
* acks = 0 資料送出去即可不用等待確認(資料很可能loss)
* acks = 1 資料送出去寫入leader後回傳(資料有機會在更新複製Partition時loss)
* acks = 2 資料送出去寫入leader、複製Partition後回傳(資料不會loss)
#### Record(kafka中的一筆訊息)
> 訊息可以一筆筆的傳送,但是在大數量的情境下會消耗掉很大量的網路傳輸成本,因此 Kafka 是批次寫入的,但是批次寫入一定會造成寫入的延遲性,這必須視情況下去考量,看使用場景是 I/O 重要、還是低延遲比較重要。

訊息一律
**key:**
用來分辨要將資料傳到哪個partition,若未設定及採RR分配
**timestamp:**
可根據時間戳抓取某個record的offset從中斷地方繼續讀取
**value:**
通常是JSON格式
### Consumer
消費Kafka的數據


當Consumer group死掉,kafka可根據offset來決定該從哪筆資料重新開始讀
## Topics(分類主題)
### Partition
每個topics中又可切分成多個partition,每個partition中的資料會不停增長,Partition的特性包括
1. 不可修改刪除
2. 經過一段時間(Default: one week)會被自動刪除
3. Partiotions之間互相獨立
4. Partition數目不受限制

## Brokers
Brokers即server
Cluster > Brokers > Topics > Partitions

每個Topics的Partitions可被平行分散在各個Broker

Kafka Client 只要向其中一個Broker發連線請求,就可以得到其他Brokers的資訊並和其他Brokers連線
## Zookeeper(最新版本將不會使用,因為不安全)
1. 用來管理Brokers
2. 當Broker掛掉時用來選出新的leader
3. 傳送消息給Kafka(ex:新的topics、新broker、broker down......)
## Kafka CLI
1. 創建Topic
`kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --create`
2. 創建Topic並自訂partition數量、複製partion數量(注意不能大於broker數量)
`kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --create --partitions <數量> --replications factor <數量>`
3. 查看topics list
`kafka-topics.sh --bootstrap-server localhost:9092 --list`
4. 查看topics詳細內容
`kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --describe`
5. 刪除topics
`kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic's name> --delete`
## Producer CLI
1. 傳送訊息給kafka
`kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name>`
2. 傳送訊息給kafka增強性質確保資料不流失在leader、replication
`kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name> --producer-property ack=all`
3. 傳送訊息with key
`kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic's name> --property parse.key=true --property key.separator=:`
**:的左邊為key name,右邊為要傳送之內容**
4. 用RR(輪流)將資料送給各個partition
`kafka-console-producer.sh --bootstrap-server localhost:9092 producer-property partitioner.class=org.apache.kafka.client.producer.RoundRobinPartitioner --topic <topic's name>`
## Consumer CLI
1. 向kafka讀取資料
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name>`

2. 從頭讀(根據partition順序)
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --from-beginning`

3. 打印每筆資料各種屬性(時間、key、value、存在的partition number)
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning`

## Consumer Group CLI

1. 告訴kafka我們正在使用consumer group (多開幾個Terminal Tab run 相同指令,即會產生多個consumer在相同group中)
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic's name> --group <group's name>`
2. 管理不同的Consumer groups
* List all consumer groups
`kafka-consumer-groups.sh --command.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --list`
* 查看consumer group 細節
`kafka-consumer-groups.sh --command.config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --describe --group <group's name>`
* reset offset
`
kafka-consumer-groups.sh --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --group my-first-application --reset-offsets --to-earliest --topic third_topic --dry-run
`
`
kafka-consumer-groups.sh --command-config playground.config --bootstrap-server cluster.playground.cdkt.io:9092 --group my-first-application --reset-offsets --to-earliest --topic third_topic --execute
# Week2
## Kafka test
1. **原始配置**
* topics name : test-topic
* partition 數量 : 1
* broker數量 : 1

**producer測試結果**
**avg latency = 136.65 ms**
* 發送1萬條消息
* 每條1024byte
* producer acks = 1
* 不限流

2. **在同一broker上將partition數量變更為3個、acks=0、bactch size從16KB改為32KB**
想法:acks=0producer不必等待leader確認收到訊息回應便可持續送下一筆訊息(缺點為訊息可能在傳送過程中流失),partition改為3,可讓producer平行傳送訊息減少傳遞時間、增加producer一次發送消息的數量,減少發送次數(因為牽扯到I/O)
* topics name : test-topic
* partition 數量 : 3
* broker數量 : 1

**producer測試結果**
**avg latency = 80.59 ms**
* 發送1萬條消息
* 每條1024byte
* producer acks = 0
* batch size = 32KB
* 不限流

3. **linger.ms改為50ms**
**想法**:雖然配置3的batch size提高可以減少producer的I/O次數,但預設的linger.ms為0即Producer會立即將消息發送到Broker,也許一次送不到32KB反而還是無法降低傳送次數,將linger.ms設為50ms,讓更多的消息累積到一個batch中再送,才能真正有效降低傳送次數、I/O次數
* topics name : test-topic
* partition 數量 : 3
* broker數量 : 1
**producer測試結果**
**avg latency = 42.23 ms**
* 發送1萬條消息
* 每條1024byte
* producer acks = 0
* batch size = 32KB
* linger.ms = 50ms
* 不限流

4. **更改batch.size 、 linger.ms、compression.type**
* topics name : test-topic
* partition 數量 : 3
* broker數量 : 1
**producer測試結果**
**avg latency = 11.82 ms**
* 發送1萬條消息
* 每條1024byte
* producer acks = 0
* batch size = 64KB
* linger.ms = 500ms
* compression type = gzip
* 不限流

5. **更改 linger .ms**
* topics name : test-topic
* partition 數量 : 3
* broker數量 : 1
**producer測試結果**
**avg latency = 6.34 ms**
* 發送1萬條消息
* 每條1024byte
* producer acks = 0
* batch size = 64KB
* linger.ms = 1000ms
* compression type = gzip
* 不限流

# Week3
## Kafka
### 優點
1. 擴展性:
資料不會因為被consumer消費而消失,可以隨時增加新的consumer來處理資料
2. 穩定性:
可應付瞬間大流量
3. 保證順序:
保證每個partition中資料讀取是按照順序的(磁碟按照順序讀取比較快),但不保證同時讀取多個partitions的順序
4. 持久性:
資料的持久性,有多個replication可以避免資料流失
### 缺點
1. 成本高:
部署、維運成本較高,吞吐量小不適用
2. 訊息可能亂序:
當broker掛掉時會導致訊息順序亂掉
3. 訊息非即時:
訊息採批次傳送,不絕對即時
### Kafka Connect(資料庫)
source connector : Producer
sink connector : Consumer
要先下載 JDBC Connector
* 需要修改相對應的設定檔 `connect-distributed.properties`
```java
bootstrap.servers=127.0.0.1:9092 // Broker Server 的 IP 位子
group.id=connect-cluster
rest.port=8083 // REST 介面監聽的 port,預設是8083,若是用 connect 的 standalone 模式,預設 port 是 8084。
plugin.path=/usr/local/etc/kafkaConnect // JDBC Connector 所在的絕對路徑
```
* 新增source connector
```java
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{
"name":"test-upload-source-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=資料庫密碼",
"table.whitelist":"source_users",
"incrementing.column.name": "id",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}
}'
```
**name**:指定新增 connector 的名稱
**config**:指定 connector 的設定資訊
**connector.class**:使用哪個 connector 類別
**connection.url**:連結 Mysql 的 url
**table.whitelist**:下載哪些表格
**incrementing.column .name**:增長的欄位名稱
**mode**:指定 connector 的模式
**topic.prefix**:Kafka會新增一個 Topic,這邊是指令該 Topic 的前綴,最後產生的名稱會是前綴加上表格名稱,Ex. test-mysql-source_users
* 新增Sink Connector
```java
curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{"name":"test-download-to-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/target_database",
"connection.user":"root",
"connection.password":"ifalo.net",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format": "target_users"}}'
```
**name**:指定新增 connector 的名稱
**config**:新增 connector 的設定資訊
**connector.class**:使用哪個 connector 類別
**connection.url**:Mysql 連接的 url
**topics**:從哪個 topic 讀取資料
**auto.create**:是否自動新建表格
**insert.mode**:寫入的模式,這邊選用 upsert
**pk.mode**:選擇主鍵模式 record_value 從訊息的 value 中取得資料
**pk.fields**:pk 欄位名稱
**table.name.format**:指定輸出到資料庫哪個表格
# KafkaJs
### 安裝KafkaJs套件
```javascript
npm install kafkajs
```
### 基本用法
* 創建一個或多個kafka broker(s)
```javascript
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
})
```
* 創建生產者並傳送訊息
```javascript
const producer = kafka.producer() //這裡的kafka為上面創建之kafka broker
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
await producer.disconnect()
```
* 若要一次發送多條訊息給多個topics則使用sendBatch
```javascript
const topicMessages = [
{
topic: 'topic-a',
messages: [{ key: 'key', value: 'hello topic-a' }],
},
{
topic: 'topic-b',
messages: [{ key: 'key', value: 'hello topic-b' }],
},
{
topic: 'topic-c',
messages: [
{
key: 'key',
value: 'hello topic-c',
headers: {
'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
},
}
],
}
]
await producer.sendBatch({ topicMessages })
```
* 創建消費者並收訊息
```javascript
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
```
# Producer到Consumer之bottleneck
### 思考
**producer到consumer之間必須經過的部分為何?**
1.網路
2.kafka broker
**可能的瓶頸為何?**
1.網路頻寬
2.硬體資源(CPU、Memory、disk的速度計算能力)
3.kafka 對資料儲存、查找的速度、穩定度
**思考方向**
如何加速kafka broker本身在資料的儲存與查詢,使用的方法為何?
## Kafka與傳統application讀寫查詢方法的不同
1. **讀取、查詢**

Kafka在讀取資料時省略了應用層和OS的資料傳輸,當大量的資料在做讀取時可以加快速度
**另外Kafka在儲存data時採用的是分段的形式,當log檔案到達一定的數量,會產生另一個log檔繼續存放,此時若要查詢某個offset的資料可以直接根據log檔名到該檔案找尋(讀的時候也會將附近的Data一併讀,有spatial locallity的效果),不必每個檔案都讀節省時間**
2. **寫入**

傳統的必須寫兩次,而kafka只須寫一次即可,後面那次是OS在Disk空閒時會自己將Cache中的內容寫入
# Week4
## Consumer Group 如何取數據(大方向)
### 獲取數據

各個broker的offset會存在Cluster中,Consumer可以直接和broker連接根據offset資訊去讀訊息,而broker在做數據傳送時也是採批次傳送
### 再平衡機制

必須先至Zookeeper中取得Cluster中的Partition數量再連至broker去取數據(任何Consumer Group 第一次要進行消費時都必須至Zookeeper去讀Cluster相關資料)
## Kafka 如何分配partition至consumer
**1. Range Strategy**
**方法:**
* 以topic角度出發
* 先將partitions、consumers根據編號順序排好
* partition數/consumer數 = 一個consumer需配到多少個partitions
* 若無法整除,則根據consumer順序每個consumer多分一個partition
**例子:**
有10個partitions、3個topics(每個topics的partitions數分別為334)、3個consumers(皆訂閱3個topics)
partitions按順序排為: [P0 P1 P2] 、 [P3 P4 P5] 、 [P6 P7 P8 P9]
consumers按順序排為: C1 C2 C3
根據topic中有多少partitions、多少consumer訂閱分配,結果為:
C1:P0 P3 P4 P7
C2:P1 P5 P8
C3:P2 P6 P9

**缺點:**
在所有consumer都訂閱相同topics的情況下,若各個topics中partitions數量不同會有負載不均的情形

**2. RoundRobin Strategy**
**此方法可以解決Range Strategy負載過重的問題**
**方法:**
* 以partition角度出發
* 利用hashcode將partitions、consumers排序
* 再利用RR的方式分配partition
**例子:**
有10個partitions、3個consumers
利用hashcode排序結果如下:
partitions按順序排為: P0 P1 P2 P3 P4 P5
consumers按順序排為: C2 C1 C0
C0:P2 P5
C1:P1 P4
C2:P0 P3

**缺點:**
當每個consumer訂閱的topic不同時則無法均勻分配
**3. Sticky Strategy**
* 以consumer角度出發
* 比Range、RR來的複雜
**目的:**
* partition的分配盡量平均
* 當需要rebalance時盡量保持原本的分配,盡量不讓已經被分配的partition分配給別人
* 若兩者衝突則優先保持分配平均
**例子:**
3個consumers(C0 C1 C2)訂閱4個topics(T0 T1 T2 T3)每個topics有2個partitions(P0 P1)
假設目前分配為:
C0: T0-0 T1-1 T3-0
C1: T0-1 T2-0 T3-1
C2: T1-0 T2-1
此時shutdown C1,使用RR法、Sticky法rebalance結果為:
**RR:**
C0: T0-0 T1-0 T2-0 T3-0
C2: T0-1 T1-1 T2-1 T3-1
**Sticky:**
C0: T0-0 T1-1 T3-0 T2-0
C2: T1-0 T2-1 T0-1 T3-1
**保留原本在C0、C2上的partition**
## 去除掉Zookeeper之新版kafka
用Quorum Controller取代原本用來和zookeeper溝通之單一controller,Quorum中的每個controller都會通過Raft機制來備份元數據
### 傳統controller
Kafka集群中會有一個broker被選舉為Controller負責跟Zookeeper進行交互,它負責管理整個Kafka集群中所有分區和副本的狀態。其他broker監聽Controller節點的數據變化。
**舉例:**
比如當某個分區的leader出現故障時,Controller會為該分區選舉新的leader。當檢測到分區的ISR集合發生變化時,Controller會通知所有broker更新元數據。當某個topic增加分區時,Controller會負責重新分配分區。

* Controller選舉成功後,會從Zookeeper集群中拉取一份完整的元數據初始化ControllerContext,這些元數據緩存在Controller節點。當集群發生變化時,比如增加topic分區,Controller不僅需要變更本地的緩存數據,還需要將這些變更信息同步到其他Broker。
* Controller監聽到Zookeeper事件、定時任務事件和其他事件後,將這些事件按照先後順序暫存到LinkedBlockingQueue中,由事件處理線程按順序處理,這些處理多數需要跟Zookeeper交互,Controller則需要更新自己的元數據。
**缺點:**
當分區數增加時,Zookeeper保存的元數據變多,Zookeeper集群壓力變大,達到一定程度後,監聽延遲增加,給Kafaka的工作帶來了影響。
### KRAFT協議
KRaft是一種共識協議,可以直接在Kafka中管理元數據。元數據的管理被整合到了Kafka當中,不需要使用Zookeeper,大大簡化了Kafka的架構,提高了partitions的可伸縮性和彈性。

將metadata直接存在Raft日誌當中,取消原本只有一個controller的機制,利用多個controller形成一個集合,在leader broker掛掉之後,剩餘多個controller重新選出一個leader負責更新metadata,而其他(稱之為follower)負責接收leader的操作並確保它們的本地日誌和leader的日誌保持一致,去掉了還需從zookeeper獲取metadata的距離,節省很多時間成本、管理成本。
## 新版Kafka如何讓consumer消費距離最近的replica
**方法:**
1. 讓consumer自行根據從broker蒐集到的metadata(rackId, host information)來決定
```java
client.rack=<rack-ID>
```
2. 由broker根據consumer資訊決定該分配哪個replica給哪個consumer
```java
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
broker.rack=<region>
```
**在Kafka中,當一個Consumer向Broker註冊時,Broker會根據Consumer的註冊信息進行分區的分配。Broker會首先查詢所有的分區副本所在的Broker的網絡位置,然後根據Consumer的網絡位置和分區副本的網絡位置進行比較,選擇距離最近的分區副本分配給Consumer。在這個過程中,Broker會考慮多種因素,例如分區的領導者、副本的分佈情況、Consumer的網絡位置等,如果Consumer和Broker在同一rack內,它們之間的網絡延遲將會比跨rack的網絡延遲要小,因此可以提高消費者的消費速度和效率。**