# Week 07 - Consistent Hashing & Mapreduce ###### tags: `WS2020-IN2259-DS` ## Consistent Hashing ### Introduction ![](https://i.imgur.com/UUYJ8kH.png =500x) ### Problem: Mapping objects to caches * 首先,系統上布置了一堆 `caches`。 * 如:cooperative caching、CDNs 等等。 * 每個 `cache` 帶著**相同份量的 `object`** (**equal share of `objects`**)。 * 而 `clients` 必須知道要向哪一個 `cache` 進行 query。 * 否則一個一個找會消耗太多時間。 * **Horizontally partition** (shard) `object ID` space * 將 `object ID` 進行水平分割。例如:1-100 為一組,101-200 為一組,以此類推。 * 但是在 **skewed distributions** 的情形下,會**有問題**。例如:10 servers,每個 server 負責 100個 `object ID`,但所有 objects 並沒有平均分配這些 1-1000 的 `object ID`,反而都集中在 1-100 與 901-1000 的區間中,這樣會造成很多 server 沒有作用。 * **`Cache`** 中儲存的內容必須可以**隨意的**「**進出 (come and go)**」,且不會影響到功能,如:non-effected caches。 ### Solution attempt: Use hashing * 讓 **`object ID`** (例如:URL u) **map** 到 **`cache`** 上。 * 使用 **hash function** **`h(u)`** 來運算。 * 例如:`h(x) = (ax + b) mod p`,**`p`** 表示 `h(x)` 的 range,意即 `caches` 的數量。 * **`u`** 可以由 `object ID` (如:URL) 的 bit pattern 轉換為 number 而來。 * 而 hash 傾向於 **distribute input uniformly**,這可以**解決 skewed distributions** 的情形。 * `Cache` **不會管** `object` (URLs) 自己不均衡的分配 (**uneven share**)。 * 不會有不成比例的過載 `node` (potential bottleneck)。 #### Example 1 - General Hashing * 依照 `h(u)` 運算以分配 `object` 至 `caches`。 * 圖示: ![](https://i.imgur.com/lkIPA5G.png =x350) #### Example 2 - Removing a Cache * 原 `cache` $\mathrm{C_2}$ 壞掉,因此重新命名所有 `caches`。 * `p` 也由 5 改成 4。 * 但可以觀察到,幾乎所有 `objects` 的位置都改變了。 * 圖示: ![](https://i.imgur.com/W3EmXlb.png =x350) #### Example 3 - Adding a Cache * 重新加入新 `cache` $\mathrm{C_4}$。 * `p` 也由 4 改成 5。 * 但可以觀察到,幾乎所有 `objects` 的位置又改變了。 * 圖示: ![](https://i.imgur.com/JsrVYv5.png =x350) ### Consistent hashing * 目標: * **Uniform distribution** of `objects` across `nodes`。 * 易於搜索 `objects`。 * 使 `client` 可以於本地端計算出所需 `object` 的相對應 `node`。 * 允取 `nodes` 可以在**不影響太多的情況下**進行新增與移除。 * 意即只能 remap n/m `objects` (n `objects`, m `slots`) * D. Karger et al., MIT, 1997 * Basis for [Akamai](https://zh.wikipedia.org/wiki/%E9%98%BF%E5%8D%A1%E8%BF%88%E7%A7%91%E6%8A%80) * CDN company (content distribution network) * Web cache as a service #### Key idea intuition * **基礎的 hash function** 用來映射 id:Map $[0, ..., m-1]$ to $[0, 1]$ * 如:`h(x) = (ax + b) mod m` * 想像這個 hash function 形成一個**圓 (circumference 1)** 的映射。 * 通常是**單位圓 (unit circle)**,但此範例則是周長為 1 的圓。 * 所有的 **`object`** 透過 h(x) 映射至環上的一個 `slot`。 * 所有的 **`cache`** 同樣透過 h(x) 映射至環上的一個 `slot`。 * 最後,依照圓上的 **clockwise** 方向,將所有 `object` 指定給**最近的** `cache slot`。 * 圖示: ![](https://i.imgur.com/f2DGR97.png =300x) #### Example 1 - Mapping items to caches ![](https://i.imgur.com/xmujSof.png =500x) #### Example 2 - Removing a cache ![](https://i.imgur.com/d5Zb3oi.png =500x) #### Example 3 - Adding a cache ![](https://i.imgur.com/ZDINPyv.png =500x) #### Example 4 - Processing a Lookup(key) ![](https://i.imgur.com/vJqvhzR.png =500x) ### Cache lookup data structure at each node * 目標:快速地於 `node` 中找到目標 `cache`。 * 實作: * 使用 **binary tree** 儲存 `cache points`。 * 以 $O(\log n)$ 的時間複雜度搜尋到 clockwise successor of a `URL point`。 * 圖示: ![](https://i.imgur.com/FuQAhYY.png =500x) ### Cassandra global read-path * 目標:某 `client` 要讀取 key 為 k1 的 **`value`** (假設此 key-value 位於 `table` t1)。 * 第一步:`client` 傳送 **`request`** 至任意 `node`。 * 第二步:此 `node` 擔任 `coordinator`,找到負責此 key-value 的 `replica`,並轉送 **`request`**。 * 第三步:負責此 key-value 的 `replica` 查找 local file system,並回傳 **`value`** 至 `coordinator` * 第四步:`coordinator` 回傳 **`value`** 至 `client`。 * 圖示: ![](https://i.imgur.com/rce505G.png =500x) ### Base hash function: MD5 * **Message Digest 5** (MD5), R. Rivest, 1992 (MD1, ..., MD6) * 產生 **128-bit (16-byte)** 的 **`hash value`**。 * 任意長度的訊息,最終產出的都是**固定長度的 `hash value` (fixed-length output)**。 * 通常以 **16 進制 (hex number)** 表示,因此表示時需 **32 個 digits**。 * **Not Collision Resistant**:會有碰撞的危險!因此不適合用於加解密,但將 MD5 **用於 consistent hashing 已是足夠**。 > **US-CERT** about MD5: should be considered **cryptographically broken** and **unsuitable for further use**. * 相比起 MD5,SHA-2 比較適合用於加解密。 * 範例: ``` MD5(“The quick brown fox jumps over the lazy dog”) = 9e107d9d372bb6826bd81d3542a419d6 MD5(“The quick brown fox jumps over the lazy dog.”) = e4d909c290d0fb1ca068ffaddf22cbd0 MD5(“”) = d41d8cd98f00b204e9800998ecf8427e ``` ### Self-study questions * How would you use MD5 and SHA2 instead of h(..) from our slides for consistent hashing? * Apply h(..), MD5, SHA2 to a URL, where does the output map on the unit circle? * Discuss pros and cons of having a given caching server map to one vs. more points on the unit circle. * What are the implications of a slash-dot effect on consistent hashing? ## MapReduce - Introduction ### 起源 * Google 等大型科技公司面對**分析巨量資料** (order of **petabytes**) 的難題。 * 如:Inverted index, web access log analysis, system log analysis, distributed grep, etc. * 這些處理資料的演算法相當簡單。 * 運算包含了 **repetitive maintenance** 的處理。 * 一種 **computation replication**,指於巨量資料的各個不同部分重複地進行運算以保持一致性。 * 例如:split data, forward data and code to participating nodes, check node state, react to node failures, retrieve partial results, reorganize into final result * 需要 **large-scale data processing abstraction**。 * 靈感來源為 **functional programming** 與 **scatter/gather** in distributed/grid computing,但這裡需要將資料轉為 **key-value** 的格式,以此簡化設計。 * MapReduce paper published in 2004 at OSDI。 #### [Scatter/Gather Pattern - Compared to Broadcast](https://mpitutorial.com/tutorials/mpi-scatter-gather-and-allgather/) ![](https://i.imgur.com/mj5Ml8N.png =500x) #### Functional Programming - Quick Digression * 與 MapReduce 的關係: > MapReduce is “functional programming meets distributed processing ...” * 何謂 [functional programming](https://zh.wikipedia.org/wiki/%E5%87%BD%E6%95%B0%E5%BC%8F%E7%BC%96%E7%A8%8B)? * 將 computation 視為 functions 的應用與組合。 * 理論基礎為 [lambda calculus](https://zh.wikipedia.org/wiki/%CE%9B%E6%BC%94%E7%AE%97)。 * 為 [declarative programming](https://zh.wikipedia.org/wiki/%E5%AE%A3%E5%91%8A%E5%BC%8F%E7%B7%A8%E7%A8%8B) 其中的一種。 * 與 [imperative programming](https://zh.wikipedia.org/wiki/%E6%8C%87%E4%BB%A4%E5%BC%8F%E7%B7%A8%E7%A8%8B) 有何不同? * Data flow 隱含於程式中。 * 執行順序是有機會改變的。 * 可參考此文章理解:[JavaScript: Functional Programming 函式編程概念 | by Po-Ching Liu | Medium](https://medium.com/@totoroLiu/javascript-functional-programming-%E5%87%BD%E5%BC%8F%E7%B7%A8%E7%A8%8B%E6%A6%82%E5%BF%B5-e8f4e778fc08) ### Lisp 簡易入門 * 線上練習:[Replumb REPL](https://clojurescript.io/) * Lists are primitive datatypes ```lisp (list 1 2 3 4 5) → (1 2 3 4 5) (list (list 'a 1) (list 'b 2) (list 'c 3)) → ((a 1) (b 2) (c 3)) (nth (list 1 2 3 4 5) 0) → 1 (nth (list (list 'a 1) (list 'b 2) (list 'c 3)) 3) → Error - Index out of bounds ``` * Function evaluation written in prefix notation ```lisp (+ 1 2) → 3 (* 3 4) → 12 (Math/sqrt (+ (* 3 3) (* 4 4))) → 5 (def x 3) → x (* x 5) → 15 ``` * Functions are defined by binding **lambda expressions** to variables ```lisp (def foo (fn [x y] (Math/sqrt (+ (* x x) (* y y))))) ``` * Once defined, function can be applied ```lisp (foo 3 4) → 5 ``` * Generally expressed with **recursive** calls (instead of loops) ```lisp (def factorial (fn [n] (if (= n 1) 1 (* n (factorial (- n 1)))))) (factorial 6) → 720 ``` * Everything is an **s-expression** * **Homoiconicity**:`data` 與 `code` 之間沒有區別,如:operators, lists, values... * 可以容易地產生 self-modifying code (SMC)。 * **Higher-order functions** * Functions 以其他 functions 為參數 (arguments)。 * 範例: ```lisp (def adder (fn [x] (fn [a] (+ x a))) (def add-five (adder 5)) (add-five 11) → 16 ``` #### Clojure Reduce * Clojure 是一種動態的、強類型、執行在 Java 虛擬機(JVM)上的 Lisp 方言。 * 參考閱讀:[Day 01 - Clojure 基礎知識(一) - iT 邦幫忙::一起幫忙解決難題,拯救 IT 人的一天](https://ithelp.ithome.com.tw/articles/10184776) * 範例: ```lisp (reduce + 0 (list 1 2 3 4 5)) → 15 ``` * 圖示: ![](https://i.imgur.com/hly44l0.png =400x) ### From Lisp to MapReduce #### Why use `functional programming` for large-scale computing? * **隱藏** **analytics code** 中 distribution 與 coordination 的部分。 * 定義 functions 可以牢牢抓住 **core application logic (核心的邏輯)**。 * 讓此 **framework** (MapReduce) 可**跨機器 (across many machines)** 執行 functions。 * 綜合以上,如此可避免因 distribution、parallelism、coordination 所產生的 tricky bugs。 #### Adoption of two important concepts from `functional programming` * **Map**:於 list 上的所以元素進行操作。 * **Fold**:即 **reduce**,指以某種方式結合 list 上的 results。 #### Map * Higher-order function * 機制: * Function 應用至 list 中的各個元素。 * Result 是一個新的 list。 * 範例: ```lisp (map (fn [x] (* x x)) (list 1 2 3 4 5)) → '(1 4 9 16 25) ``` * 圖示: ![](https://i.imgur.com/LI2M748.png) #### Fold * Higher-order function * 機制: * **Accumulator** 設定初始值。 * 對 accumulator 與 list 中的第一個 value 進行 function 的運算。 * 儲存 result 於 accumulator。 * 於 list 的剩餘元素重複上述操作。 * 最後 result 即為 accumulator 內之 value。 * 範例: ```lisp (reduce + 0 (list 1 2 3 4 5)) → 15 ``` * 圖示: ![](https://i.imgur.com/5YRdOde.png) #### Map/Fold in Action * **Map** example ```lisp (map (fn [x] (* x x)) (list 1 2 3 4 5)) → '(1 4 9 16 25) ``` * **Fold** example (in Clojure called **reduce** with accumulator argument) ```lisp (reduce + 0 (list 1 2 3 4 5)) → 15 (reduce * 1 (list 1 2 3 4 5)) → 120 ``` * Sum of squares (combination of Map and Fold) ```lisp (def sum-of-squares (fn [v] (reduce + 0 (map (fn [x] (* x x)) v))) (sum-of-squares (list 1 2 3 4 5)) → 55 ``` #### 應用於 Distributed System * 假設現在有一條很長的 records list,然後我們想要: 1. **分散 (distribute)** 各個 map operations 至多個 nodes。 2. 再將各個 nodes 完成的 map result **一起帶回 (bring back together)** 至後續的 fold operation 進行處理。 * 這就是 **MapReduce** (如 Hadoop,開源的 MapReduce 實作)。 * 由於本身 functional paradigm 的特性,MapReduce 可以達到 **Implicit parallelism** * **Parallelize execution**:因 map operations 本身就相互獨立,因此可以做到平行執行。 * **Reorder folding**:若 fold function 為 commutative 且 associative,那便可以亂序執行。 * Commutative - change order of operands: `x*y = y*x` * Associative - change order of operations: `(2+3)+4 = 2+(3+4)=9` ### MapReduce vs. MPI & RPC * Message-passing interface (MPI) * 基礎通訊的 library。 * 於 scientific computing 領域較為流行。 * Remote procedure calls (RPC) * 呼叫另一 machine 之 function 的方式。 * 於 client/server designs 較為流行。 * MapReduce * 抽象化 **complex** distributed programming 為 **simple** programming model。 * 提供 **fault-tolerance**。 * 為了 **specificity** 因而放棄 generality。 ### MapReduce Summary * 為了處理 **large-scale data sets** 而設計的 programming model 與 runtime system。 * 靈感來源:functional programming languages。 * 程式設計師指定處理「**什麼 (what)**」? * 系統決定「**如何 (how)**」處理? * scheduling, parallelism, locality, communication * MapReduce framework responsibility: * Automatic parallelization, distribution, result gathering * Fault-tolerance * I/O scheduling * Status reporting and monitoring ### Self-study Questions * Identify a few computations that lend themselves to map-style processing * Identify a few computations that lend themselves to fold/reduce-style processing * Identify a few computations that lend themselves to map-reduce-style processing * Encode your computaitons in ClojureScript and evaluate at https://clojurescript.io * Determine what aspect of your computation could be executed in parallel * Think about how you’d handle this parallelism on your own * What impact do non-commutative or non-associative computations have on reduce? * Express an iterative computation with map-reduce and encode in ClojureScript. ## MapReduce - Architecture ### Map Task Examples * ML 的 Feature extraction * Scale raw image to smaller size * Run edge detector on each image in training set * Recoding * Recode video from source to target format in **different resolutions** * Natural language processing * Translate each web page and index it * **Sentiment analysis** of each web page, tweet, ...:標註「感情」的標籤,如:興奮、悲傷、讚、爛之類的。 ### High-level and simplified view of the Architecture * Input data 分配給 `workers` (即 `nodes`) 進行處理。 * `Master` 負責 coordinates `worker` selection 與 failover。 * 結果儲存於 output data files。 * 圖示: ![](https://i.imgur.com/f8TK8ez.png =300x) ### Less Simplified View ![](https://i.imgur.com/Rko7VgA.png =500x) ### MapReduce Programming Model * Input & output: set of **key-value pairs** * Map * 首先讀取 data source 中的 `records`。 * 如:lines of files、rows of DB tables 等等 * 將這些 `records` 以 **key-value pairs** 的格式餵給 map function。 * 如:(filename, file-line(s))。 * 產出的 `intermediate values` 同樣為 **key-value pairs** * 如:(word(i), 1) * 最後會對產出的 `intermediate values` 以 key 為基礎進行排序 (shuffles and sorts),如此便可以將相同 key 的 **pairs** 送至同一個 `worker` 進行 reduce 處理。 * 範例: ```lisp map (in_key, in_value) → list(out_key, intermediate_value) ``` * Reduce * 將所有對應到**同一個 key** 的 `intermediate values` 合併成一個 list,即 `output value`。 * 最後產出的 `output values` 可以是 **key-value pairs**,也可以是 **single key-value pair**。 * 範例: ```lisp reduce (out_key, list(intermediate_value)) → list(out_value) ``` * Combine (optional optimizations) * 將 values 結合成 single value。 * 範例: ```lisp combine (key, values) → (key, f(values)) ``` * Partition (optional optimizations) * User-specified I/O locations and tuning parameters * 範例: ```lisp partition (out_key, number of partitions) → Partition-ID for out_key ``` ### Performed internally to MapReduce * Scheduling * 指派進行 map 與 reduce 作業的 workers。 * Data distribution * 分配 `input data` 給負責 **map** 的 workers。 * Synchronization * 針對 `intermediate results` 進行 gather、sort、shuffle 等 **reduce** 操作。 * Errors and faults * 偵測 workers failures,並重啟 failed workers。 ### Self-study Questions * Worker failure is handled by restarting a worker, how about handling master failure? * Could map and reduce tasks be running in an overlapping fashion instead of completing all map tasks before starting the reduce tasks? Discuss. * Think about the MapReduce shuffle phase, how would you design this style of processing? * Since data is stored locally (local node) by map tasks, is there a way to achieve locality of reference for reduce tasks (i.e., having them access the data locally as well)? ## MapReduce - EXAMPLE & DISCUSSION ### Word Count * Count word frequencies across set of documents * MAP: * 輸入一個 document,產出此 document 中每個 word 的 key-value pair。 * 如:`("banana", 1)`、`("banana", 1)`、`("you", 1)`、`("you", 1)`、`("hello", 1)` * INPUT:`(FileName, FileContent)` * OUTPUT:`List(Word, WordAppearence)` * REDUCE: * 合併重複的 key-value pairs,並填入 values 的 sum。 * 如:`("banana", "2")`、`("you", 2)`、`("hello", 1)` * INPUT:`(Word, List<WordAppearence>)` * OUTPUT:`(Word, sum<WordAppearence>)` * 圖示: :::spoiler **Overview** ![](https://i.imgur.com/e9r3Q2r.png =500x) ::: :::spoiler **Map Phase** ![](https://i.imgur.com/I5qrgDi.png =500x) ![](https://i.imgur.com/8OUcSee.png =500x) ::: :::spoiler **Sort Phase** ![](https://i.imgur.com/6jkd7cA.png =500x) ![](https://i.imgur.com/2DjDgpB.png =500x) ::: :::spoiler **Reduce Phase** ![](https://i.imgur.com/IKOD4Mq.png =500x) ![](https://i.imgur.com/LFDmpY2.png =500x) ::: <!-- * **Overview** ![](https://i.imgur.com/e9r3Q2r.png =500x) * **Map Phase** ![](https://i.imgur.com/I5qrgDi.png =500x) ![](https://i.imgur.com/8OUcSee.png =500x) * **Sort Phase** ![](https://i.imgur.com/6jkd7cA.png =500x) ![](https://i.imgur.com/2DjDgpB.png =500x) * **Reduce Phase** ![](https://i.imgur.com/IKOD4Mq.png =500x) ![](https://i.imgur.com/LFDmpY2.png =500x) --> ### Nota Bene!!! #### Combiners * 一個 map task 經常會產生很多相同 key 的 pairs。 * 如:出現頻率很高的 "the"。 * 對於擁有「結合率 (associativity)」的 operations,可以在 `mapper` 做 **pre-aggregating** 以節省 bandwidth (decreases size of intermediate data)。 * 如:於 word count 執行 **loal summing**。 * 範例: ```lisp combine (key, values) → (key, f(values)) ``` #### Partition Function * 輸入 map 的資料為透過對 `input file` 做連續拆分 (contiguous splits) 而建立的。 * 為了 reduce 作業之順遂,我們需要確保進入同一個 `worker` 的 `intermediate values` 都是屬於相同的 `key`。 * 系統預設的 partition function:`hash(key) mod R` * 可以分配 `intermediate key-value pairs` 至 R 個 `reduce workers`。 * 有時我們可以 override 此預設 partition function: * 若 key 均為已知的值,便可以手動進行 load balance 之調整。 * 若有特定的需求,可以規劃某些 `key-value pair` 應位於相同的 `output files`。 * 範例: ```lisp partition (out_key, number of partitions) → Partition-ID for out_key ``` ### Parallelism (觀念重要!) * **map()**:run in **parallel**、基於 different **`input data sets`** * **reduce()**:run in **parallel**、基於 different **`output key`** * **Bottleneck**:一定要等 **map phase** 完全完成 (**completely finished**) 後,才可以開始 reduce phase。 * **Straggler (流浪漢) problem**:若一些 `workers` 速度比較慢,那這個 `worker` 會拖慢整個運算。 * 有個解法是運用 redundant `workers` 於開始時同時進行運算,然後取用最快出來的 result。 ### Google’s MapReduce Implementation As Described in Their 2004 Publication * Runs on Google clusters (state 2004/5) * 1000s of 2-CPU x86 machines, 2-4 GB of memory, local-based storage (GFS), limited bandwidth (commodity hardware) * C++ library linked to user programs (use of RPC) * Scheduling/runtime system (a.k.a. master) * Assign tasks to machines: typically # map tasks > # of machines * Often use 200,000 map/5,000 reduce tasks, 2000 machines * Pipeline shuffling with map execution * Other MapReduce implementations * Hadoop: Open-source, Java-based MapReduce framework * Phoenix: Open-source MapReduce framework for **multi-core** * Spark: MapReduce-like framework with **in-memory processing** in Scala/Java ### Criticism of MapReduce * 來自從業者 (practitioner) 的思考 * Too **low level** * 需要人工地為每個 record 的操作編寫程式。 * 並非 declarative model (SQL)。 * Nothing new * 使用老舊的 classical Lisp 或是 higher order functions。 * Low per node performance * 因為 replication 與 data transfer 會降低效能。 * Shuffling process 非常昂貴 (需要盡可能地減少)。 * 大量至 GFS 的 I/O。 * Batch computing, not designed for incremental, streaming tasks * 資料需要備齊後才可以開始工作。 * 執行期無法加入更多輸入資料。 ### Is my Job MapReducable? * One-iteration algorithms 最適合! * Multiple-iteration algorithms 還 OK。 * 只有**少數共享的資料**需要進行「跨 」的同步 (通常是透過 file system 進行同步作業)。 * 而那些需要進行**大量共享資料**之「跨 iteration」同步作業的 algorithms,便極度不適合 MapReduce。 * 如多數的 machine learning algorithms。 * 替換方案:Bulk synchronous processing frameworks。 * 圖表: ### Iterative MapReduce (最常見) * ### Spark * 比 Hadoop MapReduce 快上 100 倍。 ![](https://i.imgur.com/VpXn1Zn.png =200x) * 支援多種 programming interfaces:Java、Scala、Python、R。 * 運用 useful libraries 的力量: ![](https://i.imgur.com/353j28R.png =300x) * 運行於 Hadoop cluster 之上 (uses HDFS)。 * **In-memory computation** vs. stable storage (MapReduce) ### Beyond MapReduce ![](https://i.imgur.com/iftZvGE.png =500x) ### Self-study Questions * Find other non-iterative computation examples and express them via a map and a reduce phase. * For your examples, specify the map and the reduce interface. * Also, specify combine and partition, if applicable. * Also, specify the sort-shuffle phase that is done by the MapReduce framework. * Is it possibly to do pre-aggregation for non-associatie and non-commutative operations, explain.