Try   HackMD

並行程式設計: Thread Pool 實作和改進

共筆貢獻者: jserv, 93i7xo2, ccs100203, linD026, bakudr18

Thread Pool

thread pool 的設計考量如下:

  1. 在大量的執行緒環境中,建立和銷毀執行緒物件的開銷相當可觀,而且頻繁的執行緒物件建立和銷毀,會對高度並行化的應用程式帶來額外的延遲時間
  2. 考慮到硬體的有效處理器數量有限,使用 thread pool 可控管執行分配到處理器的執行緒數量

用醫院來比喻:

  • 沒有 thread pool 時: 醫院每天要面對成千上萬的病人,每當一個病人求診,就找一位醫生處理,看診後醫生也跟著離開。當看病時間較短時,醫生來去的時間,顯得尤為費時
  • 初步引入 thread pool: 醫院設置門診,把醫生全派出去坐診,病人想看診之前,強制先掛號排隊,醫生根據病人隊列順序,依次處理各個病人,這樣就省去醫生往返的時間。但倘若病患很少,醫生卻過多,這會使得很多醫生閒置,浪費醫療資源
  • 改進 thread pool: 門診一開始只派出部分醫生,但增加一位協調人 (現實就是護理師擔任),病人依舊是排隊看病,協調人負責調度醫療資源。當病人很多、醫生忙不過來時,協調人就呼叫更多醫生來幫忙;當病人不多、醫生過多時,協調人就安排部分醫生休息待命,避免醫療資源的浪費

示意圖:

適用並行運算的圓周率計算

3 月 14 日是圓周率日,這天也是愛因斯坦的生日,求圓周率近似值的討論可見:

Gregory-Leibniz 級數可優雅地計算圓周率,參考 Leibniz's Formula for Pi。從下面的 Madhava–Leibniz series 開始推導:
arctan(1)=π4=113+1517+ ...

首先積分下列數列
11+t2=1t2+t4t6+t8+...+t4nt4n+21+t2

0 積分到 x, 0x1
0x11+t2dt=xx33+x55x77+...+x4n+14n+1Rn(x)where Rn(x)=0xt4n+21+t2dt

先看 Rn(x) ,因為 11+t2,得到
0Rn(x)0xt4n+2dt=x4n+34n+3

又因為
x4n+34n+314n+3,0x1

依據夾擠定理 (squeeze theorem,也稱為 sandwich theorem),當 n,14n+30 ,於是得出下列式子
0x11+t2dt=xx33+x55x77+ ...

而且ddxarctan(x)=11+t2
arctan(x)=xx33+x55x77+ ...

此時將 x 代入 1,即可得 π4

以下是對應實作:

double compute_pi_leibniz(size_t N)
{
    double pi = 0.0;
    for (size_t i = 0; i < N; i++) {
        double tmp = (i & 1) ? (-1) : 1;
        pi += tmp / (2 * i + 1);
    }
    return pi * 4.0;
}

比較單執行緒、多執行緒和 SIMD 版本的表現:

1995 年提出的 Bailey–Borwein–Plouffe formula,可用這三位發表者的姓氏開頭簡稱為 BBP 公式,最初的公式如下:
π=k=0116k(48k+128k+418k+518k+6)

BBP 公式跳脫典型的圓周率的演算法,可計算圓周率的任意第 n 位,而不用事先計算前面的 n1 位,於是 BBP 公式很適合透過並行運算來求圓周率近似值。

典型的圓周率演算法必須計算前面的 n1 位才能夠計算,代表數列每一項之間具相依性。以公式
arctan(1)=π4=113+1517+...
以及
k=1116k(48k+128k+418k+518k+6)
來說,因為每一項的計算都只跟當下的 k 值有關,所以每個獨立的項都可以平行計算,但利用公式
π2=21×23×43×45×65×...
實作的其中一種方式如下:

double pi(size_t N) {
    double pi = 1;
    double n = 1;
    for (size_t j = 1;j <= N; j++, n++) {
        if (j & 1 == 0) {
            pi *= (n / (n + 1));
        } else {
            pi *= ((n + 1) / n);
        }
    }
    return 2 * pi; 
}

因為算出目前項之前需要先知道前面連乘得到的值 pi ,所以無法對每一項做並行運算。以下採用 BBP 公式搭配 Thread Pool 進行驗證。

實作

以下是一個 thread pool 實作: tpool.c

預期執行輸出:

PI calculated with 101 terms: 3.141592653589793

程式架構示意:







struct



jobqueue_fetch
jobqueue_fetch()



bpp
bpp()



bpp2
bpp()



__threadpool

__threadpool

count

workers

jobqueue



__threadpool:l1->jobqueue_fetch:w





jobqueue_t

jobqueue_t

*head

*tail

cond_nonempty

rwlock



__threadpool:l2->jobqueue_t:ll





threadtask_t

threadtask_t

*func

*arg

*future

*next



jobqueue_t:l0->threadtask_t:ll





threadtask_t1

threadtask_t

*func

*arg

*future

*next



jobqueue_t:l1->threadtask_t1:ll





threadtask_t:l0->bpp:w





threadtask_t:l3->threadtask_t1:ll





__tpool_future

__tpool_future

flag

*result

mutex

cond_finished



threadtask_t:l2->__tpool_future:ll





threadtask_t1:l0->bpp2:w





__tpool_future1

__tpool_future

flag

*result

mutex

cond_finished



threadtask_t1:l2->__tpool_future1:ll





Thread Pool & Job Queue

struct __threadpool {
  size_t count;
  pthread_t *workers;
  jobqueue_t *jobqueue;
};

typedef struct __jobqueue {
  threadtask_t *head, *tail;
  pthread_cond_t cond_nonempty;
  pthread_mutex_t rwlock;
} jobqueue_t;
  • count 紀錄 thread pool 內 worker thread (即 workers) 的數量
  • worker 閒置時向 __jobqueue 抓取被封裝的任務 threadtask_t 執行,以 FIFO 方式取出任務,tail 指向下一個待取出的任務。
  • 存取共享變數 __jobqueue 必須保證為 exclusive use,若無任務 worker 則以 pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock) 等待新的任務

Thread Task

typedef struct __threadtask {
  void *(*func)(void *);
  void *arg;
  struct __tpool_future *future;
  struct __threadtask *next;
} threadtask_t;

struct __tpool_future {
  int flag;
  void *result;
  pthread_mutex_t mutex;
  pthread_cond_t cond_finished;
};
  • worker 取得函式 func 執行的結果放在共享變數 future,而同時可能有其他執行緒存取 future 取得運算結果,同樣需要保護。
    • __threadtask__tpool_future 成對建立
    • worker 設置 flag 表示運算進行 (__FUTURE_RUNNING) 或完成 (__FUTURE_FINISHED)
    • 其他執行緒設置 flag 以干涉 worker 行為
      • __FUTURE_CANCELLED
      • __FUTURE_DESTROYED

程式流程

  1. tpool_create(): 建立一定數量的 worker 於 pool,worker 執行 jobqueue_fetch() 抓取 job queue 內的任務
  2. tpool_apply(): 將新建立的 threadtask_t 推進 job queue,每個 threadtask_t 各指向一個 __tpool_future 用以儲存運算結果。同時以 pthread_cond_broadcast() 喚醒等待 job queue 有新任務的 worker
  3. tpool_future_get()tpool_future_destroy(): 從 __tpool_future 取回運算結果後釋放
  4. tpool_join(): 呼叫 tpool_apply 新增空任務,worker 拿到空任務後結束執行緒,緊接著釋放 pool 和 job
  • jobqueue_fetch()






finite_state_machine



ENTRY




get_task

Get a new task from queue



ENTRY->get_task





EXIT




is_null_func

Valid function?



get_task:e->is_null_func





set_run

Set RUNNING



is_destroyed

DESTROYED?



set_run->is_destroyed


  Wait for function to finish



set_fin

Set FINISHED



broadcast

pthread_cond_broadcast()



set_fin->broadcast





free_task

free(__threadtask)



broadcast->free_task





free_task:w->get_task:w





free_future

free(__tpool_future)



free_future->free_task





is_cancelled

CANCELLED?



is_cancelled->set_run


N



is_cancelled:e->free_task:e


Y



is_destroyed:e->set_fin


N



is_destroyed->free_future


Y



is_null_func:e->EXIT


No. Terminated



is_null_func->is_cancelled


Yes



  • __FUTURE_CANCELLED__FUTURE_DESTROYED 的使用情境
    • tpool_future_destroy() 內,若 future->flag__FUTURE_FINISHED__FUTURE_CANCELLED 兩者之一,表示 __threadtask 已被釋放無法被 worker 拿到,故在此釋放資源。
      若非以上情況則說明對應的 __threadtask 仍可存取該 future,故將 future->flag 設置為 __FUTURE_DESTROYED 之後由 worker 來釋放 future 的資源。

      • 同理 jobqueue_destroy() 也有必要依據 __FUTURE_DESTROYED 來釋放 future
    • 使用到 pthread_cancel() 不代表設置 __FUTURE_CANCELLEDpthread_cancel() 僅在 tpool_create() 建立 worker 失敗時使用,向先前建立的 worker 發送 cancellation request

      ​​​​​​​​/* jobqueue_fetch */
      ​​​​​​​​pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
      ​​​​​​​​pthread_testcancel();
      
      ​​​​​​​​while (!jobqueue->tail) pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
      
      ​​​​​​​​pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
      
    • jobqueue_destroy() 是唯一會將 flag 設置為 __FUTURE_CANCELLED,猜測使用情境是想中斷所有的 worker,摧毀所有 job queue 內的任務但保留 __tpool_future 自行處理 (但並沒有使用到關鍵的 rwlock)。
      目前 jobqueue_destroy() 只有用在 tpool_join() 或是用在 tpool_create() 失敗時摧毀空 queue

      ​​​​​​​​int tpool_join(struct __threadpool *pool) {
      ​​​​​​​​  ...
      ​​​​​​​​  jobqueue_destroy(pool->jobqueue);
      ​​​​​​​​  ...
      ​​​​​​​​}
      
      ​​​​​​​​struct __threadpool *tpool_create(size_t count) {
      ​​​​​​​​  ...
      ​​​​​​​​  jobqueue_destroy(jobqueue);
      ​​​​​​​​  free(pool);
      ​​​​​​​​  return NULL;
      ​​​​​​​​}
      
      ​​​​​​​​static void jobqueue_destroy(jobqueue_t *jobqueue) {
      ​​​​​​​​  threadtask_t *tmp = jobqueue->head;
      ​​​​​​​​  while (tmp) {
      ​​​​​​​​    /* Never executed */
      ​​​​​​​​  }
      ​​​​​​​​  pthread_mutex_destroy(&jobqueue->rwlock);
      ​​​​​​​​  pthread_cond_destroy(&jobqueue->cond_nonempty);
      ​​​​​​​​  free(jobqueue);
      ​​​​​​​​}
      

      總之,沒有一種情況是同時有 worker 在運行。

