---
tags: Linux Kernel
---

# 2022q1 Homework5 (quiz8)
contributed by < `kevinshieh0225` >

> [Quiz problems](https://hackmd.io/@sysprog/linux2022-quiz8)
> [GitHub](https://github.com/kevinshieh0225/linux2022-quiz8)

## [Quiz `1`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FHyg5nxO79): [memchr](https://man7.org/linux/man-pages/man3/memchr.3.html) performance improvement

:::info
```c
while (length >= LBLOCKSIZE) {
    // If the word contains the character, break and locate the exact byte.
    if (DETECT_CHAR(*asrc, mask) != 0)
        break;
    else {
        length -= LBLOCKSIZE;
        asrc++;
    }
}
```
:::

:::spoiler Complete code
```c
#include <stddef.h>
#include <stdint.h>
#include <limits.h>
#include <string.h>

/* Nonzero if X is not aligned on a "long" boundary */
#define UNALIGNED(X) ((long) X & (sizeof(long) - 1))

/* How many bytes are loaded each iteration of the word copy loop */
#define LBLOCKSIZE (sizeof(long))

/* Threshold for punting to the bytewise iterator */
#define TOO_SMALL(LEN) ((LEN) < LBLOCKSIZE)

#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif

/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))

void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *src = (const unsigned char *) src_void;
    unsigned char d = c;

    while (UNALIGNED(src)) {
        if (!length--)
            return NULL;
        if (*src == d)
            return (void *) src;
        src++;
    }

    if (!TOO_SMALL(length)) {
        /* If we get this far, we know that length is large and
         * src is word-aligned.
         */

        /* The fast code reads the source one word at a time and only performs
         * the bytewise search on word-sized segments if they contain the search
         * character, which is detected by XORing the word-sized segment with a
         * word-sized block of the search character and then detecting for the
         * presence of NULL in the result.
         */
        unsigned long *asrc = (unsigned long *) src;
        unsigned long mask = d << 8 | d;
        mask = mask << 16 | mask;
        for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
            mask = (mask << i) | mask;

        while (length >= LBLOCKSIZE) {
            // If the word contains the character, break and locate the exact byte.
            if (DETECT_CHAR(*asrc, mask) != 0)
                break;
            else {
                length -= LBLOCKSIZE;
                asrc++;
            }
        }

        /* If there are fewer than LBLOCKSIZE characters left, then we resort to
         * the bytewise loop.
         */
        src = (unsigned char *) asrc;
    }

    while (length--) {
        if (*src == d)
            return (void *) src;
        src++;
    }

    return NULL;
}
```
:::

A naive `memchr` compares one byte per iteration. Here we exploit the fact that the processor handles 32 or 64 bits at a time, so each comparison covers a whole word.

### DETECT_CHAR
The `DETECT_NULL` macro tests whether any byte within a word is 0 (a `char` is 1 byte).

The `DETECT_CHAR` macro builds on it: it XORs `X` with `MASK`, a word in which every byte is the target character. XOR yields 0 where two bits agree and 1 where they differ, so if `X` contains the target character, that byte becomes zero (NULL) and `DETECT_NULL` returns nonzero.

`DETECT_NULL` is explained in [the number systems article](https://hackmd.io/@sysprog/c-numerics?type=view#運用-bit-wise-operator).
```c
#if LONG_MAX == 2147483647L
#define DETECT_NULL(X) (((X) -0x01010101) & ~(X) & 0x80808080)
#else
#if LONG_MAX == 9223372036854775807L
/* Nonzero if X (a long int) contains a NULL byte. */
#define DETECT_NULL(X) (((X) -0x0101010101010101) & ~(X) & 0x8080808080808080)
#else
#error long int is not a 32bit or 64bit type.
#endif
#endif

/* @return nonzero if (long)X contains the byte used to fill MASK. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL(X ^ MASK))
```

### word size MASK
Once the preliminary cases are handled (the unaligned leading bytes are scanned bytewise, and the remaining length is at least one word), we build the word-sized mask by replicating the target byte into every byte of an `unsigned long`.
```c
void *memchr_opt(const void *src_void, int c, size_t length)
{
    ...
    unsigned long *asrc = (unsigned long *) src;
    unsigned long mask = d << 8 | d;
    mask = mask << 16 | mask;
    for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
        mask = (mask << i) | mask;
    ...
}
```

### Find char in Long mask scale (answer)
While at least one word remains, apply `DETECT_CHAR` to the current word. The loop exits either when the word contains the target character (a zero byte after the XOR) or when fewer than `LBLOCKSIZE` bytes remain.

The search then falls back to `char` granularity and scans the remaining bytes one by one.
```c
void *memchr_opt(const void *src_void, int c, size_t length)
{
    ...
        while (length >= LBLOCKSIZE) {
            // If the word contains the character, break and locate the exact byte.
            if (DETECT_CHAR(*asrc, mask) != 0)
                break;
            else {
                length -= LBLOCKSIZE;
                asrc++;
            }
        }

        /* If there are fewer than LBLOCKSIZE characters left, then we resort to
         * the bytewise loop.
         */
        src = (unsigned char *) asrc;
    }

    while (length--) {
        if (*src == d)
            return (void *) src;
        src++;
    }

    return NULL;
}
```

:::warning
ToDo
Speed up the search further with `SIMD` instructions such as `SSE`.
:::

### Test Code

```c
#include <stdio.h>
#include <string.h>
#include "memchr_opt.h"

int main()
{
    const char str[] = "http://wiki.csie.ncku.edu.tw";
    const char ch1 = '.';
    const char ch2 = 'w';
    const char ch3 = 'u';
    const char ch4 = 'm';  // character not in the string

    char *ret1 = memchr_opt(str, ch1, strlen(str));
    char *ret2 = memchr_opt(str, ch2, strlen(str));
    char *ret3 = memchr_opt(str, ch3, strlen(str));
    char *ret4 = memchr_opt(str, ch4, strlen(str));
    printf("String after |%c| is - |%s|\n", ch1, ret1);
    printf("String after |%c| is - |%s|\n", ch2, ret2);
    printf("String after |%c| is - |%s|\n", ch3, ret3);
    /* ret4 is NULL; glibc prints "(null)" for a NULL %s argument */
    printf("String after |%c| is - |%s|\n", ch4, ret4);
    return 0;
}
```
```
String after |.| is - |.csie.ncku.edu.tw|
String after |w| is - |wiki.csie.ncku.edu.tw|
String after |u| is - |u.edu.tw|
String after |m| is - |(null)|
```

---

## [Quiz `2`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FrkQMKQu7c): an implementation of multiple-producer/multiple-consumer (MPMC)

:::info
`DDD` : `idx++`
`KKK` : `mask + 1`
`TTT` : `&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED`
`HHH` : `head + actual`
:::

:::spoiler Complete code
```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>

#include "arch.h"
#include "common.h"
#include "lfring.h"

#define SUPPORTED_FLAGS \
    (LFRING_FLAG_SP | LFRING_FLAG_MP | LFRING_FLAG_SC | LFRING_FLAG_MC)

#define MIN(a, b)                      \
    ({                                 \
        __typeof__(a) tmp_a = (a);     \
        __typeof__(b) tmp_b = (b);     \
        tmp_a < tmp_b ? tmp_a : tmp_b; \
    })

typedef uintptr_t ringidx_t;
struct element {
    void *ptr;
    uintptr_t idx;
};

/* tail for enqueue
 * head for dequeue
 */
struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);

lfring_t *lfring_alloc(uint32_t n_elems, uint32_t flags)
{
    unsigned long ringsz = ROUNDUP_POW2(n_elems);
    if (n_elems == 0 || ringsz == 0 || ringsz > 0x80000000) {
        assert(0 && "invalid number of elements");
        return NULL;
    }
    if ((flags & ~SUPPORTED_FLAGS) != 0) {
        assert(0 && "invalid flags");
        return NULL;
    }

    size_t nbytes = sizeof(lfring_t) + ringsz * sizeof(struct element);
    lfring_t *lfr = osal_alloc(nbytes, CACHE_LINE);
    if (!lfr)
        return NULL;

    lfr->head = 0, lfr->tail = 0;
    lfr->mask = ringsz - 1;
    lfr->flags = flags;
    for (ringidx_t i = 0; i < ringsz; i++) {
        lfr->ring[i].ptr = NULL;
        lfr->ring[i].idx = i - ringsz;
    }
    return lfr;
}

void lfring_free(lfring_t *lfr)
{
    if (!lfr)
        return;

    if (lfr->head != lfr->tail) {
        assert(0 && "ring buffer not empty");
        return;
    }
    osal_free(lfr);
}

/* True if 'a' is before 'b' ('a' < 'b') in serial number arithmetic */
static inline bool before(ringidx_t a, ringidx_t b)
{
    return (intptr_t)(a - b) < 0;
}

/* Update *loc with neu.
 * If neu < old, the caller's value is stale and no update is needed:
 * return old instead.
 * else:
 *   check whether anyone has updated *loc, so that *loc != old
 *   if no one has, store neu into *loc and return neu
 *   if someone has, the failed CAS reloads *loc into old; compare again
 *
 * # Multiple Producers Second Step
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
    do {
        if (before(neu, old)) /* neu < old */
            return old;
        /* if neu > old, need to update *loc */
    } while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
                                          neu,
                                          /* weak */ true, __ATOMIC_RELEASE,
                                          __ATOMIC_RELAXED));
    return neu;
}

/* Reload the index.
 * If idx is before fresh, use fresh (the current shared index) instead;
 * otherwise *loc may not have been updated yet,
 * so continue with the slot after idx.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++; /* DDD */
    }
    return idx;
}

/* Enqueue elements at tail */
uint32_t lfring_enqueue(lfring_t *lfr,
                        void *const *restrict elems,
                        uint32_t n_elems)
{
    intptr_t actual = 0;
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_RELAXED);

    if (lfr->flags & LFRING_FLAG_SP) { /* single-producer */
        ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE);
        actual = MIN((intptr_t)(head + size - tail), (intptr_t) n_elems);
        if (actual <= 0)
            return 0;

        for (uint32_t i = 0; i < (uint32_t) actual; i++) {
            assert(lfr->ring[tail & mask].idx == tail - size);
            lfr->ring[tail & mask].ptr = *elems++;
            lfr->ring[tail & mask].idx = tail;
            tail++;
        }
        __atomic_store_n(&lfr->tail, tail, __ATOMIC_RELEASE);
        return (uint32_t) actual;
    }

    /* else: lock-free multi-producer */
restart:
    while ((uint32_t) actual < n_elems &&
           before(tail, __atomic_load_n(&lfr->head, __ATOMIC_ACQUIRE) + size)) {
        union {
            struct element e;
            ptrpair_t pp;
        } old, neu;
        void *elem =
            elems[actual];
        struct element *slot = &lfr->ring[tail & mask];
        old.e.ptr = __atomic_load_n(&slot->ptr, __ATOMIC_RELAXED);
        old.e.idx = __atomic_load_n(&slot->idx, __ATOMIC_RELAXED);
        do {
            if (UNLIKELY(old.e.idx != tail - size)) {
                if (old.e.idx != tail) {
                    /* We are far behind. Restart with fresh index */
                    tail = cond_reload(tail, &lfr->tail);
                    goto restart;
                }
                /* slot already enqueued */
                tail++; /* Try next slot */
                goto restart;
            }

            /* Found slot that was used one lap back.
             * Try to enqueue next element.
             */
            neu.e.ptr = elem;
            neu.e.idx = tail; /* Set idx on enqueue */
        } while (!lf_compare_exchange((ptrpair_t *) slot, &old.pp, neu.pp));

        /* Enqueue succeeded */
        actual++;
        tail++; /* Continue with next slot */
    }
    (void) cond_update(&lfr->tail, tail);
    return (uint32_t) actual;
}

static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1; /* KKK */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED) ==
               tail) {
        tail++;
    }
    tail = cond_update(&lfr->tail, tail);
    return tail;
}

/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore);                        // Order loads only
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }
        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        /* HHH */ head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```
:::

This quiz implements an MPMC lock-free ring buffer. A producer inserts new values with `enqueue`; a consumer takes and removes old values with `dequeue`.

The `lfring` structure is:
```c
/* tail for enqueue : producer
 * head for dequeue : consumer
 * mask = ringsz - 1 :
 *      so that lfr->ring[tail & mask] wraps around as a ring buffer
 * flags : the scenario, one of mpmc/mpsc/spmc/spsc
 * ring : the element array of the ring buffer
 */
struct lfring {
    ringidx_t head;
    ringidx_t tail ALIGNED(CACHE_LINE);
    uint32_t mask;
    uint32_t flags;
    struct element ring[] ALIGNED(CACHE_LINE);
} ALIGNED(CACHE_LINE);
```

### When multiple producer / consumer
Before looking at enqueue/dequeue, consider what can go wrong in the MPMC case and how to avoid it.

MPMC is prone to races: when two threads operate on the `lfring` at the same time, they may compete for the same slot and overwrite each other's data.

Because another thread may intervene between any two instructions, we solve this with a flag-planting approach: we pick the slot we intend to update, check at store time whether another thread has claimed it, and if so move on to look for a usable slot. **Note** that confirming the slot is free and storing the value must complete as one atomic step.

See the description in [Ring Library](https://doc.dpdk.org/guides/prog_guide/ring_lib.html), `7.5.3.2 Multiple Producers Enqueue Second Step`:

> The second step is to modify ring->prod_head in the ring structure to point to the same location as prod_next. This operation is done using a Compare And Swap (CAS) instruction, which does the following operations atomically:
>
> * If ring->prod_head is different to local variable prod_head, the CAS operation fails, and the code restarts at first step.
> * Otherwise, ring->prod_head is set to local prod_next, the CAS operation is successful, and processing continues.
>
> In the figure, the operation succeeded on core 1, and step one restarted on core 2.

![](https://doc.dpdk.org/guides/_images/ring-mp-enqueue2.svg)

#### `__atomic_compare_exchange_n`
We use `__atomic_compare_exchange_n` to detect whether another thread has modified the location by the time we store. According to the [GCC documentation](https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html):

```c
/*
 * This built-in function implements an atomic
 * compare and exchange operation. This compares
 * the contents of *ptr with the contents of *expected.
 *
 * If equal, the operation is a read-modify-write
 * operation that writes desired into *ptr.
 *
 * If they are not equal, the operation is a read and
 * the current contents of *ptr are written into *expected.
 */
bool __atomic_compare_exchange_n (
    type *ptr,
    type *expected,
    type desired,
    bool weak,
    int success_memorder,
    int failure_memorder
);
```

In our context:

> We compare the shared state `*ptr` with the local copy `*expected`. If they are still equal, no other thread has interfered, so `*ptr` is updated to `desired`.
>
> If they differ, the local `*expected` that we loaded earlier is stale, so the current `*ptr` is loaded into `*expected` and we retry the operation.

### `cond_update` / `cond_reload`
```c
/* Update *loc with neu.
 * If neu < old, the caller's value is stale and no update is needed:
 * return old instead.
 * else:
 *   check whether anyone has updated *loc, so that *loc != old
 *   if no one has, store neu into *loc and return neu
 *   if someone has, the failed CAS reloads *loc into old; compare again
 *
 * # Multiple Producers Second Step
 */
static inline ringidx_t cond_update(ringidx_t *loc, ringidx_t neu)
{
    ringidx_t old = __atomic_load_n(loc, __ATOMIC_RELAXED);
    do {
        if (before(neu, old)) /* neu < old */
            return old;
        /* if neu > old, need to update *loc */
    } while (!__atomic_compare_exchange_n(loc, &old, /* Updated on failure */
                                          neu,
                                          /* weak */ true, __ATOMIC_RELEASE,
                                          __ATOMIC_RELAXED));
    return neu;
}
```

```c
/* Reload the index.
 * If idx is before fresh, our index is stale:
 * use fresh (the current shared index) instead;
 * otherwise *loc may not have been updated yet,
 * so continue with the slot after idx.
 */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        idx++; /* DDD */
    }
    return idx;
}
```

### Dequeue in multiple consumer

```c
/* Dequeue elements from head
 *
 * 1. set actual to the number of elements to dequeue
 *    (MIN(elements remaining in the queue, number requested))
 *  1.1 if the ring buffer is empty, scan for new but unreleased elements
 *  1.2 if still empty, return 0
 *
 * 2. copy the values from lfr->ring to elems
 *
 * 3. if single-consumer, just update lfr->head and return
 *
 * 4. use '__atomic_compare_exchange_n' to ensure
 *    we successfully update lfr->head with (head + actual)
 *
 * @lfr     : lfring struct
 * @elems   : the array that receives the dequeued elements
 * @n_elems : the number of elements to dequeue
 * @index   : receives the ring index of the first element dequeued
 */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        // 1. set actual to the number of elements to dequeue
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            // 1.1 if the ring buffer is empty, scan for new but unreleased elements
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            // 1.2 if still empty, return 0
            if (actual <= 0)
                return 0;
        }
        // 2. copy the values from lfr->ring to elems
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore);  // Order loads only
        // 3. if single-consumer, just update lfr->head and return
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }
        /* else: lock-free multi-consumer */
        // 4. ensure we successfully update lfr->head with (head + actual)
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        /* HHH */ head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}
```

### `find_tail`
Recall step `1.1` of `lfring_dequeue`: when `(lfr->tail) - (lfr->head)` as recorded in `lfr` shows that no elements are left, why do we call `find_tail` to refresh the tail once more?
The answer lies in the implementation of `lfring_enqueue`: it first writes the values into `lfr->ring` and only updates `lfr->tail` at the very end.

So a consumer may observe `(tail - head)` as empty while a producer is in the middle of an `enqueue`, meaning `lfr->ring` already holds elements. `find_tail` therefore inspects `lfr->ring` itself to check whether a producer has already written elements that `lfr->tail` does not yet reflect.

```c
/* find_tail: check the current lfr->ring to see whether
 * someone is enqueueing
 *
 * 1. A single producer stores to lfr->ring and then lfr->tail
 *    in order, so just load lfr->tail again
 *
 * 2. For multi-producer, scan the ring for new elements that
 *    have been written but not released.
 *
 *  2.1 while tail is before (head + size) &&
 *      lfr->ring[tail & mask].idx shows the slot has been written,
 *      advance tail right away
 *
 * 3. cond_update lfr->tail with tail:
 *    while we were scanning, a producer may already have
 *    finished and published a newer tail
 */
static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    /* 1. A single producer stores to lfr->ring and then lfr->tail
     *    in order, so just load lfr->tail again
     */
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* 2. Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1; /* KKK */

    /* 2.1 while tail is before (head + size) &&
     *     lfr->ring[tail & mask].idx shows the slot has been written,
     *     advance tail right away
     */
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_RELAXED) ==
               tail) {
        tail++;
    }

    /* 3. cond_update lfr->tail with tail:
     *    while we were scanning, a producer may already have
     *    finished and published a newer tail
     */
    tail = cond_update(&lfr->tail, tail);
    return tail;
}
```

---

## [Quiz `3`](https://hackmd.io/@sysprog/linux2022-quiz8/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FBy5JAmOX9): changing the internal state of a specific Linux process

:::info
```c
static void periodic_routine(struct work_struct *ws)
{
    if (likely(loaded))
        check();
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
```
:::

:::spoiler Complete code
```c
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/list.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/workqueue.h>

MODULE_AUTHOR("National Cheng Kung University, Taiwan");
MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("A kernel module that kills ptrace tracer and its tracees");

#define JIFFIES_DELAY 1
#define DONT_TRACE_WQ_NAME "dont_trace_worker"

static void periodic_routine(struct work_struct *);
static DECLARE_DELAYED_WORK(dont_trace_task, periodic_routine);
static struct workqueue_struct *wq;
static bool loaded;

/* Send SIGKILL from kernel space */
static void kill_task(struct task_struct *task)
{
    send_sig(SIGKILL, task, 1);
}

/* @return true if the process has tracees */
static bool is_tracer(struct list_head *children)
{
    struct list_head *list;
    list_for_each (list, children) {
        struct task_struct *task =
            list_entry(list, struct task_struct, ptrace_entry);
        if (task)
            return true;
    }
    return false;
}

/* Traverse the linked list of the ptraced processes and
 * finally kill them.
 */
static void kill_tracee(struct list_head *children)
{
    struct list_head *list;
    list_for_each (list, children) {
        struct task_struct *task_ptraced =
            list_entry(list, struct task_struct, ptrace_entry);
        pr_info("ptracee -> comm: %s, pid: %d, gid: %d, ptrace: %d\n",
                task_ptraced->comm, task_ptraced->pid, task_ptraced->tgid,
                task_ptraced->ptrace);
        kill_task(task_ptraced);
    }
}

static void check(void)
{
    struct task_struct *task;
    for_each_process (task) {
        if (!is_tracer(&task->ptraced))
            continue;

        kill_tracee(&task->ptraced);
        kill_task(task); /* Kill the tracer once all tracees are killed */
    }
}

static void periodic_routine(struct work_struct *ws)
{
    if (likely(loaded))
        check();
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}

static int __init dont_trace_init(void)
{
    wq = create_workqueue(DONT_TRACE_WQ_NAME);
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);

    loaded = true;
    pr_info("Loaded!\n");
    return 0;
}

static void __exit dont_trace_exit(void)
{
    loaded = false;

    /* No new routines will be queued */
    cancel_delayed_work(&dont_trace_task);

    /* Wait for the completion of all routines */
    flush_workqueue(wq);
    destroy_workqueue(wq);

    pr_info("Unloaded.\n");
}

module_init(dont_trace_init);
module_exit(dont_trace_exit);
```
:::

This quiz implements a kernel module, `dont_trace`, that uses the members of `task_struct` related to `ptrace` to detect whether a process has been attached by a debugger or by any other program using the `ptrace` system call. Once it detects a process tracing another, `dont_trace` kills the tracer. The flow:

> * Once any process starts “ptracing” another, the tracee gets added into ptraced that’s located in task_struct, which is simply a linked list that contains all the tracees that the process is “ptracing”.
> * Once a tracer is found, the module lists all its tracees and sends a SIGKILL signal to each of them including the tracer. This results in killing both the tracer and its tracees.
> * Once the module is attached to the kernel, the module’s “core” function will run periodically through the advantage of workqueues. Specifically, the module runs every JIFFIES_DELAY, which is set to 1.
>   That is, the module will run every one jiffy. This can be changed through modifying the macro JIFFIES_DELAY defined in the module.

Mapping the flow back to the code, each function's role is:
* `kill_task` : sends `SIGKILL` to terminate the given task.
* `is_tracer` : returns true if the process is a tracer.
* `kill_tracee` : walks the tracer's list of ptraced processes and kills each tracee.
* `check` : uses `for_each_process(struct task_struct *task)` to visit every process, and for each one that is a tracer, "killing both the tracer and its tracees".
* `periodic_routine` : while the `dont_trace` module is loaded, runs `check` and re-queues itself so that it runs again after one unit of time.
* `dont_trace_init`/`dont_trace_exit` : load / unload the module.

The rest of this section focuses on `periodic_routine`:

### `periodic_routine` (answer)

```c
/* periodic_routine :
 *
 * If the dont_trace module is currently loaded (the global variable
 * `loaded`), run check() once every unit of time.
 *
 * Note what the flow says: Once the module is attached to the kernel,
 * the module’s “core” function will run periodically through
 * the advantage of workqueues. Specifically, the module
 * runs every JIFFIES_DELAY
 *
 * int queue_delayed_work (struct workqueue_struct *wq,
 *                         struct delayed_work *dwork,
 *                         unsigned long delay);
 *
 * @wq    : workqueue to use
 * @dwork : delayable work to queue
 * @delay : number of jiffies to wait before queueing
 */
static void periodic_routine(struct work_struct *ws)
{
    if (likely(loaded))
        check();
    queue_delayed_work(wq, &dont_trace_task, JIFFIES_DELAY);
}
```

According to [queue_delayed_work](https://docs.huihoo.com/linux/kernel/2.6.26/kernel-api/re63.html):
> queue_delayed_work - queue work on a workqueue after delay

This kernel module relies on a [work queue](https://codertw.com/ios/61992/) to schedule its work:

> The workqueue mechanism in Linux exists to simplify the creation of kernel threads. Calling the workqueue interface creates the kernel threads, and the number of threads can follow the number of CPUs in the system, so the work they handle can run in parallel. The workqueue is a simple yet effective mechanism in the kernel; it clearly simplifies creating kernel daemons and makes programming easier for users.
>
> A work queue is another way of deferring work. It hands the deferred work to a kernel thread, which means this bottom half runs in process context. Most importantly, work queues allow the work to be rescheduled and even to sleep.

Further reading: [Work Queues](https://www.oreilly.com/library/view/understanding-the-linux/0596005652/ch04s08.html)

### `check`
`for_each_process`, defined in `<linux/sched/signal.h>` (in `<linux/sched.h>` on kernels before 4.11), iterates over the processes in the system. If a process is a tracer, `check` walks that tracer's `ptraced` list, kills every tracee, and then kills the tracer itself.
```c
static void check(void)
{
    struct task_struct *task;
    for_each_process (task) {
        if (!is_tracer(&task->ptraced))
            continue;

        kill_tracee(&task->ptraced);
        kill_task(task); /* Kill the tracer once all tracees are killed */
    }
}
```
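
`check` relies on `is_tracer` to decide which processes to kill. One detail may be worth noting: `list_entry` is only pointer arithmetic on a list node, so the `if (task)` test inside `is_tracer` succeeds for any node on the list, and the function in effect reports whether the `ptraced` list is non-empty. The following is a minimal userspace sketch of that observation, not kernel code. The `list_head` helpers are simplified stand-ins written for this note (assumed to mimic `<linux/list.h>` semantics), and `struct fake_task` and `compare_predicates` are hypothetical names that do not exist in the module.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Simplified userspace stand-ins for the kernel's intrusive list.
 * Assumption: they mimic <linux/list.h> semantics; they are not kernel code. */
struct list_head {
    struct list_head *next, *prev;
};

#define list_entry(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))
#define list_for_each(pos, head) \
    for (pos = (head)->next; pos != (head); pos = pos->next)

void init_list_head(struct list_head *head)
{
    head->next = head;
    head->prev = head;
}

void list_add_tail(struct list_head *node, struct list_head *head)
{
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

bool list_empty(const struct list_head *head)
{
    return head->next == head;
}

/* Hypothetical substitute for task_struct with only the fields needed here */
struct fake_task {
    int pid;
    struct list_head ptraced;      /* list head: tasks traced by this task */
    struct list_head ptrace_entry; /* list node: link in the tracer's list */
};

/* Same shape as the module's is_tracer() */
bool is_tracer(struct list_head *children)
{
    struct list_head *list;
    list_for_each (list, children) {
        struct fake_task *task =
            list_entry(list, struct fake_task, ptrace_entry);
        if (task) /* not NULL for a node that is on the list */
            return true;
    }
    return false;
}

/* Print both predicates and check that they agree */
void compare_predicates(const char *when, struct list_head *children)
{
    bool by_loop = is_tracer(children);
    bool by_empty = !list_empty(children);
    printf("%s: is_tracer=%d, !list_empty=%d\n", when, by_loop, by_empty);
    assert(by_loop == by_empty);
}
```

To try it, initialise `tracer.ptraced` with `init_list_head`, call `compare_predicates("before attach", &tracer.ptraced)`, link a tracee with `list_add_tail(&tracee.ptrace_entry, &tracer.ptraced)`, and call `compare_predicates` again; in this model both predicates change from 0 to 1 together.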