基于信道进行消息可靠传输

程序的演示场景是:处理器发送命令,,接收者接收到消息后进行处理并且对发送方发送消息确认表明已经成功收到消息。如果没有发送确认则表明该消息没有被接收并正确处理。失败消息会到达死信箱,系统下次启动时后继续发送死信箱中的发送失败的消息。

1.创建信道回复命令对象

import com.center.akka.simple.command.Command;public class ChannelReply {private Command command; private long sequenceNbr ; public ChannelReply(Command command, long sequenceNbr ) {this. command = command;this. sequenceNbr = sequenceNbr ; } public Command getCommand() {return command; } public long getSequenceNbr() {return sequenceNbr; } @Override public String toString() {return "ChannelReply{" + "command=" + command + ", sequenceNbr=" + sequenceNbr + '}'; }}2.编写接收者对象import akka.actor.UntypedActor;import akka.event.Logging;import akka.event.LoggingAdapter;import akka.persistence.ConfirmablePersistent;import com.center.akka.persistent_channel.command.ChannelReply;import com.center.akka.simple.command.Command;public class Receiver extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public void onReceive(Object msg ) throws Exception {if (msg instanceof ConfirmablePersistent) {ConfirmablePersistent confirmablePersistent = (ConfirmablePersistent) msg;log.info("Incoming Paylod: " + confirmablePersistent.payload() + " #: " + confirmablePersistent.sequenceNr());getSender().tell( new ChannelReply((Command) confirmablePersistent.payload(), confirmablePersistent.sequenceNr()), null);confirmablePersistent. confirm();} }}3.编写处理器import java.util.concurrent.TimeUnit;import scala.concurrent.duration.Duration;import akka.actor.ActorRef;import akka.actor.UntypedActor;import akka.event.Logging;import akka.event.LoggingAdapter;import akka.persistence.Deliver;import akka.persistence.Persistent;import akka.persistence.PersistentChannel;import akka.persistence.PersistentChannelSettings;import com.center.akka.persistent_channel.command.ChannelReply;public class BaseProcessor extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); private ActorRef receiver; private ActorRef channel; public BaseProcessor(ActorRef receiver) {this. receiver = receiver;this. channel =getContext().actorOf(PersistentChannel.props(PersistentChannelSettings .create().withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS )).withPendingConfirmationsMax(10000) // max # of pending confirmations. suspend// delivery until <.withPendingConfirmationsMin(2000) // min # of pending confirmation. suspend// delivery until >.withReplyPersistent( true) // ack.withRedeliverMax(15)) , "channel" ); } @Override public void onReceive(Object msg ) {if (msg instanceof Persistent) {log.info("Send to Channel: " + (( Persistent) msg).payload());channel.tell(Deliver.create (((Persistent) msg).withPayload(((Persistent ) msg ).payload()), receiver.path()), getSelf());} else if (msg instanceof ChannelReply) {log.info(msg.toString());} }}4.测试类import org.slf4j.Logger;import org.slf4j.LoggerFactory;import akka.actor.ActorRef;import akka.actor.ActorSystem;import akka.actor.Props;import akka.persistence.Persistent;import com.center.akka.persistent_channel.actor.BaseProcessor;import com.center.akka.persistent_channel.actor.Receiver;import com.center.akka.simple.command.Command;public class System { public static final Logger log = LoggerFactory.getLogger(System.class); public static void main(String… args) throws Exception {final ActorSystem actorSystem = ActorSystem.create("channel-system");Thread.sleep(2000);final ActorRef receiver = actorSystem.actorOf(Props.create(Receiver. class));final ActorRef processor = actorSystem.actorOf(Props.create(BaseProcessor. class, receiver), "channel-processor" );for ( int i = 0; i < 10; i++) {processor.tell(Persistent.create (new Command("CMD " + i )), null);}Thread.sleep(2000);log.debug( "Actor System Shutdown Starting…" );actorSystem.shutdown(); }}5.输出结果[INFO] [05/17/2015 18:29:46.699] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 0'}[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 1'}[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 2'}[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 3'}[INFO] [05/17/2015 18:29:46.702] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 4'}[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 5'}[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 6'}[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 7'}[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 8'}[INFO] [05/17/2015 18:29:46.703] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] Send to Channel: Command{data='CMD 9'}[INFO] [05/17/2015 18:29:46.965] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 0'} #: 1[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 0'}, sequenceNbr=1}[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-2] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 1'} #: 2[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 1'}, sequenceNbr=2}[INFO] [05/17/2015 18:29:46.986] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 2'} #: 3[INFO] [05/17/2015 18:29:46.987] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 2'}, sequenceNbr=3}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 3'} #: 4[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-3] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 3'}, sequenceNbr=4}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 4'} #: 5[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 5'} #: 6[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 4'}, sequenceNbr=5}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 6'} #: 7[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 5'}, sequenceNbr=6}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 6'}, sequenceNbr=7}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 7'} #: 8[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 7'}, sequenceNbr=8}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 8'} #: 9[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-4] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 8'}, sequenceNbr=9}[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-5] [akka://channel-system/user/$a] Incoming Paylod: Command{data='CMD 9'} #: 10[INFO] [05/17/2015 18:29:46.988] [channel-system-akka.actor.default-dispatcher-11] [akka://channel-system/user/channel-processor] ChannelReply{command=Command{data='CMD 9'}, sequenceNbr=10}18:29:48.699 [main] DEBUG c.c.a.persistent_channel.app.System – Actor System Shutdown Starting…

与一个赏心悦目的人错肩,真真实实的活着,也就够了。

基于信道进行消息可靠传输

相关文章:

你感兴趣的文章:

标签云: