# 2023 Homework3 contributed by <[shlin41](https://github.com/shlin41)> > [題目描述](https://hackmd.io/@sysprog/linux2023-summer-quiz3) --- ## 挑戰 1: ### Q1: 解釋程式碼運作原理 #### 先從資料結構開始: * work_t 等同於 paper 的 closure * code: 就是要執行的 function * join_count: paper 中的 number of missing arguments。只是實作中更像一個 waiting counter(因為沒有未知參數) * args: function 要吃的參數 * deque_t 等同於 paper 的 ready queue * top, bottom: \[ ... \) 的設計 * array: 存放 work_t 的array #### 再來討論 functions: * void init(deque_t *q, int size_hint) * 對 deque_t 的初始化 * 預設開一個 size_hint 長的 deque_t.array * void resize(deque_t *q) * 把 deque_t.array 變成兩倍大 * 這存在 free 舊的 array 的問題;程式中沒有處理 * work_t *take(deque_t *q) * 從 deque_t.array 中拿一個 work_t 出來 * 從 bottom 端拿 * 只剩一個 work_t 時有一個我覺得蠻特殊的處理處理 ![](https://hackmd.io/_uploads/rkCx7Sf0n.png) * void push(deque_t *q, work_t *w) * 如果 deque_t.array 滿了,要作 resize * 放 bottom 端 * work_t *steal(deque *q) * 從 top 端拿 #### thread 相關 * void *thread(void *payload) * while 迴圈直到沒工作也偷不到 * 自己有工作就做 * do_work(id, my_work) * 自己沒工作就偷 * for(別人的 deque_t) 偷一個工作 * do_work(id, stolen_work) > 這裡的偷工作 for-loop 應該是個可以改進的地方 > 1. 至少沒有 random 選 > 2. abort 後還是留在原 deque_t,再偷一次:會 abort,應該是表示這個 deque_t 沒多少工作或者太多幫手,是否選別的 deque_t 更好? * void do_work(int id, work_t *work) * while-loop(do_one_work) 直到 sub-work 都完成 * work_t *do_one_work(int id, work_t *work) * 做完自己的工作 * 回傳下一個工作 * work_t *join_work(work_t *work) * 減少 work_t 的 join_count * 如果 join_count == 0,表示可以作這個工作了,可以 return work;否則 return NULL #### main function * step1: 為 N 個 thread 準備 N 組資料結構 ```cpp pthread_t threads[N] int tids[N] // 代表 thread 的 id deque_t thread_queues[N] ``` * step2: 準備一個結束的 done_work * code = done_work * join_count = N x nprints * args 沒有 * step3: 為每個 thread_queues 初始化 * push nprints 個 work_t * code = print_task * join_count = 0 * args[0] = 1000 x i + j * args[1] = done_work * step4: 啟動並 join N 個 threads * step5: 印出結果 #### 運作原理 * 每個 thread 一開始都有 npirnts 個 print_task 的工作 * 每執行一次 print_task,都會去 join_work 那裡幫 done_work.join_count - 1 * 直到 done_work.join_count == 0 時,join_work 會把 done_work 排進 do_one_work 的下一個執行 #### 心得: * paper 和程式的實作出入還蠻大的 * paper 是用 spawn 生 child-work,spwan_next 生 successor—work * 程式目前不生 child-work,利用參數帶起 successor-work * 程式並不會把 sub-work 放回 queue 中。當然,sub-work 只有 down_work() 這一個 * 這裡的 sub-work 包含 child-work 和 successor-work --- ### Q2: 指出 work-stealing 程式碼可改進之處並著手進行。 #### 1. 修改 take(): 原程式問題: * b = b - 1 太早做 * 如果 thread 自己的 deque 為空,直接去 take(),b - 1 會 overflow 變成 ffffffff (b 是無號數)。因此會進入迴圈 if(t<=b) 中。 * 使用 deque 時,要確保每個 thread 的 deque 都不是從空的開始,才能保持程式正確。 ```cpp /* 原本的 soruce code */ work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1; array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_store_explicit(&q->bottom, b, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; if (t <= b) { /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) { /* Failed race */ x = EMPTY; } atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } } else { /* Empty queue */ x = EMPTY; atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } return x; } ``` 新程式修改: * 改變 b = b - 1 的位置:確認了 t < b 再進行 b - 1,如此才不會一開始 deque 為空時造成 overflow。 * 使用 deque 時,可以在 thread 的 deque 為空時執行程式。 ```cpp /* 新的 take() 修改 */ work_t *take(deque_t *q) { size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed); array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed); atomic_thread_fence(memory_order_seq_cst); size_t t = atomic_load_explicit(&q->top, memory_order_relaxed); work_t *x; if (t < b) { b = b - 1; /* Non-empty queue */ x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed); if (t == b) { /* Single last element in queue */ if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1, memory_order_seq_cst, memory_order_relaxed)) { /* Failed race */ x = EMPTY; } atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed); } else { atomic_store_explicit(&q->bottom, b, memory_order_relaxed); } } else { /* Empty queue */ x = EMPTY; } return x; } ``` #### 2. 修改 thread() 裡面的 steal 方式 現在的 steal 是從 thread_0 到 thread_N 一個一個找。可以改成隨機。(未完成) ### Q3: 利用 work stealing 改寫第 1 次作業測驗 γ 的 Quick Sort Source Code: [ws_qsort](https://github.com/shlin41/ws_qsort) #### 想法: * Quick Sort 基本作法:選 pivot 後,做完第一輪 qsort compare and swap 的工作,整個 array 會被切成 part_1, part_2, part_3。然後 qsort 遞迴地繼續處理 part_1, part_3 * part_1 elements < pivot * part_2 elements == pivot * part_3 elements > pivot * 搭配 work stealing:把 part_1 和 part_3 丟回自己的 deque #### 實做: work_t 資料結構修改 * 增加一個 do_id 欄位:用來紀錄執行這個 work_t 的 thread 是誰,之後方便丟任務回對應的 deque ```cpp typedef struct work_internal { task_t code; atomic_int join_count; atomic_int do_id; void *args[]; } work_t; ``` int qsort_algo(int do_id, void *a, size_t n) 修改 * 參數意義: * int do_id: 要執行這個任務的 thread id * void \*a: sorting array 的起始 addr * size_t n: sorting array 的長度 * 回傳值: * 回傳已經排好了多少個:其實就是有幾個相同的 pivot value 在 array 中 * 運作方式: * qsort_algo() 主要會把 array 分成三部份 ```cpp /* 這邊是 pseudo code,不是 c language。看 source code 比較容易懂... */ array[n] == part1[nl] + part2[n-nl-nr] + part3[nr] // part1: 比 pivot 小的部份 // part2: 等於 pivot 的部份 (已完成排序) // part3: 比 pivot 大的部份 ``` * 把 part1 和 part3 丟回自己的 deque * 把 part2 的長度回傳:這部份等同排好了 程式結束條件: * 學習原本的 work-steal.c:利用 join_work 去 countdown * 原始 join_count = len(array) * 每次都 countdown part2 的回傳值 做了兩個版本: * v4: 使用原本的 take() * 由於舊的 take() 有之前講的問題,每個 deque 都事先塞一個 dummy_work 進去 * v5:使用改過的 take() * v5 不需要塞 dummy_work 偷工減料的部份:只做了數字部份的 qsort * -s: 沒做 * -l: 沒做 * -f: 沒做。會破壞設計,return 值難算 * -h: 能進 source code 改 N_THREADS ### Q4: 研讀 Linux 核心的 CMWQ,討論其 work stealing 的實作手法 (未完成) --- ## 挑戰 2: ### Q1: 用 C11 Atomics 改寫 mcslock 提供的 source code 已完成該部份修改 ### Q2: 核心原始程式碼 mcs_spinlock.h 的研讀 #### 定義 MCS spinlock ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ struct mcs_spinlock { struct mcs_spinlock *next; // 類似 my_ticket,競爭鎖的人形成的佇列 int locked; /* 1 if lock acquired */ // 本地的自旋變數,不再是全域 int count; /* nesting count, see qspinlock.c */ // 不確定用處 }; ``` #### MCS spin lock:上鎖 ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ /* 自身的佇列節點 node 需要自行在他處初始化,它將被排到 lock 指示的等鎖佇列中 */ void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node) { struct mcs_spinlock *prev; /* Init node */ node->locked = 0; node->next = NULL; // (atomic) 執行 *lock = node 並返回原来的 *lock prev = xchg(lock, node); if (likely(prev == NULL)) { // 底下這個是超有趣的說明:其實在這邊設定 node->locked = 1 比較有統一性 /* * Lock acquired, don't need to set node->locked to 1. Threads * only spin on its own node->locked value for lock acquisition. * However, since this thread can immediately acquire the lock * and does not proceed to spin on its own node->locked, this * value won't be used. If a debug mode is needed to * audit lock status, then set node->locked value here. */ return; } // (atomic) 執行 prev->next = node; // 這相當於一個排入佇列的操作。記為(*) WRITE_ONCE(prev->next, node); /* Wait until the lock holder passes the lock down. */ // 相當於 while (0 == node->locked) // /* spin-wait */; arch_mcs_spin_lock_contended(&node->locked); } ``` - [ ] 其中的 `arch_mcs_spin_lock_contended(&node->locked)` 是 archecture dependent - [ ] 走 arm 路線直接進 while-loop: ```cpp /* linux-6.5.6/arch/arm/include/asm/mcs_spinlock.h */ /* MCS spin-locking. */ #define arch_mcs_spin_lock_contended(lock) \ do { \ /* Ensure prior stores are observed before we enter wfe. */ \ smp_mb(); \ while (!(smp_load_acquire(lock))) \ wfe(); \ } while (0) ``` - [ ] 走一般 general 路線,在 v4.17 之前 ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ #define arch_mcs_spin_lock_contended(lock) \ do { \ while (!(smp_load_acquire(lock))) \ cpu_relax(); \ } while (0) ``` - [ ] 走一般 general 路線,在 v4.17 之後:看不太懂 ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ /* * Using smp_cond_load_acquire() provides the acquire semantics * required so that subsequent operations happen after the * lock is acquired. Additionally, some architectures such as * ARM64 would like to do spin-waiting instead of purely * spinning, and smp_cond_load_acquire() provides that behavior. */ #define arch_mcs_spin_lock_contended(lock) do { smp_cond_load_acquire(lock, VAL); } while (0) ``` #### MCS spin unlock:解鎖 (不同版本上都差不多) ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ /* 自己要傳入一個排入到 lock 的佇列中自身的節點物件 node * 解鎖操作即是 node 移出佇列並且主動將佇列中下一個物件的 locked 設定為 1 */ void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node) { // (atomic) 持有 node 的下一個節點。 struct mcs_spinlock *next = READ_ONCE(node->next); /* 如果你是最後一個 */ if (likely(!next)) { // 再次確認:若 *lock == node,則 *lock = NULL 並返回。 // 說明自身是最後一個,無後續動作 if (likely(cmpxchg_release(lock, node, NULL) == node)) return; // 否則說明在 if 判斷 next 為 NULL 和 cmpxchg 之間有 node 插入。 // 随即等待其 mcs_spin_lock 呼叫完成,即上面 mcs_spin_lock 中的 (*) 那句完成以後才跳出。 // 相當於等待那個新插入的 node 完成工作。也就是說,本身不再是最後一個 while (!(next = READ_ONCE(node->next))) cpu_relax(); } // (atomic) 執行 next->locked = 1; arch_mcs_spin_unlock_contended(&next->locked); } ``` - [ ] 其中的 `arch_mcs_spin_unlock_contended(&node->locked)` 是 archecture dependent - [ ] 走 arm 路線直接進 while-loop: ```cpp #define arch_mcs_spin_unlock_contended(lock) \ do { \ smp_store_release(lock, 1); \ dsb_sev(); \ } while (0) ``` - [ ] 走一般 general 路線 ```cpp /* linux-6.5.6/kernel/locking/mcs_spinlock.h */ #define arch_mcs_spin_unlock_contended(lock) smp_store_release((lock), 1) #endif ``` #### MCS spinlock 設計的成功之處 * mcs_spin_lock() 僅修改 prev_node->next 欄位 * mcs_spin_unlock() 僅操作 next_node->locked 欄位 * prev_node 和 next_node 都屬於在某個 CPU 上自旋等待的 mcs spinlock 物件,++不會觸發全域的 cache coherence 刷新++! #### ==還看不懂的 source code 部份== * `smp_cond_load_acquire(x, VAL);` 裡面的 `VAL` 到底在哪邊?找不到? * 往下 trace 會有:READ_ONCE() and WRITE_ONCE() * 執行 atomic 且 member barrier * 但是,是哪種 barrier??? ### Q3:設計實驗來說明 mcslock 效益 (未完成) --- ## 挑戰 3: ### Q1: 用 C11 Atomics 改寫 seqlock Source Code: [seqlock_c11](https://github.com/shlin41/seqlock_shlin) * 宣告要 atomic 的資料結構: `typedef _Atomic(uint32_t) seqlock_t;` * 把程式中 __atomic(...) 的部份都改成 atomic(...):幾乎有一對一對應的程式 * 把 \*a = 1 改成用 atomic_init(a, 1) ### Q2:核心原始程式碼 seqlock.h 的研讀 #### Trace v6.5.6 implement of seqlock_t * 對照[挑戰3](https://hackmd.io/@sysprog/linux2023-summer-quiz3#%E6%8C%91%E6%88%B0-3)的解釋,原始碼的實作邏輯是一樣的。 * 多放了一些 barrier * smp_rmb() * READ_ONCE() * smp_wmb() * Read 端看不懂的 function * kcsan_atomic_next():下一個指令變 atomic ? * Write 端看不懂的 function (但成對出現) * kcsan_nestable_atomic_begin(), kcsan_nestable_atomic_end():形成一個 atomic regin ? * seqcount_acquire(), seqcount_release():形成 acquire-release 組合,為了什麼目的 ? #### Type Definition * 實作上直接內嵌 spinlock_t * 利用這個 seqcount 的序列號 (sequence number) 來確保資料的一致性 ```cpp /* linux-6.5.6/include/linux/seqlock.h */ typedef struct { /* * Make sure that readers don't starve writers on PREEMPT_RT: use * seqcount_spinlock_t instead of seqcount_t. Check __SEQ_LOCK(). */ seqcount_spinlock_t seqcount; spinlock_t lock; } seqlock_t; ``` ```cpp /* linux-6.5.6/include/linux/seqlock.h */ #define SEQCOUNT_LOCKNAME(lockname, locktype, preemptible, lockmember, lockbase, lock_acquire) \ typedef struct seqcount_##lockname { \ seqcount_t seqcount; \ __SEQ_LOCK(locktype *lock); \ } seqcount_##lockname##_t; SEQCOUNT_LOCKNAME(raw_spinlock, raw_spinlock_t, false, s->lock, raw_spin, raw_spin_lock(s->lock)) SEQCOUNT_LOCKNAME(spinlock, spinlock_t, __SEQ_RT, s->lock, spin, spin_lock(s->lock)) SEQCOUNT_LOCKNAME(rwlock, rwlock_t, __SEQ_RT, s->lock, read, read_lock(s->lock)) SEQCOUNT_LOCKNAME(mutex, struct mutex, true, s->lock, mutex, mutex_lock(s->lock)) ``` #### Read_seqcount_begin ```cpp /* linux-6.5.6/include/linux/seqlock.h */ #define read_seqcount_begin(s) \ ({ \ seqcount_lockdep_reader_access(seqprop_ptr(s)); \ raw_read_seqcount_begin(s); \ }) #define raw_read_seqcount_begin(s) \ ({ \ unsigned _seq = __read_seqcount_begin(s); \ \ smp_rmb(); \ _seq; \ }) #define __read_seqcount_begin(s) \ ({ \ unsigned __seq; \ \ // 等於 while((__seq = s->sequence) & 1) {...} while ((__seq = seqprop_sequence(s)) & 1) \ cpu_relax(); \ \ kcsan_atomic_next(KCSAN_SEQLOCK_REGION_MAX); \ __seq; \ }) /* 這一大段都是為了翻譯出 s->sequence * 由前面的定義看出:seqcount_raw_spinlock_t, seqcount_spinlock_t, seqcount_rwlock_t, seqcount_mutex_t 共用 read_seqcount_begin() 這個 MACRO */ #define __seqprop_case(s, lockname, prop) \ seqcount_##lockname##_t: __seqprop_##lockname##_##prop((void *)(s)) #define __seqprop(s, prop) _Generic(*(s), \ seqcount_t: __seqprop_##prop((void *)(s)), \ __seqprop_case((s), raw_spinlock, prop), \ __seqprop_case((s), spinlock, prop), \ __seqprop_case((s), rwlock, prop), \ __seqprop_case((s), mutex, prop)) #define seqprop_sequence(s) __seqprop(s, sequence) ``` #### Read_seqcount_retry ```cpp /* linux-6.5.6/include/linux/seqlock.h */ #define read_seqcount_retry(s, start) \ do_read_seqcount_retry(seqprop_ptr(s), start) static inline int do_read_seqcount_retry(const seqcount_t *s, unsigned start) { smp_rmb(); return do___read_seqcount_retry(s, start); } static inline int do___read_seqcount_retry(const seqcount_t *s, unsigned start) { kcsan_atomic_next(0); return unlikely(READ_ONCE(s->sequence) != start); } ``` #### Read 的用法: ```cpp seqcount_t seq; bool X = true, Y = false; void read(void) { bool x, y; do { int s = read_seqcount_begin(&seq); x = X; y = Y; } while (read_seqcount_retry(&seq, s)); BUG_ON(!x && !y); } ``` #### Write_seqlock ```cpp /* linux-6.5.6/include/linux/seqlock.h */ static inline void write_seqlock(seqlock_t *sl) { spin_lock(&sl->lock); do_write_seqcount_begin(&sl->seqcount.seqcount); } static inline void do_write_seqcount_begin(seqcount_t *s) { do_write_seqcount_begin_nested(s, 0); } static inline void do_write_seqcount_begin_nested(seqcount_t *s, int subclass) { seqcount_acquire(&s->dep_map, subclass, 0, _RET_IP_); do_raw_write_seqcount_begin(s); } static inline void do_raw_write_seqcount_begin(seqcount_t *s) { kcsan_nestable_atomic_begin(); s->sequence++; // 終於出現 sequence++ smp_wmb(); } ``` #### Write_sequnlock ```cpp /* linux-6.5.6/include/linux/seqlock.h */ static inline void write_sequnlock(seqlock_t *sl) { do_write_seqcount_end(&sl->seqcount.seqcount); spin_unlock(&sl->lock); } static inline void do_write_seqcount_end(seqcount_t *s) { seqcount_release(&s->dep_map, _RET_IP_); do_raw_write_seqcount_end(s); } static inline void do_raw_write_seqcount_end(seqcount_t *s) { smp_wmb(); s->sequence++; // 終於出現 sequence++ kcsan_nestable_atomic_end(); } ``` ### Q3:mpmc 及 spmc 運用 seqlock 改寫 #### seqlock 運用在 spmc 上有問題。目前沒有解法 原始想法: * 1 writer = 1 producer * N reader = N consumers * 修改 spmc_enqueue() 和 spmc_dequeue() * 消除程式中其他 atomic 或 memory_barrier 相關的呼叫 * 單純使用 seqlock.h 的呼叫達到所需的同步效果 問題: * spmc_enqueue() 沒問題 * ==spmc_dequeue() 有問題==: * spmc_dequeue() 做為 reader 的程式,不能有修改共享資源(spmc)的行為 * 但是實際上該程式會改動 spmc 的 member: node->front 和 spmc->curr_dequeue * consumer 的行為不是單純的讀,而是把貨物==拿走== #### spmc 本身的 bug: ```cpp static size_t observed_count[N_MC_ITEMS + 1]; ``` * 在 mc_thread() 中,讀取 observed_count[element] * 其中 element 會超過 N_MC_ITEMS * 因為 termination 的技巧,會 enqueue 數值過大的 element * 修正: * 第 9 行:確認 element < N_MC_ITEMS 後,在去 observed_count[element]++ * 第 18 行:中止 element 一律塞 N_MC_ITEMS ```cpp= static void *mc_thread(void *arg) { spmc_ref_t spmc = arg; uintptr_t element = 0, greatest = 0; for (;;) { greatest = (greatest > element) ? greatest : element; if (!spmc_dequeue(spmc, &element)) fprintf(stderr, "Failed to dequeue in mc_thread.\n"); else if ((element < N_MC_ITEMS) && (observed_count[element]++)) fprintf(stderr, "Consumed twice: %zu!\n", (size_t)element); else if (element < greatest) fprintf(stderr, "%zu after %zu; bad order!\n", (size_t)element, (size_t)greatest); printf("Observed %zu.\n", (size_t)element); /* Test for sentinel signalling termination */ if (element >= (N_MC_ITEMS - 1)) { spmc_enqueue(spmc, N_MC_ITEMS); /* notify other threads */ break; } } return NULL; } ``` #### mpmc 單純心得筆記: 1. pthread_t 可以重複利用 ```cpp pthread_t pids[N_THREADS]; for (int i = 0; i < N_THREADS; ++i) { if (-1 == pthread_create(&pids[i], NULL, producer, (void *) (intptr_t) i) || -1 == pthread_create(&pids[i], NULL, consumer, (void *) (intptr_t) i)) { printf("error create thread\n"); exit(1); } } ``` 2. cells[i] 的值設計 * 0: 表示 NULL * 1 ~ 4 x 2500000:表示有貨物 * 如果是放 futex_addr 的位址: 表示消費者先到,生產者要喚醒對 --- ## 挑戰 4: ### Q1:說明 cmap 運作原理。指出無法通過 ThreadSanitizer 的原因並修正。 #### 修正版:[cmap_quiz1](https://github.com/shlin41/cmap_quiz1.git) #### 看了發生 race condition 的原因: * 主要發生在 impl->count 這個變數身上 * Log sample: ```cpp ================== WARNING: ThreadSanitizer: data race (pid=6711) Read of size 8 at 0x7bb400000008 by main thread: #0 cmap_count__ /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:136 (test-cmap+0x342e) #1 cmap_size /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:152 (test-cmap+0x342e) #2 main /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:211 (test-cmap+0x29d1) Previous write of size 8 at 0x7bb400000008 by thread T4: #0 cmap_insert /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:163 (test-cmap+0x3537) #1 insert_value /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:41 (test-cmap+0x4681) #2 update_cmap /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:122 (test-cmap+0x4713) Location is heap block of size 32904 at 0x7bb400000000 allocated by thread T4: #0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57) #1 xmalloc /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/util.h:145 (test-cmap+0x2eff) #2 cmap_impl_init /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:53 (test-cmap+0x2eff) #3 cmap_expand /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:101 (test-cmap+0x369e) #4 cmap_insert /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:169 (test-cmap+0x369e) #5 insert_value /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:41 (test-cmap+0x4681) #6 update_cmap /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:122 (test-cmap+0x4713) Thread T4 (tid=6716, running) created by main thread at: #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8) #1 main /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:199 (test-cmap+0x28eb) SUMMARY: ThreadSanitizer: data race /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:136 in cmap_count__ ``` #### 以最少修改通共 ThreadSanitizer: * 讓有影響的 impl->count 讀寫變成 atomic * main() 這個主 thread 會讀 impl->count * write_cmap() 這個 writer thread 會讀寫 impl->count * read_cmap() 這個 reader thread 並不會用到 impl->count * 並不需要所有 impl->count 都 atomic 處理 * main() 讀的部份 atomic 起來 * write() 寫的部份 atomic 起來 * cmap.c diff: ```cpp diff -r cmap_org/cmap.c cmap_quiz1/cmap.c 136c136 < size_t count = impl->count; --- > size_t count = (size_t) atomic_load(&impl->count); // (modify) 163c163 < impl->count++; --- > atomic_fetch_add(&impl->count, 1); // (modify) 191c191 < impl->count = count; --- > atomic_store(&impl->count, count); // (modify) ``` #### 心得: * 修改完的程式基本不再出現 data race warning。 * 在這個範例,應該能排除 data race * 僅以 trace code 的方式,reader 的部份都有保護到。 * 在最保險的情況下,所有 share data 都進行 atomic 處理會更好。 * ==會有效能上的損失== * 如果用 memory_order_relaxed 也許會好一些 * 因為是 RCU (one writer + N readers),writer 的 function 不需要特別去處理 atomic。畢竟 writer 只有一個。 * reader 的部份,在呼叫 cmap_find__()時,有一個 cond_is_locked() 的檢查 * 表示可以去讀的時候,物件已經準備好了。過程中 writer 也不會去改已經存在的物件 * 不需要特別處理 atomic 也沒關係 ### Q2:解釋 utilization 計算基準,並探討如何進一步提升 throughput/scalability #### Ans: * impl->utilization 用來紀錄有被使用到的 impl->arr[i] 數量 (即 impl->arr[i] != NULL 的數量) * cmap_utilization() 回傳使用到的 impl->arr[i] 比例 * 這個值高,表示 hash 得很好? * impl->utilization / impl->count 可以更直覺表示 hash 得好 * 看了 [RinHizakura](https://hackmd.io/@RinHizakura/linux2023-summer-hw3#%E6%8C%91%E6%88%B0-4) 的開發紀錄和 comments: * camp_remove() 時,應該做對應的處理 * concurrent access 如何量化? ### Q3:嘗試執行 [kernel-module-rcu-test](https://github.com/sbcd90/kernel-module-rcu-test) 並將上述程式碼移植到 Linux 核心 #### Insmod Problem: * insmod 時出現 Key was rejected by service * 網路上說是 Secure Boot 的問題 * [解決方案](https://blog.csdn.net/hydront/article/details/130280075) * [改回來的方法](https://askubuntu.com/questions/726052/ubuntu-booting-in-insecure-mode-with-secureboot-enabled) #### [Kernel Driver Practice](https://github.com/Embetronicx/Tutorials/tree/master/Linux/Device_Driver) Donelist 1. mutex 2. spinlock -- doing