# linux 期末專題 > 執行人: ChengChaoChun ## [Engineering a Sort Function](https://cs.fit.edu/~pkc/classes/writing/papers/bentley93engineering.pdf) (quick sort 的改進) 快速排序(QuickSort)是一種高效的排序演算法,由 Tony Hoare 在1960年提出。它使用分治法(Divide and Conquer)策略來排序陣列。其基本思想是通過一個樞紐元素(pivot)將陣列劃分成兩部分,左邊部分的所有元素都比樞紐元素小,右邊部分的所有元素都比樞紐元素大,然後對這兩部分分別進行遞迴排序。 ### pivot key 的選擇 pivot key 用於將陣列分成兩部分,使得左側部分的所有元素都小於等於 pivot key,而右側部分的所有元素都大於等於 pivot key。最好的情況是 pivot key 左側的元素個數等於右側元素的個數,然而每次要找到這個中位數是不切實際的,因此可以隨機選擇一個元素做為 pivot key ,也許表現會比直接選擇第一個元素做 pivot key 要好。 ``` void qsort1(char *a, int n, int es, int (*cmp)()) { int j; char *pi, *pj, *pn; if (n <= 1) return; pi = a + (rand() % n) * es; // 選擇隨機元素 swap(a, pi, es); pi = a; pj = pn = a + n * es; for (;;) { do pi += es; while (pi < pn && cmp(pi, a) < 0); do pj -= es; while (cmp(pj, a) > 0); if (pj < pi) break; swap(pi, pj, es); } swap(a, pj, es); j = (pj - a) / es; qsort1(a, j, es, cmp); qsort1(a + (j+1)*es, n-j-1, es, cmp); } ``` 隨機選擇一個元素做為 pivot key 的比較次數大約為 1.386n lg n,為了提高快速排序效率,作者採用 the median of the medians of three samples 方法來選擇 pivot key 。經過分析,這個 pivot key 會更接近陣列的中位數。然而,這會增加比較次數,而比較是需要時間成本的。對於較大陣列來說,這是可以接受的,但對於小陣列來說則顯得昂貴。因此,對於小陣列,作者選擇使用中間元素做為 pivot key。 ``` pm = a + (n/2)*es; /* Small arrays, middle element */ if (n > 7) { pl = a; pn = a + (n-1)*es; if (n > 40) { /* Big arrays, pseudomedian of 9 */ s = (n/8)*es; pl = med3(pl, pl+s, pl+2*s, cmp); pm = med3(pm-s, pm, pm+s, cmp); pn = med3(pn-2*s, pn-s, pn, cmp); } pm = med3(pl, pm, pn, cmp); /* Mid-size, med of 3 */ } ``` ### 相同元素的處理 為了讓程式在大多數輸入情況下都有良好的運行時間,我們需要處理陣列中相同的元素。 ![螢幕擷取畫面 2024-06-06 132317](https://hackmd.io/_uploads/BkbGOTCE0.png) 這裡使用 4 個指標 a, b, c, d , a 和 b 指向陣列左端,而 c 和 d 指向陣列右端。 b 向右掃描,遇到相等的元素時交換到索引 a 所指的元素位置,並在遇到比 pivot key 大的元素時停下來。同樣的,將索引 c 向左移動,遇到相等的元素時交換 c 和 d 所指的元素,並在遇到比 pivot key 小的元素時停下來,然後 b 指向的元素與 c 交換,直到 b 與 c 相遇。 ![螢幕擷取畫面 2024-06-06 132324](https://hackmd.io/_uploads/rJDnoTRV0.png) 接著將兩端等於 pivot key 的元素移動至中間,遞迴執行左右兩測的元素,直到排序完成。 ![螢幕擷取畫面 2024-06-06 144716](https://hackmd.io/_uploads/HyiYiRR40.png) ``` void iqsort2(int *x, int n) { int a, b, c, d, l, h, s, v; if (n <= 1) return; v = x[rand() % n]; a = b = 0; c=d= n-1; // 將等於 pivot key 的元素移動至兩端,並且左側的元素均小於 pivot key ,右側均大於 pivot key for (;;) { while (b <= c && x[b] <= v) { if (x[b] == v) iswap(a++, b, x); b++; } while (c >= b && x[c] >= v) { if (x[c] == v) iswap(d--, c, x); c--; } if (b > c) break; iswap(b++, c--, x); } // 等於 pivot key 的元素移動至中間 s = min(a, b-a); for(l = 0, h = b-s; s; s--) iswap(l++, h++, x); s = min(d-c, n-1-d); for(l = b, h = n-s; s; s--) iswap(l++, h++, x); // 遞迴執行 iqsort2(x, b-a); iqsort2(x + n-(d-c), d-c); } ``` ### 快速排序最終版本 總結 : * 插入排序在排序小陣列上比起快速排序更快,因此元素個數少於 7 的陣列直接使用插入排序來排序,否則使用快速排序。 * 使用快速排序時,選擇陣列中間的元素做為 pivot key ,若陣列長度大於 40 則採用 the median of the medians of three samples 方法來選擇 pivot key。 * 等於 pivot key 的元素替換至兩端。 * b 指標遇到大於 pivot key 的元素時就會跳出第一個 while 迴圈,而 c 指標遇到小於 pivot key 的元素時就會跳出第二個 while 迴圈,然後將 b 和 c 指標指向的元素交換,這部分與傳統的快速排序相同。 * b 指標與 c 指標相遇後,將左右兩端的元素移動至中間,接著遞迴執行排序。 ``` void qsort(char *a, size_t n, size_t es, int (*cmp)()) { char *pa, *pb, *pc, *pd, *pl, *pm, *pn, *pv; int r, swaptype; WORD t, v; size_t s; SWAPINIT(a, es); if (n < 7) { /* Insertion sort on smallest arrays */ for (pm = a + es; pm < a + n*es; pm += es) for (pl = pm; pl > a && cmp(pl-es, pl) > 0; pl -= es) swap(pl, pl-es); return; } pm = a + (n/2)*es; /* Small arrays, middle element */ if (n > 7) { pl = a; pn = a + (n-1)*es; if (n > 40) { /* Big arrays, pseudomedian of 9 */ s = (n/8)*es; pl = med3(pl, pl+s, pl+2*s, cmp); pm = med3(pm-s, pm, pm+s, cmp); pn = med3(pn-2*s, pn-s, pn, cmp); } pm = med3(pl, pm, pn, cmp); /* Mid-size, med of 3 */ } PVINIT(pv, pm); /* pv points to partition value */ pa = pb = a; pc = pd = a + (n-1)*es; for (;;) { while (pb <= pc && (r = cmp(pb, pv)) <= 0) { if (r == 0) { swap(pa, pb); pa += es; } pb += es; } while (pc >= pb && (r = cmp(pc, pv)) >= 0) { if (r == 0) { swap(pc, pd); pd -= es; } pc -= es; } if (pb > pc) break; swap(pb, pc); pb += es; pc -= es; } pn = a + n*es; s = min(pa-a, pb-pa ); vecswap(a, pb-s, s); s = min(pd-pc, pn-pd-es); vecswap(pb, pn-s, s); if ((s = pb-pa) > es) qsort(a, s/es, es, cmp); if ((s = pd-pc) > es) qsort(pn-s, s/es, es, cmp); } ``` ## [ksort](https://github.com/sysprog21/ksort/blob/master/sort_impl.c) 排序 ksort 是一個處理並行排序的 Linux 核心模組。 ### 並行排序實作 (`sort_impl.c`) ksort 使用 [Engineering a Sort Function](https://cs.fit.edu/~pkc/classes/writing/papers/bentley93engineering.pdf) 提出的 quick sort 版本,並改進,加上 linux 核心 的 CMWQ 來實作並行排序。 在 `sort_impl.c` 的 161 行多了一個 `swap_cnt` 變數的判斷式,如果`swap_cnt` 等於 0 就切換成插入排序。往前看到 130 行到 153 行程式,可以發現一但需要使用 `q_swap` 巨集,`swap_cnt` 就會被設置為 1 。因此判斷式成立的條件是,在第一次進入 for 循環時, `pb` 指標前的元素都小於 pivot key,且 `pc` 指標之後的元素都大於 pivot key ,也就是說當前陣列有可能已經大致是有序的,此時使用插入排序可能可以達到最好的情況,即時間複雜度為 $n$。 變數 r 設置了交換次數的上限。儘管此時 `swap_cnt` 為 0 ,但是使用插入排序來排序 `pb` 指標之前的元素有可能會形成最壞的情況,即時間複雜度為 $n^2$,同樣的,排序 `pc` 指標之後的元素也可能會形成最壞的情況,因此這裡限制了交換次數的上限來保證排序的效率。 ``` if (swap_cnt == 0) { /* Switch to insertion sort */ r = 1 + n / 4; /* n >= 7, so r >= 2 */ for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) for (pl = pm; pl > (char *) a && CMP(thunk, pl - es, pl) > 0;pl -= es) { q_swap(pl, pl - es); if (++swap_cnt > r) goto nevermind; } return; } ``` nl 與 nr 分別表示 pb 指標以前待排序的元素個數與 pc 指標以後待排序的元素個數。 * 當 nl > 100 且 nr > 100 : 新建一個 work item 並放到 workqueue 來排序 pb 指標以前的元素(並行排序),而 pc 指標以後的元素則跳轉到 top 標籤排序(在當前 work item)。 * 當 nl > 0 : 遞迴執行函式來排序 pb 指標以前的元素。遞迴在當前執行流上立刻排序,而 CMWQ 是非同步的執行,因此待排序元素較少時使用遞迴更有效率。 ``` nevermind: nl = (pb - pa) / es; nr = (pd - pc) / es; if (nl > 100 && nr > 100) { struct qsort *q = kmalloc(sizeof(struct qsort), GFP_KERNEL); init_qsort(q, a, nl, c); queue_work(workqueue, &q->w); } else if (nl > 0) { qs->a = a; qs->n = nl; /* The struct qsort is used for recursive call, so don't free it in * this iteration. */ do_free = false; qsort_algo(w); } if (nr > 0) { a = pn - nr * es; n = nr; goto top; } if (do_free) kfree(qs); ``` ### 透過系統呼叫來設定排序 目前在 ksort 模組有 qsort , linux sort , 以及 pdqsort。 使用 write 系統呼叫來選擇使用的排序。當 sort_selector 為 1 時,使用 qsort ,當 sort_selector 為 2 時,使用 linux sort ,當 sort_selector 為 3 時,使用 pdqsort。 ``` static int sort_selector = 0; static ssize_t sort_write(struct file *file, const char *buf, size_t size, loff_t *offset) { void *set_sort_buf = kmalloc(size, GFP_KERNEL); if (!set_sort_buf) return 0; unsigned long len; len = copy_from_user(set_sort_buf, buf, size); if (len != 0) return 0; int set_sort = *(int *)set_sort_buf; if(set_sort > 0 && set_sort < 4){ sort_selector = set_sort; printk("sort_selector : %d", set_sort); return set_sort; }else{ printk("sort_selector error"); return 0; } } ``` ### 排序時間比較 #### 使用者空間 (userspace) 中測量 * **降序** ![螢幕擷取畫面 2024-06-22 181633](https://hackmd.io/_uploads/SyAiVmVUR.png) * **Fisher Yates shuffling** ![螢幕擷取畫面 2024-06-22 181906](https://hackmd.io/_uploads/HJZvB7VIA.png) | ![螢幕擷取畫面 2024-06-22 185425](https://hackmd.io/_uploads/ryyR6XEI0.png) | ![螢幕擷取畫面 2024-06-22 185446](https://hackmd.io/_uploads/rkSmAXNI0.png) | |-|-| * **使用 xoro 核心模組** 使用 xoro 核心模組生成的偽隨機數時,程式無法正常運作,後來發現是 qsort 出現問題,還沒有找到解決辦法。 ![螢幕擷取畫面 2024-06-22 184426](https://hackmd.io/_uploads/HyPHo74U0.png) #### Linux 核心模組中測量 * **降序** ![螢幕擷取畫面 2024-06-22 181558](https://hackmd.io/_uploads/ByWiVQNI0.png) * **Fisher Yates shuffling** ![螢幕擷取畫面 2024-06-22 181935](https://hackmd.io/_uploads/B1avr7ELA.png) | ![螢幕擷取畫面 2024-06-22 185201](https://hackmd.io/_uploads/SkmWa7V8R.png) | ![螢幕擷取畫面 2024-06-22 184802](https://hackmd.io/_uploads/B1FO2X4LR.png) | |-|-| * **使用 xoro 核心模組** ![螢幕擷取畫面 2024-06-22 184457](https://hackmd.io/_uploads/HJnvs7ELR.png) ### ksort 改進 取消並行排序的排序速度會比並行排序要快一點點。 ``` nevermind: nl = (pb - pa) / es; nr = (pd - pc) / es; if (nl > 0) { qs->a = a; qs->n = nl; /* The struct qsort is used for recursive call, so don't free it in * this iteration. */ do_free = false; qsort_algo(w); } if (nr > 0) { a = pn - nr * es; n = nr; goto top; } if (do_free) kfree(qs); ``` * 比較結果 (userspace) ![螢幕擷取畫面 2024-06-27 023651](https://hackmd.io/_uploads/SyFJekqUC.png) ![螢幕擷取畫面 2024-06-27 023849](https://hackmd.io/_uploads/S1Swxy5L0.png) * 比較結果 (kernelspace) ![螢幕擷取畫面 2024-06-27 023623](https://hackmd.io/_uploads/Sy7gg15IR.png) ![螢幕擷取畫面 2024-06-27 023824](https://hackmd.io/_uploads/ry4LekqIR.png) ## [simrupt](https://github.com/sysprog21/simrupt) 研究 simrupt 是一個利用定時器模擬中斷的裝置,其實作技術包括以下幾個方面: * 資料結構 (kfifo , circular buffer) * Interrupt Handler Top Half (timer) * Bottom Half (CMWQ, tasklet) ### linux 中斷處理 在Linux系統中,中斷處理常式通常被分為 Top Half 和 Bottom Half,這種分層處理有助於保證系統的快速響應和高效率。 * Top Half :是直接處理中斷事件的部分。它必須快速執行,通常用於執行必要的最小程式碼來快速響應中斷。 * Bottom Half :用於延遲處理的部分,可以在稍後的時間點執行。這是為了避免長時間處於 interupt context ,從而提高系統的響應性和效率。 Bottom Half 的實作方式有 Softirq, Tasklet , Workqueue。 ### 運行 simrupt 掛載 simrupt 模組。 ``` sudo insmod simrupt.ko ``` 執行下面命令 ``` sudo cat /dev/simrupt ``` 輸出 : ``` !"#$%&'()*+,-./012345 ..... ``` ### 分析 simrupt 在掛載 simrupt 模組之後,系統會執行模組初始化。詳細過程參考 [simrupt_init](https://hackmd.io/@sysprog/linux2024-integration/%2F%40sysprog%2Flinux2024-integration-c#simrupt_init)。 在模組初始化中,設置定時器。 ``` /* Setup the timer */ timer_setup(&timer, timer_handler, 0); ``` 執行 `sudo cat /dev/simrupt` 命令後,首先執行 open 系統呼叫接著再執行 read 系統呼叫。 當前只有一個 process 在使用 simrupt 時,便會設置定時器超時時間。 ``` static int simrupt_open(struct inode *inode, struct file *filp) { pr_debug("simrupt: %s\n", __func__); if (atomic_inc_return(&open_cnt) == 1) mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); //設置定時器超時時間。 pr_info("open current cnt: %d\n", atomic_read(&open_cnt)); return 0; } ``` 設置好定時器後,每當定時器超時,就執行 每當產生定時器中斷時,便執行`timer_handler` 函式。 ![螢幕擷取畫面 2024-06-25 200954](https://hackmd.io/_uploads/BkU27EuIA.png) `timer_handler` 函式部分內容 : ``` /* We are using a kernel timer to simulate a hard-irq, so we must expect * to be in softirq context here. */ WARN_ON_ONCE(!in_softirq()); /* Disable interrupts for this CPU to simulate real interrupt context */ local_irq_disable(); tv_start = ktime_get(); process_data(); tv_end = ktime_get(); nsecs = (s64) ktime_to_ns(ktime_sub(tv_end, tv_start)); pr_info("simrupt: [CPU#%d] %s in_irq: %llu usec\n", smp_processor_id(), __func__, (unsigned long long) nsecs >> 10); mod_timer(&timer, jiffies + msecs_to_jiffies(delay)); local_irq_enable(); ``` 在上面提到中斷處理過程可以分為 Top Half 和 Bottom Half 。 Top Half 用來快速處理中斷,剩下的複雜處理程序在交由 Bottom Half 處理。 由於 Bottom Half 的處理時間較長,為了提高系統的響應速度,因此 Bottom Half 的執行可以被中斷。相對而言,Top Half 的執行是不可中斷的,不過由於它們執行速度快,因此也不會對作業系統的效率產生太大影響,這也是將中斷處理過程分成 Top Half 和 Bottom Half 的原因。 在 `timer_handler` 函式中,需要確保當前在系統運行在 Bottom Half (可被中斷),這樣才能執行 Top Half 。執行 Top Half 時先關閉中斷,結束後才把中斷打開, 這裡的 `process_data()` 就是 Top Half。 `process_data` 函式 : ``` static void process_data(void) { WARN_ON_ONCE(!irqs_disabled()); pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id()); fast_buf_put(update_simrupt_data()); pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id()); tasklet_schedule(&simrupt_tasklet); } ``` `process_data` 要做的只是產生一筆資料然後放到 circuler buffer,後續部分就交給 Bottom Half。 `simrupt_tasklet_func` 函式部分內容 ``` static DECLARE_WORK(work, simrupt_work_func); static void simrupt_tasklet_func(unsigned long __data) { . . queue_work(simrupt_workqueue, &work); . . } ``` 將 `simrupt_work_func` 放到 workqueue。 `simrupt_work_func` 函式部分內容 : ``` /* Workqueue handler: executed by a kernel thread */ static void simrupt_work_func(struct work_struct *w) { . . while (1) { /* Consume data from the circular buffer */ mutex_lock(&consumer_lock); val = fast_buf_get(); mutex_unlock(&consumer_lock); if (val < 0) break; /* Store data to the kfifo buffer */ mutex_lock(&producer_lock); produce_data(val); mutex_unlock(&producer_lock); } wake_up_interruptible(&rx_wait); } ``` `simrupt_work_func` 從 circular buffer 讀取資料,然後放到 kfifo。 。 前面提到,執行 cat 命令後會首先執行 open 系統呼叫,接著會不斷執行 read 系統呼叫。因此,每當 kfifo 有新資料時,就會被讀取出來,從而看到 simrupt 不斷輸出<s>字符</s>字符。 :::danger 注意用語: * character 是「字元」,不是「字符」 * interrupt service routine 是「中斷處理常式」,而非「中斷處理程序」 * implement 是「實作」,而非「實現」 :::