---
tags: linux2022
---
# Lock-free Ringbuffer 實作
contributed by < [Eddielin0926](https://github.com/Eddielin0926) >
:::info
- [ ] 在探討 lfring 之前,嘗試撰寫 lock-based ring buffer,設計正確性驗證程式
- [ ] 比較 lock-based vs. lock-free: 例如 https://github.com/sysprog21/concurrent-ll
:::
## 觀察 `lfring`
因為對 `lfring` 的 enqueue 和 dequeue 的行為還不熟悉,因此我想先將 `lfring` 的內容印出來觀察。在 `lfring_enqueue` 和 `lfring_dequeue` 回傳前將 `lfring` 先印出來,可以知道在這個測資下,四種情況的 `lfring` 除了 `flag` 以外的內容不會有差異。
```c
void lfring_show(struct lfring *ring)
{
printf("lfring: head %" PRIuPTR ", tail %" PRIuPTR ", mask %" PRIu32
", flags %" PRIu32 "\n",
ring->head, ring->tail, ring->mask, ring->flags);
for (ringidx_t i = 0; i < ring->mask + 1; i++) {
printf("lfring[%" PRIuPTR "]: ptr %p, idx %" PRIuPTR "\n", i,
ring->ring[i].ptr, ring->ring[i].idx);
}
printf("\n");
}
```
## lfring_t
`ALIGNED(CACHE_LINE)` 會展開成 `__attribute__((__aligned__(64)))`要求編譯器在分配空間時對齊 64 byte。
```c
typedef uintptr_t ringidx_t;
struct element {
void *ptr;
uintptr_t idx;
};
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```
## lfring flag
在 `lfring` 有四種情況,分別是**多生產者**、**單一生產者**、**多消費者**和**單一生產者**。
第一位 bit 代表生產者,第二位 bit 代表消費者,舉例來說單一生產者單一消費者就是 `LFRING_FLAG_SP | LFRING_FLAG_SC` 以二進位來說就是 `0b11`。
```c
enum {
LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
LFRING_FLAG_SP = 0x0001 /* Single producer */,
LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
```
## lfring_alloc
`lfring_alloc` 會分配記憶體空間給 `lfring_t`,依據傳入的 `n_elems` 決定陣列大小。`ringsz` 的值是由 `n_elems` 近似至 2 的冪次方,用來決定要分配多少記憶體空間,最小的空間為 2,最大為 2^31。
```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
```
:::success
根據註解,`ringsz > 0x80000000` 應該要是 `ringsz >= 0x80000000`
:::
在分配記憶體空間時,使用的是 `oscal_alloc`,但實際上使用的是 `aligned_alloc`,`aligned_alloc` 會讓分配到的記憶體空間對齊 `alignment`,舉例來說,這邊的 `alignment` 是 `CACHE_LINE` 64,因此初始的記憶體位址最低 6 位都會是 0。
`ROUNDUP(a, b)` 會讓 `a` 去近似 `b` 的倍數。
```c
static inline void *osal_alloc(size_t size, size_t alignment)
{
return alignment > 1 ? aligned_alloc(alignment, ROUNDUP(size, alignment))
: malloc(size);
}
```
## lfring_enqueue
`restrict` 關鍵字告訴編譯器所有對該位址的操作都會通過該指標,使編譯器更好的進行優化。
考慮到多執行續的情況,在讀取 `lfr->tail` 時使用了 `__atomic_load_n` 來會保證位址在讀取不會被其他執行緒打斷而造成指標的值被影響,可以參考[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#%E8%83%8C%E5%BE%8C%E4%BD%9C%E7%A5%9F%E7%9A%84-cache)和 [Atomic vs. Non-Atomic Operations](https://preshing.com/20130618/atomic-vs-non-atomic-operations/) 來理解當沒有使用 `atomic` 操作時可能會造成的結果。
以第 25 行的 `restart` 為界分為兩個部分,上半部為單一生產者,下半部為多生產者的情況。
```c=
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) {
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
tail++;
goto restart;
}
neu.e.ptr = elem;
neu.e.idx = tail;
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
actual++;
tail++;
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
```
### Single producer
`actual` 代表的是 `n_element` 實際上能存放到 `lfring` 的最大數量,`head + size - tail` 能夠成立的原因是因為陣列大小是 2 的冪次方,然後我們的 `head` 和 `tail` 都是不斷增加的,所以 `tail` 會一直大於 `head`,所以 `tail & mask` 就是 ring index。
### Multiple producer
`before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)` 其實就等於 `head + size - tail` 的 atomic 版本,用來檢查是否還有空間存放。
第一步會先將 tail 的 element 的內容放到 old 中,檢查 `old.e.idx` 是否等於 `tail - size` 意思就是這個位置被用掉了,就要找新的 `tail`。
## lfring_dequeue
dequeue 要做的與 enqueue 類似,首先也是要先計算實際能 dequeue 的數量,將 element 保存之後,檢查是否能成功更新 head,失敗的話就重頭再來一遍。當是單一消費者時就不用去檢查 `head` 的更新是否成功。
```c
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore);
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) {
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
} while (!__atomic_compare_exchange_n(
&lfr->head, &head,
head + actual,
false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
## Configurable `CACHE_LINE`
利用 `getconf LEVEL1_DCACHE_LINESIZE` 來取得 cache line 大小,使得 `CACHE_LINE` 是可調整的。
- `common.h`
```c
#ifndef CACHE_LINE
#define CACHE_LINE 64
#endif
```
- `Makefile`
```shell
CFLAGS += -DCACHE_LINE=$(shell getconf LEVEL1_DCACHE_LINESIZE)
```