在publish/subscribe模式中使用fanout类型有个缺陷,就是不能选择性接收的消息。
我们可以让consumer获得所有已发布的消息中指定的几个消息。
在之前的例子中我们这样绑定exchange和队列:
channel.queueBind(queueName,EXCHANGE_NAME,””);
暂且不论该代码中绑定的exchange类型,这里空着的参数就是routing key。
routing key的意义与exchange类型有关,比如使用fanout类型就会忽略掉routing key。
而解决这一问题的就是direct类型。
direct exchange并不复杂,只不过是producer和consumer双方的exchange对应时还需要对应routing key。
以下代码中,同一个exchange和两个队列进行绑定,两个队列分别和不同的binding key绑定。
(PS:当然,我们也可以将同一个routing key绑定给不同的队列也没有问题。)
另外,SERVERITY变量是rounting数组,,假设将日志通过exchange发送出去,consumer根据自己的需要获取不同级别的日志:
finalclassChannelFactory_{privatefinalstaticConnectionFactoryconnFactory=newConnectionFactory();publicfinalstaticStringEXCHANGE_NAME=”direct_exchange”;publicfinalstaticString[]SEVERITY={“info”,”warning”,”error”};static{Channeltemp=getChannel();try{temp.exchangeDeclare(EXCHANGE_NAME,ExchangeTypes.DIRECT);}catch(IOExceptione){e.printStackTrace();}}publicstaticChannelgetChannel(intchannelNumber){try{Connectionconnection=connFactory.newConnection();returnconnection.createChannel(channelNumber);}catch(IOExceptione){e.printStackTrace();}returnnull;}publicstaticChannelgetChannel(){try{Connectionconnection=connFactory.newConnection();returnconnection.createChannel();}catch(IOExceptione){e.printStackTrace();}returnnull;}publicstaticvoidcloseChannel(Channelchannel)throwsIOException{channel.close();channel.getConnection().close();}}
确认定义:
consumer只需要warning和error级别(routing)的日志消息:
publicstaticvoidmain(String[]args)throwsIOException,InterruptedException{Channelchannel=ChannelFactory_.getChannel();StringqueueName=channel.queueDeclare().getQueue();channel.queueBind(queueName,ChannelFactory_.EXCHANGE_NAME,”warning”);channel.queueBind(queueName,ChannelFactory_.EXCHANGE_NAME,”error”);QueueingConsumerconsumer=newQueueingConsumer(channel);channel.basicConsume(queueName,true,consumer);while(true){QueueingConsumer.Deliverydelivery=consumer.nextDelivery();Stringmessage=newString(delivery.getBody());StringroutingKey=delivery.getEnvelope().getRoutingKey();System.out.println(“[x]Received'”+routingKey+”‘:'”+message+”‘”);}}
接受失败也等于给了自己从零开始的机会,接受失败更是一种智者的宣言和呐喊;