# RxJS https://rxjs.dev/ RxJS,全名Reactive Extension for JavaScript 目的是為了能夠更方便的處理非同步事件 - observables - subscriptions - observers - creation functions - pipable operators - subjects ## observable ### array vs stream array: data已經被定義好在那裡給我們使用,且很容易可以取得 stream: data可能從任何的時間點進來,我們不知道data什麼時候會出現 observable使用的是stream 先寫好如何互動的程式,再根據進來的資料判斷要怎麼互動,這也被叫做reactive programming 例子: - 滑鼠移動 (x:5, y:6) -> (x:6, y:6) -> ... - 在輸入欄裡打字 "" -> "T" -> "Te" -> "Tes" -> "Test" ### observable, observer, subscription ```typescript= // Observable const observable$ = new Observable(subscriber => { subscriber.next('Alice'); subscriber.next('Ben'); }) // Observer const observer = { next: value => console.log(value) } //Subscription observable$.subscribe(observer); //Subscription裡面只放一個function的話,那個function會被當成next observable$.subscribe(value => console.log(value)); ``` - Observable 要建立一個Observable,要用new Observable() 裡面放callback function observable像是一個function,每當我們subscribe他,他才會執行 - Observer Observer描述了對於過來的data要做什麼樣的動作 - Subscription Observable有一個subscribe method,會回傳Subscription Subscription可以把Observer丟到Observable裡面執行callback Subscription也有個unsubscribe method,當你不需要監聽時可以用它來關閉,解放記憶體 每一個Subscription都是獨立的,就算是用同樣的Observable跟Observer多次subscribe都一樣 ## Marble Diagram https://rxmarbles.com/ rxjs的流程可以畫成marble diagram,用來解釋資料流向 ![](https://i.imgur.com/bMP56rY.png) 上面是資料流進來的時間點 中間是operator 下面是結果 - 圈圈 可以出現0到多次,表示那個時間點資料傳出去了 - 叉叉 最多一次,代表出現error - 直線 最多一次,代表已完成(unsubscribe) 叉叉或直線後面不會有其他東西 出現錯誤或unsub之後observable不會再監聽 ## Subscription ### lifecycle ![](https://i.imgur.com/WgrUsbB.png) ### Teardown logic 當subscription結束時,會觸發teardown logic - complete ```typescript= const observable$ = new Observable<string>((subscriber) => { setTimeout(() => { subscriber.next("yep"); }, 2000); subscriber.complete(); return () => { console.log("It's all over"); }; }); observable$.subscribe({ next: (value) => console.log(value), complete: () => console.log("complete"), }); ``` ![](https://i.imgur.com/CLWdGM4.png) 在observable裡可以return function 該function在complete被呼叫的時候才會執行 而呼叫完complete進到teardown階段之後,所有在observable裡正在執行中的process都會被捨棄掉 - error ```typescript= const observable$ = new Observable<string>((subscriber) => { setTimeout(() => { subscriber.next("yep"); }, 2000); setTimeout(() => { subscriber.error(new Error("stop, you have violated the law")); }, 4000); return () => { console.log("It's all over"); }; }); observable$.subscribe({ next: (value) => console.log(value), error: (err) => console.log(err.message), }); ``` ![](https://i.imgur.com/S0Ahpab.png) 觸發error之後一樣會進到teardown階段 所以也會執行return的函式 - order ```typescript= // error 4s, complete 2s const observable$ = new Observable<string>((subscriber) => { setTimeout(() => { subscriber.error(new Error("stop, you have violated the law")); }, 4000); setTimeout(() => { subscriber.next("yep"); subscriber.complete(); }, 2000); return () => { console.log("It's all over"); }; }); observable$.subscribe({ next: (value) => console.log(value), complete: () => console.log("complete"), error: (err) => console.log(err.message), }); ``` ![](https://i.imgur.com/j69kfxC.png) 再次證明error或complete後面不會再有資料傳出 - unsubscribe ```typescript= const observable$ = new Observable((subscriber) => { let n = 1; setInterval(() => { subscriber.next(n++); }, 2000); return () => console.log("quit"); }); const sub = observable$.subscribe((value) => console.log(value)); setTimeout(() => { console.log("unsub"); sub.unsubscribe(); }, 7000); ``` ![](https://i.imgur.com/fqq92Do.png) unsub一樣會觸發teardown 雖然unsub了,但程式還在無限迴圈中 因為我們沒有清除interval 所以要在teardown logic裡加上清除interval的動作 ```typescript= return () => { clearInterval(interval); console.log("quit"); }; ``` 這告訴我們要注意在離開observable之前要檢查是否有東西要手動清除 ## Types of Observable ### cold 當一個observable所產出的data是基於它自身裡面的邏輯產出來的時候,我們可以說它是cold observable cold observable建立出來的subscription的資料是獨立的,不會受到其他subscription影響 ex:傳送request的ajax做出來的observable ### hot 當一個observable產出的data是來自已經存在的另一個資料來源時,那就可以說它是hot observable,產出來的data不太跟observable內部邏輯相關,通常是基於該資料來源 這種observable會把資料multicast到所有subscription ex:網頁的DOM,資料來源可能是DOM events ## Creation function(Creation operator) 上面做出observable都是用手動new Observable() 而creation function可以幫我們快速建立observable,並在裡面預先設定一些特定行為,像是ajax可以建立會發出request的observable ### of 使用of建立出來的observable就像參數很多的function 當subscribe之後會一個一個把參數放進next,然後complete ```typescript= import { of } from "rxjs"; of("first", "second", "third").subscribe({ next: (value) => console.log(value), complete: () => console.log("completed"), }); ``` ![](https://i.imgur.com/B2ymV7S.png) ### from from可以把其他資料型別轉換成一個observable 像array,promise, 可迭代物件, observable...... 放promise進去,被reject的話會觸發error function 做的事其實和of差不多 ```typescript= // 會跟上面的of有一樣結果 // array const myPromise = new Promise((resolve, reject) => { // 觸發complete resolve("resolved"); // 觸發error // reject("rejected"); }); from(["first", "second", "third"]).subscribe({ next: (value) => console.log(value), complete: () => console.log("completed"), }); // promise const promise$ = from(myPromise); promise$.subscribe({ next: (value) => console.log(value), error: () => console.log("error"), complete: () => console.log("completed"), }); ``` ![](https://i.imgur.com/6hydXLz.png) ### fromEvent fromEvent可以建立基於event的observable 像是DOM event, Node.js EventEmitter, aedes broker的event... 是hot observable 第一個參數放物件,第二個參數放event name subscribe()可以當成是addEventListener() unsubscribe()可以當成是removeEventListener() ```typescript= const button = document.querySelector("#mybutton")!; fromEvent<MouseEvent>(button, "click").subscribe({ next: (event) => console.log(event.x, event.y), }); ``` ![](https://i.imgur.com/WzBn1aE.png) 用個方法永遠不會complete,如果要停止監聽,就必須要unsubscribe fromEvent unsub之後可以把EventListener完整關掉 不用像原本的observable要去手動移除EventListener ### timer 等待一段時間,送出資料後complete ```typescript= timer(3000).subscribe({ next: (value) => console.log(value), complete: () => console.log("completed"), }); ``` 這裡的value會是0 timer跟setTimeout最大的差別在於,timer提前ubsub的時候會整個結束,而提前unsub有setTimeout的observable時,雖然也不會送出next,但setTimeout並不會結束,變成我們要手動去clearTimeout去解放記憶體 ### interval 每隔一段時間送出資料 ```typescript= interval(3000).subscribe({ next: (value) => console.log(value), }); // 0 1 2 3 4 5 ``` 這裡的value會從0開始一直送,每次送都+1 而如果unsub interval,這個interval會被自動清掉,不需要手動清除 ### forkJoin 他所接收的參數是observable的array 當subscribe這個forkJoin,他會為傳入的所有observable建立subscription,然後等到他們都complete後,收集他們最後所傳出的data到一個array裡 array裡observable的順序即為最後回傳資料的順序 ```typescript= const order1$ = of("First", "Second"); const order2$ = of("Third", "Fourth"); forkJoin([order1$, order2$]).subscribe({ next: ([order1, order2]) => { console.log(order1, order2); }, }); ``` ![](https://i.imgur.com/WhzksID.png) 若是有其中一個observable出現了error,forkJoin會立即觸發error function,所有還沒complete的subscription會立即被unsub,並依array順序執行teardown logic ### combineLatest combineLatest跟forkJoin很像,但發出data的次數更頻繁 他所接收的參數一樣是observable的array subscribe combineLatest時,為所有observable建立subscription 跟forkJoin不同的地方是,當所有subscription都發出過一次資料時,他就會發出最新資料的array 之後每當有subscription發出資料時,他就會更新該array並發出資料,直到所有subscription都complete 出現error一樣會全部unsub並執行teardown logic ![](https://i.imgur.com/8wI9Yy0.png) ## Pipable operator 可以把observable所傳出來的data轉換成各種形式 也可以寫出更複雜的邏輯 當observable傳出資料時,會先經過pipable operator,再進行下一個動作 同一個observable可以有很多個pipable operator ### filter 決定資料是否可以通過,只會對next進行filter ```typescript= import { from, filter } from "rxjs"; const char$ = from(["a", "b", 1, 3, "e", 9, "q", 5]); char$.pipe(filter((item) => typeof item === "number")).subscribe({ next: (value) => console.log(value), }); // 1 3 9 5 ``` 也可以把pipe直接寫在observable後面變成新的observable ```typescript= const char$ = from(["a", "b", 1, 3, "e", 9, "q", 5]).pipe( filter((item) => typeof item === "number") ); ``` ### map 會把資料轉變成其他型態,同樣只會對next進行map 基本上跟javascript array的map差不多 ```typescript= import { from, filter, map } from "rxjs"; const char$ = from([1, 3, 9, 5]).pipe(map((item) => item * 2)); char$.subscribe({ next: (value) => console.log(value), }); //2 6 18 10 ``` ### tap https://jaywoz.medium.com/information-is-king-tap-how-to-console-log-in-rxjs-7fc09db0ad5a tap基本上不會影響到observable,他只會把pipe裡某個階段的data帶出來 通常用於debug ```typescript= const char$ = from([1, 3, 9, 5]).pipe( tap((item) => console.log(item)), map((item) => item * 2), tap((item) => console.log(item)) ); // 第一個tap:1 3 9 5 // 第二個tap:2 6 18 10 ``` tap不是subscribe,所以如果該observable沒有被subscribe的話,tap的結果不會顯示出來 ### debounceTime 設定一個秒數,當有資料傳進來時不會馬上傳出去並進行倒數,當在倒數時間裡有其他資料進來時會刷新秒數,直到沒有資料再進來,秒數歸零時,傳出最近一次的資料 ```typescript= const clicks = fromEvent(document, 'click'); const result = clicks.pipe(debounceTime(1000)); result.subscribe(x => console.log(x)); ``` https://rxjs.dev/api/operators/debounceTime ![](https://i.imgur.com/dNt6Fao.png) 可以用在當一個微小的變動就會發出request或要進行複雜的邏輯的情形,避免被吃資源 ### catchError 只會在error發生的時候觸發,不會對next跟complete做出反應 在catchError裡傳入一個observable,當偵測到error發生時,不會觸發error function,而是會改去subscribe catchError裡的observable,並直到他complete 且在catchError裡產生的資料一樣會被傳出來 當catchError complete時,原本的subscription也會complete 若是在catchError又發生error,則會觸發原本subscription的error function ```typescript= const counting$ = new Observable((subscriber) => { for (let i = 0; i < 5; i++) { if (i === 4) { subscriber.error(new Error("no four")); } subscriber.next(i); } }); counting$.pipe(catchError((error) => of(5, 6, 7, 8, 9))).subscribe({ next: (value) => console.log(value), complete: () => console.log("completed"), }); // 0 1 2 3 5 6 7 8 9 completed ``` ![](https://i.imgur.com/SuXFAeq.png) 如果你不想提供任何資料,可以使用內建的observable:EMPTY EMPTY只會complete,並不會觸發next 在catchError裡也可以讓程式重新再跑一次該subscription 此時catchError的參數要設成2個 ```typescript= counting$ .pipe( catchError((error, caught) => { return caught$; }), take(30) ) .subscribe({ next: (value) => console.log(value), complete: () => console.log("completed"), }); ``` ## Pipable operator: Flattening operator flattening operator跟map其實很像,但他最後map成了observable flattening operator會在subscription裡面再建立一個subscription 通常是放在next裡,並且將inner subscription傳出來的資料當作他自己傳出來的資料,error也會被傳出來,但complete不會被傳出來 ### concatMap https://rxjs.dev/api/operators/concatMap ```typescript= const clicks = fromEvent(document, 'click'); const result = clicks.pipe( concatMap(event => of(1)) ); result.subscribe(x => console.log(x)); ``` ### error handling 一旦在一個永不complete的subscription裡,inner subscription傳出error,那outer subscription就會立刻停止 如果要讓outer subscription不停止,需要error handling 如果直接在concatMap後面加catchError(() => EMPTY),因為EMPTY會產生complete,一樣會停止 但要是加在concatMap的pipe裡面,因為concatMap不會把complete傳出來的關係,所以會繼續運作 ```typescript= const test$ = new Observable((subscriber) => { console.log('test'); subscriber.error('error'); }); const clicks = fromEvent(document, 'click'); const result = clicks.pipe( concatMap(() => test$.pipe(catchError(() => EMPTY))) ); result.subscribe((x) => console.log(x)); ``` ### concurrency 下面有幾種flattening operator,每個對concurrency的機制都不同 - concatMap 重視順序,舊的inner subscription執行完再用新的資料執行新的,期間等待執行的資料會被存在queue裡,很容易可以注意到memory leak的問題 最安全的作法,同時也是最慢的做法 ![](https://i.imgur.com/NGWt7X8.png) - switchMap 當有新資料進來,會unsub目前的subscription,立刻執行下一個subscription,意即同一時間只會有正好一個inner subscription執行,不太會有memory leak的問題 可以確保得到的資料是最新的 但這個subscription的內容如果是送request過去且收到response後complete,用了這個方法雖然不會收到已經unsub的資料,但request還是可能接觸到server ![](https://i.imgur.com/gPnJ5QO.png) - mergeMap 可以有多個inner subscription同時運行 一有資料就馬上傳出來,不太能確認資料的順序 沒有控管好的話,大量資料進來可能導致memory leak ![](https://i.imgur.com/9QDSjLU.png) ## Subject subject同時是observable跟observer subject會multicast資訊到每個subscribe他的subscription,所以subject算是hot observable multicast的方式是subject.next() 簡單來說subject會把observer存在一個清單,並在收到資料的時候尋遍那個清單並把資料傳出去 把subject當成observer的subscription,每當資料進來就會呼叫next(),因此subject也會同時把收到的資料multicast到observer清單裡 但subscription的complete跟error也會被multicast,造成那些subscription的終止,因此要注意 ```typescript= const subject$ = new Subject(); subject$.subscribe((value) => console.log("x")); subject$.next(""); console.log(subject$); of("1").subscribe(subject$); console.log(subject$); ``` ![](https://i.imgur.com/ioFVM1S.png) ### Subject的種類 當我們subscribe subject的當下,沒有辦法知道他的狀態,即沒辦法拿到他之前發出的資料,只有他發出下一個資料才能取得目前狀態 而下列幾個方法有辦法讓我們拿到subject的狀態 - BehaviorSubject ```typescript= const subject$ = new BehaviorSubject<string | number>("initial"); // initial subject$.subscribe((value) => console.log(value)); // u subject$.next("u"); ``` 要建立這個subject,必須要給他初始值來代表他起始的狀態 當subscribe他的時候,BehaviorSubject會回傳他目前的狀態 - ReplaySubject ```typescript= const subject$ = new ReplaySubject(2); subject$.next("u"); subject$.next("and"); subject$.next("me"); // and, me subject$.subscribe((value) => console.log(value)); ``` 當你希望在subscribe後重新發送最後的幾個元素時可以用這個方法 ### multicasted observable https://ithelp.ithome.com.tw/articles/10188750