# 數值轉 Observable
## of
- 直接將基礎型態的變數轉為 Observable
```ts
const obs: Observable<string> = of("");
const obs: Observable<number> = of(1);
const obs: Observable<boolean> = of(true);
```
- 多筆 data 時
```ts
of(1,2,3).subscribe({
next: val => {
console.log(val);
// log: 1
// log: 2
// log: 3
}
});
```
## from
- 直接將基礎型態的變數轉為 Observable (注意跟 `of` 傳入的參數不一樣之處)
```ts
const obs: Observable<string> = from([""]);
const obs: Observable<number> = from([1]);
const obs: Observable<boolean> = from([true]);
```
- 多筆 data 時 (注意跟 `of` 傳入的參數不一樣之處)
```ts
from([1,2,3]).subscribe({
next: val => {
console.log(val);
// log: 1
// log: 2
// log: 3
}
});
```
## NEVER
- 直接回傳一個不會往下送的 Observable
```ts
NEVER
.pipe(tap(x => console.log("tap", x)))
.subscribe({
next: data => console.log("data", data)
})
// 什麼 log 都沒跑
```
# Observable 資料處理
## tap
- 純屬收資料
```ts
from([1,2,3]).pipe(
tap((val)=> {
console.log("val", val + 1);
})
).subscribe({
next: result => {
console.log("result", result);
}
});
// val: 2
// result: 1
// val: 3
// result: 2
// val: 4
// result: 3
```
- 需注意當資料是物件時的行為(object or array),當 `tap` 修改物件 property,會`影響`後續的使用
```ts
from([{
num: 1
}, {
num: 2
}, {
num: 3
}]).pipe(
tap((data)=> {
data.num += 1;
console.log("val", data.num);
})
).subscribe({
next: result => {
console.log("result", result.num);
}
});
// val: 2
// result: 2
// val: 3
// result: 3
// val: 4
// result: 4
```
## map
- 將資料作轉換
```ts
from([1,2,3]).pipe(
map((val)=> {
console.log("val", val + 1);
return val + 2;
})
).subscribe({
next: result => {
console.log("result", result);
}
});
// val: 2
// result: 3
// val: 3
// result: 4
// val: 4
// result: 5
```
- 由於 map 的下一個 pipe 是收 map 傳遞出去的 data 所以修改 data property 不會影響後續 pipe 的結果(但不排除從 from 傳入的資料有人使用)
```ts
from([{
num: 1
}, {
num: 2
}, {
num: 3
}]).pipe(
map((data)=> {
const num = data.num;
data.num += 1;
console.log("val", data.num);
return {
num: num
};
})
).subscribe({
next: result => {
console.log("result", result.num);
}
});
// val: 2
// result: 1
// val: 3
// result: 2
// val: 4
// result: 3
```
## filter
- 過濾資料,當 filter 條件為 false 時將不往下一個 pipe 發送
- 在 typescript 下相對不建議使用,因為 filter 不能將型態過濾
```ts
timer(0, 100)
.pipe(filter(data => data % 2 === 0))
.subscribe({
next: data => console.log(data)
});
// 0
// 2
// 4
// ...
```
# 資料匯整
## Scan
```ts
from([1,2,3,4,5])
.pipe(
// 設定初始值為 0
scan((total, n) => total + n, 0)
)
.subscribe({
next: data => console.log(data)
});
// 1
// 3
// 6
// 10
// 15
```
## Reduce
```ts
from([1,2,3,4,5])
.pipe(
// 設定初始值為 0
reduce((total, n) => total + n, 0)
)
.subscribe({
next: data => console.log(data)
});
// 15
```
## 比較
- Scan: 可以每次將匯整的資料往後送
- Reduce: 只會將最後結果往後送
- Scan & Reduce 如果沒設定初始值,第一筆資料就是初始值,此時將不會跑進 callback 內
# 定期處理資料
- 常用的是 interval & timer
- 需注意這類型處理方式,沒有結束的時候,需要透過 take / takeUntil 這類指定時間終止的 pipe 才能終止 observable
## interval
- 延遲`N`秒後執行第一次,下一次將延遲`N`秒後執行
```ts
interval(2000).subscribe();
// 2000ms => 第一次執行
// 4000ms => 第二次執行
// ...
```
## timer
- 延遲`X`秒後執行第一次,下一次將延遲`Y`秒後執行
```ts
timer(1000, 2000).subscribe();
// 1000ms => 第一次執行
// 3000ms => 第二次執行
// ...
```
- 指定時間執行第一次,下一次將延遲`Y`秒後執行
```ts
const date = new Date(2024, 6, 7, 0, 0, 0, 0);
timer(date, 2000).subscribe();
// 2024/07/07 00:00:00.000 => 第一次執行
// 2024/07/07 00:00:02.000 => 第二次執行
// ...
```
# 非同步資料處理
- 有以下四個常用的
- switchMap
- exhaustMap
- concatMap
- mergeMap
## SwitchMap
- 當有新的串流進來時,使用新的串流資料,執行到一半的將捨棄
```ts
interval(3000).pipe(
switchMap(() => timer(0, 1000))
).subscribe(data => {
console.log(data);
});
// interval: 0, timer: 0
// timer: 1
// timer: 2
// interval: 1, timer: 0 (因為 interval 來了一筆新串流,原本的 timer 串流直接捨棄
// timer: 1
// timer: 2
```
## ConcatMap
- 當原有串流結束時,才會開始原本該執行的串流
- Case1
```ts
interval(3000).pipe(
concatMap(() => timer(0, 1000))
).subscribe(data => {
console.log(data);
});
// interval: 0, timer: 0
// interval: 0, timer: 1
// interval: 0, timer: 2
// interval: 0, timer: 3
// interval: 0, timer: 4
// interval: 0, timer: 5
// interval: 0, timer: 6
// ... 由於 timer 沒結束的機會,所以 interval 的新資料遲遲無法執行
```
- Case2
```ts
interval(3000).pipe(
concatMap(() => timer(0, 1000).pipe(take(3)))
).subscribe(data => {
console.log(data);
});
// interval: 0, timer: 0
// interval: 0, timer: 1
// interval: 0, timer: 2
// interval: 1, timer: 0 (由於 timer 只拿3個,所以 interval: 1 可以排入串流中)
// interval: 1, timer: 1
// interval: 1, timer: 2
// ...
```
## MergeMap
- 將資料串流轉換
```ts
timer(0, 3000).pipe(
mergeMap(id => {
// 產生新的串流 timer
return timer(0, 1500)
.pipe(map(data => `資料流 ${id}: ${data}`));
}
).subscribe(result => {
console.log(result);
});
// 資料流編號 = 外層 timer
// 0ms => 資料流 0: 0
// 1500ms => 資料流 0: 1
// 3000ms => 資料流 1: 0 (外層 timer 新資料)
// 3000ms => 資料流 0: 2 (外層 timer 第一次產生的內層 timer未結束)
// ... 結局 => 3000ms 產生一個 1500ms 間隔 的 timer
```
## ExhaustMap
- 當原本的資料串流沒結束前,新的資料要進來會進不來,而且會直接捨棄
```ts
interval(2000).pipe(
exhaustMap((id) => {
return timer(0, 1000).pipe(
take(3),
map(data => {
return {id, data};
})
)
})
).subscribe(data => {
console.log(data);
});
// interval: 0, timer: 0
// interval: 0, timer: 1 (interval: 1 有資料進來,但 timer 沒結束所以直接捨棄 interval: 1)
// interval: 0, timer: 2
// interval: 2, timer: 0 (由於 timer 只拿3個,所以 interval: 2 可以排入串流中)
// interval: 2, timer: 1 (interval: 3 有資料進來,但 timer 沒結束所以直接捨棄 interval: 3)
// interval: 2, timer: 2
// ...
```
# 資料節流
## ThrottleTime
- 當`N`秒內有多筆資料只取一筆,可以取`第一筆`或`N秒後最後一筆`
```ts
timer(0, 100).pipe(
take(9),
throttleTime(700)
).subscribe({
next: (data) => {
console.log(data);
}
});
// 0ms => data: 0
// 100ms => data: 1 => ThrottleTime 700 => data: 1 keep
// 200ms => data: 2 => ThrottleTime 700 => data: 1 drop => data: 2 keep
// 300ms => data: 3 => ThrottleTime 700 => data: 2 drop => data: 3 keep
// 400ms => data: 4 => ThrottleTime 700 => data: 3 drop => data: 4 keep
// 500ms => data: 5 => ThrottleTime 700 => data: 4 drop => data: 5 keep
// 600ms => data: 6 => ThrottleTime 700 => data: 5 drop => data: 6 keep
// 700ms => commit 7
// 800ms => data: 8 => ThrottleTime 700 => data: 8 keep
// ...
// 1400ms => 因為沒資料進來 commit 8
```
## DebounceTime
- 當`N`秒內有多筆資料只取最後一筆,但`N`秒內如果有新資料,就重新計時`N`秒
```ts
timer(0, 100).pipe(
take(3),
debounceTime(300)
).subscribe({
next: (data) => {
console.log(data);
}
});
// 0ms => data: 0 => DebounceTime 300 => data: 0 keep
// 100ms => data: 1 => DebounceTime 300 => data: 0 drop & data: 1 keep
// 200ms => data: 2 => DebounceTime 300 => data: 1 drop & data: 2 keep
// ...
// 500ms => 因為沒資料進來 commit 2
```
## BufferTime
- 當`N`秒內有多筆資料時,彙整成一個陣列
```ts
timer(0, 100)
.pipe(bufferTime(500))
.subscribe(x => console.log(x));
// [0, 1, 2, 3, 4]
// [5, 6, 7, 8, 9]
// ...
```
## 比較
- ThrottleTime 是`N`秒後取一筆,但`第一筆`一定會送出,下一筆則是`N`秒後的第一筆或最後一筆
- DebounceTime 是`N`秒後取一筆,但如果持續有資料進來會導致一直送不出資料
- BufferTime 是`N`秒後資料彙整,如果要做合併上的資料客製,會是比較好的選擇
# Subject
- Subject 種類
- Subject: 一般subject,沒keep資料
- BehaviorSubject: 會keep 最後一筆資料,但需要給初始值,也可以取得最後狀態
- ReplaySubject: 會 keep 資料,但 keep 筆數取決於 new 時的數量,預設無限直到記憶體不夠
- Subject 也屬於不會終止的類型,所以當需要終止時,還是要用 `take` / `takeUntil` 等限制筆數的方式來讓 subject 終止,也可以用 subject 的 `complete` 來終止
- Subject 屬於可以讓外部使用者修改資料的類別,所以在可控範圍內可以允許使用 subject 物件,如果有要對外使用,還是建議使用 `asObservable` 將 `subject` 轉換為 `observable`,避免資料被修改
## Subject
```ts
const subject = new Subject<number>();
subject.next(1);
subject.subscribe({
next: data => {
console.log(`data: ${data}`);
}
});
subject.next(2);
subject.next(3);
// data: 2
// data: 3
// 1 不會印,因為送出時未subscribe
```
## BehaviorSubject
- 當 subject 使用
```ts
const subject = new BehaviorSubject<number>(0);
subject.next(1);
subject.subscribe({
next: data => {
console.log(`data: ${data}`);
}
});
subject.next(2);
subject.next(3);
// data: 1
// data: 2
// data: 3
```
- 直接取值
```ts
const subject = new BehaviorSubject<number>(0);
console.log(subject.getValue());
subject.next(1);
console.log(subject.getValue());
// data: 0
// data: 1
```
## ReplaySubject
- 無限存
```ts
const subject = new ReplaySubject<number>();
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe({
next: data => {
console.log(`data: ${data}`);
}
});
// data: 1
// data: 2
// data: 3
```
- 限制筆數
```ts
// 限制 replay 2筆
const subject = new ReplaySubject<number>(2);
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe({
next: data => {
console.log(`data: ${data}`);
}
});
// data: 2
// data: 3
// 因為限制 2 筆,可是送出 3 筆,第 1 筆將在送出第 3 筆時 drop掉
```