Parallelization

Pysparkling supports parallelization on the local machine and across clusters of computers.

Processes and Threads

Single-machine parallelization with concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool is supported. Use cloudpickle instead of pickle for serialization so that lambda functions (and more) can be used in data transformations.

import concurrent.futures  # a bare "import concurrent" does not load the submodule
import pickle

import cloudpickle
import pysparkling

sc = pysparkling.Context(
    pool=concurrent.futures.ProcessPoolExecutor(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)
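
The reason for swapping the serializer can be seen with the standard library alone. The following is a minimal sketch, not part of pysparkling: `can_pickle` is a hypothetical helper introduced here for illustration. It shows that the standard pickle module serializes a function it can look up by name, but rejects a lambda.

```python
import math
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A function that is importable by name is pickled as a reference to that name.
named_ok = can_pickle(math.sqrt)

# A lambda has no importable name, so the standard pickle module rejects it.
lambda_ok = can_pickle(lambda x: 2 * x)

print(named_ok, lambda_ok)  # prints: True False
```

cloudpickle serializes such functions by value, and its output can be read back with pickle.loads, which is why the configuration above pairs cloudpickle.dumps with pickle.loads.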

Experimental

The following are experimental notes. Most of them don’t even contain examples of how to use these techniques with pysparkling.

ipcluster and IPython.parallel

Local test setup:

ipcluster start --n=2

Then, in Python:

from IPython.parallel import Client

c = Client()
print(c[:].map(lambda _: 'hello world', range(2)).get())

which should print ['hello world', 'hello world'].

To run on a cluster, create a profile:

ipython profile create --parallel --profile=smallcluster

# start the controller:
# This creates ~/.ipython/profile_smallcluster/security/ipcontroller-engine.json,
# which the engines use to locate this controller.
# 127.0.0.1 is the local-only IP address. Substitute the machine's IP
# address so that the engines can find it.
ipcontroller --ip=127.0.0.1 --port=7123 --profile=smallcluster

# start engines (assuming they have access to the
# ipcontroller-engine.json file)
ipengine --profile=smallcluster

Test it in Python:

from IPython.parallel import Client

c = Client(profile='smallcluster')
print(c[:].map(lambda _: 'hello world', range(2)).get())

If you don’t want to start the engines manually, ipcluster comes with “Launchers” that can start them for you: https://ipython.org/ipython-doc/dev/parallel/parallel_process.html#using-ipcluster-in-ssh-mode

StarCluster

Setting up StarCluster was an experiment. However, it does not integrate well with the rest of our EC2 infrastructure, so we switched to a Chef-based setup where we use ipcluster directly. One blocker was that the number of engines per node is not configurable, and we have many map jobs that spend most of their time waiting on external responses.
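
To illustrate why the number of workers per node matters for jobs that mostly wait, here is a small local sketch using only the standard library. It is not StarCluster or ipcluster code: `fake_request` and `timed_map` are hypothetical names, and the sleep merely stands in for an external response.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(i):
    """Hypothetical stand-in for a call that waits on an external service."""
    time.sleep(0.05)
    return i * i


def timed_map(workers, items):
    """Map fake_request over items with a pool of the given size.

    Returns the results and the elapsed wall-clock time in seconds.
    """
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fake_request, items))
    return results, time.perf_counter() - start


items = list(range(8))
serial_results, serial_seconds = timed_map(1, items)
pooled_results, pooled_seconds = timed_map(8, items)

# Same results either way; the larger pool overlaps the waiting time.
print(serial_results == pooled_results)
print("1 worker: %.2fs, 8 workers: %.2fs" % (serial_seconds, pooled_seconds))
```

With one worker the waits add up, while eight workers overlap them. Jobs of this kind benefit from running more workers than a node has cores.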

Setup

# install
pip install starcluster

# create configuration
starcluster help  # choose the option to create a sample config file

# add your user id, aws_access_key_id and aws_secret_access_key to config

# create an ssh key (this creates a new key just for starcluster)
# and registers it with AWS
starcluster createkey starclusterkey -o ~/.ssh/starclusterkey.rsa

# add this key to config:
[key starclusterkey]
KEY_LOCATION=~/.ssh/starclusterkey.rsa
# and use this key in the cluster setup:
KEYNAME = starclusterkey

# disable the queue, Sun Grid Engine
# (unnecessary for pysparkling and takes time during setup)
DISABLE_QUEUE=True

# to enable IPython parallel support, uncomment these lines in config:
[plugin ipcluster]
SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster

# and make sure you have this line inside the cluster section
[cluster smallcluster]
PLUGINS = ipcluster

# start the cluster
starcluster start smallcluster

# check it has started
starcluster listclusters

We currently use ami-da180db2 (Ubuntu 14.04 with 100GB EBS) on m3.medium instances.

Workarounds:

# this seems to be a dependency that does not get installed
pip install pexpect

# to validate the ssh host, you need to log in once manually, to add it
# to the list of known hosts
starcluster sshmaster smallcluster

In Python, you should now be able to run:

from IPython.parallel import Client

# the exact command is printed after the cluster started
rc = Client('/Users/sven/.starcluster/ipcluster/SecurityGroup:@sc-smallcluster-us-east-1.json',
            sshkey='/Users/sven/.ssh/starclusterkey.rsa', packer='pickle')

view = rc[:]
results = view.map(lambda x: x**30, range(8))
print(results.get())

which is also in tests/starcluster_simple.py.

To install your own software that is not on PyPI:

pip install wheel
python setup.py bdist_wheel  # add --universal for packages that support both Python 2 and 3
starcluster put smallcluster dist/your_package_name.whl /home/sgeadmin/your_package_name.whl

# ssh into remote machine
starcluster sshmaster smallcluster
> pip install --upgrade pip
> pip install wheel
> pip2.7 install /home/sgeadmin/your_package_name.whl