---
tags: linux2022
---
# 2022-04-04 [HScallop](https://github.com/HScallop/linux2022-quiz8)
[quiz8](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyo0W7OQ5)
## 測驗 `1`
```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if pointer (or address) X is not aligned on a "long" boundary.
 * The argument is parenthesized so that expressions such as `p | q` or
 * `cond ? a : b` are cast as a whole.
 */
#define UNALIGNED(X) ((long) (X) & (sizeof(long) - 1))
/* How many bytes are examined each iteration of the word-wise search loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
/* DETECT_NULL(X): nonzero if X (a long-sized word) contains a zero byte.
 * Subtracting 0x01 from every byte borrows into bit 7 only for bytes that
 * were 0x00 (or >= 0x81, which `& ~(X)` filters out again); the 0x80 mask
 * keeps just those per-byte flag bits.
 * The constants carry a UL suffix so the arithmetic is always unsigned.
 * NOTE: X is evaluated twice; do not pass expressions with side effects.
 */
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) - 0x01010101UL) & ~(X) & 0x80808080UL)
#elif LONG_MAX == 9223372036854775807L
#define DETECT_NULL(X) \
    (((X) - 0x0101010101010101UL) & ~(X) & 0x8080808080808080UL)
#else
#error long int is not a 32bit or 64bit type.
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK.
 * XOR turns every byte equal to the MASK byte into 0x00, which DETECT_NULL
 * then finds. Both arguments are parenthesized before the XOR.
 */
#define DETECT_CHAR(X, MASK) (DETECT_NULL((X) ^ (MASK)))
/* memchr_opt - find the first byte equal to (unsigned char) c.
 *
 * src_void: start of the memory area to search
 * c:        value to look for; converted to unsigned char before comparing
 * length:   number of bytes to search
 *
 * Returns a pointer to the first matching byte, or NULL if none of the
 * first `length` bytes matches.
 *
 * The search runs in three phases: bytewise until the cursor is aligned on
 * a long boundary, word-wise while at least one full long remains, then
 * bytewise again over whatever is left (including the word in which the
 * word-wise phase spotted a match).
 */
void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *cursor = (const unsigned char *) src_void;
    const unsigned char target = (unsigned char) c;

    /* Phase 1: advance byte by byte up to the next long boundary. */
    for (; UNALIGNED(cursor); cursor++) {
        if (length == 0)
            return NULL;
        length--;
        if (*cursor == target)
            return (void *) cursor;
    }

    /* Phase 2: cursor is long-aligned; scan whole words while possible. */
    if (!TOO_SMALL(length)) {
        const unsigned long *word = (const unsigned long *) cursor;

        /* Replicate the target byte into every byte of a long:
         * 2 bytes, then 4, then (for wider longs) keep doubling.
         */
        unsigned long pattern = ((unsigned long) target << 8) | target;
        pattern |= pattern << 16;
        for (unsigned int shift = 32; shift < LBLOCKSIZE * 8; shift <<= 1)
            pattern |= pattern << shift;

        /* Skip every full word that cannot contain the target byte. */
        while (length >= LBLOCKSIZE && !DETECT_CHAR(*word, pattern)) {
            word++;
            length -= LBLOCKSIZE;
        }

        /* Either a word holding the target was found, or fewer than
         * LBLOCKSIZE bytes remain; the bytewise loop finishes the job.
         */
        cursor = (const unsigned char *) word;
    }

    /* Phase 3: bytewise scan of the remaining bytes. */
    for (; length > 0; length--, cursor++) {
        if (*cursor == target)
            return (void *) cursor;
    }
    return NULL;
}
```
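先把要搜尋的字元複製到 `unsigned long` 的每一個 byte 形成 mask,接著每次從已對齊的位址讀入一個 `long`(等於把多個 char 拼起來),與 mask 做 XOR 後用 `DETECT_NULL` 檢查結果中是否出現 0 byte,就可以一次辨識多個 char(在此題中一次可辨識的數量與 data model 有關:在 LP64 中 `long` 為 8 bytes,所以可以一次辨識 8 個 char)。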
## 測驗 `2`
```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
#include "arch.h"
#include "common.h"
#include "lfring.h"
/* Union of every flag bit accepted by lfring_alloc(); a flags argument with
 * any other bit set is rejected there.
 */
#define SUPPORTED_FLAGS \
    (LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)
/* Smaller of two values. Each argument is evaluated exactly once (GNU
 * statement expression + __typeof__), so arguments with side effects are
 * safe. When the two compare equal, the second one is returned.
 */
#define MIN(a, b)                                  \
    ({                                             \
        __typeof__(a) min_lhs_ = (a);              \
        __typeof__(b) min_rhs_ = (b);              \
        min_lhs_ < min_rhs_ ? min_lhs_ : min_rhs_; \
    })
/* Ring position counter. It is unsigned and only reduced to a slot number
 * with `& mask` at the point of use; ordering between two counters is
 * decided by before(), i.e. by the sign of their difference.
 */
typedef uintptr_t ringidx_t;
/* One ring slot.
 * NOTE(review): lfring_enqueue() casts a slot to ptrpair_t * for a
 * compare-exchange, so this struct presumably has to match ptrpair_t in
 * size and layout - confirm against the ptrpair_t definition.
 */
struct element {
void *ptr; /* user pointer stored by enqueue, read by dequeue */
uintptr_t idx; /* ring index at which the slot was last written;
                * lfring_alloc() seeds it with (slot - ring size), i.e.
                * "written one lap ago" */
};
/* Ring buffer header followed by the slot array.
 * ALIGNED() and CACHE_LINE come from project headers not shown here;
 * presumably they request cache-line alignment - confirm in arch.h/common.h.
 */
struct lfring {
ringidx_t head; /* index of the next element to dequeue */
ringidx_t tail ALIGNED(CACHE_LINE); /* index after the last released element;
                                     * with multiple producers, slots may be
                                     * written before tail is advanced
                                     * (see find_tail()) */
uint32_t mask; /* ring size - 1, set in lfring_alloc(); used as `index & mask` */
uint32_t flags; /* LFRING_FLAG_* bits passed to lfring_alloc() */
struct element ring[] ALIGNED(CACHE_LINE); /* flexible array of slots */
} ALIGNED(CACHE_LINE);
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
/* lfring_free - release a ring obtained from lfring_alloc().
 *
 * lfr: ring to free; NULL is accepted and ignored.
 *
 * The ring is only freed when head == tail. Otherwise the function asserts
 * (debug builds) and returns without freeing anything.
 */
void lfring_free(lfring_t *lfr)
{
    if (lfr == NULL)
        return;

    bool drained = (lfr->head == lfr->tail);
    if (drained) {
        osal_free(lfr);
        return;
    }
    assert(0 && "ring buffer not empty");
}
/* Serial-number comparison: true when 'a' precedes 'b'.
 * The unsigned difference is reinterpreted as signed, so the answer stays
 * correct across counter wrap-around as long as the two values are less
 * than half the counter range apart.
 */
static inline bool before(ringidx_t a, ringidx_t b)
{
    intptr_t distance = (intptr_t) (a - b);
    return distance < 0;
}
/* cond_update - move *loc forward to neu unless it is already past it.
 *
 * loc: shared index to update
 * neu: candidate new value
 *
 * Returns the value *loc is known to hold afterwards: the observed value
 * when neu is before it (nothing is written), otherwise neu once the
 * compare-exchange has succeeded.
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t seen = __atomic_load_n(loc, __ATOMIC_RELAXED);

    for (;;) {
        /* Someone else already published a later index: keep theirs. */
        if (before(neu, seen))
            return seen;

        /* Weak CAS may fail spuriously; on any failure `seen` is refreshed
         * with the current *loc and the check above is repeated.
         */
        if (__atomic_compare_exchange_n(loc, &seen, neu,
                                        /* weak */ true, __ATOMIC_RELEASE,
                                        __ATOMIC_RELAXED))
            return neu;
    }
}
/* cond_reload - pick the index to continue with after falling behind.
 *
 * idx: index the caller was working on
 * loc: shared index to re-read
 *
 * Returns the freshly loaded *loc when idx is before it; otherwise idx + 1,
 * i.e. the slot following the one the caller just gave up on.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t current = __atomic_load_n(loc, __ATOMIC_RELAXED);

    return before(idx, current) ? current : idx + 1;
}
/* Enqueue up to n_elems pointers from elems[] at the tail of the ring.
 *
 * lfr:     ring to enqueue into
 * elems:   source array; entries are taken in order starting at elems[0]
 * n_elems: maximum number of entries to enqueue
 *
 * Returns the number of entries actually enqueued, which can be smaller
 * than n_elems (including 0) when the ring runs out of free slots.
 *
 * Two paths, selected by LFRING_FLAG_SP in lfr->flags:
 *  - single producer: plain stores into the slots followed by one release
 *    store of the new tail;
 *  - multiple producers: each slot is claimed with a compare-exchange on
 *    the whole (ptr, idx) pair, and the shared tail is advanced afterwards
 *    with cond_update().
 */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
/* mask was set to (ring size - 1) in lfring_alloc() */
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
/* Free slots = head + size - tail; never enqueue more than requested */
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
/* A free slot must carry the index from exactly one lap back */
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
/* Publish all slots written above with a single release store */
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
restart:
/* Continue while elements remain and tail has not reached head + size */
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
/* Lets the same bytes be viewed as a slot or as the ptrpair_t that
 * lf_compare_exchange() operates on.
 */
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
/* NOTE(review): the retry re-checks old.e.idx, so lf_compare_exchange()
 * presumably refreshes old.pp on failure - confirm in its definition.
 */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
/* Advance the shared tail unless another producer already moved it further */
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
/* find_tail - determine how far a consumer may read.
 *
 * lfr:  ring being dequeued from
 * head: consumer's current head index
 * tail: last tail value the consumer observed; the scan starts here
 *
 * With a single producer the shared tail is authoritative and is simply
 * re-read with acquire ordering. With multiple producers, slots can be
 * filled before the shared tail is advanced, so the slots from `tail`
 * onwards are scanned (never further than one full ring past `head`) for
 * entries whose idx equals their ring index, i.e. entries written in the
 * current lap. The result is published through cond_update().
 *
 * Returns the tail index to use, as returned by cond_update() in the
 * multi-producer case.
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;

    /* Load the slot's idx field: __atomic_load_n needs a pointer to an
     * integer (or pointer) object, and idx is what gets compared with tail.
     */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;

    tail = cond_update(&lfr->tail, tail);
    return tail;
}
/* Dequeue up to n_elems pointers from the head of the ring.
 *
 * lfr:     ring to dequeue from
 * elems:   destination array with room for at least n_elems pointers
 * n_elems: maximum number of entries to dequeue
 * index:   out-parameter; on success receives (uint32_t) of the ring index
 *          of the first dequeued entry. It must not be NULL and is left
 *          untouched when 0 is returned.
 *
 * Returns the number of entries copied into elems[]; 0 when the ring is
 * empty even after find_tail() has looked for unreleased entries.
 *
 * With LFRING_FLAG_SC set, head is advanced with a plain atomic store;
 * otherwise it is advanced with a compare-exchange, and the copy is redone
 * from the refreshed head whenever another consumer got there first.
 */
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
/* Available entries = tail - head; never take more than requested */
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
/* Copy the pointers out before head is moved past them */
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
/* NOTE(review): smp_fence() is a project helper; presumably this keeps the
 * slot loads above ordered before the store to head below - confirm.
 */
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
/* Strong CAS: succeeds only if head is still the value we copied from;
 * on failure `head` is refreshed and the loop body runs again.
 */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
/* head still holds the pre-update value, i.e. the first dequeued index */
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
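DDD(`idx++`):`cond_reload()` 會先重新讀取 `*loc` 得到 `fresh`。若 `idx` 在 `fresh` 之前,代表自己落後了,直接改用 `fresh`;否則(`idx` 不在 `fresh` 之前)就把 `idx` 加一,從下一個 slot 繼續嘗試,以維持最新進度。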
KKK(`mask + 1`):在 `lfring_alloc()` 中有 `lfr->mask = ringsz - 1;`,所以可以知道 `size = mask + 1`;其實在 `find_tail()` 裡面也有同樣的一行程式碼 `ringidx_t size = mask + 1;`。
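TTT: 從前面 single producer 的地方 `__atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE)` 可以推測這邊也要用一樣的 atomic operation,但必須考慮到 multiple producer 的情況:此時要檢查的是每個 slot 是否已被寫入。參考前面 enqueue 中對 slot 的操作,要把 `&lfr->tail` 改成 `&lfr->ring[tail & mask].idx`,也就是 `__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) == tail`。注意 `__atomic_load_n` 的第一個參數必須是指向整數(或指標)型別的指標,所以要取 `.idx` 欄位的位址,而不是整個 `struct element`。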
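HHH(`head + actual`):查看 [__atomic Builtins (gcc)](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html),`__atomic_compare_exchange_n` 的第三個參數是 `desired`:當第一個參數 `*ptr` 與第二個參數 `*expected` 的內容一致時,會把 `desired` 寫入 `*ptr` 並回傳 true;不一致時則把 `*ptr` 目前的值寫回 `*expected` 並回傳 false。這裡要把 `head` 往前推進剛取出的 `actual` 個元素,所以 `desired` 是 `head + actual`。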