### 測驗 `1` 參考 [Optimized QuickSort — C Implementation (Non-Recursive)](https://alienryderflex.com/quicksort/),實作非遞迴 (non-recursive; iterative) 的快速排序法。此處提到的方法是以 swap 為主體,利用 `L` 與 `R` 去紀錄需交換的數量,再用 `begin[]` 與 `end[]` 作為堆疊,用來紀錄比較的範圍。 假定下方是起始的陣列內容。採取 head 作為 pivot,並且 L 會從左邊開始掃,紀錄有多少值小於 pivot,R 從右邊開始,紀錄有多少值大於 pivot。(R 一開始會定義為陣列的 length,以減法的方式紀錄) > 這裡的 L 就會是 3,而 R 就會是 5 ```graphviz digraph graphname{ node[shape=box] head[shape=plaintext,fontcolor=red] pivot[shape=plaintext,fontcolor=red] L[shape=plaintext,fontcolor=green] R[shape=plaintext,fontcolor=green] rankdir=LR { rankdir = LR P[label=4] A[label=4] B[label=1] C[label=3] D[label=5] E[label=2] F[label=7] NULL1[label=NULL,shape=plaintext] } { rank="same" pivot->P head->A } { rank="same" L->D } { rank="same" R->E } A->B->C->D->E->F->NULL1 } ``` 於是會把 array[4] 放到 array[0],array[3] 放到 array[4],再把 pivot 放入 array[3] ```graphviz digraph graphname{ node[shape=box] head[shape=plaintext,fontcolor=red] pivot[shape=plaintext,fontcolor=red] rankdir=LR { rankdir = LR P[label=4] A[label=2] B[label=1] C[label=3] D[label=4] E[label=5] F[label=7] NULL1[label=NULL,shape=plaintext] } { rank="same" head->A pivot->P } A->B->C->D->E->F->NULL1 } ``` 再來就是利用 begin 與 end 作為堆疊,去儲存尚未排列好的序列,由於 pivot 目前在 array[3] 的位置,所以就會有一組 begin 跟 end 為 0~2、一組為 4~5。之後會重複上述的行為,一次會以 5 為 pivot,另一次則會以 2 為 pivot,就可以得到排序好的陣列。 ```graphviz digraph graphname{ node[shape=box] head[shape=plaintext,fontcolor=red] rankdir=LR { rankdir = LR A[label=1] B[label=2] C[label=3] D[label=4] E[label=5] F[label=7] NULL1[label=NULL,shape=plaintext] } { rank="same" head->A } A->B->C->D->E->F->NULL1 } ``` 用變數 `i` 紀錄現在的 `stack` 在什麼位置,每次迴圈的開始都進行 pop 的操作: ```c node_t *top = stk[i--]; ``` 依照相對於 `pivot` 大小整理節點的步驟沒有不同。而分類完 `left` 和 `right` 之後,本來應該各自遞迴計算的部分換成「放進堆疊中」,換言之,這份程式碼嘗試用堆疊模擬原本的遞迴呼叫。 考慮以下的鏈結串列結構體: ```c typedef struct __node { struct __node *left, *right; struct __node *next; long value; } node_t; ``` 對應的操作函式如下: (部分遮蔽,亦即 `AAAA` 和 `BBBB`) ```c void list_add(node_t **list, node_t *node_t) { node_t->next = *list; *list = node_t; } node_t *list_tail(node_t **left) { while ((*left) && (*left)->next) left = &(AAAA); return *left; } int list_length(node_t **left) { int n = 0; while (*left) { ++n; left = &(BBBB); } return n; } node_t *list_construct(node_t *list, int n) { node_t *node = malloc(sizeof(node_t)); node->next = list; node->value = n; return node; } void list_free(node_t **list) { node_t *node = (*list)->next; while (*list) { free(*list); *list = node; if (node) node = node->next; } } ``` 當快速排序的實作程式碼可用時,我們準備以下程式碼: ```c #include <assert.h> #include <stdbool.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> /* Verify if list is order */ static bool list_is_ordered(node_t *list) { bool first = true; int value; while (list) { if (first) { value = list->value; first = false; } else { if (list->value < value) return false; value = list->value; } list = list->next; } return true; } /* shuffle array, only work if n < RAND_MAX */ void shuffle(int *array, size_t n) { if (n <= 0) return; for (size_t i = 0; i < n - 1; i++) { size_t j = i + rand() / (RAND_MAX / (n - i) + 1); int t = array[j]; array[j] = array[i]; array[i] = t; } } int main(int argc, char **argv) { node_t *list = NULL; size_t count = 100000; int *test_arr = malloc(sizeof(int) * count); for (int i = 0; i < count; ++i) test_arr[i] = i; shuffle(test_arr, count); while (count--) list = list_construct(list, test_arr[count]); quick_sort(&list); assert(list_is_ordered(list)); list_free(&list); free(test_arr); return; } ``` 以下是運用 [Optimized QuickSort — C Implementation (Non-Recursive)](https://alienryderflex.com/quicksort/) 所實作非遞迴 (non-recursive; iterative) 的快速排序法程式碼: (部分遮蔽) ```c void quick_sort(node_t **list) { int n = list_length(list); int value; int i = 0; int max_level = 2 * n; node_t *begin[max_level], *end[max_level]; node_t *result = NULL, *left = NULL, *right = NULL; begin[0] = *list; end[0] = list_tail(list); while (i >= 0) { node_t *L = begin[i], *R = end[i]; if (L != R) { node_t *pivot = L; value = pivot->value; node_t *p = pivot->next; pivot->next = NULL; while (p) { node_t *n = p; p = CCCC; list_add(n->value > value ? &right : &left, n); } begin[i] = left; end[i] = DDDD; begin[i + 1] = pivot; end[i + 1] = pivot; begin[i + 2] = right; end[i + 2] = EEEE; left = right = NULL; i += 2; } else { if (L) list_add(&result, L); i--; } } *list = result; } ``` 假設 `malloc` 總是成功,且不會觸發 [assert](https://man7.org/linux/man-pages/man3/assert.3.html)。請補完程式碼,使其得以符合預期運作。作答規範: * 全部應以最簡潔的形式撰寫,避免空白字元 :::success 延伸問題: 1. 解釋上述程式碼的運作原理,提出改進方案並予以實作。 2. 使用 Linux 核心風格的 List API 改寫上述程式碼,並針對鏈結串列,提出可避免最差狀況的快速排序實作,應設計效能評比的測試程式。 ::: --- ### 測驗 `2` Tim Peter 他結合合併排序和插入排序的特點,提出 [Timsort](https://en.wikipedia.org/wiki/Timsort),如今已廣泛用於 Python, Android, Java, Chrome 及 Swift 等場景。Tim Peter 注意到,在現實世界中,許多資料往往是由一些已經部分排序的序列組合而成,於是他的策略是首先識別出資料中已排序的子序列 (這些子序列稱為 run),然後透過不斷合併這些 run 來達成全資料範圍的排序。對於隨機資料,最佳的排序效率上限被約束在 $O(n\log{n})$,但即便是在 $O(n\log{n})$ 等級的排序演算法中,其效能與理論上的極限值 $O(n\log{n})$ 之間仍存在差距。Timsort 在處理隨機資料時,表現略遜一籌,但對於部分已排序的資料則有出色的表現。 Timsort 致力於從以下三個方面,改進合併排序演算法: 1. 加快合併(merge)過程 2. 減少進行合併的次數。 3. 在某些特定情況下,尋找比合併排序更有效的排序方法 考慮以下資料: $$ {5, 6, 7, 8, 9, 10, 1, 2, 3} $$ 如何藉由最少的合併次數,來排序這個數列呢? 直觀上,將數列分為兩部分:已按升序排列的 ${5, 6, 7, 8, 9, 10}$ 和 ${1, 2, 3}$,然後將這二部分合併,僅需一次合併操作,即可完成排序。 考慮略為複雜的案例: $$ {1,2,3,4,3,2,4,7,8} $$ 這由 3 個 run 構成: * ${1,2,3,4}$ * ${3,2}$ * ${4,7,8}$ 三者都是單調的 (monotonic),在實際程式中,對於單調遞減的 run,會被反轉成遞增的序列。 ![0-run-1](https://hackmd.io/_uploads/H1ifAW8np.gif) 合併序列時,若 run 的數量等於或者略小於 2 的冪,效率會最高;若略大於 2 的冪,效率就會特別低。Timsort 為了使合併過程更加均衡,需要儘量控制每個 run 的長度,定義一個 minrun (最小運行長度),用以表示每個 run 的最小長度,若長度太短,就用二分插入排序,將 run 後面的元素插入到前面的 run 裡面。 以上面的案例來說,若 minrun=5,於是首個 run 不符合要求,就會把後面的 3 插入到首個 run 裡面,變成 $$ {1,2,3,3,4} $$ ![0-run-2](https://hackmd.io/_uploads/Sy4mCWU36.gif) 顯然,選擇 minrun 的策略是 Timsort 的關鍵,其中一個實務上的方法是,取資料長度 $N$ 的前 6 位,若剩餘位中任何一位被設置,則加 1。這主要是為了在處理隨機數列時,使得 $N$ 能夠被 minrun 切分成 2 的冪的 run,以便在合併時達到最佳平衡。 minrun 的設計目的是讓剩餘資料在分割為 minrun 時,能夠盡可能接近 2 的冪,從而使得後續的合併過程更高效;實際操作中,會不斷將 n 除以 2,直到 n 小於一個預設的最小合併長度(`MIN_MERGE`),過程中只要有任何一次不能整除 2,最終結果就加一,這種方法確保分組接近 2 的冪。 例如,當 $N$ 為 2112 時,若 minrun 設為 32,則會切分成 66 個 run,合併時會產生 2048 + 64 的不平衡;但如果 minrun 設為 33,則會被切分成 64 個 run,其分割結果為 2 的冪,合併時達到完美平衡。 Timsort 採用一組堆疊 (stack) 來暫存 run,此舉是為了避免在一開始就掃描整個資料範圍來產生 run,從而降低記憶體開銷。過程中,Timsort 運用 `merge_collapse` 函式來確保堆疊上的 run 長度保持平衡。該函式主要檢查堆疊頂端的 3 個 run 是否滿足以下原則: 1. A 的長度要大於 B 和 C 的長度總和。 2. B 的長度要大於 C 的長度。 當不符合這些條件時,函式會決定是否進行合併。例如,考慮 3 段 run: A 的長度為 30,B 的長度為 20,C 的長度為 10,由於 A 的長度不大於 B 加 C 的總和(亦即 $A <= B + C$),且 C 的長度小於 A,因此會選擇將 C 與 B 進行合併,形成兩個新的 run * A: 30 * BC: 30。 藉此,不僅確保堆疊中 run 的長度平衡,且因合併只能在相鄰的兩個 run 間進行,以確保排序的穩定性,因此合併操作僅可能採取 A+(B+C) 或 (A+B)+C 的形式進行。此舉讓 Timsort 在執行時兼顧高效又穩定。 處理兩個相鄰子數列的合併過程中,直接在記憶體中操作會遇到一定的挑戰,因為大量的記憶體操作不僅實作困難,且效率也未必理想。因此,Timsort 採取了使用一塊臨時記憶區的策略,其大小設定為兩個子數列(A、B)中較短者的長度。 當 A 的長度小於 B 時,會先將 A 數列暫存。直觀的合併方法是從 A 和 B 的開頭開始進行逐對比較,將較小的元素放置於 A 的原位置,這一過程一直持續到 A 和 B 完全排序,類似於經典的合併排序類似,也稱為逐對合併(one-pair-at-a-time)。 然而,考慮到 A 和 B 都是已排序好的數列,我們可持續改進:首先尋找 B 的首個元素(即 $B_0$)在 A 中的排序位置,從而確定 A 中有一段是小於 $B_0$ 者,可將這部分元素放回 A。接著,尋找剩餘 A 的首個元素在 B 中的位置,如此反覆進行,直到完成排序。這種方法被稱為 Galloping mode。 例如,考慮以下: * A: ${1,2,3,7,8,9}$ * B: ${4,5,6,8,9,10,11}$ 顯然 A 較短,因此先將 A 放入暫存記憶區 `temp`。接著找到 $B_0$ 在 A 中的位置,即 $A_2$ 和 $A_3$ 之間,因 $B_0 > A_2$,所以 $A_0$ 到 $A_2$ 可以直接放回 A 數列。然後,將 `temp` 的首個元素放到 $B$ 中適當的位置,如此反覆進行。 這種改進方案在多數情況效果顯著,但對於隨機資料序列而言,可能會比逐對合併更慢。故在 Timsort 會有一個機制決定是否要啟動 Galloping mode 。 Timsort 相較於合併排序,有以下改善。 - [ ] 降低 cache miss 傳統合併排序的問題: * 傳統的合併排序非遞迴實作中,會從合併樹的最底層開始合併,底層合併完才進行較上層的合併。如此的缺點是,每進行其中一層的合併時,就要掃過整個序列進行合併。因此對 cache 不友善。 > 在 Linux 核心有對此進行改進,不會掃描整個序列才進行合併,而是把元素加入 Pending List 的過程中,趁這些元素還在記憶體階層上層 (如 L1 cache) 時,在適當時機就合併。 * 用合併樹的概念來說明遞迴版本的合併排序實作:在合併樹上的每一個節點,可以代表在遞迴中一次函式呼叫。若去追綜合併順序時,會發現遞迴版本相較於非遞迴版本,會先處理完左子樹後,再去處理右子樹,最後在把二個子樹合併。故合併時,二個子樹的內容都可能還在上層的記憶體階層中。在連續記憶體的情境,其合併的順序相對於非遞迴版對 cache 較友善,但對鏈結串列一類非連續性記憶體而言,進行分治(divide) 的過程中,意即決定要切在哪一個點決定左右子樹,要走訪目前遞迴呼叫對應到合併樹中內部節點,所有待排序的元素,對 cache 不友善。 Timsort 的改進: * Timesort 可認定為非遞迴版本合併排序的改進,儘量讓要排序的元素剛才存取過,還在記憶體階層上層時,即可進行合併,類似 Linux 核心的合併排序實作的作法,亦即它不必走訪整個序列才能進行合併,而是一邊走訪,就一邊把序列的元素加進待合併的 runs 當中,在適當時機就合併。 - [ ] 降低記憶體開銷 傳統合併排序在合併時,會配置一塊與待合併的二個 runs 相同大小的記憶體空間,然後把合併的內容存放到新配置的空間中。故有 $N$ 個元素時,最多需要額外 $N$ 個元素的記憶體空間。 在 Timsort 中,進行合併時,會以較複雜的演算法,只會對 2 個 runs 中,其範圍重疊的部份進行合併。這裡的重疊,不是指 2 個 runs 的元素一樣,而是在大小關係上重疊。例如 $[2, 4, 6, 7, 8, 11]$ 和 $[9, 10,11, 15, 17]$,則重疊的部份為 $[8, 11]$ 和 $[9, 10, 11]$,故只要對這二個部份進行合併就好,其他部份接上去即可。 額外空間的部份,會配置二個要合併的部份中跟較小的那一部份元素數量的記憶體。故以上述例子,只需要配置 2 個元素的空間即可合併。 - [ ] 降低比較次數 一個待排序的序列中,裡面可能有部份元素已經排序好。 Timsort 利用這個特性,在尋找 runs 時,會把已經排序好的元素加入同一個 runs ,避免需要額外的時間排序。 Timsort 合併二個 Runs 時,運用上述 Galloping mode,可減少比較次數加速合併。 延伸閱讀: [改進 `lib/list_sort.c`](https://hackmd.io/@sysprog/Hy5hmaKBh)^含解說錄影^ 以下 `list.h` 指取自 Linux 核心原始程式碼的鏈結串列實作,見 [list.h](https://github.com/sysprog21/lab0-c/blob/master/list.h)。 檔案 `sort_impl.h` 包裝 Timsort 在內的排序操作介面: ```c struct list_head; typedef int (*list_cmp_func_t)(void *, const struct list_head *, const struct list_head *); void timsort(void *priv, struct list_head *head, list_cmp_func_t cmp); ``` 我們準備用來測試 Timsort 的鏈結串列程式碼,檔名為 `main.c`: ```c #include <stdbool.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> #include "list.h" #include "sort_impl.h" typedef struct { struct list_head list; int val; int seq; } element_t; #define SAMPLES 1000 static void create_sample(struct list_head *head, element_t *space, int samples) { printf("Creating sample\n"); for (int i = 0; i < samples; i++) { element_t *elem = space + i; elem->val = rand(); list_add_tail(&elem->list, head); } } static void copy_list(struct list_head *from, struct list_head *to, element_t *space) { if (list_empty(from)) return; element_t *entry; list_for_each_entry (entry, from, list) { element_t *copy = space++; copy->val = entry->val; copy->seq = entry->seq; list_add_tail(&copy->list, to); } } int compare(void *priv, const struct list_head *a, const struct list_head *b) { if (a == b) return 0; int res = list_entry(a, element_t, list)->val - list_entry(b, element_t, list)->val; if (priv) *((int *) priv) += 1; return res; } bool check_list(struct list_head *head, int count) { if (list_empty(head)) return 0 == count; element_t *entry, *safe; size_t ctr = 0; list_for_each_entry_safe (entry, safe, head, list) { ctr++; } int unstable = 0; list_for_each_entry_safe (entry, safe, head, list) { if (entry->list.next != head) { if (entry->val > safe->val) { fprintf(stderr, "\nERROR: Wrong order\n"); return false; } if (entry->val == safe->val && entry->seq > safe->seq) unstable++; } } if (unstable) { fprintf(stderr, "\nERROR: unstable %d\n", unstable); return false; } if (ctr != SAMPLES) { fprintf(stderr, "\nERROR: Inconsistent number of elements: %ld\n", ctr); return false; } return true; } typedef void (*test_func_t)(void *priv, struct list_head *head, list_cmp_func_t cmp); typedef struct { char *name; test_func_t impl; } test_t; int main(void) { struct list_head sample_head, warmdata_head, testdata_head; int count; int nums = SAMPLES; /* Assume ASLR */ srand((uintptr_t) &main); test_t tests[] = { {.name = "timesort", .impl = timsort}, {NULL, NULL}, }; test_t *test = tests; INIT_LIST_HEAD(&sample_head); element_t *samples = malloc(sizeof(*samples) * SAMPLES); element_t *warmdata = malloc(sizeof(*warmdata) * SAMPLES); element_t *testdata = malloc(sizeof(*testdata) * SAMPLES); create_sample(&sample_head, samples, nums); while (test->impl) { printf("==== Testing %s ====\n", test->name); /* Warm up */ INIT_LIST_HEAD(&warmdata_head); INIT_LIST_HEAD(&testdata_head); copy_list(&sample_head, &testdata_head, testdata); copy_list(&sample_head, &warmdata_head, warmdata); test->impl(&count, &warmdata_head, compare); /* Test */ count = 0; test->impl(&count, &testdata_head, compare); printf(" Comparisons: %d\n", count); printf(" List is %s\n", check_list(&testdata_head, nums) ? "sorted" : "not sorted"); test++; } return 0; } ``` 接著是 Timsort 的實作,刻意不額外配置記憶體空間。檔名: `timsort.c` (部分遮蔽) ```c #include <stdint.h> #include <stdlib.h> #include "list.h" #include "sort_impl.h" static inline size_t run_size(struct list_head *head) { if (!head) return 0; if (!head->next) return 1; return (size_t) (head->next->prev); } struct pair { struct list_head *head, *next; }; static size_t stk_size; static struct list_head *merge(void *priv, list_cmp_func_t cmp, struct list_head *a, struct list_head *b) { struct list_head *head; struct list_head **tail = AAAA; for (;;) { /* if equal, take 'a' -- important for sort stability */ if (cmp(priv, a, b) <= 0) { *tail = a; tail = BBBB; a = a->next; if (!a) { *tail = b; break; } } else { *tail = b; tail = CCCC; b = b->next; if (!b) { *tail = a; break; } } } return head; } static void build_prev_link(struct list_head *head, struct list_head *tail, struct list_head *list) { tail->next = list; do { list->prev = tail; tail = list; list = list->next; } while (list); /* The final links to make a circular doubly-linked list */ DDDD = head; EEEE = tail; } static void merge_final(void *priv, list_cmp_func_t cmp, struct list_head *head, struct list_head *a, struct list_head *b) { struct list_head *tail = head; for (;;) { /* if equal, take 'a' -- important for sort stability */ if (cmp(priv, a, b) <= 0) { tail->next = a; a->prev = tail; tail = a; a = a->next; if (!a) break; } else { tail->next = b; b->prev = tail; tail = b; b = b->next; if (!b) { b = a; break; } } } /* Finish linking remainder of list b on to tail */ build_prev_link(head, tail, b); } static struct pair find_run(void *priv, struct list_head *list, list_cmp_func_t cmp) { size_t len = 1; struct list_head *next = list->next, *head = list; struct pair result; if (!next) { result.head = head, result.next = next; return result; } if (cmp(priv, list, next) > 0) { /* decending run, also reverse the list */ struct list_head *prev = NULL; do { len++; list->next = prev; prev = list; list = next; next = list->next; head = list; } while (next && cmp(priv, list, next) > 0); list->next = prev; } else { do { len++; list = next; next = list->next; } while (next && cmp(priv, list, next) <= 0); list->next = NULL; } head->prev = NULL; head->next->prev = (struct list_head *) len; result.head = head, result.next = next; return result; } static struct list_head *merge_at(void *priv, list_cmp_func_t cmp, struct list_head *at) { size_t len = run_size(at) + run_size(at->prev); struct list_head *prev = at->prev->prev; struct list_head *list = merge(priv, cmp, at->prev, at); list->prev = prev; list->next->prev = (struct list_head *) len; --stk_size; return list; } static struct list_head *merge_force_collapse(void *priv, list_cmp_func_t cmp, struct list_head *tp) { while (stk_size >= 3) { if (run_size(tp->prev->prev) < run_size(tp)) { tp->prev = merge_at(priv, cmp, tp->prev); } else { tp = merge_at(priv, cmp, tp); } } return tp; } static struct list_head *merge_collapse(void *priv, list_cmp_func_t cmp, struct list_head *tp) { int n; while ((n = stk_size) >= 2) { if ((n >= 3 && run_size(tp->prev->prev) <= run_size(tp->prev) + run_size(tp)) || (n >= 4 && run_size(tp->prev->prev->prev) <= run_size(tp->prev->prev) + run_size(tp->prev))) { if (run_size(tp->prev->prev) < run_size(tp)) { tp->prev = merge_at(priv, cmp, tp->prev); } else { tp = merge_at(priv, cmp, tp); } } else if (run_size(tp->prev) <= run_size(tp)) { tp = merge_at(priv, cmp, tp); } else { break; } } return tp; } void timsort(void *priv, struct list_head *head, list_cmp_func_t cmp) { stk_size = 0; struct list_head *list = head->next, *tp = NULL; if (head == head->prev) return; /* Convert to a null-terminated singly-linked list. */ head->prev->next = NULL; do { /* Find next run */ struct pair result = find_run(priv, list, cmp); result.head->prev = tp; tp = result.head; list = result.next; stk_size++; tp = merge_collapse(priv, cmp, tp); } while (list); /* End of input; merge together all the runs. */ tp = merge_force_collapse(priv, cmp, tp); /* The final merge; rebuild prev links */ struct list_head *stk0 = tp, *stk1 = stk0->prev; while (stk1 && stk1->prev) stk0 = stk0->prev, stk1 = stk1->prev; if (stk_size <= FFFF) { build_prev_link(head, head, stk0); return; } merge_final(priv, cmp, head, stk1, stk0); } ``` 預期執行輸出: (Comparisons 一項可能會變化) ``` Creating sample ==== Testing timesort ==== Comparisons: 9326 List is sorted ``` 請補完程式碼,使其運作符合預期。 * 全部應以最簡潔的形式撰寫,避免空白字元 :::success 延伸問題: 1. 解釋上述程式碼的運作原理,提出改進方案並予以實作。 2. 將改進過的 Timsort 實作整合進 [sysprog21/lab0-c](https://github.com/sysprog21/lab0-c),作為另一個有效的排序命令,應設計效能評比的測試程式,確保測試案例符合 Timsort 設想的資料分布樣式。 :::