Extra Cookie

Yet Another Programmer's Blog

The Hadoop 2.x - Running a YARN Job (1)

In last blog post, a hadoop distribution is built to run a YARN job.

1
2
3
4
$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    org.apache.hadoop.yarn.applications.distributedshell.Client -jar \
    share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    -shell_command 'date' -shell_args "-u" -num_containers 2

The date -u command is executed in Hadoop cluster by above script, we might conclude that there exists a dispatcher named Client in “hadoop-yarn-applications-distributedshell-2.2.0.jar”, responsible for deploying a jar to cluster with parameters, such as shell command and args, and notify the cluster to execute the shell command.

To see what’s in the rabbit hole, let’s step into the Client source code.

Code snippets will be full of this post, to not confuse you, all comments added by me begin with //** instead of // or /* and the code can be cloned from Apache Git Repository, commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Process Logic

Since the Client is started as a process, we’d better to look into the main method first.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L164
public static void main(String[] args) {
  boolean result = false;
  try {
    Client client = new Client();
    LOG.info("Initializing Client");
    try {
      boolean doRun = client.init(args);
      if (!doRun) {
        System.exit(0);
      }
    } catch (IllegalArgumentException e) {
      ....
    }
    result = client.run();
  } catch (Throwable t) {
    ...
  }
  ...
}

There are three procedures, first, constructs a Client instance, then initializes it, and invokes the run method of it.

The Client Instance

There are three constructors in Client, the main method calls the default one with no parameters.

1
2
3
4
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L227
public Client() throws Exception  {
  this(new YarnConfiguration());
}

The default constructor creates a YarnConfiguration instance and bypasses it to another constructor.

1
2
3
4
5
6
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L194
public Client(Configuration conf) throws Exception  {
  this(
    "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
    conf);
}

Then sets “org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster” as appMasterClass.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L200
Client(String appMasterMainClass, Configuration conf) {
  this.conf = conf;
  //** Set appMasterMainClass
  this.appMasterMainClass = appMasterMainClass;
  //** This method call will create a YarnClientImpl instance
  yarnClient = YarnClient.createYarnClient();
  //** Init the yarn client
  yarnClient.init(conf);
  //** Create options for command parameters
  //** Will be parsed by GnuParser later
  opts = new Options();
  opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
  opts.addOption("timeout", true, "Application timeout in milliseconds");
  opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
  opts.addOption("jar", true, "Jar file containing the application master");
  opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
  opts.addOption("shell_script", true, "Location of the shell script to be executed");
  opts.addOption("shell_args", true, "Command line args for the shell script");
  opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
  opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
  opts.addOption("log_properties", true, "log4j.properties file");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("help", false, "Print usage");
}

The YarnClientImpl extends from YarnClient which extends from AbstractService which implements from Service, the main job of it is to control the service life-cycle,

The YarnClientImpl is created and initialized.

Since the init method can’t be found in YarnClientImpl as well as YarnClient, the actual init happens in AbstractService.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//** org.apache.hadoop.service.AbstractService.java L151
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
                                    + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

It first checks whether the state isInState STATE.INITED, if not, enterState(STATE.INITED), and calls the serviceInit method, when the state is successfully transferred to STATE.INITED, notifyListeners() is called.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//** org.apache.hadoop.service.AbstractService.java L415
private void notifyListeners() {
  try {
    listeners.notifyListeners(this);
    globalListeners.notifyListeners(this);
  } catch (Throwable e) {
    LOG.warn("Exception while notifying listeners of " + this + ": " + e,
             e);
  }
}

//** org.apache.hadoop.service.ServiceOperations.java L139
public void notifyListeners(Service service) {
  //take a very fast snapshot of the callback list
  //very much like CopyOnWriteArrayList, only more minimal
  ServiceStateChangeListener[] callbacks;
  synchronized (this) {
    callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
  }
  //iterate through the listeners outside the synchronized method,
  //ensuring that listener registration/unregistration doesn't break anything
  for (ServiceStateChangeListener l : callbacks) {
    l.stateChanged(service);
  }
}

notifyListeners notifies all its listeners and global listeners to change their states correspondingly.

But, what is the STATE?

The State Model

Let’s go back to the YarnClient.createYarnClient() method.

1
2
3
4
5
//** org.apache.hadoop.yarn.client.api.YarnClient.java L55
public static YarnClient createYarnClient() {
  YarnClient client = new YarnClientImpl();
  return client;
}

The YarnClientImpl instance is created.

1
2
3
4
//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L86
public YarnClientImpl() {
  super(YarnClientImpl.class.getName());
}
1
2
3
4
//** org.apache.hadoop.yarn.client.api.YarnClient.java L60
protected YarnClient(String name) {
  super(name);
}
1
2
3
4
5
//** org.apache.hadoop.service.AbstractService.java L111
public AbstractService(String name) {
  this.name = name;
  stateModel = new ServiceStateModel(name);
}

Bingo, that’s the state model: ServiceStateModel.

1
2
3
4
5
6
7
8
9
//** org.apache.hadoop.service.ServiceStateModel.java L66
public ServiceStateModel(String name) {
  this(name, Service.STATE.NOTINITED);
}

public ServiceStateModel(String name, Service.STATE state) {
  this.state = state;
  this.name = name;
}

The state model is simply a name state pair, the name is the service implementation class name.

The isInState checks the state value.

1
2
3
4
//** org.apache.hadoop.service.ServiceStateModel.java L84
public boolean isInState(Service.STATE proposed) {
  return state.equals(proposed);
}

The enterState changes the state value after checkStateTransition.

1
2
3
4
5
6
7
8
//** org.apache.hadoop.service.ServiceStateModel.java L110
public synchronized Service.STATE enterState(Service.STATE proposed) {
  checkStateTransition(name, state, proposed);
  Service.STATE oldState = state;
  //atomic write of the new state
  state = proposed;
  return oldState;
}

The state transferring is checked by looking up the statemap with current state, then return a boolean to indicate whether the state transition is valid, if it’s invalid, the checkStateTransition throws ServiceStateException.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//** org.apache.hadoop.service.ServiceStateModel.java L125
public static void checkStateTransition(String name,
                                        Service.STATE state,
                                        Service.STATE proposed) {
  if (!isValidStateTransition(state, proposed)) {
    throw new ServiceStateException(name + " cannot enter state "
                                    + proposed + " from state " + state);
  }
}

public static boolean isValidStateTransition(Service.STATE current,
                                             Service.STATE proposed) {
  boolean[] row = statemap[current.getValue()];
  return row[proposed.getValue()];
}

Then what’s in the statemap?

1
2
3
4
5
6
7
8
9
//** org.apache.hadoop.service.ServiceStateModel.java L35
private static final boolean[][] statemap =
  {
    //                uninited inited started stopped
    /* uninited  */    {false, true,  false,  true},
    /* inited    */    {false, true,  true,   true},
    /* started   */    {false, false, true,   true},
    /* stopped   */    {false, false, false,  true},
  };

That’s the state model we are looking for. The current state is the row index, the proposed state is the column index, the value is whether the current state can be transfered to proposed state.

The Command Initialization

Go back again, to when the YarnClientImpl is about to be initialized, the init method of its super AbstractService calls the serviceInit method and the state is transfered to STATE.INITED.

YarnClientImpl has implemented the serviceInit interface.

1
2
3
4
5
6
7
8
//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L96
protected void serviceInit(Configuration conf) throws Exception {
  this.rmAddress = getRmAddress(conf);
  statePollIntervalMillis = conf.getLong(
      YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
      YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
  super.serviceInit(conf);
}

The address of Resource Manager, rmAddress is assigned from the configuration instance.

Now the Client instance is created successfully, init method is invoked by main to initialize the instance.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//** org.apache.hadoop.yarn.applications.distributedshell.java L244
public boolean init(String[] args) throws ParseException {

  CommandLine cliParser = new GnuParser().parse(opts, args);

  ...

  appName = cliParser.getOptionValue("appname", "DistributedShell");
  amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
  amQueue = cliParser.getOptionValue("queue", "default");
  amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));

  ...

  appMasterJar = cliParser.getOptionValue("jar");
  ...

  shellCommand = cliParser.getOptionValue("shell_command");

  ...

  return true;
}

The command line options is parsed, and assigned to instance variables for later usage.

The Client is ready to run.

The Hadoop 2.x - Introduction to YARN

The Old MapReduce

The Hadoop 0.x MapReduce system composed of JobTracker and TaskTrackers.

The JobTracker is responsible for resource management, tracking resource usage and job life-cycle management, e.g. scheduling job tasks, tracking progress, providing fault-tolerance for tasks.

The TaskTracker is the per-node slave for JobTracker, takes orders from the JobTracker to launch or tear-down tasks, and provides task status information to the JobTracker periodically.

For those years, we are benefited from the MapReduce framework, it’s the most successful programming model in the big data world.

But MapReduce is not everything, we need to do graph processing, or real-time stream processing, since Hadoop is essentially batch oriented, we have to look for other systems to do those work.

And the hadoop community made a huge change.

The Hadoop YARN

The fundamental idea of YARN is to split up the two major responsibilities of the JobTracker i.e. resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager (RM) and per-application ApplicationMaster (AM).

The ResourceManager is responsible for allocating resources to the running applications.

The NodeManager is a per-machine slave, works on launching the application’s containers, monitoring the resource usage, and reporting them to the ResourceManager.

The ApplicationMaster is a per-application framework, which runs as a normal container, responsible for negotiating appropriate resource containers from ResourceManager, tracking their status and monitoring for progress.

Click here to read the details about Hadoop YARN.

The Hadoop Distribution

My intention is to read the source code of hadoop, so I prefer to build a hadoop distribution from the source code.

First, git clone from github or apache.

Second, checkout the branch-2.2.0 branch, which is quite stable.

Third, apply below patch,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml
index 8819941..70ff207 100644
--- a/hadoop-common-project/hadoop-auth/pom.xml
+++ b/hadoop-common-project/hadoop-auth/pom.xml
@@ -55,6 +55,11 @@
     </dependency>
     <dependency>
       <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
       <artifactId>jetty</artifactId>
       <scope>test</scope>
     </dependency>

Fourth, type and run.

mvn package -Pdist -DskipTests -Dtar

The Installation Guide

This is a great guide to install Hadoop 2.2.0.

I did a single installation, after configuring everything, hdfs can be setup and daemons are started by below scripts.

1
2
3
4
5
$ /bin/hdfs namenode -format
$ sbin/hadoop-daemon.sh start namenode
$ sbin/hadoop-daemon.sh start datanode
$ sbin/yarn-daemon.sh start resourcemanager
$ sbin/yarn-daemon.sh start nodemanager

Then, run date -u commands on two containers.

1
$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar -shell_command 'date' -shell_args "-u" -num_containers 2

The logs are printed:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
14/02/20 22:50:59 INFO distributedshell.Client: Initializing Client
14/02/20 22:50:59 INFO distributedshell.Client: Running Client
14/02/20 22:50:59 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=1
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster node info from ASM
14/02/20 22:50:59 INFO distributedshell.Client: Got node report from ASM for, nodeId=192.168.0.102:52786, nodeAddress192.168.0.102:8042, nodeRackName/default-rack, nodeNumContainers0
14/02/20 22:50:59 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
14/02/20 22:50:59 INFO distributedshell.Client: User ACL Info for Queue, queueName=root, userAcl=SUBMIT_APPLICATIONS

...

14/02/20 22:50:59 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 2048
14/02/20 22:50:59 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
14/02/20 22:51:00 INFO distributedshell.Client: Set the environment for the application master
14/02/20 22:51:00 INFO distributedshell.Client: Setting up app master command
14/02/20 22:51:00 INFO distributedshell.Client: Completed setting up app master command $JAVA_HOME/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 2 --priority 0 --shell_command date --shell_args -u 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
14/02/20 22:51:00 INFO distributedshell.Client: Submitting application to ASM
14/02/20 22:51:00 INFO impl.YarnClientImpl: Submitted application application_1392907840296_0001 to ResourceManager at /0.0.0.0:8032
14/02/20 22:51:01 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToAMToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=default, appMasterRpcPort=0, appStartTime=1392907860335, yarnAppState=ACCEPTED, distributedFinalState=UNDEFINED, appTrackingUrl=192.168.0.102:8088/proxy/application_1392907840296_0001/, appUser=chris

...

14/02/20 22:51:08 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop
14/02/20 22:51:08 INFO distributedshell.Client: Application completed successfully

And the results are:

1
2
3
$ cat logs/userlogs/application_1392907840296_0001/*/stdout
Thu Feb 20 14:51:05 UTC 2014
Thu Feb 20 14:51:06 UTC 2014

That’s my first YARN job running!