---
# System prepended metadata

title: Week 06 - Replication & CRDT
tags: [WS2020-IN2259-DS]

---

# Week 06 - Replication & CRDT
###### tags: `WS2020-IN2259-DS`

## Data replication

### Why Replicate?

#### Performance
* 這裡先看 **data replication**，computation replication 是別的概念，之後 MapReduce 會再教。
* 範例：**Caching** data at **browsers** and **proxy servers**，方便資料的快速讀取。
    ![](https://i.imgur.com/rjI4Xi6.png =500x)
* 真實案例：Typical (video) CDN (Content Delivery Network)。
    ![](https://i.imgur.com/xmMuzEY.png =500x)

#### High availability
* 如果發生 crash，資料可以從別的 replica 獲得。
    ![](https://i.imgur.com/GkoZL0Q.png =400x)
* **Network Partitioning**：就算 Partition 脫離，兩個中心還是可以各自擔任戰備。
    ![](https://i.imgur.com/Gk6C8kj.png =400x)
    * **Partition tolerance**：指系統擁有在 network partitions 狀態下繼續運作的能力。

#### Fault tolerance
* 就算出現 faulty nodes (**並非僅是 crash，而是任意未知的錯誤**)，也能提供穩定的服務。
    ![](https://i.imgur.com/Sq8jdse.png =400x)

#### “Cost” of Replication
* **storing** additional copies of data。
* **keep replicas up to date**。
* 如何處理 replicas 中的 **stale (out-of-date) data**。

### Self-study Questions
* What is the “cache hierarchy” for web data? Think of web pages with varying complexity of content; understand where each content element is cached.
* Is this cache hierarchy the same for any distributed data processing system?
* Find some popular examples of open-source caching technology and understand their position in the cache hierarchy.
* Think of alternatives for recovering a crashed replica – how does it catch up to the current state?
* Determine conflicts when issuing reads and writes concurrently to replicas. 

## Replication patterns

### Active replication

#### 定義
* 由 **clients 傳送 requests 至每一個 replica** 來完成 replication。
    ![](https://i.imgur.com/KsWR3Is.png =300x)
* Requests 可以是 read，也可以是 write。
* Replicas 可能會 crash。
* **Fast to get a response**：因為多個 replica 會回傳 response，因此只要收到第一個 (**first result received**)，就可以立即進行處理。
    除非假設有 Byzantine failure，那就要等收到 majority results 後才能處理。
* 需要 **Configuration service** 來協助 client 獲知 replica 狀態。
    * **Failure detection**：例如透過 heartbeats 機制來偵測故障的 replica。
    * **Configuration management**

#### 問題
* 若**同步問題**沒有處理好，Replicas 的 value 可能會分散 (**diverge**)。
    ![](https://i.imgur.com/bur50Wa.png =300x)

#### 解決問題
* 需要 **total order broadcast**：來自所有 clients 的所有 requests，**均為相同的 order**。

#### 圖解
* **Write**
    ![](https://i.imgur.com/2LM24Qo.png =500x)
* **Read** (或者可以實作 read from a quorum，以應對 Byzantine failure)
    ![](https://i.imgur.com/TwdjtaJ.png =500x)

### Passive replication
* 分兩種 scheme：**primary-backup** 與 **multi-primary**。

#### Primary-backup
![](https://i.imgur.com/Bk6WVHg.png =250x)

##### 定義
* **Primary**
    * 接收來自 clients 的 **invocations** (調用)。
    * 執行 **requests** 與 回傳 **replies**。
    * 複製 (**Replicates**) 狀態至其他 replicas。
* **Backup**
    * 只與 primary 互動。
    * 擔任 primary 的備援 (透過 LE)。
* **leader election**：選一個 **primary** replica (有可能透過 configuration service 提供 LE)。
* **Eager replication**：
    在 **request boundary 內** (primary 收到 request 至送出 reply 的區間) 處理完 replication。
    ![](https://i.imgur.com/IFl2U19.png =500x)
* **Lazy replication**：
    因為 primary 對於 replicas 的寫入**不同步**，因此 replicas 有可能會 **diverge**，這需要額外的機制**處理 primary crashing** 的問題。
    ![](https://i.imgur.com/TKvtEDn.png =500x)

#### Primary-Backup: Presence of Failures
* 無論是 Eager replication 還是 Lazy replication，都會在 backups 中**選出新的 primary**。
    ![](https://i.imgur.com/Z94nbdv.png =500x)

##### After client receives reply
* 不會選出新的 primary。
    ![](https://i.imgur.com/tWXswb0.png =500x)

##### Before propagating updates
* 步驟：
    1. **client 進行 retry**：根據 **timeout mechanism**。
    1. **retry** 可能還是會**失敗**：因為可能仍對原本的 primary 做 retry。
    1. **檢查**是否有新的 **leader**：透過 configuration service 進行檢查，並再次 retry。
* 需求：
    * **Timeout mechanism**：讓 client 進行 retry。
    * **Configuration service**：Failure detection、Leader election、Configuration management
* 圖示：
    ![](https://i.imgur.com/lGhYFvZ.png =500x)

##### Before receiving all write acknowledgements
* 需求：
    * **Timeout mechanism**：讓 client 進行 retry。
    * Replica 維護一份 **persistent log**：為了**避免重複寫入**，因此需要於各個 replica 維護一份 **WAL**。
        * 如果 replica 的介面具有 **idempotence (冪等性)**，那麼這種 idempotend 的操作就允許重複的 write opration。
        * 參考閱讀：[Idempotency Key：原理與實測](https://william-yeh.net/post/2020/03/idempotency-key-test/)
    * **Configuration service**：Failure detection、Leader election、Configuration management
* 圖示：
    ![](https://i.imgur.com/mB2zfgC.png =500x)

#### Multi-primary
![](https://i.imgur.com/8i9R3y9.png =250x)
![](https://i.imgur.com/ub9UsXd.png =250x)

##### 目標
* 有效利用所有 replicas 的資源：distribute the loading of replicas。

##### 定義
* 允許所有的 replicas 都有權限處理來自 client 的 requests。
    * 因此 replicas 需要對 requests 進行排序，例如使用 **consensus**。
* 使用 **eager replication**：
    * **速度慢**，因為 replica 會被鎖定 (locked)。
    * 在執行任何指令與回覆 clients 前，replicas 的 **operation order** 必須協調至一致。
* 使用 **optimistic Lazy replication** (optimistic Lazy MPR)：
    * **改善 response times**：replicas 可以**及時回覆** client，以及執行**非同步 propagate** updates。
* 也稱作 optimistic replication。
* 問題：因為 **replicas 會 diverge**，因此會產生 inconsistencies、aborts、rollbacks。

### Self-study Questions
* Specify as pseudo code how a configuration service is used in each replication failure scenario.
* Further, specify as pseudo code each the client and replica, including the additional mechanisms needed in each failure scenario.
* Analytically compare all replication patterns in terms of number of messages exchanged for reading and writing, given n replicas.
* How do the replication patterns compare in terms of potential for concurrency of operations?

## Chain replication - OVERVIEW
* Update - Reply
    ![](https://i.imgur.com/o76UDtF.png =300x)
* Query - Reply
    ![](https://i.imgur.com/UwksRnL.png =300x)
* 注意事項
    * **Update** processing request to head (write)
    * **Update** processing reply from tail (write)
    * **Query** processing from/to tail (read)

### Update 比較 (Primary-Backup vs. Chain Replication)
* Primary-Backup Replication (**4 messages**)
    ![](https://i.imgur.com/yXwfH0p.png =300x)
* Chain Replication (**5 messages**)
    ![](https://i.imgur.com/kneDLhI.png =300x)

### Query 比較 (Primary-Backup vs. Chain Replication)
* Primary-Backup Replication (等待所有比現在這個 query 早的 updates 同步完成)
    ![](https://i.imgur.com/SJVLjxS.png =300x)
* Chain Replication (直接發送 reply！)
    ![](https://i.imgur.com/AdCib5J.png =300x)

### Fault-tolerance in ChainReplication
*  $f + 1$ 個 nodes 可以容忍 $f$ 個 failures。

### Self-study Questions
* How many node failures can the primary-backup approach sustain without disrupting service?
* Discuss pros and cons of chain, primary-backup, active, and multi-primary replication?
* Quantify the number of messages exchanged by drawing on n replicas for each of the replication schemes for m updates.
* Discuss the replica balance across all approaches?

## Chain replication - UPDATE & QUERY OPERATIONS
* $R_2$ is the predecessor of $R_3$
* $R_3$ is the successor of $R_2$
### Updates
* 步驟 1
	![](https://i.imgur.com/WqcCn1V.png =500x)
* 步驟 2
	![](https://i.imgur.com/ed20uJ1.png =500x)
* 步驟 3
	![](https://i.imgur.com/erY4jmA.png =500x)
* 步驟 4
	![](https://i.imgur.com/bjObE9L.png =500x)

### Multiple Updates
* 可以同時處理多個 update
    ![](https://i.imgur.com/n6hDF1O.png =500x)

### 各個 History 之間的關係
* subset 與 superset
    ![](https://i.imgur.com/YFJdkaa.png =400x)

### Queries
* The **tail** is the point (關鍵) of **linearization** (線性化)!
    ![](https://i.imgur.com/1R4djAB.png =500x)

### Self-study Questions
* Do speculative histories have to be persisted to disk?
* Do stable histories have to be persisted to disk?
* While the subset relationships are neat, do they serve an ulterior purpose?
* Could a reply to an update be sent before transitioning the update to the stable history of the tail?
* Can chain replication work effectively with a single replica?

## Chain replication - FAILUR HANDLING & RECONFIGURATIONS

### Failures Handling

#### Head failure
* **沒有 update**：$R_2$ 直接成為 new head。
    ![](https://i.imgur.com/3vBjCS6.png =400x)
* **Head 收到 update，但還沒 propagate**：client 收不到 reply，因此發起 retry。
    ![](https://i.imgur.com/y0VxfUi.png =400x)
    ![](https://i.imgur.com/d7nz7Kt.png =400x)
    ![](https://i.imgur.com/Dt2Qzl5.png =400x)

#### Middle node failure
* $R2$ 故障，$R_{head}$ 向故障 $R2$ 的 successor $R_3$ 對談。
    ![](https://i.imgur.com/ZVGUTN1.png =400x)
* $R_3$ 成為 $R_{head}$ 的 successor，$R_{head}$ 向 $R_3$ 傳播 update (紫色小圈圈)。
    ![](https://i.imgur.com/aBhpRIP.png =400x)
* Successor 傳播 in-flight acknowledgements 至 new predecessor 處，完成 update (綠色小圈圈，比較早的 update)。
    ![](https://i.imgur.com/KvvutdT.png =400x)

#### Tail failure
* $R_{tail}$ 故障，$R_{3}$ 成為 new tail。
    ![](https://i.imgur.com/5s5dd4S.png =400x)
* $R_{3}$ 將 **stable history** 與 **speculative history** 刷新 (flush) 成相同的狀態， 
    ![](https://i.imgur.com/xQL0Ong.png =400x)

### Reconfigurations
* 因應 failure recovery 的 node 新增。
* 因應 extend topology 的 node 新增。
* 為 replicas 設定一個新的 chain。

#### Adding a New Node
* A Configuration Change
    * 使用特別的指令 **configuration updates**：**add(nodeid)** 來建構 chain。
    * 圖示
        ![](https://i.imgur.com/KEcrYyE.png)
* Inferring Configuration
    * 透過**查看 configuration updates 的順序**，nodes 便可以**確定 chain 的配置**方式。
    * 例如 old tail 在收到 add(new tail) 後，就知道自己不再是 tail 了。
    * 圖示
        ![](https://i.imgur.com/pDRTZfT.png)
* Flush History to New Tail
    * 圖示
        ![](https://i.imgur.com/qZGdm6Q.png)
        ![](https://i.imgur.com/JZOiujh.png)
* Copy Speculative onto Stable History
    * 圖示
        ![](https://i.imgur.com/iyRNmSr.png)
* Respond to Queries and Acknowledge Updates
    * 圖示
        ![](https://i.imgur.com/6i9EzrB.png)
* Propagate Acknowledgements
    * 圖示
        ![](https://i.imgur.com/kdEKruC.png)

### Self-study Questions
* Given a toplogy n replicas, n>2, how does replica i know about replica i+2, assuming replica i+1 failed?
* Go through the motions of constructing a topology with  add(nodeId) messages.
* When does a replica know for sure a node has been permanently added?

## Gossiping

### Protocols
* Disseminate information in **incremental manner** (漸進的方式)。
    * 避免因為 heavy broadcast messages 導致 nodes 的 overloading。
    * 缺點就是需要比較多的時間來 propagate information。
* 各個 node 各自維護對於其他 nodes 的 **partial view**。
* 各個 node **隨機**挑選其他 nodes 進行「聊八卦」：
    * Application data (e.g., current state)
    * Its partial view (e.g., topology information)
* 各個 node 根據「聽到的八卦」來更新自己的 state 與 partial view。
* 「八卦」發生是：**periodically** (週期性的)、**non-deterministically** (不確定的，亦即中間結果無法確定)
* 例如：Cassandra 以 gossiping 來傳播 node status、failure information 以及 metadata。

### Lazy Replication Using Gossiping
* Replicas 聊 operations 的「八卦」。
* Replicas 比較 (**reconcile**/compare) **自己的 operation logs** 與**收到的 operations**。
    * 目標是確定還沒執行的 operations，如此可以確保所有 operations 都會執行到，且只會執行一次 (除非我們有 idempotent system)。
    * 這個步驟與該 application 極為相關。
* 我們**假設**所有 operations 可以以任何順序 (**in any order**) 被加到 replica 中。
* 當系統不再處理任何 operations 後，所有的 replicas 在不斷的「八卦」後，最終會收斂 (**eventually converges**) 至相同的狀態。

### Self-study Questions
* Draw out any topology of nodes and inject a message at a randomly chosen node, compare a broadcast (send to all neighbours versus a gossip (send to some neighbours): 
    * How many messages are required?
    * How long does it take for all nodes to be up-to-date?
* Have each node in your topology maintain a data structure (e.g., counter, list, array, set, etc.), inject data structure updates at random nodes and propagate these updates via gossip: 
    * What is the net result?
    * Does your replicated system converge at each replica to one and the same state (data structure)?

## CRDT - CONFLICT-FREE REPLICATED DATA TYPES

### Eventual consistency, informally
* Desirable：**large-scale distributed systems**，因為需要 **high availability**。
* 實作簡單便宜 (如透過 gossip)，但可能會丟出**舊資料 (stale data)**。
* 對於強調 stronger consistency 的環境，eventual consistency 會造成**很大的挑戰 (challenge)**。

#### Handling Concurrent Writes
* 前提：很少，甚至**沒有 concurrent writes** 的場景 (scenarios)，如 client-centric consistency。
* 但仍然需要 **handle concurrent writes** 的機制，以預防 concurrent writes 發生。
* 如果有 handle concurrent writes 的方法，eventual consistency 可以運用得更廣。
* 「只」需要保證在所有寫入 (至相同的 key-value) 後，無論 order 為何，**所有 replicas 收斂**。

#### 範例
![](https://i.imgur.com/PWD7lVg.png =600x)

#### Self-study Questions
* Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip.
* Does their state converge, no matter the update sequence?
* What happens if update operations are lost or duplicated?
* What mechanisms we know other than gossip could be used to keep these replicated structures updated without violating their convergence.
* What are pros and cons of these mechanisms?

## CRDT - FROM STATE-BASED OBJECTS TO REPLICATED STATE-BASED OBJECTS

### Introduce
* Mostly plain old **objects**。
* 提供 clients 執行的 requests：**update** 與 **query**。
* 維護一份自己的內部狀態 (**Internal state**)。
* 對 instances (已初始化的 objects) 執行 **merge** requests。
* 提供 periodically merge (==support infrastructure==)

### 程式範例 - Class Average
* Internal state：`self.sum` and `self.cnt`。
* Query：return average。
* Update：以 x 更新 average。
* Merge：與另外一個 instance 合併 (merge)。
```python
class Avg(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def query(self):
        if self.cnt != 0:
            return self.sum / self.cnt
        else:
            return 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt
```

### Replicated State-based Object
* State-based object 的 replication 可以橫跨 nodes。
* 因此兩個 nodes 都必須有 State-based object 的 copy。
* Client 只向其中一個 nodes 傳送 requests。
* Nodes 間週期地傳送自己 State-based object 的 copy 至其他 nodes 進行 merge。

#### Timeline 圖解
* 就算橫跨不同 replicas，**operation identifier** 仍是**唯一**。
![](https://i.imgur.com/lotICsN.png =500x)

#### States and Causal Histories
* **Query** 要記得用 sum 除以 cnt (instance 數量)。
    ![](https://i.imgur.com/8J0vns0.png =500x)

#### Merge
* 注意 **causal history** 的變化
    ![](https://i.imgur.com/ki669q6.png =500x)

#### Nodes Periodically Propagate Their State
* 把自己的 copy 傳送至另一個 replica 的 instance。
    ![](https://i.imgur.com/JhhXRgW.png =500x)

### Self-study Questions
* Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip.
* For the above data structures, specify merge operations that merge the state of two instances of a given structure.
* Assume merge happens periodically, does your replicated structures’ state converge?

## CRDT - EVENTUAL CONSISTENCY, MORE FORMALLY
### Eventual Consistency
* 兩個 `replicated state-based object`
* 相同 `causal history`
* Eventually (**not necessarily immediately**) converge to the same `internal state`
* 

### Strong Eventual Consistency
* 兩個 `replicated state-based object`
* 相同 `causal history`
* **Immediately** have the same `internal state`

### MergeAverage
* 相同的 `causal history`，但**無法 converge** 至相同的 `internal state`。
* 不是 EC，更不是 SEC。
* 圖示：
    ![](https://i.imgur.com/N2C9RIS.png =300x)
    ![](https://i.imgur.com/81zWwrs.png =300x)

### NoMergeAverage
* Object’s merge：
    * 不做任何 merge 的動作。
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`，並且最終會 **converge** 至**穩定 (stable)** 但**不同 (different)** 的 `internal state`。
* 不是 EC，更不是 SEC。
* 圖示：
    ![](https://i.imgur.com/N2C9RIS.png =300x)
    ![](https://i.imgur.com/RuvZGDR.png =300x)

### BMergeAverage
* Object’s merge：
    * a：不做任何事。
    * b：以 a 的 `internal state` 覆蓋掉自己的 `internal state`。
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`，並且最終會 **converge** 至相同的 `internal state`。
* **Eventually consistent！** (因為 `a1` 與 `b1` 狀態不同，因此沒有達到 strong 等級)
* 圖示：
    ![](https://i.imgur.com/N2C9RIS.png =300x)
    ![](https://i.imgur.com/spL9KiO.png =300x)

### MaxAverage
* Object’s merge：
    * Pair-wise max of `sum` and `cnt`
* 其他機制與 `MergeAverage` 相同。
* 相同的 `causal history`，相同的 `internal state`。
* **Strongly eventually consistent！** (然而機制上不正確...)
* 圖示：
    ![](https://i.imgur.com/N2C9RIS.png =300x)
    ![](https://i.imgur.com/JihT18n.png =300x)

### Summary
* 設計 **strongly eventually consistent state-based object** 是大挑戰！
* **Comparison**：
    * MergeAverage：相同的 `causal history`，不同的 `internal state`。
    * NoMergeAverage：相同的 `causal history`，相同的 `stale state`，不同的 `internal state`。
    * BMergeAverage：相同的 `causal history`，**最終到達**相同的 `internal state`。
    * MaxAverage：相同的 `causal history`，**一直保持**相同的 `internal state`。
    * 圖示：
        ![](https://i.imgur.com/QIhjS0c.png =300x)

### Self-study Questions
* Can you design Average such that it becomes EC or SEC as well as offers correct averaging semantics?
* Think of other data structures and design update, query, and merge operations with reasonable semantics.
* Always draw timelines and state diagrams for your designs and proof EC or SEC, if possible.
* Think of data structures that support multiple update operations and one or more query operations.

## CRDT - Conflict-free replicated data types, 2011

### Introduce
*  Handles **concurrent writes**。
*  Intuition：
    * 不允許任意 value 的寫入，**寫入**操作必須**保證不會產生 conflict**。
    * CRDT 的資料結構是設計**特殊的寫入操作**，以此保證 **strong eventual consistency** 與 **monotonicity** (no rollbacks)。
*  Not panacea but a great solution!

### Classification
* CmRDT - **Commutative/op-based**
    * 前提：
        * **duplicate-free**：不會重複操作相同的指令，造成計數器重複累加。
        * **no-loss messaging**：不會有遺失的指令，造成計數器的運算缺漏。
    * 操作：在 replicas 間 propagate operations。
    * 範例：growth-only counter，只處理 `increment` 指令。
* CvRDT - **Convergent/state-based** 
    * 操作：propagate 並 `merge` (idempotent merge) `internal state`。
    * 範例：max register，可以儲存最大值。
* 因此，CRDT 的 value 是取決於 multiple write operations or states，而非只有最後一個。

### Properties (state-based CRDT)
* Code Example
    ```python
    class IntMax(object):
        def __init__(self):
            self.x = 0
        def query(self):
            return self.x
        def update(self, x):
            assert x >= 0
            self.x += x
        def merge(self, other):
            self.x = max(self.x, other.x)
    ```
*  `merge` is **associative**
    * `merge(merge(a, b), c) = merge(a, merge(b, c))`
* `merge` is **commutative**
    * `merge(a, b) = merge(b, a)`
* `merge` is **idempotent** 
    * `merge(a, a) = a`
* Every `update` is **increasing**
    * `merge(a, update(a, x)) = update(a, x)`

### G-Counter CRDT
* Growth-only counter。
* n 個 nodes 的 G-Counter：大小為 n 的非負整數陣列。
    * `query`：回傳 n 階 array 每個元素的總和。
    * `add(x)`：如果在第 i 個伺服器被觸發，那就在 array 的第 i 個元素操作 `add(x)`。
    * `merge`：對兩個 array 做 **pairwise maximumm**。

### PN-Counter CRDT
* Positive-Nagitive counter。
* 2 個 G-Counter：`p` 與 `n`。
    * `query`：回傳 `p.query()` - `n.query()`。
    * `add(x)`：執行 `p.add(x)`。
    * `sub(x)`：執行 `n.add(x)`。
    * `merge`：對 2 個 PN-Counter 做 **pairwise maximumm**。

### G-Set CRDT
* Growth-only set。
* 可以進行累加，但不行去除元素的 replicated set。
    * `query`：回傳 `G-Set` (Internal state)。
    * `add(x)`：新增 `x` 至 `G-Set`。
    * `merge`：執行 set union。

### 2P-Set CRDT
* 2 個 G-Set：`a` 與 `r`。
    * `query`：回傳 `a.query()` - `r.query()` (set difference)。
    * `add(x)`：執行 `a.add(x)`。
    * `sub(x)`：執行 `r.add(x)`。
    * `merge`：對 2 個 G-Set 做 **pairwise maximumm**。

### Summary
* CmCRDTs and CvCRDTs are **equivalent**!
* Really **neat** (**elegant**) solution if it applies

### Self-study Questions
* For the CRDTs introduced, establish its four properties.
* Create example executions for each CRDT and complete a timeline and a state table.
* Fine use cases where the introduced CRDTs apply and show how they are used.
* Think of new CRDTs and repeat the above.