Try   HackMD

Linux 小考 9-1

解釋 schedsort 的運作原理

程式碼首先讀取輸入,檢查輸入資料數量,並呼叫 schedsort 進行排序,並且動態分配 max_prio 的空間如下:

size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));

接著透過迴圈尋找輸入資料的最大和最小值,並定義 val_range = val_max - val_min + 1; ,作為等等 fill_ctx_t 結構的成員值。

整個程式碼並未呼叫排序演算法的 API ,而是透過多個 thread 並行寫入資料至優先級 bucket 中,再將多個 buckets 依順序合併。接下來分配 bucket 的空間,每個 bucket 就像是一個優先權佇列,並且總共有 max_prio 個 bucket,三種陣列的用意如下:

  • buckets:優先級佇列,每個 bucket 就是一種優先級
  • bucket_sizes:定義每個優先級裡面的資料數量,用 _Atomic 關鍵字宣告可以確保資料安全的被寫入 bucket 而不會有 race condition
  • bucket_caps:定義每個 bucket 的最大容量,用來分配每個 bucket 的空間

為什麼是 size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));
為什麼是 bucket_caps[i] = (count / max_prio) * 2 + 8;

排序數量超過 1024 之後,發現有時候會寫排序錯誤,而且是在第一個 index 出錯,但有時候又會正確排序並輸出結果

接下來依照 N_WORKERS 數量去創造多個 thread,每個執行緒在執行 fill_worker 這個函式,並且把 fill_ctxs[i] 的地址當作 fill_worker 函式的參數,再將創建出的 thread id 用 fillers 陣列存起來。在進行 pthread_create 時,執行緒就會開始並行了,並且同步執行 fill_worker。主要執行緒,即呼叫 pthread_create 的執行緒則會繼續往下執行 pthread_join,在指定的執行緒執行完之前,目前的執行緒會阻塞自己。

fill_worker 函式負責插入資料到 bucket,而且每個執行緒透過 istride 決定要處理哪一段資料,可分為兩步驟:

  1. 使用正規化技巧計算優先級,決定要放到哪個 bucket
  2. 安全的放入該 bucket 避免並行時覆蓋 bucket 資料

一般正規化縮放整個資料範圍,但如果本來資料分佈就有偏態,在縮放後容易集中映射到少數 bucket,造成多執行緒對少數 bucket 的 atomic 操作競爭,因此 schedsort 運用以下正規化的方式減緩叢集效應的發生。

stable_code = ((value - val_min) << 32) | index;
bucket = (stable_code * MAX_PRIO) / ((uint64_t)range << 32);

stable code 的設計特點包含:

  • 穩定性 (stability):相同值將保留原始順序,因為 stable_code 的值會以 index 較小者優先
  • 均勻分布:透過將原始資料值減去最小值後左移 32 位元,並與原始索引結合,產生 64-bit 的 stable_code,再進行線性正規化,此映射手法有助於將偏態分布轉化為近似均勻分布,因為生成的 stable_code 會是唯一的。

計算完要放進哪個 bucket 後,接下來用 C11 Atomics 提供的操作取得唯一的 index。由於在同個 bucket 內可透過 bucket_sizes 追蹤該 bucket 目前的資料量,為避免覆蓋掉原本的資料,且考慮並行操作,應先取得當前最後一個元素的 index 並放到其後面,因此 AAAA 應填入 atomic_fetch_add(&ctx->bucket_sizes[norm], 1),這個操作會先 fetch 指定的地址所存放的值,再將其加 1 ,整個過程是不可分割的,避免更改值的時候其他執行緒存取值。

size_t index = atomic_fetch_add(&ctx->bucket_sizes[norm], 1);

當所有執行緒結束之後,要確定每個 bucket 的最終 size,因此用 bucket_sizes_plain 存放。要取得確定的 size,我們應從 bucket_sizes 著手並且賦值。BBBB 填入 atomic_load(&bucket_sizes[i]),以取得被宣告為 _Atomicbucket_sizes 元素。

for (size_t i = 0; i < max_prio; i++)
    bucket_sizes_plain[i] = atomic_load(&bucket_sizes[i]);

這邊換成 bucket_sizes[i] 輸出結果也會對,因為同時間沒有其他的執行緒運行中,也就沒有人存取 bucket_sizes[i]

而我們要計算每個 bucket 合併時的 offset,存放到 bucket_sizes_offset 中,因此使用 prefix sum 來計算。prefix sum 的概念是累加,計算這格時還要再加上前一格的值,因此 CCCC 應該填入 bucket_sizes_plain[p - 1],所以 bucket_sizes_offset[i] 就會儲存前 0~(i-1) 個 bucket 共有幾個元素,作為合併時的 offset。

當資料都被放進 buckets 之後,就要將每個 bucket 依優先級合併,並將排序後的結果放回 sorted。此時也依據 N_WORKERS 的數量建立執行緒,此次執行緒是並行處理 work_func,共享一個 sorted 並將 bucket 的資料合併進去。主程式在建立完執行緒之後,一樣以 pthread_join 阻塞自己,直到指定的執行緒結束。

worker_func 則是依照 offsetsize 直接找到目前 bucket 應該合併的位置,並依序放入資料到 sorted,這樣做可以避免多個執行緒競爭資源,而且線性寫入對快取較友善,避免隨機寫入的跳躍存取,線性寫入

for (int p = ctx->begin_bucket; p < ctx->end_bucket; p++) {
    int *src = ctx->buckets[p];
    size_t len = ctx->bucket_sizes[p];
    size_t offset = ctx->bucket_offsets[p];
    for (size_t i = 0; i < len; i++)
        ctx->result[offset + i] = src[i];
}

執行緒結束後排序完成,將 sorted 複製到原 data ,並在主程式檢查 data 是否被正確排序。

統計模型

運用 perf 去做分析排序數量不同的情況,寫以下的腳本 test.sh 進行不同排序數量的測試:

> results.txt
for size in $(seq 100 10 8200); do
    input=$(for i in $(seq 1 $size); do echo $i; done)
    result=$(perf stat -e cycles,instructions,cache-misses \
             ./sort $input 2>&1)
    echo "Test results for $size elements:" >> results.txt
    echo "$result" >> results.txt
    echo "--------------------------" >> results.txt
done

並且將輸出使用以下程式 plot.py 繪圖:

import re
import matplotlib.pyplot as plt

with open("results.txt", "r") as file:
    lines = file.readlines()

sizes = []
elapsed_times = []
cycles = []
instructions = []
cache_misses = []

size_pattern = re.compile(r"Test results for (\d+) elements:")
time_pattern = re.compile(r"Elapsed time: (\d+\.\d+) us")
cycle_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cycles/")
instruction_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/instructions/")
cache_miss_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cache-misses/")

current_size = None
for line in lines:
    if size_pattern.match(line):
        current_size = int(size_pattern.match(line).group(1))
        sizes.append(current_size)

    if time_pattern.match(line):
        elapsed_time = float(time_pattern.match(line).group(1)) / 1000.0 
        elapsed_times.append(elapsed_time)
    if cycle_pattern.match(line):
        cycle_str = cycle_pattern.match(line).group(1)
        cycles.append(int(cycle_str.replace(',', '')))
    if instruction_pattern.match(line):
        instruction_str = instruction_pattern.match(line).group(1)
        instructions.append(int(instruction_str.replace(',', '')))
    if cache_miss_pattern.match(line):
        cache_miss_str = cache_miss_pattern.match(line).group(1)
        cache_misses.append(int(cache_miss_str.replace(',', '')))

plt.figure(figsize=(10, 12))

print(sizes)
plt.subplot(3, 1, 1)
plt.plot(sizes, cycles, label='CPU Cycles', color='b')
plt.xlabel('Number of Elements')
plt.ylabel('CPU Cycles')
plt.title('CPU Cycles vs. Number of Elements')
plt.grid(True)

plt.subplot(3, 1, 2)
plt.plot(sizes, instructions, label='Instructions', color='g')
plt.xlabel('Number of Elements')
plt.ylabel('Instructions')
plt.title('Instructions vs. Number of Elements')
plt.grid(True)

plt.subplot(3, 1, 3)
plt.plot(sizes, cache_misses, label='Cache Misses', color='r')
plt.xlabel('Number of Elements')
plt.ylabel('Cache Misses')
plt.title('Cache Misses vs. Number of Elements')
plt.grid(True)

plt.tight_layout()
plt.savefig("plot.png")

size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048)); 的情況下跑輸入數量 count = 100~8200 的測資,並用 plot.py 產生以下圖,但是發現在 result.txt 內,count > 1024 之後都產生錯誤排序。

image

試著將 4096 改成 1024size_t max_prio = (count < 512 ? 512 : (count < 1024 ? 1024 : 2048)),發現變成從 count > 2048 之後會開始排序錯誤,而且是發生在第一個元素的時候就回傳錯誤。發現只要 count 發現只要超過 max_prio 就會發生排序錯誤。