首先是輸入的部分,因為兩個節點在交換資料並 merge 的時候,在長度一樣的情況下會比較好實作,因此首先將每個節點的資料都設定為
此外,由於資料量過小時,將其分散到不同 process 反而會降低其效能,因此我將每個 process 的最小 batch size 設定為 65536,如果低於 65536 就不再使用其他 process。
接著建立三個足夠大的 float 陣列:a, b, recv_data
,並建立兩個 float 指標:data[0] = a, data[1] = b
,將 data[0]
作為目前使用的資料。
最後呼叫 MPI 的 read api 將對應到的資料讀到 data[0][1] ~ data[0][chunksize]
,並將 data[0][0], data[0][chunksize + 1]
設定為 infinity。
輸入的部分有兩種情況:
MPI_File_iread_at_all
,利用其非同步的特性,使得 process 不需要等待其他 process,可以直接進行計算。MPI_File_read_at
讀檔process 內的排序我使用了 boost::sort::spreadsort::spreadsort
這個基於 Radix sort 的排序,在大資料下有較好的表現
首先定義一個變數 tick = rank % 2
,代表進行資料交換時要做的動作。在最一開始的時候,每個 process 的 tick
會呈現 0, 1, 0, 1, 0, 1, ...
的模式。
接著每一輪都會進行兩次的資料交換,tick = 0
的 process 將作為左節點,tick = 1
的 process 作為右節點進行資料交換,因此交換的模式就會呈現:0(0) <=> 1(1), 2(0) <=> 3(1), ...
。
經過第一次資料交換後,將每個 process 的 tick 與 1 做 xor,因此每個 process 的 tick
就會呈現 1, 0, 1, 0, 1, 0, ...
,交換的模式就會變成:O(1), 1(0) <=> 2(1), 3(0) <=> 4(1), ...
首先左節點將其最大的資料 cnt
設為 1。
接著是 merge 的實作,由於我已經將資料的兩端都設定成 infinity,因此在做 merge 的過程中,不需要判定兩邊是否已經達到邊界,以減少分支判斷,右節點的 merge 方式如下如下,而左節點則是從 index = 1 開始判斷:
i = j = now = batchsize;
while (now) {
if (data[0][i] > recv_arr[j]) {
data[1][now--] = data[0][i--];
}
else {
data[1][now--] = recv_arr[j--];
}
}
將 data[0], recv_arr
兩個陣列的資料 merge 到 data[1] 過後,因為 data 本身是一個 float 的 pointer array,指向 a, b 兩個陣列,因此只需要呼叫 std::swap(data[0], data[1]
就能夠將 merge 過後的資料再度以 data[0]
表示。透過與動態規劃題目中滾動陣列的方式相似,省去了 memcpy 的步驟。
最後,我呼叫了 AllGather,將每個 process 的 cnt 做 OR 操作,只要有一對 process 有進行資料交換,就會繼續新的一輪資料交換,直到每個 process 都不需要再交換為止。
最後呼叫 MPI 的 file read 相關函式(File_open, File_write_at, File_close
)將資料存入指定檔案中,便完成了 Odd-Even Sort 的平行版本
以下的實驗,我以 06.txt
這個有 65536 個 float 的測資作為小測資代表,並以 35.txt
這個有 536869888 個 float 的測資作為大測資代表,並預設以 1 個節點、12 個 process 作為基準。並在程式碼中透過 MPI_Wtime
來算出 IO time, MPI time, CPU time.
在一開始,我仿照最原始的 odd-even sort,將所有 process 視為是一個很長但被切成幾段的陣列,每次就只跟相鄰的單一元素進行比較交換,在小測資上花了 1.72s 的時間,在大測資上則是直接 TLE。透過輸出結果可以發現,CPU time 佔據了接近 80% 的時間,這是因為在 process 內,直接與相鄰元素交換等同於每一次的跨 process 資料交換都需要花最多
因此,我修改了交換資料的策略,從單一元素改為整個 chunk (process) 的所有資料。起初我使用的策略是,在進行兩堆資料雙指針 merge 的時候,將一整段資料傳給對方,並各自進行 merge, copy,將資料正確的擺放到新的陣列裡,如下所示:
int change = 0;
bool pick = rank % 2;
do {
int cnt = 0;
change = 0;
float recv_val;
for (int t = 0; t < 2; t++) {
if (pick == 1 && rank && rank < size) {
mpi_time -= MPI_Wtime();
MPI_Send(data, batchsize, MPI_FLOAT, rank - 1, 0, MPI_COMM_WORLD);
MPI_Recv(recv_arr, batchsize, MPI_FLOAT, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
mpi_time += MPI_Wtime();
if (data[0] < recv_arr[batchsize - 1]) {
cnt++;
std::merge(data, data + batchsize, recv_arr, recv_arr + batchsize, tmp);
std::copy(tmp + batchsize, tmp + 2 * batchsize, data);
}
}
if (pick == 0 && rank < size - 1) {
mpi_time -= MPI_Wtime();
MPI_Recv(recv_arr, batchsize, MPI_FLOAT, rank + 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Send(data, batchsize, MPI_FLOAT, rank + 1, 0, MPI_COMM_WORLD);
mpi_time += MPI_Wtime();
if (data[batchsize - 1] > recv_arr[0]) {
cnt++;
std::merge(data, data + batchsize, recv_arr, recv_arr + batchsize, tmp);
std::copy(tmp, tmp + batchsize, data);
}
}
pick = !pick;
}
mpi_time -= MPI_Wtime();
MPI_Allreduce(&cnt, &change, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
mpi_time += MPI_Wtime();
} while (change);
在此策略下,06.txt
的執行時間降低到了 0.39s,而大測資則是花了 24s 完成。可以發現,CPU time 已經降低到了 34.1%,反而是 IO 佔了 43.6%,成為這個算法的瓶頸。
因此,我將輸入改成是非同步輸入,並在等待輸入完成的過程中先行將需要填充為 INF 的部分填充好,最後在大測資上得到了 16.59s 的執行時間。此時的 CPU, MPI 再次成為瓶頸:
MPI_Request send_request, recv_request;
MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "cb_buffer_size", "16777216"); // 16MB
MPI_Info_set(info, "romio_cb_read", "enable"); // 啟用集體緩衝讀取
MPI_File_open(MPI_COMM_WORLD, input_filename, MPI_MODE_RDONLY, info, &input_file);
MPI_File_set_info(input_file, info);
MPI_File_iread_at_all(input_file, start * sizeof(float), data, len, MPI_FLOAT, &recv_request);
if (len < batchsize) {
std::fill(data + len, data + batchsize, std::numeric_limits<float>::infinity());
}
MPI_Wait(&recv_request, MPI_STATUS_IGNORE);
MPI_File_close(&input_file);
最後,我將 Send, Recv
改成非同步的 Isend, IRecv
,並使用滾動陣列以及填充資料的方式來優化 copy 以及 merge 的時間,也就是最前面提到的實作方法,並加入了更多編譯器優化如 -march=native -mtune=native -funroll-loops -fopenmp -ftree-vectorize
。這樣的方法下,大測資只花了 9.19s 就完成了排序,每個階段所花費的時間幾乎減半。
實驗全部都是在課程的 apollo 上跑的,並使用了 MPI_Walltime
這個 function 來取得每個時間點的單位時間。首先將程式切成三段:Input, Computing, Output,但因為 Computing 的過程包含了 communication & sorting,因此需要額外的將有用到 MPI 時間算出來,如此一來便可以得到 IO, Computing, Communication 的時間了。
不同 Process 數量下,IO / MPI / Computing 三種操作的時間占比
不同 Process 數量下的 Speed up
將 IO 拿掉後的 Speed up
df -h
看了一下硬碟的情況,發現 /share
是使用 beegfs,推測有可能是因為檔案系統或是掛載的硬碟不一樣的緣故,才會導致有那麼大的差異這次作業中,我發現在平行程式的世界裡面,即使時間複雜度是一樣的,結果也有可能差數十倍,甚至數百倍,這個與平常寫 sequential code 有很大的差異,除了需要壓常之外,對於各種工具所佔用的時間、方法都需要十分了解,否則就有可能將好的工具用在錯誤的地方,最後反而使效能下降。
除此之外,我有讀到一篇文章寫到使用 SIMD 來實作 merge,不過可惜因為時間分配的緣故,沒有辦法將這個功能實作出來,否則 Computing 的效能應該會再快上不少,希望之後有空的時候可以把這部分理解清楚。同時我也很好奇最前面的同學是用什麼方法加速的,因為某些測資看起來就一定會花上一定的時間做初始化等處理,但他竟然能夠壓到那麼低,除了使用 SIMD 實作 merge 之外,暫時想不太到還有什麼方法能夠加速數十 % 了。