教材:10710周志遠教授平行程式
https://www.youtube.com/playlist?list=PLS0SUwlYe8cxqw70UHOE5n4Lm-mXFXbZT
20250916 筆記
內容可能有錯僅供參考
在這個章節開始投影片和10710周志遠教授平行程式有差異,接下來以
NTHU-PP-2021 (Chinese) 為主
[NTHU-PP-Chap10-Big Data-Part3](https://www.youtube.com/watch?v=dKx06kwUpns&list=PLQTDfc8kgjhMKtgumyK0gmEelnTtqJGsp&index=24&t=32s)
搭配 22C~22D Distributed Computing for Big Data Using MapReduce
24A~24B Information Retrieval Algorithms
今日大綱
1. MapReduce應用與資訊檢索 (Information Retrieval, IR)
2. 資訊檢索系統架構
3. 排序檢索的相似度計算:TF-IDF
4. MapReduce的限制與高階抽象:Hive與Pig
5. Spark:解決MapReduce的迭代與互動式限制
### **MapReduce應用與資訊檢索 (Information Retrieval, IR)**
* 我們已經討論過MapReduce程式模型及其運作過程。
* 接下來將探討MapReduce如何應用於處理巨量資料 (big data) 的實際問題。
* MapReduce最初開發主要是為了解決**資訊檢索 (Information Retrieval, IR)** 相關問題,特別是針對文字資料,例如Google早期處理和搜尋網頁內容。
* IR領域建立已久,應用廣泛,從大量資訊中提取所需資訊。
* IR的基本問題是用戶輸入**查詢 (query)**,描述所需的**概念**。
* 同時,被搜尋的**文件 (document)** 也包含許多概念,需用文字來描述。
* 目標是根據查詢的概念表示與文件概念表示的相似度,找到用戶所需的資訊。
* 這需要定義、量測和計算概念之間的相似度。
### **資訊檢索系統架構**
* 典型的網頁或文件搜尋系統架構:用戶提交查詢 (query),系統在大量文件 (documents) 中找到符合查詢的文件。
* **文件表示 (Document Representation)**:文件需透過某種「表示函數」來描述其內容,這是IR的一個核心問題。
* **索引 (Index)**:為加速搜尋,系統通常會建立索引,就像電話簿或字典一樣,透過關鍵字快速找到目標資訊。
* 查詢也需要透過類似的表示函數來描述,以方便文件與查詢之間的比較判斷。
* **計算的時間點 (Computational Aspects)**:
* **索引建立 (Indexing)**:通常是**離線 (offline)** 進行的。因為建立索引需要花費大量時間(編排文字、建立資料結構),文件數量越多、規模越大,所需時間越長,這是一個標準的巨量資料問題。MapReduce非常適合處理這種需要大量資料處理的**批次處理 (batch processing)** 任務,因此最適合用於建立索引的計算。
* **查詢處理 (Querying)**:通常是**線上 (online)** 進行的,用戶期望立即獲得搜尋結果。時間效率非常重要,這也是需要索引來加速搜尋的原因。
### **文件表示方法:詞袋模型 (Bag of Words)**
* 第一步是文件表示 (representation)。
* 最簡單的表示方式是**詞袋模型 (Bag of Words, BoW)**。
* BoW模型完全不考慮語義、斷詞或詞序,將文件拆解成許多獨立的字詞 (words),並以此進行搜尋。
* 儘管簡單,但它足夠有效率和效果來解決IR問題。
* **BoW模型的前提假設**:每個字詞都是獨立的,不考慮字詞之間的關係或進一步分析。
* **文字資料前處理步驟 (Preprocessing)**:
1. **分詞 (Tokenization)**:將文字切分成一個個的獨立詞元 (tokens),例如斷句或將複合詞拆開。
2. **停用詞移除 (Stop Word Removal)**:移除對搜尋意義不大或頻繁出現的字詞 (如「的」、「是」、「a」、「an」) 和標點符號,以減少搜尋次數。
3. **正規化 (Normalization)**:將同一個字的不同形式 (如單複數、不同時態) 或不同語言的對應詞,映射到單一的標準字詞上。
* BoW模型不考慮語法、詞義或其延伸意義。
* 這三個前處理步驟非常適合使用MapReduce的Mapper功能來實現,Mapper可以處理每一行文字並輸出所需的詞元。
### **反向索引 (Inverted Index)**
* 資訊檢索中常用來加速搜尋的索引是**反向索引 (Inverted Index)**。
* 「反向」的意思是透過**字詞 (word) 或關鍵字**來反向搜尋並對應到**文件ID (document ID)**,而非從文件內容找字詞。
* **查詢檢索類型 (Retrieval Types)**:
* **布林檢索 (Boolean Retrieval)**:最簡單的方式,使用布林運算符 (AND, OR, NOT) 來決定是否檢索文件。
* 優點:結果明確,文件要麼符合,要麼不符合。
* 缺點:無法對符合條件的文件進行排序或分級,且過於嚴謹,可能錯過稍有差異但相關的文件。
* **排序檢索 (Ranked Retrieval)**:更常見的方式,會根據相關性高低對檢索結果進行排序 (ranking)。
* 優點:對於查詢不明確的情況更實用;可以先顯示相關性最高的結果,並在後台逐步計算並顯示其他較不相關的結果,從而隱藏搜尋成本。
* 缺點:定義和量化文件與查詢之間的相似度 (similarity) 或接近度 (closeness) 更加困難,特別是在詞袋表示法下。
### **排序檢索的相似度計算:TF-IDF**
* 相似度計算通常使用**向量空間模型 (Vector Space Model)**。
* 文件和查詢都可以表示為字詞向量 (vector),向量的維度是所有獨特字詞的數量。
* 向量中的值表示字詞是否出現 (0/1) 或出現頻率。
* **相似度量測**:透過計算文件向量和查詢向量之間的夾角 (angle) 來衡量。夾角越小,表示越接近。
* **餘弦相似度 (Cosine Similarity)** 常用於計算向量夾角,數學上涉及內積和範數計算。
* 為簡化計算,有時會使用**內積 (inner product)** 作為相似度的近似值。
* **字詞權重 (Term Weighting)**:單純的內積可能無法反映字詞的真實重要性,因此引入字詞權重。
* **局部權重 (Local Weight)**:
* **詞頻 (Term Frequency, TF)**:一個字詞在**單一文件**中出現的頻率。
* TF越高,表示該字詞對該文件越重要 (例如,關於「狗」的文章中「狗」字會頻繁出現)。
* TF與重要性呈**正相關**。
* **全域權重 (Global Weight)**:
* **文件頻率 (Document Frequency, DF)**:一個字詞在**所有文件**中出現的機率。
* DF越高,表示該字詞對區分不同文件的重要性越低 (例如,常見詞如「我」、「是」在所有文件中都出現,就沒有區分度)。
* DF與重要性呈**負相關** (稱為逆文件頻率 Inverse Document Frequency, IDF)。
* **TF-IDF (Term Frequency-Inverse Document Frequency)**:
* Google早期建立搜尋引擎時,採用了著名的TF-IDF加權方法。
* TF-IDF結合了局部TF的正相關性與全域DF的負相關性。
* TF-IDF的定義通常為 `TF_ij * IDF_i`。
* `TF_ij` 指的是字詞 `i` 在文件 `j` 中的詞頻。
* `IDF_i` 指的是字詞 `i` 的逆文件頻率,通常計算為 `log(N / DF_i)`,其中 `N` 是總文件數,`DF_i` 是字詞 `i` 出現的文件數。
* 如果字詞 `i` 在所有 `N` 個文件中都出現 (`DF_i = N`),則 `IDF_i` 為 `log(1) = 0`,該字詞的權重為0,表示不重要。
* 如果 `DF_i` 越小,`IDF_i` 越大,表示該字詞越重要。
* 這種概念設計類似於機器學習中的特徵工程。
### **使用MapReduce建構反向索引**
* **反向索引的資料結構**:
* 為節省空間,反向索引通常以連結串列 (linked list) 或稀疏陣列的方式儲存,而非完整的二維陣列。
* **鍵 (Key)**:搜尋的字詞 (word)。
* **值 (Value)**:一個包含「文件頻率 (DF)」和一個串列 (list) 的結構,串列中包含多個 `(文件ID, 詞頻)` 的配對,表示該字詞在哪些文件中出現了多少次。
* 例如,字詞 "blue" -> DF: 1, `[(文件2, 1)]`;字詞 "fish" -> DF: 4, `[(文件1, 2), (文件2, 2), (文件3, 1), (文件4, 1)]`。
* **MapReduce建構反向索引的流程**:
* **Mapper功能**:
* 每個Mapper負責處理一份文件 (或文件的一部分)。
* 讀取文件內容,進行分詞、停用詞移除、正規化等前處理。
* 對於文件中出現的每個字詞,Mapper會輸出一個中間鍵值對 `(字詞, (文件ID, 詞頻))`。例如,對於文件1中的 "fish" 出現2次,Mapper輸出 `(fish, (1, 2))`。
* **Shuffle與Sort階段**:
* MapReduce框架會根據鍵 (字詞) 對所有Mapper的中間輸出進行**分組 (grouping)** 和**排序 (sorting)**。
* 所有相同字詞的鍵值對會被匯總在一起,並送到同一個Reducer。
* 字詞排序有助於查詢時透過二分搜尋等方式快速定位。
* **Reducer功能**:
* 每個Reducer接收到一個字詞及其對應的 `(文件ID, 詞頻)` 串列,例如 `(fish, [(1, 2), (2, 2), (3, 1), (4, 1)])`。
* Reducer計算該字詞的**文件頻率 (DF)** (即串列中不同文件ID的數量)。
* Reducer將這些 `(文件ID, 詞頻)` 配對以及DF值組合成最終的反向索引條目並輸出。
* **優化:值排序 (Sorting Values)**
* 為了在查詢時更有效率,Reducer輸出的 `(文件ID, 詞頻)` 串列本身也應該按照文件ID進行排序 (例如 `(文件1, 2)` 在 `(文件2, 2)` 之前)。
* **問題**:若直接在Reducer內部對大量值進行排序 (例如一個字詞出現在1萬個文件中),可能導致**記憶體溢位 (memory overflow)**,因為所有值都需要載入記憶體。這種做法是**不可擴展的 (unscalable)**。
* **解決方案:次要排序 (Secondary Sort)**:
* 這是一種標準的MapReduce技巧,核心是建立**複合鍵 (Composite Key)**。
* 將原本的鍵 (字詞) 和要排序的值 (文件ID) 組合在一起作為新的鍵,例如 `(字詞, 文件ID)`,然後讓MapReduce框架對這個複合鍵進行排序。
* 例如,`fish` 在文件1出現,`fish` 在文件9出現,則鍵為 `(fish, 1)` 和 `(fish, 9)`。
* MapReduce框架會自動先按字詞排序,再按文件ID排序,確保 `(fish, 1)` 在 `(fish, 9)` 之前。
* **自訂分組函數 (Custom Grouping Function)**:為了確保所有屬於同一個「字詞」的複合鍵 (例如 `(fish, 1)` 和 `(fish, 9)`) 仍然被送到**同一個Reducer**處理,需要自訂分組函數,讓它只根據複合鍵的**前半部分 (字詞)** 進行分組。
* 這樣,Reducer收到的值已經是按文件ID排序的,Reducer只需逐一輸出,無需將所有值載入記憶體,從而實現可擴展性並避免記憶體溢位。
### **相似度計算的離線/線上區分**
* 雖然MapReduce適合建立反向索引 (離線),但**不適合**用於線上即時的**查詢相似度計算**。
* 原因:MapReduce是批次處理系統,啟動和調度Mapper/Reducer任務的**開銷 (overhead)** 很高 (可能需30秒甚至幾分鐘),不適合互動式、毫秒級的即時查詢。
* 通常會將建好的反向索引存入**資料庫**,並使用**記憶體內處理 (in-memory processing)** 或傳統資料庫查詢方式,來快速計算查詢結果。
* 線上查詢時,只涉及少量資料 (與查詢字詞相關的索引條目) 和跳躍式存取,這些行為不適合MapReduce的設計。
### **MapReduce的限制與高階抽象:Hive與Pig**
* MapReduce雖然能解決巨量資料處理問題,但存在明顯缺點:
* **程式模型僵硬**:只能依循Map和Reduce的固定模式。複雜任務常需要串接多個MapReduce Job,程式碼不直觀,管理複雜。
* **過度依賴磁碟I/O**:每個MapReduce Job的輸入輸出都需要寫入磁碟。在多個Job串接時,會產生大量的「不必要」磁碟I/O,拖累效能。
* **不適合迭代運算 (Iterative Computations)**:例如機器學習演算法,每一步迭代都可能是一個獨立的Job,導致重複的硬碟I/O和複雜的Job管理。
* **不適合互動式資料分析 (Interactive Data Analysis)**:用戶期待快速回應,但MapReduce的硬碟I/O導致回應時間過長,無法進行即時互動式分析。
* 為了解決MapReduce程式模型複雜的問題,並讓資料分析師更容易使用,開發了更高階的抽象語言:
* **Hive**:提供類似SQL的查詢語言 (HQL),讓用戶可以用SQL語法進行資料分析。HQL查詢會被編譯成一系列的MapReduce Job。
* **Pig**:提供一種稱為Pig Latin的資料流 (data flow) 腳本語言。Pig Latin腳本也將被編譯成MapReduce Job。它特別適合描述資料經過多個轉換階段的資料處理流程。
* Hive和Pig本質上是**語言轉換器 (language translator)** 或編譯器,將高階語言翻譯成MapReduce程式,在保留MapReduce可擴展性的同時,提供更友善的程式設計介面。
### **Spark:解決MapReduce的迭代與互動式限制**
* 在MapReduce廣泛應用約一兩年後,出現了**Spark**,在解決MapReduce在**迭代運算**和**互動式資料處理**方面的不足。
* 這類任務包括機器學習、圖形處理、串流分析以及類似SQL的查詢分析。
* **核心理念:記憶體內運算 (In-memory Computing)**:
* 與MapReduce將中間資料寫入硬碟不同,Spark會將資料**快取 (cache)** 在叢集節點的**記憶體 (memory)** 中。
* 它建立了一個**分散式共享記憶體層 (distributed shared memory layer)**,讓資料可以被重複使用。這顯著加快了迭代和互動式計算的速度。
* **程式設計模型:函數式程式設計 (Functional Programming) 與Scala**:
* MapReduce的程式模型較為受限 (map, reduce函數)。
* Spark將其泛化,採用更通用的**函數式程式設計**概念。
* **Scala**:Spark選擇支援的語言,它是一種可擴展的 (scalable) 函數式程式設計語言。
* 函數式程式的**無狀態性 (stateless)** 特點使其非常適合平行計算和分散式計算。
* Spark本質上是Scala的分散式執行引擎,讓Scala程式能在叢集上擴展執行。
* 這允許在**單一程式**中編寫更複雜的資料流或資料處理流程,而不是需要串接多個獨立的Job。
* 例如,WordCount在Spark/Scala中可以用兩行程式碼簡潔地完成,比MapReduce更易寫和管理。
* **彈性分散式資料集 (Resilient Distributed Datasets, RDD)**:
* Spark最核心的資料抽象。
* RDD是一個**分散式資料集**,類似於MapReduce的HDFS資料區塊,但它儲存在各個節點的**記憶體中**。
* **彈性 (Resilient)**:即使叢集中的任一節點故障,RDD也能夠被重建。
* **不可變性 (Immutable)**:RDD一旦被創建,其值就不能改變。每次資料轉換 (transformation) 都會產生一個新的RDD。
* **血緣 (Lineage / DAG)**:Spark會追蹤每個RDD是如何通過一系列轉換操作 (transformation operations) 創建的。這形成了一個有向無環圖 (Directed Acyclic Graph, DAG)。當某個RDD分區丟失時,Spark可以利用這個血緣圖從頭或從上一個仍在記憶體中的RDD重新計算,從而實現容錯。
* **惰性求值 (Lazy Evaluation)**:RDD的轉換操作並不會立即執行,只有當遇到一個**行動操作 (Action Operation)** (如 `count()`, `reduce()`, `collect()`, `save()`) 時,整個計算鏈才會被一次性執行和實體化 (materialize)。
* 優點:Spark可以在執行前對整個DAG進行優化,減少不必要的計算和中間結果的儲存,從而節省記憶體。
* **快取 (Cache)**:RDD可以被明確地**快取**在記憶體中,以便後續的重複使用。第一次計算可能沒有顯著加速,但之後的相同計算將直接從記憶體讀取,避免硬碟I/O和前處理,顯著提升速度。
* **Spark運作範例 (Log Mining)**:
* 讀取日誌文件 (`textFile`) -> 產生 `lines` RDD (此時未執行,惰性求值)。
* 篩選錯誤訊息 (`filter`) -> 產生 `errors` RDD (未執行)。
* 提取訊息內容 (`map`) -> 產生 `messages` RDD (未執行)。
* 呼叫 `cache()` 在 `messages` RDD 上 (標記要快取,但仍未執行)。
* 呼叫 `count()` 在 `messages` RDD 上 (這是一個**行動操作**)。此時,Spark才會執行整個DAG:從磁碟讀取、過濾、提取,並將 `messages` RDD的各個分區快取到各個Worker節點的記憶體中,最後執行計數。
* 後續任何針對 `messages` RDD 的操作 (如再次 `count()`),都將直接從記憶體快取中讀取資料,無需重新從磁碟讀取或執行前處理,大大加速。
* **Spark挑戰**:
* **堆疊溢位 (Stack Overflow)**:如果RDD的血緣鏈過長 (即經過太多轉換操作而沒有快取中斷),在發生故障需要重新計算時,可能會導致堆疊溢位。因此,程式設計師需要策略性地決定何時呼叫 `cache()` 來縮短血緣鏈。
* **Spark生態系統**:
* 與HDFS整合,可以讀取HDFS上的資料。
* 其記憶體內運算能力使其能處理多種類型的應用程式,包括:**SQL查詢** (比Hive/Pig更快)、**串流處理** (通過微批次處理實現時效性)、**機器學習** (因迭代特性而非常適合,目前主流)、**圖形處理**。
* Spark是一個非常重要且功能強大的分散式運算框架,在機器學習和深度學習領域應用廣泛 (如SparkNet)。