Try   HackMD

2016q3 Homework2 (phonebook-concurrent)

contributed by <tundergod>

作業內容

預期目標

  • 學習第二週提及的 concurrency 程式設計
  • 學習 POSIX Thread
  • 學習效能分析工具
  • code refactoring 練習
  • 探索 clz 的應用

相關資料閱讀及重點整理

Toward Concurrency

  • The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software

    • 程式開發未來的趨勢:因爲(1.)硬體(CPU速度)達到瓶頸(圖),(2.)高速緩存cache容量持續增加,轉而向concurrency模式發展.
    • concurrency的開銷相較於他的好處:各個線程能夠獨立控制流程和效能改進上並不重要
    • I/O device的技術一直在進步(I/O bound可能發生的幾率越來越小),相對的以來並行改進CPU bound的技術也越加重要
  • An Introduction to Lock-Free Programming

    • Lock-Free(reference):
      • 當一個程序在面對一系列訪問Look-Free操作的線程中,如果有某一個線程被lock-up,絕對不會影響其他線程的運行(Non-Blocking).
      • 在raytracing中的Pthread實作就是一個lock-free操作(每一個線程去獨立計算每一行/列等的pixels).上一篇文章中concurency的batabase例子也屬於lock-free操作
    • Lock-Free使用技術:
      • Atomic Read-Modify-Write Operations:Atomic Operations在操作內存時被系統視爲不可分割的,即線程不可以被中斷,例如對x = 1 賦值(參考:多线程程序中操作的原子性)
        • 原子性操作:
          • x=1在assembly code中的顯示爲:
          ​​​​mov dword ptr [x],1
          
          • x++在assembly code中的顯示爲:
          ​​​​mov eax,dword ptr [x]
          ​​​​add eax,1
          ​​​​mov dword ptr [x],eax
          
          上面兩個組合語言可以看到在賦值時只用了一條指令,而x++ 則用了三條(x=1不論在多少個線程同時進行的情況下都會=1,而x++ 則必須依據程式寫法判定)
        • 在多人同時撰寫程式時要使用RMW operation因爲當多個線程同時對一個地址進行操作時確保只會在同時間執行一個線程
      • Compare-And-Swap Loops:通過執行CAS的循環確保執行Atomic操作,例如Win32中的_InterlockedCompareExchange,該指令會在指定的address中讀取原始值再計算新值並檢查如果讀取位置值不變才寫入新值,例如:
      ​​void LockFreeQueue::push(Node* newHead)
      ​​{
      ​​​​​    for (;;) {
      ​​        // Copy a shared variable (m_Head) to a local.
      ​​        Node* oldHead = m_Head;
      ​​        // Do some speculative work, not yet visible to other threads.
      ​​​​​​​​        newHead->next = oldHead;
      ​​        // Next, attempt to publish our changes to the shared variable.
      ​​        // If the shared variable hasn't changed, the CAS succeeds and we return.
      ​​​​​​​​        // Otherwise, repeat.
      ​​​​​​​​        if (_InterlockedCompareExchange(&m_Head, newHead, oldHead) == oldHead)
      ​​​​​​​​            return;
      ​​​​​    }
      ​​}
      
      • 在處理CAS Loop時要特別小心ABA Problem:ABA 問題是若兩個線程同時對一個地址值進行操作但優先執行完畢的線程把原值A改爲B又改回A,這樣後執行的線程將會讀取錯誤的資料:
        上圖可以看到T1要執行pop,但是當他在讀取A->B時,T2搶佔了T1執行push的動作(Top->A->C),結束之後T1(已經pop掉A了所以T1的Top指向B)回來檢查Top跟一開始一樣指向A所以CAS完成,但是結果確實錯的因爲B已經被push並釋放了記憶體位置
        • 解決辦法:Garbage Collection(確保記憶體不再需要時才釋放)
      • Sequential Consistency:
        • 對於每個獨立的處理單元,執行時都維持程式的順序(Program Order)
        • 整個程式以某種順序在所有處理器上執行
        • Compiler Barrier :防止編譯器優化所產生的reordering,用法參考
  • Acquire and Release Semantics

    • 定義:
      • Acquire semantics :只能被視爲對共享內存的讀取(read-acquire),且禁止read-acquire之後的所有操作被提前到它之前執行
      • Release semantics :只能被視爲對共享內存的寫入(write-release),且禁止write-release之前的所有操作被延遲到它之後執行
      • 在weak memoey model中的四個memory reordering的acquire and release semantic:
  • Weak vs. Strong Memory Models

    • 四種memory reordering:loadload, loadstore, storeload, storestore
    • Weak memory model:可能存在所有的memory reordering
    • Weak with data dependency ordering:可能存在storeload和storestore的reordering(load的順序必能被保證)
    • Usually Stong:保證acquire and release semantic的執行,所以只存在storeload reordering
    • Sequentially consistent:全部被保證順序

Functional Programming

Concurrency (並行)

Concurrency (並行) vs. Parallelism (平行)

  • refences:
    • Concurrency is not Parallelism, by Rob Pike ,PPT
      用地鼠燒書做例子

      如果以平行的方式加快燒書的速度,就是增加多一只地鼠,一個推車或多一個焚燒盧,這樣就能做到平行.但是我們不能保證兩只或更多地鼠會同時進行(可能只有有限的火爐(在單核系統中只能允許一次進行一次的燒書工作,那樣就沒有效率了)

      以並行的方式去作業,能夠以不同的結構(並行的目標就是一個好的結構)去進行,可以是三個地鼠分別負責一部分的工作(decomposition)

      其中也可以有平行的設計元素



      影片中也提到例如OS的操作就是並行,OS同時要執行多個driver(keyboard,touchpad,mouse and etc),如果OS在管理這些driver的時候是平行管理,對整個系統都會造成巨大的負擔.

重構

  • 定義:書中把重構分爲動詞名詞兩種:
    • 名詞 : 對軟件內部結構的一種調整,在不改變程式可觀察行爲上提高其理解性並降低日後的修改成本
    • 動詞 : 使用一系列的重構手法,在不改變程式可觀察行爲的提前下,調整其結構
  • 目的:
    • 使程式更容易被理解和修改,但是相對的效能會有所降低(在重構不應該考慮效能問題,效能改進問題應該與重構分開進行,因爲重構在提高可讀性的同時也會提高優化的易難度)
  • 臭味道:書中把bad smell分爲22種類並都提供了解決方法
    ​Grandma Beck, discussing child(臭的尿布會幹擾孩子,應該換掉)
    ​	-rearing philosophy
    
  • 列出一些重要常用的來學習:
    • Duplicated Code : 重復的程式
    • Long Method : 太長的函式
    • Large Class : 太大的class(structure)會在後續的發展中生成許多其他問題
    • Long Parameter List : 太長的參數列表
    • Divergent Change : 發散式的變化,某個class常常因爲某些原因而往兩個方向發展.
    • Shotgun Surgery : 與divergent change相反,是當遇到變化時必須在許多class中調用參數或進行修改
    • Data Clumps : 幾個數據或程式會一直同時出現在一些地方
    • Switch Statement :
    • Speculative Generality : 沒有太大作用或是測試用程式
    • Temporary Field : 爲某些特殊情況設的變數
    • Message Chain : 程式的運行過程接連向多個object或是function發送request(A->B->C->result)
    • Middle Man : 過度運用delegation(委託)
    • Alternative Classses with Different Interfaces : 兩個函式做同一件事
    • Comment : 要寫大量注釋前先看看能否重構(When you feel the need to write a comment, first try to refactor the code so that any comment becomes superfluous.)

閱讀程式碼及修改

  • phonebook-concurrency利用mmap()和pthread的concurrency進行實作
    • mmap()可參考man pages記憶體映射函式 mmap 的使用方法, mmap()函數:
      ​​void *mmap(void *start,size_t length,
      ​​           int prot, int flags, int fd, off_t offsize);
      
      • start:指向要映射的起始位置(NULL是讓系統自行決定),函式會返回該起始位置
      • length:映射所需空間大小(code中利用fsize()判斷)
      • prot:映射區域保護方式(執行,讀取,寫入,不能存取)
      • flags:映射區域的特性(通常分爲MAP_SHARED或MAP_PRIVATE)
      • fd:由open()返回的file descriptor,代表要映射的檔案,O_RDONLY | O_NONBLOCK代表的是read only和以nonblocking的方式去存取檔案
      ​​int open(const char *pathname, int flags);
      
      • offset:檔案從起始位置的偏移量
      • pthread_setconcurrency()告訴系統明確的並行數量(沒有寫有時候會出錯)

請詳閱 man page: pthread_setconcurrency,最下方的 NOTES 要認真看 jserv

執行時間

size of entry : 136 bytes
execution time of append() : 0.071213 sec
execution time of findName() : 0.005603 sec
3
size of entry : 24 bytes
execution time of append() : 0.005196 sec
execution time of findName() : 0.004604 sec

驗證正確性

把 phonebook_opt建構的資料輸出並與dictionary中words.txt利用linux內建功能diff比較,結果:

0a1
> aaaa

發現在phonebook_opt少了aaaa,問題出現在main.c中

for (int i = 0; i < THREAD_NUM; i++) { if (i == 0) { pHead = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } else { etmp->pNext = app[i]->pHead->pNext; dprintf("Connect %d head string %s %p\n", i, app[i]->pHead->pNext->lastName, app[i]->ptr); } ..... }

可以看到在第三行程式碼中,第一筆資料在設置pHead時指標指多一個pNext,所以他會跳過第一筆lastname,而第7行中etmp也犯了同樣的錯誤.只要把多指向的pNext拿掉就能夠得到正確的答案了.如下:

 pHead = app[i]->pHead;
 ...
 etmp->pNext = app[i]->pHead;

執行時間

「執行時間」是台灣科技慣用術語,「運行」是對岸說法。 jserv

opt:

size of entry : 24 bytes
execution time of append() : 0.005021 sec
execution time of findName() : 0.003749 sec

orig:

size of entry : 136 bytes
execution time of append() : 0.048624 sec
execution time of findName() : 0.005534 sec

圖標:

  • 4 thread

  • 各thread的比較圖

Refactoring

Thread Pool & Lock-Free Thread Pool

普通的Thread Pool

Lock-Free Thread Pool

理解lock-free Thread Pool中的訊號傳遞

  • linux系統定義了64重信號,可以通過kill -l指令查看
    • 前32個成爲不可靠信號,即不管發送多少次進程只能收到一個.
    • 後32個爲可靠信號,發送多少次系統就接收多少次
  • 內核處理接受到的信號是在當前進程的上下文(context),也就是進程在執行中的時候.當進程被喚醒或是被分配給CPU的時候會從kernel mode轉到 user mode,同時檢查是否有還沒有處理的信號在等待(還沒有處理的信號會被標上未處理的記號(等待處理),與阻塞是不一樣的概念,在阻塞中的信號進程是不會去處理的)
  • 老師在筆記中有說到其大體開發模式爲:
    ​sigemptyset(&zeromask); //初始化 ​sigemptyset(&newmask); //初始化 ​sigaddset(&newmask, SIGUSR); ​sigprocmask(SIG_BLOCK, &newmask, &oldmask) ; ​while (!CONDITION) { ​ sigsuspend(&zeromask); ​} ​sigprocmask(SIG_SETMASK, &oldmask, NULL)
    • 以上的程式碼是利用阻塞的形式在處理(其中一種處理的方式)
      • 這樣處理是爲了不被在電腦運行的其他信號幹擾或破壞環境
      • 介紹各函式的功能(成功返回0,失敗返回-1):
        • SIGUSR1和SIGUSR2是用戶自定義的信號,接受時會強行發出終止指令強行終止process的運行
        • sigemptyset(sigset_t *set): 初始化信號(全部清0)
        • sigfillset(sigset_t *set): 全部變成1
        • sigaddset(sigset_t *set, int signum):向信號集set中加入signum信號
        • sigdelser(sigset_t *set, int signum):在信號集set中刪除signum信號
        • sigismember(const sigset_t *set, int signum):判定signum信號是否在信號集set中
        • sigprocmask(int how, const sigset_t *set, sigset_t *oldset):是一種信號阻塞函式(也是lock-free thread pool的主要做法),how參數是不同的執行方法
          • SIG_BLOCK:將set指向oldset,並添加到阻塞信號集裏面
          • SIG_UNBLOCK:將set指向oldset,並從阻塞信號集裏面刪除
          • SIG_SETMASK:將set指向oldset,並設置成阻塞信號集
        • sigpending(sigset_t *set):獲取已經發送的卻被丟到阻塞信號集中的信號
        • sigsuspend(const sigset_t *mask):利用新的mask阻塞當前的進程,直到行程收到新的信號才會回復原本的mask,調用行程設置的處理函式,等信號處理函式返回sigsuspend才返回,sigsuspend是一個atomic操作
      • 程式碼解讀:
        • 在初始化zeromask,newmask之後把接收到的新信號SIGUSR1賦予newmask(在程式碼裏面把SIGUSR1設成了do nothing,因爲SIGUSR在爲設置處理函式的情況下會終止整個線程)
        • 然後會啓動信號處理函式(SIG_BLOCK method),即把新獲得的信號newmask~指向(連接到)~代替之前的信號oldmask~後面(他們在排隊)~
        • 然後程式就會判定condition是不是爲1(在c語言中實作concurrency因爲沒有內建函式之類的東西需要自定義一個類似flag的condition,老師的blog有實作例子),當thread pool中執行完畢就會設condition爲1 ,否則爲0
        • 如果執行緒還在跑着(condition=0),執行sigsuspend(&zeromask)去阻止現在的行程運作直到thread做完工作
        • 當thread做完,執行sigprocmask(SIG_SETMASK,&oldset,NULL)把舊的信號oldmask傳遞出去
        • 每一次接收到信號都會重復上述動作

第一次接觸信號的東西以上是我的想法和理解如果不對請大家救救我XDDLim Wen Sheng
疑問:在程式碼中可以看到每一個signal都在sigsuspend下等待,這樣也算是lock-free嗎?Lim Wen Sheng
這裏以我的理解,還有觀察sigsuspend的condition後,我覺得signal並不會在這裏等太久,每個訊號都會一直執行下去(除非condition真的不成立),而且這部分都不是critical section。最後按照我的理解,每個process會停留的地方,是在處理share variable的時候,也就是compare_and_swap跟fetch_and_add。 TempoJiJi
這裏其實我也還不是很清楚sigprocmask(SIG_BLOCK, &newmask, &oldmask)會不會像mutex_lock那樣阻止其它訊號執行下去,還需要做點實驗TempoJiJi
補充:我做了實驗後,發現sigprocmask(SIG_BLOCK, &newmask, &oldmask)是不會阻止其它process執行下去,所以只有在queue裏沒worker時,才會暫停TempoJiJi

Lock-Free Thread Pool流程與函式解析

  • Lock-Free Thread Pool的基本建立流程:
    1.~利用*tpool_init初始化thread pool(定義好有幾個thread和work queue)
    2.決定用什麼方式去分配工作到work queue中(round robin / least-load)
    3.利用spawn_new_threads()生成對應的threads
    4.利用wait_for_thread_registration檢查所有的threads是否已經準備好了
    5.利用*tpool_thread去做信號的溝通(建立semophere),呼叫get_work_concurrently建立work queue並把工作以concurrency的方式放進work queue中
    ~
    1. 先初始化thread pool,初始化的同時會生成新的threads,並等待work的到來,並利用get_work_concurrency在每一次CAS做完之後接收新的work
    2. 利用add_work函式和之中的分配演算法(round robin / least load)分配到work queue中,返回dispatch work to thread 這個函式
    3. dispatch_work2threads()會操控in函數把worker送出去做工

實作Lock-Free Thread Pool

  • 實作lock-free thread pool

    int cpu_num = sysconf(_SC_NPROCESSORS_CONF); ​​​​void *tpool = tpool_init(cpu_num); ​//thread num = 4 ​​​​for (int i = 0; i < THREAD_NUM; i++){ ​​​​ app[i] = new_append_a(map + MAX_LAST_NAME_SIZE * i, map + fs,THREAD_NUM, entry_pool + i); ​​​​ if(tpool_add_work(tpool, &append, (void*)app[i]) < 0) ​​​​ tpool_destroy(tpool,1); ​​​​} ​​​​tpool_destroy(tpool,1);
  • cpu time就是work queue的數量(上述運用範例程式碼的做法,也就是把電腦cpu數量帶進去執行)

  • 實驗經過正確驗證!

  • 運行時間:

    ​size of entry : 24 bytes
    ​execution time of append() : 0.103644 sec
    ​execution time of findName() : 0.003529 sec
    

​發現到append的時間非常的久,比original版本都慢了大約兩倍的時間
  • 各work queue和worker thread的比較結果圖:




可以看到在4個work queue和4個worker thread的時候是最快的.

tags: tundergod hw2 2016q3