Big Data/BI Zone is brought to you in partnership with:

Big Graph Data on the Hortonworks Big Data Platform

01.06.2013
| 3533 views |
  • submit to reddit

hortonworks-aurelius-header

This is an archival repost of a blog post that was originally published on Hortonworks’ blog.

The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storageprocessingmonitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

HDP MonitorIn Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures makes global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing theright data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presentsAurelius‘ Big Graph Data technology suite in concert with Hortonworks Data Platform. For a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

Aurelius Graph ClusterThe Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly-scalable graph database optimized for serving real-time results to thousands of concurrent users and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play within HDP are diagrammed below.Aurelius and HDP Integration

A Graph Representation of GitHub

Octocat socialiteGitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.

GitHub Octocat

GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graphschema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.GitHub Schema

Deploying a Graph-Based GitHub

Amazon EC2To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a 4 m1.xlarge machine cluster on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph — though growing). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ exportLC_ALL="C"
$ exportJAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch


The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of theBlueprints graph API (a simple, single threaded insertion tool).
1 $ exportJAVA_OPTIONS="-Xmx12G"
2 $ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

 A Distributed Graph DatabaseTitan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase andCassandra (see documentation). Graph storage and processing in a clustered environment is made possible because of numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

01 gremlin> g = TitanFactory.open('bin/hbase.local') 
02 ==>titangraph[hbase:127.0.0.1]
03 gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
04 ==>blueprints=413
05 ==>gremlin=69
06 ==>titan=49
07 ==>pipes=49
08 ==>rexster=40
09 ==>frames=26
10 ==>faunus=23
11 ==>furnace=9
12 ==>tinkubator=5
13 ==>homepage=1

Github Gremlin Traversal

The above query can be taken 2-steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Given that the number of people committing to a repository could be many and typically, a collaborator has pushed numerous commits, a max of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding thecombinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

1 gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500].name.groupCount.cap.next().sort{-it.value}[0..4]
2 ==>lvca=877
3 ==>spmallette=504
4 ==>sgomezvillamor=424
5 ==>mbroecheler=356
6 ==>joshsh=137

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. Then to who else (not Marko) looks at those repositories and finally counts those people and returns the top 5 names of the sorted result set. In fact, Marko andStephen (spmallette) are long time collaborators and thus, have similar tastes in software.

1 gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount.cap.next().sort{-it.value}[0..4]
2 ==>spmallette=3
3 ==>alex-wajam=3
4 ==>crimeminister=2
5 ==>redgetan=2
6 ==>snicaise=2

1 gremlin> g.V('name','okram').out('created').has('type','Comment').count()
2 ==>159
3 gremlin> g.V('name','okram').out('created').has('type','Issue').count() 
4 ==>176
5 gremlin> g.V('name','okram').out('edited').count() 
6 ==>85

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.

Faunus: Graph Analytics Engine

 Graph Computing with HadoopEvery Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormatis Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as aSequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

01 faunus$ cat bin/titan-seq.properties
02 faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
03 hbase.zookeeper.quorum=10.68.65.161
04 hbase.mapreduce.inputtable=titan
05 hbase.mapreduce.scan.cachedrows=75
06 faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
07 faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
08 faunus.output.location=full-seq
09 faunus.output.location.overwrite=true
10
11 faunus$ bin/gremlin.sh
12
13 \,,,/
14 (o o)
15 -----oOOo-(_)-oOOo-----
16 gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
17 ==>faunusgraph[titanhbaseinputformat]
18 gremlin> g._().toString()
19 ==>[IdentityMap]
20 gremlin> g._()
21 12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
22 12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
23 12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
24 12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
25 12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
26 ...
27 gremlin> hdfs.ls()
28 ==>rwx------ ubuntu supergroup 0 (D) .staging
29 ==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
30 gremlin> hdfs.ls('full-seq/job-0')
31 ==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
32 ==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
33 ==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
34 ==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
35 ==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
36 ==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
37 ...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

01 gremlin> g.V.type.groupCount
02 ==>Gist  780626
03 ==>Issue  1298935
04 ==>Organization 36281
05 ==>Comment  2823507
06 ==>Commit  20338926
07 ==>Repository  2075934
08 ==>User  983384
09 ==>WikiPage  252915
10 gremlin> g.E.label.groupCount 
11 ==>deleted  170139
12 ==>on  7014052
13 ==>owns  180092
14 ==>pullRequested  930796
15 ==>pushed  27538088
16 ==>to  27719774
17 ==>added  181609
18 ==>created  10063346
19 ==>downloaded  122157
20 ==>edited  276609
21 ==>forked  1015435
22 ==>of  536816
23 ==>appliedForkTo  1791
24 ==>followed  753451
25 ==>madePublic  26602
26 ==>watched  2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or none at all. To determine the distribution of contributions, Faunus can be used to compute the out degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

01 gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
02 ==>1 57423
03 ==>10  8856
04 ==>100  527
05 ==>1000  9
06 ==>1004  5
07 ==>1008  6
08 ==>1011  6
09 ==>1015  6
10 ==>1019  3
11 ==>1022  9
12 ==>1026  2
13 ==>1033  6
14 ==>1037  4
15 ==>104  462
16 ==>1040  3
17 ==>...

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs — real-world graphs are not random structures and are composed of few “hubs” and numerous “satellites.”
github-pushed-out-degree-distribution

Hortonworks with GremlinMore sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information, so that computational resources are not wasted loading needless aspects of the graph. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below, walks the “co-watched” graph 2 times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

01 gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
02 ...
03 12/12/13 11:08:13 INFO mapred.JobClient:  com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
04 12/12/13 11:08:13 INFO mapred.JobClient:  VERTICES_DROPPED=19377850
05 12/12/13 11:08:13 INFO mapred.JobClient:  VERTICES_KEPT=2074099
06 12/12/13 11:08:13 INFO mapred.JobClient:  com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
07 12/12/13 11:08:13 INFO mapred.JobClient:  OUT_EDGES_DROPPED=55971128
08 12/12/13 11:08:13 INFO mapred.JobClient:  OUT_EDGES_KEPT=1934706
09 ...
10 gremlin> g = g.getNextGraph()
11 gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class).order(F.decr,'github_name')
12 ==>backbone  4173578345
13 ==>html5-boilerplate 4146508400
14 ==>normalize.css 3255207281
15 ==>django  3168825839
16 ==>three.js  3078851951
17 ==>Modernizr 2971383230
18 ==>rails 2819031209
19 ==>httpie  2697798869
20 ==>phantomjs 2589138977
21 ==>homebrew  2528483507
22 ...

Conclusion

AureliusThis post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of
Testing Culture on a Social Coding Site
.” Leibniz Universität Hannover, Software Engineering Group: Technical Report, Septeber 2012.

Alder, B. T., de Alfaro, L., Pye, I., Raman V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Published at DZone with permission of Marko Rodriguez, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)