Hadoop RPC – Dynamic Proxy

This will be the first article of a series of blogs I am about to post, which are all about what I learned from reading the Hadoop source code (cdh3u2).

Code snippets will be full of those posts, to not confuse you, all comments added by me begin with //** instead of // or /*.

An Easy Example of Java Dynamic Proxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.lang.reflect.*;

public class ProxyTester {
public interface Hello {
void sayHello();
}

public class RealHello implements Hello {
public void sayHello() {
System.out.println("Hello, World");
}
}

public class FakeHello implements InvocationHandler {
private Object agent;
public FakeHello(Object obj) {
this.agent = obj;
}
//** Hijacking real method invocation
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
System.out.println("Hello Hadoop first");
//** Real method invocation happens here
Object result = method.invoke(agent, args);
return result;
}
}

public void test() {
RealHello realHello = new RealHello();
FakeHello fakeHello = new FakeHello(realHello);
//** "hello" will be a proxy,
//** whose method invocation will be dispatched to "fakeHello".
Hello hello = (Hello) Proxy.newProxyInstance(
Hello.class.getClassLoader(), new Class[] {Hello.class}, fakeHello);
hello.sayHello();
}

public static void main(String[] args) {
ProxyTester proxyTester = new ProxyTester();
proxyTester.test();
}
}

The results:

1
2
Hello Hadoop first
Hello, World

What’s happened? What does “Hello Hadoop first” come from?

It’s clear that method invocations to “hello” were hijacked to “fakeHello”, but how?

The magic trick is called Java dynamic proxy, catered by two steps below:

Step 1: Use “Proxy.newProxyInstance” to return an instance of a proxy class for the specified interfaces that dispatches method invocations to the specified invocation handler.

In our case, interface is “Hello”, invocation handler is “fakeHello”

Step 2: Define a class inherits “InvocationHandler”, and implements “invoke” method.

In our case, the “FakeHello” inherits “InvocationHandler” and is designed to add “realHello” as a property, in “invoke” method, we can do whatever we want (e.g. prints “Hello Hadoop first”), and then use “method.invoke” to do true “Hello” action (e.g. prints “Hello World”).

The two classes (InvocationHandler and Proxy) make up the heart of Java dynamic proxy.

Proxy in Hadoop RPC

Let’s begin with DataNode as an example, it always keep in contact with NameNode by the “namenode” proxy obtained in method “startDataNode”:

1
2
3
4
5
6
7
8
9
10
//** org.apache.hadoop.hdfs.server.datanode.DataNode.java L347

//** NameNode implements DatanodeProtocol interface,
//** seems RPC.waitForProxy will dispatches method invocation
//** of DatanodeProtocol to something else ("namenode").
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);

Step into “RPC.waitForProxy” ...

1
2
3
4
5
6
7
8
9
//** org.apache.hadoop.ipc.RPC.java L291
public static VersionedProtocol waitForProxy(
Class< ? extends VersionedProtocol> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
) throws IOException {
return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
}

This is a wrapper method to reduce method parameters, let’s ignore all the wrapper methods, and look into the original “waitForProxy” method:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//** org.apache.hadoop.ipc.RPC.java L330
static VersionedProtocol waitForProxy(
Class< ? extends VersionedProtocol> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
int rpcTimeout,
long connTimeout
) throws IOException {
long startTime = System.currentTimeMillis();
IOException ioe;
while (true) {
try {
//** This method will do the proxy setting
return getProxy(protocol, clientVersion, addr,
UserGroupInformation.getCurrentUser(), conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
...
}

Step into “getProxy" ...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//** org.apache.hadoop.ipc.RPC.java L387
public static VersionedProtocol getProxy(
Class< ? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
//** Familiar? (Step 1)
//** "proxy" is a proxy, it dispatches all "protocol" method invocations to "new Invoker",
//** Now, Step 1 is done, but where is the class which inherits InvocationHandler?
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
return proxy;
...
}

Step into “new Invoker()” ...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//** org.apache.hadoop.ipc.RPC.java L205

//** yep, this is Step 2
private static class Invoker implements InvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;

public Invoker(Class< ? extends VersionedProtocol> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
}

//** All "proxy" method invocations are dispatched to this place,
//** the "invoke" method accepts request of hadoop client (e.g. DataNode),
//** then dispatches the request to hadoop server (e.g. NameNode),
//** but client knows nothing about the server,
//** it only invokes a method call on proxy.
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
//** "client.call" will connect to the RPC server,
//** invoke method on it using same method name,
//** and wait for the results.
//** Next post will discuss what's under the hood of it.
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
...
}

OK, we know how the proxy works. Go back to DataNode.java, let’s see how “DataNode” uses the proxy.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//** org.apache.hadoop.hdfs.server.datanode.DataNode.java L530
private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun) {
try {
//** "namenode" is an instance of interface "DatanodeProtocol",
//** method "versionRequest" invocation will be
//** dispatched to the RPC proxy "Invoker",
//** then, "Invoker" send this method request to RPC server:
//** the real "NameNode",
//** Since "NameNode" inherited "DatanodeProtocol" also,
//** it can accept this request, and invoke the same method in it,
//** after method returns, it send the results to "Invoker",
//** then, "Invoker" returns the results to RPC client:
//** the "DataNode", like a native method call.
nsInfo = namenode.versionRequest();
break;
} catch(SocketTimeoutException e) {
...

The definition of “versionRequest” method is here:

1
2
//** org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.java L143
public NamespaceInfo versionRequest() throws IOException;

It’s just an interface API, nothing else.

But with the help of Java dynamic proxy, all work are done done by the real “NameNode”:

1
2
3
4
//** org.apache.hadoop.hdfs.server.namenode.NameNode.java L1088
public NamespaceInfo versionRequest() throws IOException {
return namesystem.getNamespaceInfo();
}