
In this section we will attempt to produce the same application as in Part 1, but using RxJava. If you’re not familiar with RxJava, I highly recommend watching a Netflix presentation on the topic. I’ll assume you’re familiar with the basics of the library.

As a reminder, we’re exploring different ways of communicating between threads while accounting for application state changes. The source code of the demo app is available on GitHub, so feel free to load it into Android Studio and play around with it.

Outside the core RxJava library there is a companion library specifically for Android that lets you specify the thread on which events are observed. This makes it easier to generate events on a background thread but act upon them on the UI thread. We declare it as a dependency in the Gradle build script:

compile group: 'com.netflix.rxjava', name: 'rxjava-android', version: '0.15.1'

The rest of the application is very similar to what was developed in Part 1. Let’s go through the key differences.

When we create the view in DownloadFragment, we create a clickObservable:

// Create an observable, and set up OnClickListeners to post to it
Observable<ClickEvent> clickObservable = Observable.create(
     new Observable.OnSubscribeFunc<ClickEvent>() {
           @Override
           public Subscription onSubscribe(final Observer<? super ClickEvent> observer) {
                handleReset = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new ResetEvent(v));
                      }
                };
                handleDownload = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadStartEvent(v));
                      }
                };
                handlePause = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadPauseEvent(v));
                      }
                };
                handleResume = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadResumeEvent(v));
                      }
                };

                Button resetButton = ((Button) rootView.findViewById(R.id.resetButton));
                resetButton.setOnClickListener(handleReset);
                Button downloadButton = ((Button) rootView.findViewById(R.id.downloadButton));
                downloadButton.setOnClickListener(handleDownload);

                /**
                 * Restore state of the views based on the fragment instance state
                 * If not done, the center button stays in "download" state that
                 * the view is initialized with
                 */
                if (downloadThread != null) {
                       if (downloadThread.isRunning() && !downloadThread.isKilled()) {
                             switchToPause(downloadButton);
                       } else if (!downloadThread.isRunning() && !downloadThread.isKilled()) {
                             switchToResume(downloadButton);
                       }
                }

                return Subscriptions.empty();
          }
     });

In this block we define the OnClickListeners that post events to the observable upon a click on the button. As in Part 1, you wouldn’t normally do this, but I’m taking the example to an extreme to see how it would look if the main mode of communication were event driven. To post the click event we call the onNext method on the observer.

There is some logic to restore the state. If a device gets rotated, the DownloadFragment instance is retained (because we called setRetainInstance(true)), but the activity holding the fragment is recreated. After the fragment is reattached to the new activity, its view is recreated. This can happen while a download is in progress, so the views need to be returned to the state they’re expected to be in.
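The restore check amounts to a small decision table. Here is a rough sketch of it as a pure function. ButtonState, ViewStateRestorer and restoreState are hypothetical names used only for illustration. The demo app calls switchToPause and switchToResume directly instead.

```java
// Hypothetical names: ButtonState and ViewStateRestorer are not in the demo app.
enum ButtonState { DOWNLOAD, PAUSE, RESUME }

class ViewStateRestorer {
    // Mirrors the checks in onSubscribe: with no download thread, or a killed
    // one, the button keeps the "download" state the view is initialized with.
    static ButtonState restoreState(boolean hasThread, boolean running, boolean killed) {
        if (!hasThread || killed) {
            return ButtonState.DOWNLOAD;
        }
        return running ? ButtonState.PAUSE : ButtonState.RESUME;
    }
}
```

Keeping the decision in one place like this would also make it easy to unit test without a device.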

One other thing to notice is that the clickObservable is of type Observable<ClickEvent>. Therefore all events we can post to it need to either be ClickEvent, or subclass it. In order to subscribe to this stream we have to explicitly declare that we accept events of type ClickEvent. This provides some compile-time safety, but at the same time makes it tougher for one subscriber to accept different types of events.
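To make the typing point concrete, here is a minimal plain-Java sketch. The event classes are simplified stand-ins for the ones in the demo app (the real ones also carry the clicked View), and TypedObserver is a hypothetical interface that only loosely mirrors RxJava's Observer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the demo app's event classes.
class ClickEvent { }
class ResetEvent extends ClickEvent { }
class DownloadStartEvent extends ClickEvent { }

// Hypothetical typed observer, loosely modelled on the Observer interface.
interface TypedObserver<T> {
    void onNext(T event);
}

class TypedStreamDemo {
    static List<String> run() {
        final List<String> seen = new ArrayList<String>();
        TypedObserver<ClickEvent> observer = new TypedObserver<ClickEvent>() {
            @Override
            public void onNext(ClickEvent event) {
                seen.add(event.getClass().getSimpleName());
            }
        };
        observer.onNext(new ResetEvent());          // accepted: subclass of ClickEvent
        observer.onNext(new DownloadStartEvent());  // accepted: subclass of ClickEvent
        // observer.onNext("not an event");         // rejected by the compiler
        return seen;
    }
}
```

The flip side is visible too: an observer that wanted both click events and some unrelated event type would need a common supertype or two separate subscriptions.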

Next we create a subscription from the clickObservable we just created:

clickSubscription = AndroidObservable.fromFragment(this, clickObservable)
      .subscribeOn(AndroidSchedulers.mainThread())
      .subscribe(this);

DownloadFragment subscribes to the clickObservable, but first we call subscribeOn(AndroidSchedulers.mainThread()) so that the subscription is set up on the Android UI thread. This makes it safe for us to modify UI views. We can subscribe DownloadFragment directly because it implements the Observer interface with the following methods:

@Override
public void onCompleted() {
    // ignore
}

@Override
public void onError(Throwable throwable) {
    Log.e(DownloadActivity.class.toString(), "Got an error from the Observable", throwable);
}

@Override
public void onNext(ClickEvent clickEvent) {
    if (clickEvent instanceof ResetEvent) {
        answerReset((ResetEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadResumeEvent) {
        answerDownloadResume((DownloadResumeEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadPauseEvent) {
        answerDownloadPause((DownloadPauseEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadStartEvent) {
        answerDownloadStart((DownloadStartEvent) clickEvent);
    }
}

onCompleted can be ignored in this case because we set up an infinite stream. onError simply logs the error that is received, because we’re not expecting any. onNext is where we process the received event by matching it against the expected classes. Based on the type of the event object we receive, we call the appropriate method. For example, here is how we handle a download start request:

public void answerDownloadStart(DownloadStartEvent event) {
    downloadThread = new Downloader(urlEditText.getText().toString());
    downloadThread.start();

    // Subscribe to the progress observable
    // Sample the stream every 30 milliseconds (ignore events in between)
    // Upon receiving an event, update the views
    downloadThread.getProgressObservable()
           .sample(30, java.util.concurrent.TimeUnit.MILLISECONDS)
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Action1<DownloadProgressEvent>() {
                @Override
                public void call(DownloadProgressEvent event) {
                    progressBar.setProgress((int) event.getProgress());
                    downloadProgress.setText(String.format("%s / %s",
                          event.getLoadedBytes(), event.getTotalBytes()));
                }
            });
    switchToPause(((Button) event.getView()));
}

Here we create a new Downloader thread and start it. We then get a progress observable from it, which is a stream of DownloadProgressEvents. We have to observe the stream on the UI thread because we want to modify the views to show the download progress, but we don’t want to do too much work on the main thread. So the first operator we apply to the stream is sample, with a period of 30 milliseconds. Sample is just one of the many operators available on an observable: in each time period it passes along only the most recent event and drops the others that came in. We then observe the sampled stream on the Android main UI thread and subscribe an anonymous inner class that updates the progress views whenever an event arrives.
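To get a feel for what sampling does to a busy stream, here is a small plain-Java simulation. It is not RxJava's implementation. SampleSketch is a hypothetical helper that works on pre-recorded timestamps, and flushing the last window at the end is a simplification made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that imitates sampling on recorded events.
class SampleSketch {
    // timesMs[i] is when values[i] arrived; timesMs is sorted ascending.
    // Returns the latest value seen in each periodMs window, skipping empty windows.
    static List<Integer> sample(long[] timesMs, int[] values, long periodMs) {
        List<Integer> out = new ArrayList<Integer>();
        long windowEnd = periodMs;
        Integer latest = null;
        for (int i = 0; i < timesMs.length; i++) {
            // Close every window that ended before this event arrived.
            while (timesMs[i] >= windowEnd) {
                if (latest != null) {
                    out.add(latest);
                    latest = null;
                }
                windowEnd += periodMs;
            }
            latest = values[i];
        }
        if (latest != null) {
            out.add(latest); // flush the final window (simplification)
        }
        return out;
    }
}
```

With a 30 ms period, six progress events arriving at 0, 10, 20, 35, 40 and 95 ms shrink to three view updates, which is the effect we want on the UI thread.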

How do we get this progress observable in the first place? Let’s look in the Downloader:

public Downloader(String url) {
   this.url = url;
   // Seed the event stream with the first event.
   progressSubject = BehaviorSubject.create(new DownloadProgressEvent(0, 0));
}

Right in the constructor we create a BehaviorSubject, one of the Subjects provided by RxJava. A BehaviorSubject hands each new subscriber the most recent event (or the seed value if nothing has been emitted yet) and then every event that follows.
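The "latest event on subscribe" behavior is easy to model in plain Java. The sketch below is a hypothetical stand-in, not RxJava's BehaviorSubject: LatestValueSubject and its Listener interface are invented for illustration, and errors, completion and threading are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "replay the latest value to new subscribers".
class LatestValueSubject<T> {
    interface Listener<V> {
        void onNext(V value);
    }

    private final List<Listener<T>> listeners = new ArrayList<Listener<T>>();
    private T latest;

    LatestValueSubject(T seed) {
        this.latest = seed; // the seed is what an early subscriber would see
    }

    void subscribe(Listener<T> listener) {
        listeners.add(listener);
        listener.onNext(latest); // new subscribers immediately get the latest value
    }

    void onNext(T value) {
        latest = value;
        for (Listener<T> listener : listeners) {
            listener.onNext(value);
        }
    }
}

class LatestValueDemo {
    static List<Integer> run() {
        final List<Integer> received = new ArrayList<Integer>();
        LatestValueSubject<Integer> subject = new LatestValueSubject<Integer>(0);
        subject.onNext(10);
        subject.onNext(20); // nobody is listening yet
        subject.subscribe(new LatestValueSubject.Listener<Integer>() {
            @Override
            public void onNext(Integer value) {
                received.add(value);
            }
        });
        subject.onNext(30);
        return received;
    }
}
```

A subscriber that joins late sees 20 and then 30, never the stale seed. That is why the progress views show the current download position straight away when the fragment subscribes.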

The rest of the code of the Downloader is exactly the same as in the first part, except for reporting progress:

private void reportProgress() {
    progressSubject.onNext(new DownloadProgressEvent(loadedSize, totalSize));
}

We simply call onNext on the BehaviorSubject with the DownloadProgressEvent.

One thing you may notice in testing the application is that Reset doesn’t clear the progress views. Actually, it does, but because the communication is asynchronous, additional progress events may come in after the download has been reset.
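One possible way to keep late events from repainting the views is to drop them once a reset has been requested. The sketch below is only an illustration of that idea. ProgressGate is a hypothetical helper, not code from the demo app, and it assumes that onProgress and reset are both called on the UI thread.

```java
// Hypothetical helper; assumes all calls happen on one thread (the UI thread).
class ProgressGate {
    private boolean open = true;
    private long shownBytes = 0;

    // Called for every progress event that reaches the UI.
    void onProgress(long loadedBytes) {
        if (open) {
            shownBytes = loadedBytes;
        }
        // Events arriving after reset() are silently ignored.
    }

    // Called when the user presses Reset.
    void reset() {
        open = false;
        shownBytes = 0;
    }

    long shownBytes() {
        return shownBytes;
    }
}
```

A new gate would be created for each download, so progress from the next download is shown as usual.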

To summarize:

  • RxJava is excellent for streaming events.
  • The library is well supported on Android with Android-specific bindings.

RxJava is best used for cases where you have streams from the application: results coming from the network, sensor streams, motion events, and so on. Two-way communication is not easy to set up with RxJava, but there is a framework for asynchronous message passing that may make it easier. I will cover it in the next installment of this series.

One Response to “Reactive Android, Part 2”

  1. terry turner

    this fixes a race condition in the reset button processing
    Downloader.java
    - if (killed && outFile.exists()) {
    + if (killed) {
    + loadedSize = 0;
    + reportProgress();
    +
    + if (outFile.exists())
    outFile.delete();
