# RxJS
https://rxjs.dev/
RxJS,全名Reactive Extension for JavaScript
目的是為了能夠更方便的處理非同步事件
- observables
- subscriptions
- observers
- creation functions
- pipable operators
- subjects
## observable
### array vs stream
array: data已經被定義好在那裡給我們使用,且很容易可以取得
stream: data可能從任何的時間點進來,我們不知道data什麼時候會出現
observable使用的是stream
先寫好如何互動的程式,再根據進來的資料判斷要怎麼互動,這也被叫做reactive programming
例子:
- 滑鼠移動
(x:5, y:6) -> (x:6, y:6) -> ...
- 在輸入欄裡打字
"" -> "T" -> "Te" -> "Tes" -> "Test"
### observable, observer, subscription
```typescript=
// Observable
const observable$ = new Observable(subscriber => {
subscriber.next('Alice');
subscriber.next('Ben');
})
// Observer
const observer = {
next: value => console.log(value)
}
//Subscription
observable$.subscribe(observer);
//Subscription裡面只放一個function的話,那個function會被當成next
observable$.subscribe(value => console.log(value));
```
- Observable
要建立一個Observable,要用new Observable()
裡面放callback function
observable像是一個function,每當我們subscribe他,他才會執行
- Observer
Observer描述了對於過來的data要做什麼樣的動作
- Subscription
Observable有一個subscribe method,會回傳Subscription
Subscription可以把Observer丟到Observable裡面執行callback
Subscription也有個unsubscribe method,當你不需要監聽時可以用它來關閉,解放記憶體
每一個Subscription都是獨立的,就算是用同樣的Observable跟Observer多次subscribe都一樣
## Marble Diagram
https://rxmarbles.com/
rxjs的流程可以畫成marble diagram,用來解釋資料流向

上面是資料流進來的時間點
中間是operator
下面是結果
- 圈圈
可以出現0到多次,表示那個時間點資料傳出去了
- 叉叉
最多一次,代表出現error
- 直線
最多一次,代表已完成(unsubscribe)
叉叉或直線後面不會有其他東西
出現錯誤或unsub之後observable不會再監聽
## Subscription
### lifecycle

### Teardown logic
當subscription結束時,會觸發teardown logic
- complete
```typescript=
const observable$ = new Observable<string>((subscriber) => {
setTimeout(() => {
subscriber.next("yep");
}, 2000);
subscriber.complete();
return () => {
console.log("It's all over");
};
});
observable$.subscribe({
next: (value) => console.log(value),
complete: () => console.log("complete"),
});
```

在observable裡可以return function
該function在complete被呼叫的時候才會執行
而呼叫完complete進到teardown階段之後,所有在observable裡正在執行中的process都會被捨棄掉
- error
```typescript=
const observable$ = new Observable<string>((subscriber) => {
setTimeout(() => {
subscriber.next("yep");
}, 2000);
setTimeout(() => {
subscriber.error(new Error("stop, you have violated the law"));
}, 4000);
return () => {
console.log("It's all over");
};
});
observable$.subscribe({
next: (value) => console.log(value),
error: (err) => console.log(err.message),
});
```

觸發error之後一樣會進到teardown階段
所以也會執行return的函式
- order
```typescript=
// error 4s, complete 2s
const observable$ = new Observable<string>((subscriber) => {
setTimeout(() => {
subscriber.error(new Error("stop, you have violated the law"));
}, 4000);
setTimeout(() => {
subscriber.next("yep");
subscriber.complete();
}, 2000);
return () => {
console.log("It's all over");
};
});
observable$.subscribe({
next: (value) => console.log(value),
complete: () => console.log("complete"),
error: (err) => console.log(err.message),
});
```

再次證明error或complete後面不會再有資料傳出
- unsubscribe
```typescript=
const observable$ = new Observable((subscriber) => {
let n = 1;
setInterval(() => {
subscriber.next(n++);
}, 2000);
return () => console.log("quit");
});
const sub = observable$.subscribe((value) => console.log(value));
setTimeout(() => {
console.log("unsub");
sub.unsubscribe();
}, 7000);
```

unsub一樣會觸發teardown
雖然unsub了,但程式還在無限迴圈中
因為我們沒有清除interval
所以要在teardown logic裡加上清除interval的動作
```typescript=
return () => {
clearInterval(interval);
console.log("quit");
};
```
這告訴我們要注意在離開observable之前要檢查是否有東西要手動清除
## Types of Observable
### cold
當一個observable所產出的data是基於它自身裡面的邏輯產出來的時候,我們可以說它是cold observable
cold observable建立出來的subscription的資料是獨立的,不會受到其他subscription影響
ex:傳送request的ajax做出來的observable
### hot
當一個observable產出的data是來自已經存在的另一個資料來源時,那就可以說它是hot observable,產出來的data不太跟observable內部邏輯相關,通常是基於該資料來源
這種observable會把資料multicast到所有subscription
ex:網頁的DOM,資料來源可能是DOM events
## Creation function(Creation operator)
上面做出observable都是用手動new Observable()
而creation function可以幫我們快速建立observable,並在裡面預先設定一些特定行為,像是ajax可以建立會發出request的observable
### of
使用of建立出來的observable就像參數很多的function
當subscribe之後會一個一個把參數放進next,然後complete
```typescript=
import { of } from "rxjs";
of("first", "second", "third").subscribe({
next: (value) => console.log(value),
complete: () => console.log("completed"),
});
```

### from
from可以把其他資料型別轉換成一個observable
像array,promise, 可迭代物件, observable......
放promise進去,被reject的話會觸發error function
做的事其實和of差不多
```typescript=
// 會跟上面的of有一樣結果
// array
const myPromise = new Promise((resolve, reject) => {
// 觸發complete
resolve("resolved");
// 觸發error
// reject("rejected");
});
from(["first", "second", "third"]).subscribe({
next: (value) => console.log(value),
complete: () => console.log("completed"),
});
// promise
const promise$ = from(myPromise);
promise$.subscribe({
next: (value) => console.log(value),
error: () => console.log("error"),
complete: () => console.log("completed"),
});
```

### fromEvent
fromEvent可以建立基於event的observable
像是DOM event, Node.js EventEmitter, aedes broker的event...
是hot observable
第一個參數放物件,第二個參數放event name
subscribe()可以當成是addEventListener()
unsubscribe()可以當成是removeEventListener()
```typescript=
const button = document.querySelector("#mybutton")!;
fromEvent<MouseEvent>(button, "click").subscribe({
next: (event) => console.log(event.x, event.y),
});
```

用個方法永遠不會complete,如果要停止監聽,就必須要unsubscribe
fromEvent unsub之後可以把EventListener完整關掉
不用像原本的observable要去手動移除EventListener
### timer
等待一段時間,送出資料後complete
```typescript=
timer(3000).subscribe({
next: (value) => console.log(value),
complete: () => console.log("completed"),
});
```
這裡的value會是0
timer跟setTimeout最大的差別在於,timer提前ubsub的時候會整個結束,而提前unsub有setTimeout的observable時,雖然也不會送出next,但setTimeout並不會結束,變成我們要手動去clearTimeout去解放記憶體
### interval
每隔一段時間送出資料
```typescript=
interval(3000).subscribe({
next: (value) => console.log(value),
});
// 0 1 2 3 4 5
```
這裡的value會從0開始一直送,每次送都+1
而如果unsub interval,這個interval會被自動清掉,不需要手動清除
### forkJoin
他所接收的參數是observable的array
當subscribe這個forkJoin,他會為傳入的所有observable建立subscription,然後等到他們都complete後,收集他們最後所傳出的data到一個array裡
array裡observable的順序即為最後回傳資料的順序
```typescript=
const order1$ = of("First", "Second");
const order2$ = of("Third", "Fourth");
forkJoin([order1$, order2$]).subscribe({
next: ([order1, order2]) => {
console.log(order1, order2);
},
});
```

