# 2023 Homework3
contributed by <[shlin41](https://github.com/shlin41)>
> [題目描述](https://hackmd.io/@sysprog/linux2023-summer-quiz3)
---
## 挑戰 1:
### Q1: 解釋程式碼運作原理
#### 先從資料結構開始:
* work_t 等同於 paper 的 closure
* code: 就是要執行的 function
* join_count: paper 中的 number of missing arguments。只是實作中更像一個 waiting counter(因為沒有未知參數)
* args: function 要吃的參數
* deque_t 等同於 paper 的 ready queue
* top, bottom: \[ ... \) 的設計
* array: 存放 work_t 的array
#### 再來討論 functions:
* void init(deque_t *q, int size_hint)
* 對 deque_t 的初始化
* 預設開一個 size_hint 長的 deque_t.array
* void resize(deque_t *q)
* 把 deque_t.array 變成兩倍大
* 這存在 free 舊的 array 的問題;程式中沒有處理
* work_t *take(deque_t *q)
* 從 deque_t.array 中拿一個 work_t 出來
* 從 bottom 端拿
* 只剩一個 work_t 時有一個我覺得蠻特殊的處理處理

* void push(deque_t *q, work_t *w)
* 如果 deque_t.array 滿了,要作 resize
* 放 bottom 端
* work_t *steal(deque *q)
* 從 top 端拿
#### thread 相關
* void *thread(void *payload)
* while 迴圈直到沒工作也偷不到
* 自己有工作就做
* do_work(id, my_work)
* 自己沒工作就偷
* for(別人的 deque_t) 偷一個工作
* do_work(id, stolen_work)
> 這裡的偷工作 for-loop 應該是個可以改進的地方
> 1. 至少沒有 random 選
> 2. abort 後還是留在原 deque_t,再偷一次:會 abort,應該是表示這個 deque_t 沒多少工作或者太多幫手,是否選別的 deque_t 更好?
* void do_work(int id, work_t *work)
* while-loop(do_one_work) 直到 sub-work 都完成
* work_t *do_one_work(int id, work_t *work)
* 做完自己的工作
* 回傳下一個工作
* work_t *join_work(work_t *work)
* 減少 work_t 的 join_count
* 如果 join_count == 0,表示可以作這個工作了,可以 return work;否則 return NULL
#### main function
* step1: 為 N 個 thread 準備 N 組資料結構
```cpp
pthread_t threads[N]
int tids[N] // 代表 thread 的 id
deque_t thread_queues[N]
```
* step2: 準備一個結束的 done_work
* code = done_work
* join_count = N x nprints
* args 沒有
* step3: 為每個 thread_queues 初始化
* push nprints 個 work_t
* code = print_task
* join_count = 0
* args[0] = 1000 x i + j
* args[1] = done_work
* step4: 啟動並 join N 個 threads
* step5: 印出結果
#### 運作原理
* 每個 thread 一開始都有 npirnts 個 print_task 的工作
* 每執行一次 print_task,都會去 join_work 那裡幫 done_work.join_count - 1
* 直到 done_work.join_count == 0 時,join_work 會把 done_work 排進 do_one_work 的下一個執行
#### 心得:
* paper 和程式的實作出入還蠻大的
* paper 是用 spawn 生 child-work,spwan_next 生 successor—work
* 程式目前不生 child-work,利用參數帶起 successor-work
* 程式並不會把 sub-work 放回 queue 中。當然,sub-work 只有 down_work() 這一個
* 這裡的 sub-work 包含 child-work 和 successor-work
---
### Q2: 指出 work-stealing 程式碼可改進之處並著手進行。
#### 1. 修改 take():
原程式問題:
* b = b - 1 太早做
* 如果 thread 自己的 deque 為空,直接去 take(),b - 1 會 overflow 變成 ffffffff (b 是無號數)。因此會進入迴圈 if(t<=b) 中。
* 使用 deque 時,要確保每個 thread 的 deque 都不是從空的開始,才能保持程式正確。
```cpp
/* 原本的 soruce code */
work_t *take(deque_t *q) {
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed) - 1;
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x;
if (t <= b) {
/* Non-empty queue */
x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
if (t == b) {
/* Single last element in queue */
if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
memory_order_seq_cst,
memory_order_relaxed)) {
/* Failed race */
x = EMPTY;
}
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
} else {
/* Empty queue */
x = EMPTY;
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
}
return x;
}
```
新程式修改:
* 改變 b = b - 1 的位置:確認了 t < b 再進行 b - 1,如此才不會一開始 deque 為空時造成 overflow。
* 使用 deque 時,可以在 thread 的 deque 為空時執行程式。
```cpp
/* 新的 take() 修改 */
work_t *take(deque_t *q)
{
size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
array_t *a = atomic_load_explicit(&q->array, memory_order_relaxed);
atomic_thread_fence(memory_order_seq_cst);
size_t t = atomic_load_explicit(&q->top, memory_order_relaxed);
work_t *x;
if (t < b) {
b = b - 1;
/* Non-empty queue */
x = atomic_load_explicit(&a->buffer[b % a->size], memory_order_relaxed);
if (t == b) {
/* Single last element in queue */
if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
memory_order_seq_cst,
memory_order_relaxed)) {
/* Failed race */
x = EMPTY;
}
atomic_store_explicit(&q->bottom, b + 1, memory_order_relaxed);
} else {
atomic_store_explicit(&q->bottom, b, memory_order_relaxed);
}
} else {
/* Empty queue */
x = EMPTY;
}
return x;
}
```
#### 2. 修改 thread() 裡面的 steal 方式
現在的 steal 是從 thread_0 到 thread_N 一個一個找。可以改成隨機。(未完成)
### Q3: 利用 work stealing 改寫第 1 次作業測驗 γ 的 Quick Sort
Source Code: [ws_qsort](https://github.com/shlin41/ws_qsort)
#### 想法:
* Quick Sort 基本作法:選 pivot 後,做完第一輪 qsort compare and swap 的工作,整個 array 會被切成 part_1, part_2, part_3。然後 qsort 遞迴地繼續處理 part_1, part_3
* part_1 elements < pivot
* part_2 elements == pivot
* part_3 elements > pivot
* 搭配 work stealing:把 part_1 和 part_3 丟回自己的 deque
#### 實做:
work_t 資料結構修改
* 增加一個 do_id 欄位:用來紀錄執行這個 work_t 的 thread 是誰,之後方便丟任務回對應的 deque
```cpp
typedef struct work_internal {
task_t code;
atomic_int join_count;
atomic_int do_id;
void *args[];
} work_t;
```
int qsort_algo(int do_id, void *a, size_t n) 修改
* 參數意義:
* int do_id: 要執行這個任務的 thread id
* void \*a: sorting array 的起始 addr
* size_t n: sorting array 的長度
* 回傳值:
* 回傳已經排好了多少個:其實就是有幾個相同的 pivot value 在 array 中
* 運作方式:
* qsort_algo() 主要會把 array 分成三部份
```cpp
/* 這邊是 pseudo code,不是 c language。看 source code 比較容易懂... */
array[n] == part1[nl] + part2[n-nl-nr] + part3[nr]
// part1: 比 pivot 小的部份
// part2: 等於 pivot 的部份 (已完成排序)
// part3: 比 pivot 大的部份
```
* 把 part1 和 part3 丟回自己的 deque
* 把 part2 的長度回傳:這部份等同排好了
程式結束條件:
* 學習原本的 work-steal.c:利用 join_work 去 countdown
* 原始 join_count = len(array)
* 每次都 countdown part2 的回傳值
做了兩個版本:
* v4: 使用原本的 take()
* 由於舊的 take() 有之前講的問題,每個 deque 都事先塞一個 dummy_work 進去
* v5:使用改過的 take()
* v5 不需要塞 dummy_work
偷工減料的部份:只做了數字部份的 qsort
* -s: 沒做
* -l: 沒做
* -f: 沒做。會破壞設計,return 值難算
* -h: 能進 source code 改 N_THREADS
### Q4: 研讀 Linux 核心的 CMWQ,討論其 work stealing 的實作手法
(未完成)
---
## 挑戰 2:
### Q1: 用 C11 Atomics 改寫 mcslock
提供的 source code 已完成該部份修改
### Q2: 核心原始程式碼 mcs_spinlock.h 的研讀
#### 定義 MCS spinlock
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
struct mcs_spinlock {
struct mcs_spinlock *next; // 類似 my_ticket,競爭鎖的人形成的佇列
int locked; /* 1 if lock acquired */ // 本地的自旋變數,不再是全域
int count; /* nesting count, see qspinlock.c */ // 不確定用處
};
```
#### MCS spin lock:上鎖
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
/* 自身的佇列節點 node 需要自行在他處初始化,它將被排到 lock 指示的等鎖佇列中 */
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {
struct mcs_spinlock *prev;
/* Init node */
node->locked = 0;
node->next = NULL;
// (atomic) 執行 *lock = node 並返回原来的 *lock
prev = xchg(lock, node);
if (likely(prev == NULL)) {
// 底下這個是超有趣的說明:其實在這邊設定 node->locked = 1 比較有統一性
/*
* Lock acquired, don't need to set node->locked to 1. Threads
* only spin on its own node->locked value for lock acquisition.
* However, since this thread can immediately acquire the lock
* and does not proceed to spin on its own node->locked, this
* value won't be used. If a debug mode is needed to
* audit lock status, then set node->locked value here.
*/
return;
}
// (atomic) 執行 prev->next = node;
// 這相當於一個排入佇列的操作。記為(*)
WRITE_ONCE(prev->next, node);
/* Wait until the lock holder passes the lock down. */
// 相當於 while (0 == node->locked)
// /* spin-wait */;
arch_mcs_spin_lock_contended(&node->locked);
}
```
- [ ] 其中的 `arch_mcs_spin_lock_contended(&node->locked)` 是 archecture dependent
- [ ] 走 arm 路線直接進 while-loop:
```cpp
/* linux-6.5.6/arch/arm/include/asm/mcs_spinlock.h */
/* MCS spin-locking. */
#define arch_mcs_spin_lock_contended(lock) \
do { \
/* Ensure prior stores are observed before we enter wfe. */ \
smp_mb(); \
while (!(smp_load_acquire(lock))) \
wfe(); \
} while (0)
```
- [ ] 走一般 general 路線,在 v4.17 之前
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
#define arch_mcs_spin_lock_contended(lock) \
do { \
while (!(smp_load_acquire(lock))) \
cpu_relax(); \
} while (0)
```
- [ ] 走一般 general 路線,在 v4.17 之後:看不太懂
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
/*
* Using smp_cond_load_acquire() provides the acquire semantics
* required so that subsequent operations happen after the
* lock is acquired. Additionally, some architectures such as
* ARM64 would like to do spin-waiting instead of purely
* spinning, and smp_cond_load_acquire() provides that behavior.
*/
#define arch_mcs_spin_lock_contended(lock)
do {
smp_cond_load_acquire(lock, VAL);
} while (0)
```
#### MCS spin unlock:解鎖 (不同版本上都差不多)
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
/* 自己要傳入一個排入到 lock 的佇列中自身的節點物件 node
* 解鎖操作即是 node 移出佇列並且主動將佇列中下一個物件的 locked 設定為 1
*/
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {
// (atomic) 持有 node 的下一個節點。
struct mcs_spinlock *next = READ_ONCE(node->next);
/* 如果你是最後一個 */
if (likely(!next)) {
// 再次確認:若 *lock == node,則 *lock = NULL 並返回。
// 說明自身是最後一個,無後續動作
if (likely(cmpxchg_release(lock, node, NULL) == node))
return;
// 否則說明在 if 判斷 next 為 NULL 和 cmpxchg 之間有 node 插入。
// 随即等待其 mcs_spin_lock 呼叫完成,即上面 mcs_spin_lock 中的 (*) 那句完成以後才跳出。
// 相當於等待那個新插入的 node 完成工作。也就是說,本身不再是最後一個
while (!(next = READ_ONCE(node->next)))
cpu_relax();
}
// (atomic) 執行 next->locked = 1;
arch_mcs_spin_unlock_contended(&next->locked);
}
```
- [ ] 其中的 `arch_mcs_spin_unlock_contended(&node->locked)` 是 archecture dependent
- [ ] 走 arm 路線直接進 while-loop:
```cpp
#define arch_mcs_spin_unlock_contended(lock) \
do { \
smp_store_release(lock, 1); \
dsb_sev(); \
} while (0)
```
- [ ] 走一般 general 路線
```cpp
/* linux-6.5.6/kernel/locking/mcs_spinlock.h */
#define arch_mcs_spin_unlock_contended(lock)
smp_store_release((lock), 1)
#endif
```
#### MCS spinlock 設計的成功之處
* mcs_spin_lock() 僅修改 prev_node->next 欄位
* mcs_spin_unlock() 僅操作 next_node->locked 欄位
* prev_node 和 next_node 都屬於在某個 CPU 上自旋等待的 mcs spinlock 物件,++不會觸發全域的 cache coherence 刷新++!
#### ==還看不懂的 source code 部份==
* `smp_cond_load_acquire(x, VAL);` 裡面的 `VAL` 到底在哪邊?找不到?
* 往下 trace 會有:READ_ONCE() and WRITE_ONCE()
* 執行 atomic 且 member barrier
* 但是,是哪種 barrier???
### Q3:設計實驗來說明 mcslock 效益
(未完成)
---
## 挑戰 3:
### Q1: 用 C11 Atomics 改寫 seqlock
Source Code: [seqlock_c11](https://github.com/shlin41/seqlock_shlin)
* 宣告要 atomic 的資料結構:
`typedef _Atomic(uint32_t) seqlock_t;`
* 把程式中 __atomic(...) 的部份都改成 atomic(...):幾乎有一對一對應的程式
* 把 \*a = 1 改成用 atomic_init(a, 1)
### Q2:核心原始程式碼 seqlock.h 的研讀
#### Trace v6.5.6 implement of seqlock_t
* 對照[挑戰3](https://hackmd.io/@sysprog/linux2023-summer-quiz3#%E6%8C%91%E6%88%B0-3)的解釋,原始碼的實作邏輯是一樣的。
* 多放了一些 barrier
* smp_rmb()
* READ_ONCE()
* smp_wmb()
* Read 端看不懂的 function
* kcsan_atomic_next():下一個指令變 atomic ?
* Write 端看不懂的 function (但成對出現)
* kcsan_nestable_atomic_begin(), kcsan_nestable_atomic_end():形成一個 atomic regin ?
* seqcount_acquire(), seqcount_release():形成 acquire-release 組合,為了什麼目的 ?
#### Type Definition
* 實作上直接內嵌 spinlock_t
* 利用這個 seqcount 的序列號 (sequence number) 來確保資料的一致性
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
typedef struct {
/*
* Make sure that readers don't starve writers on PREEMPT_RT: use
* seqcount_spinlock_t instead of seqcount_t. Check __SEQ_LOCK().
*/
seqcount_spinlock_t seqcount;
spinlock_t lock;
} seqlock_t;
```
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
#define SEQCOUNT_LOCKNAME(lockname, locktype, preemptible, lockmember, lockbase, lock_acquire) \
typedef struct seqcount_##lockname { \
seqcount_t seqcount; \
__SEQ_LOCK(locktype *lock); \
} seqcount_##lockname##_t;
SEQCOUNT_LOCKNAME(raw_spinlock, raw_spinlock_t, false, s->lock, raw_spin, raw_spin_lock(s->lock))
SEQCOUNT_LOCKNAME(spinlock, spinlock_t, __SEQ_RT, s->lock, spin, spin_lock(s->lock))
SEQCOUNT_LOCKNAME(rwlock, rwlock_t, __SEQ_RT, s->lock, read, read_lock(s->lock))
SEQCOUNT_LOCKNAME(mutex, struct mutex, true, s->lock, mutex, mutex_lock(s->lock))
```
#### Read_seqcount_begin
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
#define read_seqcount_begin(s) \
({ \
seqcount_lockdep_reader_access(seqprop_ptr(s)); \
raw_read_seqcount_begin(s); \
})
#define raw_read_seqcount_begin(s) \
({ \
unsigned _seq = __read_seqcount_begin(s); \
\
smp_rmb(); \
_seq; \
})
#define __read_seqcount_begin(s) \
({ \
unsigned __seq; \
\
// 等於 while((__seq = s->sequence) & 1) {...}
while ((__seq = seqprop_sequence(s)) & 1) \
cpu_relax(); \
\
kcsan_atomic_next(KCSAN_SEQLOCK_REGION_MAX); \
__seq; \
})
/* 這一大段都是為了翻譯出 s->sequence
* 由前面的定義看出:seqcount_raw_spinlock_t, seqcount_spinlock_t, seqcount_rwlock_t, seqcount_mutex_t 共用 read_seqcount_begin() 這個 MACRO
*/
#define __seqprop_case(s, lockname, prop) \
seqcount_##lockname##_t: __seqprop_##lockname##_##prop((void *)(s))
#define __seqprop(s, prop) _Generic(*(s), \
seqcount_t: __seqprop_##prop((void *)(s)), \
__seqprop_case((s), raw_spinlock, prop), \
__seqprop_case((s), spinlock, prop), \
__seqprop_case((s), rwlock, prop), \
__seqprop_case((s), mutex, prop))
#define seqprop_sequence(s) __seqprop(s, sequence)
```
#### Read_seqcount_retry
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
#define read_seqcount_retry(s, start) \
do_read_seqcount_retry(seqprop_ptr(s), start)
static inline int do_read_seqcount_retry(const seqcount_t *s, unsigned start)
{
smp_rmb();
return do___read_seqcount_retry(s, start);
}
static inline int do___read_seqcount_retry(const seqcount_t *s, unsigned start)
{
kcsan_atomic_next(0);
return unlikely(READ_ONCE(s->sequence) != start);
}
```
#### Read 的用法:
```cpp
seqcount_t seq;
bool X = true, Y = false;
void read(void)
{
bool x, y;
do {
int s = read_seqcount_begin(&seq);
x = X; y = Y;
} while (read_seqcount_retry(&seq, s));
BUG_ON(!x && !y);
}
```
#### Write_seqlock
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
static inline void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock);
do_write_seqcount_begin(&sl->seqcount.seqcount);
}
static inline void do_write_seqcount_begin(seqcount_t *s)
{
do_write_seqcount_begin_nested(s, 0);
}
static inline void do_write_seqcount_begin_nested(seqcount_t *s, int subclass)
{
seqcount_acquire(&s->dep_map, subclass, 0, _RET_IP_);
do_raw_write_seqcount_begin(s);
}
static inline void do_raw_write_seqcount_begin(seqcount_t *s)
{
kcsan_nestable_atomic_begin();
s->sequence++; // 終於出現 sequence++
smp_wmb();
}
```
#### Write_sequnlock
```cpp
/* linux-6.5.6/include/linux/seqlock.h */
static inline void write_sequnlock(seqlock_t *sl)
{
do_write_seqcount_end(&sl->seqcount.seqcount);
spin_unlock(&sl->lock);
}
static inline void do_write_seqcount_end(seqcount_t *s)
{
seqcount_release(&s->dep_map, _RET_IP_);
do_raw_write_seqcount_end(s);
}
static inline void do_raw_write_seqcount_end(seqcount_t *s)
{
smp_wmb();
s->sequence++; // 終於出現 sequence++
kcsan_nestable_atomic_end();
}
```
### Q3:mpmc 及 spmc 運用 seqlock 改寫
#### seqlock 運用在 spmc 上有問題。目前沒有解法
原始想法:
* 1 writer = 1 producer
* N reader = N consumers
* 修改 spmc_enqueue() 和 spmc_dequeue()
* 消除程式中其他 atomic 或 memory_barrier 相關的呼叫
* 單純使用 seqlock.h 的呼叫達到所需的同步效果
問題:
* spmc_enqueue() 沒問題
* ==spmc_dequeue() 有問題==:
* spmc_dequeue() 做為 reader 的程式,不能有修改共享資源(spmc)的行為
* 但是實際上該程式會改動 spmc 的 member: node->front 和 spmc->curr_dequeue
* consumer 的行為不是單純的讀,而是把貨物==拿走==
#### spmc 本身的 bug:
```cpp
static size_t observed_count[N_MC_ITEMS + 1];
```
* 在 mc_thread() 中,讀取 observed_count[element]
* 其中 element 會超過 N_MC_ITEMS
* 因為 termination 的技巧,會 enqueue 數值過大的 element
* 修正:
* 第 9 行:確認 element < N_MC_ITEMS 後,在去 observed_count[element]++
* 第 18 行:中止 element 一律塞 N_MC_ITEMS
```cpp=
static void *mc_thread(void *arg) {
spmc_ref_t spmc = arg;
uintptr_t element = 0, greatest = 0;
for (;;) {
greatest = (greatest > element) ? greatest : element;
if (!spmc_dequeue(spmc, &element))
fprintf(stderr, "Failed to dequeue in mc_thread.\n");
else if ((element < N_MC_ITEMS) && (observed_count[element]++))
fprintf(stderr, "Consumed twice: %zu!\n", (size_t)element);
else if (element < greatest)
fprintf(stderr, "%zu after %zu; bad order!\n", (size_t)element, (size_t)greatest);
printf("Observed %zu.\n", (size_t)element);
/* Test for sentinel signalling termination */
if (element >= (N_MC_ITEMS - 1)) {
spmc_enqueue(spmc, N_MC_ITEMS); /* notify other threads */
break;
}
}
return NULL;
}
```
#### mpmc 單純心得筆記:
1. pthread_t 可以重複利用
```cpp
pthread_t pids[N_THREADS];
for (int i = 0; i < N_THREADS; ++i) {
if (-1 == pthread_create(&pids[i], NULL, producer,
(void *) (intptr_t) i) ||
-1 == pthread_create(&pids[i], NULL, consumer,
(void *) (intptr_t) i)) {
printf("error create thread\n");
exit(1);
}
}
```
2. cells[i] 的值設計
* 0: 表示 NULL
* 1 ~ 4 x 2500000:表示有貨物
* 如果是放 futex_addr 的位址: 表示消費者先到,生產者要喚醒對
---
## 挑戰 4:
### Q1:說明 cmap 運作原理。指出無法通過 ThreadSanitizer 的原因並修正。
#### 修正版:[cmap_quiz1](https://github.com/shlin41/cmap_quiz1.git)
#### 看了發生 race condition 的原因:
* 主要發生在 impl->count 這個變數身上
* Log sample:
```cpp
==================
WARNING: ThreadSanitizer: data race (pid=6711)
Read of size 8 at 0x7bb400000008 by main thread:
#0 cmap_count__ /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:136 (test-cmap+0x342e)
#1 cmap_size /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:152 (test-cmap+0x342e)
#2 main /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:211 (test-cmap+0x29d1)
Previous write of size 8 at 0x7bb400000008 by thread T4:
#0 cmap_insert /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:163 (test-cmap+0x3537)
#1 insert_value /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:41 (test-cmap+0x4681)
#2 update_cmap /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:122 (test-cmap+0x4713)
Location is heap block of size 32904 at 0x7bb400000000 allocated by thread T4:
#0 malloc ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:655 (libtsan.so.0+0x31c57)
#1 xmalloc /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/util.h:145 (test-cmap+0x2eff)
#2 cmap_impl_init /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:53 (test-cmap+0x2eff)
#3 cmap_expand /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:101 (test-cmap+0x369e)
#4 cmap_insert /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:169 (test-cmap+0x369e)
#5 insert_value /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:41 (test-cmap+0x4681)
#6 update_cmap /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:122 (test-cmap+0x4713)
Thread T4 (tid=6716, running) created by main thread at:
#0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:969 (libtsan.so.0+0x605b8)
#1 main /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/test-cmap.c:199 (test-cmap+0x28eb)
SUMMARY: ThreadSanitizer: data race /home/asrd/.temp/SummerVacation/Homework_3/CMap/cmap_org/cmap.c:136 in cmap_count__
```
#### 以最少修改通共 ThreadSanitizer:
* 讓有影響的 impl->count 讀寫變成 atomic
* main() 這個主 thread 會讀 impl->count
* write_cmap() 這個 writer thread 會讀寫 impl->count
* read_cmap() 這個 reader thread 並不會用到 impl->count
* 並不需要所有 impl->count 都 atomic 處理
* main() 讀的部份 atomic 起來
* write() 寫的部份 atomic 起來
* cmap.c diff:
```cpp
diff -r cmap_org/cmap.c cmap_quiz1/cmap.c
136c136
< size_t count = impl->count;
---
> size_t count = (size_t) atomic_load(&impl->count); // (modify)
163c163
< impl->count++;
---
> atomic_fetch_add(&impl->count, 1); // (modify)
191c191
< impl->count = count;
---
> atomic_store(&impl->count, count); // (modify)
```
#### 心得:
* 修改完的程式基本不再出現 data race warning。
* 在這個範例,應該能排除 data race
* 僅以 trace code 的方式,reader 的部份都有保護到。
* 在最保險的情況下,所有 share data 都進行 atomic 處理會更好。
* ==會有效能上的損失==
* 如果用 memory_order_relaxed 也許會好一些
* 因為是 RCU (one writer + N readers),writer 的 function 不需要特別去處理 atomic。畢竟 writer 只有一個。
* reader 的部份,在呼叫 cmap_find__()時,有一個 cond_is_locked() 的檢查
* 表示可以去讀的時候,物件已經準備好了。過程中 writer 也不會去改已經存在的物件
* 不需要特別處理 atomic 也沒關係
### Q2:解釋 utilization 計算基準,並探討如何進一步提升 throughput/scalability
#### Ans:
* impl->utilization 用來紀錄有被使用到的 impl->arr[i] 數量 (即 impl->arr[i] != NULL 的數量)
* cmap_utilization() 回傳使用到的 impl->arr[i] 比例
* 這個值高,表示 hash 得很好?
* impl->utilization / impl->count 可以更直覺表示 hash 得好
* 看了 [RinHizakura](https://hackmd.io/@RinHizakura/linux2023-summer-hw3#%E6%8C%91%E6%88%B0-4) 的開發紀錄和 comments:
* camp_remove() 時,應該做對應的處理
* concurrent access 如何量化?
### Q3:嘗試執行 [kernel-module-rcu-test](https://github.com/sbcd90/kernel-module-rcu-test) 並將上述程式碼移植到 Linux 核心
#### Insmod Problem:
* insmod 時出現 Key was rejected by service
* 網路上說是 Secure Boot 的問題
* [解決方案](https://blog.csdn.net/hydront/article/details/130280075)
* [改回來的方法](https://askubuntu.com/questions/726052/ubuntu-booting-in-insecure-mode-with-secureboot-enabled)
#### [Kernel Driver Practice](https://github.com/Embetronicx/Tutorials/tree/master/Linux/Device_Driver)
Donelist
1. mutex
2. spinlock -- doing