At several hundred Terabytes and growing, the 1000 Genomes Project is clearly in the realm of Big Data. Earlier this year Chris Mueller gave two great presentations at PyData and PyGotham outlining the Disco MapReduce Framework and how to use it for processing high-throughput gene sequencing data. I highly encourage everyone to supplement their Disco MapReduce education with his PyData Talk and PyGotham Talk.

With Chris’s help, I have fleshed out his gene sequencing low-coverage analysis in this tutorial. To follow along with this tutorial, download Anaconda, Continuum’s “batteries included” Python Big Data distribution.”

1000 Genomes Project Outline

The 1000 Genomes project collects and stores a variety of gene sequencing data from over 2500 people and 27 countries. The goal of the project is to understand the relationship between genotype and phenotype. One method of understanding this complex relationship is to look at how genes vary across individuals and populations. By doing so we can begin to untangle the complex relationship of diseases and traits, as well as our cryptic genetic code.

Currently, the project has collected over 200 TB of data and is publicly available on Amazon’s S3. Within each of the three pilot studies the project offers two kinds of data: FASTQ Sequence Data, which contains all the reads of DNA fragments; and BAM Data (Binary version of SAM), which contains aligned reads that map back to the current reference genome: GRCh27

Questions and Early Results

What is the coverage for each position for each Chromosome [1-22,X,Y,MT]? That is, what is the number of DNA bases observed at each position?

Input (Streaming)

The BAM files stored in S3 are 15-100GB depending on the method of gene sequencing. Transfer rates vary between S3 and the size of the EC2 instance; for M1.Large, rates have been reported between 500 KB/s and 50 MB/s, but generally users experience around USB speeds, ~35MB/s. Because copying, storing, and pushing data to ddfs is a waste of time and money, we will employ a streaming solution within Disco: when streaming text, Disco uses the default map reader which delivers lines of text to any available mapper. BAM files are binary, so we’ll need to use PySam to read the data and yield a record to the mapper. In a way, we are are mapping to the mappers.

input = [''],


BAM files contain a lot of information: mapping quality, sequence of the read, CIGAR strings, several flags, and a few other fields important to mapping. Please review the sam file format for the complete list of fields. We are interested in the reference (chromosome), position, and length:

obj.qlen = 72 #length of read
obj.pos = 15657595 #position
obj.tid = 10 #chromosome. must call sam.getrname(read.tid) to get true chromosome reference

Map Reader

Disco’s Map Reader allows us to define how data is delivered to the mappers. In the case of binary data, tools must be provided to read and manipulate files.

def sam_url_reader(stream, size, url, params):
    import tempfile
    import pysam
    cache = tempfile.NamedTemporaryFile(dir='/mnt')
    BLOCK_SIZE = 4*(1024**2)
    block =
    while block != "":
        block =
    sam = pysam.Samfile(
    for read in sam:
        yield (sam.getrname(read.tid), read)



There are a few things to point out. First, Disco has no concept of globals; as such, we must import modules in each function. In the case of the map reader, we will need two modules: PySam and tempfile.

With files exceeding 10s of GBs we need a solution that lets us manage the data efficently. The Python module tempfile creates a temporary file which is deleted as soon as the file is closed. Rather than stream and store all the data, tempfile allows us to create small files of BLOCK_SIZE, load them into DDFS, then remove the files after yielding to the map function.

After streaming a chunk of data to a tempfile, we use the PySam Python module which knows how to read BAM files. Lastly, we yield tuples of the reference (the chromosome number/type) and a PySam Python object to the mappers.


The map should be straightforward — get the record from the binary file and yield a tuple of the reference (Chromosome), the position, and the length of the alignment.

def read_coverage_map(rec, params):
    ref, read = rec
    yield '%s:%d' % (ref, read.pos), read.qlen




Returning to our question reminds us how to partition the data: What is the coverage for each chromosome? Humans have 22 autosomes (1-22), 2 sex chromosomes (X,Y), and a Mitochondrial Chromosome (MT). Splitting on the chromosome type is the logical choice:

def chr_partition(key, nrp, params):
    chr = key.split(':')[0]
    if chr == 'X': return 24
    elif chr == 'Y': return 25
    elif chr == 'MT': return 0
            return int(chr)
        except ValueError: #avoid extraneous references



Notice the try-except statement. Contained in the BAM files are references such as GL0002261:1487—this is a unplaced contig (an overlapping sequence of DNA). In this case, the contig is known to be in the reference genome but can’t be placed accurately in any chromosome. Often, this is the case for DNA sequences which repeat. We are not interested in these unplaced contigs and they are therefore ignored.


With the buckets ordered and keyed by the reference, we can use the key to determine the size of the NumPy array required to store the coverage data. We want each reduce job to have one NumPy array storing the count at each position. Chromosomes are numbered in descending order of size, with 1 being the largest and 22 the smallest. (Chromosome Lengths)

def coverage_reduce(reduce_iter, params):
    import numpy
    chrs = { # Chromosome sizes
        'Y':100000000 }



Just as in the previous tutorial, reduce_iter is an iterator — a list-like object. Contained in that object is data in the following form: [(chromosome:position),length] or [ [(11:432345), 10], [(11:2321), 45], [(11:9984), 100], ….].

One way in which an iterator is different from a list is that as the object is iterated, it is consumed. That is, after running a loop over an iterator, the data in the iterator is no longer accessible. To overcome this problem, Python’s iter function will return an iterator of the object.

iter(object).next() returns the next element in the list, which is the first element: [(11:432345), 10].



p, l = iter(reduce_iter).next()
chr, pos = p.split(':')
c = numpy.zeros(chrs[chr])
for p, l in reduce_iter:
    chr, pos = p.split(':')
    pos = int(pos); l = int(l)
    c[pos:pos+l] += 1
yield (chr, ' '.join((str(int(i)) for i in c)))



Iterate through all the items in reduce_iter, store the count at each position, and we’re finished. Yield the resulting NumPy array as a string and output the results to file.

Collection and Output:

filePath = '/mnt/'
for chr, coverage in result_iterator(job.wait(show=True)):
     out = open(filePath+chr+'_coverage.out', 'w')
     out.write('%s %sn' % (chr, coverage))
import os
from data_binner import makePlot
fileHandleList = (fname for fname in os.listdir('/mnt') if fname.endswith('.out'))



In lines 6-10, I use a small plotting routine to help generate the plots.


Immediately, you should be drawn to the spikes and the large valley in the center of the plot. Spikes usually are the result of mismapped reads: highly repetitive sequences often get mapped to the same location and oversampling of the same location can also result in spikes. The valley near the center is the Centromere. This region has not yet been assembled so no reads will map to this location.

A plot of coverage data is useful for a few reasons:

  • If the features — the spikes and valleys — are missing, then we know something is probably wrong experimentally or in the alignment analysis. In a way, the coverage plots provide valuable feedback for the accuracy of the sequencing.

  • If you have correctly accounted for experimental and analytical artifacts, coverage maps can help identify which regions may have biological significance.

Download Example Code

Next Week…

In the next post, I will show you how to easily spin up an Anaconda cluster with EC2 and StarCluster using only 1 command.