Wei Li
    • Create new note
    • Create a note from template
      • Sharing URL Link copied
      • /edit
      • View mode
        • Edit mode
        • View mode
        • Book mode
        • Slide mode
        Edit mode View mode Book mode Slide mode
      • Customize slides
      • Note Permission
      • Read
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Write
        • Only me
        • Signed-in users
        • Everyone
        Only me Signed-in users Everyone
      • Engagement control Commenting, Suggest edit, Emoji Reply
    • Invite by email
      Invitee

      This note has no invitees

    • Publish Note

      Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

      Your note will be visible on your profile and discoverable by anyone.
      Your note is now live.
      This note is visible on your profile and discoverable online.
      Everyone on the web can find and read all notes of this public team.
      See published notes
      Unpublish note
      Please check the box to agree to the Community Guidelines.
      View profile
    • Commenting
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
      • Everyone
    • Suggest edit
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
    • Emoji Reply
    • Enable
    • Versions and GitHub Sync
    • Note settings
    • Note Insights New
    • Engagement control
    • Make a copy
    • Transfer ownership
    • Delete this note
    • Save as template
    • Insert from template
    • Import from
      • Dropbox
      • Google Drive
      • Gist
      • Clipboard
    • Export to
      • Dropbox
      • Google Drive
      • Gist
    • Download
      • Markdown
      • HTML
      • Raw HTML
Menu Note settings Note Insights Versions and GitHub Sync Sharing URL Create Help
Create Create new note Create a note from template
Menu
Options
Engagement control Make a copy Transfer ownership Delete this note
Import from
Dropbox Google Drive Gist Clipboard
Export to
Dropbox Google Drive Gist
Download
Markdown HTML Raw HTML
Back
Sharing URL Link copied
/edit
View mode
  • Edit mode
  • View mode
  • Book mode
  • Slide mode
Edit mode View mode Book mode Slide mode
Customize slides
Note Permission
Read
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Write
Only me
  • Only me
  • Signed-in users
  • Everyone
Only me Signed-in users Everyone
Engagement control Commenting, Suggest edit, Emoji Reply
  • Invite by email
    Invitee

    This note has no invitees

  • Publish Note

    Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

    Your note will be visible on your profile and discoverable by anyone.
    Your note is now live.
    This note is visible on your profile and discoverable online.
    Everyone on the web can find and read all notes of this public team.
    See published notes
    Unpublish note
    Please check the box to agree to the Community Guidelines.
    View profile
    Engagement control
    Commenting
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    • Everyone
    Suggest edit
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    Emoji Reply
    Enable
    Import from Dropbox Google Drive Gist Clipboard
       Owned this note    Owned this note      
    Published Linked with GitHub
    • Any changes
      Be notified of any changes
    • Mention me
      Be notified of mention me
    • Unsubscribe
    # Kafka 名詞介紹 Kafka 是一個高效、可擴展且具容錯性的分散式串流平台,用來在系統之間以即時方式傳送與處理大量資料訊息。 ![image](https://hackmd.io/_uploads/HJUqRozHlg.png) ## 非常推薦的複習影片 {%preview https://www.youtube.com/watch?v=DU8o-OTeoCc %} 📍 Producer: Write messages to Topics. 📍 Topic: A logical grouping of partitions. Where we publish to and consume from topics in Kafka. 📍 Partition: == Queue. An ordered, immutable sequence of messages that we append to, like a log file. 📍 Broker:The server(physical or virtual) that holds the queues.Each broker can hold multiple queues(partitions) 📍 Kafka Cluster(Brokers): 一組 Kafka 伺服器,負責接收、儲存與傳遞訊息。 📍 Segment:每個 Partition 雖然在邏輯上是一連串有序的訊息,但實際上 Kafka 為了方便儲存與管理,會把每個 Partition 的資料切成一塊一塊的檔案,這些檔案就叫做 Segment。 📍 Consumer: Subscribe one or more topics 從指定的 Topic Partition 中讀取訊息的角色。[AWS ESK 中 Client Machine 負責擔任 Producer, Consumer 的任務](https://hackmd.io/@Yw4GiQ0vTfG2fP9bu5dlzg/BkS0LGcPxl) ![image](https://hackmd.io/_uploads/HyHw-196ye.png) ![image](https://hackmd.io/_uploads/HJdHNk9TJe.png) ```log= Topic └── Partition └── Segment File 1(ex: 00000000000000000000.log) └── Segment File 2(ex: 00000000000000001000.log) └── ... ``` # Kafka message/record structure ![image](https://hackmd.io/_uploads/ryEbM2Grgg.png) # Kafka vs Message Queue {%preview https://youtu.be/w8xWTIFU4C8 %} ![image](https://hackmd.io/_uploads/ry8CwsMrll.png) ![image](https://hackmd.io/_uploads/rJfldjfBgx.png) ![image](https://hackmd.io/_uploads/SyHrOifrgg.png) ![image](https://hackmd.io/_uploads/S1vOdjfree.png) # Partition Leadership & Replication ![image](https://hackmd.io/_uploads/ByPOUyqakl.png) 深橘色是Partition-Leader,淺橘色是Follower-Leader,Leader會跟Follower更新資訊。當Broker 4 掛掉時,Broker 2 的 Topic 1 Partition 4 會變成新的Leader。如下圖。 ![image](https://hackmd.io/_uploads/HyhKOycakg.png) ![image](https://hackmd.io/_uploads/SJ08H2frle.png) # Kafka Data Retention Policy 資料保留原則 Kafka 預設會保留每筆訊息 1 週(7 天),超過7天會以Segment為單位刪除。 ![image](https://hackmd.io/_uploads/B1YDiy5TJg.png) 保留期限可以:全域設定(globally):對所有 topic 使用同一個 retention 設定、每個 topic 單獨設定(per topic) ✅ 設定依據可能來自: 商業決策(business decision):例如保留 1 個月的訂單資料 📍 成本考量(cost factor):保留越久佔用越多磁碟空間 📍 合規性(compliance)需求:如 GDPR,要求特定資料保存或刪除期限 # Producer 流程圖 ![image](https://hackmd.io/_uploads/SJ2742MHgx.png) ![image](https://hackmd.io/_uploads/SkycA19TJx.png) ## 🔹 Step 1: Producer Record 使用者(或應用程式)呼叫 send() 方法,傳送一筆 Producer Record。 ### Record 包含: Topic、(可選的)Partition、Key(會影響分配到哪個 Partition) 、Value(實際的訊息內容) ## 🔹 Step 2: Serializer(序列化器) 把 Key 和 Value 轉成位元組(byte[])格式,因為Kafka 只能處理二進位的資料,以便網路傳輸與儲存。 String → byte[] JSON → byte[] Object → byte[] 你可以自定義 Serializer,例如使用 StringSerializer、JsonSerializer、或 Avro 等格式。 ## 🔹 Step 3: Partitioner(分區器) 決定這筆資料該送到哪個 Partition。 流程如下: 如果 Producer Record 指定了 Partition → 直接用它 如果沒指定但有 Key → Kafka 會用 Key 做 Hash(通常是 murmur2),然後取餘數分配給某個 Partition(保持相同 Key 落在相同 Partition) 如果沒 Key → 隨機或輪詢(round robin) 相同 key 的資料都送到同一個 partition → 保持有序性(例如訂單編號、用戶 ID) ## 🔹 Step 4: Batch 放入對應 Partition Buffer(不是直接送 Kafka Broker!) Kafka Producer 為了提升效能,不會每來一筆資料就立刻送出,而是會先累積成批次(batch): 每個 Partition 有自己的 Batch(記憶體 buffer) Kafka 預設會根據大小(batch.size)或時間(linger.ms)將多筆資料一併送出 好處:合併封包、節省網路開銷 ## 🔹 Step 5: 傳送給 Kafka Broker(當 batch 滿或時間到)送出後: Kafka Broker 接收、寫入對應的 Topic Partition 的 segment file 裡 ![image](https://hackmd.io/_uploads/rykdNhGHgx.png) ## 🔶 Step 6: 下方 Retry / Fail 流程圖 這是 Kafka Producer 的「錯誤處理邏輯」: 送出 batch 給 Kafka broker 時可能失敗(例如網路錯誤) Kafka Producer 會根據設定(如 retries=3)來嘗試重新傳送 如果重試成功 → 回傳 metadata 給應用程式 如果重試次數用完還是失敗 → 拋出例外(throw exception),應用程式要自己處理 # 如何確保 Producer 把資料正確傳入 Broker ![image](https://hackmd.io/_uploads/B1Uaegcp1l.png) 這張圖是在說明 **Kafka Producer 的「確認機制」設定(Acknowledgment, 簡稱 Acks)**,也就是 **Producer 在寫入資料後,要不要等 Kafka 回應,以及等誰的回應**。 這個設定會直接影響: - **資料可靠性(reliability)** - **效能(latency/performance)** - **是否會丟資料(資料安全性)** ## 🔧 Kafka Producer 的 `acks` 參數有三種值: ### ✅ 1. `acks=0`(圖上:Acks 0 / NONE) - **Producer 寫入 Leader broker 後,不等任何回應,馬上回報成功** - Kafka 連確認是否真的寫進去都不管 - ⚠️ 最快,但風險最大 → **可能會丟資料而不自知** 適合: - 非關鍵資料,如即時監控但可接受誤差 ### ✅ 2. `acks=1`(圖上:Acks 1 / LEADER) - **Producer 寫入 Leader broker** - **只要 Leader 回應「我寫入成功」就算完成** - 不保證 follower 是否也成功同步 優點: - 折衷方案,速度不錯,有基本可靠性 - 如果 leader 掛掉,但 follower 尚未同步,**仍可能資料遺失** 適合: - 一般應用,如即時系統,但可接受小量損失 ### ✅ 3. `acks=all` 或 `acks=-1`(圖上:Acks ALL) - **Producer 會等到 Leader 和「所有 in-sync follower」都確認寫入成功後,才算成功** - 保證資料不會丟,只要有一個同步 follower 存在 - 這是 **最安全但最慢** 的選項 圖中的: 1. Producer 先寫入 leader(Broker 101) 2. Leader 把資料同步到 follower(Broker 102、103) 3. 所有 ISR(in-sync replicas)都寫入成功 4. Producer 才收到 ACK 如果某個 follower(像圖中的 Broker 104)**落後沒同步完成,就不算在 ISR 裡,不影響確認流程** 適合: - 需要高度可靠性的場景,如金融、交易系統 --- # Producer -> Kafka -> Consumer 傳送保證的方法 ![image](https://hackmd.io/_uploads/S1AgXecTye.png) 這張圖說明的是 Kafka(或任何訊息系統)中的 **訊息傳遞保證(Delivery Guarantees)**,也就是系統保證訊息被處理「幾次」、「是否會重複」、「會不會漏掉」。 ## 🔒 Kafka 三種 Delivery Guarantee 模式: ### ✅ **1. At Most Once(至多一次)** - 每個訊息**最多傳遞一次** - 如果過程中出錯,**可能會遺失訊息** - 不會重複,但會漏 - 圖中例子:`2, 4, 7` 消失了 📌 常見於:不重要的事件、效能最優先、可容忍遺失的情境 (例如:滑鼠事件、某些 UI log) ### ✅ **2. At Least Once(至少一次)** - 每個訊息**至少會送到一次** - 不會漏資料,但**可能會有重複** - 需要靠下游消費端做 **去重(deduplication)** 圖中例子:`2`、`6` 被重送了一次 📌 常見於:資料不可以遺失,但可以靠後處理去除重複 (例如:訂單系統、Log 收集) ### ✅ **3. Exactly Once(剛好一次)** - 每筆訊息**只被處理一次,不重複、不遺失** - 這是最理想但也最難達成的狀態 - Kafka 支援 Exactly Once Semantic(EOS)要搭配: - **Idempotent Producer**(不重送重複資料) - **Transactional API**(一組操作要嘛全部成功,要嘛都不做) 圖中例子:完美 1~8 無重複、無遺失 📌 常見於:金融轉帳、資料一致性極高要求的場景 ## 🧠 總結對照表: | 模式 | 會漏資料? | 會重複? | 難度 | 適用場景 | |-------------------|------------|----------|------|----------| | At Most Once | ✅ 有可能 | ❌ 不會 | ⭐ | 高效能、低風險 | | At Least Once | ❌ 不會 | ✅ 有可能 | ⭐⭐ | 寧願重複不漏 | | Exactly Once | ❌ 不會 | ❌ 不會 | ⭐⭐⭐ | 關鍵資料一致性 | --- # 實現 Exactly Once 的方法 ![image](https://hackmd.io/_uploads/B18C7xc6yx.png) 什麼是「冪等(Idempotent)」? 冪等的意思是:不管你執行幾次結果都一樣。 ## 🔼 上半部:GOOD — 使用 Idempotent Producer 流程: Producer 傳送 (x, y) 到 Broker Broker 寫入資料到 target partition 成功收到 ACK,完成寫入 ✅ 即使之後斷線重連或重試,Kafka 也能判斷:「這筆資料我已經收過了!」→ 不重複 ## 🔽 下半部:BAD — 沒有使用 Idempotent Producer 後果: 訂單重複處理 使用者被扣款兩次 實時統計數字錯誤 ## ✅ 如何開啟 Kafka 的 Idempotent Producer? 只要設定這一行: `enable.idempotence=true` Kafka 就會自動幫你: 為每個 Producer 分配唯一 ID(PID) 為每個 partition 的訊息自動編號(sequence number) 確保 broker 只接受一次寫入 # Consumer Group 機制 ![image](https://hackmd.io/_uploads/BkhOBe9ake.png) 這張圖解釋 Kafka 是如何將訊息自動分配給多個 Consumer 做「平行處理與負載平衡」 ## ✅ 1. 什麼是平行處理 一組有相同 Group ID 的 Consumers Kafka 保證:一個 Partition 的資料只會由 Group 中的一個 Consumer 處理 ➡️ 這樣就可以避免同樣資料被多次處理 ➡️ 適合 分散式平行處理 ## ✅ 2. 什麼是自動負載平衡 Rebalancing Kafka 會根據 Partition 數量與 Consumer 數量,自動分配誰負責哪個 Partition,例如: Topic X 的 Partition 0、1 給 Consumer a,Partition 2 給 Consumer b Topic Y 的 Partition 0、1、2 全部分配給 Consumer b(可能 a 沒訂閱這個 topic) 如果某個 Consumer 當機,Kafka 會自動把它的 Partition 分配給其他存活的 Consumer → 這叫做 rebalancing ![image](https://hackmd.io/_uploads/BJbfDl5a1x.png) # Kafka Compacted Topics 是什麼? Compacted Topic 會針對每個 key 只保留最新的 value,適合儲存最終狀態資料,節省空間並提高查詢效率。 ![image](https://hackmd.io/_uploads/rk19Pl561l.png) ## ✅ 傳統 Kafka Log(上面那排) Kafka 平常的 topic 是追加式的(append-only),所有訊息都會被儲存一段時間(根據 retention) 每筆資料都有 key 和 value(例如 k1:v1) 但當 key 重複時(如 k1 出現了很多次),仍然保留所有歷史記錄 📌 問題是:有些場景其實只想要保留 每個 key 的「最新狀態」,例如: 訂單最後狀態(已付款 / 已出貨) 使用者最後登入時間 最終配置(latest config) ## ✅ Log Compaction 的目的 Kafka 提供「Log Compaction」功能來達成這件事: Kafka 會掃描 topic 中的訊息,針對相同 key,只保留最新的一筆(舊的會被刪掉) ## ⚙️ 怎麼開啟 Compacted Topic? 你只要在建立 topic 時設定: ```log= kafka-topics.sh --create --topic my-topic \ --config cleanup.policy=compact ``` # 如何搭建Kafka? Kafka Connect API 這張圖是介紹 **Apache Kafka Connect** 的架構與功能,尤其是它如何在 Kafka 與外部資料系統之間進行 **資料匯入(source)與匯出(sink)**,而「Kafka Pipeline」就是整個資料流動的核心。 ![image](https://hackmd.io/_uploads/BJVK0lq6yx.png) ### ✅ **Kafka Connect 是什麼?** > Kafka Connect 是 Kafka 的官方工具,用來「連接外部資料來源與目標系統」,讓你可以把資料自動 **讀入 Kafka 或寫出 Kafka**,不需要自己寫繁瑣的程式。 ### 🔹 左側:**Sources(資料來源)** - 包含常見的資料庫或資料來源: - **JDBC(關聯式資料庫)** - **MongoDB(NoSQL)** - **MySQL** Kafka Connect 會透過 **Source Connector** 將這些資料抓進 Kafka。 ### 🔹 中間上半部:**Kafka Connect API + Connector** - Kafka Connect 核心運作機制:透過「**Connector**」將資料接進接出。 - 每個 Connector 可以對應一個來源(source)或目標(sink)系統。 - 你可以同時掛很多 connector(支援上百個來源與目的地) ### 🔹 中間下方藍色框:**Kafka Pipeline** > 這是資料的「中繼核心」,資料先進 Kafka,再被送出。 - 所有來源資料都會被先寫入 Kafka 的某個 Topic → 這形成了資料流管線(pipeline) - Kafka 本身提供高度擴展性、可重播、容錯等特性,這讓 pipeline 變得非常穩定可靠 ### 🔹 右側:**Sinks(資料目的地)** Kafka 的資料可以經由 **Sink Connector** 被推送到外部系統,例如: - **Elastic(Elasticsearch)**:用來搜尋或分析 - **Cassandra**:NoSQL 資料庫 - **HDFS(Hadoop 分散式檔案系統)**:用於大數據儲存 --- ## 🔁 Kafka Connect 支援哪些重要功能? - ✅ **Fault Tolerant**:容錯、可重啟,保證資料不遺失 - ✅ **Schema Preservation**:搭配 Confluent Schema Registry 可保留資料欄位結構(Schema Evolution) - ✅ **中央化管理**:可以整合進 **Confluent Control Center** 做 UI 管理與監控 ## 📦 Kafka Pipeline 是什麼? Kafka Pipeline 指的就是: > 一整條資料流動的管線:從 **Source 系統 ➜ Kafka Topic ➜ Sink 系統** Kafka Connect 把這些轉換與搬運工作自動化了,而且透過 Kafka 本身的分散式特性,能夠做到: - 高併發 - 彈性擴充 - 資料不中斷的流動 --- # Kafka Connect 架構 ![image](https://hackmd.io/_uploads/r1eq0yb5pye.png) ## 🟩 Worker 節點(worker 1、worker 2...) 每個 worker 是一個 Kafka Connect 節點,會執行任務(task) 每個節點都有: 自己的設定檔(如 connect-distributed.properties) REST API:用來接收 Connector 的操作指令(如新增/刪除 Connector) 被分配到的 Connector 與 Task(可重分配) ## 🧱 Connector & Task 的關係 Connector 是高階的邏輯單位(例如:「我要從 MySQL 匯入資料」) 每個 Connector 可以被拆成多個子任務(Task)並平行執行,分配到不同 worker 上 📌 例如:Connector A 有 Task 1 分配給 Worker 1,Task 2 給 Worker 2 ## 🧩 Config 儲存在 Kafka Topic 裡 所有 connector 的設定資訊(config)都會存在一個 Kafka topic 中(通常叫做 connect-configs) Worker 啟動時會自動從 Kafka 讀取這些設定,然後協調分配任務 Kafka 本身提供儲存、同步與容錯 → 不怕某個 worker 掛掉! ## 🔗 group.id:建立 Connect Cluster 的關鍵 所有想加入同一個 Kafka Connect Cluster 的 worker,都要有相同的 group.id 這樣他們才會協同工作,共同處理所有 Connector 任務 --- # Confluent REST Proxy 的用處 ![image](https://hackmd.io/_uploads/Sk-TPW9TJg.png) 讓 非原生 Kafka 客戶端(例如不使用 Java 的應用程式,或網路受限的系統)也能透過 HTTP / REST API 使用 Kafka! ## 🔸 REST Proxy 是什麼? Kafka 原本是為 Java 應用程式設計,使用 Kafka 原生 Client Library(如 KafkaProducer、KafkaConsumer) 但對於非 Java 環境(如 Python、JavaScript、Node.js、C#)或受限網路環境,使用原生 client 可能不方便 所以 Confluent 提供 REST Proxy,允許你透過簡單的 HTTP 請求操作 Kafka ### 🔹 Kafka 本體(左下) Kafka 正常執行在內部網路內部 ### 🔹 REST Proxy(核心) 作為中介層,允許用戶端透過 REST / HTTP 與 Kafka 溝通 這層可以穿越防火牆(Firewall),使外部系統也能與 Kafka 整合 ### 🔹 Schema Registry(可選,但建議使用) 若使用 Avro、Protobuf 等序列化格式,可以透過 REST Proxy 結合 Schema Registry 保證資料格式一致(避免 producer/consumer 格式不符) ### 🔹 支援的 Client 端: Kafka 原生 Java 應用 繼續用原本 Kafka Client library 直接對 Kafka 操作 非 Java 應用程式(Non-Java Applications) 改透過 REST Proxy 發送或接收訊息 ### 右邊三大重點功能: 🔹 Provides RESTful Interface 讓 Kafka 可以被當作 REST API 使用(例如 POST /topics/my-topic) 🔹 Simplifies Message Creation and Consumption 只要會 HTTP 就能傳訊息、接收訊息,不需要學 Kafka protocol 🔹 Simplifies Administrative Actions 可以用 HTTP 操作 topic、查看 metadata、管理 offset 等 # 即使有設置 Schema Registry 仍會報錯的原因 非常好的問題!即使使用了 Avro、Protobuf + Schema Registry,理論上可以保證資料格式一致,但在實務上還是**可能出現錯誤或不相容的情況**,常見原因如下: ### ① **Producer 沒有使用或正確設定 Schema Registry** - Producer 沒有串接 Schema Registry,就直接把資料序列化後送進 Kafka - 結果格式不合法、缺少 schema ID,Consumer 就會讀不懂,報錯 📌 **常見錯誤:** ``` Missing schema ID or magic byte in message ``` ### ② **Producer 用錯 Schema(寫入與註冊的不一致)** - Producer 可能手動定義了 schema,但與註冊進 Schema Registry 的 schema 不符 - 例如欄位名稱拼錯、資料型別不同(string vs int) 📌 例子: ```json // schema 註冊的是 name: string { "name": 1234 ← Producer 寫成 int } ``` ### ③ **Schema 有變更,但不向後相容** 如果你修改了 schema,但沒有考慮相容性(Compatibility),可能導致: - Producer 送出新版 schema,但 Consumer 是讀舊版資料 - 或 Consumer 預期某欄位必填,但 Producer 沒送(少欄位) 📌 常見錯誤: ``` io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with earlier schema ``` ### ④ **Consumer 沒使用正確的 Deserializer** - Producer 用 Avro 編碼,但 Consumer 沒有用 Avro Deserializer 或沒串接 Schema Registry - 會導致資料無法還原 → 報序列化錯誤(例如 magic byte 不符) 📌 錯誤訊息示範: ``` org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 123 ``` ### ⑤ **網路問題 / Schema Registry 不可用** - 若 Producer 寫入資料時無法連到 Schema Registry,可能 fallback 成無效格式(或直接 fail) - Consumer 無法解析 schema id → 資料無法反序列化 ## 🧠 總結重點: 即使有 Schema Registry,還是可能出錯的原因包含: | 類型 | 原因 | |------|------| | ⚙️ 設定錯誤 | Producer/Consumer 沒正確串接 Schema Registry | | 🛠 格式錯誤 | Producer 寫入錯誤資料型別 / 欄位 | | 🔄 相容性問題 | schema 更新不向後相容 | | 📡 網路錯誤 | 無法連線 Schema Registry | | ❌ 解碼錯誤 | Consumer 沒使用正確的 Deserializer | --- # Confluent Schema Registry 之介紹 Confluent Schema Registry 是 Kafka 架構中的「資料格式守門員」,幫助你在 `多個 Producer/Consumer 之間統一資料格式`、避免資料格式錯誤、並支援版本演進與跨系統協作。 ![image](https://hackmd.io/_uploads/SJUkab5aye.png) ### 🔸 Kafka 的問題背景(圖左上) App 1、App 2 是兩個 Producer 每個 App 都透過 Serializer 將資料編碼成 `Avro / JSON / Protobuf 格式`後送進 Kafka 藍色與橘色塊代表不同格式的訊息 如果格式沒對齊(比如:橘色格式的資料),就會出現驚嘆號 ⚠️(格式錯誤、不相容) ### 🔸 中央關鍵:Schema Registry Schema Registry 會儲存每個 Kafka Topic 對應的 資料結構定義(Schema),並提供這些功能: #### Producer 在寫入前: 會先將自己的 schema 註冊 / 驗證 如果不相容,Schema Registry 就會拒絕 → 避免壞格式寫入 Kafka! #### Consumer 在讀取時: 根據資料內的 schema ID,自動去 Registry 下載對應 schema 這樣就能正確反序列化(deserialize)資料 ### 🔸 Kafka Topic 中的資料 無論是 Avro、JSON Schema、Protobuf,真正儲存在 Kafka 裡的是序列化後的資料 + schema ID。這樣 Consumer 就能確保正確還原成原始格式。 ### 🔸 最右側:資料可供消費的目標(Example Consumers) 如 Elastic、Cassandra、HDFS 等 這些系統可能從 Kafka 擷取資料分析或儲存,都仰賴一致的 schema 結構 ## 流程圖 ![image](https://hackmd.io/_uploads/B112yf5TJl.png) ### ✅ Producer 端(左邊) Producer 要送資料前,先查詢 / 註冊 schema(如下Json檔) 到 Schema Registry ```jsonld= { "type": "record", "name": "User", "fields": [ { "name": "name", "type": "string" }, { "name": "age", "type": "int" } ] } ``` 此 schema 對應每一筆資料如下: ```jsonld= { "name": "Alice", "age": 30 } ``` 如果該 schema 已存在 → 回傳 schema ID 如果是新 schema → Schema Registry 儲存後分配一個新的 ID(如圖中 ID=103) Producer 將資料依據該 schema 做序列化(encode) → 得到 Avro 格式資料 Producer 把 Avro 格式資料送進 Kafka topic #### Kafka 實際儲存的訊息格式(Avro)是: magic byte + schema ID + encoded data 📍 Magic Byte(固定為 0):標記這是 Confluent Avro 格式 📍 Schema ID:對應到 Schema Registry 裡的某個 schema(例如 ID=103) 📍 Avro Data:Producer 根據該 schema 編碼出來的二進位資料 #### Producer 內部有 cache(綠色框): 不用每次都查 Schema Registry,減少延遲與負擔 ### ✅ Kafka Cluster(中間) 多個 Broker 構成 Kafka Cluster,照常接收、儲存、傳遞訊息 不需要知道 schema 的細節,它只是傳遞二進位資料(byte stream) ### ✅ Consumer 端(右邊) Consumer 從 Kafka 讀取一筆訊息 發現內含 schema ID = 103 Consumer 查詢 Schema Registry,取得 ID=103 對應的 schema 如果該 schema 已經被 cache,就不用重查(同樣有 cache) Consumer 根據 schema 解碼資料 成功解析成具欄位結構的物件(如 JSON、物件等) # ksqlDB Kafka 動態Stream data資料庫 一個基於 Kafka 的 即時串流 SQL 引擎,讓你可以用 SQL 來處理 Kafka 資料流,不需要寫 Java、Python 等傳統程式碼。 ![image](https://hackmd.io/_uploads/BklVrzqpJl.png) ![image](https://hackmd.io/_uploads/H1xwrfqpJx.png) # What is Kafka Stream? Kafka Streams 是 Kafka 架構內的一層 處理邏輯(processing layer),它並不取代 Producer / Kafka Cluster / Consumer 架構,而是提供你在這個架構中 進行資料即時轉換(Streaming ETL) 的能力。 ![image](https://hackmd.io/_uploads/rJ-hIGcpkx.png) ✔ 做轉換、過濾、聚合、join ✔ 寫回 Kafka topic(變成處理後的資料流) # 三種針對 Kafka 資料做 ETL 的方式 你可以用 ksqlDB、Kafka Streams(推薦這個) 或 Producer/Consumer API 來針對 Kafka 中的資料進行 ETL 處理,而 Kafka 原生就支援你自由打造各種資料流動架構,不同的 producer/consumer 之間可以靈活組合、互不干擾。 ![image](https://hackmd.io/_uploads/HyCq_G9aJg.png) ![image](https://hackmd.io/_uploads/BJP3dzca1x.png) # 應用情境 ![image](https://hackmd.io/_uploads/r1g3MUhGHlx.png) ![image](https://hackmd.io/_uploads/Hk-4UnGBgg.png) ![image](https://hackmd.io/_uploads/rJp2I2Grlg.png) ![image](https://hackmd.io/_uploads/HkbyP2fHex.png) ![image](https://hackmd.io/_uploads/HkJdvnMBgx.png) # 實作需考慮的要點! 這個非常重要,建議看文章比較容易看懂。 {%preview https://www.hellointerview.com/learn/system-design/deep-dives/kafka %} ## 1. Scalability ### 🔹 message <1MB preferred! ### 🔹 當你使用 Kafka 時,通常會專注在如何擴展「topic」,而不是整個 Kafka 集群(cluster)。這是因為每個 topic 的流量和需求可能差異很大。 有些 topic 流量很大(high throughput),需要被「分割成很多 partitions」,這樣這些 partitions 就能分散到不同的 broker 上處理,達到分散負載、提升處理能力。但也有些 topic 流量很小,可能一個 broker 就能輕鬆處理完,不需要額外的擴展。 **每個 partition 都會有一個 leader**,不是整個 topic 只有一個 leader! Kafka 的架構是: 一個 topic 可以有 **多個 partitions**(例如 10 個)。 每個 partition 都會有: * 一個 **leader partition**(負責處理讀寫請求) * 零個或多個 **follower partitions**(用來做備援同步) 當你把一個 topic 拆成 10 個 partitions 的時候: * Kafka 會將這 10 個 partitions 分配給集群中的不同 brokers。 * **每個 partition 都可以由不同的 broker 當 leader,同時處理 client 的讀寫請求。** 這樣的好處是: * 你就能實現 **平行處理(parallelism)**。 * 多個 consumer(如果你設定成 consumer group)可以分別訂閱不同的 partitions,各自處理各自的資料,**大大提升 throughput 與處理效能**。 #### 🧠 舉個具體例子: 假設有 topic `user-events`,你設定有 3 個 brokers 和 6 個 partitions: | Partition | Leader Broker | | --------- | ------------- | | 0 | Broker 1 | | 1 | Broker 2 | | 2 | Broker 3 | | 3 | Broker 1 | | 4 | Broker 2 | | 5 | Broker 3 | 每個 partition 都有自己的 leader,因此 client 的資料可以被分散寫入不同的 partition,分別由不同的 broker 處理。 --- ### 🔹 為什麼「random partitioning with no key」會失去順序 #### 🔁 Kafka 中的順序保證是怎麼運作的? * **Kafka 保證在同一個 partition 裡的訊息是有順序的**(也就是 FIFO)。 * 但**不同 partitions 之間的訊息順序是無法保證的**。 #### 🎯 所以為什麼「random partitioning with no key」會失去順序? 當你不提供 key,Kafka 會用 round-robin 或 random 的方式**將訊息分散到不同的 partitions**。例如: ```text Message 1 → partition 0 Message 2 → partition 2 Message 3 → partition 1 Message 4 → partition 0 Message 5 → partition 2 ``` 這種分配下來,不同 partition 中的訊息順序彼此是獨立的。雖然每個 partition 內部順序仍是正確的,但**整體來看訊息順序就亂了**。 --- #### 🔍 舉例說明: 假設你有這樣一串訊息: ```text M1, M2, M3, M4, M5 (依序送出) ``` Kafka 分散如下: * M1 → partition 0 * M2 → partition 1 * M3 → partition 2 * M4 → partition 0 * M5 → partition 1 Consumer 端在讀資料時,是**同時從多個 partitions 拉資料**,結果可能變成: ```text M3, M1, M5, M2, M4 ← 順序錯亂 ``` --- #### ✅ 解法:如果你要保留順序 那就要 **使用 message key**,這樣 Kafka 會用 key 的 hash 來決定分到哪個 partition。 只要 **同一個 key 永遠送到同一個 partition**,那麼你就能保證: > 「**相同 key 的訊息是有順序的**」。 這種方式非常常見於: * 使用者操作日誌(key 是 user ID) * 訂單流程追蹤(key 是 order ID) --- VPC vpc-05ebc178cf7f49316 (default) Subnets subnet-03d0389a25443ff40 subnet-05f70c9b3354877de subnet-09935a7562c207a49 Security groups associated with VPC sg-007d8065a8645d400 {%preview https://docs.aws.amazon.com/msk/latest/developerguide/create-client-iam-role.html %}

    Import from clipboard

    Paste your markdown or webpage here...

    Advanced permission required

    Your current role can only read. Ask the system administrator to acquire write and comment permission.

    This team is disabled

    Sorry, this team is disabled. You can't edit this note.

    This note is locked

    Sorry, only owner can edit this note.

    Reach the limit

    Sorry, you've reached the max length this note can be.
    Please reduce the content or divide it to more notes, thank you!

    Import from Gist

    Import from Snippet

    or

    Export to Snippet

    Are you sure?

    Do you really want to delete this note?
    All users will lose their connection.

    Create a note from template

    Create a note from template

    Oops...
    This template has been removed or transferred.
    Upgrade
    All
    • All
    • Team
    No template.

    Create a template

    Upgrade

    Delete template

    Do you really want to delete this template?
    Turn this template into a regular note and keep its content, versions, and comments.

    This page need refresh

    You have an incompatible client version.
    Refresh to update.
    New version available!
    See releases notes here
    Refresh to enjoy new features.
    Your user state has changed.
    Refresh to load new user state.

    Sign in

    Forgot password

    or

    By clicking below, you agree to our terms of service.

    Sign in via Facebook Sign in via Twitter Sign in via GitHub Sign in via Dropbox Sign in with Wallet
    Wallet ( )
    Connect another wallet

    New to HackMD? Sign up

    Help

    • English
    • 中文
    • Français
    • Deutsch
    • 日本語
    • Español
    • Català
    • Ελληνικά
    • Português
    • italiano
    • Türkçe
    • Русский
    • Nederlands
    • hrvatski jezik
    • język polski
    • Українська
    • हिन्दी
    • svenska
    • Esperanto
    • dansk

    Documents

    Help & Tutorial

    How to use Book mode

    Slide Example

    API Docs

    Edit in VSCode

    Install browser extension

    Contacts

    Feedback

    Discord

    Send us email

    Resources

    Releases

    Pricing

    Blog

    Policy

    Terms

    Privacy

    Cheatsheet

    Syntax Example Reference
    # Header Header 基本排版
    - Unordered List
    • Unordered List
    1. Ordered List
    1. Ordered List
    - [ ] Todo List
    • Todo List
    > Blockquote
    Blockquote
    **Bold font** Bold font
    *Italics font* Italics font
    ~~Strikethrough~~ Strikethrough
    19^th^ 19th
    H~2~O H2O
    ++Inserted text++ Inserted text
    ==Marked text== Marked text
    [link text](https:// "title") Link
    ![image alt](https:// "title") Image
    `Code` Code 在筆記中貼入程式碼
    ```javascript
    var i = 0;
    ```
    var i = 0;
    :smile: :smile: Emoji list
    {%youtube youtube_id %} Externals
    $L^aT_eX$ LaTeX
    :::info
    This is a alert area.
    :::

    This is a alert area.

    Versions and GitHub Sync
    Get Full History Access

    • Edit version name
    • Delete

    revision author avatar     named on  

    More Less

    Note content is identical to the latest version.
    Compare
      Choose a version
      No search result
      Version not found
    Sign in to link this note to GitHub
    Learn more
    This note is not linked with GitHub
     

    Feedback

    Submission failed, please try again

    Thanks for your support.

    On a scale of 0-10, how likely is it that you would recommend HackMD to your friends, family or business associates?

    Please give us some advice and help us improve HackMD.

     

    Thanks for your feedback

    Remove version name

    Do you want to remove this version name and description?

    Transfer ownership

    Transfer to
      Warning: is a public team. If you transfer note to this team, everyone on the web can find and read this note.

        Link with GitHub

        Please authorize HackMD on GitHub
        • Please sign in to GitHub and install the HackMD app on your GitHub repo.
        • HackMD links with GitHub through a GitHub App. You can choose which repo to install our App.
        Learn more  Sign in to GitHub

        Push the note to GitHub Push to GitHub Pull a file from GitHub

          Authorize again
         

        Choose which file to push to

        Select repo
        Refresh Authorize more repos
        Select branch
        Select file
        Select branch
        Choose version(s) to push
        • Save a new version and push
        • Choose from existing versions
        Include title and tags
        Available push count

        Pull from GitHub

         
        File from GitHub
        File from HackMD

        GitHub Link Settings

        File linked

        Linked by
        File path
        Last synced branch
        Available push count

        Danger Zone

        Unlink
        You will no longer receive notification when GitHub file changes after unlink.

        Syncing

        Push failed

        Push successfully