# Linux 小考 9-1 > 解釋 schedsort 的運作原理 [程式碼](https://gist.github.com/jserv/f5159c5d23a49f49e5f2d62d5f39c1f2)首先讀取輸入,檢查輸入資料數量,並呼叫 `schedsort` 進行排序,並且動態分配 `max_prio` 的空間如下: ```c size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048)); ``` 接著透過迴圈尋找輸入資料的最大和最小值,並定義 `val_range = val_max - val_min + 1;` ,作為等等 `fill_ctx_t` 結構的成員值。 整個程式碼並未呼叫排序演算法的 API ,而是透過多個 thread 並行寫入資料至優先級 bucket 中,再將多個 buckets 依順序合併。接下來分配 bucket 的空間,每個 bucket 就像是一個優先權佇列,並且總共有 `max_prio` 個 bucket,三種陣列的用意如下: - `buckets`:優先級佇列,每個 bucket 就是一種優先級 - `bucket_sizes`:定義每個優先級裡面的資料數量,用 `_Atomic` 關鍵字宣告可以確保資料安全的被寫入 bucket 而不會有 race condition - `bucket_caps`:定義每個 bucket 的最大容量,用來分配每個 bucket 的空間 :::info 為什麼是 `size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));` 為什麼是 `bucket_caps[i] = (count / max_prio) * 2 + 8;` ? ::: > 排序數量超過 1024 之後,發現有時候會寫排序錯誤,而且是在第一個 index 出錯,但有時候又會正確排序並輸出結果 接下來依照 `N_WORKERS` 數量去創造多個 thread,每個執行緒在執行 `fill_worker` 這個函式,並且把 `fill_ctxs[i]` 的地址當作 `fill_worker` 函式的參數,再將創建出的 thread id 用 `fillers` 陣列存起來。在進行 `pthread_create` 時,執行緒就會開始並行了,並且同步執行 `fill_worker`。主要執行緒,即呼叫 `pthread_create` 的執行緒則會繼續往下執行 `pthread_join`,在指定的執行緒執行完之前,目前的執行緒會阻塞自己。 `fill_worker` 函式負責插入資料到 bucket,而且每個執行緒透過 `i` 和 `stride` 決定要處理哪一段資料,可分為兩步驟: 1. 使用正規化技巧計算優先級,決定要放到哪個 bucket 2. 安全的放入該 bucket 避免並行時覆蓋 bucket 資料 一般正規化縮放整個資料範圍,但如果本來資料分佈就有偏態,在縮放後容易集中映射到少數 bucket,造成多執行緒對少數 bucket 的 atomic 操作競爭,因此 `schedsort` 運用以下正規化的方式減緩叢集效應的發生。 ```c stable_code = ((value - val_min) << 32) | index; bucket = (stable_code * MAX_PRIO) / ((uint64_t)range << 32); ``` stable code 的設計特點包含: - 穩定性 (stability):相同值將保留原始順序,因為 `stable_code` 的值會以 index 較小者優先 - 均勻分布:透過將原始資料值減去最小值後左移 32 位元,並與原始索引結合,產生 64-bit 的 stable_code,再進行線性正規化,此映射手法有助於將偏態分布轉化為近似均勻分布,因為生成的 `stable_code` 會是唯一的。 計算完要放進哪個 bucket 後,接下來用 C11 Atomics 提供的操作取得唯一的 index。由於在同個 bucket 內可透過 bucket_sizes 追蹤該 bucket 目前的資料量,為避免覆蓋掉原本的資料,且考慮並行操作,應先取得當前最後一個元素的 index 並放到其後面,因此 `AAAA` 應填入 `atomic_fetch_add(&ctx->bucket_sizes[norm], 1)`,這個操作會先 fetch 指定的地址所存放的值,再將其加 1 ,整個過程是不可分割的,避免更改值的時候其他執行緒存取值。 ```c size_t index = atomic_fetch_add(&ctx->bucket_sizes[norm], 1); ``` 當所有執行緒結束之後,要確定每個 bucket 的最終 size,因此用 `bucket_sizes_plain` 存放。要取得確定的 size,我們應從 `bucket_sizes` 著手並且賦值。`BBBB` 填入 `atomic_load(&bucket_sizes[i])`,以取得被宣告為 `_Atomic` 的 `bucket_sizes` 元素。 ```c for (size_t i = 0; i < max_prio; i++) bucket_sizes_plain[i] = atomic_load(&bucket_sizes[i]); ``` :::info 這邊換成 `bucket_sizes[i]` 輸出結果也會對,因為同時間沒有其他的執行緒運行中,也就沒有人存取 `bucket_sizes[i]` ::: 而我們要計算每個 bucket 合併時的 offset,存放到 `bucket_sizes_offset` 中,因此使用 prefix sum 來計算。prefix sum 的概念是累加,計算這格時還要再加上前一格的值,因此 `CCCC` 應該填入 `bucket_sizes_plain[p - 1]`,所以 `bucket_sizes_offset[i]` 就會儲存前 `0~(i-1)` 個 bucket 共有幾個元素,作為合併時的 offset。 當資料都被放進 `buckets` 之後,就要將每個 `bucket` 依優先級合併,並將排序後的結果放回 `sorted`。此時也依據 `N_WORKERS` 的數量建立執行緒,此次執行緒是並行處理 `work_func`,共享一個 `sorted` 並將 bucket 的資料合併進去。主程式在建立完執行緒之後,一樣以 `pthread_join` 阻塞自己,直到指定的執行緒結束。 `worker_func` 則是依照 `offset` 和 `size` 直接找到目前 bucket 應該合併的位置,並依序放入資料到 `sorted`,這樣做可以避免多個執行緒競爭資源,而且線性寫入對快取較友善,避免隨機寫入的跳躍存取,線性寫入 ```c for (int p = ctx->begin_bucket; p < ctx->end_bucket; p++) { int *src = ctx->buckets[p]; size_t len = ctx->bucket_sizes[p]; size_t offset = ctx->bucket_offsets[p]; for (size_t i = 0; i < len; i++) ctx->result[offset + i] = src[i]; } ``` 執行緒結束後排序完成,將 `sorted` 複製到原 `data` ,並在主程式檢查 `data` 是否被正確排序。 ### 統計模型 運用 perf 去做分析排序數量不同的情況,寫以下的腳本 `test.sh` 進行不同排序數量的測試: ```c > results.txt for size in $(seq 100 10 8200); do input=$(for i in $(seq 1 $size); do echo $i; done) result=$(perf stat -e cycles,instructions,cache-misses \ ./sort $input 2>&1) echo "Test results for $size elements:" >> results.txt echo "$result" >> results.txt echo "--------------------------" >> results.txt done ``` 並且將輸出使用以下程式 `plot.py` 繪圖: ```python import re import matplotlib.pyplot as plt with open("results.txt", "r") as file: lines = file.readlines() sizes = [] elapsed_times = [] cycles = [] instructions = [] cache_misses = [] size_pattern = re.compile(r"Test results for (\d+) elements:") time_pattern = re.compile(r"Elapsed time: (\d+\.\d+) us") cycle_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cycles/") instruction_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/instructions/") cache_miss_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cache-misses/") current_size = None for line in lines: if size_pattern.match(line): current_size = int(size_pattern.match(line).group(1)) sizes.append(current_size) if time_pattern.match(line): elapsed_time = float(time_pattern.match(line).group(1)) / 1000.0 elapsed_times.append(elapsed_time) if cycle_pattern.match(line): cycle_str = cycle_pattern.match(line).group(1) cycles.append(int(cycle_str.replace(',', ''))) if instruction_pattern.match(line): instruction_str = instruction_pattern.match(line).group(1) instructions.append(int(instruction_str.replace(',', ''))) if cache_miss_pattern.match(line): cache_miss_str = cache_miss_pattern.match(line).group(1) cache_misses.append(int(cache_miss_str.replace(',', ''))) plt.figure(figsize=(10, 12)) print(sizes) plt.subplot(3, 1, 1) plt.plot(sizes, cycles, label='CPU Cycles', color='b') plt.xlabel('Number of Elements') plt.ylabel('CPU Cycles') plt.title('CPU Cycles vs. Number of Elements') plt.grid(True) plt.subplot(3, 1, 2) plt.plot(sizes, instructions, label='Instructions', color='g') plt.xlabel('Number of Elements') plt.ylabel('Instructions') plt.title('Instructions vs. Number of Elements') plt.grid(True) plt.subplot(3, 1, 3) plt.plot(sizes, cache_misses, label='Cache Misses', color='r') plt.xlabel('Number of Elements') plt.ylabel('Cache Misses') plt.title('Cache Misses vs. Number of Elements') plt.grid(True) plt.tight_layout() plt.savefig("plot.png") ``` 在 `size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));` 的情況下跑輸入數量 `count = 100~8200` 的測資,並用 `plot.py` 產生以下圖,但是發現在 `result.txt` 內,`count > 1024` 之後都產生錯誤排序。 ![image](https://hackmd.io/_uploads/Bk5nan4Jge.png) 試著將 `4096` 改成 `1024` ,`size_t max_prio = (count < 512 ? 512 : (count < 1024 ? 1024 : 2048))`,發現變成從 `count > 2048` 之後會開始排序錯誤,而且是發生在第一個元素的時候就回傳錯誤。發現只要 `count` 發現只要超過 `max_prio` 就會發生排序錯誤。