Try   HackMD

2022q1 第 14 週測驗題

tags: linux2022

目的: 檢驗學員對「並行和多執行緒程式設計」及「Linux 核心記憶體管理」的認知

作答表單:

測驗 1

考慮一個多生產者單消費者的並行佇列實作 (檔名: mpscq.c),程式碼如下:

#include <stddef.h>

/* Thin wrappers over the compiler's __atomic builtins. All three use
 * sequentially consistent ordering (__ATOMIC_SEQ_CST). */
/* Atomically write n to *p and yield the value *p held before. */
#define XCHG(p, n) __atomic_exchange_n(p, n, __ATOMIC_SEQ_CST)
/* Atomically write n to *p. */
#define STORE(p, n) __atomic_store_n(p, n, __ATOMIC_SEQ_CST)
/* Atomically read *p. */
#define LOAD(p) __atomic_load_n(p, __ATOMIC_SEQ_CST)

/* Queue link. Objects that travel through the queue embed one of these
 * (see struct result); `next` is NULL until a later node is linked behind. */
struct node {
    struct node *next;
};

/*
 * Static initializer for the struct mpscq designated by `self`: head and tail
 * both point at the queue's own stub node. Members that are not listed
 * (stub.next) are zero-initialized, i.e. NULL.
 *
 * `self` is parenthesized so that any lvalue expression may be passed, for
 * example `*ptr` or `arr[i]`, not only a plain variable name.
 */
#define QUEUE_STATIC_INIT(self)                    \
    {                                              \
        .head = &(self).stub, .tail = &(self).stub \
    }
/*
 * Multi-producer / single-consumer queue state.
 *   head - read atomically by mpscq_pop(); starts out pointing at stub
 *   tail - consumer-side cursor, only written by mpscq_create()/mpscq_pop()
 *   stub - sentinel node owned by the queue; mpscq_pop() never returns it
 */
struct mpscq {
    struct node *head, *tail;
    struct node stub;
};

static void mpscq_create(struct mpscq *self)
{
    self->head = self->tail = &self->stub;
    self->stub.next = NULL;
}

/*
 * Append node n to the queue. Called by the producer threads and, for the
 * stub node, by mpscq_pop().
 *
 * PPPP is a blank left for the reader (quiz 1); per the answer rules it is an
 * expression built from one of XCHG/STORE/LOAD. The code below treats its
 * value as the node that n has to be linked behind.
 */
static void mpscq_push(struct mpscq *self, struct node *n)
{
    n->next = 0; /* n has no successor yet */
    struct node *prev = PPPP;
    /* Publish n as the successor of prev so the consumer can reach it. */
    STORE(&prev->next, n);
}

/*
 * Remove one node from the queue. Intended for the single consumer.
 *
 * Returns the removed node, or NULL when no node can be handed out on this
 * call; main() simply calls again until it gets one.
 *
 * QQQQ and RRRR are blanks left for the reader (quiz 1). The comments below
 * only describe what the surrounding code shows.
 */
static struct node *mpscq_pop(struct mpscq *self)
{
    struct node *tail = self->tail, *next = LOAD(&tail->next);
    if (tail == &self->stub) {
        /* The cursor sits on the stub, which is never returned to callers. */
        if (!next)
            return NULL; /* nothing is linked behind the stub */

        /* Step over the stub so that `tail` refers to a real node. */
        self->tail = next;
        tail = next;
        /* Blank: presumably re-reads the successor of the new tail. */
        next = QQQQ;
    }

    if (next) {
        /* tail has a successor: advance the cursor and hand tail out. */
        self->tail = next;
        return tail;
    }

    /* tail has no visible successor. RRRR (blank) decides, using tail and
     * the current head, whether to give up for now. */
    struct node *head = LOAD(&self->head);
    if (RRRR)
        return NULL;

    /* Re-insert the stub, then check again whether tail gained a successor
     * (which lets tail itself be returned). */
    mpscq_push(self, &self->stub);
    next = LOAD(&tail->next);
    if (next) {
        self->tail = next;
        return tail;
    }
    return NULL;
}

#include <assert.h>
#include <pthread.h>

/* P: number of producer threads; also the stride between the indices that one
 * producer handles (see produce()). */
#define P 8
/* N: items per producer; P * N items are pushed and popped in total. */
#define N 4000000L

/* Test payload. `n` is the queue link and is the first member, which is what
 * lets main() cast a popped struct node * back to struct result *.
 * `value` is the item's index as written by produce(). */
struct result {
    struct node n;
    long value;
};

/* Per-thread arguments for produce(). */
struct producer {
    pthread_t t;     /* thread handle filled in by pthread_create() */
    struct mpscq *q; /* queue this producer pushes into */
    long s;          /* first index handled by this producer */
};

/*
 * Thread entry point for one producer.
 * arg points to a struct producer. Starting at index s and stepping by P, the
 * thread records each index in the shared static result array and pushes that
 * element's link onto the queue. Always returns NULL.
 */
static void *produce(void *arg)
{
    /* One array shared by all producer threads; each thread only touches the
     * indices congruent to its own s modulo P. */
    static struct result r[P * N];
    struct producer *self = arg;
    struct mpscq *queue = self->q;
    long idx = self->s;

    while (idx < P * N) {
        r[idx].value = idx;
        mpscq_push(queue, &r[idx].n);
        idx += P;
    }
    return NULL;
}

/*
 * Stress test: P producer threads push P * N distinct items while this thread
 * pops them all, asserting that no value is ever seen twice.
 * Returns 0 on success, 1 if a producer thread could not be started.
 */
int main(void)
{
    struct mpscq q;
    mpscq_create(&q);

    struct producer p[P];
    for (int i = 0; i < P; i++) {
        p[i].q = &q;
        p[i].s = i;
        /* A missing producer would leave the consumer loop below waiting
         * forever for items that never arrive, so bail out instead. */
        if (pthread_create(&p[i].t, 0, produce, p + i))
            return 1;
    }

    static char seen[P * N];
    for (long i = 0; i < P * N; i++) {
        struct result *r;
        /* Busy-wait until the queue hands out a node. */
        do {
            r = (struct result *) mpscq_pop(&q);
        } while (!r);
        assert(!seen[r->value]);
        seen[r->value] = 1;
    }

    for (int i = 0; i < P; i++)
        pthread_join(p[i].t, 0);
    return 0;
}

已知執行過程中,不會遇到任何 assert 錯誤,請補完程式碼,使其運作符合預期。作答規範:

  • PPPP 和 QQQQ 都該使用 XCHG, STORE, LOAD 等巨集之一
  • PPPP, QQQQ, RRRR 都該以最精簡的形式撰寫

測驗 2

考慮一個 Linux 封包捕捉和分析工具,使用 Packet MMAP,程式碼列表如下: (檔名 rxtest.c)

#include <linux/if.h>
#include <linux/if_ether.h> /* The L2 protocols */
#include <linux/if_packet.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/poll.h>
#include <unistd.h>

/* Handle used by the rxring_* functions; a pointer to struct _rxring. */
typedef struct _rxring *rxring_t;
/* Per-packet callback. u is the user pointer given to rxring_init(); buf and
 * len describe the packet bytes. do_block() does not inspect the result. */
typedef int (*rx_cb_t)(void *u, const uint8_t *buf, size_t len);

/* Only the size of this type is used (tp_sizeof_priv in rx_ring()).
 * NOTE(review): a struct without members is a GNU C extension, not ISO C. */
struct priv {
    /* unused */
};

/* State of one receive ring. Allocated zeroed by rxring_init(). */
struct _rxring {
    void *user;          /* passed back as first argument of cb */
    rx_cb_t cb;          /* per-packet callback; do_block() skips it if NULL */
    uint8_t *map;        /* base address of the mmap()ed ring */
    size_t map_sz;       /* size of the mapping in bytes */
    sig_atomic_t cancel; /* rxring_mainloop() runs while this is zero */
    /* r_idx: block examined next; nr_blocks: blocks in the ring;
     * block_sz: bytes per block */
    unsigned int r_idx, nr_blocks, block_sz;
    int ifindex;         /* bound interface index, 0 meaning "any" */
    int fd;              /* the packet socket */
};

/* Number of blocks requested for the receive ring (tp_block_nr). */
#define N_BLOCKS 2049

/* 1. Open the packet socket */
/* Create a raw packet socket for all protocols and remember it in rx->fd.
 * Returns true when the descriptor is valid. */
static bool packet_socket(rxring_t rx)
{
    int sock = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL));

    rx->fd = sock;
    return sock >= 0;
}

/* 2. Set TPACKET_V3 */
/* Select the TPACKET_V3 ring format on rx->fd. Returns true on success. */
static bool set_v3(rxring_t rx)
{
    int version = TPACKET_V3;
    int rc = setsockopt(rx->fd, SOL_PACKET, PACKET_VERSION, &version,
                        sizeof(version));

    return rc == 0;
}

/* 3. Setup the fd for mmap() ring buffer */
/*
 * Ask the kernel to set up the receive ring on rx->fd and record its geometry
 * (map_sz, nr_blocks, block_sz) in *rx. Returns true on success.
 *
 * RRRR is a blank left for the reader (quiz 2): the socket option name, given
 * as a macro or a bitwise combination of macros.
 */
static bool rx_ring(rxring_t rx)
{
    /* Compute the sizes up front. The initializer below must not read req's
     * own members, because initializer expressions are indeterminately
     * sequenced and the members may not have been written yet. */
    const unsigned int block_sz = (unsigned int) getpagesize() << 2;
    const unsigned int frame_sz = TPACKET_ALIGNMENT << 7;

    struct tpacket_req3 req = {
        .tp_block_size = block_sz,
        .tp_block_nr = N_BLOCKS,
        .tp_frame_size = frame_sz,
        .tp_frame_nr = block_sz / frame_sz * N_BLOCKS,
        .tp_retire_blk_tov = 64,
        .tp_sizeof_priv = sizeof(struct priv),
        .tp_feature_req_word = 0,
    };
    if (setsockopt(rx->fd, SOL_PACKET, RRRR, (char *) &req,
                   sizeof(req)))
        return false;

    /* Widen before multiplying so the byte count cannot wrap in 32 bits. */
    rx->map_sz = (size_t) req.tp_block_size * req.tp_block_nr;
    rx->nr_blocks = req.tp_block_nr;
    rx->block_sz = req.tp_block_size;
    return true;
}

/* 4. Bind to the ifindex on our sending interface */
/*
 * Bind the packet socket to an interface.
 * ifname: interface name, or NULL to bind to interface index 0 ("any").
 * Stores the resolved index in rx->ifindex. Returns true on success, false if
 * the name does not fit, the index lookup fails or bind() fails.
 */
static bool bind_if(rxring_t rx, const char *ifname)
{
    if (ifname) {
        struct ifreq ifr;
        memset(&ifr, 0, sizeof(ifr));

        /* Reject names that would be truncated: a shortened name could
         * silently resolve to a different interface. */
        int n = snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname);
        if (n < 0 || (size_t) n >= sizeof(ifr.ifr_name))
            return false;

        if (ioctl(rx->fd, SIOCGIFINDEX, &ifr))
            return false;

        rx->ifindex = ifr.ifr_ifindex;
    } else {
        rx->ifindex = 0; /* interface "any" */
    }

    struct sockaddr_ll sll;
    memset(&sll, 0, sizeof(sll));
    sll.sll_family = PF_PACKET;
    sll.sll_protocol = htons(ETH_P_ALL);
    sll.sll_ifindex = rx->ifindex;
    if (bind(rx->fd, (struct sockaddr *) &sll, sizeof(sll)))
        return false;

    return true;
}

/* 5. finally mmap() the sucker */
/*
 * Map the ring of rx->fd into this process and store the address in rx->map.
 * Prints the ring size (whole MiB) first. Returns true unless mmap() failed.
 *
 * FFFF is a blank left for the reader (quiz 2): the protection flags passed
 * to mmap(), given as a macro or a bitwise combination of macros.
 */
static bool map_ring(rxring_t rx)
{
    printf("mapping %zu MiB ring buffer\n", rx->map_sz >> 20);
    int flags = FFFF;
    rx->map = mmap(NULL, rx->map_sz, flags, MAP_SHARED, rx->fd, 0);
    return rx->map != MAP_FAILED;
}

/*
 * Create a receive ring on interface ifname (NULL binds to "any").
 * cb is invoked for every packet with `user` as its first argument.
 * Returns the new ring, or NULL if allocation or any setup step failed; on
 * failure the socket (if opened) is closed and the allocation is released.
 */
rxring_t rxring_init(const char *ifname, rx_cb_t cb, void *user)
{
    struct _rxring *ring = calloc(1, sizeof(*ring));
    if (!ring)
        return NULL;

    if (!packet_socket(ring)) {
        /* No descriptor to close yet. */
        free(ring);
        return NULL;
    }

    /* The remaining steps run in order; && stops at the first failure. */
    if (set_v3(ring) && rx_ring(ring) && bind_if(ring, ifname) &&
        map_ring(ring)) {
        ring->cb = cb;
        ring->user = user;
        return ring;
    }

    close(ring->fd);
    free(ring);
    return NULL;
}

/*
 * Join fanout group `id` in hash mode with defragmentation requested.
 * Returns true on success.
 *
 * The PACKET_FANOUT option value carries the group id in its low 16 bits and
 * the mode plus flags in its high 16 bits. PACKET_FANOUT_FLAG_DEFRAG therefore
 * has to be shifted together with the mode; OR-ing it in unshifted would alter
 * the group id instead. Unsigned arithmetic keeps the shift into bit 31 well
 * defined.
 */
bool rxring_fanout_hash(rxring_t rx, uint16_t id)
{
    const uint32_t type_flags = PACKET_FANOUT_HASH | PACKET_FANOUT_FLAG_DEFRAG;
    const uint32_t val = (type_flags << 16) | id;
    return !setsockopt(rx->fd, SOL_PACKET, PACKET_FANOUT, &val, sizeof(val));
}

/*
 * Walk every packet in one ring block: print its position and timestamp,
 * then pass the packet bytes to rx->cb if a callback is set.
 * desc points to the block descriptor at the start of the block.
 */
static void do_block(rxring_t rx, struct tpacket_block_desc *desc)
{
    const uint8_t *ptr = (uint8_t *) desc + desc->hdr.bh1.offset_to_first_pkt;
    unsigned int num_pkts = desc->hdr.bh1.num_pkts;

    for (unsigned int i = 0; i < num_pkts; i++) {
        struct tpacket3_hdr *hdr = (struct tpacket3_hdr *) ptr;
        /* Nanoseconds are zero-padded to nine digits so the output reads as
         * a decimal fraction of seconds (5 ms -> ".005000000"). */
        printf("packet %u/%u %u.%09u\n", i, num_pkts, hdr->tp_sec,
               hdr->tp_nsec);

        /* packet */
        if (rx->cb)
            (*rx->cb)(rx->user, ptr + hdr->tp_mac, hdr->tp_snaplen);

        /* Each header records the byte offset to the next one. */
        ptr += hdr->tp_next_offset;
        __sync_synchronize();
    }
}

/*
 * Receive loop: wait for the current block to be handed to user space,
 * process its packets, return the block to the kernel and move on to the
 * next block, wrapping around after nr_blocks. Runs until rx->cancel is set.
 */
void rxring_mainloop(rxring_t rx)
{
    struct pollfd pfd = {
        .fd = rx->fd,
        .events = POLLIN | POLLERR,
        .revents = 0,
    };
    while (!rx->cancel) {
        /* Blocks are block_sz bytes apart, so the offset must be applied to
         * the byte pointer before converting to the descriptor type. */
        uint8_t *block = rx->map + (size_t) rx->r_idx * rx->block_sz;
        struct tpacket_block_desc *desc = (struct tpacket_block_desc *) block;

        while (!(desc->hdr.bh1.block_status & TP_STATUS_USER)) {
            /* Also honour cancellation while idle, e.g. after poll() was
             * interrupted. */
            if (rx->cancel)
                return;
            poll(&pfd, 1, -1);
        }

        /* walk block */
        do_block(rx, desc);

        /* Hand the block back to the kernel. */
        desc->hdr.bh1.block_status = TP_STATUS_KERNEL;
        __sync_synchronize();
        rx->r_idx = (rx->r_idx + 1) % rx->nr_blocks;
    }
}

/* Tear down a ring created by rxring_init(): unmap the ring, close the
 * socket and release the handle. A NULL handle is ignored. */
void rxring_free(rxring_t rx)
{
    if (rx) {
        munmap(rx->map, rx->map_sz);
        close(rx->fd);
        free(rx);
    }
}

#include <ctype.h>
/*
 * Write a dump of len bytes at tmp to stream f, llen bytes per row
 * (16 when llen is 0). Each row shows the hexadecimal offset, the bytes as
 * characters ('.' for non-printable ones, padded to the row width) and the
 * bytes as two-digit hex values. A blank line ends the dump.
 * Does nothing when f is NULL or len is 0.
 */
static void hex_dumpf(FILE *f, const uint8_t *tmp, size_t len, size_t llen)
{
    if (f == NULL || len == 0)
        return;

    const size_t width = llen ? llen : 0x10;

    for (size_t off = 0; off < len;) {
        const uint8_t *row = tmp + off;
        size_t count = width;
        if (off + width > len)
            count = len - off; /* short final row */

        fprintf(f, " | %05zx : ", off);

        /* Character column, padded with spaces up to the row width. */
        size_t col = 0;
        while (col < count) {
            fprintf(f, "%c", isprint(row[col]) ? row[col] : '.');
            col++;
        }
        while (col < width) {
            fprintf(f, " ");
            col++;
        }

        /* Hex column. */
        for (col = 0; col < count; col++)
            fprintf(f, " %02x", row[col]);
        fprintf(f, "\n");

        off += count;
    }
    fprintf(f, "\n");
}

/* Packet callback used by main(): hex-dumps the packet to stdout with the
 * default row width. The user pointer is not used. Always returns 1. */
static int cb(void *u, const uint8_t *buf, size_t len)
{
    (void) u;
    hex_dumpf(stdout, buf, len, 0);
    return 1;
}

/*
 * Usage: <program> <ifname>
 * Opens a receive ring on the given interface, joins fanout group 0x1234 and
 * dumps every received packet until the main loop returns.
 * Exits with EXIT_FAILURE on a usage error or when setup fails.
 */
int main(int argc, char **argv)
{
    const char *cmd = argv[0];
    if (argc < 2) {
        fprintf(stderr, "Usage:\n\t%s <ifname>\n\n", cmd);
        return EXIT_FAILURE;
    }

    rxring_t rx = rxring_init(argv[1], cb, NULL);
    if (!rx)
        return EXIT_FAILURE;
    if (!rxring_fanout_hash(rx, 0x1234)) {
        /* Release the mapping, socket and handle instead of leaking them. */
        rxring_free(rx);
        return EXIT_FAILURE;
    }

    rxring_mainloop(rx);

    printf("%s: OK\n", cmd);
    rxring_free(rx);

    return EXIT_SUCCESS;
}

參考執行輸出:

$ sudo ./rxtest eth0
mapping 32 MiB ring buffer
packet 0/2 1652933832.465103535
 | 00000 : RU....RUU..g..E. 52 55 c0 a8 05 02 52 55 55 0a b2 67 08 00 45 08
 | 00010 : .l.W@.@......... 00 6c c6 57 40 00 40 06 e8 ca c0 a8 05 0f c0 a8
 | 00020 : ............~.P. 05 02 00 16 e7 90 bb a3 8a fe 00 08 7e ab 50 18
 | 00030 : ..T......0..!..Z ff ff 54 fd 00 00 00 00 00 30 12 d6 21 e7 ad 5a
 | 00040 : ...v..\...W0:... 0d 8f db 76 12 ac 5c e8 8f 17 57 30 3a 8f ad ac
 | 00050 : VB.J.<..<.X.pX.. 56 42 fb 4a e7 3c e4 0b 3c 8e 58 db 70 58 90 18
 | 00060 : .|...."...[+.c.. dc 7c 97 a5 a9 0e 22 2e f6 be 5b 2b f7 63 9c d3
 | 00070 : vtV..... .       76 74 56 b7 b4 93 9d d8 20 fd

packet 1/2 1652933832.465293157
 | 00000 : RUU..gRU......E. 52 55 55 0a b2 67 52 55 c0 a8 05 02 08 00 45 00
 | 00010 : .(.<..@..1...... 00 28 f2 3c 00 00 40 06 fd 31 c0 a8 05 02 c0 a8
 | 00020 : ........~....BP. 05 0f e7 90 00 16 00 08 7e ab bb a3 8b 42 50 10
 | 00030 : ..w2..           ff ff 77 32 00 00

請補完程式碼,使其符合預期。作答規範:

  • RRRR 和 FFFF 為巨集或巨集的組合表示式 (搭配 bitwise 操作)