# Experiments on Concurrent Queue contributed by < [`jeffrey.w`](https://github.com/jeffrey-minwei/experiments-on-concurrency/tree/master/concurrent-queue) > ## Introduction 本文將參考 [`Michael & Scott algorithm`](https://www.researchgate.net/publication/2804621_Simple_Fast_and_Practical_Non-Blocking_and_Blocking_Concurrent_Queue_Algorithms),以 `C11 規格` <sup>[[`1`](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf)]</sup> 實作 `non-blocking concurrent queue`。 ## Non-blocking singly-linked list 由於 [`Michael & Scott algorithm`](https://www.researchgate.net/publication/2804621_Simple_Fast_and_Practical_Non-Blocking_and_Blocking_Concurrent_Queue_Algorithms) 是使用 `singly-linked list` 實作 `non-blocking concurrent queue`,所以本文會先實驗如何實作一個 `non-blocking singly-linked list`,重點會放在 concurrent insertion。 關於 `concurrent linked list`,2001 年 `Harris` 提出一個 `non-blocking` 的 `solution` <sup>[[`2`](https://en.wikipedia.org/wiki/Non-blocking_linked_list)]</sup>,這個 `solution` 使用了 `compare-and-swap` 來完成。 ### Harris's solution `Harris's solution` 對於 `non-blocking singly-linked list` 的 `insertion`是這樣 <sup>[[`2`](https://en.wikipedia.org/wiki/Non-blocking_linked_list)]</sup>: **Insert node n aftre node \*p** - Step 1. node *next = p->next - Step 2. n.next = next - Step 3. bool result = cas(&p->next, next, n) - Step 4. if result == false, **goto Step 1** 用 [`C11 規格`](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf) 實作 `Harris's solution` [[`harris_solution.c`](https://github.com/jeffrey-minwei/experiments-on-concurrency/blob/master/concurrent-queue/harris_solution.c)] ```c typedef struct node_t { volatile struct node_t *next; } node_t; void insert_after(node_t *p, node_t new) { volatile node_t *next; do { next = p->next; new.next = next; }while(! atomic_compare_exchange_strong(&p->next, &next, &new)); } ``` 有一行要特別挑出來實驗一下 ```c atomic_compare_exchange_strong(&p->next, &next, &new) ``` 這樣寫對嗎?我們看一下 [`C11 規格`](http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1570.pdf) `§7.17.7.4/1` > _Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired); `§7.17.7.4/3` ```shell NOTE 1 For example, the effect of atomic_compare_exchange_strong is if (memcmp(object, expected, sizeof (*object)) == 0) memcpy(object, &desired, sizeof (*object)); else memcpy(expected, object, sizeof (*object)); ``` 能不能把 `atomic_compare_exchange_strong(&p->next, &next, &new)` 改成這樣? ```c atomic_compare_exchange_strong(p->next, next, new) ``` 以下圖為例來檢視 `(p->next, next, new)` 和 `(&p->next, &next, &new)` 的差別 ```graphviz digraph G { rankdir=LR; node [shape=record]; subgraph cluster_1 { rankdir=TB; label=p; graph[style=dotted]; p[label="<next>"]; } subgraph cluster_2 { rankdir=TB; label="struct node_t"; graph[style=dotted]; node1[label="<next> next\r\n"]; } p:next:c->node1 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; null[shape=none label="NULL"]; node1:next:c -> null [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; subgraph cluster_next { label=next; graph[style=dotted]; next[label="<next>", fontcolor=blue;color=blue]; } null_1[shape=none label="NULL"]; next:c -> null_1 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6, color=blue]; } ``` - next:**一個 pointer**,跟 p->next 指向同一個 address <sub>(在這裡是指向 NULL)</sub> - p: **一個 pointer**,指向一個 struct node_t - p->next:**一個 pointer**,指向一個 struct node_t,在這裡是指向 NULL ```graphviz digraph G { rankdir=LR; node [shape=record]; subgraph cluster_1 { label=new_node_t; graph[style=dotted]; new[label="<next> next\r\n"]; null_0[shape=none, label="NULL"]; new:next:c -> null_0 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; } } ``` - new:new_node_t 是一個 struct node_t,這個 struct 有一個 pointer,指向 NULL > _Bool atomic_compare_exchange_strong(volatile A *object, C *expected, C desired); `object` 和 `expected` 都是 pointer,如果這兩個 pointer 所指向的 location 的前 n 個 bytes 一樣的話 <sub>(n == sizeof(*object))</sub>,`desired` 的值就會被 copy 到 `object` 所指向的 location。 所以,如果改成 `atomic_compare_exchange_strong(p->next, next, new)`,由於 `object` 這個 pointer <sub>(`p->next`)</sub> 指向 NULL,所以會發生 `Segmentation fault`。 用兩條 `thread` 實驗一下 `Harris’s solution` 的實作 [[`test_harris.c`](https://github.com/jeffrey-minwei/experiments-on-concurrency/blob/master/concurrent-queue/test_harris.c)] ```c void *thx_insert(void *p) { node_t n; n.next = NULL; insert_after((node_t *)p, n); } int main(int argc, char **argv) { node_t dummy; dummy.next = NULL; pthread_t thx0, thx1; pthread_create(&thx0, NULL, thx_insert, &dummy); pthread_create(&thx1, NULL, thx_insert, &dummy); pthread_join(thx0, NULL); pthread_join(thx1, NULL); assert(2 == calc_length(&dummy)); return 0; } ``` 上面這個實作純粹是在實驗 insert element in concurrency,所以每次的 insert 都 insert 到 list 的前面,並不是 insert 到 list 的最後面。 至於驗證 insert 正不正確是看最後的結果 list 的個數正不正確。 `calc_length` 是參考 [`ConcurrentLinkedQueue::size()`](https://github.com/frohoff/jdk8u-jdk/blob/master/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java#L441) 的寫法,計算長度的過程中如果發生 `element` 的 `remove` 或 `insert`,計算的結果就會不正確。 [[`list.c`](https://github.com/jeffrey-minwei/experiments-on-concurrency/blob/master/concurrent-queue/list.c)] ```c unsigned int calc_length(node_t *head) { assert(head != NULL); node_t *ptr = head; unsigned int length = 0; while(ptr->next != NULL && length < INT_MAX){ ptr = (node_t *)ptr->next; ++length; } return length; } ``` ### Insertion 上面的實作是 insert 到 list 的前面,而不是 tail,所以這裡實作一個 insertion 是 insert 到 `list` 的 `tail`。 在這個實作會增加兩個 `pointer`: `head` 和 `tail`,在下圖這個例子,`list` 有兩個 elements。是 <font color='red'>2</font> 個 elements,不是 <font color='red'>3</font> 個,因為藍底白字那個 node 是 dummy,就是 `head` 指向的 struct node_t。 ```graphviz digraph G { rankdir=LR; node [shape=record]; subgraph cluster_0 { label="tail"; tail[label="<next>"]; graph[style=dotted]; } subgraph cluster_1 { label="head"; head[label="<next>"]; graph[style=dotted]; } subgraph cluster_2 { rankdir=LR; graph[style=dotted]; dummy[label="<next> next\r\n", color=blue, fontcolor=white, style="filled"]; node0[label="<next> next\r\n"]; node1[label="<next> next\r\n"]; dummy:next:c -> node0 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; node0:next:c -> node1 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; } node1:next:c -> null [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; null[shape=none label="NULL"]; head:next:c->dummy [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; tail:next:c->node1 [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; } ``` 所以一個空白的 `list` 會是這樣 ```graphviz digraph G { rankdir=LR; node [shape=record]; subgraph cluster_1 { label="head"; head[label="<next>"]; graph[style=dotted]; } subgraph cluster_0 { label="tail"; tail[label="<next>"]; graph[style=dotted]; } subgraph cluster_2 { graph[style=dotted]; dummy[label="<next> next\r\n", color=blue, fontcolor=white, style="filled"]; } dummy:next:c -> null [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; head:next:c->dummy [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; tail:next:c->dummy [arrowhead=vee, arrowtail=dot, dir=both, tailclip=false, arrowsize=0.6]; null[shape=none label="NULL"]; } ``` 參考 [`Michael & Scott algorithm`](https://www.researchgate.net/publication/2804621_Simple_Fast_and_Practical_Non-Blocking_and_Blocking_Concurrent_Queue_Algorithms) 對 `enque` 的 `pseudo-code`,在 insert node 之後,再用 atomic instruction CAS 將 tail 指到最後一個 node。 修改過的 insert 像這樣 [[`list.c`](https://github.com/jeffrey-minwei/experiments-on-concurrency/blob/master/concurrent-queue/list.c#L23)] ```c void insert(list_t *list, node_t new) { node_t *tail; volatile node_t *next; do { tail = list->tail; next = tail->next; new.next = next; } while (! atomic_compare_exchange_strong(&tail->next, &next, &new)); atomic_compare_exchange_strong(&list->tail, &tail, &new); } ``` 跟 `Harris's solution` 不一樣的是這一行 `atomic_compare_exchange_strong(&list->tail, &tail, &new);`,這裡參考 [`Michael & Scott algorithm`](https://www.researchgate.net/publication/2804621_Simple_Fast_and_Practical_Non-Blocking_and_Blocking_Concurrent_Queue_Algorithms) `enque` 的 `pseudo-code` <sup>[[`3`](https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html#nbq)]</sup>,使用 CAS 將 `list->tail` 指向新增的 node (**下圖的 E17**)。 <small>[[`source: Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms, by Maged M. Michael and Michael L. Scott`](https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html#nbq)]</small> ![](https://i.imgur.com/Jv8b4Mw.png) ### Benchmark 參考 jserv 老師的 [`concurrent-ll`](https://github.com/jserv/concurrent-ll#tools),寫一段 code 來檢驗 performance。 ### References - [ ] [你所不知道的C語言: linked list 和非連續記憶體操作](https://hackmd.io/@sysprog/c-linked-list) <!-- ftp://ftp.cs.rochester.edu/pub/packages/sched_conscious_synch/concurrent_queues --> ## References - [ ] Maged M. Michael and Michael L. Scott. 1996. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms. In <i>Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing</i> (<i>PODC '96</i>). Association for Computing Machinery, New York, NY, USA, 267–275. DOI:https://doi.org/10.1145/248052.248106 - [ ] [github, jdk8u-jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java](https://github.com/frohoff/jdk8u-jdk/blob/master/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java) - [ ] [Java Language Specification, 17.4.5. Happens-before Order](https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4.5) ###### tags: `concurrency` `non-blocking` `blocking` `concurrent queue` `linked list` `Harris's solution` `algorithm`