
In this post I’ll try to cover how to write and read Avro records in Scalding pipelines. To begin, a reminder that Avro is a serialization format, and Scalding is a Scala API on top of Hadoop. If you’re not using Scalding, this post is probably not too interesting for you.

Let’s begin by defining a use case. We have a stream of user interactions written to HDFS, and we want to analyze it in a periodic pipeline, calculating, for every user, the number of unique other users they interact with. Our log files are simple text, and so is our output. Where Avro comes in is in helping us avoid recomputing the entire dataset on every pipeline run: we write out a checkpoint representation of the data set that is considerably smaller than the full set, reducing our pipeline’s execution time and cost. This is a hypothetical example, but it lets me demonstrate some of the interesting features.

Let’s now create a Scalding job. I’m going to use a Scalding SBT plugin to do the majority of the work of bringing in dependencies. The details are outlined in the plugin README: https://github.com/danosipov/sbt-scalding-plugin

We need to bring in the Avro dependency to allow us to create and use Avro records. There are a number of SBT plugins that can do this, and I’ll use one of them (https://github.com/cavorite/sbt-avro):

To plugins.sbt, add:

resolvers += "sbt-plugin-releases" at "http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases"

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

And in build.sbt:

Seq(sbtavro.SbtAvro.avroSettings: _*)

version in avroConfig := "1.7.7"

Now on to defining our Avro schema. In src/main/avro (a new directory), create a schema definition file, interactions.avsc, with the following content:

{
    "namespace": "interaction.avro",
    "type": "record",
    "name": "Interaction",
    "fields": [
        {"name": "userid", "type": "string"},
        {"name": "interactions", "type": "bytes"}
    ]
}

For more information on creating the Avro schema, look at the official documentation. I’ll explain the reason for the types chosen further below.
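
Once the plugin runs, the schema above is compiled into a Java class, interaction.avro.Interaction (following the namespace), complete with a builder. As a quick, hypothetical illustration of what constructing such a record looks like – the user ID and the hllBytes payload below are placeholders, and we’ll see further down what actually goes into the bytes field:

import java.nio.ByteBuffer
import interaction.avro.Interaction

// hllBytes stands in for the serialized HyperLogLog we will produce later
val hllBytes: Array[Byte] = Array[Byte](0)

val record: Interaction = Interaction.newBuilder()
  .setUserid("some-user-id")                  // the "string" field
  .setInteractions(ByteBuffer.wrap(hllBytes)) // the "bytes" field
  .build()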

We will now write our job. The first iteration makes no use of an Avro checkpoint. It simply counts the number of unique interactions by grouping them under a user, converting all the IDs the user interacted with to a Set, and taking the set size.
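
Here is a minimal sketch of what that first job can look like. The tab-separated input format, the argument names, and the class name are assumptions made for illustration – adapt them to your actual log format:

import com.twitter.scalding._

class UniqueInteractionsJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    // assume each log line is "userId<TAB>otherUserId"
    .map { line =>
      val Array(userId, otherId) = line.split("\t")
      (userId, otherId)
    }
    // group all interactions under the user
    .group
    // collect the distinct IDs the user interacted with
    .toSet
    // the set size is the number of unique interactions
    .mapValues(_.size)
    .toTypedPipe
    .write(TypedTsv[(String, Int)](args("output")))
}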

The second iteration is more involved, as it reads in the checkpointed results, presumably accumulated over previous runs, and joins in the new results. It outputs the same counts as the previous job, as well as a checkpoint for future runs; a condensed sketch of it appears after the serialization discussion below.

While most of the steps are straightforward to anyone who’s done some Scalding development, I want to draw your attention to the step where we build the set of user IDs that have interacted with the current user. A simple count will not work, as we want unique interactions. A Set works fine for most users, unless they have interacted with millions of other users. And if we want to serialize this set, we’ll be using a lot of space – each ID occupies 16 bytes if a UUID is used.

Algebird provides an implementation of an approximate set, called HyperLogLog. With a sketch of fixed size – a few kilobytes, no matter how many IDs go in – we can approximate the entire set of IDs, if we accept a small accuracy error (around 1% in this case). HyperLogLog instances can be combined just like Sets, and can be serialized to disk.
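
To give a feel for the Algebird API (the bit size here is my own choice for the example; it controls the accuracy/size trade-off):

import com.twitter.algebird.{HLL, HyperLogLogMonoid}

// 12 bits => 2^12 registers; more bits mean lower error but a larger sketch
val hllMonoid = new HyperLogLogMonoid(12)

// build one-element sketches from the raw ID bytes and combine them
val ids = Seq("user-1", "user-2", "user-1")
val combined: HLL = hllMonoid.sum(ids.map(id => hllMonoid.create(id.getBytes("UTF-8"))))

// approximate number of distinct IDs (close to 2 here)
val approxCount: Double = combined.estimatedSize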

Let’s now bring Avro serialization into the picture. We want to load up the previous computation, add in the new results, and serialize it back out. The reason why the interactions field is of type “bytes” in the Avro schema should now be clear – we write the HLL representation of the unique set out as bytes, and later read it back in, deserializing it to an object. This is done using a Bijection.
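
Putting the pieces together, a condensed sketch of the checkpointing job could look like the following. For the byte conversion I’m using Algebird’s HyperLogLog.toBytes/fromBytes directly (the full project wraps this in a Bijection), and I’m assuming the scalding-avro module’s PackedAvroSource for reading and writing the generated Interaction records; the argument names, bit size, and input format are again assumptions:

import java.nio.ByteBuffer
import com.twitter.algebird.{HLL, HyperLogLog, HyperLogLogMonoid}
import com.twitter.scalding._
import com.twitter.scalding.avro._
import interaction.avro.Interaction

class CheckpointedInteractionsJob(args: Args) extends Job(args) {
  // implicit, so that .sum below knows how to combine HLL values
  implicit val hllMonoid: HyperLogLogMonoid = new HyperLogLogMonoid(12)

  // new interactions from the raw logs, as (userId, one-element HLL)
  val fresh: TypedPipe[(String, HLL)] =
    TypedPipe.from(TextLine(args("input")))
      .map { line =>
        val Array(userId, otherId) = line.split("\t")
        (userId, hllMonoid.create(otherId.getBytes("UTF-8")))
      }

  // previously checkpointed sketches, deserialized from the Avro bytes field
  val previous: TypedPipe[(String, HLL)] =
    TypedPipe.from(PackedAvroSource[Interaction](args("checkpoint")))
      .map { record =>
        val buf = record.getInteractions.duplicate
        val bytes = new Array[Byte](buf.remaining)
        buf.get(bytes)
        (record.getUserid.toString, HyperLogLog.fromBytes(bytes))
      }

  // merge old and new sketches; HLLs combine just like Sets
  val merged = (fresh ++ previous).group.sum

  // the same per-user counts as the first job
  merged
    .mapValues(_.estimatedSize.toLong)
    .toTypedPipe
    .write(TypedTsv[(String, Long)](args("output")))

  // plus the checkpoint for the next run
  merged
    .toTypedPipe
    .map { case (userId, hll) =>
      Interaction.newBuilder()
        .setUserid(userId)
        .setInteractions(ByteBuffer.wrap(HyperLogLog.toBytes(hll)))
        .build()
    }
    .write(PackedAvroSource[Interaction](args("checkpointOutput")))
}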

The full project, with unit tests for serialization/deserialization, is available on GitHub.
