---
tags: linux2022
---
# 2022q1 Homework5 (quiz8)
contributed by < [Eddielin0926](https://github.com/Eddielin0926) >
> [第八週測驗題](https://hackmd.io/@sysprog/linux2022-quiz8)
## 測驗 1
---
## [測驗 2](https://hackmd.io/@sysprog/rkQMKQu7c) lfring
[`lfring`](https://github.com/Eddielin0926/lfring) 是個 lock-free ring buffer 實作,並支援 multiple-producer/multiple-consumer (MPMC)
```c
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
idx++; /* XXXXX DDD; */
}
return idx;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1; /* XXXXX KKK; */
while (before(tail, head + size) &&
__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
tail)
tail++;
tail = cond_update(&lfr->tail, tail);
return tail;
}
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do { /* skipped */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ HHH,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
| Question | Answer |
|:-------- |:----------------------------------------------- |
| DDD | `idx++;` |
| KKK | `mask + 1` |
| TTT | `&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE` |
| HHH | `head + actual` |
### 解釋上述程式碼運作原理
這個程式是[生產者消費者問題](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)的實作,以有限環狀鏈結串列解決緩衝區問題。
從 `tests.c` 的 `main` 開始理解,可以看到 `main` 是由四種測試狀況構成的,分別是 multiple producer multiple consumer (MPMC), multiple producer single consumer (MPSC), single producer multiple consumer (SPMC), single producer single consumer (SPSC)。
```c
int main(void)
{
printf("testing MPMC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_MC);
printf("testing MPSC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_SC);
printf("testing SPMC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_MC);
printf("testing SPSC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_SC);
return 0;
}
```
`test_ringbuffer` 的參數 `flag` 會決定測試的生產者消費者組合,有四種前綴是 `LFRING_FLAG_` 的 enum,用最低的兩個位元來決定情況。
```c
enum {
LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
LFRING_FLAG_SP = 0x0001 /* Single producer */,
LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
```
在 `test_ringbuffer` 中可以看到 `flags` 是作為 `lfring_alloc` 的參數,會存放在 `lfring` 中。`ROUNDUP_POW2` 會將 `n_elems` 近似至 2 的指數倍,用來決定要分配多少記憶體空間,最後初始化結構體中的成員。
```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
```
在分配記憶體空間時,使用的是 `oscal_alloc`,因為 `aligment` 是 64,因此實際上使用的是 `aligned_alloc`,`aligned_alloc` 會讓非配到的記憶體空間對齊 `alignment`,舉例來說,這邊的 `alignment` 是 64,因此初始的記憶體位址最低 6 位都會是 0。
`ROUNDUP(a, b)` 會讓 `a` 去近似 `b` 的倍數。
```c
static inline void *osal_alloc(size_t size, size_t alignment)
{
return alignment > 1 ? aligned_alloc(alignment, ROUNDUP(size, alignment))
: malloc(size);
}
```
`ALIGNED(CACHE_LINE)` 會展開成 `__attribute__((__aligned__(64)))`要求編譯器在分配空間時對齊 64 byte。
```c
typedef uintptr_t ringidx_t;
struct element {
void *ptr;
uintptr_t idx;
};
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```
`restrict` 關鍵字告訴編譯器所有對該位址的操作都會通過該指標,使編譯器更好的進行優化。
考慮到多執行續的情況,在讀取 `lfr->tail` 時使用了 `__atomic_load_n` 來會保證位址在讀取不會被其他執行緒打斷而造成指標的值被影響,可以參考[並行程式設計: Atomics 操作](https://hackmd.io/@sysprog/concurrency-atomics#%E8%83%8C%E5%BE%8C%E4%BD%9C%E7%A5%9F%E7%9A%84-cache)和 [Atomic vs. Non-Atomic Operations](https://preshing.com/20130618/atomic-vs-non-atomic-operations/) 來理解當沒有使用 `atomic` 操作時可能會造成的結果。
`tail & mask` 是因為 `tail` 對齊 cache line,所以與 `mask` 做 AND 可以當作 index 來使用。
```c
/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
```
## 測驗 3