# Optimistic Replication Roadmap
## 1. Intent
Some of this document's assumptions are more general than the current project's scope requires.
This is intentional: this roadmap outlines both the immediate goals and a path for further
optimizations/development.
Whenever possible I quickly switch focus to the problem at hand and opt for a more specific solution
(as opposed to a more general one).
Treat this document as a "mind dump" of my research on what we should be paying attention
to, not necessarily in the near future, but somewhere along the road.
:question: Whenever you see such a mark, please consider providing feedback -- those are areas that
need peer input/review.
## 2. Optimistic replication fundamentals
### 2.0 Definitions
Replication
: Placing copies of data on different nodes to improve availability and fault tolerance.
Optimistic replication
: Allows nodes to read and update local copies of data at any time. Each update is *tentative*
because it might conflict with another remote update.
CRDTs
: Conflict-free replicated data types. Basically, CRDTs provide a way to merge incoming updates without
resorting to conflict resolution by clients -- all merges are automatic.
Siblings
: Different versions of the same data, due to concurrent updates.
### 2.1 General workflow
The main reasons to use optimistic replication are high availability and low latency, which are an
absolute requirement for the relay nodes. With pessimistic or eager replication there must be some kind of synchronization on data updates, so possible conflicts
are resolved *a priori*. With OR, conflicts are resolved after the fact, in the background.
The following general pattern exists:
- Any node can initiate a reading or writing transaction on a local replica of the data.
- When the local transaction completes successfully, the node is responsible for propagating this update
to other nodes, before the transaction is considered successful system-wide.
- Remote nodes replay, or apply, the propagated updates locally and respond with a status.
- After receiving enough confirmations, the initiator node can consider the transaction committed.
This workflow assumes that updates to the same key can occur concurrently, and a common order is
established at some later stage, *a posteriori*.
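To make the workflow concrete, here is a minimal sketch of the two message shapes it implies and of a replica replaying a propagated update; all names (`PropagatedUpdate`, `ReplayStatus`, `replay`) are illustrative and not tied to our actual crates:
```rust!
use std::collections::HashMap;

/// What the initiating node propagates after its local transaction succeeds.
struct PropagatedUpdate {
    key: String,
    value: Vec<u8>,
    /// Used later for ordering/merging concurrent updates
    /// (a timestamp or a vector version, see section 4).
    updated: u64,
}

/// What a remote replica answers after replaying the update locally.
enum ReplayStatus {
    Applied,
    Rejected(String),
}

/// A remote replica replaying a propagated update against its local store.
fn replay(store: &mut HashMap<String, (Vec<u8>, u64)>, update: PropagatedUpdate) -> ReplayStatus {
    store.insert(update.key, (update.value, update.updated));
    ReplayStatus::Applied
}

fn main() {
    let mut store = HashMap::new();
    let status = replay(
        &mut store,
        PropagatedUpdate { key: "topic".into(), value: b"msg".to_vec(), updated: 1 },
    );
    // The initiator counts such confirmations and considers the transaction
    // committed once it has received "enough" of them (quantified in section 3).
    assert!(matches!(status, ReplayStatus::Applied));
}
```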
### 2.2 Transmitting updates
There are two possible ways to propagate updates:
- **State-based**, where the after-value of a transaction is transmitted (in our case, since we do not
use transactions and work on whole values, it is just the whole value struct).
- **Operation-based**, where update operation itself is transmitted.
The state-based approach is what is currently assumed by our code.
**Pros/cons:**
- state-based replaying has the following advantages/disadvantages:
  - <sub>:heavy_plus_sign:</sub> it is deterministic
  - <sub>:heavy_plus_sign:</sub> it can be more efficient (as applying the update is just writing into the database)
  - <sub>:heavy_minus_sign:</sub> if objects are relatively big, then we are moving around lots of bytes
- the operation-based approach has the following advantages/disadvantages:
  - <sub>:heavy_minus_sign:</sub> it is not deterministic without operation rewrites (see the example below)
  - <sub>:heavy_plus_sign:</sub> it is expected to produce a smaller footprint
  - <sub>:heavy_plus_sign:</sub> normally, operations are more likely to commute, so merging them is easier
Example of when an operation-based update is non-deterministic without operation transformation/rewriting:
- consider the shared text "abc"
- node1 issues the operation `insert("X", 1)` (resulting in "aXbc")
- node2 issues the operation `delete(2)` (resulting in "ab")
- when node2 replays `insert("X", 1)` it gets "aXb"
- when node1 replays `delete(2)` it gets "aXc", so without rewriting the operation to `delete(3)` the replicas do not converge deterministically
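The same divergence, as a small runnable sketch (0-based indices; the helper functions are just for illustration):
```rust!
fn insert(s: &mut String, ch: char, idx: usize) {
    s.insert(idx, ch);
}

fn delete(s: &mut String, idx: usize) {
    s.remove(idx);
}

fn main() {
    // Node1 applies its own insert, then replays node2's delete verbatim.
    let mut node1 = String::from("abc");
    insert(&mut node1, 'X', 1); // "aXbc"
    delete(&mut node1, 2);      // "aXc" -- deleted 'b' instead of the intended 'c'

    // Node2 applies its own delete, then replays node1's insert verbatim.
    let mut node2 = String::from("abc");
    delete(&mut node2, 2);      // "ab"
    insert(&mut node2, 'X', 1); // "aXb"

    // Without rewriting node2's operation to `delete(3)` on node1, the replicas diverge.
    assert_ne!(node1, node2);
    println!("node1 = {node1}, node2 = {node2}");
}
```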
:::warning
:question: At the moment, we do not update persisted objects on the granularity of individual fields,
always writing whole objects, instead. Will this requirement remain? If it does, then we can proceed
with a simpler design when working with concurrent updates -- as our data comes close to being
immutable (you do not update the objects, you always add -- even deletions can be structured as writes).
*The answer to this question has huge implications: it is easy to work with whole immutable objects
without strong causality primitives such as vector versions, and it is an entirely different story if we
allow in-object modifications (then we need a stronger approach to causality detection).*
:::
:::info
**:fire: Suggested approach:**
- Since we are operating on whole values (we never update individual fields of value structs),
the operation-based approach doesn't really apply to us.
- When bootstrapping we definitely want to provide key-ranged *values* which are to be applied to a
new node. So, state-based, again.
- Even when propagating individual updates (like adding a new element to the routing table), we will still
need to propagate whole objects, as this is exactly what the local node operates on.
**Action items:**
- [ ] Enable networking within storage (so that storage can initiate updates propagation).
- [ ] Propagating updates to replication peers.
- [ ] Propagate additions
- [ ] Propagate removals
- [ ] Applying updates and ACKing back.
:::
### 2.3 Conflicts and partial ordering
If updates are not concurrent, they can be ordered (see [section on ordering events](#4-Ordering-concurrent-events)), and
applied one after another. There can still be a merging process, as updates might change different
fields of a persisted structure -- but if we have an order of events, we can do this deterministically.
For a pair of concurrent updates we can have one of the following:
- Transactions are **commuting** -- order is immaterial and you can apply them
in either `T1, T2` or `T2, T1` order.
- Transactions are **non-commuting** -- order is important, and while both `T1, T2` and `T2, T1` *can*
make sense, the semantics are different (consider `T1 := transfer balance` and `T2 := deposit $100`,
either order can make sense, but the meaning is quite different -- so all nodes should apply them in some
common order).
- Transactions are **antagonistic** -- either order of transactions will violate some invariant,
and you have to drop either one, or both.
When updates are mutually non-commuting or antagonistic, we have a conflict. Reconciliation involves
aborting some updates (overwriting them) and combining non-conflicting updates (see details in
[Conflict Resolution section](#5-Conflict-resolution)).
**Note:** Suggested approach and action items are discussed in upcoming sections.
### 2.4 Conflict resolvers
There are two kinds of possible resolvers:
- **Syntactic resolver** can merge the updates automatically. For example, if CRDTs are used to express
your data, then all updates can be merged automatically. Alternatively, a simple LWW
(last writer wins) policy can be used, where an object with a higher timestamp automatically overwrites
previous updates.
- **Semantic resolver** is application-specific code that knows how to deal with conflicts. This requires
the system to keep multiple versions of data, and then some process to merge that data.
Some illustrative implementations:
- Riak has quite extensive support for sibling resolution (they started with vector clocks and migrated
to dotted version vectors to be able to track concurrent updates). They expose a resolver
interface allowing end-users to plug in code for conflict resolution.
They also eventually moved to CRDTs (registers, sets and maps are supported), so that users can get
syntactic resolution and not worry about vector clocks.
- Cassandra from the very beginning ditched the idea of using vector clocks for tracking the order of
concurrent operations (the reason being lower latency) and instead decided to split any data struct into
fields (each data instance is a row, each of its fields is a column). Each column can then be updated
separately, and timestamps with a simple LWW policy are used on a per-column level.
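For illustration, a minimal syntactic LWW resolver over timestamped values could look like the sketch below; the `Versioned` wrapper and its fields are assumptions, loosely mirroring the `updated` timestamp our `DataExt` wrapper carries (see Appendix I):
```rust!
#[derive(Clone, Debug)]
struct Versioned<T> {
    value: T,
    /// Wall-clock timestamp of the last update (illustrative stand-in for `DataExt::updated`).
    updated: u64,
}

/// Last-writer-wins: the sibling with the highest timestamp survives,
/// all other siblings are discarded automatically.
fn resolve_lww<T: Clone>(siblings: &[Versioned<T>]) -> Option<Versioned<T>> {
    siblings.iter().max_by_key(|v| v.updated).cloned()
}

fn main() {
    let siblings = vec![
        Versioned { value: "v1", updated: 10 },
        Versioned { value: "v2", updated: 42 },
    ];
    let winner = resolve_lww(&siblings).unwrap();
    assert_eq!(winner.value, "v2");
}
```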
:::info
**:fire: Suggested approach:**
- It seems that we can rely on syntactic resolution for all our data types.
- Before the `set` and `map` data types were implemented for our database, we (being our own clients)
actually had semantic resolvers in the form of RocksDB merge operators. Right now, data types are structured
in such a way that no extra merging is necessary. If, however, we need any additional merging logic,
we can always re-introduce merge operators.
:::
## 3. Propagation mechanism and eventual consistency
### 3.0 Definitions
Replication factor
: Number of copies to be held within a system, for any instance of data.
Replica set or replication peers
: Nodes that share responsibility for holding a value identified by a given key. The size of a replica set
is equal to the replication factor.
Consistency level
: Number of replication peers that must acknowledge the read/write operation for it to be considered
successful.
Coordinator node
: Any node that receives a write/read request and coordinates its fulfillment.
Preference list
: For any given key, known list of peers that can be selected for an update event propagation. Each
update event requires a replica factor number of such peers for an operation to be considered successful.
Sloppy quorum
: While a quorum implies `50% + 1` nodes, a sloppy quorum uses the very same number of nodes, but drawn from the currently *reachable, healthy* ones.
### 3.1 Coordinator nodes and replica sets
The relay network is non-hierarchical, so any node can respond to a request. It is possible to place
some kind of load balancer in front of the relay nodes that knows how to select a node
responsible for the incoming key. That way the coordinator node will write locally, as -- in that case --
it is a part of the replica set. However, this is not strictly required -- all writes/reads by a coordinator
node can be remote.
The read/write operations are to be applied against a subset of a *preference list* -- nodes that are
considered possible targets for a key range for a given request.
The preference list should be longer than the replication factor, as some nodes may be unavailable.
In [Dynamo-based][5] storage systems, we have a quorum-like consistency protocol:
- With replication factor of `N`, we can configure two parameters `R` and `W`. The former is the minimum
number of nodes that must respond for a read operation to be considered successful, and the latter is
the same for writes.
- For quorum-like consistency, we have to make sure that `R + W > N`. Note that individually both parameters
can be lower than the replication factor, allowing for better latency.
For instance, for a replication factor of `N=3`, good values for quorum-like reads and writes
are `R=2` and `W=2`, respectively. If we set `R=1, W=1`, then we have a consistency of `ONE`
(in Cassandra's terms), and setting `R=W=N` gives us consistency level of `ALL`.
Note that the latency of read/write operations will be bounded by the latency of the slowest node selected
from the preference list. That's why we normally want `R` and `W` to be strictly lower than `N`.
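A minimal sketch of how these parameters and their sanity checks could be expressed (names are illustrative, not our actual configuration types):
```rust!
#[derive(Debug)]
struct QuorumConfig {
    /// Replication factor.
    n: usize,
    /// Minimum number of replicas that must answer a read.
    r: usize,
    /// Minimum number of replicas that must acknowledge a write.
    w: usize,
}

impl QuorumConfig {
    fn validate(&self) -> Result<(), String> {
        if self.r > self.n || self.w > self.n {
            return Err("R and W cannot exceed the replication factor".into());
        }
        // Quorum-like consistency: every read set intersects every write set.
        if self.r + self.w <= self.n {
            return Err("R + W must be strictly greater than N".into());
        }
        Ok(())
    }
}

fn main() {
    // The suggested (N, R, W) = (3, 2, 2) setup forms a quorum...
    assert!(QuorumConfig { n: 3, r: 2, w: 2 }.validate().is_ok());
    // ...while R=1, W=1 is consistency `ONE` and does not.
    assert!(QuorumConfig { n: 3, r: 1, w: 1 }.validate().is_err());
}
```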
:::info
**:fire: Suggested approach:**
- Opt for quorum-like consistency with `(N, R, W) = (3, 2, 2)`.
- Since we are using consistent hashing, it is not hard to create a load-balancing service that
forwards requests based on incoming keys, so that the coordinator node is part of the replica set, and one
write is local to that coordinator node.
**Action items:**
- [ ] Preference list formation (based on consistent hashing).
- [ ] Making sure that at least `R` or `W` (depending on the operation) healthy nodes are selectable
from the preference list.
- [ ] Make sure that `(N, R, W)` are configurable parameters.
- [ ] System-wide successful writes: on writes notify `N`, but wait for `W`.
- [ ] System-wide successful reads: on reads ping `N`, but wait for `R` responses.
- [ ] Any node has capability of being coordinator for any read/write.
**Action items (future):**
- [ ] Load balancer node (which is aware of consistency hashing mechanisms).
:::
### 3.2 Sloppy Quorums and Hinted Handoff
As mentioned before, the preference list should be bigger than the replication factor, so that failure
of some nodes (or a network partition) can be tolerated. Practically, it means that our system should
be able to select top `N` **healthy** nodes from the preference list, quickly. This means that, on some
operations, data will be pushed not to nodes assigned to a key range by the consistent hashing mechanism,
but to some other -- backup -- nodes.
Whenever the coordinator node pushes data to such a backup node, that node will not only store the data,
but will, in addition, store a hint on where this data should have gone originally.
Alternatively, this hint information can be provided by the coordinator node.
Once a node that served as a backup node discovers that the hinted peer has recovered, those backup updates
are pushed to it (all in the background). See the [Hinted Handoff section](#37-Hinted-Handoff) for details.
This hinting mechanism allows the storage system to be always writable, even in the presence of failures or partitions.
:::danger
For the first implementation, we will not use sloppy quorums or hinted handoff. If we cannot select enough
participants from the replica set, an error is returned to the user (and they can retry).
:::
### 3.3 Writes
For a replication factor of `N`, we need to make sure that updates eventually propagate to all `N`
replica peers. However, for a lower latency, we want to minimize the number of replicas being written
during the request execution process. So, with some `W < N`, for a write operation to be considered
successful, and assuming that the coordinator node is part of the replica set, the coordinator node:
- writes the data locally and assigns a timestamp or vector version to it.
- notifies all `N-1` *healthy* peers, but *actively* waits for only `W-1` of them: once they ACK, the write is considered
successful, and the rest of the peers' responses are handled in the background (assuming we want to wait
on them at all, e.g. for logging purposes).
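A minimal, synchronous sketch of this "notify `N-1`, actively wait for `W-1`" rule; the peer transport is abstracted away as an iterator of ACK/NACK results, and all names are illustrative:
```rust!
struct WriteOutcome {
    acked: usize,
}

/// `peer_acks` yields the ACK (true) / NACK (false) results as they arrive
/// from the N-1 notified peers; we only *wait* for the first W-1 positive ones.
fn coordinate_write(peer_acks: impl IntoIterator<Item = bool>, w: usize) -> Result<WriteOutcome, ()> {
    // The coordinator's own local write counts as the first ACK.
    let mut acked = 1;
    for ack in peer_acks {
        if acked >= w {
            break; // quorum reached; remaining responses are handled in the background
        }
        if ack {
            acked += 1;
        }
    }
    if acked >= w {
        Ok(WriteOutcome { acked })
    } else {
        Err(()) // not enough ACKs: time out / roll back (see the Rollbacks section)
    }
}

fn main() {
    // N = 3, W = 2: one remote ACK on top of the local write is enough.
    let outcome = coordinate_write([true, false], 2).unwrap();
    println!("write acknowledged by {} replicas", outcome.acked);

    // Not enough healthy peers ACKed: the operation fails.
    assert!(coordinate_write([false, false], 2).is_err());
}
```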
:::info
**:fire: Suggested approach:**
- Operations can fail (massive portion of the cluster is down, and not enough peers for `W` writes). So,
there should be some time limit imposed on an operation. Generally, it is a wise practice to cap all
networking operations with some (probably, generous) time limit. There should be an exit strategy for
unpredictable events.
**Action items:**
Most of the heavy lifting is already covered in the [Transmitting updates](#22-Transmitting-updates) section.
- [ ] Integrate `relay_rocks` and `rely_irn`, so that database adapters can communicate the updates
to the rest of the replica set.
- [ ] Update `relay_rocks::types::*` (`string`, `set`, `map`) to support the push-to-`W-1`-replicas protocol.
- [ ] Add time limit after which an operation times out.
- [ ] Build tests that cover:
- [ ] Successful write (`W` propagation).
- [ ] Full `N` propagation.
- [ ] Rollback of local transaction if not enough peers responded.
:::
### 3.4 Reads
Similarly to writes, on reads, a coordinator node:
- contacts the `N-1` highest-ranked reachable peers from the preference list (assuming the coordinator is
part of the replica set and has its own view of the data as well).
- actively waits for `R-1` responses, resolves possible conflicts, and produces the result for the client.
:::info
**Action items:**
- [ ] Integrate `relay_rocks` and `rely_irn`, so that database adapters can pull the data views
from the rest of the replica set.
- [ ] Update `relay_rocks::types::*` (`string`, `set`, `map`) to support the pull-from-`R-1`-replicas protocol.
- [ ] Add time limit after which an operation times out.
- [ ] Build tests that cover:
- [ ] Non-conflicting read from `R` peers.
:::
#### 3.4.1 Read repair
In highly concurrent systems, when divergent updates contend for some value, we have to answer two questions:
- Who will be responsible for conflict resolution?
- When will conflicts be resolved?
In strongly consistent systems we resolve conflicts on writes. With OR, we trade some consistency
guarantees for better availability and lower latency, so conflicts are resolved at a later stage, when
reads are requested (there are also additional places where diverging data versions can be reconciled,
like on import/export or compaction).
The idea is simple:
- When gathering responses from `R` peers, we check whether there are different versions of the data.
- To understand whether two versions are different, we can go as simple as using `updated` timestamps
or version vectors, or some kind of digest (like a hash, which can then be used in Merkle trees -- to
see whether key *ranges* are diverging).
- If divergent versions are not causally related (that is, they cannot be deterministically ordered), we
need some kind of reconciliation scheme, either syntactic or semantic (in the latter case we can present
the client with the different versions and allow it to provide resolvers, as Riak does).
- Once the conflict is resolved, all the nodes need this info propagated: in the background, they are sent
a resolved version of the data, so on the next read request no conflicting version will be reported.
Alternatively, no propagation occurs (just a local update, fixing the conflict locally), and each node from
the replica set has its own take on merging and updating locally.
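As a sketch of the simplest version of this scheme, divergence among the `R` gathered responses can be detected by comparing the `updated` timestamps (digests or version vectors could be swapped in later); all type names here are illustrative:
```rust!
type PeerId = u64;

#[derive(Clone, PartialEq, Debug)]
struct Reply {
    value: Vec<u8>,
    updated: u64,
}

/// Returns the freshest reply plus the peers whose replies diverge from it
/// and therefore need a repair (locally, or pushed in the background).
fn detect_divergence(replies: &[(PeerId, Reply)]) -> Option<(Reply, Vec<PeerId>)> {
    let freshest = replies.iter().map(|(_, r)| r).max_by_key(|r| r.updated)?.clone();
    let stale = replies
        .iter()
        .filter(|(_, r)| *r != freshest)
        .map(|(peer, _)| *peer)
        .collect();
    Some((freshest, stale))
}

fn main() {
    let replies = vec![
        (1, Reply { value: b"new".to_vec(), updated: 20 }),
        (2, Reply { value: b"old".to_vec(), updated: 10 }),
    ];
    let (winner, stale) = detect_divergence(&replies).unwrap();
    assert_eq!(winner.updated, 20);
    assert_eq!(stale, vec![2]); // peer 2 holds an outdated version and needs repair
}
```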
:::info
**:fire: Suggested Approach:**
- In our system, it seems to be possible to merge the data syntactically, without using any semantic
conflict resolution.
- We should opt for read repair, making sure that the more data is read, the less conflicted it becomes.
That way hot paths converge quicker, which is a nice way of deciding which keys should be
resolved first.
- We should probably not propagate the merged versions, allowing each peer to do a local conflict repair.
This implies fewer messages travelling around, and a bit more work for each individual peer.
**Action items:**
- [ ] Produce "diff-like" function that will conclude whether incoming reads are in conflict.
- [x] Make sure that for all storage types (unaddressed and addressed mailbox, routing table,
client-project and project-project_data mapping) we can merge syntactically.
- [ ] Build tests that cover:
- [ ] Conflicting read from `R` peers.
- [ ] Conflict resolution, and local update with a new calculated value.
**Action items (future):**
- [ ] Implement propagation of updated version to conflicting peers.
:::
### 3.5 More on removals
When data is removed, those removals should also propagate to peers. We have two scenarios where this
is important:
- Exporting/importing key ranges: if the source node has a data item removed, on export this information
should travel with the import, and update the destination accordingly.
- Individual operations: when an item is to be removed on one node, all its peers in the replica set
should be aware of this delete.
In order to support both use-cases, and to leverage RocksDB's append-only approach, delete
operations should be treated as writes:
- When a node receives a delete request, it updates the data instance with a marker value -- a so-called
tombstone -- which means that the record is considered deleted.
- The delete is propagated to all `N-1` peers and, upon ACKs from `W-1` peers, is considered successful.
- In order to provide even stronger consistency guarantees, tombstoned items will not be purged immediately,
but will instead be marked with some expiration time; once it elapses, those items can be safely
garbage collected in the background (on compaction, for example).
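A minimal sketch of the tombstone idea and of the two kinds of reads it implies (type and field names are illustrative, not our actual `DataExt` layout):
```rust!
enum Record<T> {
    Live(T),
    /// The record is logically deleted; `purge_at` tells the garbage
    /// collector when it may be physically removed.
    Tombstone { purge_at: u64 },
}

/// Client-facing reads never surface tombstones...
fn client_read<T>(record: &Record<T>) -> Option<&T> {
    match record {
        Record::Live(value) => Some(value),
        Record::Tombstone { .. } => None,
    }
}

/// ...while internal reads (replica merging, import/export scans) must see
/// them, so the deletion is included when computing the merged view.
fn internal_read<T>(record: &Record<T>) -> &Record<T> {
    record
}

fn main() {
    let live = Record::Live("hello");
    assert_eq!(client_read(&live), Some(&"hello"));

    let deleted: Record<&str> = Record::Tombstone { purge_at: 1_700_000_000 };
    assert!(client_read(&deleted).is_none());
    assert!(matches!(internal_read(&deleted), Record::Tombstone { .. }));
}
```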
:::info
**:fire: Suggested Approach:**
- Mark items as deleted, do not physically remove them. Items are removed only by the garbage collector.
- Separate reads into two types:
  - Clients should not see deleted items. This means that individual reads as well as range scans do
  not produce deleted items.
  - Internal reads (when the coordinator gathers views from peers) and import/export scans should
  produce those tombstones, so that this information is included when calculating the merged view of a
  data instance.
**Action items:**
- [ ] Introduce tombstones, instead of physical deletes
- [ ] Allow propagating individual deletes to peers
- [ ] Make sure that deleted items can have an expiration time that is separate from the item's own
expiration time. Moreover, when an item expires, do not immediately remove it on compaction, but turn
the item into a tombstone (so that internal reads and scans can get this info) and let it follow the same
garbage collection protocol.
- [ ] Implement garbage collection of deleted items, which should be removed once their deletion expiry
time elapses.
- [ ] Test cases:
- [ ] Deletion of an item without expiration time.
- [ ] Deletion of an item with expiration time:
- [ ] Item is expired (so is marked for deletion).
- [ ] Item is not expired (delete is requested by some client).
- [ ] Propagation:
- [ ] Items are invisible for client reads.
- [ ] Items are visible for internal reads.
- [ ] Garbage collection:
- [ ] Items are unaffected if the delete expiration hasn't elapsed.
- [ ] Items are physically removed on delete expiration.
:::
### 3.6 Handling range synchronization
Nodes should be able to compare the data *en masse*, on some key range:
- A node can be just joining the cluster, or have fallen behind due to a partition. As a node hosts multiple key ranges
(vnodes), it needs to pull updated information from different peers.
- To optimize the process, there should be a mechanism for comparing data between nodes on a per key *range*
level.
- One way to do it is to use Merkle trees, which provide a hash root for a key range; on divergent
hash roots a node immediately knows that it needs to pull the data from the peer.
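As a sketch of the "compare digests, then narrow down" idea, the snippet below uses a flat per-sub-range digest instead of a full Merkle tree; a real implementation would hash ranges hierarchically and keep the tree up to date incrementally (all names are illustrative):
```rust!
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Digest of one key sub-range (a stand-in for a Merkle tree branch hash).
fn digest(subrange: &[(&str, &str)]) -> u64 {
    let mut hasher = DefaultHasher::new();
    for (key, value) in subrange {
        key.hash(&mut hasher);
        value.hash(&mut hasher);
    }
    hasher.finish()
}

/// Returns the indices of sub-ranges whose digests differ from the peer's
/// and therefore need to be pulled.
fn diverging_subranges(local: &[Vec<(&str, &str)>], remote_digests: &[u64]) -> Vec<usize> {
    let mut diverging = Vec::new();
    for (idx, (subrange, remote)) in local.iter().zip(remote_digests).enumerate() {
        if digest(subrange) != *remote {
            diverging.push(idx);
        }
    }
    diverging
}

fn main() {
    let local = vec![vec![("a", "1"), ("b", "2")], vec![("c", "3")]];
    // Pretend the peer diverges only on the second sub-range.
    let remote = vec![digest(&local[0]), digest(&[("c", "3-stale")])];
    assert_eq!(diverging_subranges(&local, &remote), vec![1]);
}
```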
:::info
**:fire: Suggested Approach:**
- During the first phase, just pull key ranges in a non-optimal, all-ranges-included way.
- As this might involve a long catch-up process for a node that has been down -- due to maintenance,
for instance -- we need to use hash roots and pull only those branches that are diverging.
- Merkle trees allow us to dissect the key range into smaller and smaller regions, until we know
the diverging branch of the tree, and then only the sub-range for that diverging region is pulled.
- Merkle trees are costly to build and maintain, so whenever this optimization is tackled, careful
consideration of how to keep the tree up to date must be made ([Dynamo Paper][5] has some pointers).
**Action items:**
- [ ] Exporting/importing of a key range of data between nodes.
- [x] Storage layer
- [ ] Network layer
**Action items (future):**
- [ ] Implement Merkle roots for data range.
- [ ] Allow dissecting a key range into the smaller sub-range that is responsible for the divergence.
- [ ] Allow importing/exporting diverging sub-ranges only.
:::
### 3.7 Hinted Handoff
Sloppy quorums are formed using *reachable, healthy* nodes, instead of just the replica set nodes
that are responsible for a key range. Therefore, we need a protocol for handing those items off
to the nodes that should have been selected, had they been healthy.
The good thing about consistent hashing, among many others, is that we can form a preference list using
the ordinary "go clockwise" approach. That way, if a node is down permanently, the nodes we are writing to will be the new
nodes responsible for the key anyway! In order to support these semantics, we need to know when a node
is considered to be failing temporarily (so it is subject to catching up, once back online) and when the failure is deemed permanent (and the
node needs to rejoin the cluster via bootstrapping).
Thus, backup nodes should be capable of the following:
- In the background, they should check the cluster status; once they find that a node for which
they have hinted data is online, they start pushing the data towards it.
- The handoff data should have an expiry time which is a function of the time after which a failing
node is considered "failed permanently". When this deadline elapses, enough time has passed for the
original target to be considered permanently failed, and the data should simply be assumed as owned
(the current node is now responsible for the updated key range).
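A minimal sketch of the hint record and of the background decision a backup node would make (names are illustrative):
```rust!
type NodeId = u64;

/// Hint stored alongside handed-off data.
struct Hint {
    /// The replica that should have received this write originally.
    intended_owner: NodeId,
    /// After this deadline the intended owner is deemed permanently failed
    /// and the current node takes ownership of the key range.
    handoff_expires_at: u64,
}

enum HandoffAction {
    /// Intended owner is back online: push the hinted data to it.
    PushTo(NodeId),
    /// Deadline elapsed: keep the data as our own.
    TakeOwnership,
    /// Keep waiting and re-check the cluster status later.
    Wait,
}

fn decide(hint: &Hint, owner_is_online: bool, now: u64) -> HandoffAction {
    if owner_is_online {
        HandoffAction::PushTo(hint.intended_owner)
    } else if now >= hint.handoff_expires_at {
        HandoffAction::TakeOwnership
    } else {
        HandoffAction::Wait
    }
}

fn main() {
    let hint = Hint { intended_owner: 7, handoff_expires_at: 100 };
    assert!(matches!(decide(&hint, true, 50), HandoffAction::PushTo(7)));
    assert!(matches!(decide(&hint, false, 50), HandoffAction::Wait));
    assert!(matches!(decide(&hint, false, 150), HandoffAction::TakeOwnership));
}
```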
:::info
**:fire: Suggested Approach:**
- Cassandra stores the hints in files on the coordinator node, which, in my opinion, gives too much
responsibility to a node that may itself be participating in the replica set, and thus impedes durability (if
the coordinator node goes down, so do its own local data and the hints).
- I believe that the data should instead be pushed down the key ring to nodes that are possible candidates for
the key range if the failure turns out to be permanent.
**Action Items:**
- [ ] Double-check that we have a preference list which is smart enough to produce only *healthy* peers.
- [ ] Allow the coordinator to push hinted items (or make the destination node capable of understanding that
the data is not originally intended for it, but is to be handed off to another peer).
- [ ] Mechanism for membership and failure detection, with ability to deem nodes as failed permanently.
- [ ] Propagate hinted data to node returning to the cluster.
- [ ] Ability to take ownership of the data, if the original target is down permanently.
:::
### 3.8 Rollbacks
Consider the situation where a write operation is requested, a number of peers have already responded, and
while waiting for `W` peers to respond the coordinator node fails (for whatever reason). Or, for whatever
reason, the coordinator cannot gather `W` confirmations and a timeout error has to be returned to the user.
The operation is not complete, but the value is already partially replicated. Does this pose a problem?
Not really:
- Since there's no quorum, partial writes will not result in values being reported by reads.
- The only issue is that our database now has values that waste some space and need to be garbage collected.
:::info
**:fire: Suggested approach:**
- On partial reads, repair the local version (if locally we have data but `R` peers report that there
is no data for the key).
- If it is not the coordinator node that fails, then after returning a timeout error to the client, deletes can
be initiated (in the background) on the nodes that ACKed the transaction.
**Action items:**
- [ ] Test cases:
- [ ] Simulate coordinator node failure
- [ ] Simulate less than required `W` reporting
:::
## 4. Ordering concurrent events
When producing partial ordering we can essentially rely on two approaches:
- Using physical clocks and timestamps.
- Using logical clocks and some kind of counters.
If we use physical clocks we will have to make sure that time on the nodes is synced, which is not an easy
task. If we accept some level of time drift, then it is hard to actually tell whether updates
were concurrent or can be ordered (see the example below). Cassandra's decision not to use any kind of
vector clocks resulted in *allegedly* better performance (since logical clocks generally require a read
-- of the current version of the object -- before a write), but it also resulted in the additional complexity
of making sure that the chance of data access contention and, in effect, data loss is diminished
(by structuring their data as columns).
At the moment, we are NOT destructuring the persisted data into fields (to mutate them individually).
Moreover, our data usage patterns allow us to treat values as immutable (we never update internal state
of the values, values can be dropped or replaced), so we **can** rely on timestamps.
However, a word of caution: the moment we decide that we want to mutate some internal field within our saved values,
timestamps will be inadequate for durability guarantees (data loss :scream:), because timestamps are notoriously
bad indicators for concurrent events (time drifts, no way to tell whether events have a causality relation, etc.).
Cassandra structured their data by splitting fields into separate columns, and Riak uses dotted version vectors
to track causality -- i.e. we will need extra mechanisms, should we require interior mutability.
So, <sup>[2], [3], [4]</sup>:
- Relying on the wall clock is not that easy (see the NTP section in the [All things clock][2] article), without
some kind of coordinator (like ZooKeeper).
- Preventing data loss in a highly concurrent environment using timestamps is really tough (check out
the simple set update example in [Jepsen: Cassandra][3]).
An illustrative example of why timestamps cannot help with determining whether updates are concurrent
or not:
```sequence
Node1->Node2: 1. sadd("topic", "entry1")
Note left of Node2: On Node2: ["entry1"]
Node1-->Node3: 2. propagate sadd("topic", "entry1")
Note left of Node3: On Node3: ["entry1"]
Node1->Node2: 3. sadd("topic", "entry2")
Note left of Node2: On Node2: ["entry1", "entry2"]
Node1-->Node3: 4. propagate sadd("topic", "entry2")
Note left of Node3: On Node3: ["entry1", "entry2"]
Node3->Node2: 5. sadd("topic", "entry3")
```
At the end of the interaction, `Node3` knows about both the `entry1` and `entry2` updates, but with simple
timestamps there's simply no way to express this! Indeed, if we swap the order of operations `4` and `5`,
the relative timestamps will be the same, but this time `Node3` will have an update concurrent with `Node1`'s.
With version vectors (and the more space-optimized dotted version vectors), we can distinguish between the
two situations, and apply an `add` operation on `Node2` in the first case, and a `merge` in the second.
:+1: The good news is that our data schema is simple right now, so in both cases we just use
`add` and it works fine (data is deduplicated, an LWW policy is used -- without critical data loss).
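For illustration, here is a minimal sketch of comparing plain version vectors to decide whether two updates are ordered or concurrent; dotted version vectors make the encoding more compact, but the comparison idea is the same (names are illustrative):
```rust!
use std::collections::HashMap;

type NodeId = &'static str;
type VersionVector = HashMap<NodeId, u64>;

#[derive(Debug, PartialEq)]
enum Ordering {
    Before,     // a happened before b
    After,      // b happened before a
    Concurrent, // neither dominates: a merge is required
    Equal,
}

fn compare(a: &VersionVector, b: &VersionVector) -> Ordering {
    // `a` dominates if it has seen at least as much as `b` from every node, and vice versa.
    let a_dominates = b.iter().all(|(node, &n)| a.get(node).copied().unwrap_or(0) >= n);
    let b_dominates = a.iter().all(|(node, &n)| b.get(node).copied().unwrap_or(0) >= n);
    match (a_dominates, b_dominates) {
        (true, true) => Ordering::Equal,
        (true, false) => Ordering::After,
        (false, true) => Ordering::Before,
        (false, false) => Ordering::Concurrent,
    }
}

fn main() {
    // Node3 saw both of Node1's updates before issuing its own: the updates are ordered.
    let node1 = VersionVector::from([("node1", 2)]);
    let node3 = VersionVector::from([("node1", 2), ("node3", 1)]);
    assert_eq!(compare(&node1, &node3), Ordering::Before);

    // Node3 issued its update having seen only the first one: the updates are concurrent.
    let node3_early = VersionVector::from([("node1", 1), ("node3", 1)]);
    assert_eq!(compare(&node1, &node3_early), Ordering::Concurrent);
}
```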
:::info
**:fire: Suggested approach:**
- Rely on timestamps for the moment.
- Rely on version vectors *and timestamps* for ordering events if we decide to allow mutating
object internals:
  - Avoids implementing the pretty complex logic of managing columns.
  - No need to make sure that time on each node is perfectly synchronized --
  that is, we depend on logical instead of physical clocks.
  - Since each data instance still has a timestamp, it will work as a tie-breaker
  (there can still be near-millisecond antagonistic updates, in theory,
  but hardly in practice given what we are doing).
- Try to continue structuring our database types so that they work as CRDTs, without
semantic reconciliation required.
Overall, the proposed approach is quite similar to what Cassandra does, but without over-complicating
the storage engine with column management (which, I believe, is overkill for our use-case).
**Action items:**
- [ ] When importing/exporting, take the `updated` timestamp into account
- [ ] Double check that conflict resolution information from the next section doesn't introduce any
unwanted data loss.
- [ ] Determine time sync approach (average of several NTP servers?).
:::
## 5. Conflict resolution
:question: Double-check that what is claimed here really makes sense.
:::warning
**State-based CRDTs**, aka convergent replicated data types (CvRDTs), assume that whole objects are
transmitted (hence state-based), and that the merge function is able to, well... merge them without conflicts.
The merge function must be commutative, associative and idempotent, so that you can merge data in
any order, incrementally, without being affected by duplicates. It seems that our use-cases allow for such
a merge process: either via the way we've structured our database -- see Appendix I -- or using the RocksDB
merge operator (for more complex future use-cases).
:::
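As an illustration of why such a merge is safe to apply in any order, here is a minimal sketch using plain set union, which is commutative, associative and idempotent (the data is just placeholder strings):
```rust!
use std::collections::BTreeSet;

/// State-based merge of two replicas' views of the same set value.
fn merge(a: &BTreeSet<String>, b: &BTreeSet<String>) -> BTreeSet<String> {
    a.union(b).cloned().collect()
}

fn main() {
    let r1: BTreeSet<_> = ["msg1", "msg2"].map(String::from).into();
    let r2: BTreeSet<_> = ["msg2", "msg3"].map(String::from).into();
    let r3: BTreeSet<_> = ["msg4"].map(String::from).into();

    let ab = merge(&r1, &r2);
    let ba = merge(&r2, &r1);
    assert_eq!(ab, ba);                                              // commutative
    assert_eq!(merge(&merge(&r1, &r2), &r3), merge(&r1, &merge(&r2, &r3))); // associative
    assert_eq!(merge(&ab, &r2), ab);                                 // idempotent: duplicates don't matter
    assert_eq!(ab.len(), 3);
}
```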
**What kind of updates do we have?**
So, when it comes to concurrent updates, do we deal with commuting, non-commuting or antagonistic
updates?
We are not dealing with complex transactions, so non-commuting updates are almost absent. Almost,
because we might have a delete and add operation pair whose order of execution may matter. However,
in such cases we can opt for some automatic reconciliation strategy, like "additions win" (if
no relative order can be established and the updates are treated as concurrent). Moreover, since we
are using sloppy quorums and read repairs, accidentally "undeleted" items are highly unlikely.
Due to the simplicity of our schema and the way we implemented data types in RocksDB, we can almost
always trivially resolve antagonistic updates (we always know which one should prevail).
Let's check!
**1. Unaddressed mailbox**
```rust!
// Layout (set)
storage["topic"] = {MailboxMessage1, .., MailboxMessageN}
// Operations
storage.sadd(topic, [msg1, .., msgK]);
storage.srem(topic, msg);
storage.sget(topic) -> [msg1, .., msgN];
```
See Appendix I for more info on how the `set` data type is implemented.
Since we deduplicate added items, any number of concurrent additions ends up as their union. And
since each individual item is stored as a separate row, we can add them in any order; they do not
interfere with each other.
**If we rely on timestamps only:** Time drift may interfere with the ordering of things, but in the worst
case we will have some deleted item *on some node* become undeleted. However, given that we rely on
sloppy quorums and read repairs, it would require more than one node to start drifting, as otherwise
its data will be repaired using the info from the other replica peers.
**If we can detect concurrent updates (i.e. implement logical clocks):**
With "addition wins" we can add concurrent removals, and still be able to merge into a union set,
deterministically. Note that not all updates are concurrent, so whenever we can deduce the order of
operations, they will be applied in that order.
With "additions win", deduplication and the way we structure the data, it is hard to come up with
an antagonistic updates, that require some extra reconciliation.
**2. Mailbox**
```rust!
// Layout (map)
storage["recipient" + "topic"]["msg_id"] = MailboxMessage
// Operations (key = recipient + topic)
storage.hset(key, msg_id, msg);
storage.hvals(key) -> [msg1, .., msgN];
storage.hdel(key, msg_id);
```
Since we are operating on whole items (that is, we manipulate messages as a whole, not individual
fields), accepting an **LWW** (last writer wins) policy lets us avoid antagonistic transactions
(or know how to merge them syntactically).
*Note:* For full info on our schema see Appendix II.
:::info
**:fire: Suggested approach:**
- For "Unaddressed Mailbox" (and any set instance) use **additions win** on concurrent updates
(where order cannot be determined -- that's if logical clocks are used) and normal **union merge** for the rest.
- For "Mailbox" use **LWW**, which shouldn't pose any issues if we continue operating on whole items.
- "Routing table" is very similar to "Unaddressed Mailbox" (both are sets), so similar reasoning applies.
- For "Client-Project" and "Project-Project Data" mappings (they are simple key-value pairs), both
**additions win** (if we have delete and add concurrently) and **LWW** can be used.
:warning: Whenever we assume that "something wins", it means overwriting/aborting some other update,
so it essentially signifies some data loss. **The trick is to lose only redundant data, or be able to
repair.**
:::
## Appendix I: Implemented RocksDB data types
We have built several data types on top of RocksDB. What follows is a brief explanation of how they are
implemented, which has a direct impact on how we can express our current schema.
### String
The String data type treats values as just strings of bytes:
- Both key and value are marshaled into message pack and stored within a single column family.
- No meta data column family, as we do not need to keep extra information about values outside the values themselves.
- All values (for all data types) are wrapped into the `DataExt` struct (which should probably be called
`DataContext`). This wrapper holds extra information per value: `expiration_time`, `updated`.
Whenever we need to augment a value with some internal data, we extend the wrapper.
Key | Value
-|-
`key` | `DataExt(value)`
Interface is simple:
```rust!
/// Gets value for a provided `key`.
///
/// Time complexity: `O(1)`.
fn get(key);
/// Sets value for a provided key.
///
/// Time complexity: `O(1)`.
fn set(key, value, ttl);
/// Returns and removes value for a provided key.
///
/// Time complexity: `O(1)`.
fn getdel(key);
/// Returns initialized iterator for a provided `key` range.
fn scan_iterator(left, right) -> StringStorageIterator;
```
We do not *update* values; when something changes, we replace the value. This allows us to rely
on timestamps for ordering events, without worrying too much about concurrent updates.
### Set
Originally implemented as a `key -> set` mapping, where values were encoded as lists of items.
Key | Value
-|-
`key` | `[val1, val2, ..., valN]`
This resulted in `O(n)` insertions and deletions (you unpack the set, add/remove the element, and pack the
set back). Additionally, concurrent writes required a semantic merge operation to reconcile
the set (we used the RocksDB merge operator).
The only upside of such an implementation was that it is relatively simple. The downsides heavily outweighed it.
We gradually moved to a better design, which allows both more optimal data manipulation and less
contention on concurrent updates.
Right now, for each `key -> [val1, val2, ..., valN]` pair we add a single entry into meta data column family:
Key | Value
-|-
`cf_id` + `key` | `{version: timestamp}`
where
- `cf_id` is an identifier allowing different set/map instances to use the very same meta
data column family -- all records are prefixed with that set/map instance id.
- `version` is used to
  - quickly remove a set and all its items (increase the version, garbage collect in the background)
  - quickly select a range of keys (see below)
And the actual set elements are put into their own column family:
Key | Value
-|-
`key` + `version` + `valK` | `DataExt(None)`
where
- `version` comes from the meta data, and you are able to search key ranges easily: `key+version..key+(version+1)`
- data additions/removals are `O(1)` operations, as you only add/remove distinct rows, never touching
the rest of the set.
- as each value is wrapped into `DataExt`, we are able to provide per-element extra info (like expiration times)
- data contention is also avoided, as we can use `LWW` for individual items, and items operate on
distinct rows.
The resultant data type performs under the same time complexity assumptions as Redis's `set` data type.
Interface:
```rust!
/// Returns all the members of the set value stored at key.
fn smembers(key) -> HashSet<val>;
/// Adds specified members to a set identified by provided key.
///
/// If set with a given key doesn't exist, it is created. Multiple items can
/// be added in a single call. Duplicate values are ignored.
fn sadd(key, values: [val1, ..., valK], ttl);
/// Removes specified member from a set identified by provided key.
///
/// If after a sequence of merges set doesn't have any elements, the key is
/// removed on the next compaction, by the compaction filter.
fn srem(key, value);
/// Returns initialized iterator for a provided `key` range.
fn scan_iterator(left, right) -> SetStorageIterator;
```
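As a sketch of how a data-column-family key for a set element could be composed under the layout above (the exact encoding here is an assumption; the real adapter marshals components with message pack):
```rust!
/// Compose the data-column-family key: `key + version + element`.
fn data_cf_key(key: &[u8], version: u64, element: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(key.len() + 8 + element.len());
    out.extend_from_slice(key);
    // Big-endian so that lexicographic byte ordering matches numeric version
    // ordering, which keeps `key+version..key+(version+1)` range scans possible.
    out.extend_from_slice(&version.to_be_bytes());
    out.extend_from_slice(element);
    out
}

fn main() {
    let k1 = data_cf_key(b"topic", 1, b"msg1");
    let k2 = data_cf_key(b"topic", 1, b"msg2");
    let next_version_lower_bound = data_cf_key(b"topic", 2, b"");
    // Both elements of version 1 sort before the version-2 lower bound,
    // so a range scan over [key+1, key+2) returns exactly this set instance.
    assert!(k1 < next_version_lower_bound && k2 < next_version_lower_bound);
}
```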
### Map
The `set` data type is actually a `map` with empty values, i.e. `set[key] = [val1, ..., valK]` can
be represented using `map[key] = {val1: none, ..., valK: none}` map.
Using this fact, maps were built in a very similar manner. So, maps are
`map[key] = {field1: val1, ..., fieldK: valK}` objects, and are represented in the DB as follows:
Meta data column family:
Key | Value
-|-
`cf_id` + `key` | `{version: timestamp}`
Data column family:
Key | Value
-|-
`key` + `version` + `fieldK` | `DataExt(valK)`
### Additional information
- Currently, our keys (in Redis and MongoDB) are overly verbose -- unnecessarily taking more space
than required. In the RocksDB adapter, keys are marshaled into message pack messages, without any extra
strings added, i.e. `Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")`
will be marshaled into bytes without appending/prefixing anything like `mailbox/{topic}`.
We don't need those extra qualifiers (each data type lives in its own column family -- table) and we use the keys to scan over them in RocksDB.
## Appendix II: Database schema
Currently, we have a relatively simple schema to support (hence, in many places we can opt for
a more straightforward solution, at least during the first iteration).
### 1. Unaddressed mailbox
| Entity | Type | Info |
|-|-|-|
| Key | `domain::Topic` ||
| Value | `mailbox::MailboxMessage[]` | List of `MailboxMessage` objects per key |
| RocksDB Value Type | `Set` | see Appendix I |
Sample topic:
`Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")`
Sample message:
```json=
MailboxMessage {
sender_id: Some(ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGQ")),
recipient_id: Some(ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK")),
content: Request {
id: MessageId(0),
jsonrpc: "2.0",
params: Publish(
Publish {
topic: Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f"),
message: "eyJ0aXRsZSI6IlRlc3Qgbm90aWZpY2F0aW9uIiwiYm9keSI6IkZpbmQgb3V0IG1vcmUiLCJpY29uIjpudWxsLCJ1cmwiOm51bGx9",
ttl_secs: 30, tag: 42, prompt: false
}
)
}
}
```
### 2. Mailbox
| Entity | Type | Info |
|-|-|-|
| Key | `domain::ClientId` + `domain::Topic` | `(recipient_id, topic)`|
| Value | `mailbox::MessageId` -> `mailbox::MailboxMessage` |list of `(message_id -> message)` pairs|
| RocksDB Value Type | `Map` | see Appendix I |
Since `Mailbox` is saved as a `map` in our database, we are able to manipulate data
on a per-message-id level, e.g.
```
// pseudocode
key = (recipient_id, topic)
// Returns the value associated with `message_id` in the hash stored at `key`.
db.hget(key, message_id)
```
NB: With the `set` data type we can also access set elements individually, but by value, not by key (and as
such it involves scanning the set in `O(n)` to find the corresponding element). With maps, per-message
operations are all `O(1)`.
### 3. Routing Table
| Entity | Type | Info |
|-|-|-|
| Key | `domain::Topic` | |
| Value | `routing_table::RoutingTableEntry[]` | List of `RoutingTableEntry` objects per key|
| RocksDB Value Type | `Set` | see Appendix I |
Sample topic:
`Topic("543f33a28235e70cb2860d4cd1f4423184450311d6ca72477dad71a337b1ac9f")`
Sample routing table entry:
```json=
RoutingTableEntry {
client_id: ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK"),
relay_addr: 127.0.0.1:8080
}
```
### 4. Client to Project mapping
| Entity | Type | Info |
|-|-|-|
| Key | `domain::ClientId` | |
| Value | `domain::ProjectId` | |
| RocksDB Value Type | `String` | see Appendix I |
Sample client:
`ClientId("z6MkhhNEThbvirkLoqr4cucwp7bcYCNWjwCiWbuRdX7mnDGK")`
Sample project:
`ProjectId("af25318b8d1e15a3ca0d2612c1334eba")`
### 5. Project to Project Data mapping
| Entity | Type | Info |
|-|-|-|
| Key | `domain::ProjectId` | |
| Value | `project::ProjectDataResponse` | `Result(ProjectData, StorageError)` we cache even errors atm |
| RocksDB Value Type | `String` | see Appendix I |
Sample project:
`ProjectId("af25318b8d1e15a3ca0d2612c1334eba")`
Sample project data:
```json=
ProjectData {
uuid: "f889b1cc-63d8-418c-ace6-e3275c675d85d",
name: "PROJECT_NAME470",
push_url: None,
keys: [],
is_enabled: true,
allowed_origins: []
}
```
## References
- [Optimistic Replication and Resolution][1], Marc Shapiro (2009)
- [Cassandra - A Decentralized Structured Storage System][7] Avinash Lakshman, Prashant Malik (2009)
- [Jepsen: Cassandra][3], Kyle Kingsbury (2013)
- [Dynamo: Amazon’s Highly Available Key-value Store][5], Giuseppe DeCandia et al. (2007)
- [All Things Clock, Time and Order in Distributed Systems: Physical Time in Depth][2], Kousik Nath (2021)
- [The problem with timestamps][4], Kyle Kingsbury (2013)
- [Time, Clocks, and the Ordering of Events in a Distributed System][8] Leslie Lamport (1978)
- [Dotted Version Vectors: Logical Clocks for Optimistic Replication][6] Nuno Preguiça et al. (2010)
[1]: <https://hal.inria.fr/hal-01248202/file/optimistic-replication-Encyclopedia-DB-systems-2009.pdf> "Optimistic replication and resolution"
[2]: <https://medium.com/geekculture/all-things-clock-time-and-order-in-distributed-systems-physical-time-in-depth-3c0a4389a838> "All Things Clock, Time and Order in Distributed Systems: Physical Time in Depth"
[3]: <https://aphyr.com/posts/294-call-me-maybe-cassandra> "Jepsen: Cassandra"
[4]: <https://aphyr.com/posts/299-the-trouble-with-timestamps> "The problem with timestamps"
[5]: <https://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf> "Dynamo: Amazon’s Highly Available Key-value Store"
[6]: <https://arxiv.org/abs/1011.5808> "Dotted Version Vectors: Logical Clocks for Optimistic Replication"
[7]: <https://www.cs.cornell.edu/projects/ladis2009/papers/lakshman-ladis2009.pdf> "Cassandra - A Decentralized Structured Storage System"
[8]: <https://amturing.acm.org/p558-lamport.pdf> "Time, Clocks, and the Ordering of Events in a Distributed System"