# Concurrency Study -- MPMC
參考
1. [MPMC](https://github.com/sysprog21/concurrent-programs/blob/master/mpmc/mpmc.c):A multiple-producer/multiple-consumer (MPMC) queue using Linux futex.
2. [Basics of Futexes](https://eli.thegreenplace.net/2018/basics-of-futexes/)
3. [futex-basics at github](https://github.com/eliben/code-for-blog/tree/master/2018/futex-basics)
4. [man of futex](https://man7.org/linux/man-pages/man2/futex.2.html)
5. [Futex Wiki](https://en.wikipedia.org/wiki/Futex)
6. [021q1 第 9 週測驗題](/@sysprog/linux2021-quiz9)
## Futex -- fast userspace mutex
==Futex讓threads間使用同步,而最少的讓kernel介入。==
>The main idea is to enable a more efficient way for userspace code to synchronize multiple threads, with minimal kernel involvement.
因為執行時間 syscall > atomic operation。所以在可能的時候使用atomic操作,不得已的情況有兩種選擇1. busy wait,2. syscall。使用busy wait為userspace的操作,如果等待時間過長,則會浪費一個core的執行效能。使用syscall例如sleep可以把cpu的資源讓出來,但是必須需要kernel的幫忙。
>Futex 的操作幾乎全部在使用者空間完成;只有當操作結果不一致從而需要仲裁時,才需要進入作業系統核心空間執行。這種機制允許使用 futex 的鎖定原語有非常高的執行效率:由於絕大多數的操作並不需要在多個行程之間進行仲裁,所以絕大多數操作都可以在應用程式空間執行,而不需要使用(相對高代價的)核心系統呼叫。
futex在kernel中是一種由kernel來管理的queue。
可以請求某個process/thread suspend直到某個條件成立。
或是可以signal某個條件,來喚醒process/thread。
>futex is a queue the kernel manages for userspace convenience.
>It lets userspace code ask the kernel to suspend until a certain condition is satisfied, and lets other userspace code signal that condition and wake up waiting processes.

futex實現在kernel/futex.c。
kernel保存一個hash table,key為address,可以快速找到wait queue裡面的process。
### prototype
```c=
int futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, /* or: uint32_t val2 */
int *uaddr2, int val3);
```
其中可以忽略,uaddr2和val3的使用。
==Glibc沒提供wrapper給futex,所以必須自己使用syscall。==
```c=
static inline int mpmc_futex_wake(void *addr, int val)
{
return syscall(SYS_futex, addr, FUTEX_WAKE, val, NULL, NULL, 0);
}
static inline int mpmc_futex_wait(void *addr, int val)
{
return syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
}
```
在FUTEX_WAIT的情況:
:::info
*addr == val,則進入sleep。
*addr != val,則回傳-1,並且設定errno = EAGAIN(11)。
:::
> This operation tests that the value at the futex word pointed to by the address uaddr still contains the expected value val, and if so, then sleeps waiting for a FUTEX_WAKE operation on the futex word.
在FUTEX_WAKE的情況,==val代表至少喚醒的waiter數量。==
:::info
回傳值表示喚醒的數量。
:::
>Most commonly, val is specified as either
1 (wake up a single waiter) or INT_MAX (wake up all
waiters).
>No guarantee is provided about which waiters
are awoken
所以下面function的用意是,==busy wait直到喚醒一個以上的waiter為止。==
```cpp=
void wake_futex_blocking(int* futex_addr) {
while (1) {
int futex_rc = futex(futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0);
if (futex_rc == -1) { // 錯誤,離開程式。
perror("futex wake");
exit(1);
} else if (futex_rc > 0) { // 有喚醒waiter,則離開。
return;
} // 否則持續喚醒waiter
}
}
```
### return value
-1 : 表示失敗,並且會設定errno。
==FUTEX_WAIT的情況,0代表被喚醒。==
==FUTEX_WAKE的情控,返回被叫醒的process數量。==
:::danger
timeout的返回值為-1
:::
### the wakeup is spurious?
因為*addr != val就會馬上返回,返回值為-1,並且errno = EAGAIN。所以不是真的wakeup,而是程式沒進入sleep。
>Futex semantics are tricky [4]! FUTEX_WAIT will immediately return if the value at the futex address is not equal to val. In our case this can happen if the child issued a wait before the parent wrote 0xA, for example. The futex call will return an error with EAGAIN in this case.
避免這樣的情況,必須使用while(1)並且返回後,檢查條件是否成立。
所以下面的function的用意是,
case 1 : *futex_addr == val,則sleep直到被wakeup後條件依然成立。
case 2 : *futex_addr != val,則busy wait直到case 1成立。
```c=
void wait_on_futex_value(int* futex_addr, int val) {
while (1) {
int futex_rc = futex(futex_addr, FUTEX_WAIT, val, NULL, NULL, 0);
if (futex_rc == -1) { // timeout或是val不相等,或是其他錯誤狀況
if (errno != EAGAIN) { // 不是val不同的退出,離開程式,否則繼續等待。
perror("futex");
exit(1);
}
} else if (futex_rc == 0) { // 從sleep醒來
if (*futex_addr == val) { // 再檢查一次
// This is a real wakeup.
return;
}
} else {
abort();
}
}
}
```
### Timed blocking with FUTEX_WAIT
futex可以設定timeout時間,如果timeout時間一到就會醒來並且return -1,此時進入while loop繼續等待。如果是從sleep醒來(futex_rc == 0),則檢查等待值是否到來。
```c=
printf("child waiting for A\n");
struct timespec timeout = {.tv_sec = 0, .tv_nsec = 500000000};
while (1) {
unsigned long long t1 = time_ns();
int futex_rc = futex(shared_data, FUTEX_WAIT, 0xA, &timeout, NULL, 0);
printf("child woken up rc=%d errno=%s, elapsed=%llu\n", futex_rc,
futex_rc ? strerror(errno) : "", time_ns() - t1);
if (futex_rc == 0 && *shared_data == 0xA) {
break;
}
}
```
### Using a futex to implement a simple mutex
:::info
atomic_fetch_sub返回==修改前的值==。
cmpxchg返回==atom修改前的值。==
:::
```cpp=
int cmpxchg(std::atomic<int>* atom, int expected, int desired) {
int* ep = &expected;
std::atomic_compare_exchange_strong(atom, ep, desired);
return *ep;
}
```
其中使用到了atomic_compare_exchange_strong,定義如下:
```cpp=
_Bool atomic_compare_exchange_strong( volatile A* obj,
C* expected, C desired );
```
> Atomically compares the contents of memory pointed to by obj with the contents of memory pointed to by expected, and if those are bitwise equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual contents of memory pointed to by obj into *expected (performs load operation).
比較obj和expected兩個值,如果bitwise一樣,把obj值換成desired。==否則把expected值換成obj的值。==
單就obj有兩種情況,
+ 1.(not equal)obj = obj,
+ 2.(equal) obj = desired
單就expected也有兩種情況,
+ ==1(not equal)expected = obj==
+ 2(equal) expected = obj
所以cmpxchg的返回值也是分兩種情況,
+ 1. atom == expected, return : atom修改前的值。
+ 2. atom != expected, return : atomc的值。
## pthread_barrier
ref1 : [man of pthread_barrier_init](https://man7.org/linux/man-pages/man3/pthread_barrier_init.3p.html)
ref2 : [man of pthread_barrier_wait](https://man7.org/linux/man-pages/man3/pthread_barrier_wait.3p.html)
### pthread_barrier_init
```c=
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr, unsigned count);
```
其中有三個參數,barrier, attr, 和count。
其中如果attr給NULL的話,就是使用default attr。
count為call pthread_barrier_wait的數量。達到這個數量才會有thread成功地從pthread_barrier_wait中離開。
### pthread_barrier_wait
所有呼叫pthread_barrier_wait的thread/process必須等待,直到規定數量的thread/process都呼叫了這個function。
>The calling thread shall block until the required number of threads have called pthread_barrier_wait() specifying the barrier.
### return value
如果thread的數量到達了,==其中一個的return value會是PTHREAD_BARRIER_SERIAL_THREAD,其他thread會是0。==
>Upon successful completion, the pthread_barrier_wait() function shall return PTHREAD_BARRIER_SERIAL_THREAD for a single (arbitrary) thread synchronized at the barrier and zero for each of the other threads.
如果達到了init所規定的count數量,則會被reset到最近init的值。所以可以重複使用直到destroy為止。
>At this point, the barrier shall be reset to the state it had as a result of the most recent pthread_barrier_init() function that referenced it.
```c=
static pthread_barrier_t prod_barrier, cons_barrier;
#define THREAD_NUM 4
static void *producer(void *index)
{
...
for (;;) {
// 等待所有producer thread一起開始
pthread_barrier_wait(&prod_barrier);
for (int i = 0; i < COUNTS_PER_THREAD; ++i)
mpmc_enqueue(q, th, 1 + i + ((int) index) * COUNTS_PER_THREAD);
// 等待所有producer thread一起結束
pthread_barrier_wait(&prod_barrier);
}
return NULL;
}
static void *consumer(void *index)
{
...
for (;;) {
// 等待所有consumer thread一起開始
pthread_barrier_wait(&cons_barrier);
for (long i = 0; i < COUNTS_PER_THREAD; ++i) {
int value;
if (!(value = mpmc_dequeue(q, th)))
return NULL;
array[value] = true;
}
// 等待所有consumer thread一起結束
pthread_barrier_wait(&cons_barrier);
}
...
}
main(){
pthread_barrier_init(&prod_barrier, NULL, THREAD_NUM + 1); // 4 + 1
pthread_barrier_init(&cons_barrier, NULL, THREAD_NUM + 1);
...
pthread_barrier_wait(&cons_barrier); // start all consumer threads
pthread_barrier_wait(&prod_barrier); // start all producer threads
pthread_barrier_wait(&prod_barrier); // 等待全部producer結束
pthread_barrier_wait(&cons_barrier); // 等待全部的consumer結束
}
```
###### tags: `linux2021`