
2022-04-04 cantfindagoodname

contributed by < cantfindagoodname >

quiz8

Problems

Test 1

void *memchr_opt(const void *src_void, int c, size_t length)
{
    const unsigned char *src = (const unsigned char *) src_void;
    unsigned char d = c;

    while (UNALIGNED(src)) {
        if (!length--)
            return NULL;
        if (*src == d)
            return (void *) src;
        src++;
    }

    if (!TOO_SMALL(length)) {
        /* If we get this far, we know that length is large and
         * src is word-aligned.
         */

        /* The fast code reads the source one word at a time and only performs
         * the bytewise search on word-sized segments if they contain the search
         * character, which is detected by XORing the word-sized segment with a
         * word-sized block of the search character and then detecting for the
         * presence of NULL in the result.
         */
        unsigned long *asrc = (unsigned long *) src;
        unsigned long mask = d << 8 | d;
        mask = mask << 16 | mask;
        for (unsigned int i = 32; i < LBLOCKSIZE * 8; i <<= 1)
            mask = (mask << i) | mask;

        while (length >= LBLOCKSIZE) {
            /* XXXXX: Your implementation should appear here */
            if (DETECT_CHAR(*asrc, mask))
                break;
            ++asrc;
            length -= LBLOCKSIZE;
        }

        /* If there are fewer than LBLOCKSIZE characters left, then we resort to
         * the bytewise loop.
         */
        src = (unsigned char *) asrc;
    }

    while (length--) {
        if (*src == d)
            return (void *) src;
        src++;
    }

    return NULL;
}    

The program uses DETECT_CHAR to check whether the target character appears in the word read in the current iteration. If it does, the loop breaks so that the bytewise loop that follows can find which byte of the word holds the character. Otherwise the loop keeps scanning the data one word at a time.

while (length >= LBLOCKSIZE) {
	/* Check if target is in current word */
	/* If the target is 'a', mask is "aaaa...", repeated to fill a word */
	if (DETECT_CHAR(*asrc, mask))
		break;
	/* Move pointer to scan through data */
	++asrc;
	/* Track how much data is left */
	length -= LBLOCKSIZE;
}
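The detection trick can be tried in isolation. The sketch below is not the quiz's own header: `ONES`, `HIGHS`, `make_mask` and `word_has_char` are names made up for illustration, and the `DETECT_NULL` / `DETECT_CHAR` definitions are assumed to follow the usual "subtract 0x01 from every byte, then look at the high bits" form, generalized to whatever width `unsigned long` has.

```c
#include <assert.h>
#include <string.h>

/* Assumed constants: 0x0101...01 and 0x8080...80 for any word width. */
#define ONES (~0UL / 0xFF)
#define HIGHS (ONES << 7)

/* Nonzero if and only if some byte of X is zero. */
#define DETECT_NULL(X) (((X) - ONES) & ~(X) & HIGHS)
/* XOR turns every byte equal to the target into zero, then reuse DETECT_NULL. */
#define DETECT_CHAR(X, MASK) (DETECT_NULL((X) ^ (MASK)))

/* Hypothetical helper: replicate byte d into every byte of a word. */
static unsigned long make_mask(unsigned char d)
{
    unsigned long mask = ONES * d;
    assert((mask & 0xFF) == d);
    return mask;
}

/* Hypothetical helper: does the word stored at p contain byte d? */
static int word_has_char(const void *p, unsigned char d)
{
    unsigned long w;
    memcpy(&w, p, sizeof(w)); /* load a word without alignment concerns */
    return DETECT_CHAR(w, make_mask(d)) != 0;
}
```

Multiplying by `ONES` gives the same mask as the shift-and-or loop in `memchr_opt`. For example, `word_has_char("abcdefgh", 'c')` is nonzero, while `word_has_char("abcdefgh", 'z')` is zero.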

In the extended question of quiz3 Test 9, instead of finding an example application of ALIGN, I took a problem I had run into while working on fibdrv, where I tried to optimize numerical arithmetic on a character array.

In that program, I match the NUL character '\0' instead of a given character c, using the DETECT_NULL implementation given in this test.
Pseudocode:

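/* 1. Advance byte by byte until s itself is aligned */
while (s != ROUND_DOWN_TO_ALIGNMENT_SIZE(s)) {
	/* do something */
	++s;
}

/* 2. Read one word at a time until the word contains '\0' */
int l = 0;
uint32_t v = *(uint32_t *)s;
while (!((v - 0x01010101) & ~v & (0x80808080))) {
	/* do something */
	s += 4; // move pointer
	v = *(uint32_t *)s;
}
/* 3. Find which byte of the last word is '\0' (little-endian order) */
if (((v) & 0xFF) == '\0') /* do something on s */;
if (((v >> 8) & 0xFF) == '\0') /* do something on s, s+1 */;
if (((v >> 16) & 0xFF) == '\0') /* do something on s, s+1, s+2 */;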

The program has three steps, similar to the test.

  1. It first aligns the pointer to a word boundary, so that the later accesses through a word-sized type are aligned. When a word straddles an alignment boundary, the CPU may need two memory accesses where one would suffice for aligned data, which hurts performance.
  2. It then accesses the data one word at a time instead of one character at a time.
  3. When it reaches (or is about to reach) the end, it goes back to accessing the data one character at a time so that there is no invalid memory access. If less than a full word of data is left, a word-sized read would touch bytes past the end of the data.
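The three steps can be put together as a bounded search for '\0'. This is a minimal sketch rather than the fibdrv code: `find_nul`, `ONES32` and `HIGHS32` are hypothetical names, the buffer length is passed explicitly so that no read goes past the end, and `memcpy` is used for the word load to stay clear of aliasing problems.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ONES32 0x01010101u
#define HIGHS32 0x80808080u

/* Hypothetical helper: index of the first '\0' in s[0..len), or len if none. */
static size_t find_nul(const char *s, size_t len)
{
    size_t i = 0;

    assert(s != NULL);

    /* 1. Byte by byte until the address is 4-byte aligned. */
    while (i < len && ((uintptr_t) (s + i) & 3)) {
        if (s[i] == '\0')
            return i;
        i++;
    }

    /* 2. One 32-bit word at a time while the word has no zero byte. */
    while (len - i >= 4) {
        uint32_t v;
        memcpy(&v, s + i, sizeof(v));
        if ((v - ONES32) & ~v & HIGHS32)
            break; /* this word holds a '\0' */
        i += 4;
    }

    /* 3. Byte by byte for the matching word or the short tail. */
    while (i < len) {
        if (s[i] == '\0')
            return i;
        i++;
    }
    return len;
}
```

Step 3 serves both cases described above: it pinpoints the byte inside the word that matched, and it finishes the tail when fewer than four bytes remain.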

Test 2

static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    if (before(idx, fresh)) { /* fresh is after idx, use this instead */
        idx = fresh;
    } else { /* Continue with next slot */
        ++idx;
    }
    return idx;
} 

DDD : ++idx
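To see why `++idx` fits, it helps to look at `before` together with `cond_reload`. The definitions of `ringidx_t` and `before` are not shown in this note, so the ones below are assumptions: an unsigned pointer-sized index and a signed-difference comparison that keeps working when the index wraps around (the unsigned-to-signed conversion relies on the wrapping behaviour that GCC and Clang provide).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uintptr_t ringidx_t; /* assumed index type */

/* Assumed definition: true if a comes before b, even across wrap-around. */
static inline bool before(ringidx_t a, ringidx_t b)
{
    return (intptr_t) (a - b) < 0;
}

/* Same logic as the quiz function, written compactly. */
static inline ringidx_t cond_reload(ringidx_t idx, const ringidx_t *loc)
{
    ringidx_t fresh = __atomic_load_n(loc, __ATOMIC_RELAXED);
    /* Jump ahead if the shared index moved past us, else try the next slot. */
    return before(idx, fresh) ? fresh : idx + 1;
}
```

With the shared index at 10, `cond_reload(5, ...)` jumps to 10, while `cond_reload(10, ...)` and `cond_reload(12, ...)` simply move on to 11 and 13.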

static inline ringidx_t find_tail(lfring_t *lfr, ringidx_t head, ringidx_t tail)
{
    if (lfr->flags & LFRING_FLAG_SP) /* single-producer enqueue */
        return __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);

    /* Multi-producer enqueue.
     * Scan ring for new elements that have been written but not released.
     */
    ringidx_t mask = lfr->mask;
    ringidx_t size = mask + 1;
    while (before(tail, head + size) &&
           __atomic_load_n(&lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE) ==
               tail)
        tail++;
    tail = cond_update(&lfr->tail, tail);
    return tail;
}

KKK : mask + 1
TTT : &lfr->ring[tail & mask].idx, __ATOMIC_ACQUIRE

The function returns the most recent tail of the ring buffer. In the multi-producer case, it scans forward past elements that have been written but not yet released. The loads use __ATOMIC_ACQUIRE so that writes made by other threads become visible.

/* Dequeue elements from head */
uint32_t lfring_dequeue(lfring_t *lfr,
                        void **restrict elems,
                        uint32_t n_elems,
                        uint32_t *index)
{
    ringidx_t mask = lfr->mask;
    intptr_t actual;
    ringidx_t head = __atomic_load_n(&lfr->head, __ATOMIC_RELAXED);
    ringidx_t tail = __atomic_load_n(&lfr->tail, __ATOMIC_ACQUIRE);
    do {
        actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
        if (UNLIKELY(actual <= 0)) {
            /* Ring buffer is empty, scan for new but unreleased elements */
            tail = find_tail(lfr, head, tail);
            actual = MIN((intptr_t)(tail - head), (intptr_t) n_elems);
            if (actual <= 0)
                return 0;
        }
        for (uint32_t i = 0; i < (uint32_t) actual; i++)
            elems[i] = lfr->ring[(head + i) & mask].ptr;
        smp_fence(LoadStore);                        // Order loads only
        if (UNLIKELY(lfr->flags & LFRING_FLAG_SC)) { /* Single-consumer */
            __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED);
            break;
        }

        /* else: lock-free multi-consumer */
    } while (!__atomic_compare_exchange_n(
        &lfr->head, &head, /* Updated on failure */
        head + actual,
        /* weak */ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
    *index = (uint32_t) head;
    return (uint32_t) actual;
}

HHH : head + actual

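For a single consumer, __atomic_store_n(&lfr->head, head + actual, __ATOMIC_RELAXED) is called to discard the elements by moving head forward.
For multiple consumers, compare-and-swap performs the same update, so it only succeeds if head has not been changed by another consumer in the meantime. On failure, head is refreshed with the current value and the loop retries.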
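The two ways of advancing head can be isolated from the rest of the ring. The following sketch keeps only the two indices: `struct idx_pair` and `claim` are hypothetical names, the element copy and the fence are left out, and the GCC/Clang `__atomic` builtins are used as in the quiz code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical reduced ring: only the indices matter for this sketch. */
struct idx_pair {
    uint32_t head;
    uint32_t tail;
};

/* Claim up to n slots starting at head. Returns how many were claimed
 * and stores the first claimed index in *first. */
static uint32_t claim(struct idx_pair *r, uint32_t n, bool single_consumer,
                      uint32_t *first)
{
    uint32_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
    uint32_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);
    int64_t actual;

    do {
        int64_t avail = (int32_t) (tail - head);
        actual = avail < (int64_t) n ? avail : (int64_t) n;
        if (actual <= 0)
            return 0; /* nothing to consume */
        if (single_consumer) {
            /* Nobody else writes head, so a plain atomic store is enough. */
            __atomic_store_n(&r->head, head + (uint32_t) actual,
                             __ATOMIC_RELAXED);
            break;
        }
        /* Multi-consumer: publish only if head is still what we read.
         * On failure, head is reloaded and the loop recomputes actual. */
    } while (!__atomic_compare_exchange_n(&r->head, &head,
                                          head + (uint32_t) actual, false,
                                          __ATOMIC_RELAXED, __ATOMIC_RELAXED));

    *first = head;
    return (uint32_t) actual;
}
```

In a single thread both paths behave the same. The difference only shows under contention, where the compare-and-swap path makes sure two consumers never claim the same slots.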

Test 3

Reference : lkmpg:14, include/workqueue.h

static void periodic_routine(struct work_struct *ws)
{
    if (likely(loaded))
        check();
    queue_delayed_work(wq, to_delayed_work(ws), JIFFIES_DELAY);
}

check goes through all processes, checks whether each process uses ptrace, and sends a kill signal if it does.

There is a condition inside likely. Since loaded is the only Boolean variable that is defined but not used, it should be placed there, so that some check calls can be skipped.

The first thing I noticed is that there are no fops to invoke any of the functions other than the __init and __exit ones.
Instead, these functions are tasks that are run through work queues.

First, the work is declared with DECLARE_DELAYED_WORK, a macro that takes two parameters. From how they appear in the program, it is safe to assume that the arguments are the name of the task and a pointer to its function, respectively.

queue_delayed_work is called in __init, so periodic_routine will very likely be called after JIFFIES_DELAY.

As its name states, periodic_routine should be invoked periodically. Hence the second /* Implement */ must be code that schedules the task periodic_routine again, and ws is certainly the work to pass.

My first attempt was simply schedule_work(ws). The program gave the correct output (as shown by the reference). Shortly after that, however, the whole system crashed (both when running in KVM and when loading the module directly).

dmesg --follow showed that the program repeatedly called kill_tracee (the only function with pr_info in it), which indicates that periodic_routine repeatedly tried to kill the process yes, every time with the same PID.

I figured that this was because the module's own work queue was not used, so I changed schedule_work(ws) to its work queue equivalent, queue_work(wq, ws). The system no longer crashed, but pr_info still showed that kill_tracee was called repeatedly. When I attempted sudo rmmod dont_trace, I got the following messages:

[ 4039.859796] workqueue dont_trace_worker: drain_workqueue() isn't complete after 10 tries
[ 4039.860772] workqueue dont_trace_worker: drain_workqueue() isn't complete after 100 tries
[ 4039.861873] workqueue dont_trace_worker: drain_workqueue() isn't complete after 200 tries
[ 4039.862950] workqueue dont_trace_worker: drain_workqueue() isn't complete after 300 tries
[ 4039.863891] workqueue dont_trace_worker: drain_workqueue() isn't complete after 400 tries
[ 4039.865038] workqueue dont_trace_worker: drain_workqueue() isn't complete after 500 tries
[ 4039.866118] workqueue dont_trace_worker: drain_workqueue() isn't complete after 600 tries
[ 4039.867208] workqueue dont_trace_worker: drain_workqueue() isn't complete after 700 tries
[ 4039.868275] workqueue dont_trace_worker: drain_workqueue() isn't complete after 800 tries
[ 4039.869427] workqueue dont_trace_worker: drain_workqueue() isn't complete after 900 tries
[ 4039.870541] workqueue dont_trace_worker: drain_workqueue() isn't complete after 1000 tries

No experiment has been done yet (and I am still not sure how to design one), but I speculate that the scheduler tries to parallelize the execution of periodic_routine: before check has sent signals to all processes with ptrace, the module has already invoked queue_work multiple times. As a result, the program repeatedly traverses the process list even though the process has been killed, hogging the CPU while doing nothing useful.

After that, I changed queue_work to queue_delayed_work and got the desired result: kill_tracee is called only once.