介绍基本的JMS概念与开源的JMS框架ActiveMQ应用,内容涵盖一下几点:
基本的JMS概念 JMS的消息模式 介绍ActiveMQ 一个基于ActiveMQ的JMS例子程序
一:JMS基本概念
1.JMS的目标
为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽可能最小化的Java语言概念去构建最大化企业消息应用。统一已经存在的企业级消息系统功能。
2.提供者
JMS提供者是指那些完全完成JMS功能与管理功能的JMS消息厂商,理论上JMS提供者完成。
JMS消息产品必须是100%的纯Java语言实现,可以运行在跨平台的架构与操作系统上,当前一些JMS厂商包括IBM,Oracle, JBoss社区 (JBoss Community), Apache 社区(ApacheCommunity)。
3.JMS应用程序,一个完整的JMS应用应该实现以下功能:
JMS 客户端 – Java语言开发的接受与发送消息的程序 非JMS客户端 – 基于消息系统的本地API实现而不是JMS 消息 – 应用程序用来相互交流信息的载体 被管理对象–预先配置的JMS对象,JMS管理员创建,被客户端运用。如链接工厂,主题等 JMS提供者–完成JMS功能与管理功能的消息系统
二:JMS的消息模式
1.点对点的消息模式(Point to Point Messaging)
下面的JMS对象在点对点消息模式中是必须的:
a.队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象
b.队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列
ConnectionQueue来取得与JMS点对点消息提供者的链接。
c.链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之间,客户用它创建一个或者多个JMS队列会话(QueueSession)
d.队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand QueueReceiver)
e.消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列
f.消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息
2.发布订阅模式(publish – subscribe Mode)
a.主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象
b.主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题
ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。
c.链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间
d.会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher and TopicSubscribers)
e.消息发送者MessageProducer) – 发送消息到已经声明的主题
f.消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息
三:介绍ActiveMQ
ActiveMQ是apache社区完成的JMS开源消息组件,客户端支持多种语言调用,包括Java,C++, C#,
Perl, Python等。支持Spring配置集成等。更多信息访问这里:
四:基于ActiveMQ的Publish/subscribe模式Demo程序
消息Broker,JMSprovider
importjava.net.URI; importjava.net.URISyntaxException; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.MessageProducer; importjavax.jms.Session; importjavax.jms.TextMessage; importjavax.naming.Context; importjavax.naming.InitialContext; importjavax.naming.NamingException; importorg.apache.activemq.broker.BrokerFactory; importorg.apache.activemq.broker.BrokerService; importorg.apache.commons.logging.Log; importorg.apache.commons.logging.LogFactory; /** *referto; *; *@authorgloomyfish * */ publicclassPureJMSProducer{ privatestaticfinalLogLOG=LogFactory.getLog(PureJMSProducer.class); privatePureJMSProducer(){ } /** *@paramargsthedestinationnametosendtoandoptionally,thenumberof *messagestosend */ publicstaticvoidmain(String[]args){ ContextjndiContext=null; ConnectionFactoryconnectionFactory=null; Connectionconnection=null; Sessionsession=null; Destinationdestination=null; MessageProducerproducer=null; BrokerServicebroker=null; finalintnumMsgs=10; /* *CreateaJNDIAPIInitialContextobject */ try{ jndiContext=newInitialContext(); }catch(NamingExceptione){ LOG.info("CouldnotcreateJNDIAPIcontext:"+e.toString()); System.exit(1); } //createexternalTCPbroker try{ broker=BrokerFactory.createBroker(newURI("broker:tcp://localhost:61616")); broker.start(); }catch(URISyntaxExceptione){ LOG.info("Couldnotcreatebroker:"+e.toString()); }catch(Exceptione){ LOG.info("Couldnotcreatebroker:"+e.toString()); } //try{ // //} /* *Lookupconnectionfactoryanddestination. */ try{ connectionFactory=(ConnectionFactory)jndiContext.lookup("ConnectionFactory"); destination=(Destination)jndiContext.lookup("MyTopic"); }catch(NamingExceptione){ LOG.info("JNDIAPIlookupfailed:"+e); System.exit(1); } /* *Createconnection.Createsessionfromconnection;falsemeans *sessionisnottransacted.Createsenderandtextmessage.Send *messages,varyingtextslightly.Sendend-of-messagesmessage. *Finally,closeconnection. */ try{ connection=connectionFactory.createConnection(); session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); producer=session.createProducer(destination); TextMessagemessage=session.createTextMessage(); Thread.sleep(3000); for(inti=0;i<numMsgs;i++){ message.setText("Thisismessage"+(i+1)); LOG.info("Sendingmessage:"+message.getText()); producer.send(message); Thread.sleep(3000); } /* *Sendanon-textcontrolmessageindicatingendofmessages. */ producer.send(session.createMessage()); }catch(JMSExceptione){ LOG.info("Exceptionoccurred:"+e); }catch(InterruptedExceptione){ LOG.info("Exceptionoccurred:"+e); }finally{ if(connection!=null){ try{ connection.close(); }catch(JMSExceptione){ } } } //stoptheTCPbroker try{ broker.stop(); }catch(Exceptione){ LOG.info("stopthebrokerfailed:"+e); } } }
客户端:
importjava.io.IOException; importjavax.jms.Connection; importjavax.jms.JMSException; importjavax.jms.Message; importjavax.jms.MessageConsumer; importjavax.jms.MessageListener; importjavax.jms.Session; importjavax.jms.TextMessage; importjavax.jms.Topic; importjavax.naming.InitialContext; importorg.apache.activemq.ActiveMQConnectionFactory; publicclassActiveMQClient{ publicstaticvoidmain(String[]args)throwsIOException{ //– try{ ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("tcp://localhost:61616"); //ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("vm://locahost"); Connectionconnection=factory.createConnection(); connection.start(); //createmessagetopic //Topictopic=newActiveMQTopic("MyTopic"); InitialContextjndiContext=newInitialContext(); Topictopic=(Topic)jndiContext.lookup("MyTopic"); Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //registermessageconsumer MessageConsumercomsumer1=session.createConsumer(topic); comsumer1.setMessageListener(newMessageListener(){ publicvoidonMessage(Messagem){ try{ System.out.println("Consumerget"+((TextMessage)m).getText()); }catch(JMSExceptione){ e.printStackTrace(); } } }); Thread.sleep(30000); session.close(); connection.stop(); }catch(Exceptione){ e.printStackTrace(); } } }
项目配置,Jar依赖:
依赖的三个Jar分别为:
activemq-all.jar geronimo-jms_1.1_spec-1.1.1.jar xbean-spring.jar 累死累活不说,走马观花反而少了真实体验,