# Linux 核心專題: 改進 ksort
> 執行人: tintinjian12999
> [解說錄影](https://www.youtube.com/watch?v=YSmA_4jNIdw)
### Reviewed by `164253`
pivot 為中位數時,後續只需再做 $O(2\frac n2\log\frac n2)$ 的排序,否則假設小於 pivot 有 $a$ 項,需要 $O(a\log a+(n-a)\log(n-a))$ 完成排序,顯然最小值在 $a=\frac n2$ 。
對於多個數和 pivot 相同的情況,可以 $O(n)$ 掃描一次,用[摩爾投票法](https://en.m.wikipedia.org/wiki/Boyer%E2%80%93Moore_majority_vote_algorithm) 找出眾數,並判斷眾數的出現次數過少則後續使用普通的方法排序。
### Reviewed by `vestata`
>此處分別測試了排序五種資料所需的時間,分別是 rand, sawtooth, stagger, shuffle, plateau
這邊五種排序資料是參考甚麼文獻還是自己假設的?
### Reviewed by `Terry7Wei7`
完成剩餘的TODO事項,並更新在頁面中
## 任務簡介
重作第六次作業的 ksort,探討並行化的資料排序實作手法。
## TODO: 依據第六次作業規範,重作 ksort
> 要確保並行化的資料排序結果正確,善用 kernel thread / workqueue / CMWQ 來分發運算工作到有效的 CPU 核。
> 善用 CMWQ 達成排序的並行處理,需要設計對應的檢驗程式。
> 可適度引用其他學員的成果,務必註明出處並確認其實作有效益
> [CMWQ](https://www.kernel.org/doc/html/v4.10/core-api/workqueue.html)
:::danger
更新超連結指向的版本,應為 latest
:::
### 在 ksort 裡的 CMWQ
:::danger
不該急著閱讀程式碼或「舉燭」,你該說明,如何借助 CMWQ,讓原本單一執行緒的快速排序程式碼,得以成為並行化的實作?中間又有哪些考慮因素和限制呢?
:::
#### 為什麼要使用 CMWQ
在原有的 quick sort 實作中,會遞迴呼叫函式來達到 partition 的效果,並且這一過程是循序的 (sequential),而為了加速程式的執行,能夠將遞迴呼叫的部份使用 CMWQ 並行化以增加程式執行的效率。
:::danger
用 [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law) 解釋和預測加速的幅度,附上對應的數學分析,作為之後實驗時的分析對照。
:::
##### Amdahl's law
Amdahl's law 可以用來表示針對系統中一部分<s>優化</s> 改善後對整體表現帶來的性能增加,其數學式為
:::danger
不要濫用「優化」一詞,參見 https://hackmd.io/@sysprog/it-vocabulary
:::
$$
S_{speedup} = \frac{1}{(1 - p) + \frac{p}{s}}
$$
其中 p 是該改善操作帶來的影響佔整體系統的比重,s 是該改善操作帶來的效能提昇。
而整個系統運行的總時間為 $$T(s) = (1 - p)T + \frac{p}{s}T$$
其中 T 代表原本系統運行花的時間而 T(s) 代表應用該改善操作後運行花的時間。
以上公式可以用來評估改善帶來的效益,舉維基百科的例子
![圖片](https://hackmd.io/_uploads/B1ME6Tc80.png)
考慮以上兩個循序的程式,程式 A 需要花 3 秒執行而程式 B 需要花 1 秒執行,若有兩個操作,一個能令程式 B 快 5 倍而另外一個能令程式 A 快 2 倍,此時前者帶來的改善幅度為
$$
S_{speedup} = \frac{1}{(1 - 0.25) + \frac{0.25}{5}} = 1.25
$$
而後者帶來的改善幅度為
$$
S_{speedup} = \frac{1}{(1 - 0.75) + \frac{0.75}{2}} = 1.6
$$
能夠看到雖然讓程式 B 快 5 倍看起來提昇較多但是對整體系統的影響反而較少。
並且改善帶來的效能提昇也是有上限的
$$
\lim_{s \to \infty}S_{speedup} = \frac{1}{(1 - 0.25) + \frac{0.25}{s}} = 1.33
$$
並且其也能用來預估程式並行化帶來的效益
$$
S_{speedup} = \frac{1}{(1 - p) + \frac{p}{N}}
$$
其中 p 為並行的比例,N 為用來計算的資源數
#### ksort 實作
ksort 裡定義結構體 `struct qsort`
```c
struct common {
int swaptype; /* Code to use for swapping */
size_t es; /* Element size. */
cmp_t *cmp; /* Comparison function */
};
struct qsort {
struct work_struct w;
struct common *common;
void *a;
size_t n;
};
```
裡面包含 CMWQ 的 work item 結構體 `work_struct`,排序時用到的各項參數與比較函式 `common`,`void *a` 為欲排序資料的指標,n 為資料數量。
在 ksort 模組被呼叫後,會先使用 kmalloc 動態<s>分配</s> 配置 qsort 結構體的記憶體,之後呼叫 `init_qsort` 函式
```c
static void init_qsort(struct qsort *q,
void *elems,
size_t size,
struct common *common)
{
INIT_WORK(&q->w, qsort_algo);
q->a = elems;
q->n = size;
q->common = common;
}
```
在 `init_qsort` 裡呼叫 `linux/workqueue.h` 裡的函數 `INIT_WORK` 用來初始化 `work_struct` 結構體並將它跟排序實作的函式 `qsort_algo` 連結。
使用
```c
queue_work(struct workqueue_struct *workqueue, struct work_struct w)
```
能夠將初始化的 work 加入到 workqueue 中開始執行。
##### quick sort 的實作
在 ksort 裡對於 quick sort 的實作參考〈[Engeineering a sort function](https://cs.fit.edu/~pkc/classes/writing/papers/bentley93engineering.pdf)〉。
- [ ] pivot 的選擇
quick sort 演算法的最佳情況發生在每次 pivot 的選擇皆為資料中位數時,但同時中位數的計算也會產生運算成本,
:::danger
提供數學證明。
:::
![圖片](https://hackmd.io/_uploads/SyPhgsh80.png)
以上圖的決策樹為例,對應的實作為
:::danger
使用 Graphviz 製圖。
:::
```c
static inline char *med3(char *a,
char *b,
char *c,
cmp_t *cmp,
__attribute__((unused)) void *thunk)
{
return CMP(thunk, a, b) < 0
? (CMP(thunk, b, c) < 0 ? b : (CMP(thunk, a, c) < 0 ? c : a))
: (CMP(thunk, b, c) > 0 ? b : (CMP(thunk, a, c) < 0 ? a : c));
}
```
每次呼叫 `med3` 的時候都會至少增加 2 次,至多增加 3 次比較,而如果想更進一步的使用三組元素中位數的中位數,相較隨機選取 pivot 會至少增加 8 次,至多 12 次的比較,因此對於小數量的資料使用三個元素的中位數,而大數量的資料則使用三組元素中位數的中位數。
以資料數量為 40 為界,資料數量小於 40 則使用資料的第一個元素,最後一個元素,正中央元素三者的中位數當作 pivot,而當資料數量大於 40 時由於計算中位數的時間成本較效能的提昇帶來的效益小,因此選擇三組資料,計算每組的中位數,再以這些中位數的中位數作為 pivot
```c
pm = (char *) a + (n / 2) * es;
if (n > 7) {
pl = (char *) a;
pn = (char *) a + (n - 1) * es;
if (n > 40) {
d = (n / 8) * es;
pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
pm = med3(pm - d, pm, pm + d, cmp, thunk);
pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
}
pm = med3(pl, pm, pn, cmp, thunk);
}
```
:::danger
注意用語:
* call 是「呼叫」,而非「調用」
:::
:::danger
區分「函數」和「函式」,見: https://hackmd.io/@sysprog/it-vocabulary
:::
透過在<s>調用</s> 比較<s>函數</s> 時計數全域變數 `cmp_cnt` 可以得到使用隨機元素當作 pivot ,使用三個元素的中位數當作 pivot ,和使用三組元素中位數的中位數當作 pivot 對比較次數的影響。
隨機資料:
```c
inbuf[i] = rand() % n;
```
![圖片](https://hackmd.io/_uploads/HJYNGicUC.png)
![圖片](https://hackmd.io/_uploads/S1XBzscUR.png)
部份排序完成的資料
```c
inbuf[i] = (rand() % 2) ? (j += 2) : (k += 2);
```
![圖片](https://hackmd.io/_uploads/B1787o5LC.png)
![圖片](https://hackmd.io/_uploads/S1hU7scI0.png)
可以看到使用三組元素中位數的中位數(圖中的 Med of 9)在這兩種資料分佈上皆優於另外兩者
###### 改進 partition
原本使用兩個指標從資料的頭和尾掃描能將資料分為大於等於 pivot,小於等於 pivot,和 pivot 本身三個部份
![2024-06-23 22-51-19](https://hackmd.io/_uploads/SyG2I2rUA.png)
```c
i = 0;
j = n;
for (;;) {
do i++; while (i < n && a[i] < a[0]);
do j--; while (a[j] > a[0]);
if (j < i) break;
swap(i, j, a);
}
swap(0, j, a);
```
這麼做的缺點是在遇到有大量重複資料的時候因為無法很好的區隔等於 pivot 的資料導致分割的效率會很差,因此這裡的實作在犧牲部份性能的情況下使分割操作在有重複資料時的效能可以進一步提升。
:::danger
數學分析呢?
:::
假設 n 個資料裡面有 n / 2 個相同的元素,則當選到該元素作為 pivot 時若使用原本的方法還是需要 $n\lg{n}$ 次的比較,但如果使用下面的方法,在一開始就能把相同的元素全部選取出來,因此只需要對剩下的元素進行分割,因此只需要 $\frac{n}{2}lg(\frac{n}{2})$ 次的比較,對執行效率有很大的提升。
```c
a = b = 0;
c = d = n-1;
for (;;) {
while (b <= c && x[b] <= v) {
if (x[b] == v) iswap(a++, b, x);
b++;
}
while (c >= b && x[c] >= v) {
if (x[c] == v) iswap(d--, c, x);
c--;
}
if (b > c) break;
iswap(b++, c--, x);
}
s = min(a, b-a);
for(l = 0, h = b-s; s; s--) iswap(l++, h++, x);
s = min(d-c, n-1-d);
for(l = b, h = n-s; s; s--) iswap(l++, h++, x);
```
以上的程式碼能夠在前後指標走訪資料的同時將等於 pivot 的部份交換到資料的前後端
![2024-06-27 17-03-02](https://hackmd.io/_uploads/H1dmoj9LR.png)
在最後做交換能夠將資料分為小於 pivto,等於 pivot,大於 pivot 三個部份
![2024-06-27 17-05-16](https://hackmd.io/_uploads/HJHDsi9IR.png)
首先將 pivot 交換到數列開頭,有四個指標,a 與 b 一開始起點為數列的起始點,接著移動 b ,過程中若遇到和 pivot 相同的元素則將其和 a 指向的元素作交換,交換的同時增加 a ,直到遇到大於 pivot 的資料停止,而後 c 與 d 也是作相同的操作,只是交換後是將 d 作遞減,直到遇到小於 pivot 的資料停止,此時交換 b 與 c ,重複以上動作直到 c < b,最後將與 pivot 相等的部份交換到數列中央。
用以下數列作示範,假設 pivot 為 3
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "8"];
struct2 [label= "2"];
struct3 [label= "7"];
struct4 [label= "3"];
struct5 [label= "3"];
struct6 [label= "1"];
struct7 [label= "2"];
a->struct0;
b->struct0;
d->struct7;
c->struct7;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "8"];
struct2 [label= "2"];
struct3 [label= "7"];
struct4 [label= "3"];
struct5 [label= "3"];
struct6 [label= "1"];
struct7 [label= "2"];
a->struct0;
b->struct1;
d->struct7;
c->struct6;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "1"];
struct2 [label= "2"];
struct3 [label= "7"];
struct4 [label= "3"];
struct5 [label= "3"];
struct6 [label= "8"];
struct7 [label= "2"];
a->struct0;
b->struct2;
d->struct7;
c->struct5;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "1"];
struct2 [label= "2"];
struct3 [label= "7"];
struct4 [label= "3"];
struct5 [label= "2"];
struct6 [label= "8"];
struct7 [label= "3"];
a->struct0;
b->struct3;
d->struct7;
c->struct5;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "1"];
struct2 [label= "2"];
struct3 [label= "2"];
struct4 [label= "3"];
struct5 [label= "7"];
struct6 [label= "8"];
struct7 [label= "3"];
a->struct0;
b->struct4;
d->struct6;
c->struct4;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "3"];
struct1 [label= "3"];
struct2 [label= "2"];
struct3 [label= "2"];
struct4 [label= "1"];
struct5 [label= "7"];
struct6 [label= "8"];
struct7 [label= "3"];
a->struct1;
b->struct5;
d->struct6;
c->struct4;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
```graphviz
digraph structs {
node[shape=plaintext];
a [label = "a"];
b [label = "b"];
d [label = "d"];
c [label = "c"];
node[shape=box];
struct0 [label= "2"];
struct1 [label= "2"];
struct2 [label= "1"];
struct3 [label= "3"];
struct4 [label= "3"];
struct5 [label= "3"];
struct6 [label= "7"];
struct7 [label= "8"];
a->struct1;
b->struct5;
d->struct6;
c->struct4;
{
rank="same";
struct0 -> struct1
struct1 -> struct2
struct2 -> struct3
struct3 -> struct4
struct4 -> struct5
struct5 -> struct6
struct6 -> struct7
}
}
```
如此分割完成。
- [ ] 結合 quick sort 與 insertion sort
主要分為兩個部份,一個是當資料的數量小於 7 的時候會切換成 insertion sort
```c
if (n < 7) {
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es)
q_swap(pl, pl - es);
return;
}
```
因為 insertion sort 在少量部份有序的資料中表現會優於 quick sort
以資料 [1, 2, 3, 4, 5, 10, 9, 8, 7, 6] 為例,若使用 insertion sort 所需的比較次數為
前半段 : $5$ 次加上後半段 : $\frac{(5 - 1)5}{2} = 10$次共 15 次比較
而此時使用 quick sort 的話最佳情況則需要至少 $nlgn$ 次的比較,大約為 33 次
以下分別單獨使用 insertion sort 與 quick sort 對不同資料數量的鋸齒狀資料分佈進行排序並紀錄所花的時間
資料: [0, 1, 2, 0, 1, 2 ....]
![runtime](https://hackmd.io/_uploads/rkDd83hBR.png)
可以看到在 n < 19 的情況下 insertion sort 的表現優於 quick sort
:::danger
解釋實驗結果繪製出的曲線何以出現高低起伏。
是否引入統計模型予以修正?
:::
另一個結合 inserion sort 與 quick sort 的地方發生在判斷資料即將排序完成時,因為 insertion sort 對於已經排序好的資料只需要 $n$ 次的比較,因此相較於 quick sort 的 $n\lg{n}$ 次比較更有效率
```c
// qsort_algo()
if (swap_cnt == 0) { /* Switch to insertion sort */
r = 1 + n / 4; /* n >= 7, so r >= 2 */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;
pl -= es) {
q_swap(pl, pl - es);
if (++swap_cnt > r)
goto nevermind;
}
return;
}
```
其中 swap_cnt 在前面進行 partition 時如果有發生交換就會設為 1 ,若為 0 的話則猜測此時資料已經接近排序完成而轉向使用 insertion sort,同時在使用 insertion sort 時也會檢測交換的次數是否過多,超過一定的值代表資料還沒接近排序完成則繼續進行 partition。
- [ ] 利用 workqueue 取代遞迴
```c
nl = (pb - pa) / es;
nr = (pd - pc) / es;
if (nl > 100 && nr > 100) {
struct qsort *q = kmalloc(sizeof(struct qsort), GFP_KERNEL);
init_qsort(q, a, nl, c);
queue_work(workqueue, &q->w);
} else if (nl > 0) {
qs->a = a;
qs->n = nl;
/* The struct qsort is used for recursive call, so don't free it in
* this iteration.
*/
do_free = false;
qsort_algo(w);
}
if (nr > 0) {
a = pn - nr * es;
n = nr;
goto top;
}
```
:::danger
注意用語:
* jump 是「跳躍」,而非「跳轉」,這操作沒有「轉」
$\to$ 教育部重編國語詞典的「[轉](https://dict.revised.moe.edu.tw/dictView.jsp?ID=8114)」解釋。
:::
`nl` 和 `nr` 用來計算分割出左邊小於 pivot 的資料和右邊大於 pivot 的資料長度,若皆大於 100 則創建一個新的 work 用來繼續對左邊的資料進行分割並加入 work queue ,其餘情況針對左邊的資料進行遞迴,右邊的資料則透過 goto <s>跳轉</s> 回整個函式的開頭。所以只有當左右資料都達到一定數量後才會創建新的 work。
以下針對使用 CMWQ 的 ksort (並行版本) 和不使用 CMWQ 的 ksort (循序版本)進行執行時間的測量
資料: rand
![runtime](https://hackmd.io/_uploads/rkVc2IhL0.png)
:::danger
解釋為何改進幅度有限,運用 Amdahl's Law
:::
在並行版的 quick sort 中能夠並行化的只有遞迴的部份,其餘的諸如 pivot 的選擇和 partition 的過程都無法透過 CMWQ 的幫助得到加速
$$
S_{speedup} = \frac{1}{(1 - p) + \frac{p}{s}}
$$
這裡 Amdahl's Law 中的 p 為程式中能夠過並行加速的部份,可以看到無論並行帶來的效能<s>優ㄏ化</s> 有多大整體的效能優化
:::danger
不要濫用「優化」,見: https://hackmd.io/@sysprog/it-vocabulary
:::
$S_{speedup}$ 也會被限制在 $\frac{1}{(1 - p)}$ 。
因此可以看到圖中雖然使用了 CMWQ 後整體執行時間有下降但還是一定的限制。
#### ksort 在不同資料下的排序時間
此處分別測試了排序五種資料所需的時間,分別是 rand, sawtooth, stagger, shuffle, plateau,其分佈如下
- [ ] rand
```c
inbuf[i] = rand() % n;
```
資料分佈如下
![data_dis](https://hackmd.io/_uploads/Sy8ej6MU0.png)
- [ ] sawtooth
```c
inbuf[i] = i % 5;
```
資料分佈如下
![data_dis](https://hackmd.io/_uploads/HJd5cTG8C.png)
- [ ] stagger
```c
inbuf[i] = (i * 100 + i) % n;
```
資料分佈如下
![data_dis](https://hackmd.io/_uploads/SJjMipML0.png)
- [ ] shuffle
```c
inbuf[i] = (rand() % 2) ? (j += 2) : (k += 2);
```
資料分佈如下
![data_dis](https://hackmd.io/_uploads/rkYEjazLA.png)
- [ ] plateau
```c
inbuf[i] = ((i * 100 + i) % n < 500) ? 0 : 500;
```
資料分佈如下
![data_dis](https://hackmd.io/_uploads/ryRSAazIA.png)
測量出的執行時間為
![runtime](https://hackmd.io/_uploads/r1JPuiILC.png)
其中 sawtooth 與 plateau 都具有大量的重複元素,執行的結果也是這兩者快於其他,證明了 partition 策略的改進對於有著重複元素的資料分佈有不少幫助。
## TODO: 整合第一週測驗提到的 Timsort
> 予以並行化
## TODO: 實作 Pattern Defeating Quicksort (pdqsort)
> 比較 Timsort, pdqsort 和 Linux 核心 lib/sort.c,並確保可從使用者層級的程式對裝置檔案進行設定 (如 write 系統呼叫),讓這些排序實作存在於同一個 Linux 核心模組,並得以切換和比較。
> 過程中需要量化執行時間,可在 Linux 核心模組和使用者空間 (userspace) 中測量。在 Linux 核心模組中,可用 ktime 系列的 API,而在使用者層級可用 clock_gettime 相關 API,分別用 gnuplot 製圖。
> 善用統計模型,除去極端數值,過程中應詳述你的手法;
> 需要針對不同的情境去準備測試資料;
## TODO: 引入 PRNG
> 考慮到產生亂數的時間和可預測性,改用 xorshift128+ 作為 PRNG,並善用 xoro 核心模組。
> 資料排序要考慮到演算法最差的狀況,不只有亂數輸入。
> 針對執行結果,予以討論,附上數學分析和解讀。