# linux2023 HW3: vax-r
[Github repo](https://github.com/vax-r/linux2023-HW3)
## Challenge 1
### Reading the paper and explaining the code
First, let's look at the structures used by the work-stealing implementation.
**`task_t`**
A function pointer whose input is a pointer to `work_t` and whose return value is another pointer to `work_t`.
It serves as a trampoline, conveying the subsequent unit of work to be executed:
it returns the next work item if one is ready for execution, or NULL if the task isn't ready to proceed.
```c
typedef struct work_internal *(*task_t)(struct work_internal *);
```
**`work_t`**
```c
typedef struct work_internal {
    task_t code;
    atomic_int join_count;
    void *args[];
} work_t;
```
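Since `args[]` is a flexible array member, a work item is allocated with its payload slots sized explicitly. Below is a hypothetical sketch of how one item could be built, mirroring what `main` does later in this walkthrough (`print_task`, `done_work`, and `thread_queues` are the program's names described further down; `i` and `j` are loop indices):
```c
/* Hypothetical sketch of building one work item (see the
 * print_task/join_work discussion below for the actual fields). */
work_t *w = malloc(sizeof(work_t) + 2 * sizeof(void *));
w->code = print_task;                            /* task trampoline to run */
atomic_init(&w->join_count, 0);
w->args[0] = (void *) (intptr_t) (1000 * i + j); /* payload, stored inline here */
w->args[1] = done_work;                          /* shared continuation */
push(&thread_queues[i], w);
```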
:::warning
Note that the code uses small integer addresses as sentinel values for *empty* and *abort*:
```c
static work_t *EMPTY = (work_t *) 0x100, *ABORT = (work_t *) 0x200;
```
Instead of the common approach of using NULL or some other special value, this keeps the return type uniformly `work_t *` while still making the two states distinguishable.
:::
**`deque_t`**
The deque used for work stealing.
Note that the implementation assumes `top` and `bottom` never overflow.
```c
typedef struct {
    atomic_size_t size;
    _Atomic work_t *buffer[];
} array_t;

typedef struct {
    /* Assume that they never overflow */
    atomic_size_t top, bottom;
    _Atomic(array_t *) array;
} deque_t;
```
#### init
What is special about this function is that `a` is allocated not just `sizeof(array_t)`,
but `sizeof(array_t) + sizeof(work_t *) * size_hint`, making room for the flexible array member `buffer`.
```c
void init(deque_t *q, int size_hint)
{
    atomic_init(&q->top, 0);
    atomic_init(&q->bottom, 0);
    array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint);
    atomic_init(&a->size, size_hint);
    atomic_init(&q->array, a);
}
```
#### resize
Doubles the capacity of the deque.
Note how the existing work items are moved from the old array into the new one.
```c
void resize(deque_t *q)
{
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    size_t old_size = a->size;
    size_t new_size = old_size * 2;
    array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
    atomic_init(&new->size, new_size);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    for (size_t i = t; i < b; i++)
        new->buffer[i % new_size] = a->buffer[i % old_size];
    atomic_store_explicit(&q->array, new, memory_order_relaxed);
}
```
#### take
Given a deque, this returns a pointer to `work_t`.
We can observe here that the slot `bottom` points at holds no work:
to take an item, we first load `bottom` and then decrement it.
If the deque is empty, `b` initially equals `t`, so after the decrement `b < t`.
If the deque is non-empty, the element at index `b` is fetched.
The case where the deque holds exactly one work item needs special handling,
because a race with a concurrent `steal` can happen there.
```c
work_t *take(deque_t *q)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    work_t *x;
    if (t <= b) {
        /* Non-empty queue */
        x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
        if (t == b) {
            /* Single last element in queue */
            // AAAA
            if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                         memory_order_seq_cst,
                                                         memory_order_relaxed))
                /* Failed race */
                x = EMPTY;
            // BBBB
            atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
        }
    } else { /* Empty queue */
        x = EMPTY;
        // CCCC
        atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
    }
    return x;
}
```
#### push
This operation is more straightforward: it places `w` at the bottom of the deque.
The one questionable spot is the reload after `resize`: could the load happen before the resize completes?
Since only the owner thread ever calls `push` (and therefore `resize`), the reload simply observes the owner's own store to `q->array`, so program order makes this safe.
```c
void push(deque_t *q, work_t *w)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    if (b - t > a->size - 1) { /* Full queue */
        resize(q);
        a = atomic_load_explicit(&q->array, memory_order_relaxed);
    }
    atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);
    // DDDD
    atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
```
#### steal
Unlike `take`, this removes work from the top,
and a race with other stealers (or with the owner) can occur while doing so.
```c
work_t *steal(deque_t *q)
{
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    atomic_thread_fence(memory_order_seq_cst);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
    work_t *x = EMPTY;
    if (t < b) {
        /* Non-empty queue */
        array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
        x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
        // EEEE
        if (!atomic_compare_exchange_strong_explicit(
                &q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed))
            /* Failed race */
            return ABORT;
    }
    return x;
}
```
#### thread
Each thread receives its own id as the payload; its deque is `thread_queues[id]`.
It first tries to `take` work from its own deque and calls `do_work` whenever something is found.
Otherwise it starts stealing from the other threads.
If a steal comes back empty-handed, it distinguishes EMPTY from ABORT:
on ABORT it retries the same victim, on EMPTY it moves on to the next one.
Once work is stolen it executes it, and the whole loop repeats until no work is left in the entire system.
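A minimal sketch of that loop, assuming the program's globals (`thread_queues`, `N_THREADS`, `EMPTY`, `ABORT`); it simplifies termination, whereas the real code coordinates shutdown across threads:
```c
/* Sketch of the worker loop described above (simplified termination). */
void *thread(void *payload)
{
    int id = *(int *) payload;
    deque_t *my_queue = &thread_queues[id];

    while (1) {
        work_t *work = take(my_queue);
        if (work != EMPTY) {
            do_work(id, work);
            continue;
        }
        /* Own queue is empty: sweep the other threads' deques */
        work_t *stolen = EMPTY;
        for (int i = 0; i < N_THREADS; ++i) {
            if (i == id)
                continue;
            do {
                stolen = steal(&thread_queues[i]);
            } while (stolen == ABORT); /* ABORT: lost a race, retry victim i */
            if (stolen != EMPTY)
                break; /* EMPTY: victim has nothing, try the next one */
        }
        if (stolen == EMPTY)
            return NULL; /* simplification: a full sweep found no work */
        do_work(id, stolen);
    }
}
```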
#### do_work
Performs one item at a time via `do_one_work`,
and keeps going as long as each item returns a next one.
```c
void do_work(int id, work_t *work)
{
    while (work)
        work = do_one_work(id, work);
}
```
#### do_one_work
This is where `task_t` comes into play, in `return (*(work->code))(work)`:
`work` is passed as the input to `work->code`, and after execution it returns a `work_t *` (the continuation, if any).
```c
static work_t *do_one_work(int id, work_t *work)
{
    printf("work item %d running item %p\n", id, work);
    return (*(work->code))(work);
}
```
#### print_task, join_work
From `main` we can see that each thread starts with a deque of size hint 8, into which 10 work items are pushed, each containing:
* code: print_task
* join_count: 0
* args[0]: 1000 * i + j
* args[1]: done_work

All threads share the same `done_work`.
The threads then enter `thread`; when work is available, `do_work` first runs `print_task`,
which extracts args[1], i.e. `done_work`, and passes it into `join_work`:
```c
work_t *join_work(work_t *work)
{
    int old_join_count = atomic_fetch_sub(&work->join_count, 1);
    if (old_join_count == 1)
        return work;
    return NULL;
}
```
The initial value of `done_work->join_count` is `N_THREADS * nprints`.
Most of the time the pre-decrement counter is not 1, so `join_work` returns NULL;
the current `do_work` then finishes,
and control returns to `thread` to fetch the next work item.
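Putting the pieces together, a `print_task` consistent with the args layout above might look like this (a hypothetical sketch, not the repo's verbatim code):
```c
/* Hypothetical sketch: print the payload stored in args[0], then hand
 * done_work (args[1]) to join_work, so the continuation runs exactly once
 * after all N_THREADS * nprints items have finished. */
work_t *print_task(work_t *w)
{
    printf("payload %ld\n", (long) (intptr_t) w->args[0]);
    return join_work((work_t *) w->args[1]);
}
```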
### Improving the code
#### Improving how resize copies the old entries
Originally `resize` moves the existing work items from the old array to the new one with an element-by-element loop,
which gets slower as the array grows.
I sped this up with `memcpy`. One subtlety: item `i` must still land at index `i % new_size` in the new array while being read from `i % old_size` in the old one, so the live range `[t, b)` is copied in chunks that stop at each array's wrap-around point:
```diff
-    for (size_t i = t; i < b; i++)
-        new->buffer[i % new_size] = a->buffer[i % old_size];
+    for (size_t i = t; i < b;) {
+        size_t src = i % old_size, dst = i % new_size;
+        size_t n = b - i;
+        if (n > old_size - src)
+            n = old_size - src; /* stop at the old array's wrap-around */
+        if (n > new_size - dst)
+            n = new_size - dst; /* stop at the new array's wrap-around */
+        memcpy(new->buffer + dst, a->buffer + src, n * sizeof(work_t *));
+        i += n;
+    }
```
```c
void resize(deque_t *q)
{
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    size_t old_size = a->size;
    size_t new_size = old_size * 2;
    array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
    atomic_init(&new->size, new_size);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    /* Copy the live range [t, b) in a few large chunks instead of one
     * element at a time; each chunk ends at whichever array wraps first. */
    for (size_t i = t; i < b;) {
        size_t src = i % old_size, dst = i % new_size;
        size_t n = b - i;
        if (n > old_size - src)
            n = old_size - src;
        if (n > new_size - dst)
            n = new_size - dst;
        memcpy(new->buffer + dst, a->buffer + src, n * sizeof(work_t *));
        i += n;
    }
    atomic_store_explicit(&q->array, new, memory_order_relaxed);
}
```
#### Randomly picking the thread to steal from
Originally, `thread` selects its work-stealing victim with a loop that scans sequentially from id 0. This differs from the ideal of random victim selection, and it makes it more likely that several threads contend for the same work at once, since every thread steals in the same order.
I rewrote the victim-selection part of `thread` as follows:
```c
if (work != EMPTY) {
    do_work(id, work);
} else {
    /* Currently, there is no work present in my own queue */
    work_t *stolen = EMPTY;
    for (int i = 0; i < N_THREADS; ++i) {
        int j = rand() % N_THREADS; /* pick a victim thread at random */
        if (j == id)
            continue;
        do {
            stolen = steal(&thread_queues[j]);
        } while (stolen == ABORT); /* Try again with the same victim j */
        if (stolen == EMPTY)
            continue;
        /* Found some work to do */
        break;
    }
```
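One caveat: `rand()` uses shared state (and in glibc takes an internal lock), so hot concurrent calls can contend. A per-thread seed with `rand_r()` avoids that; below is a hypothetical tweak, where `pick_victim` and `seed` are my own names, not the repo's:
```c
#include <stdlib.h>

/* Hypothetical helper: thread-local PRNG state, so victim selection does
 * not contend on the C library's global PRNG. Assumes N_THREADS from the
 * original program; the caller keeps `seed` local to its thread. */
static int pick_victim(int id, unsigned *seed)
{
    int j;
    do {
        j = rand_r(seed) % N_THREADS;
    } while (j == id);
    return j;
}
```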
Comparing the execution times before and after the rewrite,
the results below (from 24 threads up to 100 threads) show that
randomly picking the work-stealing victim makes the program considerably faster.

*"sequence" is the version before the change; "random" is the version after*
### Rewriting the multi-threaded quicksort
### Linux Kernel CMWQ
## Challenge 2
### Rewriting the code with C11 Atomics
The functions rewritten with C11 Atomics are shown below.
**wait_until_equal_u8**
```c
static inline void wait_until_equal_u8(uint8_t *loc, uint8_t val, int mm)
{
    while (atomic_load_explicit((atomic_uchar *) loc, mm_casting(mm)) != val)
        spin_wait();
}
```
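The `mm_casting` helper is not shown above; a sketch of what such a shim could look like, mapping GCC's `__ATOMIC_*` constants onto C11 `memory_order` values (in GCC the constants match numerically, so a plain cast would also work):
```c
#include <stdatomic.h>

/* Sketch of mm_casting: translate GCC __ATOMIC_* constants into the
 * corresponding C11 memory_order values. */
static inline memory_order mm_casting(int mm)
{
    switch (mm) {
    case __ATOMIC_RELAXED: return memory_order_relaxed;
    case __ATOMIC_CONSUME: return memory_order_consume;
    case __ATOMIC_ACQUIRE: return memory_order_acquire;
    case __ATOMIC_RELEASE: return memory_order_release;
    case __ATOMIC_ACQ_REL: return memory_order_acq_rel;
    default:               return memory_order_seq_cst;
    }
}
```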
**mcslock_acquire**
```c
void mcslock_acquire(mcslock_t *lock, mcsnode_t *node)
{
    node->next = NULL;
    /* A0: Read and write lock, synchronized with A0/A1 */
    mcsnode_t *prev = atomic_exchange_explicit((_Atomic mcslock_t *) lock,
                                               node, memory_order_acq_rel);
    if (LIKELY(!prev)) /* Lock uncontended, the lock is acquired */
        return;
    /* Otherwise, the lock is owned by another thread, waiting for its turn */
    node->wait = MCS_WAIT;
    /* B0: Write next, synchronized with B1/B2 */
    atomic_store_explicit((_Atomic mcslock_t *) &prev->next, node,
                          memory_order_release);
    /* Waiting for the previous thread to signal using the assigned node
     * C0: Read wait, synchronized with C1
     */
    wait_until_equal_u8(&node->wait, MCS_PROCEED, __ATOMIC_ACQUIRE);
}
```
**mcslock_release**
```c
void mcslock_release(mcslock_t *lock, mcsnode_t *node)
{
    mcsnode_t *next;
    /* Check if any waiting thread exists */
    /* B1: Read next, synchronized with B0 */
    if ((next = atomic_load_explicit((_Atomic mcslock_t *) &node->next,
                                     memory_order_acquire)) == NULL) {
        /* No waiting threads detected, attempt lock release */
        /* Use temporary variable as it might be overwritten */
        mcsnode_t *tmp = node;
        /* A1: write lock, synchronize with A0 */
        if (atomic_compare_exchange_strong_explicit(
                (_Atomic mcslock_t *) lock, &tmp, NULL,
                memory_order_release, memory_order_relaxed)) {
            /* No waiting threads yet, lock released successfully */
            return;
        }
        /* Otherwise, at least one waiting thread exists */
        /* Wait for the first waiting thread to link its node with ours */
        /* B2: Read next, synchronized with B0 */
        while ((next = atomic_load_explicit((_Atomic mcslock_t *) &node->next,
                                            memory_order_acquire)) == NULL)
            spin_wait();
    }
    /* Signal the first waiting thread */
    /* C1: Write wait, synchronized with C0 */
    atomic_store_explicit((atomic_uchar *) &next->wait, MCS_PROCEED,
                          memory_order_release);
}
```
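For reference, typical usage looks like this (a sketch assuming `mcslock_t` is the queue-tail pointer type from the original source, NULL when free, and that each waiter brings its own node, e.g. on its stack):
```c
static mcslock_t lock; /* zero-initialized: the lock starts out free */

static void do_critical_work(void)
{
    mcsnode_t node; /* this thread's queue node; must stay live until release */
    mcslock_acquire(&lock, &node);
    /* ... critical section ... */
    mcslock_release(&lock, &node);
}
```
The per-waiter node is what lets each thread spin only on its own `node->wait`, which is the whole point of the MCS design.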
### Reading the Linux kernel locking implementation
In the Linux kernel, locks fall into three main categories:
* sleeping locks
* CPU local locks
* spinning locks
:::info
**Lock type nesting rules**
When combining several kinds of locks, the following rules apply:
* Lock types of the same category may nest arbitrarily as long as they respect the general lock ordering rules
* Sleeping lock types cannot nest inside CPU local or spinning lock types
* CPU local and spinning lock types can nest inside sleeping lock types
* Spinning lock types can nest inside all lock types
:::
These locks also share one strict rule:
**whoever acquires a lock must release it**
*(exceptions do exist, e.g. `rw_semaphore`)*
Worth special attention are `local_lock` and `spinlock_t`.
`local_lock` builds its critical sections by disabling preemption or interrupts.
On non-PREEMPT_RT kernels, `local_lock` operations map onto the corresponding preemption- or interrupt-disabling primitives.
On PREEMPT_RT kernels, `local_lock` instead maps onto a per-CPU `spinlock_t`.
The documentation describes when `local_lock` is appropriate:
> local_lock should be used in situations where disabling preemption or interrupts is the appropriate form of concurrency control to protect per-CPU data structures on a non PREEMPT_RT kernel.
That is, on a non-PREEMPT_RT kernel it is the right tool for protecting each CPU's own data structures.
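A sketch of that pattern, modeled on the examples in Documentation/locking/locktypes.rst (the struct and counter here are made up for illustration):
```c
#include <linux/local_lock.h>
#include <linux/percpu.h>

struct cpu_stats {
    local_lock_t lock;
    unsigned long hits;
};

static DEFINE_PER_CPU(struct cpu_stats, stats) = {
    .lock = INIT_LOCAL_LOCK(lock),
};

static void bump_hits(void)
{
    /* On non-PREEMPT_RT this disables preemption; on PREEMPT_RT it takes
     * the per-CPU spinlock_t backing the local_lock. */
    local_lock(&stats.lock);
    this_cpu_inc(stats.hits);
    local_unlock(&stats.lock);
}
```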
As for spinlocks, there are several kinds.
**raw_spinlock_t**
A strict spinning-lock implementation present in all kernels. It should be used only in truly core code sections,
e.g. low-level interrupt handling and places where preemption or interrupts must stay disabled.
It is also acceptable when the critical section is tiny, to avoid the overhead of an RT mutex.
**spinlock_t**
On non-PREEMPT_RT kernels, `spinlock_t` maps directly to `raw_spinlock_t`.
:::warning
It appears that lock implementations differ substantially between PREEMPT_RT and non-PREEMPT_RT kernels.
I know nothing about the PREEMPT_RT kernel yet;
this should be material for future study.
:::
**Explaining mcs_spinlock.h**
mcs_spinlock keeps a local lock node per CPU,
while the waiting itself spins just like an ordinary spinlock.
That is why the spin-waiting part can be overridden per hardware architecture:
```c
#ifndef arch_mcs_spin_lock_contended
#define arch_mcs_spin_lock_contended(l)     \
do {                                        \
    smp_cond_load_acquire(l, VAL);          \
} while (0)
#endif
```
:::warning
`VAL` never appears anywhere else in this file; what is it?
It turns out `VAL` is bound inside the `smp_cond_load_acquire()` macro itself (see `include/asm-generic/barrier.h`): within the condition expression it names the value just loaded from `l`, so this spins until `node->locked` becomes non-zero.
:::
Next, look at `mcs_spin_lock`:
```c
static inline
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *prev;

    /* Init node */
    node->locked = 0;
    node->next = NULL;

    /*
     * We rely on the full barrier with global transitivity implied by the
     * below xchg() to order the initialization stores above against any
     * observation of @node. And to provide the ACQUIRE ordering associated
     * with a LOCK primitive.
     */
    prev = xchg(lock, node);
    if (likely(prev == NULL)) {
        /*
         * Lock acquired, don't need to set node->locked to 1. Threads
         * only spin on its own node->locked value for lock acquisition.
         * However, since this thread can immediately acquire the lock
         * and does not proceed to spin on its own node->locked, this
         * value won't be used. If a debug mode is needed to
         * audit lock status, then set node->locked value here.
         */
        return;
    }
    WRITE_ONCE(prev->next, node);

    /* Wait until the lock holder passes the lock down. */
    arch_mcs_spin_lock_contended(&node->locked);
}
```
`lock` is the system-wide global lock; `node` is the lock node owned by the CPU trying to acquire it.
After initializing `node`, the `xchg` installs it and reveals whether the global lock is currently held;
if not, we can return right away.
The comment explains why `node->locked` need not be set to 1 here.
I think this illustrates the key advantage of mcs_spinlock: every waiting condition lives on a local node,
so a CPU that has acquired the lock needs no action to notify other CPUs and touches no global variable, avoiding the cache-coherence traffic that plagues ordinary spinlocks.
Next, `mcs_spin_unlock`:
```c
static inline
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *next = READ_ONCE(node->next);

    if (likely(!next)) {
        /*
         * Release the lock by setting it to NULL
         */
        if (likely(cmpxchg_release(lock, node, NULL) == node))
            return;
        /* Wait until the next pointer is set */
        while (!(next = READ_ONCE(node->next)))
            cpu_relax();
    }

    /* Pass lock to next waiter. */
    arch_mcs_spin_unlock_contended(&next->locked);
}
```
This is quite simple: it lets the CPU holding the lock release it.
First, check whether anyone is queued behind us.
Usually there is not, and writing NULL into the lock suffices.
However, between our read of `node->next` and the `cmpxchg_release`,
another CPU may concurrently `xchg` itself onto the lock tail and join the waiting queue,
which makes the `cmpxchg_release` fail.
In that case, we spin until the newcomer has linked its node behind ours, then pass the lock to it.
### Designing an experiment to show the advantages of the MCS lock over the ticket lock
TODO
## Challenge 3
The advantage of an rwlock is that it allows a large number of readers to operate concurrently,
but the drawback is that it may cause writer starvation.
In theory, rwlocks come in three flavors:
1. **Read-preferring RW lock**
    * allows maximum concurrency
    * prone to write-starvation if contention is high
2. **Write-preferring RW lock**
    * prevents write-starvation (see the sketch after this list)
    * less concurrency
3. **Unspecified RW lock**
    * doesn't provide any guarantees
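To make the write-preferring flavor concrete, here is a minimal userspace sketch of my own with pthreads (not the kernel's `rwlock_t`): readers back off as soon as any writer is waiting, trading some read concurrency for freedom from writer starvation.
```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int active_readers;  /* readers inside the critical section */
    int waiting_writers; /* writers blocked waiting for the lock */
    int writer_active;   /* 1 while a writer holds the lock */
} wp_rwlock_t;

#define WP_RWLOCK_INIT \
    { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0 }

void wp_read_lock(wp_rwlock_t *rw)
{
    pthread_mutex_lock(&rw->lock);
    /* New readers also yield to *waiting* writers: this is what prevents
     * writer starvation. */
    while (rw->writer_active || rw->waiting_writers > 0)
        pthread_cond_wait(&rw->cond, &rw->lock);
    rw->active_readers++;
    pthread_mutex_unlock(&rw->lock);
}

void wp_read_unlock(wp_rwlock_t *rw)
{
    pthread_mutex_lock(&rw->lock);
    if (--rw->active_readers == 0)
        pthread_cond_broadcast(&rw->cond);
    pthread_mutex_unlock(&rw->lock);
}

void wp_write_lock(wp_rwlock_t *rw)
{
    pthread_mutex_lock(&rw->lock);
    rw->waiting_writers++;
    while (rw->writer_active || rw->active_readers > 0)
        pthread_cond_wait(&rw->cond, &rw->lock);
    rw->waiting_writers--;
    rw->writer_active = 1;
    pthread_mutex_unlock(&rw->lock);
}

void wp_write_unlock(wp_rwlock_t *rw)
{
    pthread_mutex_lock(&rw->lock);
    rw->writer_active = 0;
    pthread_cond_broadcast(&rw->cond);
    pthread_mutex_unlock(&rw->lock);
}
```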
The RW lock in the Linux kernel is called `rwlock_t`.
On non-PREEMPT_RT kernels, `rwlock_t` is implemented as a spinning lock,
and the implementation is fair, so write-starvation cannot happen.
PREEMPT_RT kernels substitute a separate rt_mutex-based implementation,
with the following differences:
* All the changes made to `spinlock_t` also apply to `rwlock_t`
* A preempted low-priority reader may keep holding the lock, so even a high-priority writer can starve
However, readers can priority-boost a low-priority writer,
so reader-starvation cannot happen.
## Challenge 4
## mpmc
### Explaining how the code works
First, follow the main program flow.
Two `pthread_barrier_t`s are created, one for producers and one for consumers, each waiting on `N_THREADS + 1` threads:
```c=366
pthread_barrier_init(&prod_barrier, NULL, N_THREADS + 1);
pthread_barrier_init(&cons_barrier, NULL, N_THREADS + 1);
```
Allocate an array that will be used by the consumers:
```c=375
array = calloc(1, (1 + N_THREADS * COUNTS_PER_THREAD) * sizeof(bool));
```
Then the global variable `mpmc` is handed to init:
```c=376
mpmc_init_queue(&mpmc, N_THREADS, N_THREADS, threshold);
```
This initializes the queue (an `mpmc_t`); the default `threshold` is 8.
`put_index`, `pop_index`, and `init_id` are all set to 0,
and both internal barriers are initialized to size `N_THREADS`.
The queue `mpmc` is a global variable shared by all producers and consumers.
After that, the producer and consumer threads are created;
each throws its `th` into `mpmc` to register, becoming one of mpmc's handles
(consumers do the same):
```c
mpmc_t *q = &mpmc;
handle_t *th = calloc(1, sizeof(handle_t));
mpmc_queue_register(q, th, ENQUEUE);
```
Once all of mpmc's enqueue handles are in place, enqueueing begins;
each thread enqueues `COUNTS_PER_THREAD` times.
Before diving into enqueue and dequeue, we first need to understand the following function.
#### mpmc_find_cell
Take enqueue as the example: `ptr` is the handle's push node, i.e. `&th->push`,
`i` is `__atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST)`,
i.e. a ticket drawn from `mpmc->put_index`,
and `th` is the enqueue handle of the enqueuing thread.
Note that `N` here is the node size: starting from node id `th->push->id`, the loop walks forward to node id `i / N`.
Along the way, whenever a node's next pointer is NULL, the handle's spare node is assigned to tmp
and tmp is installed as the next node.
After this step, there are no NULL links between `th->push` and the node holding ticket `i`,
at which point we return `&curr->cells[i & N_BITS]`, the cell between node id `j` and `i / N` that ticket `i` maps to.
```c
/* Locate the cell for ticket i, creating any nodes needed along the way. */
static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th)
{
    node_t *curr = *ptr; /* get current node */

    /* j is the thread's local node id (put node or pop node); (i / N) is the
     * id of the node holding the needed cell. Walk to it, filling the nodes
     * between j and (i / N) through the 'next' field.
     */
    for (long j = curr->id; j < i / N; ++j) {
        node_t *next = curr->next;
        if (!next) { /* start filling */
            /* use the thread's standby node */
            node_t *tmp = th->spare;
            if (!tmp) {
                tmp = mpmc_new_node();
                th->spare = tmp;
            }
            // ZBBB
            tmp->id = j + 1; /* next node's id */
            /* If the CAS succeeds, this thread's spare node was linked in;
             * otherwise another thread has already done it.
             */
            /* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_RELEASE,
             * __ATOMIC_ACQUIRE) is an atomic compare-and-swap that ensures
             * release semantics on success and acquire semantics on failure.
             */
            if (__atomic_compare_exchange_n(&curr->next, &next, tmp, 0,
                                            __ATOMIC_RELEASE,
                                            __ATOMIC_ACQUIRE)) {
                next = tmp;
                th->spare = NULL; /* the thread now has no standby node */
            }
        }
        curr = next; /* take the next node */
    }
    *ptr = curr; /* update our node to the present node */
    /* Orders processor execution, so other threads can see '*ptr = curr' */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    /* now we have the needed cell */
    // ZDDD
    return &curr->cells[i & N_BITS];
}
```
:::danger
Why use `i & N_BITS`?
Since `N` is a power of two and `N_BITS` is `N - 1` (a mask of the form 0b0...0111),
`i & N_BITS` is simply `i % N`: it keeps only the low bits of `i`,
yielding the cell offset of ticket `i` within its node (e.g. with `N = 8`, `i = 13` gives `13 & 7 = 5`), not the original `i`.
:::
#### mpmc_enqueue
`q` is normally `mpmc`, `th` is the handle of the caller, and `v` is the value to enqueue,
typically derived from `((intptr_t) index) * COUNTS_PER_THREAD`, where `index` is that producer/consumer's index.
```c
void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v)
{
    /* claim a ticket and locate the corresponding cell */
    void *volatile *c = mpmc_find_cell(
        &th->push, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST), th);
    /* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also
     * ensures sequential consistency
     */

    /* now c is the needed cell */
    void *cv;
    /* If XCHG (ATOMIC: Exchange Register/Memory with Register) returns NULL,
     * our value has been put into the cell; just return.
     */
    if ((cv = __atomic_exchange_n(c, v, __ATOMIC_ACQ_REL)) == NULL)
        return;

    /* Otherwise the counterpart pop thread is already waiting on this cell,
     * so change the value it waits on and wake it.
     */
    // ZAAA
    *((int *) cv) = 0;
    mpmc_futex_wake(cv, 1);
}
```
#### node_t
```c
typedef struct __node {
    struct __node *volatile next __DOUBLE___CACHE_ALIGNED;
    long id __DOUBLE___CACHE_ALIGNED;
    void *cells[N] __DOUBLE___CACHE_ALIGNED;
} node_t;
```
#### handle_t
```c
typedef struct {
    node_t *spare;
    node_t *volatile push __CACHE_ALIGNED;
    node_t *volatile pop __CACHE_ALIGNED;
} handle_t;
```
#### mpmc_t
```c
typedef struct {
    node_t *init_node;
    volatile long init_id __DOUBLE___CACHE_ALIGNED;
    volatile long put_index __DOUBLE___CACHE_ALIGNED;
    volatile long pop_index __DOUBLE___CACHE_ALIGNED;
    handle_t *enqueue_handles[N_HANDLES], *dequeue_handles[N_HANDLES];
    int threshold;
    pthread_barrier_t enq_barrier, deq_barrier;
} mpmc_t;
```
#### mpmc_queue_register
Called right after each producer/consumer thread starts.
`q` is normally `mpmc`; `th` is the newly allocated handle of that producer/consumer; `flag` distinguishes registering for ENQUEUE from DEQUEUE.
Taking a producer as the example: first, `th`'s push and pop pointers are both set to `mpmc->init_node`.
Because `mpmc->enqueue_handles` is left untouched by queue initialization, it is still empty;
the loop scans `q->enqueue_handles` for a slot that is still empty and CASes `th` into it.
Once every enqueuer has registered its handle into `mpmc->enqueue_handles`, the barrier releases and the function returns.
```c
void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag)
{
    th->spare = mpmc_new_node();
    th->push = th->pop = q->init_node;

    if (flag & ENQUEUE) {
        handle_t **tail = q->enqueue_handles;
        for (int i = 0;; ++i) {
            handle_t *init = NULL;
            if (!tail[i] &&
                __atomic_compare_exchange_n(tail + i, &init, th, 0,
                                            __ATOMIC_RELAXED, __ATOMIC_RELAXED))
                break;
        }
        /* wait for the other enqueuers to register */
        pthread_barrier_wait(&q->enq_barrier);
    }
    if (flag & DEQUEUE) {
        handle_t **tail = q->dequeue_handles;
        for (int i = 0;; ++i) {
            handle_t *init = NULL;
            if (!tail[i] &&
                __atomic_compare_exchange_n(tail + i, &init, th, 0,
                                            __ATOMIC_RELAXED, __ATOMIC_RELAXED))
                break;
        }
        /* wait for the other dequeuers to register */
        pthread_barrier_wait(&q->deq_barrier);
    }
}
```
In short: determine whether the registration is for enqueue or dequeue,
then find an empty slot in the corresponding handle array and write into it.
:::warning
**__atomic_compare_exchange_n**
The prototype is
```c
bool __atomic_compare_exchange_n(type *ptr, type *expected, type desired,
                                 bool weak, int success_memorder, int failure_memorder);
```
It compares the contents of `*ptr` with `*expected`.
If they are equal, the function performs a *read-modify-write* operation:
`desired` is written into `*ptr` and true is returned.
If they differ, the function is a *read* operation: the current value of `*ptr` is written back into `*expected` and false is returned.
:::
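A tiny standalone example of these semantics (my own illustration):
```c
#include <stdio.h>

int main(void)
{
    int x = 5, expected = 5;

    /* Equal: read-modify-write. x becomes 7, returns true. */
    if (__atomic_compare_exchange_n(&x, &expected, 7, 0,
                                    __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
        printf("swapped, x = %d\n", x); /* x = 7 */

    expected = 5;
    /* Not equal: read only. x stays 7, expected becomes 7, returns false. */
    if (!__atomic_compare_exchange_n(&x, &expected, 9, 0,
                                     __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
        printf("failed, expected updated to %d\n", expected); /* 7 */
    return 0;
}
```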
### Rewriting with C11 Atomics
### Reading the Linux Kernel CMWQ