yanf4j引入了客户端非阻塞API

yanf4j发布一个0.50-beta2版本,这个版本最重要的改进就是引入了客户端 连接非阻塞API,主要最近的工作要用到,所以添加了。两个核心类 TCPConnecTorController和UDPConnecTorController分别用于TCP和UDP的客户端 连接控制。例如,现在的UDP echo client可以写成:

//客户端echo handler     class EchoClientHandler extends HandlerAdapter {        public void onReceive(Session udpSession, Object t) {            DatagramPacket datagramPacket = (DatagramPacket) t;            System.out.println("recv:" + new String(datagramPacket.getData()));        }        @Override        public void onMessageSent(Session session, Object t) {            System.out.println("send:" + new String((byte[]) t));        }    }       //连接代码,并发送UDP包        UDPConnecTorController connecTor = new UDPConnecTorController();        connecTor.setSoTimeout(1000);        connecTor.setHandler(new EchoClientHandler());        connecTor.connect(new InetSocketAddress(InetAddress.getByName(host),                port));        for (int i = 0; i < 10000; i++) {            String s = "hello " + i;            DatagramPacket packet = new DatagramPacket(s.getBytes(), s.length());            connecTor.send(packet);        }

UDP不是面向连接的,因此connect方法仅仅是调用了底层 DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。

再来看看TCPConnecTorController的使用,同样看Echo Client的实现:

//客户端的echo handlerclass EchoHandler extends HandlerAdapter<String> {        @Override        public void onConnected(Session session) {            try {                //一连接就发送NUM个字符串                for (int i = 0; i < NUM; i++)                    session.send(generateString(i));             } catch (Exception e) {             }        }        public String generateString(int len) {            StringBuffer sb = new StringBuffer();            for (int i = 0; i < MESSAGE_LEN; i++)                sb.append(i);            return sb.toString();        }        @Override        public void onReceive(Session session, String t) {            //打印接收到字符串            if (DEBUG)                System.out.println("recv:" + t);        }    }//...连接API,TCPConnecTorController示例    Configuration configuration = new Configuration();        configuration.setTcpSessionReadBufferSize(256 * 1024); // 设置读的缓冲区大小    TCPConnecTorController    connecTor = new TCPConnecTorController(configuration,                new StringCodecFacTory());    connecTor.setHandler(new EchoHandler());    connecTor.setCodecFacTory(new StringCodecFacTory());    try {            connecTor.Connect(new InetSocketAddress("localhost", 8080));    } catch (IOExceptione) {            e.printStackTrace();    }

注意,connect方法并不阻塞,而是立即返回,连接是否建立可以通过 TCPConnecTorController.isConnected()方法来判断,因此通常你可能会这样使 用:

try {            connecTor.Connect(new InetSocketAddress("localhost", 8080));            while(!connecTor.isConnected())                ;        } catch (Exception e) {            e.printStackTrace();        }

来强制确保后面对connecTor的使用是已经连接上的connecTor,然而更好的 做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连 接建立后才会被调用。

两个ConnecTorController都有系列send方法,用于发送数据:

TCPConnecTorController.send(Object msg) throws InterruptedExceptionUDPConnecTorController.send(DatagramPacket packet) throws InterruptedExceptionUDPConnecTorController.send(SocketAddress targetAddr, Object msg) throws InterruptedException

0.50-beta2带来的另一个修改就是Session接口添加setReadBufferByteOrder 方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端 法。这个方法建议在Handler的onSessionStarted回调方法中调用。

在0.50-beta最重要的修改是引入了session发送队列缓冲区的流量控制选项 。默认情况下,session的发送缓冲队列是无界的,队列的push和pop也全然不会 阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下:

a)当发送队列中的数据总量大于高水位标记(highWaterMark), Session.send将阻塞

b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小 于于低水位标记(lowWaterMark)才解除。

缓冲队列高低水位的设置通过Controller的下列方法设置:

public void setSessionWriteQueueHighWaterMark(int highWaterMark);

public void setSessionWriteQueueLowWaterMark(int lowWaterMark);

缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过 com.Google.code.yanf4j.util.MessageQueue类实现的。

0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send (Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send 。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否 ,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。

告诉自己,我这次失败了,

yanf4j引入了客户端非阻塞API

相关文章:

你感兴趣的文章:

标签云: