# linux2023 HW3: vax-r

[Github repo](https://github.com/vax-r/linux2023-HW3)

## Challenge 1

### Reading the paper and explaining the code

First, get to know the structures used by the work-stealing implementation.

**`task_t`**

A function pointer whose input is a pointer to `work_t` and which returns another pointer to `work_t`.
It serves as a trampoline, handing over the subsequent unit of work to be executed: it returns the next work item if it is prepared for execution, or NULL if the task isn't ready to proceed.

```c
typedef struct work_internal *(*task_t)(struct work_internal *);
```

**`work_t`**

```c
typedef struct work_internal {
    task_t code;
    atomic_int join_count;
    void *args[];
} work_t;
```

:::warning
Note that the code uses sentinel addresses to represent *empty* and *abort*:

```c
static work_t *EMPTY = (work_t *) 0x100, *ABORT = (work_t *) 0x200;
```

Unlike the common approach of using NULL or some special type, this keeps the return type uniformly `work_t *` while still letting callers tell the two conditions apart (and tell both apart from NULL).
:::

**`deque_t`**

The deque used for work stealing. Note that it is assumed the deque never overflows.

```c
typedef struct {
    atomic_size_t size;
    _Atomic work_t *buffer[];
} array_t;

typedef struct {
    /* Assume that they never overflow */
    atomic_size_t top, bottom;
    _Atomic(array_t *) array;
} deque_t;
```

#### init

What is special about this function is that the allocation for `a` is not merely `sizeof(array_t)` but `sizeof(array_t) + sizeof(work_t *) * size_hint`, since `buffer` is a flexible array member.

```c
void init(deque_t *q, int size_hint)
{
    atomic_init(&q->top, 0);
    atomic_init(&q->bottom, 0);
    array_t *a = malloc(sizeof(array_t) + sizeof(work_t *) * size_hint);
    atomic_init(&a->size, size_hint);
    atomic_init(&q->array, a);
}
```

#### resize

Doubles the capacity of the deque. Note how the old work items are moved into the new array.

```c
void resize(deque_t *q)
{
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    size_t old_size = a->size;
    size_t new_size = old_size * 2;
    array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
    atomic_init(&new->size, new_size);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    for (size_t i = t; i < b; i++)
        new->buffer[i % new_size] = a->buffer[i % old_size];

    atomic_store_explicit(&q->array, new, memory_order_relaxed);
}
```

#### take

Takes a deque as input and returns a pointer to `work_t`.
Observe that `bottom` points one past the last work item, so to take one we first load `bottom` and then subtract one.
If the deque is empty, `b` originally equaled `t`, so after the decrement it is smaller than `t`.
If the deque is non-empty, the element at position `b` is fetched.
The case where only a single work item remains needs special handling, because a race can occur right there.

```c
work_t *take(deque_t *q)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    work_t *x;
    if (t <= b) {
        /* Non-empty queue */
        x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
        if (t == b) {
            /* Single last element in queue */
            // AAAA
            if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                         memory_order_seq_cst,
                                                         memory_order_relaxed))
                /* Failed race */
                x = EMPTY;
            // BBBB
            atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
        }
    } else { /* Empty queue */
        x = EMPTY;
        // CCCC
        atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
    }
    return x;
}
```
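To make the owner-side semantics concrete, here is a minimal single-threaded sketch of my own (it assumes the deque code above, plus `push` from the next section, is compiled in the same file): the owner end behaves like a stack, and draining the deque yields the `EMPTY` sentinel rather than NULL.

```c
#include <assert.h>

int main(void)
{
    deque_t q;
    init(&q, 8);

    work_t w1 = {0}, w2 = {0}; /* dummy items; code/args unused here */
    push(&q, &w1);
    push(&q, &w2);

    assert(take(&q) == &w2);   /* the owner takes from the bottom: LIFO */
    assert(take(&q) == &w1);
    assert(take(&q) == EMPTY); /* drained: the sentinel, not NULL */
    return 0;
}
```

A stealer running `steal` on the same deque would instead remove from the top, i.e. the oldest item.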
#### push

This operation is more ordinary: it stores `w` at the bottom of the deque.
The questionable part is the load after `resize`: is it possible for the load to happen before the resize has completed?

```c
void push(deque_t *q, work_t *w)
{
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    if (b - t > a->size - 1) { /* Full queue */
        resize(q);
        a = atomic_load_explicit(&q->array, memory_order_relaxed);
    }
    atomic_store_explicit(&a->buffer[b % a->size], w, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);
    // DDDD
    atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
```

#### steal

Unlike `take`, it removes work from the top, and a race can occur while doing so.

```c
work_t *steal(deque_t *q)
{
    size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
    atomic_thread_fence(memory_order_seq_cst);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
    work_t *x = EMPTY;
    if (t < b) {
        /* Non-empty queue */
        array_t *a = atomic_load_explicit(&q->array, memory_order_consume);
        x = atomic_load_explicit(&a->buffer[t % a->size], memory_order_relaxed);
        // EEEE
        if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                     memory_order_seq_cst,
                                                     memory_order_relaxed))
            /* Failed race */
            return ABORT;
    }
    return x;
}
```

#### thread

Each thread receives its own id as the payload, and its deque is `thread_queues[id]`.
It first tries to `take` work from its own deque; if there is something, it calls `do_work`, otherwise it starts stealing from other threads.
When a steal comes back empty-handed, it distinguishes EMPTY from ABORT: on ABORT it retries the same victim, on EMPTY it moves on to the next one.
Once it steals something it starts working, and the whole process repeats until there is no work left in the entire system.

#### do_work

Does one thing at a time via `do_one_work`; as long as there is a next work item, it keeps going.

```c
void do_work(int id, work_t *work)
{
    while (work)
        work = do_one_work(id, work);
}
```

#### do_one_work

This is where `task_t` comes into play, namely in `return (*(work->code)) (work)`: the work item is fed in as the input of `work->code`, and after execution a `work_t` is returned.

```c
static work_t *do_one_work(int id, work_t *work)
{
    printf("work item %d running item %p\n", id, work);
    return (*(work->code)) (work);
}
```

#### print_task, join_work

From `main` we can see that each thread starts with a deque of size 8 filled with 10 work items, each of which looks like this:

* code: print_task
* join_count: 0
* args[0]: 1000 * i + j
* args[1]: done_work

All threads share the same `done_work`.
Execution then enters `thread`; when there is work to do, `do_work` first runs `print_task`, which then takes out `args[1]`, i.e. `done_work`, and feeds it into `join_work`.

```c
work_t *join_work(work_t *work)
{
    int old_join_count = atomic_fetch_sub(&work->join_count, 1);
    if (old_join_count == 1)
        return work;
    return NULL;
}
```

The initial value of `done_work->join_count` is `N_THREADS * nprints`.
Most of the time the value returned by `atomic_fetch_sub` (the value before the decrement) is not 1, so `join_work` returns NULL; the current `do_work` then finishes and control returns to `thread` to fetch the next work item.
Only the very last decrement observes 1 and gets `done_work` back to run.
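To see how `join_count` implements a join, here is a hypothetical sketch of my own (the names `final_task`, `make_join` and `n_children` are illustrative, not from the repo): each of `n_children` work items ends by returning `join_work(done)`, and only the child whose decrement observes the old value 1 gets `done` back, so exactly one thread runs the continuation.

```c
/* Hypothetical fork/join sketch built on join_work(); assumes the
 * work-stealing code above is in scope. */
#include <stdlib.h>

work_t *final_task(work_t *w)
{
    printf("all children joined\n");
    return NULL; /* nothing follows: the trampoline in do_work stops */
}

work_t *make_join(int n_children)
{
    work_t *done = malloc(sizeof(work_t));
    done->code = final_task;
    atomic_init(&done->join_count, n_children);
    return done;
}

/* Each child's task ends with `return join_work(done);` -- the child that
 * brings join_count from 1 to 0 receives `done` and executes it next. */
```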
### Improving the code

#### Improving how resize copies the old items

Originally, `resize` moved the existing work items from the old array to the new one element by element in a loop; as the array grows, this gets noticeably slow.
I improved the function with `memcpy`. One subtlety: as in the original loop, the source index is taken modulo `old_size` while the destination index is taken modulo `new_size`, and either side may wrap around, so the copy proceeds in contiguous chunks (at most three):

```diff
-    for (size_t i = t; i < b; i++)
-        new->buffer[i % new_size] = a->buffer[i % old_size];
+    for (size_t i = t; i < b;) {
+        size_t src = i % old_size, dst = i % new_size;
+        size_t chunk = b - i;
+        /* stop a chunk where either the source or the destination wraps */
+        if (chunk > old_size - src)
+            chunk = old_size - src;
+        if (chunk > new_size - dst)
+            chunk = new_size - dst;
+        memcpy((void *) &new->buffer[dst], (void *) &a->buffer[src],
+               chunk * sizeof(work_t *));
+        i += chunk;
+    }
```

```c
void resize(deque_t *q)
{
    array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
    size_t old_size = a->size;
    size_t new_size = old_size * 2;
    array_t *new = malloc(sizeof(array_t) + sizeof(work_t *) * new_size);
    atomic_init(&new->size, new_size);
    size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
    size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
    for (size_t i = t; i < b;) {
        size_t src = i % old_size, dst = i % new_size;
        size_t chunk = b - i;
        /* stop a chunk where either the source or the destination wraps */
        if (chunk > old_size - src)
            chunk = old_size - src;
        if (chunk > new_size - dst)
            chunk = new_size - dst;
        memcpy((void *) &new->buffer[dst], (void *) &a->buffer[src],
               chunk * sizeof(work_t *));
        i += chunk;
    }
    atomic_store_explicit(&q->array, new, memory_order_relaxed);
}
```

#### Randomly picking the thread to steal from

Originally, `thread` picked its work-stealing victim by scanning sequentially from id 0. This differs from the ideal of picking at random and makes it more likely that several threads try to grab the same work item at the same time, since everyone steals in the same order.

Rewriting the victim-selection part of `thread` (excerpt; note that `rand()` is not guaranteed to be thread-safe, so a per-thread seed with `rand_r` would be a further refinement):

```c
if (work != EMPTY) {
    do_work(id, work);
} else {
    /* Currently, there is no work present in my own queue */
    work_t *stolen = EMPTY;
    for (int i = 0; i < N_THREADS; ++i) {
        int j = rand() % N_THREADS; /* pick a thread randomly */
        if (j == id)
            continue;
        do {
            stolen = steal(&thread_queues[j]);
        } while (stolen == ABORT); /* Try again at the same j */
        if (stolen == EMPTY)
            continue;
        /* Found some work to do */
        break;
    }
    /* ... handle `stolen` as before ... */
}
```

Comparing the execution time of the original and the rewritten version gives the results below: from 24 threads up to 100 threads, randomly picking the work-stealing victim makes the program run considerably faster.

![](https://hackmd.io/_uploads/BJbtJ-z6n.png)
*sequence is the version before the change; random is the version after*

### Rewriting the multi-threaded quicksort

### Linux Kernel CMWQ

## Challenge 2

### Rewriting the code with C11 atomics

The functions rewritten with C11 atomics are as follows.

**wait_until_equal_u8**

```c
static inline void wait_until_equal_u8(uint8_t *loc, uint8_t val, int mm)
{
    while (atomic_load_explicit((atomic_uchar *) loc, mm_casting(mm)) != val)
        spin_wait();
}
```

**mcslock_acquire**

```c
void mcslock_acquire(mcslock_t *lock, mcsnode_t *node)
{
    node->next = NULL;
    /* A0: Read and write lock, synchronized with A0/A1 */
    mcsnode_t *prev = atomic_exchange_explicit((_Atomic mcslock_t *) lock,
                                               node, memory_order_acq_rel);
    if (LIKELY(!prev)) /* Lock uncontended, the lock is acquired */
        return;
    /* Otherwise, the lock is owned by another thread, waiting for its turn */
    node->wait = MCS_WAIT;
    /* B0: Write next, synchronized with B1/B2 */
    atomic_store_explicit((_Atomic mcslock_t *) &prev->next, node,
                          memory_order_release);
    /* Waiting for the previous thread to signal using the assigned node
     * C0: Read wait, synchronized with C1 */
    wait_until_equal_u8(&node->wait, MCS_PROCEED, __ATOMIC_ACQUIRE);
}
```

**mcslock_release**

```c
void mcslock_release(mcslock_t *lock, mcsnode_t *node)
{
    mcsnode_t *next;
    /* Check if any waiting thread exists */
    /* B1: Read next, synchronized with B0 */
    if ((next = atomic_load_explicit((_Atomic mcslock_t *) &node->next,
                                     memory_order_acquire)) == NULL) {
        /* No waiting threads detected, attempt lock release */
        /* Use temporary variable as it might be overwritten */
        mcsnode_t *tmp = node;
        /* A1: write lock, synchronize with A0 */
        if (atomic_compare_exchange_strong_explicit((_Atomic mcslock_t *) lock,
                                                    &tmp, NULL,
                                                    memory_order_release,
                                                    memory_order_relaxed)) {
            /* No waiting threads yet, lock released successfully */
            return;
        }
        /* Otherwise, at least one waiting thread exists */
        /* Wait for the first waiting thread to link its node with ours */
        /* B2: Read next, synchronized with B0 */
        while ((next = atomic_load_explicit((_Atomic mcslock_t *) &node->next,
                                            memory_order_acquire)) == NULL)
            spin_wait();
    }
    /* Signal the first waiting thread */
    /* C1: Write wait, synchronized with C0 */
    atomic_store_explicit((atomic_uchar *) &next->wait, MCS_PROCEED,
                          memory_order_release);
}
```
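A quick usage sketch of my own for the rewritten lock (assuming, as the casts above suggest, that `mcslock_t` is a pointer to the queue's tail `mcsnode_t`, so a zero-initialized static variable is an unlocked lock): every thread supplies its own queue node for each acquisition.

```c
#include <pthread.h>
#include <stdio.h>

static mcslock_t lock; /* zero-initialized: NULL tail == unlocked */
static long counter = 0;

static void *worker(void *arg)
{
    mcsnode_t node; /* the queue node lives on this thread's stack */
    for (int i = 0; i < 100000; i++) {
        mcslock_acquire(&lock, &node);
        counter++; /* critical section */
        mcslock_release(&lock, &node);
    }
    return NULL;
}

int main(void)
{
    pthread_t t[4];
    for (int i = 0; i < 4; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 4; i++)
        pthread_join(t[i], NULL);
    printf("counter = %ld\n", counter); /* expect 400000 */
    return 0;
}
```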
### Reading the Linux kernel's locking implementations

In the Linux kernel, locks fall into three main categories:

* sleeping locks
* CPU local locks
* spinning locks

:::info
**Lock type nesting rules**

Using several different kinds of lock together must obey the following rules:

* Locks of the same kind can nest arbitrarily, as long as they respect the general lock ordering rules.
* Sleeping lock types cannot be nested inside CPU local locks or spinning locks.
* CPU local locks and spinning locks can be nested inside sleeping locks.
* Spinning locks can be nested inside any type of lock.
:::

All of these locks also share one strict rule: **the owner that acquires a lock must release it itself** (there are exceptions, though, such as `rw_semaphore`).

`local_lock` and `spinlock_t` deserve special attention.

`local_lock` builds its critical section by disabling preemption or interrupts.
On non-PREEMPT_RT kernels, every `local_lock` operation can be mapped to the corresponding preemption- or interrupt-disabling primitive.
On a PREEMPT_RT kernel, `local_lock` instead maps to a per-CPU `spinlock_t`.

The documentation states when `local_lock` is appropriate:

> local_lock should be used in situations where disabling preemption or interrupts is the appropriate form of concurrency control to protect per-CPU data structures on a non PREEMPT_RT kernel.

That is, it suits non-PREEMPT_RT kernels, for protecting each CPU's own data structures.

As for spinlocks, there are several kinds:

**raw_spinlock_t**

The strict implementation present in all kernels. It is best used only in genuinely critical code sections, such as low-level interrupt handling or places that need preemption or interrupts disabled. It can also be used when the critical section is very small, to avoid the overhead of an RT mutex.

**spinlock_t**

On a non-PREEMPT_RT kernel, `spinlock_t` is simply `raw_spinlock_t`.

:::warning
Lock implementations apparently differ a lot between PREEMPT_RT and non-PREEMPT_RT kernels.
I know nothing about PREEMPT_RT kernels yet; this should be material for future study.
:::

**Explaining mcs_spinlock.h**

mcs_spinlock keeps a local lock per CPU, while the waiting itself behaves like a spinlock; that is why the spin-waiting part can be overridden per hardware architecture:

```c
#ifndef arch_mcs_spin_lock_contended
#define arch_mcs_spin_lock_contended(l)					\
do {									\
	smp_cond_load_acquire(l, VAL);					\
} while (0)
#endif
```

:::warning
`VAL` never appears anywhere else in this file — what is it?
It turns out to be declared inside the `smp_cond_load_acquire()` / `smp_cond_load_relaxed()` macros (include/asm-generic/barrier.h): the macro loads `*ptr` in a loop, binds the loaded value to `VAL`, and evaluates the given condition expression, so passing plain `VAL` as the condition means "spin until the loaded value is non-zero".
:::

Next, `mcs_spin_lock`:

```c
static inline
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *prev;

    /* Init node */
    node->locked = 0;
    node->next = NULL;

    /*
     * We rely on the full barrier with global transitivity implied by the
     * below xchg() to order the initialization stores above against any
     * observation of @node. And to provide the ACQUIRE ordering associated
     * with a LOCK primitive.
     */
    prev = xchg(lock, node);
    if (likely(prev == NULL)) {
        /*
         * Lock acquired, don't need to set node->locked to 1. Threads
         * only spin on its own node->locked value for lock acquisition.
         * However, since this thread can immediately acquire the lock
         * and does not proceed to spin on its own node->locked, this
         * value won't be used. If a debug mode is needed to
         * audit lock status, then set node->locked value here.
         */
        return;
    }
    WRITE_ONCE(prev->next, node);

    /* Wait until the lock holder passes the lock down. */
    arch_mcs_spin_lock_contended(&node->locked);
}
```

`lock` is the system-wide global lock, while `node` is the private node of the CPU trying to acquire it.
After initializing `node`, `xchg` atomically swaps `node` into the tail and returns the previous tail, which tells us whether the global lock was already held; if not, we can return immediately.
The comment explains why there is no need to set `node->locked = 1` here.
I think this showcases the strength of mcs_spinlock: the waiting condition lives entirely in local state, so a CPU that knows it already holds the lock does not need to notify any other CPU or touch any global variable, avoiding the cache-coherence traffic that spinning on a shared variable would cause.

Next, `mcs_spin_unlock`:

```c
static inline
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
    struct mcs_spinlock *next = READ_ONCE(node->next);

    if (likely(!next)) {
        /*
         * Release the lock by setting it to NULL
         */
        if (likely(cmpxchg_release(lock, node, NULL) == node))
            return;
        /* Wait until the next pointer is set */
        while (!(next = READ_ONCE(node->next)))
            cpu_relax();
    }

    /* Pass lock to next waiter. */
    arch_mcs_spin_unlock_contended(&next->locked);
}
```

This is quite simple: it lets the CPU holding the lock release it.
First it checks whether anyone is queued behind this CPU; usually there is nobody, and writing NULL into the lock is all it takes.
The `cmpxchg_release` can still fail, though: in the window between reading `node->next` and the cmpxchg, another CPU may have swapped itself onto the tail with `xchg` in `mcs_spin_lock` without having linked itself via `prev->next` yet.
In that case the releaser has to find out who slipped into the queue, spinning until `node->next` is set, and then hand the lock to that waiter.

### Designing an experiment to show the advantage of MCS locks over ticket locks

TODO
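As a possible baseline for that experiment, here is a minimal C11 ticket lock sketch of my own (not from any of the repos above). The contrast it is meant to expose: every ticket-lock waiter spins on the one shared `serving` counter, so each release invalidates a cache line held by all waiters, whereas each MCS waiter spins only on its own node.

```c
#include <stdatomic.h>

typedef struct {
    atomic_uint next;    /* next ticket to hand out */
    atomic_uint serving; /* ticket currently allowed in */
} ticketlock_t;

static void ticket_acquire(ticketlock_t *l)
{
    unsigned me = atomic_fetch_add_explicit(&l->next, 1, memory_order_relaxed);
    /* every waiter spins on the same variable: cache-line ping-pong */
    while (atomic_load_explicit(&l->serving, memory_order_acquire) != me)
        ;
}

static void ticket_release(ticketlock_t *l)
{
    /* only the holder writes serving, so a plain increment is safe */
    unsigned s = atomic_load_explicit(&l->serving, memory_order_relaxed);
    atomic_store_explicit(&l->serving, s + 1, memory_order_release);
}
```

Timing the same counter-increment loop under this lock and under the MCS lock while varying the thread count should make the cache-traffic difference visible.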
## Challenge 3

The benefit of an rwlock is that it lets a large number of readers operate concurrently; the drawback is that it can cause writer starvation.
In theory, rwlocks come in three flavors:

1. **Read-preferring RW lock**
    * allows maximum concurrency
    * prone to write-starvation if contention is high
2. **Write-preferring RW lock**
    * prevents write-starvation
    * less concurrency
3. **Unspecified RW lock**
    * provides no guarantees either way

In the Linux kernel, the RW lock is called `rwlock_t`.
On non-PREEMPT_RT kernels, `rwlock_t` is implemented with spinlocks and is strictly fair, so write starvation does not occur.
PREEMPT_RT kernels use a separate rt_mutex-based implementation instead, with the following changes:

* Every change that applies to `spinlock_t` also applies to `rwlock_t`.
* A preempted low-priority reader may keep holding the lock, so even a high-priority writer can starve. Readers, on the other hand, can priority-boost a low-priority writer, so reader starvation does not occur.

## Challenge 4

## mpmc

### Explaining how the code works

First, understand the main program flow.
Two `pthread_barrier_t` are created, one as the producers' barrier and one as the consumers'; both wait for `N_THREADS + 1` threads:

```c=366
    pthread_barrier_init(&prod_barrier, NULL, N_THREADS + 1);
    pthread_barrier_init(&cons_barrier, NULL, N_THREADS + 1);
```

An array is created that the consumers will use later:

```c=375
    array = calloc(1, (1 + N_THREADS * COUNTS_PER_THREAD) * sizeof(bool));
```

Then the global variable `mpmc` is initialized:

```c=376
    mpmc_init_queue(&mpmc, N_THREADS, N_THREADS, threshold);
```

This initializes a queue of type `mpmc_t`, where the default value of `threshold` is 8; `put_index`, `pop_index` and `init_id` are all set to 0, and both internal barriers are sized to `N_THREADS`.
The queue `mpmc` is a global variable used by all producers and consumers.
After that, the producer and consumer threads are created; each producer throws its `th` into `mpmc` to register it as one of mpmc's handles, and the consumers do the same:

```c
    mpmc_t *q = &mpmc;
    handle_t *th = calloc(1, sizeof(handle_t));
    mpmc_queue_register(q, th, ENQUEUE);
```

Once all of mpmc's enqueue handles are in place, enqueueing begins; each thread enqueues `COUNTS_PER_THREAD` times.
Before looking at enqueue or dequeue, the following function must be understood first.

#### mpmc_find_cell

Take the enqueue case as an example: `ptr` is the handle's push node, i.e. `th->push`; `i` is `__atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST)`, i.e. the ticket fetched from `mpmc->put_index`; and `th` is the enqueue handle of the thread doing the enqueue.
Note that `N` here is the node size: the walk starts from node id `th->push->id` and goes up to node id `mpmc->put_index / N`.
Along the way, whenever a node's next node is NULL, the handle's spare node is assigned to `tmp`, and `tmp` is installed in the next-node slot.
After this step, no node between `th->push` and node `put_index / N` is NULL, and the function returns `&curr->cells[i & N_BITS]`.

```c
/* locate the offset on the nodes and nodes needed. */
static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th)
{
    node_t *curr = *ptr; /* get current node */

    /* j is thread's local node id (put node or pop node), (i / N) is the cell
     * needed node id. and we should take it, By filling the nodes between the j
     * and (i / N) through 'next' field
     */
    for (long j = curr->id; j < i / N; ++j) {
        node_t *next = curr->next;

        if (!next) { /* start filling */
            /* use thread's standby node */
            node_t *tmp = th->spare;

            if (!tmp) {
                tmp = mpmc_new_node();
                th->spare = tmp;
            }

            // ZBBB
            tmp->id = j + 1; /* next node's id */

            /* if true, then use this thread's node, else then thread has have
             * done this.
             */
            /* __atomic_compare_exchange_n(ptr, cmp, val, 0, __ATOMIC_RELEASE,
             * __ATOMIC_ACQUIRE) is an atomic compare-and-swap that ensures
             * release semantic when succeed or acquire semantic when failed.
             */
            if (__atomic_compare_exchange_n(&curr->next, &next, tmp, 0,
                                            __ATOMIC_RELEASE,
                                            __ATOMIC_ACQUIRE)) {
                next = tmp;
                th->spare = NULL; /* now thread there is no standby node */
            }
        }

        curr = next; /* take the next node */
    }

    *ptr = curr; /* update our node to the present node */

    /* Orders processor execution, so other thread can see the '*ptr = curr' */
    __atomic_thread_fence(__ATOMIC_SEQ_CST);

    /* now we get the needed cell */
    // ZDDD
    return &curr->cells[i & N_BITS];
}
```

:::danger
Why use `i & N_BITS`?
`N_BITS` is `N - 1`, so it is a mask of the form `0b0...01111`.
Since `N` is a power of two, ANDing `i` with `N_BITS` keeps only the low bits, i.e. it computes `i % N`: ticket `i` lives in node `i / N`, at cell offset `i % N` within that node.
:::
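A small self-check of my own (assuming, as the masking requires, that `N` is a power of two; the value 1024 below is a stand-in): masking with `N_BITS` and taking the remainder modulo `N` agree.

```c
#include <assert.h>

#define N      1024     /* stand-in value; must be a power of two */
#define N_BITS (N - 1)

int main(void)
{
    for (long i = 0; i < 10 * N; i++)
        assert((i & N_BITS) == (i % N)); /* low bits == remainder */
    return 0;
}
```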
#### mpmc_enqueue

`q` is usually just `mpmc`; `th` is the handle of the caller of `mpmc_enqueue`; and `v` is the value being enqueued, usually `((intptr_t) index) * COUNTS_PER_THREAD`, where `index` is the index of that producer or consumer.

```c
void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v)
{
    /* return the needed index */
    void *volatile *c = mpmc_find_cell(
        &th->push, __atomic_fetch_add(&q->put_index, 1, __ATOMIC_SEQ_CST), th);
    /* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also
     * ensures sequential consistency
     */

    /* now c is the needed cell */
    void *cv;

    /* if XCHG (ATOMIC: Exchange Register/Memory with Register) return NULL,
     * so our value has put into the cell, just return.
     */
    if ((cv = __atomic_exchange_n(c, v, __ATOMIC_ACQ_REL)) == NULL)
        return;

    /* else the counterpart pop thread has wait this cell, so we just change the
     * waiting value and wake it
     */
    // ZAAA
    *((int *) cv) = 0;
    mpmc_futex_wake(cv, 1);
}
```

#### node_t

```c
typedef struct __node {
    struct __node *volatile next __DOUBLE___CACHE_ALIGNED;
    long id __DOUBLE___CACHE_ALIGNED;
    void *cells[N] __DOUBLE___CACHE_ALIGNED;
} node_t;
```

#### handle_t

```c
typedef struct {
    node_t *spare;
    node_t *volatile push __CACHE_ALIGNED;
    node_t *volatile pop __CACHE_ALIGNED;
} handle_t;
```

#### mpmc_t

```c
typedef struct {
    node_t *init_node;
    volatile long init_id __DOUBLE___CACHE_ALIGNED;

    volatile long put_index __DOUBLE___CACHE_ALIGNED;
    volatile long pop_index __DOUBLE___CACHE_ALIGNED;

    handle_t *enqueue_handles[N_HANDLES], *dequeue_handles[N_HANDLES];

    int threshold;
    pthread_barrier_t enq_barrier, deq_barrier;
} mpmc_t;
```

#### mpmc_queue_register

This is used right after a producer or consumer is created.
Usually `q` is `mpmc`, `th` is the newly created handle of that producer or consumer, and `flag` distinguishes whether it registers for ENQUEUE or DEQUEUE.
Take a producer as an example: it first sets both `th->push` and `th->pop` to `mpmc->init_node`.
Since `mpmc_init_queue` never touched `mpmc->enqueue_handles`, that array is still empty; the loop simply searches `q->enqueue_handles` for a slot that is still empty and installs `th` there.
Once every enqueuer has put its handle into `mpmc->enqueue_handles`, the barrier releases and the function returns.

```c
void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag)
{
    th->spare = mpmc_new_node();
    th->push = th->pop = q->init_node;

    if (flag & ENQUEUE) {
        handle_t **tail = q->enqueue_handles;
        for (int i = 0;; ++i) {
            handle_t *init = NULL;
            if (!tail[i] && __atomic_compare_exchange_n(tail + i, &init, th, 0,
                                                        __ATOMIC_RELAXED,
                                                        __ATOMIC_RELAXED))
                break;
        }
        /* wait for the other enqueuers to register */
        pthread_barrier_wait(&q->enq_barrier);
    }
    if (flag & DEQUEUE) {
        handle_t **tail = q->dequeue_handles;
        for (int i = 0;; ++i) {
            handle_t *init = NULL;
            if (!tail[i] && __atomic_compare_exchange_n(tail + i, &init, th, 0,
                                                        __ATOMIC_RELAXED,
                                                        __ATOMIC_RELAXED))
                break;
        }
        /* wait for the other dequeuers to register */
        pthread_barrier_wait(&q->deq_barrier);
    }
}
```

It first checks whether this registration is for enqueue or dequeue, then finds an empty slot in the corresponding handle array and writes into it.

:::warning
**__atomic_compare_exchange_n**

The prototype is:

```c
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired,
                                  bool weak, int success_memorder,
                                  int failure_memorder)
```

It compares the contents of `*ptr` with `*expected`.
If they are equal, this function is a *read-modify-write* operation: it writes `desired` into `*ptr` and returns true.
If they differ, it is a *read* operation: it writes the current contents of `*ptr` into `*expected` and returns false.
:::
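A tiny standalone demo of my own showing both outcomes of this builtin, including the often-surprising failure path that rewrites `expected`:

```c
#include <stdio.h>

int main(void)
{
    int x = 5, expected = 3;

    /* Fails: x != expected, so expected is updated to x's current value */
    if (!__atomic_compare_exchange_n(&x, &expected, 7, 0, __ATOMIC_RELAXED,
                                     __ATOMIC_RELAXED))
        printf("failed: expected is now %d\n", expected); /* prints 5 */

    /* Succeeds: expected (now 5) matches x, so 7 is written into x */
    if (__atomic_compare_exchange_n(&x, &expected, 7, 0, __ATOMIC_RELAXED,
                                    __ATOMIC_RELAXED))
        printf("succeeded: x is now %d\n", x); /* prints 7 */
    return 0;
}
```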
### Rewriting with C11 atomics

### Reading the Linux kernel CMWQ