2021q1 Week 7 Quiz: Problem 4
Quiz overview
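The provided mbus.c is a purpose-built message bus (mbus is short for message bus). It lets a thread register a callback function (callback, or cb for short) with mbus in advance; another thread can then send a message through mbus to the thread that registered. The flow is as follows: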
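Thread 2 first calls bus_register to register its callback function. The callback is user-defined and specifies what to do when another thread asks to send a message to Thread 2. Thread 1 then calls bus_send to request that a message be sent to Thread 2. At that point execution moves from Thread 1's own code into Thread 2's callback function (note: this is what makes mbus unusual), so Thread 2's callback function is what actually handles messages sent to Thread 2.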
A brief description of the project follows:
mbus implements a concurrent data structure. Independent clients can register their callback functions against this bus to receive messages directed to them. Any user with a reference to the bus can send messages to a registered client by just knowing the ID of the destination client. Users can also communicate with all of these clients at once through broadcast messaging.
Even though separate threads can register their own callbacks independently, when Thread 1 sends Thread 2 a message, Thread 2's callback function is executed in Thread 1's context. The only way to execute Thread 2's callback function in that same thread would be to use some sort of low-level interrupt, which might break the callee's execution flow.
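The point about execution context can be checked with a small experiment. The following is a minimal sketch, not part of mbus: it assumes a hypothetical one-slot "bus" (the names demo_cb, demo_sender and demo_callback_runs_in_sender are invented for illustration) and records which thread actually runs the callback.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical one-slot "bus": a single callback pointer. */
typedef void (*demo_cb_t)(void *ctx, void *msg);

static demo_cb_t demo_cb;         /* filled in by the registering thread */
static pthread_t demo_registrant; /* thread that registered the callback */
static pthread_t demo_ran_on;     /* thread that actually ran it */

static void demo_callback(void *ctx, void *msg)
{
    (void) ctx;
    (void) msg;
    demo_ran_on = pthread_self();
}

/* Sender thread: "sending" is an ordinary call through the pointer. */
static void *demo_sender(void *result)
{
    int msg = 42;
    assert(demo_cb != NULL);
    demo_cb(NULL, &msg);
    *(bool *) result = pthread_equal(demo_ran_on, pthread_self()) &&
                       !pthread_equal(demo_ran_on, demo_registrant);
    return NULL;
}

/* Returns true if the callback ran on the sender thread and not on the
 * thread that registered it. */
static bool demo_callback_runs_in_sender(void)
{
    bool result = false;
    pthread_t sender;

    demo_registrant = pthread_self();
    demo_cb = demo_callback; /* registration happens on this thread */
    if (pthread_create(&sender, NULL, demo_sender, &result))
        return false;
    if (pthread_join(sender, NULL))
        return false;
    return result;
}
```

Calling demo_callback_runs_in_sender() from main should return true: the registering thread only supplies the function pointer, and the sender is the one that executes it.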
bus_send has two modes of operation: individual and broadcast. Individual calls a single client's registered callback. Broadcast does the same, but for every registered client.
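The two modes can be illustrated without any concurrency. The sketch below is a simplified, single-threaded stand-in (sketch_client_t, sketch_send and sketch_add are hypothetical names, and the atomics used by mbus are deliberately left out); it only shows how one entry point can serve both individual and broadcast delivery.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef void (*sketch_cb_t)(void *ctx, void *msg);

typedef struct {
    sketch_cb_t callback; /* NULL means "not registered" */
    void *ctx;
} sketch_client_t;

/* Call one client's callback if it is registered. */
static bool sketch_deliver(sketch_client_t *c, void *msg)
{
    if (!c->callback)
        return false;
    c->callback(c->ctx, msg);
    return true;
}

/* Individual mode targets clients[id]; broadcast mode ignores id and
 * visits every slot, skipping unregistered ones. */
static bool sketch_send(sketch_client_t *clients, unsigned int n,
                        unsigned int id, void *msg, bool broadcast)
{
    assert(clients != NULL);
    if (broadcast) {
        for (id = 0; id < n; ++id)
            sketch_deliver(&clients[id], msg);
        return true;
    }
    if (id >= n)
        return false;
    return sketch_deliver(&clients[id], msg);
}

/* Example callback: add the message value to a per-client counter. */
static void sketch_add(void *ctx, void *msg)
{
    *(int *) ctx += *(int *) msg;
}
```

As in the mbus listing, broadcast reports success even when some slots are empty, while an individual send to an empty or out-of-range slot reports failure.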
If a client's callback is set (meaning that it is a registered client), it is called with the client's context and the message. The client's context is a pointer to opaque data that is passed to the callback; it is usually owned by whoever registered that callback. Reading the client is atomic, so there is no race in which the client is copied from memory while its information is being freed; in other words, mbus guarantees that right after calling __atomic_load, client contains valid data.
bus_unregister deletes a client's entry in the bus->clients array. This way, no more messages can be sent to it, since the callback will be set to NULL.
The store is atomic, meaning that it cannot happen at the same time as the load seen in bus_send.
mbus is implemented with GCC Memory Model Aware Atomic Operations. The code listing follows:
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define BUS_DEFAULT_CLIENTS 128
#define BUS_MAX_CLIENTS UINT_MAX
typedef unsigned int bus_client_id_t;
typedef void (*bus_client_cb_t)(void *ctx, void *msg);

#define CAS(dst, expected, value)                                        \
    __atomic_compare_exchange(dst, expected, value, 0, __ATOMIC_SEQ_CST, \
                              __ATOMIC_SEQ_CST)

typedef struct {
    bool registered;
    unsigned int refcnt;
    bus_client_cb_t callback;
    void *ctx;
} bus_client_t;

typedef struct {
    bus_client_t *clients;
    const unsigned int n_clients;
} bus_t;

/* Allocate a bus with n_clients slots (BUS_DEFAULT_CLIENTS if 0). */
bool __attribute__((warn_unused_result))
bus_new(bus_t **bus, unsigned int n_clients)
{
    if (n_clients > BUS_MAX_CLIENTS)
        return false;

    bus_t *b;
    if (!(b = malloc(sizeof(bus_t))))
        return false;

    /* n_clients is const in bus_t; cast it away once for initialization. */
    *(unsigned int *) &b->n_clients =
        !n_clients ? BUS_DEFAULT_CLIENTS : n_clients;
    if (!(b->clients = calloc(b->n_clients, sizeof(bus_client_t)))) {
        free(b);
        return false;
    }

    *bus = b;
    return true;
}

/* Register a callback in slot id; fails if the slot is already taken. */
bool __attribute__((warn_unused_result, nonnull(1)))
bus_register(bus_t *bus,
             bus_client_id_t id,
             bus_client_cb_t callback,
             void *ctx)
{
    if (id >= bus->n_clients)
        return false;

    bus_client_t null_client = {0};
    bus_client_t new_client = {
        .registered = true,
        .callback = callback,
        .ctx = ctx,
        .refcnt = 0,
    };

    /* Succeeds only if the slot still holds an all-zero (empty) client. */
    return (bool) CAS(&(bus->clients[id]), &null_client, &new_client);
}

static bool execute_client_callback(bus_client_t *client, void *msg)
{
    bus_client_t local_client;
    __atomic_load(client, &local_client, __ATOMIC_SEQ_CST);
    while (local_client.registered) {
        bus_client_t new_client = local_client;
        ++(new_client.refcnt);
        XXXXX;
    }
    return false;
}

bool __attribute__((warn_unused_result, nonnull(1)))
bus_send(bus_t *bus, bus_client_id_t id, void *msg, bool broadcast)
{
    if (broadcast) {
        for (id = 0; id < bus->n_clients; ++id)
            execute_client_callback(&(bus->clients[id]), msg);
        return true;
    }
    if (id >= bus->n_clients)
        return false;
    return execute_client_callback(&(bus->clients[id]), msg);
}

/* Clear slot id once no callback is in flight (refcnt == 0). */
bool __attribute__((warn_unused_result, nonnull(1)))
bus_unregister(bus_t *bus, bus_client_id_t id)
{
    if (id >= bus->n_clients)
        return false;

    bus_client_t local_client, null_client = {0};
    __atomic_load(&(bus->clients[id]), &local_client, __ATOMIC_SEQ_CST);
    if (!local_client.registered)
        return false;

    do {
        /* Expect an idle client; on failure CAS refreshes local_client. */
        local_client.refcnt = 0;
        if (CAS(&(bus->clients[id]), &local_client, &null_client))
            return true;
    } while (local_client.registered);
    return true;
}

void bus_free(bus_t *bus)
{
    if (!bus)
        return;
    free(bus->clients);
    free(bus);
}

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define NUM_THREADS 4

typedef struct {
    bus_t *bus;
    unsigned int id;
} thread_data_t;

static void bus_callback(void *_ctx, void *_msg)
{
    unsigned int ctx = *(unsigned int *) _ctx, msg = *(unsigned int *) _msg;
    printf("Callback for thread %u received: %u\n", ctx, msg);
}

static void *thread_func(void *_data)
{
    thread_data_t *data = (thread_data_t *) _data;
    bus_client_id_t dest = (data->id + 1) % NUM_THREADS;

    if (!bus_register(data->bus, data->id, &bus_callback, &(data->id))) {
        perror("bus_register");
        return NULL;
    }
    printf("Registered callback from thread %u\n", data->id);

    /* Spin until bus_send reports success. */
    while (!bus_send(data->bus, dest, &(data->id), false))
        ;

    if (bus_unregister(data->bus, dest))
        return NULL;
    return NULL;
}

int main()
{
    pthread_t threads[NUM_THREADS];
    thread_data_t ctx[NUM_THREADS];

    bus_t *bus;
    if (!bus_new(&bus, 0)) {
        perror("bus_new");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < NUM_THREADS; ++i) {
        ctx[i].bus = bus, ctx[i].id = i;
        if (pthread_create(&threads[i], NULL, thread_func, &ctx[i]))
            perror("pthread_create");
    }

    for (int i = 0; i < NUM_THREADS; ++i) {
        if (pthread_join(threads[i], NULL))
            perror("pthread_join");
    }

    bus_free(bus);
    return 0;
}
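Compile and link mbus.c with GCC, enabling POSIX threads (-pthread); linking libatomic (-latomic) may also be needed because the atomic builtins operate on a multi-word struct.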
Sample output (figure not available). The order of the register and callback lines may vary between runs.
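Based on the program's behavior and the comments above, complete the code. In your answer, give the complete body of static bool execute_client_callback(bus_client_t *client, void *msg) and answer the following questions: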
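- What problem does the seemingly awkward callback function design in mbus.c solve?
- Since the number of messages to process may be large, we can create a thread pool in advance and use processor affinity so that threads interfere with each other as little as possible while running callback functions. Briefly describe how you would modify the code to achieve this; include key code where appropriate.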
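Follow-up questions:
- Explain how the code above works, especially its use of atomics, and discuss its execution cost compared with a lock-based approach (using pthread_mutex_lock).
- In the Linux kernel source, MHI bus and KBUS are both message bus implementations. Briefly describe their design considerations and use cases.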
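As a starting point for the atomics versus lock-based comparison, the two update styles can be placed side by side. The sketch below is an assumption-laden illustration, not a benchmark: the counters and the names bump_atomic, bump_locked and run_bumpers are hypothetical, and any timing or cost measurement is left to the reader.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAX_BUMPERS 8
#define BUMPS_PER_THREAD 10000

static unsigned int atomic_count;
static unsigned int locked_count;
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock-free: retry the CAS until no other thread raced with us. */
static void bump_atomic(void)
{
    unsigned int old = __atomic_load_n(&atomic_count, __ATOMIC_SEQ_CST);
    while (!__atomic_compare_exchange_n(&atomic_count, &old, old + 1, 0,
                                        __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
        ; /* old was refreshed with the current value; try again */
}

/* Lock-based: every update takes and releases the mutex. */
static void bump_locked(void)
{
    pthread_mutex_lock(&count_lock);
    ++locked_count;
    pthread_mutex_unlock(&count_lock);
}

static void *bump_worker(void *unused)
{
    (void) unused;
    for (int i = 0; i < BUMPS_PER_THREAD; ++i) {
        bump_atomic();
        bump_locked();
    }
    return NULL;
}

/* Run n worker threads to completion; returns 0 on success. */
static int run_bumpers(int n)
{
    pthread_t tid[MAX_BUMPERS];
    int started = 0, rc = 0;

    assert(n > 0 && n <= MAX_BUMPERS);
    for (; started < n; ++started) {
        if (pthread_create(&tid[started], NULL, bump_worker, NULL)) {
            rc = -1;
            break;
        }
    }
    for (int i = 0; i < started; ++i)
        pthread_join(tid[i], NULL);
    return rc;
}
```

Both counters end at the same value; the difference lies in how contention is handled. The CAS loop retries in user space, whereas a contended mutex may put the thread to sleep, which is the kind of cost the question asks about.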