RDD

class pysparkling.RDD(partitions, ctx)[source]

RDD

In Spark’s original form, RDDs are Resilient, Distributed Datasets. This class reimplements the same interface with the goal of being fast on small data at the cost of being resilient and distributed.

Parameters:
  • partitions (list) – A list of instances of Partition.
  • ctx (Context) – An instance of the applicable Context.
compute(split, task_context)[source]

interface to extend behavior for specific cases

Parameters:split (Partition) – a partition
aggregate(zeroValue, seqOp, combOp)[source]

aggregate

[distributed]

Parameters:
  • zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating int s and float s, but any Python object is possible.
  • seqOp – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
  • combOp – A reference to a function that combines outputs of seqOp. In the first iteration, the current state is zeroValue.
Returns:

Output of combOp operations.

Example:

>>> from pysparkling import Context
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> Context().parallelize(
...     [1, 2, 3, 4], 2
... ).aggregate((0, 0), seqOp, combOp)
(10, 4)
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)[source]

aggregate by key

Parameters:
  • zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating int s and float s, but any Python object is possible.
  • seqFunc – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
  • combFunc – A reference to a function that combines outputs of seqFunc. In the first iteration, the current state is zeroValue.
  • numPartitions (int) – Not used.
Returns:

An RDD with the output of combOp operations.

Return type:

RDD

Example:

>>> from pysparkling import Context
>>> seqOp = (lambda x, y: x + y)
>>> combOp = (lambda x, y: x + y)
>>> r = Context().parallelize(
...     [('a', 1), ('b', 2), ('a', 3), ('c', 4)]
... ).aggregateByKey(0, seqOp, combOp).collectAsMap()
>>> (r['a'], r['b'])
(4, 2)
cache()[source]

Once a partition is computed, cache the result.

Alias for persist().

Example:

>>> import pysparkling
>>>
>>> n_exec = 0
>>>
>>> def _map(e):
...     global n_exec
...     n_exec += 1
...     return e*e
>>>
>>> sc = pysparkling.Context()
>>> my_rdd = sc.parallelize([1, 2, 3, 4], 2).map(_map).cache()
>>>
>>> # no exec until here
>>> n_exec
0
>>> # to get first element, compute the first partition
>>> my_rdd.first()
1
>>> n_exec
2
>>> # now compute the rest
>>> my_rdd.collect()
[1, 4, 9, 16]
>>> n_exec
4
>>> # now _map() was executed on all partitions and should
>>> # not be executed again
>>> my_rdd.collect()
[1, 4, 9, 16]
>>> n_exec
4
cartesian(other)[source]

cartesian product of this RDD with other

Parameters:other (RDD) – Another RDD.
Return type:RDD

Note

This is currently implemented as a local operation requiring all data to be pulled on one machine.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
coalesce(numPartitions, shuffle=False)[source]

coalesce

Parameters:
  • numPartitions (int) – Number of partitions in the resulting RDD.
  • shuffle – (optional) Not used.
Return type:

RDD

Note

This is currently implemented as a local operation requiring all data to be pulled on one machine.

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(4).getNumPartitions()
2
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4, 5, 6, 7, 8], 5)
>>> rdd.foreachPartition(lambda x: print(list(x)))
[1]
[2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4]
[5, 6]
[7, 8]
>>> rdd.coalesce(4).coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3, 4]
[5, 6]
[7, 8]
>>> rdd.coalesce(3).foreachPartition(lambda x: print(list(x)))
[1, 2, 3]
[4, 5, 6]
[7, 8]
cogroup(other, numPartitions=None)[source]

Groups keys from both RDDs together. Values are nested iterators.

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> c = Context()
>>> a = c.parallelize([('house', 1), ('tree', 2)])
>>> b = c.parallelize([('house', 3)])
>>>
>>> [(k, sorted(list([list(vv) for vv in v])))
...  for k, v in sorted(a.cogroup(b).collect())
... ]
[('house', [[1], [3]]), ('tree', [[], [2]])]
collect()[source]

returns the entire dataset as a list

Return type:list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).collect()
[1, 2, 3]
collectAsMap()[source]

