# Part 2: Designing Data-Intensive Applications

# Chapter 9: Consistency and Consensus

If "one component fails, everything stops" is not acceptable, we must find ways to tolerate faults => build general-purpose abstractions with useful guarantees, and let applications rely on those abstractions.

## Consistency Guarantees

Transaction isolation is primarily about avoiding race conditions due to concurrently executing transactions, whereas distributed consistency is mostly about coordinating the state of replicas in the face of delays and faults.

## Linearizability

Also known as `atomic consistency` or `strong consistency`. The idea is to make a system appear as if there were only one copy of the data, and all operations on it are atomic.

![image](https://hackmd.io/_uploads/Sylla4W4yl.png)

### What Makes a System Linearizable?

![image](https://hackmd.io/_uploads/HytDC4-4Je.png)

If one client's read returns the new value, all subsequent reads must also return the new value, even if the write operation has not yet completed.

![image](https://hackmd.io/_uploads/SyuTAEW4Je.png)

Linearizability is a recency guarantee on reads and writes of a register (an individual object). It doesn't group operations together into transactions, so it does not prevent problems such as write skew.

A database may provide both serializability and linearizability, and this combination is known as `strict serializability` or `strong one-copy serializability` (strong-1SR). Implementations of serializability based on two-phase locking or actual serial execution are typically linearizable. However, serializable snapshot isolation is not linearizable.

### Relying on Linearizability

Use cases for linearizability:

**Locking and leader election**

Used for leader election in ZooKeeper, and for distributed locking (see the compare-and-set sketch below).

**Constraints and uniqueness guarantees**

To enforce uniqueness at the time data is written, you need linearizability.

**Cross-channel timing dependencies**

![image](https://hackmd.io/_uploads/SJFxurWEyl.png)

This problem arises because there are two different communication channels between the web server and the resizer: the file storage and the message queue.

### Implementing Linearizable Systems

The most common approach to making a system fault-tolerant is to use replication:

*- Single-leader replication (potentially linearizable)*: reads from the leader, or from synchronously updated followers, have the potential to be linearizable. They may not be, because of snapshot isolation or concurrency bugs.
*- Consensus algorithms (linearizable)*: this is how ZooKeeper and etcd work.
*- Multi-leader replication (not linearizable)*: concurrent writes on multiple nodes.
*- Leaderless replication (probably not linearizable)*: LWW and sloppy quorums break linearizability.

**Linearizability and quorums**

![image](https://hackmd.io/_uploads/HkMA9BWEJg.png)

In summary, it is safest to assume that a leaderless system with Dynamo-style replication does not provide linearizability.

### The Cost of Linearizability

![image](https://hackmd.io/_uploads/HJ9-hrW4Jl.png)

**The CAP theorem**

The trade-off:
- If the application requires linearizability and some replicas are cut off by a network problem, those replicas have to wait (they become unavailable)
- If the application does not require linearizability, replicas can keep operating independently during a network problem: they stay available, but are not linearizable

Thus, applications that don't require linearizability can be more tolerant of network problems. This insight is popularly known as the CAP theorem.
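Tying back to the "Locking and leader election" and "Constraints and uniqueness guarantees" use cases above, here is a minimal, single-process sketch (class and variable names are invented for illustration, not from the book) of why a linearizable compare-and-set register is exactly the primitive a lock or uniqueness constraint needs. A distributed implementation must behave as if it were this single copy:

```python
import threading

class LinearizableRegister:
    """A single in-process copy of the data: trivially 'linearizable' because
    there is literally only one copy and a mutex makes each operation atomic.
    A real distributed system must *appear* to behave like this."""

    def __init__(self, value=None):
        self._value = value
        self._mutex = threading.Lock()

    def compare_and_set(self, expected, new):
        # Atomically set the value only if it still equals `expected`.
        with self._mutex:
            if self._value == expected:
                self._value = new
                return True
            return False

# Uniqueness / locking: whoever successfully flips None -> their ID wins.
leader_slot = LinearizableRegister(value=None)
print(leader_slot.compare_and_set(None, "node-1"))  # True: node-1 becomes leader
print(leader_slot.compare_and_set(None, "node-2"))  # False: the slot is already taken
```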
**Linearizability and network delays**

In practice, the reason for dropping linearizability is performance, not fault tolerance.

## Ordering Guarantees

### Ordering and Causality

If a system obeys the ordering imposed by causality, we say that it is causally consistent. For example, snapshot isolation provides causal consistency: when you read from the database and you see some piece of data, you must also be able to see any data that causally precedes it (assuming it has not been deleted in the meantime).

**The causal order is not a total order**

If you are familiar with distributed version control systems such as Git, their version histories are very much like the graph of causal dependencies. Often one commit happens after another, in a straight line, but sometimes you get branches (when several people concurrently work on a project), and merges are created when those concurrently created commits are combined.

**Linearizability is stronger than causal consistency**

Any system that is linearizable will preserve causality correctly, but linearizability is not the only way of preserving causality. Researchers are exploring new kinds of databases that preserve causality, with performance and availability characteristics similar to those of eventually consistent systems.

**Capturing causal dependencies**

In order to maintain causality, you need to know which operation `happened before` which other operation. Causal consistency goes further than detecting concurrent writes in leaderless replication: it needs to track causal dependencies across the entire database, not just for a single key. Version vectors can be generalized to do this.

### Sequence Number Ordering

Keeping track of all causal dependencies can become impractical. Instead, we can use `sequence numbers` or `timestamps` (which can come from logical clocks) to order events. Sequence numbers and timestamps provide a `total order`.

**Noncausal sequence number generators**

If multi-leader or leaderless replication is used, some common ways to generate sequence numbers are:
- Each node generates an independent set of numbers, for example one node odd numbers and the other even
- Attach a timestamp from a time-of-day (physical) clock
- Preallocate blocks of sequence numbers

However, they all have a problem: because the numbers are generated independently on different nodes, they are not consistent with causality.

**Lamport timestamps**

There is a simple method for generating sequence numbers that is consistent with causality, called the `Lamport timestamp` (see the sketch below).

![image](https://hackmd.io/_uploads/BkXm8RzVkl.png)

Although there are some similarities, version vectors and Lamport timestamps have different purposes: version vectors can distinguish whether two operations are concurrent or whether one is causally dependent on the other, whereas Lamport timestamps always enforce a total ordering.

**Timestamp ordering is not sufficient**

The problem is that the total order of operations only emerges after you have collected all of the operations (you need to know what the other nodes are doing). This idea of knowing when your total order is finalized is captured in the topic of `total order broadcast`.

### Total Order Broadcast

Total order broadcast is usually described as a protocol for exchanging messages between nodes. Informally, it requires that two safety properties always be satisfied:
- `Reliable delivery`: no messages are lost; if a message is delivered to one node, it is delivered to all nodes
- `Totally ordered delivery`: messages are delivered to every node in the same order
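Returning to the Lamport timestamps idea above, here is a minimal sketch (node names and method names are my own, not from the book) of a Lamport clock: each node keeps a counter, includes it in its messages, and fast-forwards to the maximum counter it has seen. Ordering timestamps by (counter, node ID) gives a total order that is consistent with causality:

```python
class LamportClock:
    """Minimal Lamport timestamp: a (counter, node_id) pair per node."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def local_event(self):
        # Any local operation (including sending a message) bumps the counter.
        self.counter += 1
        return (self.counter, self.node_id)

    def receive(self, remote_counter):
        # On receiving a message, fast-forward to the max counter seen so far.
        self.counter = max(self.counter, remote_counter) + 1
        return (self.counter, self.node_id)

a, b = LamportClock("A"), LamportClock("B")
t1 = a.local_event()     # A writes something
t2 = b.receive(t1[0])    # B sees A's write, so B's timestamp is causally later
t3 = b.local_event()
assert t1 < t2 < t3      # tuple comparison: counter first, then node ID
```

As the notes above say, this totally orders operations but cannot tell you whether two operations were actually concurrent; for that you need version vectors.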
**Using total order broadcast**

Total order broadcast is exactly what you need for database replication: if every message represents a write to the database, and every replica processes the same writes in the same order, then the replicas will remain consistent with each other. This principle is known as `state machine replication`.

Delivering a message is like appending to a log. Since all nodes must deliver the same messages in the same order, all nodes can read the log and see the same sequence of messages.

**Implementing linearizable storage using total order broadcast**

Total order broadcast is asynchronous: messages are guaranteed to be delivered reliably in a fixed order, but there is no guarantee about when a message will be delivered. By contrast, linearizability is a recency guarantee: a read is guaranteed to see the latest value written.

**Implementing total order broadcast using linearizable storage**

The easiest way is to assume you have a linearizable register that stores an integer and has an atomic increment-and-get operation. The algorithm is simple: for every message you want to send through total order broadcast, you increment-and-get the linearizable integer, and then attach the value you got from the register as a sequence number to the message. You can then send the message to all nodes (resending any lost messages), and the recipients will deliver the messages consecutively by sequence number.

## Distributed Transactions and Consensus

The goal is simply `to get several nodes to agree on something`, but it is easy to implement this incorrectly. Use cases:
- Leader election
- Atomic commit: in a database that supports transactions spanning multiple nodes or partitions. 2PC is a kind of consensus algorithm, but not a very good one.

### Atomic Commit and Two-Phase Commit (2PC)

**From single-node to distributed atomic commit**

Atomic commit on a single node is easy, using a write-ahead log. Across multiple nodes, it is not enough to simply send a commit message to each node.

**Introduction to two-phase commit**

![image](https://hackmd.io/_uploads/rJdv67DE1l.png)

The coordinator first asks every participant to prepare the writes (phase 1); only if all of them acknowledge does it tell them to commit (phase 2). The transaction coordinator is often implemented as a library inside the application code (see the 2PC sketch below).

**A system of promises**

The protocol contains two crucial "points of no return": when a participant votes "yes", it promises that it will definitely be able to commit later (although the coordinator may still choose to abort); and once the coordinator decides, that decision is irrevocable.

**Coordinator failure**

![image](https://hackmd.io/_uploads/B1rLE4v4Jg.png)

The only way 2PC can complete is by waiting for the coordinator to recover. This is why the coordinator must write its decision to a transaction log before sending the commit/abort requests. Thus, the commit point of 2PC comes down to a regular single-node atomic commit on the coordinator.

**Three-phase commit**

Two-phase commit is called a `blocking` atomic commit protocol because it can become stuck waiting for the coordinator to recover. In theory, it is possible to make an atomic commit protocol `nonblocking`, so that it does not get stuck if a node fails. However, making this work in practice is not so straightforward.

### Distributed Transactions in Practice

Database-internal transactions do not have to be compatible with any other system, so they can use any protocol and apply optimizations specific to that particular technology. For that reason, database-internal distributed transactions can often work quite well. On the other hand, transactions spanning heterogeneous technologies are a lot more challenging.
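To make the two phases and the two "points of no return" concrete, here is a heavily simplified, single-process sketch (all class and function names are mine; a real coordinator must also persist its log durably and keep retrying phase 2 after crashes):

```python
from enum import Enum

class Vote(Enum):
    YES = "yes"
    NO = "no"

class Participant:
    """A participant that can durably prepare a write and later commit or abort it."""
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "idle"

    def prepare(self, txid):
        # Phase 1: voting yes is a promise that commit will succeed if asked
        # (point of no return #1 for the participant).
        if self.can_commit:
            self.state = "prepared"
            return Vote.YES
        return Vote.NO

    def commit(self, txid):
        self.state = "committed"

    def abort(self, txid):
        self.state = "aborted"

def two_phase_commit(txid, participants, coordinator_log):
    # Phase 1: collect votes from every participant.
    votes = [p.prepare(txid) for p in participants]
    decision = "commit" if all(v is Vote.YES for v in votes) else "abort"
    # The decision must reach durable storage *before* it is sent out
    # (point of no return #2); here the "log" is just an in-memory list.
    coordinator_log.append((txid, decision))
    # Phase 2: tell everyone the outcome (retry-until-acknowledged is omitted).
    for p in participants:
        p.commit(txid) if decision == "commit" else p.abort(txid)
    return decision

log = []
nodes = [Participant("db1"), Participant("db2", can_commit=False)]
print(two_phase_commit("tx-42", nodes, log))  # -> "abort", because db2 voted no
```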
**Exactly-once message processing**

Such a distributed transaction is only possible if all systems affected by the transaction are able to use the same atomic commit protocol.

**XA transactions**

X/Open XA (short for eXtended Architecture) is a standard for implementing two-phase commit across heterogeneous technologies. The transaction coordinator implements the XA API to drive 2PC.

**Holding locks while in doubt**

While a transaction is in doubt, 2PC participants keep holding their locks, which can block other transactions and processes.

**Recovering from coordinator failure**

The administrator must examine the participants of each in-doubt transaction, determine whether any participant has committed or aborted already, and then apply the same outcome to the other participants.

**Limitations of distributed transactions**

The XA coordinator is itself a kind of database (it stores transaction outcomes), so it brings the same operational problems: it can be a single point of failure if not replicated, it makes otherwise stateless application servers stateful, and XA has to be a lowest common denominator compatible with a wide range of data systems.

### Fault-Tolerant Consensus

It can be proved that any consensus algorithm requires at least a majority of nodes to be functioning correctly in order to assure termination.

**Consensus algorithms and total order broadcast**

Total order broadcast is equivalent to repeated rounds of consensus (each consensus decision corresponds to one message delivery).

**Single-leader replication and consensus**

In a system with automatic leader election, it seems that in order to elect a leader, we first need a leader (for total order broadcast). In order to solve consensus, we must first solve consensus. How do we break out of this conundrum?

**Epoch numbering and quorums**

Within each epoch, the leader is unique (if two leaders conflict, the one with the higher epoch number wins). Every decision a leader wants to make must be approved by a quorum of nodes. So there are two rounds of voting: one to elect a leader, and a second to vote on the leader's proposals (see the sketch below).

**Limitations of consensus**

Consensus algorithms are a huge breakthrough for distributed systems: they bring concrete safety properties (agreement, integrity, and validity) to systems where everything else is uncertain, and they nevertheless remain fault-tolerant. However, consensus systems generally rely on timeouts to detect failed nodes and assume a fixed set of participating nodes, a majority of which must be working for the system to make progress.

### Membership and Coordination Services

ZooKeeper and etcd are designed to hold small amounts of data, and that small amount of data is replicated across all the nodes using a fault-tolerant total order broadcast algorithm. ZooKeeper's features include total order broadcast (and hence consensus), plus:
- Linearizable atomic operations
- Total ordering of operations
- Failure detection
- Change notifications

**Allocating work to nodes**

An application may initially run only on a single node, but eventually may grow to thousands of nodes. Trying to perform majority votes over so many nodes would be terribly inefficient. Instead, ZooKeeper runs on a fixed number of nodes (usually three or five) and performs its majority votes among those nodes while supporting a potentially large number of clients. Thus, ZooKeeper provides a way of "outsourcing" some of the work of coordinating nodes (consensus, operation ordering, and failure detection) to an external service.

**Service discovery**

Although service discovery does not require consensus, leader election does. Thus, if your consensus system already knows who the leader is, then it can make sense to also use that information to help other services discover who the leader is.

**Membership services**

A membership service determines which nodes are currently active and live members of a cluster.
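A toy sketch of the two voting rounds mentioned under "Epoch numbering and quorums" (this is not Raft, Paxos, or Zab, and every name here is invented): nodes only vote for an epoch higher than any they have seen, and a leader's proposal only counts as decided once a quorum accepts it, which is what fences off stale leaders from older epochs:

```python
class Node:
    """Acceptor-style node: tracks the highest epoch it has voted in."""
    def __init__(self):
        self.promised_epoch = 0
        self.accepted = []  # (epoch, value) pairs this node has accepted

    def vote_for_leader(self, epoch):
        # Grant the vote only for an epoch higher than anything seen before.
        if epoch > self.promised_epoch:
            self.promised_epoch = epoch
            return True
        return False

    def accept_proposal(self, epoch, value):
        # Accept proposals only from the highest-seen epoch's leader.
        if epoch >= self.promised_epoch:
            self.promised_epoch = epoch
            self.accepted.append((epoch, value))
            return True
        return False

def quorum(acks, cluster_size):
    return acks > cluster_size // 2

nodes = [Node() for _ in range(5)]

# Round 1: a candidate asks for votes in epoch 7.
votes = sum(n.vote_for_leader(7) for n in nodes)
assert quorum(votes, len(nodes))  # it is now the leader for epoch 7

# Round 2: the epoch-7 leader proposes a value and needs a quorum of accepts.
acks = sum(n.accept_proposal(7, "x = 42") for n in nodes)
print("decided" if quorum(acks, len(nodes)) else "not decided")

# A stale leader from an older epoch can no longer get anything accepted.
print(sum(n.accept_proposal(6, "x = 99") for n in nodes))  # 0
```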
## Summary

Linearizability is appealing because it is easy to understand: it makes a database behave like a variable in a single-threaded program. However, it has the downside of being slow, especially in environments with large network delays.

Causality provides us with a weaker consistency model: some things can be concurrent, so the version history is like a timeline with branching and merging. Causal consistency does not have the coordination overhead of linearizability and is much less sensitive to network problems.

# Part III: Derived Data

## Systems of Record and Derived Data

Systems that store and process data can be grouped into:
- **Systems of record**: the `source of truth`, holding the authoritative (typically normalized) version of the data
- **Derived data systems**: caches, denormalized data, indexes, and materialized views. Derived data is redundant but good for read performance.

The distinction between the two depends on how the application uses the data.

# Chapter 10: Batch Processing

Three kinds of systems:
- Services (online systems): request/response; performance is measured by response time
- Batch processing systems (offline systems): take input data and produce output data; performance is measured by throughput
- Stream processing systems (near-real-time systems): also consume inputs and produce outputs, but they operate on events shortly after they happen (unlike batch jobs, which operate on a fixed set of input data)

## Batch Processing with Unix Tools

### Simple Log Analysis

The sort utility in GNU Coreutils (Linux) automatically handles larger-than-memory datasets by spilling to disk, and automatically parallelizes sorting across multiple CPU cores.

### The Unix Philosophy

![image](https://hackmd.io/_uploads/S1FlhDdEkg.png)

Unix uses a uniform interface (files, i.e. sequences of bytes) for the input and output of programs. However, the biggest limitation of Unix tools is that they run only on a single machine, and that's where tools like Hadoop come in.

## MapReduce and Distributed Filesystems

MapReduce is a bit like Unix tools, but distributed across potentially thousands of machines. While Unix tools use `stdin` and `stdout` as input and output, MapReduce jobs read and write files on a distributed filesystem. In Hadoop's implementation, that is HDFS.

HDFS is based on the shared-nothing principle. HDFS consists of a daemon process running on each machine, exposing a network service that allows other nodes to access files stored on that machine (assuming that every general-purpose machine in a datacenter has some disks attached to it). A central server called the `NameNode` keeps track of which file blocks are stored on which machine. Thus, HDFS conceptually creates one big filesystem that can use the space on the disks of all machines running the daemon. Fault tolerance is achieved by replicating file blocks to other machines.

### MapReduce Job Execution

MapReduce is a programming framework with which you can write code to process large datasets in a distributed filesystem like HDFS.

![image](https://hackmd.io/_uploads/Bk6eEFd4kg.png)

To create a MapReduce job, you need to implement two callback functions, the mapper and the reducer (see the sketch below):
- Mapper: called once for every input record; extracts zero or more key-value pairs from it
- Reducer: iterates through all values belonging to the same key and produces output records

**Distributed execution of MapReduce**

![image](https://hackmd.io/_uploads/SJeB8Y_41g.png)

Each reduce task takes the files from the mappers and merges them together, preserving the sort order. The reducer is called with a key and an iterator that incrementally scans over all records with the same key. The output records are written to a file on the distributed filesystem (usually one copy on the local disk of the machine running the reducer, with replicas on other machines).
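A minimal single-process sketch of the mapper/reducer contract (the function names and the toy in-memory "shuffle" are mine; Hadoop would run the mappers and reducers on many machines and do the sorting and grouping for you):

```python
from collections import defaultdict

# Mapper: called once per input record (here, a log line); emits (key, value) pairs.
def mapper(line):
    for word in line.split():
        yield (word, 1)

# Reducer: called with one key and an iterator over all values for that key.
def reducer(key, values):
    yield (key, sum(values))

def run_mapreduce(records):
    # The "shuffle": group the mappers' output by key, sorted, so that each
    # reducer call sees all values for a key together.
    grouped = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            grouped[key].append(value)
    output = []
    for key in sorted(grouped):
        output.extend(reducer(key, iter(grouped[key])))
    return output

logs = ["GET /home", "GET /about", "GET /home"]
print(run_mapreduce(logs))  # [('/about', 1), ('/home', 2), ('GET', 3)]
```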
**MapReduce workflows**

It is very common for MapReduce jobs to be chained together into workflows, such that the output of one job becomes the input to the next job. The first job must be configured to write its output to a designated directory in HDFS, and the second job must be configured to read that same directory as its input. A job in a workflow can only start when the prior jobs that produce its input directories have completed successfully. To handle these dependencies between job executions, various workflow schedulers for Hadoop have been developed, including Oozie, Azkaban, Luigi, Airflow, and Pinball.

### The Output of Batch Workflows

Batch processing is not transaction processing, nor is it analytics. It is closer to analytics, in that a batch process typically scans over large portions of an input dataset. However, the output of a batch process is often not a report, but some other kind of structure.

**Building search indexes**

Google's original use of MapReduce was building search indexes. The mappers partition the set of documents as needed, each reducer builds the index for its partition, and the index files are written to the distributed filesystem (see the sketch below).

**Key-value stores as batch process output**

Another common use for batch processing is to build machine learning systems such as classifiers and recommendation systems. The approach: build the database files inside the batch job and write them to the job's output directory, just like the search indexes above; they can then be loaded in bulk into servers that handle read-only queries.

**Philosophy of batch process outputs**

By treating inputs as immutable and avoiding side effects, batch jobs not only achieve good performance but also become much easier to maintain.

### Comparing Hadoop to Distributed Databases

Hadoop is somewhat like a distributed version of Unix, where HDFS is the filesystem and MapReduce is a quirky implementation of a Unix process.

**Diversity of storage**

Databases require you to structure data according to a particular model, whereas files in Hadoop are just sequences of bytes and can be in any format. This is essentially the data warehouse vs. data lake distinction. Hadoop has often been used for implementing ETL processes: data from transaction processing systems is dumped into the distributed filesystem in some raw form, and then MapReduce jobs are written to clean up that data, transform it into a relational form, and import it into an MPP data warehouse for analytic purposes.

**Diversity of processing models**

The Hadoop ecosystem includes both random-access OLTP databases (such as HBase) and MPP-style analytic databases (such as Impala). Neither of them uses MapReduce, but both build on HDFS.

**Designing for frequent faults**

MapReduce can tolerate the failure of a map or reduce task without it affecting the job as a whole, by retrying work at the granularity of an individual task. MapReduce is designed to tolerate frequent unexpected task termination: it's not because the hardware is particularly unreliable, it's because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster. In an environment where tasks are not so often terminated, the design decisions of MapReduce make less sense.

## Beyond MapReduce

MapReduce is very robust: you can use it to process almost arbitrarily large quantities of data on an unreliable multi-tenant system with frequent task terminations, and it will still get the job done. On the other hand, other tools are sometimes orders of magnitude faster for some kinds of processing.
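Circling back to "Building search indexes" above, a toy sketch of how the same mapper/reducer shape produces an inverted index rather than counts (the document IDs and the in-memory runner are invented for illustration; a real job would write the posting lists as index files to HDFS):

```python
from collections import defaultdict

documents = {
    "doc1": "the quick brown fox",
    "doc2": "the lazy dog",
    "doc3": "the quick dog",
}

# Mapper: emit (term, doc_id) for every distinct word in a document.
def index_mapper(doc_id, text):
    for term in set(text.split()):
        yield (term, doc_id)

# Reducer: for one term, output its sorted posting list (the index entry).
def index_reducer(term, doc_ids):
    return (term, sorted(doc_ids))

postings = defaultdict(list)
for doc_id, text in documents.items():
    for term, d in index_mapper(doc_id, text):
        postings[term].append(d)

inverted_index = dict(index_reducer(t, ids) for t, ids in postings.items())
print(inverted_index["quick"])  # ['doc1', 'doc3']
print(inverted_index["dog"])    # ['doc2', 'doc3']
```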
### Materialization of Intermediate State

`Intermediate state` is output from one job that will be passed as input to another job. The process of writing this intermediate state out to files is called `materialization`. This is different from Unix pipes, which stream data through a small in-memory buffer.

Some downsides of MapReduce compared to Unix pipes:
- A job can only start when all tasks of the preceding job have completed
- Mappers of later jobs are often redundant: they just read back what a reducer of the previous job wrote
- Storing intermediate state in a distributed filesystem means it is replicated across several nodes, which is overkill for such temporary data

**Dataflow engines**

Spark, Tez, and Flink were developed to fix these problems with MapReduce: they handle an entire workflow as one job, rather than breaking it up into independent subjobs. They are called `dataflow engines`. They generalize map and reduce into `operators`, and repartitioning and sorting between operators are optional.

Advantages:
- Sorting is not mandatory as it is in MapReduce
- No unnecessary map tasks
- Locality optimizations: the scheduler can place tasks so that data is shared, without the replication MapReduce requires
- Intermediate state between operators can be kept in memory or on local disk
- Operators can start executing as soon as their input is ready
- Existing Java Virtual Machine (JVM) processes can be reused to run new operators (MapReduce launches a new JVM for each task)

**Fault tolerance**

Dataflow engines recompute intermediate data if the machine holding it is lost. To avoid cascading recomputation problems, it is better to make operators deterministic. Recovering from faults by recomputing data is not always the right answer: if the intermediate data is much smaller than the source data, or if the computation is very CPU-intensive, it is probably cheaper to materialize the intermediate data to files than to recompute it.

**Discussion of materialization**

Flink especially is built around the idea of pipelined execution. When the job completes, its output needs to go somewhere durable so that users can find it and use it; most likely, it is written to the distributed filesystem again. Thus, when using a dataflow engine, materialized datasets on HDFS are still usually the inputs and the final outputs of a job. As with MapReduce, the inputs are immutable and the output is completely replaced. The improvement over MapReduce is that you save yourself writing all the intermediate state to the filesystem as well.

### Graphs and Iterative Processing

The goal is to perform some kind of offline processing or analysis on an entire graph (e.g. recommendation or ranking engines). This kind of graph processing algorithm is often implemented in an iterative style:
1. An external scheduler runs a batch process to calculate one step of the algorithm
2. When the batch process completes, the scheduler checks whether it has finished
3. If it has not yet finished, the scheduler goes back to step 1 and runs another round of the batch process

This approach works, but implementing it with MapReduce is often very inefficient.

**The Pregel processing model**

The `bulk synchronous parallel` (BSP) model of computation has become popular for graph processing; it is also known as the `Pregel` model. In each iteration, a function is called for each vertex, passing it all the messages that were sent to it, much like a call to the reducer. The difference from MapReduce is that in the Pregel model, a vertex remembers its state in memory from one iteration to the next, so the function only needs to process new messages. If no messages are being sent in some part of the graph, no work needs to be done (see the sketch below).
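A minimal single-threaded sketch of the Pregel/BSP idea (toy graph and variable names invented here): per-vertex state survives across supersteps, and a vertex only does work when it receives messages. This particular sketch propagates the minimum vertex label to find connected components:

```python
# Toy graph: vertex -> neighbors (undirected edges listed in both directions).
graph = {
    "A": ["B"], "B": ["A", "C"], "C": ["B"],
    "D": ["E"], "E": ["D"],
}

# Per-vertex state survives across supersteps (unlike MapReduce).
state = {v: v for v in graph}      # each vertex starts labeled with itself
inbox = {v: [v] for v in graph}    # an initial message to kick things off

superstep = 0
while any(inbox.values()):         # stop when a superstep sends no messages
    outbox = {v: [] for v in graph}
    for vertex, messages in inbox.items():
        if not messages:
            continue               # no new messages => no work for this vertex
        smallest = min(messages)
        if superstep == 0 or smallest < state[vertex]:
            state[vertex] = min(state[vertex], smallest)
            for neighbor in graph[vertex]:
                outbox[neighbor].append(state[vertex])
    inbox = outbox
    superstep += 1

print(state)  # {'A': 'A', 'B': 'A', 'C': 'A', 'D': 'D', 'E': 'D'}
```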
**Fault tolerance**

Fault tolerance in Pregel is achieved by periodically checkpointing the state of all vertices at the end of an iteration, i.e., writing their full state to durable storage.

**Parallel execution**

The graph is often simply partitioned by an arbitrarily assigned vertex ID, which leads to a lot of cross-machine communication overhead. If your graph can fit in memory on a single computer, it's quite likely that a single-machine (maybe even single-threaded) algorithm will outperform a distributed batch process. If the graph is too big to fit on a single machine, a distributed approach such as Pregel is unavoidable.

### High-Level APIs and Languages

The dataflow APIs generally use relational-style building blocks to express a computation: joining datasets on the value of some field; grouping tuples by key; filtering by some condition; and aggregating tuples by counting, summing, or other functions. Internally, these operations are implemented using the various join and grouping algorithms discussed earlier in this chapter.

**The move toward declarative query languages**

The application simply states which joins are required, and the query optimizer decides how they can best be executed. By incorporating declarative aspects in their high-level APIs, and having query optimizers that can take advantage of them during execution, batch processing frameworks begin to look more like MPP databases (and can achieve comparable performance). At the same time, by retaining the ability to run arbitrary code and read data in arbitrary formats, they keep their flexibility advantage.

## Summary

The two main problems that distributed batch processing frameworks need to solve are:
- **Partitioning**: In MapReduce, mappers are partitioned according to input file blocks, and their output is repartitioned, sorted, and merged so that all records with the same key end up in the same place. Dataflow engines avoid sorting unless it is required, but otherwise take a broadly similar approach to partitioning.
- **Fault tolerance**: MapReduce frequently writes to disk. Dataflow engines perform less materialization of intermediate state and keep more in memory, so they need to recompute data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed.

Thanks to the framework, your code in a batch processing job does not need to worry about implementing fault-tolerance mechanisms: the framework can guarantee that the final output of a job is the same as if no faults had occurred, even though in reality various tasks perhaps had to be retried. These reliable semantics are much stronger than what you usually have in online services that handle user requests and that write to databases as a side effect of processing a request.

The distinguishing feature of a batch processing job is that it reads some input data and produces some output data, without modifying the input; in other words, the output is derived from the input. Crucially, the input data is `bounded`: it has a known, fixed size. Because it is bounded, a job knows when it has finished reading the entire input, and so a job eventually completes.

# Chapter 11: Stream Processing

`Event streams` are the unbounded, incrementally processed counterpart to the bounded data of batch processing.

## Transmitting Event Streams

An event is generated once by a `producer` (also called a `publisher` or `sender`) and potentially processed by multiple `consumers` (`subscribers` or `recipients`).
Related records are grouped into a `topic` or `stream`.

### Messaging Systems

A messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.

**Message brokers compared to databases**

Differences:
- Databases keep data until it is explicitly deleted; brokers automatically delete a message once it has been successfully delivered to its consumers
- Brokers assume queues are short (a small working set)
- Databases support secondary indexes and arbitrary queries, whereas brokers support subscribing to topics matching some pattern; both are ways of selecting a portion of the data

**Multiple consumers**

When multiple consumers read messages from the same topic, two main patterns of messaging are used:
- `Load balancing`: each message is delivered to one of the consumers, so the consumers share the work; use this when messages are expensive to process and you want to parallelize processing
- `Fan-out`: each message is delivered to all of the consumers

**Acknowledgments and redelivery**

`Acknowledgments`: a client must explicitly tell the broker when it has finished processing a message, so that the broker can remove it from the queue. If the broker does not receive an acknowledgment, it redelivers the message to another consumer.

![image](https://hackmd.io/_uploads/B1GSepTNyl.png)

### Partitioned Logs

Why can we not have a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging? This is the idea behind log-based message brokers.

**Using logs for message storage**

A producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially (similar to `tail -f`). To scale to higher throughput, the log is `partitioned`. Different partitions can be hosted on different machines, and a topic is a group of partitions that all carry messages of the same type.

![image](https://hackmd.io/_uploads/H1nB7a6Ekl.png)

Examples: Kafka, Kinesis Streams. Fault tolerance is achieved by replicating messages across nodes.

**Logs compared to traditional messaging**

Load balancing: the broker assigns entire partitions to nodes in the consumer group. Fan-out simply means several consumers each read the log independently.

Downsides of load balancing with the log-based approach:
- The number of nodes sharing the work can be at most the number of partitions in the topic
- If a single message is slow to process, it blocks the processing of subsequent messages in that partition

When messages are expensive to process, you want to parallelize processing message-by-message, and ordering is not important => use the JMS/AMQP style of message broker. With high message throughput, where each message is fast to process and ordering is important => use a log-based broker.

**Consumer offsets**

The broker tracks which messages a consumer has processed using an offset per partition, very similar to the `log sequence number` in single-leader replication (the broker behaves like a leader and the consumer like a follower). See the sketch below.

**Disk space usage**

The log is divided into segments; old segments are deleted (or moved to archive storage) to bound disk usage. Throughput stays roughly constant, since every message is written to disk anyway and the on-disk buffer is large.

**When consumers cannot keep up with producers**

The log-based approach is a form of buffering with a large but fixed-size buffer. You can monitor how far a consumer is behind the head of the log, and raise an alert if it falls behind significantly. As the buffer is large, there is enough time for a human operator to fix the slow consumer and allow it to catch up before it starts missing messages.
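A toy in-memory sketch of the log-based idea (class and method names are invented; this is not the Kafka API): partitioned append-only logs, one consumer offset per group and partition, and replay by rewinding the offset, which is exactly what makes replaying old messages (next subsection) possible:

```python
class LogBroker:
    """Toy log-based broker: one append-only list per partition, and one
    offset per (consumer group, partition) to track progress."""

    def __init__(self, num_partitions=2):
        self.partitions = [[] for _ in range(num_partitions)]
        self.offsets = {}  # (group, partition) -> next offset to read

    def send(self, key, message):
        # Messages with the same key always go to the same partition,
        # so ordering is preserved per key.
        partition = hash(key) % len(self.partitions)
        self.partitions[partition].append(message)

    def poll(self, group, partition, max_messages=10):
        offset = self.offsets.get((group, partition), 0)
        batch = self.partitions[partition][offset:offset + max_messages]
        # Committing the new offset is what "acknowledges" the whole batch.
        self.offsets[(group, partition)] = offset + len(batch)
        return batch

    def seek(self, group, partition, offset):
        # Rewinding the offset replays old messages (useful for reprocessing).
        self.offsets[(group, partition)] = offset

broker = LogBroker()
for i in range(5):
    broker.send(key="user-1", message=f"event-{i}")

p = hash("user-1") % 2
print(broker.poll("analytics", p))   # ['event-0', ..., 'event-4']
print(broker.poll("analytics", p))   # [] because the offset is at the head
broker.seek("analytics", p, 0)
print(broker.poll("analytics", p))   # the same five events again (replay)
```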
**Replaying old messages**

In a log-based message broker, replaying old messages is possible, so stream processing can be repeated and experimented with much like batch processing.

## Databases and Streams

### Keeping Systems in Sync

With the dual-writes strategy (the application writes to each data system separately), race conditions and inconsistencies between the systems can occur.

![image](https://hackmd.io/_uploads/rkRQ3cWSye.png)

### Change Data Capture

Change data capture (CDC) is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

![image](https://hackmd.io/_uploads/BkxancWHJl.png)

**Implementing change data capture**

CDC essentially makes one database the leader (the one from which changes are captured) and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database. Database triggers can be used to implement CDC, but they are fragile and have performance overheads; parsing the replication log is more robust. CDC is asynchronous, so replication lag can occur.

**Initial snapshot**

A consistent snapshot of the database, corresponding to a known position in the change log, is used to bootstrap a new derived system; the change stream is then applied from that point onward.

**Log compaction**

If you can only keep a limited amount of log history, you need to go through the snapshot process every time you want to add a new derived data system. However, `log compaction` provides a good alternative: if the CDC system is set up such that every change has a primary key, and every update for a key replaces the previous value for that key, then it's sufficient to keep just the most recent write for each key. To rebuild a derived data system, you just read from offset 0 of the log-compacted topic, without having to take another snapshot.

**API support for change streams**

Databases are beginning to support change streams as a first-class interface.

### Event Sourcing

Similar to CDC, event sourcing also stores all changes as a log of events. Differences:
- In CDC, the application uses the database mutably and does not need to be aware that CDC is happening
- In event sourcing, the event store is append-only and immutable, and events are designed to reflect things that happened at the application level (a user's action, for example)

**Deriving current state from the event log**

Replaying the event log is deterministic: running it many times produces the same result. As with CDC, replaying the event log allows you to reconstruct the current state of the system. However, log compaction needs to be handled differently:
- In CDC, log compaction can discard older versions of a key entirely
- In event sourcing, later events typically do not overwrite prior events, so log compaction is not possible in the same way

**Commands and events**

A user request is initially a command; if all validation succeeds and the command is accepted, it becomes an event (durable and immutable).

### State, Streams, and Immutability

![image](https://hackmd.io/_uploads/S12ldiZHye.png)

**Advantages of immutable events**

Immutable events are useful for auditing (for example, investigating erroneous transactions). Immutable events also capture more information than just the current state.

**Deriving several views from the same event log**

You can derive several different views from the same event log for different read and query patterns, and run old and new views side by side (akin to a blue/green deployment for databases).

**Limitations of immutability**

Many databases internally use immutable data structures and multi-version data, and version control systems such as Git are also built on immutability. However, truly deleting data (for privacy, security, or compliance reasons) is hard.

## Processing Streams

Things you can do with a stream:
- Write the events to a database, cache, or search index (as CDC does)
- Push the events to users, e.g. as notifications
- Produce one or more output streams as input to further processing

### Uses of Stream Processing

Stream processing has long been used for monitoring purposes.

**Complex event processing**

CEP allows you to specify rules that search for certain patterns of events in a stream. The queries are stored long-term, and events from the input streams continuously flow past them in search of events matching the query pattern (the reverse of a normal database, where the data is persistent and the queries are transient). A small sketch follows below.
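A minimal sketch of the CEP idea, with the rule, the event shape, and all field names invented for illustration: the "query" (the rule object) lives for a long time, and each incoming event flows past it:

```python
from collections import defaultdict, deque

class FailedLoginRule:
    """Standing query: alert when one user has >= 3 'login_failed' events
    within a 60-second window."""

    def __init__(self, threshold=3, window_seconds=60):
        self.threshold = threshold
        self.window = window_seconds
        self.recent = defaultdict(deque)  # user -> timestamps of recent failures

    def on_event(self, event):
        if event["type"] != "login_failed":
            return None
        times = self.recent[event["user"]]
        times.append(event["ts"])
        # Drop failures that have fallen out of the window.
        while times and event["ts"] - times[0] > self.window:
            times.popleft()
        if len(times) >= self.threshold:
            return f"ALERT: {event['user']} failed {len(times)} logins in {self.window}s"
        return None

rule = FailedLoginRule()
stream = [
    {"type": "login_failed", "user": "alice", "ts": 0},
    {"type": "login_ok",     "user": "bob",   "ts": 10},
    {"type": "login_failed", "user": "alice", "ts": 20},
    {"type": "login_failed", "user": "alice", "ts": 45},
]
for event in stream:
    match = rule.on_event(event)
    if match:
        print(match)  # fires on the third failure at ts=45
```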
**Stream analytics**

Examples:
- Measuring the rate of some type of event
- Calculating the rolling average of a value over some time period
- Comparing current statistics to previous time intervals

Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters. Many open source distributed stream processing frameworks are designed with analytics in mind: Spark Streaming, Flink, Kafka Streams, and so on, as well as hosted services such as Google Cloud Dataflow and Azure Stream Analytics.

**Maintaining materialized views**

Keeping derived data systems up to date via a change stream is a case of maintaining `materialized views`.

**Search on streams**

For example, continuously running full-text search queries against a stream of documents.

### Reasoning About Time

Many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing. However, this breaks down if there is any significant processing lag.

**Event time versus processing time**

There may be many reasons for delayed processing: queueing, network faults, performance issues, message delays, and so on. Confusing event time and processing time leads to bad data.

![image](https://hackmd.io/_uploads/B1kKUIBrkx.png)

**Knowing when you're ready**

Two options for handling `straggler` events that arrive after their window has been declared complete:
- Ignore them; they are probably a small percentage of events
- Publish a `correction`, an updated value for the window

**Whose clock are you using, anyway?**

To adjust for incorrect device clocks, one approach is to log three timestamps:
- The time at which the event occurred, according to the device clock
- The time at which the event was sent to the server, according to the device clock
- The time at which the event was received by the server, according to the server clock

**Types of windows**

Several types of windows are in common use (see the sketch at the end of these notes):
- `Tumbling window`: has a fixed length, and every event belongs to exactly one window
- `Hopping window`: has a fixed length, but allows windows to overlap in order to provide some smoothing
- `Sliding window`: contains all the events that occur within some interval of each other
- `Session window`: has no fixed duration; it groups together all events for the same user that occur closely together in time

### Fault Tolerance

**Microbatching and checkpointing**

`Microbatching`: break the stream into small blocks and treat each block like a miniature batch process; this is the approach used in Spark Streaming, and it implicitly provides tumbling windows equal to the batch size. A variant approach, used in Apache Flink, periodically generates rolling checkpoints of state and writes them to durable storage. However, restarting a failed task can cause external side effects to happen twice, and microbatching or checkpointing alone is not sufficient to prevent this problem.

**Atomic commit revisited**

To give the appearance of exactly-once processing, all outputs and side effects of processing an event (messages to downstream systems, database writes, acknowledgment of the input message) must take effect atomically, and only if the processing succeeded.

**Idempotence**

Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata.

## Summary

Two types of message brokers:
- AMQP/JMS-style message brokers
- Log-based message brokers
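To make the tumbling and hopping window types above concrete, a small sketch of assigning an event-time timestamp to windows (the window sizes and the end-exclusive convention are my own choices for illustration, not any framework's defaults):

```python
# Event timestamps are epoch seconds taken from the event itself (event time).

def tumbling_window(event_ts, size=300):
    """Every event falls into exactly one fixed-size window."""
    start = (event_ts // size) * size
    return (start, start + size)

def hopping_windows(event_ts, size=300, hop=60):
    """Fixed-size windows that overlap: one window starts every `hop` seconds,
    so a single event belongs to several of them."""
    windows = []
    start = max(0, ((event_ts - size) // hop + 1) * hop)  # earliest window containing the event
    while start <= event_ts:
        windows.append((start, start + size))
        start += hop
    return windows

ts = 1_000
print(tumbling_window(ts))   # (900, 1200)
print(hopping_windows(ts))   # [(720, 1020), (780, 1080), (840, 1140), (900, 1200), (960, 1260)]
```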