Try   HackMD

Synchronization (資料同步)

tags: Linux

目錄

簡介

我們常見的資料同步機制有 Spinlock、Mutex、Semaphore 共三種,而常見的問題則有 Dead lock、Race Condition、Critical Section、Algorithm for Two Processes、Bakery Algorithm等。

常見的資料同步機制

Mutex

互斥鎖(英語:Mutual exclusion,縮寫 Mutex)是一種用於多執行緒編程中,防止兩條執行緒同時對同一公共資源(比如全域變數)進行讀寫的機制。該目的通過將代碼切片成一個一個的臨界區域(critical section)達成。臨界區域指的是一塊對公共資源進行存取的代碼,並非一種機制或是演算法。一個程式、行程、執行緒可以擁有多個臨界區域,但是並不一定會應用互斥鎖。

  • Mutex lock 便是實踐 Sleep-waiting 的鎖。
  • 使用 Mutex lock 前須詳閱公開說明書,互斥鎖有以下特性:
    • 誰負責上鎖就由誰負責解鎖
    • 在銷毀鎖之前,必須確保沒有執行緒被這個鎖 block
    • Mutex lock 有專屬的 type: pthread_mutex_t

Spinlock

Spinlock 中文稱做 自旋鎖,透過名稱我們就能大概猜到 Spinlock 的功用。與 Mutex 相同,Spinlock 可以用來保護 Critical section,如果執行緒沒有獲取鎖,則會進入迴圈直到獲得上鎖的資格(Busy waiting),因此叫做自旋鎖。

  • 用 C 語言打造簡單的 Spinlock
typedef struct spinlock{
    volatile uint lock;
} spinlock_t;
void lock(spinlock_t *lock){
    while(xchg(lock−>lock, 1) != 0);
}
void unlock(spinlock_t *lock){
    lock->lock = 0;
}
  • Spinlock 便是實踐 Busy-waiting 的鎖。

Semaphore

Semaphore 是一個同步物件,用於保持在 0 至指定最大值之間的一個計數值。比起之前介紹的 spinlock 和 mutex lock,semaphore 更像是一個指示號誌。
當執行緒完成一次對該 semaphore 物件的等待 (wait) 時,該計數值減一;當執行緒完成一次對 semaphore 物件的釋放 (release) 時,計數值加一。
semaphore 物件的計數值大於 0,為 signaled 狀態;計數值等於 0,為 nonsignaled 狀態。
也就是說,當計數值為 0 時,處於 nonsignald 狀態的 semaphore 是無法供執行緒使用的,除非該 semaphore 再次轉為 signaled 狀態。

總結

在介紹完 Spinlock 、 Mutex 與 Spinlock 的差別後,再補充一下這兩個鎖適合應用在哪些場景:

  • spinlock
    • spinlock 適合用來保護一段簡單的操作,設想現在我們在排隊的機台,每個人只能使用 5 秒,那根本不值得讓我們離開去做別的事情,待在原地排隊是更好的選擇。
  • Mutex lock
    • 相反的,Mutex lock 適合拿來保護一段區域,以排吃到飽餐廳為例,當輪到我們入場時,店家會使用電話告知。因此在這段等待的時間,我們就可以到商場周邊晃晃避免空等。
      Semaphore 與 Mutex lock 的異曲同工之妙:
  • 文章前面有提到,Semaphore 用於保持在 0 至指定最大值之間的一個計數值。
  • 如果開發者將 Semaphore 設計成只會出現 0 和 1 兩種狀態,便稱為 Binary semaphore。
  • 仔細想想,Binary semaphore 同樣確保一次只能有一個執行緒獲得共用資源的存取權,比較不同的是 Binary semaphore 不像 Mutex lock 有 Owner ship 的概念。因此在應用上,Binary semaphore 會更具彈性。

常見的問題

Dead lock

簡單來說,當有兩個 Process 或是 Thread 需要彼此目前佔有的資源,而兩者卻又不願意釋放資源時,便會讓彼此的進度卡住,形成死結。又好比兩個人做交易,買家堅持賣家先出貨,賣家則堅持買家先付款,導致交易僵持,如果在電腦中出現類似的情況,就稱為 Deadlock。

  • Deadlock 觸發條件
    • No preemption: 占用系統資源的一個任務無法被強制停止。
    • Hold and wait: 一個行程可以在等待時持有系統資源。
    • Mutual exclusion: 系統資源只能同時分配給一個行程,無法在多個行程共享。
    • Circular waiting: 多個行程互相持有其他行程所需要的資源。

Starvation

當一個 Process 遲遲無法獲得所需資源,進入長期等待,這種情況就可以稱為 Starvation (飢餓)。作業系統為了避免飢餓/死結發生,除了上面提到的可搶斷式排程,也添加了老化的機制,也就是: 當一個 Process 久久沒有執行完成,作業系統便會逐步調高它的優先權,增加該 Process 完成的速度。

Race Condition

Race condition 用來描述一個系統或者 Process 的輸出依賴於不受控制的事件出現順序、出現時機。
像是有多個 Process 嘗試存取同一個記憶體位置,如果沒有處理好,就有可能發生無法預期的執行結果,這種情況容易發生在錯誤的後端程式、資料庫、檔案系統設計或是其他採用多執行緒設計的程式。

#include <pthread.h>
void* myfunc(void* ptr) {
    int i = *((int *) ptr);
    printf("%d ", i);
    return NULL;
}

int main() {
    int i;
    pthread_t tid;
    for(i =0; i < 10; i++) {
        pthread_create(&tid, NULL, myfunc, &i);
    }
    pthread_exit(NULL);
}
  • 上面的範例使用 POSIX Thread 建立 10 個執行緒,其程式目的是希望能夠接連印出 0 - 9 的數字。
  • 實際上,其執行結果與預期行為大有不同!程式執行後,印出的結果會在不同數字間跳動,其原因在查看 pthread_create() 的定義後便能輕鬆找出:
    int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
    ​						  void *(*start_routine) (void *), void *arg);
    
    • 我們可以看到,傳遞到子執行緒的參數必須是 void 指標,也就代表,我們並不是傳遞實體數字給子執行緒,是只交給他參數的記憶體位址而已。
    • 也因為這樣,當作業系統在各執行緒間反覆切換時各個子執行緒就會同時操作同一個記憶體的資料,無法確保執行緒的執行順序,這個情況就是 Race condition。

Critical Section

Critical sections 代表某程式區段只能在同一時間被一個執行緒處理,如果有多個執行緒同時執行了這段程式碼,可能會有超出預期的錯誤行為出現。

Image Not Showing Possible Reasons
  • The image was uploaded to a note which you don't have access to
  • The note which the image was originally uploaded to has been deleted
Learn More →

int sum = 0;

void *countgold(void *param)
{
    int i;
    for (i = 0; i < 10000000; i++)
    {
        sum += 1;
    }
    return NULL;
}
  • 該程式碼若是在單一執行緒上工作,並不會出現問題。
  • 不過!如果使用 POSIX Thread 建立多個執行緒處理 countgold() 時,由於多個執行緒同時存取同一個記憶體的資料 (在這個範例中為 sum ),便會造成前面提到的 Race condition。
  • 要避免 Race condition,我們只需要預防 Critical sections 在多個執行緒同時執行,我們先標示出程式中的 Critical sections:
    // ...
    ​{
    ​	for (i = 0; i < 10000000; i++)
    ​	{
    ​		// critical section!
    ​		sum += 1;
    ​		// end
    ​	}
    ​}
    ​// ...  
    
    • 問題的解決辦法就是: 在存取 sum 變數行為的前面加上一道鎖,任何執行緒訪問他之前都需要將鎖上鎖,等到操作完再由上鎖的執行緒做解鎖。

其他

在此就不花費過大的篇幅去說明,僅整理及簡述其他問題的大概,詳細請自行 Google。

  • Bounded-Buffer Problem
    • 假設有 n 個 buffers,每個都有資料,三個 semaphore 預設值為 mutex=1、full=0、empty=n。
  • Algorithm for Two Processes
    • 有兩個 Process P0 & P1,共享變數 int turn,turn = i > Pi 可進入 critical section。
  • Bakery Algorithm
    • 在進入 critical section 之前,每一個 Process 會抽號碼牌
    • 號碼牌數字最小的先進入 critical section (注意可能有相同的號碼牌,如下方程式碼以 max 實作,而 max 指令其實有好幾行)
    • 號碼牌數字的產生一定是 non-decreasing order; i.e. 1,2,3,3,4,5,5,5…
    • 若兩個 Process Pi & Pj 有相同的號碼牌,則 PID 小的先進入
  • Readers and Writers Problem
    • 有一個檔案或資料,有很多process要平行存取。

Linux 上資料同步的實作

"Documentation/kernel-hacking/locking.rst"

Mutex

https://elixir.bootlin.com/linux/v4.14.50/source/kernel/locking/mutex.c

  • mutex_lock()
    /**
    ​ * mutex_lock - acquire the mutex
    ​ * @lock: the mutex to be acquired
    ​ *
    ​ * Lock the mutex exclusively for this task. If the mutex is not
    ​ * available right now, it will sleep until it can get it.
    ​ *
    ​ * The mutex must later on be released by the same task that
    ​ * acquired it. Recursive locking is not allowed. The task
    ​ * may not exit without first unlocking the mutex. Also, kernel
    ​ * memory where the mutex resides must not be freed with
    ​ * the mutex still locked. The mutex must first be initialized
    ​ * (or statically defined) before it can be locked. memset()-ing
    ​ * the mutex to 0 is not allowed.
    ​ *
    ​ * (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
    ​ * checks that will enforce the restrictions and will also do
    ​ * deadlock debugging)
    ​ *
    ​ * This function is similar to (but not equivalent to) down().
    ​ */void __sched mutex_lock(struct mutex *lock)
    ​{
    ​	might_sleep();
    
    ​	if (!__mutex_trylock_fast(lock))
    ​		__mutex_lock_slowpath(lock);
    ​}
    ​EXPORT_SYMBOL(mutex_lock);
    
    • 確認 mutex lock 狀態後,如果不可用則進入 might_sleep(),否則進入 __mutex_lock_slowpath() 坐下一步。
  • __mutex_lock_slowpath()
    static noinline void __sched
    ​__mutex_lock_slowpath(struct mutex *lock)
    ​{
    ​	__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
    ​}
    
    • 填入各項數值後做 mutex,進入 __mutex_lock()
  • __mutex_lock()
    static int __sched
    ​__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
    ​		 struct lockdep_map *nest_lock, unsigned long ip)
    ​{
    ​	return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
    ​}
    
    • 整理過引入參數後,進入 __mutex_lock_common()
  • __mutex_lock_common()
    static __always_inline int __sched
    ​__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
    ​			struct lockdep_map *nest_lock, unsigned long ip,
    ​			struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
    ​{
    ​	struct mutex_waiter waiter;
    ​	bool first = false;
    ​	struct ww_mutex *ww;
    ​	int ret;
    
    ​	might_sleep();
    
    ​	ww = container_of(lock, struct ww_mutex, base);
    ​	if (use_ww_ctx && ww_ctx) {
    ​		if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
    ​			return -EALREADY;
    ​	}
    
    ​	preempt_disable();
    ​	mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
    
    ​	if (__mutex_trylock(lock) ||
    ​		mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
    ​		/* got the lock, yay! */
    ​		lock_acquired(&lock->dep_map, ip);
    ​		if (use_ww_ctx && ww_ctx)
    ​			ww_mutex_set_context_fastpath(ww, ww_ctx);
    ​		preempt_enable();
    ​		return 0;
    ​	}
    
    ​	spin_lock(&lock->wait_lock);
    ​	/*
    ​	 * After waiting to acquire the wait_lock, try again.
    ​	 */if (__mutex_trylock(lock)) {
    ​		if (use_ww_ctx && ww_ctx)
    ​			__ww_mutex_wakeup_for_backoff(lock, ww_ctx);
    
    ​		goto skip_wait;
    ​	}
    
    ​	debug_mutex_lock_common(lock, &waiter);
    ​	debug_mutex_add_waiter(lock, &waiter, current);
    
    ​	lock_contended(&lock->dep_map, ip);
    
    ​	if (!use_ww_ctx) {
    ​		/* add waiting tasks to the end of the waitqueue (FIFO): */
    ​		list_add_tail(&waiter.list, &lock->wait_list);
    
    ​#ifdef CONFIG_DEBUG_MUTEXES
    ​		waiter.ww_ctx = MUTEX_POISON_WW_CTX;
    ​#endif
    ​	} else {
    ​		/* Add in stamp order, waking up waiters that must back off. */
    ​		ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
    ​		if (ret)
    ​			goto err_early_backoff;
    
    ​		waiter.ww_ctx = ww_ctx;
    ​	}
    
    ​	waiter.task = current;
    
    ​	if (__mutex_waiter_is_first(lock, &waiter))
    ​		__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
    
    ​	set_current_state(state);
    ​	for (;;) {
    ​		/*
    ​		 * Once we hold wait_lock, we're serialized against
    ​		 * mutex_unlock() handing the lock off to us, do a trylock
    ​		 * before testing the error conditions to make sure we pick up
    ​		 * the handoff.
    ​		 */if (__mutex_trylock(lock))
    ​			goto acquired;
    
    ​		/*
    ​		 * Check for signals and wound conditions while holding
    ​		 * wait_lock. This ensures the lock cancellation is ordered
    ​		 * against mutex_unlock() and wake-ups do not go missing.
    ​		 */if (unlikely(signal_pending_state(state, current))) {
    ​			ret = -EINTR;
    ​			goto err;
    ​		}
    
    ​		if (use_ww_ctx && ww_ctx && ww_ctx->acquired > 0) {
    ​			ret = __ww_mutex_lock_check_stamp(lock, &waiter, ww_ctx);
    ​			if (ret)
    ​				goto err;
    ​		}
    
    ​		spin_unlock(&lock->wait_lock);
    ​		schedule_preempt_disabled();
    
    ​		/*
    ​		 * ww_mutex needs to always recheck its position since its waiter
    ​		 * list is not FIFO ordered.
    ​		 */if ((use_ww_ctx && ww_ctx) || !first) {
    ​			first = __mutex_waiter_is_first(lock, &waiter);
    ​			if (first)
    ​				__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
    ​		}
    
    ​		set_current_state(state);
    ​		/*
    ​		 * Here we order against unlock; we must either see it change
    ​		 * state back to RUNNING and fall through the next schedule(),
    ​		 * or we must see its unlock and acquire.
    ​		 */if (__mutex_trylock(lock) ||
    ​			(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
    ​			break;
    
    ​		spin_lock(&lock->wait_lock);
    ​	}
    ​	spin_lock(&lock->wait_lock);
    ​acquired:
    ​	__set_current_state(TASK_RUNNING);
    
    ​	mutex_remove_waiter(lock, &waiter, current);
    ​	if (likely(list_empty(&lock->wait_list)))
    ​		__mutex_clear_flag(lock, MUTEX_FLAGS);
    
    ​	debug_mutex_free_waiter(&waiter);
    
    ​skip_wait:
    ​	/* got the lock - cleanup and rejoice! */
    ​	lock_acquired(&lock->dep_map, ip);
    
    ​	if (use_ww_ctx && ww_ctx)
    ​		ww_mutex_set_context_slowpath(ww, ww_ctx);
    
    ​	spin_unlock(&lock->wait_lock);
    ​	preempt_enable();
    ​	return 0;
    
    ​err:
    ​	__set_current_state(TASK_RUNNING);
    ​	mutex_remove_waiter(lock, &waiter, current);
    ​err_early_backoff:
    ​	spin_unlock(&lock->wait_lock);
    ​	debug_mutex_free_waiter(&waiter);
    ​	mutex_release(&lock->dep_map, 1, ip);
    ​	preempt_enable();
    ​	return ret;
    ​}
    
    • 主要處理 mutex 的函式
    1. 先取得 structure
      • ww = container_of(lock, struct ww_mutex, base);
    2. 關閉 preempt
      • preempt_disable();
    3. 透過 spinlock 實做
      • spin_lock(&lock->wait_lock);
    4. 設定 Flag
      • __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
    5. 直到事情做完才會解鎖

Spinlock (https://zhuanlan.zhihu.com/p/363993550)

https://elixir.bootlin.com/linux/v4.14.50/source/include/linux/spinlock.h

  • spin_lock()
    static __always_inline void spin_lock(spinlock_t *lock)
    ​{
    ​	raw_spin_lock(&lock->rlock);
    ​}
    
    • 虛擬入口接到 raw_spin_lock()
  • raw_spin_lock()
    #define raw_spin_lock(lock)	_raw_spin_lock(lock)
    
    • 接到 _raw_spin_lock()

https://elixir.bootlin.com/linux/v4.14.50/source/kernel/locking/spinlock.c

  • _raw_spin_lock()
    void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
    ​{
    ​	__raw_spin_lock(lock);
    ​}
    ​EXPORT_SYMBOL(_raw_spin_lock);
    
    • 接到 __raw_spin_lock()
  • __raw_spin_lock()
    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    ​{
    ​	preempt_disable();
    ​	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    ​	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    ​}
    
    • 設定 preempt 是關閉的

Semaphore

  • sem_t semaphore
    struct semaphore {
    ​	raw_spinlock_t		lock;
    ​	unsigned int		count;
    ​	struct list_head	wait_list;
    ​};
    
  • sem_init
    int __init sem_init(void)
    ​{
    ​	const int err = sem_init_ns(&init_ipc_ns);
    
    ​	ipc_init_proc_interface("sysvipc/sem",
    ​				"       key      semid perms      nsems   uid   gid  cuid  cgid      otime      ctime\n",
    ​				IPC_SEM_IDS, sysvipc_sem_proc_show);
    ​	return err;
    ​}
    
    • 初始化
  • sem_lock()
    /*
    ​ * If the request contains only one semaphore operation, and there are
    ​ * no complex transactions pending, lock only the semaphore involved.
    ​ * Otherwise, lock the entire semaphore array, since we either have
    ​ * multiple semaphores in our own semops, or we need to look at
    ​ * semaphores from other pending complex operations.
    ​ */
    ​static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,
    ​				  int nsops)
    ​{
    ​	struct sem *sem;
    
    ​	if (nsops != 1) {
    ​		/* Complex operation - acquire a full lock */
    ​		ipc_lock_object(&sma->sem_perm);
    
    ​		/* Prevent parallel simple ops */
    ​		complexmode_enter(sma);
    ​		return SEM_GLOBAL_LOCK;
    ​	}
    
    ​	/*
    ​	 * Only one semaphore affected - try to optimize locking.
    ​	 * Optimized locking is possible if no complex operation
    ​	 * is either enqueued or processed right now.
    ​	 *
    ​	 * Both facts are tracked by use_global_mode.
    ​	 */
    ​	sem = &sma->sems[sops->sem_num];
    
    ​	/*
    ​	 * Initial check for use_global_lock. Just an optimization,
    ​	 * no locking, no memory barrier.
    ​	 */if (!sma->use_global_lock) {
    ​		/*
    ​		 * It appears that no complex operation is around.
    ​		 * Acquire the per-semaphore lock.
    ​		 */
    ​		spin_lock(&sem->lock);
    
    ​		/* pairs with smp_store_release() */if (!smp_load_acquire(&sma->use_global_lock)) {
    ​			/* fast path successful! */return sops->sem_num;
    ​		}
    ​		spin_unlock(&sem->lock);
    ​	}
    
    ​	/* slow path: acquire the full lock */
    ​	ipc_lock_object(&sma->sem_perm);
    
    ​	if (sma->use_global_lock == 0) {
    ​		/*
    ​		 * The use_global_lock mode ended while we waited for
    ​		 * sma->sem_perm.lock. Thus we must switch to locking
    ​		 * with sem->lock.
    ​		 * Unlike in the fast path, there is no need to recheck
    ​		 * sma->use_global_lock after we have acquired sem->lock:
    ​		 * We own sma->sem_perm.lock, thus use_global_lock cannot
    ​		 * change.
    ​		 */
    ​		spin_lock(&sem->lock);
    
    ​		ipc_unlock_object(&sma->sem_perm);
    ​		return sops->sem_num;
    ​	} else {
    ​		/*
    ​		 * Not a false alarm, thus continue to use the global lock
    ​		 * mode. No need for complexmode_enter(), this was done by
    ​		 * the caller that has set use_global_mode to non-zero.
    ​		 */return SEM_GLOBAL_LOCK;
    ​	}
    ​}
    
    • 鎖定

本章節練習與反思

  • 試著說明看看不同情境下適合使用哪種同步方式?
    • IPC
    • RPC
    • Thread
    • Process
  • Mutex 及 Spinlock 是否可以替換使用?

參考資料