---
title: 'RxJava 使用'
disqus: kyleAlien
---
RxJava 使用
===
## OverView of Content
如有引用參考請詳註出處,感謝 :smile:
> [**RxJava** ](https://github.com/ReactiveX/RxJava) 在 Android 中載入的方式為,在 gradle 中載入,RxJava 源碼分析在另外一篇文章
```java=
dependencies {
implementation "io.reactivex.rxjava3:rxjava:3.0.0"
}
```
[TOC]
## RxJava 介紹
* RxJava 是一種鏈式調用,從頭到尾端連續觸發,有點類似於狀態機模式,**只有當目前目標完成才能做下一個**,**每一個目標只認識前一個對象**
> 
* **RxJava 類似於在寫一個狀態的腳本,該腳本從頭到尾端都是串連的**
### 為何要學 & 重點
* 學習的原因
1. 代碼邏輯清晰
2. 避免回調過多
3. 線程調度
* 重點
1. 觀察者模式
2. 線程調度
3. 原理、架構
4. 應用
## RxJava 基礎使用
| 角色 | 功能 | Example |
| --------------------- | ---------------------------------------------- | ------- |
| Observable (被觀察者) | 觸發/要求/產生事件 | 顧客 |
| Observer (觀察者) | 處理事件 | 研發處 |
| Subscribe (訂閱) | 接收被觀察者事件,給觀察者處理,也就是**連接** | FAE |
> 
### 常用的觀察者模式
| 觀察類 | 被觀察類 | 解釋 |
| ------------------- | --------------- | --------------------------------------------------------------------------------------------------- |
| Observer | Observable<T\> | 發送 0 or n 個數據,並以成功或錯誤事件終止 |
| Subscriber | Flowable<T\> | 發送 0 or n 個數據,並以成功或錯誤事件終止,==支持 Backpressure==,可以**控制數據發送的速度(時間)** |
| SingleObserver | Single<T\> | 發送 1 個數據,只有兩種狀況 **onSuccess & onError** 事件 |
| CompletableObserver | Completable<T\> | 發送 0 個數據,只處理 **onComplete & onError** 事件,可當成 Rx's Runnable |
| MaybeObserver | Maybe<T\> | 發送 0 or 1 個數據,只有兩種狀況 **onComplete & onError** 事件 |
### 匿名接口 Observer
```java=
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//"1. "
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
//"4. "
emitter.onNext("Hello ");
emitter.onNext("World ");
emitter.onNext("Alien ");
}
});
//"3. "
oble.subscribe(
//"2. "
new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("MyObserver", "MyObserver: " + d);
}
@Override
public void onNext(@NonNull String s) {
Log.d("MyObserver", "onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("MyObserver", "onError: " + e);
}
@Override
public void onComplete() {
Log.d("MyObserver", "onComplete");
}
});
}
}
```
1. 被觀察者可透過靜態方法 create 創建
2. 創建觀察者 Observer
3. 訂閱,以語意來說,意思有點反向,變成了**被觀察者訂閱觀察者**,但是以程式來看,透過==泛型 **super** 限定該類==卻相當合理
> 
4. 當**訂閱之後會觸發數據方送**
> 
**--實作--**
> 
### 鏈式調用
* 類似於 Builder 模式,會返回該類自身,RxJava 建議使用這個方式調用
> 
```java=
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello ");
emitter.onNext("World ");
emitter.onNext("Alien ");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("MyObserver", "onSubscribe: " + d);
}
@Override
public void onNext(@NonNull String s) {
Log.d("MyObserver", "onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("MyObserver", "onError: " + e);
}
@Override
public void onComplete() {
Log.d("MyObserver", "onComplete");
}
});
}
}
```
> 邏輯簡單,實現出不同調用不同結果,結果同上
## Subject 類
* Subject 類也是一個抽象類
```java=
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
...
}
```
* 既是`觀察者`也被`被觀察者`,所以**不須分開創建**
* 它是一個==Hot Observable==
> 
| 類明 | 功能 |
| ----------------- | -------------------------------------------- |
| AsyncSubject() | 只發生在 **onComplete 方法前的最後一個數據** |
| BehaviorSubject() | 發送訂閱前的一個數據 and 訂閱後的全部數據 |
| ReplaySubject() | 無論何時訂閱都會發送全部數據,也可限定 |
| PublishSubject() | **訂閱後**的全部數據 |
### AsyncSubject 實作
```java=
class TestAsyncSubject {
AsyncSubject<String> subject = AsyncSubject.create();
void start() {
subject.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println("AsyncSubject: " + s);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable s) throws Throwable {
System.out.println("AsyncSubject Error: " + s.toString());
}
},
new Action() {
@Override
public void run() throws Throwable {
System.out.println("AsyncSubject Complete");
}
}
);
subject.onNext("Hello");
subject.onNext("World");
//"1. "
subject.onComplete();
subject.onNext("Alien");
}
}
```
1. onComplete 前的數據是 World,所以只發送 World
> 
### BehaviorSubject 實作
```java=
class TestBehaviorSubject {
BehaviorSubject<String> subject = BehaviorSubject.create();
void start() {
"1. "
subject.onNext("Yo");
subject.onNext("Pan");
subject.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println("BehaviorSubject: " + s);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable s) throws Throwable {
System.out.println("BehaviorSubject Error: " + s.toString());
}
},
new Action() {
@Override
public void run() throws Throwable {
System.out.println("BehaviorSubject Complete");
}
}
);
subject.onNext("Hello");
subject.onNext("World");
subject.onComplete();
subject.onNext("Alien");
}
}
```
1. 在訂閱前有兩筆數據,只取訂閱前的筆
> 
### ReplaySubject 實作
* 改為 lambda 表達式可更簡短的寫出代碼
```java=
class TestReplaySubject {
ReplaySubject<String> subject = ReplaySubject.create();
void start() {
subject.onNext("Yo");
subject.onNext("Pan");
subject.subscribe(
s -> System.out.println("AsyncSubject: " + s),
s -> System.out.println("AsyncSubject Error: " + s.toString()),
new Action() {
@Override
public void run() throws Throwable {
System.out.println("AsyncSubject Complete");
}
}
);
subject.onNext("Hello");
subject.onNext("World");
subject.onComplete();
subject.onNext("Alien");
}
}
```
> 
* createWithSize() 可限定數據量
> 
### PublishSubject 實作
* 可以做預加載,訂閱前的數據不發送,只發送訂閱後的數據
```java=
class TestPublishSubject {
PublishSubject<String> subject = PublishSubject.create();
void start() {
subject.onNext("Yo");
subject.onNext("Pan");
subject.subscribe(
s -> System.out.println("AsyncSubject: " + s),
s -> System.out.println("AsyncSubject Error: " + s.toString()),
() -> System.out.println("AsyncSubject Complete")
);
subject.onNext("Hello");
subject.onNext("World");
subject.onComplete();
subject.onNext("Alien");
}
}
```
> 
## 操作符
| 操作符 | 解釋 |
| ---------------- | ---------------------------------------------------------------------- |
| doOnSubscribe | 當訂閱者訂閱時可通過此方法監聽 |
| doOnLifecycle | 在觀察者訂閱後,設置是否取消訂閱 |
| doOnNext | 在每次發送訊息前,**做預前處理**,不改變發送的訊息,只**加入一個監聽** |
| doOnEach | 對發送元素和通知進行統一封裝,每發送一次數據就調用一次 |
| doAfterNext | onNext 後執行 |
| doOnCompleted | 做 onComplete 之前的處理 |
| doFinally | doFinally 優先權大於 doAfterTimater |
| doOnTerminate | 發送前預處理 (對 onError & onCompleted 的調用監聽) |
| doAfterTerminate | 當 Observable 調用 onComplete or onError 時觸發 |
### 操作符範例
```java=
public class ThreadChangeObserver {
public static void main(String []args) {
Observable<String> o = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello World, Alien");
emitter.onComplete();
}
});
Observer<String> or = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("Get onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError: " + e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
o = o.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println("doOnNext pre Process");
}
});
o = o.doOnComplete(new Action() {
@Override
public void run() throws Throwable {
System.out.println("doOnComplete method");
}
});
o.subscribe(or);
}
}
```
> 
### 操作符原理
* 使用裝飾模式修飾每個類 (like 洋蔥模型)
* 透過繼承 AbstractObservableWithUpstream,它的內部有一個接口 ObservableSource,而繼承它都必須覆寫
```java=
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
```
* 以 doOnComplete 為例
```java=
private Observable<T> doOnEach(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete, @NonNull Action onAfterTerminate) {
Objects.requireNonNull(onNext, "onNext is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
Objects.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<>(this, onNext, onError, onComplete, onAfterTerminate));
}
...// ObservableDoOnEach 建構函數
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source); // 重點
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
```
> 重點,**傳入的 ObservableSource 接口,是一開始創建的==被觀察者 this==**
```java=
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new DoOnEachObserver<>(t, onNext, onError, onComplete, onAfterTerminate));
}
...// DoOnEachObserver 類
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
...
}
```
> source.subscribe() 的方法又是傳入 new DoOnEachObserver,所以 Observable 呼叫的 Observer 方法就會傳入 DoOnEachObserver 類
>
* 洋蔥模型,**最終呼叫方法都是呼叫最外面的一層**,當呼叫 subscribe 時是
> 1. doOnComplete
> 2. doOnNext
> 3. ObservableCreate
> > 
### 結論
> 所有的操作符都
> > 1. 繼承 AbstractObservableWithUpstream
> > 2. 呼叫方法 subscribeActual (遞歸方法)
> > 3. 內部類的發射器要實作 Observer 接口
### 注意
```java=
public class TestMyOperation {
public static void main(String ...args) {
Observable<String> o = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello World, Alien");
emitter.onComplete();
}
});
Observer<String> or = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("Get onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError: " + e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
//"1. "
o.map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
System.out.println("------------------apply be code");
return s + " apply";
}
});
o.subscribe(or);
}
}
```
1. 由於是洋蔥模型,如果沒有把原類傳進去的話並不會做更改 (也就是 o.subscribe 的 o 並沒有被包裝),以下才正確,最好可以使用**鏈式調用**
```java=
o = o.map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
System.out.println("------------------apply be code");
return s + " apply";
}
});
```
## Hot & Cold
* 差別可以想成,一個是直播 (Hot) 按照進度觀看,一個是錄頻 (Cold) 看幾次都可以
### Cold Observable
* 觀察者訂閱了才開始發送數據,**觀察者與被觀察者是一對一**,各自事件是獨立的 (預設),透過 just, create, fromxxx 都是冷的被觀察者
* 輕量級,不太消耗資源
> 範例: 使用 Cold Observable 每 10ms 發送一個數據
```java=
public class ColdObservable {
public static void main(String ...args) {
Observable<Long> o = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable {
Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(emitter::onNext);
}
}).observeOn(Schedulers.newThread());
o.subscribe( new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
System.out.println("consumer1: " + aLong);
}
});
o.subscribe(aLong -> System.out.println("-------consumer2: " + aLong));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
**--實作--**
> 數據是獨立的,並不會互相影響
> 
### Hot Observable
* 不論有沒有觀察者訂閱事件都會發生,**訊息共享,錯過就沒有了** (類似 Android's Broadcast)
* 適合用在**重量級超做**,不用重複執行代碼
* ==小心內存洩漏==
```java=
public class ColdObservable {
public static void main(String ...args) {
ConnectableObservable<Long> o = (ConnectableObservable) Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> emitter) throws Throwable {
Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(emitter::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
o.connect();
o.subscribe(aLong -> System.out.println("consumer1: " + aLong));
o.subscribe(aLong -> System.out.println("-------consumer2: " + aLong));
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
o.subscribe(aLong -> System.out.println("--------------consumer3: " + aLong));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
* 關鍵字
> ConnectableObservable 被觀察者
> publish()、connect()
**--實作--**
> 數據共享,看的出來休眠 30ms 後再訂閱,並不會有之前的消息
> 
## Flowable 背壓
* RxJava 1 中並無特殊處理,RxJava 2 就有特數處理**被觀察者 <==Flowable==\>**
* 被觀察者數據發送過快,至於訂閱者無法來的及處理,造成阻塞現象 (吞吐量不均)
> 上游: 資料產生者
> 下游: 資料處理者
> > 條件: 異步線程,上游產生數據 > 下游處理數據
* Ex: 可以使用在音視頻解碼、解壓縮
### Flowable 策略
| 策略 | 處理方式 |
| -------- | -------- |
| MISSING | 如果無法保持同步就會拋出異常 MissingBackpresureException or IllegalStateException |
| BUFFER | 上游不斷發出 onNext 請求,多餘的會累積到 BUF (緩存池無限大),直到下游處理完 |
| ERROR | 上游無法跟上下游速度會拋出異常 MissingBackpresureException |
| DROP | 上游無法跟上下游速度會,拋棄 onNext 任務 |
| LATEST | 上游保留最新 onNext 任務,直到下游消耗掉 |
### Flowable 實作
* 假設每 10ms 產生一筆數據,但是處理一筆數據需要 1000ms
1. 未使用被壓操作,數據量大時會產生 OOM
```java=
private void TestNoFlow() {
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
int i = 0;
while(true) {
Log.i("Alien", "send: " + i);
emitter.onNext(i++);
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> {
Log.i("Alien", "call: " + integer);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
```
**--實作--**
> 
2. 使用 Flowable
```java=
private void TestUseFlow() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
int i = 0;
while(true) {
Log.i("Alien", "flow send: " + i);
emitter.onNext(i++);
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io()) // 被觀察者在 io 線程
.observeOn(AndroidSchedulers.mainThread()) // 觀察者在主線程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.i("Alien", "-------------flow process: " + integer);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
```
**--實作--**
> 
## 線程調度
* 在調度主線程時要加入的依賴
```java=
dependencies {
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
}
```
### 調度器
[參考1](https://codertw.com/程式語言/704112/)
[參考2](https://codertw.com/程式語言/647198/#outline__4_1_1)
| 調度器 | 功能 |
| ------------------------------ | ------------------------------------------------------------------------------------------------------ |
| AndroidSchedulers.mainThread() | 需要引用rxandroid, 切換到UI線程 |
| Schedulers.\io() | 用於**IO密集型任務**,如異步阻塞IO操作,這個調度器的線程池會根據需求,它默認是一個CacheThreadScheduler |
| Schedulers.newThread() | 為每一個任務創建一個新線程 |
| Schedulers.trampoline() | 需要引用rxandroid, 切換到UI線程 |
| Scheduler.from(executor) | 指定Executor作為調度器 |
| Scheduler.single() | 單一線程的操作 |
| Scheduler.computation() | 運行在密集計算的操作,大多數異步操作符使用該調度器 |
| observeOn | ==指定觀察者的線程==,依照最後一次設定生效 |
| subscribeOn | ==指定被觀察者的線程==,只有第一次設定的生效 |
### 處理線程切換例子
```java=
private void TestRxThread() {
// 1. 創建被觀察者
Observable observable = Observable.create((ObservableOnSubscribe<String>) emitter -> {
Log.i("Alien","subscribe: " + Thread.currentThread().getName());
emitter.onNext("test1");
});
// 2. 修飾符 map
observable = observable.map(new Function<String,String>(){
@Override
public String apply(String s) throws Exception {
Log.i("Alien","apply: " + Thread.currentThread().getName());
return s + " map";
}
});
//3. 創建觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("Alien","onSubscribe: " + Thread.currentThread().getName());
Log.i("Alien", "onSubscribe: " + d);
}
@Override
public void onNext(String str) {
Log.i("Alien","onNext: " + Thread.currentThread().getName());
Log.i("Alien", "onNext: " + str);
}
@Override
public void onError(Throwable e) {
Log.i("Alien","onError: " + Thread.currentThread().getName());
Log.i("Alien", "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.i("Alien","onComplete: " + Thread.currentThread().getName());
Log.i("Alien", "onComplete: ");
}
};
// "1. "
Observable so = observable.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.single());
// "2. "
Observable oo = so.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.observeOn(Schedulers.single());
// 4. 訂閱
oo.subscribe(observer);
}
```
1. 被觀察者所在線程設定,會在設定線程處理資訊
2. 觀察者所在線程設定
**--實作--**
> 
### ObserveOn & subscribeOn
* 上面範例可看到 observeOn(被觀察者) 被呼叫多次, 但它使用==最後一次設定== 的 Single Thread
* 而 subscribeOn(觀察者) 被定義多次,但是它只==使用第一次設定==的 IO Thread
主要是因為 `ObserveOn` & `subscribeOn` 的 **subscribeActual 實現方法不一樣**
1. ObserveOn 是 ObservableObserveOn 類 (被觀察者)
```java=
...// 遞規呼叫 subscribe 方法 (又會呼叫回 subscribeActual),最終呼叫到最內層才執行
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
```
> 
2. subscribeOn 是 ObservableSubscribeOn 類 (觀察者)
```java=
...// 直接呼叫 scheduleDirect 方法,而每次設定都改變,所以是最外層
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
```
> 
### UML
> 
## RxJava & Retrofit
RxJava & Retrofit 很常一起使用,下面會使用 RxJava 的異步線程處理特性
1. 添加 Android 網路權限
```xml=
<uses-permission android:name="android.permission.INTERNET" />
```
2. 添加依賴
```groovy=
dependencies {
...
// Retrofit 套件
implementation 'com.squareup.retrofit2:retrofit:2.7.2'
implementation 'com.squareup.retrofit2:converter-gson:2.7.2'
// RxAndroid & RxJava 依賴
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
// Retrofit & RxJava 的 Adapter 套件
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.2'
// 使用 Gson
implementation 'com.google.code.gson:gson:2.8.5'
}
```
3. 接下來訪問的 json 數據[**網站**](https://www.wanandroid.com/blog/show/2)
```java=
// 項目分類
https://www.wanandroid.com/project/tree/json
// 項目列表數據
https://www.wanandroid.com/project/list/1/json?cid=294
```
### 網路請求使用
:::spoiler Retrofit Bean 類
```java=
public class ProjectBean {
private int errorCode;
private String errorMsg;
private List<DataBean> data;
@Override
public String toString() {
return "ProjectBean{" +
"errorCode=" + errorCode +
", errorMsg='" + errorMsg + '\'' +
", data=" + data +
'}';
}
public int getErrorCode() {
return errorCode;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public List<DataBean> getData() {
return data;
}
public void setData(List<DataBean> data) {
this.data = data;
}
public static class DataBean {
/**
* children : []
* courseId : 13
* id : 294
* name : 完整项目
* order : 145000
* parentChapterId : 293
* userControlSetTop : false
* visible : 0
*/
private int courseId;
private int id;
private String name;
private int order;
private int parentChapterId;
private boolean userControlSetTop;
private int visible;
private List<?> children;
@Override
public String toString() {
return "DataBean{" +
"courseId=" + courseId +
", id=" + id +
", name='" + name + '\'' +
", order=" + order +
", parentChapterId=" + parentChapterId +
", userControlSetTop=" + userControlSetTop +
", visible=" + visible +
", children=" + children +
'}';
}
public int getCourseId() {
return courseId;
}
public void setCourseId(int courseId) {
this.courseId = courseId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getOrder() {
return order;
}
public void setOrder(int order) {
this.order = order;
}
public int getParentChapterId() {
return parentChapterId;
}
public void setParentChapterId(int parentChapterId) {
this.parentChapterId = parentChapterId;
}
public boolean isUserControlSetTop() {
return userControlSetTop;
}
public void setUserControlSetTop(boolean userControlSetTop) {
this.userControlSetTop = userControlSetTop;
}
public int getVisible() {
return visible;
}
public void setVisible(int visible) {
this.visible = visible;
}
public List<?> getChildren() {
return children;
}
public void setChildren(List<?> children) {
this.children = children;
}
}
}
```
:::
```java=
// 接口
public interface WangAndroidApi {
@GET("project/tree/json")
Observable<ProjectBean> getProject();
@GET("project/list/{pageIndex}/json") // ?cid=294
Observable<ProjectItem> getProjectList(@Path("pageIndex") int pageIndex, @Query("cid") int cid);
}
// 使用
public class RxJava_Retrofit extends AppCompatActivity {
// Retrofit 接口
private WangAndroidApi wangAndroidApi;
// 測試網頁
public static String BASE_URL = "https://www.wanandroid.com/";
public static Retrofit createRetrofit() {
return new Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
// 添加 Rxjava 處理工具,否則會出錯
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rx_java__retrofit);
// Retrofit 動態代理
wangAndroidApi = createRetrofit().create(WangAndroidApi.class);
}
// xml 註冊的點擊事件
@SuppressLint("CheckResult")
public void getProjectAction(View view) {
// 要求網路通訊
wangAndroidApi.getProject()
.subscribeOn(Schedulers.io()) // 被觀察者在異步線程
.observeOn(AndroidSchedulers.mainThread()) // 觀察者在主線程
.subscribe(new Consumer<ProjectBean>() {
@Override
public void accept(ProjectBean projectBean) throws Exception {
Log.d("HelloRx", "getProject: " + projectBean;
}
});
}
}
```
**--結果--**
> 
### 封裝 io 切換
```java=
// 封裝 io
private <T> ObservableTransformer<T, T> ioPackage() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream
// 被觀察者 -> 子線程
.subscribeOn(Schedulers.io())
// 觀察者在 -> 主線程
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<T, T>() {
@Override
public T apply(T t) throws Exception {
Log.d("HelloRx", "封裝線程調度,這只是其中一站, 可在這裡修改往下傳遞的數據類型");
return t;
}
});
}
};
}
@SuppressLint("CheckResult")
public void getProjectAction(View view) {
// 要求網路通訊
wangAndroidApi.getProject()
.compose(this.<ProjectBean> ioPackage())
.subscribe(new Consumer<ProjectBean>() {
@Override
public void accept(ProjectBean projectBean) throws Exception {
Log.d("HelloRx", "getProject: " + projectBean);
}
});
}
```
* 其中使用的 ioPackage 這個方法,其中的 map 可以改變往下傳的數據類型,也就是 map 可以影響下一個接收的數據類型
> 
**--實做結果--**
> 
## Appendix & FAQ
:::info
:::
###### tags: `Android 基礎`