Processing Data with Spark

The World Before Spark

Parallelism

Parallelism is about speeding up computations by using multiple processors.

  • Task parallelism: Different computations performed on the same data
  • Data parallelism: Apply the same computation on dataset partitions

When processing big data, data parallelism is used to move computations close to the data, instead of moving data in the network.
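
To make the distinction concrete, here is a minimal local sketch (assuming Scala 2.12, where parallel collections ship with the standard library); Spark applies the same data-parallel idea across machines instead of CPU cores.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val numbers = (1 to 1000000).toVector

// Data parallelism: the same function is applied to different chunks of the data
val squares = numbers.par.map(x => x * x)

// Task parallelism: different computations run concurrently on the same data
val total   = Future(numbers.sum)
val largest = Future(numbers.max)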

Issues with data parallelism

  • Latency: Operations are 1,000x (disk) or 1,000,000x (network) slower than accessing data in memory
  • (Partial) failures: Computations on 100s of machines may fail at any time

This means that our programming model and execution model should hide (but not forget!) these issues.

Map/Reduce

Map/Reduce is a general computation framework, loosely based on functional programming [1]. It assumes that data exists in a K/V store:

  • map((K1, V1), f: (K1, V1) -> (K2, V2)): List[(K2, V2)]
  • reduce((K2, List[V2])): List[(K3, V3)]

Map/Reduce as a system was proposed by Dean & Ghemawat [2], along with the GFS.
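
As a sketch, word counting can be expressed as the two user-supplied functions of this model (the names and types below are illustrative, not the framework's API; K1 is a document name, V1 its contents, K2 a word, and V2/V3 counts):

def map(docName: String, contents: String): List[(String, Int)] =
  contents.split(" ").toList.map(word => (word, 1))

def reduce(word: String, counts: List[Int]): List[(String, Int)] =
  List((word, counts.sum))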

Map/Reduce execution overview

  • DFS chunks are assigned to Map tasks processing each chunk into a sequence of KV pairs.
  • Periodically, the buffered pairs are written to local disk.
  • The keys are divided among all Reduce tasks.
  • Reduce tasks work on each key separately and combine all the values associated with a specific key.

Hadoop: OSS Map/Reduce

Hadoop cluster

Hadoop: Pros and Cons

What is good about Hadoop: Fault tolerance

Hadoop was the first framework to enable computations to run on 1000s of commodity computers.

What is wrong: Performance

  • Before each Map or Reduce step, Hadoop writes to HDFS
  • Hard to express iterative problems in M/R
    • All machine learning problems are iterative

The Hadoop Workflow

DryadLINQ

Optional reading (not part of the exam material)

Microsoft’s DryadLINQ combined the Dryad distributed execution engine with the LINQ language for defining queries.

DryadLINQ architecture

FlumeJava

Optional reading (not part of the exam material)

Google’s FlumeJava attempted to provide a few simple abstractions for programming data-parallel computations. These abstractions are higher-level than those provided by MapReduce, and provide better support for pipelines.

// Emit a (word, 1) pair for every word
PTable<String,Integer> wordsWithOnes =
  words.parallelDo( new DoFn<String, Pair<String,Integer>>() {
    void process(String word,
                 EmitFn<Pair<String,Integer>> emitFn) {
      emitFn.emit(Pair.of(word, 1));
    }
  }, tableOf(strings(), ints()));

// Group the 1s of each word together
PTable<String,Collection<Integer>>
  groupedWordsWithOnes = wordsWithOnes.groupByKey();

// Sum the 1s per word to obtain the counts
PTable<String,Integer> wordCounts =
  groupedWordsWithOnes.combineValues(SUM_INTS);

Spark and RDDs

What is Spark?

Spark is an open-source cluster computing framework.

  • automates distribution of data and computations on a cluster of computers
  • provides a fault-tolerant abstraction to distributed datasets
  • is based on functional programming primitives
  • provides two abstractions to data, list-like (RDDs) and table-like (Datasets)

Resilient Distributed Datasets (RDDs)

RDDs are the core abstraction that Spark uses.

RDDs make datasets distributed over a cluster of machines look like a Scala collection. RDDs:

  • are immutable
  • reside (mostly) in memory
  • are transparently distributed
  • feature all functional programming primitives
    • plus additional ones designed to minimize shuffling

In practice, RDD[A] works like Scala’s List[A]

Counting words with Spark

// http://classics.mit.edu/Homer/odyssey.mb.txt
val rdd = sc.textFile("./datasets/odyssey.mb.txt")
rdd
  .flatMap(l => l.split(" "))          // Split file in words
  .map(x => (x, 1))                    // Create key,1 pairs
  .reduceByKey((acc, x) => acc + x)    // Count occurrences of same pairs
  .sortBy(x => x._2, false)            // Sort by number of occurrences
  .take(50)                            // Take the first 50 results
  .foreach(println)

How to create an RDD?

RDDs can only be created in the following 3 ways

  1. Reading data from external sources
val rdd1 = sc.textFile("hdfs://...")
val rdd2 = sc.textFile("file://odyssey.txt")
val rdd3 = sc.textFile("s3://...")

. . .

  2. Convert a local memory dataset to a distributed one
val xs: Range = Range(1, 10000)
val rdd: RDD[Int] = sc.parallelize(xs)

. . .

  3. Transform an existing RDD
rdd.map(x => x.toString) //returns an RDD[String]

RDDs are lazy!

There are two types of operations we can do on an RDD:

  • Transformation: Applying a function that returns a new RDD. They are lazy.

  • Action: Request the computation of a result. They are eager.

// This just sets up the pipeline
val result = rdd
  .flatMap(l => l.split(" "))
  .map(x => (x, 1))

// Side-effect, triggers computation
result.foreach(println)

Common transformations on RDDs

map: Apply \(f\) on all items in the RDD and return an RDD of the result.

\(RDD[A].map(f: A \rightarrow B) : RDD[B]\)

. . .

flatMap: Apply \(f\) on all RDD contents, return an RDD with the contents of all intermediate iterators.

\(RDD[A].flatMap(f: A \rightarrow Iterable[B]): RDD[B]\)

. . .

filter: Apply predicate \(p\), return items that satisfy it.

\(RDD[A].filter(p: A \rightarrow Boolean): RDD[A]\)

Examples of RDD transformations

All uses of articles in the Odyssey

val odyssey = sc.textFile("datasets/odyssey.mb.txt")
                 .flatMap(_.split(" "))

odyssey.map(_.toLowerCase)
        .filter(Seq("a", "the").contains(_))

Q: How can we find uses of all regular verbs in past tense?

. . .

odyssey.filter(x => x.endsWith("ed"))

Q: How can we remove all punctuation marks?

. . .

odyssey.map(x => x.replaceAll("\\p{Punct}", ""))

More examples of RDD transformations

Q: How can we find uses of distinct regular verbs in past tense?

odyssey.filter(x => x.endsWith("ed")).distinct

. . .

Q: How can we sort the RDD?

odyssey.sortBy(x => x)

. . .

Q: How can we sample data from the RDD?

val trainAndTestData = odyssey.randomSplit(Array(0.8, 0.2))

Common actions on RDDs

collect: Return all elements of an RDD

\(RDD[A].collect() : Array[A]\)

. . .

take: Return the first n elements of the RDD

\(RDD[A].take(n): Array[A]\)

. . .

reduce, fold: Combine all elements to a single result of the same type.

\(RDD[A].reduce(f: (A, A) \rightarrow A): A\)

. . .

aggregate: Aggregate the elements of each partition, and then the results for all the partitions

\(RDD[A].aggr(init: B)(seqOp: (B, A) \rightarrow B, combOp: (B, B) \rightarrow B): B\)

Examples of RDD actions

How many words are there (using reduce)?

val odyssey = sc.textFile("datasets/odyssey.mb.txt").flatMap(_.split(" "))

odyssey.map(x => 1).reduce((a,b) => a + b)

How many words are there (using aggregate)?

odyssey.map(x => 1).aggregate(0)(_ + _, _ + _)

How to calculate average of a collection of integers (using aggregate)?

val inputRdd = sc.parallelize(List(1, 2, 3, 4, 5))
val forAvg = inputRdd.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = forAvg._1.toDouble / forAvg._2.toDouble

Pair RDDs

RDDs can represent any complex data type, if it can be iterated. Spark treats RDDs of the type RDD[(K,V)] as special, named PairRDDs, as they can be both iterated and indexed.

Operations such as join are only defined on Pair RDDs, meaning that we can only combine RDDs if their contents can be indexed.

We can create Pair RDDs by applying an indexing function with map, by using the keyBy function, or by grouping records:

val rdd = sc.parallelize(List("foo", "bar", "baz")) // RDD[String]

val pairRDD = rdd.map(x => (x.charAt(0), x))  // RDD[(Char, String)]
pairRDD.collect // Array((f,foo), (b,bar), (b,baz))

val pairRDD2 = rdd.keyBy(x => x.toLowerCase.head) // RDD[(Char, String)]
pairRDD2.collect // Array((f,foo), (b,bar), (b,baz))

val pairRDD3 = rdd.groupBy(x => x.charAt(0))  // RDD[(Char, Iterable(String))]
pairRDD3.collect // Array((b,CompactBuffer(bar, baz)), (f,CompactBuffer(foo)))

Transformations on Pair RDDs

The following functions are only available on RDD[(K,V)]

reduceByKey: Merge the values for each key using an associative and commutative reduce function

\(reduceByKey(f: (V, V) \rightarrow V): RDD[(K, V)]\)

. . .

aggregateByKey: Aggregate the values of each key, using given combine functions and a neutral “zero value”

\(aggrByKey(zero: U)(f: (U, V) \rightarrow U, g: (U, U) \rightarrow U): RDD[(K, U)]\)

. . .

join: Return an RDD containing all pairs of elements with matching keys

\(join(b: RDD[(K, W)]): RDD[(K, (V, W))]\)

Pair RDD examples: groupByKey and reduceByKey

val odyssey = sc.textFile("sample.txt").flatMap(_.split(" "))
val words = odyssey.flatMap(_.split(" ")).map(c => (c, 1))

Word count using groupByKey:

val counts = words.groupByKey() // RDD[(String, Iterable[Int])]
  .map(row => (row._1, row._2.sum)) // RDD[(String, Int)]
  .collect() // Array[(String, Int)]

Word count using reduceByKey:

val counts2 = words.reduceByKey(_ + _) // RDD[(String, Int)]
  .collect()  // Array[(String, Int)]

Q: It’s more efficient to use reduceByKey over groupByKey. Why?

Pair RDD example: aggregateByKey

Word count using aggregateByKey:

  val counts3 = words.aggregateByKey(0)(_ + _, _ + _) // RDD[(String, Int)]

How can we count the number of occurrences of part of speech elements?

object PartOfSpeech {
  sealed trait Word
  case object Verb extends Word
  case object Noun extends Word
  case object Article extends Word
  case object Other extends Word
}

def partOfSpeech(w: String): Word = ???

odyssey.groupBy(partOfSpeech)
  .aggregateByKey(0)((acc, x) => acc + x.size,
                     (x, y) => x + y)

Pair RDD example: join

case class Person(id: Int, name: String)
case class Addr(id: Int, person_id: Int,
                address: String, number: Int)

val pers = sc.textFile("pers.csv") // id, name
val addr = sc.textFile("addr.csv") // id, person_id, street, num

val ps = pers.map(_.split(",")).map(x => Person(x(0).toInt, x(1)))
val as = addr.map(_.split(",")).map(x => Addr(x(0).toInt, x(1).toInt,
                                             x(2), x(3).toInt))

Q: What are the types of ps and as? How can we join them?

. . .

val pairPs = ps.keyBy(_.id) // RDD[(Int, Person)]
val pairAs = as.keyBy(_.person_id) // RDD[(Int, Addr)]

val addrForPers = pairAs.join(pairPs) // RDD[(Int, (Addr, Person))]

Join types

Given a “left” RDD[(K,A)] and a “right” RDD[(K,B)]

  • Inner join (join): The result contains only records that have their keys in both RDDs.
  • Outer joins (leftOuterJoin/rightOuterJoin): In addition to the inner join results, the result contains all records whose key appears only in the “left” (respectively the “right”) RDD; the missing side becomes None.

\(left.leftOuterJoin(right) : RDD[(K,(A, Option[B]))]\)

\(left.rightOuterJoin(right) : RDD[(K,(Option[A], B))]\)

  • Full outer join (fullOuterJoin): In addition to the inner join results, the result contains records whose key appears in either the “left” or the “right” RDD; missing sides become None.

\(left.fullOuterJoin(right) : RDD[(K,(Option[A], Option[B]))]\)
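
As a concrete illustration, this is how the join flavours look on the pairPs and pairAs RDDs from the previous slide (a sketch; result types in the comments):

val inner = pairPs.join(pairAs)          // RDD[(Int, (Person, Addr))]
val left  = pairPs.leftOuterJoin(pairAs) // RDD[(Int, (Person, Option[Addr]))]
val full  = pairPs.fullOuterJoin(pairAs) // RDD[(Int, (Option[Person], Option[Addr]))]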

Types of joins

RDDs under the hood

Internals

From the RDD.scala, line 61.

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs
  • Optionally, a list of preferred locations to compute each split on

D: Why does an RDD need all of those?

Partitions

Even though RDDs might give the impression of contiguous memory blocks spread across a cluster, data in RDDs is split into partitions.

Partitions define a unit of computation and persistence: any Spark computation transforms a partition to a new partition.

If a machine fails during a computation, Spark will redistribute its partitions to other machines and restart the computation on those partitions only.

The partitioning scheme of an application is configurable; better configurations lead to better performance.
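
A minimal sketch of inspecting and changing an RDD’s partitioning (the file path and partition counts are illustrative):

val rdd = sc.textFile("datasets/odyssey.mb.txt", minPartitions = 8)
rdd.getNumPartitions            // at least 8

val more = rdd.repartition(16)  // reshuffle into 16 partitions
val less = rdd.coalesce(4)      // merge into 4 partitions, avoiding a full shuffle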

How does partitioning work?

Spark supports 3 types of partitioning schemes:

  • Default partitioning: Splits in equally sized partitions, without knowing the underlying data properties.

Extended partitioning is only configurable on Pair RDDs.

  • Range partitioning: Takes into account the natural order of keys to split the dataset in the required number of partitions. Requires keys to be naturally ordered and keys to be equally distributed across the value range.

  • Hash partitioning: Calculates a hash over each item key and then produces the modulo of this hash to determine the new partition. This is equivalent to:

key.hashCode() % numPartitions
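
A sketch of applying the two extended schemes explicitly on a Pair RDD (partition counts are illustrative):

import org.apache.spark.{HashPartitioner, RangePartitioner}

val pairs = sc.parallelize(1 to 1000).map(x => (x, x * x)) // RDD[(Int, Int)]

// Hash partitioning: key.hashCode() % numPartitions decides the partition
val hashed = pairs.partitionBy(new HashPartitioner(8))

// Range partitioning: contiguous key ranges per partition (keys must be orderable)
val ranged = pairs.partitionBy(new RangePartitioner(8, pairs))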

Partition dependencies

Partition Dependencies

Narrow dependencies: Each partition of the source RDD is used by at most one partition of the target RDD

Wide dependencies: Multiple partitions in the target RDD depend on a single partition in the source RDD.

Shuffling

When operations need to calculate results using a common characteristic (e.g. a common key), the corresponding data needs to reside on the same physical node. This is the case with all “wide dependency” operations. The process of re-arranging data so that similar records end up in the same partitions is called shuffling.

Shuffling is very expensive, as it involves moving data across the network and possibly spilling it to disk (e.g. when more data ends up on a single node than fits in its memory). Avoid it at all costs!

See this link for a great write up on Spark shuffling

Shuffling example

Shuffling explained

Choose transformations to minimize shuffles:

Recall groupByKey and reduceByKey:

val counts1 = rdd.groupByKey()
  .map(row => (row._1, row._2.sum))  
  .collect()

val counts2 = rdd.reduceByKey(_ + _)
  .collect()

RDD lineage

RDDs contain information on how to compute themselves, including dependencies to other RDDs. Effectively, RDDs create a directed acyclic graph of computations.

Lineage information allows an RDD to be traced back to its ancestors.

To demonstrate this, consider the following example:

val rdd1 = sc.parallelize(0 to 10)
val rdd2 = sc.parallelize(10 to 100)
val rdd3 = rdd1.cartesian(rdd2)
val rdd4 = rdd3.map(x => (x._1 + 1, x._2 + 1))

Q: What are the computation dependencies here?

. . .

scala> rdd4.toDebugString
res3: String =
(16) MapPartitionsRDD[3] at map at <console>:30 []
 |   CartesianRDD[2] at cartesian at <console>:28 []
 |   ParallelCollectionRDD[0] at parallelize at <console>:24 []
 |   ParallelCollectionRDD[1] at parallelize at <console>:24 []

Another example for lineage

val input = sc.textFile("sample.txt")

val tokenized = input.flatMap(_.split(" "))
  .filter(x => x.length > 1)

val result = tokenized.map(x => (x, 1))
  .reduceByKey((a,b) => a + b)
  .sortBy(_._1)
> input.toDebugString
(2) sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
 |  sample.txt HadoopRDD[0] at textFile at <console>:24 []
> tokenized.toDebugString
(2) MapPartitionsRDD[3] at filter at <console>:25 []
 |  MapPartitionsRDD[2] at flatMap at <console>:25 []
 |  sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
 |  sample.txt HadoopRDD[0] at textFile at <console>:24 []
> result.toDebugString
(2) MapPartitionsRDD[10] at sortBy at <console>:25 []
 |  ShuffledRDD[9] at sortBy at <console>:25 []
 +-(2) MapPartitionsRDD[6] at sortBy at <console>:25 []
    |  ShuffledRDD[5] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[4] at map at <console>:25 []
       |  MapPartitionsRDD[3] at filter at <console>:25 []
       |  MapPartitionsRDD[2] at flatMap at <console>:25 []
       |  sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
       |  sample.txt HadoopRDD[0] at textFile at <console>:24 []

Persistence

Data in RDDs is stored in three ways:

  • As Java objects: Each item in an RDD is an allocated object
  • As serialized data: Special (usually memory-efficient) formats. Serialized data is more CPU-intensive to process, but faster to send across the network or write to disk.
  • On the filesystem: In case the RDD is too big, it can be mapped on a file system, usually HDFS.

Persistence in Spark is configurable and can be used to store frequently used computations in memory, e.g.:

val rdd = sc.textFile("hdfs://host/foo.txt")
val persisted = rdd.map(x => x + 1).persist(StorageLevel.MEMORY_ONLY_SER)

persisted is now cached. Further accesses will avoid reloading it and applying the map function.

Persistence storage levels

  • MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
  • MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
  • MEMORY_ONLY_SER (Java and Scala): Store RDD as serialized Java objects (one byte array per partition). More space-efficient than deserialized objects, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
  • DISK_ONLY: Store the RDD partitions only on disk.

Spark Applications

Spark cluster architecture

Spark cluster architecture
  • Executors do the actual processing; worker nodes can contain multiple executors.
  • The driver accepts user programs and returns processing results
  • The cluster manager does resource allocation

A Spark application

A Spark application:

  • begins by specifying the required cluster resources, which the cluster manager seeks to fulfil. If resources are available, executors are started on worker nodes.
  • creates a Spark context, which acts as the driver.
  • issues a number of jobs, which load data partitions on the executor heap and run in threads on the executor’s CPUs
  • when it finishes, the cluster manager frees the resources.
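
A skeleton of such an application is sketched below (the application name, resource setting and paths are illustrative); it would be packaged as a jar and submitted with spark-submit.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("word-count")
      .set("spark.executor.memory", "2g")

    val sc = new SparkContext(conf)     // creates the driver-side context

    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))          // this action triggers a job

    sc.stop()                           // frees the cluster resources
  }
}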

What is a Spark job?

Jobs are created when an action is requested. Spark walks the RDD dependency graph backwards and builds a graph of stages.

Stages are groups of tasks delimited by wide dependencies (shuffle boundaries). When such an operation is requested (e.g. groupByKey or sortBy), the Spark scheduler needs to reshuffle/repartition the data, which starts a new stage. Dependent stages within a job are executed serially. Each stage consists of one or more tasks.

A task is the minimum unit of execution; a task applies narrow-dependency functions on a single data partition. The scheduler starts as many tasks as there are data partitions.

A job graph

We can see how our job executes if we connect to the driver’s WebUI (port 4040 on the driver machine).

Here is the graph for the word counting job we saw before.

val result = sc.textFile("sample.txt")
  .flatMap(_.split(" "))
  .filter(x => x.length > 1)
  .map(x => (x, 1))
  .reduceByKey((a,b) => a + b)
  .sortBy(_._1)

Fault tolerance

Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.

Recomputing happens at the stage level.

To minimize recompute time, we can use checkpointing. With checkpointing, we save intermediate RDD contents (stage outputs) to reliable storage. Checkpointing effectively truncates the RDD lineage graph.
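
A minimal checkpointing sketch (the HDFS paths are illustrative; the checkpoint directory must live on reliable storage):

sc.setCheckpointDir("hdfs://host/checkpoints")

val counts = sc.textFile("hdfs://host/foo.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

counts.checkpoint()  // truncate the lineage graph at this point
counts.count()       // the first action materializes the checkpoint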

Spark clusters are resilient to node failures, but not to master failures. Running Spark on middleware such as YARN or Mesos is the only way to run multi-master setups.

Effective Spark

Requesting resources

Here is an example of how to start an application with a custom resource configuration.

spark-shell   \
    --master spark://spark.master.ip:7077 \
    --deploy-mode cluster  \
    --driver-cores 12 \
    --driver-memory 5g \
    --num-executors 52 \
    --executor-cores 6 \
    --executor-memory 30g

RDDs and formatted data

RDDs by default do not impose any format on the data they store. However, if the data is formatted (e.g. log lines with known format), we can create a schema and have the Scala compiler type-check our computations.

Consider the following data (common log format):

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] \
 "GET /apache_pb.gif HTTP/1.0" 200 2326

We can map this data to a Scala case class

case class LogLine(ip: String, id: String, user: String,
                   dateTime: Date, req: String, resp: Int,
                   bytes: Int)

and use a regular expression to parse the data:

([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) "(.+)" (\d+) (\d+)

Then, we can use flatMap in combination with Scala’s pattern matching to filter out bad lines:

import java.text.SimpleDateFormat
import java.util.Date

val dateFormat = "d/M/y:HH:mm:ss Z"
val regex = """([^\s]+) ([^\s]+) ([^\s]+) ([^\s]+) "(.+)" (\d+) (\d+)""".r
val rdd = sc
    .textFile("access-log.txt")
    .flatMap ( x => x match {
      case regex(ip, id, user, dateTime, req, resp, bytes) =>
        val df = new SimpleDateFormat(dateFormat)
        Some(LogLine(ip, id, user, df.parse(dateTime),
                     req, resp.toInt, bytes.toInt))
      case _ => None
      })

Then, we can compute the total traffic per month

val bytesPerMonth = rdd
    .groupBy(k => k.dateTime.getMonth)
    .aggregateByKey(0)({(acc, x) => acc + x.map(_.bytes).sum},
                      {(x,y) => x + y})

Notice that all code on this slide is type checked!

Connecting to databases

The data sources that Spark can use go beyond textFiles. Spark can connect to databases such as

  • MongoDB
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/github.events"))
sc.loadFromMongoDB(readConfig)
val events = MongoSpark.load(sc, readConfig)

events.count
  • MySQL or Postgres over JDBC
val users = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ghtorrent?user=root&password=",
  "dbtable" -> "ghtorrent.users",
  "fetchSize" -> "10000"
  )).load()

users.count

or to distributed file systems such as HDFS, Amazon S3, Azure Data Lake, etc.

Optimizing partitioning

Partitioning becomes an important consideration when we need to run iterative algorithms. Some cases benefit a lot from defining custom partitioning schemes:

  • Joins between a large, almost static dataset and a much smaller, continuously updated one (see the sketch after this list).
  • reduceByKey or aggregateByKey on RDDs with numeric keys benefit from range partitioning, as the shuffle stage is minimal (or absent) because most of the reduction happens locally!
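
A sketch of the first case, reusing ps (RDD[Person]) and the Addr case class from the join example; the partition count is illustrative:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Pre-partition the large, static side once and keep it cached
val large = ps.keyBy(_.id)                    // RDD[(Int, Person)]
  .partitionBy(new HashPartitioner(64))
  .persist()

// Each small batch of updates is shuffled towards large's partitioner;
// the big RDD itself no longer moves across the network
def joinBatch(batch: RDD[(Int, Addr)]) =
  large.join(batch)                           // RDD[(Int, (Person, Addr))]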

Broadcasts

From the docs: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Broadcasts are often used to ship precomputed items, e.g. lookup tables or machine learning models, to workers so that they do not have to retransfer them on every shuffle.

With broadcasts, we can implement efficient in-memory joins between a processed dataset and a lookup table.

val curseWords = List("foo", "bar") // Use your imagination here!
val bcw = sc.broadcast(curseWords)

odyssey.filter(x => !bcw.value.contains(x))

Broadcast data should be immutable and relatively big (otherwise shipping it with each task would be cheaper), but it must still fit in each executor’s memory.

Accumulators

Sometimes we need to keep track of variables such as performance counters, debug values, or line counts while computations are running.

// Bad code: taskTime is captured in the closure shipped to each executor,
// so only the executors' local copies are updated; the driver's variable
// stays 0.
var taskTime = 0L
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime += (System.currentTimeMillis() - ts)
  r
}

To make it work, we need to define an accumulator

val taskTime = sc.accumulator(0L)
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime += (System.currentTimeMillis() - ts)
  r
}
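
In Spark 2.x, sc.accumulator is deprecated; the same pattern would look roughly like this with the newer accumulator API (a sketch, keeping the placeholder foo from above):

val taskTime2 = sc.longAccumulator("taskTime")
odyssey.map { x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime2.add(System.currentTimeMillis() - ts)
  r
}
// taskTime2.value reads the aggregated total at the driver, after an action has run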

Using accumulators is a side-effecting operation and should be avoided as it complicates design. It is important to understand that accumulators are aggregated at the driver, so frequent writes will cause large amounts of network traffic.

Spark’s Toolkit


Bibliography

[1]
R. Lämmel, “Google’s MapReduce programming model—revisited,” Science of computer programming, vol. 70, no. 1, pp. 1–30, 2008.
[2]
J. Dean and S. Ghemawat, “MapReduce: Simplified data processing on large clusters,” 2004.
[3]
B. Chambers and M. Zaharia, Spark: The definitive guide. O’Reilly Media, Inc., 2017.
[4]
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: Cluster computing with working sets.” HotCloud, vol. 10, no. 10–10, p. 95, 2010.
[5]
Y. Yu et al., “DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language,” in Proceedings of the 8th USENIX conference on operating systems design and implementation, 2008, pp. 1–14.
[6]
C. Chambers et al., “FlumeJava: Easy, efficient data-parallel pipelines,” in ACM SIGPLAN conference on programming language design and implementation (PLDI), 2010, pp. 363–375.