contributed by <Risheng1128>
1
延伸問題:
memchr_opt
,設計實驗來觀察隨著字串長度和特定 pattern 變化的效能影響以下為分析的完整程式碼
memchr_opt
/* memchr_opt - word-at-a-time (SWAR) search for byte c in the first
 * `length` bytes of src_void.  Returns a pointer to the first match,
 * or NULL if the byte does not occur.
 *
 * Strategy: scan byte-wise until the pointer is long-aligned, then
 * examine one unsigned long per iteration, falling back to a byte
 * loop for the tail.
 */
void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *p = (const unsigned char *) src_void;
    unsigned char target = c;

    /* Byte-wise scan until p sits on a long boundary. */
    while (UNALIGNED(p)) {
        if (!length--)
            return NULL;
        if (*p == target)
            return (void *) p;
        p++;
    }

    /* Here length >= LBLOCKSIZE and p is word-aligned. */
    if (!TOO_SMALL(length)) {
        /* Read the source one word at a time; only fall back to a
         * byte-wise check when DETECT_CHAR reports that the word
         * contains the search byte (XOR with the replicated pattern
         * turns a match into a zero byte, which DETECT_NULL flags). */
        unsigned long *wp = (unsigned long *) p;

        /* Replicate the target byte across every byte of a long. */
        unsigned long pattern = target << 8 | target;
        pattern = (pattern << 16) | pattern;
        for (unsigned int w = 32; w < LBLOCKSIZE * 8; w <<= 1)
            pattern = (pattern << w) | pattern;

        while (length >= LBLOCKSIZE) {
            unsigned long hit = DETECT_CHAR(*wp, pattern);
            if (hit) {
                /* ctz of the detector word gives the bit position of
                 * the first marked byte; >>3 converts bits to bytes.
                 * NOTE(review): assumes little-endian byte order. */
                unsigned long byte_off = __builtin_ctzl(hit) >> 3;
                return (char *) wp + byte_off;
            }
            wp++;                  /* advance one word (LBLOCKSIZE bytes) */
            length -= LBLOCKSIZE;
        }

        /* Resume byte-wise scanning where the word loop stopped. */
        p = (unsigned char *) wp;
    }

    /* Fewer than LBLOCKSIZE bytes remain: finish byte by byte. */
    while (length--) {
        if (*p == target)
            return (void *) p;
        p++;
    }
    return NULL;
}
接著將程式碼拆開來一一解釋
/* Nonzero if either X or Y is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
// 如果 src 的地址沒有對齊 8 byte ,就一個一個字元找
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
根據巨集函式 UNALIGNED
可以發現其目的是判斷資料的地址是否對齊 8
位元組,因此上面的迴圈是要判斷 src
所指到的地址是否對齊 8
位元組,如果是則不會進迴圈,否則會進迴圈
/* Threshhold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
判斷字串的長度是否有大於 LBLOCKSIZE ,這邊的 LBLOCKSIZE 為 long 的大小
// 建立 Mask ,讓 mask 的每個 byte 都是目標字元
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
建立 mask
,結果為一個 unsigned long
的大小,且每個位元組都是 d
的數值
接著是題目的要求
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
unsigned long tmp;
// 包含 NULL ,表示已經找到目標字元
if ((tmp = DETECT_CHAR(*asrc, mask))) {
// 計算 asrc 要移動多少 byte
unsigned long shift = __builtin_ctzl(tmp) >> 3;
// 回傳 asrc 移動後指到的地址
return (char *) asrc + shift;
}
// asrc 指到下 8 個 byte 的位置
asrc++;
// length 剪掉 LBLOCKSIZE 的長度
length -= LBLOCKSIZE;
}
使用巨集函式 DETECT_CHAR
搜尋是否有目標字元出現,如果有的話利用 __builtin_ctzl
計算是位於第幾個位元組,最後回傳移動後的地址
// 最後當長度小於 LBLOCKSIZE 時如果還沒有找到目標字元,剩下的部份一個一個找
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
而最後的情況則是如果原本的字串長度小於 8 ,或是找到長度小於 8 後還沒找到目標字元,就會執行上述程式碼,直接一個一個找
Linux 核心原本版本位於 lib/string.c ,以下為原始碼實作
/**
* memchr - Find a character in an area of memory.
* @s: The memory area
* @c: The byte to search for
* @n: The size of the area.
*
* returns the address of the first occurrence of @c, or %NULL
* if @c is not found
*/
/**
 * memchr - Find a character in an area of memory.
 * @s: The memory area
 * @c: The byte to search for
 * @n: The size of the area.
 *
 * returns the address of the first occurrence of @c, or %NULL
 * if @c is not found
 */
void *memchr(const void *s, int c, size_t n)
{
    const unsigned char *cursor = s;
    const unsigned char target = (unsigned char) c;

    for (; n != 0; n--, cursor++) {
        if (*cursor == target)
            return (void *) cursor;
    }
    return NULL;
}
很明顯, Linux 核心原始碼的 memchr
是使用一個一個找字元的方法
接著參考同學 arthurchang09 的開發紀錄,採用類似的方法進行測量
core 0
執行程式-O0
1000
的字元陣列,每個元素初始化為字元 0
,目標字元 4
會逐一放在第 0 個字元、第 1 個字元直到第 999 字元為 \0
clock_gettime
測量找到目標字元的時間以下為測試程式碼
time.c
#define STRSIZE 1000U
/* Return end - start in nanoseconds.
 * Fix: the original multiplied by the double constant 1e9, routing the
 * whole sum through floating point (rounding once values exceed 2^53 ns
 * and an unnecessary int->double->int round trip).  Pure 64-bit integer
 * arithmetic is exact for any realistic interval. */
long long elapse(struct timespec *start, struct timespec *end)
{
    return (long long) (end->tv_sec - start->tv_sec) * 1000000000LL +
           (long long) (end->tv_nsec - start->tv_nsec);
}
/* Benchmark driver: slide the target byte across every position of a
 * STRSIZE-1 byte buffer and time both memchr implementations at each
 * position, printing "origin opt" nanosecond pairs per line. */
int main(void)
{
    char str[STRSIZE];
    const char target = '4';
    struct timespec t_begin, t_end;
    long long origin_time = 0, opt_time;

    /* Fill the buffer with '0' and NUL-terminate it. */
    memset(str, '0', STRSIZE - 1);
    str[STRSIZE - 1] = '\0';

    for (int i = 0; i < STRSIZE - 1; i++) {
        /* Plant the target at position i for this measurement. */
        str[i] = target;

        clock_gettime(CLOCK_MONOTONIC, &t_begin);
        memchr_origin(str, target, STRSIZE - 1);
        clock_gettime(CLOCK_MONOTONIC, &t_end);
        origin_time = elapse(&t_begin, &t_end);

        clock_gettime(CLOCK_MONOTONIC, &t_begin);
        memchr_opt(str, target, STRSIZE - 1);
        clock_gettime(CLOCK_MONOTONIC, &t_end);
        opt_time = elapse(&t_begin, &t_end);

        printf("%lld %lld\n", origin_time, opt_time);
        origin_time = opt_time = 0;

        /* Restore the filler byte before moving the target. */
        str[i] = '0';
    }
    return 0;
}
並且在檔案 eliminate.py
進行極端值的刪除
eliminate.py
import subprocess
import numpy as np
def process(datas, samples, threshold = 2):
    """Average per-position timing pairs across runs, discarding outliers.

    datas: sequence of shape (runs * samples, 2) rows, laid out as `runs`
           consecutive blocks of `samples` (time_origin, time_opt) pairs.
    samples: number of measurement positions per run.
    threshold: a pair is discarded when either component lies more than
               this many standard deviations from the column mean.
    Returns an (samples, 2) array of per-position means over kept pairs.

    Fixes vs. original: `runs` is derived from the data instead of read
    from a module-level global, and the divisions can no longer hit
    zero (constant columns or every pair discarded).
    """
    datas = np.array(datas, dtype=float)
    # Number of runs is implied by the data layout (was a global).
    runs = len(datas) // samples
    res = np.zeros((samples, 2))
    # Column-wise mean/std of time_origin and time_opt.
    mean = datas.mean(axis=0)
    std = datas.std(axis=0)
    # Guard: a constant column has std == 0; treat every value as 0 sigma.
    std = np.where(std == 0, 1.0, std)
    for i in range(samples):
        kept = 0  # pairs retained for this position
        for j in range(runs):
            row = datas[j * samples + i]
            z = np.abs((row - mean) / std)  # distance in standard deviations
            # If either component is an outlier, drop the whole pair.
            if z[0] > threshold or z[1] > threshold:
                continue
            res[i] += row
            kept += 1
        if kept:  # guard: all pairs discarded leaves the row at zero
            res[i] /= kept
    return res
if __name__ == '__main__':
    runs = 50      # number of measurement passes
    samples = 999  # data points produced per pass
    datas = []
    for _ in range(runs):
        # Run one sampling pass pinned to CPU 0; ./time writes to 'out'.
        subprocess.run('sudo taskset 0x1 ./time > out', shell = True)
        # Collect this pass's "origin opt" pairs.
        with open('out', 'r') as fr:
            for line in fr:
                datas.append(line.split(' ', 2))
    datas = np.array(datas).astype(np.double)
    # Average across runs (outliers removed) and persist the result.
    np.savetxt('final', process(datas, samples))
最後的測試結果
可以發現我們所使用的 mem_opt
比 Linux 核心的 memchr
快了非常多
/*
 * x86-32 memchr: byte scan using the string instruction REPNE SCASB.
 * NOTE(review): the movl/decl suffixes fit a 32-bit %edi; assembling
 * this on x86-64 (%rdi) fails with an "incorrect register" error and
 * they must become movq/decq, as discussed below.
 */
void *memchr(const void *cs, int c, size_t count)
{
    int d0;
    void *res;
    if (!count)
        return NULL;
    /* repne scasb scans memory at (E)DI for the byte in AL while
     * (E)CX > 0.  On a hit, je skips "movl $1,%0" and decl backs the
     * post-incremented pointer up to the matching byte.  On a miss,
     * res is set to 1 and decremented to 0, i.e. NULL. */
    asm volatile("repne\n\t"
    "scasb\n\t"
    "je 1f\n\t"
    "movl $1,%0\n"
    "1:\tdecl %0"
    : "=D" (res), "=&c" (d0)
    : "a" (c), "0" (cs), "1" (count)
    : "memory");
    return res;
}
參考 Repeat String Operation Prefix 及 Scan String
REPNE SCAS m8
: Find AL
, starting at ES:[(E)DI]
or [RDI]
SCASB
: Compare AL
with byte at ES:(E)DI
or RDI
then set status flags從上述的組合語言,可以大致推論實作方法為一個一個字元比較,直到找到目標為止,和 Linux 核心無平台的方法相似
接著可以開始測量 x86-64 實作的效能,首先編譯產生以下錯誤
memchr.h: Assembler messages:
memchr.h:50: Error: incorrect register `%rdi' used with `l' suffix
memchr.h:51: Error: incorrect register `%rdi' used with `l' suffix
將 movl
及 decl
分別改成 movq
及 decq
即可,接著為測量結果,使用的方法和上面的方式一樣,以下為測量結果
可以看到測量的結果為 opt
> x86-64
> origin
/*
* Find a character in an area of memory.
*
* Parameters:
* x0 - buf
* x1 - c
* x2 - n
* Returns:
* x0 - address of first occurrence of 'c' or 0
*/
#define L(label) .L ## label
#define REP8_01 0x0101010101010101
#define REP8_7f 0x7f7f7f7f7f7f7f7f
#define srcin x0
#define chrin w1
#define cntin x2
#define result x0
#define wordcnt x3
#define rep01 x4
#define repchr x5
#define cur_word x6
#define cur_byte w6
#define tmp x7
#define tmp2 x8
/* AArch64 memchr: SWAR word-at-a-time scan.
 * x0 = buf, w1 = c, x2 = n; returns x0 = address of first match or 0. */
