Java之NIO(二)selector socketChannel

上篇文章对NIO进行了简介,对Channel和Buffer接口的使用进行了说明,并举了一个简单的例子来说明其使用方法。

本篇则重点说明selector,Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

与selector联系紧密的是ServerSocketChannel和SocketChannel,他们的使用与上篇文章描述的FileChannel的使用方法类似,然后与ServerSocket和Socket也有一些联系。

本篇首先简单的进selector进行说明,然后一个简单的示例程序,来演示即时通讯。

Selector

使用传统IO进行网络编程,如下图所示:

每一个到服务端的连接,都需要一个单独的线程(或者线程池)来处理其对应的socket,当连接数多的时候,对服务端的压力极大。并使用socket的getInputStream。Read方法来不断的轮训每个socket,效率可想而知。

而selector则可以在同一个线程中监听多个channel的状态,当某个channel有selector感兴趣的事情发现,selector则被激活。即不会主动去轮询。如下图所示:

Selector使用如下示意:

public static void main(String[] args) throws IOException {Selector selector = Selector.open();//声明selectorServerSocketChannel sc = ServerSocketChannel.open();sc.configureBlocking(false);//必须设置为异步sc.socket().bind(new InetSocketAddress(8081));//绑定端口//把channel 注册到 selector上sc.register(selector, SelectionKey.OP_ACCEPT|SelectionKey.OP_CONNECT|SelectionKey.OP_READ|SelectionKey.OP_WRITE);while(true){selector.select();//阻塞,直到注册的channel上某个感兴趣的事情发生Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectedKeys.iterator();while(keyIterator.hasNext()) {SelectionKey key = keyIterator.next();if(key.isAcceptable()) {// a connection was accepted by a ServerSocketChannel.} else if (key.isConnectable()) {// a connection was established with a remote server.} else if (key.isReadable()) {// a channel is ready for reading} else if (key.isWritable()) {// a channel is ready for writing}keyIterator.remove();}}}极简即时通讯

本例子是是一个极为简单的例子,很多地方都不完善,但是例子可以很好的说明selector的使用方法。

本例子包含服务端和客户端两个部分,其中服务端采用两个selector,用来建立连接和数据的读写。两个selector在两个线程中。

服务端

/** * 简单的即时通讯服务端,采用建立连接 selector和数据 selector分离。很不完善 * */public class ServerSocketChannelTest {private static final int SERVER_PORT = 8081;private ServerSocketChannel server;private volatile Boolean isStop = false;//负责建立连接的selector private Selector conn_Sel; //负责数据读写的selector private Selector read_Sel; // private ExecutorService sendService = Executors.newFixedThreadPool(3);//锁,用来在建立连接后,唤醒read_Sel时使用的同步 private Object lock = new Object();//注册的用户 private Map<String, ClientInfo> clents = new HashMap<String, ClientInfo>();/*** 初始化,绑定端口*/ public void init() throws IOException {//创建ServerSocketChannelserver = ServerSocketChannel.open();//绑定端口server.socket().bind(new InetSocketAddress(SERVER_PORT));server.configureBlocking(false);//定义两个selectorconn_Sel = Selector.open();read_Sel = Selector.open();//把channel注册到selector上,第二个参数为兴趣的事件server.register(conn_Sel, SelectionKey.OP_ACCEPT);}// 负责建立连接。 private void beginListen() {System.out.println("——–开始监听———-");while (!isStop) {try {conn_Sel.select();} catch (IOException e) {e.printStackTrace();continue;}Iterator<SelectionKey> it = conn_Sel.selectedKeys().iterator();while (it.hasNext()) {SelectionKey con = it.next();it.remove();if (con.isAcceptable()) {try {SocketChannel newConn = ((ServerSocketChannel) con.channel()).accept();handdleNewInConn(newConn);} catch (IOException e) {e.printStackTrace();continue;}} else if (con.isReadable()) {//废代码,执行不到。try {handleData((SocketChannel) con.channel());} catch (IOException e) {e.printStackTrace();}}}} }/*** 负责接收数据*/ private void beginReceive(){System.out.println("———begin receiver data——-");while (true) {synchronized (lock) {}try {read_Sel.select();} catch (IOException e) {e.printStackTrace();continue;}Iterator<SelectionKey> it = read_Sel.selectedKeys().iterator();while (it.hasNext()) {SelectionKey con = it.next();it.remove();if (con.isReadable()) {try {handleData((SocketChannel) con.channel());} catch (IOException e) {e.printStackTrace();}}}} }private void handdleNewInConn(SocketChannel newConn) throws IOException {newConn.configureBlocking(false);//这里必须先唤醒read_Sel,然后加锁,防止读写线程的中select方法再次锁定。synchronized (lock) {read_Sel.wakeup();newConn.register(read_Sel, SelectionKey.OP_READ);}//newConn.register(conn_Sel, SelectionKey.OP_READ); }private void handleData(final SocketChannel data) throws IOException {ByteBuffer buffer = ByteBuffer.allocate(512);try {int size= data.read(buffer);if (size==-1) {System.out.println("——-连接断开—–");//这里暂时不处理,这里可以移除已经注册的客户端}} catch (IOException e) {e.printStackTrace();return;}buffer.flip();byte[] msgByte = new byte[buffer.limit()];buffer.get(msgByte);Message msg = Message.getMsg(new String(msgByte));//这里读完数据其实已经可以另开线程了下一步的处理,理想情况下,根据不同的消息类型,建立不同的队列,把待发送的消息放进队列//当然也可以持久化。如果在数据没有读取前,另开线程的话,读写线程中 read_Sel.select(),会立刻返回。可以把if (msg.getType().equals("0")) {// 注册ClientInfo info = new ClientInfo(msg.getFrom(), data);clents.put(info.getClentID(), info);System.out.println(msg.getFrom() + "注册成功");} else {// 转发System.out.println("收到"+msg.getFrom()+"发给"+msg.getTo()+"的消息");ClientInfo to = clents.get(msg.getTo());buffer.rewind();if (to != null) {SocketChannel sendChannel = to.getChannel();try {while (buffer.hasRemaining()) {sendChannel.write(buffer);}} catch (Exception e) {}finally {buffer.clear();}}}}public static void main(String[] args) throws IOException {final ServerSocketChannelTest a = new ServerSocketChannelTest();a.init();new Thread("receive…"){public void run() {a.beginReceive();};}.start();a.beginListen();} }

客户端

出门走好路,出口说好话,出手做好事。

Java之NIO(二)selector socketChannel

相关文章:

你感兴趣的文章:

标签云: