--- title: 'RxJava 使用' disqus: kyleAlien --- RxJava 使用 === ## OverView of Content 如有引用參考請詳註出處,感謝 :smile: > [**RxJava** ](https://github.com/ReactiveX/RxJava) 在 Android 中載入的方式為,在 gradle 中載入,RxJava 源碼分析在另外一篇文章 ```java= dependencies { implementation "io.reactivex.rxjava3:rxjava:3.0.0" } ``` [TOC] ## RxJava 介紹 * RxJava 是一種鏈式調用,從頭到尾端連續觸發,有點類似於狀態機模式,**只有當目前目標完成才能做下一個**,**每一個目標只認識前一個對象** > ![](https://i.imgur.com/YtDrjFM.png) * **RxJava 類似於在寫一個狀態的腳本,該腳本從頭到尾端都是串連的** ### 為何要學 & 重點 * 學習的原因 1. 代碼邏輯清晰 2. 避免回調過多 3. 線程調度 * 重點 1. 觀察者模式 2. 線程調度 3. 原理、架構 4. 應用 ## RxJava 基礎使用 | 角色 | 功能 | Example | | --------------------- | ---------------------------------------------- | ------- | | Observable (被觀察者) | 觸發/要求/產生事件 | 顧客 | | Observer (觀察者) | 處理事件 | 研發處 | | Subscribe (訂閱) | 接收被觀察者事件,給觀察者處理,也就是**連接** | FAE | > ![](https://i.imgur.com/fmLg9Bw.png) ### 常用的觀察者模式 | 觀察類 | 被觀察類 | 解釋 | | ------------------- | --------------- | --------------------------------------------------------------------------------------------------- | | Observer | Observable<T\> | 發送 0 or n 個數據,並以成功或錯誤事件終止 | | Subscriber | Flowable<T\> | 發送 0 or n 個數據,並以成功或錯誤事件終止,==支持 Backpressure==,可以**控制數據發送的速度(時間)** | | SingleObserver | Single<T\> | 發送 1 個數據,只有兩種狀況 **onSuccess & onError** 事件 | | CompletableObserver | Completable<T\> | 發送 0 個數據,只處理 **onComplete & onError** 事件,可當成 Rx's Runnable | | MaybeObserver | Maybe<T\> | 發送 0 or 1 個數據,只有兩種狀況 **onComplete & onError** 事件 | ### 匿名接口 Observer ```java= public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); //"1. " Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { //"4. " emitter.onNext("Hello "); emitter.onNext("World "); emitter.onNext("Alien "); } }); //"3. " oble.subscribe( //"2. " new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d("MyObserver", "MyObserver: " + d); } @Override public void onNext(@NonNull String s) { Log.d("MyObserver", "onNext: " + s); } @Override public void onError(@NonNull Throwable e) { Log.d("MyObserver", "onError: " + e); } @Override public void onComplete() { Log.d("MyObserver", "onComplete"); } }); } } ``` 1. 被觀察者可透過靜態方法 create 創建 2. 創建觀察者 Observer 3. 訂閱,以語意來說,意思有點反向,變成了**被觀察者訂閱觀察者**,但是以程式來看,透過==泛型 **super** 限定該類==卻相當合理 > ![](https://i.imgur.com/UzQRVhh.png) 4. 當**訂閱之後會觸發數據方送** > ![](https://i.imgur.com/Ilk07ct.png) **--實作--** > ![](https://i.imgur.com/aPEqUwm.png) ### 鏈式調用 * 類似於 Builder 模式,會返回該類自身,RxJava 建議使用這個方式調用 > ![](https://i.imgur.com/A7jQuYZ.png) ```java= public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello "); emitter.onNext("World "); emitter.onNext("Alien "); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d("MyObserver", "onSubscribe: " + d); } @Override public void onNext(@NonNull String s) { Log.d("MyObserver", "onNext: " + s); } @Override public void onError(@NonNull Throwable e) { Log.d("MyObserver", "onError: " + e); } @Override public void onComplete() { Log.d("MyObserver", "onComplete"); } }); } } ``` > 邏輯簡單,實現出不同調用不同結果,結果同上 ## Subject 類 * Subject 類也是一個抽象類 ```java= public abstract class Subject<T> extends Observable<T> implements Observer<T> { ... } ``` * 既是`觀察者`也被`被觀察者`,所以**不須分開創建** * 它是一個==Hot Observable== > ![](https://i.imgur.com/1uR6MPY.png) | 類明 | 功能 | | ----------------- | -------------------------------------------- | | AsyncSubject() | 只發生在 **onComplete 方法前的最後一個數據** | | BehaviorSubject() | 發送訂閱前的一個數據 and 訂閱後的全部數據 | | ReplaySubject() | 無論何時訂閱都會發送全部數據,也可限定 | | PublishSubject() | **訂閱後**的全部數據 | ### AsyncSubject 實作 ```java= class TestAsyncSubject { AsyncSubject<String> subject = AsyncSubject.create(); void start() { subject.subscribe( new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println("AsyncSubject: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable s) throws Throwable { System.out.println("AsyncSubject Error: " + s.toString()); } }, new Action() { @Override public void run() throws Throwable { System.out.println("AsyncSubject Complete"); } } ); subject.onNext("Hello"); subject.onNext("World"); //"1. " subject.onComplete(); subject.onNext("Alien"); } } ``` 1. onComplete 前的數據是 World,所以只發送 World > ![](https://i.imgur.com/tTinw6D.png) ### BehaviorSubject 實作 ```java= class TestBehaviorSubject { BehaviorSubject<String> subject = BehaviorSubject.create(); void start() { "1. " subject.onNext("Yo"); subject.onNext("Pan"); subject.subscribe( new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println("BehaviorSubject: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable s) throws Throwable { System.out.println("BehaviorSubject Error: " + s.toString()); } }, new Action() { @Override public void run() throws Throwable { System.out.println("BehaviorSubject Complete"); } } ); subject.onNext("Hello"); subject.onNext("World"); subject.onComplete(); subject.onNext("Alien"); } } ``` 1. 在訂閱前有兩筆數據,只取訂閱前的筆 > ![](https://i.imgur.com/xHdXJGD.png) ### ReplaySubject 實作 * 改為 lambda 表達式可更簡短的寫出代碼 ```java= class TestReplaySubject { ReplaySubject<String> subject = ReplaySubject.create(); void start() { subject.onNext("Yo"); subject.onNext("Pan"); subject.subscribe( s -> System.out.println("AsyncSubject: " + s), s -> System.out.println("AsyncSubject Error: " + s.toString()), new Action() { @Override public void run() throws Throwable { System.out.println("AsyncSubject Complete"); } } ); subject.onNext("Hello"); subject.onNext("World"); subject.onComplete(); subject.onNext("Alien"); } } ``` > ![](https://i.imgur.com/OIpWpEc.png) * createWithSize() 可限定數據量 > ![](https://i.imgur.com/7hlkV2n.png) ### PublishSubject 實作 * 可以做預加載,訂閱前的數據不發送,只發送訂閱後的數據 ```java= class TestPublishSubject { PublishSubject<String> subject = PublishSubject.create(); void start() { subject.onNext("Yo"); subject.onNext("Pan"); subject.subscribe( s -> System.out.println("AsyncSubject: " + s), s -> System.out.println("AsyncSubject Error: " + s.toString()), () -> System.out.println("AsyncSubject Complete") ); subject.onNext("Hello"); subject.onNext("World"); subject.onComplete(); subject.onNext("Alien"); } } ``` > ![](https://i.imgur.com/q7Z82rj.png) ## 操作符 | 操作符 | 解釋 | | ---------------- | ---------------------------------------------------------------------- | | doOnSubscribe | 當訂閱者訂閱時可通過此方法監聽 | | doOnLifecycle | 在觀察者訂閱後,設置是否取消訂閱 | | doOnNext | 在每次發送訊息前,**做預前處理**,不改變發送的訊息,只**加入一個監聽** | | doOnEach | 對發送元素和通知進行統一封裝,每發送一次數據就調用一次 | | doAfterNext | onNext 後執行 | | doOnCompleted | 做 onComplete 之前的處理 | | doFinally | doFinally 優先權大於 doAfterTimater | | doOnTerminate | 發送前預處理 (對 onError & onCompleted 的調用監聽) | | doAfterTerminate | 當 Observable 調用 onComplete or onError 時觸發 | ### 操作符範例 ```java= public class ThreadChangeObserver { public static void main(String []args) { Observable<String> o = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello World, Alien"); emitter.onComplete(); } }); Observer<String> or = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("Get onSubscribe"); } @Override public void onNext(@NonNull String s) { System.out.println("onNext: " + s); } @Override public void onError(@NonNull Throwable e) { System.out.println("onError: " + e.toString()); } @Override public void onComplete() { System.out.println("onComplete"); } }; o = o.doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println("doOnNext pre Process"); } }); o = o.doOnComplete(new Action() { @Override public void run() throws Throwable { System.out.println("doOnComplete method"); } }); o.subscribe(or); } } ``` > ![](https://i.imgur.com/AAoquHo.png) ### 操作符原理 * 使用裝飾模式修飾每個類 (like 洋蔥模型) * 透過繼承 AbstractObservableWithUpstream,它的內部有一個接口 ObservableSource,而繼承它都必須覆寫 ```java= abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> { protected final ObservableSource<T> source; AbstractObservableWithUpstream(ObservableSource<T> source) { this.source = source; } @Override public final ObservableSource<T> source() { return source; } } ``` * 以 doOnComplete 為例 ```java= private Observable<T> doOnEach(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete, @NonNull Action onAfterTerminate) { Objects.requireNonNull(onNext, "onNext is null"); Objects.requireNonNull(onError, "onError is null"); Objects.requireNonNull(onComplete, "onComplete is null"); Objects.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ObservableDoOnEach<>(this, onNext, onError, onComplete, onAfterTerminate)); } ...// ObservableDoOnEach 建構函數 public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) { super(source); // 重點 this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; this.onAfterTerminate = onAfterTerminate; } ``` > 重點,**傳入的 ObservableSource 接口,是一開始創建的==被觀察者 this==** ```java= @Override public void subscribeActual(Observer<? super T> t) { source.subscribe(new DoOnEachObserver<>(t, onNext, onError, onComplete, onAfterTerminate)); } ...// DoOnEachObserver 類 static final class DoOnEachObserver<T> implements Observer<T>, Disposable { ... } ``` > source.subscribe() 的方法又是傳入 new DoOnEachObserver,所以 Observable 呼叫的 Observer 方法就會傳入 DoOnEachObserver 類 > * 洋蔥模型,**最終呼叫方法都是呼叫最外面的一層**,當呼叫 subscribe 時是 > 1. doOnComplete > 2. doOnNext > 3. ObservableCreate > > ![](https://i.imgur.com/ONI1XfA.png) ### 結論 > 所有的操作符都 > > 1. 繼承 AbstractObservableWithUpstream > > 2. 呼叫方法 subscribeActual (遞歸方法) > > 3. 內部類的發射器要實作 Observer 接口 ### 注意 ```java= public class TestMyOperation { public static void main(String ...args) { Observable<String> o = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello World, Alien"); emitter.onComplete(); } }); Observer<String> or = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("Get onSubscribe"); } @Override public void onNext(@NonNull String s) { System.out.println("onNext: " + s); } @Override public void onError(@NonNull Throwable e) { System.out.println("onError: " + e.toString()); } @Override public void onComplete() { System.out.println("onComplete"); } }; //"1. " o.map(new Function<String, String>() { @Override public String apply(String s) throws Throwable { System.out.println("------------------apply be code"); return s + " apply"; } }); o.subscribe(or); } } ``` 1. 由於是洋蔥模型,如果沒有把原類傳進去的話並不會做更改 (也就是 o.subscribe 的 o 並沒有被包裝),以下才正確,最好可以使用**鏈式調用** ```java= o = o.map(new Function<String, String>() { @Override public String apply(String s) throws Throwable { System.out.println("------------------apply be code"); return s + " apply"; } }); ``` ## Hot & Cold * 差別可以想成,一個是直播 (Hot) 按照進度觀看,一個是錄頻 (Cold) 看幾次都可以 ### Cold Observable * 觀察者訂閱了才開始發送數據,**觀察者與被觀察者是一對一**,各自事件是獨立的 (預設),透過 just, create, fromxxx 都是冷的被觀察者 * 輕量級,不太消耗資源 > 範例: 使用 Cold Observable 每 10ms 發送一個數據 ```java= public class ColdObservable { public static void main(String ...args) { Observable<Long> o = Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(emitter::onNext); } }).observeOn(Schedulers.newThread()); o.subscribe( new Consumer<Long>() { @Override public void accept(Long aLong) throws Throwable { System.out.println("consumer1: " + aLong); } }); o.subscribe(aLong -> System.out.println("-------consumer2: " + aLong)); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` **--實作--** > 數據是獨立的,並不會互相影響 > ![](https://i.imgur.com/Xu1o2Lj.png) ### Hot Observable * 不論有沒有觀察者訂閱事件都會發生,**訊息共享,錯過就沒有了** (類似 Android's Broadcast) * 適合用在**重量級超做**,不用重複執行代碼 * ==小心內存洩漏== ```java= public class ColdObservable { public static void main(String ...args) { ConnectableObservable<Long> o = (ConnectableObservable) Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable { Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()) .take(Integer.MAX_VALUE) .subscribe(emitter::onNext); } }).observeOn(Schedulers.newThread()).publish(); o.connect(); o.subscribe(aLong -> System.out.println("consumer1: " + aLong)); o.subscribe(aLong -> System.out.println("-------consumer2: " + aLong)); try { Thread.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } o.subscribe(aLong -> System.out.println("--------------consumer3: " + aLong)); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` * 關鍵字 > ConnectableObservable 被觀察者 > publish()、connect() **--實作--** > 數據共享,看的出來休眠 30ms 後再訂閱,並不會有之前的消息 > ![](https://i.imgur.com/Gr6xrAV.png) ## Flowable 背壓 * RxJava 1 中並無特殊處理,RxJava 2 就有特數處理**被觀察者 <==Flowable==\>** * 被觀察者數據發送過快,至於訂閱者無法來的及處理,造成阻塞現象 (吞吐量不均) > 上游: 資料產生者 > 下游: 資料處理者 > > 條件: 異步線程,上游產生數據 > 下游處理數據 * Ex: 可以使用在音視頻解碼、解壓縮 ### Flowable 策略 | 策略 | 處理方式 | | -------- | -------- | | MISSING | 如果無法保持同步就會拋出異常 MissingBackpresureException or IllegalStateException | | BUFFER | 上游不斷發出 onNext 請求,多餘的會累積到 BUF (緩存池無限大),直到下游處理完 | | ERROR | 上游無法跟上下游速度會拋出異常 MissingBackpresureException | | DROP | 上游無法跟上下游速度會,拋棄 onNext 任務 | | LATEST | 上游保留最新 onNext 任務,直到下游消耗掉 | ### Flowable 實作 * 假設每 10ms 產生一筆數據,但是處理一筆數據需要 1000ms 1. 未使用被壓操作,數據量大時會產生 OOM ```java= private void TestNoFlow() { Observable.create((ObservableOnSubscribe<Integer>) emitter -> { int i = 0; while(true) { Log.i("Alien", "send: " + i); emitter.onNext(i++); try { Thread.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> { Log.i("Alien", "call: " + integer); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } }); } ``` **--實作--** > ![](https://i.imgur.com/2hQVSiz.png) 2. 使用 Flowable ```java= private void TestUseFlow() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable { int i = 0; while(true) { Log.i("Alien", "flow send: " + i); emitter.onNext(i++); try { Thread.sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } } } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) // 被觀察者在 io 線程 .observeOn(AndroidSchedulers.mainThread()) // 觀察者在主線程 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.i("Alien", "-------------flow process: " + integer); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); } ``` **--實作--** > ![](https://i.imgur.com/9ejzdGm.png) ## 線程調度 * 在調度主線程時要加入的依賴 ```java= dependencies { implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' } ``` ### 調度器 [參考1](https://codertw.com/程式語言/704112/) [參考2](https://codertw.com/程式語言/647198/#outline__4_1_1) | 調度器 | 功能 | | ------------------------------ | ------------------------------------------------------------------------------------------------------ | | AndroidSchedulers.mainThread() | 需要引用rxandroid, 切換到UI線程 | | Schedulers.\io() | 用於**IO密集型任務**,如異步阻塞IO操作,這個調度器的線程池會根據需求,它默認是一個CacheThreadScheduler | | Schedulers.newThread() | 為每一個任務創建一個新線程 | | Schedulers.trampoline() | 需要引用rxandroid, 切換到UI線程 | | Scheduler.from(executor) | 指定Executor作為調度器 | | Scheduler.single() | 單一線程的操作 | | Scheduler.computation() | 運行在密集計算的操作,大多數異步操作符使用該調度器 | | observeOn | ==指定觀察者的線程==,依照最後一次設定生效 | | subscribeOn | ==指定被觀察者的線程==,只有第一次設定的生效 | ### 處理線程切換例子 ```java= private void TestRxThread() { // 1. 創建被觀察者 Observable observable = Observable.create((ObservableOnSubscribe<String>) emitter -> { Log.i("Alien","subscribe: " + Thread.currentThread().getName()); emitter.onNext("test1"); }); // 2. 修飾符 map observable = observable.map(new Function<String,String>(){ @Override public String apply(String s) throws Exception { Log.i("Alien","apply: " + Thread.currentThread().getName()); return s + " map"; } }); //3. 創建觀察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("Alien","onSubscribe: " + Thread.currentThread().getName()); Log.i("Alien", "onSubscribe: " + d); } @Override public void onNext(String str) { Log.i("Alien","onNext: " + Thread.currentThread().getName()); Log.i("Alien", "onNext: " + str); } @Override public void onError(Throwable e) { Log.i("Alien","onError: " + Thread.currentThread().getName()); Log.i("Alien", "onError: " + e.getMessage()); } @Override public void onComplete() { Log.i("Alien","onComplete: " + Thread.currentThread().getName()); Log.i("Alien", "onComplete: "); } }; // "1. " Observable so = observable.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.single()); // "2. " Observable oo = so.observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.newThread()) .observeOn(Schedulers.single()); // 4. 訂閱 oo.subscribe(observer); } ``` 1. 被觀察者所在線程設定,會在設定線程處理資訊 2. 觀察者所在線程設定 **--實作--** > ![](https://i.imgur.com/NW0MW7P.png) ### ObserveOn & subscribeOn * 上面範例可看到 observeOn(被觀察者) 被呼叫多次, 但它使用==最後一次設定== 的 Single Thread * 而 subscribeOn(觀察者) 被定義多次,但是它只==使用第一次設定==的 IO Thread 主要是因為 `ObserveOn` & `subscribeOn` 的 **subscribeActual 實現方法不一樣** 1. ObserveOn 是 ObservableObserveOn 類 (被觀察者) ```java= ...// 遞規呼叫 subscribe 方法 (又會呼叫回 subscribeActual),最終呼叫到最內層才執行 @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); } } ``` > ![](https://i.imgur.com/AJtuSsB.png) 2. subscribeOn 是 ObservableSubscribeOn 類 (觀察者) ```java= ...// 直接呼叫 scheduleDirect 方法,而每次設定都改變,所以是最外層 @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } ``` > ![](https://i.imgur.com/OeF8bE3.png) ### UML > ![](https://i.imgur.com/YFPINfK.png) ## RxJava & Retrofit RxJava & Retrofit 很常一起使用,下面會使用 RxJava 的異步線程處理特性 1. 添加 Android 網路權限 ```xml= <uses-permission android:name="android.permission.INTERNET" /> ``` 2. 添加依賴 ```groovy= dependencies { ... // Retrofit 套件 implementation 'com.squareup.retrofit2:retrofit:2.7.2' implementation 'com.squareup.retrofit2:converter-gson:2.7.2' // RxAndroid & RxJava 依賴 implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' implementation 'io.reactivex.rxjava2:rxjava:2.1.3' // Retrofit & RxJava 的 Adapter 套件 implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.2' // 使用 Gson implementation 'com.google.code.gson:gson:2.8.5' } ``` 3. 接下來訪問的 json 數據[**網站**](https://www.wanandroid.com/blog/show/2) ```java= // 項目分類 https://www.wanandroid.com/project/tree/json // 項目列表數據 https://www.wanandroid.com/project/list/1/json?cid=294 ``` ### 網路請求使用 :::spoiler Retrofit Bean 類 ```java= public class ProjectBean { private int errorCode; private String errorMsg; private List<DataBean> data; @Override public String toString() { return "ProjectBean{" + "errorCode=" + errorCode + ", errorMsg='" + errorMsg + '\'' + ", data=" + data + '}'; } public int getErrorCode() { return errorCode; } public void setErrorCode(int errorCode) { this.errorCode = errorCode; } public String getErrorMsg() { return errorMsg; } public void setErrorMsg(String errorMsg) { this.errorMsg = errorMsg; } public List<DataBean> getData() { return data; } public void setData(List<DataBean> data) { this.data = data; } public static class DataBean { /** * children : [] * courseId : 13 * id : 294 * name : 完整项目 * order : 145000 * parentChapterId : 293 * userControlSetTop : false * visible : 0 */ private int courseId; private int id; private String name; private int order; private int parentChapterId; private boolean userControlSetTop; private int visible; private List<?> children; @Override public String toString() { return "DataBean{" + "courseId=" + courseId + ", id=" + id + ", name='" + name + '\'' + ", order=" + order + ", parentChapterId=" + parentChapterId + ", userControlSetTop=" + userControlSetTop + ", visible=" + visible + ", children=" + children + '}'; } public int getCourseId() { return courseId; } public void setCourseId(int courseId) { this.courseId = courseId; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getOrder() { return order; } public void setOrder(int order) { this.order = order; } public int getParentChapterId() { return parentChapterId; } public void setParentChapterId(int parentChapterId) { this.parentChapterId = parentChapterId; } public boolean isUserControlSetTop() { return userControlSetTop; } public void setUserControlSetTop(boolean userControlSetTop) { this.userControlSetTop = userControlSetTop; } public int getVisible() { return visible; } public void setVisible(int visible) { this.visible = visible; } public List<?> getChildren() { return children; } public void setChildren(List<?> children) { this.children = children; } } } ``` ::: ```java= // 接口 public interface WangAndroidApi { @GET("project/tree/json") Observable<ProjectBean> getProject(); @GET("project/list/{pageIndex}/json") // ?cid=294 Observable<ProjectItem> getProjectList(@Path("pageIndex") int pageIndex, @Query("cid") int cid); } // 使用 public class RxJava_Retrofit extends AppCompatActivity { // Retrofit 接口 private WangAndroidApi wangAndroidApi; // 測試網頁 public static String BASE_URL = "https://www.wanandroid.com/"; public static Retrofit createRetrofit() { return new Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(GsonConverterFactory.create()) // 添加 Rxjava 處理工具,否則會出錯 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); } @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_rx_java__retrofit); // Retrofit 動態代理 wangAndroidApi = createRetrofit().create(WangAndroidApi.class); } // xml 註冊的點擊事件 @SuppressLint("CheckResult") public void getProjectAction(View view) { // 要求網路通訊 wangAndroidApi.getProject() .subscribeOn(Schedulers.io()) // 被觀察者在異步線程 .observeOn(AndroidSchedulers.mainThread()) // 觀察者在主線程 .subscribe(new Consumer<ProjectBean>() { @Override public void accept(ProjectBean projectBean) throws Exception { Log.d("HelloRx", "getProject: " + projectBean; } }); } } ``` **--結果--** > ![](https://i.imgur.com/An4ezad.png) ### 封裝 io 切換 ```java= // 封裝 io private <T> ObservableTransformer<T, T> ioPackage() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream // 被觀察者 -> 子線程 .subscribeOn(Schedulers.io()) // 觀察者在 -> 主線程 .observeOn(AndroidSchedulers.mainThread()) .map(new Function<T, T>() { @Override public T apply(T t) throws Exception { Log.d("HelloRx", "封裝線程調度,這只是其中一站, 可在這裡修改往下傳遞的數據類型"); return t; } }); } }; } @SuppressLint("CheckResult") public void getProjectAction(View view) { // 要求網路通訊 wangAndroidApi.getProject() .compose(this.<ProjectBean> ioPackage()) .subscribe(new Consumer<ProjectBean>() { @Override public void accept(ProjectBean projectBean) throws Exception { Log.d("HelloRx", "getProject: " + projectBean); } }); } ``` * 其中使用的 ioPackage 這個方法,其中的 map 可以改變往下傳的數據類型,也就是 map 可以影響下一個接收的數據類型 > ![](https://i.imgur.com/RxQKVSf.png) **--實做結果--** > ![](https://i.imgur.com/dyerBtE.png) ## Appendix & FAQ :::info ::: ###### tags: `Android 基礎`