HornetQ之JMS2.0 (实例讲解)

前言:

在2013年4月终于迎来了新的JMS规范-JMS2.0,这是第一次对JMS规范进行更新从2002年发布的JMS1.1版本.我们也许会认为JMS这么久以来从来没更新是否是因为已经停止发展或者被废弃不用.但是,如果你从另外一个叫角度来分析,JMS这个规范存在很多不同的实现版本来看,就充分说明JMS是一个非常成功的API规范。

在JMS2.0规范中,主要包括两方面的重大改进。其一是:更方便的使用API,再则是:引入了许多新的消息特性。JMS2.0是JAVAEE7平台的一部分,它不但可以被用于到JAVAEEWeb或者EJB应用程序重也可以被用于J2SE环境中.

接下来我们就来看看新特性和更方便的API.

简单API:

为了下面的讲解做准备,首先创建一个queue和一个topic在hornetq-jms.xml中

<queue><entry/></queue><topic><entry/></topic >

合并Connection和session

在老的API中要生产/消费一个消息必须经过这么几步,获取connectionFactory->获取queue/topic->创建Connection->创建session->生产/消费消息.在JMS2.0中提供了更简单的API,将创建connection和创建session合并成了一个对象JMSContext,对应的MessageProducer/MessageConsume分别用JMSProcedure/JMSConsumer替代.对参数封装上也相对于JMS1.1更易于理解,在老版本中JMS的事务和ACK模式都是在创建session时显示声明的,并且两个参数很容易误导开发者:Sessionsession=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);第一个参数代表是启用事务,第二个参数代表ACK模式。如果启用事务第二参数将被忽略但是第二个参数又是必须的,这点就容易误导后来的程序维护工程师.在JMS2.0API中对这个特性进行了整改通过builder模式。

Context cnx = new InitialContext();ConnectionFactory cf = (ConnectionFactory) cnx.lookup(“/ConnectionFactory”);Queue queue = (Queue) cnx.lookup(“/queue/test1”);try(JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) {jmsContext.createProducer().setDeliveryDelay(1000).setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, “content1”);System.out.println(“Continue…………..”);} catch (JMSRuntimeException jmse) {jmse.printStackTrace();}

直接在创建JMSContext的时声明事务模式和ACK模式,两者选其一。

public static final int AUTO_ACKNOWLEDGE = 1;public static final int CLIENT_ACKNOWLEDGE = 2;public static final int DUPS_OK_ACKNOWLEDGE = 3;public static final int SESSION_TRANSACTED = 0;

利用JDK7的closeable接口实现自动关闭,观察上面例子我们并没有类似于JMS1.1API那样去关闭Connection关闭session,什么时候关闭的呢?JMSContext利用了JDK1.7的新特性它继承至AutoCloseable接口,当块代码结束的时候自动调用对象的close()方法进行连接关闭.

更方便进行异步消费消息,在JMS1.1中需要手动调用connection.start()方法去开启消费进程。在新的API中默认自动就启动了,不需要显示的启动。

Context cnx;JMSContext jmsContext =null;try {cnx = new InitialContext();ConnectionFactory cf = (ConnectionFactory) cnx.lookup(“/ConnectionFactory”);Queue queue = (Queue) cnx.lookup(“/queue/test1”);jmsContext = cf.createContext();//jmsContext.setAutoStart(true);JMSConsumer jmsConsumer = jmsContext.createConsumer(queue);jmsConsumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {System.out.println(Message.class.getName());}});CountDownLatch latch = new CountDownLatch(1);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}} catch (NamingException e) {e.printStackTrace();} catch (JMSRuntimeException jmse) {jmse.printStackTrace();}finally{if(jmsContext!=null){jmsContext.close();}}

消息payload自动解析通过泛型,在老版本中需要进行手动的类型转换来获取到真实的消息内容.第一个步骤需要将Message转化为对应的TextMessage/ByteMessage/MapMessage/StreamMessage/ObjectMessage.在新的API中除了StreamMessage不能自动检索外,其他类型的message都可以通过方便的API直接获取到消息内容不必经过多次的强制类型转化。

Context cnx = null;ConnectionFactory cf = null;Queue queue = null;try {cnx = new InitialContext();cf = (ConnectionFactory) cnx.lookup(“/ConnectionFactory”);queue = (Queue) cnx.lookup(“/queue/test1”);} catch (Exception e) {}try (JMSContext jmsContext = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);) {// Delay 3 seconds to devlivery .setDeliveryDelay(3000)jmsContext.createProducer().setDeliveryDelay(DeliveryMode.PERSISTENT).send(queue, “content@”).send(queue, “content@1”).send(queue, “content@2”);System.out.println(“Start receive…………..”);JMSConsumer jmsConsumer = jmsContext.createConsumer(queue);String msgBody = jmsConsumer.receiveBody(String.class);System.out.println(msgBody);Message msg1 = jmsConsumer.receive();System.out.println(msg1.getBody(String.class));Message msg2 = jmsConsumer.receive();System.out.println(msg2.getBody(String.class));} catch (JMSRuntimeException jmse) {jmse.printStackTrace();} catch (JMSException e) {e.printStackTrace();}

新特性:

允许多个订阅者在同一个TopicSubscription,想象现在消息的吞吐量很大,在老的版本中我们一个消息只能绑定到一个订阅者。但是消息两很大希望能做到负载均衡类似于Apache。JMS2.0提供这个功能,支持在多个虚拟机中共享一个消息。新建四个订阅者在四个(两组)不同的VM中.

第一组:

Client1-S1:

相信成功的信念比成功本身更重要,

HornetQ之JMS2.0 (实例讲解)

相关文章:

你感兴趣的文章:

标签云: