Try   HackMD

Lock-free Multiple Producer-Consumer 實作

contributed by < idoleat >

GitHub

第 9 週測驗題第二題和第 11 週測驗題給定的程式碼為基礎,探討 lock-based vs. lock-free Multiple Producer-Consumer 的設計和實作議題

第一種實做 -> correctness/benchmark -> 第二種

Introduction

通常 micro optimization 及對應的 benchmarking 較常見於熱門大型專案或公司的特定團隊,例如 meta 的高效能函式庫 (folly 是其中一個) 或是 rust 的新 std::mutex 實做,但是 C 或是 C++ 不像 Java 背後有商業公司帶領討論較為可惜。我們希望藉此期末專題也可以針對 Producer-Consumer 問題進行探討 (第 14 週測驗即算一種)

Implementation

some brainstorming

通用 -> 特化 -> 分析執行時間和限制
沒有針對特定資料結構,但是可以參考第九週測驗要有一個通用的界面

  • Traditional Lock
  • fine-grained lock
  • URCU
  • tree structured/grouped writers
    • producers are leaf nodes, shared queue is the root node. (arrival time is in topology order?)
  • CRDT
  • futex
  • circular/ring buffer
  • historical modeling
  • preemption-safe
  • cache friendly
  • PRNG
  • leverage immutability
  • threaded logger

concurrent linked list 應用場景:檔案系統
(historical modeling 也可以嘗試用於檔案系統)
討論需要至少有一些細節

lock-based

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →
先做好分析,把分析先列出來,例如可能會遇到的問題、困難的點在哪裡、打算怎麼做。寫程式和看論文比較花時間,但是動頭腦先想想看要怎麼做是隨時隨地可以做的 (這也算一種 fine-grained),有可能這件事是不可行的,所以不要說還沒花時間,可能時間花了也做不到。把目標設為一氣呵成做出來是很危險的。

Add a mutex lock in QUEUE struct. Acquire before accessing other queue members. Release after accessing other queue members. Notice that critical section should be as small as possible.

Lock 使用上可能會遇到的問題

  • Multiple writer 的情況下要怎麼知道 bounded buffer 的狀態?
  • 過度頻繁的上鎖解鎖

使用 Lock 的策略

  • Fine grained: Critical Section 的範圍只針對單一 member,甚至有些地方可以不需要上鎖,atomic write 即可
  • 需要同步的是 dequeue/enqueue index (atomic r/w 也可?),若為 SC/SP 則不須 lock
    • 可以先做安全預留,先留留空間再讓 producer 分別寫,例如訂房和入住可以分開來
    • 同理 multi reader 也可以同時讀,如果碰到同個 buffer 位置有 writer 正在寫那就等他寫完並不再往前讀 (要如何知道正在寫?調整 Lock 為針對單一 queue 元素鎖?那會不會太多?Lock Pool? 還是用 semaphore?)
  • Using variance of locks
  • 針對個別 cell 上鎖,若 acquire 失敗就往下一個 cell
    • 每個 producer 各自維護 index (How?)
    • 太多鎖

版本 1: 僅在 MP/MC 時對 index 上鎖

commit 7a0e4f1
上方使用鎖的策略提到 producer 透過 enqueue index 得知 bounded buffer 的狀態,consumer 則是透過 dequeue index,兩個 index 皆為 shared resources。所以在 multi-producer 下 producer 需要取得 p_lock 才能對 index +1,完畢後釋放,multi-consumer 亦然。

預期可能會對,但是在檢查 empty 或 full 的時候存取 index 沒有 acquire lock 所以有可能會錯。在執行多次後,有機會會全部通過測試,也有可能在 mpsc full 測試時 (手動計時) 超時,或是在 mpsc sums1000 時沒有得到預期的 global_count。成功獲得了預期中的時對時錯的錯誤結果。

開啟 thread sanitizer 後,
原本的 lock-free 版本及此版本皆會出現錯誤訊息

Test: Spsc: null_pointers       : PASS
Test: Spsc: create              : PASS
Test: Spsc: empty               : PASS
Test: Spsc: full                : PASS
ThreadSanitizer:DEADLYSIGNAL
==30414==ERROR: ThreadSanitizer: SEGV on unknown address 0x000000000000 (pc 0x7f97256a3ec7 bp 0x000000000000 sp 0x7f9722c3f078 T30415)
==30414==The signal is caused by a READ memory access.
==30414==Hint: address points to the zero page.
ThreadSanitizerThreadSanitizer:DEADLYSIGNAL
:DEADLYSIGNAL
ThreadSanitizer: nested bug in the same thread, aborting.

將 index 印出檢查後發現會超時的原因為當 queue index 進入錯序狀態,即 dequeue index > enqueue index,在呼叫非 try 前綴的操作時,會因為沒有復原機制而無法脫離錯序狀態,不停在錯序狀態下重複嘗試。進入錯序狀態的原因為

在該回傳 contention 時未回傳

ToDo:

  1. 加入 enqueue 和 dequeue 交錯執行的測試案例,設定不同的 enqueue/dequeue 比例,並實際檢驗。
  2. 使用 SIGALRM 加入超時檢測
  3. 處理 index overflow
  4. 應該要用 trylock?
  5. 增加 error handling?
  6. 再仔細思考用鎖策略,補齊 (最近綜觀各語言的 concurrency 發展,皆朝著更有策略性、結構性的運用 concurrency primitives 發展)

版本 2:

先 +1 再檢查 full/empty 是愚蠢的,可以先讀當下的 index 為 local 比較完沒問題再 +1 和寫入資料,否則失敗又要再 -1。 可是 index 一直都在變,即便比較完是正確的,可能要寫入時又變不正確,所以原本的預先 +1 取得 enqueue/dequeue 的位子是比較好的作法

enq_idx - queue->dequeue_index > queue->cell_mask + 1
enqueue 時取 dequeue index 檢查 queue 有沒有滿不須用鎖,因為如果本來是沒滿 dq idx +1 了還是沒滿,本來是因為多一個滿了的話僅會回傳不必要的 QueueResult_full,除非一直處於因多一個滿了的狀態否則結果仍為正確。 (好像不太對)

在 queue 為空的情況下需要注意在 enqueue 時不能被 dequeue

若是在 thread_in() 中插入 printf() 把 enqueue 的編號印出來,會讓原本的超時變成 global counter 數值不對。printf() 實作中有使用 mutex 以確保輸出不會被中斷,據我猜測此處會阻擋當下執行緒,變向的變成拖長每次 enqueue 的時間,還得到不同的測試結果。照理說 enqueue 時間長短不應該造成結果不一,此處不僅結果錯誤還得到不同錯誤,看來可以嘗試作為 debug 線索。

non-blocking

重新分析現況,以 Array 為基礎的 queue 的共享資源為 index,對於 enqueue 及 dequeue index 皆為多寫入者多讀取者問題,最困難的第方式如何「正確」的判斷 queue 什麼時候滿或空,以正確得更新 index (之前都是以 index 差作為判斷標準,可以試試以總量判斷,也可以試試以 List 為基礎)。

所以目標是

  1. 使用同步機制使得 index 的多寫入者多讀取者問題變成單寫入者多讀取者問題
    • 若非 MP 及非 MC ,本身就為單寫入者多讀取者問題,不須協調多寫入者
  2. 正確的判斷當下是否為更新 index 的有效情況。

得知 queue 滿的方法:

  • 要 enqueue 的時候發現 cell 內還有資料,表示已經滿了 (前提是 dequeue 的時候會順便清除或歸回 cell 預設值)
  • 類似 canary,多使用一個 cell,在即將被 dequeue 的 cell 之前,作為!@#$%^&
  • 計算 deq/enq index 的差值,不超過總容量

SP 及 SC 的情況下,

Reference materials

  • GECOS
    • 可以作為 micro optimization 的範例
  • The Art of Concurrency: A Thread Monkey's Guide to Writing Parallel Applications
    • 太聚焦於平行範且例過時,但是可以參考手法及技巧
  • Perfbook, you say parallel is hard

Test

  1. make to see all available testing targets.
  2. make [target] to test specific desired implementation

測試方式

依據原始的測試程式,有以下幾種

  • null_pointers: 未配置 queue 也未配置 byte 的情況
  • create
  • empty
  • full
  • sums10000

注意 EXPECT 在測試中的用途及 do while(0) 的技巧,此巨集將會判斷預期情況是否成立,若不成立則釋放 queue 的記憶體,並將不成立的情形字串化 return

有全空 dequeue、全滿 enqueue 的測試,那麼接下來就是要測試從空的漸漸填滿以及從滿的漸漸取完的交錯 dequeue/enqueue,如同排隊理論中提到的進出比例,進出比 > 1 時會將 buffer 填滿,< 1 時則反之。

實做方式為建立指定數量的 dequeue threads 及 enqueue threads,並透過 pthread barrier 的技巧,待所有執行緒建立就緒再一同喚醒,在沒滿或空的情況下重複執行。目前仿照 sums10000 的寫法,使用 global count 驗證,例如在 enqueue threads 較多的情況下,最後 global count 數值應為 (in - out) * q_size/(in - out) = q_size

超時檢查

測試有可能進入無法停止的錯誤狀態,因此需要設置一個時間上限,超過即算錯誤,目前設為 3 秒 (應該需要個方法來估算合理的時間上限)。

實做方式是在 main 開始時先註冊 SIGALRM handler,每個 TEST 開始前 alarm(3),若有在時間內結束就 alarm(0) 取消還沒響的鬧鐘。由於 handler 無法傳入 signal number 以外的參數,也不在 main 的 scope 內無法 goto,目前只有直接 raise(SIGTERM) 來結束超時的測試。理想狀況是將錯誤訊息設為 timeout,並直接跳到輸出測試結果的部份,目前正在尋找有沒有更好的寫法。

注意 signal man page 中有提到 signal 的行為在各個 UNIX 之間不同,因此建議使用 sigaction(2) 以增進 Portability

Project structure

  • Each directory contains an implementation. Makefile and test.c at root for building and testing.
  • /build contains the implementation copied from specified target directory.
  • /template contains a template for new implementation.

Correctness

以老師提供的 promela 範例作為 model checking 的基礎
ref

Lemma
對於基於 index 操作的實作,只要 producers 各分別對 enqueue index 的 +1 是 linearizable,以及 consumers 分別對 dequeue index 的 +1 也是 linearizable ,那麼對 queue 的並行操作就會得到正確的結果

proof
General lock-free 版本:
linearizable point - 任何對 index 的操作一定要得到最新的狀態,否則就重來,以確保其他 thread 的寫入結果對現在要寫入的 thread 可見

  1. 在 queue 不為空也不為滿的情況下,從讀取最新 index 後,若 index 為持相同數值,則表示 index 所標示的位置沒有其他 thread 爭奪,取得可以安全獨自寫入空間。+1 宣告其所有權後進行資料寫入。
  2. 若是 index 沒有維持相同數值則當下操作不為最新,且結果對其他 thread 不可見,因此不做任何更動
  3. 若 queue 為空或滿 (經由 sequence 檢查 index 是否為合法),則不做任何更動

Benchmark

summation 和交錯執行本身也算一種 benchmark

Comparison


ToDo

  • Set up testing interface to test out each implementation easily
  • General lock-free one in quiz9
  • Lock based implementation

Notes

  • 是不是可以用大量因為不用鎖或使用鎖的方式不正確產出的結果,反推排程策略?
  • 開始寫 lock-based 的版本之後發現自己連 lock 都寫不好只好回去複習
  • 指導教授推薦了一篇 DHASH: Dynamic Hash Tables with Non-blocking Regular Operations 給我,做了一點摘要和筆記
  • byte 存在的用意是可以透過 make 得知整個 queue 所需要的大小或是檢查即將要分配的大小是否正確,並在之後實際分配空間給 queue。感覺可以有更好的作法?
  • dequeue, enqueue index 的追逐應該可以用兩倍 queue 大小來判斷
  • 做到一個階段可以參考一下同學或別人的作法來驗證或是檢查有沒有可以改進之處
  • multi-reader multi-writer 問題都可以化簡為 multi-reader single-writer 問題
    • 所有不是最新的 writer 都會變成 reader,常見作法是使用 CAS (小心 ABA) 或是透過 acquire lock 確認自己是不是最新 writer
    • 也可以轉變成最新 writer 要去通知所有舊 writer 變成 reader
    • 總之就是必須協調 (coordinate) writer
    • 綜觀而言,以上作法是使用 latest-win 的規則來協調 writer,帶來最強的 consistency: linearizability latest-win (in CRDTs) 不是 RWM,即 write 不是基於前一次 read,所以不保證 linearizability。仍是 eventual consistency
    • 使用其他規則例如 add-win, longest-win 甚至是設定 writer 的優先度,有機會可以不這麼緊密的協調,writer 可以比較自由的寫入,但不保證當下結果會與別人同步,weaker consistency. e.g. CRDT (strong eventual consistency)

這邊的 Queue 只保證 Sequencial Consistency,即 consumer 抓取兩次資料,只有在這兩筆資料都來自於同一個 producer 時他們的次序才被保證和放進 Queue 的時候一樣。若是抓取到兩個來自不同的 producer 放進的 queue 的資料,則不保證與他們被放進 Queue 的次序一樣。在此保證下,來自不同的 producer 的操作是 commtative 的。

enqueue(pro1, data)

enqueue(pro2, data) 和 enqueue(pro2, data)
enqueue(pro1, data) 的結果是相同的

然而我們在 producer 放資料的時候卻是一個一個排隊放進去的:透過 atomic operations 嘗試基於最新的 index 再 + 1 。如果是要保證每個 enqueue 的次序為 total order 才需要這樣。queue 的輸入端應該要項漏斗一樣,producer 只管把資料地址丟近來,最後會用什麼順序漏進去 queue 裡面不重要 (假設此漏斗同一個人先丟進去的東西會先漏下去)。所以遇到 contention 的時候是直接往下一個寫不是等等再寫。不過 Queue 到底有沒有滿還是已經空了還是需要注意

幻想中的方法:
每個 producer 都有各自的 enq index,consumer deq index 是每個 producer enq index 加起來。因為 index 是 monotonically increased 的 shared resouces,adding is commutative