//** org.apache.hadoop.ipc.Client.java L165
/** A call waiting for a value. */
private class Call {
  //** "id" is used to distinguish calls
  int id;                                       // call id
  //** "param" includes the method name and parameters
  //** of an RPC call
  Writable param;                               // parameter
  //** "value" is the return value of the RPC call
  Writable value;                               // value, null if error
  IOException error;                            // exception, null if value
  boolean done;                                 // true when call is done

  //** Store param in the "Call" instance, take the current
  //** counter as the call id, then increase the counter by 1.
  protected Call(Writable param) {
    this.param = param;
    synchronized (Client.this) {
      this.id = counter++;
    }
  }

  /** Indicate when the call is complete and the
   * value or error are available.  Notifies by default.  */
  //** As mentioned above, "call.wait" is used to wait
  //** for the return value in the final step.
  //** "notify" tells the client to stop waiting:
  //** the return value is ready.
  protected synchronized void callComplete() {
    this.done = true;
    notify();                                   // notify caller
  }

  ...

  //** This method is called when "connection" receives
  //** the response from the server: "connection" sets the value
  //** and calls "callComplete".
  public synchronized void setValue(Writable value) {
    this.value = value;
    callComplete();
  }
}
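To see the handshake in isolation, here is a minimal sketch in plain Java. `SimpleCall` and `waitForValue` are hypothetical names, not Hadoop APIs. The class keeps only the id counter and the wait/notify pair from `Call`. The caller waits in a loop on `done`, which is roughly how the client side waits for `call.done`.

```java
// Hypothetical, stripped-down stand-in for Hadoop's Client.Call:
// only the id counter and the wait/notify handshake are kept.
class SimpleCall {
    private static int counter = 0;   // shared id source, guarded by SimpleCall.class

    final int id;
    private Object value;             // result, set by the receiver thread
    private boolean done;             // true once the result has arrived

    SimpleCall() {
        int next;
        synchronized (SimpleCall.class) {
            next = counter++;         // take the current counter, then increment it
        }
        this.id = next;
    }

    // Receiver side: store the value and wake the waiting caller.
    synchronized void setValue(Object value) {
        this.value = value;
        callComplete();
    }

    protected synchronized void callComplete() {
        done = true;
        notify();                     // wakes the caller blocked in waitForValue()
    }

    // Caller side: the loop guards against spurious wakeups.
    synchronized Object waitForValue() throws InterruptedException {
        while (!done) {
            wait();
        }
        return value;
    }
}
```

A caller creates the call, hands it to another thread that eventually invokes `setValue`, and then blocks in `waitForValue`. If `setValue` runs first, `done` is already true and the caller never waits.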
//** org.apache.hadoop.ipc.Client.java L1221
/** Get a connection from the pool, or create a new one and add it to the
 * pool.  Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
                                 Call call)
                                 throws IOException, InterruptedException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  do {
    //** "connections" is a "Hashtable" instance defined as follows:
    //**   private Hashtable<ConnectionId, Connection> connections =
    //**     new Hashtable<ConnectionId, Connection>();

    //** Here, "remoteId" identifies the connection:
    //** it is the key in the <remoteId, connection> pair.

    //** The lock is needed because this is a check-then-act sequence,
    //** which is not thread-safe on its own.
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        //** If no connection for "remoteId" can be found,
        //** create a new "Connection" instance using "remoteId" as parameter.
        connection = new Connection(remoteId);
        //** Add the newly created "connection" to "connections"
        connections.put(remoteId, connection);
      }
    }
    //** Add "call" to "connection"; if the connection is already
    //** closing, "addCall" returns false and the loop tries again.
  } while (!connection.addCall(call));

  //we don't invoke the method below inside "synchronized (connections)"
  //block above. The reason for that is if the server happens to be slow,
  //it will take longer to establish a connection and that will slow the
  //entire system down.
  connection.setupIOstreams();
  return connection;
}
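The pooling pattern in getConnection can be sketched without any networking: a check-then-act under one lock, followed by slow setup outside it. This is a hypothetical illustration. `PooledConnection`, `ConnectionPool`, and the `String` key stand in for `Connection`, the `connections` table, and `ConnectionId`. A `HashMap` guarded by `synchronized` replaces Hadoop's `Hashtable`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Connection: it only tracks whether it still
// accepts calls, like Connection.addCall returning false once closing.
class PooledConnection {
    final String key;
    private boolean closing;
    private int pendingCalls;

    PooledConnection(String key) {
        this.key = key;
    }

    synchronized boolean addCall() {
        if (closing) {
            return false;             // caller must retry with a fresh connection
        }
        pendingCalls++;
        return true;
    }

    synchronized void markClosing() {
        closing = true;
    }

    synchronized int pendingCalls() {
        return pendingCalls;
    }

    void setup() {
        // Slow work (connecting, writing headers) would happen here,
        // deliberately outside the pool lock.
    }
}

class ConnectionPool {
    private final Map<String, PooledConnection> connections = new HashMap<>();

    PooledConnection getConnection(String remoteId) {
        PooledConnection connection;
        do {
            synchronized (connections) {      // get-or-create must be atomic
                connection = connections.get(remoteId);
                if (connection == null) {
                    connection = new PooledConnection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
        } while (!connection.addCall());      // lost a race with close(): try again
        connection.setup();                   // no pool lock held here
        return connection;
    }

    // Closing removes the connection from the pool, so the retry loop
    // above creates a new one instead of spinning.
    void close(PooledConnection connection) {
        connection.markClosing();
        synchronized (connections) {
            if (connections.get(connection.key) == connection) {
                connections.remove(connection.key);
            }
        }
    }
}
```

Keeping `setup()` outside the pool lock follows the upstream comment. A slow server then delays only the callers of that one connection, not every caller that needs the pool.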
I forgot to mention that class Connection extends Thread. When it runs, it keeps reading responses and notifying callers. But how does the connection actually get set up with the remote server?

Connection parameters are set in the Connection constructor, and the real connection setup is done in connection.setupIOstreams. We’ll look at them separately.
//** ... (end of the "Connection" constructor)
//** Each connection begins with this protocol header,
//** which will be parsed by the Server to get useful information.
//** We'll discuss it in later posts about the RPC Server.
header = new ConnectionHeader(protocol == null ? null : protocol
    .getName(), ticket, authMethod);

if (LOG.isDebugEnabled())
  LOG.debug("Use " + authMethod + " authentication for protocol "
      + protocol.getSimpleName());

this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
    remoteId.getAddress().toString() +
    " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
//** Set the "connection" thread as a daemon thread.
this.setDaemon(true);
}
//** org.apache.hadoop.ipc.Client.java L519
/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
private synchronized void setupIOstreams() throws InterruptedException {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }

  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to "+server);
    }
    short numRetries = 0;
    final short maxRetries = 15;
    Random rand = null;
    while (true) {
      //** Connect to the remote server with a Socket
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      OutputStream outStream = NetUtils.getOutputStream(socket);
      //** Send the RPC header to the server:
      //** header, version, and authentication method
      writeRpcHeader(outStream);

      //** SASL authentication code omitted
      ...

      this.in = new DataInputStream(new BufferedInputStream
          (new PingInputStream(inStream)));
      this.out = new DataOutputStream
          (new BufferedOutputStream(outStream));
      //** Write the protocol header at the beginning of each connection;
      //** it was created in the "Connection" constructor.
      writeHeader();

      // update last activity time
      touch();

      // start the receiver thread after the socket connection has been set up
      start();
      return;
    }
  } catch (Throwable t) {
    if (t instanceof IOException) {
      markClosed((IOException)t);
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
  }
}
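The shape of setupIOstreams can be sketched with plain `java.net` sockets:

1. Connect at most once, under the connection's lock.
2. Wrap the socket streams.
3. Write a header.
4. Only then start the receiver thread.

This is a simplified, hypothetical version. `StreamConnection` is an invented name, and the 4-byte `HEADER` is a placeholder, not Hadoop's real RPC header. There is no retry loop, SASL, or ping stream.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a connection thread with a setupIOstreams-style method.
class StreamConnection extends Thread {
    // Placeholder header bytes (an assumption, not Hadoop's wire format).
    static final byte[] HEADER = "demo".getBytes(StandardCharsets.US_ASCII);

    private final InetSocketAddress server;
    private Socket socket;
    private DataInputStream in;
    private DataOutputStream out;
    private IOException closeReason;      // non-null once setup has failed

    StreamConnection(InetSocketAddress server) {
        this.server = server;
        setName("demo connection to " + server);
        setDaemon(true);                  // like Connection: never keeps the JVM alive
    }

    synchronized void setupIOstreams() {
        if (socket != null || closeReason != null) {
            return;                       // already connected, or already failed
        }
        try {
            socket = new Socket();
            socket.connect(server, 5000); // 5 s connect timeout (arbitrary choice)
            in = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
            out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
            out.write(HEADER);            // the header goes first on every connection
            out.flush();
            start();                      // receiver starts only once the streams exist
        } catch (IOException e) {
            closeReason = e;              // Hadoop calls markClosed(...) and close() here
            try {
                socket.close();
            } catch (IOException ignored) {
            }
        }
    }

    synchronized IOException closeReason() {
        return closeReason;
    }

    @Override
    public void run() {
        try {
            while (in.read() != -1) {
                // A real client would decode responses here.
            }
        } catch (IOException ignored) {
            // Connection dropped; a real client would mark itself closed.
        }
    }
}
```

The `socket != null` guard is what makes a second call harmless. Without it, the second call would open a new socket and `start()` would throw `IllegalThreadStateException`.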
3. Send the “Call” instance to the server
The connection to the remote server was set up in step two, so step three is simple:
//** org.apache.hadoop.ipc.Client.java L753
/** Initiates a call by sending the parameter to the remote server.
 * Note: this is not called from the Connection thread, but by other
 * threads.
 */
public void sendParam(final Call call) throws InterruptedException {
  if (shouldCloseConnection.get()) {
    return;
  }

  // lock the connection for the period of submission and waiting
  // in order to bound the # of threads in the executor by the number
  // of connections
  //** Allow another "sendParam" only after the previous "sendParam" completes.
  synchronized (sendParamsLock) {
    //** Submit a task to the thread pool; SEND_PARAMS_EXECUTOR is
    //** Executors.newCachedThreadPool(DAEMON_THREAD_FACTORY)
    Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
      @Override
      public void run() {
        DataOutputBuffer d = null;

        try {
          //** Lock the "out" stream created by "setupIOstreams"
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug(getName() + " sending #" + call.id);
            }

            //for serializing the
            //data to be written
            d = new DataOutputBuffer();
            //** Write the call id to d
            d.writeInt(call.id);
            //** Then write the call param to d
            //** This naming convention makes fun of us!
            call.param.write(d);
            //** Write the data length to "out"
            byte[] data = d.getData();
            int dataLength = d.getLength();
            out.writeInt(dataLength);      //first put the data length
            //** Then send the data itself to the server
            out.write(data, 0, dataLength);//write the data
            out.flush();
          }
        } catch (IOException e) {
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      //** Blocks until the task above completes.
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();

      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("checked exception made it here", cause);
      }
    }
  }
}
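On the wire, each request written by sendParam is a length-prefixed frame: an int length, then the call id, then the serialized param. The length covers the id plus the param. Below is a hedged sketch of that framing together with the submit-then-get pattern. `FrameWriter` is a hypothetical name, a plain `long` stands in for the `Writable` param, and `ByteArrayOutputStream` plays the role of `DataOutputBuffer`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of sendParam's frame: [int length][int id][param bytes].
class FrameWriter {
    // Daemon threads, in the spirit of SEND_PARAMS_EXECUTOR.
    private static final ExecutorService SENDER = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "demo-sender");
        t.setDaemon(true);
        return t;
    });

    private final Object sendLock = new Object();
    private final DataOutputStream out;

    FrameWriter(OutputStream raw) {
        this.out = new DataOutputStream(raw);
    }

    void send(int callId, long param) throws InterruptedException, ExecutionException {
        synchronized (sendLock) {                 // one submission per connection at a time
            Future<?> done = SENDER.submit(() -> {
                try {
                    // Serialize id + param into an in-memory buffer first...
                    ByteArrayOutputStream buf = new ByteArrayOutputStream();
                    DataOutputStream d = new DataOutputStream(buf);
                    d.writeInt(callId);
                    d.writeLong(param);           // Hadoop: call.param.write(d)
                    d.flush();
                    // ...then write the length prefix and the bytes to the stream.
                    synchronized (out) {
                        out.writeInt(buf.size());
                        buf.writeTo(out);
                        out.flush();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);   // Hadoop calls markClosed(e)
                }
            });
            done.get();                            // block until the frame is written
        }
    }
}
```

With call id 7 and param 42, the frame is 16 bytes in total. The length prefix reads 12: 4 bytes for the id plus 8 for the `long`.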
4. Call waits for the result
In the first step, I mentioned that once call.setValue is called, the caller blocked in call.wait is notified to stop waiting and picks up the return value. Let’s see what the connection thread does to achieve this.
//** org.apache.hadoop.ipc.Client.java L717
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());

  try {
    //** Keeps looping as long as "waitForWork" returns true
    while (waitForWork()) {//wait here for work - read or close connection
      //** Receive a response from the server.
      receiveResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }
  //** Close the "connection"
  close();
}
//** org.apache.hadoop.ipc.Client.java L674
/* wait till someone signals us to start reading RPC response or
 * it is idle too long, it is marked as to be closed,
 * or the client is marked as not running.
 *
 * Return true if it is time to read a response; false otherwise.
 */
private synchronized boolean waitForWork() {
  //** If there is no call, the connection should not be closed,
  //** and the client is running, then wait.
  if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    long timeout = maxIdleTime-
          (System.currentTimeMillis()-lastActivity.get());
    if (timeout>0) {
      try {
        wait(timeout);
      } catch (InterruptedException e) {}
    }
  }

  //** If a call exists, the connection should not be closed,
  //** and the client is running, then receive the response from the server.
  if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    return true;
  } else if (shouldCloseConnection.get()) {
    //** If the connection should be closed,
    //** return false.
    return false;
  } else if (calls.isEmpty()) { // idle connection closed or stopped
    //** Still no call after the idle wait: the connection has been idle
    //** for "maxIdleTime" (the timeout ran out, or was already <= 0),
    //** or the client has been stopped.
    //** Mark the connection as to-be-closed, without an exception.
    markClosed(null);
    return false;
  } else { // get stopped but there are still pending requests
    //** Mark the connection as to-be-closed, with an exception.
    markClosed((IOException)new IOException().initCause(
        new InterruptedException()));
    return false;
  }
}
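The decision table in waitForWork is easier to follow in a self-contained version. The sketch below is hypothetical: `IdleWaiter`, `addCall`, `stop`, and the string close reasons are invented for illustration. Like the original, it waits at most once for the remaining idle time. It then reports work only if a call is pending and the connection is still open and running.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of waitForWork's idle-timeout logic.
class IdleWaiter {
    private final Queue<Integer> calls = new ArrayDeque<>();
    private final long maxIdleTime;
    private long lastActivity = System.currentTimeMillis();
    private boolean shouldClose;
    private boolean running = true;
    private String closeReason;

    IdleWaiter(long maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    synchronized void addCall(int id) {
        calls.add(id);
        lastActivity = System.currentTimeMillis();
        notify();                                 // wake the receiver if it is idling
    }

    synchronized void stop() {
        running = false;
        notify();
    }

    synchronized String closeReason() {
        return closeReason;
    }

    private void markClosed(String reason) {      // always called with the lock held
        if (!shouldClose) {
            shouldClose = true;
            closeReason = reason;
        }
    }

    // Returns true when a response should be read, false when the receiver should exit.
    synchronized boolean waitForWork() {
        if (calls.isEmpty() && !shouldClose && running) {
            long timeout = maxIdleTime - (System.currentTimeMillis() - lastActivity);
            if (timeout > 0) {
                try {
                    wait(timeout);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // Hadoop swallows it instead
                }
            }
        }
        if (!calls.isEmpty() && !shouldClose && running) {
            return true;
        } else if (shouldClose) {
            return false;
        } else if (calls.isEmpty()) {
            markClosed("idle");                   // idle too long, or stopped with nothing pending
            return false;
        } else {
            markClosed("stopped with pending calls");
            return false;
        }
    }
}
```

A connection with a pending call returns true immediately. An empty one sleeps for `maxIdleTime` and then shuts itself down, which is how idle connections leave the pool.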
//** org.apache.hadoop.ipc.Client.java L808
/* Receive a response.
 * Because only one receiver, so no synchronization on in.
 */
private void receiveResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();

  try {
    int id = in.readInt();                    // try to read an id

    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + id);

    //** Get the call instance by its call id.
    Call call = calls.get(id);

    int state = in.readInt();     // read call status
    //** If the call succeeded, read the value
    if (state == Status.SUCCESS.state) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      //** Read value from in
      value.readFields(in);
      //** Finally, here is "call.setValue":
      //** it ends "call.wait" in the client thread,
      //** the RPC call finishes,
      //** and "call" is removed from the "calls" map.
      call.setValue(value);
      calls.remove(id);
    } else if (state == Status.ERROR.state) {
      call.setException(new RemoteException(WritableUtils.readString(in),
                                            WritableUtils.readString(in)));
      calls.remove(id);
    } else if (state == Status.FATAL.state) {
      // Close the connection
      markClosed(new RemoteException(WritableUtils.readString(in),
                                     WritableUtils.readString(in)));
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
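Every response therefore starts with the call id and a status int, followed by either the value or an error. Here is a hypothetical, self-contained sketch of that dispatch. `ResponseDispatcher` and `PendingCall` are invented names. The status codes 0, 1, and -1 are an assumption about `Status.SUCCESS`, `ERROR`, and `FATAL`. A `long` stands in for the `Writable` value, and `DataInputStream.readUTF` replaces `WritableUtils.readString`.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of receiveResponse: [int id][int status][payload].
class ResponseDispatcher {
    // Assumed status codes, mirroring Status.SUCCESS / ERROR / FATAL.
    static final int SUCCESS = 0, ERROR = 1, FATAL = -1;

    static final class PendingCall {
        long value;
        String error;
        boolean done;

        synchronized void complete(long value, String error) {
            this.value = value;
            this.error = error;
            this.done = true;
            notifyAll();                          // wake the caller waiting on this call
        }
    }

    private final Map<Integer, PendingCall> calls = new HashMap<>();
    private String fatal;                         // set when the whole connection must close

    synchronized void register(int id, PendingCall call) {
        calls.put(id, call);
    }

    synchronized int pending() {
        return calls.size();
    }

    synchronized String fatal() {
        return fatal;
    }

    // Single reader thread, so the stream itself needs no locking.
    void receiveResponse(DataInputStream in) throws IOException {
        int id = in.readInt();
        int state = in.readInt();
        if (state == SUCCESS) {
            long value = in.readLong();           // Hadoop: value.readFields(in)
            PendingCall call = take(id);
            if (call != null) call.complete(value, null);
        } else if (state == ERROR) {
            String error = in.readUTF() + ": " + in.readUTF();   // class name, message
            PendingCall call = take(id);
            if (call != null) call.complete(0L, error);
        } else if (state == FATAL) {
            String reason = in.readUTF() + ": " + in.readUTF();
            synchronized (this) {
                fatal = reason;                   // Hadoop: markClosed(new RemoteException(...))
            }
        }
    }

    private synchronized PendingCall take(int id) {
        return calls.remove(id);                  // look up and drop in one step
    }
}
```

The payload is always consumed before the call is looked up. A response for an unknown id therefore still leaves the stream positioned at the next response.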
Before ending this post (kinda long :)), there are three more classes I think are worth talking about.
The first one is ConnectionId. remoteId appears dozens of times, so it would be unfair not to talk about it.
//** org.apache.hadoop.ipc.Client.java L1270
//** Below is the constructor of "ConnectionId".
//** "ConnectionId" holds the address of the remote server,
//** as well as the protocol and the user ticket.

//** Client connections to servers are uniquely identified
//** by <remoteAddress, protocol, ticket>

//** "remoteId" is set in the "Invoker" constructor by
//**   this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
//**       ticket, rpcTimeout, conf);
//** before "invoke" dynamic proxy dispatching happens.
ConnectionId(InetSocketAddress address, Class<?> protocol,
             UserGroupInformation ticket, int rpcTimeout,
             String serverPrincipal, int maxIdleTime,
             int maxRetries, boolean tcpNoDelay,
             boolean doPing, int pingInterval) {
  this.protocol = protocol;
  this.address = address;
  this.ticket = ticket;
  this.rpcTimeout = rpcTimeout;
  this.serverPrincipal = serverPrincipal;
  this.maxIdleTime = maxIdleTime;
  this.maxRetries = maxRetries;
  this.tcpNoDelay = tcpNoDelay;
  this.doPing = doPing;
  this.pingInterval = pingInterval;
}
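For ConnectionId to work as the key of the `connections` table, two ids built from the same values must be equal and must hash the same. Below is a minimal sketch of that idea, using only the <remoteAddress, protocol, ticket> tuple described above. `SimpleConnectionId` is hypothetical, and a `String` stands in for `UserGroupInformation`. The real ConnectionId may compare more of its fields than these three.

```java
import java.net.InetSocketAddress;
import java.util.Objects;

// Hypothetical, trimmed-down ConnectionId: value equality over
// <address, protocol, ticket> so equal ids find the same pooled connection.
final class SimpleConnectionId {
    private final InetSocketAddress address;
    private final Class<?> protocol;
    private final String ticket;                  // stands in for UserGroupInformation

    SimpleConnectionId(InetSocketAddress address, Class<?> protocol, String ticket) {
        this.address = Objects.requireNonNull(address);
        this.protocol = protocol;
        this.ticket = ticket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleConnectionId)) return false;
        SimpleConnectionId other = (SimpleConnectionId) o;
        return address.equals(other.address)
            && Objects.equals(protocol, other.protocol)
            && Objects.equals(ticket, other.ticket);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, protocol, ticket);   // consistent with equals
    }
}
```

Because equality is by value, a fresh id built for every RPC still finds the existing connection. That is exactly the per-call allocation the comment in getConnection says "is ok" for now.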
The other two classes are ParallelCall and ParallelResults.
ParallelCall extends Call and exists to support parallel method invocation, meaning that a single Client.call can issue multiple calls.

Besides the Writable param, it also holds a ParallelResults instance and a call index. The call index distinguishes the calls within a parallel batch and decides where each call’s return value is stored. This matters because the results from the servers might arrive out of order.
//** org.apache.hadoop.ipc.Client.java L905
/** Call implementation used for parallel calls. */
private class ParallelCall extends Call {
  private ParallelResults results;
  private int index;

  public ParallelCall(Writable param, ParallelResults results, int index) {
    //** Store param
    super(param);
    //** Store the parallel results collector and the call index
    this.results = results;
    this.index = index;
  }

  /** Deliver result to result collector. */
  //** Overrides "callComplete": the result is handled by "ParallelResults"
  protected void callComplete() {
    results.callComplete(this);
  }
}

/** Result collector for parallel calls. */
private static class ParallelResults {
  private Writable[] values;
  private int size;
  private int count;

  public ParallelResults(int size) {
    this.values = new Writable[size];
    this.size = size;
  }

  /** Collect a result. */
  public synchronized void callComplete(ParallelCall call) {
    //** Store the return value of the n-th call in "values",
    //** using the call index as the position.
    values[call.index] = call.value;            // store the value
    //** Increase the counter
    count++;                                    // count it
    //** Once every call in this parallel batch has its return value,
    //** wake the caller waiting on this "ParallelResults" instance.
    if (count == size)                          // if all values are in
      notify();                                 // then notify waiting caller
  }
}
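The ParallelResults idea can be tried out with plain threads: store each result at its call's index, and wake the caller only when the count reaches the size. `ParallelCollector` and `awaitAll` are hypothetical names. The waiting side is written as a loop on `count`, so a spurious wakeup cannot return a partially filled result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ParallelResults: results land in the slot given by
// each call's index, so arrival order does not matter.
class ParallelCollector<T> {
    private final Object[] values;
    private final int size;
    private int count;

    ParallelCollector(int size) {
        this.values = new Object[size];
        this.size = size;
    }

    // Invoked from each call's completion path (any thread, any order).
    synchronized void callComplete(int index, T value) {
        values[index] = value;        // position chosen by the call index
        count++;
        if (count == size) {
            notifyAll();              // every result is in: wake the waiting caller
        }
    }

    // Invoked by the caller that issued the parallel calls.
    @SuppressWarnings("unchecked")
    synchronized List<T> awaitAll() throws InterruptedException {
        while (count != size) {
            wait();
        }
        List<T> result = new ArrayList<>(size);
        for (Object v : values) {
            result.add((T) v);
        }
        return result;
    }
}
```

Even if the completions run in reverse order, `awaitAll` returns the values in index order.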
I searched but didn’t find any code using ParallelCall except the following test code.
//** org.apache.hadoop.ipc.TestIPC.java L159
Writable[] params = new Writable[addresses.length];
for (int j = 0; j < addresses.length; j++)
  params[j] = new LongWritable(RANDOM.nextLong());
Writable[] values = client.call(params, addresses, null, null, conf);