Hadoop RPC – Client (2)

How does org.apache.hadoop.ipc.Client help the real RPC client make a remote method invocation on the RPC server?

If you don’t know, or have forgotten, please read my last post.

First, it creates a Call instance and passes it the param, which contains the method name and the parameters (class types and instances).

Then, it gets a client-server connection.

Third, it sends the Call instance to the server using the connection.sendParam method.

Finally, it calls call.wait to wait for the result from the server.

1. Create a “call” instance

//** org.apache.hadoop.ipc.Client.java L165
/** A call waiting for a value. */
private class Call {
  //** "id" is used to distinguish calls
  int id; // call id
  //** "param" includes the method name and parameters
  //** of an RPC call
  Writable param; // parameter
  //** "value" is the return value of the RPC call
  Writable value; // value, null if error
  IOException error; // exception, null if value
  boolean done; // true when call is done

  //** Add param to the "Call" instance,
  //** use the counter as the call id, then increase it by 1.
  protected Call(Writable param) {
    this.param = param;
    synchronized (Client.this) {
      this.id = counter++;
    }
  }

  /** Indicate when the call is complete and the
   * value or error are available. Notifies by default. */
  //** As mentioned above, "call.wait" is used to wait
  //** for the return value in the final step.
  //** "notify" tells the waiting client thread to stop waiting,
  //** because the return value is ready.
  protected synchronized void callComplete() {
    this.done = true;
    notify(); // notify caller
  }

  ...

  //** This method is called when "connection" receives a
  //** response from the server: "connection" sets the value
  //** and calls "callComplete".
  public synchronized void setValue(Writable value) {
    this.value = value;
    callComplete();
  }
}
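
To make the wait/notify handshake concrete, here is a minimal standalone sketch. It is not Hadoop code: CallWaitDemo, SimpleCall and awaitValue are hypothetical names, a String stands in for Writable, and a throwaway thread plays the role of the connection thread. I also assume the caller loops on "done" while waiting, which is the usual guard against spurious wakeups.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CallWaitDemo {
  /** Hypothetical stand-in for Client.Call; a String replaces Writable. */
  static class SimpleCall {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    final int id = COUNTER.getAndIncrement(); // unique call id
    final String param;
    private String value;
    private boolean done;

    SimpleCall(String param) {
      this.param = param;
    }

    /** Called by the receiver thread once the response has arrived. */
    synchronized void setValue(String value) {
      this.value = value;
      this.done = true;
      notify(); // wake the caller blocked in awaitValue()
    }

    /** Called by the caller thread; blocks until setValue() has run. */
    synchronized String awaitValue() throws InterruptedException {
      while (!done) { // the loop guards against spurious wakeups
        wait();
      }
      return value;
    }
  }

  /** Hands the call to a fake "connection" thread and waits for its echo. */
  static String roundTrip(String param) {
    final SimpleCall call = new SimpleCall(param);
    Thread receiver = new Thread(() -> call.setValue("echo:" + call.param));
    receiver.setDaemon(true);
    receiver.start();
    try {
      return call.awaitValue();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("ping")); // prints "echo:ping"
  }
}
```

Because both methods synchronize on the same call object, it doesn't matter whether the response arrives before or after the caller starts waiting: the "done" flag covers both orders.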

2. Get a Client-Server connection

The connection is obtained by this line of code:

Connection connection = getConnection(remoteId, call);

Step into getConnection ...

//** org.apache.hadoop.ipc.Client.java L1221
/** Get a connection from the pool, or create a new one and add it to the
 * pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
                                 Call call)
                                 throws IOException, InterruptedException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  do {
    //** "connections" is a "Hashtable" instance defined as follows:
    //** private Hashtable<ConnectionId, Connection> connections =
    //**   new Hashtable<ConnectionId, Connection>();

    //** Here, "remoteId" identifies the connection:
    //** it is the key in the <remoteId, connection> pair.

    //** A lock is needed because this is a check-then-act sequence,
    //** which is generally not thread safe without one.
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        //** If the connection represented by "remoteId" can't be found,
        //** create a new "Connection" instance using "remoteId" as parameter.
        connection = new Connection(remoteId);
        //** Add the newly created "connection" to "connections"
        connections.put(remoteId, connection);
      }
    }
    //** Add the "call" instance to "connection"
  } while (!connection.addCall(call));

  //we don't invoke the method below inside "synchronized (connections)"
  //block above. The reason for that is if the server happens to be slow,
  //it will take longer to establish a connection and that will slow the
  //entire system down.
  connection.setupIOstreams();
  return connection;
}
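
The check-then-act point is easy to try out in isolation. The sketch below is a simplified stand-in, not the Hadoop implementation: ConnectionPoolDemo, Conn and createdUnderContention are hypothetical names, and a plain String plays the role of ConnectionId. Several threads ask for the same key at once, and the lock around "get, test for null, put" should make sure only one object is ever created.

```java
import java.util.HashMap;
import java.util.Map;

class ConnectionPoolDemo {
  /** Hypothetical stand-in for Client.Connection. */
  static class Conn {
    final String remoteId;

    Conn(String remoteId) {
      this.remoteId = remoteId;
    }
  }

  private final Map<String, Conn> connections = new HashMap<>();
  private int created; // guarded by the "connections" lock

  /** Get-or-create under one lock, mirroring the shape of getConnection. */
  Conn getConnection(String remoteId) {
    synchronized (connections) {
      Conn conn = connections.get(remoteId); // check
      if (conn == null) {
        conn = new Conn(remoteId);           // act
        created++;
        connections.put(remoteId, conn);
      }
      return conn;
    }
  }

  int createdCount() {
    synchronized (connections) {
      return created;
    }
  }

  /** Hammers one key from several threads; returns how many Conn objects were built. */
  static int createdUnderContention(int threadCount) {
    final ConnectionPoolDemo pool = new ConnectionPoolDemo();
    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          pool.getConnection("namenode:8020");
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return pool.createdCount();
  }

  public static void main(String[] args) {
    System.out.println(createdUnderContention(8)); // prints 1
  }
}
```

Without the synchronized block, two threads could both see null and both create a connection for the same key.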

I forgot to mention that class Connection extends Thread.

When it runs, it keeps reading responses and notifying callers. But how does a connection set up the actual link to the remote server?

The connection parameters are set in the Connection constructor, and the real connection setup is done in the method connection.setupIOstreams. We’ll look at them separately.

//** org.apache.hadoop.ipc.Client.java L240
public Connection(ConnectionId remoteId) throws IOException {
  //** Lots of connection-related settings are taken from "remoteId"
  this.remoteId = remoteId;
  this.server = remoteId.getAddress();
  if (server.isUnresolved()) {
    throw new UnknownHostException("unknown host: " +
                                   remoteId.getAddress().getHostName());
  }
  this.rpcTimeout = remoteId.getRpcTimeout();
  this.maxIdleTime = remoteId.getMaxIdleTime();
  this.maxRetries = remoteId.getMaxRetries();
  this.tcpNoDelay = remoteId.getTcpNoDelay();
  this.doPing = remoteId.getDoPing();
  this.pingInterval = remoteId.getPingInterval();
  if (LOG.isDebugEnabled()) {
    LOG.debug("The ping interval is" + this.pingInterval + "ms.");
  }

  //** Ignore the code for authentication.
  ...

  //** Each connection begins with this protocol header,
  //** which will be parsed by the Server to get useful information.
  //** We'll discuss it in later posts about the RPC Server.
  header = new ConnectionHeader(protocol == null ? null : protocol
      .getName(), ticket, authMethod);

  if (LOG.isDebugEnabled())
    LOG.debug("Use " + authMethod + " authentication for protocol "
        + protocol.getSimpleName());

  this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
      remoteId.getAddress().toString() +
      " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
  //** Set the "connection" thread as a daemon thread.
  this.setDaemon(true);
}
//** org.apache.hadoop.ipc.Client.java L519
/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
private synchronized void setupIOstreams() throws InterruptedException {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }

  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to "+server);
    }
    short numRetries = 0;
    final short maxRetries = 15;
    Random rand = null;
    while (true) {
      //** Connects to remote server by Socket
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      OutputStream outStream = NetUtils.getOutputStream(socket);
      //** Send RPC header to server,
      //** includes header, version, and authentication method
      writeRpcHeader(outStream);

      //** Ignore SASL authentication code
      ...

      this.in = new DataInputStream(new BufferedInputStream
          (new PingInputStream(inStream)));
      this.out = new DataOutputStream
          (new BufferedOutputStream(outStream));
      //** Write the protocol header as the beginning of each connection,
      //** which is created in "Connection" constructor.
      writeHeader();

      // update last activity time
      touch();

      // start the receiver thread after the socket connection has been set up
      start();
      return;
    }
  } catch (Throwable t) {
    if (t instanceof IOException) {
      markClosed((IOException)t);
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
  }
}

3. Send “Call” instance to Server

The connection to the remote server is set up in step two, so step three is simple:

connection.sendParam(call);

Step into connection.sendParam ...

//** org.apache.hadoop.ipc.Client.java L753
/** Initiates a call by sending the parameter to the remote server.
 * Note: this is not called from the Connection thread, but by other
 * threads.
 */
public void sendParam(final Call call) throws InterruptedException {
  if (shouldCloseConnection.get()) {
    return;
  }

  // lock the connection for the period of submission and waiting
  // in order to bound the # of threads in the executor by the number
  // of connections
  //** Allow another "sendParam" only after the previous "sendParam" has completed.
  synchronized (sendParamsLock) {
    //** Submit a task to the thread pool and get a "Future" back,
    //** SEND_PARAMS_EXECUTOR actually is
    //** Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY)
    Future senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
      @Override
      public void run() {
        DataOutputBuffer d = null;

        try {
          //** Lock the "out" stream created by "setupIOstreams"
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + " sending #" + call.id);
            }

            //for serializing the
            //data to be written
            d = new DataOutputBuffer();
            //** Write the call id to d
            d.writeInt(call.id);
            //** Write the call param to d as well: "param.write(d)"
            //** writes param INTO d. This naming convention makes fun of us!
            call.param.write(d);
            //** Get the buffered bytes and their length from d
            byte[] data = d.getData();
            int dataLength = d.getLength();
            out.writeInt(dataLength); //first put the data length
            //** Send them all to the server
            out.write(data, 0, dataLength);//write the data
            out.flush();
          }
        } catch (IOException e) {
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      //** Will block until the task above completes.
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();

      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("checked exception made it here", cause);
      }
    }
  }
}
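
The wire format produced by sendParam is a length-prefixed frame: a 4-byte data length, then the call id, then the serialized param. Here is a rough sketch of that framing using only java.io. FrameDemo, encode and decode are hypothetical names, writeUTF stands in for Writable.write, and the decode side is my own assumption about how a reader could unpack the frame (the server code is not covered in this post).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FrameDemo {
  /** Builds one frame: [int dataLength][int callId][param bytes]. */
  static byte[] encode(int callId, String param) {
    try {
      // Serialize id + param into an in-memory buffer first,
      // so the total length is known before anything hits the wire.
      ByteArrayOutputStream body = new ByteArrayOutputStream();
      DataOutputStream d = new DataOutputStream(body);
      d.writeInt(callId);
      d.writeUTF(param); // stand-in for call.param.write(d)
      d.flush();
      byte[] data = body.toByteArray();

      ByteArrayOutputStream wire = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(wire);
      out.writeInt(data.length);       // first put the data length
      out.write(data, 0, data.length); // then the data itself
      out.flush();
      return wire.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Assumed reader side: reads the length, then exactly that many bytes. */
  static String decode(byte[] frame) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
      int dataLength = in.readInt();
      byte[] data = new byte[dataLength];
      in.readFully(data);
      DataInputStream body = new DataInputStream(new ByteArrayInputStream(data));
      int callId = body.readInt();
      String param = body.readUTF();
      return callId + ":" + param;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(encode(7, "getProtocolVersion"))); // prints "7:getProtocolVersion"
  }
}
```

Buffering into d before touching out is what lets the length go first, and it keeps the time spent holding the lock on the shared stream short.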

4. Call waits for the result

In the first step, I mentioned that the thread blocked in call.wait is notified to stop waiting and pick up the return value once call.setValue is called.

Let’s see what the connection thread does to achieve this.

//** org.apache.hadoop.ipc.Client.java L717
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());

  try {
    //** Infinite loop if "waitForWork" always return true
    while (waitForWork()) {//wait here for work - read or close connection
      //** Receive response from server.
      receiveResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }
  //** Close the "connection"
  close();

  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}

Step into waitForWork ...

//** org.apache.hadoop.ipc.Client.java L674
/* wait till someone signals us to start reading RPC response or
 * it is idle too long, it is marked as to be closed,
 * or the client is marked as not running.
 *
 * Return true if it is time to read a response; false otherwise.
 */
private synchronized boolean waitForWork() {
  //** If there is no pending call, the connection should not close,
  //** and the client is running, then wait.
  if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    long timeout = maxIdleTime-
        (System.currentTimeMillis()-lastActivity.get());
    if (timeout>0) {
      try {
        wait(timeout);
      } catch (InterruptedException e) {}
    }
  }
  //** If calls exist, the connection should not close, and the client
  //** is running, then go receive the response from the server.
  if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    return true;
  } else if (shouldCloseConnection.get()) {
    //** If the connection should be closed,
    //** return false.
    return false;
  } else if (calls.isEmpty()) { // idle connection closed or stopped
    //** No call is pending: mark the connection as closed
    //** without an exception.
    //** The code path gets here when no call has arrived within the
    //** idle time limit (maxIdleTime), or when the client has been stopped.
    markClosed(null);
    return false;
  } else { // get stopped but there are still pending requests
    //** Mark the connection as closed with an exception.
    markClosed((IOException)new IOException().initCause(
        new InterruptedException()));
    return false;
  }
}
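
The branching in waitForWork is easier to follow once the state is pulled out into plain values. The sketch below is only an illustration: WaitForWorkDemo, Outcome, remainingIdleMillis and decide are hypothetical names, and the three booleans stand for calls.isEmpty(), shouldCloseConnection.get() and running.get().

```java
class WaitForWorkDemo {
  /** Hypothetical labels for the four exits of waitForWork. */
  enum Outcome {
    READ_RESPONSE,            // return true
    CLOSE_REQUESTED,          // return false, already marked to close
    CLOSE_IDLE_OR_STOPPED,    // markClosed(null), return false
    CLOSE_WITH_PENDING_CALLS  // markClosed(exception), return false
  }

  /** How long the connection may still wait before it counts as idle. */
  static long remainingIdleMillis(long maxIdleTime, long lastActivity, long now) {
    return maxIdleTime - (now - lastActivity);
  }

  /** Same if/else chain as the second half of waitForWork. */
  static Outcome decide(boolean callsEmpty, boolean shouldClose, boolean running) {
    if (!callsEmpty && !shouldClose && running) {
      return Outcome.READ_RESPONSE;
    } else if (shouldClose) {
      return Outcome.CLOSE_REQUESTED;
    } else if (callsEmpty) {
      return Outcome.CLOSE_IDLE_OR_STOPPED;
    } else {
      return Outcome.CLOSE_WITH_PENDING_CALLS;
    }
  }

  public static void main(String[] args) {
    // Last activity 4 seconds ago with a 10 second limit: 6 seconds left to wait.
    System.out.println(remainingIdleMillis(10000L, 1000L, 5000L)); // prints 6000
    // Nothing pending after the wait: the idle connection gets closed.
    System.out.println(decide(true, false, true)); // prints CLOSE_IDLE_OR_STOPPED
    // The client was stopped while calls were still pending.
    System.out.println(decide(false, false, false)); // prints CLOSE_WITH_PENDING_CALLS
  }
}
```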

Step into receiveResponse ...

//** org.apache.hadoop.ipc.Client.java L808
/* Receive a response.
 * Because only one receiver, so no synchronization on in.
 */
private void receiveResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    int id = in.readInt(); // try to read an id

    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + id);

    //** Get call instance by its call id.
    Call call = calls.get(id);

    int state = in.readInt(); // read call status
    //** If call state is success, read input
    if (state == Status.SUCCESS.state) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      //** Read value from in
      value.readFields(in);
      //** Finally, we see "call.setValue",
      //** it will stop "call.wait" in client thread,
      //** then, RPC call finishes,
      //** and "call" is removed from "calls" set.
      call.setValue(value);
      calls.remove(id);
    } else if (state == Status.ERROR.state) {
      call.setException(new RemoteException(WritableUtils.readString(in),
                                            WritableUtils.readString(in)));
      calls.remove(id);
    } else if (state == Status.FATAL.state) {
      // Close the connection
      markClosed(new RemoteException(WritableUtils.readString(in),
                                     WritableUtils.readString(in)));
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
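
Since many calls share one connection, the call id is what routes each response back to its caller. A small sketch of that dispatch follows. It is an illustration under assumptions, not the Hadoop code: ResponseDispatchDemo and PendingCall are hypothetical names, the SUCCESS and ERROR constants are placeholder values rather than Hadoop's Status enum, and readUTF stands in for reading a Writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class ResponseDispatchDemo {
  // Placeholder status codes, assumed for this sketch only.
  static final int SUCCESS = 0;
  static final int ERROR = 1;

  /** Hypothetical stand-in for Client.Call. */
  static class PendingCall {
    String value; // set on success
    String error; // set on error
  }

  /** Reads one response [id][state][payload] and routes it by call id. */
  static void receiveResponse(DataInputStream in, Map<Integer, PendingCall> calls)
      throws IOException {
    int id = in.readInt();
    PendingCall call = calls.get(id);
    int state = in.readInt();
    if (state == SUCCESS) {
      call.value = in.readUTF();
    } else if (state == ERROR) {
      call.error = in.readUTF();
    }
    calls.remove(id); // the call is finished either way
  }

  /** Two pending calls; the responses arrive in the opposite order. */
  static String demo() {
    try {
      Map<Integer, PendingCall> calls = new HashMap<>();
      PendingCall first = new PendingCall();
      PendingCall second = new PendingCall();
      calls.put(1, first);
      calls.put(2, second);

      ByteArrayOutputStream wire = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(wire);
      out.writeInt(2); out.writeInt(SUCCESS); out.writeUTF("b");
      out.writeInt(1); out.writeInt(ERROR); out.writeUTF("boom");
      out.flush();

      DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
      receiveResponse(in, calls);
      receiveResponse(in, calls);
      return first.error + "," + second.value + "," + calls.size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "boom,b,0"
  }
}
```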

Before ending this post (kinda long :)), there are three more classes that I think are worth talking about.

The first one is ConnectionId. remoteId appears dozens of times, so it would be unfair not to talk about it.

//** org.apache.hadoop.ipc.Client.java L1270
//** Below is the constructor of "ConnectionId".
//** "ConnectionId" holds the address of the remote server,
//** as well as the protocol and the user ticket.

//** The client connections to servers are uniquely identified
//** by <remoteAddress, protocol, ticket>

//** "remoteId" is set in the "Invoker" constructor by
//** this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
//**     ticket, rpcTimeout, conf);
//** before the "invoke" dynamic proxy dispatching happens.
ConnectionId(InetSocketAddress address, Class<?> protocol,
             UserGroupInformation ticket, int rpcTimeout,
             String serverPrincipal, int maxIdleTime,
             int maxRetries, boolean tcpNoDelay,
             boolean doPing, int pingInterval) {
  this.protocol = protocol;
  this.address = address;
  this.ticket = ticket;
  this.rpcTimeout = rpcTimeout;
  this.serverPrincipal = serverPrincipal;
  this.maxIdleTime = maxIdleTime;
  this.maxRetries = maxRetries;
  this.tcpNoDelay = tcpNoDelay;
  this.doPing = doPing;
  this.pingInterval = pingInterval;
}
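
For ConnectionId to work as the key of the connections table, two ids that describe the same target have to be equal and hash alike. The sketch below shows that idea with a hypothetical Key class (ConnectionKeyDemo and distinctKeys are made-up names too). I assume here that only the triple named in the comment above takes part in equality. The real equals and hashCode are not shown in this post and may well compare more fields, so treat this as an illustration of the principle only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ConnectionKeyDemo {
  /** Hypothetical key over <address, protocol, ticket>; Strings replace the real types. */
  static final class Key {
    final String address;
    final String protocol;
    final String ticket;

    Key(String address, String protocol, String ticket) {
      this.address = address;
      this.protocol = protocol;
      this.ticket = ticket;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return Objects.equals(address, other.address)
          && Objects.equals(protocol, other.protocol)
          && Objects.equals(ticket, other.ticket);
    }

    @Override
    public int hashCode() {
      return Objects.hash(address, protocol, ticket);
    }
  }

  /** Puts three keys in a map; two of them describe the same target. */
  static int distinctKeys() {
    Map<Key, String> connections = new HashMap<>();
    connections.put(new Key("nn:8020", "ClientProtocol", "alice"), "conn-1");
    // Same triple as above, so this replaces the entry instead of adding one.
    connections.put(new Key("nn:8020", "ClientProtocol", "alice"), "conn-2");
    // A different user gets a separate entry.
    connections.put(new Key("nn:8020", "ClientProtocol", "bob"), "conn-3");
    return connections.size();
  }

  public static void main(String[] args) {
    System.out.println(distinctKeys()); // prints 2
  }
}
```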

The other two classes are ParallelCall and ParallelResults.

ParallelCall extends Call and is created to support parallel method invocation, which means multiple calls can be made through a single Client.call.

It adds not only the Writable param, but also a ParallelResults instance and a call index. The call index distinguishes the calls in a parallel batch and puts each return value from the server into the right slot, since the results might arrive out of order.

//** org.apache.hadoop.ipc.Client.java L905
/** Call implementation used for parallel calls. */
private class ParallelCall extends Call {
  private ParallelResults results;
  private int index;

  public ParallelCall(Writable param, ParallelResults results, int index) {
    //** Add param
    super(param);
    //** Add parallel results and call index
    this.results = results;
    this.index = index;
  }

  /** Deliver result to result collector. */
  //** Override "callComplete" and let "ParallelResults" process it
  protected void callComplete() {
    results.callComplete(this);
  }
}

/** Result collector for parallel calls. */
private static class ParallelResults {
  private Writable[] values;
  private int size;
  private int count;

  public ParallelResults(int size) {
    this.values = new Writable[size];
    this.size = size;
  }

  /** Collect a result. */
  public synchronized void callComplete(ParallelCall call) {
    //** Store the return value of the n-th call in "values"
    //** according to the call index.
    values[call.index] = call.value; // store the value
    //** Increase the counter
    count++; // count it
    //** Once every call in this parallel set has its return value,
    //** notify the waiting caller so it can pick up the return values.
    if (count == size) // if all values are in
      notify(); // then notify waiting caller
  }
}
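
The collector is a small countdown: every finished call drops its value into its own slot, and only the last one wakes the caller. Here is a standalone sketch of the same idea. ParallelResultsDemo, Results, awaitAll and runParallel are hypothetical names, Strings replace Writable, and the waiting side (awaitAll) is my assumption about how the caller blocks, since that part of Client.call is not shown above.

```java
class ParallelResultsDemo {
  /** Hypothetical stand-in for ParallelResults. */
  static class Results {
    private final String[] values;
    private final int size;
    private int count;

    Results(int size) {
      this.values = new String[size];
      this.size = size;
    }

    /** Collect one result; the index decides the slot, not the arrival order. */
    synchronized void callComplete(int index, String value) {
      values[index] = value;
      count++;
      if (count == size) { // all values are in
        notify();          // wake the waiting caller
      }
    }

    /** Assumed caller side: block until every slot has been filled. */
    synchronized String[] awaitAll() throws InterruptedException {
      while (count != size) {
        wait();
      }
      return values;
    }
  }

  /** Completes n calls from n threads, started in reverse index order. */
  static String[] runParallel(int n) {
    final Results results = new Results(n);
    for (int i = n - 1; i >= 0; i--) {
      final int index = i;
      Thread t = new Thread(() -> results.callComplete(index, "value-" + index));
      t.setDaemon(true);
      t.start();
    }
    try {
      return results.awaitAll();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(String.join(",", runParallel(3))); // prints "value-0,value-1,value-2"
  }
}
```

Whatever order the threads finish in, the values come back in index order, which is the whole point of carrying the call index around.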

I searched, but didn’t find any code using ParallelCall except the following test code.

//** org.apache.hadoop.ipc.TestIPC.java L159
Writable[] params = new Writable[addresses.length];
for (int j = 0; j < addresses.length; j++)
  params[j] = new LongWritable(RANDOM.nextLong());
Writable[] values = client.call(params, addresses, null, null, conf);

This post uses the Hadoop source code from CDH3u2.