# 數值轉 Observable ## of - 直接將基礎型態的變數轉為 Observable ```ts const obs: Observable<string> = of(""); const obs: Observable<number> = of(1); const obs: Observable<boolean> = of(true); ``` - 多筆 data 時 ```ts of(1,2,3).subscribe({ next: val => { console.log(val); // log: 1 // log: 2 // log: 3 } }); ``` ## from - 直接將基礎型態的變數轉為 Observable (注意跟 `of` 傳入的參數不一樣之處) ```ts const obs: Observable<string> = from([""]); const obs: Observable<number> = from([1]); const obs: Observable<boolean> = from([true]); ``` - 多筆 data 時 (注意跟 `of` 傳入的參數不一樣之處) ```ts from([1,2,3]).subscribe({ next: val => { console.log(val); // log: 1 // log: 2 // log: 3 } }); ``` ## NEVER - 直接回傳一個不會往下送的 Observable ```ts NEVER .pipe(tap(x => console.log("tap", x))) .subscribe({ next: data => console.log("data", data) }) // 什麼 log 都沒跑 ``` # Observable 資料處理 ## tap - 純屬收資料 ```ts from([1,2,3]).pipe( tap((val)=> { console.log("val", val + 1); }) ).subscribe({ next: result => { console.log("result", result); } }); // val: 2 // result: 1 // val: 3 // result: 2 // val: 4 // result: 3 ``` - 需注意當資料是物件時的行為(object or array),當 `tap` 修改物件 property,會`影響`後續的使用 ```ts from([{ num: 1 }, { num: 2 }, { num: 3 }]).pipe( tap((data)=> { data.num += 1; console.log("val", data.num); }) ).subscribe({ next: result => { console.log("result", result.num); } }); // val: 2 // result: 2 // val: 3 // result: 3 // val: 4 // result: 4 ``` ## map - 將資料作轉換 ```ts from([1,2,3]).pipe( map((val)=> { console.log("val", val + 1); return val + 2; }) ).subscribe({ next: result => { console.log("result", result); } }); // val: 2 // result: 3 // val: 3 // result: 4 // val: 4 // result: 5 ``` - 由於 map 的下一個 pipe 是收 map 傳遞出去的 data 所以修改 data property 不會影響後續 pipe 的結果(但不排除從 from 傳入的資料有人使用) ```ts from([{ num: 1 }, { num: 2 }, { num: 3 }]).pipe( map((data)=> { const num = data.num; data.num += 1; console.log("val", data.num); return { num: num }; }) ).subscribe({ next: result => { console.log("result", result.num); } }); // val: 2 // result: 1 // val: 3 // result: 2 // val: 4 // result: 3 ``` ## filter - 過濾資料,當 filter 條件為 false 時將不往下一個 pipe 發送 - 在 typescript 下相對不建議使用,因為 filter 不能將型態過濾 ```ts timer(0, 100) .pipe(filter(data => data % 2 === 0)) .subscribe({ next: data => console.log(data) }); // 0 // 2 // 4 // ... ``` # 資料匯整 ## Scan ```ts from([1,2,3,4,5]) .pipe( // 設定初始值為 0 scan((total, n) => total + n, 0) ) .subscribe({ next: data => console.log(data) }); // 1 // 3 // 6 // 10 // 15 ``` ## Reduce ```ts from([1,2,3,4,5]) .pipe( // 設定初始值為 0 reduce((total, n) => total + n, 0) ) .subscribe({ next: data => console.log(data) }); // 15 ``` ## 比較 - Scan: 可以每次將匯整的資料往後送 - Reduce: 只會將最後結果往後送 - Scan & Reduce 如果沒設定初始值,第一筆資料就是初始值,此時將不會跑進 callback 內 # 定期處理資料 - 常用的是 interval & timer - 需注意這類型處理方式,沒有結束的時候,需要透過 take / takeUntil 這類指定時間終止的 pipe 才能終止 observable ## interval - 延遲`N`秒後執行第一次,下一次將延遲`N`秒後執行 ```ts interval(2000).subscribe(); // 2000ms => 第一次執行 // 4000ms => 第二次執行 // ... ``` ## timer - 延遲`X`秒後執行第一次,下一次將延遲`Y`秒後執行 ```ts timer(1000, 2000).subscribe(); // 1000ms => 第一次執行 // 3000ms => 第二次執行 // ... ``` - 指定時間執行第一次,下一次將延遲`Y`秒後執行 ```ts const date = new Date(2024, 6, 7, 0, 0, 0, 0); timer(date, 2000).subscribe(); // 2024/07/07 00:00:00.000 => 第一次執行 // 2024/07/07 00:00:02.000 => 第二次執行 // ... ``` # 非同步資料處理 - 有以下四個常用的 - switchMap - exhaustMap - concatMap - mergeMap ## SwitchMap - 當有新的串流進來時,使用新的串流資料,執行到一半的將捨棄 ```ts interval(3000).pipe( switchMap(() => timer(0, 1000)) ).subscribe(data => { console.log(data); }); // interval: 0, timer: 0 // timer: 1 // timer: 2 // interval: 1, timer: 0 (因為 interval 來了一筆新串流,原本的 timer 串流直接捨棄 // timer: 1 // timer: 2 ``` ## ConcatMap - 當原有串流結束時,才會開始原本該執行的串流 - Case1 ```ts interval(3000).pipe( concatMap(() => timer(0, 1000)) ).subscribe(data => { console.log(data); }); // interval: 0, timer: 0 // interval: 0, timer: 1 // interval: 0, timer: 2 // interval: 0, timer: 3 // interval: 0, timer: 4 // interval: 0, timer: 5 // interval: 0, timer: 6 // ... 由於 timer 沒結束的機會,所以 interval 的新資料遲遲無法執行 ``` - Case2 ```ts interval(3000).pipe( concatMap(() => timer(0, 1000).pipe(take(3))) ).subscribe(data => { console.log(data); }); // interval: 0, timer: 0 // interval: 0, timer: 1 // interval: 0, timer: 2 // interval: 1, timer: 0 (由於 timer 只拿3個,所以 interval: 1 可以排入串流中) // interval: 1, timer: 1 // interval: 1, timer: 2 // ... ``` ## MergeMap - 將資料串流轉換 ```ts timer(0, 3000).pipe( mergeMap(id => { // 產生新的串流 timer return timer(0, 1500) .pipe(map(data => `資料流 ${id}: ${data}`)); } ).subscribe(result => { console.log(result); }); // 資料流編號 = 外層 timer // 0ms => 資料流 0: 0 // 1500ms => 資料流 0: 1 // 3000ms => 資料流 1: 0 (外層 timer 新資料) // 3000ms => 資料流 0: 2 (外層 timer 第一次產生的內層 timer未結束) // ... 結局 => 3000ms 產生一個 1500ms 間隔 的 timer ``` ## ExhaustMap - 當原本的資料串流沒結束前,新的資料要進來會進不來,而且會直接捨棄 ```ts interval(2000).pipe( exhaustMap((id) => { return timer(0, 1000).pipe( take(3), map(data => { return {id, data}; }) ) }) ).subscribe(data => { console.log(data); }); // interval: 0, timer: 0 // interval: 0, timer: 1 (interval: 1 有資料進來,但 timer 沒結束所以直接捨棄 interval: 1) // interval: 0, timer: 2 // interval: 2, timer: 0 (由於 timer 只拿3個,所以 interval: 2 可以排入串流中) // interval: 2, timer: 1 (interval: 3 有資料進來,但 timer 沒結束所以直接捨棄 interval: 3) // interval: 2, timer: 2 // ... ``` # 資料節流 ## ThrottleTime - 當`N`秒內有多筆資料只取一筆,可以取`第一筆`或`N秒後最後一筆` ```ts timer(0, 100).pipe( take(9), throttleTime(700) ).subscribe({ next: (data) => { console.log(data); } }); // 0ms => data: 0 // 100ms => data: 1 => ThrottleTime 700 => data: 1 keep // 200ms => data: 2 => ThrottleTime 700 => data: 1 drop => data: 2 keep // 300ms => data: 3 => ThrottleTime 700 => data: 2 drop => data: 3 keep // 400ms => data: 4 => ThrottleTime 700 => data: 3 drop => data: 4 keep // 500ms => data: 5 => ThrottleTime 700 => data: 4 drop => data: 5 keep // 600ms => data: 6 => ThrottleTime 700 => data: 5 drop => data: 6 keep // 700ms => commit 7 // 800ms => data: 8 => ThrottleTime 700 => data: 8 keep // ... // 1400ms => 因為沒資料進來 commit 8 ``` ## DebounceTime - 當`N`秒內有多筆資料只取最後一筆,但`N`秒內如果有新資料,就重新計時`N`秒 ```ts timer(0, 100).pipe( take(3), debounceTime(300) ).subscribe({ next: (data) => { console.log(data); } }); // 0ms => data: 0 => DebounceTime 300 => data: 0 keep // 100ms => data: 1 => DebounceTime 300 => data: 0 drop & data: 1 keep // 200ms => data: 2 => DebounceTime 300 => data: 1 drop & data: 2 keep // ... // 500ms => 因為沒資料進來 commit 2 ``` ## BufferTime - 當`N`秒內有多筆資料時,彙整成一個陣列 ```ts timer(0, 100) .pipe(bufferTime(500)) .subscribe(x => console.log(x)); // [0, 1, 2, 3, 4] // [5, 6, 7, 8, 9] // ... ``` ## 比較 - ThrottleTime 是`N`秒後取一筆,但`第一筆`一定會送出,下一筆則是`N`秒後的第一筆或最後一筆 - DebounceTime 是`N`秒後取一筆,但如果持續有資料進來會導致一直送不出資料 - BufferTime 是`N`秒後資料彙整,如果要做合併上的資料客製,會是比較好的選擇 # Subject - Subject 種類 - Subject: 一般subject,沒keep資料 - BehaviorSubject: 會keep 最後一筆資料,但需要給初始值,也可以取得最後狀態 - ReplaySubject: 會 keep 資料,但 keep 筆數取決於 new 時的數量,預設無限直到記憶體不夠 - Subject 也屬於不會終止的類型,所以當需要終止時,還是要用 `take` / `takeUntil` 等限制筆數的方式來讓 subject 終止,也可以用 subject 的 `complete` 來終止 - Subject 屬於可以讓外部使用者修改資料的類別,所以在可控範圍內可以允許使用 subject 物件,如果有要對外使用,還是建議使用 `asObservable` 將 `subject` 轉換為 `observable`,避免資料被修改 ## Subject ```ts const subject = new Subject<number>(); subject.next(1); subject.subscribe({ next: data => { console.log(`data: ${data}`); } }); subject.next(2); subject.next(3); // data: 2 // data: 3 // 1 不會印,因為送出時未subscribe ``` ## BehaviorSubject - 當 subject 使用 ```ts const subject = new BehaviorSubject<number>(0); subject.next(1); subject.subscribe({ next: data => { console.log(`data: ${data}`); } }); subject.next(2); subject.next(3); // data: 1 // data: 2 // data: 3 ``` - 直接取值 ```ts const subject = new BehaviorSubject<number>(0); console.log(subject.getValue()); subject.next(1); console.log(subject.getValue()); // data: 0 // data: 1 ``` ## ReplaySubject - 無限存 ```ts const subject = new ReplaySubject<number>(); subject.next(1); subject.next(2); subject.next(3); subject.subscribe({ next: data => { console.log(`data: ${data}`); } }); // data: 1 // data: 2 // data: 3 ``` - 限制筆數 ```ts // 限制 replay 2筆 const subject = new ReplaySubject<number>(2); subject.next(1); subject.next(2); subject.next(3); subject.subscribe({ next: data => { console.log(`data: ${data}`); } }); // data: 2 // data: 3 // 因為限制 2 筆,可是送出 3 筆,第 1 筆將在送出第 3 筆時 drop掉 ```