教材:10710周志遠教授平行程式 https://www.youtube.com/playlist?list=PLS0SUwlYe8cxqw70UHOE5n4Lm-mXFXbZT 20250916 筆記 內容可能有錯僅供參考 在這個章節開始投影片和10710周志遠教授平行程式有差異,接下來以 NTHU-PP-2021 (Chinese) 為主 [NTHU-PP-Chap10-Big Data-Part3](https://www.youtube.com/watch?v=dKx06kwUpns&list=PLQTDfc8kgjhMKtgumyK0gmEelnTtqJGsp&index=24&t=32s) 搭配 22C~22D Distributed Computing for Big Data Using MapReduce 24A~24B Information Retrieval Algorithms 今日大綱 1. MapReduce應用與資訊檢索 (Information Retrieval, IR) 2. 資訊檢索系統架構 3. 排序檢索的相似度計算:TF-IDF 4. MapReduce的限制與高階抽象:Hive與Pig 5. Spark:解決MapReduce的迭代與互動式限制 ### **MapReduce應用與資訊檢索 (Information Retrieval, IR)** * 我們已經討論過MapReduce程式模型及其運作過程。 * 接下來將探討MapReduce如何應用於處理巨量資料 (big data) 的實際問題。 * MapReduce最初開發主要是為了解決**資訊檢索 (Information Retrieval, IR)** 相關問題,特別是針對文字資料,例如Google早期處理和搜尋網頁內容。 * IR領域建立已久,應用廣泛,從大量資訊中提取所需資訊。 * IR的基本問題是用戶輸入**查詢 (query)**,描述所需的**概念**。 * 同時,被搜尋的**文件 (document)** 也包含許多概念,需用文字來描述。 * 目標是根據查詢的概念表示與文件概念表示的相似度,找到用戶所需的資訊。 * 這需要定義、量測和計算概念之間的相似度。 ### **資訊檢索系統架構** * 典型的網頁或文件搜尋系統架構:用戶提交查詢 (query),系統在大量文件 (documents) 中找到符合查詢的文件。 * **文件表示 (Document Representation)**:文件需透過某種「表示函數」來描述其內容,這是IR的一個核心問題。 * **索引 (Index)**:為加速搜尋,系統通常會建立索引,就像電話簿或字典一樣,透過關鍵字快速找到目標資訊。 * 查詢也需要透過類似的表示函數來描述,以方便文件與查詢之間的比較判斷。 * **計算的時間點 (Computational Aspects)**: * **索引建立 (Indexing)**:通常是**離線 (offline)** 進行的。因為建立索引需要花費大量時間(編排文字、建立資料結構),文件數量越多、規模越大,所需時間越長,這是一個標準的巨量資料問題。MapReduce非常適合處理這種需要大量資料處理的**批次處理 (batch processing)** 任務,因此最適合用於建立索引的計算。 * **查詢處理 (Querying)**:通常是**線上 (online)** 進行的,用戶期望立即獲得搜尋結果。時間效率非常重要,這也是需要索引來加速搜尋的原因。 ### **文件表示方法:詞袋模型 (Bag of Words)** * 第一步是文件表示 (representation)。 * 最簡單的表示方式是**詞袋模型 (Bag of Words, BoW)**。 * BoW模型完全不考慮語義、斷詞或詞序,將文件拆解成許多獨立的字詞 (words),並以此進行搜尋。 * 儘管簡單,但它足夠有效率和效果來解決IR問題。 * **BoW模型的前提假設**:每個字詞都是獨立的,不考慮字詞之間的關係或進一步分析。 * **文字資料前處理步驟 (Preprocessing)**: 1. **分詞 (Tokenization)**:將文字切分成一個個的獨立詞元 (tokens),例如斷句或將複合詞拆開。 2. **停用詞移除 (Stop Word Removal)**:移除對搜尋意義不大或頻繁出現的字詞 (如「的」、「是」、「a」、「an」) 和標點符號,以減少搜尋次數。 3. **正規化 (Normalization)**:將同一個字的不同形式 (如單複數、不同時態) 或不同語言的對應詞,映射到單一的標準字詞上。 * BoW模型不考慮語法、詞義或其延伸意義。 * 這三個前處理步驟非常適合使用MapReduce的Mapper功能來實現,Mapper可以處理每一行文字並輸出所需的詞元。 ### **反向索引 (Inverted Index)** * 資訊檢索中常用來加速搜尋的索引是**反向索引 (Inverted Index)**。 * 「反向」的意思是透過**字詞 (word) 或關鍵字**來反向搜尋並對應到**文件ID (document ID)**,而非從文件內容找字詞。 * **查詢檢索類型 (Retrieval Types)**: * **布林檢索 (Boolean Retrieval)**:最簡單的方式,使用布林運算符 (AND, OR, NOT) 來決定是否檢索文件。 * 優點:結果明確,文件要麼符合,要麼不符合。 * 缺點:無法對符合條件的文件進行排序或分級,且過於嚴謹,可能錯過稍有差異但相關的文件。 * **排序檢索 (Ranked Retrieval)**:更常見的方式,會根據相關性高低對檢索結果進行排序 (ranking)。 * 優點:對於查詢不明確的情況更實用;可以先顯示相關性最高的結果,並在後台逐步計算並顯示其他較不相關的結果,從而隱藏搜尋成本。 * 缺點:定義和量化文件與查詢之間的相似度 (similarity) 或接近度 (closeness) 更加困難,特別是在詞袋表示法下。 ### **排序檢索的相似度計算:TF-IDF** * 相似度計算通常使用**向量空間模型 (Vector Space Model)**。 * 文件和查詢都可以表示為字詞向量 (vector),向量的維度是所有獨特字詞的數量。 * 向量中的值表示字詞是否出現 (0/1) 或出現頻率。 * **相似度量測**:透過計算文件向量和查詢向量之間的夾角 (angle) 來衡量。夾角越小,表示越接近。 * **餘弦相似度 (Cosine Similarity)** 常用於計算向量夾角,數學上涉及內積和範數計算。 * 為簡化計算,有時會使用**內積 (inner product)** 作為相似度的近似值。 * **字詞權重 (Term Weighting)**:單純的內積可能無法反映字詞的真實重要性,因此引入字詞權重。 * **局部權重 (Local Weight)**: * **詞頻 (Term Frequency, TF)**:一個字詞在**單一文件**中出現的頻率。 * TF越高,表示該字詞對該文件越重要 (例如,關於「狗」的文章中「狗」字會頻繁出現)。 * TF與重要性呈**正相關**。 * **全域權重 (Global Weight)**: * **文件頻率 (Document Frequency, DF)**:一個字詞在**所有文件**中出現的機率。 * DF越高,表示該字詞對區分不同文件的重要性越低 (例如,常見詞如「我」、「是」在所有文件中都出現,就沒有區分度)。 * DF與重要性呈**負相關** (稱為逆文件頻率 Inverse Document Frequency, IDF)。 * **TF-IDF (Term Frequency-Inverse Document Frequency)**: * Google早期建立搜尋引擎時,採用了著名的TF-IDF加權方法。 * TF-IDF結合了局部TF的正相關性與全域DF的負相關性。 * TF-IDF的定義通常為 `TF_ij * IDF_i`。 * `TF_ij` 指的是字詞 `i` 在文件 `j` 中的詞頻。 * `IDF_i` 指的是字詞 `i` 的逆文件頻率,通常計算為 `log(N / DF_i)`,其中 `N` 是總文件數,`DF_i` 是字詞 `i` 出現的文件數。 * 如果字詞 `i` 在所有 `N` 個文件中都出現 (`DF_i = N`),則 `IDF_i` 為 `log(1) = 0`,該字詞的權重為0,表示不重要。 * 如果 `DF_i` 越小,`IDF_i` 越大,表示該字詞越重要。 * 這種概念設計類似於機器學習中的特徵工程。 ### **使用MapReduce建構反向索引** * **反向索引的資料結構**: * 為節省空間,反向索引通常以連結串列 (linked list) 或稀疏陣列的方式儲存,而非完整的二維陣列。 * **鍵 (Key)**:搜尋的字詞 (word)。 * **值 (Value)**:一個包含「文件頻率 (DF)」和一個串列 (list) 的結構,串列中包含多個 `(文件ID, 詞頻)` 的配對,表示該字詞在哪些文件中出現了多少次。 * 例如,字詞 "blue" -> DF: 1, `[(文件2, 1)]`;字詞 "fish" -> DF: 4, `[(文件1, 2), (文件2, 2), (文件3, 1), (文件4, 1)]`。 * **MapReduce建構反向索引的流程**: * **Mapper功能**: * 每個Mapper負責處理一份文件 (或文件的一部分)。 * 讀取文件內容,進行分詞、停用詞移除、正規化等前處理。 * 對於文件中出現的每個字詞,Mapper會輸出一個中間鍵值對 `(字詞, (文件ID, 詞頻))`。例如,對於文件1中的 "fish" 出現2次,Mapper輸出 `(fish, (1, 2))`。 * **Shuffle與Sort階段**: * MapReduce框架會根據鍵 (字詞) 對所有Mapper的中間輸出進行**分組 (grouping)** 和**排序 (sorting)**。 * 所有相同字詞的鍵值對會被匯總在一起,並送到同一個Reducer。 * 字詞排序有助於查詢時透過二分搜尋等方式快速定位。 * **Reducer功能**: * 每個Reducer接收到一個字詞及其對應的 `(文件ID, 詞頻)` 串列,例如 `(fish, [(1, 2), (2, 2), (3, 1), (4, 1)])`。 * Reducer計算該字詞的**文件頻率 (DF)** (即串列中不同文件ID的數量)。 * Reducer將這些 `(文件ID, 詞頻)` 配對以及DF值組合成最終的反向索引條目並輸出。 * **優化:值排序 (Sorting Values)** * 為了在查詢時更有效率,Reducer輸出的 `(文件ID, 詞頻)` 串列本身也應該按照文件ID進行排序 (例如 `(文件1, 2)` 在 `(文件2, 2)` 之前)。 * **問題**:若直接在Reducer內部對大量值進行排序 (例如一個字詞出現在1萬個文件中),可能導致**記憶體溢位 (memory overflow)**,因為所有值都需要載入記憶體。這種做法是**不可擴展的 (unscalable)**。 * **解決方案:次要排序 (Secondary Sort)**: * 這是一種標準的MapReduce技巧,核心是建立**複合鍵 (Composite Key)**。 * 將原本的鍵 (字詞) 和要排序的值 (文件ID) 組合在一起作為新的鍵,例如 `(字詞, 文件ID)`,然後讓MapReduce框架對這個複合鍵進行排序。 * 例如,`fish` 在文件1出現,`fish` 在文件9出現,則鍵為 `(fish, 1)` 和 `(fish, 9)`。 * MapReduce框架會自動先按字詞排序,再按文件ID排序,確保 `(fish, 1)` 在 `(fish, 9)` 之前。 * **自訂分組函數 (Custom Grouping Function)**:為了確保所有屬於同一個「字詞」的複合鍵 (例如 `(fish, 1)` 和 `(fish, 9)`) 仍然被送到**同一個Reducer**處理,需要自訂分組函數,讓它只根據複合鍵的**前半部分 (字詞)** 進行分組。 * 這樣,Reducer收到的值已經是按文件ID排序的,Reducer只需逐一輸出,無需將所有值載入記憶體,從而實現可擴展性並避免記憶體溢位。 ### **相似度計算的離線/線上區分** * 雖然MapReduce適合建立反向索引 (離線),但**不適合**用於線上即時的**查詢相似度計算**。 * 原因:MapReduce是批次處理系統,啟動和調度Mapper/Reducer任務的**開銷 (overhead)** 很高 (可能需30秒甚至幾分鐘),不適合互動式、毫秒級的即時查詢。 * 通常會將建好的反向索引存入**資料庫**,並使用**記憶體內處理 (in-memory processing)** 或傳統資料庫查詢方式,來快速計算查詢結果。 * 線上查詢時,只涉及少量資料 (與查詢字詞相關的索引條目) 和跳躍式存取,這些行為不適合MapReduce的設計。 ### **MapReduce的限制與高階抽象:Hive與Pig** * MapReduce雖然能解決巨量資料處理問題,但存在明顯缺點: * **程式模型僵硬**:只能依循Map和Reduce的固定模式。複雜任務常需要串接多個MapReduce Job,程式碼不直觀,管理複雜。 * **過度依賴磁碟I/O**:每個MapReduce Job的輸入輸出都需要寫入磁碟。在多個Job串接時,會產生大量的「不必要」磁碟I/O,拖累效能。 * **不適合迭代運算 (Iterative Computations)**:例如機器學習演算法,每一步迭代都可能是一個獨立的Job,導致重複的硬碟I/O和複雜的Job管理。 * **不適合互動式資料分析 (Interactive Data Analysis)**:用戶期待快速回應,但MapReduce的硬碟I/O導致回應時間過長,無法進行即時互動式分析。 * 為了解決MapReduce程式模型複雜的問題,並讓資料分析師更容易使用,開發了更高階的抽象語言: * **Hive**:提供類似SQL的查詢語言 (HQL),讓用戶可以用SQL語法進行資料分析。HQL查詢會被編譯成一系列的MapReduce Job。 * **Pig**:提供一種稱為Pig Latin的資料流 (data flow) 腳本語言。Pig Latin腳本也將被編譯成MapReduce Job。它特別適合描述資料經過多個轉換階段的資料處理流程。 * Hive和Pig本質上是**語言轉換器 (language translator)** 或編譯器,將高階語言翻譯成MapReduce程式,在保留MapReduce可擴展性的同時,提供更友善的程式設計介面。 ### **Spark:解決MapReduce的迭代與互動式限制** * 在MapReduce廣泛應用約一兩年後,出現了**Spark**,在解決MapReduce在**迭代運算**和**互動式資料處理**方面的不足。 * 這類任務包括機器學習、圖形處理、串流分析以及類似SQL的查詢分析。 * **核心理念:記憶體內運算 (In-memory Computing)**: * 與MapReduce將中間資料寫入硬碟不同,Spark會將資料**快取 (cache)** 在叢集節點的**記憶體 (memory)** 中。 * 它建立了一個**分散式共享記憶體層 (distributed shared memory layer)**,讓資料可以被重複使用。這顯著加快了迭代和互動式計算的速度。 * **程式設計模型:函數式程式設計 (Functional Programming) 與Scala**: * MapReduce的程式模型較為受限 (map, reduce函數)。 * Spark將其泛化,採用更通用的**函數式程式設計**概念。 * **Scala**:Spark選擇支援的語言,它是一種可擴展的 (scalable) 函數式程式設計語言。 * 函數式程式的**無狀態性 (stateless)** 特點使其非常適合平行計算和分散式計算。 * Spark本質上是Scala的分散式執行引擎,讓Scala程式能在叢集上擴展執行。 * 這允許在**單一程式**中編寫更複雜的資料流或資料處理流程,而不是需要串接多個獨立的Job。 * 例如,WordCount在Spark/Scala中可以用兩行程式碼簡潔地完成,比MapReduce更易寫和管理。 * **彈性分散式資料集 (Resilient Distributed Datasets, RDD)**: * Spark最核心的資料抽象。 * RDD是一個**分散式資料集**,類似於MapReduce的HDFS資料區塊,但它儲存在各個節點的**記憶體中**。 * **彈性 (Resilient)**:即使叢集中的任一節點故障,RDD也能夠被重建。 * **不可變性 (Immutable)**:RDD一旦被創建,其值就不能改變。每次資料轉換 (transformation) 都會產生一個新的RDD。 * **血緣 (Lineage / DAG)**:Spark會追蹤每個RDD是如何通過一系列轉換操作 (transformation operations) 創建的。這形成了一個有向無環圖 (Directed Acyclic Graph, DAG)。當某個RDD分區丟失時,Spark可以利用這個血緣圖從頭或從上一個仍在記憶體中的RDD重新計算,從而實現容錯。 * **惰性求值 (Lazy Evaluation)**:RDD的轉換操作並不會立即執行,只有當遇到一個**行動操作 (Action Operation)** (如 `count()`, `reduce()`, `collect()`, `save()`) 時,整個計算鏈才會被一次性執行和實體化 (materialize)。 * 優點:Spark可以在執行前對整個DAG進行優化,減少不必要的計算和中間結果的儲存,從而節省記憶體。 * **快取 (Cache)**:RDD可以被明確地**快取**在記憶體中,以便後續的重複使用。第一次計算可能沒有顯著加速,但之後的相同計算將直接從記憶體讀取,避免硬碟I/O和前處理,顯著提升速度。 * **Spark運作範例 (Log Mining)**: * 讀取日誌文件 (`textFile`) -> 產生 `lines` RDD (此時未執行,惰性求值)。 * 篩選錯誤訊息 (`filter`) -> 產生 `errors` RDD (未執行)。 * 提取訊息內容 (`map`) -> 產生 `messages` RDD (未執行)。 * 呼叫 `cache()` 在 `messages` RDD 上 (標記要快取,但仍未執行)。 * 呼叫 `count()` 在 `messages` RDD 上 (這是一個**行動操作**)。此時,Spark才會執行整個DAG:從磁碟讀取、過濾、提取,並將 `messages` RDD的各個分區快取到各個Worker節點的記憶體中,最後執行計數。 * 後續任何針對 `messages` RDD 的操作 (如再次 `count()`),都將直接從記憶體快取中讀取資料,無需重新從磁碟讀取或執行前處理,大大加速。 * **Spark挑戰**: * **堆疊溢位 (Stack Overflow)**:如果RDD的血緣鏈過長 (即經過太多轉換操作而沒有快取中斷),在發生故障需要重新計算時,可能會導致堆疊溢位。因此,程式設計師需要策略性地決定何時呼叫 `cache()` 來縮短血緣鏈。 * **Spark生態系統**: * 與HDFS整合,可以讀取HDFS上的資料。 * 其記憶體內運算能力使其能處理多種類型的應用程式,包括:**SQL查詢** (比Hive/Pig更快)、**串流處理** (通過微批次處理實現時效性)、**機器學習** (因迭代特性而非常適合,目前主流)、**圖形處理**。 * Spark是一個非常重要且功能強大的分散式運算框架,在機器學習和深度學習領域應用廣泛 (如SparkNet)。