2021q1 Week 7 Quiz: Problem 4

tags: linux2021

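Quiz overview

This problem tests students' understanding of concurrent programming.

The given mbus.c is a purpose-built message bus (mbus is short for message bus). It lets a thread register a callback function (callback, or cb for short) with the mbus in advance; another thread can then send a message through the mbus to the registered thread. The diagram below illustrates this: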

               +----------+    +-----------------+
               | Thread 1 |    | callbackThread2 |
               +----+-----+    +------------+----+
                    |                ^      |
bus_send(bus,       |                |      +--- queue_push(ctx->queue, msg)
         idThread2, |                |
         msg)       |                |
                    |      +---------+  callbackThread2(ctxThread2, msg)
                    v      |
         +-----------------+-------------------+
         |                mbus                 |
         +-------------------------------------+
               ^    :      :
               | bus_register(bus, idThread2,
               |    :      :  callbackThread2, ctxThread2)
         +-----+------+    :
         |  Thread 2  +--------------> queue_pop(ctx->queue)
         +------------+  loops over
               :    :      :
               :    :      :                                          Time
        -------*----*------*----------------------------------------------->

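Thread 2 first calls bus_register to register its callback function. The callback is user-defined and specifies what should happen when another thread asks to send a message to Thread 2. Thread 1 then calls bus_send to send a message to Thread 2, at which point control passes from Thread 1's own code into Thread 2's callback function (note: this is what makes mbus unusual!). As a result, Thread 2's callback function is what actually handles the messages sent to Thread 2.

Below is the project's brief documentation: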

mbus implements a concurrent data structure. Independent clients can register their callback functions against this bus to receive messages directed to them. Any user with a reference to the bus can send messages to a registered client by just knowing the ID of the destination client. Users can also communicate with all of these clients at once through broadcast messaging.

Even though separate threads can register their own callbacks independently, when Thread 1 sends Thread 2 a message, Thread 2's callback function is executed in Thread 1's context. The only way to execute Thread 2's callback function in Thread 2 itself would be to use some sort of low-level interrupt, which might break the callee's execution flow.
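The claim about execution context can be checked with a small sketch. It does not use mbus itself: the single global slot stands in for one bus entry, and the names `slot_cb`, `registrar`, `record_thread_cb`, and `callback_runs_on_sender` are hypothetical, introduced only for this illustration.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef void (*cb_t)(void *ctx, void *msg);

/* Hypothetical single-slot "bus": one callback plus its context. */
static cb_t slot_cb;
static void *slot_ctx;

/* Remembers which thread actually executed the callback. */
static pthread_t ran_on;

void record_thread_cb(void *ctx, void *msg)
{
    (void) ctx;
    (void) msg;
    ran_on = pthread_self();
}

/* Runs in a helper thread: registers the callback, then exits. */
void *registrar(void *arg)
{
    (void) arg;
    slot_cb = record_thread_cb;
    slot_ctx = NULL;
    return NULL;
}

/* Returns true if the callback registered by the helper thread is
 * executed on the calling (sending) thread.
 */
bool callback_runs_on_sender(void)
{
    pthread_t helper;
    if (pthread_create(&helper, NULL, registrar, NULL))
        return false;
    /* Joining orders the helper's writes before our reads, so plain
     * globals are enough for this sketch.
     */
    pthread_join(helper, NULL);

    int msg = 42;
    slot_cb(slot_ctx, &msg); /* "sending" is just a function call */
    return pthread_equal(ran_on, pthread_self()) != 0;
}
```

Calling `callback_runs_on_sender()` from any thread is expected to return true: the helper thread only stored a function pointer, and the sender is the one that runs it.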

bus_send has two modes of operation, individual and broadcast. Individual calls a single client's registered callback. Broadcast does the same, but for every registered client.

bus_send first reads the destination client from the bus. If the client's callback is set (meaning that it is a registered client), the callback is called with the client's context and the message. The client's context is a pointer to some opaque data that will be passed to the callback; it is usually owned by whoever registered that callback. Reading the client is atomic, meaning that there is no race condition in which the client is copied from memory while its information is being freed; in other words, we are guaranteed that right after calling __atomic_load, client contains valid data.
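To make the "atomic read of a whole client" idea concrete, here is a minimal sketch using the same generic GCC builtins. The record type `mini_client_t` and the helpers `mini_publish` and `mini_snapshot` are assumptions made for this example; the record is deliberately shrunk to 8 bytes so that common 64-bit targets can access it with a single instruction, which is not the layout mbus.c uses.

```c
#include <stdint.h>

/* Hypothetical 8-byte client record (not the bus_client_t from mbus.c).
 * Two 32-bit fields leave no padding, and the explicit alignment lets the
 * compiler treat the record as one 64-bit atomic object.
 */
typedef struct {
    uint32_t registered;
    uint32_t refcnt;
} __attribute__((aligned(8))) mini_client_t;

/* Publish a whole record with one atomic store. */
void mini_publish(mini_client_t *slot, mini_client_t value)
{
    __atomic_store(slot, &value, __ATOMIC_SEQ_CST);
}

/* Take a snapshot of the whole record with one atomic load. A reader never
 * observes `registered` from one store mixed with `refcnt` from another.
 */
mini_client_t mini_snapshot(mini_client_t *slot)
{
    mini_client_t local;
    __atomic_load(slot, &local, __ATOMIC_SEQ_CST);
    return local;
}
```

The snapshot is a private copy, so the caller can inspect its fields without further synchronization; whether the slot still holds the same value afterwards is a separate question that only a compare-and-swap can answer.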

bus_unregister deletes a client's entry in the bus->clients array. This way, no more messages can be sent to it, since the callback will be set to NULL.

The store is atomic, meaning that it cannot happen at the same time as the load seen in bus_send.
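In the listing that follows, this store is performed with a compare-and-swap that expects a reference count of zero. A hedged sketch of that pattern on a simplified record is shown here; `slot_t` and `slot_try_unregister` are hypothetical names, and unlike bus_unregister this version makes a single attempt instead of looping.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical padding-free 8-byte record standing in for a bus client. */
typedef struct {
    uint32_t registered;
    uint32_t refcnt;
} __attribute__((aligned(8))) slot_t;

/* Try once to clear a slot, but only while no sender holds a reference.
 * Returns true if the slot is known to be empty afterwards, false if it
 * is still registered (for example because refcnt was non-zero).
 */
bool slot_try_unregister(slot_t *slot)
{
    slot_t expected, empty = {0, 0};
    __atomic_load(slot, &expected, __ATOMIC_SEQ_CST);
    if (!expected.registered)
        return true; /* nothing to do */

    expected.refcnt = 0; /* only accept "registered and idle" */
    if (__atomic_compare_exchange(slot, &expected, &empty, 0,
                                  __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
        return true;

    /* The CAS failed, so `expected` now holds the slot's current value. */
    return !expected.registered;
}
```

Because the comparison covers the whole record, a sender that has raised `refcnt` makes the exchange fail, and the slot is left untouched until the sender drops its reference. Note that a struct CAS compares raw bytes, so records with padding need extra care.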

mbus is implemented with GCC Memory Model Aware Atomic Operations. The code listing follows:

#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BUS_DEFAULT_CLIENTS 128
#define BUS_MAX_CLIENTS UINT_MAX

typedef unsigned int bus_client_id_t;
typedef void (*bus_client_cb_t)(void *ctx, void *msg);

/* FIXME: rewrite with <stdatomic.h> */
#define CAS(dst, expected, value)                                        \
    __atomic_compare_exchange(dst, expected, value, 0, __ATOMIC_SEQ_CST, \
                              __ATOMIC_SEQ_CST)

typedef struct {
    bool registered;
    unsigned int refcnt;
    bus_client_cb_t callback;
    void *ctx;
} bus_client_t;

typedef struct {
    bus_client_t *clients;
    const unsigned int n_clients;
} bus_t;

/*
 * Allocate a new bus. If @n_clients is non-zero, it allocates space for
 * specific number of clients; otherwise, it uses BUS_DEFAULT_CLIENTS.
 * @n_clients can not be greater than BUS_MAX_CLIENTS. Returns true on success.
 */
bool __attribute__((warn_unused_result))
bus_new(bus_t **bus, unsigned int n_clients)
{
    if (n_clients > BUS_MAX_CLIENTS)
        return false;

    bus_t *b;
    if (!(b = malloc(sizeof(bus_t))))
        return false;

    /* Initialize bus struct */
    *(unsigned int *) &b->n_clients =
        !n_clients ? BUS_DEFAULT_CLIENTS : n_clients;
    if (!(b->clients = calloc(b->n_clients, sizeof(bus_client_t)))) {
        free(b);
        return false;
    }

    *bus = b;
    return true;
}

/*
 * Register a new client with the specified @id.
 * The ID must satisfy 0 <= ID < n_clients and not be in use; otherwise the
 * function would fail. Whenever a message is sent to this client, @callback
 * will be called. The first argument for @callback is the user-supplied
 * context, @ctx (can be omitted by passing NULL). The second argument for
 * @callback will be the received message. Returns true on success.
 */
bool __attribute__((warn_unused_result, nonnull(1)))
bus_register(bus_t *bus,
             bus_client_id_t id,
             bus_client_cb_t callback,
             void *ctx)
{
    if (id >= bus->n_clients)
        return false;

    bus_client_t null_client = {0};
    bus_client_t new_client = {
        .registered = true,
        .callback = callback,
        .ctx = ctx,
        .refcnt = 0,
    };

    return (bool) CAS(&(bus->clients[id]), &null_client, &new_client);
}

/*
 * Attempt to call a client's callback function to send a message.
 * Might fail if such client gets unregistered while attempting to send message.
 */
static bool execute_client_callback(bus_client_t *client, void *msg)
{
    /* Load the client with which we are attempting to communicate. */
    bus_client_t local_client;
    __atomic_load(client, &local_client, __ATOMIC_SEQ_CST);

    /* Loop until reference count is updated or client becomes unregistered */
    while (local_client.registered) {
        /* The expected reference count is the current one + 1 */
        bus_client_t new_client = local_client;
        ++(new_client.refcnt);

        /* If CAS succeeds, the client had the expected reference count, and
         * we updated it successfully. If CAS fails, the client was updated
         * recently. The actual value is copied to local_client.
         */
        XXXXX /* submit your implementation */ ;
    }

    /* Client was not registered or got unregistered while we attempted to send
     * a message
     */
    return false;
}

/*
 * If @broadcast is set to false, it sends a message to the client with the
 * specified @id. If @broadcast is set to true, the message is sent to every
 * registered client, and the supplied ID is ignored. Returns true on success.
 */
bool __attribute__((warn_unused_result, nonnull(1)))
bus_send(bus_t *bus, bus_client_id_t id, void *msg, bool broadcast)
{
    if (broadcast) {
        for (id = 0; id < bus->n_clients; ++id)
            execute_client_callback(&(bus->clients[id]), msg);
        return true;
    }
    if (id >= bus->n_clients)
        return false;
    return execute_client_callback(&(bus->clients[id]), msg);
}

/*
 * Unregister the client with the specified @id. No additional calls can be
 * made to the specified client. Returns true on success.
 */
bool __attribute__((warn_unused_result, nonnull(1)))
bus_unregister(bus_t *bus, bus_client_id_t id)
{
    if (id >= bus->n_clients)
        return false;

    /* Load the client we are attempting to unregister */
    bus_client_t local_client, null_client = {0};
    __atomic_load(&(bus->clients[id]), &local_client, __ATOMIC_SEQ_CST);

    /* It was already unregistered */
    if (!local_client.registered)
        return false;

    do {
        local_client.refcnt = 0; /* the expected reference count */

        /* If CAS succeeds, the client had refcnt = 0 and got unregistered.
         * If CAS does not succeed, the value of the client gets copied into
         * local_client.
         */
        if (CAS(&(bus->clients[id]), &local_client, &null_client))
            return true;
    } while (local_client.registered);

    /* Someone else unregistered this client */
    return true;
}

/* Free the bus object */
void bus_free(bus_t *bus)
{
    if (!bus)
        return;
    free(bus->clients);
    free(bus);
}

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

#define NUM_THREADS 4

/* Data passed to each thread */
typedef struct {
    bus_t *bus;
    unsigned int id;
} thread_data_t;

/* Function to be called by the bus for each new message */
static void bus_callback(void *_ctx, void *_msg)
{
    unsigned int ctx = *(unsigned int *) _ctx, msg = *(unsigned int *) _msg;
    printf("Callback for thread %u received: %u\n", ctx, msg);
}

/* This function will be spawned NUM_THREADS times as a separate thread. */
static void *thread_func(void *_data)
{
    thread_data_t *data = (thread_data_t *) _data;
    bus_client_id_t dest = (data->id + 1) % NUM_THREADS;

    /* Register our callback */
    if (!bus_register(data->bus, data->id, &bus_callback, &(data->id))) {
        perror("bus_register");
        return NULL;
    }
    printf("Registered callback from thread %u\n", data->id);

    /* Loop until the destination is registered from a separate thread */
    while (!bus_send(data->bus, dest, &(data->id), false))
        ;

    if (bus_unregister(data->bus, dest))
        return NULL;

    return NULL;
}

int main()
{
    pthread_t threads[NUM_THREADS];
    thread_data_t ctx[NUM_THREADS];

    bus_t *bus;
    if (!bus_new(&bus, 0)) {
        perror("bus_new");
        exit(EXIT_FAILURE);
    }

    /* Launch threads, each with their own context containing a reference to the
     * bus and their ID
     */
    for (int i = 0; i < NUM_THREADS; ++i) {
        ctx[i].bus = bus, ctx[i].id = i;
        if (pthread_create(&threads[i], NULL, thread_func, &ctx[i]))
            perror("pthread_create");
    }

    /* Wait until completion */
    for (int i = 0; i < NUM_THREADS; ++i) {
        if (pthread_join(threads[i], NULL))
            perror("pthread_join");
    }

    bus_free(bus);

    return 0;
}

Compile and link mbus.c:

$ gcc -o mbus mbus.c -lpthread -latomic
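The `-latomic` flag is most likely needed because the client record is too large for a single hardware atomic instruction, so GCC lowers the generic `__atomic` builtins to calls into libatomic. The following sketch checks that assumption at compile time; `client_layout_t` is a copy of the bus_client_t layout under a different name, and both helper functions are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

/* Same field layout as bus_client_t in mbus.c, renamed for this sketch. */
typedef struct {
    bool registered;
    unsigned int refcnt;
    void (*callback)(void *ctx, void *msg);
    void *ctx;
} client_layout_t;

/* Size of the record; typically 24 bytes on 64-bit targets. */
size_t client_layout_size(void)
{
    return sizeof(client_layout_t);
}

/* True only if the compiler can always perform atomic operations on an
 * object of this size without falling back to a lock.
 */
bool client_is_always_lock_free(void)
{
    return __atomic_always_lock_free(sizeof(client_layout_t), 0);
}
```

On common targets `client_is_always_lock_free()` is expected to return false, which suggests that the atomics in mbus.c are serviced by library code rather than by single instructions. That is worth keeping in mind when comparing the cost against a mutex-based design.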

Reference output:

Registered callback from thread 0
Callback for thread 1 received: 0
Registered callback from thread 1
Callback for thread 2 received: 1
Registered callback from thread 2
Callback for thread 3 received: 2
Registered callback from thread 3
Callback for thread 0 received: 3

The order of the register/callback lines above may vary.

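Based on the program behavior and comments above, complete the code. In your answer, list the full body of static bool execute_client_callback(bus_client_t *client, void *msg) and answer the following questions: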

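  1. What problem does the seemingly awkward callback function design of mbus.c solve?
  2. Since the number of messages to process may be very large, we could create a thread pool in advance and use processor affinity so that threads interfere with each other as little as possible while executing callback functions. Briefly describe how you would modify the code to achieve this; include key code where appropriate.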
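As background for the second question, the sketch below shows one possible building block for processor affinity on Linux with glibc: pinning the calling thread to a single CPU. It is not a thread pool and not part of mbus.c; `first_allowed_cpu` and `pin_self_to_cpu` are hypothetical helper names, while `pthread_getaffinity_np` and `pthread_setaffinity_np` are the GNU extensions they rely on.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* for pthread_*affinity_np and the CPU_* macros */
#endif
#include <pthread.h>
#include <sched.h>

/* Return the lowest-numbered CPU the calling thread may run on,
 * or -1 if the affinity mask cannot be read.
 */
int first_allowed_cpu(void)
{
    cpu_set_t allowed;
    CPU_ZERO(&allowed);
    if (pthread_getaffinity_np(pthread_self(), sizeof(allowed), &allowed))
        return -1;
    for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu)
        if (CPU_ISSET(cpu, &allowed))
            return cpu;
    return -1;
}

/* Restrict the calling thread to exactly one CPU.
 * Returns 0 on success or an error number, like the pthread call it wraps.
 */
int pin_self_to_cpu(int cpu)
{
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
}
```

A worker in a pool could call `pin_self_to_cpu` once at startup with a CPU chosen by the pool, before it starts dequeuing messages. How work reaches the workers is left open here, since that is the design the question asks for.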

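Follow-up questions:

  1. Explain how the code above works, especially its use of atomics. You should discuss its execution cost relative to a lock-based approach (using pthread_mutex_lock).
  2. In the Linux kernel source code, MHI bus and KBUS are both message bus implementations. Briefly describe their design considerations and use cases.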