--- tags: '閱讀筆記 ,Event-Driven' --- # Designing Event-Driven Systems Kafka由Linkedin 開發,後來成立Confluent公司維護 作者是Confluent CTO ![](https://i.imgur.com/4a1otDm.png) ## PART I Setting the Stage ### ch 1 Introduction - Kafka 針對可重播日誌的方法具有兩個優點:(p6) - 容易達到一發生的事件立即做出響應(無需輪詢) - 它提供了一個中央存儲庫,可以將整個數據集推送到可能需要的地方 - Kafka的另一個好處是,它允從數據庫的角度出發 利用stream of data 發展出系統 (P6) ### ch2 The Origins of Streaming - 可以使用KSQL(類似於SQL的流處理語言)在Kafka上構建流處理 - 例如:在Kafka流中,統計一天內用戶每次打開程序和對應崩潰的事件 可以使用單獨的KSQL來 - (a)計算每天每個程序打開的運行次數 - (b)計算每天每個程序崩潰的運行次數 - (c)如果崩潰率與打開超過一定數量 ![](https://i.imgur.com/FP4h555.png) ![](https://i.imgur.com/abYwh5g.png) - The streaming layer is fault-tolerant - Each stream processor node can hold state of its own - Each stream processor can write and store local state - 每個stream processor無須透過網路都可以寫入和存儲本地狀態 同時將數據刷新回Kafka以確保持久性 ::: info Note: [疑問] - 如果上述的local state在kafka外部,**寫入**數據何時準確寫回給Kafka? 在kafka裡面stream processor處理的邏輯應該無法同步 要不然作者怎強調`no network calls needed` - 在ch7 `Writing Through a State Store to a Kafka Topic in Kafka Streams` 章節中,把State Store串接CDC連到Kafka ,這樣組合就是把它當producer 所以如果上述State Store如果只用來寫入或許可以 ::: ### ch 3 Is Kafka What You Think It Is? - Kafka是一個集中event stream platform ,具有足夠的靈活性 可以取代以下三項 - REST Asynchronous - stateless protocol 可以取代 - event bus - kafka有更高的吞吐量 - kafka有cluster - This is emphasized by the core mantra of event-driven services: `Centralize an immut‐able stream of facts. Decentralize the freedom to act, adapt, and change.` - database - 相比傳統DB - 比batch processing更好方案: KSQL and KafkaStreams會一直持續更新 - 比喻成central nervous system ![](https://i.imgur.com/DB97cI2.png) ### ch 4 Beyond Messaging: An Overview of theKafka Broker - The Log: An Efficient Structure for Retaining andDistributing Messages - 可重播日誌(replayable log) - R/W log is O(1) ![](https://i.imgur.com/oB1DXVd.png) - Kafka的分區式(Partitions Partitioning) - 講解備份機制 - Linear Scalability - 一個單一的Kafka集群通常會增長到擁有100個以上節點 - Segregating Load in Multiservice Ecosystems - 配額(quotas)可用於控制共享群集的不同服務的吞吐量分配 - Maintaining Strong Ordering Guarantees - 因為Kafka僅保證分區中的強順序 - 需要partitioning key (ex:CustomerId)來關聯所有資料 ![](https://i.imgur.com/fCVtPnO.png) - 全局排序(global ordering),要用single partition topic 來處理 :::warning [注意] (a)如果producer配置為允許一次發送多個批次 (b)發生較早的批次失敗 (a)+(b)則導致分區中記錄的消息順序可能與producer發送的消息順序不一致 - 為避免這種情況,請將producer配置為一次只發送一批 (`batch.size=1`) ::: - Ensuring Messages Are Durable - 根據數據的重要性,配置replication factor - [補充]https://docs.confluent.io/platform/current/clients/producer.html#configuration - `acks=1` : 確保leader寫入成功 - `acks=all`: producer是否必須等待完整復制的確認 - Load-Balance Services and Make Them HighlyAvailable - 添加consumer後,將提供高可用性(假設topic的partition數量大於現有consumer的數量) - 一個topic的partition只能發送一個consumer;consumer可以接多個partition - 因此即使consumer失敗並重新啟動,也可以保證順序 - Compacted Topics - Compacted Topic是僅保留最新事件的主題(就是當表用) - 因為不需要處理被取代(superseded)的事件,所以這允許更有效的流處理 - Long-Term Data Storage - 經常看到保留或Compacted Topics保存超過100TB的數據 - regular topic - audit(審計) - Event Sourcing - Compacted Topics - 減少總體佔用空間 - 兩者結合 - `latest-versioned pattern` - 利用額外存儲空間用Kafka Streams將它們鏈接在一起 - Security - Kafka支持企業級安全功能 - TLS ## PART II Designing Event-Driven Systems ### ch 5 Events: A Basis for Collaboration - Commands, Events, and Queries - 網絡通信的方式可以分為三類: - command - 命令是動作–請求其他服務執行某些操作,這將更改該系統的狀態 - 它通常指示完成,並可以選擇返回結果 - 適用時機 - 當操作必須同步完成 - 使用業務流程或流程管理器 - event - 事件既是事實又是通知 - 廣播出去,不期望有任何反應 - 適用時機 - 當鬆散耦合很重要時 - 事件流對多個服務處理 - 必須將數據從一個應用程序複製到另一個應用程序時 - 有助於並發執行 - query - 要求查找內容的請求 - 沒有副作用 - 適用時機 - 跨服務邊界的輕量級數據檢索 - 在服務邊界內的重量級數據檢索 - Coupling and Message Brokers - ` loose coupling` 一詞的含義可能不同: - “鬆散耦合減少了當事雙方交換信息時彼此之間做出的假設(assumptions)的數量” - “衡量一個組件更改將對其他組件產生的影響的一種度量” (又名connascence) - Loose coupling 不一定是好;也不是tight coupling也不一定是壞 - Loose coupling使組件可以彼此獨立地進行更改 - tight coupling可以使組件彼此之間獲取更多的價值 - coupling 注意事項 - Interface surface area (提供的數據量) - 用戶數 - 運營穩定性和性能 - 變更頻率 - Essential Data Coupling Is Unavoidable - 例如,如果應用程序需要發送信件,則將需要電子郵件地址 - 程式碼功能可以解耦,但數據可能就無法 - Events may be used for notification, and state transfers - state transfers好處: - 更好的隔離和自治 - 更快的數據訪問 - 允許數據離線使用 ![](https://i.imgur.com/DpCMQ7Q.png) - 使用REST/RPC 好處: - 簡單 - Singleton (State lives in only one place ) - 集中控制 ![](https://i.imgur.com/aF2USpn.png) - The Event Collaboration Pattern - 分為choreographed (event)或 orchestrated (集中,command) - choreographed - 好處 - pluggable - 可以僅對相關服務進行更改,而不會影響其他服務 - orchestrated - 好處 - 整個工作流程都在一個地方用代碼寫下 - ![](https://i.imgur.com/RJ4ZEaQ.png) - Mixing Request- and Event-Driven Protocols - 1個cluster - 前端服務向UI提供REST接口 - 狀態更改作為事件在Kafka上 - 後端服務僅依賴於Kafka ![](https://i.imgur.com/IAFD4BG.png) - 多個cluser - 用DDD劃分 ![](https://i.imgur.com/1J6zubG.png) ### Ch 6 - Processing Event with Stateful Functions - Making Services Stateful - 儘管無狀態很好,但不可避免的是必須存儲某些狀態 - 例如 - 對於Web應用程序,將需要存儲用sessions - 存儲在數據庫中,以使應用程序本身變為無狀態 - 對於流系統,狀態包含在事件流中 - Kafka的Streams API更進一步:它們確保將計算所需的所有數據提前加載到API中 - 無論是事件還是進行查找或擴充所需的任何表 - 三個例子比較(p49-p55) - 案例說明 - 假設我們正在構建一個email service - 監聽訂單的事件流 - email service在支付訂單後會發送確認電子郵件 - 有三種方法可以對事件進行處理 - The Event-Driven Approach - 監聽 - 訂單事件 - 說明 - 對於每個訂單事件,它都會同步調出至付款服務,檢查是否已付款 如果已付款,則發送電子郵件 - ![](https://i.imgur.com/tI4q6fM.png) - 缺點 - 需要不斷查詢另一項服務 - 付款和訂單通常是在大約同一時間創建的 因此,如果在付款已在付款服務中註冊之前訂單事件到達了電子郵件服務 則email service將必須輪詢並等待直到付款被註冊,然後再發送付款電子郵件 :::info Note: 如果被限制一次只能處理一個事件就會有些問題浮出 ::: - The Pure (Stateless) Streaming Approach - streams are buffered,直到兩個事件都到達為止.再可以將它們連接在一起 - ![](https://i.imgur.com/zA8AdFk.png) - 此方法解決了以上事件驅動方法的兩個缺點: - 不再需要不斷查找 - 訂單事件或付款事件是否先到達不再重要 :::warning [注意] 在這種情況下,緩衝區中實際上存儲著狀態 並且當Kafka Streams重新啟動時,它必須重新加載每個緩衝區的內容 ::: :::info Note: 合併多個事件流,轉成有意義的事件 ::: - The Stateful Streaming Approach - 假設情境 - email service還需要查找客戶的電子郵件地址 - 不會有任何近期事件(除非客戶最近更新了他/她的電子郵件地址) - [方案1]透過rest 向customer service 在數據庫中查找電子郵件地址 - ![](https://i.imgur.com/hP2uhaU.png) - [方案2]利用Kafka預加載客戶事件流(其中包含最新的電子郵件地址)到電子郵件服務中以進行本地查詢 - ![](https://i.imgur.com/RFNFuQP.png) - 通過這種方法,電子郵件服務現在是有狀態的 - 優點 - 電子郵件服務不再取決於最壞情況下的性能或客戶服務的活躍程度(用於提供電子郵件地址) - 電子郵件服務可以更快地處理事件,因為不再需要網絡呼叫 - 電子郵件服務可以更自由以數據為中心的操作 - 缺點 - 最壞的情況下,它將必須在重新啟動時重新加載整個數據庫 - The Practicalities of Being Stateful - 用上述例子,強調處理stateful帶來的挑戰 - standby replicas - 確保一個節點上的table or state store 是最新狀態,而且要有 fail over機制 - Disk checkpoints - 定期建立檢查點,以便重新啟動後更快地重新加載狀態 - Compacted topics - 讓資料減少 ### ch7 Event Sourcing, CQRS, and Other StatefulPatterns - CQRS = Command Query Responsibility Segregation - Command - 在異步進行進一步處理(即讀取)之前,每個狀態更改(即寫入)都首先保存到日誌中 - Query - 可以從日誌中導出聚合(aggregate)狀態(通過進行每個狀態更改)以進行查詢 - 好處 - 讀寫分離允許每個對象的單獨優化 - EX: 建立index差異 - 根據寫入不斷增長的日誌建立index - 讀取針對報表設計index - 復原機制 - 如果服務引入了錯誤並破壞了數據 - 因為每個狀態更改都保留在日誌中,所以它就像一個`版本控制系統` 並且不會丟失任何信息,更容易恢復數據 (p59) - 在修復錯誤之後,可以重播日誌,回到錯誤之前重新跑完 (p60) - ![](https://i.imgur.com/gBS0INt.png) - 相對於傳統數據DB,毀壞的數據覆蓋了原始數據,增加了修復的難度 而且修復之後需要更將新數據發送到相關服務 ![](https://i.imgur.com/lnzIInn.png) - Materialized view - 解決方案是預先生成一個視圖,與需求結果一致 - 完全可拋棄的,因為它可以從源數據存儲中完全重建 - 如果數據源變化太快,資源可能會消耗過度 - Event Sourcing 必須用此概念建立當前狀態查詢 - Build In-Process Views with Tables and State Stores in KafkaStreams - Kafka的Streams API提供了用於實現Event sourcing和CQRS的最簡單的機制之一 - 可以直接在Kafka Streams API內部實現view,而無需外部DB! - 最簡單的方法是將Kafka中的事件流轉換成可以在本地查詢的表。 - 例如,將Customer事件流轉換成CustomerId可以查詢的Customer表僅需一行代碼: ``` KTable<CustomerId, Customer> customerTable = builder.table("customer-topic"); ``` - Writing Through a Database into a Kafka Topic with Kafka Connect - 使用Kafka Connect和CDC可以從傳統DB中流式傳輸事件 - 該服務將寫入傳統DB,該DB將通過Kafka Connect發出Kafka消息 下游服務可以相應地響應該消息 - CDC代表更改數據捕獲,並且它插入傳統DB使用的二進制日誌時非常有效 - ![](https://i.imgur.com/AB6fhII.png) - Writing Through a State Store to a Kafka Topic in Kafka Streams - 服務也有可能寫入Kafka state store,並從該存儲發出事件 - ![](https://i.imgur.com/QaHDPrM.png) - 這種方法必不可少的是,用Kafka state store 替換了上面討論的方法中的DB - 好處 - 該DB是本地DB,因此訪問速度更快 - 由於狀態存儲由Kafka Streams包裝,因此它可以參與事務 服務發布並寫入狀態存儲的事件是原子的 - 較少的配置,因為它是單個API - Unlocking Legacy Systems with CDC - CDC可以與舊系統DB一起使用,以實現增量轉移 - Kafka的單個消息轉換可用於將原始數據庫中的原始事件消息轉換為更一致的消息 - Query a Read-Optimized View Created in a Database - 另一個常見的模式是使用Connect API 在DB內部創建一個經過讀取優化的基於事件的視圖 - 使用Kafka Connect可用的接收器連接器,可以在任意數量的不同DB中快速輕鬆地創建此類視圖 - 正如我們在上一節中討論的那樣,這些通常稱為多語言視圖(polyglot views) 它們為廣泛的數據存儲技術的資料廣度 - 例如,數據可以流式傳輸到Elasticsearch中 - Memory Images / Prepopulated Caches - 將整個數據集本地加載到內存中可能是有意義,如果數據集 - 適合內存 - 可以在合理的時間內加載 - 可以使用Kafka Streams內存存儲來實現 - The Event-Sourced View - Materialized view - Event-Sourced View是指一種查詢資源(數據庫,內存映像等) - 該查詢資源是在一項服務中根據另一項服務創作的數據創建的 - 其數據直接來自日誌,並且可以隨時重新生成 ## PART III Rethinking Architecture atCompany Scales ### ch 8 Sharing Data and Services Across anOrganization - 前言 - 隨著組織的發展,有必要將系統分解為解藕組件,這些組件可以單獨處理和擴展(微服務) 否則,開發進度可能會停滯不前,並且應用程序可能會導致性能下降 - 在將系統分解為微服務之後,隨著業務需求的發展,不同的服務將不可避免地需要訪問另一服務所擁有的數據 - Encapsulation Isn’t Always Your Friend - 在無法及時更新數據擁有服務的地方,必須向數據請求服務提供直接訪問數據的權限 從而增加了耦合併降低了微服務的用途 - The REST-to-ETL Problem - 因為交互資料麻煩,所以資料複製方式傳遞(ETL) - 資料轉換一旦發生錯誤就很難處理 - ![](https://i.imgur.com/fnQ9tam.png) - The God Service Problem - 數據擁有服務會慢慢發展為自定義數據庫實現,能夠公開不同的數據結構 變成許多其他服務依賴它 - 這又增加了耦合,並降低了微服務的目的 - ![](https://i.imgur.com/iTfpOB4.png) - 傳統上,微服務使用三種不同的數據共享模式來處理該問題 每種模式都有其自身的一系列問題(P86) - Service interface - 讓服務緊密藕合,難以擴展 - Shared databases - 整體式,不易於擴展 - Messaging - 缺乏歷史紀錄,處理後訊息就消失,難以推廣新應用 - ![](https://i.imgur.com/hJn48n1.png) :::info Note: [疑問] 正常來說,現今微服務架構也不會只用一個工具來建置... ::: - Data on the Inside and Data on the Outside (P85) - 內部數據和外部數據 - 外部數據不易分享 ### ch 9 Event Streams as a SharedSource of Truth - A Database Inside Out - event sourcing + data outside = Event streaming as the source of truth - "將數據庫由內而外"的基本思想是,數據庫具有許多核心組件(commit log,views,indexes,caches),而不是將這些問題混在一起 (P90) - 分拆舉例 - log serves for data flow: 傳統分散式DB - individual query:Redis,SOLR, Hive tables - stream processing systems (trigger and view): Storm和Samza - ![](https://i.imgur.com/MVk8gK3.png) ### ch10 Lean Data - Lean Data概念是應用程序不選擇收集和整理大型數據集,而是精心選擇較小的數據(恰好是它們在某個時間點所需的數據) 這些數據從central event store推送到cache或它們控制的存儲(stores)中 透過輕量級view的操作流劃分顯示結果 - event streams as a source of truth 好處 - 可以快速重建 - 相比傳統ETL(extract, transform, load)技術(P95) - 一收到消息就必須寫入,因為沒有再來一次機會 - 讓大量的數據複製好幾份 - 兩地的資料可能因時間造成Data quality issue - datasets are stored in Kafka(P97) - 可以選擇所需的數據,每個應用程序都可以縮小其視圖並進行讀取優化 - 例如,當處理與產品更新有關的事件流時 inventory service可以僅保留product ID和庫存項目數 可存在memory,並且還減少了耦合 - Rebuilding Event-Source Views - 缺點 - 如果需要更多數據,則需要返回日誌 - 最乾淨的方法是刪除view並從頭開始重建它 - 使用memory建立view,就要考慮這些 - 使用Kafka Streams時,還要多考慮狀態問題(statefulness) - Databases and Caches - 最大問題是重建,選擇寫優化的數據庫或緩存工具 - in-memory database/cache:Redis,MemSQL,Hazelcast。 - memory-optimized database:Couchbase或MongoDB(不用MongoDb 紀錄log) - write/disk optimized,log-structured database : Cassandra或RocksDB。 - Automation and Schema Migration - 數據庫遷移(針對Schema)需要因為表結構處理而調整 - 對於Kafka只要砍掉view重新建立即可 :::info Note: DB遷移或許可以達到不停機方案 ::: - Data Divergence Problem - 數據差異問題是由於多種原因造成的: - 對於不同的系統具有不同的數據模型 - on the wire(VO?):JSON - internaldomain model: object model - data model in the database:DDL - 在不同的團隊,部門或公司,在語義上加以區別 - 例如,兩家公司進行合併可能會對供應商是客戶還是承包商還是僱員有不同的解讀 - Kafka通過將所有數據保存在一個單一的事實來源(single shared source of truth)中來幫助解決這些問題 在此情況下,所有相關各方可以聚在一起並達成共識,而不是將數據通過不同的服務和團隊進行管道傳輸 而每個人都需要達成共識時間在特定的服務/團隊界面上有所不同 ## PART IV Consistency, Concurrency, andEvolution ### ch 11 Consistency and Concurrency in Event-Driven Systems - 術語`consistency`有許多不同的用法 - CAP定理:consistency (eventual consistency & strong consistency) - ACID: strong consistency - Eventual Consistency 最終一致性 - 使用提供最終一致性的Event-Driven時,有兩個注意事項: - Timeliness 及時性 - 如果兩個服務處理同一個事件系統,則它們將以不同的速率處理它們,因此某個服務可能落後於另一個服務 - 如果業務運營出於任何原因同時諮詢這2種服務,則可能導致不一致 - Collisions 衝突 - - 如果不同的服務對同一事件流中的同一實體進行更改,則如果稍後合併該數據(例如在數據庫中),則某些更改可能會丟失 - 大型業務系統通常犧牲即時性(lack of timeliness),減少Collisions機會(p110) - 複製很多只讀的副本 - 根據數據類型(Topic)分配single writer來實現的 :::info Note: 在這個程度見解,只解決讀的壓力,寫的壓力其實沒有根本上解決,不過那只有超大型公司(每日流量破百萬)等級才要考慮 ::: - The Single Writer Principle - 特定類型事件的責任分配給單個服務 `events of a specifictype is assigned to a single service—a single write` - 好處 - 避免correctness - 允許在一個地方應用版本控制和一致性檢查(consistency checks) - 隔離了將每個業務實體及時發展到單個服務的邏輯,使更改更容易(EX:schema改變) - 將數據集的所有權給予單個團隊,從而可以進行專門化 EX:對於與財務相關的數據集,可能會有復雜的業務規則,需要專門的團隊來處理 - ![](https://i.imgur.com/gaq9mTv.png) :::info Note: 上述所有優勢都是`數據來自單一來源`的關係 ::: - Command Topic - 單一寫入原則的一種變體,其中的Topic分為兩個部分:Command和Entity - Command Topic可以由任何進程寫入,並用於啟動事件(EX:收到的訂單) - Entity Topic 只能由擁有的服務(通常的單個編寫者)寫入(EX:已驗證訂單/已發送訂單) - Single Writer Per Transition - 單一寫入原則的變體,其中服務擁有特定的事務,而不是Topic中的所有事務 - 例如 - 從請求的訂單到已驗證的訂單的過渡:由訂單服務處理 - 從已驗證的訂單到已付款的過渡:由付款服務處理 - 從收到的付款到已確認的訂單的轉換:由訂單服務處理 - Atomicity with Transactions - Kafka提供兩個事務保證 - 在事務中發送給不同Topic的消息將全部寫入或完全不寫入 - 在事務中發送到單個Topic的消息將永遠不會重複,即使失敗也是如此 - Identity and Concurrency Control - 通過具有唯一的標識符和版本控制,可以實現樂觀併發控制(optimistic concurrency control) - EX: 1. 如果用戶同時在兩台設備上打開其個人資料頁面 在第一台設備上進行更改並提交 然後在第二台設備上進行更改 而無需先重新加載以從第一台設備上獲取更改 2. 第二台設備上的更新將被拒絕(因為嘗試的更新將在舊版本中進行,而該舊版本尚未合併在第一台設備上所做的更改) ![](https://i.imgur.com/9LPfcNG.png) - 樂觀併發控制技術可以等效地在同步或異步系統中實現 ### ch 12 Transactions, but Not as We Know Them - 事務具有三個重要功能: - 刪除重複項 - 允許原子性地將消息組自動發送到不同的Topic(例如:確認訂單和減少庫存數量) - 將數據保存到Kafka state store,並自動將消息發送到另一個服務 - The Duplicates Problem - 由於系統的不可靠性以及需要重試,通常無法避免重複 - EX: - 如果服務A發送了對服務B的API請求 則服務B成功處理了該請求 但是在發送響應之前崩潰了 服務A在超時後自然會重試,導致重複 - ![](https://i.imgur.com/DalBnXU.png) - 傳統上方式重複資料使用PK來預防,如果是update,就讓他重複更新沒關係(P116) - Kafka中的事務允許創建長鏈結(Long chains),其中鏈中每個步驟的處理都在一次保證中 - 系統中不依賴Kafka的部分將不得不使用其他方式來處理重複項 - exactly-once 屬性要設定 - ![](https://i.imgur.com/OSRrJzs.png) :::info 長鏈結 HTTP協議: `Connection:keep-alive` ::: - Using the Transactions API to Remove Duplicates - 一種常見的用例是,需要確保消費者只有在成功向Kafka發送消息成功後才提交offset(補償) - 如果消息未成功提交給Kafka,則消費者將不會提高其抵銷額 - EX: - 考慮使用一個帳戶驗證服務,該服務可以接收存款,對其進行驗證,然後向Kafka發送一條新消息,將其標記為已驗證存款 - 該交易可以如下實現: ```java // Read and validate deposits validatedDeposits = validate(consumer.poll(0)) // Send validated deposits & commit offsets atomically producer.beginTransaction() producer.send(validatedDeposits) producer.sendOffsetsToTransaction(offsets(consumer)) producer.endTransaction() ``` - 如果使用Kafka Streams API,則不需要額外的代碼,僅需要啟用該功能 - Exactly Once Is Both Idempotence and Atomic Commit (p119) - 身為borker的kafka,上實際上有兩個重複的機會 - Sending message to Kafka - 在將確認發送回客戶端之前,將消息發送到Kafka可能會失敗,可能會導致消息重複 - Reading message from Kafka - 在提交偏移之前(offsets commit),從Kafka讀取消息可能會失敗,並且該進程重新啟動時,可能會第二次讀取該消息 - 解決方式 - sending duplicated - 冪等(idempotence),給每個生產者分配一個唯一的識別符號,並且給每個生產者的消息分配一個序列號 因此如果消息中已經有一條同樣識別符號消息存在,它就會被刪除 - :::info Note: 識別符號 EX:生產者ID +序列 ::: - reading a single message multiple times - 可以通過重複數據刪除(例如在數據庫中)方式解決 - 但也可以使用Kafka的事務,它提供了更廣泛的保證 類似於數據庫中的事務,將所有消息一起捆綁在一個原子提交中(atomic commit) - ![](https://i.imgur.com/6uEG6P9.png) - How Kafka’s Transactions Work Under the Covers - code很像DB寫法 - start transaction - write messages to Kafka - commit or abort - 但有一個很不同的點,它是streaming,以下舉例(Figure 12-5) - ![](https://i.imgur.com/V27bpd8.png) - 會先送Begin標記,在送message,最後在送Commit(or Abort) - <font color=red>只有committed的資料才會供給下游</font>,也就是consumer只會處理已commited訊息 - transaction coordinator - ultimate arbiter(終極仲裁者) - 實作 two-phase commit概念 - 事務產生間接費用,此類開銷可以使用Kafka附帶的性能測試腳本來衡量 - Store State and Send Events Atomically - 利用transactions功能開啟,Kafka能輕易達成 transaction automatically - 下圖表達都是保證transaction automatically - ![](https://i.imgur.com/4nIAvpA.png) - What Can’t Transactions Do? - 不能回滾,要管理跨服務事務管理請參考saga ### ch 13 Evolving Schemas and Data over Time - schema management - [Protobuf](https://developers.google.com/protocol-buffers/) - [JSON Schema](http://json-schema.org/) - [avro](https://avro.apache.org/) - Using Schemas to Manage the Evolution of Data inTime - 如何管理message Schemas版本 - return code 可以表示太舊或不兼容舊版本 - [schema-registry](https://docs.confluent.io/platform/current/schema-registry/index.html) - 支援 Avro schemas - 紀錄Topics 和 schema對應 ![](https://i.imgur.com/lNx8M0b.png) - Handling Schema Change and Breaking Backword Compatibility - schema on read (p128) - 根據讀的需求來設計schema - 比較好處理新舊相容問題 - Kafka中引入非向後兼容數據格式,最常見方法是通過"雙模式升級窗口(Dual Schema Upgrade Window)" - 在該窗口中,我們創建兩個Topic:topic-v1和topic-v2,分別用於具有舊模式和新模式的消息 - producer處理兩個版本Topic時,通常有4個方案 1. Topic擁有服務可以使用事務發佈到兩者 2. Topic擁有服務被重定向為發佈到topic-v2,添加了Kafka Streams作業將topic-v2消息轉換為topic-v1 3. Topic擁有服務繼續寫入topic-v1,添加了Kafka Streams作業將topic-v1消息上轉換為topic-v2直到所有客戶端都已升級,擁有Topic服務指向topic-v2 4. Topic擁有服務在內部遷移其數據集,然後將整個視圖重新發佈到topic-v2主題中的日誌中 :::warning 注意: - 上面的方法1和2無法處理topic-v1日誌中的現有消息,因此不適用於Event Sourcing /長期存儲 - 方法3和方法4都使用Kafka Streams作業,該作業能夠將topic-v1消息上轉換為topic-v2消息 因此能夠topic-v1通過簡單地從偏移量0開始運行作業來處理現有消息 ::: ![](https://i.imgur.com/yblqp8k.png) - Collaborating Over Schema Changes - 使用版本控制和GitHub是協作最好的好方法(P131) - 版控機制 - merge request(pull request) - Handling Unreadable Messages - 傳統的消息系統處理無法處理就轉到dead letter queue - 在Kafka建立分開topic來特別處理 - Deleting Data - 刪除方式 - 只需讓數據過期即可 - 使用Kafka做eventing sourcing時,可以通過使用刪除標記來刪除 - 使用compacted topics,並使用要刪除的key和null將新消息寫入Topic - 當您想要刪除的key進行分區,而且被用於其他key時,就會發生復雜的情況 - EX:在線零售商希望能夠(a)通過customerId刪除,以及(b)通過productId進行分區 因為customerId和productId之間沒有一對一的關係 所以無法創建從customerId到productId的關聯,以便使用productId刪除 - 解決上述問題的方法是使用[productId] [customerId]作為key來刪除 :::info Note: 有沒有做partitions影響很大,導入前必須實作了解,以及規劃可能需要考慮此情況 ::: - Triggeiring Downstream Deletes - 在刪除Kafka上的記錄後,可能還需要刪除某些下游Kafka Connect接收器(例如,傳統數據庫)中的記錄 - 如果使用CDC,則下游將刪除 - 如果不使用CDC,則需要自定義機制 - Segregating Public and Private Topics - Public 和Private 可以使用命名約定分開 - authorization interface可以用於將讀/寫許可分配給相關服務 ## PART V Implementing Streaming Serviceswith Kafka ### ch 14 Implementing Streaming Serviceswith Kafka - A Simple Email Service Built with Kafka Streams and KSQL - 如果應用程序在JVM中運行,則可以使用Kafka Streams Java - KSQL為Kafka Streams API提供類似SQL的包裝器 - Windows, Joins, Tables, and State Stores - Stream join Stream - 傳入事件要定義緩衝的時間段(retention) - Ktable join Stream - Ktable 通常是是一個完成的topic (通常是壓縮過的),存於state store - Kafka Streams中有兩種類型的表格:KTable,GlobalKTable - KTable跨實例進行分區 - GlobalKTable是broadcast(但佔用更多空間) - 由於KTable是按key分區的,因此只能按key連接 - join KTable 方式不是key的話,要做重新分區(repartition) (ch15 Rekey to Join) ### ch 15 Building Streaming Services - Join-Filter-Process - join:Kafka Streams DSL用於聯接其他服務發出的一組流和表 - filter:過濾所有不需要的內容,Aggregations(聚集)也經常在這裡使用 - process:將join結果傳遞到執行業務邏輯的功能,該業務邏輯的輸出被推送到另一個流中 - Event-Sourced Views in Kafka Streams - 當提供對由跨多個實例分區的Kafka狀態存儲支持的資源的RESTful訪問時 需要確保請求使用key到達正確的實例 - 這是通過使用交互式查詢來實現的 - 使用Kafka Connect並使用傳統數據庫作為Event-Sourced Views 也是常見的做法 - Collapsing CQRS with a Blocking Read - CQRS模式基於讀取和寫入的異步性 - 這意味著當用戶嘗試在寫入後立即讀取時,該讀取可能不會顯示最新的寫入 - 使用長輪詢(long polling)技術可以消除CQRS的異步性,以使寫入後的讀取看起來是同步的 - Scaling Concurrent Operations in Streaming Systems - 考慮一個庫存服務,該服務對收到的每個訂單消息執行以下任務 - 驗證庫存中是否有足夠的產品 - if true: 請保留適當數量的產品 - 發送確認訂單的消息 - ![](https://i.imgur.com/SFdf695.png) - 要達到上述目的 - 必須使用Kafka事務 - 這是為了確保針對一個productId的所有操作將在同一線程上順序執行 - 必須rekey,以使用productId作為key - 可以通過以下方式實現重新生成key `orders.selectKey((id, order) -> order.getProduct())//rekey by ProductId` - rekey將值推回到Kafka中的新`中間Topic` - 代表庫存已預留狀態 state store - 建立 state store `KeyValueStore<Product, Long> store = context.getStateStore(RESERVED);` - 使用 stat store `//Get the current reserved stock for this product Long reserved = store.get(order.getProduct()); //Add the quantity for this order and submit it back store.put(order.getProduct(), reserved + order.getQuantity())` - Rekey to Join - 加入時必須對主題進行co-partitioned - 相同數量的分區和相同的分區策略 ![](https://i.imgur.com/Xh0esiu.png) - ProductId 都統一在instance 2 - Repartitioning and Staged Execution - 通常將流Rekey另一個key以進行處理,然後Rekey還原原始key以更新某些視圖 - EX: 最初由orderId鍵入關鍵字的訂單流可以由productId Rekey關鍵字以進行驗證 然後再Rekey回orderId以更新驗證狀態 ![](https://i.imgur.com/XrwU5oG.png) - 在這種情況下,如果要保證順序,則用於劃分事件流的key必須是不變的 - 對於上面的示例,orderId和productId必須在與該訂單有關的所有消息中保持固定 - Waiting for N Events - 另一個常見用例是在觸發處理的另一步驟之前發生N個事件(Figure 15-1. OV Topic) - 如果N個事件發送不同Topic,則可以在單獨的流上使用join來等待事件 (圖中事例) - 如果N個事件發送同一Topic,則解決方案通常採用以下形式 (P151) - Group by the key - 計算key的次數(用aggregator處理) - Filter required count ![](https://i.imgur.com/fWO4G0m.png) - A More Holistic Streaming Ecosystem ![](https://i.imgur.com/X1WilpZ.png) :::info Note: 其實可以發現,稍微一點複雜的商業邏輯用了此架構好像又變得更複雜 😶😶😶😶 ::: # 心得 ## 缺乏客觀性 - 書本內容缺點是沒有把原本方案拿來比較 - EX: 作者只拿單一原件比較 - Kafka vs DB - Kafka vs MQ - 正常架構是把所有原件合在一起才對比叫才對 - 攻擊微服務(ch8)論點不足 - 為了強調資料共享,一直說資料沒共享的壞處 - 其實這跟DDD切服務邊界問題有關,跟技術無關 ## Process Manager - Greg Young ,Event Sourcing 的作者, 有提到一個問題點,沒有Process Manager - 開發快速不是重點,而是之後要如何維護 - 書本沒有針對這點有提出好的方案 ## KAFKA 並不適合 event sourcing 討論 - Kafka查詢功能不完善,只能用topic 來提供想要查詢的內容 - 一致性問題仍存在(無法做到Two-phase Commit),不可能適用於所有商業模型 - 書中作者針對批評他的blog有做演講反擊 - https://www.confluent.io/kafka-summit-san-francisco-2019/event-sourcing-stream-processing-and-serverless/ ## 實現event sourcing 不用太複雜 - 如果我們目的只是回播原來狀況, 利用db cdc機制其實就可以 - Kafka 結合最大好處是給下游查詢多樣性 # 資料補充 ## event driven - https://zhuanlan.zhihu.com/p/258619276 - https://martinfowler.com/eaaDev/EventCollaboration.html - https://supunbhagya.medium.com/request-driven-vs-event-driven-microservices-7b1fe40dccde ## event sourcing - https://martinfowler.com/articles/201701-event-driven.html - https://docs.microsoft.com/zh-tw/azure/architecture/patterns/event-sourcing - https://eventuate.io/whyeventsourcing.html - https://dev.to/barryosull/event-sourcing-what-it-is-and-why-its-awesome - https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/ ## 架構 - https://zimarev.com/blog/event-sourcing/myth-busting/2020-07-09-overselling-event-sourcing/ - https://stackoverflow.com/questions/56728979/event-sourcing-why-a-dedicated-event-store - https://www.slideshare.net/BenWilcock1/microservice-architecture-with-cqrs-and-event-sourcing - https://docs.microsoft.com/zh-tw/dotnet/architecture/microservices/multi-container-microservice-net-applications/integration-event-based-microservice-communications - https://gist.github.com/brendanzab/a6073e73f751a6ca9750f960a92f2afe - https://www.slideshare.net/gschmutz/kafka-as-an-event-store-is-it-good-enough-191034501 - https://andela.com/insights/building-scalable-applications-using-event-sourcing-and-cqrs/ - https://mykidong.medium.com/data-streaming-microservices-platform-and-devops-3906300512e7 - https://debezium.io/blog/2020/02/10/event-sourcing-vs-cdc/ ## KAFKA - https://posts.specterops.io/real-time-sysmon-processing-via-ksql-and-helk-part-1-initial-integration-88c2b6eac839 - https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/ - https://kafka.apache.org/0110/documentation/streams/developer-guide - https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#sts=State%20Stores%C2%B6 - https://kafka.apache.org/documentation/#compaction - https://www.confluent.io/blog/handling-gdpr-log-forget/ - https://blog.csdn.net/u013200380/article/details/106453013 - https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/ ## KAFKA 性能測試 - https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines ## 反對 - https://chriskiehl.com/article/event-sourcing-is-hard - https://medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c - https://zimarev.com/blog/event-sourcing/myth-busting/2020-07-09-overselling-event-sourcing/