# 2018q1 第 15 週測驗題
### 測驗 `1`
延伸 [第 14 週測驗題](https://hackmd.io/s/ByN53wcy7),考慮以下 C 程式,實作了符合 [lock-free 特性](http://preshing.com/20120612/an-introduction-to-lock-free-programming/) 的多個 producer、單一 consumer 的 queue:
```C=
/* sample lockless Single Consumer Multiple Producer FIFO queue */
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
typedef int LFQueueResult;
enum {
LF_QUEUE_RESULT_TRUE = 2,
LF_QUEUE_RESULT_SUCCESS = 1,
LF_QUEUE_RESULT_FALSE = 0,
LF_QUEUE_RESULT_OUT_OF_MEMORY = -1,
};
typedef struct LFQueueNode { atomic_uintptr_t next; } LFQueueNode;
typedef struct LFQueue {
atomic_uintptr_t head;
atomic_uintptr_t tail;
size_t item_size;
} LFQueue;
LFQueueResult lfqueue_init(LFQueue *lfqueue, size_t item_size) {
assert(lfqueue);
atomic_init(&lfqueue->head, NULL); atomic_init(&lfqueue->tail, NULL);
lfqueue->item_size = item_size;
return LF_QUEUE_RESULT_SUCCESS;
}
LFQueueResult lfqueue_has_front(LFQueue *lfqueue) {
assert(lfqueue);
if (ZZ0 (&lfqueue->head) == 0) return LF_QUEUE_RESULT_FALSE;
return LF_QUEUE_RESULT_TRUE;
}
// gets the value at the front of the queue; consumer operation
LFQueueResult lfqueue_front(LFQueue *lfqueue, void *data) {
assert(lfqueue); assert(data);
LFQueueNode *head = (LFQueueNode *) ZZ1 (&lfqueue->head);
assert(head);
memcpy(data, (void *) (head + 1), lfqueue->item_size);
return LF_QUEUE_RESULT_SUCCESS;
}
// removes the item at the front of the queue; consumer operation
LFQueueResult lfqueue_pop(LFQueue *lfqueue) {
assert(lfqueue);
assert(lfqueue_has_front(lfqueue) == LF_QUEUE_RESULT_TRUE);
// get the head
LFQueueNode *popped = atomic_load(&lfqueue->head);
LFQueueNode *compare = popped;
// set the tail and head to nothing if they are the same
if (atomic_compare_exchange_strong(&lfqueue->tail, &compare, 0)) {
compare = popped;
// its possible for another thread to have pushed after we swap out the
// tail, in this case the head will be different then what was popped,
// so we just do a blind exchange, not caring about the result
ZZ2 (&lfqueue->head, &compare, 0);
} else {
// tail is different from head, set the head to the next value
LFQueueNode *new_head = 0;
while (!new_head) {
// its possible that the next node hasn't been assigned yet, so just
// spin until the pushing thread stores the value
new_head = (LFQueueNode *) atomic_load(&popped->next);
}
ZZ3 (&lfqueue->head, new_head);
}
// delete the popped node
free(popped);
return LF_QUEUE_RESULT_SUCCESS;
}
// adds an item to the back of the queue; producer operation
LFQueueResult lfqueue_push(LFQueue *lfqueue, void *data) {
assert(lfqueue);
// create the new tail
LFQueueNode *new_tail = malloc(sizeof(LFQueueNode) + lfqueue->item_size);
if (!new_tail) return LF_QUEUE_RESULT_OUT_OF_MEMORY;
atomic_init(&new_tail->next, 0);
memcpy(new_tail + 1, data, lfqueue->item_size);
// swap the new tail with the old
LFQueueNode *old_tail =
(LFQueueNode *) ZZ4 (&lfqueue->tail, new_tail);
// link the old tail to the new
if (old_tail) {
atomic_store(&old_tail->next, new_tail);
} else {
atomic_store(&lfqueue->head, new_tail);
}
return LF_QUEUE_RESULT_SUCCESS;
}
// clears the entire queue; consumer operation
LFQueueResult lfqueue_clear(LFQueue *lfqueue) {
assert(lfqueue);
// pop everything
while (lfqueue_has_front(lfqueue) == LF_QUEUE_RESULT_TRUE) {
LFQueueResult result = lfqueue_pop(lfqueue);
assert(result == LF_QUEUE_RESULT_SUCCESS);
}
return LF_QUEUE_RESULT_SUCCESS;
}
#include <pthread.h>
typedef struct LFQueueTestThreadSafety {
atomic_int consume_count, producer_count;
LFQueue lfqueue;
} LFQueueTestThreadSafety;
#define LF_QUEUE_TEST_THREAD_SAFETY_COUNT 50000
#define LF_QUEUE_TEST_THREAD_PRODUCER_COUNT 7
static void *test_thread_safety_consumer(void *arg) {
LFQueueTestThreadSafety *test = (LFQueueTestThreadSafety *) arg;
while (atomic_load(&test->consume_count) <
LF_QUEUE_TEST_THREAD_SAFETY_COUNT) {
if (lfqueue_has_front(&test->lfqueue)) {
ZZ5 (&test->consume_count, 1);
LFQueueResult result = lfqueue_pop(&test->lfqueue);
assert(result == LF_QUEUE_RESULT_SUCCESS);
}
}
return NULL;
}
static void *test_thread_safety_producer(void *arg) {
LFQueueTestThreadSafety *test = (LFQueueTestThreadSafety *) arg;
while (1) {
int in = atomic_fetch_add(&test->producer_count, 1);
if (in >= LF_QUEUE_TEST_THREAD_SAFETY_COUNT) break;
LFQueueResult result = lfqueue_push(&test->lfqueue, &in);
assert(result == LF_QUEUE_RESULT_SUCCESS);
}
return NULL;
}
int main(int argc, const char *argv[]) {
LFQueueTestThreadSafety test;
atomic_init(&test.consume_count, 0);
atomic_init(&test.producer_count, 0);
{
LFQueueResult result = lfqueue_init(&test.lfqueue, sizeof(int));
assert(result == LF_QUEUE_RESULT_SUCCESS);
}
// create threads
pthread_t consumer;
pthread_t producers[LF_QUEUE_TEST_THREAD_PRODUCER_COUNT];
{
int p_result =
pthread_create(&consumer, NULL, test_thread_safety_consumer, &test);
assert(p_result == 0);
}
for (size_t i = 0; i < LF_QUEUE_TEST_THREAD_PRODUCER_COUNT; ++i) {
int p_result = pthread_create(&producers[i], NULL,
test_thread_safety_producer, &test);
assert(p_result == 0);
}
// wait for everything to finish
for (size_t i = 0; i < LF_QUEUE_TEST_THREAD_PRODUCER_COUNT; ++i) {
int p_result = pthread_join(producers[i], NULL);
assert(p_result == 0);
}
{
int p_result = pthread_join(consumer, NULL);
assert(p_result == 0);
}
assert(lfqueue_has_front(&test.lfqueue) == LF_QUEUE_RESULT_FALSE);
return 0;
}
```
補完以上程式碼。
==作答區==
ZZ0 = ? (第 32 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
ZZ1 = ? (第 39 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
ZZ2 = ? (第 58 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
ZZ3 = ? (第 67 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
ZZ4 = ? (第 84 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
ZZ5 = ? (第 120 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add
:::info
延伸題目: 透過這樣的 lock-free queue 改寫網頁伺服器程式碼,使其得以回應多個 HTTP request
:::