Try   HackMD

2024q1 Homework6 (integration)

contributed by < jeremy90307 >

閱讀 The Linux Kernel Module Programming Guide

花了些許時間閱讀,把相關問題及重點紀錄在這 > The Linux Kernel Module Programming Guide 筆記

解釋 simrupt 程式碼裡頭的 mutex lock 的使用方式,並探討能否改寫為 lock-free

參考 : 2021 年的筆記

kfifo

解釋

參考 kfifo(linux kernel 无锁队列)

kfifo 使用 in 和 out 兩個變量作為進入和離開的索引,如圖

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

  • 進入 n 筆資料時, in 變量就 + n
  • 離開 n 筆資料時, out 變量就 - n
  • out 不允許大於 in (out = in 表示 fifo 為空)
  • in 不允許比 out 大(即超出 fifo 的總大小,會有液出)

在 simrupt.c 的註釋中解釋為何無須再使用額外的鎖

/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
 * only one concurrent reader and one concurrent writer. Writes are serialized
 * from the interrupt context, readers are serialized using this mutex.
 */

使用到的 kfifo API

DECLARE_KFIFO_PTR — macro to declare a fifo pointer object
kfifo_in - put data into the fifo
kfifo_to_user — copies data from the fifo into user space
kfifo_len — returns the number of used elements in the fifo
kfifo_alloc — dynamically allocates a new fifo buffer
kfifo_free — frees the fifo

更多 FIFO 的 API 參考 -> kfifo

/include/linux/kfifo.h 可以看到 kfifo 的結構體

struct __kfifo {
	unsigned int	in;        // 進入索引
	unsigned int	out;       // 離開索引
	unsigned int	mask;      // 為佇列總大小 - 1
	unsigned int	esize;     // 單個元素大小
	void		*data;     // 指向佇列記憶體位置
};

#define __STRUCT_KFIFO_COMMON(datatype, recsize, ptrtype) \
	union { \
		struct __kfifo	kfifo; \
		datatype	*type; \
		const datatype	*const_type; \
		char		(*rectype)[recsize]; \
		ptrtype		*ptr; \
		ptrtype const	*ptr_const; \
	}
  • kfifo_in
unsigned int __kfifo_in(struct __kfifo *fifo,
		const void *buf, unsigned int len)
{
	unsigned int l;

	l = kfifo_unused(fifo);
	if (len > l)
		len = l;

	kfifo_copy_in(fifo, buf, len, fifo->in);
	fifo->in += len;
	return len;
} 

kfifo_unused(fifo) 用作查詢剩餘大小,若進入的大小大於剩餘大小,則以剩餘的大小為主 len = l ,最後 kfifo_copy_in 複製資料至記憶體。

/*
 * internal helper to calculate the unused elements in a fifo
 */
static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
	return (fifo->mask + 1) - (fifo->in - fifo->out);
}

fifo->mask + 1 為整個佇列的大小減去已用大小 fifo->in - fifo->out

static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
                unsigned int len, unsigned int off)
{
        unsigned int size = fifo->mask + 1;
        unsigned int esize = fifo->esize;
        unsigned int l;

        off &= fifo->mask;
        if (esize != 1) {
                off *= esize;
                size *= esize;
                len *= esize;
        }
        l = min(len, size - off);

        memcpy(fifo->data + off, src, l);
        memcpy(fifo->data, src + l, len - l);
        /*
         * make sure that the data in the fifo is up to date before
         * incrementing the fifo->in index counter
         */
        smp_wmb();
}

為何使用兩次 memcpy ?

  • kfifo_alloc
    申請記憶體空間並初始化 kfifo
int __kfifo_alloc(struct __kfifo *fifo, unsigned int size,
		size_t esize, gfp_t gfp_mask)
{
	/*
	 * round up to the next power of 2, since our 'let the indices
	 * wrap' technique works only in this case.
	 */
	size = roundup_pow_of_two(size);

	fifo->in = 0;
	fifo->out = 0;
	fifo->esize = esize;

	if (size < 2) {
		fifo->data = NULL;
		fifo->mask = 0;
		return -EINVAL;
	}

	fifo->data = kmalloc_array(esize, size, gfp_mask);

	if (!fifo->data) {
		fifo->mask = 0;
		return -ENOMEM;
	}
	fifo->mask = size - 1;

	return 0;
}

當中 roundup_pow_of_two 可以求出 size 向上最接近的 2 的冪次
程式碼取自 /include/linux/log2.h

/**
 * roundup_pow_of_two - round the given value up to nearest power of two
 * @n: parameter
 *
 * round the given value up to the nearest power of two
 * - the result is undefined when n == 0
 * - this can be used to initialise global variables from constant data
 */
#define roundup_pow_of_two(n)			\
(						\
	__builtin_constant_p(n) ? (		\
		((n) == 1) ? 1 :		\
		(1UL << (ilog2((n) - 1) + 1))	\
				   ) :		\
	__roundup_pow_of_two(n)			\
 )

當中 __builtin_constant_p(n) 函式中的 n 若為常數回傳 1 ,反之為 0 。

參考 Other Built-in Functions Provided by GCC

circular buffer

參考
-> 並行程式設計: Ring buffer
-> circular buffer

先看註釋描述,看似要做出一個更快速的 circular buffer ,並結合 kfifo 。

更快速?

/* We use an additional "faster" circular buffer to quickly store data from
 * interrupt context, before adding them to the kfifo.
 */

查找 /include/linux/circ_buf.h 來了解 simrupt 中 circular buffer 使用到的 API

結構體

  • buf : real pointer that contains the array
  • head,tail : head and tail indices
struct circ_buf {
	char *buf;
	int head;
	int tail;
};

CIRC_CNT 回傳 buffer 被佔用的大小

緩衝區中總是有一個儲存單元保持未使用狀態。因此緩衝區最多存入 size - 1

/* Return count in buffer.  */
#define CIRC_CNT(head,tail,size) (((head) - (tail)) & ((size)-1))

CIRC_SPACE 回傳緩衝區剩餘大小

/* Return space available, 0..size-1.  We always leave one free char
   as a completely full buffer has head == tail, which is the same as
   empty.  */
#define CIRC_SPACE(head,tail,size) CIRC_CNT((tail),((head)+1),(size))
  • fast_buf_clear head = tail = 0 表示均指向同個位置,因此 buffer 為空。
/* Clear all data from the circular buffer fast_buf */
static void fast_buf_clear(void)
{
    fast_buf.head = fast_buf.tail = 0;
}
  • fast_buf_get
  • fast_buf_put

Bottom half

中斷概念參考 Linux 核心設計: 中斷處理和現代架構考量

為防止 interrupt 產生打架的情況,分成三種類型:

  • Critical : Top-half (不允許其他中斷,立即處理)
    ex : acknowledge interrupt
  • Non-critical : Top-half (允許其他中斷)
    ex : read key scan code, add to buffer
  • Non-cirtical deferrable : Bottom half, do it later (允許其他中斷)
    可以被延遲中斷,意味著可以重新被排程
    ex : copy keyboard buffer to terminal handler process

在 Linux 作業系統中,使用了三種機制去實現 Bottom Half

  • softirq
  • tasklet
  • workqueue

softirqs 和 tasklet 執行於 interrupt context , workqueue 執行於 process context 。

相關問題:
Difference between interrupt context and process context?

image

softirq

  • 是 reentrant function,同一種 IRQ 可以在多個 CPU 上並行執行。
  • 使用靜態建立
  • kernel thread

tasklet

  • 建構在 softirq 之上,tasklet 相較 softirq 允許動態建立。
  • 同一時間只執行在一個 CPU 上,且不為 reentrant function 。
  • kernel thread

在 simrupt 中

定義 tasklet

#define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L)

加入 tasklet

tasklet_schedule(&simrupt_tasklet);

刪除 tasklet

tasklet_kill(&simrupt_tasklet);

Workqueue

  • 概念為對中斷的處理建立每個 CPU 的 kernel thread

在 simrupt 中建立 workqueue

static struct workqueue_struct *simrupt_workqueue;

當中 WQ_UNBOUND 表示不綁定任何 CPU

更多 Workqueue flags 參考 /include/linux/workqueue.h

/* Create the workqueue */
    simrupt_workqueue = alloc_workqueue("simruptd", WQ_UNBOUND, WQ_MAX_ACTIVE);
    if (!simrupt_workqueue) {
        vfree(fast_buf.buf);
        device_destroy(simrupt_class, dev_id);
        class_destroy(simrupt_class);
        ret = -ENOMEM;
        goto error_cdev;
    }

執行指定的 workqueue 任務,當中 &work 指向具體任務的指針。

queue_work(simrupt_workqueue, &work);

釋放 workqueue

destroy_workqueue(simrupt_workqueue);

當 producer 將數據儲存至 kfifo buffer 及 consumer 將數據從 circular buffer 取出時使用互斥鎖。

Timer

使用 timer_handler 函式來模擬中斷的發生,在函式中 local_irq_disable 用於停用中斷,來避免中斷時又遇到中斷的發生,接著將延遲中斷的部份使用 process_data 將字元放入 circular buffer ,並啟用 tasklet 來處理中斷,最後使用 local_irq_enable 函式再次啟用中斷。

mutex lock

在 simrupt 中定義的鎖

/* NOTE: the usage of kfifo is safe (no need for extra locking), until there is
 * only one concurrent reader and one concurrent writer. Writes are serialized
 * from the interrupt context, readers are serialized using this mutex.
 */
static DEFINE_MUTEX(read_lock);

註釋指出,在特定條件下,即只有一個並行 reader 和一個並行 writer 時,對 kfifo 的使用是安全的,無需額外的鎖定機制。

考慮到並行,因寫入操作是由 interrupt context 進行(不可被搶佔、sleep),因此只有一個 writer 可以存取 kfifo,但讀取操作就必須使用 DEFINE_MUTEX(read_lock) 來限制只能有一位讀者存取 kfifo ,總結最後確保 kfifo 的並行是安全的,在同一時間只有一個 writer 及 reader 可以存取 kfifo 。

/* Mutex to serialize kfifo writers within the workqueue handler */
static DEFINE_MUTEX(producer_lock);

/* Mutex to serialize fast_buf consumers: we can use a mutex because consumers
 * run in workqueue handler (kernel thread context).
 */
static DEFINE_MUTEX(consumer_lock);

藉由 kernel thread 取得 circular buffer 的資料,使用 mutex 來避免執行緒之間在取得資料時發生問題

/* Consume data from the circular buffer */
mutex_lock(&consumer_lock);
val = fast_buf_get();
mutex_unlock(&consumer_lock);

疑問

前面的註釋提到寫入 kfifo 的操作在 interrupt context 已確保同時只能有一個寫著存取,為何這裡還要在上鎖?

/* Store data to the kfifo buffer */
mutex_lock(&producer_lock);
produce_data(val);
mutex_unlock(&producer_lock);

當 circular buffer 為空時 if (val < 0) 喚醒等待中的佇列

wake_up_interruptible(&rx_wait);
/* Wait queue to implement blocking I/O from userspace */
static DECLARE_WAIT_QUEUE_HEAD(rx_wait);

流程圖

參考江冠霆同學的作業時發現他整理了一個流程圖,可以加速認識 simrupt 的運作原理,後續回頭複習時能更快了解。

lock-free

Linux 核心設計:淺談同步機制

  • coarse-grained vs fine-grained

用粗粒(coarse-grained)細粒(fine-grained)來舉 lock 的優劣

  • Atomic instruction
    x++,x-- -> 確保不會被 CPU 交錯執行
  • Compare and swap (CAS)
    • 語意: if (x == y) x = z;
    ​​​​int compareAndSwap(int *loc, int expectedValue, int valueToStore) {
    ​​​​    ATOMIC {
    ​​​​        int val = *loc;
    ​​​​        if (val == expected) {
    ​​​​            *loc = valueToStore;
    ​​​​        }
    ​​​​        return val;
    ​​​​    }
    ​​​​}
    

若傳入為符合預期的值進行 swap ,簡單的程式碼,重點在於 ATOMIC

並非無鎖的操作就是 lock-free

  • 一個 lock-free 的程式需要確保符合這個特徵:執行該程式的所有執行緒至少有一個能夠不被阻擋 (blocked) 而繼續執行
    • lock-Free 中的 "Lock" 沒有直接涉及 mutex 和 semaphore 一類 mutual exclusion,而是描述了應用程式受限於特定原因被鎖定的可能,例如可能因為 dead lock, live lock,或者 thread scheduling 致使的 priority inversion 等等
    • 下方的程式碼沒有任何 mutex,但卻不符合 lock-free 的屬性:若有兩個執行緒同時執行程式碼,考慮到特定的排程方式,非常可能兩個執行緒同時陷入無窮迴圈,也就是阻擋彼此
    • Lock-Free 程式設計的挑戰不僅來自於其任務本身的複雜性,還要始終著眼於對事物本質的洞察
while (X == 0 ) 
{ 
    X = 1 - X; 
}

討論能否改成 lock-free

下方程式碼使用 mutex 確保從 circular buffer 取得的值是一致的

        mutex_lock(&consumer_lock);
        val = fast_buf_get();
        mutex_unlock(&consumer_lock);

fast_buf_get 中或許可以使用 atomic 的 CAS 操作來改寫,但具體怎麼改寫還在思考。

閱讀 Linux 核心模組運作原理

  • 閱讀〈Linux 核心模組運作原理〉並對照 Linux 核心原始程式碼 (v6.1+),解釋 insmod 後,Linux 核心模組的符號 (symbol) 如何被 Linux 核心找到 (使用 List API)、MODULE_LICENSE 巨集指定的授權條款又對核心有什麼影響 (GPL 與否對於可用的符號列表有關),以及藉由 strace 追蹤 Linux 核心的掛載,涉及哪些系統呼叫和子系統?

因目前設備使用的 kernel 核心版本為 v6.5.0,為了後續實作查看 Linux kernel v6.5.0 的源程式碼

使用 strace 追蹤執行 insmod hello.ko 的過程有哪些系統呼叫被執行:

$sudo strace insmod hello.ko

當中的 finit_modulekernel/module/main.c 中可以找到,接著呼叫 idempotent_init_module -> init_module_from_file -> load_module,最後呼叫的 load_module 為 Linux 核心配置記憶體及載入模組。

接著查看到 load_module 的程式碼,-> simplify_symbols -> resolve_symbol_wait -> resolve_symbol -> find_symbol -> list_for_each_entry_rcu

list_for_each_entry_rcu 是一個 RCU 的 List API

/**
 * list_for_each_entry_srcu	-	iterate over rcu list of given type
 * @pos:	the type * to use as a loop cursor.
 * @head:	the head for your list.
 * @member:	the name of the list_head within the struct.
 * @cond:	lockdep expression for the lock required to traverse the list.
 *
 * This list-traversal primitive may safely run concurrently with
 * the _rcu list-mutation primitives such as list_add_rcu()
 * as long as the traversal is guarded by srcu_read_lock().
 * The lockdep expression srcu_read_lock_held() can be passed as the
 * cond argument from read side.
 */
#define list_for_each_entry_srcu(pos, head, member, cond)		\
	for (__list_check_srcu(cond),					\
	     pos = list_entry_rcu((head)->next, typeof(*pos), member);	\
		&pos->member != (head);					\
		pos = list_entry_rcu(pos->member.next, typeof(*pos), member))