# 2022-04-04 `Uduru0522` ## 測驗一:`memchr()` 效能改進 ### 作答 :::spoiler 完整題目程式碼 ```c= #include <stddef.h> #include <stdint.h> #include <limits.h> #include <string.h> /* Nonzero if either X or Y is not aligned on a "long" boundary */ #define UNALIGNED(X) ((long) X & (sizeof(long) - 1)) /* How many bytes are loaded each iteration of the word copy loop */ #define LBLOCKSIZE (sizeof(long)) /* Threshhold for punting to the bytewise iterator */ #define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE) #if LONG_MAX == 2147483647L #define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080) #else #if LONG_MAX == 9223372036854775807L /* Nonzero if X (a long int) contains a NULL byte. */ #define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080) #else #error long int is not a 32bit or 64bit type. #endif #endif /* @return nonzero if (long)X contains the byte used to fill MASK. */ #define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK)) void *memchr_opt(const void *src_void, int c, size_t length) { const unsigned char *src = (const unsigned char *) src_void; unsigned char d = c; while (UNALIGNED(src)) { if (!length--) return NULL; if (*src == d) return (void *) src; src++; } if (!TOO_SMALL(length)) { /* If we get this far, we know that length is large and * src is word-aligned. */ /* The fast code reads the source one word at a time and only performs * the bytewise search on word-sized segments if they contain the search * character, which is detected by XORing the word-sized segment with a * word-sized block of the search character and then detecting for the * presence of NULL in the result. */ unsigned long *asrc = (unsigned long *) src; unsigned long mask = d << 8 | d; mask = mask << 16 | mask; for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1) mask = (mask << i) | mask; while (length >= LBLOCKSIZE) { /* XXXXX: Your implementation should appear here */ } /* If there are fewer than LBLOCKSIZE characters left, then we resort to * the bytewise loop. */ src = (unsigned char *) asrc; } while (length--) { if (*src == d) return (void *) src; src++; } return NULL; } ``` ::: ```c=59 while (length >= LBLOCKSIZE) { /* XXXXX */ unsigned long r = DETECT_CHAR(*asrc, mask); // Enter if has match within word if(r){ src = (unsigned char *)asrc; return (void *)(src + (__builtin_ctzl(r) >> 3)); } asrc = (unsigned long *)((unsigned char *)asrc + LBLOCKSIZE); length -= LBLOCKSIZE; } ``` ### 分析 :::info **`memchr(const void *s, int c, size_t n)` 行為:** + `const void *s`:搜尋元 + 被視為 `unsigned char` 處理 + `int c`:搜尋對象 + 被視為 `unsigned char` 處理 + `size_t n`:`s` 的搜尋長度 + 回傳值:第一個 `c` 的出現位置指標,如果不存在則回傳 `NULL` ::: ```c=59 while (length >= LBLOCKSIZE) { unsigned long r = DETECT_CHAR(*asrc, mask); // Enter if has match within word if(r){ src = (unsigned char *)asrc; return (void *)(src + (__builtin_ctzl(r) >> 3)); } asrc = (unsigned long *)((unsigned char *)asrc + LBLOCKSIZE); length -= LBLOCKSIZE; } ``` 當到達此處時,假設 `d = 0xαβ` 且 `LBLOCKSIZE = 8`,則 `mask = 0xαβαβαβαβαβαβαβαβ`。 我們要做出以下行為: + 自 `asrc` 開始,一次檢查一個 long 長度的資料並回傳 `0xαβ` 的出現位置 。 + 若出現超過一個,則回傳先出現的位置。 首先,使用 `DETECT_CHAR(X, MASK)` 考以幫助我們找到對應的 byte,若符合我們要找的字元,該 byte 會變成 `0x80`;否則為 `0x00`。一個回傳範例可以是 `0x8000008000808000`。 由於結果的每個 byte 僅會有 1 個 set-bit,且會在固定位置,與其用 for 迴圈檢測,我們可以考慮 count trailing zero (`__builtin_ctzl()`) 或 find first set (`__builtin_ffsl()`) 尋找該 set-bit。 :::warning 由於我們直接將 `unsigned char *` 強制型別轉換成 `unsigned long *`,而非使用 `strtol()`,導致根據系統的 endianness 寫法會有差異。由於我的系統為 little-endian,暫時只寫出對應 little-endian 的解答。稍晚補上通用解。 ::: 假設我們得到的 `r = 0xΑαΒβΓγΔδΕεΖζΗηΘθ`,在記憶體中位址由低而高排列為: | 位址 | N | N + 1 | N + 2 | N + 3 | N + 4 | N + 5 | N + 6 | N + 7 | | :------: | :------: | :------: | :------: | :------: | :------: | :------: | :------: | :------: | | **內容** | Θθ | Ηη | Ζζ | Εε | Δδ | Γγ | Ββ | Αα | 得知我們必須取得**最高位址**的 set-**byte**。假設 `Ηη` 為 `0x80`,其餘皆為 `0x00`, + `__builtin_ctzl(r)` 結果為 **55**, `55 >> 3 = 7`。 + `__builtin_ffsl(r)` 結果為 **56**, `56 >> 3 = 8`。 而根據 `memchr()` 的行為,若我們正在檢查 `unsigned char *src`,我們必須取得的是 `src + 7` 的位址,因此我們為了得到 byte 數將其除以 8 (`>> 3`)。假設我們選用 find first set, 其內部行為常常是進行 count trailing zero 後再加一,我們卻需要額外將它減掉,完全是浪費時間的步驟,因此我們選用 `__builtin_ctzl()`。 ## 測驗二:multiple-producer/multiple-consumer (MPMC) 的實作 ### 作答 完整原始程式碼 $\rightarrow$ [gist](https://gist.github.com/jserv/f810c45ad4423f406f9e0dbe9dabadc9) ```c static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc) { ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED); if (before(idx, fresh)) { /* fresh is after idx, use this instead */ idx = fresh; } else { /* Continue with next slot */ /* XXXXX */ idx += 1; } return idx; } static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail) { if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */ return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); /* Multi-producer enqueue. * Scan ring for new elements that have been written but not released. */ ringidx_t mask = lfr->mask; ringidx_t size = /* XXXXX */ mask + 1; while (before(tail, head + size) && __atomic_load_n(/* XXXXX */ &lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) == tail) tail++; tail = cond_update(&lfr->tail, tail); return tail; } /* Dequeue elements from head */ uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (UNLIKELY(actual <= 0)) { /* Ring buffer is empty, scan for new but unreleased elements */ tail = find_tail(lfr, head, tail); actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems); if (actual <= 0) return 0; } for (uint32_t i = 0; i < (uint32_t) actual; i++) elems[i] = lfr->ring[(head + i) & mask].ptr; smp_fence(LoadStore); // Order loads only if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */ __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED); break; } /* else: lock-free multi-consumer */ } while (!__atomic_compare_exchange_n( &lfr->head, &head, /* Updated on failure */ /* XXXXX */ head + actual, /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); *index = (uint32_t) head; return (uint32_t) actual; } ``` #### HHH `__atomic_compare_exchange_n()` 的運作方式如下: + 若 `*ptr` (第一個參數) 與 `*expected` (第二個參數) 所指向的值相等 + 將 `*desiered` (第三個參數) 內的值寫入 `*ptr` + 回傳 `true` + 若 `*ptr` 與 `*expected` 所指向的值相異 + 將 `*ptr` 的值寫入 `*expected` + 回傳 `false` `lfring_dqueue()` 內部會在呼叫時與真正給予資料前進行 `tail - head` 數值的檢查,若 `tail` 不大於 `head` 則代表 buffer 內部是空的,則尋找有沒有 producer 有沒有未釋出的物件進行接收並傳給 consumer。此時,會造成 `lft.head` 及 `head` 不一致,因此需要修正成 `head + actual`。 ## 測驗三:變更特定 Linux 行程的內部狀態 ### 作答 :::spoiler 完整程式碼 ```c= #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt #include <linux/list.h> #include <linux/module.h> #include <linux/sched.h> #include <linux/sched/signal.h> #include <linux/workqueue.h> MODULE_AUTHOR("National Cheng Kung University, Taiwan"); MODULE_LICENSE("Dual BSD/GPL"); MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees"); #define JIFFIES_DELAY 1 #define DONT_TRACE_WQ_NAME "dont_trace_worker" static void periodic_routine(struct work_struct *); static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine); static struct workqueue_struct *wq; static bool loaded; /* Send SIGKILL from kernel space */ static void kill_task(struct task_struct *task) { send_sig(SIGKILL, task, 1); } /* @return true if the process has tracees */ static bool is_tracer(struct list_head *children) { struct list_head *list; list_for_each (list, children) { struct task_struct *task = list_entry(list, struct task_struct, ptrace_entry); if (task) return true; } return false; } /* Traverse the element in the linked list of the ptraced proccesses and * finally kills them. */ static void kill_tracee(struct list_head *children) { struct list_head *list; list_for_each (list, children) { struct task_struct *task_ptraced = list_entry(list, struct task_struct, ptrace_entry); pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n", task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid, task_ptraced->ptrace); kill_task(task_ptraced); } } static void check(void) { struct task_struct *task; for_each_process (task) { if (!is_tracer(&task->ptraced)) continue; kill_tracee(&task->ptraced); kill_task(task); /* Kill the tracer once all tracees are killed */ } } static void periodic_routine(struct work_struct *ws) { if (likely(loaded /* XXXXX */)) check(); /* XXXXX */ queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); } static int __init dont_trace_init(void) { wq = create_workqueue(DONT_TRACE_WQ_NAME); queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); loaded = true; pr_info("Loaded!\n"); return 0; } static void __exit dont_trace_exit(void) { loaded = false; /* No new routines will be queued */ cancel_delayed_work(&dont_trace_task); /* Wait for the completion of all routines */ flush_workqueue(wq); destroy_workqueue(wq); pr_info("Unloaded.\n"); } module_init(dont_trace_init); module_exit(dont_trace_exit); ``` ::: ```c static void periodic_routine(struct work_struct *ws) { if (likely(loaded)) check(); queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); } ``` ### 分析 #### Workqueue 的使用 要使用 workqueue 有以下的流程: 1. 宣告、製造並給予 workqueue 名字: ```c=18 static struct workqueue_struct *wq; ``` ```c=79 wq = create_workqueue(DONT_TRACE_WQ_NAME); ``` 2. 將工作設定為 delayed work: ```c=16 static void periodic_routine(struct work_struct *); static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine); ``` 3. 將 delayed work 加入至 workqueue 並設定延遲時間 ```c=80 queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); ``` ### periodic_routine 首先觀察程式碼及題目說明,可以發現我們使用 workqueue 的原因是要在**某固定時間 (`JIFFIES_DELAY`) 週期重複利用 `check()` 進行檢查**。當我們在 delayed work 的最後將自己加入 workqueue 時,便可以在執行完畢後經過一定的延遲時間再次執行,因此填入第 74 行。 接著,觀察 `check()` 函數是否在執行完畢後有需要整理或修正的全域變數,**發現沒有**,因此不在後方增加更多程式碼。 接著探討 `if(likely(/* XXXXX */))` 內部。likely 這個 macro 告訴編譯器**這個判別式很容易為 true**,且變數 `loaded` 在掛載後才會為 `true`,在解除掛載時才會是 `false`,符合 `periodic_routine()` 的運作時間,因此填入 `loaded`。