owned this note
owned this note
Published
Linked with GitHub
# ARMv7 Linux 互斥鎖(mutex)分析
###### tags: `twlkh`, `mutex`, `lock`, `concurrency`
Author: [王舜緯](https://www.facebook.com/profile.php?id=100000535188688&fref=ufi)
License: [CC-BY-4.0](https://creativecommons.org/licenses/by/4.0/deed.zh_TW)
**Outline:**
* mutex資料結構
* mutex初始化
* mutex上鎖流程
* mutex解鎖流程
* 結論
----
## 1. mutex 資料結構
``` clike=
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
```
mutex 主要是利用一個 atomic_t 的變數 count 去描述鎖的狀態(是否被鎖定)。
>當count = 1 表示資源未被鎖定,
>count = 0 表示被鎖定
>count < 0 表示被鎖定,並且可能有等待者等待資源被解鎖[name=王舜緯][time=Sat, Nov 5, 2016 2:23 PM][color=#78bc38]
wait_list 是將 waiter 串起來的鍊結串列, wait_lock 是用來保護 wait_list 的自旋鎖。
此外,從 wait_list 的存在可以推測 mutex 上鎖有可能造成當前行程休眠。因此, mutex 不可以使用在中斷空間(interrupt context)的資源保護。
----
## 2. mutex 初始化
``` clike=
/**
* mutex_init - initialize the mutex
* @mutex: the mutex to be initialized
*
* Initialize the mutex to unlocked state.
*
* It is not allowed to initialize an already locked mutex.
*/
# define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
\
__mutex_init((mutex), #mutex, &__key); \
} while (0)
```
mutex_init 巨集主要是呼叫 __mutex_init 函式去進行 mutex 的初始化。
``` clike=
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_set(&lock->count, 1);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);
```
__mutex_init 主要的功能在於將 mutex(lock) 的 count 成員變數透過 atomic_set 設為 1 (未上鎖狀態)。
----
## 3. mutex上鎖流程
mutex 上鎖的函式有以下幾種:
* mutex_lock()
* mutex_lock_interruptible()
* mutex_lock_killable()
* mutex_trylock()
### 3.1 mutex_lock()
``` clike=
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock); //將mutex的所有者設成current
}
```
might_sleep() 巨集是當 CONFIG_PREEMPT_VOLUNTARY 核心組態被設定時,會執行_cond_resched(),檢視現在是否需要重新排程。一旦需要的話現在的行程(struct task_struct *current)會讓渡CPU的所有權給較高優先權的行程。也就是說,在上鎖之前會安排一個 preemption point。
```clike=
#define might_sleep() do { might_resched(); } while (0)
#ifdef CONFIG_PREEMPT_VOLUNTARY
extern int _cond_resched(void);
# define might_resched() _cond_resched()
#else
# define might_resched() do { } while (0)
#endif
```
__mutex_fastpath_lock() 最終會呼叫到 arch/arm/include/asm/atomic.h 的 atomic\_##op##_return_relaxed 的通用巨集,此時op為sub。並且,當回傳值是負數時,會呼叫__mutex_lock_slowpath。
----
``` clike=
@include/asm-generic/mutex-dec.h
static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_dec_return_acquire(count) < 0))
fail_fn(count);
}
@linux/include/atomic.h
#ifndef atomic_dec_return_acquire
#define atomic_dec_return_acquire(...) \
__atomic_op_acquire(atomic_dec_return, __VA_ARGS__)
#endif
(in here, op = atomic_dec_return)
#ifndef __atomic_op_acquire
#define __atomic_op_acquire(op, args...) \
({ \
typeof(op##_relaxed(args)) __ret = op##_relaxed(args); \
smp_mb__after_atomic(); \
__ret; \
})
#endif
@arch/arm/asm/atomic.h
#define atomic_dec_return_relaxed(v) (atomic_sub_return_relaxed(1, v))
(in here, op = sub)
#define ATOMIC_OP_RETURN(op, c_op, asm_op) \
static inline int atomic_##op##_return_relaxed(int i, atomic_t *v) \
{ \
unsigned long tmp; \
int result; \
\
prefetchw(&v->counter); \
\
__asm__ __volatile__("@ atomic_" #op "_return\n" \
"1: ldrex %0, [%3]\n" \
" " #asm_op " %0, %0, %4\n" \
" strex %1, %0, [%3]\n" \
" teq %1, #0\n" \
" bne 1b" \
: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \
: "r" (&v->counter), "Ir" (i) \
: "cc"); \
\
return result; \
}
```
在此時觀察__asm__ \_\_volatile__ 裡的內嵌組語可以知道,裡面主要是用 ldrex 和 strex 指令來將mutex的count值減1。並將減1後的值指派到result後回傳。
----
#### 3.1.1 Debugger 分析上鎖流程


此時 r0 值為 &v->counter, r2為tmp, r3為 result值。
strex r2, r3, [r0] 指令的功能為:
當將 r3 的值寫入到 r0 指向的記憶體位址時,若互斥寫入成功則r2的值為0,否則為1。
並且在下個指令比較 r2 值是否為0(寫入是否成功),如果失敗的話就會跳到 ldrex 的指令重新執行。
----
### 3.1.2 __mutex_lock_slowpath() --- 重複上鎖
**Recall:**
```clike=
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock); //將mutex的所有者設成current
}
@include/asm-generic/mutex-dec.h
static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_dec_return_acquire(count) < 0))
fail_fn(count);
}
```
當 atomic_dec_return_acquire 回傳值小於0,也就是資源被重複上鎖時,__mutex_lock_slowpath() 會被呼叫。
``` clike=
__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
NULL, _RET_IP_, NULL, 0);
}
```
可以看到, __mutex_lock_slowpath() 最後會將當前行程設成 TASK_UNINTERRUPTIBLE 並進入休眠。
為了一探究竟,我們刻意用一個核心模組去創造這樣的情境。
``` clike=
static ssize_t devone_write(struct file *filp, const char __user *buf,
size_t count, loff_t *f_pos)
{
struct devone_data *p = filp->private_data;
unsigned char val;
int ret = 0;
printk("[%s]%s: count %d pos %lld\n", DRIVER_NAME, __func__,
count, *f_pos);
if (count >= 1) {
if (copy_from_user(&val, &buf[0], 1)) {
ret = -EFAULT;
goto out;
}
printk(KERN_INFO "%02x\n", val);
mutex_lock(&p->lock); //lock
p->val = val;
mutex_unlock(&p->lock); // unlock
ret = count;
}
out:
return ret;
}
static ssize_t devone_read(struct file *filp, char __user *buf,
size_t count, loff_t *f_pos)
{
struct devone_data *p = filp->private_data;
unsigned char val;
int ret = 0;
int i = 0;
printk("[%s]%s: count %d pos %lld\n", DRIVER_NAME, __func__,
count, *f_pos);
mutex_lock(&p->lock); // lock
val = p->val;
//mutex_unlock(&p->lock); // for experiement, we dont unlock this mutex
for (i = 0; i < count; i++) {
if(copy_to_user(&buf[i], &val, 1)) {
ret = -EFAULT;
goto out;
}
}
ret = count;
out:
return ret;
}
```
**ps. 以上程式碼來自於平田豐的Linux驅動程式設計一書範例改寫**
當先對資源讀取後,刻意讓鎖定不會被解除。 之後再對資源進行寫入,便會將當前行程休眠,且不可中斷。 於是系統便進入死結,只有重新開機才可以恢復。
```
[root@vexpress devone]# ./devone_usr c (前台執行)
[ 42.285540] devone: major 252 minor 0 (pid 740)
[ 42.285971] devone: i_private = 9e84f810, private_data = 9f5e31c0
[ 42.286439] CPU: 0 PID: 740 Comm: devone_usr Not tainted 4.7.9+ #10
[ 42.286866] Hardware name: ARM-Versatile Express
[ 42.287273] [<80110fec>] (unwind_backtrace) from [<8010c9b8>] (show_stack+0x20/0x24)
[ 42.287843] [<8010c9b8>] (show_stack) from [<803cdf98>] (dump_stack+0xb8/0xe4)
[ 42.288371] [<803cdf98>] (dump_stack) from [<8044b9cc>] (devone_open+0xd8/0xe4)
[ 42.288907] [<8044b9cc>] (devone_open) from [<8023cddc>] (chrdev_open+0xe0/0x1a0)
[ 42.289442] [<8023cddc>] (chrdev_open) from [<802358e4>] (do_dentry_open+0x1f4/0x314)
[ 42.289994] [<802358e4>] (do_dentry_open) from [<80236b28>] (vfs_open+0x6c/0x90)
[ 42.290538] [<80236b28>] (vfs_open) from [<802465a0>] (path_openat+0x2bc/0x103c)
[ 42.291084] [<802465a0>] (path_openat) from [<80248218>] (do_filp_open+0x74/0xd8)
[ 42.291628] [<80248218>] (do_filp_open) from [<80236e90>] (do_sys_open+0x11c/0x1cc)
[ 42.292272] [<80236e90>] (do_sys_open) from [<80236f6c>] (SyS_open+0x2c/0x30)
[ 42.292901] [<80236f6c>] (SyS_open) from [<80108180>] (ret_fast_syscall+0x0/0x1c)
[ 42.293827] [devone]devone_read: count 8 pos 0
�������
[ 42.296565] [devone]devone_write: count 1 pos 0
[ 42.297040] 63
<-- 最終行程卡此處沒有反應
```
>此時如果同樣的情形發生在飛機控制系統的話,後果將會不可收拾。[name=王舜緯][time=Sat, Nov 5, 2016 4:27 PM][color=#c7f4a1]
```
[root@vexpress devone]# ./devone_usr c & (後台執行)
[root@vexpress devone]# top
PID PPID USER STAT VSZ %VSZ CPU %CPU COMMAND
750 736 0 R 2176 0.4 0 0.3 top
427 2 0 SW 0 0.0 0 0.0 [kworker/0:1]
15 2 0 SW 0 0.0 1 0.0 [kworker/1:0]
34 2 0 SW 0 0.0 0 0.0 [kworker/u8:2]
736 1 0 S 2176 0.4 3 0.0 -/bin/sh
1 0 0 S 2172 0.4 2 0.0 {linuxrc} init
739 736 0 D 1564 0.3 1 0.0 ./devone_usr c <== 行程在 D (uninterruptible sleep) 狀態,無法被訊號喚醒回到 TASK_RUNNING
```
**可以看到 mutex 的 counter 為 -1**

----
### 3.2 mutex_lock_interruptible() 與 mutex_lock_killable() --- 較安全的上鎖方式
``` clike=
int __sched mutex_lock_interruptible(struct mutex *lock)
{
int ret;
might_sleep();
ret = __mutex_fastpath_lock_retval(&lock->count);
if (likely(!ret)) {
mutex_set_owner(lock);
return 0;
} else
return __mutex_lock_interruptible_slowpath(lock);
}
int __sched mutex_lock_killable(struct mutex *lock)
{
int ret;
might_sleep();
ret = __mutex_fastpath_lock_retval(&lock->count);
if (likely(!ret)) {
mutex_set_owner(lock);
return 0;
} else
return __mutex_lock_killable_slowpath(lock);
}
```
從程式流程來看似乎 mutex_lock()沒有什麼不同。但最大的差別如下:
``` clike=
static noinline int __sched
__mutex_lock_killable_slowpath(struct mutex *lock)
{
return __mutex_lock_common(lock, TASK_KILLABLE, 0,
NULL, _RET_IP_, NULL, 0);
}
static noinline int __sched
__mutex_lock_interruptible_slowpath(struct mutex *lock)
{
return __mutex_lock_common(lock, TASK_INTERRUPTIBLE, 0,
NULL, _RET_IP_, NULL, 0);
}
```
也就是說,mutex_lock_interruptible()會將競爭鎖失敗的行程設為 TASK_INTERRUPTIBLE 並進入等待, 而 mutex_lock_killable() 則會設成 TASK_KILLABLE。
因此,當前台執行行程進入死結可以透過 Ctrl + C 強行將行程殺死。
而不是讓系統癱瘓。
----
## 4. mutex 解鎖流程
* mutex_unlock()
``` clike=
void __sched mutex_unlock(struct mutex *lock)
{
/*
* The unlocking fastpath is the 0->1 transition from 'locked'
* into 'unlocked' state:
*/
#ifndef CONFIG_DEBUG_MUTEXES
/*
* When debugging is enabled we must not clear the owner before time,
* the slow path will always be taken, and that clears the owner field
* after verifying that it was indeed current.
*/
mutex_clear_owner(lock);
#endif
__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}
```
可以從上鎖流程反推, 解鎖流程應該會是將 mutex 的 counter 透過 ldrex & strex 指令將它加1。~~並且當 counter 值從負數變為0時(這段只是猜測)~~,會將 wait_list 上等待的行程喚醒。
``` clike=
/**
* __mutex_fastpath_unlock - try to promote the count from 0 to 1
* @count: pointer of type atomic_t
* @fail_fn: function to call if the original value was not 0
*
* Try to promote the count from 0 to 1. If it wasn't 0, call <fail_fn>.
* In the failure case, this function is allowed to either set the value to
* 1, or to set it to a value lower than 1.
*
* If the implementation sets it to a value of lower than 1, then the
* __mutex_slowpath_needs_to_unlock() macro needs to return 1, it needs
* to return 0 otherwise.
*/
static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_inc_return_release(count) <= 0))
fail_fn(count);
}
```
當加1後的 counter 仍然小於或等於0,代表有行程正在等待。便會進入 __mutex_unlock_slowpath()。
``` clike=
__visible void
__mutex_unlock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_unlock_common_slowpath(lock, 1);
}
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
unsigned long flags;
WAKE_Q(wake_q);
/*
* As a performance measurement, release the lock before doing other
* wakeup related duties to follow. This allows other tasks to acquire
* the lock sooner, while still handling cleanups in past unlock calls.
* This can be done as we do not enforce strict equivalence between the
* mutex counter and wait_list.
*
*
* Some architectures leave the lock unlocked in the fastpath failure
* case, others need to leave it locked. In the later case we have to
* unlock it here - as the lock counter is currently 0 or negative.
*/
if (__mutex_slowpath_needs_to_unlock())
atomic_set(&lock->count, 1); <= 將counter 設成1,解鎖。
spin_lock_mutex(&lock->wait_lock, flags);
mutex_release(&lock->dep_map, nested, _RET_IP_);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_entry(lock->wait_list.next,
struct mutex_waiter, list);
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, waiter->task);
}
spin_unlock_mutex(&lock->wait_lock, flags);
wake_up_q(&wake_q); <= 喚醒行程
}
#define __mutex_slowpath_needs_to_unlock() 1
```
雖不中,亦不遠矣。事實上,當有行程在等待 mutex時, slowpath 會將 counter 重設為1,並將等待佇列的全部行程喚醒。
----
## 5. 結論
mutex 在 ARMv7 平台中是透過 exclusive load & store (ldrex/strex) 指令實現出來的。
雖然不太確定,但從 Patterson & Hennessy 的紅算盤介紹的內容可以猜測, MIPS 應該是透過 atomic swap 來實現 mutex。有時間可以追追看。
此外,ARMv6以前改變鎖狀態的atomic operation 透過中斷遮罩 ~~(應該是)~~ 的方式實現。
``` clike=
#define ATOMIC_OP(op, c_op, asm_op) \
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long flags; \
\
raw_local_irq_save(flags); \
v->counter c_op i; \
raw_local_irq_restore(flags); \
} \
#define ATOMIC_OP_RETURN(op, c_op, asm_op) \
static inline int atomic_##op##_return(int i, atomic_t *v) \
{ \
unsigned long flags; \
int val; \
\
raw_local_irq_save(flags); \
v->counter c_op i; \
val = v->counter; \
raw_local_irq_restore(flags); \
\
return val; \
}
```
因此在ARMv6以前atomic operation的效能並不好。在 ELC 2016 Europe 的[IRQs: the Hard, the Soft, the Threaded and the Preemptible](https://www.youtube.com/watch?v=-pehAzaP1eg&list=PLbzoR-pLrL6pRFP6SOywVJWdEHlmQE51q&index=53)演講有簡略的提到。
**關於ARMv6的中斷遮罩**
將 CPSR 的 Bit7 設為1,就可以禁止中斷。
若要恢復則將 CPSR Bit7 設為 0。
[參考來源](https://www.heyrick.co.uk/armwiki/The_Status_register)
```clike=
#define arch_local_irq_enable arch_local_irq_enable
static inline void arch_local_irq_enable(void)
{
unsigned long temp;
asm volatile(
" mrs %0, cpsr @ arch_local_irq_enable\n"
" bic %0, %0, #128\n"
" msr cpsr_c, %0"
: "=r" (temp)
:
: "memory", "cc");
}
/*
* Disable IRQs
*/
#define arch_local_irq_disable arch_local_irq_disable
static inline void arch_local_irq_disable(void)
{
unsigned long temp;
asm volatile(
" mrs %0, cpsr @ arch_local_irq_disable\n"
" orr %0, %0, #128\n"
" msr cpsr_c, %0"
: "=r" (temp)
:
: "memory", "cc");
}
```