--- title: 'RxJava 源碼研究' disqus: kyleAlien --- RxJava 源碼研究 === ## OverView of Content 如有引用參考請詳註出處,感謝 :smile: [TOC] ## 設計模式 * 如同 OOP 的設計模式其一,[**觀察者模式**](https://hackmd.io/sNb3SbepRRejTYY57i_Fdw),多對象依賴一個狀態,當改變了狀態依賴的對象就會收到通知並更新 * 內部實現的卡片是調用,則是使用了 [**裝飾器模式**](https://hackmd.io/DlDU-niGRg-0p-dw-BqxKQ),一層一層的解析 ### 觀察者模式 & 發佈訂閱模式 以下以一張簡單的比較表格可看出它的使用差異,比較 JDK 觀察者模式 & RxJava 官者模式 | 類型 | 觀察者 | 被觀察者 | 驅動者 | | - | -------- | -------- | -------- | | JDK | Observer | Observable | setChanged + notifyObservers | | RxJava | Observer | Observable | subscribe | * [**觀察者模式**](https://hackmd.io/sNb3SbepRRejTYY57i_Fdw?view) > 被觀察者內部存在觀察者清單 (List 容器),當改變數據時通知所有的觀察者 > > ![](https://i.imgur.com/4Z9PlI6.png) * 發佈訂閱模式 > 被觀察者內部 **==不會持有清單,並且允許有多個被觀察者==**,而是 **透過創建發射器發佈訊息到每一層 Observable,最後發至 Observer** > > ![](https://i.imgur.com/YW0tHbW.png) ### 裝飾器模式 * 在 RxJava 中裝飾器模式就相當重要,它使用在 **流式布局的卡片創建**,**會把==上一層的卡片做包裝,在傳入下一層中==** * 類似於洋蔥模型,但是稍微不一樣的地方在於,**解開包裝 (傳遞訊息) 時是 ++從最內層開始解開++,會先深入最內層,解到最外層** > ![](https://i.imgur.com/LR3soxB.png) ## 主概念分析 1. 先研究 Observable & Observer 是如何產生關係的 2. 它們又是如何接收、發送訊息的 ```java= // 用一般我們使用的方式來分析 public class MyClass { public static void main(String[] args) { // 創建被觀察者 Observable Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { } }) // subscribe 訂閱,並使用匿名觀察者 Observer .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } } ``` ### 時序圖 * 以上面範例程式畫出的時序圖 (這也可以看出 subscribe 如何驅使該關係成立的) > ![](https://i.imgur.com/qIgDDSN.png) 1. 靠使用者手動注入 ObservableOnSubscribe 接口 2. 內部自己創建 create 創建被觀察者 ObservableCreate 3. 當註冊 subscrible 時,**內部創建發射器 CreateEmitter** > a. 先通知觀察者,以訂閱 (呼叫 onSubscribe 方法) > b. 再通知被觀察者 (您已被訂閱),發出通知觀察者 (呼叫 subscribe) 4. 通過訂閱連結後,就可以發送 onNext 通知到觀察者去 * 透過 subscrible 的觸發 new 一個發射器,並對上一層包裝的發射器做出 onNext / onSubscribe 的動作 ### RxJavaPlugins * **RxJavaPlugins 為全局 hook 的方法**,從以下 Observable.create 這個方法進入就可以看到 ```java= // 從 create 進入 Observable.create() // create 實現 RxJavaPlugins.onAssembly public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } // onAssembly 實現 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { // onObservableAssembly 預設為 null Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { // 依照預設該方法一定不會執行 return apply(f, source); } return source; } // 變量 onObservableAssembly 被覆值得地方 public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; } ``` * 由上面可得知經由 setOnObservableAssembly 這個方法可以直接設定整個 RxJava Hook 的函數(當然 **++RxJavaPlugins 內部還有許多的 Hook 函數設定++**),以下來實現 ```java= public class MyClass { public static void main(String[] args) { // 設定被觀察時就 Hook ~! RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() { @Override public Observable apply(Observable observable) throws Exception { System.out.println("RxJava 全局監聽 !!! -> " + observable); return observable; // 返回 null 就不能鏈式調用了 } }); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return null; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } } ``` **--實做結果--** 會發現每一次的都會經過這個 hook Function (如果返回 null 就不能鏈式調用了) > ![](https://i.imgur.com/Y5SBoZA.png) ### Observable & Observer * 觀察它是如何使用裝飾器模式,讓每一層級產生關係的,先看看程式的結果 ```java= public class MyClass { public static void main(String[] args) { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Hello World"); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { System.out.print(s); return s.length(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer s) { System.out.println(", Len: " + s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } } ``` **--實做結果--** > ![](https://i.imgur.com/hwS66uG.png) * **Observable & Observer 是如何透過 subscribe 產生關係**的,如以下概念圖注意線條顏色 紅:包裝,籃:深入,綠:拆包裝 > ![](https://i.imgur.com/L1Nsz8N.png) * 它們又是如何接收、發送訊息的 在一開始產生被觀察者時是一路的包裝,**使用==裝飾模式==++把該類層層包裝++**,當訂閱產生後 subcribe 訂閱後就會產生反向呼叫,解開該包裝,類似於網路七層模型 > ![](https://i.imgur.com/WhXjo13.png) ## 線程切換 在 RxJava 中主要有兩個切換主線程 & 子線程的方法,在 Android 中要使用必須加入依賴 ```groovy= # 佔了 80% 主要 implementation 'io.reactivex.rxjava2:rxjava:2.1.3' # 補滿 20% Android 專用的 RxJava implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' ``` | Function | 功能 | | -------- | -------- | | subscribeOn(Schedulers. io()) | 切換子線程 | | observeOn(AndroidSchedulers.mainThread()) | 切換主線程 | 以下會分成切換子線程、主線程兩個部份,懂了一個另一個概念就相同了 ### io 子線程 先來看看 Schedulers. io 的內容,其實它就是取得一個屬性,讓我們看看該屬性是如何建構的(它返回的是 Scheduler 類),可看出**它使用 Callable 包裝需要的 IOScheduler 類,在需要時才創建,屬於懶加載** (==透過 initIoScheduler 加載==) > ![](https://i.imgur.com/FD4357J.png) ```java= Observable.<String>create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { } }) // 差異在這裡,讓操作跑在 WorkThread 中 .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); ``` * 接著來分析 **subscribeOn**,看看時序圖(以下省略 RxJavaPlugins,只須知道它有全局 hook 的能力即可),注意黃色部份,**黃色包裹的是 Runnable 任務,之後將會被線程池調用**,在下面可以看到它包裹了兩層的,第一層包裝的是主要任務 > ![](https://i.imgur.com/oaFDwKe.png) * 上面的時序圖看起來很繞(真的繞),現在來看看概念圖,多加了一層 Map,其概念與基礎相同,差異在包裝了 Runnable 任務(黃色代表被包裝的任務) > ![](https://i.imgur.com/hb16cwQ.png) :::info 這裡可以看到主要任務經過兩層的 Runnable 包裝,分別是 DisposeTask、Subcri beTask 兩種 > ![](https://i.imgur.com/MDTdsX9.png) ::: * 接下來看實際的 Log 結果,可以發現全部都切換成子線程執行,要注意 **onSubscribe 是執行在主線程** ```java= Observable.<String>create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Yo~ "); Log.d("Test123", "subscribe: " + Thread.currentThread().getName()); } }).map(new Function<String, String>() { @Override public String apply(String s) throws Exception { Log.d("Test123", "map apply: " + Thread.currentThread().getName()); return "Hello World"; } }).subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { // onSubscribe 是運行在主線程 ~ Log.d("Test123", "Observer onSubscribe: " + Thread.currentThread().getName()); } @Override public void onNext(String s) { Log.d("Test123", "Observer onNext: " + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); ``` **--結果--** > ![](https://i.imgur.com/aTsaIfs.png) ### mainThread 主線程 先來看看 AndroidSchedulers.mainThread 這個變數是如何被建構出來的,這裡有使用到 Handler 所有要複習一下 [**Handler**](https://hackmd.io/7fBX6uEtQt6AzCpuBWTHMQ?view),並且**它使用的 Looper 是 ++MainLooper++ 代表一定會傳入主線程中處理** > ![](https://i.imgur.com/21ZUGdJ.png) ```java= Observable.<String>create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { } }) // 差異在這裡 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); ``` * 接著分析 **observeOn**,以下是時序圖(忽略 RxJavaPlugins) > ![](https://i.imgur.com/6tAMITo.png) * 這裡也可以看到它包裝了兩層 Runnable 方法,而**其重點在於它調用了主線程的 Handler 傳送訊息** ```java= private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ... run = RxJavaPlugins.onSchedule(run); // 在包裝一層 Runnable ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); // 這裡才是重點,製作資料,使用 obtain(Handler, Runnable) Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. // 傳送到主線程 handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } ... } ``` > ![](https://i.imgur.com/zL0Mrqj.png) ## 結論 * RxJava 的卡片式(裝飾模式)調用真的滿繞的,不過在使用者來說就很單純,可以使用鏈式調用,但是相對的,**包裝太多層會導致下率下降**,仍須多加考量 ## Appendix & FAQ :::info ::: ###### tags: `Android 第三方庫` `Java 基礎進階`