Hadoop异步rpc通信机制–org.apache.hadoop.ipc.Server
Java NOI非阻塞技术不是开启线程去等待端口的响应,而是采用Reactor模式或Observer模式监听I/O端口,当端口有响应时,会自动通知我们,从而实现流畅的I/O读写。
Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。
Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:
java NIO参考资料:
http://www.iteye.com/topic/834447
http://weixiaolu.iteye.com/blog/1479656
=========================================================================================================================
Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:
public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }
Server.Call是一个请求类,类似Client.Call,只是添加了Call的时间戳机制:
private static class Call { private int id; // 请求id private Writable param; // 请求的参数 private Connection connection; // 和Client一样,表示一个C/S间的连接 private long timestamp; // 时间戳 private ByteBuffer response; // server对此次请求的响应结果 ... }
知道了Client.Connection后,显然Server.Connection就是Server到Client的连接。Server.Connection内保存了Client的地址,用于灾难恢复。Server.Connection通过调用readAndProcess对Client进行一些操作:版本校验,读数据头processHeader(获取通信协议protocol,根据头部的ugi信息创建user对象)以及读数据processData(获取Client发送过来的Call.id和params,根据二者建立一个请求call,并将请求call入队callQueue)【readAndProcess方法是在Listener.doRead时调用,此时监听器监听到新连接的读数据事件】。
public int readAndProcess() throws IOException, InterruptedException { //先对connection进行版本校验,校验成功后读取Header头部信息(得到客户端所用的协议和客户端的标识user) //,接着读取数据(Call.id和参数params,其中params),然后建立一个Call while (true) { /* Read at most one RPC. If the header is not read completely yet * then iterate until we read first RPC or until there is no data left. */ int count = -1; if (dataLengthBuffer.remaining() > 0) { count = channelRead(channel, dataLengthBuffer); if (count < 0 || dataLengthBuffer.remaining() > 0) return count; } if (!versionRead) {//尚未版本验证 //Every connection is expected to send the header. ByteBuffer versionBuffer = ByteBuffer.allocate(1); count = channelRead(channel, versionBuffer); if (count <= 0) { return count; } int version = versionBuffer.get(0); //要读取BufferByte前要先flip下 dataLengthBuffer.flip();//.flip();一定得有,如果没有,就是从最后开始读取的,当然读出来的都是byte=0时候的字符。 //通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置 if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + " got version " + version + " expected version " + CURRENT_VERSION); return -1; } dataLengthBuffer.clear();//清除内容 versionRead = true;//验证版本了 continue; } if (data == null) {//分配新的data dataLengthBuffer.flip(); dataLength = dataLengthBu