Building an Event-Driven, Fault-Tolerant Data Pipeline with AWS Lambda, Alluxio, and Spark

A while back we wrote about how we integrated Redshift into our platform, and, more recently, we discussed how to get Alluxio up and running in a DC/OS cluster. Now, we’d like to spend a little time discussing how we bring these and other technologies together to create an event-driven data pipeline.

The Problem

In our platform we often have to fetch data from various locations (e.g. S3, SFTP, APIs) and in various formats (CSV, TSV, JSON, XML), because we have an incredibly diverse client and publisher catalog and each one provides data in its own unique way. As we have grown, we’ve amassed a large collection of microservices, processes, and configuration that handle these different data sources and files. The biggest issue we’ve run into is that the various portions of the data pipeline do not interact as well as we would like, so when something fails it can be difficult to track down where the error occurred. We have begun to feel some strain from this, so we’re abstracting and centralizing as much as we can.

We’ve also realized that, because of the way we have to model and structure our data to give clients the availability and reporting flexibility they need for effective campaign analysis, we have been putting quite a bit of unnecessary strain on our Redshift cluster. Part of the solution is reworking how our data is inserted into Redshift, which also removes the need to keep this data available in the MySQL (or other RDBMS) layer.

The Solution

This led us to develop an event-driven, fault-tolerant architecture integrating AWS Lambda (which we’ve also written about in the past), Alluxio, and Spark running in a DC/OS cluster. For those who may be struggling through some of the configuration necessary for such a setup, we’d like to run through the challenges we hit and how we worked through them. This post assumes the reader already has a DC/OS cluster up and running on AWS in a VPC, though several of these services can be run on their own standalone clusters.

Service Discovery in DC/OS

The first challenge was service discovery in DC/OS. For services running within the cluster this isn’t a problem, but since we have Lambda as part of our architecture, it’s a bit trickier. A Lambda function can be placed in the same VPC as the cluster, but it has no access to the mesos-dns service needed to resolve internal service names. What we ended up doing here was using an internal Elastic Load Balancer instance in AWS coupled with an internal marathon-lb instance running within the DC/OS cluster.

We won’t go through the details of how to set up an ELB, but it should be created as an internal load balancer within the DC/OS VPC. During the Add EC2 Instances step, select the DC/OS slave nodes, which carry the SlaveServerGroup tag.

Next, set up an internal marathon-lb instance. You can create a new instance by navigating to http://<your-dcos-host>/#/universe/packages and clicking Install next to marathon-lb. However, if an external marathon load balancer is already running, it will conflict and block a second instance from installing, because – at least at the time we set up our instance – there was no way in the UI to change the service name that registers with Marathon. In this case, you will receive an error like the following:

[Screenshot: error indicating the marathon-lb service name already exists]

To get around this, we set up the configuration we wanted, clicked Review and Install, and then used the option in the top-right corner to download the config.json file. Edit the file to look something like this:

{
  "marathon-lb": {
    "auto-assign-service-ports": false,
    "bind-http-https": false,
    "cpus": 2,
    "haproxy-group": "internal",
    "haproxy-map": true,
    "instances": 1,
    "mem": 1024,
    "minimumHealthCapacity": 0.5,
    "maximumOverCapacity": 0.2,
    "name": "marathon-lb-internal",
    "role": "*",
    "sysctl-params": "net.ipv4.tcp_tw_reuse=1 net.ipv4.tcp_fin_timeout=30 net.ipv4.tcp_max_syn_backlog=10240 net.ipv4.tcp_max_tw_buckets=400000 net.ipv4.tcp_max_orphans=60000 net.core.somaxconn=10000",
    "marathon-uri": "http://marathon.mesos:8080"
  }
}

The important parts are the name key, which sets this instance apart from the existing one, along with the haproxy-group and bind-http-https settings. Then run the following command through the DC/OS CLI:

dcos package install --options=config.json marathon-lb

This will spin up the package in your cluster with the modified config.
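
One wiring note: marathon-lb only picks up apps whose HAPROXY_GROUP label matches its haproxy-group setting, so any service you want reachable through this internal path needs that label, plus a service port for HAProxy to bind – which is also the port the ELB should forward to. Here is a trimmed, hypothetical Marathon app fragment; later in this post we reach the Alluxio master at <your-elb-endpoint>:10102, so that’s the port used in this sketch:

{
  "id": "/alluxio-master",
  "labels": {
    "HAPROXY_GROUP": "internal"
  },
  "portDefinitions": [
    { "port": 10102, "protocol": "tcp", "name": "rpc" }
  ]
}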

Spark without HDFS

The next issue we ran into was during the setup of Spark within the cluster. Since we planned on using Alluxio as our distributed file system, we did not need HDFS, which would otherwise take up valuable resources that could be allocated to other services. The Spark package in the DC/OS Universe, however, ships with default configuration for HDFS. On top of that, even if HDFS is desired, the HDFS and Spark packages in the Universe were out of sync at the time: HDFS had been updated but the Spark package had not, so the default configuration was wrong. After some digging, we discovered there is an updated configuration Spark can be pointed to, or we could create our own. Since we were disabling HDFS altogether, we decided to provide our own.

hdfs-site.xml

<configuration>
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.nameservice.id</name>
    <value>hdfs</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>hdfs</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.hdfs</name>
    <value>nn1,nn2</value>
  </property>
</configuration>

core-site.xml

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>file:///tmp</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hue.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hue.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.httpfs.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.httpfs.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
  </property>
</configuration>

The hdfs-site.xml contains the updated configuration. The important part here is the fs.default.name setting in core-site.xml, which we simply point at the local temp directory. Also notice fs.alluxio.impl – we’ll discuss this in the next section. Drop these files in an S3 bucket accessible by the DC/OS cluster; for this, you can configure a bucket policy that allows access from a specific VPC. Then provide that bucket to the config-url option in the Spark package configuration.
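
For reference, installing the Spark package from the DC/OS CLI with those files looks roughly like this. It’s a sketch: hdfs.config-url is the option key the package used in the versions we worked with (check your package version’s configuration schema), and the URL just needs to be somewhere the cluster can fetch hdfs-site.xml and core-site.xml from.

{
  "hdfs": {
    "config-url": "https://<your-config-bucket>.s3.amazonaws.com/spark-conf"
  }
}

Save that as, say, spark-options.json and install with:

dcos package install --options=spark-options.json spark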

Accelerating Data Access with Alluxio

In our initial testing, we realized that using Spark to read directly from S3 was not going to work. Because S3 is a key-value store rather than an actual filesystem, the way Spark has to pull files from it is inefficient. Much of this has to do with directory listing and the fact that Spark often has to read the same file multiple times when running aggregations. So, while looking for ways to accelerate this data access, we came across Alluxio, an open source, memory-speed, virtual distributed file system. As we said above, we’ve already written about how to get Alluxio up and running in a DC/OS cluster, but there are also a couple of application-side issues to be aware of.

First, there are two different ways to provide the master hostname to your application: one must be used with the Alluxio API, and the other when using the Hadoop filesystem scheme with Spark. For example, the following will not work:

val alluxioPath = new AlluxioURI("alluxio://<your-elb-endpoint>:10102/filename.csv")

The URI scheme is accepted here, but it has no effect on the configuration. What’s required instead is an alluxio-site.properties file on your project’s classpath with the following settings:

alluxio.master.hostname=<your-elb-endpoint>
alluxio.master.port=10102
alluxio.user.network.netty.channel=NIO
alluxio.user.file.writetype.default=CACHE_THROUGH
alluxio.user.ufs.delegation.enabled=true

Two other important configuration options here are the netty channel and UFS delegation. The default netty channel is EPOLL, which throws an error due to some limitations with sockets in the Lambda environment:

io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.epoll.EpollSocketChannel: java.io.IOException

The full stack trace can be seen in this gist.

UFS delegation is noted as enabled by default in the documentation, but in the version we set up it was disabled. Without this setting, the Lambda function will likely time out when it tries to write through to the under file system (UFS), which may be due to a limitation similar to the one above. At the time of writing it is not clear exactly what causes this, but enabling the setting delegates UFS operations to the Alluxio worker, letting us bypass the issue.
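
To make the Lambda-side usage concrete, here is a minimal sketch of a write through the Alluxio 1.x client API, assuming alluxio-site.properties is on the classpath. The file name and data are placeholders; the client picks up the master endpoint, the NIO channel, and the CACHE_THROUGH write type from the properties file rather than from the URI.

import alluxio.AlluxioURI
import alluxio.client.file.FileSystem

// The client reads alluxio-site.properties from the classpath,
// so the URI only needs a path within the Alluxio namespace.
val fs = FileSystem.Factory.get()
val data = "id,clicks\n1,42\n".getBytes("UTF-8") // stand-in for a file fetched from S3/SFTP/an API
val out = fs.createFile(new AlluxioURI("/filename.csv"))
out.write(data)
out.close()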

On the other side of this, the properties file is not respected by the filesystem scheme in Spark, so there we must use the URI format. In addition, the fs.alluxio.impl configuration value needs to be set to alluxio.hadoop.FileSystem. This should work by adding it to core-site.xml as seen above, but our Spark instance was not picking it up, so we just configured it on the SparkContext in our application:

context.hadoopConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
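
With that in place, the standard Spark read APIs work through the alluxio:// URI. A minimal sketch of the read side, assuming context is the SparkContext from above and the same ELB endpoint and port as in the properties file:

// fs.alluxio.impl tells Hadoop's FileSystem machinery which class handles the alluxio:// scheme
val lines = context.textFile("alluxio://<your-elb-endpoint>:10102/filename.csv")
println(s"Read ${lines.count()} lines from Alluxio")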

 

Submitting the Spark Job

The final challenge was submitting the job to Spark from Lambda. There are a few ways this can be done. One would be to spin up an EMR cluster via the AWS API and add the job as a step, but that was a bit too slow for us; a better variation would be a persistent EMR cluster we could submit to. However, we already had Chronos running in our DC/OS cluster for job scheduling, and Chronos has a REST API, so that seemed like the most desirable option. It’s possible to submit a Docker job to it and give it commands to run. So what we ended up doing is submitting a Docker job with the mesosphere/spark image, providing commands to grab our JAR file from an S3 bucket and call spark-submit against mesos://leader.mesos:5050.
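
As a rough illustration, the Lambda function POSTs a one-shot job definition along these lines to Chronos, reached through the same internal ELB described earlier. The name, image tag, paths, and commands here are placeholders (the layout of the mesosphere/spark image and the flags your job needs will differ), "R1//PT5M" schedules a single immediate run, and newer Chronos versions expose the endpoint at /v1/scheduler/iso8601 while older ones use /scheduler/iso8601.

{
  "name": "spark-ingest-example",
  "schedule": "R1//PT5M",
  "container": {
    "type": "DOCKER",
    "image": "mesosphere/spark"
  },
  "cpus": 1,
  "mem": 2048,
  "command": "curl -sSfo /tmp/pipeline.jar https://<your-jar-bucket>.s3.amazonaws.com/pipeline-assembly.jar && spark-submit --master mesos://leader.mesos:5050 --class com.example.IngestJob /tmp/pipeline.jar <job-args>"
}

Posting it looks something like:

curl -X POST -H "Content-Type: application/json" -d @spark-job.json http://<your-elb-endpoint>:<chronos-port>/v1/scheduler/iso8601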

Call it a Day

Once we were able to work our way through this maze of challenges, everything began working together and running smoothly. We hope this helps anyone who is looking to set up their own event-driven data pipeline in the AWS ecosystem. If there is anything we missed, any other issues you might have run into, or if you’d just like to share your experiences, please send us a note in the comments below.
