Monitoring and Instrumentation
There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation.
Web Interfaces
Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:
- A list of scheduler stages and tasks
- A summary of RDD sizes and memory usage
- Environmental information
- Information about the running executors
You can access this interface by simply opening http://<driver-node>:4040 in a web browser.
If multiple SparkContexts are running on the same host, they will bind to successive ports
beginning with 4040 (4041, 4042, etc).
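As a rough illustration of the port numbering described above, the following Python sketch lists the URLs to try when several SparkContexts share a host. The helper name `candidate_ui_urls` and the host name `driver-node` are hypothetical; only the starting port 4040 and the successive numbering come from the text.

```python
def candidate_ui_urls(host, contexts, base_port=4040):
    """Return the UI URLs to try for `contexts` SparkContexts on one host."""
    return [f"http://{host}:{base_port + i}" for i in range(contexts)]


# Three contexts on the same (hypothetical) driver host.
for url in candidate_ui_urls("driver-node", 3):
    print(url)
```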
Note that this information is only available for the duration of the application by default.
To view the web UI after the fact, set spark.eventLog.enabled to true before starting the
application. This configures Spark to log Spark events that encode the information displayed
in the UI to persisted storage.
Viewing After the Fact
It is still possible to construct the UI of an application through Spark’s history server, provided that the application’s event logs exist. You can start the history server by executing:
./sbin/start-history-server.sh
This creates a web interface at http://<server-url>:18080 by default, listing incomplete
and completed applications and attempts.
When using the file-system provider class (see spark.history.provider below), the base logging
directory must be supplied in the spark.history.fs.logDirectory configuration option,
and should contain sub-directories that each represent an application’s event logs.
The Spark jobs themselves must be configured to log events, and to log them to the same shared,
writable directory. For example, if the server was configured with a log directory of
hdfs://namenode/shared/spark-logs, then the client-side options would be:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
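The pairing between the client-side and server-side settings can be sketched in Python as follows. The functions `event_log_settings` and `render` are hypothetical helpers, not part of Spark; they only combine the property names quoted above so that applications and the history server point to the same directory.

```python
def event_log_settings(log_dir):
    """Build matching client-side and history-server properties for one log directory."""
    client = {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": log_dir,
    }
    server = {"spark.history.fs.logDirectory": log_dir}
    return client, server


def render(props):
    """Render properties in the whitespace-separated form shown above."""
    return "\n".join(f"{key} {value}" for key, value in props.items())


client, server = event_log_settings("hdfs://namenode/shared/spark-logs")
print(render(client))   # options for the applications
print(render(server))   # option for the history server
```

Deriving both sets of properties from a single value avoids the common mistake of writing event logs to one directory while the history server reads from another.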
The history server can be configured as follows:
Environment Variables
Environment Variable | Meaning |
---|---|
SPARK_DAEMON_MEMORY | Memory to allocate to the history server (default: 1g). |
SPARK_DAEMON_JAVA_OPTS | JVM options for the history server (default: none). |
SPARK_DAEMON_CLASSPATH | Classpath for the history server (default: none). |
SPARK_PUBLIC_DNS | The public address for the history server. If this is not set, links to application history may use the internal address of the server, resulting in broken links (default: none). |
SPARK_HISTORY_OPTS | spark.history.* configuration options for the history server (default: none). |
Spark History Server Configuration Options
Security options for the Spark History Server are covered in more detail in the Security page.
Property Name | Default | Meaning |
---|---|---|
spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider | Name of the class implementing the application history backend. Currently there is only one implementation, provided by Spark, which looks for application logs stored in the file system. |
spark.history.fs.logDirectory | file:/tmp/spark-events | For the filesystem history provider, the URL to the directory containing application event logs to load. This can be a local file:// path, an HDFS path hdfs://namenode/shared/spark-logs or that of an alternative filesystem supported by the Hadoop APIs. |
spark.history.fs.update.interval | 10s | The period at which the filesystem history provider checks for new or updated logs in the log directory. A shorter interval detects new applications faster, at the expense of more server load re-reading updated applications. As soon as an update has completed, listings of the completed and incomplete applications will reflect the changes. |
spark.history.retainedApplications | 50 | The number of applications to retain UI data for in the cache. If this cap is exceeded, then the oldest applications will be removed from the cache. If an application is not in the cache, it will have to be loaded from disk if it is accessed from the UI. |
spark.history.ui.maxApplications | Int.MaxValue | The number of applications to display on the history summary page. Application UIs are still available by accessing their URLs directly even if they are not displayed on the history summary page. |
spark.history.ui.port | 18080 | The port to which the web interface of the history server binds. |
spark.history.kerberos.enabled | false | Indicates whether the history server should use kerberos to login. This is required if the history server is accessing HDFS files on a secure Hadoop cluster. If this is true, it uses the configs spark.history.kerberos.principal and spark.history.kerberos.keytab. |
spark.history.kerberos.principal | (none) | Kerberos principal name for the History Server. |
spark.history.kerberos.keytab | (none) | Location of the kerberos keytab file for the History Server. |
spark.history.fs.cleaner.enabled | false | Specifies whether the History Server should periodically clean up event logs from storage. |
spark.history.fs.cleaner.interval | 1d | How often the filesystem job history cleaner checks for files to delete. Files are only deleted if they are older than spark.history.fs.cleaner.maxAge. |
spark.history.fs.cleaner.maxAge | 7d | Job history files older than this will be deleted when the filesystem history cleaner runs. |
spark.history.fs.endEventReparseChunkSize | 1m | How many bytes to parse at the end of log files looking for the end event. This is used to speed up generation of application listings by skipping unnecessary parts of event log files. It can be disabled by setting this config to 0. |
spark.history.fs.inProgressOptimization.enabled | true | Enable optimized handling of in-progress logs. This option may leave finished applications that fail to rename their event logs listed as in-progress. |
spark.history.fs.numReplayThreads | 25% of available cores | Number of threads that will be used by history server to process event logs. |
spark.history.store.maxDiskUsage | 10g | Maximum disk usage for the local directory where the cached application history information is stored. |
spark.history.store.path | (none) | Local directory where to cache application history data. If set, the history server will store application data on disk instead of keeping it in memory. The data written to disk will be re-used in the event of a history server restart. |
Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc.
Note
- The history server displays both completed and incomplete Spark jobs. If an application makes multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing incomplete attempt or the final successful attempt.
- Incomplete applications are only updated intermittently. The time between updates is defined by the interval between checks for changed files (spark.history.fs.update.interval). On larger clusters, the update interval may be set to large values. The way to view a running application is actually to view its own web UI.
- Applications which exited without registering themselves as completed will be listed as incomplete, even though they are no longer running. This can happen if an application crashes.
- One way to signal the completion of a Spark job is to stop the Spark Context explicitly (sc.stop()), or in Python using the with SparkContext() as sc: construct to handle the Spark Context setup and tear down.
REST API
In addition to viewing the metrics in the UI, they are also available as JSON. This gives developers
an easy way to create new visualizations and monitoring tools for Spark. The JSON is available both
for running applications and in the history server. The endpoints are mounted at /api/v1. For example,
for the history server, they would typically be accessible at http://<server-url>:18080/api/v1, and
for a running application, at http://localhost:4040/api/v1.
In the API, an application is referenced by its application ID, [app-id].
When running on YARN, each application may have multiple attempts, but there are attempt IDs
only for applications in cluster mode, not applications in client mode. Applications in YARN cluster mode
can be identified by their [attempt-id]. In the API listed below, when running in YARN cluster mode,
[app-id] will actually be [base-app-id]/[attempt-id], where [base-app-id] is the YARN application ID.
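The way [app-id] is composed can be sketched in Python as below. The helpers `app_path` and `api_url`, the host `history-host`, and the IDs `my-app-id` and `1` are all hypothetical placeholders; only the /api/v1 mount point and the [base-app-id]/[attempt-id] rule are taken from the text.

```python
def app_path(base_app_id, attempt_id=None):
    """Return the [app-id] path segment, adding the attempt ID when one is given."""
    if attempt_id is None:
        return base_app_id
    return f"{base_app_id}/{attempt_id}"


def api_url(root, app_id, *parts):
    """Join the API root, an application ID and any further path parts."""
    return "/".join([root.rstrip("/"), "applications", app_id, *parts])


root = "http://history-host:18080/api/v1"  # hypothetical history server address

# Client mode (no attempt ID) versus YARN cluster mode (attempt ID appended).
print(api_url(root, app_path("my-app-id"), "jobs"))
print(api_url(root, app_path("my-app-id", "1"), "jobs"))
```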
Endpoint | Meaning |
---|---|
/applications | A list of all applications.<br>?status=[completed\|running] list only applications in the chosen state.<br>?minDate=[date] earliest start date/time to list.<br>?maxDate=[date] latest start date/time to list.<br>?minEndDate=[date] earliest end date/time to list.<br>?maxEndDate=[date] latest end date/time to list.<br>?limit=[limit] limits the number of applications listed.<br>Examples:<br>?minDate=2015-02-10<br>?minDate=2015-02-03T16:42:40.000GMT<br>?maxDate=2015-02-11T20:41:30.000GMT<br>?minEndDate=2015-02-12<br>?minEndDate=2015-02-12T09:15:10.000GMT<br>?maxEndDate=2015-02-14T16:30:45.000GMT<br>?limit=10 |
/applications/[app-id]/jobs | A list of all jobs for a given application.<br>?status=[running\|succeeded\|failed\|unknown] list only jobs in the specific state. |
/applications/[app-id]/jobs/[job-id] | Details for the given job. |
/applications/[app-id]/stages | A list of all stages for a given application.<br>?status=[active\|complete\|pending\|failed] list only stages in the state. |
/applications/[app-id]/stages/[stage-id] | A list of all attempts for the given stage. |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] | Details for the given stage attempt. |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary | Summary metrics of all tasks in the given stage attempt.<br>?quantiles summarize the metrics with the given quantiles.<br>Example: ?quantiles=0.01,0.5,0.99 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | A list of all tasks for the given stage attempt.<br>?offset=[offset]&length=[len] list tasks in the given range.<br>?sortBy=[runtime\|-runtime] sort the tasks.<br>Example: ?offset=10&length=50&sortBy=runtime |
/applications/[app-id]/executors | A list of all active executors for the given application. |
/applications/[app-id]/executors/[executor-id]/threads | Stack traces of all the threads running within the given active executor. Not available via the history server. |
/applications/[app-id]/allexecutors | A list of all (active and dead) executors for the given application. |
/applications/[app-id]/storage/rdd | A list of stored RDDs for the given application. |
/applications/[app-id]/storage/rdd/[rdd-id] | Details for the storage status of a given RDD. |
/applications/[base-app-id]/logs | Download the event logs for all attempts of the given application as files within a zip file. |
/applications/[base-app-id]/[attempt-id]/logs | Download the event logs for a specific application attempt as a zip file. |
/applications/[app-id]/streaming/statistics | Statistics for the streaming context. |
/applications/[app-id]/streaming/receivers | A list of all streaming receivers. |
/applications/[app-id]/streaming/receivers/[stream-id] | Details of the given receiver. |
/applications/[app-id]/streaming/batches | A list of all retained batches. |
/applications/[app-id]/streaming/batches/[batch-id] | Details of the given batch. |
/applications/[app-id]/streaming/batches/[batch-id]/operations | A list of all output operations of the given batch. |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] | Details of the given operation and given batch. |
/applications/[app-id]/environment | Environment details of the given application. |
/version | Get the current Spark version. |
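The query parameters listed in the table can be attached to an endpoint path with the Python standard library, as in this minimal sketch. The helper `with_query` and the IDs in the second path (`my-app-id`, stage `3`, attempt `0`) are hypothetical; the parameter names and example values are the ones shown in the table.

```python
from urllib.parse import urlencode


def with_query(path, **params):
    """Append the given query parameters to an endpoint path, skipping unset ones."""
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"{path}?{query}" if query else path


# Completed applications started on or after a date, at most ten of them.
print(with_query("/applications", status="completed", minDate="2015-02-10", limit=10))

# A page of tasks from one stage attempt, sorted by runtime.
print(with_query("/applications/my-app-id/stages/3/0/taskList",
                 offset=10, length=50, sortBy="runtime"))
```

The resulting path is appended to the /api/v1 root of either a running application or the history server.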
The number of jobs and stages which can be retrieved is constrained by the same retention
mechanism of the standalone Spark UI; spark.ui.retainedJobs defines the threshold
value triggering garbage collection on jobs, and spark.ui.retainedStages that for stages.
Note that the garbage collection takes place on playback: it is possible to retrieve
more entries by increasing these values and restarting the history server.
Executor Task Metrics
The REST API exposes the values of the Task Metrics collected by Spark executors with the granularity of task execution. The metrics can be used for performance troubleshooting and workload characterization. A list of the available metrics, with a short description:
Spark Executor Task Metric name | Short description |
---|---|
executorRunTime | Elapsed time the executor spent running this task. This includes time fetching shuffle data. The value is expressed in milliseconds. |
executorCpuTime | CPU time the executor spent running this task. This includes time fetching shuffle data. The value is expressed in nanoseconds. |
executorDeserializeTime | Elapsed time spent to deserialize this task. The value is expressed in milliseconds. |
executorDeserializeCpuTime | CPU time taken on the executor to deserialize this task. The value is expressed in nanoseconds. |
resultSize | The number of bytes this task transmitted back to the driver as the TaskResult. |
jvmGCTime | Elapsed time the JVM spent in garbage collection while executing this task. The value is expressed in milliseconds. |
resultSerializationTime | Elapsed time spent serializing the task result. The value is expressed in milliseconds. |
memoryBytesSpilled | The number of in-memory bytes spilled by this task. |
diskBytesSpilled | The number of on-disk bytes spilled by this task. |
peakExecutionMemory | Peak memory used by internal data structures created during shuffles, aggregations and joins. The value of this accumulator should be approximately the sum of the peak sizes across all such data structures created in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. |
inputMetrics.* | Metrics related to reading data from org.apache.spark.rdd.HadoopRDD or from persisted data. |
.bytesRead | Total number of bytes read. |
.recordsRead | Total number of records read. |
outputMetrics.* | Metrics related to writing data externally (e.g. to a distributed filesystem), defined only in tasks with output. |
.bytesWritten | Total number of bytes written |
.recordsWritten | Total number of records written |
shuffleReadMetrics.* | Metrics related to shuffle read operations. |
.recordsRead | Number of records read in shuffle operations |
.remoteBlocksFetched | Number of remote blocks fetched in shuffle operations |
.localBlocksFetched | Number of local (as opposed to read from a remote executor) blocks fetched in shuffle operations |
.totalBlocksFetched | Number of blocks fetched in shuffle operations (both local and remote) |
.remoteBytesRead | Number of remote bytes read in shuffle operations |
.localBytesRead | Number of bytes read in shuffle operations from local disk (as opposed to read from a remote executor) |
.totalBytesRead | Number of bytes read in shuffle operations (both local and remote) |
.remoteBytesReadToDisk | Number of remote bytes read to disk in shuffle operations. Large blocks are fetched to disk in shuffle read operations, as opposed to being read into memory, which is the default behavior. |
.fetchWaitTime | Time the task spent waiting for remote shuffle blocks. This only includes the time blocking on shuffle input data. For instance if block B is being fetched while the task is still not finished processing block A, it is not considered to be blocking on block B. The value is expressed in milliseconds. |
shuffleWriteMetrics.* | Metrics related to operations writing shuffle data. |
.bytesWritten | Number of bytes written in shuffle operations |
.recordsWritten | Number of records written in shuffle operations |
.writeTime | Time spent blocking on writes to disk or buffer cache. The value is expressed in nanoseconds. |
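Because the table mixes milliseconds and nanoseconds, values must be converted to a common unit before they are compared. The Python sketch below derives two ratios from a single task's metrics. It assumes the metrics have already been parsed into a dictionary keyed by the names in the table; the function names and the sample values are invented for illustration.

```python
def cpu_utilization(metrics):
    """Fraction of the task's elapsed run time that was spent on the CPU."""
    run_ms = metrics["executorRunTime"]        # milliseconds
    cpu_ms = metrics["executorCpuTime"] / 1e6  # nanoseconds -> milliseconds
    return cpu_ms / run_ms if run_ms else 0.0


def gc_fraction(metrics):
    """Fraction of the task's elapsed run time spent in JVM garbage collection."""
    run_ms = metrics["executorRunTime"]        # milliseconds
    return metrics["jvmGCTime"] / run_ms if run_ms else 0.0


# Made-up sample: 2 s of run time, 1.5 s of CPU time, 100 ms of GC.
sample = {
    "executorRunTime": 2000,
    "executorCpuTime": 1_500_000_000,
    "jvmGCTime": 100,
}
print(cpu_utilization(sample))
print(gc_fraction(sample))
```

A low CPU fraction suggests the task spent much of its time waiting, for example on I/O or shuffle fetches, while a high GC fraction points to memory pressure.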
API Versioning Policy
These endpoints have been strongly versioned to make it easier to develop applications on top. In particular, Spark guarantees:
- Endpoints will never be removed from one version
- Individual fields will never be removed for any given endpoint
- New endpoints may be added
- New fields may be added to existing endpoints
- New versions of the API may be added in the future as a separate endpoint (e.g., api/v2). New versions are not required to be backwards compatible.
- API versions may be dropped, but only after at least one minor release of co-existing with a new API version.
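One practical consequence of these guarantees is that a client should read only the fields it needs and tolerate fields it does not recognize, since new ones may appear. A minimal Python sketch of that habit follows; the helper `pick_fields` and the sample record (including `someNewField`) are illustrative assumptions, not actual API output.

```python
def pick_fields(record, wanted):
    """Keep only the fields a client relies on, ignoring any it does not know."""
    return {name: record.get(name) for name in wanted}


# Hypothetical response record; "someNewField" stands in for a field added later.
record = {"jobId": 0, "status": "SUCCEEDED", "someNewField": 1}
print(pick_fields(record, ["jobId", "status"]))
```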
Note that even when examining the UI of running applications, the applications/[app-id] portion is
still required, though there is only one application available. For example, to see the list of jobs for the
running app, you would go to http://localhost:4040/api/v1/applications/[app-id]/jobs. This is to
keep the paths consistent in both modes.
Metrics
Spark has a configurable metrics system based on the Dropwizard Metrics Library.
This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV
files. The metrics system is configured via a configuration file that Spark expects to be present
at $SPARK_HOME/conf/metrics.properties. A custom file location can be specified via the
spark.metrics.conf configuration property.
By default, the root namespace used for driver or executor metrics is
the value of spark.app.id. However, users often want to track the metrics
across apps for driver and executors, which is hard to do with the application ID
(i.e. spark.app.id) since it changes with every invocation of the app. For such use cases,
a custom namespace can be specified for metrics reporting using the spark.metrics.namespace
configuration property.
If, say, users wanted to set the metrics namespace to the name of the application, they
can set the spark.metrics.namespace property to a value like ${spark.app.name}. This value is
then expanded appropriately by Spark and is used as the root namespace of the metrics system.
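The effect of the expansion can be pictured with the following Python sketch. It is a simplified stand-in written for illustration and is not Spark's implementation; the function `expand` and the application name `nightly-etl` are assumptions.

```python
import re


def expand(value, conf):
    """Replace each ${key} with conf[key]; references to unknown keys are left as-is."""
    return re.sub(r"\$\{([^}]+)\}",
                  lambda m: conf.get(m.group(1), m.group(0)),
                  value)


conf = {
    "spark.app.name": "nightly-etl",              # hypothetical application name
    "spark.metrics.namespace": "${spark.app.name}",
}
print(expand(conf["spark.metrics.namespace"], conf))
```

With this setting, metrics from every run of the application share one root namespace instead of a new one per application ID.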
Non-driver and executor metrics are never prefixed with spark.app.id, nor does the
spark.metrics.namespace property have any such effect on such metrics.
Spark’s metrics are decoupled into different instances corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported:
- master: The Spark standalone master process.
- applications: A component within the master which reports on various applications.
- worker: A Spark standalone worker process.
- executor: A Spark executor.
- driver: The Spark driver process (the process in which your SparkContext is created).
- shuffleService: The Spark shuffle service.
- applicationMaster: The Spark ApplicationMaster when running on YARN.
Each instance can report to zero or more sinks. Sinks are contained in the
org.apache.spark.metrics.sink package:
- ConsoleSink: Logs metrics information to the console.
- CSVSink: Exports metrics data to CSV files at regular intervals.
- JmxSink: Registers metrics for viewing in a JMX console.
- MetricsServlet: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
- GraphiteSink: Sends metrics to a Graphite node.
- Slf4jSink: Sends metrics to slf4j as log entries.
- StatsdSink: Sends metrics to a StatsD node.
Spark also supports a Ganglia sink which is not included in the default build due to licensing restrictions:
- GangliaSink: Sends metrics to a Ganglia node or multicast group.
To install the GangliaSink you’ll need to perform a custom build of Spark. Note that
by embedding this library you will include LGPL-licensed
code in your Spark package. For sbt users, set the
SPARK_GANGLIA_LGPL environment variable before building. For Maven users, enable
the -Pspark-ganglia-lgpl profile. In addition to modifying the cluster’s Spark build,
user applications will need to link to the spark-ganglia-lgpl artifact.
The syntax of the metrics configuration file is defined in an example configuration file,
$SPARK_HOME/conf/metrics.properties.template.
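To give a feel for what such a file contains, the Python sketch below renders the lines that would attach a ConsoleSink to the driver instance. It assumes the `[instance].sink.[name].[option]=[value]` key layout and the `period` and `unit` options described in the template file; check the template shipped with your Spark version before relying on them. The helper `sink_lines` and the sink name `console` are illustrative.

```python
def sink_lines(instance, sink_name, sink_class, **options):
    """Render metrics.properties lines that attach one sink to one instance."""
    prefix = f"{instance}.sink.{sink_name}"
    lines = [f"{prefix}.class={sink_class}"]
    lines += [f"{prefix}.{key}={value}" for key, value in options.items()]
    return lines


# Assumed options: report every 10 seconds.
lines = sink_lines(
    "driver",
    "console",
    "org.apache.spark.metrics.sink.ConsoleSink",
    period=10,
    unit="seconds",
)
print("\n".join(lines))
```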
Advanced Instrumentation
Several external tools can be used to help profile the performance of Spark jobs:
- Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound.
- OS profiling tools such as dstat, iostat, and iotop can provide fine-grained profiling on individual nodes.
- JVM utilities such as jstack for providing stack traces, jmap for creating heap-dumps, jstat for reporting time-series statistics and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals.