Try   HackMD

Lock-Free Ringbuffer

在 box64 執行的過程中就已經使用了需多 lock ,如果測量期行為的程式也需使用 lock 的話無疑會讓執行過程更複雜,因此務必使用 lock-free

Race Condition

大家都知道要避免 race condition, 但 race condition 會發生在哪些情境呢? 發生了會怎樣呢?

  1. 多個生產者之間:

  2. 生產者與消費者:

  3. 多個消費者之間:

lock-free

於是我們需要變更原本執行緒的程式邏輯,當 atomic 操作不能成功時,就重來,直到 RMW 一類的 atomic 操作成功為止

並不是「沒有用到 mutex lock」 就是 lock-free, 而是「能保證至少有一個執行緒在進展」才叫 lock-free.

A Pragmatic Implementation of Non-Blocking Linked-Lists

從論文中可以窺探到 lock-free 的秘密,即將步驟分成 logicalphysical ,而 logical 會通知所有執行緒

Locks Aren't Slow; Lock Contention Is

  • high contention
  • high frequency

best-case and worst-case usage scenarios for locks ->

lock-free ring buffer 的考量因素

Compiler Barrier

Memory Barrier

執行路徑

ABA 問題

「裡面放的東西一樣」不能確保「剛剛到現在都沒有人動過裡面的東西」

多行程下的記憶體配置

Memory Order

memory_order_relaxed
memory_order_consume
memory_order_acquire
memory_order_release
memory_order_acq_rel
memory_order_seq_cst

延遲


ringbuf_shm

ringbuf-shm

考量因素

  1. 跨行程的記憶體讀取
  2. Bounded buffer
  3. 共享記憶體的同步問題
  4. Copy free

atomic + while + 預約 -> how ?

結構 :

typedef struct {
    uint32_t size, gap;
} ringbuf_element_t;

typedef struct {
    size_t size, mask, rsvd /* reserved */, gapd;
    memory_order acquire, release;
    atomic_size_t head, tail;
    uint8_t buf[] __attribute__((aligned(sizeof(ringbuf_element_t))));
} ringbuf_t;

生產者

static void *producer_main(void *arg)
{
    ringbuf_t *ringbuf = arg;

    uint64_t cnt = 0;
    while (cnt < iterations) {
        if (rand() < THRESHOLD)
            nanosleep(&req, NULL);

        size_t written = PAD(rand() * 1024.f / RAND_MAX);

        size_t maximum;
        uint8_t *ptr;
        if ((ptr = ringbuf_write_request_max(ringbuf, written, &maximum))) {
            assert(maximum >= written);
            const uint8_t *end = ptr + written;
            for (uint8_t *src = ptr; src < end; src += sizeof(uint64_t)) {
                *(uint64_t *) src = cnt;
                assert(*(uint64_t *) src == cnt);
            }
            ringbuf_write_advance(ringbuf, written);
            cnt++;
        } /* else: buffer full */
    }

    return NULL;
}

MPSC

mpsc

測試程式:

  1. 是否符合 FIFO

考量因素:

  1. 多個生產者同時寫入
  2. ABA 問題

結構

typedef struct node {
    atomic_uintptr_t next;
} node;

struct queue_internal {
    atomic_uintptr_t head, tail;
    size_t item_size;
};

什麼意思?這是 linked-list 的意思嗎?其實不然

生產者

生產者會更新執行的次數 producer_count 並將 in push 到 Queue 中,然而 test_producer 是如何做到 lock-free 呢? 如何確保對共享 queue 的讀-修改-寫(read-modify-write)沒有被打擾呢?

static void *test_producer(void *arg)
{
    queue_test_t *test = (queue_test_t *) arg;
    assert(test->q);
    while (1) {
        int in = atomic_fetch_add(&test->producer_count, 1);
        if (in >= THREAD_COUNT)
            break;
        queue_result_t result = Queue.push(test->q, &in);
        assert(result == QUEUE_SUCCESS);
    }
    return NULL;
}

THREAD_COUNT 的用意為何? 跟 PRODUCER_COUNT 的差別是?

THREAD_COUNT 是指執行過程中所用到的執行緒數量上限,即 consume_count(消費者執行的次數) + producer_count (生產者執行的次數) <= THREAD_COUNT

static queue_result_t queue_push(queue_p q, void *data)
{
    assert(q);
    /* create the new tail */
    node *new_tail = malloc(sizeof(node) + q->item_size);
    if (!new_tail)
        return QUEUE_OUT_OF_MEMORY;

    atomic_init(&new_tail->next, 0);
    memcpy(new_tail + 1, data, q->item_size);

    /* swap the new tail with the old */
    node *old_tail =
        (node *) atomic_exchange(&q->tail, (atomic_uintptr_t) new_tail);

    /* link the old tail to the new */
    atomic_store(old_tail ? &old_tail->next : &q->head,
                 (atomic_uintptr_t) new_tail);
    return QUEUE_SUCCESS;
}

可以注意到 psuh 分成兩個步驟:
a. 找到要「接上」的 node (相當於對所有執行緒預告了未來的 tail)

b. 「接上」node

那如果失敗呢?

步驟 a 不一定要直接接著步驟 a ,兩步驟間可以穿插其他執行緒的步驟,假設有兩個生產者的執行緒同時要加入節點

atomic_exchange 成功 失敗
old_tail
q->tail
atomic_store 成功 失敗
old_tail->next
q->head

參考

  1. lock-free
  2. 並行程式設計: Atomics 操作
  3. Linux 核心設計: 淺談同步機制
  4. 7.17 Atomics <stdatomic.h>
  5. 5.1.2.4 Multi-threaded executions and data races