# 2022-04-04 `Uduru0522`
## 測驗一:`memchr()` 效能改進
### 作答
:::spoiler 完整題目程式碼
```c=
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if either X or Y is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))
/* Threshhold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
void *memchr_opt(const void *src_void, int c, size_t length)
{
const unsigned char *src = (const unsigned char *) src_void;
unsigned char d = c;
while (UNALIGNED(src)) {
if (!length--)
return NULL;
if (*src == d)
return (void *) src;
src++;
}
if (!TOO_SMALL(length)) {
/* If we get this far, we know that length is large and
* src is word-aligned.
*/
/* The fast code reads the source one word at a time and only performs
* the bytewise search on word-sized segments if they contain the search
* character, which is detected by XORing the word-sized segment with a
* word-sized block of the search character and then detecting for the
* presence of NULL in the result.
*/
unsigned long *asrc = (unsigned long *) src;
unsigned long mask = d << 8 | d;
mask = mask << 16 | mask;
for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
mask = (mask << i) | mask;
while (length >= LBLOCKSIZE) {
/* XXXXX: Your implementation should appear here */
}
/* If there are fewer than LBLOCKSIZE characters left, then we resort to
* the bytewise loop.
*/
src = (unsigned char *) asrc;
}
while (length--) {
if (*src == d)
return (void *) src;
src++;
}
return NULL;
}
```
:::
```c=59
while (length >= LBLOCKSIZE) {
/* XXXXX */
unsigned long r = DETECT_CHAR(*asrc, mask);
// Enter if has match within word
if(r){
src = (unsigned char *)asrc;
return (void *)(src + (__builtin_ctzl(r) >> 3));
}
asrc = (unsigned long *)((unsigned char *)asrc + LBLOCKSIZE);
length -= LBLOCKSIZE;
}
```
### 分析
:::info
**`memchr(const void *s, int c, size_t n)` 行為:**
+ `const void *s`:搜尋元
+ 被視為 `unsigned char` 處理
+ `int c`:搜尋對象
+ 被視為 `unsigned char` 處理
+ `size_t n`:`s` 的搜尋長度
+ 回傳值:第一個 `c` 的出現位置指標,如果不存在則回傳 `NULL`
:::
```c=59
while (length >= LBLOCKSIZE) {
unsigned long r = DETECT_CHAR(*asrc, mask);
// Enter if has match within word
if(r){
src = (unsigned char *)asrc;
return (void *)(src + (__builtin_ctzl(r) >> 3));
}
asrc = (unsigned long *)((unsigned char *)asrc + LBLOCKSIZE);
length -= LBLOCKSIZE;
}
```
當到達此處時,假設 `d = 0xαβ` 且 `LBLOCKSIZE = 8`,則 `mask = 0xαβαβαβαβαβαβαβαβ`。
我們要做出以下行為:
+ 自 `asrc` 開始,一次檢查一個 long 長度的資料並回傳 `0xαβ` 的出現位置 。
+ 若出現超過一個,則回傳先出現的位置。
首先,使用 `DETECT_CHAR(X, MASK)` 考以幫助我們找到對應的 byte,若符合我們要找的字元,該 byte 會變成 `0x80`;否則為 `0x00`。一個回傳範例可以是 `0x8000008000808000`。
由於結果的每個 byte 僅會有 1 個 set-bit,且會在固定位置,與其用 for 迴圈檢測,我們可以考慮 count trailing zero (`__builtin_ctzl()`) 或 find first set (`__builtin_ffsl()`) 尋找該 set-bit。
:::warning
由於我們直接將 `unsigned char *` 強制型別轉換成 `unsigned long *`,而非使用 `strtol()`,導致根據系統的 endianness 寫法會有差異。由於我的系統為 little-endian,暫時只寫出對應 little-endian 的解答。稍晚補上通用解。
:::
假設我們得到的 `r = 0xΑαΒβΓγΔδΕεΖζΗηΘθ`,在記憶體中位址由低而高排列為:
| 位址 | N | N + 1 | N + 2 | N + 3 | N + 4 | N + 5 | N + 6 | N + 7 |
| :------: | :------: | :------: | :------: | :------: | :------: | :------: | :------: | :------: |
| **內容** | Θθ | Ηη | Ζζ | Εε | Δδ | Γγ | Ββ | Αα |
得知我們必須取得**最高位址**的 set-**byte**。假設 `Ηη` 為 `0x80`,其餘皆為 `0x00`,
+ `__builtin_ctzl(r)` 結果為 **55**, `55 >> 3 = 7`。
+ `__builtin_ffsl(r)` 結果為 **56**, `56 >> 3 = 8`。
而根據 `memchr()` 的行為,若我們正在檢查 `unsigned char *src`,我們必須取得的是 `src + 7` 的位址,因此我們為了得到 byte 數將其除以 8 (`>> 3`)。假設我們選用 find first set, 其內部行為常常是進行 count trailing zero 後再加一,我們卻需要額外將它減掉,完全是浪費時間的步驟,因此我們選用 `__builtin_ctzl()`。
## 測驗二:multiple-producer/multiple-consumer (MPMC) 的實作
### 作答
完整原始程式碼 $\rightarrow$ [gist](https://gist.github.com/jserv/f810c45ad4423f406f9e0dbe9dabadc9)
```c
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
if (before(idx, fresh)) { /* fresh is after idx, use this instead */
idx = fresh;
} else { /* Continue with next slot */
/* XXXXX */ idx += 1;
}
return idx;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
/* Multi-producer enqueue.
* Scan ring for new elements that have been written but not released.
*/
ringidx_t mask = lfr->mask;
ringidx_t size = /* XXXXX */ mask + 1;
while (before(tail, head + size) &&
__atomic_load_n(/* XXXXX */ &lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) == tail)
tail++;
tail = cond_update(&lfr->tail, tail);
return tail;
}
/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
void **restrict elems,
uint32_t n_elems,
uint32_t *index)
{
ringidx_t mask = lfr->mask;
intptr_t actual;
ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
do {
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (UNLIKELY(actual <= 0)) {
/* Ring buffer is empty, scan for new but unreleased elements */
tail = find_tail(lfr, head, tail);
actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
if (actual <= 0)
return 0;
}
for (uint32_t i = 0; i < (uint32_t) actual; i++)
elems[i] = lfr->ring[(head + i) & mask].ptr;
smp_fence(LoadStore); // Order loads only
if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
__atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
break;
}
/* else: lock-free multi-consumer */
} while (!__atomic_compare_exchange_n(
&lfr->head, &head, /* Updated on failure */
/* XXXXX */ head + actual,
/* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
*index = (uint32_t) head;
return (uint32_t) actual;
}
```
#### HHH
`__atomic_compare_exchange_n()` 的運作方式如下:
+ 若 `*ptr` (第一個參數) 與 `*expected` (第二個參數) 所指向的值相等
+ 將 `*desiered` (第三個參數) 內的值寫入 `*ptr`
+ 回傳 `true`
+ 若 `*ptr` 與 `*expected` 所指向的值相異
+ 將 `*ptr` 的值寫入 `*expected`
+ 回傳 `false`
`lfring_dqueue()` 內部會在呼叫時與真正給予資料前進行 `tail - head` 數值的檢查,若 `tail` 不大於 `head` 則代表 buffer 內部是空的,則尋找有沒有 producer 有沒有未釋出的物件進行接收並傳給 consumer。此時,會造成 `lft.head` 及 `head` 不一致,因此需要修正成 `head + actual`。
## 測驗三:變更特定 Linux 行程的內部狀態
### 作答
:::spoiler 完整程式碼
```c=
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include <linux/list.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/workqueue.h>
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees");
#define JIFFIES_DELAY 1
#define DONT_TRACE_WQ_NAME "dont_trace_worker"
static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
static struct workqueue_struct *wq;
static bool loaded;
/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
send_sig(SIGKILL, task, 1);
}
/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task =
list_entry(list, struct task_struct, ptrace_entry);
if (task)
return true;
}
return false;
}
/* Traverse the element in the linked list of the ptraced proccesses and
* finally kills them.
*/
static void kill_tracee(struct list_head *children)
{
struct list_head *list;
list_for_each (list, children) {
struct task_struct *task_ptraced =
list_entry(list, struct task_struct, ptrace_entry);
pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
task_ptraced->ptrace);
kill_task(task_ptraced);
}
}
static void check(void)
{
struct task_struct *task;
for_each_process (task) {
if (!is_tracer(&task->ptraced))
continue;
kill_tracee(&task->ptraced);
kill_task(task); /* Kill the tracer once all tracees are killed */
}
}
static void periodic_routine(struct work_struct *ws)
{
if (likely(loaded /* XXXXX */))
check();
/* XXXXX */
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
static int __init dont_trace_init(void)
{
wq = create_workqueue(DONT_TRACE_WQ_NAME);
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
loaded = true;
pr_info("Loaded!\n");
return 0;
}
static void __exit dont_trace_exit(void)
{
loaded = false;
/* No new routines will be queued */
cancel_delayed_work(&dont_trace_task);
/* Wait for the completion of all routines */
flush_workqueue(wq);
destroy_workqueue(wq);
pr_info("Unloaded.\n");
}
module_init(dont_trace_init);
module_exit(dont_trace_exit);
```
:::
```c
static void periodic_routine(struct work_struct *ws)
{
if (likely(loaded))
check();
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
```
### 分析
#### Workqueue 的使用
要使用 workqueue 有以下的流程:
1. 宣告、製造並給予 workqueue 名字:
```c=18
static struct workqueue_struct *wq;
```
```c=79
wq = create_workqueue(DONT_TRACE_WQ_NAME);
```
2. 將工作設定為 delayed work:
```c=16
static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
```
3. 將 delayed work 加入至 workqueue 並設定延遲時間
```c=80
queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
```
### periodic_routine
首先觀察程式碼及題目說明,可以發現我們使用 workqueue 的原因是要在**某固定時間 (`JIFFIES_DELAY`) 週期重複利用 `check()` 進行檢查**。當我們在 delayed work 的最後將自己加入 workqueue 時,便可以在執行完畢後經過一定的延遲時間再次執行,因此填入第 74 行。
接著,觀察 `check()` 函數是否在執行完畢後有需要整理或修正的全域變數,**發現沒有**,因此不在後方增加更多程式碼。
接著探討 `if(likely(/* XXXXX */))` 內部。likely 這個 macro 告訴編譯器**這個判別式很容易為 true**,且變數 `loaded` 在掛載後才會為 `true`,在解除掛載時才會是 `false`,符合 `periodic_routine()` 的運作時間,因此填入 `loaded`。