“We are witnessing an unprecedented growth of interconnected data, which underscores the vital role of graph processing in our society.”
Graphs and other forms of hierarchical data structures appear every time a system models a dependency relationship. Common examples of big graphs include the web graph, social networks, and road networks.
A graph (\(G\)) comprises nodes (\(V\)) and edges (\(E\)). Both can carry metadata. We can represent a graph as an adjacency list

List[(V, List[V])]

where each tuple represents a node and its connections to other nodes, or as an edge list

List[(V, V)]

of node pairs, each pair representing an edge.

Important graph structures:

Graph components: subgraphs in which any two vertices are connected to each other by paths.
Strongly connected component: the largest sub-graph with a path from each node to every other node.
Triangles or polygons: a triangle occurs when one vertex is connected to two other vertices and those two vertices are also connected.
Spanning trees: a sub-graph that contains all nodes and the minimum number of edges.
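The two representations are interchangeable; a quick sketch in Python (the toy graph is hypothetical):

```python
# Toy directed graph: 1 -> 2, 1 -> 3, 2 -> 3
# Adjacency-list form, List[(V, List[V])]:
adjacency = [(1, [2, 3]), (2, [3]), (3, [])]

# Derive the edge-list form, List[(V, V)]:
edges = [(src, dst) for src, neighbors in adjacency for dst in neighbors]
print(edges)  # [(1, 2), (1, 3), (2, 3)]
```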
To process graphs, we can use relational databases, graph databases, or distributed graph processing frameworks. For really big graphs, our options are somewhat limited. However, not all applications need to process billions of nodes and trillions of edges. For small to medium-sized graphs (< 500M edges), existing tools can go a long way.
CREATE TABLE nodes (
  id INTEGER,
  ...
  metadata ...
)

CREATE TABLE edges (
  src INTEGER,
  target INTEGER,
  metadata ...,
  CONSTRAINT src_fkey
    FOREIGN KEY (src) REFERENCES nodes(id),
  CONSTRAINT target_fkey
    FOREIGN KEY (target) REFERENCES nodes(id)
)
We model graphs as node pairs. Nodes and edges have metadata.
WITH RECURSIVE transitive_closure (a, b, distance, path) AS (
  SELECT a, b, 1 AS distance, a || '->' || b AS path
  FROM edges
  UNION ALL
  SELECT tc.a, e.b, tc.distance + 1, tc.path || '->' || e.b
  FROM edges e
  JOIN transitive_closure tc ON e.a = tc.b
  WHERE e.metadata = ... -- Traversal filters on nodes/edges
    AND tc.path NOT LIKE '%->' || e.b || '->%'
)
SELECT * FROM transitive_closure
Recursive queries consist of a base clause, which is evaluated once to seed the recursion, and a recursion clause, which is repeatedly evaluated against the rows produced by the previous step.
Given that we (blue node) are direct friends with the yellow nodes, we could recommend second level (red) friends as potential new connections.
WITH RECURSIVE transitive_closure (a, b, distance, path) AS (
  -- Find the yellow nodes
  SELECT a, b, 1 AS distance, a || '->' || b AS path
  FROM edges
  WHERE a = src -- the blue node
  UNION ALL
  -- Find the red nodes
  SELECT tc.a, e.b, tc.distance + 1, tc.path || '->' || e.b
  FROM edges e
  JOIN transitive_closure tc ON e.a = tc.b
  WHERE tc.path NOT LIKE '%->' || e.b || '->%'
    AND tc.distance < 2 -- don't recurse into white nodes
)
SELECT a, b FROM transitive_closure
GROUP BY a, b
HAVING MIN(distance) = 2 -- only report red nodes
The base expression finds all directly connected nodes, while the recursive expression recurses into their first-level descendants.
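The same second-degree traversal can be written outside SQL; a Python sketch over an assumed toy friendship graph (node names are made up):

```python
# Hypothetical friendship graph; 'blue' is the user we recommend for.
knows = {
    'blue':    ['yellow1', 'yellow2'],
    'yellow1': ['blue', 'red1'],
    'yellow2': ['blue', 'red1', 'red2'],
    'red1':    ['white1'],
    'red2':    [],
    'white1':  [],
}

def recommend(graph, user):
    direct = set(graph[user])                       # the yellow nodes
    second = {f for d in direct for f in graph[d]}  # friends of friends
    # Only report red nodes: not already a friend, not the user itself.
    return second - direct - {user}

print(sorted(recommend(knows, 'blue')))  # ['red1', 'red2']
```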
Graph databases are specialized RDBMSs for storing recursive data structures; they support CRUD operations on them while maintaining transactional consistency (ACID or otherwise).
The most commonly used language for graph databases is Cypher, the base language of Neo4j.
CREATE (emil:Person {name:'Emil'})
       <-[:KNOWS]-(jim:Person {name:'Jim'})
       -[:KNOWS]->(ian:Person {name:'Ian'})
       -[:KNOWS]->(emil)
Find mutual friends of a user named Jim:
MATCH (a:Person {name:'Jim'})-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person),
      (a)-[:KNOWS]->(c)
RETURN b, c
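For comparison, the same pattern match written imperatively in Python, over an assumed dictionary of KNOWS edges:

```python
# Hypothetical KNOWS relationships mirroring the Cypher example.
knows = {
    'Jim':  {'Ian', 'Emil'},
    'Ian':  {'Jim', 'Emil'},
    'Emil': {'Jim'},
}

# (a)-[:KNOWS]->(b)-[:KNOWS]->(c), (a)-[:KNOWS]->(c)
a = 'Jim'
pairs = [(b, c)
         for b in knows[a]
         for c in knows.get(b, set())
         if c in knows[a]]
print(sorted(pairs))  # [('Ian', 'Emil')]
```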
Graphs are an inherently recursive data structure, which means that computations may have dependencies on previous computation steps (and thus are not trivially parallelizable).
Traditional frameworks are not well-suited for processing graphs.
PageRank is a centrality measure based on the idea that nodes are important if multiple important nodes point to them. For node \(p_i\), its PageRank is recursively defined as
\[ PR(p_i; 0) = \frac{1}{N} \\ PR(p_i; t+1) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PR (p_j; t)}{L(p_j)} \]
where \(d\) is a damping factor (usually set to 0.85), \(M(p_i)\) is the set of nodes pointing to \(p_i\), and \(L(p_j)\) is the number of outgoing links of \(p_j\). We notice that each node updates other nodes by propagating its state.
val links: RDD[(V, List[V])] = ... .cache() // adjacency lists
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap {
    case (links, rank) =>
      val size = links.size
      links.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
The computation is iterative and side-effecting and therefore non-parallelizable. To make it side-effect free, we need to write each step of the computation to external storage.
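As a concrete check of the update rule defined above, here is a minimal, non-distributed sketch in Python (the toy graph is made up):

```python
# Power-iteration PageRank following the recursive definition above.
# Hypothetical toy graph: adjacency lists of outgoing links.
links = {1: [2, 3], 2: [3], 3: [1]}
N, d = len(links), 0.85

pr = {p: 1.0 / N for p in links}  # PR(p; 0) = 1/N
for _ in range(50):               # iterate the update rule to convergence
    pr = {p: (1 - d) / N
             + d * sum(pr[q] / len(links[q])
                       for q in links if p in links[q])
          for p in links}
```

Node 3 ends up with the highest rank, since both other nodes link to it; the inner sum over incoming neighbors is what the distributed versions compute via message passing.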
The BSP model is a general model for parallel algorithms.
It assumes that a system has: a set of processors, each with fast local memory; a network that connects them; and a mechanism for barrier synchronization.
BSP computation is organized in supersteps. A superstep comprises three phases: concurrent computation on local data, communication between processors, and a barrier where all processors wait until everyone has finished the previous two phases.
Pregel (by Google) is a distributed graph processing framework.
Pregel computations impose a BSP structure on program execution: in each superstep, every active vertex receives the messages sent to it in the previous superstep, runs a user-defined function, and sends messages to be delivered in the next superstep.
Open source implementations: Apache Giraph and GraphX.
Pregel is a vertex-centric graph processing model: the algorithm iterates over vertices
BSP programs run until the programs stop themselves. Termination works as follows: each vertex can vote to halt; a halted vertex is reactivated when it receives a message; the computation terminates when all vertices have voted to halt and no messages are in transit.
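The vote-to-halt loop can be illustrated with the classic "propagate the maximum value" example, sketched here in plain Python (the graph and values are hypothetical):

```python
# Pregel-style maximum-value propagation over a hypothetical toy graph.
neighbors = {1: [2], 2: [1, 3], 3: [2]}
value = {1: 3, 2: 6, 3: 2}

# Superstep 0: every vertex is active and sends its value to its neighbors.
messages = {v: [] for v in value}
for v in value:
    for n in neighbors[v]:
        messages[n].append(value[v])

# Run supersteps until no messages are in transit (all vertices halted).
while any(messages.values()):
    inbox, messages = messages, {v: [] for v in value}
    for v, msgs in inbox.items():         # receiving a message reactivates v
        incoming = max(msgs, default=value[v])
        if incoming > value[v]:           # vertex program: adopt larger value
            value[v] = incoming
            for n in neighbors[v]:        # notify neighbors next superstep
                messages[n].append(incoming)
        # otherwise the vertex votes to halt by sending nothing

print(value)  # {1: 6, 2: 6, 3: 6}
```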
Graphs are stored as adjacency lists, partitioned (using hash partitioning) and distributed using a network filesystem
Leader: Maintains a mapping between data partitions and cluster nodes. Implements the BSP barrier.
Worker: For each vertex, it maintains the following in memory: the vertex's current value, a list of its outgoing edges, a queue of incoming messages, and a flag marking whether the vertex is active.
The worker applies all computationally intensive operations.
GraphX is a new component in Spark for graphs and graph-parallel computation. It provides a variant of Pregel’s API for developing vertex-centric algorithms.
Spark uses its underlying fault tolerance, checkpointing, partitioning and communication mechanisms to store the graph. Halting is determined by examining whether vertices are still sending or receiving messages.
GraphX allows operating on the underlying data both as a graph, using graph concepts and processing primitives, and as separate collections of edges and vertices that can be transformed using functional programming primitives.
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
. . .
}
def pregel[A](
  // Initialization message
  initialMsg: A,
  // Max supersteps
  maxIter: Int = Int.MaxValue,
  activeDir: EdgeDirection = EdgeDirection.Out,
  // Program to update the vertex
  vprog: (VertexId, VD, A) => VD,
  // Program to determine edges to send a message to
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  // Program to combine incoming messages
  mergeMsg: (A, A) => A
): Graph[VD, ED]
val pagerankGraph: Graph[Double, Double] = graph
  .mapVertices((id, attr) => 1.0) // Initial Pagerank for nodes

def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b

val initialMessage = 0.0

// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
From Pregel.scala in Apache Spark
The fault tolerance model is reminiscent of Spark.
Periodically, the leader instructs the workers to save the state of their in-memory data to persistent storage
Worker failures are detected through keep-alive messages that the leader issues to workers.
In case of failure, the leader reassigns graph partitions to live workers; they reload their partition state from the most recently available checkpoint
Ongoing research focuses on abstractions, ecosystems, and performance.