---
tags: 2022 linux kernel
---
# 2022q1 Homework5 (quiz8)
contributed by < [`Risheng1128`](https://github.com/Risheng1128) >
> [Assignment description](https://hackmd.io/@sysprog/linux2022-homework5)
> [Quiz problems](https://hackmd.io/@sysprog/linux2022-quiz8)
## [Quiz `1`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyg5nxO79)
:::info
Extended questions:
- [x] Explain how the code above works
- [x] Compare the Linux kernel's original (platform-independent) implementation with `memchr_opt`, and design experiments to observe how performance varies with string length and with specific patterns
- [x] Find the corresponding optimized implementation for x86_64 or arm64 in the Linux kernel source, compare it with the code above, and try to identify and analyze the strategies it uses
:::
### How the code works
The complete code under analysis is listed below.
:::spoiler `memchr_opt`
```c
void *memchr_opt(const void *src_void, int c, size_t length)
{
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
// if src is not aligned to an 8-byte boundary, search character by character
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
// here length >= LBLOCKSIZE
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
/* The fast code reads the source one word at a time and only performs
* the bytewise search on word-sized segments if they contain the search
* character, which is detected by XORing the word-sized segment with a
* word-sized block of the search character and then detecting for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
// build the mask: every byte of mask is the target character
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
unsigned long tmp;
// a zero byte appears after the XOR, i.e. the target character is in this word
if ((tmp = DETECT_CHAR(*asrc, mask))) {
// compute how many bytes asrc must be advanced
unsigned long shift = __builtin_ctzl(tmp) >> 3;
// return the address asrc points to after the shift
return (char *) asrc + shift;
}
// advance asrc to the next 8-byte word
asrc++;
// subtract LBLOCKSIZE from length
length -= LBLOCKSIZE;
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
// finally, if the target has not been found once fewer than LBLOCKSIZE bytes remain, search the rest one byte at a time
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
:::
The code is now broken apart and explained piece by piece.
```c
/* Nonzero if either X or Y is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
// if src is not aligned to an 8-byte boundary, search character by character
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
```
The macro `UNALIGNED` checks whether an address is aligned to an 8-byte boundary (on an LP64 machine where `sizeof(long)` is 8). The loop above therefore runs only while the address in `src` is still unaligned; once `src` is aligned, the loop is skipped.
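As a quick illustration, here is a minimal sketch (assuming `sizeof(long) == 8` as discussed above, and with an extra pair of parentheses around `X` for safety) that prints what `UNALIGNED` reports for eight consecutive addresses; only one address in eight is 8-byte aligned, which is why the leading loop advances `src` byte by byte.
```c
#include <stdio.h>

#define UNALIGNED(X) ((long) (X) & (sizeof(long) - 1))

int main(void)
{
    char buf[16];
    /* only one address out of every eight is 8-byte aligned */
    for (int i = 0; i < 8; i++)
        printf("%p -> %lu\n", (void *) (buf + i),
               (unsigned long) UNALIGNED(buf + i));
    return 0;
}
```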
```c
/* Threshhold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
```
This checks whether the remaining length is at least `LBLOCKSIZE`, where `LBLOCKSIZE` is the size of a `long`.
```c
// build the mask: every byte of mask is the target character
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
```
This builds `mask`, an `unsigned long` in which every byte holds the value of `d`; the small sketch below prints the result for a concrete target character.
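A minimal sketch, assuming `LBLOCKSIZE` is `sizeof(long)` (8 here) and the target character is `'4'` (0x34): every byte of the resulting word equals the target.
```c
#include <stdio.h>

#define LBLOCKSIZE (sizeof(long))

int main(void)
{
    unsigned char d = '4';
    unsigned long mask = d << 8 | d;
    mask = mask << 16 | mask;
    for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
        mask = (mask << i) | mask;
    printf("mask = 0x%lx\n", mask); /* prints 0x3434343434343434 on LP64 */
    return 0;
}
```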
Next comes the part the quiz asks us to fill in.
```c
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
unsigned long tmp;
// a zero byte appears after the XOR, i.e. the target character is in this word
if ((tmp = DETECT_CHAR(*asrc, mask))) {
// compute how many bytes asrc must be advanced
unsigned long shift = __builtin_ctzl(tmp) >> 3;
// return the address asrc points to after the shift
return (char *) asrc + shift;
}
// advance asrc to the next 8-byte word
asrc++;
// subtract LBLOCKSIZE from length
length -= LBLOCKSIZE;
}
```
The macro `DETECT_CHAR` reports whether the target character appears anywhere in the word; if it does, `__builtin_ctzl` tells us in which byte, and the function returns the corresponding address. The sketch below walks through one concrete word.
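A worked sketch using the `DETECT_CHAR`/`DETECT_NULL` macros quoted later in this note: with the target `'4'` sitting in byte 5 of the word (on a little-endian machine), the detector sets the 0x80 bit of that byte, `__builtin_ctzl` returns 47, and `47 >> 3` recovers the byte offset 5.
```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))

int main(void)
{
    const char buf[8] = {'0', '0', '0', '0', '0', '4', '0', '0'};
    unsigned long word, mask = 0x3434343434343434UL; /* every byte is '4' */

    memcpy(&word, buf, sizeof(word)); /* little-endian: byte 5 holds 0x34 */
    unsigned long tmp = DETECT_CHAR(word, mask);
    assert(tmp != 0);
    unsigned long shift = __builtin_ctzl(tmp) >> 3;
    printf("found at byte %lu\n", shift); /* prints 5 */
    return 0;
}
```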
```c
// finally, if the target has not been found once fewer than LBLOCKSIZE bytes remain, search the rest one byte at a time
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
```
The code above handles the final case: it runs either when the original string is shorter than 8 bytes, or when fewer than 8 bytes remain and the target has still not been found, and it simply checks the remaining bytes one at a time.
### Comparing with the Linux kernel's original (platform-independent) implementation
The Linux kernel's original version lives in [lib/string.c](https://github.com/torvalds/linux/blob/master/lib/string.c); its implementation is shown below.
```c
/**
* memchr - Find a character in an area of memory.
* @s: The memory area
* @c: The byte to search for
* @n: The size of the area.
*
* returns the address of the first occurrence of @c, or %NULL
* if @c is not found
*/
void *memchr(const void *s, int c, size_t n)
{
const unsigned char *p = s;
while (n-- != 0) {
if ((unsigned char)c == *p++) {
return (void *)(p - 1);
}
}
return NULL;
}
```
Clearly, the kernel's `memchr` searches one character at a time.
Following the notes of classmate [arthurchang09](https://hackmd.io/@arthur-chang/linux2022-quiz8), a similar measurement setup is used:
- pin the program to `core 0`
- compile with optimization level `-O0`
- declare a character array of length `1000`, initialize every element to the character `0`, and place the target character `4` at index 0, then index 1, and so on up to index 998; index 999 holds `\0`
- use `clock_gettime` to measure the time needed to find the target character
- repeat the measurement 50 times and remove outliers with the method from [fibdrv: removing outliers with statistics](https://hackmd.io/@sysprog/linux2022-fibdrv#%E7%94%A8%E7%B5%B1%E8%A8%88%E6%89%8B%E6%B3%95%E5%8E%BB%E9%99%A4%E6%A5%B5%E7%AB%AF%E5%80%BC)
The test program is shown below.
:::spoiler `time.c`
```c
#include <stdio.h>
#include <string.h>
#include <time.h>
/* memchr_origin() and memchr_opt() are the two implementations under test */
#define STRSIZE 1000U
long long elapse(struct timespec *start, struct timespec *end)
{
return (long long) (end->tv_sec - start->tv_sec) * 1e9 + (long long) (end->tv_nsec - start->tv_nsec);
}
int main(void)
{
char str[STRSIZE];
const char target = '4';
struct timespec start, end;
long long origin_time = 0, opt_time;
memset(str, '0', STRSIZE - 1);
str[STRSIZE - 1] = '\0';
for (int i = 0; i < STRSIZE - 1; i++) {
str[i] = target;
clock_gettime(CLOCK_MONOTONIC, &start);
memchr_origin(str, target, STRSIZE - 1);
clock_gettime(CLOCK_MONOTONIC, &end);
origin_time = elapse(&start, &end);
clock_gettime(CLOCK_MONOTONIC, &start);
memchr_opt(str, target, STRSIZE - 1);
clock_gettime(CLOCK_MONOTONIC, &end);
opt_time = elapse(&start, &end);
printf("%lld %lld\n", origin_time, opt_time);
origin_time = opt_time = 0;
str[i] = '0';
}
return 0;
}
```
:::
Outlier removal is then performed in `eliminate.py`.
:::spoiler `eliminate.py`
```python
import subprocess
import numpy as np
def process(datas, samples, threshold = 2):
    datas = np.array(datas)
    res = np.zeros((samples, 2))
    # compute the mean of time_origin and time_opt respectively
    mean = [datas[:,0].mean(), datas[:,1].mean()]
    # compute the standard deviation of time_origin and time_opt respectively
    std = [datas[:,0].std(), datas[:,1].std()]
    cnt = 0 # number of sample pairs that get discarded
    for i in range(samples):
        for j in range(runs):
            tmp = np.abs((datas[j * samples + i] - mean) / std) # how many standard deviations away the sample is
            # if either value of a pair is too far off, discard the whole pair
            if tmp[0] > threshold or tmp[1] > threshold:
                cnt += 1
                datas[j * samples + i] = [0, 0]
            res[i] += datas[j * samples + i]
        res[i] /= (runs - cnt) # average over the remaining pairs
        cnt = 0 # reset the counter
    return res
if __name__ == '__main__':
    runs = 50 # number of runs
    samples = 999 # number of data points per run
    datas = []
    for i in range(runs):
        # run one sampling pass
        subprocess.run('sudo taskset 0x1 ./time > out', shell = True)
        # read the results back
        fr = open('out', 'r')
        for line in fr.readlines():
            datas.append(line.split(' ', 2))
        fr.close()
    datas = np.array(datas).astype(np.double)
    # save the processed data
    np.savetxt('final', process(datas, samples))
```
:::
The final measurement results:
![](https://i.imgur.com/6gnx71n.png)
The `memchr_opt` we implemented is clearly much faster than the kernel's `memchr`.
### Optimized implementations for x86_64 or Arm64
#### [x86-32](https://github.com/torvalds/linux/blob/master/arch/x86/lib/string_32.c)
```c
void *memchr(const void *cs, int c, size_t count)
{
int d0;
void *res;
if (!count)
return NULL;
asm volatile("repne\n\t"
"scasb\n\t"
"je 1f\n\t"
"movl $1,%0\n"
"1:\tdecl %0"
: "=D" (res), "=&c" (d0)
: "a" (c), "0" (cs), "1" (count)
: "memory");
return res;
}
```
See [Repeat String Operation Prefix](https://www.felixcloutier.com/x86/rep:repe:repz:repne:repnz) and [Scan String](https://www.felixcloutier.com/x86/scas:scasb:scasw:scasd):
- `REPNE SCAS m8`: Find `AL`, starting at `ES:[(E)DI]` or `[RDI]`
- `SCASB`: Compare `AL` with byte at `ES:(E)DI` or `RDI` then set status flags
From this assembly we can infer that the implementation compares one byte at a time until the target is found, much like the kernel's platform-independent version.
To measure its performance on x86-64, the routine (which comes from `arch/x86/lib/string_32.c`, i.e. the 32-bit x86 build) first has to be compiled, which produces the following errors:
```shell
memchr.h: Assembler messages:
memchr.h:50: Error: incorrect register `%rdi' used with `l' suffix
memchr.h:51: Error: incorrect register `%rdi' used with `l' suffix
```
Replacing `movl` and `decl` with `movq` and `decq` fixes this; a sketch of the adjusted routine follows. Measuring it with the same methodology as above gives the results below.
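A sketch of the adjusted routine after those two substitutions (with `d0` widened to `size_t` so the matching constraint fits the 64-bit count); this is my adaptation for measurement, not code taken from the kernel.
```c
static void *memchr_x86_64(const void *cs, int c, size_t count)
{
    size_t d0;
    void *res;
    if (!count)
        return NULL;
    asm volatile("repne\n\t"
                 "scasb\n\t"
                 "je 1f\n\t"
                 "movq $1,%0\n"
                 "1:\tdecq %0"
                 : "=D" (res), "=&c" (d0)
                 : "a" (c), "0" (cs), "1" (count)
                 : "memory");
    return res;
}
```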
![](https://i.imgur.com/30XnSHA.png)
The measurements show `opt` > `x86-64` (the adapted routine above) > `origin`.
#### [ARM64](https://github.com/torvalds/linux/blob/master/arch/arm64/lib/memchr.S)
```c=
/*
* Find a character in an area of memory.
*
* Parameters:
* x0 - buf
* x1 - c
* x2 - n
* Returns:
* x0 - address of first occurrence of 'c' or 0
*/
#define L(label) .L ## label
#define REP8_01 0x0101010101010101
#define REP8_7f 0x7f7f7f7f7f7f7f7f
#define srcin x0
#define chrin w1
#define cntin x2
#define result x0
#define wordcnt x3
#define rep01 x4
#define repchr x5
#define cur_word x6
#define cur_byte w6
#define tmp x7
#define tmp2 x8
.p2align 4
nop
SYM_FUNC_START(__pi_memchr)
and chrin, chrin, #0xff // chrin = chrin & 0xff
lsr wordcnt, cntin, #3 // wordcnt = cntin >> 3
cbz wordcnt, L(byte_loop) // if wordcnt is 0, the string is shorter than 8 bytes; jump to byte_loop
mov rep01, #REP8_01 // rep01 = 0x0101010101010101
mul repchr, x1, rep01 // repchr = x1 * rep01 (build the mask)
and cntin, cntin, #7 // cntin = cntin & 7
L(word_loop):
ldr cur_word, [srcin], #8 // LDR (post-indexing) cur_word = *srcin; srcin += 8;
sub wordcnt, wordcnt, #1 // wordcnt = wordcnt - 1
eor cur_word, cur_word, repchr // cur_word = cur_word ^ repchr (a byte that matches the target becomes 0x00)
sub tmp, cur_word, rep01 // tmp = cur_word - rep01
orr tmp2, cur_word, #REP8_7f // tmp2 = cur_word | 0x7f7f7f7f7f7f7f7f
bics tmp, tmp, tmp2 // (Bit clear) tmp = tmp & ~tmp2
b.ne L(found_word) // if a target byte was detected, branch to found_word
cbnz wordcnt, L(word_loop) // if wordcnt has not reached 0, loop back to word_loop
L(byte_loop):
cbz cntin, L(not_found) // if cntin is 0, there are no bytes left; branch to not_found
ldrb cur_byte, [srcin], #1 // b: unsigned byte | cur_byte = *srcin; srcin++;
sub cntin, cntin, #1 // cntin--
cmp cur_byte, chrin // compare the current byte with the target character
b.ne L(byte_loop) // if they differ, loop back to byte_loop
sub srcin, srcin, #1 // srcin--; ldrb already advanced srcin, so step back one byte to the match
ret
L(found_word):
CPU_LE( rev tmp, tmp) // reverse the byte order on little-endian
clz tmp, tmp // count leading zeros
sub tmp, tmp, #64 // tmp = tmp - 64
add result, srcin, tmp, asr #3 // result = srcin + (tmp >> 3)
ret
L(not_found):
mov result, #0 // return 0 (NULL)
ret
```
The analysis above follows the [Arm A64 Instruction Set Architecture](https://developer.arm.com/documentation/ddi0596/2021-12/?lang=en) reference. The arm64 implementation turns out to be similar to the quiz: both rely on the [SWAR](https://en.wikipedia.org/wiki/SWAR) technique.
- `line 38` corresponds to the quiz's `mask` variable; the difference is that arm64 builds it with a single `mul` instruction, whereas the quiz uses a loop of shifts and bitwise ORs
- `line 40 ~ 49` is the SWAR part, again working 8 bytes at a time; the detection logic differs slightly from the quiz, as compared below
```shell
Quiz:
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
so DETECT_CHAR(X, MASK) evaluates to (((X ^ MASK) - 0x0101010101010101) & ~(X ^ MASK) & 0x8080808080808080)
ARM64:
eor cur_word, cur_word, repchr -> x ^ MASK
sub tmp, cur_word, rep01 -> (x ^ MASK) - 0x0101010101010101
orr tmp2, cur_word, #REP8_7f -> (x ^ MASK) | 0x7f7f7f7f7f7f7f7f
bics tmp, tmp, tmp2 -> ((x ^ MASK) - 0x0101010101010101) & ~((x ^ MASK) | 0x7f7f7f7f7f7f7f7f)
Applying De Morgan's law turns this into ((x ^ MASK) - 0x0101010101010101) & (~(x ^ MASK) & 0x8080808080808080)
so the two approaches are equivalent
```
- `line 49` corresponds to the final loop of the quiz: when fewer than 8 bytes remain, the remaining bytes are checked one at a time
- `line 57` is where, after SWAR has found the character, its address is derived, which corresponds to the part the quiz asks us to fill in; the main difference is that arm64 uses the `clz` instruction to count [leading zeros](https://en.wikipedia.org/wiki/Leading_zero), whereas my answer uses `__builtin_ctzl` to count [trailing zeros](https://en.wikipedia.org/wiki/Trailing_zero), as contrasted in the sketch below
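A sketch (not kernel code) contrasting the two index computations on a little-endian 64-bit host: the quiz counts trailing zeros directly, while the arm64 routine byte-reverses the detector word (`CPU_LE(rev)`) and counts leading zeros; the extra `sub tmp, tmp, #64` in the assembly only compensates for `srcin` having already been post-incremented past the word.
```c
#include <assert.h>
#include <stdint.h>

static unsigned idx_ctz(uint64_t detect) /* quiz: __builtin_ctzl(tmp) >> 3 */
{
    return __builtin_ctzll(detect) >> 3;
}

static unsigned idx_rev_clz(uint64_t detect) /* arm64: rev + clz, then asr #3 */
{
    return __builtin_clzll(__builtin_bswap64(detect)) >> 3;
}

int main(void)
{
    /* 0x80 set in byte 5 only, as after a match in the sixth byte */
    uint64_t detect = 0x0000800000000000ULL;
    assert(idx_ctz(detect) == 5 && idx_rev_clz(detect) == 5);
    return 0;
}
```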
## [Quiz `2`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkQMKQu7c)
:::info
Extended questions:
- [x] Explain how the code above works, alongside a reading of the [DPDK: Ring Library](https://doc.dpdk.org/guides/prog_guide/ring_lib.html) guide; use diagrams where possible and highlight the MPMC considerations
- [ ] Propose and implement improvements
- [ ] Study the Linux kernel [kfifo](https://www.kernel.org/doc/htmldocs/kernel-api/kfifo.html) documentation together with the [kfifo-examples](https://github.com/sysprog21/kfifo-examples) tests; clone the [linux](https://github.com/torvalds/linux) source tree and inspect the history with `git log include/linux/kfifo.h lib/kfifo.c`
    - note the use of `spin_unlock_irqrestore`
    - explain the cause of the race condition fixed in commit 7e10505
- [ ] Port `lfring` into the Linux kernel and provide the corresponding tests and benchmarks
:::
:::success
`DDD`: `idx++`
`KKK`: `mask + 1`
`TTT`: `&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED`
`HHH`: `head + actual`
:::
### Background
The following memory orders are taken from [6.55 Built-in Functions for Memory Model Aware Atomic Operations](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html); a small sketch of the acquire/release pairing the ring relies on follows the list.
- `__ATOMIC_RELAXED`: Implies no inter-thread ordering constraints.
- `__ATOMIC_CONSUME`: This is currently implemented using the stronger `__ATOMIC_ACQUIRE` memory order because of a deficiency in C++11’s semantics for memory_order_consume.
- `__ATOMIC_ACQUIRE`: Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.
- `__ATOMIC_RELEASE`: Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.
- `__ATOMIC_ACQ_REL`: Combines the effects of both `__ATOMIC_ACQUIRE` and `__ATOMIC_RELEASE`.
- `__ATOMIC_SEQ_CST`: Enforces total ordering with all other `__ATOMIC_SEQ_CST` operations.
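As promised above, a minimal sketch (with illustrative names, not lfring's) of the acquire/release pairing the ring relies on: a producer publishes an element with a release store to `tail`, and a consumer that observes the new `tail` through an acquire load is guaranteed to also see the element written before it.
```c
#include <stdint.h>

static uintptr_t tail;
static void *slot;

void producer_publish(void *elem, uintptr_t new_tail)
{
    __atomic_store_n(&slot, elem, __ATOMIC_RELAXED);     /* write the element */
    __atomic_store_n(&tail, new_tail, __ATOMIC_RELEASE); /* then publish it   */
}

void *consumer_peek(uintptr_t *seen_tail)
{
    *seen_tail = __atomic_load_n(&tail, __ATOMIC_ACQUIRE); /* observe tail */
    /* if the acquire load saw new_tail, this relaxed load sees elem too */
    return __atomic_load_n(&slot, __ATOMIC_RELAXED);
}
```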
### Initialization
```c
enum {
LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
LFRING_FLAG_SP = 0x0001 /* Single producer */,
LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
int main(void)
{
printf("testing MPMC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_MC);
printf("testing MPSC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_MP | LFRING_FLAG_SC);
printf("testing SPMC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_MC);
printf("testing SPSC lock-free ring\n");
test_ringbuffer(LFRING_FLAG_SP | LFRING_FLAG_SC);
return 0;
}
```
From the code above, the tests run in the order MPMC → MPSC → SPMC → SPSC.
```c
struct element {
void *ptr;
uintptr_t idx;
};
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
...
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
```
From these declarations we can draw the contiguous memory layout shown below.
```graphviz
digraph lfring {
rankdir = LR;
splines = false;
node[shape = "record"]
lfring[label = "<h>head (CACHE_LINE aligned) | tail (CACHE_LINE aligned) | mask | flags | ring[0] (CACHE_LINE aligned) | ring[1] | ... | ring[ringsz - 1]"]
lfr[shape = plaintext, label = "lfr"]
lfr -> lfring:h
}
```
### enqueue
Following the [Ring Library](https://doc.dpdk.org/guides/prog_guide/ring_lib.html) guide, enqueue comes in two variants, one for a single producer and one for multiple producers.
#### Single Producer
Below is the single-producer part of `lfring_enqueue`; together with [7.5.1. Single Producer Enqueue](https://doc.dpdk.org/guides/prog_guide/ring_lib.html) we can sketch the flow.
:::success
Main flow:
1. determine how much room is actually available
2. store the data
3. update `tail`
:::
```c=
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
// how much room is actually left: either n_elems, or head + size - tail when the ring is nearly full
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
// store actual elements
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
// update tail
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
// return how many elements were actually stored
return (uint32_t) actual;
}
```
Consider the loop starting at `line 8` of the code above, and assume the initial state shown below.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
```
The data is then copied into the slot that `tail` points to.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:o4
}
```
Then `tail` moves to the next array slot.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 | obj4|<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
```
This process repeats `actual` times in total; a small sketch of the room computation appears below.
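As mentioned above, a minimal sketch of the room computation with hypothetical numbers: the indices run freely and are only masked when indexing the array, so `head + size - tail` is the free room even after the counters have wrapped past the ring size.
```c
#include <assert.h>
#include <stdint.h>

int main(void)
{
    uint32_t size = 8;             /* ring capacity (power of two)      */
    uint32_t head = 13, tail = 18; /* hypothetical free-running indices */
    intptr_t n_elems = 4;          /* the caller wants to enqueue 4     */

    intptr_t room = (intptr_t) (head + size - tail); /* 13 + 8 - 18 = 3 */
    intptr_t actual = room < n_elems ? room : n_elems;
    assert(actual == 3);           /* only 3 slots are free             */
    return 0;
}
```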
#### Multiple Producers
Below is the code that `lfring_enqueue` runs in the multiple-producer case.
```c=
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/**
 * Re-check whether the slot still equals old.pp:
 * if it does, no other thread has touched the slot, so neu.pp is written into the slot and the loop exits;
 * if it does not, another thread has already modified the slot, so the slot is copied back into old.pp and the loop retries.
 */
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
```
The difference from the single-producer path is that the multi-producer path has to cope with threads interfering with each other. The diagrams below show this, assuming two threads (Thread1 and Thread2) enqueue at the same time.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
```
Suppose Thread1 enqueues its element first and therefore updates the shared `lfr->tail`, as shown below.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:o4
}
```
Now, per `line 29` of the code above, when Thread2 tries to enqueue it finds that the `slot` no longer matches what it originally read, so the slot is copied back into `old.pp` and the enqueue is retried, as shown below.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
```
#### `lf_compare_exchange`
```c
union u128 {
struct {
uint64_t lo, hi;
} s;
__int128 ui;
};
static inline bool lf_compare_exchange(register __int128 *var,
__int128 *exp,
__int128 neu)
{
union u128 cmp = {.ui = *exp}, with = {.ui = neu};
bool ret;
/**
 * 1. compare cmp.s.hi:cmp.s.lo with *var
 * 2. if they are equal, set ZF and copy with.s.hi:with.s.lo into *var
 * 3. if they differ, clear ZF and copy *var into cmp.s.hi:cmp.s.lo
 */
__asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
: "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
: "c"(with.s.hi), "b"(with.s.lo)
: "cc", "memory");
if (UNLIKELY(!ret))
*exp = cmp.ui;
return ret;
}
```
The focus here is the inline assembly. See [6.36. Constraints for asm Operands](https://www.linuxtopia.org/online_books/programming_tool_guides/linux_using_gnu_compiler_collection/constraints.html), [How to specify register constraints on the Intel x86_64 register r8 to r15 in GCC inline assembly?](https://stackoverflow.com/questions/17159551/how-to-specify-register-constraints-on-the-intel-x86-64-register-r8-to-r15-in-gc) and [6.47 How to Use Inline Assembly Language in C Code](https://gcc.gnu.org/onlinedocs/gcc/Using-Assembly-Language-with-C.html#Using-Assembly-Language-with-C).
The inline assembly can be read as:
> `lock cmpxchg16b [var]` (see [Compare and Exchange Bytes](https://www.felixcloutier.com/x86/cmpxchg8b:cmpxchg16b))
> `setz [ret]` (see [X86-assembly/Instructions/setz](https://www.aldeid.com/wiki/X86-assembly/Instructions/setz))
- Simple Constraints
- `m`: A memory operand is allowed, with any kind of address that the machine supports in general. Note that the letter used for the general memory constraint can be re-defined by a back end using the `TARGET_MEM_CONSTRAINT` macro.
- `q`: For x86-64 it is equivalent to `r` class. (for 8-bit instructions that do not use upper halves)
- `a`: The `a` register.
- `b`: The `b` register.
- `c`: The `c` register.
- `d`: The `d` register.
- Constraint Modifier Characters
- `=`: Means that this operand is written to by this instruction: the previous value is discarded and replaced by new data.
- `+`: Means that this operand is both read and written by the instruction.
- clobber arguments
- `cc`: The `cc` clobber indicates that the assembler code modifies the flags register. On some machines, GCC represents the condition codes as a specific hardware register; `cc` serves to name this register. On other machines, condition code handling is different, and specifying `cc` has no effect. But it is valid no matter what the target.
- `memory`: The `memory` clobber tells the compiler that the assembly code performs memory reads or writes to items other than those listed in the input and output operands (for example, accessing the memory pointed to by one of the input parameters). To ensure memory contains correct values, GCC may need to flush specific register values to memory before executing the asm. Further, the compiler does not assume that any values read from memory before an asm remain unchanged after that asm; it reloads them as needed. Using the "memory" clobber effectively forms a read/write memory barrier for the compiler.
With all of the references above, the logic of the code can be summarized as follows:
```shell
- lock cmpxchg16b
1. compare cmp.s.hi:cmp.s.lo with *var
2. if equal, set ZF and copy with.s.hi:with.s.lo into *var
3. if not equal, clear ZF and copy *var into cmp.s.hi:cmp.s.lo
- setz
1. if the zero flag is 1 -> ret = 1
2. if the zero flag is 0 -> ret = 0
```
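For comparison, a sketch of a portable equivalent built on GCC's `__atomic` builtins instead of hand-written `cmpxchg16b`; on x86-64 the 16-byte form needs `-mcx16` (or falls back to libatomic) and is not guaranteed to be lock-free everywhere, which is presumably why the quiz code spells the instruction out by hand.
```c
#include <stdbool.h>

typedef __int128 ptrpair_t;

static inline bool lf_compare_exchange_builtin(ptrpair_t *var,
                                               ptrpair_t *exp,
                                               ptrpair_t neu)
{
    /* on failure, *exp is updated with the current contents of *var,
     * matching the behavior of the inline-assembly version above */
    return __atomic_compare_exchange_n(var, exp, neu, /* weak */ false,
                                       __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}
```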
### dequeue
Like enqueue, dequeue also comes in two variants, one for a single consumer and one for multiple consumers.
#### Single Consumer
Below is the code that `lfring_dequeue` runs in the single-consumer case; following [7.5.2. Single Consumer Dequeue](https://doc.dpdk.org/guides/prog_guide/ring_lib.html), the flow is:
:::success
Main flow:
1. determine how much data is actually available
2. read the data
3. update `head`
:::
```c=
// how many elements are actually available: either n_elems, or tail - head when there is not enough data
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
// copy the data into elems
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
// memory barrier: keep the element loads above ordered before the head update
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
// advance head by actual elements
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
```
Starting from `line 14` of the code above, assume the initial state shown below.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
```
`lfr->head` is advanced by `actual` elements; assume `actual = 1`.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o2
tail -> queue:empty
}
```
#### [Memory Barrier](https://en.wikipedia.org/wiki/Memory_barrier)
While analyzing `lfring_dequeue`, one function stands out:
```c
// Parameters for smp_fence()
#define LoadStore 0x12
#define StoreLoad 0x21
#if defined(__x86_64__)
static inline void smp_fence(unsigned int mask)
{
if ((mask & StoreLoad) == StoreLoad) {
__asm__ volatile("mfence" ::: "memory");
} else if (mask != 0) {
/* Any fence but StoreLoad */
__asm__ volatile("" ::: "memory");
}
}
#else
#error "Unsupported architecture"
#endif
```
First, why is a memory barrier needed here? [從硬體觀點了解 memory barrier 的實作和效果](https://medium.com/fcamels-notes/%E5%BE%9E%E7%A1%AC%E9%AB%94%E8%A7%80%E9%BB%9E%E4%BA%86%E8%A7%A3-memry-barrier-%E7%9A%84%E5%AF%A6%E4%BD%9C%E5%92%8C%E6%95%88%E6%9E%9C-416ff0a64fc1) (a hardware-level look at how memory barriers are implemented and what they do) explains this very clearly.
:::success
The conclusions, in short:
- To reduce memory accesses the hardware has caches, and those caches must be kept coherent; enforcing full coherence on every access would leave CPUs idle (they would constantly have to coordinate), so store buffers and invalidate queues were added to reduce the idling, with the side effect that a CPU always sees its own latest writes while other CPUs might not
> ![](https://i.imgur.com/yx0WD33.png)
- So that other CPUs can also see the latest data when they need it, write/read memory barriers were introduced to counter the side effects of the store buffer and the invalidate queue, ensuring that critical data is updated in the correct order
:::
The memory barrier here therefore ensures that the reads of the queue's elements and the final update of `head` happen in the correct order.
See [Working of `__asm__ __volatile__ ("" : : : "memory")`](https://stackoverflow.com/questions/14950614/working-of-asm-volatile-memory) for what this piece of [inline assembly](https://en.wikipedia.org/wiki/Inline_assembler) actually does.
#### Multiple Consumers
Below is the code that `lfring_dequeue` runs in the multiple-consumer case.
```c=
do {
// how many elements are actually available: either n_elems, or tail - head when there is not enough data
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
// copy the data into elems
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
// Memory barrier
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
// advance head by actual elements
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
/**
 * Re-check whether lfr->head still equals head:
 * if it does, no other thread has touched lfr->head, so head + actual is written into lfr->head and the loop exits;
 * if it does not, another thread has already modified lfr->head, so lfr->head is copied into head and the loop retries.
 */
*index = (uint32_t) head;
return (uint32_t) actual;
```
The loop body is the same as in the single-consumer code; the key part is the `while` condition at `line 23`. The diagrams below illustrate it, assuming two threads (thread1 and thread2) both dequeue and are about to read the same element.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
```
Suppose thread1 runs faster and takes the element, as shown below; because of `line 23`, the shared `lfr->head` has now been modified.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
```
So when thread2 reaches `line 23`, it finds that `lfr->head` no longer matches its local `head`; the CAS then updates `head` to the current `lfr->head`, and the whole read is retried.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o2
tail1 -> queue:empty
tail2 -> queue:empty
}
```
After the retry, thread1 ends up with `obj1` and thread2 with `obj2`.
```graphviz
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 |<o3> obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o3
tail1 -> queue:empty
tail2 -> queue:empty
}
```
### Reading the Linux kernel [kfifo](https://www.kernel.org/doc/htmldocs/kernel-api/kfifo.html) documentation
:::warning
Still in progress...
:::
## [Quiz `3`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FBy5JAmOX9)
:::info
Extended questions:
- [ ] Explain how the code above works; this should cover how the [ptrace](https://man7.org/linux/man-pages/man2/ptrace.2.html) system call and [signal](https://man7.org/linux/man-pages/man7/signal.7.html) handling are implemented inside the Linux kernel
- [ ] Study material such as [The race to limit ptrace](https://www.rezilion.com/blog/the-race-to-limit-ptrace/) and discuss how a process can avoid being traced
:::
### Loading the `dont_trace` kernel module
When the `dont_trace` module is loaded, the function `dont_trace_init` runs.
```c
static int __init dont_trace_init(void)
{
// create a workqueue
wq = create_workqueue(DONT_TRACE_WQ_NAME);
// queue work on a workqueue after delay
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
loaded = true;
pr_info("Loaded!\n");
return 0;
}
```
The module first creates a workqueue with `create_workqueue`, then uses [`queue_delayed_work`](https://docs.huihoo.com/linux/kernel/2.6.26/kernel-api/re63.html) to delay the queued work by `JIFFIES_DELAY` [jiffies](https://en.wikipedia.org/wiki/Jiffy_(time)); in the Linux kernel, a jiffy is the interval between two timer interrupts.
```c
int queue_delayed_work (struct workqueue_struct *wq,
struct delayed_work *dwork,
unsigned long delay);
```
The interesting argument is `dont_trace_task`: the module creates the delayed work item with the `DECLARE_DELAYED_WORK` macro.
```c
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
```
The macro `DECLARE_DELAYED_WORK` is defined in [include/linux/workqueue.h](https://github.com/torvalds/linux/blob/master/include/linux/workqueue.h); the relevant pieces are shown below.
```c
#define __WORK_INITIALIZER(n, f) { \
.data = WORK_DATA_STATIC_INIT(), \
.entry = { &(n).entry, &(n).entry }, \
.func = (f), \
__WORK_INIT_LOCKDEP_MAP(#n, &(n)) \
}
#define __DELAYED_WORK_INITIALIZER(n, f, tflags) { \
.work = __WORK_INITIALIZER((n).work, (f)), \
.timer = __TIMER_INITIALIZER(delayed_work_timer_fn,\
(tflags) | TIMER_IRQSAFE), \
}
#define DECLARE_DELAYED_WORK(n, f) \
struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f, 0)
```
Without digging into every implementation detail, the effect of `DECLARE_DELAYED_WORK` on the module is roughly the following (a sketch of the equivalent run-time initialization appears after the list):
- it defines `struct delayed_work dont_trace_task`, the work item that gets placed on the workqueue
- it sets `periodic_routine` as the function the work item will execute
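As mentioned above, a sketch of the equivalent run-time initialization using `INIT_DELAYED_WORK` (a hypothetical rearrangement for illustration, not the quiz module's actual code):
```c
#include <linux/module.h>
#include <linux/workqueue.h>

static void periodic_routine(struct work_struct *ws)
{
    /* work body elided in this sketch */
}

/* same effect as DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine),
 * but the work item is initialized at run time instead of statically */
static struct delayed_work dont_trace_task;

static int __init sketch_init(void)
{
    INIT_DELAYED_WORK(&dont_trace_task, periodic_routine);
    return 0;
}
module_init(sketch_init);
MODULE_LICENSE("GPL");
```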
### Running the `dont_trace` kernel module
Once the `dont_trace` module is loaded, the function `periodic_routine` keeps being executed; it is exactly the part the quiz asks us to fill in, answered as follows.
```c
static void periodic_routine(struct work_struct *ws)
{
if (likely(/* XXXXX: Implement */loaded))
check();
/* XXXXX: Implement */;
schedule_delayed_work(&dont_trace_task, JIFFIES_DELAY);
}
```
It first checks whether `loaded` is true; if so it calls `check`, which determines whether any process is being traced, and then re-queues the work with [`schedule_delayed_work`](https://docs.huihoo.com/linux/kernel/2.6.26/kernel-api/re69.html).
```c
int schedule_delayed_work ( struct delayed_work *dwork,
unsigned long delay);
```
How does this module actually know whether other processes are being traced? Start from the function `check`.
```c
static void check(void)
{
struct task_struct *task;
for_each_process (task) {
if (!is_tracer(&task->ptraced))
continue;
kill_tracee(&task->ptraced);
kill_task(task); /* Kill the tracer once all tracees are killed */
}
}
```
First, the macro `for_each_process`: as the name suggests, it walks over every process. It is defined in [include/linux/sched/signal.h](https://github.com/torvalds/linux/blob/master/include/linux/sched/signal.h) and clearly iterates starting from the first task, `init_task`.
```c
#define for_each_process(p) \
for (p = &init_task ; (p = next_task(p)) != &init_task ; )
```
Before looking at `is_tracer`, consider what the `ptraced` member of `struct task_struct` is for. According to the problem statement:
> Once any process starts “ptracing” another, the tracee gets added into ptraced that’s located in task_struct, which is simply a linked list that contains all the tracees that the process is “ptracing”.
Once a process starts tracing another one, the tracee is added to the tracer's `ptraced` list in `task_struct`. Its definition in [include/linux/sched.h](https://github.com/torvalds/linux/blob/master/include/linux/sched.h) is:
```c
struct task_struct {
...
/*
* 'ptraced' is the list of tasks this task is using ptrace() on.
*
* This includes both natural children and PTRACE_ATTACH targets.
* 'ptrace_entry' is this task's link on the p->parent->ptraced list.
*/
struct list_head ptraced;
struct list_head ptrace_entry;
...
}
```
`ptraced` is clearly a linked list, and every process being traced by this task is kept on it.
The function `is_tracer` then walks the `ptraced` list to decide whether the process is tracing anyone.
```c
/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task =
list_entry(list, struct task_struct, ptrace_entry);
if (task)
return true;
}
return false;
}
```
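Note that `list_entry` is just `container_of`, so the pointer it yields for a node that is on the list is never NULL; `is_tracer` is therefore effectively a non-emptiness test. A sketch of the simpler equivalent using the kernel's list API:
```c
#include <linux/list.h>
#include <linux/types.h>

/* @return true if the process has tracees */
static bool is_tracer_simple(struct list_head *children)
{
    return !list_empty(children);
}
```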
When a process is found to be tracing others, `kill_tracee` walks the linked list mentioned above and terminates every process on it via `kill_task`.
```c
/* Traverse the element in the linked list of the ptraced proccesses and
* finally kills them.
*/
static void kill_tracee(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task_ptraced =
list_entry(list, struct task_struct, ptrace_entry);
pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
task_ptraced->ptrace);
kill_task(task_ptraced);
}
}
```
`kill_task` terminates the target process by sending it `SIGKILL`.
```c
/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
send_sig(SIGKILL, task, 1);
}
```
### The [ptrace](https://man7.org/linux/man-pages/man2/ptrace.2.html) system call inside the Linux kernel