Context

A Context describes the setup of a computation: the worker pool and the serializers for functions and data. Instantiating a Context with the default arguments, Context(), is the most lightweight setup: all data stays in the local thread and is never serialized or deserialized.

If you want to process the data in parallel, you can use the multiprocessing module. Given the limitations of the default pickle serializer, you can serialize all functions with cloudpickle instead. For example, a common instantiation with multiprocessing looks like this:

import multiprocessing
import pickle
import cloudpickle
from pysparkling import Context

c = Context(
    multiprocessing.Pool(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)

This assumes that your data is serializable with pickle, which is generally faster than cloudpickle. If it is not, you can specify a custom serializer and deserializer for data with data_serializer and data_deserializer.

class pysparkling.Context(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None, max_retries=3, retry_wait=0.0)[source]

Context object similar to a Spark Context.

The variable _stats contains measured timing information about data and function (de)serialization and workload execution to benchmark your jobs.

Parameters:
  • pool – An instance with a map(func, iterable) method.
  • serializer – Serializer for functions. Examples are pickle.dumps and dill.dumps.
  • deserializer – Deserializer for functions. Examples are pickle.loads and dill.loads.
  • data_serializer – Serializer for the data.
  • data_deserializer – Deserializer for the data.
  • max_retries (int) – maximum number of times a partition is retried
  • retry_wait (float) – seconds to wait between retries
parallelize(x, numPartitions=None)[source]

Parallelize x.

Parameters:
  • x – An iterable (e.g. a list) that represents the data.
  • numPartitions (int|None) – (optional) The number of partitions the data should be split into. A partition is a unit of data that is processed at a time.
Return type:

RDD

pickleFile(name, minPartitions=None)[source]

Read a pickle file.

Reads files created with RDD.saveAsPickleFile() into an RDD.

Parameters:
  • name – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
Return type:

RDD

Example with a serialized list:

>>> import pickle
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> with open(tmpFile.name, 'wb') as f:
...     pickle.dump(['hello', 'world'], f)
>>> Context().pickleFile(tmpFile.name).collect()[0] == 'hello'
True
runJob(rdd, func, partitions=None, allowLocal=False, resultHandler=None)[source]

This function is used by methods in the RDD.

Note that the map results are only lazy generators, and the resultHandler must consume the ones it needs. In other words, if everything needs to be executed, the resultHandler must be at least lambda x: list(x) to trigger execution of the generators.

Parameters:
  • func – Map function. The signature is func(TaskContext, Iterator over elements).
  • partitions – (optional) List of partitions that are involved. Default is None, meaning the map job is applied to all partitions.
  • allowLocal – (optional) Allows for local execution. Default is False.
  • resultHandler – (optional) Process the result from the maps.
Returns:

Result of resultHandler.

binaryFiles(path, minPartitions=None)[source]

Read binary files into an RDD of pairs of file name and file content.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
Return type:

RDD

Warning

Not part of PySpark API.

Setting up examples:

>>> import os, pysparkling
>>> import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryFiles(tmp+'*').mapValues(decode).collect()
[('...', 'bellobello')]
binaryRecords(path, recordLength=None)[source]

Read binary records from files into an RDD.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • recordLength – If None, every file is a single record. If an int, records have that fixed length in bytes. If a string, it is used as a struct format string that describes the length prefix of variable-length binary records.
Return type:

RDD

Warning

Only an int recordLength is part of the PySpark API.

Setting up examples:

>>> import os, struct, pysparkling
>>> import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*').map(decode).collect()
['bellobello']

Example with fixed length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*', recordLength=5).map(decode).collect()
['bello', 'bello']

Example with variable length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(struct.pack('<I', 5) + b'bello')
...         _ = f.write(struct.pack('<I', 10) + b'bellobello')
...     (sc.binaryRecords(tmp+'*', recordLength='<I')
...      .map(decode).collect())
['bello', 'bellobello']
textFile(filename, minPartitions=None, use_unicode=True)[source]

Read a text file into an RDD.

Parameters:
  • filename – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
  • use_unicode – (optional, default=True) Use UTF-8 if True and ASCII if False.
Return type:

RDD

union(rdds)[source]

Create a union of rdds.

Parameters:
  • rdds – Iterable of RDDs.
Return type:

RDD
wholeTextFiles(path, minPartitions=None, use_unicode=True)[source]

Read text files into an RDD of pairs of file name and file content.

Parameters:
  • path – Location of the files. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
  • use_unicode – (optional, default=True) Use UTF-8 if True and ASCII if False.
Return type:

RDD