% Spark % T. Verbeiren % 9/7/2014





Not covered


Me, Myself and I


Distribution is hard ...


2 world views:


  • High Performance Computing
  • High Throughput Computing

Map / Reduce






(Nothing special ?!)

Word Count in M/R

import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

     public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                context.write(word, one);

     public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
          throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            context.write(key, new IntWritable(sum));

What if ...

... we could just write


val y = x map () reduce ()

... this could be extended


val y = x map () filter() map () flatMap () reduce ()

What would be needed?


  1. Platform


  1. Parallel abstraction mechanism


  1. Language support





  • Berkeley University
  • Apache Project
  • v1.0 released on May 30d 2014
  • Written in Scala
  • Supported by DataBricks
  • Used by ...

Language support




Built for low-latency

Abststraction mechanism


Resilient Distributed Datasets



Immutable Collection


Accepting transformations and actions



  • map
  • filter
  • sample
  • union / intersection
  • groupByKey
  • reduceByKey
  • join
  • ...



  • reduce
  • collect
  • count
  • take(n)
  • saveAsTextFile
  • ...

The first example


r = 1/2


$$\text{P}(\text{hitting circle}) \approx \text{Surface circle} = \frac{\pi}{4}$$

import sc._

val N = 10000000

// Generate a sequence of numbers and distribute
val par = parallelize(1 to N)

// Generate a point in 2D unit square
def randomPoint:(Double,Double) = {
    val x = Math.random()
    val y = Math.random()
// Check if a point lies in the unit circle
def inCircle(point:(Double,Double)):Int = {
    if (point._1*point._1 + point._2*point._2 < 1) 1 else 0

// List of hits yes/no
val inCircleList = par map(i => inCircle(randomPoint))

// Return the first 5 elements from the RDD
inCircleList take 5

// Get info about the RDD

// The number of hits
val total = inCircleList reduce (_+_)

// Probability of hitting the circle *4 = Pi
val S = 4. * total / N

From the Spark examples page


val count = parallelize(1 to N).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / N)

Hadoop M/R in Spark


// Read a file from Hadoop FS, (e.g. Ulysses / Project Gutenberg)
// and process it similar to Hadoop M/R
val file = textFile("Joyce-Ulysses.txt")

// Convert to an array of words in the text
val words = file.flatMap(_.split(" "))

// Map to (key,value) pairs
val mapped = words map (word => (word,1))

// Sort and group by key, 
// Result is of form (key, List(value1, value2, value3, ...))
val grouped = mapped sortByKey() groupByKey()

// The length of the values array yields the amount
val result = grouped map {case (k,vs) => (k,vs.length)}
// But where is the *reduce*?

Be careful with definitions of map and reduce!


// Read a file from Hadoop FS, (e.g. Ulysses / Project Gutenberg)
// and process it similar to Hadoop M/R
val file = textFile("Joyce-Ulysses.txt")

// Convert to an array of words in the text
val words = file.flatMap(_.split(" "))

// Map to (key,value) pairs
val mapped = words map (word => (word,1)) 

// Sort and group by key, 
// Result is of form (key, List(value1, value2, value3, ...))
val grouped = mapped sortByKey() groupByKey()

// The length of the values array yields the amount
// val result = grouped map {case (k,vs) => (k,vs.length)}
val result = grouped map {case (k,vs) => (k, vs reduce (_+_))}

In Spark, we would use:


val file = textFile("Joyce-Ulysses.txt")
val words = file.flatMap(_.split(" "))
val mapped = words map (word => (word,1))
val result = mapped reduceByKey(_+_)
result collect

... and REPL

... and Web interface

... and distributed memory caching


val file = textFile("Joyce-Ulysses.txt")
val words = file.flatMap(_.split(" "))
val mapped = words map (word => (word,1))
// Cache the RDD for later use
val cached = mapped cache()
// Use the cached version
val result = cached reduceByKey(_+_)
// Oops, nothing happens?
// Laziness... oh my

// Count how many times the word 'the' occurs in the text
cached filter {case(word,v) => word=="the"} reduceByKey(_+_) collect

... and interactive use

... and the Ecosystem


Better ?

Faster ?

Easier ?


RDDs Under the Hood



Configuration & Performance



Installation & Deployment




Genomic Data

3 billion base pairs ($3.2 \times 10^9$)

Packaged in chromosomes

~ 3GB for one human


Analysis requires a lot of processing power and storage


Transcription Factors: proteins that bind on region

Coverage: basepair occurence in sequencing

Coverage data:


Chromosome   Position    Sequencing coverage
19           11004       1
19           11005       2
19           11006       2
19           11007       2
19           11008       3
19           11009       3

Transcription Factor data:


> awk 'BEGIN {srand()} !/^$/ { if (rand() <= .00001) print $0}' bedfile.bed

chr1    70529738    70529754    Maf     .   -
chr1    161676477   161676495   Pou2f2  .   -
chr1    176484690   176484699   AP-1    .   -
chr10   6020071     6020084     CTCF    .   -
chr11   1410823     1410838     NF-Y    .   -
chr16   4366053     4366067     YY1     .   +
chr17   77824593    77824602    BAF155  .   +
chr19   10947006    10947013    Rad21   .   -
chr19   49342112    49342121    SIX5    .   +
chr22   39548908    39548922    Irf     .   +
chr7    100048475   100048485   Egr-1   .   -
chr8    119123364   119123374   YY1     .   +
chr8    128562635   128562649   p300    .   -
chr9    14315969    14315982    Egr-1   .   -
chrX    101409366   101409384   CTCF    .   +

// Load files from HDFS
val covFile = sc.textFile("NA12878.chrom19.SLX.maq.SRP000032.2009_07.coverage",8)
val bedFile = sc.textFile("201101_encode_motifs_in_tf_peaks.bed",8)

// Class to hold records from coverage data
class covData(val chr: String, val pos: Int, val cov: Int) {
    def this(line: Array[String]) {
     this(line(0).toString, line(1).toInt, line(2).toInt)

// Class to hold records from Transcription Factor data
class tfsData(val chr: String, val pos1: Int, val pos2:Int, val tf: String) {
    def this(line: Array[String]) {
     this(line(0).toString, line(1).toInt, line(2).toInt, line(3).toString)

// Turn input files into an RDD of objects
val cov ="\\s+")).map(new covData(_))
val tfs ="\\s+")).map(new tfsData(_))

// Count the number of items in both datasets

// Cache in memory
val ccov = cov cache
val ctfs = tfs cache

// Count once for the caching to occur

// Turn coverage data into K/V pairs
val kvcov = => (x.pos,(x.cov))).cache
// Turn TF data into K/V pairs
val kvtfs = ctfs.filter(x => x.chr == "chr19").map(x => (x.pos1,(x.pos2,

// Activate the caching of the coverage data

// Join both datasets together by key
val cjoined = kvcov.join(kvtfs)

// Waaaw, that's fast! In fact, nothing happened yet.
// select 5 entries to see the result but reformat first
val flatjoined = cjoined map { case(x,(y,(z,zz))) => (x,z,zz,y) }
flatjoined take 5


Visualization of Big Data

Visual Analytics

Lazy functional tree zipper

Part of treeDraw :"keydown", function() {
    switch (d3.event.keyCode) {
      case 38: 
        break ;
      case 40:
      case 37: 
      case 39: 

object CovQuery extends SparkJob with NamedRddSupport {

  type Range = (Int, Int)
  type TreePath = List[Int]

  val B = 100     // number of bins, also N
  val Z = 10      // zoom level with every step

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    // a map with the persistent RDDs
    val listOfPersistentRDDs = sc.getPersistentRDDs

    val chrCache = namedRdds.get[DataPoint](myRDD).get

    // Info about the interval
    val superMin = low
    val superMax = high

    val subRange: Range = calcRange(path,superMin,superMax)
    val mn = subRange._1
    val mx = subRange._2

    // The bin width, based on the number of bins (N)
    val delta = (mx - mn) / B

    // Only filter the region within the interval
    val chrCache1 = chrCache filter (x => (mn <= x.position) && (x.position <= mx))

    // Create the bins as the first argument of a new triple
    val x1 = chrCache1 map (x => (myDiv(x.position - mn, delta), x.position, x.coverage))

    // sum, max and min can be done on only the bin number and the coverage:
    val fork1 = => (x._1,x._3))

    // The 4 statistics
    val sumSet = fork1.reduceByKey(_ + _)
    val maxSet = fork1.reduceByKey(Math.max(_,_))
    val minSet = fork1.reduceByKey(Math.min(_,_))
    val countSet = => (x._1,1)).reduceByKey(_+_)

    // Join the data and flatten the (nested) result
    val joined = sumSet.join(maxSet).join(minSet).join(countSet)
    val collected = joined.collect().map(x => (x._1, flattenT3(x._2)))

    // The boundaries in the original coordinate can not be derived
    // from the data, because it can not cope with empty regions.
    val lowerboundRange = mn until mx by delta
    val lowerboundIndex = 0 until B
    val lowerbound = lowerboundIndex zip lowerboundRange

    val dataMap> (x._1,List(x._2._1,x._2._2,x._2._3,x._2._4,x._2._1/x._2._4))).toMap.withDefaultValue(List(0,0,0,0,0))

    return => (x._1,List(x._2) ++ dataMap(x._1)))


Paper submitted to LDAV 2014


The end


Some links: