# Optimistic Replication Roadmap

## 1. Intent

Some of this document's assumptions are more general than the current project's scope requires. This is intentional, as this roadmap serves both for outlining the immediate goals and as a path for further optimizations/development. Whenever possible, I quickly switch focus to the problem at hand and opt for a more particular solution (as opposed to a more general one).

Treat this document as a "mind dump" of my research on what we should be paying attention to, not necessarily in the near future, but somewhere along the road.

:question: Whenever you see such a mark, please consider providing feedback -- those are areas that need peers' input/review.

## 2. Optimistic replication fundamentals

### 2.0 Definitions

Replication
: Placing copies of data on different nodes to improve availability and fault tolerance.

Optimistic replication
: Allows nodes to read and update local copies of data at any time. Each update is *tentative* because it might conflict with another remote update.

CRDTs
: Conflict-free replicated data types. Basically, CRDTs provide a way to merge incoming updates without resorting to conflict resolution by clients -- all merges are automatic.

Siblings
: Different versions of the same data, due to concurrent updates.

### 2.1 General workflow

The main reason to use optimistic replication is high availability and low latency, which are absolute requirements for the relay nodes. With pessimistic or eager replication there must be some kind of synchronization on data updates, and possible conflicts are resolved *a priori*. With OR, conflicts are resolved after the fact, in the background.

The following general pattern exists:

- Any node can initiate a reading or writing transaction on a local replica of the data.
- When the local transaction completes successfully, the node is responsible for propagating this update to other nodes, before the transaction is considered successful system-wide.
- Remote nodes replay, or apply, the propagated updates locally and respond with a status.
- After receiving enough confirmations, the initiator node can consider the transaction as committed.

This workflow assumes that updates to the same key can occur concurrently, and common order is established at some later stage, *a posteriori*.

### 2.2 Transmitting updates

There are two possible ways to propagate updates:

- **State-based**, where the after-value of a transaction is transmitted (in our case, since we do not use transactions and work on whole values, it is just a whole value struct).
- **Operation-based**, where the update operation itself is transmitted.

The state-based approach is what is currently assumed by our code.
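To make the distinction concrete, here is a minimal sketch (the types and fields are hypothetical illustrations, not our actual wire format) of the two shapes an update message could take:

```rust!
// Minimal sketch; `StateUpdate`/`OpUpdate` are hypothetical illustration types,
// not part of the current codebase.

/// State-based: the whole after-value is shipped to peers and simply
/// overwrites (or merges with) the local copy on the receiving node.
#[derive(Clone, Debug)]
struct StateUpdate {
    key: Vec<u8>,
    /// The entire serialized value struct, including metadata such as `updated`.
    value: Vec<u8>,
    /// Timestamp used for LWW-style merging on the receiving side.
    updated: u64,
}

/// Operation-based: only the operation is shipped and has to be replayed on
/// every peer, which may require operation rewrites to stay deterministic.
#[derive(Clone, Debug)]
enum OpUpdate {
    SAdd { key: Vec<u8>, member: Vec<u8> },
    SRem { key: Vec<u8>, member: Vec<u8> },
}

fn main() {
    let state = StateUpdate { key: b"topic".to_vec(), value: b"entry1".to_vec(), updated: 1 };
    let op = OpUpdate::SAdd { key: b"topic".to_vec(), member: b"entry1".to_vec() };
    println!("state-based: {state:?}\noperation-based: {op:?}");
}
```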
**Pros/cons:**

- state-based replaying has some advantages/disadvantages:
  - <sub>:heavy_plus_sign:</sub> it is deterministic
  - <sub>:heavy_plus_sign:</sub> can be more efficient (as applying the update is just writing into the database)
  - <sub>:heavy_minus_sign:</sub> if objects are relatively big, then we are moving around lots of bytes
- the operation-based approach has the following advantages/disadvantages:
  - <sub>:heavy_minus_sign:</sub> it can be non-deterministic w/o operation rewrites (see example below)
  - <sub>:heavy_plus_sign:</sub> it is expected to produce a smaller footprint
  - <sub>:heavy_plus_sign:</sub> normally, operations are more likely to commute, so merging them is easier

Example of when an operation-based update is non-deterministic, w/o operation transformation/rewrite:

- consider shared text "abc"
- node1 issues operation: `insert("X", 1)` (resulting in "aXbc")
- node2 issues operation: `delete(2)` (resulting in "ab")
- when node2 replays `insert("X", 1)` it gets "aXb"
- when node1 replays `delete(2)` it gets "aXc", so w/o an operation rewrite to `delete(1)` we cannot deterministically replay

:::warning
:question: At the moment, we do not update persisted objects on the granularity of individual fields, always writing whole objects, instead. Will this requirement remain? If it does, then we can proceed with a simpler design when working with concurrent updates -- as our data comes close to being immutable (you do not update the objects, you always add -- even deletions can be structured as writes).

*The answer to this question has huge implications -- it is easy to work with whole immutable objects, without strong causality primitives, like vector versions, and it is absolutely another story if we allow in-object modifications (then we need a stronger approach to causality detection).*
:::

:::info
**:fire: Suggested approach:**

- Since we are operating on whole values (we never update individual fields of value structs), the operation-based approach doesn't really apply to us.
- When bootstrapping we definitely want to provide key-ranged *values* which are to be applied to a new node. So, state-based, again.
- Even when propagating individual updates (like adding a new element to the routing table), we still will need to propagate whole objects, as this is exactly what the local node operates on.

**Action items:**

- [ ] Enable networking within storage (so that storage can initiate updates propagation).
- [ ] Propagating updates to replication peers.
  - [ ] Propagate additions
  - [ ] Propagate removals
- [ ] Applying updates and ACKing back.
:::

### 2.3 Conflicts and partial ordering

If updates are not concurrent, they can be ordered (see [section on ordering events](#4-Ordering-concurrent-events)), and applied one after another. There still can be a merging process, as updates might change different fields of a persisted structure -- but if we have the order of events, we can do this deterministically.

For a pair of concurrent updates we can have one of the following:

- Transactions are **commuting** -- order is immaterial and you can apply them in either `T1, T2` or `T2, T1` order.
- Transactions are **non-commuting** -- order is important, and while both `T1, T2` and `T2, T1` *can* make sense, the semantics is different (consider `T1 := transfer balance` and `T2 := deposit $100`, either order can make sense, but the meaning is quite different -- so, all nodes should apply them in some common order).
- Transactions are **antagonistic** -- either order of transactions will violate some invariant, and you have to drop either one, or both.

When updates are mutually non-commuting or antagonistic, we have a conflict. Reconciliation involves aborting some updates (overwriting them) and combining non-conflicting updates (see details in the [Conflict Resolution section](#5-Conflict-resolution)).

**Note:** Suggested approach and action items are discussed in upcoming sections.

### 2.4 Conflict resolvers

There are two kinds of possible resolvers:

- **Syntactic resolver** can merge the updates automatically. For example, if CRDTs are used to express your data, then all updates can be automatically merged. Alternatively, a simple policy of LWW (last writer wins) can be used; then an object with a higher timestamp overwrites previous updates, automatically.
- **Semantic resolver** is application-specific code that knows how to deal with conflicts. This requires the system to keep multiple versions of data, and then some process to merge that data.

Some illustrative implementations:

- Riak has quite extensive support for sibling resolution (they started from vector clocks and migrated to dotted version vectors to be able to track down concurrent updates). They expose the resolver interface allowing end-users to plug in code for conflict resolution. They also, eventually, moved to CRDTs (registers, sets, maps are supported), so that users can get syntactic resolution, and not worry about vector clocks.
- Cassandra from the very beginning ditched the idea of using vector clocks for tracking down the order of concurrent operations (the reason is to lower latency) and instead decided to split any data struct into fields (each data instance is a row, each data instance's field is a column). Then, each column can be updated separately, and they use timestamps with a simple LWW policy on a per-column level.

:::info
**:fire: Suggested approach:**

- It seems that we can rely on syntactic resolution for all our data types.
- Before `set` and `map` data types were implemented for our database, (being our own clients) we actually had semantic resolvers in the form of RocksDB merge operators. Right now, data types are structured in such a way that no extra merging is necessary. If, however, we need any additional merging logic, we can always opt for re-introducing merge operators.
:::

## 3. Propagation mechanism and eventual consistency

### 3.0 Definitions

Replication factor
: Number of copies to be held within a system, for any instance of data.

Replica set or replication peers
: Nodes that share responsibility for holding a value identified by a given key. The size of a replica set is equal to the replication factor.

Consistency level
: Number of replication peers that must acknowledge the read/write operation for it to be considered successful.

Coordinator node
: Any node that receives a write/read request and coordinates its fulfillment.

Preference list
: For any given key, known list of peers that can be selected for an update event propagation. Each update event requires a replication-factor number of such peers for an operation to be considered successful.

Sloppy quorum
: While a quorum implies `50% + 1` nodes, a sloppy quorum has the very same number, but of *guaranteed-healthy* nodes.

### 3.1 Coordinator nodes and replica sets

The relay network is non-hierarchical, so any node can respond to the request.
It is possible to place some kind of load balancer in front of relay nodes that will know how to select a node that is responsible for managing the incoming key. That way the coordinator node will write locally, as -- in that case -- it is a part of a replica set. However, this is not strictly required -- all writes/reads by a coordinator node can be remote.

The read/write operations are to be applied against a subset of a *preference list* -- nodes that are considered to be possible targets for a key range for a given request. The preference list should be longer than the replication factor, as some nodes may be unavailable.

In [Dynamo-based][5] storage systems, we have a quorum-like consistency protocol:

- With a replication factor of `N`, we can configure two parameters `R` and `W`. The former is the minimum number of nodes that must respond for a read operation to be considered successful, and the latter is the same for writes.
- For quorum-like consistency, we have to make sure that `R + W > N`. Note that individually both parameters are lower than the replication factor, allowing for better latency.

For instance, for a replication factor of `N=3`, good values for quorum-like reads and writes are `R=2` and `W=2`, respectively. If we set `R=1, W=1`, then we have a consistency of `ONE` (in Cassandra's terms), and setting `R=W=N` gives us a consistency level of `ALL`.

Note that the latency of read/write operations will be bound by the latency of the slowest node selected from the preference list. That's why we normally want to have `R` and `W` that are strictly lower than `N`.

:::info
**:fire: Suggested approach:**

- Opt for quorum-like consistency with `(N, R, W) = (3, 2, 2)`.
- Since we are using consistent hashing, it is not hard to create a load balancing service that will forward requests based on incoming keys, so that the coordinator node is part of the replica set, and one write is local to that coordinator node.

**Action items:**

- [ ] Preference list formation (based on consistent hashing).
- [ ] Making sure that at least `R` or `W` (depending on the operation) healthy nodes are selectable from the preference list.
- [ ] Make sure that `(N, R, W)` are configurable parameters.
- [ ] System-wide successful writes: on writes notify `N`, but wait for `W`.
- [ ] System-wide successful reads: on reads ping `N`, but wait for `R` responses.
- [ ] Any node has the capability of being coordinator for any read/write.

**Action items (future):**

- [ ] Load balancer node (which is aware of consistent hashing mechanisms).
:::

### 3.2 Sloppy Quorums and Hinted Handoff

As mentioned before, the preference list should be bigger than the replication factor, so that failure of some nodes (or a network partition) can be tolerated. Practically, it means that our system should be able to select the top `N` **healthy** nodes from the preference list, quickly.

This means that, on some operations, data will be pushed not to nodes assigned to a key range by the consistent hashing mechanism, but to some other -- backup -- nodes. Whenever the coordinator node pushes the data to such a backup node, that node will not only store the data, but, in addition to that, will store some hints on where this data should have gone originally. Alternatively, this hint information can be provided by the coordinator node. Once a node that served as a backup node discovers that the hinted peer has recovered, those backup updates are pushed to it (all this in the background). See the [Hinted Handoff section](#37-Hinted-Handoff) for details.
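As a rough illustration of the hint metadata mentioned above (all names here are hypothetical, not an agreed-upon design), a hinted write stored on a backup node could look like this:

```rust!
// Minimal sketch with hypothetical names; illustrates the idea only.
use std::time::{Duration, SystemTime};

/// A write accepted on behalf of an unreachable replica, together with the
/// hint about where it should have gone originally.
struct HintedWrite {
    key: Vec<u8>,
    value: Vec<u8>,
    /// The peer that owns this key range according to the preference list.
    intended_owner: String,
    /// After this deadline the intended owner is treated as permanently
    /// failed, and the backup node keeps the data as its own.
    handoff_deadline: SystemTime,
}

impl HintedWrite {
    fn new(key: Vec<u8>, value: Vec<u8>, intended_owner: String, ttl: Duration) -> Self {
        Self { key, value, intended_owner, handoff_deadline: SystemTime::now() + ttl }
    }
}
```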
This hinting mechanism allows the storage system to be always writable, even in the presence of failures or partitions.

:::danger
For the first implementation, we will not use sloppy quorums or hinted handoff. If we cannot select enough participants from the replica set, an error is to be returned to the user (and they can retry).
:::

### 3.3 Writes

For a replication factor of `N`, we need to make sure that updates eventually propagate to all `N` replica peers. However, for lower latency, we want to minimize the number of replicas being written during the request execution process. So, with some `W < N`, for a write operation to be considered successful, and assuming that the coordinator node is a part of a replica set, a coordinator node:

- writes data locally, assigns a timestamp or vector version to it.
- notifies all `N-1` *healthy* peers, but *actively* waits for only `W-1` peers: once they ACK, the write is considered successful, and the rest of the peers' responses proceed in the background (assuming you want to wait on them, for logging purposes, for example).

:::info
**:fire: Suggested approach:**

- Operations can fail (a massive portion of the cluster is down, and there are not enough peers for `W` writes). So, there should be some time limit imposed on an operation. Generally, it is a wise practice to cap all networking operations with some (probably generous) time limit. There should be an exit strategy for unpredictable events.

**Action items:**

Most of the heavy lifting is already covered in the [Transmitting updates](#22-Transmitting-updates) section.

- [ ] Integrate `relay_rocks` and `rely_irn`, so that database adapters can communicate the updates to the rest of the replica set.
- [ ] Update `relay_rocks::types::*` (`string`, `set`, `map`) to support the protocol of pushing to `W-1` replicas.
- [ ] Add a time limit after which an operation times out.
- [ ] Build tests that cover:
  - [ ] Successful write (`W` propagation).
  - [ ] Full `N` propagation.
  - [ ] Rollback of the local transaction if not enough peers responded.
:::

### 3.4 Reads

Similarly to writes, on reads, a coordinator node:

- contacts all `N-1` highest-ranked reachable peers from the preference list (assuming the coordinator is a part of the replica set, and has its own view on the data as well).
- actively waits for `R-1` responses, resolves possible conflicts, produces the result to the client.

:::info
**Action items:**

- [ ] Integrate `relay_rocks` and `rely_irn`, so that database adapters can pull the data views from the rest of the replica set.
- [ ] Update `relay_rocks::types::*` (`string`, `set`, `map`) to support the protocol of pulling from `R-1` replicas.
- [ ] Add a time limit after which an operation times out.
- [ ] Build tests that cover:
  - [ ] Non-conflicting read from `R` peers.
:::

#### 3.4.1 Read repair

In highly concurrent systems, where divergent updates contend for some value, we have to answer two questions:

- Who will be responsible for conflict resolution?
- When will conflicts be resolved?

In strongly consistent systems we resolve conflicts on writes. With OR, we trade some consistency guarantees for better availability and low latency, so conflicts are resolved at a later stage, when reads are requested (there are also additional places where diverging data versions can be reconciled, like on import/export or compaction).

The idea is simple:

- When gathering responses from `R` peers, we check whether there are different versions of the data.
- To understand whether two versions are different, we can go as simple as using `updated` timestamps or version vectors, or some kind of digest (like a hash, which can then be used in Merkle trees -- to see whether key *ranges* are diverging).
- If divergent versions are not causally related (that is, they can't be deterministically ordered), we need some kind of reconciliation scheme, either syntactic or semantic (in the latter case we can present the client with the different versions, and allow it to provide resolvers, as Riak does).
- Once the conflict is resolved, all the nodes need this info propagated: in the background, they are sent a resolved version of the data, so on the next read request no conflicting version will be reported. Alternatively, no propagation occurs (just a local update, fixing the conflict locally), and each node from the replica set has its own take on merging and updating locally.

:::info
**:fire: Suggested Approach:**

- In our system, it seems to be possible to merge the data syntactically, without using any semantic conflict resolving.
- We should opt for read repair, making sure that the more data is read, the less conflicted it becomes. That way hot paths will converge quicker, which is a nice approach for deciding which keys should be resolved first.
- We should probably not propagate the merged versions, allowing each peer to do a local conflict repair. This implies fewer messages traveling around, and a bit more work for each individual peer.

**Action items:**

- [ ] Produce a "diff-like" function that will conclude whether incoming reads are in conflict.
- [x] Make sure that for all storage types (unaddressed and addressed mailbox, routing table, client-project and project-project_data mapping) we can merge syntactically.
- [ ] Build tests that cover:
  - [ ] Conflicting read from `R` peers.
  - [ ] Conflict resolution, and local update with a new calculated value.

**Action items (future):**

- [ ] Implement propagation of the updated version to conflicting peers.
:::

### 3.5 More on removals

When data is removed, those removals should also propagate to peers. We have two scenarios where it is important:

- Exporting/importing key ranges: if the source node has the data item removed, on export this information should travel with the import, and update the destination accordingly.
- On individual operations: when an item is destined to be removed on one node, all its peers in the replica set should be aware of this delete.

In order to provide support for both use-cases, and to leverage RocksDB's append-only approach, delete operations should be treated as writes:

- When a node receives a delete request, it updates the data instance with a marker value -- a so-called tombstone -- which means that the record is considered as deleted.
- The delete is propagated to all `N-1` peers, and upon ACKs from `W-1` peers, is considered successful.
- In order to provide even stronger consistency guarantees, tombstoned items will not be purged immediately, but instead will be marked with some expiration time; when it elapses, those items can be safely garbage collected in the background (on compaction, for example).

:::info
**:fire: Suggested Approach:**

- Mark items as deleted, do not physically remove them. Items are removed only by the garbage collector.
- Separate reads into two types:
  - Clients should not see deleted items. This means that individual reads as well as range scans do not produce deleted items.
  - Internal reads (when the coordinator gathers the views from peers) and import/export scans should produce those tombstones, so that this information is included in calculating the merged view on a data instance.

**Action items:**

- [ ] Introduce tombstones, instead of physical deletes
- [ ] Allow propagating individual deletes to peers
- [ ] Make sure that deleted items can have an expiration time that is separate from the item's expiration time. Moreover, when an item expires, do not immediately remove it on compaction, but turn the item into a tombstone (so that internal reads and scans can get this info) and it can follow the same garbage collection protocol.
- [ ] Implement garbage collection of deleted items, which should be removed once their deletion expiry time elapses.
- [ ] Test cases:
  - [ ] Deletion of an item without expiration time.
  - [ ] Deletion of an item with expiration time:
    - [ ] Item is expired (so is marked for deletion).
    - [ ] Item is not expired (delete is requested by some client).
  - [ ] Propagation:
    - [ ] Items are invisible for client reads.
    - [ ] Items are visible for internal reads.
  - [ ] Garbage collection:
    - [ ] Items are unaffected if the delete expiration hasn't come.
    - [ ] Items are physically removed on delete expiration.
:::

### 3.6 Handling range synchronization

Nodes should be able to compare the data *en masse*, on some key range:

- A node can be just joining the cluster, or have fallen behind due to a partition. As a node has multiple key ranges (vnodes), it needs to pull updated information from different peers.
- To optimize the process, there should be a mechanism for comparing data between nodes on a per key *range* level.
- One way to do it is to use Merkle trees, which will work as a hashroot for a key range, and on divergent hashroots a node can immediately know that it needs to pull the data from the peer.

:::info
**:fire: Suggested Approach:**

- During the first phase, just pull key ranges in a non-optimal, all-ranges-included way.
- As this might involve a long catching-up process for a node that has been down -- due to maintenance, for instance -- we need to use hashroots and pull only those branches that are diverging.
- Merkle trees allow us to dissect the key range into smaller and smaller regions, up until we know the diverging branch of the tree, and then only the sub-range for that diverging region is pulled.
- Merkle trees are costly to build and maintain, so whenever this optimization is tackled, careful consideration of how to keep the tree up to date must be made (the [Dynamo Paper][5] has some pointers).

**Action items:**

- [ ] Exporting/importing of a key range of data between nodes.
  - [x] Storage layer
  - [ ] Network layer

**Action items (future):**

- [ ] Implement Merkle roots for a data range.
- [ ] Allow dissecting a key range into the smaller range that is responsible for key divergence.
- [ ] Allow importing/exporting diverging sub-ranges only.
:::

### 3.7 Hinted Handoff

Sloppy quorums are formed using *reachable, healthy* nodes, instead of just the replica set nodes that are responsible for a key range. Therefore, we need a protocol for handling handoff of those items to the nodes that should have been selected, if they were healthy.

The good thing about consistent hashing, among many others, is that we can form a preference list using the ordinary "go clockwise" approach. That way, if a node is down permanently, then the nodes we are writing to will be the new responsible nodes for the key anyway!
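A minimal sketch of that "go clockwise" selection (the ring layout and health check below are assumptions made up for illustration): starting at the key's position on the ring, we take the first `N` healthy nodes, so a permanently failed node is naturally replaced by its successors.

```rust!
// Minimal sketch (hypothetical ring layout and health check, for illustration
// only): walk the consistent-hashing ring clockwise from the key's position
// and take the first `n` healthy nodes.
fn preference_list(
    ring: &[(u64, &str)],               // (token, node), sorted by token
    key_token: u64,
    n: usize,
    healthy: impl Fn(&str) -> bool,
) -> Vec<String> {
    // First node whose token is >= the key's token ("go clockwise").
    let start = ring.partition_point(|(token, _)| *token < key_token);
    ring.iter()
        .cycle()              // wrap around the end of the ring
        .skip(start)
        .take(ring.len())     // visit every node at most once
        .filter(|(_, node)| healthy(*node))
        .take(n)
        .map(|(_, node)| node.to_string())
        .collect()
}

fn main() {
    let ring = [(10, "A"), (42, "B"), (80, "C"), (120, "D")];
    // With "C" down, the write for token 50 skips it: C's successors pick up
    // the key range, which matches the hand-off behaviour described above.
    let peers = preference_list(&ring, 50, 3, |node| node != "C");
    assert_eq!(peers, ["D", "A", "B"]);
}
```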
In order to support that semantics, we need to know when a node is considered to be failing temporarily (so it is subject to catching up when back online) and when the failure is deemed permanent (and the node needs to rejoin the cluster via bootstrapping).

Thus, backup nodes should be capable of the following:

- In the background, they should check the cluster status; once they have found that a node for which they have hinted data is online, they should start pushing data towards it.
- The handoff data should have an expiry time which is a function of the time after which a failing node is considered "failed permanently". When this time elapses, it means enough time has passed for the original target to be considered permanently failed, and the data should just be assumed as owned (the current node is now responsible for the updated key range).

:::info
**:fire: Suggested Approach:**

- Cassandra stores the hints in files on a coordinator node, which, in my opinion, is giving too much responsibility to a node that can be participating in the replica set, thus impeding durability (if the coordinator node goes down, so go its own local data and hints).
- I believe that data should be pushed down the key ring to nodes that are possible candidates for the key range if the failure is permanent.

**Action Items:**

- [ ] Double-check that we have a preference list which is smart enough to produce only *healthy* peers.
- [ ] Allow the coordinator to push hinted items (or the destination node to be capable of understanding that the data is not originally intended for it, but is to be handed off to another peer).
- [ ] Mechanism for membership and failure detection, with the ability to deem nodes as failed permanently.
- [ ] Propagate hinted data to a node returning to the cluster.
- [ ] Ability to take ownership of the data, if the original target is down permanently.
:::

### 3.8 Rollbacks

Consider the situation when a write operation is requested, a number of peers have already responded, and while waiting for `W` peers to respond the coordinator node fails (for whatever reason). Or, for whatever reason, the coordinator cannot end up with `W` confirmations and a timeout error should be returned to the user. The operation is not complete, but the value is already partially replicated. Does it pose a problem? Not really:

- Since there's no quorum, partial writes will not result in values being reported by reads.
- The only issue is that our database now has values that are wasting some space, and need to be garbage collected.

:::info
**:fire: Suggested approach:**

- On partial reads, repair the local version (if locally we have data but `R` peers report that there is no data for the key).
- If it is not the coordinator node that fails, then after returning a timeout error to the client, deletes might be initiated for the nodes that ACKed the transaction (in the background).

**Action items:**

- [ ] Test cases:
  - [ ] Simulate coordinator node failure
  - [ ] Simulate less than the required `W` reporting
:::

## 4. Ordering concurrent events

When producing a partial ordering we can essentially rely on two approaches:

- Using physical clocks and timestamps.
- Using logical clocks and some kind of counters.

If we use physical clocks, we will have to make sure that time on the nodes is synced, which is not an easy task. If we accept some level of time drifting, then it is hard to actually understand whether updates were concurrent, or can be ordered (see example below).
Cassandra's decision not to use any kind of vector clocks resulted in *allegedly* better performance (since logical clocks generally require a read -- of the current version of the object -- before a write), but it also resulted in additional complexity: making sure that the chance of data access contention and, in effect, data loss is diminished (structuring their data as columns).

At the moment, we are NOT destructuring the persisted data into fields (to mutate them individually). Moreover, our data usage patterns allow us to treat values as immutable (we never update the internal state of the values; values can be dropped or replaced), so we **can** rely on timestamps.

However, a word of caution: the moment we decide that we want to mutate some internal field within our saved values, timestamps will be inadequate for durability guarantees (data loss :scream:), because timestamps are notoriously bad indicators for concurrent events (time drifts, no way to understand if events have a causality relation, etc.). Cassandra structured their data by splitting fields into separate columns, Riak uses dotted version vectors to track causality, i.e. we will need extra mechanisms, should we require interior mutability.

So, <sup>[2], [3], [4]</sup>:

- Relying on the wall clock is not that easy (see the NTP section in the [All things clock][2] article) without some kind of coordinator (like ZooKeeper).
- Preventing data loss in a highly concurrent environment using timestamps is really tough (check out a simple set update example in [Jepsen: Cassandra][3]).

An illustrative example of why timestamps can't help with determining whether updates are concurrent, or not:

```sequence
Node1->Node2: 1. sadd("topic", "entry1")
Note left of Node2: On Node2: ["entry1"]
Node1-->Node3: 2. propagate sadd("topic", "entry1")
Note left of Node3: On Node3: ["entry1"]
Node1->Node2: 3. sadd("topic", "entry2")
Note left of Node2: On Node2: ["entry1", "entry2"]
Node1-->Node3: 4. propagate sadd("topic", "entry2")
Note left of Node3: On Node3: ["entry1", "entry2"]
Node3->Node2: 5. sadd("topic", "entry3")
```

At the end of the interaction, `Node3` knows about both the `entry1` and `entry2` updates, but with simple timestamps there's simply no way to show this! Indeed, if we swap the order of operations `4` and `5`, the relative timestamps will be the same, but this time `Node3` will have an update concurrent with `Node1`'s. With version vectors (and the more space-optimized dotted version vectors), we can distinguish between the two situations, and apply the `add` operation on `Node2` in the first case, and `merge` in the second.

:+1: The good news is that our data schema is simple right now, therefore in both cases we just use `add` and it works ok (data is deduplicated, the LWW policy is used -- w/o critical data loss).

:::info
**:fire: Suggested approach:**

- Rely on timestamps for the moment.
- Rely on version vectors *and timestamps* for ordering events if we decide to allow mutating objects' internals (see the sketch after this list):
  - Avoids implementing the pretty complex logic of managing columns.
  - No need to make sure that time on each node is perfectly synchronized -- we depend on logical instead of physical clocks.
  - Since each data instance still has a timestamp, it will work as a tie-breaker (there can still be close-to-millisecond antagonistic updates, in theory, but hardly in practice given what we are doing).
- Try to continue structuring our database types so that they work as CRDTs, without semantic reconciliation required.
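For illustration, a minimal sketch of a version vector (a hypothetical representation, not an implementation proposal) showing how it separates "ordered" from "concurrent" updates, which plain timestamps cannot do:

```rust!
// Minimal sketch of a version vector (hypothetical representation, for
// illustration only): one counter per node that has ever updated the value.
use std::collections::HashMap;

#[derive(Clone, Debug, Default)]
struct VersionVector(HashMap<String, u64>);

impl VersionVector {
    /// Record a local update made by `node`.
    fn bump(&mut self, node: &str) {
        *self.0.entry(node.to_string()).or_insert(0) += 1;
    }

    /// `self` dominates `other` if it has seen every update `other` has seen.
    fn dominates(&self, other: &Self) -> bool {
        other
            .0
            .iter()
            .all(|(node, n)| self.0.get(node).copied().unwrap_or(0) >= *n)
    }

    /// Neither side dominates: the updates are concurrent and need merging
    /// (with a tie-breaker such as the `updated` timestamp, or "additions win").
    fn concurrent_with(&self, other: &Self) -> bool {
        !self.dominates(other) && !other.dominates(self)
    }
}

fn main() {
    let mut a = VersionVector::default();
    a.bump("node1");                // node1 writes
    let mut b = a.clone();
    b.bump("node3");                // node3 writes, having seen node1's update
    assert!(b.dominates(&a));       // causally ordered: just apply the newer version
    a.bump("node1");                // meanwhile node1 writes again
    assert!(a.concurrent_with(&b)); // concurrent: a merge (or tie-breaker) is needed
}
```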
Overall, the proposed approach is quite similar to what Cassandra does, but without over-complicating the storage engine with column management (which, I believe, is overkill for our use-case).

**Action items:**

- [ ] When importing/exporting, take the `updated` timestamp into account
- [ ] Double-check that the conflict resolution information from the next section doesn't introduce any unwanted data loss.
- [ ] Determine the time sync approach (average of several NTP servers?).
:::

## 5. Conflict resolution

:question: Double-check that what is claimed here really does make sense.

:::warning
**State-based CRDTs**, aka convergent replicated data types (CvRDTs), assume that whole objects are transmitted (hence state-based), and the merge function is able to, well... merge them w/o conflicts. The merge function must be commutative, associative and idempotent, so that you can merge data in any order, incrementally, and not be affected by duplicates.

It seems that our use-cases allow for such a merge process: either via the way we've structured our database -- see Appendix I -- or using the RocksDB merge operator (for more complex future use-cases).
:::

**What kind of updates do we have?**

So, when it comes to concurrent updates, do we deal with commuting, non-commuting or antagonistic updates?

We are not dealing with complex transactions, so non-commuting updates are almost absent. Almost, because we might have a delete and add operation pair, and the order of execution may matter. However, in such cases, we can opt for some automatic reconciliation strategy, like "additions win" (if no relative order can be established and the updates are treated as concurrent). Moreover, since we are using sloppy quorums and read repairs, accidentally "undeleted" items are highly unlikely.

Due to the simplicity of our schema and the way we implemented data types in RocksDB, we can almost always trivially resolve antagonistic updates (we always know which one should prevail). Let's check!

**1. Unaddressed mailbox**

```rust!
// Layout (set)
storage["topic"] = {MailboxMessage1, .., MailboxMessageN}

// Operations
storage.sadd(topic, [msg1, .., msgK]);
storage.srem(topic, msg);
storage.sget(topic) -> [msg1, .., msgN];
```

See Appendix I for more info on how the `set` data type is implemented.

Since we deduplicate added items, any number of concurrent additions end up as a union of them. And since each individual item is stored as a separate row, we can add them in any order; they do not interfere with each other.

**If we rely on timestamps only:** Time drifts may interfere with the ordering of things, but in the worst case, we will have some deleted item *on some node* become undeleted. However, given that we rely on sloppy quorums and read repairs, it will require more than one node to start drifting, as otherwise its data will be repaired using the info from the other replica peers.

**If we can detect concurrent updates (i.e. implement logical clocks):** With "additions win" we can accommodate concurrent removals, and still be able to merge into a union set, deterministically. Note that not all updates are concurrent, so whenever we can deduce the order of operations, they will be applied in that order.

With "additions win", deduplication, and the way we structure the data, it is hard to come up with antagonistic updates that require some extra reconciliation.

**2. Mailbox**

```rust!
// Layout (map)
storage["recipient" + "topic"]["msg_id"] = MailboxMessage

// Operations (key = recipient + topic)
storage.hset(key, msg_id, msg);
storage.hvals(key) -> [msg1, .., msgN];
storage.hdel(key, msg_id);
```

Since we are operating on whole items (that is, we manipulate messages as a whole, not individual fields), then by accepting the **LWW** (last writer wins) policy we can avoid antagonistic transactions (or know how to merge them syntactically).

*Note:* For full info on our schema see Appendix II.

:::info
**:fire: Suggested approach:**

- For "Unaddressed Mailbox" (and any set instance) use **additions win** on concurrent updates (where order cannot be determined -- that is, if logical clocks are used) and a normal **union merge** for the rest.
- For "Mailbox" use **LWW**, which shouldn't pose any issues if we continue operating on whole items.
- "Routing table" is very similar to "Unaddressed Mailbox" (both are sets), so similar reasoning applies.
- For "Client-Project" and "Project-Project Data" mappings (they are simple key-value pairs), both **additions win** (if we have a delete and an add concurrently) and **LWW** can be used.

:warning: Whenever we assume that "something wins", it means overwriting/aborting some other update, so it essentially signifies some data loss. **The trick is to lose redundant data only or be able to repair.**
:::

## Appendix I: Implemented RocksDB data types

We have built several data types on top of RocksDB. What follows is a brief explanation of how they are implemented, which has a direct impact on how we can express our current schema.

### String

The String data type treats values as just strings of bytes:

- Both key and value are marshaled into message pack, and stored within a single column family.
- No meta data column family, as we do not need to keep extra information about values outside the values themselves.
- All values (for all data types) are wrapped into the `DataExt` struct (which should probably be called `DataContext`). This wrapper holds extra information per value: `expiration_time`, `updated`. Whenever we need to augment a value with some internal data, we extend the wrapper.

Key | Value
-|-
`key` | `DataExt(value)`

The interface is simple:

```rust!
/// Gets value for a provided `key`.
///
/// Time complexity: `O(1)`.
fn get(key);

/// Sets value for a provided key.
///
/// Time complexity: `O(1)`.
fn set(key, value, ttl);

/// Returns and removes value for a provided key.
///
/// Time complexity: `O(1)`.
fn getdel(key);

/// Returns initialized iterator for a provided `key` range.
fn scan_iterator(left, right) -> StringStorageIterator;
```

We do not *update* values; when something changes, we replace the value. This allows us to rely on timestamps for ordering events, without worrying too much about concurrent updates.

### Set

Originally implemented as a `key -> set` mapping, where values were encoded as lists of items.

Key | Value
-|-
`key` | `[val1, val2, ..., valN]`

This resulted in `O(n)` insertions and deletions (you unpack the set, add/remove the element, pack the set back). Additionally, concurrent writes required a semantic merge operation to reconcile the set (we've used the RocksDB merge operator). The only upside of such an implementation was that it is relatively simple; the downsides heavily outweighed it.

We gradually moved to a better design, which allows both more optimal data manipulation and less contention on concurrent updates.
Right now, for each `key -> [val1, val2, ..., valN]` pair we add a single entry into the meta data column family:

Key | Value
-|-
`cf_id` + `key` | `{version: timestamp}`

where

- `cf_id` is an identifier allowing different set/map instances to use the very same meta data column family -- all records are prepended with that set/map instance id.
- `version` is used to
  - quickly remove a set and all its items (increase the version, garbage collect in the background)
  - quickly select a range of keys (see below)

And the actual set elements are put into their own column family:

Key | Value
-|-
`key` + `version` + `valK` | `DataExt(None)`

where

- `version` comes from the meta data, and you are able to search key ranges easily: `key+version..key+(version+1)`
- data additions/removals are `O(1)` operations, as you only add/remove distinct rows, never looking at the rest of the set.
- as each value is wrapped into `DataExt`, we are able to provide per-element extra info (like expiration times)
- data contention is also avoided, as we can use `LWW` for individual items, and items occupy distinct rows.

The resultant data type performs under the same time complexity assumptions as Redis's `set` data type.

Interface:

```rust!
/// Returns all the members of the set value stored at key.
fn smembers(key) -> HashSet<val>;

/// Adds specified members to a set identified by provided key.
///
/// If set with a given key doesn't exist, it is created. Multiple items can
/// be added in a single call. Duplicate values are ignored.
fn sadd(key, values: [val1, ..., valK], ttl);

/// Removes specified member from a set identified by provided key.
///
/// If after a sequence of merges set doesn't have any elements, the key is
/// removed on the next compaction, by the compaction filter.
fn srem(key, value);

/// Returns initialized iterator for a provided `key` range.
fn scan_iterator(left, right) -> SetStorageIterator;
```

### Map

The `set` data type is actually a `map` with empty values, i.e. `set[key] = [val1, ..., valK]` can be represented using the `map[key] = {val1: none, ..., valK: none}` map. Using this fact, maps were built in a very similar manner.

So, maps are `map[key] = {field1: val1, ..., fieldK: valK}` objects, and are represented in the DB as follows:

Meta data column family:

Key | Value
-|-
`cf_id` + `key` | `{version: timestamp}`

Data column family:

Key | Value
-|-
`key` + `version` + `fieldK` | `DataExt(valK)`

### Additional information

- Currently, our keys (in Redis and MongoDB) are overly verbose -- unnecessarily taking more space than required. In the RocksDB adapter, keys are marshaled into message pack messages, without any extra strings added, i.e. `Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")` will be marshaled into bytes w/o appending/prefixing anything like `mailbox/{topic}`. We don't need those extra qualifiers (each data type is in its own column family -- table) and we use keys to scan over them in RocksDB.

## Appendix II: Database schema

Currently, we have a relatively simple schema we need to support (hence, in many places we can opt for a more straightforward solution, at least during the first iteration).

### 1. Unaddressed mailbox

| Entity | Type | Info |
|-|-|-|
| Key | `domain::Topic` | |
| Value | `mailbox::MailboxMessage[]` | List of `MailboxMessage` objects per key |
| RocksDB Value Type | `Set` | see Appendix I |

Sample topic: `Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")`

Sample message:

```json=
MailboxMessage {
    sender_id: Some(ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGQ")),
    recipient_id: Some(ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK")),
    content: Request {
        id: MessageId(0),
        jsonrpc: "2.0",
        params: Publish(
            Publish {
                topic: Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f"),
                message: "eyJ0aXRsZSI6IlRlc3Qgbm90aWZpY2F0aW9uIiwiYm9keSI6IkZpbmQgb3V0IG1vcmUiLCJpY29uIjpudWxsLCJ1cmwiOm51bGx9",
                ttl_secs: 30,
                tag: 42,
                prompt: false
            }
        )
    }
}
```

### 2. Mailbox

| Entity | Type | Info |
|-|-|-|
| Key | `domain::ClientId` + `domain::Topic` | `(recipient_id, topic)` |
| Value | `mailbox::MessageId` -> `mailbox::MailboxMessage` | list of `(message_id -> message)` pairs |
| RocksDB Value Type | `Map` | see Appendix I |

Since `Mailbox` is saved as a `map` in our database, we are able to manipulate data on a per-message-id level, e.g.

```
// pseudocode
key = (recipient_id, topic)

// Returns the value associated with `message_id` in the hash stored at `key`.
db.hget(key, message_id)
```

NB: With the `set` datatype we can also access set elements individually, but by value, not key (and as such it involves scanning the set in `O(n)` to find the corresponding element). With maps, per-message operations are all `O(1)`.

### 3. Routing Table

| Entity | Type | Info |
|-|-|-|
| Key | `domain::Topic` | |
| Value | `routing_table::RoutingTableEntry[]` | List of `RoutingTableEntry` objects per key |
| RocksDB Value Type | `Set` | see Appendix I |

Sample topic: `Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")`

Sample routing table entry:

```json=
RoutingTableEntry {
    client_id: ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK"),
    relay_addr: 127.0.0.1:8080
}
```

### 4. Client to Project mapping

| Entity | Type | Info |
|-|-|-|
| Key | `domain::ClientId` | |
| Value | `domain::ProjectId` | |
| RocksDB Value Type | `String` | see Appendix I |

Sample client: `ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK")`

Sample project: `ProjectId("af25318b8d1e15a3ca0d2612c1334eba")`

### 5. Project to Project Data mapping

| Entity | Type | Info |
|-|-|-|
| Key | `domain::ProjectId` | |
| Value | `project::ProjectDataResponse` | `Result(ProjectData, StorageError)`, we cache even errors atm |
| RocksDB Value Type | `String` | see Appendix I |

Sample project: `ProjectId("af25318b8d1e15a3ca0d2612c1334eba")`

Sample project data:

```json=
ProjectData {
    uuid: "f889b1cc-63d8-418c-ace6-e3275c675d85d",
    name: "PROJECT_NAME470",
    push_url: None,
    keys: [],
    is_enabled: true,
    allowed_origins: []
}
```

## References

- [Optimistic Replication and Resolution][1], Marc Shapiro (2009)
- [Cassandra - A Decentralized Structured Storage System][7], Avinash Lakshman, Prashant Malik (2009)
- [Jepsen: Cassandra][3], Kyle Kingsbury (2013)
- [Dynamo: Amazon’s Highly Available Key-value Store][5], Giuseppe DeCandia et al. (2007)
- [All Things Clock, Time and Order in Distributed Systems: Physical Time in Depth][2], Kousik Nath (2021)
- [The trouble with timestamps][4], Kyle Kingsbury (2013)
- [Time, Clocks, and the Ordering of Events in a Distributed System][8], Leslie Lamport (1978)
- [Dotted Version Vectors: Logical Clocks for Optimistic Replication][6], Nuno Preguiça et al. (2010)

[1]: <https://hal.inria.fr/hal-01248202/file/optimistic-replication-Encyclopedia-DB-systems-2009.pdf> "Optimistic replication and resolution"
[2]: <https://medium.com/geekculture/all-things-clock-time-and-order-in-distributed-systems-physical-time-in-depth-3c0a4389a838> "All Things Clock, Time and Order in Distributed Systems: Physical Time in Depth"
[3]: <https://aphyr.com/posts/294-call-me-maybe-cassandra> "Jepsen: Cassandra"
[4]: <https://aphyr.com/posts/299-the-trouble-with-timestamps> "The trouble with timestamps"
[5]: <https://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf> "Dynamo: Amazon’s Highly Available Key-value Store"
[6]: <https://arxiv.org/abs/1011.5808> "Dotted Version Vectors: Logical Clocks for Optimistic Replication"
[7]: <https://www.cs.cornell.edu/projects/ladis2009/papers/lakshman-ladis2009.pdf> "Cassandra - A Decentralized Structured Storage System"
[8]: <https://amturing.acm.org/p558-lamport.pdf> "Time, Clocks, and the Ordering of Events in a Distributed System"