# Week 06 - Replication & CRDT ###### tags: `WS2020-IN2259-DS` ## Data replication ### Why Replicate? #### Performance * 這裡先看 **data replication**,computation replication 是別的概念,之後 MapReduce 會再教。 * 範例:**Caching** data at **browsers** and **proxy servers**,方便資料的快速讀取。 ![](https://i.imgur.com/rjI4Xi6.png =500x) * 真實案例:Typical (video) CDN (Content Delivery Network)。 ![](https://i.imgur.com/xmMuzEY.png =500x) #### High availability * 如果發生 crash,資料可以從別的 replica 獲得。 ![](https://i.imgur.com/GkoZL0Q.png =400x) * **Network Partitioning**:就算 Partition 脫離,兩個中心還是可以各自擔任戰備。 ![](https://i.imgur.com/Gk6C8kj.png =400x) * **Partition tolerance**:指系統擁有在 network partitions 狀態下繼續運作的能力。 #### Fault tolerance * 就算出現 faulty nodes (**並非僅是 crash,而是任意未知的錯誤**),也能提供穩定的服務。 ![](https://i.imgur.com/Sq8jdse.png =400x) #### “Cost” of Replication * **storing** additional copies of data。 * **keep replicas up to date**。 * 如何處理 replicas 中的 **stale (out-of-date) data**。 ### Self-study Questions * What is the “cache hierarchy” for web data? Think of web pages with varying complexity of content; understand where each content element is cached. * Is this cache hierarchy the same for any distributed data processing system? * Find some popular examples of open-source caching technology and understand their position in the cache hierarchy. * Think of alternatives for recovering a crashed replica – how does it catch up to the current state? * Determine conflicts when issuing reads and writes concurrently to replicas. ## Replication patterns ### Active replication #### 定義 * 由 **clients 傳送 requests 至每一個 replica** 來完成 replication。 ![](https://i.imgur.com/KsWR3Is.png =300x) * Requests 可以是 read,也可以是 write。 * Replicas 可能會 crash。 * **Fast to get a response**:因為多個 replica 會回傳 response,因此只要收到第一個 (**first result received**),就可以立即進行處理。 除非假設有 Byzantine failure,那就要等收到 majority results 後才能處理。 * 需要 **Configuration service** 來協助 client 獲知 replica 狀態。 * **Failure detection**:例如透過 heartbeats 機制來偵測故障的 replica。 * **Configuration management** #### 問題 * 若**同步問題**沒有處理好,Replicas 的 value 可能會分散 (**diverge**)。 ![](https://i.imgur.com/bur50Wa.png =300x) #### 解決問題 * 需要 **total order broadcast**:來自所有 clients 的所有 requests,**均為相同的 order**。 #### 圖解 * **Write** ![](https://i.imgur.com/2LM24Qo.png =500x) * **Read** (或者可以實作 read from a quorum,以應對 Byzantine failure) ![](https://i.imgur.com/TwdjtaJ.png =500x) ### Passive replication * 分兩種 scheme:**primary-backup** 與 **multi-primary**。 #### Primary-backup ![](https://i.imgur.com/Bk6WVHg.png =250x) ##### 定義 * **Primary** * 接收來自 clients 的 **invocations** (調用)。 * 執行 **requests** 與 回傳 **replies**。 * 複製 (**Replicates**) 狀態至其他 replicas。 * **Backup** * 只與 primary 互動。 * 擔任 primary 的備援 (透過 LE)。 * **leader election**:選一個 **primary** replica (有可能透過 configuration service 提供 LE)。 * **Eager replication**: 在 **request boundary 內** (primary 收到 request 至送出 reply 的區間) 處理完 replication。 ![](https://i.imgur.com/IFl2U19.png =500x) * **Lazy replication**: 因為 primary 對於 replicas 的寫入**不同步**,因此 replicas 有可能會 **diverge**,這需要額外的機制**處理 primary crashing** 的問題。 ![](https://i.imgur.com/TKvtEDn.png =500x) #### Primary-Backup: Presence of Failures * 無論是 Eager replication 還是 Lazy replication,都會在 backups 中**選出新的 primary**。 ![](https://i.imgur.com/Z94nbdv.png =500x) ##### After client receives reply * 不會選出新的 primary。 ![](https://i.imgur.com/tWXswb0.png =500x) ##### Before propagating updates * 步驟: 1. **client 進行 retry**:根據 **timeout mechanism**。 1. **retry** 可能還是會**失敗**:因為可能仍對原本的 primary 做 retry。 1. **檢查**是否有新的 **leader**:透過 configuration service 進行檢查,並再次 retry。 * 需求: * **Timeout mechanism**:讓 client 進行 retry。 * **Configuration service**:Failure detection、Leader election、Configuration management * 圖示: ![](https://i.imgur.com/lGhYFvZ.png =500x) ##### Before receiving all write acknowledgements * 需求: * **Timeout mechanism**:讓 client 進行 retry。 * Replica 維護一份 **persistent log**:為了**避免重複寫入**,因此需要於各個 replica 維護一份 **WAL**。 * 如果 replica 的介面具有 **idempotence (冪等性)**,那麼這種 idempotend 的操作就允許重複的 write opration。 * 參考閱讀:[Idempotency Key:原理與實測](https://william-yeh.net/post/2020/03/idempotency-key-test/) * **Configuration service**:Failure detection、Leader election、Configuration management * 圖示: ![](https://i.imgur.com/mB2zfgC.png =500x) #### Multi-primary ![](https://i.imgur.com/8i9R3y9.png =250x) ![](https://i.imgur.com/ub9UsXd.png =250x) ##### 目標 * 有效利用所有 replicas 的資源:distribute the loading of replicas。 ##### 定義 * 允許所有的 replicas 都有權限處理來自 client 的 requests。 * 因此 replicas 需要對 requests 進行排序,例如使用 **consensus**。 * 使用 **eager replication**: * **速度慢**,因為 replica 會被鎖定 (locked)。 * 在執行任何指令與回覆 clients 前,replicas 的 **operation order** 必須協調至一致。 * 使用 **optimistic Lazy replication** (optimistic Lazy MPR): * **改善 response times**:replicas 可以**及時回覆** client,以及執行**非同步 propagate** updates。 * 也稱作 optimistic replication。 * 問題:因為 **replicas 會 diverge**,因此會產生 inconsistencies、aborts、rollbacks。 ### Self-study Questions * Specify as pseudo code how a configuration service is used in each replication failure scenario. * Further, specify as pseudo code each the client and replica, including the additional mechanisms needed in each failure scenario. * Analytically compare all replication patterns in terms of number of messages exchanged for reading and writing, given n replicas. * How do the replication patterns compare in terms of potential for concurrency of operations? ## Chain replication - OVERVIEW * Update - Reply ![](https://i.imgur.com/o76UDtF.png =300x) * Query - Reply ![](https://i.imgur.com/UwksRnL.png =300x) * 注意事項 * **Update** processing request to head (write) * **Update** processing reply from tail (write) * **Query** processing from/to tail (read) ### Update 比較 (Primary-Backup vs. Chain Replication) * Primary-Backup Replication (**4 messages**) ![](https://i.imgur.com/yXwfH0p.png =300x) * Chain Replication (**5 messages**) ![](https://i.imgur.com/kneDLhI.png =300x) ### Query 比較 (Primary-Backup vs. Chain Replication) * Primary-Backup Replication (等待所有比現在這個 query 早的 updates 同步完成) ![](https://i.imgur.com/SJVLjxS.png =300x) * Chain Replication (直接發送 reply!) ![](https://i.imgur.com/AdCib5J.png =300x) ### Fault-tolerance in ChainReplication * $f + 1$ 個 nodes 可以容忍 $f$ 個 failures。 ### Self-study Questions * How many node failures can the primary-backup approach sustain without disrupting service? * Discuss pros and cons of chain, primary-backup, active, and multi-primary replication? * Quantify the number of messages exchanged by drawing on n replicas for each of the replication schemes for m updates. * Discuss the replica balance across all approaches? ## Chain replication - UPDATE & QUERY OPERATIONS * $R_2$ is the predecessor of $R_3$ * $R_3$ is the successor of $R_2$ ### Updates * 步驟 1 ![](https://i.imgur.com/WqcCn1V.png =500x) * 步驟 2 ![](https://i.imgur.com/ed20uJ1.png =500x) * 步驟 3 ![](https://i.imgur.com/erY4jmA.png =500x) * 步驟 4 ![](https://i.imgur.com/bjObE9L.png =500x) ### Multiple Updates * 可以同時處理多個 update ![](https://i.imgur.com/n6hDF1O.png =500x) ### 各個 History 之間的關係 * subset 與 superset ![](https://i.imgur.com/YFJdkaa.png =400x) ### Queries * The **tail** is the point (關鍵) of **linearization** (線性化)! ![](https://i.imgur.com/1R4djAB.png =500x) ### Self-study Questions * Do speculative histories have to be persisted to disk? * Do stable histories have to be persisted to disk? * While the subset relationships are neat, do they serve an ulterior purpose? * Could a reply to an update be sent before transitioning the update to the stable history of the tail? * Can chain replication work effectively with a single replica? ## Chain replication - FAILUR HANDLING & RECONFIGURATIONS ### Failures Handling #### Head failure * **沒有 update**:$R_2$ 直接成為 new head。 ![](https://i.imgur.com/3vBjCS6.png =400x) * **Head 收到 update,但還沒 propagate**:client 收不到 reply,因此發起 retry。 ![](https://i.imgur.com/y0VxfUi.png =400x) ![](https://i.imgur.com/d7nz7Kt.png =400x) ![](https://i.imgur.com/Dt2Qzl5.png =400x) #### Middle node failure * $R2$ 故障,$R_{head}$ 向故障 $R2$ 的 successor $R_3$ 對談。 ![](https://i.imgur.com/ZVGUTN1.png =400x) * $R_3$ 成為 $R_{head}$ 的 successor,$R_{head}$ 向 $R_3$ 傳播 update (紫色小圈圈)。 ![](https://i.imgur.com/aBhpRIP.png =400x) * Successor 傳播 in-flight acknowledgements 至 new predecessor 處,完成 update (綠色小圈圈,比較早的 update)。 ![](https://i.imgur.com/KvvutdT.png =400x) #### Tail failure * $R_{tail}$ 故障,$R_{3}$ 成為 new tail。 ![](https://i.imgur.com/5s5dd4S.png =400x) * $R_{3}$ 將 **stable history** 與 **speculative history** 刷新 (flush) 成相同的狀態, ![](https://i.imgur.com/xQL0Ong.png =400x) ### Reconfigurations * 因應 failure recovery 的 node 新增。 * 因應 extend topology 的 node 新增。 * 為 replicas 設定一個新的 chain。 #### Adding a New Node * A Configuration Change * 使用特別的指令 **configuration updates**:**add(nodeid)** 來建構 chain。 * 圖示 ![](https://i.imgur.com/KEcrYyE.png) * Inferring Configuration * 透過**查看 configuration updates 的順序**,nodes 便可以**確定 chain 的配置**方式。 * 例如 old tail 在收到 add(new tail) 後,就知道自己不再是 tail 了。 * 圖示 ![](https://i.imgur.com/pDRTZfT.png) * Flush History to New Tail * 圖示 ![](https://i.imgur.com/qZGdm6Q.png) ![](https://i.imgur.com/JZOiujh.png) * Copy Speculative onto Stable History * 圖示 ![](https://i.imgur.com/iyRNmSr.png) * Respond to Queries and Acknowledge Updates * 圖示 ![](https://i.imgur.com/6i9EzrB.png) * Propagate Acknowledgements * 圖示 ![](https://i.imgur.com/kdEKruC.png) ### Self-study Questions * Given a toplogy n replicas, n>2, how does replica i know about replica i+2, assuming replica i+1 failed? * Go through the motions of constructing a topology with add(nodeId) messages. * When does a replica know for sure a node has been permanently added? ## Gossiping ### Protocols * Disseminate information in **incremental manner** (漸進的方式)。 * 避免因為 heavy broadcast messages 導致 nodes 的 overloading。 * 缺點就是需要比較多的時間來 propagate information。 * 各個 node 各自維護對於其他 nodes 的 **partial view**。 * 各個 node **隨機**挑選其他 nodes 進行「聊八卦」: * Application data (e.g., current state) * Its partial view (e.g., topology information) * 各個 node 根據「聽到的八卦」來更新自己的 state 與 partial view。 * 「八卦」發生是:**periodically** (週期性的)、**non-deterministically** (不確定的,亦即中間結果無法確定) * 例如:Cassandra 以 gossiping 來傳播 node status、failure information 以及 metadata。 ### Lazy Replication Using Gossiping * Replicas 聊 operations 的「八卦」。 * Replicas 比較 (**reconcile**/compare) **自己的 operation logs** 與**收到的 operations**。 * 目標是確定還沒執行的 operations,如此可以確保所有 operations 都會執行到,且只會執行一次 (除非我們有 idempotent system)。 * 這個步驟與該 application 極為相關。 * 我們**假設**所有 operations 可以以任何順序 (**in any order**) 被加到 replica 中。 * 當系統不再處理任何 operations 後,所有的 replicas 在不斷的「八卦」後,最終會收斂 (**eventually converges**) 至相同的狀態。 ### Self-study Questions * Draw out any topology of nodes and inject a message at a randomly chosen node, compare a broadcast (send to all neighbours versus a gossip (send to some neighbours): * How many messages are required? * How long does it take for all nodes to be up-to-date? * Have each node in your topology maintain a data structure (e.g., counter, list, array, set, etc.), inject data structure updates at random nodes and propagate these updates via gossip: * What is the net result? * Does your replicated system converge at each replica to one and the same state (data structure)? ## CRDT - CONFLICT-FREE REPLICATED DATA TYPES ### Eventual consistency, informally * Desirable:**large-scale distributed systems**,因為需要 **high availability**。 * 實作簡單便宜 (如透過 gossip),但可能會丟出**舊資料 (stale data)**。 * 對於強調 stronger consistency 的環境,eventual consistency 會造成**很大的挑戰 (challenge)**。 #### Handling Concurrent Writes * 前提:很少,甚至**沒有 concurrent writes** 的場景 (scenarios),如 client-centric consistency。 * 但仍然需要 **handle concurrent writes** 的機制,以預防 concurrent writes 發生。 * 如果有 handle concurrent writes 的方法,eventual consistency 可以運用得更廣。 * 「只」需要保證在所有寫入 (至相同的 key-value) 後,無論 order 為何,**所有 replicas 收斂**。 #### 範例 ![](https://i.imgur.com/PWD7lVg.png =600x) #### Self-study Questions * Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip. * Does their state converge, no matter the update sequence? * What happens if update operations are lost or duplicated? * What mechanisms we know other than gossip could be used to keep these replicated structures updated without violating their convergence. * What are pros and cons of these mechanisms? ## CRDT - FROM STATE-BASED OBJECTS TO REPLICATED STATE-BASED OBJECTS ### Introduce * Mostly plain old **objects**。 * 提供 clients 執行的 requests:**update** 與 **query**。 * 維護一份自己的內部狀態 (**Internal state**)。 * 對 instances (已初始化的 objects) 執行 **merge** requests。 * 提供 periodically merge (==support infrastructure==) ### 程式範例 - Class Average * Internal state:`self.sum` and `self.cnt`。 * Query:return average。 * Update:以 x 更新 average。 * Merge:與另外一個 instance 合併 (merge)。 ```python class Avg(object): def __init__(self): self.sum = 0 self.cnt = 0 def query(self): if self.cnt != 0: return self.sum / self.cnt else: return 0 def update(self, x): self.sum += x self.cnt += 1 def merge(self, avg): self.sum += avg.sum self.cnt += avg.cnt ``` ### Replicated State-based Object * State-based object 的 replication 可以橫跨 nodes。 * 因此兩個 nodes 都必須有 State-based object 的 copy。 * Client 只向其中一個 nodes 傳送 requests。 * Nodes 間週期地傳送自己 State-based object 的 copy 至其他 nodes 進行 merge。 #### Timeline 圖解 * 就算橫跨不同 replicas,**operation identifier** 仍是**唯一**。 ![](https://i.imgur.com/lotICsN.png =500x) #### States and Causal Histories * **Query** 要記得用 sum 除以 cnt (instance 數量)。 ![](https://i.imgur.com/8J0vns0.png =500x) #### Merge * 注意 **causal history** 的變化 ![](https://i.imgur.com/ki669q6.png =500x) #### Nodes Periodically Propagate Their State * 把自己的 copy 傳送至另一個 replica 的 instance。 ![](https://i.imgur.com/JhhXRgW.png =500x) ### Self-study Questions * Think of a few basic data structures, like lists, sets, counters, binary trees, heaps, maps, etc., and visualize for yourself what happens if replicated instances of these structures are updated via gossip. * For the above data structures, specify merge operations that merge the state of two instances of a given structure. * Assume merge happens periodically, does your replicated structures’ state converge? ## CRDT - EVENTUAL CONSISTENCY, MORE FORMALLY ### Eventual Consistency * 兩個 `replicated state-based object` * 相同 `causal history` * Eventually (**not necessarily immediately**) converge to the same `internal state` * ### Strong Eventual Consistency * 兩個 `replicated state-based object` * 相同 `causal history` * **Immediately** have the same `internal state` ### MergeAverage * 相同的 `causal history`,但**無法 converge** 至相同的 `internal state`。 * 不是 EC,更不是 SEC。 * 圖示: ![](https://i.imgur.com/N2C9RIS.png =300x) ![](https://i.imgur.com/81zWwrs.png =300x) ### NoMergeAverage * Object’s merge: * 不做任何 merge 的動作。 * 其他機制與 `MergeAverage` 相同。 * 相同的 `causal history`,並且最終會 **converge** 至**穩定 (stable)** 但**不同 (different)** 的 `internal state`。 * 不是 EC,更不是 SEC。 * 圖示: ![](https://i.imgur.com/N2C9RIS.png =300x) ![](https://i.imgur.com/RuvZGDR.png =300x) ### BMergeAverage * Object’s merge: * a:不做任何事。 * b:以 a 的 `internal state` 覆蓋掉自己的 `internal state`。 * 其他機制與 `MergeAverage` 相同。 * 相同的 `causal history`,並且最終會 **converge** 至相同的 `internal state`。 * **Eventually consistent!** (因為 `a1` 與 `b1` 狀態不同,因此沒有達到 strong 等級) * 圖示: ![](https://i.imgur.com/N2C9RIS.png =300x) ![](https://i.imgur.com/spL9KiO.png =300x) ### MaxAverage * Object’s merge: * Pair-wise max of `sum` and `cnt` * 其他機制與 `MergeAverage` 相同。 * 相同的 `causal history`,相同的 `internal state`。 * **Strongly eventually consistent!** (然而機制上不正確...) * 圖示: ![](https://i.imgur.com/N2C9RIS.png =300x) ![](https://i.imgur.com/JihT18n.png =300x) ### Summary * 設計 **strongly eventually consistent state-based object** 是大挑戰! * **Comparison**: * MergeAverage:相同的 `causal history`,不同的 `internal state`。 * NoMergeAverage:相同的 `causal history`,相同的 `stale state`,不同的 `internal state`。 * BMergeAverage:相同的 `causal history`,**最終到達**相同的 `internal state`。 * MaxAverage:相同的 `causal history`,**一直保持**相同的 `internal state`。 * 圖示: ![](https://i.imgur.com/QIhjS0c.png =300x) ### Self-study Questions * Can you design Average such that it becomes EC or SEC as well as offers correct averaging semantics? * Think of other data structures and design update, query, and merge operations with reasonable semantics. * Always draw timelines and state diagrams for your designs and proof EC or SEC, if possible. * Think of data structures that support multiple update operations and one or more query operations. ## CRDT - Conflict-free replicated data types, 2011 ### Introduce * Handles **concurrent writes**。 * Intuition: * 不允許任意 value 的寫入,**寫入**操作必須**保證不會產生 conflict**。 * CRDT 的資料結構是設計**特殊的寫入操作**,以此保證 **strong eventual consistency** 與 **monotonicity** (no rollbacks)。 * Not panacea but a great solution! ### Classification * CmRDT - **Commutative/op-based** * 前提: * **duplicate-free**:不會重複操作相同的指令,造成計數器重複累加。 * **no-loss messaging**:不會有遺失的指令,造成計數器的運算缺漏。 * 操作:在 replicas 間 propagate operations。 * 範例:growth-only counter,只處理 `increment` 指令。 * CvRDT - **Convergent/state-based** * 操作:propagate 並 `merge` (idempotent merge) `internal state`。 * 範例:max register,可以儲存最大值。 * 因此,CRDT 的 value 是取決於 multiple write operations or states,而非只有最後一個。 ### Properties (state-based CRDT) * Code Example ```python class IntMax(object): def __init__(self): self.x = 0 def query(self): return self.x def update(self, x): assert x >= 0 self.x += x def merge(self, other): self.x = max(self.x, other.x) ``` * `merge` is **associative** * `merge(merge(a, b), c) = merge(a, merge(b, c))` * `merge` is **commutative** * `merge(a, b) = merge(b, a)` * `merge` is **idempotent** * `merge(a, a) = a` * Every `update` is **increasing** * `merge(a, update(a, x)) = update(a, x)` ### G-Counter CRDT * Growth-only counter。 * n 個 nodes 的 G-Counter:大小為 n 的非負整數陣列。 * `query`:回傳 n 階 array 每個元素的總和。 * `add(x)`:如果在第 i 個伺服器被觸發,那就在 array 的第 i 個元素操作 `add(x)`。 * `merge`:對兩個 array 做 **pairwise maximumm**。 ### PN-Counter CRDT * Positive-Nagitive counter。 * 2 個 G-Counter:`p` 與 `n`。 * `query`:回傳 `p.query()` - `n.query()`。 * `add(x)`:執行 `p.add(x)`。 * `sub(x)`:執行 `n.add(x)`。 * `merge`:對 2 個 PN-Counter 做 **pairwise maximumm**。 ### G-Set CRDT * Growth-only set。 * 可以進行累加,但不行去除元素的 replicated set。 * `query`:回傳 `G-Set` (Internal state)。 * `add(x)`:新增 `x` 至 `G-Set`。 * `merge`:執行 set union。 ### 2P-Set CRDT * 2 個 G-Set:`a` 與 `r`。 * `query`:回傳 `a.query()` - `r.query()` (set difference)。 * `add(x)`:執行 `a.add(x)`。 * `sub(x)`:執行 `r.add(x)`。 * `merge`:對 2 個 G-Set 做 **pairwise maximumm**。 ### Summary * CmCRDTs and CvCRDTs are **equivalent**! * Really **neat** (**elegant**) solution if it applies ### Self-study Questions * For the CRDTs introduced, establish its four properties. * Create example executions for each CRDT and complete a timeline and a state table. * Fine use cases where the introduced CRDTs apply and show how they are used. * Think of new CRDTs and repeat the above.