
Kris is a developer with a passion for combining technologies to create new possibilities for the people around him. Coming from a Java and GIS background and being a fan of open source software, Kris started working with distributed systems and graph databases in the last couple of years. He’s currently working on visualizing Big Data with the help of Hadoop and Neo4j.

Combining Neo4j and Hadoop (Part 1)

11.22.2012

Why combine these two different things?

Hadoop is good at crunching data, but the flat files it produces as end-results don’t present well to the customer, and it’s hard to visualize your network data in Excel.

Neo4j is perfect for working with our networked data, and we use it a lot when visualizing our different sets of data.
So we prepare our dataset with Hadoop and import it into Neo4j, the graph database, so that we can query and visualize the data.
We want to look at our dataset in a lot of different ways, so every few days we tend to create a new extract of the data with some new properties to look at.

This blog post is about how we combined Hadoop and Neo4j, and describes the phases we went through in our search for the optimal solution.

So this is how we started.

Phase I:

- Use Hive to prepare the data. For those of you not familiar with the Hadoop ecosystem, Hive is a tool that lets you write SQL queries which are transformed into map/reduce jobs. We use it to create a nodes table and an edges table out of our data.

The end-result of this series of queries is two sets of files which we can copy from our Hadoop cluster to our local machine.

The nodes table/file looks something like this:

NodeId    Property1    Property2    PropertyN
AAA       nameOfA      amountOfA    someAThing
BBB       nameOfB      amountOfB    someBThing
CCC       nameOfC      amountOfC    someCThing
DDD       nameOfD      amountOfD    someDThing

The edges table/file looks something like this:

fromNodeId    toNodeId    EdgeProperty1    EdgePropertyN
AAA           BBB         someDate1        someNumber1
AAA           DDD         someDate2        someNumber2
BBB           DDD         someDate3        someNumber3
CCC           BBB         someDate4        someNumber4
DDD           BBB         someDate5        someNumber5
DDD           CCC         someDate6        someNumber6

- To load these sets into Neo4j we use the BatchInserter.
At the time we built our first importer, Neo4j was at version 1.6-ish. So we wrote some code and started the import.
The dataset we’re talking about has some 30 million nodes with 9 properties each and about 650 million edges with 4 properties each.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.neo4j.kernel.impl.batchinsert.BatchInserter;
import org.neo4j.kernel.impl.batchinsert.BatchInserterImpl;
 
BatchInserter db = new BatchInserterImpl(<outputPath>, <config>);
 
// Maps our own node ids to the node ids the batch inserter hands out
long[] idCache = new long[<nrOfNodes>];
 
// Nodes
BufferedReader reader = new BufferedReader(new InputStreamReader(<InputStreamThingy>), 100 * 1024 * 1024);
String line;
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t");
    int myOwnId = Integer.parseInt(parts[0]);
 
    //some property magic goes here
    idCache[myOwnId] = db.createNode(<propertiesMap>);
}
reader.close();
 
// Edges
reader = new BufferedReader(new InputStreamReader(<InputStreamThingyforEdges>), 100 * 1024 * 1024);
while ((line = reader.readLine()) != null) {
    String[] parts = line.split("\t");
    int fromNodeOwnId = Integer.parseInt(parts[0]);
    int toNodeOwnId = Integer.parseInt(parts[1]);
 
    //some property magic goes here
    db.createRelationship(idCache[fromNodeOwnId], idCache[toNodeOwnId], <RelationshipType>, <propertiesMap>);
}
reader.close();
 
// Flush everything to disk; the store is not complete without this
db.shutdown();
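
The "//some property magic goes here" comments stand for turning the remaining tab-separated columns into the properties map that goes into createNode and createRelationship. As a minimal sketch of what that could look like (the property names below are just an illustration based on the example table above, not our actual schema):

import java.util.HashMap;
import java.util.Map;
 
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("name", parts[1]);    // Property1 column
properties.put("amount", parts[2]);  // Property2 column
// ... one put per remaining column, then pass the map to db.createNode(properties)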

We import these nodes and edges on our desktop machine with 16 GB of RAM, which takes about a full 20 hrs to complete.
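
How fast the batch inserter runs depends heavily on the <config> map it is given, which controls how much of each Neo4j store file gets memory-mapped. We won't list our exact settings here, but a configuration for a machine like this could look something like the sketch below (the keys are standard Neo4j 1.x memory-mapping settings; the values are illustrative guesses, not our actual numbers):

import java.util.HashMap;
import java.util.Map;
 
Map<String, String> config = new HashMap<String, String>();
// How much of each store file to memory-map during the import
config.put("neostore.nodestore.db.mapped_memory", "1G");
config.put("neostore.relationshipstore.db.mapped_memory", "6G");
config.put("neostore.propertystore.db.mapped_memory", "4G");
config.put("neostore.propertystore.db.strings.mapped_memory", "2G");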

Phase II:

So we need to speed things up a little. In this phase we remove the step where we copy the nodes and edges files from our Hadoop cluster to our local machine, and instead read them straight from the cluster.

- Still use Hive to prepare the data
- Make the importer read the files from the cluster directly (no copy needed anymore)

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
FileSystem fs = FileSystem.get(<hadoop configuration>);
 
// The Hive output is a directory of part-files, so glob them all
FileStatus[] files = fs.globStatus(new Path(<pathToNodesTableOrFile>));
for (FileStatus file : files) {
    DataInputStream dis = new DataInputStream(fs.open(file.getPath()));
    BufferedReader reader = new BufferedReader(new InputStreamReader(dis), 100 * 1024 * 1024);
 
    String line;
    while ((line = reader.readLine()) != null) {
        // same node and edge handling as in Phase I
    }
    reader.close();
}
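
The <hadoop configuration> placeholder is just the standard Hadoop client Configuration object. A minimal sketch (the file locations are illustrative and depend on how your cluster client is set up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
 
Configuration conf = new Configuration();
// Pick up the cluster settings; paths depend on your installation
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));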

- Run the importer on one of the worker nodes of the Hadoop cluster (it has 32 GB; make sure no Hadoop processes are running so we can take the full 32 GB)
- This takes about a full 16 hrs to complete, and we still need to get the data from that machine to the machine where we run the Neo4j database (took about 2 hrs for about 80 GB)

Phase III:

We achieved very little improvement in the total time it takes to create the full Neo4j database, so we needed to try something more.
On the mailing list there were some rumors that the current version of Neo4j (1.8) had some major performance improvements in the batch inserter.
So we upgraded the importer code to use version 1.8 of Neo4j (it was on 1.6).

The only change in our own code was in the way we create the BatchInserter. The other optimizations are inside the Neo4j code itself, mainly in the way the paging subsystem in the PersistenceWindowPool works.

import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;
 
BatchInserter db = BatchInserters.inserter(<outputPath>, <config>);

- Still use Hive to prepare the data
- Run the importer on one of the worker nodes of the Hadoop cluster

This takes about 3 hrs to complete, and we still need to get the data from that machine to the machine where we run the Neo4j database (took about 2 hrs for about 80 GB).

This is where we are now. But we still have something up our sleeves.
While we were waiting for these imports to complete, we decided to see whether we could make the batch inserter work in a distributed way, so we could use our 16-node Hadoop cluster to create the initial Neo4j database faster.

In the next blog post I will go into the details of that quest.

Published at DZone with permission of Kris Geusebroek, author and DZone MVB. (source)
