---
title: 'RxJava 源碼研究'
disqus: kyleAlien
---
RxJava 源碼研究
===
## OverView of Content
如有引用參考請詳註出處,感謝 :smile:
[TOC]
## 設計模式
* 如同 OOP 的設計模式其一,[**觀察者模式**](https://hackmd.io/sNb3SbepRRejTYY57i_Fdw),多對象依賴一個狀態,當改變了狀態依賴的對象就會收到通知並更新
* 內部實現的卡片是調用,則是使用了 [**裝飾器模式**](https://hackmd.io/DlDU-niGRg-0p-dw-BqxKQ),一層一層的解析
### 觀察者模式 & 發佈訂閱模式
以下以一張簡單的比較表格可看出它的使用差異,比較 JDK 觀察者模式 & RxJava 官者模式
| 類型 | 觀察者 | 被觀察者 | 驅動者 |
| - | -------- | -------- | -------- |
| JDK | Observer | Observable | setChanged + notifyObservers |
| RxJava | Observer | Observable | subscribe |
* [**觀察者模式**](https://hackmd.io/sNb3SbepRRejTYY57i_Fdw?view)
> 被觀察者內部存在觀察者清單 (List 容器),當改變數據時通知所有的觀察者
>
> 
* 發佈訂閱模式
> 被觀察者內部 **==不會持有清單,並且允許有多個被觀察者==**,而是 **透過創建發射器發佈訊息到每一層 Observable,最後發至 Observer**
>
> 
### 裝飾器模式
* 在 RxJava 中裝飾器模式就相當重要,它使用在 **流式布局的卡片創建**,**會把==上一層的卡片做包裝,在傳入下一層中==**
* 類似於洋蔥模型,但是稍微不一樣的地方在於,**解開包裝 (傳遞訊息) 時是 ++從最內層開始解開++,會先深入最內層,解到最外層**
> 
## 主概念分析
1. 先研究 Observable & Observer 是如何產生關係的
2. 它們又是如何接收、發送訊息的
```java=
// 用一般我們使用的方式來分析
public class MyClass {
public static void main(String[] args) {
// 創建被觀察者 Observable
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
// subscribe 訂閱,並使用匿名觀察者 Observer
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
```
### 時序圖
* 以上面範例程式畫出的時序圖 (這也可以看出 subscribe 如何驅使該關係成立的)
> 
1. 靠使用者手動注入 ObservableOnSubscribe 接口
2. 內部自己創建 create 創建被觀察者 ObservableCreate
3. 當註冊 subscrible 時,**內部創建發射器 CreateEmitter**
> a. 先通知觀察者,以訂閱 (呼叫 onSubscribe 方法)
> b. 再通知被觀察者 (您已被訂閱),發出通知觀察者 (呼叫 subscribe)
4. 通過訂閱連結後,就可以發送 onNext 通知到觀察者去
* 透過 subscrible 的觸發 new 一個發射器,並對上一層包裝的發射器做出 onNext / onSubscribe 的動作
### RxJavaPlugins
* **RxJavaPlugins 為全局 hook 的方法**,從以下 Observable.create 這個方法進入就可以看到
```java=
// 從 create 進入
Observable.create()
// create 實現 RxJavaPlugins.onAssembly
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// onAssembly 實現
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// onObservableAssembly 預設為 null
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
// 依照預設該方法一定不會執行
return apply(f, source);
}
return source;
}
// 變量 onObservableAssembly 被覆值得地方
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
```
* 由上面可得知經由 setOnObservableAssembly 這個方法可以直接設定整個 RxJava Hook 的函數(當然 **++RxJavaPlugins 內部還有許多的 Hook 函數設定++**),以下來實現
```java=
public class MyClass {
public static void main(String[] args) {
// 設定被觀察時就 Hook ~!
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("RxJava 全局監聽 !!! -> " + observable);
return observable; // 返回 null 就不能鏈式調用了
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return null;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
```
**--實做結果--**
會發現每一次的都會經過這個 hook Function (如果返回 null 就不能鏈式調用了)
> 
### Observable & Observer
* 觀察它是如何使用裝飾器模式,讓每一層級產生關係的,先看看程式的結果
```java=
public class MyClass {
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello World");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
System.out.print(s);
return s.length();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer s) {
System.out.println(", Len: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
```
**--實做結果--**
> 
* **Observable & Observer 是如何透過 subscribe 產生關係**的,如以下概念圖注意線條顏色
紅:包裝,籃:深入,綠:拆包裝
> 
* 它們又是如何接收、發送訊息的
在一開始產生被觀察者時是一路的包裝,**使用==裝飾模式==++把該類層層包裝++**,當訂閱產生後 subcribe 訂閱後就會產生反向呼叫,解開該包裝,類似於網路七層模型
> 
## 線程切換
在 RxJava 中主要有兩個切換主線程 & 子線程的方法,在 Android 中要使用必須加入依賴
```groovy=
# 佔了 80% 主要
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
# 補滿 20% Android 專用的 RxJava
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
```
| Function | 功能 |
| -------- | -------- |
| subscribeOn(Schedulers. io()) | 切換子線程 |
| observeOn(AndroidSchedulers.mainThread()) | 切換主線程 |
以下會分成切換子線程、主線程兩個部份,懂了一個另一個概念就相同了
### io 子線程
先來看看 Schedulers. io 的內容,其實它就是取得一個屬性,讓我們看看該屬性是如何建構的(它返回的是 Scheduler 類),可看出**它使用 Callable 包裝需要的 IOScheduler 類,在需要時才創建,屬於懶加載** (==透過 initIoScheduler 加載==)
> 
```java=
Observable.<String>create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
// 差異在這裡,讓操作跑在 WorkThread 中
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
```
* 接著來分析 **subscribeOn**,看看時序圖(以下省略 RxJavaPlugins,只須知道它有全局 hook 的能力即可),注意黃色部份,**黃色包裹的是 Runnable 任務,之後將會被線程池調用**,在下面可以看到它包裹了兩層的,第一層包裝的是主要任務
> 
* 上面的時序圖看起來很繞(真的繞),現在來看看概念圖,多加了一層 Map,其概念與基礎相同,差異在包裝了 Runnable 任務(黃色代表被包裝的任務)
> 
:::info
這裡可以看到主要任務經過兩層的 Runnable 包裝,分別是 DisposeTask、Subcri
beTask 兩種
> 
:::
* 接下來看實際的 Log 結果,可以發現全部都切換成子線程執行,要注意 **onSubscribe 是執行在主線程**
```java=
Observable.<String>create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Yo~ ");
Log.d("Test123", "subscribe: " + Thread.currentThread().getName());
}
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("Test123", "map apply: " + Thread.currentThread().getName());
return "Hello World";
}
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe 是運行在主線程 ~
Log.d("Test123", "Observer onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d("Test123", "Observer onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
```
**--結果--**
> 
### mainThread 主線程
先來看看 AndroidSchedulers.mainThread 這個變數是如何被建構出來的,這裡有使用到 Handler 所有要複習一下 [**Handler**](https://hackmd.io/7fBX6uEtQt6AzCpuBWTHMQ?view),並且**它使用的 Looper 是 ++MainLooper++ 代表一定會傳入主線程中處理**
> 
```java=
Observable.<String>create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
// 差異在這裡
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
```
* 接著分析 **observeOn**,以下是時序圖(忽略 RxJavaPlugins)
> 
* 這裡也可以看到它包裝了兩層 Runnable 方法,而**其重點在於它調用了主線程的 Handler 傳送訊息**
```java=
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
run = RxJavaPlugins.onSchedule(run);
// 在包裝一層 Runnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 這裡才是重點,製作資料,使用 obtain(Handler, Runnable)
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 傳送到主線程
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
...
}
```
> 
## 結論
* RxJava 的卡片式(裝飾模式)調用真的滿繞的,不過在使用者來說就很單純,可以使用鏈式調用,但是相對的,**包裝太多層會導致下率下降**,仍須多加考量
## Appendix & FAQ
:::info
:::
###### tags: `Android 第三方庫` `Java 基礎進階`