Try   HackMD

Linux 核心專題: 重作第 10 和第 12 週測驗題

執行人: stevendd543
專題解說影片

Reviewed by LindaTing0106

下方提到關於 pipe 的簡介,另外還想請問在使用 pipe 時,你如何去確保傳遞的數據不會丟失或失真?另外,使用 pipe 是否會影響程序的運行效率或性能?

stevendd543pipe(7) 裡面有提到說兩個 process 在進行數據讀寫的時候 system call 的 readwrite 會有錯誤處理機制防止數據讀寫錯誤。也有提到說 pipe 的通信機制是建立在 kernel space 能夠使交換資料不必 context switch 造成效率低下。

Reviewedd by eleanorLYJ

在 第 12 週測驗的第 3 題部分,"讀寫逞罰" 是甚麼意思?

stevendd543 這裡是指當 thread 試圖對 queue 進行寫入或者讀取時。若為滿或為空就會將 thread 暫停數秒鐘等待其他 thread 對其有新的操作。

Reviewed by Shiang1212

在共享變數中擁有一個用來儲存 hazard pointers **HP,每一個 thread 都透過自己的 id(透過 alloc_tid 取得) 當作 index 去修改 HP[tid],被成功儲存在這的地址都是被保護不被別的 thread 刪除,可以從 lfq_dequeue_tid 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中

第 10 週測驗的第 2 題,對於 hazard pointer 實作的描述有點混亂,一個段落裡描述太多東西了。建議你可以列點,把 HPlfq_dequeue_tidlfq_enqeue 的作用以及你的發現分開描述,以便讀者理解你想探討的議題。

至於延伸問題,我看到這次期末專題很多同學都使用 Userspace RCU,你可以參考其他同學的想法並嘗試實作。

stevendd543 了解 ! 我會修改我的描述,以及嘗試去時做看看。

Reviewed by kkkkk1109

pthread_barrier 是一個計數鎖用來同步同一進程產生的線程,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的進程

根據 資訊科技詞彙翻譯 ,建議將 process 翻譯為 行程

stevendd543收到 !!

任務簡介

強化對於並行處理的認知。

TODO: 第 10 週測驗的第 1 題

包含延伸問題

程式碼解析筆記

Signal

這段程式碼主要是在並行環境下執行 bloom filter,首先在程式碼一開始有使用到 Signal,訊號是 processes 之間溝通的一種方式,舉例來說對 shell 輸入 Ctrl + c 就會對執行中的程式碼傳送訊號 SIGINT,因此在程式碼一開始先設定好信號處理程式碼,當收到SIGHUP 訊號表示通信中止,因此就能夠自定義通信中止處理函數 handle_sighup

注意用語:

  • default 是「預設」,不是「默認」,二者語境不同

signal(7) 提到如果沒有註冊 signal 處理機制,它將以預設的方式處理中止。

Using these system calls, a process can elect one of the following behaviors to occur on delivery of the signal: perform the default action; ignore the signal; or catch the signal with a signal handler, a programmer-defined function that is automatically invoked when the signal is delivered.

Pipe

務必使用本課程教材規範的術語!

pipe 是 linux support 的 Unix IPC 用於不同 process 之間的資料交換,pipe 只會存在於 memory 且它會具有幾個特性 1、單向通信 2、半雙工,所以 pipe 會回傳兩個 file description, fd[0] 是用於讀取,fd[1] 是用於寫入。

What child process do?

child process 是從 parent process create_worker 中產生,並且會關閉 pipe 讀取通道,只讓 child 擁有寫入的權力。並且將 parent process file description 將其與 pipe 寫入端做對應。

    pid_t pid = fork();
    if (!pid) {
        /* Worker */
        close(fd[0]);
        globals.parent_fd = fd[1];
        worker_loop();
        exit(0);
    }

什麼是「機率性地」?
改進你的漢語表達。

worker_loop 裡面有以 0.85 的機率呼叫bloom_test 0.15 的機率呼叫 bloom_add,並且記錄操作次數。

    while (1) {
        int n = (*k) % 100;
        if (n < CONTAINS_P) {
            bloom_test(globals.filter, key, key_len);
        } else {
            bloom_add(globals.filter, key, key_len);
        }
        (*k)++;
        globals.op_counter++;
    }

以上是 child process 從被創建後會完成的事情,但假設執行過程中程式被終止,將會有自定義的處理函數將紀錄資料透過 pipe 寫回 parent process。重新看回 parent process 一開始有將 SIGHUP 訊號設定對應的處裡函數,並且在程式結束前調用 system call: kill 傳遞 SIGHUP 訊號給 child processe 來安全地終止程式。

create_worker 中可以發現 child process 會關閉讀取通道,再進行 work_loop 將資料透過 write 寫入 pipe 供給 parent 日後 child 被停止時可以讀取,此執行函數內有兩個重要的函數第一個是 bloom_add 對資料進行 hash 後加入 bloom filter 中,再來是 bloom_test 主要是檢查此資料是否再 bloom filter 當中。而且 worker_loop 中是以 0.85 機率去執行檢驗、0.15 進行加入 bloom filter 操作,最後可以發現 kill 就是對程序 發起 SIGHUP 此值是在一開始就掛起 等待的訊號 。

什麼「掛起」,你知道自己在寫什麼嗎?不要看低劣的簡體中文材料,以第一手資訊為主,改進你的漢語表達。

Splitting hash values into 32-bit segments for separate storage

區分「函數」和「函式」,務必採用本課程教材規範的術語。

get 函式中會將 key 除以單一資料的 byte 大小來搜尋對應的 index of bytes,並在 1 byte 中設定 bit mask 來表示取得對應的資料。

static inline int get(bloom_t *filter, size_t key)
{
    uint64_t index = key / sizeof(char);
    uint8_t bit = 1u << (key % sizeof(char));
    return (filter->data[index] & bit) != 0;
}

bloom_add 透過將 64-bit hash value 切成兩分來當作有兩個 hash function 產生的 hash value。

void bloom_add(bloom_t *filter, const void *key, size_t keylen)
{
    uint64_t hbase = hash(key, keylen);
    uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
    uint32_t h2 = hbase % BLOOMFILTER_SIZE;
    set(filter, h1);
    set(filter, h2);
}
int bloom_test(bloom_t *filter, const void *key, size_t keylen)
{
    uint64_t hbase = hash(key, keylen);
    uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE;
    uint32_t h2 = hbase % BLOOMFILTER_SIZE;
    return get(filter, h1) && get(filter, h2);
}

延伸問題:

由於此程式是以 fork() 創建多的子行程,每個 process 彼此不共享定址空間,因此不需要擔心 race condition 問題。

TODO: 第 10 週測驗的第 2 題

包含延伸問題

程式碼解析筆記

Hazard point 主要是保護指標不會變成 dangling pointer 也就是資料被其他 thread 給刪除後自己無法取得。

這就是你的「總結」,難道程式碼這麼多行,你只能用這樣少的話語描述?

重新揣摩程式碼背後的考量、搭配教材,重新闡述。

程式碼架構

struct lfq_ctx {
    alignas(64) struct lfq_node *head;
    int count;
    struct lfq_node **HP; /* hazard pointers */
    int *tid_map;
    bool is_freeing;
    struct lfq_node *fph, *fpt; /* free pool head/tail */

    /* FIXME: get rid of struct. Make it configurable */
    int MAX_HP_SIZE;

    /* avoid cacheline contention */
    alignas(64) struct lfq_node *tail;
};

在共享變數中擁有一個用來儲存 hazard pointers **HP,每一個 thread 都透過自己的 id(透過 alloc_tid 取得) 當作 index 去修改 HP[tid],被成功儲存在這的地址都是被保護不被別的 thread 刪除,可以從 lfq_dequeue_tid 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中。不過在一開始無法理解為何如果成功將 old_head 加入 HP 被受保護,還需要在離開迴圈時進行 ctx->headold_head 比較確認是一樣才能更換成新的 head,不是已經將其保護住了?後來思考才發現 Hazard pointers 是為了保護資料不被刪除,因此其他 thread 仍然可以對其 enqueue,所以如果 ctx->head 被更動的話就必須重新 dequeueenqueue 就不需考慮 HP 直接使用 atomic 放入即可不過會先虛假的更改 tail 最後才會作鏈接。

閱讀教材對應的論文。

    do {
    retry:
        /* continue jumps to the bottom of the loop, and would attempt a
         * atomic_compare_exchange_strong with uninitialized new_head.
         */
        old_head = atomic_load(&ctx->head);

        atomic_store(&ctx->HP[tid], old_head);
        atomic_thread_fence(memory_order_seq_cst);

        /* another thread freed it before seeing our HP[tid] store */
        if (old_head != atomic_load(&ctx->head))
            goto retry;
        new_head = atomic_load(&old_head->next);

        if (new_head == 0) {
            atomic_store(&ctx->HP[tid], 0);
            return NULL; /* never remove the last node */
        }
    } while (!atomic_compare_exchange_strong (&ctx->head, &old_head, new_head));

can_freeHP 是否衝突

可以從 safe_free 中看到判斷 node 是否存在於危險當中與是否節點標記為 can_free,這裡為好奇為何別人沒有在存取卻還要判斷是否 can_free

延伸問題: 藉由 thread-rcu 來改寫

因為目前對於 read-copy-update 應用在 dequeue 與 enqueue 上去作改寫 hazard pointer 的場景有些疑問,這兩個操作對於 RCU 來說的是 updater 那要如何改寫成 RCU 目前無法想通,希望閱讀者能提供一些想法與思路

你閱讀本課程的教材了嗎?

TODO: 第 10 週測驗的第 3 題

包含延伸問題

未完成

TODO: 第 12 週測驗的第 2 題

包含延伸問題

在解釋程式碼結構前,先理解 lap 在ring buffer 中扮演什麼角色以及它的用途。

struct chan_item {
    _Atomic uint32_t lap;
    void *data;
};

lap 是存放在每個緩衝資料格內的一個成員,他主要用來記錄目前此格資料的使用狀況,可以發現lap主要出現在傳送與接收資料的時候,首先有兩個特殊的 headtail 他在高位 32 bits 暗藏了 lap 的值,我們知道 tail 用來放入資料 head 用來取出資料,從trysend_buf中可以發現,他會先取得 channel 尾端的 item 位置之後再將其位置紀錄的 lap 與存放在 tail 高位的 lap 做比對,這裡重點有幾個,存放在 tail 變數中的是獨立的,與 channel item 內存放的並不相同,為了區分頭尾,此不等式若比較失敗就表示無法加入或者無法寄送資料到 channel 內。







RingBuffer



Head

Head



Node0

1



Head->Node0





Tail

Tail



Node4

0



Tail->Node4





Node1

1



Node0->Node1





Node2

1



Node1->Node2





Node3

1



Node2->Node3





Node3->Node4





Node4->Node0





這裡可發現 tail 放入資料後都會 +1 ,在 +1 前每個 channel item 的 lap 都是 0,而且 tail 高位存放的 lap 還沒到尾端不會 +2 所以也是 0。







RingBuffer



Head

Head



Node2

1



Head->Node2





Tail

Tail



Node4

0



Tail->Node4





Node0

2



Node1

2



Node0->Node1





Node1->Node2





Node3

1



Node2->Node3





Node3->Node4





Node4->Node0





在看這張圖假設 head 取了兩個元素就會對 channel item lap +1 變成 2,如此一來當 ring buffer tail 走了一圈 +2 後就知道原本上一輪寫入的資料已經被取用,因此可以將其視為空格進行填入資料。換作是 head 也是同樣判斷方式,但是要等到資料放入 lap 變為 1 時才可讀取,這也是為何當初 channel_init 要將其 head 設為 1 的原因。

ch->head = (uint64_t) 1 << 32;
ch->tail = 0;
    tail = atomic_load_explicit(&ch->tail, memory_order_acquire);
    uint32_t pos = tail, lap = tail >> 32;
    item = ch->ring + pos;

    if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) {
        errno = EAGAIN;
        return -1;
    }

channel 的資料結構

struct chan_item {
    _Atomic uint32_t lap;
    void *data;
};

struct chan {
    _Atomic bool closed;
    _Atomic(void **) datap;
    struct mutex send_mtx, recv_mtx;
    _Atomic uint32_t send_ftx, recv_ftx;
    _Atomic size_t send_waiters, recv_waiters;
    
    size_t cap;
    _Atomic uint64_t head, tail;
    struct chan_item ring[0];
};

typedef void *(*chan_alloc_func_t)(size_t);

strcut chan 主要是用來實作 go channel 的一些機制,它分成緩衝與非緩衝通道。非緩衝通道並非完全沒有空間,只是他只能存放一個元素,直到他被取出否則他 routine 將無法推入新的元素而被暫停,此法更 mutex lock 一樣可以保護共享變數。

  1. bool closed: 用來表示 channel 是否還能使用
  2. (void **)datap: 用於無緩衝通道交換數據
  3. send or recv mtx: 用於保證最多只有一個存取權
  4. send or recv waiters:用於緩衝通道,表示在 futex wait 的 thead 數量
  5. send_ftx, recv_ftx: 用於管理等待的 thread

Thread safety:

注意用語,務必詳閱 https://hackmd.io/@sysprog/it-vocabulary

不要閱讀低劣的簡體中文素材。

1、通過 atomic 操作和 mutex 保證了 multi-thread 環境下的安全性。
2、send_waitersrecv_waiters 計數器幫助管理等待的線程,優化了喚醒機制。

ThreadSanitizer

WARNING: ThreadSanitizer: data race (pid=123819) Read of size 8 at 0x7ff2c99be2a8 by thread T1: #0 reader /home/dong/linux2024/gcftx/main.c:42 (channel_test+0x2e94) Previous write of size 8 at 0x7ff2c99be2a8 by thread T81: #0 chan_send_unbuf /home/dong/linux2024/gcftx/chan.c:180 (channel_test+0x266d) #1 chan_send /home/dong/linux2024/gcftx/chan.c:261 (channel_test+0x2c8d) #2 writer /home/dong/linux2024/gcftx/main.c:31 (channel_test+0x2da5) Location is stack of thread T1. Thread T1 (tid=123821, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79) #1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f) #2 test_chan /home/dong/linux2024/gcftx/main.c:101 (channel_test+0x3356) #3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb) Thread T81 (tid=123901, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79) #1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f) #2 test_chan /home/dong/linux2024/gcftx/main.c:102 (channel_test+0x3381) #3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb) SUMMARY: ThreadSanitizer: data race /home/dong/linux2024/gcftx/main.c:42 in reader

以上是發現是在 unbuf 時產生 race condition,因此會去探討在所有在無 buffer 情況下的共享變數,不過在檢查的時候發現反而是每個 thread 的區域 msg 接收資料的時候出現 data race。

    // sender
if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data,
                                                 memory_order_acq_rel,
                                                 memory_order_acquire)) 
    *ptr = data;  // data race

    // reader 
atomic_fetch_add(&msg_count[a->msg],1);  // data race


為何非 shared memory 也會有 data race 發生?

由此可發現只有在 channel 還有資料的時候才會發生 data race,初步認為是 *ptr 的存取造成的 data race,因為前面條件不成立時 ptr = ch->datap,因此為了避免在此處操作有衝突將其改成以下原子操作 82dfeeb

// recv
atomic_store(data, *ptr);
// send
atomic_store(&ptr, &data);

ptr 是指向 unbuffer 的變數也就是 channel,不管是從 channel 儲存資料至 thread data 或是 thread data 儲存到 channel 都以原子操作完成即可避免 threads data race。我們知道 senders 或 readers 之間都有 futex 管理,不管是在 critical section 或者作為 channel 資料有無的管理都可以達成,但 sender 與 reader 間是沒有 critical section,所以會有資料被對方清除的可能導致最後不是每一個 msg 都能成功傳遞,主要導因為 unbuffer 不用 ring buffer 一樣在對 data 讀寫之後會儲存一個 lap 在資料結構上,提供給下一個使用者是否可存取資料,以上步驟將會同時發生,但 unbuffer 的防範機制在於以下。

   if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data,
                                                 memory_order_acq_rel,	                                                 memory_order_acq_rel,
                                                 memory_order_acquire)) {	                                                 memory_order_acquire)) {
        atomic_store_explicit(data, *ptr, memory_order_release);
        atomic_store_explicit(&ch->datap, NULL, memory_order_release);	        
   }

sender 與 reader 都一樣只要通道還有資料就不對通到直接修改值,而是先取得通道資料或寫入資料後將通道設為 NULL,因此假設兩個 thread 同時進入這個條件區域可能在讀取或寫入前就被改成 NULL,造成讀取失敗。

TODO: 第 12 週測驗的第 3 題

包含延伸問題

Enqueue and Dequeue

在進行讀取修改時會針對 union 結構修改,union 結構分成 volatile uint32_t w volatile const uint32_t r 這可以讓在同一個記憶體位置操作不同的變數,來提高程式碼可讀性,但可以發現不管是讀寫操作都沒有使用到 atomic 操作,我認為是因為 single producer and single consumer 彼此之間並不會有 thread safety 問題,因為會有 batch size 來確認邊界和 wait_ticks(SPSC_CONGESTION_PENALTY) 來作讀寫逞罰,來避免過度競爭。

CPU affinity

在主函數中針對每一個 CPU 都會初始化一個 queue 配上一個負責生產所有 queues 資料的 CPU,每個 consumer 都會配置以自己 id 兩倍對應的 CPU。但要如何在不同 CPU 達到同步等待?

cpu_set_t cur_mask; CPU_ZERO(&cur_mask); // initiate cpu mask CPU_SET(cpu_id * 2, &cur_mask); sched_setaffinity(0, sizeof(cur_mask), &cur_mask);

pthread_barrier_wait

pthread_barrier 是一個計數鎖用來同步同一行程產生的線程,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的進程

false sharing

注意用語:

  • access 是「存取」,而非「訪問」(visit)

務必使用本課程教材規範的術語。

它發生在多個 CPU 同時存取 同一個 cache line 時性能低下,這都是因為 Cache Coherence Protocol 引發的問題,因此程式碼中有針對共享資料作對齊優化。tail 因為頻繁被多的 consumer 修改因此對其作對齊優化,不過這裡尚不懂的地方是,程式碼當初是建立在多個 consumers 有自己對應的 queue 為何還需要對齊來避免 false sharing?

為何不作實驗來驗證自己的想法呢?避免「舉燭」

typedef struct spsc_queue {
    counter_t head; /* Mostly accessed by producer */
    volatile uint32_t batch_head;
    counter_t tail __ALIGN; /* Mostly accessed by consumer */
    volatile uint32_t batch_tail;
    unsigned long batch_history;

    /* For testing purpose */
    uint64_t start_c __ALIGN;
    uint64_t stop_c;

    element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */
} __ALIGN spsc_queue_t;