Posted by & filed under Big Data.

Would you like to step through your Spark job in a debugger? These steps show you how to configure IntelliJ IDEA to allow just that.

Unlike a traditional Java or Scala application, a Spark job expects to run inside a larger Spark application that gives it access to a SparkContext. Your application interacts with the environment through the SparkContext object. Because of this constraint, you can’t just launch your Spark job from the IDE and expect it to run correctly.

The steps below describe how to launch a debugging session for a Scala application; very similar steps apply to Java applications. PySpark applications are more involved in their interaction with the Spark cluster, so I doubt this procedure applies to Python.

First you want to have an application class that looks something like this:

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Local
    val spark = new SparkContext(new SparkConf()
      .setMaster("local").setAppName("Test")
    )
    spark.setCheckpointDir("/tmp/spark/")

    println("-------------Attach debugger now!--------------")
    Thread.sleep(8000)

    // Your job code here, with breakpoints set on the lines you want to pause
  }
}

Now you want to get this job to the local cluster. First, package the job and all its dependencies into a fat JAR:

$ sbt assembly

Next, submit it to a local cluster. You need to have the spark-submit script somewhere on your system:

$ export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
$ /path/to/spark/bin/spark-submit --class Test --master local[4] --driver-memory 4G --executor-memory 4G /path/to/target/scala-2.10/project-assembly-0.0.1.jar

The first line exports a Java option that starts Spark with the debug agent listening on port 5005. --class needs to be the fully qualified name of your job’s main class. Finally, give it the path to the fat JAR assembled by the previous command.
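If you are unsure whether the option actually reached the JVM, one quick check is to look at the JVM’s input arguments from inside the job. The following is a minimal sketch in plain Java; the class name DebugAgentCheck and the helper hasJdwpAgent are made up for illustration and are not part of Spark.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper, not part of Spark: reports whether the JVM
// was started with the JDWP debug agent.
final class DebugAgentCheck {

    // Returns true if any JVM argument enables the JDWP agent.
    static boolean hasJdwpAgent(List<String> jvmArgs) {
        for (String arg : jvmArgs) {
            if (arg.startsWith("-agentlib:jdwp") || arg.startsWith("-Xrunjdwp")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Arguments passed to the JVM itself (not to the main method).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("JDWP agent enabled: " + hasJdwpAgent(jvmArgs));
    }
}
```

Calling hasJdwpAgent at the top of the driver’s main method and printing the result should tell you whether it is worth attaching the debugger at all.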

Because the agent is started with suspend=n, running that command now executes your job without stopping at any breakpoints. Next we need to configure IntelliJ to connect to the cluster. This process is detailed in the official IDEA documentation. If you just create a default “Remote” Run/Debug configuration and leave the default port of 5005, it should work fine. Now when you submit the job again and see the message to attach the debugger, you have 8 seconds to switch to IntelliJ IDEA and trigger this run configuration. The program will then continue to execute and pause at any breakpoint you defined. You can then step through it like any normal Scala/Java program. You can even step into Spark functions to see what it’s doing under the hood.

Hopefully this helps; I found it very useful for debugging serialization errors and other difficult-to-trace issues. A similar process could potentially be applied to debugging a job on a cluster, although you would only be able to debug the code that runs in the driver, not in the executors.

Posted by & filed under Android.

I recently upgraded my Android toolchain to the official Android Studio 1.0. I set up a new project and, as with any Android app, included Robolectric to allow for easy unit testing. I was able to configure the Gradle build to execute the tests fairly easily using the existing documentation from the Robolectric project. To my surprise, however, I wasn’t able to launch unit tests using the standard IntelliJ JUnit run configuration. This post details my workaround, hopefully temporary until Google fixes the issue in Android Studio.

To start, create a JUnit configuration, as detailed on the JetBrains site. If you run it now, you’ll face the “!!! JUnit version 3.8 or later expected” error. This occurs no matter which JUnit dependency you define in your Gradle file, as Android bundles an earlier version that takes precedence in the classpath. The solution in the past was to move the JUnit dependency in the project configuration or *.iml file up above the Android SDK. This is no longer an option in Android Studio 1.0, so the remaining solution is to modify the classpath used by the run configuration.

In order to do that, copy the run command that was used to launch the JUnit tests. It is the first line of output when you run the JUnit run configuration. Get rid of everything except the -classpath "..." argument. The argument value is a colon-separated (:) list of files and directories where java looks for classes. In this value, locate the JUnit dependency (either junit or junit-dep), and make it the first path in the list. Now paste this new argument into the “VM Options” field of the JUnit run configuration.
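Editing a long classpath by hand is error-prone, so the reordering can also be scripted. Below is a rough sketch in plain Java, assuming a colon-separated classpath as described above; the class name ClasspathReorder, the method junitFirst and the example paths are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: moves JUnit entries to the front of a classpath.
final class ClasspathReorder {

    // Puts every entry whose file name starts with "junit" (junit, junit-dep)
    // first, keeping the relative order of all other entries.
    static String junitFirst(String classpath) {
        List<String> junit = new ArrayList<>();
        List<String> rest = new ArrayList<>();
        for (String entry : classpath.split(":")) {
            String fileName = entry.substring(entry.lastIndexOf('/') + 1);
            if (fileName.startsWith("junit")) {
                junit.add(entry);
            } else {
                rest.add(entry);
            }
        }
        junit.addAll(rest);
        return String.join(":", junit);
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration.
        String original = "/sdk/android.jar:/libs/robolectric-2.4.jar:/libs/junit-4.12.jar";
        System.out.println("-classpath \"" + junitFirst(original) + "\"");
    }
}
```

The printed argument can then be pasted into the “VM Options” field as described above.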


We’re still not done, as running now will result in java not finding the class file for your test. To resolve this, create a new Gradle run configuration. Point it to your app, and enter testDebugClasses as the task. Call it something memorable (I named it “Prepare Robolectric”), and go back to the JUnit run configuration. In the “Before Launch” list add Another Action, and choose the Gradle task.

The Gradle task will compile test sources required for JUnit, and place them in app/build/test-classes/. The last step is to add the absolute path to this directory to the classpath in the JUnit configuration (at the end of the list). You should now be able to run unit tests right from Android Studio.

This presentation was very helpful in configuring Android Studio to run tests. Refer to it for more information and screenshots. If you have a better way of configuring unit tests, I’d love to hear it!

Posted by & filed under Big Data.

It’s easy to get started with Apache Spark. You can get a template for a Scala job using the Typesafe Activator and have it running on a local cluster with a small dataset. You can also use the handy spark-ec2 script to launch an EC2 cluster, as detailed in the Running Spark on EC2 document. You could then log into the new cluster, launch a PySpark or Scala shell, and use it to explore the Spark API.

Running a production Spark cluster is not as trivial, though. In this post I’ll outline some additional steps, beyond what is documented, to run a simple job on an ephemeral EC2 cluster in a manner similar to how one would use EMR. Here are some tips based on my experience.

Provide S3 credentials

A typical EMR run starts with an input data set resting on S3, with EC2 instances spun up to perform the analysis and store the results back into S3. Spark 1.1 and below install without support for S3 URLs. To provide it, modify the file /root/ephemeral-hdfs/conf/core-site.xml, and add the following elements to the XML configuration:

  
  
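  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>{accesskey}</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>{secret_key}</value>
  </property>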
  

Now you can load in S3 data sources with simple statements:

val data = spark.textFile("s3n://bucket/key/*")

Change temporary files location

By default Spark keeps temporary files in /tmp. This directory is mapped to an in-memory partition on the EC2 instances, so it can fill up fairly quickly. Additionally, if you checkpoint large results of your computations, you need to provide a larger drive to the instances that you start up.

Start your cluster with the extra option --ebs-vol-size 100 to attach an additional EBS disk. Then, in your job code, use the new partition that has been attached at /vol/:

val conf = new SparkConf()
  .set("spark.local.dir", "/vol/")
val spark = new SparkContext(conf)
spark.setCheckpointDir("/vol/")

Use sudo to submit jobs

Finally, you want to submit a job to your cluster. Generate a fat JAR by running sbt assembly in your Scala job project. Upload the JAR to the master instance using SCP. Log in to your cluster and use the spark-submit script to launch the job:

sudo /root/spark/bin/spark-submit --master spark://localhost:7077 --class MyApp --name MyApp /root/MyApp-1.0.0.jar arg0 arg1 arg2...

Make sure to call it using sudo to avoid failures due to missing permissions.

Some future improvements can be made to the way the Spark cluster is launched. The spark-ec2 script works well for small clusters of under 10 nodes, but becomes unstable for larger clusters. A failure of one machine to spin up correctly (not unusual on EC2) will fail the entire launch, and force you to destroy the cluster and start over. Furthermore, the entire script is very sequential, while machine configuration would ideally happen in parallel. There is no solution to these issues that I’m aware of at the moment, but I’m exploring some options. Stay tuned!

If there are any other things you had to do to get your job running, feel free to note them in the comments.

Posted by & filed under Android.

In Parts 1 and 2 of this series we examined different methods of creating a reactive application on Android, where events (either UI events originating from the application user, or background events generated by the system) trigger certain actions. We examined using an event bus and Observables in the context of a demo application, a simple file downloader. In this post I will examine how you construct an Android application using an actor system.

The actor model of managing concurrency was popularized by Erlang. Akka provides an actor implementation for Java and Scala. I will omit detailed descriptions of actors and Akka, but if you would like an introduction to the topic I’ve listed some useful presentations below.

WARNING: The method described below is only for demonstration purposes, and should not be used in production. The Akka framework has been optimized for use on server infrastructure, and is not a good fit on mobile for various reasons. That said, the actor model is in my opinion a good fit for structuring mobile applications, and if it gets support from a suitable library it can be a significant improvement to the way we write mobile applications in the new world of multicore pocket-sized devices.

The code for the demo application is available on github. This application is written in Scala to make interaction with Akka less verbose, but if you are unfamiliar with the language don’t let that turn you away. Don’t focus too much on the syntax, just follow the concepts. If you want to try to build the application, the README file should provide the instructions to run it on your device or emulator.

All the code we need is packed into DownloadActivity.scala and Downloader.scala. There are a couple other files in the package that assist us. DownloadApplication.scala contains the custom application that initializes the actor system on start. AndroidDispatcher.scala is a custom Akka dispatcher that allows specified Actors to always run on the UI thread. The configuration for the akka system is in res/raw/akka, and describes the dispatchers. These are not optimal settings, only enough for the system to work.

Let’s now dig into DownloadActivity. As before, it’s just a container for the DownloadFragment. The only thing to notice is that the Activity extends TypedActivity, which is generated by the sbt (Scala build tool) plugin to create a Scala-friendly development environment. The TypedActivity gives us easy access to typed views. When writing a Java Android app, you have to cast anything returned by Activity.findViewById from View to the type you want to work with (ImageView, Button, TextView, etc). But that information is available in the XML resources! So instead of losing it, the sbt plugin creates a mapping, and gives you the view of the correct type when you request it.

DownloadFragment houses all of our UI. The first method in the class may seem a bit cryptic if you haven’t worked with scala:

// Implicit conversion that lets us use anonymous functions as onClickListeners
implicit def onClickListener(f: (View => Unit)): View.OnClickListener = {
  new View.OnClickListener() {
    override def onClick(v: View) {
      f(v)
    }
  }
}

As the comment says, this sets up an implicit conversion allowing us to use lambda functions directly on buttons as click listener actions. The fact that it is implicit means the compiler will apply it whenever it needs a View.OnClickListener but is given an anonymous function instead. The parameter to the method is a function from View to Unit (Scala’s equivalent of void, i.e. no return value). The method then returns a new instance of View.OnClickListener whose onClick method calls the function that was passed in. Jumping further down, it allows us to use simple functions in setOnClickListener calls:

rootView.findView(TR.downloadButton).setOnClickListener((v: View) => {
  downloadButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)
})
rootView.findView(TR.resetButton).setOnClickListener((v: View) => {
  resetButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)
})

We’ll come back to the actions attached to the buttons in a little bit.

Right at initialization DownloadFragment creates a few actors it will need:

val system = getApplication().asInstanceOf[DownloadApplication].actorSystem
val downloadButtonActor = system.actorOf(Props[DownloadButton]()
    .withDispatcher("akka.actor.main-thread"))
val resetButtonActor = system.actorOf(Props[ResetButton]
    .withDispatcher("akka.actor.main-thread"))
val fragmentCompanion = system.actorOf(Props(new DownloadFragmentActor())
    .withDispatcher("akka.actor.main-thread"))
val downloader = system.actorOf(Props(new Downloader()), "downloader")

system.actorOf creates an actor of the type supplied in the Props configuration. For actors that perform operations on the UI thread we also call Props.withDispatcher, giving our custom dispatcher that pins the actor to the UI thread. Other actors (e.g. the Downloader) should run on another thread, and not block the UI thread.

I also decided to create a Fragment companion actor to handle the events, defined as follows:

class DownloadFragmentActor extends Actor {
  def receive = {
    case event: DownloadProgressEvent => updateProgress(event)
    case DownloadFinishedEvent => {
      // Nothing that we need to do
    }
    case TriggerStartDownloadEvent => {
      downloader ! StartDownload(findView(TR.urlEditText).getText.toString)
    }
    case TriggerPauseDownloadEvent => downloader ! PauseEvent
    case TriggerResumeDownloadEvent => downloader ! ResumeEvent
    case TriggerResetDownloadEvent => {
      updateProgress(DownloadProgressEvent(0, 0))
      downloadButtonActor ! ResetButtonState(findView(TR.downloadButton))
      downloader ! ResetEvent
    }
  }
  
  def updateProgress(progress: DownloadProgressEvent) {
    // View elements are wrapped in an Option, so foreach will
    // apply only to ones that are not None/null.
    progressBar foreach (_.setProgress(progress.getProgress))
    downloadProgressTextView foreach (_.setText(String.format("%s / %s",
      progress.getLoadedBytes, progress.getTotalBytes)))
  }
}

Actors define the receive function, which gets called when a message arrives. In it we match on the message using Scala’s pattern matching, a switch statement on steroids. Thanks to this we can define the messages as case classes or, for messages that carry no parameters, as case objects (singletons). I won’t go into the details of specific message handling just yet; let’s look through the rest of the application:

class DownloadButton extends Actor with ActorLogging {
  def receive = download

  val download: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerStartDownloadEvent
      v.setText("Pause")
      context.become(pause)
    }
    case RestoreButtonState(v: Button) => v.setText("Download")
    case ResetButtonState(v: Button) => performReset(v)
  }

  val pause: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerPauseDownloadEvent
      v.setText("Resume")
      context.become(resume)
    }
    case RestoreButtonState(v: Button) => v.setText("Pause")
    case ResetButtonState(v: Button) => performReset(v)
  }

  val resume: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerResumeDownloadEvent
      v.setText("Pause")
      context.become(pause)
    }
    case RestoreButtonState(v: Button) => v.setText("Resume")
    case ResetButtonState(v: Button) => performReset(v)
  }

  def performReset(v: Button) {
    v.setText("Download")
    context.become(download)
  }
}

DownloadButton is an actor companion for the UI button view. It captures the button state by using context.become() and handling the restore events (by setting appropriate text upon rotation).
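To see the idea behind context.become() without Akka, here is a rough sketch in plain Java 8 of the same three-state button, where the current behavior is a field that each click swaps out. The class DownloadButtonModel and its methods are hypothetical and only mirror the labels used by the actor above; no messages or threads are involved.

```java
// Hypothetical, Akka-free model of the DownloadButton state machine.
final class DownloadButtonModel {

    // A behavior reacts to a click and returns the new button label.
    private interface Behavior {
        String onClick();
    }

    // Starts in the "download" behavior, like `def receive = download`.
    private Behavior current = this::download;

    private String download() {
        current = this::pause;   // analogous to context.become(pause)
        return "Pause";
    }

    private String pause() {
        current = this::resume;  // analogous to context.become(resume)
        return "Resume";
    }

    private String resume() {
        current = this::pause;   // analogous to context.become(pause)
        return "Pause";
    }

    // Delegates to whichever behavior is currently installed.
    String click() {
        return current.onClick();
    }

    // Mirrors performReset: back to the initial behavior and label.
    String reset() {
        current = this::download;
        return "Download";
    }

    public static void main(String[] args) {
        DownloadButtonModel button = new DownloadButtonModel();
        System.out.println(button.click()); // label after starting a download
        System.out.println(button.click()); // label after pausing
        System.out.println(button.reset()); // label after reset
    }
}
```

The point of the pattern is that each state only lists the transitions it supports, so there is no central if/else chain over a state variable.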

The download logic has been placed into an actor in Downloader.scala. (Side note: Scala doesn’t have the same restrictions as Java with regard to placement of classes in files, so multiple classes are frequently found in one file. This sometimes makes them hard to find, so make sure not to abuse this feature.) The Downloader actor has two states: awaiting download, and downloading. When awaiting download, it accepts a start download message that places it into the downloading state. In that state, a Future is kicked off to perform the download on a separate thread, while the actor still accepts messages to control the process (pause, resume, reset). As before, the download here represents a process you would like to happen in the background, detached from your application UI, since you should never block the UI thread.

Let’s now trace through a use case and see the message flow in the actor system that’s created.

When you click on the Download button, a ClickEvent is sent to the DownloadButton actor. Since we’re coming from outside the Akka system, we explicitly set the sender of the message to be the DownloadFragmentActor companion:
downloadButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)

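(Side note: Scala lets you call a method in infix notation without the dot, and the exclamation point is a synonym for the tell method. Note, however, that ! takes the sender as an implicit parameter rather than as a second argument, so the explicit equivalent of the call above is downloadButtonActor.!(ClickEvent(v.asInstanceOf[Button]))(fragmentCompanion). Writing downloadButtonActor ! (message, sender) would send a tuple as the message.)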

Next, the event is handled by the download behavior of the DownloadButton. That generates a TriggerStartDownloadEvent that gets sent to the sender, which we defined to be the DownloadFragmentActor. There a StartDownload message gets sent to the downloader, with the URL extracted from the EditText view. In the downloader, the download is triggered, and the actor switches into the downloading state to properly handle pause/resume/reset events. I invite you to do the same exercise with other messages to see how they propagate through the hierarchy.

I hope with this brief introduction you can see advantages to using an actor model for designing your application. To summarize the actor system use on Android:

  • Abstractions over state and concurrency handling, pushing the details into the framework and out of your code.
  • Limited support at this time in the Android ecosystem.

This post concludes the series, but the topic is certainly far from complete. Are there any methods you like to use to create Android applications in a reactive fashion? If so, please share!

Resources:

Posted by & filed under Android.

In this section we will attempt to produce the same application as in Part 1, but using RxJava. If you’re not familiar with RxJava, I highly recommend watching a Netflix presentation on the topic. I’ll assume you’re familiar with the basics of the library.

As a reminder, we’re exploring different ways of communicating between threads in a way to account for application state changes. The source code of the demo app is available on github, so feel free to load it into Android Studio and play around with it.

Outside the core RxJava there is a library specifically for Android that lets you specify the thread on which events are observed. This makes it easier to generate events on a background thread, but act upon them on the UI thread. We declare it as a dependency in the Gradle build script:

compile group: 'com.netflix.rxjava', name: 'rxjava-android', version: '0.15.1'

The rest of the application is very similar to what was developed in Part 1. Let’s go through the key differences.

When we create the view in DownloadFragment, we create a clickObservable:

// Create an observable, and set up OnClickListeners to post to it
Observable<ClickEvent> clickObservable = Observable.create(
     new Observable.OnSubscribeFunc<ClickEvent>() {
           @Override
           public Subscription onSubscribe(final Observer<? super ClickEvent> observer) {
                handleReset = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new ResetEvent(v));
                      }
                };
                handleDownload = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadStartEvent(v));
                      }
                };
                handlePause = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadPauseEvent(v));
                      }
                };
                handleResume = new View.OnClickListener() {
                      @Override
                      public void onClick(View v) {
                          observer.onNext(new DownloadResumeEvent(v));
                      }
                };

                Button resetButton = ((Button) rootView.findViewById(R.id.resetButton));
                resetButton.setOnClickListener(handleReset);
                Button downloadButton = ((Button) rootView.findViewById(R.id.downloadButton));
                downloadButton.setOnClickListener(handleDownload);

                /**
                 * Restore state of the views based on the fragment instance state
                 * If not done, the center button stays in "download" state that
                 * the view is initialized with
                 */
                if (downloadThread != null) {
                       if (downloadThread.isRunning() && !downloadThread.isKilled()) {
                             switchToPause(downloadButton);
                       } else if (!downloadThread.isRunning() && !downloadThread.isKilled()) {
                             switchToResume(downloadButton);
                       }
                }

                return Subscriptions.empty();
          }
     });

In this block we define the OnClickListeners that post events to the observable upon a click on the button. As in Part 1, you wouldn’t normally do this, but I’m taking the example to an extreme to see how it would look if the main mode of communication were event driven. To post the click event we call the onNext method on the observer.

There is some logic to restore the state. If a device gets rotated, the DownloadFragment instance is retained (because we called setRetainInstance(true)), but the activity holding the fragment is recreated. After the fragment is reattached to the new activity, its view is recreated. This can happen while a download is in progress, so the views need to be returned to the state they’re expected to be in.

One other thing to notice is that the clickObservable is of type Observable<ClickEvent>. Therefore all events we can post to it need to either be ClickEvent, or subclass it. In order to subscribe to this stream we have to explicitly declare that we accept events of type ClickEvent. This provides some compile-time safety, but at the same time makes it tougher for one subscriber to accept different types of events.

Next we create a subscription from the clickObservable we just created:

clickSubscription = AndroidObservable.fromFragment(this, clickObservable)
      .subscribeOn(AndroidSchedulers.mainThread())
      .subscribe(this);

The DownloadFragment is subscribed to the clickObservable, and subscribeOn(AndroidSchedulers.mainThread()) makes the subscription happen on the Android UI thread, which makes it safe for us to modify UI views. We can subscribe the DownloadFragment itself because it implements the Observer interface with the following methods:

@Override
public void onCompleted() {
    // ignore
}

@Override
public void onError(Throwable throwable) {
    Log.e(DownloadActivity.class.toString(), "Got an error from the Observable", throwable);
}

@Override
public void onNext(ClickEvent clickEvent) {
    if (clickEvent instanceof ResetEvent) {
        answerReset((ResetEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadResumeEvent) {
        answerDownloadResume((DownloadResumeEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadPauseEvent) {
        answerDownloadPause((DownloadPauseEvent) clickEvent);
    }
    if (clickEvent instanceof DownloadStartEvent) {
        answerDownloadStart((DownloadStartEvent) clickEvent);
    }
}

onCompleted can be ignored in this case because we set up an infinite stream. onError simply logs the error that is received, because we’re not expecting any. onNext is where we process the received event by matching it against expected classes. Based on the type of the event object we receive, we call the appropriate method. For example, here is how we handle download start request:

public void answerDownloadStart(DownloadStartEvent event) {
    downloadThread = new Downloader(urlEditText.getText().toString());
    downloadThread.start();

    // Subscribe to the progress observable
    // Sample the stream every 30 milliseconds (ignore events in between)
    // Upon receiving an event, update the views
    downloadThread.getProgressObservable()
           .sample(30, java.util.concurrent.TimeUnit.MILLISECONDS)
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Action1<DownloadProgressEvent>() {
                @Override
                public void call(DownloadProgressEvent event) {
                    progressBar.setProgress((int) event.getProgress());
                    downloadProgress.setText(String.format("%s / %s",
                          event.getLoadedBytes(), event.getTotalBytes()));
                }
            });
    switchToPause(((Button) event.getView()));
}

Here we create a new Downloader thread, and start it. We then get a progress observable from it. The observable is a stream of DownloadProgressEvents. We have to observe the stream on the UI thread because we want to modify the views to notify the user of the download progress, but we don’t want to do too much work on the main thread, so the first modification we apply to the stream is to sample it every 30 milliseconds. Sample, just one of the many operations one can call on an observable, passes on at most one event per specified time period (the most recent one) and drops all others that came in during that period. We then observe the sampled stream on the Android main UI thread, and subscribe an anonymous inner class to update the progress views upon receiving an event.
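To make the effect of sampling concrete, here is a simplified sketch in plain Java that works on a pre-recorded list of timestamped events rather than a live stream. It is not RxJava’s implementation: the class SampleSketch, the Timed type and the fixed-window approach are assumptions chosen only to illustrate “keep the latest event per period”.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sampling; not the RxJava implementation.
final class SampleSketch {

    // An event value together with the time it was produced.
    static final class Timed {
        final long atMillis;
        final int value;

        Timed(long atMillis, int value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Keeps only the last value seen in each window of periodMillis.
    // Assumes events are ordered by time.
    static List<Integer> sample(List<Timed> events, long periodMillis) {
        List<Integer> out = new ArrayList<>();
        long window = -1;
        Integer latest = null;
        for (Timed e : events) {
            long w = e.atMillis / periodMillis;
            if (latest != null && w != window) {
                out.add(latest); // the previous window closed: emit its last value
            }
            window = w;
            latest = e.value;
        }
        if (latest != null) {
            out.add(latest); // flush the final window
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> events = new ArrayList<>();
        events.add(new Timed(0, 1));
        events.add(new Timed(10, 2));
        events.add(new Timed(29, 3));
        events.add(new Timed(30, 4));
        events.add(new Timed(65, 5));
        System.out.println(sample(events, 30)); // [3, 4, 5]
    }
}
```

With a 30 ms period, the three events in the first window collapse into one, which is why the UI thread only has to redraw the progress views a few dozen times per second at most.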

How do we get this progress observable in the first place? Let’s look in the Downloader:

public Downloader(String url) {
   this.url = url;
   // Seed the event stream with the first event.
   progressSubject = BehaviorSubject.create(new DownloadProgressEvent(0, 0));
}

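Right in the constructor we create a BehaviorSubject. It’s one of the Subjects provided by RxJava. A BehaviorSubject remembers the most recent event (or the seed value, if nothing has been emitted yet), so at the point that we subscribe to it we immediately get the latest event, followed by any later ones.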
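The replay-latest behavior can be illustrated with a few lines of plain Java 8. This is only a sketch of the idea, not RxJava’s BehaviorSubject: the class LatestValueSubject is hypothetical, it is not thread-safe, and it has no error or completion handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a BehaviorSubject.
final class LatestValueSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;

    // The seed plays the role of the first event.
    LatestValueSubject(T seed) {
        this.latest = seed;
    }

    // New subscribers immediately receive the most recent value.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        subscriber.accept(latest);
    }

    // Remembers the value and forwards it to all current subscribers.
    void onNext(T value) {
        latest = value;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    public static void main(String[] args) {
        LatestValueSubject<Integer> progress = new LatestValueSubject<>(0);
        progress.onNext(40);
        // A late subscriber (e.g. a recreated view) starts from 40, not 0.
        progress.subscribe(p -> System.out.println("progress: " + p));
        progress.onNext(60);
    }
}
```

This is what makes such a subject convenient for progress reporting: a view that subscribes late, for example after a rotation, can show the current progress right away instead of waiting for the next event.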

The rest of the code of the Downloader is exactly the same as in the first part, except for reporting progress:

private void reportProgress() {
    progressSubject.onNext(new DownloadProgressEvent(loadedSize, totalSize));
}

We simply call onNext on the BehaviorSubject with the DownloadProgressEvent.

One thing you may notice in testing the application is that Reset doesn’t clear the progress views. Actually, it does, but because the communication is asynchronous, additional progress events may come in after the download has been reset.

To summarize:

  • RxJava is excellent for streaming events.
  • The library is well supported on Android with Android specific bindings.

RxJava is best used for cases where you have streams from the application – results coming from the network, sensor streams, motion events, and so on. Two way communication is not easy to set up with RxJava. But there is a framework for asynchronous message passing that may make it easier. I will cover it in the next installment of this series.