# Ch6. Event sourcing

* Using the Event sourcing pattern to develop business logic
* Implementing an event store
* Integrating sagas and event sourcing-based business logic
* Implementing saga orchestrators using event sourcing

## 6.1 Developing business logic using event sourcing

* An event represents a state change of the aggregate.
* Event sourcing persists an aggregate as a sequence of events.
* It recreates the current state of an aggregate by replaying the events.

:::info
object-relational table
![image](https://hackmd.io/_uploads/H1mNXWNKa.png)
:::

:::info
event table
![image](https://hackmd.io/_uploads/HktHm-4F6.png)
:::

### 6.1.1 The trouble with traditional persistence

#### OBJECT-RELATIONAL IMPEDANCE MISMATCH

| | Data Models | Granularity of Operations | Complex Relationships | Query Languages | Performance |
| --- | --- | --- | --- | --- | --- |
| OOP | Data is encapsulated in objects, which have attributes (data members) and methods (functions or procedures). Relationships between objects are established through associations. | Operations usually act on whole objects. For example, a method may operate on several of an object's attributes together. | The object model can represent complex relationships such as inheritance hierarchies, polymorphism, and associations. | Data structures are queried and manipulated in the programming language itself. | An object-level operation may require multiple queries or operations to fetch and update related data. |
| relational DB | Data is organized in tables with rows and columns. Relationships between entities are expressed with foreign keys. The relational model does not directly support concepts such as inheritance, polymorphism, or encapsulation. | Database operations usually act on a single row or a set of rows. Queries and updates are typically record-based. | Representing complex relationships may require multiple tables, and querying such relationships can be more complex. | Data is queried and manipulated with Structured Query Language (SQL), whose syntax and structure differ from those of typical programming languages. | Query optimization may involve denormalization or more complex joins to retrieve related data efficiently. |
| impedance mismatch | Traditional relational databases usually require mapping complex object structures onto relational tables, which can lead to a model mismatch. | Relational operations usually involve single records or record sets, which fit poorly with the larger objects of the object-oriented paradigm. | Representing complex relationships may require multiple tables, which can make queries more complex. | Traditional queries often involve complex joins and aggregations, whereas in event sourcing queries can be built more directly from events. | The performance of a traditional relational database can suffer from complex queries and joins. |
| event sourcing | Application state is derived by replaying a series of events. Events represent state changes and are stored in an append-only log. An entity's current state is determined by applying these events. | Representing application state as a series of events allows data changes to be handled more flexibly than record-by-record updates. | Complex relationships between entities can be represented more naturally through the sequence of events. | A query builds an entity's current state by replaying its event sequence. | Replaying the event sequence to build the current state may involve a large data set, but it can be optimized with suitable indexes and caching. |

#### LACK OF AGGREGATE HISTORY

Compare the two tables above: the object-relational table holds only the current state, whereas the event table records the aggregate's history.

#### IMPLEMENTING AUDIT LOGGING IS TEDIOUS AND ERROR PRONE

The event table records who made each change and when.

#### EVENT PUBLISHING IS BOLTED ON TO THE BUSINESS LOGIC

Suppose there is an Order aggregate that must publish an order-completed event when its state changes.

```java=
public class Order {
    private String orderId;
    private String status;

    // Constructor, getters, setters...

    public void markAsCompleted() {
        // Business logic to mark the order as completed
        this.status = "COMPLETED";

        // Now, let's bolt on event-generation logic
        OrderCompletedEvent event = new OrderCompletedEvent(orderId);
        EventPublisher.publish(event); // Event publishing bolted on to the business logic
    }
}
```

In this example, markAsCompleted() contains the business logic that marks the order as completed, but the logic that publishes the order-completed event has also been added by hand. The event-publishing logic is bolted on to the business logic.

### 6.1.2 Overview of event sourcing

* An aggregate is stored in the database as a series of events.
* Each event represents a state change of the aggregate.
* An aggregate's business logic is structured around the requirement to produce and consume these events.

#### EVENT SOURCING PERSISTS AGGREGATES USING EVENTS

![image](https://hackmd.io/_uploads/HktHm-4F6.png)

An application loads an aggregate from the **event store** by retrieving its events and replaying them. Specifically, loading an aggregate consists of the following three steps:

1. Load the events for the aggregate.
2. Create an aggregate instance by using its default constructor.
3. Iterate through the events, calling apply().

```java=
// Step 2: create an empty instance with the default constructor
Class<? extends Aggregate> aggregateClass = ...;
Aggregate aggregate = aggregateClass.getDeclaredConstructor().newInstance();
// Step 3: replay the loaded events in order
for (Event event : events) {
    aggregate = aggregate.applyEvent(event);
}
// use aggregate...
```

| | How the in-memory state of an aggregate is reconstructed |
| --- | --- |
| ORM | Executing one or more SELECT statements to retrieve the current persisted state |
| event sourcing | Loading the events and replaying them |

#### **EVENTS REPRESENT STATE CHANGES**

* Whenever the aggregate's state changes, it must emit an event.
* Because events are used to persist an aggregate, you no longer have the option of using a minimal OrderCreated event that contains only the orderId. The event must carry the data the aggregate needs to perform the state transition.

![aggregate change status by apply events](https://hackmd.io/_uploads/rk6xIfEYa.png)

#### **AGGREGATE METHODS ARE ALL ABOUT EVENTS**

* The outcome of invoking an aggregate's command method is a sequence of events that represent the state changes that must be made.
* These events are persisted in the database and applied to the aggregate to update its state.
* The apply() methods can't fail, because an event represents a state change that has already happened. Each apply() method updates the aggregate based on the event.

![image](https://hackmd.io/_uploads/HykEvMVF6.png)

:::info
![image](https://hackmd.io/_uploads/SymAPzVta.png)
The Eventuate Client framework splits a command method into process() and apply():
* process() -> validates parameters
* apply() -> changes the state
:::

#### **Steps to create or update an aggregate**

An aggregate is created using the following steps:

1. Instantiate the aggregate root using its default constructor.
2. Invoke process() to generate the new events.
3. Update the aggregate by iterating through the new events, calling its apply().
4. Save the new events in the event store.

An aggregate is updated using the following steps:

1. Load the aggregate's events from the event store.
2. Instantiate the aggregate root using its default constructor.
3. Iterate through the loaded events, calling apply() on the aggregate root.
4. Invoke its process() method to generate new events.
5. Update the aggregate by iterating through the new events, calling apply().
6. Save the new events in the event store.

#### **EVENT SOURCING-BASED ORDER AGGREGATE**

* Compared with the traditionally persisted Order, the only difference in its fields is that the aggregate's id isn't stored in the aggregate.
* The createOrder() factory method has been replaced by process() and apply() methods. The process() method takes a CreateOrder command and emits an OrderCreated event. The apply() method takes the OrderCreated event and initializes the fields of the Order.

```java=
import java.util.List;
import static java.util.Collections.singletonList;

public class Order {

  private OrderState state;
  private Long consumerId;
  private Long restaurantId;
  private OrderLineItems orderLineItems;
  private DeliveryInformation deliveryInformation;
  private PaymentInformation paymentInformation;
  private Money orderMinimum;

  public Order() {
  }

  // Create order: validates the command and returns an OrderCreatedEvent.
  // events(...) is a helper that wraps its arguments in a List<Event>.
  public List<Event> process(CreateOrderCommand command) {
    // ... validate command ...
    return events(new OrderCreatedEvent(command.getOrderDetails()));
  }

  // Applies the OrderCreatedEvent by initializing the fields of the Order.
  public void apply(OrderCreatedEvent event) {
    OrderDetails orderDetails = event.getOrderDetails();
    this.orderLineItems = new OrderLineItems(orderDetails.getLineItems());
    this.orderMinimum = orderDetails.getOrderMinimum();
    this.state = OrderState.APPROVAL_PENDING;
  }

  // Revise order: only an APPROVED order can be revised.
  public List<Event> process(ReviseOrder command) {
    OrderRevision orderRevision = command.getOrderRevision();
    switch (state) {
      case APPROVED:
        LineItemQuantityChange change =
            orderLineItems.lineItemQuantityChange(orderRevision);
        // Reject the revision if the new total falls below the order minimum.
        if (!change.newOrderTotal.isGreaterThanOrEqual(orderMinimum)) {
          throw new OrderMinimumNotMetException();
        }
        return singletonList(new OrderRevisionProposed(orderRevision,
            change.currentOrderTotal, change.newOrderTotal));
      default:
        throw new UnsupportedStateTransitionException(state);
    }
  }

  public void apply(OrderRevisionProposed event) {
    this.state = OrderState.REVISION_PENDING;
  }

  // Confirm revision: only valid while a revision is pending.
  public List<Event> process(ConfirmReviseOrder command) {
    OrderRevision orderRevision = command.getOrderRevision();
    switch (state) {
      case REVISION_PENDING:
        LineItemQuantityChange licd =
            orderLineItems.lineItemQuantityChange(orderRevision);
        return singletonList(new OrderRevised(orderRevision,
            licd.currentOrderTotal, licd.newOrderTotal));
      default:
        throw new UnsupportedStateTransitionException(state);
    }
  }

  public void apply(OrderRevised event) {
    OrderRevision orderRevision = event.getOrderRevision();
    if (!orderRevision.getRevisedLineItemQuantities().isEmpty()) {
      orderLineItems.updateLineItems(orderRevision);
    }
    this.state = OrderState.APPROVED;
  }
}
```

### **6.1.3 Handling concurrent updates using optimistic locking**

* A traditional application keeps a version number in the aggregate root table. The UPDATE succeeds only if the version is unchanged since the aggregate was read:

```sql=
UPDATE AGGREGATE_ROOT_TABLE
SET VERSION = VERSION + 1 ...
WHERE VERSION = <original version>
```

* An event store can also use optimistic locking, for example by maintaining an explicit version number.

### **6.1.4 Event sourcing and publishing events**

Chapter 3 describes two mechanisms, **polling** and **transaction log tailing**, for publishing messages that are inserted into the database as part of a transaction. An event sourcing-based application can publish events using either mechanism.
The main difference from the chapter 3 approach is that event sourcing **permanently stores events in an EVENTS table** rather than temporarily saving events in an OUTBOX table and then deleting them.

#### USING POLLING TO PUBLISH EVENTS

* Challenge: determining which events are new. A naive publisher remembers the last event_id it processed and runs `SELECT * FROM EVENTS where event_id > ? ORDER BY event_id ASC`.

:::info
![image](https://hackmd.io/_uploads/HJvh0MEFa.png)
Transactions can commit in a different order from the one in which they generate events, so this query can mistakenly skip an event.
:::

* Solution: add an extra column to the EVENTS table that tracks whether an event has been published.
    1. Find unpublished events by executing this SELECT statement: `SELECT * FROM EVENTS where PUBLISHED = 0 ORDER BY event_id ASC`.
    2. Publish the events to the message broker.
    3. Mark the events as having been published: `UPDATE EVENTS SET PUBLISHED = 1 WHERE EVENT_ID in (...)`.

#### USING TRANSACTION LOG TAILING TO RELIABLY PUBLISH EVENTS

Transaction log tailing reads events inserted into the EVENTS table from the database transaction log and publishes them to the message broker.

### **6.1.5 Using snapshots to improve performance**

A long-lived aggregate can accumulate many events, which makes loading and replaying all of them inefficient. A snapshot stores the aggregate's state as of a given version, so only later events need to be replayed.

In this example, the snapshot version is N. The application only needs to load the snapshot and the two events that follow it in order to restore the state of the aggregate. The previous N events are not loaded from the event store.

:::info
![image](https://hackmd.io/_uploads/SJ9M-7Vta.png)
:::

```java=
Class<? extends Aggregate> aggregateClass = ...;
Snapshot snapshot = ...;
// Start from the snapshot instead of an empty instance
Aggregate aggregate = recreateFromSnapshot(aggregateClass, snapshot);
// events holds only the events that occurred after the snapshot
for (Event event : events) {
    aggregate = aggregate.applyEvent(event);
}
// use aggregate...
```

![image](https://hackmd.io/_uploads/BkM3ZZEFT.png)

### **6.1.6 Idempotent message processing**

A message broker may deliver the same message more than once (at-least-once delivery), so a consumer must avoid processing duplicates.

* **Discard duplicates by comparing ids**: the consumer records the ids of processed messages in a PROCESSED_MESSAGES table as part of the local ACID transaction used by the business logic to create or update aggregates. If the id of a message is already in the PROCESSED_MESSAGES table, it's a duplicate and can be discarded.
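
The duplicate check above can be sketched in a few lines. This is a minimal illustration, not how any particular framework does it: the class and method names (`IdempotentConsumerSketch`, `Message`, `handle`) are hypothetical, an in-memory set stands in for the PROCESSED_MESSAGES table, and `synchronized` stands in for the local ACID transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: in-memory stand-ins replace the database tables.
class IdempotentConsumerSketch {

    /** Hypothetical message type: an id assigned by the sender plus a payload. */
    static final class Message {
        final String id;
        final String payload;

        Message(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Stand-in for the PROCESSED_MESSAGES table.
    private final Set<String> processedMessageIds = new HashSet<>();
    // Stand-in for the aggregate state changed by the business logic.
    private final List<String> appliedPayloads = new ArrayList<>();

    /**
     * Returns true if the message was processed, false if it was a duplicate.
     * Recording the id and running the business logic happen as one unit,
     * which is what the local ACID transaction guarantees in the real design.
     */
    synchronized boolean handle(Message message) {
        if (!processedMessageIds.add(message.id)) {
            return false; // id already recorded: duplicate, discard it
        }
        appliedPayloads.add(message.payload); // the "business logic"
        return true;
    }

    synchronized List<String> appliedPayloads() {
        return new ArrayList<>(appliedPayloads);
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.handle(new Message("m1", "approve order 1")));
        // The broker redelivers the same message; it is discarded.
        System.out.println(consumer.handle(new Message("m1", "approve order 1")));
        System.out.println(consumer.appliedPayloads());
    }
}
```

In a real service the id insert and the aggregate update would share one database transaction, so a crash between the two cannot leave a message half-processed.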
### **6.1.7 Evolving domain events**

Conceptually, the schema of an event sourcing application is organized on three levels:

* Consists of one or more aggregates
* Defines the events that each aggregate emits
* Defines the structure of the events

Changes at each of these levels occur naturally as a service's domain model evolves over time.

![image](https://hackmd.io/_uploads/HyLfHmNFa.png)

#### MANAGING SCHEMA CHANGES THROUGH UPCASTING

* SQL database: schema changes are handled with migrations, using a tool such as Flyway for version control.
* Aggregate: events are transformed (upcast) to the latest schema version when they're loaded from the event store.

### **6.1.8 Benefits of event sourcing**

* Reliably publishes domain events
* Preserves the history of aggregates
* Mostly avoids the O/R impedance mismatch problem
* Provides developers with a time machine

### **6.1.9 Drawbacks of event sourcing**

* It has a different programming model that has a learning curve.
* It has the complexity of a messaging-based application.
* Evolving events can be tricky.
* Deleting data is tricky.
* Querying the event store is challenging.

#### EVOLVING EVENTS CAN BE TRICKY

With event sourcing, the schema of events (and snapshots!) will evolve over time. Because events are stored forever, aggregates potentially need to fold events corresponding to multiple schema versions. A good solution to this problem is to upgrade events to the latest version when they're loaded from the event store.

#### DELETING DATA IS TRICKY

Regulations such as the GDPR require that a user's personal information can be erased, which conflicts with an event store that keeps events forever.

Solution: give each user an encryption key and store the user's information in events only in encrypted form. Deleting the key makes the encrypted data unreadable, which effectively erases the information.

#### QUERYING THE EVENT STORE IS CHALLENGING

Because the event store retrieves an aggregate's events by primary key, you must implement queries using the CQRS approach described in chapter 7.

## 6.2 Implementing an event store

* An event store is a hybrid of a database and a message broker.
    * Database: it has an API for inserting and retrieving an aggregate's events by primary key.
    * Message broker: it has an API for subscribing to events.
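
To make the "hybrid" idea concrete, here is a minimal in-memory sketch of the two API halves. All names (`InMemoryEventStoreSketch`, `StoredEvent`, `append`, `subscribe`) are hypothetical and chosen for illustration; a real event store persists events in a database and delivers them through a broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: one class plays both the database role and the broker role.
class InMemoryEventStoreSketch {

    /** Hypothetical event record, keyed by the id of the aggregate it belongs to. */
    static final class StoredEvent {
        final String entityId;
        final String eventType;
        final String eventData;

        StoredEvent(String entityId, String eventType, String eventData) {
            this.entityId = entityId;
            this.eventType = eventType;
            this.eventData = eventData;
        }
    }

    private final Map<String, List<StoredEvent>> eventsByEntityId = new HashMap<>();
    private final List<Consumer<StoredEvent>> subscribers = new ArrayList<>();

    /** Database role: append an event to the aggregate's event list. */
    synchronized void append(StoredEvent event) {
        eventsByEntityId.computeIfAbsent(event.entityId, id -> new ArrayList<>()).add(event);
        // Broker role: hand the new event to every subscriber.
        for (Consumer<StoredEvent> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Database role: retrieve an aggregate's events by primary key, in insertion order. */
    synchronized List<StoredEvent> find(String entityId) {
        return new ArrayList<>(eventsByEntityId.getOrDefault(entityId, new ArrayList<>()));
    }

    /** Broker role: register interest in all future events. */
    synchronized void subscribe(Consumer<StoredEvent> subscriber) {
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        InMemoryEventStoreSketch store = new InMemoryEventStoreSketch();
        store.subscribe(e -> System.out.println("published " + e.eventType));
        store.append(new StoredEvent("order-1", "OrderCreated", "{}"));
        store.append(new StoredEvent("order-1", "OrderRevised", "{}"));
        System.out.println(store.find("order-1").size() + " events stored for order-1");
    }
}
```

The sketch notifies subscribers synchronously inside `append`; a production store would decouple the two, for example with the polling or transaction log tailing mechanisms from 6.1.4.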
### **6.2.1 How the Eventuate Local event store works**

![image](https://hackmd.io/_uploads/Byzx974t6.png)

#### THE SCHEMA OF EVENTUATE LOCAL’S EVENT DATABASE

* events: stores the events
* entities: one row per entity
* snapshots: stores snapshots

```sql=
create table events (
  event_id VARCHAR(1000) PRIMARY KEY,
  event_type VARCHAR(1000),
  event_data VARCHAR(1000) NOT NULL,
  entity_type VARCHAR(1000) NOT NULL,
  entity_id VARCHAR(1000) NOT NULL,
  -- Used to detect duplicate events/messages.
  -- It stores the ID of the message/event whose processing generated this event.
  triggering_event VARCHAR(1000)
);

create table entities (
  entity_type VARCHAR(1000),
  entity_id VARCHAR(1000),
  -- Each time an entity is updated, the entity_version column is updated.
  entity_version VARCHAR(1000) NOT NULL,
  PRIMARY KEY(entity_type, entity_id)
);

create table snapshots (
  entity_type VARCHAR(1000),
  entity_id VARCHAR(1000),
  entity_version VARCHAR(1000),
  snapshot_type VARCHAR(1000) NOT NULL,
  snapshot_json VARCHAR(1000) NOT NULL,
  triggering_events VARCHAR(1000),
  PRIMARY KEY(entity_type, entity_id, entity_version)
);
```

* Flow of find() for an aggregate:
    * find() queries the entities table to retrieve the entity's current version.
    * find() queries the snapshots table to retrieve the latest snapshot, if any.
    * If a snapshot exists, find() queries the events table to find all events whose event_id is greater than the snapshot's entity_version.
    * Otherwise, find() retrieves all events for the specified entity.

#### THE EVENTUATE LOCAL EVENT RELAY PROPAGATES EVENTS FROM THE DATABASE TO THE MESSAGE BROKER

* The event relay uses transaction log tailing whenever possible and polling for other databases.
* The event relay connects to the MySQL server as if it were a replica and reads the MySQL binlog, a record of updates made to the database. Inserts into the EVENTS table, which correspond to events, are published to the appropriate Apache Kafka topic.
* The event relay periodically saves its current position in the binlog (filename and offset), so that it can resume from that position after a restart.
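
The relay loop described above can be approximated with a small simulation. This is a sketch under loud assumptions, not Eventuate Local's code: the binlog is a plain list, the topic is a list of payloads, the position is a single integer offset rather than filename plus offset, and every name (`EventRelaySketch`, `BinlogEntry`, `checkpointInterval`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: lists stand in for the MySQL binlog and the Kafka topic.
class EventRelaySketch {

    /** Hypothetical binlog entry: the table that was written and the row's payload. */
    static final class BinlogEntry {
        final String table;
        final String payload;

        BinlogEntry(String table, String payload) {
            this.table = table;
            this.payload = payload;
        }
    }

    private final List<BinlogEntry> binlog;
    private final List<String> published = new ArrayList<>();
    private final int checkpointInterval;
    private int savedOffset; // last checkpointed binlog position

    EventRelaySketch(List<BinlogEntry> binlog, int checkpointInterval, int startOffset) {
        this.binlog = binlog;
        this.checkpointInterval = checkpointInterval;
        this.savedOffset = startOffset;
    }

    /** Reads from the saved position, publishes inserts into EVENTS, and checkpoints periodically. */
    void run() {
        int offset = savedOffset;
        while (offset < binlog.size()) {
            BinlogEntry entry = binlog.get(offset);
            if ("EVENTS".equalsIgnoreCase(entry.table)) {
                published.add(entry.payload); // stand-in for sending to the topic
            }
            offset++;
            if (offset % checkpointInterval == 0) {
                savedOffset = offset; // periodic checkpoint, not after every entry
            }
        }
    }

    List<String> published() {
        return new ArrayList<>(published);
    }

    int savedOffset() {
        return savedOffset;
    }

    public static void main(String[] args) {
        // Sample data invented for the demo.
        List<BinlogEntry> binlog = Arrays.asList(
                new BinlogEntry("EVENTS", "OrderCreated"),
                new BinlogEntry("ENTITIES", "order-1 v1"),
                new BinlogEntry("EVENTS", "OrderRevisionProposed"),
                new BinlogEntry("EVENTS", "OrderRevised"),
                new BinlogEntry("EVENTS", "OrderCancelled"));
        EventRelaySketch relay = new EventRelaySketch(binlog, 2, 0);
        relay.run();
        System.out.println(relay.published());
        // A relay restarted from the last checkpoint re-reads the entries after it.
        EventRelaySketch restarted = new EventRelaySketch(binlog, 2, relay.savedOffset());
        restarted.run();
        System.out.println(restarted.published());
    }
}
```

Because the checkpoint is only periodic, the restarted relay republishes the event that followed the last saved position. That is one reason consumers need the idempotent message processing described in 6.1.6.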