Posted by & filed under Big Data.

Schema migrations in the relational world are now common practice. The best practices for evolving a database schema are well known: a migration is applied before the code that needs it is rolled out, and if there are any problems, the migration can be rolled back. The same practices are not as well established in the Big Data world. Part of the reason may be the diversity of tools and use cases; another may be scale – a migration at petabyte scale is not something that can be approached casually.

One problem I see a lot is the desire to keep the database schema in version control. There are established best practices around version control tools, such as code review and continuous integration, and it would be nice for the data warehouse schema to conform to those practices as well. Inevitably, however, the schema gets out of sync with the production version – someone creates a view and doesn’t check it in, someone else grants extra permissions to a user group, and so on. So the goal is to ensure the version-controlled schema always matches the current live schema.

Most data warehouses provide a way to view the schema. Here is how to do it in Hive:
SHOW CREATE TABLE tablename
Doing the same in Amazon Redshift is a little more complicated, but can be accomplished using this helpful SQL view. These commands operate on a single table, but you could set up a simple script that iterates over all tables and views to generate similar output, along with the necessary GRANT statements. The goal is to have a script that could be run to regenerate the data catalog if that ever becomes necessary.
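
To make this concrete, here is a rough sketch of such a dump script for Hive, assuming HiveServer2 is reachable at a hypothetical host and the Hive JDBC driver is on the classpath; the connection details and output handling are placeholders, not part of any particular setup.

 import java.sql.DriverManager
 import scala.collection.mutable.ListBuffer

 object DumpSchema extends App {
   Class.forName("org.apache.hive.jdbc.HiveDriver")
   val conn = DriverManager.getConnection("jdbc:hive2://warehouse-host:10000/default", "etl_user", "")
   val stmt = conn.createStatement()

   // Collect all table and view names in the database
   val rs = stmt.executeQuery("SHOW TABLES")
   val tables = ListBuffer[String]()
   while (rs.next()) tables += rs.getString(1)

   // Emit the DDL for each one; SHOW CREATE TABLE returns the statement one line per row
   tables.foreach { table =>
     val ddl = conn.createStatement().executeQuery(s"SHOW CREATE TABLE $table")
     while (ddl.next()) println(ddl.getString(1))
     println(";")
     // GRANT statements could be emitted similarly, e.g. from SHOW GRANT ON TABLE output
   }
   conn.close()
 }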

At this point the advantage of having the script may not be clear – after all, the database is the source of truth and can be restored from a backup in an emergency. True, but keeping the scripts in source control means they can now go through your standard change process (pull requests, code review, linking the change to a ticket, etc.). In addition, schema changes can be carried along with related code changes – for example, to the ETL pipelines feeding these tables – giving better visibility.

The last step is to set up verification that the source repository is in sync with the production catalog. This can be done with a Continuous Integration (CI) server of your choice: a simple Jenkins job that runs a diff between the script output and the version-controlled file every night (or every hour) and fails the build if the files don’t match. The trickiest part here is setting up credentials in a secure manner, since CI systems are typically isolated from data catalogs.
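
The check itself can be tiny. As a sketch (the file paths here are hypothetical), the job regenerates the schema with the dump script and then runs something like the following, letting a non-zero exit code fail the build:

 import scala.io.Source

 object SchemaDriftCheck extends App {
   // Compare the freshly generated dump with the copy in version control
   val live = Source.fromFile("schema_live.sql").mkString
   val versioned = Source.fromFile("schema/warehouse.sql").mkString

   if (live.trim != versioned.trim) {
     System.err.println("Schema drift detected: live catalog does not match version control")
     sys.exit(1) // non-zero exit fails the CI build
   }
 }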

There are further steps that can be taken at this point, though this is as far as I’ve seen it taken. Further automation is possible, but it’s typically not worth the effort given the number of tables and administrators involved. One possible improvement is building a staging version of the data warehouse (without all the data) in a CI job, running ALTER commands on it, and verifying that the final state matches the expected one. This would add further safety to the schema evolution process and catch mistakes that slip through code review.

Have you done something similar at your company, or have you found an alternative solution to this problem? I’d love to hear about it!

Posted by & filed under Big Data, Programming.

Scala is statically typed, yet we rarely use that to our advantage. For example, let’s say you have a function with the signature:

 def convert(a: Double, b: Double): String

Of course, the author hasn’t written a comment, and clearly didn’t give the variables expressive names. So let’s fix that:

 def reverseGeocode(latitude: Double, longitude: Double): String

The types are the same, but the variable names now convey exactly what the function does. Most people stop there.
But it’s still easy to flip the order of the arguments, especially if they’re passed around in a long chain. What if we explicitly label the types using type aliases?

 type Latitude = Double
 type Longitude = Double
 def reverseGeocode(point: (Latitude, Longitude)): String

This helps with documentation, but doesn’t let the compiler validate that we’re passing the correct values – any tuple of doubles will be accepted as valid input. We can, of course, create a case class:

 case class GeoPoint(latitude: Double, longitude: Double)
 def reverseGeocode(point: GeoPoint): String

That works for this case, but not always – the arguments may not fit neatly into a struct that can be given a meaningful name, and may need to be kept separate. We can instead wrap each value in its own case class:

 case class Latitude(value: Double)
 case class Longitude(value: Double)
 def reverseGeocode(lat: Latitude, long: Longitude): String

But now we’ve boxed the Double, which has a performance impact if we use it in a tight loop. Luckily Scala gives us a solution in value classes:

 case class Latitude(value: Double) extends AnyVal
 case class Longitude(value: Double) extends AnyVal
 def reverseGeocode(lat: Latitude, long: Longitude): String

This is good, but it does require another hop to get at the value, which can get verbose if widely used. Scalaz also offers Tagged Types, which make the type explicit and compiler-checked without the wrapper class, so they are a good alternative if you already have a Scalaz dependency in your project.
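
As a rough sketch of what that looks like with Scalaz 7 (the tag traits are just empty markers we define ourselves):

 import scalaz.{@@, Tag}

 sealed trait LatitudeTag
 sealed trait LongitudeTag
 type Latitude = Double @@ LatitudeTag
 type Longitude = Double @@ LongitudeTag

 def reverseGeocode(latitude: Latitude, longitude: Longitude): String = ???

 val lat = Tag[Double, LatitudeTag](37.7749)
 val lon = Tag[Double, LongitudeTag](-122.4194)
 reverseGeocode(lat, lon)    // compiles
 // reverseGeocode(lon, lat) // does not compile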

Too often in data science and analysis code I see bare primitive types. I don’t know whether they’re left that way for performance or out of laziness (most likely the latter), but assigning an explicit type helps catch errors and documents the code better than expressive naming alone.

Posted by & filed under Big Data.

Apache Spark recently received top-level support on Amazon’s Elastic MapReduce (EMR) cloud offering, joining applications such as Hadoop, Hive, Pig, HBase, Presto, and Impala. This is exciting for me, because most of my workloads run on EMR, and utilizing Spark previously required either standing up EC2 clusters manually or using an EMR bootstrap action that was very difficult to configure.

Documentation for running Spark on EMR is rather sparse, limited to the launch blog post and the official support section (which is already outdated and contains errors). This post details my experience running a batch Scala application on AMI 3.8.0 with the official Spark support.

Scala 2.10

Perhaps the biggest frustration, and a complete surprise, is that the installed version of Spark is compiled against Scala 2.10. That means whatever application you write has to be compiled against Scala 2.10 as well, otherwise you will get runtime errors. This is very counterintuitive, as AMI 3.8.0 ships with Scala 2.11.1.
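
So make sure your build pins the right binary version. For example, a build.sbt along these lines (the exact patch versions here are illustrative) keeps the application binary-compatible with the cluster, with Spark marked as provided since the cluster supplies its own assembly:

 scalaVersion := "2.10.4"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"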

Spark 1.3.1

Even though the latest Spark release is 1.4.1, EMR currently ships what appears to be a customized build of 1.3.1. These are the JARs available under the Spark lib directory:


$ ls ~/spark/lib
amazon-kinesis-client-1.1.0.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
original-spark-ganglia-lgpl_2.10-1.3.1.jar
original-spark-streaming-flume_2.10-1.3.1.jar
original-spark-streaming-flume-sink_2.10-1.3.1.jar
original-spark-streaming-kafka_2.10-1.3.1.jar
original-spark-streaming-kafka-assembly_2.10-1.3.1.jar
original-spark-streaming-kinesis-asl_2.10-1.3.1.jar
original-spark-streaming-mqtt_2.10-1.3.1.jar
original-spark-streaming-twitter_2.10-1.3.1.jar
original-spark-streaming-zeromq_2.10-1.3.1.jar
spark-1.3.1-yarn-shuffle.jar
spark-assembly-1.3.1-hadoop2.4.0.jar
spark-examples-1.3.1-hadoop2.4.0.jar
spark-ganglia-lgpl_2.10-1.3.1.jar
spark-streaming-flume_2.10-1.3.1.jar
spark-streaming-flume-sink_2.10-1.3.1.jar
spark-streaming-kafka_2.10-1.3.1.jar
spark-streaming-kafka-assembly_2.10-1.3.1.jar
spark-streaming-kinesis-asl_2.10-1.3.1.jar
spark-streaming-mqtt_2.10-1.3.1.jar
spark-streaming-twitter_2.10-1.3.1.jar
spark-streaming-zeromq_2.10-1.3.1.jar


Copy JAR to master

When submitting a Hadoop job, it’s common to give the path to an S3 location of the JAR file, and EMR will load it and distribute it across the cluster. This doesn’t work with Spark on EMR: the JAR you want to run needs to be present on the master node before the step starts. This is a limitation of the current spark-submit script, which EMR uses to submit the job to the YARN cluster. Getting the JAR there can be done either with a custom bootstrap action (preferred if you’re going to do custom setup anyway) or by using hdfs dfs -get as a first step.

Executors & Memory

Submitting a Hadoop job on EMR usually utilizes the entire cluster. This makes sense, since different jobs tend to launch their own clusters and shut down when they’re done, or run sequentially. The configuration for running a Spark job, however, is left at the spark-submit defaults – two executors with 1GB of heap memory each, and 512MB for the driver. If you launch a cluster with more than a few nodes, the majority of them will sit idle. When submitting a step, don’t forget to set num-executors to the total number of cores in your cluster (or more if you’re IO-bound and can handle the context switching), and subdivide the RAM on each node between its executors (don’t forget about YARN overhead).

CLI command

With all of the above in mind, the following is the resulting CLI command that can be issued to start up a Spark cluster, run a job, and shut down. I’ve added line comments to describe the features.


aws emr create-cluster --name "My Spark Cluster" \
--ami-version 3.8 \
--applications Name=Spark \
--ec2-attributes KeyName=mykey \
--region us-east-1  \
--enable-debugging \
--instance-groups InstanceCount=1,Name=Master,InstanceGroupType=MASTER,InstanceType=m2.xlarge InstanceCount=50,BidPrice=0.30,Name=Core,InstanceGroupType=CORE,InstanceType=m2.xlarge \
--log-uri s3://bucket/path/to/logs/  \
--auto-terminate \
--bootstrap-actions Path=s3://bucket/path/to/Bootstrap.sh,Name=Download \ # Download JAR to local directory using aws s3 cp s3://remote.jar /home/hadoop/local.jar
--steps Type=Spark,Name=Program,ActionOnFailure=CONTINUE,Args=[--num-executors,100,--executor-memory,6595M,--driver-memory,6595M,--class,com.danosipov.MainClass,/home/hadoop/local.jar,commaSeparatedArgumentList] # commaSeparatedArgumentList will be passed to MainClass.main as Array[String]. Make sure there are no spaces in this list
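
To see how these numbers fit the earlier advice: assuming the m2.xlarge’s two virtual cores and roughly 17GB of memory per node, the 50 core nodes provide 100 cores in total, hence --num-executors 100, and each node’s memory is split between its two executors at 6595MB apiece, leaving a few gigabytes per node for YARN and OS overhead.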

Posted by & filed under Big Data, Programming.

In this post I’ll try to cover how to write and read Avro records in Scalding pipelines. To begin, a reminder: Avro is a serialization format, and Scalding is a Scala API on top of Hadoop. If you’re not using Scalding, this post is probably not very interesting for you.

Let’s begin by defining a use case. We have a stream of user interactions written to HDFS, and we want to analyze it in a periodic pipeline, computing, for each user, the number of unique other users they interact with. Our log files are simple text, and so is our output. Where Avro comes in is that it helps us avoid recomputing the entire dataset on every pipeline run: we write a checkpoint representation of the data set that is considerably smaller than the full set, reducing our pipeline execution time and cost. This is a hypothetical example that lets me demonstrate some of the interesting features.

Next, let’s create a Scalding job. I’m going to use the Scalding SBT plugin to do the majority of the work of bringing in dependencies. The details are outlined in the plugin README: https://github.com/danosipov/sbt-scalding-plugin

We need to bring in the Avro dependency to allow us to create and use Avro records. There are a number of SBT plugins for this, and I’ll use one of them (https://github.com/cavorite/sbt-avro):

To plugins.sbt, add:

resolvers += "sbt-plugin-releases" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases"

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

And in build.sbt:

Seq(sbtavro.SbtAvro.avroSettings: _*)

version in avroConfig := "1.7.7"

Now on to defining our Avro schema. In src/main/avro (a new directory), create a schema definition file, interactions.avsc, with the following content:

{
    "namespace": "interaction.avro",
    "type": "record",
    "name": "Interaction",
    "fields": [
        {"name": "userid", "type": "string"},
        {"name": "interactions", "type": "bytes"}
    ]
}

For more information on creating the Avro schema, look at the official documentation. I’ll explain the reason for the types chosen further below.

We will now write our job. The first iteration, without the Avro checkpoint, is below, with inline comments describing what is happening. It simply counts the number of interactions by grouping them under a user, converting all the IDs the user interacted with into a Set, and taking the set size.
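
A minimal sketch of this first iteration (assuming the log lines are tab-separated userId/otherUserId pairs, and hypothetical --input/--output arguments) could look like this:

 import com.twitter.scalding._

 class InteractionCounts(args: Args) extends Job(args) {
   TypedPipe.from(TextLine(args("input")))
     // each line: "userId<TAB>otherUserId"
     .map { line => val Array(user, other) = line.split("\t"); (user, other) }
     .group                 // group interactions under each user
     .mapValues(Set(_))     // one-element Set per interaction
     .sum                   // Set union per user
     .mapValues(_.size)     // number of unique users interacted with
     .write(TypedTsv[(String, Int)](args("output")))
 }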

The second iteration is more involved, as it reads in the checkpointed results, presumably accumulated over previous runs, and joins in the new results. It outputs the same counts as the previous job, as well as a checkpoint for future runs.

While most of the steps are straightforward to anyone who’s done some Scalding development, I want to draw your attention to the step where we create a set of the user IDs that have interacted with the current user. A simple count will not work, as we want unique interactions. The Set approach works fine for most users, but not for those who interacted with millions of other users – and if we want to serialize this set, it takes up a lot of space, since each ID occupies 16 bytes if UUIDs are used.

Algebird provides an implementation of an approximate set called HyperLogLog. With a compact, fixed-size sketch (instead of 16 bytes per ID) we can capture the entire set of IDs, if we accept a small accuracy error (around 1% in this case). HyperLogLog instances can be combined just like Sets, and can be serialized to disk.
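
Roughly, the Algebird API looks like this (the bit size here is my own illustrative choice, trading memory for accuracy):

 import com.twitter.algebird.{HLL, HyperLogLogMonoid}

 val hllMonoid = new HyperLogLogMonoid(14)                // 2^14 buckets, ~1% standard error
 val a: HLL = hllMonoid.create("user-123".getBytes("UTF-8"))
 val b: HLL = hllMonoid.create("user-456".getBytes("UTF-8"))

 val combined = hllMonoid.plus(a, b)                      // merges like a Set union
 val uniqueUsers = combined.approximateSize.estimate      // approximate distinct count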

Let’s now bring Avro serialization into the picture. We want to load up the previous computation, add in the new results, and serialize it back out. The reason the interactions field has type “bytes” in the Avro schema should now be clear – we write the HLL representation of the unique set out as bytes, and later read it back in, deserializing it into an object. This is done using a Bijection.
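
A sketch of that round trip, wrapping Algebird’s byte helpers in a Bijection (the exact wiring in the full project may differ slightly):

 import com.twitter.algebird.{HLL, HyperLogLog}
 import com.twitter.bijection.Bijection
 import java.nio.ByteBuffer

 // HLL <-> Array[Byte], built from Algebird's HyperLogLog.toBytes / fromBytes
 val hllBytes: Bijection[HLL, Array[Byte]] =
   Bijection.build[HLL, Array[Byte]](HyperLogLog.toBytes)(HyperLogLog.fromBytes)

 // Writing: the Avro "bytes" field is populated from a ByteBuffer
 val payload: ByteBuffer = ByteBuffer.wrap(hllBytes(combined)) // "combined" is the merged HLL from the sketch above

 // Reading: recover the HLL from a checkpointed record and keep merging on the next run
 val restored: HLL = hllBytes.invert(payload.array())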

The full project, with unit tests for serialization/deserialization, is available on GitHub.

Posted by & filed under Programming.

I was very fortunate to attend Scala Days 2015 in San Francisco this week. It was incredible to talk to the people behind the language, and to get a glimpse of the wider ecosystem. I wanted to post a short recap of some of my thoughts and impressions after the conference.


Martin Odersky delivered the first keynote on Monday, describing how Scala is evolving – specific changes to the language, a solution to the binary compatibility problem (the intermediate TASTY format), and more. What he clearly conveyed, both through jokes and detailed plans, is that Scala is a unique language with its own rules and limitations. It is not, and will not become, “Hascalator”, despite many in the community demonstrating the benefits of a stricter functional language. At the same time, the language keeps evolving while becoming more stable for dependable use. This process was further detailed by Grzegorz Kossakowski, who explained how the Scala build process works and how features get implemented in the language.

Bill Venners gave a very interesting history lesson on some failed projects, and showed how he decided against including a feature in ScalaTest because of the API complexity it would create, as well as compile-speed issues. Reframing the problem differently allowed for the creation of the SuperSafe compiler plugin, enabling the feature without the performance or complexity cost.


Josh Suereth and Eugene Yokota covered the history of build tools leading up to SBT 1.0. It was great to hear about the influences on SBT, starting with Make and ending with Google’s Blaze. The SBT server changes the build process in intriguing ways, and it will be interesting to see what functionality it enables.

A theme I found extremely interesting was the use of sun.misc.Unsafe from Scala to manage off-heap memory. Using Unsafe lets you bypass JVM garbage collection and manage a chunk of memory directly, accepting the challenges and dangers that brings. Denys Shabalin covered a library that wraps Unsafe and provides a Scala-specific API for memory access, which he open-sourced during his presentation. Meanwhile, Ryan LeCompte covered a specific use case that benefited from off-heap storage for fast analytical query processing with Akka Cluster. Similar ideas were echoed by Lee Mighdoll in his coverage of Sparkle, and how it keeps asynchronous arrays of record batches flowing to keep the visualizer responsive.

All sessions will be posted on parleys.com soon, but I hope my short post has captured some of the atmosphere of the conference that can’t be conveyed in sessions alone. It will be interesting to watch the ideas presented evolve.