若是有其中一個observable出現了error,forkJoin會立即觸發error function,所有還沒complete的subscription會立即被unsub,並依array順序執行teardown logic
### combineLatest
combineLatest跟forkJoin很像,但發出data的次數更頻繁
他所接收的參數一樣是observable的array
subscribe combineLatest時,為所有observable建立subscription
跟forkJoin不同的地方是,當所有subscription都發出過一次資料時,他就會發出最新資料的array
之後每當有subscription發出資料時,他就會更新該array並發出資料,直到所有subscription都complete
出現error一樣會全部unsub並執行teardown logic

## Pipable operator
可以把observable所傳出來的data轉換成各種形式
也可以寫出更複雜的邏輯
當observable傳出資料時,會先經過pipable operator,再進行下一個動作
同一個observable可以有很多個pipable operator
### filter
決定資料是否可以通過,只會對next進行filter
```typescript=
import { from, filter } from "rxjs";
const char$ = from(["a", "b", 1, 3, "e", 9, "q", 5]);
char$.pipe(filter((item) => typeof item === "number")).subscribe({
next: (value) => console.log(value),
});
// 1 3 9 5
```
也可以把pipe直接寫在observable後面變成新的observable
```typescript=
const char$ = from(["a", "b", 1, 3, "e", 9, "q", 5]).pipe(
filter((item) => typeof item === "number")
);
```
### map
會把資料轉變成其他型態,同樣只會對next進行map
基本上跟javascript array的map差不多
```typescript=
import { from, filter, map } from "rxjs";
const char$ = from([1, 3, 9, 5]).pipe(map((item) => item * 2));
char$.subscribe({
next: (value) => console.log(value),
});
//2 6 18 10
```
### tap
https://jaywoz.medium.com/information-is-king-tap-how-to-console-log-in-rxjs-7fc09db0ad5a
tap基本上不會影響到observable,他只會把pipe裡某個階段的data帶出來
通常用於debug
```typescript=
const char$ = from([1, 3, 9, 5]).pipe(
tap((item) => console.log(item)),
map((item) => item * 2),
tap((item) => console.log(item))
);
// 第一個tap:1 3 9 5
// 第二個tap:2 6 18 10
```
tap不是subscribe,所以如果該observable沒有被subscribe的話,tap的結果不會顯示出來
### debounceTime
設定一個秒數,當有資料傳進來時不會馬上傳出去並進行倒數,當在倒數時間裡有其他資料進來時會刷新秒數,直到沒有資料再進來,秒數歸零時,傳出最近一次的資料
```typescript=
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(debounceTime(1000));
result.subscribe(x => console.log(x));
```
https://rxjs.dev/api/operators/debounceTime

可以用在當一個微小的變動就會發出request或要進行複雜的邏輯的情形,避免被吃資源
### catchError
只會在error發生的時候觸發,不會對next跟complete做出反應
在catchError裡傳入一個observable,當偵測到error發生時,不會觸發error function,而是會改去subscribe catchError裡的observable,並直到他complete
且在catchError裡產生的資料一樣會被傳出來
當catchError complete時,原本的subscription也會complete
若是在catchError又發生error,則會觸發原本subscription的error function
```typescript=
const counting$ = new Observable((subscriber) => {
for (let i = 0; i < 5; i++) {
if (i === 4) {
subscriber.error(new Error("no four"));
}
subscriber.next(i);
}
});
counting$.pipe(catchError((error) => of(5, 6, 7, 8, 9))).subscribe({
next: (value) => console.log(value),
complete: () => console.log("completed"),
});
// 0 1 2 3 5 6 7 8 9 completed
```

如果你不想提供任何資料,可以使用內建的observable:EMPTY
EMPTY只會complete,並不會觸發next
在catchError裡也可以讓程式重新再跑一次該subscription
此時catchError的參數要設成2個
```typescript=
counting$
.pipe(
catchError((error, caught) => {
return caught$;
}),
take(30)
)
.subscribe({
next: (value) => console.log(value),
complete: () => console.log("completed"),
});
```
## Pipable operator: Flattening operator
flattening operator跟map其實很像,但他最後map成了observable
flattening operator會在subscription裡面再建立一個subscription
通常是放在next裡,並且將inner subscription傳出來的資料當作他自己傳出來的資料,error也會被傳出來,但complete不會被傳出來
### concatMap
https://rxjs.dev/api/operators/concatMap
```typescript=
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
concatMap(event => of(1))
);
result.subscribe(x => console.log(x));
```
### error handling
一旦在一個永不complete的subscription裡,inner subscription傳出error,那outer subscription就會立刻停止
如果要讓outer subscription不停止,需要error handling
如果直接在concatMap後面加catchError(() => EMPTY),因為EMPTY會產生complete,一樣會停止
但要是加在concatMap的pipe裡面,因為concatMap不會把complete傳出來的關係,所以會繼續運作
```typescript=
const test$ = new Observable((subscriber) => {
console.log('test');
subscriber.error('error');
});
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
concatMap(() => test$.pipe(catchError(() => EMPTY)))
);
result.subscribe((x) => console.log(x));
```
### concurrency
下面有幾種flattening operator,每個對concurrency的機制都不同
- concatMap
重視順序,舊的inner subscription執行完再用新的資料執行新的,期間等待執行的資料會被存在queue裡,很容易可以注意到memory leak的問題
最安全的作法,同時也是最慢的做法

- switchMap
當有新資料進來,會unsub目前的subscription,立刻執行下一個subscription,意即同一時間只會有正好一個inner subscription執行,不太會有memory leak的問題
可以確保得到的資料是最新的
但這個subscription的內容如果是送request過去且收到response後complete,用了這個方法雖然不會收到已經unsub的資料,但request還是可能接觸到server

- mergeMap
可以有多個inner subscription同時運行
一有資料就馬上傳出來,不太能確認資料的順序
沒有控管好的話,大量資料進來可能導致memory leak

## Subject
subject同時是observable跟observer
subject會multicast資訊到每個subscribe他的subscription,所以subject算是hot observable
multicast的方式是subject.next()
簡單來說subject會把observer存在一個清單,並在收到資料的時候尋遍那個清單並把資料傳出去
把subject當成observer的subscription,每當資料進來就會呼叫next(),因此subject也會同時把收到的資料multicast到observer清單裡
但subscription的complete跟error也會被multicast,造成那些subscription的終止,因此要注意
```typescript=
const subject$ = new Subject();
subject$.subscribe((value) => console.log("x"));
subject$.next("");
console.log(subject$);
of("1").subscribe(subject$);
console.log(subject$);
```

### Subject的種類
當我們subscribe subject的當下,沒有辦法知道他的狀態,即沒辦法拿到他之前發出的資料,只有他發出下一個資料才能取得目前狀態
而下列幾個方法有辦法讓我們拿到subject的狀態
- BehaviorSubject
```typescript=
const subject$ = new BehaviorSubject<string | number>("initial");
// initial
subject$.subscribe((value) => console.log(value));
// u
subject$.next("u");
```
要建立這個subject,必須要給他初始值來代表他起始的狀態
當subscribe他的時候,BehaviorSubject會回傳他目前的狀態
- ReplaySubject
```typescript=
const subject$ = new ReplaySubject(2);
subject$.next("u");
subject$.next("and");
subject$.next("me");
// and, me
subject$.subscribe((value) => console.log(value));
```
當你希望在subscribe後重新發送最後的幾個元素時可以用這個方法
### multicasted observable
https://ithelp.ithome.com.tw/articles/10188750