The Hadoop 2.x - Running a YARN Job (2)

In the last blog post, the job Client was created and initialized.

This post discusses what the Client does to deploy the job to the Hadoop cluster.

This post is full of code snippets. To avoid confusion, all comments added by me begin with //** instead of // or /*. The code can be cloned from the Apache Git repository; the commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Resource Manager Proxy

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L330
public boolean run() throws IOException, YarnException {

  LOG.info("Running Client");
  yarnClient.start();

  ...

First, the start method of yarnClient is invoked. From the last blog post we know that yarnClient is actually a YarnClientImpl instance, but since neither it nor its parent class YarnClient implements start, the code path goes to AbstractService.

//** org.apache.hadoop.service.AbstractService.java L184
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        ...
      }
    }
  }
}

In the above start method, the service state transitions from STATE.INITED to STATE.STARTED, and the serviceStart method implemented in YarnClientImpl is invoked.
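Before we look at that serviceStart, here is a minimal sketch of the lifecycle AbstractService enforces; MyService is a hypothetical service of mine, not a class from the repository.

//** A hypothetical service, to illustrate the AbstractService lifecycle
class MyService extends AbstractService {
  MyService() {
    super("MyService");
  }

  @Override
  protected void serviceStart() throws Exception {
    //** acquire resources here, much like YarnClientImpl creates rmClient
    super.serviceStart();
  }
}

MyService svc = new MyService(); //** STATE.NOTINITED
svc.init(new Configuration());   //** -> STATE.INITED, serviceInit runs
svc.start();                     //** -> STATE.STARTED, serviceStart runs
svc.stop();                      //** -> STATE.STOPPED, serviceStop runs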

//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L105
protected void serviceStart() throws Exception {
  try {
    rmClient = ClientRMProxy.createRMProxy(getConfig(),
        ApplicationClientProtocol.class);
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
  super.serviceStart();
}

The ApplicationClientProtocol instance rmClient is created in serviceStart.

When I stepped into the createRMProxy method, I was shocked: it is really a lot of code. In short, this method creates an RPC proxy to the Resource Manager that communicates according to the ApplicationClientProtocol; the client-side implementation is the ApplicationClientProtocolPBClientImpl class.

We'll discuss what's under the hood of the Hadoop/YARN RPC framework in later posts, trust me. :)
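Still, to get a feel for what the Client ends up holding, here is a minimal sketch of creating the same proxy by hand; it assumes a yarn-site.xml on the classpath that points at your Resource Manager.

//** A minimal sketch, not from the repository
Configuration conf = new YarnConfiguration();
//** createRMProxy throws IOException, so handle or declare it
ApplicationClientProtocol rm =
    ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);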

The Protocol Buffer Protocols

After the Resource Manager proxy is established, the Client begins to ask the Resource Manager questions.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L335
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
    + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
...
//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L242
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
    IOException {
  GetClusterMetricsRequest request =
      Records.newRecord(GetClusterMetricsRequest.class);
  GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
  return response.getClusterMetrics();
}

Through the rmClient proxy, the Client queries cluster metrics, running nodes, and queue information from the Resource Manager. The code path is clear: create the input parameter, pass it to the method call, and get the result.

But you might want to ask, what does Records.newRecord do?

//** org.apache.hadoop.yarn.util.Records L30
public class Records {
  // The default record factory
  private static final RecordFactory factory =
      RecordFactoryProvider.getRecordFactory(null);

  public static <T> T newRecord(Class<T> cls) {
    return factory.newRecordInstance(cls);
  }
}

In the newRecord method, the cls parameter is passed to the newRecordInstance method of the factory, and a new record of the cls type, here GetClusterMetricsRequest, is generated.

As we see in the above code, the factory is created by calling getRecordFactory on RecordFactoryProvider.

Let's step into RecordFactoryProvider.

//** org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider.java L43
public static RecordFactory getRecordFactory(Configuration conf) {
  ...
  String recordFactoryClassName = conf.get(
      YarnConfiguration.IPC_RECORD_FACTORY_CLASS,
      YarnConfiguration.DEFAULT_IPC_RECORD_FACTORY_CLASS);
  return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
}

private static Object getFactoryClassInstance(String factoryClassName) {
  try {
    Class<?> clazz = Class.forName(factoryClassName);
    Method method = clazz.getMethod("get", null);
    method.setAccessible(true);
    return method.invoke(null, null);
  } catch (ClassNotFoundException e) {
    ...
  }
}

If the IPC_RECORD_FACTORY_CLASS parameter is not set in the configuration, the factory instance is created from DEFAULT_IPC_RECORD_FACTORY_CLASS, which is "org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl", by calling its get method.
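In other words, the factory is pluggable. Below is a minimal sketch of overriding it through the configuration; com.example.MyRecordFactory is a hypothetical class that would have to expose a static get method, just like the default factory shown next.

//** A minimal sketch, not from the repository
Configuration conf = new Configuration();
//** Uncomment to plug in a hypothetical custom factory:
// conf.set(YarnConfiguration.IPC_RECORD_FACTORY_CLASS, "com.example.MyRecordFactory");
RecordFactory factory = RecordFactoryProvider.getRecordFactory(conf);
GetClusterMetricsRequest request =
    factory.newRecordInstance(GetClusterMetricsRequest.class);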

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl L44
public static RecordFactory get() {
  return self;
}

Now that we have the factory, let's see how the GetClusterMetricsRequest instance is created by newRecordInstance.

//** org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl.java L50
public <T> T newRecordInstance(Class<T> clazz) {
  Constructor<?> constructor = cache.get(clazz);
  if (constructor == null) {
    Class<?> pbClazz = null;
    try {
      pbClazz = localConf.getClassByName(getPBImplClassName(clazz));
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Failed to load class: ["
          + getPBImplClassName(clazz) + "]", e);
    }
    try {
      constructor = pbClazz.getConstructor();
      constructor.setAccessible(true);
      cache.putIfAbsent(clazz, constructor);
    } catch (NoSuchMethodException e) {
      throw new YarnRuntimeException("Could not find 0 argument constructor", e);
    }
  }
  try {
    Object retObject = constructor.newInstance();
    return (T)retObject;
  } catch (InvocationTargetException e) {
    ...
  }
}

private String getPBImplClassName(Class<?> clazz) {
  String srcPackagePart = getPackageName(clazz);
  String srcClassName = getClassName(clazz);
  String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
  String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
  return destPackagePart + "." + destClassPart;
}

First, a class name is obtained from getPBImplClassName; since the parameter clazz is GetClusterMetricsRequest, the resulting class name is GetClusterMetricsRequestPBImpl.
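Here is the mapping worked out for our case, assuming the two suffix constants are "impl.pb" and "PBImpl", which matches the package of the PBImpl class shown below.

//** A worked example, not code from the repository
String srcPackagePart = "org.apache.hadoop.yarn.api.protocolrecords";
String srcClassName   = "GetClusterMetricsRequest";
String pbImplName = srcPackagePart + "." + "impl.pb"
    + "." + srcClassName + "PBImpl";
//** -> "org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl"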

Then it gets the class constructor and creates a new instance by calling the default constructor below.

//** org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsRequestPBImpl.java L36
public GetClusterMetricsRequestPBImpl() {
  builder = GetClusterMetricsRequestProto.newBuilder();
}

When you see "Proto" and "newBuilder", I think you know what it is now: Protocol Buffers.
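If you haven't seen generated Protocol Buffers code before, a generated message class like GetClusterMetricsRequestProto follows the standard builder pattern; roughly:

//** A minimal sketch of generated protobuf usage, not from the repository
GetClusterMetricsRequestProto proto =
    GetClusterMetricsRequestProto.newBuilder().build();
//** every generated message can serialize itself for the wire
byte[] wireBytes = proto.toByteArray();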

Now we have the GetClusterMetricsRequest instance to pass as a parameter to the rmClient proxy; calling getClusterMetrics on it returns a GetClusterMetricsResponse instance as the response.

Let's step into the real Resource Manager proxy implementation.

//** org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.java L146
public GetClusterMetricsResponse getClusterMetrics(
    GetClusterMetricsRequest request) throws YarnException,
    IOException {
  GetClusterMetricsRequestProto requestProto =
      ((GetClusterMetricsRequestPBImpl) request).getProto();
  try {
    return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null,
        requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}

The returned response is actually a GetClusterMetricsResponsePBImpl instance, which implements the getClusterMetrics method to translate the RPC result into a native object.
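I won't paste the whole PBImpl here; roughly, the translation looks like the sketch below, paraphrased rather than copied from the repository (the real class also caches the converted object).

//** A paraphrased sketch of GetClusterMetricsResponsePBImpl.getClusterMetrics
public YarnClusterMetrics getClusterMetrics() {
  GetClusterMetricsResponseProtoOrBuilder p = viaProto ? proto : builder;
  if (!p.hasClusterMetrics()) {
    return null;
  }
  //** wrap the proto message into the native record type
  return new YarnClusterMetricsPBImpl(p.getClusterMetrics());
}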

Finally, the getYarnClusterMetrics method call on yarnClient is finished. Later method calls such as getNodeReports and getQueueInfo are similar to getYarnClusterMetrics.

Let's continue with the run method of the Client.

The Job Preparation

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L368
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
...

A YarnClientApplication instance is created through the yarnClient proxy. If you understood the code flow above for getting the cluster metrics, it will be easy for you to dig into the createApplication method.
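For reference, here is a paraphrased sketch of what createApplication does in YarnClientImpl: it asks the Resource Manager for a new application id and pairs the response with a fresh submission context (the exact code at this commit may differ slightly).

//** A paraphrased sketch of YarnClientImpl.createApplication
public YarnClientApplication createApplication()
    throws YarnException, IOException {
  ApplicationSubmissionContext context =
      Records.newRecord(ApplicationSubmissionContext.class);
  //** ask the RM for a new application id, via the same rmClient proxy
  GetNewApplicationResponse newApp = getNewApplication();
  context.setApplicationId(newApp.getApplicationId());
  return new YarnClientApplication(newApp, context);
}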

//** org.apache.hadoop.yarn.client.api.YarnClientApplication.java L38
public YarnClientApplication(GetNewApplicationResponse newAppResponse,
    ApplicationSubmissionContext appContext) {
  this.newAppResponse = newAppResponse;
  this.appSubmissionContext = appContext;
}

public GetNewApplicationResponse getNewApplicationResponse() {
  return newAppResponse;
}

public ApplicationSubmissionContext getApplicationSubmissionContext() {
  return appSubmissionContext;
}

A YarnClientApplication has two instance variables: a GetNewApplicationResponse and an ApplicationSubmissionContext. The latter stores all the information a job needs and will be submitted to the Resource Manager.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L369
//** Get appContext from YarnClientApplication instance
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//** Get appId from appContext
ApplicationId appId = appContext.getApplicationId();
//** Set application name to appContext,
//** which will be used to submit the job later
appContext.setApplicationName(appName);

//** Create a ContainerLaunchContextPBImpl instance,
//** which obeys the ContainerLaunchContextProto protocol
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

//** Create a local resources map for the application master
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

//** Copy the application master jar to the hdfs
FileSystem fs = FileSystem.get(conf);
Path src = new Path(appMasterJar);
String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
...
//** Create a `LocalResourcePBImpl` instance, and add the AppMaster jar to it
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
...
localResources.put("AppMaster.jar", amJarRsrc);
...

//** Copy the shell script to HDFS if it exists
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
...

// Save local resource info into app master container launch context
amContainer.setLocalResources(localResources);

// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>();
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

//** Add AppMaster.jar location to classpath
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
    .append(File.pathSeparatorChar).append("./*");
...
env.put("CLASSPATH", classPathEnv.toString());

//** Save env to amContainer
amContainer.setEnvironment(env);

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
...
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
  vargs.add("--shell_command " + shellCommand + "");
}
if (!shellArgs.isEmpty()) {
  vargs.add("--shell_args " + shellArgs + "");
}
...
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}

List<String> commands = new ArrayList<String>();
commands.add(command.toString());

//** Save command to amContainer
amContainer.setCommands(commands);
...
//** Save amContainer to appContext
appContext.setAMContainerSpec(amContainer);

// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);

// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);

// Submit the application to the applications manager
//** Again, use the rmClient proxy
yarnClient.submitApplication(appContext);

return monitorApplication(appId);

Now the job is deployed to the Hadoop cluster, and its running status is fetched by monitorApplication through the rmClient proxy every second.
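To close, here is a paraphrased sketch of that monitoring loop; the real monitorApplication also logs the application report and handles a client-side timeout.

//** A paraphrased sketch of Client.monitorApplication
private boolean monitorApplication(ApplicationId appId)
    throws YarnException, IOException {
  while (true) {
    //** poll the Resource Manager every second
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.debug("Thread sleep in monitoring loop interrupted");
    }
    ApplicationReport report = yarnClient.getApplicationReport(appId);
    YarnApplicationState state = report.getYarnApplicationState();
    if (state == YarnApplicationState.FINISHED) {
      return report.getFinalApplicationStatus()
          == FinalApplicationStatus.SUCCEEDED;
    } else if (state == YarnApplicationState.KILLED
        || state == YarnApplicationState.FAILED) {
      return false;
    }
  }
}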