# Big data. GraphX lab
## Theoretical part
The main goals of this part are:
* become familiar with graph databases (graph DBs)
* understand index-free adjacency
* compare the pros and cons of relational DBs and graph DBs
* learn basic functions of GraphX
* learn Pregel
* learn PageRank
Please do not blindly copy-paste!
---
Ironically, legacy relational database management systems (RDBMS) are poor at handling large volumes of data. Relational databases are great when the schema is predetermined and fixed. However, sometimes we want to store so many different types of objects in our database that it becomes infeasible to design a schema for every type of object. This is when graph databases become useful.
<details>
<summary>Example: Schema</summary>
<p>
<i>The term relation schema refers to a heading paired with a set of constraints defined in terms of that heading.</i>
Creating a table with a specified schema in SQL:
</p>
<p>
```sql
CREATE TABLE List_of_people (
ID INTEGER,
Name CHAR(40),
Address CHAR(200),
PRIMARY KEY (ID)
)
```
</p>
</details>
In order to leverage data relationships, organizations need a database technology that stores relationship information as a first-class entity. That technology is a graph database.
Most NoSQL databases (and hence graph databases) were designed at a time when the *horizontal scaling* problem was already well described and researched. Some SQL databases have been adapted for horizontal scaling as well (such as MySQL in Amazon RDS).
### Why are relational databases not enough?

*Source: [Wikipedia](https://en.wikipedia.org/wiki/Graph_database)*
Consider a database with vertices and relations between them. Many algorithms need to traverse relationship edges or explore a node's neighborhood. Both tasks are hard and inefficient to compute if we store the data in a relational database that supports only SQL queries. First, SQL queries were not designed to handle iterative computation problems. For example, the query "does a path between node A and node B exist?" requires iterative traversal between nodes. Second, relational databases are not built to ensure locality of connected data, yet it is very important to place clusters of connected nodes on the same machine. Many social-network tasks are asymptotically cheaper to solve using graph DBs.
The rationale of the graph DB approach is captured in this definition:
"A graph database is any storage system that provides index-free adjacency" (c) Marko Rodriguez [[source](https://www.slideshare.net/slidarko/problemsolving-using-graph-traversals-searching-scoring-ranking-and-recommendation/47-Dening_a_Graph_Database_A), slides 47-57], a [Gremlin](https://tinkerpop.apache.org/gremlin.html) contributor and researcher.
Hence, queries such as "get all friends of John" (~ "get the set of neighbours of node John") have lower complexity (roughly, O(1) instead of O(N)).
#### What does index-free adjacency mean?
Technically it means that connected elements are **linked together without using an index**, which avoids expensive operations (such as *join*). More details are in the source [[5](https://www.scitepress.org/papers/2018/68269/68269.pdf)].
NB: index-free adjacency does not imply the absence of indexes altogether!
Note also that applying changes may require restarting the system: in some graph DBs, deleting elements from the graph does not free memory, so a restart of the DBMS is needed (recall the [BASE acronym](https://stackoverflow.com/questions/3342497/explanation-of-base-terminology)).
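To make index-free adjacency concrete, here is a minimal plain-Scala sketch (not a real graph DB; `Person` and `neighbours` are invented names for illustration): each vertex stores direct references to its neighbours, so retrieving them is a pointer walk rather than an index lookup or a join.

```scala
import scala.collection.mutable.ListBuffer

// A vertex that links directly to its neighbours (index-free adjacency).
case class Person(name: String) {
  val friends: ListBuffer[Person] = ListBuffer()
}

// "Get all friends of John": just follow the stored references,
// O(degree) instead of an index lookup or join over all rows.
def neighbours(p: Person): List[String] = p.friends.map(_.name).toList
```

In a relational model the same query would typically join a friendship table with itself, paying an index lookup per hop.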
## Head start in Apache Spark GraphX
GraphX is a Spark component for graphs and graph-parallel computation. GraphX reuses the Spark RDD concept, simplifies graph analytics tasks, and provides operations on a directed multigraph with properties attached to each vertex and edge.
Your goal for today is to become familiar with GraphX. You will run the provided examples and discuss how parallel graph processing uses message passing. At the end you will meet PageRank, one of the simplest graph algorithms for parallel computing.
Let's start with a console example, the quickest way to practice:
```bash
# Assumes that spark/bin is on your $PATH
spark-shell --master local[2]
```
Read the logs. If there are any Java-style error traces, fix them before proceeding.
<details>
<summary>
<i>Error example </i>
</summary>
<p>Such errors mean that the environment is not OK. As a consequence, you cannot use Spark-dependent imports and objects. Example of an initialization error for the **sc** (SparkContext) object:
<img src="https://i.imgur.com/nqWjIRa.png"/> </p>
<p>How to check that everything is OK:</p>
<img src="https://i.imgur.com/Yusczhs.png"/>
</details>
---
#### Declare our first graph

```scala
import org.apache.spark.graphx._
val myVertices = sc.makeRDD(Array((1L, "Ann"), (2L, "Bill"),
(3L, "Charles"), (4L, "Diane"), (5L, "Went to gym this morning")))
val myEdges = sc.makeRDD(Array(Edge(1L, 2L, "is-friends-with"),
Edge(2L, 3L, "is-friends-with"), Edge(3L, 4L, "is-friends-with"),
Edge(4L, 5L, "Likes-status"), Edge(3L, 5L, "Wrote-status")))
val myGraph = Graph(myVertices, myEdges)
myGraph.vertices.foreach(println(_))
// Prints (in arbitrary order): (4,Diane) (2,Bill) (1,Ann) (3,Charles) (5,Went to gym this morning)
// Try this by yourself:
myGraph.edges.foreach(println(_))
myGraph.degrees.foreach(println(_))
myGraph.inDegrees.foreach(println(_))
myGraph.outDegrees.foreach(println(_))
```
The Spark console lets you preview the fields and methods of an object: just type **sc.** or **myGraph.** and press TAB.

---
## Graph transformation methods:
### Map functions
**mapEdges**, **mapVertices** and **mapTriplets** return a new Graph object with the modifications applied. Each method applies the function given as a parameter to each element (edge, vertex or triplet); in the case of mapTriplets, the result is stored in the edge attribute.
```scala
val tmp = myGraph.mapEdges(e => e.attr == "is-friends-with")
tmp.edges.foreach(println(_))
// Edge(1,2,true)
// Edge(2,3,true)
// Edge(3,4,true)
// Edge(3,5,false)
// Edge(4,5,false)
```
See [official documentation](https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala) for exhaustive information on basic map functions.
<details>
<summary>
<i>Triplet</i>
</summary>
<p>is a tuple of two vertices and edge's attribute:
<img src="https://i.imgur.com/NiVwvkb.png"/> </p>
<img src="https://i.imgur.com/nCiHsFM.png"/>
</details>
---
### aggregateMessages
Applies the *sendMsg* function to each edge, producing messages. Each message has a direction (an edge has two endpoints; you specify which one is the destination). All messages received by a vertex are combined with the specified *mergeMsg* function.
```scala
def aggregateMessages[Msg](
sendMsg: EdgeContext[VD, ED, Msg] => Unit, // argument
mergeMsg: (Msg, Msg) => Msg // argument
) : VertexRDD[Msg] // return type
```
#### EdgeContext
This class is similar to the *EdgeTriplet* class (so it keeps information about the source and destination nodes and about the edge value, which you can use in sendMsg), but additionally has the methods *sendToSrc* and *sendToDst*.
<details>
<summary>
methods signatures
</summary>
<a href="https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/graphx/EdgeContext.html">source</a>
<img src="https://i.imgur.com/54TSEy9.png"/>
</details>
#### sendMsg
is a function that accepts an object of type *EdgeContext* and returns nothing. Here you perform all the transformations you need; the message object acts as a container for the data.
#### mergeMsg
All the messages for each vertex are collected together and delivered to the mergeMsg method. This method defines how all the messages for the vertex are reduced down to the answer we’re looking for.
As usual, this function must be associative and commutative.
#### Usage example
The following code counts the out-degree of each vertex, i.e., the number of edges leaving the vertex.
```scala
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).join(myGraph.vertices).foreach(println(_))
// Or better formatted variant:
myGraph.aggregateMessages[Int](_.sendToSrc(1),
_ + _).rightOuterJoin(myGraph.vertices).map(
x => (x._2._2, x._2._1.getOrElse(0))).foreach(println(_))
```
**sendMsg** function: `_.sendToSrc(1)` sends the value 1 to the source vertex of every edge.

**mergeMsg** function: `_ + _` sums up all the messages received by a vertex.
---
### [Pregel in Spark](https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala)
When Google implemented its graph processing framework, [Pregel](http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf), it used the principles behind [Bulk Synchronous Parallel](http://albert-jan.yzelman.net/education/parco14/A2.pdf) (BSP) processing. Google’s Pregel is the inspiration for Spark’s own Pregel API.
```scala
def pregel[A] (
initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out
) // first parameter list (Scala supports multiple parameter lists)
(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A
) : Graph[VD, ED] // return type
```
#### pregel review:
* Good for propagation algorithms (such as [PageRank](https://en.wikipedia.org/wiki/PageRank))
* Has 2 convergence criteria (no new messages sent during an iteration, or the maximum number of iterations reached)
#### Usage example
```scala
val g = Pregel(
graph = myGraph.mapVertices((vid,vd) => 0),
initialMsg = 0,
maxIterations = Int.MaxValue,
activeDirection = EdgeDirection.Out
)(
vprog = (id:VertexId,vd:Int,a:Int) => math.max(vd,a),
sendMsg = (et:EdgeTriplet[Int,String]) =>
Iterator((et.dstId, et.srcAttr+1)),
mergeMsg = (a:Int,b:Int) => math.max(a,b)
)
g.vertices.foreach(println(_))
```
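To trace what this example computes, here is a plain-Scala re-implementation of the same superstep loop (our own simplified sketch, not Spark's Pregel; it assumes the graph is a DAG so the loop terminates). Every vertex starts at 0, each active edge propagates `srcAttr + 1`, messages are merged with `max`, so each vertex ends up with the length of the longest path reaching it.

```scala
// Simplified BSP loop mirroring the Pregel example above (DAG only).
def longestPathLengths(vertexIds: Seq[Long],
                       edges: Seq[(Long, Long)]): Map[Long, Int] = {
  var state  = vertexIds.map(_ -> 0).toMap   // mapVertices((vid, vd) => 0)
  var active = vertexIds.toSet               // every vertex runs vprog in superstep 0
  while (active.nonEmpty) {
    // sendMsg: edges whose source was just updated send srcAttr + 1 to dst
    val msgs = edges
      .collect { case (src, dst) if active(src) => (dst, state(src) + 1) }
      .groupBy(_._1)
      .map { case (v, ms) => (v, ms.map(_._2).max) }   // mergeMsg: math.max
    // vprog: math.max(vd, a); only vertices that actually changed stay active
    val updated = msgs.filter { case (v, m) => m > state(v) }
    state  = state ++ updated
    active = updated.keySet
  }
  state
}
```

On the lab graph this produces 0 for Ann (no incoming edges) and 4 for vertex 5, the same values the Spark code prints.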
---
#### Let's compare 3 approaches on concrete single task
Task: update the nodes with the following rule: if a node has an "is-friends-with" relation, mark it as "has friend".
<details>
<summary>
<i>map approach spoiler</i>
</summary>
<p>It is easy to do with triplets:</p>
<pre><code>myGraph.mapTriplets(t => t.attr == "is-friends-with").edges.foreach(println(_))</code></pre>
<p>But to update vertices using the edges' information we need a few more steps:</p>
<pre><code>val friendlyVertices = myGraph.edges.filter(_.attr == "is-friends-with").map(_.srcId).collect.toList
myGraph.mapVertices((v, s) => friendlyVertices.contains(v)).vertices.foreach(println(_))</code></pre>
</details>
<details>
<summary>
<i>aggregateMessages spoiler</i>
</summary>
<pre><code>myGraph.aggregateMessages[Boolean](c => c.sendToSrc(c.attr == "is-friends-with"), (a, b) => a || b)</code></pre>
<p>Note: <br/>
1) The function returns a VertexRDD; if we need a Graph, we should create it again using the new vertices and the old edges <br/>
2) Vertices without edges are entirely absent from the function's result (Hint: use join(); see the example in the reference book [1, p. 71] or the same code in this tutorial)</p>
</details>
<details>
<summary>
<i>pregel spoiler</i>
</summary>
<pre><code>Pregel(graph = myGraph.mapVertices((vid,vd) => false),
initialMsg = false,
maxIterations = 1,
activeDirection = EdgeDirection.In
)(
vprog = (id,vd,a) => vd || a,
sendMsg = et =>
Iterator((et.srcId, et.attr=="is-friends-with")),
mergeMsg = (a,b) => a||b
).vertices.foreach(println(_))</code></pre>
<p>Feels like shooting birds with an AK-47</p>
</details>
Which one is the most relevant?
### PageRank

PageRank is the first algorithm used by Google to order search results. PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.
The formula and details are on the [wiki](https://en.wikipedia.org/wiki/PageRank) (part of the lab exercise).
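As a reference point, here is a small plain-Scala power-iteration sketch of PageRank (our own simplification; `pageRankSim` is an invented name, and the unnormalized `(1 - d) + d * sum` form is assumed to mirror the per-vertex values GraphX reports).

```scala
// Power-iteration PageRank: each vertex splits its rank among its out-links.
def pageRankSim(vertices: Seq[Long],
                edges: Seq[(Long, Long)],
                iterations: Int = 20,
                d: Double = 0.85): Map[Long, Double] = {
  val outDeg = edges.groupBy(_._1).map { case (v, es) => (v, es.size) }
  var rank   = vertices.map(_ -> 1.0).toMap
  for (_ <- 1 to iterations) {
    // each vertex contributes rank / outDegree to every neighbour it links to
    val contrib = edges
      .map { case (src, dst) => (dst, rank(src) / outDeg(src)) }
      .groupBy(_._1)
      .map { case (v, cs) => (v, cs.map(_._2).sum) }
    rank = vertices.map(v => v -> ((1 - d) + d * contrib.getOrElse(v, 0.0))).toMap
  }
  rank
}
```

On the lab graph this ranks vertex 5 highest (it collects links from vertices 3 and 4), which matches the statement in the graded part below.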
----
## Graded part:
1) Using the code [docs](https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala), provide your explanation for all parameters of the ```pregel``` function
2) Provide 2 examples of applying GraphX to big data processing (which tasks could be solved).
3) **UPDATED** For current topology, the most popular node (by PageRank metric) is #5 ("Went to gym this morning"). Change graph structure such that in new graph the most popular node will be #1 (Ann). Provide pictures with console output of ``` myGraph.pageRank(0.001).vertices.foreach(println)``` before and after modification. Describe (1-2 sentences) how you achieved it.
## Additional reading
1. [GraphX in Action](https://drive.google.com/file/d/1wztlOHrLihsWNGLnsLfFlVeFWXAoHnB-/view?usp=sharing)
2. Neo4j (NoSQL Graph DB) [binding with Spark](https://neo4j.com/developer/apache-spark/), [usage example](https://medium.com/data-science-school/practical-apache-spark-in-10-minutes-part-7-graphx-and-neo4j-b6b01cffa4fd)
3. [Comparison of RDBMS and Neo4j in GIS production (rus)](https://www.slideshare.net/profyclub_ru/ss-27999513?ref=https://techno.2gis.ru/lectures/7)
4. [Spark and GraphX coupling. Does not work with Spark version 2.3.2.](https://medium.com/data-science-school/practical-apache-spark-in-10-minutes-part-7-graphx-and-neo4j-b6b01cffa4fd)
5. [Paper describing the concepts of graph DBs](https://www.scitepress.org/papers/2018/68269/68269.pdf)
6. [(rus) Databases. Graphs and their storage, using Neo4j as an example](https://youtu.be/78ucMUzdp5c?t=1021)
7. [Lecture notes of Gremlin and Tinkerpop developer Marko Rodriguez](https://markorodriguez.com/lectures/)