RDDs only see binary blobs with an attached type
Databases can do many optimizations because they know the data types for each field
D: Say that we have an RDD of 100k people and another of 300k addresses. We need to get the details of all people that live in Delft, along with their addresses. Which of the following joins will be the fastest?
val people: RDD[Person]
val addresses: RDD[(Int, Address)]

// Option 1: filter the addresses before joining
people.keyBy(_.id).join(addresses.filter(x => x._2.city == "Delft"))

// Option 2: join first, then filter the joined result
people.keyBy(_.id).join(addresses).filter(x => x._2._2.city == "Delft")

// Option 3: cartesian product, then filter
people.keyBy(_.id).cartesian(addresses).filter(x => x._2._2.city == "Delft")
Option 1 is the fastest. However, why do we have to do this optimization ourselves? Because Spark knows neither the schema of the data nor what the \(\lambda\)s passed as arguments to filter do!
In Spark SQL, we trade some of the freedom provided by the RDD API to enable automatic optimization of our queries.
The price we have to pay is to bring our data into a (semi-)tabular format and describe its schema. Then, we let relational algebra work for us.
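For a taste of what this buys us, here is a sketch of the Delft query from above expressed on Dataframes; the Dataframe and column names (peopleDF, addressesDF, id, person_id, city) are assumptions for illustration:

val result = peopleDF
  .join(addressesDF, peopleDF("id") === addressesDF("person_id"))
  .filter(addressesDF("city") === "Delft")

Because the filter is now a column expression rather than an opaque \(\lambda\), the optimizer can push it below the join on its own.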
SparkSQL is a library built on top of Spark RDDs. It provides two main abstractions: Datasets, collections of strongly-typed objects, and Dataframes, essentially a Dataset[Row] where Row \(\approx\) Array[Object], equivalent to R or Pandas Dataframes. It offers a query optimizer (Catalyst) and an off-heap data cache (Tungsten).
It can directly connect to and use structured data sources (e.g. SQL databases) and can import CSV, JSON, Parquet, Avro and other data formats by inferring their schema.
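To make the two abstractions concrete, here is a minimal sketch; the Person case class is hypothetical and a running SparkSession named spark is assumed:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

case class Person(id: Int, name: String)

// A Dataset is strongly typed: the compiler knows every element is a Person
val ds: Dataset[Person] = spark.createDataset(Seq(Person(1, "Alice"), Person(2, "Bob")))

// A Dataframe is just a Dataset[Row]: the same data, typed only at the column level
val df: DataFrame = ds.toDF()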
A blog post by MySQL experts at Percona set out to find the number of delayed flights per airline using the air traffic dataset.
Running the query on MySQL took 19 minutes.
On the same server, they used Spark SQL to connect to MySQL, partitioned the Dataframe that resulted from the connection and ran the query in Spark SQL. It took 192 seconds!
This was the result of Catalyst rewriting the SQL query: instead of 1 complex query, SparkSQL ran 24 parallel ones, using range conditions to restrict the examined data volumes. MySQL cannot do this!
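Such a partitioned read can be expressed with Spark's JDBC data source options; a rough sketch, in which the connection URL, table name and partitioning column/bounds are illustrative rather than taken from the blog post:

val ontime = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost/ontime")  // hypothetical connection URL
  .option("dbtable", "ontime")                  // hypothetical table name
  .option("partitionColumn", "Year")            // numeric column to split on
  .option("lowerBound", "1988")
  .option("upperBound", "2012")
  .option("numPartitions", "24")                // 24 parallel range queries
  .load()

Each partition turns into a separate SELECT with a range predicate on the partition column, which is what makes the 24 parallel queries possible.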
Similarly to normal Spark, SparkSQL needs a context object to invoke its functionality. This is the SparkSession.
If a SparkContext object exists, it is straightforward to get a SparkSession:
val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
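If no SparkContext exists yet (e.g. in a standalone program rather than the shell), a session can be built directly; the appName and master values below are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("spark-sql-example")  // placeholder application name
  .master("local[*]")            // run locally, using all available cores
  .getOrCreate()

val sc = spark.sparkContext      // the underlying SparkContext is still accessible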
Dataframes can be created from existing RDDs. Given an RDD[(String, Int, String)]:

import spark.implicits._
val df = rdd.toDF("name", "id", "address")

Given an RDD[Person] (an RDD of case class instances):

val df = persons.toDF() // Column names/types are inferred!
val schema = StructType(Array(
StructField("level", StringType, nullable = true),
StructField("date", DateType, nullable = true),
StructField("client_id", IntegerType, nullable = true),
StructField("stage", StringType, nullable = true),
StructField("msg", StringType, nullable = true),
))
val rowRDD = sc.textFile("ghtorrent-log.txt")
.map(_.split(" ")).
.map(r => Row(r(0), new Date(r(1)), r(2).toInt,
r(3), r(4)))
val logDF = spark.createDataFrame(rowRDD, schema)
Dataframes can also be created by reading (semi-)structured data files:

val df = spark.read.json("examples/src/main/resources/people.json")

or, in Python:

df = sqlContext.read.csv("/datasets/pullreqs.csv", sep=",", header=True, inferSchema=True)
Both RDDs and Datasets are strongly-typed, distributed collections of objects (a Dataset[T] plays the same role as an RDD[T]). The main difference is that Datasets use special Encoders to convert the data into compact internal formats that Spark can use directly when applying operations such as map or filter.
The internal format is very efficient; it is not uncommon for data cached in memory to take up less space than the same data on disk.
Moral: when in doubt, provide a schema!
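A short sketch of where Encoders come in; the Person case class is hypothetical, and Encoders.product derives an encoder for any case class:

import org.apache.spark.sql.{Encoder, Encoders}

case class Person(id: Int, name: String)

// The encoder maps Person objects to and from Spark's compact internal row format
val enc: Encoder[Person] = Encoders.product[Person]
println(enc.schema)  // the schema Spark derived from the case class: id (int), name (string)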
Columns in DataFrames/Datasets can be accessed by name. In Python:

df["team_size"]
df.team_size

or in Scala:

df("team_size")
$"team_size" // Scala only
Columns are defined by expressions. The API overrides language operators to return expression objects. For example, the following:

df("team_size") + 1

is syntactic sugar for

spark.sql.expressions.Add(df("team_size"), lit(1).expr)
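Since every Column carries its expression, we can inspect the tree directly; a small sketch, assuming a Dataframe df with a team_size column:

val col = df("team_size") + 1
println(col.expr)  // prints the underlying Add expression over the column and the literal 1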
DataFrames/Datasets include all typical relational algebra operations:
"project_name").show()
df.select("project_name", "pullreq_id").show() df.drop(
filter(df.team_size.between(1,4)).show() df.
Dataframes can be joined irrespective of the underlying implementation, as long as they share a key.
= sqlContext.read.csv("people.csv")
people = sqlContext.read.jdbc("jdbc:mysql://company/departments")
department
filter(people.age > 30).\
people.== department.id) join(department, people.deptId
All types of joins are supported:
people.join(department, people.deptId == department.id, how="left_outer")
people.join(department, people.deptId == department.id, how="full_outer")
When groupBy is called on a Dataframe, it is (conceptually) split into a key/value structure, where each key is a distinct value of the column grouped upon and the value is the set of rows containing that value. As in SQL, we can only apply aggregate functions to grouped Dataframes.
"lifetime_minutes").show() df.groupBy(df.project_name).mean(
Why is SparkSQL so fast? Because it uses optimization and code generation.
The key to both features is that code passed to higher-order functions (e.g. the predicate given to filter) is syntactic sugar that generates expression trees.
.filter(df("team_size") > (3 + 1)) df
is converted to
df.filter(GreaterThan(
    UnresolvedAttribute("team_size"),
    Add(Literal(3), Literal(1))))
This corresponds to an Abstract Syntax Tree (AST).
The optimizer uses tree patterns to simplify the AST. For example, the following tree:
val ast = GreaterThan(
    UnresolvedAttribute("team_size"),
    Add(Literal(3), Literal(1)))
can be simplified to:
val opt = ast.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}
GreaterThan(UnresolvedAttribute("team_size"),
Literal(4))
Catalyst knows tens (hundreds?) of such optimizations. Their purpose is to minimize the work done when the query is evaluated on real data.
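One way to see these rules at work is to ask Spark to print the plans it produced; a sketch, reusing the team_size filter from above:

df.filter(df("team_size") > (3 + 1)).explain(true)
// prints the parsed, analyzed and optimized logical plans plus the physical plan;
// in the optimized plan the constant has already been folded to 4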
Given an expression tree and a context (the Dataframe), Catalyst exploits the Scala compiler's ability to manipulate ASTs as strings (quasiquotes, the q"..." interpolator below):
def compile(node: Node): AST = node match {
  case Literal(value) => q"$value"
  case Attribute(name) => q"row.get($name)"
  case GreaterThan(left, right) =>
    q"${compile(left)} > ${compile(right)}"
}
The previous expression tree is converted (roughly!) to
df.filter(row => row.get("team_size") > 4)
and gets compiled at runtime to JVM code.
Catalyst performs the following steps: it first analyses the expression tree, resolving UnresolvedAttribute("team_size") to Attribute("team_size") of type Int (and also checking that team_size actually exists in df); it then optimizes the tree and generates code for it. The end result is native code that runs in optimal locations on top of an RDD.
This work is (c) 2017, 2018, 2019, 2020, 2021 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.