A serverless approach to AWS EMR
Run a Spark application on a transient EMR cluster
Table of Contents
– AWS serverless big data computing
– Application overview and prerequisites
– Project structure
– PySpark application
– Packaging our application
– Define Python environment and external libraries using Docker on Yarn
– EMR cluster configurations
— Steps configuration
— Services configurations
— Instance groups: how to make your cluster elastic using autoscaling
– Launch the cluster
– In conclusion
AWS serverless big data computing
Nowadays it’s common to process large quantities of data at fixed intervals (daily, weekly, etc.). From reporting to statistics, from analytics to machine learning, the reasons can be many. These workloads generally fall under the umbrella term of Big Data Computing, where data can’t be stored or processed on a single machine and must instead be distributed across a cluster (Hadoop clusters have been a common choice for a long time). The problem is that these clusters are costly and difficult to set up and maintain, so in the last few years a new computing model has emerged in the cloud: serverless computing. In serverless computing the cloud provider manages infrastructure and scaling of resources while the user can focus mainly on application code. In the AWS ecosystem, two examples of serverless services for processing and analyzing data at scale are AWS Athena and AWS Glue.
AWS Athena lets you analyze and process data stored on S3 using a SQL-like syntax. It is basically a managed Hive / PrestoDB service where you can submit your query and AWS will automatically scale resources to handle the workload. In addition, using CTAS syntax, it is possible to store the result of a query in a new table with the option to partition / bucket the data and change storage to other formats such as Avro or Parquet. Athena is a great tool and it is generally a fast and reliable way to analyze data on S3, but as of today it comes with a few limitations:
- Global sorting on a large number of records can cause the error “Query exhausted resources at this scale factor”. This is because Athena runs on an old version of PrestoDB which lacks distributed sorting and all the records need to be ordered on the master node.
- CTAS syntax allows writing up to 100 unique partition and bucket combinations. This could be a problem if you need to split your data into a higher number of partitions. See here for more details.
I am sure both of these issues will be addressed in the future, but today they certainly represent an obstacle.
AWS Glue is a serverless ETL (Extract Transform Load) service based on Apache Spark. It is great for running ETL jobs without worrying about infrastructure and scaling of resources, and it comes with goodies such as Data Catalog, Glue Studio, Development Endpoints, Data Crawlers and others. Although it is definitely a very useful tool, for certain scenarios it can lack some flexibility (e.g. if you need anything outside of the Spark ecosystem you are out of luck).
In this article I would like to share an alternative to the above tools, based mainly on the AWS EMR (Elastic MapReduce) service. EMR is a managed service for Hadoop and other Big Data frameworks, but it is not completely serverless (if needed, you can still access the machines in your cluster over SSH). We will develop a sample ETL application to load and process data on S3 using PySpark and S3DistCp. We will leverage the auto-terminate option of EMR clusters, which means all the resources will be released once the execution of our application is finished (as in the serverless model).
Specifically this is what we will cover:
- Structuring a PySpark project and packaging of the application.
- Managing external dependencies and Python environment using a custom Docker image hosted on AWS ECR.
- How to run an EMR cluster in transient mode (auto termination).
- How to make our cluster elastic using autoscaling policies.
Sounds interesting? Let’s get started!
Application overview and prerequisites
We’ll build a simple ETL application based on the COVID-19 Data Lake S3 open dataset which collects data about SARS-CoV-2 from various sources such as Johns Hopkins University, New York Times and others. Specifically our application will produce reports in csv format about SARS-CoV-2 statistics partitioned by country-region / province-state.
To follow along, you can refer to this GitHub Repository.
What you will need to complete all the steps:
- An AWS account and AWS Command Line Interface properly installed and configured.
- Python3
- Git
- Docker
- Node.js & NPM (optional, only for NPM scripting)
Project structure
Clone this GitHub Repository into a directory of your choice. You should have a folder structure like the following:
Inside the emr folder there are configuration files and scripts to launch our application on an EMR cluster, as well as the Docker image file to use for the Yarn container. The src folder contains the source code of our PySpark application while the setup.py script will package our application dependencies. If you have NPM installed there is also a package.json file with some pre-configured npm scripts to package and launch the application (but standalone commands will be shown as well).
PySpark application
There are countless tutorials about Spark so I won’t go too deep here into how Spark works (I will assume a basic knowledge of Spark and Python as well). The application’s main package is my_spark_app under the src directory. Although not necessary, we have also added a shared sub-package to simulate an application with a more complex structure than a single Python script. Let’s briefly describe the content of the shared.config module:
This simple module exposes a Config class with two static variables, READ_PROTOCOL and WRITE_PROTOCOL. By default our application will read using the s3: protocol and write using the hdfs: protocol. We can override the defaults using the environment variables READ_PROTOCOL and WRITE_PROTOCOL respectively. This can come in handy if you are testing the application locally or in another environment where these are not available (e.g. you want to read using s3a: and write to the local file system instead of HDFS). One thing to keep in mind is that EMR natively provides a modified version of the Hadoop file system called EMRFS (EMR File System), which gives efficient read and write access to S3, but unfortunately it can’t be used outside of EMR.
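A minimal sketch of what such a configuration module could look like is shown here. The class and variable names come from the description above; the body and the build_path helper are assumptions made for illustration, not the code from the repository.

```python
import os


class Config:
    """Protocols used to build input and output paths.

    Defaults follow the article (s3: for reads, hdfs: for writes);
    each can be overridden through an environment variable of the same name.
    """

    READ_PROTOCOL = os.environ.get("READ_PROTOCOL", "s3:")
    WRITE_PROTOCOL = os.environ.get("WRITE_PROTOCOL", "hdfs:")


def build_path(protocol: str, location: str) -> str:
    """Hypothetical helper: prefix a location with a protocol such as 's3:'.

    For S3 the location starts with the bucket name ('bucket/key'),
    for HDFS it starts with a slash ('/reports').
    """
    return f"{protocol}//{location}"


print(build_path("s3:", "my-bucket/input"))   # s3://my-bucket/input
print(build_path("hdfs:", "/reports"))        # hdfs:///reports
```

For a local test run you could, for example, export READ_PROTOCOL=s3a: before starting the application, and the class would pick it up at import time.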
Now let’s describe the content of the main module my_spark_app.index.
First we import the required Spark modules as well as our custom configuration module. Notice that we are also importing the boto3 module. We won’t use it; it is there just to verify that we can leverage the external libraries installed in the Docker image used for the Yarn container (we’ll talk about that later).
Next, after initializing the Spark session, we define a schema and load data on a dataframe:
Input data is in line-delimited JSON format, where each line is a JSON object with the following structure:
After registering a temporary table (so we can easily query it using SQL-like syntax), we define a new dataframe keeping only the records with the last update for each location:
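To make the "last update for each location" step concrete, here is a plain-Python illustration of the same logic (it is not the Spark code of the application). The field names country_region, province_state and last_update, as well as the sample rows, are made up for the example.

```python
from typing import Dict, Iterable, List, Tuple

Record = Dict[str, str]


def latest_per_location(records: Iterable[Record]) -> List[Record]:
    """Keep, for each (country_region, province_state), the most recent record."""
    latest: Dict[Tuple[str, str], Record] = {}
    for record in records:
        key = (record["country_region"], record["province_state"])
        current = latest.get(key)
        # ISO 8601 timestamps in the same format sort correctly as strings.
        if current is None or record["last_update"] > current["last_update"]:
            latest[key] = record
    return list(latest.values())


# Illustrative rows only, not real figures from the dataset.
sample = [
    {"country_region": "Italy", "province_state": "Lazio",
     "last_update": "2020-05-01T00:00:00", "confirmed": "10"},
    {"country_region": "Italy", "province_state": "Lazio",
     "last_update": "2020-05-02T00:00:00", "confirmed": "12"},
    {"country_region": "Italy", "province_state": "Veneto",
     "last_update": "2020-05-01T00:00:00", "confirmed": "7"},
]

for row in latest_per_location(sample):
    print(row["province_state"], row["last_update"], row["confirmed"])
```

In Spark the same result is typically obtained with a window function or a join against the maximum update time per location; the dictionary here only plays the role of that grouping.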
The following is a sample of data for Italy:
Now we can produce our csv reports partitioned by country-region / province-state. To make sure Spark will produce one report for each partition, we repartition the data before writing:
That’s it, our PySpark application is ready!
As a useful comparison, let’s see what the same application would look like on AWS Athena:
Unfortunately the above will not work. Although this is a simple application with a small dataset, we’ll hit the limit of 100 partitions for CTAS queries in Athena described previously:
Packaging our application
We can easily package our PySpark application by running the npm script npm run package or by using the following command from the terminal (make sure you are in the root folder of the project):
python setup.py bdist_egg
Our application should be packed in an egg file under the dist folder. In my case the file name is emr_pyspark_docker_tutorial-1.0.0-py3.8.egg, but it can vary depending on your Python version. An egg file is one possible way to distribute our application modules in Spark (there are others). It should not be confused with the packaging of external libraries such as boto3 or numpy, which must be available in the Python environment (see next section).
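Since the egg file name depends on the Python version, a small helper can predict it before you copy the file to S3. This is a simplified approximation of the naming used for pure-Python eggs, and the project name emr-pyspark-docker-tutorial is an assumption inferred from the file name above.

```python
import sys
from typing import Optional


def egg_filename(name: str, version: str, python_version: Optional[str] = None) -> str:
    """Approximate the file name produced by `python setup.py bdist_egg`.

    Hyphens in the project name become underscores, and the tag of the
    Python version used for the build (e.g. 3.8) is appended.
    """
    if python_version is None:
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    safe_name = name.replace("-", "_")
    return f"{safe_name}-{version}-py{python_version}.egg"


print(egg_filename("emr-pyspark-docker-tutorial", "1.0.0", "3.8"))
# emr_pyspark_docker_tutorial-1.0.0-py3.8.egg
```

When in doubt, simply list the dist folder after the build and use the name you find there.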
Define Python environment and external libraries using Docker on Yarn
For those unfamiliar with Apache Yarn, it is the resource manager and job scheduler inside Hadoop and one of the possible cluster managers to be used with Spark (the default inside EMR). Starting from EMR version 6.0.0, we can define a custom Docker image that Yarn will use to schedule execution containers (where our application will run). This means we can define our Python environment and libraries in the Docker image and they will be available at runtime for our application. There are other ways to achieve the same thing in Yarn, for example zipping the whole Python environment and libraries in a file to ship along with our application, but as a Docker fan I prefer the Docker way 😎.
The only requirement for the Docker image is to have Java JDK8 installed. For our application we also need Python3 and boto3. Under the emr folder you can find two examples of Docker files using Amazon Corretto JDK and openjdk respectively as a base image:
As long as there is JDK8 installed, we should be fine. There is something to keep in mind though: for the second Docker file we’ll also need to specify the paths for the JAVA_HOME, PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON variables in the EMR configuration for Yarn and Spark. This is because by default EMR is configured to look in different locations for these variables, so we need to replace the default locations with the ones of our image (covered in the next section).
For this tutorial I will use the custom Docker file. To build our image, type the following in the terminal:
cd emr
docker build --rm -f emr.custom.Dockerfile -t emr-pyspark-docker-tutorial .
The build will take some time.
Next we need to host our image somewhere EMR can pull it from when we launch our application. One option is AWS ECR (Elastic Container Registry), a managed Docker registry hosted on AWS. Log in to your account in the AWS Console, then pick ECR from the services. Create a new repository called emr-pyspark-docker-tutorial as shown below:
Now to push your image to the registry use the following commands from terminal (please replace region and ECR endpoint with the right values for your account):
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 621329850245.dkr.ecr.us-east-1.amazonaws.com
docker tag emr-pyspark-docker-tutorial:latest 621329850245.dkr.ecr.us-east-1.amazonaws.com/emr-pyspark-docker-tutorial:latest
docker push 621329850245.dkr.ecr.us-east-1.amazonaws.com/emr-pyspark-docker-tutorial:latest
With the above commands we have authenticated the Docker client with our ECR registry, then tagged and pushed our image to the registry. Once the upload has finished, our image will be available in the registry.
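The registry endpoint and image URI used in these commands follow a fixed pattern built from the account ID, the region and the repository name. The helper functions below are hypothetical and only show how to derive the values for your own account; the account ID and region in the usage lines are the ones used in this article.

```python
def ecr_registry(account_id: str, region: str) -> str:
    """Return the ECR registry endpoint for an account and region."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com"


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Return the full image URI to use with docker tag / docker push."""
    return f"{ecr_registry(account_id, region)}/{repository}:{tag}"


print(ecr_registry("621329850245", "us-east-1"))
print(ecr_image_uri("621329850245", "us-east-1", "emr-pyspark-docker-tutorial"))
```

The same two strings are needed again later in the EMR services configuration, so it is worth noting them down.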
EMR cluster configurations
Before talking about EMR configuration, we need an S3 bucket for storing our PySpark files as well as the reports generated by our application and the logs produced by the EMR cluster. If you already have one that’s fine, otherwise create a new S3 bucket with the name you prefer. For this tutorial I will use the unbelievably short name of emr-pyspark-docker-tutorial-bucket (unfortunately, since S3 bucket names must be unique across AWS, you cannot use the same name).
To load the PySpark files on the bucket you can use the following commands in the terminal (from the project root):
aws s3 cp dist/emr_pyspark_docker_tutorial-1.0.0-py3.8.egg s3://emr-pyspark-docker-tutorial-bucket/
aws s3 cp src/my_spark_app/index.py s3://emr-pyspark-docker-tutorial-bucket/
Please replace the S3 bucket name and egg file name with the correct ones for your environment.
Ok, now let’s discuss the main command to launch our EMR cluster in the launch-cluster.sh script under the emr folder, then we’ll discuss each configuration file one by one:
Here is a brief description of each option (you can always type aws emr create-cluster help to get the complete list of options available):
- name: the name of our cluster.
- release-label: EMR cluster version.
- applications: the frameworks we need to be installed and configured on our cluster. We need Spark and also Hadoop for the S3DistCp tool (described later).
- log-uri: S3 path to store logs generated by our cluster.
- instance-groups: this specifies the type of EC2 instances we want to use in our cluster. We can also specify scaling policies to handle different workloads. Instead of using an inline definition, we define the configuration in the instance_groups.json file under the config folder.
- auto-scaling-role: the IAM role to be used for autoscaling our cluster. The default one is EMR_AutoScaling_DefaultRole which we will create in a minute.
- configurations: here we can specify custom configurations for Yarn, Spark and other services. Defined in the services.json configuration file.
- steps: each step represents a unit of work executed with one of the frameworks installed in our cluster, such as Spark, Hive and others. Defined in the steps.json configuration file.
- auto-terminate: this is a very important option for our goal. It terminates the cluster automatically once our application has finished, emulating the serverless model and minimizing the costs.
- enable-debugging: this will enable debugging logs to be written to S3. More information here.
- use-default-roles: this will tell EMR to use default roles for our cluster. Initially these roles don’t exist, so we need to create them.
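To see how the options above fit together, here is a sketch that assembles the equivalent command line in Python without running it. The flags are the ones listed above; the cluster name, the release label emr-6.0.0 (the article only requires 6.0.0 or later), the logs/ prefix and the location of services.json and steps.json in the config folder are assumptions.

```python
import shlex
from typing import List


def create_cluster_command(bucket: str,
                           release_label: str = "emr-6.0.0",
                           config_dir: str = "config") -> List[str]:
    """Build the `aws emr create-cluster` argument list for a transient cluster."""
    return [
        "aws", "emr", "create-cluster",
        "--name", "emr-pyspark-docker-tutorial",
        "--release-label", release_label,
        "--applications", "Name=Spark", "Name=Hadoop",
        "--log-uri", f"s3://{bucket}/logs/",
        "--instance-groups", f"file://{config_dir}/instance_groups.json",
        "--auto-scaling-role", "EMR_AutoScaling_DefaultRole",
        "--configurations", f"file://{config_dir}/services.json",
        "--steps", f"file://{config_dir}/steps.json",
        "--auto-terminate",       # release all resources when the steps finish
        "--enable-debugging",
        "--use-default-roles",
    ]


# Print the command instead of executing it, so nothing is launched by accident.
print(shlex.join(create_cluster_command("emr-pyspark-docker-tutorial-bucket")))
```

Building the command as a list keeps each option on its own line, which makes it easier to compare against your own launch script.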
To create the default roles for EMR, just type the following command in the terminal:
aws emr create-default-roles
The following IAM roles should be created: EMR_DefaultRole, EMR_EC2_DefaultRole, EMR_AutoScaling_DefaultRole. To allow EC2 machines provisioned by EMR to pull our previously defined Docker image from the ECR registry, we need to add the AmazonEC2ContainerRegistryReadOnly policy to the EMR_EC2_DefaultRole role. Type the following command in the terminal to add the policy:
aws iam attach-role-policy --role-name EMR_EC2_DefaultRole --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly
Steps configuration
Let’s first talk about the steps.json configuration file:
Here we define two steps that will be executed sequentially. The first step is the one that will execute our PySpark application. The second step will leverage the S3DistCp tool to copy data from HDFS to S3 (by default our PySpark application will write to HDFS). Although in Spark it is possible to write to S3 directly using EMRFS, I found that writing to HDFS first and then copying the data from HDFS to S3 using S3DistCp is generally faster (tested on a production application with 700 GB of data), but feel free to modify the application and remove the S3DistCp step if you want. Please make sure to replace both the S3 bucket name and egg file name with the correct values for your environment. Notice also that we have set the ActionOnFailure property to TERMINATE_CLUSTER for both steps, meaning that our cluster will be terminated and all resources released in case of any error.
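As a rough sketch of the shape such a file can take, the snippet below builds a two-step definition in Python and prints it as JSON. It is not the file from the repository: the step names, the spark-submit arguments and the hdfs:///reports source path are assumptions, and both steps are expressed through command-runner.jar.

```python
import json
from typing import Dict, List


def build_steps(bucket: str, egg_name: str) -> List[Dict]:
    """Return two sequential EMR steps: run the PySpark app, then copy HDFS output to S3."""
    return [
        {
            "Type": "CUSTOM_JAR",
            "Name": "Run PySpark application",
            "Jar": "command-runner.jar",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--py-files", f"s3://{bucket}/{egg_name}",
                f"s3://{bucket}/index.py",
            ],
        },
        {
            "Type": "CUSTOM_JAR",
            "Name": "Copy reports from HDFS to S3",
            "Jar": "command-runner.jar",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "Args": [
                "s3-dist-cp",
                "--src=hdfs:///reports",          # assumed output path of the application
                f"--dest=s3://{bucket}/reports",
            ],
        },
    ]


steps = build_steps("emr-pyspark-docker-tutorial-bucket",
                    "emr_pyspark_docker_tutorial-1.0.0-py3.8.egg")
print(json.dumps(steps, indent=2))
```

Generating the JSON from a function like this is one way to avoid editing the bucket and egg names by hand in several places.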
Services configurations
Ok, now let’s talk about the services.json configuration file:
Here you can specify custom configurations for Yarn, Spark and other services available in the cluster. Aside from specific Spark configuration parameters, I would like to bring to your attention the following properties:
- docker.privileged-containers.registries and docker.trusted.registries: here we are telling EMR to trust Docker images coming from the specified repositories. You need to replace the string 621329850245.dkr.ecr.us-east-1.amazonaws.com with your ECR specific endpoint.
- spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE and spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE: both of these properties are set to docker, thus enabling the Docker execution mode both for the Yarn Application Master and the Spark executors.
- spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE and spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE: here we specify the Docker image for the Yarn Application Master and the Spark executors. You need to replace the string 621329850245.dkr.ecr.us-east-1.amazonaws.com/emr-pyspark-docker-tutorial:latest with the URI of your ECR Docker image.
- spark.yarn.appMasterEnv.JAVA_HOME and spark.executorEnv.JAVA_HOME: as pointed out previously, we need to override the default JAVA_HOME location with the one defined in our custom Docker image.
- PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON: we also need to override the default PySpark executable path with the one defined in our custom Docker image.
- spark.dynamicAllocation.enabled and spark.shuffle.service.enabled: these enable dynamic allocation / deallocation of Spark executors depending on the workload. This works together with the EMR autoscaling capability to add or remove cluster nodes based on CloudWatch metrics (discussed next).
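The properties above can be grouped into EMR configuration classifications. The following is a minimal sketch, built in Python and printed as JSON, that covers only the Docker and dynamic allocation settings; it leaves out JAVA_HOME and the PySpark paths because they depend on the image, and it is an illustration rather than the repository's services.json.

```python
import json
from typing import Dict, List


def build_services_config(registry: str, image_uri: str) -> List[Dict]:
    """Return EMR configurations enabling Docker on Yarn for Spark."""
    docker_env = {}
    # The same two variables are set for the Application Master and the executors.
    for prefix in ("spark.yarn.appMasterEnv", "spark.executorEnv"):
        docker_env[f"{prefix}.YARN_CONTAINER_RUNTIME_TYPE"] = "docker"
        docker_env[f"{prefix}.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE"] = image_uri

    return [
        {
            "Classification": "container-executor",
            "Configurations": [
                {
                    "Classification": "docker",
                    "Properties": {
                        "docker.trusted.registries": f"local,{registry}",
                        "docker.privileged-containers.registries": f"local,{registry}",
                    },
                }
            ],
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                **docker_env,
                "spark.dynamicAllocation.enabled": "true",
                "spark.shuffle.service.enabled": "true",
            },
        },
    ]


registry = "621329850245.dkr.ecr.us-east-1.amazonaws.com"
config = build_services_config(registry, f"{registry}/emr-pyspark-docker-tutorial:latest")
print(json.dumps(config, indent=2))
```

All property values are strings, which is what the EMR configuration format expects, including the boolean-looking ones.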
Instance groups: how to make your cluster elastic using autoscaling
Finally, let’s discuss the instance_groups.json configuration file:
In this file we specify the EC2 instance type / family as well as autoscaling policies for both MASTER and CORE nodes. On EMR we have much finer-grained control over computing capacity compared to other services like AWS Glue, where there are only three types of workers. Here you can find the list of all the EC2 instance types that can be used on EMR.
Another cool EMR feature is the definition of autoscaling policies for cluster nodes. These policies can make our cluster “elastic”, meaning that nodes can be added or removed based on the CloudWatch metrics we specify. This is useful whenever the workload varies or is not predictable, keeping the costs low for the normal workload and adding more computing power when needed. Of course the frameworks used in our application must be designed to take advantage of this elasticity at runtime. In the case of Spark we have enabled dynamic allocation, which means Spark will try to add or remove executors based on the workload and will use all the available nodes in the cluster. There are several CloudWatch metrics you can use to define your scaling policies. Two common ones are:
- YARNMemoryAvailablePercentage: the percentage of remaining memory that’s available for YARN.
- ContainerPendingRatio: the ratio of pending containers to allocated containers.
As an example (we don’t need autoscaling for this simple application), we have defined a scaling out policy for our core nodes which will add two instances if the Yarn memory available is below 30%. This metric will be evaluated every 5 minutes by CloudWatch.
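The scale-out policy just described could be expressed roughly as follows. This sketch builds a core instance group definition in Python and prints it as JSON; the instance type, instance count, capacity limits, rule name and cooldown are illustrative assumptions, while the metric, threshold, adjustment and 5-minute period come from the description above.

```python
import json
from typing import Dict


def core_instance_group(instance_type: str = "m5.xlarge", instance_count: int = 2) -> Dict:
    """Return a CORE instance group that adds two nodes when YARN memory runs low."""
    scale_out_rule = {
        "Name": "ScaleOutOnLowYarnMemory",
        "Description": "Add two core nodes when available YARN memory is below 30%",
        "Action": {
            "SimpleScalingPolicyConfiguration": {
                "AdjustmentType": "CHANGE_IN_CAPACITY",
                "ScalingAdjustment": 2,      # number of instances to add
                "CoolDown": 300,             # seconds to wait before the next scaling activity
            }
        },
        "Trigger": {
            "CloudWatchAlarmDefinition": {
                "MetricName": "YARNMemoryAvailablePercentage",
                "Namespace": "AWS/ElasticMapReduce",
                "ComparisonOperator": "LESS_THAN",
                "Threshold": 30,
                "Unit": "PERCENT",
                "Statistic": "AVERAGE",
                "Period": 300,               # evaluate every 5 minutes
                "EvaluationPeriods": 1,
                "Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}],
            }
        },
    }
    return {
        "Name": "Core nodes",
        "InstanceGroupType": "CORE",
        "InstanceType": instance_type,
        "InstanceCount": instance_count,
        "AutoScalingPolicy": {
            "Constraints": {"MinCapacity": instance_count, "MaxCapacity": 10},
            "Rules": [scale_out_rule],
        },
    }


print(json.dumps([core_instance_group()], indent=2))
```

A matching scale-in rule (for example, removing nodes when available memory stays high) would follow the same structure with a negative ScalingAdjustment.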
Below is an example of autoscaling in action for another application:
Launch the cluster
We are finally ready to launch our application.
WARNING: you will be charged by AWS as long as your cluster is in the running state. Here you can find details for EMR pricing. Our application is very small and should complete in a few minutes, so the cost should be low (around $0.10).
If you have NPM installed you can simply type npm run launch-cluster in the terminal, otherwise use the following commands:
cd emr
./launch-cluster.sh
You should receive a response like this:
Our cluster has been assigned an ID and it is starting up. To monitor your cluster and the state of the application, go to the AWS Console and then to the EMR service. Under the clusters section, you should find your new cluster. Luckily there are two user interfaces, YARN timeline server and Spark history server, that are publicly available and don’t require SSH to be accessed.
In the hardware tab we can see the instance types and autoscaling policies we have defined:
Finally in the steps tab we can check the state of our application:
If all worked correctly, you should see the reports generated by our application on the S3 bucket under the path reports:
That’s it! Our cluster will terminate automatically once the application has finished, releasing all resources.
In conclusion
In this article we have seen a possible strategy to run distributed computing workloads on EMR, using the auto-terminate option to keep costs low as in the serverless model while keeping all the flexibility of EMR for tuning, configuration and scalability. We have also seen how we can package our environment and library dependencies using Docker as the container engine for our Yarn containers. Is this a better option compared to other AWS services such as Athena or Glue? Not necessarily; as always, it depends on your requirements.
As a last note, I would like to point out that everything we have done here can be easily automated in a CI/CD pipeline with GitHub, GitLab, Travis-CI or any other tool that offers integration capabilities. We could also launch our cluster on a schedule, using a Lambda function triggered by an EventBridge rule or using Step Functions.
There are many possibilities; the choice is up to you.
Thanks for reading! 😎