Integration with Cloud Infrastructures

Introduction

All major cloud providers offer persistent data storage in object stores. These are not classic “POSIX” file systems. In order to store hundreds of petabytes of data without any single points of failure, object stores replace the classic filesystem directory tree with a simpler model of object-name => data. To enable remote access, operations on objects are usually offered as (slow) HTTP REST operations.

Spark can read and write data in object stores through filesystem connectors implemented in Hadoop or provided by the infrastructure suppliers themselves. These connectors make the object stores look almost like filesystems, with directories and files and the classic operations on them such as list, delete and rename.

Important: Cloud Object Stores are Not Real Filesystems

While the stores appear to be filesystems, underneath they are still object stores, and the difference is significant.

They cannot be used as a direct replacement for a cluster filesystem such as HDFS except where this is explicitly stated.

Key differences are:

  1. Changes to stored objects may not be immediately visible, both in directory listings and when reading data.
  2. Directories are emulated on top of object listings, which can make operations on them slow.
  3. Rename is not an atomic metadata operation; it may be slow and, if it fails, can leave the store in an unknown state.
  4. Seeking within a file may require new HTTP calls, hurting performance.

How does this affect Spark?

  1. Reading and writing data can be significantly slower than working with a normal filesystem.
  2. Some directory structures may be very inefficient to scan during query split calculation.
  3. The output of work may not be immediately visible to a follow-on query.
  4. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset is potentially both slow and unreliable.

For these reasons, it is not always safe to use an object store as a direct destination of queries, or as an intermediate store in a chain of queries. Consult the documentation of the object store and its connector to determine which uses are considered safe.

In particular: without some form of consistency layer, Amazon S3 cannot be safely used as the direct destination of work with the normal rename-based committer.

Installation

With the relevant libraries on the classpath and Spark configured with valid credentials, objects can be read or written by using their URLs as the path to data. For example, sparkContext.textFile("s3a://landsat-pds/scene_list.gz") will create an RDD of the file scene_list.gz stored in S3, using the s3a connector.
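
As a fuller sketch of the same idea (assuming the s3a connector and credentials are already configured; the bucket my-bucket and its output path are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ObjectStoreExample")
  .getOrCreate()

// Read an object from the public landsat-pds bucket as an RDD of lines.
val sceneList = spark.sparkContext.textFile("s3a://landsat-pds/scene_list.gz")
println(s"scene list contains ${sceneList.count()} lines")

// DataFrames and Datasets accept the same URLs. "my-bucket" is a hypothetical
// bucket to which the job has write access; writing directly to an object
// store is subject to the commit-safety caveats above.
val df = spark.read.text("s3a://landsat-pds/scene_list.gz")
df.write.mode("overwrite").parquet("s3a://my-bucket/scene-list-copy/")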

To add the relevant libraries to an application’s classpath, include the hadoop-cloud module and its dependencies.

In Maven, add the following to the pom.xml file, assuming spark.version is set to the chosen version of Spark:

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>hadoop-cloud_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  ...
</dependencyManagement>
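
For sbt builds, an equivalent declaration might look like the following; this is a sketch that assumes the same hadoop-cloud artifact as the Maven example above and a Scala 2.11 build:

// build.sbt (sketch): add the hadoop-cloud module and its transitive
// object-store connector dependencies.
val sparkVersion = "2.3.0"  // replace with the chosen version of Spark

libraryDependencies += "org.apache.spark" %% "hadoop-cloud" % sparkVersion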

Commercial products based on Apache Spark generally directly set up the classpath for talking to cloud infrastructures, in which case this module may not be needed.

Authenticating

Spark jobs must authenticate with the object stores to access data within them.

  1. When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
  2. spark-submit reads the AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
  3. In a Hadoop cluster, settings may be set in the core-site.xml file.
  4. Authentication details may be manually added to the Spark configuration in spark-defaults.conf.
  5. Alternatively, they can be programmatically set in the SparkConf instance used to configure the application’s SparkContext, as sketched below.
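
As a minimal sketch of the last option, the s3a credentials can be passed to the Hadoop connector through the spark.hadoop. prefix; here they are read from environment variables so that no secret is hard-coded (the application name is arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: set S3A credentials programmatically on the SparkConf.
// Reading them from environment variables avoids hard-coding secrets.
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

val spark = SparkSession.builder()
  .appName("S3AAuthExample")
  .config(conf)
  .getOrCreate()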

Important: never check authentication secrets into source code repositories, especially public ones.

Consult the Hadoop documentation for the relevant configuration and security options.

Configuring

Each cloud connector has its own set of configuration parameters; again, consult the relevant documentation.

For object stores whose consistency model means that rename-based commits are safe, use the FileOutputCommitter v2 algorithm for performance:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

This does less renaming at the end of a job than the “version 1” algorithm. As it still uses rename() to commit files, it is unsafe to use when the object store does not have consistent metadata/listings.

The committer can also be set to ignore failures when cleaning up temporary files; this reduces the risk that a transient network problem is escalated into a job failure:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

Storing temporary files can run up charges; delete directories called "_temporary" on a regular basis to avoid this.
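
If preferred, the same committer options can be set per application rather than cluster-wide; a minimal sketch, using the property names shown above:

import org.apache.spark.sql.SparkSession

// Sketch: enable the v2 committer algorithm and tolerant cleanup for a
// single application; property names are the same as in spark-defaults.conf.
val spark = SparkSession.builder()
  .appName("CloudCommitterExample")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
  .getOrCreate()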

Parquet I/O Settings

For optimal performance when working with Parquet data use the following settings:

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

These minimise the amount of data read during queries.
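
For illustration, here is a sketch that applies these settings when building the session and then reads a hypothetical partitioned Parquet dataset from an object store; the bucket, path and column names are all illustrative:

import org.apache.spark.sql.SparkSession

// Sketch: apply the Parquet settings above at session construction time.
val spark = SparkSession.builder()
  .appName("ParquetOnObjectStore")
  .config("spark.hadoop.parquet.enable.summary-metadata", "false")
  .config("spark.sql.parquet.mergeSchema", "false")
  .config("spark.sql.parquet.filterPushdown", "true")
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .getOrCreate()

import spark.implicits._

// Hypothetical partitioned dataset; the filter on the partition column and
// the pushed-down predicate reduce the data read from the store.
val events = spark.read.parquet("s3a://my-bucket/events/")
events.filter($"year" === 2018 && $"value" > 100)
  .select("id", "value")
  .show()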

ORC I/O Settings

For best performance when working with ORC data, use these settings:

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

Again, these minimise the amount of data read during queries.

Spark Streaming and Object Storage

Spark Streaming can monitor files added to object stores by creating a FileInputDStream to watch a path in the store, via a call to StreamingContext.textFileStream().
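
A minimal sketch of such a stream follows; the bucket and path are hypothetical, and the long batch interval is a choice explained by the first point below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: watch an object store path for newly added text files.
val conf = new SparkConf().setAppName("ObjectStoreStreamExample")
val ssc = new StreamingContext(conf, Seconds(60))   // generous batch interval

val lines = ssc.textFileStream("s3a://my-bucket/incoming/")
lines.count().print()   // e.g. report how many new lines arrived per batch

ssc.start()
ssc.awaitTermination()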

  1. The time to scan for new files is proportional to the number of files under the path, not the number of new files, so it can become a slow operation. The streaming window (batch interval) needs to be large enough to handle this.

  2. Files only appear in an object store once they are completely written; there is no need for a workflow of write-then-rename to ensure that files aren’t picked up while they are still being written. Applications can write straight to the monitored directory.

  3. Streams should only be checkpointed to a store implementing a fast and atomic rename() operation. Otherwise, checkpointing may be slow and potentially unreliable.

Further Reading

Further details are available in the documentation for the standard connectors, both from Apache and from the cloud providers.