# **Classical Problems of Synchronization**
* 用於測試新提出同步方案的經典問題
1. Bounded-Buffer Problem
2. Readers and Writers Problem
3. Dining-Philosophers Problem
## **Bounded-Buffer Problem**
* n個緩衝區,每一個可以容納一個item
* Semaphore `mutex` 初始化值為1
* Semaphore `full` 初始化值為0
* Semaphore `empty` 初始化值為n
* **生產者過程的結構**
```
while(true){
...
/*在next_produced 中製作一個item*/
...
wait(empty);
wait(mutex);
...
/*將下一個produced新增到buffer中*/
...
signal(mutex);
signal(full);
}
```
*
* 在程式碼中,首先是在一個稱為 empty 的信號(或稱為條件變數)上等待。當 empty 信號指示緩衝區為空時,表示可以往緩衝區中放入新的項目。一旦等待 empty 信號完成,表示緩衝區有空間可以放置新的項目,接著程式會等待一個互斥鎖(mutex),這是為了確保同一時間只有一個執行緒可以修改緩衝區的內容。
* 在等待互斥鎖之後,程式碼會執行一些製作新項目並放入緩衝區的操作,這部分被註解為 /*在next_produced中製作一個item*/ 和 /*將下一個produced新增到buffer中*/。
* 完成對緩衝區的修改後,程式碼會釋放互斥鎖,表示緩衝區的操作已完成。接著,它會通知一個消費者,表示緩衝區中有新的項目可供消費。這是通過發送一個信號(或條件變數)full 來實現的。
* **消費者過程的結構**
```
while(true){
wait(full);
wait(mutex);
...
/*從buffer中刪除一個item到next_consumed*/
...
signl(mutex);
signal(empty);
...
/*在下次consumed中消費item*/
...
}
```
*
* 程式碼在一個稱為 full 的信號上等待。當 full 信號指示緩衝區中有項目可供消費時,程式才會繼續執行。一旦收到 full 信號,表示緩衝區中有項目可供消費,接著程式會等待一個互斥鎖(mutex),這是為了確保同一時間只有一個執行緒可以修改緩衝區的內容。
* 在獲取互斥鎖之後,程式碼會執行一些操作來消費緩衝區中的項目,這部分被註解為 /*從buffer中刪除一個item到next_consumed*/ 和 /*在下次consumed中消費item*/。
* 完成對緩衝區的操作後,程式碼會釋放互斥鎖,表示緩衝區的操作已完成。接著,它會通知一個生產者,表示緩衝區中有空間可以放入新的項目。這是通過發送一個信號(或條件變數)empty 來實現的
## **Readers-Writers Problem**
* Data set 在多個並發進程(concurrent processes)之間共享
* Readers - 指讀取data set,不執行任何更新
* Writers - 可以讀取(read)和寫入(write)
* 問題點:允許多個readers同時讀取
=>但同個時間點只有一個writer可以存取共享數據
* 考慮readers和writers的方式有多種變化--都涉及某種形式的優先級
* Shared Data(共享資料)
* Data set
* Semaphore `rw_mutex` 初始化值為1 (rw_mutex => reader & writer mutex)
* Semaphore `mutex` 初始化值為1
* Integer `read_count` 初始化值為0
* Writer進程的結構
```
while(true){
wait(tw_mutex);
...
/*寫入被執行*/
...
signal(rw_mutex);
}
```
* reader進程的結構
```
while(true){
wait(mutex);
read_count++;
if(read_count==1){
wait(rw_mutex);
}
signal(mutex);
...
/*執行讀取*/
...
wait(mutex);
read count--;
if(read_count==0){
signal(rw_mutex);
}
signal(mutex);
}
```
*
* 在迴圈中,讀者首先等待 mutex 互斥鎖。這個等待確保了同一時間只有一個讀者可以修改 read_count 變數。
* 接著,讀者將 read_count 變數增加 1,表示有一個讀者進入臨界區。如果這個讀者是第一個進入臨界區的讀者(即 read_count 變為 1),則這個讀者會等待 rw_mutex 互斥鎖。這是為了防止寫者跟讀者碰撞。
* 然後,讀者釋放 mutex 互斥鎖,進入臨界區域進行讀取操作,這部分被註解為 /*執行讀取*/。在讀取操作完成後,讀者再次等待 mutex 互斥鎖,將 read_count 變數減少 1,表示有一個讀者離開了臨界區。如果沒有其他讀者了(即 read_count 變為 0),則釋放 rw_mutex 互斥鎖,允許寫者進入臨界區
### Readers-Writers Problem Variations
* Bounded-Buffer Problem的解決方案可能會導致寫入進程從不寫入的情況。被稱為"First reader-writer"問題
* "Second reader-writer"問題是"First reader-writer"問題的變體,該問題指出:
=>一旦血樹者準備好寫入,就不允許"新到達的reader"讀取
* 而這兩種都可能導致飢餓(starvation)。導致更多的變化
* 問題在某些系統上透過核心提供讀寫鎖定來解決
## **Dining-Philosophers Problem**
* 不與鄰居互動,偶爾嘗試從碗中拿起兩隻筷子(一次一支)進食
=>需要兩者才能進食,完成後釋放兩者
* 在有5位philosophers的情況下
* Shared data
* Bowl of rice (data set)
* Semaphore `chopstick[5] `初始化為1
### Semaphore Solution
* Philosopher i的結構
```
while (true){
wait (chopstick[i] );
wait (chopStick[ (i + 1) % 5] );
/* eat for awhile */
signal (chopstick[i] );
signal (chopstick[ (i + 1) % 5] );
/* think for awhile */
}
```
what is the problem with this algorithm?
### Monitor Solution to Dining Philosophers
```
monitor DiningPhilosophers{
enum { THINKING; HUNGRY, EATING) state [5] ;
condition self [5];
void pickup (int i) {
state[i] = HUNGRY;
test(i); => it can eat if the test is successful=>state becomes "eating"(1)
if (state[i] != EATING) self[i].wait;
}
void putdown (int i) {
state[i] = THINKING;
// test left and right neighbors
test((i + 4) % 5);
test((i + 1) % 5);
}
void test (int i) {
if ((state[(i + 4) % 5] != EATING) &&(state[i] == HUNGRY) &&(state[(i + 1) % 5] != EATING) ) {
state[i] = EATING ;
self[i].signal () ;
}
}
initialization_code() {
for (int i = 0; i < 5; i++)
state[i] = THINKING;
}
}
```
* 每個philosopher i依照以下順序呼叫pickup()和putdown()運算
```
DiningPhilosophers.pickup(i);
/** EAT **/
DiningPhilosophers.putdown(i);
```
* 不會出現deadlock,但starvation(飢餓)可能會出現
### Dining-Philosophers Problem
* **為了防止deadlock,可以通過施加以下限制來解決**:
* 最多允許同時有四位Philosophers坐在桌子旁邊
* 只有當兩根筷子都可用時,philosophers才能夠拿起筷子進食
* 使用asymmetric solution(非對稱解決方案)
=>例如,一位奇數philosopher先拿起左邊的筷子,然後再拿起右邊的筷子,而一位偶數philosopher先拿起右手的筷子,然後拿起左邊的筷子。
# Kernel Synchronization - Windows
* Inside the kernel
* 在**單處理系統**(uniprocessor systems)上,使用**中斷屏蔽**(interrupt masks)來保護對**全局資料**(global resources)的訪問
* 在多處理系統(multiprocessor systems)上,使用自旋鎖(spinlocks)
* spinlocks會導致**線程(thread)永遠不會被搶佔**
* Outside the kernel
* 為用戶線程提供了**調度對象(dispatcher objects)**,用於根據**互斥鎖(mutexs)**、**信號量(semaphores)**、**事件(events)**和**計時器機制(timers mechanisms)**進行同步。
* 事件
=>事件的作用很像條件變數
* 計時器在時間到期時通知一個或多個執行緒。
* 調度對象(dispatcher objects)處於有訊號狀態(signaled-state)(對象可用)或無訊號狀態(non-signaled state)(執行緒將阻塞)
* 互斥調度程序對象

# Linux Synchronization
* Linux:
* 在核心版本 2.6 之前,停用中斷以實現short critical sections
* 版本2.6及更高版本,完全搶佔式
* Linux提供:
* 信號量(Semaphores)
* 原子整數(atomic integers)
* 自旋鎖(spinlocks)
* 兩者的讀寫器版本(reader-writer versions of both)
* 在**單CPU系統(single-cpu system)**上,透過**啟用(enabling)和停用(disabling)**核心**搶佔(preemption)**來替換**自旋鎖(spinlocks)**
* 原子變數(Atomic variables)
* `atomic_t`是原子整數的類型
* 考慮變數
* `atomic_t counter;`
| Atomic Operation | Effect |
| ----------------------------- |:---------------------- |
| atomic_set(&counter,5) | counter = 5 |
| atomic_add(10,&counter) | counter = counter + 10 |
| atomic_sub(4,&counter) | counter = counter - 4 |
| atomic_inc(&counter) | counter = counter + 1 |
| value = atomic_read(&counter) | value =12 |
# POSIX Synchronization
* POSIX API提供
* 互斥鎖(mutex locks)
* 信號量(semaphores)
* 條件變數(condition variable)
* 廣泛用於 UNIX、Linux 和 macOS
## POSIX Mutex Locks
* 建立(Creating)和初始化(initializing)鎖
```
#include <pthread.h>
pthread_mutex_t mutex;
/*create and initialize the mutex lock*/
pthread_mutex_init(&NULL);
```
* 取得(Acquiring)和釋放(releasing)鎖
```
/*acquire the mutex lock*/
pthread_mutex_lock(&mutex);
/*critical section*/
/*release the mutex lock*/
pthread_mutex_unlock(&mutex);
```
## POSIX Semaphores
* POSIX 提供了兩個版本-命名(named)和未命名(unnamed)
* 命名信號量(Named semaphores)可以被不相關的程序使用,未命名的信號(Unnamed semaphores)量則不能。
## POSIX Named Semaphores
* 建立並初始化semaphore:
```
#include <semaphore.h>
sem_t *sem;
/*Create the semaphore and initialize it to 1*/
sem = sem_open("SEM", O_CREAT, 0666, 1);
```
* 其他進程可以透過引用其名稱SEM來存取semaphore
* 取得和釋放semaphore:
```
/*acquire the semaphore*/
sem_wait(sem);
/*critical section*/
/*release the semaphore*/
sem_post(sem);
```
## POSIX Unnamed Semaphores
* 建立和初始化semaphore:
```
#include <semaphore.h>
sem_t *sem;
/*Create the semaphore and initialize it to 1*/
sem_init(&sem, 0, 1);
```
* 取得和釋放semaphore:
```
/*acquire the semaphore*/
sem_wait(sem);
/*critical section*/
/*release the semaphore*/
sem_post(&sem);
```
## POSIX Condition Variables
* 由於 POSIX 通常在 C/C++ 中使用,且這些語言不提供監視器,因此 POSIX 條件變數與 POSIX 互斥鎖關聯以提供互斥:
* 建立和初始化條件變數:
```
pthread_mutex_t mutex;
pthread_cond_t cond_var;
pthread_mutex_init(&mutex,NULL);
pthread_mutex_init(&NULL);
```
* 執行緒等待條件 a == b 變成真:
```
pthread_mutex_lock(&mutex);
while(a !- b)
pthread_cond_wait(&cond_var, &mutex);
pthread_mutex_unlock(&mutex);
```
* 執行緒向另一個正在等待條件變數的執行緒發出訊號:
```
pthread_mutex_lock(&mutex);
a = b;
pthread_cond_signal(&cond_var);
pthread_mutex_unlock(&mutex);
```
# Java Synchronizatoin
* Java 提供了豐富的同步功能:
* Java 監視器(Java monitors)
* 可重入鎖定(Reentrant locks)
* 信號量(Senaphores)
* 條件變量(Condition variables)
## Java Monitors
* 每個 Java 物件都有一個與之關聯的single lock
* 如果一個方法被宣告為同步(synchronized),則呼叫執行緒必須擁有該物件的鎖
* 如果鎖定被另一個執行緒擁有,則呼叫執行緒必須等待鎖定直到它被釋放。
* 當擁有執行緒退出同步方法時,Locks就會釋放。
### Bounded Buffer - Java Synchronization
```
public class BoundedBuffer<E>{
private static final int BUFFER_SIZE = 5;
private int count, in, out;
private E[] buffer;
public BoundBuffer(){
count = 0;
in =0;
out = 0;
buffer = (E[]) new Object[BUFFER_SIZE];
}
/*Producers call this method*/
publc synchronized void insert(E item){
/*See Figure 7.11*/
}
/*Consumers call this method*/
public synchronized E remove(){
/*See figure 7.11*/
}
}
```
## Java Synchronization
* 嘗試取得不可用鎖的執行緒被放置在物件的entry set中

