# RxJS - Beginner's Guide

Have you ever wondered how you can open a 10GB file on a device with only 4GB of RAM? Or how the Netflix app can play a cloud-hosted 4K movie on your phone as soon as you press play?

On a device with 4GB of RAM, a 10GB file is **effectively infinite**. How does your computer manage something that is infinite?

It has to load the file into memory in small chunks, read them, and discard them before loading more data into memory. It has to *stream* the data and process it in small chunks.

## What's a Stream?

A stream is a collection of data that is potentially infinite. It is a sequence of data coming in over time. It can be thought of as items on a conveyor belt being processed one at a time.

```
Stream = Array + Infinity
```

Since the data is potentially infinite, our trusty loops are not going to be effective on it. You can't write a `for` loop from zero to infinity to process the whole stream.

```javascript
// ❌💀💀❌ This loop never finishes
for (let i = 0; i < Infinity; i++) {
  const element = stream[i];
}
```

The problem we run into is knowing when to stop. This is where Observables come into the picture.

## Observables

Observables are potentially infinite collections that return values one at a time, asynchronously. That is, some time can pass between one value being returned and the next.

```
Observable = Array + Infinity + Asynchronous
OR
Observable = Promise + Returns many times
```

```code-center
----- Value 1 ----- Value 2 ----- Value 3 ----- Value 4 -----|->
```

In this way they are very similar to Promises. A Promise returns one value after some time has passed. Observables return potentially infinite values, with some time passing between each of them.

A Promise has two possible outcomes: `resolve` or `reject`. In other words: `complete` or `error`.
```javascript
// Two outcomes => resolve, reject
const promise = new Promise((resolve, reject) => {
  // Call resolve(value) on success or reject(error) on failure
  resolve("some data");
});

promise
  .then((data) => console.log("Data came back:" + data)) // complete
  .catch((err) => console.error("No, Ew David", err)); // error
```

Observables add a third notification to the same concept: `next`, `complete`, `error`.

```javascript
import { from } from "rxjs";

const observable = from([1, 2, 3, 4]);

// Three notifications => next, complete, error
observable.subscribe({
  next: (value) => console.log("Next value:", value),
  complete: () => console.log("Infinity is Done!!! ¯\\_(ツ)_/¯ "),
  error: (err) => console.log("No, Ew David", err),
});
```

One of the most popular Observable libraries in JavaScript is **[RxJS](https://rxjs.dev/guide/overview)**. What makes RxJS awesome is not just the concept of Observables but also the extensive array of **Operators** it includes. Operators act on Observables and allow complex asynchronous code to be composed easily in a declarative manner.

## RxJS Operators

In RxJS, most operators are functions that accept an Observable as input, run some transformation on it, and return the new, transformed Observable as output. These operators are (mostly) pure, side-effect-free functions; meaning, they don't change the incoming Observable in any way. This makes them chainable, or pipeable, so that the asynchronous logic in your application can be broken down into small, manageable blocks.

RxJS groups its operators into a few categories, but the most commonly used are creation operators and transformation operators. In this guide, we will talk about how to create an Observable, how to apply common transformations, and how to consume the data emitted by Observables.

## Creation Operator - `of`

This is the simplest way to create an Observable from static data. It is an easy-to-use wrapper that accepts any number of values as arguments and returns a ready-to-use Observable that emits them one by one. This operator comes in handy to start a brand new Observable pipeline.
```javascript
import { of } from "rxjs";

of(10, 20, 30).subscribe({
  next: (value) => console.log('next:', value),
  error: (err) => console.log('error:', err),
  complete: () => console.log('the end'),
});

// result:
// 'next: 10'
// 'next: 20'
// 'next: 30'
// the end
```

## Creation Operator - `from`

This operator is similar to `of`, but it works on iterable data. That is, it accepts a collection of data and returns an Observable that emits each value of the collection one at a time. The real power of this operator comes from the fact that it also accepts generators and asynchronous sources such as promises and other Observables.

```javascript
import { from } from "rxjs";

from([10, 20, 30]).subscribe({
  next: (value) => console.log('next:', value),
  error: (err) => console.log('error:', err),
  complete: () => console.log('the end'),
});

// result:
// 'next: 10'
// 'next: 20'
// 'next: 30'
// the end

// fetchDataFromServer is a placeholder for any function that returns a Promise
const promise = fetchDataFromServer();

from(promise).subscribe({
  next: (value) => console.log('next:', value),
  error: (err) => console.log('error:', err),
  complete: () => console.log('the end'),
});

// result:
// 'next: {msg: "hello world"}'
// the end
```

## Transformation Operator - `map`

This operator is very similar to `Array#map`: it accepts each new value emitted by the Observable, transforms it, and passes it along to the next operator in the pipe. This is where the conceptual underpinnings of streams and Observables start to show their true colors.

*Why go through the trouble of learning this whole new concept when the same problem can be solved using `Array#map`???*

Most problems that we programmers deal with day to day can be solved by putting all the data in an array and running some transformation over it using `map`. But Observables come in really handy when we simply can't load the whole dataset into an array (i.e. the data is effectively infinite). Or when we don't have the whole dataset available to us upfront. As in, the dataset is asynchronous and new values are coming in slowly over the network.
Or, many times, we have both problems; meaning, effectively infinite data is coming in slowly over the network, a few values at a time. Bulk processing large amounts of data coming in from the internet is a good example of this problem.

RxJS operators are almost always pure (side-effect free), and they work with one emitted value at a time. This makes dealing with an effectively infinite dataset really easy. Since the function is side-effect free, the system doesn't have to hold on to items that are not currently being processed. That is, only one item at a time is held in memory.

```javascript
import { range } from "rxjs";
import { map } from "rxjs/operators";

range(1, 1000) // another RxJS creation operator; starts at 1 and emits 1000 values
  .pipe(map((x) => x * 10))
  .subscribe({
    next: (value) => console.log('next:', value),
    error: (err) => console.log('error:', err),
    complete: () => console.log('the end'),
  });

// result:
// 'next: 10'
// 'next: 20'
// 'next: 30'
// ....
// ....
// 'next: 10000'
// the end
```

## Transformation Operator - `mergeMap`

This operator is very similar to `map`, but its transformation function returns asynchronous data (Observables or promises). This makes handling many async calls to a server or database very easy and even allows us to parallelize those calls.

```javascript
import { range } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

// fetchBulkDataFromServer is a placeholder for an API call that returns a Promise or an Observable
range(1, 1000)
  .pipe(
    mergeMap((pageNum) => fetchBulkDataFromServer({ pageNum })),
    map((bulkData) => `Page Num ${bulkData.page} returned ${bulkData.items.length} items`),
  )
  .subscribe({
    next: (value) => console.log('next:', value),
    error: (err) => console.log('error:', err),
    complete: () => console.log('the end'),
  });

// result (the order depends on which request completes first):
// 'next: Page Num 1 returned 100 items'
// 'next: Page Num 2 returned 90 items'
// 'next: Page Num 3 returned 70 items'
// 'next: Page Num 4 returned 100 items'
// ....
// ....
// 'next: Page Num 1000 returned 30 items'
// the end
```

As `mergeMap` maps over async data (Observables), it significantly speeds things up by working on multiple Observables in parallel. It accepts a second argument, the concurrency count (named `concurrent` in the RxJS API), that caps how many Observables run in parallel.
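
To make the idea of a concurrency count concrete, here is a minimal sketch in plain JavaScript, without RxJS, of what capping parallel work involves. The names `mapWithConcurrency` and `fakeFetch` are hypothetical and invented for this illustration. This is not how RxJS implements `mergeMap`, only a rough approximation of the behavior described above, assuming every task returns a Promise.

```javascript
// Hypothetical helper (not part of RxJS): runs `task` over `items` with at most
// `limit` tasks in flight and calls `onNext` with each result as soon as it is ready.
function mapWithConcurrency(items, task, limit, onNext) {
  return new Promise((resolve, reject) => {
    let nextIndex = 0; // index of the next item to start
    let inFlight = 0; // number of tasks currently running
    let failed = false; // set once any task rejects

    const startMore = () => {
      // Nothing left to start and nothing running: the work is complete.
      if (nextIndex >= items.length && inFlight === 0) {
        resolve();
        return;
      }
      // Start new tasks until the concurrency limit is reached.
      while (!failed && inFlight < limit && nextIndex < items.length) {
        const item = items[nextIndex++];
        inFlight++;
        task(item).then(
          (result) => {
            inFlight--;
            if (failed) return;
            onNext(result); // results are delivered in completion order
            startMore();
          },
          (err) => {
            failed = true;
            reject(err);
          },
        );
      }
    };

    startMore();
  });
}

// Simulated API call: resolves with the page number after `ms` milliseconds.
const fakeFetch = ({ pageNum, ms }) =>
  new Promise((resolve) => setTimeout(() => resolve(pageNum), ms));

const pages = [
  { pageNum: 1, ms: 100 },
  { pageNum: 2, ms: 20 },
  { pageNum: 3, ms: 20 },
];

const emitted = [];
const done = mapWithConcurrency(pages, fakeFetch, 2, (pageNum) => {
  emitted.push(pageNum);
  console.log("next:", pageNum);
}).then(() => console.log("the end"));
```

With a limit of 2, pages 1 and 2 start together and page 3 has to wait for a free slot. Page 2 finishes first, page 3 then starts and finishes before the slow page 1, so the sketch logs `next: 2`, `next: 3`, `next: 1` and finally `the end`. Even this small version needs shared counters and careful bookkeeping, and it does not handle cancellation or retries.
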
Implementing this level of parallel async processing without using Observables is not a straightforward task and can easily result in hard-to-debug concurrency bugs.

```javascript
import { range } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

const maxParallelApiCalls = 50;

// fetchBulkDataFromServer is a placeholder for an API call that returns a Promise or an Observable
range(1, 1000)
  .pipe(
    mergeMap((pageNum) => fetchBulkDataFromServer({ pageNum }), maxParallelApiCalls),
    map((bulkData) => `Page Num ${bulkData.page} returned ${bulkData.items.length} items`),
  )
  .subscribe({
    next: (value) => console.log('next:', value),
    error: (err) => console.log('error:', err),
    complete: () => console.log('the end'),
  });

// result:
// 'next: Page Num 7 returned 10 items'
// 'next: Page Num 12 returned 8 items'
// 'next: Page Num 38 returned 12 items'
// 'next: Page Num 3 returned 70 items'
// ....
// ....
// 'next: Page Num 1000 returned 30 items'
// the end
```

In the above example, RxJS processes up to 50 Observables at the same time and emits the data they return in the order in which they finish. So whichever API call returns first has its data piped to the next operator first.

Here's a timeline visualization of how async data is parallelized by `mergeMap`.
```code
<Start> - Value 1 ---------------------------------|->
<Start> --------------------- Value 2 -------------|->
<Start> ----------- Value 3 -----------------------|->
<Start> ------------------------------ Value 4 ----|->
<Start> ----------------- Value 5 -----------------|->
----------------------- Merge ------------------------
<Start> - Value 1 -- Value 3 -- Value 5 -- Value 2 -- Value 4 -|->
```

## Conclusion

This tutorial is a jumping-off point for Observables, RxJS, and asynchronous functional programming. I implore you to consider Observables and RxJS for the next asynchronous problem that comes your way.

We studied a few operators in this beginner's guide, but RxJS has a huge number of operators suitable for a wide variety of use cases. Go check out [their documentation](https://rxjs.dev/api) for yourself and start using them today.