
2021q1 Week 4 Quiz

tags: linux2021

Goal: assess students' understanding of bitwise operations and of concurrent and multi-threaded programming

Answer form

Quiz 1

March 14 is Pi Day, and also Einstein's birthday. Discussions of approximating π can be found in:

The Gregory–Leibniz series provides an elegant way to compute π; see Leibniz's Formula for Pi. We derive it starting from the Madhava–Leibniz series below:

$$\arctan(1) = \frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \dots$$

First, expand the integrand into the following finite series:

$$\frac{1}{1+t^2} = 1 - t^2 + t^4 - t^6 + t^8 - \dots + t^{4n} - \frac{t^{4n+2}}{1+t^2}$$

Then integrate from $0$ to $x$, where $0 \le x \le 1$:

$$\int_0^x \frac{1}{1+t^2}\,dt = x - \frac{x^3}{3} + \frac{x^5}{5} - \frac{x^7}{7} + \dots + \frac{x^{4n+1}}{4n+1} - R_n(x), \text{ where } R_n(x) = \int_0^x \frac{t^{4n+2}}{1+t^2}\,dt$$

Consider $R_n(x)$ first. Since $\frac{1}{1+t^2} \le 1$, we get

$$0 \le R_n(x) \le \int_0^x t^{4n+2}\,dt = \frac{x^{4n+3}}{4n+3}$$

Moreover,

$$\frac{x^{4n+3}}{4n+3} \le \frac{1}{4n+3}, \quad 0 \le x \le 1$$

By the squeeze theorem (also known as the sandwich theorem), as $n \to \infty$, $\frac{1}{4n+3} \to 0$, so $R_n(x) \to 0$, and we obtain:

$$\int_0^x \frac{1}{1+t^2}\,dt = x - \frac{x^3}{3} + \frac{x^5}{5} - \frac{x^7}{7} + \dots$$

And since

$$\frac{d}{dx}\arctan(x) = \frac{1}{1+x^2}$$

we get

$$\arctan(x) = x - \frac{x^3}{3} + \frac{x^5}{5} - \frac{x^7}{7} + \dots$$

Substituting $x = 1$ then yields $\frac{\pi}{4}$.

The corresponding implementation:

double compute_pi_leibniz(size_t N)
{
    double pi = 0.0;
    for (size_t i = 0; i < N; i++) {
        double tmp = (i & 1) ? (-1) : 1;
        pi += tmp / (2 * i + 1);
    }
    return pi * 4.0;
}

Compare the performance of the single-threaded, multi-threaded, and SIMD versions:
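As one concrete starting point for such a comparison, here is a sketch of a multi-threaded variant using OpenMP (an assumption: the compiler supports `-fopenmp`; the SIMD variant is omitted here, and `compute_pi_leibniz_omp` is a name introduced for illustration):

```c
#include <stddef.h>

/* Multi-threaded Leibniz sum: each thread accumulates a private
 * partial sum over its share of the terms, and the reduction
 * clause combines the partial sums at the end.
 * Compile with: cc -O2 -fopenmp leibniz.c
 * Without -fopenmp the pragma is ignored and the function runs
 * single-threaded, which gives a convenient baseline. */
double compute_pi_leibniz_omp(size_t N)
{
    double pi = 0.0;
#pragma omp parallel for reduction(+ : pi)
    for (size_t i = 0; i < N; i++) {
        double sign = (i & 1) ? -1.0 : 1.0;
        pi += sign / (2 * i + 1);
    }
    return pi * 4.0;
}
```

Because each iteration is independent, the loop parallelizes without locks; only the final reduction needs synchronization, which OpenMP handles internally.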

The Bailey–Borwein–Plouffe formula, proposed in 1995 and commonly abbreviated as the BBP formula after its three authors' surnames, originally reads:

$$\pi = \sum_{k=0}^{\infty} \frac{1}{16^k} \left( \frac{4}{8k+1} - \frac{2}{8k+4} - \frac{1}{8k+5} - \frac{1}{8k+6} \right)$$

The BBP formula departs from typical π algorithms: it can compute the $n$-th (hexadecimal) digit of π directly, without first computing the preceding $n-1$ digits, which makes it well suited to approximating π with parallel computation.

Concurrency and Multithreaded Programming introduces the thread pool; the considerations behind this design are:

  1. In an environment with many threads, the cost of creating and destroying thread objects is considerable, and frequent thread creation and destruction adds extra latency to highly concurrent applications
  2. Since the number of available hardware processors is limited, a thread pool can regulate how many threads are dispatched to the processors

Using a hospital as an analogy:

  • Without a thread pool: the hospital faces thousands of patients every day. Whenever a patient arrives, a doctor is summoned to handle the case, and the doctor leaves right after the consultation. When consultations are short, the doctors' coming and going dominates the time spent
  • A first attempt at a thread pool: the hospital opens an outpatient clinic and stations all its doctors there. Patients must register and queue before being seen, and the doctors serve them in queue order, which eliminates the doctors' travel time. But if patients are few and doctors too many, many doctors sit idle and medical resources are wasted
  • An improved thread pool: the clinic initially stations only some of the doctors, but adds a coordinator (in reality, a nurse). Patients still queue, and the coordinator schedules the medical resources: when patients are many and the doctors are overwhelmed, the coordinator calls in more doctors; when patients are few and doctors too many, the coordinator sends some doctors to rest on standby, avoiding waste


Here is a thread pool implementation:

#include <stddef.h>

typedef struct __tpool_future *tpool_future_t;
typedef struct __threadpool *tpool_t;

/**
 * Create a thread pool containing the specified number of threads.
 * If successful, the thread pool is returned. Otherwise, it
 * returns NULL.
 */
tpool_t tpool_create(size_t count);

/**
 * Schedule the specified function to be executed.
 * If successful, a future object representing the execution of
 * the task is returned. Otherwise, it returns NULL.
 */
tpool_future_t tpool_apply(tpool_t pool, void *(*func)(void *), void *arg);

/**
 * Wait for all pending tasks to complete before destroying the thread pool.
 */
int tpool_join(tpool_t pool);

/**
 * Return the result when it becomes available.
 * If @seconds is non-zero and the result does not arrive within the specified
 * NULL is returned. Each tpool_future_get() resets the timeout status on
 * @future.
 */
void *tpool_future_get(tpool_future_t future, unsigned int seconds);

/**
 * Destroy the future object and free resources once it is no longer used.
 * It is an error to refer to a destroyed future object. Note that destroying
 * a future object does not prevent a pending task from being executed.
 */
int tpool_future_destroy(tpool_future_t future);

#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};

typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;

typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};

static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_cond_init(&future->cond_finished, &attr);
        pthread_condattr_destroy(&attr);
    }
    return future;
}

int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            FFF;
    }

    pthread_mutex_unlock(&future->mutex);
    return future->result;
}

static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}

static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}

static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}

static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        GGG;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= KKK;
                task->future->result = ret_value;
                LLL;
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}

struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}

struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            HHH;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}

int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}

Using the Bailey–Borwein–Plouffe formula together with the thread pool above, we can write a program to approximate π:

#include <math.h>
#include <stdio.h>
#include <stdlib.h>

#define PRECISION 100 /* upper bound in BBP sum */

/* Use Bailey–Borwein–Plouffe formula to approximate PI */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}

int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }

    tpool_join(pool);
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}

Expected output:

PI calculated with 101 terms: 3.141592653589793

Complete the code so that the execution results match the expected output.

Answers

FFF = ?

  • (a) pthread_cond_wait(&future->cond_finished, &future->mutex)
  • (b) pthread_cond_wait(&future->mutex, &future->cond_finished)
  • (c) pthread_cond_broadcast(&future->cond_finished, &future->mutex)
  • (d) return NULL

GGG = ?

  • (a) while (!jobqueue->tail) /* wait */
  • (b) /* no operation */
  • (c) while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
  • (d) while (!jobqueue->tail) pthread_cond_broadcast(&jobqueue->cond_nonempty)

HHH = ?

  • (a) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock)
  • (b) pthread_cond_broadcast(&jobqueue->cond_nonempty)

KKK = ?

  • (a) __FUTURE_RUNNING
  • (b) __FUTURE_FINISHED
  • (c) __FUTURE_TIMEOUT
  • (d) __FUTURE_CANCELLED
  • (e) __FUTURE_DESTROYED

LLL = ?

  • (a) pthread_cond_broadcast(&task->future->cond_finished)
  • (b) pthread_cond_wait(&task->future->cond_finished, &future->mutex)

Extended questions:

  1. Explain how the code above works, including the timeout handling mechanism; point out room for improvement and implement it
  2. Study atomic_threadpool, identify how it applies atomic operations, and explain its lock-free techniques
  3. Try rewriting the code above with C11 Atomics to achieve better scalability