Source code for pysparkling.streaming.context

from __future__ import absolute_import, print_function

import logging
import time

from .dstream import DStream
from .filestream import (FileStream, FileTextStreamDeserializer,
                         FileBinaryStreamDeserializer)
from .queuestream import QueueStream, QueueStreamDeserializer

try:
    from .tcpstream import TCPTextStream, TCPBinaryStream, TCPDeserializer
except ImportError:
    TCPTextStream = False
    TCPBinaryStream = False
    TCPDeserializer = False

try:
    from tornado.ioloop import IOLoop, PeriodicCallback
    from tornado.queues import Queue
except ImportError:
    IOLoop = False
    Queue = False
    PeriodicCallback = False

try:
    from tornado import speedups  # pylint: disable=unused-import
except ImportError:
    speedups = None

log = logging.getLogger(__name__)


[docs]class StreamingContext(object): """Stream processing. :param pysparkling.Context sparkContext: A pysparkling.Context. :param float batchDuration: Duration in seconds per batch. """ _activeContext = None def __init__(self, sparkContext, batchDuration=None): self._context = sparkContext self.batch_duration = batchDuration if batchDuration is not None else 1 self._dstreams = [] self._pcb = None self._on_stop_cb = [] @property def sparkContext(self): """Return context of this StreamingContext.""" return self._context def _add_dstream(self, dstream): self._dstreams.append(dstream)
[docs] def awaitTermination(self, timeout=None): """Wait for context to stop. :param float timeout: in seconds """ if timeout is not None: IOLoop.current().call_later(timeout, self.stop) IOLoop.current().start() IOLoop.clear_current()
[docs] def awaitTerminationOrTimeout(self, timeout): """Provided for compatibility. Same as ``awaitTermination()`` here.""" return self.awaitTermination(timeout)
[docs] def binaryRecordsStream(self, directory, recordLength=None, process_all=False): """Monitor a directory and process all binary files. File names starting with ``.`` are ignored. :param string directory: a path :param recordLength: None, int or struct format string :param bool process_all: whether to process pre-existing files :rtype: DStream .. warning:: Only ``int`` ``recordLength`` are supported in PySpark API. The ``process_all`` parameter does not exist in the PySpark API. """ deserializer = FileBinaryStreamDeserializer(self._context, recordLength) file_stream = FileStream(directory, process_all) self._on_stop_cb.append(file_stream.stop) return DStream(file_stream, self, deserializer)
[docs] def queueStream(self, rdds, oneAtATime=True, default=None): """Create stream iterable over RDDs. :param rdds: Iterable over RDDs or lists. :param oneAtATime: Process one at a time or all. :param default: If no more RDDs in ``rdds``, return this. Can be None. :rtype: DStream Example: >>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[4], [2], [7]]) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [4] [2] [7] Example testing the default value: >>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[4], [2]], default=['placeholder']) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [4] [2] ['placeholder'] """ deserializer = QueueStreamDeserializer(self._context) if default is not None: default = deserializer(default) if Queue is False: log.error('Run "pip install tornado" to install tornado.') q = Queue() for i in rdds: q.put(i) qstream = QueueStream(q, oneAtATime, default) return DStream(qstream, self, deserializer)
[docs] def remember(self, duration): """Provided for compatibility, but does nothing here.""" pass
[docs] def socketBinaryStream(self, hostname, port, length): """Create a TCP socket server for binary input. .. warning:: This is not part of the PySpark API. :param string hostname: Hostname of TCP server. :param int port: Port of TCP server. :param length: Message length. Length in bytes or a format string for ``struct.unpack()``. For variable length messages where the message length is sent right before the message itself, ``length`` is a format string that can be passed to ``struct.unpack()``. For example, use ``length='<I'`` for a little-endian (standard on x86) 32-bit unsigned int. :rtype: DStream """ deserializer = TCPDeserializer(self._context) tcp_binary_stream = TCPBinaryStream(length) tcp_binary_stream.listen(port, hostname) self._on_stop_cb.append(tcp_binary_stream.stop) return DStream(tcp_binary_stream, self, deserializer)
[docs] def socketTextStream(self, hostname, port): """Create a TCP socket server. :param string hostname: Hostname of TCP server. :param int port: Port of TCP server. :rtype: DStream """ deserializer = TCPDeserializer(self._context) tcp_text_stream = TCPTextStream() tcp_text_stream.listen(port, hostname) self._on_stop_cb.append(tcp_text_stream.stop) return DStream(tcp_text_stream, self, deserializer)
[docs] def start(self): """Start processing streams.""" def cb(): time_ = time.time() log.debug('Step {}'.format(time_)) # run a step on all streams for d in self._dstreams: d._step(time_) self._pcb = PeriodicCallback(cb, self.batch_duration * 1000.0) self._pcb.start() self._on_stop_cb.append(self._pcb.stop) StreamingContext._activeContext = self
[docs] def stop(self, stopSparkContext=True, stopGraceFully=False): """Stop processing streams. :param stopSparkContext: stop the SparkContext (NOT IMPLEMENTED) :param stopGracefully: stop gracefully (NOT IMPLEMENTED) """ while self._on_stop_cb: cb = self._on_stop_cb.pop() log.debug('calling on_stop_cb {}'.format(cb)) cb() IOLoop.current().stop() StreamingContext._activeContext = None
[docs] def textFileStream(self, directory, process_all=False): """Monitor a directory and process all text files. File names starting with ``.`` are ignored. :param string directory: a path :param bool process_all: whether to process pre-existing files :rtype: DStream .. warning:: The ``process_all`` parameter does not exist in the PySpark API. """ deserializer = FileTextStreamDeserializer(self._context) file_stream = FileStream(directory, process_all) self._on_stop_cb.append(file_stream.stop) return DStream(file_stream, self, deserializer)
#: Alias of :func:`.textFileStream`. fileTextStream = textFileStream #: Alias of :func:`.binaryRecordsStream`. fileBinaryStream = binaryRecordsStream