Extra Cookies

Yet Another Programmer's Blog

The Hadoop 2.x - Running a YARN Job (2)

In the last blog post, the job Client was created and initialized.

This post discusses how the Client deploys the job to the Hadoop cluster.

This post is full of code snippets. To avoid confusion, all comments added by me begin with //** instead of // or /*. The code can be cloned from the Apache Git Repository; the commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Resource Manager Proxy

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L330
public boolean run() throws IOException, YarnException {

  LOG.info("Running Client");
  yarnClient.start();

  ...

First, the start method of yarnClient is invoked. From the last blog post we know that yarnClient is actually a YarnClientImpl instance, but since neither it nor its superclass YarnClient implements start, the code path goes to AbstractService.

//** org.apache.hadoop.service.AbstractService.java L184
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        ...
      }
    }
  }
}

In the start method above, the service state transitions from STATE.INITED to STATE.STARTED, and the serviceStart method, which is implemented in YarnClientImpl, is invoked.

  //** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L105
  protected void serviceStart() throws Exception {
    try {
      rmClient = ClientRMProxy.createRMProxy(getConfig(),
            ApplicationClientProtocol.class);
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
    super.serviceStart();
  }

The ApplicationClientProtocol instance rmClient is created in serviceStart.

When I stepped into the createRMProxy method, I was shocked: it is really a lot of code. In short, this method creates an RPC proxy linked to the Resource Manager, which communicates according to ApplicationClientProtocol; the client-side implementation is in the ApplicationClientProtocolPBClientImpl class.

We’ll discuss what’s under the hood of the Hadoop/YARN RPC framework in later posts, believe me. :)
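To build some intuition for what such a proxy is, here is a minimal sketch using java.lang.reflect.Proxy. This is not Hadoop's actual RPC engine (which builds its stubs differently and speaks Protocol Buffers over the wire); the EchoProtocol interface and its handler below are invented purely for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxySketch {
    // A stand-in for a protocol interface like ApplicationClientProtocol.
    interface EchoProtocol {
        String echo(String msg);
    }

    public static EchoProtocol createProxy() {
        // The handler plays the role of the RPC invoker: in real Hadoop RPC
        // it would serialize the call and send it to the Resource Manager.
        InvocationHandler handler = (proxy, method, args) ->
            "server says: " + args[0];
        return (EchoProtocol) Proxy.newProxyInstance(
            EchoProtocol.class.getClassLoader(),
            new Class<?>[] { EchoProtocol.class },
            handler);
    }

    public static void main(String[] args) {
        EchoProtocol client = createProxy();
        // The caller sees an ordinary interface; the handler does the work.
        System.out.println(client.echo("hello"));  // server says: hello
    }
}
```

The caller of rmClient is in exactly this position: it calls plain interface methods and never sees the remoting underneath.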

The Protocol Buffer Protocols

After the Resource Manager proxy is established, the Client begins to query the Resource Manager.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L335
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
    + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
...
//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L242
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
  IOException {
  GetClusterMetricsRequest request =
    Records.newRecord(GetClusterMetricsRequest.class);
  GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
  return response.getClusterMetrics();
}

Through the rmClient proxy, the Client queries cluster metrics, running nodes, and queue information from the Resource Manager. The code path is clear: create the input parameter, pass it to the method call, and get the result.

But you might ask: what does Records.newRecord do?

//** org.apache.hadoop.yarn.util.Records L30
public class Records {
  // The default record factory
  private static final RecordFactory factory =
      RecordFactoryProvider.getRecordFactory(null);

  public static <T> T newRecord(Class<T> cls) {
    return factory.newRecordInstance(cls);
  }
}

In the newRecord method, the cls parameter (here GetClusterMetricsRequest) is passed to the newRecordInstance call on factory, which generates a new record of that type.

As the code above shows, the factory is created by calling getRecordFactory on RecordFactoryProvider.

Step into the RecordFactoryProvider.

//** org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider.java L43
public static RecordFactory getRecordFactory(Configuration conf) {
  ...
  String recordFactoryClassName = conf.get(
      YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
      YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
  return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
}

private static Object getFactoryClassInstance(String factoryClassName) {
  try {
    Class<?> clazz = Class.forName(factoryClassName);
    Method method = clazz.getMethod("get", null);
    method.setAccessible(true);
    return method.invoke(null, null);
  } catch (ClassNotFoundException e) {
    ...
  }
}

If the IPC_RECORD_FACTORY_CLASS parameter is not set in the configuration, the factory class defaults to DEFAULT_IPC_RECORD_FACTORY_CLASS, which is “org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl”, and the instance is obtained by calling its static get method.

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl L44
public static RecordFactory get() {
  return self;
}
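The pattern here, loading a factory class by name and reflectively invoking its static get method to obtain a singleton, can be sketched in isolation. Everything below (SingletonFactory, factoryFor) is a made-up stand-in, not Hadoop code.

```java
import java.lang.reflect.Method;

public class FactorySketch {
    // A stand-in for RecordFactoryPBImpl: a singleton exposed via a static get().
    public static class SingletonFactory {
        private static final SingletonFactory self = new SingletonFactory();
        public static SingletonFactory get() { return self; }
    }

    // Mirrors getFactoryClassInstance: load the class by name and invoke
    // its static, zero-argument get() method.
    public static Object factoryFor(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        Method method = clazz.getMethod("get");
        method.setAccessible(true);
        return method.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        // Both lookups reach the same singleton instance.
        Object f1 = factoryFor("FactorySketch$SingletonFactory");
        Object f2 = factoryFor("FactorySketch$SingletonFactory");
        System.out.println(f1 == f2);  // true
    }
}
```

Indirection through a class name is what makes the factory pluggable via configuration: swapping IPC_RECORD_FACTORY_CLASS swaps the whole record implementation family.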

Now that we have the factory, let’s see how the GetClusterMetricsRequest instance is created by newRecordInstance.

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl.java L50
public <T> T newRecordInstance(Class<T> clazz) {
  Constructor<?> constructor = cache.get(clazz);
  if (constructor == null) {
    Class<?> pbClazz = null;
    try {
      pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Failed to load class: ["
          + getPBImplClassName(clazz) + "]", e);
    }
    try {
      constructor = pbClazz.getConstructor();
      constructor.setAccessible(true);
      cache.putIfAbsent(clazz, constructor);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException("Could not find 0 argument constructor", e);
    }
  }
  try {
    Object retObject = constructor.newInstance();
    return (T)retObject;
  } catch (InvocationTargetException e) {
    ...
  }
}

private String getPBImplClassName(Class<?> clazz) {
  String srcPackagePart = getPackageName(clazz);
  String srcClassName = getClassName(clazz);
  String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
  String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
  return destPackagePart + "." + destClassPart;
}

First, a class name is obtained from getPBImplClassName; since the parameter clazz is GetClusterMetricsRequest, the resulting name is GetClusterMetricsRequestPBImpl.
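The name mangling is mechanical enough to sketch on its own. The helper below is a hypothetical re-implementation of the suffix rules seen in the source above, not the Hadoop code itself:

```java
public class PBClassNameSketch {
    // The suffixes used by the PB record factory (per the source above).
    static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
    static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";

    // Mirrors getPBImplClassName: derive the PB implementation class name
    // from the interface's package name and simple class name.
    static String pbImplClassName(String fullClassName) {
        int dot = fullClassName.lastIndexOf('.');
        String pkg = fullClassName.substring(0, dot);
        String cls = fullClassName.substring(dot + 1);
        return pkg + "." + PB_IMPL_PACKAGE_SUFFIX + "." + cls + PB_IMPL_CLASS_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(pbImplClassName(
            "org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest"));
        // org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl
    }
}
```

This naming convention is why every record interface needs a matching *PBImpl class in an impl.pb subpackage; if it is missing, the factory throws at lookup time.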

Then it gets the class constructor and creates a new instance by calling the default constructor below.

//** org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl.java L36
public GetClusterMetricsRequestPBImpl() {
  builder = GetClusterMetricsRequestProto.newBuilder();
}

When you see “Proto” and “newBuilder”, you probably recognize it by now: it’s Protocol Buffers.
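Classes generated by the Protocol Buffers compiler follow the builder pattern: an immutable message object constructed through a mutable nested Builder. As a rough hand-written analogue (MetricsProto and its field are invented here, not the real generated GetClusterMetricsRequestProto):

```java
public class BuilderSketch {
    // A tiny hand-written analogue of a generated protobuf message class.
    public static final class MetricsProto {
        private final int numNodeManagers;
        private MetricsProto(Builder b) { this.numNodeManagers = b.numNodeManagers; }
        public int getNumNodeManagers() { return numNodeManagers; }

        // Generated classes expose newBuilder() as the only way to build one.
        public static Builder newBuilder() { return new Builder(); }

        public static final class Builder {
            private int numNodeManagers;
            public Builder setNumNodeManagers(int n) { this.numNodeManagers = n; return this; }
            public MetricsProto build() { return new MetricsProto(this); }
        }
    }

    public static void main(String[] args) {
        MetricsProto m = MetricsProto.newBuilder().setNumNodeManagers(3).build();
        System.out.println(m.getNumNodeManagers());  // 3
    }
}
```

The GetClusterMetricsRequestPBImpl above holds exactly such a builder and fills it in as setters are called on the record.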

Now we have the GetClusterMetricsRequest instance to pass as a parameter to the rmClient proxy.

Calling getClusterMetrics returns a GetClusterMetricsResponse instance as the response.

Let’s step into the real Resource Manager proxy implementation.

  //** org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.java L146
  public GetClusterMetricsResponse getClusterMetrics(
      GetClusterMetricsRequest request) throws YarnException,
      IOException {
    GetClusterMetricsRequestProto requestProto =
        ((GetClusterMetricsRequestPBImpl) request).getProto();
    try {
      return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }

The returned response is actually a GetClusterMetricsResponsePBImpl instance, which implements a getClusterMetrics method to translate the RPC result into a native object.

Finally, the getYarnClusterMetrics call on yarnClient is finished. Later method calls such as getNodeReports and getQueueInfo are similar to getYarnClusterMetrics.

Let’s continue with the run method of Client.

The Job Preparation

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L368
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
...

A YarnClientApplication instance is created through the yarnClient proxy. If you understood the code flow above for getting the cluster metrics, it will be easy to dig into the createApplication method yourself.

//** org.apache.hadoop.yarn.client.api.YarnClientApplication.java L38
public YarnClientApplication(GetNewApplicationResponse newAppResponse,
                             ApplicationSubmissionContext appContext) {
  this.newAppResponse = newAppResponse;
  this.appSubmissionContext = appContext;
}

public GetNewApplicationResponse getNewApplicationResponse() {
  return newAppResponse;
}

public ApplicationSubmissionContext getApplicationSubmissionContext() {
  return appSubmissionContext;
}

A YarnClientApplication has two instance variables: a GetNewApplicationResponse and an ApplicationSubmissionContext. The latter stores all the information the job needs and will be submitted to the Resource Manager.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L369
//** Get appContext from YarnClientApplication instance
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//** Get appId from appContext
ApplicationId appId = appContext.getApplicationId();
//** Set application name to appContext,
//** which will be used to submit the job later
appContext.setApplicationName(appName);

//** Create a ContainerLaunchContextPBImpl instance,
//** which obeys the ContainerLaunchContextProto protocol
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

//** Create a local resources map for the application master
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

//** Copy the application master jar to the hdfs
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
...
//** Create a `LocalResourcePBImpl` instance, and add the AppMaster jar to it
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
...
localResources.put("AppMaster.jar",  amJarRsrc);
...

//** Copy the shell script to hdfs if it exists
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
...

// Save local resource info into app master container launch context
amContainer.setLocalResources(localResources);

// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

//** Add AppMaster.jar location to classpath
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
  .append(File.pathSeparatorChar).append("./*");
...
env.put("CLASSPATH", classPathEnv.toString());

//** Save env to amContainer
amContainer.setEnvironment(env);

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
...
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
  vargs.add("--shell_command " + shellCommand + "");
}
if (!shellArgs.isEmpty()) {
  vargs.add("--shell_args " + shellArgs + "");
}
...
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final commmand
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

//** Save command to amContainer
amContainer.setCommands(commands);
...
//** Save amContainer to appContext
appContext.setAMContainerSpec(amContainer);

// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);

// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);

// Submit the application to the applications manager
//** Again, use the rmClient proxy
yarnClient.submitApplication(appContext);

return monitorApplication(appId);

Now the job is deployed to the Hadoop cluster, and its running status is fetched by monitorApplication through the rmClient proxy every second.
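The monitoring loop itself is a simple poll-sleep-check pattern. Here is a minimal sketch; the StatusSource interface is invented to stand in for the application-report query, and while the state names mirror YARN's application states, this is not the real monitorApplication code.

```java
import java.util.Iterator;
import java.util.List;

public class PollSketch {
    // Invented stand-in for "ask the Resource Manager for the app's state".
    interface StatusSource { String nextState(); }

    // Poll the state at a fixed interval until a terminal state is reached.
    static boolean monitor(StatusSource src, long intervalMillis) throws InterruptedException {
        while (true) {
            Thread.sleep(intervalMillis);
            String state = src.nextState();
            if ("FINISHED".equals(state)) return true;
            if ("FAILED".equals(state) || "KILLED".equals(state)) return false;
            // otherwise (e.g. ACCEPTED, RUNNING) keep polling
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Iterator<String> states = List.of("ACCEPTED", "RUNNING", "FINISHED").iterator();
        System.out.println(monitor(states::next, 1));  // true
    }
}
```

The real loop additionally honors a client timeout and prints an application report on each iteration, but the control flow is the same.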

The Hadoop 2.x - Running a YARN Job (1)

In the last blog post, a Hadoop distribution was built to run a YARN job.

$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    org.apache.hadoop.yarn.applications.distributedshell.Client -jar \
    share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    -shell_command 'date' -shell_args "-u" -num_containers 2

The date -u command is executed in the Hadoop cluster by the script above. We might conclude that there exists a dispatcher named Client in “hadoop-yarn-applications-distributedshell-2.2.0.jar”, responsible for deploying a jar to the cluster with parameters such as the shell command and args, and for notifying the cluster to execute the shell command.

To see what’s in the rabbit hole, let’s step into the Client source code.

This post is full of code snippets. To avoid confusion, all comments added by me begin with //** instead of // or /*. The code can be cloned from the Apache Git Repository; the commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Process Logic

Since the Client is started as a process, we’d better look into the main method first.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L164
public static void main(String[] args) {
  boolean result = false;
  try {
    Client client = new Client();
    LOG.info("Initializing Client");
    try {
      boolean doRun = client.init(args);
      if (!doRun) {
        System.exit(0);
      }
    } catch (IllegalArgumentException e) {
      ....
    }
    result = client.run();
  } catch (Throwable t) {
    ...
  }
  ...
}

There are three steps: first, construct a Client instance, then initialize it, and finally invoke its run method.

The Client Instance

There are three constructors in Client; the main method calls the default one with no parameters.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L227
public Client() throws Exception  {
  this(new YarnConfiguration());
}

The default constructor creates a YarnConfiguration instance and passes it on to another constructor.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L194
public Client(Configuration conf) throws Exception  {
  this(
    "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
    conf);
}

That constructor then sets “org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster” as the appMasterMainClass.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L200
Client(String appMasterMainClass, Configuration conf) {
  this.conf = conf;
  //** Set appMasterMainClass
  this.appMasterMainClass = appMasterMainClass;
  //** This method call will create a YarnClientImpl instance
  yarnClient = YarnClient.createYarnClient();
  //** Init the yarn client
  yarnClient.init(conf);
  //** Create options for command parameters
  //** Will be parsed by GnuParser later
  opts = new Options();
  opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
  opts.addOption("timeout", true, "Application timeout in milliseconds");
  opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
  opts.addOption("jar", true, "Jar file containing the application master");
  opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
  opts.addOption("shell_script", true, "Location of the shell script to be executed");
  opts.addOption("shell_args", true, "Command line args for the shell script");
  opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
  opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
  opts.addOption("log_properties", true, "log4j.properties file");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("help", false, "Print usage");
}

YarnClientImpl extends YarnClient, which extends AbstractService, which implements Service; the main job of this hierarchy is to control the service life-cycle.

The YarnClientImpl is created and initialized.

Since the init method is found in neither YarnClientImpl nor YarnClient, the actual init happens in AbstractService.

//** org.apache.hadoop.service.AbstractService.java L151
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
                                    + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

It first checks whether the service isInState STATE.INITED; if not, it calls enterState(STATE.INITED) and then serviceInit. Once the state has successfully transitioned to STATE.INITED, notifyListeners() is called.

//** org.apache.hadoop.service.AbstractService.java L415
private void notifyListeners() {
  try {
    listeners.notifyListeners(this);
    globalListeners.notifyListeners(this);
  } catch (Throwable e) {
    LOG.warn("Exception while notifying listeners of " + this + ": " + e,
             e);
  }
}

//** org.apache.hadoop.service.ServiceOperations.java L139
public void notifyListeners(Service service) {
  //take a very fast snapshot of the callback list
  //very much like CopyOnWriteArrayList, only more minimal
  ServiceStateChangeListener[] callbacks;
  synchronized (this) {
    callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
  }
  //iterate through the listeners outside the synchronized method,
  //ensuring that listener registration/unregistration doesn't break anything
  for (ServiceStateChangeListener l : callbacks) {
    l.stateChanged(service);
  }
}

notifyListeners notifies all of the service’s listeners, as well as the global listeners, so they can react to the state change.
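The snapshot-then-callback idiom in ServiceOperations.notifyListeners is worth a standalone sketch: take a copy of the listener list under the lock, then invoke the callbacks outside it. The classes below are illustrative, not Hadoop's.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerSketch {
    interface StateChangeListener { void stateChanged(String newState); }

    private final List<StateChangeListener> listeners = new ArrayList<>();

    public synchronized void register(StateChangeListener l) { listeners.add(l); }

    // Mirrors the pattern above: snapshot under the lock, then call back
    // outside it, so listeners may register/unregister without deadlock
    // or ConcurrentModificationException.
    public void notifyListeners(String newState) {
        StateChangeListener[] callbacks;
        synchronized (this) {
            callbacks = listeners.toArray(new StateChangeListener[0]);
        }
        for (StateChangeListener l : callbacks) {
            l.stateChanged(newState);
        }
    }

    public static void main(String[] args) {
        ListenerSketch service = new ListenerSketch();
        List<String> seen = new ArrayList<>();
        service.register(seen::add);
        service.register(s -> seen.add(s.toUpperCase()));
        service.notifyListeners("started");
        System.out.println(seen);  // [started, STARTED]
    }
}
```

As the source comment says, this is "very much like CopyOnWriteArrayList, only more minimal": copy-on-read instead of copy-on-write.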

But, what is the STATE?

The State Model

Let’s go back to the YarnClient.createYarnClient() method.

//** org.apache.hadoop.yarn.client.api.YarnClient.java L55
public static YarnClient createYarnClient() {
  YarnClient client = new YarnClientImpl();
  return client;
}

The YarnClientImpl instance is created.

//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L86
public YarnClientImpl() {
  super(YarnClientImpl.class.getName());
}
//** org.apache.hadoop.yarn.client.api.YarnClient.java L60
protected YarnClient(String name) {
  super(name);
}
//** org.apache.hadoop.service.AbstractService.java L111
public AbstractService(String name) {
  this.name = name;
  stateModel = new ServiceStateModel(name);
}

Bingo, that’s the state model: ServiceStateModel.

//** org.apache.hadoop.service.ServiceStateModel.java L66
public ServiceStateModel(String name) {
  this(name, Service.STATE.NOTINITED);
}

public ServiceStateModel(String name, Service.STATE state) {
  this.state = state;
  this.name = name;
}

The state model is simply a name-state pair; the name is the service implementation’s class name.

isInState checks the current state value.

//** org.apache.hadoop.service.ServiceStateModel.java L84
public boolean isInState(Service.STATE proposed) {
  return state.equals(proposed);
}

enterState changes the state value after checkStateTransition validates the transition.

//** org.apache.hadoop.service.ServiceStateModel.java L110
public synchronized Service.STATE enterState(Service.STATE proposed) {
  checkStateTransition(name, state, proposed);
  Service.STATE oldState = state;
  //atomic write of the new state
  state = proposed;
  return oldState;
}

The transition is checked by looking up statemap with the current state, which yields a boolean indicating whether the proposed transition is valid; if it’s invalid, checkStateTransition throws a ServiceStateException.

//** org.apache.hadoop.service.ServiceStateModel.java L125
public static void checkStateTransition(String name,
                                        Service.STATE state,
                                        Service.STATE proposed) {
  if (!isValidStateTransition(state, proposed)) {
    throw new ServiceStateException(name + " cannot enter state "
                                    + proposed + " from state " + state);
  }
}

public static boolean isValidStateTransition(Service.STATE current,
                                             Service.STATE proposed) {
  boolean[] row = statemap[current.getValue()];
  return row[proposed.getValue()];
}

Then what’s in the statemap?

//** org.apache.hadoop.service.ServiceStateModel.java L35
private static final boolean[][] statemap =
  {
    //                uninited inited started stopped
    /* uninited  */    {false, true,  false,  true},
    /* inited    */    {false, true,  true,   true},
    /* started   */    {false, false, true,   true},
    /* stopped   */    {false, false, false,  true},
  };

That’s the state model we are looking for. The current state is the row index, the proposed state is the column index, and the value indicates whether the current state can transition to the proposed state.
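We can replay the table directly. The sketch below copies the transition matrix and checks a couple of transitions; the state ordinals are assumed to match the table's row order (NOTINITED, INITED, STARTED, STOPPED).

```java
public class StateMapSketch {
    // Assumed Service.STATE ordinals, matching the table's row order.
    static final int NOTINITED = 0, INITED = 1, STARTED = 2, STOPPED = 3;

    // The transition table copied from ServiceStateModel:
    // row = current state, column = proposed state.
    static final boolean[][] STATEMAP = {
        /* uninited */ { false, true,  false, true },
        /* inited   */ { false, true,  true,  true },
        /* started  */ { false, false, true,  true },
        /* stopped  */ { false, false, false, true },
    };

    static boolean isValidStateTransition(int current, int proposed) {
        return STATEMAP[current][proposed];
    }

    public static void main(String[] args) {
        System.out.println(isValidStateTransition(INITED, STARTED));   // true
        System.out.println(isValidStateTransition(STOPPED, STARTED));  // false
    }
}
```

Reading the table: a service can always be stopped from any state, can never go back to uninitialized, and a stopped service is terminal except for redundant stop calls.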

The Command Initialization

Going back to when the YarnClientImpl is about to be initialized: the init method of its superclass AbstractService calls the serviceInit method, and the state transitions to STATE.INITED.

YarnClientImpl implements the serviceInit method.

//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L96
protected void serviceInit(Configuration conf) throws Exception {
  this.rmAddress = getRmAddress(conf);
  statePollIntervalMillis = conf.getLong(
      YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
      YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
  super.serviceInit(conf);
}

The address of the Resource Manager, rmAddress, is read from the configuration instance.

Now that the Client instance has been created successfully, the init method is invoked by main to initialize it.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L244
public boolean init(String[] args) throws ParseException {

  CommandLine cliParser = new GnuParser().parse(opts, args);

  ...

  appName = cliParser.getOptionValue("appname", "DistributedShell");
  amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
  amQueue = cliParser.getOptionValue("queue", "default");
  amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));

  ...

  appMasterJar = cliParser.getOptionValue("jar");
  ...

  shellCommand = cliParser.getOptionValue("shell_command");

  ...

  return true;
}

The command line options are parsed and assigned to instance variables for later use.

The Client is ready to run.

The Hadoop 2.x - Introduction to YARN

The Old MapReduce

The Hadoop 0.x MapReduce system is composed of a JobTracker and TaskTrackers.

The JobTracker is responsible for resource management, tracking resource usage and job life-cycle management, e.g. scheduling job tasks, tracking progress, providing fault-tolerance for tasks.

The TaskTracker is the per-node slave of the JobTracker; it takes orders from the JobTracker to launch or tear down tasks, and periodically reports task status back to the JobTracker.

Over the years we have benefited greatly from the MapReduce framework; it is arguably the most successful programming model in the big data world.

But MapReduce is not everything. We also need graph processing and real-time stream processing, and since Hadoop MapReduce is essentially batch oriented, we had to look to other systems for that work.

So the Hadoop community made a huge change.

The Hadoop YARN

The fundamental idea of YARN is to split up the two major responsibilities of the JobTracker, i.e. resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM).

The ResourceManager is responsible for allocating resources to the running applications.

The NodeManager is the per-machine slave; it launches the applications’ containers, monitors their resource usage, and reports it to the ResourceManager.

The ApplicationMaster is a per-application framework that runs as a normal container; it is responsible for negotiating appropriate resource containers from the ResourceManager, tracking their status, and monitoring progress.

The official Hadoop YARN documentation covers the details.

The Hadoop Distribution

My intention is to read the Hadoop source code, so I prefer to build a Hadoop distribution from source.

First, git clone the repository from GitHub or Apache.

Second, checkout the branch-2.2.0 branch, which is quite stable.

Third, apply the patch below:

diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml
index 8819941..70ff207 100644
--- a/hadoop-common-project/hadoop-auth/pom.xml
+++ b/hadoop-common-project/hadoop-auth/pom.xml
@@ -55,6 +55,11 @@
     </dependency>
     <dependency>
       <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
       <artifactId>jetty</artifactId>
       <scope>test</scope>
     </dependency>

Fourth, run:

mvn package -Pdist -DskipTests -Dtar

The Installation Guide

This is a great guide to install Hadoop 2.2.0.

I did a single-node installation. After configuring everything, HDFS can be set up and the daemons started with the scripts below.

$ bin/hdfs namenode -format
$ sbin/hadoop-daemon.sh start namenode
$ sbin/hadoop-daemon.sh start datanode
$ sbin/yarn-daemon.sh start resourcemanager
$ sbin/yarn-daemon.sh start nodemanager

Then, run the date -u command on two containers.

$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar -shell_command 'date' -shell_args "-u" -num_containers 2

The logs are printed:

14/02/20 22:50:59 INFO distributedshell.Client: Initializing Client
14/02/20 22:50:59 INFO distributedshell.Client: Running Client
14/02/20 22:50:59 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=1
14/02/20 22:50:59 INFO distributedshell.Client: Got Cluster node info from ASM
14/02/20 22:50:59 INFO distributedshell.Client: Got node report from ASM for, nodeId=192.168.0.102:52786, nodeAddress192.168.0.102:8042, nodeRackName/default-rack, nodeNumContainers0
14/02/20 22:50:59 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
14/02/20 22:50:59 INFO distributedshell.Client: User ACL Info for Queue, queueName=root, userAcl=SUBMIT_APPLICATIONS

...

14/02/20 22:50:59 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 2048
14/02/20 22:50:59 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
14/02/20 22:51:00 INFO distributedshell.Client: Set the environment for the application master
14/02/20 22:51:00 INFO distributedshell.Client: Setting up app master command
14/02/20 22:51:00 INFO distributedshell.Client: Completed setting up app master command $JAVA_HOME/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 2 --priority 0 --shell_command date --shell_args -u 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
14/02/20 22:51:00 INFO distributedshell.Client: Submitting application to ASM
14/02/20 22:51:00 INFO impl.YarnClientImpl: Submitted application application_1392907840296_0001 to ResourceManager at /0.0.0.0:8032
14/02/20 22:51:01 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToAMToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=default, appMasterRpcPort=0, appStartTime=1392907860335, yarnAppState=ACCEPTED, distributedFinalState=UNDEFINED, appTrackingUrl=192.168.0.102:8088/proxy/application_1392907840296_0001/, appUser=chris

...

14/02/20 22:51:08 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop
14/02/20 22:51:08 INFO distributedshell.Client: Application completed successfully

And the results are:

$ cat logs/userlogs/application_1392907840296_0001/*/stdout
Thu Feb 20 14:51:05 UTC 2014
Thu Feb 20 14:51:06 UTC 2014

That’s my first YARN job running!