Try   HackMD

Experiments on Concurrent Queue

contributed by < jeffrey.w >

Introduction

本文將參考 Michael & Scott algorithm,以 C11 規格 [1] 實作 non-blocking concurrent queue

Non-blocking singly-linked list

由於 Michael & Scott algorithm 是使用 singly-linked list 實作 non-blocking concurrent queue,所以本文會先實驗如何實作一個 non-blocking singly-linked list,重點會放在 concurrent insertion。

關於 concurrent linked list,2001 年 Harris 提出一個 non-blockingsolution [2],這個 solution 使用了 compare-and-swap 來完成。

Harris's solution

Harris's solution 對於 non-blocking singly-linked listinsertion是這樣 [2]

Insert node n aftre node *p

  • Step 1. node *next = p->next
  • Step 2. n.next = next
  • Step 3. bool result = cas(&p->next, next, n)
  • Step 4. if result == false, goto Step 1

C11 規格 實作 Harris's solution
[harris_solution.c]

typedef struct node_t {
    volatile struct node_t *next;
} node_t;

void insert_after(node_t *p, node_t new) {
    volatile node_t *next;
    do {
        next = p->next;
        new.next = next;
    }while(! atomic_compare_exchange_strong(&p->next, &next, &new));
}

有一行要特別挑出來實驗一下

atomic_compare_exchange_strong(&p->next, &next, &new)

這樣寫對嗎?我們看一下 C11 規格
§7.17.7.4/1

_Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired);

§7.17.7.4/3

NOTE 1 For example, the effect of atomic_compare_exchange_strong is
 if (memcmp(object, expected, sizeof (*object)) == 0)
       memcpy(object, &desired, sizeof (*object));
 else
       memcpy(expected, object, sizeof (*object));

能不能把 atomic_compare_exchange_strong(&p->next, &next, &new) 改成這樣?

atomic_compare_exchange_strong(p->next, next, new)

以下圖為例來檢視 (p->next, next, new)(&p->next, &next, &new) 的差別







G


cluster_1

p


cluster_2

struct node_t


cluster_next

next



p

 



node1

next



p:c->node1






null
NULL



node1:c->null






next

 



null_1
NULL



next:c->null_1






  • next:一個 pointer,跟 p->next 指向同一個 address (在這裡是指向 NULL)
  • p: 一個 pointer,指向一個 struct node_t
  • p->next:一個 pointer,指向一個 struct node_t,在這裡是指向 NULL






G


cluster_1

new_node_t



new

next



null_0
NULL



new:c->null_0






  • new:new_node_t 是一個 struct node_t,這個 struct 有一個 pointer,指向 NULL

_Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired);

objectexpected 都是 pointer,如果這兩個 pointer 所指向的 location 的前 n 個 bytes 一樣的話 (n == sizeof(*object))desired 的值就會被 copy 到 object 所指向的 location。

所以,如果改成 atomic_compare_exchange_strong(p->next, next, new),由於 object 這個 pointer (p->next) 指向 NULL,所以會發生 Segmentation fault

用兩條 thread 實驗一下 Harris’s solution 的實作

[test_harris.c]

void *thx_insert(void *p) {
    node_t n;
    n.next = NULL;
    insert_after((node_t *)p, n);
}

int main(int argc, char **argv) {

    node_t dummy;
    dummy.next = NULL;

    pthread_t thx0, thx1;

    pthread_create(&thx0, NULL, thx_insert, &dummy);
    pthread_create(&thx1, NULL, thx_insert, &dummy);
    pthread_join(thx0, NULL);
    pthread_join(thx1, NULL);

    assert(2 == calc_length(&dummy));
    return 0;
} 

上面這個實作純粹是在實驗 insert element in concurrency,所以每次的 insert 都 insert 到 list 的前面,並不是 insert 到 list 的最後面。

至於驗證 insert 正不正確是看最後的結果 list 的個數正不正確。

calc_length 是參考 ConcurrentLinkedQueue::size() 的寫法,計算長度的過程中如果發生 elementremoveinsert,計算的結果就會不正確。
[list.c]

unsigned int calc_length(node_t *head) {
    assert(head != NULL);
    node_t *ptr = head;
    unsigned int length = 0;
    while(ptr->next != NULL && length < INT_MAX){
        ptr = (node_t *)ptr->next;
        ++length;
    }
    return length;
}

Insertion

上面的實作是 insert 到 list 的前面,而不是 tail,所以這裡實作一個 insertion 是 insert 到 listtail
在這個實作會增加兩個 pointer: headtail,在下圖這個例子,list 有兩個 elements。是 2 個 elements,不是 3 個,因為藍底白字那個 node 是 dummy,就是 head 指向的 struct node_t。







G


cluster_0

tail


cluster_1

head


cluster_2




tail

 



node1

next



tail:c->node1






head

 



dummy

next



head:c->dummy






node0

next



dummy:c->node0






node0:c->node1






null
NULL



node1:c->null






所以一個空白的 list 會是這樣







G


cluster_0

tail


cluster_2



cluster_1

head



head

 



dummy

next



head:c->dummy






tail

 



tail:c->dummy






null
NULL



dummy:c->null






參考 Michael & Scott algorithmenquepseudo-code,在 insert node 之後,再用 atomic instruction CAS 將 tail 指到最後一個 node。

修改過的 insert 像這樣
[list.c]

void insert(list_t *list, node_t new) {
    node_t *tail;
    volatile node_t *next;
    do {
        tail = list->tail;
        next = tail->next;
        new.next = next;
    } while (! atomic_compare_exchange_strong(&tail->next, &next, &new));
    atomic_compare_exchange_strong(&list->tail, &tail, &new);
}

Harris's solution 不一樣的是這一行 atomic_compare_exchange_strong(&list->tail, &tail, &new);,這裡參考 Michael & Scott algorithm enquepseudo-code [3],使用 CAS 將 list->tail 指向新增的 node (下圖的 E17)。

[source: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms, by Maged M. Michael and Michael L. Scott]

Benchmark

參考 jserv 老師的 concurrent-ll,寫一段 code 來檢驗 performance。

References

References

tags: concurrency non-blocking blocking concurrent queue linked list Harris's solution algorithm