---
tags: linux2022
---
# 2022q1 2022-04-04 quiz
contributed by < [ganoliz](https://github.com/ganoliz) >
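## Quiz 1
[SIMD within a register (SWAR)](https://en.wikipedia.org/wiki/SWAR) is a software optimization technique. The following shows SWAR applied on a 64-bit microprocessor architecture. To check whether two 32-bit integers are both odd, one might write: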
```c
#include <stdbool.h>
#include <stdint.h>
bool both_odd(uint32_t x, uint32_t y) {
return (x & 1) && (y & 1);
}
```
However, we can first combine (compound) the two 32-bit integers into one 64-bit integer and then apply a purpose-built bitmask, reducing the number of operations:
```c
/* UINT64_C keeps the shift well-defined even where long is only 32 bits wide */
static const uint64_t SWAR_ODD_MASK = (UINT64_C(1) << 32) + 1;
bool both_odd_swar(uint64_t xy) {
return (xy & SWAR_ODD_MASK) == SWAR_ODD_MASK;
}
```
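The snippet above takes an already packed `xy`. As a minimal sketch of how the two values could be combined first, the following uses a hypothetical helper `pack_u32_pair` (not part of the quiz) that places `x` in the upper half and `y` in the lower half. Note that packing has its own cost, so the saving is most plausible when the data is already stored as 64-bit words.

```c
#include <stdbool.h>
#include <stdint.h>

/* Bit 0 of each 32-bit half: 0x0000000100000001 */
#define SWAR_ODD_MASK ((UINT64_C(1) << 32) | 1)

/* Hypothetical helper: x goes to the upper 32 bits, y to the lower 32 bits. */
static inline uint64_t pack_u32_pair(uint32_t x, uint32_t y)
{
    return ((uint64_t) x << 32) | y;
}

/* True only when both packed halves have their lowest bit set. */
static inline bool both_odd_swar(uint64_t xy)
{
    return (xy & SWAR_ODD_MASK) == SWAR_ODD_MASK;
}
```

For example, `both_odd_swar(pack_u32_pair(3, 5))` is true, while replacing either argument with an even number makes it false.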
In the Linux kernel source, lib/string.c contains an implementation of memchr:
```c
/**
* memchr - Find a character in an area of memory.
* @s: The memory area
* @c: The byte to search for
* @n: The size of the area.
*
* returns the address of the first occurrence of @c, or %NULL
* if @c is not found
*/
void *memchr(const void *s, int c, size_t n)
{
const unsigned char *p = s;
while (n-- != 0) {
if ((unsigned char)c == *p++) {
return (void *)(p - 1);
}
}
return NULL;
}
```
Using the SIMD within a register (SWAR) technique above, we can rewrite it as the following memchr_opt function:
```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if X is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
void *memchr_opt(const void *src_void, int c, size_t length)
{
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
/* The fast code reads the source one word at a time and only performs
* the bytewise search on word-sized segments if they contain the search
* character, which is detected by XORing the word-sized segment with a
* word-sized block of the search character and then detecting for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
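Complete the code so that the memchr_opt implementation above matches the behavior of memchr. Answer requirements:
1. List the complete code of the memchr_opt function, with comments where possible
2. The scope containing XXXXX should use the DETECT_CHAR macro
3. Keep the code as concise as possible
### Solution approach
According to the example code in [SWAR](https://www.chessprogramming.org/SIMD_and_SWAR_Techniques) (H = 0x8080808080808080, L = 0x0101010101010101):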
```c
/* SWAR add: z = x + y */
z = ((x & ~H) + (y & ~H)) ^ ((x ^ y) & H);
/* SWAR sub: z = x - y */
z = ((x | H) - (y & ~H)) ^ ((x ^ ~y) & H);
/* SWAR average: z = (x + y) / 2, based on x + y = (x ^ y) + 2 * (x & y) */
z = (x & y) + (((x ^ y) & ~L) >> 1);
```
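This shows that a bitmask is used to mask off certain bits (here the sign bit of each byte) before the arithmetic. The addition sums the low 7 bits of every byte separately from the top bits, then merges the two results with XOR, so no carry spills into the neighboring byte.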
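The addition formula can be tried out directly. The following is a minimal sketch that assumes eight 8-bit lanes packed into a `uint64_t`; the names `SWAR_H` and `swar_add_u8` are made up for this illustration.

```c
#include <stdint.h>

#define SWAR_H UINT64_C(0x8080808080808080) /* top bit of every byte */

/* Add eight packed 8-bit lanes; each lane wraps modulo 256 on its own. */
static inline uint64_t swar_add_u8(uint64_t x, uint64_t y)
{
    /* Low 7 bits of each lane: a carry can reach bit 7 of the same lane,
     * but never the next lane, because bit 7 was cleared in both inputs. */
    uint64_t low = (x & ~SWAR_H) + (y & ~SWAR_H);
    /* Top bit of each lane without carry: plain XOR of the inputs. */
    uint64_t top = (x ^ y) & SWAR_H;
    /* Fold the carry that arrived in bit 7 into the top-bit sum. */
    return low ^ top;
}
```

With `x = 0x00FF` and `y = 0x0001`, an ordinary 64-bit addition gives `0x0100`, whereas `swar_add_u8` gives `0x0000` because the lowest lane wraps around without touching the lane next to it.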
```c
while (length >= LBLOCKSIZE) {
    /* Stop at the first word that contains the search byte; the
     * bytewise loop after this block then locates its exact position.
     */
    if (DETECT_CHAR(*asrc, mask))
        break;
    /* No match in this word: skip it and continue with the next one */
    length -= LBLOCKSIZE;
    asrc++;
}
```
```
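In essence, mask is the search character d replicated into every byte of a word (four copies when long is 32 bits wide, eight when it is 64 bits). Inside the DETECT_CHAR macro, XOR produces zero only where the two values are equal, so after XORing the word with mask we check whether any byte of the result is zero. If one is, the character we want lies somewhere within the bytes of this word, and we leave the loop so the bytewise search can find it. If not, we move on to the next word. This is why the code is written as shown above.
## Quiz 2
Consider lfring, a lock-free ring buffer implementation that supports the multiple-producer/multiple-consumer (MPMC) scenario. Reference output of the test program: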
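The two steps, replicating the byte and then testing for a zero byte, can be checked in isolation. The sketch below is an illustration only: it fixes the word size to `uint64_t` instead of `long`, builds the mask by multiplication rather than by shifting, and uses hypothetical names (`broadcast_byte`, `has_zero_byte`, `word_has_byte`).

```c
#include <stdbool.h>
#include <stdint.h>

#define ONES  UINT64_C(0x0101010101010101) /* 0x01 in every byte */
#define HIGHS UINT64_C(0x8080808080808080) /* 0x80 in every byte */

/* Replicate byte c into all eight bytes of a 64-bit word. */
static inline uint64_t broadcast_byte(unsigned char c)
{
    return ONES * c;
}

/* Nonzero if and only if at least one byte of x is zero
 * (same expression as the 64-bit DETECT_NULL macro). */
static inline uint64_t has_zero_byte(uint64_t x)
{
    return (x - ONES) & ~x & HIGHS;
}

/* XOR turns every byte equal to c into zero, then test for a zero byte. */
static inline bool word_has_byte(uint64_t word, unsigned char c)
{
    return has_zero_byte(word ^ broadcast_byte(c)) != 0;
}
```

For the word `0x6162636465666768`, `word_has_byte` reports true for `0x65` and false for `0x7A`. It only says that the byte is present somewhere in the word, which is why the bytewise loop is still needed to find the exact address.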
```
$ ./lfring
testing MPMC lock-free ring
testing MPSC lock-free ring
testing SPMC lock-free ring
testing SPSC lock-free ring
```
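No assert failure is triggered during execution.
lfring currently supports only the x86-64 architecture and runs on Linux and macOS. The code is available in this [gist](https://gist.github.com/jserv/f810c45ad4423f406f9e0dbe9dabadc9) (part of the code is hidden).
Functions to complete (note DDD, KKK, TTT, HHH, and so on):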
```c
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
/* XXXXX */ DDD;
}
return idx;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = /* XXXXX */ KKK;
while (before(tail, head + size) &&
__atomic_load_n(/* XXXXX */ TTT) ==
tail)
tail++;
tail = cond_update(&lfr->tail, tail);
return tail;
}
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do { /* skipped */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ HHH,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
Complete the code so that it runs as expected. Answer requirements:
* Keep the code as concise as possible
* Add comments where possible
:::success
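Follow-up questions:
1. Explain how the code above works, read it alongside the DPDK: Ring Library documentation, and include diagrams where possible, with emphasis on the MPMC considerations
2. Propose improvement strategies and implement them
3. Study the Linux kernel kfifo documentation and test with kfifo-examples. Obtain a working copy of the Linux source with git, run git log include/linux/kfifo.h lib/kfifo.c, and examine the change history
    * Pay attention to the use of spin_unlock_irqrestore
    * Explain the cause of the race condition in commit 7e10505
4. Port lfring to the Linux kernel and provide corresponding tests and benchmark programs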
:::
### Solution approach
TTT = size