In this section, we review the basic data types we use when processing data.
D: What types of data are more convenient when processing?
Sequences or Lists or Arrays represent consecutive items in memory
In Scala
Basic properties:
l(3)
Sets store values, without any particular order, and no repeated values.
Basic properties:
Maps, Dictionaries or Associative Arrays are collections of (k,v) pairs in which each key k appears only once.
Some languages have built-in support for Dictionaries
scala> val m = Map(("a" -> 1), ("b" -> 2))
val m: scala.collection.immutable.Map[String,Int] = Map(a -> 1, b -> 2)
Basic properties:
A graph data structure consists of a finite set of vertices or nodes, together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph.
Graphs are usually represented as Map[Node, List[Edge]], where each Node maps to the list of its Edges.
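As a minimal sketch of this adjacency-map representation: the Node and Edge case classes, the sample graph and the neighbours helper below are illustrative assumptions, not definitions taken from the text.

```scala
object GraphSketch {
  // Hypothetical vertex and edge types, chosen only for illustration
  case class Node(id: Int)
  case class Edge(from: Node, to: Node)

  val a = Node(1)
  val b = Node(2)
  val c = Node(3)

  // Directed graph: each node maps to the list of its outgoing edges
  val graph: Map[Node, List[Edge]] = Map(
    a -> List(Edge(a, b), Edge(a, c)),
    b -> List(Edge(b, c)),
    c -> Nil
  )

  // Nodes directly reachable from n
  def neighbours(n: Node): List[Node] =
    graph.getOrElse(n, Nil).map(_.to)
}
```

For an undirected graph, one option is to store each edge under both of its endpoints.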
Ordered graphs without loops
a = {"id": "5542101946", "type": "PushEvent",
"actor": {
"id": 801183,
"login": "tvansteenburgh"
},
"repo": {
"id": 42362423,
"name": "juju-solutions/review-queue"
}}
If we parse the above JSON in almost any language, we get a series of nested maps
An \(n\)-tuple is a sequence of \(n\) elements, whose types are known.
val record = Tuple4[Int, String, String, Int] (1, "Matt", "Damon", 1970)
// alternatively
val record = (1, "Matt", "Damon", 1970)
Scala makes it easy to declare and use tuples by automatically inferring the types of the tuple contents.
A relation is a Set of \(n\)-tuples \((d_1, d_2, ..., d_n)\) of the same type; one of the tuple elements denotes a key. Keys cannot be repeated.
Relations are very important for data processing, as they form the theoretical framework (Relational Algebra) for relational (SQL) databases.
Typical operations on relations are insert, remove and join. Join allows us to compute new relations by joining existing ones on common fields.
val movie1 = (1, "Martian", 2015, 2)
val movie2 = (2, "Prometheus", 2012, 2)
val movie3 = (3, "2001: Space Odyssey", 1968, 1)
val movies = Set(movie1, movie2, movie3)
val stanley = (1, "Stanley Kubrick", 1928)
val ridley = (2, "Ridley Scott", 1937)
val directors = Set(stanley, ridley)
Q: How can we get the list of movie names together with the name of their directors?
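One possible answer is a join on the director identifier. The sketch below assumes that the fourth element of each movie tuple refers to the first element of a director tuple; the name moviesWithDirectors is made up for the example.

```scala
object MovieJoin {
  val movies = Set(
    (1, "Martian", 2015, 2),
    (2, "Prometheus", 2012, 2),
    (3, "2001: Space Odyssey", 1968, 1)
  )
  val directors = Set(
    (1, "Stanley Kubrick", 1928),
    (2, "Ridley Scott", 1937)
  )

  // Join on movie._4 == director._1 and keep only the two names
  val moviesWithDirectors: Set[(String, String)] =
    for {
      m <- movies
      d <- directors
      if m._4 == d._1
    } yield (m._2, d._2)
}
```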
A key/value pair (or K/V) is a more general type of a relation, where each key can appear more than once.
// We assume that the first Tuple element represents the key
val a = (1, ("Martian", 2015))
val b = (1, ("Prometheus", 2012))
val kv = List(a, b)
// type: List[(Int, (String, Int))]
Another way to represent K/V pairs is with a Map
val xs = Map(1 -> List(("Martian", 2015), ("Prometheus", 2012)))
// type: Map[Int, List[(String, Int)]]
K and V are flexible: that’s why the Key/Value abstraction is key to NoSQL databases, including MongoDB, DynamoDB, Redis etc.
In this section, we discuss the basics of functional programming, as those apply to data processing with tools like Hadoop, Spark and Flink.
Functional programming is a programming paradigm where programs are constructed by applying and composing functions. (Wikipedia).
Functional programming characteristics:
\(foo(x: [A], y: B) \rightarrow C\)
We read this as: Function foo takes as arguments an array/list of type A and an argument of type B and returns a value of type C
Q: What does the following function signature mean? \(f(x:[A], y:(z: A) \rightarrow B) \rightarrow [B]\)
A function has a side effect if it modifies some state outside its scope or has an observable interaction with its calling functions or the outside world besides returning a value.
var max = -1
def greaterOrEqual(a: Int, b: Int): Boolean = {
if(a >= b) {
max = a // side effect!
true
} else {
max = b // side effect!
false
}
}
As a general rule, any function that returns nothing (void or Unit) has a side effect!
A pure function depends only on its declared inputs and its internal algorithm to produce its output.
def greaterOrEqual(a: Int, b: Int, max: Int): (Boolean, Int) = {
if(a >= b) (true, a)
else (false, b)
}
Pure functions offer referential transparency.
An expression is said to be referentially transparent if it can be replaced by its value and not change the program’s behavior.
Referential transparency enables simpler reasoning about programs.
Q: Is this function referentially transparent?
How can we write code that does something useful given those restrictions?
Writing loops functionally:
class Cafe {
def buyCoffee(cc: CreditCard): Coffee = {
val cup = new Coffee()
cc.charge(cup.price)
cup
}
}
Can you spot any problems with this code?
charge() performs a side-effect: we need to contact the credit card company!
buyCoffee() is not testable
class Cafe {
def buyCoffee(cc: CreditCard, p: Payments): Coffee = {
val cup = new Coffee()
p.charge(cc, cup.price)
cup
}
}
Slightly better option, but:
Payments has to be an interface
Testing requires a Payments mock
Q: How can we buy 10 coffees?
class Cafe {
def buyCoffee(cc: CreditCard, p: Payments): Coffee = { ... }
def buyCoffees(cc: CreditCard, p: Payments, num: Int) = {
for (i <- 1 to num) {
buyCoffee(cc, p)
}
}
}
Seems to be working, but:
Idea: How about instead of charging in place, we decouple the action of buying coffee from that of charging for it?
class Cafe {
def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
val cup = new Coffee()
(cup, Charge(cc, cup.price))
}
}
case class Charge(cc: CreditCard, amount: Double) { // case class: fields are public, no `new` needed
def combine(other: Charge) = new Charge(cc, amount + other.amount)
def pay = cc.charge(amount)
}
Nice! We can now:
Combine multiple Charges in one
case class Charge(cc: CreditCard, amount: Double) {
def combine(other: Charge) = new Charge(cc, amount + other.amount)
def pay = cc.charge(amount)
}
class Cafe {
def buyCoffee(cc: CreditCard): (Coffee, Charge) = { ... }
def buyCoffees(cc: CreditCard, num: Int): Seq[(Coffee, Charge)] =
(1 to num).map(_ => buyCoffee(cc))
def checkout(cc: CreditCard, charges: Seq[Charge]) : Seq[Charge] = {
charges
.filter(charge => charge.cc == cc)
.foldLeft(new Charge(cc, 0.0)){(acc, x) => acc.combine(x)}
.pay // <- side-effect, but once, in one place.
charges.filter(charge => charge.cc != cc)
}
}
This example was adapted from the (awesome) FP in Scala book, by Chiusano and Bjarnason
Functional data structures are operated on using only pure functions, and they are immutable.
Scala has immutable lists, tuples, maps, and sets.
val oneTwoThree = List(1, 2, 3)
val oneTwoThree_2 = 1 :: 2 :: 3 :: Nil
val one = oneTwoThree.head
val twoThree = oneTwoThree.tail
Scala has both mutable and immutable versions of many common data structures. If in doubt, use immutable.
“Pattern matching” checks a value against a pattern.
It can deconstruct a value into its constituent parts if the match is successful.
Q: What is the result of the following expression?
def incrementAll(ints: List[Int]): List[Int] = ints match {
case Nil => Nil
case x :: xs => x + 1 :: incrementAll(xs)
}
def doubleAll(ints: List[Int]): List[Int] = ints match {
case Nil => Nil
case x :: xs => x * 2 :: doubleAll(xs)
}
We can generalize the pattern using a higher-order function.
A higher-order function is a function that can take a function as an argument or return a function.
In the context of BDP, higher-order functions capture common idioms of processing data as enumerated elements, e.g. going over all elements, selectively removing elements and aggregating them.
Defining a higher-order function, applyToAll:
def applyToAll[A, B](xs: List[A], f: A => B): List[B] = xs match {
case Nil => Nil
case h :: t => f(h) :: applyToAll(t, f)
}
Using applyToAll to define other functions:
def incrementAll2(ints: List[Int]): List[Int] = applyToAll(ints, (x:Int) => x + 1)
def doubleAll2(ints: List[Int]): List[Int] = applyToAll(ints, (x:Int) => x * 2)
Scala collections have a built-in map function:
map(xs: List[A], f: A => B) : List[B]
Applies f to all elements and returns a new list.
flatMap(xs: List[A], f: A => List[B]) : List[B]
Like map, but flattens the result to a single list.
foldL(xs: List[A], f: (B, A) => B, init: B) : B
Takes f of 2 arguments and an init value and combines the elements by applying f on the result of each previous application. AKA reduce.
groupBy(xs: List[A], f: A => K): Map[K, List[A]]
Partitions xs into a map of traversable collections according to a discriminator function.
filter(xs: List[A], f: A => Boolean) : List[A]
Takes a predicate and returns all elements that satisfy it.
scanL(xs: List[A], f: (B, A) => B, init: B) : List[B]
Like foldL, but returns a list of all intermediate results.
zip(xs: List[A], ys: List[B]): List[(A,B)]
Returns an iterable collection formed by iterating over the corresponding items of xs and ys.
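The signatures above are written in a language-neutral style. As a rough illustration, the corresponding methods on Scala's List can be used as follows; note that in Scala they are methods on the collection (some with curried arguments, e.g. foldLeft and scanLeft), and the sample list xs is an assumption made for the example.

```scala
object CollectionOps {
  val xs = List(1, 2, 3, 4)

  val mapped    = xs.map(_ * 2)                // List(2, 4, 6, 8)
  val flattened = xs.flatMap(x => List(x, x))  // List(1, 1, 2, 2, 3, 3, 4, 4)
  val folded    = xs.foldLeft(0)(_ + _)        // 10
  val grouped   = xs.groupBy(_ % 2 == 0)       // false -> List(1, 3), true -> List(2, 4)
  val filtered  = xs.filter(_ > 2)             // List(3, 4)
  val scanned   = xs.scanLeft(0)(_ + _)        // List(0, 1, 3, 6, 10)
  val zipped    = xs.zip(List("a", "b", "c"))  // stops at the shorter list
}
```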
Laziness is an evaluation strategy which delays the evaluation of an expression until its value is needed.
It allows us to separate defining how a value is evaluated from when it is actually evaluated.
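A minimal sketch of this separation, using Scala's lazy val. The names x, y, log and trace are assumptions made for the example; trace both prints a message and records it, so that the evaluation order can be inspected.

```scala
import scala.collection.mutable.ListBuffer

object LazyValDemo {
  val log = ListBuffer.empty[String]

  // Print a message and also record it, so the order can be inspected
  def trace(msg: String): Unit = { println(msg); log += msg; () }

  // Declared first, but the body runs only when x is first used
  lazy val x: Int = { trace("Evaluates x"); 1 }
  // A strict val: the body runs as soon as the definition is reached
  val y: Int = { trace("Evaluates y"); 2 }

  val sum: Int = x + y // first use of x forces its evaluation here
}
```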
Output:
Evaluates y
Evaluates x
// does not print:
val a = (1 to 10).iterator.map(print)
// prints the first 5 items:
val b = (1 to 10).iterator.map(print).take(5).toList
Output:
12345
// Working on a list
val r1 = (1 to 3)
.map(x => {
println("Mapping")
x + 1
})
.filter( x => {
println("Filtering")
x % 2 == 0
})
.sum
Output:
## Mapping
## Mapping
## Mapping
## Filtering
## Filtering
## Filtering
// Working on a lazy list
val r2 = (1 to 3).to(LazyList)
.map(x => {
println("Mapping")
x + 1
})
.filter( x => {
println("Filtering")
x % 2 == 0
})
.sum
Output:
## Mapping
## Filtering
## Mapping
## Filtering
## Mapping
## Filtering
Scala example:
In tools like Spark and Flink, we always express computations in a lazy manner. This allows for optimizations before the actual computation is executed
# Word count in PySpark
text_file = sc.textFile("words.txt")
counts = text_file \
.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("results.txt")
Q: In the code above, when will Spark compute the result?
A: When saveAsTextFile is called!
Monads are a design pattern that defines how functions can be used together to build generic types.
Practically, a monad is a value-wrapping type that:
* Has an identity function
* Has a flatMap function, that allows data to be transferred between monad types
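To make the two requirements concrete, here is a minimal sketch of a value-wrapping type. Box, unit and result are hypothetical names invented for this example; Box has no effect of its own and only illustrates the shape of the interface.

```scala
object BoxMonad {
  // Hypothetical value-wrapping type, used only for illustration
  case class Box[A](value: A) {
    // flatMap: unwrap the value, apply f, return the Box that f produces
    def flatMap[B](f: A => Box[B]): Box[B] = f(value)
    // map can be derived from flatMap and the identity (unit) function
    def map[B](f: A => B): Box[B] = flatMap(a => Box.unit(f(a)))
  }

  object Box {
    // identity (unit): wrap a plain value
    def unit[A](a: A): Box[A] = Box(a)
  }

  val result: Box[String] =
    Box.unit(20).flatMap(x => Box(x + 1)).map(x => s"value: $x")
}
```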
Monads are the tool FP uses to deal with (side-)effects
Option[T]
Try[T]
Future[T]
Exceptions break referential transparency.
def failingFn(i: Int): Int = {
val x: Int = throw new Exception("fail!")
try {
x + 1
} catch { case e: Exception => -1 }
}
Exceptions are not type-safe and lead to complicated control-flow.
def mean(xs: Seq[Double]): Double =
if (xs.isEmpty)
throw new ArithmeticException("mean of empty list!")
else xs.sum / xs.length
Q: How to handle exceptions?
Option wraps a value that may be null or undefined.
Option[A] can have 2 instances:
* Some[A], where A represents the type of the value
* None, if the value is undefined
flatMap
flatMap enables us to join sequences of arbitrary types.
Options that wrap undefined values
def getArgument(k: String): Option[Arg]
def processArgument(a: Arg): Option[Result]
getArgument("foo")
.flatMap(processArgument)
.getOrElse(new Result("default"))
Futures that wrap values that will be eventually available
Try wraps a computation that may either result in an exception, or return a successfully computed value.
Try is a type that can have 2 instances:
* Success[T], where T represents the type of the result
* Failure[E], where E represents the type of error, usually an exception
import scala.util.Try
object Converter extends App {
def toInt(a: String): Try[Int] = Try{Integer.parseInt(a)}
def toString(b: Int): Try[String] = Try{b.toString}
val a = toInt("4").flatMap(x => toString(x))
println(a)
val b = toInt("foo").flatMap(x => toString(x))
println(b)
}
## Success(4)
## Failure(java.lang.NumberFormatException: For input string: "foo")
public class Converter {
public static Integer toInt(String a) throws NumberFormatException {
return Integer.parseInt(a);
}
public static String toString(Integer a) throws NullPointerException {
return a.toString();
}
public static void main(String[] args) {
try {
Integer five = toInt("5");
try {
String str = toString(five);
} catch(NullPointerException e) {
//baaah
}
} catch(NumberFormatException r) {
//ooofff
}
}
}
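import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // needed by flatMap on Future

object Amazon {
  def login(login: String, passwd: String): Future[Amazon] = ??? // implementation elided
}
class Amazon(val user: String) {
  def search(term: String): Future[Seq[String]] = ??? // implementation elided
}
object Main extends App {
  val result = Amazon.login("uname", "passwd") // Might fail
    .flatMap(a => a.search("foo"))
  val resultVal = result.value
}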
Q: What would be the type of result?
Q: What would be the type of resultVal?
Future[Seq[String]]
Try[Seq[String]]
Option[Try[Seq[String]]]
Before starting to process datasets, we need to be able to go over their contents in a systematic way. The process of visiting all items in a dataset is called traversal.
In a big data system:
There are two fundamental techniques for the client to process all available data in the data source
Iteration: The client asks the data source whether there are items left and then pulls the next item.
Observation: The data source pushes the next available item to a client end point.
D: What are the relative merits of each technique?
In the context of BDP, iteration allows us to process finite-sized data sets without loading them in memory at once.
Typical usage
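A minimal sketch of the pull-based loop, using Scala's Iterator; the helper sumAll and the sample range are assumptions made for the example.

```scala
object IteratorUsage {
  // Pull items one by one; only the current item needs to be in memory
  def sumAll(it: Iterator[Int]): Long = {
    var total = 0L
    while (it.hasNext) {   // ask the source whether items are left
      total += it.next()   // pull the next item
    }
    total
  }

  // The range is generated on demand, never materialised as a list
  val total: Long = sumAll(Iterator.range(1, 1001))
}
```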
The Iterator pattern is supported in most mainstream programming languages.
Reading from a file, first in Scala
Observation allows us to process (almost) unbounded size data sets, where the data source controls the processing rate
// Consumer
trait Observer[A] {
def onNext(a: A): Unit
def onError(t: Throwable): Unit
def onComplete(): Unit
}
// Producer
trait Observable[A] {
def subscribe(obs: Observer[A]): Unit
}
Example usage
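A minimal sketch of how the two traits might be used together. ListObservable, recorder and events are hypothetical names; for simplicity this producer pushes its items synchronously from a fixed list, whereas a real data source would typically push them asynchronously as they become available.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object ObserverUsage {
  trait Observer[A] {
    def onNext(a: A): Unit
    def onError(t: Throwable): Unit
    def onComplete(): Unit
  }
  trait Observable[A] {
    def subscribe(obs: Observer[A]): Unit
  }

  // Hypothetical producer that pushes the items of a fixed list
  class ListObservable[A](items: List[A]) extends Observable[A] {
    def subscribe(obs: Observer[A]): Unit =
      try {
        items.foreach(obs.onNext) // the producer decides when data arrives
        obs.onComplete()
      } catch {
        case NonFatal(t) => obs.onError(t)
      }
  }

  // Consumer that records everything pushed to it
  val events = ListBuffer.empty[String]
  val recorder = new Observer[Int] {
    def onNext(a: Int): Unit = { events += s"next $a"; () }
    def onError(t: Throwable): Unit = { events += s"error ${t.getMessage}"; () }
    def onComplete(): Unit = { events += "complete"; () }
  }

  new ListObservable(List(1, 2, 3)).subscribe(recorder)
}
```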
Iterator-based map. “Pulls” data out of array.
Observation-based (reactive) map. Data is “pushed” to it asynchronously, when new data is available.
D: (How) Can we convert between the two types of enumeration?
We apply a strategy to visit all individual items in a collection.
In case of nested data types (e.g. trees/graphs), we need to decide how to traverse. Common strategies include:
In most programming environments, traversal is implemented by iterators.
case class Tree(value: String, children: List[Tree])
def dfs(tree: Tree): Iterator[String] = tree match {
case Tree(value, Nil) => Iterator(value)
case Tree(value, children) => children.iterator.flatMap(c => dfs(c)) ++ Iterator(value)
}
Then, we can iterate using standard language constructs.
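For instance, the iterator returned by dfs can be consumed with a for loop or converted to a list. The sample tree, printAll and visited below are assumptions made for the example; Tree and dfs are repeated so that the snippet is self-contained.

```scala
object DfsUsage {
  case class Tree(value: String, children: List[Tree])

  def dfs(tree: Tree): Iterator[String] = tree match {
    case Tree(value, Nil)      => Iterator(value)
    case Tree(value, children) => children.iterator.flatMap(c => dfs(c)) ++ Iterator(value)
  }

  // Hypothetical example tree
  val tree = Tree("root", List(
    Tree("a", List(Tree("a1", Nil), Tree("a2", Nil))),
    Tree("b", Nil)
  ))

  // A for loop works on any Iterator
  def printAll(): Unit = for (v <- dfs(tree)) println(v)

  // Children are visited before their parent (post-order)
  val visited: List[String] = dfs(tree).toList
}
```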
Operations are transformations, aggregations or cross-referencing of data stored in data types. All container data types can be iterated.
We generally have two types of operations:
Element-wise ops apply a function to each individual message (e.g. map or flatMap).
Aggregations group multiple events together and apply a reduction (e.g. fold or max) on them.
Suppose we have a list of people; each person has a unique identifier, an age, a height in cm, a weight and a gender.
Now, let’s create 1000 random people!
import scala.util.Random
val genders = List("Male", "Female", "Other")
val rand = new Random(42)
val people = (1 to 1000).map(i => Person(i,
age = rand.between(10, 80),
height = rand.between(60, 200),
weight = rand.between(40, 120),
gender = genders(rand.between(0, 3))))
.toList
## Person(1000,50,102,58,Female)
To convert values, we traverse a collection and apply a conversion function to each individual element. This is generalized to the \(map\) function:
\(map(xs: [A], f: (A) \rightarrow B) \rightarrow [B]\)
Let’s convert the persons’ heights to meters
def toMeters(person: Person): Person = person.copy(height = person.height * 1.0 / 100)
val peopleInMeters = map(people, p => toMeters(p))
or using a lambda function:
Note: Scala List has built-in implementations for most of the functions we discuss in this lecture.
Projection allows us to select parts of a Tuple, Relation or other nested data type for further processing. To implement this, we need to iterate over all items of a collection and apply a conversion function.
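A short sketch of projection over a list of tuples, using the built-in map. The movies relation and the field meanings (id, title, year) are assumptions made for the example.

```scala
object ProjectionSketch {
  // A small relation of (id, title, year) tuples
  val movies = List((1, "Martian", 2015), (2, "Prometheus", 2012))

  // Project each tuple to its (title, year) fields
  val titlesAndYears: List[(String, Int)] =
    movies.map { case (_, title, year) => (title, year) }

  // Project to a single field
  val titles: List[String] = movies.map(_._2)
}
```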
To filter values from a list, we traverse a collection and apply a predicate function to each individual element.
\(filter(xs: [A], f: (A) \rightarrow Boolean): [A]\)
Q: How can we implement filter?
def filter(xs: List[Person], pred: Person => Boolean): List[Person] = xs match {
case Nil => Nil
case h :: t if pred(h) => h :: filter(t, pred)
case h :: t => filter(t, pred)
}
Filter on a generic class:
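One way to generalize the Person-specific filter above is to add a type parameter. This is a sketch; the sample lists are assumptions made for the example.

```scala
object FilterSketch {
  // Same recursion as before, but for any element type A
  def filter[A](xs: List[A], pred: A => Boolean): List[A] = xs match {
    case Nil               => Nil
    case h :: t if pred(h) => h :: filter(t, pred)
    case _ :: t            => filter(t, pred)
  }

  val evens = filter(List(1, 2, 3, 4, 5, 6), (x: Int) => x % 2 == 0)
  val longWords = filter(List("map", "filter", "fold"), (s: String) => s.length > 3)
}
```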
Aggregations apply a combining operator on a traversable sequence to aggregate the individual items into a single result.
Aggregation is implemented using reduction or folding. Two variants exist: left reduction (or folding) and right reduction (or folding).
Our task is to calculate the total weight for our people list. To do so, we need to iterate over the list and add the individual weights. First, we do it with an imperative approach:
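A sketch of the imperative version. To keep it self-contained, it assumes a reduced Person with only an identifier and a weight, plus three hand-written sample people instead of the random list above.

```scala
object ImperativeSum {
  // Hypothetical, reduced Person with only the fields needed here
  case class Person(id: Int, weight: Int)
  val people = List(Person(1, 70), Person(2, 82), Person(3, 55))

  def totalWeight(ps: List[Person]): Int = {
    var total = 0         // mutable accumulator
    for (p <- ps) {       // iteration ...
      total += p.weight   // ... interleaved with the reduction step
    }
    total
  }

  val total: Int = totalWeight(people)
}
```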
We notice that iteration and reduction are independent. What if we abstract iteration away?
Folding functions generate a single value from a collection of data.
\(foldLeft(xs: [A], init: B, f: (acc: B, x: A) \rightarrow B) \rightarrow B\)
Left fold takes a function f with 2 arguments and an initial value and applies f on all items, starting from the leftmost. f combines the result of its previous application with the current element.
Then we can calculate the total weight of all people in our collection as follows
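A possible implementation of foldLeft that follows the signature above, applied to the total weight. The reduced Person type and the sample data are assumptions made to keep the sketch self-contained.

```scala
import scala.annotation.tailrec

object FoldLeftSketch {
  // Iteration is captured once, here; callers only supply the reduction f
  @tailrec
  def foldLeft[A, B](xs: List[A], init: B, f: (B, A) => B): B = xs match {
    case Nil    => init
    case h :: t => foldLeft(t, f(init, h), f)
  }

  // Hypothetical, reduced Person with only the fields needed here
  case class Person(id: Int, weight: Int)
  val people = List(Person(1, 70), Person(2, 82), Person(3, 55))

  val totalWeight: Int =
    foldLeft(people, 0, (acc: Int, p: Person) => acc + p.weight)
}
```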
\(foldRight(xs: [A], init: B, f: (x: A, acc: B) \rightarrow B) \rightarrow B\)
Right fold takes a function f with 2 arguments and an initial value and applies f on all items, starting from the rightmost. f combines the result of its previous application with the current element.
foldRight and foldLeft: differences
To see how foldRight and foldLeft are evaluated, we define a reduction function that accumulates the intermediate steps as a string.
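A sketch of such a function, here applied with Scala's built-in foldLeft and foldRight; the names show, nums, left and right are assumptions made for the example.

```scala
object FoldTrace {
  // Builds a textual trace of the application instead of computing a number
  def show(x: String, y: String): String = s"($x + $y)"

  val nums: List[String] = List(1, 2, 3, 4, 5).map(_.toString)

  val left: String  = nums.foldLeft("0")(show)   // accumulator is the left operand
  val right: String = nums.foldRight("0")(show)  // accumulator is the right operand
}
```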
How does foldLeft work?
## (((((0 + 1) + 2) + 3) + 4) + 5)
How does foldRight work?
## (1 + (2 + (3 + (4 + (5 + 0)))))
Q: Can we always apply foldLeft instead of foldRight?
foldRight != foldLeft
The answer to the previous question is: it depends on whether the reduction operation is commutative and associative.
An op \(\circ\) is commutative iff \(x \circ y = y \circ x\).
An op \(\circ\) is associative iff \(x \circ (y \circ z) = (x \circ y) \circ z\).
We can see that if we choose a non-commutative operator, foldLeft and foldRight produce different results.
val resultLeft = foldLeft(List(1,2,3,4,5), 0, (x:Int, y:Int) => x - y)
val resultRight = foldRight(List(1,2,3,4,5), 0, (x:Int, y:Int) => x - y)
Q: What are the values of resultLeft and resultRight?
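The two values can be checked with the built-in folds. This sketch uses the curried methods of Scala's List rather than the foldLeft and foldRight functions defined in the text; the parenthesised expressions in the comments show how each fold nests.

```scala
object FoldOrder {
  val xs = List(1, 2, 3, 4, 5)

  // ((((0 - 1) - 2) - 3) - 4) - 5
  val resultLeft: Int = xs.foldLeft(0)((acc, x) => acc - x)

  // 1 - (2 - (3 - (4 - (5 - 0))))
  val resultRight: Int = xs.foldRight(0)((x, acc) => x - acc)
}
```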
reduceRight and reduceLeft
Similar to foldRight and foldLeft, reduce functions generate a single value from a list or sequence, starting from the right or the left.
Therefore, reduceRight != reduceLeft
Different from folding, reduce operations:
B must be a supertype of A.
The simplest possible aggregation is counting the number of elements, possibly matching a condition.
\(count(xs: [A], pred: A \rightarrow Boolean): Integer\)
Q: How can we implement this with folding?
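One possible answer, as a sketch: fold with an integer accumulator that is incremented whenever the predicate holds. The sample inputs are assumptions made for the example.

```scala
object CountSketch {
  // The accumulator counts the elements seen so far that satisfy pred
  def count[A](xs: List[A], pred: A => Boolean): Int =
    xs.foldLeft(0)((acc, x) => if (pred(x)) acc + 1 else acc)

  val evens: Int = count(List(1, 2, 3, 4, 5, 6), (x: Int) => x % 2 == 0)
  // Counting all elements is the special case of an always-true predicate
  val all: Int = count(List("a", "b"), (_: String) => true)
}
```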
Given a sequence of items, produce a new sequence with no duplicates.
\(distinct(xs: [A]): [A]\)
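Two common ways to obtain distinct items in Scala, shown as a sketch on a sample list that is an assumption made for the example:

```scala
object DistinctSketch {
  val xs = List(1, 2, 2, 3, 3, 3)

  // distinct keeps the first occurrence of each item, preserving order
  val viaDistinct: List[Int] = xs.distinct
  // converting to a Set also removes duplicates, since a Set cannot hold any
  val viaSet: Set[Int] = xs.toSet
}
```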
## List(1, 2, 3)
## Set(1, 2, 3)
Distinct assumes that items have unique identities (e.g., hashCode() in Java).
Aggregation functions have the following generic signature:
\(f: [A] \rightarrow Number\)
Their job is to reduce sequences of elements to a single measurement. Some examples are:
min, max, count
mean, median, stdev
Grouping splits a sequence of items to groups given a classification function.
\(groupBy(xs:[A], f: A \rightarrow K): Map[K, [A]]\)
def groupBy(xs: List[Int], classifier: Int => String): Map[String, List[Int]] = {
groupByHelper(xs, classifier, Map())
}
private def groupByHelper(xs: List[Int], classifier: Int => String, groups: Map[String, List[Int]]): Map[String, List[Int]] = xs match {
case Nil => groups
case h :: t =>
val key: String = classifier(h)
val value: List[Int] = h :: groups.getOrElse(key, List[Int]())
groupByHelper(t, classifier, groups + ((key, value)))
}
def number_classifier(x: Int): String =
if (x % 2 == 0) "even"
else "odd"
val list = List(1, 2, 3, 4, 5, 6, 7)
print(groupBy(list, number_classifier))
## Map(odd -> List(7, 5, 3, 1), even -> List(6, 4, 2))
How can we get the average height per gender (adults only) in our list of people?
val adults = people.filter(_.age > 18)
val adultsByGender = adults.groupBy(_.gender)
val heightsByGender = adultsByGender.map(x => (x._1, x._2.map(_.height)))
val avgHeightByGender = heightsByGender.map(x => (x._1, x._2.sum / x._2.size))
## HashMap(Other -> 135.0, Female -> 130.0, Male -> 132.0)
Q: What are the types of each variable?
The above is equivalent to the following SQL expression
D: What are the relative strengths and weaknesses of each representation?
KV stores are the most common storage format for distributed databases.
What KV systems enable us to do effectively is processing data locally (e.g. by key) before re-distributing them for further processing. Keys are naturally used to aggregate data before distribution. They also enable (distributed) data joins.
Typical examples of distributed KV stores are Dynamo, MongoDB and Cassandra
The most common data structure in big data processing is key-value pairs.
mapValues: Transform the values part
\(mapValues(kv: [(K,V)], f: V\rightarrow U): [(K,U)]\)
groupByKey: Group the values for each key into a single sequence.
\(groupByKey(kv: [(K,V)]) : [(K, [V])]\)
reduceByKey: Combine all elements mapped by the same key into one
\(reduceByKey(kv: [(K,V)], f: (V,V) \rightarrow V) : [(K, V)]\)
join: Return a sequence containing all pairs of elements with matching keys
\(join(kv1: [(K,V)], kv2: [(K,W)]) : [(K, (V,W))]\)
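A sketch of how these four operations could be written over in-memory Scala lists. It is an illustration under simplifying assumptions, not how a distributed system implements them: groupByKey and reduceByKey return a Map instead of a list of pairs, and join is a naive nested loop.

```scala
object KVOps {
  def mapValues[K, V, U](kv: List[(K, V)], f: V => U): List[(K, U)] =
    kv.map { case (k, v) => (k, f(v)) }

  def groupByKey[K, V](kv: List[(K, V)]): Map[K, List[V]] =
    kv.groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2)) }

  def reduceByKey[K, V](kv: List[(K, V)], f: (V, V) => V): Map[K, V] =
    groupByKey(kv).map { case (k, vs) => (k, vs.reduce(f)) }

  // Naive nested-loop join: pair up values whose keys match
  def join[K, V, W](kv1: List[(K, V)], kv2: List[(K, W)]): List[(K, (V, W))] =
    for {
      (k1, v) <- kv1
      (k2, w) <- kv2
      if k1 == k2
    } yield (k1, (v, w))

  val words = List(("a", 1), ("b", 1), ("a", 1))
  val counts = reduceByKey(words, (x: Int, y: Int) => x + y)
  val joined = join(List((1, "x"), (2, "y")), List((1, true), (3, false)))
}
```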
Suppose we have a dataset of addresses
case class Addr(k: String, street: String, num: Int)
val addr = List(
Addr("EWI", "Mekelweg", 4),
Addr("EWI", "Van Mourik Broekmanweg", 6),
Addr("TPM", "Jafaalaan", 5),
Addr("AE", "Kluyverweg", 1)
)
and a dataset of deans
case class Dean(k: String, name: String, surname: String)
val deans = List(
Dean("EWI", "John", "Schmitz"),
Dean("TPM", "Hans", "Wamelink")
)
Q: Define a method deanAddresses to retrieve the list of addresses of each dean.
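A first attempt could map each dean to the addresses that share the dean's key. This is a sketch; the data definitions are repeated so that the snippet is self-contained.

```scala
object DeanAddresses {
  case class Addr(k: String, street: String, num: Int)
  case class Dean(k: String, name: String, surname: String)

  val addr = List(
    Addr("EWI", "Mekelweg", 4),
    Addr("EWI", "Van Mourik Broekmanweg", 6),
    Addr("TPM", "Jafaalaan", 5),
    Addr("AE", "Kluyverweg", 1)
  )
  val deans = List(
    Dean("EWI", "John", "Schmitz"),
    Dean("TPM", "Hans", "Wamelink")
  )

  // For each dean, collect the addresses with the same key:
  // the result is nested, one list of addresses per dean
  def deanAddresses: List[(Dean, List[Addr])] =
    deans.map(d => (d, addr.filter(a => a.k == d.k)))
}
```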
In practice, we get the following results
// EEEWWWW
List(
(Dean(EWI,John,Schmitz), List(
Addr(EWI,Mekelweg,4),
Addr(EWI,Van Mourik Broekmanweg,6))
),
(Dean(TPM,Hans,Wamelink),List(
Addr(TPM,Jafaalaan,5))
)
)
This is OK, but a more practical result would be a List[(Dean, Addr)]. For this, we need to flatten the internal sequence.
flatMap: Map and flatten in one step
\(flatMap(xs: [A], f: A \rightarrow [B]): [B]\)
flatMap enables us to combine two data collections and return a new collection with flattened values.
def deanAddresses2: List[(Dean, Addr)] =
deans.flatMap(d =>
addr.filter(a => a.k == d.k)
.map(a => (d, a)))
## List((Dean(EWI,John,Schmitz),Addr(EWI,Mekelweg,4)),
## (Dean(EWI,John,Schmitz),Addr(EWI,Van Mourik Broekmanweg,6)),
## (Dean(TPM,Hans,Wamelink),Addr(TPM,Jafaalaan,5)))
In Scala, flatMap is special
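One sense in which flatMap is special in Scala is that for comprehensions are syntactic sugar: the compiler rewrites them into calls to flatMap, map and withFilter. The sketch below shows the two equivalent forms on sample lists that are assumptions made for the example.

```scala
object ForComprehension {
  val xs = List(1, 2)
  val ys = List("a", "b")

  // Explicit flatMap/map chain
  val explicit: List[(Int, String)] =
    xs.flatMap(x => ys.map(y => (x, y)))

  // Equivalent for comprehension; it is rewritten into the chain above
  val sugared: List[(Int, String)] =
    for {
      x <- xs
      y <- ys
    } yield (x, y)
}
```

Any type that defines flatMap and map (for example Option, Try or Future) can be used in a for comprehension in the same way.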
A KV pair is an alternative form of a relation, indexed by a key. We can always convert between the two.
Convert the above relation to a KV pair.
Convert the KV pair back to a relation.
This means that any operation we can do on relations, we can also do on KV pairs.
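A sketch of both conversions on a small relation. The movies relation here is a hypothetical stand-in, and the first tuple element is assumed to be the key.

```scala
object RelationKV {
  // Relation: a set of flat tuples whose first element is the key
  val movies: Set[(Int, String, Int)] = Set(
    (1, "Martian", 2015),
    (2, "Prometheus", 2012)
  )

  // Relation -> KV: split each tuple into (key, remaining fields)
  val kv: Map[Int, (String, Int)] =
    movies.map { case (id, title, year) => (id, (title, year)) }.toMap

  // KV -> relation: merge key and value back into one flat tuple
  val relation: Set[(Int, String, Int)] =
    kv.map { case (id, (title, year)) => (id, title, year) }.toSet
}
```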
One of the key characteristics of data processing is that data is never modified in place. Instead, we apply operations that create new versions of the data, without modifying the original version.
Immutability is a general concept that expands in much of the data processing stack.
Image from Helland in ref [1]
Copy-On-Write is a general technique that allows us to share memory for read-only access across processes and deal with writes only if/when they are performed by copying the modified resource in a private version.
COW is the basis for many operating system mechanisms, such as process creation (forking), while many new filesystems (e.g. BTRFS, ZFS) use it as their storage format.
COW enables systems with multiple readers and few writers to efficiently share resources.
Immutable or persistent data structures always preserve the previous version of themselves when they are modified [2].
With immutable data structures, we can:
They come at a cost of increased memory usage (data is never deleted).
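In practice, persistent structures limit this cost by sharing unchanged parts between versions. A small sketch with Scala's immutable List; the names v1, v2 and sharesTail are assumptions made for the example.

```scala
object StructuralSharing {
  val v1 = List(2, 3)
  val v2 = 1 :: v1 // "modifying" v1 creates a new version

  // v1 itself is unchanged ...
  val v1Unchanged: Boolean = v1 == List(2, 3)
  // ... and v2 reuses v1's cells instead of copying them (reference equality)
  val sharesTail: Boolean = v2.tail eq v1
}
```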
ADT | collection.mutable | collection.immutable |
---|---|---|
Array | ArrayBuffer | Vector |
List | LinkedList | List |
Map | HashMap | HashMap |
Set | HashSet | HashSet |
Queue | SynchronizedQueue | Queue |
Tree | — | TreeSet |
This work is (c) 2017, 2018, 2019, 2020, 2021 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.