Streaming

Warning

This is a new addition to the API (March 2017) and should be used with care.

StreamingContext

class pysparkling.streaming.StreamingContext(sparkContext, batchDuration=None)[source]

Stream processing.

Parameters:
  • sparkContext (pysparkling.Context) – A pysparkling.Context.
  • batchDuration (float) – Duration in seconds per batch.
sparkContext

Return the pysparkling.Context associated with this StreamingContext.

awaitTermination(timeout=None)[source]

Wait for context to stop.

Parameters:timeout (float) – in seconds
awaitTerminationOrTimeout(timeout)[source]

Provided for compatibility. Same as awaitTermination() here.

binaryRecordsStream(directory, recordLength=None, process_all=False)[source]

Monitor a directory and process all binary files.

File names starting with . are ignored.

Parameters:
  • directory (string) – a path
  • recordLength – None, int or struct format string
  • process_all (bool) – whether to process pre-existing files
Return type:

DStream

Warning

Only an int recordLength is supported in the PySpark API. The process_all parameter does not exist in the PySpark API.

queueStream(rdds, oneAtATime=True, default=None)[source]

Create a stream from an iterable of RDDs or lists.

Parameters:
  • rdds – Iterable over RDDs or lists.
  • oneAtATime – Process one RDD per batch or all of them at once.
  • default – If no more RDDs in rdds, return this. Can be None.
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.39)
[4]
[2]
[7]

Example testing the default value:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2]], default=['placeholder'])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[4]
[2]
['placeholder']
remember(duration)[source]

Provided for compatibility, but does nothing here.

socketBinaryStream(hostname, port, length)[source]

Create a TCP socket server for binary input.

Warning

This is not part of the PySpark API.

Parameters:
  • hostname (string) – Hostname of TCP server.
  • port (int) – Port of TCP server.
  • length

    Message length. Length in bytes or a format string for struct.unpack().

    For variable length messages where the message length is sent right before the message itself, length is a format string that can be passed to struct.unpack(). For example, use length='<I' for a little-endian (standard on x86) 32-bit unsigned int; see the framing sketch below.

Return type:

DStream
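
For example, a client can frame each variable-length message by packing the payload length with the same format string before sending it. A minimal sketch using only the standard library (the '<I' format matches the example above):

>>> import struct
>>> message = b'hello'
>>> # prefix the payload with its length as a little-endian 32-bit unsigned int
>>> struct.pack('<I', len(message)) + message
b'\x05\x00\x00\x00hello'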

socketTextStream(hostname, port)[source]

Create a TCP socket server.

Parameters:
  • hostname (string) – Hostname of TCP server.
  • port (int) – Port of TCP server.
Return type:

DStream

start()[source]

Start processing streams.

stop(stopSparkContext=True, stopGraceFully=False)[source]

Stop processing streams.

Parameters:
  • stopSparkContext – stop the SparkContext (NOT IMPLEMENTED)
  • stopGraceFully – stop gracefully (NOT IMPLEMENTED)
textFileStream(directory, process_all=False)[source]

Monitor a directory and process all text files.

File names starting with . are ignored.

Parameters:
  • directory (string) – a path
  • process_all (bool) – whether to process pre-existing files
Return type:

DStream

Warning

The process_all parameter does not exist in the PySpark API.

fileTextStream(directory, process_all=False)

Alias of textFileStream().

fileBinaryStream(directory, recordLength=None, process_all=False)

Alias of binaryRecordsStream().

DStream

class pysparkling.streaming.DStream(jdstream, ssc, jrdd_deserializer=None)[source]

A discrete stream of RDDs.

Usually a DStream is created by a pysparkling.streaming.StreamingContext method like pysparkling.streaming.StreamingContext.queueStream() and then operated on with the methods below.

Parameters:
  • jdstream – previous stream
  • ssc (StreamingContext) – the streaming context
  • jrdd_deserializer – a deserializer callable
cache()[source]

Cache RDDs.

Return type:DStream
cogroup(other, numPartitions=None)[source]

Apply cogroup to RDDs of this and other DStream.

Parameters:other (DStream) – another DStream
Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> s1.cogroup(s2).foreachRDD(lambda rdd: print(sorted(rdd.collect())))
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', [[4], [1]]), ('b', [[2], [3]])]
[('c', [[7], [8]])]
context()[source]

Return the StreamingContext of this stream.

Return type:StreamingContext
count()[source]

Count elements per RDD.

Creates a new RDD stream where each RDD has a single entry that is the count of the elements.

Return type:DStream
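
Example (a minimal sketch, assuming the same timing behavior as the queueStream examples above):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3], [4, 5]])
...     .count()
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[3]
[2]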
countByValue()[source]

Apply countByValue to every RDD.

Return type:DStream

Warning

Implemented as a local operation.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5, 5, 5, 2]])
...     .countByValue()
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[(1, 2), (2, 1), (5, 3)]
countByWindow(windowDuration, slideDuration=None)[source]

Applies count() after window().

Parameters:
  • windowDuration (float) – multiple of batch interval
  • slideDuration (float) – multiple of batch interval
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5], [5, 5, 2, 4], [1, 2]])
...     .countByWindow(0.2)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[3]
[7]
[6]
filter(f)[source]

Filter elements.

Parameters:f – filter function
Return type:DStream
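
Example (a minimal sketch, assuming the same timing behavior as the queueStream examples above):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3], [4, 5, 6]])
...     .filter(lambda e: e % 2 == 0)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[2]
[4, 6]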
flatMap(f, preservesPartitioning=False)[source]

Apply function f and flatten.

Parameters:f – mapping function
Return type:DStream
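
Example (a minimal sketch, assuming the same timing behavior as the queueStream examples above):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2]])
...     .flatMap(lambda e: [e, e * 10])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[4, 40]
[2, 20]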
flatMapValues(f)[source]

Apply f to each value of a key-value pair.

Parameters:f – map function
Return type:DStream
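
Example (a minimal sketch, assuming the same timing behavior as the queueStream examples above):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', [1, 2]), ('b', [3])]])
...     .flatMapValues(lambda v: v)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[('a', 1), ('a', 2), ('b', 3)]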
foreachRDD(func)[source]

Apply func.

Parameters:func – Function to apply.
fullOuterJoin(other, numPartitions=None)[source]

Apply fullOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (2, 3))]
[('c', (7, 8))]

Example with repeated keys:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('a', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('b', 1)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (2, None)), ('a', (4, None)), ('b', (None, 1))]
[('c', (7, 8))]
groupByKey()[source]

Group the values for each key.

Return type:DStream
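
Example (a minimal sketch; mapValues(sorted) is used so the grouped values print as sorted lists regardless of how the groups are represented internally):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('a', 2), ('b', 3)]])
...     .groupByKey()
...     .mapValues(sorted)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[('a', [1, 2]), ('b', [3])]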
join(other, numPartitions=None)[source]

Apply join to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.join(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1))]
[('c', (7, 8))]
leftOuterJoin(other, numPartitions=None)[source]

Apply leftOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.leftOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('e', (2, None))]
[('c', (7, 8))]
map(f, preservesPartitioning=False)[source]

Apply function f.

Parameters:f – mapping function
Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .map(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[5]
[3]
[8]
mapPartitions(f, preservesPartitioning=False)[source]

Map partitions.

Parameters:f – mapping function
Return type:DStream
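
Example (a minimal sketch; the generator is applied per partition, so the output does not depend on how the data is partitioned):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3]])
...     .mapPartitions(lambda partition: (e * 2 for e in partition))
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[2, 4, 6]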
mapPartitionsWithIndex(f, preservesPartitioning=False)[source]

Apply a map function that takes an index and the data.

Map partitions with a function that takes the partition index and an iterator over the partition data as arguments.

Parameters:f – mapping function
Return type:DStream
mapValues(f)[source]

Apply f to each value of a key-value pair.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 4)], [('b', 2)], [('c', 7)]])
...     .mapValues(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[('a', 5)]
[('b', 3)]
[('c', 8)]
pprint(num=10)[source]

Print the first num elements of each RDD.

Parameters:num (int) – Number of elements to print.
reduce(func)[source]

Return a new DStream where each RDD was reduced with func.

Return type:DStream
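
Example (a minimal sketch, assuming each reduced RDD holds a single element, as in PySpark):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3], [4, 5]])
...     .reduce(lambda a, b: a + b)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[6]
[9]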
reduceByKey(func, numPartitions=None)[source]

Apply reduceByKey to every RDD.

Parameters:
  • func – reduce function to apply
  • numPartitions (int) – number of partitions
Return type:

DStream
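
Example (a minimal sketch, assuming the same timing behavior as the queueStream examples above):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('a', 2), ('b', 1)]])
...     .reduceByKey(lambda a, b: a + b)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[('a', 3), ('b', 1)]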

repartition(numPartitions)[source]

Repartition every RDD.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([['hello', 'world']])
...     .repartition(2)
...     .foreachRDD(lambda rdd: print(len(rdd.partitions())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
2
0
rightOuterJoin(other, numPartitions=None)[source]

Apply rightOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.rightOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (None, 3))]
[('c', (7, 8))]
saveAsTextFiles(prefix, suffix=None)[source]

Save every RDD as a text file (or a set of text files).

Parameters:
  • prefix (string) – path prefix of the output
  • suffix (string) – file suffix (e.g. ‘.gz’ to enable compression)

Example:

>>> from backports import tempfile
>>> import os, pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     (
...         ssc.queueStream([['hello', 'world'], [1, 2]])
...         .saveAsTextFiles(os.path.join(tmp_dir, 'textout'))
...     )
...     ssc.start()
...     ssc.awaitTermination(0.25)
...     result = sc.textFile(tmp_dir + '*').collect()
>>> result
['hello', 'world', '1', '2']
slice(begin, end)[source]

Keep only the RDDs whose time falls between begin and end.

Parameters:
  • begin (datetime.datetime|int) – datetime or Unix timestamp
  • end (datetime.datetime|int) – datetime or Unix timestamp
Return type:

DStream

transform(func)[source]

Return a new DStream where each RDD is transformed by func.

Parameters:func – Function that transforms an RDD.
Return type:DStream
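
Example (a minimal sketch, assuming func receives the RDD as its only argument):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2], [3]])
...     .transform(lambda rdd: rdd.map(lambda e: e * 10))
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[10, 20]
[30]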
transformWith(func, other, keepSerializer=False)[source]

Return a new DStream where each RDD is transformed by func.

Parameters:func – transformation function
Return type:DStream

The transformation function can have arguments (time, rdd_a, rdd_b) or (rdd_a, rdd_b).
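
Example (a minimal sketch using the (rdd_a, rdd_b) form of the transformation function):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[1, 2], [5]])
>>> s2 = ssc.queueStream([[3, 4], [6]])
>>> (
...     s1.transformWith(lambda rdd_a, rdd_b: rdd_a.union(rdd_b), s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[1, 2, 3, 4]
[5, 6]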

union(other)[source]

Union of two DStreams.

Parameters:other (DStream) – Another DStream.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> odd = ssc.queueStream([[1], [3], [5]])
>>> even = ssc.queueStream([[2], [4], [6]])
>>> (
...     odd.union(even)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[1, 2]
[3, 4]
[5, 6]
updateStateByKey(func)[source]

Process with a state that is kept per key across batches.

Parameters:func – Evaluated per key. Takes a list of input_values and a state.
Return type:DStream

This example shows how to return the latest value per key:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('b', 3)], [('a', 2), ('c', 4)]])
...     .updateStateByKey(lambda input_values, state:
...                       state
...                       if not input_values
...                       else input_values[-1])
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1), ('b', 3)]
[('a', 2), ('b', 3), ('c', 4)]

This example counts values per key:

>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1)], [('a', 2), ('b', 4), ('b', 3)]])
...     .updateStateByKey(lambda input_values, state:
...                       (state if state is not None else 0) +
...                       sum(input_values))
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1)]
[('a', 3), ('b', 7)]
window(windowDuration, slideDuration=None)[source]

Windowed RDD.

Parameters:
  • windowDuration (float) – multiple of batching interval
  • slideDuration (float) – multiple of batching interval
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[1], [2], [3], [4], [5], [6]])
...     .window(0.6)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(1.3)
[1]
[1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]