# 2022q1 Homework 5 (quiz8)
contributed by < [2020leon](https://github.com/2020leon) >
> archive: [2022-04-16 07:34:12 UTC+0](https://web.archive.org/web/20220416073412/https://hackmd.io/@6649/linux2022-quiz8)
## [測驗題目](https://hackmd.io/@sysprog/linux2022-quiz8)
### [測驗 `1`](https://hackmd.io/@sysprog/Hyg5nxO79)
完整程式碼如下。
```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if either X or Y is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshhold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
void *memchr_opt(const void *src_void, int c, size_t length)
{
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
/* The fast code reads the source one word at a time and only performs
* the bytewise search on word-sized segments if they contain the search
* character, which is detected by XORing the word-sized segment with a
* word-sized block of the search character and then detecting for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
/* Use macro DETECT_CHAR to check if there contains the byte that
* used to fill mask. If it returns nonzero, we find the byte we
* want. Thus, break the loop.
*/
if (DETECT_CHAR(*asrc, mask))
break;
/* Renew the pointer and length after checking. */
asrc++;
length -= LBLOCKSIZE;
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
`XXXXX` 處如下:
```c
if (DETECT_CHAR(*asrc, mask))
break;
asrc++;
length -= LBLOCKSIZE;
```
#### 解釋測驗 `1` 程式碼運作原理
#### 設計實驗觀察隨字串長度和特定 pattern 變化的效能影響
#### 在核心原始程式碼找出 x86_64 或 arm64 對應的最佳化實作
---
### [測驗 `2`](https://hackmd.io/@sysprog/rkQMKQu7c)
> [測驗 `2` gist](https://gist.github.com/jserv/f810c45ad4423f406f9e0dbe9dabadc9)
所回答之完整程式碼如下。
```c
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
/* XXXXX */ idx++;
}
return idx;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = /* XXXXX */ mask + 1;
while (before(tail, head + size) &&
__atomic_load_n(/* XXXXX */ &lfr->ring[tail & mask].idx) == tail)
tail++;
tail = cond_update(&lfr->tail, tail);
return tail;
}
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do { /* skipped */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
| 陳述式 | 答案 |
| ------ | ----------------------------- |
| `DDD` | `idx++` |
| `KKK` | `mask + 1` |
| `TTT` | `&lfr->ring[tail & mask].idx` |
| `HHH` | `head + actual` |
#### 解釋測驗 `2` 程式碼運作原理
本題以環狀結構實現容量有限的佇列( queue ),用以解決生產者消費者問題。環狀結構可以簡單以下圖表示。
![](https://doc.dpdk.org/guides/_images/ring1.svg)
> source: https://doc.dpdk.org/guides/prog_guide/ring_lib.html
上圖為一切斷的環,以 FIFO 的方式儲存資料。每當有新資料加入時,便會在上圖右側之處加入。若到達邊緣則再從左方持續加入,直到滿為止。而當有資料要從結構中移除時,則從上圖左側移除。
在本題中所對應的結構如下。
```c
struct lfring {
ringidx_t head;
ringidx_t tail ALIGNED(CACHE_LINE);
uint32_t mask;
uint32_t flags;
struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```
其中 `head` 和 `tail` 分別對應到上方左邊以及右邊的箭頭, `mask` 隱含環狀結構大小之意, `flags` 則指出生產者消費者模式(分別為 SPSC 、 SPMC 、 MPSC 及 MPMC),最後 `ring` 則是環狀結構本體。
下方分別理解 `lfring_alloc`, `lfring_dequeue` 和 `lfring_enqueue` 。
##### `lfring_alloc`
```c
lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
unsigned long ringsz = ROUNDUP_POW2(n_elems);
if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
assert(0 && "invalid number of elements");
return NULL;
}
if ((flags & ~SUPPORTED_FLAGS) != 0) {
assert(0 && "invalid flags");
return NULL;
}
size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
if (!lfr)
return NULL;
lfr->head = 0, lfr->tail = 0;
lfr->mask = ringsz - 1;
lfr->flags = flags;
for (ringidx_t i = 0; i < ringsz; i++) {
lfr->ring[i].ptr = NULL;
lfr->ring[i].idx = i - ringsz;
}
return lfr;
}
```
`lfring_alloc` 可視為結構的建構子。由實做可見 `mask` 的值為 `ringsz - 1` ,而 `ringsz` 又為 2 的冪,因此可驗證上述所言「 `mask` 隱含環狀結構大小之意」。在此將 `ringsz` 設為 2 的冪的考量為,索引值可藉由與 `mask` 進行位元且運算來取代與 `ringsz` 做取餘運算,精進程式效能。
> 以 bitwise-AND 取代 modulo 運算不只是「精進」效能,更是確保關鍵操作的執行時間,modulo 和 div 在現代處理器雖然有很大的改進,但沒辦法保證一定在固定的 CPU cycle 內完成
> :notes: jserv
##### `lfring_dequeue`
```c
/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
actual = MIN((intptr_t) (tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t) (tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX HHH */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
`lfring_dequeue` 首先檢查仍可利用的空間。若沒有任何可用空間,在單生產者模式下便會直接返回;若在多生產者模式下,**由於 `tail` 的值會被多個生產者更改,在前方所快取的 `tail` 變數不一定為最新的 `tail`** ,因此會利用 `find_tail()` 再次取得最新的 `tail` ,若仍無空間便返回。在此之後, `actual` 的值便會確定。 `actual` 為一紀錄實際可取元素數的變數。
接下來便來到內部的 `for` 迴圈,一一將位於環中 head 位置的元素取出並存於 `elems` 中。
最後,在單消費者模式下,由於只會有現在的緒更動環的 `head` ,因此可在更動環的 `head` 後直接進行收尾。**在多消費者模式下,因為會有多個消費者更動環的 `head` ,因此需在主邏輯結束後檢查所快取的 `head` 是否同環的 `head`** ,若是則收尾,否則**更新快取的 `head`** 後,再次從檢查仍可利用的空間開始。
##### `lfring_enqueue`
```c
/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
void *const *restrict elems,
uint32_t n_elems)
{
intptr_t actual = 0;
ringidx_t mask = lfr->mask;
ringidx_t size = mask + 1;
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);
if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
actual = MIN((intptr_t) (head + size - tail), (intptr_t) n_elems);
if (actual <= 0)
return 0;
for (uint32_t i = 0; i < (uint32_t) actual; i++) {
assert(lfr->ring[tail & mask].idx == tail - size);
lfr->ring[tail & mask].ptr = *elems++;
lfr->ring[tail & mask].idx = tail;
tail++;
}
__atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
return (uint32_t) actual;
}
/* else: lock-free multi-producer */
restart:
while ((uint32_t) actual < n_elems &&
before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
union {
struct element e;
ptrpair_t pp;
} old, neu;
void *elem = elems[actual];
struct element *slot = &lfr->ring[tail & mask];
old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
do {
if (UNLIKELY(old.e.idx != tail - size)) {
if (old.e.idx != tail) {
/* We are far behind. Restart with fresh index */
tail = cond_reload(tail, &lfr->tail);
goto restart;
}
/* slot already enqueued */
tail++; /* Try next slot */
goto restart;
}
/* Found slot that was used one lap back.
* Try to enqueue next element.
*/
neu.e.ptr = elem;
neu.e.idx = tail; /* Set idx on enqueue */
} while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));
/* Enqueue succeeded */
actual++;
tail++; /* Continue with next slot */
}
(void) cond_update(&lfr->tail, tail);
return (uint32_t) actual;
}
```
`lfring_enqueue` 針對單生產者和多生產者有著不同的策略。若為單生產者,則確認是否有空間插入資料,若無則返回,若有則依據目前最大容量依序將欲插入之資料放入環中,更新環的 `tail` 後返回。
若為多生產者模式,亦先確認是否有空間插入資料,若無空間則返回。若有空間,**由於多個生產者會更動環的 `tail` ,因此需檢查快取的 `tail` 所對應的 slot 是否為空**。若所對應的 slot 不為空的話,則依據不為空的原因來更新快取的 `tail` :如是因為其他的生產者插入**少許**新資料導致,則更新快取的 `tail` 以查看**下一個** slot 的狀態;如是因為其他的生產者插入**多個**新資料導致,則更新快取的 `tail` 以查看**最新的 `tail` 所對應之** slot 的狀態。在此,「多個」係指其他生產者所插入的資料數大於等於環的空間。
在確保現在快取的 `tail` 對應之 slot 為空後,最後再經由 `lf_compare_exchange()` 確保沒有任何其他生產者對於該 slot 插入資料才實際更新將資料插入環中。
---
### [測驗 `3`](https://hackmd.io/@sysprog/By5JAmOX9)
```c
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include <linux/list.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/workqueue.h>
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees");
#define JIFFIES_DELAY 1
#define DONT_TRACE_WQ_NAME "dont_trace_worker"
static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
static struct workqueue_struct *wq;
static bool loaded;
/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
send_sig(SIGKILL, task, 1);
}
/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task =
list_entry(list, struct task_struct, ptrace_entry);
if (task)
return true;
}
return false;
}
/* Traverse the element in the linked list of the ptraced proccesses and
* finally kills them.
*/
static void kill_tracee(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task_ptraced =
list_entry(list, struct task_struct, ptrace_entry);
pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
task_ptraced->ptrace);
kill_task(task_ptraced);
}
}
static void check(void)
{
struct task_struct *task;
for_each_process (task) {
if (!is_tracer(&task->ptraced))
continue;
kill_tracee(&task->ptraced);
kill_task(task); /* Kill the tracer once all tracees are killed */
}
}
static void periodic_routine(struct work_struct *ws)
{
/* Check if the module is loaded. */
if (likely(/* XXXXX: Implement */loaded))
check();
/* XXXXX: Implement */
/* Put work task in global workqueue after JIFFIES_DELAY */
schedule_delayed_work(&dont_trace_task, JIFFIES_DELAY);
}
static int __init dont_trace_init(void)
{
wq = create_workqueue(DONT_TRACE_WQ_NAME);
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
loaded = true;
pr_info("Loaded!\n");
return 0;
}
static void __exit dont_trace_exit(void)
{
loaded = false;
/* No new routines will be queued */
cancel_delayed_work(&dont_trace_task);
/* Wait for the completion of all routines */
flush_workqueue(wq);
destroy_workqueue(wq);
pr_info("Unloaded.\n");
}
module_init(dont_trace_init);
module_exit(dont_trace_exit);
```
<!-- 延伸問題:
解釋上述程式碼運作原理,應探討 Linux 核心內部 ptrace 系統呼叫和 signal 的實作方式
研讀 The race to limit ptrace 一類的材料,探討行程避免被追蹤的手法 -->
### 程式碼運作原理
當 module 被載入 kernel 後,便會執行 `dont_trace_init` 函式。其功能為將已經初始化的 `dont_trace_task` 加入全域的 workqueue 中,並在 `JIFFIES_DELAY` 後執行所指定的函式,在此即 `periodic_routine` 。
`periodic_routine` 的功能為利用 `check()` 刪除所有 tracer 和 tracee ,並再次將 `dont_trace_task` 加入全域的 workqueue 中,待 `JIFFIES_DELAY` 後再執行自己。
`check` 和其所呼叫的函式即為本模組的關鍵所在。其中 `is_tracer` 函式用於檢查 process 是否有追蹤其他的 process 。其技巧為檢查 `task->ptraced` 中是否有元素被繼承為 `struct task_struct` 。在確定該 process 為 tracer 後,則藉 `kill_tracee(&task->ptraced)` 和 `kill_task(task)` 立即中止其所有 tracee 和該 process 。
`kill_task` 發送中止訊號 `SIGKILL` 至所指定的 `task` 中。其呼叫的函式 `send_sig` 之第三個引數是指「[不要忽略由 kernel 所發出之訊號](https://github.com/torvalds/linux/blob/7dd5ad2d3e82fb55229e3fe18e09160878e77e20/kernel/signal.c#L1225)」。
### 探討行程避免被追蹤的手法
在[該文章](https://www.rezilion.com/blog/the-race-to-limit-ptrace/)中所提及的其中一個方法為設置 ptrace_scope ,將 ptrace_scope 設為最高級別的 3 可避免任何人的追蹤。