# 【Introduction & Implementation】 <center> <img src="https://hackmd.io/_uploads/H1lMB2eZyl.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ## 1. Hadoop ### 1.1 Hadoop Architecture #### 1.1.1Hadoop MapReduce - Single master node, many worker nodes - Client submits a job to master node - Master splits each job into tasks (map/reduce), and assigns tasks to worker nodes #### 1.1.2 Hadoop Distributed File System (HDFS) - Single name node, many data nodes - Files stored as large, fixed-size (e.g. 64MB) blocks - HDFS typically holds map input and reduce output ### 1.2 Hadoop Workflow #### 1.2.1 Hadoop Dataflow - Whole Flow <center> <img src="https://hackmd.io/_uploads/BJloJ1lbkl.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 1.2.2 Hadoop Dataflow - Map Reduce Flow <center> <img src="https://hackmd.io/_uploads/ryh8Nyl-yl.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ## 2. MapReduce ### 2.1 Why MapReduce is proposed ? - A simplified platform for programmer or scientists with non-parallel background - Hadoop: an open source implementation for MapReduce ### 2.2 What is MapReduce? - **Parallel programming** model for clusters of commodity machines - Platform for reliable, scalable parallel computing - Abstracts issues of parallel environment from programmer. - Data-parallel programming model for clusters of commodity machines - Pioneered by Google: Processes 20 PB of data per day - Popularized by open-source Hadoop project: Used by Yahoo!, Facebook, Amazon, … ### 2.3 MapReduce Implementation - Google has a proprietary implementation in C++: Bindings in Java, Python - **Hadoop** is an open-source implementation in Java 1. Development led by Yahoo, used in production 2. Now an **Apache project** 3. Rapidly expanding software ecosystem #### 2.3.1 MapReduce Programming Model - We writing two functions, Map and Reduce. (have a special form key-value pairs) - All values with the **same key** are sent to the **same reducer** 1. **Data type**:key-value records 2. **Map function**:$(K_{in}, V_{in}) \rightarrow list(K_{inter}, V_{inter})$ 3. **Reduce function**: $(K_{inter}, list(V_{inter})) \rightarrow list(K_{out}, V_{out})$ <center> <img src="https://hackmd.io/_uploads/BkyauygZyg.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> #### 2.3.2 Pay Attention on the following things - **Primitive Type** <center> | Java Primitive | Writable Implementation | | :--: | :--: | | boolean | BooleanWritable | | byte | ByteWritable | | int | IntWritable & VIntWritable | | float | FloatWritable | | long | LongWritable & VLongWritable | | double | DoubleWritable | </center> - **Context Object** 1. context.write() 2. Hadoop I/O Controlling 3. Key, Value ## 3. MapReduce Implementation Tips ### 3.1 MapReduce Example #### 3.1.1 Example - Word Count <center> <img src="https://hackmd.io/_uploads/HkLVKse-Je.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.1.2 Example - Sum <center> <img src="https://hackmd.io/_uploads/Hkk3pilZyg.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.1.3 Example - Mean <center> <img src="https://hackmd.io/_uploads/S13RMneZkg.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ### 3.2 Mapper 繼承了 Hadoop 的 Mapper 類別,**提供了 MapReduce 過程中的「映射」階段**,在這個階段,輸入的資料會被處理並轉換為 **key-value** 組對形式 #### 3.2.1 Example - Word Count ```java! // LongWritable: 是輸入鍵的類型,表示每行文字的偏移量 (offset)。 // Text: 是輸入值的類型,即輸入的行文字。 // Text: 是輸出鍵的類型,表示每個字詞。 // IntWritable: 是輸出值的類型,用來儲存每個字詞的次數。 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> ``` <center> <img src="https://hackmd.io/_uploads/BJxL9jlW1l.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.2.2 Example - Sum ```java! // LongWritable: 是輸入鍵的類型,代表每行的偏移量。 // Text: 是輸入值的類型,即每行的文字。 // Text: 是輸出鍵的類型,這裡我們固定將鍵設為 "Total"。 // IntWritable: 是輸出值的類型,代表每個數字的值。 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> ``` <center> <img src="https://hackmd.io/_uploads/ByDfCogbJe.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.2.3 Example - Mean ```java! // LongWritable: 是輸入鍵的類型,代表行的偏移量。 // Text: 是輸入值的類型,即每行的文字。 // Text: 是輸出鍵的類型。 // MapWritable: 是輸出值的類型,用來存放每行數字的數量和總和。 public static class Map extends Mapper<LongWritable, Text, Text, MapWritable> ``` <center> <img src="https://hackmd.io/_uploads/B1w5J3xbye.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ### 3.3 Reducer #### 3.3.1 Example - Word Count ```java! // Text: 是輸入鍵的類型,表示每個字詞。 // IntWritable: 是輸入值的類型,表示每個字詞的總次數。 // Text: 是輸出鍵的類型,表示每個字詞。 // IntWritable: 是輸出值的類型,用來儲存每個字詞的總次數。 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> ``` <center> <img src="https://hackmd.io/_uploads/SkZay3x-kx.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.3.2 Example - Sum ```java! // Text: 輸入鍵和輸出鍵類型,即 "Total"。 // IntWritable: 輸入值和輸出值的類型為,即每個數字的值。 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> ``` <center> <img src="https://hackmd.io/_uploads/HkLygheW1e.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> #### 3.3.3 Example - Mean ```java! // Text: 輸入鍵類型,與 Mapper 的輸出鍵一致。 // MapWritable: 輸入值類型,包含數字的計數和總和。 // Text: 輸出鍵類型。 // DoubleWritable: 輸出值類型,用來表示計算出的平均值。 public static class Reduce extends Reducer<Text, MapWritable, Text, DoubleWritable> ``` <center> <img src="https://hackmd.io/_uploads/Sktvxhl-1g.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ### 3.4 Main ```java! // 1. Configuration conf = new Configuration(): 建立一個新的 Configuration 物件 conf,用來儲存 Hadoop 作業的配置信息。這是 Job 設定所需的基礎設定。 // 2. Job job = new Job(conf, "[Output Text]"): 創建一個新的 Job 物件 job,並將 conf 配置和作業名稱 “[Output Text]” 傳入。 // 3. job.setMapperClass(Map.class): 設定 Mapper 類別,指定我們之前定義的 Map 類別。 // 4. job.setReducerClass(Reduce.class): 設定 Reducer 類別,指定我們之前定義的 Reduce 類別。 // 5. job.setInputFormatClass(TextInputFormat.class): 指定輸入格式類別, 表示每一行文字是 Mapper 的輸入。 // 6. job.setOutputFormatClass(TextOutputFormat.class): 指定輸出格式類別,表示輸出為純文字格式。 // 7. FileInputFormat.addInputPath(job, new Path(args[0])): 設定作業的輸入路徑。args[0] 表示從命令列傳入的第一個參數(即輸入資料路徑)。 // 8. FileOutputFormat.setOutputPath(job, new Path(args[1]))```設定作業的輸出路徑。args[1] 表示從命令列傳入的第二個參數(即輸出結果路徑)。 // 9. job.waitForCompletion(true): 啟動 MapReduce 作業並等待它完成。參數 true 表示在作業完成時向終端顯示進度。 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "[Output Text]"); job.setOutputKeyClass(...); job.setOutputValueClass(...); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setJarByClass(...); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } ``` <center> <img src="https://hackmd.io/_uploads/HkbVYelZkg.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ## 4. Hadoop Project Whole Implementation ### 4.1 Hadoop Project:Word Count #### 4.1.1 Map ```java= public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); context.write(new Text(token), new IntWritable(1)); } } } ``` - ```public void map(LongWritable key, Text value, Context context)```: 1. ```key```: 文字的偏移量,在此 Word Count 實作不使用。 2. ```value```: 每一行的文字內容。 3. ```context```: 寫出 key-value 組對,在本例中,是每個字詞和計數 1。 - ```String line = value.toString()```: 輸入行文字轉換為字串類型。 - ```StringTokenizer tokenizer = new StringTokenizer(line)```: 將該行文字以空白 (或分隔符號) 分割為字詞。 - ```context.write(new Text(token), new IntWritable(1))```: 輸出 key-value 組對。這樣的輸出結果會被傳遞給 Reducer 階段來累加各個字詞的總次數。 :::info **注意事項**: 目的是遍歷輸入文字每一行,將每個字詞以 **(字詞, 1)** 形式輸出。 ::: <br> #### 4.1.2 Reducer ```java= public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } ``` - ```public void reduce(Text key, Iterable<IntWritable> values, Context context)```: 1. ```key```: 是每個唯一字詞。 2. ```values```: 是一個包含 IntWritable 元素的迭代器,儲存了該字詞每次出現時的計數(每個值都為 1)。 3. context 用於將最終結果寫出。 - ```sum += val.get()```: 從 values 中取得一個 IntWritable 物件 val,並透過 val.get() 方法取得整數(此例中為 1),然後加到 sum 變數中。 :::info **注意事項**: 累加同一字詞的出現次數計算字詞總次數,並輸出最終結果 **(字詞, 次數)**。 ::: <br> #### 4.1.3 main ```java= public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setJarByClass(WordCount.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } ``` - ```job.setOutputKeyClass(Text.class)``` 1. 指定 MapReduce 作業的輸出鍵和值的類型: 2. Text.class 表示輸出鍵類型為 Text(字詞)。 - ```job.setOutputValueClass(IntWritable.class)``` 1. 指定 MapReduce 作業的輸出鍵和值的類型: 2. IntWritable.class 表示輸出值類型為 IntWritable(字詞的出現次數)。 - ```job.setJarByClass(WordCount.class)```: 指定作業邏輯主類別(WordCount.class),這樣 Hadoop 可以識別並打包正確的 Jar 文件,並將它分發到叢集節點上執行。 <center> <img src="https://hackmd.io/_uploads/HkLVKse-Je.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ### 4.2 Hadoop Project:Sum #### 4.2.1 Map ```java= public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); context.write(new Text("Total"), new IntWritable(Integer.parseInt(token))); } } } ``` - ```public void map(LongWritable key, Text value, Context context)``` 1. ```key```: 表示該行偏移量(在此例中不使用)。 2. ```value```: 表示每行的文字內容。 4. ```context```: 寫出中間結果。 - ```context.write(new Text("Total"), new IntWritable(Integer.parseInt(token)))``` 1. 將 token 轉換為整數並用 IntWritable 包裝,然後與固定鍵 "Total" 一起輸出。 2. 所有的數字都會累加到 "Total" 鍵之下,在 Reducer 階段進行最終的加總。 <br> #### 4.2.2 Reducer ```java= public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } ``` - ```public void reduce(Text key, Iterable<IntWritable> values, Context context)``` 1. ```key```: 是 "Total"(唯一鍵)。 2. ```values```: 是一個包含 IntWritable 元素的迭代器,儲存所有數字的值。 3. ```context```: 用於寫出最終結果。 <br> #### 4.2.3 main ```java= public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "Sum"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setJarByClass(WordCount.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } ``` <center> <img src="https://hackmd.io/_uploads/S1eqQne-1l.png" style=" width: 90%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center> <br> ### 4.3 Hadoop Project:Mean #### 4.3.1 Map ```java= public static class Map extends Mapper<LongWritable, Text, Text, MapWritable> { private Text word = new Text(); private MapWritable arry = new MapWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); int count = 0; int sum = 0; while (tokenizer.hasMoreTokens()) { count ++; sum = sum + Integer.parseInt(tokenizer.nextToken()); } arry.put(new IntWritable(0), new IntWritable(count)); arry.put(new IntWritable(1), new IntWritable(sum)); context.write(word, arry); } } ``` - ```Text word = new Text()``` 和 ```MapWritable arry = new MapWritable()``` 1. 宣告 Text 類型的變數 word 和 MapWritable 類型的變數 arry。 2. ```word```: 變數作為輸出鍵,在此例中並未初始化具體值,因此默認為空值。 3. ```arry```: 變數用來儲存計算出的數字總和和計數結果。 - ```public void map(LongWritable key, Text value, Context context)``` 1. key 是行的偏移量,在此例中不使用。 2. value 是每行的文字內容。 3. context 用來輸出中間結果。 - ```arry.put(new IntWritable(0), new IntWritable(count))``` 和 ```arry.put(new IntWritable(1), new IntWritable(sum))``` 1. 將計算出的數量 count 和總和 sum 存入 MapWritable 變數 arry。 2. ```new IntWritable(0)``` 和 ```new IntWritable(1)``` 作為鍵,分別對應 count 和 sum 的值。 <br> #### 4.3.2 Reducer ```java= public static class Reduce extends Reducer<Text, MapWritable, Text, DoubleWritable> { public void reduce(Text key, Iterable<MapWritable> arry, Context context) throws IOException, InterruptedException { int sum = 0; int count = 0; for (MapWritable ar : arry) { count = count + ((IntWritable)ar.get(new IntWritable(0))).get(); sum = sum + ((IntWritable)ar.get(new IntWritable(1))).get(); } context.write(key, new DoubleWritable(sum/count)); } } ``` - ```public void reduce(Text key, Iterable<MapWritable> arry, Context context)``` 1. ```key```: 是 Text 類型的鍵值(默認為空字串)。 2. ```arry```: 是一個包含 MapWritable 的迭代器,每個 MapWritable 包含 count(數字數量)和 sum(數字總和)。 3. ```context```: 用來輸出結果。 - ```count = count + ((IntWritable)ar.get(new IntWritable(0))).get()``` 1. 使用 ```ar.get(new IntWritable(0))``` 取得每行的 count 值,並將其轉型為 IntWritable。 2. 使用 .get() 方法取得其整數值並加到 count。 - ```sum = sum + ((IntWritable)ar.get(new IntWritable(1))).get()``` 1. 使用 ```ar.get(new IntWritable(1))``` 取得每行的 sum 值,並將其轉型為 IntWritable。 2. 使用 .get() 方法取得其整數值並加到 sum。 <br> #### 4.3.3 main ```java= public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "Avg"); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); job.setJarByClass(Average.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } ``` - ```job.setMapOutputKeyClass(Text.class)```和 ```job.setMapOutputValueClass(MapWritable.class)``` 1. 指定 Mapper 的輸出鍵和值的類型: 2. Text.class 為輸出鍵類型,即我們在 Mapper 中設定的固定鍵(word)。 3. MapWritable.class 為輸出值類型,包含每行的數字總和和計數。 - ```job.setOutputKeyClass(Text.class)``` 和```job.setOutputValueClass(DoubleWritable.class)``` 1. 設定 Reducer 的最終輸出鍵和值的類型: 2. Text.class 為輸出鍵類型。 3. DoubleWritable.class 為輸出值類型,用來表示平均值。 <center> <img src="https://hackmd.io/_uploads/Sktvxhl-1g.png" style=" width: 70%; height: auto;"> <div style=" border-bottom: 3px solid #d9d9d9; display: inline-block; color: #999; padding: 3px;"> </div> </center>