# **Classical Problems of Synchronization** * 用於測試新提出同步方案的經典問題 1. Bounded-Buffer Problem 2. Readers and Writers Problem 3. Dining-Philosophers Problem ## **Bounded-Buffer Problem** * n個緩衝區,每一個可以容納一個item * Semaphore `mutex` 初始化值為1 * Semaphore `full` 初始化值為0 * Semaphore `empty` 初始化值為n * **生產者過程的結構** ``` while(true){ ... /*在next_produced 中製作一個item*/ ... wait(empty); wait(mutex); ... /*將下一個produced新增到buffer中*/ ... signal(mutex); signal(full); } ``` * * 在程式碼中,首先是在一個稱為 empty 的信號(或稱為條件變數)上等待。當 empty 信號指示緩衝區為空時,表示可以往緩衝區中放入新的項目。一旦等待 empty 信號完成,表示緩衝區有空間可以放置新的項目,接著程式會等待一個互斥鎖(mutex),這是為了確保同一時間只有一個執行緒可以修改緩衝區的內容。 * 在等待互斥鎖之後,程式碼會執行一些製作新項目並放入緩衝區的操作,這部分被註解為 /*在next_produced中製作一個item*/ 和 /*將下一個produced新增到buffer中*/。 * 完成對緩衝區的修改後,程式碼會釋放互斥鎖,表示緩衝區的操作已完成。接著,它會通知一個消費者,表示緩衝區中有新的項目可供消費。這是通過發送一個信號(或條件變數)full 來實現的。 * **消費者過程的結構** ``` while(true){ wait(full); wait(mutex); ... /*從buffer中刪除一個item到next_consumed*/ ... signl(mutex); signal(empty); ... /*在下次consumed中消費item*/ ... } ``` * * 程式碼在一個稱為 full 的信號上等待。當 full 信號指示緩衝區中有項目可供消費時,程式才會繼續執行。一旦收到 full 信號,表示緩衝區中有項目可供消費,接著程式會等待一個互斥鎖(mutex),這是為了確保同一時間只有一個執行緒可以修改緩衝區的內容。 * 在獲取互斥鎖之後,程式碼會執行一些操作來消費緩衝區中的項目,這部分被註解為 /*從buffer中刪除一個item到next_consumed*/ 和 /*在下次consumed中消費item*/。 * 完成對緩衝區的操作後,程式碼會釋放互斥鎖,表示緩衝區的操作已完成。接著,它會通知一個生產者,表示緩衝區中有空間可以放入新的項目。這是通過發送一個信號(或條件變數)empty 來實現的 ## **Readers-Writers Problem** * Data set 在多個並發進程(concurrent processes)之間共享 * Readers - 指讀取data set,不執行任何更新 * Writers - 可以讀取(read)和寫入(write) * 問題點:允許多個readers同時讀取 =>但同個時間點只有一個writer可以存取共享數據 * 考慮readers和writers的方式有多種變化--都涉及某種形式的優先級 * Shared Data(共享資料) * Data set * Semaphore `rw_mutex` 初始化值為1 (rw_mutex => reader & writer mutex) * Semaphore `mutex` 初始化值為1 * Integer `read_count` 初始化值為0 * Writer進程的結構 ``` while(true){ wait(tw_mutex); ... /*寫入被執行*/ ... signal(rw_mutex); } ``` * reader進程的結構 ``` while(true){ wait(mutex); read_count++; if(read_count==1){ wait(rw_mutex); } signal(mutex); ... /*執行讀取*/ ... wait(mutex); read count--; if(read_count==0){ signal(rw_mutex); } signal(mutex); } ``` * * 在迴圈中,讀者首先等待 mutex 互斥鎖。這個等待確保了同一時間只有一個讀者可以修改 read_count 變數。 * 接著,讀者將 read_count 變數增加 1,表示有一個讀者進入臨界區。如果這個讀者是第一個進入臨界區的讀者(即 read_count 變為 1),則這個讀者會等待 rw_mutex 互斥鎖。這是為了防止寫者跟讀者碰撞。 * 然後,讀者釋放 mutex 互斥鎖,進入臨界區域進行讀取操作,這部分被註解為 /*執行讀取*/。在讀取操作完成後,讀者再次等待 mutex 互斥鎖,將 read_count 變數減少 1,表示有一個讀者離開了臨界區。如果沒有其他讀者了(即 read_count 變為 0),則釋放 rw_mutex 互斥鎖,允許寫者進入臨界區 ### Readers-Writers Problem Variations * Bounded-Buffer Problem的解決方案可能會導致寫入進程從不寫入的情況。被稱為"First reader-writer"問題 * "Second reader-writer"問題是"First reader-writer"問題的變體,該問題指出: =>一旦血樹者準備好寫入,就不允許"新到達的reader"讀取 * 而這兩種都可能導致飢餓(starvation)。導致更多的變化 * 問題在某些系統上透過核心提供讀寫鎖定來解決 ## **Dining-Philosophers Problem** * 不與鄰居互動,偶爾嘗試從碗中拿起兩隻筷子(一次一支)進食 =>需要兩者才能進食,完成後釋放兩者 * 在有5位philosophers的情況下 * Shared data * Bowl of rice (data set) * Semaphore `chopstick[5] `初始化為1 ### Semaphore Solution * Philosopher i的結構 ``` while (true){ wait (chopstick[i] ); wait (chopStick[ (i + 1) % 5] ); /* eat for awhile */ signal (chopstick[i] ); signal (chopstick[ (i + 1) % 5] ); /* think for awhile */ } ``` what is the problem with this algorithm? ### Monitor Solution to Dining Philosophers ``` monitor DiningPhilosophers{ enum { THINKING; HUNGRY, EATING) state [5] ; condition self [5]; void pickup (int i) { state[i] = HUNGRY; test(i); => it can eat if the test is successful=>state becomes "eating"(1) if (state[i] != EATING) self[i].wait; } void putdown (int i) { state[i] = THINKING; // test left and right neighbors test((i + 4) % 5); test((i + 1) % 5); } void test (int i) { if ((state[(i + 4) % 5] != EATING) &&(state[i] == HUNGRY) &&(state[(i + 1) % 5] != EATING) ) { state[i] = EATING ; self[i].signal () ; } } initialization_code() { for (int i = 0; i < 5; i++) state[i] = THINKING; } } ``` * 每個philosopher i依照以下順序呼叫pickup()和putdown()運算 ``` DiningPhilosophers.pickup(i); /** EAT **/ DiningPhilosophers.putdown(i); ``` * 不會出現deadlock,但starvation(飢餓)可能會出現 ### Dining-Philosophers Problem * **為了防止deadlock,可以通過施加以下限制來解決**: * 最多允許同時有四位Philosophers坐在桌子旁邊 * 只有當兩根筷子都可用時,philosophers才能夠拿起筷子進食 * 使用asymmetric solution(非對稱解決方案) =>例如,一位奇數philosopher先拿起左邊的筷子,然後再拿起右邊的筷子,而一位偶數philosopher先拿起右手的筷子,然後拿起左邊的筷子。 # Kernel Synchronization - Windows * Inside the kernel * 在**單處理系統**(uniprocessor systems)上,使用**中斷屏蔽**(interrupt masks)來保護對**全局資料**(global resources)的訪問 * 在多處理系統(multiprocessor systems)上,使用自旋鎖(spinlocks) * spinlocks會導致**線程(thread)永遠不會被搶佔** * Outside the kernel * 為用戶線程提供了**調度對象(dispatcher objects)**,用於根據**互斥鎖(mutexs)**、**信號量(semaphores)**、**事件(events)**和**計時器機制(timers mechanisms)**進行同步。 * 事件 =>事件的作用很像條件變數 * 計時器在時間到期時通知一個或多個執行緒。 * 調度對象(dispatcher objects)處於有訊號狀態(signaled-state)(對象可用)或無訊號狀態(non-signaled state)(執行緒將阻塞) * 互斥調度程序對象 ![image](https://hackmd.io/_uploads/BJ1kqYzR6.png) # Linux Synchronization * Linux: * 在核心版本 2.6 之前,停用中斷以實現short critical sections * 版本2.6及更高版本,完全搶佔式 * Linux提供: * 信號量(Semaphores) * 原子整數(atomic integers) * 自旋鎖(spinlocks) * 兩者的讀寫器版本(reader-writer versions of both) * 在**單CPU系統(single-cpu system)**上,透過**啟用(enabling)和停用(disabling)**核心**搶佔(preemption)**來替換**自旋鎖(spinlocks)** * 原子變數(Atomic variables) * `atomic_t`是原子整數的類型 * 考慮變數 * `atomic_t counter;` | Atomic Operation | Effect | | ----------------------------- |:---------------------- | | atomic_set(&counter,5) | counter = 5 | | atomic_add(10,&counter) | counter = counter + 10 | | atomic_sub(4,&counter) | counter = counter - 4 | | atomic_inc(&counter) | counter = counter + 1 | | value = atomic_read(&counter) | value =12 | # POSIX Synchronization * POSIX API提供 * 互斥鎖(mutex locks) * 信號量(semaphores) * 條件變數(condition variable) * 廣泛用於 UNIX、Linux 和 macOS ## POSIX Mutex Locks * 建立(Creating)和初始化(initializing)鎖 ``` #include <pthread.h> pthread_mutex_t mutex; /*create and initialize the mutex lock*/ pthread_mutex_init(&NULL); ``` * 取得(Acquiring)和釋放(releasing)鎖 ``` /*acquire the mutex lock*/ pthread_mutex_lock(&mutex); /*critical section*/ /*release the mutex lock*/ pthread_mutex_unlock(&mutex); ``` ## POSIX Semaphores * POSIX 提供了兩個版本-命名(named)和未命名(unnamed) * 命名信號量(Named semaphores)可以被不相關的程序使用,未命名的信號(Unnamed semaphores)量則不能。 ## POSIX Named Semaphores * 建立並初始化semaphore: ``` #include <semaphore.h> sem_t *sem; /*Create the semaphore and initialize it to 1*/ sem = sem_open("SEM", O_CREAT, 0666, 1); ``` * 其他進程可以透過引用其名稱SEM來存取semaphore * 取得和釋放semaphore: ``` /*acquire the semaphore*/ sem_wait(sem); /*critical section*/ /*release the semaphore*/ sem_post(sem); ``` ## POSIX Unnamed Semaphores * 建立和初始化semaphore: ``` #include <semaphore.h> sem_t *sem; /*Create the semaphore and initialize it to 1*/ sem_init(&sem, 0, 1); ``` * 取得和釋放semaphore: ``` /*acquire the semaphore*/ sem_wait(sem); /*critical section*/ /*release the semaphore*/ sem_post(&sem); ``` ## POSIX Condition Variables * 由於 POSIX 通常在 C/C++ 中使用,且這些語言不提供監視器,因此 POSIX 條件變數與 POSIX 互斥鎖關聯以提供互斥: * 建立和初始化條件變數: ``` pthread_mutex_t mutex; pthread_cond_t cond_var; pthread_mutex_init(&mutex,NULL); pthread_mutex_init(&NULL); ``` * 執行緒等待條件 a == b 變成真: ``` pthread_mutex_lock(&mutex); while(a !- b) pthread_cond_wait(&cond_var, &mutex); pthread_mutex_unlock(&mutex); ``` * 執行緒向另一個正在等待條件變數的執行緒發出訊號: ``` pthread_mutex_lock(&mutex); a = b; pthread_cond_signal(&cond_var); pthread_mutex_unlock(&mutex); ``` # Java Synchronizatoin * Java 提供了豐富的同步功能: * Java 監視器(Java monitors) * 可重入鎖定(Reentrant locks) * 信號量(Senaphores) * 條件變量(Condition variables) ## Java Monitors * 每個 Java 物件都有一個與之關聯的single lock * 如果一個方法被宣告為同步(synchronized),則呼叫執行緒必須擁有該物件的鎖 * 如果鎖定被另一個執行緒擁有,則呼叫執行緒必須等待鎖定直到它被釋放。 * 當擁有執行緒退出同步方法時,Locks就會釋放。 ### Bounded Buffer - Java Synchronization ``` public class BoundedBuffer<E>{ private static final int BUFFER_SIZE = 5; private int count, in, out; private E[] buffer; public BoundBuffer(){ count = 0; in =0; out = 0; buffer = (E[]) new Object[BUFFER_SIZE]; } /*Producers call this method*/ publc synchronized void insert(E item){ /*See Figure 7.11*/ } /*Consumers call this method*/ public synchronized E remove(){ /*See figure 7.11*/ } } ``` ## Java Synchronization * 嘗試取得不可用鎖的執行緒被放置在物件的entry set中 ![image](https://hackmd.io/_uploads/Skz-7v7C6.png) * 同樣,每個物件也有一個等待集 * 當一個執行緒呼叫wait()時: 1. 釋放該物件的鎖定 2. 該執行緒的狀態被設定為阻塞 3. 該執行緒被放入該物件的等待集中 ![image](https://hackmd.io/_uploads/SJLPXDQ0p.png) * 執行緒通常在等待條件成立時呼叫 wait()。 * 執行緒如何得到通知? * 當執行緒呼叫notify()時: 1. 從等待集中選擇任意執行緒T 2. 將T從等待集中移至條目集 3. 將T的狀態從阻塞設定為可運行。 * T 現在可以競爭鎖來檢查它等待的條件現在是否為真 ### Bounded Buffer - Java Synchronization(figure 7-11) ``` /*Producers call this method*/ public synchronized void insert(E item){ while(count == BUFFER_SIZE){ try{ wait(); } catch(InterruptedException ie){ } } buffer[in] = item; in = (in + 1) % BUFFER_SIZE; count++; notify(); } ``` ``` /*Consumers call this method*/ public synchronized E remove(){ E item; while(count == B0){ try{ wait(); } catch(InterruptedException ie){ } } item = buffer[out]; out = (out + 1) % BUFFER_SIZE; count--; notify(); return item; } ``` ## Java Reentrant Locks * 與mutex locks類似 * finally 子句確保在 try 區塊中發生異常時釋放鎖。 ``` Lock key = new ReentranLock(); key.lock(); try{ /*critical section*/ } finally{ key.unlock(); } ``` ## Java Semaphores * Constructor: ``` Semaphore(int value); ``` * Usage: ``` Semaphore sem = new Semaphore(1); try{ sem.acquire(); /*critical section*/ } catch(InterruptedException ie){} finallly{ sem.release(); } ``` ## Java Condition Variables * 條件變數與ReentrantLock相關聯 * 使用ReentrantLock的newCondition()方法建立條件變數: ``` Lock key = new ReentrantLock(); Condition condVar = key.newCondition(); ``` * 執行緒透過呼叫await()方法進行等待,透過呼叫signal()方法發出訊號 * 範例: * 五個編號為 0 ~ 4 的執行緒。 * 共用變數turn 指示輪到哪個執行緒了。 * 當執行緒希望執行某些工作時,它會呼叫doWork()。 =>但只有輪到他們時,它才可能起作用 =>如果沒有輪到他們,請等待 =>如果輪到了,就暫時做一些工作 * 完成後,通知下一個輪到的線程。 * 必要的資料結構: ``` Lock lock = new ReentrantLock(); Condition[] condVars = new Condition[5]; for(int i=0; i<5; i++) condVars[i] = lock.newCondition(); ``` ``` /*threadNumber is the thread that wishes to do some work*/ public void doWork(int threadNumber){ lock.lock(); try{ /** *If it's not my turn, then wait *until I'm signaled */ if(threadNumber != turn) condVars[threadNumber].await(); /** *Do some work for awhile ... */ /** *Now signal to the next thread. */ trun = (turn + 1) % 5; condVars[turn].signal(); } catch (InterruptedException ie){ } finally{ lock.unlock(); } } ``` # Alternative Approaches ## Transactional Memory * 考慮一個必須以原子方式呼叫的函數 update()。一種選擇是使用互斥鎖: ``` void update(){ acqire(); /*modify shared data*/ release(); } ``` * 潛在問題包括線程數量增加時出現死鎖和鎖爭用 ``` void update(){ atomic{ /*modify shared data*/ } } ``` * 死鎖是不可能的(不涉及鎖)。 * 記憶體事務是一系列以原子方式執行的記憶體讀寫操作。可以透過加入atomic{S}來完成事務,這確保S中的語句以原子方式執行: ## OpenMP * OpenMP 是一組支援平行程式設計的編譯器指令和 API ``` void update(int value){ #pragma omp critical{ count += value } } ``` * `#pragma omp critical` 指令中包含的程式碼被視為關鍵部分並以原子方式執行 ## Functional Programming Languages * 函數式程式語言提供了與過程式語言不同的範例,因為它們不維護狀態 * 變數被視為不可變的,一旦被賦值就無法改變狀態 * 人們對 Erlang 和 Scala 等函數式語言處理資料競爭的方法越來越感興趣 * Procedural Programming ``` def factorial(n) result = 1 for i in 2..n result = i * result end return result end ``` * Functional Programming ``` def factorial(n) if (n==1) return 1 else return n * factorial(n-1) end end ``` ## Erlang and Scala * Erlang語言支援並發,易於開發平行應用程式 * Scala是一種函數式語言,也是物件導向的,類似於Java和C# # Inportant Sentance 1. it can eat if the test is successful(139)