Cloud Zone is brought to you in partnership with:

Pascal is a senior JEE Developer and Architect at 4Synergy in The Netherlands. Pascal has been designing and building J2EE applications since 2001. He is particularly interested in Open Source toolstack (Mule, Spring Framework, JBoss) and technologies like Web Services, SOA and Cloud technologies. Specialties: JEE XML Web Services Mule ESB Maven Cloud Technology Pascal is a DZone MVB and is not an employee of DZone and has posted 56 posts at DZone. You can read more from them at their website. View Full User Profile

Run Your Hadoop MapReduce Job on Amazon EMR

09.24.2013
| 5715 views |
  • submit to reddit

I posted a while ago about how to set up an EMR cluster using the CLI. In this post I will show you how to set up the cluster using the Java SDK for AWS. In my opinion the best way to show how to do this with the Java AWS SDK is to show a complete example, so let's get started.

  • Set up a new Maven project
  • For this task I created a new default Maven project. The main class in this project is the one that you can run to initiate the EMR cluster and perform the MapReduce job I created in this post:

package net.pascalalma.aws.emr;
 
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
 
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
 
/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 22-07-13
 * Time: 20:45
 */
public class MyClient {
 
    private static final String HADOOP_VERSION = "1.0.3";
    private static final int INSTANCE_COUNT = 1;
    private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
    private static final UUID RANDOM_UUID = UUID.randomUUID();
    private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();
    private static final String BUCKET_NAME = "map-reduce-intro";
    private static final String S3N_HADOOP_JAR =
            "s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar";
    private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";
    private static final String[] JOB_ARGS =
            new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt",
                    "s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
            .asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,
                    JobFlowExecutionState.FAILED,
                    JobFlowExecutionState.TERMINATED});
    static AmazonS3 s3;
    static AmazonElasticMapReduceClient emr;
 
    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
                MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));
        s3 = new AmazonS3Client(credentials);
        emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }
 
    private static JobFlowInstancesConfig configInstance() throws Exception {
 
        // Configure instances to use
        JobFlowInstancesConfig instance = new JobFlowInstancesConfig();
        instance.setHadoopVersion(HADOOP_VERSION);
        instance.setInstanceCount(INSTANCE_COUNT);
        instance.setMasterInstanceType(INSTANCE_TYPE);
        instance.setSlaveInstanceType(INSTANCE_TYPE);
        // instance.setKeepJobFlowAliveWhenNoSteps(true);
        // instance.setEc2KeyName("4synergy_palma");
 
        return instance;
    }
 
    private static void runCluster() throws Exception {
        // Configure the job flow
        RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());
        request.setLogUri(S3N_LOG_URI);
 
        // Configure the Hadoop jar to use
        HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
        jarConfig.setArgs(ARGS_AS_LIST);
 
        try {
 
            StepConfig enableDebugging = new StepConfig()
                    .withName("Enable debugging")
                    .withActionOnFailure("TERMINATE_JOB_FLOW")
                    .withHadoopJarStep(new StepFactory().newEnableDebuggingStep());
 
            StepConfig runJar =
                    new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),
                            jarConfig);
 
            request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));
 
            //Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);
 
            //Check the status of the running job
            String lastState = "";
 
            STATUS_LOOP:
            while (true) {
                DescribeJobFlowsRequest desc =
                        new DescribeJobFlowsRequest(
                                Arrays.asList(new String[]{result.getJobFlowId()}));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows()) {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state)) {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    } else if (!lastState.equals(state)) {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Reponse Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }
 
    public static boolean isDone(String value) {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
 
    public static void main(String[] args) {
        try {
            init();
            runCluster();
        } catch (Exception e) {
            e.printStackTrace(); 
        }
    }
}

    In this class I first declare some constants, I assume these will be obvious. In the init() method I use the credential properties file that I added to the project. I added this file to the /main/resources folder of my Maven project. It contains my access key and secret key.
    Also I set the region to ‘EU-WEST’ for the EMR client.

    The next method is configInstance(). In this method, I create and configure  JobFlowInstance by setting the Hadoop version, number of instances, size of instances, etc. Also, you can configure the keepAlive setting to keep the cluster alive after the jobs have finished. This could be helpful in some cases. If you want to use this option it might be useful to also set the key-pair you want to use to access the cluster, because I wasn’t able to access the cluster without setting this key.

    The runCluster() method is where the cluster is actually run. It creates the request to initiate the cluster. In this request the steps are added that have to be executed. In our case, one of the steps is running the JAR file we created in the previous steps. I also added a debug step so we have access to the debug logging after the cluster is finished and terminated. We can simply access the log files in the S3 bucket that I set with the constant S3N_LOG_URI.

    When this request is created we start the cluster based on it. Then we pull every 10 seconds to see whether the job has finished and to show a message in the console indicating the current state of the job.

    To execute the first run we have to prepare the input.

  • Prepare the input
  • As an input for the job (read this for more info about this example job) we have to make the dictionary contents available for the EMR cluster. Furthermore, we have to make the JAR file available and make sure the output and log directory exists in our S3 buckets. There are several ways to do this. You can do this programmatically by using the SDK, by using S3cmd to do it from the command line, or by using the AWS Management Console. Whichever method is find as long as you end up with a similar setup to this:

    • s3://map-reduce-intro
    • s3://map-reduce-intro/input
    • s3://map-reduce-intro/input/input.txt
    • s3://map-reduce-intro/job
    • s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
    • s3://map-reduce-intro/log
    • s3://map-reduce-intro/result

    Or, when using S3cmd, like this:

s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/
2013-07-20 13:06    469941   s3://map-reduce-intro/input/input.txt
2013-07-20 14:12      5491   s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
2013-08-06 14:30         0   s3://map-reduce-intro/log/
2013-08-06 14:27         0   s3://map-reduce-intro/result/

    In the example above I already introduced an S3 client in the code. You can also use that to prepare the input or get the output as part of the client’s job.

  • Run the cluster
  • When everything is in place we can run the job. I simply run the main method MyClient in IntelliJ and get the following output in my console:

Job STARTING at Tue Aug 06 16:31:55 CEST 2013
Job RUNNING at Tue Aug 06 16:36:18 CEST 2013
Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013
Job COMPLETED: {
  JobFlowId: j-JDB14HVTRC1L
  ,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43
  ,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0
  ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
    ,StartDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013
    ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013
    ,LastStateChangeReason: Steps completed}
  ,Instances: {MasterInstanceType: m1.small
    ,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com
    ,MasterInstanceId: i-93268ddf
    ,InstanceCount: 1
    ,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ
      ,Name: master
      ,Market: ON_DEMAND
      ,InstanceRole: MASTER
      ,InstanceType: m1.small
      ,InstanceRequestCount: 1
      ,InstanceRunningCount: 0
      ,State: ENDED
      ,LastStateChangeReason: Job flow terminated
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:34:28 CEST 2013
      ,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013
      ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}]
    ,NormalizedInstanceHours: 1
    ,Ec2KeyName: 4synergy_palma
    ,Placement: {AvailabilityZone: eu-west-1a}
    ,KeepJobFlowAliveWhenNoSteps: false
    ,TerminationProtected: false
    ,HadoopVersion: 1.0.3}
  ,Steps: [
    {StepConfig: {Name: Enable debugging
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar
        ,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:12 CEST 2013
      ,EndDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,}
    }
  , {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
      ,ActionOnFailure: TERMINATE_JOB_FLOW
      ,HadoopJarStep: {Properties: []
        ,Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
        ,Args: [s3n://map-reduce-intro/input/input.txt
          , s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}
    }
    ,ExecutionStatusDetail: {State: COMPLETED
      ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013
      ,StartDateTime: Tue Aug 06 16:36:40 CEST 2013
      ,EndDateTime: Tue Aug 06 16:38:10 CEST 2013
      ,}
    }]
  ,BootstrapActions: []
  ,SupportedProducts: []
  ,VisibleToAllUsers: false
,}
Process finished with exit code 0

    And ,of course we have a result in the result folder that we configured in our S3 bucket:
    Screen Shot 2013-08-06 at 19.39.15
    I transfer the result to my local machine and have a look at it:
    Screen Shot 2013-08-06 at 19.41.44

So that concludes this simple, but I think rather complete, example of creating a Hadoop job and running it on a cluster after having it unit tested (as we would do with all our software).

With this setup as a base it is quite easy to come up with more complex business cases and have these tested and configured to be run on AWS EMR.



Published at DZone with permission of Pascal Alma, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)