Try   HackMD

MapReduce with POSIX Thread

contributed by <HaoTse>, <shelly4132>, <HahaSula>, <nekoneko>, <Fzzzz>, <abba123>


相關連結

待解決

  • 完成 lock free thread pool
  • 在此版本的 mapreduce 上應用 merge-sort
  • Unsolved problem 1:產生 queue full 並重新 add 的效率問題
  • Unsolved problem 2:精簡質數演算法的判斷式
  • Unsolved problem 3:解決 data race
  • 閱讀參考資料中的 Misco-distributed computing framework designed for mobile devices

預期目標

  • 研究給定的 MapReduce 程式碼,思考 thread pool 設計的驗證和效能分析,提出改善機制
    • 改寫為 lock-free 的 thread pool,甚至可以重新實作程式碼
  • 找出 MapReduce 的應用,將其中具體而微的案例,在給定的程式碼基礎上重新實作

使用工具

  • Github
    第一次使用 github 多人開發,把30 天精通 Git 版本控管研讀一次。
  • astyle
    ln -sf ../../scripts/pre-commit.hook .git/hooks/pre-commit
  • llvm & clang

What is mapreduce

  • 一種軟體框架 (software framework)
  • GOOGLE 提出
  • 為大量資料 (大於1TB) 做平行運算處理
  • 概念主要是 映射函數(Map)歸納函數(Reduce) 兩種
  • 運用在開源的雲端技術 Hadoop 中

映射函數(Map)

對一些獨立元素組成的概念上的列表的每一個元素進行指定的操作。
例如一個成績列表,有人發現所有學生的成績都被高估了一分,他可以定義一個「減一」的映射函數,用來修正這個錯誤。
事實上,每個元素都是被獨立操作的,而原始列表沒有被更改,因為這裡創建了一個新的列表來保存新的答案,這就是說,Map操作是可以高度並行的,這對高性能要求的應用以及並行計算領域的需求非常有用。

應用案例呢? jserv


歸納函數(Reduce)

對一個列表的元素進行適當的合併。
例如想知道班級的平均分,可以定義一個歸納函數,通過讓列表中的奇數(odd)或偶數(even)元素跟自己的相鄰的元素相加的方式把列表減半,如此遞歸運算直到列表只剩下一個元素,然後用這個元素除以人數,就得到了平均分。


Dataflow







g



輸入

輸入



分割

分割



輸入->分割





1

1



分割->1





2

2



分割->2





3

3



分割->3





4

4



分割->4





...

...



分割->...





N

N



分割->N





合併

合併



1->合併





2->合併





3->合併





4->合併





...->合併





N->合併





輸出

輸出



合併->輸出






Example


論文閱讀

摘要

利用特定的 map function 將一組 key/value 轉換成一些 intermediate key/value。
map (in_key, in_value) -> list(out_key, intermediate_value)
而 reduce function 則是將擁有同樣 intermediate key 的 value 合併在一起。
reduce (out_key, list(intermediate_value)) -> list(out_value)
許多現實世界的工作都可以套用這個model。

這邊有一個蠻生動的比喻:http://www.aprilzephyr.com/blog/04242016/我是如何向老婆解釋MapReduce的?(轉)/ gnitnaw

MapReduce提供:

  • Automatic parallelization and distribution
  • Fault-tolerance
  • I/O scheduling
  • Status and monitoring

Introduction

這部份分兩部份,前半段筆者簡述一下在某些使用情境下,需要更高階、抽象的框架去隱藏處理大量資料所面臨到的複雜問題,後半段摘要後面每個章節所要介紹的內容。

  • 筆者們在 google 工作時,常常需要對資料做大量的處理,像是 inverted indices 、 呈現 web documents 之間圖的關係。
  • 而原本這些運算都是很直覺性得好處理,但面對到大量資料與分散資料到數千台電腦做運算的情境下,往往需要處理許多繁複的問題,像是資料的分佈,或當錯誤發生時,仍然可以確保資料原有的完整性,等等。
  • 因此,筆者們提出一個新的 framework 處理以下複雜的問題,去簡化資料處理的過程。
    • messy details of parallelization
    • fault-tolerance
    • data distribution
    • load balancing

Programming Model

  • Map : 使用者自訂 map function ,產生 intermideate key/value paris 。
  • Map/Reduce library 會將相同 key 的 intermideate pairs 合併,傳入 Reduce function 。
  • Reduce : 使用者自訂函式,傳入 key 值和相同 key 值的一組 values 。使用者需自訂 Reduce 的行為,通常產生的結果為零或一個的值( value )。

Examples

以在大量的檔案中尋找每個單字出現的次數為例:

map function 在每一個單字出現時賦予那個單字1,也就是他出現一次,reduce function 則是將每個單字各自出現的次數加在一起。

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");
reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

其他應用

  • Distributed Grep
    • 搜尋給定的 pattern。
  • Count of URL Access Frequency
    • 計算網站被造訪次數。
  • Reverse Web-Link Graph
    • web-link graph 指的是當 URL1 可以連結到 URL2 時,他們之間就會有一條從 URL1 連到 URL2 的邊,而 reverse web-link graph 就是將所有邊的反向。
  • Term-Vector per Host
    - term vector 是將一個或多個檔案裡最重要的單字用 <word, frequency> 的形式一組一組放進list裡面。
  • Inverted Index
    • 一種索引方法,用來索引某個單字出現在哪個檔案裡面。
  • Distributed Sort
    • 對大量的 records 做排序
    • map function : <key, record> ,其中 key 是用 records 產生。
    • reduction function : 不做任何跟動,單純產生 output ,通常稱之為 identy function
    • 牽涉到 4.1 partiton function 和 4.2 ordering guarantees 。
  • Document Clustering
  • Machine Learning
  • Statistical Machine Translation

斜體部份為論文中未提及


Implementation

  • 論文有提到 Map/Reduce 架構針對背後硬體環境的不同而有所不一樣的實做 。
  • Google 設計的 Map/Recue 是建立在電腦叢集上的軟硬體架構。

Execution Overview

喚起 Map 的時候會自動將 input data 分成 M 個 split,分散在多台機器上,這些 splits 可以被不同機器平行地處理。透過使用者自己定義的 partitioning function 可以將 intermediate key 分成 R 個片段,讓 Reduce 分散去處理。

上圖為執行 MapReduce 時整體的流程,以下將透過他們編號去分別解釋:

  1. 在 user program 中 MapReduce Library 會先將 input file 分成 M 個片段,接著就會在一群機器上開始複製很多 program。
  2. 複製的program裡面有一個比較特別的叫 Master,其他都是 worker,Master 會分派工作給空閒的 worker,總共會有 M 個 Map 的工作和 R 個 Reduce 的工作要分配。
  3. 被指派到 Map 工作的 worker 會去讀取對應的 split 將 key/value 讀取出來丟到使用者定義的 Map function 裡面,而由此輸出的 intermediate key/value 將會被暫存在記憶體中。
  4. 那些在記憶體中的 key/value 會被定期寫入 local disk 中,並透過 partitioning function 被分成 R 個區域,而他們所在的區域會被傳回給 Master,Master 之後便會將這些訊息傳達給 Reduce function。
  5. 當 reduce worker 被 Master 告知那些區域後,他便會透過遠端程序呼叫去讀取 local disk 裡的資料。在 reduce worker 將所有 intermediate data 都讀入後,他會利用 intermediate key 將那些 data 去做排序,這樣就能將擁有相同 key 的 data 集合在一起。
  6. reduce worker 會將 intermediate key 與他所對應的所有 value 丟進 reduce function 裡面,而輸出的結果則會被加到一個 final output file裡。
  7. 當所有 map 和 reduce 的工作都被完成後,Master 就會喚醒 user program。

在一切都完整的結束後,MapReduce 的結果可以在那 R 個 output file 裡面找到,通常使用者不需要將那些結果再合併成一個檔案,而是直接將這些結果再傳入另一個 MapReduce call 裡,或是將他們放入另一個分散式系統裡去處理。


Master Data Structure

  • master data structure 其中紀錄每個 map , reduce tasks 的
    • 狀態( status ),狀態有三種︰ idle , in-progress , completed
    • 所在的 worker machine
  • intermediate 資料的位置會經由 master 從 map tasks 傳給 reduce tasks
  • master 儲存 R 個由 map tasks 產生的 intermediate 資料的位置和大小( completed tasks ),將這些資訊逐步地丟給有執行狀態為in-progress的 reduce tasks 的 worker machine 。

Fault Tolerance

因為 MapReduce 是設計來處理大量的資料在數百或數千的機器上,因此他必須能在錯誤發生時好好的處理。

Worker Failure

Master 會定期去向 workers 要回應,如果在一定的時間內 worker 沒有回應的話,那個 worker 就會被標示為 failed。而被這些標示為 failed 的 worker 所完成或正在進行的 map 與正在進行的 reduce task 則會被標示為 idle,重新進入排程。

  • map task 不管有沒有完成都需要被重新執行的原因是因為他的輸出結果是存在那個 failed 的機器上無法被存取。
  • 已完成的 reduce task 因為他的輸出結果是存在 global file system 上所以不需要重新執行。
Master Failure

可以定期設置 checkpoint 紀錄 Master 的狀態,當 Master 出問題時便可從上一個 checkpoint 的狀態開始執行。但論文裡提到當只有一個 Master 的時候,幾乎不太會有 Master 出問題的時候,因此他們選擇當 Master failed 的時候直接終止整個 MapReduce。


Semantics int the Presence of Failures
  • 假如使用者自訂的函式為 deterministic function ,則 map/reduce framework 輸出的結果( output )與原本單一循序執行的程式的結果是相同的 。
  • 原因是在於 map 和 reduce tasks 產生輸出檔( output files ) 的行為是 atomic commits
    • 每個 in-progress state 的 task 都會將結果寫入私自的暫存檔( private temporary file ) 。

    • map task 產生 R 個暫存檔, reduce task 產生 1 個

    • 當 map task 執行完畢時,傳訊息給 master ,其中包含 R 個暫存檔的檔名。當 master 從執行完畢的 map task 收到訊息時,不會儲存傳來的 R 個暫存檔檔名。相反的,則會紀錄檔名。

      不了解原文提到的 Otherwise, it records the names of R files in a master data structure.otherwise 是指何種情況下。nekoneko

    • reduce task 執行完畢時,會將暫存檔檔名改為最終的輸出檔名。但是為了防止同時會有多個 reduce task 更改檔名為最終輸出檔檔名,map/reduce 上的檔案系統支援 atomic rename operation

  • map 和 reduce 函式為 non-deterministic function的情況下,仍可以使整個程式符合一個 weaker semantic
  • non-deterministic function 的情況下,reduce task R1 產生的結果等價於另一個 non-deterministic 程式的結果;而 reduce task R2 產生的結果卻等價於另一個不同的 non-deterministic 程式的結果。
  • 例如 : 若有兩個 reduce task R1 , R2 會讀取 map task M 輸出的值, 最終可能讀到兩個不同執行過程的 task M 的結果。

atomic commits

determistic function , non-determistic function

  • 無論任何時候,傳入相同的 input value 必定得到相同的結果
  • 反之,non-determistic function 傳入相同的值時,可能會得到不同的結果,例如亂數產生的函式 rand()
  • 參考資料 DETERMINISTIC Functions , deterministic algorithms

semantics

  • 這裡的 semantics 指的是讀寫上的 semantic,在後面兩段他提到 non-determinstic 的 user defined map/reduce 會產生不同的結果,所以要採取和 determinstic 不同的 semantics
    ( 論文裡並沒有指定要哪種 semantics ,他只說了他們提供比較弱但仍合理的 )
    查了一下 wiki,就常見的 semantics 的強弱順序是 atomic > regular > safe

Locality

  • 網路頻寬是稀少資源因此利用將 input data 存在 local disk 和 GFS (Google File System) ,減少佔用網路頻寬。
  • Google File System 將檔案切成 64 MB 大小的儲存單位( block ) , 並將其副本存在其他 machine 當中,通常副本數量為 3
  • master 將 map task 分配到儲存相對應 data block 的 worker machine 。 若失敗的話, master 會將 map task 分配給其他擁有副本的 worker machine
    • 例如︰分配給在同樣 netwrok switch 上且擁有副本檔案的 worker machine

Task Granularity

  • 理想的情況下, M (map tasks) 和 R (reduce tasks) 的數量要遠大於 work machine 的數量
    • 改善 dynamic load balancing
    • 加速修復 worker machine failed 的時間,原本在 failed work machine 該完成的 map tasks 可以分散給其他的 work machines
  • MapReduce 比較推薦的作法是:M = 200, 000、R = 5, 000, using 2,000 worker machines。

Backup Tasks

在 MapReduce 中讓整體執行時間上升的就是「Straggler」,指的就是一台機器花了比預期時間還要長很多的時間去完成剩下的幾個 map 或 reduce task。會造成這樣的情形有很多原因,比如說較差的 disk 可能會讓讀取效率變差,如果這台機器上又剛好有其他 task 在執行,那資源搶奪的情況就會讓 MapReduce 的執行時間變得更久。

在論文中提到他們使用一個機制去處理這樣的情況,當 MapReduce 已經接近完結時,Master 將還正在執行的 task 進行備份,當不管是原本的還是備份的執行完成時,那個 task 就會被標記為已完成。論文的後面也有舉出實際的例子證明這個機制確實能有效提升 MapReduce 的執行時間。


Refinements

Partitioning Function

  • 依據 map/reduce 的機制, intermediate pair 會分配給 R 個 reduce tasks ,最終產生 R 個輸出檔( 每個 map task 產生一個輸出檔),分配 intermediate pair 的動作是靠 partition function 和傳入的 key 完成。
  • 預設的情況下,為了均衡的 intermediate pair 分佈到每個 reduce tasks , partition functionhash(key) mod R
  • 然而,某些情況下,有比預設 partition function 更合適的選擇。例如︰當 key 為 URLs ,最後希望能將擁有相同 host name 的 URLs 輸出到同一個檔案, partition function 適合用 hash(Hostname(urlkey)) mod R

Ordering Guarantees

每個 partion 裡的 intermediate pairs 會依據 key 值排序成升冪或降冪


Combiner Function

在一些情況下,map function 所產生出來的 intermediate key 具有很高的重複性,且 reduce function 具有交換律與結合律。以前面提到的計算單字出現次數的例子來說,每個 map task 可能會輸出數百數千個 <the, 1> 這樣形式的東西,這些結果會透過網路被送到一個 reduce task 裡面去做總和。可以讓使用者透過自己定義的 combine function 先做部份的結合,再將 map 的結果傳到 reduce,這樣可以有效的提升執行速度。

  • 通常 combine 的程式碼會長的跟 reduce 一樣
  • 差別在於 combiner 的輸出會放入 intermediate file,而 reduce 則是放入 final output file

Section 4 一堆關於refinement的內容怎麼只剩這個? gnitnaw


Input and Output Types

  • Map/Reduce Library 提供讀取不同格式資料的 API 和 輸出不同格式資料的 API

  • Map/Reduce Library 本身有內建幾個 reading interface , 論文裡舉例以下兩種

    • text mode : 從檔案一行一行都資料,產生 key 為該行在檔案的位移量和 value 為該行的內容的 key/value pairs
    • 或是支援讀取一串以 key 排序的 key/value pairs
  • 使用者是可以自訂 reading interface 支援讀取新的資料格式(new input format )

  • reading interface ( reader ) 並沒有限制一定是要從檔案讀取資料,也可以是其他讀取形式,例如︰從資料庫讀取 records 或是程式裡存在記憶體的某個資料結構。

  • writer 同理,如上面所述。

  • 參考資料
    GoogleCloudPlatform/appengine-mapreduce - 3.4 Readers and Writers


Performance

Cluster Configuration

  • Consisted of approximately 1800 machines:
    • 4 GB of memory
    • Dual-processor 2 GHz Xeons with Hyperthreading
    • Dual 160 GB IDE disks
    • Gigabit Ethernet per machine
    • Bisection bandwidth approximately 100 Gbps

Sort

  • 排序 10^10 個 100-byte 的 records,總大小約1TB 。
  • 從 text line 中,擷取 10 byte 作為 sorting key 。
  • 以sorting key 和原本的 text line 作為 intermediate key/value pairs 。
  • Reduce 的部份使用內建函式 Identity ,不更動 intermediate pair 直接輸出成 output pair 。
    以排序 1TB 的資料為例:

a. 一般 MapReduce 的情況,總體執行時間為 891 秒。

b. 未始用 backup task 的情況,在 960 秒之後幾乎所有 task 都已經完成了,剩下 5 個 reduce task 還沒做完,一直到 1283 秒之後才結束整個 MapReduce,比一般情況下的時間多了 44%。

c. machine failure 的情況,總體執行時間為 933 秒,因為那些被 killed 的工作很快地就被重新執行所以執行時間只比一般情況上升了 5%。

該篇論文有無不了解的地方?可以列出來,雖然我不一定能解答,不過有人看就能集思廣益。gnitnaw


理解程式碼

threadpool.[ch]

  • threadpool.h 中定義了 MAX_THREADS,又在 threadpool.c 中定義 THREADS_MAX,不太了解兩者的作用差別。
  • MAX_THREADS 只有在 threadpool.c 裡的 threadpool_create 函式使用到,是用來限制輸入的 thread_count 要介於 0 到 MAX_THREADS 之間。
  • THREADS_MAX 是用來定義 threadpool_map_t 的成員 personal_pointers 的陣列長度和 reduce_t_internal 的成員 elements 的陣列長度。
  • sem_init
    • 第一個參數為 semaphore 的位址,第二個參數為設定 semaphore 是否可讓不同 process 使用,第三個參數為semaphore初始值。
    • 若第二個參數為0表示 semaphore 只能在 process 的 thread 間共用,不能在不同 process 間使用。
    • 多個 thread 不能初始化同一個 semaphore。
    • 不能對其他 thread 正在使用的 semaphore 重新初始化。
    • sem_init() 成功完成後會回傳 0。
  • sem_post
    • 對 semaphore 值加一。
    • 若 semaphore 的值大於一時,會叫醒( wake up )另一個被 sem_wait block 住的執行緒,叫醒的執行緒會重新鎖上 semaphore。
    • sem_post() 成功完成後會回傳 0。
  • sem_wait
    • 對 semaphore 值減一。
    • sem_wait() 成功完成後回傳 0。
  • sem_destroy
    • 銷毀 semaphore。
    • sem_destroy() 成功完成後回傳 0。

