In the last blog post, the job Client was created and
initialized. This post discusses how the Client deploys
the job to the Hadoop cluster.
This post is full of code snippets. To avoid confusion, all
comments added by me begin with //** instead of
// or /*, and the code can be cloned from the Apache Git
repository; the commit id is
2e01e27e5ba4ece19650484f646fac42596250ce.
First, the start method of yarnClient is
invoked. From the last blog
post we know that yarnClient is actually a
YarnClientImpl instance, but since neither it nor its superclass
YarnClient overrides start,
the code path goes to AbstractService.
```java
//** org.apache.hadoop.service.AbstractService.java L184
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        ...
      }
    }
  }
}
```
In the start method above, the service state transitions from
STATE.INITED to STATE.STARTED, and the
serviceStart method, which is implemented in
YarnClientImpl, is invoked.
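The guard logic above can be sketched with a minimal, self-contained service skeleton. This is my own simplified imitation of the AbstractService pattern (the class name SimpleService and the events list are mine, not Hadoop's): the state only moves forward once, so serviceStart runs exactly one time even if start is called repeatedly.

```java
import java.util.ArrayList;
import java.util.List;

class SimpleService {
  enum STATE { NOTINITED, INITED, STARTED, STOPPED }

  private STATE state = STATE.INITED;
  private final Object stateChangeLock = new Object();
  private final List<String> events = new ArrayList<>();

  boolean isInState(STATE s) { return state == s; }

  void start() {
    if (isInState(STATE.STARTED)) {
      return; // already started, nothing to do
    }
    synchronized (stateChangeLock) {
      if (state == STATE.INITED) {
        state = STATE.STARTED;
        // subclass hook, playing the role of YarnClientImpl.serviceStart()
        serviceStart();
        events.add("started");
      }
    }
  }

  protected void serviceStart() {
    events.add("serviceStart");
  }

  List<String> getEvents() { return events; }
}
```

Calling start() a second time hits the isInState check and returns immediately, which is why YarnClientImpl only needs to implement the serviceStart hook.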
The ApplicationClientProtocol instance
rmClient is created in serviceStart.
When I stepped into the createRMProxy method, I was
shocked: it is really a lot of code. Simply put, this method creates
an RPC proxy linked to the Resource Manager, which communicates according to
the ApplicationClientProtocol; the implementation is in the
ApplicationClientProtocolPBClientImpl class.
We'll discuss what's under the hood of the Hadoop/YARN RPC framework in
later posts, trust me. :)
The Protocol Buffer
Protocols
After the Resource Manager proxy is established, the
Client begins to query the Resource Manager.
```java
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L335
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
    + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
...
```
Through the rmClient proxy, the Client queries
cluster metrics, running nodes, and queue information from the Resource
Manager. The code path is clear: create the input parameter, pass it to
the method call, and get the result.
But you might want to ask, what does Records.newRecord
do?
```java
//** org.apache.hadoop.yarn.util.Records L30
public class Records {
  // The default record factory
  private static final RecordFactory factory =
      RecordFactoryProvider.getRecordFactory(null);

  public static <T> T newRecord(Class<T> cls) {
    return factory.newRecordInstance(cls);
  }
}
```
In the newRecord method, a new record is generated by
passing the cls parameter to the
newRecordInstance method of factory; here the
cls type is GetClusterMetricsRequest.
As seen in the code above, the factory is created by
calling getRecordFactory on
RecordFactoryProvider.
If the IPC_RECORD_FACTORY_CLASS parameter is not set in the
configuration, the factory instance is created from
DEFAULT_IPC_RECORD_FACTORY_CLASS, which is
"org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl", by
calling its get method.
First, a class name is obtained from getPBImplClassName;
since the parameter clazz is
GetClusterMetricsRequest, the class name is
GetClusterMetricsRequestPBImpl.
Then the class constructor is fetched, and a new instance is created by
invoking that default constructor.
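The name-lookup-plus-reflection trick can be sketched with plain Java. This is a simplified, self-contained imitation, not Hadoop's actual code: the names SketchRecordFactory, DemoRequest, and DemoRequestPBImpl are made up, and the real RecordFactoryPBImpl builds the impl name under an impl.pb subpackage, while here the impl simply lives next to the interface.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: map a record interface to "<name>PBImpl", instantiate it
// via its default constructor, and cache the constructor for reuse.
class SketchRecordFactory {
  private static final Map<Class<?>, Constructor<?>> cache =
      new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  static <T> T newRecordInstance(Class<T> clazz) {
    try {
      Constructor<?> ctor = cache.get(clazz);
      if (ctor == null) {
        String implName = clazz.getName() + "PBImpl";
        ctor = Class.forName(implName).getDeclaredConstructor();
        ctor.setAccessible(true);
        cache.put(clazz, ctor);
      }
      return (T) ctor.newInstance();
    } catch (Exception e) {
      throw new RuntimeException("No PBImpl for " + clazz.getName(), e);
    }
  }
}

// Hypothetical record interface and impl, standing in for
// GetClusterMetricsRequest / GetClusterMetricsRequestPBImpl.
interface DemoRequest { int getNumNodeManagers(); }
class DemoRequestPBImpl implements DemoRequest {
  public int getNumNodeManagers() { return 3; }
}
```

A caller asks for the interface type and gets the PBImpl back, exactly the indirection that lets YARN swap record implementations behind one factory.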
The returned response is actually a
GetClusterMetricsResponsePBImpl instance, which has a
getClusterMetrics method implemented to translate the
RPC result into a native object.
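That translate-on-access pattern of the *PBImpl wrappers can be sketched as follows. The Stub types here are stand-ins I made up; the real classes wrap messages generated by protoc and convert via convertFromProtoFormat.

```java
// Stand-in for the generated protobuf message.
class ClusterMetricsProtoStub {
  final int numNodeManagers;
  ClusterMetricsProtoStub(int n) { this.numNodeManagers = n; }
}

// Native-side record exposed to callers of the client API.
class YarnClusterMetricsStub {
  private final int numNodeManagers;
  YarnClusterMetricsStub(int n) { this.numNodeManagers = n; }
  int getNumNodeManagers() { return numNodeManagers; }
}

// Sketch of a GetClusterMetricsResponsePBImpl-style wrapper: it holds
// the raw proto and converts it to a native object only when asked.
class GetClusterMetricsResponseStub {
  private final ClusterMetricsProtoStub proto;
  GetClusterMetricsResponseStub(ClusterMetricsProtoStub proto) {
    this.proto = proto;
  }

  YarnClusterMetricsStub getClusterMetrics() {
    // plays the role of convertFromProtoFormat(...)
    return new YarnClusterMetricsStub(proto.numNodeManagers);
  }
}
```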
Finally, the getYarnClusterMetrics call on
yarnClient is finished. Later method calls such as
getNodeReports and getQueueInfo are similar to
getYarnClusterMetrics.
Let's continue with the run method of
Client.
The Job Preparation
```java
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L368
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
...
```
A YarnClientApplication instance is created through the
yarnClient proxy. If you really understood the code
flow above for getting the cluster metrics, it will be easy for you to look
deep into the createApplication method.
```java
public GetNewApplicationResponse getNewApplicationResponse() {
  return newAppResponse;
}

public ApplicationSubmissionContext getApplicationSubmissionContext() {
  return appSubmissionContext;
}
```
A YarnClientApplication has two instance variables, a
GetNewApplicationResponse and an
ApplicationSubmissionContext instance; the latter stores
all the information of a job that is needed to submit it to the
Resource Manager.
```java
//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L369
//** Get appContext from YarnClientApplication instance
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//** Get appId from appContext
ApplicationId appId = appContext.getApplicationId();
//** Set application name to appContext,
//** which will be used to submit the job later
appContext.setApplicationName(appName);

//** Create a ContainerLaunchContextPBImpl instance,
//** which obeys the ContainerLaunchContextProto protocol
ContainerLaunchContext amContainer =
    Records.newRecord(ContainerLaunchContext.class);

//** Create a local resources map for the application master
Map<String, LocalResource> localResources =
    new HashMap<String, LocalResource>();

//** Copy the application master jar to the hdfs
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
...
//** Create a LocalResourcePBImpl instance, and add the AppMaster jar to it
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
...
localResources.put("AppMaster.jar", amJarRsrc);
...

//** Copy the shell script to hdfs if it exists
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
...

// Save local resource info into app master container launch context
amContainer.setLocalResources(localResources);

// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP,
    Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN,
    Long.toString(hdfsShellScriptLen));

//** Add AppMaster.jar location to classpath
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
    .append(File.pathSeparatorChar).append("./*");
...
env.put("CLASSPATH", classPathEnv.toString());

//** Save env to amContainer
amContainer.setEnvironment(env);

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
...
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
  vargs.add("--shell_command " + shellCommand + "");
}
if (!shellArgs.isEmpty()) {
  vargs.add("--shell_args " + shellArgs + "");
}
...
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final commmand
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

//** Save command to amContainer
amContainer.setCommands(commands);
...
//** Save amContainer to appContext
appContext.setAMContainerSpec(amContainer);

// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);

// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);

// Submit the application to the applications manager
//** Again, use the rmClient proxy
yarnClient.submitApplication(appContext);

return monitorApplication(appId);
```
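To see what the command assembly at the end produces, the same join logic can be run on a shortened vargs list. This is a self-contained sketch with illustrative values (the class name LaunchCommandDemo is mine); like the real Client, it joins every fragment with a single space into the one command line handed to the container launcher.

```java
import java.util.ArrayList;
import java.util.List;

class LaunchCommandDemo {
  // Mirrors the Client's loop, including the trailing space it leaves.
  static String buildCommand(List<CharSequence> vargs) {
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
      command.append(str).append(" ");
    }
    return command.toString();
  }
}
```

Feeding it fragments like "/usr/bin/java" and "--container_memory 10" yields a single flat string, which is why each vargs entry must already be a well-formed piece of shell syntax.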
Now the job is deployed to the Hadoop cluster, and its running status is
fetched by monitorApplication through the rmClient
proxy every second.
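That monitoring loop can be sketched as a simple poll-and-sleep cycle. This is my own simplified version, not the real monitorApplication (which inspects YarnApplicationState and FinalApplicationStatus from an ApplicationReport); here an iterator of states stands in for successive report queries.

```java
import java.util.Iterator;

class MonitorSketch {
  enum AppState { NEW, RUNNING, FINISHED, FAILED, KILLED }

  // Polls successive application states until a terminal one shows up;
  // returns true only when the application FINISHED.
  static boolean monitor(Iterator<AppState> reports, long pollMillis) {
    while (reports.hasNext()) {
      AppState state = reports.next();
      if (state == AppState.FINISHED) {
        return true;
      }
      if (state == AppState.FAILED || state == AppState.KILLED) {
        return false;
      }
      try {
        Thread.sleep(pollMillis); // the real Client polls every 1000 ms
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```

The Client blocks in this loop until the job reaches a terminal state, which is why submitting a distributed shell job from the command line does not return until the job completes.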