--- title: 'IPC 進程同步、通訊' disqus: kyleAlien --- IPC 進程同步、通訊 === ## OverView of Content [TOC] ## 進程 IPC 概述 * 每個應用都是一個進程,**進程與進程之間是不能相互同訊,並且資源不共享**(為了避免非法訪問),但是仍有方法可以通過進程來通訊(eg. 複製應用 A 的文字到 應用 B 當中) * 廣義的來說,進程通訊 (Inter-process communication, IPC) 是指運行在不同進程(**不論是否在同一台機器**)中的若干線程的數據交換 ## IPC 通訊 IPC 通訊有許多種實現方式,以下會介紹幾種常用的 IPC 通訊方式 ### 管道 Pipe * **管道適用於所有 POSIX 系統(Linux) & Windows 產品**,Pipe 是 Linux 由 UNIX 繼承而來的進程通訊機制,管道有以下特性 1. 分立管道的兩邊進行數據通訊 2. **管道是單向的只能 ++單邊++ 讀 or 寫如,同水流(也就是半雙工)**,如果要讀寫同時就必須建立兩個管道 > 半雙工:雙向都可以通,但不能同時讀、寫 3. 一根管道就有讀取&寫入的特性,不是讀就是寫,**但無法同時** 4. 管道有 **容量限制**,若是管道滿了則會堵塞,若是 **空的也會掛起等待** > ![](https://i.imgur.com/PzjhG7M.png) * 管道機制其實就是 **內存中創建一個共享文件,兩個進程都會映射該文件到自己的進程**,這是有關於記憶體的 [**檔案映射(請參考另外一篇)**](https://hackmd.io/ptxTEwCzQBuxv61dGL_lvA?view#%E6%AA%94%E6%A1%88%E6%98%A0%E5%B0%84---mmap-%E5%8B%95%E6%85%8B%E6%8B%93%E5%B1%95) > ![](https://i.imgur.com/rCAUJfd.png) * Linux 有相對的函數可以實現管道 Pipe | Linux 函數 | 功能 | 返回 | | -------- | -------- | -------- | | int pipe(int[], int) | 創建管道 | 返回 -1 代表失敗 | :::success * **Named Pipe**(FIFO)? 兩個進程之間,管道必須相同(跟內存共享差不多)才可以進行通訊,若是沒有關係則無發通訊,之後產生出了 Named Pipe 發展,Named Pipe 的生命週期也不隨進程結束而結束(sytem-persistent),必須手動結束它 ::: ### 消息隊列 * 消息隊列是由一個一個 Message 所串成的 Linked 列表,每個 Message 都存放在內存中,**由消息隊列 ++標示符號++ 進行標示**,並允 1 ~ 多個進程訪問該消息(Read or Write) :::warning * 消息隊列會**複製兩次消息**,標識符是用來做同步管理 ::: > ![](https://i.imgur.com/uNhsMqA.png) ### 共享內存 Shared Memory * 由兩個進程直接共享同一塊內存(像是共享同一塊土地),這種共享需要經過以下幾個步驟 1. 創建:創建內存共享區 2. 映射:映射該內存共享區到兩個進程的記憶體 3. 訪問:訪問內存共享區 4. 通訊:通訊進程通訊 > 共享內存本身並沒有同步機制,所以同步方案需要協商 5. 撤銷:雙方的內存映射區 6. 刪除:內存用區 > 好處~ 方便回收內存 > > ![](https://i.imgur.com/TbAUw2S.png) * 它與管道的概念類似,同樣是使用一塊內存空間,將該空間映射到司有的虛擬地址中(進程中),我們就可以對該進程進行直接讀寫 * 它的好處就很明顯,速度相當快,但壞處是 **需要自己做內存保護(鎖),否則會發生資源混亂** (管道就不會有這個壞處,因為 PipeLine 是半雙工) > ![](https://i.imgur.com/MbZzT37.png) * Linux 有相對的函數可以實現共享內存 | Linux 函數 | 功能 | 返回 | | -------- | -------- | -------- | | int shmget(...) | 分配內存 | 返回內存區的 id | | char* shmat(...) | 記憶體映射 | 返回映射區的起始內存地址 | | int shmdt(...) | 刪除內存映射 | 返回 0 表示成功 | | int shmctl(...) | 控制內存區數據 | 返回內存區的 id | ### UNIX Domain Socket * Android 中最常使用的機制是 Binder 再來就是 UDS Socket * UNIX Domain Socket(UDS)是專門針對進程溝通設計出來的,也稱為 IPC Socket,IPC **Socket & Network Socket 在實現上差異級大** :::info * Network Socket ? 在網路上我們使用的 Socket 稱之為 **Network Socket**,也同樣可以用在同台機器中的進程溝通,但是 **效率不是很好**(需要多次複製) ::: * UDS 的啟動與 Network Socket 差不多,只是在參數上有些區分 1. 服務器端監聽 IPC 請求 2. 客戶端發起 IPC 連接 3. 連接成功 4. 傳送數據 > ![](https://i.imgur.com/EeafI82.png) ### RPC - Remote Procedure Calls * RPC 通訊涉及了雙方運行在 **不同機台上的通訊**,在 RPC 通訊時不須特別關注他是如何實現通訊的 1. 客戶關進程調用 Sub 接口 2. Stub 根據操作系統進行打包數據,並執行 **系統調用** 3. **由內核進行與服務器的互交**,負責將客戶端的數據傳給服務器的內核 4. 服務器 Sub 接收到數據,並批配解析 5. 傳送到服務器進程 > ![](https://i.imgur.com/WD8hw67.png) ## 同步機制 進程最難的問題在於,**如何同步**,只有達成同步才能 ++安全取得、設定++ 資源 同步定義:多個進程之間存在時序關係,++必須要協同完成 **一項** 任務時,則稱為**同步**++ ### 信號量 - Semaphore * Semaphore 包括了 ^1^ 信號量(Semaphore)、^2^ Operation P(Wait)、^3^ Operation V(Single),**信號量代表個 ++共用的資源++**,P 表示消耗資源,V 回歸資源 * 要注意幾個點,就能了解 1. 對 S 的操作都是 **原子操作** 2. P 在資源不足時會等待 3. V 在資源足夠時會去喚醒等待的 P 操作 > P 是 Consumer、V 是 Producer > > ![](https://i.imgur.com/JTAT5ra.png) * 信號量可看做一個計時器,用來控制多個進程訪問 **同一資源**(符合定義),常用來作為一種同步手段 ### 互斥量 - Mutex * 可以把它當成簡單版本的 Semaphore,**==資源 S 只有 0 & 1==,資源只有佔用 (lcoked) 跟 被佔用中 (unlocked)**,本質上並沒有差異 :::info * 特點是 Mutex 具有所有權的概念,**只有所有者可以釋放 Mutex 資源,信號量則是任何 Thread 都可以釋放** ::: > ![](https://i.imgur.com/RZPMjHe.png) * 信號其實也是上層軟體對於 interrupt 的一種模擬(在 MCU 中其實就有相當多的 interrut),同樣可以作為異步通訊,當接收到通知就代表可以使用該資源 > Linux 內核可以用信號來通知用戶空間發生的 Event (信號資源由內核管理) ### 管程 - Monitor * 它是對 **Semaphone 機制的延伸 & 改善**,**是控制更為簡單的同步手段**(若是對於一個龐大的系統而言 Semaphone 就不容易做管理),為了使資源訪問可以 **==互斥==**,所以提出 管程的概念 * Monitor 可以安全的讓線程 or 進程安全的訪問對象 (object) or 模塊(module),**同一時間只能讓一個訪問者訪問 (鎖的概念)**,它有用以下特性: 1. 安全 2. 互斥 3. 共享 * 流行語言 Delphi、Java、Python、Ruby、C# 都有實現 Monitor 機制 ### Linux Futex * Futex 的特點就在於 Fast 快速~ * 特色:在應用程式終究可以應付大部分的同步問題,**不需要牽扯到內核空間,如此一來減少了許多系統切換上下文的時間 (systemcall)** * Futex 在 **Android 中使用在 ++ART 虛擬機++ ([mutex.cc)](https://android.googlesource.com/platform/art/+/master/runtime/base/mutex.cc)**,如果開啟了 **ART_USE_FUTEXES 宏**,則會使用 Linux Futex 來實現 ```cpp= // --------------------------------定義------------------------------------ // 位置:/runtime/base/mutex.h class LOCKABLE Mutex : public BaseMutex { // ... 省略部份 private: AtomicInteger state_and_contenders_; void increment_contenders() { state_and_contenders_.fetch_add(kContenderIncrement); } void decrement_contenders() { state_and_contenders_.fetch_sub(kContenderIncrement); } } // --------------------------------實現------------------------------------ // 位置:/runtime/base/mutex.cc void Mutex::ExclusiveLock(Thread* self) { DCHECK(self == nullptr || self == Thread::Current()); if (kDebugLocking && !recursive_) { AssertNotHeld(self); } if (!recursive_ || !IsExclusiveHeld(self)) { #if ART_USE_FUTEXES // 宏開關 bool done = false; do { int32_t cur_state = state_and_contenders_.load(std::memory_order_relaxed); // 獲取鎖 if (LIKELY((cur_state & kHeldMask) == 0) /* lock not held */) { // 1. 獲取到鎖,運行 CAS 機制 done = state_and_contenders_.CompareAndSetWeakAcquire(cur_state, cur_state | kHeldMask); } else { // 沒有拿到鎖 ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid()); // ... 忽略部份 do { timespec timeout_ts; timeout_ts.tv_sec = 0; timeout_ts.tv_nsec = Runtime::Current()->GetMonitorTimeoutNs(); // !! 重點 futex 函數 !! // @ futex if (futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, cur_state, enable_monitor_timeout_ ? &timeout_ts : nullptr , nullptr, 0) != 0) { if ((errno != EAGAIN) && (errno != EINTR)) { if (errno == ETIMEDOUT) { try_times++; if (try_times <= kMonitorTimeoutTryMax) { DumpStack(self, wait_start_ms, try_times); } } else { PLOG(FATAL) << "futex wait failed for " << name_; } } } SleepIfRuntimeDeleted(self); cur_state = state_and_contenders_.load(std::memory_order_relaxed); } while ((cur_state & kHeldMask) != 0); } } while (!done); #else // 使用傳統的 pthread 實現 (信號) CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_)); ``` 1. Mutex 加鎖的邏輯是,如果獲取的到鎖,則直接返回,否則就呈現掛起(等待)的狀態 2. 調用 futex 時使用指令 **FUTEX_WAIT_PRIVATE**(operation)通常代表 futext word 保持 val 值,可能會需要等待較長的時間 ```cpp= // 函數原型 static inline int futex( volatile int *uaddr, // 指向 futext word int op, // operation int val, // 根據 op 的不同而有所差異 const struct teimspec *timeout, volatile int *uaddr2, int val3); // 函數呼叫 futex(state_and_contenders_.Address(), // futext word FUTEX_WAIT_PRIVATE, cur_state, enable_monitor_timeout_ ? &timeout_ts : nullptr , nullptr, 0) ``` :::success * Futex 優勢: 在不存在競爭的情況下,採用 Futex 機制在 **用戶狀態就可以完成鎖的獲取**,而不需要通過調用進入內核,從而 **提高了效率** > 省略了 systemcall 的上下文切換時間 (CPU 切換 Process 的時間) ::: ## Android 進程同步 目前 android 封裝的同步類包括以下三種 * **[Mutex](https://android.googlesource.com/platform/frameworks/native/+/android-4.2.2_r1/include/utils/Mutex.h)** * 頭文件:`/include/utils/Mutex.h` * 介紹:Android 的 Mutex 是對 **pthread(Linux Futex) 提供的 API 進行簡單封裝**,函數聲明 & 實現都在頭文件中,方便使用者調用,**同時還包含了 AutoLock 類(自動釋放 Lock)** * **[Condition](https://android.googlesource.com/platform/system/core/+/master/libutils/include/utils/Condition.h)** * 頭文件:`/include/utils/Condition.h` * 介紹:Condition 是添加了 **==條件==,它是依賴 Mutex 來完成的** * **[Barrier](https://android.googlesource.com/platform/frameworks/native/+/android-7.1.1_r28/services/surfaceflinger/Barrier.h)** * 頭文件:`/surfaceflinger/Barrier.h` * 介紹:**Barrier 是基於 Mutex & Condition 實現的模型**,有條件的 Mutex ### 進程同步 - [Mutex](https://android.googlesource.com/platform/frameworks/native/+/android-4.2.2_r1/include/utils/Mutex.h) & AutoMutex * **Mutex 實際上只是基於 ==pthread_mutex_t== 類型的封裝** ```cpp= // -------------------------------定義------------------------------------- // include/utils/Mutex.h class Mutex { public: enum { PRIVATE = 0, // 同一進程內 同步 SHARED = 1 // 不同進程 同步 }; // 建構函數 Mutex(); Mutex(const char* name); Mutex(int type, const char* name = NULL); // 可指定 同步狀態 ~Mutex(); // 函數 status_t lock(); // 獲取鎖,若獲取不到則會等待(掛起) void unlock(); // 喚醒等待的鎖 status_t tryLock(); // 嘗試獲取鎖,若獲取不到並不會等待 // Manages the mutex automatically. It'll be locked when Autolock is // constructed and released when Autolock goes out of scope. class Autolock { public: inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); } inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); } inline ~Autolock() { mLock.unlock(); } private: Mutex& mLock; }; private: friend class Condition; // A mutex cannot be copied Mutex(const Mutex&); Mutex& operator = (const Mutex&); #if defined(HAVE_PTHREADS) pthread_mutex_t mMutex; // 使用 pthread_mutex_t 封裝 #else void _init(); void* mState; #endif }; // -------------------------------實現------------------------------------- // include/utils/Mutex.h #if defined(HAVE_PTHREADS) // 建構函數 inline Mutex::Mutex() { pthread_mutex_init(&mMutex, NULL); } inline Mutex::Mutex(const char* name) { pthread_mutex_init(&mMutex, NULL); } inline Mutex::Mutex(int type, const char* name) { if (type == SHARED) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&mMutex, &attr); pthread_mutexattr_destroy(&attr); } else { pthread_mutex_init(&mMutex, NULL); } } // 解構函數 inline Mutex::~Mutex() { pthread_mutex_destroy(&mMutex); } // 內聯函數,主要在操作 pthread_mutex_t inline status_t Mutex::lock() { return -pthread_mutex_lock(&mMutex); // mMutex 是 pthread_mutex_t 是類型 } inline void Mutex::unlock() { pthread_mutex_unlock(&mMutex); } inline status_t Mutex::tryLock() { return -pthread_mutex_trylock(&mMutex); } #endif // HAVE_PTHREADS typedef Mutex::Autolock AutoMutex; ``` :::info * AutoMutex 類 另外 Mutex 有定義一個內部類 AutoMutex,用於 **解構時** 自動釋放鎖 ```cpp= class Autolock { public: inline Autolock(Mutex& mutex) : mLock(mutex) { mLock.lock(); } inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); } inline ~Autolock() { mLock.unlock(); } private: Mutex& mLock; }; ``` ::: ### 條件判斷 - [Condition](https://android.googlesource.com/platform/system/core/+/master/libutils/include/utils/Condition.h) * 核心思想是 **條件滿足就返回,繼續執行動作**,否則就休眠等待(掛起),直到有滿足條件的人將其喚醒 * **這就像是 ++Mutex 必須主動++ 去獲取鎖再判斷,而 ++Condition 是被動++ 被通知** ```cpp= // -------------------------------定義------------------------------------- // /utils/Condition.h class Condition { public: enum { PRIVATE = 0, // 同 Mutex SHARED = 1 }; enum WakeUpType { WAKE_UP_ONE = 0, // 喚醒單一個 WAKE_UP_ALL = 1 // 全部喚醒 }; // 建構函數 Condition(); explicit Condition(int type); ~Condition(); // 在某個條件上等待 status_t wait(Mutex& mutex); // 在規定時間內喚醒,若無法喚醒則退出 status_t waitRelative(Mutex& mutex, nsecs_t reltime); // 喚醒 void signal(); // 指定喚醒方法 void signal(WakeUpType type) { if (type == WAKE_UP_ONE) { signal(); } else { broadcast(); } } // 通知所有等待者 void broadcast(); private: #if !defined(_WIN32) pthread_cond_t mCond; #else void* mState; #endif }; // -------------------------------實現------------------------------------- // /utils/Condition.h // 建構函數 inline Condition::Condition() : Condition(PRIVATE) { } inline Condition::Condition(int type) { pthread_condattr_t attr; pthread_condattr_init(&attr); #if defined(__linux__) pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); #endif if (type == SHARED) { pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); } pthread_cond_init(&mCond, &attr); pthread_condattr_destroy(&attr); } // 解構函數 inline Condition::~Condition() { pthread_cond_destroy(&mCond); } inline status_t Condition::wait(Mutex& mutex) { return -pthread_cond_wait(&mCond, &mutex.mMutex); } inline void Condition::signal() { pthread_cond_signal(&mCond); } inline void Condition::broadcast() { pthread_cond_broadcast(&mCond); } ``` * **Condition 是一個殼,它並不會去定義具體的喚醒條件**,Condition 想要提供一種通用的解決方案,而不是針對某些具體條件去設計 * **==Condition & Mutex 所持有是同一把鎖==** * 為何 Condition wait 會需要 Mutex 會在 Barrier 看到範例 ### 柵欄 - [Barrier](https://android.googlesource.com/platform/frameworks/native/+/android-7.1.1_r28/services/surfaceflinger/Barrier.h) * 先說明一下 **Barrier 累事專門為 SurfaceFlinger 設計的** * Mutex 是互斥鎖,Condition 表示條件,而 **Barrier 則是 ++Condition++ & ++Mutex++ 兩者的應用** ```cpp= class Barrier { public: inline Barrier() : state(CLOSED) { } inline ~Barrier() { } void open() { Mutex::Autolock _l(lock); state = OPENED; // 獲取到鎖之後才能改變狀態 cv.broadcast(); // 使用 broadcast 通知所有等待對象 } void close() { Mutex::Autolock _l(lock); // 獲取到鎖之後才能改變狀態 state = CLOSED; } void wait() const { Mutex::Autolock _l(lock); while (state == CLOSED) { cv.wait(lock); } } private: enum { OPENED, CLOSED }; mutable Mutex lock; mutable Condition cv; volatile int state; }; ``` * Barrier#wait 函數為何需要先獲取 Mutex 鎖再調用 Control 的方法? 1. 假設沒有 Mutex 這把鎖 ```cpp= void wait() const { // Mutex::Autolock _l(lock); while (state == CLOSED) { cv.wait(lock); } } ``` 1. 線程 A 通過 wait() 取得鎖,發現是 CLOSE (準備休眠) 2. 線程 B 通過 open() 取得鎖,將它改為 OPEN 3. B 線程 OPEN 喚醒了 A 線程,**但此時 A 還沒睡眠,導致 ++沒有任何線程會被喚醒++** 4. 最終導致 **A 線程不斷睡眠** 2. 接著看 Condition# wait 方法,pthread_cond_wait 的等待邏輯(休眠的過程) ```cpp= // 接著查看 Condition 的 wait 方法 inline status_t Condition::wait(Mutex& mutex) { return -pthread_cond_wait(&mCond, &mutex.mMutex); } ``` 1. 釋放鎖 mutex (這樣其他人才能獲取鎖,**如果無釋放所就休眠就會變成死鎖**) 2. 進入休眠等待 (這時已經釋放鎖) 3. 喚醒後再獲取 mutex 鎖 3. 如果在 wait 之前先使用鎖鎖住就不會發生無法喚醒的問題 * Barrier 通常用在某現程式已經完成初始化的判斷,這種場景是不可逆的 ### 讀寫鎖 - [ReaderWriterMutex](https://android.googlesource.com/platform/art/+/master/runtime/base/mutex.h) * Read 的特點是可以多個線程(進程)一起讀取同一個檔案,Write 則是相反,只能一次一個線程(進程)執行,所以 Write 就是排他鎖(Exclusive) ```cpp= // Write void ExclusiveLock(Thread* self) ACQUIRE(); void ExclusiveLockUncontendedFor(Thread* new_owner); bool ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns) EXCLUSIVE_TRYLOCK_FUNCTION(true); // Read void SharedLock(Thread* self) ACQUIRE_SHARED() ALWAYS_INLINE; void ReaderLock(Thread* self) ACQUIRE_SHARED() { SharedLock(self); } void SharedUnlock(Thread* self) RELEASE_SHARED() ALWAYS_INLINE; void ReaderUnlock(Thread* self) RELEASE_SHARED() { SharedUnlock(self); } ``` * ReaderWriterMutex 可以有三種狀態 1. **Free**:還未被任何對象持有 2. **Exclusive**:當前被唯一一個對象持有 3. **Shared**:可被多個對象持有 | State | ExclusiveLock | ExclusiveUnlock | SharedLock | SharedUnlock | | - | - | - | - | - | | Free | Exclusive | error | SharedLock(1) | error | | Exclusive | Block | Free | Block | error | | Shared(n) | Block | error | SharedLock(n+1)* | Shared(n-1) or Free | ## Appendix & FAQ :::info ::: ###### tags: `Android 系統` `進程`