RDDs only see binary blobs with an attached type
Databases can do many optimizations because they know the data types for each field
D: Say that we have an RDD of 100k people and another of 300k addresses. We need to get the details of all people that live in Delft, along with their addresses. Which of the following joins will be fastest?
val people: RDD[Person]
val addresses: RDD[(Int, Address)]
// Option 1: filter the addresses before the join
people.keyBy(_.id).join(addresses.filter(_._2.city == "Delft"))
// Option 2: join first, then filter the joined pairs
people.keyBy(_.id).join(addresses).filter(_._2._2.city == "Delft")
// Option 3: cartesian product, then filter
people.keyBy(_.id).cartesian(addresses).filter(_._2._2.city == "Delft")
Join 1 is the fastest. However, why do we have to do this optimization ourselves? Because Spark does not know the schema and cannot see what the \(\lambda\)s passed as arguments to filter do!
In Spark SQL, we trade some of the freedom provided by the RDD API for declarative queries and automatic, database-style optimizations.
The price we have to pay is to bring our data to a (semi-)tabular format and describe its schema. Then, we let relational algebra work for us.
SparkSQL is a library built on top of Spark RDDs. It provides two main abstractions:
Datasets, collections of strongly-typed objects
Dataframes, essentially a Dataset[Row], where Row \(\approx\) Array[Object]. Equivalent to R or Pandas Dataframes
It offers a query optimizer (Catalyst) and an off-heap data cache (Tungsten).
It can directly connect to and use structured data sources (e.g. SQL databases) and can import CSV, JSON, Parquet, Avro and other data formats by inferring their schema.
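For example, a minimal sketch of loading such files with schema inference (spark is a SparkSession, introduced below; the file names are illustrative):
// Read a CSV file, asking Spark SQL to infer the column types from the data
val peopleDF = spark.read
  .option("header", "true")        // the first line contains the column names
  .option("inferSchema", "true")   // sample the data to guess the column types
  .csv("people.csv")

// Self-describing formats such as JSON and Parquet carry their own structure
val commitsDF = spark.read.json("commits.json")
peopleDF.printSchema()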
A blog post by MySQL experts at Percona wanted to find the number of delayed flights per airline using the air traffic dataset.
Running the query on MySQL took 19 minutes.
On the same server, they used Spark SQL to connect to MySQL, partitioned the Dataframe that resulted from the connection, and ran the query in Spark SQL. It took 192 seconds!
This was the result of Catalyst rewriting the SQL query: instead of 1 complex query, SparkSQL ran 24 parallel ones, using range conditions to restrict the examined data volumes. MySQL cannot do this!
Similarly to normal Spark, SparkSQL needs a context object to invoke its functionality: the SparkSession. If a SparkContext object already exists, it is straightforward to get a SparkSession.
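A minimal sketch of obtaining one (the application name is illustrative):
import org.apache.spark.sql.SparkSession

// Create a SparkSession, or reuse the one of an already-running application
val spark = SparkSession.builder()
  .appName("spark-sql-demo")
  .master("local[*]")
  .getOrCreate()

// The underlying SparkContext is always reachable from the session
val sc = spark.sparkContext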
RDDs whose elements have a well-known structure, e.g. RDD[(String, Int, String)] or RDD[Person], can have their schema inferred automatically; otherwise, we describe the schema programmatically and build the Dataframe from an RDD of Rows:
import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Describe the schema of the log file explicitly
val schema = StructType(Array(
  StructField("level", StringType, nullable = true),
  StructField("date", DateType, nullable = true),
  StructField("client_id", IntegerType, nullable = true),
  StructField("stage", StringType, nullable = true),
  StructField("msg", StringType, nullable = true)))

// Parse each log line into a Row that matches the schema
// (assumes the second field is a yyyy-MM-dd date)
val rowRDD = sc.textFile("ghtorrent-log.txt")
  .map(_.split(" "))
  .map(r => Row(r(0), Date.valueOf(r(1)), r(2).toInt, r(3), r(4)))

val logDF = spark.createDataFrame(rowRDD, schema)
or, if the RDD contains case classes (e.g. RDD[Person]), we can let Spark infer the schema by reflection, as sketched below.
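A minimal sketch, assuming a simple Person case class (the fields are illustrative):
import spark.implicits._

case class Person(id: Int, name: String, city: String)

val personRDD = sc.parallelize(Seq(Person(1, "Alice", "Delft")))

// toDF derives the schema from the case class fields via reflection
val personDF = personRDD.toDF()
personDF.printSchema()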
Both RDDs and Datasets are typed collections of objects (RDD[T] vs Dataset[T]). The main difference is that Datasets use special Encoders to convert the data to compact internal formats that Spark can use directly when applying operations such as map or filter.
The internal format is very efficient; it is not uncommon for data cached in memory to take up less space than the same data on disk.
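A small sketch of a typed Dataset; the encoder for the (illustrative) case class is picked up implicitly:
import spark.implicits._

case class LogEntry(level: String, msg: String)

// toDS uses an implicit Encoder[LogEntry] to store the objects in Spark's internal format
val logs = Seq(LogEntry("INFO", "api_client"), LogEntry("DEBUG", "retriever")).toDS()

// Typed operations: the lambdas see LogEntry objects, not Rows
logs.filter(_.level == "INFO").map(_.msg).show()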
Columns in DataFrames/Datasets can be accessed by name. Columns are defined by expressions: the API overrides language operators so that they return expression objects rather than evaluating immediately. A comparison written with an operator such as > is therefore syntactic sugar for a method call that builds a Column expression.
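A minimal sketch of this equivalence, assuming a Dataframe df with an integer column team_size:
// Operator form: > is overloaded on Column and returns a new Column expression
val cond1 = df("team_size") > 5

// Method form: the same expression, built with the explicit Column API
val cond2 = df("team_size").gt(5)

// Either form can be passed to filter/where
df.filter(cond1).show()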
DataFrames/Datasets include all typical relational algebra operations: projection (select), selection (filter), joins and aggregations (groupBy/agg).
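For instance, a sketch of projection and selection on the logDF Dataframe built earlier:
// Projection: keep only two of the columns
val projected = logDF.select("level", "msg")

// Selection: keep only the rows that satisfy a predicate
val selected = logDF.filter(logDF("level") === "WARN")
selected.show(5)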
Dataframes can be joined irrespective of the underlying implementation, as long as they share a key.
val people = spark.read.csv("people.csv")
// The table name and connection properties are illustrative
val department = spark.read.jdbc("jdbc:mysql://company/departments",
                                 "departments", new java.util.Properties())

people.filter(people("age") > 30)
  .join(department, people("deptId") === department("id"))
All types of joins are supported: inner (the default), left/right/full outer, left semi and left anti, as well as cross joins.
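For example, the join type can be passed as a third argument (a sketch, reusing the Dataframes above):
// Keep all people, even those whose deptId has no match in department
people.join(department, people("deptId") === department("id"), "left_outer")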
When groupBy is called on a Dataframe, it is (conceptually) split into a key/value structure, where the keys are the distinct values of the column(s) grouped upon and the values are the rows containing each key value.
As in SQL, we can only apply aggregate functions on grouped Dataframes.
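A minimal sketch, reusing the people Dataframe from above (the aggregated columns are illustrative):
import org.apache.spark.sql.functions._

// Average age and head count per department
people.groupBy("deptId")
  .agg(avg("age"), count("*"))
  .show()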
Register the DataFrame as a temporary SQL view
Use SQL syntax to operate on the data
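A brief sketch of both steps, using the logDF Dataframe from earlier:
// 1. Register the Dataframe as a temporary SQL view
logDF.createOrReplaceTempView("logs")

// 2. Query it with plain SQL; the result is again a Dataframe
val warnings = spark.sql("SELECT client_id, msg FROM logs WHERE level = 'WARN'")
warnings.show()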
Why is SparkSQL so fast? Because it uses optimization and code generation. The key to both features is that code passed to higher-order functions (e.g. the predicate given to filter) is syntactic sugar that generates expression trees.
For example, a filter predicate written against Columns is converted to an expression object; this corresponds to an Abstract Syntax Tree (AST).
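A hedged sketch of what such a tree looks like; the column names and the exact Catalyst node names are illustrative:
// The predicate below ...
df.filter(df("team_size") > df("min_size") + 1)

// ... is (roughly) represented as a tree of Catalyst expression nodes:
//   GreaterThan(
//     Attribute("team_size"),
//     Add(Attribute("min_size"), Literal(1)))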
The optimizer uses tree patterns to simplify the AST: a subtree that matches a known pattern is rewritten into a cheaper, equivalent one.
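One hedged example of such a rewrite is constant folding (the tree notation is illustrative):
// Before: the right-hand side of the comparison consists only of literals
//   GreaterThan(Attribute("team_size"), Add(Literal(3), Literal(4)))

// After constant folding, the literal subtree is evaluated once at planning time:
//   GreaterThan(Attribute("team_size"), Literal(7))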
Catalyst knows tens (hundreds?) of such optimizations. The purpose is to minimize the work done when the query is evaluated on real data.
Catalyst performs the following steps: analysis, logical optimization, physical planning and code generation.
During analysis it resolves references against the schema: for example, it maps UnresolvedAttribute("team_size") to Attribute("team_size") of type Int (and would also check whether team_size exists in df).
The end result is native code that runs in optimal locations on top of an RDD.
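We can inspect what Catalyst produced for any query with explain; a minimal sketch on the logDF Dataframe from earlier:
// extended = true prints the parsed, analysed, optimised and physical plans
logDF.filter(logDF("level") === "WARN").explain(true)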
This work is (c) 2017, 2018, 2019, 2020, 2021 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.