Try   HackMD

2022q1 Homework1 (lab0)

contributed by < NOVBobLee >

作業要求

注意書寫規範:中英文間用一個半形空白區隔
:notes: jserv

已修改

實驗環境

$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
$ lscpu
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   39 bits physical, 48 bits virtual
CPU(s):                          8
On-line CPU(s) list:             0-7
Thread(s) per core:              2
Core(s) per socket:              4
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           94
Model name:                      Intel(R) Xeon(R) CPU E3-1230 v5 @ 3.40GHz
Stepping:                        3
CPU MHz:                         3400.000
CPU max MHz:                     3800.0000
CPU min MHz:                     800.0000
BogoMIPS:                        6799.81
Virtualization:                  VT-x
L1d cache:                       128 KiB
L1i cache:                       128 KiB
L2 cache:                        1 MiB
L3 cache:                        8 MiB
NUMA node0 CPU(s):               0-7

開發紀錄

佇列操作 queue.c

擴充 Aspell 字典

在這個開發環境裡,會使用 Aspell 輔助檢查 git commit message 裡有無拼錯字,若有單字沒在字典裡,就必須填充新字彙進去。

# append in scripts/aspell-pws
...
cscope
Makefile

扣除軟體套件的名稱,其他的詞彙不應該加入,畢竟只要用清晰的英語書寫即可涵蓋你的操作。
:notes: jserv

已修改(修改前版本)
# append in scripts/aspell-pws
...
cscope
dup
Makefile
alloc

擴充 Makefile 功能

增加產生/更新 cscope 和 tags 的功能,方便追蹤程式碼。

cscope_tags:
        @rm -f cscope.* tags
        @echo -n "Old cscope, tags revmoed..\nNew cscope.. "
        @find . -name "*.[ch]" > cscope.files
        @cscope -Rbq
        @echo "done\nNew tags.. \c" -e
        @ctags -R -h=".c.h"
        @echo "done"

兩個重要的資料結構







%0



head

prev

next
list_head



elm1

value

prev

next
element_t



head:n->elm1:p






elm2

value

prev

next
element_t



elm1:n->elm2:p






elm2:n->head:p






/* doubly-linked list */
struct list_head {
    struct list_head *next;
    struct list_head *prev;
};
/* linked list for queue elements*/
typedef struct {
    char *value;
    struct list_head list;
} element_t;

這裡 element_t 裡的 value 字元指標,其用途不只可以儲存字串,還可以儲存任意資料的地址,當然要先轉型,任意型態指標皆可被轉型為 (char *)(void *) ,反之亦同。在此文中,只使用字串。

N1256 (6.2.5-27)
A pointer to void shall have the same representation and alignment requirements as a pointer to a character type. []

N1256 (6.3.2.3-1)
A pointer to void may be converted to or from a pointer to any incomplete or object type. A pointer to any incomplete or object type may be converted to a pointer to void and back again; the result shall compare equal to the original pointer.

value 只作為字串使用,則可以利用 GNU C 擴充的 Arrays of Length Zero,使在實作上更方便簡潔。

/* using GNU C extension array-of-zero-length */
typedef struct {
    struct list_head list;
    char value[0]; /* must be the last element */
} element_t;

/* example */
int string_length = 5;
element_t *elm = malloc(sizeof(element_t) + string_length);
free(elm);

佇列除錯輔助巨集

靜態的除錯輔助用巨集,會列印參數 node 的名稱和其相對應的 value 值,若沒有對應的 value 成員,則列印 head 。

#ifdef DEBUG_PRINT
/*
 * Print the member value correspoding to member list
 * @node: the member list of element_t (struct list_head *)
 * @head: head of the list
 */
#define print_value(node, head) \
    printf("%s: %s\n", #node,   \
           (node != head) ? list_entry(node, element_t, list)->value : "head")
#endif /* DEBUG_PRINT */

備註:原本這裡 node 使用 list 這個名稱,不過會跟成員名稱 list 重複,巨集會展開錯誤,故改用 node

配置新佇列 q_new







%0



queue

prev

next
queue



queue:p->queue





queue:n->queue





配置新佇列 queue ,配置記憶體空間然後初始化。初始化後, queue 會指向自己,代表空佇列。

struct list_head *q_new()
{
    struct list_head *queue;

    queue = malloc(sizeof(struct list_head));
    if (queue)
        INIT_LIST_HEAD(queue);

    return queue;
}

插入新佇列元素 q_insert_head / q_insert_tail

在解釋釋放佇列 q_free 之前,我們要先知道字串 value 的空間是誰配置的。

插入新佇列元素有兩種方式,其一為插入前端 q_insert_head ,其二為插入尾端 q_insert_tail ,都要求為佇列元素 element_t 和其成員 value 配置空間。我們將其分為兩個部份:配置佇列元素 element_alloc 與插入佇列元素 q_insert_head/q_insert_tail

第一部份:配置佇列元素 element_alloc

先配置字串 value 和佇列元素 element_t 的空間,若某一配置失敗,則須將已配置的空間釋放。成功的話則將字串空間 value 連接上佇列元素 element_t ,回傳這剛配置完成的佇列元素地址,接續第二部份。值得注意的地方在 LIST_POISONING ,我們剛配置完元素後,並沒有幫 list 成員初始化或與其他 struct list_head 連結,為防止使用無意義的指標,改其值指向一個會發生 segfault 的記憶體位置 (Linux LIST_POISON feature) 。

static element_t *element_alloc(size_t value_size)
{
    element_t *elm;
    char *value;

    value = malloc(value_size);
    if (!value)
        goto fail_alloc_value;

    elm = malloc(sizeof(element_t));
    if (!elm)
        goto fail_alloc_elm;

    /* assemble the element_t and value */
    elm->value = value;

    /* Poison the list node to prevent anyone using the unlinked list node */
#ifdef LIST_POISONING
    elm->list->prev = (struct list_head *) (0x00100100);
    elm->list->next = (struct list_head *) (0x00200200);
#endif

    return elm;

fail_alloc_elm:
    free(value);
fail_alloc_value:
    return NULL;
}
element_t 使用 Arrays of Length Zero 會更簡潔
static element_t *element_alloc(size_t value_size)
{
    element_t *elm;

    elm = malloc(sizeof(element_t) + value_size);
    if (!elm)
        return NULL;
    
    /* Poison the list node to prevent anyone using the unlinked list node */
#ifdef LIST_POISONING
    elm->list->prev = (struct list_head *) (0x00100100);
    elm->list->next = (struct list_head *) (0x00200200);
#endif

    return elm;
}
第二部份:插入新元素 q_insert_head / q_insert_tail

配置完佇列元素後,將輸入字串複製到 value 裡,最後將其插入佇列 head 的前端/尾端。要注意輸入的字串 s 有被截短 (truncated) 的可能,所以在複製得到的字串結尾再加上 '\0' 表示字串結束。

bool q_insert_head(struct list_head *head, char *s)
{
    element_t *elm;
    int s_length;

    if (!head)
        return false;

    s_length = strlen(s);
    elm = element_alloc(s_length + 1);
    if (!elm)
        return false;

    /* copy the string content */
    memcpy(elm->value, s, s_length);
    elm->value[s_length] = '\0';

    /* add the list into the front of head */
    list_add(&elm->list, head);

    return true;
}
q_insert_tail (將 list_add 改為 list_add_tail
bool q_insert_tail(struct list_head *head, char *s)
{
    element_t *elm;
    int s_length;

    if (!head)
        return false;

    s_length = strlen(s);
    elm = element_alloc(s_length + 1);
    if (!elm)
        return false;

    /* copy the string content */
    memcpy(elm->value, s, s_length);
    elm->value[s_length] = '\0';

    /* add the list at the tail of head */
    list_add_tail(&elm->list, head);

    return true;
}

釋放佇列 q_free

釋放佇列,包括 queue 本身,所有佇列元素 element_t 及其字串成員 value

void q_free(struct list_head *l)
{
    element_t *elm, *elm_safe;

    if (!l)
        return;

    /* free the queue elements */
    list_for_each_entry_safe (elm, elm_safe, l, list)
        q_release_element(elm);

    /* free the queue itself, the list head */
    free(l);
}

在這個動作裡,我們會一邊釋放佇列元素,然後一邊走訪下一個佇列元素。要注意若使用普通的 list_for_each_entry (只供讀取)會發生問題,必須使用 list_for_each_entry_safe (可適度修改佇列元素)。

void q_release_element(element_t *e)
{
    free(e->value);
    free(e);
}
為何使用 list_for_each_entry_safe

(這邊直接展開巨集)
注意在第 6 行 free(elm); 之後,要更新 elm 到下一個佇列元素,其利用 elm->list.next ,但是當下的 elm 已經被釋放掉了。

/* using list_for_each_entry */ for (elm = list_entry(head->next, element_t, list); &elm->list != head; elm = list_entry(elm->list.next, element_t, list) { free(elm->value); free(elm); }

相對的, list_for_each_entry_safe 則會先用 elm_safe 紀錄下一個佇列元素(第 3, 6 行)。若當下所在佇列元素 elm 被移除,還是可以從 elm_safe 找到下一個佇列元素。

/* using list_for_each_entry_safe */ for (elm = list_entry(head->next, element_t, list), elm_safe = list_entry(elm->list.next, element_t, list); &elm->list != head; elm = list_entry(elm->list.next, element_t, list), elm_safe = list_entry(elm_safe->list.next, element_t, list)) { free(elm->value); free(elm); }

移除佇列元素 q_remove_head / q_remove_tail

對佇列移除前端 / 尾端元素,並將其所紀錄的字串內容複製到輸入給的 sp 位置上。

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    element_t *elm;

    if (!head || list_empty(head))
        return NULL;

    elm = list_first_entry(head, element_t, list);

    /* copy the string if sp is non-NULL */
    if (sp) {
        memcpy(sp, elm->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }

    /* remove the list from head */
    list_del(&elm->list);

    return elm;
}
q_remove_tail (將 list_first_entry 改成 list_last_entry
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    element_t *elm;

    if (!head || list_empty(head))
        return NULL;

    elm = list_last_entry(head, element_t, list);

    /* copy the string if sp is non-NULL */
    if (sp) {
        memcpy(sp, elm->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }

    /* remove the list from head */
    list_del(&elm->list);

    return elm;
}

兩個方法差異在於 list_first_entrylist_last_entry ,前者其實就傳入 head 回傳 head->next ,那如果要得到雙向鍊結結構的最後一個元素,那可以傳入 head->prev->prev ,也就會得到 (head->prev->prev)->next 了。

q_remove_tail (使用 head->prev->prev by laneser 同學)
element_t *q_remove_tail(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head)
        return NULL;
    return q_remove_head(head->prev->prev);
}

佇列長度 q_size

給定一佇列,回傳其佇列元素個數。佇列由 struct list_head 連結而成,只要走訪每一個成員 list 即可算出個數。

int q_size(struct list_head *head)
{
    if (!head)
        return 0;

    int len = 0;
    struct list_head *li;

    list_for_each (li, head)
        ++len;

    return len;
}

刪除佇列中心點元素 q_delete_mid

利用兩個速度相差一倍的指標,走訪整個佇列。當快的指標走完全程,則慢的指標會停在佇列中心點上,移除之。

bool q_delete_mid(struct list_head *head)
{
    struct list_head *fast, *slow;

    if (!head || list_empty(head))
        return false;

    /* two traverses with different speeds until fast meets head */
    fast = head->next;
    slow = head->next;
    while (fast != head && fast->next != head) {
        slow = slow->next; /* will stop on the middle node */
        fast = fast->next->next;
    }

    /* remove the middle node from head */
    list_del(slow);

    /* delete the relative element */
    q_release_element(list_entry(slow, element_t, list));

    return true;
}

佇列刪除重複內容元素 q_delete_dup

佇列排序後,刪除其字串內容重複的元素,例如 1-1-2-3 刪除完後會變成 2-3 。這邊使用兩個指標,一個指向當下佇列元素 elm ,另一個指向下一個佇列元素 next_elm ,若 elm 的字串內容與 next_elm 的字串內容相同,則刪除當下的佇列元素 elm ,繼續重複這個動作。但只重複這個動作,會留下重複的最後一個元素,所以再加一個判定條件,前一個佇列元素是否為被刪除的元素,以刪除餘下的最後一個元素(前一個佇列元素因為與當下的元素字串內容相同而被刪除)。

bool q_delete_dup(struct list_head *head)
{
    element_t *elm, *next_elm;
    int found_dup = 0;

    if (!head)
        return false;

    /* if head is empty or singular, then it's done */
    if (list_empty(head) || list_is_singular(head))
        return true;

    /* traverse the whole head and delete the duplicate nodes one by one */
    list_for_each_entry_safe (elm, next_elm, head, list) {
        int cmp_result =
            (elm->list.next != head) && !strcmp(elm->value, next_elm->value);
        if (cmp_result || found_dup) {
            list_del(&elm->list);
            q_release_element(elm);
            found_dup = cmp_result;
        }
    }

    return true;
}

佇列元素前後互換 q_swap

佇列元素兩兩一組作互換,例如 1-2-3-4 變成 2-1-4-3

void q_swap(struct list_head *head)
{
    struct list_head *a, *b;

    if (!head || list_empty(head) || list_is_singular(head))
        return;

    /* initialize */
    a = head->next;
    b = a->next;

    do {
        /* swap a, b */
        a->prev->next = b;
        b->prev = a->prev;
        a->prev = b;
        a->next = b->next;
        b->next->prev = a;
        b->next = a;

        /* update a, b for next iteration */
        a = a->next;
        b = a->next;
    } while (a != head && b != head);
}

佇列反轉 q_reverse

佇列反轉,例如 1-2-3 反轉成 3-2-1 。這邊使用兩個指標,一個從前端走向中心,另一個從尾端走向中心,兩個指標作互換,直至所有佇列元素都造訪過。

void q_reverse(struct list_head *head)
{
    struct list_head *a, *b, *tmp;

    if (!head || list_empty(head) || list_is_singular(head))
        return;

    /* initialize */
    a = head->next;
    b = head->prev;

    do {
        /* swap a, b */
        if (a->next != b) { /* if a is not beside b */
            a->prev->next = b;
            b->prev->next = a;
            tmp = a->prev;
            a->prev = b->prev;
            b->prev = tmp;

            a->next->prev = b;
            b->next->prev = a;
            tmp = a->next;
            a->next = b->next;
            b->next = tmp;
        } else {
            a->prev->next = b;
            b->prev = a->prev;
            b->next->prev = a;
            a->next = b->next;
            a->prev = b;
            b->next = a;
        }

        /* update a, b for next iteration */
        tmp = a;
        a = b->next;
        b = tmp->prev;
    } while (a != b && a->prev != b); /* stop when a is b or a precedes b */
}

後來新增 list_swap 方法,可以直接替代中間 swap a, b 的部份。

void q_reverse(struct list_head *head)
{
    struct list_head *a, *b, *tmp;

    if (!head || list_empty(head) || list_is_singular(head))
        return;

    /* initialize */
    a = head->next;
    b = head->prev;

    do {
        list_swap(a, b);

        /* update a, b for next iteration */
        tmp = a;
        a = b->next;
        b = tmp->prev;
    } while (a != b && a->prev != b); /* stop when a is b or a precedes b */
}

後來看到一個可以把 if 分支拿掉的方法,想法滿直接的,把雙向鍊結結構的 nextprev 互換,最後結果就是一個反轉的佇列了。

沒 if 分支程式碼( by Chao-Shun-Cheng 同學)
void q_reverse(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    struct list_head *node = NULL, *safe = NULL, *temp = NULL;
    list_for_each_safe (node, safe, head) {
        temp = node->next;
        node->next = node->prev;
        node->prev = temp;
    }
    /* swap head's next and prev */
    temp = head->next;
    head->next = head->prev;
    head->prev = temp;
}

佇列排序 q_sort

目前直接使用 Linux 的 list_sort ,在這紀錄其程式碼的內容,還有作一些實驗。

TODO

  • 改寫 q_sort
  • 實作 fully-eager bottom-up merge sort
  • 驗證 cache trashing 的減少

Linux 的 list_sort 使用的是一種特別的 bottom-up 合併排序, bottom-up 為使用迭代而非遞迴。一般來說, bottom-up 的合併排序每次迭代都會新增一個元素進排序好的 sublists 中,進行排序合併,只要出現一樣長度的 sublist 就會馬上進行合併,例如 4-2-1-1 合併成 4-2-2 繼續合併成 4-4 最後變成 8 ,合併到無法合併為止,此模式稱為 fully-eager 。相對來說, Linux 的 list_sort 則是 not fully-eager ,會把剛才一次迭代裡的三個排序合併平均分散到三次迭代裡,此用意為維持至少長度

2:1 的平衡排序合併,雖然會增加些許比較次數,但對現代處理器來說,減少 cache thrashing 的發生更為重要 。

list_sort 版本( Linux )

q_sort 是我們呼叫的函式,亦即佇列的方法之一:傳入一佇列,進行遞增排序。從註釋的圖可以看到,在排序的過程中會把雙向鍊結串列的結構給破壞掉, sublist 與 sublist 由 prev 連結, sublist 裡的元素則是由 next 連結,整個過程會分為三個階段,第一個階段為長度

1:1 的排序合併,由 countpending 中的元素個數)控制;第二階段則是把所有 sublists 合併到剩兩個 sublists;最後階段把那兩個 sublists 合併然後復原雙向鍊結結構。

/*
 * From Linux: lib/list_sort.c
 * bottom-up merge sort (not fully-eager):
 *
 *       prepare-to-merge(4 + 4 = 8)
 *             |   \   tail(10) <~ tail(1011)    1011 = 8 + 2 + 1
 *        prev |    \ /           /               ^   = (4 + 4) + 2 + 1
 *    NULL <-- o <-- o <-- o <-- o  <~~ pending   |
 *            /     /     /     /                 0 bit on 4 (2^2)
 *           o     o     o    NULL               /
 *     next /     /     /            state: [ 4-4-2-1 > 8-2-1 ]
 *         o     o    NULL           count = 11
 *        /     /              list
 *       o     o                 |
 *      /     /         head --> o --> o --> NULL
 *    NULL  NULL             next
 * (older)  (newer)
 */
void q_sort(struct list_head *head)
{
    struct list_head *list, *pending;
    unsigned long count = 0;

    if (!head)
        return;

    list = head->next;
    /* if head is empty or singular */
    if (list == head->prev)
        return;

    pending = NULL;
    head->prev->next = NULL; /* break the cycle of doubly linked list */

    /* pahse 1 */
    do {
        int bits;
        struct list_head **tail = &pending; /* pending's tail */

        /* move tail to the next two possible merging pending lists */
        for (bits = count; bits & 1; bits >>= 1)
            tail = &(*tail)->prev;

        /* merge if bits becomes non-zero even */
        if (bits) {
            struct list_head *newer = *tail, *older = newer->prev;

            newer = merge(older, newer);
            newer->prev = older->prev;
            *tail = newer;
        }

        /* add list into pending and update list, count */
        list->prev = pending;
        pending = list;
        list = list->next;
        pending->next = NULL;
        ++count;
    } while (list);

    /* phase 2: all lists are in pending, merge them */
    list = pending;
    pending = list->prev;
    for (;;) {
        struct list_head *next = pending->prev;

        if (!next)
            break;

        list = merge(pending, list);
        pending = next;
    }

    /* phase 3: final merge and restore the doubly linked list */
    merge_restore(head, pending, list);
}

merge 函式就是主要的排序合併的部份,傳入兩個要合併的 sublists 進行排序合併,其行為有點像穿針引線的動作, tail 是針而 next 是線。比較兩邊 sublists 的開頭元素,選擇比較小的元素穿入線中,繼續下一個比較。完成後,回傳合併後 sublist 的開頭。

static struct list_head *merge(struct list_head *a, struct list_head *b)
{
    /* head initial value is meaningless, just for satisfying cppcheck */
    struct list_head *head = NULL, **tail = &head; /* next's tail */

    /* threading the lists in order */
    for (;;) {
        element_t *elm_a = list_entry(a, element_t, list);
        element_t *elm_b = list_entry(b, element_t, list);
        if (strcmp(elm_a->value, elm_b->value) <= 0) {
            *tail = a;
            tail = &a->next;
            a = a->next;
            if (!a) {
                *tail = b;
                break;
            }
        } else {
            *tail = b;
            tail = &b->next;
            b = b->next;
            if (!b) {
                *tail = a;
                break;
            }
        }
    }

    return head;
}

merge_restore 則是合併最後剩下的兩個 sublist ,然後復原雙向鍊結結構中的 prevhead ,完成佇列的排序。

static void merge_restore(struct list_head *head,
                          struct list_head *a,
                          struct list_head *b)
{
    struct list_head *tail = head;

    /* threading in order and restore the prev pointer */
    for (;;) {
        element_t *elm_a = list_entry(a, element_t, list);
        element_t *elm_b = list_entry(b, element_t, list);
        if (strcmp(elm_a->value, elm_b->value) <= 0) {
            tail->next = a;
            a->prev = tail;
            tail = a;
            a = a->next;
            if (!a)
                break;
        } else {
            tail->next = b;
            b->prev = tail;
            tail = b;
            b = b->next;
            if (!b) {
                b = a;
                break;
            }
        }
    }

    /* splice two lists */
    tail->next = b;

    /* restore the remaining nodes' prev pointers */
    do {
        b->prev = tail;
        tail = b;
        b = b->next;
    } while (b);

    /* restore the cycle of doubly linked list */
    tail->next = head;
    head->prev = tail;
}

對於 list_sort 他如何知道該是哪兩個 sublists 要合併,則是利用 pending 裡的元素個數 count ,其對應的方式非常有趣。可以觀察一下二進位與合併的 sublists 長度兩者的關係,若下一個進位的位數在第零位(

20 的位置),那麼合併的長度即為 1 (
20=1
) ,唯有例外就是進位數在最高位數的更上一位,比方說 7(111) 變成 8(1000) ,這時就不會有合併發生。為何是這樣的對應方式,大概可以歸納出以下三個規則:

  1. 二進位代表著想要達到的狀態,例如 5(101) 的話代表想變成 [ 4-1 ] 的狀態
  2. 一次最多只能合併一次
  3. 若有多組可合併的選擇,則最短相同長度的 sublists 優先

這邊舉例說明,以 10(1010) 來說,希望達成的狀態是 [ 8-2 ] ,但其初始狀態為 [ 4-4-1-1 ] ,看來是無法一次達成,那只好先合併最短的組合,即 1-1 ,所以狀態變成 [ 4-4-2 ]

元素數量    二進位    狀態            合併的 sublists 長度
     1        1    [ 1 ]

     2       10    [ 1-1 > 2 ]            1
     3       11    [ 2-1 ]

     4      100    [ 2-1-1 > 2-2 ]        1 *
     5      101    [ 2-2-1 > 4-1 ]        2
     6      110    [ 4-1-1 > 4-2 ]        1 *
     7      111    [ 4-2-1 ]

     8     1000    [ 4-2-1-1 > 4-2-2 ]    1 *
     9     1001    [ 4-2-2-1 > 4-4-1 ]    2 *
    10     1010    [ 4-4-1-1 > 4-4-2 ]    1 *
    11     1011    [ 4-4-2-1 > 8-2-1 ]    4
    12     1100    [ 8-2-1-1 > 8-2-2 ]    1 *
    13     1101    [ 8-2-2-1 > 8-4-1 ]    2 *
    14     1110    [ 8-4-1-1 > 8-4-2 ]    1 *
    15     1111    [ 8-4-2-1 ]

從上表可以看到一個規律,我們以二進位最高位數相同的分一組,所以 2, 3 會在同一組(最高位數是

21 ),每一組極限就是把
1
填滿。從每一組的開頭到結尾,他的改變都不會碰到最高位數,只會在其位數以下的地方作改變。而我們的改變都是加
1
,這意味著,上一組的變化可以完全複製到現在這組的變化裡,換句話說,可以重現上一組變化,即表中 * 標示。然後換組的時候,我們要先用
1
填滿二進位,這需要上一組兩倍的迭代數,所以上一組的變化會在當下這組重現兩次(例如 8 到 11 , 12 到 15 )。

Fully-eager 的版本實作

同理,使用剛才所提二進位的規律,加上 fully-eager 的特性,一發生合併會合到無法合併為止,所以每個迭代一定可以達到希望達到的狀態。然後每次都是加

1 ,所以
20
位置是
1
的時候不會有合併發生,反而
20
的位置是零的,代表會有合併會發生。而要合併幾次,要看二進位尾部有幾個零,比如 7(111) 變成 8(1000) ,這代表我加
1
後會進位 3 次,亦即合併 3 次。所以我們可以將 q_sort 的 phase 1 換成以下程式碼就可以變成 fully-eager 版本。其原理就是遇到二進位尾部有零的時候,就要做排序合併,有幾個零就做幾次,即可符合 fully-eager 所要求。

/* phase 1: fully-eager bottom-up merge sort */                                                         
do {
    struct list_head **tail = &pending;

    if (count) {
        int trailing_zeros = __builtin_ctz(count);

        while (trailing_zeros) {
            struct list_head *newer = *tail, *older = newer->prev;
            
            newer = merge(older, newer);
            newer->prev = older->prev;
            *tail = newer;
            --trailing_zeros;
        }
    }
    
    list->prev = pending;
    pending = list;
    list = list->next;
    pending->next = NULL;
    ++count;      
} while (list);
驗證

待續

qtest

qtest 記憶體遺失問題

使用 Address Sanitizer 檢查記憶體使用是否有錯誤,編譯完後執行 qtest ,然後離開,即會出現該錯誤,指出用到一個非法的指標,或是訪問 NULL 指標。

# 開啟 address sanitizer(ASan)
$ make SANITIZER=1
$ ./qtest
cmd> quit
Freeing queue
Segmentation fault occurred.  You dereferenced a NULL or invalid pointer
Aborted (core dumped)
ASan 編譯小筆記

編譯時 CFLAGS, LDFLAGS 都要加 -fsanitize=address
How to use AddressSanitizer with GCC?

執行 valgrind 結果,指出有兩種錯誤,一個讀了 NULL 指標,應該跟 ASan 給的錯誤一樣。另一個則是 still reachable ,跟都 linenoiseHistoryAdd/Load 有關,有記憶體未釋放。使用 massif 和 Massif.js 視覺化之後,可以看到最後沒有變回零,即有記憶體未釋放。

still reachable 程式結束時,有記憶體還是可以訪問,沒釋放也沒丟失。

# 沒開 ASan
$ valgrind ./qtest
cmd> quit
Freeing queue
==29468== Invalid read of size 4
==29468==    at 0x10E538: run_console (console.c:653)
==29468==    by 0x10C9DF: main (qtest.c:1017)
==29468==  Address 0x0 is not stack'd, malloc'd or (recently) free'd
==29468== 
Segmentation fault occurred.  You dereferenced a NULL or invalid pointer
==29468== 5 bytes in 1 blocks are still reachable in loss record 1 of 3
==29468==    at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==29468==    by 0x4A5850E: strdup (strdup.c:42)
==29468==    by 0x110D83: linenoiseHistoryAdd (linenoise.c:1236)
==29468==    by 0x111916: linenoiseHistoryLoad (linenoise.c:1325)
==29468==    by 0x10CA3A: main (qtest.c:1004)
==29468== 
==29468== 102 bytes in 18 blocks are still reachable in loss record 2 of 3
==29468==    at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==29468==    by 0x4A5850E: strdup (strdup.c:42)
==29468==    by 0x110CF7: linenoiseHistoryAdd (linenoise.c:1236)
==29468==    by 0x111916: linenoiseHistoryLoad (linenoise.c:1325)
==29468==    by 0x10CA3A: main (qtest.c:1004)
==29468== 
==29468== 160 bytes in 1 blocks are still reachable in loss record 3 of 3
==29468==    at 0x483B7F3: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==29468==    by 0x110D43: linenoiseHistoryAdd (linenoise.c:1224)
==29468==    by 0x111916: linenoiseHistoryLoad (linenoise.c:1325)
==29468==    by 0x10CA3A: main (qtest.c:1004)
==29468== 
Aborted (core dumped)

memory leak

順著 valgrind 顯示,直接看 linenoiseHistory 裡的第 1224, 1236 行的 mallocstrdup ,可以從 valgrind 看出 strdup 裡面也有呼叫 malloc ,沒釋放的記憶體應該從這兩個地方配置的。而配置的地址是存在 history 裡,用途是存放過往輸入命令 (command) ,所以推測應該是在程式結束前沒有釋放 history

// linenoise.c static char **history = NULL;
int linenoiseHistoryAdd(const char *line) { char *linecopy; if (history_max_len == 0) return 0; /* Initialization on first call. */ if (history == NULL) { history = malloc(sizeof(char *) * history_max_len); if (history == NULL) return 0; memset(history, 0, (sizeof(char *) * history_max_len)); } /* Don't add duplicated lines. */ if (history_len && !strcmp(history[history_len - 1], line)) return 0; /* Add an heap allocated copy of the line in the history. * If we reached the max length, remove the older line. */ linecopy = strdup(line); if (!linecopy) return 0; if (history_len == history_max_len) { free(history[0]); memmove(history, history + 1, sizeof(char *) * (history_max_len - 1)); history_len--; } history[history_len] = linecopy; history_len++; return 1; }

既然有方法用到 malloc ,那應該也有 free 在介面裡。這裡我試著用 cscope 搜尋使用到 history 的函式,可以看到有 free(history) 出現,在 freeHistory 裡面。

/* Free the history, but does not reset it. Only used when we have to
 * exit() to avoid memory leaks are reported by valgrind & co. */
static void freeHistory(void)
{
    if (history) {
        int j;

        for (j = 0; j < history_len; j++)
            free(history[j]);
        free(history);
    }
}

繼續再追蹤,用 cscope 查尋誰有呼叫 freeHistory ,出現只有一筆資料,是 linenoiseAtExit ,從名字可以看出是離開 linenoise 的時候呼叫,在離開前釋放記憶體,看起來合情合理。

/* At exit we'll try to fix the terminal to the initial conditions. */
static void linenoiseAtExit(void)
{
    disableRawMode(STDIN_FILENO);
    freeHistory();
}

linenoiseAtExit 是有加上 static 的函式,所以是從內部呼叫的。那再用 cscope 搜尋,找到是 enableRawMode 把他當參數放進 atexit ,準備在離開的時候呼叫。那繼續追蹤到不是 static 的函式,過程為 enableRawMode > linenoiseRaw > linenoise ,最終是眼熟的名字, linenoise 的主程式(過程中有把不太可能的選擇篩掉)。

atexit: register a function to be called at normal process termination

static int enableRawMode(int fd)
{
    ...

    if (!atexit_registered) {
        atexit(linenoiseAtExit);
        atexit_registered = 1;
    }
 
    ...
}

追加紀錄: qtest 使用 -f 讀取檔案時,記憶體的問題就消失了。

使用 valgrind:

$ cat traces/trace-quit.cmd
# test quit
quit
$ valgrind ./qtest -f traces/trace-quit.cmd
cmd> # test quit
cmd> quit
Freeing queue

從 massif 紀錄中可以看到最後變回零,代表沒有記憶體遺失。

no memory leak

開啟 ASan:

$ make SANITIZER=1
$ ./qtest -f traces/trace-quit.cmd
cmd> # test quit
cmd> quit
Freeing queue

原本只是想寫 script 自動化測試,不過意外發現問題跟有沒有使用讀檔案有關。回去再看一次 valgrind 的 stack trace ,路徑是 main > linenoiseLoad > linenoiseAdd > malloc ,從 qtest.c 程式碼得知,唯有在沒給 infile_name 的時候會初始化 linenoise 的 history ,同時這邊也是 valgrind 給的 stack trace 原點。

// qtest.c
int main(int argc, char *argv[])
{
    ...

    /* Initialize linenoise only when infile_name not exist */
    if (!infile_name) {
        /* Trigger call back function(auto completion) */
        linenoiseSetCompletionCallback(completion);
        linenoiseHistorySetMaxLen(HISTORY_LEN);
        linenoiseHistoryLoad(HISTORY_FILE); /* Load the history at startup */
    }

    ...
}

繼續順著沒有檔案名稱 infile_name 的路線走,會進入上面那段初始化程式碼,接著進入 run_console ,在沒有檔案名稱 infile_name 的情況下, push_file 不會修改 has_file 變數,依然為 false 。繼續走會進入第 647 行的區塊,循環呼叫 linenoise ,所以是有用 atexit 註冊我們關注的 linenoiseAtExit

bool run_console(char *infile_name) { if (!push_file(infile_name)) { report(1, "ERROR: Could not open source file '%s'", infile_name); return false; } if (!has_infile) { char *cmdline; while ((cmdline = linenoise(prompt)) != NULL) { interpret_cmd(cmdline); linenoiseHistoryAdd(cmdline); /* Add to the history. */ linenoiseHistorySave(HISTORY_FILE); /* Save the history on disk */ linenoiseFree(cmdline); while (buf_stack->fd != STDIN_FILENO) cmd_select(0, NULL, NULL, NULL, NULL); has_infile = false; } } else { while (!cmd_done()) cmd_select(0, NULL, NULL, NULL, NULL); } return err_cnt == 0; }

那這樣就奇怪了,有註冊 linenoiseAtExithistory 卻沒有被釋放,那還有東西遺漏,再回去看 valgrind 顯示,記憶體遺漏之前還有個非法讀取 NULL 指標,所以有可能是發生讀取 NULL 指標,然後導致無法呼叫 freeHistory ,致使記憶體遺漏,這樣現在目標轉到非法讀取 NULL 指標的錯誤上。

valgrind 說在 run_console 的第 653 行非法讀取 NULL 指標,該行只出現 buf_stack 這個指標,這個指標跟 RIO 套件(建立及管理 buffer 以減少 read 系統呼叫次數)有關,裡面的 buffer 是 stack 結構,而 buf_stack 會指向其頂端。最初 buf_stack 會初始化為 NULL ,再來進 push_file 拿到一個 stack ,接下來進入第 646 行的區塊,等待輸入命令,這時的狀態有一層 stack ,所以 buf_stack 不是 NULL

while (buf_stack && buf_stack->fd != STDIN_FILENO)

而會使 buf_stack 變為 NULL 的方法就是 pop_file ,他會排出一層 stack 。當我們輸入命令 quit 的時後, do_quit 會在 interpret_cmd 裡的命令解析後呼叫,而 do_quit 會呼叫 pop_file ,這時 buf_stack 就會變成 NULL ,然後在第 653 行讀取他就會發生錯誤。這樣我們就在讀取前加一個檢查即可。

$ valgrind ./qtest
cmd> quit
Freeing queue
$ make SANITIZER=1
$ ./qtest
cmd> quit
Freeing queue

mem heap extra

改完後檢驗,從 valgrind 和 ASan 看來沒有問題,但從 massif 的輸出來看,最後還有 1024 Bytes mem_heap 和 8 Bytes mem_heap_extra 尚遺留,待研究。

Valgrind User Manual
The number of extra heap bytes allocated at that point. This reflects the number of bytes allocated in excess of what the program asked for. There are two sources of extra heap bytes.
First, every heap block has administrative bytes associated with it. The exact number of administrative bytes depends on the details of the allocator. By default Massif assumes 8 bytes per block, as can be seen from the example, but this number can be changed via the heap-admin option.
Second, allocators often round up the number of bytes asked for to a larger number, usually 8 or 16. This is required to ensure that elements within the block are suitably aligned. If N bytes are asked for, Massif rounds N up to the nearest multiple of the value specified by the alignment option.

新命令 shuffle

使用 Fisher-Yates shuffle 實作佇列隨機排序,加入到 qtest 命令裡。

/* Fisher-Yates shuffle pseudocode */
// int a[n];
for (int i = n - 1; i > 0; --i) {
    int j = rand() % (i + 1);
    swap(a[i], a[j]);
}

首先寫個可以將兩個 list nodes 作交換的函式 list_swap ,其想法很簡單,先紀錄兩個 list nodes 前一個的位置(紀錄之後要插入的位置),然後利用剛才紀錄的位置,移除 list node 後再插入到紀錄的位置後面。不過會有個問題,當兩個 list nodes 相連的時候,會有一個紀錄的位置是在要交換的 list node 上,這樣會發生 node a 要插入到 node a 後面的矛盾。所以當有此情況發生的時候,我們要修正插入位置。同理,兩個 list nodes 不能相同,不然會有一樣的矛盾發生。

static inline void list_swap(struct list_head *a, struct list_head *b)
{
    struct list_head *pos = b->prev;

    list_del(b);
    /* replace a by b */
    b->prev = a->prev;
    b->next = a->next;
    a->next->prev = b;
    a->prev->next = b;
    if (pos == a)
        pos = b;
    /* add a before pos */
    list_add(a, pos);
}

再來是要想一下如何把 pseudocode 裡的陣列換成雙向鍊結結構,簡單方式就 a[1] 就換成 head->next->next ,利用 index 對應移動指標的次數,但這樣要用的變數有點多。後來想到 i 是從

n1 往下降,這樣我們可以利用雙向鍊結結構, ij 用相反方向移動,所以就變成 i 對應的 list node 用 list_for_each_safe 移動,而 j 對應的 list node 則是以 prev 的方向移動。

#include <time.h> void q_shuffle(struct list_head *head) { struct list_head *cur, *next, *swap_node; int n = q_size(head); if (n < 2) return; srand(time(NULL)); list_for_each_safe (cur, next, head) { if (n-- == 1) break; int idx = rand() % (n + 1); for (swap_node = head->prev; idx--; swap_node = swap_node->prev) ; if (swap_node != cur) list_swap(swap_node, cur); if (swap_node == next) next = cur; } }

我們用到 list_for_each_safe ,停止條件是 cur == head ,還有會使用到 list_swap 改變 list 的排列順序,所以要針對這兩點作一些對應。在原算法中, i 最後是

1 ,對應的 node 是 head->prev ,但 list_for_each_safe 會停止在 head ,也就是會多一次迭代,所以加了第 12 行,利用剩下需隨機排列數量 n 讓他提早結束。

#define list_for_each_safe(node, safe, head)                     \
    for (node = (head)->next, safe = node->next; node != (head); \
         node = safe, safe = node->next)

而另一點改變 list 排列順序的對應,首先要看一下 list_for_each_safe 的巨集運作方式。從程式碼來看,針對 node 的更新是用第 3 行的方式,我們列出以下幾種 case :

  1. 自己跟自己交換,是被 list_swap 禁止,不過寫個判斷即可
  2. nodenext->next (含)之後的 list node 交換,那沒有問題
  3. nodenext 交換,會出問題(原地再進迴圈一次,與原演算法有異)






%0



after

____

next

cur

cur->next

____
after swap



before

____

cur

next

next->next

____
before swap



那就針對第 3 點進行對應,如果發生第 3 點,原本 next 更新是要變成 next->next ,而經過交換後,原本要更新成的 list node 因為交換變成了 cur->next ,那我們就把 next 換成 cur 即可,這樣 list_for_each_safe 可以繼續運作無誤, list_swap 就完成了。







%0



expect

____

____

cur

next

____
expected



solution

____

____

next
cur

next->next

____
solution



// console.h
typedef bool (*cmd_function)(int argc, char *argv[]);
void add_cmd(char *name, cmd_function operation, char *documentation);
#define ADD_COMMAND(cmd, msg) add_cmd(#cmd, do_##cmd, msg)

再來要把 q_shuffle 加入到 qtest 中,成為新的命令 shuffle 。在 qtest.c 裡,每次啟動時候都會呼叫 console_init ,將 console 的命令還有一些參數初始化,如果要加入一個命令到裡,會使用到 ADD_COMMAND 這個巨集, cmd 是我們輸入的命令,而 msg 是輸入 help 的時候會顯示的命令說明,他展開後是 add_cmd 的個函式,可以看出輸入命令後會執行函式 operation ,名稱會是 do_##cmd ,所以我們要加入一個命令 shuffle ,那麼那個執行函式的名稱就要為 do_shuffle ,型態則為 cmd_function

extern void q_shuffle(struct list_head *); static bool do_shuffle(int argc, char *argv[]) { if (argc != 1) { report(1, "%s takes no arguments", argv[0]); return false; } if (!l_meta.l) report(3, "Warning: Try to access null queue"); error_check(); set_noallocate_mode(true); if (exception_setup(true)) q_shuffle(l_meta.l); exception_cancel(); set_noallocate_mode(false); show_queue(3); return !error_check(); }

qtest 是一個人機互動的程式,難免操作者可能會操作失誤,所以要加一些檢查讓程序可以正常運作,順便提醒操作者如何正確操作。第 4 行提醒 do_shuffle 不用輸入參數,第 9 行確認要先建立一個佇列,第 13 行禁止該區域內使用動態配置記憶(使用會跳警告),第 14 行是設置錯誤處理區域,一般程序如果有錯誤發生,作業系統會傳遞 SIGHUP 訊號給程序,準備終止他。若在錯誤處理區域內, exception_setup 會用 sigsetjmp 註冊接收 SIGHUP 的訊號,所以呼叫的函式如果出現錯誤,會引發 longjmp ,讓 qtest 來處理這個錯誤,而不是讓作業系統終止程序,這樣一來 qtest 就可以繼續運作下去。 show_queue 顯示隨機排序後的佇列,最後 error_check 是回傳剛才錯誤處理區域內是否有錯誤發生。這樣 shuffle 命令就完成了。

新命令 web

(待續)

參考資料

共筆示範