Hadoop RPC协议之 ProtobufRpcEngine

用ProtobufRpcEngine来实现一个简单的RPC协议

add_test.proto

// add_test.proto: request and response messages for the add RPC.
package rpcTest.protobuf;

// Generated Java classes are nested inside rpcTest.protobuf.ClientProto.
option java_package = "rpcTest.protobuf";
option java_outer_classname = "ClientProto";

// The two operands of an addition request.
message AddParameters {
  optional int32 para1 = 1;
  // Field number 2 is unused; the tag stays 3 so the wire format is unchanged.
  optional int32 para2 = 3;
}

// The value returned by the add RPC.
message AddResult {
  optional int32 result = 1;
}

AddService.proto

// AddService.proto: the RPC service built on the messages of add_test.proto.
package rpcTest.protobuf;

// Generated Java classes are nested inside rpcTest.protobuf.Add.
option java_package = "rpcTest.protobuf";
option java_outer_classname = "Add";
// Ask protoc to emit the generic service classes (Add.AddService).
option java_generic_services = true;
option java_generate_equals_and_hash = true;

import "add_test.proto";

service AddService {
  // Takes an AddParameters request and answers with an AddResult.
  rpc add(AddParameters) returns (AddResult);
}

进入源代码所在目录，使用以下命令生成 Java 文件：

protoc --proto_path=./ --java_out ../../ ./add_test.proto
protoc --proto_path=./ --java_out ../../ ./AddService.proto

AddProtocol.java

package rpcTest.protobuf;

/**
 * Business-level contract of the addition service, expressed with plain Java
 * types and free of any protobuf or RPC classes.
 */
public interface AddProtocol {

  /**
   * Combines two integers into a single result.
   *
   * @param para1 the first operand
   * @param para2 the second operand
   * @return the result computed by the implementation
   */
  int add(int para1, int para2);
}

AddImpl.java

package rpcTest.protobuf;public class AddImpl implements AddProtocol {@Overridepublic int add(int para1, int para2) {// TODO Auto-generated method stubreturn para1 + para2;}}

AddProtocolPB.java

package rpcTest.protobuf;

import com.google.protobuf.RpcController;
import org.apache.hadoop.ipc.ProtocolInfo;
import rpcTest.protobuf.ClientProto.AddParameters;

/**
 * Protobuf-level view of the add protocol: the request and the response are
 * the generated {@code ClientProto} messages rather than plain integers.
 *
 * <p>The protocol name and version used for this interface are declared through
 * the {@link ProtocolInfo} annotation.
 */
@ProtocolInfo(protocolName = "rpcTest.protobuf.AddProtocolPB", protocolVersion = 1)
public interface AddProtocolPB {

  /**
   * Performs the add call with protobuf messages.
   *
   * @param controller the RPC controller for this call
   * @param p the request message carrying both operands
   * @return the response message carrying the result
   */
  ClientProto.AddResult add(RpcController controller, AddParameters p);
}

AddProtocolTranslatorPB.java

package rpcTest.protobuf;import rpcTest.protobuf.ClientProto.AddParameters;public class AddProtocolTranslatorPB implements AddProtocol {final private AddProtocolPB rpcProxy;public AddProtocolTranslatorPB(AddProtocolPB proxy) {this.rpcProxy = proxy;}@Overridepublic int add(int para1, int para2) {// TODO Auto-generated method stubAddParameters req = ClientProto.AddParameters.newBuilder().setPara1(para1).setPara2(para2).build();return rpcProxy.add(null, req).getResult();}}AddProtocolServerSidePB.java

package rpcTest.protobuf;import com.google.protobuf.RpcController;import rpcTest.protobuf.ClientProto.AddParameters;import rpcTest.protobuf.ClientProto.AddResult;public class AddProtocolServerSidePB implements AddProtocolPB,Add.AddService.BlockingInterface {final private AddProtocol server;public AddProtocolServerSidePB(AddProtocol server) {this.server = server;}@Overridepublic AddResult add(RpcController controller, AddParameters p) {// TODO Auto-generated method stubrpcTest.protobuf.ClientProto.AddResult.Builder builder = ClientProto.AddResult.newBuilder();int result = server.add(p.getPara1(), p.getPara2());builder.setResult(result);return builder.build();}}ProtoRpcClient.java

package rpcTest.protobuf;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtobufRpcEngine;import org.apache.hadoop.ipc.RPC;public class ProtoRpcClient {final static long VERSION = 1;public static void main(String[] args) throws IOException {Configuration conf = new Configuration();String ADDRESS = "localhost";int port = 9998;RPC.setProtocolEngine(conf, AddProtocolPB.class, ProtobufRpcEngine.class);AddProtocolPB proxy = RPC.getProxy(AddProtocolPB.class, VERSION, new InetSocketAddress(ADDRESS,port),conf);AddProtocol add = new AddProtocolTranslatorPB(proxy);int result = add.add(100, 200);System.out.println("client result:" + result);// int result = proxy.add(5, 6);// System.out.println(result);}}ProtoRpcServer.java

package rpcTest.protobuf;

import com.google.protobuf.BlockingService;
import java.io.IOException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;

/**
 * Demo server: publishes the add service on localhost:9998 through the
 * protobuf RPC engine.
 */
public class ProtoRpcServer {

  /**
   * Builds the RPC server around an {@link AddImpl} instance and starts it.
   *
   * @param args ignored
   * @throws HadoopIllegalArgumentException propagated from building the server
   * @throws IOException propagated from building the server
   */
  public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
    Configuration conf = new Configuration();
    String bindHost = "localhost";
    int listenPort = 9998;

    // Select the protobuf engine for this protocol before building the server.
    RPC.setProtocolEngine(conf, AddProtocolPB.class, ProtobufRpcEngine.class);

    // Plain implementation -> protobuf adapter -> reflective blocking service.
    AddProtocolServerSidePB adapter = new AddProtocolServerSidePB(new AddImpl());
    BlockingService service = Add.AddService.newReflectiveBlockingService(adapter);

    RPC.Builder builder = new RPC.Builder(conf)
        .setProtocol(AddProtocolPB.class)
        .setInstance(service)
        .setBindAddress(bindHost)
        .setPort(listenPort)
        .setNumHandlers(1)
        .setVerbose(true);

    RPC.Server server = builder.build();
    server.start();
  }
}

先执行ProtoRpcServer监听端口。

再运行ProtoRpcClient，启动客户端程序。

曾经拥有的不要忘记,难以得到的更要珍惜,

Hadoop RPC协议之 ProtobufRpcEngine

相关文章:

你感兴趣的文章:

标签云: