# RxJS {%hackmd BJrTq20hE %} - 教學 https://mtwmt.com/blog/rxjs/rxjs/ > 同步:此處為`狀態`上的同步 (synchronous),而不是`時間`上的同步 (parallel、平行),因此同步需要等待時間 (一次只能處理一個)。 非同步:非阻塞式I/O,如多個櫃檯排隊點餐 - 文字版彈珠圖參考:https://ithelp.ithome.com.tw/m/articles/10244422 觀察者模式有兩個角色,`觀察者`(Observer)和`發布者`(Publisher),發布者為觀察者的觀察目標 - 發布者(Observable(Publisher),有些書叫 subject,但是,很不巧在RxJS中Subject这个词有另外一个含义):==要負責管理所有觀察者,並在需要時通知他們== - 创建 Observable 对象也就是创建一个“发布者” - Observable是一个特殊类,它接受一个处理Observer的函数 - 觀察者(Observer): - Observer就是一个普通的对象,没有什么神奇之处,只是它必须包含一个名为next的属性,这个属性的值是一个函数,用于接收被“推”过来的数据 流程: ![image](https://hackmd.io/_uploads/HkVwxV2_gx.png) observer 預設會有三個方法,分別是: - `next`:用來==觸發==新的事件資料,呼叫 `next` ==並傳入==新的事件資訊後,在訂閱步驟就會吃到有新事件發生了。 - `error`(optional):只有第一次呼叫會在訂閱步驟觸發,當整個流程發生錯誤時,可以呼叫 `error` 並傳入作物資訊,用來告知錯誤內容,同時整個 Observable 就算是結束了。 - `complete`(optional):當整個 Observable 結束時,用來通知 Observer 已經結束了,由於是單純的結束了,`complete()` 方法不用帶入任何參數。 ## 基本範例: DEMO:https://stackblitz.com/edit/rxjs-book-counter-practice?file=index.ts - 基礎代碼結構說明: ```jsx= import { Observable } from "rxjs/Observable"; // 建立行為流程 const onSubscribe = (observer) => { observer.next(1); observer.next(2); observer.next(3); }; // 调用Observable构造函数,产生一个名为source$的数据流对象。 const source$ = new Observable(onSubscribe); // 创造观察者theObserver,在訂閱時需做的動作 const theObserver = { next: (item) => console.log(item), }; // 通过subscribe函数将theObserver和source$关联起来, // 讓theObserver成为了source$的“观察者” source$.subscribe(theObserver); ``` ### subscription - 在 observable 訂閱後回傳的物件 - 可用來退訂(unsubscribe),observable 就不會再執行其他動作 - 可以和其他訂單合併,一直執行退訂動作 退订Observable ```jsx= const source$ = new Observable(onSubscribe); // 將訂閱物件存到變數裡 (subscription) const subscription = source$.subscribe((item) => console.log(item)); // 退訂 setTimeout(() => { subscription.unsubscribe(); }, 3500); ``` 注意虽然 unsubscribe 函数调用之后,作为 Observer不再接受到被推送的数据,但是作为 Observable 的 source$ 并没有终结,因为始终没有调用 complete,只不过它再也不会调用next函数了。 # Observable基礎類別 ## [Observable類別](https://stackblitz.com/edit/rxjs-book-creation-observable?file=index.ts)(一對一) - 用來建立`可被觀察的`物件,同時在物件的callback function先寫好整個資料流程,當未來訂閱時,就可以執行 ```typescript= const source$ = new Observable(subscriber => { console.log('stream 開始'); //資料流開始(行為) subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.next(4); console.log('stream 結束'); subscriber.complete(); }); ``` ==source$ 為數據流對象 subscribe 為訂閱後要做的事, next為帶入值 or 函數== ```typescript= source$.subscribe({ next: data => console.log(`Observable 第一次訂閱: ${data}`), complete: () => console.log('第一次訂閱完成') }); // 可簡寫,直接省略 key source$.subscribe( data => console.log(`Observable 第一次訂閱: ${data}`), () => console.log('第一次訂閱完成'), () => console.log('No More Data') ); ``` ``` stream 開始 Observable 第一次訂閱: 1 Observable 第一次訂閱: 2 Observable 第一次訂閱: 3 Observable 第一次訂閱: 4 stream 結束 第一次訂閱完成 ``` 每個訂閱者會產生自己的資料流 ## [Subject](https://stackblitz.com/edit/rxjs-book-creation-subject?file=index.ts)(一對多) 繼承Observable,並且有不同特性。與Observable的差別: 1. `Observable`在建立物件的同時,就決定好流向了;而`Subject`是先產生物件再決定流向 2. `Observable`每個訂閱者會得到獨立資料流(一對一);而`Subject`每次事件發生時,會==同步傳遞給多個訂閱者(一對多)== - ```typescript= import { Subject } from 'rxjs'; const source$ = new Subject(); source$.subscribe(data => console.log(`Subject 第一次訂閱: ${data}`)); source$.next(1); //觸發一個訂閱者 source$.subscribe(data => console.log(`Subject 第二次訂閱: ${data}`)); source$.next(2); //會觸發上面兩個訂閱者 ``` ### [BehaviorSubject](https://stackblitz.com/edit/rxjs-book-creation-behaviorsubject?file=index.ts)(pre事件) 1. 訂閱時,會先收到預設值 2. 可用`.value`來讀取最近一次事件的資料 - ```javascript= import { BehaviorSubject } from 'rxjs'; const source$ = new BehaviorSubject(0); //須給預設值 source$.subscribe(data => console.log(`BehaviorSubject 第一次訂閱: ${data}`)); // BehaviorSubject 第一次訂閱: 0 (預設值) source$.subscribe(data => console.log(`BehaviorSubject 第二次訂閱: ${data}`)); // BehaviorSubject 第二次訂閱: 0 (預設值) source$.next(3); // BehaviorSubject 第一次訂閱: 3 (觸發所有訂閱者) // BehaviorSubject 第二次訂閱: 3 console.log(`目前 BehaviorSubject 的內容為: ${source$.value}`); // 目前 BehaviorSubject 的內容為: 3 (取最後一次的值) source$.subscribe(data => console.log(`BehaviorSubject 第三次訂閱: ${data}`)); // BehaviorSubject 第三次訂閱: 3 (剛訂閱會取得最近一次的資料) ``` ### [ReplaySubject](https://stackblitz.com/edit/rxjs-book-creation-replaysubject?file=index.ts)(重播) 訂閱時,重播發生過的事件資料給訂閱者 - ```javascript= import { ReplaySubject } from 'rxjs'; // 設定「重播」最近 3 次資料給訂閱者 const source$ = new ReplaySubject(3); source$.subscribe(data => console.log(`ReplaySubject 範例 (1) 第一次訂閱: ${data}`)); // 訂閱不會觸發事件 source$.next(1); //觸發事件 source$.next(2); source$.subscribe(data => console.log(`ReplaySubject 範例 (1) 第二次訂閱: ${data}`)); // 1, 2 (拿最近 cache 的 3 次資料來重播,但目前只有兩筆), 3, 4 source$.next(3); source$.next(4); source$.subscribe(data => console.log(`ReplaySubject 範例 (1) 第三次訂閱: ${data}`)); // 2, 3, 4 (沒有新的值發生了,純粹拿最近 cache 的 3 次資料重播) ``` 第二個參數 `windowTime`,可用來控制資料保留期限,單位毫秒 ```javascript= // 設定3秒後到期 const source2$ = new ReplaySubject(3, 3000); source2$.subscribe(data => console.log(`ReplaySubject 範例 (2) 第一次訂閱: ${data}`)); source2$.next(1); source2$.next(2); setTimeout(() => source2$.next(3), 2000); // 3秒內,會觸發前面事件 setTimeout(() => source2$.next(4), 6000); // 前面事件已不復存在 ``` ### [AsyncSubject](https://stackblitz.com/edit/rxjs-book-creation-asyncsubject?file=index.ts)(只能用complete()觸發事件) 只有到complete()被呼叫後,才會收到==最後一次==事件資料 - ```javascript= const source$ = new AsyncSubject(); source$.subscribe(data => console.log(`AsyncSubject 第一次訂閱: ${data}`)); // 4 (complete 得到 stream 的最後一次資料) source$.next(1); source$.subscribe(data => console.log(`AsyncSubject 第二次訂閱: ${data}`)); // 4 source$.next(4); // 只有到最後 complete 時才會得到結果,把這行註解掉則上面程式不會印出任何東西 source$.complete(); ``` ### [asObservable](https://stackblitz.com/edit/rxjs-book-asobservable?file=index.ts)(封裝subject) 封裝subject當成observable(被觀察的),因此它沒有next()、complete()、error()等API - ```typescript= import { Subject } from 'rxjs'; // asObservable 應用 class Student { private _score$ = new Subject(); get score$() { return this._score$.asObservable(); } updateScore(score) { // 大於 60 分才允許推送成績事件 if(score > 60){ this._score$.next(score); } } } const mike = new Student(); mike.score$.subscribe(score => { console.log(`目前成績:${score}`); }); mike.updateScore(70); // 目前成績: 70 mike.updateScore(50); // (沒有任何反應) mike.updateScore(80); // 目前成績: 80 // mike.score$.next(50); // (發生 error,next is not a function) ``` ## 科里函數([Curry Function](https://stackblitz.com/edit/rxjs-book-fp-curry-function?file=index.ts)) 把參數先拆出來,再傳入不同的參數設定,就可以動態建立不同意圖的函數 - ```javascript= // power 是一個科里函數 // 把設定 (n) 和 資料 (data) 拆開成兩個參數 const power = n => { return data => { return data.map(value => Math.pow(value, n)); //回傳n次方 }; }; // 計算平方的函數 const square = power(2); // 計算立方的函數 const cube = power(3); console.log(square([1, 2, 3, 4, 5])); //[1, 4, 9, 16, 25] console.log(cube([1, 2, 3, 4, 5])); //[1, 8, 27, 64, 125] ``` ## `pipe` 使用管線函數串起所有Operators: - ```javascript= import { Subject, fromEvent, Observable } from 'rxjs'; import { map } from 'rxjs/operators'; const source$ = new Subject(); //subject$.pipe():subject觸發後依序要做的事 const generateNextId$ = source$.pipe( map((data: any) => data * 2), // 使用 tap 來隔離 side effect tap(data => console.log('目前資料', data)), map((data: any) => data + 1) ); //訂閱後要做的事 generateNextId$.subscribe((data) => console.log(data)); source$.next(1) //subject$.next()觸發subject事件 ``` ## 認識Cold Observable、Hot Observable 問題場景:假设有这样的场景,一个 Observable 对象有两个 Observer 对象来订阅,而且这两个 Observer 对象并不是同时订阅,第一个 Observer 对象订阅N秒钟之后,第二个 Observer 对象才订阅同一个Observable对象,而且,在这N秒钟之内,Observable 对象已经吐出了一些数据。现在问题来了,后订阅上的 Observer,是不是应该接收到“错过”的那些数据呢? - Hot Observable:熱啟動。错过就错过了,資料一直維持啟動狀態,从订阅那一刻开始接受(如生活中用电视机看电视节目)。 - 每一個 subscriptions 間共享相同的來源,並獲取相同的值。 - [Cold Observable](https://stackblitz.com/edit/rxjs-book-cold-observable-api-request?file=index.ts):冷啟動。不能错过,訂閱時,错过的数据也需要获取(如互联网看电视节目) - 每一個 subscription 間取得的值都是各自獨立。 - [Warm Observable](https://stackblitz.com/edit/rxjs-book-warm-observable-api-request?file=index.ts):將冷啟動包裝成熱啟動,例如用冷啟動呼叫一次API拿取資料,而之後用熱啟動來傳輸資料 - Observable 並不是非 Hot 即 Cold,有的 Observable 同時屬於 Code 和 Hot;有的一開始時 Cold 後來變成 Hot。 ## [ReactiveX練習網站](http://reactivex.io/learnrx/) reactivex第12題: ```javascript= // Use one or more map, concatAll, and filter calls to create an array with the following items // [ // {"id": 675465,"title": "Fracture","boxart":"http://cdn-0.nflximg.com/images/2891/Fracture150.jpg" }, // {"id": 65432445,"title": "The Chamber","boxart":"http://cdn-0.nflximg.com/images/2891/TheChamber150.jpg" }, // {"id": 654356453,"title": "Bad Boys","boxart":"http://cdn-0.nflximg.com/images/2891/BadBoys150.jpg" }, // {"id": 70111470,"title": "Die Hard","boxart":"http://cdn-0.nflximg.com/images/2891/DieHard150.jpg" } // ]; // 資料層總共三層,每concatAll一次就攤平一層 // 由於目標資料需取到第三層,所以map三次(迴圈三次),就可以取到每一層的資料 return movieLists .map((movieList) => movieList.videos .map((video) => video.boxarts .filter((boxart) => boxart.width === 150) .map((boxart) => { return { id: video.id, title: video.title, boxart: boxart.url }; //最後map出新陣列後再攤平層數 }) ) .concatAll() ) .concatAll(); // Complete this expression! }); ``` # 彈珠圖 傳統符號約定 ```asc= 時間軸: - (每個 - 代表一格時間) 事件值: a b c ...(任意字元或數字) 完成: | 錯誤: # 同步群發: (ab) // 同一「時間格」內同時送出 a 與 b 訂閱: ^ // 流開始被訂閱的時間點(可選) 未訂閱: (空白) ``` # Operators [如何选择操作符](https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/decision_tree.html) ## 操作符的功能要點 每个操作符都是一个函数,不管实现什么功能,都必须考虑下面这些功能要点: * 返回一个全新的Observable对象。 * 对上游和下游的订阅及退订处理。 * 处理异常情况。 * 及时释放资源。 ## 操作符各種分類 * 创建类(creation) * 转化类(transformation) * 过滤类(filtering) * 合并类(combination) * 多播类(multicasting) * 错误处理类(error Handling) * 辅助工具类(utility) * 条件分支类(conditional&boolean) * 数学和合计类(mathmatical&aggregate) * 背压控制类(backpressure control) * 可连接类(connectable) * 高阶Observable(higher order observable)处理类 最重要的三类操作符就是:++合并类、过滤类和转化类++。 所有的操作符都是函数,不过有的操作符是Observable类的静态函数,也就是不需要Observable实例就可以执行的函数,所以称为“静态操作符” - 例如:`const source$ = Observable.of(/*一些参数*/);` 另一类操作符是Observable的实例函数,前提是要有一个创建好的Observable对象,这一类称为“实例操作符”。 - 例如:`const result$ = source$.map(/*一些参数*/);` 无论是静态操作符还是实例操作符,它们都会返回一个 Observable 对象。在链式调用中,静态操作符只能出现在首位,实例操作符则可以出现在任何位置,因为链式调用中各级之间靠Observable对象来关联,一个静态函数在链式调用的中间位置是不可能有容身之处的。 ## 自定义的操作符 只对指定的 Observable 对象擴充操作符,可以用 bind,但會汙染全局的 Observable 對象,而不汙染的用法如下,符號為兩個冒號:`::` p.92 ```jsx= const operator = map.bind(source$); // 將 source$ 當綁定對象 const result$ = operator(x => x * 2); // 簡寫 const result$ = map.bind(source$)(x => x * 2); ``` ```jsx= const result$ = source$::map(x => x * 2).filter(x => x % 3 !== 0); ``` ## ⭐建立類型 Operators ### |--- `Empty` (空的Observable) 在用各種opservable來轉換資料流向時,就是希望什麼都別做的時候用 - ```typescript= import { EMPTY } from 'rxjs'; EMPTY.subscribe({ next: data => console.log(`empty 範例: ${data}`), complete: () => console.log('empty 結束') }); // empty 結束 ``` ### |-- throwError、throwIfEmpty (拋出錯誤) ### |--- `of` (將參數帶入 observable 物件) 訂閱時,這些參數會各自送出next(),然後結束 - ```typescript= import { of } from 'rxjs'; of(1, 2, 3, 4) .subscribe(data => console.log(`of 範例 (2): ${data}`)); // of 範例 (2): 1 // of 範例 (2): 2 // of 範例 (2): 3 // of 範例 (2): 4 // 範例二: of("http://foo.com").pipe( switchMap(url => from(getSearchResults(url))), // 根據參數建立新的 observable 物件 retry(3) ) .subscribe({ next: val => console.log(val), error: val => console.log(`Get error ${val}`) }) ``` ### |--- [from](https://stackblitz.com/edit/rxjs-book-operators-from?file=index.ts) (根據參數建立新的Observable) 常用,可接受參數類型包括 陣列、可迭代物件、Promise、其他Obervable物件 - ```typescript= import { from, of } from 'rxjs'; //陣列 from([1, 2, 3, 4]).subscribe(data => { console.log(`from 示範 (1): ${data}`); }); //可迭代物件 from(myRange(1, 4)).subscribe(data => { console.log(`from 示範 (2): ${data}`); }); //promise物件 from(Promise.resolve(1)).subscribe(data => { console.log(`from 示範 (3): ${data}`); }); //其他operator from(of(1, 2, 3, 4)).subscribe(data => { console.log(`from 示範 (4): ${data}`) }); ``` ### |--- `fromEvent` (DOM元素) 將網頁事件包裝成Observable物件,需要傳入兩個參數: 1. target:要監聽的DOM元素 2. eventName:事件 - ```typescript= import { fromEvent } from 'rxjs'; fromEvent(document, 'click').subscribe(event => { console.log('fromEvent 示範: 滑鼠事件觸發了'); }); ``` 注意:如果下游有會導致 complete 的操作符,一旦 complete,click 事件就無法重複使用,例如:[sample](https://stackblitz.com/run?devtoolsheight=50&file=index.ts) ```jsx= import { fromEvent, first } from 'rxjs'; const clicks = fromEvent(document, 'click'); const result = clicks.pipe(first()); // 觸發一次後即終止 result.subscribe(x => console.log(x)); ``` 🔹 **會導致流結束(complete)** 這些運算子有條件或次數限制,會「自動終止」: - `take(n)` → 收到 n 個值就 complete - `first()` → 收到第一個值就 complete - `last()` → 上游 complete 後才輸出最後一個值,然後 complete - `takeWhile(predicate)` → 當條件不成立時立即 complete - `takeUntil(notifier$)` → notifier$ emit 一次就 complete - `timeout` → 超時會拋錯並結束 ➡️ **行為**:一旦觸發結束條件,下游就不再接收任何事件。 ✅ 判斷小技巧:**如果運算子名字有「take/first/last」** → 幾乎一定會導致 complete。 - Node.js 事件範例: ```jsx= import EventEmitter from 'events'; const emitter = new EventEmitter(); const source$ = Observable.fromEvent(emitter, "msg"); source$.subscribe( console.log, (error) => console.log("catch", error), () => console.log("complete") ); emitter.emit("msg", 1); // 1 emitter.emit("msg", 2); // 2 ``` 為什麼必须在 source$ 添加了 Observer 之后再去调用 emitter.emit,否则 Observer 什么都接受不到? > 因為对 fromEvent 而言,数据源在 RxJS 的世界之外,数据的产生也完全不受 RxJS 控制,这就是 Hot Observable 对象的特点。 ### |--- [fromEventPattern](https://stackblitz.com/edit/rxjs-book-operators-fromeventpattern?file=index.ts) (自訂邏輯參數) 可以將自訂的邏輯代入,然後在訂閱時以及取消訂閱時觸發 需傳入兩個參數: 1. `addHandler`:傳入一個function,當`訂閱`時,呼叫此function 2. `removeHandler`:傳入一個function,當`取消訂閱`時,呼叫此function - ```typescript= import { fromEventPattern } from 'rxjs'; const addClickHandler = (handler) => { console.log('fromEventPattern 示範: 自定義註冊滑鼠事件') document.addEventListener('click', event => handler(event)); } const removeClickHandler = (handler) => { console.log('fromEventPattern 示範: 自定義取消滑鼠事件') document.removeEventListener('click', handler); }; const source$ = fromEventPattern( addClickHandler, removeClickHandler ); const subscription = source$ .subscribe(event => console.log('fromEventPattern 示範: 滑鼠事件發生了', event)); setTimeout(() => { subscription.unsubscribe(); }, 3000); ``` ### |--- `range` (數列範圍) 依照範圍數列資料建立Observable,需有兩個參數 1. start:從哪個數值開始 2. count:建立多少數列 - ```typescript= import { range } from 'rxjs'; range(3, 4) .subscribe(data => console.log(`range 範例: ${data}`)); // range 範例: 3 // range 範例: 4 // range 範例: 5 // range 範例: 6 ``` ### |--- [iif](https://stackblitz.com/edit/rxjs-book-operators-iif?file=index.ts) (if) - 透過iif判斷要用怎樣的Observable,需傳入三個參數: ```typescript= iif(condition, true, false); //類三元運算子 ``` ```typescript= import { iif, of, EMPTY } from 'rxjs'; const emitHelloIfEven = (data) => { return iif(() => data % 2 === 0, of('Hello'), EMPTY); }; emitHelloIfEven(1) .subscribe(data => console.log(`iif 範例 (1): ${data}`)); // (不會印出任何東西) emitHelloIfEven(2) .subscribe(data => console.log(`iif 範例 (2): ${data}`)); // iif 範例 (2): Hello ``` ### |--- [interval](https://stackblitz.com/edit/rxjs-book-opereator-interval?file=index.ts) (輪詢) - 每隔一段時間發生一次資料流,預設從 0 開始 ```typescript= import { interval } from 'rxjs'; const subscription = interval(1000) .subscribe({ next: data => console.log(`interval 示範 (2): ${data}`), complete: () => console.log('結束') }); setTimeout(() => { subscription.unsubscribe(); }, 3000); // interval 示範 (1): 0 // interval 示範 (1): 1 // interval 示範 (1): 2 // (重點:「結束」不會印出) ``` 如果不想從 0 開始,需搭配其他操作符 (在RxJS中,每个操作符都尽量功能精简) ```jsx= import { interval, map, take } from 'rxjs'; interval(1000).pipe( map(x => x + 1), // 從 1 開始 take(5) // 只取 5 次 ).subscribe({ next: data => console.log(`interval 示範 (3): ${data}`), complete: () => console.log('結束') }); ``` ### |--- [timer](https://stackblitz.com/edit/rxjs-book-operator-timer?file=index.ts) (特殊的interval) 有三種用途 1. 經過n秒後才開始計時 (效果同 interval) ```typescript= import { timer } from 'rxjs'; // 經過3秒後開始計時,一秒觸發一次 timer(3000, 1000) .subscribe(data => console.log(`timer 示範 (1): ${data}`)); ``` 1. 無延遲開始計時 (第一個參數設0) ```typescript= timer(0, 1000) .subscribe(data => console.log(`timer 示範 (2): ${data}`)); ``` 1. 經過n秒後只觸發一次即結束 (無第二參數) ```typescript= timer(3000).subscribe({ next: data => console.log(`timer 示範 (3): ${data}`), complete: () => console.log(`timer 示範 (3): 結束`) }); ``` ### |--- [defer](https://stackblitz.com/edit/rxjs-book-operators-defer?file=index.ts) (延遲建立 observable,常搭配promise使用) 適用於訂閱時才需要開始建立物件的情境,因為建立 Observable 需要占用资源。 需要傳入一個`factory function`當作參數,而這個 function 需回傳一個 Observable 物件。 當這個defer被訂閱時,才會呼叫factory function並建立資料。 >*`factory function`:類似constructor,可以回傳一個新的物件,但寫法更簡便。 - ```typescript= import { defer, of } from 'rxjs'; // 將 Promise 包成 factory function // 因此在此 function 被呼叫前,都不會執行 Promise 內的程式 const promiseFactory = () => new Promise((resolve) => { console.log('Promise 內被執行了'); setTimeout(() => { resolve(100); }, 1000); }); const deferSource$ = defer(promiseFactory); // 直到被訂閱了,才會呼叫裡面的 Promise 內的程式 deferSource$.subscribe(result => { console.log(`defer 示範 (2): ${result}`) }); ``` ### |--- [throwError](https://stackblitz.com/edit/rxjs-books-operators-throwerror?file=index.ts) (常搭配pipe設計錯誤) - ```typescript= import { throwError } from 'rxjs'; const source$ = throwError('發生錯誤了'); source$.subscribe({ next: (data) => console.log(`throwError 範例 (next): ${data}`), error: (err) => console.log(`throwError 範例 (error): ${err}`), complete: () => console.log('throwError 範例 (complete)'), }); // throwError 範例 (error): 發生錯誤了 ``` ### |--- [ajax](https://stackblitz.com/edit/rxjs-book-operators-ajax?file=index.ts) 直接把ajax包裝成Observable物件,方便跟其他operator組合在一起 - ```typescript= import { Observable } from 'rxjs'; import { ajax } from 'rxjs/ajax'; const source3$ = ajax.getJSON('https://api.github.com/repos/reactivex/rxjs/issues'); source3$.subscribe(result => { console.log('範例 (3):使用 ajax.getJSON()'); console.log(result); }); ``` ### |-- generate:循环创建 類似 for 迴圈,用來補足 range 無法自訂遞增值的缺點 - ```jsx= const result = []; for (let i = 2; i < 10; i += 2) { result.push(i * i); } // 同上 const source$ = Observable.generate( 2, // 初始值,相当于for循环中的i=2 (value) => value < 10, //继续的条件,相当于for中的条件判断 (value) => value + 2, //每次值的递增 (value) => value * value // 产生的结果 ); ``` - 後面三个参数都應保持為純函数 ### |-- repeat:重复上游的数据流 repeat 的功能是可以重复上游 Observable 中的数据若干次 - ```jsx= const source$ = Observable.of(1, 2, 3); const repeated$ = source$.repeat(10); // 將上游的 of 重複 10 遍 ``` 細節:repeat 只有在上游 Observable 对象完结之后才会再次去 subscribe 这个对象,如果上游 Observable 对象永不完结,那 repeat 也就没有机会去 unsubscribe。因为 repeat 的“重复”功能依赖于上游的完结时机。 ### |-- repeatWhen repeat 能够反复订阅上游的 Observable,但是并不能控制订阅的时间,比如希望在接收到上游完结事件的时候等待一段时间再重新订阅 - ```jsx= const notifier = () => { return Observable.interval(1000); }; const source$ = Observable.of(1, 2, 3); const repeated$ = source$.repeatWhen(notifier); // 延遲一秒後訂閱 ``` ## ⭐組合/建立類型 Operators ### |-- [concat](https://stackblitz.com/edit/rxjs-book-operators-concat?file=index.ts) (首尾相连) 將多個 operator 組合成一個,然後依序訂閱執行。(ps:每個 operator 需訂閱「結束」後(complete())才會執行下一個) 不會結束的 operator 如:`interval`或`subject`忘了呼叫 complete - ```typescript= import { of, concat } from 'rxjs'; const sourceA$ = of(1, 2); const sourceB$ = of(3, 4); const sourceC$ = of(5, 6); concat(sourceA$, sourceB$, sourceC$) .subscribe(data => { console.log(data); }); ``` ### |-- [mergeWith](https://stackblitz.com/run?devtoolsheight=50&file=index.ts) (先到先得快速通过) ~~merge (先到先得快速通过)~~:v8 棄用 - 因為 fromEvent 不能同时获得两个事件的数据流,这时候就要借助 merge 的力量了。 - 與 concat 類似,但會同時訂閱參數內所有的 Observable 物件,因此會有平行處理的感覺 > - ```jsx= import { fromEvent, map, mergeWith } from 'rxjs'; const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click')); const mousemoves$ = fromEvent(document, 'mousemove').pipe(map(() => 'mousemove')); const dblclicks$ = fromEvent(document, 'dblclick').pipe(map(() => 'dblclick')); mousemoves$ .pipe(mergeWith(clicks$, dblclicks$)) .subscribe(x => console.log(x)); ``` ### |-- zip:一對一數據組 就像拉链一样做到一对一咬合,將上游數據轉換為數組,然後一對一組成一個陣列。 - ```jsx= import { interval, of, zip } from 'rxjs'; const source1$ = interval(1000); const source2$ = of('a', 'b', 'c'); const zipped$ = zip(source1$, source2$); zipped$.subscribe(console.log, null, () => console.log('complete')); /* [0, "a"] [1, "b"] [2, "c"] complete */ ``` 上例中 interval 會一直产生数据流,而 source2$ 吐完所有數據後調用了 complete,也就是说,只要任何一个上游的 Observable 完结,那么 zip 就会给下游一个 complete 信号。 ### |-- combineLatest:合并最后一个数据 从所有输入 Observable 对象中拿最后一次产生的数据(最新数据),然后把这些数据组合起来传给下游。 注意,这种方式和 zip 不一样,zip 对上游数据只使用一次,但是 combineLatest 可能会反复使用上游产生的最新数据,只要上游不产生新的数据,那 combineLatest 就会反复使用这个上游最后一次产生的数据。 它就像实时气象播报员,只要有新的天气变化情况,他就要把消息广播出去。 - #### 非同步數據 sample: https://stackblitz.com/run?devtoolsheight=50&file=index.ts ```jsx= import { timer, combineLatest } from 'rxjs'; const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now const combinedTimers = combineLatest([firstTimer, secondTimer]); combinedTimers.subscribe(value => console.log(value)); // Logs // [0, 0] after 0.5s // [1, 0] after 1s // [1, 1] after 1.5s // [2, 1] after 2s ``` - #### 同步數據 sample: ```jsx= import { of, combineLatest } from 'rxjs'; const source1$ = of('a', 'b', 'c'); const source2$ = of(1, 2, 3); const result$ = combineLatest(source1$, source2$); result$.subscribe(console.log, null, () => console.log('complete')); // ["c", 1] // ["c", 2] // ["c", 3] // complete ``` combineLatest 会依序订阅所有上游的 Observable 对象,只有所有上游 Observable 对象都已经吐出数据了,才会给下游传递所有上游“最新数据”组合的数据。 上面例子步驟說明: 1. 订阅source1$,因为 source1$ 是由 of 产生的同步数据流,在被订阅时就会吐出所有数据,最后一个吐出的数据是字符串c。 2. 当 source1$ 产生数据时,combineLatest 还没来得及去订阅 source2$ 呢,所以,当 combineLatest 接下来去订阅 source2$ 时,source1$ 的“最新数据”就是字符串 c,这时候 source2$ 虽然依然是同步数据流,但它每产生一个数据,都会有准备好的 source1$ “最新数据”。因此,source2$ 产生的每个数据都会引发下游一个数据的产生。] ### |-- withLatestFrom:單純取得最新組合數據 原理:與 combineLatest 不同,前者不論哪個上游皆可驅動推送數據給下游,而後者是给下游推送数据只能由一个上游 Observable 对象驱动,以避免推送不需要的數據 - https://stackblitz.com/edit/typescript-tznzbj?devtoolsheight=100&file=index.ts ```jsx= import { timer, map, withLatestFrom, combineLatest, take } from 'rxjs'; const original$ = timer(0, 1000); const source1$ = original$.pipe(map((x) => x + 'a')); const source2$ = original$.pipe(map((x) => x + 'b')); // const result$ = combineLatest([original$, source1$, source2$]).pipe(take(9)); // 會傳送包含過度數據 const result$ = source1$.pipe(withLatestFrom(source2$), take(5)); // 只傳送最後數據 (只有在 source2$ 觸發時傳遞) result$.subscribe((x) => console.log(x)); ``` ### |-- raceWith:胜者通吃 - https://stackblitz.com/run?devtoolsheight=50&file=index.ts ```jsx= import { interval, map, raceWith } from 'rxjs'; const obs1 = interval(7000).pipe(map(() => 'slow one')); const obs2 = interval(3000).pipe(map(() => 'fast one')); const obs3 = interval(5000).pipe(map(() => 'medium one')); obs1 .pipe(raceWith(obs2, obs3)) .subscribe(winner => console.log(winner)); ``` ### |-- [startWith](https://stackblitz.com/run?devtoolsheight=50&file=index.ts):初始化訂閱數據 當 Observable 对象在被订阅的时候,总是先吐出指定的若干个数据。 - ```jsx= import { timer, map, startWith } from 'rxjs'; timer(1000) .pipe( map(() => 'timer emit'), startWith('timer start') // 訂閱時立刻輸出 ) .subscribe(x => console.log(x)); // results: // 'timer start' // 'timer emit' ``` ### |-- [forkJoin](https://stackblitz.com/run?devtoolsheight=50&file=index.ts):RxJS 界的 Promise.all 會同時訂閱全部傳入的 Observable 物件,直到每個物件訂閱都「結束」後,才將所有输入的 Observable 对象完结之后把最后一个数据合并。 - ```jsx= import { forkJoin, of, timer } from 'rxjs'; const observable = forkJoin([ of(1, 2, 3, 4), // 4 Promise.resolve(8), //8 timer(4000) // 0 ]); observable.subscribe({ next: value => console.log(value), complete: () => console.log('This is how it ends!'), }); // Logs: // [4, 8, 0] after 4 seconds // 'This is how it ends!' immediately after ``` ### |- groupby、partition 🔑 差異重點整理 | 特性 | `groupBy` | `partition` | | --- | --- | --- | | **分組數量** | 多組(不限數量) | 永遠只有 2 組 | | **回傳型態** | `Observable<Observable<T>>` | `[Observable<T>, Observable<T>]` | | **使用場景** | 依 key 動態分群 | true/false 條件分流 | | **複雜度** | 較高,需要 flatten (e.g. `mergeMap`) | 簡單,直接用 | - groupby:適合需要動態建立多個群組(群組數量不固定)。 ```jsx= const people = from([ { name: 'Anna', age: 25 }, { name: 'Bob', age: 25 }, { name: 'Charlie', age: 30 } ]); people.pipe( groupBy(person => person.age), // 依年齡分組 mergeMap(group$ => group$.pipe(toArray())) // 收集每組成陣列 ).subscribe(console.log); // 輸出: // [ { name: 'Anna', age: 25 }, { name: 'Bob', age: 25 } ] // [ { name: 'Charlie', age: 30 } ] ``` - partition: 回傳一個長度 **2 的 Observable 陣列**: - `[0]` → 條件為 true 的值 - `[1]` → 條件為 false 的值 ```jsx= const numbers$ = from([1, 2, 3, 4, 5, 6]); const [even$, odd$] = partition(numbers$, n => n % 2 === 0); even$.subscribe(n => console.log('偶數:', n)); odd$.subscribe(n => console.log('奇數:', n)); // 輸出: // 偶數: 2 // 偶數: 4 // 偶數: 6 // 奇數: 1 // 奇數: 3 // 奇數: 5 ``` ## 數字類操作符 所有这些操作符都是实例操作符,还有一个共同特点,就是这些操作符必定会遍历上游 Observable 对象中吐出的所有数据才给下游传递数据,也就是说,它们只有在上游完结的时候,才给下游传递唯一数据。 ### |- [count](https://stackblitz.com/edit/rxjs-book-operators-count?file=index.ts):统计上游 Observable 对象吐出的所有数据个数。 ```jsx= of(5, 1, 9, 8) .pipe(count((data) => data > 5)) // count 裡面可以加判斷式 .subscribe((data) => { console.log(`count 示範 (2): ${data}`); }); ``` ### |- max ### |- min max 和 min 的用法相同,唯一区别就是 max 是取得上游Observable吐出所有数据的“最大值”,而 min 是取得“最小值”。[Link](https://stackblitz.com/edit/rxjs-book-operators-min?file=index.ts) ```jsx= import { of } from 'rxjs'; import { min } from 'rxjs/operators'; const initialRelease$ = of( { name: 'RxJS', year: 2011 }, { name: 'React', year: 2013 }, { name: 'Redux', year: 2015 } ); const min$ = initialRelease$.pipe( min((a, b) => a.year - b.year) ); min$.subscribe(result => console.log(result)); // 輸出: { name: 'RxJS', year: 2011 } ``` ### |- [reduce](https://stackblitz.com/edit/rxjs-book-operators-reduce?file=index.ts):對所有数据进行更加复杂的统计运算 ```jsx= const donateAmount = [100, 500, 300, 250]; const accumDonate$ = of(...donateAmount).pipe( reduce( (acc, value) => acc + value, // 累加函數 0 // 初始值 ) ); accumDonate$.subscribe((amount) => { console.log(`目前 donate 金額累計:${amount}`); }); // 目前 donate 金額累計:1150 ``` ## 布林類型操作符 ### |- every:上游所有數據判斷 上游 Observable 吐出的每一个数据都会被这个判定函数检验,如果所有数据的判定结果都是 true,那么在上游 Observable 对象完结的时候,every 产生的新 Observable 对象就会吐出一个且是唯一的布尔值 true;反之,只要上游吐出的数据中有一个数据检验为 false,那么也不用等到上游 Observable 完结,every 产生的 Observable 对就会立刻吐出 false。 ```jsx= interval(1000) .pipe(every((data) => data % 2 === 0)) .subscribe((data) => { console.log(`every 示範 (2): ${data}`); }); // 上游未完結即回傳 false ``` ### |- find ### |- findIndex find 和 findIndex 的功能都是找到上游 Observable 对象中满足判定条件的第一个数据,产生的 Observable 对象在吐出数据之后会立刻完结,两者不同之处是,find 会吐出找到的上游数据,而 findIndex 会吐出满足判定条件的数据序号。 ```jsx= interval(1000) .pipe(find((data) => data === 3)) .subscribe((data) => { console.log(`find 示範: ${data}`); // 3 }); ``` ### |- isEmpty:判斷上游 Observable 对象是否為空 对于 throw,就是上游 Observable 直接吐出 error 的情况,isEmpty 并不会处理 error,而是直接把这个 error 丢给了下游。 ```jsx= const source$ = new Observable(observer => { setTimeout(() => { // observer.next(1); // 如果這行打開 → isEmpty() 會輸出 false observer.complete(); // 沒有 next → isEmpty() 輸出 true }, 1000); }); const isEmpty$ = source$.pipe(isEmpty()); isEmpty$.subscribe(val => console.log('isEmpty:', val)); ``` ### |- [defaultIfEmpty](https://stackblitz.com/edit/rxjs-book-operators-defaultifempty?file=index.ts):判斷上游 Observable 是否為空 & 附帶默認值 如果是就把默认值吐出来给下游;如果发现上游 Observable 不是“空的”,就把上游吐出的所有东西原样照搬转交给下游。如果沒給默認值會回傳 null。 ```jsx= const emptySource$ = new Subject(); emptySource$.pipe(defaultIfEmpty('a')).subscribe((data) => { console.log(`defaultIfEmpty 示範 (1): ${data}`); }); setTimeout(() => emptySource$.complete(), 2000); // a ``` ## ⭐過濾類操作符 ### |- [filter](https://stackblitz.com/edit/rxjs-book-operators-filter?file=index.ts) ```jsx= const even$ = source$.pipe( filter(x => x % 2 === 0) ); even$.subscribe({ next: console.log, complete: () => console.log('complete') }); ``` ### |- [first](https://stackblitz.com/edit/rxjs-book-operators-first?file=index.ts) ```jsx= const source$ = of(3, 1, 4, 1, 5, 9).pipe( map((value, index) => ({ value, index })) // 手動附加 index ); const first$ = source$.pipe( first(x => x.value % 2 === 0) ); first$.subscribe(console.log); // {4,2} first$.subscribe(x => console.log([x.value, x.index])); // [4,2] ``` ### |- last:如上相反 ### |- take:从上游 Observable 拿数据,拿够了就完结 只要没有超过给定的数量限制,上游产生一个数据,take都会立刻转手给下游。 ### |- [takeLast](https://stackblitz.com/edit/rxjs-book-operators-takelast?file=index.ts): 只有确定上游数据完结的时候才能产生数据,而且是一次性产生所有数据 ```jsx= const source$ = interval(1000); const take$ = source$.pipe(take(5)); const last3$ = take$.pipe(takeLast(3)); last3$.subscribe(console.log); // 2,3,4 (等上游跑完才會出現) ``` ### |- [takeUntil](https://stackblitz.com/edit/rxjs-book-operators-takeuntil?file=index.ts):持續觸發上游事件,直到另一個指定的 Observable 物件發生新事件時。 ```jsx= const source$ = interval(1000); const notifier$ = timer(2500); const takeUntil$ = source$.pipe(takeUntil(notifier$)); // 在第 2.5 秒時截斷 takeUntil$.subscribe(console.log); // 0, 1 ``` ### |- takeWhile:可以用判斷函式當作參數 takeWhile 接受一个判定函数作为参数,这个判定函数有两个参数,分别代表上游的数据和对应的序号,takeWhile 会吐出上游数据,直到判定函数返回 false,只要遇到第一个判定函数返回 false 的情况,takeWhile 产生的 Observable 就完结。 判斷函式可以傳入三個參數 1. value 2. index 3. inclusive:預設為 false。當設為 true 時,發生結束事件的那次事件值也會被包含在要發生的事件內。 ```jsx= source$.pipe( takeWhile(data => data < 5, true) ).subscribe({ next: data => console.log(`takeWhile 示範 (2): ${data}`), complete: () => console.log('takeWhile 示範 (2): 結束') }); // takeWhile 示範 (2): 1 // takeWhile 示範 (2): 2 // takeWhile 示範 (2): 3 // takeWhile 示範 (2): 4 // takeWhile 示範 (2): 5 // 判斷為 false 的事件值也會秀出 // takeWhile 示範 (2): 結束 ``` ### |- skip:跳过前N个之后全拿 ### |- skipUntil:跳过,直到...之后全拿 ### |- skipWhile:跳过xxx之后全拿 ```jsx= const source$ = interval(1000); // 每秒輸出 0,1,2,3,... const skipWhile$ = source$.pipe( skipWhile(value => value % 2 === 0) // 只在一開始跳過偶數,直到遇到奇數 ); skipWhile$.subscribe({ next: value => console.log(value), complete: () => console.log('complete') }); ``` ### 回壓控制: 如果数据管道中某一个环节处理数据的速度跟不上数据涌入的速度,上游无法把数据推送给下游,就会在缓冲区中积压数据,这就相当于对上游施加了压力,这就是RxJS世界中的“回压”。 那么,既然处理不过来,干脆就舍弃掉一些涌入的数据,这种方式称为“有损回压控制”(Lossy Backpressure Control),通过损失掉一些数据流入和处理的速度平衡,剩下来的问题就是决定舍弃掉哪些数据? RxJS提供了一系列操作符来实现有损的回压控制,因为涉及只让部分上游数据流入下游,所以功能上这些操作符也属于过滤类操作符的范围。 ### |- throttle ### |- debounce throttle 和 debounce 和不带 Time 后缀的兄弟操作符的区别是,这两个操作符不是用时间来控制流量,而是用 Observable 中的数据来控制流量。(可以使用更複雜的條件) - throttle 的参数是一个函数,这个函数应该返回一个 Observable 对象,这个 Observable 对象可以决定 throttle 如何控制上游和下之间的流量。 ```jsx= const source$ = interval(1000); // 每秒輸出 0,1,2,3,... const durationSelector = (value: number) => { console.log(`# call durationSelector with ${value}`); return timer(2000); // 每次觸發後,鎖定 2 秒 }; const result$ = source$.pipe( throttle(durationSelector) ); result$.subscribe({ next: value => console.log(value), complete: () => console.log('complete') }); ``` ### |- audit:查帳,同 throttle,傳遞上游最後一個數據 audit 是做 throttle 类似的工作,不同的是在“节流时间”范围内,throttle 把第一个数据传给下游,audit 是把最后一个数据传给下游。使用方式同上 ### |- sample:採樣 - [網頁應用實例](https://stackblitz.com/edit/rxjs-book-operators-audittime-2bj1suyv?file=index.ts,index.html) 需要傳入一個 notifer 的 Observable 物件,++每當 notifier 訂閱有新事件發生時,sample 就會在上游取一筆最近的事件值。++ 和上面介绍的 throttle、debounce 和 audit 不同,sample 的参数并不是一个返回 Observable 对象的函数,而就是一个简单的 Observable 对象。sample 之所以这样设计,是因为对于“采样”这个动作,逻辑上可以认为和上游产生什么数据没有任何关系,所以不需要一个函数来根据数据产生 Observable 对象控制节奏,直接提供一个 Observable 对象就足够了。 ### |- throttleTime:節流時間內取上游第一個數據 忽略在 duration 时间范围内传递的数据 ```jsx= const source$ = interval(1000); // 每 1 秒輸出 0,1,2,... const result$ = source$.pipe( throttleTime(2000) // 節流,每 2 秒只取一個值 ); result$.subscribe({ next: value => console.log(value), complete: () => console.log('complete') }); ``` ### |- debounceTime:在 dueTime 間隔時間內如沒有新值,才會從上游傳遞數據 當來源 Ob 物件有新事件資料發生時,會等待指定的時間間隔,如果這段時間沒有新的事件發生,就會以目前此事件資料為主;如果有則重新等待時間間隔 ```jsx= const source$ = interval(1000); // 每秒發出 0,1,2,... const filter$ = source$.pipe( filter(x => x % 3 === 0) // 只取能被 3 整除的數字 ); const result$ = filter$.pipe( debounceTime(2000) // 等到 2 秒內沒有新值才發出 ); result$.subscribe({ next: value => console.log(value), complete: () => console.log('complete') }); ``` ### |- auditTime:節流時間內取上游最後一個數據 ```jsx= interval(1000) .pipe(auditTime(1500)) .subscribe((data) => { console.log(`auditTime 示範: ${data}`); }); ``` - 彈珠圖 ```sql= source$ --0--1--2--3--4--5-- throttleTime --0-----2-----4----- 取時間段內第一個數據 auditTime --------1-----3----- 取最後一個數據 ``` 节流的时间就是 1500 毫秒,在这 1500 毫秒时间里产生的所有数据,无论对于 throttleTime 还是 auditTime,都只会放一个数据进入游,问题就是选择放哪一个数据? ### |- sampleTime:在一个范围内取一个数据,抛弃其他数据。 ```jsx= const source$ = concat( interval(500).pipe(take(2), mapTo('A')), interval(1000).pipe(take(3), mapTo('B')), interval(500).pipe(take(3), mapTo('C')) ); // 加上 operator const result$ = source$.pipe(sampleTime(800)); result$.subscribe((x) => console.log(x)); ``` ![image](https://hackmd.io/_uploads/BJ34XV9seg.png) 不管上游 source$ 产生数据的节奏怎样,完全根据自己参数指定的毫秒数间隔节奏来给下游传递数据。 > 表面上看 sampleTime 和 auditTime 非常像,auditTime 也会把时间块中最后一个数据推给下游,但是对于 auditTime 时间块的开始是由上游产生数据触发的,而 sampleTime 的时间块开始则和上游数据完全无关 注意,如果 sampleTime 发现一个时间块内上游没有产生数据,那在时间块结尾也不会给下游传递数据,比如,修改上面例子參數為 500 ![image](https://hackmd.io/_uploads/BkQvS4qjxe.png) sampleTime 传递给下游的数据间隔虽然不均匀,但是依然是参数 500 毫秒的整数倍。 ### |- distinct:去重複 ```jsx= const source$ = of(0, 1, 1, 2, 0, 0, 1, 3, 3); const distinct$ = source$.pipe(distinct()); distinct$.subscribe({ next: (value) => console.log(value), // 0,1,2,3 complete: () => console.log('complete'), }); ``` 如果上游是物件需指定 key 才能去重複 ```jsx= const source$ = of( { name: 'RxJS', version: 'v4' }, { name: 'React', version: 'v15' }, { name: 'React', version: 'v16' }, { name: 'RxJS', version: 'v5' } ); const distinct$ = source$.pipe(distinct((x) => x.name)); ``` - flushes (第二參數) distinct 有一个潜在的问题需要注意,如果上游产生的不同数据很多,那么可能会造成内存泄露,因為 distinct 需要维护一个“唯一数据集合”记录上游推送下来的所有唯一的数据。 使用了 distinct 的第二个参数,distinct 表现出来的行为就会和以前不一样,传递给下游的数据并不是在整个上游所有数据中唯一的,而==只是在一段时间范围内是唯一的==,是否使用这个参数要根据实际应用需求来判断。 ```jsx= const distinct$ = interval(1000).pipe( map((x) => x % 1000), distinct(null, interval(500)) // 每 500 毫秒就會清空 distinct 以前積壓的數據 ); distinct$.subscribe(console.log); ``` ### |- [distinctUntilChanged](https://stackblitz.com/edit/rxjs-book-operators-distinctuntilchanged?file=index.ts):持續果濾掉重複的事件值,直到事件資料變更為止。 拿到一个数据不是和一个“唯一数据集合”比较,而是直接和上一个数据比较。 參數可以傳入一個 comparator function 來自訂比較邏輯 ```jsx= const distinct$ = of(0, 1, 1, 2, 0, 0, 1, 3, 3).pipe( map((x) => x % 1000), distinctUntilChanged() ); distinct$.subscribe(console.log); // 0,1,2,0,1,3 ``` ### |- distinctUntilKeyChanged distinctUntilChanged 的簡化寫法 (來源是物件的話) ```jsx= const source$ = of( { name: 'RxJS', version: 'v4' }, { name: 'React', version: 'v15' }, { name: 'React', version: 'v16' }, { name: 'RxJS', version: 'v5' } ); const distinct$ = source$.pipe(distinctUntilKeyChanged('name')); distinct$.subscribe(console.log); ``` ### |- ignoreElement:忽略所有上游数据,只关心 complete 和 error 事件 ```jsx= import { of, ignoreElements } from 'rxjs'; of('you', 'talking', 'to', 'me') .pipe(ignoreElements()) .subscribe({ next: word => console.log(word), error: err => console.log('error:', err), complete: () => console.log('the end'), }); // 'the end' ``` ### |- elementAt:取得上游指定索引值之數據 `elementAt(index, default)`:第二個參數為預設值 ```jsx= of(3, 1, 2).pipe(elementAt(0)).subscribe(console.log); ``` ### |- single:检查上游是否只有一个满足对应条件的数据 用来检查==上游是否只有一个满足对应条件的数据==,如果答案为“是”,就向下游传递这个数据;如果答案为“否” (或數據多於兩個),就向下游传递一个异常。 ```jsx= interval(1000) .pipe( take(2), single((x) => x % 2 === 0) ) .subscribe(console.log); // 0 ``` ## ⭐轉換數據類 ### |- map ### |- mapTo:將上游的數據轉成指定的常數值 ```jsx= interval(1000).pipe(take(2), mapTo('abc')).subscribe(console.log); // abc * 2 // 等同如下寫法 interval(1000) .pipe( take(2), map(() => 'abc') ) .subscribe(console.log); ``` ### |- pluck:拔,把上游数据中特定字段的值“拔”出来 ```jsx= const source$ = of( { name: 'RxJS', version: 'v4' }, { name: 'React', version: 'v15' }, { name: 'React', version: 'v16' }, { name: 'RxJS', version: 'v5' } ); const distinct$ = source$.pipe(pluck('name')).subscribe(console.log); /* RxJS React React RxJS */ ``` * 缺点:只能“拔”出一个值,如果想要获得 event.target.tagName 同时也要获得event.type,那用 pluck 就做不到了,只能用 map。 * 优点:就是能够自动处理字段不存在的情况 ```jsx= source$ .pipe(pluck('nosuchfield', 'foo')) .subscribe(console.log); ``` 如果上游数据没有 nosuchfield 这个字段,直接访问 nosuchfield.foo 的话肯定会出错,但是如果使用 pluck 则不用考虑,pluck 发现某一层字段为空,对应就会给下游传递 undefined,不会出错。 ### 无损回压控制 (緩存後給下游) 无损的回压控制就是把上游在一段时间内产生的数据放到一个数据集合里,然后把这个数据集合一次丢给下游。 #### 将上游数据放在数组中传给下游的操作符都包含 buffer 这个词 #### 将上游数据放在 Observable 中传给下游的操作符都包含 window 这个词 ### |- bufferTime、windowTime ```jsx= const source$ = timer(0, 500); const distinct$ = source$.pipe(bufferTime(2000)); // 每兩秒將上游數據打包成數組丟給下游 distinct$.subscribe(console.log); /* [1,2,3,4] * [5,6,7,8] */ ``` 如果使用第二个参数,等于指定每个时间区块开始的时间间隔。 如果第一个参数比第二个参数大,那么就有可能出现数据重复,如果第一个参数比第二个参数小,那么就有可能出现上游数据的丢失。 对于 bufferTime,因为需要缓存上游数据,不管参数设定的数据区间有多短,都无法预期在这段时间内上游会产生多少数据,如果上游在短时间内爆发出很多数据,那就会给 bufferTime 很大的内存压力,为了防止出现这种情况,bufferTime 还支持第三个可选参数,用于指定每个时间区间内缓存的最多数据个数。 对于 windowTime,因为它其实并不缓存数据(因為直接開新時間線),所以并不会有 bufferTime 一样的内存压力问题,但是RxJS为了让这两个操作符对应上,让 windowTime 也支持同样功能的第三个可选参数。 ### |- bufferCount、windowCount 效果同上,只是參數變成要打包的個數 ```jsx= const source$ = timer(0, 500); const distinct$ = source$.pipe(take(16),bufferCount(4)); // 每4個數據緩存一次 distinct$.subscribe(console.log); ``` ### |- bufferWhen、windowWhen 用 Observable 对象来控制 Observable 对象的生成。 它接受一个函数作为参数,这个参数名为closingSelector,closingSelector应该回一个Observable对象,用于控制上游的数据分割,每当返回的Observable对象生数据或者完结时,windowWhen就认为是一个缓冲区块的结束,重新开启一个缓冲窗口。 ```jsx= const source$ = timer(0, 500); const closingSelector = () => timer(2000); const result$ = source$.pipe(windowWhen(closingSelector)); result$.subscribe((window$) => { console.log('new window'); window$.subscribe((value) => console.log(' value:', value)); }); ``` ### |- [bufferToggle](https://stackblitz.com/run?devtoolsheight=50&file=index.ts)、windowToggle toggle 的含义就是两个状态之间的切换,windowToggle和bufferToggle也是利用Observable来控制缓冲窗口的开和关。 需要两个参数,第一个参数 opening$ 是一个 Observable 对象,每当opening$ 产生一个数据,代表一个缓冲窗口的开始,同时,第二个参数 closingSelector 也会被调用,用来获得缓冲窗口结束的通知。 ```jsx= const source$ = timer(0, 500); // 每 500ms 發送數字 const openings$ = timer(0, 2000); // 每 2000ms 打開一個新視窗 const closingSelector = (value: number) => { return value % 2 === 0 ? timer(500) : timer(100); // 如果参数为偶数,就会延时500毫秒产生一个数据,否则就延时100毫秒产生一个数据。 }; const result$ = source$.pipe(windowToggle(openings$, closingSelector)); result$.subscribe((win$) => { console.log('new window'); win$.subscribe((value) => console.log(' value:', value)); }); ``` ### |- [buffer](https://rxjs.angular.tw/api/operators/buffer)、window 只支持一个 Observable 类型的参数,称为 notifier$,每当 notifer\$ 产生一个数据,既是前一个缓存窗口的结束,也是后一个缓存窗口的开始。 ```jsx= const clicks = fromEvent(document, 'click'); const intervalEvents = interval(1000); const buffered = intervalEvents.pipe(buffer(clicks)); buffered.subscribe(x => console.log(x)); ``` ### |- scan:可以累加上游數據傳給下游 ## 高阶 Observable 所谓高阶 Observable,指的是 Observable 产生的数据依然是 Observable 的 Observable。意義就是用 Observable 来管理多个 Observable 对象。 * concatAll:让上游Observable对象的数据依次首尾相连 * mergeAll:任何数据先来先进入下游 * zipAll:保证所有上游Observable对象公平,数据要一一对应。 * combineLatestAll All代表“全部”,这些操作符的功能有差异,但都是把一个高阶Observable 的所有内部Observable都组合起来 ### [concatAll](https://stackblitz.com/run?devtoolsheight=50&file=index.ts) 跟 concat 一樣,只有当第一个 Observable 对象完结的时候,才会去订阅第二个内部 Observable 对象。(類似把所有 observable 攤平後接起來) ```jsx= import { fromEvent, map, interval, take, concatAll } from 'rxjs'; const ho$ = interval(1000).pipe( take(2), map((x) => interval(1500).pipe( map((y) => `${x}:${y}`), take(2) ) ) ); const concated$ = ho$.pipe(concatAll()); concated$.subscribe((val) => console.log(val)); /* [0:0] * [0:1] * [1:0] * [1:1] */ ``` ### mergeAll mergeAll 只要发现上游产生一个内部 Observable 就会立刻订阅,并从中抽取收据 ```jsx= ...同上 const concated$ = ho$.pipe(mergeAll()); /* [0:0] * [1:0] 上游還未結束就立刻訂閱 * [0:1] * [1:1] */ ... ``` ### zipAll 與 zip 多了一個上游,根据“拉链”的工作方式,来自不同数据源的数据要一对一配对,这样一来,zipAll 就只能等待,等待上游高阶 Observable 完结,这样才能确定内部 Observable 对象的数量。如果上游的高阶 Observable 不完结,那么 zipAll 就不会开始工作。 ```jsx= import { interval, take, concat, never, map, zipAll } from 'rxjs'; // 先做上游外層 observable: interval(1000) -> take(2) -> concat never() const outer$ = concat( interval(1000).pipe(take(2)), never() ); // outer$ 會發出 0,1 然後就停住 (因為接 never 不會 complete) const ho$ = outer$.pipe( map(x => interval(1500).pipe( map(y => `${x}:${y}`), take(2) ) ) ); // zipAll 等到所有 inner observable 都有值再 zip const concated$ = ho$.pipe(zipAll()); concated$.subscribe({ next: val => console.log(val), complete: () => console.log('done') }); ``` ### combineLatestAll combineAll 和 zipAll 一样,必须上游高阶 Observable 完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部 Observable 对象的个数,才能拼凑出第一个传给下游的数据。 ```jsx= ...同上 const concated$ = ho$.pipe(combineLatestAll()); ... ``` 最後會跳出上游最後數據+下游的最後數據 ## 高階 Map 可以視為高階 Observable + 迴圈功能 ### |- concatMap = map + concatAll 特别适合使用 concatMap 的应用例子,就是网页应用中的拖拽操作。 - [文章說明](https://ithelp.ithome.com.tw/articles/10187756)、[DEMO](https://jsfiddle.net/s6323859/ochbtpk5/3/) ```jsx= import { fromEvent } from 'rxjs'; import { map, filter, takeUntil, concatMap } from 'rxjs/operators'; const video = document.getElementById('video'); const anchor = document.getElementById('anchor'); const scroll$ = fromEvent(document, 'scroll'); const mouseDown$ = fromEvent(video, 'mousedown'); const mouseUp$ = fromEvent(document, 'mouseup'); const mouseMove$ = fromEvent(document, 'mousemove'); const validValue = (value, max, min) => { return Math.min(Math.max(value, min), max); }; // 監聽捲動 scroll$ .pipe( map(() => anchor.getBoundingClientRect().bottom < 0) ) .subscribe(bool => { if (bool) { video.classList.add('video-fixed'); } else { video.classList.remove('video-fixed'); } }); // 拖曳邏輯 (使用 concatMap) // concatMap 會「排隊」處理 mouseDown → mouseMove → mouseUp 的流程。 // switchMap 會讓「新的拖曳會立即中斷舊的拖曳」 mouseDown$ .pipe( filter(() => video.classList.contains('video-fixed')), concatMap(down => mouseMove$.pipe( takeUntil(mouseUp$), map(move => ({ x: validValue(move.clientX - down.offsetX, window.innerWidth - 320, 0), y: validValue(move.clientY - down.offsetY, window.innerHeight - 180, 0) })) ) ) ) .subscribe(pos => { video.style.top = pos.y + 'px'; video.style.left = pos.x + 'px'; }); ``` ### |- mergeMap = map + mergeAll 最典型的应用场景就是对于 AJAX 请求的处理。在一个网页应用中,一个很典型的场景,每点击某个元素就需要发送一个 AJAX 请求给服务器端,同时还要根据返回结果更新网页中的状态 ### |- switchMap = map + switch:新數據優先 和 concatMap、mergeMap 都不一样的是,后产生的内部 Observable 对象优先级总是更高,只要有新的内部 Observable 对象产生,就立刻退订之前的内部 Observable 对象,改为从最新的内部 Observable 对象拿数据。 ### |- exhaustMap = map + exhaust:舊數據優先 和 switchMap 正好相反,先产生的内部 Observable 优先级总是更高,后产生的内部 Observable 对象被利用的唯一机会,就是之前的内部 Observable 对象已经完结。 如果某种 AJAX 请求要维持现存的 AJAX 请求,那就用得上 exhaustMap。比如,如果利用 AJAX 建立服务器和浏览器之间的长连接,让服务器可以沿着 AJAX 通道推送消息下来,那么只要有这样长连接的 AJAX 存在,就没有必要建立新的 AJAX 连接,exhaustMap 适合这样的场景。 ### |- expand:Rx 中的遞迴 这个操作符类似于 mergeMap,但是,所有 expand 传递给下游的数据,同时也会传递给自己,就像是逐层“展开”所有的数据。 - 它會拿到當前值,然後透過你提供的 **投射函數 (project function)** 產生新的 Observable。 - 每一個新產生的值,也會再次丟進同一個投射函數裡,**不斷遞迴展開**,直到外部運算子(例如 `take`、`takeWhile`)或其他條件中止。 ```jsx= const clicks = fromEvent(document, 'click'); const powersOfTwo = clicks.pipe( map(() => 1), // 每次點擊,先從 1 開始 expand(x => of(2 * x).pipe(delay(1000))), // 每個值再產生下一個 (x*2),延遲 1 秒 take(10) // 只取前 10 個,避免無限展開 ); powersOfTwo.subscribe(x => console.log(x)); // 1, 2, 4, 8, 16, ... ``` ## 数据积压處理 concatAll 存在一个问题,当上游高阶 Observable 产生 Observable 对象的速度过快,快过内部 Observable 产生数据的速度,因为 concatAll 要做无损的数据流连接,導致上游數據會一直不斷累積,消化不完,就会造成数据积压,最終導致記憶體洩漏。 ### switchAll:使用切换输入 Observable 來捨棄舊的數據 - 每当 switch 的上游高阶 Observable 产生一个内部 Observable 对象,switch 都会立刻订阅最新的内部 Observable 对象上,如果已经订阅了之前的内部 Observable 对象,就会退订那个过时的内部 Observable 对象。 - sample: ```jsx= import { map, interval, take, switchAll } from 'rxjs'; const ho$ = interval(1000).pipe( take(3), map((x) => interval(700).pipe( take(2), map((y) => `${x}:${y}`) ) ) ); const result$ = ho$.pipe(switchAll()); result$.subscribe((val) => console.log(val)); ``` - 彈珠圖: 每个外部 Observable 产生的间隔是1秒钟,每个内部 Observable 产生数据的间隔是700毫秒,这样几个内部 Observable 产生的数据在时间上会有重叠。 ```sql= outer$: --0------1------2---| inner1$: ---00----01----| |-切換 inner2$: ----10----11----| |-切換 inner3$: ----20----21----| result$: -----0:0----1:0-----2:0---2:1| ``` 从图中可以清楚地看到,第一个Observable对象有机会产生数据0:0,但是在第二个数据0:1产生之前,第二个内部Observable对象产生,这时发生切换,第一个内部Observable就退场了。同样,第二个内部Observable只有机会产生一个数据1:0,然后第三个内部Observable对象产生,之后没有新的内部Observable对象产生,所以第三个Observable对象的两个数据2:0和2:1都进入了下游。 ### exhaustAll - exhaust 的含义就是“耗尽”,这个操作符的意思是,在耗尽当前内部Observable 的数据之前不会切换到下一个内部 Observable 对象。 同样是连接高阶 Observable 产生的内部 Observable 对象,但是exhaust 的策略和 switch 相反,情景就是前一个内部 Observable 还没有完结,而新的 Observable 又已经产生,exhaust 则选择前一个内部 Observable 对象。 ```jsx= import { fromEvent, map, interval, take, exhaustAll } from 'rxjs'; const clicks = fromEvent(document, 'click'); const higherOrder = clicks.pipe(map(() => interval(1000).pipe(take(5)))); const result = higherOrder.pipe(exhaustAll()); result.subscribe((x) => console.log(x)); ```