returns a dictionary for a pair dataset

Return type:dict

Example:

>>> from pysparkling import Context
>>> d = Context().parallelize([('a', 1), ('b', 2)]).collectAsMap()
>>> (d['a'], d['b'])
(1, 2)
count()[source]

number of entries in this dataset

Return type:int

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).count()
3
countApprox()[source]

same as count()

Return type:int
countByKey()[source]

returns a dict containing the count for every key

Return type:dict

Example:

>>> from pysparkling import Context
>>> Context().parallelize(
...     [('a', 1), ('b', 2), ('b', 2)]
... ).countByKey()['b']
2
countByValue()[source]

returns a dict containing the count for every value

Return type:dict

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).countByValue()[2]
2
distinct(numPartitions=None)[source]

returns only distinct elements

Parameters:numPartitions (int) – Number of partitions in the resulting RDD.
Return type:RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).distinct().count()
3
filter(f)[source]

filter elements

Parameters:f – a function that decides whether to keep an element
Return type:RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize(
...     [1, 2, 2, 4, 1, 3, 5, 9], 3,
... ).filter(lambda x: x % 2 == 0).collect()
[2, 2, 4]
first()[source]

returns the first element in the dataset

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3).first()
1

Works also with empty partitions:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2], 20).first()
1
flatMap(f, preservesPartitioning=True)[source]

map followed by flatten

Parameters:
  • f – The map function.
  • preservesPartitioning – (optional) Preserve the partitioning of the original RDD. Default True.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize(['hello', 'world']).flatMap(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[104, 101, 108, 108, 111, 119, 111, 114, 108, 100]
flatMapValues(f)[source]

map operation on values in a (key, value) pair followed by a flatten

Parameters:f – The map function.
Return type:RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(1, 'hi'), (2, 'world')]).flatMapValues(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[(1, 104), (1, 105), (2, 119), (2, 111), (2, 114), (2, 108), (2, 100)]
fold(zeroValue, op)[source]

fold

Parameters:
  • zeroValue – The inital value, for example 0 or 0.0.
  • op – The reduce operation.
Returns:

The folded (or aggregated) value.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.fold(0, lambda a, b: a+b)
13
foldByKey(zeroValue, op)[source]

Fold (or aggregate) value by key.

Parameters:
  • zeroValue – The inital value, for example 0 or 0.0.
  • op – The reduce operation.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([('a', 4), ('b', 7), ('a', 2)])
>>> my_rdd.foldByKey(0, lambda a, b: a+b).collectAsMap()['a']
6
foreach(f)[source]

applies f to every element

It does not return a new RDD like map().

Parameters:f – Apply a function to every element.
Return type:None

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([1, 2, 3])
>>> a = []
>>> my_rdd.foreach(lambda x: a.append(x))
>>> len(a)
3
foreachPartition(f)[source]

applies f to every partition

It does not return a new RDD like mapPartitions().

Parameters:f – Apply a function to every partition.
Return type:None
fullOuterJoin(other, numPartitions=None)[source]

returns the full outer join of two RDDs

The output contains all keys from both input RDDs, with missing keys replaced with None.

Parameters:
  • other (RDD) – The RDD to join to this one.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> sc = Context()
>>> rdd1 = sc.parallelize([('a', 0), ('b', 1)])
>>> rdd2 = sc.parallelize([('b', 2), ('c', 3)])
>>> sorted(
...     rdd1.fullOuterJoin(rdd2).collect()
... )
[('a', (0, None)), ('b', (1, 2)), ('c', (None, 3))]
getNumPartitions()[source]

returns the number of partitions

Return type:int
getPartitions()[source]

returns the partitions of this RDD

Return type:list
groupBy(f, numPartitions=None)[source]

group by f

Parameters:
  • f – Function returning a key given an element of the dataset.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.groupBy(lambda x: x % 2).mapValues(sorted).collect()
[(0, [2, 4]), (1, [7])]
groupByKey(numPartitions=None)[source]

group by key

Parameters:numPartitions (int) – Number of partitions in the resulting RDD.
Return type:RDD

Note

Creating the new RDD is currently implemented as a local operation.

histogram(buckets)[source]

histogram

Parameters:buckets – A list of bucket boundaries or an int for the number of buckets.
Returns:A tuple (bucket_boundaries, histogram_values) where bucket_boundaries is a list of length n+1 boundaries and histogram_values is a list of length n with the values of each bucket.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> b, h = my_rdd.histogram(10)
>>> h
[1, 0, 0, 0, 2, 0, 0, 1, 0, 0, 1]
id()[source]

the id of this RDD

intersection(other)[source]

intersection of this and other RDD

Parameters:other (RDD) – The other dataset to do the intersection with.
Return type:RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd2 = Context().parallelize([3, 4, 7, 4, 5])
>>> rdd1.intersection(rdd2).collect()
[4, 7]
join(other, numPartitions=None)[source]

join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.join(rdd2).collect()
[(1, (1, 3))]
keyBy(f)[source]

key by f

Parameters:f – Function that returns a key from a dataset element.
Return type:RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd.keyBy(lambda x: x % 2).collect()
[(0, 0), (0, 4), (1, 7), (0, 4), (0, 10)]
keys()[source]

keys of a pair dataset

Return type:RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1)]).keys().collect()
[0, 1]
leftOuterJoin(other, numPartitions=None)[source]

left outer join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[(0, (1, None)), (1, (1, 3))]
lookup(key)[source]

Return all the (key, value) pairs where the given key matches.

Parameters:key – The key to lookup.
Return type:list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1), (1, 3)]).lookup(1)
[1, 3]
map(f)[source]

map

Parameters:f – map function for elements
Return type:RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).map(lambda x: x+1).collect()
[2, 3, 4]
mapPartitions(f, preservesPartitioning=False)[source]

map partitions

Parameters:f – map function for partitions
Return type:RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4], 2)
>>> def f(iterator):
...     yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
mapPartitionsWithIndex(f, preservesPartitioning=False)[source]

map partitions with index

Parameters:f – map function for (index, partition)
Return type:RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([9, 8, 7, 6, 5, 4], 3)
>>> def f(splitIndex, iterator):
...     yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
3
mapValues(f)[source]

map values in a pair dataset

Parameters:f – map function for values
Return type:RDD
max()[source]

returns the maximum element

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3, 4, 3, 2], 2).max() == 4
True
mean()[source]

returns the mean of this dataset

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).mean()
5.0
min()[source]

returns the minimum element

name()[source]

returns the name of the dataset

partitionBy(numPartitions, partitionFunc=None)[source]

Return a partitioned copy of this key-value RDD.

Parameters:
  • numPartitions (int) – Number of partitions.
  • partitionFunc (function) – Partition function.
Return type:

RDD

Example where even numbers get assigned to partition 0 and odd numbers to partition 1:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> keyvalue_rdd = rdd.map(lambda x: (x, x))
>>> keyvalue_rdd.partitionBy(2).keys().collect()
[2, 8, 1, 3, 7, 5]
persist(storageLevel=None)[source]

Cache the results of computed partitions.

Parameters:storageLevel – Not used.
pipe(command, env=None)[source]

Run a command with the elements in the dataset as argument.

Parameters:
  • command – Command line command to run.
  • env (dict) – environment variables
Return type:

RDD

Warning

Unsafe for untrusted data.

Example:

>>> from pysparkling import Context
>>> piped = Context().parallelize(['0', 'hello', 'world']).pipe('echo')
>>> b'hello\n' in piped.collect()
True
randomSplit(weights, seed=None)[source]

Split the RDD into a few RDDs according to the given weights.

Parameters:
  • weights (list[float]) – relative lengths of the resulting RDDs
  • seed (int) – seed for random number generator
Returns:

a list of RDDs

Return type:

list

Note

Creating the new RDDs is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(500))
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], seed=42)
>>> (rdd1.count(), rdd2.count())
(199, 301)
reduce(f)[source]

reduce

Parameters:f – A commutative and associative binary operator.

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10], 2).reduce(lambda a, b: a+b)
25
>>> Context().parallelize([0, 4, 7, 4, 10], 10).reduce(lambda a, b: a+b)
25
>>> Context().parallelize([0], 10).reduce(lambda a, b: a+b)
0
>>> Context().parallelize([], 10).reduce(lambda a, b: a+b)
Traceback (most recent call last):
...
ValueError: Can not reduce() empty RDD
reduceByKey(f, numPartitions=None)[source]

reduce by key

Parameters:
  • f – A commutative and associative binary operator.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

This operation includes a groupByKey() which is a local operation.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)])
>>> rdd.reduceByKey(lambda a, b: a+b).collect()
[(0, 1), (1, 4)]
repartition(numPartitions)[source]

repartition

Parameters:numPartitions (int) – Number of partitions in new RDD.
Return type:RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(1).getNumPartitions()
1
>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).repartition(4).getNumPartitions()
4
repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=None, ascending=True, keyfunc=None)[source]

Repartition and sort within each partition.

Parameters:
  • numPartitions (int) – Number of partitions in new RDD.
  • partitionFunc – function that partitions
  • ascending – Sort order.
  • keyfunc – Returns the value that will be sorted.
Return type:

RDD

Example where even numbers are assigned to partition 0 and odd numbers to partition 1 and then the partitions are sorted individually:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> kv_rdd = rdd.map(lambda x: (x, x))
>>> processed = kv_rdd.repartitionAndSortWithinPartitions(2)
>>> processed.keys().collect()
[2, 8, 1, 3, 5, 7]
rightOuterJoin(other, numPartitions=None)[source]

right outer join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in new RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd1 = sc.parallelize([(0, 1), (1, 1)])
>>> rdd2 = sc.parallelize([(2, 1), (1, 3)])
>>> sorted(rdd1.rightOuterJoin(rdd2).collect())
[(1, (1, 3)), (2, (None, 1))]
sample(withReplacement, fraction, seed=None)[source]

randomly sample

Parameters:
  • withReplacement (bool) – sample with replacement
  • fraction (float) – probability that an element is sampled
  • seed – (optional) Seed for random number generator
Return type:

RDD

Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(1000))
>>> sampled = rdd.sample(False, 0.1, seed=5).collect()
>>> len(sampled)
115
>>> sampled_with_replacement = rdd.sample(True, 5.0, seed=5).collect()
>>> len(sampled_with_replacement) in (5067, 5111)  # w/o, w/ numpy
True
sampleByKey(withReplacement, fractions, seed=None)[source]

randomly sample by key

Parameters:
  • withReplacement (bool) – sample with replacement
  • fractions (dict) – per key sample probabilities
  • seed – (optional) Seed for random number generator.
Return type:

RDD

Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> fractions = {"a": 0.2, "b": 0.1}
>>> rdd = (sc
...        .parallelize(fractions.keys())
...        .cartesian(sc.parallelize(range(0, 1000))))
>>> sample = (rdd
...           .sampleByKey(False, fractions, 2)
...           .groupByKey().collectAsMap())
>>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
True
>>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
True
>>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
True
sampleStdev()[source]

sample standard deviation

Return type:float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleStdev()
1.0
sampleVariance()[source]

sample variance

Return type:float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleVariance()
1.0
saveAsPickleFile(path, batchSize=10)[source]

save as pickle file

Returns:self
Return type:RDD

Warning

The output of this function is incompatible with the PySpark output as there is no pure Python way to write Sequence files.

Example:

>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> d = ['hello', 'world', 1, 2]
>>> rdd = Context().parallelize(d).saveAsPickleFile(tmpFile.name)
>>> 'hello' in Context().pickleFile(tmpFile.name).collect()
True
saveAsTextFile(path, compressionCodecClass=None)[source]

save as text file

If the RDD has many partitions, the contents will be stored directly in the given path. If the RDD has more partitions, the data of the partitions are stored in individual files under path/part-00000 and so on and once all partitions are written, the file path/_SUCCESS is written last.

Parameters:
  • path – Destination of the text file.
  • compressionCodecClass – Not used.
Returns:

self

Return type:

RDD

setName(name)[source]

Assign a new name to this RDD.

Return type:RDD
sortBy(keyfunc, ascending=True, numPartitions=None)[source]

