Parallelism is about speeding up computations by using multiple processors.
When processing big data, data parallelism is used to move computations close to the data, instead of moving data over the network.
This means that our programming model and execution engine should hide (but not forget!) the distribution of data and computation.
Map/Reduce is a general computation framework, loosely based on functional programming [1]. It assumes that data exists in a K/V store and processes it with two functions:
map((K1, V1), f: (K1, V1) -> (K2, V2)): List[(K2, V2)]
reduce((K2, List[V2])): List[(K3, V3)]
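To make the two signatures concrete, here is a single-machine sketch using plain Scala collections (no cluster; the names `MiniMapReduce`, `run` and `wordCount` are hypothetical). It assumes the map function emits a list of pairs per input, and it models the shuffle between the two phases as a `groupBy` on the intermediate key.

```scala
// Single-machine sketch of the Map/Reduce model (illustration only).
object MiniMapReduce {
  // Apply the mapper to every input pair, group the emitted pairs by key
  // (the "shuffle"), then call the reducer once per distinct key.
  def run[K1, V1, K2, V2, K3, V3](
      input: List[(K1, V1)],
      mapper: ((K1, V1)) => List[(K2, V2)],
      reducer: ((K2, List[V2])) => List[(K3, V3)]
  ): List[(K3, V3)] = {
    val mapped: List[(K2, V2)] = input.flatMap(mapper)
    val shuffled: List[(K2, List[V2])] =
      mapped.groupBy(_._1).toList.map { case (k, kvs) => (k, kvs.map(_._2)) }
    shuffled.flatMap(reducer)
  }

  // Word count: input keys are line numbers, input values are lines.
  def wordCount(lines: List[String]): Map[String, Int] = {
    val input: List[(Int, String)] = lines.zipWithIndex.map { case (line, i) => (i, line) }
    val counts = run[Int, String, String, Int, String, Int](
      input,
      { case (_, line) => line.split(" ").toList.filter(_.nonEmpty).map(word => (word, 1)) },
      { case (word, ones) => List((word, ones.sum)) }
    )
    counts.toMap
  }
}
```

In a real deployment the mapper and reducer calls run on different machines, and the `groupBy` becomes network traffic.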
Map/Reduce as a system was proposed by Dean & Ghemawat [2], building on the Google File System (GFS).
What is good about Hadoop: Fault tolerance
Hadoop, the open-source implementation of Map/Reduce, was the first widely available framework to enable computations to run on 1000s of commodity computers.
What is wrong: Performance
Optional reading (not part of the exam material)
Microsoft’s DryadLINQ combined the Dryad distributed execution engine with the LINQ language for defining queries.
Optional reading (not part of the exam material)
Google’s FlumeJava attempted to provide a few simple abstractions for programming data-parallel computations. These abstractions are higher-level than those provided by MapReduce, and provide better support for pipelines.
PTable<String,Integer> wordsWithOnes =
words.parallelDo( new DoFn<String, Pair<String,Integer>>() {
void process(String word,
EmitFn<Pair<String,Integer>> emitFn) {
emitFn.emit(Pair.of(word, 1));
}
}, tableOf(strings(), ints()));
PTable<String,Collection<Integer>>
groupedWordsWithOnes = wordsWithOnes.groupByKey();
PTable<String,Integer> wordCounts =
groupedWordsWithOnes.combineValues(SUM_INTS);
Spark is an open-source cluster computing framework.
RDDs are the core abstraction that Spark uses.
RDDs make datasets distributed over a cluster of machines look like a Scala collection. RDDs:
In practice, RDD[A] works like Scala’s List[A].
// http://classics.mit.edu/Homer/odyssey.mb.txt
val rdd = sc.textFile("./datasets/odyssey.mb.txt")
rdd
.flatMap(l => l.split(" ")) // Split lines into words
.map(x => (x, 1)) // Create (word, 1) pairs
.reduceByKey((acc, x) => acc + x) // Sum the counts of each word
.sortBy(x => x._2, false) // Sort by number of occurrences, descending
.take(50) // Take the first 50 results
.foreach(println)
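RDDs can only be created in the following 3 ways:
By reading data from external storage, e.g.:
val rdd1 = sc.textFile("hdfs://...")
val rdd2 = sc.textFile("file://odyssey.txt")
val rdd3 = sc.textFile("s3://...")
By transforming an existing RDD, e.g. with map or filter.
By parallelizing a local collection with sc.parallelize, e.g. sc.parallelize(List(1, 2, 3)).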
There are two types of operations we can do on an RDD:
Transformations: Apply a function that returns a new RDD. They are lazy: nothing is computed until an action requests a result.
Actions: Request the computation of a result. They are eager.
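Scala’s own collections can mimic this split, as an analogy only: a `view` is lazy like a transformation, and converting it with `toList` forces evaluation like an action. The sketch below (the object and counter names are hypothetical) counts how often the mapping function runs before and after the “action”.

```scala
// Analogy sketch: a lazy Scala view plays the role of RDD transformations,
// and toList plays the role of an action.
object LazinessDemo {
  def run(): (Int, Int, List[Int]) = {
    var calls = 0 // how many times the mapping function has run
    val pipeline = (1 to 5).view
      .map { x => calls += 1; x * 2 } // "transformation": not executed yet
      .filter(_ > 4)                  // "transformation": not executed yet
    val callsBeforeAction = calls     // still 0 here
    val result = pipeline.toList      // "action": forces the whole pipeline
    (callsBeforeAction, calls, result)
  }
}
```

Spark goes further than a view: because nothing runs until an action, the scheduler sees the whole chain of transformations and can plan it as a unit.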
map: Apply \(f\) on all items in the RDD and return an RDD of the result. \(RDD[A].map(f: A \rightarrow B) : RDD[B]\)
flatMap: Apply \(f\) on all RDD contents, return an RDD with the contents of all intermediate iterators. \(RDD[A].flatMap(f: A \rightarrow Iterable[B]): RDD[B]\)
filter: Apply predicate \(p\), return items that satisfy it. \(RDD[A].filter(p: A \rightarrow Boolean): RDD[A]\)
All uses of articles in the Odyssey
val odyssey = sc.textFile("datasets/odyssey.mb.txt")
.flatMap(_.split(" "))
odyssey.map(_.toLowerCase)
.filter(Seq("a", "the").contains(_))
Q: How can we find uses of all regular verbs in past tense?
Q: How can we remove all punctuation marks?
Q: How can we find uses of distinct regular verbs in past tense?
Q: How can we sort the RDD?
Q: How can we sample data from the RDD?
collect: Return all elements of an RDD. \(RDD[A].collect() : Array[A]\)
take: Return the first n elements of the RDD. \(RDD[A].take(n): Array[A]\)
reduce, fold: Combine all elements to a single result of the same type. The function should be associative and commutative, as partial results are computed per partition and then merged. \(RDD[A].reduce(f: (A, A) \rightarrow A): A\) \(RDD[A].fold(zero: A)(f: (A, A) \rightarrow A): A\)
aggregate: Aggregate the elements of each partition, and then the results for all the partitions. \(RDD[A].aggregate(init: B)(seqOp: (B, A) \rightarrow B, combOp: (B, B) \rightarrow B): B\)
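The two-level structure of `aggregate` can be sketched with partitions modelled as a `List` of `List`s (an assumption for illustration; Spark decides the actual partitioning). `AggregateSketch` and `totalChars` are hypothetical names. Note that the zero value is used once per partition and once more when merging, so it should be neutral for both functions.

```scala
// Sketch of RDD.aggregate semantics over explicitly given partitions.
object AggregateSketch {
  // seqOp folds the elements of one partition into an accumulator starting at zero;
  // combOp then merges the per-partition accumulators, again starting at zero.
  def aggregate[A, B](partitions: List[List[A]])(zero: B)(
      seqOp: (B, A) => B,
      combOp: (B, B) => B): B =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // Example: total number of characters over all words.
  // The element type (String) differs from the result type (Int).
  def totalChars(partitions: List[List[String]]): Int =
    aggregate(partitions)(0)((acc, word) => acc + word.length, _ + _)
}
```

Unlike `reduce`, the result type `B` may differ from the element type `A`, which is why two functions are needed.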
How many words are there (using reduce)?
val odyssey = sc.textFile("datasets/odyssey.mb.txt").flatMap(_.split(" "))
odyssey.map(x => 1).reduce((a,b) => a + b)
How many words are there (using aggregate)?
How to calculate the average of a collection of integers (using aggregate)?
RDDs can represent any complex data type, as long as it can be iterated.
Spark treats RDDs of the type RDD[(K,V)] as special, named PairRDDs, as they can be both iterated and indexed.
Operations such as join are only defined on Pair RDDs, meaning that we can only combine RDDs if their contents can be indexed.
We can create Pair RDDs by applying an indexing function, by using the keyBy function, or by grouping records:
val rdd = sc.parallelize(List("foo", "bar", "baz")) // RDD[String]
val pairRDD = rdd.map(x => (x.charAt(0), x)) // RDD[(Char, String)]
pairRDD.collect // Array((f,foo), (b,bar), (b,baz))
val pairRDD2 = rdd.keyBy(x => x.toLowerCase.head) // RDD[(Char, String)]
pairRDD2.collect // Array((f,foo), (b,bar), (b,baz))
val pairRDD3 = rdd.groupBy(x => x.charAt(0)) // RDD[(Char, Iterable[String])]
pairRDD3.collect // Array((b,CompactBuffer(bar, baz)), (f,CompactBuffer(foo)))
The following functions are only available on RDD[(K,V)]:
reduceByKey: Merge the values for each key using an associative and commutative reduce function. \(reduceByKey(f: (V, V) \rightarrow V): RDD[(K, V)]\)
aggregateByKey: Aggregate the values of each key, using given combine functions and a neutral “zero value”. \(aggregateByKey(zero: U)(f: (U, V) \rightarrow U, g: (U, U) \rightarrow U): RDD[(K, U)]\)
join: Return an RDD containing all pairs of elements with matching keys. \(join(b: RDD[(K, W)]): RDD[(K, (V, W))]\)
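A sketch of how the per-key operations behave across partitions, using local lists of `(key, value)` pairs as stand-in partitions (hypothetical object `ByKeySketch`; Spark performs step 2 after a shuffle). The usage example computes a `(sum, count)` pair per key with `aggregateByKey`, from which a per-key average follows.

```scala
// Local sketch of reduceByKey / aggregateByKey semantics.
object ByKeySketch {
  def aggregateByKey[K, V, U](parts: List[List[(K, V)]])(zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Step 1, inside each partition: fold the values of each key, starting from zero.
    val partials: List[(K, U)] = parts.flatMap { part =>
      part.groupBy(_._1).toList.map { case (k, kvs) => (k, kvs.map(_._2).foldLeft(zero)(seqOp)) }
    }
    // Step 2, after the shuffle: merge the partial results of each key.
    partials.groupBy(_._1).map { case (k, kus) => (k, kus.map(_._2).reduce(combOp)) }
  }

  // reduceByKey uses the same function for both steps and needs no zero value.
  def reduceByKey[K, V](parts: List[List[(K, V)]])(f: (V, V) => V): Map[K, V] = {
    val partials: List[(K, V)] = parts.flatMap { part =>
      part.groupBy(_._1).toList.map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
    }
    partials.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }
  }
}
```

For example, `ByKeySketch.aggregateByKey(parts)((0, 0))((acc, v) => (acc._1 + v, acc._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))` yields the sum and count of every key.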
groupByKey and reduceByKey
val odyssey = sc.textFile("sample.txt").flatMap(_.split(" "))
val words = odyssey.map(c => (c, 1)) // RDD[(String, Int)]
Word count using groupByKey:
val counts = words.groupByKey() // RDD[(String, Iterable[Int])]
.map(row => (row._1, row._2.sum)) // RDD[(String, Int)]
.collect() // Array[(String, Int)]
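Word count using reduceByKey:
val counts = words.reduceByKey((a, b) => a + b) // RDD[(String, Int)]
.collect() // Array[(String, Int)]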
Q: It’s more efficient to use reduceByKey over groupByKey. Why?
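One way to reason about the question is to count the records that must leave their partition. The sketch below models two partitions of `(word, 1)` pairs as local lists (an illustration only; `ShuffleVolume` and its methods are hypothetical). With `groupByKey`, every pair is shuffled; with `reduceByKey`, each partition first combines the counts of each word locally (map-side combine), so at most one record per distinct word per partition is shuffled.

```scala
// Counting shuffled records for the two word-count strategies.
object ShuffleVolume {
  // groupByKey: every (word, 1) record is sent across the network.
  def groupByKeyRecords(parts: List[List[(String, Int)]]): Int =
    parts.map(_.size).sum

  // Map-side combine: sum the counts of each word inside one partition.
  def combineLocally(part: List[(String, Int)]): List[(String, Int)] =
    part.groupBy(_._1).toList.map { case (w, ones) => (w, ones.map(_._2).sum) }

  // reduceByKey: only the locally combined records are shuffled.
  def reduceByKeyRecords(parts: List[List[(String, Int)]]): Int =
    parts.map(p => combineLocally(p).size).sum
}
```

On real text, where frequent words repeat many times within each partition, the gap between the two numbers grows with the data.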
aggregateByKey
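Word count using aggregateByKey:
val counts = words.aggregateByKey(0)((acc, x) => acc + x, (a, b) => a + b) // RDD[(String, Int)]
.collect() // Array[(String, Int)]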
How can we count the number of occurrences of part of speech elements?
join
case class Person(id: Int, name: String)
case class Addr(id: Int, person_id: Int,
address: String, number: Int)
val pers = sc.textFile("pers.csv") // id, name
val addr = sc.textFile("addr.csv") // id, person_id, street, num
val ps = pers.map(_.split(",")).map(x => Person(x(0).toInt, x(1)))
val as = addr.map(_.split(",")).map(x => Addr(x(0).toInt, x(1).toInt,
x(2), x(3).toInt))
Q: What are the types of ps and as? How can we join them?
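Given a “left” RDD[(K,A)] and a “right” RDD[(K,B)]:
Inner join (join): The result contains only records that have keys in both RDDs. \(left.join(right) : RDD[(K,(A, B))]\)
Outer joins (leftOuterJoin, rightOuterJoin, fullOuterJoin): In addition to the inner join results, a left outer join keeps the records of the “left” RDD that have no match in the “right” one, a right outer join does the converse, and a full outer join keeps unmatched records from both sides. Missing values are represented as None.
\(left.leftOuterJoin(right) : RDD[(K,(A, Option[B]))]\) \(left.rightOuterJoin(right) : RDD[(K,(Option[A], B))]\)
\(left.fullOuterJoin(right) : RDD[(K,(Option[A], Option[B]))]\)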
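The join variants can be sketched on local sequences of `(key, value)` pairs (plain Scala, no Spark; `JoinSketch` and its methods are hypothetical names). Spark computes the same results, but distributed, usually after shuffling both sides by key.

```scala
// Local sketch of inner, left outer and full outer join semantics.
object JoinSketch {
  // Inner join: every pair of records with equal keys.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k, a) <- left
      (k2, b) <- right
      if k == k2
    } yield (k, (a, b))

  // Left outer join: unmatched left records are kept with None.
  def leftOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, Option[B]))] =
    left.flatMap { case (k, a) =>
      val matches: Seq[Option[B]] = right.collect { case (k2, b) if k2 == k => Some(b) }
      val padded: Seq[Option[B]] = if (matches.isEmpty) Seq(None) else matches
      padded.map(ob => (k, (a, ob)))
    }

  // Full outer join: left outer join plus the right records whose key is not on the left.
  def fullOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (Option[A], Option[B]))] = {
    val leftKeys = left.map(_._1).toSet
    val fromLeft = leftOuterJoin(left, right).map { case (k, (a, ob)) => (k, (Some(a): Option[A], ob)) }
    val rightOnly = right.filterNot { case (k, _) => leftKeys.contains(k) }
      .map { case (k, b) => (k, (None: Option[A], Some(b): Option[B])) }
    fromLeft ++ rightOnly
  }
}
```

The nested loop in `join` is quadratic and only meant to show the semantics.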
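From RDD.scala, line 61: internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
D: Why does an RDD need all of those?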
Even though RDDs might give the impression of continuous memory blocks spread across a cluster, data in RDDs is split into partitions.
Partitions define a unit of computation and persistence: any Spark computation transforms a partition to a new partition.
If during computation a machine fails, Spark will redistribute its partitions to other machines and restart the computation on those partitions only.
The partitioning scheme of an application is configurable; better configurations lead to better performance.
Spark supports 3 types of partitioning schemes:
Extended partitioning is only configurable on Pair RDDs.
Range partitioning: Takes into account the natural order of keys to split the dataset into the required number of partitions. Requires keys to be naturally ordered; partitions are only balanced if keys are evenly distributed, as Spark picks the range boundaries by sampling the keys.
Hash partitioning: Calculates a hash over each item key and then produces the modulo of this hash to determine the new partition. This is equivalent to:
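Both schemes can be sketched in plain Scala. The hash part follows the logic of Spark’s HashPartitioner, which takes a non-negative modulo of `key.hashCode` and sends `null` keys to partition 0. The range part assumes the partition boundaries are already known; Spark’s RangePartitioner estimates them by sampling. `PartitionerSketch` is a hypothetical name.

```scala
// Sketch of how a key is mapped to a partition number.
object PartitionerSketch {
  // Hash partitioning: non-negative modulo of the key's hashCode.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val raw = k.hashCode % numPartitions // may be negative for negative hash codes
      if (raw < 0) raw + numPartitions else raw
  }

  // Range partitioning: upperBounds holds the sorted upper bound of every
  // partition except the last; a key goes to the first partition whose
  // upper bound is >= key, or to the last partition otherwise.
  def rangePartition(key: Int, upperBounds: Vector[Int]): Int = {
    val idx = upperBounds.indexWhere(bound => key <= bound)
    if (idx == -1) upperBounds.length else idx
  }
}
```

With hash partitioning, neighbouring keys scatter across partitions; with range partitioning, they stay together, which is what sorting needs.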
Narrow dependencies: Each partition of the source RDD is used by at most one partition of the target RDD
Wide dependencies: Multiple partitions in the target RDD depend on a single partition in the source RDD.
When operations need to calculate results using a common characteristic (e.g. a common key), this data needs to reside on the same physical node. This is the case with all “wide dependency” operations. The process of re-arranging data so that similar records end up in the same partitions is called shuffling.
Shuffling is very expensive as it involves moving data across the network and possibly spilling them to disk (e.g. if the data does not fit in a node’s memory). Avoid it at all costs!
See this link for a great write up on Spark shuffling
RDDs contain information on how to compute themselves, including dependencies to other RDDs. Effectively, RDDs create a directed acyclic graph of computations.
Lineage information allows an RDD to be traced to its ancestors.
To demonstrate this, consider the following example:
val rdd1 = sc.parallelize(0 to 10)
val rdd2 = sc.parallelize(10 to 100)
val rdd3 = rdd1.cartesian(rdd2)
val rdd4 = rdd3.map(x => (x._1 + 1, x._2 + 1))
Q: What are the computation dependencies here?
scala> rdd4.toDebugString
res3: String =
(16) MapPartitionsRDD[3] at map at <console>:30 []
| CartesianRDD[2] at cartesian at <console>:28 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
| ParallelCollectionRDD[1] at parallelize at <console>:24 []
val input = sc.textFile("sample.txt")
val tokenized = input.flatMap(_.split(" "))
.filter(x => x.length > 1)
val result = tokenized.map(x => (x, 1))
.reduceByKey((a,b) => a + b)
.sortBy(_._1)
> input.toDebugString
(2) sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
| sample.txt HadoopRDD[0] at textFile at <console>:24 []
> tokenized.toDebugString
(2) MapPartitionsRDD[3] at filter at <console>:25 []
| MapPartitionsRDD[2] at flatMap at <console>:25 []
| sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
| sample.txt HadoopRDD[0] at textFile at <console>:24 []
> result.toDebugString
(2) MapPartitionsRDD[10] at sortBy at <console>:25 []
| ShuffledRDD[9] at sortBy at <console>:25 []
+-(2) MapPartitionsRDD[6] at sortBy at <console>:25 []
| ShuffledRDD[5] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[4] at map at <console>:25 []
| MapPartitionsRDD[3] at filter at <console>:25 []
| MapPartitionsRDD[2] at flatMap at <console>:25 []
| sample.txt MapPartitionsRDD[1] at textFile at <console>:24 []
| sample.txt HadoopRDD[0] at textFile at <console>:24 []
Data in RDDs is stored in three ways:
Persistence in Spark is configurable and can be used to store frequently used computations in memory, e.g.:
import org.apache.spark.storage.StorageLevel
val rdd = sc.textFile("hdfs://host/foo.txt")
val persisted = rdd.map(x => x + 1).persist(StorageLevel.MEMORY_ONLY_SER)
persisted is cached the first time an action computes it. Further accesses will then avoid reloading the file and reapplying the map function.
Storage level | Meaning |
---|---|
MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level. |
MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed. |
MEMORY_ONLY_SER (Java and Scala) | Store RDD as serialized Java objects (one byte array per partition). More space-efficient than deserialized objects but more CPU-intensive to read. |
MEMORY_AND_DISK_SER (Java and Scala) | Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed. |
DISK_ONLY | Store the RDD partitions only on disk. |
A Spark application:
Jobs are created when an action is requested. Spark walks the RDD dependency graph backwards and builds a graph of stages.
Stages are the parts of a job separated by wide dependencies. When such an operation is requested (e.g. groupByKey or sortBy), the Spark scheduler needs to reshuffle/repartition the data, so a new stage begins. A stage can only start once the stages it depends on have finished. Each stage consists of one or more tasks.
Tasks are the minimum unit of execution; a task applies the narrow-dependency functions of a stage to one data partition. Spark launches as many tasks per stage as there are partitions.
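How stages follow from the lineage can be sketched as a small graph walk: starting from the RDD an action was called on, follow narrow dependencies within the current stage and start a new stage at every wide (shuffle) dependency. The types `Node`, `Narrow` and `Wide` are hypothetical; Spark’s real DAG scheduler also accounts for cached data and other details. The example lineage mirrors the word count job whose `toDebugString` output is shown earlier.

```scala
// Sketch: counting the stages of a job from its lineage graph.
object StageSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep // e.g. map, filter
  final case class Wide(parent: Node) extends Dep   // e.g. reduceByKey, sortBy (shuffle)
  final case class Node(name: String, deps: List[Dep])

  // Parents reached through narrow dependencies only, stopping at the first
  // wide dependency on each path: each such parent ends an earlier stage.
  private def shuffleParents(n: Node): List[Node] = n.deps.flatMap {
    case Narrow(p) => shuffleParents(p)
    case Wide(p)   => List(p)
  }

  // Collect the last RDD of every stage; their number is the number of stages.
  def countStages(target: Node): Int = {
    @annotation.tailrec
    def loop(pending: List[Node], seen: Set[Node]): Set[Node] = pending match {
      case Nil                           => seen
      case n :: rest if seen.contains(n) => loop(rest, seen)
      case n :: rest                     => loop(shuffleParents(n) ++ rest, seen + n)
    }
    loop(List(target), Set.empty).size
  }

  // Lineage of the word count job (textFile, flatMap, filter, map, reduceByKey, sortBy).
  val wordCountLineage: Node = {
    val hadoop   = Node("HadoopRDD[0]", Nil)
    val text     = Node("MapPartitionsRDD[1]", List(Narrow(hadoop)))
    val words    = Node("MapPartitionsRDD[2]", List(Narrow(text)))
    val filtered = Node("MapPartitionsRDD[3]", List(Narrow(words)))
    val pairs    = Node("MapPartitionsRDD[4]", List(Narrow(filtered)))
    val reduced  = Node("ShuffledRDD[5]", List(Wide(pairs)))
    val sortPrep = Node("MapPartitionsRDD[6]", List(Narrow(reduced)))
    val sorted   = Node("ShuffledRDD[9]", List(Wide(sortPrep)))
    Node("MapPartitionsRDD[10]", List(Narrow(sorted)))
  }
}
```

The count matches the three indentation levels (`+-`) in the `toDebugString` output of that job.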
We can see how our job executes if we connect to our driver’s WebUI (port 4040 on the driver machine).
Here is the graph for the word counting job we saw before.
Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.
Recomputing happens at the stage level.
To minimize recompute time, we can use checkpointing. With checkpointing, we can save intermediate RDDs to reliable storage (e.g. HDFS). Checkpointing effectively truncates the RDD lineage graph.
Spark clusters are resilient to worker node failures, but the master is a single point of failure by default. Multi-master setups require either standby masters coordinated through ZooKeeper (in standalone mode) or running Spark on middleware such as YARN or Mesos.
Here is an example of how to start an application with a custom resource configuration.
RDDs by default do not impose any format on the data they store. However, if the data is formatted (e.g. log lines with known format), we can create a schema and have the Scala compiler type-check our computations.
Consider the following data (common log format):
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] \
"GET /apache_pb.gif HTTP/1.0" 200 2326
We can map this data to a Scala case class
case class LogLine(ip: String, id: String, user: String,
dateTime: Date, req: String, resp: Int,
bytes: Int)
and use a regular expression to parse the data (the timestamp is enclosed in brackets and contains a space, so it gets its own group):
([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)
Then, we can use flatMap in combination with Scala’s pattern matching to filter out bad lines:
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
val dateFormat = "dd/MMM/yyyy:HH:mm:ss Z" // e.g. 10/Oct/2000:13:55:36 -0700
val regex = """([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)""".r
val rdd = sc
.textFile("access-log.txt")
.flatMap ( x => x match {
case regex(ip, id, user, dateTime, req, resp, bytes) =>
// SimpleDateFormat is not thread-safe, so create one per record
val df = new SimpleDateFormat(dateFormat, Locale.ENGLISH)
Some(LogLine(ip, id, user, df.parse(dateTime),
req, resp.toInt, bytes.toInt))
case _ => None // Lines that do not match are dropped
})
Then, we can compute the total traffic per month:
val bytesPerMonth = rdd
.map(line => (line.dateTime.getMonth, line.bytes)) // (month, bytes); Date.getMonth is 0-based
.aggregateByKey(0)((acc, x) => acc + x, (x, y) => x + y)
Notice that all code on this slide is type checked!
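Because the parser is ordinary Scala, it can be checked locally before running it on a cluster. The sketch below uses plain `List`s instead of RDDs and computes the month in UTC via `java.time` (both assumptions; `LogSketch` is a hypothetical name). It assumes the bracketed timestamp is matched by its own group, since it contains a space.

```scala
import java.text.SimpleDateFormat
import java.time.ZoneOffset
import java.util.{Date, Locale}

// Local sketch of the log parsing and per-month traffic computation.
object LogSketch {
  case class LogLine(ip: String, id: String, user: String,
                     dateTime: Date, req: String, resp: Int, bytes: Int)

  val regex = """([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)""".r

  // Returns None for lines that do not match the common log format.
  def parse(line: String): Option[LogLine] = line match {
    case regex(ip, id, user, dateTime, req, resp, bytes) =>
      val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
      Some(LogLine(ip, id, user, df.parse(dateTime), req, resp.toInt, bytes.toInt))
    case _ => None
  }

  // Month (1-12, in UTC) -> total bytes.
  def bytesPerMonth(lines: List[String]): Map[Int, Int] =
    lines.flatMap(l => parse(l))
      .map(l => (l.dateTime.toInstant.atOffset(ZoneOffset.UTC).getMonthValue, l.bytes))
      .groupBy(_._1)
      .map { case (month, pairs) => (month, pairs.map(_._2).sum) }
}
```

The same `parse` function could be passed to `flatMap` on the RDD, so the local check exercises exactly the code that runs on the cluster.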
The data sources that Spark can use go beyond textFiles.
Spark can connect to databases such as MongoDB or, through JDBC, MySQL:
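import com.mongodb.spark._ // MongoDB Spark connector
import com.mongodb.spark.config.ReadConfig
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/github.events"))
val events = MongoSpark.load(sc, readConfig) // equivalent to sc.loadFromMongoDB(readConfig)
events.count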
val users = spark.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/ghtorrent?user=root&password=",
"dbtable" -> "ghtorrent.users",
"fetchSize" -> "10000"
)).load()
users.count
Spark can also read from distributed file systems such as HDFS, Amazon S3, Azure Data Lake, etc.
Partitioning becomes an important consideration when we need to run iterative algorithms. Some cases benefit a lot from defining custom partitioning schemes:
reduceByKey or aggregateByKey on RDDs that are already partitioned by key (e.g. range partitioning on numeric keys) need little or no shuffling, because the reduction happens locally within each partition!
From the docs: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Broadcasts are often used to ship precomputed items, e.g. lookup tables or machine learning models, to workers so that they do not have to be shipped again with every task.
With broadcasts, we can implement efficient in-memory joins between a processed dataset and a lookup table.
val curseWords = List("foo", "bar") // Use your imagination here!
val bcw = sc.broadcast(curseWords)
odyssey.filter(x => !bcw.value.contains(x)) // Read the broadcast copy on each worker
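Broadcast data should be immutable and small enough to fit in the memory of each worker; broadcasting pays off when the same data would otherwise be shipped with many tasks.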
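The in-memory join mentioned above can be sketched with plain collections: the small lookup table is a `Map` that every task consults locally, so the large dataset never needs to be shuffled. In Spark, the `Map` would be wrapped with `sc.broadcast` and read through `.value` inside the task; here `BroadcastJoinSketch` (a hypothetical name) uses a local `Seq` in place of the RDD.

```scala
// Sketch of a broadcast ("map-side") join against a small lookup table.
object BroadcastJoinSketch {
  // records: the large dataset; lookup: the small table available on every worker.
  // Records whose key is missing from the lookup table are dropped (inner join).
  def join[K, V, W](records: Seq[(K, V)], lookup: Map[K, W]): Seq[(K, (V, W))] =
    records.flatMap { case (k, v) => lookup.get(k).map(w => (k, (v, w))) }
}
```

This only works when the lookup side fits in memory on every worker; two large RDDs still need a regular, shuffling join.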
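Sometimes we need to keep track of variables like performance counters, debug values or line counts while computations are running.
// Bad code: each task updates its own copy of taskTime, shipped with the closure,
// so the driver's taskTime stays 0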
var taskTime = 0L
odyssey.map{x =>
val ts = System.currentTimeMillis()
val r = foo(x)
taskTime += (System.currentTimeMillis() - ts)
r
}
To make it work, we need to define an accumulator:
val taskTime = sc.longAccumulator("taskTime")
odyssey.map{x =>
val ts = System.currentTimeMillis()
val r = foo(x)
taskTime.add(System.currentTimeMillis() - ts)
r
}
Using accumulators is a side-effecting operation and should be avoided as it complicates design. Accumulator values can only be read at the driver, where the updates from all tasks are merged. Since map is lazy, taskTime is only updated once an action runs, and updates made inside transformations may be applied more than once if Spark re-executes a task; only updates performed inside actions are guaranteed to be applied exactly once.
This work is (c) 2017, 2018, 2019, 2020, 2021 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.