
Using Java to access MongoDB, Redis, CouchDB, Riak, Cassandra

06.05.2012

I had a requirement in my current job to persist some messages at different points in the running of the system. At the beginning we didn't know the format in which the messages were going to be saved, where to save them, or even which messages to save.

Last weekend I started working, on my own, on a small library for persisting Java objects in different datasources and with different formats, so that I could leverage it at work.

I intended to support several datasources; I started with MongoDB, Redis, the file system, Cassandra, Riak and CouchDB.

The solution is meant to work as a kind of logger, so I took its main architectural characteristics from the Apache Log4j project. For example, the different datasources are plugged in through what I called Appenders, following the Log4j concept.

Another thing I wanted was to be able to configure it easily with Spring, so I also created a small namespace for it.

The simple architecture I ended up with works like this:

The idea is that any object gets “normalized” into a library-internal object by an implementation of a Normalizer. The normalized message then goes to one or more of the Appenders, where it is converted into a provider-specific record (e.g. a DBObject for Mongo) and the appender takes care of storing it.
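To make the flow concrete, here is a minimal sketch of what the core abstractions could look like, inferred from how they are used in the appenders and tests below; the exact names and signatures in the library may differ:

package org.easytechs.recordpersister;

import java.util.List;

/**
 * Library-internal representation of a message (sketch; the real type
 * presumably exposes the normalized fields).
 */
public interface NormalizedMessage {
}

/**
 * Normalizes an arbitrary source object into the library-internal format.
 */
public interface Normalizer<S> {
    NormalizedMessage normalize(S source);
}

/**
 * Converts a normalized message into a provider-specific record
 * (e.g. a DBObject for Mongo, a KeyValue for Redis or Riak).
 */
public interface RecordGenerator<T> {
    T generate(NormalizedMessage normalizedMessage);
}

/**
 * A sink that stores normalized messages in one datasource.
 */
public interface Appender {
    void append(NormalizedMessage normalizedMessage);
    void append(List<NormalizedMessage> messages);
    void close();
}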

All the appenders and datastore libraries I currently use are very simple, and none of the datasources has been optimized in any way; I work with them in their default installation configuration.


If nothing else, the library can at least serve to show the basics of how to interact with the different datasources. Next I show the appenders I have for each of them.

package org.easytechs.recordpersister.appenders;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoAppender extends AbstractAppender<DBObject>{

    private Mongo mongo;
    private DBCollection coll;

    public MongoAppender(String host, String port, String dbName, String collection) throws Exception{
        mongo = new Mongo(host, Integer.parseInt(port));
        DB db = mongo.getDB(dbName);
        coll = db.getCollection(collection);
    }

    @Override
    public void close() {
        // Release the driver's connection pool.
        mongo.close();
    }

    @Override
    protected void doAppend(DBObject record) throws Exception {
        coll.insert(record);
    }

}
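For reference, this is roughly what the appender does under the hood with the plain 2.x Mongo Java driver; the host, database and collection names here are just for illustration:

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

public class MongoInsertExample {
    public static void main(String[] args) throws Exception {
        // Connect, pick a database/collection and insert one document.
        Mongo m = new Mongo("127.0.0.1", 27017);
        DBCollection coll = m.getDB("test-db").getCollection("ticks");
        coll.insert(new BasicDBObject("symbol", "XX").append("value", "100.00"));
        m.close();
    }
}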


package org.easytechs.recordpersister.appenders;

import org.easytechs.recordpersister.appenders.redis.KeyValue;

import redis.clients.jedis.Jedis;


public class RedisAppender extends AbstractAppender<KeyValue>{

    private Jedis jedis;

    public RedisAppender(String host) {
        jedis = new Jedis(host);
        jedis.connect();
    }

    @Override
    public void close() {
        jedis.disconnect();
    }

    @Override
    protected void doAppend(KeyValue record) throws Exception {
        // Append the value to the Redis list stored at the record's key.
        jedis.rpush(record.getKey(), record.getValue());
    }
}
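Both the Redis and Riak appenders take a simple KeyValue holder. The class isn't shown here; a minimal sketch consistent with how it is used (only getKey() and getValue() are needed):

package org.easytechs.recordpersister.appenders.redis;

/**
 * Minimal sketch of the key/value holder used by the Redis and Riak
 * appenders; the real class may carry more than this.
 */
public class KeyValue {

    private final String key;
    private final String value;

    public KeyValue(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value;
    }
}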


package org.easytechs.recordpersister.appenders;

import java.util.Map;

import redis.clients.jedis.Jedis;

public class RedisHashAppender extends AbstractAppender<Map<String, String>> {

    private String listKey;
    private Jedis jedis;

    public RedisHashAppender(String host, String listKey) {
        this.listKey = listKey;
        jedis = new Jedis(host);
        jedis.connect();
    }

    @Override
    public void close() {
        jedis.disconnect();
    }

    @Override
    protected void doAppend(Map<String, String> record) throws Exception {
        // Store the record as a Redis hash, then push its key onto a list
        // so the records can be traversed in insertion order.
        String key = String.valueOf(record.hashCode());
        for (Map.Entry<String, String> field : record.entrySet()) {
            jedis.hset(key, field.getKey(), field.getValue());
        }
        jedis.rpush(getListKey(), key);
    }

    private String getListKey(){
        return this.listKey;
    }
}
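Reading the records back is a matter of walking the list and fetching each hash. A quick sketch, assuming the list key "ticks" (an illustrative name, not something the library mandates):

import java.util.Map;

import redis.clients.jedis.Jedis;

public class RedisReadBackExample {
    public static void main(String[] args) {
        // Walk the list of hash keys, then fetch each stored hash.
        Jedis jedis = new Jedis("localhost");
        for (String key : jedis.lrange("ticks", 0, -1)) {
            Map<String, String> record = jedis.hgetAll(key);
            System.out.println(record);
        }
        jedis.disconnect();
    }
}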



package org.easytechs.recordpersister.appenders;

import org.easytechs.recordpersister.appenders.redis.KeyValue;

import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class RiakAppender extends AbstractAppender<KeyValue>{

    private Bucket myBucket;
    private IRiakClient riakClient;

    public RiakAppender(String host, int port, String bucket) throws Exception{
        // Uses the Protocol Buffers client; the bucket is fetched once up front.
        riakClient = RiakFactory.pbcClient(host, port);
        myBucket = riakClient.fetchBucket(bucket).execute();
    }

    @Override
    public void close() {
        riakClient.shutdown();
    }

    @Override
    protected void doAppend(KeyValue record) throws Exception {
        myBucket.store(record.getKey(), record.getValue()).execute();
    }

}



package org.easytechs.recordpersister.appenders;

import java.util.Map;

import org.jcouchdb.db.Database;

public class CouchDBAppender extends AbstractAppender<Map<String, String>>{

    private Database db;

    public CouchDBAppender(String host, String database){
        db = new Database(host, database);
    }

    @Override
    public void close() {
        // Nothing to release: jcouchdb talks to CouchDB over plain HTTP requests.
    }

    @Override
    protected void doAppend(Map<String, String> record) throws Exception {
        // Each record map becomes a JSON document in the database.
        db.createDocument(record);
    }

}




package org.easytechs.recordpersister.appenders;

import java.nio.ByteBuffer;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.easytechs.recordpersister.appenders.cassandra.CassandraRow;

public class CassandraAppender extends AbstractAppender<CassandraRow>{

    private Cassandra.Client client;
    private ColumnParent columnParent;
    private TTransport tr;
    private static final ConsistencyLevel CL = ConsistencyLevel.ANY;

    public CassandraAppender(String host, int port, String keyspace, String columnParent) throws Exception{
        // Raw Thrift client over the framed transport Cassandra expects.
        tr = new TSocket(host, port);
        TFramedTransport tf = new TFramedTransport(tr);
        TProtocol proto = new TBinaryProtocol(tf);
        client = new Cassandra.Client(proto);
        tf.open();
        client.set_keyspace(keyspace);
        this.columnParent = new ColumnParent(columnParent);
    }

    @Override
    public void close() {
        tr.close();
    }

    @Override
    protected void doAppend(CassandraRow record) throws Exception{
        // Store every column of the row under its key.
        for (Column column : record.getColumns()) {
            client.insert(ByteBuffer.wrap(record.getKey().getBytes()), columnParent, column, CL);
        }
    }
}
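CassandraRow is a small holder for a row key plus its Thrift Columns. It isn't shown in the post; a minimal sketch consistent with how the appender uses it:

package org.easytechs.recordpersister.appenders.cassandra;

import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.thrift.Column;

/**
 * Minimal sketch of the row holder used by CassandraAppender; the real
 * class may differ.
 */
public class CassandraRow {

    private final String key;
    private final List<Column> columns = new ArrayList<Column>();

    public CassandraRow(String key) {
        this.key = key;
    }

    public void addColumn(Column column) {
        columns.add(column);
    }

    public String getKey() {
        return key;
    }

    public List<Column> getColumns() {
        return columns;
    }
}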


This is the abstract appender they all derive from:

package org.easytechs.recordpersister.appenders;


import java.util.ArrayList;
import java.util.List;


import org.easytechs.recordpersister.Appender;
import org.easytechs.recordpersister.NormalizedMessage;
import org.easytechs.recordpersister.RecordGenerator;






public abstract class AbstractAppender<T> implements Appender{

    protected RecordGenerator<T> recordGenerator;

    @Override
    public void append(NormalizedMessage normalizedMessage) {
        T record = recordGenerator.generate(normalizedMessage);
        try{
            doAppend(record);
        }catch(Exception e){
            e.printStackTrace();
            //Anything else to do here???
        }
    }

    @Override
    public final void append(List<NormalizedMessage> messages){
        List<T> records = new ArrayList<>();
        for(NormalizedMessage message:messages){
            records.add(recordGenerator.generate(message));
        }
        doBatchAppend(records);
    }

    /**
     * Basic implementation that appends one record at a time.
     * Override if the appender supports batch processing.
     * @param records
     */
    protected void doBatchAppend(List<T> records){
        for(T record:records){
            try{
                doAppend(record);
            }catch(Exception e){
                e.printStackTrace();
                //Anything else to do here???
            }
        }
    }

    @Override
    protected void finalize() throws Throwable {
        // Safety net: release connections even if close() was never
        // called explicitly.
        super.finalize();
        close();
    }

    protected abstract void doAppend(T record) throws Exception;

    public void setRecordGenerator(RecordGenerator<T> recordGenerator){
        this.recordGenerator = recordGenerator;
    }
}
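The file-system appender mentioned at the start isn't shown above; adding a new datasource is just a matter of extending AbstractAppender. A hypothetical sketch that writes one record per line to a text file (for illustration only, not the library's actual file appender):

package org.easytechs.recordpersister.appenders;

import java.io.FileWriter;
import java.io.PrintWriter;

/**
 * Hypothetical file-system appender built on AbstractAppender; the
 * library's real file appender may differ.
 */
public class FileAppender extends AbstractAppender<String> {

    private final PrintWriter writer;

    public FileAppender(String fileName) throws Exception {
        // Open the file in append mode so existing records are kept.
        writer = new PrintWriter(new FileWriter(fileName, true));
    }

    @Override
    public void close() {
        writer.close();
    }

    @Override
    protected void doAppend(String record) throws Exception {
        writer.println(record);
        writer.flush();
    }
}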

As an example of how the library is used, there are a couple of tests, like the following:

package org.easytechs.recordpersister;


import org.easytechs.recordpersister.GenericPersister;
import org.easytechs.recordpersister.appenders.MongoAppender;
import org.easytechs.recordpersister.normalizers.BeanToMapNormalizer;
import org.easytechs.recordpersister.recordgenerators.MongoDBFromMapGenerator;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;




public class TestBeanMongoFullDocumentPersisterITest extends AbstractTimedTest{

    private GenericPersister<TestBean> testObj;

    @BeforeMethod
    public void setup() throws Exception {
        testObj = new GenericPersister<>();
        MongoAppender appender = new MongoAppender("127.0.0.1", "27017", "test-db", "ticksfull2");
        appender.setRecordGenerator(new MongoDBFromMapGenerator());
        testObj.setNormalizedMessageTransformer(new BeanToMapNormalizer<TestBean>("symbol", "value", "date"));
        testObj.setAppender(appender);
    }

    @Test
    public void shouldPersistOneItem() {
        TestBean tick = new TestBean();
        tick.setSymbol("XX");
        tick.setValue("100.00");
        tick.setDate(123444L);
        testObj.persist(tick);
    }

    @Test(invocationCount=10)
    public void shouldPersistManyItems() {
        doTimed(new IndexedRunnable() {
            @Override
            public void run(int index) throws Exception {
                TestBean tick = new TestBean();
                tick.setSymbol("XX");
                tick.setValue("100.00");
                tick.setDate(123444L);
                testObj.persist(tick);
            }
        }, 20000);
    }
}
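TestBean is just a plain bean with the three properties the normalizer picks up; a minimal sketch consistent with the test above:

package org.easytechs.recordpersister;

/**
 * Minimal sketch of the test bean; only the properties used in the tests
 * are shown.
 */
public class TestBean {

    private String symbol;
    private String value;
    private long date;

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public long getDate() { return date; }
    public void setDate(long date) { this.date = date; }
}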

To use it from Spring, I'm developing a simple namespace so that things like the following can be done:

 <persister:mongo-document-persister id="persister" host="127.0.0.1" port="27017" db="test-db" collection="testcol" beanProperties="propA,propB,propC"/>

The Maven dependencies for all the drivers are:

        <dependency>
            <groupId>org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>1.0.10</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.basho.riak</groupId>
            <artifactId>riak-client</artifactId>
            <version>1.0.5</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.jcouchdb</groupId>
            <artifactId>jcouchdb</artifactId>
            <version>0.11.0-1</version>
        </dependency>
The source code is on GitHub.

Published at DZone with permission of Carlo Scarioni, author and DZone MVB.


Comments

Fahmeed Nawaz replied on Tue, 2012/06/12 - 11:10am

Does WriteConcern.SAFE force the server's write buffer to flush to disk each time the WriteConcern.SAFE param is used?
Or, simply, when WriteConcern.SAFE is used, does the write just wait on the server for the next write-buffer success-or-failure before returning a result to the Java driver?