sort by keyfunc

Parameters:
  • keyfunc – Returns the value that will be sorted.
  • ascending – Specify sort order.
  • numPartitions (int) – None means the output will have the same number of partitions as the input.
Return type:

RDD

Note

Sorting is currently implemented as a local operation.

Examples:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([5, 1, 2, 3])
>>> rdd.sortBy(lambda x: x).collect()
[1, 2, 3, 5]
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 5, 2, 3])
>>> rdd.sortBy(lambda x: x, ascending=False).collect()
[5, 3, 2, 1]
sortByKey(ascending=True, numPartitions=None, keyfunc=<operator.itemgetter object>)[source]

sort by key

Parameters:
  • ascending – Sort order.
  • numPartitions (int) – None means the output will have the same number of partitions as the input.
  • keyfunc – Returns the value that will be sorted.
Return type:

RDD

Note

Sorting is currently implemented as a local operation.

Examples:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(5, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey().collect()[0][1] == 'b'
True
>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(1, 'b'), (5, 'a'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey(ascending=False).collect()[0][1] == 'a'
True
stats()[source]

stats

Return type:StatCounter

Example:

>>> from pysparkling import Context
>>> d = [1, 4, 9, 16, 25, 36]
>>> s = Context().parallelize(d, 3).stats()
>>> sum(d)/len(d) == s.mean()
True
stdev()[source]

standard deviation

Return type:float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).stdev()
0.5
subtract(other, numPartitions=None)[source]

subtract

Parameters:
  • other (RDD) – The RDD to subtract from the current RDD.
  • numPartitions (int) – Currently not used. Partitions are preserved.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(1, 1), (1, 3)])
>>> rdd1.subtract(rdd2).collect()
[(0, 1)]
sum()[source]

sum of all the elements

Return type:float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).sum()
25
take(n)[source]

Take n elements and return them in a list.

Only evaluates the partitions that are necessary to return n elements.

Parameters:n (int) – Number of elements to return.
Return type:list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).take(2)
[4, 7]

Another example where only the first two partitions only are computed (check the debug logs):

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2], 3).take(2)
[4, 7]
takeSample(n)[source]

take sample

Assumes samples are evenly distributed between partitions. Only evaluates the partitions that are necessary to return n elements.

Parameters:n (int) – The number of elements to sample.
Return type:list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).takeSample(1)[0] in [4, 7, 2]
True

Another example where only one partition is computed (check the debug logs):

>>> from pysparkling import Context
>>> d = [4, 9, 7, 3, 2, 5]
>>> Context().parallelize(d, 3).takeSample(1)[0] in d
True
toLocalIterator()[source]

Returns an iterator over the dataset.

Example:

>>> from pysparkling import Context
>>> sum(Context().parallelize([4, 9, 7, 3, 2, 5], 3).toLocalIterator())
30
top(num, key=None)[source]

Top N elements in descending order.

Parameters:
  • num (int) – number of elements
  • key – optional key function
Return type:

list

Example:

>>> from pysparkling import Context
>>> r = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> r.top(2)
[9, 7]
union(other)[source]

union

Parameters:other (RDD) – The other RDD for the union.
Return type:RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.union(my_rdd).count()
12
values()[source]

Values of a (key, value) dataset.

Return type:RDD
variance()[source]

The variance of the dataset.

Return type:float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).variance()
0.25
zip(other)[source]

zip

Parameters:other (RDD) – Other dataset to zip with.
Return type:RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zip(my_rdd).collect()
[(4, 4), (9, 9), (7, 7), (3, 3), (2, 2), (5, 5)]
zipWithIndex()[source]

Returns pairs of an original element and its index.

Return type:RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zipWithIndex().collect()
[(4, 0), (9, 1), (7, 2), (3, 3), (2, 4), (5, 5)]
zipWithUniqueId()[source]

Zip every entry with a unique index.

This is a fast operation.

Return type:RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([423, 234, 986, 5, 345], 3)
>>> my_rdd.zipWithUniqueId().collect()
[(423, 0), (234, 1), (986, 4), (5, 2), (345, 5)]
class pysparkling.StatCounter(values=None)[source]