* 同樣,每個物件也有一個等待集
* 當一個執行緒呼叫wait()時:
1. 釋放該物件的鎖定
2. 該執行緒的狀態被設定為阻塞
3. 該執行緒被放入該物件的等待集中

* 執行緒通常在等待條件成立時呼叫 wait()。
* 執行緒如何得到通知?
* 當執行緒呼叫notify()時:
1. 從等待集中選擇任意執行緒T
2. 將T從等待集中移至條目集
3. 將T的狀態從阻塞設定為可運行。
* T 現在可以競爭鎖來檢查它等待的條件現在是否為真
### Bounded Buffer - Java Synchronization(figure 7-11)
```
/*Producers call this method*/
public synchronized void insert(E item){
while(count == BUFFER_SIZE){
try{
wait();
}
catch(InterruptedException ie){ }
}
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
count++;
notify();
}
```
```
/*Consumers call this method*/
public synchronized E remove(){
E item;
while(count == B0){
try{
wait();
}
catch(InterruptedException ie){ }
}
item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count--;
notify();
return item;
}
```
## Java Reentrant Locks
* 與mutex locks類似
* finally 子句確保在 try 區塊中發生異常時釋放鎖。
```
Lock key = new ReentranLock();
key.lock();
try{
/*critical section*/
}
finally{
key.unlock();
}
```
## Java Semaphores
* Constructor:
```
Semaphore(int value);
```
* Usage:
```
Semaphore sem = new Semaphore(1);
try{
sem.acquire();
/*critical section*/
}
catch(InterruptedException ie){}
finallly{
sem.release();
}
```
## Java Condition Variables
* 條件變數與ReentrantLock相關聯
* 使用ReentrantLock的newCondition()方法建立條件變數:
```
Lock key = new ReentrantLock();
Condition condVar = key.newCondition();
```
* 執行緒透過呼叫await()方法進行等待,透過呼叫signal()方法發出訊號
* 範例:
* 五個編號為 0 ~ 4 的執行緒。
* 共用變數turn 指示輪到哪個執行緒了。
* 當執行緒希望執行某些工作時,它會呼叫doWork()。
=>但只有輪到他們時,它才可能起作用
=>如果沒有輪到他們,請等待
=>如果輪到了,就暫時做一些工作
* 完成後,通知下一個輪到的線程。
* 必要的資料結構:
```
Lock lock = new ReentrantLock();
Condition[] condVars = new Condition[5];
for(int i=0; i<5; i++)
condVars[i] = lock.newCondition();
```
```
/*threadNumber is the thread that wishes to do some work*/
public void doWork(int threadNumber){
lock.lock();
try{
/**
*If it's not my turn, then wait
*until I'm signaled
*/
if(threadNumber != turn)
condVars[threadNumber].await();
/**
*Do some work for awhile ...
*/
/**
*Now signal to the next thread.
*/
trun = (turn + 1) % 5;
condVars[turn].signal();
}
catch (InterruptedException ie){ }
finally{
lock.unlock();
}
}
```
# Alternative Approaches
## Transactional Memory
* 考慮一個必須以原子方式呼叫的函數 update()。一種選擇是使用互斥鎖:
```
void update(){
acqire();
/*modify shared data*/
release();
}
```
* 潛在問題包括線程數量增加時出現死鎖和鎖爭用
```
void update(){
atomic{
/*modify shared data*/
}
}
```
* 死鎖是不可能的(不涉及鎖)。
* 記憶體事務是一系列以原子方式執行的記憶體讀寫操作。可以透過加入atomic{S}來完成事務,這確保S中的語句以原子方式執行:
## OpenMP
* OpenMP 是一組支援平行程式設計的編譯器指令和 API
```
void update(int value){
#pragma omp critical{
count += value
}
}
```
* `#pragma omp critical` 指令中包含的程式碼被視為關鍵部分並以原子方式執行
## Functional Programming Languages
* 函數式程式語言提供了與過程式語言不同的範例,因為它們不維護狀態
* 變數被視為不可變的,一旦被賦值就無法改變狀態
* 人們對 Erlang 和 Scala 等函數式語言處理資料競爭的方法越來越感興趣
* Procedural Programming
```
def factorial(n)
result = 1
for i in 2..n
result = i * result
end
return result
end
```
* Functional Programming
```
def factorial(n)
if (n==1)
return 1
else
return n * factorial(n-1)
end
end
```
## Erlang and Scala
* Erlang語言支援並發,易於開發平行應用程式
* Scala是一種函數式語言,也是物件導向的,類似於Java和C#
# Inportant Sentance
1. it can eat if the test is successful(139)