# Part 1: Designing Data-Intensive Applications

# Part I: Foundations of Data Systems

# Chapter 1: Reliable, Scalable and Maintainable Applications

## Thinking About Data Systems

Data systems: databases, caches, queues,...

## Reliability

Continuing to work correctly, even when things go wrong.

### Hardware Faults

Hardware redundancy is no longer enough; fault tolerance is now the software's responsibility, especially in cloud-based systems like AWS (servers can go down at any time for patching, maintenance,...).

### Software Errors

Software errors tend to cause more systematic failures within the system. Remedies: carefully thinking about assumptions and interactions in the system; thorough testing; process isolation; allowing processes to crash and restart; measuring, monitoring, and analyzing system behavior in production.

### Human Errors

Humans are unreliable. How to prevent human errors:
- Good admin interfaces
- Non-production sandboxes for experimenting
- Unit tests, integration tests, manual tests
- Quick and easy recovery
- Detailed, clear monitoring

## Scalability

A system's ability to cope with increased load.

### Describing Load

Load parameters depend on the system architecture. Twitter, for example:
- Number of tweets posted: 4.6k writes/sec
- Number of tweets fetched: 300k reads/sec
- Fanout: 345k writes/sec

### Describing Performance

Two ways to look at performance:
- Increase load and keep system resources (CPU, memory, network) unchanged -> how is performance affected?
- Increase a load parameter -> how much do you need to increase resources to keep performance unchanged?

In batch processing systems: `throughput`; in online systems: `response time`.
- `Response time` varies, so it should be treated as a `distribution`, not a single number
- The `median` and `percentiles` should be used (see the sketch at the end of this chapter)

![image](https://hackmd.io/_uploads/HJZwyDGxJe.png)

The load-generating client needs to keep sending requests independently of the response time to avoid `head-of-line blocking`.

![image](https://hackmd.io/_uploads/Hky8-wGlJg.png)

### Approaches for Coping with Load

An architecture that scales well for a particular application is built around assumptions of which operations will be common and which will be rare—the load parameters.

## Maintainability

3 design principles:
- Operability: easy to keep the system running smoothly
- Simplicity: easy for new engineers to understand the system
- Evolvability: easy for engineers to make changes to the system in the future

### Operability: Making Life Easy for Operations

Data systems can do various things to make routine tasks easy:
- Visibility into runtime behavior and internals of the system
- Good support for automation and integration
- Avoiding dependency on individual machines
- Good documentation and an easy-to-understand operational model
- Good default behavior, but also giving administrators the freedom to override defaults when needed
- Self-healing, with manual control for admins
- Exhibiting predictable behavior, minimizing surprises

### Simplicity: Managing Complexity

Reducing complexity improves maintainability. Use `abstraction` to reduce complexity.

### Evolvability: Making Change Easy

The ease with which you can modify a data system, and adapt it to changing requirements, is closely linked to its simplicity and its abstractions: simple and easy-to-understand systems are usually easier to modify than complex ones.

## Summary

There is unfortunately no easy fix for making applications reliable, scalable, or maintainable. However, there are certain patterns and techniques that keep reappearing in different kinds of applications.
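To make the percentile idea concrete, here is a minimal sketch in plain Python using the nearest-rank method; the response times are invented for illustration.

```python
import math

def percentile(sorted_times, p):
    """Nearest-rank percentile of a pre-sorted, non-empty list."""
    rank = math.ceil(p / 100 * len(sorted_times))  # 1-based rank
    return sorted_times[rank - 1]

# Hypothetical response times (ms) collected over a time window.
response_times_ms = sorted([12, 14, 15, 15, 17, 21, 22, 25, 31, 45, 48, 120, 450])

for p in (50, 95, 99):
    print(f"p{p} = {percentile(response_times_ms, p)} ms")
# p50 (the median) describes the typical request; p99 captures the
# tail latency that only the slowest 1% of requests experience.
```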
# Chapter 2: Data Models and Query Languages

Each layer hides the complexity of the layers below it by providing a clean data model (APIs over APIs, JSON over bytes in memory,...).

## Relational Model Versus Document Model

Relational databases turned out to generalize very well, beyond their original scope of business data processing, to a broad variety of use cases.

### The Birth of NoSQL

Reasons for the adoption of NoSQL databases:
- A need for greater scalability: large datasets or very high write throughput
- A widespread preference for free and open source software
- Specialized query operations
- Frustration with the restrictiveness of relational schemas

### The Object-Relational Mismatch

Mismatch between database objects and application code objects. ORMs (object-relational mappers) can be used but don't completely hide the differences.

### Many-to-One and Many-to-Many Relationships

LinkedIn profile example:

![image](https://hackmd.io/_uploads/HklxX_zeyl.png)

### Are Document Databases Repeating History?

Historical systems like the `network model` and the `hierarchical model` had some remarkable similarities to the JSON model.

**Network model**
- Used access paths and links (like a linked list) to traverse data nodes
- You could change the access paths, but then you had to go through a lot of handwritten database query code and rewrite it to handle the new access paths

**The relational model**
- The query optimizer automatically decides which parts of the query to execute in which order, and which indexes to use

**Comparison to document databases**
- Document databases reverted back to the `hierarchical model` in one aspect: storing nested records within their parent record rather than in a separate table.
- Document databases use `document references` for joining foreign data. They do not follow the path of the `network model` (CODASYL).

### Relational Versus Document Databases Today

The document data model offers schema flexibility, better performance due to locality, and, for some applications, a closer fit to the data structures used by the application. The relational model counters by providing better support for joins, and for many-to-one and many-to-many relationships.

**Which data model leads to simpler application code?**
Depends on the type of application. Does it use many-to-many relationships? Does it have a document-like structure?

**Schema flexibility in the document model**
`schema-on-read` (the structure of the data is implicit, and only interpreted when the data is read), as opposed to `schema-on-write` for relational databases. The `schema-on-read` approach is advantageous if the items in the collection don't all have the same structure for some reason.

**Data locality for queries**
The locality advantage only applies if you need large parts of the document at the same time. It is generally recommended that you keep documents fairly small and avoid writes that increase the size of a document (the entire document needs to be rewritten).

**Convergence of document and relational databases**
Most relational databases now support JSON and XML. Some document databases support relational-like joins. A hybrid of the relational and document models is a good route for databases to take in the future.

## Query Languages for Data

SQL is a `declarative` query language. SQL's more limited functionality gives the database much more room for automatic optimizations. Declarative languages have a better chance of getting faster through parallel execution because they specify only the pattern of the results, not the algorithm that is used to determine the results.
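A small illustration of the difference, with made-up data (the SQL is embedded as a string only for comparison): the imperative version spells out *how* to loop, while the declarative version states only *what* result is wanted, leaving the execution strategy to the database.

```python
animals = [
    {"name": "shark", "family": "Sharks"},
    {"name": "salmon", "family": "Salmonidae"},
]

# Imperative: we dictate the exact algorithm (scan in order, append).
sharks = []
for animal in animals:
    if animal["family"] == "Sharks":
        sharks.append(animal)

# Declarative: only the pattern of the result is specified; the
# database is free to pick indexes, ordering, and parallelism.
query = "SELECT * FROM animals WHERE family = 'Sharks';"
```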
### Declarative Queries on the Web

CSS selector example (declarative) versus DOM query example (imperative): the CSS selector is better. Similarly, in databases, declarative query languages like SQL turned out to be much better than imperative query APIs.

### MapReduce Querying

`MapReduce` is a programming model for processing large amounts of data in bulk. `MapReduce` sits between declarative and imperative.

## Graph-Like Data Models

Many-to-many relationships are an important distinguishing feature between different data models. If your application has mostly one-to-many relationships (tree-structured data) or no relationships between records, the document model is appropriate. But many-to-many relationships are very common.

**Graph**
Typical examples:
- Social graphs: vertices are people, and edges indicate which people know each other
- The web graph: vertices are web pages, and edges indicate HTML links to other pages
- Road or rail networks: vertices are junctions, and edges represent the roads or railway lines between them

![image](https://hackmd.io/_uploads/rkM0xKMx1l.png)

### Property Graphs

For example: Neo4j, Titan, and InfiniteGraph.

Vertex:
- unique id
- set of outgoing edges
- set of incoming edges
- collection of properties (key-value)

Edge:
- unique id
- vertex at which the edge starts (tail vertex)
- vertex at which the edge ends (head vertex)
- label to describe the kind of relationship
- collection of properties (key-value)

![image](https://hackmd.io/_uploads/HynwZtfxJx.png)

Graphs are good for evolvability.

### The Cypher Query Language

A `declarative` query language for property graphs, created for Neo4j.

![image](https://hackmd.io/_uploads/SJUrztMxyl.png)

![image](https://hackmd.io/_uploads/HyPHmKfeJl.png)

### Graph Queries in SQL

Possible using `recursive common table expressions`, but the SQL query is very complicated compared to `Cypher`. It's important to pick a data model that is suitable for your application.

### Triple-Stores and SPARQL

Mostly equivalent to the property graph model, using different words to describe the same ideas: (`subject`, `predicate`, `object`), for example (`Jim`, `likes`, `bananas`).

![image](https://hackmd.io/_uploads/rkfKrtzgJg.png)

**The semantic web**
Different websites publish data in a consistent format, allowing data from different websites to be automatically combined into a web of data—a kind of internet-wide "database of everything."

**The RDF data model**
`Resource Description Framework`: a format for machine-readable data.

**The SPARQL query language**
A query language for triple-stores using the RDF data model.

![image](https://hackmd.io/_uploads/r1g6LFMlye.png)

### The Foundation: Datalog

An old, foundational query language. Similar to the triple-store model, generalized a bit: `predicate(subject, object)`. It's less convenient for simple one-off queries, but it can cope better if your data is complex.

## Summary

Historically, data started out being represented as one big tree (the hierarchical model), but that wasn't good for representing many-to-many relationships, so the relational model was invented to solve that problem. More recently, developers found that some applications don't fit well in the relational model either.
New nonrelational "NoSQL" datastores have diverged in two main directions: - Document databases: target use cases where data comes in self-contained documents and relationships between one document and another are rare - Graph databases go in the opposite direction, targeting use cases where anything is potentially related to everything. # Chapter 3: Storage and Retrieval On the most fundamental level, a database needs to do two things: when you give it some data, it should store the data, and when you ask it again later, it should give the data back to you. Storage engines families: `log-structured` and `page-oriented` (B-trees for example) ## Data Structures That Power Your Database Many databases internally use a `log`, which is an append-only data file. To efficiently find the value for particular key in database, use the `index` data structure. The general idea behind them is to keep some additional metadata on the side, which acts as a signpost and helps you to locate the data you Well-chosen indexes speed up read queries, but every index slows down writes. ### Hash Indexes ![image](https://hackmd.io/_uploads/BkIzXoXxkg.png) Log is append-only, so the size will increase. Break the log into segments of certain size and use `compaction` on the segments to reduce the size of logs. ![image](https://hackmd.io/_uploads/Hyo_7s7gyg.png) We can also merge segments, the old segments still can be served, the new segments are served after merging. ![image](https://hackmd.io/_uploads/SygGNs7gyg.png) Each segment has its own hash table, the lookup is performed from the most recently written segment the go front. **Detail in real implementation:** - File format: binary format - Deleting records: append a special deletion record to tell the merging process discard the key - Crash recovery: storing segment's hash map on disk. - Partially written records: using checksum - Concurrency control: common to have 1 writer thread and multiple read threads **Limitations**: - The hash table must fit in memory, not suitable for large key number - Range queries are not efficient. ### SSTables and LSM-Trees The sequence of key-value in log file is `sorted by key`. This is `Sorted String Table` (SSTable) 1. Merging is simple and efficient ![image](https://hackmd.io/_uploads/ryeQFk4x1g.png) 2. In-memory index size is smaller ![image](https://hackmd.io/_uploads/HJnUt14gJx.png) 3. Compress range of records before writing to disk **Constructing and maintaining SSTables** Storage engine flows: - When write: add it to an in-mem balanced tree data structure, sometimes called a `memtable` - Memtable get bigger -> write to disk as an SSTable file - a segment file, continue with a next memtable - Read: find key in memtable, then recently written segments and older ones. - Merging and compaction on segments. Limitations: memtable is lost if crash -> use a separated log file as above to write records (no sort) **Making an LSM-tree out of SSTables** `Log-Structured Merge-Tree` (LSM-Tree). Storage engines that are based on this principle of merging and compacting sorted files are often called LSM storage engines. **Performance optimizations** LSM-Tree algorithm can be slow when looking up key not in database. Storage engines use `Bloom filters` (A Bloom filter is a memory-efficient data structure for approximating the contents of a set. It can tell you if a key does not appear in the database, and thus saves many unnecessary disk reads for nonexistent keys.) 
`size-tiered` compaction: newer and smaller segments are merged into older and larger ones.
`leveled` compaction: the key range is split up into smaller SSTables and older data is moved into separate "levels".

### B-Trees

The most widely used indexing structure is the B-tree: the standard index implementation in almost all relational databases and many nonrelational ones.

B-trees keep key-value pairs sorted by key (like SSTables), but they break the database down into fixed-size blocks (pages).

![image](https://hackmd.io/_uploads/S1iVQx4lyl.png)

**Making B-trees reliable**
The basic underlying write operation of a B-tree is to overwrite a page on disk with new data, assuming that the page's address does not change. Pages can become corrupted if the database crashes mid-write, so B-trees include an additional data structure on disk: a write-ahead log (WAL, also called the `redo log`), an append-only file.

**B-tree optimizations**
A few ways to optimize:
- Instead of a WAL, use a copy-on-write scheme
- Abbreviate the keys
- Place leaf pages in sequential order on disk
- Add pointers between sibling leaf pages (so scans don't have to go back to parent pages)
- B-tree variants such as `fractal trees` borrow some log-structured ideas to reduce disk seeks

### Comparing B-Trees and LSM-Trees

LSM-trees are typically faster for writes, and B-trees are faster for reads.

**Advantages of LSM-trees**
A B-tree write goes to the WAL and to the page (twice if the page is split). LSM-trees are typically able to sustain higher write throughput than B-trees, partly because they sometimes have lower write amplification.
LSM-trees can be compressed better, and thus often produce smaller files on disk than B-trees.

**Downsides of LSM-trees**
A request can be stalled waiting for a disk operation (compaction of SSTables). Under high write throughput, compaction may not keep up with incoming writes -> unmerged segments accumulate until the disk fills up, and reads get slower.
In a B-tree, each key exists in exactly one place in the index, so locks can be attached directly to the tree -> convenient for transactions.

### Other Indexing Structures

`Secondary indexes` are often used; the indexed key is not unique. This can be solved in two ways: either by making each value in the index a list of matching row identifiers (like a postings list in a full-text index) or by making each key unique by appending a row identifier to it. (Both B-trees and log-structured indexes can be used.)

**Storing values within the index**
The key in the index points to the actual storage location of the data (a `heap` file).
`clustered index`: stores the indexed row directly within the index; requires additional space.

**Multi-column indexes**
`concatenated index`: concatenates the columns to form a single index key.
Multi-dimensional indexes are a more general way of querying several columns at once, which is particularly important for geospatial data. Specialized spatial indexes such as R-trees are used; for example, PostGIS implements geospatial indexes as R-trees.

**Full-text search and fuzzy indexes**
`fuzzy`: similar keys, such as misspelled words.
Full-text search engines commonly allow a search for one word to include synonyms, ignore grammatical variations, search for occurrences of words near each other in the same document, and support various other features that depend on linguistic analysis of the text. To cope with typos in documents or queries, Lucene is able to search text for words within a certain edit distance.

**Keeping everything in memory**
Besides performance, another interesting area for in-memory databases is providing data models that are difficult to implement with disk-based indexes. For example, Redis offers a database-like interface to various data structures such as priority queues and sets. Because it keeps all data in memory, its implementation is comparatively simple.
## Transaction Processing or Analytics?

![image](https://hackmd.io/_uploads/S1xwNgHgkg.png)

### Data Warehousing

It's not good to let business analysts run ad hoc analytic queries on an OLTP database, since those queries are often expensive, scanning large parts of the dataset, which can harm the performance of concurrently executing transactions.

A `data warehouse` is a separate database for analytics. Data is extracted from the OLTP database (using periodic data dumps or a continuous stream of updates), transformed into an analysis-friendly schema, cleaned up, and then loaded into the data warehouse. This process is called `ETL`.

**The divergence between OLTP databases and data warehouses**
The data model of a data warehouse is most commonly relational (using SQL). On the surface, a data warehouse and a relational OLTP database look similar (both have a SQL query interface), but their internals can look quite different, because they are optimized for very different query patterns.

### Stars and Snowflakes: Schemas for Analytics

Many data warehouses are used in a formulaic style known as a `star schema` (`dimensional modeling`). At the center of the schema is a so-called `fact table`. Each row of the fact table represents an event that occurred at a particular time.

![image](https://hackmd.io/_uploads/BJnJdeSeke.png)

Some of the columns in the fact table are attributes; others are foreign keys to `dimension tables`.

`snowflake schema`: a variation in which dimensions are broken down into subdimensions. More normalized but more complex.

In a typical data warehouse, tables are often very wide: fact tables often have over 100 columns, sometimes several hundred. Dimension tables can also be very wide.

## Column-Oriented Storage

If there are trillions of rows and petabytes of data in fact tables, storing and querying them becomes challenging. Using indexes on the dimension foreign keys still makes the database load all the rows.

`column-oriented storage`: don't store all the values from one row together; store all the values from each column together instead.

### Column Compression

Column data is often easy to compress; a common technique is `bitmap encoding` (see the sketch at the end of this section).

![image](https://hackmd.io/_uploads/H18mogSg1l.png)

**Memory bandwidth and vectorized processing**
For data warehouse queries that need to scan over millions of rows, a big bottleneck is the bandwidth for getting data from disk into memory. Besides reducing the volume of data that needs to be loaded from disk, column-oriented storage layouts are also good for making efficient use of CPU cycles.
`vectorized processing`: a CPU can execute a tight loop over a chunk of column data much faster than code that requires a lot of function calls and conditions. Operators, such as the bitwise AND and OR described previously, can be designed to operate on such chunks of compressed column data directly.

### Sort Order in Column Storage

The data can be sorted by chosen columns, which helps column compression. The compression effect is strongest on the first sort key.

**Several different sort orders**
Different queries benefit from different sort orders, so why not store the same data sorted in several different ways?
Having multiple sort orders in a column-oriented store is a bit similar to having multiple secondary indexes in a row-oriented store. The big difference: a row-oriented store keeps every row in one place (a heap file or a clustered index), and secondary indexes just contain pointers to the matching rows. In a column store, there normally aren't any pointers to data elsewhere, only columns containing values.
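To make the column layout and bitmap encoding above concrete, a toy sketch with made-up data: one column is stored as its own array, and a low-cardinality column is turned into one bitmap per distinct value.

```python
# One column from a fact table, stored contiguously (toy data).
product_sk = [69, 69, 69, 74, 31, 31, 31, 31, 29, 69]

# Bitmap encoding: one bitmap per distinct value; bit i is set
# if row i has that value.
bitmaps = {}
for row, value in enumerate(product_sk):
    bitmaps.setdefault(value, [0] * len(product_sk))[row] = 1

print(bitmaps[69])  # [1, 1, 1, 0, 0, 0, 0, 0, 0, 1]

# A query like "WHERE product_sk IN (31, 69)" becomes a bitwise OR:
mask = [a | b for a, b in zip(bitmaps[31], bitmaps[69])]
matching_rows = [row for row, bit in enumerate(mask) if bit]
```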
### Writing to Column-Oriented Storage

These optimizations make writes more difficult. An update-in-place approach, like B-trees use, is not possible with compressed columns.

**Solution**: use LSM-trees. Writes first go to an in-memory store, where they are prepared for writing to disk. When enough writes have accumulated, they are merged with the column files on disk and written to new files in bulk. Queries need to examine both the column data on disk and the recent writes in memory, and combine the two.

### Aggregation: Data Cubes and Materialized Views

OLTP databases often use virtual views (shortcuts for writing queries) instead of materialized views (actual copies of query results), because of the overhead of keeping them up to date on writes. In read-heavy data warehouses, materialized views can make more sense.

## Summary

There are big differences between the access patterns of the two use cases:
- OLTP systems are user-facing, so they may see a huge volume of requests. Applications usually only touch a small number of records in each query. The application requests records using some kind of key, and the storage engine uses an index to find the data for the requested key. Disk seek time is often the bottleneck here.
- Data warehouses and similar analytic systems are less well known, because they are primarily used by business analysts, not by end users. They handle a much lower volume of queries than OLTP systems, but each query is typically very demanding, requiring many millions of records to be scanned in a short time. Disk bandwidth (not seek time) is often the bottleneck here, and column-oriented storage is an increasingly popular solution for this kind of workload.

OLTP storage engines:
- Log-structured: only permit appending to files and deleting obsolete files, never updating a file that has been written. Bitcask, SSTables, LSM-trees, LevelDB, Cassandra, HBase, Lucene, and others belong to this group.
- Update-in-place: treats the disk as a set of fixed-size pages that can be overwritten. B-trees, the main representative, are used in almost all relational databases.

# Chapter 4: Encoding and Evolution

When a data format or schema changes, a corresponding change to application code needs to happen:
- Server side: rolling upgrade
- Client side: depends on when users install the update

Old and new data formats may coexist, so we need to maintain compatibility in both directions:
- Backward compatibility: newer code can read data written in the old format (easy to achieve)
- Forward compatibility: older code can read data written in the new format (harder to achieve)

## Formats for Encoding Data

Data exists in (at least) two representations:
- In memory: data is kept in data structures
- Written to a file or sent over the network: data has to be encoded as a self-contained sequence of bytes (JSON, for example)

`encoding` (`serialization` or `marshalling`): translation from the in-memory representation to a byte sequence. The reverse is `decoding` (`parsing`, `deserialization`, `unmarshalling`).

### Language-Specific Formats

Languages come with built-in serializers: `pickle` for Python, `Serializable` for Java, etc. There are also third-party libraries. Problems:
- Language lock-in
- Security problems when deserializing untrusted data
- Efficiency (CPU time, payload size)
- Versioning data

It's generally a bad idea to use language-specific formats.

### JSON, XML and Binary Variants

JSON, XML, CSV: textual formats. Problems:
- Ambiguity in the encoding of numbers. XML and CSV cannot distinguish a number from a string. JSON has floating-point precision problems with big numbers.
- JSON and XML are good for Unicode characters but don't support binary strings (the workaround is base64 encoding, which increases the size by 33%)
- CSV is quite a vague format
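A quick illustration of the number problem: JSON itself places no limit on number size, but many parsers (JavaScript's among them) decode numbers as IEEE 754 doubles, which cannot represent every integer above 2^53. The check below uses Python floats, which are also IEEE 754 doubles.

```python
import json

twitter_id = 2**53 + 1  # 9007199254740993, a plausible 64-bit ID

# Python's json module keeps integers exact...
assert json.loads(json.dumps(twitter_id)) == twitter_id

# ...but a decoder that uses IEEE 754 doubles (e.g. JavaScript's
# JSON.parse) silently rounds the value:
print(float(twitter_id) == float(2**53))  # True -- precision lost
```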
**Binary encoding**
To reduce size and parse faster.
JSON variants: MessagePack, BSON, BJSON, BISON,...
XML variants: WBXML, Fast Infoset,...

![image](https://hackmd.io/_uploads/By0xaaslJg.png)

### Thrift and Protocol Buffers

Apache Thrift and `protobuf`: binary encoding libraries. They require a schema for any data that is encoded.

Thrift interface definition language (IDL), with two binary encodings: BinaryProtocol and CompactProtocol:

![image](https://hackmd.io/_uploads/SyWqp6sxke.png)

Protocol Buffers schema definition, with a single binary encoding:

![image](https://hackmd.io/_uploads/H1M2ppsxJe.png)

![image](https://hackmd.io/_uploads/BJB91RixJg.png)

The `required` and `optional` properties are runtime checks; they are not encoded in the payload.

**Field tags and schema evolution**
Field names can be changed, but field tags cannot. New fields can be added with new tag numbers without breaking old code (but new fields cannot be required).
Only optional fields can be removed, and the same tag number must never be reused.

**Datatypes and schema evolution**
Possible, with the risk of values losing precision or getting truncated.

### Avro

Apache Avro: originated from Hadoop's use cases. Two schema languages: Avro IDL (for human editing) and one based on JSON (machine-readable).

![image](https://hackmd.io/_uploads/HyWTVRigye.png)

Avro's binary encoding is the most compact of all the encodings above (nothing in the encoding identifies fields or datatypes).

![image](https://hackmd.io/_uploads/B1Aer0sl1g.png)

The binary data can only be decoded correctly with the exact schema that was used to write it.

**The writer's schema and the reader's schema**

![image](https://hackmd.io/_uploads/ryjRBRoxJx.png)

**Schema evolution rules**
To maintain compatibility, you may only add or remove a field that has a default value.
Changing the datatype of a field is possible; Avro can convert the field.
Changing the name of a field is possible but a little tricky: the reader's schema can contain aliases for field names.

**But what is the writer's schema?**
The answer depends on the context in which Avro is being used:
- Large file with lots of records
- Database with individually written records
- Sending records over a network connection

**Dynamically generated schemas**
Avro doesn't use tag numbers, so it's friendly to `dynamically generated schemas`.

### The Merits of Schemas

Although textual data formats like JSON, XML, and CSV are widespread, binary encodings based on schemas are also a viable option. They have a number of nice properties:
- Much more compact (than binary JSON variants), since they can omit field names
- The schema is a valuable form of documentation
- Keeping a database of schemas allows you to check the forward and backward compatibility of schema changes
- For statically typed programming languages, generating code from the schema is useful; it enables type checking at compile time

## Modes of Dataflow

The most common ways data flows between processes:
- Via databases
- Via service calls
- Via asynchronous message passing

### Dataflow Through Databases

Two possible situations:
- A single process accessing the database
- Several processes accessing the database, which may belong to different applications or services, or be several instances of the same service (running in parallel). Both backward and forward compatibility are needed.

A bad case:

![image](https://hackmd.io/_uploads/ByX9by2eyl.png)

**Different values written at different times**
`data outlives code`: data written long ago may still live in the database (under old schemas). Migrating data in a large dataset is expensive. Most relational databases allow simple schema changes, such as adding a new column with a `null` default value. Schema evolution thus allows the entire database to appear as if it was encoded with a single schema, even though the underlying storage may contain records encoded with various historical versions of the schema.
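The Avro-style reader/writer reconciliation, reduced to a toy Python sketch (the field names and defaults are made up): the reader fills in defaults for fields it expects but the writer didn't produce, and ignores fields it doesn't know, which is the essence of backward/forward compatibility.

```python
# Reader's schema: expected fields with default values (toy example).
READER_SCHEMA = {"user_name": None, "favorite_number": None, "interests": []}

def decode(record: dict) -> dict:
    """Reconcile a decoded record (writer's view) with the reader's schema."""
    result = {}
    for field, default in READER_SCHEMA.items():
        # Field missing from the writer's data -> use the reader's default.
        result[field] = record.get(field, default)
    # Fields the reader doesn't know about are simply ignored.
    return result

old_record = {"user_name": "Martin"}                      # written by old code
new_record = {"user_name": "Martin", "interests": ["db"],
              "photo_url": "..."}                         # written by new code

print(decode(old_record))  # defaults fill the gaps  -> backward compatible
print(decode(new_record))  # unknown field dropped   -> forward compatible
```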
**Archival storage**
Snapshots, data dumps,... As a data dump is written in one go and is thereafter immutable, formats like Avro object container files are a good fit.

### Dataflow Through Services: REST and RPC

Clients and servers communicate over the network, as in a `microservices` architecture, for example.
We should expect old and new versions of servers and clients to be running at the same time, so the data encoding used by servers and clients must be compatible across versions of the service API.

**Web services**
REST and SOAP are the main approaches.

**The problems with remote procedure calls (RPCs)**
A network request is very different from a local function call:
- A local function call is predictable; a network call is not
- If you don't get a response from the remote service, you have no way of knowing whether the request got through or not
- Requests may go through but responses may be lost
- Network call response times vary
- Problematic with large objects

**Current directions for RPC**
Thrift and Avro come with RPC support included, gRPC is an RPC implementation using Protocol Buffers, Finagle also uses Thrift, and Rest.li uses JSON over HTTP.
Custom RPC protocols with a binary encoding format can achieve better performance than something generic like JSON over REST. However, REST seems to be the predominant style for public APIs. The main focus of RPC frameworks is on requests between services owned by the same organization, typically within the same datacenter.

### Message-Passing Dataflow

`asynchronous message-passing systems` sit somewhere between RPC and databases. Using a message broker has several advantages compared to direct RPC:
- It can act as a buffer if the recipient is unavailable or overloaded
- It can automatically redeliver messages
- It avoids the sender needing to know the IP address and port number of the recipient
- It allows one message to be sent to several recipients
- It logically decouples the sender from the recipient

`message passing` is one-way, unlike RPC -> `asynchronous`.

**Message brokers**
Message brokers are used as follows: one process sends a message to a named queue or topic, and the broker ensures that the message is delivered to one or more consumers of or subscribers to that queue or topic.

**Distributed actor frameworks**
`actor model`: a programming model for concurrency within a single process. In `distributed actor frameworks`, this model is used to scale an application across nodes. A distributed actor framework essentially integrates a message broker and the actor programming model into a single framework. Three popular distributed actor frameworks handle message encoding as follows:
- `Akka`
- `Orleans`
- `Erlang OTP`

## Summary

Data encoding formats and their compatibility properties:
- Programming language–specific encodings are restricted to a single programming language and often fail to provide forward and backward compatibility.
- Textual formats like JSON, XML, and CSV are widespread, and their compatibility depends on how you use them; you have to be careful with things like numbers and binary strings.
- Binary schema–driven formats like Thrift, Protocol Buffers, and Avro allow compact, efficient encoding with clearly defined forward and backward compatibility semantics.

We can conclude that with a bit of care, backward/forward compatibility and rolling upgrades are quite achievable.

# Part II: Distributed Data

Reasons to distribute a database across multiple machines:
- Scalability
- Fault tolerance/high availability
- Latency

## Scaling to Higher Load

`shared-memory` architectures (scaling up, vertical scaling) have many limitations:
- Bottlenecks
- Cost
- Limited fault tolerance

### Shared-Nothing Architectures

Also called `horizontal scaling`. Machines are nodes; coordination is done at the software level. It can also increase the complexity of the system.

### Replication Versus Partitioning

Replication: keep a copy of the same data on different nodes, creating redundancy.
Partitioning (sharding): split a big database into smaller subsets that can be assigned to different nodes.

![image](https://hackmd.io/_uploads/rkulQTEWyl.png)

# Chapter 5: Replication

Purposes of replication:
- To keep data geographically close to users (reduce latency)
- To allow the system to continue working even if some parts have failed (high availability)
- To scale out the number of machines that can serve read queries (increase read throughput)

If data never changed, replication would be easy. The difficulty of replication lies in handling changes. Three popular algorithms: `single-leader`, `multi-leader`, and `leaderless` replication.

## Leaders and Followers

The most common solution for `replication` is `leader-based replication` (`active/passive` or `master–slave` replication):
- Clients send writes to the `leader` (`master`), one of the replicas, which writes the data to its local storage
- `followers` (`read replicas`) receive the `replication log` (`change stream`) from the leader and apply it to their local storage
- Reads can be served by the leader or any follower; writes are only accepted on the leader

![image](https://hackmd.io/_uploads/SkedrGL-Je.png)

Leader-based replication is used by databases (PostgreSQL, MySQL,...), message brokers (Kafka, RabbitMQ), and network filesystems (DRBD).

### Synchronous Versus Asynchronous Replication

![image](https://hackmd.io/_uploads/BkQfuM8-kl.png)

Advantage of synchronous replication: the follower is guaranteed to have up-to-date data, so if the leader fails, we still have the latest data. Disadvantage: if the synchronous follower doesn't respond, the leader blocks all writes until that replica is available again.

In practice, enabling synchronous replication on a database usually means: one follower is synchronous, the others asynchronous. If the synchronous node becomes unavailable, one asynchronous node is made synchronous (so at least two nodes have the latest data). This configuration is called `semi-synchronous`.

Fully asynchronous replication allows writes to continue even when followers fall behind, but it may cause data loss (weakened durability) if the leader fails before replication has finished. This approach is nevertheless widely used (especially with geographically distributed followers).

### Setting Up New Followers

Conceptually, the process looks like this:
1. Take a snapshot of the leader's database at some point in time, if possible without locking the database
2. Copy the snapshot to the follower
3. The follower connects to the leader and requests all data changes since the snapshot was taken; the snapshot is associated with an exact position in the leader's replication log (PostgreSQL: `log sequence number`; MySQL: `binlog coordinates`)
4. `caught up`: the follower processes the backlog of data changes since the snapshot, and then continues to process changes as they happen (see the sketch below)
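A toy model of the log positions involved (all names hypothetical): the leader numbers every change, and a follower only has to remember the last position it applied in order to catch up after a snapshot or an outage.

```python
class Leader:
    def __init__(self):
        self.log = []  # append-only replication log

    def write(self, change):
        self.log.append(change)
        return len(self.log)  # log position, akin to a PostgreSQL LSN

    def changes_since(self, position):
        return self.log[position:]

class Follower:
    def __init__(self, snapshot_position=0):
        self.position = snapshot_position  # position the snapshot corresponds to
        self.applied = []

    def catch_up(self, leader):
        for change in leader.changes_since(self.position):
            self.applied.append(change)  # apply strictly in log order
            self.position += 1

leader = Leader()
for c in ("INSERT a", "UPDATE b", "DELETE c"):
    leader.write(c)

follower = Follower(snapshot_position=1)  # snapshot taken after first change
follower.catch_up(leader)                 # replays "UPDATE b", "DELETE c"
print(follower.position)                  # 3 -- caught up
```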
### Handling Node Outages

The goal is to keep the system as a whole running despite individual node failures, and to keep the impact of a node outage as small as possible.

**Follower failure: Catch-up recovery**
The follower catches up by requesting from the leader all the changes that occurred while it was disconnected, using its own log to know the last transaction it processed.

**Leader failure: Failover**
`failover`: a follower needs to be promoted, writes must be routed to this node, and the other followers must consume data changes from it.

Automatic failover:
1. *Determining that the leader has failed*. Normally done with timeouts on health-check messages.
2. *Choosing a new leader*. Through an election process or through a `controller node`. The best candidate is the follower with the most up-to-date data.
3. *Reconfiguring the system to use the new leader*. Routing writes to the new leader; the system also needs to ensure that if the old leader comes back, it becomes a follower.

Failover is fraught with things that can go wrong:
- With asynchronous replication, the new leader and the old leader (if it comes back) may have conflicting data. The common solution is to discard the old leader's unreplicated writes (which may violate durability).
- Discarding writes is dangerous if other storage systems outside of the database need to be coordinated with the database contents.
- Two nodes may both believe they are the leader (`split brain`). One common solution is to shut down one node.
- What is the right timeout for declaring the leader dead? Maybe the leader is only lagging because of a traffic spike?

Because of these problems, some operations teams prefer manual failover.

### Implementation of Replication Logs

**Statement-based replication**
The leader logs every write statement and sends it to the followers. Problems:
- Nondeterministic functions like NOW() or RAND() would generate a different value on each replica
- The exact execution order must be followed, which is limiting for multiple concurrently executing transactions
- Statements with side effects (triggers, stored procedures, UDFs) may produce different effects on each replica

Other replication methods are preferred.

**Write-ahead log (WAL) shipping**
The same log used by B-trees earlier can also be shipped from the leader to the followers to build an exact copy of the data (this method is used in PostgreSQL and Oracle).
Disadvantage: the data is described at a very low level (bytes and blocks). Replication is closely coupled to the storage engine, so it's not possible to run different versions of the database software on the leader and the followers. This can create downtime when upgrading the storage engine.

**Logical (row-based) log replication**
An alternative is to use different log formats for replication and for the storage engine. This is a `logical log`: a sequence of records describing writes to database tables at the granularity of a row:
- For an inserted row, the log contains the new values of all columns
- For a deleted row, the log contains enough information to uniquely identify the row that was deleted
- For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns

MySQL's binlog (when configured to use row-based replication) uses this approach. A logical log is decoupled from the storage engine, so it's possible to run different versions on the leader and followers. It is also easier to use for `change data capture`.

**Trigger-based replication**
Replication at the application layer. Database triggers run custom code to replicate data to an external system. This has greater overhead and is more error-prone, but it's more flexible for replicating to external systems.
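A sketch of what row-based (logical) log records might look like, reduced to Python dicts (the record layout is illustrative, not any particular database's format):

```python
# A logical log describes writes at row granularity, decoupled from
# the storage engine's on-disk layout (illustrative format).
logical_log = [
    {"op": "insert", "table": "users", "row": {"id": 7, "name": "Ada"}},
    # update: enough info to identify the row, plus the new column values
    {"op": "update", "table": "users", "key": {"id": 7},
     "new": {"name": "Ada Lovelace"}},
    # delete: enough info to uniquely identify the row
    {"op": "delete", "table": "users", "key": {"id": 7}},
]

def apply(store: dict, record: dict):
    """Apply one log record to a toy table keyed by id."""
    if record["op"] == "insert":
        store[record["row"]["id"]] = dict(record["row"])
    elif record["op"] == "update":
        store[record["key"]["id"]].update(record["new"])
    elif record["op"] == "delete":
        del store[record["key"]["id"]]

users = {}
for rec in logical_log:
    apply(users, rec)
print(users)  # {} -- the row was inserted, updated, then deleted
```

Because a consumer only needs to understand this row-level format, the same stream can serve followers running a different storage-engine version, or external systems (change data capture).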
## Problems with Replication Lag

With asynchronous replication, reads from different replicas may return different results because of lag; this is `eventual consistency`.

### Reading Your Own Writes

![image](https://hackmd.io/_uploads/r1OWyvUWyl.png)

`read-after-write consistency` guarantees that the user who made a write sees it on subsequent reads (no promises about other users). Some possible techniques:
- When reading something the user may have modified themselves, read from the leader; otherwise read from a follower. This requires knowing what might have been modified, e.g., always read a user's own profile from the leader and other users' profiles from followers.
- If most things are potentially editable, track the time of the last update and, for some period after it, serve reads from the leader. You could also monitor replication lag on followers and stop querying any follower that is too far behind.
- The client can remember the timestamp of its most recent write; a replica must be at least that up to date to serve the read.
- If replicas are distributed across datacenters (geographically), any request that must be served by the leader has to be routed to the datacenter that contains the leader.

For `cross-device` read-after-write consistency, some additional issues need to be considered:
- Remembering the last-update timestamp on the client is difficult across devices; this metadata needs to be centralized.
- With multiple datacenters, all of a user's devices need to be routed to the same datacenter.

### Monotonic Reads

![image](https://hackmd.io/_uploads/Bk2Sbw8Wyl.png)

`Monotonic reads` prevents this anomaly. It's weaker than strong consistency but stronger than eventual consistency: it guarantees that a user does not see time move backward when making several reads in sequence.
A possible solution: make each user stick to one replica, for example by using a hash of the user ID for replica mapping. If the replica fails, the user's queries need to be rerouted.

### Consistent Prefix Reads

![image](https://hackmd.io/_uploads/S1CvGD8Z1l.png)

`consistent prefix reads` prevents this kind of anomaly. It guarantees that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order.
This problem often happens in sharded databases, because partitions operate independently. One solution: writes that are causally related to each other are written to the same partition, but in some applications this cannot be done efficiently.

### Solutions for Replication Lag

Single-node transactions have existed for a long time, but in distributed databases they are expensive in terms of performance and scalability. Eventual consistency is inevitable in a scalable system (questionable; revisit this statement in Chapter 9).

## Multi-Leader Replication

In single-leader replication, if the leader fails, writes fail.
`multi-leader` (`master–master`, `active/active`): each leader also acts as a follower to the other leaders, forwarding its changes to all other nodes (including the other leaders).

### Use Cases for Multi-Leader Replication

**Multi-datacenter operation**

![image](https://hackmd.io/_uploads/BklzMBSPWyg.png)

Comparison with single-leader:
- *Performance*: in single-leader, every write incurs latency traveling over the internet to the datacenter containing the leader. In multi-leader, writes go to the local datacenter and are replicated asynchronously, hiding that latency from users.
- *Tolerance of datacenter outages*: each datacenter can operate independently, without the failover required in a single-leader setup.
- *Tolerance of network problems*: a temporary network interruption does not prevent writes from being processed (unlike single-leader, where writes must travel over the internet to the leader).
Multi-leader replication is often implemented by external tools: Tungsten Replicator for MySQL, BDR for PostgreSQL, GoldenGate for Oracle.

*Big downside:* the same data may be modified concurrently in two datacenters, so write conflicts need to be resolved.

Multi-leader replication is tricky and should be avoided if possible.

**Clients with offline operation**
The application needs to keep working while it's disconnected from the internet (a calendar app, for example). Changes are synced when the device comes back online. Each device has a local database that acts as a leader (accepting writes), with asynchronous multi-leader replication between the leaders on all devices. CouchDB is designed for this mode of operation.

**Collaborative editing**
When one user edits a document, the changes are instantly applied to their local replica (the state of the document in their web browser or client application) and asynchronously replicated to the server and any other users who are editing the same document.

### Handling Write Conflicts

This is the biggest problem with multi-leader replication.

![image](https://hackmd.io/_uploads/rkxwsHP-Je.png)

**Synchronous versus asynchronous conflict detection**
Synchronous conflict detection would prevent this, but then you lose the main advantage of multi-leader replication; you might as well just use single-leader replication.

**Conflict avoidance**
Avoiding conflicts is a frequently recommended approach (all writes for a given record go through the same leader). Sometimes you need to change the leader for a record, for example when that leader fails or a user moves to a different region.

**Converging toward a consistent state**
The database must resolve conflicts in a convergent way: all replicas must arrive at the same value once all changes have been replicated. Some ways:
- Give each write a unique ID (e.g., a timestamp, a long random number, a UUID, or a hash of the key and value) and pick the write with the highest ID as the winner. When a timestamp is used, this is `last write wins` (LWW); it's popular but prone to data loss.
- Give each replica a unique ID, and let writes that originated at a higher-numbered replica always take precedence over writes that originated at a lower-numbered replica. This approach also implies data loss.
- Somehow merge the values, e.g., order them alphabetically and concatenate them.
- Record the conflict in an explicit data structure that preserves all information, and write application code that resolves the conflict at some later time (perhaps by prompting the user).

**Custom conflict resolution logic**
Conflict resolution code may be executed on write or on read:
- *On write*: the database calls the conflict handler whenever it detects a conflict; the handler typically runs in the background.
- *On read*: all conflicting writes are stored. The application prompts the user to resolve the conflict and writes the result back. CouchDB works this way.

Conflict resolution applies at the level of an individual row or document, not a whole transaction, so the writes within a transaction are still considered separately (see the LWW sketch below).
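A sketch of last-write-wins resolution, the timestamp-based rule described above (toy code; Cassandra applies a similar timestamp rule):

```python
import time

def lww_merge(versions):
    """Pick the value with the highest timestamp; all others are
    silently discarded -- this is why LWW can lose data."""
    return max(versions, key=lambda v: v["ts"])["value"]

# Two leaders accepted conflicting writes for the same key concurrently.
conflicting = [
    {"value": "B", "ts": time.time()},         # written at datacenter 1
    {"value": "C", "ts": time.time() + 0.001}, # written at datacenter 2
]
print(lww_merge(conflicting))  # "C" wins; the write of "B" is lost
```

If losing data is unacceptable, record the conflicting values explicitly and merge them in application code instead.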
**Automatic conflict resolution**
A few lines of research:
- Conflict-free replicated datatypes (CRDTs): a family of data structures that can be concurrently edited by multiple users and automatically resolve conflicts
- Mergeable persistent data structures: track history explicitly (similar to the Git version control system) and use a three-way merge function
- Operational transformation: the algorithm behind collaborative editing applications (Etherpad or Google Docs)

It's likely that these will be integrated into more replicated data systems in the future, making multi-leader data synchronization simpler.

### Multi-Leader Replication Topologies

`replication topology`: the communication paths along which writes are propagated from one node to another. With more than two leaders, many topologies are possible:

![image](https://hackmd.io/_uploads/SyveQIDbye.png)

The most general is `all-to-all`. MySQL's default is `circular`.

In circular and star topologies, a write may need to pass through several nodes before it reaches all replicas. In the replication log, each write is tagged with the identifiers of the nodes it has passed through; a node ignores changes tagged with its own identifier, avoiding infinite loops.

In circular and star topologies, one failed node can interrupt replication between other nodes. All-to-all avoids that single point of failure, but it has issues too:

![image](https://hackmd.io/_uploads/S1h27IPWJl.png)

To order such events correctly, `version vectors` can be used. However, conflict detection techniques are poorly implemented in many multi-leader replication systems.

## Leaderless Replication

Popularized by Amazon's in-house Dynamo system, so this kind of database is also known as `Dynamo-style`. Cassandra also uses this approach. Any replica can directly accept writes.

In some implementations, the client directly sends its writes to several replicas; in others, a coordinator node does this on behalf of the client. Unlike a leader, the coordinator does not enforce a particular ordering of writes.

### Writing to the Database When a Node Is Down

Failover does not exist.

![image](https://hackmd.io/_uploads/HkYo82uZkg.png)

Read requests are sent to *several nodes in parallel*. Version numbers are used to determine which value is newer.

**Read repair and anti-entropy**
How a node catches up after it comes back online:
- `Read repair`: the client writes the newer value back to the stale replica
- `Anti-entropy process`: a background process that checks for and copies missing data; unlike a replication log, this process does not guarantee write order, and there may be a significant delay

Not all systems use both. Without an anti-entropy process, rarely read data may stay stale, reducing durability.

**Quorums for reading and writing**
With `n` replicas: every write must be confirmed by `w` nodes to be considered successful, and at least `r` nodes are queried for each read.
As long as `w + r > n`, we expect to get an up-to-date value when reading; these are quorum reads and writes.
A common choice for Dynamo-style databases: make `n` odd (3 or 5) and set `w = r = (n + 1) / 2` (rounded up); this is configurable.

The quorum condition `w + r > n` allows the system to tolerate unavailable nodes:
- If `w < n`, we can still process writes when a node is unavailable
- If `r < n`, we can still process reads when a node is unavailable
- With `n = 3, w = 2, r = 2`: tolerate 1 unavailable node
- With `n = 5, w = 3, r = 3`: tolerate 2 unavailable nodes
- Normally, reads and writes are sent to all `n` replicas in parallel; `w` and `r` determine how many nodes we wait for before considering the operation successful. If fewer nodes respond, an error is returned.

![image](https://hackmd.io/_uploads/HJW492d-yx.png)

### Limitations of Quorum Consistency

Often, `r` and `w` are chosen to be a majority (more than n/2) of nodes, because that tolerates up to n/2 node failures. But quorums are not necessarily majorities; all that matters is that the sets of nodes used by a read and a write overlap in at least one node. With smaller `w` and `r` you get lower latency and higher availability, but you are more likely to read stale values.
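As a sanity check on the arithmetic of the quorum condition, a small brute-force verification with toy values of n, w, and r: any write set of size w and read set of size r with w + r > n must share at least one node, so the read sees at least one up-to-date copy.

```python
from itertools import combinations

n, w, r = 5, 3, 3  # w + r = 6 > n, so every read/write pair overlaps
nodes = set(range(n))

overlaps = all(
    set(write_set) & set(read_set)
    for write_set in combinations(nodes, w)
    for read_set in combinations(nodes, r)
)
print(overlaps)  # True: at least one queried node has the latest write

# With w = r = 2 (w + r = 4 <= n), disjoint sets exist, so reads can be stale:
print({0, 1} & {2, 3})  # empty overlap -> a read may miss the write entirely
```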
However, even when the quorum condition is satisfied, stale reads can still occur:
- If a sloppy quorum is used, the `w` nodes that accepted a write may be different from the `r` nodes serving a read, so overlap is no longer guaranteed.
- If two writes occur concurrently, they need to be merged; if a winner is picked by timestamp (LWW), writes can be lost due to clock skew.
- If a write happens concurrently with a read, the write may be reflected on only some replicas; it's undetermined whether the read returns the old or the new value.
- If a write succeeded on fewer than `w` replicas, it is not rolled back on the replicas where it did succeed, so later reads may or may not return that value even though the write was reported as failed.
- If a node carrying a new value fails and is restored from a replica carrying an old value, the number of replicas storing the new value may fall below `w`, breaking the quorum condition.
- Even if everything is working correctly, there are edge cases with unlucky timing.

Quorums let you adjust the probability of reading stale values, but they are not an absolute guarantee.

**Monitoring staleness**
In leader-based replication, we can monitor replication lag from exposed metrics; this is possible because writes are applied in order. It's more difficult in leaderless replication because there is no guaranteed write order. Moreover, if the database only uses read repair (no anti-entropy), a value may stay stale for a long time.

### Sloppy Quorums and Hinted Handoff

Quorums can tolerate failed and slow nodes. Leaderless replication is good for use cases that require high availability and low latency, and that can tolerate occasional stale reads.

In a large cluster (with significantly more than `n` nodes), during a network interruption a client may be able to connect to some database nodes, but not to the nodes it needs to assemble a quorum. Trade-off:
- Is it better to return errors for all requests that cannot reach a quorum?
- Or should we accept writes anyway, writing them to nodes that are reachable but aren't among the `n` nodes on which the value usually lives?

The latter is a `sloppy quorum`: writes and reads still require `w` and `r` successful responses, but those may include nodes that are not among the designated `n` "home" nodes for the value.
`hinted handoff`: once the network interruption is fixed, any writes a node temporarily accepted on behalf of another node are sent to the appropriate "home" nodes.
Trade-off: even with `w + r > n`, you cannot be sure to read the latest value, because it may have been written to nodes outside of `n`. Sloppy quorums increase write availability. In Cassandra, they are disabled by default.

**Multi-datacenter operation**
In Cassandra, `n` includes nodes in all datacenters. A write from a client is sent to all replicas, but the client only waits for acknowledgment from a quorum of nodes in its local datacenter, for low latency. Writes to the other datacenters are asynchronous.

### Detecting Concurrent Writes

Concurrent writes to the same key may arrive in a different order at different nodes:

![image](https://hackmd.io/_uploads/rJLpM-tbyg.png)

How do we achieve eventual consistency?

**Last write wins (discarding concurrent writes)**
For example, attach a timestamp to each write. Cassandra supports this conflict resolution method. It may lose data; if losing data is not acceptable, LWW is a bad choice.

**The "happens-before" relationship and concurrency**
An operation A `happens before` another operation B if B knows about A, or depends on A, or builds upon A in some way. Two operations are `concurrent` if neither happens before the other (neither knows about the other). For defining concurrency, exact timing doesn't matter.

**Capturing the happens-before relationship**

![image](https://hackmd.io/_uploads/rylwLZYWye.png)

The algorithm (see the sketch below):
- The server maintains a version number for every key, increments the version every time that key is written, and stores the new version along with the value written.
- When a client reads a key, the server returns all values that have not been overwritten, along with the latest version number. A client must read a key before writing.
- When a client writes a key, it must include the version number from the prior read, and it must merge together all values that it received in that prior read. (The response to a write request can be like a read, returning all current values, which allows us to chain several writes.)
- When the server receives a write with a particular version number, it can overwrite all values with that version number or below (since it knows they have been merged into the new value), but it must keep all values with a higher version number (because those values are concurrent with the incoming write).
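A single-replica sketch of this version-number algorithm (toy shopping-cart values; method names invented):

```python
class VersionedStore:
    """Single-replica sketch of the version-number algorithm above."""
    def __init__(self):
        self.version = 0
        self.siblings = {}  # version -> value, values not yet overwritten

    def read(self):
        # Return all values that have not been overwritten, plus the
        # latest version number (which the client must echo on write).
        return dict(self.siblings), self.version

    def write(self, value, based_on_version):
        # Values at or below the client's version were merged by the
        # client, so they can be overwritten; higher ones are concurrent.
        self.version += 1
        self.siblings = {v: val for v, val in self.siblings.items()
                         if v > based_on_version}
        self.siblings[self.version] = value
        return self.read()

store = VersionedStore()
store.write(["milk"], based_on_version=0)      # client 1, no prior read
store.write(["eggs"], based_on_version=0)      # client 2, concurrent write
values, version = store.read()
print(values)  # {1: ['milk'], 2: ['eggs']} -- two concurrent siblings
merged = sorted({item for v in values.values() for item in v})
store.write(merged, based_on_version=version)  # client merges the siblings
print(store.read())  # ({3: ['eggs', 'milk']}, 3)
```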
**Merging concurrently written values**
The algorithm ensures that no data is silently dropped, but clients have to do the extra work of cleaning up afterward by merging the concurrently written values. It's essentially the same problem as conflict resolution in multi-leader replication. Merging can happen in application code, but it's complex and error-prone. Some data structures (like the CRDTs discussed above) support automatic conflict resolution.

**Version vectors**
The algorithm above assumed a single replica. With multiple replicas, we need a version number `per replica` as well as per key. The `version vector` is the collection of version numbers from all replicas. It is sent from the database replicas to the client when values are read, and sent back to the database when a value is written. Version vectors allow the database to distinguish between overwrites and concurrent writes.

## Summary

Purposes of replication:
- High availability
- Disconnected operation
- Latency
- Scalability

Three main approaches to replication:
- Single-leader replication: writes go to a single node, which sends a stream of data change events to the other replicas. Reads can be performed on any replica (but reads from followers might be stale).
- Multi-leader replication: writes go to one of several leaders. The leaders send streams of data change events to each other and to any followers.
- Leaderless replication: writes go to several nodes, and reads query several nodes in parallel in order to detect and correct nodes with stale data.

Single-leader replication is popular: it's easy to understand and there is no conflict resolution to worry about. Multi-leader and leaderless replication cope better with faulty nodes, network interruptions, and latency spikes, but they provide weaker consistency guarantees.

Replication can be synchronous or asynchronous. Consistency models that are helpful for deciding how an application should behave under replication lag:
- Read-after-write consistency
- Monotonic reads
- Consistent prefix reads

# Chapter 6: Partitioning

Partitions are defined in such a way that each piece of data (each record, row, or document) belongs to exactly one partition. Each partition is a small database of its own. Partitioning is done to increase `scalability`.

## Partitioning and Replication

Everything about replication of databases applies equally to replication of partitions.

![image](https://hackmd.io/_uploads/rJeuNr5Zye.png)

## Partitioning of Key-Value Data

The goal of partitioning is to spread the data and query load evenly across nodes.
`skewed`: some partitions have more data or queries than others. `hot spot`: a partition with disproportionately high load.
The simplest approach to avoiding hot spots is to assign records to nodes randomly. Disadvantage: when reading a particular item, there is no way of knowing which node it's on, so all nodes have to be queried in parallel.

### Partitioning by Key Range

Assign a continuous range of keys (from some minimum to some maximum) to each partition. If you know the boundaries between the ranges and which node holds which partition, a request can go directly to the appropriate node. The partition boundaries might be chosen manually by administrators, or automatically by the database (a sketch follows).

![image](https://hackmd.io/_uploads/By34vSc-kl.png)
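A minimal sketch of key-range partitioning (the boundaries are invented for illustration), using binary search over sorted boundary keys to route a key to its partition:

```python
import bisect

# Sorted lower-boundary keys; partition i covers keys from
# boundaries[i] (inclusive) up to boundaries[i+1] (exclusive).
boundaries = ["a", "f", "m", "t"]  # chosen by hand for the example
partition_nodes = ["node0", "node1", "node2", "node3"]

def node_for_key(key: str) -> str:
    index = bisect.bisect_right(boundaries, key) - 1
    return partition_nodes[max(index, 0)]

print(node_for_key("barbara"))  # node0 (range "a".."f")
print(node_for_key("sam"))      # node2 (range "m".."t")
print(node_for_key("zoe"))      # node3 (range "t"..)
```

Because keys within each partition stay sorted, a range scan only has to touch the partitions whose ranges intersect the query.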
This partitioning strategy is used by Bigtable, HBase, RethinkDB, and MongoDB.
Within each partition, keys can be kept in sorted order (as with SSTables and LSM-trees). This makes range scans easy (e.g., scanning for all events within a time window when the key is a timestamp).
Downside: certain access patterns can lead to hot spots. For example, with timestamp keys, all of today's writes go to the same partition.
To avoid this problem, you can prefix the timestamp with something else (such as a sensor or object name), at the cost of having to issue a separate range scan per prefix.

### Partitioning by Hash of Key

The hash function need not be cryptographically strong: Cassandra and MongoDB use MD5. Each partition covers a range of hashes.

![image](https://hackmd.io/_uploads/HJkmUGjZ1e.png)

This technique is good at distributing keys fairly among partitions. Downside: we lose the ability to perform efficient range queries.
Cassandra achieves a compromise between the two strategies using a `compound primary key`: the first part of the key is hashed to determine the partition, and the remaining columns are used for sorting.

### Skewed Workloads and Relieving Hot Spots

When many reads and writes target the same key, hot spots can still happen. A simple technique from the application's point of view: add a random number to the beginning or end of the hot key. This splits the writes evenly across several keys, on different partitions (reads then need to fetch and combine data from all those keys).

## Partitioning and Secondary Indexes

The partitioning schemes above rely on a key-value data model. Things are more complicated when secondary indexes are involved: secondary indexes don't map neatly to partitions. Two main approaches to partitioning a database with secondary indexes: document-based partitioning and term-based partitioning.

### Partitioning Secondary Indexes by Document

![image](https://hackmd.io/_uploads/rJChhzoZJg.png)

Each partition maintains its own secondary indexes, covering only the documents in that partition. This is also called a `local index`.
`scatter/gather`: query all partitions and combine the results. This makes reads on the secondary index quite expensive, because it's usually not possible to serve such queries from a single partition.

### Partitioning Secondary Indexes by Term

`global index`: covers data in all partitions. However, we can't just store that index on one node (it would become a bottleneck); it must itself be partitioned, though it can be partitioned differently from the primary key index.

![image](https://hackmd.io/_uploads/rkvC58jbke.png)

![image](https://hackmd.io/_uploads/S1h098sZ1e.png)

A global (term-partitioned) index can be partitioned by the term itself or by a hash of the term; partitioning by the term itself is useful for range scans. This makes reads more efficient: a client only needs to make a request to the partition that contains the term. Downside: writes are slower and more complicated, because a write to a single document may affect multiple partitions of the index.
In practice, updates to global secondary indexes are often asynchronous.

## Rebalancing Partitions

`rebalancing`: moving load from one node in the cluster to another.
Minimum requirements:
- After rebalancing, the load (data storage, read/write requests) should be shared fairly
- While rebalancing is happening, the database should continue accepting reads and writes
- No more data than necessary should be moved between nodes

### Strategies for Rebalancing

Different ways to assign partitions to nodes:

**How not to do it: hash mod N**
If the number of nodes changes, most of the keys will need to be moved (see the demonstration below). We need an approach that doesn't move data around more than necessary.

**Fixed number of partitions**
Simple solution: create many more partitions than there are nodes, and assign several partitions to each node. Only entire partitions are moved between nodes. The number of partitions does not change, nor does the assignment of keys to partitions.
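A small demonstration of both points, with toy numbers: with `hash mod N`, adding one node reassigns most keys, while with a fixed pool of partitions only whole partitions move and the key-to-partition mapping is stable.

```python
keys = [f"user{i}" for i in range(10_000)]
h = hash  # stand-in for a real hash function (randomized per run in Python)

# hash mod N: adding a node changes the placement of most keys (~80% here).
moved = sum(h(k) % 4 != h(k) % 5 for k in keys)
print(f"mod-N: {moved / len(keys):.0%} of keys move when going 4 -> 5 nodes")

# Fixed number of partitions: keys map to 1024 partitions forever;
# only the partition -> node table changes during rebalancing.
NUM_PARTITIONS = 1024
partition_of = {k: h(k) % NUM_PARTITIONS for k in keys}
assignment = {p: p % 4 for p in range(NUM_PARTITIONS)}           # 4 nodes
assignment.update({p: 4 for p in range(0, NUM_PARTITIONS, 5)})   # new node takes some
# partition_of is untouched: no key changed partition, only partitions moved.
```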
**Fixed number of partitions**
A simple solution: create many more partitions than there are nodes, and assign several partitions to each node. Only entire partitions are moved between nodes. The number of partitions does not change, nor does the assignment of keys to partitions.
![image](https://hackmd.io/_uploads/Hk_cXDoW1e.png)
This approach is used in Riak, Elasticsearch, Couchbase, and Voldemort.
The number of partitions is the maximum number of nodes you can have, so it should be high enough to accommodate future growth, but not so high that the management overhead becomes a burden.

**Dynamic partitioning**
For key-range-partitioned databases, a fixed number of partitions is inconvenient: if the boundaries are chosen badly, all of the data could end up in a single partition.
When a partition grows to exceed a configured size, it is split into two partitions so that approximately half of the data ends up on each side. Conversely, if lots of data is deleted and a partition shrinks below some threshold, it can be merged with an adjacent partition.
Caveat: an empty database starts with a single partition on one node, while the other nodes sit idle. HBase and MongoDB allow an initial set of partitions to be configured on an empty database (`pre-splitting`).
Dynamic partitioning is not only suitable for key-range-partitioned data, but can equally well be used with hash-partitioned data. MongoDB since version 2.4 supports both key-range and hash partitioning, and it splits partitions dynamically in either case.

**Partitioning proportionally to nodes**
With both fixed and dynamic partitioning, the number of partitions is independent of the number of nodes.
Another option, used by Cassandra and Ketama, is to make the number of partitions proportional to the number of nodes, i.e., a fixed number of partitions per node. When a node joins the cluster, it randomly chooses a fixed number of existing partitions to split, and takes ownership of one half of each of those partitions.

### Operations: Automatic or Manual Rebalancing
Does the rebalancing happen automatically or manually? There is a gradient between fully automatic and fully manual rebalancing. For example, Couchbase, Riak, and Voldemort generate a suggested partition assignment automatically, but require an administrator to commit it before it takes effect.

## Request Routing
This is an instance of the `service discovery` problem.
Some approaches:
- Allow clients to contact any node (e.g., via round-robin): if that node owns the partition, it serves the request; otherwise, it forwards the request.
- Send all requests to a routing tier first, which determines the node that should handle each request and forwards it accordingly. This routing tier does not itself handle any requests; it only acts as a partition-aware load balancer.
- Require that clients be aware of the partitioning and the assignment of partitions to nodes. In this case, a client can connect directly to the appropriate node.

In all cases, the key problem is: how does the component making the routing decision learn about changes in the assignment of partitions to nodes?
![image](https://hackmd.io/_uploads/rksPE4Jf1x.png)
Many distributed data systems rely on a separate coordination service such as ZooKeeper to keep track of this cluster metadata.
![image](https://hackmd.io/_uploads/ByFRuVkMkg.png)
LinkedIn's Espresso uses Helix (which relies on ZooKeeper). HBase, SolrCloud, and Kafka also use ZooKeeper to track partition assignment. MongoDB has a similar architecture, but it relies on its own config server implementation and mongos daemons as the routing tier.
Cassandra and Riak take a different approach: they use a gossip protocol among the nodes to disseminate changes in cluster state. Requests can be sent to any node, and that node forwards them to the appropriate node for the requested partition. A toy routing sketch is given below.
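A toy sketch of the routing-tier idea (all names are hypothetical; a real routing tier would watch a coordination service such as ZooKeeper for changes, rather than holding a static map):

```python
from dataclasses import dataclass

@dataclass
class ClusterMetadata:
    """Partition assignment, e.g. as mirrored from a coordination service."""
    num_partitions: int
    partition_to_node: dict[int, str]  # partition index -> node address

def route(meta: ClusterMetadata, key_hash: int) -> str:
    """Pick the node responsible for a key (hash-partitioned, fixed count)."""
    partition = key_hash % meta.num_partitions
    return meta.partition_to_node[partition]

meta = ClusterMetadata(
    num_partitions=4,
    partition_to_node={0: "node-a", 1: "node-b", 2: "node-a", 3: "node-c"},
)
print(route(meta, key_hash=11))  # partition 3 -> node-c
```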
### Parallel Query Execution
`massively parallel processing (MPP)` relational database products are often used for analytics. A typical data warehouse query contains several join, filtering, grouping, and aggregation operations. The MPP query optimizer breaks this complex query into a number of execution stages and partitions, many of which can be executed in parallel on different nodes of the database cluster. Queries that involve scanning over large parts of the dataset particularly benefit from such parallel execution.

## Summary
The goal of partitioning is to spread the data and query load evenly across multiple machines, avoiding hot spots (nodes with disproportionately high load).
We discussed two main approaches to partitioning:
- `Key range partitioning`, where keys are sorted and a partition owns all the keys from some minimum up to some maximum. Efficient range queries, but risk of hot spots. In this approach, partitions are typically rebalanced dynamically by splitting the range into two subranges when a partition gets too big.
- `Hash partitioning`, where a hash function is applied to each key and a partition owns a range of hashes. Range queries are inefficient. When partitioning by hash, it is common to create a fixed number of partitions in advance, to assign several partitions to each node, and to move entire partitions from one node to another when nodes are added or removed. Dynamic partitioning can also be used.

A secondary index also needs to be partitioned, and there are two methods:
- `Document-partitioned indexes (local indexes)`, where the secondary indexes are stored in the same partition as the primary key and value. Only a single partition needs to be updated on write, but a read of the secondary index requires a scatter/gather across all partitions.
- `Term-partitioned indexes (global indexes)`, where the secondary indexes are partitioned separately, using the indexed values. An entry in the secondary index may include records from all partitions of the primary key. When a document is written, several partitions of the secondary index need to be updated; however, a read can be served from a single partition.

# Chapter 7: Transactions
A transaction is a way for an application to group several reads and writes together into a logical unit.

## The Slippery Concept of a Transaction
Transactions in relational databases remain similar to the original transactions of the 1970s. Transactions in many non-relational databases are a stripped-down version, with a weaker set of guarantees.

### The Meaning of ACID
The safety guarantees provided by transactions are described by ACID: `Atomicity, Consistency, Isolation, and Durability`. The detailed implementation differs from database to database.
Systems that do not meet the ACID criteria are sometimes called BASE (Basically Available, Soft state, and Eventual consistency).

**Atomicity**
In general, atomicity means something that cannot be broken down into smaller parts.
ACID atomicity describes what happens if a client makes several writes, but a fault occurs after only some of them have been processed: the transaction is `aborted`, and its writes are discarded or undone. It might have been better called `abortability` (see the sketch below).
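A minimal sketch of abortability using SQLite (the table and values are made up): if any write in the transaction fails, rolling back leaves the database as if nothing had happened.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('alice', 100), ('bob', 0)")
conn.commit()

try:
    # Two writes that must succeed or fail together.
    conn.execute("UPDATE accounts SET balance = balance - 50 WHERE name = 'alice'")
    raise RuntimeError("simulated crash mid-transaction")
    conn.execute("UPDATE accounts SET balance = balance + 50 WHERE name = 'bob'")
    conn.commit()
except RuntimeError:
    conn.rollback()  # abort: the first write is undone as well

# Both balances are unchanged: all or nothing.
print(conn.execute("SELECT * FROM accounts ORDER BY name").fetchall())
# [('alice', 100), ('bob', 0)]
```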
**Consistency**
In ACID, consistency refers to an application-specific notion of the database being in a `good state`.
ACID consistency means that you have certain statements about your data (`invariants`) that must always be true. It is largely the application's responsibility to define those invariants and preserve them. Thus, the letter C doesn't really belong in ACID.

**Isolation**
Concurrently executing transactions are isolated from each other. The database ensures that when the transactions have committed, the result is the same as if they had run serially (one after another), even though in reality they may have run concurrently.
![image](https://hackmd.io/_uploads/r1c-UP3fye.png)

**Durability**
Once a transaction has committed, the data it has written will not be forgotten, even if there is a hardware fault or the database crashes.
In a single-node database: writes are persisted to the hard drive or SSD. In a replicated database: the data has been copied to some number of nodes.

### Single-Object and Multi-Object Operations
Multi-object transactions require some way of determining which read and write operations belong to the same transaction. In relational databases, this is typically done based on the client's TCP connection: everything between `BEGIN TRANSACTION` and `COMMIT` is part of the same transaction.
Many non-relational databases don't group operations together; failures may leave the database in a partially updated state.

**Single-object writes**
Atomicity and isolation also apply when a single object is being changed.
Atomicity can be implemented using a log for crash recovery (similar to the WAL of a B-tree), and isolation can be implemented using a lock on each object.
Similarly popular is a `compare-and-set` operation, which allows a write to happen only if the value has not been concurrently changed by someone else.
These single-object operations are useful; however, they are not transactions in the usual sense of the word.

**The need for multi-object transactions**
Many distributed datastores have abandoned multi-object transactions because of the difficulty of implementing them, and because of high-availability and performance concerns.
Sometimes single-object writes are sufficient, but in many cases writes to several different objects need to be coordinated:
- In the relational data model, ensuring that foreign-key references remain valid.
- In the document data model, updating several documents to keep denormalized information in sync.
- Keeping secondary indexes up to date.

Such applications can still be implemented without transactions. However, error handling becomes much more complicated without atomicity, and the lack of isolation can cause concurrency problems.

**Handling errors and aborts**
The ACID database philosophy: if the database is in danger of violating its guarantees, it would rather abandon the transaction entirely. Leaderless replication databases, by contrast, work in a `best effort` style: they won't undo what has already been done, so it's the application's responsibility to recover from errors.
Retrying an aborted transaction is a simple and effective error-handling mechanism (a retry-wrapper sketch follows the list below), but it isn't perfect:
- The transaction may actually have succeeded, but the network failed while the server tried to acknowledge it -> retrying performs it twice.
- If the error is due to overload, retrying makes the problem worse; use exponential backoff.
- Only transient errors are worth retrying.
- If the transaction has side effects outside of the database, those side effects may happen even if the transaction is aborted (sending emails, for example).
- If the client process fails while retrying, any data it was trying to write to the database is lost.
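A sketch of such a retry wrapper under the caveats above; `TransientError` is a hypothetical stand-in for whatever retryable error your driver raises (e.g., a deadlock or serialization abort):

```python
import random
import time

class TransientError(Exception):
    """Stand-in for a retryable error reported by the database driver."""

def run_with_retries(transaction_fn, max_attempts=5):
    """Retry an aborted transaction with exponential backoff and jitter.

    Only TransientError is retried; anything else propagates immediately.
    Note this does NOT solve the acknowledged-but-lost-response case: if
    the commit succeeded and only the reply was lost, retrying applies it
    twice unless the transaction itself is idempotent.
    """
    for attempt in range(max_attempts):
        try:
            return transaction_fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff: 0.1s, 0.2s, 0.4s, ... plus random jitter.
            time.sleep(0.1 * 2 ** attempt + random.uniform(0, 0.05))
```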
## Weak Isolation Levels
In theory, isolation lets you pretend that no concurrency is happening. `serializable` isolation: the database guarantees that transactions have the same effect as if they had run serially (one at a time, without concurrency).
In practice, isolation is not that simple: serializable isolation has a performance cost, so it's common for systems to use weaker levels of isolation.

### Read Committed
The most basic level of isolation. It makes two guarantees:
- When reading from the database, you will only see data that has been committed (no dirty reads).
- When writing to the database, you will only overwrite data that has been committed (no dirty writes).

**No dirty reads**
![image](https://hackmd.io/_uploads/SJz19d3zye.png)

**No dirty writes**
`dirty write`: the earlier write is part of a transaction that has not yet committed, and the later write overwrites the uncommitted value.
![image](https://hackmd.io/_uploads/BkAYcunf1l.png)

**Implementing read committed**
A popular isolation level; it's the default setting in Oracle 11g, PostgreSQL, SQL Server 2012, MemSQL, and others.
Most commonly, databases prevent dirty writes by using row-level locks.
Dirty reads could be prevented with the same lock, ensuring that a read cannot happen while an object has a dirty, uncommitted value. But this would hurt performance: a long-held write lock would block reads. Instead, most databases prevent dirty reads by remembering both the old committed value and the new value set by the transaction that currently holds the write lock.

## Snapshot Isolation and Repeatable Read
A problem can still happen with `read committed`:
![image](https://hackmd.io/_uploads/SJMkpu2z1e.png)
This is a `nonrepeatable read`, or `read skew`.
In Alice's case this is acceptable, but in some other cases (`backups`, `analytic queries and integrity checks`) it can cause real problems.
`snapshot isolation` is the most common solution: each transaction reads from a `consistent snapshot` of the database. Even if the data is subsequently changed by another transaction, each transaction sees only the old data from that particular point in time.
Snapshot isolation is a boon for long-running, read-only queries such as backups and analytics. It is a popular feature, supported by PostgreSQL, MySQL with the InnoDB storage engine, Oracle, SQL Server, and others.

**Implementing snapshot isolation**
Like read committed, it uses write locks to prevent dirty writes. However, reads do not acquire any locks: reads and writes don't block each other.
The database keeps several different committed versions of an object; this is `multi-version concurrency control (MVCC)`.
If a database only needed to provide `read committed`, keeping two versions of an object would be sufficient. However, storage engines that support snapshot isolation typically use MVCC for their `read committed` level as well. A typical approach is that read committed uses a separate snapshot for each query, while snapshot isolation uses the same snapshot for an entire transaction.
![image](https://hackmd.io/_uploads/ryGNZYnGJg.png)

**Visibility rules for observing a consistent snapshot**
- At the start of a transaction, the database makes a list of all the other transactions that are in progress (not yet committed or aborted) at that time. Any writes that those transactions have made are ignored, even if the transactions subsequently commit.
- Any writes made by aborted transactions are ignored.
- Any writes made by transactions with a later transaction ID are ignored.
- All other writes are visible to the application's queries.

A sketch of these rules is given below.
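An illustrative sketch of the rules above (the data structures are invented for the example, not any engine's actual implementation):

```python
def is_visible(version_txid: int,
               snapshot_txid: int,
               in_progress_at_start: set[int],
               aborted: set[int]) -> bool:
    """Can the transaction `snapshot_txid` see a version written by
    `version_txid`, given the snapshot it took when it began?"""
    if version_txid == snapshot_txid:
        return True   # a transaction always sees its own writes
    if version_txid in in_progress_at_start:
        return False  # writer was uncommitted when we started
    if version_txid in aborted:
        return False  # writer rolled back
    if version_txid > snapshot_txid:
        return False  # writer started after us
    return True       # committed before our snapshot: visible

# Transaction 12 began while 9 and 11 were still running; 7 later aborted.
assert is_visible(5, 12, in_progress_at_start={9, 11}, aborted={7})
assert not is_visible(9, 12, in_progress_at_start={9, 11}, aborted={7})
assert not is_visible(13, 12, in_progress_at_start={9, 11}, aborted={7})
```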
**Indexes and snapshot isolation**
`Traditional approach`: the index points to all versions of an object, and queries filter out the versions that aren't visible to the current transaction. When older versions are no longer needed, they are garbage-collected and their corresponding index entries are removed.
`Append-only/copy-on-write approach`: used by databases such as CouchDB, Datomic, and LMDB. Instead of modifying existing pages of the B-tree when updating data, they create new versions of the affected pages. Parent pages, up to the root of the tree, are also copied and updated with new references, while pages unaffected by the write remain untouched.

**Repeatable read and naming confusion**
Snapshot isolation is called `serializable` in Oracle, and `repeatable read` in PostgreSQL and MySQL.

### Preventing Lost Updates
The isolation levels so far have focused on what a read-only transaction can see in the presence of concurrent writes. There are several other interesting kinds of conflicts that can occur between concurrently writing transactions, the `lost update` problem being the best known.
A `read-modify-write cycle` can cause one of two concurrent modifications to be lost.
Some examples of lost updates:
- Updating an account balance, or incrementing a counter
- Two users editing a wiki page at the same time

**Atomic write operations**
These remove the need for read-modify-write cycles in application code:
`UPDATE counters SET value = value + 1 WHERE key = 'foo';`
MongoDB also supports atomically modifying parts of a document, and Redis provides atomic operations for modifying data structures. Not everything can be expressed as an atomic operation, but where it can, it's usually the best choice.
ORMs make it easy to accidentally write code that performs unsafe read-modify-write cycles instead.

**Explicit locking**
The application explicitly locks the objects that are going to be updated.
![image](https://hackmd.io/_uploads/SkPmbhTMkl.png)

**Automatically detecting lost updates**
Allow read-modify-write cycles to run in parallel; if the transaction manager detects a lost update, it aborts the offending transaction and forces it to retry the cycle. Databases can perform this check efficiently in conjunction with snapshot isolation.

**Compare-and-set**
Only allow an update to happen if the value has not changed since you last read it (sketched below).
![image](https://hackmd.io/_uploads/B1U4N36MJg.png)
This may not be safe if the database allows the `WHERE` clause to read from an old snapshot.
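A sketch of compare-and-set using a conditional `UPDATE` (SQLite here just to keep the demo self-contained; the wiki-page example is made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pages (id INTEGER PRIMARY KEY, content TEXT)")
conn.execute("INSERT INTO pages VALUES (1234, 'old content')")
conn.commit()

def cas_update(conn, page_id, expected, new):
    """Write only if the value hasn't changed since we read `expected`."""
    cur = conn.execute(
        "UPDATE pages SET content = ? WHERE id = ? AND content = ?",
        (new, page_id, expected),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows matched -> someone else changed it

assert cas_update(conn, 1234, "old content", "new content")        # succeeds
assert not cas_update(conn, 1234, "old content", "other content")  # lost the race
```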
**Conflict resolution and replication**
Locks and compare-and-set do not apply in multi-leader and leaderless replication, because concurrent writes on different replicas are expected. A common approach is to allow concurrent writes to create several conflicting versions of a value, and to use application code or special data structures to resolve and merge those versions after the fact.

### Write Skew and Phantoms
![image](https://hackmd.io/_uploads/H1smd3afkg.png)

**Characterizing write skew**
`write skew` is neither a `dirty write` nor a `lost update`, because the two transactions are updating two different objects.
There are various ways of preventing lost updates; with write skew, our options are more restricted:
- Atomic single-object operations don't help.
- Automatic detection of lost updates doesn't help either.
- Database constraints, triggers, or materialized views can help.
- Explicitly lock the rows that the transaction depends on, if a serializable isolation level is not available.

**More examples of write skew**
- Meeting room booking system
- Multiplayer game
- Claiming a username
- Preventing double-spending

**Phantoms causing write skew**
A write in one transaction that changes the result of a search query in another transaction is called a `phantom`. As in the four examples above, the query checks for the absence of a row (so there is nothing to lock), and then adds one.

**Materializing conflicts**
If the problem with phantoms is that there is no row to lock, we can introduce a lock object for it. This approach is called `materializing conflicts`. It should be a last resort, because it leaks a concurrency mechanism into the application's data model; serializable isolation is preferable.

## Serializability
Serializable isolation is the strongest isolation level. The result is the same as if the transactions had executed one at a time, serially.

### Actual Serial Execution
Remove concurrency entirely: execute every transaction in a serial order, on a single thread.
This has become feasible because:
- RAM is now cheap enough to keep the entire active dataset in memory.
- OLTP transactions are usually short and make only a small number of reads and writes.

This approach is implemented in VoltDB/H-Store, Redis, and Datomic. However, throughput is limited to that of a single CPU core.

**Encapsulating transactions in stored procedures**
Systems with single-threaded serial transaction processing don't allow interactive multi-statement transactions. Instead, the application must submit the entire transaction code to the database ahead of time, as a `stored procedure`.
![image](https://hackmd.io/_uploads/SJmxjpafkx.png)

**Pros and cons of stored procedures**
Cons:
- Ugly syntax, lack of libraries
- Difficult to manage: version control, deployment, testing, integration
- Badly written code (using too much CPU or RAM) causes more trouble inside the database than in an application server

These issues have largely been overcome: some modern databases use general-purpose languages (e.g., Java, Groovy) for writing stored procedures.
With stored procedures and in-memory data, executing all transactions on a single thread becomes feasible. As they don't need to wait for I/O and avoid the overhead of other concurrency control mechanisms, they can achieve quite good throughput on a single thread.

**Partitioning**
For applications with high write throughput, the single-threaded transaction processor can become a serious bottleneck, because it is bound to a single CPU core.
To scale to multiple CPU cores, the data can be partitioned. But cross-partition transactions are much slower. Partitionability also depends on the data structures: key-value data can often be partitioned easily, whereas data with multiple secondary indexes would require a lot of cross-partition coordination.

**Summary of serial execution**
Serial execution works within certain constraints:
- Every transaction must be small and fast, because it takes only one slow transaction to stall all transaction processing.
- It is limited to use cases where the active dataset fits in memory; otherwise, accessing data on disk would make it very slow.
- Write throughput must be low enough to be handled by a single CPU core, or else transactions need to be partitioned without requiring cross-partition coordination.
- Cross-partition transactions are possible, but there is a hard limit to the extent to which they can be used.

### Two-Phase Locking (2PL)
Two-phase locking is similar to the locks used for preventing dirty writes, but much stronger. Several transactions are allowed to concurrently read the same object as long as nobody is writing to it. But as soon as anyone wants to write, exclusive access is required:
- If transaction A has read an object and transaction B wants to write to that object, B must wait until A commits or aborts before it can continue. (This ensures that B can't change the object unexpectedly behind A's back.)
- If transaction A has written an object and transaction B wants to read that object, B must wait until A commits or aborts before it can continue. (Reading an old version of the object, like in Figure 7-1, is not acceptable under 2PL.)

In 2PL, writers block readers and readers block writers. A minimal lock sketch follows.
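A toy sketch of the shared/exclusive lock that 2PL is built on (ignoring deadlock detection, fairness, and lock upgrades; under 2PL a transaction would hold these locks until commit or abort):

```python
import threading

class SharedExclusiveLock:
    """Many readers OR one writer, as in 2PL's shared/exclusive modes."""
    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False

    def acquire_shared(self):
        with self._cond:
            while self._writer:          # readers wait for the writer
                self._cond.wait()
            self._readers += 1

    def release_shared(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_exclusive(self):
        with self._cond:
            while self._writer or self._readers > 0:  # writer waits for everyone
                self._cond.wait()
            self._writer = True

    def release_exclusive(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()
```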
**Implementation of two-phase locking**
Used by the serializable isolation level in MySQL (InnoDB) and SQL Server.
Implemented with a lock on each object; the lock can be held in `shared mode` or `exclusive mode`:
- If a transaction wants to read an object, it must acquire the lock in shared mode. Several transactions may hold the lock in shared mode simultaneously, but if another transaction holds it in exclusive mode, they must wait.
- If a transaction wants to write to an object, it must acquire the lock in exclusive mode.
- If a transaction first reads and then writes an object, it upgrades its lock from shared to exclusive mode.
- Transactions hold their locks until they commit or abort.

`deadlock`: several transactions waiting for each other. The database automatically aborts one of the transactions in the deadlock; the aborted transaction must be retried by the application.

**Performance of two-phase locking**
Transaction throughput and response times are significantly worse than under weak isolation.

**Predicate locks**
A database with serializable isolation must prevent phantoms. A `predicate lock` belongs to all objects that match some search condition.
![image](https://hackmd.io/_uploads/HkcdVTbQJe.png)
The key idea: a predicate lock applies even to objects that might be added in the future (phantoms).

**Index-range locks**
Predicate locks do not perform well. Most databases with 2PL actually implement `index-range locking` (also known as `next-key locking`), which simplifies a predicate by making it match a greater set of objects.

### Serializable Snapshot Isolation (SSI)
Provides full serializability with only a small performance penalty compared to snapshot isolation. It is the serializable isolation level used in PostgreSQL since version 9.1.

**Pessimistic versus optimistic concurrency control**
2PL is `pessimistic`: if anything might go wrong, wait. Serial execution is pessimistic to the extreme.
SSI is an `optimistic` concurrency control mechanism: instead of blocking, transactions continue in the hope that everything will turn out all right. On top of snapshot isolation, SSI adds an algorithm for detecting serialization conflicts among writes and determining which transactions to abort.

**Decisions based on an outdated premise**
How does the database know if a query result might have changed? There are two cases to consider:
- Detecting reads of a stale MVCC object version
- Detecting writes that affect prior reads

**Detecting stale MVCC reads**
![image](https://hackmd.io/_uploads/HyDn9TZXJx.png)

**Detecting writes that affect prior reads**
![image](https://hackmd.io/_uploads/BJQp96-X1g.png)

**Performance of serializable snapshot isolation**
Compared to two-phase locking, SSI doesn't block waiting for locks held by another transaction. As under snapshot isolation, writers don't block readers, and vice versa: latency is much more predictable and less variable. In particular, read-only queries can run on a consistent snapshot without requiring any locks, which is very appealing for read-heavy workloads.
Compared to serial execution, SSI is not limited to the throughput of a single CPU core. Even though data may be partitioned across multiple machines, transactions can read and write data in multiple partitions while ensuring serializable isolation.
The rate of aborts significantly affects the overall performance of SSI.

## Summary
Race conditions discussed in this chapter:
- Dirty reads
- Dirty writes
- Read skew
- Lost updates
- Write skew
- Phantom reads

Three approaches to implementing serializable transactions:
- Literally executing transactions in a serial order, on a single CPU core.
- Two-phase locking: for decades this has been the standard way of implementing serializability, but many applications avoid it because of its performance characteristics.
- Serializable snapshot isolation (SSI): uses an optimistic approach, allowing transactions to proceed without blocking. When a transaction wants to commit, it is checked, and it is aborted if the execution was not serializable.
# Chapter 8: The Trouble with Distributed Systems

## Faults and Partial Failures
An individual computer with good software is either fully functional or entirely broken, not something in between. In a distributed system, there may well be some parts of the system that are broken in some unpredictable way.

### Cloud Computing and Supercomputing
A supercomputer is more like a single-node computer than a distributed system: it deals with partial failure by letting it escalate into total failure; if any part of the system fails, just let everything crash (like a kernel panic on a single machine).
In distributed systems, we must accept the possibility of partial failure and build fault-tolerance mechanisms into the software.

## Unreliable Networks
![image](https://hackmd.io/_uploads/r18lpw4X1l.png)
The usual way of handling this issue is a timeout: after some time you give up waiting and assume that the response is not going to arrive.

### Network Faults in Practice
Even if network faults are rare in your environment, the fact that faults can occur means that your software needs to be able to handle them.

### Detecting Faults
If something has gone wrong, you may get an error response at some level of the stack, but in general you have to assume that you will get no response at all. You can retry a few times (TCP retries transparently, but you may also retry at the application level), wait for a timeout to elapse, and eventually declare the node dead if you don't hear back within the timeout.

### Timeouts and Unbounded Delays
It is hard to determine a good timeout value.
Scenario: every packet is either delivered within some time `d` or lost, and delivery never takes longer than `d`; a non-failed node always handles a request within some time `r`. Then `2d + r` would be a reasonable timeout. Problem: in real systems, neither `d` nor `r` is guaranteed; delays are unbounded.

**Network congestion and queueing**
![image](https://hackmd.io/_uploads/B14cWdNX1e.png)
In such environments, you can only choose timeouts experimentally: measure the distribution of network round-trip times over an extended period, and over many machines, to determine the expected variability of delays. TCP retransmission timeouts work similarly. See the sketch below.
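As a sketch of that experimental approach (the RTT samples here are synthetic; in practice you would record real measurements):

```python
import random
import statistics

# Synthetic round-trip times (seconds): mostly fast, occasionally queued.
random.seed(0)
rtts = [random.gauss(0.010, 0.002) for _ in range(9900)] + \
       [random.uniform(0.05, 0.5) for _ in range(100)]   # congestion spikes

def timeout_from_percentile(samples, pct=99.9, safety_factor=2.0):
    """Pick a timeout above the pct-th percentile of observed RTTs."""
    ordered = sorted(samples)
    idx = min(len(ordered) - 1, int(len(ordered) * pct / 100))
    return ordered[idx] * safety_factor

print(f"median RTT: {statistics.median(rtts) * 1000:.1f} ms")
print(f"timeout:    {timeout_from_percentile(rtts) * 1000:.1f} ms")
```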
### Synchronous Versus Asynchronous Networks
A `synchronous` network does not suffer from queueing; the bandwidth is pre-allocated (for example, the telephone network). Latency is fixed: a `bounded delay`.

**Can we not simply make network delays predictable?**
Ethernet and IP are packet-switched protocols, which suffer from queueing and thus unbounded delays in the network. These protocols do not have the concept of a circuit; they are optimized for `bursty traffic`.
With careful use of quality of service (QoS: prioritization and scheduling of packets) and admission control (rate-limiting senders), it is possible to emulate circuit switching on packet networks, or to provide statistically bounded delay. However, such quality of service is currently not enabled in multi-tenant datacenters and public clouds, or when communicating via the internet.

## Unreliable Clocks
In distributed systems, delays are variable and each machine has its own clock, which is not in sync with the others. NTP is used to adjust a machine's time according to a group of servers; those servers in turn get their time from a more accurate source, such as a GPS receiver.

### Monotonic Versus Time-of-Day Clocks
Modern computers have at least two different kinds of clocks: a `time-of-day clock` and a `monotonic` clock.

**Time-of-day clocks**
Returns the current date and time according to some calendar (`wall-clock time`). It is usually synchronized with NTP. If the local clock is too far ahead of the NTP time, it may jump backwards, which makes it unsuitable for measuring elapsed time.

**Monotonic clocks**
Suitable for measuring elapsed time (a time interval). NTP may adjust the frequency at which the monotonic clock moves forward (known as `slewing` the clock) if it detects that the computer's local quartz is running faster or slower than the NTP server; by default, NTP allows slewing by up to 0.05%.
In a distributed system, using a monotonic clock for measuring elapsed time (e.g., timeouts) is usually fine, because it doesn't assume any synchronization between different nodes' clocks and is not sensitive to slight inaccuracies of measurement.

### Clock Synchronization and Accuracy
Time-of-day clocks need to be set according to an NTP server, but this is not as reliable as one might hope:
- The quartz clock in a computer is not very accurate
- An application observing the clock may see it jump backwards or forwards
- NTP synchronization is limited by network delay
- NTP server misconfiguration
- Leap seconds
- Virtualized clocks in VMs
- Incorrect hardware clocks

### Relying on Synchronized Clocks
Although clocks work quite well most of the time, robust software needs to be prepared to deal with incorrect clocks.

**Timestamps for ordering events**
![image](https://hackmd.io/_uploads/H1g3DorXkl.png)
Problems with last write wins (LWW):
- Database writes can mysteriously disappear
- LWW cannot distinguish between writes that occurred sequentially in quick succession and writes that were truly concurrent

`logical clocks` are a safer option for ordering events (a minimal sketch follows); time-of-day and monotonic clocks are physical clocks.
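A minimal sketch of a logical (Lamport-style) clock, which orders events by causality rather than by wall-clock time (the two-node scenario is made up):

```python
class LamportClock:
    """Counter-based logical clock: timestamps respect causality."""
    def __init__(self):
        self.counter = 0

    def local_event(self) -> int:
        self.counter += 1
        return self.counter

    def on_receive(self, remote_counter: int) -> int:
        # Take the max of both clocks, then increment, so the receive
        # event is ordered after both the send and all prior local events.
        self.counter = max(self.counter, remote_counter) + 1
        return self.counter

a, b = LamportClock(), LamportClock()
t1 = a.local_event()   # node A: t=1
t2 = a.local_event()   # node A: t=2 (a "send")
t3 = b.on_receive(t2)  # node B: t=3, causally after A's send
assert t1 < t2 < t3
```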
**Clock readings have a confidence interval**
A system may be 95% confident that the time is between 10.3 and 10.5 seconds past the minute, but it doesn't know any more precisely than that.
The uncertainty bound can be calculated based on your time source. If you're getting the time from a server, the uncertainty is based on the expected quartz drift since your last sync with the server, plus the NTP server's uncertainty, plus the network round-trip time to the server (to a first approximation, and assuming you trust the server).

**Synchronized clocks for global snapshots**
Synchronized clocks can be used to generate transaction IDs for snapshot isolation in distributed systems.
![image](https://hackmd.io/_uploads/BklH6iS7kl.png)

### Process Pauses
A node in a distributed system must assume that its execution can be paused for a significant length of time at any point, even in the middle of a function. During the pause, the rest of the world keeps moving and may even declare the paused node dead because it's not responding. Eventually the paused node may continue running, without even noticing that it was asleep until it checks its clock some time later. GC pauses are a classic example.

**Response time guarantees**
In a real-time operating system (RTOS), there is a specified deadline by which the software must respond. Such systems are expensive and have lower throughput (because response time is prioritized).

**Limiting the impact of garbage collection**
If the runtime can warn the application that a node soon requires a GC pause, the application can stop sending new requests to that node, wait for it to finish processing outstanding requests, and then perform the GC while no requests are in progress.
A variant: use the garbage collector only for short-lived objects, and restart processes periodically, one node at a time (like a rolling upgrade).
These measures cannot fully prevent garbage collection pauses, but they can usefully reduce their impact on the application.

## Knowledge, Truth, and Lies
In a distributed system, we can state the assumptions we are making about the behavior (the system model) and design the actual system in such a way that it meets those assumptions.

### The Truth Is Defined by the Majority
Many distributed algorithms rely on a quorum, that is, voting among the nodes: decisions require some minimum number of votes from several nodes in order to reduce the dependence on any one particular node.

**The leader and the lock**
![image](https://hackmd.io/_uploads/Skefo2HmJg.png)

**Fencing tokens**
![image](https://hackmd.io/_uploads/HyXAinrm1g.png)
A `fencing token` is a number that increases every time a lock is granted. Note that this mechanism requires the resource itself to take an active role in checking tokens, rejecting any write with an older token than one that has already been processed.

### Byzantine Faults
Fencing tokens can be faked by a node trying to bypass the system's guarantees.
`Byzantine fault`: nodes may `lie` (send arbitrary faulty or corrupted responses).
`Byzantine Generals Problem`: the problem of reaching consensus in this untrusting environment.
A system is `Byzantine fault-tolerant` if it continues to operate correctly even if some of the nodes are malfunctioning. Protocols for making systems Byzantine fault-tolerant are quite complicated, and fault-tolerant embedded systems rely on support from hardware; the cost makes them impractical in most server-side data systems.
Most Byzantine fault-tolerant algorithms require a supermajority of more than two-thirds of the nodes to be functioning correctly.

### System Model and Reality
`system model`: an abstraction that describes what assumptions an algorithm may make.
Three common system models for timing assumptions:
- `synchronous model`: assumes bounded network delay, bounded process pauses, and bounded clock error. Not a realistic model of most practical systems.
- `partially synchronous model`: behaves synchronously most of the time. A realistic model of many systems.
- `asynchronous model`: an algorithm is not allowed to make any timing assumptions. Very restrictive; few algorithms can be designed for it.

Three common system models for node failures:
- `crash-stop faults`: a node can fail in only one way, by crashing, and it never comes back.
- `crash-recovery faults`: nodes may crash and start responding again after some time; they are assumed to have stable storage that survives crashes, while in-memory state is lost.
- `Byzantine (arbitrary) faults`: nodes may do absolutely anything, including trying to trick and deceive other nodes.

For modeling real systems, the partially synchronous model with crash-recovery faults is generally the most useful.

**Correctness of an algorithm**
We can write down the `properties` we want of a distributed algorithm to define what it means for it to be correct. For example, an algorithm that generates fencing tokens would need the properties `Uniqueness`, `Monotonic sequence`, and `Availability`.

**Safety and liveness**
Two kinds of properties: `safety` and `liveness`.
- If a safety property is violated, we can point at a particular point in time at which it was broken. After a safety property has been violated, the violation cannot be undone; the damage is already done.
- A liveness property works the other way round: it may not hold at some point in time, but there is always hope that it may be satisfied in the future.

## Summary