ActiveMQ:JMS开源框架入门介绍

  介绍基本的JMS概念与开源的JMS框架ActiveMQ应用,内容涵盖一下几点:

    基本的JMS概念   JMS的消息模式   介绍ActiveMQ   一个基于ActiveMQ的JMS例子程序

  一:JMS基本概念

  1.JMS的目标

  为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽可能最小化的Java语言概念去构建最大化企业消息应用。统一已经存在的企业级消息系统功能。

  2.提供者

  JMS提供者是指那些完全完成JMS功能与管理功能的JMS消息厂商,理论上JMS提供者完成。

  JMS消息产品必须是100%的纯Java语言实现,可以运行在跨平台的架构与操作系统上,当前一些JMS厂商包括IBM,Oracle, JBoss社区 (JBoss Community), Apache 社区(ApacheCommunity)。

  3.JMS应用程序,一个完整的JMS应用应该实现以下功能:

  JMS 客户端 – Java语言开发的接受与发送消息的程序   非JMS客户端 – 基于消息系统的本地API实现而不是JMS   消息 – 应用程序用来相互交流信息的载体   被管理对象–预先配置的JMS对象,JMS管理员创建,被客户端运用。如链接工厂,主题等   JMS提供者–完成JMS功能与管理功能的消息系统

  二:JMS的消息模式

  1.点对点的消息模式(Point to Point Messaging)

    

  下面的JMS对象在点对点消息模式中是必须的:

  a.队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象

  b.队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列

  ConnectionQueue来取得与JMS点对点消息提供者的链接。

  c.链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之间,客户用它创建一个或者多个JMS队列会话(QueueSession)

  d.队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand QueueReceiver)

  e.消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列

  f.消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息

  2.发布订阅模式(publish – subscribe Mode)

    

  a.主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象

  b.主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题

  ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。

  c.链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间

  d.会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher and TopicSubscribers)

  e.消息发送者MessageProducer) – 发送消息到已经声明的主题

  f.消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息

  三:介绍ActiveMQ

  ActiveMQ是apache社区完成的JMS开源消息组件,客户端支持多种语言调用,包括Java,C++, C#,

  Perl, Python等。支持Spring配置集成等。更多信息访问这里:

  四:基于ActiveMQ的Publish/subscribe模式Demo程序

  消息Broker,JMSprovider

  importjava.net.URI;   importjava.net.URISyntaxException;   importjavax.jms.Connection;   importjavax.jms.ConnectionFactory;   importjavax.jms.Destination;   importjavax.jms.JMSException;   importjavax.jms.MessageProducer;   importjavax.jms.Session;   importjavax.jms.TextMessage;   importjavax.naming.Context;   importjavax.naming.InitialContext;   importjavax.naming.NamingException;   importorg.apache.activemq.broker.BrokerFactory;   importorg.apache.activemq.broker.BrokerService;   importorg.apache.commons.logging.Log;   importorg.apache.commons.logging.LogFactory;   /**   *referto;  *;  *@authorgloomyfish   *   */  publicclassPureJMSProducer{      privatestaticfinalLogLOG=LogFactory.getLog(PureJMSProducer.class);   privatePureJMSProducer(){   }   /**   *@paramargsthedestinationnametosendtoandoptionally,thenumberof   *messagestosend   */  publicstaticvoidmain(String[]args){   ContextjndiContext=null;   ConnectionFactoryconnectionFactory=null;   Connectionconnection=null;   Sessionsession=null;   Destinationdestination=null;   MessageProducerproducer=null;   BrokerServicebroker=null;   finalintnumMsgs=10;   /*   *CreateaJNDIAPIInitialContextobject   */  try{   jndiContext=newInitialContext();   }catch(NamingExceptione){   LOG.info("CouldnotcreateJNDIAPIcontext:"+e.toString());   System.exit(1);   }      //createexternalTCPbroker   try{   broker=BrokerFactory.createBroker(newURI("broker:tcp://localhost:61616"));   broker.start();   }catch(URISyntaxExceptione){   LOG.info("Couldnotcreatebroker:"+e.toString());   }catch(Exceptione){   LOG.info("Couldnotcreatebroker:"+e.toString());   }   //try{   //   //}   /*   *Lookupconnectionfactoryanddestination.   */  try{   connectionFactory=(ConnectionFactory)jndiContext.lookup("ConnectionFactory");   destination=(Destination)jndiContext.lookup("MyTopic");   }catch(NamingExceptione){   LOG.info("JNDIAPIlookupfailed:"+e);   System.exit(1);   }      /*   *Createconnection.Createsessionfromconnection;falsemeans   *sessionisnottransacted.Createsenderandtextmessage.Send   *messages,varyingtextslightly.Sendend-of-messagesmessage.   *Finally,closeconnection.   */  try{   connection=connectionFactory.createConnection();   session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);   producer=session.createProducer(destination);   TextMessagemessage=session.createTextMessage();   Thread.sleep(3000);   for(inti=0;i<numMsgs;i++){   message.setText("Thisismessage"+(i+1));   LOG.info("Sendingmessage:"+message.getText());   producer.send(message);   Thread.sleep(3000);   }   /*   *Sendanon-textcontrolmessageindicatingendofmessages.   */  producer.send(session.createMessage());   }catch(JMSExceptione){   LOG.info("Exceptionoccurred:"+e);   }catch(InterruptedExceptione){   LOG.info("Exceptionoccurred:"+e);   }finally{   if(connection!=null){   try{   connection.close();   }catch(JMSExceptione){   }   }   }      //stoptheTCPbroker   try{   broker.stop();   }catch(Exceptione){   LOG.info("stopthebrokerfailed:"+e);   }   }   }

  客户端:

  importjava.io.IOException;   importjavax.jms.Connection;   importjavax.jms.JMSException;   importjavax.jms.Message;   importjavax.jms.MessageConsumer;   importjavax.jms.MessageListener;   importjavax.jms.Session;   importjavax.jms.TextMessage;   importjavax.jms.Topic;   importjavax.naming.InitialContext;   importorg.apache.activemq.ActiveMQConnectionFactory;   publicclassActiveMQClient{      publicstaticvoidmain(String[]args)throwsIOException{      //–  try{   ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://localhost:61616");   //ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("vm://locahost");   Connectionconnection=factory.createConnection();   connection.start();      //createmessagetopic   //Topictopic=newActiveMQTopic("MyTopic");   InitialContextjndiContext=newInitialContext();   Topictopic=(Topic)jndiContext.lookup("MyTopic");   Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);      //registermessageconsumer   MessageConsumercomsumer1=session.createConsumer(topic);   comsumer1.setMessageListener(newMessageListener(){   publicvoidonMessage(Messagem){   try{   System.out.println("Consumerget"+((TextMessage)m).getText());   }catch(JMSExceptione){   e.printStackTrace();   }   }   });   Thread.sleep(30000);   session.close();   connection.stop();      }catch(Exceptione){   e.printStackTrace();   }   }   }

  项目配置,Jar依赖:

  

  依赖的三个Jar分别为:

  activemq-all.jar   geronimo-jms_1.1_spec-1.1.1.jar   xbean-spring.jar 累死累活不说,走马观花反而少了真实体验,

ActiveMQ:JMS开源框架入门介绍

相关文章:

你感兴趣的文章:

标签云: