Try   HackMD

2022q1 Homework5 (quiz5)

contributed by < leewei05 >

測驗 1

  • 解釋上述程式碼運作原理,指出可改進之處並實作
    是否有必要先將數值轉成字串?用十進位的角度處理運算是否產生額外的計算負擔?
  • Linux 核心原始程式碼 lib/math/prime_numbers.c 有類似的程式碼,請解釋其作用、裡頭 RCU 的使用情境,及針對執行時間的改進

測驗 2

Hazard pointer 原理

Hazard pointer 是由 hazard pointers 以及 retire list 所組成。其目的是為了實作 lock-free 的記憶體回收機制,適用於 Write-Rarely-Read-Many 的場景。

  • hazard pointer: 儲存此 thread 正在存取的指標,因此該指標不能直接被釋放
  • retire list: 被這個 thread 要求釋放的指標,但實際尚未釋放。只有每個 thread 自己可以存取的,因此不需要被同步

實作注意事項:

  • 確保其他執行緒沒有在存取欲釋放的物件,也就是各個執行緒的 hazard pointer 並沒有該物件
  • 各個執行緒欲釋放的物件放在 retire list

實作程式碼

以下是 Hazard pointer 的簡化 C11 實作,搭配 GNU extension:

封裝 GNU built-in functions

/* shortcuts */
#define atomic_load(src) __atomic_load_n(src, __ATOMIC_SEQ_CST)
#define atomic_store(dst, val) __atomic_store(dst, val, __ATOMIC_SEQ_CST)
#define atomic_exchange(ptr, val) \
    __atomic_exchange_n(ptr, val, __ATOMIC_SEQ_CST)
#define atomic_cas(dst, expected, desired)                                 \
    __atomic_compare_exchange(dst, expected, desired, 0, __ATOMIC_SEQ_CST, \
                              __ATOMIC_SEQ_CST)

#include <stdint.h>

#define LIST_ITER(head, node) \
    for (node = atomic_load(head); node; node = atomic_load(&node->next))

typedef struct __hp {
    uintptr_t ptr;
    struct __hp *next;
} hp_t;

配置一個新的空間並把新加入的 node 插入至 linked list 的 head。

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Allocate a new node with specified value and append to list */
static hp_t *list_append(hp_t **head, uintptr_t ptr)
{
    hp_t *new = calloc(1, sizeof(hp_t));
    if (!new)
        return NULL;

    new->ptr = ptr;
    hp_t *old = atomic_load(head);

    do {
        new->next = old;
    } while (!atomic_cas(head, &old, &new));

    return new;
}

Built-in Function: type __atomic_load_n (type *ptr, int memorder)
This built-in function implements an atomic load operation. It returns the contents of *ptr.

Built-in Function: bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder)
This built-in function implements an atomic compare and exchange operation. This compares the contents of *ptr with the contents of *expected. If equal, the operation is a read-modify-write operation that writes desired into *ptr. If they are not equal, the operation is a read and the current contents of *ptr are written into *expected.

If desired is written into *ptr then true is returned and memory is affected according to the memory order specified by success_memorder. There are no restrictions on what memory order can be used here.
Otherwise, false is returned and memory is affected according to failure_memorder.

expected 為 0 表示該 node 的指標物件是空的,可以直接寫入指標至該 node;atomic_cas 回傳 true 則表示 ptr 順利寫入至 &node->ptr

/* Attempt to find an empty node to store value, otherwise append a new node.
 * Returns the node containing the newly added value.
 */
hp_t *list_insert_or_append(hp_t **head, uintptr_t ptr)
{
    hp_t *node;
    bool need_alloc = true;

    LIST_ITER (head, node) {
        uintptr_t expected = atomic_load(&node->ptr);
        if (expected == 0 && atomic_cas(&node->ptr, &expected, &ptr)) {
            need_alloc = false;
            break;
        }
    }

    if (need_alloc)
        node = list_append(head, ptr);

    return node;
}

比對 node 的 ptr 與欲移除的 ptr 是否相同,相同則寫入 0 至 node 的 ptr,但並沒有清除 node 的記憶體空間。如果之後執行緒要移除某個物件,可以直接把該物件的指標寫入至空的 node。

/* Remove a node from the list with the specified value */
bool list_remove(hp_t **head, uintptr_t ptr)
{
    hp_t *node;
    const uintptr_t nullptr = 0;

    LIST_ITER (head, node) {
        uintptr_t expected = atomic_load(&node->ptr);
        if (expected == ptr && atomic_cas(&node->ptr, &expected, &nullptr))
            return true;
    }

    return false;
}

