# Week 06 - Replication & CRDT
###### tags: `WS2020-IN2259-DS`
## Data replication
### Why Replicate?
#### Performance
* 這裡先看 **data replication**,computation replication 是別的概念,之後 MapReduce 會再教。
* 範例:**Caching** data at **browsers** and **proxy servers**,方便資料的快速讀取。

* 真實案例:Typical (video) CDN (Content Delivery Network)。

#### High availability
* 如果發生 crash,資料可以從別的 replica 獲得。

* **Network Partitioning**:就算 Partition 脫離,兩個中心還是可以各自擔任戰備。

* **Partition tolerance**:指系統擁有在 network partitions 狀態下繼續運作的能力。
#### Fault tolerance
* 就算出現 faulty nodes (**並非僅是 crash,而是任意未知的錯誤**),也能提供穩定的服務。

#### “Cost” of Replication
* **storing** additional copies of data。
* **keep replicas up to date**。
* 如何處理 replicas 中的 **stale (out-of-date) data**。
### Self-study Questions
* What is the “cache hierarchy” for web data? Think of web pages with varying complexity of content; understand where each content element is cached.
* Is this cache hierarchy the same for any distributed data processing system?
* Find some popular examples of open-source caching technology and understand their position in the cache hierarchy.
* Think of alternatives for recovering a crashed replica – how does it catch up to the current state?
* Determine conflicts when issuing reads and writes concurrently to replicas.
## Replication patterns
### Active replication
#### 定義
* 由 **clients 傳送 requests 至每一個 replica** 來完成 replication。

* Requests 可以是 read,也可以是 write。
* Replicas 可能會 crash。
* **Fast to get a response**:因為多個 replica 會回傳 response,因此只要收到第一個 (**first result received**),就可以立即進行處理。
除非假設有 Byzantine failure,那就要等收到 majority results 後才能處理。
* 需要 **Configuration service** 來協助 client 獲知 replica 狀態。
* **Failure detection**:例如透過 heartbeats 機制來偵測故障的 replica。
* **Configuration management**
#### 問題
* 若**同步問題**沒有處理好,Replicas 的 value 可能會分散 (**diverge**)。

#### 解決問題
* 需要 **total order broadcast**:來自所有 clients 的所有 requests,**均為相同的 order**。
#### 圖解
* **Write**

* **Read** (或者可以實作 read from a quorum,以應對 Byzantine failure)

### Passive replication
* 分兩種 scheme:**primary-backup** 與 **multi-primary**。
#### Primary-backup

##### 定義
* **Primary**
* 接收來自 clients 的 **invocations** (調用)。
* 執行 **requests** 與 回傳 **replies**。
* 複製 (**Replicates**) 狀態至其他 replicas。
* **Backup**
* 只與 primary 互動。
* 擔任 primary 的備援 (透過 LE)。
* **leader election**:選一個 **primary** replica (有可能透過 configuration service 提供 LE)。
* **Eager replication**:
在 **request boundary 內** (primary 收到 request 至送出 reply 的區間) 處理完 replication。

* **Lazy replication**:
因為 primary 對於 replicas 的寫入**不同步**,因此 replicas 有可能會 **diverge**,這需要額外的機制**處理 primary crashing** 的問題。

#### Primary-Backup: Presence of Failures
* 無論是 Eager replication 還是 Lazy replication,都會在 backups 中**選出新的 primary**。

##### After client receives reply
* 不會選出新的 primary。

##### Before propagating updates
* 步驟:
1. **client 進行 retry**:根據 **timeout mechanism**。
1. **retry** 可能還是會**失敗**:因為可能仍對原本的 primary 做 retry。
1. **檢查**是否有新的 **leader**:透過 configuration service 進行檢查,並再次 retry。
* 需求:
* **Timeout mechanism**:讓 client 進行 retry。
* **Configuration service**:Failure detection、Leader election、Configuration management
* 圖示:

