Try   HackMD

2024q1 Homework(quiz10)

contributed by < yourui1017 >

開發環境

$ gcc --version
(Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0

$ lscpu
Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Address sizes:                   39 bits physical, 48 bits virtual
Byte Order:                      Little Endian
CPU(s):                          12
On-line CPU(s) list:             0-11
Vendor ID:                       GenuineIntel
Model name:                      Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
CPU family:                      6
Model:                           158
Thread(s) per core:              2
Core(s) per socket:              6
Socket(s):                       1
Stepping:                        10
CPU max MHz:                     4100.0000
CPU min MHz:                     800.0000
BogoMIPS:                        4399.99
Virtualization:                  VT-x
Caches (sum of all):     
L1d:                             192 KiB (6 instances)
L1i:                             192 KiB (6 instances)
L2:                              1.5 MiB (6 instances)
L3:                              9 MiB (1 instance)
NUMA:                    
NUMA node(s):                    1
NUMA node0 CPU(s):               0-11

第十週測驗題

測驗2

延伸問題:

  1. 解釋上述程式碼運作原理,程式碼參照論文〈A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue〉提及的 SCQ (Scalable Circular Queue)
  2. 比照 rcu-list 的實作手法,藉由 thread-rcu 來改寫上述使用到 hazard pointer (HP) 的場景,比較效能落差並解讀
  3. userspace RCU 改寫上述程式碼並探討效能表現
  4. 綜合上述實驗,提出改進 lfq 效能的方案
  5. 在 Linux 核心找出類似 lfq 需求的程式碼和應用場景

lock-free: 強調以系統的觀點來看,只要執行足夠長的時間,至少會有一個執行緒會有進展。因此不能夠恣意地使用 lock,否則就會違反 lock-free 的特性。

於是乎,我們需要某種同為 lock-free 的記憶體物件回收機制。

對於 C 這樣缺乏內建 concurrent GC 機制的程式語言來說,若要實作 lock-free 演算法,就要自行處理記憶體釋放的議題。Hazard pointer 是其中一種解決方案,其原理是讀取端執行緒對指標進行識別,指標 (特別是指向的記憶體區塊) 若要釋放時,會事先保存,延遲到確認沒有讀取端執行緒,才進行真正的釋放。Linux 核心的 RCU 同步機制是另一種 lock-free 程式設計演算法和記憶體回收機制。

參考自 並行程式設計: Hazard pointer

重要結構體

struct lfq_node {
    void *data;
    union {
        struct lfq_node *next;
        struct lfq_node *free_next;
    };
    bool can_free;
};

struct lfq_ctx {
    alignas(64) struct lfq_node *head;
    int count;
    struct lfq_node **HP; /* hazard pointers */
    int *tid_map;
    bool is_freeing;
    struct lfq_node *fph, *fpt; /* free pool head/tail */

    /* FIXME: get rid of struct. Make it configurable */
    int MAX_HP_SIZE;

    /* avoid cacheline contention */
    alignas(64) struct lfq_node *tail;
};

程式碼運作原理

首先,會先建立 lfq_ctx ,並且初始化此 lfq_ctx 。

int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
    struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
    if (!tmp)
        return -errno;

    struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
    if (!node)
        return -errno;

    tmp->can_free = node->can_free = true;
    memset(ctx, 0, sizeof(struct lfq_ctx));
    ctx->MAX_HP_SIZE = max_consume_thread;
    ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
    ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
    ctx->head = ctx->tail = tmp;
    ctx->fph = ctx->fpt = node;

    return 0;
}

並且我做了以下的改變:

int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
    struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
    if (!tmp)
        return -errno;

    struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
    if (!node)
        return -errno;
+   memset(ctx, 0, sizeof(struct lfq_ctx));
    tmp->can_free = node->can_free = true;
-   memset(ctx, 0, sizeof(struct lfq_ctx));
    ctx->MAX_HP_SIZE = max_consume_thread;
    ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
-   ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
+   ctx->tid_map = calloc(max_consume_thread, sizeof(int));
    ctx->head = ctx->tail = tmp;
    ctx->fph = ctx->fpt = node;

    return 0;
}

在原本的程式碼中會先將 tmp->can_free 和 node->can_free 設為 True ,但卻又在隨後把所有 ctx 中的位元組皆設為 0 ,讓前面的初始動作失去效用,因此把 memset(ctx, 0, sizeof(struct lfq_ctx)); 移至 tmp->can_free = node->can_free = true; 之前。

另外,原始的程式碼配置記憶體大小為結構體 struct lfq_nodectx->tid_map ,但是因為 ctx->tid_map 的資料型別是 *int ,因此更改為配置記憶體大小為 intctx->tid_map

接下來則是建立 consumer 和 producer 的 thread ,並且在沒有使用到 lock 的情況下,完成 Producer–consumer problem

其中會使用到 4 個變數儲存當前的狀態

  • uint64_t cnt_added
    • 紀錄 Producer 加入多少次資料
  • uint64_t cnt_removed
    • 紀錄 Consumer 拿走多少次資料
  • int cnt_thread
    • 紀錄當前 Consumer 的 thread ID
  • int cnt_producer
    • 紀錄當前 Producer 的數量

問題1: 為什麼在 main 函式中要先給定 consumer thread 的任務,而不是先給定 producer thread 的任務? 如此一來不是可以省去給定任務前後的 atomic_fetch_add(&cnt_producer, 1); 和 atomic_fetch_sub(&cnt_producer, 1); 操作嗎?

atomic_fetch_add(&cnt_producer, 1);
for (int i = 0; i < MAX_CONSUMER; i++) {
    pthread_create(&thread_cons[i], NULL, remove_queue, (void *) &ctx);
}

for (int i = 0; i < MAX_PRODUCER; i++) {
    atomic_fetch_add(&cnt_producer, 1);
    pthread_create(&thread_pros[i], NULL, add_queue, (void *) &ctx);
}
atomic_fetch_sub(&cnt_producer, 1);

問題2: 在 lfq.c 中有 lfq_dequeue 函式,但卻沒有用到,他的用途是什麼?

首先先了解 producer 的任務。

producer 的任務是把資料放入佇列中,等待 consumer 提取資料。

執行函式如下:

void *add_queue(void *data)
{
    struct lfq_ctx *ctx = data;
    long added;
    for (added = 0; added < 500000; added++) {
        struct user_data *p = malloc(sizeof(struct user_data));
        p->data = SOME_ID;
        int ret = 0;
        if ((ret = lfq_enqueue(ctx, p)) != 0) {
            printf("lfq_enqueue failed, reason:%s\n", strerror(-ret));
            atomic_fetch_add(&cnt_added, added);
            atomic_fetch_sub(&cnt_producer, 1);
            return 0;
        }
    }
    atomic_fetch_add(&cnt_added, added);
    atomic_fetch_sub(&cnt_producer, 1);
    printf("Producer thread [%lu] exited! Still %d running...\n",
           pthread_self(), atomic_load(&cnt_producer));
    return 0;
}
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
    struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
    if (!insert_node)
        return -errno;

    insert_node->data = data;
    struct lfq_node *old_tail = atomic_exchange(&ctx->tail, insert_node);
    /* We have claimed our spot in the insertion order by modifying tail.
     * we are the only inserting thread with a pointer to the old tail.
     *
     * Now we can make it part of the list by overwriting the NULL pointer in
     * the old tail. This is safe whether or not other threads have updated
     * ->next in our insert_node.
     */
    assert(!old_tail->next && "old tail was not NULL");
    atomic_store(&old_tail->next, insert_node);

    return 0;
}

每個 producer thread 會有各自的區域參數 added,紀錄當前輸入資料的次數,並且在最後將每個 producer thread 輸入資料的次數總和儲存在主進程的 cnt_added。每個 producer thread 預設會輸入 500000 次資料進入佇列。因此,如果 producer thread 沒有完成指定的輸入資料次數,代表在建立 insert_node 時發生錯誤。

另外可以注意到在針對 struct lfq_ctx *ctx 的資料進行操作時,會使用到 atomic 操作,防止 condition variable 的發生,導致資料不正確。

再來是 consumer 的任務。

consumer 的任務是等待 producer 把資料放入佇列中,並提取資料。

執行函式如下:

void *remove_queue(void *data)
{
    struct lfq_ctx *ctx = data;
    int tid = atomic_fetch_add(&cnt_thread, 1);
    long deleted = 0;
    while (1) {
        struct user_data *p = lfq_dequeue_tid(ctx, tid);
        if (p) {
            if (p->data != SOME_ID) {
                printf("data wrong!!\n");
                exit(1);
            }

            free(p);
            deleted++;
        } else {
            if (!ctx->count && !atomic_load(&cnt_producer))
                break;     /* queue is empty and no more producers */
            sched_yield(); /* queue is empty, release CPU slice */
        }
    }
    atomic_fetch_add(&cnt_removed, deleted);

    printf("Consumer thread [%lu] exited %d\n", pthread_self(), cnt_producer);
    return 0;
}
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
    struct lfq_node *old_head, *new_head;

    /* HP[tid] is necessary for deallocation. */
    do {
    retry:
        /* continue jumps to the bottom of the loop, and would attempt a
         * atomic_compare_exchange_strong with uninitialized new_head.
         */
        old_head = atomic_load(&ctx->head);

        atomic_store(&ctx->HP[tid], old_head);
        atomic_thread_fence(memory_order_seq_cst);

        /* another thread freed it before seeing our HP[tid] store */
        if (old_head != atomic_load(&ctx->head))
            goto retry;
        new_head = atomic_load(&old_head->next);

        if (new_head == 0) {
            atomic_store(&ctx->HP[tid], 0);
            return NULL; /* never remove the last node */
        }
    } while (!atomic_compare_exchange_strong(&ctx->head, &old_head, new_head));

    /* We have atomically advanced head, and we are the thread that won the race
     * to claim a node. We return the data from the *new* head. The list starts
     * off with a dummy node, so the current head is always a node that is
     * already been read.
     */
    atomic_store(&ctx->HP[tid], 0);
    void *ret = new_head->data;
    atomic_store(&new_head->can_free, true);

    /* we need to avoid freeing until other readers are definitely not going to
     * load its ->next in the atomic_compare_exchange_strong loop
     */
    safe_free(ctx, (struct lfq_node *) old_head);

    return ret;
}

每個 consumer thread 會有各自的區域參數 deleted,紀錄當前讀取資料的次數,並且在最後將每個 consumer thread 讀取資料的次數總和儲存在主進程的 cnt_removed。每個 consumer thread 預設會讀取 500000 次資料。如果 consumer thread 從佇列中得到空的資料,並且還有 producer thread 正在執行的話就讓出 CPU 的使用資源,否則退出 while 迴圈,完成 consumer thread 的任務。

另外,要特別注意的地方是 lfq_ctx 結構體的記憶體釋放。

在並行程式設計中,當我們在存取共用的記憶體物件時,需要考慮到其他執行緒是否有可能也正在存取同一個物件,若要釋放該記憶體物件時,不考慮這個問題,會引發嚴重的後果,例如 dangling pointer

因為本程式碼旨在實作 lock free 演算法。因此沒有使用到 lock 保證沒有其他執行緒正在存取同一物件,並安全地釋放記憶體。為了要實現 lock free 演算法的實作,這邊使用的是 Hazard pointer 的實作方式。

並且可以在函式 lfq_dequeue_tid 中注意到只要是有關 lfq_ctx 結構體的操作,都需要使用 atomic 操作,確保資料的正確性。另外在 lfq_dequeue_tid 中,會把正在使用的節點儲存在 Hazard pointer 中。在此 lfq_dequeue_tid 函式中, lfq_ctx 的 head 會是 dummy head ,也就是目前的 head 早已經被讀取過了,head 的下一個 node 才是儲存需要的資料的節點,並因為已經取得需要的資料,所以在該執行緒中可以把 Hazard pointer(儲存 head 位址) 移除。

再來則是如何安全的釋放該記憶體位置的資料,執行函式如下:

static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node)
{
    for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
        if (atomic_load(&ctx->HP[i]) == node)
            return true;
    }
    return false;
}

/* add to tail of the free list */
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
    atomic_store(&node->free_next, NULL);
    struct lfq_node *old_tail = atomic_exchange(&ctx->fpt, node); /* seq_cst */
    atomic_store(&old_tail->free_next, node);
}

static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
    bool old = 0;
    if (!atomic_compare_exchange_strong(&ctx->is_freeing, &old, true))
        return;

    for (int i = 0; i < MAX_FREE || freeall; i++) {
        struct lfq_node *p = ctx->fph;
        if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) ||
            in_hp(ctx, (struct lfq_node *) p))
            break;
        ctx->fph = p->free_next;
        free(p);
    }
    atomic_store(&ctx->is_freeing, false);
    atomic_thread_fence(memory_order_seq_cst);
}

static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
    if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
        /* free is not thread-safe */
        bool old = 0;
        if (atomic_compare_exchange_strong(&ctx->is_freeing, &old, true)) {
            /* poison the pointer to detect use-after-free */
            node->next = (void *) -1;
            free(node); /* we got the lock; actually free */
            atomic_store(&ctx->is_freeing, false);
            atomic_thread_fence(memory_order_seq_cst);
        } else /* we did not get the lock; only add to a freelist */
            insert_pool(ctx, node);
    } else
        insert_pool(ctx, node);
    free_pool(ctx, false);
}

safe_free 函式中,會確認該 node 的狀態是否可以 free ,並且確認是否還有其他執行緒正在使用此 node ,如果該 node 可以 free 並且沒有執行緒正在使用此 node ,就會再進一步判斷 lfq_ctx 是否正在 free ,避免有多個執行緒同時 free ,確認 lfq_ctx 可以 free 就會把該 node 的記憶體位置釋放,並且為了確保之後的執行緒會看到此變化,還加入 atomic_thread_fence(memory_order_seq_cst); 。相反的,如果上述條件不符合的話,就會把該 node 放入 free pool 中等待釋放。

並且再接下來會持續呼叫函式 free_pool ,確認 free pool 中的節點是否還有執行緒在使用,如果沒有就把節點的記憶體釋放,並且在最後要加入 memory barrier 確保其他執行緒在執行 store/load 動作時,會看到此改動。

safe_freefree_pool 的功用:
safe_free 的功用是會確認是否有執行緒正在使用該記憶體位置,如果沒有就馬上釋放剛剛存取過得記憶體位置,否則就把它加入 free pool 等待釋放。
free_pool 的功用則是隨時檢查 free pool 是否滿足可釋放的條件, free pool 中可能會存有時間叫久遠的記憶體位置,但因為還有執行緒在使用,所以無法釋放。

因此 safe_freefree_pool 的差別是釋放記憶體位置的時間。safe_free 會釋放該執行緒最近使用的記憶體,而 free_pool 則是會隨時查看 free pool 是否能夠釋放,不在乎記憶體最近一次被使用的時間。

最後,當 producer 和 consumer 的任務都完成後,就會將先前配置的動態記憶體位置釋放,並將靜態記憶體位置初始化。