Try   HackMD

J11: sort

tags: linux2021

主講人: jserv / 課程討論區: 2021 年系統軟體課程

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
返回「Linux 核心設計」課程進度表

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
預期目標

Hybrid sorting

除了在教科書出現的經典排序演算法,如 merge sort 和 quick sort,在諸多軟體會採用截長補短的策略,融合兩項或更多排序演算法,這樣的案例包含 TimsortIntrosort

Introsort 可看做 quick sort 的強化,遞迴分割陣列,區間越來越短,數字也幾乎排序好。對於幾乎已經排序好的短區間,quick sort 表現不佳,於是切換為 heapsort。分水嶺通常設定成

logN2=2×logNN 是輸入資料的長度。之前學員已嘗試實作 Introsort,可見:

以下是參考的 Introsort 實作:

/**
 * introsort is comprised of:
 * - Quicksort with explicit stack instead of recursion, and ignoring small
 *   partitions
 * - Binary heapsort with Floyd's optimization, for stack depth > 2log2(n)
 * - Final shellsort pass on skipped small partitions (small gaps only)
 */

#include <limits.h>
#include <stdint.h>
#include <string.h>

typedef int (*comparator_t)(const void *, const void *);

typedef struct {
    char *low, *high;
} stack_node_t;

#define idx(x) (x) * data_size                 /* manual indexing */
#define STACK_SIZE (sizeof(size_t) * CHAR_BIT) /* size constant */

static inline int __log2(size_t x)
{
    return 63 - __builtin_clzll(x);
}

#define WORD_ALIGNED 1
static inline void swap(void *restrict _a, void *restrict _b, size_t data_size)
{
#if WORD_ALIGNED
    long *a = (long *) _a, *b = (long *) _b;
    while ((data_size--) / sizeof(long) > 0) {
        long tmp = *a;
        *a++ = *b;
        *b++ = tmp;
    }
#else
    char *a = (char *) _a, *b = (char *) _b;
    while (data_size-- > 0) {
        char tmp = *a;
        *a++ = *b;
        *b++ = tmp;
    }
#endif
}

void sort(void *_array,
          size_t length,
          size_t data_size,
          comparator_t comparator)
{
    if (length == 0)
        return;

    char *array = (char *) _array;
    const size_t max_thresh = data_size << 2;
    const int max_depth = __log2(length) << 1;

    /* Temporary storage used by both heapsort and shellsort */
    char tmp[data_size];

    if (length > 16) {
        char *low = array, *high = array + idx(length - 1);
        stack_node_t stack[STACK_SIZE] = {0};
        stack_node_t *top = stack + 1;

        int depth = 0;
        while (stack < top) {
            /* Exceeded max depth: do heapsort on this partition */
            if (depth > max_depth) {
                size_t part_length = (size_t)((high - low) / data_size) - 1;
                if (part_length > 0) {
                    size_t i, j, k = part_length >> 1;

                    /* heapification */
                    do {
                        i = k;
                        j = (i << 1) + 2;
                        memcpy(tmp, low + idx(i), data_size);

                        while (j <= part_length) {
                            if (j < part_length)
                                j += (comparator(low + idx(j),
                                                 low + idx(j + 1)) < 0);
                            if (comparator(low + idx(j), tmp) <= 0)
                                break;
                            memcpy(low + idx(i), low + idx(j), data_size);
                            i = j;
                            j = (i << 1) + 2;
                        }

                        memcpy(low + idx(i), tmp, data_size);
                    } while (k-- > 0);

                    /* heapsort */
                    do {
                        i = part_length;
                        j = 0;
                        memcpy(tmp, low + idx(part_length), data_size);

                        /* Floyd's optimization:
                         * Not checking low[j] <= tmp saves nlog2(n) comparisons
                         */
                        while (j < part_length) {
                            if (j < part_length - 1)
                                j += (comparator(low + idx(j),
                                                 low + idx(j + 1)) < 0);
                            memcpy(low + idx(i), low + idx(j), data_size);
                            i = j;
                            j = (i << 1) + 2;
                        }

                        /* Compensate for Floyd's optimization by sifting down
                         * tmp. This adds O(n) comparisons and moves.
                         */
                        while (i > 1) {
                            j = (i - 2) >> 1;
                            if (comparator(tmp, low + idx(j)) <= 0)
                                break;
                            memcpy(low + idx(i), low + idx(j), data_size);
                            i = j;
                        }

                        memcpy(low + idx(i), tmp, data_size);
                    } while (part_length-- > 0);
                }

                /* pop next partition from stack */
                --top;
                --depth;
                low = top->low;
                high = top->high;
                continue;
            }

            /* 3-way "Dutch national flag" partition */
            char *mid = low + data_size * ((high - low) / data_size >> 1);
            if (comparator(mid, low) < 0)
                swap(mid, low, data_size);
            if (comparator(mid, high) > 0)
                swap(mid, high, data_size);
            else
                goto skip;
            if (comparator(mid, low) < 0)
                swap(mid, low, data_size);

        skip:;
            char *left = low + data_size, *right = high - data_size;

            /* sort this partition */
            do {
                while (comparator(left, mid) < 0)
                    left += data_size;
                while (comparator(mid, right) < 0)
                    right -= data_size;

                if (left < right) {
                    swap(left, right, data_size);
                    if (mid == left)
                        mid = right;
                    else if (mid == right)
                        mid = left;
                    left += data_size, right -= data_size;
                } else if (left == right) {
                    left += data_size, right -= data_size;
                    break;
                }
            } while (left <= right);

            /* Prepare the next iteration
             * Push larger partition and sort the other; unless one or both
             * smaller than threshold, then leave it to the final shellsort.
             * Both below threshold
             */
            if ((size_t)(right - low) <= max_thresh &&
                (size_t)(high - left) <= max_thresh) {
                --top;
                --depth;
                low = top->low;
                high = top->high;
            }
            /* Left below threshold */
            else if ((size_t)(right - low) <= max_thresh &&
                     (size_t)(high - left) > max_thresh) {
                low = left;
            }
            /* Right below threshold */
            else if ((size_t)(right - low) > max_thresh &&
                     (size_t)(high - left) <= max_thresh) {
                high = right;
            } else {
                /* Push big left, sort smaller right */
                if ((right - low) > (high - left)) {
                    top->low = low, top->high = right;
                    low = left;
                    ++top;
                    ++depth;
                } else { /* Push big right, sort smaller left */
                    top->low = left, top->high = high;
                    high = right;
                    ++top;
                    ++depth;
                }
            }
        }
    }

    /* Clean up the leftovers with shellsort.
     * Already mostly sorted; use only small gaps.
     */
    const size_t gaps[2] = {1ul, 4ul};

    int i = 1;
    do {
        if (length < 4)
            continue;

        for (size_t j = gaps[i], k = j; j < length; k = ++j) {
            memcpy(tmp, array + idx(k), data_size);

            while (k >= gaps[i] &&
                   comparator(array + idx(k - gaps[i]), tmp) > 0) {
                memcpy(array + idx(k), array + idx(k - gaps[i]), data_size);
                k -= gaps[i];
            }

            memcpy(array + idx(k), tmp, data_size);
        }
    } while (i-- > 0);
}

Introsort 又可進一步擴充為 Pattern Defeating Quicksort,可簡稱為 pdqsort,縮減比較的次數,進行細部改良,可參見論文 Pattern-defeating Quicksort

swenson/sort 彙整多項排序實作,並提供多種情境的效能評估,參考執行輸出:

Qsort 100000 x86_64                       109        9216431.2 ns/op
Binary_insertion_sort 100000 x86_64         1     1020912000.0 ns/op
Bitonic_sort 100000 x86_64                  2      994466000.0 ns/op
Quick_sort 100000 x86_64                  185        5419470.3 ns/op
Merge_sort 100000 x86_64                  151        6638079.5 ns/op
Heap_sort 100000 x86_64                   164        6099676.8 ns/op
Shell_sort 100000 x86_64                  113        8874380.5 ns/op
Tim_sort 100000 x86_64                    135        7445088.9 ns/op
Merge_sort_in_place 100000 x86_64         158        6372259.5 ns/op
Grail_sort 100000 x86_64                  122        8244967.2 ns/op
Sqrt_sort 100000 x86_64                   144        6978180.6 ns/op
Rec_stable_sort 100000 x86_64              36       27847444.4 ns/op
Grail_sort_dyn_buffer 100000 x86_64       139        7201316.5 ns/op

延伸閱讀:

  • CppCon 2019: Andrei Alexandrescu "Speed Is Found In The Minds of People"

    In all likelihood, sorting is one of the most researched classes of algorithms. It is a fundamental task in Computer Science, both on its own and as a step in other algorithms. Efficient algorithms for sorting and searching are now taught in core undergraduate classes. Are they at their best, or is there more blood to squeeze from that stone? This talk will explore a few less known – but more allegro! – variants of classic sorting algorithms. And as they say, the road matters more than the destination. Along the way, we'll encounter many wondrous surprises and we'll learn how to cope with the puzzling behavior of modern complex architectures.

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
Pseudorandom number generator

xoroshiro128+ 是個 Pseudorandom number generator,有著優異的效能表現,被若干 JavaScript 引擎採用 (如 V8, Chrome, Firefox, Safari),儘管無法通過 BigCrush 測試

詳閱 xoshiro / xoroshiro generators and the PRNG shootout

sysprog21/ksort 整合 xoroshiro128+ 作為亂數產生器,並實作對應的 Linux 核心模組,從而允許使用者層級的資料存取。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
ksort: 改寫自 Linux 核心的排序實作

sysprog21/ksort 從 Linux 核心原始程式碼中,萃取以下檔案:

注意 lib/sort.c 採用 heapsort,平均可達到

n×log2n+0.37×n+o(n) 次比較,在最差的狀況下則是
1.5×n×log2n+O(n)
次。

開發環境的需求與 fibdrv 描述相同,不該在虛擬機器環境裡測試,否則會有效能偏差的疑慮

取得原始程式碼並測試:

$ git clone https://github.com/sysprog21/ksort
$ cd ksort
$ make
$ make check

預期可見二次 Passed [-] 訊息。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
作業要求

  • 自 GitHub 對 sysprog21/ksort 進行 fork,並引入以下修改:
    • 比照 fibdrv 的手法,在 Linux 核心模組內,透過 ktime 取得時間並紀錄排序實作佔用的時間
    • 改寫 swenson/sort 的程式碼並整合進 ksort,擴充測試情境,並分析現有 Linux 核心採用的 heapsort 效能表現。注意亂數產生器應採用 xoroshiro128+,現有 ksort 已提供對應的實作
    • 實作上述的 introsort 或 pdqsort,並整合進 ksort,須確保和標頭檔 include/linux/sort.h 有著一致的存取介面 (但實作可更換,善用 C 語言:物件導向程式設計篇提到的技巧)
    • 在實體機器比較你新增的 introsort/pdqsort 和 Linux 核心的 lib/sort.c 效能落差,並嘗試解釋具體原因

      看到這裡,你有沒有滿心期待:可準備貢獻程式碼到 Linux 核心!

  • 一旦 ksort 裡頭的 introsort/pdqsort 實作經過驗證後,透過 User-Mode Linux 的實驗環境,替換 Linux 核心原始程式碼的 lib/sort.c 為你的排序實作,並進行系統測試
    • 搭配 GDB 設定中斷點,觀察使用到 sortsort_r 函式的 Linux 核心程式碼,舉例說明
  • 將你的共筆加到 2021q1 Homework5 (作業區)
  • 截止日期:
    • Apr 12, 2021 (含) 之前