Flex与Netty实现的WEB版IM(即时聊天)

Apache Flex是基于MXML和ActionScript的Flash程序设计框架,可以快速开发RIA(富Internet应用)程序,Netty是JAVA实现的高性能的网络通信框架,可以快速构建网络应用的服务端。即时通讯现在已经非常普遍了,本文以简单的WEB版聊天为例浅谈IM系统的设计与原理,最后再探讨下大规模集群下的服务端的瓶颈与解决思路。1、可用的方案选型(1)socket 基于浏览器的Flash插件的Socket/XMLSocket,可以采用Flex实现(2)websocket HTML5的websocket是非常新的技术,但需要浏览器的支持,面临的问题是老版本的浏览器不可用。(3)HTTP 基于AJAX的长轮询和基于HTTP流,开源的有Comet架构的Pushlet(4)JavaFX与Java Applet本文采用的是方案1,用Flex编写客户端应用,目前我还没有研究如何将Flash与JS的混合,而是完全采用的Flex技术。关于Flex的学习,我推荐到Apache上去找,也可以通过阅读《Flex实战》学习,我目前还对Flex了解很少,只能进行简单的编程,编写Flex程序前我只看了两天相关的资料,而且只用到了Socket(Flex中的一个知识点),主要还是ActionScript的学习,ActionScript与JavaScript很类似,能看懂JS的应该也能看懂AS,这里不过多描述。2、通讯协议设计在即时通讯中使用的协议很多,熟知的有Jabbe/XMPP/SIP等,还有很多私有的自定义协议。协议是双方协定的数据交互规则,具体采用什么样的协议要根据场景和需求,我为了实现通讯功能,简单地设计了一个规则,具体如下:

我的使用场景包括用户的登录、注销、获取通讯录和发送消息,对于应用场景复杂的应该要考虑更多的情况,根据我在工作中的经验,通讯协议不是一下就设计好的,而是在开发过程进行不断修改与完善,可以说没有协议的设计只能遵循具体的原则,没有最终版。我在工作中原本是基于XMPP开发的,由于做的是移动互联的应用,受限于移动网络的网速,而XMPP的协议过于庞大,对用户的流量需求太高,为此我开始寻找XMPP的替代品,我最开始选的方案是Google的ProtoBuf,类似的还有Apache的Thrift,这两者都是二进制级别的编码,虽然两者的压缩程度和性能都非常好,但在通讯协议方面不太适合,,因为编码后没有可读性,出了问题不好定位。后来我在工作选了JSON来设计,相比XML来说拓展性与性能都要好很多。3、客户端程序设计客户端采用的是Flex,使用的集成开发工具是IntelliJ IDEA,基于Apache Flex SDK根据通讯协议的设计进行客户端的开发,具体代码如下:<?xml version="1.0"?><s:Application xmlns:fx="" xmlns:s="library://ns.adobe.com/flex/spark"><fx:Script><![CDATA[import mx.controls.Alert;var socket:Socket = null;// 初始化连接public function conn():void{socket = new Socket();// 增加事件监听处理socket.addEventListener(Event.CLOSE,closeHandler);socket.addEventListener(Event.CONNECT,connectHandler);socket.addEventListener(ProgressEvent.SOCKET_DATA, socketDataHandler);// 建立连接socket.connect('127.0.0.1',8888);}// 监听关闭事件private function closeHandler(event:Event):void{trace("closeHandler: " + event);Alert.show('closed!');}// 监听连接成功事件private function connectHandler(event:Event):void{trace("connectHandler: " + event);// 注册用户setName();Alert.show('connected!');}// 处理接收消息private function socketDataHandler(event:ProgressEvent):void{var str:String = socket.readUTFBytes(socket.bytesAvailable);trace("receive data : " + str);// 沙箱处理if(str.indexOf("<?xml version=\&;1.0\&;?>") == 0){//Alert.show(str);} else if (str.indexOf("ROSTER:") == 0){this.roster.text = str; // 处理通讯录} else {this.content.appendText(str +"\n"); // 普通消息}}// 点击按钮发送消息,内容为输入框中的文本public function send():void{var message:String = this.messageField.text;trace("client send : " + message);socket.writeUTFBytes("MSG:" + this.receive.text + "#" + this.setname.text + ":" + message);socket.flush();// 设置对话框展示效果this.content.appendText(this.setname.text + ":" + this.messageField.text +"\n");this.messageField.text = '';}// 发送字符串函数,用户注册时使用private function sendMsg(str:String):void{trace("client send : " + str);socket.writeUTFBytes(str);socket.flush();}// 点击关闭public function close():void{trace("close the connect");var nickname:String = this.setname.text;// 根据注册的用户注销用户sendMsg("QUIT:" + nickname);// 关闭连接socket.close();}// 设置用户名,用于注册public function setName():void{var nickname:String = this.setname.text;sendMsg("AUTH:" + nickname);}]]></fx:Script><s:Label text="用户名:" x="10" y="10" /><s:TextInput x="50" y="0" width="100" height="31" id="setname"/><s:Button click="conn()" label="连接" x="160" y="0" width="60" height="31"/><s:Label text="接收者:" x="10" y="50"/><s:TextInput x="50" y="40" width="100" height="31" id="receive"/><s:TextInput x="160" y="40" width="200" height="31" id="messageField"/><s:Button click="send()" label="发送" x="370" y="40" width="60" height="31"/><s:Button click="close()" label="关闭" x="440" y="40" width="60" height="31"/><s:Label text="消息:" x="10" y="100"/><s:Label text="通讯录:" x="320" y="100"/><s:TextArea x="10" y="130" width="300" height="100" id="content"/><s:TextArea x="320" y="130" width="200" height="100" id="roster"/></s:Application>4、服务端设计服务端采用的是Netty,设计方案如下:

根据方案开发的主要代码如下:package org.jcluster.im.server;import org.jcluster.im.component.ComponentManager;import org.jcluster.im.component.InterpreterComponent;import org.jcluster.im.handler.ConnectionHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class StartServer {private int port;public StartServer(int port) {this.port = port;}public void run() throws Exception {// Acceptor:threads default is availableProcessors * 2EventLoopGroup bossGroup = new NioEventLoopGroup(2);// HandlerEventLoopGroup workerGroup = new NioEventLoopGroup(4);try {ServerBootstrap server = new ServerBootstrap();ChannelHandler handler = new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ConnectionHandler());}};server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handler).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// Start the clientChannelFuture future = server.bind(port).sync();InterpreterComponent component = new InterpreterComponent();ComponentManager.getInstance().addComponent("test", component);System.out.println("IM Server start");// Wait until the connection is closedfuture.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new StartServer(8888).run();}}package org.jcluster.im.handler;import java.net.SocketAddress;import java.nio.charset.Charset;import java.util.Iterator;import org.jcluster.im.session.LocalChannelManger;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.buffer.UnpooledByteBufAllocator;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;public class ConnectionHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {//SocketAddress address = ctx.channel().remoteAddress();//LocalChannelManger.getInstance().addContext(address.toString(), ctx);super.channelRegistered(ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {LocalChannelManger.getInstance().removeContext(ctx);syncRoster();SocketAddress address = ctx.channel().remoteAddress();System.out.println(address.toString() + "channelUnregistered");int count = LocalChannelManger.getInstance().staticClients();System.out.println("current clients : " + count);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {ByteBuf in = (ByteBuf) msg;String message = in.toString(Charset.forName("UTF-8"));// Flash沙箱处理String xml = "<?xml version=\&;1.0\&;?><cross-domain-policy><site-control permitted-cross-domain-policies=\&;all\&;/><allow-access-from domain=\&;*\&; to-ports=\&;*\&;/></cross-domain-policy>\0";if(message.trim().equals("<policy-file-request/>")){ctx.writeAndFlush(Unpooled.copiedBuffer(xml,CharsetUtil.UTF_8));}if(message.startsWith("AUTH:")){String name = (message.split(":"))[1];LocalChannelManger.getInstance().addContext(name, ctx);int count = LocalChannelManger.getInstance().staticClients();System.out.println("current clients : " + count);syncRoster();} else if (message.startsWith("MSG:")){String content = message.substring(4);String[] temp = content.split("#");String to = temp[0];String body = "";for(int i=1;i<temp.length;i++){if(i > 1){body += "#";}body += temp[i];}if(LocalChannelManger.getInstance().isAvailable(to)){LocalChannelManger.getInstance().getContext(to).writeAndFlush(Unpooled.copiedBuffer(body,CharsetUtil.UTF_8));}} else if (message.startsWith("QUIT:")){String name = (message.split(":"))[1];LocalChannelManger.getInstance().removeContext(name);int count = LocalChannelManger.getInstance().staticClients();System.out.println("current clients : " + count);syncRoster();}System.out.println(message);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {//ctx.close();//System.out.println("server closed!");}// update all clients rosterprivate void syncRoster(){String respone = "ROSTER:";for(String s : LocalChannelManger.getInstance().getAll()){respone += s + ",";}Iterator<ChannelHandlerContext> it = LocalChannelManger.getInstance().getAllClient().iterator();while(it.hasNext()){it.next().writeAndFlush(Unpooled.copiedBuffer(respone,CharsetUtil.UTF_8));}}}

用户的会话管理设计:

而只有在充满了艰辛的人生旅途中,

Flex与Netty实现的WEB版IM(即时聊天)

相关文章:

你感兴趣的文章:

标签云: