This is the first article in a series of blog posts about what I
learned from reading the Hadoop source code (cdh3u2).
These posts will be full of code snippets. To avoid confusion, all
comments added by me begin with //** instead of
// or /*.
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class FakeHello implements InvocationHandler {
    private Object agent;

    public FakeHello(Object obj) {
        this.agent = obj;
    }

    //** Hijacking real method invocation
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        System.out.println("Hello Hadoop first");
        //** Real method invocation happens here
        Object result = method.invoke(agent, args);
        return result;
    }
}
import java.lang.reflect.Proxy;

public class ProxyTester {
    public void test() {
        RealHello realHello = new RealHello();
        FakeHello fakeHello = new FakeHello(realHello);
        //** "hello" will be a proxy,
        //** whose method invocations will be dispatched to "fakeHello".
        Hello hello = (Hello) Proxy.newProxyInstance(
                Hello.class.getClassLoader(),
                new Class[] {Hello.class},
                fakeHello);
        hello.sayHello();
    }

    public static void main(String[] args) {
        ProxyTester proxyTester = new ProxyTester();
        proxyTester.test();
    }
}
The results:
Hello Hadoop first
Hello, World
What happened? Where does “Hello Hadoop first” come from?
It’s clear that method invocations on “hello” were hijacked by
“fakeHello”, but how?
The magic trick is called Java dynamic proxy, and it takes the two
steps below:
Step 1: Use “Proxy.newProxyInstance” to return an
instance of a proxy class for the specified interfaces that dispatches
method invocations to the specified invocation handler.
In our case, the interface is “Hello” and the invocation handler is
“fakeHello”.
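Step 1 can be tried on its own. The following is a minimal sketch: the `Greeter` interface and the `ProxyStepOne` class are hypothetical names made up for this illustration, and the handler is a lambda instead of a named class like “FakeHello”. It shows that the object returned by “Proxy.newProxyInstance” implements the requested interface, that its class is a generated proxy class, and that `Proxy.getInvocationHandler` hands back the handler we passed in.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

//** Hypothetical interface, used only for this sketch
interface Greeter {
    String greet(String name);
}

class ProxyStepOne {
    //** Step 1: ask the JDK for an object that implements Greeter
    //** and routes every method call to "handler"
    static Greeter newGreeter(InvocationHandler handler) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        //** This handler never calls a real Greeter; it answers by itself
        InvocationHandler handler = (proxy, method, methodArgs) ->
                "handled " + method.getName() + "(" + methodArgs[0] + ")";
        Greeter greeter = newGreeter(handler);
        System.out.println(greeter.greet("Hadoop"));                        //** handled greet(Hadoop)
        System.out.println(Proxy.isProxyClass(greeter.getClass()));         //** true
        System.out.println(Proxy.getInvocationHandler(greeter) == handler); //** true
    }
}
```

The caller only ever sees a `Greeter`; whatever the handler returns becomes the return value of `greet`.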
Step 2: Define a class that implements the “InvocationHandler”
interface and its “invoke” method.
In our case, “FakeHello” implements “InvocationHandler” and keeps
“realHello” as a field. In its “invoke” method we can do whatever we
want (here, print “Hello Hadoop first”), and then call “method.invoke”
to perform the real “Hello” action (printing “Hello, World”).
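Putting both steps together, here is a self-contained sketch of the same pattern that compiles on its own. It is a variation, not the original program: `sayHello` returns its text instead of printing it, and the handler (hypothetically named `LoggingHandler`) records each call instead of printing “Hello Hadoop first”. It also unwraps `InvocationTargetException`, so callers see the exception the real method threw, and it shows a detail that is easy to miss: `toString`, `equals` and `hashCode` called on the proxy go through “invoke” as well.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Hello {
    String sayHello();
}

class RealHello implements Hello {
    public String sayHello() {
        return "Hello, World";
    }
}

//** Step 2: same shape as FakeHello, the real object is kept as a field
class LoggingHandler implements InvocationHandler {
    private final Object agent;
    final List<String> log = new ArrayList<>();

    LoggingHandler(Object agent) {
        this.agent = agent;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //** Extra work before the real call
        log.add("before " + method.getName());
        try {
            //** The real invocation on the wrapped object
            return method.invoke(agent, args);
        } catch (InvocationTargetException e) {
            //** Rethrow what the real method threw, not the reflection wrapper
            throw e.getCause();
        }
    }
}

class ProxyStepTwo {
    //** Step 1: build the proxy around the handler
    static Hello wrap(LoggingHandler handler) {
        return (Hello) Proxy.newProxyInstance(
                Hello.class.getClassLoader(),
                new Class<?>[] {Hello.class},
                handler);
    }

    public static void main(String[] args) {
        LoggingHandler handler = new LoggingHandler(new RealHello());
        Hello hello = wrap(handler);
        System.out.println(hello.sayHello()); //** Hello, World
        hello.toString();                     //** also routed through invoke
        System.out.println(handler.log);      //** [before sayHello, before toString]
    }
}
```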
Now let’s see how Hadoop RPC uses the same trick, starting with
RPC.waitForProxy:

//** org.apache.hadoop.ipc.RPC.java L330
static VersionedProtocol waitForProxy(
        Class<? extends VersionedProtocol> protocol,
        long clientVersion,
        InetSocketAddress addr,
        Configuration conf,
        int rpcTimeout,
        long connTimeout)
        throws IOException {
    long startTime = System.currentTimeMillis();
    IOException ioe;
    while (true) {
        try {
            //** This method will do the proxy setting
            return getProxy(protocol, clientVersion, addr,
                    UserGroupInformation.getCurrentUser(), conf,
                    NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
        } catch (ConnectException se) { // namenode has not been started
            ...
        }
//** org.apache.hadoop.ipc.RPC.java L387
public static VersionedProtocol getProxy(
        Class<? extends VersionedProtocol> protocol,
        long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory, int rpcTimeout)
        throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
        SaslRpcServer.init(conf);
    }
    //** Familiar? (Step 1)
    //** "proxy" is a proxy: it dispatches all "protocol" method invocations to "new Invoker".
    //** Now Step 1 is done, but where is the class that implements InvocationHandler?
    VersionedProtocol proxy =
            (VersionedProtocol) Proxy.newProxyInstance(
                    protocol.getClassLoader(), new Class[] { protocol },
                    new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
    long serverVersion = proxy.getProtocolVersion(protocol.getName(),
            clientVersion);
    if (serverVersion == clientVersion) {
        return proxy;
    ...
}
The answer is “Invoker”, an inner class of RPC.java that implements
“InvocationHandler”. Here is its “invoke” method:

//** All "proxy" method invocations are dispatched to this place.
//** The "invoke" method accepts requests from the hadoop client (e.g. DataNode)
//** and dispatches them to the hadoop server (e.g. NameNode),
//** but the client knows nothing about the server:
//** it only invokes a method call on the proxy.
public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
    final boolean logDebug = LOG.isDebugEnabled();
    long startTime = 0;
    if (logDebug) {
        startTime = System.currentTimeMillis();
    }
    //** "client.call" will connect to the RPC server,
    //** invoke the method with the same name on it,
    //** and wait for the results.
    //** The next post will discuss what's under the hood of it.
    ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), remoteId);
    if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
    }
    return value.get();
}
...
}
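The core idea of “Invoker” can be sketched without any networking. In this minimal sketch, assuming a plain function may stand in for “client.call”, the handler packs the method name, parameter types and arguments into a small `MethodCall` object and hands it to a pluggable transport. `MethodCall`, `RpcStyleProxy` and `Calculator` are hypothetical names; `MethodCall` only mimics the role of Hadoop's “Invocation”, not its actual fields or serialization.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

//** Hypothetical stand-in for Hadoop's Invocation: what the server needs to know
final class MethodCall {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    MethodCall(Method method, Object[] args) {
        this.methodName = method.getName();
        this.parameterTypes = method.getParameterTypes();
        //** A no-argument call arrives with args == null
        this.args = args == null ? new Object[0] : args;
    }
}

class RpcStyleProxy {
    //** "transport" plays the role of client.call(...): it ships the MethodCall
    //** somewhere and returns whatever comes back
    static <T> T create(Class<T> protocol, Function<MethodCall, Object> transport) {
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(),
                new Class<?>[] {protocol},
                (proxy, method, args) -> transport.apply(new MethodCall(method, args))));
    }
}

//** Hypothetical protocol for the demo
interface Calculator {
    int add(int a, int b);
}

class RpcStyleDemo {
    public static void main(String[] args) {
        //** A fake "server" that only understands add(int, int)
        Calculator calc = RpcStyleProxy.create(Calculator.class, call -> {
            System.out.println("sending " + call.methodName);
            return (Integer) call.args[0] + (Integer) call.args[1];
        });
        System.out.println(calc.add(2, 3)); //** 5
    }
}
```

As with “Invoker”, the caller of `calc.add(2, 3)` cannot tell that the answer was produced somewhere else; swapping the function for a real network client would not change the calling code.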
OK, now we know how the proxy works. Let’s go back to DataNode.java and
see how “DataNode” uses the proxy.
//** org.apache.hadoop.hdfs.server.datanode.DataNode.java L530
private NamespaceInfo handshake() throws IOException {
    NamespaceInfo nsInfo = new NamespaceInfo();
    while (shouldRun) {
        try {
            //** "namenode" is a proxy implementing the interface "DatanodeProtocol".
            //** The "versionRequest" invocation will be
            //** dispatched to the invocation handler "Invoker",
            //** then "Invoker" sends this method request to the RPC server:
            //** the real "NameNode".
            //** Since "NameNode" also implements "DatanodeProtocol",
            //** it can accept this request and invoke the same method on itself.
            //** After the method returns, it sends the results back to "Invoker",
            //** and "Invoker" returns them to the RPC client,
            //** the "DataNode", just like a local method call.
            nsInfo = namenode.versionRequest();
            break;
        } catch (SocketTimeoutException e) {
            ...
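On the server side, “invoke the same method on itself” boils down to reflection: look the method up by name and parameter types on the protocol interface, then call it on the object that implements that interface. Below is a minimal sketch of just that step, assuming the method name, parameter types and arguments have already arrived over the wire. `ReflectiveDispatcher`, `VersionLikeProtocol` and `FakeNameNode` are hypothetical names, and this is not Hadoop's actual server code.

```java
import java.lang.reflect.Method;

//** Hypothetical protocol, playing the role of DatanodeProtocol
interface VersionLikeProtocol {
    String versionRequest();
    String register(String datanodeName);
}

//** Hypothetical server object, playing the role of NameNode
class FakeNameNode implements VersionLikeProtocol {
    public String versionRequest() {
        return "namespace-info";
    }

    public String register(String datanodeName) {
        return "registered " + datanodeName;
    }
}

class ReflectiveDispatcher {
    //** Find the method on the protocol interface and call it on "instance".
    //** If the real method throws, invoke wraps it in InvocationTargetException.
    static Object dispatch(Class<?> protocol, Object instance, String methodName,
                           Class<?>[] parameterTypes, Object[] args)
            throws ReflectiveOperationException {
        Method method = protocol.getMethod(methodName, parameterTypes);
        return method.invoke(instance, args);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        VersionLikeProtocol nameNode = new FakeNameNode();
        //** What a server might receive for namenode.versionRequest()
        Object version = dispatch(VersionLikeProtocol.class, nameNode,
                "versionRequest", new Class<?>[0], new Object[0]);
        System.out.println(version); //** namespace-info
        Object reply = dispatch(VersionLikeProtocol.class, nameNode,
                "register", new Class<?>[] {String.class}, new Object[] {"dn-1"});
        System.out.println(reply);   //** registered dn-1
    }
}
```

Paired with a client-side handler like “Invoker”, this closes the loop: the DataNode's call is turned into a method name plus arguments, and the server side turns it back into an ordinary method call on the object that implements the protocol.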
The “versionRequest” method is declared here:
//** org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.java L143
public NamespaceInfo versionRequest() throws IOException;
It’s just an interface method declaration, nothing more.
But with the help of Java dynamic proxy, all the work is done by
the real “NameNode”: