# 【Introduction & Implementation】
<center>
<img src="https://hackmd.io/_uploads/H1lMB2eZyl.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
## 1. Hadoop
### 1.1 Hadoop Architecture
#### 1.1.1 Hadoop MapReduce
- Single master node, many worker nodes
- Client submits a job to master node
- Master splits each job into tasks (map/reduce), and assigns tasks to worker nodes
#### 1.1.2 Hadoop Distributed File System (HDFS)
- Single name node, many data nodes
- Files stored as large, fixed-size (e.g. 64MB) blocks
- HDFS typically holds map input and reduce output
### 1.2 Hadoop Workflow
#### 1.2.1 Hadoop Dataflow - Whole Flow
<center>
<img src="https://hackmd.io/_uploads/BJloJ1lbkl.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 1.2.2 Hadoop Dataflow - Map Reduce Flow
<center>
<img src="https://hackmd.io/_uploads/ryh8Nyl-yl.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
## 2. MapReduce
### 2.1 Why was MapReduce proposed?
- A simplified platform for programmers and scientists without a parallel-computing background
- Hadoop: an open-source implementation of MapReduce
### 2.2 What is MapReduce?
- **Data-parallel programming** model for clusters of commodity machines
- Platform for reliable, scalable parallel computing
- Abstracts the issues of the parallel environment away from the programmer
- Pioneered by Google: processes 20 PB of data per day
- Popularized by the open-source Hadoop project: used by Yahoo!, Facebook, Amazon, …
### 2.3 MapReduce Implementation
- Google has a proprietary implementation in C++, with bindings in Java and Python
- **Hadoop** is an open-source implementation in Java
1. Development led by Yahoo, used in production
2. Now an **Apache project**
3. Rapidly expanding software ecosystem
#### 2.3.1 MapReduce Programming Model
- The programmer writes two functions, Map and Reduce, both operating on key-value pairs
- All values with the **same key** are sent to the **same reducer**
1. **Data type**: key-value records
2. **Map function**: $(K_{in}, V_{in}) \rightarrow list(K_{inter}, V_{inter})$
3. **Reduce function**: $(K_{inter}, list(V_{inter})) \rightarrow list(K_{out}, V_{out})$
<center>
<img src="https://hackmd.io/_uploads/BkyauygZyg.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
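The model above can be exercised without a cluster. The sketch below is plain Java, not the Hadoop API (the class and method names are invented for illustration): it runs the three stages of a word count in-process, mapping each line into (word, 1) pairs, shuffling the pairs into per-key groups, then reducing each group by summing.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapReduceSketch {

    // Runs the Map -> Shuffle -> Reduce stages in-process for a word count.
    static Map<String, Integer> wordCount(List<String> lines) {
        // Map: each line -> a list of (word, 1) pairs.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        // Shuffle: all values with the same key end up in the same group.
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        // Reduce: sum the grouped values for each key.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            int sum = 0;
            for (int v : g.getValue()) sum += v;
            result.put(g.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("a b a", "b c")));  // {a=2, b=2, c=1}
    }
}
```

In real Hadoop the shuffle stage is performed by the framework between the Map and Reduce tasks; only the map and reduce logic is written by the programmer.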
#### 2.3.2 Things to Pay Attention To
- **Primitive Type**
<center>
| Java Primitive | Writable Implementation |
| :--: | :--: |
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable & VIntWritable |
| float | FloatWritable |
| long | LongWritable & VLongWritable |
| double | DoubleWritable |
</center>
- **Context Object**
1. context.write()
2. Controls Hadoop I/O
3. Key, Value
## 3. MapReduce Implementation Tips
### 3.1 MapReduce Example
#### 3.1.1 Example - Word Count
<center>
<img src="https://hackmd.io/_uploads/HkLVKse-Je.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.1.2 Example - Sum
<center>
<img src="https://hackmd.io/_uploads/Hkk3pilZyg.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.1.3 Example - Mean
<center>
<img src="https://hackmd.io/_uploads/S13RMneZkg.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
### 3.2 Mapper
Extends Hadoop's Mapper class and **implements the "map" phase of the MapReduce process**; in this phase, the input data is processed and transformed into **key-value** pairs.
#### 3.2.1 Example - Word Count
```java!
// LongWritable: the input key type, the byte offset of each line of text.
// Text: the input value type, i.e., the line of text itself.
// Text: the output key type, representing each word.
// IntWritable: the output value type, storing the count for each word.
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
```
<center>
<img src="https://hackmd.io/_uploads/BJxL9jlW1l.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.2.2 Example - Sum
```java!
// LongWritable: the input key type, the byte offset of each line.
// Text: the input value type, i.e., the text of each line.
// Text: the output key type; here the key is fixed to "Total".
// IntWritable: the output value type, the value of each number.
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
```
<center>
<img src="https://hackmd.io/_uploads/ByDfCogbJe.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.2.3 Example - Mean
```java!
// LongWritable: the input key type, the byte offset of the line.
// Text: the input value type, i.e., the text of each line.
// Text: the output key type.
// MapWritable: the output value type, holding the count and sum of the numbers on each line.
public static class Map extends Mapper<LongWritable, Text, Text, MapWritable>
```
<center>
<img src="https://hackmd.io/_uploads/B1w5J3xbye.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
### 3.3 Reducer
#### 3.3.1 Example - Word Count
```java!
// Text: the input key type, representing each word.
// IntWritable: the input value type, one count per occurrence of the word.
// Text: the output key type, representing each word.
// IntWritable: the output value type, storing the total count of each word.
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
```
<center>
<img src="https://hackmd.io/_uploads/SkZay3x-kx.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.3.2 Example - Sum
```java!
// Text: the input and output key type, i.e., "Total".
// IntWritable: the input and output value type; the inputs are the individual numbers, and the output is their sum.
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
```
<center>
<img src="https://hackmd.io/_uploads/HkLygheW1e.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
#### 3.3.3 Example - Mean
```java!
// Text: the input key type, matching the Mapper's output key.
// MapWritable: the input value type, holding the numbers' count and sum.
// Text: the output key type.
// DoubleWritable: the output value type, representing the computed mean.
public static class Reduce extends Reducer<Text, MapWritable, Text, DoubleWritable>
```
<center>
<img src="https://hackmd.io/_uploads/Sktvxhl-1g.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
### 3.4 Main
```java!
// 1. Configuration conf = new Configuration(): creates a Configuration object conf that holds the Hadoop job's configuration; the basis for the Job setup.
// 2. Job job = new Job(conf, "[Output Text]"): creates a new Job object with the configuration conf and the job name "[Output Text]".
// 3. job.setMapperClass(Map.class): sets the Mapper class to the Map class defined earlier.
// 4. job.setReducerClass(Reduce.class): sets the Reducer class to the Reduce class defined earlier.
// 5. job.setInputFormatClass(TextInputFormat.class): sets the input format class; each line of text becomes one Mapper input.
// 6. job.setOutputFormatClass(TextOutputFormat.class): sets the output format class; the output is plain text.
// 7. FileInputFormat.addInputPath(job, new Path(args[0])): sets the job's input path; args[0] is the first command-line argument (the input data path).
// 8. FileOutputFormat.setOutputPath(job, new Path(args[1])): sets the job's output path; args[1] is the second command-line argument (the output result path).
// 9. job.waitForCompletion(true): launches the MapReduce job and waits for it to finish; true prints progress to the terminal.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "[Output Text]");
job.setOutputKeyClass(...);
job.setOutputValueClass(...);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setJarByClass(...);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
```
<center>
<img src="https://hackmd.io/_uploads/HkbVYelZkg.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
## 4. Hadoop Project Whole Implementation
### 4.1 Hadoop Project: Word Count
#### 4.1.1 Map
```java=
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
context.write(new Text(token), new IntWritable(1));
}
}
}
```
- ```public void map(LongWritable key, Text value, Context context)```:
1. ```key```: the byte offset of the line; unused in this Word Count implementation.
2. ```value```: the text content of the line.
3. ```context```: writes out key-value pairs; here, each word paired with the count 1.
- ```String line = value.toString()```: converts the input line into a Java String.
- ```StringTokenizer tokenizer = new StringTokenizer(line)```: splits the line into words on whitespace (or other delimiters).
- ```context.write(new Text(token), new IntWritable(1))```: emits a key-value pair; these pairs are passed to the Reducer stage, which accumulates each word's total count.
:::info
**Note**: The goal is to walk through every line of the input text and emit each word in the form **(word, 1)**.
:::
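The tokenization step can be checked in isolation with plain Java, no Hadoop required (the class and method names here are made up for the demo). StringTokenizer splits on whitespace by default, so one line yields one token per word:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeDemo {

    // Splits one input line into tokens, exactly as the mapper's while loop does.
    static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            out.add(tokenizer.nextToken());
        }
        return out;
    }

    public static void main(String[] args) {
        // Each of these tokens would be emitted by the mapper as (token, 1).
        System.out.println(tokens("hello world  hello"));  // [hello, world, hello]
    }
}
```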
<br>
#### 4.1.2 Reducer
```java=
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
```
- ```public void reduce(Text key, Iterable<IntWritable> values, Context context)```:
1. ```key```: each unique word.
2. ```values```: an iterable of IntWritable elements holding one count for each occurrence of the word (each value is 1).
3. ```context```: writes out the final result.
- ```sum += val.get()```: takes an IntWritable object val from values, retrieves its integer value via val.get() (1 in this example), and adds it to the sum variable.
:::info
**Note**: Accumulates the occurrences of each word to compute its total count, and emits the final result in the form **(word, count)**.
:::
<br>
#### 4.1.3 main
```java=
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
```
- ```job.setOutputKeyClass(Text.class)```
1. Specifies the job's output key type:
2. Text.class means the output key is a Text (the word).
- ```job.setOutputValueClass(IntWritable.class)```
1. Specifies the job's output value type:
2. IntWritable.class means the output value is an IntWritable (the word's count).
- ```job.setJarByClass(WordCount.class)```: specifies the job's main class (WordCount.class) so that Hadoop can locate and package the correct Jar file and distribute it to the cluster nodes for execution.
<center>
<img src="https://hackmd.io/_uploads/HkLVKse-Je.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
### 4.2 Hadoop Project: Sum
#### 4.2.1 Map
```java=
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
context.write(new Text("Total"), new IntWritable(Integer.parseInt(token)));
}
}
}
```
- ```public void map(LongWritable key, Text value, Context context)```
1. ```key```: the byte offset of the line (unused in this example).
2. ```value```: the text content of each line.
3. ```context```: writes out intermediate results.
- ```context.write(new Text("Total"), new IntWritable(Integer.parseInt(token)))```
1. Parses the token as an integer, wraps it in an IntWritable, and emits it with the fixed key "Total".
2. All of the numbers are grouped under the "Total" key, and the Reducer performs the final summation.
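The net effect of this job, parsing every token as an integer and adding everything up, can be sketched in plain Java (class and method names invented for the demo):

```java
import java.util.StringTokenizer;

public class SumDemo {

    // Parses a line of whitespace-separated integers and sums them,
    // mirroring what the Sum job computes across all input lines.
    static int sumLine(String line) {
        int sum = 0;
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            sum += Integer.parseInt(tokenizer.nextToken());
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumLine("3 5 7"));  // 15
    }
}
```

Note that Integer.parseInt throws a NumberFormatException on non-numeric tokens, so the job assumes the input contains only integers.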
<br>
#### 4.2.2 Reducer
```java=
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
```
- ```public void reduce(Text key, Iterable<IntWritable> values, Context context)```
1. ```key```: "Total" (the only key).
2. ```values```: an iterable of IntWritable elements holding all of the numbers.
3. ```context```: writes out the final result.
<br>
#### 4.2.3 main
```java=
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Sum");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
```
<center>
<img src="https://hackmd.io/_uploads/S1eqQne-1l.png"
style="
width: 90%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>
<br>
### 4.3 Hadoop Project: Mean
#### 4.3.1 Map
```java=
public static class Map extends Mapper<LongWritable, Text, Text, MapWritable> {
private Text word = new Text();
private MapWritable arry = new MapWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
int count = 0;
int sum = 0;
while (tokenizer.hasMoreTokens()) {
count ++;
sum = sum + Integer.parseInt(tokenizer.nextToken());
}
arry.put(new IntWritable(0), new IntWritable(count));
arry.put(new IntWritable(1), new IntWritable(sum));
context.write(word, arry);
}
}
```
- ```Text word = new Text()``` and ```MapWritable arry = new MapWritable()```
1. Declare a Text variable word and a MapWritable variable arry.
2. ```word```: serves as the output key; it is never assigned a value in this example, so it stays an empty string.
3. ```arry```: stores the computed sum and count of the numbers.
- ```public void map(LongWritable key, Text value, Context context)```
1. key is the byte offset of the line, unused in this example.
2. value is the text content of each line.
3. context writes out intermediate results.
- ```arry.put(new IntWritable(0), new IntWritable(count))``` and ```arry.put(new IntWritable(1), new IntWritable(sum))```
1. Store the computed count and sum in the MapWritable variable arry.
2. ```new IntWritable(0)``` and ```new IntWritable(1)``` serve as keys for the count and sum values, respectively.
<br>
#### 4.3.2 Reducer
```java=
public static class Reduce extends Reducer<Text, MapWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterable<MapWritable> arry, Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
for (MapWritable ar : arry) {
count = count + ((IntWritable)ar.get(new IntWritable(0))).get();
sum = sum + ((IntWritable)ar.get(new IntWritable(1))).get();
}
context.write(key, new DoubleWritable((double) sum / count)); // cast to double so the division is not truncated
}
}
```
- ```public void reduce(Text key, Iterable<MapWritable> arry, Context context)```
1. ```key```: the Text key (the empty string by default).
2. ```arry```: an iterable of MapWritable objects, each holding one line's count (how many numbers) and sum.
3. ```context```: writes out the result.
- ```count = count + ((IntWritable)ar.get(new IntWritable(0))).get()```
1. ```ar.get(new IntWritable(0))``` retrieves the line's count value, which is cast to IntWritable.
2. .get() extracts the integer value, which is added to count.
- ```sum = sum + ((IntWritable)ar.get(new IntWritable(1))).get()```
1. ```ar.get(new IntWritable(1))``` retrieves the line's sum value, which is cast to IntWritable.
2. .get() extracts the integer value, which is added to sum.
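One Java detail matters when the mean is finally computed: dividing one `int` by another is integer division, which truncates the fractional part, so the sum needs a cast to `double` before the division (not after) to produce a fractional mean. A minimal demonstration:

```java
public class MeanDivisionDemo {
    public static void main(String[] args) {
        int sum = 7;
        int count = 2;
        System.out.println(sum / count);             // 3   (int / int truncates)
        System.out.println((double) sum / count);    // 3.5 (cast applied before dividing)
        System.out.println((double) (sum / count));  // 3.0 (cast applied too late)
    }
}
```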
<br>
#### 4.3.3 main
```java=
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Avg");
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setJarByClass(Average.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
```
- ```job.setMapOutputKeyClass(Text.class)``` and ```job.setMapOutputValueClass(MapWritable.class)```
1. Specify the Mapper's output key and value types:
2. Text.class is the map output key type, i.e., the fixed key (the word variable) set in the Mapper.
3. MapWritable.class is the map output value type, holding each line's sum and count.
- ```job.setOutputKeyClass(Text.class)``` and ```job.setOutputValueClass(DoubleWritable.class)```
1. Set the Reducer's final output key and value types:
2. Text.class is the output key type.
3. DoubleWritable.class is the output value type, representing the mean.
<center>
<img src="https://hackmd.io/_uploads/Sktvxhl-1g.png"
style="
width: 70%;
height: auto;">
<div style="
border-bottom: 3px solid #d9d9d9;
display: inline-block;
color: #999;
padding: 3px;">
</div>
</center>