架构设计:系统间通信(6)

1、Netty介绍

在Netty官网上,对于Netty的介绍是:

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

‘Quick and easy’ doesn’t mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

百度上的中文解释是:

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

但实际上呢,Netty框架并不只是封装了多路复用的IO模型,也包括提供了传统的阻塞式/非阻塞式 同步IO的模型封装。当然,从Netty官网上的几句中文并不能概括完Netty的全部作用。下面的两篇文章我们将会在您已经理解原生的JAVA NIO框架的基础上,向您介绍Netty的原理和使用。

这里说明一下,讲解Netty并不是我们这个系列“系统间通信”的内容重点。目的是通过讲解IO通信模型、JAVA对各种通信模型的支持、上层的Netty/MINA封装,可以让大家深刻理解“系统间通信”中一个重要要素——信息如何传递。

Netty架构的官方文档可以参看《JBOSS-Netty Architectural Overview》()。

2、Netty快速上手2-1、代码示例

下面这段代码本身就比较好理解,我在其上又加上了比较详细的注解。相信就算您之前没有接触过Netty,也应该是可以看懂的。如果您之前接触过Netty,那您可以发现,这段代码中基本上已经包含了Netty中比较重要的几个概念了:Channel、Buffer、ChannelPipeline、ChannelHandler、ChannelHandlerContext等

是的,我们将从这个示例代码入手,介绍Netty的基本概念和使用。然后我们再回头看看上文中的那个问题:为什么已经有的JAVA NIO框架,还需要一个Netty呢?

package testNetty;import java.net.InetSocketAddress;import java.nio.channels.spi.SelectorProvider;import java.util.concurrent.ThreadFactory;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.bytes.ByteArrayDecoder;import io.netty.handler.codec.bytes.ByteArrayEncoder;import io.netty.util.AttributeKey;import io.netty.util.concurrent.DefaultThreadFactory;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.log4j.BasicConfigurator;{static {BasicConfigurator.configure();}(String[] args) throws Exception {//这就是主要的服务启动器ServerBootstrap serverBootstrap = new ServerBootstrap();//=======================下面我们设置线程池//BOSS线程池EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);//WORK线程池:这样的申明方式,主要是为了向读者说明Netty的线程组是怎样工作的ThreadFactory threadFactory = new DefaultThreadFactory(“work thread pool”);//CPU个数int processorsNumber = Runtime.getRuntime().availableProcessors();EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());//指定Netty的Boss线程和work线程serverBootstrap.group(bossLoopGroup , workLoogGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {/* (non-Javadoc)* @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel)*/(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ByteArrayEncoder());ch.pipeline().addLast(new TCPServerHandler());ch.pipeline().addLast(new ByteArrayDecoder());}});//========================设置netty服务器绑定的ip和端口serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);serverBootstrap.bind(new InetSocketAddress(“0.0.0.0”, 83));//还可以监控多个端口//serverBootstrap.bind(new InetSocketAddress(“0.0.0.0”, 84));}}/** * @author yinwenjie */@Sharableclass TCPServerHandler extends ChannelInboundHandlerAdapter {/*** 日志*/private static Log LOGGER = LogFactory.getLog(TCPServerHandler.class);/*** 每一个channel,都有独立的handler、ChannelHandlerContext、ChannelPipeline、Attribute* 所以不需要担心多个channel中的这些对象相互影响。<br>* 这里我们使用content这个key,,记录这个handler中已经接收到的客户端信息。*/private static AttributeKey<StringBuffer> content = AttributeKey.valueOf(“content”);/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelRegistered(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelRegistered(ctx)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelUnregistered(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelUnregistered(ctx)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelActive(ctx) = ” + ctx.toString());}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelInactive(ctx)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)*/(ChannelHandlerContext ctx, Object msg) throws Exception {TCPServerHandler.LOGGER.info(“channelRead(ChannelHandlerContext ctx, Object msg)”);/** 我们使用IDE工具模拟长连接中的数据缓慢提交。* 由read方法负责接收数据,但只是进行数据累加,不进行任何处理* */ByteBuf byteBuf = (ByteBuf)msg;try {StringBuffer contextBuffer = new StringBuffer();while(byteBuf.isReadable()) {contextBuffer.append((char)byteBuf.readByte());}//加入临时区域StringBuffer content = ctx.attr(TCPServerHandler.content).get();if(content == null) {content = new StringBuffer();ctx.attr(TCPServerHandler.content).set(content);}content.append(contextBuffer);} catch(Exception e) {throw e;} finally {byteBuf.release();}}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelReadComplete(ChannelHandlerContext ctx)”);/** 由readComplete方法负责检查数据是否接收完了。* 和之前的文章一样,我们检查整个内容中是否有“over”关键字* */StringBuffer content = ctx.attr(TCPServerHandler.content).get();//如果条件成立说明还没有接收到完整客户端信息if(content.indexOf(“over”) == -1) {return;}//当接收到信息后,首先要做的的是清空原来的历史信息ctx.attr(TCPServerHandler.content).set(new StringBuffer());//准备向客户端发送响应ByteBuf byteBuf = ctx.alloc().buffer(1024);byteBuf.writeBytes(“回发响应信息!”.getBytes());ctx.writeAndFlush(byteBuf);/** 关闭,正常终止这个通道上下文,就可以关闭通道了* (如果不关闭,这个通道的回话将一直存在,只要网络是稳定的,服务器就可以随时通过这个回话向客户端发送信息)。* 关闭通道意味着TCP将正常断开,其中所有的* handler、ChannelHandlerContext、ChannelPipeline、Attribute等信息都将注销* */ctx.close();}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, java.lang.Object)*/(ChannelHandlerContext ctx, Object evt) throws Exception {TCPServerHandler.LOGGER.info(“super.userEventTriggered(ctx, evt)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#channelWritabilityChanged(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.channelWritabilityChanged(ctx)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelInboundHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)*/(ChannelHandlerContext ctx, Throwable cause) throws Exception {TCPServerHandler.LOGGER.info(“super.exceptionCaught(ctx, cause)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.handlerAdded(ctx)”);}/* (non-Javadoc)* @see io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel.ChannelHandlerContext)*/(ChannelHandlerContext ctx) throws Exception {TCPServerHandler.LOGGER.info(“super.handlerRemoved(ctx)”);}}去了不同的地方,看了不同的风景,知道了不同的事,

架构设计:系统间通信(6)

相关文章:

你感兴趣的文章:

标签云: