傳統的 Button 點擊事件:
let button = UIButton()
button.addTarget(self , action: #Selector(buttonTapped) , for: .touchUpInside)
func buttonTapped(){
print("button Tapped")
}
以 Rx 的方式:
let button = UIButton()
button.rx.tap
.subscribe(onNext:{
print("button Tapped")
})
.disposed(by: disposeBag)
ObservableType.subscribe(_:)
相當於 Sequance.makeIterator()
onNext(_:)
來返回Observable<Element>
代表的是一個可觀察序列,從字面上的意思可以看出這是在觀察者模式中的觀察者,他會向觀察對象發送事件序列 1. onNext(Element):代表新事件,他會將可觀察對象的最新值傳給觀察者
2. onError(ErrorTypre):帶有異常的完成序列
3. onCompleted():正常事件完成序列
– Error 跟 completed 結束後不會繼續觸發事件
API.download(file: "https://~~~~~~~~")
.subscribe(onNext: { data in
//append data to file
},
onError: { error in
//display error tp user
},
onCompleted: {
//use downloaded files
})
UIDevice.rx.orientation
.subscribe(onNext: { current in
switch current{
case .landscape:
//re-arrange UI for landscape
case .portrait:
//re-arrange UI for portrait
}
})
Observable
的元素Observable
產生了元素,就發出這個元素Observables
一個接一個的發出元素,當一個Observable
元素發送完畢後,下一個Observable
才能開始發出元素PublishSubject :
ex:
let subject = PublishSubject<String>()
subject.onNext("A")
subject.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
subject.onNext("B")
subject.onNext("C")
output:
B
C
onNext
時,會執行PublishSubject.on
函數,並繼續調用 dispatch 函數和 self._synchronized_on(event)
函數
self._synchronized_on(event)
函數最終會返回self._observers
self._observers
的類型是Bag<(Event<Element>) -> Void>
subscribe
let key = self._observers.insert(observer.on)
_dictionary
值為nil,且 _pairs.count
小於 arrayDictionaryMaxSize
,所以,觀察者的 on函數 observer.on
被加入到 _pairs
數組中。onNext
函數,步驟跟第一次一樣,只是最後執行 dispatch 函數時,因為bag._pairs
有保存一個觀察者的 on 函數代碼,所以會執行回調subscribe.onNext
閉包,產生outputBehaviorSubject :
ex:
let subject = BehaviorSubject<Int>(value: 100)
subject.subscribe(onNext: {
print("訂閱1: \($0)")
})
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(5)
subject.subscribe(onNext: {
print("訂閱2: \($0)")
})
.disposed(by: disposeBag)
output:
訂閱1: 100
訂閱1: 3
訂閱1: 5
訂閱2: 5
_synchronized_subscribe
函數中比 PublishSubject 多一行代碼,執行了一次 observer.on
函數,並將 self._element
作為參數傳遞。 self._element
中保存的是最新發送的元素,如果沒有最新元素,則為 init
初始化時的默認元素self._element
中,在執行執行 subscribe
時,發送出去ReplaySubject :
ex:
let subject = ReplaySubject<Int>.create(buffersize: 2)
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
output:
2
3
4
5
6
onNext
函數調用的是 ReplayBufferBase
的 on
函數self._observers
之前,先調用了self.replayBuffer(anyObserver)
會執行 ReplayManyBase.replayBuffer
函數。override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
for item in self._queue {
observer.on(.next(item))
}
}
AsyncSubject :
ex:
let subject = AsyncSubject<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe({
print($0)
})
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
output:
next(4)
completed
是Observable
的另外一個版本,不像Observable
可以發出多個元素,他要嘛只能發出一個元素,要嘛產生一個 error 事件
func getRepo(_ repo: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let url = URL(string: "https://api.github.com/repos/\(repo)")!
let task = URLSession.shared.dataTask(with: url) { data, _, error in
if let error = error {
single(.error(error))
return
}
guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(DataError.cantParseJSON))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}
getRepo("ReactiveX/RxSwift")
.subscribe(onSuccess: { json in
print("JSON: ", json)
}, onError: { error in
print("Error: ", error)
})
disposed(by: disposeBag)
同樣可以對Observable
調用.asSingle
來將他轉為Single
同樣是Observable
的另外一個版本,不像Observable
可以發出多個元素,他要嘛只能產生一個 completed 事件,要嘛產生一個 error 事件
Completable
適用於你關心的任務是否完成,而不需要在意任務返回值的奇框,這點跟 Observable<Void>
有點類似func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
...
...
guard success else {
completable(.error(CacheError.failedCaching))
return Disposables.create {}
}
completable(.completed)
return Disposables.create {}
}
cacheLocally()
.subscribe(onCompleted: {
print("Completed with no error")
}, onError: { error in
print("Completed with an error: \(error.localizedDescription)")
})
.disposed(by: disposeBag)
他介於Single
和Completable
之間。他要嘛只能發出一個元素,要嘛產生一個 completed 事件,要嘛產生一個 error 事件
func generateString() -> Maybe<String> {
return Maybe<String>.create { maybe in
maybe(.success("RxSwift"))
// OR
maybe(.completed)
// OR
maybe(.error(error))
return Disposables.create {}
}
}
generateString()
.subscribe(onSuccess: { element in
print("Completed with element \(element)")
}, onError: { error in
print("Completed with an error \(error.localizedDescription)")
}, onCompleted: {
print("Completed with no element")
})
.disposed(by: disposeBag)
同樣可以對 Observable 調用.asMaybe()
將它轉換為 Maybe
是一個精心準備的特徵序列。他主要是為了簡化 UI 層的代碼,過如果你遇到序列具有以下特徵:
有時與 UI 互動的網路請求若有一個錯誤,這個錯誤將取消所有綁定,導致用戶在輸入新的關鍵字時沒有辦法發起新的網路請求,同時若成功的 item 從後台返回序列,那麼 UI 的更新也會在後台進行,就會造成異常崩潰
再越大型的系統內想要確保每一部不被遺漏是一件困難的事,因此需要運用合理的特徵序列來處理,來避免錯誤
let results = query.rx.text.asDriver() // 将普通序列转换为 Driver
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.asDriver(onErrorJustReturn: []) // 仅仅提供发生错误时的备选返回值
}
results
.map { "\($0.count)" }
.drive(resultCount.rx.text) // 这里改用 `drive` 而不是 `bindTo`
.disposed(by: disposeBag) // 这样可以确保必备条件都已经满足了
results
.drive(resultsTableView.rx.items(cellIdentifier: "Cell")) {
(_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)
任何可監聽序列都可以被轉為 Driver ,只要滿足上面三個條件
而在Driver
裡使用drive()
而不是bind(to:_)
,因為drive()
方法只能被Driver
調用,這表示如果發現代碼存在drive()
那麼這個序列不會產生錯誤事件並且一定在主線程監聽,這樣就可以安全地進行 UI 綁定
Signal
跟Driver
相似,唯一的區別是,Driver
會對新觀察者回放(重新發送)上一個元素,而 Signal
不會對新觀察者回放上一個元素
專門用於描述 UI 控件所產生的事件,他具有以下特徵:
可以用來描述任何一種觀察者
主要以下兩個特徵:
fatalError
,在發布環境下將打印錯誤訊息Binder
可以只處理 next 事件,並且保證響應 next 事件的代碼一定會在Scheduler
上執行Swift
,所以我們更習慣於使用 ARC 來管理內存,因此我們依舊可以用 ARC 來管理我們的訂閱生命週期,就是用DisposeBag
(清除包)來實現訂閱管理機制DisposeBag
被釋放的時候,DisposeBag
內部的所有可被清除的資源都將被清除Schedulers
(調節器)是 Rx 實現多線程核心模式,他主要用於控制任務在哪個線程或列隊運行ex: 原本GCD寫法:
// 后台取得数据,主线程处理结果
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
self.data = data
}
}
如果用 RxSwift 實現:
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
self?.data = data
})
.disposed(by: disposeBag)
subscribeOn
:
subscribeOn
來決定數據序列的構建函數在哪個Scheduler
上運行,由上面例子來說,由於獲取 data 需要花很長的時間,所以用subscribeOn
切換到後台Scheduler
來獲取 data,就可以避免主線程被阻塞observeOn
:
obseveOn
來決定在哪個Scheduler
監聽這個數據序列,由上面例子來說,通過使用observeOn
方法切換到主線程來監聽應且處理結果subscribeOn
切到後台去發送請求並解析數據,最後用observeOn
切換到主線程更新頁面MainScheduler
表示主線程。如果你需要執行一些和 UI 相關的任務,就需要切換到該Scheduler
運行DispatchQueue
。如果你需要執行一些串行任務,可以切換到這個Scheduler
運行DispatchQueue
。如果你需要執行一些並行任務,可以切換到這個Scheduler
運行NSOperationQueue
。他具備NSOperationQueue
的一些特點,例如,你可以通過設置maxConcurrentOperationCount
來控制同時執行併發任務的最大數量retry 可以讓序列發生在錯誤後重試:
// 请求 JSON 失败时,立即重试,
// 重试 3 次后仍然失败,就将错误抛出
let rxJson: Observable<JSON> = ...
rxJson
.retry(3)
.subscribe(onNext: { json in
print("取得 JSON 成功: \(json)")
}, onError: { error in
print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)
以上的代碼retry(3)
就是當發生錯誤時,就進行重試操作,並且最多重試3次
如果我們需要再發生錯誤時,經過一段延遲後重試,就可以用retryWhen
實現:
// 请求 JSON 失败时,等待 5 秒后重试,
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
}
.subscribe(...)
.disposed(by: disposeBag)
這裡我們需要用到retryWhen
,他主要敘述應該在何時重試,並且通過閉包裡面返回的Observable
來控制重試的機制:
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
...
}
閉包裡面的參數是Observable<Error>
也就是所產生錯誤的序列,然後返回值是一個Observable
,當這個返回值的Observable
發出一個元素時,就進行重試操作。當他發出一個 error 或者 completed 事件時就不會重試,並且將這個事件傳遞到後面的觀察者
如果需要加上一個最大重試次數的設定:
// 请求 JSON 失败时,等待 5 秒后重试,
// 重试 4 次后仍然失败,就将错误抛出
let maxRetryCount = 4 // 最多重试 4 次
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
guard index < maxRetryCount else {
return Observable.error(error)
}
return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
}
}
.subscribe(...)
.disposed(by: disposeBag)
catchError
可以在錯誤產生時,用一個備用元素或者一組備用元素將錯誤替換掉,這樣可以使得Observable
正常結束或者根本不需要結束:
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("😊")
output:
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)
如果只是想要給用戶錯誤提示,以下提供一個最為直接的方案:
public enum Result<Success, Failure> where Failure : Error {
case success(Success)
case failure(Failure)
}
updateUserInfoButton.rx.tap
.withLatestFrom(rxUserInfo)
.flatMapLatest { userInfo -> Observable<Result<Void, Error>> in
return update(userInfo)
.map(Result.success) // 转换成 Result
.catchError { error in Observable.just(Result.failure(error)) }
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { result in
switch result { // 处理 Result
case .success:
print("用户信息更新成功")
case .failure(let error):
print("用户信息更新失败: \(error.localizedDescription)")
}
})
.disposed(by: disposeBag)
如此一來將錯誤事件包裝成了Result.failure(Error)
元素,就不會終止整個程序,即便網路請求失敗了,整個訂閱依然存在,如果用戶再次點擊按鈕,也是能夠發起網路請求進行操作
在多個Observables
中,取第一個發出元素或產生事件的Observable
,然後只發出她的元素
當你傳入多個Observables
到amb
時,他將取其中一個Observable
,amb
將忽略掉其他的Observables
緩存元素,然後將緩存的元素集合,週期性的發出來
buffer
將緩存Observable
中發出的新元素,當元素達到某個數量,或者經過了特定的時間,他就會將這個元素集合發送出來
catchErrorJustReturn
會將 error 事件替換成其他一個元素,然後結束該序列:
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("😊")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
output:
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed
RxSwift
是一個響應式編程的基礎框架,他並不會強制要求你使用某種架構,他和多個應用程序完美適配:RxCocoa
給 UI 框架提供了 Rx 的支持,讓我們能夠使用按鈕點擊序列,輸入框的當前文字等等。不過 RxCocoa
也只是 RxSwift
生態系統中的一員。RxSwift
生態系統還給其他框架提供了 Rx 支持:UITableView
& UICollectionView
數據庫CoreData
數據庫Realm
數據庫WebView
書寫 tableView 或 collectionView 的數據源是一件非常瑣碎的事情,有一大堆的代理方法需要被執行。RxdataSources
可以幫助簡化這個過程,你只需要幾行代碼就可以佈局 view,而且他還提供動畫支持
let dataSource = RxTableViewSectionedReloadDataSource<SectionModel<String, Int>>()
Observable.just([SectionModel(model: "title", items: [1, 2, 3])])
.bind(to: tableView.rx.items(dataSource: dataSource))
.disposed(by: disposeBag)
let dataSource = RxTableViewSectionedReloadDataSource<SectionModel<String, Double>>(
configureCell: { (_, tv, indexPath, element) in
let cell = tv.dequeueReusableCell(withIdentifier: "Cell")!
cell.textLabel?.text = "\(element) @ row \(indexPath.row)"
return cell
},
titleForHeaderInSection: { dataSource, sectionIndex in
return dataSource[sectionIndex].model
}
)
override func viewDidLoad() {
super.viewDidLoad()
let dataSource = self.dataSource
let items = Observable.just([SectionModel(model: "First section", items: [
1.0,
2.0,
3.0
]),
SectionModel(model: "Second section", items: [
1.0,
2.0,
3.0
]),
SectionModel(model: "Third section", items: [
1.0,
2.0,
3.0
])
])
items
.bind(to: tableView.rx.items(dataSource: dataSource))
.disposed(by: disposeBag)
tableView.rx
.itemSelected
.map { indexPath in
return (indexPath, dataSource[indexPath])
}
.subscribe(onNext: { pair in
DefaultWireframe.presentAlert("Tapped `\(pair.1)` @ \(pair.0)")
})
.disposed(by: disposeBag)
tableView.rx
.setDelegate(self)
.disposed(by: disposeBag)
}
// to prevent swipe to delete behavior
func tableView(_ tableView: UITableView, editingStyleForRowAt indexPath: IndexPath) -> UITableViewCellEditingStyle {
return .none
}
func tableView(_ tableView: UITableView, heightForHeaderInSection section: Int) -> CGFloat {
return 40
}
}