
I'm a software architect/consultant in Boulder, Colorado. I started blogging at http://wayne-adams.blogspot.com/, but have since started the new blog, Data Scientist in Training, which will cover the areas I am learning as part of my own "big data" training, including the Hadoop family, frameworks like OpenStack, the R programming language, column-oriented databases and the art of examining data sets for useful patterns. The original blog will be kept alive with Java-specific posts. Wayne is a DZone MVB, is not an employee of DZone, and has posted 35 posts at DZone.

Analyzing Mortgage Data with Hadoop MapReduce: Java vs. Pig

01.07.2013

In a recent post, I used Pig to analyze some MBS (mortgage-backed security) new-issue pool data from Fannie Mae.  At the time, I noted a number of errors processing the files in my data set, and now I'm going to go back and debug the job.  My goal is to clean up my parsing/processing logic sufficiently to get most, if not all, of the records included in my data.  Whether this is important or not depends on the task, of course.  If you are trying to reconcile records between two systems (e.g. to find lost revenue in a telecom billing system), you'd prefer not to lose any records.  If you are examining polling data to predict an election, you can probably afford to lose a few records.

I'm going to cheat a little, since I feel it's a little easier to debug in a Java MapReduce application than in Pig.  So I'm going to go back to Java, examine the data in a little more detail there, and then see what I could have done in Pig, had that been my only option.  I'll use the "new" MapReduce API available since Hadoop 0.20, referred to as "Context Objects".

To refresh your memory or to save you the trouble of reading my previous post, I'm looking through the files that Fannie Mae makes available after each business day, showing the new issues of mortgage pools from the day before.  I had processed the files to determine the total amount of unpaid balance (UPB) on a per-state basis.  Another important point I will repeat here is that these files contain records of several different types (and widths); you determine the record type by looking at column #2 and then referring to a file that contains the description for that record type.
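
Since every line carries its record type in column #2, a small helper can pull that value out before any type-specific parsing happens.  The following is only a sketch in plain Java (no Hadoop classes); the class name NipsRecordType, the method name, and the sample line (including the pool number AB1234) are made up for illustration and are not taken from Fannie Mae's format description.

```java
import java.util.OptionalInt;

class NipsRecordType {

    // Returns the record type found in column #2 (array index 1) of a
    // pipe-delimited line, or empty if the column is missing or not a number.
    static OptionalInt recordType(String line) {
        // String.split() takes a regex, so the pipe has to be escaped.
        String[] fields = line.split("\\|");
        if (fields.length < 2) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(fields[1].trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample line; the pool number is invented.
        String line = "AB1234|09|ILLINOIS|1|100.00|$339,366.00";
        System.out.println(recordType(line)); // prints OptionalInt[9]
    }
}
```

Returning an empty value rather than throwing keeps a single odd line from failing the whole pass, which is the behavior I would want while still exploring the data.
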
Since my last post on this topic, a few more NIPS (New Issue Pool Statistics) files have been made available.  I import all of them into HDFS with

hadoop dfs -copyFromLocal /home/hduser/fannie-mae-nips /user/hduser

This set gives me data from 23 August to 26 December 2012.

The Java MapReduce Job
I write a mapper and reducer, the main details of which follow.  First, my (Maven) project:

mvn archetype:generate -DgroupId=com.example.fannieMaeNips \
  -DartifactId=fannie-mae-nips -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

Next, edit the pom.xml file and add the Hadoop dependency:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.1.0</version>
</dependency>
I drill down into the Java source directory, delete the Maven-supplied sample class, and create NipsMapReduce.java.  The new MapReduce API is found in the org.apache.hadoop.mapreduce package, rather than the old org.apache.hadoop.mapred.  I mention this again because the similarities between the classes in the two packages can lead to some odd errors at compile (or run) time if you use one package and "write to the other."  For more details, see online resources for Hadoop or Tom White's Hadoop: The Definitive Guide.

Here is the full source code for the mapper and reducer:

package com.example.fannieMaeNips;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NipsMapReduce
{

  static class NipsMapReduceMapper
      extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    private Text state = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // the default InputFormat for a Job is TextInputFormat,
      // which splits a file into lines.
      // here, the key is the offset into the file;
      // value is the value of the current line of text:
      String nextLine = value.toString();
      // note the argument to java.lang.String.split() is a regex:
      String[] fields = nextLine.split("\\|");
      // are we looking at record type = 9 (Geographic Distribution)?
      if (fields != null && fields.length == 6 && Integer.parseInt(fields[1]) == 9) {
        // set the key to the name of the state:
        state.set(fields[2]);
        // parse the Aggregate Unpaid Balance field
        // "$ddd,ddd,ddd.dd" format into a double:
        double aggUPB =
            Double.parseDouble(fields[5].substring(1).replace(",", ""));
        // debug output; ends up in the task attempt's stdout log:
        System.out.println("Adding upb " + aggUPB + " (" + fields[5] + ") to state " + fields[2]);
        // write the state and the aggregate unpaid balance
        // out to the Context:
        context.write(state, new DoubleWritable(aggUPB));
      }
    }
  }

  static class NipsMapReduceReducer
      extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {
      double sum = 0.0;
      for (DoubleWritable value : values) {
        // debug output; ends up in the task attempt's stdout log:
        System.out.println("Summing " + value.get() + " into state '" + key + "'");
        sum += value.get();
      }
      context.write(key, new DoubleWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception
  {
    if (args.length != 2) {
      System.err.println("Usage: NipsMapReduce <input path> <output path>");
      System.exit(-1);
    }
    Job job = new Job();
    job.setJarByClass(NipsMapReduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(NipsMapReduceMapper.class);
    job.setReducerClass(NipsMapReduceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
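
One way to check the parsing logic without a cluster is to pull it out of the mapper into a plain function.  This is a sketch rather than the code I actually ran: NipsLineParser and its method names are hypothetical, the sample pool number is invented, and unlike the mapper it returns an empty result instead of throwing when the record-type column is not a number.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

class NipsLineParser {

    // Applies the same test as the mapper: six fields and record type 9.
    // Returns (state, aggregate UPB), or empty for any other kind of line.
    static Optional<Map.Entry<String, Double>> parseGeoRecord(String line) {
        String[] fields = line.split("\\|");
        if (fields.length != 6) {
            return Optional.empty();
        }
        int recordType;
        try {
            recordType = Integer.parseInt(fields[1]);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
        if (recordType != 9) {
            return Optional.empty();
        }
        Map.Entry<String, Double> entry =
            new SimpleEntry<>(fields[2], parseUpb(fields[5]));
        return Optional.of(entry);
    }

    // Converts "$ddd,ddd,ddd.dd" into a double, as the mapper does:
    // drop the leading "$", remove the commas, then parse.
    static double parseUpb(String formatted) {
        return Double.parseDouble(formatted.substring(1).replace(",", ""));
    }

    public static void main(String[] args) {
        // Hypothetical line built around a value from the mapper log.
        String line = "AB1234|09|OHIO|1|100.00|$624,000.00";
        System.out.println(parseGeoRecord(line).isPresent()); // prints true
        System.out.println(parseUpb("$440,944.88"));          // prints 440944.88
    }
}
```

With the logic in one place, a handful of suspicious lines copied out of a NIPS file can be fed through it on a laptop before the job is rerun.
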


I've added a couple of comments to help, plus I put in some output to stdout to check in the log files after the job runs.  After I build the class, I export the path to the jarfile with HADOOP_CLASSPATH:
export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar
At which point, we can run:
bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output
Hadoop is running in pseudo-distributed mode, so the input and output paths are in HDFS.

There's a fair amount of output to stdout, generated by Hadoop, during the run.  My output to stdout is captured in the Hadoop logs.  Somewhere near the end of the output, we see the following:
12/12/27 16:17:16 INFO mapred.JobClient:  Reduce input groups=54
This number includes the 50 states, plus the District of Columbia, Puerto Rico, Guam and the U.S. Virgin Islands.

If I look at the Hadoop log files, I will see my output to stdout in the "stdout" files for each "attempt".  To see the "stdout" and "stderr" files, I switched to the userlogs output directory for this job:

cd $HADOOP_PREFIX/logs/userlogs/job_201212271420_0001
Here's some example output of my mapper, followed by sample output from my reducer:

...
Adding upb 339366.0 ($339,366.00) to state ILLINOIS
Adding upb 679920.0 ($679,920.00) to state KENTUCKY
Adding upb 440944.88 ($440,944.88) to state NORTH CAROLINA
Adding upb 624000.0 ($624,000.00) to state OHIO
Adding upb 194238.0 ($194,238.00) to state SOUTH CAROLINA
...

...
Summing 988363.58 into state 'MASSACHUSETTS'
Summing 372349.79 into state 'MASSACHUSETTS'
Summing 2015439.31 into state 'MASSACHUSETTS'
Summing 1237239.1 into state 'MASSACHUSETTS'
Summing 250000.0 into state 'MASSACHUSETTS'
...
Note how the mapper is processing states as they appear in the NIPS files, while this particular reducer has been given all the values for a particular key ("MASSACHUSETTS") as a group.  I also notice that all of my "stderr" files are empty, leading me to believe that I had no parse errors.  The directories of each attempt are links, so to recursively list them and grep for the stderr entries, I issued the following command:

ls -lLR | grep stderr | less
which gives me a stream of output like the following:

...
-rw-r--r-- 1 hduser hadoop  0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop  0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop  0 Dec 27 16:17 stderr
-rw-r--r-- 1 hduser hadoop  0 Dec 27 16:12 stderr
-rw-r--r-- 1 hduser hadoop  0 Dec 27 16:12 stderr
... 

So it appears the Java MapReduce application parsed all the files with no errors.  This result makes me curious to try my earlier post's Pig queries against the same data set. 
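
The difference seen in the logs above, with the mapper emitting states in file order and the reducer receiving one state's values together, can be imitated in a few lines of plain Java.  This is a toy stand-in for the shuffle, not how Hadoop implements it; InMemoryShuffle is a hypothetical name, and the sample values are copied from the log excerpts above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class InMemoryShuffle {

    // Values collected per key, with keys kept in sorted order.
    private final Map<String, List<Double>> groups = new TreeMap<>();

    // Plays the role of context.write() in the mapper.
    void emit(String state, double upb) {
        groups.computeIfAbsent(state, k -> new ArrayList<>()).add(upb);
    }

    // Plays the role of the reducer: one sum per key.
    Map<String, Double> reduce() {
        Map<String, Double> totals = new TreeMap<>();
        for (Map.Entry<String, List<Double>> group : groups.entrySet()) {
            double sum = 0.0;
            for (double value : group.getValue()) {
                sum += value;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        InMemoryShuffle shuffle = new InMemoryShuffle();
        // Emitted in arbitrary order, the way the mapper meets them.
        shuffle.emit("OHIO", 624000.0);
        shuffle.emit("MASSACHUSETTS", 988363.58);
        shuffle.emit("ILLINOIS", 339366.0);
        shuffle.emit("MASSACHUSETTS", 250000.0);
        // Keys come back sorted, each with a single total.
        System.out.println(shuffle.reduce().keySet());
    }
}
```
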
Revisiting the Pig Query Example

From my earlier post, I retrieve and modify my Pig script to point to the current data set:

nips = load '/user/hduser/fannie-mae-nips' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;
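
One detail of this script worth keeping in mind when comparing results: it casts each cleaned balance to float, while the Java job parses the same field into a double.  A minimal Java sketch of what that narrowing does to a single value follows; FloatVsDouble is a hypothetical name, and the sample balance is one of the values from the mapper log shown earlier.

```java
class FloatVsDouble {

    // Rounds a value to single precision and widens it again,
    // which is what a cast to float does to each balance.
    static double viaFloat(double value) {
        return (double) (float) value;
    }

    public static void main(String[] args) {
        double upb = 440944.88;
        System.out.println(upb);           // prints 440944.88
        System.out.println(viaFloat(upb)); // prints 440944.875
    }
}
```

Half a cent on one record is nothing, but over many thousands of records it is enough to make the last few digits of a state total differ between the two jobs.
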

Then, I run it with:
pig -f nipsDump.pig &> nipsDumpPig.log
First, let's compare a few values.  For California, my Java MapReduce program returned the following total:

bin/hadoop dfs -cat /user/hduser/fannie-mae-nips-output/part-r-00000 | grep CALIFORNIA
CALIFORNIA      8.911464282207979E10
while the Pig output, scaled by 10^6 (dollars), is
(CALIFORNIA,89114.64280685547)
In other words, the two agree to nine significant digits; the small difference in the trailing digits is most likely because the Pig script casts each balance to float, while the Java job uses double throughout.  Going back to the Pig log file, I see

2012-12-28 11:36:18,471 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 3348 time(s).
2012-12-28 11:36:18,476 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 41331 time(s).

and yet, after checking all 54 outputs of both jobs, they match to that same precision.  And there were no errors reported in the Java MapReduce program.  A false alarm, in other words, due to my inexperience with Pig.

A quick Internet search shows that I have likely run into an issue related to the unique nature of the records in the NIPS files, namely, that they contain records of several types (and widths).  My issue probably occurs because my Pig load statement attempts to grab the first 6 elements of every row, and not every row has 6 elements.  This issue doesn't occur in the Java MapReduce program because I check the length of the array into which my line has been split before processing it.  This failure only generates a warning, of course, and so does not affect the results of the Pig query.

To test whether this really is the case, I add an additional counter to the Java MapReduce job to count the number of times that a row has fewer than 6 elements.  Since my mapper is outputting DoubleWritables, I write a value of 1.0 under a dedicated key on each hit and let the reducer sum them, to keep things simple.  It should be easy to account for the "non existent field" warnings this way.  Looking at Fannie Mae's NIPS file format, it appears there are 3 record types with fewer than 6 columns, and hopefully for this file set they will add up to 3348.

Finding the type-conversion warnings might be a little more challenging and time-consuming (especially since this is mostly for my own amusement): I would need to check every non-interesting line to see whether its fields match my expected types.  For example, for record type '01', column 5 is a character string, which would cause a failure, since I try to parse it as a float.  I think I will just check for the "fewer than 6 columns" issue as a sanity check.
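
For anyone who does want to chase the type-conversion warnings, a rough Java analog of the checks implied by my Pig schema might look like the following.  This is a sketch under assumptions: SchemaCheck is a hypothetical class, the sample lines are invented, and the counts approximate the idea behind the two warnings rather than reproducing Pig's actual accounting.

```java
class SchemaCheck {

    int shortRows = 0;          // rows with fewer than 6 fields
    int failedConversions = 0;  // typed fields that did not parse

    // Checks one line against the types declared in the Pig load:
    // index 1 and index 3 as int, index 4 as float.
    void check(String line) {
        String[] fields = line.split("\\|");
        if (fields.length < 6) {
            shortRows++;
        }
        if (fields.length > 1 && !parsesAsInt(fields[1])) {
            failedConversions++;
        }
        if (fields.length > 3 && !parsesAsInt(fields[3])) {
            failedConversions++;
        }
        if (fields.length > 4 && !parsesAsFloat(fields[4])) {
            failedConversions++;
        }
    }

    static boolean parsesAsInt(String s) {
        try {
            Integer.parseInt(s.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static boolean parsesAsFloat(String s) {
        try {
            Float.parseFloat(s.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SchemaCheck check = new SchemaCheck();
        // Invented lines: one clean type-9 row, one row with text
        // where numbers are expected, one short row.
        check.check("AB1234|09|OHIO|1|100.00|$624,000.00");
        check.check("AB1234|01|X|Y|SOMETEXT|Z");
        check.check("AB1234|05|abc");
        System.out.println(check.shortRows + " short, "
            + check.failedConversions + " failed conversions");
    }
}
```
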

Adding the following lines to the mapper of my Java MapReduce program, after the "if" block that checks the number of columns:

      else if (fields != null && fields.length < 6) {
        state.set("ShortRecords");
        context.write(state, new DoubleWritable(1.0));
      }

and re-running with:
export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar
bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output2
I get the following output:

SOUTH DAKOTA    1.0024923021099998E9
ShortRecords    3348.0
TENNESSEE       4.426305956350002E9
right in order, as expected, between South Dakota and Tennessee, and the count of 3348 matches the number of ACCESSING_NON_EXISTENT_FIELD warnings in the Pig log.

When I started this post, it was intended to be about debugging MapReduce jobs.  That would be a nice post, although not very original.  It ended up being a post comparing Java MapReduce with Pig MapReduce; hopefully it was interesting.  If nothing else it shows how compact a Pig query can be, even though the Java code wasn't very verbose either, for this admittedly simple case.
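
A footnote on why "ShortRecords" sorts between South Dakota and Tennessee: Hadoop's Text keys are compared byte by byte, so uppercase letters come before lowercase ones.  For plain ASCII keys, Java's natural String ordering gives the same result, which the following sketch uses as a stand-in (KeyOrder is a hypothetical name, and this is not the comparator Hadoop itself runs).

```java
import java.util.Arrays;

class KeyOrder {

    // Sorts keys by natural String order. For ASCII-only keys this
    // agrees with a byte-wise comparison of the encoded text.
    static String[] sortedKeys(String... keys) {
        String[] copy = keys.clone();
        Arrays.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // 'O' (79) sorts before 'h' (104), so SOUTH DAKOTA comes first.
        System.out.println(Arrays.toString(
            sortedKeys("TENNESSEE", "ShortRecords", "SOUTH DAKOTA")));
        // prints [SOUTH DAKOTA, ShortRecords, TENNESSEE]
    }
}
```
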

Published at DZone with permission of Wayne Adams, author and DZone MVB. (source)
