# RxJava Guide Updates ## Overview [RxJava](https://github.com/ReactiveX/RxAndroid) is described officially as "a library for composing asynchronous and event-based programs by using observable sequences". But what does this actually mean? Let's put this library into context. One of the challenges in writing robust Android apps is the dynamic nature of changing inputs. In traditional imperative programming models, values have to be explicitly set on variables for them to be updated. If one dependent value changes, the value will not be updated without adding another line of code. Consider the following example: ```java // init variables int i, j, k; // Init inputs i = 1; j = 2; // Set output value k = i + j; // Update a dependent value j = 4; k = ? // What should k be? ``` State variables such as `k` intrinsically may not reflect the current value of its inputs. Traditional asynchronous programming approaches tend to rely on callbacks to update these changes, but this way can lead to a problem known as [callback hell](http://callbackhell.com/). **_Reactive programming_** (see an intro [here](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754)) addresses these issues by providing a framework to describe outputs to reflect their changing inputs. RxJava, which is a port of the [Reactive Extensions](https://msdn.microsoft.com/en-us/data/gg577609.aspx) library from .NET, enables Android apps to be built in this style. ### Advantages Here are a few advantages of using RxJava on Android: * **Simplifies the ability to chain async operations.** If you need to make an API call that depends on another API call, you will likely end up implementing this call in the callback of the first one. RxJava provides a way to avoid needing to creating [layers of callbacks](https://www.bignerdranch.com/blog/what-is-functional-reactive-programming/) to address this issue. For this reason, RxJava became [popular within Netflix](http://www.infoq.com/presentations/rx-service-architecture) in 2014 for abstracting away the complexities of performing dependent API calls. * **Exposes a more explicit way for declaring how concurrent operations should operate.** Although RxJava is single-threaded by default, RxJava helps enable you to define more explicitly what type of threading models should be used for both background and callback tasks. Since Android only allows UI updates on the main thread, using RxJava helps make the code more clear about what operations will be done to update the views. * **Surfaces errors sooner.** One issue with [[AsyncTask|Creating-and-Executing-Async-Tasks]] is that errors that occur on the background thread are hard to pass along when updating the UI thread using the `onPostExecute()` method. In addition, there are limitations in how many AsyncTasks can be dispatched concurrently as described in this [blog post](http://blog.danlew.net/2014/06/21/the-hidden-pitfalls-of-asynctask/). RxJava provides a way to enable these errors to be surfaced. * **Helps reduce the need for state variables that can introduce bugs.** One mindset shift need in using RxJava is thinking about everything in terms of describing as data flows in the system. Click events generated by the user, network calls, data updates from external sources all can all be described as asynchronous streams of data. The power of RxJava is that it enables these streams to be transformed, filtered, or used to create new streams of data with only a few lines of code while also minimizing the need for storing state variables. ## Setup Setup your `app/build.gradle`: ```gradle dependencies { compile 'io.reactivex:rxandroid:1.2.0' compile 'io.reactivex:rxjava:1.1.4' } ``` ## Observables and Observers The basic building blocks of reactive code are `Observables` and `Observers`. An `Observable` emits items; an `Observer` consumes those items. An `Observable` **may emit any number of items** (including zero items), then it terminates either by successfully completing, or due to an error. An `Observable` can then have **any number of observers**. For each `Observer` attached, an Observable calls `Observer#onNext()` for every item, followed by either `Observer#onComplete()` or `Observer#onError()`. Keep in mind that `Observables` often don't start emitting items until there's at least one subscriber. ### Defining Observables Let's take the most basic example to understand how this is structured. First, let's define an `Observable` which is an object that can emit any number of items to be processed: ```java // Observables emit any number of items to be processed // The type of the item to be processed needs to be specified as a "generic type" // In this case, the item type is `String` Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { // "Emit" any data to the subscriber sub.onNext("a"); sub.onNext("b"); sub.onNext("c"); // Trigger the completion of the event sub.onCompleted(); } } ); ``` This observable event emits the data "a", "b", "c" and then completes. ### Defining Observers Now let's create a `Observer` to consume this emitted data from the `Observable`: ```java Observer<String> mySubscriber = new Observer<String>() { // Triggered for each emitted value @Override public void onNext(String s) { System.out.println("onNext: " + s); } // Triggered once the observable is complete @Override public void onCompleted() { System.out.println("done!"); } // Triggered if there is any errors during the event @Override public void onError(Throwable e) { } }; ``` ### Subscribing to Observables An `Observer` can be attached to an `Observable` in order to respond to emitted data with: ```java // Attaches the subscriber above to the observable object myObservable.subscribe(mySubscriber); // Outputs: // onNext: "a" // onNext: "b" // onNext: "c" // done! ``` This example would simply print each emitted item and then exit since each item is invoked with a call to `onNext`. Once all items have been invoked, the `onCompleted` method of the `Observer` is called. ## Creating Asynchronous Streams As demonstrated above, an `Observer` watches for result values emitted by the `Observable`. When these events occur, the role of the subscriber is to respond to these events. An `Observable` can be created from any type of input. For instance, an `Observable` can be a set of string items that should be iterated: ```java // `just` generates an observable object that emits each letter Observable.just("a", "b", "c") ``` To implement an observer for these events, the following interface must be defined: ```java public interface Observer<T> { void onNext(T t); // called for each "emitted" item void onCompleted(); // will not be called if onError() is called void onError(Throwable e); // called if there's an error } ``` Note that an `Observer` is a generic type. It must be represent the type of value that the `Observable` will emit. For a subscriber to start watching an observable that will generate string types, it must subscribe to it: ```java Observable.just("a", "b", "c").subscribe(new Observer<String>() { // Triggered for each emitted value // Invoked with "a", then "b", then "c" @Override public void onNext(String s) { System.out.println("onNext: " + s); } // Triggered once the observable is complete @Override public void onCompleted() { System.out.println("done!"); } // Triggered if there is any errors during the event @Override public void onError(Throwable e) { } }); ``` This example above would simply print each argument ("a", "b", "c") and then exit since each item is invoked with a call to `onNext`. Once all items have been invoked, the `onCompleted` method is called. This might seem contrived and not particularly useful in this example but as layers are built on top of these foundations, we begin to see the power of RxJava. ### Schedulers RxJava is synchronous by default, but work can be defined asynchronously using schedulers. For instance, we can define that the network call should be done on a background thread, but the callback should be done on the main UI thread. Using schedulers relies on queuing the work through bounded or unbounded thread pools. Here are a few options available that come with RxJava. See [this link](http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html) for all the possible options. | Name | Description | |:--------------------------:|:------------------------------------------------------:| | Schedulers.computation() | fixed number of threads (= to # CPU's) | | Schedulers.immediate() | current thread | | Schedulers.io() | backed by a current | | Schedulers.newThread() | create a new thread | | Schedulers.tramponline() | schedule work on the current thread but put on a queue | These schedulers than then be used to control which thread an observable or the subscriber are operating on using the `subscribeOn()` and `observeOn()` ### Replacing AsyncTask with Observables We can replace any `AsyncTask` calls with RxJava calls instead using `Observable`. Similar to how `AsyncTask` performs the task in the background and then calls `onPostExecute()` on the main thread on the UI, RxJava can accomplish this same function by defining which thread to perform the task with `subscribeOn()`, and then where to define the callback thread with `observeOn()`: ```java // This constructs an `Observable` to download the image public Observable<Bitmap> getImageNetworkCall() { // Insert network call here! } // Construct the observable and use `subscribeOn` and `observeOn` // This controls which threads are used for processing and observing Subscription subscription = getImageNetworkCall() // Specify the `Scheduler` on which an Observable will operate .subscribeOn(Schedulers.io()) // Specify the `Scheduler` on which a subscriber will observe this `Observable` .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Bitmap>() { // This replaces `onPostExecute(Bitmap bitmap)` @Override public void onNext(Bitmap bitmap) { // Handle result of network request } @Override public void onCompleted() { // Update user interface if needed } @Override public void onError() { // Update user interface to handle error } }); ``` Using this combination of `Observable`, `Subscriber` and thread scheduling, we can see the power of `RxJava`. But there's a lot more that can be done. ### Retrofit The [[Retrofit|https://github.com/codepath/android_guides/wiki/Consuming-APIs-with-Retrofit#rxjava]] library simply [wraps a synchronous network call](https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit/RxJavaCallAdapterFactory.java##L152-L163) as an `Observable` type for use with RxJava. Declaring the endpoints as `Observable` automatically does this work. ```java public interface MyApiEndpointInterface { @GET("/users/{username}") Observable<User> getUser(@Path("username") String username); } ``` We can then instantiate an instance of this interface and get back an `Observable` type: ```java MyApiEndpointInterface apiService = retrofit.create(MyApiEndpointInterface.class); // Get the observable User object Observable<User> call = apiService.getUser(username); // To define where the work is done, we can use `observeOn()` with Retrofit // This means the result is handed to the subscriber on the main thread call.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<User>() { @Override public void onNext(User user) { // Called once the `User` object is available } @Override public void onCompleted() { // Nothing to do here } @Override public void onError(Throwable e) { // cast to retrofit.HttpException to get the response code if (e instanceof HttpException) { HttpException response; int code = response.code(); } } }); ``` The RxAndroid library includes `AndroidSchedulers.mainThread()` for allowing callbacks to be fired on the main UI thread. ### Hot vs. Cold Observables By [default](https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/Observable.java#L66-L67), Observables are initialized to begin executing after the first subscriber is attached. Retrofit, for instance, by default operates in this way, which are known as **hot** observables. You can take a look at the Retrofit [source code](https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit2/RxJavaCallAdapterFactory.java#L88) to see that the network request is made on the first subscription. #### Hot to Cold Observables If you wish to change it so that multiple subscribers are attached before executing the request, otherwise known as converting to a **cold** observable, you need to convert the `Observable` to an `ConnectableObservable`. To initiate the network request, you need to call `connect()` on the observable: ```java Observable<User> call = apiService.getUser(username); // convert Observable to ConnectedObservable, get a reference so we can call connect() later ConnectableObservable<User> connectedObservable = call.publish(); // define 1st observer Observer<User> observer1 = new Observer<User>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(User user) { // do work here } }; // define 2nd observer here Observer<User> observer2 = new Observer<User>() { } // observer is subscribing connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1); connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer2); // initiate the network request connectedObservable.connect(); ``` #### Cold to Hot Observables You can also turn a cold observable back to a hot observable by using `autoConnect()`. Instead of needing to call an explicit `connect()` and passing around `ConnectedObservable` types, you can use this approach to enable the next subscriber to trigger a network request upon the next subscription: ```java // back to hot observable Observable<User> observable = connectedObservable.autoConnect(); // define 3rd observer here Observer<User> observer3 = new Observer<User>() { } observable.subscribe(observer3); ``` ### Avoiding Memory Leaks The flexibility in being able to schedule observables on different threads and have them operate as long-running tasks can make it easy to contribute the memory leaks. One of the reasons is that observers are often created with anonymous classes. In Java, creating anonymous classes requires the inner class retaining an instance to the containing class as discussed in this [Stack Overflow article](http://stackoverflow.com/questions/5054360/do-anonymous-classes-always-maintain-a-reference-to-their-enclosing-instance). An observer that is created within an Activity or Fragment therefore can hold a reference that will be unable to be garbage collected if the observable is still running. There are several different approaches suggested. Both approaches attempt to manage the subscriptions created from attaching an observer to an observable and canceling them when a lifecycle event occurs. #### Composite Subscriptions One of the simplest approach is to simply instantiate a CompositeSubscription object inside your Activity or Fragment. To avoid issues with determining when the object is created during life cycle events, it should be defined outside any of these related methods: ```java public class MainActivity extends AppCompatActivity { private CompositeSubscription subscriptions = new CompositeSubscription(); } ``` We can then use `CompositeSubscription` to track any subscriptions created by using the `add()` method: ```java subscriptions.add(connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1)) subscriptions.add(connectedObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(observer1)); ``` Canceling these subscriptions should occur during the `onPause()` method when the Activity or Fragment is suspended: ```java @Override public void onPause() { super.onPause(); if (subscriptions != null) { subscriptions.unsubscribe(); } } ``` Once an `unsubscribe()` call is made, the `CompositeSubscription` cannot be reused. The reason is that RxJava is intended to have a termination state as discussed in [this GitHub issue](https://github.com/ReactiveX/RxJava/issues/2959): ```java @Override public void onResume() { super.onResume(); subscriptions = new CompositeSubscription(); } ``` #### RxLifecycle There is also a library called [RxLifecycle](https://github.com/trello/RxLifecycle) which provides support for managing the lifecycle of activities and fragments. In the past [RxAndroid](https://github.com/ReactiveX/RxAndroid/wiki) provided this support with `AndroidObservables`, but a decision was made to simplify the library. See this [release note](https://github.com/ReactiveX/RxAndroid/releases/tag/v1.0.0) for more context. To setup, these Gradle lines must be added: ```gradle compile 'com.trello:rxlifecycle:0.4.0' compile 'com.trello:rxlifecycle-components:0.4.0' ``` RxLifecycle requires subclassing all activities with [RxActivity](https://github.com/trello/RxLifecycle/blob/master/rxlifecycle-components/src/main/java/com/trello/rxlifecycle/components/RxActivity.java). One issue is that it does not directly from `AppCompatActivity` so you may need to create a similar class that performs this same behavior. See [this guide](https://github.com/trello/RxLifecycle#rxlifecycle) for more details. ### Chaining Observables For a better understanding about how subscriptions can be chained and how RxJava works in general, it's best to first to understand what happens beneath the surfaces when this `subscribe()` call is made. Beneath the covers `Subscriber` objects are created. If we wish to chain the input, there are various **operators** that are available that map one `Subscriber` type to another. For more context, watch this [video talk](https://vimeo.com/144812843). ## References * <https://www.captechconsulting.com/blogs/getting-started-with-rxjava-and-android> * <http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/> * <https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module> * <http://saulmm.github.io/when-Iron-Man-becomes-Reactive-Avengers2/> * <http://blog.stablekernel.com/replace-asynctask-asynctaskloader-rx-observable-rxjava-android-patterns/> * <https://www.youtube.com/watch?v=_t06LRX0DV0/> * <https://vimeo.com/144812843> * <http://code.hootsuite.com/asynchronous-android-programming-the-good-the-bad-and-the-ugly/> * <https://www.youtube.com/watch?v=va1d4MqLUGY&feature=youtu.be/> * <http://www.slideshare.net/TimoTuominen1/rxjava-architectures-on-android-android-livecode-berlin/> * <https://www.youtube.com/watch?v=va1d4MqLUGY&feature=youtu.be/> * <https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014> * <http://www.philosophicalhacker.com/2015/06/12/an-introduction-to-rxjava-for-android/> * <http://www.oreilly.com/programming/free/files/rxjava-for-android-app-development.pdf> * <https://medium.com/@LiudasSurvila/droidcon-2015-london-part-1-698a6b750f30#.tvinpqa2q> * <https://speakerdeck.com/passsy/get-reactive> * <http://www.grokkingandroid.com/rxjavas-side-effect-methods/> * <http://colintheshots.com/blog/?p=85/>