--- title: 'Looper & MessageQueue 消息機制' disqus: kyleAlien --- Looper & MessageQueue 消息機制 === ## OverView of Content 這章節主要是分析 Native 層的 Looper & MessageQueue,並且看看它們是如何與 Java 層協作 (這裡不會詳細說明 Handle) :::success 當然最好也先理解 Android Handle 機制,Handle 請參考 [**Android Handler 消息機制**](https://hackmd.io/7fBX6uEtQt6AzCpuBWTHMQ?view) ::: [TOC] ## Java - 消息 Looper * 我們知道要創建一個新的 Looper 才能傳遞訊息 (Main Thread 也是,透過 Framework 層幫我們創建好 Looper);以下是一段 Thread 創建新 Looper 的過程,以下要注意幾點 1. 一個 Thread 只能創建一個 Looper,否則會拋出錯誤 2. 每個進程都只會有一個 MainLooper,其他 Thread 要創建 Looper 要使用 `Looper.prepare()` ```java= public class ThreadLooperCreator extends Thread implements Handler.Callback { private static final int MSG_1 = 0x1234; private Handler handler; public ThreadLooperCreator() { prepareLooper(); } private void prepareLooper() { Looper.prepare(); // 創建一個 Handle // 指定它接收當前 Looper 的訊息 handler = new Handler(Looper.myLooper(), this); Looper.loop(); } @Override public boolean handleMessage(@NonNull Message msg) { switch (msg.what) { case MSG_1: // TODO: break; } return true; } @Override public void run() { // 發送 Msg handler.sendEmptyMessage(MSG_1); } } ``` ### Native & Java - 消息建立關係 * 從上面我們知道一個新 Thread 要傳遞消息,要創建 Looper 對象 (Java 層),而這個 Looper 的創建關連到許多事情,其中就包括了 Native 的消息機制 這邊直接列出結果關係圖再開始分析 > ![](https://i.imgur.com/fdl21A7.png) ## Native Looper 創建流程 ### Java 層 Looper - 創建 Looper、MessageQueue * [**Looper**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java)#prepare 方法:檢查該 Thread 是否已經創建過 Looper,沒有的話則創建一個 Looper 對象 (Java 層) ```java= // Looper.java static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>(); public static void prepare() { // @ 查看 prepare prepare(true); } private static void prepare(boolean quitAllowed) { // 一個 Thread 只能創建一個 Looper if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } // @ 查看 Looper 建構函數 sThreadLocal.set(new Looper(quitAllowed)); } ``` * [**Looper**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java) 建構函數:指定目前 Thread 為 Looper Thread,並 **創建 MessageQueue 對象** ```java= // Looper.java private Looper(boolean quitAllowed) { // 創建 MessageQueue 對象 mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); } ``` * [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) 建構函數:創建 Native 層的 MessageQueue,並將地址儲存在 `mPtr` 成員中 ```java= // MessageQueue.java MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); } // 呼叫 JNI private native static long nativeInit(); ``` ### Native 層 - 創建 NativeMessageQueue、Looper * 從 MessageQueue#nativeInit JNI 函數開始分析,該函數的實現在 [**android_os_MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp),它會創建 [**NativeMessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) 對象,並將對像地址返回給 Java 層儲存 (存在 `mPtr` 中) ```cpp= // android_os_MessageQueue.cpp static const JNINativeMethod gMessageQueueMethods[] = { // 對應實現函數為 android_os_MessageQueue_nativeInit { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit }, ... 省略部分 }; static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); // 返回 Java 層 return reinterpret_cast<jlong>(nativeMessageQueue); } ``` * **NativeMessageQueue 建構函數**:創建 Native Looper 對象 (Native 層的 Looper) ! :::info 從這邊可以看到 NativeMessageQueue、MessageQueue 都繼承於 RefBase,所以可以使用智能指標控制生命週期 ::: ```cpp= // android_os_MessageQueue.cpp NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { // 查看當前 Thread 是否有 Looper mLooper = Looper::getForThread(); if (mLooper == NULL) { // 創建 Looper mLooper = new Looper(false); Looper::setForThread(mLooper); } } // -------------------------------------------------------------- // android_os_MessageQueue.h class MessageQueue : public virtual RefBase { public: ... protected: sp<Looper> mLooper; }; // -------------------------------------------------------------- // Looper.cpp sp<Looper> Looper::getForThread() { // 一個 Thread 對應一個 Looper int result = pthread_once(& gTLSOnce, initTLSKey); // 返回 0 裁掉表成功 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed"); Looper* looper = (Looper*)pthread_getspecific(gTLSKey); return sp<Looper>::fromExisting(looper); } void Looper::setForThread(const sp<Looper>& looper) { sp<Looper> old = getForThread(); // also has side-effect of initializing TLS if (looper != nullptr) { // 增加強指針 looper->incStrong((void*)threadDestructor); } pthread_setspecific(gTLSKey, looper.get()); if (old != nullptr) { old->decStrong((void*)threadDestructor); } } ``` ### Native 層 Looper - 創建 Event、Epoll * **[Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) 建構函數**:創建 Event、Epoll | 對象 | 功能說明 | 補充 | | - | - | - | | Event | 監聽訊息事件 | 沒有訊息時就掛起等待,收到消息時通知所有監聽者 | | Epoll | Epoll 監聽 Event 並取出訊息 | Linux Epoll 是 Select 的加強版,使用紅樹儲存,**可監聽大量 FD 檔案描述**,可降低 CPU 使用率 | ```cpp= // Looper.h class Looper : public RefBase { ... private: android::base::unique_fd mWakeEventFd; // immutable Mutex mLock; android::base::unique_fd mEpollFd; } // --------------------------------------------------------------- // Looper.cpp Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollRebuildRequired(false), mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { // 創建 event,將 Event 的 fd 存到 mWakeEventFd mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)); ... log msg AutoMutex _l(mLock); // @ 查看 rebuildEpollLocked 函數 rebuildEpollLocked(); } ``` * `rebuildEpollLocked` 函數:創建 Epoll 對象,並讓 Epoll 監聽 Event 事件 ```cpp= // Looper.cpp void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { ... log msg mEpollFd.reset(); // 關閉舊的 Epoll } // Allocate the new epoll instance and register the . // 創建新的 Epoll 對象 mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC)); ... log msg epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ); // epoll 註冊監聽 WakeEventFd (上面創建的 Event 對象) int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent); ... 省略部分 } ``` :::success * 這裡為何要使用 Epoll,明明 Looper 只有創建一個 Event ? Looper 可以註冊多的 FD 檔案描述 (透過 `Looper#addFd` 函數),**也就是說可以多個 FD 監聽 Looper 的訊息是件** ::: ## Looper 接收事件 Looper#loop 函數最終仍是透過讀取 Epoll 監聽的 Event 事件來做事,如果沒有新事件,也會讓當前 Thread 進行休眠 > ![](https://i.imgur.com/6OhGexF.png) ### Java 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java) - loop 無限輪迴 * 從上面我們可以知道 Java 層透過 `Looper#loop` 函數開始無限循環,透過 `loopOnce` 讀取訊息;如果 `loopOnce` 回覆 false 則退出迴圈 ```java= // Looper.java public static void loop() { final Looper me = myLooper(); ... 省略部分檢查 for (;;) { // @ 查看 loopOnce if (!loopOnce(me, ident, thresholdOverride)) { return; } } } ``` * `loopOnce` 函數:不斷從 MessageQueue 中讀取訊息並處理 (`next` 函數),**如果沒有訊息,則 Thread 會在 `next` 函數中休眠** ```java= // Looper.java private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) { // 可能在 next 中休眠 Message msg = me.mQueue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return false; } ... 省略部分 return true; } ``` > ![](https://i.imgur.com/AsT5RDD.png) ### Java 層 [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) - next 讀取訊息 * MessageQueue#next 函數:讀取 MessageQueue 中待處理的 Message,在查看 Native 函數之前,Java 層取 Message 重點流程如下 1. **呼叫 `nativePollOnce` JNI 函數**,**有可能在該函數中休眠** > 之後小節在分析 2. **首先處理沒有 Handle 並且 ++非異步的 Message++**;沒 Message 則 `nextPollTimeoutMillis` 設定為 -1 繼續循環 (不休眠) 3. 如果有同步 Message 還有會以下兩種情況 * `當前時間` > `Message 要求處理時間`:立刻處理 * `當前時間` < `Message 要求處理時間`:計算需等待的時間,並用等待的時間去處理 Binder 訊息 | 狀況 | 剩餘時間 nextPollTimeoutMillis | 是否休眠 | | - | - | - | | 沒 Message | -1 | 不休眠,繼續循環 | | 有 Message | 某個時間 | 不休眠,去處理 Binder 訊息 | | 有 Message | 0 | 立刻處理該 Message | ```java= // MessageQueue.java // mPtr 就是 NativeMessageQueue 對象的地址 private long mPtr; // used by native code // 是否要讓該 Thread 休眠 private boolean mBlocked; // 準備要處理的 Message Message mMessages; private native void nativePollOnce(long ptr, int timeoutMillis); Message next() { // 檢查 Native 對象是否已建立 final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration // 下一個訊息的時間 int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { // 讓當前 Thread 去處理 Binder 事務 Binder.flushPendingCommands(); } // @ 查看 nativePollOnce 函數 nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; // 在 MessageQueue 中找到同訊訊息 if (msg != null && msg.target == null) { do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { // 查看比對時間 if (now < msg.when) { // 計算當前 msg 需等待多長時間 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // 立刻處理 Messsage mBlocked = false; // 不休眠 if (prevMsg != null) { // 串接上一個訊息,到當前的下一個 prevMsg.next = msg.next; } else { // 下一個要處理的 Message mMessages = msg.next; } msg.next = null; ... log msg msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } ... 省略 IdleHandler } ... 省略 IdleHandler nextPollTimeoutMillis = 0; } } ``` > ![](https://i.imgur.com/VOIHDmN.png) ### Native 層 [NativeMessageQueue](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) - 抓取事件 * **`nativePollOnce` 函數**:對應 `android_os_MessageQueue_nativePollOnce` 函數 1. 首先會轉換存在 Java 層的 `mPtr` 地址,為一個 `NativeMessageQueue` 對象 2. 呼叫 NativeMessageQueue#`pollOnce` 函數 ```cpp= // android_os_MessageQueue.cpp static const JNINativeMethod gMessageQueueMethods[] = { ... { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce }, ... }; static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { // 轉換指針 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); // 查看 pollOnce 函數 nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } ``` * `pollOnce` 函數:呼叫 Looper#pollOnce 函數 ```cpp= // android_os_MessageQueue.h class MessageQueue : public virtual RefBase { public: protected: sp<Looper> mLooper; }; // ---------------------------------------------------------------------- // android_os_MessageQueue.cpp void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; // @ 查看 pollOnce mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } } ``` > ![](https://i.imgur.com/NO6o5s5.png) ### Native 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) - Epoll 監聽 Event、或休眠 * Looper#pollOnce 函數: ```cpp= // Looper.h int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData); inline int pollOnce(int timeoutMillis) { // 查看 @ pollOnce 實現 return pollOnce(timeoutMillis, nullptr, nullptr, nullptr); } // --------------------------------------------------------------------- // Looper.cpp int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { ... 省略部分 // @ 查看 pollInner 函數 result = pollInner(timeoutMillis); } } ``` * **Looper#`pollInner` 函數**:讀取透過 epoll 監聽 Looper 創建的檔案描述 FD (建構函數時創建的 Epoll、Event) * 如果 Epoll 監聽的 FD 內,IO 讀寫事件為 **EPOLLIN** 代表有其他 Thread 對該 Looper (或是說 Event) 發送事件,這時就會透過 **`awoken` 函數讀取事件** ```cpp= // Looper.cpp static const int EPOLL_MAX_EVENTS = 16; constexpr uint64_t WAKE_EVENT_FD_SEQ = 1; int Looper::pollInner(int timeoutMillis) { ... // Poll. int result = POLL_WAKE; ... // We are about to idle. mPolling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 16 // 查看 epoll 監聽的 FD 檔案描述 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mPolling = false; ... 省略部分 for (int i = 0; i < eventCount; i++) { const SequenceNumber seq = eventItems[i].data.u64; uint32_t epollEvents = eventItems[i].events; if (seq == WAKE_EVENT_FD_SEQ) { // 1 Event 數量 if (epollEvents & EPOLLIN) { // 有其他 Thread 對該 Looper 寫入訊息 // @ 查看 awoken 函數 awoken(); } else { ... log msg } } else { ... 省略部分 } } ... return result; } ``` * Looper#`awoken` 函數:**透過 `read` 函數,讀取資料到 Event 中** :::success **如果 Epoll 沒有監聽到新事件,則就進行 ++休眠++** ::: ```cpp= // Looper.cpp void Looper::awoken() { ... uint64_t counter; // 讀取資料到 Event 中 TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t))); } ``` > ![](https://i.imgur.com/R7cKG7w.png) ## Thread 發送訊息 一般來將我們發送訊息會透過 Handler,對指定 Looper 發送訊息 (可以對不同 Looper 發送訊息),下面這是一段,對 Android Main Thread 發送訊息的程式 ```java= private final Handler h = new Handler(Looper.getMainLooper()); public void sendData() { // 對 Android Main Thread 發送訊息 h.sendMessage(Message.obtain(h, new Runnable() { @Override public void run() { //TODO } })); } ``` > ![](https://i.imgur.com/VUq9gAY.png) ### Java 層 [Handle](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Handler.java) 發送訊息 - sendMessage * 從 Handler#sendMessage 函數:最終會呼叫到 MessageQueue#enqueueMessage 函數,把使用者的訊息添加進去 ```java= // Handler.java public final boolean sendMessage(@NonNull Message msg) { // @ 分析 sendMessageDelayed 方法 return sendMessageDelayed(msg, 0); } public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } // 在指定時間發送 (當前時間 + 延遲時間) // @ 查看 sendMessageAtTime 方法 return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); } public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { ... 沒有 MessageQueue 會拋錯 return false; } // @ 查看 enqueueMessage return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg, long uptimeMillis) { msg.target = this; msg.workSourceUid = ThreadLocalWorkSource.getUid(); if (mAsynchronous) { msg.setAsynchronous(true); } // @ 查看 enqueueMessage 方法 return queue.enqueueMessage(msg, uptimeMillis); } ``` > ![](https://i.imgur.com/7P0m2gi.png) ### Java 層 [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) - 按照執行時間排序 Message * **MessageQueue#`enqueueMessage` 函數**:會有以下情況,並會判斷是否需要喚醒 NativeMessageQueue * 當前沒有 Message 要處理、時間小於當前要處理的 Message > 替換當前 Message,並將當前 Message 放在傳入的 Message 後面 * 當前有 Message 要處理 > 遍歷列表,**比對時間找到合適的插入點** ```java= // MessageQueue.java private native static void nativeWake(long ptr); boolean enqueueMessage(Message msg, long when) { ... synchronized (this) { // 對象同步鎖 ... // 標記該 Message 對象已經被使用 msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; // 當前沒有 Message 要處理 // 時間小於當前要處理的 Message if (p == null || when == 0 || when < p.when) {] // 替換當前要處理的 Message msg.next = p; mMessages = msg; needWake = mBlocked; } else { needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; // 找尋插入點 if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { // @ 查看 nativeWake 函數 nativeWake(mPtr); } } return true; } ``` ### Native 層 [NativeMessageQueue](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) - 喚醒 Looper * `nativeWake` 函數:透過 `android_os_MessageQueue_nativeWake` 函數實現,轉換 Java 層除儲存的 `mPtr` 為 NativeMessageQueue 對象 並呼叫 NativeMessageQueue#`wake` 函數 ```cpp= // android_os_MessageQueue.cpp static const JNINativeMethod gMessageQueueMethods[] = { ... { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake }, ... }; static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { // 轉換 Java 層除儲存的 `mPtr` 為 NativeMessageQueue 對象 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); // @ 查看 wake nativeMessageQueue->wake(); } void NativeMessageQueue::wake() { // @ 查看 wake mLooper->wake(); } ``` > ![](https://i.imgur.com/uh9Sg5B.png) ### Native 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) - 緩醒 Epoll * **Looper#`wake` 函數**:透過 Write 函數,對 Event 寫入事件,這個寫入動作會喚醒監聽 Event 的 Epoll 對象 ```cpp= // Looper.cpp void Looper::wake() { ... uint64_t inc = 1; // 透過 Write 函數,對 Event 寫入事件 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t))); // 檢查是否寫入成功 if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s", mWakeEventFd.get(), nWrite, strerror(errno)); } } } ``` > ![](https://i.imgur.com/4YJDEfZ.png) ## Appendix & FAQ :::info ::: ###### tags: `Android 系統`