# ch5 replication
###### tags: `design_DDA`
## why replication?
* geographically closer to users to reduce latency
* higher availability: the system keeps working even if some nodes fail
* increased read throughput
## replication types
1. single-leader
2. multi-leader
3. leaderless
leader-based = active/passive = master-slave replication
## synchronous vs asynchronous
* when a write goes to multiple replicas, does the leader wait for all followers to confirm, or only some?
* common setup: make just one follower synchronous; the leader plus that synchronous follower is called "semi-synchronous"
## setting up new followers
1. take a consistent snapshot of the leader and copy it to the new follower
2. the new follower asks the leader for all changes since the snapshot and "catches up"
## handling node outages
1. follower failure: recover from the follower's own log, then ask the leader for the changes it missed (catch-up recovery)
2. leader failure: promote one follower to be the new leader (failover)
    a. clients and the other followers must recognize the new leader
### failover steps
* determine that the leader has failed: usually via timeout
* choose a new leader: pick the most up-to-date follower to minimize data loss
* reconfigure the system to use the new leader; if the old leader comes back, it must not act as leader
#### issues
1. with async replication, the new leader may be missing writes from the old leader; discarding them is dangerous (e.g., downstream systems may have already used primary keys that get reassigned)
2. split brain: two nodes both believe they are the leader -> data corruption; sol. the database detects when the leader count exceeds one and shuts one down
3. choosing the timeout: how long is suitable? too long means slow recovery, too short means false alarms and unnecessary failovers
## replication logs
### statement-based replication: ship the INSERT/UPDATE/DELETE statements directly
1. now(), rand() lead to inconsistent data -> replace them with literal values before logging
2. UPDATE ... WHERE \<condition\> depends on existing data, so the execution order of statements matters
3. stored procedures and functions may behave differently on different followers -> use deterministic functions
```
A deterministic function always gives the same answer when it has the same inputs.
```
because of the many edge cases, statement-based replication is generally not preferred.
ex. MySQL < 5.1, VoltDB (which requires transactions to be deterministic)
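A minimal Python sketch of the nondeterminism problem; the `run_on_replica` helper and the statement strings are made up for illustration, not any real database's code:

```python
import random
import time

def run_on_replica(statement: str) -> str:
    """Stand-in for a replica executing a statement: it expands RAND()/NOW()
    locally, which is exactly what makes statement shipping unsafe."""
    return (statement
            .replace("RAND()", str(random.random()))
            .replace("NOW()", str(time.time())))

stmt = "UPDATE users SET token = RAND() WHERE id = 1"

# Statement-based replication: leader and follower evaluate RAND() separately.
print(run_on_replica(stmt) == run_on_replica(stmt))  # False: replicas diverge

# Fix: the leader evaluates RAND() once and logs the literal value instead.
fixed = stmt.replace("RAND()", str(random.random()))
print(run_on_replica(fixed) == run_on_replica(fixed))  # True: deterministic
```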
### write-ahead log (WAL) shipping: an append-only sequence of bytes containing all writes to the database
every modification is written to the log first, so after a crash the engine can restore itself to a consistent state from the log. Shipping this same log to followers gives them an exact copy of the leader's data. Downside: the log describes the data at a very low level (which bytes changed in which disk block), so replication is tightly coupled to the storage engine; if leader and followers run different versions, replication is usually impossible, which rules out zero-downtime upgrades.
ex. PostgreSQL, Oracle
[how to get WAL in pg](https://isdaniel.github.io/postgresql-wal-introduce/)
### logical \(row-based\) log replication
uses different formats for replication and for the storage engine, decoupling the two; this distinguishes the logical log from the storage engine's (physical) data representation.
for a relational database, the logical log is usually a sequence of records describing writes to tables at the granularity of a row:
* insert: the new values of all columns
* delete: enough info to identify the row, e.g., the PK, or all columns if there is no PK
* update: all new values plus enough info to identify the row
easier for external systems to consume, e.g., streaming changes into a data-analytics warehouse
ex. MySQL binlog (in row-based mode)
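A sketch of what logical log records might look like, assuming a hypothetical `users` table and a JSON-ish layout; real formats such as MySQL's row-based binlog are binary:

```python
# Hypothetical logical (row-based) log records for a `users` table.

insert_record = {
    "op": "insert",
    "table": "users",
    "row": {"id": 42, "name": "alice", "email": "a@example.com"},  # all columns
}

update_record = {
    "op": "update",
    "table": "users",
    "key": {"id": 42},  # enough info to identify the row (the PK)
    "row": {"id": 42, "name": "alice", "email": "new@example.com"},  # new values
}

delete_record = {
    "op": "delete",
    "table": "users",
    "key": {"id": 42},  # the PK, or all columns if there is no PK
}

# An external consumer (e.g., loading a warehouse) only needs this record
# schema, not the storage engine's on-disk format.
for rec in (insert_record, update_record, delete_record):
    print(rec["op"], rec["table"])
```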
### trigger-based replication
application code (triggers) copies the data changes to another system; flexible, but slower and more prone to bugs
## problems from replication lag
1. distributing read requests across followers increases read capacity, but realistically only with asynchronous replication; with synchronous replication, a single crashed follower would block all writes. With async replication, though, a user might read outdated data from a lagging follower.
### eventual consistency
if writes stop and you "wait a while", all followers converge and the same request gets the same response from every replica.
```
why?
claiming that transactions are too expensive in terms of performance and availability,
and asserting that eventual consistency is inevitable in a scalable system
```
read-after-write consistency: if a user submits an update, they should see it when they reload the page. -> implementation: always read the user's own profile from the leader, and other users' profiles from followers.
```
This is a guarantee that if the user reloads the page, they will always
see any updates they submitted themselves. It makes no promises about other users:
other users’ updates may not be visible until some later time
```
if it is hard to distinguish "own" data from others' data (most things are editable by the user), possible workarounds:
1. track the time of the user's last update and, for one minute afterwards, serve all of that user's reads from the leader; the one-minute window should match the replication lag timescale (see the sketch after this list).
2. the client remembers the timestamp of its most recent write; it only accepts responses from replicas that reflect updates at least that recent, otherwise it queries another replica or waits.
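A minimal sketch of workaround 1, with made-up names (`REPLICATION_LAG_WINDOW`, `choose_replica`); a real router would also handle failover and clock skew:

```python
import time

REPLICATION_LAG_WINDOW = 60.0  # seconds; should match observed replication lag
last_write_at: dict[str, float] = {}  # user_id -> timestamp of last write

def record_write(user_id: str) -> None:
    last_write_at[user_id] = time.time()

def choose_replica(user_id: str, leader, followers):
    """Route to the leader if the user wrote recently, otherwise any follower
    (first one here for simplicity)."""
    recent = time.time() - last_write_at.get(user_id, 0.0) < REPLICATION_LAG_WINDOW
    return leader if recent else followers[0]

record_write("alice")
print(choose_replica("alice", "leader", ["follower-1"]))  # -> leader
print(choose_replica("bob", "leader", ["follower-1"]))    # -> follower-1
```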
cross-device read-after-write consistency:
route requests from the same user (even across devices) to the same datacenter/replica
#### monotonic reads
* reading from different replicas can return data from different points in time, so a user may see newer data and then an older state ("moving backward in time")
sol: make each user always read from the same replica, e.g., hash the user ID to choose the replica (see sketch below).
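A sketch of the hashing idea; replica names and the modulo scheme are illustrative, and a production system would typically use consistent hashing so replicas can be added or removed:

```python
import hashlib

replicas = ["replica-0", "replica-1", "replica-2"]

def replica_for(user_id: str) -> str:
    """Pin each user to one replica so repeated reads never go backward."""
    digest = hashlib.sha256(user_id.encode()).digest()
    return replicas[int.from_bytes(digest[:4], "big") % len(replicas)]

print(replica_for("alice"))                           # always the same replica
print(replica_for("alice") == replica_for("alice"))   # True

# Caveat: if that replica fails, the user must be rerouted to another one.
```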

#### consistent prefix reads
* violation of causality (cause and effect), e.g., seeing the answer in a conversation before the question

an observer may read content in an order that violates causality. This arises in sharded (partitioned) databases: different partitions operate independently, so there is no global ordering of writes; when a user reads, they may see some parts of the database in an older state and some in a newer state.
sol. write causally related data to the same partition, or track "happens-before" dependencies explicitly.
## multi-leader replication (master-master, active/active replication)
Leader-based replication has one major downside: there is only one leader, and all writes must go through it.
problems of multi-leader:
1. each node that processes a write must forward that data change to all the other nodes
2. conflict resolution
3. autoincrementing keys, triggers, and integrity constraints can all be problematic
scenarios
1. multi-datacenter: one leader per datacenter; every write is replicated to the other datacenters.
| | single-leader | multi-leader |
| --- | --- | --- |
| performance | every write must go through the single leader's datacenter: higher latency, worse perceived performance | writes are processed in the local datacenter and replicated asynchronously: lower perceived latency |
| high availability | a datacenter failure requires promoting a follower elsewhere to be the new leader | each datacenter continues operating independently of the others, and replication catches up when the failed datacenter comes back online |
| tolerance of network problems for writes | very sensitive to problems on the inter-datacenter link, because writes are made synchronously over this link | a temporary network interruption does not interrupt writes being processed |
### Clients with offline operation
an application supporting offline use is effectively multi-leader replication: you need to be able to see your meetings (make read requests) and enter new meetings (make write requests) at any time, regardless of whether your device currently has an internet connection. Any changes made while offline need to be synced with a server and your other devices when the device is next online.
each device has a local database that acts as a leader (it accepts write requests), and there is an asynchronous multi-leader replication process (sync) between the replicas of your calendar on all of your devices. The replication lag may be hours or even days, depending on when you have internet access available.
each device is a "datacenter", and the network connection between them is extremely unreliable. As the rich history of broken calendar sync implementations demonstrates, multi-leader replication is a tricky thing to get right.
#### CouchDB
[wiki ref](https://zh.wikipedia.org/zh-tw/CouchDB)
1. unlike relational databases, CouchDB does not store data and relationships in tables; instead, each database is an independent collection of documents, and each document maintains its own self-contained schema and data. An application may access multiple databases, e.g., one on the user's phone and another on a remote server. Document metadata includes version information, so any differences caused by lost connectivity can be merged later.
2. CouchDB implements a form of multi-version concurrency control (MVCC) to avoid locking documents during writes. Conflicts are left to the application to resolve; the usual approach is to first merge the data into one of the documents, then delete the stale one.
#### collaborative editing
changes are instantly applied to the user's local replica (the state of the document in their web browser or client application) and asynchronously replicated to the server and to any other users editing the same document.
1. lock document and release: if another user wants to edit the same document, they must first wait until the first user has committed their changes and released the lock. This collaboration model is equivalent to single-leader replication with transactions on the leader.
2. async, no lock: make the unit of change very small (e.g., a single keystroke) for faster collaboration, but this brings all the challenges of multi-leader replication, including the need for conflict resolution.
## write conflicts

### Synchronous versus asynchronous conflict detection
single-leader with locking: the second writer either blocks and waits for the first write to complete, or its write transaction is aborted and the user must retry.
multi-leader, both succeed: both writes are successful, and the conflict is only detected asynchronously at some later point in time, when it may be too late to ask the user to resolve it.
sol 1, synchronous conflict detection: wait for the write to replicate everywhere before reporting success. Correct, but it loses the main benefit of multi-leader (independent writes); you might as well use single-leader.
### Conflict avoidance
route all changes to a particular record through the same leader (a per-record "home"), so from that record's point of view the setup is effectively single-leader.
ex. in an application where users edit their own data, ensure that requests from a particular user are always routed to the same datacenter and use that datacenter's leader for reading and writing. Different users may have different "home" datacenters (perhaps picked by geographic proximity), but from any one user's point of view the configuration is essentially single-leader.
### Converging toward a consistent state
a single-leader database applies writes in sequential order: if there are several updates to the same field, the last write determines the final value of the field.
in a multi-leader configuration there is no defined ordering of writes, so it's not clear what the final value should be.
rule: every replication scheme must ensure that the data is eventually the same in all replicas; all replicas must arrive at the same final value once all changes have been replicated.
various ways of achieving convergent conflict resolution (LWW sketch after this list):
* last write wins (LWW): give each write a unique ID (e.g., a timestamp, a long random number, a UUID, or a hash of the key and value), pick the write with the highest ID as the winner, and throw away the other writes.
* replica priority: give each replica a unique ID, and let writes that originated at a higher-numbered replica always take precedence over writes from a lower-numbered replica. This also implies data loss.
* merge, or record and resolve later: somehow merge the values together, e.g., order them alphabetically and concatenate them (in Figure 5-7 the merged title might be "B/C"), or record the conflict and resolve it later in application code / by prompting the user.
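A tiny sketch of LWW convergence, assuming timestamps as the write IDs:

```python
# LWW: tag each write with an ID (a timestamp here), keep the write with the
# highest ID, discard the rest. Losing concurrent writes are silently dropped,
# which is the data-loss caveat noted above.

writes = [
    {"value": "B", "id": 1700000001.5},
    {"value": "C", "id": 1700000002.1},
    {"value": "A", "id": 1700000000.9},
]

winner = max(writes, key=lambda w: w["id"])
print(winner["value"])  # "C" -- every replica applying this rule converges
```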
### Custom conflict resolution logic
* resolve on write: when the database detects a conflict in the log of replicated changes, it calls a user-supplied conflict handler; this runs in the background and must execute quickly.
* resolve on read: all the conflicting writes are stored. The next time the data is read, these multiple versions are returned to the application, which may prompt the user or resolve the conflict automatically and write the result back to the database. CouchDB works this way (handled by the application).
### Automatic Conflict Resolution
1. conflict-free replicated data types (CRDTs): data structures for sets, maps, ordered lists, counters, etc. that can be concurrently edited by multiple users and automatically resolve conflicts in sensible ways. Some CRDTs have been implemented in Riak 2.0.
2. mergeable persistent data structures: track history explicitly, similarly to the Git version control system, and use a three-way merge function (whereas CRDTs use two-way merges).
3. operational transformation: the resolution algorithm behind collaborative editing applications such as Etherpad [30] and Google Docs. Designed particularly for concurrent editing of an ordered list of items, such as the list of characters that constitute a text document.
### what counts as a conflict?
1. two writes concurrently modify the same field in the same record, setting it to two different values: clearly a conflict.
2. subtler: read-then-update workflows, e.g., two concurrent bookings of the same seat on the same site; each write looks fine on its own, but together they violate a constraint.
### Multi-Leader Replication Topologies

* all-to-all: every leader sends its writes to every other leader.
* circular: each node receives writes from one node and forwards them (plus any writes of its own) to one other node.
* star (tree): a root node forwards writes to all of the other nodes; the star can be generalized to a tree.
for circular and star topologies (loop-prevention sketch below):
1. to avoid infinite loops, each node has a unique identifier, and each write is tagged with the identifiers of all nodes it has passed through; when a replica receives a write tagged with its own identifier, it ignores it.
2. a single node failure interrupts the flow of replication between other nodes; all-to-all is not affected by a single failure.
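A toy sketch of that loop-prevention rule, with a made-up three-node ring; real systems tag writes in the replication log rather than in the value itself:

```python
# Each write carries the ids of nodes it has passed through ("seen");
# a node that sees its own id again ignores the write, breaking the loop.

def forward(write: dict, node_id: str, next_node: str, deliver) -> None:
    if node_id in write["seen"]:
        return                      # already processed here: stop the loop
    write["seen"].append(node_id)   # tag the write with this node's id
    # ... apply the write locally here ...
    deliver(next_node, write)

ring = {"n1": "n2", "n2": "n3", "n3": "n1"}  # circular topology

def deliver(node: str, write: dict) -> None:
    forward(write, node, ring[node], deliver)

w = {"key": "x", "value": 1, "seen": []}
deliver("n1", w)
print(w["seen"])  # ['n1', 'n2', 'n3'] -- the write circulated exactly once
```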
for all-to-all,
causality can be violated: some replication links are faster than others, so an update may arrive at a replica before the write it depends on.

timestamps don't solve this because clocks cannot be trusted (ch8).
solution: version vectors (covered at the end of these notes)
## leaderless replication
recap: a leader determines the order in which writes are processed, and followers apply the leader's writes in that same order.
leaderless:
1. any replica can directly accept writes from clients
2. Riak, Cassandra, and Voldemort are open source datastores with leaderless replication models inspired by Dynamo, so this kind of database is also known as Dynamo-style.
##### how writes reach replicas
1. in some designs, the client directly sends its writes to several replicas
2. in others, a coordinator node does this (sends the write to multiple nodes) on behalf of the client; however, unlike a leader, the coordinator does not enforce a particular ordering of writes.
### Writing to the Database When a Node Is Down

a write acknowledged by more than n/2 replicas counts as successful (the new value is on a majority of nodes).
for reading:
1. read requests are also sent to several nodes in parallel (to more than n/2 nodes)
2. version numbers are used to determine which returned value is newer.
#### Read repair and anti-entropy: how an unavailable node catches up
* read repair (client-driven): the client sees that a replica returned a stale value and writes the newer value back to that replica. This works well for values that are frequently read.
* anti-entropy process (server background): a background process constantly looks for differences in the data between replicas, then copies any missing data from one replica to another.
unlike the replication log in leader-based replication, the anti-entropy process does not copy writes in any particular order, and there may be a significant delay before data is copied.
note that without an anti-entropy process, values that are rarely read may be missing from some replicas and thus have reduced durability, because read repair is only performed when a value is read by the application.
### Quorum

if a read is sent to more nodes than could possibly hold stale values, at least one response is guaranteed to be up to date; latency is higher, but freshness is guaranteed.
w: the number of replicas that must confirm a write for it to count as successful
r: the number of replicas queried for each read
require w + r > n (n = total replicas)
reads and writes that obey these r and w values are called quorum reads and writes.
reads and writes are still sent to all n replicas in parallel; the parameters w and r only determine how many responses we wait for.
n is usually odd (3, 5, ...); a common choice is w = r = (n + 1) / 2, rounded up.
extreme case: w = n makes reads fast (r = 1 suffices), but then a write fails if any one node is unavailable.
* if w < n, we can still process writes when a node is unavailable.
* if r < n, we can still process reads when a node is unavailable.
* with n = 3, w = 2, r = 2 we can tolerate one unavailable node.
* with n = 5, w = 3, r = 3 we can tolerate two unavailable nodes. (at least w / r nodes must be available for a write / read to succeed)
* normally r and w are chosen to be a majority (more than n/2) of nodes, because that ensures w + r > n while still tolerating up to n/2 node failures. (sketch below)
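A minimal sketch of quorum reads and writes with n = 3, w = 2, r = 2; the replica dicts and the simplified version counter are illustrative stand-ins:

```python
n, w, r = 3, 2, 2
assert w + r > n  # overlap guarantees at least one up-to-date replica per read

replicas = [{"version": 0, "value": None} for _ in range(n)]

def write(value, acked_replicas):
    """A write succeeds only if at least w replicas acknowledge it."""
    if len(acked_replicas) < w:
        raise RuntimeError("write failed: quorum not reached")
    new_version = max(rep["version"] for rep in replicas) + 1  # simplified
    for rep in acked_replicas:
        rep.update(version=new_version, value=value)

def read(responding_replicas):
    """Read from r replicas; the highest version number wins."""
    assert len(responding_replicas) >= r
    return max(responding_replicas, key=lambda rep: rep["version"])["value"]

write("v1", replicas[:2])                # replica 2 missed the write (down)
print(read([replicas[1], replicas[2]]))  # "v1": the w/r overlap saves us
```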
### quorum edge cases: you can still get stale values
* sloppy quorums (see below)
* concurrent writes resolved with LWW (last write wins) under skewed timestamps
* a write that succeeds on fewer than w replicas (e.g., some nodes are out of space) is not rolled back on the replicas where it did succeed
* a node carrying a new value is restored from a replica carrying an old value, so the number of replicas storing the new value drops below w
### Monitoring staleness
leader-based: the database typically exposes metrics for replication lag, which you can feed into a monitoring system. This is possible because writes are applied to the leader and to followers in the same order, and each node has a position in the replication log (the number of writes it has applied locally).
in systems with leaderless replication, however, there is no fixed order in which writes are applied, which makes monitoring more difficult.
if the system only uses read repair (no anti-entropy), there is no limit to how old a value might be: if a value is only infrequently read, the value returned by a stale replica may be ancient.
~~#### how to estimate staleness by n,w,r[48]~~
### sloppy quorum
a quorum can be interrupted by a network issue: the client can likely still connect to some database nodes, just not to the nodes it needs to assemble a quorum for a particular value. Database designers then face a trade-off:
1. accept writes anyway, and write them to some nodes that are reachable but aren't among the n nodes on which the value usually lives -> sloppy quorum
2. or return an error for any request that cannot meet the quorum.
#### hinted handoff
with a sloppy quorum: once the network interruption is fixed, any writes that a node temporarily accepted on behalf of another node are sent on to the appropriate "home" nodes.
#### Is sloppy quorum "quorum" enough?
however, this means that even when w + r > n, you cannot be sure to read the latest value for a key, because the latest value may have been temporarily written to some nodes outside of n.
a sloppy quorum isn't actually a quorum at all in the traditional sense. <font color="red">It's only an assurance of durability</font>, namely that the data is stored on w nodes somewhere; there is no guarantee that a read of r nodes will see it until the hinted handoff has completed.
Sloppy quorums are optional in all common Dynamo implementations. In Riak they are enabled by default, and in Cassandra and Voldemort they are disabled by default.
#### Multi-datacenter operation
benefit: the client doesn't pay the latency cost of waiting for faraway datacenters
* Cassandra and Voldemort: writes are sent to replicas in all datacenters, but the quorum only waits for acknowledgements from within the client's local datacenter; cross-datacenter replication completes asynchronously.
* Riak: client traffic stays within the local datacenter (n describes replicas within one datacenter); cross-datacenter replication happens asynchronously in the background.
### Detecting Concurrent Writes
concurrent writes may arrive in different orders at different nodes; in Dynamo-style databases, conflicts can also arise during read repair or hinted handoff.

eventually inconsistent!!! node 2 thinks the final value of X is B, whereas the other nodes think the value is A.
#### LWW (last write wins) [53]
* force an arbitrary order on the writes
Even though the writes don’t have a natural ordering, we can force an arbitrary order on them. For example, we can attach a timestamp to each write, pick the biggest timestamp as the most “recent,” and discard any writes with an earlier timestamp. This conflict resolution algorithm, called last write wins (LWW), is the only supported conflict resolution method in Cassandra [53], and an optional feature in Riak.
**drawback**: durability suffers. If there are several concurrent writes to the same key, even if they were all reported as successful to the client (because they were written to w replicas), only one of the writes survives and the others are silently discarded. LWW may even drop writes that are not concurrent, as discussed in "Timestamps for ordering events" (p. 291).
---
#### The “happens-before” relationship and concurrency

B is causally dependent on A.
---

concurrent: when each client starts its operation, it does not know that another client is also performing an operation on the same key, so there is no causal dependency between the operations (i.e., neither knows about the other).
### resolving concurrent writes
* keep all concurrently written values ("siblings") and have clients merge them, much like CRDTs do.

the clients are never fully up to date with the data on the server, since there is always another operation going on concurrently. But old versions of the value do get overwritten eventually, and no writes are lost.

merging is coordinated with version numbers:
note that the server can determine whether two operations are concurrent by looking at the version numbers; it does not need to interpret the values themselves.
the server returns all current siblings together with the latest version number; the client merges them and writes the result back tagged with that version number; the server then overwrites all values with that version number or below, keeping any concurrently written values as siblings.
this algorithm ensures that no data is silently dropped, but it unfortunately requires clients to do extra work (merging siblings); a sketch follows this list:
* add: merging can simply take the union of the sibling item sets (shopping cart example)
* remove: you cannot just take the union of the concurrent versions; instead leave a "tombstone" marking that an item was removed, so the merge does not resurrect it (CRDTs do this)
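A sketch of the version-number algorithm for a single replica, using the shopping cart example; the `Server` class is made up for illustration (real stores like Riak persist this state per key):

```python
class Server:
    """Keeps (version, value) pairs per key; a write based on version v
    overwrites everything with version <= v, keeps newer values as siblings."""

    def __init__(self):
        self.version = 0
        self.values: list[tuple[int, set]] = []  # (version, value) siblings

    def read(self):
        """Return all current siblings plus the latest version number."""
        return [v for _, v in self.values], self.version

    def write(self, value: set, based_on: int):
        self.version += 1
        # keep only siblings the client hadn't seen (concurrent writes)
        self.values = [(ver, v) for ver, v in self.values if ver > based_on]
        self.values.append((self.version, value))

server = Server()
server.write({"milk"}, based_on=0)   # client 1, hadn't read anything yet
server.write({"eggs"}, based_on=0)   # client 2, concurrent with client 1
siblings, version = server.read()
print(siblings)                      # [{'milk'}, {'eggs'}] -- two siblings
merged = set().union(*siblings)      # client merges: {'milk', 'eggs'}
server.write(merged, based_on=version)
print(server.read()[0])              # [{'milk', 'eggs'}] -- nothing lost
```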
#### CRDT
[yt_ref](https://www.youtube.com/watch?v=CD_0u03EdcA&ab_channel=CNCF%5BCloudNativeComputingFoundation%5D)

a CRDT carries application data plus metadata
metadata: provides the information needed to merge state between peers
### CRDT example
before sync

after sync

key-value: device_id -> increment_count
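The counter above behaves like a grow-only counter (G-Counter); a minimal sketch, assuming the metadata is just the per-device map:

```python
# G-Counter: each device only increments its own slot; merging takes the
# per-device maximum, and the counter's value is the sum over all devices.

def increment(state: dict, device_id: str, by: int = 1) -> None:
    state[device_id] = state.get(device_id, 0) + by

def merge(a: dict, b: dict) -> dict:
    """Element-wise max: idempotent, commutative, and associative."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}

def value(state: dict) -> int:
    return sum(state.values())

phone, laptop = {}, {}
increment(phone, "phone"); increment(phone, "phone")  # before sync: phone = 2
increment(laptop, "laptop")                           # before sync: laptop = 1

synced = merge(phone, laptop)                         # after sync
print(synced, value(synced))  # {'phone': 2, 'laptop': 1} 3
```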
#### CRDT trade-off
* extra memory for the metadata
---
#### network protocol
options:
* WebSocket
* gossip protocol
* mesh network (libp2p)
* WebRTC
#### propagating updates to peers
the merge operation must be:
* idempotent: an update from one peer can be applied multiple times with the same effect as applying it once (e.g., merging {device: 100} repeatedly gives the same result)
* commutative: X \* Y === Y \* X; ordering doesn't matter
* associative: \(X \* Y\) \* Z === X \* \(Y \* Z\); grouping doesn't matter (quick check below)
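A quick check that the element-wise-max merge from the G-Counter sketch has all three properties, on small hand-written states:

```python
def merge(a: dict, b: dict) -> dict:
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}

x, y, z = {"a": 2}, {"a": 1, "b": 3}, {"b": 1, "c": 5}

print(merge(x, x) == x)                                # idempotent
print(merge(x, y) == merge(y, x))                      # commutative
print(merge(merge(x, y), z) == merge(x, merge(y, z)))  # associative
```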

##### operation-based CRDTs: peers propagate the update operations themselves (requires delivery that is reliable and not duplicated, or operations that are idempotent)
##### state-based CRDTs: peers propagate their full local state, and merge states with an idempotent, commutative, associative merge function
### version vectors (for multi-leader / leaderless)
a single replica can use a single version number per key to capture dependencies between operations, but that is not sufficient when there are multiple replicas accepting writes concurrently.
instead, we need a version number per replica as well as per key: each replica increments its own version number when processing a write, and also keeps track of the version numbers it has seen from each of the other replicas. This information indicates which values to overwrite and which values to keep as siblings.
version vectors are sent from the database replicas to clients when values are read, and need to be sent back to the database when a value is subsequently written. (Riak encodes the version vector as a string that it calls causal context.)
as in the single-replica example, the application may need to merge siblings. The version vector structure ensures that it is safe to read from one replica and subsequently write back to another replica.
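A sketch of comparing version vectors, using per-replica counters in plain dicts; the replica names and helper functions are illustrative:

```python
def includes(a: dict, b: dict) -> bool:
    """True if b has seen everything a has seen (a <= b component-wise)."""
    return all(a.get(r, 0) <= b.get(r, 0) for r in a.keys() | b.keys())

def compare(a: dict, b: dict) -> str:
    if a == b:
        return "equal"
    if includes(a, b):
        return "a happened before b (b overwrites a)"
    if includes(b, a):
        return "b happened before a (a overwrites b)"
    return "concurrent (keep both as siblings, merge in the application)"

print(compare({"r1": 1}, {"r1": 2, "r2": 1}))  # a happened before b
print(compare({"r1": 2}, {"r2": 1}))           # concurrent
```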