[置顶] 集群RPC通信

RPC即远程过程调用,它的提出旨在消除通信细节、屏蔽繁杂且易错的底层网络通信操作,像调用本地服务一般地调用远程服务,让业务开发者更多关注业务开发而不必考虑网络、硬件、系统的异构复杂环境。

先看看集群中RPC的整个通信过程,假设从节点node1开始一个RPC调用,①先将待传递的数据放到NIO集群通信框架(这里使用的是tribes框架)中;②由于使用的是NIO模式,线程无需阻塞直接返回;③由于与集群其他节点通信需要花销若干时间,为了提高CPU使用率当前线程应该放弃CPU的使用权进行等待操作;④NIO集群通信框架tribes接收到node2节点的响应消息,并将消息封装成Response对象保存至响应数组;⑤tribes接收到node4节点的响应消息,由于是使用了并行通信,所以node4可能比node3先返回消息,并将消息封装成Response对象保存至响应数组;⑥tribes最后接收到node3节点的响应消息,并将消息封装成Response对象保存至响应数组;⑦现在所有节点的响应都已经收集完毕,是时候通知刚刚被阻塞的那条线程了,原来的线程被notify醒后拿到所有节点的响应Response[]进行处理,至此完成了整个集群RPC过程。

上面整个过程是在只有一条线程的情况下,一切看起来没什么问题,但如果有多条线程并发调用则会导致一个问题:线程与响应的对应关系将被打乱,无法确定哪个线程对应哪几个响应。因为NIO通信框架不会每个线程都独自使用一个socket通道,为提高性能一般都是使用长连接,所有线程公用一个socket通道,这时就算线程一比线程二先放入tribes也不能保证响应一比响应二先接收到,所以接收到响应一后不知道该通知线程一还是线程二。只有解决了这个问题才能保证RPC调用的正确性。

要解决线程与响应对应的问题就需要维护一个线程响应关系列表,响应从关系列表中就能查找对应的线程,如下图,在发送之前生成一个UUID标识,此标识要保证同socket中唯一,再把UUID与线程对象关系对应起来,可使用Map数据结构实现,UUID的值作为key,线程对应的锁对象为value。接着制定一个协议报文,UUID作为报文的其中一部分,报文发往另一个节点node2后将响应信息message放入报文中并返回,node1对接收到的报文进行解包根据UUID去查找并唤起对应的线程,告诉它“你要的消息已经收到,往下处理吧”。但在集群环境下,我们更希望是集群中所有节点的消息都接收到了才往下处理,如下图下半部分,一个UUID1的请求报文会发往node2、node3和node4三个节点,这时假如只接收到一个响应则不唤起线程,直到node2、node3对应UUID1的响应报文都接收到后才唤起对应线程往下执行。同样地,UUID2、UUID3的报文消息都是如此处理,最后集群中对应的响应都能正确回到各自线程上。

用简单代码实现一个RPC例子,选择一个集群通信框架负责底层通信,这里使用tribes,接着往下:

①定义一个RPC接口,这些方法是预留提供给上层具体逻辑处理的入口,replyRequest方法用于处理响应逻辑,leftOver方法用于残留请求的逻辑处理。

publicinterfaceRpcCallback{

publicSerializablereplyRequest(Serializablemsg,Membersender);

publicvoidleftOver(Serializablemsg,Membersender);

}

②定义通信消息协议,实现Externalizable接口自定义序列化和反序列化,message用于存放响应消息,uuid标识用于关联线程,rpcId用于标识RPC实例,reply表示是否回复。

publicclassRpcMessageimplementsExternalizable{

protectedSerializablemessage;

protectedbyte[]uuid;

protectedbyte[]rpcId;

protectedbooleanreply=false;

publicRpcMessage(){

}

publicRpcMessage(byte[]rpcId,byte[]uuid,Serializablemessage){

this.rpcId=rpcId;

this.uuid=uuid;

this.message=message;

}

@Override

publicvoidreadExternal(ObjectInputin)throwsIOException,ClassNotFoundException{

reply=in.readBoolean();

intlength=in.readInt();

uuid=newbyte[length];

in.readFully(uuid);

length=in.readInt();

rpcId=newbyte[length];

in.readFully(rpcId);

message=(Serializable)in.readObject();

}

@Override

publicvoidwriteExternal(ObjectOutputout)throwsIOException{

out.writeBoolean(reply);

out.writeInt(uuid.length);

out.write(uuid,0,uuid.length);

out.writeInt(rpcId.length);

out.write(rpcId,0,rpcId.length);

out.writeObject(message);

}

}

③响应类型,提供多种唤起线程的条件,一共四种类型,分别表示接收到第一个响应就唤起线程、接收到集群中大多数节点的响应就唤起线程、接收到集群中所有节点的响应才唤起线程、无需等待响应的无响应模式。

publicclassRpcResponseType{

publicstaticfinalintFIRST_REPLY=1;

publicstaticfinalintMAJORITY_REPLY=2;

publicstaticfinalintALL_REPLY=3;

publicstaticfinalintNO_REPLY=4;

}

④响应对象,用于封装接收到的消息,Member在通信框架tribes是节点的抽象,这里用来表示来源节点。

publicclassRpcResponse{

privateMembersource;

privateSerializablemessage;

publicRpcResponse(){

}

publicRpcResponse(Membersource,Serializablemessage){

this.source=source;

this.message=message;

}

publicvoidsetSource(Membersource){

this.source=source;

}

publicvoidsetMessage(Serializablemessage){

this.message=message;

}

publicMembergetSource(){

returnsource;

}

publicSerializablegetMessage(){

returnmessage;

}

}

⑤RPC响应集,用于存放同个UUID的所有响应。

publicclassRpcCollector{

publicArrayList<RpcResponse>responses=newArrayList<RpcResponse>();

publicbyte[]key;

publicintoptions;

publicintdestcnt;

publicRpcCollector(byte[]key,intoptions,intdestcnt){

this.key=key;

this.options=options;

this.destcnt=destcnt;

}

publicvoidaddResponse(Serializablemessage,Membersender){

RpcResponseresp=newRpcResponse(sender,message);

responses.add(resp);

}

publicbooleanisComplete(){

if(destcnt<=0)returntrue;

switch(options){

caseRpcResponseType.ALL_REPLY:

returndestcnt==responses.size();

caseRpcResponseType.MAJORITY_REPLY:

{

floatperc=((float)responses.size())/((float)destcnt);

returnperc>=0.50f;

}

caseRpcResponseType.FIRST_REPLY:

returnresponses.size()>0;

default:

returnfalse;

}

}

publicRpcResponse[]getResponses(){

returnresponses.toArray(newRpcResponse[responses.size()]);

}

}

大海,别为森林的渺小而沮丧,

[置顶] 集群RPC通信

相关文章:

你感兴趣的文章:

标签云: