# Week 07 - Consistent Hashing & Mapreduce
###### tags: `WS2020-IN2259-DS`
## Consistent Hashing
### Introduction

### Problem: Mapping objects to caches
* 首先,系統上布置了一堆 `caches`。
* 如:cooperative caching、CDNs 等等。
* 每個 `cache` 帶著**相同份量的 `object`** (**equal share of `objects`**)。
* 而 `clients` 必須知道要向哪一個 `cache` 進行 query。
* 否則一個一個找會消耗太多時間。
* **Horizontally partition** (shard) `object ID` space
* 將 `object ID` 進行水平分割。例如:1-100 為一組,101-200 為一組,以此類推。
* 但是在 **skewed distributions** 的情形下,會**有問題**。例如:10 servers,每個 server 負責 100個 `object ID`,但所有 objects 並沒有平均分配這些 1-1000 的 `object ID`,反而都集中在 1-100 與 901-1000 的區間中,這樣會造成很多 server 沒有作用。
* **`Cache`** 中儲存的內容必須可以**隨意的**「**進出 (come and go)**」,且不會影響到功能,如:non-effected caches。
### Solution attempt: Use hashing
* 讓 **`object ID`** (例如:URL u) **map** 到 **`cache`** 上。
* 使用 **hash function** **`h(u)`** 來運算。
* 例如:`h(x) = (ax + b) mod p`,**`p`** 表示 `h(x)` 的 range,意即 `caches` 的數量。
* **`u`** 可以由 `object ID` (如:URL) 的 bit pattern 轉換為 number 而來。
* 而 hash 傾向於 **distribute input uniformly**,這可以**解決 skewed distributions** 的情形。
* `Cache` **不會管** `object` (URLs) 自己不均衡的分配 (**uneven share**)。
* 不會有不成比例的過載 `node` (potential bottleneck)。
#### Example 1 - General Hashing
* 依照 `h(u)` 運算以分配 `object` 至 `caches`。
* 圖示:

#### Example 2 - Removing a Cache
* 原 `cache` $\mathrm{C_2}$ 壞掉,因此重新命名所有 `caches`。
* `p` 也由 5 改成 4。
* 但可以觀察到,幾乎所有 `objects` 的位置都改變了。
* 圖示:

#### Example 3 - Adding a Cache
* 重新加入新 `cache` $\mathrm{C_4}$。
* `p` 也由 4 改成 5。
* 但可以觀察到,幾乎所有 `objects` 的位置又改變了。
* 圖示:

### Consistent hashing
* 目標:
* **Uniform distribution** of `objects` across `nodes`。
* 易於搜索 `objects`。
* 使 `client` 可以於本地端計算出所需 `object` 的相對應 `node`。
* 允取 `nodes` 可以在**不影響太多的情況下**進行新增與移除。
* 意即只能 remap n/m `objects` (n `objects`, m `slots`)
* D. Karger et al., MIT, 1997
* Basis for [Akamai](https://zh.wikipedia.org/wiki/%E9%98%BF%E5%8D%A1%E8%BF%88%E7%A7%91%E6%8A%80)
* CDN company (content distribution network)
* Web cache as a service
#### Key idea intuition
* **基礎的 hash function** 用來映射 id:Map $[0, ..., m-1]$ to $[0, 1]$
* 如:`h(x) = (ax + b) mod m`
* 想像這個 hash function 形成一個**圓 (circumference 1)** 的映射。
* 通常是**單位圓 (unit circle)**,但此範例則是周長為 1 的圓。
* 所有的 **`object`** 透過 h(x) 映射至環上的一個 `slot`。
* 所有的 **`cache`** 同樣透過 h(x) 映射至環上的一個 `slot`。
* 最後,依照圓上的 **clockwise** 方向,將所有 `object` 指定給**最近的** `cache slot`。
* 圖示:

#### Example 1 - Mapping items to caches

#### Example 2 - Removing a cache

#### Example 3 - Adding a cache

#### Example 4 - Processing a Lookup(key)

### Cache lookup data structure at each node
* 目標:快速地於 `node` 中找到目標 `cache`。
* 實作:
* 使用 **binary tree** 儲存 `cache points`。
* 以 $O(\log n)$ 的時間複雜度搜尋到 clockwise successor of a `URL point`。
* 圖示:

### Cassandra global read-path
* 目標:某 `client` 要讀取 key 為 k1 的 **`value`** (假設此 key-value 位於 `table` t1)。
* 第一步:`client` 傳送 **`request`** 至任意 `node`。
* 第二步:此 `node` 擔任 `coordinator`,找到負責此 key-value 的 `replica`,並轉送 **`request`**。
* 第三步:負責此 key-value 的 `replica` 查找 local file system,並回傳 **`value`** 至 `coordinator`
* 第四步:`coordinator` 回傳 **`value`** 至 `client`。
* 圖示:

### Base hash function: MD5
* **Message Digest 5** (MD5), R. Rivest, 1992 (MD1, ..., MD6)
* 產生 **128-bit (16-byte)** 的 **`hash value`**。
* 任意長度的訊息,最終產出的都是**固定長度的 `hash value` (fixed-length output)**。
* 通常以 **16 進制 (hex number)** 表示,因此表示時需 **32 個 digits**。
* **Not Collision Resistant**:會有碰撞的危險!因此不適合用於加解密,但將 MD5 **用於 consistent hashing 已是足夠**。
> **US-CERT** about MD5: should be considered **cryptographically broken** and **unsuitable for further use**.
* 相比起 MD5,SHA-2 比較適合用於加解密。
* 範例:
```
MD5(“The quick brown fox jumps over the lazy dog”) = 9e107d9d372bb6826bd81d3542a419d6
MD5(“The quick brown fox jumps over the lazy dog.”) = e4d909c290d0fb1ca068ffaddf22cbd0
MD5(“”) = d41d8cd98f00b204e9800998ecf8427e
```
### Self-study questions
* How would you use MD5 and SHA2 instead of h(..) from our slides for consistent hashing?
* Apply h(..), MD5, SHA2 to a URL, where does the output map on the unit circle?
* Discuss pros and cons of having a given caching server map to one vs. more points on the unit circle.
* What are the implications of a slash-dot effect on consistent hashing?
## MapReduce - Introduction
### 起源
* Google 等大型科技公司面對**分析巨量資料** (order of **petabytes**) 的難題。
* 如:Inverted index, web access log analysis, system log analysis, distributed grep, etc.
* 這些處理資料的演算法相當簡單。
* 運算包含了 **repetitive maintenance** 的處理。
* 一種 **computation replication**,指於巨量資料的各個不同部分重複地進行運算以保持一致性。
* 例如:split data, forward data and code to participating nodes, check node state, react to node failures, retrieve partial results, reorganize into final result
* 需要 **large-scale data processing abstraction**。
* 靈感來源為 **functional programming** 與 **scatter/gather** in distributed/grid computing,但這裡需要將資料轉為 **key-value** 的格式,以此簡化設計。
* MapReduce paper published in 2004 at OSDI。
#### [Scatter/Gather Pattern - Compared to Broadcast](https://mpitutorial.com/tutorials/mpi-scatter-gather-and-allgather/)

#### Functional Programming - Quick Digression
* 與 MapReduce 的關係:
> MapReduce is “functional programming meets distributed processing ...”
* 何謂 [functional programming](https://zh.wikipedia.org/wiki/%E5%87%BD%E6%95%B0%E5%BC%8F%E7%BC%96%E7%A8%8B)?
* 將 computation 視為 functions 的應用與組合。
* 理論基礎為 [lambda calculus](https://zh.wikipedia.org/wiki/%CE%9B%E6%BC%94%E7%AE%97)。
* 為 [declarative programming](https://zh.wikipedia.org/wiki/%E5%AE%A3%E5%91%8A%E5%BC%8F%E7%B7%A8%E7%A8%8B) 其中的一種。
* 與 [imperative programming](https://zh.wikipedia.org/wiki/%E6%8C%87%E4%BB%A4%E5%BC%8F%E7%B7%A8%E7%A8%8B) 有何不同?
* Data flow 隱含於程式中。
* 執行順序是有機會改變的。
* 可參考此文章理解:[JavaScript: Functional Programming 函式編程概念 | by Po-Ching Liu | Medium](https://medium.com/@totoroLiu/javascript-functional-programming-%E5%87%BD%E5%BC%8F%E7%B7%A8%E7%A8%8B%E6%A6%82%E5%BF%B5-e8f4e778fc08)
### Lisp 簡易入門
* 線上練習:[Replumb REPL](https://clojurescript.io/)
* Lists are primitive datatypes
```lisp
(list 1 2 3 4 5)
→ (1 2 3 4 5)
(list (list 'a 1) (list 'b 2) (list 'c 3))
→ ((a 1) (b 2) (c 3))
(nth (list 1 2 3 4 5) 0)
→ 1
(nth (list (list 'a 1) (list 'b 2) (list 'c 3)) 3)
→ Error - Index out of bounds
```
* Function evaluation written in prefix notation
```lisp
(+ 1 2)
→ 3
(* 3 4)
→ 12
(Math/sqrt (+ (* 3 3) (* 4 4)))
→ 5
(def x 3)
→ x
(* x 5)
→ 15
```
* Functions are defined by binding **lambda expressions** to variables
```lisp
(def foo
(fn [x y] (Math/sqrt (+ (* x x) (* y y)))))
```
* Once defined, function can be applied
```lisp
(foo 3 4)
→ 5
```
* Generally expressed with **recursive** calls (instead of loops)
```lisp
(def factorial (fn [n]
(if (= n 1)
1
(* n (factorial (- n 1))))))
(factorial 6)
→ 720
```
* Everything is an **s-expression**
* **Homoiconicity**:`data` 與 `code` 之間沒有區別,如:operators, lists, values...
* 可以容易地產生 self-modifying code (SMC)。
* **Higher-order functions**
* Functions 以其他 functions 為參數 (arguments)。
* 範例:
```lisp
(def adder (fn [x] (fn [a] (+ x a)))
(def add-five (adder 5))
(add-five 11)
→ 16
```
#### Clojure Reduce
* Clojure 是一種動態的、強類型、執行在 Java 虛擬機(JVM)上的 Lisp 方言。
* 參考閱讀:[Day 01 - Clojure 基礎知識(一) - iT 邦幫忙::一起幫忙解決難題,拯救 IT 人的一天](https://ithelp.ithome.com.tw/articles/10184776)
* 範例:
```lisp
(reduce + 0 (list 1 2 3 4 5))
→ 15
```
* 圖示:

### From Lisp to MapReduce
#### Why use `functional programming` for large-scale computing?
* **隱藏** **analytics code** 中 distribution 與 coordination 的部分。
* 定義 functions 可以牢牢抓住 **core application logic (核心的邏輯)**。
* 讓此 **framework** (MapReduce) 可**跨機器 (across many machines)** 執行 functions。
* 綜合以上,如此可避免因 distribution、parallelism、coordination 所產生的 tricky bugs。
#### Adoption of two important concepts from `functional programming`
* **Map**:於 list 上的所以元素進行操作。
* **Fold**:即 **reduce**,指以某種方式結合 list 上的 results。
#### Map
* Higher-order function
* 機制:
* Function 應用至 list 中的各個元素。
* Result 是一個新的 list。
* 範例:
```lisp
(map (fn [x] (* x x)) (list 1 2 3 4 5))
→ '(1 4 9 16 25)
```
* 圖示:

#### Fold
* Higher-order function
* 機制:
* **Accumulator** 設定初始值。
* 對 accumulator 與 list 中的第一個 value 進行 function 的運算。
* 儲存 result 於 accumulator。
* 於 list 的剩餘元素重複上述操作。
* 最後 result 即為 accumulator 內之 value。
* 範例:
```lisp
(reduce + 0 (list 1 2 3 4 5))
→ 15
```
* 圖示:

#### Map/Fold in Action
* **Map** example
```lisp
(map (fn [x] (* x x)) (list 1 2 3 4 5))
→ '(1 4 9 16 25)
```
* **Fold** example (in Clojure called **reduce** with accumulator argument)
```lisp
(reduce + 0 (list 1 2 3 4 5))
→ 15
(reduce * 1 (list 1 2 3 4 5))
→ 120
```
* Sum of squares (combination of Map and Fold)
```lisp
(def sum-of-squares (fn [v] (reduce + 0 (map (fn [x] (* x x)) v)))
(sum-of-squares (list 1 2 3 4 5))
→ 55
```
#### 應用於 Distributed System
* 假設現在有一條很長的 records list,然後我們想要:
1. **分散 (distribute)** 各個 map operations 至多個 nodes。
2. 再將各個 nodes 完成的 map result **一起帶回 (bring back together)** 至後續的 fold operation 進行處理。
* 這就是 **MapReduce** (如 Hadoop,開源的 MapReduce 實作)。
* 由於本身 functional paradigm 的特性,MapReduce 可以達到 **Implicit parallelism**
* **Parallelize execution**:因 map operations 本身就相互獨立,因此可以做到平行執行。
* **Reorder folding**:若 fold function 為 commutative 且 associative,那便可以亂序執行。
* Commutative - change order of operands: `x*y = y*x`
* Associative - change order of operations: `(2+3)+4 = 2+(3+4)=9`
### MapReduce vs. MPI & RPC
* Message-passing interface (MPI)
* 基礎通訊的 library。
* 於 scientific computing 領域較為流行。
* Remote procedure calls (RPC)
* 呼叫另一 machine 之 function 的方式。
* 於 client/server designs 較為流行。
* MapReduce
* 抽象化 **complex** distributed programming 為 **simple** programming model。
* 提供 **fault-tolerance**。
* 為了 **specificity** 因而放棄 generality。
### MapReduce Summary
* 為了處理 **large-scale data sets** 而設計的 programming model 與 runtime system。
* 靈感來源:functional programming languages。
* 程式設計師指定處理「**什麼 (what)**」?
* 系統決定「**如何 (how)**」處理?
* scheduling, parallelism, locality, communication
* MapReduce framework responsibility:
* Automatic parallelization, distribution, result gathering
* Fault-tolerance
* I/O scheduling
* Status reporting and monitoring
### Self-study Questions
* Identify a few computations that lend themselves to map-style processing
* Identify a few computations that lend themselves to fold/reduce-style processing
* Identify a few computations that lend themselves to map-reduce-style processing
* Encode your computaitons in ClojureScript and evaluate at https://clojurescript.io
* Determine what aspect of your computation could be executed in parallel
* Think about how you’d handle this parallelism on your own
* What impact do non-commutative or non-associative computations have on reduce?
* Express an iterative computation with map-reduce and encode in ClojureScript.
## MapReduce - Architecture
### Map Task Examples
* ML 的 Feature extraction
* Scale raw image to smaller size
* Run edge detector on each image in training set
* Recoding
* Recode video from source to target format in **different resolutions**
* Natural language processing
* Translate each web page and index it
* **Sentiment analysis** of each web page, tweet, ...:標註「感情」的標籤,如:興奮、悲傷、讚、爛之類的。
### High-level and simplified view of the Architecture
* Input data 分配給 `workers` (即 `nodes`) 進行處理。
* `Master` 負責 coordinates `worker` selection 與 failover。
* 結果儲存於 output data files。
* 圖示:

### Less Simplified View

### MapReduce Programming Model
* Input & output: set of **key-value pairs**
* Map
* 首先讀取 data source 中的 `records`。
* 如:lines of files、rows of DB tables 等等
* 將這些 `records` 以 **key-value pairs** 的格式餵給 map function。
* 如:(filename, file-line(s))。
* 產出的 `intermediate values` 同樣為 **key-value pairs**
* 如:(word(i), 1)
* 最後會對產出的 `intermediate values` 以 key 為基礎進行排序 (shuffles and sorts),如此便可以將相同 key 的 **pairs** 送至同一個 `worker` 進行 reduce 處理。
* 範例:
```lisp
map (in_key, in_value) → list(out_key, intermediate_value)
```
* Reduce
* 將所有對應到**同一個 key** 的 `intermediate values` 合併成一個 list,即 `output value`。
* 最後產出的 `output values` 可以是 **key-value pairs**,也可以是 **single key-value pair**。
* 範例:
```lisp
reduce (out_key, list(intermediate_value)) → list(out_value)
```
* Combine (optional optimizations)
* 將 values 結合成 single value。
* 範例:
```lisp
combine (key, values) → (key, f(values))
```
* Partition (optional optimizations)
* User-specified I/O locations and tuning parameters
* 範例:
```lisp
partition (out_key, number of partitions) → Partition-ID for out_key
```
### Performed internally to MapReduce
* Scheduling
* 指派進行 map 與 reduce 作業的 workers。
* Data distribution
* 分配 `input data` 給負責 **map** 的 workers。
* Synchronization
* 針對 `intermediate results` 進行 gather、sort、shuffle 等 **reduce** 操作。
* Errors and faults
* 偵測 workers failures,並重啟 failed workers。
### Self-study Questions
* Worker failure is handled by restarting a worker, how about handling master failure?
* Could map and reduce tasks be running in an overlapping fashion instead of completing all map tasks before starting the reduce tasks? Discuss.
* Think about the MapReduce shuffle phase, how would you design this style of processing?
* Since data is stored locally (local node) by map tasks, is there a way to achieve locality of reference for reduce tasks (i.e., having them access the data locally as well)?
## MapReduce - EXAMPLE & DISCUSSION
### Word Count
* Count word frequencies across set of documents
* MAP:
* 輸入一個 document,產出此 document 中每個 word 的 key-value pair。
* 如:`("banana", 1)`、`("banana", 1)`、`("you", 1)`、`("you", 1)`、`("hello", 1)`
* INPUT:`(FileName, FileContent)`
* OUTPUT:`List(Word, WordAppearence)`
* REDUCE:
* 合併重複的 key-value pairs,並填入 values 的 sum。
* 如:`("banana", "2")`、`("you", 2)`、`("hello", 1)`
* INPUT:`(Word, List<WordAppearence>)`
* OUTPUT:`(Word, sum<WordAppearence>)`
* 圖示:
:::spoiler **Overview**

:::
:::spoiler **Map Phase**


:::
:::spoiler **Sort Phase**


:::
:::spoiler **Reduce Phase**


:::
<!-- * **Overview**

* **Map Phase**


* **Sort Phase**


* **Reduce Phase**

 -->
### Nota Bene!!!
#### Combiners
* 一個 map task 經常會產生很多相同 key 的 pairs。
* 如:出現頻率很高的 "the"。
* 對於擁有「結合率 (associativity)」的 operations,可以在 `mapper` 做 **pre-aggregating** 以節省 bandwidth (decreases size of intermediate data)。
* 如:於 word count 執行 **loal summing**。
* 範例:
```lisp
combine (key, values)
→ (key, f(values))
```
#### Partition Function
* 輸入 map 的資料為透過對 `input file` 做連續拆分 (contiguous splits) 而建立的。
* 為了 reduce 作業之順遂,我們需要確保進入同一個 `worker` 的 `intermediate values` 都是屬於相同的 `key`。
* 系統預設的 partition function:`hash(key) mod R`
* 可以分配 `intermediate key-value pairs` 至 R 個 `reduce workers`。
* 有時我們可以 override 此預設 partition function:
* 若 key 均為已知的值,便可以手動進行 load balance 之調整。
* 若有特定的需求,可以規劃某些 `key-value pair` 應位於相同的 `output files`。
* 範例:
```lisp
partition (out_key, number of partitions)
→ Partition-ID for out_key
```
### Parallelism (觀念重要!)
* **map()**:run in **parallel**、基於 different **`input data sets`**
* **reduce()**:run in **parallel**、基於 different **`output key`**
* **Bottleneck**:一定要等 **map phase** 完全完成 (**completely finished**) 後,才可以開始 reduce phase。
* **Straggler (流浪漢) problem**:若一些 `workers` 速度比較慢,那這個 `worker` 會拖慢整個運算。
* 有個解法是運用 redundant `workers` 於開始時同時進行運算,然後取用最快出來的 result。
### Google’s MapReduce Implementation
As Described in Their 2004 Publication
* Runs on Google clusters (state 2004/5)
* 1000s of 2-CPU x86 machines, 2-4 GB of memory, local-based storage (GFS), limited bandwidth (commodity hardware)
* C++ library linked to user programs (use of RPC)
* Scheduling/runtime system (a.k.a. master)
* Assign tasks to machines: typically # map tasks > # of machines
* Often use 200,000 map/5,000 reduce tasks, 2000 machines
* Pipeline shuffling with map execution
* Other MapReduce implementations
* Hadoop: Open-source, Java-based MapReduce framework
* Phoenix: Open-source MapReduce framework for **multi-core**
* Spark: MapReduce-like framework with **in-memory processing** in Scala/Java
### Criticism of MapReduce
* 來自從業者 (practitioner) 的思考
* Too **low level**
* 需要人工地為每個 record 的操作編寫程式。
* 並非 declarative model (SQL)。
* Nothing new
* 使用老舊的 classical Lisp 或是 higher order functions。
* Low per node performance
* 因為 replication 與 data transfer 會降低效能。
* Shuffling process 非常昂貴 (需要盡可能地減少)。
* 大量至 GFS 的 I/O。
* Batch computing, not designed for incremental, streaming tasks
* 資料需要備齊後才可以開始工作。
* 執行期無法加入更多輸入資料。
### Is my Job MapReducable?
* One-iteration algorithms 最適合!
* Multiple-iteration algorithms 還 OK。
* 只有**少數共享的資料**需要進行「跨 」的同步 (通常是透過 file system 進行同步作業)。
* 而那些需要進行**大量共享資料**之「跨 iteration」同步作業的 algorithms,便極度不適合 MapReduce。
* 如多數的 machine learning algorithms。
* 替換方案:Bulk synchronous processing frameworks。
* 圖表:
### Iterative MapReduce (最常見)
*
### Spark
* 比 Hadoop MapReduce 快上 100 倍。

* 支援多種 programming interfaces:Java、Scala、Python、R。
* 運用 useful libraries 的力量:

* 運行於 Hadoop cluster 之上 (uses HDFS)。
* **In-memory computation** vs. stable storage (MapReduce)
### Beyond MapReduce

### Self-study Questions
* Find other non-iterative computation examples and express them via a map and a reduce phase.
* For your examples, specify the map and the reduce interface.
* Also, specify combine and partition, if applicable.
* Also, specify the sort-shuffle phase that is done by the MapReduce framework.
* Is it possibly to do pre-aggregation for non-associatie and non-commutative operations, explain.