
[Paper] GraphChi: Large-Scale Graph Computation on Just a PC

tags: research-GraphRC

PPT slides

Google Slides

Graph processing problems

  • Social networks
  • Web graphs
  • Protein interaction graphs

Why is graph processing hard?

  • Hard to divide and conquer this problem
    • Partitioning the graph across cluster nodes
    • Finding efficient graph cuts that minimize communication between nodes, and are also balanced, is hard [27]
    • [27] Community Structure in Large Networks: Natural Cluster Sizes and the Absence of Large Well-Defined Clusters
  • Indeed, many real-world graphs have a substantial amount of inherent locality. For example, webpages are clustered under domains, and people have more connections in social networks inside their geographical region than outside it.
  • Unfortunately, the locality of real-world graphs is limited, because the number of edges crossing local clusters is also large.
  • Graphs may also be dynamic and evolve over time.
  • Graphs CANNOT be readily decomposed into smaller parts that can be processed in parallel
    • MapReduce is inefficient for graph processing [13,30,31]

Vertex-centric model definition

  • Directed sparse graph $G = (V, E)$
  • Associate a value with each vertex $v \in V$, and with each edge $e = (src, dest) \in E$
  • Given an edge $e = (u, w)$, $e$ is $u$'s out-edge and $w$'s in-edge
  • (Figure: graph $G$ with vertices $u$ and $w$ and a single directed edge $e$ from $u$ to $w$)
  • Example: PageRank Algorithm
    • Algorithm explained by this YouTube link
    • Code
      void update(graphchi_vertex<VertexDataType, EdgeDataType> &v, graphchi_context &ginfo) {
          float sum=0;
          if (ginfo.iteration == 0) {
              /* On first iteration, initialize vertex and out-edges.
                 The initialization is important,
                 because on every run, GraphChi will modify the data in the edges on disk.
               */
              for(int i=0; i < v.num_outedges(); i++) {
                  graphchi_edge<float> * edge = v.outedge(i);
                  edge->set_data(1.0 / v.num_outedges());
              }
              v.set_data(RANDOMRESETPROB);
          } else {
              /* Compute the sum of neighbors' weighted pageranks by
                 reading from the in-edges. */
              for(int i=0; i < v.num_inedges(); i++) {
                  float val = v.inedge(i)->get_data();
                  sum += val;
              }

              /* Compute my pagerank */
              float pagerank = RANDOMRESETPROB + (1 - RANDOMRESETPROB) * sum;

              /* Write my pagerank divided by the number of out-edges to
                 each of my out-edges. */
              if (v.num_outedges() > 0) {
                  float pagerankcont = pagerank / v.num_outedges();
                  for(int i=0; i < v.num_outedges(); i++) {
                      graphchi_edge<float> * edge = v.outedge(i);
                      edge->set_data(pagerankcont);
                  }
              }

              /* Keep track of the progression of the computation.
                 GraphChi engine writes a file filename.deltalog. */
              ginfo.log_change(std::abs(pagerank - v.get_data()));

              /* Set my new pagerank as the vertex value */
              v.set_data(pagerank);
          }
      }
      

Bulk-Synchronous Parallel (BSP) model

  • From "A Bridging Model for Parallel Computation", Communications of the ACM, 1990
  • BSP is a bridging model between parallel computation and hardware implementation
  • Goal: map high-level programs onto actual machines in a great variety of contexts, losing little efficiency by relying on this single model
  • Detail (see the sketch after this list):
    • A computation consists of a sequence of supersteps. In each superstep, each component is allocated a task consisting of some combination of local computation steps and message-passing steps.
    • After each period of L time units, a global check is made to determine whether the superstep has been completed by all the components.
    • If yes, the machine proceeds to the next superstep; if not, the next period of L units is allocated to the unfinished superstep.
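
To make the superstep structure concrete, here is a minimal sketch (not from the paper; it assumes C++20 for std::barrier, and the per-component work is only a printf placeholder):

    #include <barrier>
    #include <cstdio>
    #include <thread>
    #include <vector>

    int main() {
        const int num_components = 4;   // parallel components (threads in this sketch)
        const int num_supersteps = 3;
        std::barrier sync_point(num_components);   // the "global check" between supersteps

        auto component = [&](int id) {
            for (int step = 0; step < num_supersteps; step++) {
                // 1. Local computation on this component's data (placeholder).
                std::printf("component %d: local work in superstep %d\n", id, step);
                // 2. Message passing to other components would happen here (omitted).
                // 3. Superstep boundary: wait until every component has finished.
                sync_point.arrive_and_wait();
            }
        };

        std::vector<std::thread> workers;
        for (int id = 0; id < num_components; id++) workers.emplace_back(component, id);
        for (auto &t : workers) t.join();
        return 0;
    }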

Sparse graph encoding

Example graph







(Figure: example graph $G$ with weighted directed edges a→a (10), a→b (20), b→b (30), b→d (40), c→c (50), c→d (60), c→e (70), d→f (80); vertices e and f have no out-edges)

Adjacency matrix

$$M = \begin{array}{c|cccccc}
  & a & b & c & d & e & f \\ \hline
a & 10 & 20 & 0 & 0 & 0 & 0 \\
b & 0 & 30 & 0 & 40 & 0 & 0 \\
c & 0 & 0 & 50 & 60 & 70 & 0 \\
d & 0 & 0 & 0 & 0 & 0 & 80
\end{array}$$

Transpose of adjacency matrix

$$M^T = \begin{array}{c|cccc}
  & a & b & c & d \\ \hline
a & 10 & 0 & 0 & 0 \\
b & 20 & 30 & 0 & 0 \\
c & 0 & 0 & 50 & 0 \\
d & 0 & 40 & 60 & 0 \\
e & 0 & 0 & 70 & 0 \\
f & 0 & 0 & 0 & 80
\end{array}$$

CSR (Compressed Sparse Row)

  • Encoding (assume zero-indexed arrays)
    • W = {10, 20, 30, 40, 50, 60, 70, 80}
    • COL_INDEX = {0, 1, 1, 3, 2, 3, 4, 5}
    • ROW_INDEX = {0, 2, 4, 7, 8}
  • Find the out-edges of vertex $b$ (row 1 of matrix $M$; see the code sketch after this list)
    • ROW_INDEX[1:3] = {2, 4}, so the out-edges of b occupy indices 2..3 of W and COL_INDEX
    • W[2:4] = {30, 40}
    • COL_INDEX[2:4] = {1, 3}
  • Find the in-edges of vertex $b$ (column 1 of matrix $M$)
    • Search all rows
  • Fast loading of out-edges of a vertex from the disk
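
A minimal C++ sketch (not from the paper) of the CSR out-edge lookup described above, using the same arrays; the printf output is only for illustration:

    #include <cstdio>
    #include <vector>

    int main() {
        // CSR encoding of the example graph (vertices a..f mapped to 0..5).
        std::vector<float> W         = {10, 20, 30, 40, 50, 60, 70, 80};
        std::vector<int>   COL_INDEX = {0, 1, 1, 3, 2, 3, 4, 5};
        std::vector<int>   ROW_INDEX = {0, 2, 4, 7, 8};

        int v = 1;  // vertex b
        // The out-edges of v occupy indices [ROW_INDEX[v], ROW_INDEX[v+1]) of W and COL_INDEX.
        for (int i = ROW_INDEX[v]; i < ROW_INDEX[v + 1]; i++) {
            std::printf("b -> vertex %d, weight %.0f\n", COL_INDEX[i], W[i]);
        }
        // Prints "b -> vertex 1, weight 30" and "b -> vertex 3, weight 40".
        return 0;
    }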

CSC (Compressed Sparse Column)

  • Encoding (assume zero-indexed arrays); values are stored column by column, i.e. grouped by destination
    • W = {10, 20, 30, 50, 40, 60, 70, 80}
    • ROW_INDEX = {0, 0, 1, 2, 1, 2, 2, 3}
    • COL_INDEX = {0, 1, 3, 4, 6, 7, 8}
  • Find the in-edges of vertex $b$ (column 1 of matrix $M$, i.e. row 1 of $M^T$; see the code sketch after this list)
    • COL_INDEX[1:3] = {1, 3}, so the in-edges of b occupy indices 1..2 of W and ROW_INDEX
    • W[1:3] = {20, 30}
    • ROW_INDEX[1:3] = {0, 1}
  • Find the out-edges of vertex $b$ (row 1 of matrix $M$)
    • Search all columns
  • Fast loading of in-edges of a vertex from the disk
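
The symmetric in-edge lookup for CSC, again as a small sketch (not from the paper) using the arrays above:

    #include <cstdio>
    #include <vector>

    int main() {
        // CSC encoding of the same example graph (values grouped by destination column).
        std::vector<float> W         = {10, 20, 30, 50, 40, 60, 70, 80};
        std::vector<int>   ROW_INDEX = {0, 0, 1, 2, 1, 2, 2, 3};
        std::vector<int>   COL_INDEX = {0, 1, 3, 4, 6, 7, 8};

        int v = 1;  // vertex b
        // The in-edges of v occupy indices [COL_INDEX[v], COL_INDEX[v+1]) of W and ROW_INDEX.
        for (int i = COL_INDEX[v]; i < COL_INDEX[v + 1]; i++) {
            std::printf("vertex %d -> b, weight %.0f\n", ROW_INDEX[i], W[i]);
        }
        // Prints "vertex 0 -> b, weight 20" and "vertex 1 -> b, weight 30".
        return 0;
    }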

Comparison

  • CSR format
    • Advantage: fast row access, i.e. the out-edges of a vertex
  • CSC format
    • Advantage: fast column access, i.e. the in-edges of a vertex

Disadvantages of sparse encodings

  • Random access is bad for disk
  • The graph must be stored in BOTH CSR and CSC format for bidirectional access

Graph encoding in GraphChi

  • The following graph can be encoded in two different formats in GraphChi






(Figure: graph $G$ with edges src1→dst1, src1→dst2, src1→dst3, src1→dst4, src2→dst1, src2→dst4)

  • The adjacency shard stores, implicitly, an edge array for each vertex, in order (see the decoding sketch after this list).

    • The edge array of a vertex starts with a variable-sized length word, followed by the list of neighbors. If a vertex has no edges in this shard, a zero-length byte is followed by the number of subsequent vertices with no edges in this shard.
    • src1 4 dst1 dst2 dst3 dst4
      src2 2 dst1 dst4

  • The edge data shard is a flat array of edge values, in a user-defined type. Values must be of constant size. The rows below indicate which edge each value slot corresponds to:

    • src1 dst1
      src1 dst2
      src1 dst3
      src1 dst4
      src2 dst1
      src2 dst4

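
The exact on-disk layout is GraphChi-internal; the following is only a toy decoder for one plausible reading of the adjacency-shard description above, assuming single-byte length words and single-byte vertex ids (the real format uses variable-sized length words and larger ids):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    int main() {
        // Toy adjacency shard: vertex 0 has out-neighbors {1,2,3,4}; vertex 1 has no
        // edges in this shard (length 0, followed by "0 further empty vertices");
        // vertex 2 has out-neighbors {1,4}.
        std::vector<uint8_t> shard = {4, 1, 2, 3, 4,  0, 0,  2, 1, 4};

        size_t pos = 0;
        uint32_t vid = 0;
        while (pos < shard.size()) {
            uint8_t len = shard[pos++];
            if (len == 0) {
                // Zero length word: this vertex has no edges here; the next byte counts
                // how many of the following vertices are also empty and can be skipped.
                uint8_t more_empty = shard[pos++];
                vid += 1 + more_empty;
                continue;
            }
            std::printf("vertex %u:", vid);
            for (uint8_t i = 0; i < len; i++) std::printf(" %u", shard[pos++]);
            std::printf("\n");
            vid++;
        }
        return 0;
    }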

PSW (Parallel Sliding Windows)

  • PSW is an algorithm proposed in this paper to execute any vertex-centric graph processing algorithm
  • PSW can process a graph with mutable edge values efficiently from disk, with only a small number of non-sequential disk accesses, while supporting the asynchronous model of computation

What is "intervals" ?

  • Vertices $V$ are split into $P$ disjoint intervals
    • Each vertex in the graph belongs to exactly ONE interval.
    • Intervals are chosen to balance the number of edges in each shard.
    • Policy: the number of intervals $P$ is chosen so that any one shard can be loaded completely into memory

What is "shards" ?

  • For each interval, we associate a shard. A shard stores ALL the edges whose destination vertex is in that interval
    • Within a shard, edges are stored in order of their source vertex
      • That is why the memory access pattern "slides" instead of being random
    • The memory shard (brown in the paper's figure) is fully loaded into memory
    • A sliding shard (the boxed regions in the figure) is the set of edges whose source vertices belong to the interval currently being processed

Parallel Updates

  • Run the user-defined update-function(v) for each vertex in parallel
  • External determinism guarantees that each execution produces the same result
    • Vertices that have edges with both end-points in the same interval are flagged as critical, and are updated sequentially
    • Non-critical vertices can be safely updated in parallel
  • Each update function computes its result from the most recently written values; for critical vertices this includes values written earlier in the same iteration. This adheres to the asynchronous model of computation (a rough code sketch follows below)
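
A very rough sketch of how one PSW iteration walks over the intervals: load the memory shard of the current interval, read one sliding window from every other shard, run the updates, and write the modified edge values back. This is not GraphChi code; all helper names are hypothetical and the disk I/O is omitted.

    #include <cstdio>
    #include <vector>

    struct Edge { int src, dst; float value; };
    struct Shard { std::vector<Edge> edges; size_t window_pos = 0; };  // edges sorted by src

    // Hypothetical stand-ins for the engine's disk I/O.
    std::vector<Edge> load_memory_shard(const Shard &s) { return s.edges; }
    std::vector<Edge> load_sliding_window(Shard &s, int hi) {
        // Because edges are sorted by source, the edges whose sources fall into the
        // current interval form one contiguous block: the "sliding window".
        std::vector<Edge> out;
        while (s.window_pos < s.edges.size() && s.edges[s.window_pos].src < hi)
            out.push_back(s.edges[s.window_pos++]);
        return out;
    }
    void update_interval(int lo, int hi) {
        std::printf("run update(v) for vertices [%d, %d)\n", lo, hi);
    }

    int main() {
        const int P = 3;                                // number of intervals / shards
        std::vector<int> interval_start = {0, 2, 4, 6}; // interval p = [start[p], start[p+1])
        std::vector<Shard> shards(P);                   // shard p holds edges with dst in interval p

        for (int p = 0; p < P; p++) {                   // one execution interval at a time
            int lo = interval_start[p], hi = interval_start[p + 1];
            std::vector<Edge> in_edges = load_memory_shard(shards[p]);   // all in-edges of interval p
            std::vector<std::vector<Edge>> out_edge_windows;
            for (int q = 0; q < P; q++)
                if (q != p) out_edge_windows.push_back(load_sliding_window(shards[q], hi));
            update_interval(lo, hi);                    // run the update-function over the interval
            // Modified edge values would be written back to the shards here (omitted).
        }
        return 0;
    }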

Example

Loading the graph

  • The graph is divided into three disjoint intervals
    • Interval 1: vertices 1, 2
    • Interval 2: vertices 3, 4
    • Interval 3: vertices 5, 6
  • Only one interval (its memory shard) is fully loaded into memory at a time

Observation

  • Edges are assigned to shards by destination vertex
  • Within each shard, edges are sorted by source vertex

Parallel update

  • Critical vertices
    • Processed sequentially (see the sketch after this list)
    • Vertices 1, 2, 3, 4, 5, 6
  • Non-critical vertices
    • Processed in parallel
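
A small sketch of the criticality rule: a vertex is critical when it has an edge whose both endpoints lie in the interval currently being updated, so those vertices are updated sequentially. The edge list here is hypothetical, not the example's actual edges.

    #include <cstdio>
    #include <set>
    #include <vector>

    struct Edge { int src, dst; };

    bool in_interval(int v, int lo, int hi) { return v >= lo && v <= hi; }

    int main() {
        // Hypothetical edges; the interval being updated is vertices 1..2.
        std::vector<Edge> edges = {{1, 2}, {2, 3}, {3, 4}, {1, 5}};
        int lo = 1, hi = 2;

        std::set<int> critical;
        for (const Edge &e : edges) {
            if (in_interval(e.src, lo, hi) && in_interval(e.dst, lo, hi)) {
                critical.insert(e.src);   // both endpoints inside the interval:
                critical.insert(e.dst);   // these vertices must be updated sequentially
            }
        }
        for (int v : critical) std::printf("critical vertex: %d\n", v);  // prints 1 and 2
        return 0;
    }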

PageRank App tutorial

  • This section teaches you how to run the "pagerank" example app in GraphChi

Env setup

  • Download GraphChi
    • git clone https://github.com/GraphChi/graphchi-cpp.git
  • Build GraphChi
    • cd graphchi-cpp
    • make -j 12, where 12 is the number of cores on your CPU
  • Download the Google web graph from here
    • wget http://snap.stanford.edu/data/web-Google.txt.gz
    • gunzip web-Google.txt.gz to decompress it
    • The decompressed file web-Google.txt contains the following entries
    • # Directed graph (each unordered pair of nodes is saved once): web-Google.txt
      # Webgraph from the Google programming contest, 2002
      # Nodes: 875713 Edges: 5105039
      # FromNodeId    ToNodeId
      0    11342
      0    824020
      0    867923
      0    891835
      11342    0
      11342    27469
      11342    38716
      ...

Run GraphChi on Google web graph

  • Run the page rank algorithm for 100 iterations
    • ./bin/example_apps/pagerank file web-Google.txt niters 100 filetype edgelist
    • Note that web-Google.txt stores the graph as an edge list, hence the filetype edgelist argument

Results

  • The most popular vertices determined by the PageRank algorithm will be shown
    Print top 20 vertices:
    1. 597621 564.27
    2. 41909 562.688
    3. 163075 552.226
    4. 537039 549.065
    5. 384666 480.686
    6. 504140 467.38
    7. 486980 442.841
    8. 605856 438.574
    9. 32163 435.284
    10. 558791 433.215
    11. 551829 428.842
    12. 765334 417.213
    13. 751384 403.905
    14. 425770 380.578
    15. 908351 379.204
    16. 173976 372.106
    17. 7314 365.658
    18. 213432 363.671
    19. 885605 358.624
    20. 819223 355.695

Other

    /**
     * Vertex update function.
     */
    void update(graphchi_vertex<VertexDataType, EdgeDataType> &vertex, graphchi_context &gcontext) {
        if (gcontext.iteration == 0) {
            /* On first iteration, initialize vertex (and its edges). This is usually required, because
               on each run, GraphChi will modify the data files. To start from scratch, it is easiest
               to initialize the program in code. Alternatively, you can keep a copy of initial data files. */
            // vertex.set_data(init_value);
        } else {
            /* Do computation */

            /* Loop over in-edges (example) */
            for(int i=0; i < vertex.num_inedges(); i++) {
                // Do something
                // value += vertex.inedge(i)->get_data();
            }

            /* Loop over out-edges (example) */
            for(int i=0; i < vertex.num_outedges(); i++) {
                // Do something
                // vertex.outedge(i)->set_data(x);
            }

            /* Loop over all edges (ignore direction) */
            for(int i=0; i < vertex.num_edges(); i++) {
                // vertex.edge(i)->get_data();
            }

            // vertex.set_data(new_value);
        }
    }
  • value += vertex.inedge(i)->get_data();
    • Analogous to: SELECT w FROM T WHERE dst == i
    • Normal DRAM: