Filed under Programming.

I was very fortunate to attend Scala Days 2015 in San Francisco this week. It was incredible to talk to the people behind the language, and to get a glimpse of a wide ecosystem. I wanted to post a short recap of some of my thoughts and feelings after the conference.


Martin Odersky delivered the first keynote on Monday and described how Scala is evolving – specific changes to the language, a solution to the binary compatibility problem (the intermediate TASTY format), and more. What he clearly conveyed, both through jokes and detailed plans, is that Scala is a unique language with its own rules and limitations. It is not, and will not become, “Hascalator”, despite many in the community demonstrating the benefits of a stricter functional language. At the same time, the language keeps evolving while becoming more stable for dependable use. This process was further detailed by Grzegorz Kossakowski, who explained how the Scala build process works and how features get implemented into the language.

Bill Venners gave a very interesting history lesson on some failed projects, and showed how he decided against including a feature in ScalaTest because of the API complexity it would create, as well as compile-speed issues. Reframing the problem allowed for the creation of the SuperSafe compiler plugin, enabling the feature without the performance or complexity cost.


Josh Suereth and Eugene Yokota covered the history of build tools leading up to SBT 1.0. It was great to hear about the influences on SBT, starting from Make and ending with Google's Blaze. The way SBT server changes the build process is intriguing, and it will be interesting to see what functionality it enables.

A theme I found extremely interesting was the use of sun.misc.Unsafe with Scala to manage off-heap memory. Using Unsafe lets you bypass JVM garbage collection and manage a chunk of memory directly, accepting the challenges and dangers that this brings. Denys Shabalin covered a library that wraps Unsafe and gives a Scala-specific API for memory access, which he open-sourced during his presentation. Meanwhile, Ryan LeCompte covered a specific use case that benefited from off-heap storage: fast analytical query processing using Akka Cluster. Similar ideas were echoed by Lee Mighdoll while covering Sparkle, and how it keeps asynchronous arrays of batches of records flowing to keep the visualizer responsive.
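To make the idea concrete, here is a minimal sketch of raw sun.misc.Unsafe usage (this is not the API of the library Denys presented, just the underlying primitive it wraps):

import sun.misc.Unsafe

object OffHeapSketch {
  // Unsafe's constructor is private; the usual workaround is reflection.
  private val unsafe: Unsafe = {
    val field = classOf[Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    field.get(null).asInstanceOf[Unsafe]
  }

  def main(args: Array[String]): Unit = {
    val address = unsafe.allocateMemory(1024)  // 1 KB outside the GC heap
    try {
      unsafe.putLong(address, 42L)             // write directly to raw memory
      println(unsafe.getLong(address))         // read it back: prints 42
    } finally {
      unsafe.freeMemory(address)               // no GC here: you must free it yourself
    }
  }
}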

All sessions will be posted on parleys.com soon, but I hope my short post has captured some of the atmosphere of the conference that can’t be conveyed in sessions alone. It will be interesting to watch the ideas presented evolve.


Filed under Big Data.

Would you like to step through your Spark job in a debugger? These steps show you how to configure IntelliJ IDEA to allow just that.

Unlike a traditional Java or Scala application, Spark jobs expect to be run within a larger Spark application that provides access to a SparkContext. Your application interacts with the environment through the SparkContext object. Because of this constraint, you can’t just launch your Spark job from the IDE and expect it to run correctly.

The steps I outline describe the process for launching a debugging session for a Scala application; however, very similar steps can be applied to launching Java applications. PySpark applications are more involved in their interaction with the Spark cluster, so I doubt this procedure applies to Python.

First you want to have an application class that looks something like this:

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Local master, so the job runs in-process
    val spark = new SparkContext(new SparkConf()
      .setMaster("local").setAppName("Test")
    )
    spark.setCheckpointDir("/tmp/spark/")

    println("-------------Attach debugger now!--------------")
    Thread.sleep(8000)

    // Your job code here, with breakpoints set on the lines you want to pause
  }
}

Now you want to get this job to the local cluster. First, package the job and all of its dependencies into a fat JAR:

$ sbt assembly
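If the assembly task isn't already available in your project, it comes from the sbt-assembly plugin; a minimal sketch of the setup (the plugin version shown is illustrative for this era of sbt):

// project/plugins.sbt — adds the `assembly` task
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")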

Next, submit it to a local cluster. You need to have the spark-submit script somewhere on your system:

$ export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
$ /path/to/spark/bin/spark-submit --class Test --master local[4] --driver-memory 4G --executor-memory 4G /path/to/target/scala-2.10/project-assembly-0.0.1.jar

The first line exports a JVM agent option that Spark picks up to start the JVM with a debugger listening on port 5005. --class needs to be the fully qualified class name of your job. Finally, give it the path to the fat JAR assembled in the previous step.

If you run that command now, it will execute your job without breaking at the breakpoints. Next we need to configure IntelliJ to connect to the cluster. This process is detailed in the official IDEA documentation. If you just create a default “Remote” Run/Debug configuration and leave the default port of 5005, it should work fine. Now when you submit the job again and see the message to attach the debugger, you have 8 seconds to switch to IntelliJ IDEA and trigger this run configuration. The program will then continue to execute and pause at any breakpoint you defined. You can then step through it like any normal Scala/Java program. You can even step into Spark functions to see what it's doing under the hood.

Hopefully this helps; I found it very useful for debugging serialization errors and other difficult-to-trace issues. A similar process could potentially be applied to debugging a job on a real cluster, although you would only be able to debug the code that runs in the driver, not the executors.

Filed under Android.

I recently upgraded my Android toolchain to the official Android Studio 1.0. I set up a new project and, as with any Android app, included Robolectric to allow for easy unit testing. I was able to configure the Gradle build to execute the tests fairly easily using the existing documentation on the Robolectric project. To my surprise, however, I wasn’t able to launch unit tests using the standard IntelliJ JUnit run configuration. This post details my workaround, hopefully temporary until Google fixes the issue in Android Studio.

To start, create a JUnit run configuration, as detailed on the JetBrains site. If you run it now, you’ll hit the “!!! JUnit version 3.8 or later expected” error. This occurs no matter which JUnit dependency you define in your Gradle file, because Android bundles an earlier version that gets precedence in the classpath. The solution in the past was to move the JUnit dependency in the project configuration or *.iml file up above the Android SDK. This is no longer an option in Android Studio 1.0, so the remaining solution is to modify the classpath used by the run configuration.

In order to do that, copy the run command that was used to launch the JUnit tests. It is the first line of output when you run the JUnit run configuration. Get rid of everything except the -classpath "..." argument. The argument value is a colon-separated (:) list of files and directories where java looks for classes. In this value, locate the JUnit dependency (either junit or junit-dep) and make it the first path in the list. Then paste this new argument into the “VM Options” field of the JUnit run configuration.
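For illustration only (these paths are hypothetical; yours will come from the copied command), the reordered argument might look like:

-classpath "/home/me/.gradle/caches/modules-2/files-2.1/junit/junit/4.12/junit-4.12.jar:/opt/android-sdk/platforms/android-21/android.jar:/home/me/project/app/build/intermediates/classes/debug"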


We’re still not done, as running now will result in java not finding the class files for your tests. To resolve this, create a new Gradle run configuration. Point it at your app and enter testDebugClasses as the task. Call it something memorable (I named it “Prepare Robolectric”), then go back to the JUnit run configuration. In the “Before Launch” list, add “Another Action” and choose the Gradle task.

The Gradle task will compile the test sources required for JUnit and place them in app/build/test-classes/. The last step is to add the absolute path to this directory to the classpath in the JUnit configuration (at the end of the list). You should now be able to run unit tests right from Android Studio.

This presentation was very helpful in configuring Android Studio to run tests. Refer to it for more information and screenshots. If you have a better way of configuring unit tests, I’d love to hear it!

Filed under Big Data.

It’s easy to get started with Apache Spark. You can get a template for a Scala job using the Typesafe Activator and have it running on a local cluster with a small dataset. You can also use the handy spark_ec2 script to launch an EC2 cluster, as detailed in the Running Spark on EC2 document. You can then log into the new cluster, launch a PySpark or Scala shell, and use it to explore the Spark API.

Running a production Spark cluster is not as trivial, though. In this post I’ll outline some additional steps, beyond what is documented, to run a simple job on an ephemeral EC2 cluster in a manner similar to how one would use EMR. Here are some tips based on my experience.

Provide S3 credentials

A typical EMR run starts with the input data set resting on S3, with EC2 instances spun up to perform the analysis and store the results back into S3. Spark 1.1 and below installs without support for S3 URLs. To provide it, modify the file /root/ephemeral-hdfs/conf/core-site.xml and add the following elements to the XML configuration:

  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>{access_key}</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>{secret_key}</value>
  </property>

Now you can load S3 data sources with a simple statement:

val data = spark.textFile("s3n://bucket/key/*")

Change the temporary file location

By default Spark keeps temporary files in /tmp. This directory is mapped to an in-memory partition on the EC2 instances, so it can fill up fairly quickly. Additionally, if you checkpoint large results of your computations, you need to provide a larger drive to the instances that you start up. Start your cluster with the extra option --ebs-vol-size 100 to attach an additional 100 GB EBS disk to each node.
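For example, a launch might look like this (the key pair, slave count, and cluster name are placeholders):

$ ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 4 --ebs-vol-size 100 launch my-cluster

Then in your job code, use the new partition that has been mounted at /vol/: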

val conf = new SparkConf()
  .set("spark.local.dir", "/vol/")
val spark = new SparkContext(conf)
spark.setCheckpointDir("/vol/")

Use sudo to submit jobs

Finally, you want to submit a job to your cluster. Generate a fat JAR by running sbt assembly in your Scala job project. Upload the JAR to the master instance using SCP. Log in to your cluster and use the spark-submit script to launch the job:

sudo /root/spark/bin/spark-submit --master spark://localhost:7077 --class MyApp --name MyApp /root/MyApp-1.0.0.jar arg0 arg1 arg2...

Make sure to call it using sudo to avoid failures due to missing permissions.

Some future improvements can be made to the way the Spark cluster is launched. The spark_ec2 script works well for small clusters under 10 nodes, but becomes unstable for larger clusters. A failure of one machine to spin up correctly (not unusual on EC2) fails the entire launch and forces you to destroy the cluster and start over. Furthermore, the script is very sequential, while machine configuration would ideally happen in parallel. There is no solution to these issues that I’m aware of at the moment, but I’m exploring some options. Stay tuned!

If there are any other things you had to do to get your job running, feel free to note them in the comments.

Filed under Android.

In Parts 1 and 2 of this series we examined different methods of creating a reactive application on Android, where events (either UI events originating from the application user or background events generated by the system) trigger certain actions. We looked at an event bus and Observables in the context of a demo application, a simple file downloader. In this post I will show how to construct an Android application using an actor system.

The actor model of managing concurrency comes from Erlang. Akka provides an actor implementation for Java and Scala. I will omit detailed descriptions of actors and Akka, but if you would like an introduction to the topic, I’ve listed some useful presentations below.

WARNING: The method described below is for demonstration purposes only, and should not be used in production. The Akka framework has been optimized for use on server infrastructure, and is not a good fit for mobile for various reasons. That said, the actor model is in my opinion a good fit for structuring mobile applications, and given support from a library it could be a significant improvement to the way we write mobile applications in the new world of multicore, pocket-sized devices.

The code for the demo application is available on GitHub. The application is written in Scala to make the interaction with Akka less verbose, but if you are unfamiliar with the language, don’t let that turn you away. Don’t focus too much on the syntax; just follow the concepts. If you want to try to build the application, the README file provides the instructions to run it on your device or emulator.

All the code we need is packed into DownloadActivity.scala and Downloader.scala. There are a couple of other files in the package that assist us. DownloadApplication.scala contains the custom application that initializes the actor system on start. AndroidDispatcher.scala is a custom Akka dispatcher that allows specified actors to always run on the UI thread. The configuration for the Akka system is in res/raw/akka, and describes the dispatchers. These are not optimal settings, only enough for the system to work.

Let’s now dig into DownloadActivity. As before, it’s just a container for the DownloadFragment. The only thing to notice is that the Activity extends TypedActivity, which is generated by the Scala build tool (sbt) plugin to create a Scala-friendly development environment. TypedActivity gives us easy access to typed views. When writing a Java Android app, you have to cast anything returned by Activity.findViewById from View to the type you want to work with (ImageView, Button, TextView, etc.). But that information is already available in the XML resources! So instead of losing it, the sbt plugin creates a mapping and gives you a view of the correct type when you request it.
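A quick illustration of the difference (the Java line is a hypothetical equivalent lookup; TR.downloadButton is the same typed id used later in this post):

// Java-style lookup loses the static type, forcing a cast:
// Button button = (Button) findViewById(R.id.downloadButton);

// With the generated TR mapping, the view comes back correctly typed:
val button = findView(TR.downloadButton)  // inferred as Button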

DownloadFragment houses all of our UI. The first method in the class may seem a bit cryptic if you haven’t worked with Scala:

// Implicit conversion that lets us use anonymous functions as onClickListeners
implicit def onClickListener(f: (View => Unit)): View.OnClickListener = {
  new View.OnClickListener() {
    override def onClick(v: View) {
      f(v)
    }
  }
}

As the comment says, this sets up an implicit conversion allowing us to use anonymous functions directly as click-listener actions on buttons. The fact that it is implicit means the compiler will look for it whenever it needs a View.OnClickListener but is given an anonymous function instead. The parameter to the method is a function from View to Unit (Scala’s equivalent of void, i.e., no return value). The method returns a new instance of View.OnClickListener whose onClick method calls the function it was passed. Jumping further down, this allows us to use simple functions in setOnClickListener calls:

rootView.findView(TR.downloadButton).setOnClickListener((v: View) => {
  downloadButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)
})
rootView.findView(TR.resetButton).setOnClickListener((v: View) => {
  resetButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)
})

We’ll come back to the actions attached to the buttons in a little bit.

Right at initialization DownloadFragment creates a few actors it will need:

val system = getApplication().asInstanceOf[DownloadApplication].actorSystem
val downloadButtonActor = system.actorOf(Props[DownloadButton]()
    .withDispatcher("akka.actor.main-thread"))
val resetButtonActor = system.actorOf(Props[ResetButton]
    .withDispatcher("akka.actor.main-thread"))
val fragmentCompanion = system.actorOf(Props(new DownloadFragmentActor())
    .withDispatcher("akka.actor.main-thread"))
val downloader = system.actorOf(Props(new Downloader()), "downloader")

system.actorOf gives us an actor of the type supplied in the Props configuration. For actors that perform operations on the UI thread, we also call Props.withDispatcher, passing our custom dispatcher that pins the actor to the UI thread. Other actors (e.g. the Downloader) should run on another thread and not block the UI thread.

I also decided to create a Fragment companion actor to handle the events, defined as follows:

class DownloadFragmentActor extends Actor {
  def receive = {
    case event: DownloadProgressEvent => updateProgress(event)
    case DownloadFinishedEvent => {
      // Nothing that we need to do
    }
    case TriggerStartDownloadEvent => {
      downloader ! StartDownload(findView(TR.urlEditText).getText.toString)
    }
    case TriggerPauseDownloadEvent => downloader ! PauseEvent
    case TriggerResumeDownloadEvent => downloader ! ResumeEvent
    case TriggerResetDownloadEvent => {
      updateProgress(DownloadProgressEvent(0, 0))
      downloadButtonActor ! ResetButtonState(findView(TR.downloadButton))
      downloader ! ResetEvent
    }
  }
  
  def updateProgress(progress: DownloadProgressEvent) {
    // View elements are wrapped in an Option, so foreach will
    // apply only to ones that are not None/null.
    progressBar foreach (_.setProgress(progress.getProgress))
    downloadProgressTextView foreach (_.setText(String.format("%s / %s",
      progress.getLoadedBytes, progress.getTotalBytes)))
  }
}

Actors define the receive function, which gets called when a message arrives. In it we match on the message type using Scala’s pattern matching – a switch statement on steroids. Thanks to this ability we can define the messages as case classes or case objects (singleton instances, used where a case class would have no parameters), as sketched below.
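A rough sketch of what such definitions might look like (hypothetical; the repo’s actual declarations differ slightly – its progress event, for instance, exposes bean-style getters):

import android.widget.Button

// Messages that carry data are case classes...
case class ClickEvent(button: Button)
case class StartDownload(url: String)
case class ResetButtonState(button: Button)
case class RestoreButtonState(button: Button)

// ...while parameterless messages are case objects (singletons)
case object PauseEvent
case object ResumeEvent
case object ResetEvent

I won’t go into the details of specific message handling just yet; let’s look through the rest of the application: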

class DownloadButton extends Actor with ActorLogging {
  def receive = download

  val download: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerStartDownloadEvent
      v.setText("Pause")
      context.become(pause)
    }
    case RestoreButtonState(v: Button) => v.setText("Download")
    case ResetButtonState(v: Button) => performReset(v)
  }

  val pause: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerPauseDownloadEvent
      v.setText("Resume")
      context.become(resume)
    }
    case RestoreButtonState(v: Button) => v.setText("Pause")
    case ResetButtonState(v: Button) => performReset(v)
  }

  val resume: Receive = {
    case ClickEvent(v: Button) => {
      sender ! TriggerResumeDownloadEvent
      v.setText("Pause")
      context.become(pause)
    }
    case RestoreButtonState(v: Button) => v.setText("Resume")
    case ResetButtonState(v: Button) => performReset(v)
  }

  def performReset(v: Button) {
    v.setText("Download")
    context.become(download)
  }
}

DownloadButton is an actor companion for the UI button view. It captures the button’s state by using context.become() and handles the restore events (setting the appropriate text upon rotation).

The download logic has been pulled into an actor in Downloader.scala. (Side note: Scala doesn’t have the same restrictions as Java with regard to the placement of classes in files, so multiple classes are frequently found in one file. This sometimes makes them hard to find, so make sure not to abuse this feature.) The Downloader actor has two states – awaiting download, and downloading. When awaiting download, it accepts a start-download message that places it into the downloading state. There, a Future is kicked off to perform the download on a separate thread, while the actor still accepts messages to control the process (pause, resume, reset). As before, the download here represents a process you would like to happen in the background, detached from your application UI, since you never block the UI thread.
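A condensed sketch of that two-state structure (my simplification, not the repo’s actual code; the real actor also tracks progress and the running transfer):

import akka.actor.Actor
import scala.concurrent.Future

class Downloader extends Actor {
  import context.dispatcher  // ExecutionContext that runs the download Future

  def receive = awaitingDownload

  val awaitingDownload: Receive = {
    case StartDownload(url) =>
      // Kick off the actual transfer on a background thread
      Future { /* fetch url, reporting DownloadProgressEvents as bytes arrive */ }
      context.become(downloading)
  }

  val downloading: Receive = {
    case PauseEvent  => // signal the download loop to pause
    case ResumeEvent => // signal it to continue
    case ResetEvent  => context.become(awaitingDownload)
  }
}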

Let’s now trace through a use case and see the message flow in the actor system we’ve created.

When you click on the Download button, a ClickEvent is sent to the DownloadButton actor. Since we’re coming from outside the Akka system, we explicitly provide the DownloadFragmentActor companion as the sender of the message:
downloadButtonActor.tell(ClickEvent(v.asInstanceOf[Button]), fragmentCompanion)

(Side note: the dot in method calls is optional in Scala, and the exclamation point is a synonym for the tell method that takes the sender as an implicit parameter, so from within an actor the above could be written as downloadButtonActor ! ClickEvent(v.asInstanceOf[Button]).)

Next, the event is handled in the download behavior of the DownloadButton. That generates a TriggerStartDownloadEvent, which gets sent to the sender, which we defined to be the DownloadFragmentActor. There, a StartDownload message gets sent to the downloader, with the URL extracted from the EditText view. In the downloader, the download is triggered, and the actor switches into its downloading behavior to properly handle pause/resume/reset events. I invite you to do the same exercise with the other messages to see how they propagate through the hierarchy.

I hope this brief introduction lets you see the advantages of using an actor model for designing your application. To summarize actor system use on Android:

  • Abstractions over state and concurrency handling, pushing the details into the framework and out of your code.
  • Limited support at this time in the Android ecosystem.

This post concludes the series, but the topic is certainly far from complete. Are there any methods you like to use to create Android applications in a reactive fashion? If so, please share!

Resources: