Python has become an increasingly popular tool for data analysis, including data processing, feature engineering, machine learning, and visualization. Data scientists and data engineers enjoy Python’s rich numerical and analytical libraries — such as NumPy, pandas, and scikit-learn — and have long wanted to apply them to large datasets stored in Hadoop clusters. While Apache Spark, through PySpark, has made data in Hadoop clusters more accessible to Python users, actually using these libraries on a Hadoop cluster remains challenging. In particular, setting up a full-featured and modern Python environment on a cluster can be error-prone and time-consuming.

Continuum Analytics and Cloudera have partnered to create an Anaconda parcel for CDH to enable simple distribution and installation of popular Python packages and their dependencies.

Anaconda dramatically simplifies installation and management of popular Python packages and their dependencies, and this new parcel makes it easy for CDH users to deploy Anaconda across a Hadoop cluster for use in PySpark, Hadoop Streaming, and other contexts where Python is available and useful.

The newly available Anaconda parcel:

  • includes 300+ of the most popular Python packages
  • simplifies the installation of Anaconda across a CDH cluster
  • gives you access to Anaconda packages that you already know and love

The rest of this blog post will show you how to install and configure the Anaconda parcel, as well as provide an example of training a scikit-learn model on a single node and then using the model to make predictions on data in a cluster.

Installing the Anaconda Parcel

1. From the Cloudera Manager Admin Console, click the Parcels indicator in the top navigation bar.

2. Click the Edit Settings button on the top right of the Parcels page.

3. Click the plus symbol in the Remote Parcel Repository URLs section, and add the following repository URL for the Anaconda parcel: https://repo.continuum.io/pkgs/misc/parcels/

4. Click the Save Changes button at the top of the page.

5. Click the Parcels indicator in the top navigation bar to return to the list of available parcels, where you should see the latest version of the Anaconda parcel that is available.

6. Click the Download button to the right of the Anaconda parcel listing.

7. After the parcel is downloaded, click the Distribute button to distribute the parcel to all of the cluster nodes.

8. After the parcel is distributed, click the Activate button to activate the parcel on all of the cluster nodes, which will prompt with a confirmation dialog.

9. After the parcel is activated, Anaconda is now available on all of the cluster nodes.

These instructions are current as of the day of publication. Up-to-date instructions will be maintained in Anaconda’s documentation.

To make Spark aware that we want to use the installed parcel as the Python runtime environment on the cluster, we need to set the PYSPARK_PYTHON environment variable. Spark determines which Python interpreter to use by checking the value of the PYSPARK_PYTHON environment variable on the driver node. With the default configuration for Cloudera Manager and parcels, Anaconda will be installed to /opt/cloudera/parcels/Anaconda, but if the parcel directory for Cloudera Manager has been changed, you will need to substitute ${YOUR_PARCEL_DIR}/Anaconda/bin/python in the instructions below.
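Conceptually, this is a plain environment-variable lookup: if PYSPARK_PYTHON is set, that interpreter is used, otherwise a default applies. The sketch below illustrates the pattern with a hypothetical helper (resolve_pyspark_python is our illustration, not Spark's actual code):

```python
# Simplified sketch of the lookup Spark performs: use PYSPARK_PYTHON if it
# is set, otherwise fall back to a default interpreter. This helper is
# illustrative only, not part of the PySpark API.
def resolve_pyspark_python(environ, default='python'):
    return environ.get('PYSPARK_PYTHON', default)

interp = resolve_pyspark_python(
    {'PYSPARK_PYTHON': '/opt/cloudera/parcels/Anaconda/bin/python'})
# interp is now the Anaconda interpreter path
```

This is why exporting the variable in Spark's environment configuration (shown below) is enough to switch the whole cluster over to Anaconda.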

To specify which Python to use on a per-application basis, we can specify it on the same line as our spark-submit command. This would look like:

$ PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python spark-submit pyspark_script.py

You can also use Anaconda by default in Spark applications while still allowing users to override the value if they wish. To do this, you will need to follow the instructions for Advanced Configuration Snippets and add the following lines to Spark’s configuration:

if [ -z "${PYSPARK_PYTHON}" ]; then
   export PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python
fi

Now with Anaconda on your CDH cluster, there’s no need to manually install, manage, and provision Python packages on your Hadoop cluster.

Anaconda in Action

A common workflow for data scientists working in Python is to:

  1. Train a scikit-learn model on a single node
  2. Save the results to disk
  3. Apply the trained model using PySpark to generate predictions on a larger dataset.

Let’s take a classic machine learning classification problem as an example of what having Anaconda’s Python libraries installed on a CDH cluster allows us to do.

The MNIST dataset is a canonical machine learning classification problem that involves recognizing handwritten digits: each row of the dataset represents one handwritten digit from 0 to 9. We will train on the original MNIST dataset (60,000 rows) and make predictions on the much larger MNIST8M dataset (8,000,000 rows). Both of these datasets are available from the libsvm datasets website, and more information, including benchmarks, can be found on the MNIST Dataset website.
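The libsvm files store each row in a sparse text format: a label followed by index:value pairs for the nonzero pixels. A minimal sketch of decoding one such line (the sample line below is made up for illustration, not an actual row from mnist.scale):

```python
# Decode one line of libsvm-format text: "<label> <idx>:<val> <idx>:<val> ..."
# Note that feature indices in libsvm files are 1-based.
def decode_libsvm_line(line):
    parts = line.strip().split(' ')
    label = float(parts[0])
    features = {int(p.split(':')[0]): float(p.split(':')[1])
                for p in parts[1:]}
    return label, features

# Illustrative sample line, not real MNIST data
label, features = decode_libsvm_line('5 153:0.5 154:1.0')
```

Keeping the 1-based indexing in mind matters below, where these sparse pairs are written into a dense 784-column array.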

To train the model on a single node, we use scikit-learn, then save the model to a file with pickle:

with open('mnist.scale', 'r') as f:
    train = f.read()

with open('mnist.scale.t', 'r') as f:
    test = f.read()

import numpy as np

def parse(data):
    lines = [line.strip() for line in data.split('\n') if line.strip() != '']
    nlines = len(lines)
    X = np.zeros((nlines, 784), dtype=float)
    Y = np.zeros((nlines,), dtype=float)

    for n, line in enumerate(lines):
        parts = line.split(' ')
        Y[n] = float(parts[0])
        for pair in parts[1:]:
            pos, val = pair.strip().split(':')
            # libsvm feature indices are 1-based
            X[n, int(pos) - 1] = float(val)
    return X, Y

X_train, Y_train = parse(train)
X_test, Y_test = parse(test)

from sklearn import svm, metrics

classifier = svm.SVC(gamma=0.001)
classifier.fit(X_train, Y_train)
predicted = classifier.predict(X_test)

print(metrics.classification_report(Y_test, predicted))

import pickle
with open('classifier.pickle', 'wb') as f:
    pickle.dump(classifier, f)

With the classifier now trained and saved to disk, it can be copied to the machine that will run the Spark driver.

Next, we configure and create a SparkContext to run in yarn-client mode:

from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('sklearn-predict')
sc = SparkContext(conf=conf)

To load the MNIST8M data from HDFS into an RDD:

input_data = sc.textFile('hdfs:///tmp/mnist8m.scale')

We’ll do some preprocessing on this dataset to convert the text to a NumPy array, which will serve as input for the scikit-learn classifier. We’ve installed Anaconda on every cluster node, so both NumPy and scikit-learn are available to the Spark worker processes.

def clean(line):
    """
    Parse a line in the mnist8m libsvm format and return a NumPy array.
    """
    import numpy as np
    X = np.zeros((1, 784), dtype=float)
    parts = line.strip().split(' ')
    for pair in parts[1:]:
        pos, val = pair.strip().split(':')
        pos, val = int(pos), float(val)
        if pos <= 784:
            # libsvm feature indices are 1-based
            X[0, pos - 1] = val
    return X

inputs = input_data.map(clean)

To load the trained scikit-learn model on the driver:

import pickle
with open('classifier.pickle', 'rb') as f:
    classifier = pickle.load(f)

In order to apply the trained model to data in a large file in HDFS, we need the trained model available in memory on the executors. To ship the classifier from the driver to all of the Spark workers, we can use the SparkContext.broadcast function:

broadcastVar = sc.broadcast(classifier)

This broadcast variable is then available on the executors, which means we can use it in logic that needs to be executed on the cluster, e.g., inside of map or flatMap functions. Applying the trained model and saving the output to a file is then simple:

def apply_classifier(input_array):
    label = broadcastVar.value.predict(input_array)
    return label
predictions = inputs.map(apply_classifier)
predictions.saveAsTextFile('hdfs:///tmp/predictions')
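One refinement worth considering: calling predict once per row incurs per-call overhead, and scikit-learn models predict much faster on batches. A sketch of batching per partition with mapPartitions — predict_partition is a hypothetical helper of ours, and StubClassifier stands in for broadcastVar.value so the sketch runs without a cluster:

```python
# Stand-in for the broadcast scikit-learn classifier: anything with a
# predict method over a batch of rows.
class StubClassifier(object):
    def predict(self, rows):
        # Illustrative only: label every row 0
        return [0] * len(rows)

classifier = StubClassifier()

def predict_partition(rows):
    # Materialize the partition iterator and predict in one batched call.
    # With real NumPy rows from clean(), stack them first, e.g. np.vstack.
    rows = list(rows)
    if not rows:
        return []
    return list(classifier.predict(rows))

# On a real RDD this would be:
#   predictions = inputs.mapPartitions(predict_partition)
labels = predict_partition(iter([[0.0] * 784, [1.0] * 784]))
```

The per-row map version above works fine; batching simply amortizes the model's prediction overhead across each partition.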

To submit this code as a script, we prepend the PYSPARK_PYTHON environment variable to the usual spark-submit command:

$ PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python spark-submit pyspark_job.py

Getting started with Anaconda on your CDH cluster is easy with the newly available parcel. Be sure to check out the Anaconda parcel documentation for more details.