##### Before receiving all write acknowledgements
* 需求:
* **Timeout mechanism**:讓 client 進行 retry。
* Replica 維護一份 **persistent log**:為了**避免重複寫入**,因此需要於各個 replica 維護一份 **WAL**。
* 如果 replica 的介面具有 **idempotence (冪等性)**,那麼這種 idempotend 的操作就允許重複的 write opration。
* 參考閱讀:[Idempotency Key:原理與實測](https://william-yeh.net/post/2020/03/idempotency-key-test/)
* **Configuration service**:Failure detection、Leader election、Configuration management
* 圖示:

#### Multi-primary


##### 目標
* 有效利用所有 replicas 的資源:distribute the loading of replicas。
##### 定義
* 允許所有的 replicas 都有權限處理來自 client 的 requests。
* 因此 replicas 需要對 requests 進行排序,例如使用 **consensus**。
* 使用 **eager replication**:
* **速度慢**,因為 replica 會被鎖定 (locked)。
* 在執行任何指令與回覆 clients 前,replicas 的 **operation order** 必須協調至一致。
* 使用 **optimistic Lazy replication** (optimistic Lazy MPR):
* **改善 response times**:replicas 可以**及時回覆** client,以及執行**非同步 propagate** updates。
* 也稱作 optimistic replication。
* 問題:因為 **replicas 會 diverge**,因此會產生 inconsistencies、aborts、rollbacks。
### Self-study Questions
* Specify as pseudo code how a configuration service is used in each replication failure scenario.
* Further, specify as pseudo code each the client and replica, including the additional mechanisms needed in each failure scenario.
* Analytically compare all replication patterns in terms of number of messages exchanged for reading and writing, given n replicas.
* How do the replication patterns compare in terms of potential for concurrency of operations?
## Chain replication - OVERVIEW
* Update - Reply

* Query - Reply

* 注意事項
* **Update** processing request to head (write)
* **Update** processing reply from tail (write)
* **Query** processing from/to tail (read)
### Update 比較 (Primary-Backup vs. Chain Replication)
* Primary-Backup Replication (**4 messages**)

* Chain Replication (**5 messages**)

### Query 比較 (Primary-Backup vs. Chain Replication)
* Primary-Backup Replication (等待所有比現在這個 query 早的 updates 同步完成)

* Chain Replication (直接發送 reply!)

### Fault-tolerance in ChainReplication
* $f + 1$ 個 nodes 可以容忍 $f$ 個 failures。
### Self-study Questions
* How many node failures can the primary-backup approach sustain without disrupting service?
* Discuss pros and cons of chain, primary-backup, active, and multi-primary replication?
* Quantify the number of messages exchanged by drawing on n replicas for each of the replication schemes for m updates.
* Discuss the replica balance across all approaches?
## Chain replication - UPDATE & QUERY OPERATIONS
* $R_2$ is the predecessor of $R_3$
* $R_3$ is the successor of $R_2$
### Updates
* 步驟 1

* 步驟 2

* 步驟 3

* 步驟 4

### Multiple Updates
* 可以同時處理多個 update

### 各個 History 之間的關係
* subset 與 superset

### Queries
* The **tail** is the point (關鍵) of **linearization** (線性化)!

### Self-study Questions
* Do speculative histories have to be persisted to disk?
* Do stable histories have to be persisted to disk?
* While the subset relationships are neat, do they serve an ulterior purpose?
* Could a reply to an update be sent before transitioning the update to the stable history of the tail?
* Can chain replication work effectively with a single replica?
## Chain replication - FAILUR HANDLING & RECONFIGURATIONS
### Failures Handling
#### Head failure
* **沒有 update**:$R_2$ 直接成為 new head。

* **Head 收到 update,但還沒 propagate**:client 收不到 reply,因此發起 retry。



#### Middle node failure
* $R2$ 故障,$R_{head}$ 向故障 $R2$ 的 successor $R_3$ 對談。

* $R_3$ 成為 $R_{head}$ 的 successor,$R_{head}$ 向 $R_3$ 傳播 update (紫色小圈圈)。

* Successor 傳播 in-flight acknowledgements 至 new predecessor 處,完成 update (綠色小圈圈,比較早的 update)。

#### Tail failure
* $R_{tail}$ 故障,$R_{3}$ 成為 new tail。

* $R_{3}$ 將 **stable history** 與 **speculative history** 刷新 (flush) 成相同的狀態,

### Reconfigurations
* 因應 failure recovery 的 node 新增。
* 因應 extend topology 的 node 新增。
* 為 replicas 設定一個新的 chain。
#### Adding a New Node
* A Configuration Change
* 使用特別的指令 **configuration updates**:**add(nodeid)** 來建構 chain。
* 圖示

* Inferring Configuration
* 透過**查看 configuration updates 的順序**,nodes 便可以**確定 chain 的配置**方式。
* 例如 old tail 在收到 add(new tail) 後,就知道自己不再是 tail 了。
* 圖示

* Flush History to New Tail
* 圖示


* Copy Speculative onto Stable History
* 圖示

* Respond to Queries and Acknowledge Updates
* 圖示

* Propagate Acknowledgements
* 圖示

### Self-study Questions
* Given a toplogy n replicas, n>2, how does replica i know about replica i+2, assuming replica i+1 failed?
* Go through the motions of constructing a topology with add(nodeId) messages.
* When does a replica know for sure a node has been permanently added?
## Gossiping
### Protocols
* Disseminate information in **incremental manner** (漸進的方式)。
* 避免因為 heavy broadcast messages 導致 nodes 的 overloading。
* 缺點就是需要比較多的時間來 propagate information。
* 各個 node 各自維護對於其他 nodes 的 **partial view**。
* 各個 node **隨機**挑選其他 nodes 進行「聊八卦」:
* Application data (e.g., current state)
* Its partial view (e.g., topology information)
* 各個 node 根據「聽到的八卦」來更新自己的 state 與 partial view。
* 「八卦」發生是:**periodically** (週期性的)、**non-deterministically** (不確定的,亦即中間結果無法確定)
* 例如:Cassandra 以 gossiping 來傳播 node status、failure information 以及 metadata。
### Lazy Replication Using Gossiping
* Replicas 聊 operations 的「八卦」。
* Replicas 比較 (**reconcile**/compare) **自己的 operation logs** 與**收到的 operations**。
* 目標是確定還沒執行的 operations,如此可以確保所有 operations 都會執行到,且只會執行一次 (除非我們有 idempotent system)。
* 這個步驟與該 application 極為相關。
* 我們**假設**所有 operations 可以以任何順序 (**in any order**) 被加到 replica 中。
* 當系統不再處理任何 operations 後,所有的 replicas 在不斷的「八卦」後,最終會收斂 (**eventually converges**) 至相同的狀態。
### Self-study Questions
* Draw out any topology of nodes and inject a message at a randomly chosen node, compare a broadcast (send to all neighbours versus a gossip (send to some neighbours):
* How many messages are required?
* How long does it take for all nodes to be up-to-date?
* Have each node in your topology maintain a data structure (e.g., counter, list, array, set, etc.), inject data structure updates at random nodes and propagate these updates via gossip:
* What is the net result?
* Does your replicated system converge at each replica to one and the same state (data structure)?
## CRDT - CONFLICT-FREE REPLICATED DATA TYPES
### Eventual consistency, informally
* Desirable:**large-scale distributed systems**,因為需要 **high availability**。
* 實作簡單便宜 (如透過 gossip),但可能會丟出**舊資料 (stale data)**。
* 對於強調 stronger consistency 的環境,eventual consistency 會造成**很大的挑戰 (challenge)**。
#### Handling Concurrent Writes
* 前提:很少,甚至**沒有 concurrent writes** 的場景 (scenarios),如 client-centric consistency。
* 但仍然需要 **handle concurrent writes** 的機制,以預防 concurrent writes 發生。
* 如果有 handle concurrent writes 的方法,eventual consistency 可以運用得更廣。
* 「只」需要保證在所有寫入 (至相同的 key-value) 後,無論 order 為何,**所有 replicas 收斂**。
#### 範例

#### Self-study Questions
* Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip.
* Does their state converge, no matter the update sequence?
* What happens if update operations are lost or duplicated?
* What mechanisms we know other than gossip could be used to keep these replicated structures updated without violating their convergence.
* What are pros and cons of these mechanisms?
## CRDT - FROM STATE-BASED OBJECTS TO REPLICATED STATE-BASED OBJECTS
### Introduce
* Mostly plain old **objects**。
* 提供 clients 執行的 requests:**update** 與 **query**。
* 維護一份自己的內部狀態 (**Internal state**)。
* 對 instances (已初始化的 objects) 執行 **merge** requests。
* 提供 periodically merge (==support infrastructure==)
### 程式範例 - Class Average
* Internal state:`self.sum` and `self.cnt`。
* Query:return average。
* Update:以 x 更新 average。
* Merge:與另外一個 instance 合併 (merge)。
```python
class Avg(object):
def __init__(self):
self.sum = 0
self.cnt = 0
def query(self):
if self.cnt != 0:
return self.sum / self.cnt
else:
return 0
def update(self, x):
self.sum += x
self.cnt += 1
def merge(self, avg):
self.sum += avg.sum
self.cnt += avg.cnt
```
### Replicated State-based Object
* State-based object 的 replication 可以橫跨 nodes。
* 因此兩個 nodes 都必須有 State-based object 的 copy。
* Client 只向其中一個 nodes 傳送 requests。
* Nodes 間週期地傳送自己 State-based object 的 copy 至其他 nodes 進行 merge。
#### Timeline 圖解
* 就算橫跨不同 replicas,**operation identifier** 仍是**唯一**。

#### States and Causal Histories
* **Query** 要記得用 sum 除以 cnt (instance 數量)。

#### Merge
* 注意 **causal history** 的變化

#### Nodes Periodically Propagate Their State
* 把自己的 copy 傳送至另一個 replica 的 instance。

### Self-study Questions
* Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip.
* For the above data structures, specify merge operations that merge the state of two instances of a given structure.
* Assume merge happens periodically, does your replicated structures’ state converge?
## CRDT - EVENTUAL CONSISTENCY, MORE FORMALLY
### Eventual Consistency
* 兩個 `replicated state-based object`
* 相同 `causal history`
* Eventually (**not necessarily immediately**) converge to the same `internal state`
*
### Strong Eventual Consistency
* 兩個 `replicated state-based object`
* 相同 `causal history`
* **Immediately** have the same `internal state`
### MergeAverage
* 相同的 `causal history`,但**無法 converge** 至相同的 `internal state`。
* 不是 EC,更不是 SEC。
* 圖示:


### NoMergeAverage
* Object’s merge:
* 不做任何 merge 的動作。
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`,並且最終會 **converge** 至**穩定 (stable)** 但**不同 (different)** 的 `internal state`。
* 不是 EC,更不是 SEC。
* 圖示:


### BMergeAverage
* Object’s merge:
* a:不做任何事。
* b:以 a 的 `internal state` 覆蓋掉自己的 `internal state`。
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`,並且最終會 **converge** 至相同的 `internal state`。
* **Eventually consistent!** (因為 `a1` 與 `b1` 狀態不同,因此沒有達到 strong 等級)
* 圖示:


### MaxAverage
* Object’s merge:
* Pair-wise max of `sum` and `cnt`
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`,相同的 `internal state`。
* **Strongly eventually consistent!** (然而機制上不正確...)
* 圖示:


### Summary
* 設計 **strongly eventually consistent state-based object** 是大挑戰!
* **Comparison**:
* MergeAverage:相同的 `causal history`,不同的 `internal state`。
* NoMergeAverage:相同的 `causal history`,相同的 `stale state`,不同的 `internal state`。
* BMergeAverage:相同的 `causal history`,**最終到達**相同的 `internal state`。
* MaxAverage:相同的 `causal history`,**一直保持**相同的 `internal state`。
* 圖示:

### Self-study Questions
* Can you design Average such that it becomes EC or SEC as well as offers correct averaging semantics?
* Think of other data structures and design update, query, and merge operations with reasonable semantics.
* Always draw timelines and state diagrams for your designs and proof EC or SEC, if possible.
* Think of data structures that support multiple update operations and one or more query operations.
## CRDT - Conflict-free replicated data types, 2011
### Introduce
* Handles **concurrent writes**。
* Intuition:
* 不允許任意 value 的寫入,**寫入**操作必須**保證不會產生 conflict**。
* CRDT 的資料結構是設計**特殊的寫入操作**,以此保證 **strong eventual consistency** 與 **monotonicity** (no rollbacks)。
* Not panacea but a great solution!
### Classification
* CmRDT - **Commutative/op-based**
* 前提:
* **duplicate-free**:不會重複操作相同的指令,造成計數器重複累加。
* **no-loss messaging**:不會有遺失的指令,造成計數器的運算缺漏。
* 操作:在 replicas 間 propagate operations。
* 範例:growth-only counter,只處理 `increment` 指令。
* CvRDT - **Convergent/state-based**
* 操作:propagate 並 `merge` (idempotent merge) `internal state`。
* 範例:max register,可以儲存最大值。
* 因此,CRDT 的 value 是取決於 multiple write operations or states,而非只有最後一個。
### Properties (state-based CRDT)
* Code Example
```python
class IntMax(object):
def __init__(self):
self.x = 0
def query(self):
return self.x
def update(self, x):
assert x >= 0
self.x += x
def merge(self, other):
self.x = max(self.x, other.x)
```
* `merge` is **associative**
* `merge(merge(a, b), c) = merge(a, merge(b, c))`
* `merge` is **commutative**
* `merge(a, b) = merge(b, a)`
* `merge` is **idempotent**
* `merge(a, a) = a`
* Every `update` is **increasing**
* `merge(a, update(a, x)) = update(a, x)`
### G-Counter CRDT
* Growth-only counter。
* n 個 nodes 的 G-Counter:大小為 n 的非負整數陣列。
* `query`:回傳 n 階 array 每個元素的總和。
* `add(x)`:如果在第 i 個伺服器被觸發,那就在 array 的第 i 個元素操作 `add(x)`。
* `merge`:對兩個 array 做 **pairwise maximumm**。
### PN-Counter CRDT
* Positive-Nagitive counter。
* 2 個 G-Counter:`p` 與 `n`。
* `query`:回傳 `p.query()` - `n.query()`。
* `add(x)`:執行 `p.add(x)`。
* `sub(x)`:執行 `n.add(x)`。
* `merge`:對 2 個 PN-Counter 做 **pairwise maximumm**。
### G-Set CRDT
* Growth-only set。
* 可以進行累加,但不行去除元素的 replicated set。
* `query`:回傳 `G-Set` (Internal state)。
* `add(x)`:新增 `x` 至 `G-Set`。
* `merge`:執行 set union。
### 2P-Set CRDT
* 2 個 G-Set:`a` 與 `r`。
* `query`:回傳 `a.query()` - `r.query()` (set difference)。
* `add(x)`:執行 `a.add(x)`。
* `sub(x)`:執行 `r.add(x)`。
* `merge`:對 2 個 G-Set 做 **pairwise maximumm**。
### Summary
* CmCRDTs and CvCRDTs are **equivalent**!
* Really **neat** (**elegant**) solution if it applies
### Self-study Questions
* For the CRDTs introduced, establish its four properties.
* Create example executions for each CRDT and complete a timeline and a state table.
* Fine use cases where the introduced CRDTs apply and show how they are used.
* Think of new CRDTs and repeat the above.