Counting Words: The “Hello World” of MapReduce

In the original Willy Wonka film, if you recall, Charlie attends a chemistry class taught by Mr. Turkentine. Mr. Turkentine asks Charlie what the result would be from a complicated mix of chemicals. Charlie, of course, doesn’t know the answer, and Mr. Turkentine says:

Of course you don’t know. You don’t know because only I know. If you knew and I didn’t know, then you’d be teaching me instead of me teaching you – and for a student to be teaching his teacher is presumptuous and rude. Do I make myself clear?

Yes, repeating and conjugating a few words over several sentences can add dramatic effect, but it’s also confusing and annoying, and something authors typically try to avoid. If Charlie could quickly count the occurrences of each unique word in Mr. Turkentine’s rhetorical insult, then perhaps Charlie, in a snappy retort, could point out this fact to Mr. Turkentine.

This brings us to our canonical MapReduce example: counting words. Many MapReduce jobs simply count occurrences of events, and such routines are useful for nearly all data types: text, images, weather, finance, particle physics, logs, etc.

So our question is: “What is the total count for each unique word in Mr. Turkentine’s quote?”

Obviously, this can be done in a few lines of Python on a single machine. However, the example below forms the core of understanding MapReduce formally and of using Disco’s distributed MapReduce framework.
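For comparison, the single-machine version might look roughly like the following. This is a minimal sketch that uses `collections.Counter` from the standard library. It assumes the same clean-up the map step applies later: strip punctuation from each word and lower-case it. `count_words` is a hypothetical helper name, and `QUOTE` is simply Mr. Turkentine’s quote typed in by hand.

```python
import string
from collections import Counter

# Mr. Turkentine's quote, one line per record
QUOTE = """Of course you don't know. You don't know
because only I know. If you knew and I didn't
know, then you'd be teaching me instead of me teaching you --
and for a student to be teaching his teacher is presumptuous
and rude. Do I make myself clear?"""


def count_words(text):
    """Count each unique word on a single machine (hypothetical helper)."""
    # Strip leading/trailing punctuation and lower-case every word
    words = (w.strip(string.punctuation).lower() for w in text.split())
    # Drop tokens that were pure punctuation, such as "--"
    return Counter(w for w in words if w)


counts = count_words(QUOTE)
print(counts.most_common(3))
```

Everything here runs in one process and holds the whole text in memory, which is exactly the limitation MapReduce is designed to remove.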

Where We’re Going

Again, the purpose of MapReduce is to distribute work (here, counting) over several machines in a cluster. This is possible because the data can be split into records that are operated on independently. Such problems are often called embarrassingly parallel: little effort is required to decompose the problem into smaller problems. As a sanity check, it’s good to remember that a MapReduce job should output less data than it takes in, often significantly less.
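To make “embarrassingly parallel” concrete, the sketch below splits the text into independent chunks (one chunk per line), counts each chunk in a separate worker, and merges the partial counts. It uses a thread pool from `concurrent.futures` purely for illustration. The threads stand in for cluster machines, nothing here is Disco, and `count_chunk` is a hypothetical helper. The final print line checks the sanity rule above: the merged output has fewer entries than the input has tokens.

```python
import string
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

LINES = [
    "Of course you don't know. You don't know",
    "because only I know. If you knew and I didn't",
    "know, then you'd be teaching me instead of me teaching you --",
    "and for a student to be teaching his teacher is presumptuous",
    "and rude. Do I make myself clear?",
]


def count_chunk(chunk):
    """Count words in one chunk; needs no information from other chunks."""
    words = (w.strip(string.punctuation).lower() for w in chunk.split())
    return Counter(w for w in words if w)


# Each line is an independent record, so each can be counted by a separate worker
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_counts = list(pool.map(count_chunk, LINES))

# Merging the partial results is cheap: add the Counters together
total = sum(partial_counts, Counter())

n_input_tokens = sum(len(line.split()) for line in LINES)
print(len(total), "unique words from", n_input_tokens, "input tokens")
```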

Level 3 Reiterated

For each word in a line of text, map the word to a tuple of a key (the word) and a value of 1. Partition the key-value pairs into “buckets” or “shards” so that reducers can operate on those buckets. In each reducer, sum the counts for each unique word and return a list of unique words and their count totals.
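The three steps above can be simulated in a single Python process, with no cluster and no Disco, to see how the data flows. This is a minimal sketch under some assumptions:

- `word_map`, `partition`, `word_reduce` and `run_job` are hypothetical stand-ins for the functions developed below.
- A list of Python lists plays the role of the buckets.
- The reducer uses a dictionary rather than sorting, to keep the sketch short.

```python
import string
from collections import defaultdict

NR_REDUCERS = 5  # number of buckets/reducers (assumed for illustration)


def word_map(line):
    """Map step: emit (word, 1) for every word in a line."""
    for word in line.split():
        word = word.strip(string.punctuation).lower()
        if word:
            yield word, 1


def partition(key, nr_partitions):
    """Partition step: pick a bucket from the key's hash."""
    return hash(key) % nr_partitions


def word_reduce(pairs):
    """Reduce step: sum the counts for each unique word in one bucket."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


def run_job(lines, nr_reducers=NR_REDUCERS):
    buckets = [[] for _ in range(nr_reducers)]
    for line in lines:  # in a cluster, each line could go to any map node
        for key, value in word_map(line):
            buckets[partition(key, nr_reducers)].append((key, value))
    results = {}
    for bucket in buckets:  # in a cluster, each bucket could go to its own reducer
        # a given word only ever lands in one bucket, so keys never collide here
        results.update(word_reduce(bucket))
    return results


lines = ["Of course you don't know. You don't know",
         "because only I know. If you knew and I didn't"]
print(sorted(run_job(lines).items()))
```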

Input

Let’s reorganize the quote into several lines:

Of course you don't know. You don't know
because only I know. If you knew and I didn't
know, then you'd be teaching me instead of me teaching you --
and for a student to be teaching his teacher is presumptuous
and rude. Do I make myself clear?

Each line from the quote is passed to the map function. In Disco, splitting lines of text from a file is done by default. Later, we will override the default functionality and use a map reader to handle data which can’t easily be split line by line. We want many nodes working on mapping the data, and Disco’s MapReduce framework accomplishes this by delivering lines to any available map node.

Map Step

Map is a common function built into many programming languages. Generally, map applies a function to every item in a list. In the map step, we map each word to a key-value pair tuple, using the word as the key and the integer 1 as the value. In Disco, the map function takes two arguments: line (a line of text) and params (which we will ignore for now).

The line is split into words, each word is stripped of punctuation, and all letters are converted to lower case. Applied to a line of text, the map function yields a series of tuples of the form (word, 1):

[("of", 1), ("course", 1), ("you", 1), ("don't", 1), ("know", 1), ("you", 1)…]

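def map(line, params):
    import string
    for word in line.split():
        # strip leading/trailing punctuation and convert to lower case
        stripped_word = word.strip(string.punctuation).lower()
        # skip tokens that were only punctuation, such as "--"
        if stripped_word:
            yield stripped_word, 1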
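To see what the map function produces, you can call it directly on one line of the quote, outside Disco. The sketch below repeats the map function in Python 3 form so it runs on its own. Inside this snippet it shadows Python’s built-in map, just as the Disco-style name does. params is unused, so None is passed.

```python
import string


def map(line, params):
    for word in line.split():
        # strip leading/trailing punctuation and lower-case the word
        stripped_word = word.strip(string.punctuation).lower()
        if stripped_word:  # skip pure punctuation such as "--"
            yield stripped_word, 1


pairs = list(map("Of course you don't know. You don't know", None))
print(pairs)
# [('of', 1), ('course', 1), ('you', 1), ("don't", 1), ('know', 1), ('you', 1), ("don't", 1), ('know', 1)]
```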

Series of Tuples Explained

The series of tuples returned from the map is an iterable (more precisely, a generator). It can be used much like a list, e.g., [i for i in iterable], though it cannot be indexed directly as in iterable[0]. With larger and larger data sets, we need to process more data than fits in the memory of common machines. If the map function stored the key-value pairs in a Python object and returned it when complete, we would be in danger of exceeding that memory. Using the keyword yield instead gives Disco the power to consume and manage data in a much more flexible way.
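A small, standard-library-only illustration of the difference follows. A generator built with yield hands out one pair at a time and cannot be indexed, whereas building a list would keep every pair in memory at once. The pairs function here is a hypothetical example, not part of Disco.

```python
def pairs(words):
    """Yield (word, 1) pairs lazily, one at a time."""
    for word in words:
        yield word, 1


gen = pairs(["of", "course", "you"])

# Iterating works just like iterating over a list ...
print([p for p in gen])  # [('of', 1), ('course', 1), ('you', 1)]

# ... but a generator cannot be indexed
try:
    pairs(["of"])[0]
except TypeError as exc:
    print("TypeError:", exc)

# A generator can only be consumed once; this one is now exhausted
print(list(gen))  # []
```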

Partition Step

Next, we need to assign each key-value pair to a particular reducer. This is accomplished by partitioning, which is often thought of as placing data into buckets. (Sometimes I think MapReduce should really be MapPartitionReduce, since partitioning is such an important step.) Partitioning should spread the data as evenly as possible over the “buckets”, so that each reducer processes roughly the same amount of data.

A common method of accomplishing such a task is to create a hash of the word and take the resulting value modulo the number of reducers.

def default_partition(key, nr_partitions, params):
    # hash the key, then take the remainder modulo the number of partitions (reducers)
    return hash(str(key)) % nr_partitions
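There is one caveat if you try this partition function on a modern Python. Since Python 3.3, hash() of a str is randomized per process unless the PYTHONHASHSEED environment variable is set. As a result, two separate map processes could send the same word to different partitions. A deterministic hash avoids that.

The sketch below is an alternative, not Disco’s default_partition. It swaps in zlib.crc32 from the standard library and keeps the same (key, nr_partitions, params) signature. stable_partition is a hypothetical name.

```python
import zlib


def stable_partition(key, nr_partitions, params):
    """Hypothetical drop-in partition function with a process-independent hash."""
    # crc32 of the UTF-8 bytes is the same in every process and every run
    return zlib.crc32(str(key).encode("utf-8")) % nr_partitions


words = ["of", "course", "you", "don't", "know", "instead", "of", "me", "teaching"]
for word in words:
    print(word, stable_partition(word, 5, None))
```

crc32 is not a cryptographic hash. For spreading keys across buckets, we only need it to be deterministic and reasonably uniform.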

The table below shows the hash of some words in the text and the remainder modulo 5, the number of reducers in this example. (The exact hash values depend on the Python version and build that produced them.)

Word        hash(word)     hash(word) % 5
of          -1463488791    4
course      2334184425     0
you         1116843962     2
don’t       -482782459     1
know        326123353      3
instead     -1683785287    3
of          -1463488791    4
me          -1475813024    1
teaching    -1354514375    0

In bucket 2 (words with hash(key) % 5 equal to 2) we have tuples such as:

[("you", 1), ("you", 1), ("then", 1), ("and", 1), ("be", 1), ("only", 1), ("and", 1), ("rude", 1), ("you", 1)…]

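How can we be sure that each instance of know in the text goes to the same reducer?

Because the partition function is deterministic. Every instance of know has the same hash, so hash(key) % 5 picks the same bucket each time, provided every map process computes the same hash for the same word.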
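This can be checked directly. The sketch below takes every word of the quote, applies the same clean-up as the map step, and runs it through default_partition (copied so the snippet is self-contained). It then records which buckets each word was sent to. Within one Python process hash() is consistent, so every word, including know, lands in exactly one bucket. Across separate processes on Python 3.3+, str hashes are randomized unless PYTHONHASHSEED is fixed.

```python
import string
from collections import defaultdict

QUOTE_LINES = [
    "Of course you don't know. You don't know",
    "because only I know. If you knew and I didn't",
    "know, then you'd be teaching me instead of me teaching you --",
    "and for a student to be teaching his teacher is presumptuous",
    "and rude. Do I make myself clear?",
]


def default_partition(key, nr_partitions, params):
    return hash(str(key)) % nr_partitions


buckets_seen = defaultdict(set)  # word -> bucket numbers it was sent to
buckets = defaultdict(list)      # bucket number -> (word, 1) pairs in it
for line in QUOTE_LINES:
    for raw in line.split():
        word = raw.strip(string.punctuation).lower()
        if not word:
            continue  # skip pure punctuation such as "--"
        bucket = default_partition(word, 5, None)
        buckets_seen[word].add(bucket)
        buckets[bucket].append((word, 1))

print(buckets_seen["know"])  # a set holding a single bucket number
print(all(len(seen) == 1 for seen in buckets_seen.values()))  # True
```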

How does the reducer know which partition it’s operating on?

It doesn’t! Reducers are unaware of the partition they are operating on.

Reduce Step

Reduce is another common built-in function. Traditionally, it applies a two-argument function cumulatively to the items of a sequence and returns a single value; often, that function is an algebraic expression such as addition. Within the context of MapReduce, the reduce step is given the iterable of key-value pairs and combines the values into an answer or list of answers. For counting words, we sum the values of keys which compare equal: that is, we count the number of you’s, know’s, etc.
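For comparison, Python’s traditional reduce (functools.reduce in Python 3) applies a two-argument function cumulatively, reducing a sequence to one value. Summing the 1s emitted for a single word looks like this:

```python
import operator
from functools import reduce

counts_for_know = [1, 1, 1, 1]  # the values emitted by map for "know"

# ((1 + 1) + 1) + 1
total = reduce(operator.add, counts_for_know)
print(total)  # 4

# The built-in sum gives the same result for addition
print(sum(counts_for_know))  # 4
```

The MapReduce reduce step generalizes this by performing one such reduction per unique key.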

def reduce(iter, params):
    from disco.util import kvgroup
    # sort the pairs so equal words are adjacent, then group and sum their counts
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

Python is wonderfully succinct, but for the inexperienced programmer it can be on the cryptic side. reduce takes two arguments: iter (an iterable object) and params (ignored for now). Remember, the goal is to count the occurrences of each word. The reduce function above takes advantage of two functions: Python’s built-in sorted and Disco’s kvgroup. iter isn’t really a list; it’s an iterable object, but we still use it in similar ways to a list. The function sorted sorts any iterable (dictionaries, lists, tuples, etc.) and returns a new sorted list. By default, sorted compares the elements directly. Tuples are compared element by element, so (word, 1) pairs are ordered by word first. One can also pass a key function to sort on something else. In our case, we have tuples of (word, 1), and we want to sort them alphabetically.

Sorting Example

>>> iterList = [("course", 1), ("you", 1), ("you", 1), ("then", 1), ("and", 1), ("then", 1), ("only", 1)]
>>> sorted_iterList = sorted(iterList)
>>> sorted_iterList
[('and', 1), ('course', 1), ('only', 1), ('then', 1), ('then', 1), ('you', 1), ('you', 1)]
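Two details of sorted are worth seeing. First, tuples compare element by element, which is why the pairs above come out in alphabetical order by word. Second, the key parameter lets you sort on something else entirely. The sketch below sorts word totals by count, highest first, and breaks ties alphabetically. The totals list is simply what summing the iterList example gives.

```python
from operator import itemgetter

iterList = [("course", 1), ("you", 1), ("you", 1), ("then", 1),
            ("and", 1), ("then", 1), ("only", 1)]

# Default: tuples compare element by element, so pairs are ordered by word first
print(sorted(iterList) == sorted(iterList, key=itemgetter(0)))  # True

# With a key function we can sort on anything, e.g. word totals by count
totals = [("and", 1), ("course", 1), ("only", 1), ("then", 2), ("you", 2)]
by_count = sorted(totals, key=lambda pair: (-pair[1], pair[0]))
print(by_count)
# [('then', 2), ('you', 2), ('and', 1), ('course', 1), ('only', 1)]
```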

On the sorted list we then apply the kvgroup function. Similar to Python’s itertools.groupby, Disco’s kvgroup groups the values of consecutive keys which compare equal. For each group, kvgroup returns the key and a generator of values. A generator produces its values lazily as it is iterated over and, again, can be used much like a list. Starting from the sorted list above:

sorted/kvgroup breakdown
>>> from disco.util import kvgroup
>>> for k, v in kvgroup(sorted_iterList):
...     print k, v
...
and <generator object <genexpr> at 0xa28a2ac>
course <generator object <genexpr> at 0xa28a2fc>
only <generator object <genexpr> at 0xa28a2ac>
then <generator object <genexpr> at 0xa28a2fc>
you <generator object <genexpr> at 0xa28a2ac>
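Because the values come back as generators, the session above only prints object addresses. The sketch below materializes the values so you can see them, and you don’t need Disco to follow along. kvgroup_sketch is a hypothetical stand-in built on itertools.groupby. It mirrors the behavior described above: group consecutive equal keys and return each key with a generator of its values. Check disco.util for the real implementation.

```python
from itertools import groupby
from operator import itemgetter


def kvgroup_sketch(kviter):
    """Group values of consecutive equal keys; yields (key, generator of values)."""
    for key, group in groupby(kviter, key=itemgetter(0)):
        yield key, (value for _key, value in group)


sorted_iterList = [("and", 1), ("course", 1), ("only", 1), ("then", 1),
                   ("then", 1), ("you", 1), ("you", 1)]

for k, v in kvgroup_sketch(sorted_iterList):
    print(k, list(v))  # consume each group before moving to the next key
# and [1]
# course [1]
# only [1]
# then [1, 1]
# you [1, 1]
```

As with groupby, each group must be consumed before advancing to the next key. Once the outer loop moves on, the previous group’s values are no longer available.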

 

 

Recall the reduce function:

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

The map function yields an iterable of key-value tuples. In the reducer, that iterable is sorted alphabetically by key (the words), and kvgroup groups the values belonging to each key. Lastly, for every word and its group of values, we yield the word and the sum of those values.

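The sort is essential. kvgroup only groups consecutive keys which compare equal, so without sorting first the sums are incorrect, and the same keys are returned at different times.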
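You can reproduce what happens when the sort is skipped using itertools.groupby, which groups only consecutive equal keys. In the sketch below, the key you appears in two separate runs, so it is returned twice with partial sums. grouped_sums is a hypothetical helper that imitates kvgroup followed by sum.

```python
from itertools import groupby
from operator import itemgetter

pairs = [("you", 1), ("you", 1), ("then", 1), ("and", 1), ("you", 1)]


def grouped_sums(kv_pairs):
    """Sum values per run of consecutive equal keys (like kvgroup + sum)."""
    return [(key, sum(v for _k, v in group))
            for key, group in groupby(kv_pairs, key=itemgetter(0))]


print(grouped_sums(pairs))          # [('you', 2), ('then', 1), ('and', 1), ('you', 1)]
print(grouped_sums(sorted(pairs)))  # [('and', 1), ('then', 1), ('you', 3)]
```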

And again with graphics!

[Figure: put words in a bucket, sort the bucket, and count the unique items]

Output

Finally, iterate over the reducers’ output and print each word with its total count:

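from disco.core import result_iterator
# job: a disco.core.Job already run with the map and reduce functions above
for word, count in result_iterator(job.wait(show=True)):
    print(word, count)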

Conclusion

MapReduce takes advantage of commodity clusters by spreading the computation over each node. Like many frameworks, MapReduce offers the following:

“Learning the right abstraction will simplify your life.” – Travis Oliphant

Disco offers a Python MapReduce framework which allows me to easily process data that I would otherwise have difficulty processing.


About the Author

Ben Zaitlen

Data Scientist

Ben Zaitlen has been with the Anaconda Global Inc. team for over 5 years.