.p2align 4
nop
SYM_FUNC_START(__pi_memchr)
and chrin, chrin, #0xff // keep only the low byte of the search char
lsr wordcnt, cntin, #3 // wordcnt = n / 8 (full words to scan)
cbz wordcnt, L(byte_loop) // fewer than 8 bytes: scan byte by byte
mov rep01, #REP8_01 // rep01 = 0x0101010101010101
mul repchr, x1, rep01 // replicate the char into every byte (the mask)
and cntin, cntin, #7 // cntin = leftover tail bytes (n % 8)
L(word_loop):
ldr cur_word, [srcin], #8 // post-indexed load: cur_word = *srcin; srcin += 8
sub wordcnt, wordcnt, #1 // one fewer word to go
eor cur_word, cur_word, repchr // matching bytes become 0x00
sub tmp, cur_word, rep01 // tmp = cur_word - 0x01..01
orr tmp2, cur_word, #REP8_7f // tmp2 = cur_word | 0x7f..7f
bics tmp, tmp, tmp2 // (bit clear) tmp &= ~tmp2; nonzero iff a byte matched
b.ne L(found_word) // some byte matched: locate it
cbnz wordcnt, L(word_loop) // more words left: keep scanning
L(byte_loop):
cbz cntin, L(not_found) // no tail bytes left: report failure
ldrb cur_byte, [srcin], #1 // load one unsigned byte, advance srcin
sub cntin, cntin, #1 // cntin--
cmp cur_byte, chrin // compare against the search char
b.ne L(byte_loop) // no match: next byte
sub srcin, srcin, #1 // undo the post-increment to point at the match
ret
L(found_word):
CPU_LE( rev tmp, tmp) // little-endian: byte-reverse so clz finds the first match
clz tmp, tmp // leading zeros = 8 * byte index of the match
sub tmp, tmp, #64 // bias by the word already consumed (srcin is 8 past)
add result, srcin, tmp, asr #3 // result = srcin + (tmp >> 3)
ret
L(not_found):
mov result, #0 // return NULL
ret
參考 Arm A64 Instruction Set Architecture 得到上述的分析結果,發現在 ARM64 的實作和小考的題目相似都是使用 SWAR 的手法解決
line 38
對應小考的變數 mask
差在 ARM64 使用 mul
指令,而小考則是用迴圈加上 bitwise 實作line 40 ~ 49
開始則是使用 SWAR 的方法,一樣是使用 8 byte 進行計算,接著稍微分析和小考的差別
小考:
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
可以得到 DETECT_NULL 的回傳值為 (((X ^ MASK) -0x0101010101010101) & ~(X ^ MASK) & 0x8080808080808080)
ARM64:
eor cur_word, cur_word, repchr -> x ^ MASK
sub tmp, cur_word, rep01 -> (x ^ MASK) - 0x0101010101010101
orr tmp2, cur_word, #REP8_7f -> (x ^ MASK) | 0x7f7f7f7f7f7f7f7f
bics tmp, tmp, tmp2 -> ((x ^ MASK) - 0x0101010101010101) & ~((x ^ MASK) | 0x7f7f7f7f7f7f7f7f)
將上述的式子使用德摩根定律即可得到 ((x ^ MASK) - 0x0101010101010101) & (~(x ^ MASK) & 0x8080808080808080)
因此兩種方式是一樣的
line 49
對應到小考最後的迴圈,當字串長度不足 8 時就會執行該迴圈,並使用一個一個字串判斷line 57
則是使用 SWAR 找到字元後如何取得字元地址的實作,對應到小考所要求的部份,兩者主要差別在於 ARM64 是使用指令 clz
計算 Leading zero ,而小考的部份我是使用函式 __builtin_ctzl
計算 Trailing zero

2
延伸問題:
git log include/linux/kfifo.h lib/kfifo.c
並觀察修改記錄
spin_unlock_irqrestore
的使用lfring
移植到 Linux 核心,並提供對應的測試及效能評比程式DDD
: idx++
KKK
: mask + 1
TTT
: &lfr->ring[tail & mask].idx, __ATOMIC_RELAXED
HHH
: head + actual
參考 6.55 Built-in Functions for Memory Model Aware Atomic Operations 得到以下
__ATOMIC_RELAXED
: Implies no inter-thread ordering constraints.__ATOMIC_CONSUME
: This is currently implemented using the stronger __ATOMIC_ACQUIRE
memory order because of a deficiency in C++11’s semantics for memory_order_consume.__ATOMIC_ACQUIRE
: Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.__ATOMIC_RELEASE
: Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.__ATOMIC_ACQ_REL
: Combines the effects of both __ATOMIC_ACQUIRE
and __ATOMIC_RELEASE
.__ATOMIC_SEQ_CST
: Enforces total ordering with all other __ATOMIC_SEQ_CST
operations.enum {
LFRING_FLAG_MP = 0x0000 /* Multiple producer */,
LFRING_FLAG_SP = 0x0001 /* Single producer */,
LFRING_FLAG_MC = 0x0000 /* Multi consumer */,
LFRING_FLAG_SC = 0x0002 /* Single consumer */,
};
/* Exercise every producer/consumer mode combination of the ring. */
int main(void)
{
    static const struct {
        const char *banner;
        uint32_t flags;
    } cases[] = {
        {"testing MPMC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_MC},
        {"testing MPSC lock-free ring\n", LFRING_FLAG_MP | LFRING_FLAG_SC},
        {"testing SPMC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_MC},
        {"testing SPSC lock-free ring\n", LFRING_FLAG_SP | LFRING_FLAG_SC},
    };
    for (size_t i = 0; i < sizeof(cases) / sizeof(cases[0]); i++) {
        printf("%s", cases[i].banner);
        test_ringbuffer(cases[i].flags);
    }
    return 0;
}
根據上述程式碼,可以得知程式碼測試是由 MPMC → MPSC → SPMC → SPSC 的順序測試
/* One ring slot: the stored pointer plus the ring index stamped at
 * enqueue time, used to detect stale slots from a previous lap. */
struct element {
    void *ptr;
    uintptr_t idx;
};

/* Lock-free ring control block.  head and tail live on separate cache
 * lines to avoid producer/consumer false sharing; ring[] is a flexible
 * array member allocated together with the header. */
struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;   /* ring size - 1 (assumes power-of-two size; see & mask uses) */
    uint32_t flags;  /* LFRING_FLAG_* single/multi producer-consumer mode */
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
...
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
從上述的宣告,可以畫出以下連續記憶體的結構,如下圖所示
digraph lfring {
rankdir = LR;
splines = false;
node[shape = "record"]
lfring[label = "<h>head (對齊 CACHE_LINE) | tail (對齊 CACHE_LINE) | mask | flags | ring[0] (對齊 CACHE_LINE) | ring[1] | ... | ring[ringsz - 1]"]
lfr[shape = plaintext, label = "lfr"]
lfr -> lfring:h
}
參考 Ring Library , enqueue 分成兩種作法,分別針對 Single Producer 和 Multiple Producers
以下為函式 lfring_enqueue
執行 single producer 的部份,搭配 7.5.1. Single Producer Enqueue 可以畫出實作流程圖
主要流程
tail
更新
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
// 實際上剩下多少空間,可容納 n_elems 或是 head + size - tail (容量不足時)
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
// 根據 actual 數值新增資料
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
// 將 tail 更新
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
// 回傳實際儲存多少筆資料
return (uint32_t) actual;
}
分析上述程式碼 line 8
開始的迴圈,假設一開始狀態為下圖
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
接著將資料複製到 tail
所在的空間
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:o4
}
接著 tail
移動到下一個陣列位置
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 | obj4|<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
重複上述的過程直到一共執行 actual
次
以下為函式 lfring_enqueue
在 Multiple producers 的情況所執行的程式碼
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/**
* 重新判斷 slot 和 old.pp 是否相同
* 如果相同表示沒有其他執行序影響 slot ,因此將 neu.pp 複製到 slot 並離開迴圈
* 如果不同表示已經有其他執行序修改 slot ,因此將 slot 複製到 old.pp 並重新執行一次
*/
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
和 single producer 不同的地方在於, multiple producers 考慮到不同執行緒互相影響時要怎麼應對,以下使用流程圖表示,假設有兩個執行緒 (Thread1 及 Thread2) 同時要執行 enqueue
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
假設 Thread1 先成功加入資料,因此會更新共享資料的 lfr->tail
,如下圖所示
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:o4
}
此時根據上述程式的 line 29
,當 Thread2 要進行加入資料的動作時,會發現 slot
已經不是最一開始讀取到的 slot
,因此會將 slot
複製到 old.pp
並重新再執行一次 enqueue ,如下圖所示
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<o4> obj4 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
lf_compare_exchange
/* A 128-bit value viewed either as one __int128 or as the lo/hi
 * 64-bit halves that cmpxchg16b operates on. */
union u128 {
    struct {
        uint64_t lo, hi;
    } s;
    __int128 ui;
};

/* 128-bit atomic compare-and-swap built on `lock cmpxchg16b`.
 * Instruction semantics:
 * 1. compare cmp.s.hi:cmp.s.lo (RDX:RAX) with *var
 * 2. if equal: set ZF and store with.s.hi:with.s.lo (RCX:RBX) to *var
 * 3. if not equal: clear ZF and load *var into RDX:RAX
 * setz then materializes ZF into ret.  On failure the freshly observed
 * value is written back through *exp so the caller can retry. */
static inline bool lf_compare_exchange(register __int128 *var,
__int128 *exp,
__int128 neu)
{
    union u128 cmp = {.ui = *exp}, with = {.ui = neu};
    bool ret;
    /* "+d"/"+a" bind cmp to RDX:RAX, "c"/"b" bind with to RCX:RBX,
     * exactly the registers cmpxchg16b requires; "cc" records the ZF
     * write and "memory" orders surrounding accesses. */
    __asm__ __volatile__("lock cmpxchg16b %1\n\tsetz %0"
    : "=q"(ret), "+m"(*var), "+d"(cmp.s.hi), "+a"(cmp.s.lo)
    : "c"(with.s.hi), "b"(with.s.lo)
    : "cc", "memory");
    if (UNLIKELY(!ret))
        *exp = cmp.ui;
    return ret;
}
這邊著重在上述 inline assembly 的實作,參考 6.36. Constraints for asmOperands 、How to specify register constraints on the Intel x86_64 register r8 to r15 in GCC inline assembly? 及 6.47 How to Use Inline Assembly Language in C Code
上述的 inline assembly 可以理解為:
lock cmpxchg16b [var]
(參考 Compare and Exchange Bytes)
setz [ret]
(參考 X86-assembly/Instructions/setz)
Simple Constraints
m
: A memory operand is allowed, with any kind of address that the machine supports in general. Note that the letter used for the general memory constraint can be re-defined by a back end using the TARGET_MEM_CONSTRAINT
macro.q
: For x86-64 it is equivalent to r
class. (for 8-bit instructions that do not use upper halves)a
: The a
register.b
: The b
register.c
: The c
register.d
: The d
register.Constraint Modifier Characters
=
: Means that this operand is written to by this instruction: the previous value is discarded and replaced by new data.+
: Means that this operand is both read and written by the instruction.clobber arguments
cc
: The cc
clobber indicates that the assembler code modifies the flags register. On some machines, GCC represents the condition codes as a specific hardware register; cc
serves to name this register. On other machines, condition code handling is different, and specifying cc
has no effect. But it is valid no matter what the target.memory
: The memory
clobber tells the compiler that the assembly code performs memory reads or writes to items other than those listed in the input and output operands (for example, accessing the memory pointed to by one of the input parameters). To ensure memory contains correct values, GCC may need to flush specific register values to memory before executing the asm. Further, the compiler does not assume that any values read from memory before an asm remain unchanged after that asm; it reloads them as needed. Using the "memory" clobber effectively forms a read/write memory barrier for the compiler.看了以上所有的參考連結後,可以開始分析程式的邏輯
- lock cmpxchg16b
1. cmp.s.hi:cmp.s.lo 和 *var 比較
2. 如果相同,set ZF 且 with.s.hi:with.s.lo 複製到 *var
3. 如果不同,clear ZF 且 *var 複製到 cmp.s.hi:cmp.s.lo
- setz
1. 如果 zero flag 為 1 -> ret = 1
2. 如果 zero flag 為 0 -> ret = 0
和 enqueue 相同, dequeue 也分成兩種作法,分別針對 Single Consumer 和 Multiple Consumers
以下為函式 lfring_dequeue
執行 single consumer 所執行的程式碼,參考 7.5.2. Single Consumer Dequeue ,畫出流程圖
主要流程
head
更新
// 實際上有多少筆資料,可以取得 n_elems 或 tail - head (資料不足)
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
// 將資料複製到 elems
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
// 根據 actual 數值新增資料
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
// 移動 head ,往後移動 actual 個資料
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
從上述程式碼 line 14
開始分析流程,假設初始狀態如以下所示
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o1
tail -> queue:empty
}
往後移動 lfr->head
一共 actual
個資料,假設 actual = 1
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head[shape = plaintext, label = "head"]
tail[shape = plaintext, label = "tail"]
head -> queue:o2
tail -> queue:empty
}
在分析函式 lfring_dequeue
時,發現一個特別的函式,如以下所示
// Parameters for smp_fence()
#define LoadStore 0x12
#define StoreLoad 0x21
#if defined(__x86_64__)
/* Issue the fence requested by @mask (LoadStore / StoreLoad bit masks).
 * On x86 only a StoreLoad fence needs a real mfence; every other
 * requested ordering only has to stop compiler reordering, hence the
 * empty asm with a "memory" clobber (a pure compiler barrier). */
static inline void smp_fence(unsigned int mask)
{
    if ((mask & StoreLoad) == StoreLoad) {
        __asm__ volatile("mfence" ::: "memory");
    } else if (mask != 0) {
        /* Any fence but StoreLoad */
        __asm__ volatile("" ::: "memory");
    }
}
#else
#error "Unsupported architecture"
#endif
首先了解為什麼這邊需要 memory barrier ,參考 從硬體觀點了解 memory barrier 的實作和效果 有很清楚的解釋為什麼需要 memory barrier
這邊直接提出結論
因此可以得知這邊使用 memory barrier 是為了確保 queue 資料的讀取以及最後 head
的更新要保持正確的順序
接著參考 Working of __asm__ __volatile__ ("" : : : "memory")
,可以得知這行 inline assembly 產生的影響
以下為函式 lfring_dequeue
在 multiple consumers 的情況下執行的程式碼
do {
// 實際上有多少筆資料,可以取得 n_elems 或 tail - head (資料不足)
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
// 將資料複製到 elems
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
// Memory barrier
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
// 移動 head ,往後移動 actual 個資料
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
/**
* 重新判斷 lfr->head 和 head 是否相同
* 如果相同表示沒有其他執行序影響 lfr->head ,因此將 head + actual 複製到 lfr->head 並離開迴圈
* 如果不同表示已經有其他執行序修改 lfr->head ,因此將 lfr->head 複製到 head 並重新執行一次
*/
*index = (uint32_t) head;
return (uint32_t) actual;
迴圈的內容和 single consumer 程式碼相同,但重點在於 line 23
的 while
判斷,可以用下面的流程圖表示,假設一開始有兩個執行緒 (thread1 及 thread2) 都做 dequeue 的動作,並且即將讀取相同的資料
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 | obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o1
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
接著假設 thread1 的執行比較快,並且取得了資料,如下圖所示,而此時會因為 line 23
的關係,整個共享資料的 lfr->head
被修改了
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... |<o1> obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o1
tail1 -> queue:empty
tail2 -> queue:empty
}
因此當 thread2 執行到 line 23
時,會發現 lfr->head
已經和原本的 head
不同,因此接下來的步驟變成將 lfr->head
更新到 head
,並重新執行一次取資料的動作
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 | obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o2
tail1 -> queue:empty
tail2 -> queue:empty
}
最後再重新取一次資料,因此可以得知 thread1 得到 obj1
, thread2 得到 obj2
digraph lfring {
splines = false;
node[shape = "record"]
queue[label = "... | obj1 |<o2> obj2 |<o3> obj3 |<empty>| ..."]
head1[shape = plaintext, label = "head1"]
head2[shape = plaintext, label = "head2"]
tail1[shape = plaintext, label = "tail1"]
tail2[shape = plaintext, label = "tail2"]
head1 -> queue:o2
head2 -> queue:o3
tail1 -> queue:empty
tail2 -> queue:empty
}
研讀ing…
3
延伸問題:
dont_trace
核心模組掛載 dont_trace
核心模組時,會執行函式 dont_trace_init
/* Module init: create the scan workqueue and arm the first delayed
 * run of the periodic tracer check. */
static int __init dont_trace_init(void)
{
    /* Dedicated workqueue for the periodic scan. */
    wq = create_workqueue(DONT_TRACE_WQ_NAME);
    /* Queue the work to run after JIFFIES_DELAY jiffies. */
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
    loaded = true;
    pr_info("Loaded!\n");
    return 0;
}
首先使用函式 create_workqueue
建立 workqueue ,並且使用函式 queue_delayed_work
使在 workqueue 執行的工作延遲 JIFFIES_DELAY
個 jiffy ,而 linux 核心所指的 jiffy 則是指觸發計時中斷所間隔的時間
int queue_delayed_work (struct workqueue_struct *wq,
struct delayed_work *dwork,
unsigned long delay);
接著有一個特別的參數 dont_trace_task
,可以發現模組使用巨集 DECLARE_DELAYED_WORK
建立延時工作任務
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
而巨集 DECLARE_DELAYED_WORK
可以從 include/linux/workqueue.h 找到,以下是相關實作
#define __WORK_INITIALIZER(n, f) { \
.data = WORK_DATA_STATIC_INIT(), \
.entry = { &(n).entry, &(n).entry }, \
.func = (f), \
__WORK_INIT_LOCKDEP_MAP(#n, &(n)) \
}
#define __DELAYED_WORK_INITIALIZER(n, f, tflags) { \
.work = __WORK_INITIALIZER((n).work, (f)), \
.timer = __TIMER_INITIALIZER(delayed_work_timer_fn,\
(tflags) | TIMER_IRQSAFE), \
}
#define DECLARE_DELAYED_WORK(n, f) \
struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f, 0)
雖然沒有很清楚整體的實作,但是可以稍微得知巨集 DECLARE_DELAYED_WORK
對模組的影響
struct delayed_work dont_trace_task
作為工作儲存在 workqueueperiodic_routine
設定為工作會執行的函式dont_trace
核心模組掛載 dont_trace
核心模組後,會不斷執行函式 periodic_routine
,正是題目要填補的程式碼,以下為回答的結果
/* Work handler: scan for tracers, then re-arm itself so the check
 * repeats every JIFFIES_DELAY jiffies.
 * @ws: unused; required by the workqueue callback signature.
 *
 * Cleanup: the quiz's "/* XXXXX: Implement *" placeholder comments and
 * the stray empty statement they left behind are removed.
 */
static void periodic_routine(struct work_struct *ws)
{
    /* `loaded` is set once in init and stays true for the module's
     * lifetime, so the branch is marked likely(). */
    if (likely(loaded))
        check();
    /* Re-queue ourselves to keep the periodic scan running. */
    schedule_delayed_work(&dont_trace_task, JIFFIES_DELAY);
}
首先判斷變數 load
是否為真,如果為真則直接進入函式 check
,主要是用來判斷是否有程式正在被監控,接著就是使用函式 schedule_delayed_work
重新將任務放回 workqueue
int schedule_delayed_work ( struct delayed_work *dwork,
unsigned long delay);
接著蠻好奇這個核心模組是如何知道其他程式是否被監控的,從函式 check
開始研究
/* Walk every process in the system; for each one that ptraces others,
 * kill all of its tracees and then the tracer itself. */
static void check(void)
{
    struct task_struct *task;
    for_each_process (task) {
        /* Skip processes with an empty ->ptraced list. */
        if (!is_tracer(&task->ptraced))
            continue;
        kill_tracee(&task->ptraced);
        kill_task(task); /* Kill the tracer once all tracees are killed */
    }
}
首先是巨集 for_each_process
,從名字可以知道是用來走訪所有程式,其實作位於 include/linux/sched/signal.h ,很明顯是從第一個程式 init_task
一路走訪
#define for_each_process(p) \
for (p = &init_task ; (p = next_task(p)) != &init_task ; )
在討論函式 is_tracer
之前,先研究結構 struct task_struct
成員 ptraced
的功能,根據題目敘述
Once any process starts “ptracing” another, the tracee gets added into ptraced that’s located in task_struct, which is simply a linked list that contains all the tracees that the process is “ptracing”.
當有任何的程式開始追蹤其他的程式,此時管理被追蹤程式的 task_struct
成員 ptraced
就會被接到追蹤者上,可以觀察 include/linux/sched.h 裡 ptraced
的定義
struct task_struct {
...
/*
* 'ptraced' is the list of tasks this task is using ptrace() on.
*
* This includes both natural children and PTRACE_ATTACH targets.
* 'ptrace_entry' is this task's link on the p->parent->ptraced list.
*/
struct list_head ptraced;
struct list_head ptrace_entry;
...
}
很明顯可以看到 ptraced
是一個 linked list ,所有被追蹤的程式都會由這個 linked list 所管理
接著分析函式 is_tracer
,藉由走訪整個 ptraced
的 linked list 判斷該程式是否追蹤其他的程式
/* @return true if the process has tracees
 *
 * Fix: the original fetched each node with list_entry() and tested
 * `if (task)` — but list_entry() only offsets the (non-NULL) node
 * pointer, so that check could never fail.  The function is true
 * exactly when the list has at least one node; say so directly.
 */
static bool is_tracer(struct list_head *children)
{
    struct list_head *list;

    list_for_each (list, children) {
        return true; /* at least one tracee is linked in */
    }
    return false;
}
函式 kill_tracee
則是當發現有程式正在追蹤其他程式時,使用函式 kill_task
將上述提到的 linked list 所含有的程式中止
/* Traverse the linked list of ptraced processes and kill each one.
 * @children: the tracer's ->ptraced list head.
 */
static void kill_tracee(struct list_head *children)
{
    struct list_head *list;
    /* Each node is a tracee's ->ptrace_entry link; recover the task. */
    list_for_each (list, children) {
        struct task_struct *task_ptraced =
            list_entry(list, struct task_struct, ptrace_entry);
        /* Log identity before killing: name, pid, thread-group id,
         * and the ptrace state flags. */
        pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
                task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
                task_ptraced->ptrace);
        kill_task(task_ptraced);
    }
}
而函式 kill_task
則是送出 SIGKILL
將目標程式中止
/* Send SIGKILL from kernel space.
 * NOTE(review): the third send_sig() argument (1) appears to mark the
 * signal as kernel-originated — confirm against kernel/signal.c. */
static void kill_task(struct task_struct *task)
{
    send_sig(SIGKILL, task, 1);
}