---
title: 'IPC 進程同步、通訊'
disqus: kyleAlien
---
IPC 進程同步、通訊
===
## OverView of Content
[TOC]
## 進程 IPC 概述
* 每個應用都是一個進程,**進程與進程之間是不能相互同訊,並且資源不共享**(為了避免非法訪問),但是仍有方法可以通過進程來通訊(eg. 複製應用 A 的文字到 應用 B 當中)
* 廣義的來說,進程通訊 (Inter-process communication, IPC) 是指運行在不同進程(**不論是否在同一台機器**)中的若干線程的數據交換
## IPC 通訊
IPC 通訊有許多種實現方式,以下會介紹幾種常用的 IPC 通訊方式
### 管道 Pipe
* **管道適用於所有 POSIX 系統(Linux) & Windows 產品**,Pipe 是 Linux 由 UNIX 繼承而來的進程通訊機制,管道有以下特性
1. 分立管道的兩邊進行數據通訊
2. **管道是單向的只能 ++單邊++ 讀 or 寫如,同水流(也就是半雙工)**,如果要讀寫同時就必須建立兩個管道
> 半雙工:雙向都可以通,但不能同時讀、寫
3. 一根管道就有讀取&寫入的特性,不是讀就是寫,**但無法同時**
4. 管道有 **容量限制**,若是管道滿了則會堵塞,若是 **空的也會掛起等待**
> 
* 管道機制其實就是 **內存中創建一個共享文件,兩個進程都會映射該文件到自己的進程**,這是有關於記憶體的 [**檔案映射(請參考另外一篇)**](https://hackmd.io/ptxTEwCzQBuxv61dGL_lvA?view#%E6%AA%94%E6%A1%88%E6%98%A0%E5%B0%84---mmap-%E5%8B%95%E6%85%8B%E6%8B%93%E5%B1%95)
> 
* Linux 有相對的函數可以實現管道 Pipe
| Linux 函數 | 功能 | 返回 |
| -------- | -------- | -------- |
| int pipe(int[], int) | 創建管道 | 返回 -1 代表失敗 |
:::success
* **Named Pipe**(FIFO)?
兩個進程之間,管道必須相同(跟內存共享差不多)才可以進行通訊,若是沒有關係則無發通訊,之後產生出了 Named Pipe 發展,Named Pipe 的生命週期也不隨進程結束而結束(sytem-persistent),必須手動結束它
:::
### 消息隊列
* 消息隊列是由一個一個 Message 所串成的 Linked 列表,每個 Message 都存放在內存中,**由消息隊列 ++標示符號++ 進行標示**,並允 1 ~ 多個進程訪問該消息(Read or Write)
:::warning
* 消息隊列會**複製兩次消息**,標識符是用來做同步管理
:::
> 
### 共享內存 Shared Memory
* 由兩個進程直接共享同一塊內存(像是共享同一塊土地),這種共享需要經過以下幾個步驟
1. 創建:創建內存共享區
2. 映射:映射該內存共享區到兩個進程的記憶體
3. 訪問:訪問內存共享區
4. 通訊:通訊進程通訊
> 共享內存本身並沒有同步機制,所以同步方案需要協商
5. 撤銷:雙方的內存映射區
6. 刪除:內存用區
> 好處~ 方便回收內存
>
> 
* 它與管道的概念類似,同樣是使用一塊內存空間,將該空間映射到司有的虛擬地址中(進程中),我們就可以對該進程進行直接讀寫
* 它的好處就很明顯,速度相當快,但壞處是 **需要自己做內存保護(鎖),否則會發生資源混亂** (管道就不會有這個壞處,因為 PipeLine 是半雙工)
> 
* Linux 有相對的函數可以實現共享內存
| Linux 函數 | 功能 | 返回 |
| -------- | -------- | -------- |
| int shmget(...) | 分配內存 | 返回內存區的 id |
| char* shmat(...) | 記憶體映射 | 返回映射區的起始內存地址 |
| int shmdt(...) | 刪除內存映射 | 返回 0 表示成功 |
| int shmctl(...) | 控制內存區數據 | 返回內存區的 id |
### UNIX Domain Socket
* Android 中最常使用的機制是 Binder 再來就是 UDS Socket
* UNIX Domain Socket(UDS)是專門針對進程溝通設計出來的,也稱為 IPC Socket,IPC **Socket & Network Socket 在實現上差異級大**
:::info
* Network Socket ?
在網路上我們使用的 Socket 稱之為 **Network Socket**,也同樣可以用在同台機器中的進程溝通,但是 **效率不是很好**(需要多次複製)
:::
* UDS 的啟動與 Network Socket 差不多,只是在參數上有些區分
1. 服務器端監聽 IPC 請求
2. 客戶端發起 IPC 連接
3. 連接成功
4. 傳送數據
> 
### RPC - Remote Procedure Calls
* RPC 通訊涉及了雙方運行在 **不同機台上的通訊**,在 RPC 通訊時不須特別關注他是如何實現通訊的
1. 客戶關進程調用 Sub 接口
2. Stub 根據操作系統進行打包數據,並執行 **系統調用**
3. **由內核進行與服務器的互交**,負責將客戶端的數據傳給服務器的內核
4. 服務器 Sub 接收到數據,並批配解析
5. 傳送到服務器進程
> 
## 同步機制
進程最難的問題在於,**如何同步**,只有達成同步才能 ++安全取得、設定++ 資源
同步定義:多個進程之間存在時序關係,++必須要協同完成 **一項** 任務時,則稱為**同步**++
### 信號量 - Semaphore
* Semaphore 包括了 ^1^ 信號量(Semaphore)、^2^ Operation P(Wait)、^3^ Operation V(Single),**信號量代表個 ++共用的資源++**,P 表示消耗資源,V 回歸資源
* 要注意幾個點,就能了解
1. 對 S 的操作都是 **原子操作**
2. P 在資源不足時會等待
3. V 在資源足夠時會去喚醒等待的 P 操作
> P 是 Consumer、V 是 Producer
>
> 
* 信號量可看做一個計時器,用來控制多個進程訪問 **同一資源**(符合定義),常用來作為一種同步手段
### 互斥量 - Mutex
* 可以把它當成簡單版本的 Semaphore,**==資源 S 只有 0 & 1==,資源只有佔用 (lcoked) 跟 被佔用中 (unlocked)**,本質上並沒有差異
:::info
* 特點是 Mutex 具有所有權的概念,**只有所有者可以釋放 Mutex 資源,信號量則是任何 Thread 都可以釋放**
:::
> 
* 信號其實也是上層軟體對於 interrupt 的一種模擬(在 MCU 中其實就有相當多的 interrut),同樣可以作為異步通訊,當接收到通知就代表可以使用該資源
> Linux 內核可以用信號來通知用戶空間發生的 Event (信號資源由內核管理)
### 管程 - Monitor
* 它是對 **Semaphone 機制的延伸 & 改善**,**是控制更為簡單的同步手段**(若是對於一個龐大的系統而言 Semaphone 就不容易做管理),為了使資源訪問可以 **==互斥==**,所以提出 管程的概念
* Monitor 可以安全的讓線程 or 進程安全的訪問對象 (object) or 模塊(module),**同一時間只能讓一個訪問者訪問 (鎖的概念)**,它有用以下特性:
1. 安全
2. 互斥
3. 共享
* 流行語言 Delphi、Java、Python、Ruby、C# 都有實現 Monitor 機制
### Linux Futex
* Futex 的特點就在於 Fast 快速~
* 特色:在應用程式終究可以應付大部分的同步問題,**不需要牽扯到內核空間,如此一來減少了許多系統切換上下文的時間 (systemcall)**
* Futex 在 **Android 中使用在 ++ART 虛擬機++ ([mutex.cc)](https://android.googlesource.com/platform/art/+/master/runtime/base/mutex.cc)**,如果開啟了 **ART_USE_FUTEXES 宏**,則會使用 Linux Futex 來實現
```cpp=
// --------------------------------定義------------------------------------
// 位置:/runtime/base/mutex.h
class LOCKABLE Mutex : public BaseMutex {
// ... 省略部份
private:
AtomicInteger state_and_contenders_;
void increment_contenders() {
state_and_contenders_.fetch_add(kContenderIncrement);
}
void decrement_contenders() {
state_and_contenders_.fetch_sub(kContenderIncrement);
}
}
// --------------------------------實現------------------------------------
// 位置:/runtime/base/mutex.cc
void Mutex::ExclusiveLock(Thread* self) {
DCHECK(self == nullptr || self == Thread::Current());
if (kDebugLocking && !recursive_) {
AssertNotHeld(self);
}
if (!recursive_ || !IsExclusiveHeld(self)) {
#if ART_USE_FUTEXES // 宏開關
bool done = false;
do {
int32_t cur_state = state_and_contenders_.load(std::memory_order_relaxed);
// 獲取鎖
if (LIKELY((cur_state & kHeldMask) == 0) /* lock not held */) {
// 1. 獲取到鎖,運行 CAS 機制
done = state_and_contenders_.CompareAndSetWeakAcquire(cur_state, cur_state | kHeldMask);
} else {
// 沒有拿到鎖
ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());
// ... 忽略部份
do {
timespec timeout_ts;
timeout_ts.tv_sec = 0;
timeout_ts.tv_nsec = Runtime::Current()->GetMonitorTimeoutNs();
// !! 重點 futex 函數 !!
// @ futex
if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state,
enable_monitor_timeout_ ? &timeout_ts : nullptr , nullptr, 0) != 0) {
if ((errno != EAGAIN) && (errno != EINTR)) {
if (errno == ETIMEDOUT) {
try_times++;
if (try_times <= kMonitorTimeoutTryMax) {
DumpStack(self, wait_start_ms, try_times);
}
} else {
PLOG(FATAL) << "futex wait failed for " << name_;
}
}
}
SleepIfRuntimeDeleted(self);
cur_state = state_and_contenders_.load(std::memory_order_relaxed);
} while ((cur_state & kHeldMask) != 0);
}
} while (!done);
#else
// 使用傳統的 pthread 實現 (信號)
CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
```
1. Mutex 加鎖的邏輯是,如果獲取的到鎖,則直接返回,否則就呈現掛起(等待)的狀態
2. 調用 futex 時使用指令 **FUTEX_WAIT_PRIVATE**(operation)通常代表 futext word 保持 val 值,可能會需要等待較長的時間
```cpp=
// 函數原型
static inline int futex(
volatile int *uaddr, // 指向 futext word
int op, // operation
int val, // 根據 op 的不同而有所差異
const struct teimspec *timeout,
volatile int *uaddr2,
int val3);
// 函數呼叫
futex(state_and_contenders_.Address(), // futext word
FUTEX_WAIT_PRIVATE,
cur_state,
enable_monitor_timeout_ ? &timeout_ts : nullptr ,
nullptr,
0)
```
:::success
* Futex 優勢:
在不存在競爭的情況下,採用 Futex 機制在 **用戶狀態就可以完成鎖的獲取**,而不需要通過調用進入內核,從而 **提高了效率**
> 省略了 systemcall 的上下文切換時間 (CPU 切換 Process 的時間)
:::
## Android 進程同步
目前 android 封裝的同步類包括以下三種
* **[Mutex](https://android.googlesource.com/platform/frameworks/native/+/android-4.2.2_r1/include/utils/Mutex.h)**
* 頭文件:`/include/utils/Mutex.h`
* 介紹:Android 的 Mutex 是對 **pthread(Linux Futex) 提供的 API 進行簡單封裝**,函數聲明 & 實現都在頭文件中,方便使用者調用,**同時還包含了 AutoLock 類(自動釋放 Lock)**
* **[Condition](https://android.googlesource.com/platform/system/core/+/master/libutils/include/utils/Condition.h)**
* 頭文件:`/include/utils/Condition.h`
* 介紹:Condition 是添加了 **==條件==,它是依賴 Mutex 來完成的**
* **[Barrier](https://android.googlesource.com/platform/frameworks/native/+/android-7.1.1_r28/services/surfaceflinger/Barrier.h)**
* 頭文件:`/surfaceflinger/Barrier.h`
* 介紹:**Barrier 是基於 Mutex & Condition 實現的模型**,有條件的 Mutex
### 進程同步 - [Mutex](https://android.googlesource.com/platform/frameworks/native/+/android-4.2.2_r1/include/utils/Mutex.h) & AutoMutex
* **Mutex 實際上只是基於 ==pthread_mutex_t== 類型的封裝**
```cpp=
// -------------------------------定義-------------------------------------
// include/utils/Mutex.h
class Mutex {
public:
enum {
PRIVATE = 0, // 同一進程內 同步
SHARED = 1 // 不同進程 同步
};
// 建構函數
Mutex();
Mutex(const char* name);
Mutex(int type, const char* name = NULL); // 可指定 同步狀態
~Mutex();
// 函數
status_t lock(); // 獲取鎖,若獲取不到則會等待(掛起)
void unlock(); // 喚醒等待的鎖
status_t tryLock(); // 嘗試獲取鎖,若獲取不到並不會等待
// Manages the mutex automatically. It'll be locked when Autolock is
// constructed and released when Autolock goes out of scope.
class Autolock {
public:
inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); }
inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); }
inline ~Autolock() { mLock.unlock(); }
private:
Mutex& mLock;
};
private:
friend class Condition;
// A mutex cannot be copied
Mutex(const Mutex&);
Mutex& operator = (const Mutex&);
#if defined(HAVE_PTHREADS)
pthread_mutex_t mMutex; // 使用 pthread_mutex_t 封裝
#else
void _init();
void* mState;
#endif
};
// -------------------------------實現-------------------------------------
// include/utils/Mutex.h
#if defined(HAVE_PTHREADS)
// 建構函數
inline Mutex::Mutex() {
pthread_mutex_init(&mMutex, NULL);
}
inline Mutex::Mutex(const char* name) {
pthread_mutex_init(&mMutex, NULL);
}
inline Mutex::Mutex(int type, const char* name) {
if (type == SHARED) {
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&mMutex, &attr);
pthread_mutexattr_destroy(&attr);
} else {
pthread_mutex_init(&mMutex, NULL);
}
}
// 解構函數
inline Mutex::~Mutex() {
pthread_mutex_destroy(&mMutex);
}
// 內聯函數,主要在操作 pthread_mutex_t
inline status_t Mutex::lock() {
return -pthread_mutex_lock(&mMutex); // mMutex 是 pthread_mutex_t 是類型
}
inline void Mutex::unlock() {
pthread_mutex_unlock(&mMutex);
}
inline status_t Mutex::tryLock() {
return -pthread_mutex_trylock(&mMutex);
}
#endif // HAVE_PTHREADS
typedef Mutex::Autolock AutoMutex;
```
:::info
* AutoMutex 類
另外 Mutex 有定義一個內部類 AutoMutex,用於 **解構時** 自動釋放鎖
```cpp=
class Autolock {
public:
inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); }
inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); }
inline ~Autolock() { mLock.unlock(); }
private:
Mutex& mLock;
};
```
:::
### 條件判斷 - [Condition](https://android.googlesource.com/platform/system/core/+/master/libutils/include/utils/Condition.h)
* 核心思想是 **條件滿足就返回,繼續執行動作**,否則就休眠等待(掛起),直到有滿足條件的人將其喚醒
* **這就像是 ++Mutex 必須主動++ 去獲取鎖再判斷,而 ++Condition 是被動++ 被通知**
```cpp=
// -------------------------------定義-------------------------------------
// /utils/Condition.h
class Condition {
public:
enum {
PRIVATE = 0, // 同 Mutex
SHARED = 1
};
enum WakeUpType {
WAKE_UP_ONE = 0, // 喚醒單一個
WAKE_UP_ALL = 1 // 全部喚醒
};
// 建構函數
Condition();
explicit Condition(int type);
~Condition();
// 在某個條件上等待
status_t wait(Mutex& mutex);
// 在規定時間內喚醒,若無法喚醒則退出
status_t waitRelative(Mutex& mutex, nsecs_t reltime);
// 喚醒
void signal();
// 指定喚醒方法
void signal(WakeUpType type) {
if (type == WAKE_UP_ONE) {
signal();
} else {
broadcast();
}
}
// 通知所有等待者
void broadcast();
private:
#if !defined(_WIN32)
pthread_cond_t mCond;
#else
void* mState;
#endif
};
// -------------------------------實現-------------------------------------
// /utils/Condition.h
// 建構函數
inline Condition::Condition() : Condition(PRIVATE) {
}
inline Condition::Condition(int type) {
pthread_condattr_t attr;
pthread_condattr_init(&attr);
#if defined(__linux__)
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
#endif
if (type == SHARED) {
pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
}
pthread_cond_init(&mCond, &attr);
pthread_condattr_destroy(&attr);
}
// 解構函數
inline Condition::~Condition() {
pthread_cond_destroy(&mCond);
}
inline status_t Condition::wait(Mutex& mutex) {
return -pthread_cond_wait(&mCond, &mutex.mMutex);
}
inline void Condition::signal() {
pthread_cond_signal(&mCond);
}
inline void Condition::broadcast() {
pthread_cond_broadcast(&mCond);
}
```
* **Condition 是一個殼,它並不會去定義具體的喚醒條件**,Condition 想要提供一種通用的解決方案,而不是針對某些具體條件去設計
* **==Condition & Mutex 所持有是同一把鎖==**
* 為何 Condition wait 會需要 Mutex 會在 Barrier 看到範例
### 柵欄 - [Barrier](https://android.googlesource.com/platform/frameworks/native/+/android-7.1.1_r28/services/surfaceflinger/Barrier.h)
* 先說明一下 **Barrier 累事專門為 SurfaceFlinger 設計的**
* Mutex 是互斥鎖,Condition 表示條件,而 **Barrier 則是 ++Condition++ & ++Mutex++ 兩者的應用**
```cpp=
class Barrier
{
public:
inline Barrier() : state(CLOSED) { }
inline ~Barrier() { }
void open() {
Mutex::Autolock _l(lock);
state = OPENED; // 獲取到鎖之後才能改變狀態
cv.broadcast(); // 使用 broadcast 通知所有等待對象
}
void close() {
Mutex::Autolock _l(lock); // 獲取到鎖之後才能改變狀態
state = CLOSED;
}
void wait() const {
Mutex::Autolock _l(lock);
while (state == CLOSED) {
cv.wait(lock);
}
}
private:
enum { OPENED, CLOSED };
mutable Mutex lock;
mutable Condition cv;
volatile int state;
};
```
* Barrier#wait 函數為何需要先獲取 Mutex 鎖再調用 Control 的方法?
1. 假設沒有 Mutex 這把鎖
```cpp=
void wait() const {
// Mutex::Autolock _l(lock);
while (state == CLOSED) {
cv.wait(lock);
}
}
```
1. 線程 A 通過 wait() 取得鎖,發現是 CLOSE (準備休眠)
2. 線程 B 通過 open() 取得鎖,將它改為 OPEN
3. B 線程 OPEN 喚醒了 A 線程,**但此時 A 還沒睡眠,導致 ++沒有任何線程會被喚醒++**
4. 最終導致 **A 線程不斷睡眠**
2. 接著看 Condition# wait 方法,pthread_cond_wait 的等待邏輯(休眠的過程)
```cpp=
// 接著查看 Condition 的 wait 方法
inline status_t Condition::wait(Mutex& mutex) {
return -pthread_cond_wait(&mCond, &mutex.mMutex);
}
```
1. 釋放鎖 mutex (這樣其他人才能獲取鎖,**如果無釋放所就休眠就會變成死鎖**)
2. 進入休眠等待 (這時已經釋放鎖)
3. 喚醒後再獲取 mutex 鎖
3. 如果在 wait 之前先使用鎖鎖住就不會發生無法喚醒的問題
* Barrier 通常用在某現程式已經完成初始化的判斷,這種場景是不可逆的
### 讀寫鎖 - [ReaderWriterMutex](https://android.googlesource.com/platform/art/+/master/runtime/base/mutex.h)
* Read 的特點是可以多個線程(進程)一起讀取同一個檔案,Write 則是相反,只能一次一個線程(進程)執行,所以 Write 就是排他鎖(Exclusive)
```cpp=
// Write
void ExclusiveLock(Thread* self) ACQUIRE();
void ExclusiveLockUncontendedFor(Thread* new_owner);
bool ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns)
EXCLUSIVE_TRYLOCK_FUNCTION(true);
// Read
void SharedLock(Thread* self) ACQUIRE_SHARED() ALWAYS_INLINE;
void ReaderLock(Thread* self) ACQUIRE_SHARED() { SharedLock(self); }
void SharedUnlock(Thread* self) RELEASE_SHARED() ALWAYS_INLINE;
void ReaderUnlock(Thread* self) RELEASE_SHARED() { SharedUnlock(self); }
```
* ReaderWriterMutex 可以有三種狀態
1. **Free**:還未被任何對象持有
2. **Exclusive**:當前被唯一一個對象持有
3. **Shared**:可被多個對象持有
| State | ExclusiveLock | ExclusiveUnlock | SharedLock | SharedUnlock |
| - | - | - | - | - |
| Free | Exclusive | error | SharedLock(1) | error |
| Exclusive | Block | Free | Block | error |
| Shared(n) | Block | error | SharedLock(n+1)* | Shared(n-1) or Free |
## Appendix & FAQ
:::info
:::
###### tags: `Android 系統` `進程`