Context¶

A Context describes the setup. Instantiating a Context with the default
arguments using Context() is the most lightweight setup. All data stays
in the local thread and is never serialized or deserialized.

If you want to process the data in parallel, you can use the
multiprocessing module. Given the limitations of the default pickle
serializer, you can specify to serialize all methods with cloudpickle
instead. For example, a common instantiation with multiprocessing
looks like this:
sc = pysparkling.Context(
    multiprocessing.Pool(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)
This assumes that your data is serializable with pickle, which is generally
faster. You can also specify a custom serializer and deserializer for the data.
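As a sketch of that last point (reusing the multiprocessing setup from above), the data_serializer and data_deserializer arguments take the same kind of dump/load callables as serializer and deserializer; here they simply reuse pickle for the data:

import multiprocessing
import pickle

import cloudpickle
import pysparkling

# Sketch only: functions go through cloudpickle, the data through plain pickle.
sc = pysparkling.Context(
    multiprocessing.Pool(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
    data_serializer=pickle.dumps,
    data_deserializer=pickle.loads,
)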
class pysparkling.Context(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None, max_retries=3, retry_wait=0.0, cache_manager=None, catch_exceptions=False)[source]¶

Context object similar to a Spark Context.

The variable _stats contains measured timing information about data and function (de)serialization and workload execution to benchmark your jobs.

Parameters:
- pool – An instance with a map(func, iterable) method.
- serializer – Serializer for functions. Examples are pickle.dumps and cloudpickle.dumps.
- deserializer – Deserializer for functions. For example pickle.loads.
- data_serializer – Serializer for the data.
- data_deserializer – Deserializer for the data.
- max_retries (int) – Maximum number of times a partition is retried.
- retry_wait (float) – Seconds to wait between retries.
- cache_manager – Custom cache manager (like TimedCacheManager).
- catch_exceptions – Whether to catch and silence user space exceptions.
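A minimal sketch of a default (single-threaded, local) Context that also sets the retry behaviour; after a job has run, the _stats attribute mentioned above can be inspected for timing information:

>>> import pysparkling
>>> sc = pysparkling.Context(max_retries=5, retry_wait=0.1)
>>> sc.parallelize([1, 2, 3]).map(lambda x: x * x).collect()
[1, 4, 9]
>>> timings = sc._stats  # timing info for (de)serialization and execution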
accumulator(value, accum_param=None)[source]¶

Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
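A hedged sketch of the integer case; it assumes the Accumulator exposes the add()/value interface and the RDD a foreach() method, as in PySpark:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> counter = sc.accumulator(0)
>>> # assumption: RDD.foreach() and Accumulator.add() mirror the PySpark API
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: counter.add(x))
>>> counter.value
10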
parallelize(x, numSlices=None)[source]¶

Parallelize x.

Parameters:
- x – An iterable (e.g. a list) that represents the data.
- numSlices (int) – The number of partitions the data should be split into. A partition is a unit of data that is processed at a time.

Return type: RDD
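A minimal usage sketch; collect() gathers the elements back into a list:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> sc.parallelize([0, 1, 2, 3, 4], numSlices=2).collect()
[0, 1, 2, 3, 4]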
pickleFile(name, minPartitions=None)[source]¶

Read a pickle file.

Reads files created with RDD.saveAsPickleFile() into an RDD.

Parameters:
- name – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.

Return type: RDD

Example with a serialized list:

>>> import pickle
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> with open(tmpFile.name, 'wb') as f:
...     pickle.dump(['hello', 'world'], f)
>>> Context().pickleFile(tmpFile.name).collect()[0] == 'hello'
True
runJob(rdd, func, partitions=None, allowLocal=False, resultHandler=None)[source]¶

This function is used by methods in the RDD.

Note that the maps are only inside generators and the resultHandler needs to take care of executing the ones that it needs. In other words, if you need everything to be executed, the resultHandler needs to be at least lambda x: list(x) to trigger execution of the generators.

Parameters:
- func – Map function with signature func(TaskContext, Iterator over elements).
- partitions – List of partitions that are involved. None means the map job is applied to all partitions.
- allowLocal – Allows local execution.
- resultHandler – Process the result from the maps.

Returns: Result of resultHandler.
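A hedged sketch of calling it directly: the map function receives the task context and an iterator over one partition's elements and returns one value per partition, and list(...) as the resultHandler forces the per-partition generators to run, as described above. Summing the per-partition sums then gives the overall total:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize(range(10), 2)
>>> per_partition = sc.runJob(rdd,
...                           lambda task_context, elements: sum(elements),
...                           resultHandler=lambda x: list(x))
>>> sum(per_partition)
45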
binaryFiles(path, minPartitions=None)[source]¶

Read a binary file into an RDD.

Parameters:
- path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.

Return type: RDD

Warning

Not part of the PySpark API.

Setting up examples:

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryFiles(tmp+'*').values().map(decode).collect()
['bellobello']
binaryRecords(path, recordLength=None)[source]¶

Read a binary file into an RDD.

Parameters:
- path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- recordLength – If None, every file is a record; an int means fixed-length records; and a string is used as a format string to struct to read the length of variable-length binary records.

Return type: RDD

Warning

Only an int recordLength is part of the PySpark API.

Setting up examples (struct is needed for the variable-length example below):

>>> import os, struct, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*').map(decode).collect()
['bellobello']

Example with fixed-length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*', recordLength=5).map(decode).collect()
['bello', 'bello']

Example with variable-length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(struct.pack('<I', 5) + b'bello')
...         _ = f.write(struct.pack('<I', 10) + b'bellobello')
...     (sc.binaryRecords(tmp+'*', recordLength='<I')
...      .map(decode).collect())
['bello', 'bellobello']
textFile(filename, minPartitions=None, use_unicode=True)[source]¶

Read a text file into an RDD.

Parameters:
- filename – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
- use_unicode – (optional, default=True) Use utf8 if True and ascii if False.

Return type: RDD
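A short sketch, following the pattern of the examples above, that writes a two-line file and reads it back (one element per line):

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.txt'), 'w') as f:
...         _ = f.write('hello\nworld')
...     sc.textFile(os.path.join(tmp, 'test.txt')).collect()
['hello', 'world']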
wholeTextFiles(path, minPartitions=None, use_unicode=True)[source]¶

Read text files into an RDD of pairs of file name and file content.

Parameters:
- path – Location of the files. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
- use_unicode – (optional, default=True) Use utf8 if True and ascii if False.

Return type: RDD
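A hedged sketch in the same style: each element is a (file name, file content) pair, so values() keeps only the content:

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.txt'), 'w') as f:
...         _ = f.write('hello world')
...     sc.wholeTextFiles(tmp + '*').values().collect()
['hello world']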