Try   HackMD

2017q1 Homework3 (mergesort-concurrent)

contributed by <ierosodin>

開發環境

作業系統 : elementary OS loki

$ lscpu

Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 12
On-line CPU(s) list: 0-11
Thread(s) per core: 2
Core(s) per socket: 6
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 45
Model name: Genuine Intel® CPU @ 3.30GHz
Stepping: 5
CPU MHz: 2101.558
CPU max MHz: 3900.0000
CPU min MHz: 1200.0000
BogoMIPS: 6600.00
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 15360K
NUMA node0 CPU(s): 0-11

C11 template

這次作業裡頭有使用到 _Generic ,搭配 macro 來實作 C++ 的 template ,不過編譯器必須要升級 gcc 4.9.1 以上才支援!

_Generic is not supported in gcc until version 4.9, see: https://gcc.gnu.org/wiki/C11Status

改為可接受 phonebook-concurrent 的資料檔

list.h

配合 phonebook 的字典檔與應用,更改 list.h

typedef struct node {
    char *lastName;
    struct node *next;
    val_t data;
} node_t;

main()

為了排序 sort -R 處理過的字典檔,引入 file.c。利用 file_align() 將字典檔中的每一筆資料都填滿至 MAX_LAST_NAME_SIZE。再利用 list_add()mmap 後的資料一筆一筆加到 list 中。

list.c

由於 merge sort 是進行"數字"大小的比較來排序,所以必須先將字串轉換為數字,這裡我是利用27進位搭配餘數,對每一位字元做轉換,最後產生一個數字來進行排序,轉換方法如下:

    val_t key = 0;
    for (int i = 0; i < MAX_LAST_NAME_SIZE - 1; i++) {
        int rem = 0;
        if (map[i] != '\0')
            rem = map[i] - 96;
        else
            ;
        key += rem* pow(26, (MAX_LAST_NAME_SIZE - i - 2));
    }

    node->data = key;

lock free

一個thread pool的方法, 會使用到mutex lock, 而導致程式的執行受到影響, 如threadpool中的push與pop, lock將導致其他thread無法在同時間放入或取得task

解決辦法

<方法一>
建立THREAD_NUM個worker thread, 讓每一個thread都有自己的工作queue, 同時為了達到thread pool的優勢 (不會因為不同thread執行速度不同, 導致資源的浪費), 由main thread進行task的分配.







hierarchy



Main_Thread

Main_Thread



Work_Queue1

Work_Queue1



Main_Thread->Work_Queue1





Work_Queue2

Work_Queue2



Main_Thread->Work_Queue2





Work_Queue3

Work_Queue3



Main_Thread->Work_Queue3





Worker_Thread_1

Worker_Thread_1



Work_Queue1->Worker_Thread_1





Worker_Thread_2

Worker_Thread_2



Work_Queue2->Worker_Thread_2





Worker_Thread_3

Worker_Thread_3



Work_Queue3->Worker_Thread_3





<方法二>
C11 atomic同步機制

CAS

compare & swap, C11實現lock free的方法
atomic_compare_exchange_weak(the_queue, &orig, next);
藉由不斷的compare, 確保給值時, 資料是最新的, 若資料被改動, 則用新的data重新執行工作

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →

實作_Atomic

參考C11 Lock-free Stack

Two different threads will never receive the same item when popping. 
No elements will ever be lost if two threads attempt to push at the same time. 
Thread will never block on a lock when accessing the stack
  • initialize
int tqueue_init(tqueue_t *the_queue, size_t max_size)
{
    the_queue->head.aba = ATOMIC_VAR_INIT(0);
    the_queue->head.head = ATOMIC_VAR_INIT(NULL);
    the_queue->head.tail = ATOMIC_VAR_INIT(NULL);
    the_queue->size = ATOMIC_VAR_INIT(0);

    /* Pre-allocate all nodes. */
    the_queue->task_buffer = malloc(max_size * sizeof(struct _task));
    if (the_queue->task_buffer == NULL)
        return ENOMEM;
    for (size_t i = 0; i < max_size - 1; i++)
        the_queue->task_buffer[i].next = the_queue->task_buffer + i + 1;
    the_queue->task_buffer[max_size - 1].next = NULL;
    return 0;
}
  • task_pop
static struct _task *pop(_Atomic struct tqueue_head *the_queue)
{
    struct tqueue_head next, orig = atomic_load(the_queue);
    do {
        if (orig.tail == NULL)
            return NULL;
        
        next.aba = orig.aba + 1;
        next.tail = orig.tail->last;
    } while (!atomic_compare_exchange_weak(the_queue, &orig, next));
    return orig.tail;
}
  • task_push
static void push(_Atomic struct tqueue_head *the_queue, struct _task *task)
{
    struct tqueue_head next, orig = atomic_load(the_queue);
    task->last = NULL;
    do {
        task->next = orig.head;
        next.aba = orig.aba + 1;
        next.head->last = task;
        next.head = task;
    } while (!atomic_compare_exchange_weak(the_queue, &orig, next));
}
  • get size
static inline size_t tqueue_size(tqueue_t *the_queue)
{
    return atomic_load(&the_queue->size);
}

參考資料

C11 Lock-free Stack
王紹華同學的共筆
atomic cppreference
atomic CAS