Network connectors

本文为network connectors的static connector学习笔记。

Network connectors

broker网络能够创建多个相互连接的ActiveMq实例组成的簇,以应对更加复杂的消息场景。Network connectors提供了broker之间的通信。默认情况下,network connector是单向通道,它只会把收到的消息投递给与之建立连接的另一个broker。这通常称为forwarding bridge。ActiveMQ也支持双向通道,即duplex connector。下图是一个包含了这两者的复杂网络。

Network connector的XML配置如下:

<networkConnectors><networkConnector name="default-nc" uri="multicast://default"/></networkConnectors>一个重要的概念-discoverydiscovery:是一个检测远程broker服务的进程。client通常需要感知所有的broker。broker,需要感知其他存在的broker,以建立broker的网络。当我们想配置一个broker网络时,首要问题是:我们知道每个broker的准确地址吗?如果是,可以以静态的方式配置,将客户端连接到提前定义好的broker URI,这在你想完全控制所有资源的生产环境中比较常见。如果客户端以及broker相互不知道彼此的地址,,那么必须使用一种discovery机制来发现已有的broker。这种设置在开发环境下比较常见,易于配置和维护。static network概念介绍只要我们知道了想要使用的broker的地址,就可以使用static配置方式。Static connector

用来创建网络中多个broker的静态配置。协议使用组合URI,即URI中包含其他URI。格式如下:static:(uri1,uri2,uri3,…) ?key=valueXML中配置示例:

<networkConnectors><networkConnector name="local network"uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/></networkConnectors>

程序实例为了更好的理解,可以通过一个发布者-订阅者的例子来进行说明。(demo来自ActiveMQ in action上的例子)这个例子使用下图所示的broker拓扑结构:

BrokerA与brokerB单向相连,当生产者把消息发送给brokerA时,他们会被投递给有订阅需求的broker。这个时候,会被brokerA投递给brokerB。详细代码如下。brokerB配置(brokerB.xml):<beans xmlns="" xmlns:amq="" xmlns:broker="" xmlns:xsi="" xsi:schemaLocation=" http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/activemq-core.xsd http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!– Allows us to use system properties as variables in this configuration file –> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/><!– 定义一个broker –> <broker xmlns="" brokerName="BrokerB" dataDirectory="${activemq.base}/data"><!– The transport connectors ActiveMQ will listen to –><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61617" /></transportConnectors> </broker> </beans>brokerA配置(brokerA.xml):<beans xmlns="" xmlns:amq="" xmlns:broker="" xmlns:xsi="" xsi:schemaLocation=" http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/activemq-core.xsd http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!– Allows us to use system properties as variables in this configuration file –> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/><broker xmlns="" brokerName="BrokerA" dataDirectory="${activemq.base}/data"><!– The transport connectors ActiveMQ will listen to –><transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616" /></transportConnectors><!– 定义一个network连接器,连接到其他broker –><networkConnectors><networkConnector uri="static:(tcp://localhost:61617)" /></networkConnectors> </broker> </beans>

消息生产者(Publisher.java):/** * XXX.com Inc. * Copyright (c) 2004-2015 All Rights Reserved. */package com.test.SpringTest.activemqinaction.ch4;import java.util.Hashtable;import java.util.Map;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQMapMessage;/** * 消息产生者 * * @author jiangnan * @version $Id: Publisher.java, v 0.1 2015年7月4日 下午4:48:48 jiangnan Exp $ */public class Publisher {protected intMAX_DELTA_PERCENT = 1;protected Map<String, Double>LAST_PRICES= new Hashtable<String, Double>();protected static intcount= 10;protected static inttotal;protected static StringbrokerURL= "tcp://localhost:61616";protected static transient ConnectionFactory factory;protected transient Connectionconnection;protected transient Sessionsession;protected transient MessageProducerproducer;public Publisher() throws JMSException {factory = new ActiveMQConnectionFactory(brokerURL);connection = factory.createConnection();connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);producer = session.createProducer(null);}public void close() throws JMSException {if (connection != null) {connection.close();}}public static void main(String[] args) throws JMSException {String[] topics = { "topic1", "topic2" };Publisher publisher = new Publisher();while (total < 1000) {for (int i = 0; i < count; i++) {publisher.sendMessage(topics);}total += count;System.out.println("Published '" + count + "' of '" + total + "' price messages");try {Thread.sleep(1000);} catch (InterruptedException x) {}}publisher.close();}protected void sendMessage(String[] stocks) throws JMSException {int idx = 0;while (true) {idx = (int) Math.round(stocks.length * Math.random());if (idx < stocks.length) {break;}}String stock = stocks[idx];Destination destination = session.createTopic("STOCKS." + stock);Message message = createStockMessage(stock, session);System.out.println("Sending: " + ((ActiveMQMapMessage) message).getContentMap()+ " on destination: " + destination);producer.send(destination, message);}protected Message createStockMessage(String stock, Session session) throws JMSException {Double value = LAST_PRICES.get(stock);if (value == null) {value = new Double(Math.random() * 100);}// lets mutate the value by some percentagedouble oldPrice = value.doubleValue();value = new Double(mutatePrice(oldPrice));LAST_PRICES.put(stock, value);double price = value.doubleValue();double offer = price * 1.001;boolean up = (price > oldPrice);MapMessage message = session.createMapMessage();message.setStringProperty("stock", stock);//设置消息的属性message.setString("stock", stock);message.setDouble("price", price);message.setDouble("offer", offer);message.setBoolean("up", up);return message;}protected double mutatePrice(double price) {double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) – MAX_DELTA_PERCENT;return price * (100 + percentChange) / 100;}}而消极的人则在每个机会都看到某种忧患。

Network connectors

相关文章:

你感兴趣的文章:

标签云: