HornetQ JMS 集群 之 (Static discovery)

序言:

一款好的中间件必然少不了高可用性和高吞吐量的特性了,当然这些特性在Hornetq里面也得到非常好的实现。并且支持服务器端和client集群方案,不错吧呵呵…大多数的JBOSS系列产品,都提供两种主要技术来实现动态集群(UDP和Jgroup).在Hornetq也提供另外一种静态配置集群连接器,在某些情况下UDP不能使用,使用静态发现同样可以实现集群功能。本文主要阐述Hornetqcluster的方案之一.也是局限于作者本地环境,因为在本地主机上只有一块物理网卡。所以我选择用静态发现。

无论是动态还是动态的方式搭建集群环境,都可以采用对称以及非对称的集群拓扑。什么是对称或者非对称拓扑?

对称:A<–>B<–>C<—>A,这种拓扑结构下所有的节点都能够发现其他集群节点。

非对称:A–>B–>C在这种拓扑结构下A节点是不能察觉到C节点也在集群环境下。这里节点A到节点C实际上是有两跳,在接下来的配置中可以制定集群的最大跳数。

前面一直提及一个概念就是静态发现其他集群节点的配置方式,什么是静态发现?简单而言就是显示的告诉A节点B节点和C节点在集群环境中。而不是通过广播数据包去探测有哪些节点加入到了集群节点中。当然这种方式属于服务器端集,稍后我们可以看看客户端集群怎么做得类似于这样的静态配置不过要简单许多。接下来一起看看怎么类配置和测试。

环境准备:

1,准备个hornetqsever,,我这里就准备了两个节点Node1和Node2.

2.更改启动脚本,启动cluster这个模块(实际上就是预先配置了一下默认设置,你也可以直接在其他模块上配置也一样)

进入bin目录:/apps/cluster_hornetq/node1/bin,编辑run.sh更改non-clustered成clustered.

if [ a”$1″ = a ]; then CONFIG_DIR=$HORNETQ_HOME/config/stand-alone/clustered; else CONFIG_DIR=”$1″; fi#if [ a”$1″ = a ]; then CONFIG_DIR=$HORNETQ_HOME/config/stand-alone/non-clustered; else CONFIG_DIR=”$1″; fi

配置集群环境:

主要编辑hornetq-configuration.xml.这里只配置Node1,Node2只需要改变一下端口和连接器名字就好了其他配置相同。

1.增加与Node2通信的connector.

2.告诉Node1Node2加入到了集群环境中.

就这样分分别配置好Node1和Node2.

Node1:JNDI-port:1099,Acceptor-port:5445

Node2:JNDI-port:1199,Acceptor-port:5465

启动集群:

只需要分别启动两个节点就好了

测试:

1.测试之前检查一下队列里面的消息,在这里方便测试。我把两个节点的消息全部清空了。

Node1:

Node2:

2.发送两个消息到Node1:

这里是用到的jndi.properteis,1099代表Node1,1199代表Node2.当然Node1是active的时候就会获取连接工厂从Node1通过JNDI

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactoryjava.naming.provider.url=jnp://192.168.1.106:1099,jnp://192.168.1.106:1199#java.naming.provider.url=jnp://192.168.1.106:1199#java.naming.provider.url=jnp://192.168.1.106:1099

java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Java测试代码:

package cluster.staticdisc;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;import org.hornetq.api.core.TransportConfiguration;import org.hornetq.core.client.impl.DelegatingSession;import org.hornetq.jms.client.HornetQConnection;public class ProducerTest3 {/*** @param args* @throws NamingException* @throws JMSException* @throws InterruptedException*/public static void main(String[] args) throws NamingException,JMSException, InterruptedException {Context cnx = new InitialContext();ConnectionFactory cf = (ConnectionFactory) cnx.lookup(“/ConnectionFactory”);Queue queue = (Queue) cnx.lookup(“/queue/test1”);Connection initConnection = cf.createConnection();System.out.println(getServer(initConnection));Session session = initConnection.createSession();MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);producer.send(session.createTextMessage(“Hello”));producer.send(session.createTextMessage(“Hello”));initConnection.close();}protected static int getServer(Connection connection) {DelegatingSession session = (DelegatingSession) ((HornetQConnection) connection).getInitialSession();TransportConfiguration transportConfiguration = session.getSessionFactory().getConnectorConfiguration();String port = (String) transportConfiguration.getParams().get(“port”);return Integer.valueOf(port);}}用爱生活,你会使自己幸福!用爱工作,你会使很多人幸福!

HornetQ JMS 集群 之 (Static discovery)

相关文章:

你感兴趣的文章:

标签云: