用Netty解析Redis网络协议

用Netty解析Redis网络协议

根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!

1.RESP协议

Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据。RESP的设计权衡了实现简单、解析快速、人类可读这三个因素。Redis客户端通过RESP序列化整数、字符串、数据等数据类型,发送字符串数组表示参数的命令到服务端。服务端根据不同的请求命令响应不同的数据类型。除了管道和订阅外,Redis客户端和服务端都是以这种简单的请求-响应模型通信的。

具体来看,RESP支持五种数据类型。以”*”消息头标识总长度,消息内部还可能有”$”标识字符串长度,每行以\r\n结束:

例如,观察下面命令对应的RESP,这一组set/get也正是我们要在Netty里实现的:

set name helloworld->*3set\r\n$4\r\nnamehelloworld\r\n<-:1\r\nget name->*2\r\n$3\r\ngetname\r\n<-$10\r\nhelloworld\r\nset name abc111->*3set\r\n$4\r\nnameabc111\r\n<-:0\r\nget age->*2\r\n$3\r\ngetage\r\n<-:-1\r\n2.用Netty解析协议

下面就用高性能的网络通信框架Netty实现一个简单的Redis服务器后端,解析set和get命令,并保存键值对。

2.1 Netty版本

Netty版本,5.0还处于alpha,使用Final版里最新的。但即便是4.0.25.Final竟然也跟4.0的前几个版本有些不同,网上一些例子中用的API根本就找不到了。Netty的API改得有点太“任性”了吧?:)

>netty-all>2.2 启动服务

Netty服务器启动代码,这套代码应该是Netty 4里的标准模板了,具体细节就不在本文赘述了。主要关注我们注册的几个Handler。Netty中Handler分为Inbound和Outbound,RedisCommandDecoder和RedisCommandHandler是Inbound,RedisCommandDecoder是Outbound:

{(String[] args) throws Exception {new Main().start(6379);}(int port) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap().group(group).channel(NioServerSocketChannel.class).localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {(SocketChannel ch) throws Exception {ch.pipeline().addLast(new RedisCommandDecoder()).addLast(new RedisReplyEncoder()).addLast(new RedisCommandHandler());}});// Bind and start to accept incoming connections.ChannelFuture f = b.bind(port).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();} finally {// Shutdown the EventLoopGroup, which releases all resources.group.shutdownGracefully();}}}2.3 协议解析

RedisCommandDecoder开始时cmds是null,进入doDecodeNumOfArgs先解析出命令和参数的个数,并初始化cmds。之后就会进入doDecodeArgs逐一解析命令名和参数了。当最后完成时,会根据解析结果创建出RedisCommand对象,并加入到out列表里。这样下一个handler就能继续处理了。

<[][] cmds;arg;(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (cmds == null) {if (in.readByte() == ‘*’) {doDecodeNumOfArgs(in);}} else {doDecodeArgs(in);}if (isComplete()) {doSendCmdToHandler(out);doCleanUp();}}(ByteBuf in) {// Ignore negative caseint numOfArgs = readInt(in);System.out.println(“RedisCommandDecoder NumOfArgs: ” + numOfArgs);cmds = new byte[numOfArgs][];checkpoint();}(ByteBuf in) {for (int i = arg; i < cmds.length; i++) {if (in.readByte() == ‘$’) {int lenOfBulkStr = readInt(in);System.out.println(“RedisCommandDecoder LenOfBulkStr[” + i + “]: ” + lenOfBulkStr);cmds[i] = new byte[lenOfBulkStr];in.readBytes(cmds[i]);// Skip CRLF(\r\n)in.skipBytes(2);arg++;checkpoint();} else {throw new IllegalStateException(“Invalid argument”);}}}/*** cmds != null means header decode complete* arg > 0 means arguments decode has begun* arg == cmds.length means complete!*/() {return (cmds != null)&& (arg > 0)&& (arg == cmds.length);}(List<Object> out) {System.out.println(“RedisCommandDecoder: Send command to next handler”);if (cmds.length == 2) {out.add(new RedisCommand(new String(cmds[0]), cmds[1]));} else if (cmds.length == 3) {out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2]));} else {throw new IllegalStateException(“Unknown command”);}}() {this.cmds = null;this.arg = 0;}(ByteBuf in) {int integer = 0;char c;while ((c = (char) in.readByte()) != ‘\r’) {integer = (integer * 10) + (c – ‘0’);}if (in.readByte() != ‘\n’) {throw new IllegalStateException(“Invalid number”);}return integer;}}穿越茫茫人海,寻找属于我们的那一份宁静。

用Netty解析Redis网络协议

相关文章:

你感兴趣的文章:

标签云: