Linux
The data synchronization mechanisms we commonly see are Spinlock, Mutex, and Semaphore, while the commonly discussed problems include Deadlock, Race Condition, Critical Section, the Algorithm for Two Processes, the Bakery Algorithm, and so on.
Mutex
A mutex (mutual exclusion lock) is a mechanism used in multi-threaded programming to prevent two threads from reading and writing the same shared resource (for example, a global variable) at the same time. This is achieved by dividing the code into critical sections. A critical section is a piece of code that accesses a shared resource; it is not itself a mechanism or an algorithm. A program, process, or thread can contain multiple critical sections, and it does not necessarily have to protect them with a mutex.
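As a concrete illustration (not part of the original text), here is a minimal POSIX-threads sketch in which a mutex guards a shared counter; the names counter and worker are invented for the example.

#include <stdio.h>
#include <pthread.h>

static int counter = 0;                       /* shared resource */
static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

static void *worker(void *arg)
{
    for (int i = 0; i < 1000000; i++) {
        pthread_mutex_lock(&counter_mutex);   /* enter the critical section */
        counter += 1;
        pthread_mutex_unlock(&counter_mutex); /* leave the critical section */
    }
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    printf("counter = %d\n", counter);        /* 2000000 every time, thanks to the lock */
    return 0;
}

Without the lock/unlock pair around counter += 1, the final value would usually come out smaller than 2000000.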
Spinlock
A Spinlock, as the name suggests, largely tells us how it works. Like a Mutex, a Spinlock can be used to protect a critical section, but if a thread fails to acquire the lock it keeps looping (busy waiting) until it is allowed to take the lock, which is why it is called a spin lock. A minimal implementation looks like this:
typedef struct spinlock {
    volatile uint lock;               /* 0: unlocked, 1: locked */
} spinlock_t;

void lock(spinlock_t *lock)
{
    /* xchg() is an atomic-exchange primitive provided elsewhere:
     * keep swapping in 1 and spinning (busy waiting) while the old value is 1 */
    while (xchg(&lock->lock, 1) != 0)
        ;
}

void unlock(spinlock_t *lock)
{
    lock->lock = 0;                   /* release: a spinning thread can now acquire */
}
A Semaphore is a synchronization object that maintains a count between 0 and a specified maximum value. Compared with the spinlock and mutex introduced above, a semaphore is more like a signal.
When a thread completes a wait on the semaphore object, the count is decremented by one; when a thread completes a release on the semaphore object, the count is incremented by one.
When the count is greater than 0, the semaphore is in the signaled state; when the count equals 0, it is in the nonsignaled state.
In other words, while the count is 0, a semaphore in the nonsignaled state cannot be acquired by a thread until it becomes signaled again.
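To make wait/release concrete, here is a minimal userspace sketch using POSIX semaphores (sem_init()/sem_wait()/sem_post()); the idea of limiting the work to two concurrent threads is invented for illustration.

#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

static sem_t slots;                  /* counts how many threads may work at once */

static void *worker(void *arg)
{
    sem_wait(&slots);                /* wait: count - 1, blocks while count == 0 */
    printf("thread %ld working\n", (long)(intptr_t)arg);
    sleep(1);
    sem_post(&slots);                /* release: count + 1, wakes one waiter */
    return NULL;
}

int main(void)
{
    pthread_t tid[4];

    sem_init(&slots, 0, 2);          /* initial count 2: at most two workers at a time */
    for (long i = 0; i < 4; i++)
        pthread_create(&tid[i], NULL, worker, (void *)(intptr_t)i);
    for (int i = 0; i < 4; i++)
        pthread_join(tid[i], NULL);
    sem_destroy(&slots);
    return 0;
}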
Having introduced Spinlock and Mutex and the differences between them, here is a quick note on which scenarios each of the two locks suits: because a spinlock busy-waits, it fits very short critical sections and contexts that must not sleep, while a mutex puts the waiting thread to sleep, so it fits longer critical sections where sleeping is acceptable.
Simply put, when two processes or threads each need a resource that the other currently holds, yet neither is willing to release its own, both get stuck and a deadlock forms. It is like a trade between two people: the buyer insists that the seller ship first, while the seller insists that the buyer pay first, so the deal stalls. When a similar situation occurs inside a computer, it is called a Deadlock.
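A minimal sketch of how such a deadlock can arise in code (the two mutexes lock_a/lock_b and the thread bodies are invented for illustration): each thread holds one lock and waits forever for the other.

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

static pthread_mutex_t lock_a = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lock_b = PTHREAD_MUTEX_INITIALIZER;

static void *thread1(void *arg)
{
    pthread_mutex_lock(&lock_a);     /* holds A ... */
    sleep(1);
    pthread_mutex_lock(&lock_b);     /* ... then waits for B, which thread2 holds */
    pthread_mutex_unlock(&lock_b);
    pthread_mutex_unlock(&lock_a);
    return NULL;
}

static void *thread2(void *arg)
{
    pthread_mutex_lock(&lock_b);     /* holds B ... */
    sleep(1);
    pthread_mutex_lock(&lock_a);     /* ... then waits for A, which thread1 holds */
    pthread_mutex_unlock(&lock_a);
    pthread_mutex_unlock(&lock_b);
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;

    pthread_create(&t1, NULL, thread1, NULL);
    pthread_create(&t2, NULL, thread2, NULL);
    pthread_join(t1, NULL);          /* never returns: the two threads are deadlocked */
    pthread_join(t2, NULL);
    return 0;
}

Acquiring the two locks in the same order in every thread is the usual way to break this cycle.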
When a process cannot obtain the resources it needs and ends up waiting for a long time, the situation is called Starvation. To avoid starvation and deadlock, in addition to the preemptive scheduling mentioned above, the operating system adds an aging mechanism: when a process has gone a long time without finishing, the operating system gradually raises its priority so that it completes sooner.
A race condition describes a situation in which the output of a system or process depends on the uncontrolled order or timing of events.
For example, when multiple processes try to access the same memory location and this is not handled carefully, the result of execution becomes unpredictable. This kind of problem easily shows up in poorly designed backend programs, databases, file systems, or other multi-threaded programs, as the following example shows.
#include <stdio.h>
#include <pthread.h>

void *myfunc(void *ptr)
{
    int i = *((int *) ptr);          /* dereferences the caller's i, which keeps changing */
    printf("%d ", i);
    return NULL;
}

int main()
{
    int i;
    pthread_t tid;

    for (i = 0; i < 10; i++) {
        /* every thread gets a pointer to the same variable i, so what it
         * prints depends on when it happens to run: a race condition */
        pthread_create(&tid, NULL, myfunc, &i);
    }
    pthread_exit(NULL);
}
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                   void *(*start_routine) (void *), void *arg);
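One illustrative way to avoid this particular race (an assumed fix, not the only one) is to pass the loop value itself rather than a pointer to the shared variable, and to join every thread before exiting:

#include <stdio.h>
#include <stdint.h>
#include <pthread.h>

void *myfunc_fixed(void *ptr)
{
    int i = (int)(intptr_t) ptr;     /* each thread receives its own copy of the value */
    printf("%d ", i);
    return NULL;
}

int main()
{
    pthread_t tid[10];
    int i;

    for (i = 0; i < 10; i++)
        pthread_create(&tid[i], NULL, myfunc_fixed, (void *)(intptr_t) i);
    for (i = 0; i < 10; i++)
        pthread_join(tid[i], NULL);
    printf("\n");
    return 0;
}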
A critical section is a region of code that may only be executed by one thread at a time; if multiple threads run this code simultaneously, unexpected and erroneous behavior can occur.
#include <pthread.h>

int sum = 0;                          /* shared global counter */

void *countgold(void *param)
{
    int i;

    for (i = 0; i < 10000000; i++)
    {
        /* critical section: an unsynchronized read-modify-write on sum */
        sum += 1;
        /* end of the critical section */
    }
    return NULL;
}
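To actually observe the problem, a minimal driver (assumed here, not part of the original) can run countgold() in two threads at once; because the increment in the critical section is not protected, updates get lost.

#include <stdio.h>
#include <pthread.h>

/* sum and countgold() as defined above */

int main(void)
{
    pthread_t t1, t2;

    pthread_create(&t1, NULL, countgold, NULL);
    pthread_create(&t2, NULL, countgold, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    /* almost always prints a value smaller than 20000000 */
    printf("sum = %d\n", sum);
    return 0;
}

Protecting sum += 1 with one of the locks above (a pthread mutex, for instance) restores the expected total.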
Rather than spending more space here, the other problems are only listed and briefly summarized above; please Google them for the details.
"Documentation/kernel-hacking/locking.rst"
https://elixir.bootlin.com/linux/v4.14.50/source/kernel/locking/mutex.c
mutex_lock()
/**
* mutex_lock - acquire the mutex
* @lock: the mutex to be acquired
*
* Lock the mutex exclusively for this task. If the mutex is not
* available right now, it will sleep until it can get it.
*
* The mutex must later on be released by the same task that
* acquired it. Recursive locking is not allowed. The task
* may not exit without first unlocking the mutex. Also, kernel
* memory where the mutex resides must not be freed with
* the mutex still locked. The mutex must first be initialized
* (or statically defined) before it can be locked. memset()-ing
* the mutex to 0 is not allowed.
*
* (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
* checks that will enforce the restrictions and will also do
* deadlock debugging)
*
* This function is similar to (but not equivalent to) down().
*/
void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();

        if (!__mutex_trylock_fast(lock))
                __mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
mutex_lock() first calls might_sleep() (a debugging annotation marking that this function may sleep), then tries to take the lock through the fast path __mutex_trylock_fast(); if that fails, it falls back to __mutex_lock_slowpath() for the next step.

__mutex_lock_slowpath()
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
        __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
The slow path does nothing more than call __mutex_lock() with the state TASK_UNINTERRUPTIBLE.

__mutex_lock()
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
             struct lockdep_map *nest_lock, unsigned long ip)
{
        return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
__mutex_lock() in turn hands everything over to __mutex_lock_common(), which is where the real work happens.

__mutex_lock_common()
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct mutex_waiter waiter;
        bool first = false;
        struct ww_mutex *ww;
        int ret;

        might_sleep();

        ww = container_of(lock, struct ww_mutex, base);
        if (use_ww_ctx && ww_ctx) {
                if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
                        return -EALREADY;
        }

        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        if (__mutex_trylock(lock) ||
            mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
                /* got the lock, yay! */
                lock_acquired(&lock->dep_map, ip);
                if (use_ww_ctx && ww_ctx)
                        ww_mutex_set_context_fastpath(ww, ww_ctx);
                preempt_enable();
                return 0;
        }

        spin_lock(&lock->wait_lock);
        /*
         * After waiting to acquire the wait_lock, try again.
         */
        if (__mutex_trylock(lock)) {
                if (use_ww_ctx && ww_ctx)
                        __ww_mutex_wakeup_for_backoff(lock, ww_ctx);

                goto skip_wait;
        }

        debug_mutex_lock_common(lock, &waiter);
        debug_mutex_add_waiter(lock, &waiter, current);

        lock_contended(&lock->dep_map, ip);

        if (!use_ww_ctx) {
                /* add waiting tasks to the end of the waitqueue (FIFO): */
                list_add_tail(&waiter.list, &lock->wait_list);

#ifdef CONFIG_DEBUG_MUTEXES
                waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
        } else {
                /* Add in stamp order, waking up waiters that must back off. */
                ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
                if (ret)
                        goto err_early_backoff;

                waiter.ww_ctx = ww_ctx;
        }

        waiter.task = current;

        if (__mutex_waiter_is_first(lock, &waiter))
                __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);

        set_current_state(state);
        for (;;) {
                /*
                 * Once we hold wait_lock, we're serialized against
                 * mutex_unlock() handing the lock off to us, do a trylock
                 * before testing the error conditions to make sure we pick up
                 * the handoff.
                 */
                if (__mutex_trylock(lock))
                        goto acquired;

                /*
                 * Check for signals and wound conditions while holding
                 * wait_lock. This ensures the lock cancellation is ordered
                 * against mutex_unlock() and wake-ups do not go missing.
                 */
                if (unlikely(signal_pending_state(state, current))) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx && ww_ctx->acquired > 0) {
                        ret = __ww_mutex_lock_check_stamp(lock, &waiter, ww_ctx);
                        if (ret)
                                goto err;
                }

                spin_unlock(&lock->wait_lock);
                schedule_preempt_disabled();

                /*
                 * ww_mutex needs to always recheck its position since its waiter
                 * list is not FIFO ordered.
                 */
                if ((use_ww_ctx && ww_ctx) || !first) {
                        first = __mutex_waiter_is_first(lock, &waiter);
                        if (first)
                                __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
                }

                set_current_state(state);
                /*
                 * Here we order against unlock; we must either see it change
                 * state back to RUNNING and fall through the next schedule(),
                 * or we must see its unlock and acquire.
                 */
                if (__mutex_trylock(lock) ||
                    (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
                        break;

                spin_lock(&lock->wait_lock);
        }
        spin_lock(&lock->wait_lock);
acquired:
        __set_current_state(TASK_RUNNING);

        mutex_remove_waiter(lock, &waiter, current);
        if (likely(list_empty(&lock->wait_list)))
                __mutex_clear_flag(lock, MUTEX_FLAGS);

        debug_mutex_free_waiter(&waiter);

skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);

        if (use_ww_ctx && ww_ctx)
                ww_mutex_set_context_slowpath(ww, ww_ctx);

        spin_unlock(&lock->wait_lock);
        preempt_enable();
        return 0;

err:
        __set_current_state(TASK_RUNNING);
        mutex_remove_waiter(lock, &waiter, current);
err_early_backoff:
        spin_unlock(&lock->wait_lock);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}
A few lines in __mutex_lock_common() are worth highlighting: ww = container_of(lock, struct ww_mutex, base) recovers the enclosing ww_mutex in case a wound/wait context is used; preempt_disable() turns off preemption before the trylock and optimistic-spin fast path; spin_lock(&lock->wait_lock) protects the wait list while the current task is added as a waiter; and __mutex_set_flag(lock, MUTEX_FLAG_WAITERS) marks the mutex as contended so that the unlock path knows there are waiters to wake.
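Before moving on to spinlocks, here is a minimal sketch of how kernel code typically uses this mutex API; the my_lock/my_list names and the list scenario are invented for illustration.

#include <linux/mutex.h>
#include <linux/list.h>

static DEFINE_MUTEX(my_lock);         /* statically initialized mutex */
static LIST_HEAD(my_list);            /* shared data protected by my_lock */

static void my_add_node(struct list_head *node)
{
        mutex_lock(&my_lock);         /* may sleep, so this cannot run in interrupt context */
        list_add_tail(node, &my_list);
        mutex_unlock(&my_lock);       /* must be released by the same task that locked it */
}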
Next, the spinlock. spin_lock() is defined in include/linux/spinlock.h:
https://elixir.bootlin.com/linux/v4.14.50/source/include/linux/spinlock.h
spin_lock()
static __always_inline void spin_lock(spinlock_t *lock)
{
        raw_spin_lock(&lock->rlock);
}
spin_lock() is just a thin wrapper that calls raw_spin_lock():

raw_spin_lock()
#define raw_spin_lock(lock) _raw_spin_lock(lock)
raw_spin_lock() is a macro that expands to _raw_spin_lock(), defined in kernel/locking/spinlock.c:
https://elixir.bootlin.com/linux/v4.14.50/source/kernel/locking/spinlock.c
_raw_spin_lock()
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
        __raw_spin_lock(lock);
}
EXPORT_SYMBOL(_raw_spin_lock);
_raw_spin_lock() then calls __raw_spin_lock(), where the actual locking happens.

__raw_spin_lock()
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
        preempt_disable();                              /* no preemption while spinning or holding the lock */
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);   /* lockdep annotation */
        /* try do_raw_spin_trylock() first, otherwise spin in do_raw_spin_lock() */
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
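For completeness, a minimal sketch of how kernel code typically uses a spinlock; the my_spinlock/my_counter names are invented, and the _irqsave variant is chosen on the assumption that the data may also be touched from interrupt context.

#include <linux/spinlock.h>

static DEFINE_SPINLOCK(my_spinlock);  /* statically initialized spinlock */
static unsigned long my_counter;      /* shared data protected by my_spinlock */

static void my_count_event(void)
{
        unsigned long flags;

        /* keep the critical section short: other CPUs busy-wait on this lock */
        spin_lock_irqsave(&my_spinlock, flags);
        my_counter++;
        spin_unlock_irqrestore(&my_spinlock, flags);
}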
Finally, the semaphore. In the kernel a semaphore is represented by struct semaphore (sem_t is its userspace POSIX counterpart):
struct semaphore {
        raw_spinlock_t          lock;
        unsigned int            count;
        struct list_head        wait_list;
};
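A minimal sketch of the in-kernel semaphore API built on this structure (sema_init(), down_interruptible(), up()); the my_sem name and the resource scenario are invented for illustration.

#include <linux/semaphore.h>
#include <linux/errno.h>

static struct semaphore my_sem;

static void my_setup(void)
{
        sema_init(&my_sem, 1);            /* count = 1: behaves like a sleeping lock */
}

static int my_use_resource(void)
{
        if (down_interruptible(&my_sem))  /* wait: count - 1, sleeps while count == 0 */
                return -EINTR;
        /* ... use the shared resource ... */
        up(&my_sem);                      /* release: count + 1, wakes one waiter */
        return 0;
}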
sem_init() below is the System V IPC semaphore initialization in ipc/sem.c:
int __init sem_init(void)
{
        const int err = sem_init_ns(&init_ipc_ns);

        ipc_init_proc_interface("sysvipc/sem",
                                " key semid perms nsems uid gid cuid cgid otime ctime\n",
                                IPC_SEM_IDS, sysvipc_sem_proc_show);
        return err;
}
sem_lock()
/*
 * If the request contains only one semaphore operation, and there are
 * no complex transactions pending, lock only the semaphore involved.
 * Otherwise, lock the entire semaphore array, since we either have
 * multiple semaphores in our own semops, or we need to look at
 * semaphores from other pending complex operations.
 */
static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,
                           int nsops)
{
        struct sem *sem;

        if (nsops != 1) {
                /* Complex operation - acquire a full lock */
                ipc_lock_object(&sma->sem_perm);

                /* Prevent parallel simple ops */
                complexmode_enter(sma);
                return SEM_GLOBAL_LOCK;
        }

        /*
         * Only one semaphore affected - try to optimize locking.
         * Optimized locking is possible if no complex operation
         * is either enqueued or processed right now.
         *
         * Both facts are tracked by use_global_mode.
         */
        sem = &sma->sems[sops->sem_num];

        /*
         * Initial check for use_global_lock. Just an optimization,
         * no locking, no memory barrier.
         */
        if (!sma->use_global_lock) {
                /*
                 * It appears that no complex operation is around.
                 * Acquire the per-semaphore lock.
                 */
                spin_lock(&sem->lock);

                /* pairs with smp_store_release() */
                if (!smp_load_acquire(&sma->use_global_lock)) {
                        /* fast path successful! */
                        return sops->sem_num;
                }
                spin_unlock(&sem->lock);
        }

        /* slow path: acquire the full lock */
        ipc_lock_object(&sma->sem_perm);

        if (sma->use_global_lock == 0) {
                /*
                 * The use_global_lock mode ended while we waited for
                 * sma->sem_perm.lock. Thus we must switch to locking
                 * with sem->lock.
                 * Unlike in the fast path, there is no need to recheck
                 * sma->use_global_lock after we have acquired sem->lock:
                 * We own sma->sem_perm.lock, thus use_global_lock cannot
                 * change.
                 */
                spin_lock(&sem->lock);

                ipc_unlock_object(&sma->sem_perm);
                return sops->sem_num;
        } else {
                /*
                 * Not a false alarm, thus continue to use the global lock
                 * mode. No need for complexmode_enter(), this was done by
                 * the caller that has set use_global_mode to non-zero.
                 */
                return SEM_GLOBAL_LOCK;
        }
}