Filed under Big Data.

Apache Spark recently received top-level support on Amazon's Elastic MapReduce (EMR) cloud offering, joining applications such as Hadoop, Hive, Pig, HBase, Presto, and Impala. This is exciting for me, because most of my workloads run on EMR, and using Spark previously required either standing up EC2 clusters manually or using an EMR bootstrap action that was very difficult to configure.

Documentation for running Spark on EMR is rather sparse, limited to the launch blog post and the official support section (which is already outdated and contains errors). This post details my experience attempting to run a batch Scala application with the official Spark support on AMI 3.8.0.

Scala 2.10

Perhaps the biggest frustration, and a complete surprise, is that the installed version of Spark is compiled against Scala 2.10. That means any application you write has to be compiled against Scala 2.10 as well; otherwise you will get runtime errors. This is very counterintuitive, as AMI 3.8.0 ships with Scala 2.11.1.
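
In practice that means pinning the Scala version in your own build. A minimal build.sbt sketch, assuming an sbt project (the patch versions shown are illustrative):

// Compile against Scala 2.10 to match the Spark build installed on EMR
scalaVersion := "2.10.4"

// Spark is provided by the cluster, so it should not be bundled into the application JAR
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"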

Spark 1.3.1

Even though the latest Spark release is 1.4.1, EMR currently ships what appears to be a customized build of 1.3.1. These are the JARs available under the Spark lib directory:

$ ls ~/spark/lib
amazon-kinesis-client-1.1.0.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
original-spark-ganglia-lgpl_2.10-1.3.1.jar
original-spark-streaming-flume_2.10-1.3.1.jar
original-spark-streaming-flume-sink_2.10-1.3.1.jar
original-spark-streaming-kafka_2.10-1.3.1.jar
original-spark-streaming-kafka-assembly_2.10-1.3.1.jar
original-spark-streaming-kinesis-asl_2.10-1.3.1.jar
original-spark-streaming-mqtt_2.10-1.3.1.jar
original-spark-streaming-twitter_2.10-1.3.1.jar
original-spark-streaming-zeromq_2.10-1.3.1.jar
spark-1.3.1-yarn-shuffle.jar
spark-assembly-1.3.1-hadoop2.4.0.jar
spark-examples-1.3.1-hadoop2.4.0.jar
spark-ganglia-lgpl_2.10-1.3.1.jar
spark-streaming-flume_2.10-1.3.1.jar
spark-streaming-flume-sink_2.10-1.3.1.jar
spark-streaming-kafka_2.10-1.3.1.jar
spark-streaming-kafka-assembly_2.10-1.3.1.jar
spark-streaming-kinesis-asl_2.10-1.3.1.jar
spark-streaming-mqtt_2.10-1.3.1.jar
spark-streaming-twitter_2.10-1.3.1.jar
spark-streaming-zeromq_2.10-1.3.1.jar

Copy JAR to master

When submitting a Hadoop job, it's common to give the S3 path to the JAR file, and EMR will fetch it and distribute it across the cluster. This doesn't work for Spark on EMR. The JAR you want to run needs to be present on the master node before the step starts. This is a limitation of the current spark-submit script, which EMR uses to submit the job to the YARN cluster. Getting the JAR there can be done either with a custom bootstrap action (preferred if you're going to do custom setup anyway) or with hdfs dfs -get as a first step.
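
A minimal sketch of such a bootstrap action (bucket, key, and file names are placeholders):

#!/bin/bash
# Bootstrap.sh - copy the application JAR from S3 onto the node's local filesystem
# so spark-submit can find it when the step runs. Bootstrap actions run on every
# node, which is harmless here; only the copy on the master is actually used.
aws s3 cp s3://bucket/path/to/remote.jar /home/hadoop/local.jar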

Executors & Memory

Submitting a Hadoop job on EMR usually utilizes the entire cluster. This makes sense, since different jobs tend to launch their own clusters and shut down when they're done, or run sequentially. The configuration for running a Spark job, however, is left at the spark-submit defaults – two executors with 1 GB of heap memory each, and 512 MB for the driver. If you launch a cluster with more than a few nodes, the majority of them will sit idle. When submitting a step, don't forget to set --num-executors to the total number of cores in your cluster (or more if you're IO bound and can handle the context switching), and subdivide the RAM on each node between the executors (leaving room for YARN overhead).
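
As a rough worked example for the 50-node m2.xlarge cluster used in the command below (2 cores and roughly 13–14 GB of YARN-allocatable memory per node – the exact numbers depend on the instance type and YARN configuration), the step arguments work out to approximately:

# 50 core nodes x 2 cores each = 100 executors
--num-executors 100
# ~13 GB of YARN memory per node, divided between 2 executors per node,
# leaves roughly 6.5 GB per executor
--executor-memory 6595M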

CLI command

With all the above points in mind, here is the resulting CLI command that will start a Spark cluster, run a job, and shut down. I've added comments above the command describing the relevant pieces.

# Bootstrap.sh downloads the JAR to the master's local filesystem,
# e.g. aws s3 cp s3://bucket/path/to/remote.jar /home/hadoop/local.jar
# commaSeparatedArgumentList is passed to MainClass.main as Array[String];
# make sure there are no spaces in this list.
aws emr create-cluster --name "My Spark Cluster" \
--ami-version 3.8 \
--applications Name=Spark \
--ec2-attributes KeyName=mykey \
--region us-east-1 \
--enable-debugging \
--instance-groups InstanceCount=1,Name=Master,InstanceGroupType=MASTER,InstanceType=m2.xlarge InstanceCount=50,BidPrice=0.30,Name=Core,InstanceGroupType=CORE,InstanceType=m2.xlarge \
--log-uri s3://bucket/path/to/logs/ \
--auto-terminate \
--bootstrap-actions Path=s3://bucket/path/to/Bootstrap.sh,Name=Download \
--steps Type=Spark,Name=Program,ActionOnFailure=CONTINUE,Args=[--num-executors,100,--executor-memory,6595M,--driver-memory,6595M,--class,com.danosipov.MainClass,/home/hadoop/local.jar,commaSeparatedArgumentList]

Filed under Big Data, Programming.

In this post I'll try to cover how to write and read Avro records in Scalding pipelines. To begin, a reminder that Avro is a serialization format, and Scalding is a Scala API on top of Hadoop. If you're not using Scalding, this post is probably not very interesting for you.

Let's begin by defining a use case. We have a stream of user interactions written to HDFS, and we want to analyze it in a periodic pipeline, computing, for each user, the number of unique other users they interact with. Our log files are simple text, as is our output. Where Avro comes in is in helping us avoid recomputing the entire dataset on every pipeline run: we write a checkpoint representation of the data set that is considerably smaller than the full set, reducing our pipeline execution time and cost. This is a hypothetical example that helps me demonstrate some of the interesting features.

Let’s begin by creating a Scalding job. I’m going to use a Scalding SBT Plugin to do the majority of the work of bringing in dependencies. The details are outlined in the plugin README: https://github.com/danosipov/sbt-scalding-plugin

We need to bring in the Avro dependency to allow us to create and use Avro records. There are a number of SBT plugins for this, and I'll use one of them (https://github.com/cavorite/sbt-avro):

To plugins.sbt, add:

resolvers += "sbt-plugin-releases" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases"

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

And in build.sbt:

Seq(sbtavro.SbtAvro.avroSettings: _*)

version in avroConfig := "1.7.7"

Now on to defining our Avro schema. In src/main/avro (a new directory), create a schema definition file, interactions.avsc, with the following content:

{
    "namespace": "interaction.avro",
    "type": "record",
    "name": "Interaction",
    "fields": [
        {"name": "userid", "type": "string"},
        {"name": "interactions", "type": "bytes"}
    ]
}

For more information on creating the Avro schema, look at the official documentation. I’ll explain the reason for the types chosen further below.

We will now write our job. The first iteration, without the Avro checkpoint, is below, with inline comments describing what is happening. It simply counts the number of interactions by grouping them under a user, converting all the IDs the user interacted with to a Set, and getting the set size.
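
A minimal sketch of that first iteration, assuming tab-separated lines of (userid, otherUserId) and hypothetical input/output arguments, might look roughly like this:

import com.twitter.scalding._

class InteractionCountJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    // Each log line holds a user and the other user they interacted with
    .map { line =>
      val fields = line.split("\t")
      (fields(0), fields(1))
    }
    .group             // group interaction partners by user ID
    .toSet             // keep only the distinct partners
    .mapValues(_.size) // number of unique interactions per user
    .toTypedPipe
    .write(TypedTsv[(String, Int)](args("output")))
}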

The second iteration is more involved, as it reads in the checkpointed results, presumably accumulated over previous runs, and joins in the new results. It outputs the same counts as the previous job, as well as a checkpoint for future runs.

While most of the steps are straightforward to anyone who's done some Scalding development, I want to bring your attention to line 43. On this line we create the set of user IDs that have interacted with the current user. A simple count will not work, as we want unique interactions. This works fine for most users, unless they have interacted with millions of other users. But if we want to serialize this set, we'll be using a lot of space – each ID occupies 16 bytes if a UUID is used.

Algebird provides an implementation of an approximate set called HyperLogLog. With a small, fixed-size sketch (rather than 16 bytes per ID) we can capture the entire set of IDs, if we accept a small accuracy error (around 1% in this case). HyperLogLog instances can be combined just like Sets, and can be serialized to disk.
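
As a rough sketch of the Algebird API (the bits parameter is illustrative – the standard error is about 1.04 / sqrt(2^bits)):

import com.twitter.algebird._

// 2^14 registers gives a standard error of roughly 0.8%
val hll = new HyperLogLogMonoid(bits = 14)

val a = hll.create("user-123".getBytes("UTF-8"))
val b = hll.create("user-456".getBytes("UTF-8"))

// HLL instances combine like sets
val combined = hll.plus(a, b)

// Approximate number of distinct IDs seen
val uniques: Long = combined.approximateSize.estimate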

Let's now bring Avro serialization into the picture. We want to load up the previous computation, add in the new results, and serialize it back out. The reason why the interactions field is of type "bytes" in the Avro schema should now be clear – we write the HLL representation of the unique set as bytes, and later read it back in, deserializing it to an object. This is done using a Bijection.
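
An equivalent round-trip, sketched here with Algebird's own byte helpers and a ByteBuffer for the Avro bytes field (the helper names are hypothetical), looks roughly like this:

import java.nio.ByteBuffer
import com.twitter.algebird.{HLL, HyperLogLog}

// HLL -> bytes, ready for the Avro "interactions" field
def toAvroBytes(sketch: HLL): ByteBuffer =
  ByteBuffer.wrap(HyperLogLog.toBytes(sketch))

// bytes read back from the checkpoint -> HLL, ready to be merged with new data
def fromAvroBytes(buffer: ByteBuffer): HLL = {
  val bytes = new Array[Byte](buffer.remaining())
  buffer.duplicate().get(bytes)
  HyperLogLog.fromBytes(bytes)
}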

The full project, with unit tests for serialization/deserialization, is available on GitHub.

Filed under Programming.

I was very fortunate to attend Scala Days 2015 in San Francisco this week. It was incredible to talk to the people behind the language, and to get a glimpse of a wide ecosystem. I wanted to post a short recap of some of my thoughts and feelings after the conference.

Martin Odersky delivered the first keynote on Monday, and described how Scala is evolving – specific changes to the language, a solution to the binary compatibility problem (the intermediate TASTY format), and more. What he clearly conveyed, both through jokes and detailed plans, is that Scala is a unique language, with its own rules and limitations. It is not, and will not become, "Hascalator", despite many in the community demonstrating the benefits of a stricter functional language. At the same time, the language keeps evolving, while becoming more stable for dependable use. This process was further detailed by Grzegorz Kossakowski, who explained how the Scala build process works, and how features get implemented in the language.

Bill Venners gave a very interesting history lesson on some failed projects, and showed how he decided against including a feature in ScalaTest because of the API complexity it would create, as well as compile-speed issues. Reframing the problem differently allowed for the creation of the SuperSafe compiler plugin, enabling the feature without the performance or complexity cost.

Josh Suereth and Eugene Yokota covered the history of build tools leading up to SBT 1.0. It was great to hear about the influences on SBT, starting from Make and ending with Google's Blaze. The SBT server changes the build process in significant ways, and it will be interesting to see what functionality it enables.

A theme I found extremely interesting was the use of sun.misc.Unsafe with Scala to manage off-heap memory. Using Unsafe allows you to bypass JVM garbage collection and manage a chunk of memory directly, accepting the challenges and dangers that brings. Denys Shabalin covered a library that wraps Unsafe and gives a Scala-specific API for memory access, which he open sourced during his presentation. Meanwhile, Ryan LeCompte covered a specific use case that benefited from off-heap storage for fast analytical query processing using Akka Cluster. Similar ideas were echoed by Lee Mighdoll while covering Sparkle, and how it keeps asynchronous arrays of record batches flowing to keep the visualizer responsive.

All sessions will be posted on parleys.com soon, but I hope my short post has captured some of the atmosphere of the conference that can’t be conveyed in sessions alone. It will be interesting to watch the ideas presented evolve.

Filed under Big Data.

Would you like to step through your Spark job in a debugger? These steps show you how to configure IntelliJ IDEA to allow just that.

Unlike a traditional Java or Scala application, a Spark job expects to be run within a larger Spark application that gives it access to a SparkContext. Your application interacts with the environment through the SparkContext object. Because of this constraint, you can't just launch your Spark job from the IDE and expect it to run correctly.

The steps I outline describe the process for launching a debugging session for a Scala application, but very similar steps can be applied to Java applications. PySpark applications are more involved in their interaction with the Spark cluster, so I doubt this procedure applies to Python.

First you want to have an application class that looks something like this:

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Run against a local master
    val spark = new SparkContext(new SparkConf()
      .setMaster("local").setAppName("Test")
    )
    spark.setCheckpointDir("/tmp/spark/")

    println("-------------Attach debugger now!--------------")
    Thread.sleep(8000)

    // Your job code here, with breakpoints set on the lines you want to pause
  }
}

Now you want to get this job to the local cluster. First, package the job and all its dependencies into a fat JAR:

$ sbt assembly
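
If your project doesn't already have it, the fat JAR is typically produced with the sbt-assembly plugin; a minimal project/plugins.sbt sketch (the version shown is illustrative):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")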

Next, submit it to a local cluster. You need to have the spark-submit script somewhere on your system:

$ export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
$ /path/to/spark/bin/spark-submit --class Test --master local[4] --driver-memory 4G --executor-memory 4G /path/to/target/scala-2.10/project-assembly-0.0.1.jar

The first line exports a JVM option that starts Spark with the debugger agent listening on port 5005. --class needs to point to the fully qualified class name of your job. Finally, give it the path to the fat JAR assembled in the previous step.

If you run that command now, it will execute your job without breaking at the breakpoints. We need to configure IntelliJ to connect to the running job. This process is detailed in the official IDEA documentation. If you just create a default "Remote" Run/Debug configuration and leave the default port of 5005, it should work fine. Now when you submit the job again and see the message to attach the debugger, you have 8 seconds to switch to IntelliJ IDEA and trigger this run configuration. The program will then continue to execute and pause at any breakpoint you defined. You can then step through it like any normal Scala/Java program. You can even step into Spark functions to see what it's doing under the hood.

Hopefully this helps, as I found it very useful in debugging serialization errors and other difficult-to-trace issues. A similar process could potentially be applied to debugging a job on a real cluster, although you would only be able to debug the code that runs in the driver, not the executors.

Filed under Android.

I recently upgraded my Android toolchain to the official Android Studio 1.0. I set up a new project and, as with any Android app, included Robolectric to allow for easy unit testing. I was able to configure the Gradle build to execute the tests fairly easily using the existing documentation on the Robolectric project. To my surprise, however, I wasn't able to launch unit tests using the standard IntelliJ JUnit run configuration. This post details my workaround, hopefully temporary until Google fixes the issue in Android Studio.

To start, create a JUnit run configuration, as detailed on the JetBrains site. If you run it now, you'll face the "!!! JUnit version 3.8 or later expected" error. This occurs no matter which JUnit dependency you define in your Gradle file, as Android bundles an earlier version that takes precedence on the classpath. The solution in the past was to move the JUnit dependency in the project configuration or *.iml file up above the Android SDK. This is no longer an option in Android Studio 1.0, so the remaining solution is to modify the classpath used by the run configuration.

In order to do that, copy the command that was used to launch the JUnit tests. It is the first line of output when you run the JUnit run configuration. Get rid of everything except the -classpath "..." argument. The argument value is a colon-separated (:) list of files and directories where the JVM looks for classes. In this value, locate the JUnit dependency (either junit or junit-dep) and make it the first path in the list. Now paste this new argument into the "VM Options" field of the JUnit run configuration.

We're still not done, as running now will result in Java not finding the class files for your tests. To resolve this, create a new Gradle run configuration. Point it at your app and enter testDebugClasses as the task. Call it something memorable (I named it "Prepare Robolectric") and go back to the JUnit run configuration. In the "Before Launch" list, add "Another Action" and choose the Gradle task.

The Gradle task will compile the test sources required for JUnit and place them in app/build/test-classes/. The last step is to add the absolute path to this directory to the classpath in the JUnit configuration (at the end of the list). You should now be able to run unit tests right from Android Studio.
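
Putting both classpath tweaks together, the VM Options value ends up looking roughly like this (the paths are illustrative placeholders):

-classpath "/path/to/junit-4.12.jar:<rest of the original classpath>:/absolute/path/to/app/build/test-classes"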

This presentation was very helpful in configuring Android Studio to run tests; refer to it for more information and screenshots. If you have a better way of configuring unit tests, I'd love to hear it!