# Linux 小考 9-1
> 解釋 schedsort 的運作原理
[程式碼](https://gist.github.com/jserv/f5159c5d23a49f49e5f2d62d5f39c1f2)首先讀取輸入,檢查輸入資料數量,並呼叫 `schedsort` 進行排序,並且動態分配 `max_prio` 的空間如下:
```c
size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));
```
接著透過迴圈尋找輸入資料的最大和最小值,並定義 `val_range = val_max - val_min + 1;` ,作為等等 `fill_ctx_t` 結構的成員值。
整個程式碼並未呼叫排序演算法的 API ,而是透過多個 thread 並行寫入資料至優先級 bucket 中,再將多個 buckets 依順序合併。接下來分配 bucket 的空間,每個 bucket 就像是一個優先權佇列,並且總共有 `max_prio` 個 bucket,三種陣列的用意如下:
- `buckets`:優先級佇列,每個 bucket 就是一種優先級
- `bucket_sizes`:定義每個優先級裡面的資料數量,用 `_Atomic` 關鍵字宣告可以確保資料安全的被寫入 bucket 而不會有 race condition
- `bucket_caps`:定義每個 bucket 的最大容量,用來分配每個 bucket 的空間
:::info
為什麼是 `size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));`
為什麼是 `bucket_caps[i] = (count / max_prio) * 2 + 8;` ?
:::
> 排序數量超過 1024 之後,發現有時候會寫排序錯誤,而且是在第一個 index 出錯,但有時候又會正確排序並輸出結果
接下來依照 `N_WORKERS` 數量去創造多個 thread,每個執行緒在執行 `fill_worker` 這個函式,並且把 `fill_ctxs[i]` 的地址當作 `fill_worker` 函式的參數,再將創建出的 thread id 用 `fillers` 陣列存起來。在進行 `pthread_create` 時,執行緒就會開始並行了,並且同步執行 `fill_worker`。主要執行緒,即呼叫 `pthread_create` 的執行緒則會繼續往下執行 `pthread_join`,在指定的執行緒執行完之前,目前的執行緒會阻塞自己。
`fill_worker` 函式負責插入資料到 bucket,而且每個執行緒透過 `i` 和 `stride` 決定要處理哪一段資料,可分為兩步驟:
1. 使用正規化技巧計算優先級,決定要放到哪個 bucket
2. 安全的放入該 bucket 避免並行時覆蓋 bucket 資料
一般正規化縮放整個資料範圍,但如果本來資料分佈就有偏態,在縮放後容易集中映射到少數 bucket,造成多執行緒對少數 bucket 的 atomic 操作競爭,因此 `schedsort` 運用以下正規化的方式減緩叢集效應的發生。
```c
stable_code = ((value - val_min) << 32) | index;
bucket = (stable_code * MAX_PRIO) / ((uint64_t)range << 32);
```
stable code 的設計特點包含:
- 穩定性 (stability):相同值將保留原始順序,因為 `stable_code` 的值會以 index 較小者優先
- 均勻分布:透過將原始資料值減去最小值後左移 32 位元,並與原始索引結合,產生 64-bit 的 stable_code,再進行線性正規化,此映射手法有助於將偏態分布轉化為近似均勻分布,因為生成的 `stable_code` 會是唯一的。
計算完要放進哪個 bucket 後,接下來用 C11 Atomics 提供的操作取得唯一的 index。由於在同個 bucket 內可透過 bucket_sizes 追蹤該 bucket 目前的資料量,為避免覆蓋掉原本的資料,且考慮並行操作,應先取得當前最後一個元素的 index 並放到其後面,因此 `AAAA` 應填入 `atomic_fetch_add(&ctx->bucket_sizes[norm], 1)`,這個操作會先 fetch 指定的地址所存放的值,再將其加 1 ,整個過程是不可分割的,避免更改值的時候其他執行緒存取值。
```c
size_t index = atomic_fetch_add(&ctx->bucket_sizes[norm], 1);
```
當所有執行緒結束之後,要確定每個 bucket 的最終 size,因此用 `bucket_sizes_plain` 存放。要取得確定的 size,我們應從 `bucket_sizes` 著手並且賦值。`BBBB` 填入 `atomic_load(&bucket_sizes[i])`,以取得被宣告為 `_Atomic` 的 `bucket_sizes` 元素。
```c
for (size_t i = 0; i < max_prio; i++)
bucket_sizes_plain[i] = atomic_load(&bucket_sizes[i]);
```
:::info
這邊換成 `bucket_sizes[i]` 輸出結果也會對,因為同時間沒有其他的執行緒運行中,也就沒有人存取 `bucket_sizes[i]`
:::
而我們要計算每個 bucket 合併時的 offset,存放到 `bucket_sizes_offset` 中,因此使用 prefix sum 來計算。prefix sum 的概念是累加,計算這格時還要再加上前一格的值,因此 `CCCC` 應該填入 `bucket_sizes_plain[p - 1]`,所以 `bucket_sizes_offset[i]` 就會儲存前 `0~(i-1)` 個 bucket 共有幾個元素,作為合併時的 offset。
當資料都被放進 `buckets` 之後,就要將每個 `bucket` 依優先級合併,並將排序後的結果放回 `sorted`。此時也依據 `N_WORKERS` 的數量建立執行緒,此次執行緒是並行處理 `work_func`,共享一個 `sorted` 並將 bucket 的資料合併進去。主程式在建立完執行緒之後,一樣以 `pthread_join` 阻塞自己,直到指定的執行緒結束。
`worker_func` 則是依照 `offset` 和 `size` 直接找到目前 bucket 應該合併的位置,並依序放入資料到 `sorted`,這樣做可以避免多個執行緒競爭資源,而且線性寫入對快取較友善,避免隨機寫入的跳躍存取,線性寫入
```c
for (int p = ctx->begin_bucket; p < ctx->end_bucket; p++) {
int *src = ctx->buckets[p];
size_t len = ctx->bucket_sizes[p];
size_t offset = ctx->bucket_offsets[p];
for (size_t i = 0; i < len; i++)
ctx->result[offset + i] = src[i];
}
```
執行緒結束後排序完成,將 `sorted` 複製到原 `data` ,並在主程式檢查 `data` 是否被正確排序。
### 統計模型
運用 perf 去做分析排序數量不同的情況,寫以下的腳本 `test.sh` 進行不同排序數量的測試:
```c
> results.txt
for size in $(seq 100 10 8200); do
input=$(for i in $(seq 1 $size); do echo $i; done)
result=$(perf stat -e cycles,instructions,cache-misses \
./sort $input 2>&1)
echo "Test results for $size elements:" >> results.txt
echo "$result" >> results.txt
echo "--------------------------" >> results.txt
done
```
並且將輸出使用以下程式 `plot.py` 繪圖:
```python
import re
import matplotlib.pyplot as plt
with open("results.txt", "r") as file:
lines = file.readlines()
sizes = []
elapsed_times = []
cycles = []
instructions = []
cache_misses = []
size_pattern = re.compile(r"Test results for (\d+) elements:")
time_pattern = re.compile(r"Elapsed time: (\d+\.\d+) us")
cycle_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cycles/")
instruction_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/instructions/")
cache_miss_pattern = re.compile(r"\s*([\d,]+)\s*cpu_atom/cache-misses/")
current_size = None
for line in lines:
if size_pattern.match(line):
current_size = int(size_pattern.match(line).group(1))
sizes.append(current_size)
if time_pattern.match(line):
elapsed_time = float(time_pattern.match(line).group(1)) / 1000.0
elapsed_times.append(elapsed_time)
if cycle_pattern.match(line):
cycle_str = cycle_pattern.match(line).group(1)
cycles.append(int(cycle_str.replace(',', '')))
if instruction_pattern.match(line):
instruction_str = instruction_pattern.match(line).group(1)
instructions.append(int(instruction_str.replace(',', '')))
if cache_miss_pattern.match(line):
cache_miss_str = cache_miss_pattern.match(line).group(1)
cache_misses.append(int(cache_miss_str.replace(',', '')))
plt.figure(figsize=(10, 12))
print(sizes)
plt.subplot(3, 1, 1)
plt.plot(sizes, cycles, label='CPU Cycles', color='b')
plt.xlabel('Number of Elements')
plt.ylabel('CPU Cycles')
plt.title('CPU Cycles vs. Number of Elements')
plt.grid(True)
plt.subplot(3, 1, 2)
plt.plot(sizes, instructions, label='Instructions', color='g')
plt.xlabel('Number of Elements')
plt.ylabel('Instructions')
plt.title('Instructions vs. Number of Elements')
plt.grid(True)
plt.subplot(3, 1, 3)
plt.plot(sizes, cache_misses, label='Cache Misses', color='r')
plt.xlabel('Number of Elements')
plt.ylabel('Cache Misses')
plt.title('Cache Misses vs. Number of Elements')
plt.grid(True)
plt.tight_layout()
plt.savefig("plot.png")
```
在 `size_t max_prio = (count < 512 ? 512 : (count < 4096 ? 1024 : 2048));` 的情況下跑輸入數量 `count = 100~8200` 的測資,並用 `plot.py` 產生以下圖,但是發現在 `result.txt` 內,`count > 1024` 之後都產生錯誤排序。

試著將 `4096` 改成 `1024` ,`size_t max_prio = (count < 512 ? 512 : (count < 1024 ? 1024 : 2048))`,發現變成從 `count > 2048` 之後會開始排序錯誤,而且是發生在第一個元素的時候就回傳錯誤。發現只要 `count` 發現只要超過 `max_prio` 就會發生排序錯誤。