---
tags: Linux Kernel
---
# 2022q1 Homework5 (quiz8)
contributed by < `kevinshieh0225` >
> [Quiz questions](https://hackmd.io/@sysprog/linux2022-quiz8)
> [GitHub](https://github.com/kevinshieh0225/linux2022-quiz8)
## [Quiz `1`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyg5nxO79): improving the performance of [memchr](https://man7.org/linux/man-pages/man3/memchr.3.html)
:::info
```c
while (length >= LBLOCKSIZE) {
// the word contains the target byte: break and locate it bytewise.
if (DETECT_CHAR(*asrc, mask) != 0)
break;
else {
length -= LBLOCKSIZE;
asrc++;
}
}
```
:::
:::spoiler Full source code
```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if X is not aligned on a "long" boundary */
#define UNALIGNED(X) ((uintptr_t) (X) & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
void *memchr_opt(const void *src_void, int c, size_t length)
{
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
/* The fast code reads the source one word at a time and only performs
* the bytewise search on word-sized segments if they contain the search
* character, which is detected by XORing the word-sized segment with a
* word-sized block of the search character and then detecting for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (length >= LBLOCKSIZE) {
// the word contains the target byte: break and locate it bytewise.
if (DETECT_CHAR(*asrc, mask) != 0)
break;
else {
length -= LBLOCKSIZE;
asrc++;
}
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
:::
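A straightforward `memchr` compares one byte per iteration. Here we take advantage of the fact that the processor can handle 32 or 64 bits at a time, so each comparison covers a whole word instead of a single byte.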
### DETECT_CHAR
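The `DETECT_NULL` macro tells us whether any byte within a word (`sizeof(long)` bytes, where a `char` is 1 byte) is 0.
The `DETECT_CHAR` macro builds on it: it XORs `X` with `MASK`, a word in which every byte holds the target character.
XOR yields 0 where two bits are equal and 1 where they differ. So if `X` contains the target character, that byte becomes a NULL byte after the XOR, and passing the result to `DETECT_NULL` returns a nonzero value.
`DETECT_NULL` itself is explained in [the numerics chapter](https://hackmd.io/@sysprog/c-numerics?type=view#運用-bit-wise-operator).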
```c
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
```
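To see the trick in isolation, here is a minimal sketch on fixed 64-bit words. The helper names `has_zero_byte` and `has_byte` and the constants `ONES`/`HIGHS` are made up for this note and are not part of the quiz code.

```c
#include <stdint.h>

/* Hypothetical names; fixed 64-bit words for clarity. */
#define ONES  UINT64_C(0x0101010101010101)
#define HIGHS UINT64_C(0x8080808080808080)

/* Nonzero if any byte of x is 0x00 (same formula as DETECT_NULL). */
static uint64_t has_zero_byte(uint64_t x)
{
    return (x - ONES) & ~x & HIGHS;
}

/* Nonzero if any byte of x equals c (same idea as DETECT_CHAR). */
static uint64_t has_byte(uint64_t x, unsigned char c)
{
    /* ONES * c replicates c into every byte; XOR zeroes matching bytes. */
    return has_zero_byte(x ^ (ONES * c));
}
```

Subtracting 1 from a byte sets its top bit only if the byte was 0x00 or at least 0x81, and `& ~x` rejects bytes whose top bit was already set, so a word without zero bytes always yields 0. Bytes above the first zero byte may be flagged spuriously because of the borrow, which does not change the zero/nonzero answer.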
### word size MASK
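After the preliminary cases are handled (scanning byte by byte until `src` is word-aligned, and checking that the remaining length is at least one word), we build the word-sized `MASK` by replicating the target byte `d` into every byte of an `unsigned long`.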
```c
void *memchr_opt(const void *src_void, int c, size_t length)
{
...
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
...
}
```
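The doubling loop above can be checked against a one-multiply alternative. This is only a sketch: `mask_by_shifts` and `mask_by_multiply` are hypothetical names, and the multiply variant relies on `~0UL / 0xFF` being `0x0101...01`, which holds for 32-bit and 64-bit `long`.

```c
#include <limits.h>

/* Replicate byte d by repeated doubling, as the quiz code does. */
static unsigned long mask_by_shifts(unsigned char d)
{
    unsigned long mask = (unsigned long) d << 8 | d;
    mask = mask << 16 | mask;
    for (unsigned int i = 32; i < sizeof(long) * CHAR_BIT; i <<= 1)
        mask = (mask << i) | mask;
    return mask;
}

/* Alternative: multiply d by a word whose bytes are all 0x01. */
static unsigned long mask_by_multiply(unsigned char d)
{
    return (~0UL / 0xFF) * d;
}
```

Both functions should return the same word for every byte value; the loop form has the advantage of not depending on a multiply.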
### Find the char one word at a time (answer)
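While at least one word of input remains, apply `DETECT_CHAR` to the current word. The loop exits either when a word contains the target byte or when fewer than `LBLOCKSIZE` bytes remain.
The search then drops back to `char` granularity and scans the remaining range byte by byte.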
```c
void *memchr_opt(const void *src_void, int c, size_t length)
{
...
while (length >= LBLOCKSIZE) {
// the word contains the target byte: break and locate it bytewise.
if (DETECT_CHAR(*asrc, mask) != 0)
break;
else {
length -= LBLOCKSIZE;
asrc++;
}
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
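The bytewise loop after the `break` could in principle be replaced by a bit scan, because the lowest `0x80` marker produced by the `DETECT_NULL` formula always sits in the first zero byte. The sketch below is not what the quiz code does: `first_match_in_word` is a hypothetical helper, it assumes the GCC/Clang builtin `__builtin_ctzll`, and it assembles the word with `p[0]` in the lowest byte so that the result is independent of host byte order (a real implementation would load the word directly and would then be correct only on little-endian machines).

```c
#include <stdint.h>

#define ONES  UINT64_C(0x0101010101010101)
#define HIGHS UINT64_C(0x8080808080808080)

/* Return the index (0..7) of the first byte in p[0..7] equal to c,
 * or -1 if none matches.
 */
static int first_match_in_word(const unsigned char *p, unsigned char c)
{
    uint64_t w = 0;
    for (int i = 0; i < 8; i++)
        w |= (uint64_t) p[i] << (8 * i); /* p[0] goes to the lowest byte */

    uint64_t x = w ^ (ONES * c);          /* matching bytes become 0x00 */
    uint64_t t = (x - ONES) & ~x & HIGHS; /* 0x80 marks zero bytes */
    if (!t)
        return -1;
    /* The lowest marker is exact; markers above it may be borrow artifacts. */
    return __builtin_ctzll(t) / 8;
}
```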
:::warning
ToDo
Speed up the search further with SIMD instructions such as SSE.
:::
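As a starting point for this ToDo, here is a rough sketch of what an SSE2 version might look like, written with compiler intrinsics rather than assembly. It is an untuned illustration, not a measured improvement: `memchr_sse2` is a hypothetical name, the code assumes GCC or Clang (`__builtin_ctz`), and it falls back to a plain bytewise loop when SSE2 is not available.

```c
#include <stddef.h>
#if defined(__SSE2__)
#include <emmintrin.h>
#endif

/* Hypothetical name; compares 16 bytes per iteration when SSE2 exists. */
static void *memchr_sse2(const void *s, int c, size_t n)
{
    const unsigned char *p = s;
    unsigned char d = (unsigned char) c;
#if defined(__SSE2__)
    const __m128i needle = _mm_set1_epi8((char) d);
    while (n >= 16) {
        /* Unaligned load, so no alignment prologue is needed. */
        __m128i chunk = _mm_loadu_si128((const __m128i *) p);
        /* One bit per byte: set where the byte equals d. */
        int hits = _mm_movemask_epi8(_mm_cmpeq_epi8(chunk, needle));
        if (hits)
            return (void *) (p + __builtin_ctz((unsigned) hits));
        p += 16;
        n -= 16;
    }
#endif
    /* Remaining tail (or everything, without SSE2): bytewise scan. */
    for (; n; n--, p++)
        if (*p == d)
            return (void *) p;
    return NULL;
}
```

Unlike the word-at-a-time version, the compare-and-movemask pair reports the exact position of every match, so no second pass over the matching block is needed.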
### Test Code
```c
#include <stdio.h>
#include <string.h> /* strlen */
#include "memchr_opt.h"
int main()
{
const char str[] = "http://wiki.csie.ncku.edu.tw";
const char ch1 = '.';
const char ch2 = 'w';
const char ch3 = 'u';
const char ch4 = 'm'; // does not occur in str
char *ret1 = memchr_opt(str, ch1, strlen(str));
char *ret2 = memchr_opt(str, ch2, strlen(str));
char *ret3 = memchr_opt(str, ch3, strlen(str));
char *ret4 = memchr_opt(str, ch4, strlen(str));
printf("String after |%c| is - |%s|\n", ch1, ret1);
printf("String after |%c| is - |%s|\n", ch2, ret2);
printf("String after |%c| is - |%s|\n", ch3, ret3);
printf("String after |%c| is - |%s|\n", ch4, ret4 ? ret4 : "(null)"); // passing NULL to %s is undefined
return 0;
}
```
```
String after |.| is - |.csie.ncku.edu.tw|
String after |w| is - |wiki.csie.ncku.edu.tw|
String after |u| is - |u.edu.tw|
String after |m| is - |(null)|
```
---
## [Quiz `2`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkQMKQu7c): a multiple-producer/multiple-consumer (MPMC) implementation
:::info
`DDD` : `idx++`
`KKK` : `mask + 1`
`TTT` : `&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED`
`HHH` : `head + actual`
:::
:::spoiler Full source code
```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include "arch.h"
#include "common.h"
#include "lfring.h"
#define SUPPORTED_FLAGS \
(LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)
#define MIN(a, b) \
({ \
__typeof__(a) tmp_a = (a); \
__typeof__(b) tmp_b = (b); \
tmp_a < tmp_b ? tmp_a : tmp_b; \
})
typedef uintptr_t ringidx_t;
struct element {
void *ptr;
uintptr_t idx;
};
/* tail for enqueue
* head for dequeue
*/
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
void lfring_free(lfring_t *lfr)
{
if (!lfr)
return;
if (lfr->head != lfr->tail) {
assert(0 && "ring buffer not empty");
return;
}
osal_free(lfr);
}
/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
static inline bool before(ringidx_t a, ringidx_t b)
{
return (intptr_t)(a - b) < 0;
}
/* Advance *loc to neu unless it is already at or past neu.
* If neu is before old, another thread has already moved *loc further,
* so there is nothing to update: return old.
* Otherwise try to CAS *loc from old to neu:
* on success, return neu;
* on failure, old is refreshed with the current *loc and we compare again.
*
* # Multiple Producers Second Step
*/
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
do {
if (before(neu, old)) /* neu < old */
return old;
/* if neu > old, need to update *loc */
} while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
neu,
/* weak */ true, __ATOMIC_RELEASE,
__ATOMIC_RELAXED));
return neu;
}
/* Pick the index to retry with.
* If idx is before the shared value at *loc, we have fallen behind:
* jump ahead to the fresh value.
* Otherwise *loc has not been advanced past idx yet:
* move on to the next slot.
*/
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
idx++; /* DDD */
}
return idx;
}
/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1; /* KKK */
while (before(tail, head + size) &&
__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED) ==
tail) {
tail++;
}
tail = cond_update(&lfr->tail, tail);
return tail;
}
/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* HHH */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
:::
This quiz implements an MPMC lock-free ring buffer. A producer inserts new values with `enqueue`; a consumer takes out and removes old values with `dequeue`.
The `lfring` structure:
```c
/* tail : enqueue position (producer)
* head : dequeue position (consumer)
* mask : ringsz - 1, so that
* lfr->ring[tail & mask] wraps around the ring buffer
* flags : the scenario, one of mpmc/mpsc/spmc/spsc
* ring : the element array of the ring buffer
*/
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```
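Because `mask` is `ringsz - 1` and `ringsz` is a power of two, `idx & mask` maps an ever-increasing index onto a slot without a modulo. A small sketch of that mapping follows; `roundup_pow2` is a hypothetical stand-in for the `ROUNDUP_POW2` macro (whose definition is not shown here), and `slot_of` is a made-up helper name.

```c
#include <stdint.h>

/* Hypothetical stand-in for ROUNDUP_POW2:
 * smallest power of two >= n, valid for 1 <= n <= 2^31.
 */
static uint32_t roundup_pow2(uint32_t n)
{
    uint32_t p = 1;
    while (p < n)
        p <<= 1;
    return p;
}

/* Map a ring index to its slot; size must be a power of two. */
static uint32_t slot_of(uintptr_t idx, uint32_t size)
{
    return (uint32_t) (idx & (size - 1));
}
```

This also shows why `lfring_alloc` can initialise `ring[i].idx` to `i - ringsz`: that value wraps around as an unsigned number but still maps to slot `i`, and it reads as "this slot was last used one lap ago".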
### With multiple producers / consumers
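Before looking at enqueue/dequeue, let us see what can go wrong in the MPMC case and how to avoid it:
With multiple producers and consumers, two threads may operate on the lfring at the same time, and the resulting race can lead to unexpected results such as one thread overwriting another's data.
Since a race can occur between any two instructions, we approach the problem as planting a flag: we start with a position we intend to update, and just before storing we check whether another thread has contended for it. If so, we move on and look for a later position that is still free. **Note** that confirming the position is free and storing the value must happen as a single atomic step.
See the description in `7.5.3.2 Multiple Producers Enqueue Second Step` of the [Ring Library](https://doc.dpdk.org/guides/prog_guide/ring_lib.html) documentation: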
> The second step is to modify ring->prod_head in the ring structure to point to the same location as prod_next. This operation is done using a Compare And Swap (CAS) instruction, which does the following operations atomically:
>
> * If ring->prod_head is different to local variable prod_head, the CAS operation fails, and the code restarts at first step.
> * Otherwise, ring->prod_head is set to local prod_next, the CAS operation is successful, and processing continues.
>
> In the figure, the operation succeeded on core 1, and step one restarted on core 2.

#### `__atomic_compare_exchange_n`
We use `__atomic_compare_exchange_n` to find out, at the moment we store, whether another thread has contended for the resource. According to the [documentation](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html):
```c
/*
* This built-in function implements an atomic
* compare and exchange operation. This compares
* the contents of *ptr with the contents of *expected.
*
* If equal, the operation is a read-modify-write
* operation that writes desired into *ptr.
*
* If they are not equal, the operation is a read and
* the current contents of *ptr are written into *expected.
*/
bool __atomic_compare_exchange_n (
type *ptr,
type *expected,
type desired,
bool weak,
int success_memorder,
int failure_memorder
)
```
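Applied to our context:
> We compare the shared state `*ptr` in the structure with our local copy `*expected`. If they are still equal, no other thread has contended for it, so the shared state `*ptr` is updated to `desired`.
>
> If they differ, the local copy `*expected` that we loaded earlier is stale, so the current `*ptr` is loaded into `*expected` and we retry the operation.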
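
Both outcomes can be observed in a single thread. The wrapper below is a sketch; `cas_int` is a name made up for this note, and it uses the strong variant (`weak = false`) so that the failure case is deterministic.

```c
#include <stdbool.h>

/* Hypothetical thin wrapper around the GCC/Clang builtin. */
static bool cas_int(int *ptr, int *expected, int desired)
{
    return __atomic_compare_exchange_n(ptr, expected, desired,
                                       /* weak */ false,
                                       __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
```

With `shared == 10` and `expected == 10`, `cas_int(&shared, &expected, 11)` returns true and `shared` becomes 11. Calling it again with a stale `expected == 10` returns false, leaves `shared` at 11, and overwrites `expected` with 11, which is exactly the "Updated on failure" behaviour the loops in this quiz rely on.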
### `cond_update` / `cond_reload`
```c
/* Advance *loc to neu unless it is already at or past neu.
* If neu is before old, another thread has already moved *loc further,
* so there is nothing to update: return old.
* Otherwise try to CAS *loc from old to neu:
* on success, return neu;
* on failure, old is refreshed with the current *loc and we compare again.
*
* # Multiple Producers Second Step
*/
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
do {
if (before(neu, old)) /* neu < old */
return old;
/* if neu > old, need to update *loc */
} while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
neu,
/* weak */ true, __ATOMIC_RELEASE,
__ATOMIC_RELAXED));
return neu;
}
```
```c
/* Pick the index to retry with.
* If idx is before the shared value at *loc, we have fallen behind:
* jump ahead to the fresh value.
* Otherwise *loc has not been advanced past idx yet:
* move on to the next slot.
*/
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
idx++; /* DDD */
}
return idx;
}
```
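Both helpers depend on `before()`, which compares indices in serial-number arithmetic so that the ordering survives wrap-around of `ringidx_t`. The sketch below repeats that comparison on plain `uintptr_t` values; it assumes, as GCC and Clang do, that converting an out-of-range unsigned value to `intptr_t` wraps modulo 2^N.

```c
#include <stdbool.h>
#include <stdint.h>

/* Same comparison as before() in the quiz code:
 * true if a precedes b, even when the counter has wrapped.
 */
static bool before(uintptr_t a, uintptr_t b)
{
    return (intptr_t) (a - b) < 0;
}
```

A plain `a < b` would claim that `UINTPTR_MAX` comes after `0`; with the signed difference, `UINTPTR_MAX` is correctly treated as the index just before `0`. The comparison is meaningful as long as the two indices are less than half the index space apart.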
### Dequeue with multiple consumers
```c
/* Dequeue elements from head
*
* 1. set actual to the number of elements to dequeue:
* MIN(elements remaining in the queue, number requested)
* 1.1 if the ring buffer looks empty, scan for new but unreleased elements
* 1.2 if it is still empty, return 0
*
* 2. copy the values from lfr->ring to elems
*
* 3. if single-consumer, just store the new lfr->head and leave the loop
*
* 4. otherwise use '__atomic_compare_exchange_n' to make sure
* lfr->head is updated to (head + actual); on failure, retry
*
* @lfr : lfring struct
* @elems : output array that receives the dequeued elements
* @n_elems : the maximum number of elements to dequeue
* @index : receives the ring index (head) of the first dequeued element
*/
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
// 1. set actual to the number of elements to dequeue
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
// 1.1 if Ring buffer is empty, scan for new but unreleased elements
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
// 1.2 if still empty, return 0
if (actual <= 0)
return 0;
}
// 2. copy the value from lfr->ring to elems
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore); // Order loads only
// 3. if single-consumer, just renew &lfr->head and return
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
// 4. ensure we successfully renew &lfr->head with (head + actual)
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* HHH */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
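Step 1 is worth a closer look because `tail - head` is computed on unsigned indices and then read as a signed count. The following sketch isolates that calculation; `dequeue_count` is a hypothetical helper, not a function of the lfring code.

```c
#include <stdint.h>

/* Hypothetical helper mirroring step 1 of lfring_dequeue. */
static intptr_t dequeue_count(uintptr_t head, uintptr_t tail, uint32_t n_elems)
{
    /* <= 0 means the ring looks empty, or our copy of tail is stale. */
    intptr_t avail = (intptr_t) (tail - head);
    intptr_t want = (intptr_t) n_elems;
    return avail < want ? avail : want;
}
```

The signed view gives the right count even when `tail` has wrapped past zero while `head` has not, and it yields a non-positive result when another consumer has already moved `head` beyond the `tail` value we loaded, which is why the code tests `actual <= 0` rather than `actual == 0`.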
### `find_tail`
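Recall step `1.1` of `lfring_dequeue`: when `(lfr->tail) - (lfr->head)` as currently recorded in `lfr` says there are no elements left, why do we call `find_tail` to refresh tail once more?
The answer lies in how `lfring_enqueue` is implemented: it first writes the values into `lfr->ring`, and only at the very end updates `lfr->tail`.
This leads to the following situation: a consumer may conclude from `(tail - head)` that the buffer is empty while a producer is in the middle of `enqueue`, so `lfr->ring` already holds elements that `lfr->tail` does not yet cover. `find_tail` therefore inspects `lfr->ring` itself to check whether a producer has already written elements.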
```c
/* find_tail: check the current lfr->ring to see whether
* a producer has enqueued elements that are not yet released
*
* 1. A single producer stores to lfr->ring and then publishes
* lfr->tail right away, so just load lfr->tail again
*
* 2. For multi-producer, scan the ring for new elements that
* have been written but not released.
*
* 2.1 while tail is before (head + size) and
* lfr->ring[tail & mask].idx shows the slot was written for tail,
* advance tail
*
* 3. cond_update lfr->tail with tail:
* while we were scanning, a producer may already have
* advanced lfr->tail itself
*/
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
/* 1. A single producer publishes lfr->tail right after storing
* to lfr->ring, so just load lfr->tail again
*/
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* 2. Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1; /* KKK */
/* 2.1 while tail is before (head + size) and
* lfr->ring[tail & mask].idx shows the slot was written for tail,
* advance tail
*/
while (before(tail, head + size) &&
__atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED) ==
tail) {
tail++;
}
/* 3. cond_update lfr->tail with tail:
* while we were scanning, a producer may already have
* advanced lfr->tail itself
*/
tail = cond_update(&lfr->tail, tail);
return tail;
}
```
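The scan in step 2.1 can be tried out in a single-threaded model. This is a sketch under simplifying assumptions: `scan_tail` and `RING_SIZE` are names chosen for the example, only the `idx` field of each slot is modelled, and plain loads stand in for the relaxed atomic loads.

```c
#include <stdint.h>

#define RING_SIZE 8u /* power of two, chosen for the example */

/* Advance tail while each slot carries the index it was enqueued with,
 * stopping at most one full lap (RING_SIZE slots) past head.
 */
static uintptr_t scan_tail(const uintptr_t *slot_idx, uintptr_t head,
                           uintptr_t tail)
{
    while ((intptr_t) (tail - (head + RING_SIZE)) < 0 &&
           slot_idx[tail & (RING_SIZE - 1)] == tail)
        tail++;
    return tail;
}
```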
---
## [Quiz `3`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FBy5JAmOX9): altering the internal state of a specific Linux process
:::info
```c
static void periodic_routine(struct work_struct *ws)
{
if (likely(loaded))
check();
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
```
:::
:::spoiler Full source code
```c
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include <linux/list.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/workqueue.h>
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees");
#define JIFFIES_DELAY 1
#define DONT_TRACE_WQ_NAME "dont_trace_worker"
static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
static struct workqueue_struct *wq;
static bool loaded;
/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
send_sig(SIGKILL, task, 1);
}
/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task =
list_entry(list, struct task_struct, ptrace_entry);
if (task)
return true;
}
return false;
}
/* Traverse the linked list of the ptraced processes and
* kill each of them.
*/
static void kill_tracee(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task_ptraced =
list_entry(list, struct task_struct, ptrace_entry);
pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
task_ptraced->ptrace);
kill_task(task_ptraced);
}
}
static void check(void)
{
struct task_struct *task;
for_each_process (task) {
if (!is_tracer(&task->ptraced))
continue;
kill_tracee(&task->ptraced);
kill_task(task); /* Kill the tracer once all tracees are killed */
}
}
static void periodic_routine(struct work_struct *ws)
{
if (likely(loaded))
check();
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
static int __init dont_trace_init(void)
{
wq = create_workqueue(DONT_TRACE_WQ_NAME);
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
loaded = true;
pr_info("Loaded!\n");
return 0;
}
static void __exit dont_trace_exit(void)
{
loaded = false;
/* No new routines will be queued */
cancel_delayed_work(&dont_trace_task);
/* Wait for the completion of all routines */
flush_workqueue(wq);
destroy_workqueue(wq);
pr_info("Unloaded.\n");
}
module_init(dont_trace_init);
module_exit(dont_trace_exit);
```
:::
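This quiz implements a kernel module, `dont_trace`, which uses the `ptraced` list in `task_struct` to detect whether a process is attached by a debugger or by any other program that uses the `ptrace` system call. Once it detects a process tracing another, `dont_trace` kills the tracer together with its tracees. The flow: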
> * Once any process starts “ptracing” another, the tracee gets added into ptraced that’s located in task_struct, which is simply a linked list that contains all the tracees that the process is “ptracing”.
> * Once a tracer is found, the module lists all its tracees and sends a SIGKILL signal to each of them including the tracer. This results in killing both the tracer and its tracees.
> * Once the module is attached to the kernel, the module’s “core” function will run periodically through the advantage of workqueues. Specifically, the module runs every JIFFIES_DELAY, which is set to 1. That is, the module will run every one jiffy. This can be changed through modifying the macro JIFFIES_DELAY defined in the module.
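Mapping the flow back to the code, each function has the following role:
* `kill_task` : sends `SIGKILL` to terminate the given task.
* `is_tracer` : returns true if the process is a tracer.
* `kill_tracee` : walks the tracer's `ptraced` list and kills every tracee on it.
* `check` : uses `for_each_process(struct task_struct *task)` to visit every process, checks whether it is a tracer, and if so ends up "killing both the tracer and its tracees".
* `periodic_routine` : while the `dont_trace` module is loaded, runs `check` and re-queues itself so that it runs again after one unit of time.
* `dont_trace_init`/`dont_trace_exit` : load / unload the module.
The rest of this section focuses on `periodic_routine`: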
### `periodic_routine` (answer)
```c
/* periodic_routine :
*
* If the dont_trace module is currently loaded (the `loaded` global),
* run check once, then queue the work again after one unit of time.
*
* Note what the flow says: Once the module is attached to the kernel,
* the module’s “core” function will run periodically through
* the advantage of workqueues. Specifically, the module
* runs every JIFFIES_DELAY
*
* int queue_delayed_work (struct workqueue_struct *wq,
* struct delayed_work *dwork,
* unsigned long delay);
*
* @wq : workqueue to use
* @dwork : delayable work to queue
* @delay : number of jiffies to wait before queueing
*/
static void periodic_routine(struct work_struct *ws)
{
if (likely(loaded))
check();
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
```
According to [queue_delayed_work](https://docs.huihoo.com/linux/kernel/2.6.26/kernel-api/re63.html):
> queue_delayed_work — queue work on a workqueue after delay
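This module relies on the [work queue](https://codertw.com/ios/61992/) mechanism to schedule its task:
> The workqueue mechanism in Linux exists to simplify the creation of kernel threads. Calling the workqueue interface is enough to create kernel threads, and the number of threads can follow the number of CPUs in the system so that the work they handle runs in parallel. Workqueues are a simple and effective kernel mechanism: they clearly simplify the creation of kernel daemons and make programming easier.
>
> A work queue (workqueue) is another way of deferring work. It hands the deferred work to a kernel thread, which means this bottom half can run in process context. Most importantly, work queues allow the work to be rescheduled and even to sleep.
Further reading: [Work Queues](https://www.oreilly.com/library/view/understanding-the-linux/0596005652/ch04s08.html)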
### `check`
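`for_each_process` (found in `<linux/sched/signal.h>` on recent kernels, `<linux/sched.h>` on older ones) visits every process in the system. If a process is a `tracer`, we walk its `ptraced` list, kill each `tracee`, and then kill the `tracer` itself.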
```c
static void check(void)
{
struct task_struct *task;
for_each_process (task) {
if (!is_tracer(&task->ptraced))
continue;
kill_tracee(&task->ptraced);
kill_task(task); /* Kill the tracer once all tracees are killed */
}
}
```
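The list walking behind `is_tracer` and `kill_tracee` can be modelled in user space. The sketch below uses minimal stand-ins for the kernel's list primitives; `struct task`, `list_init`, and `sum_tracee_pids` are hypothetical names for this note, and the real `task_struct` and `<linux/list.h>` are considerably richer.

```c
#include <stdbool.h>
#include <stddef.h>

/* Minimal user-space stand-ins for the kernel's list primitives. */
struct list_head {
    struct list_head *next, *prev;
};

#define list_entry(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

struct task {                      /* hypothetical, not task_struct */
    int pid;
    struct list_head ptraced;      /* head of the list of tracees */
    struct list_head ptrace_entry; /* link in the tracer's ptraced list */
};

static void list_init(struct list_head *h)
{
    h->next = h->prev = h;
}

static void list_add_tail(struct list_head *node, struct list_head *head)
{
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

/* A task is a tracer exactly when its ptraced list is not empty. */
static bool is_tracer(const struct list_head *children)
{
    return children->next != children;
}

/* Sum the pids of all tracees, standing in for "visit every tracee". */
static int sum_tracee_pids(struct list_head *children)
{
    int sum = 0;
    for (struct list_head *pos = children->next; pos != children;
         pos = pos->next)
        sum += list_entry(pos, struct task, ptrace_entry)->pid;
    return sum;
}
```

Since `list_entry` only subtracts a member offset, it never yields NULL for a node that is on the list, so the `if (task)` test in the module's `is_tracer` succeeds on the first node and the function effectively answers "is the `ptraced` list non-empty?".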