Working with HBase and Hadoop

07.14.2012

HBase is a NoSQL database. It is based on Google's Bigtable distributed storage system, as described in the Google research paper: "A Bigtable is a sparse, distributed, persistent multi-dimensional sorted map. The map is indexed by a row key, column key, and a timestamp; each value in the map is an uninterpreted array of bytes." If you want a detailed explanation of what each word in this scary definition means, I suggest checking this post out.
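
To make the definition less scary, here is a minimal sketch of the map model using the HBase 0.90.x client API (CellAddressingDemo is a hypothetical helper class; the table and column names match the example built later in this post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CellAddressingDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "aapl_marketdata");

        // A cell is addressed by (row key, column family:qualifier, timestamp);
        // the value is an uninterpreted byte array.
        Put put = new Put(Bytes.toBytes("AAPL-1984-10-25"));   // row key
        put.add(Bytes.toBytes("marketdata"),                   // column family
                Bytes.toBytes("daily"),                        // column qualifier
                Bytes.toBytes("some raw bytes"));              // value
        table.put(put);   // the timestamp is assigned by the server if not set

        Get get = new Get(Bytes.toBytes("AAPL-1984-10-25"));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes("marketdata"), Bytes.toBytes("daily"));
        System.out.println(Bytes.toString(value));
        table.close();
    }
}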

HBase scales far beyond traditional RDBMS capabilities: it supports automatic sharding and massively parallel processing via MapReduce. HBase is built on top of HDFS and provides fast record lookups for large tables. See more details about the HBase architecture here.

HBase can be used as both a data source and a data sink for MapReduce jobs. The example in this post will use HBase as a data sink. If you are interested in other examples, have a look at the Hadoop wiki page, HBase as MapReduce job data source and data sink.

HBase distributed storage for stock price information

The example is going to process Apple stock prices downloaded from the Yahoo Finance website; this is the same dataset – Apple stock prices – that we used previously to demonstrate Hive capabilities on Amazon Elastic MapReduce. It is stored in an AWS S3 bucket called stockprice. The MapReduce job will retrieve the file from there using an s3n://AWSAccessKeyId:AWSSecretAccessKey@bucket/object URL and will store the output in an HBase table called aapl_marketdata. The test environment was based on Hadoop-0.20.2 and HBase-0.90.6.
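
As an alternative to embedding the credentials in the URL, the S3 native filesystem can also read them from core-site.xml. A sketch, using the standard Hadoop 0.20 property names (replace the placeholder values with your own keys):

<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>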

Before the MapReduce job can be run, the table needs to be created:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.90.6, r1295128, Wed Feb 29 14:29:21 UTC 2012

hbase(main):005:0> create 'aapl_marketdata', 'marketdata'
0 row(s) in 1.4290 seconds

hbase(main):001:0> list
TABLE
aapl_marketdata
1 row(s) in 0.3950 seconds
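
If you prefer doing this in code, the same table could also be created with the 0.90.x admin API. A minimal sketch (CreateTableDemo is a hypothetical helper class):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTableDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Equivalent of the shell command: create 'aapl_marketdata', 'marketdata'
        HTableDescriptor desc = new HTableDescriptor("aapl_marketdata");
        desc.addFamily(new HColumnDescriptor("marketdata"));
        admin.createTable(desc);
    }
}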

Now we are ready to run the MapReduce job. It is advisable to have a driver script to run your job and set all the required arguments there for easier configuration, but in essence it is just plain old Java code.

My script looks like this:

$ cat hb.sh
java -classpath /home/ec2-user/hadoop/hadoop-0.20.2-ant.jar:\
/home/ec2-user/hadoop/hadoop-0.20.2-core.jar:\
/home/ec2-user/hadoop/hadoop-0.20.2-tools.jar:\
/home/ec2-user/hadoop/lib/jets3t-0.6.1.jar:\
/home/ec2-user/aws-java-sdk-1.3.11/aws-java-sdk-1.3.11.jar:\
/home/ec2-user/hbase/hbase-0.90.6.jar:\
/home/ec2-user/hbase/lib/commons-codec-1.4.jar:\
/home/ec2-user/hbase/lib/commons-httpclient-3.1.jar:\
/home/ec2-user/hbase/lib/commons-cli-1.2.jar:\
/home/ec2-user/hbase/lib/commons-logging-1.1.1.jar:\
/home/ec2-user/hbase/lib/zookeeper-3.3.2.jar:\
/home/ec2-user/hbase/lib/log4j-1.2.16.jar:\
json_io_1.0.4.jar:awsdemo-hbase.jar:/home/ec2-user/core-site.xml \
org.awsdemo.hbase.MarketDataApplication \
s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/input/APPL_StockPrices.csv \
s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/output/
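
If the Hadoop and HBase jars are already available on the cluster classpath (e.g. via HADOOP_CLASSPATH), a shorter, hypothetical equivalent is to submit the job with hadoop jar:

$ hadoop jar awsdemo-hbase.jar org.awsdemo.hbase.MarketDataApplication \
    s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/input/APPL_StockPrices.csv \
    s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/output/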

Once the MapReduce job has finished successfully, we can check the result in the HBase table using bin/hbase shell.

hbase(main):001:0> get 'table_name', 'rowkey'

For example:

hbase(main):001:0> get 'aapl_marketdata', 'AAPL-1984-10-25'
COLUMN                                  CELL
 marketdata:daily                       timestamp=1341590928097, value={"@type":"org.apache.hadoop.io.MapWritable","@keys":[{"@type":"org.apache.hadoop.io
                                        .Text","bytes":[115,116,111,99,107,83,121,109,98,111,108],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes
                                        ":[115,116,111,99,107,80,114,105,99,101,76,111,119],"length":13},{"@type":"org.apache.hadoop.io.Text","bytes":[115
                                        ,116,111,99,107,80,114,105,99,101,79,112,101,110],"length":14},{"@type":"org.apache.hadoop.io.Text","bytes":[100,9
                                        7,116,101],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,67,108,1
                                        11,115,101],"length":15},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,65,100
                                        ,106,67,108,111,115,101],"length":18},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,86,111,108,
                                        117,109,101],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,72,10
                                        5,103,104],"length":14}],"@items":[{"@type":"org.apache.hadoop.io.Text","bytes":[65,65,80,76],"length":4},{"@type"
                                        :"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[50
                                        ,54,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[49,57,56,52,45,49,48,45,50,53],"length":10
                                        },{"@type":"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","
                                        bytes":[50,46,56,56],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[53,54,55,54,48,48,48],"length":7},{
                                        "@type":"org.apache.hadoop.io.Text","bytes":[50,54,46,50,53],"length":5}]}
1 row(s) in 0.4140 seconds

The output was generated by JsonWriter and then serialized and stored in HBase, so it requires some ASCII skills to decode the values. E.g. "115,116,111,99,107,83,121,109,98,111,108" means stockSymbol, "115,116,111,99,107,80,114,105,99,101,76,111,119" means stockPriceLow, "115,116,111,99,107,80,114,105,99,101,79,112,101,110" means stockPriceOpen, etc. "65,65,80,76" means AAPL, "50,53,46,50,53" means 25.25; you know the rest.
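
If you do not trust your ASCII skills, a two-line sketch does the decoding for you (DecodeDemo is just a hypothetical helper, not part of the job):

public class DecodeDemo {
    public static void main(String[] args) throws Exception {
        // 115,116,111,99,107,83,121,109,98,111,108 are the ASCII codes of "stockSymbol"
        byte[] bytes = {115, 116, 111, 99, 107, 83, 121, 109, 98, 111, 108};
        System.out.println(new String(bytes, "UTF-8"));   // prints: stockSymbol
    }
}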

You can also scan the entire table with the HBase shell using the

hbase(main):001:0> scan 'aapl_marketdata'

command. If you are done and want to get rid of the data, you need to disable the table and then you can drop it.

hbase(main):002:0> disable 'aapl_marketdata'
0 row(s) in 2.1490 seconds

hbase(main):004:0> drop 'aapl_marketdata'
0 row(s) in 1.1790 seconds

The MapReduce code

The code consists of four files.

MarketDataApplication.java:

package org.awsdemo.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MarketDataApplication {
    public static void main(String[] args) throws Exception {
        System.out.println("MarketDataApplication invoked");
        int m_rc = ToolRunner.run(new Configuration(), new MarketDataDriver(), args);
        System.exit(m_rc);
    }
}
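
Because the application delegates to ToolRunner, Hadoop's GenericOptionsParser will pick up generic options from the command line, so configuration can be overridden without touching the code. A sketch (the property shown is only an illustration):

$ java -classpath ... org.awsdemo.hbase.MarketDataApplication \
    -D mapred.reduce.tasks=1 <input path> <output path>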

MarketDataDriver.java:

package org.awsdemo.hbase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;

public class MarketDataDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        // Use the configuration prepared by ToolRunner so that generic
        // command line options (e.g. -D key=value) take effect
        Configuration conf = getConf();
        Job job = new Job(conf, "AAPL Market Data Application");
        job.setJarByClass(MarketDataApplication.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MarketDataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wires up MarketDataReducer and TableOutputFormat so the job
        // writes its output into the aapl_marketdata HBase table
        TableMapReduceUtil.initTableReducerJob("aapl_marketdata",
                MarketDataReducer.class, job);

        return job.waitForCompletion(true) ? 0 : -1;
    }
}

MarketDataMapper.java:

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MarketDataMapper extends
    Mapper<LongWritable, Text, Text, MapWritable> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        final String AAPL_STOCK_SYMBOL = "AAPL";

        final Text STOCK_SYMBOL = new Text("stockSymbol");
        final Text DATE = new Text("date");
        final Text STOCK_PRICE_OPEN = new Text("stockPriceOpen");
        final Text STOCK_PRICE_HIGH = new Text("stockPriceHigh");
        final Text STOCK_PRICE_LOW = new Text("stockPriceLow");
        final Text STOCK_PRICE_CLOSE = new Text("stockPriceClose");
        final Text STOCK_VOLUME = new Text("stockVolume");
        final Text STOCK_PRICE_ADJ_CLOSE = new Text("stockPriceAdjClose");

        // Each input line is a CSV record:
        // date,open,high,low,close,volume,adjClose
        String strLine = value.toString();
        String[] data_values = strLine.split(",");

        MapWritable marketData = new MapWritable();
        marketData.put(STOCK_SYMBOL, new Text(AAPL_STOCK_SYMBOL));
        marketData.put(DATE, new Text(data_values[0]));
        marketData.put(STOCK_PRICE_OPEN, new Text(data_values[1]));
        marketData.put(STOCK_PRICE_HIGH, new Text(data_values[2]));
        marketData.put(STOCK_PRICE_LOW, new Text(data_values[3]));
        marketData.put(STOCK_PRICE_CLOSE, new Text(data_values[4]));
        marketData.put(STOCK_VOLUME, new Text(data_values[5]));
        marketData.put(STOCK_PRICE_ADJ_CLOSE, new Text(data_values[6]));

        // The map output key is a composite of stock symbol and date, e.g. AAPL-1984-10-25
        context.write(new Text(String.format("%s-%s", AAPL_STOCK_SYMBOL, data_values[0])), marketData);
    }
}

MarketDataReducer.java:

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cedarsoftware.util.io.JsonWriter;

public class MarketDataReducer extends
    TableReducer<Text, MapWritable, ImmutableBytesWritable> {

    public void reduce(Text arg0, Iterable<MapWritable> arg1, Context context)
            throws IOException, InterruptedException {
        // Since the composite key made up of stock symbol and date is unique,
        // exactly one value arrives per key.
        MapWritable marketData = null;
        for (MapWritable value : arg1) {
            marketData = value;
            break;
        }

        ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes(arg0.toString()));
        Put put = new Put(Bytes.toBytes(arg0.toString()));

        // Serialize the whole MapWritable as JSON and store it in the
        // marketdata:daily column of the row identified by the composite key
        put.add(Bytes.toBytes("marketdata"), Bytes.toBytes("daily"),
                Bytes.toBytes(JsonWriter.toJson(marketData)));
        context.write(key, put);
    }
}

Amazon EMR HBase

Amazon Web Services recently launched HBase on its Elastic MapReduce service. It runs on the Amazon distribution of Hadoop 0.20.205 (as of writing this post, it is not yet available on the MapR M3 or M5 distributions).

You can configure it using the Create a New Job Flow menu:

Then select the EC2 instance type (the instances need to be Large or bigger). If you like, you can also add Hive or Pig:

Then you can define EC2 key pairs (if you want to log in to the instances using ssh, you need to add your key):

Check the summary page and then launch HBase by clicking on Create Job Flow:

The instance will show up in WAITING status in the AWS EMR console:
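
If you prefer the command line to the console, the Elastic MapReduce ruby CLI could launch a comparable job flow (a sketch only; the --hbase option and exact flag names depend on your CLI version):

$ elastic-mapreduce --create --alive --hbase \
    --name "HBase cluster" --instance-type m1.large --num-instances 3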

Now you can log in using ssh (and your ssh key) and start the hbase shell, just as we discussed before:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell

You can also check the running HBase instances on AWS EC2 console:


Published at DZone with permission of Istvan Szegedi, author and DZone MVB.