---
title: 'Looper & MessageQueue 消息機制'
disqus: kyleAlien
---
Looper & MessageQueue 消息機制
===
## OverView of Content
這章節主要是分析 Native 層的 Looper & MessageQueue,並且看看它們是如何與 Java 層協作 (這裡不會詳細說明 Handle)
:::success
當然最好也先理解 Android Handle 機制,Handle 請參考 [**Android Handler 消息機制**](https://hackmd.io/7fBX6uEtQt6AzCpuBWTHMQ?view)
:::
[TOC]
## Java - 消息 Looper
* 我們知道要創建一個新的 Looper 才能傳遞訊息 (Main Thread 也是,透過 Framework 層幫我們創建好 Looper);以下是一段 Thread 創建新 Looper 的過程,以下要注意幾點
1. 一個 Thread 只能創建一個 Looper,否則會拋出錯誤
2. 每個進程都只會有一個 MainLooper,其他 Thread 要創建 Looper 要使用 `Looper.prepare()`
```java=
public class ThreadLooperCreator extends Thread implements Handler.Callback {
private static final int MSG_1 = 0x1234;
private Handler handler;
public ThreadLooperCreator() {
prepareLooper();
}
private void prepareLooper() {
Looper.prepare();
// 創建一個 Handle
// 指定它接收當前 Looper 的訊息
handler = new Handler(Looper.myLooper(), this);
Looper.loop();
}
@Override
public boolean handleMessage(@NonNull Message msg) {
switch (msg.what) {
case MSG_1:
// TODO:
break;
}
return true;
}
@Override
public void run() {
// 發送 Msg
handler.sendEmptyMessage(MSG_1);
}
}
```
### Native & Java - 消息建立關係
* 從上面我們知道一個新 Thread 要傳遞消息,要創建 Looper 對象 (Java 層),而這個 Looper 的創建關連到許多事情,其中就包括了 Native 的消息機制
這邊直接列出結果關係圖再開始分析
> 
## Native Looper 創建流程
### Java 層 Looper - 創建 Looper、MessageQueue
* [**Looper**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java)#prepare 方法:檢查該 Thread 是否已經創建過 Looper,沒有的話則創建一個 Looper 對象 (Java 層)
```java=
// Looper.java
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
public static void prepare() {
// @ 查看 prepare
prepare(true);
}
private static void prepare(boolean quitAllowed) {
// 一個 Thread 只能創建一個 Looper
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
// @ 查看 Looper 建構函數
sThreadLocal.set(new Looper(quitAllowed));
}
```
* [**Looper**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java) 建構函數:指定目前 Thread 為 Looper Thread,並 **創建 MessageQueue 對象**
```java=
// Looper.java
private Looper(boolean quitAllowed) {
// 創建 MessageQueue 對象
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
```
* [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) 建構函數:創建 Native 層的 MessageQueue,並將地址儲存在 `mPtr` 成員中
```java=
// MessageQueue.java
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
// 呼叫 JNI
private native static long nativeInit();
```
### Native 層 - 創建 NativeMessageQueue、Looper
* 從 MessageQueue#nativeInit JNI 函數開始分析,該函數的實現在 [**android_os_MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp),它會創建 [**NativeMessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) 對象,並將對像地址返回給 Java 層儲存 (存在 `mPtr` 中)
```cpp=
// android_os_MessageQueue.cpp
static const JNINativeMethod gMessageQueueMethods[] = {
// 對應實現函數為 android_os_MessageQueue_nativeInit
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
... 省略部分
};
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
// 返回 Java 層
return reinterpret_cast<jlong>(nativeMessageQueue);
}
```
* **NativeMessageQueue 建構函數**:創建 Native Looper 對象 (Native 層的 Looper) !
:::info
從這邊可以看到 NativeMessageQueue、MessageQueue 都繼承於 RefBase,所以可以使用智能指標控制生命週期
:::
```cpp=
// android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 查看當前 Thread 是否有 Looper
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 創建 Looper
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
// --------------------------------------------------------------
// android_os_MessageQueue.h
class MessageQueue : public virtual RefBase {
public:
...
protected:
sp<Looper> mLooper;
};
// --------------------------------------------------------------
// Looper.cpp
sp<Looper> Looper::getForThread() {
// 一個 Thread 對應一個 Looper
int result = pthread_once(& gTLSOnce, initTLSKey);
// 返回 0 裁掉表成功
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
Looper* looper = (Looper*)pthread_getspecific(gTLSKey);
return sp<Looper>::fromExisting(looper);
}
void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
if (looper != nullptr) {
// 增加強指針
looper->incStrong((void*)threadDestructor);
}
pthread_setspecific(gTLSKey, looper.get());
if (old != nullptr) {
old->decStrong((void*)threadDestructor);
}
}
```
### Native 層 Looper - 創建 Event、Epoll
* **[Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) 建構函數**:創建 Event、Epoll
| 對象 | 功能說明 | 補充 |
| - | - | - |
| Event | 監聽訊息事件 | 沒有訊息時就掛起等待,收到消息時通知所有監聽者 |
| Epoll | Epoll 監聽 Event 並取出訊息 | Linux Epoll 是 Select 的加強版,使用紅樹儲存,**可監聽大量 FD 檔案描述**,可降低 CPU 使用率 |
```cpp=
// Looper.h
class Looper : public RefBase {
...
private:
android::base::unique_fd mWakeEventFd; // immutable
Mutex mLock;
android::base::unique_fd mEpollFd;
}
// ---------------------------------------------------------------
// Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 創建 event,將 Event 的 fd 存到 mWakeEventFd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
... log msg
AutoMutex _l(mLock);
// @ 查看 rebuildEpollLocked 函數
rebuildEpollLocked();
}
```
* `rebuildEpollLocked` 函數:創建 Epoll 對象,並讓 Epoll 監聽 Event 事件
```cpp=
// Looper.cpp
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
... log msg
mEpollFd.reset(); // 關閉舊的 Epoll
}
// Allocate the new epoll instance and register the .
// 創建新的 Epoll 對象
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
... log msg
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// epoll 註冊監聽 WakeEventFd (上面創建的 Event 對象)
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
... 省略部分
}
```
:::success
* 這裡為何要使用 Epoll,明明 Looper 只有創建一個 Event ?
Looper 可以註冊多的 FD 檔案描述 (透過 `Looper#addFd` 函數),**也就是說可以多個 FD 監聽 Looper 的訊息是件**
:::
## Looper 接收事件
Looper#loop 函數最終仍是透過讀取 Epoll 監聽的 Event 事件來做事,如果沒有新事件,也會讓當前 Thread 進行休眠
> 
### Java 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Looper.java) - loop 無限輪迴
* 從上面我們可以知道 Java 層透過 `Looper#loop` 函數開始無限循環,透過 `loopOnce` 讀取訊息;如果 `loopOnce` 回覆 false 則退出迴圈
```java=
// Looper.java
public static void loop() {
final Looper me = myLooper();
... 省略部分檢查
for (;;) {
// @ 查看 loopOnce
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
```
* `loopOnce` 函數:不斷從 MessageQueue 中讀取訊息並處理 (`next` 函數),**如果沒有訊息,則 Thread 會在 `next` 函數中休眠**
```java=
// Looper.java
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
// 可能在 next 中休眠
Message msg = me.mQueue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return false;
}
... 省略部分
return true;
}
```
> 
### Java 層 [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) - next 讀取訊息
* MessageQueue#next 函數:讀取 MessageQueue 中待處理的 Message,在查看 Native 函數之前,Java 層取 Message 重點流程如下
1. **呼叫 `nativePollOnce` JNI 函數**,**有可能在該函數中休眠**
> 之後小節在分析
2. **首先處理沒有 Handle 並且 ++非異步的 Message++**;沒 Message 則 `nextPollTimeoutMillis` 設定為 -1 繼續循環 (不休眠)
3. 如果有同步 Message 還有會以下兩種情況
* `當前時間` > `Message 要求處理時間`:立刻處理
* `當前時間` < `Message 要求處理時間`:計算需等待的時間,並用等待的時間去處理 Binder 訊息
| 狀況 | 剩餘時間 nextPollTimeoutMillis | 是否休眠 |
| - | - | - |
| 沒 Message | -1 | 不休眠,繼續循環 |
| 有 Message | 某個時間 | 不休眠,去處理 Binder 訊息 |
| 有 Message | 0 | 立刻處理該 Message |
```java=
// MessageQueue.java
// mPtr 就是 NativeMessageQueue 對象的地址
private long mPtr; // used by native code
// 是否要讓該 Thread 休眠
private boolean mBlocked;
// 準備要處理的 Message
Message mMessages;
private native void nativePollOnce(long ptr, int timeoutMillis);
Message next() {
// 檢查 Native 對象是否已建立
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
// 下一個訊息的時間
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
// 讓當前 Thread 去處理 Binder 事務
Binder.flushPendingCommands();
}
// @ 查看 nativePollOnce 函數
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 在 MessageQueue 中找到同訊訊息
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 查看比對時間
if (now < msg.when) {
// 計算當前 msg 需等待多長時間
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 立刻處理 Messsage
mBlocked = false; // 不休眠
if (prevMsg != null) {
// 串接上一個訊息,到當前的下一個
prevMsg.next = msg.next;
} else {
// 下一個要處理的 Message
mMessages = msg.next;
}
msg.next = null;
... log msg
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
... 省略 IdleHandler
}
... 省略 IdleHandler
nextPollTimeoutMillis = 0;
}
}
```
> 
### Native 層 [NativeMessageQueue](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) - 抓取事件
* **`nativePollOnce` 函數**:對應 `android_os_MessageQueue_nativePollOnce` 函數
1. 首先會轉換存在 Java 層的 `mPtr` 地址,為一個 `NativeMessageQueue` 對象
2. 呼叫 NativeMessageQueue#`pollOnce` 函數
```cpp=
// android_os_MessageQueue.cpp
static const JNINativeMethod gMessageQueueMethods[] = {
...
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
...
};
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
// 轉換指針
NativeMessageQueue* nativeMessageQueue =
reinterpret_cast<NativeMessageQueue*>(ptr);
// 查看 pollOnce 函數
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
```
* `pollOnce` 函數:呼叫 Looper#pollOnce 函數
```cpp=
// android_os_MessageQueue.h
class MessageQueue : public virtual RefBase {
public:
protected:
sp<Looper> mLooper;
};
// ----------------------------------------------------------------------
// android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// @ 查看 pollOnce
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
```
> 
### Native 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) - Epoll 監聽 Event、或休眠
* Looper#pollOnce 函數:
```cpp=
// Looper.h
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
// 查看 @ pollOnce 實現
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
// ---------------------------------------------------------------------
// Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
... 省略部分
// @ 查看 pollInner 函數
result = pollInner(timeoutMillis);
}
}
```
* **Looper#`pollInner` 函數**:讀取透過 epoll 監聽 Looper 創建的檔案描述 FD (建構函數時創建的 Epoll、Event)
* 如果 Epoll 監聽的 FD 內,IO 讀寫事件為 **EPOLLIN** 代表有其他 Thread 對該 Looper (或是說 Event) 發送事件,這時就會透過 **`awoken` 函數讀取事件**
```cpp=
// Looper.cpp
static const int EPOLL_MAX_EVENTS = 16;
constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;
int Looper::pollInner(int timeoutMillis) {
...
// Poll.
int result = POLL_WAKE;
...
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // 16
// 查看 epoll 監聽的 FD 檔案描述
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
... 省略部分
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
if (seq == WAKE_EVENT_FD_SEQ) { // 1 Event 數量
if (epollEvents & EPOLLIN) { // 有其他 Thread 對該 Looper 寫入訊息
// @ 查看 awoken 函數
awoken();
} else {
... log msg
}
} else {
... 省略部分
}
}
...
return result;
}
```
* Looper#`awoken` 函數:**透過 `read` 函數,讀取資料到 Event 中**
:::success
**如果 Epoll 沒有監聽到新事件,則就進行 ++休眠++**
:::
```cpp=
// Looper.cpp
void Looper::awoken() {
...
uint64_t counter;
// 讀取資料到 Event 中
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
```
> 
## Thread 發送訊息
一般來將我們發送訊息會透過 Handler,對指定 Looper 發送訊息 (可以對不同 Looper 發送訊息),下面這是一段,對 Android Main Thread 發送訊息的程式
```java=
private final Handler h = new Handler(Looper.getMainLooper());
public void sendData() {
// 對 Android Main Thread 發送訊息
h.sendMessage(Message.obtain(h, new Runnable() {
@Override
public void run() {
//TODO
}
}));
}
```
> 
### Java 層 [Handle](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/Handler.java) 發送訊息 - sendMessage
* 從 Handler#sendMessage 函數:最終會呼叫到 MessageQueue#enqueueMessage 函數,把使用者的訊息添加進去
```java=
// Handler.java
public final boolean sendMessage(@NonNull Message msg) {
// @ 分析 sendMessageDelayed 方法
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
// 在指定時間發送 (當前時間 + 延遲時間)
// @ 查看 sendMessageAtTime 方法
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
... 沒有 MessageQueue 會拋錯
return false;
}
// @ 查看 enqueueMessage
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
// @ 查看 enqueueMessage 方法
return queue.enqueueMessage(msg, uptimeMillis);
}
```
> 
### Java 層 [**MessageQueue**](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/java/android/os/MessageQueue.java) - 按照執行時間排序 Message
* **MessageQueue#`enqueueMessage` 函數**:會有以下情況,並會判斷是否需要喚醒 NativeMessageQueue
* 當前沒有 Message 要處理、時間小於當前要處理的 Message
> 替換當前 Message,並將當前 Message 放在傳入的 Message 後面
* 當前有 Message 要處理
> 遍歷列表,**比對時間找到合適的插入點**
```java=
// MessageQueue.java
private native static void nativeWake(long ptr);
boolean enqueueMessage(Message msg, long when) {
...
synchronized (this) { // 對象同步鎖
...
// 標記該 Message 對象已經被使用
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
// 當前沒有 Message 要處理
// 時間小於當前要處理的 Message
if (p == null || when == 0 || when < p.when) {]
// 替換當前要處理的 Message
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
// 找尋插入點
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
// @ 查看 nativeWake 函數
nativeWake(mPtr);
}
}
return true;
}
```
### Native 層 [NativeMessageQueue](https://cs.android.com/android/platform/superproject/+/master:frameworks/base/core/jni/android_os_MessageQueue.cpp) - 喚醒 Looper
* `nativeWake` 函數:透過 `android_os_MessageQueue_nativeWake` 函數實現,轉換 Java 層除儲存的 `mPtr` 為 NativeMessageQueue 對象
並呼叫 NativeMessageQueue#`wake` 函數
```cpp=
// android_os_MessageQueue.cpp
static const JNINativeMethod gMessageQueueMethods[] = {
...
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
...
};
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 轉換 Java 層除儲存的 `mPtr` 為 NativeMessageQueue 對象
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// @ 查看 wake
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
// @ 查看 wake
mLooper->wake();
}
```
> 
### Native 層 [Looper](https://cs.android.com/android/platform/superproject/+/master:system/core/libutils/Looper.cpp) - 緩醒 Epoll
* **Looper#`wake` 函數**:透過 Write 函數,對 Event 寫入事件,這個寫入動作會喚醒監聽 Event 的 Epoll 對象
```cpp=
// Looper.cpp
void Looper::wake() {
...
uint64_t inc = 1;
// 透過 Write 函數,對 Event 寫入事件
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
// 檢查是否寫入成功
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
```
> 
## Appendix & FAQ
:::info
:::
###### tags: `Android 系統`