顧名思義,該執行緒的 retire list 是否存在某個物件。

/* Returns 1 if the list currently contains an node with the specified value */
bool list_contains(hp_t **head, uintptr_t ptr)
{
    hp_t *node;

    LIST_ITER (head, node) {
        if (atomic_load(&node->ptr) == ptr)
            return true;
    }

    return false;
}

釋放 retire list 裡所有 node 的記憶體空間。比較好奇NOT THREAD SAFE 的註解,retire list 不是只有單一執行緒可以存取嗎?

list_ 開頭的函式最初的設計目標是達到通用性質,模組化的程式碼應該儘量思考到後續的重用 (reusability),因此及早標注其限制就變得重要

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

/* Frees all the nodes in a list - NOT THREAD SAFE */
void list_free(hp_t **head)
{
    hp_t *cur = *head;
    while (cur) {
        hp_t *old = cur;
        cur = cur->next;
        free(old);
    }
}

定義 domain_t 來封裝 hazard pointers 物件。

#define DEFER_DEALLOC 1

typedef struct {
    hp_t *pointers;
    hp_t *retired;
    void (*deallocator)(void *);
} domain_t;

/* Create a new domain on the heap */
domain_t *domain_new(void (*deallocator)(void *))
{
    domain_t *dom = calloc(1, sizeof(domain_t));
    if (!dom)
        return NULL;

    dom->deallocator = deallocator;
    return dom;
}

/* Free a previously allocated domain */
void domain_free(domain_t *dom)
{
    if (!dom)
        return;

    if (dom->pointers)
        list_free(&dom->pointers);

    if (dom->retired)
        list_free(&dom->retired);

    free(dom);
}

寫入 prot_ptr 至 hazard pointer。

/*
 * Load a safe pointer to a shared object. This pointer must be passed to
 * `drop` once it is no longer needed. Returns 0 (NULL) on error.
 */
uintptr_t load(domain_t *dom, const uintptr_t *prot_ptr)
{
    const uintptr_t nullptr = 0;

    while (1) {
        uintptr_t val = atomic_load(prot_ptr);
        hp_t *node = list_insert_or_append(&dom->pointers, val);
        if (!node)
            return 0;

        /* Hazard pointer inserted successfully */
        if (atomic_load(prot_ptr) == val)
            return val;

        /*
         * This pointer is being retired by another thread - remove this hazard
         * pointer and try again. We first try to remove the hazard pointer we
         * just used. If someone else used it to drop the same pointer, we walk
         * the list.
         */
        uintptr_t tmp = val;
        if (!atomic_cas(&node->ptr, &tmp, &nullptr))
            list_remove(&dom->pointers, val);
    }
}

Built-in Function: void __builtin_unreachable (void)
If control flow reaches the point of the __builtin_unreachable, the program is undefined. It is useful in situations where the compiler cannot deduce the unreachability of the code.

執行緒用完 shared object 之後必須呼叫 droplist_remove 將會寫入 0 至 node 的 ptr,但並沒有清除 node 的記憶體空間。程式碼會執行 __builtin_unreachable 是沒有定義的狀況,因為執行緒現在使用的 shared object 一定會在 linked list 中。故 list_remove 回傳 false 一定是有其他不預期的原因。

/*
 * Drop a safe pointer to a shared object. This pointer (`safe_val`) must have
 * come from `load`
 */
void drop(domain_t *dom, uintptr_t safe_val)
{
    if (!list_remove(&dom->pointers, safe_val))
        __builtin_unreachable();
}
  1. 物件不在 hazard pointers 中,可以進行 deallocate
  2. DEFER_DEALLOC 等於 1 時,先寫入至 retire list 晚點再 deallocate
  3. 等到其他執行緒的 hazard pointers 沒有該物件即可以進行 deallocate
static void cleanup_ptr(domain_t *dom, uintptr_t ptr, int flags)
{
    if (!list_contains(&dom->pointers, ptr)) { /* deallocate straight away */
        dom->deallocator((void *) ptr);
    } else if (flags & DEFER_DEALLOC) { /* Defer deallocation for later */
        list_insert_or_append(&dom->retired, ptr);
    } else { /* Spin until all readers are done, then deallocate */
        while (list_contains(&dom->pointers, ptr))
            ;
        dom->deallocator((void *) ptr);
    }
}

把 shared pointer 的值跟 new pointer 互換,之後再釋放 shared pointer 原本佔用的記憶體空間。

/* Swaps the contents of a shared pointer with a new pointer. The old value will
 * be deallocated by calling the `deallocator` function for the domain, provided
 * when `domain_new` was called. If `flags` is 0, this function will wait
 * until no more references to the old object are held in order to deallocate
 * it. If flags is `DEFER_DEALLOC`, the old object will only be deallocated
 * if there are already no references to it; otherwise the cleanup will be done
 * the next time `cleanup` is called.
 */
void swap(domain_t *dom, uintptr_t *prot_ptr, uintptr_t new_val, int flags)
{
    const uintptr_t old_obj = atomic_exchange(prot_ptr, new_val);
    cleanup_ptr(dom, old_obj, flags);
}

推斷 EXP3, EXP4 皆為 list_remove(&dom->retired, ptr)

cleanup 要用在哪?測試程式最後也沒有呼叫 cleanup

這就是要讓你擴充的,你推測其作用,撰寫對應的測試程式

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
jserv

/* Forces the cleanup of old objects that have not been deallocated yet. Just
 * like `swap`, if `flags` is 0, this function will wait until there are no
 * more references to each object. If `flags` is `DEFER_DEALLOC`, only
 * objects that already have no living references will be deallocated.
 */
void cleanup(domain_t *dom, int flags)
{
    hp_t *node;

    LIST_ITER (&dom->retired, node) {
        uintptr_t ptr = node->ptr;
        if (!ptr)
            continue;

        if (!list_contains(&dom->pointers, ptr)) {
            /* We can deallocate straight away */
            if (EXP3)
                dom->deallocator((void *) ptr);
        } else if (!(flags & DEFER_DEALLOC)) {
            /* Spin until all readers are done, then deallocate */
            while (list_contains(&dom->pointers, ptr))
                ;
            if (EXP4)
                dom->deallocator((void *) ptr);
        }
    }
}

測試程式

#include <assert.h>
#include <err.h>
#include <pthread.h>
#include <stdio.h>

#define N_READERS 1
#define N_WRITERS 1
#define N_ITERS 20
#define ARRAY_SIZE(x) sizeof(x) / sizeof(*x)

typedef struct {
    unsigned int v1, v2, v3;
} config_t;

static config_t *shared_config;
static domain_t *config_dom;

config_t *create_config()
{
    config_t *out = calloc(1, sizeof(config_t));
    if (!out)
        err(EXIT_FAILURE, "calloc");
    return out;
}

void delete_config(config_t *conf)
{
    assert(conf);
    free(conf);
}

static void print_config(const char *name, const config_t *conf)
{
    printf("%s : { 0x%08x, 0x%08x, 0x%08x }\n", name, conf->v1, conf->v2,
           conf->v3);
}

void init()
{
    shared_config = create_config();
    config_dom = domain_new(delete_config);
    if (!config_dom)
        err(EXIT_FAILURE, "domain_new");
}

void deinit()
{
    delete_config(shared_config);
    domain_free(config_dom);
}

static void *reader_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS; ++i) {
        config_t *safe_config =
            (config_t *) load(config_dom, (uintptr_t *) &shared_config);
        if (!safe_config)
            err(EXIT_FAILURE, "load");

        print_config("read config    ", safe_config);
        drop(config_dom, (uintptr_t) safe_config);
    }

    return NULL;
}

static void *writer_thread(void *arg)
{
    (void) arg;

    for (int i = 0; i < N_ITERS / 2; ++i) {
        config_t *new_config = create_config();
        new_config->v1 = rand();
        new_config->v2 = rand();
        new_config->v3 = rand();
        print_config("updating config", new_config);

        swap(config_dom, (uintptr_t *) &shared_config, (uintptr_t) new_config,
             0);
        print_config("updated config ", new_config);
    }

    return NULL;
}

int main()
{
    pthread_t readers[N_READERS], writers[N_WRITERS];

    init();

    /* Start threads */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_create(readers + i, NULL, reader_thread, NULL))
            warn("pthread_create");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_create(writers + i, NULL, writer_thread, NULL))
            warn("pthread_create");
    }

    /* Wait for threads to finish */
    for (size_t i = 0; i < ARRAY_SIZE(readers); ++i) {
        if (pthread_join(readers[i], NULL))
            warn("pthread_join");
    }
    for (size_t i = 0; i < ARRAY_SIZE(writers); ++i) {
        if (pthread_join(writers[i], NULL))
            warn("pthread_join");
    }

    deinit();

    return EXIT_SUCCESS;
}