解釋 schedsort 的運作原理
程式碼首先讀取輸入,檢查輸入資料數量,並呼叫 schedsort
進行排序,並且動態分配 max_prio
的空間如下:
接著透過迴圈尋找輸入資料的最大和最小值,並定義 val_range = val_max - val_min + 1;
,作為等等 fill_ctx_t
結構的成員值。
整個程式碼並未呼叫排序演算法的 API ,而是透過多個 thread 並行寫入資料至優先級 bucket 中,再將多個 buckets 依順序合併。接下來分配 bucket 的空間,每個 bucket 就像是一個優先權佇列,並且總共有 max_prio
個 bucket,三種陣列的用意如下:
buckets
:優先級佇列,每個 bucket 就是一種優先級bucket_sizes
:定義每個優先級裡面的資料數量,用 _Atomic
關鍵字宣告可以確保資料安全的被寫入 bucket 而不會有 race conditionbucket_caps
:定義每個 bucket 的最大容量,用來分配每個 bucket 的空間為什麼是 size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));
為什麼是 bucket_caps[i] = (count / max_prio) * 2 + 8;
?
排序數量超過 1024 之後,發現有時候會寫排序錯誤,而且是在第一個 index 出錯,但有時候又會正確排序並輸出結果
接下來依照 N_WORKERS
數量去創造多個 thread,每個執行緒在執行 fill_worker
這個函式,並且把 fill_ctxs[i]
的地址當作 fill_worker
函式的參數,再將創建出的 thread id 用 fillers
陣列存起來。在進行 pthread_create
時,執行緒就會開始並行了,並且同步執行 fill_worker
。主要執行緒,即呼叫 pthread_create
的執行緒則會繼續往下執行 pthread_join
,在指定的執行緒執行完之前,目前的執行緒會阻塞自己。
fill_worker
函式負責插入資料到 bucket,而且每個執行緒透過 i
和 stride
決定要處理哪一段資料,可分為兩步驟:
一般正規化縮放整個資料範圍,但如果本來資料分佈就有偏態,在縮放後容易集中映射到少數 bucket,造成多執行緒對少數 bucket 的 atomic 操作競爭,因此 schedsort
運用以下正規化的方式減緩叢集效應的發生。
stable code 的設計特點包含:
stable_code
的值會以 index 較小者優先stable_code
會是唯一的。計算完要放進哪個 bucket 後,接下來用 C11 Atomics 提供的操作取得唯一的 index。由於在同個 bucket 內可透過 bucket_sizes 追蹤該 bucket 目前的資料量,為避免覆蓋掉原本的資料,且考慮並行操作,應先取得當前最後一個元素的 index 並放到其後面,因此 AAAA
應填入 atomic_fetch_add(&ctx->bucket_sizes[norm], 1)
,這個操作會先 fetch 指定的地址所存放的值,再將其加 1 ,整個過程是不可分割的,避免更改值的時候其他執行緒存取值。
當所有執行緒結束之後,要確定每個 bucket 的最終 size,因此用 bucket_sizes_plain
存放。要取得確定的 size,我們應從 bucket_sizes
著手並且賦值。BBBB
填入 atomic_load(&bucket_sizes[i])
,以取得被宣告為 _Atomic
的 bucket_sizes
元素。
這邊換成 bucket_sizes[i]
輸出結果也會對,因為同時間沒有其他的執行緒運行中,也就沒有人存取 bucket_sizes[i]
而我們要計算每個 bucket 合併時的 offset,存放到 bucket_sizes_offset
中,因此使用 prefix sum 來計算。prefix sum 的概念是累加,計算這格時還要再加上前一格的值,因此 CCCC
應該填入 bucket_sizes_plain[p - 1]
,所以 bucket_sizes_offset[i]
就會儲存前 0~(i-1)
個 bucket 共有幾個元素,作為合併時的 offset。
當資料都被放進 buckets
之後,就要將每個 bucket
依優先級合併,並將排序後的結果放回 sorted
。此時也依據 N_WORKERS
的數量建立執行緒,此次執行緒是並行處理 work_func
,共享一個 sorted
並將 bucket 的資料合併進去。主程式在建立完執行緒之後,一樣以 pthread_join
阻塞自己,直到指定的執行緒結束。
worker_func
則是依照 offset
和 size
直接找到目前 bucket 應該合併的位置,並依序放入資料到 sorted
,這樣做可以避免多個執行緒競爭資源,而且線性寫入對快取較友善,避免隨機寫入的跳躍存取,線性寫入
執行緒結束後排序完成,將 sorted
複製到原 data
,並在主程式檢查 data
是否被正確排序。
運用 perf 去做分析排序數量不同的情況,寫以下的腳本 test.sh
進行不同排序數量的測試:
並且將輸出使用以下程式 plot.py
繪圖:
在 size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));
的情況下跑輸入數量 count = 100~8200
的測資,並用 plot.py
產生以下圖,但是發現在 result.txt
內,count > 1024
之後都產生錯誤排序。
試著將 4096
改成 1024
,size_t max_prio = (count < 512 ? 512 : (count < 1024 ? 1024 : 2048))
,發現變成從 count > 2048
之後會開始排序錯誤,而且是發生在第一個元素的時候就回傳錯誤。發現只要 count
發現只要超過 max_prio
就會發生排序錯誤。