# Linux 核心專題: 重作第 10 和第 12 週測驗題 > 執行人: stevendd543 > [專題解說影片](https://youtu.be/dKtJYr0xGSw) ### Reviewed by `LindaTing0106` 下方提到關於 pipe 的簡介,另外還想請問在使用 pipe 時,你如何去確保傳遞的數據不會丟失或失真?另外,使用 pipe 是否會影響程序的運行效率或性能? > [name=stevendd543][pipe(7)](https://man7.org/linux/man-pages/man7/pipe.7.html) 裡面有提到說兩個 process 在進行數據讀寫的時候 system call 的 `read` 與 `write` 會有錯誤處理機制防止數據讀寫錯誤。也有提到說 pipe 的通信機制是建立在 kernel space 能夠使交換資料不必 context switch 造成效率低下。 ### Reviewedd by `eleanorLYJ` 在 第 12 週測驗的第 3 題部分,"讀寫逞罰" 是甚麼意思? > [name=stevendd543] 這裡是指當 thread 試圖對 queue 進行寫入或者讀取時。若為滿或為空就會將 thread 暫停數秒鐘等待其他 thread 對其有新的操作。 ### Reviewed by `Shiang1212` > 在共享變數中擁有一個用來儲存 hazard pointers **HP,每一個 thread 都透過自己的 id(透過 alloc_tid 取得) 當作 index 去修改 HP[tid],被成功儲存在這的地址都是被保護不被別的 thread 刪除,可以從 lfq_dequeue_tid 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中... [第 10 週測驗的第 2 題](https://hackmd.io/@sysprog/SJXN0y9UR#TODO-%E7%AC%AC-10-%E9%80%B1%E6%B8%AC%E9%A9%97%E7%9A%84%E7%AC%AC-2-%E9%A1%8C),對於 hazard pointer 實作的描述有點混亂,一個段落裡描述太多東西了。建議你可以列點,把 `HP`、`lfq_dequeue_tid`、`lfq_enqeue` 的作用以及你的發現分開描述,以便讀者理解你想探討的議題。 至於延伸問題,我看到這次期末專題很多同學都使用 [Userspace RCU](https://github.com/urcu/userspace-rcu/blob/master/doc/rcu-api.md),你可以參考其他同學的想法並嘗試實作。 > [name=stevendd543] 了解 ! 我會修改我的描述,以及嘗試去時做看看。 ### Reviewed by `kkkkk1109` > pthread_barrier 是一個計數鎖用來同步同一~~進程~~產生的~~線程~~,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的~~進程~~ 根據 [資訊科技詞彙翻譯](https://hackmd.io/@sysprog/it-vocabulary) ,建議將 process 翻譯為 **行程** 。 > [name=stevendd543]收到 !! ## 任務簡介 強化對於並行處理的認知。 ## TODO: 第 10 週測驗的第 1 題 > 包含延伸問題 #### 程式碼解析筆記 ### Signal 這段程式碼主要是在並行環境下執行 bloom filter,首先在程式碼一開始有使用到 **Signal**,訊號是 processes 之間溝通的一種方式,舉例來說對 shell 輸入 `Ctrl + c` 就會對執行中的程式碼傳送訊號 `SIGINT`,因此在程式碼一開始先設定好信號處理程式碼,當收到`SIGHUP` 訊號表示通信中止,因此就能夠自定義通信中止處理函數 `handle_sighup`。 :::danger 注意用語: * default 是「預設」,不是「默認」,二者語境不同 ::: [signal(7)](https://man7.org/linux/man-pages/man7/signal.7.html) 提到如果沒有註冊 signal 處理機制,它將以預設的方式處理中止。 > Using these system calls, a process can elect one of the following behaviors to occur on delivery of the signal: perform the default action; ignore the signal; or catch the signal with a signal handler, a programmer-defined function that is automatically invoked when the signal is delivered. ### Pipe :::danger 務必使用本課程教材規範的術語! ::: [pipe](https://man7.org/linux/man-pages/man2/pipe.2.html) 是 linux support 的 Unix IPC 用於不同 process 之間的資料交換,pipe 只會存在於 memory 且它會具有幾個特性 1、單向通信 2、半雙工,所以 pipe 會回傳兩個 file description, fd[0] 是用於讀取,fd[1] 是用於寫入。 ### What child process do? child process 是從 parent process `create_worker` 中產生,並且會關閉 pipe 讀取通道,只讓 child 擁有寫入的權力。並且將 parent process file description 將其與 pipe 寫入端做對應。 ```c pid_t pid = fork(); if (!pid) { /* Worker */ close(fd[0]); globals.parent_fd = fd[1]; worker_loop(); exit(0); } ``` :::danger 什麼是「機率性地」? 改進你的漢語表達。 ::: `worker_loop` 裡面有以 0.85 的機率呼叫`bloom_test` 0.15 的機率呼叫 `bloom_add`,並且記錄操作次數。 ```c while (1) { int n = (*k) % 100; if (n < CONTAINS_P) { bloom_test(globals.filter, key, key_len); } else { bloom_add(globals.filter, key, key_len); } (*k)++; globals.op_counter++; } ``` 以上是 child process 從被創建後會完成的事情,但假設執行過程中程式被終止,將會有自定義的處理函數將紀錄資料透過 pipe 寫回 parent process。重新看回 parent process 一開始有將 SIGHUP 訊號設定對應的處裡函數,並且在程式結束前調用 system call: [kill](https://man7.org/linux/man-pages/man2/kill.2.html) 傳遞 SIGHUP 訊號給 child processe 來安全地終止程式。 <s> 從 `create_worker` 中可以發現 child process 會關閉讀取通道,再進行 `work_loop` 將資料透過 `write` 寫入 `pipe` 供給 parent 日後 child 被停止時可以讀取,此執行函數內有兩個重要的函數第一個是 `bloom_add` 對資料進行 hash 後加入 bloom filter 中,再來是 `bloom_test` 主要是檢查此資料是否再 bloom filter 當中。而且 `worker_loop` 中是以 0.85 機率去執行檢驗、0.15 進行加入 bloom filter 操作,最後可以發現 `kill` 就是對<s>程序</s> 發起 `SIGHUP` 此值是在一開始就<s>掛起</s> 等待的訊號 。 </s> :::danger 什麼「掛起」,你知道自己在寫什麼嗎?不要看低劣的簡體中文材料,以第一手資訊為主,改進你的漢語表達。 ::: ### Splitting hash values into 32-bit segments for separate storage :::danger 區分「函數」和「函式」,務必採用本課程教材規範的術語。 ::: `get` 函式中會將 key 除以單一資料的 byte 大小來搜尋對應的 index of bytes,並在 1 byte 中設定 bit mask 來表示取得對應的資料。 ```c static inline int get(bloom_t *filter, size_t key) { uint64_t index = key / sizeof(char); uint8_t bit = 1u << (key % sizeof(char)); return (filter->data[index] & bit) != 0; } ``` `bloom_add` 透過將 64-bit hash value 切成兩分來當作有兩個 hash function 產生的 hash value。 ```c void bloom_add(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; set(filter, h1); set(filter, h2); } ``` ```c int bloom_test(bloom_t *filter, const void *key, size_t keylen) { uint64_t hbase = hash(key, keylen); uint32_t h1 = (hbase >> 32) % BLOOMFILTER_SIZE; uint32_t h2 = hbase % BLOOMFILTER_SIZE; return get(filter, h1) && get(filter, h2); } ``` ### 延伸問題: 由於此程式是以 `fork()` 創建多的子進程,每個 process 彼此不共享內存空間,因此不需要擔心 race condition 問題。 ## TODO: 第 10 週測驗的第 2 題 > 包含延伸問題 #### 程式碼解析筆記 Hazard point 主要是保護指標不會變成 dangling pointer 也就是資料被其他 thread 給刪除後自己無法取得。 :::danger 這就是你的「總結」,難道程式碼這麼多行,你只能用這樣少的話語描述? 重新揣摩程式碼背後的考量、搭配教材,重新闡述。 ::: #### 程式碼架構 ```c struct lfq_ctx { alignas(64) struct lfq_node *head; int count; struct lfq_node **HP; /* hazard pointers */ int *tid_map; bool is_freeing; struct lfq_node *fph, *fpt; /* free pool head/tail */ /* FIXME: get rid of struct. Make it configurable */ int MAX_HP_SIZE; /* avoid cacheline contention */ alignas(64) struct lfq_node *tail; }; ``` 在共享變數中擁有一個用來儲存 hazard pointers `**HP`,每一個 thread 都透過自己的 id(透過 `alloc_tid` 取得) 當作 index 去修改 `HP[tid]`,被**成功**儲存在這的地址都是被保護不被別的 thread 刪除,可以從 `lfq_dequeue_tid` 得知要 dequeue 之前必須成功將資料加入 hazard pointer 中。不過在一開始無法理解為何如果成功將 `old_head` 加入 `HP` 被受保護,還需要在離開迴圈時進行 `ctx->head` 與 `old_head` 比較確認是一樣才能更換成新的 head,不是已經將其保護住了?後來思考才發現 Hazard pointers 是為了保護資料**不被刪除**,因此其他 thread 仍然可以對其 enqueue,所以如果 `ctx->head` 被更動的話就必須重新 `dequeue`。`enqueue` 就不需考慮 HP 直接使用 atomic 放入即可不過會先虛假的更改 tail 最後才會作鏈接。 :::danger 閱讀教材對應的論文。 ::: ```c do { retry: /* continue jumps to the bottom of the loop, and would attempt a * atomic_compare_exchange_strong with uninitialized new_head. */ old_head = atomic_load(&ctx->head); atomic_store(&ctx->HP[tid], old_head); atomic_thread_fence(memory_order_seq_cst); /* another thread freed it before seeing our HP[tid] store */ if (old_head != atomic_load(&ctx->head)) goto retry; new_head = atomic_load(&old_head->next); if (new_head == 0) { atomic_store(&ctx->HP[tid], 0); return NULL; /* never remove the last node */ } } while (!atomic_compare_exchange_strong (&ctx->head, &old_head, new_head)); ``` #### `can_free` 與 `HP` 是否衝突 可以從 `safe_free` 中看到判斷 node 是否存在於危險當中與是否節點標記為 can_free,這裡為好奇為何別人沒有在存取卻還要判斷是否 can_free #### 延伸問題: 藉由 thread-rcu 來改寫 因為目前對於 read-copy-update 應用在 dequeue 與 enqueue 上去作改寫 hazard pointer 的場景有些疑問,這兩個操作對於 RCU 來說的是 updater 那要如何改寫成 RCU 目前無法想通,**希望閱讀者能提供一些想法與思路**。 :::danger 你閱讀本課程的教材了嗎? ::: ## TODO: 第 10 週測驗的第 3 題 > 包含延伸問題 未完成 ## TODO: 第 12 週測驗的第 2 題 > 包含延伸問題 在解釋程式碼結構前,先理解 **lap** 在ring buffer 中扮演什麼角色以及它的用途。 ```c struct chan_item { _Atomic uint32_t lap; void *data; }; ``` `lap` 是存放在每個緩衝資料格內的一個成員,他主要用來記錄目前此格資料的使用狀況,可以發現`lap`主要出現在傳送與接收資料的時候,首先有兩個特殊的 `head` 與 `tail` 他在高位 32 bits 暗藏了 `lap` 的值,我們知道 `tail` 用來放入資料 `head` 用來取出資料,從`trysend_buf`中可以發現,他會先取得 channel 尾端的 item 位置之後再將其位置紀錄的 `lap` 與存放在 `tail` 高位的 `lap` 做比對,這裡重點有幾個,存放在 `tail` 變數中的是獨立的,與 channel item 內存放的並不相同,為了區分頭尾,此不等式若比較失敗就表示無法加入或者無法寄送資料到 channel 內。 ```graphviz digraph RingBuffer { node [shape=circle, fixedsize=true, width=0.5]; rankdir=LR; Head [label="Head", shape=circle]; Tail [label="Tail", shape=circle]; Node0 [label="1"]; Node1 [label="1"]; Node2 [label="1"]; Node3 [label="1"]; Node4 [label="0"]; Head -> Node0 [label=""]; Node0 -> Node1 [label=""]; Node1 -> Node2 [label=""]; Node2 -> Node3 [label=""]; Node3 -> Node4 [label=""]; Node4 -> Node0 [label=""]; Tail -> Node4[label=""]; } ``` 這裡可發現 `tail` 放入資料後都會 +1 ,在 +1 前每個 channel item 的 lap 都是 0,而且 `tail` 高位存放的 `lap` 還沒到尾端不會 +2 所以也是 0。 ```graphviz digraph RingBuffer { node [shape=circle, fixedsize=true, width=0.5]; rankdir=LR; Head [label="Head", shape=circle]; Tail [label="Tail", shape=circle]; Node0 [label="2"]; Node1 [label="2"]; Node2 [label="1"]; Node3 [label="1"]; Node4 [label="0"]; Head -> Node2 [label=""]; Node0 -> Node1 [label=""]; Node1 -> Node2 [label=""]; Node2 -> Node3 [label=""]; Node3 -> Node4 [label=""]; Node4 -> Node0 [label=""]; Tail -> Node4[label=""]; } ``` 在看這張圖假設 `head` 取了兩個元素就會對 channel item lap +1 變成 2,如此一來當 ring buffer tail 走了一圈 +2 後就知道原本上一輪寫入的資料已經被取用,因此可以將其視為空格進行填入資料。換作是 `head` 也是同樣判斷方式,但是要等到資料放入 `lap` 變為 1 時才可讀取,這也是為何當初 `channel_init` 要將其 `head` 設為 1 的原因。 ```c ch->head = (uint64_t) 1 << 32; ch->tail = 0; ``` ```c tail = atomic_load_explicit(&ch->tail, memory_order_acquire); uint32_t pos = tail, lap = tail >> 32; item = ch->ring + pos; if (atomic_load_explicit(&item->lap, memory_order_acquire) != lap) { errno = EAGAIN; return -1; } ``` channel 的資料結構 ```c struct chan_item { _Atomic uint32_t lap; void *data; }; struct chan { _Atomic bool closed; _Atomic(void **) datap; struct mutex send_mtx, recv_mtx; _Atomic uint32_t send_ftx, recv_ftx; _Atomic size_t send_waiters, recv_waiters; size_t cap; _Atomic uint64_t head, tail; struct chan_item ring[0]; }; typedef void *(*chan_alloc_func_t)(size_t); ``` `strcut chan` 主要是用來實作 go channel 的一些機制,它分成緩衝與非緩衝通道。非緩衝通道並非完全沒有空間,只是他只能存放一個元素,直到他被取出否則他 routine 將無法推入新的元素而被暫停,此法更 mutex lock 一樣可以保護共享變數。 1. `bool closed`: 用來表示 channel 是否還能使用 2. `(void **)datap`: 用於無緩衝通道交換數據 3. `send or recv mtx`: 用於保證最多只有一個存取權 4. `send or recv waiters`:用於緩衝通道,表示在 futex wait 的 thead 數量 5. `send_ftx, recv_ftx`: 用於管理等待的 thread #### Thread safety: :::danger 注意用語,務必詳閱 https://hackmd.io/@sysprog/it-vocabulary 不要閱讀低劣的簡體中文素材。 ::: 1、通過 atomic 操作和 `mutex` 保證了 multi-thread 環境下的安全性。 2、`send_waiters` 和 `recv_waiters` 計數器幫助管理等待的線程,優化了喚醒機制。 #### ThreadSanitizer ```shell= WARNING: ThreadSanitizer: data race (pid=123819) Read of size 8 at 0x7ff2c99be2a8 by thread T1: #0 reader /home/dong/linux2024/gcftx/main.c:42 (channel_test+0x2e94) Previous write of size 8 at 0x7ff2c99be2a8 by thread T81: #0 chan_send_unbuf /home/dong/linux2024/gcftx/chan.c:180 (channel_test+0x266d) #1 chan_send /home/dong/linux2024/gcftx/chan.c:261 (channel_test+0x2c8d) #2 writer /home/dong/linux2024/gcftx/main.c:31 (channel_test+0x2da5) Location is stack of thread T1. Thread T1 (tid=123821, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79) #1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f) #2 test_chan /home/dong/linux2024/gcftx/main.c:101 (channel_test+0x3356) #3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb) Thread T81 (tid=123901, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:962 (libtsan.so.0+0x5ea79) #1 create_threads /home/dong/linux2024/gcftx/main.c:70 (channel_test+0x309f) #2 test_chan /home/dong/linux2024/gcftx/main.c:102 (channel_test+0x3381) #3 main /home/dong/linux2024/gcftx/main.c:115 (channel_test+0x34fb) SUMMARY: ThreadSanitizer: data race /home/dong/linux2024/gcftx/main.c:42 in reader ``` 以上是發現是在 unbuf 時產生 race condition,因此會去探討在所有在無 buffer 情況下的共享變數,不過在檢查的時候發現反而是每個 thread 的區域 `msg` 接收資料的時候出現 data race。 ```c // sender if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, &data, memory_order_acq_rel, memory_order_acquire)) *ptr = data; // data race // reader atomic_fetch_add(&msg_count[a->msg],1); // data race ``` > 為何非 shared memory 也會有 data race 發生? 由此可發現只有在 channel 還有資料的時候才會發生 data race,初步認為是 `*ptr` 的存取造成的 data race,因為前面條件不成立時 `ptr = ch->datap`,因此為了避免在此處操作有衝突將其改成以下原子操作 [82dfeeb](https://github.com/stevendd543/go_channel_with_C/commit/82dfeeb9fc69f9fc9fcf125a8a2ff3af2f7b284d)。 ```c // recv atomic_store(data, *ptr); // send atomic_store(&ptr, &data); ``` `ptr` 是指向 unbuffer 的變數也就是 channel,不管是從 channel 儲存資料至 thread data 或是 thread data 儲存到 channel 都以原子操作完成即可避免 threads data race。我們知道 senders 或 readers 之間都有 futex 管理,不管是在 critical section 或者作為 channel 資料有無的管理都可以達成,**但 sender 與 reader 間是沒有 critical section**,所以會有資料被對方清除的可能導致最後不是每一個 `msg` 都能成功傳遞,主要導因為 unbuffer 不用 ring buffer 一樣在對 data 讀寫之後會儲存一個 lap 在資料結構上,提供給下一個使用者是否可存取資料,以上步驟將會同時發生,但 unbuffer 的防範機制在於以下。 ```c if (!atomic_compare_exchange_strong_explicit(&ch->datap, &ptr, data, memory_order_acq_rel, memory_order_acq_rel, memory_order_acquire)) { memory_order_acquire)) { atomic_store_explicit(data, *ptr, memory_order_release); atomic_store_explicit(&ch->datap, NULL, memory_order_release); } ``` sender 與 reader 都一樣只要通道還有資料就不對通到直接修改值,而是先取得通道資料或寫入資料後將通道設為 `NULL`,因此假設兩個 thread 同時進入這個條件區域可能在讀取或寫入前就被改成 NULL,造成讀取失敗。 ## TODO: 第 12 週測驗的第 3 題 > 包含延伸問題 #### Enqueue and Dequeue 在進行讀取修改時會針對 union 結構修改,union 結構分成 `volatile uint32_t w` 和 ` volatile const uint32_t r` 這可以讓在同一個記憶體位置操作不同的變數,來提高程式碼可讀性,但可以發現不管是讀寫操作都沒有使用到 atomic 操作,我認為是因為 single producer and single consumer 彼此之間並不會有 thread safety 問題,因為會有 batch size 來確認邊界和 `wait_ticks(SPSC_CONGESTION_PENALTY)` 來作讀寫逞罰,來避免過度競爭。 #### CPU affinity 在主函數中針對每一個 CPU 都會初始化一個 queue 配上一個負責生產所有 queues 資料的 CPU,每個 consumer 都會配置以自己 id 兩倍對應的 CPU。**但要如何在不同 CPU 達到同步等待?** ```c= cpu_set_t cur_mask; CPU_ZERO(&cur_mask); // initiate cpu mask CPU_SET(cpu_id * 2, &cur_mask); sched_setaffinity(0, sizeof(cur_mask), &cur_mask); ``` #### pthread_barrier_wait [pthread_barrier](https://linux.die.net/man/3/pthread_barrier_init) 是一個計數鎖用來同步**同一行程**產生的線程,因此就算被分配到其他 CPU 執行也是原屬於 pthread_create 的進程 #### false sharing :::danger 注意用語: * access 是「存取」,而非「訪問」(visit) 務必使用本課程教材規範的術語。 ::: 它發生在多個 CPU 同時存取 同一個 cache line 時性能低下,這都是因為 Cache Coherence Protocol 引發的問題,因此程式碼中有針對共享資料作對齊優化。`tail` 因為頻繁被多的 consumer 修改因此對其作對齊優化,**不過這裡尚不懂的地方是,程式碼當初是建立在多個 consumers 有自己對應的 queue 為何還需要對齊來避免 false sharing?** :::danger 為何不作實驗來驗證自己的想法呢?避免「舉燭」 ::: ```c typedef struct spsc_queue { counter_t head; /* Mostly accessed by producer */ volatile uint32_t batch_head; counter_t tail __ALIGN; /* Mostly accessed by consumer */ volatile uint32_t batch_tail; unsigned long batch_history; /* For testing purpose */ uint64_t start_c __ALIGN; uint64_t stop_c; element_t data[SPSC_QUEUE_SIZE] __ALIGN; /* accessed by prod and coms */ } __ALIGN spsc_queue_t; ```