還有一個 function 是 sem_trywait() 可能也要研究一下,他能讓你的 semaphore 更有彈性。Yen-Kuan Wu

一個建議:不要假設這個程式的作者一開始就知道自己在寫啥,你如果覺得他的作法莫名其妙就勇敢把它改掉。
有時候老師會故意挑有很大改進空間的code給你們玩。gnitnaw


test/*

test裡面的4個程式都有memory leak的問題,請用valgrind檢查。gnitnaw
感謝提醒,已修正,請見該次 commit
另外對於我有找到 stackoverflow上的 memory leak 的討論,除了mapreduce.c沒有正確釋放需修正外,其他的都屬 reachablefzzz


thrdtest.c

看起來是普通檢視 thread_pool 運作正不正常,task 中只做簡單的 done++ 紀錄做了幾次。

$ ./thrdtest
Pool started with 32 threads and queue size of 256
Added 264 tasks

Did 185 tasks

如果將

while((tasks / 2) > done) { usleep(10000); }

拿掉的話就會發現每次 did 的數量都只有 32,也就是 thread 的數量,這裡可以發現 threadpool_destroy 不會等 queue 的完成。

thrdtest.c 的第33行,有需要使用到lock? 變數task都只有在main函式呼叫

while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) { pthread_mutex_lock(&lock); tasks++; pthread_mutex_unlock(&lock); }

nekoneko

是的,拿掉後用 ThreadSanitizer 檢查沒有出現 data race 的訊息!
賴劭芊

你source code裏面沒更新喔。
還有本程式裡唯一一個被thread更改的變數為done,此變數為計數器性質,
為何不考慮用原子操作(atomic operation)取代mutex exclusion?
原子操作快很多,不過函式中usleep那麼久你大概也看不出來有快多少。gnitnaw


shutdown.c

檢測下列兩者是否正常運作。

  1. immediate shutdownthreadpool_destroy(pool, 0)
  2. graceful shutdownthreadpool_destroy(pool, threadpool_graceful)

heavy.c

開了64個 thread pool 每個 pool 的 queue 的 size 是8192,應該只是在測試 thread 的極限。


mapreduce.c

計算小於 DATASIZE 所有質數的總和。







mapreduce



mapreduce

mapreduce



threadpool_map

threadpool_map



mapreduce->threadpool_map





threadpool_add1

threadpool_add1



threadpool_map->threadpool_add1





threadpool_add2

threadpool_add2



threadpool_map->threadpool_add2





threadpool_add3

threadpool_add3



threadpool_map->threadpool_add3





threadpool_add4

threadpool_add4



threadpool_map->threadpool_add4





threadpool_thread1

threadpool_thread1



threadpool_add1->threadpool_thread1





threadpool_thread2

threadpool_thread2



threadpool_add2->threadpool_thread2





threadpool_thread3

threadpool_thread3



threadpool_add3->threadpool_thread3





threadpool_thread4

threadpool_thread4



threadpool_add4->threadpool_thread4





threadpool_map_thread1

threadpool_map_thread1



threadpool_thread1->threadpool_map_thread1





threadpool_map_thread2

threadpool_map_thread2



threadpool_thread2->threadpool_map_thread2





threadpool_map_thread3

threadpool_map_thread3



threadpool_thread3->threadpool_map_thread3





threadpool_map_thread4

threadpool_map_thread4



threadpool_thread4->threadpool_map_thread4





  • mapreduce.c (is_simple)
    紀錄所有小於 DATASIZE 的質數於 data 裡

    ​void is_simple(int n, void *_data) ​{ ​ int *data = (int *) _data; ​ int x = data[n]; ​ data[n] = 0; ​ if (x < 2) return; ​  for (int i = 2; i < x; i++) ​ if (x % i == 0) return; ​ data[n] = x; ​}
  • mapreduce.c (my_reduce)
    將 left 與 right 的值相加後存到 left 裡

    ​void my_reduce(void *self, void *left, void *right) ​{ ​​​​ *((int *) left) += *((int *) right); ​}
  • threadpool.c (threadpool_map_thread(void *arg))

    int id = *(int *) arg; ​threadpool_map_t *map = (threadpool_map_t *) ((int *) arg - id);

    參照 mapreduce.c 中的 threadpool_map_t

    typedef struct { ​ int personal_pointers[THREADS_MAX]; ​ void(*routine)(int n, void *); ​ void *arg; ​ int size; ​ int thread_count; ​ sem_t done_indicator; ​} threadpool_map_t;
    • arg 指向 threadpool_map_t.personal_pointers[i]
    • ((int *) arg - id) 表示 threadpool_map_t.personal_pointers[i - id] 的位置,也就是 threadpool_map_t.personal_pointers[0] 的位置 ( struct 的起始位置)。
  • threadpool.c (threadpool_reduce)


Code Refactoring

處理 queue full 的情況

論文的 Fault Tolerance 中曾提及失敗的 map task 仍需被重新放進 queue 裡等待重新執行,而在原本的程式碼中卻直接 return error,經過測試將 QUEUE 設到 5 以下程式就會死掉,因此在這裡判斷 _err 是否為 threadpool_queue_full 的情況,是的話就讓這個 task 等到 queue 有位置時重新 add。

將 queue 的大小設成 256 與 2 時的時間比較,可以發現 queue size 比較小並沒有讓時間變的比較長。

底下是 gprof 的結果,可看出變化不大
queue size = 256:

Each sample counts as 0.01 seconds.
 %   cumulative   self              self     total           
time   seconds   seconds    calls  ns/call  ns/call  name    
95.18      0.38     0.38  1455810   261.52   261.52  is_simple
 5.01      0.40     0.02                             threadpool_reduce_thread
 0.00      0.40     0.00   186523     0.00     0.00  my_reduce
 0.00      0.40     0.00        8     0.00     0.00  my_free
 0.00      0.40     0.00        6     0.00     0.00  my_alloc_neutral
 0.00      0.40     0.00        1     0.00     0.00  my_finish

queue size = 2:

Each sample counts as 0.01 seconds.
 %   cumulative   self              self     total           
time   seconds   seconds    calls  ns/call  ns/call  name    
91.08      0.40     0.40  1328197   301.74   301.74  is_simple
 2.28      0.41     0.01   273410    36.65    36.65  my_reduce
 2.28      0.42     0.01                             main
 2.28      0.43     0.01                             threadpool_map_thread
 2.28      0.44     0.01                             threadpool_reduce_thread
 0.00      0.44     0.00        8     0.00     0.00  my_free
 0.00      0.44     0.00        7     0.00     0.00  my_alloc_neutral
 0.00      0.44     0.00        1     0.00     0.00  my_finish

精簡程式碼

threadpool.cthreadpool_create 函式中,判斷傳入的 thread_countqueue_size 的 if 敘述可以再精簡

  • 109 行不需要 goto err ,因為還未呼叫到 pool = (threadpool_t *)malloc(sizeof(threadpool_t) ,不用處理 pool 記憶體釋放。
  • 原本
if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } if(thread_count <= 0 || thread_count > THREADS_MAX) { goto err; }
  • 修改後
if(thread_count <= 0 || thread_count > MAX_THREADS \ || thread_count > THREADS_MAX \ || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; }

修正 mapreduce 的行為

對整個程式稍微做了一點調整,在閱讀原本程式碼的時候,發現到這裡真的很奇怪

int err = threadpool_map(pool, info.thread_count, threadpool_reduce_thread, &info, 0); if (err) return err; for (int i = 1; i < info.thread_count; i++) { info.reduce_data->reduce(info.reduce_data->additional, info.elements[0], info.elements[i]); info.reduce_data->reduce_free(info.reduce_data->additional, info.elements[i]); } info.reduce_data->reduce_finish(info.reduce_data->additional, info.elements[0]); info.reduce_data->reduce_free(info.reduce_data->additional, info.elements[0]); threadpool_merge_reduce(&info); return 0; }

出現了兩次的 reduce ,而且兩個 reduce 都是同一種行為
雖然這不影響最後結果及程式撰寫,但對於使用這份程式的人來說,整體的彈性會因為卡在 true reduce 完後的 merge 仍然和原本的 reduce 共用 function 而導致難已撰寫。所以將 for 迴圈裡進行 merge 動作的部分獨立出來,並將原本沒有特別用處的 reduce_finish 重新調用給他。(merge 和 reduce 兩者相似,但不是相等,應該獨立出來處理。)

static void threadpool_merge_reduce(reduce_t_internal *info) { // merge all reduce result into global one(in master) for (int i = 0; i < info->thread_count; i++) { info->reduce_data->reduce_finish(info->reduce_data->additional, info->reduce_data->result , info->elements[i]); info->reduce_data->reduce_free(info->reduce_data->additional, info->elements[i]); } }

在新的 static function 裡面,不再使用 reduce ,而是改為使用 reduce_finish

本次修正請見 commit

另外又發現到,在 test/mapreduce.c 中,如果 threadpool_reduce_t 裡的 function 給 NULL 的話,會造成 core dump ,所以另外增設一個階段為 threadpool_mapreduce_setup,用來設置 mapreduce 會用到的 user-defined function 或 specification,並在裡面做好 error handling,讓使用者可以在外部呼叫的時候知道自己漏了哪邊。
另外增加 mapreduce_err_t 的 enumeration

typedef enum{
    reduce_reduce_not_set = -1,
    reduce_alloc_not_set = -2,
    reduce_free_not_set = -3,
    reduce_finish_not_set = -4
} mapreduce_err_t;

這部分如果以後有新增 map 階段的 error handling 的話需要重新新增

原本程式執行順序為







g



create

create



map

map



create->map





reduce

reduce



map->reduce





end

end



reduce->end





改為







g



create

create



setup

setup



create->setup





map

map



setup->map





reduce

reduce



map->reduce





merge

merge



reduce->merge





end

end



merge->end





最後修正命名問題,並額外在 threadpool_reduce_t 裡新增 void *result ,讓原本結果都是輸出在 elements[0],改為使用者自定變數去獲取結果值。
修正後的 struct threadpool_reduce_t

typedef struct {
    void (*reduce)(void *addional, void *result, void *data);
    void (*reduce_finish)(void *additional, void *result , void *local);
    void *(*reduce_alloc_neutral)(void *additional);
    void (*reduce_free)(void *additional, void *node);
    
    int object_size;
    void *begin;
    void *end;
    void *result;
    void *additional;
} threadpool_reduce_t;

主要更動為更改易混淆的命名,像是原本 reduce function 中的 left , right , self,這幾個因為操作上的不同,而重新命名成其他更容易懂得變數名稱。

修正後的版本的 example commit,移除 mapreduce.c 以避免命名上的誤會 (未來仍可能將 mapreduce 額外拉出做成一份檔,所以不該在 test 裡面使用容易混淆的命名)


改進質數演算法

想要先將 CPU user time 降下來,因此先對質數演算法 mapreduce.c(is_simple) 做改進,原本的 for loop 的時間複雜度是 O(n),因為一直呼叫它,所以先來改進它的效能。

is_simple for loop 部份改為

if (x > 2 && x % 2 == 0) return; for (int i = 3; i * i <= x; i += 2) if (x % i == 0) return;

減少 for loop iteration 的次數。
參考老師的建議,將判斷式使用 bitwise operator 簡化,如下

if ((x & ~3) && !(x & 1)) return;

如何將判斷式更加簡化和 for loop 的簡化還在想鄭皓澤

(x & ~3) && !(x & 1) 仍可化簡為 bitwise 操作 jserv

接著可以發現

$ make test
map : 0.000554
reduce : 0.000251

$ time ./tests/mapreduce
./tests/mapreduce  0.00s user 0.00s system 37% cpu 0.011 total

map 的時間下降了,執行時間也下降了。


Thread Pool 改進目標

The API contains addtional unused 'flags' parameters that would allow some additional options:

  • Lazy creation of threads (easy)
  • Reduce number of threads automatically (hard)
  • Unlimited queue size (medium)
  • Kill worker threads on destroy (hard, dgerous)
  • Support Windows API (medium)
  • Reduce locking contention (medium/hard)

對照 mutrace 和 ThreadSanitizer 的輸出,先找到 lock contention 和潛在的執行緒操作議題 jserv


程式分析

計算時間

  • 利用 clock_gettime 去計算 map 跟 reduce 的時間

    ​map : 0.044008 sec
    ​reduce : 0.000240 sec
    

Yastopwatch 很好用,只是原本的程式碼不是用 clock_gettime,需要改寫再來整合 jserv

精度應該夠? get_usecgettimeofday, 還有一個 get_tsc 是直接用 rdtscDEFINE_TSC_CLOCK 應該很夠用了 louielu

  • 利用 time 指令觀察程式執行時間

    ​$ time ./tests/mapreduce
    ​./tests/mapreduce  0.18s user 0.00s system 307% cpu 0.058 total
    

    因為使用 thread 的關係,可以發現 CPU 使用率高於 100%,並且 CPU user time 大於 system time,詳細可以參考 Homework1 (compute-pi)

使用 Yastopwatch 計算時間

看到老師的建議後,研究了如何使用 Yastopwatch ,並將其實做方式從 gettimeoftoday() 改為 clock_gettime()


mutrace

先使用上一個作業學到的工具 mutrace 來觀察程式。

mutrace: Showing statistics for process mapreduce (pid 11976).
mutrace: 1 mutexes used.

Mutex #0 (0x0x1723300) first referenced by:
	/usr/lib/mutrace/libmutrace.so(pthread_mutex_init+0xf2) [0x7fedd58f34b2]
	./tests/mapreduce(threadpool_create+0x10a) [0x402055]
	./tests/mapreduce(main+0x3b) [0x4017f8]
	/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf0) [0x7fedd532a830]

mutrace: Showing 1 most contended mutexes:

 Mutex #   Locked  Changed    Cont. tot.Time[ms] avg.Time[ms] max.Time[ms]  Flags
       0       56       42        2        0.138        0.002        0.073 M-.--.
                                                                           ||||||
                                                                           /|||||
          Object:                                     M = Mutex, W = RWLock /||||
           State:                                 x = dead, ! = inconsistent /|||
             Use:                                 R = used in realtime thread /||
      Mutex Type:                 r = RECURSIVE, e = ERRRORCHECK, a = ADAPTIVE /|
  Mutex Protocol:                                      i = INHERIT, p = PROTECT /
     RWLock Kind: r = PREFER_READER, w = PREFER_WRITER, W = PREFER_WRITER_NONREC 

mutrace: Note that the flags column R is only valid in --track-rt mode!

mutrace: Total runtime is 6.375 ms.

mutrace: Results for SMP with 4 processors.

ThreadSanitizer

用來察看 data race 的工具

先安裝 llvm 還有 clang

$ sudo apt-get install llvm
$ sudo apt-get install clang

Makefile 中使用 clang 編譯,並加上 -fsanitize=thread -g -O1,結果顯示4個測試檔只有 thrdtest 有 data race 的問題。

$ make test 

Execute tests/thrdtest...
Pool started with 32 threads and queue size of 256
Added 288 tasks
==================
WARNING: ThreadSanitizer: data race (pid=15153)
  Write of size 4 at 0x0000014ade3c by thread T1 (mutexes: write M0):
    #0 dummy_task /home/chien/mapreduce/tests/thrdtest.c:17 (thrdtest+0x0000004a2059)
    #1 threadpool_thread threadpool.c (thrdtest+0x0000004a2da2)

  Previous read of size 4 at 0x0000014ade3c by main thread:
    #0 main /home/chien/mapreduce/tests/thrdtest.c:39 (thrdtest+0x0000004a21b6)

  As if synchronized via sleep:
    #0 usleep <null> (thrdtest+0x000000433852)
    #1 dummy_task /home/chien/mapreduce/tests/thrdtest.c:15 (thrdtest+0x0000004a2037)
    #2 threadpool_thread threadpool.c (thrdtest+0x0000004a2da2)

  Location is global '<null>' at 0x000000000000 (thrdtest+0x0000014ade3c)

  Mutex M0 (0x0000014ade40) created at:
    #0 pthread_mutex_init <null> (thrdtest+0x00000043db35)
    #1 main /home/chien/mapreduce/tests/thrdtest.c:25 (thrdtest+0x0000004a2092)

  Thread T1 (tid=15155, running) created by main thread at:
    #0 pthread_create <null> (thrdtest+0x000000422336)
    #1 threadpool_create <null> (thrdtest+0x0000004a23dc)
    #2 __libc_start_main /build/glibc-GKVZIf/glibc-2.23/csu/../csu/libc-start.c:291 (libc.so.6+0x00000002082f)

SUMMARY: ThreadSanitizer: data race /home/chien/mapreduce/tests/thrdtest.c:17 in dummy_task

根據 ThreadSanitizer 的訊息,出現 race condition 的應該是 thrdtest.c 裡 data 這個變數,嘗試做了以下修改:

原本:

while((tasks / 2) > done) { usleep(10000); }

修改後:
將取得 done 值的部分放在 crtical section 裡

pthread_mutex_lock(&lock); tmp_done = done; pthread_mutex_unlock(&lock); while ((tasks / 2) > tmp_done) { usleep(10000); pthread_mutex_lock(&lock); tmp_done = done; pthread_mutex_unlock(&lock); }

實作 lock free

參考 LFTPool


流程

  • main thread
    呼叫 tpool_init 後,做完一連串的設定後,會呼叫 spawn_new_thread,在 spawn_new_thread 會做 pthread_create 的動作,接下來進入 wait_for_thread_registration 等待所有 thread create 完成。
Created with Raphaël 2.2.0tpool_initspawn_new_threadwait_for_thread_registrationglobal_num_thread < num_expectedreturnyesno
  • 其他 thread
    進入 tpool_thread 後,其他 thread 首先會對 global_num_thread 加 1,以讓 main thread 可以知道是否所有的 thread 都已經 create 完。
    接著會對 signal set 做操作,讓當下的 thread 等待呼叫,叫醒後如果不是 shutdown 的話會去 get_work_concurrently 拿 work,否則就 pthread_exit
    而 work 的 add 與 get 則是採用 ring-buffer 機制。

    __sync_* 、signal 與 ring-buffer 的操作後面會解釋鄭皓澤

Created with Raphaël 2.2.0tpool_threadwait signalshutdownpthread_exitget_work_concurrentlywork doneyesnoyesno

tpool_add_work

這裡 add work 的機制為,首先先根據任務排程演算法去取得相對應的 thread,這裡實作的方法有 Round-RobinLeast-Load 演算法,Round-Robin 即輪詢式地分配工作,Least-Load 即選擇目前具有最少工作的 worker thread 放入。
並在 dispatch_work2thread 中加入 work,並且判斷如果只有這個新加的 work 就直接叫醒 thread。


signal

signal 的使用方法可見 man page,首先在 tpool_init 中初始化 SIGUSR1 的處理函式為 sig_do_nothing,並在 tpool_thread 中以以下方法達到 conditional variable 的作用。

sigemptyset(&zeromask); sigemptyset(&newmask); sigaddset(&newmask, SIGXX); sigprocmask(SIG_BLOCK, &newmask, &oldmask) ; while (!CONDITION) sigsuspend(&zeromask); sigprocmask(SIG_SETMASK, &oldmask, NULL);

sigsuspend 會讓呼叫的 thread 睡著,參考資料鄭皓澤

跟 process 一樣,每個 thread 都有一個 signal mask,用來指定那些非同步 signal 會被 thread 處理,稱為 unblocked signals,這裡就是用這個原理來運作的,而送出 signal 的函式為 pthread_kill(pthread_t thread, int sig)

pthread_kill 的作用並不是 kill thread


__sync_*

gcc 從 4.1.2 開始提供了 __sync_* 系列的 built-in 函式,程式碼中有多處使用到,例如 __sync_val_compare_and_swap__sync_bool_compare_and_swap__sync_fetch_and_add 等等。
此類 api 稱為 Built-in functions for atomic memory access,atomic operation 是一種不會被中斷的操作,透過他來對共享變數做修改,可以減少 mutex 的使用,達到 lock-free 的目標,進而提升效率。

這句話不精確,不要搞錯 atomic operation 和 mutex 的行為!請重新描述 jserv
抱歉,語意表達上不好,參考 Is there any difference between “mutex” and “atomic operation”? 後修正了。鄭皓澤

詳細使用方法請見參考資料

你前面說要用clang結果這邊用gcc?雖然clang可能多少support,不過現在有現成的C11 atomic operation可以用阿。gnitnaw
這邊是研究LFTPool的過程,研究過程我也有注意到 C11 已經有 atomic operation 可以用,計畫之後用 C11 改寫 LFTPool。鄭皓澤


ring-buffer

環狀佇列長度為 2 的整數次方,out 和 in 下標一直遞增至越界後迴轉,其類型為 unsigned int,即 out 指標一直追趕 in 指標,out 和 in 映射至 FiFo 的對應下標處,其間的元素即為隊列元素。

LFTPool 中還有實作 tpool_inc_threadstpool_dec_threadsbalance_thread_load 可以達到 lazy initional 的目標,待之後深入研究。鄭皓澤


加入 lock free

先初步的將 LFTPool 結合原本的 threadpool.[ch] 後,結果不如預期。遇到以下幾點問題

LFTPool 是個效率較差的 lock-free thread pool,裡頭用到 UNIX signal,後者是很大的效能開銷,應該把 LFTPool 看作「lock-free 的嘗試」,降低 lock contention 但引入更高成本的系統呼叫。關鍵是避開 contention,這部份要善用 atomics jserv

  1. tests/heavy.c 跑不動,程式跑到這裡我的電腦會直接當機,目前還在尋找問題。

    嘗試後發現好像是我的電腦記憶體不足,試過幾個數字後發現我的電腦只能跑 QUEUES = 4,建議先將 threadpool.c 中的 WORK_QUEUE_POWER 設小一點鄭皓澤
    heavy.c 的話我的是 Segmentation fault (core dumped) fzzzz
    這裡研究後還是覺得 SIZE 跟 QUEUE 的質不能定義的太大,會 core dumped 的原因應該是在於 SIZE 太大,導致工作無法消耗,這部分還需要去改善 lock free 版本中對於 queue full 等等情況的 error handle鄭皓澤

  2. tests/mapreduce.c 的運行結果不如預期,在 #define DATASIZE (2000000) 的情況下,執行時間如下

    ​[map] Total time: 0.230074
    ​[is_simple] Total time: 0.776775
    ​[reduce] Total time: 0.012306
    

    原始的版本執行時間如下

    ​[map] Total time: 0.210654
    ​[is_simple] Total time: 0.708091
    ​[reduce] Total time: 0.009638
    

    可見不但沒有精進反而還有變慢的跡象。
    各執行 100 次的比較結果如下圖。

    但使用 mutrace 觀察下,mapreduce.c 中的 mutex 確實消失了。

比較

  • 不同 thread 數量

is_simple會拖比較長多半是因為當is_simple要判斷比較大的n是否為質數時,它得try過每個單數(3,5,7)。
有個方法可以嘗試(不確定有用,可能在DATASIZE小的時候反而更糟):當你用is_simple判斷出某數是質數時,順手把此質數的倍數都mark成0。然後is_simple遇到已經為0的數就直接return。不過不保證會比較好,要是我直接在threadpool_map前把很簡單就能確認的mark掉。
不過話說回來,你們使用mapReduce找質數的動機是啥?應該是讓一堆Core可以一起計算分擔工作讓計算時間縮短,可以比用一個Core算快很多。
可是從你們的code看起來,分配工作(map)耗的成本就比工作本身耗的成本(is_simple)多了。
換句話說,你拜託別人工作,結果你得花更多時間把工作分配給他,那你不如自己做算了,還省時省力。
所以你們該做的,是先讓分配工作的成本<完成被分配工作的成本。
提示:每個queue不一定只能處理一個數,你也不是只能用一個map/reduce(分配工作這種事本來就可以分批做),你處理一個數可以順便把相關的數處理掉,後面就不用重複同樣的動作了。gnitnaw
會用 mapreduce 來找質數的原因只是僅僅因為原本的 example 是用他來做實驗,這部份的 code 與想法的確問題還有很多,我們似乎執著於找質數這部份的效能太久了,的確是應該先從根本的 map 與 reduce 改善著手,撰寫 sort 的部份時也發現我們原本的 mapreduce 問題很多,分配工作的部份的確是很耗時,會儘快修正,謝謝提示。鄭皓澤

那個is_simple找質數的函式本身就是很笨的方法(老師應該是故意的),google一下應該可以發現快很多的。
還有你對於is_simple的計時有點問題,有些queue沒有計入,請修正。
對於不同DATASIZE的計算時間記得也做一下(請順便解決DATASIZE太大時overflow的問題)。
gnitnaw


C11 atomic operation

使用 C11 內建的 atomic operation 取代 LETPool 中使用的 GCC 提供的 __sync_* built-in function。
使用方法可見 cppreference.com

可使用 C11 提供的 atomic operatihttps 去解決 tests/*.c 中的 mutex。鄭皓澤


Mergesort with MapReduce

TeraSort

Terasort 是 yahoo 在 hadoop 上實作的一個排序,在 2008 年 Hadoop 1TB 排序基準評估中獲得第1名。

為了提高 reduce 的並行度,在 map 做了 sample 和 partion

  • Sample:每個 map task 都先從 input 中取一些樣本出來排序
  • Partition:利用自訂的 partitioner 從 sample 取出一些 key (分割點,會將輸入分割成 R 個部份 = Reduce task 的個數) , 所有 sample[i − 1] <= key < sample[i] 會被送到 reduce i 去做排序,這確保了 reduce i 的值都會小於 reduce i+1。
    • partioner 會用 sampled key 建立 trie tree 的資料結構來加速 key 的搜尋

論文參考方向

  1. Yahoo Terasort
    • 網友整理 yahoo terasort 的論文,其中還參考了 yahoo developer hadoop tutorial 的介紹
    • TeraSort SlideShare page 11
  2. " two level trie "
    • 了解 trie tree 資料結構
    • trie
  3. Yahoo Hadoop MapReduce
  4. Other Reference

Sort

github repo

說明

  • 使用自訂的 map , reduce , reduce_alloc_neutral 來實做 sort

  • 目前使用原有 有 lock 的版本 實做

  • 參考

  • 觀察 threadpool.[ch]

    • input data 由 threadpool_map 傳入,input data 必須為陣列。傳入的 input_data 可以藉由使用者自訂的 map 和 reduce 被存取
    • 使用者自訂的 reduce 在 threadpool_reduce 被使用的情境有兩個
      • 每個 threadpool_reduce_thread 分割的資料與 elements[n] 做 reduce
      • threadpool_reduce 將 element[0] 跟所有其他 elements 做 reduce
    • 最後可以藉由自訂的函式 .reduce_finish 和 變數 .self 將 elements[0] 的結果取出
  • 由程式碼的觀察

    • 將 reduce 寫成 merge 兩個 sorted 的 link list
    • 傳入的 input_data 為 llist_t * 的陣列,每個元素都是有一個 node 的 link list object
    • elements[n] 的 type 為 llist_t ** , 是為了保持 reduce(void *self, void *left, void *right) 中傳入 left 和 right 的 type 一致

先不說結果,請問使用linked list或array對於效能上會有啥差異?不理解為啥你們要用linked list。gnitnaw

結果

  • 測試環境
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                4
On-line CPU(s) list:   0-3
Thread(s) per core:    2
Core(s) per socket:    2
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 37
Model name:            Intel(R) Core(TM) i3 CPU       M 330  @ 2.13GHz
Stepping:              2
CPU MHz:               1066.000
CPU max MHz:           2133.0000
CPU min MHz:           933.0000
BogoMIPS:              4255.69
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              3072K
NUMA node0 CPU(s):     0-3

8 條 threads 和 256 長的 task queue

  • 10000 筆資料
[map] Total time: 0.000191
[reduce] Total time: 0.043792
  • 100000 筆資料
[map] Total time: 0.000797
[reduce] Total time: 4.488939
  • threads 1 2 4 1024 , queue size 256 , data size 100 200 300 10000
    • total map_time ( threadpool_map )
    • total reduce time ( threadpool_reduce )
      total_reduce_time
    • total reduce time, 16, 32, 64, 128 threads

這部份只有先做圖出來,目前還沒想到如何藉由其他效能測試工具,解釋影響執行時間的關鍵效能
nekoneko

這邊應該要加上一個 base_line 的 single thread program 做比較,看看使用 map/reduce framework 實做是否真的有加快。
nekoneko

  • lock free 版本的 threadpool.[ch]
    • total map time
      lock_free_map
    • total reduce time

reduce time的y軸可以改用log scale,方便看。還有請附上實驗環境(測試用電腦的資料)
void my_map 是空的?那你map time計時有啥意義?gnitnaw

以補上測試環境,謝謝提醒!nekoneko
關於 map time 計時的部份,的確原本是有思考圖表的意義。目前我認為測試 empty 的 my_map 函式的圖表,可以作為思考 threadpool_map 本身的 overhead (因為 my_map 為空的)。否則,如 gnitnaw 大所說的一樣,其實是沒有意義的。nekoneko


參考資料

tags: HaoTse shelly4132 HahaSula nekoneko Fzzzz abba123 team7 sysprog21 mapreduce LFTPool TeraSort