By: Martin Durant

Intake is an open source project for providing easy pythonic access to a wide variety of data formats, and a simple cataloging system for these data sources. Intake is a new project, and all are encouraged to try and comment on it.

PySpark is the Python interface to Apache Spark, a fast, general-purpose cluster computing system.

In this blog, we describe the new data driver for Intake, intake-spark. It allows data sources that are to be loaded via Spark to be described and enumerated in Intake catalogs alongside other data sources, files, and data services. In addition, some existing Intake drivers are gaining methods for loading their data with Spark rather than with Python libraries.

This is part of a series of blog posts about Intake:

To run this code yourself, you first need to install the requirements.

> conda install -c defaults -c intake intake intake-spark pyspark

New drivers

Intake-spark provides three new drivers for Intake:

  • spark_rdd: a handle to a Spark RDD, a list-like set of objects, allowing functional programming
  • spark_dataframe: a handle to a Spark DataFrame, a tabular data structure similar to the Pandas or Dask data-frames
  • spark_cat: a catalog driver, providing a data-frame entry for every table in Spark’s internal catalog, which may, in turn, be populated from Hive.

Let’s investigate the simplest case

In [1]:
import intake

We will perform a word count on a file in the current directory.

Encoding a Spark function for use with Intake is slightly involved, but not too difficult once one is familiar with it. The following cell encodes sc.textFile('cat.yaml').map(str.split).map(len), where sc is the SparkContext. This chained style is typical when working with Spark. In the Intake version, each stage of attribute lookup becomes one element of a list, made up of the attribute name (a string) and its arguments (always given as a list, even when there is only one).

The file in question is created a few cells down, if you don’t already have it.
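To make the encoding concrete, here is a minimal pure-Python sketch of how such a list could be replayed. Start from an object, and for each stage look up the named attribute and call it with the listed arguments. The helper apply_chain and the MockContext/MockRDD classes are hypothetical stand-ins so that the sketch runs without Spark. They are not intake-spark's actual implementation.

```python
# A minimal sketch of replaying an Intake-style chain of attribute lookups.
# apply_chain, MockContext and MockRDD are hypothetical stand-ins, not part of
# intake-spark or PySpark.


def apply_chain(start, chain):
    """Apply each [attribute_name, args] stage in turn, starting from `start`."""
    obj = start
    for name, args in chain:
        # Look up the attribute on the current object, then call it with the args.
        obj = getattr(obj, name)(*args)
    return obj


class MockRDD:
    """A tiny in-memory stand-in for an RDD: just a list of items."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, func):
        # Apply func to every item, returning a new MockRDD.
        return MockRDD(func(item) for item in self.items)

    def sum(self):
        return sum(self.items)


class MockContext:
    """Stands in for a SparkContext; `files` maps a path to its text."""

    def __init__(self, files):
        self.files = files

    def textFile(self, path):
        # One item per line, as Spark's textFile produces.
        return MockRDD(self.files[path].splitlines())


# The same chain as sc.textFile('cat.yaml').map(str.split).map(len)
chain = [
    ['textFile', ['cat.yaml']],
    ['map', [str.split]],
    ['map', [len]],
]
mock_sc = MockContext({'cat.yaml': 'sources:\n  example_rdd:\n    description: two words'})
rdd = apply_chain(mock_sc, chain)
print(rdd.items)  # [1, 1, 3]
print(rdd.sum())  # 5
```

The same replay works on any Python object. For example, apply_chain("  a b  ", [["strip", []], ["split", []]]) gives ["a", "b"].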

In [2]:
source = intake.open_spark_rdd([
    ['textFile', ['cat.yaml']],
    ['map', [str.split]],
    ['map', [len]]
], {})

Calling to_spark() on this source returns the Spark reference to the RDD. Note that the SparkContext is set up automatically here, giving a local cluster with as many workers as CPU cores. The last argument, {}, can be used to specify how the context is set up.

In [3]:
rdd = source.to_spark()
rdd
Out[3]:
PythonRDD[2] at RDD at PythonRDD.scala:53

To complete the word count, we can call sum(). We check the result below with the command-line utility wc.

In [4]:
rdd.sum()
Out[4]:
41

The read() and read_partition() methods also work as expected, returning ordinary Python objects.

In [5]:
source.read()
Out[5]:
[1, 1, 1, 1, 3, 3, 3, 3, 3, 2, 3, 4, 2, 7, 2, 2]
In [6]:
sum(source.read())
Out[6]:
41
In [7]:
!wc -w cat.yaml
      41 cat.yaml
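As a further cross-check that needs neither Spark nor wc, the per-line counts returned by read() can be reproduced in plain Python. Split each line on whitespace and count the pieces. For ordinary text, their sum matches wc -w, since both count whitespace-separated tokens. This is a minimal sketch. The helper name words_per_line and the file contents are hypothetical.

```python
# Reproduce the word count in plain Python, mirroring .map(str.split).map(len).
# words_per_line is a hypothetical helper, not part of Intake.
import os
import tempfile


def words_per_line(path):
    """Return the number of whitespace-separated words on each line of a file."""
    with open(path) as f:
        return [len(line.split()) for line in f]


# Example on a throwaway file (the contents are made up for illustration).
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'example.yaml')
    with open(path, 'w') as f:
        f.write('sources:\n  example_rdd:\n    description: two words\n')
    counts = words_per_line(path)

print(counts)       # [1, 1, 3]
print(sum(counts))  # 5, the same total that `wc -w` reports for this file
```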

As usual, we can get the equivalent YAML for this source and put it in a file to make a catalog. Note how the Python functions are expressed, and how the lists are nested. It is more typical to have single-stage Spark chains and to leave further manipulation of the RDDs to user code.

In [8]:
print(source.yaml())
sources:
  spark_rdd:
    args:
      args:
      - - textFile
        - - cat.yaml
      - - map
        - - !!python/object/apply:builtins.getattr
            - !!python/name:builtins.str ''
            - split
      - - map
        - - !!python/name:builtins.len ''
      context_kwargs: {}
    description: ''
    driver: spark_rdd
    metadata: {}
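The Python-specific tags in this YAML are instructions to rebuild Python objects when the file is loaded. !!python/name:builtins.len names the built-in len. !!python/object/apply:builtins.getattr with arguments [str, split] means getattr(str, 'split'), that is, str.split. Written out in plain Python, they amount to the following. resolve_name is a hypothetical helper. This is only a sketch of what a YAML loader that permits these tags does, not Intake's own code.

```python
# What the Python-specific YAML tags above stand for, written out in plain Python.
# resolve_name is a hypothetical helper, not part of Intake or PyYAML.
import builtins
import importlib


def resolve_name(dotted):
    """Turn 'module.attr' into the object it names, like !!python/name:module.attr."""
    module_name, _, attr = dotted.rpartition('.')
    return getattr(importlib.import_module(module_name), attr)


# !!python/name:builtins.len ''  ->  the built-in len function
length = resolve_name('builtins.len')

# !!python/object/apply:builtins.getattr [str, split]  ->  getattr(str, 'split')
splitter = resolve_name('builtins.getattr')(builtins.str, 'split')

print(length is len)               # True
print(splitter('two words here'))  # ['two', 'words', 'here']
```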

In [9]:
%%writefile cat.yaml
sources:
  example_rdd:
    args:
      args:
      - - textFile
        - - cat.yaml
      - - map
        - - !!python/object/apply:builtins.getattr
            - !!python/name:builtins.str ''
            - split
      - - map
        - - !!python/name:builtins.len ''
      context_kwargs: {}
    description: 'Gets number of words per line'
    driver: spark_rdd
    metadata: {}
Overwriting cat.yaml
In [10]:
# we get the same result from the catalog
cat = intake.open_catalog('cat.yaml')
cat.example_rdd.to_spark().sum()
Out[10]:
41

The same operation can be done with the spark_dataframe driver. The difference is that the attribute lookups start from a SparkSession rather than a SparkContext, and to_spark() returns a Spark DataFrame. The data-frame variant is able to introspect the data types of the columns before loading any data.

Making a Context and Session

The previous example used the default Spark context, local[*], because the argument given for context_kwargs was an empty dictionary. In general, intake-spark will make use of any context/session that already exists. It is best to create contexts/sessions ahead of time, in the standard way for your system. To be certain that intake-spark picks them up, set them explicitly as follows:

In [ ]:
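from pyspark.sql import SparkSession
from intake_spark.base import SparkHolder
session = SparkSession.builder.getOrCreate()  # or build a session configured for your system
SparkHolder.set_class_session(sc=session.sparkContext, session=session)  # hand both to intake-spark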

Alternatively, you can pass a set of arguments (e.g., master=, app_name=, hive=True) that intake-spark will use to create the session for you. These arguments can also be encoded into catalogs under the keyword context_kwargs.

In [ ]:
SparkHolder.set_class_session(master='spark://my.server:7077', ...)

Existing drivers

As an experimental feature, certain drivers in the Intake ecosystem also provide a .to_spark() method, which produces the equivalent Spark object for the data type of the original source. Set up the session first, using the methods above. Otherwise, the default cluster will be assumed, which may be the local cluster.

In these cases, most Python-specific arguments are simply ignored, and only the URL is passed to Spark. When accessing remote data services, Spark must already be correctly configured to give you access.

The drivers currently supported are:

  • textfile as RDD
  • csv as DataFrame
  • parquet as DataFrame
  • avro as DataFrame (requires installation of the Avro Java library)

Catalogs

Spark has its own internal concept of a catalog. It can be populated by registering tables in code, or by reflecting tables that already exist in Apache Hive (Hive is the de facto data warehouse layer in Hadoop deployments). You can get direct access to these as follows:

In [ ]:
spark_cat = intake.open_spark_cat()

The entries of the spark_cat catalog correspond to the tables in Spark’s catalog, and each of them is opened with the spark_dataframe driver.

This raises the question: why have an Intake catalog of sources that already exist in a catalog of their own? The advantage of the Intake catalog is that it puts your Spark/Hive sources alongside non-Spark data sources (including third-party data services). They all sit under a single API and a single framework, perhaps even hierarchically within the same set of Intake catalogs. From the user’s point of view, there is no operational difference between a data frame computed by Spark and one computed by Dask. Indeed, the Intake concept stretches to many more data types besides tabular data frames.

This drives home the basic idea of Intake: to provide a single place to describe and unify all other data sources/services, by being a simple layer over those services. With a little care, catalogs can be created to expose the whole of the data ecosystem to users, such that they need to know only the minimum of what to do with each kind of data once loaded.

Summary

By installing intake-spark, you can include RDDs and DataFrames as entries in an Intake catalog, alongside other sources, and also automatically find tables in the Spark catalog. In addition, some data types can be converted to Spark analogues just by calling to_spark.