(Disclaimer added 2016 May 10: Much of the information in this post has been made obsolete with the introduction of dask.distributed. Read more on the dask.distributed site.)

(Disclaimer added 2016 January 20: Some of the code mentioned below has been removed, including the distributed memory scheduler for using dask with IPython Parallel. Stay tuned for upcoming blog posts from the dask team discussing the better alternatives they have recently implemented!)

We’ve been working on some fun stuff lately, namely dask and Anaconda Cluster, and have been experimenting with analyzing all of the GitHub data for 2015 on an EC2 cluster (a distributed memory environment). We use Anaconda Cluster to set up a 50-node cluster on EC2, then use dask for the analysis.

Dask is a tool for out-of-core, parallel data analysis. Recently we added a distributed memory scheduler for running dask on clusters. We will also be using dask.bag, which provides an API for operations on unordered lists (like sets, but with duplicates). It is great for dealing with semi-structured data like JSON blobs or log files. More blog posts about dask can be found here or here.
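As a quick toy illustration (invented here, not part of the GitHub analysis), the same pluck/frequencies pattern used later in this post works on a small in-memory bag:

>>> import dask.bag as db
>>> # a toy bag built in memory; the real analysis below uses the same
>>> # map/pluck/frequencies/compute API on data loaded from S3
>>> b = db.from_sequence([{'type': 'PushEvent'}, {'type': 'ForkEvent'}, {'type': 'PushEvent'}])
>>> b.pluck('type').frequencies().compute()  # e.g. [('PushEvent', 2), ('ForkEvent', 1)]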

Anaconda Cluster lets us easily set up clusters and manage the packages in them with conda. Setting up the cluster for this demo takes just a few lines.

acluster create my-cluster -p aws_profile  # create the cluster
acluster install notebook dask-cluster  # install plugins that setup an ipython notebook and dask cluster
acluster conda install boto ujson # install conda packages we need for the demo
acluster open notebook  # open ipython-notebook in the browser to interact with the cluster

While dask.distributed is well integrated with Anaconda Cluster, it isn’t restricted to it. This blog post shows how to set up a dask.distributed network manually, and these docs show how to set up dask.distributed from any IPyParallel cluster.

Related Projects

Projects for Python analytics in a distributed memory environment:

Github Archive data on S3

We took data from githubarchive.com, from January 2015 to May 2015, and put it on S3. We chose S3 because there are nice Python libraries for interacting with it, and we can get awesome bandwidth from EC2 to S3. (The script for gathering this data is here).
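For reference, here is a rough sketch of how a single hour of data could be mirrored to S3. This is an illustration rather than the linked script, and the URL pattern and bucket layout are assumptions on our part.

import requests
import boto

# download one hour of events from the GitHub Archive (URL pattern assumed)
url = 'http://data.githubarchive.org/2015-01-01-0.json.gz'
resp = requests.get(url)
resp.raise_for_status()

# upload the compressed file unchanged to the S3 bucket
conn = boto.connect_s3()
bucket = conn.get_bucket('githubarchive-data')
key = bucket.new_key('2015-01-01-0.json.gz')
key.set_contents_from_string(resp.content)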

Let’s inspect the data first so we can find something to analyze and learn the data schema. You can inspect the data yourself in the githubarchive-data S3 bucket.

Inspect S3 data with dask.bag

We have approximately 28 GB of data: one file per hour, averaging around 7.8 MB each (compressed). So what is the schema, and how can we inspect it? We take one file and turn it into a dask.Bag for analysis on our local machine.

>>> import dask.bag as db
>>> import ujson as json
 
>>> # take one file from the bucket and load it as a JSON object; note that
>>> # gz decompression happens automatically at compute time
>>> b = db.from_s3('githubarchive-data', '2015-01-01-0.json.gz').map(json.loads)
 
>>> first = b.take(1)[0]  # take the first json object from the file
>>> first
 
{u'actor': {u'avatar_url': u'https://avatars.githubusercontent.com/u/9152315?',
  u'gravatar_id': u'',
  u'id': 9152315,
  u'login': u'davidjhulse',
  u'url': u'https://api.github.com/users/davidjhulse'},
 u'created_at': u'2015-01-01T00:00:00Z',
 u'id': u'2489368070',
 u'payload': {u'before': u'86ffa724b4d70fce46e760f8cc080f5ec3d7d85f',
  u'commits': [{u'author': {u'email': u'[email protected]',
     u'name': u'davidjhulse'},
    u'distinct': True,
    u'message': u'Altered BingBot.jarnnFixed issue with multiple account support',
    u'sha': u'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
    u'url': u'https://api.github.com/repos/davidjhulse/davesbingrewardsbot/commits/a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81'}],
  u'distinct_size': 1,
  u'head': u'a9b22a6d80c1e0bb49c1cf75a3c075b642c28f81',
  u'push_id': 536740396,
  u'ref': u'refs/heads/master',
  u'size': 1},
 u'public': True,
 u'repo': {u'id': 28635890,
  u'name': u'davidjhulse/davesbingrewardsbot',
  u'url': u'https://api.github.com/repos/davidjhulse/davesbingrewardsbot'},
 u'type': u'PushEvent'}

>>> first.keys()  # top level keys in this json object
[u'payload', u'created_at', u'actor', u'public', u'repo', u'type', u'id']

The type field looks interesting. What are the possible types, and how often does each occur? We can inspect this with dask.bag.frequencies.

>>> %time b.pluck('type').frequencies().compute()
CPU times: user 8 ms, sys: 76 ms, total: 84 ms
Wall time: 1.28 s
 
[(u'ReleaseEvent', 24),
 (u'PublicEvent', 2),
 (u'PullRequestReviewCommentEvent', 85),
 (u'ForkEvent', 213),
 (u'MemberEvent', 16),
 (u'PullRequestEvent', 315),
 (u'IssueCommentEvent', 650),
 (u'PushEvent', 4280),
 (u'DeleteEvent', 141),
 (u'CommitCommentEvent', 56),
 (u'WatchEvent', 642),
 (u'IssuesEvent', 373),
 (u'CreateEvent', 815),
 (u'GollumEvent', 90)]

Top Committers

Most events are pushes, which is not surprising. Let’s ask, “Who pushes the most?”

We do this by selecting the PushEvents, counting the frequencies of the usernames behind those pushes, and taking the top five.

>>> pushes = b.filter(lambda x: x['type'] == 'PushEvent')  # keep only the push events
>>> names = pushes.pluck('actor').pluck('login') # get the login names
>>> top_5 = names.frequencies().topk(5, key=lambda (name, count): count)  # List top 5 pushers
>>> %time top_5.compute()  # run the above computations
CPU times: user 12 ms, sys: 64 ms, total: 76 ms
Wall time: 1.26 s
 
[(u'KenanSulayman', 79),
 (u'mirror-updates', 42),
 (u'cm-gerrit', 35),
 (u'qdm', 29),
 (u'greatfire', 24)]

These users pushed the most, but a single push can contain multiple commits, so we can also ask, “Who pushed the most commits?”

We can figure this out by grouping on username and then, for each user, summing the number of commits across all of their pushes. More technically speaking, we want to group by username so that each username maps to a list of its PushEvents, reduce each PushEvent to a count of its commits, and then sum those counts for each user. So we are grouping, then reducing.
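A naive version of this, written here as an illustration using dask.bag’s groupby on the pushes bag from above, might look like the following. Note that groupby shuffles all of the data between partitions.

>>> # username -> list of that user's PushEvents (an expensive shuffle)
>>> grouped = pushes.groupby(lambda x: x['actor']['login'])
>>> # then sum the commit counts per user
>>> naive_commits = grouped.map(lambda (name, events): (name, sum(len(e['payload']['commits']) for e in events)))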

However, there are algorithms that group and reduce simultaneously, which avoid the expensive shuffle and are much faster. In dask.bag this is foldby; analogous methods include toolz.reduceby and, in PySpark, RDD.combineByKey.

def get_logins(x):
    """The key for foldby, like a groupby key. Get the username from a PushEvent"""
    return x['actor']['login']
 
def binop(total, x):
    """Count the number of commits in a PushEvent"""
    return total + len(x['payload']['commits'])
 
def combine(total1, total2):
    """This combines commit counts from PushEvents"""
    return total1 + total2

>>> commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
>>> top_commits = commits.topk(5, key=lambda (name, count): count)
>>> %time top_commits.compute()
CPU times: user 12 ms, sys: 64 ms, total: 76 ms
Wall time: 1.28 s
 
[(u'mirror-updates', 413),
 (u'jrmarino', 88),
 (u'javra', 80),
 (u'KenanSulayman', 79),
 (u'chcholman', 51)]

Recall that this dask.Bag was built from a single file. Now that we know how to find the top committers, we’ll gradually load more data and benchmark the dask.distributed scheduler against the default dask.multiprocessing scheduler.

Benchmarking dask.distributed

First we set up the distributed scheduler…

# dask.distributed setup
import dask
from dask.distributed import Client
 
dc = Client('tcp://localhost:9000') # client connected to 50 nodes, 2 workers per node.

… and perform our same computation using this distributed cluster rather than the default single-node scheduler.

>>> b = db.from_s3('githubarchive-data', '2015-*.json.gz').map(json.loads)
>>> # do the steps above on the larger dataset
>>> top_commits.compute()              # use single node scheduler or...
>>> top_commits.compute(get=dc.get)    # use client with distributed cluster
[(u'mirror-updates', 1463019),
 (u'KenanSulayman', 235300),
 (u'greatfirebot', 167558),
 (u'rydnr', 133323),
 (u'markkcc', 127625)]

We wrote a small script to benchmark this process on increasing data sizes, but omit it from this post for space reasons; a rough sketch of the idea follows. After that we present the size, computation time, and effective bandwidth for datasets of increasing size.
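The sketch below is our own reconstruction rather than the actual benchmark script; the glob patterns and timing details are assumptions. It reuses db, json, get_logins, binop, combine, and the dc client defined earlier.

import time

# glob patterns of roughly increasing size: one file, one day, ~ten days,
# January, and January through May (exact patterns are assumptions)
globs = ['2015-01-01-0.json.gz', '2015-01-01-*.json.gz', '2015-01-0*.json.gz',
         '2015-01-*.json.gz', '2015-*.json.gz']

for pattern in globs:
    b = db.from_s3('githubarchive-data', pattern).map(json.loads)
    pushes = b.filter(lambda x: x['type'] == 'PushEvent')
    commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top_commits = commits.topk(5, key=lambda (name, count): count)

    start = time.time()
    top_commits.compute(get=dc.get)  # or top_commits.compute() for the default scheduler
    print('%s: %.2f s' % (pattern, time.time() - start))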

|                                                        | Single file | 1 day  | 10 days | Jan ‘15 | J-M ‘15 |
|--------------------------------------------------------|-------------|--------|---------|---------|---------|
| Size (GB)                                              | 0.0026      | 0.1952 | 1.5871  | 5.0187  | 28.2891 |
| Time, default scheduler (s)                            | 1.25        | 36.58  | 258.54  |         |         |
| Time, distributed scheduler (s)                        | 1.09        | 4.29   | 14.82   | 43.14   | 246.21  |
| Distributed scheduler speedup (times faster)           | 1.15        | 8.52   | 17.45   |         |         |
| Default scheduler compute bandwidth (MB/s)             | 2.10        | 5.34   | 6.14    |         |         |
| Distributed scheduler compute bandwidth (MB/s)         | 2.41        | 45.49  | 107.11  | 116.35  | 114.90  |
| Compute bandwidth per node, distributed (MB/(s node))  | 0.048       | 0.910  | 2.142   | 2.327   | 2.298   |

Final Thoughts

This is experimental work. We encountered the following problems when doing this experiment:

  • It’s hard to deploy worker processes across a cluster – We solved this by making a dask-cluster plugin for anaconda-cluster
  • It’s hard to debug and investigate the state of a distributed network – We addressed this by giving dask clients views into dask worker state
  • Profiling distributed computation is hard – This was made a bit easier by how readily we could switch back to single-node computation, but in the future we’ll try applying dask’s newer profiling methods to the distributed scheduler.

We also have some lingering questions regarding performance:

  • Why does the distributed cluster perform worse than the single-node scheduler per node? This computation should be embarrassingly parallel.
  • 6 MB/s of compressed data throughput on a single node is nice, but we can probably do much better. As always, we should think first about single-core performance before we “go big” with a cluster.

The original notebook for this blog post is available on Anaconda.org. For more information on dask and dask.distributed, see the dask documentation. For more information on Anaconda Cluster, see its homepage.