---
tags: linux2022
---

# 2022-04-04 [HScallop](https://github.com/HScallop/linux2022-quiz8)

[quiz8](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyo0W7OQ5)

## 測驗 `1`

```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>

/* Nonzero if X is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))

/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))

/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)

#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif

/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))

/* Search the first 'length' bytes of 'src_void' for the byte value 'c'
 * (converted to unsigned char).
 *
 * @return pointer to the first matching byte, or NULL if none is found.
 */
void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *src = (const unsigned char *) src_void;
    unsigned char d = c;

    /* Advance bytewise until src is aligned on a "long" boundary */
    while (UNALIGNED(src)) {
        if (!length--)
            return NULL;
        if (*src == d)
            return (void *) src;
        src++;
    }

    if (!TOO_SMALL(length)) {
        /* If we get this far, we know that length is large and
         * src is word-aligned.
         */

        /* The fast code reads the source one word at a time and only performs
         * the bytewise search on word-sized segments if they contain the search
         * character, which is detected by XORing the word-sized segment with a
         * word-sized block of the search character and then detecting for the
         * presence of NULL in the result.
         */
        unsigned long *asrc = (unsigned long *) src;

        /* Replicate d into every byte of mask: 16 bits, 32 bits, then
         * (only when long is wider than 32 bits) the full word.
         */
        unsigned long mask = d << 8 | d;
        mask = mask << 16 | mask;
        for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
            mask = (mask << i) | mask;

        /* Use unsigned long mask to check multiple characters at once.
         * The loop will terminate when length of string is too small.
         */
        while (length >= LBLOCKSIZE) {
            if (DETECT_CHAR(*asrc, mask))
                break;
            length -= LBLOCKSIZE;
            asrc++;
        }

        /* If there are fewer than LBLOCKSIZE characters left, then we resort to
         * the bytewise loop.
         */
        src = (unsigned char *) asrc;
    }

    /* Bytewise scan: either the tail of the buffer, or the word in which
     * DETECT_CHAR reported a match.
     */
    while (length--) {
        if (*src == d)
            return (void *) src;
        src++;
    }

    return NULL;
}
```

把多個 char 拼起來變成 long 再用我們做好的 mask 就可以一次辨識多個 char(在此題中一次可辨識的數量與 data model 有關,在 lp64 中可以一次辨識 8 個 char)。

## 測驗 `2`

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>

#include "arch.h"
#include "common.h"
#include "lfring.h"

#define SUPPORTED_FLAGS \
    (LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)

/* Type-generic minimum that evaluates each argument exactly once
 * (GNU statement expression).
 */
#define MIN(a, b)                      \
    ({                                 \
        __typeof__(a) tmp_a = (a);     \
        __typeof__(b) tmp_b = (b);     \
        tmp_a < tmp_b ? tmp_a : tmp_b; \
    })

typedef uintptr_t ringidx_t;

/* One ring slot: the stored pointer and the ring index it was written at */
struct element {
    void *ptr;
    uintptr_t idx;
};

struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask; /* ring size - 1 (ring size is a power of two) */
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);

/* Allocate a ring with room for at least 'n_elems' elements, rounded up to
 * a power of two.
 *
 * @return the new ring, or NULL on invalid arguments or allocation failure.
 */
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        /* Mark every slot as "used one lap back" so that the first enqueue
         * at index i sees idx == tail - size.
         */
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}

/* Release a ring. A NULL ring is ignored; a non-empty ring is not freed. */
void lfring_free(lfring_t *lfr)
{
    if (!lfr)
        return;

    if (lfr->head != lfr->tail) {
        assert(0 && "ring buffer not empty");
        return;
    }
    osal_free(lfr);
}

/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
static inline bool before(ringidx_t a, ringidx_t b)
{
    return (intptr_t)(a - b) < 0;
}

/* Advance *loc to 'neu' unless *loc is already past it.
 *
 * @return the value *loc is known to hold afterwards: 'neu' if the update
 *         succeeded, otherwise the newer value that was observed.
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
    do {
        if (before(neu, old)) /* neu < old */
            return old;
        /* if neu > old, need to update *loc */
    } while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
                                          neu,
                                          /* weak */ true, __ATOMIC_RELEASE,
                                          __ATOMIC_RELAXED));
    return neu;
}

/* Pick the index to retry with: the freshly loaded *loc if it is ahead of
 * 'idx', otherwise the slot after 'idx'.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* If idx is after fresh, continue with next slot */
        idx++;
    }
    return idx;
}

/* Enqueue elements at tail
 *
 * @return the number of elements actually enqueued (may be less than
 *         'n_elems', including 0).
 */
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }

    /* else: lock-free multi-producer */
restart:
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem = elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }

            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));

        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}

/* Determine how far the consumer may read.
 *
 * @return the tail index to use for dequeueing from 'head'.
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    /* Load the slot's scalar idx field: a slot whose idx equals 'tail' was
     * written by an enqueue at that index. __atomic_load_n cannot load the
     * whole struct element, nor can a struct be compared with an integer.
     */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}

/* Dequeue elements from head
 *
 * On success, *index receives the (truncated) ring index of the first
 * dequeued element; *index is not written when 0 is returned.
 *
 * @return the number of elements stored into 'elems' (0 if none available).
 */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore);                        // Order loads only
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }

        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```

DDD: 當 idx is after fresh 代表需要更新 idx 來維持最新進度。

KKK: 在 `alloc()` 中有 `lfr->mask = ringsz - 1;` 所以可以知道 `size = mask + 1` ,其實在 `lfring_enqueue()` 與 `find_tail()` 裡面也有同樣的一行程式碼。

TTT: 從前面 single producer 的地方 `__atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE)` 可以推測這邊也要用一樣的 atomic operation ,但必須考慮到 multiple producer 的情況,這邊再參考前面 enqueue 中的操作 把 `&lfr->tail` 改成 `&lfr->ring[tail & mask].idx` (要與 `tail` 比較的是該 slot 的 `idx` 欄位,而不是整個 `struct element`)。

HHH: 查看 [__atomic Builtins (gcc)](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html),`__atomic_compare_exchange_n` the third parameter is `type desired` 當前面兩個參數一致時,會把 `desired` 寫入第一個參數。