
Arnon Rotem-Gal-Oz is architecture manager for Nice Systems Ltd. He has more than 20 years of experience developing, managing and architecting large distributed systems on varied platforms and technologies, and is the author of SOA Patterns from Manning Publications.

How to Develop Map/Reduce with Reduced Assumptions

05.10.2012
It all started with this odd bug…

One of our teams is writing a service that, among other things, runs map/reduce jobs built as Pig scripts with Java UDFs. The script accepts CSV files that have a text header followed by lines of data. It performs some grouping and then calls a UDF which essentially filters, enriches and transforms the data, outputting another CSV with a new header followed by data – something like the following:

input = LOAD '/data/INPUT2.dat' USING PigStorage(',')...
grouped = GROUP input BY key;
results = FOREACH grouped GENERATE evaluateUDF(input) as output;
STORE results...

and all was well. The job spawns a few map tasks, partitions and groups the data, and runs a single reduce where the actual evaluation happens.
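For readers who haven't written Pig UDFs before, such a UDF is just a Java class extending Pig's EvalFunc. A minimal sketch (the class name and the transformation are made up for illustration – the real UDF is shown further down) looks something like this:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical skeleton of an eval UDF that receives the grouped records
// and returns a bag of output rows.
public class EvaluateUDF extends EvalFunc<DataBag> {
    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag output = BagFactory.getInstance().newDefaultBag();
        // filter, enrich and transform the grouped records here,
        // appending one tuple per output row
        Tuple row = TupleFactory.getInstance().newTuple();
        row.append("transformed value");
        output.add(row);
        return output;
    }
}

Pig calls exec once for every group the script feeds into the UDF.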

Then someone wanted it to run faster. We can do that by adding reducers, which in Pig means adding PARALLEL X to the GROUP statement, where X is the number of reducers we want. And then the trouble began. Everything worked well for up to 5 reducers – go any higher and a few results were lost. The script is pretty basic (the actual script looks a little different, but that's basically it) and the UDF is not earth-shattering, yet still, sometimes results were lost.

Even though I am very busy (well, not really, but you know, some members of my team might be reading this…) this was so annoying that I just had to understand what happened, so I took it upon myself to debug it. The main problem was that I couldn't write a unit test to duplicate the behavior – it only occurred when running on the large file…

The first thing I noticed was that we had duplicate results in the output. I removed those and saw that we were actually only losing one result (again, only with 5 or more reducers). I diffed the correct output with the faulty one, and I even had the problematic key.

I told myself something must be wrong with the partitioner. The default partitioner that Pig (and Hadoop in general) uses is fairly simple: it takes the hash code of the key, ANDs it with Integer.MAX_VALUE (so we get a non-negative number) and takes the remainder of dividing that by the number of partitions:

 return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
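
The AND with Integer.MAX_VALUE is there because hashCode() can be negative, and masking off the sign bit keeps the modulo result a valid partition index. A tiny standalone illustration (the class name is mine, just for demonstration):

public class PartitionFormulaDemo {
    public static void main(String[] args) {
        int numPartitions = 6;
        // "polygenelubricants" is a well-known string whose hashCode() is Integer.MIN_VALUE
        int hash = "polygenelubricants".hashCode();
        System.out.println(hash % numPartitions);                        // prints -2: negative, not a valid partition index
        System.out.println((hash & Integer.MAX_VALUE) % numPartitions);  // prints 0: always in [0, numPartitions)
    }
}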

I couldn't really see what might be wrong with this, but I still wrote my own instead:

 
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.impl.io.PigNullableWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Partition extends Partitioner<PigNullableWritable, Writable> {
    private static final Logger logger = LoggerFactory.getLogger(Partition.class);

    @Override
    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
        try {
            if (key.getValueAsPigType() instanceof String) {
                // our keys are numeric values quoted in the CSV, so strip the quotes and parse
                String skey = ((String) key.getValueAsPigType()).replaceAll("\"", "");
                long ikey = Long.parseLong(skey);
                return (int) ((ikey & Long.MAX_VALUE) % numPartitions);
            } else {
                // fall back to the default behavior for non-string keys
                return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
            }
        } catch (Exception e) {
            logger.warn("failed to compute partition for key " + key + ", defaulting to 0", e);
            return 0;
        }
    }
}

Problem solved – everything works. I guess that damn Hadoop partitioner is all flawed – not! So now I was even more annoyed, because I really didn't understand what was going on here, but at least now I controlled the partitioner, so I added some logging.

I saw that the problematic key goes to partition 0, so I made everything go to partition 0 – and everything worked. Then it finally struck me: I logged all the keys that go into partition 0 and saw that the problematic key is the first one. Duh! I should have noticed that earlier. Here's what the UDF code essentially looks like:

    public DataBag exec(Tuple input) throws IOException
    {
        if (session == null) {
            // first call this UDF instance sees: initialize the session and
            // return only the new header names instead of evaluating the input
            session = ....
            return addHeadersNames();
        }

        // every other call: actually evaluate the input and return the results
        return eval(input);
    }

The Tuple is the input our UDF gets from the Pig script. What happens here is that we throw away the first input we get and just output the headers for our result file. As it happens, when we have enough partitions, a significant input (the missing result) is the first one we get in the partition.

The developer who wrote this assumed that the first input the UDF gets would be the input file's header, so it would be alright to discard it and output the new header instead. However, Hadoop doesn't work like that: code that runs in a map/reduce job might be called in different circumstances than we originally assumed, and we have to pay attention to that. For instance, another problem with the code above is that when it runs with multiple reducers you get multiple copies of the output header…
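
One way to reduce that assumption – a hedged sketch, not the team's actual fix, and with hypothetical class, column and method names – is to look at the content of the group rather than its position: only a group that really holds the input header line gets swapped for the new output header, and every other group is evaluated normally:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Hypothetical rework of the UDF: instead of assuming "the first call I get is the
// header", it checks whether the group it received actually contains the input
// header line, so real data groups are never thrown away.
public class EvaluateUDF extends EvalFunc<DataBag> {

    // assumed name of the first column in the input CSV header (made up for the sketch)
    private static final String FIRST_INPUT_COLUMN_NAME = "id";

    @Override
    public DataBag exec(Tuple input) throws IOException {
        DataBag group = (DataBag) input.get(0); // the grouped records passed in from the script
        if (isHeaderGroup(group)) {
            return addHeaderNames();            // swap the input header for the new output header
        }
        return eval(group);                     // a real data group: always evaluate it
    }

    private boolean isHeaderGroup(DataBag group) throws IOException {
        Iterator<Tuple> records = group.iterator();
        if (!records.hasNext()) {
            return false;
        }
        Object firstField = records.next().get(0);
        return firstField != null && FIRST_INPUT_COLUMN_NAME.equals(firstField.toString().trim());
    }

    private DataBag addHeaderNames() {
        // build and return a bag holding the new output header row (elided)
        return BagFactory.getInstance().newDefaultBag();
    }

    private DataBag eval(DataBag group) {
        // filter, enrich and transform the records in the group (elided)
        return BagFactory.getInstance().newDefaultBag();
    }
}

With a content check like this, a data group is never discarded no matter what order it arrives in, and only the reducer that actually receives the header group emits the new header, which also takes care of the duplicate-header problem. (Whether the header should be produced inside the UDF at all is a separate question; writing it in a post-processing step would remove the assumption entirely.)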

So there you have it – the title says it all: develop map/reduce code with reduced assumptions.

