Hadoop RPC – Client (1)

In the last post, I mentioned that all RPC client calls are dispatched to the Invoker.invoke() method by the dynamic proxy; most of the work of invoke is done by client.call.
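To recap the dispatch mechanism, here is a minimal standalone sketch of how a dynamic proxy funnels every interface call into a single invoke() handler. The Greeter interface and the handler body are made up for illustration; in Hadoop the handler is RPC.Invoker, and its invoke() is where client.call happens.

```java
import java.lang.reflect.Proxy;

// A minimal sketch of dynamic-proxy dispatch (hypothetical Greeter interface;
// in Hadoop the handler is RPC.Invoker and it forwards to client.call).
public class ProxySketch {
    interface Greeter {
        String greet(String name);
    }

    static Greeter newLoggingProxy() {
        return (Greeter) Proxy.newProxyInstance(
            Greeter.class.getClassLoader(),
            new Class<?>[] { Greeter.class },
            (proxy, method, params) -> {
                // In Hadoop, this is where client.call(new Invocation(...)) runs.
                return "invoked " + method.getName() + "(" + params[0] + ")";
            });
    }

    public static void main(String[] args) {
        // Every call on the proxy, whatever the method, lands in the handler.
        System.out.println(newLoggingProxy().greet("hdfs"));
    }
}
```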

//** org.apache.hadoop.ipc.RPC.java L218
public Object invoke(Object proxy, Method method, Object[] args)
  throws Throwable {
  final boolean logDebug = LOG.isDebugEnabled();
  long startTime = 0;
  if (logDebug) {
    startTime = System.currentTimeMillis();
  }
  //** The request comes to "invoke",
  //** which calls "client.call",
  //** gets the result "value",
  //** and returns it via "value.get".
  //** The first parameter is an instance of "Invocation"
  ObjectWritable value = (ObjectWritable)
    client.call(new Invocation(method, args), remoteId);
  if (logDebug) {
    long callTime = System.currentTimeMillis() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}

Method client.call takes two parameters: the first is an instance of Invocation; the second is remoteId, which, as the name suggests, represents the connection between client and server.

Before stepping into client.call to see what is done there, it’s better to know what Invocation is.

Invocation implements the Writable interface, so let’s look at Writable first.

//** org.apache.hadoop.io.Writable.java L61

//** This "Writable" interface is Hadoop's own serialization mechanism
//** based on DataInput and DataOutput.

//** Any key or value type in the Hadoop Map-Reduce framework
//** implements this interface.

//** Implementations typically implement a static read(DataInput) method
//** which constructs a new instance,
//** calls readFields(DataInput) and returns the instance.
public interface Writable {
/**
 * Serialize the fields of this object to <code>out</code>.
 *
 * @param out <code>DataOutput</code> to serialize this object into.
 * @throws IOException
 */
void write(DataOutput out) throws IOException;

/**
 * Deserialize the fields of this object from <code>in</code>.
 *
 * <p>For efficiency, implementations should attempt to re-use storage in the
 * existing object where possible.</p>
 *
 * @param in <code>DataInput</code> to deserialize this object from.
 * @throws IOException
 */
void readFields(DataInput in) throws IOException;
}
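The convention described in the comments above (a static read(DataInput) method that constructs an instance, calls readFields, and returns it) can be exercised without Hadoop on the classpath. A self-contained sketch, with the Writable contract declared locally and a made-up IntStringPair type as the example:

```java
import java.io.*;

// A sketch of the Writable convention. The interface is declared locally
// here; the real one lives in org.apache.hadoop.io. IntStringPair is a
// hypothetical key type used only for illustration.
public class WritableSketch {
    interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    static class IntStringPair implements Writable {
        int id;
        String name;

        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
        }

        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
        }

        // The conventional static factory: construct, readFields, return.
        static IntStringPair read(DataInput in) throws IOException {
            IntStringPair p = new IntStringPair();
            p.readFields(in);
            return p;
        }
    }

    // Serialize to a byte array and deserialize back, as would happen
    // across the wire.
    static IntStringPair roundTrip(int id, String name) throws IOException {
        IntStringPair p = new IntStringPair();
        p.id = id;
        p.name = name;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        p.write(new DataOutputStream(bytes));
        return IntStringPair.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        IntStringPair copy = roundTrip(42, "answer");
        System.out.println(copy.id + " " + copy.name);
    }
}
```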

But Java has its own serialization mechanism, so why not use it? Doug Cutting said in response to that question:

Why didn’t I use Serialization when we first started Hadoop? Because it looked big and hairy and I thought we needed something lean and mean, where we had precise control over exactly how objects are written and read, since that is central to Hadoop. With Serialization you can get some control, but you have to fight for it.

The logic for not using RMI was similar. Effective, high-performance inter-process communications are critical to Hadoop. I felt like we’d need to precisely control how things like connections, timeouts and buffers are handled, and RMI gives you little control over those.

Invocation must implement two methods, readFields and write.

//** org.apache.hadoop.ipc.RPC.java L75
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
  private String methodName;
  private Class[] parameterClasses;
  private Object[] parameters;
  private Configuration conf;

  ...

  public Invocation(Method method, Object[] parameters) {
    this.methodName = method.getName();
    this.parameterClasses = method.getParameterTypes();
    this.parameters = parameters;
  }

  ...
  //** Deserialize data
  public void readFields(DataInput in) throws IOException {
    //** Read method name first, then number of parameters,
    //** and all parameters.
    methodName = UTF8.readString(in);
    parameters = new Object[in.readInt()];
    parameterClasses = new Class[parameters.length];
    ObjectWritable objectWritable = new ObjectWritable();
    for (int i = 0; i < parameters.length; i++) {
      //** This static method reads a Writable, String, primitive type,
      //** or an array from "in".
      parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
      parameterClasses[i] = objectWritable.getDeclaredClass();
    }
  }
  //** Serialize data
  public void write(DataOutput out) throws IOException {
    //** First element: methodName
    UTF8.writeString(out, methodName);
    //** Second element: total number of parameters
    out.writeInt(parameterClasses.length);
    //** And all parameters class type and value
    for (int i = 0; i < parameterClasses.length; i++) {
      //** This static method writes a Writable, String, primitive type,
      //** or an array to "out".
      ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                 conf);
    }
  }
  ...

ObjectWritable is a polymorphic Writable that writes an instance along with its class name; it handles arrays, strings, and primitive types without a Writable wrapper.

In the static method ObjectWritable.writeObject, UTF8.writeString(out, parameterClass.getName()) first writes the class type of the parameter. Then the type of the instance is checked: if it’s an array, the method calls itself recursively for each element; if it’s a String, UTF8.writeString(out, (String)instance) writes the value; and if it’s a primitive type, out.writeChar, out.writeInt, etc. write the value.

The static method ObjectWritable.readObject is the inverse of ObjectWritable.writeObject: it reads the parameter class type and the parameter value from in, as written by ObjectWritable.writeObject.

The method ObjectWritable.getDeclaredClass returns the parameter class type that ObjectWritable.readObject has just read.

This is how the method name and parameters are passed from client to server: they are sent over the network as a "Writable" object of type Invocation, serialized on the client side and deserialized on the server side.
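The wire layout described above (method name, then parameter count, then each parameter's class name and value) can be imitated with plain DataOutput/DataInput. This sketch uses writeUTF instead of Hadoop's UTF8 helper, handles only String and int rather than the full type dispatch of ObjectWritable, and the method name and arguments are invented for illustration:

```java
import java.io.*;
import java.util.*;

// A rough sketch of the Invocation wire layout. The real code uses
// Hadoop's UTF8 and ObjectWritable and supports many more types;
// only String and Integer are handled here.
public class InvocationSketch {
    static byte[] writeCall(String methodName, Object[] params) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(methodName);          // first: the method name
        out.writeInt(params.length);       // second: the parameter count
        for (Object p : params) {          // then: each class name + value
            out.writeUTF(p.getClass().getName());
            if (p instanceof Integer) out.writeInt((Integer) p);
            else if (p instanceof String) out.writeUTF((String) p);
            else throw new IOException("unsupported type in this sketch");
        }
        return bytes.toByteArray();
    }

    static List<Object> readCall(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<Object> result = new ArrayList<>();
        result.add(in.readUTF());          // method name
        int n = in.readInt();              // parameter count
        for (int i = 0; i < n; i++) {
            String cls = in.readUTF();     // class name decides how to decode
            if (cls.equals("java.lang.Integer")) result.add(in.readInt());
            else result.add(in.readUTF());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = writeCall("echo", new Object[] { "hi", 7 });
        System.out.println(readCall(wire));
    }
}
```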

Then, let’s step into the method client.call, which I mentioned at the beginning of this post.

//** org.apache.hadoop.ipc.Client.java L1075
//** The logic of this method is easy to understand.
//** First, create a "Call" instance, passing the "param" to it.
//** Don't be misled by the name "param": it's the "Invocation" we
//** discussed earlier, which contains the method name and the
//** parameters' class types and values.

//** Second, get a client-server connection.

//** Third, send the "Call" instance out.

//** And finally, "call.wait" for the result received from the server.
public Writable call(Writable param, ConnectionId remoteId)
                     throws InterruptedException, IOException {
  //** "Call" represents a method invocation:
  //** it holds the method name and all parameters of an RPC call,
  //** and also a value, itself a "Writable",
  //** that represents the return value.
  Call call = new Call(param);
  //** Get a connection from the connection pool, or
  //** create a new connection and add it to the pool;
  //** connections with the same remote id are reused.
  Connection connection = getConnection(remoteId, call);
  try {
    connection.sendParam(call);                 // send the parameter
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send params to server", e);
    throw new IOException(e);
  }

  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();                           // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }

    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }

    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        throw wrapException(remoteId.getAddress(), call.error);
      }
    } else {
      return call.value;
    }
  }
}
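The waiting half of client.call above is the classic wait/notify handshake: the calling thread blocks on the Call object until a receiving thread marks it done and notifies. A standalone sketch of just that handshake, with a hypothetical setValue playing the role of Hadoop's connection reader thread delivering the reply:

```java
// A standalone sketch of the wait/notify handshake in Client.call.
// The Call class here mirrors the shape of Hadoop's inner Call class,
// but is simplified for illustration.
public class CallWaitSketch {
    static class Call {
        boolean done;
        Object value;

        // Invoked by the receiving thread when the reply arrives.
        synchronized void setValue(Object v) {
            value = v;
            done = true;
            notify();                        // wake the waiting caller
        }
    }

    static Object call(Call call) throws InterruptedException {
        synchronized (call) {
            while (!call.done) {
                call.wait();                 // wait for the result
            }
            return call.value;
        }
    }

    public static void main(String[] args) throws Exception {
        Call c = new Call();
        Thread reader = new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            c.setValue("pong");              // simulate the server's reply
        });
        reader.start();
        System.out.println(call(c));         // blocks until the reply arrives
        reader.join();
    }
}
```

The loop around call.wait() matters: wait() can wake spuriously, so the caller must re-check call.done each time, which is exactly what the Hadoop code does.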

We can see that four classes contribute to the client side of Hadoop RPC: Client, Call (which has one subclass, ParallelCall), Connection, and ConnectionId; all but Client itself are inner classes of Client.

We’ll discuss them separately in the next post.

Hadoop source code cdh3u2 is used in this post.
