# 2018q1 第 15 週測驗題

### 測驗 `1`

延伸 [第 14 週測驗題](https://hackmd.io/s/ByN53wcy7),考慮以下 C 程式,實作了符合 [lock-free 特性](http://preshing.com/20120612/an-introduction-to-lock-free-programming/) 的多個 producer、單一 consumer 的 queue:

```C=
/*
 * Sample lockless Single Consumer Multiple Producer FIFO queue.
 *
 * ZZ0..ZZ5 are the quiz blanks: each one stands for one of the <stdatomic.h>
 * operations offered in the answer section and is left unfilled on purpose.
 * NOTE(review): the line numbers quoted in the answer section may not match
 * the layout of this listing -- locate each blank by its ZZn name.
 */
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

/* Result code returned by every lfqueue_* function below. */
typedef int LFQueueResult;
enum {
    LF_QUEUE_RESULT_TRUE = 2,           /* lfqueue_has_front(): head is non-zero */
    LF_QUEUE_RESULT_SUCCESS = 1,        /* the operation completed */
    LF_QUEUE_RESULT_FALSE = 0,          /* lfqueue_has_front(): head is zero */
    LF_QUEUE_RESULT_OUT_OF_MEMORY = -1, /* lfqueue_push(): malloc() failed */
};

/*
 * Node header. lfqueue_push() allocates sizeof(LFQueueNode) + item_size bytes
 * and copies the item to the address just past the header (node + 1).
 */
typedef struct LFQueueNode {
    atomic_uintptr_t next; /* address of the following node; 0 until linked */
} LFQueueNode;

/* Queue state shared by the producers and the consumer. */
typedef struct LFQueue {
    atomic_uintptr_t head; /* node read by the consumer; 0 when there is none */
    atomic_uintptr_t tail; /* node last installed by lfqueue_push(); 0 when none */
    size_t item_size;      /* bytes copied per item, fixed by lfqueue_init() */
} LFQueue;

/*
 * Prepares an empty queue whose items are item_size bytes each.
 * Always returns LF_QUEUE_RESULT_SUCCESS.
 *
 * NOTE(review): head and tail are atomic_uintptr_t but are initialised with
 * NULL, while the rest of the file uses 0 for "no node"; depending on how NULL
 * is defined this may draw a pointer-to-integer conversion diagnostic.
 */
LFQueueResult lfqueue_init(LFQueue *lfqueue, size_t item_size)
{
    assert(lfqueue);
    atomic_init(&lfqueue->head, NULL);
    atomic_init(&lfqueue->tail, NULL);
    lfqueue->item_size = item_size;
    return LF_QUEUE_RESULT_SUCCESS;
}

/*
 * Reports whether the queue currently has a head node.
 * Returns LF_QUEUE_RESULT_TRUE when the value obtained from head through ZZ0
 * is non-zero, and LF_QUEUE_RESULT_FALSE otherwise.
 */
LFQueueResult lfqueue_has_front(LFQueue *lfqueue)
{
    assert(lfqueue);
    if (ZZ0 (&lfqueue->head) == 0)
        return LF_QUEUE_RESULT_FALSE;
    return LF_QUEUE_RESULT_TRUE;
}

// gets the value at the front of the queue; consumer operation
//
// Copies item_size bytes from the payload stored right after the head node
// into data. The head is asserted to be non-null, so callers are expected to
// check lfqueue_has_front() first. Always returns LF_QUEUE_RESULT_SUCCESS.
LFQueueResult lfqueue_front(LFQueue *lfqueue, void *data)
{
    assert(lfqueue);
    assert(data);
    LFQueueNode *head = (LFQueueNode *) ZZ1 (&lfqueue->head);
    assert(head);
    memcpy(data, (void *) (head + 1), lfqueue->item_size);
    return LF_QUEUE_RESULT_SUCCESS;
}

// removes the item at the front of the queue; consumer operation
//
// Asserts that the queue has a front item, unlinks the head node and frees it
// (header and payload share one allocation). Always returns
// LF_QUEUE_RESULT_SUCCESS.
//
// NOTE(review): popped and compare are LFQueueNode pointers while head and
// tail are atomic_uintptr_t, so the load below converts an integer to a
// pointer without a cast and &compare is handed to the compare-exchange as the
// "expected" argument -- check the diagnostics your compiler emits here.
LFQueueResult lfqueue_pop(LFQueue *lfqueue)
{
    assert(lfqueue);
    assert(lfqueue_has_front(lfqueue) == LF_QUEUE_RESULT_TRUE);

    // get the head
    LFQueueNode *popped = atomic_load(&lfqueue->head);
    LFQueueNode *compare = popped;

    // set the tail and head to nothing if they are the same
    if (atomic_compare_exchange_strong(&lfqueue->tail, &compare, 0)) {
        // the compare-exchange above may have overwritten compare, restore it
        compare = popped;
        // it's possible for another thread to have pushed after we swap out the
        // tail, in this case the head will be different than what was popped,
        // so we just do a blind exchange, not caring about the result
        ZZ2 (&lfqueue->head, &compare, 0);
    } else {
        // tail is different from head, set the head to the next value
        LFQueueNode *new_head = 0;
        while (!new_head) {
            // it's possible that the next node hasn't been assigned yet, so just
            // spin until the pushing thread stores the value
            new_head = (LFQueueNode *) atomic_load(&popped->next);
        }
        ZZ3 (&lfqueue->head, new_head);
    }

    // delete the popped node
    free(popped);
    return LF_QUEUE_RESULT_SUCCESS;
}

// adds an item to the back of the queue; producer operation
//
// Allocates a node, copies item_size bytes from data into it and installs it
// as the new tail. Returns LF_QUEUE_RESULT_OUT_OF_MEMORY when malloc() fails
// (the queue is left untouched), LF_QUEUE_RESULT_SUCCESS otherwise.
//
// NOTE(review): new_tail is a pointer that is stored into atomic_uintptr_t
// objects without a cast -- same pointer/integer mixing as in lfqueue_pop().
LFQueueResult lfqueue_push(LFQueue *lfqueue, void *data)
{
    assert(lfqueue);

    // create the new tail
    LFQueueNode *new_tail = malloc(sizeof(LFQueueNode) + lfqueue->item_size);
    if (!new_tail)
        return LF_QUEUE_RESULT_OUT_OF_MEMORY;
    atomic_init(&new_tail->next, 0);
    // the payload lives directly behind the node header
    memcpy(new_tail + 1, data, lfqueue->item_size);

    // swap the new tail with the old
    LFQueueNode *old_tail = (LFQueueNode *) ZZ4 (&lfqueue->tail, new_tail);

    // link the old tail to the new
    if (old_tail) {
        atomic_store(&old_tail->next, new_tail);
    } else {
        // there was no previous tail, so the new node also becomes the head
        atomic_store(&lfqueue->head, new_tail);
    }
    return LF_QUEUE_RESULT_SUCCESS;
}

// clears the entire queue; consumer operation
//
// Pops until lfqueue_has_front() stops reporting LF_QUEUE_RESULT_TRUE.
// Always returns LF_QUEUE_RESULT_SUCCESS.
LFQueueResult lfqueue_clear(LFQueue *lfqueue)
{
    assert(lfqueue);

    // pop everything
    while (lfqueue_has_front(lfqueue) == LF_QUEUE_RESULT_TRUE) {
        LFQueueResult result = lfqueue_pop(lfqueue);
        assert(result == LF_QUEUE_RESULT_SUCCESS);
    }
    return LF_QUEUE_RESULT_SUCCESS;
}

#include <pthread.h>

/* State shared by all threads of the thread-safety test. */
typedef struct LFQueueTestThreadSafety {
    atomic_int consume_count, producer_count;
    LFQueue lfqueue;
} LFQueueTestThreadSafety;

/* Bound checked by the consumer loop and by every producer. */
#define LF_QUEUE_TEST_THREAD_SAFETY_COUNT 50000
/* Number of producer threads started by main(). */
#define LF_QUEUE_TEST_THREAD_PRODUCER_COUNT 7

/*
 * Consumer thread body. Loops until consume_count reaches
 * LF_QUEUE_TEST_THREAD_SAFETY_COUNT; whenever the queue has a front item it
 * applies ZZ5 to consume_count with operand 1 and pops that item.
 * arg points to the shared LFQueueTestThreadSafety. Always returns NULL.
 */
static void *test_thread_safety_consumer(void *arg)
{
    LFQueueTestThreadSafety *test = (LFQueueTestThreadSafety *) arg;
    while (atomic_load(&test->consume_count) <
           LF_QUEUE_TEST_THREAD_SAFETY_COUNT) {
        if (lfqueue_has_front(&test->lfqueue)) {
            ZZ5 (&test->consume_count, 1);
            LFQueueResult result = lfqueue_pop(&test->lfqueue);
            assert(result == LF_QUEUE_RESULT_SUCCESS);
        }
    }
    return NULL;
}

/*
 * Producer thread body. Repeatedly claims the next number from producer_count
 * and pushes it as an int item; stops as soon as the claimed number reaches
 * LF_QUEUE_TEST_THREAD_SAFETY_COUNT.
 * arg points to the shared LFQueueTestThreadSafety. Always returns NULL.
 */
static void *test_thread_safety_producer(void *arg)
{
    LFQueueTestThreadSafety *test = (LFQueueTestThreadSafety *) arg;
    while (1) {
        int in = atomic_fetch_add(&test->producer_count, 1);
        if (in >= LF_QUEUE_TEST_THREAD_SAFETY_COUNT)
            break;
        LFQueueResult result = lfqueue_push(&test->lfqueue, &in);
        assert(result == LF_QUEUE_RESULT_SUCCESS);
    }
    return NULL;
}

/*
 * Test driver: starts one consumer and LF_QUEUE_TEST_THREAD_PRODUCER_COUNT
 * producers on a queue of int items, joins all of them and finally asserts
 * that the queue is empty. argc and argv are not used. Returns 0.
 *
 * NOTE(review): every check is an assert(), so building with NDEBUG removes
 * all error checking from this test.
 */
int main(int argc, const char *argv[])
{
    LFQueueTestThreadSafety test;
    atomic_init(&test.consume_count, 0);
    atomic_init(&test.producer_count, 0);
    {
        LFQueueResult result = lfqueue_init(&test.lfqueue, sizeof(int));
        assert(result == LF_QUEUE_RESULT_SUCCESS);
    }

    // create threads
    pthread_t consumer;
    pthread_t producers[LF_QUEUE_TEST_THREAD_PRODUCER_COUNT];
    {
        int p_result = pthread_create(&consumer, NULL,
                                      test_thread_safety_consumer, &test);
        assert(p_result == 0);
    }
    for (size_t i = 0; i < LF_QUEUE_TEST_THREAD_PRODUCER_COUNT; ++i) {
        int p_result = pthread_create(&producers[i], NULL,
                                      test_thread_safety_producer, &test);
        assert(p_result == 0);
    }

    // wait for everything to finish
    for (size_t i = 0; i < LF_QUEUE_TEST_THREAD_PRODUCER_COUNT; ++i) {
        int p_result = pthread_join(producers[i], NULL);
        assert(p_result == 0);
    }
    {
        int p_result = pthread_join(consumer, NULL);
        assert(p_result == 0);
    }

    // every pushed item must have been consumed by now
    assert(lfqueue_has_front(&test.lfqueue) == LF_QUEUE_RESULT_FALSE);
    return 0;
}
```

補完以上程式碼。

==作答區==

ZZ0 = ? (第 32 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

ZZ1 = ? (第 39 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

ZZ2 = ? (第 58 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

ZZ3 = ? (第 67 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

ZZ4 = ?
(第 84 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

ZZ5 = ? (第 120 行)
* `(a)` atomic_load
* `(b)` atomic_compare_exchange_strong
* `(c)` atomic_store
* `(d)` atomic_exchange
* `(e)` atomic_fetch_add

:::info
延伸題目: 透過這樣的 lock-free queue 改寫網頁伺服器程式碼,使其得以回應多個 HTTP request
:::