Streaming

Warning

This is a new addition to the API (March 2017) and should be used with care.
StreamingContext

class pysparkling.streaming.StreamingContext(sparkContext, batchDuration=None)

Stream processing.

Parameters:
- sparkContext (pysparkling.Context) – A pysparkling.Context.
- batchDuration (float) – Duration in seconds per batch.
sparkContext

Return the context of this StreamingContext.
awaitTermination(timeout=None)

Wait for the context to stop.

Parameters: timeout (float) – timeout in seconds
awaitTerminationOrTimeout(timeout)

Provided for compatibility. Same as awaitTermination() here.
binaryRecordsStream(directory, recordLength=None, process_all=False)

Monitor a directory and process all binary files.

File names starting with . are ignored.

Parameters:
- directory (string) – a path
- recordLength – None, an int, or a struct format string
- process_all (bool) – whether to process pre-existing files

Return type: DStream

Warning

Only int recordLength values are supported in the PySpark API. The process_all parameter does not exist in the PySpark API.
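Example (an illustrative sketch, not from the original docs: it assumes an int recordLength yields raw bytes records of that length, and that process_all=True picks up a pre-existing file on the first monitoring pass):

>>> import os, tempfile
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> tmp_dir = tempfile.mkdtemp()
>>> with open(os.path.join(tmp_dir, 'data.bin'), 'wb') as f:
...     _ = f.write(b'abcd')
>>> (
...     ssc
...     .binaryRecordsStream(tmp_dir, recordLength=2, process_all=True)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[b'ab', b'cd']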
queueStream(rdds, oneAtATime=True, default=None)

Create a stream from an iterable over RDDs.

Parameters:
- rdds – Iterable over RDDs or lists.
- oneAtATime – Process the RDDs one at a time, or all at once.
- default – If there are no more RDDs in rdds, return this. Can be None.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.39)
[4]
[2]
[7]
Example testing the default value:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2]], default=['placeholder'])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[4]
[2]
['placeholder']
socketBinaryStream(hostname, port, length)

Create a TCP socket server for binary input.

Warning

This is not part of the PySpark API.

Parameters:
- hostname (string) – Hostname of the TCP server.
- port (int) – Port of the TCP server.
- length – Message length, either a length in bytes or a format string for struct.unpack(). For variable-length messages where the message length is sent right before the message itself, length is a format string that can be passed to struct.unpack(). For example, use length='<I' for a little-endian (standard on x86) 32-bit unsigned int.

Return type: DStream
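The length-prefixed protocol can be sketched as follows (illustrative only; the port number, timings, and the client thread are arbitrary choices, and empty batches may print as well):

import socket
import struct
import threading
import time

import pysparkling

sc = pysparkling.Context()
ssc = pysparkling.streaming.StreamingContext(sc, 0.1)

# Each incoming message is preceded by its length, packed as a
# little-endian 32-bit unsigned int, matching length='<I'.
stream = ssc.socketBinaryStream('localhost', 8123, length='<I')
stream.foreachRDD(lambda rdd: print(rdd.collect()))

def client():
    time.sleep(0.2)  # give the server a moment to start listening
    with socket.create_connection(('localhost', 8123)) as conn:
        payload = b'hello'
        conn.sendall(struct.pack('<I', len(payload)) + payload)

threading.Thread(target=client).start()
ssc.start()
ssc.awaitTermination(0.5)  # one of the printed batches should contain [b'hello']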
socketTextStream(hostname, port)

Create a TCP socket server.

Parameters:
- hostname (string) – Hostname of the TCP server.
- port (int) – Port of the TCP server.

Return type: DStream
stop(stopSparkContext=True, stopGraceFully=False)

Stop processing streams.

Parameters:
- stopSparkContext – stop the SparkContext (NOT IMPLEMENTED)
- stopGraceFully – stop gracefully (NOT IMPLEMENTED)
textFileStream(directory, process_all=False)

Monitor a directory and process all text files.

File names starting with . are ignored.

Parameters:
- directory (string) – a path
- process_all (bool) – whether to process pre-existing files

Return type: DStream

Warning

The process_all parameter does not exist in the PySpark API.
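Example (an illustrative sketch, assuming process_all=True picks up a pre-existing file on the first monitoring pass):

>>> import os, tempfile
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> tmp_dir = tempfile.mkdtemp()
>>> with open(os.path.join(tmp_dir, 'data.txt'), 'w') as f:
...     _ = f.write('hello\nworld\n')
>>> (
...     ssc
...     .textFileStream(tmp_dir, process_all=True)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
['hello', 'world']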
fileTextStream(directory, process_all=False)

Alias of textFileStream().
fileBinaryStream(directory, recordLength=None, process_all=False)

Alias of binaryRecordsStream().
DStream
class pysparkling.streaming.DStream(jdstream, ssc, jrdd_deserializer=None)

A discrete stream of RDDs.

Usually a DStream is created by a pysparkling.streaming.StreamingContext method like pysparkling.streaming.StreamingContext.queueStream() and then operated on with the methods below.

Parameters:
- jdstream – previous stream
- ssc (StreamingContext) – the streaming context
- jrdd_deserializer – a deserializer callable
cogroup(other, numPartitions=None)

Apply cogroup to the RDDs of this and the other DStream.

Parameters: other (DStream) – another DStream

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> s1.cogroup(s2).foreachRDD(lambda rdd: print(sorted(rdd.collect())))
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', [[4], [1]]), ('b', [[2], [3]])]
[('c', [[7], [8]])]
context()

Return the StreamingContext of this stream.

Return type: StreamingContext
count()

Count elements per RDD.

Creates a new RDD stream where each RDD has a single entry that is the count of the elements.

Return type: DStream
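Example (an illustrative sketch in the style of the other examples on this page):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3], [4, 5]])
...     .count()
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[3]
[2]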
countByValue()

Apply countByValue to every RDD.

Return type: DStream

Warning

Implemented as a local operation.
Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5, 5, 5, 2]])
...     .countByValue()
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[(1, 2), (2, 1), (5, 3)]
countByWindow(windowDuration, slideDuration=None)

Apply count() after window().

Parameters:
- windowDuration (float) – window length in seconds
- slideDuration (float) – slide interval in seconds (defaults to the batch duration)

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5], [5, 5, 2, 4], [1, 2]])
...     .countByWindow(0.2)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[3]
[7]
[6]
flatMap(f, preservesPartitioning=False)

Apply the function f and flatten the result.

Parameters: f – mapping function

Return type: DStream
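Example (an illustrative sketch):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2], [3]])
...     .flatMap(lambda e: [e, e * 10])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[1, 10, 2, 20]
[3, 30]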
flatMapValues(f)

Apply f to each value of a key-value pair and flatten the result.

Parameters: f – map function

Return type: DStream
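Example (an illustrative sketch):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', [1, 2]), ('b', [3])]])
...     .flatMapValues(lambda v: v)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[('a', 1), ('a', 2), ('b', 3)]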
fullOuterJoin(other, numPartitions=None)

Apply fullOuterJoin to each pair of RDDs.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (2, 3))]
[('c', (7, 8))]
Example with repeated keys:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('a', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('b', 1)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (2, None)), ('a', (4, None)), ('b', (None, 1))]
[('c', (7, 8))]
join(other, numPartitions=None)

Apply join to each pair of RDDs.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.join(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1))]
[('c', (7, 8))]
leftOuterJoin(other, numPartitions=None)

Apply leftOuterJoin to each pair of RDDs.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.leftOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('e', (2, None))]
[('c', (7, 8))]
map(f, preservesPartitioning=False)

Apply the function f to every element.

Parameters: f – mapping function

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .map(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[5]
[3]
[8]
mapPartitions(f, preservesPartitioning=False)

Map partitions.

Parameters: f – mapping function

Return type: DStream
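Example (an illustrative sketch, assuming the queued list lands in a single partition, so the per-partition sum covers all elements):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3, 4]])
...     .mapPartitions(lambda partition: [sum(partition)])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[10]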
mapPartitionsWithIndex(f, preservesPartitioning=False)

Apply a map function that takes an index and the data.

Map partitions with a function that takes the partition index and an iterator over the partition data as arguments.

Parameters: f – mapping function

Return type: DStream
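Example (an illustrative sketch, again assuming a single partition, so every element reports index 0):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([['a', 'b']])
...     .mapPartitionsWithIndex(lambda idx, data: ((idx, d) for d in data))
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[(0, 'a'), (0, 'b')]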
mapValues(f)

Apply f to the value of each key-value pair.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 4)], [('b', 2)], [('c', 7)]])
...     .mapValues(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[('a', 5)]
[('b', 3)]
[('c', 8)]
pprint(num=10)

Print the first num elements of each RDD.

Parameters: num (int) – Number of elements to print.
reduce(func)

Return a new DStream where each RDD was reduced with func.

Return type: DStream
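Example (an illustrative sketch):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2, 3]])
...     .reduce(lambda a, b: a + b)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[6]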
reduceByKey(func, numPartitions=None)

Apply reduceByKey to every RDD.

Parameters:
- func – reduce function to apply
- numPartitions (int) – number of partitions

Return type: DStream
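Example (an illustrative sketch):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('a', 2), ('b', 5)]])
...     .reduceByKey(lambda a, b: a + b)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[('a', 3), ('b', 5)]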
repartition(numPartitions)

Repartition every RDD.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([['hello', 'world']])
...     .repartition(2)
...     .foreachRDD(lambda rdd: print(len(rdd.partitions())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
2
0
rightOuterJoin(other, numPartitions=None)

Apply rightOuterJoin to each pair of RDDs.

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.rightOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (None, 3))]
[('c', (7, 8))]
saveAsTextFiles(prefix, suffix=None)

Save every RDD as a text file (or a set of text files).

Parameters:
- prefix (string) – path prefix of the output
- suffix (string) – file suffix (e.g. '.gz' to enable compression)

Example:
>>> from backports import tempfile
>>> import os, pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     (
...         ssc.queueStream([['hello', 'world'], [1, 2]])
...         .saveAsTextFiles(os.path.join(tmp_dir, 'textout'))
...     )
...     ssc.start()
...     ssc.awaitTermination(0.25)
...     result = sc.textFile(tmp_dir + '*').collect()
>>> result
['hello', 'world', '1', '2']
slice(begin, end)

Filter RDDs to those between begin and end.

Parameters:
- begin (datetime.datetime|int) – datetime or unix timestamp
- end (datetime.datetime|int) – datetime or unix timestamp

Return type: DStream
transform(func)

Return a new DStream where each RDD is transformed by func.

Parameters: func – Function that transforms an RDD.

Return type: DStream
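Example (an illustrative sketch):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 2], [3]])
...     .transform(lambda rdd: rdd.map(lambda e: e * 10))
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[10, 20]
[30]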
transformWith(func, other, keepSerializer=False)

Return a new DStream where each RDD is transformed by func.

Parameters: func – transformation function

Return type: DStream

The transformation function can have the arguments (time, rdd_a, rdd_b) or (rdd_a, rdd_b).
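Example (an illustrative sketch of the two-argument form, pairing each RDD of s1 with the corresponding RDD of s2; rdd_a.union(rdd_b) is just one possible combiner):

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[1, 2]])
>>> s2 = ssc.queueStream([[3, 4]])
>>> (
...     s1
...     .transformWith(lambda rdd_a, rdd_b: rdd_a.union(rdd_b), s2)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[1, 2, 3, 4]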
union(other)

Union of two DStreams.

Parameters: other (DStream) – Another DStream.

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> odd = ssc.queueStream([[1], [3], [5]])
>>> even = ssc.queueStream([[2], [4], [6]])
>>> (
...     odd.union(even)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[1, 2]
[3, 4]
[5, 6]
updateStateByKey(func)

Process with state.

Parameters: func – Evaluated per key. Takes a list of input_values and a state.

Return type: DStream

This example shows how to return the latest value per key:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('b', 3)], [('a', 2), ('c', 4)]])
...     .updateStateByKey(lambda input_values, state:
...                       state
...                       if not input_values
...                       else input_values[-1])
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1), ('b', 3)]
[('a', 2), ('b', 3), ('c', 4)]
This example counts values per key:
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1)], [('a', 2), ('b', 4), ('b', 3)]])
...     .updateStateByKey(lambda input_values, state:
...                       (state if state is not None else 0) +
...                       sum(input_values))
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1)]
[('a', 3), ('b', 7)]
window(windowDuration, slideDuration=None)

Create a stream of windowed RDDs.

Parameters:
- windowDuration (float) – window length in seconds
- slideDuration (float) – slide interval in seconds (defaults to the batch duration)

Return type: DStream

Example:
>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[1], [2], [3], [4], [5], [6]])
...     .window(0.6)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(1.3)
[1]
[1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]