---
tags: linux kernel
---
# 2022q1 Quiz8
contributed by < `zoanana990` >
> [Quiz 8](https://hackmd.io/@sysprog/linux2022-quiz8)
## Quiz `1`
- SWAR (SIMD within a register)
- SIMD (Single Instruction Multiple Data) applies one instruction to multiple data elements at once; SWAR extends the idea by performing such operations inside an ordinary general-purpose register
- [SIMD and SWAR Techniques](https://www.chessprogramming.org/SIMD_and_SWAR_Techniques)
- memchr
- `DETECT_NULL`
- [你所不知道的 C 語言:數值系統篇](/dWpRR9RGRc6v_MNrsrnw5w) explains how `strlen` uses the constants `0x80` and `0x01` to locate the `NUL` byte; the same trick is used here
- Code:
- Like `strlen`, when `memchr_opt` receives a buffer that is not word-aligned, it first checks byte by byte; once the pointer is aligned, it switches to checking one word at a time
```c=
    while (UNALIGNED(src)) {
        if (!length--)
            return NULL;
        if (*src == d)
            return (void *) src;
        src++;
    }
```
- Answer:
```c=
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>
/* Nonzero if either X or Y is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))
/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long)) /* 8 on 64-bit targets, 4 on 32-bit */
/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) - 0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) - 0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif
/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *src = (const unsigned char *) src_void;
    unsigned char d = c;

    /* Check bytewise until the pointer is word-aligned */
    while (UNALIGNED(src)) {
        if (!length--)
            return NULL;
        if (*src == d)
            return (void *) src;
        src++;
    }

    if (!TOO_SMALL(length)) {
        /* If we get this far, we know that length is large and
         * src is word-aligned.
         */
        /* The fast code reads the source one word at a time and only performs
         * the bytewise search on word-sized segments if they contain the search
         * character, which is detected by XORing the word-sized segment with a
         * word-sized block of the search character and then detecting for the
         * presence of NULL in the result.
         */
        unsigned long *asrc = (unsigned long *) src;
        unsigned long mask = d << 8 | d;
        mask = mask << 16 | mask;
        for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
            mask = (mask << i) | mask;

        while (length >= LBLOCKSIZE) {
            /* Stop at the first word containing the search character;
             * the bytewise loop below locates it within the word.
             */
            if (DETECT_CHAR(*asrc, mask))
                break;
            length -= LBLOCKSIZE;
            asrc++; /* advance one word, i.e. LBLOCKSIZE bytes */
        }

        /* If there are fewer than LBLOCKSIZE characters left, then we resort to
         * the bytewise loop.
         */
        src = (unsigned char *) asrc;
    }

    while (length--) {
        if (*src == d)
            return (void *) src;
        src++;
    }
    return NULL;
}
```
## Quiz `2`
### [Lock Free Ring Buffer](https://lenshood.github.io/2021/04/19/lock-free-ring-buffer/)
Since this data structure was unfamiliar to me, I found the article above very helpful, so I take notes on it here.
#### Introduction
The primary use case for a ring buffer is the producer-consumer pattern, and here we want the ring buffer to be thread-safe.
### Macros used
1. `likely` and `unlikely`:
```c
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
```
- According to the [GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html) documentation, `__builtin_expect` does not affect a program's correctness, only its performance: it supplies the compiler with branch-prediction information so that the common path avoids misprediction stalls
- Quoting the documentation:
> ==You may use __builtin_expect to provide the compiler with branch prediction information.== In general, you should prefer to use actual profile feedback for this (-fprofile-arcs), as programmers are notoriously bad at predicting how their programs actually perform. However, there are applications in which this data is hard to collect.
- Hence the purpose of the `likely` and `unlikely` macros:
    - `likely`: the condition is expected to hold, so the compiler optimizes for the `if` branch being taken
    - `unlikely`: the condition is expected not to hold
2. `__builtin_clzl`: according to the [GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html) documentation, it is the `unsigned long` counterpart of `__builtin_clz`: it returns the number of leading zero bits before the most significant set bit (undefined if the argument is 0)
3. GCC defines six atomic memory orders; only `__ATOMIC_RELAXED` and `__ATOMIC_ACQUIRE` are used here:
```
__ATOMIC_RELAXED
Implies no inter-thread ordering constraints.
/* no ordering constraints between threads */
__ATOMIC_CONSUME
This is currently implemented using the stronger __ATOMIC_ACQUIRE memory order because of a deficiency in C++11’s semantics for memory_order_consume.
__ATOMIC_ACQUIRE
Creates an inter-thread happens-before constraint from the release (or stronger) semantic store to this acquire load. Can prevent hoisting of code to before the operation.
/* creates an inter-thread happens-before constraint */
__ATOMIC_RELEASE
Creates an inter-thread happens-before constraint to acquire (or stronger) semantic loads that read from this release store. Can prevent sinking of code to after the operation.
__ATOMIC_ACQ_REL
Combines the effects of both __ATOMIC_ACQUIRE and __ATOMIC_RELEASE.
__ATOMIC_SEQ_CST
Enforces total ordering with all other __ATOMIC_SEQ_CST operations.
```
Answer:
I could not get this one to pass the tests; my last attempt is attached below.
```c
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++;
    }
    return idx;
}
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    /* A slot is "written" once its sequence number matches tail */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do { /* skipped */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        head + actual,     /* advance head past the dequeued elements */
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```
## Quiz `3`
First, the definitions of these two terms:
- ptrace
- ptrace is a mechanism that lets one process observe and control another: the tracer can read and modify the tracee's registers and memory, which is how system-call tracing and debuggers are implemented.
- [workqueue](https://www.kernel.org/doc/html/v4.14/core-api/workqueue.html)
- A workqueue defers work into process context: work items are queued and later executed by kernel worker threads, optionally after a delay via `queue_delayed_work`.
Answer:
```c
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include <linux/list.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/workqueue.h>
MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees");
#define JIFFIES_DELAY 1
#define DONT_TRACE_WQ_NAME "dont_trace_worker"
static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
static struct workqueue_struct *wq;
static bool loaded;
/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
    send_sig(SIGKILL, task, 1);
}

/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
    struct list_head *list;
    list_for_each (list, children) {
        struct task_struct *task =
            list_entry(list, struct task_struct, ptrace_entry);
        if (task)
            return true;
    }
    return false;
}

/* Traverse the linked list of ptraced processes and kill each of them. */
static void kill_tracee(struct list_head *children)
{
    struct list_head *list;
    list_for_each (list, children) {
        struct task_struct *task_ptraced =
            list_entry(list, struct task_struct, ptrace_entry);
        pr_info("ptracee -> comm: %s, pid: %d, tgid: %d, ptrace: %d\n",
                task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
                task_ptraced->ptrace);
        kill_task(task_ptraced);
    }
}

/* Kill both the tracer and the corresponding tracees */
static void check(void)
{
    struct task_struct *task;
    for_each_process (task) {
        if (!is_tracer(&task->ptraced))
            continue;
        kill_tracee(&task->ptraced);
        kill_task(task); /* Kill the tracer once all tracees are killed */
    }
}

static void periodic_routine(struct work_struct *ws)
{
    if (likely(loaded))
        check();
    /* Re-arm the delayed work so the check runs periodically */
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}

// MODULE INITIALIZATION
static int __init dont_trace_init(void)
{
    wq = create_workqueue(DONT_TRACE_WQ_NAME);
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
    loaded = true;
    pr_info("Loaded!\n");
    return 0;
}

// MODULE EXIT
static void __exit dont_trace_exit(void)
{
    loaded = false;
    /* No new routines will be queued */
    cancel_delayed_work(&dont_trace_task);
    /* Wait for the completion of all routines */
    flush_workqueue(wq);
    destroy_workqueue(wq);
    pr_info("Unloaded.\n");
}
module_init(dont_trace_init);
module_exit(dont_trace_exit);
```