實作缺失回顧

修正 clock 屬性

pthread_cond_timedwait() 的 expire time 使用 CLOCK_MONOTONIC

struct timespec expire_time;
clock_gettime(CLOCK_MONOTONIC, &expire_time);
expire_time.tv_sec += seconds;
int status = pthread_cond_timedwait(&future->cond_finished,
                                  &future->mutex, &expire_time);

pthread_cond_timedwait
The condition variable shall have a clock attribute which specifies the clock that shall be used to measure the time specified by the abstime argument.

pthread_condattr_setclock
The default value of the clock attribute shall refer to the system clock.

因為 pthread_cond_timedwait() 需要 condition variable 具有 clock attribute,而預設的 system clock 用 pthread_condattr_getclock() 取得得到 CLOCK_REALTIME,非預期的 CLOCK_MONOTONIC

CLOCK_REALTIME 會跟 NTP 校正時間,可能往前或往後;而 CLOCK_MONOTONIC 則保證單調遞增,從特定點開始計數,clock_getres 說明在 Linux 上,特定點指的是開機時間。兩者差異甚大,因此初始化時還需指定 clock attribute。

static struct __tpool_future *tpool_future_create(void) {
  struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
  if (future) {
    future->flag = 0;
    future->result = NULL;
    pthread_mutex_init(&future->mutex, NULL);
    pthread_condattr_t attr;
    pthread_condattr_init(&attr);
+   pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    pthread_cond_init(&future->cond_finished, &attr);
    pthread_condattr_destroy(&attr);
  }
  return future;
}

Timeout 處理

8dc72 實現 timeout 處理,不使用 __FUTURE_CANCELLED__FUTURE_DESTROYED 達成。

使用 tpool_future_get() 取得計算結果發生逾時時,嘗試從 job queue 移除 task 和 future:

  1. 檢查 future->flag 是否有 __FUTURE_RUNNING,若有代表 worker 已取得對應的 task 進行計算,現行版本尚不支援中斷進行計算中的 worker,於是等待計算結果,結果等同
    tpool_future_get(futures[i], 0)
    
  2. 若無 __FUTURE_RUNNING,表示對應的 task 仍存於 jobqueue,將其連同 future 移除並釋放資源。

試著從 futures[i] 最後一項開始取得計算結果,設置 time limit = 1 ms。由於計算上是由 futures[0] 開始至 futures[PRECISION],一開始尾端的 task 尚未被 worker 取得計算,future 自然等不到 pthread_cond_broadcast(),因此從輸出可見一些 future 被移除。

-  for (int i = 0; i <= PRECISION; i++) {
+  for (int i = PRECISION; i >= 0; i--) {
    if (!futures[i])
      continue;
    double *result = tpool_future_get(futures[i], time_limit, pool->jobqueue);
    if (result) {
      bpp_sum += *result;
      free(result);
      tpool_future_destroy(futures[i]);
      DEBUG_PRINT(("Future[%d] completed!\n", i));
    } else
      DEBUG_PRINT(("Cannot get future[%d], timeout after %d milliseconds.\n", i,
             time_limit));
  }
$ ./pi 4 1
Thread count: 4
Time limit: 1 ms
Cannot get future[100], timeout after 1 milliseconds. /* future[100] is removed */
Future[99] completed!
...
Future[0] completed!
Elapsed time: 11592788 ns
PI calculated with 101 terms: 3.141592653589793

鏈結串列的效率

原實作中 pop task 的方式是走訪整個 鏈結串列取出 tail ,其 time complexity 為 O(n) ,造成在 critical section 內駐留過久,這對於 multithread 效能的傷害是很大的,藉由 sudo taskset 0xF0 ./pi 以 4 core CPU 測量 PRECISION = 100000 時的執行時間,多執行幾次可發現,雖然大多數執行時間落在 160~200 milliseconds ,但偶爾會有超過 100 seconds 或更久的執行時間,以 perf record 紀錄超過 100 seconds 的 process,發現有約 83% 的時間在執行 for (tmp = jobqueue->head; tmp->next != jobqueue->tail; tmp = tmp->next); ,而越是減少可用的 CPU 此情況會更嚴重(因為當 main thread 執行時不斷 push task,若 OS scheduler 沒有頻繁 context switch 給 jobqueue_fetch 消化 task,鏈結串列會增長非常快)。

因此,為了使 pop task 達到 O(1) ,可將原本的 jobqueue 改成雙向鏈結串列,後來更是直接引入 linux/list.h 以環狀雙向鏈結串列取代原本的單向鏈結串列,如此不但使 pop task 達到 O(1) ,也減少了在 jobqueue->head == jobqueue->tail 的 if statement ,詳細程式碼可見 commit f8761 ,而修改後以 4 core 執行的平均時間約為 138 milliseconds,且沒有特別的 outliers 。

typedef struct __threadtask {
    void *(*func)(void *); /* the function that task should execute */
    void *arg;             /* argument passed to func */
    struct __tpool_future
        *future;               /* A structure to store task status and result */
-    struct __threadtask *next; /* pointer to next task */
+    struct list_head list; /* linked list of task structure */
} threadtask_t;

typedef struct __jobqueue {
-    threadtask_t *head, *tail; /* store head and tail of queue */
+    struct list_head head; /* list head of task */
    pthread_cond_t
        cond_nonempty; /* condition variable to check if queue is non-empty */
    pthread_mutex_t rwlock; /* lock share resources like head and tail */
} jobqueue_t;

研讀 atomic_threadpool

atomic_threadpool 是一套使用 lock-free FIFO queue (lfqueue_deq) 實作的函式庫,支援 MS-Windows, macOS 與 Linux,近一半的程式碼是對不同平台所提供的 API 進行調整。

首先看較重要的函式 at_thpool_worker,功能等同 jobqueue_fetch,旨在從 job queue 抓取 task,task 由函式 task->do_work 及參數 task->arg 組成,由於沒有需要存入運算結果, task 運算結束後返回 TASK_PENDING 抓取下一個 task。

使用 tp->nrunning 來表示進行中的 worker 數量,設置 tp->is_running=0 能立即終止尚未執行 task 的 worker。

void *at_thpool_worker(void *_tp)
{
    at_thpool_t *tp = (at_thpool_t *) _tp;
    AT_THPOOL_INC(&tp->nrunning); // __sync_fetch_and_add(v, 1)

    at_thtask_t *task;
    void *_task;
    lfqueue_t *tq = &tp->taskqueue;

TASK_PENDING:
    while (tp->is_running) {
        if ((_task = lfqueue_deq(tq)))
            goto HANDLE_TASK;
        lfqueue_sleep(1);
    }

    AT_THPOOL_DEC(&tp->nrunning); // __sync_fetch_and_dec(v, 1)
    return NULL;
HANDLE_TASK:
    task = (at_thtask_t*) _task;
    task->do_work(task->args);
    AT_THPOOL_FREE(task);
    goto TASK_PENDING;
}

當使用 lfqueue_deq 從 job queue 從尾端取得 task 時,理想狀況其他執行緒可同時使用 lfqueue_enq 插入新的 task 而不衝突,而 lfqueue 也確實做到。

while (tp->is_running) {
    if ((_task = lfqueue_deq(tq)))
        goto HANDLE_TASK;
    lfqueue_sleep(1);
}

lfqueue

atomic_threadpool 所使用到的 lfqueue API,共同操作同一個 lfqueue_t 型態的 lfqueue:

extern int   lfqueue_init(lfqueue_t *lfqueue);
extern int   lfqueue_enq(lfqueue_t *lfqueue, void *value);
extern void* lfqueue_deq(lfqueue_t *lfqueue);
extern void lfqueue_destroy(lfqueue_t *lfqueue);
typedef struct {
	lfqueue_cas_node_t *head, *tail, *root_free, *move_free;
	volatile size_t size;
	volatile lfq_bool_t in_free_mode;
	lfqueue_malloc_fn _malloc;
	lfqueue_free_fn _free;
	void *pl;
} lfqueue_t;

結構內有兩條佇列

  1. head/tail: 分別指向佇列前端/尾端,head 同時也指向上一個取出的節點,
  2. root_free/move_free: root_free 分別指向佇列前端/尾端,紀錄等待被釋放的節點。每個 lfqueue_cas_node_t 型態的節點都帶有 lfq_time_t 型態的 _deactivate_tm 紀錄進入佇列的時間。
    • __lfq_check_free

      ​​​​​​​​static void ​​​​​​​​__lfq_check_free(lfqueue_t *lfqueue) { ​​​​​​​​ lfq_time_t curr_time; ​​​​​​​​ // 限制只有一條執行緒能夠執行 ​​​​​​​​ if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 0, 1)) { ​​​​​​​​ // 以 rootfree 為起始刪除後面節點 ​​​​​​​​ // 並限制離 _deactivate_tm 超過 2s ​​​​​​​​ lfq_get_curr_time(&curr_time); ​​​​​​​​ lfqueue_cas_node_t *rtfree = lfqueue->root_free, *nextfree; ​​​​​​​​ while ( rtfree && (rtfree != lfqueue->move_free) ) { ​​​​​​​​ nextfree = rtfree->nextfree; ​​​​​​​​ if ( lfq_diff_time(curr_time, rtfree->_deactivate_tm) > 2) { ​​​​​​​​ // printf("%p\n", rtfree); ​​​​​​​​ lfqueue->_free(lfqueue->pl, rtfree); ​​​​​​​​ rtfree = nextfree; ​​​​​​​​ } else { ​​​​​​​​ break; ​​​​​​​​ } ​​​​​​​​ } ​​​​​​​​ lfqueue->root_free = rtfree; ​​​​​​​​ __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->in_free_mode, 1, 0); ​​​​​​​​ } ​​​​​​​​ __LFQ_SYNC_MEMORY(); ​​​​​​​​}

      第 9 行表明佇列至少存在一個節點,因此 move_free 不存在指向 NULL 的可能性,好處是插入時不用判斷是否指向 NULL

      ​​​​​​​​move_free->next = new_node;
      

      缺點是針對程式起始、結束要特別處理多出來的一個節點,另一條佇列同樣情況。

      ​​​​​​​​int
      ​​​​​​​​lfqueue_init_mf(lfqueue_t *lfqueue, void* pl, lfqueue_malloc_fn lfqueue_malloc, lfqueue_free_fn lfqueue_free) {
      ​​​​​​​​    ...
      
      ​​​​​​​​    freebase->value = NULL;
      ​​​​​​​​    freebase->next = NULL;
      ​​​​​​​​    freebase->nextfree = NULL;
      ​​​​​​​​    freebase->_deactivate_tm = 0;
      
      ​​​​​​​​    lfqueue->head = lfqueue->tail = base; // Not yet to be free for first node only
      ​​​​​​​​    lfqueue->root_free = lfqueue->move_free = freebase; // Not yet to be free for first node only
      ​​​​​​​​    ...
      ​​​​​​​​}
      
      ​​​​​​​​void
      ​​​​​​​​lfqueue_destroy(lfqueue_t *lfqueue) {
      ​​​​​​​​    ...
      ​​​​​​​​    if (rtfree) { // rtfree will never be NULL
      ​​​​​​​​        lfqueue->_free(lfqueue->pl, rtfree);
      ​​​​​​​​    }
      ​​​​​​​​    ...
      ​​​​​​​​}
      

__lfq_recycle_free

__lfq_check_free 稍早已提到功能是清除以 root_free 為首的佇列,而 __lfq_recycle_free 便是將節點推入佇列。因為在多執行緒下,如果直接把節點的資源釋放掉可能會在之後的操作造成錯誤。







main


cluster_move

CAS(&freed->nextfree, NULL, freenode)


cluster_freed

lfqueue->root_free
lfqueue->move_free
freed



freebase

free base



node_1

free node 1



freebase->node_1


nextfree









main


cluster_cas

CAS(&lfqueue->move_free, freed, freenode)


cluster_freed

lfqueue->root_free
lfqueue->move_free
freed


cluster_freed_l

freed"



freebase

free base



node_1

free node 1



freebase->node_1


compare
 success



temp

free base



freebase->temp





static void __lfq_recycle_free(lfqueue_t *lfqueue, lfqueue_cas_node_t* freenode) { lfqueue_cas_node_t *freed; do { freed = lfqueue->move_free; } while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) ); lfq_get_curr_time(&freenode->_deactivate_tm); __LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->move_free, freed, freenode); }
  • 佇列尾端 move_free->nextfree = NULL
  • 節點型態雖為 lfqueue_cas_node_t 但非用 next 而是以 nextfree 進行連結。
  • 節點在 enqueue 時即初始化 nextfree = NULL,因此 freenode->nextfree = NULL
  • 當第 6 行以 __LFQ_BOOL_COMPARE_AND_SWAP 插入節點 freenodemove_free 後端,即限制了直到第 10 結束都沒有其他執行緒可執行下去,形成 critical section。






%0



mvf

 

nextfree



NULL1
NULL



mvf:e->NULL1:w





fn

 

nextfree



mvf:e->fn:w





NULL2
NULL



fn:e->NULL2:w





movefree
movefree



movefree->mvf





freenode
freenode



freenode->fn











main

free mode
loop
if rtfree != NULL && rtfree != lfqueue->move_free

cluster_move

lfqueue->move_free


cluster_free

free ( functoin )


cluster_rtfree

rtfree



freebase

lfqueue->root_free
(free base)



node_1

free node 1



freebase->node_1


nextfree



node_1->freebase


assign to
rbfree



lfqueue_enq

插入新節點

static int
_enqueue(lfqueue_t *lfqueue, void* value) {
	lfqueue_cas_node_t *tail, *node;
	node = (lfqueue_cas_node_t*) lfqueue->_malloc(lfqueue->pl, sizeof(lfqueue_cas_node_t));
	if (node == NULL) {
		perror("malloc");
		return errno;
	}
	node->value = value;
	node->next = NULL;
	node->nextfree = NULL;
	for (;;) {
		__LFQ_SYNC_MEMORY(); // 沒有 memory barrier 會因為編譯器最佳化或是因為 out-of-order execution 無法拿到最新的 tail
		tail = lfqueue->tail; // 不可能發生 tail = NULL
		if (__LFQ_BOOL_COMPARE_AND_SWAP(&tail->next, NULL, node)) {
			/* 由於使用 atomic operation
			 * 同一時間只會有一個執行緒更新 tail->next
			 * 接下來才會將 tail 指向新的尾端完成整個插入流程
			 * 另一個執行緒所拿到的 tail->next 才會是 NULL
			 */
			 // compulsory swap as tail->next is no NULL anymore, it has fenced on other thread
			__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, tail, node);
			__lfq_check_free(lfqueue); // cleanup
			return 0;
		}
	}

	/*It never be here*/
	return -1;
}

觀察 linux 上的實作,使用到 lock-free 技巧常見的 CAS 等 atomic operation,令人不解的是大部份函式都有 full barrier 的作用,這裡特地引進 __sync_synchronize (full barrier)。

#define __LFQ_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap
#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap
#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add // skip
#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch
#define __LFQ_YIELD_THREAD sched_yield
#define __LFQ_SYNC_MEMORY __sync_synchronize

lfqueue_deq

從佇列前端移去節點,要判斷的情況較多。改寫部份程式碼和搭配註解。







main


cluster_next

next


cluster_pop

CAS(&lfqueue->head, head, head)



inter

...



node_n

node_n



inter->node_n





head
head



node_1

node_1



head->node_1





tail
tail



tail->node_n





_next

_next



node_1->_next





_next->inter





  • 8 行的 CAS 應是為了避免取得 head 後,原有的 head 指向的節點被其他執行緒釋放導致 head->next 產生未定義行為。實際上釋放會在 2 秒後由 __lfq_check_free 執行,發生未定義行為的可能性極微,即使加了 CAS 也不能避免 8~9 行間 __lfq_check_free 釋放掉 head
static void * _dequeue(lfqueue_t *lfqueue) { lfqueue_cas_node_t *head, *next; void *val; for (;;) { head = lfqueue->head; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, head)) { next = head->next; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->tail, head, head)) { /* head == tail 代表無可取出節點,此時 next 必為 NULL * 這時可能有 `lfqueue_enq` 執行插入節點產生 `next != NULL` 的情況 * 故需做判斷 */ if (next == NULL) { /* 遇到這種情況重新取值即可,但一些函式需要回傳值判斷 queue 為空 */ val = NULL; goto _done; } } else { /* 目的是排除 next = NULL 的狀況,以免 dereference 出錯 * 正常情況下 next = NULL 只有在 head = tail 情況成立 */ if (next) { val = next->value; if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfqueue->head, head, next)) { break; } } else { val = NULL; goto _done; } } } } __lfq_recycle_free(lfqueue, head); _done: // __asm volatile("" ::: "memory"); __LFQ_SYNC_MEMORY(); __lfq_check_free(lfqueue); return val; }

使用 gdb 觀察 31 行發生當下:

(gdb) p next
$14 = (lfqueue_cas_node_t *) 0x0
(gdb) p head->next
$15 = (struct lfqueue_cas_node_s *) 0x55555bf83db0
(gdb) p head
$17 = (lfqueue_cas_node_t *) 0x55555bf83d80
(gdb) p *head->next
$18 = {value = 0x555555c57490, next = 0x55555bf83de0, nextfree = 0x55555bf83de0, _deactivate_tm = 1623174704}
head = lfqueue->head; next = head->next;

gdb 顯示 next != head->next,明顯與上方程式碼矛盾。表明 7~10 行即使通過 __LFQ_BOOL_COMPARE_AND_SWAPhead 可能經由其他執行緒的 enqueue,導致 next 指向其他新的節點。







main


cluster_next

next


cluster_empty

CAS(&lfqueue->tail, head, head)



inter

...



node_n

node_n



inter->node_n





NULL

next == NULL




head
head



node_1

node_1



head->node_1





tail
tail



_next

_next



node_1->_next





_next->inter





_next->NULL





val

next != NULL



_next->val





node_n->tail





此實作並未解決 ABA problem

使用 C11 Atomics 改寫

The idea of "lock free" is not really not having any lock, the idea is to minimize the number of locks and/or critical sections, by using some techniques that allow us not to use locks for most operations.

In this sense, the lock in lock-free does not refer directly to mutexes, but rather to the possibility of “locking up” the entire application in some way, whether it’s deadlock, livelock – or even due to hypothetical thread scheduling decisions made by your worst enemy. - An Introduction to Lock-Free Programming

廣泛的說,lock-free 並非指不使用 mutex (lock) 而是指鎖住整個 process 的可能性,下方程式碼雖然沒有使用到 mutex,但若兩個 thread 執行同樣的程式碼,在特定執行順序下永遠無法離開 loop。

while (X == 0)
{
    X = 1 - X;
}

而使用 mutex 的 thread 在最差情況下,所使用的 mutex 尚未被其他 thread 所釋放,而停滯不前,甚至導致 lock 之間的競爭,自然不算在 lock-free 範疇。

lock-free queue

參考 RZHuangJeff 使用 ring buffer 實作 atomic queue。 使用 ring buffer 有無需管理記憶體及固定緩衝區大小的好處,再以 mmap 處理緩衝區邊界,減少判斷讀寫是否會超出邊界所帶來的效能影響。

設計上只有一個 producer,使用 count 來紀錄存入的資料數,consumer 依照 count 決定是否往下執行,避開判斷邊界(如下)。由於 head, tail 都是 atomic,依據邊界來判斷需於一個 CAS 指令內對兩個變數進行操作,較為不便。

// is full
head == (tail ^ rbf->size*2)

// is empty
head == tail

為避免 ABA problem,將用來存 offset 的變數前 32-bit 放置要存入 buffer 的資料的一小部份,例如 (void *) 型態的資料為 8 byte,假設資料前 4 byte 相同不具特徵(e.g. 連續記憶體位置的指標),將後 4 byte 併入 offset 的高 4 byte,這麼做即使在 CAS 判斷 offset 相同,也能透過前 4 byte 得知資料更動。

/* producer */
bool enqueue(ringbuffer_t *rb, void **src) {
  uint64_t _read, _write;

  _read = atomic_load(&rb->read_offset) & RB_OFF_MASK;
  _write = atomic_load(&rb->write_offset) & RB_OFF_MASK;
  if (_read == (_write ^ rb->size))
    return false;

  memcpy(&rb->buffer[_write], src, sizeof(void *));
  _write = (_write + sizeof(void *)) & rb->mask;
  _write |= ((uint64_t)*src << 32);
  atomic_store_explicit(&rb->write_offset, _write, memory_order_release);
  atomic_fetch_add_explicit(&rb->count, 1, memory_order_release);
  return true;
}

/* consumer */
bool dequeue(ringbuffer_t *rb, void **dst) {
  int64_t count, new_count;

  do {
    count = atomic_load(&rb->count);
    new_count = count - 1;
    if (__builtin_expect((new_count < 0), 1))
      return false;
  } while (!atomic_compare_exchange_weak(&rb->count, &count, new_count));

  uint64_t _read, new_read;
  do {
    _read = atomic_load(&rb->read_offset);
    new_read = (((_read & RB_OFF_MASK) + sizeof(void *)) & rb->mask);
    memcpy(dst, &rb->buffer[_read & RB_OFF_MASK], sizeof(void *));
  } while (!atomic_compare_exchange_weak(&rb->read_offset, &_read, new_read));

  return true;
}

affinity-based thread pool

Thread safety with affine thread pools 一文提到執行緒需要為在處理器核 (core) 切換間付出 context switching 及 cache refresh/invalidation 的代價,因此主張同一段程式碼應固定在同一個處理器執行。

此外,該文也提出 work stealing 的實作方式:所有 thread 應有屬於自己的 private queue 及共享的 shared queue。thread 從 private queue 提取任務執行,在閒置時則從 shared queue 提取,以最大化利用資源。

