# 2024q1 Homework6 (integration)
contributed by < [jeremy90307](https://github.com/jeremy90307) >
## 閱讀 [The Linux Kernel Module Programming Guide](https://sysprog21.github.io/lkmpg/#headers)
花了些許時間閱讀,把相關問題及重點紀錄在這 ----> [The Linux Kernel Module Programming Guide 筆記](https://hackmd.io/@jeremytsai/HW6_note)
### 解釋 [simrupt](https://github.com/sysprog21/simrupt) 程式碼裡頭的 mutex lock 的使用方式,並探討能否改寫為 [lock-free](https://hackmd.io/@sysprog/concurrency-lockfree)
> 參考 : [2021 年的筆記](https://hackmd.io/@linD026/simrupt-vwifi)
### kfifo
**解釋**
> 參考 [kfifo(linux kernel 无锁队列)](https://zhuanlan.zhihu.com/p/500070147)
>
kfifo 使用 in 和 out 兩個變量作為進入和離開的索引,如圖
![image](https://hackmd.io/_uploads/HyvH6mT-0.png)
- 進入 n 筆資料時, in 變量就 `+ n`
- 離開 n 筆資料時, out 變量就 `- n`
- out 不允許大於 in (out = in 表示 fifo 為空)
- in 不允許比 out 大(即超出 fifo 的總大小,會有液出)
在 simrupt.c 的註釋中解釋為何無須再使用額外的鎖
```
/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
* only one concurrent reader and one concurrent writer. Writes are serialized
* from the interrupt context, readers are serialized using this mutex.
*/
```
使用到的 kfifo API
`DECLARE_KFIFO_PTR` — macro to declare a fifo pointer object
`kfifo_in` - put data into the fifo
`kfifo_to_user` — copies data from the fifo into user space
`kfifo_len` — returns the number of used elements in the fifo
`kfifo_alloc` — dynamically allocates a new fifo buffer
`kfifo_free` — frees the fifo
> 更多 FIFO 的 API 參考 -> [kfifo](https://archive.kernel.org/oldlinux/htmldocs/kernel-api/kfifo.html)
在 [/include/linux/kfifo.h](https://elixir.bootlin.com/linux/latest/source/include/linux/kfifo.h) 可以看到 **kfifo 的結構體**
```c
struct __kfifo {
unsigned int in; // 進入索引
unsigned int out; // 離開索引
unsigned int mask; // 為佇列總大小 - 1
unsigned int esize; // 單個元素大小
void *data; // 指向佇列記憶體位置
};
#define __STRUCT_KFIFO_COMMON(datatype, recsize, ptrtype) \
union { \
struct __kfifo kfifo; \
datatype *type; \
const datatype *const_type; \
char (*rectype)[recsize]; \
ptrtype *ptr; \
ptrtype const *ptr_const; \
}
```
- `kfifo_in`
```c
unsigned int __kfifo_in(struct __kfifo *fifo,
const void *buf, unsigned int len)
{
unsigned int l;
l = kfifo_unused(fifo);
if (len > l)
len = l;
kfifo_copy_in(fifo, buf, len, fifo->in);
fifo->in += len;
return len;
}
```
`kfifo_unused(fifo)` 用作查詢剩餘大小,若進入的大小大於剩餘大小,則以剩餘的大小為主 `len = l` ,最後 `kfifo_copy_in` 複製資料至記憶體。
```c
/*
* internal helper to calculate the unused elements in a fifo
*/
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
return (fifo->mask + 1) - (fifo->in - fifo->out);
}
```
`fifo->mask + 1` 為整個佇列的大小減去已用大小 `fifo->in - fifo->out`
```c
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 1) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
* make sure that the data in the fifo is up to date before
* incrementing the fifo->in index counter
*/
smp_wmb();
}
```
:::info
為何使用兩次 `memcpy` ?
:::
- `kfifo_alloc`
申請記憶體空間並初始化 kfifo
```c
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
size_t esize, gfp_t gfp_mask)
{
/*
* round up to the next power of 2, since our 'let the indices
* wrap' technique works only in this case.
*/
size = roundup_pow_of_two(size);
fifo->in = 0;
fifo->out = 0;
fifo->esize = esize;
if (size < 2) {
fifo->data = NULL;
fifo->mask = 0;
return -EINVAL;
}
fifo->data = kmalloc_array(esize, size, gfp_mask);
if (!fifo->data) {
fifo->mask = 0;
return -ENOMEM;
}
fifo->mask = size - 1;
return 0;
}
```
當中 `roundup_pow_of_two` 可以求出 size 向上最接近的 2 的冪次
程式碼取自 [/include/linux/log2.h](https://elixir.bootlin.com/linux/latest/source/include/linux/log2.h#L174)
```c
/**
* roundup_pow_of_two - round the given value up to nearest power of two
* @n: parameter
*
* round the given value up to the nearest power of two
* - the result is undefined when n == 0
* - this can be used to initialise global variables from constant data
*/
#define roundup_pow_of_two(n) \
( \
__builtin_constant_p(n) ? ( \
((n) == 1) ? 1 : \
(1UL << (ilog2((n) - 1) + 1)) \
) : \
__roundup_pow_of_two(n) \
)
```
當中 `__builtin_constant_p(n)` 函式中的 n 若為常數回傳 1 ,反之為 0 。
> 參考 [Other Built-in Functions Provided by GCC](https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html)
### circular buffer
> 參考
> -> [並行程式設計: Ring buffer](https://hackmd.io/@sysprog/concurrency/%2F%40sysprog%2Fconcurrency-ringbuffer#Ring-Buffer)
> -> [circular buffer](https://zh.wikipedia.org/zh-tw/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80)
先看註釋描述,看似要做出一個更快速的 circular buffer ,並結合 kfifo 。
:::info
更快速?
:::
```
/* We use an additional "faster" circular buffer to quickly store data from
* interrupt context, before adding them to the kfifo.
*/
```
查找 [/include/linux/circ_buf.h](https://elixir.bootlin.com/linux/latest/source/include/linux/circ_buf.h) 來了解 simrupt 中 circular buffer 使用到的 API
**結構體**
- `buf` : real pointer that contains the array
- `head`,`tail` : head and tail indices
```c
struct circ_buf {
char *buf;
int head;
int tail;
};
```
`CIRC_CNT` 回傳 buffer 被佔用的大小
> 緩衝區中總是有一個儲存單元保持未使用狀態。因此緩衝區最多存入 `size - 1` 。
```c
/* Return count in buffer. */
#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))
```
`CIRC_SPACE` 回傳緩衝區剩餘大小
```c
/* Return space available, 0..size-1. We always leave one free char
as a completely full buffer has head == tail, which is the same as
empty. */
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
```
- `fast_buf_clear` head = tail = 0 表示均指向同個位置,因此 buffer 為空。
```c
/* Clear all data from the circular buffer fast_buf */
static void fast_buf_clear(void)
{
fast_buf.head = fast_buf.tail = 0;
}
```
- `fast_buf_get`
- `fast_buf_put`
### Bottom half
> 中斷概念參考 [Linux 核心設計: 中斷處理和現代架構考量](https://hackmd.io/@sysprog/linux-interrupt#Linux-%E6%A0%B8%E5%BF%83%E8%A8%AD%E8%A8%88-%E4%B8%AD%E6%96%B7%E8%99%95%E7%90%86%E5%92%8C%E7%8F%BE%E4%BB%A3%E6%9E%B6%E6%A7%8B%E8%80%83%E9%87%8F)
為防止 interrupt 產生打架的情況,分成三種類型:
- Critical : Top-half (不允許其他中斷,立即處理)
ex : acknowledge interrupt
- Non-critical : Top-half (允許其他中斷)
ex : read key scan code, add to buffer
- Non-cirtical deferrable : Bottom half, do it **later** (允許其他中斷)
可以被延遲中斷,意味著可以重新被排程
ex : copy keyboard buffer to terminal handler process
在 Linux 作業系統中,使用了三種機制去實現 Bottom Half
- softirq
- tasklet
- workqueue
softirqs 和 tasklet 執行於 interrupt context , workqueue 執行於 process context 。
> 相關問題:
> [Difference between interrupt context and process context?](https://stackoverflow.com/questions/57987140/difference-between-interrupt-context-and-process-context)
![image](https://hackmd.io/_uploads/rJWUMvXMA.png)
#### softirq
- 是 reentrant function,同一種 IRQ 可以在多個 CPU 上並行執行。
- 使用靜態建立
- kernel thread
#### tasklet
- 建構在 softirq 之上,tasklet 相較 softirq 允許動態建立。
- 同一時間只執行在一個 CPU 上,且不為 reentrant function 。
- kernel thread
在 simrupt 中
定義 tasklet
```c
#define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L)
```
加入 tasklet
```c
tasklet_schedule(&simrupt_tasklet);
```
刪除 tasklet
```
tasklet_kill(&simrupt_tasklet);
```
#### Workqueue
- 概念為對中斷的處理建立每個 CPU 的 kernel thread
在 simrupt 中建立 workqueue
```c
static struct workqueue_struct *simrupt_workqueue;
```
當中 `WQ_UNBOUND` 表示不綁定任何 CPU
> 更多 Workqueue flags 參考 [/include/linux/workqueue.h](https://elixir.bootlin.com/linux/latest/source/include/linux/workqueue.h#L351)
```c
/* Create the workqueue */
simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
if (!simrupt_workqueue) {
vfree(fast_buf.buf);
device_destroy(simrupt_class, dev_id);
class_destroy(simrupt_class);
ret = -ENOMEM;
goto error_cdev;
}
```
執行指定的 workqueue 任務,當中 `&work` 指向具體任務的指針。
```c
queue_work(simrupt_workqueue, &work);
```
釋放 workqueue
```c
destroy_workqueue(simrupt_workqueue);
```
當 producer 將數據儲存至 kfifo buffer 及 consumer 將數據從 circular buffer 取出時使用互斥鎖。
### Timer
使用 `timer_handler` 函式來模擬中斷的發生,在函式中 `local_irq_disable` 用於停用中斷,來避免中斷時又遇到中斷的發生,接著將延遲中斷的部份使用 `process_data` 將字元放入 circular buffer ,並啟用 tasklet 來處理中斷,最後使用 `local_irq_enable` 函式再次啟用中斷。
### mutex lock
在 simrupt 中定義的鎖
```c
/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
* only one concurrent reader and one concurrent writer. Writes are serialized
* from the interrupt context, readers are serialized using this mutex.
*/
static DEFINE_MUTEX(read_lock);
```
註釋指出,在特定條件下,即只有一個並行 reader 和一個並行 writer 時,對 kfifo 的使用是安全的,無需額外的鎖定機制。
考慮到並行,因寫入操作是由 interrupt context 進行(不可被搶佔、sleep),因此只有一個 writer 可以存取 kfifo,但讀取操作就必須使用 `DEFINE_MUTEX(read_lock)` 來限制只能有一位讀者存取 kfifo ,總結最後確保 kfifo 的並行是安全的,在同一時間只有一個 writer 及 reader 可以存取 kfifo 。
```c
/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);
/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
* run in workqueue handler (kernel thread context).
*/
static DEFINE_MUTEX(consumer_lock);
```
藉由 kernel thread 取得 circular buffer 的資料,使用 mutex 來避免執行緒之間在取得資料時發生問題
```c
/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
```
:::info
**疑問**
前面的註釋提到寫入 kfifo 的操作在 interrupt context 已確保同時只能有一個寫著存取,為何這裡還要在上鎖?
:::
```c
/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);
```
當 circular buffer 為空時 `if (val < 0)` 喚醒等待中的佇列
```c
wake_up_interruptible(&rx_wait);
```
```c
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);
```
### 流程圖
> 參考[江冠霆同學](https://hackmd.io/@Henry0922/linux2024-homework6#simrupt)的作業時發現他整理了一個流程圖,可以加速認識 simrupt 的運作原理,後續回頭複習時能更快了解。
### lock-free
[Linux 核心設計:淺談同步機制](https://www.youtube.com/watch?v=lNt-ewfygy8)
- coarse-grained vs fine-grained
用粗粒(coarse-grained)細粒(fine-grained)來舉 lock 的優劣
- Atomic instruction
`x++`,`x--` -> 確保不會被 CPU 交錯執行
- Compare and swap (CAS)
* 語意: `if (x == y) x = z;`
```C
int compareAndSwap(int *loc, int expectedValue, int valueToStore) {
ATOMIC {
int val = *loc;
if (val == expected) {
*loc = valueToStore;
}
return val;
}
}
```
若傳入為符合預期的值進行 swap ,簡單的程式碼,重點在於 `ATOMIC`
**並非無鎖的操作就是 `lock-free`**
- **一個 lock-free 的程式需要確保符合這個特徵:執行該程式的所有執行緒至少有一個能夠不被阻擋 (blocked) 而繼續執行**
* lock-Free 中的 "Lock" 沒有直接涉及 mutex 和 semaphore 一類 mutual exclusion,而是描述了應用程式受限於特定原因被鎖定的可能,例如可能因為 dead lock, live lock,或者 thread scheduling 致使的 priority inversion 等等
* 下方的程式碼沒有任何 mutex,但卻不符合 lock-free 的屬性:若有兩個執行緒同時執行程式碼,考慮到特定的排程方式,非常可能兩個執行緒同時陷入無窮迴圈,也就是阻擋彼此
* Lock-Free 程式設計的挑戰不僅來自於其任務本身的複雜性,還要始終著眼於對事物本質的洞察
```c
while (X == 0 )
{
X = 1 - X;
}
```
**討論能否改成 lock-free**
下方程式碼使用 mutex 確保從 circular buffer 取得的值是一致的
```c
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);
```
在 `fast_buf_get` 中或許可以使用 atomic 的 CAS 操作來改寫,但具體怎麼改寫還在思考。
## 閱讀 [Linux 核心模組運作原理](https://hackmd.io/@sysprog/linux-kernel-module)
- [ ] 閱讀〈Linux 核心模組運作原理〉並對照 Linux 核心原始程式碼 (v6.1+),解釋 insmod 後,Linux 核心模組的符號 (symbol) 如何被 Linux 核心找到 (使用 List API)、MODULE_LICENSE 巨集指定的授權條款又對核心有什麼影響 (GPL 與否對於可用的符號列表有關),以及藉由 strace 追蹤 Linux 核心的掛載,涉及哪些系統呼叫和子系統?
因目前設備使用的 kernel 核心版本為 v6.5.0,為了後續實作查看 Linux kernel v6.5.0 的源程式碼
使用 strace 追蹤執行 insmod hello.ko 的過程有哪些系統呼叫被執行:
```
$sudo strace insmod hello.ko
```
當中的 `finit_module` 在 [kernel/module/main.c](https://elixir.bootlin.com/linux/v6.5/source/kernel/module/main.c) 中可以找到,接著呼叫 `idempotent_init_module` -> `init_module_from_file` -> `load_module`,最後呼叫的 `load_module` 為 Linux 核心配置記憶體及載入模組。
接著查看到 `load_module` 的程式碼,-> `simplify_symbols` -> `resolve_symbol_wait ` -> `resolve_symbol` -> `find_symbol` -> `list_for_each_entry_rcu`
`list_for_each_entry_rcu` 是一個 RCU 的 List API
```c
/**
* list_for_each_entry_srcu - iterate over rcu list of given type
* @pos: the type * to use as a loop cursor.
* @head: the head for your list.
* @member: the name of the list_head within the struct.
* @cond: lockdep expression for the lock required to traverse the list.
*
* This list-traversal primitive may safely run concurrently with
* the _rcu list-mutation primitives such as list_add_rcu()
* as long as the traversal is guarded by srcu_read_lock().
* The lockdep expression srcu_read_lock_held() can be passed as the
* cond argument from read side.
*/
#define list_for_each_entry_srcu(pos, head, member, cond) \
for (__list_check_srcu(cond), \
pos = list_entry_rcu((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = list_entry_rcu(pos->member.next, typeof(*pos), member))
```