# linux 期末專題
> 執行人: ChengChaoChun
## [Engineering a Sort Function](https://cs.fit.edu/~pkc/classes/writing/papers/bentley93engineering.pdf) (quick sort 的改進)
快速排序(QuickSort)是一種高效的排序演算法,由 Tony Hoare 在1960年提出。它使用分治法(Divide and Conquer)策略來排序陣列。其基本思想是通過一個樞紐元素(pivot)將陣列劃分成兩部分,左邊部分的所有元素都比樞紐元素小,右邊部分的所有元素都比樞紐元素大,然後對這兩部分分別進行遞迴排序。
### pivot key 的選擇
pivot key 用於將陣列分成兩部分,使得左側部分的所有元素都小於等於 pivot key,而右側部分的所有元素都大於等於 pivot key。最好的情況是 pivot key 左側的元素個數等於右側元素的個數,然而每次要找到這個中位數是不切實際的,因此可以隨機選擇一個元素做為 pivot key ,也許表現會比直接選擇第一個元素做 pivot key 要好。
```
void qsort1(char *a, int n, int es, int (*cmp)())
{
int j;
char *pi, *pj, *pn;
if (n <= 1) return;
pi = a + (rand() % n) * es; // 選擇隨機元素
swap(a, pi, es);
pi = a;
pj = pn = a + n * es;
for (;;) {
do pi += es; while (pi < pn && cmp(pi, a) < 0);
do pj -= es; while (cmp(pj, a) > 0);
if (pj < pi) break;
swap(pi, pj, es);
}
swap(a, pj, es);
j = (pj - a) / es;
qsort1(a, j, es, cmp);
qsort1(a + (j+1)*es, n-j-1, es, cmp);
}
```
隨機選擇一個元素做為 pivot key 的比較次數大約為 1.386n lg n,為了提高快速排序效率,作者採用 the median of the medians of three samples 方法來選擇 pivot key 。經過分析,這個 pivot key 會更接近陣列的中位數。然而,這會增加比較次數,而比較是需要時間成本的。對於較大陣列來說,這是可以接受的,但對於小陣列來說則顯得昂貴。因此,對於小陣列,作者選擇使用中間元素做為 pivot key。
```
pm = a + (n/2)*es; /* Small arrays, middle element */
if (n > 7) {
pl = a;
pn = a + (n-1)*es;
if (n > 40) { /* Big arrays, pseudomedian of 9 */
s = (n/8)*es;
pl = med3(pl, pl+s, pl+2*s, cmp);
pm = med3(pm-s, pm, pm+s, cmp);
pn = med3(pn-2*s, pn-s, pn, cmp);
}
pm = med3(pl, pm, pn, cmp); /* Mid-size, med of 3 */
}
```
### 相同元素的處理
為了讓程式在大多數輸入情況下都有良好的運行時間,我們需要處理陣列中相同的元素。

這裡使用 4 個指標 a, b, c, d , a 和 b 指向陣列左端,而 c 和 d 指向陣列右端。 b 向右掃描,遇到相等的元素時交換到索引 a 所指的元素位置,並在遇到比 pivot key 大的元素時停下來。同樣的,將索引 c 向左移動,遇到相等的元素時交換 c 和 d 所指的元素,並在遇到比 pivot key 小的元素時停下來,然後 b 指向的元素與 c 交換,直到 b 與 c 相遇。

接著將兩端等於 pivot key 的元素移動至中間,遞迴執行左右兩測的元素,直到排序完成。

```
void iqsort2(int *x, int n)
{
int a, b, c, d, l, h, s, v;
if (n <= 1) return;
v = x[rand() % n];
a = b = 0;
c=d= n-1;
// 將等於 pivot key 的元素移動至兩端,並且左側的元素均小於 pivot key ,右側均大於 pivot key
for (;;) {
while (b <= c && x[b] <= v) {
if (x[b] == v) iswap(a++, b, x);
b++;
}
while (c >= b && x[c] >= v) {
if (x[c] == v) iswap(d--, c, x);
c--;
}
if (b > c) break;
iswap(b++, c--, x);
}
// 等於 pivot key 的元素移動至中間
s = min(a, b-a);
for(l = 0, h = b-s; s; s--) iswap(l++, h++, x);
s = min(d-c, n-1-d);
for(l = b, h = n-s; s; s--) iswap(l++, h++, x);
// 遞迴執行
iqsort2(x, b-a);
iqsort2(x + n-(d-c), d-c);
}
```
### 快速排序最終版本
總結 :
* 插入排序在排序小陣列上比起快速排序更快,因此元素個數少於 7 的陣列直接使用插入排序來排序,否則使用快速排序。
* 使用快速排序時,選擇陣列中間的元素做為 pivot key ,若陣列長度大於 40 則採用 the median of the medians of three samples 方法來選擇 pivot key。
* 等於 pivot key 的元素替換至兩端。
* b 指標遇到大於 pivot key 的元素時就會跳出第一個 while 迴圈,而 c 指標遇到小於 pivot key 的元素時就會跳出第二個 while 迴圈,然後將 b 和 c 指標指向的元素交換,這部分與傳統的快速排序相同。
* b 指標與 c 指標相遇後,將左右兩端的元素移動至中間,接著遞迴執行排序。
```
void qsort(char *a, size_t n, size_t es, int (*cmp)())
{
char *pa, *pb, *pc, *pd, *pl, *pm, *pn, *pv;
int r, swaptype;
WORD t, v;
size_t s;
SWAPINIT(a, es);
if (n < 7) { /* Insertion sort on smallest arrays */
for (pm = a + es; pm < a + n*es; pm += es)
for (pl = pm; pl > a && cmp(pl-es, pl) > 0; pl -= es)
swap(pl, pl-es);
return;
}
pm = a + (n/2)*es; /* Small arrays, middle element */
if (n > 7) {
pl = a;
pn = a + (n-1)*es;
if (n > 40) { /* Big arrays, pseudomedian of 9 */
s = (n/8)*es;
pl = med3(pl, pl+s, pl+2*s, cmp);
pm = med3(pm-s, pm, pm+s, cmp);
pn = med3(pn-2*s, pn-s, pn, cmp);
}
pm = med3(pl, pm, pn, cmp); /* Mid-size, med of 3 */
}
PVINIT(pv, pm); /* pv points to partition value */
pa = pb = a;
pc = pd = a + (n-1)*es;
for (;;) {
while (pb <= pc && (r = cmp(pb, pv)) <= 0) {
if (r == 0) { swap(pa, pb); pa += es; }
pb += es;
}
while (pc >= pb && (r = cmp(pc, pv)) >= 0) {
if (r == 0) { swap(pc, pd); pd -= es; }
pc -= es;
}
if (pb > pc) break;
swap(pb, pc);
pb += es;
pc -= es;
}
pn = a + n*es;
s = min(pa-a, pb-pa ); vecswap(a, pb-s, s);
s = min(pd-pc, pn-pd-es); vecswap(pb, pn-s, s);
if ((s = pb-pa) > es) qsort(a, s/es, es, cmp);
if ((s = pd-pc) > es) qsort(pn-s, s/es, es, cmp);
}
```
## [ksort](https://github.com/sysprog21/ksort/blob/master/sort_impl.c) 排序
ksort 是一個處理並行排序的 Linux 核心模組。
### 並行排序實作 (`sort_impl.c`)
ksort 使用 [Engineering a Sort Function](https://cs.fit.edu/~pkc/classes/writing/papers/bentley93engineering.pdf) 提出的 quick sort 版本,並改進,加上 linux 核心 的 CMWQ 來實作並行排序。
在 `sort_impl.c` 的 161 行多了一個 `swap_cnt` 變數的判斷式,如果`swap_cnt` 等於 0 就切換成插入排序。往前看到 130 行到 153 行程式,可以發現一但需要使用 `q_swap` 巨集,`swap_cnt` 就會被設置為 1 。因此判斷式成立的條件是,在第一次進入 for 循環時, `pb` 指標前的元素都小於 pivot key,且 `pc` 指標之後的元素都大於 pivot key ,也就是說當前陣列有可能已經大致是有序的,此時使用插入排序可能可以達到最好的情況,即時間複雜度為 $n$。
變數 r 設置了交換次數的上限。儘管此時 `swap_cnt` 為 0 ,但是使用插入排序來排序 `pb` 指標之前的元素有可能會形成最壞的情況,即時間複雜度為 $n^2$,同樣的,排序 `pc` 指標之後的元素也可能會形成最壞的情況,因此這裡限制了交換次數的上限來保證排序的效率。
```
if (swap_cnt == 0) { /* Switch to insertion sort */
r = 1 + n / 4; /* n >= 7, so r >= 2 */
for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;pl -= es) {
q_swap(pl, pl - es);
if (++swap_cnt > r)
goto nevermind;
}
return;
}
```
nl 與 nr 分別表示 pb 指標以前待排序的元素個數與 pc 指標以後待排序的元素個數。
* 當 nl > 100 且 nr > 100 : 新建一個 work item 並放到 workqueue 來排序 pb 指標以前的元素(並行排序),而 pc 指標以後的元素則跳轉到 top 標籤排序(在當前 work item)。
* 當 nl > 0 : 遞迴執行函式來排序 pb 指標以前的元素。遞迴在當前執行流上立刻排序,而 CMWQ 是非同步的執行,因此待排序元素較少時使用遞迴更有效率。
```
nevermind:
nl = (pb - pa) / es;
nr = (pd - pc) / es;
if (nl > 100 && nr > 100) {
struct qsort *q = kmalloc(sizeof(struct qsort), GFP_KERNEL);
init_qsort(q, a, nl, c);
queue_work(workqueue, &q->w);
} else if (nl > 0) {
qs->a = a;
qs->n = nl;
/* The struct qsort is used for recursive call, so don't free it in
* this iteration.
*/
do_free = false;
qsort_algo(w);
}
if (nr > 0) {
a = pn - nr * es;
n = nr;
goto top;
}
if (do_free)
kfree(qs);
```
### 透過系統呼叫來設定排序
目前在 ksort 模組有 qsort , linux sort , 以及 pdqsort。
使用 write 系統呼叫來選擇使用的排序。當 sort_selector 為 1 時,使用 qsort ,當 sort_selector 為 2 時,使用 linux sort ,當 sort_selector 為 3 時,使用 pdqsort。
```
static int sort_selector = 0;
static ssize_t sort_write(struct file *file,
const char *buf,
size_t size,
loff_t *offset)
{
void *set_sort_buf = kmalloc(size, GFP_KERNEL);
if (!set_sort_buf)
return 0;
unsigned long len;
len = copy_from_user(set_sort_buf, buf, size);
if (len != 0)
return 0;
int set_sort = *(int *)set_sort_buf;
if(set_sort > 0 && set_sort < 4){
sort_selector = set_sort;
printk("sort_selector : %d", set_sort);
return set_sort;
}else{
printk("sort_selector error");
return 0;
}
}
```
### 排序時間比較
#### 使用者空間 (userspace) 中測量
* **降序**

* **Fisher Yates shuffling**

|  |  |
|-|-|
* **使用 xoro 核心模組**
使用 xoro 核心模組生成的偽隨機數時,程式無法正常運作,後來發現是 qsort 出現問題,還沒有找到解決辦法。

#### Linux 核心模組中測量
* **降序**

* **Fisher Yates shuffling**

|  |  |
|-|-|
* **使用 xoro 核心模組**

### ksort 改進
取消並行排序的排序速度會比並行排序要快一點點。
```
nevermind:
nl = (pb - pa) / es;
nr = (pd - pc) / es;
if (nl > 0) {
qs->a = a;
qs->n = nl;
/* The struct qsort is used for recursive call, so don't free it in
* this iteration.
*/
do_free = false;
qsort_algo(w);
}
if (nr > 0) {
a = pn - nr * es;
n = nr;
goto top;
}
if (do_free)
kfree(qs);
```
* 比較結果 (userspace)


* 比較結果 (kernelspace)


## [simrupt](https://github.com/sysprog21/simrupt) 研究
simrupt 是一個利用定時器模擬中斷的裝置,其實作技術包括以下幾個方面:
* 資料結構 (kfifo , circular buffer)
* Interrupt Handler Top Half (timer)
* Bottom Half (CMWQ, tasklet)
### linux 中斷處理
在Linux系統中,中斷處理常式通常被分為 Top Half 和 Bottom Half,這種分層處理有助於保證系統的快速響應和高效率。
* Top Half :是直接處理中斷事件的部分。它必須快速執行,通常用於執行必要的最小程式碼來快速響應中斷。
* Bottom Half :用於延遲處理的部分,可以在稍後的時間點執行。這是為了避免長時間處於 interupt context ,從而提高系統的響應性和效率。 Bottom Half 的實作方式有 Softirq, Tasklet , Workqueue。
### 運行 simrupt
掛載 simrupt 模組。
```
sudo insmod simrupt.ko
```
執行下面命令
```
sudo cat /dev/simrupt
```
輸出 :
```
!"#$%&'()*+,-./012345 .....
```
### 分析 simrupt
在掛載 simrupt 模組之後,系統會執行模組初始化。詳細過程參考 [simrupt_init](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-c#simrupt_init)。
在模組初始化中,設置定時器。
```
/* Setup the timer */
timer_setup(&timer, timer_handler, 0);
```
執行 `sudo cat /dev/simrupt` 命令後,首先執行 open 系統呼叫接著再執行 read 系統呼叫。
當前只有一個 process 在使用 simrupt 時,便會設置定時器超時時間。
```
static int simrupt_open(struct inode *inode, struct file *filp)
{
pr_debug("simrupt: %s\n", __func__);
if (atomic_inc_return(&open_cnt) == 1)
mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); //設置定時器超時時間。
pr_info("open current cnt: %d\n", atomic_read(&open_cnt));
return 0;
}
```
設置好定時器後,每當定時器超時,就執行 每當產生定時器中斷時,便執行`timer_handler` 函式。

`timer_handler` 函式部分內容 :
```
/* We are using a kernel timer to simulate a hard-irq, so we must expect
* to be in softirq context here.
*/
WARN_ON_ONCE(!in_softirq());
/* Disable interrupts for this CPU to simulate real interrupt context */
local_irq_disable();
tv_start = ktime_get();
process_data();
tv_end = ktime_get();
nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start));
pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(),
__func__, (unsigned long long) nsecs >> 10);
mod_timer(&timer, jiffies + msecs_to_jiffies(delay));
local_irq_enable();
```
在上面提到中斷處理過程可以分為 Top Half 和 Bottom Half 。 Top Half 用來快速處理中斷,剩下的複雜處理程序在交由 Bottom Half 處理。
由於 Bottom Half 的處理時間較長,為了提高系統的響應速度,因此 Bottom Half 的執行可以被中斷。相對而言,Top Half 的執行是不可中斷的,不過由於它們執行速度快,因此也不會對作業系統的效率產生太大影響,這也是將中斷處理過程分成 Top Half 和 Bottom Half 的原因。
在 `timer_handler` 函式中,需要確保當前在系統運行在 Bottom Half (可被中斷),這樣才能執行 Top Half 。執行 Top Half 時先關閉中斷,結束後才把中斷打開, 這裡的 `process_data()` 就是 Top Half。
`process_data` 函式 :
```
static void process_data(void)
{
WARN_ON_ONCE(!irqs_disabled());
pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
fast_buf_put(update_simrupt_data());
pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
tasklet_schedule(&simrupt_tasklet);
}
```
`process_data` 要做的只是產生一筆資料然後放到 circuler buffer,後續部分就交給 Bottom Half。
`simrupt_tasklet_func` 函式部分內容
```
static DECLARE_WORK(work, simrupt_work_func);
static void simrupt_tasklet_func(unsigned long __data)
{
.
.
queue_work(simrupt_workqueue, &work);
.
.
}
```
將 `simrupt_work_func` 放到 workqueue。
`simrupt_work_func` 函式部分內容 :
```
/* Workqueue handler: executed by a kernel thread */
static void simrupt_work_func(struct work_struct *w)
{
.
.
while (1) {
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
if (val < 0)
break;
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
}
wake_up_interruptible(&rx_wait);
}
```
`simrupt_work_func` 從 circular buffer 讀取資料,然後放到 kfifo。 。
前面提到,執行 cat 命令後會首先執行 open 系統呼叫,接著會不斷執行 read 系統呼叫。因此,每當 kfifo 有新資料時,就會被讀取出來,從而看到 simrupt 不斷輸出<s>字符</s>字符。
:::danger
注意用語:
* character 是「字元」,不是「字符」
* interrupt service routine 是「中斷處理常式」,而非「中斷處理程序」
* implement 是「實作」,而非「實現」
:::