Try   HackMD

Linux 核心專題: 改進 LKMPG

執行人: mesohandsome
專題解說錄影

Reviewed by Lccgth

在 tasklet 中切記不能加入 sleep 等暫停功能,運氣好時模組載入僅會出現錯誤訊息,嚴重時會導致系統 crash,這個時候只能乖乖地按下電源鍵重新開機。

運氣好這個詞彙不能精確反映實際執行情形,是否有其他因素導致執行時發生兩種錯誤情況?

任務簡介

閱讀 LKMPG 並改進其內容,預計加上 simrupt 到 LKMPG 來說明 tasklet, mutex, kfifo 等使用方式。

TODO: 閱讀 LKMPG 並紀錄問題 (含可改進之處)

針對書中錯誤或過時資訊,提交 pull request 改進

問題 1

在提及 major number 時,後續未介紹 minor number。

kernel 會自動分配 major number 給 device driver,那 device driver 要怎麼用 minor number 判斷目前的裝置為何?

問題 2

僅有某些章節提供 reference 資料,其他章節也能針對該段重點提供更多參考資訊?

問題 3

14.1 Tasklets 中 example_tasklet.c 的運作結果與書中的結果不同。

# Example
tasklet example init
Example tasklet starts
Example tasklet init continues...
Example tasklet ends

# My result
[702163.732017] tasklet example init
[702163.732369] Example tasklet starts
[702168.693030] Example tasklet ends
[702168.890467] Example tasklet init continues...

該篇文章 提到每個 tasklet 只能在其執行 tasklet_schedule 的 CPU 上運作:

Tasklets are used to queue up work to be done at a later time. Tasklets can be run in parallel, but the same tasklet cannot be run on multiple CPUs at the same time. Also, each tasklet will run only on the CPU that schedules it, to optimize cache usage. Since the thread that queued up the tasklet must complete before it can run the tasklet, race conditions are naturally avoided. However, this arrangement can be suboptimal, as other potentially idle CPUs cannot be used to run the tasklet. Therefore workqueues can and should be used instead, and workqueues were already discussed here.

因此程式一定會先將 tasklet 執行完畢再繼續執行 module_init 中剩下的操作,不會像書中所給的範例輸出會像是 multi-thread 那樣同步執行。

要修改 example_tasklet.c 範例下的的輸出或是直接變更 example_tasklet.c?

後者

tasklet 在運作時是 non-blocking 的,因此一定會讓 tasklet_fn 全部完成後才會繼續做其他事,且在 tasklet 中不宜使用 delay 或 sleep 等操作。

static void tasklet_fn(unsigned long data) 
{ 
    pr_info("Example tasklet starts\n"); 
    mdelay(5000); 
    pr_info("Example tasklet ends\n"); 
}

static int __init example_tasklet_init(void) 
{ 
    pr_info("tasklet example init\n"); 
    tasklet_schedule(&mytask); 
    mdelay(200); 
    pr_info("Example tasklet init continues...\n"); 
    return 0; 
} 

將範例中的 delay 時間做調整後,會得到兩種結果,分別為先執行 tasklet 以及後執行,因此可以得到以下兩種 output:

# 1
tasklet example init
Example tasklet init continues...
Example tasklet starts
Example tasklet ends

# 2
tasklet example init
Example tasklet starts
Example tasklet ends
Example tasklet init continues...

如果想得到書中所給的輸出,需要使用到 multi-thread 或是用像 simrupt 中使用 workqueue 等方式讓程式得以非同步執行,才能將 Example tasklet init continues... 穿插在 Example tasklet startsExample tasklet ends 中間。

問題 4

procfs2.cprocfile_write 中有一段程式碼,當 procfs_buffer_size == PROCFS_MAX_SIZE,在 procfs_buffer[0] 填上 \0,等於是將 buffer 清空:

procfs_buffer[procfs_buffer_size & (PROCFS_MAX_SIZE - 1)] = '\0';

在剛進入該函式時就已經有使用一次 if 判斷 procfs_buffer_size 是否大於 PROCFS_MAX_SIZE,並將 procfs_buffer_size 設定為 PROCFS_MAX_SIZE,為何不在這時就清空 procfs_buffer 並 return,這樣也能少做一次 copy_from_user

if (procfs_buffer_size >= PROCFS_MAX_SIZE) {
    procfs_buffer[0] = '\0';
    procfs_buffer_size = PROCFS_MAX_SIZE;
    *off += procfs_buffer_size;
    
    return procfs_buffer_size;
}

...

整體看下來是在 procfs_buffer_size 大小超出時清空 buffer,但做的不是很直覺?

TODO: 加上 simrupt 到 LKMPG 來說明 tasklet, mutex, kfifo 等使用方式

提交 pull request,作為對課程的貢獻和自身認知的驗證

使用 simrupt

simrupt 專案 clone 到 local 端,使用以下命令編譯程式並載入模組:

$ git clone https://github.com/sysprog21/simrupt.git

# Compile
$ make

# Insert module
$ sudo insmod simrupt.ko

insmod 後,可以觀察 simrupt 是否有成功被載入,接著可以使用 catsimrupt 開始運作,且可以在畫面上看到輸出的資訊,掛載模組訊息中的最後兩個數字 5070 分別為該模組的 Major number 和 Minor number:

$ dmesg
simrupt: registered new simrupt device: 507,0

$ cat /dev/simrupt
 !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~
$ dmesg -w
openm current cnt: 1
simrupt: [CPU#2] enter timer_handler
simrupt: [CPU#2] produce data
simrupt: [CPU#2] scheduling tasklet
simrupt: [CPU#2] timer_handler in_irq: 4 usec
simrupt: [CPU#2] simrupt_tasklet_func in_softirq: 3 usec
simrupt: [CPU#7] simrupt_work_func
simrupt: [CPU#2] enter timer_handler
simrupt: [CPU#2] produce data
simrupt: [CPU#2] scheduling tasklet
simrupt: [CPU#2] timer_handler in_irq: 4 usec
simrupt: [CPU#2] simrupt_tasklet_func in_softirq: 4 usec
simrupt: [CPU#8] simrupt_work_func
...

tasklet

tasklet 是 Linux kernel 中的一中軟中斷機制,用於中斷上下文中處理較長時間的任務,相關的定義及函式都存在 linux/interrupt.h 中。

在持續更新的 kernel 中,DECLARE_TASKLET 的定義被修改過後,比起原本多接收了一個 data 參數,在 tasklet_struct 中可見 DECLARE_TASKLET 巨集中的每項參數分別對應到其中的哪 一項變數:

struct tasklet_struct
{
    struct tasklet_struct *next;
    unsigned long state;
    atomic_t count;
    bool use_callback;
    union {
        void (*func)(unsigned long data);
        void (*callback)(struct tasklet_struct *t);
    };
    unsigned long data;
};
#define DECLARE_TASKLET(name, func, data) \
          struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }

注意用語!

為了相容舊的 kernel 版本需要再定義 DECLARE_TASKLET_OLD,避免出現編譯錯誤,在新版本的 interrupt.h 已經有定義好的 DECLARE_TASKLET_OLD,所以主要是針對 kernel 版本還沒使用到最新的情況。

#ifndef DECLARE_TASKLET_OLD
#define DECLARE_TASKLET_OLD(arg1, arg2) DECLARE_TASKLET(arg1, arg2, 0L)
#endif

好奇為什麼不是把以前使用到的 DECLARE_TASKLET 都更新,而是定義新的 DECLARE_TASKLET_OLD 去相容?

為了使用 tasklet,接下來會使用到剛才所提到的 macro,後續如要呼叫 takslet 便是呼叫 DECLARED_TASKLET_OLD 中的第一個參數,在範例中即為 simrupt_tasklet

在 tasklet 中切記不能加入 sleep 等暫停功能,運氣好時模組載入僅會出現錯誤訊息,嚴重時會導致系統 crash,這個時候只能乖乖地按下電源鍵重新開機。

static void simrupt_tasklet_func(unsigned long __data){
    ...
}

static DECLARE_TASKLET_OLD(simrupt_tasklet, simrupt_tasklet_func);

timer_handler 中,會間隔一段時間模擬 keyboard interrupt,tasklet 通常在 bottom-half 處理不能被中斷的部分,可以看見函式中在處理完所有事情後,最後才呼叫 tasklet_schedule 將 tasklet 掛起。

執行程式後可以從 dmesg 中觀察執行狀況,可以發現 tasklet 運作在呼叫 tasklet_schedule 時的同一個 CPU 上。

static void process_data(void)
{
    WARN_ON_ONCE(!irqs_disabled());

    pr_info("simrupt: [CPU#%d] produce data\n", smp_processor_id());
    fast_buf_put(update_simrupt_data());

    pr_info("simrupt: [CPU#%d] scheduling tasklet\n", smp_processor_id());
    tasklet_schedule(&simrupt_tasklet);
}

當 module 要 exit 時,也不要忘記將 garbage collection,使用 tasklet_kill 將先前宣告的 tasklet 給清除:

static void __exit simrupt_exit(void)
{
    ...
    tasklet_kill(&simrupt_tasklet);
    ...
}

mutex

process 使用 mutex 時,必須先持有 mutex 才得以進入 CS (critical section) 存取資源,
結束後再釋放 mutex 讓其他 process 使用。

在 kernel module 中使用 mutex 非常便利,會用到的函式以及巨集都已經在 mutex.h 中定義好了。
首先使用巨集 DEFINE_MUTEX(mutexname) 來宣告變數。

#define DEFINE_MUTEX(mutexname) \
     struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

接著會使用到 mutex_lock() 以及 mutex_unlock() 來取得及釋放 mutex,在 simrupt.c 中提供了單一生產者及單一消費者的範例,當消費者想存取 buffer 中的資料 (進入 critical section) 之前,需先以 mutex_lock(&consumer_lock) 取得 mutex,且在存取完畢後以 mutex_unlock(&consumer_lock) 釋放 mutex,讓下一個消費者存取時可以正常取得,否則會陷入 deadlock 導致行為無法正常運作,
對於生產者也是一樣,皆是存取 mutex 後才能操作記憶體空間。

while (1) {
    /* Consume data from the circular buffer */
    mutex_lock(&consumer_lock);
    val = fast_buf_get();
    mutex_unlock(&consumer_lock);

    if (val < 0)
        break;

    /* Store data to the kfifo buffer */
    mutex_lock(&producer_lock);
    produce_data(val);
    mutex_unlock(&producer_lock);
}

若無使用 mutex 確保 critical section 的單一存取,有可能會發生 race condition
對於下面的狀況,預期結果會是 Process A 與 Process B 分別讀取數值並加 1 而結果為 2,若發生 race condition,沒有將讀寫保護在 critical section 中,導致結果的數值為 1,與預期並不相符。

預期情況:

Process A Process B Data
Read (0) 0
Write (0+1) 1
Read (1) 1
Write (1+1) 2

實際情況:

Process A Process B Data
Read (0) 0
Read (0) 0
Write (0+1) 1
Write (0+1) 1

kfifo

linux/kfifo.h
linux/kfifo.c

kfifo 是在 Linux kernel 中的一種 Ring buffer 機制,如果只有一個讀取端執行緒、一個寫入端執行緒,二者沒有共享的被修改的控制變數,那麼可以證明這種情況下不需要並行控制,kfifo 就滿足上述條件,kfifo 主要用於生產者和消費者之間傳遞資訊,或者在中斷上下文和 process 上下文中傳遞資訊,且 kfifo 具有 circular buffer 的特性,當 buffer 寫入位置已經到末端時,會再從 buffer 的開頭繼續寫入,從而實現 循環利用,且因為 circular buffer 的特性,可以避免頻繁配置空間和釋放記憶體。

kfifo 中的 in 可以想成 queue 中的 tail,當生產者要寫入資訊時會從這端開始,out 則是串列中的 head,為消費者取得資料的目標處。

struct __kfifo {
    unsigned int in;        // 下一次寫入的位置
    unsigned int out;       // 下一次讀取的位置
    unsigned int mask;      // buffer size - 1
    unsigned int esize;     // buffer 中每個元素的大小
    void *data;             // 指向實際儲存空間的指標
};

kfifo 要求 buffer size 為 2 的冪,這樣做可以讓傳入的 offset 和 mask (buffer size - 1)& 快速得到 mod 後的值,得到的值即為目前要在環中做其他操作所需使用的 index,以 buffer size 為 16 來舉例:

buffer size = 16 -> 10000
       mask = 15 -> 01111
        off = 18 -> 10010
off &= mask =  2 -> 00010

kfifo_copy_inkfifo_copy_out 中可以看到這樣的操作,且在函式的最後使用 smp_wmb() ( Symmetric Multi-Processing Write Memory Barrier ) 來確保所有的寫入操作在這個 memory barrier 之前完成,讓程式不會因為 CPU 或編譯器的 reordering 而被打亂順序,其他還有 smp_mb()smp_rmb() 的類似應用。

off &= fifo->mask;
if (esize != 1) {
    off *= esize;
    size *= esize;
    len *= esize;
}
l = min(len, size - off);

memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
 * make sure that the data in the fifo is up to date before
 * incrementing the fifo->in index counter
 */
smp_wmb();

若想得到整個 buffer 的長度以及大小,可以透過 esize 以及 mask

unsigned int length = fifo->mask + 1;
unsigned int size = length * fifo->esize;

接著介紹 simrupt.c 中,有使用到的 kfifo APIs 以及與他們相關的一些函式。

kfifo_init

有兩種方式可以用來初始化 kfifo,但兩者在使用的細節上有些許不同,在 simrupt.c 中使用巨集 DECLARE_KFIFO_PTR,另外一種是使用 kfifo_init,先看到 simrupt.c 中的用法,使用巨集 DECLARE_KFIFO_PTR 初始化之後,需要再以 kfifo_alloc 來配置其中的 buffer。

/**
 * DECLARE_KFIFO_PTR - macro to declare a fifo pointer object
 * @fifo: name of the declared fifo
 * @type: type of the fifo elements
 */
#define DECLARE_KFIFO_PTR(fifo, type) \
    STRUCT_KFIFO_PTR(type) fifo

kfifo_alloc 需傳入指向 kfifo 的指標,要分配的 buffer 大小,以及要使用的 GFP mask,在 __kfifo_alloc 中會使用 roundup_pow_of_two 將 buffer 規定為 2 的冪,並以 kmalloc_array 實際將空間配置完畢。

#define kfifo_alloc(fifo, size, gfp_mask) \
__kfifo_int_must_check_helper( \
({ \
	typeof((fifo) + 1) __tmp = (fifo); \
	struct __kfifo *__kfifo = &__tmp->kfifo; \
	__is_kfifo_ptr(__tmp) ? \
	__kfifo_alloc(__kfifo, size, sizeof(*__tmp->type), gfp_mask) : \
	-EINVAL; \
}) \
)

simrupt.c 中的初始化範例:

/* Data are stored into a kfifo buffer before passing them to the userspace */
static DECLARE_KFIFO_PTR(rx_fifo, unsigned char);

static int __init simrupt_init(void)
{
    dev_t dev_id;
    int ret;

    if (kfifo_alloc(&rx_fifo, PAGE_SIZE, GFP_KERNEL) < 0)
        return -ENOMEM;
    ...
}

要想使用 kfifo_init 的話,需要先宣告要使用的 buffer 並以 kmalloc 將其配置好,再將之帶入參數進行初始化,以下是一個簡單的範例:

/**
 * kfifo_init - initialize a fifo using a preallocated buffer
 * @fifo: the fifo to assign the buffer
 * @buffer: the preallocated buffer to be used
 * @size: the size of the internal buffer, this have to be a power of 2
 *
 * This macro initializes a fifo using a preallocated buffer.
 *
 * The number of elements will be rounded-up to a power of 2.
 * Return 0 if no error, otherwise an error code.
 */
#define kfifo_init(fifo, buffer, size)
#define FIFO_SIZE 16

static struct kfifo my_kfifo;
static unsigned char *buffer;

static int __init module_init(void)
{
    int ret;

    buffer = kmalloc(FIFO_SIZE, GFP_KERNEL);
    if (!buffer) {
        pr_err("Failed to allocate buffer\n");
        return -ENOMEM;
    }

    ret = kfifo_init(&my_kfifo, buffer, FIFO_SIZE);
    if (ret) {
        pr_err("Failed to initialize kfifo\n");
        kfree(buffer);
        return ret;
    }
    ...
}

kfifo_in

介紹完初始化,接著是要將資料寫入到 kfifo 中,可以透過巨集 kfifo_in 來完成,他接收三個參數:

  • fifo : 指向 kfifo 結構的指標。
  • buf : 要寫入的資料。
  • n : 該筆資料的長度。
#define	kfifo_in(fifo, buf, n) \
({ \
	typeof((fifo) + 1) __tmp = (fifo); \
	typeof(__tmp->ptr_const) __buf = (buf); \
	unsigned long __n = (n); \
	const size_t __recsize = sizeof(*__tmp->rectype); \
	struct __kfifo *__kfifo = &__tmp->kfifo; \
	(__recsize) ?\
	__kfifo_in_r(__kfifo, __buf, __n, __recsize) : \
	__kfifo_in(__kfifo, __buf, __n); \
})

kfifo_in 中會再呼叫 __kfifo_in 函式處理實際的資料寫入,首先以 kfifo_unused 獲取 fifo 中未使用的空間大小並存入 l 中,檢查長度否超出可用空間,若超過則將長度設為 l,防止資料溢出 buffer,接著使用 kfifo_copy_in 將資料複製到 fifo 中,資料長度為 len,起始位置為 fifo->in,最後以 fifo->in += len 更新下一次寫入的位置。

unsigned int __kfifo_in(struct __kfifo *fifo,
		const void *buf, unsigned int len)
{
	unsigned int l;

	l = kfifo_unused(fifo);
	if (len > l)
		len = l;

	kfifo_copy_in(fifo, buf, len, fifo->in);
	fifo->in += len;
	return len;
}

kfifo_unused

__kfifo_in 中使用到此函式來計算 kfifo 中未使用的空間大小,其中 fifo->mask + 1 表示 buffer 的總大小,kfifo_len(fifo) 會回傳 buffer 中已使用的大小。

static inline unsigned int kfifo_unused(struct __kfifo *fifo)
{
	return fifo->mask + 1 - kfifo_len(fifo);
}

kfifo_copy_in

kfifo_copy_in 中,會先計算出資料要複製到 buffer 中的哪個位置,會使用到前面所提到的 fifo->mask& 來快速取的取模後的值,又因為 ring buffer 的特性,資料在寫入時會被分成兩部分存取,一是從傳入的 fifo->in 位置寫入,接著計算出後半段後,將其從開頭處再繼續儲存,因此才會看到以下程式碼中的兩次 memcpy

static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
		unsigned int len, unsigned int off)
{
	unsigned int size = fifo->mask + 1;
	unsigned int esize = fifo->esize;
	unsigned int l;

	off &= fifo->mask;
	if (esize != 1) {
		off *= esize;
		size *= esize;
		len *= esize;
	}
	l = min(len, size - off);

	memcpy(fifo->data + off, src, l);
	memcpy(fifo->data, src + l, len - l);
	/*
	 * make sure that the data in the fifo is up to date before
	 * incrementing the fifo->in index counter
	 */
	smp_wmb();
}

kfifo_to_user

巨集 kfifo_to_user 用於將資料從 kfifo 複製到 userspace 中。

  • fifo : 指向 kfifo 結構的指標。
  • to : 指向 userspace 中 buffer 的指標。
  • len : 要複製的資料長度。
  • copied : 指向實際複製的數據長度的指標。
#define	kfifo_to_user(fifo, to, len, copied) \
__kfifo_int_must_check_helper( \
({ \
	typeof((fifo) + 1) __tmp = (fifo); \
	void __user *__to = (to); \
	unsigned int __len = (len); \
	unsigned int *__copied = (copied); \
	const size_t __recsize = sizeof(*__tmp->rectype); \
	struct __kfifo *__kfifo = &__tmp->kfifo; \
	(__recsize) ? \
	__kfifo_to_user_r(__kfifo, __to, __len, __copied, __recsize) : \
	__kfifo_to_user(__kfifo, __to, __len, __copied); \
}) \
)

kfifo_to_user 會呼叫 __kfifo_to_user 將數據實際複製到 userspace 的 buffer ,其中主要是以 kfifo_copy_to_userto 為 userspace 中的 buffer,len 為要複製的長度,fifo->out 為起始的讀取位置,copied 與剛才所提到的相同,當複製完成後會將 fifo->out += len 更新下一次的讀取位置。

int __kfifo_to_user(struct __kfifo *fifo, void __user *to,
		unsigned long len, unsigned int *copied)
{
	...
        
	ret = kfifo_copy_to_user(fifo, to, len, fifo->out, copied);
	if (unlikely(ret)) {
		len -= ret;
		err = -EFAULT;
	} else
		err = 0;
	fifo->out += len;
	return err;
}

開發紀錄務必依據以下: