# RxJS Operator
## Creation
### create
引入給定的訂閱函式來創建 Observable。
```javascript
const hello = Rx.Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});
const subscribe = hello.subscribe(val => console.log(val));
// outputs:
// Hello
// World
```
### empty
返回一個立即完成的空 Observable。
```javascript
import { empty } from 'rxjs/observable/empty';
const subscribe = example.subscribe({
next: () => console.log('Next'),
complete: () => console.log('Complete!')
});
//output: 'Complete!'
```
### from
將 array、promise、iterator 轉成 Observable。
- 用在 promise 上等同 fromPromise。
- 用在 array、iterator、string 上,其內容會依序發出。
- array 第二層以上會維持數列型態。
-
```javascript
const arraySource = Rx.Observable.from([1,2,3,4,5, [8,9]]);
const subscribe = arraySource.subscribe(val => console.log(val));
// outputs:
// 1
// 2
// 3
// 4
// 5
// [8,9]
const promiseSource = Rx.Observable.from( new Promise( resolve => resolve('Hello World')));
const subscribe2 = promiseSource.subscribe(val => console.log(val));
// outputs:
// "Hello World"
```
### fromEvent
將事件轉成 Observable。
```javascript
//create observable that emits click events
const source = Rx.Observable.fromEvent(document, 'click');
//map to string with given event timestamp
const example = source.map(event => `Event time: ${event.timeStamp}`)
//output (example): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));
```
### of
用來發出任意數量的任意值。
- 任意值除了number、string 外,還包含 Array、Object、function。
- Array、Object 等不會被攤平。
```javascript
const source = Rx.Observable.of(1,2, {a: 3}, [4,5,6], function foo() { return bar; });
const subscription = source.subscribe(val => console.log(val));
// outputs:
// 1
// 2
// [object Object] {
// a: 3
// }
// [4,5,6]
// function foo() { return bar; }
```
### interval
在給定的時間參數間隔,發出由 0 起始的數字序列。
```javascript
const source = Rx.Observable.interval(1000);
const subcribe = source.subscribe(cal => console.log(val));
// outputs:
// 1s => 0
// 2s => 1
// 3s => 2
// ...
```
### timer
給定初始延遲與時間間隔,依序發出由 0 起始的數字序列。
```javascript
const source = Rx.Observable.timer(3000, 1000);
const subscribe = source.subscribe(val => console.log(val));
// outputs:
// 3s => 0
// 4s => 1
// 5s => 2
// ...
```
### range
發出給定數字區間內的數字序列。
```javascript
const source = Rx.Observable.range(5,8);
const subscribe = source.subscribe( val => console.log(val));
// outputs:
// 5
// 6
// 7
// 8
```
### throw
發出錯誤訊息
```javascript
const = Rx.Observable.throw('This is an Error');
const subscribe = source.subscribe({
next: val => console.log(val);
complete: () => console.log('Complete!');
error: val => console.log(val);
});
// outputs:
// This is an Error
```
---
## Combination
### combineLatest
當每個 Observable 都發出值時,combineLatest 會發出**每個Observable 的最新值**(會以 Array 形式發出
- 只要其中任一 Observable 發出新值,就會此Operator 就會發出每個 Observable 目前的值。
- 當每個 Observable 都已發出初始值之後,才會出現第一個值。因此如果其中任一個未被觸發,這個訂閱會一直沒有出現初始值。
- 需要順序性,用 concatAll / concateMap
- 每個值都需要出現,用 merge / mergeAll / mergeMap
- 只想拿到最新值,用combinedLatest / combinedAll
```javascript
const t1 = Rx.Observable.timer(1000, 4000);
// 1s 後發出第一個值,之後依序每 4s 發出值。
const t2 = Rx.Observable.timer(2000, 4000);
// 2s 後發出第一個值,之後依序每 4s 發出值。
const t3 = Rx.Observable.timer(3000, 4000);
// 3s 後發出第一個值,之後依序每 4s 發出值。
const combined = Rx.Observable.combineLatest(t1, t2, t3);
const subscription = combined.subscribe( result => {
console.log(result);
});
// 3s 印出 [0, 0, 0]
// 4s 印出 [1, 0, 0]
// 5s 印出 [1, 1, 0]
// 6s 印出 [1, 1, 1]
// 9s 印出 [2, 1, 1]
// ...
```
combinedLatest 還可接受最後一個可選的 callback function,接受前面的Observable 發出的值當參數做處理。上例可改如下:
```javascript
const combined = Rx.Observable.combinedLatest(t1, t2, t3, (r1, r2, r3) => {
// 做處理後回傳
return [r1, r2, r3];
});
```
### combinedAll
等待外部 Observable 結束後,將所有內部的 Observables 用 combinedLatest 作合併。
### concat
前一個訂閱完成後,才開始下一個訂閱。
(將多個 Observable 依給定的 Observable 的順序攤平。)
如果前一個訂閱沒有結束,後一個訂閱就一直不會送值出來。
```javascript
// 每秒送出一個序列值,並只取前三值;
// 如果沒有 .take(), sourceTwo 將不會被取值。
const sourceOne = Rx.Observable.interval(1000).take(3);
// 同時發出 4,5,6
const sourceTwo = Rx.Observable.of(4,5,6);
// emit values from sourceOne, when complete, subscribe to sourceTwo
const example = sourceOne.concat(sourceTwo);
const subscribe = example.subscribe(val => console.log('Example: Basic concat:', val));
// 1s 收到 0
// 2s 收到 1
// 3s 收到 2 4 5 6
```
### concateAll
訂閱多個發出 Observable 的 Observable,並在前一個內部的 Observable 完成後,才接著訂閱下一個。
### forkJoin
當所有 Observable **已完成**時,發出每個 Observable 的最新值。
- 如果其中某個 Observable 一直未完成的話,forkJoin 將不會發出值。
### merge
將多個 Observable 依每個值出現的時間順序攤平。
```javascript=
const first = Rx.Observable.interval(3000);
const second = Rx.Observable.interval(1000);
const example = first.merge(second); //實例方法
// 也可使用靜態方法 Rx.Observable.map(first, second);
const subscribe = example.subscribe(val => console.log(val));
// outputs:
// 1s => 0
// 2s => 1
// 3s => 0
// 2
// 4s => 3
// 5s => 4
// 6s => 1
// 5
// ...
```
### mergeAll
訂閱多個發出 Observable 的 Observable,並將內部的 Observable 依時間順序攤平。
### race
在多個 Observable 中,只取最先發出的值。
```javascript=
const source1 = Rx.Observable.interval(2500).mapTo(2300);
const source2 = Rx.Observable.interval(500).mapTo(500);
const source3 = Rx.Observable.interval(1000).mapTo(1100);
const example = Rx.Observable.race( source1, source2, source3 );
const subscribe = example.subscribe(val => console.log(val));
// outputs:
// 0.5s => 500
// 1s => 500
// 1.5s => 500
// ...
```
### startWith
在發出的值前面加上一個給定的值。
```javascript=
const source = Rx.Observable.interval(1000);
const example = source.startWith(-3,-4).startWith(-9);
const subscribe = example.subscribe( val => console.log(val) );
// outputs:
// 0s => -9
// -3
// -4
// 1s => 0
// 2s => 1
// 3s => 2
// ...
```
### zip
在所有內部 Observable 都發出值後,將他們的值以陣列型態發出。
(最後一個參數可以放映射函式。)
```javascript=
let age$ = Rx.Observable.of<number>(27, 25, 29);
let name$ = Rx.Observable.of<string>('Foo', 'Bar', 'Beer');
let isDev$ = Rx.Observable.of<boolean>(true, true, false);
Rx.Observable
.zip(age$, name$, isDev$,
(age: number, name: string, isDev: boolean) => ({ age, name, isDev }))
.subscribe(x => console.log(x));
// output:
// [object Object] {
// age: 27,
// isDev: true,
// name: "Foo"
// }
// [object Object] {
// age: 25,
// isDev: true,
// name: "Bar"
// }
// [object Object] {
// age: 29,
// isDev: false,
// name: "Beer"
// }
// 沒有附上最後一個 function 參數時的 outputs:
// [object Object] {
// age: 27,
// isDev: true,
// name: "Foo"
// }
// [object Object] {
// age: 25,
// isDev: true,
// name: "Bar"
// }
// [object Object] {
// age: 29,
// isDev: false,
// name: "Beer"
// }
//
```
### pairwise
將前一個值與當前值以陣列發出。
```javascript=
import { pairwise, take } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
interval(1000).pipe(
pairwise(),
take(5)
)
.subscribe(console.log);
// outputs:
// [0,1]
// [1,2]
// [2,3]
// [3,4]
// [4,5]
```
## Filtering
### filter
當值符合條件時才發出。(即過濾掉不符合條件的值)
### sample
當給定的 observable 發出值時,對來源 observable 取樣。
### debounceTime
當 observable 發出新值時,等待固定時間參數,確認期間沒有新值出現才將值發出。如果期間有新值,則暫存最新值並重新等待。
### throttleTime
當 observable 發出值後,等待固定時間參數,期間如果有新值出現會被忽略,並在等待時間結束後發出一開始收到的值。
---
## Transformation
### buffer
收集 Observable 的值,直到指定的 Observable 發出,才將收集的值以 Array 型態一次發出。
```javascript=
const myInterval = Rx.Observable.interval(1000);
const myClick = Rx.Observable.fromEvent(document, 'click');
const bufferedInterval = myInterval.pipe( buffer(myClick) );
const subscribe = bufferedInterval.subscribe(val => console.log(val));
// 點擊畫面時,console 會印出與上次點擊時間內收集到的數字陣列。
// [0,1]
// [2,3,4,5]
// [6]
```
### bufferCount
收集 Observable 的值,直到收集滿指定的數量,再以陣列型態發出。
```javascript=
const myInterval = Rx.Observable.interval(1000);
const bufferedInterval = myInterval.pipe( bufferCount(3) );
const subscribe = bufferedInterval.subscribe(val => console.log(val));
// outputs:
// 3s => [0,1,2]
// 6s => [3,4,5]
// 9s => [6,7,8]
// ...
```
### bufferTime / bufferToggle / bufferWhen
...略
### map
將 Observable 值依給定的函式作映射。
```javascript=
const source = Rx.Observable.from([
{ name: 'Joe', age: 30 },
{ name: 'Peter', age: 28 },
{ name: 'Ryan', age: 36 }
]);
const example = source.map(person => person.name);
const subscribe = example.subscribe( val => console.log(val));
// outputs:
// Joe
// Peter
// Ryan
```
### mapTo
將每個 Observable 發出的值映射成給定的固定值。
```javascript=
const source = Rx.Observable.interval(1000);
const example = source.mapTo('foo');
const subscribe = example.subscribe( val => console.log(val) );
// outputs:
// 1s => 'foo'
// 2s => 'foo'
// ...
```
### pluck
提取 observable 其中的屬性來發出。
```javascript=
const source = Rx.Observable.from([
{name: 'Joe', age: 30, job: {title: 'Developer', language: 'JavaScript'}},
//will return undefined when no job is found
{name: 'Sarah', age:35}
]);
//grab title property under job
const example = source.pluck('job', 'title');
//output: "Developer" , undefined
const subscribe = example.subscribe(val => console.log(val));
// map 也可做到同樣的功能,但如果其中屬性有缺會報錯:
// "TypeError: Cannot read property 'title' of undefined“
const example2 = source.map(val => val.job.title);
const subscribe2 = example2.subscribe(val => console.log(val));
```
### reduce
在 observable 都完成後,發出累加值。
```javascript=
const source = Rx.Observable.of(1, 2, 3, 4);
const example = source.reduce((acc,val) => acc + val);
//output: Sum: 10'
const subscribe = example.subscribe(val => console.log('Sum:', val));
```
### scan
在 observable 發出新值時,送出目前的累加值。
```javascript=
const source = of(1, 2, 3);
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
const subscribe = example.subscribe(val => console.log(val));
// outputs:
// 1
// 3
// 6
```
### concatMap
如果內層有新的 observable 產生,會等待上一個 observable 完成後,才送出新的 observable 的值。
```javascript=
let source = Rx.Observable
.fromEvent(document, 'click')
.mapTo(1);
let example = source.concatMap( val => Rx.Observable.interval(500).take(5));
example.subscribe( val => console.log(val) )
// 如果在上一個內部 observable 未完成前再點擊畫面,會等待上一個完成才送出新的 observable 的值。
```
### mergeMap
如果內層有新的 observable 產生,會產生的時間順序依序攤平送出。
```javascript=
let source = Rx.Observable
.fromEvent(document, 'click')
.mapTo(1);
let example = source.mergeMap( val => Rx.Observable.interval(500).take(5));
example.subscribe( val => console.log(val) )
// 如果在上一個內部 observable 未完成前再點擊畫面,新的 observable 的值會依時間順序與上一個產生的值交錯出現。
```
### switchMap
如果內層有新的 observable 產生,會中斷未完成的,而送出新的 observable 的值。
```javascript=
let source = Rx.Observable
.fromEvent(document, 'click')
.mapTo(1);
let example = source.switchMap( val => Rx.Observable.interval(500).take(5));
example.subscribe( val => console.log(val) )
// 如果在上一個內部 observable 未完成前再點擊畫面,會終止未完成的 observable 而送出新的 observable 的值。
```
## Util
### do / tap
用來執行額外的動作,如`conosle.log`。
在鏈式結構用 do,在 pipable operator 中用 tap。