afn_threadpool.c 使用 lock-free queue 實作上述功能,達到以下需求:

  • 建立與處理器核數 (physical,而非 logical) 相同數量的 threads 接收任務,並使用 pthread_setaffinity_np() 固定在不同的處理器上。
  • 提供方法將任務排進 private queue 或 shared queue
  • thread 每執行 32 個任務即交換 private queue 和 shared queue,以防 starvation。

afn_threadpool_pi.c 採用 afn_threadpool.c 來計算 pi。

int main(int argc, char **argv) {
    ...

  threadpool_t *tp;
  if (!tp_init(&tp, nthreads))
    exit(EXIT_FAILURE);
  tpool_future_t *futures[PRECISION + 1];

  for (int i = 0; i <= PRECISION; i++) {
    bpp_args[i] = i;
    futures[i] = tp_queue(tp, bpp, (void *)&bpp_args[i]);
  }
  for (int i = 0; i <= PRECISION; i++) {
    if (!futures[i])
      continue;
    double *result = tpool_future_get(futures[i], time_limit);
    if (result) {
      bpp_sum += *result;
      free(result);
      tpool_future_destroy(futures[i]);
    }
  }
  tp_join(tp);
  tp_destroy(tp);
    ...
}

效能比較

比較 3 種 thread pool 計算 π (PRECISION=1000) 所需時間

  1. mutex protected queue (orginal)
    • ./threadpool_pi
  2. lock-free queue + affinity-based thread pool
    • ./afn_threadpool_pi
      • w/o pthread_setaffinity_np()
  3. lock-free queue
    • ./afn_threadpool_pi_v2
      • w/ pthread_setaffinity_np()

原始程式碼 與其執行方式:

$ make benchmark && make plot
$ lscpu
Model name:                      AMD EPYC 7501 32-Core Processor
Stepping:                        2
CPU MHz:                         1999.998
BogoMIPS:                        3999.99
Hypervisor vendor:               KVM
Virtualization type:             full
L1d cache:                       2 MiB
L1i cache:                       2 MiB
L2 cache:                        16 MiB
L3 cache:                        512 MiB

$ uname -r
5.4.0-72-generic

$ gcc --version
gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0

使用 lock-free 明顯減少執行時間,可惜隨著 thread 增加 throughtput 反而下降。

接下來參考 eecheng87 的作法使用 sched_yield(),在 dequeue 失敗時讓出 CPU,這樣的好處是如果單一處理器上有多個 thread,執行任務的優先拿到 CPU,減少無謂的 dequeue。因此可見 lock-free queue 的版本執行時間下降,本來就是單個 thread 獨占處理器的則不影響。

/* afn_thradpool.c */
- !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task)));
+ !(dequeue(p_queue, (void **)&task) || dequeue(s_queue, (void **)&task))){
+    sched_yield();
+}

  • 實驗結果 II - AMD Ryzen 7 3800XT & Intel i5-6200U

由於 Linode 提供的虛擬機無法得知 CPU mapping 和實際分配的 physical core 數目,因此改用其他機器進行實驗,同時加入 likwid-topology。

likwid-topology 是一套列出 SMT thread 、快取與處理器核階層關係的工具。HWThread 代表在 linux 出現的 CPU 編號,也就是 htop 看到的 CPU0、CPU1Thread 則是處理器核上的 SMT Thread 編號;Core 代表 physical core,如下列所示。

實驗先使用 likwid 函式庫取得 cpu topology,將 thread 固定在不同的 physical core 上 (雖然從 HWThread=0 循序往下放效果一樣,但使用 likwid 方便日後指定在任意處理器核上實驗)。

$ likwid-topology -g
--------------------------------------------------------------------------------
CPU name:	AMD Ryzen 7 3800XT 8-Core Processor
CPU type:	nil
CPU stepping:	0
********************************************************************************
Hardware Thread Topology
********************************************************************************
Sockets:		1
Cores per socket:	8
Threads per core:	2
--------------------------------------------------------------------------------
HWThread	Thread		Core		Socket		Available
0		0		0		0		*
1		0		1		0		*
2		0		2		0		*
3		0		3		0		*
4		0		4		0		*
5		0		5		0		*
6		0		6		0		*
7		0		7		0		*
8		1		0		0		*
9		1		1		0		*
10		1		2		0		*
11		1		3		0		*
12		1		4		0		*
13		1		5		0		*
14		1		6		0		*
15		1		7		0		*
--------------------------------------------------------------------------------
/* afn_threadpool_pi.c */
    CpuTopology_t topo = get_cpuTopology();
    int numSockets = topo->numSockets,
        numCoresPerSocket = topo->numCoresPerSocket,
        numHWThreads = topo->numHWThreads, cpulist[topo->numHWThreads], idx = 0;
    for (int socket = 0; socket < numSockets; ++socket) {
        for (int core = 0; core < numCoresPerSocket; ++core) {
        for (int i = 0; i < numHWThreads; ++i) {
            int threadId = topo->threadPool[i].threadId,
                coreId = topo->threadPool[i].coreId,
                packageId = topo->threadPool[i].packageId,
                apicId = topo->threadPool[i].apicId;
            if (packageId == socket && coreId == core) {
            cpulist[idx + threadId * (numCoresPerSocket * numSockets)] = apicId;
            }
        }
        idx++;
        }
    }
    topology_finalize();

實驗過程中發現 likwid 初始化時間頗長,為了凸顯 lock-free 和 mutuex protected 兩者實作的差異,將 thread pool 初始化和釋放排除在測量時間外(b3cd6),結果發現時間大幅縮短,簡短的在 i5-6200U 上測試建立及釋放時間:


和 task 相比,可見 worker thread 的建立及釋放時間頗長,thread pool 有其必要性。再來是運算時間:

  • AMD R7-3800XT

  • Intel i5-6200U

在 lock-free 的測試中,執行時間有 pinned 與否沒差多少。而隨著 thread count 增加,lock-free 實作始終優於原始版本。

Pthread: Cancellation point

取自 man-pages:

  • Creation: pthread_create()

    ​​ int pthread_create(pthread_t *restrict thread,
    ​​                    const pthread_attr_t *restrict attr,
    ​​                    void *(*start_routine)(void *),
    ​​                    void *restrict arg);
    

    thread: 指向新建的 thread ID
    start_routine: thread 建立後執行的函式,參數為 arg

    ​​​​// Creating a new thread
    ​​​​pthread_create(&ptid, NULL, &func, NULL);
    
  • Cancellation clean-up handlers
    建立方式是由 pthread_cleanup_push() 將將函式逐一推進堆疊中,執行時從堆疊最上方依序執行

    pthread_exit()
    Any clean-up handlers established by pthread_cleanup_push(3) that have not yet been popped, are popped (in the reverse of the order in which they were pushed) and executed.

    Clean-up handlers 作用是在執行緒結束前釋放資源,包括

    • mutex
    • condition variables
    • semaphores
    • file descriptor

    等不會在執行緒結束時釋放的資源。觸發情境有幾種:

    1. 執行緒即將結束時呼叫 pthread_exit(),執行堆疊中所有 handlers
    2. 由其他執行緒呼叫 pthread_cancel() 發出請求 (cancellation request),當具有 Deferred cancelability 的執行緒執行到 cancellation point 時,執行堆疊中所有 handlers
    3. 執行緒呼叫 pthread_cleanup_pop() 從堆疊上取出最上層的 handler,可選擇執行與否,與 pthread_cleanup_push() 搭配使用

    值得注意的是,結束執行緒若使用 return, handlers 將不會被呼叫,而 return val 意同 pthread_exit(val)

    Clean-up handlers are not called if the thread terminates by performing a return from the thread start function.

    Performing a return from the start function of any thread other than the main thread results in an implicit call to pthread_exit(), using the function's return value as the thread's exit status.

  • Cancellation point

    • 是執行緒用來檢查是否取消的時間點。"取消"一詞指的是執行緒的終止,被請求或是正常執行到最後,最終釋放所有資源,雖然 pthread_kill() 也能做到執行緒的終止,但不會釋放資源。

    • Cancellation point 可由 pthread_testcancel() 進行設置,其他執行緒以 pthread_cancel() 送出請求 (cancellation request),當執行緒執行到 cancellation point 時才會取消。

    • pthread_setcancelstate() 進行設置 cancelability 決定觸發與否,預設是 ENABLE

      ​​​​​​​​pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate)
      ​​​​​​​​pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate)
      
    • pthread_setcanceltype() 設置 cancelability tpye,預設是 DEFERRED,意思是請求會被推遲到下一個 cancellation point,ASYNCHRONOUS 則是接收到請求後立刻取消。

      ​​​​​​​​pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype)
      ​​​​​​​​pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype)
      

      當執行緒會保留/釋放資源時,設置 ASYNCHRONOUS,會讓執行緒收到請求後立刻處理,無法確立資源狀態是釋放前還是釋放後,使得 clean-up handler 無法正確的處理,不建議使用。而在文件中也註明只有 compute-bound loop 的執行緒或是下列 async-cancel-safe functions 適合用 ASYNCHRONOUS

      • pthread_cancel()
      • pthread_setcancelstate()
      • pthread_setcanceltype()
    • pthreads 明確定義哪些函式必須是/可能是 cancellation point

      • 必須是
        • pthread_cond_timedwait()
        • pthread_cond_wait()
        • pthread_testcancel()

      pthread_cond_wait()/pthread_cond_timewait() 為何是 cancellation point? 是為了防止 indefinite wait,例如等待輸入的資料從未被送出(e.g. read()),仍可以取消執行緒的執行。當接收到 cancellation request,如同 unblocked thread 一樣重新取得 mutex,但不是返回呼叫 pthread_cond_wait()/pthread_cond_timedwait() 的地方而是執行 clean-up handlers.

    • pthread_join/pthread_exit()

      ​​​​​​​​void pthread_exit(void *retval);
      ​​​​​​​​int pthread_join(pthread_t thread, void **retval);
      

      pthread_exit() 於執行緒結束時呼叫,返回的數值 retval 供其他呼叫 pthread_join() 的執行緒取得。retval 不可存在於執行緒的 stack 上,否則產生未定義行為。

      pthread_join() 負責將目標執行緒返回的 exit status 複製到指定地址,如果該執行緒先前已取消,則複製 PTHREAD_CANCELED 到指定地址。

      ​​​​​​​​/* example */
      ​​​​​​​​void *app1(void *x)
      ​​​​​​​​{
      ​​​​​​​​    pthread_exit(20);
      ​​​​​​​​}
      
      ​​​​​​​​int main()
      ​​​​​​​​{
      ​​​​​​​​    int ret;
      ​​​​​​​​    pthread_t t1;
      ​​​​​​​​    pthread_create(&t1, NULL, app1, NULL);
      ​​​​​​​​    pthread_join(t1, &ret);
      ​​​​​​​​    return 0;
      ​​​​​​​​}
      

參考資料