The Hadoop 2.x - Running a YARN Job (1)

In the last blog post, we built a Hadoop distribution so that we could run a YARN job:

$ bin/hadoop jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    org.apache.hadoop.yarn.applications.distributedshell.Client -jar \
    share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.2.0.jar \
    -shell_command 'date' -shell_args "-u" -num_containers 2

The script above executes the date -u command in the Hadoop cluster. From this we might conclude that "hadoop-yarn-applications-distributedshell-2.2.0.jar" contains a dispatcher named Client, which is responsible for deploying a jar to the cluster along with parameters such as the shell command and its arguments, and for notifying the cluster to execute the shell command.

To see what's in the rabbit hole, let's step into the Client source code.

This post is full of code snippets. To avoid confusion, all comments added by me begin with //** instead of // or /*. The code can be cloned from the Apache Git repository; the commit id is 2e01e27e5ba4ece19650484f646fac42596250ce.

The Process Logic

Since the Client is started as a process, we had better look at the main method first.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L164
public static void main(String[] args) {
  boolean result = false;
  try {
    Client client = new Client();
    LOG.info("Initializing Client");
    try {
      boolean doRun = client.init(args);
      if (!doRun) {
        System.exit(0);
      }
    } catch (IllegalArgumentException e) {
      ...
    }
    result = client.run();
  } catch (Throwable t) {
    ...
  }
  ...
}

There are three steps: construct a Client instance, initialize it, and invoke its run method.

The Client Instance

Client has three constructors; the main method calls the default one, which takes no parameters.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L227
public Client() throws Exception {
  this(new YarnConfiguration());
}

The default constructor creates a YarnConfiguration instance and passes it to another constructor.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L194
public Client(Configuration conf) throws Exception {
  this(
    "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
    conf);
}

That constructor in turn passes "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster" to a third constructor, which stores it as appMasterMainClass.

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L200
Client(String appMasterMainClass, Configuration conf) {
  this.conf = conf;
  //** Set appMasterMainClass
  this.appMasterMainClass = appMasterMainClass;
  //** This method call will create a YarnClientImpl instance
  yarnClient = YarnClient.createYarnClient();
  //** Init the yarn client
  yarnClient.init(conf);
  //** Create options for command parameters
  //** Will be parsed by GnuParser later
  opts = new Options();
  opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
  opts.addOption("timeout", true, "Application timeout in milliseconds");
  opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
  opts.addOption("jar", true, "Jar file containing the application master");
  opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
  opts.addOption("shell_script", true, "Location of the shell script to be executed");
  opts.addOption("shell_args", true, "Command line args for the shell script");
  opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
  opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
  opts.addOption("log_properties", true, "log4j.properties file");
  opts.addOption("debug", false, "Dump out debug information");
  opts.addOption("help", false, "Print usage");
}

YarnClientImpl extends YarnClient, which extends AbstractService, which in turn implements Service. This hierarchy exists mainly to control the service life cycle.

In the constructor above, the YarnClientImpl is created and then initialized.

Since neither YarnClientImpl nor YarnClient defines an init method, the actual init happens in AbstractService.

//** org.apache.hadoop.service.AbstractService.java L151
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
                                    + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

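It first uses isInState to check whether the service is already in STATE.INITED, and returns immediately if so. Otherwise it calls enterState(STATE.INITED), which returns the previous state. If the previous state was not already STATE.INITED, it stores the configuration and calls the serviceInit method. If the service is still in STATE.INITED after serviceInit returns, notifyListeners() is called.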
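The guard in init can be sketched in isolation. The following is a minimal, self-contained illustration under stated assumptions, not Hadoop's code: MiniService, its two-value State enum, and the counters are hypothetical names, and a plain String stands in for Configuration.

```java
// A minimal sketch of the "initialize at most once" guard described above.
// MiniService, its State enum, and the counters are hypothetical; a String
// stands in for Hadoop's Configuration.
class InitOnceSketch {
    enum State { NOTINITED, INITED }

    static class MiniService {
        private final Object stateChangeLock = new Object();
        private volatile State state = State.NOTINITED;
        int serviceInitCalls = 0;   // how often the init hook ran
        int notifications = 0;      // how often listeners would have been notified

        // Switches state and returns the previous one.
        private synchronized State enterState(State proposed) {
            State oldState = state;
            state = proposed;
            return oldState;
        }

        // Hook that a concrete service would override.
        protected void serviceInit(String conf) {
            serviceInitCalls++;
        }

        public void init(String conf) {
            if (conf == null) {
                throw new IllegalStateException("null configuration");
            }
            if (state == State.INITED) {
                return; // fast path: already initialized
            }
            synchronized (stateChangeLock) {
                // Only the caller that actually moved the state runs the hook.
                if (enterState(State.INITED) != State.INITED) {
                    serviceInit(conf);
                    if (state == State.INITED) {
                        notifications++;
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        MiniService service = new MiniService();
        service.init("conf");
        service.init("conf"); // second call is a no-op
        System.out.println("serviceInit calls: " + service.serviceInitCalls);
    }
}
```

Because the hook only runs when the previous state was something other than INITED, calling init twice is harmless.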

//** org.apache.hadoop.service.AbstractService.java L415
private void notifyListeners() {
  try {
    listeners.notifyListeners(this);
    globalListeners.notifyListeners(this);
  } catch (Throwable e) {
    LOG.warn("Exception while notifying listeners of " + this + ": " + e,
             e);
  }
}

//** org.apache.hadoop.service.ServiceOperations.java L139
public void notifyListeners(Service service) {
  //take a very fast snapshot of the callback list
  //very much like CopyOnWriteArrayList, only more minimal
  ServiceStateChangeListener[] callbacks;
  synchronized (this) {
    callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
  }
  //iterate through the listeners outside the synchronized method,
  //ensuring that listener registration/unregistration doesn't break anything
  for (ServiceStateChangeListener l : callbacks) {
    l.stateChanged(service);
  }
}

notifyListeners invokes the stateChanged callback on every listener registered on this service and on every global listener, so that each of them can react to the state change.
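The snapshot-then-iterate idea in the second method is worth a small illustration. This is a sketch under assumptions rather than Hadoop's implementation: ListenerSnapshotSketch, its StateChangeListener interface, and the demo method are hypothetical, and a String stands in for the Service argument. It shows why copying the list first lets a listener unregister itself from inside its own callback.

```java
import java.util.ArrayList;
import java.util.List;

// A minimal sketch of "snapshot the listeners, then notify outside the lock".
// All names here are hypothetical; a String stands in for the Service argument.
class ListenerSnapshotSketch {
    interface StateChangeListener {
        void stateChanged(String serviceName);
    }

    static class Listeners {
        private final List<StateChangeListener> listeners = new ArrayList<>();

        synchronized void add(StateChangeListener l) {
            if (!listeners.contains(l)) {
                listeners.add(l);
            }
        }

        synchronized boolean remove(StateChangeListener l) {
            return listeners.remove(l);
        }

        void notifyListeners(String serviceName) {
            // Copy the list while holding the lock...
            StateChangeListener[] callbacks;
            synchronized (this) {
                callbacks = listeners.toArray(new StateChangeListener[0]);
            }
            // ...and call back without it, so add/remove during a callback is safe.
            for (StateChangeListener l : callbacks) {
                l.stateChanged(serviceName);
            }
        }
    }

    // Registers a listener that removes itself on its first callback,
    // then notifies twice and returns what each callback recorded.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Listeners listeners = new Listeners();
        StateChangeListener[] self = new StateChangeListener[1];
        self[0] = name -> {
            log.add("first:" + name);
            listeners.remove(self[0]);
        };
        listeners.add(self[0]);
        listeners.add(name -> log.add("second:" + name));
        listeners.notifyListeners("svc");
        listeners.notifyListeners("svc");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Iterating over the live list instead would risk a ConcurrentModificationException as soon as a callback changed the registrations.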

But, what is the STATE?

The State Model

Let's go back to the YarnClient.createYarnClient() method.

//** org.apache.hadoop.yarn.client.api.YarnClient.java L55
public static YarnClient createYarnClient() {
  YarnClient client = new YarnClientImpl();
  return client;
}

The YarnClientImpl instance is created.

//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L86
public YarnClientImpl() {
  super(YarnClientImpl.class.getName());
}

//** org.apache.hadoop.yarn.client.api.YarnClient.java L60
protected YarnClient(String name) {
  super(name);
}

//** org.apache.hadoop.service.AbstractService.java L111
public AbstractService(String name) {
  this.name = name;
  stateModel = new ServiceStateModel(name);
}

Bingo, that's the state model: ServiceStateModel.

//** org.apache.hadoop.service.ServiceStateModel.java L66
public ServiceStateModel(String name) {
  this(name, Service.STATE.NOTINITED);
}

public ServiceStateModel(String name, Service.STATE state) {
  this.state = state;
  this.name = name;
}

The state model is simply a name/state pair; the name is the class name of the service implementation.

The isInState method checks the state value.

//** org.apache.hadoop.service.ServiceStateModel.java L84
public boolean isInState(Service.STATE proposed) {
  return state.equals(proposed);
}

The enterState method changes the state value once checkStateTransition has passed, and returns the old state.

//** org.apache.hadoop.service.ServiceStateModel.java L110
public synchronized Service.STATE enterState(Service.STATE proposed) {
  checkStateTransition(name, state, proposed);
  Service.STATE oldState = state;
  //atomic write of the new state
  state = proposed;
  return oldState;
}

The transition is validated by isValidStateTransition, which looks up the statemap with the current and proposed states and returns a boolean indicating whether the transition is valid. If it is invalid, checkStateTransition throws a ServiceStateException.

//** org.apache.hadoop.service.ServiceStateModel.java L125
public static void checkStateTransition(String name,
                                        Service.STATE state,
                                        Service.STATE proposed) {
  if (!isValidStateTransition(state, proposed)) {
    throw new ServiceStateException(name + " cannot enter state "
                                    + proposed + " from state " + state);
  }
}

public static boolean isValidStateTransition(Service.STATE current,
                                             Service.STATE proposed) {
  boolean[] row = statemap[current.getValue()];
  return row[proposed.getValue()];
}

Then what's in the statemap?

//** org.apache.hadoop.service.ServiceStateModel.java L35
private static final boolean[][] statemap =
  {
    //                uninited inited started stopped
    /* uninited  */    {false, true,  false,  true},
    /* inited    */    {false, true,  true,   true},
    /* started   */    {false, false, true,   true},
    /* stopped   */    {false, false, false,  true},
  };

That's the state model we are looking for. The current state is the row index and the proposed state is the column index; the value tells whether the service can move from the current state to the proposed state.
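To make the table lookup concrete, here is a minimal, self-contained sketch that reuses the same boolean values. The StateMapSketch class, its State enum, and the use of IllegalStateException are illustrative stand-ins for Hadoop's ServiceStateModel, Service.STATE, and ServiceStateException; they are assumptions made so the example runs without Hadoop on the classpath.

```java
// A minimal sketch of the table-driven state check described above.
// StateMapSketch and its State enum are hypothetical stand-ins for
// Hadoop's ServiceStateModel and Service.STATE.
class StateMapSketch {
    enum State {
        NOTINITED(0), INITED(1), STARTED(2), STOPPED(3);

        private final int value;

        State(int value) {
            this.value = value;
        }

        int getValue() {
            return value;
        }
    }

    // Same values as the statemap in the post: row = current, column = proposed.
    private static final boolean[][] STATEMAP = {
        //                uninited inited started stopped
        /* uninited */ {false, true,  false, true},
        /* inited   */ {false, true,  true,  true},
        /* started  */ {false, false, true,  true},
        /* stopped  */ {false, false, false, true},
    };

    private State state = State.NOTINITED;

    static boolean isValidStateTransition(State current, State proposed) {
        return STATEMAP[current.getValue()][proposed.getValue()];
    }

    // Validates the transition, switches state, and returns the previous state.
    // IllegalStateException stands in for ServiceStateException here.
    synchronized State enterState(State proposed) {
        if (!isValidStateTransition(state, proposed)) {
            throw new IllegalStateException(
                "cannot enter state " + proposed + " from state " + state);
        }
        State oldState = state;
        state = proposed;
        return oldState;
    }

    public static void main(String[] args) {
        StateMapSketch model = new StateMapSketch();
        System.out.println(model.enterState(State.INITED));   // prints the previous state
        System.out.println(model.enterState(State.STARTED));
        System.out.println(model.enterState(State.STOPPED));
        try {
            model.enterState(State.STARTED); // stopped -> started is not allowed
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Reading the table row by row: no state can go back to uninited, every state can go to stopped, and a stopped service can only stay stopped.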

The Command Initialization

Let's go back again to the point where the YarnClientImpl is about to be initialized: the init method of its superclass AbstractService calls the serviceInit method, and the state moves to STATE.INITED.

YarnClientImpl overrides the serviceInit method.

//** org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.java L96
protected void serviceInit(Configuration conf) throws Exception {
  this.rmAddress = getRmAddress(conf);
  statePollIntervalMillis = conf.getLong(
      YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
      YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
  super.serviceInit(conf);
}

The address of the Resource Manager, rmAddress, is read from the configuration instance.

Now that the Client instance has been created successfully, main invokes its init method to initialize it.
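The relationship between init in the base class and serviceInit in the subclass is a hook-method arrangement, combined with a configuration lookup that falls back to a default. A rough sketch follows, with everything named in it being an assumption: BaseService, PollingClient, the property key example.poll-interval-ms, and the default of 1000 are hypothetical, and java.util.Properties stands in for Hadoop's Configuration.

```java
import java.util.Properties;

// A minimal sketch of the init/serviceInit hook pattern with a defaulted
// configuration value. BaseService, PollingClient, the property key, and the
// default of 1000 are all hypothetical; Properties stands in for Configuration.
class ServiceInitSketch {
    static abstract class BaseService {
        private Properties config;

        // Fixed entry point: stores the config, then calls the overridable hook.
        public final void init(Properties conf) {
            this.config = conf;
            serviceInit(conf);
        }

        protected void serviceInit(Properties conf) {
            // The base implementation has nothing to set up.
        }

        public Properties getConfig() {
            return config;
        }
    }

    static class PollingClient extends BaseService {
        static final String POLL_INTERVAL_KEY = "example.poll-interval-ms";
        static final long DEFAULT_POLL_INTERVAL_MS = 1000L;

        long pollIntervalMillis;

        @Override
        protected void serviceInit(Properties conf) {
            // Read the setting, falling back to the default when it is absent.
            String raw = conf.getProperty(POLL_INTERVAL_KEY);
            pollIntervalMillis =
                (raw == null) ? DEFAULT_POLL_INTERVAL_MS : Long.parseLong(raw);
            super.serviceInit(conf);
        }
    }

    public static void main(String[] args) {
        PollingClient withDefault = new PollingClient();
        withDefault.init(new Properties());
        System.out.println(withDefault.pollIntervalMillis);

        Properties custom = new Properties();
        custom.setProperty(PollingClient.POLL_INTERVAL_KEY, "250");
        PollingClient configured = new PollingClient();
        configured.init(custom);
        System.out.println(configured.pollIntervalMillis);
    }
}
```

Callers only ever see init; each subclass adds its own setup in serviceInit and then delegates upward, as YarnClientImpl does with super.serviceInit(conf).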

//** org.apache.hadoop.yarn.applications.distributedshell.Client.java L244
public boolean init(String[] args) throws ParseException {

  CommandLine cliParser = new GnuParser().parse(opts, args);

  ...

  appName = cliParser.getOptionValue("appname", "DistributedShell");
  amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
  amQueue = cliParser.getOptionValue("queue", "default");
  amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));

  ...

  appMasterJar = cliParser.getOptionValue("jar");
  ...

  shellCommand = cliParser.getOptionValue("shell_command");

  ...

  return true;
}

The command line options are parsed and assigned to instance variables for later use.
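The "option value with a default" lookups above can be illustrated without any dependency. The sketch below is deliberately not the Commons CLI API: OptionDefaultsSketch is a hypothetical class that only mimics the addOption and getOptionValue calls seen in the excerpt (hasOption is an extra helper added for the example), and its parsing rules are a simplification.

```java
import java.util.HashMap;
import java.util.Map;

// A minimal, dependency-free sketch of "option value with default" lookups.
// OptionDefaultsSketch is hypothetical and is NOT the Commons CLI API; it only
// mimics the addOption/getOptionValue calls seen in the excerpt.
class OptionDefaultsSketch {
    private final Map<String, Boolean> declared = new HashMap<>(); // name -> takes a value?
    private final Map<String, String> parsed = new HashMap<>();    // name -> value ("" for flags)

    void addOption(String name, boolean hasArg) {
        declared.put(name, hasArg);
    }

    void parse(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            String name = token.startsWith("-") ? token.substring(1) : token;
            Boolean hasArg = declared.get(name);
            if (!token.startsWith("-") || hasArg == null) {
                throw new IllegalArgumentException("Unrecognized option: " + token);
            }
            if (hasArg) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing argument for option: " + name);
                }
                // The next token is the value, even if it starts with '-'.
                parsed.put(name, args[++i]);
            } else {
                parsed.put(name, "");
            }
        }
    }

    boolean hasOption(String name) {
        return parsed.containsKey(name);
    }

    String getOptionValue(String name, String defaultValue) {
        String value = parsed.get(name);
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        OptionDefaultsSketch opts = new OptionDefaultsSketch();
        opts.addOption("appname", true);
        opts.addOption("priority", true);
        opts.addOption("shell_command", true);
        opts.addOption("shell_args", true);
        opts.addOption("num_containers", true);
        opts.addOption("debug", false);

        opts.parse(new String[] {
            "-shell_command", "date", "-shell_args", "-u", "-num_containers", "2"});

        // Options that were not passed fall back to their defaults.
        String appName = opts.getOptionValue("appname", "DistributedShell");
        int amPriority = Integer.parseInt(opts.getOptionValue("priority", "0"));
        System.out.println(appName + " " + amPriority + " "
            + opts.getOptionValue("shell_command", null) + " "
            + opts.getOptionValue("shell_args", null));
    }
}
```

With the arguments from the command at the top of this post, appname and priority are absent, so the lookups return the defaults, while shell_command and shell_args return the values that were passed.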

The Client is ready to run.