###### Feb 09, 2018
By Anaconda Team

I’ve been working with a large retail bank on their credit modeling system. We're doing interesting work with Dask to manage complex computations (see task graph below) that I’d like to share. This is an example of using Dask for complex problems that are neither a big dataframe nor a big array, but are still highly parallel.

## The Problem

When someone applies for a loan (a credit card, mortgage, auto loan, and so on), we want to estimate the likelihood of default and the profit (or loss) to be gained. Those models are composed of a complex set of equations that depend on each other. There can be hundreds of equations, each of which could have up to 20 inputs and yield 20 outputs. That is a lot of information to keep track of! We want to avoid manually keeping track of the dependencies, as well as messy code like the following Python function:

```python
def final_equation(inputs):
    out1 = equation1(inputs)
    out2_1, out2_2, out2_3 = equation2(inputs, out1)
    out3_1, out3_2 = equation3(out2_3, out1)
    ...
    out_final = equation_n(inputs, out,...)
    return out_final
```

This boils down to a dependency and ordering problem known as task scheduling.
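To make the ordering problem concrete, here is a minimal sketch that uses Python's standard-library `graphlib` module (Python 3.9+) rather than Dask. The equation names and the dependency map are hypothetical and only loosely mirror the function above.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each key lists the results it needs first.
dependencies = {
    "equation1": {"inputs"},
    "equation2": {"inputs", "equation1"},
    "equation3": {"equation1", "equation2"},
}

# static_order() yields every node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['inputs', 'equation1', 'equation2', 'equation3']
```

With hundreds of equations the map becomes too large to order by hand, which is the part a task scheduler takes over.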

## DAGs to the rescue

A directed acyclic graph (DAG) is commonly used to solve task scheduling problems. Dask is a library for delayed task computation that makes use of directed graphs at its core. `dask.delayed` is a simple decorator that turns a Python function into a graph vertex. If I pass the output from one delayed function as a parameter to another delayed function, Dask creates a directed edge between them. Let’s look at an example:

```python
>>> def add(x, y):
...     return x + y
...
>>> add(2, 2)
4
```

So here we have a function to add two numbers together. Let’s see what happens when we wrap it with `dask.delayed`:

```python
>>> import dask
>>> add = dask.delayed(add)
>>> left = add(1, 1)
>>> left
Delayed('add-...')
```

`add` now returns a `Delayed` object. We can pass this as an argument back into our `dask.delayed` function to start building out a chain of computation.

```python
>>> right = add(1, 1)
>>> four = add(left, right)
>>> four.compute()
4

>>> four.visualize()
```

Below we can see how the DAG starts to come together.

## Mock credit example

Let’s assume I’m a mortgage bank and have 10 people applying for a mortgage. I want to estimate the group’s average likelihood to default based on years of credit history and income.

```python
hist_yrs = range(10)
incomes = range(10)
```

Let’s also assume that default is a function of the incremented years of credit history, squared, plus half the income. While this could be written like:

```python
def default(hist, income):
    return (hist + 1) ** 2 + (income / 2)
```

I know that I will need the incremented history for another calculation in the future, and I want to reuse the code as well as avoid doing the computation twice. Instead, I can break those functions out:

```python
from dask import delayed

@delayed
def increment(x):
    return x + 1

@delayed
def halve(y):
    return y / 2

@delayed
def default(hist, income):
    return hist**2 + income
```

Note how I wrapped the functions with `delayed`. Now instead of returning a number these functions will return a `Delayed` object. Even better is that these functions can also take `Delayed` objects as inputs. It is this passing of `Delayed` objects as inputs to other `delayed` functions that allows Dask to construct the task graph. I can now call these functions on my data in the style of normal Python code:

```python
inc_hist = [increment(n) for n in hist_yrs]
halved_income = [halve(n) for n in incomes]
estimated_defaults = [default(hist, income) for hist, income in zip(inc_hist, halved_income)]
```

If you look at these variables, you will see that nothing has actually been calculated yet. They are all lists of `Delayed` objects.
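One way to convince yourself that nothing has run yet is to record each call in a plain list. This is only a sketch: `calls` and `traced_increment` are names made up for illustration, and the synchronous scheduler is requested explicitly so that the tasks run in the calling thread and the list is updated deterministically.

```python
from dask import delayed

calls = []  # hypothetical log of the arguments each executed task received

@delayed
def traced_increment(x):
    calls.append(x)  # side effect happens only when the task actually runs
    return x + 1

lazy = [traced_increment(n) for n in range(3)]
calls_before = list(calls)
print(calls_before)  # []: building the graph executed nothing

results = [d.compute(scheduler="synchronous") for d in lazy]
print(calls, results)  # [0, 1, 2] [1, 2, 3]
```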

Now, to get the average, I could just take the sum of `estimated_defaults`, but I want this to scale (and make a more interesting graph), so let’s do a merge-style reduction.

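```python
@delayed
def agg(x, y):
    return x + y

def merge(seq):
    # Recursively pair up results; each call returns a list of length 0 or 1.
    if len(seq) < 2:
        return seq
    middle = len(seq)//2
    left = merge(seq[:middle])
    right = merge(seq[middle:])
    if not right:
        return left
    # Combine the single elements, not the one-element lists themselves.
    return [agg(left[0], right[0])]

default_sum = merge(estimated_defaults)
```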

At this point `default_sum` is a list of length 1, and its first element is the sum of the estimated defaults for all applicants. To get the average, we divide by the number of applicants and call compute:

```python
avg_default = default_sum[0] / 10
avg_default.compute()  # 40.75
```
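As a sanity check on that figure, the same average can be worked out eagerly without Dask. This sketch simply applies the formula from above to the same `range(10)` inputs:

```python
hist_yrs = range(10)
incomes = range(10)

# (hist + 1) ** 2 summed over 0..9 is 1 + 4 + ... + 100 = 385
squared_history = sum((h + 1) ** 2 for h in hist_yrs)
# income / 2 summed over 0..9 is 45 / 2 = 22.5
halved_income = sum(i / 2 for i in incomes)

avg_default = (squared_history + halved_income) / len(hist_yrs)
print(avg_default)  # 40.75
```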

To see the computation graph that Dask will use, we call `visualize`:

``avg_default.visualize()``

And that is how Dask can be used to construct a complex system of equations with reusable intermediary calculations.
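The reuse can be checked directly. In the sketch below, a single `Delayed` result feeds two downstream tasks, and a counter (an illustrative addition, not part of the example above) shows the shared step running once when both are computed together with `dask.compute`. The function `other_metric` is a hypothetical second consumer of the incremented history.

```python
import dask
from dask import delayed

counter = {"increment": 0}  # counts how often the shared step really runs

@delayed
def increment(x):
    counter["increment"] += 1
    return x + 1

@delayed
def default(hist, income):
    return hist**2 + income

@delayed
def other_metric(hist):  # hypothetical second use of the incremented history
    return hist * 10

inc = increment(3)  # shared intermediate node
a = default(inc, 0.5)
b = other_metric(inc)

# Computing both in one call merges their graphs, so `inc` is evaluated once.
default_value, metric_value = dask.compute(a, b, scheduler="synchronous")
print(default_value, metric_value, counter["increment"])  # 16.5 40 1
```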

## How we used Dask in practice

For our credit modeling problem, we used Dask to make a custom data structure to represent the individual equations. Using the default example above, this looked something like the following:

```python
class Default(Equation):
    inputs = ['inc_hist', 'halved_income']
    outputs = ['defaults']

    @delayed
    def equation(self, inc_hist, halved_income, **kwargs):
        return inc_hist**2 + halved_income
```

This allows us to write each equation as its own isolated function and mark its inputs and outputs. With this set of equation objects, we can determine the order of computation (with a topological sort) and let Dask handle the graph generation and computation. This eliminates the onerous task of manually passing around the arguments in the code base. Below is an example task graph for one particular model that the bank actually uses.

This graph was a bit too large to render with the normal `my_task.visualize()` method, so instead we rendered it with Gephi to make the pretty colored graph above. The chaotic upper region of this graph is the individual equation calculations. Zooming in we can see the entry point, our input pandas DataFrame, as the large orange circle at the top and how it gets fed into many of the equations.

The output of the model is about 100 times the size of the input so we do some aggregation at the end via tree reduction. This accounts for the more structured bottom half of the graph. The large green node at the bottom is our final output.
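The equation-object approach described in this section could be wired up roughly as follows. This is a sketch under stated assumptions, not the bank's actual code: the `Equation` base class, the `IncHist` and `HalvedIncome` subclasses, and the `build` helper are all hypothetical, the topological sort comes from the standard-library `graphlib` module (Python 3.9+), and each method is wrapped with `delayed` at wiring time rather than decorated in the class.

```python
from graphlib import TopologicalSorter
from dask import delayed


class Equation:
    """Hypothetical base class: subclasses declare inputs/outputs by name."""
    inputs = []
    outputs = []


class IncHist(Equation):
    inputs = ["hist"]
    outputs = ["inc_hist"]

    def equation(self, hist):
        return hist + 1


class HalvedIncome(Equation):
    inputs = ["income"]
    outputs = ["halved_income"]

    def equation(self, income):
        return income / 2


class Default(Equation):
    inputs = ["inc_hist", "halved_income"]
    outputs = ["defaults"]

    def equation(self, inc_hist, halved_income):
        return inc_hist**2 + halved_income


def build(equations, raw_inputs):
    """Wire equations together by name; return a dict of named results."""
    producer = {out: eq for eq in equations for out in eq.outputs}
    # An equation depends on whichever equations produce its inputs.
    deps = {
        eq: {producer[name] for name in eq.inputs if name in producer}
        for eq in equations
    }
    values = dict(raw_inputs)
    for eq in TopologicalSorter(deps).static_order():
        kwargs = {name: values[name] for name in eq.inputs}
        # Single-output equations only, to keep the sketch short.
        values[eq.outputs[0]] = delayed(eq.equation)(**kwargs)
    return values


# Deliberately listed out of order; the sort recovers a valid order.
model = [Default(), HalvedIncome(), IncHist()]
values = build(model, {"hist": 3, "income": 5})
result = values["defaults"].compute()
print(result)  # (3 + 1) ** 2 + 5 / 2 = 18.5
```

The point of the design is that no equation names another equation directly: each one only declares the names it consumes and produces, and the ordering falls out of those declarations.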

## Full Example

```python
from dask import delayed

@delayed
def increment(x):
    return x + 1

@delayed
def halve(y):
    return y / 2

@delayed
def default(hist, income):
    return hist**2 + income

@delayed
def agg(x, y):
    return x + y

def merge(seq):
    # Recursively pair up results; each call returns a list of length 0 or 1.
    if len(seq) < 2:
        return seq
    middle = len(seq)//2
    left = merge(seq[:middle])
    right = merge(seq[middle:])
    if not right:
        return left
    # Combine the single elements, not the one-element lists themselves.
    return [agg(left[0], right[0])]

hist_yrs = range(10)
incomes = range(10)
inc_hist = [increment(n) for n in hist_yrs]
halved_income = [halve(n) for n in incomes]
estimated_defaults = [default(hist, income) for hist, income in zip(inc_hist, halved_income)]
default_sum = merge(estimated_defaults)
avg_default = default_sum[0] / 10
avg_default.compute()  # 40.75
avg_default.visualize()  # requires graphviz and python-graphviz to be installed
```

## Acknowledgements

Special thanks to Matt Rocklin, Michael Grant, Gus Cavanaugh, and Rory Merritt for their feedback when writing this article.