pyspark.streaming module¶
Module contents¶
class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)[source]¶
Bases: object
Main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster, and can be used to create DStreams from various input sources. It can be created from an existing SparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.
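For example, a minimal lifecycle sketch (the local master, application name, and one-second batch interval are illustrative choices, not requirements):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StreamingExample")   # illustrative master/app name
ssc = StreamingContext(sc, 1)                       # 1-second batch interval
# ... define input DStreams and transformations here ...
ssc.start()              # start the streaming computation
ssc.awaitTermination()   # block until stop() is called or an error occurs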
addStreamingListener(streamingListener)[source]¶
Add a StreamingListener object (org.apache.spark.streaming.scheduler.StreamingListener) for receiving system events related to streaming.
awaitTermination(timeout=None)[source]¶
Wait for the execution to stop.
- Parameters
timeout – time to wait in seconds
awaitTerminationOrTimeout(timeout)[source]¶
Wait for the execution to stop. Return true if it is stopped, throw the reported error if one occurred during the execution, or return false if the waiting time elapsed before returning from the method.
- Parameters
timeout – time to wait in seconds
binaryRecordsStream(directory, recordLength)[source]¶
Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as flat binary files with records of fixed length. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
- Parameters
directory – Directory to load data from
recordLength – Length of each record in bytes
checkpoint(directory)[source]¶
Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. The graph will be checkpointed every batch interval.
- Parameters
directory – HDFS-compatible directory where the checkpoint data will be reliably stored
classmethod getActive()[source]¶
Return either the currently active StreamingContext (i.e., if there is a context started but not stopped) or None.
classmethod getActiveOrCreate(checkpointPath, setupFunc)[source]¶
Either return the active StreamingContext (i.e. currently started but not stopped), or recreate a StreamingContext from checkpoint data, or create a new StreamingContext using the provided setupFunc function. If the checkpointPath is None or does not contain valid checkpoint data, then setupFunc will be called to create a new context and set up DStreams.
- Parameters
checkpointPath – Checkpoint directory used in an earlier streaming program. Can be None if the intention is to always create a new context when there is no active context.
setupFunc – Function to create a new JavaStreamingContext and setup DStreams
classmethod getOrCreate(checkpointPath, setupFunc)[source]¶
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the provided setupFunc will be used to create a new context.
- Parameters
checkpointPath – Checkpoint directory used in an earlier streaming program
setupFunc – Function to create a new context and setup DStreams
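For example, a minimal getOrCreate() sketch; the checkpoint path is a placeholder and the DStream definitions are elided:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/tmp/streaming-checkpoint"   # placeholder path

def create_context():
    sc = SparkContext("local[2]", "RecoverableApp")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(CHECKPOINT_DIR)
    # ... define DStreams and output operations here ...
    return ssc

# Recover from checkpoint data if present, otherwise call create_context().
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()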
queueStream(rdds, oneAtATime=True, default=None)[source]¶
Create an input stream from a queue of RDDs or a list. In each batch, it will process either one or all of the RDDs returned by the queue.
Note
Changes to the queue after the stream is created will not be recognized.
- Parameters
rdds – Queue of RDDs
oneAtATime – whether to process one RDD per batch (True) or all queued RDDs at once (False)
default – the default RDD to process if the queue is empty
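For example, a minimal queueStream() sketch for local testing (the values are illustrative); each one-second batch consumes one RDD from the queue:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "QueueStreamExample"), 1)
rdd_queue = [ssc.sparkContext.parallelize(range(i * 10, (i + 1) * 10))
             for i in range(5)]
stream = ssc.queueStream(rdd_queue, oneAtATime=True)
stream.map(lambda x: x * 2).pprint()
ssc.start()
ssc.awaitTerminationOrTimeout(10)   # run for a few batches, then stop
ssc.stop()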
remember(duration)[source]¶
Set each DStream in this context to remember the RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration and release them for garbage collection afterwards. This method allows the developer to specify how long to remember the RDDs (if the developer wishes to query old data outside the DStream computation).
- Parameters
duration – Minimum duration (in seconds) that each DStream should remember its RDDs
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))[source]¶
Create an input stream from a TCP source hostname:port. Data is received using a TCP socket, and the received bytes are interpreted as UTF8-encoded, \n-delimited lines.
- Parameters
hostname – Hostname to connect to for receiving data
port – Port to connect to for receiving data
storageLevel – Storage level to use for storing the received objects
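For example, the classic network word count over a TCP text source; the hostname and port are placeholders (pair it with something like nc -lk 9999):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "NetworkWordCount"), 1)
lines = ssc.socketTextStream("localhost", 9999)   # placeholder host/port
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()
ssc.start()
ssc.awaitTermination()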
property sparkContext¶
Return SparkContext which is associated with this StreamingContext.
stop(stopSparkContext=True, stopGraceFully=False)[source]¶
Stop the execution of the streams, with the option of ensuring that all received data has been processed.
- Parameters
stopSparkContext – Stop the associated SparkContext or not
stopGraceFully – Stop gracefully by waiting for the processing of all received data to be completed
textFileStream(directory)[source]¶
Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
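For example, a minimal textFileStream() sketch; the monitored directory is a placeholder and must live on a Hadoop-compatible file system:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "FileStreamExample"), 30)
lines = ssc.textFileStream("hdfs:///data/incoming")   # placeholder directory
lines.count().pprint()   # count the lines of files that appear in each batch
ssc.start()
ssc.awaitTermination()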
class pyspark.streaming.DStream(jdstream, ssc, jrdd_deserializer)[source]¶
Bases: object
A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext, or they can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates an RDD, either from live data or by transforming the RDD generated by a parent DStream.
Internally, a DStream is characterized by a few basic properties:
A list of other DStreams that the DStream depends on
A time interval at which the DStream generates an RDD
A function that is used to generate an RDD after each time interval
checkpoint(interval)[source]¶
Enable periodic checkpointing of RDDs of this DStream.
- Parameters
interval – time in seconds; each generated RDD will be checkpointed after this period
cogroup(other, numPartitions=None)[source]¶
Return a new DStream by applying ‘cogroup’ between RDDs of this DStream and other DStream.
Hash partitioning is used to generate the RDDs with numPartitions partitions.
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)[source]¶
Return a new DStream by applying combineByKey to each RDD.
count()[source]¶
Return a new DStream in which each RDD has a single element generated by counting each RDD of this DStream.
countByValue()[source]¶
Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream.
countByValueAndWindow(windowDuration, slideDuration, numPartitions=None)[source]¶
Return a new DStream in which each RDD contains the count of distinct elements in RDDs in a sliding window over this DStream.
- Parameters
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
numPartitions – number of partitions of each RDD in the new DStream.
countByWindow(windowDuration, slideDuration)[source]¶
Return a new DStream in which each RDD has a single element generated by counting the number of elements in a window over this DStream. windowDuration and slideDuration are as defined in the window() operation.
This is equivalent to window(windowDuration, slideDuration).count(), but will be more efficient if window is large.
flatMap(f, preservesPartitioning=False)[source]¶
Return a new DStream by applying a function to all elements of this DStream, and then flattening the results.
flatMapValues(f)[source]¶
Return a new DStream by applying a flatmap function to the value of each key-value pair in this DStream without changing the key.
fullOuterJoin(other, numPartitions=None)[source]¶
Return a new DStream by applying ‘full outer join’ between RDDs of this DStream and other DStream.
Hash partitioning is used to generate the RDDs with numPartitions partitions.
glom()[source]¶
Return a new DStream in which each RDD is generated by applying glom() to each RDD of this DStream.
groupByKeyAndWindow(windowDuration, slideDuration, numPartitions=None)[source]¶
Return a new DStream by applying groupByKey over a sliding window. Similar to DStream.groupByKey(), but applies it over a sliding window.
- Parameters
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
numPartitions – Number of partitions of each RDD in the new DStream.
join(other, numPartitions=None)[source]¶
Return a new DStream by applying ‘join’ between RDDs of this DStream and other DStream.
Hash partitioning is used to generate the RDDs with numPartitions partitions.
leftOuterJoin(other, numPartitions=None)[source]¶
Return a new DStream by applying ‘left outer join’ between RDDs of this DStream and other DStream.
Hash partitioning is used to generate the RDDs with numPartitions partitions.
map(f, preservesPartitioning=False)[source]¶
Return a new DStream by applying a function to each element of DStream.
mapPartitions(f, preservesPartitioning=False)[source]¶
Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD of this DStream.
mapPartitionsWithIndex(f, preservesPartitioning=False)[source]¶
Return a new DStream in which each RDD is generated by applying mapPartitionsWithIndex() to each RDD of this DStream.
mapValues(f)[source]¶
Return a new DStream by applying a map function to the value of each key-value pair in this DStream without changing the key.
partitionBy(numPartitions, partitionFunc=<function portable_hash>)[source]¶
Return a copy of the DStream in which each RDD is partitioned using the specified partitioner.
pprint(num=10)[source]¶
Print the first num elements of each RDD generated in this DStream.
- Parameters
num – the number of elements to print from the beginning of each RDD
reduce(func)[source]¶
Return a new DStream in which each RDD has a single element generated by reducing each RDD of this DStream.
reduceByKey(func, numPartitions=None)[source]¶
Return a new DStream by applying reduceByKey to each RDD.
reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration=None, numPartitions=None, filterFunc=None)[source]¶
Return a new DStream by applying incremental reduceByKey over a sliding window.
The reduced value of a new window is calculated using the old window’s reduced value:
reduce the new values that entered the window (e.g., adding new counts)
“inverse reduce” the old values that left the window (e.g., subtracting old counts)
invFunc can be None; in that case, all the RDDs in the window are re-reduced, which can be slower than providing invFunc.
- Parameters
func – associative and commutative reduce function
invFunc – inverse function of reduceFunc
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
numPartitions – number of partitions of each RDD in the new DStream.
filterFunc – function to filter expired key-value pairs; only pairs that satisfy the function are retained. Set this to None if you do not want to filter.
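For example, a sketch of an incrementally maintained windowed word count (a 30-second window sliding every 10 seconds; the socket source and checkpoint path are placeholders). The incremental form with invFunc requires checkpointing to be enabled:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "WindowedCounts"), 1)
ssc.checkpoint("/tmp/window-checkpoint")   # placeholder checkpoint directory

pairs = (ssc.socketTextStream("localhost", 9999)
            .flatMap(lambda line: line.split(" "))
            .map(lambda word: (word, 1)))
windowed = pairs.reduceByKeyAndWindow(
    func=lambda a, b: a + b,       # add counts entering the window
    invFunc=lambda a, b: a - b,    # subtract counts leaving the window
    windowDuration=30,
    slideDuration=10)
windowed.pprint()
ssc.start()
ssc.awaitTermination()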
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)[source]¶
Return a new DStream in which each RDD has a single element generated by reducing all elements in a sliding window over this DStream.
If invReduceFunc is not None, the reduction is done incrementally using the old window’s reduced value:
1. reduce the new values that entered the window (e.g., adding new counts)
2. “inverse reduce” the old values that left the window (e.g., subtracting old counts)
This is more efficient than when invReduceFunc is None.
- Parameters
reduceFunc – associative and commutative reduce function
invReduceFunc – inverse reduce function of reduceFunc; such that for all y, and invertible x: invReduceFunc(reduceFunc(x, y), x) = y
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
repartition(numPartitions)[source]¶
Return a new DStream with an increased or decreased level of parallelism.
rightOuterJoin(other, numPartitions=None)[source]¶
Return a new DStream by applying ‘right outer join’ between RDDs of this DStream and other DStream.
Hash partitioning is used to generate the RDDs with numPartitions partitions.
saveAsTextFiles(prefix, suffix=None)[source]¶
Save each RDD in this DStream as a text file, using the string representation of elements.
slice(begin, end)[source]¶
Return all the RDDs between ‘begin’ and ‘end’ (both inclusive).
begin and end can be datetime.datetime() objects or unix timestamps.
transform(func)[source]¶
Return a new DStream in which each RDD is generated by applying a function on each RDD of this DStream.
func can have one argument of rdd, or have two arguments of (time, rdd)
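For example, a sketch of the two-argument (time, rdd) form of transform(); the per-batch logic (sorting by key) and the socket source are illustrative:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "TransformExample"), 1)
pairs = ssc.socketTextStream("localhost", 9999).map(lambda line: (line, 1))

def sort_each_batch(time, rdd):
    # Arbitrary RDD-to-RDD logic applied to every batch.
    return rdd.sortByKey()

pairs.transform(sort_each_batch).pprint()
ssc.start()
ssc.awaitTermination()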
transformWith(func, other, keepSerializer=False)[source]¶
Return a new DStream in which each RDD is generated by applying a function on each RDD of this DStream and ‘other’ DStream.
func can have two arguments of (rdd_a, rdd_b) or have three arguments of (time, rdd_a, rdd_b)
union(other)[source]¶
Return a new DStream by unifying data of another DStream with this DStream.
- Parameters
other – Another DStream having the same interval (i.e., slideDuration) as this DStream.
updateStateByKey(updateFunc, numPartitions=None, initialRDD=None)[source]¶
Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key.
- Parameters
updateFunc – State update function. If this function returns None, then corresponding state key-value pair will be eliminated.
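For example, a sketch of a running (stateful) word count; updateStateByKey() requires a checkpoint directory, and the socket source and path are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "StatefulCounts"), 1)
ssc.checkpoint("/tmp/state-checkpoint")   # placeholder checkpoint directory

def update(new_values, last_state):
    # Sum the new counts for the key onto the previous state (None on first use).
    return sum(new_values) + (last_state or 0)

counts = (ssc.socketTextStream("localhost", 9999)
             .flatMap(lambda line: line.split(" "))
             .map(lambda word: (word, 1))
             .updateStateByKey(update))
counts.pprint()
ssc.start()
ssc.awaitTermination()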
window(windowDuration, slideDuration=None)[source]¶
Return a new DStream in which each RDD contains all the elements seen in a sliding window of time over this DStream.
- Parameters
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
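For example, a sketch of a 30-second window sliding every 10 seconds; both durations are multiples of the 1-second batch interval used here, and the socket source is a placeholder:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ssc = StreamingContext(SparkContext("local[2]", "WindowExample"), 1)
lines = ssc.socketTextStream("localhost", 9999)   # placeholder host/port
windowed_lines = lines.window(windowDuration=30, slideDuration=10)
windowed_lines.count().pprint()   # size of each window as it slides
ssc.start()
ssc.awaitTermination()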
class pyspark.streaming.StreamingListener[source]¶
Bases: object
class Java[source]¶
Bases: object
implements = ['org.apache.spark.streaming.api.java.PythonStreamingListener']¶
onBatchSubmitted(batchSubmitted)[source]¶
Called when a batch of jobs has been submitted for processing.
onOutputOperationCompleted(outputOperationCompleted)[source]¶
Called when processing of a job of a batch has completed.
pyspark.streaming.kafka module¶
class pyspark.streaming.kafka.Broker(host, port)[source]¶
Bases: object
Represent the host and port info for a Kafka broker.
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
class pyspark.streaming.kafka.KafkaMessageAndMetadata(topic, partition, offset, key, message)[source]¶
Bases: object
Kafka message and metadata information, including topic, partition, offset and message.
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
property key¶
property message¶
class pyspark.streaming.kafka.KafkaUtils[source]¶
Bases: object
static createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>, messageHandler=None)[source]¶
Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka in each batch duration and processes them without storing them.
This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see
To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- Parameters
ssc – StreamingContext object.
topics – list of topic_name to consume.
kafkaParams – Additional params for Kafka.
fromOffsets – Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream (a dictionary mapping TopicAndPartition to integers).
keyDecoder – A function used to decode key (default is utf8_decoder).
valueDecoder – A function used to decode value (default is utf8_decoder).
messageHandler – A function used to convert KafkaMessageAndMetadata. You can access the metadata using messageHandler (default is None).
- Returns
A DStream object
Note
Experimental
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
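For example, a sketch of the direct (receiverless) Kafka 0.8 stream; the broker address and topic are placeholders, and it assumes the matching spark-streaming-kafka-0-8 artifact is on the classpath. This API is deprecated as noted above:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(SparkContext("local[2]", "KafkaDirectExample"), 5)
stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["events"],                                       # placeholder topic
    kafkaParams={"metadata.broker.list": "broker1:9092"})    # placeholder broker
stream.map(lambda kv: kv[1]).pprint()   # records arrive as (key, value) pairs
ssc.start()
ssc.awaitTermination()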
static createRDD(sc, kafkaParams, offsetRanges, leaders=None, keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>, messageHandler=None)[source]¶
Create an RDD from Kafka using offset ranges for each topic and partition.
- Parameters
sc – SparkContext object
kafkaParams – Additional params for Kafka
offsetRanges – list of offsetRange to specify topic:partition:[start, end) to consume
leaders – Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
keyDecoder – A function used to decode key (default is utf8_decoder)
valueDecoder – A function used to decode value (default is utf8_decoder)
messageHandler – A function used to convert KafkaMessageAndMetadata. You can access the metadata using messageHandler (default is None).
- Returns
An RDD object
Note
Experimental
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
static createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, storageLevel=StorageLevel(True, True, False, False, 2), keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>)[source]¶
Create an input stream that pulls messages from a Kafka Broker.
- Parameters
ssc – StreamingContext object
zkQuorum – Zookeeper quorum (hostname:port,hostname:port,..).
groupId – The group id for this consumer.
topics – Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
kafkaParams – Additional params for Kafka
storageLevel – RDD storage level.
keyDecoder – A function used to decode key (default is utf8_decoder)
valueDecoder – A function used to decode value (default is utf8_decoder)
- Returns
A DStream object
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
class pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)[source]¶
Bases: object
Represents a range of offsets from a single Kafka TopicAndPartition.
Note
Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. See SPARK-21893.
pyspark.streaming.kinesis module¶
class pyspark.streaming.kinesis.KinesisUtils[source]¶
Bases: object
static createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, checkpointInterval, storageLevel=StorageLevel(True, True, False, False, 2), awsAccessKeyId=None, awsSecretKey=None, decoder=<function utf8_decoder>, stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None)[source]¶
Create an input stream that pulls messages from a Kinesis stream. This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
Note
The given AWS credentials will get saved in DStream checkpoints if checkpointing is enabled. Make sure that your checkpoint directory is secure.
- Parameters
ssc – StreamingContext object
kinesisAppName – Kinesis application name used by the Kinesis Client Library (KCL) to update DynamoDB
streamName – Kinesis stream name
endpointUrl – Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
regionName – Name of region used by the Kinesis Client Library (KCL) to update DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
initialPositionInStream – In the absence of Kinesis checkpoint info, this is the worker’s initial starting position in the stream. The values are either the beginning of the stream per Kinesis’ limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream (InitialPositionInStream.LATEST).
checkpointInterval – Checkpoint interval for Kinesis checkpointing. See the Kinesis Spark Streaming documentation for more details on the different types of checkpoints.
storageLevel – Storage level to use for storing the received objects (default is StorageLevel.MEMORY_AND_DISK_2)
awsAccessKeyId – AWS AccessKeyId (default is None. If None, will use DefaultAWSCredentialsProviderChain)
awsSecretKey – AWS SecretKey (default is None. If None, will use DefaultAWSCredentialsProviderChain)
decoder – A function used to decode value (default is utf8_decoder)
stsAssumeRoleArn – ARN of IAM role to assume when using STS sessions to read from the Kinesis stream (default is None).
stsSessionName – Name to uniquely identify STS sessions used to read from Kinesis stream, if STS is being used (default is None).
stsExternalId – External ID that can be used to validate against the assumed IAM role’s trust policy, if STS is being used (default is None).
- Returns
A DStream object
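For example, a sketch of a Kinesis source; the application name, stream name, endpoint, and region are placeholders, it assumes the spark-streaming-kinesis-asl artifact is available, and credentials fall back to the default AWS provider chain when awsAccessKeyId and awsSecretKey are left as None:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

ssc = StreamingContext(SparkContext("local[2]", "KinesisExample"), 10)
stream = KinesisUtils.createStream(
    ssc,
    kinesisAppName="my-kcl-app",                  # placeholder KCL app name
    streamName="my-kinesis-stream",               # placeholder stream name
    endpointUrl="https://kinesis.us-east-1.amazonaws.com",
    regionName="us-east-1",
    initialPositionInStream=InitialPositionInStream.LATEST,
    checkpointInterval=10)
stream.pprint()
ssc.start()
ssc.awaitTermination()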
pyspark.streaming.flume module¶
class pyspark.streaming.flume.FlumeUtils[source]¶
Bases: object
static createPollingStream(ssc, addresses, storageLevel=StorageLevel(True, True, False, False, 2), maxBatchSize=1000, parallelism=5, bodyDecoder=<function utf8_decoder>)[source]¶
Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. This stream will poll the sink for data and will pull events as they are available.
- Parameters
ssc – StreamingContext object
addresses – List of (host, port)s on which the Spark Sink is running.
storageLevel – Storage level to use for storing the received objects
maxBatchSize – The maximum number of events to be pulled from the Spark sink in a single RPC call
parallelism – Number of concurrent requests this stream should send to the sink. Note that having a higher number of requests concurrently being pulled will result in this stream using more threads
bodyDecoder – A function used to decode body (default is utf8_decoder)
- Returns
A DStream object
Note
Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. See SPARK-22142.
static createStream(ssc, hostname, port, storageLevel=StorageLevel(True, True, False, False, 2), enableDecompression=False, bodyDecoder=<function utf8_decoder>)[source]¶
Create an input stream that pulls events from Flume.
- Parameters
ssc – StreamingContext object
hostname – Hostname of the slave machine to which the flume data will be sent
port – Port of the slave machine to which the flume data will be sent
storageLevel – Storage level to use for storing the received objects
enableDecompression – Whether the Netty server should decompress the input stream
bodyDecoder – A function used to decode body (default is utf8_decoder)
- Returns
A DStream object
Note
Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. See SPARK-22142.
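For example, a sketch of the push-based Flume receiver; the hostname and port are placeholders for the address the Flume agent sends events to, and it assumes the matching spark-streaming-flume artifact is on the classpath. This API is deprecated as noted above:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

ssc = StreamingContext(SparkContext("local[2]", "FlumeExample"), 5)
events = FlumeUtils.createStream(ssc, "localhost", 41414)   # placeholder address
events.map(lambda event: event[1]).pprint()   # events arrive as (headers, body) pairs
ssc.start()
ssc.awaitTermination()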