Programming for Big Data

Basic Data Types

In this section, we review the basic data types we use when processing data.

Types of data

  • Unstructured: Data whose format is not known
    • Raw text documents
    • HTML pages
  • Semi-Structured: Data with a known format.
    • Pre-parsed data to standard formats: JSON, CSV, XML
  • Structured: Data with known formats, linked together in graphs or tables
    • SQL or Graph databases

D: What types of data are more convenient when processing?

Sequences / Lists

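Sequences (also called Lists or Arrays) represent ordered collections of items. Arrays store their items in consecutive memory locations, while Scala's List is a linked list.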

In Scala

val l = List(1, 2, 3, 4)

Basic properties:

  • Size is bounded by memory
  • Items can be accessed by an index: l(3)
  • New items are typically added at the end (append); for Scala's immutable List, adding at the front (prepend) is cheaper
  • Can be sorted
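A short sketch of these properties on Scala's immutable List (the values are illustrative). An immutable List is never modified in place: appending (`:+`), prepending (`::`) and sorting all return a new list.

```scala
val l = List(1, 2, 3, 4)

// Index-based access (O(n) for a linked List)
val fourth = l(3)

// Adding items returns a new list; l itself is unchanged
val appended = l :+ 5    // List(1, 2, 3, 4, 5)
val prepended = 0 :: l   // List(0, 1, 2, 3, 4)

// Sorting also returns a new list; here with an explicit descending Ordering
val sortedDesc = List(3, 1, 4, 2).sorted(Ordering[Int].reverse)
```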

Sets

Sets store values without any particular order and without repeated values.

scala> val s = Set(1, 2, 3, 4, 4)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 4)

Basic properties:

  • Size is bounded by memory
  • Can be queried for containment
  • Set operations: union, intersection, difference, subset
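A minimal sketch of containment and the set operations listed above, using Scala's immutable Set (the sets `a` and `b` are illustrative):

```scala
val a = Set(1, 2, 3, 4)
val b = Set(3, 4, 5)

val hasThree = a.contains(3)            // containment query
val either = a union b                  // equals Set(1, 2, 3, 4, 5)
val both = a intersect b                // equals Set(3, 4)
val onlyInA = a diff b                  // equals Set(1, 2)
val isSubset = Set(3, 4).subsetOf(a)    // true
```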

Maps or Dictionaries

Maps (also called Dictionaries or Associative Arrays) are collections of (k, v) pairs in which each key k appears only once.

Some languages have built-in support for Dictionaries:

scala> val m = Map(("a" -> 1), ("b" -> 2)) 
val m: scala.collection.immutable.Map[String,Int] = Map(a -> 1, b -> 2)

Basic properties:

  • One key always corresponds to one value.
  • Accessing a value given a key is very fast (\(\approx O(1)\))
Figure: A HashTable
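A small sketch of the typical Map operations on Scala's immutable Map (the keys and values are illustrative). Because each key maps to exactly one value, adding an existing key replaces its value:

```scala
val m = Map("a" -> 1, "b" -> 2)

val a = m("a")                    // direct lookup; throws NoSuchElementException if the key is missing
val c = m.get("c")                // safe lookup: returns an Option (None here)
val cOrZero = m.getOrElse("c", 0) // lookup with a default value

// "Updating" an immutable Map returns a new Map; re-inserting "a" replaces its old value
val m2 = m + ("a" -> 10) + ("c" -> 3)
```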

Nested data types: Graphs

A graph data structure consists of a finite set of vertices or nodes, together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph.

  • Nodes can contain attributes
  • Edges can contain weights and directions

Graphs are usually represented as Map[Node, List[Edge]], where

case class Node[A, B](id: Int, attributes: Map[A, B])
case class Edge[A, B](a: Node[A, B], b: Node[A, B], directed: Option[Boolean],
                      weight: Option[Double])
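A small sketch of this adjacency representation. To keep it short, node attributes are assumed to be Map[String, String] (the text leaves the types open), and the three-node graph and its weights are made up for illustration:

```scala
case class Node(id: Int, attributes: Map[String, String])
case class Edge(a: Node, b: Node, directed: Option[Boolean], weight: Option[Double])

val n1 = Node(1, Map("name" -> "x"))
val n2 = Node(2, Map("name" -> "y"))
val n3 = Node(3, Map("name" -> "z"))

// Adjacency list: every node maps to the list of edges that start at it
val graph: Map[Node, List[Edge]] = Map(
  n1 -> List(Edge(n1, n2, Some(true), Some(1.0)), Edge(n1, n3, Some(true), Some(2.0))),
  n2 -> List(Edge(n2, n3, Some(true), Some(3.0))),
  n3 -> Nil
)

// Neighbours of a node: follow its outgoing edges
def neighbours(n: Node): List[Node] = graph.getOrElse(n, Nil).map(_.b)

// Total weight of a node's outgoing edges (edges without a weight are skipped)
def outWeight(n: Node): Double = graph.getOrElse(n, Nil).flatMap(_.weight).sum
```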

Nested data types: Trees

Connected graphs without cycles (loops); they are typically rooted, so every node except the root has exactly one parent

a = {"id": "5542101946", "type": "PushEvent",
    "actor": {
      "id": 801183,
      "login": "tvansteenburgh"
    },
    "repo": {
      "id": 42362423,
      "name": "juju-solutions/review-queue"
   }}

If we parse the above JSON in almost any language, we get a series of nested maps:

Map("id" -> "5542101946",
    "type" -> "PushEvent",
    "actor" -> Map("id" -> 801183.0, "login" -> "tvansteenburgh"),
    "repo" -> Map("id" -> 4.2362423E7, "name" -> "juju-solutions/review-queue"))

Tuples

An \(n\)-tuple is a sequence of \(n\) elements, whose types are known.

val record = Tuple4[Int, String, String, Int] (1, "Matt", "Damon", 1970)
// alternatively
val record = (1, "Matt", "Damon", 1970)

Scala makes it easy to declare and use tuples by automatically inferring the types of the tuple contents.

val a = (1, ("Foo", 2)) // type: Tuple2[Int, Tuple2[String, Int]]
// alternatively: (Int, (String, Int))

println(a._1) // prints 1
println(a._2._1) // prints Foo

Relations

A relation is a Set of \(n\)-tuples \((d_1, d_2, \ldots, d_n)\) of the same type; one of the tuple elements denotes a key, and keys cannot be repeated.

Relations are very important for data processing, as they form the theoretical framework (Relational Algebra) for relational (SQL) databases.

Typical operations on relations are insert, remove and join. Join allows us to compute new relations by joining existing ones on common fields.

Figure: Relations

Relations example

val movie1 = (1, "Martian", 2015, 2)
val movie2 = (2, "Prometheus", 2012, 2)
val movie3 = (3, "2001: A Space Odyssey", 1968, 1)

val movies = Set(movie1, movie2, movie3)

val stanley = (1, "Stanley Kubrick", 1928)
val ridley = (2, "Ridley Scott", 1937)

val directors = Set(stanley, ridley)

Q: How can we get the list of movie names together with the name of their directors?
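One possible answer, sketched as a nested-loop join: for every movie, find the director whose key equals the movie's last field (assumed here to be the director id), and keep only the two names:

```scala
val movies = Set(
  (1, "Martian", 2015, 2),
  (2, "Prometheus", 2012, 2),
  (3, "2001: A Space Odyssey", 1968, 1))
val directors = Set((1, "Stanley Kubrick", 1928), (2, "Ridley Scott", 1937))

// Nested-loop join on movie.directorId == director.id, projecting (title, director name)
val moviesWithDirectors: Set[(String, String)] = for {
  (_, title, _, directorId) <- movies
  (id, name, _) <- directors
  if directorId == id
} yield (title, name)
```

This compares every movie with every director; the join operation discussed later in these notes generalizes the same idea.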

Key/Value pairs

A collection of key/value (K/V) pairs is a more general form of a relation, in which each key can appear more than once.

// We assume that the first Tuple element represents the key
val a = (1, ("Martian", 2015))
val b = (1, ("Prometheus", 2012))

val kv = List(a, b)
// type: List[(Int, (String, Int))]

Another way to represent K/V pairs is with a Map

val xs = Map(1 -> List(("Martian", 2015), ("Prometheus", 2012)))
// type: Map[Int, List[(String, Int)]]
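Converting between the two representations is a one-liner in Scala 2.13 or later: `groupMap` groups the pairs by key and keeps only the value part, and a `flatMap` turns the Map back into pairs. A sketch on the same data:

```scala
val kv: List[(Int, (String, Int))] = List((1, ("Martian", 2015)), (1, ("Prometheus", 2012)))

// Group by the key (first element) and keep the values (second element)
val asMap: Map[Int, List[(String, Int)]] = kv.groupMap(_._1)(_._2)

// And back: emit one (key, value) pair per value
val backToPairs: List[(Int, (String, Int))] =
  asMap.toList.flatMap { case (k, vs) => vs.map(v => (k, v)) }
```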

K and V can be of any type: that’s why the Key/Value abstraction is central to NoSQL databases such as MongoDB, DynamoDB and Redis.

Functional Programming in a Nutshell

In this section, we discuss the basics of functional programming, as they apply to data processing with tools like Hadoop, Spark and Flink.

Functional programming

Functional programming is a programming paradigm where programs are constructed by applying and composing functions. (Wikipedia).

Functional programming characteristics:

  • Absence of side-effects: A function, given an argument, always returns the same result, regardless of its environment and without modifying it.
  • Immutable data structures: Side-effect free functions operate on immutable data.
  • Higher-order functions: Functions can take functions as arguments to parametrize their behavior
  • Laziness: The art of waiting to compute till you can wait no more

Function signatures

\(foo(x: [A], y: B) \rightarrow C\)

  • \(foo\): function name
  • \(x\) and \(y\): Names of function arguments
  • \([A]\) and \(B\): Types of function arguments.
  • \(\rightarrow\): Denotes the return type
  • \(C\): Type of the returned result
  • \([A]\): Denotes that type \(A\) can be traversed

We read this as: Function foo takes as arguments an array/list of type A and an argument of type B, and returns a value of type C.

Q: What does the following function signature mean? \(f(x:[A], y:(z: A) \rightarrow B) \rightarrow [B]\)

Side effects

A function has a side effect if it modifies some state outside its scope or has an observable interaction with its calling functions or the outside world besides returning a value.

var max = -1

def greaterOrEqual(a: Int, b: Int): Boolean = {
  if (a >= b) {
    max = a // side effect!
    true
  } else {
    max = b // side effect!
    false
  }
}

As a general rule, any function that returns nothing (void or Unit) must perform a side effect; otherwise, calling it would have no observable result!

Pure functions

A pure function depends only on its declared inputs and its internal algorithm to produce its output.

  • It does not read any other values
  • It does not have any side effects
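
// Returns the comparison result together with the new maximum,
// instead of writing the maximum to a global variable
def greaterOrEqual(a: Int, b: Int): (Boolean, Int) = {
  if (a >= b) (true, a)
  else (false, b)
}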

Pure functions offer referential transparency.

Referential transparency

An expression is said to be referentially transparent if it can be replaced by its value and not change the program’s behavior.

Referential transparency enables simpler reasoning about programs.

var base = 10

def plus(a: Int): Int = {
  base + a
}

Q: Is this function referentially transparent?
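One way to check: evaluate the same expression before and after some other code changes `base`. The sketch re-declares the definitions above; `plusPure` is an illustrative name for a referentially transparent alternative.

```scala
var base = 10

def plus(a: Int): Int = base + a

val before = plus(1)   // 11
base = 20              // some other code changes the global variable
val after = plus(1)    // 21: same expression, different value

// Passing base explicitly makes the result depend only on the arguments
def plusPure(base: Int, a: Int): Int = base + a
```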

Examples of side effects

  • Modifying a variable
  • Modifying a data structure in place: In FP, data structures are always persistent.
  • Setting a field on an object: OO is not FP!
  • Throwing an exception or halting with an error: In FP, we use types that encapsulate and propagate erroneous behavior
  • Printing to the console, reading user input, or reading and writing files: In FP, we encapsulate external resources into Monads.

How can we write code that does something useful given those restrictions?

From IP to FP

Writing loops functionally:

  • We use recursive functions instead of loops with mutable iterator variables
  • Tail-recursive functions are optimized (compiled into loops) by the Scala compiler
  def factorial(n: Int): Int = {
    @annotation.tailrec
    def factorialHelper(n: Int, acc: Int): Int =
      if (n <= 0) acc
      else factorialHelper(n-1, n*acc)

    factorialHelper(n, 1)
  }
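For comparison, a sketch of the same transformation on a simpler task, summing a list: first imperatively with mutable variables, then with a tail-recursive helper whose arguments carry the loop state. Because the recursive call is in tail position, even long lists do not overflow the stack.

```scala
import scala.annotation.tailrec

// Imperative: a mutable accumulator and a mutable "iterator" position
def sumLoop(xs: List[Int]): Int = {
  var acc = 0
  var rest = xs
  while (rest.nonEmpty) {
    acc += rest.head
    rest = rest.tail
  }
  acc
}

// Functional: the loop state (rest, acc) becomes the arguments of a tail call
def sumRec(xs: List[Int]): Int = {
  @tailrec
  def go(rest: List[Int], acc: Int): Int = rest match {
    case Nil    => acc
    case h :: t => go(t, acc + h)
  }
  go(xs, 0)
}
```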

From OO to FP

Buying coffee in OO

class Cafe {
  def buyCoffee(cc: CreditCard): Coffee = {
    val cup = new Coffee()
    cc.charge(cup.price)
    cup
  }
}

Can you spot any problems with this code?

  • charge() performs a side-effect: we need to contact the credit card company!
  • buyCoffee() is hard to test: we cannot call it without actually charging a credit card

Breaking external dependencies

class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = {
    val cup = new Coffee()
    p.charge(cc, cup.price)
    cup
  }
}

Slightly better option, but:

  • Payments has to be an interface
  • We still need to perform side effects
  • We need to inspect state within the Payments mock

Q: How can we buy 10 coffees?

Buying 10 coffees

class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = { ... }

  def buyCoffees(cc: CreditCard, p: Payments, num: Int) = {
    for (i <- 1 to num) {
      buyCoffee(cc, p)
    }
  }
}

Seems to be working, but:

  • No way to batch payments
  • No way to batch checkouts

Removing side effects

Idea: How about instead of charging in place, we decouple the action of buying coffee from that of charging for it?

class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
    val cup = new Coffee()
    (cup, Charge(cc, cup.price))
  }
}

case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}

Nice! We can now:

  • Test
  • Combine multiple Charges in one
  • Maintain in-flight accounts for all customers

Buying 10 functional coffees

case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}

class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = { ...   }
  def buyCoffees(cc: CreditCard, num: Int): Seq[(Coffee, Charge)] =
    (1 to num).map(_ => buyCoffee(cc))

  def checkout(cc: CreditCard, charges: Seq[Charge]) : Seq[Charge] = {
    charges
      .filter(charge => charge.cc == cc)
      .foldLeft(new Charge(cc, 0.0)){(acc, x) => acc.combine(x)}
      .pay  // <- side-effect, but once, in one place.

    charges.filter(charge => charge.cc != cc)
  }
}

This example was adapted from the (awesome) book Functional Programming in Scala, by Chiusano and Bjarnason.
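A self-contained sketch of the functional version, with `CreditCard` and `Coffee` stubbed out as hypothetical case classes and `pay` omitted so that everything stays pure. The `require` in `combine` is an added design choice (charges to different cards must not be merged), and `coalesce` is similar in spirit to the helper of the same name in that book: it merges all charges per card with `groupBy` and `reduce`.

```scala
// Hypothetical stand-ins for the classes used above
case class CreditCard(number: String)
case class Coffee(price: Double = 2.5)

case class Charge(cc: CreditCard, amount: Double) {
  // Only charges to the same card can be merged
  def combine(other: Charge): Charge = {
    require(cc == other.cc, "Cannot combine charges to different cards")
    Charge(cc, amount + other.amount)
  }
}

def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
  val cup = Coffee()
  (cup, Charge(cc, cup.price))
}

// Buys num >= 1 coffees and returns them together with ONE combined charge
def buyCoffees(cc: CreditCard, num: Int): (List[Coffee], Charge) = {
  require(num > 0, "must buy at least one coffee")
  val (coffees, charges) = List.fill(num)(buyCoffee(cc)).unzip
  (coffees, charges.reduce(_ combine _))
}

// Merge a list of charges into at most one charge per card
def coalesce(charges: List[Charge]): List[Charge] =
  charges.groupBy(_.cc).values.map(_.reduce(_ combine _)).toList
```

Nothing here talks to a payment provider, so buying and batching can be tested directly; the side effect of paying happens once, elsewhere.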

Immutable data structures

Functional data structures are immutable and are operated on only by pure functions.

Scala has immutable lists, tuples, maps, and sets.

  val oneTwoThree = List(1, 2, 3)
  val oneTwoThree_2 = 1 :: 2 :: 3 :: Nil
  val one = oneTwoThree.head
  val twoThree = oneTwoThree.tail

Scala has both mutable and immutable versions of many common data structures. If in doubt, use immutable.
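Immutability does not mean copying everything. A small sketch: prepending to an immutable List builds a new list that reuses the old one as its tail (structural sharing), while the original stays unchanged.

```scala
val xs = List(2, 3)

// "Adding" an element builds a new list; xs is left untouched
val ys = 1 :: xs

// The new list points to xs as its tail instead of copying it
val sharesTail = ys.tail eq xs   // reference equality: true
```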

Pattern matching on data structures

“Pattern matching” checks a value against a pattern.

It can deconstruct a value into its constituent parts if the match is successful.

  def length(ints: List[Int]): Int = ints match {
    case Nil => 0
    case x :: xs => 1 + length(xs)
  }

Q: What is the result of the following expression?

  val x = List(1,2,3,4,5) match {
    case x :: 2 :: 4 :: xs => x
    case Nil => 42
    case x :: y :: 3 :: 4 :: xs => x + y
    case h :: t => h
    case _ => 404
  }

Recursion on data structures

  def incrementAll(ints: List[Int]): List[Int] = ints match {
    case Nil => Nil
    case x :: xs => x + 1 :: incrementAll(xs)
  }

  def doubleAll(ints: List[Int]): List[Int] = ints match {
    case Nil => Nil
    case x :: xs => x * 2 :: doubleAll(xs)
  }

We can generalize the pattern using a higher-order function.

Higher-order functions

A higher-order function is a function that can take a function as an argument or return a function.

  // Return elements that satisfy f
  def filter[A](xs: List[A], f: A => Boolean): List[A]

In the context of big data processing (BDP), higher-order functions capture common idioms of processing data as enumerated elements, e.g., going over all elements, selectively removing elements, and aggregating them.

Defining and using higher-order functions

Defining a higher order function, applyToAll:

  def applyToAll[A, B](xs: List[A], f: A => B): List[B] = xs match {
    case Nil => Nil
    case h :: t => f(h) :: applyToAll(t, f)
  }

Using applyToAll to define other functions:

  def incrementAll2(ints: List[Int]): List[Int] = applyToAll(ints, (x:Int) => x + 1)
  def doubleAll2(ints: List[Int]): List[Int] = applyToAll(ints, (x:Int) => x * 2)

Scala collections have a built-in map function:

  def incrementAll3(ints: List[Int]): List[Int] = ints.map((x:Int) => x + 1)
  def doubleAll3(ints: List[Int]): List[Int] = ints.map((x:Int) => x * 2)

Important higher-order functions

map(xs: List[A], f: A => B) : List[B]

Applies f to all elements and returns a new list.

flatMap(xs: List[A], f: A => List[B]) : List[B]

Like map, but flattens the result to a single list.

foldL(xs: List[A], f: (B, A) => B, init: B) : B

Takes a function f of 2 arguments and an init value, and combines the elements by applying f to the result of the previous application and the next element. Also known as reduce.
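The same three functions exist as methods on Scala's List. Note that the built-in foldLeft takes the initial value and the function in two separate parameter lists, `foldLeft(init)(f)`. A sketch on two illustrative strings:

```scala
val words = List("big data", "functional programming")

// map: exactly one output per input element
val lengths = words.map(w => w.length)                  // List(8, 22)

// flatMap: each element produces a list; the lists are concatenated
val tokens = words.flatMap(w => w.split(" ").toList)    // List(big, data, functional, programming)

// foldLeft: combine elements from left to right, starting from init
val totalChars = tokens.foldLeft(0)((acc, t) => acc + t.length)
```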

Aux higher-order functions

groupBy(xs: List[A], f: A => K): Map[K, List[A]]

Partitions xs into a map of traversable collections according to a discriminator function.

filter(xs: List[A], f: A => Boolean) : List[A]

Takes a predicate and returns all elements that satisfy it

scanL(xs: List[A], f: (B, A) => B, init: B) : List[B]

Like foldL, but returns a list of all intermediate results

zip(xs: List[A], ys: List[B]): List[(A,B)]

Returns a collection of pairs formed by combining the corresponding items of xs and ys (stopping at the end of the shorter one).
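A sketch of the auxiliary functions using their built-in counterparts on Scala's List (scanLeft is the library name for scanL):

```scala
val xs = List(1, 2, 3, 4, 5)

// groupBy: the discriminator's result becomes the key
val byParity = xs.groupBy(x => if (x % 2 == 0) "even" else "odd")
// equals Map("odd" -> List(1, 3, 5), "even" -> List(2, 4))

val evens = xs.filter(_ % 2 == 0)            // List(2, 4)

// scanLeft keeps every intermediate accumulator, including init
val runningSums = xs.scanLeft(0)(_ + _)      // List(0, 1, 3, 6, 10, 15)

// zip stops at the end of the shorter list
val pairs = xs.zip(List("a", "b", "c"))      // List((1,a), (2,b), (3,c))
```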

Laziness

Laziness is an evaluation strategy which delays the evaluation of an expression until its value is needed.

Laziness allows us to separate defining how a value is computed from when it is actually computed.

lazy val x = { println("Evaluates x"); 1 }
val y = { println("Evaluates y"); 2 }
x + y

Output:

Evaluates y
Evaluates x

// does not print:
val a = (1 to 10).iterator.map(print)

// prints the first 5 items:
val b = (1 to 10).iterator.map(print).take(5).toList

Output:

12345

Lazy evaluation is useful for:

  • Separating a pipeline construction from its evaluation
  • Not requiring datasets to be loaded entirely into memory: we can process them in lazily loaded batches
  • Generating infinite collections
  • Optimizing execution plans
  // Working on a list
  val r1 = (1 to 3)
    .map(x => {
      println("Mapping")
      x + 1
    })
    .filter( x => {
      println("Filtering")
      x % 2 == 0
    })
    .sum
Output:
## Mapping
## Mapping
## Mapping
## Filtering
## Filtering
## Filtering
  // Working on a lazy list
  val r2 = (1 to 3).to(LazyList)
    .map(x => {
      println("Mapping")
      x + 1
    })
    .filter( x => {
      println("Filtering")
      x % 2 == 0
    })
    .sum
Output:
## Mapping
## Filtering
## Mapping
## Filtering
## Mapping
## Filtering

Generating infinite streams:

Scala example:

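// lazy val (rather than def) so that fibs refers to one shared stream
// and every element is computed at most once
lazy val fibs: LazyList[Int] = {
  0 #:: 1 #:: fibs.zip(fibs.tail).map{ n =>
    println("Adding " + n._1 + " and " + n._2)
    n._1 + n._2
  }
}
fibs.take(5).foreach(println)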

Lazy data pipelines

In tools like Spark and Flink, computations are always expressed lazily: transformations only describe the computation, which allows for optimizations before it is actually executed.

# Word count in PySpark
text_file = sc.textFile("words.txt")
counts = text_file \
              .flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("results.txt")

Q: In the code above, when will Spark compute the result?

A: When saveAsTextFile is called!
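A plain-Scala analogue of the same principle, as a sketch (Spark RDDs are not Scala iterators, but the idea is similar): Iterator transformations only describe the pipeline, and nothing runs until a terminal operation, here `toList`, consumes it. The `linesRead` counter is an illustrative side effect used only to observe when work happens; `groupMapReduce` requires Scala 2.13 or later.

```scala
var linesRead = 0
val lines = List("to be or", "not to be")

// Building the pipeline: no line is split yet
val pairs = lines.iterator
  .flatMap { line => linesRead += 1; line.split(" ").toList }
  .map(word => (word, 1))
val readBeforeAction = linesRead   // 0

// The "action": consuming the iterator forces the whole pipeline
val counts = pairs.toList.groupMapReduce(_._1)(_._2)(_ + _)
val readAfterAction = linesRead    // 2
```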

Monads

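Monads are a design pattern for chaining functions whose results are wrapped in a generic type.

Practically, a monad is a value-wrapping type that:

  • Has a unit function (also called return or pure) that wraps a plain value into the monad
  • Has a flatMap function that passes the wrapped value to the next function and returns that function's wrapped result, so that such steps can be chained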

trait Monad[M[_]] {
  def unit[S](a: S) : M[S]
  def flatMap[S, T] (m: M[S])(f: S => M[T]) : M[T]
}

Monads are the tool FP uses to deal with (side-)effects

  • Null pointers: Option[T]
  • Exceptions: Try[T]
  • Latency in asynchronous actions: Future[T]
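A minimal sketch of the trait above instantiated for Option: `unit` wraps a value in Some, and `flatMap` short-circuits on None. `safeDiv` is a hypothetical helper used to produce possibly missing results.

```scala
trait Monad[M[_]] {
  def unit[S](a: S): M[S]
  def flatMap[S, T](m: M[S])(f: S => M[T]): M[T]
}

object OptionMonad extends Monad[Option] {
  def unit[S](a: S): Option[S] = Some(a)

  // Apply f only if there is a value; otherwise propagate None
  def flatMap[S, T](m: Option[S])(f: S => Option[T]): Option[T] = m match {
    case Some(s) => f(s)
    case None    => None
  }
}

// Hypothetical helper: division that signals "no result" instead of throwing
def safeDiv(a: Int, b: Int): Option[Int] = if (b == 0) None else Some(a / b)
```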

Tackling the effect of throwing exceptions

Exceptions break referential transparency.

def failingFn(i: Int): Int = {
  val x: Int = throw new Exception("fail!")
  try { 
    x + 1
  } catch { case e: Exception => -1 }
}

Exceptions are not type-safe and lead to complicated control-flow.

def mean(xs: Seq[Double]): Double =
  if (xs.isEmpty)
    throw new ArithmeticException("mean of empty list!")
  else xs.sum / xs.length

Q: How to handle exceptions?

Solution attempt: Return a predefined bogus value

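def mean(xs: Seq[Double]): Double =
  if (xs.isEmpty) 0.0 // bogus value standing in for "no result"
  else xs.sum / xs.length

  • Allows errors to silently propagate
  • Not applicable to polymorphic code: there is no sensible bogus value for an arbitrary type A
  • Difficult to use the result: callers must know about, and check for, the special value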

Solution: Mapping effects to the type system

Option wraps a value that may be null or undefined. Option[A] has two possible instances:

  • Some[A], where A represents the type of the value
  • None, if the value is undefined

def mean(xs: Seq[Double]): Option[Double] =
  if (xs.isEmpty) None
  else Some(xs.sum / xs.length)

Combining monads with flatMap

flatMap enables us to chain computations that each return a value wrapped in the same monad.

  • Example: Combining Options that wrap possibly undefined values
def getArgument(k: String): Option[Arg]
def processArgument(a: Arg): Option[Result]

getArgument("foo")
  .flatMap(processArgument)
  .getOrElse(new Result("default"))

  • Example: Combining Futures that wrap values that will eventually be available
def callWebService(): Future[R]
def heavyComputation(r: R): Future[V]

val r = callWebService().flatMap(r => heavyComputation(r))
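A runnable variant of the Option example, assuming (hypothetically) that arguments come from a Map[String, String] and that processing an argument means parsing it as an integer; `toIntOption` requires Scala 2.13 or later. A missing key and an unparsable value both end up as None, and only the last step decides on a default.

```scala
val arguments: Map[String, String] = Map("foo" -> "42", "bar" -> "not a number")

// Each step may fail; failure is represented by None instead of null or an exception
def getArgument(k: String): Option[String] = arguments.get(k)
def processArgument(a: String): Option[Int] = a.toIntOption

def argOrDefault(k: String): Int =
  getArgument(k)
    .flatMap(processArgument)
    .getOrElse(-1)
```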

Dealing with exceptions: Scala

Try wraps a computation that may either result in an exception, or return a successfully computed value.
Try is a type that can have 2 instances:

  • Success[T], where T represents the type of the result
  • Failure[T], which holds the Throwable (usually an exception) that was raised

import scala.util.Try

object Converter extends App {
  def toInt(a: String): Try[Int] = Try{Integer.parseInt(a)}
  def toString(b: Int): Try[String] = Try{b.toString}

  val a = toInt("4").flatMap(x => toString(x))
  println(a)

  val b = toInt("foo").flatMap(x => toString(x))
  println(b)
}
## Success(4)
## Failure(java.lang.NumberFormatException: For input string: "foo")

Dealing with exceptions: Java

public class Converter {
  public static Integer toInt(String a) throws NumberFormatException {
    return Integer.parseInt(a);
  }
    
  public static String toString(Integer a) throws NullPointerException {
    return a.toString();
  }

  public static void main(String[] args) {
    try {
      Integer five = toInt("5");
      try {
        String str = toString(five);
      } catch(NullPointerException e) {
        //baaah
      }
    } catch(NumberFormatException r) {
        //ooofff
    }
  }
}

Another example: Mapping effects to the type system

object Amazon {
  def login(login: String, passwd: String): Future[Amazon]
}

class Amazon(val user: String) { 
  def search(term: String): Future[Seq[String]]
}

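object Main extends App {
  import scala.concurrent.ExecutionContext.Implicits.global // Future.flatMap needs an ExecutionContext
  val result = Amazon.login("uname", "passwd") // Might fail
      .flatMap(a => a.search("foo"))
  val resultVal = result.value
}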

Q: What would be the type of result?
Q: What would be the type of resultVal?

  • Future[Seq[String]]
  • Try[Seq[String]]
  • Option[Try[Seq[String]]]

Enumerating datasets

Before starting to process datasets, we need to be able to go over their contents in a systematic way. The process of visiting all items in a dataset is called traversal.

Big data sets

In a big data system:

  • Client code processes data
  • A data source is a container of data (e.g. array, database, web service)

There are two fundamental techniques for the client to process all available data in the data source:

  • Iteration: The client asks the data source whether there are items left and then pulls the next item.

  • Observation: The data source pushes the next available item to a client end point.

D: What are the relative merits of each technique?

Iteration

In the context of BDP, iteration allows us to process finite-sized data sets without loading them into memory all at once.

trait Iterator[A] {
  def hasNext: Boolean
  def next(): A
}

Typical usage

val it = Array(1,2,3,4).iterator
while(it.hasNext) {
  val i = it.next() + 1
  println(i)
}

The Iterator pattern is supported in virtually all mainstream programming languages.
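To show how little the pattern requires, here is a hypothetical `Countdown` iterator written against Scala's own `Iterator` trait, which has the same `hasNext`/`next()` pair as the simplified trait above. Once those two methods exist, the standard library supplies `map`, `filter`, `toList` and the rest.

```scala
// Hypothetical example: yields from, from - 1, ..., 1 without materialising a collection
class Countdown(from: Int) extends Iterator[Int] {
  private var current = from

  def hasNext: Boolean = current > 0

  def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("countdown finished")
    val value = current
    current -= 1
    value
  }
}
```

Usage: `new Countdown(3).toList` produces `List(3, 2, 1)`. Note that an iterator is stateful: once consumed, it is empty.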

Iteration example

Reading from a file in Scala:

val data = scala.io.Source.fromFile("/big/data").getLines()
while (data.hasNext) {
  println(data.next())
}
// Equivalently (an iterator can be consumed only once, so use one loop or the other)...
for (line <- data) {
  println(line)
}

Observation

Observation allows us to process (almost) unbounded data sets, where the data source controls the processing rate.

// Consumer
trait Observer[A] {
  def onNext(a: A): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}

// Producer
trait Observable[A] {
  def subscribe(obs: Observer[A]): Unit
}
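A dependency-free sketch of the push model built on the two traits above: a hypothetical `ListObservable` pushes each element of a list to its subscriber, and a hypothetical `CollectingObserver` records what it receives. Unlike rx.lang.scala, this version is synchronous and has no way to unsubscribe.

```scala
import scala.util.control.NonFatal

trait Observer[A] {
  def onNext(a: A): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}

trait Observable[A] {
  def subscribe(obs: Observer[A]): Unit
}

// Producer: the source decides when onNext is called (push)
class ListObservable[A](items: List[A]) extends Observable[A] {
  def subscribe(obs: Observer[A]): Unit =
    try {
      items.foreach(obs.onNext)
      obs.onComplete()
    } catch {
      case NonFatal(e) => obs.onError(e)
    }
}

// Consumer: only reacts to what it is given
class CollectingObserver[A] extends Observer[A] {
  var received: List[A] = Nil
  var completed = false
  var error: Option[Throwable] = None

  def onNext(a: A): Unit = received = received :+ a
  def onError(t: Throwable): Unit = error = Some(t)
  def onComplete(): Unit = completed = true
}
```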

Example usage

  // Example using library rx.lang.scala
  // Having additional method in Observable: def subscribe(onNext: T => Unit)
  Observable.just(1, 2, 3, 4, 5)
    .map(x => x + 1)
    .subscribe(x => println(x))

Iteration and Observation

  • Iteration and Observation are dual
  • The same set of higher-order functions can be used to process data in both cases

Iterator-based map. “Pulls” data out of array.

Array(1,2,3,4).map(x => x + 1) // Scala

Observation-based (reactive) map. Data is “pushed” to it asynchronously, when new data is available.

Observable.just(1,2,3,4,5).map(x => x + 1) // Scala with rx.lang.scala

D: (How) Can we convert between the two types of enumeration?

Traversal

We apply a strategy to visit all individual items in a collection.

  val l = List(1, 2, 3)
  l.foreach(print)

  val m = Map("x" -> 1, "y" -> 2)
  m.foreach(print)

In case of nested data types (e.g. trees/graphs), we need to decide how to traverse. Common strategies include:

  • Breadth-first traversal: visit all nodes at the current depth (e.g., node A and its siblings) before moving on to the next level (their children).
  • Depth-first traversal: from a node A, fully explore its children and their descendants before moving on to A's siblings.

Traversal through iteration

In most programming environments, traversal is implemented by iterators.

  case class Tree(value: String, children: List[Tree])

  def dfs(tree: Tree): Iterator[String] = tree match {
    case Tree(value, Nil) => Iterator(value)
    case Tree(value, children) => children.iterator.flatMap(c => dfs(c)) ++ Iterator(value)
  }

Then, we can iterate using standard language constructs.

  val tree = Tree("a", List(Tree("b", List(Tree("c", Nil), Tree("d", Nil)))))
  dfs(tree).foreach(print)
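For contrast, a sketch of breadth-first traversal over the same Tree type. It keeps the subtrees still to be visited in an immutable FIFO queue and emits values lazily through a LazyList; the tree in the example is chosen (illustratively) so that the two orders differ. The `dfs` here is the text's post-order version, written slightly more compactly.

```scala
import scala.collection.immutable.Queue

case class Tree(value: String, children: List[Tree])

// Depth-first (post-order), as in the text: children first, then the node itself
def dfs(tree: Tree): Iterator[String] =
  tree.children.iterator.flatMap(c => dfs(c)) ++ Iterator(tree.value)

// Breadth-first: visit nodes level by level, keeping pending subtrees in a FIFO queue
def bfs(tree: Tree): Iterator[String] = {
  def loop(pending: Queue[Tree]): LazyList[String] = pending.dequeueOption match {
    case None               => LazyList.empty
    case Some((next, rest)) => next.value #:: loop(rest.enqueueAll(next.children))
  }
  loop(Queue(tree)).iterator
}

val tree = Tree("a", List(Tree("b", List(Tree("d", Nil))), Tree("c", Nil)))
```

Here `dfs(tree)` yields d, b, c, a, while `bfs(tree)` yields a, b, c, d.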

Operations

Operations are transformations, aggregations or cross-referencing of data stored in data types. All container data types can be iterated over.

We generally have two types of operations:

Element-wise operations

  • Conversion: Convert values of type \(A\) to type \(B\)
    • Celsius to Kelvin
    • € to $
  • Filtering: Only present data items that match a condition
    • All adults from a list of people
    • Remove duplicates
  • Projection: Only present parts of each data item
    • From a list of cars, only display their brand

Our running example

Suppose we have a list of people; each person is identified by a unique identifier and has an age, a height in cm, a weight and a gender.

case class Person(id: Int, age: Int, height: Double, weight: Int, gender: String)

Now, let’s create 1000 random people!

  import scala.util.Random

  val genders = List("Male", "Female", "Other")
  val rand = new Random(42)

  val people = (1 to 1000).map(i => Person(i,
    age = rand.between(10, 80),
    height = rand.between(60, 200),
    weight = rand.between(40, 120),
    gender = genders(rand.between(0, 3))))
    .toList
## Person(1000,50,102,58,Female)

Conversion or Mapping

To convert values, we traverse a collection and apply a conversion function to each individual element. This is generalized to the \(map\) function:

\(map(xs: [A], f: (A) \rightarrow B) \rightarrow [B]\)

Let’s convert people’s heights to meters:

  def toMeters(person: Person): Person = person.copy(height = person.height * 1.0 / 100)

  val peopleInMeters = map(people, p => toMeters(p))

or using a lambda function:

  val peopleInMeters = map(people, (p: Person) => p.copy(height = p.height * 1.0 / 100))

Note: Scala List has built-in implementations for most of the functions we discuss in this lecture.

  // definition in scala.collection.immutable.List
  def map[B](f: (A) => B): List[B]
  // example usage of the built-in function
  val peopleInMeters = people.map(p => p.copy(height = p.height * 1.0 / 100))

Projection

Projection allows us to select parts of a Tuple, Relation or other nested data type for further processing. To implement this, we need to iterate over all items of a collection and apply a conversion function.

  val heightsAndWeights = map(people, (p: Person) => (p.height, p.weight))
  val weights = map(people, (p:Person) => p.weight)

Filtering

To filter values from a list, we traverse a collection and apply a predicate function to each individual element.

\(filter(xs: [A], f: (A) \rightarrow Boolean): [A]\)

val tallPeople = filter(people, p => p.height > 180)

Q: How can we implement filter?

  def filter(xs: List[Person], pred: Person => Boolean): List[Person] = xs match {
    case Nil => Nil
    case h :: t if pred(h) => h :: filter(t, pred)
    case h :: t => filter(t, pred)
  }

Filter on a generic class:

  def filter[T](xs: List[T], pred: T => Boolean): List[T] = xs match {
    case Nil => Nil
    case h :: t if pred(h) => h :: filter(t, pred)
    case h :: t => filter(t, pred)
  }

Aggregations

Aggregations apply a combining operator on a traversable sequence to aggregate the individual items into a single result.

Aggregation is implemented using reduction or folding. Two variants exist: left reduction (or folding) and right reduction (or folding).

  • With left reduction/folding, we traverse items from first to last
  • With right reduction/folding, we traverse items from last to first

Example: Calculate total weight

Our task is to calculate the total weight for our people list. To do so, we need to iterate over the list and add the individual weights. First, we do it with an imperative approach:

  var totalWeight = 0

  weights.foreach(w => {
    totalWeight = totalWeight + w
  })

  println(totalWeight)

We notice that iteration and reduction are independent. What if we abstract iteration away?

Folding left

Folding functions generate a single value from a collection of data.

\(foldLeft(xs: [A], init: B, f: (acc: B, x: A) \rightarrow B) \rightarrow B\)

Left fold takes a function f with 2 arguments and an initial value, and applies f to all items, starting from the leftmost. f combines the result of its previous application with the current element.

  def totalWeight(acc: Double, p: Person): Double = // acc is for accumulator
    acc + p.weight

Then we can calculate the total weight of all people in our collection as follows

val total = foldLeft(people, 0.0, (acc: Double, p: Person) => totalWeight(acc, p))
// or equivalently
val total = foldLeft(people, 0.0, (acc: Double, p: Person) => acc + p.weight)

Folding right

\(foldRight(xs: [A], init: B, f: (x: A, acc: B) \rightarrow B) \rightarrow B\)

Right fold takes a function f with 2 arguments and an initial value, and applies f to all items, starting from the rightmost. f combines the result of its previous application with the current element.

val total = foldRight(people, 0.0, (p: Person, acc: Double) => (p.weight + acc))
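The standalone foldLeft and foldRight used in these examples are not defined in the text; a minimal recursive sketch matching the signatures above could look as follows. foldLeft's recursive call is in tail position, whereas foldRight has to reach the end of the list before applying f, so its stack depth grows with the list length.

```scala
// Left fold: combine the accumulator with the head, then continue with the tail
def foldLeft[A, B](xs: List[A], init: B, f: (B, A) => B): B = xs match {
  case Nil    => init
  case h :: t => foldLeft(t, f(init, h), f)
}

// Right fold: fold the tail first, then combine the head with that result
def foldRight[A, B](xs: List[A], init: B, f: (A, B) => B): B = xs match {
  case Nil    => init
  case h :: t => f(h, foldRight(t, init, f))
}
```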

foldRight and foldLeft: differences

To see how foldRight and foldLeft are evaluated, we define a reduction function that accumulates the intermediate steps as a string.

def reduce_str(acc: String, x: String): String = "(" + acc + " + " + x + ")"

How does foldLeft work?

println(foldLeft(List(1,2,3,4,5).map(_.toString), "0", reduce_str))
## (((((0 + 1) + 2) + 3) + 4) + 5)

How does foldRight work?

println(foldRight(List(1,2,3,4,5).map(_.toString), "0", reduce_str))
## (1 + (2 + (3 + (4 + (5 + 0)))))

Q: Can we always apply foldLeft instead of foldRight?

foldRight != foldLeft

The answer to the previous question is: it depends on whether the reduction operation is commutative and associative.

An op \(\circ\) is commutative iff \(x \circ y = y \circ x\).

An op \(\circ\) is associative iff \(x \circ (y \circ z) = (x \circ y) \circ z\).

We can see that if we choose a non-commutative operator, foldLeft and foldRight can produce different results.

val resultLeft = foldLeft(List(1,2,3,4,5), 0, (x:Int, y:Int) => x - y)

val resultRight = foldRight(List(1,2,3,4,5), 0, (x:Int, y:Int) => x - y)

Q: What are the values of resultLeft and resultRight?

reduceRight and reduceLeft

Similar to foldRight and foldLeft, reduce functions generate a single value from a sequence, starting from the right or the left.

Therefore, reduceRight != reduceLeft

Different from folding, reduce operations:

  • Do not take an initial value; the initial value of the accumulator is the first value in the collection.
  • The result type B must be a supertype of the element type A (B >: A), because the first element becomes the initial accumulator.
val weights = people.map(_.weight)
val totalWeightLeft = weights.reduceLeft(_ + _)
val totalWeightRight = weights.reduceRight(_ + _)
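A sketch of the practical consequences of these differences, using the built-in methods: without an initial value, reducing an empty collection throws, so `reduceLeftOption` is the safer variant; and because B >: A, reducing a List[Int] into the supertype Any is allowed.

```scala
import scala.util.Try

val empty = List.empty[Int]

// foldLeft has an initial value, so an empty list is fine
val foldedEmpty = empty.foldLeft(0)(_ + _)               // 0

// reduceLeft has no initial value: on an empty list it throws
val reducedEmpty = Try(empty.reduceLeft(_ + _))          // a Failure

// reduceLeftOption makes the "no elements" case explicit instead
val reducedOpt = empty.reduceLeftOption(_ + _)           // None

// B >: A: the accumulator may have a supertype of the element type (here Any)
val joined: Any = List(1, 2, 3).reduceLeft[Any]((acc, x) => s"$acc,$x")
```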

Counting elements

The simplest possible aggregation is counting the number of elements, possibly only those matching a condition:

\(count(xs: [A], pred: (A) \rightarrow Boolean): Integer\)

def count(list: List[Int], f: Int => Boolean) =  filter(list,f).length

Q: How can we implement this with folding?

Distinct elements

Given a sequence of items, produce a new sequence with no duplicates.

\(distinct(xs: [A]): [A]\)

val a = List(1,2,2,3)
a.distinct
a.toSet
## List(1, 2, 3)
## Set(1, 2, 3)

Distinct relies on items having well-defined equality (e.g., equals() and hashCode() in Java and Scala).
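A small sketch of why equality matters: case classes get structural equals/hashCode from the compiler, whereas a plain class inherits reference equality, so two "identical" instances are not deduplicated. `distinctBy` (Scala 2.13+) lets us pick the identity explicitly. `PersonId` and `PlainId` are illustrative names.

```scala
// Case class: equality compares fields
case class PersonId(id: Int)
// Plain class: equality compares object references
class PlainId(val id: Int)

val caseDistinct = List(PersonId(1), PersonId(1), PersonId(2)).distinct.size   // 2
val plainDistinct = List(new PlainId(1), new PlainId(1)).distinct.size         // 2: two different objects
val byKey = List(new PlainId(1), new PlainId(1)).distinctBy(_.id).size         // 1
```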

Numerical aggregations

Aggregation functions have the following generic signature:

\(f: [A] \rightarrow Number\)

Their job is to reduce sequences of elements to a single measurement. Some examples are:

  • Mathematical functions: min, max, count
  • Statistical functions: mean, median, stdev
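A sketch of the three statistical functions for a non-empty Seq[Double] (an empty input would give NaN for mean and an exception for median). stdev here is the population standard deviation, dividing by n; the sample version would divide by n - 1. Sorting uses `Ordering.Double.TotalOrdering` explicitly to avoid Scala 2.13's deprecation warning for the default Double ordering.

```scala
def mean(xs: Seq[Double]): Double = xs.sum / xs.size

def median(xs: Seq[Double]): Double = {
  val sorted = xs.sorted(Ordering.Double.TotalOrdering).toIndexedSeq
  val n = sorted.size
  if (n % 2 == 1) sorted(n / 2)                  // odd: the middle element
  else (sorted(n / 2 - 1) + sorted(n / 2)) / 2   // even: average of the two middle elements
}

// Population standard deviation
def stdev(xs: Seq[Double]): Double = {
  val m = mean(xs)
  math.sqrt(xs.map(x => (x - m) * (x - m)).sum / xs.size)
}
```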

Grouping

Grouping splits a sequence of items into groups, given a classification function.

\(groupBy(xs:[A], f: A \rightarrow K): Map[K, [A]]\)

  def groupBy(xs: List[Int], classifier: Int => String): Map[String, List[Int]] = {
    groupByHelper(xs, classifier, Map())
  }

  private def groupByHelper(xs: List[Int], classifier: Int => String, groups: Map[String, List[Int]]): Map[String, List[Int]] = xs match {
    case Nil => groups
    case h :: t =>
      val key: String = classifier(h)
      val value: List[Int] = h :: groups.getOrElse(key, List[Int]())
      groupByHelper(t, classifier, groups + ((key,  value)))
  }
def number_classifier(x: Int): String = 
  if (x % 2 == 0) "even"
  else "odd"

val list = List(1, 2, 3, 4, 5, 6, 7)
print(groupBy(list, number_classifier))
## Map(odd -> List(7, 5, 3, 1), even -> List(6, 4, 2))

Aggregation example

How can we get the average height per gender (adults only) in our list of people?

  val adults = people.filter(_.age > 18)
  val adultsByGender = adults.groupBy(_.gender)
  val heightsByGender = adultsByGender.map(x => (x._1, x._2.map(_.height)))
  val avgHeightByGender = heightsByGender.map(x => (x._1, x._2.sum / x._2.size))
## HashMap(Other -> 135.0, Female -> 130.0, Male -> 132.0)

Q: What are the types of each variable?

The above is equivalent to the following SQL expression:

select gender, avg(height)
from people
where age > 18
group by gender

D: What are the relative strengths and weaknesses of each representation?

Key-value pairs

KV databases / systems

KV stores are the most common type of distributed database.

KV systems enable us to process data locally (e.g., per key) before redistributing it for further processing. Keys are naturally used to aggregate data before distribution, and they also enable (distributed) data joins.

Typical examples of distributed KV stores are Dynamo, MongoDB and Cassandra.

Keys and Values

The most common data structure in big data processing is the key-value pair.

  • A key is something that identifies a data record.
  • A value is the data record itself. It can be a complex data structure.
  • KV pairs are usually represented as sequences of (key, value) tuples, for example:
List( // Scala
  ("EWI", ("Mekelweg", 4)),
  ("TPM", ("Jafaalaan", 5)),
  ("AE", ("Kluyverweg", 1))
)

Typical operations for KV collections

mapValues: Transform the value of each pair, leaving its key unchanged

\(mapValues(kv: [(K,V)], f: V\rightarrow U): [(K,U)]\)
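Here is a minimal list-based sketch of `mapValues`. The helper is hypothetical and is not a standard `List` method. Its second parameter list is curried so that Scala can infer `V` from `kv` before it type-checks `f`.

```scala
// Hypothetical helper: apply f to every value and keep each key unchanged
def mapValues[K, V, U](kv: List[(K, V)])(f: V => U): List[(K, U)] =
  kv.map { case (k, v) => (k, f(v)) }

val faculties = List(
  ("EWI", ("Mekelweg", 4)),
  ("TPM", ("Jafaalaan", 5)),
  ("AE", ("Kluyverweg", 1))
)

// Keep only the street name of each address
val streets = mapValues(faculties)(_._1)
println(streets) // List((EWI,Mekelweg), (TPM,Jafaalaan), (AE,Kluyverweg))
```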


groupByKey: Group the values for each key into a single sequence.

\(groupByKey(kv: [(K,V)]) : [(K, [V])]\)
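Here is a minimal sketch of `groupByKey` for plain lists. It assumes Scala 2.13+, because it is built on the standard `groupMap`, and the helper name itself is hypothetical. The values of each key keep their input order, but the order of the keys in the result is unspecified.

```scala
// Hypothetical helper built on the standard groupMap (Scala 2.13+)
def groupByKey[K, V](kv: List[(K, V)]): List[(K, List[V])] =
  kv.groupMap(_._1)(_._2).toList

val streets = List(
  ("EWI", "Mekelweg"),
  ("EWI", "Van Mourik Broekmanweg"),
  ("TPM", "Jafaalaan")
)

val byFaculty = groupByKey(streets)
println(byFaculty.toMap("EWI")) // List(Mekelweg, Van Mourik Broekmanweg)
```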


reduceByKey: Combine all values that share the same key into a single value

\(reduceByKey(kv: [(K,V)], f: (V,V) \rightarrow V) : [(K, V)]\)
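A list-based sketch of `reduceByKey` follows. It is built on the standard `groupMapReduce` (Scala 2.13+), and the helper name is hypothetical. Values are combined pairwise as they are encountered, so no per-key list is built. For the result to be well defined, `f` should be associative. Distributed engines that combine partial results in arbitrary order also need it to be commutative.

```scala
// Hypothetical helper: fold all values of each key together with f
def reduceByKey[K, V](kv: List[(K, V)])(f: (V, V) => V): List[(K, V)] =
  kv.groupMapReduce(_._1)(_._2)(f).toList

// Count addresses per faculty: emit (faculty, 1), then add up the ones
val ones = List(("EWI", 1), ("EWI", 1), ("TPM", 1), ("AE", 1))
val counts = reduceByKey(ones)(_ + _)

println(counts.toMap("EWI")) // 2
```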


join: Return a sequence containing all pairs of elements with matching keys

\(join(kv1: [(K,V)], kv2: [(K,W)]) : [(K, (V,W))]\)
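One way to implement `join` on lists is a hash join. The sketch first indexes the second collection by key, then walks the first collection and emits one output pair per matching value. This is a minimal, hypothetical helper, not a library API. Keys that appear on only one side (here `AE`) are dropped, which gives inner-join semantics.

```scala
// Hypothetical inner hash join on lists of key-value pairs
def join[K, V, W](kv1: List[(K, V)], kv2: List[(K, W)]): List[(K, (V, W))] = {
  val index: Map[K, List[W]] = kv2.groupMap(_._1)(_._2) // build side
  for {
    (k, v) <- kv1                      // probe side
    w      <- index.getOrElse(k, Nil)  // every match for this key
  } yield (k, (v, w))
}

val streets = List(
  ("EWI", "Mekelweg"),
  ("EWI", "Van Mourik Broekmanweg"),
  ("TPM", "Jafaalaan"),
  ("AE", "Kluyverweg")
)
val deanSurnames = List(("EWI", "Schmitz"), ("TPM", "Wamelink"))

val joined = join(streets, deanSurnames)
println(joined)
// List((EWI,(Mekelweg,Schmitz)), (EWI,(Van Mourik Broekmanweg,Schmitz)), (TPM,(Jafaalaan,Wamelink)))
```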

Joining datasets

Suppose we have a dataset of addresses

case class Addr(k: String, street: String, num: Int)
val addr = List(
  Addr("EWI", "Mekelweg", 4),
  Addr("EWI", "Van Mourik Broekmanweg", 6),
  Addr("TPM", "Jafaalaan", 5),
  Addr("AE",  "Kluyverweg", 1)
)

and a dataset of deans

case class Dean(k: String, name: String, surname: String)
val deans = List(
  Dean("EWI", "John", "Schmitz"),
  Dean("TPM", "Hans", "Wamelink")
)

Q: Define a method deanAddresses to retrieve the list of addresses of each dean.

First attempt

def deanAddresses : List[(Dean, List[Addr])] =
  deans.map {d => (d, addr.filter(a => a.k == d.k))}

Running it returns the following result:

// EEEWWWW
List(
  (Dean(EWI,John,Schmitz), List(
    Addr(EWI,Mekelweg,4),
    Addr(EWI,Van Mourik Broekmanweg,6))
  ),
  (Dean(TPM,Hans,Wamelink),List(
    Addr(TPM,Jafaalaan,5))
  )
)

This is OK, but a more practical result would be a List[(Dean, Addr)]. For this, we need to flatten the internal sequence.

flatMap: Map and flatten in one step

\(flatMap(xs: [A], f: A \rightarrow [B]): [B]\)

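flatMap applies a function that returns a collection to every element, then concatenates (flattens) the resulting collections into one. This lets us pair each dean with each matching address in a single flat list.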

def deanAddresses2: List[(Dean, Addr)] =
  deans.flatMap(d =>
    addr.filter(a => a.k == d.k)
        .map(a => (d, a)))
## List((Dean(EWI,John,Schmitz),Addr(EWI,Mekelweg,4)), 
##       (Dean(EWI,John,Schmitz),Addr(EWI,Van Mourik Broekmanweg,6)),
##       (Dean(TPM,Hans,Wamelink),Addr(TPM,Jafaalaan,5)))
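To see the flattening step in isolation, the sketch below compares `map` and `flatMap` on a small list, using the same function. `map` keeps one inner list per element, while `flatMap` concatenates the inner lists.

```scala
val xs = List(1, 2, 3)

// map keeps one inner list per element...
val nested: List[List[Int]] = xs.map(x => List(x, x * 10))
// ...while flatMap maps and flattens in one step
val flat: List[Int] = xs.flatMap(x => List(x, x * 10))

println(nested) // List(List(1, 10), List(2, 20), List(3, 30))
println(flat)   // List(1, 10, 2, 20, 3, 30)
```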


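In Scala, flatMap is special: a for-comprehension with several generators is syntactic sugar for nested flatMap and map calls, so the same function can be written as: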

def deanAddresses2: List[(Dean, Addr)] = {
  for {
    d <- deans
    v <- addr.filter(a => a.k == d.k)
  } yield (d, v)
}
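A for-comprehension can also hold the key comparison as a guard (`if`). The sketch below checks that such a comprehension gives the same result as the nested `flatMap` / `withFilter` / `map` calls the compiler roughly rewrites it into. The `Addr` and `Dean` datasets are repeated so that the block runs on its own.

```scala
case class Addr(k: String, street: String, num: Int)
case class Dean(k: String, name: String, surname: String)

val addr = List(
  Addr("EWI", "Mekelweg", 4),
  Addr("EWI", "Van Mourik Broekmanweg", 6),
  Addr("TPM", "Jafaalaan", 5),
  Addr("AE", "Kluyverweg", 1)
)
val deans = List(Dean("EWI", "John", "Schmitz"), Dean("TPM", "Hans", "Wamelink"))

// Guard inside the comprehension instead of an explicit filter
val sugared: List[(Dean, Addr)] =
  for {
    d <- deans
    a <- addr
    if a.k == d.k
  } yield (d, a)

// Roughly what the compiler turns the comprehension into
val desugared: List[(Dean, Addr)] =
  deans.flatMap(d => addr.withFilter(a => a.k == d.k).map(a => (d, a)))

println(sugared == desugared) // true
```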

KV pairs and SQL tables

A collection of KV pairs is an alternative form of a relation, indexed by a key. We can always convert between the two.

val relation : Set[Tuple3[Int, Int, String]] = ???

Convert the above relation to a set of KV pairs, keyed by its first column:

val kvpair : Set[Tuple2[Int, Tuple2[Int, String]]] =
  relation.map(x => (x._1, (x._2, x._3)))

Convert the KV pairs back to a relation:

val relation2: Set[Tuple3[Int, Int, String]] =
  kvpair.map(x=> (x._1, x._2._1, x._2._2))

This means that any operation we can do on relations, we can also do on KV pairs.
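To check the round trip concretely, the sketch below fills in the `???` with a small, made-up relation. It converts the relation to KV pairs and back, then uses the key for a lookup. The sample rows are hypothetical, and `toMap` assumes that the keys are unique, which holds in this sample.

```scala
// Made-up sample relation with rows (id, number, label)
val relation: Set[(Int, Int, String)] = Set((1, 10, "a"), (2, 20, "b"), (3, 30, "c"))

// Relation -> KV pairs keyed by the first column, and back again
val kvpair: Set[(Int, (Int, String))] = relation.map(x => (x._1, (x._2, x._3)))
val relation2: Set[(Int, Int, String)] = kvpair.map(x => (x._1, x._2._1, x._2._2))

println(relation2 == relation) // true: no information is lost

// Once keyed, selecting a row by key becomes a lookup
val byKey: Map[Int, (Int, String)] = kvpair.toMap
println(byKey(2)) // (20,b)
```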

More on Immutability

Big data is immutable

One of the key characteristics of data processing is that data is never modified in place. Instead, we apply operations that create new versions of the data, without modifying the original version.
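In Scala's immutable collections this looks as follows. Every "change" is an operation that returns a new version, and the original value stays as it was. This is a minimal illustration using `Vector`.

```scala
val v1 = Vector(1, 2, 3)

val v2 = v1.map(_ * 2)     // a transformed version: Vector(2, 4, 6)
val v3 = v1.updated(0, 42) // a "modified" version: Vector(42, 2, 3)

println(v1) // Vector(1, 2, 3): the original is untouched
```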

Immutability is a general concept that extends across much of the data processing stack.

Immutability across the stack

Immutability

Image from Helland in ref [1]

Copy-On-Write (COW)

Copy-On-Write is a general technique that lets processes share memory for read-only access, and deals with writes only if and when they happen, by copying the modified resource into a private version.

COW is the basis for many operating system mechanisms, such as process creation (forking), and several modern filesystems (e.g., Btrfs, ZFS) use it as their storage format.

COW enables systems with multiple readers and few writers to efficiently share resources.
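The idea can be sketched in a few lines. The hypothetical `CowArray` wrapper below is not a library class. Snapshots share one backing array, reads never copy, and the first write through a shared handle clones the array before modifying it.

```scala
// Hypothetical copy-on-write wrapper around an Int array
final class CowArray(initial: Array[Int]) {
  private var data: Array[Int] = initial
  private var shared: Boolean = false

  def apply(i: Int): Int = data(i) // reads never copy

  // A cheap "copy": a new handle to the same backing array
  def snapshot(): CowArray = {
    val c = new CowArray(data)
    shared = true
    c.shared = true
    c
  }

  def update(i: Int, v: Int): Unit = {
    if (shared) {         // another handle may still read `data`
      data = data.clone() // take a private copy before writing
      shared = false
    }
    data(i) = v
  }

  def sharesStorageWith(other: CowArray): Boolean = data eq other.data
}

val a = new CowArray(Array(1, 2, 3))
val b = a.snapshot()
println(a.sharesStorageWith(b)) // true: nothing copied yet
b(0) = 42                       // first write: b copies, a is untouched
println((a(0), b(0)))           // (1,42)
```

To keep the sketch short, it does not count references. As a result, `a` will also copy on its next write even though it is now the only user of its array. Real COW implementations track reference counts to avoid that extra copy.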

Immutable data structures

Immutable or persistent data structures always preserve the previous version of themselves when they are modified [2].

With immutable data structures, we can:

  • Avoid locking while processing them, so we can process items in parallel
  • Maintain and share old versions of data
  • Reason about changes in the data
  • Use them safely as hash table keys

They come at the cost of increased memory usage, since previous versions are retained instead of being overwritten in place.
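Persistent structures limit that cost by sharing unchanged parts between versions. Scala's immutable `List` gives a minimal illustration: prepending an element creates one new cell, and that cell points at the old list.

```scala
val xs = List(2, 3, 4)
val ys = 1 :: xs // new version with one extra element

println(xs)           // List(2, 3, 4): the old version is intact
println(ys.tail eq xs) // true: the new version reuses the old cells
```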

Example: immutable tree

A tree
The tree after addition
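The tree in the figure can be sketched as a persistent binary search tree. The `Tree`, `Leaf` and `Branch` types below are hypothetical and are not library types. Insertion copies only the nodes on the path from the root to the new leaf. Every subtree off that path is shared between the old version and the new one.

```scala
// Hypothetical persistent binary search tree of Ints
sealed trait Tree
case object Leaf extends Tree
case class Branch(left: Tree, value: Int, right: Tree) extends Tree

// Path copying: rebuild only the nodes we descend through
def insert(t: Tree, x: Int): Tree = t match {
  case Leaf => Branch(Leaf, x, Leaf)
  case b @ Branch(l, v, r) =>
    if (x < v) Branch(insert(l, x), v, r)
    else if (x > v) Branch(l, v, insert(r, x))
    else b // already present: reuse the existing node
}

val t1 = List(5, 3, 8).foldLeft(Leaf: Tree)(insert)
val t2 = insert(t1, 9) // new version; t1 is unchanged

println(t1) // Branch(Branch(Leaf,3,Leaf),5,Branch(Leaf,8,Leaf))
```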

Data structures in Scala

ADT      collection.mutable    collection.immutable
Array    ArrayBuffer           Vector
List     LinkedList            List
Map      HashMap               HashMap
Set      HashSet               HashSet
Queue    SynchronizedQueue     Queue
Tree                           TreeSet

Note that mutable LinkedList and SynchronizedQueue are deprecated and were removed in Scala 2.13.

Bibliography

[1] P. Helland, “Immutability changes everything,” Queue, vol. 13, no. 9, p. 40, 2015.
[2] C. Okasaki, Purely Functional Data Structures. Cambridge University Press, 1999.
[3] G. Hutton, “A tutorial on the universality and expressiveness of fold,” Journal of Functional Programming, vol. 9, no. 4, pp. 355–372, 1999.