Grant Ingersoll is a committer on the Apache Lucene and Apache Solr projects, as well as the current Lucene PMC chair. He is also a founding team member of Lucid Imagination.

Scaling Solr Indexing with SolrCloud, Hadoop and Behemoth

03.06.2012

We’ve been doing a lot of work at Lucid lately on scaling out Solr, so I thought I would blog about some of the things we’ve been working on recently and how they might help you handle large indexes with ease. First off, if you want a more basic approach using versions of Solr prior to what will be Solr4, and you don’t care about scaling out Solr indexing to match Hadoop or about fault tolerance, I recommend you read Indexing Files via Solr and Java MapReduce. (Note, you could also modify that code to handle these things. If you need to do that, we’d be happy to help.)

Instead of doing all the extra work of making sure instances are up, etc., however, I am going to focus on two things. The first is SolrCloud, one of the new features of Solr4, whose development has been led primarily by several of my colleagues: Yonik Seeley, Mark Miller and Sami Siren. SolrCloud removes the need to figure out where to send documents when indexing. The second is Behemoth, a convenient Hadoop-based document processing toolkit created by Julien Nioche. Behemoth removes the need to write any Map/Reduce code and handles things like extracting content from PDFs and Word files in a Hadoop-friendly manner (think Apache Tika run in Map/Reduce). It can also output the results to things like Solr, Mahout, GATE and others, as well as annotate the intermediary results. Behemoth isn’t super sophisticated in terms of ETL (Extract-Transform-Load) capabilities, but it is lightweight, easy to extend and gets the job done on Hadoop without you having to spend time worrying about writing mappers and reducers.

Setup

To get started, let’s assume you have a bunch of content. I’ll use the same works of Shakespeare that Adam used in that post, but note that I could use HTML, PDFs, or any other format supported by Tika. I also assume you already know how to set up Hadoop and have a cluster up and running. As in the other post, put the unpacked data into HDFS:

> hadoop fs -put shakespeare /user/hadoop/shakespeare


Next, get a copy of Behemoth that is SolrCloud-aware by checking out the SOLR4 branch of my GitHub fork of the main project. (I only recently pushed it as a branch and have not submitted a pull request yet, since Solr 4 is not officially released, even if it is close to being so.) You can get this fork at https://github.com/gsingers/behemoth/tree/SOLR4. To get the code and set things up, run the following steps (you’ll need Maven to build Behemoth):

> git clone https://github.com/gsingers/behemoth

> cd behemoth

> git checkout SOLR4

> mvn package


While Maven is downloading the entirety of the Java world and packaging up the Hadoop job jars for Behemoth, let’s get Solr up and running in SolrCloud mode. (SolrCloud mode is completely optional, which is one of the things I really like about it.) To get the latest Lucene/Solr code and set things up, do the following:

> git clone git://git.apache.org/lucene-solr.git

> cd lucene-solr/solr

> ant example

> cd example


At this point, we should have Solr built, so let’s start up two instances that can talk to each other via SolrCloud and will do things like fail over automatically. We’ll set up a two-node cluster on two different ports of the same machine, with an embedded instance of Zookeeper running inside the first node. (In many situations, you’ll likely run Zookeeper standalone and put each Solr instance on its own machine.)

> java -Dbootstrap_confdir=solr/conf -Dcollection.configName=hadoop -DzkRun -DnumShards=2 -jar start.jar

In a second terminal, change into the example directory again and run:

> java -Dsolr.data.dir=/tmp/solr-hadoop -Djetty.port=7574 -DzkHost=localhost:9983 -jar start.jar

Point your browser at http://localhost:8983/solr/ to verify things are working. You should see Solr’s shiny new built-in UI.


A couple of notes on this setup:

  • As of now, you have to declare how many shards you want to use (in this case 2), but in the not-too-distant future you should be able to avoid this, as Solr will likely employ a microsharding approach, an index-splitting approach, or both. Even now, there are some tricks you can use to work around it, such as installing a new core into an existing node and then having new nodes replicate that new core. That node will be slightly more saturated, but this likely isn’t a big deal given that you can spread the work out more.
  • Behemoth uses SolrJ’s new CloudSolrServer client, which is aware of the cluster state kept in Zookeeper and automatically figures out where to send documents, unlike StreamingUpdateSolrServer, which does not. Thus, if a particular node goes down, Solr should keep right on indexing, just as you would expect of a solution that works with Hadoop. In the near future, CloudSolrServer should also be leader-aware, which should make indexing with it even more efficient; see SOLR-3154 to track this idea. By the way, the Behemoth CloudSolrServer code looks like:
    // zkHost is the Zookeeper address (for example, localhost:9983); collection is the collection name (for example, collection1)
    solr = new CloudSolrServer(zkHost);
    ((CloudSolrServer) solr).setDefaultCollection(collection);
  • In this setup, I have 2 shards and 2 instances of Solr running, meaning each instance will be a “leader” in SolrCloud parlance. If I added nodes, they would automatically join the cluster and replicate one of the two shards. Additionally, they would stay in sync with the leaders without you having to do any of the old-fashioned Solr replication setup.
  • Instead of writing to Solr in the mapper, Behemoth uses an IdentityMapper together with a custom OutputFormat, whose Hadoop RecordWriter handles writing the documents to Solr. I’m not enough of a Hadoop expert to comment on this approach versus doing it directly in the Mapper (or Reducer), but it does strike me as an interesting alternative, and I’d appreciate feedback from others on it. I believe Cassandra’s Hadoop integration does something similar to Behemoth.
  • SOLR-1301 takes the approach of using the EmbeddedSolrServer to build the indexes directly in Hadoop. This presumably saves on network traffic, but it won’t work yet with Solr4 due to the need to version documents as they come in (at least that is my understanding). It also, of course, makes distributed indexing depend on Hadoop, which may not be something everyone wants, especially in “smaller” environments where one doesn’t want the overhead and headaches of maintaining a Hadoop cluster. (Keep in mind that a single Solr instance can often handle upwards of 100-200 million documents, so even a small cluster of 30 nodes can handle 1 billion documents: 10 shards of 100M documents each, with 3 nodes per shard.)
  • For the second instance, I put the index in a different directory (-Dsolr.data.dir) because I was starting Solr out of the same home area as the first and didn’t want to overwrite its data, which defaults to ./solr/data. If you were working on a different machine, or made a copy of the example directory as directed on the wiki, you wouldn’t need that argument.
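To make the sizing arithmetic in the SOLR-1301 note concrete, here is a small back-of-the-envelope calculation in Java. The documents-per-shard and nodes-per-shard values are planning assumptions you would arrive at by benchmarking, not Solr settings, and ClusterSizing is a hypothetical helper, not part of Solr.

```java
// Back-of-the-envelope SolrCloud sizing using the figures from the notes above.
// docsPerShard and nodesPerShard are planning inputs (assumptions), not Solr configuration.
final class ClusterSizing {

    // Number of shards needed so that no shard holds more than docsPerShard documents.
    static long shardsNeeded(long totalDocs, long docsPerShard) {
        return (totalDocs + docsPerShard - 1) / docsPerShard; // ceiling division
    }

    // Each shard is served by nodesPerShard Solr nodes (its leader plus replicas).
    static long nodesNeeded(long shards, int nodesPerShard) {
        return shards * nodesPerShard;
    }

    public static void main(String[] args) {
        long totalDocs = 1_000_000_000L;  // 1 billion documents
        long docsPerShard = 100_000_000L; // 100M per shard, inside the 100-200M single-instance range
        int nodesPerShard = 3;            // 3 nodes per shard

        long shards = shardsNeeded(totalDocs, docsPerShard);
        long nodes = nodesNeeded(shards, nodesPerShard);
        // prints "10 shards x 3 nodes = 30 nodes"
        System.out.println(shards + " shards x " + nodesPerShard + " nodes = " + nodes + " nodes");
    }
}
```

With 200M documents per shard instead, the same billion documents would need 5 shards, or 15 nodes at 3 nodes per shard.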

If you want to see the state of the cluster, go to http://localhost:8983/solr/#/cloud and select the clusterstate.json node under the Zookeeper data, which should look something like:

{"collection1":{
    "shard1":{"beast:8983_solr_":{
        "shard":"shard1",
        "leader":"true",
        "state":"active",
        "core":"",
        "collection":"collection1",
        "node_name":"beast:8983_solr",
        "base_url":"http://beast:8983/solr"}},
    "shard2":{"beast:7574_solr_":{
        "shard":"shard2",
        "leader":"true",
        "state":"active",
        "core":"",
        "collection":"collection1",
        "node_name":"beast:7574_solr",
        "base_url":"http://beast:7574/solr"}}}}
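Conceptually, sending a document straight to the right node involves two lookups: which shard the document belongs to, and which node currently leads that shard according to clusterstate.json (the kind of leader awareness SOLR-3154 tracks for CloudSolrServer). The sketch below illustrates that idea against the cluster state shown above. It is not SolrJ’s code: String.hashCode() modulo the shard count is a stand-in for Solr’s own hashing, so real document placement will differ, and the document IDs are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: route a document ID to a shard, then find that shard's active leader
// in a simplified copy of clusterstate.json. Not SolrJ's actual routing code.
final class ShardRouting {

    // One replica entry from clusterstate.json, reduced to the fields used here.
    static final class Replica {
        final String baseUrl;
        final boolean leader;
        final String state;

        Replica(String baseUrl, boolean leader, String state) {
            this.baseUrl = baseUrl;
            this.leader = leader;
            this.state = state;
        }
    }

    // Stand-in hash: String.hashCode() mapped to a shard index in [0, numShards).
    // floorMod keeps the index non-negative even when hashCode() is negative.
    static int shardIndex(String docId, int numShards) {
        return Math.floorMod(docId.hashCode(), numShards);
    }

    // Base URL of the shard's active leader, or null if no active leader is known.
    static String leaderUrl(List<Replica> replicas) {
        for (Replica r : replicas) {
            if (r.leader && "active".equals(r.state)) {
                return r.baseUrl;
            }
        }
        return null;
    }

    // The two-shard cluster state from the example above.
    static Map<String, List<Replica>> exampleClusterState() {
        Map<String, List<Replica>> shards = new LinkedHashMap<>();
        shards.put("shard1", List.of(new Replica("http://beast:8983/solr", true, "active")));
        shards.put("shard2", List.of(new Replica("http://beast:7574/solr", true, "active")));
        return shards;
    }

    public static void main(String[] args) {
        Map<String, List<Replica>> state = exampleClusterState();
        List<String> shardNames = new ArrayList<>(state.keySet());
        // Hypothetical document IDs.
        for (String docId : List.of("hamlet.txt", "macbeth.txt", "othello.txt")) {
            String shard = shardNames.get(shardIndex(docId, shardNames.size()));
            System.out.println(docId + " -> " + shard + " @ " + leaderUrl(state.get(shard)));
        }
    }
}
```

Because the hash is deterministic, the same ID always lands on the same shard, and with a reasonable hash a set of documents spreads roughly, though not exactly, evenly across shards.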

Indexing with Hadoop and Solr

Now, to index the content, let’s dig in with Hadoop and Behemoth. First, let’s convert the content to Behemoth’s intermediate format, which is a SequenceFile of BehemothDocument instances. This step lets us work off the intermediate form for all other operations without having to reprocess the original, plus it gives us a smaller number of larger SequenceFiles, which is generally better for Hadoop. For instance, we could take this intermediate form and easily send it to GATE, Mahout, Tika or Solr. To do this step, run the CorpusGenerator:

> hadoop jar ./core/target/behemoth-core-1.0-SNAPSHOT-job.jar com.digitalpebble.behemoth.util.CorpusGenerator /user/hadoop/shakespeare shake-behemoth --recurse


You should see something like:

12/03/04 17:34:43 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/03/04 17:34:43 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
12/03/04 17:34:43 INFO compress.CodecPool: Got brand-new compressor
44 docs converted


Note that CorpusGenerator is not a Map/Reduce job at this point in time, so you could just as well call it the good old-fashioned way from Java. It could be converted to Map/Reduce, but it likely isn’t the long pole in the tent anyway.
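For intuition about what CorpusGenerator does, here is a standard-library-only Java sketch of the same idea: walk an input directory recursively and pack every file into a single container of (key, content) records. The length-prefixed format is a deliberately simple stand-in; the real tool writes BehemothDocument instances into a Hadoop SequenceFile, and CorpusPacker is a hypothetical name.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the CorpusGenerator idea: many small files in, one container of records out.
// The record format (UTF key, int length, raw bytes) is an assumption, not SequenceFile.
final class CorpusPacker {

    // Writes every regular file under inputDir (recursively) as one record; returns the count.
    // The container must live outside inputDir, or a rerun would pack the previous output.
    static int pack(Path inputDir, Path container) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(inputDir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(container)))) {
            for (Path file : files) {
                byte[] content = Files.readAllBytes(file);
                out.writeUTF(inputDir.relativize(file).toString()); // record key: relative path
                out.writeInt(content.length);                       // record length in bytes
                out.write(content);                                 // raw bytes; parsing (e.g. Tika) comes later
            }
        }
        return files.size();
    }

    // Reads the container back and counts its records.
    static int countRecords(Path container) throws IOException {
        int count = 0;
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(container)))) {
            while (true) {
                try {
                    in.readUTF(); // key; EOF here means there are no more records
                } catch (EOFException end) {
                    return count;
                }
                int length = in.readInt();
                in.readFully(new byte[length]); // skip over the content
                count++;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java CorpusPacker <input dir> <container file>
        int n = pack(Path.of(args[0]), Path.of(args[1]));
        System.out.println(n + " docs converted");
    }
}
```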

Next, let’s parse the documents using Tika:

> /home/hadoop/hadoop-0.20.2/bin/hadoop jar ./tika/target/behemoth-tika-1.0-SNAPSHOT-job.jar com.digitalpebble.behemoth.tika.TikaDriver -i shake-behemoth -o shake-behemoth-tika


Running this, you should see something like:

12/03/04 17:42:52 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/04 17:42:53 INFO mapred.JobClient: Running job: job_201203041658_0001
12/03/04 17:42:54 INFO mapred.JobClient: map 0% reduce 0%
12/03/04 17:43:09 INFO mapred.JobClient: map 50% reduce 0%
12/03/04 17:43:11 INFO mapred.JobClient: map 100% reduce 0%
12/03/04 17:43:13 INFO mapred.JobClient: Job complete: job_201203041658_0001
12/03/04 17:43:13 INFO mapred.JobClient: Counters: 10
12/03/04 17:43:13 INFO mapred.JobClient: MIME-TYPE
12/03/04 17:43:13 INFO mapred.JobClient: text/plain=44
12/03/04 17:43:13 INFO mapred.JobClient: Job Counters
12/03/04 17:43:13 INFO mapred.JobClient: Launched map tasks=2
12/03/04 17:43:13 INFO mapred.JobClient: Data-local map tasks=2
12/03/04 17:43:13 INFO mapred.JobClient: TIKA
12/03/04 17:43:13 INFO mapred.JobClient: DOC-PARSED=44
12/03/04 17:43:13 INFO mapred.JobClient: FileSystemCounters
12/03/04 17:43:13 INFO mapred.JobClient: HDFS_BYTES_READ=2222475
12/03/04 17:43:13 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=10707718
12/03/04 17:43:13 INFO mapred.JobClient: Map-Reduce Framework
12/03/04 17:43:13 INFO mapred.JobClient: Map input records=44
12/03/04 17:43:13 INFO mapred.JobClient: Spilled Records=0
12/03/04 17:43:13 INFO mapred.JobClient: Map input bytes=2131305
12/03/04 17:43:13 INFO mapred.JobClient: Map output records=44


Now, let’s index them to Solr:

> hadoop jar ./solr/target/behemoth-solr-1.0-SNAPSHOT-job.jar com.digitalpebble.behemoth.solr.SOLRIndexerJob shake-behemoth-tika collection1 localhost:9983


(Note: depending on the version of Hadoop you are using, you may hit some conflicts with SLF4J. I ended up having to replace Hadoop’s SLF4J libraries with the same version that Solr uses, 1.6.1.)

Your output should look something like:

12/03/04 18:14:19 INFO mapred.FileInputFormat: Total input paths to process : 2
12/03/04 18:14:19 INFO mapred.JobClient: Running job: job_201203041809_0001
12/03/04 18:14:20 INFO mapred.JobClient: map 0% reduce 0%
12/03/04 18:14:28 INFO mapred.JobClient: map 50% reduce 0%
12/03/04 18:14:43 INFO mapred.JobClient: map 100% reduce 0%
12/03/04 18:14:45 INFO mapred.JobClient: Job complete: job_201203041809_0001
12/03/04 18:14:45 INFO mapred.JobClient: Counters: 7
12/03/04 18:14:45 INFO mapred.JobClient: Job Counters
12/03/04 18:14:45 INFO mapred.JobClient: Launched map tasks=3
12/03/04 18:14:45 INFO mapred.JobClient: Data-local map tasks=3
12/03/04 18:14:45 INFO mapred.JobClient: FileSystemCounters
12/03/04 18:14:45 INFO mapred.JobClient: HDFS_BYTES_READ=10707718
12/03/04 18:14:45 INFO mapred.JobClient: Map-Reduce Framework
12/03/04 18:14:45 INFO mapred.JobClient: Map input records=44
12/03/04 18:14:45 INFO mapred.JobClient: Spilled Records=0
12/03/04 18:14:45 INFO mapred.JobClient: Map input bytes=10707526
12/03/04 18:14:45 INFO mapred.JobClient: Map output records=44

That’s all there is to it!  You should now have content in Solr.
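The indexing job above writes through Behemoth’s custom OutputFormat rather than from the mapper. A common way for such a RecordWriter to talk to a search server is to buffer documents, send them in batches, and flush the remainder when the task closes. The sketch below shows that pattern with the standard library only; the sink is a stand-in for a client call such as SolrJ’s add(), and neither the class nor the batch size comes from Behemoth.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of a batching writer in the style of a RecordWriter that sends documents to a server.
// The sink stands in for a real client call; all names here are hypothetical.
final class BatchingDocumentWriter implements Closeable {
    private final int batchSize;
    private final Consumer<List<Map<String, String>>> sink;
    private final List<Map<String, String>> buffer = new ArrayList<>();

    BatchingDocumentWriter(int batchSize, Consumer<List<Map<String, String>>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Called once per record (in Hadoop terms, from RecordWriter.write).
    void write(Map<String, String> doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends everything currently buffered as one request.
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer)); // hand over a copy so the buffer can be reused
            buffer.clear();
        }
    }

    // Called when the task finishes (RecordWriter.close) so the last partial batch is not lost.
    @Override
    public void close() {
        flush();
    }
}
```

With a batch size of 20, the 44 Shakespeare documents would go out as three requests of 20, 20 and 4 documents. Larger batches mean fewer round trips at the cost of more memory per task.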

Searching

Now that we have some content indexed, let’s do some searches. For a simple way to do this, point your browser at http://localhost:8983/solr, click the “singlecore” menu item on the left-hand side, then click the “query” option and enter your query into the provided input form. For instance, I entered text:"to be or not to be" and chose JSON as the output format.


Thankfully, we get back Hamlet! Feel free to try other queries as you see fit. If you run the MatchAllDocsQuery (*:*), you should see that all 44 docs are found. The beauty of it is that Solr automatically knows which shards to query and can gracefully handle nodes failing.
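You can also run the same query outside the browser. This Java 11 sketch URL-encodes the phrase query and fetches JSON results over HTTP. The /solr/collection1/select path and the wt=json parameter are assumptions based on the Solr 4 example setup used here, and PhraseQuery is a hypothetical name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Builds a select URL for a query and fetches the JSON response.
// The path layout and parameters are assumptions about the example setup, not a fixed API.
final class PhraseQuery {

    static URI selectUri(String baseUrl, String collection, String query) {
        // URLEncoder turns ':' into %3A, '"' into %22 and spaces into '+'.
        String q = URLEncoder.encode(query, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/" + collection + "/select?q=" + q + "&wt=json");
    }

    public static void main(String[] args) throws Exception {
        URI uri = selectUri("http://localhost:8983/solr", "collection1", "text:\"to be or not to be\"");
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(uri).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // raw JSON from Solr
    }
}
```

Passing *:* as the query gives you the match-all query, which should report all 44 documents.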

To learn more about the current SolrCloud capabilities and to get some hints at what is coming, check out the SolrCloud wiki page. And while I didn’t demonstrate it here, Solr4 also supports near-real-time search, so you can search content almost as soon as it is indexed.

If you are wondering where the documents live, you can get a hint by hitting the Luke request handler on each of the shards, as in http://localhost:7574/solr/admin/luke or http://localhost:8983/solr/admin/luke, or by querying the individual shards. You should see that the docs are more or less evenly split across the two nodes. In my case, one node has 23 of the 44 docs and the other has 21. YMMV.

Finally, while I’m using Behemoth here, you could use Adam’s code from the link above and replace the StreamingUpdateSolrServer with CloudSolrServer, though in the real world you’d likely also have to add more ETL code.

Next Steps

Taking things to the next level, we’ve been testing this and similar Hadoop-based approaches, along with SolrCloud, on much larger data sets. If you’d like to try this out on a bigger dataset, you might try the Common Crawl data set, where you can really put all the pieces through their paces. For that, you’ll obviously need more nodes running Solr and Hadoop, and you’ll want to run Zookeeper separately. You’ll also want to think hard about how best to configure your cluster. This can be tricky, as you are often trading off indexing performance against search performance, just as in smaller setups. For indexing performance, you likely want one Solr instance per Hadoop task node, and you’d likely want CloudSolrServer to be aware of this. On the other hand, you often need far fewer Solr instances to search the same content than you need Hadoop nodes, so you may want to pay the network penalty at indexing time and run fewer Solr nodes, which results in less network traffic at query time. Naturally, the answer is to benchmark what works best in your situation. As with any real search setup, also spend some time thinking about what content needs to be indexed and stored, and be ruthlessly efficient about trimming down the data you ship around. Just because you have a giant hammer like Hadoop doesn’t mean you can stop thinking about what your data looks like and where it is going to end up.

That’s all there is to it.  Hopefully you have seen how easy it is to get started scaling out indexing and searching with Hadoop, Solr and Behemoth.




Published at DZone with permission of its author, Grant Ingersoll.
