###### tags: `CCU_OS` CH 05 Synchronous === ### 解決 critical section 要滿足三個條件 * mutual exclusion * progress * bounded waiting ### Spinlock / Peterson solution * 重點 * `load` and `store` 需要是 atomic operation * `flag` and `turn` 被 assigned 的“順序“很重要 (不能被 compiler 優化,調整先後順序) * 例子 * trival * flag `-O3` ![](https://hackmd.io/_uploads/SkCzBX9w3.png) ```asm 0000000000001370 <p0>: 1370: f3 0f 1e fa endbr64 1374: 53 push %rbx 1375: 48 8d 3d 04 0d 00 00 lea 0xd04(%rip),%rdi # 2080 <_IO_stdin_used+0x80> 137c: 48 8d 1d d5 0c 00 00 lea 0xcd5(%rip),%rbx # 2058 <_IO_stdin_used+0x58> 1383: e8 68 fd ff ff callq 10f0 <puts@plt> 1388: 83 3d cd 2c 00 00 01 cmpl $0x1,0x2ccd(%rip) # 405c <flag1> 138f: c7 05 bf 2c 00 00 01 movl $0x1,0x2cbf(%rip) # 4058 <flag0> 1396: 00 00 00 1399: c7 05 bd 2c 00 00 01 movl $0x1,0x2cbd(%rip) # 4060 <turn> ``` ```clike void p0(void) { printf("p0: start\n"); while (1) { flag0 = 1; turn = 1; while (flag1==1 && turn==1) ; //waiting // -- critical section -- cpu_p0 = sched_getcpu(); in_cs++; if (in_cs == 2) fprintf(stderr, "Both p0 and p1 are in critical section\n"); p0_in_cs++; in_cs--; // ---------------------- flag0=0; } } ``` * correct * `atomic_thread_fence(memory_order_seq_cst);` 避免 instruction 被調整 * ` atomic_store(&flag[0], 1)` &`tomic_load(&flag[1])` ```clike void p0(void) { printf("start p0\n"); while (1) { atomic_store(&flag[0], 1); atomic_thread_fence(memory_order_seq_cst); atomic_store(&turn, 1); while (atomic_load(&flag[1]) && atomic_load(&turn)==1) ; //waiting // -- critical section -- cpu_p0 = sched_getcpu(); in_cs++; if (in_cs == 2) fprintf(stderr, "p0及p1都在critical section\n"); p0_in_cs++; in_cs--; // ---------------------- atomic_store(&flag[0], 0); } } ``` * spinlock 缺點 ,一個 task 持有 lock 但被 schedule out(time sharing 起他 task 會白白等這個 lock ### Semaphore * mutex / semaphore = spinlock + sleep + wakeup * 實作 ```clike wait (S) { value--; if (value < 0) { // add this process to waiting queue block(); } } signal (S) { value++; if (value <= 0) { // remove a process P from the waiting queue wakeup(P); } } ``` * Semaphore value 可以是 * 「負的」,通常是多個task在等待,例如「-1、-2」代表有「一、二」個task在等待 * 「零」,沒有task在等待。初始化時,通常為零 * 「正的」,常用於「一種資源」最多可以服務「X」個task。例如:印表機共三台,因此最多3個task同時列印 ### Mutex * 特色 * 可以判斷是誰鎖住這個mutex(**owner觀念**) * 可能支援優先權繼承 * 可以支援或者不支援「巢狀鎖定」 * 可以支援「自適應鎖」 ### mutex / semaphore 差異 * 需要進入 CS 時, 用 mutex/lock —— 上鎖/解鎖永遠是同一個 thread/process; * 需要處理 signalling 時,用 semaphore —— 等待/通知通常是不同的 threads/processes; (signaling 是 process/threads 之間要通知彼此 「某些條件已經改變」 的情境:例如 producer-consumer 類型的問題可用兩個 semaphore 來實作) * mutex 與 semaphore 的差別在於: * process 使用 mutex 時,process 的運作是持有 mutex,執行 CS 來存取資源,然後釋放 mutex ==Mutex 就像是資源的一把鎖:解鈴還須繫鈴人== * process 使用 semaphore 時,process 總是發出信號 (signal),或者總是接收信號 (wait),同一個 process 不會先後進行 signal 與 wait 換言之,process 要不擔任 producer,要不充當 consumer 的角色,不能兩者都是。==semaphore 是為了保護 process 的執行同步正確;== * [Mutex, Semaphore, the difference, and Linux kernel](https://blog.louie.lu/2016/10/22/mutex-semaphore-the-difference-and-linux-kernel/) * 30秒:最大的差異在於 Mutex 只能由上鎖的 thread 解鎖,而 Semaphore 沒有這個限制,可以由原本的 thread 或是另外一個 thread 解開。另外,Mutex 只能讓一個 thread 進入 critical section,Semaphore 的話則可以設定要讓幾個 thread 進入。這讓實際上使用 Mutex 跟 Semaphore 場景有很大的差別。 * 60秒 (cont.):舉例而言,Mutex 的兩個特性:一個是只能有持鎖人解鎖、一個是在釋放鎖之前不能退出的特性,讓 Mutex 叫常使用在 critical section 只能有一個 thread 進入,而且要避免 priority inversion 的時候;Semaphore 也能透過 binary semaphore 做到類似的事情,卻沒有辦法避免 priority inversion 出現。 * 120秒 (cont.):而 Semaphore 更常是用在同步兩個 thread 或功能上面,因為 Semaphore 實際上使用的是 signal 的 up 與 down,讓 Semaphore 可以變成是一種 notification 的作用,例如 A thread 執行到某個地方時 B thread 才能繼續下去,就可以使用 Semaphore 來達成這樣的作用。 ### 利用 Semaphore 解決常見的問題 * 哲學家問題解法 ![](https://hackmd.io/_uploads/r10zLaqD2.png =300x) ```clike #define NUM_PHILOSOPHERS 6 #define LEFT (i + NUM_PHILOSOPHERS - 1) % NUM_PHILOSOPHERS #define RIGHT (i + 1) % NUM_PHILOSOPHERS sem_t forks[NUM_PHILOSOPHERS]; sem_t room; sem_init(&room, 0, NUM_PHILOSOPHERS - 1); // Limiting the number of diners in the table void *philosopher(void *arg) { int i = *((int *)arg); while (1) { sem_wait(&room); sem_wait(&forks[LEFT]); sem_wait(&forks[RIGHT]); eat_times[i]++; sem_post(&forks[LEFT]); sem_post(&forks[RIGHT]); sem_post(&room); } } ``` * Limiting the number of diners in the table * `sem_init(&room, 0, NUM_PHILOSOPHERS - 1)` * 不限制的話會卡死,沒有任何可以進入 C.S. ```shell Number of each philosopher: p0: 1583 p1: 447 p2: 1329 p3: 0 p4: 942 p5: 1 Number of each philosopher: p0: 1583 p1: 447 p2: 1329 p3: 0 p4: 942 p5: 1 Number of each philosopher: p0: 1583 p1: 447 p2: 1329 p3: 0 p4: 942 p5: 1 ``` * Simultaneously (low concurrency) * 透過 `rand` 來增加每一個哲學家可以用餐的信號量 `eatPermission[i]` 一定程度的緩解 bounded waiting (避免某幾位持續拿到 resources) ```diff + sem_t eatPermission[NUM_PHILOSOPHERS]; void* philosopher(void* arg) { int i = *((int*)arg); while (1) { + sem_wait(&eatPermission[i]); sem_wait(&forks[i]); sem_wait(&forks[(i + 1) % NUM_PHILOSOPHERS]); eat_times[i]++; sem_post(&forks[i]); sem_post(&forks[(i + 1) % NUM_PHILOSOPHERS]); sem_post(&eatSet); } } int main() { + while (1) { + sem_wait(&eatSet); + int philosopher_id = rand() % NUM_PHILOSOPHERS; + sem_post(&eatPermission[philosopher_id]); + } } ``` ### Cache coherence 不同core擁有不同的cache,因此core可能看到「新舊不一」的值 (這個問題稱之為cache coherence problem) * Snoop:空間成本較低,他是基於broadcast,無法實現點對點傳輸 * Dictionary:空間成本較高(要記住最新版本在哪邊),可以實現點對點傳輸 ### Cache info * cache 操作 * cache line * cache line 是 cpu cache 中的最小快取單位。 * 主流的 CPU Cache 的 Cache Line 大小多為 64 Bytes * Flush 把 Cache 內容寫回 Memory ,當 Cache 為 Write through ,不需要 Flush * Invalidate 把 Cache 內容直接丟掉不要 ### atomic operation * 只保證某些指 atomic,傳統上有:test_n_set、swap * X86為例:lock bts X, Y /*Bit Test and Set*/ * X86為例:lock xchg EAX,mem /*exchange*/ * 這二個指令都不斷的對記憶體寫入 * 對記憶體更新,即是不斷觸發 cache coherence,不斷廣播 ```clike int test_n_set (int *value) { register tmp = value; *value = 1; /*對記憶體更新*/ return tmp; } void swap(int* a, int* b) { register tmp; tmp = *a; *a = *b; /*對記憶體更新*/ *b = tmp; /*對記憶體更新*/ } ``` * 改進: `atomic_compare_exchange_strong` * write一定會觸發「cache coherence」。因為「值」改變了那麼需要讓各個cache的內容「一致」 ```clike bool atomic_compare_exchange_strong (volatile atomic_int* obj, int* expected, int desired );{ if (obj == expected) { /*只有在obj == expected的時候,才會對全域變數obj進行寫入*/ /*只有lock成功的時候,才會對全域變數obj進行寫入*/ obj = desired; return true; } else { expected = obj; return false; /*對obj只有讀取*/ } } ``` * C11 atomic ```clike int atomic_exchange(atomic_int* obj, int desired) { register tmp = obj; obj = desired; /*修改了全域變數*/ return tmp; } /*global value*/ atomic_int lock = 0; do { while (atomic_exchange(&lock, 1)) ; /* do nothing */ /* critical section */ lock = 0; /* remainder section */ } while (true); ``` ```clike bool atomic_compare_exchange_strong (volatile atomic_int* obj, int* expected, int desired );{ if (obj == expected) { /*只有在obj == expected的時候,才會對全域變數obj進行寫入*/ /*只有lock成功的時候,才會對全域變數obj進行寫入*/ obj = desired; return true; } else { expected = obj; return false; /*對obj只有讀取*/ } } void p1(void) { int expected; while (1) { while (!atomic_compare_exchange_strong(&current, &expected, 1)) { expected = 0; } //執行到這一步驟,表示 current == expected == 0,即拿到了lock //此時,current = myid //critical section in_cs++; p1_in_cs++; //離開CS in_cs--; current = 0; } } ``` ### Linux kernel 的 semaphore * down(semaphore *sem),如果count>0表示lock成功,程式進入CS。==否則將呼叫這個函數的task串到wait_list(相當於waiting queue),然後呼叫schedule()== * up(semaphore *sem),如果wait_list是空的,那麼count++,否則從wait_list找出最前面的task,將該task自wait_list移除,並呼叫==wake_up_process將這個task放到適當的run-queue== ```clike struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list; }; ``` * `schedule()` 是觸發排程器,主動讓出 CPU 使用權 ```clike void down(struct semaphore *sem) { struct semaphore_waiter waiter; if (sem->count > 0) { sem->count--; /* 1 */ return; } waiter.task = current; /* 2 */ list_add_tail(&waiter.list, &sem->wait_list); /* 2 */ schedule(); /* 3 */ } ``` * 如果 wait_list 沒有 process,那麼只需要增加 counter; * 從 wait_list 開頭取出第一個 process,並從 linked list 上移除。然後就是喚醒該 process; ```clike void up(struct semaphore *sem) { struct semaphore_waiter waiter; if (list_empty(&sem->wait_list)) { sem->count++; /* 1 */ return; } waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); /* 2 */ wake_up_process(waiter->task); /* 2 */ } ``` ### Memory order * `atomic_store(&flag[0], 1)` 使用了最強的記憶體模型 * 限制了compiler的優化空 * 限制了CPU的優化空間,CPU無法跨過這個「屏障」進行動態調整指令的執行順序(out-of-order execution) * ==雖然大幅度的降低電腦效能,但在程式設計上相對簡單許多== * C11 memory order 定義 ```clike #include <stdatomic.h> // 越上面越弱 typedef enum memory_order { memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst } memory_order; /*example*/ atomic_load_explicit( const volatile A* obj, memory_order order ); atomic_fetch_add_explicit( volatile A* obj, M arg, memory_order order ); ``` ### ticket lock 解決 spinlock 可能不公平的機制 若多個 process 申請 lock 的時候,當第一個申請 lock 成功的 process 在釋放時,==其他 process 即刻處於競爭的關係,因此會造成不公平== (可能會有某個 process 頻繁持有 lock)。 現在的 Linux 採用 Ticket lock 排隊機制,也就是說,誰先申請,誰就先得到 lock ```clike struct ticketlock_t { volatile atomic_int next_ticket; volatile int now_serving; } myTicketlock; void ticketLock_acquire(volatile atomic_int* next_ticket,volatile int* now_serving){ int my_ticket; my_ticket = atomic_fetch_add(next_ticket, 1); while(*now_serving != my_ticket) ; } void ticketLock_release(volatile int *now_serving) { ++*now_serving; } ``` ### read-write lock (rwlock) 無論是 spinlock 還是 semaphore,在同一時間僅能有一個 process 進入 critical section (CS)。對於有些情況,我們可==區分讀寫操作,進而提出最佳化機制==,讓 read 操作的 process 可以並行 (concurrent) 運作。對於 write 操作僅限於==一個 process 進入 CS==。這種同步機制就是 rwlock * 同一時間僅有一個 writer process 進入 CS; * 在沒有 writer process 進入 CS 時,同時可有==多個 reader process 進入 CS==; * reader process 和 writer process 不可以同時進入 CS; ```shell +----+-------------------------------------------------+ | 31 | 30 0 | +----+-------------------------------------------------+ | | | +----> [0:30] Read Thread Counter +-------------------------> [31] Write Thread Counter ``` * one bit for writer