Apache Spark recently received top level support on Amazon Elastic MapReduce (EMR) cloud offering, joining applications such as Hadoop, Hive, Pig, HBase, Presto, and Impala. This is exciting for me, because most of my workloads run on EMR, and utilizing Spark required either standing up manual EC2 clusters, or using EMR bootstrap, which was very difficult to configure.
Documentation for running Spark on EMR is rather short, only in the form of the launch blog post and official support section (which is outdated already and contains errors). This post details my experience with attempting to run a batch Scala application with official Spark support on AMI 3.8.0.
Perhaps the biggest frustration, and complete surprise is the fact that the version of Spark installed is compiled with Scala version 2.10. That means whatever application you write has to be compiled against Scala 2.10 as well, otherwise you will get runtime errors. This is very counterintuitive, as AMI 3.8.0 contains Scala 2.11.1.
Even though the latest version of Spark out is 1.4.1, EMR currently contains what appears to be a customized version of 1.3.1. These are the JARs available under Spark lib directory:
$ ls ~/spark/lib amazon-kinesis-client-1.1.0.jar datanucleus-api-jdo-3.2.6.jar datanucleus-core-3.2.10.jar datanucleus-rdbms-3.2.9.jar original-spark-ganglia-lgpl_2.10-1.3.1.jar original-spark-streaming-flume_2.10-1.3.1.jar original-spark-streaming-flume-sink_2.10-1.3.1.jar original-spark-streaming-kafka_2.10-1.3.1.jar original-spark-streaming-kafka-assembly_2.10-1.3.1.jar original-spark-streaming-kinesis-asl_2.10-1.3.1.jar original-spark-streaming-mqtt_2.10-1.3.1.jar original-spark-streaming-twitter_2.10-1.3.1.jar original-spark-streaming-zeromq_2.10-1.3.1.jar spark-1.3.1-yarn-shuffle.jar spark-assembly-1.3.1-hadoop2.4.0.jar spark-examples-1.3.1-hadoop2.4.0.jar spark-ganglia-lgpl_2.10-1.3.1.jar spark-streaming-flume_2.10-1.3.1.jar spark-streaming-flume-sink_2.10-1.3.1.jar spark-streaming-kafka_2.10-1.3.1.jar spark-streaming-kafka-assembly_2.10-1.3.1.jar spark-streaming-kinesis-asl_2.10-1.3.1.jar spark-streaming-mqtt_2.10-1.3.1.jar spark-streaming-twitter_2.10-1.3.1.jar spark-streaming-zeromq_2.10-1.3.1.jar
Copy JAR to master
When submitting a Hadoop job, its common to give a path to S3 location of the JAR file, and EMR will load it and distribute it across the cluster. This doesn’t work with EMR. The JAR you will run will need to be present on the master node when you start running the step. This is a limitation of the current spark-submit script, which EMR uses to submit the job to the YARN cluster. This can be done either using a custom bootstrap (preferred if you’re going to do custom setup anyway), or using hdfs dfs -get as a first step.
Executors & Memory
Submitting a Hadoop job on EMR usually utilizes the entire cluster. This makes sense, since different jobs tend to launch different clusters, and shutdown when they’re done, or operate sequentially. The default configuration for running a Spark job is left at spark-submit defaults – two executors, with 1Gb of heap memory each, and 512Mb for a driver. If you launch a cluster with more than a few nodes, majority of them will sit idle. When submitting a step, don’t forget to adjust the num-executors to be the total number of cores in your cluster (or more if you’re IO bound and can handle context switching), and subdivide the RAM on each node between them (don’t forget about YARN overhead).
With all the above points mentioned, the following is the resulting CLI command that can be issued to start up a Spark cluster, run a job, and shutdown. I’ve added line comments to describe the features.
aws emr create-cluster --name "My Spark Cluster" \ --ami-version 3.8 \ --applications Name=Spark \ --ec2-attributes KeyName=mykey \ --region us-east-1 \ --enable-debugging \ --instance-groups InstanceCount=1,Name=Master,InstanceGroupType=MASTER,InstanceType=m2.xlarge InstanceCount=50,BidPrice=0.30,Name=Core,InstanceGroupType=CORE,InstanceType=m2.xlarge \ --log-uri s3://bucket/path/to/logs/ \ --auto-terminate \ --bootstrap-actions Path=s3://bucket/path/to/Bootstrap.sh,Name=Download \ # Download JAR to local directory using aws cp s3://remote.jar /home/hadoop/local.jar --steps Type=Spark,Name=Program,ActionOnFailure=CONTINUE,Args=[--num-executors,100,--executor-memory,6595M,--driver-memory,6595M,--class,com.danosipov.MainClass,/home/hadoop/local.jar,commaSeparatedArgumentList] # commaSeparatedArgumentList will be passed to MainClass.main as Array[String]. Make sure there are no spaces in this list