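The class that implements the InvocationHandler interface is Invoker, a private static nested class (in the Hadoop source it lives in org.apache.hadoop.ipc.RPC). InvocationHandler is how the Java language supports the proxy pattern: every call made on the proxied object ends up as a call to the handler. Let's look at Invoker, the code that actually handles the proxied calls.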
    private static class Invoker implements InvocationHandler {
      private InetSocketAddress address;
      private UserGroupInformation ticket;
      private Client client;
      private boolean isClosed = false;

      public Invoker(InetSocketAddress address, UserGroupInformation ticket,
                     Configuration conf, SocketFactory factory) {
        this.address = address;
        this.ticket = ticket;
        this.client = CLIENTS.getClient(conf, factory);
      }

      public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
        final boolean logDebug = LOG.isDebugEnabled();
        long startTime = 0;
        if (logDebug) {
          startTime = System.currentTimeMillis();
        }

        ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), address,
                        method.getDeclaringClass(), ticket);
        if (logDebug) {
          long callTime = System.currentTimeMillis() - startTime;
          LOG.debug("Call: " + method.getName() + " " + callTime);
        }
        return value.get();
      }

      /* close the IPC client that's responsible for this invoker's RPCs */
      synchronized private void close() {
        if (!isClosed) {
          isClosed = true;
          CLIENTS.stopClient(client);
        }
      }
    }
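To make the proxy mechanism concrete, here is a minimal sketch of a JDK dynamic proxy. It is not Hadoop code: `EchoProtocol`, `LoggingInvoker` and `newProxy` are hypothetical names made up for illustration, and the handler simply returns a string where the real Invoker would send the method and arguments to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

public class ProxySketch {
    /** Hypothetical protocol interface, standing in for a Hadoop RPC protocol. */
    public interface EchoProtocol {
        String echo(String message);
    }

    /** Plays the role of Invoker: every call on the proxy lands in invoke(). */
    static class LoggingInvoker implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // A real Invoker would ship (method, args) to the server here;
            // this sketch only describes the call it intercepted.
            return method.getName() + Arrays.toString(args);
        }
    }

    /** Builds a proxy whose interface methods are all routed to LoggingInvoker. */
    public static EchoProtocol newProxy() {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] { EchoProtocol.class },
                new LoggingInvoker());
    }

    public static void main(String[] args) {
        EchoProtocol proxy = newProxy();
        System.out.println(proxy.echo("hello")); // prints "echo[hello]"
    }
}
```

The caller only ever sees the `EchoProtocol` interface; there is no hand-written implementation of it anywhere, which is what lets an RPC layer turn an ordinary-looking method call into a network request.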
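The factory passed to the Invoker constructor comes from the following property. Note that the core-default.xml shipped with Hadoop sets it to org.apache.hadoop.net.StandardSocketFactory; SocksSocketFactory, shown here, routes connections through a SOCKS proxy: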
    <property>
      <name>hadoop.rpc.socket.factory.class.default</name>
      <value>org.apache.hadoop.net.SocksSocketFactory</value>
    </property>

client is obtained by calling the getClient method of the ClientCache class, which caches Client instances. client.call(...) is then invoked to call the method remotely. Its most important argument is an instance of Invocation, a class that implements the Writable interface. The client.call(...) method is shown below:

    /** Make a call, passing <code>param</code>, to the IPC server running at
     * <code>address</code> which is servicing the <code>protocol</code> protocol,
     * with the <code>ticket</code> credentials, returning the value.
     * Throws exceptions if there are network problems or if the remote code
     * threw an exception. */
    public Writable call(Writable param, InetSocketAddress addr,
                         Class<?> protocol, UserGroupInformation ticket)
                         throws InterruptedException, IOException {
      Call call = new Call(param);
      // set up a socket with its input/output streams (in, out),
      // start the receiver thread, and send the header
      Connection connection = getConnection(addr, protocol, ticket, call);
      connection.sendParam(call);                 // send the parameter
      boolean interrupted = false;
      synchronized (call) {
        while (!call.done) {
          try {
            call.wait();                           // wait for the result
          } catch (InterruptedException ie) {
            // save the fact that we were interrupted
            interrupted = true;
          }
        }

        if (interrupted) {
          // set the interrupt flag now that we are done waiting
          Thread.currentThread().interrupt();
        }

        if (call.error != null) {
          if (call.error instanceof RemoteException) {
            call.error.fillInStackTrace();
            throw call.error;
          } else { // local exception
            throw wrapException(addr, call.error);
          }
        } else {
          return call.value;
        }
      }
    }

Connection is a thread that handles the socket connection and sends the call. You can see here how rigorous the code is, and it is worth learning from: if the caller is interrupted during call.wait(), the loop only records that fact and keeps waiting. Once the wait is over, Thread.currentThread().interrupt() restores the thread's interrupt flag; it does not terminate the thread. sendParam writes to the socket's output stream, and the connection itself is made through the connect method of the NetUtils class. The TCP connection is set up with the TCP_NODELAY option, which disables Nagle's algorithm when enabled. The output stream is obtained by this method:
    public static OutputStream getOutputStream(Socket socket, long timeout)
        throws IOException {
      return (socket.getChannel() == null)
          ? socket.getOutputStream()
          : new SocketOutputStream(socket, timeout);
    }
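The wait loop inside client.call(...) is a general pattern: block until another thread delivers the result, and if an interrupt arrives in the meantime, remember it and restore the flag afterwards instead of losing it. The sketch below isolates that pattern. `PendingCall`, `complete` and `awaitResult` are hypothetical names for illustration; they are not part of Hadoop, and the receiver thread here just sleeps briefly where a real one would read a response from the socket.

```java
public class CallWaitSketch {
    /** Hypothetical stand-in for Client.Call: a slot for one pending result. */
    static class PendingCall {
        private boolean done = false;
        private String value;

        /** Called by the receiver thread when the response arrives. */
        synchronized void complete(String result) {
            value = result;
            done = true;
            notifyAll(); // wake the caller blocked in awaitResult()
        }

        /** Blocks until complete() has run, preserving the interrupt status. */
        synchronized String awaitResult() {
            boolean interrupted = false;
            while (!done) { // loop also guards against spurious wakeups
                try {
                    wait();
                } catch (InterruptedException ie) {
                    interrupted = true; // remember it, but keep waiting
                }
            }
            if (interrupted) {
                // wait() cleared the flag when it threw; set it again
                // so the caller can still observe the interrupt
                Thread.currentThread().interrupt();
            }
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingCall call = new PendingCall();
        Thread receiver = new Thread(() -> {
            try {
                Thread.sleep(100); // simulate network latency
            } catch (InterruptedException ignored) {
                // fall through and complete the call anyway
            }
            call.complete("response");
        });
        receiver.start();
        System.out.println(call.awaitResult()); // prints "response"
        receiver.join();
    }
}
```

Swallowing the InterruptedException without the final `interrupt()` call would hide the interrupt from code higher up the stack, which is why the flag is set again once the result is in hand.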