Try   HackMD

2022q1 Homework1 (lab0)

contributed by < kevinshieh0225 >

作業描述

本次作業 lab0 實作 FIFO 與 FILO 的佇列 queue 。為了使 remove, add 等操作維持在 O(1) 的時間複雜度,以 circular doubly linked list 的結構實現。相關知識背景務必詳閱你所不知道的 C 語言: linked list 和非連續記憶體,會有更清晰的背景說明。

運用 The Linux Kernel API - List Management Functions 的基礎,讓學員學習 Linux 核心程式碼風格實作方式,也讓我們初探物件導向作為一種態度的實作技巧。

事先準備

system

$ gcc -v
...
gcc version 9.3.0 (Ubuntu 9.3.0-17ubuntu1~20.04)

github

在 GitHub 上 fork lab0-c 並循序參考作業指引完成相關環境設定與實作。

rebase on fork

git remote add upstream https://github.com/original-repo/goes-here.git
git fetch upstream
git rebase upstream/master
git push origin master --force

queue 程式實作

修改 queue.c 的 function 已完成實作。需觀察 queue.h 介面對於 element_t 或 function 的描述,並搭配觀察 list.h 中對於 struct list_head 指標的描述來確定整體實作原理。

Some issue

1. malloc, free and robustness

(1) 透過動態記憶體配置來增減 element_t 的節點資料。而在我們選擇 delete element_t 時也必須用 free() 來將配置空間釋放。

(2) 任何使用動態記憶體配置的程式碼都務必確認是否成功,否則可能出現程式碼運行中的例外狀況。需確保 malloc() 回傳的內容:如果 heap 的記憶體空間不足時,其回傳值為 NULL。

(3) 注意 element_t 的 element:

/* Linked list element */
typedef struct {
    /* Pointer to array holding string.
     * This array needs to be explicitly allocated and freed
     */
    char *value;
    struct list_head list;
} element_t;

我們可以發現,element_t 中包含了一個 char pointer 與 struct list_head。在配置 element_t 空間時,完整的 string value 是還未被宣告的。於是 char* 要額外宣告,element_t->value 再指向此字串位置。最後在釋放時也必須同時釋放 element_t 與 char*。

void q_release_element(element_t *e)
{
    free(e->value);
    free(e);
}

2. container_of

在我們實作的 queue ,是以 element_t 作為節點,將 char *value 與 list_head 作為 member,所以節點本身並不沒有紀錄節點串連,是透過 list_head 之間來完成 linked list 的串連。

struct list_head {
    struct list_head *prev;
    struct list_head *next;
};

這麼做的好處是模組化,模組化提升程式碼的封裝、提升維護性。利用 list_head 實作 linked list 資料形式與功能,讓我們在設計新的資料結構時可以透過引用模組來實現功能,且只要所有 associate 關聯(association-uml)的 object 在程式碼上都遵守此份模組的規範,即可減少需維護的模組範圍。

用通順的漢語書寫。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

container_of 使我們能夠從 member 的地址回推到原本物件地址。巨集原始碼如下:

#define __LIST_HAVE_TYPEOF 1
#ifndef container_of
#ifdef __LIST_HAVE_TYPEOF
#define container_of(ptr, type, member)                            \
    __extension__({                                                \
        const __typeof__(((type *) 0)->member) *__pmember = (ptr); \
        (type *) ((char *) __pmember - offsetof(type, member));    \
    })
#else
#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) -offsetof(type, member)))
#endif
#endif

struct list_head ptr;
element_t *node = container_of(ptr, element_t, list)

假設 ptr 是一個 element_t 的成員,我們可以使用 container_of 來回推 node 的地址。第一個參數放入該成員位址,第二個放入我要回推物件的 type,第三個值輸入 ptr 在該 struct 物件中的名稱為何。

q_new / q_free / q_size

q_new() 我們希望建立一個新的 queue 資料物件。注意此時還未輸入節點,這是一個空的 queue。所以我們使用 INIT_LIST_HEAD(head) 先使節點中的 list_head 頭尾地址相連。

這裡在靜態分析時會出現由於 element_t *node 在配置空間後並未回傳,可能存在記憶體失去追蹤而未釋放的風險(但其實我們還是能夠取用該位置只是透過 container_of 來完成),故插入 cppcheck-suppress 來避免偵錯。

struct list_head *q_new()
{
    element_t *node = malloc(sizeof(element_t));
    if (!node)
        return NULL;
    struct list_head *head = &(node->list);
    INIT_LIST_HEAD(head);
    // cppcheck-suppress memleak
    return head;
}

不要只張貼程式碼,你要闡述自己的作法和後續的改進

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

q_free() 在傳入 queue 的起始位置,我們希望釋放整個 queue。我們透過 while loop 先到訪每個 list_head 位置,隨後取得 element_t 位置後,注意必須先指向下一個 list_head,再來釋放記憶體空間,否則 ptr 將指向不存在的位置,使迭代無法完成。

最後我們再釋放 queue head 的空間。無法在 while 中一氣呵成釋放的原因是 queue head 沒有配置 char * 的空間,存在形式和其他節點不同,故需獨立處理。

void q_free(struct list_head *l)
{
    if (!l)
        return;
    struct list_head *ptr = l->next;
    while (ptr != l) {
        element_t *node = container_of(ptr, element_t, list);
        ptr = ptr->next;
        q_release_element(node);
    }
    element_t *node = container_of(l, element_t, list);
    free(node);
}
int q_size(struct list_head *head)
{
    int num = 0;
    struct list_head *node;
    if (head)
        list_for_each (node, head)
            num += 1;
    return num;
}

q_insert

在 queue 的頭或尾安插節點。首先先建立 element_t,在確保 malloc 都成功下再透過 list_add 函數將被建立的 struct list_head 安插在 linked list 中。

這裡除了需先確認 queue 有順利建成,再來確認 element_t, char * 的記憶體配置是否順利。由於要頻繁確認之故,所以程式碼顯得很冗長,還未想到更簡潔的方式處理這塊。

由於 char * 作為傳入的 parameter 無法透過 sizeof 來確保其字串長度的,可透過一層 while loop 來計算字串的長度。

可以使用 strlen() 來確認 string 的大小:strlen 是以 counter 從 char * 逐一尋找最終非 NULLCHAR '\0' 位置,以回傳字串大小。
亦可參考 數值系統 在 strlen 的 bitwise 操作嘗試長度取值。

透過 memcpy 來複製字串,若使用 strcpy 會產生安全疑慮,因為該函數恐產生溢位風險,並無法通過靜態分析。

bool q_insert_head(struct list_head *head, char *s)
{
    if (!head)
        return false;
    element_t *node = malloc(sizeof(element_t));
    if (!node)
        return false;
    int charsize = strlen(s);
    char *value = malloc(charsize + 1);
    if (!value) {
        free(node);
        return false;
    }
    memcpy(value, s, charsize);
    value[charsize] = '\0';
    node->value = value;
    list_add(&(node->list), head);
    /* for q_insert_tail use :
    ** list_add_tail(&(node->list), head);
    */
    return true;
}

q_remove

移除節點。這裡特別注意必須題目敘述之 buffsize 大小。

element_t *q_remove_head(struct list_head *head, char *sp, size_t bufsize)
{
    if (!head || list_empty(head))
        return NULL;
    element_t *node = container_of(head->next, element_t, list);
    /* for q_remove_tail use :
    ** element_t *node = container_of(head->prev, element_t, list);
    */
    if (sp) {
        memcpy(sp, node->value, bufsize - 1);
        sp[bufsize - 1] = '\0';
    }
    list_del(head->next);
    return node;
}

q_delete_mid

此題希望刪除 queue 中間節點。我使用快慢指摽的技巧,快指標走兩步慢指標走一步,即可讓快指標指向終點 (head node) 時,慢指標可以指在中間節點上。再來執行節點刪除。

bool q_delete_mid(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    struct list_head *slowptr = head->next;
    for (struct list_head *fastptr = head->next;
         fastptr != head && fastptr->next != head;
         fastptr = fastptr->next->next) {
        slowptr = slowptr->next;
    }
    element_t *node = container_of(slowptr, element_t, list);
    list_del(slowptr);
    free(node->value);
    free(node);
    return true;
}

q_delete_dup

希望刪除重複值節點。我透過設定 ckp 與 ptr 節點,讓 ckp 指在起始節點,讓 ptr 迭代尋值,若 ckp 與 ptr 指向的 字串相同,則我將該字串刪除,並讓 ptr 指向下一個節點;而若 ckp 與 ptr 不同,我則更新 ckp 位置到 ptr 所在,代表本位置以前的所有節點都是未重複過的。

我透過 strcmp 來比較字串大小,若相同則回傳值為 0。

這裏有可改進之處:由於每次發現重複值我便必須刪除節點並維護指標,如果發生連續性的重複字串時,其實更好的做法是找到迄今為止未重複之節點,一次性的刪除並維護指標位置,這樣能夠避免過多重複性但非必要的動作,可在日後改進。

bool q_delete_dup(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    struct list_head *ckp = head->next;
    element_t *ckpnode = container_of(ckp, element_t, list);
    for (struct list_head *ptr = ckp->next; ptr != head; ptr = ckp->next) {
        element_t *ptrnode = container_of(ptr, element_t, list);
        if (strcmp(ckpnode->value, ptrnode->value) == 0) {
            list_del(ptr);
            q_release_element(ptrnode);
        } else {
            ckp = ptr;
            ckpnode = container_of(ckp, element_t, list);
        }
    }
    return true;
}

原先的程式碼是將串列重複的節點刪除,但不刪除初始出現的重複節點(故在執行完演算法後讓串列無重複節點的狀況就好)
以下演算法是刪除所有重複的節點的版本。

/**
 * @brief 刪除串列中重複節點
 * 
 * ckp 指向紀錄節點,ckpnode 指向紀錄節點的除存 element_t ,為了取得 string。
 * ckpisdup 紀錄是否有先前執行了刪除節點,告知我們要不要把當前的紀錄節點刪除。
 * 
 * for (ptr 指向當前節點,往前檢查直到回到起點節點)
 *     - 如果檢查發現紀錄節點與當前節點相同:
 *         > 開始向前尋值並刪除節點直到不再是重複節點
 *     - 如果不相同:
 *         > 檢查是否在先前在刪除節點,如果是的話我就要把當前節點刪除
 *         > 最後一同更新當前節點,表示前面的串列都已經整理過
 * 
 * 這裡程式碼缺點是易讀性很差,還沒想到更好的方式。
 */
bool q_delete_dup(struct list_head *head)
{
    if (!head || list_empty(head))
        return false;
    struct list_head *ckp = head->next;
    element_t *ckpnode = list_entry(ckp, element_t, list);
    bool ckpisdup = false;
    for (struct list_head *ptr = ckp->next; ptr != head; ptr = ckp->next) {
        element_t *ptrnode = list_entry(ptr, element_t, list);
        if (strcmp(ckpnode->value, ptrnode->value) == 0) {
            list_del(ptr);
            q_release_element(ptrnode);
            ckpisdup = true;
        } else {
            if (ckpisdup) {
                list_del(ckp);
                q_release_element(ckpnode);
                ckpisdup = false;
            }
            ckp = ptr;
            ckpnode = list_entry(ckp, element_t, list);
        }
    }
    if (ckpisdup) {
        list_del(ckp);
        q_release_element(ckpnode);
    }
    return true;
}

q_swap / q_reverse

/**
 * @brief 兩兩交換節點
 * 
 * 先確認是否有兩節點能夠互換
 * 如果為真我先對 *next 的指標們進行更新
 * 在完成以後在依緒透過回溯來更新 *prev 的指標。
 * 
 * 這裡程式碼缺點是易讀性很差,還沒想到更好的方式。
 */
void q_swap(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    struct list_head *ptr = head;
    while (ptr->next != head && ptr->next->next != head) {
        // swap next pointer
        struct list_head *tmp = ptr->next->next->next;
        ptr->next->next->next = ptr->next;
        ptr->next = ptr->next->next;
        ptr->next->next->next = tmp;
        // swap prev pointer
        ptr->next->next->next->prev = ptr->next->next;
        ptr->next->next->prev = ptr->next;
        ptr->next->prev = ptr;
        ptr = ptr->next->next;
    }
}

/**
 * @brief DoublyLinkedList 節點反轉
 * 
 * 從 head 出發一圈將 next 和 prev 指標位置互換。
 * 
 * @Trick
 * 由於這裡是少數 queue head 也需參與行動
 * 但 head 也同時必須成為終止判定的指標
 * 故我利用 do-while 的方式:
 *     讓 head 先執行一次後指標前移
 *     後續的執行再判定是否達到停止條件。
 */
void q_reverse(struct list_head *head)
{
    struct list_head *ptr = head;
    if (ptr && !list_empty(ptr)) {
        do {
            struct list_head *tmp = ptr->prev;
            ptr->prev = ptr->next;
            ptr->next = tmp;
            ptr = ptr->prev;
        } while (ptr != head);
    }
}

q_sort

此實作參考 jserv 在 linked list 上的程式碼。
特別可以注意,我是使用 single linked list 的邏輯在實作 mergesort
因為如果我們每次合併 node list 時都要把重新整理雙向串列,會浪費過多操作。
於是乾脆把順向串列整理好後,最後再整理反向串列。

struct list_head *mergeTwoLists(struct list_head *L1, struct list_head *L2)
{
    struct list_head *head = NULL, **ptr = &head, **cur;
    for (cur = NULL; L1 && L2; *cur = (*cur)->next) {
        // compare accending pair by pair
        element_t *node1 = container_of(L1, element_t, list);
        element_t *node2 = container_of(L2, element_t, list);
        cur = (strcmp(node1->value, node2->value) <= 0) ? &L1 : &L2;
        *ptr = *cur;
        ptr = &(*ptr)->next;
    }
    *ptr = (struct list_head *) ((uintptr_t) L1 | (uintptr_t) L2);
    return head;
}

static struct list_head *mergesort_list(struct list_head *head)
{
    if (!head || !head->next)
        return head;
    struct list_head *slow = head;
    for (struct list_head *fast = head->next; fast && fast->next;
         fast = fast->next->next)
        slow = slow->next;
    struct list_head *mid = slow->next;
    slow->next = NULL;

    struct list_head *left = mergesort_list(head), *right = mergesort_list(mid);
    return mergeTwoLists(left, right);
}

void q_sort(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    head->prev->next = NULL;
    head->next = mergesort_list(head->next);
    // reassign the prev ptr
    struct list_head *ptr = head;
    for (; ptr->next; ptr = ptr->next)
        ptr->next->prev = ptr;
    ptr->next = head;
    head->prev = ptr;
}

TODO: mergesort_list 使用遞迴實作,嘗試改為迭代 (iterative) 並探討系統堆疊使用量的變化 (作業說明介紹若干記憶體分析工具)。

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

q_sort 在遞迴與迭代之效能比較

iterative merge sort

參考 Merge Sort 與它的變化 實作 iteration merge sort 的改進版:

void q_sort(struct list_head *head)
{
    if (!head || list_empty(head))
        return;
    head->prev->next = NULL;

    int listsSize = 0;
    struct list_head *lists[150000] = {NULL};

    /* divide to single node 
     * 
     * split listnode first
     * but if the node sequence is already in accending
     * ignore spliting
     */
    struct list_head *node = head->next
    while(node){
        struct list_head *iter = node;
        while (iter && iter->next && iter->value < iter->next->value)
            iter = iter->next;
        lists[listsSize++] = node;
        node = iter->next;
        iter->next = NULL;
    }

    /* merge K sorted lists 
     * 
     * merge and sort two list node in array
     * pair in front and end list node
     * until all the list node had been merge in one list
     */
    while (listsSize > 1) {
        for (int i = 0, j = listsSize - 1; i < j; i++, j--)
            lists[i] = mergeTwoLists(lists[i], lists[j]);
        listsSize = (listsSize + 1) / 2;
    }
    // reassign the prev ptr
    head->next = lists[0];
    struct list_head *ptr = head;
    for (; ptr->next; ptr = ptr->next)
        ptr->next->prev = ptr;
    ptr->next = head;
    head->prev = ptr;
}

perf stat

參考 bakudr18效能分析實作

使用 perf stat 執行 500 次測試在迭代與遞迴版的 sort 上:
分別隨機產生 1000, 10000, 25000, 50000, 100000 個串列節點做 cache 觀察實驗:

@REM trace-sort.cmd

option fail 0
option malloc 0
new
ih RAND 1000    
@REM (10000, 25000, 50000, 100000)
sort
free

quit
$ sudo perf stat -r 500 -e cycles,instructions,\
    cache-references,cache-misses \
    ./qtest -f traces/trace-sort.cmd 
  • ih RAND 1000
# recursive
      429,4594      cycles                                                        ( +-  0.72% )
      402,3140      instructions              #    0.94  insn per cycle           ( +-  0.13% )
       13,2553      cache-references                                              ( +-  0.67% )
        3,4594      cache-misses              #   26.098 % of all cache refs      ( +-  0.89% )

     0.0017419 +- 0.0000212 seconds time elapsed  ( +-  1.22% )

# iterative
      599,7217      cycles                                                        ( +-  0.66% )
      529,7882      instructions              #    0.88  insn per cycle           ( +-  0.07% )
       18,4345      cache-references                                              ( +-  0.51% )
        6,6911      cache-misses              #   36.296 % of all cache refs      ( +-  0.66% )

     0.0023704 +- 0.0000218 seconds time elapsed  ( +-  0.92% )
  • ih RAND 10000
# recursive
     2987,2129      cycles                                                        ( +-  0.71% )
     2767,1574      instructions              #    0.93  insn per cycle           ( +-  0.01% )
      105,0227      cache-references                                              ( +-  0.22% )
       18,9708      cache-misses              #   18.064 % of all cache refs      ( +-  2.13% )

     0.0097144 +- 0.0000942 seconds time elapsed  ( +-  0.97% )

# iterative
     3490,4083      cycles                                                        ( +-  0.73% )
     2860,1657      instructions              #    0.82  insn per cycle           ( +-  0.01% )
      145,8666      cache-references                                              ( +-  0.11% )
       25,5875      cache-misses              #   17.542 % of all cache refs      ( +-  1.72% )

      0.011597 +- 0.000110 seconds time elapsed  ( +-  0.95% )
  • ih RAND 25000
# recursive
     8799,0769      cycles                                                        ( +-  0.36% )
     6851,6095      instructions              #    0.78  insn per cycle           ( +-  0.01% )
      309,8532      cache-references                                              ( +-  0.09% )
      109,2375      cache-misses              #   35.255 % of all cache refs      ( +-  0.72% )

      0.034604 +- 0.000216 seconds time elapsed  ( +-  0.63% )

# iterative
   1,0853,8558      cycles                                                        ( +-  0.47% )
     6881,3598      instructions              #    0.63  insn per cycle           ( +-  0.01% )
      405,7416      cache-references                                              ( +-  0.09% )
      174,5545      cache-misses              #   43.021 % of all cache refs      ( +-  0.99% )

      0.053943 +- 0.000370 seconds time elapsed  ( +-  0.69% )
  • ih RAND 50000
# recursive
   2,0458,3259      cycles                                                        ( +-  0.21% )
   1,3795,8839      instructions              #    0.67  insn per cycle           ( +-  0.01% )
      694,3560      cache-references                                              ( +-  0.07% )
      335,3012      cache-misses              #   48.290 % of all cache refs      ( +-  0.38% )

      0.080581 +- 0.000274 seconds time elapsed  ( +-  0.34% )

# iterative
   2,4660,8830      cycles                                                        ( +-  0.17% )
   1,3685,5232      instructions              #    0.55  insn per cycle           ( +-  0.01% )
      875,4610      cache-references                                              ( +-  0.04% )
      500,2342      cache-misses              #   57.140 % of all cache refs      ( +-  0.20% )

      0.122214 +- 0.000227 seconds time elapsed  ( +-  0.19% )
  • ih RAND 100000
# recursive
   4,5710,0302      cycles                                                        ( +-  0.08% )
   2,7945,2035      instructions              #    0.61  insn per cycle           ( +-  0.00% )
     1573,9750      cache-references                                              ( +-  0.03% )
      870,5131      cache-misses              #   55.307 % of all cache refs      ( +-  0.11% )

      0.184705 +- 0.000225 seconds time elapsed  ( +-  0.12% )

# iterative
   5,8864,9910      cycles                                                        ( +-  0.14% )
   2,7584,3506      instructions              #    0.47  insn per cycle           ( +-  0.00% )
     1941,7409      cache-references                                              ( +-  0.03% )
     1354,6765      cache-misses              #   69.766 % of all cache refs      ( +-  0.16% )

      0.290253 +- 0.000471 seconds time elapsed  ( +-  0.16% )

以上實驗可以發現:

  • 雖然遞迴與迭代的指令數不會差太多,但迭代所需的時脈週期卻更多。
  • 迭代使用的 cache-references 更多,且 cache-misses 也通常更多。

遞迴基於 divide and conquer 的特性,在合併排序上遞迴是更 cache-friendly 的,在降低 cache-references 和 cache-misses 之下,也讓執行效率更高。

valgrind

本次實驗未用 valgrind 做系統性實驗,但紀錄使用基本指令,以便下次使用:

# memory usage test
$ valgrind --tool=massif ./qtest -f ./traces/trace-sort.cmd
# cache usage test
$ valgrind --tool=cachegrind ./qtest -f ./traces/trace-sort.cmd
$ ms_print massif.out.xxxxx
$ sudo apt install massif-visualizer
$ massif-visualizer massif.out.xxxxx

只是紀錄,對本次實驗幫助不大。

qtest 新功能實作:Fisher-Yates shuffle

新增 qtest feature : shuffle
編輯 console_init() 並新增 do_shuffle()

static void console_init()
{
    ...
    ADD_COMMAND(shuffle,
                "                | Shuffle the queue with Fisher–Yates shuffle "
                "algorithm");
    ...
}
/**
 * @brief 對資料節點做 Fisher-Yates shuffle
 * 
 * 參考網誌 [Fisher–Yates shuffle 洗牌算法]
 * (https://gaohaoyang.github.io/2016/10/16/shuffle-algorithm/)
 * 
 * 1. 檢查 argc 長度
 * 2. 檢查 l_meta 不可為空
 * 3. for loop 逐一進行 leave-one-out shuffle
 *         從 [1,range] 挑出一個節點將他擺到佇列末端
 *         每次執行後減少 range 大小
 *         執行直到出現 error 或是 range == 0
 */
static bool do_shuffle(int argc, char *argv[])
{
    if (argc != 1) {
        report(1, "%s takes no arguments", argv[0]);
        return false;
    }
    if (!l_meta.l)
        report(3, "Warning: Calling size on null queue");
    error_check();
    bool ok = true;
    set_noallocate_mode(true);
    if (exception_setup(true)) {
        int cnt = q_size(l_meta.l);
        srand(time(NULL));
        ok = ok && !error_check();
        for (int range = cnt; ok && range > 0; range--) {
            int randint = rand() % range + 1;
            struct list_head *ptr = l_meta.l;
            while (randint-- > 0)
                ptr = ptr->next;
            list_move_tail(ptr, l_meta.l);
            ok = ok && !error_check();
        }
    }
    exception_cancel();
    set_noallocate_mode(false);
    show_queue(3);
    return ok;
}