--- tags: linux kernel --- # 2022q1 Quiz8 contributed by < `zoanana990` > > [測驗題目 8](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyo0W7OQ5) ## 測驗 `1` - SWAR (SIMD within a register) - SIMD (Single Instruction Multiple Data),中文為單指令多資料,而 SWAR 可以衍伸為在一個暫存器中進行單指令多資料的操作 - [SIMD and SWAR Techniques](https://www.chessprogramming.org/SIMD_and_SWAR_Techniques) - memchr - `DETECT_NULL` - 在[你所不知道的 C 語言:數值系統篇](/dWpRR9RGRc6v_MNrsrnw5w)有提到 `strlen` 如何藉由 `0x80` 與 `0x01` 找到 `NULL` 位址,這裡用到相同的技巧 - 程式碼: - 如同 `strlen` 一般,`memchr_opt` 若收到資料尚未對齊的字串,其檢查方式為每個位元組 (byte),直到記憶體對齊為止,檢查方式變為每個 word ```c= while (UNALIGNED(src)) { if (!length--) return NULL; if (*src == d) return (void *) src; src++; } ``` - 解答: ```c= #include <stddef.h> #include <stdint.h> #include <limits.h> #include <string.h> /* Nonzero if either X or Y is not aligned on a "long" boundary */ #define UNALIGNED(X) ((long) X & (sizeof(long) - 1)) /* How many bytes are loaded each iteration of the word copy loop */ #define LBLOCKSIZE (sizeof(long)) /* LBLOCKSIZE = 8 */ /* Threshhold for punting to the bytewise iterator */ #define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE) #if LONG_MAX == 2147483647L #define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080) #else #if LONG_MAX == 9223372036854775807L /* Nonzero if X (a long int) contains a NULL byte. */ #define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080) #else #error long int is not a 32bit or 64bit type. #endif #endif /* @return nonzero if (long)X contains the byte used to fill MASK. */ #define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK)) void *memchr_opt(const void *src_void, int c, size_t length) { const unsigned char *src = (const unsigned char *) src_void; unsigned char d = c; /* Check each byte, until word alignment */ while (UNALIGNED(src)) { if (!length--) return NULL; if (*src == d) return (void *) src; src++; } if (!TOO_SMALL(length)) { /* If we get this far, we know that length is large and * src is word-aligned. 
*/ /* The fast code reads the source one word at a time and only performs * the bytewise search on word-sized segments if they contain the search * character, which is detected by XORing the word-sized segment with a * word-sized block of the search character and then detecting for the * presence of NULL in the result. */ unsigned long *asrc = (unsigned long *) src; unsigned long mask = d << 8 | d; mask = mask << 16 | mask; for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1) mask = (mask << i) | mask; while (length >= LBLOCKSIZE) { /* Break out to the bytewise loop once this word contains the * search character; note: no `;` after the `if`, and the word * pointer advances by ONE word (8 bytes), not 8 words */ if (DETECT_CHAR(*asrc, mask)) break; length -= LBLOCKSIZE; asrc++; } /* If there are fewer than LBLOCKSIZE characters left, then we resort to * the bytewise loop. */ src = (unsigned char *) asrc; } while (length--) { if (*src == d) return (void *) src; src++; } return NULL; } ``` ## 測驗 `2` ### [Lock Free Ring Buffer](https://lenshood.github.io/2021/04/19/lock-free-ring-buffer/) 由於對這個資料結構不熟悉,這邊查到這篇文章覺得很受用,因此在這邊紀錄下來 #### 介紹 Ring Buffer 的主要場景就是在 Producer - Consumer 且我們希望 Ring Buffer 是 thread - safe 的, ### 巨集查詢 1. `likely` 與 `unlikely`: ```c #define LIKELY(x) __builtin_expect(!!(x), 1) #define UNLIKELY(x) __builtin_expect(!!(x), 0) ``` - `__builtin_expect` 根據 [GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html) 的文件,此巨集的用意並不影響程式的正確性,但是影響程式的性能,其主要目的是提供處理器在分支預測 (Branch Prediction) 時能夠大概率的預測成功以避免預測失敗的延遲 (Stall) - 引述原文: > ==You may use __builtin_expect to provide the compiler with branch prediction information.== In general, you should prefer to use actual profile feedback for this (-fprofile-arcs), as programmers are notoriously bad at predicting how their programs actually perform. However, there are applications in which this data is hard to collect. - 由此可知 `likely` 和 `unlikely` 的巨集目的: - `likely`:很可能發生,編譯器會往 `if` 裡面猜 - `unlikely`:不太可能發生 2. 
`__builtin_clzl`: 根據 [GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html)文件中得知,它與 `__builtin_clz` 相似,僅是輸入不同而已,其返回左起第一個 1 之前 0 的個數 3. 六種原子模式,這裡只用到 `__ATOMIC_RELAXED` 和 `__ATOMIC_ACQUIRE` 這兩個 ``` __ATOMIC_RELAXED Implies no inter-thread ordering constraints. /* 沒有任何線程之間的約束 */ __ATOMIC_CONSUME This is currently implemented using the stronger __ATOMIC_ACQUIRE memory order because of a deficiency in C++11’s semantics for memory_order_consume. __ATOMIC_ACQUIRE Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation. /* 創造線程之間的約束 */ __ATOMIC_RELEASE Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation. __ATOMIC_ACQ_REL Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE. __ATOMIC_SEQ_CST Enforces total ordering with all other __ATOMIC_SEQ_CST operations. ``` 解答: 這個一直失敗,不過附上最後嘗試的程式碼 ```c static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc) { ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED); if (before(idx, fresh)) { /* fresh is after idx, use this instead */ idx = fresh; } else { /* Continue with next slot */ /* XXXXX */ idx = idx + 1; /* DDD */; } return idx; } static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail) { if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */ return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); /* Multi-producer enqueue. * Scan ring for new elements that have been written but not released. 
*/ ringidx_t mask = lfr->mask; ringidx_t size = /* XXXXX */ mask + 1 /* KKK */; while (before(tail, head + size) && __atomic_load_n(/* XXXXX */ &lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE/* TTT */) == tail) tail++; tail = cond_update(&lfr->tail, tail); return tail; } uint32_t lfring_dequeue(lfring_t *lfr, void **restrict elems, uint32_t n_elems, uint32_t *index) { ringidx_t mask = lfr->mask; intptr_t actual; ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED); ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE); do { /* skipped */ } while (!__atomic_compare_exchange_n( &lfr->head, &head, /* Updated on failure */ /* XXXXX */ head + actual /* HHH */, /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); *index = (uint32_t) head; return (uint32_t) actual; } ``` ## 測驗 `3` 首先了解這兩個名詞的定義: - ptrace - ptrace 是一種讓行程得以監視和控制其它行程的機制,它能夠改變子行程中的暫存器,因而實現對系統呼叫 (System Call) 的追蹤。 - [workqueue](https://www.kernel.org/doc/html/v4.14/core-api/workqueue.html) - 解答: ```c #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt #include <linux/list.h> #include <linux/module.h> #include <linux/sched.h> #include <linux/sched/signal.h> #include <linux/workqueue.h> MODULE_AUTHOR("National Cheng Kung University, Taiwan"); MODULE_LICENSE("Dual BSD/GPL"); MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees"); #define JIFFIES_DELAY 1 #define DONT_TRACE_WQ_NAME "dont_trace_worker" static void periodic_routine(struct work_struct *); static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine); static struct workqueue_struct *wq; static bool loaded; /* Send SIGKILL from kernel space */ static void kill_task(struct task_struct *task) { send_sig(SIGKILL, task, 1); } /* @return true if the process has tracees */ static bool is_tracer(struct list_head *children) { struct list_head *list; list_for_each (list, children) { struct task_struct *task = list_entry(list, struct task_struct, ptrace_entry); if (task) return true; } return false; } /* Traverse the element in the linked list of the
ptraced processes and * finally kills them. */ static void kill_tracee(struct list_head *children) { struct list_head *list; list_for_each (list, children) { struct task_struct *task_ptraced = list_entry(list, struct task_struct, ptrace_entry); pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n", task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid, task_ptraced->ptrace); kill_task(task_ptraced); } } /* kill both tracer and the corresponding tracee */ static void check(void) { struct task_struct *task; for_each_process (task) { if (!is_tracer(&task->ptraced)) continue; kill_tracee(&task->ptraced); kill_task(task); /* Kill the tracer once all tracees are killed */ } } /* TODO */ static void periodic_routine(struct work_struct *ws) { if (likely(loaded)) check(); queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); } // MODULE INITIALIZATION static int __init dont_trace_init(void) { wq = create_workqueue(DONT_TRACE_WQ_NAME); /* */ queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY); loaded = true; pr_info("Loaded!\n"); return 0; } // MODULE EXIT static void __exit dont_trace_exit(void) { loaded = false; /* No new routines will be queued */ cancel_delayed_work(&dont_trace_task); /* Wait for the completion of all routines */ flush_workqueue(wq); destroy_workqueue(wq); pr_info("Unloaded.\n"); } module_init(dont_trace_init); module_exit(dont_trace_exit); ```