Filed under Big Data, Programming.

Scala is statically typed, yet we rarely use this to our advantage. For example, let’s say you have a function with the signature:

 def convert(a: Double, b: Double): String

Of course, the author hasn’t written a comment, and clearly didn’t label the variables expressively. So let’s fix it:

 def reverseGeocode(latitude: Double, longitude: Double): String

The types are the same, but the variable names now convey exactly what the function does. Most people stop there.
But it’s so easy to flip the order of the arguments, especially if they’re passed around in a long chain. What if we explicitly label the type using a type alias?

 type Latitude = Double
 type Longitude = Double
 def reverseGeocode(point: (Latitude, Longitude)): String
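Note that the aliases are purely cosmetic – any pair of doubles still type-checks. A minimal sketch (the function body is a placeholder, not from the post):

```scala
type Latitude = Double
type Longitude = Double

def reverseGeocode(point: (Latitude, Longitude)): String =
  s"${point._1},${point._2}" // placeholder body

val a = reverseGeocode((37.77, -122.42)) // compiles
val b = reverseGeocode((-122.42, 37.77)) // swapped order also compiles
```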

This helps with documentation, but doesn’t let the compiler validate that we’re passing correct values – any tuple of doubles will be accepted as valid input. We can, of course, create a case class:

 case class GeoPoint(latitude: Double, longitude: Double)
 def reverseGeocode(point: GeoPoint): String

This works here, but not always: the arguments may not fit neatly into a struct that can be given a meaningful name, and may need to be kept separate. We can instead wrap each value in its own case class:

 case class Latitude(value: Double)
 case class Longitude(value: Double)
 def reverseGeocode(lat: Latitude, long: Longitude): String

But now we’ve boxed the Double, which carries a performance cost if used in a hot loop. Luckily, Scala gives us a solution – value classes:

 case class Latitude(value: Double) extends AnyVal
 case class Longitude(value: Double) extends AnyVal
 def reverseGeocode(lat: Latitude, long: Longitude): String
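With the value classes in place, swapping the arguments turns into a compile-time error instead of a silent bug. A minimal sketch (the function body is a placeholder):

```scala
case class Latitude(value: Double) extends AnyVal
case class Longitude(value: Double) extends AnyVal

def reverseGeocode(lat: Latitude, long: Longitude): String =
  s"${lat.value},${long.value}" // placeholder body

val ok = reverseGeocode(Latitude(37.77), Longitude(-122.42)) // compiles
// reverseGeocode(Longitude(-122.42), Latitude(37.77))       // type mismatch: does not compile
```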

This is good, but it does require another hop to get at the value, which can get verbose if used widely. Scalaz also offers Tagged Types, which make the type explicit and compile-checked, so they are a good alternative if you already have a Scalaz dependency in your project.
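The tagged-type approach can be sketched roughly like this, based on Scalaz 7’s `Tag` and `@@` (the exact API varies between Scalaz versions, so treat this as an assumption):

```scala
import scalaz.{@@, Tag}

sealed trait LatitudeTag
sealed trait LongitudeTag
type Latitude = Double @@ LatitudeTag
type Longitude = Double @@ LongitudeTag

def reverseGeocode(lat: Latitude, long: Longitude): String = ???

// Tags add type safety at compile time without boxing at runtime.
val lat: Latitude = Tag[Double, LatitudeTag](37.77)
```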

Too often in data science and analysis code I see bare primitive types. I don’t know if they’re left that way for performance or out of laziness (most likely the latter), but assigning an explicit type helps catch errors and provides documentation that expressive naming alone cannot.

Filed under Big Data.

Apache Spark recently received top-level support on Amazon’s Elastic MapReduce (EMR) cloud offering, joining applications such as Hadoop, Hive, Pig, HBase, Presto, and Impala. This is exciting for me, because most of my workloads run on EMR, and using Spark previously required either standing up EC2 clusters manually or using an EMR bootstrap action, which was very difficult to configure.

Documentation for running Spark on EMR is rather sparse, existing only in the form of the launch blog post and the official support section (which is already outdated and contains errors). This post details my experience attempting to run a batch Scala application with official Spark support on AMI 3.8.0.

Scala 2.10

Perhaps the biggest frustration, and a complete surprise, is that the installed version of Spark is compiled with Scala 2.10. That means whatever application you write has to be compiled against Scala 2.10 as well, otherwise you will get runtime errors. This is very counterintuitive, as AMI 3.8.0 ships with Scala 2.11.1.
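In practice this means pinning the Scala version in build.sbt; a minimal fragment (the exact 2.10 patch release shown is an assumption):

```scala
// build.sbt
scalaVersion := "2.10.4" // must be a 2.10.x release to match EMR's Spark build
```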

Spark 1.3.1

Even though the latest version of Spark is 1.4.1, EMR currently ships what appears to be a customized build of 1.3.1. These are the JARs available under the Spark lib directory:


$ ls ~/spark/lib


Copy JAR to master

When submitting a Hadoop job, it’s common to give the path to an S3 location of the JAR file, and EMR will load it and distribute it across the cluster. This doesn’t work with Spark on EMR: the JAR you run needs to be present on the master node when the step starts. This is a limitation of the current spark-submit script, which EMR uses to submit the job to the YARN cluster. The copy can be done either in a custom bootstrap action (preferred if you’re going to do custom setup anyway), or with hdfs dfs -get as a first step.
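A sketch of the copy step (the bucket and paths here are placeholders, not from the post):

```shell
# Option 1: inside a custom bootstrap action script, copy the JAR
# from S3 to the master's local filesystem with the AWS CLI.
aws s3 cp s3://my-bucket/jars/app.jar /home/hadoop/local.jar

# Option 2: run as the first step on the cluster, using the Hadoop
# filesystem tools, which can read s3:// paths on EMR.
hdfs dfs -get s3://my-bucket/jars/app.jar /home/hadoop/local.jar
```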

Executors & Memory

Submitting a Hadoop job on EMR usually utilizes the entire cluster. This makes sense, since different jobs tend to launch their own clusters and shut down when they’re done, or operate sequentially. The configuration for running a Spark job, however, is left at spark-submit defaults – two executors with 1 GB of heap memory each, and 512 MB for the driver. If you launch a cluster with more than a few nodes, the majority of them will sit idle. When submitting a step, don’t forget to adjust num-executors to the total number of cores in your cluster (or more, if you’re IO-bound and can handle the context switching), and subdivide the RAM on each node between its executors (don’t forget about YARN overhead).
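As a sketch of the arithmetic (the node count and memory figures below are illustrative assumptions, not EMR defaults):

```scala
// Hypothetical cluster: 50 core nodes, 4 cores and 16 GB usable per node.
val nodes = 50
val coresPerNode = 4
val usableMemPerNodeMb = 16 * 1024

// One executor per core; IO-bound jobs may oversubscribe further.
val numExecutors = nodes * coresPerNode

// Reserve some headroom per node for YARN overhead, then split the rest.
val yarnOverheadMb = (usableMemPerNodeMb * 0.07).toInt
val executorMemMb = (usableMemPerNodeMb - yarnOverheadMb) / coresPerNode

println(s"--num-executors $numExecutors --executor-memory ${executorMemMb}M")
```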

CLI command

With all the above points in mind, the following CLI command can be issued to start up a Spark cluster, run a job, and shut down. I’ve annotated the trickier options.


aws emr create-cluster --name "My Spark Cluster" \
--ami-version 3.8 \
--applications Name=Spark \
--ec2-attributes KeyName=mykey \
--region us-east-1  \
--enable-debugging \
--instance-groups InstanceCount=1,Name=Master,InstanceGroupType=MASTER,InstanceType=m2.xlarge InstanceCount=50,BidPrice=0.30,Name=Core,InstanceGroupType=CORE,InstanceType=m2.xlarge \
--log-uri s3://bucket/path/to/logs/  \
--auto-terminate \
--bootstrap-actions Path=s3://bucket/path/to/,Name=Download \
--steps Type=Spark,Name=Program,ActionOnFailure=CONTINUE,Args=[--num-executors,100,--executor-memory,6595M,--driver-memory,6595M,--class,com.danosipov.MainClass,/home/hadoop/local.jar,commaSeparatedArgumentList]

The Download bootstrap action copies the JAR to a local path on the master, e.g. aws cp s3://remote.jar /home/hadoop/local.jar. The commaSeparatedArgumentList is passed to MainClass.main as Array[String] – make sure there are no spaces in the list.

Filed under Big Data, Programming.

In this post I’ll cover how to write and read Avro records in Scalding pipelines. To begin, a reminder: Avro is a serialization format, and Scalding is a Scala API on top of Hadoop. If you’re not using Scalding, this post is probably not very interesting for you.

Let’s begin by defining a use case. We have a stream of user interactions written to HDFS, and we want to analyze it in a periodic pipeline, computing, for every user, the number of unique other users they interact with. Our log files are simple text, as is our output. Avro comes in because it lets us avoid recomputing the entire dataset on every pipeline run: we write a checkpoint representation of the data set that is considerably smaller than the full set, reducing our pipeline’s execution time and cost. This is a hypothetical example that helps me demonstrate some of the interesting features.

Let’s begin by creating a Scalding job. I’m going to use the Scalding SBT plugin to do the majority of the work of bringing in dependencies; the details are outlined in the plugin README.

We need to bring in an Avro dependency to allow us to create and use Avro records. There are a number of SBT plugins for this; I’ll use sbt-avro.

To plugins.sbt, add:

resolvers += "sbt-plugin-releases" at ""

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

And in build.sbt:

Seq(sbtavro.SbtAvro.avroSettings: _*)

version in avroConfig := "1.7.7"

Now on to defining our Avro schema. In src/main/avro (new directory), create a schema definition file, interactions.avsc with the following content:

{
    "namespace": "interaction.avro",
    "type": "record",
    "name": "Interaction",
    "fields": [
        {"name": "userid", "type": "string"},
        {"name": "interactions", "type": "bytes"}
    ]
}
For more information on creating the Avro schema, look at the official documentation. I’ll explain the reason for the types chosen further below.

We will now write our job. The first iteration, without the Avro checkpoint, simply counts the number of unique interactions: grouping the records by user, converting all the IDs the user interacted with to a Set, and taking the set size.
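The embedded code isn’t reproduced here, but the first iteration might look roughly like the following sketch (the input format, argument names, and job name are assumptions):

```scala
import com.twitter.scalding._

// Hypothetical input: each line is "userId<TAB>otherUserId".
class UniqueInteractionsJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    .map { line =>
      val fields = line.split("\t")
      (fields(0), fields(1)) // (user, user they interacted with)
    }
    .group                  // group interactions by user
    .toSet                  // unique set of users they interacted with
    .mapValues(_.size)      // number of unique interactions
    .write(TypedTsv[(String, Int)](args("output")))
}
```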

The second iteration is more involved, as it reads in the checkpointed results, presumably accumulated over previous runs, and joins in the new results. It outputs the same counts as the previous job, as well as a checkpoint for future runs.

While most of the steps are straightforward to anyone who’s done some Scalding development, I want to draw your attention to the step where we create the set of user IDs that have interacted with the current user. A simple count will not work, as we want unique interactions. This works fine for most users, unless they interacted with millions of other users. But if we want to serialize this set, we’ll be using a lot of space – each ID occupies 16 bytes if a UUID is used.

Algebird provides an implementation of an approximate set, called HyperLogLog. With just 16 bytes we can capture the entire set of IDs, if we accept a small accuracy error (around 1% in this case). HyperLogLog instances can be combined just like Sets, and can be serialized to disk.
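Creating and merging HLLs with Algebird might look roughly like this (the 12-bit precision is an illustrative choice, not the post’s exact setting):

```scala
import com.twitter.algebird.HyperLogLogMonoid

val monoid = new HyperLogLogMonoid(bits = 12)

val a = monoid.create("user-1".getBytes("UTF-8"))
val b = monoid.create("user-2".getBytes("UTF-8"))

// HLLs combine like set union, and the estimate survives the merge.
val combined = monoid.plus(a, b)
println(combined.estimatedSize) // close to 2.0, within the error bound
```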

Let’s now bring Avro serialization into the picture. We want to load up the previous computation, add in the new results, and serialize it back out. The reason the interactions field is of type “bytes” in the Avro schema should now be clear – we write the HLL representation of the unique set as bytes, and then read it back in, deserializing it to an object. This is done using a Bijection.
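The post wires this up with a Bijection; Algebird also ships direct helpers that sketch the same round trip (treat the exact API as an assumption for your Algebird version):

```scala
import com.twitter.algebird.{HLL, HyperLogLog, HyperLogLogMonoid}

val monoid = new HyperLogLogMonoid(bits = 12)
val set: HLL = monoid.create("user-1".getBytes("UTF-8"))

// Serialize the HLL into the Avro record's "bytes" field...
val bytes: Array[Byte] = HyperLogLog.toBytes(set)

// ...and deserialize it back into an object on the next run.
val restored: HLL = HyperLogLog.fromBytes(bytes)
```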

The full project, with unit tests for serialization/deserialization, is available on Github.

Filed under Programming.

I was very fortunate to attend Scala Days 2015 in San Francisco this week. It was incredible to talk to the people behind the language, and to get a glimpse of the wide ecosystem around it. I wanted to post a short recap of some of my thoughts and feelings after the conference.


Martin Odersky delivered the first keynote on Monday, describing how Scala is evolving – specific changes to the language, a solution to the binary compatibility problem (the intermediate TASTY format), and more. What he clearly conveyed, both through jokes and detailed plans, is that Scala is a unique language with its own rules and limitations. It is not, and will not become, “Hascalator”, despite many in the community demonstrating the benefits of a stricter functional language. At the same time, the language keeps evolving while becoming more stable for dependable use. This process was further detailed by Grzegorz Kossakowski, who explained how the Scala build process works and how features get implemented in the language.

Bill Venners gave a very interesting history lesson on some failed projects, and showed how he decided against including a feature in Scalatest because of the API complexity it would create, as well as compile-speed issues. Reframing the problem allowed for the creation of the SuperSafe compiler plugin, enabling the feature without the performance or complexity cost.


Josh Suereth and Eugene Yokota covered the history of build tools leading up to SBT 1.0. It was great to hear of the influences on SBT, starting from Make and ending with Google’s Blaze. It’s very interesting how the SBT server changes the build process, and it will be interesting to see what functionality it enables.

A theme I found extremely interesting was the use of sun.misc.Unsafe from Scala to manage off-heap memory. Using Unsafe lets you bypass JVM garbage collection and manage a chunk of memory directly, accepting the challenge and danger that brings. Denys Shabalin covered a library that wraps Unsafe and gives a Scala-specific API for memory access, which he open-sourced during his presentation. Meanwhile, Ryan LeCompte covered a specific use case that benefited from off-heap storage: fast analytical query processing using Akka Cluster. Similar ideas were echoed by Lee Mighdoll while covering Sparkle, and how it maintains asynchronous arrays of batches of records to keep the visualizer responsive.

All sessions will be posted online soon, but I hope this short post has captured some of the atmosphere of the conference that can’t be conveyed in the sessions alone. It will be interesting to watch the ideas presented evolve.


Filed under Big Data.

Would you like to step through your Spark job in a debugger? These steps show you how to configure IntelliJ IDEA to allow just that.

Unlike a traditional Java or Scala application, a Spark job expects to be run within a larger Spark application that gives it access to a SparkContext. Your application interacts with the environment through the SparkContext object. Because of this constraint, you can’t just launch your Spark job from the IDE and expect it to run correctly.

The steps I outline describe the process of launching a debugging session for a Scala application, but very similar steps apply to Java applications. PySpark applications interact with the Spark cluster in a more involved way, so I doubt this procedure applies to Python.

First you want to have an application class that looks something like this:

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Local SparkContext; master and memory are passed in by spark-submit
    val spark = new SparkContext(new SparkConf())

    println("-------------Attach debugger now!--------------")
    Thread.sleep(8000) // gives you a window to attach the debugger

    // Your job code here, with breakpoints set on the lines you want to pause on
  }
}

Now you want to get this job to the local cluster. First, package the job and all its dependencies into a fat JAR:

$ sbt assembly

Next, submit it to a local cluster. You need to have the spark-submit script somewhere on your system:

$ export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
$ /path/to/spark/bin/spark-submit --class Test --master local[4] --driver-memory 4G --executor-memory 4G /path/to/target/scala-2.10/project-assembly-0.0.1.jar

The first line exports a Java option that will be used to start Spark with the debugger enabled. --class needs to be the fully qualified name of your job’s class. Finally, give it the path to the fat JAR assembled in the previous command.

If you run that command now, it will execute your job without pausing at the breakpoints. Next we need to configure IntelliJ to connect to the cluster. This process is detailed in the official IDEA documentation. If you just create a default “Remote” Run/Debug configuration and leave the default port of 5005, it should work fine. Now when you submit the job again and see the message to attach the debugger, you have 8 seconds to switch to IntelliJ IDEA and trigger this run configuration. The program will then continue executing and pause at any breakpoint you defined. You can then step through it like any normal Scala/Java program – you can even step into Spark functions to see what it’s doing under the hood.

Hopefully this helps – I found it very useful in debugging serialization errors and other difficult-to-trace issues. A similar process could potentially be applied to debugging a job on a real cluster, although you would only be able to debug the code that runs in the driver, not the executors.