# RxJS Operator ## Creation ### create 引入給定的訂閱函式來創建 Observable。 ```javascript const hello = Rx.Observable.create(function(observer) { observer.next('Hello'); observer.next('World'); }); const subscribe = hello.subscribe(val => console.log(val)); // outputs: // Hello // World ``` ### empty 返回一個立即完成的空 Observable。 ```javascript import { empty } from 'rxjs/observable/empty'; const subscribe = example.subscribe({ next: () => console.log('Next'), complete: () => console.log('Complete!') }); //output: 'Complete!' ``` ### from 將 array、promise、iterator 轉成 Observable。 - 用在 promise 上等同 fromPromise。 - 用在 array、iterator、string 上,其內容會依序發出。 - array 第二層以上會維持數列型態。 - ```javascript const arraySource = Rx.Observable.from([1,2,3,4,5, [8,9]]); const subscribe = arraySource.subscribe(val => console.log(val)); // outputs: // 1 // 2 // 3 // 4 // 5 // [8,9] const promiseSource = Rx.Observable.from( new Promise( resolve => resolve('Hello World'))); const subscribe2 = promiseSource.subscribe(val => console.log(val)); // outputs: // "Hello World" ``` ### fromEvent 將事件轉成 Observable。 ```javascript //create observable that emits click events const source = Rx.Observable.fromEvent(document, 'click'); //map to string with given event timestamp const example = source.map(event => `Event time: ${event.timeStamp}`) //output (example): 'Event time: 7276.390000000001' const subscribe = example.subscribe(val => console.log(val)); ``` ### of 用來發出任意數量的任意值。 - 任意值除了number、string 外,還包含 Array、Object、function。 - Array、Object 等不會被攤平。 ```javascript const source = Rx.Observable.of(1,2, {a: 3}, [4,5,6], function foo() { return bar; }); const subscription = source.subscribe(val => console.log(val)); // outputs: // 1 // 2 // [object Object] { // a: 3 // } // [4,5,6] // function foo() { return bar; } ``` ### interval 在給定的時間參數間隔,發出由 0 起始的數字序列。 ```javascript const source = Rx.Observable.interval(1000); const subcribe = source.subscribe(cal => console.log(val)); // outputs: // 1s => 0 // 2s => 1 // 3s => 2 // ... ``` ### timer 給定初始延遲與時間間隔,依序發出由 0 起始的數字序列。 ```javascript const source = Rx.Observable.timer(3000, 1000); const subscribe = source.subscribe(val => console.log(val)); // outputs: // 3s => 0 // 4s => 1 // 5s => 2 // ... ``` ### range 發出給定數字區間內的數字序列。 ```javascript const source = Rx.Observable.range(5,8); const subscribe = source.subscribe( val => console.log(val)); // outputs: // 5 // 6 // 7 // 8 ``` ### throw 發出錯誤訊息 ```javascript const = Rx.Observable.throw('This is an Error'); const subscribe = source.subscribe({ next: val => console.log(val); complete: () => console.log('Complete!'); error: val => console.log(val); }); // outputs: // This is an Error ``` --- ## Combination ### combineLatest 當每個 Observable 都發出值時,combineLatest 會發出**每個Observable 的最新值**(會以 Array 形式發出 - 只要其中任一 Observable 發出新值,就會此Operator 就會發出每個 Observable 目前的值。 - 當每個 Observable 都已發出初始值之後,才會出現第一個值。因此如果其中任一個未被觸發,這個訂閱會一直沒有出現初始值。 - 需要順序性,用 concatAll / concateMap - 每個值都需要出現,用 merge / mergeAll / mergeMap - 只想拿到最新值,用combinedLatest / combinedAll ```javascript const t1 = Rx.Observable.timer(1000, 4000); // 1s 後發出第一個值,之後依序每 4s 發出值。 const t2 = Rx.Observable.timer(2000, 4000); // 2s 後發出第一個值,之後依序每 4s 發出值。 const t3 = Rx.Observable.timer(3000, 4000); // 3s 後發出第一個值,之後依序每 4s 發出值。 const combined = Rx.Observable.combineLatest(t1, t2, t3); const subscription = combined.subscribe( result => { console.log(result); }); // 3s 印出 [0, 0, 0] // 4s 印出 [1, 0, 0] // 5s 印出 [1, 1, 0] // 6s 印出 [1, 1, 1] // 9s 印出 [2, 1, 1] // ... ``` combinedLatest 還可接受最後一個可選的 callback function,接受前面的Observable 發出的值當參數做處理。上例可改如下: ```javascript const combined = Rx.Observable.combinedLatest(t1, t2, t3, (r1, r2, r3) => { // 做處理後回傳 return [r1, r2, r3]; }); ``` ### combinedAll 等待外部 Observable 結束後,將所有內部的 Observables 用 combinedLatest 作合併。 ### concat 前一個訂閱完成後,才開始下一個訂閱。 (將多個 Observable 依給定的 Observable 的順序攤平。) 如果前一個訂閱沒有結束,後一個訂閱就一直不會送值出來。 ```javascript // 每秒送出一個序列值,並只取前三值; // 如果沒有 .take(), sourceTwo 將不會被取值。 const sourceOne = Rx.Observable.interval(1000).take(3); // 同時發出 4,5,6 const sourceTwo = Rx.Observable.of(4,5,6); // emit values from sourceOne, when complete, subscribe to sourceTwo const example = sourceOne.concat(sourceTwo); const subscribe = example.subscribe(val => console.log('Example: Basic concat:', val)); // 1s 收到 0 // 2s 收到 1 // 3s 收到 2 4 5 6 ``` ### concateAll 訂閱多個發出 Observable 的 Observable,並在前一個內部的 Observable 完成後,才接著訂閱下一個。 ### forkJoin 當所有 Observable **已完成**時,發出每個 Observable 的最新值。 - 如果其中某個 Observable 一直未完成的話,forkJoin 將不會發出值。 ### merge 將多個 Observable 依每個值出現的時間順序攤平。 ```javascript= const first = Rx.Observable.interval(3000); const second = Rx.Observable.interval(1000); const example = first.merge(second); //實例方法 // 也可使用靜態方法 Rx.Observable.map(first, second); const subscribe = example.subscribe(val => console.log(val)); // outputs: // 1s => 0 // 2s => 1 // 3s => 0 // 2 // 4s => 3 // 5s => 4 // 6s => 1 // 5 // ... ``` ### mergeAll 訂閱多個發出 Observable 的 Observable,並將內部的 Observable 依時間順序攤平。 ### race 在多個 Observable 中,只取最先發出的值。 ```javascript= const source1 = Rx.Observable.interval(2500).mapTo(2300); const source2 = Rx.Observable.interval(500).mapTo(500); const source3 = Rx.Observable.interval(1000).mapTo(1100); const example = Rx.Observable.race( source1, source2, source3 ); const subscribe = example.subscribe(val => console.log(val)); // outputs: // 0.5s => 500 // 1s => 500 // 1.5s => 500 // ... ``` ### startWith 在發出的值前面加上一個給定的值。 ```javascript= const source = Rx.Observable.interval(1000); const example = source.startWith(-3,-4).startWith(-9); const subscribe = example.subscribe( val => console.log(val) ); // outputs: // 0s => -9 // -3 // -4 // 1s => 0 // 2s => 1 // 3s => 2 // ... ``` ### zip 在所有內部 Observable 都發出值後,將他們的值以陣列型態發出。 (最後一個參數可以放映射函式。) ```javascript= let age$ = Rx.Observable.of<number>(27, 25, 29); let name$ = Rx.Observable.of<string>('Foo', 'Bar', 'Beer'); let isDev$ = Rx.Observable.of<boolean>(true, true, false); Rx.Observable .zip(age$, name$, isDev$, (age: number, name: string, isDev: boolean) => ({ age, name, isDev })) .subscribe(x => console.log(x)); // output: // [object Object] { // age: 27, // isDev: true, // name: "Foo" // } // [object Object] { // age: 25, // isDev: true, // name: "Bar" // } // [object Object] { // age: 29, // isDev: false, // name: "Beer" // } // 沒有附上最後一個 function 參數時的 outputs: // [object Object] { // age: 27, // isDev: true, // name: "Foo" // } // [object Object] { // age: 25, // isDev: true, // name: "Bar" // } // [object Object] { // age: 29, // isDev: false, // name: "Beer" // } // ``` ### pairwise 將前一個值與當前值以陣列發出。 ```javascript= import { pairwise, take } from 'rxjs/operators'; import { interval } from 'rxjs/observable/interval'; interval(1000).pipe( pairwise(), take(5) ) .subscribe(console.log); // outputs: // [0,1] // [1,2] // [2,3] // [3,4] // [4,5] ``` ## Filtering ### filter 當值符合條件時才發出。(即過濾掉不符合條件的值) ### sample 當給定的 observable 發出值時,對來源 observable 取樣。 ### debounceTime 當 observable 發出新值時,等待固定時間參數,確認期間沒有新值出現才將值發出。如果期間有新值,則暫存最新值並重新等待。 ### throttleTime 當 observable 發出值後,等待固定時間參數,期間如果有新值出現會被忽略,並在等待時間結束後發出一開始收到的值。 --- ## Transformation ### buffer 收集 Observable 的值,直到指定的 Observable 發出,才將收集的值以 Array 型態一次發出。 ```javascript= const myInterval = Rx.Observable.interval(1000); const myClick = Rx.Observable.fromEvent(document, 'click'); const bufferedInterval = myInterval.pipe( buffer(myClick) ); const subscribe = bufferedInterval.subscribe(val => console.log(val)); // 點擊畫面時,console 會印出與上次點擊時間內收集到的數字陣列。 // [0,1] // [2,3,4,5] // [6] ``` ### bufferCount 收集 Observable 的值,直到收集滿指定的數量,再以陣列型態發出。 ```javascript= const myInterval = Rx.Observable.interval(1000); const bufferedInterval = myInterval.pipe( bufferCount(3) ); const subscribe = bufferedInterval.subscribe(val => console.log(val)); // outputs: // 3s => [0,1,2] // 6s => [3,4,5] // 9s => [6,7,8] // ... ``` ### bufferTime / bufferToggle / bufferWhen ...略 ### map 將 Observable 值依給定的函式作映射。 ```javascript= const source = Rx.Observable.from([ { name: 'Joe', age: 30 }, { name: 'Peter', age: 28 }, { name: 'Ryan', age: 36 } ]); const example = source.map(person => person.name); const subscribe = example.subscribe( val => console.log(val)); // outputs: // Joe // Peter // Ryan ``` ### mapTo 將每個 Observable 發出的值映射成給定的固定值。 ```javascript= const source = Rx.Observable.interval(1000); const example = source.mapTo('foo'); const subscribe = example.subscribe( val => console.log(val) ); // outputs: // 1s => 'foo' // 2s => 'foo' // ... ``` ### pluck 提取 observable 其中的屬性來發出。 ```javascript= const source = Rx.Observable.from([ {name: 'Joe', age: 30, job: {title: 'Developer', language: 'JavaScript'}}, //will return undefined when no job is found {name: 'Sarah', age:35} ]); //grab title property under job const example = source.pluck('job', 'title'); //output: "Developer" , undefined const subscribe = example.subscribe(val => console.log(val)); // map 也可做到同樣的功能,但如果其中屬性有缺會報錯: // "TypeError: Cannot read property 'title' of undefined“ const example2 = source.map(val => val.job.title); const subscribe2 = example2.subscribe(val => console.log(val)); ``` ### reduce 在 observable 都完成後,發出累加值。 ```javascript= const source = Rx.Observable.of(1, 2, 3, 4); const example = source.reduce((acc,val) => acc + val); //output: Sum: 10' const subscribe = example.subscribe(val => console.log('Sum:', val)); ``` ### scan 在 observable 發出新值時,送出目前的累加值。 ```javascript= const source = of(1, 2, 3); const example = source.pipe(scan((acc, curr) => acc + curr, 0)); const subscribe = example.subscribe(val => console.log(val)); // outputs: // 1 // 3 // 6 ``` ### concatMap 如果內層有新的 observable 產生,會等待上一個 observable 完成後,才送出新的 observable 的值。 ```javascript= let source = Rx.Observable .fromEvent(document, 'click') .mapTo(1); let example = source.concatMap( val => Rx.Observable.interval(500).take(5)); example.subscribe( val => console.log(val) ) // 如果在上一個內部 observable 未完成前再點擊畫面,會等待上一個完成才送出新的 observable 的值。 ``` ### mergeMap 如果內層有新的 observable 產生,會產生的時間順序依序攤平送出。 ```javascript= let source = Rx.Observable .fromEvent(document, 'click') .mapTo(1); let example = source.mergeMap( val => Rx.Observable.interval(500).take(5)); example.subscribe( val => console.log(val) ) // 如果在上一個內部 observable 未完成前再點擊畫面,新的 observable 的值會依時間順序與上一個產生的值交錯出現。 ``` ### switchMap 如果內層有新的 observable 產生,會中斷未完成的,而送出新的 observable 的值。 ```javascript= let source = Rx.Observable .fromEvent(document, 'click') .mapTo(1); let example = source.switchMap( val => Rx.Observable.interval(500).take(5)); example.subscribe( val => console.log(val) ) // 如果在上一個內部 observable 未完成前再點擊畫面,會終止未完成的 observable 而送出新的 observable 的值。 ``` ## Util ### do / tap 用來執行額外的動作,如`conosle.log`。 在鏈式結構用 do,在 pipable operator 中用 tap。