再谈消息总线客户端的多线程实现

上次我谈了最近在写的一个基于RabbitMQ的消息总线的客户端在面对并发问题时的一些思考以及最终的实现方案。那是一种简单并且不容易产生并发问题的方案,如果你看过那篇文章,我曾在最终的实现方案之后给出了其利弊分析。

核心的问题是Client建立的跟RabbitMQ Server的connection是共享还是独占。对于这个问题可以举一个通俗一点的例子:如果你想要租间房子,每个人会有不同的想法。比如有人喜欢简单、安静的生活并且在意个人隐私,那么这个时候你最好的选择就是去租个单室套:里面什么都有,并且是独享的,,它的缺点是造成了资源浪费(比如你一个人需要占用洗衣机,电冰箱等其实你也没将它们高效得使用),因此你必须为这些资源的独享付出比较昂贵的代价;如果你不想那么浪费资源,有些东西跟别人共享一下也无所谓,并且你希望有较高的性价比,那么这种情况下,最好的选择其实是——合租。这也是自上次那篇文章之后,我对于自我实现的颠覆。这篇文章我将来探讨新的实现方式。

和上次那篇文章中声明的那样,它不仅仅是针对这个消息总线的客户端,它是一种对其他通用技术组件都可参考的解决方案。

问题分析

首先来谈谈上次纠结的问题是什么?是connection的生命周期受限于Client主对象(其创建跟销毁都依赖Client主对象),而connection如果可以被同一个JVM进程内的多个client对象共享,那么它生命周期的控制权的归属问题。

想在一个JVM进程内共享一个RabbitMQ connection,我只能选择将其实现为一个singleton。对于这种实现方式,connection的生命周期是如何管理的?是通过Client主对象的open/close这一对方法操作的。第一个对client的open方法的调用会触发对connection的实例化,之后这个connection就一直被open,当调用close的时候,connection也一同被关闭(不知道有没有人会问,这里为什么要在调用close方法的时候关闭connection,这是因为在实现的时候肯定是先面向非多线程的实现,如果是单线程,client对象完成任务了,肯定要调用close方法,而此时也就必须要关闭connection,否则就没有调用的时机了)。但这里却牵扯到close方法调用权以及时机的问题,因为在多线程环境下每个Channel都创建于被共享的connection,如果某个client关闭了该connection那么其他正在使用中的channel将全部抛出异常。这也是,为什么在上次那篇文章中,我的思路转向每个Client独占connection的实现方式。

不过我还是希望找到一种解决方案来实现共享connection的模型。因为RabbitMQ 的channel这种多路复用的设计,已经为我们在多线程上复用一个connection提供了基础,如果每个Client独占connection,无疑是打破了最佳实践(redis的client jedis实现connection pool来独占connection是因为redis在通信时没有channel的概念)。

解决思路

上面那个问题最根本的症结在于connection对象的生命周期管理问题。connection对象是宿主在Client内部的。如果我们让它的创建、初始化与销毁只依赖对象池,不就没有这个问题了吗?与此同时,其实还有两个关键对象:pubsuberManager(用于获取一个配置中心的配置)、configManager(用于订阅配置中心的数据变更以及对配置进行解析),它们也可以一同被提出来在同一个池中共享同一实例。

技术实现

对象池用的是apache common pool提供的基础设施。对于上面几个共享对象的生命周期管理,这里在Pool中定义了两个方法:

-init:用于初始化connection等上述几个关键对象

-destroy:用于销毁init方法中的关键对象

init:

protected void init() {this.exchangeManager = new ExchangerManager(this.pubsuberHost, this.pubsuberPort);if (!this.exchangeManager.isPubsuberAlive())throw new RuntimeException("can not connect to pubsub server.");this.configManager = new ConfigManager();this.configManager.setExchangeManager(this.exchangeManager);this.poolId = RandomHelper.randomNumberAndCharacter(12);this.exchangeManager.registerWithMultiChannels(poolId, this.configManager, new String[]{Constants.PUBSUB_ROUTER_CHANNEL,Constants.PUBSUB_CONFIG_CHANNEL,Constants.PUBSUB_EVENT_CHANNEL,Constants.PUBSUB_SINK_CHANNEL,Constants.PUBSUB_CHANNEL_CHANNEL,});try {this.configManager.parseRealTimeData();String host = this.configManager.getClientConfigMap().get("messagebus.client.host").getValue();ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(host);this.connection = connectionFactory.newConnection();} catch (IOException e) {throw new RuntimeException(e);}}destroy:

public void destroy() {this.innerPool.destroy();//release resourceif (exchangeManager != null)exchangeManager.removeRegister(this.poolId);if (configManager != null)configManager.destroy();if (this.connection != null && this.connection.isOpen()) {try {this.connection.close();} catch (IOException e) {throw new RuntimeException(e);}}}可以看到在上面destroy方法中,首先destroy真实的对象池之后,在最后才关闭了connection。这样, connection对象的生命周期跟pool的生命周期关联起来,而跟放入pool中的client对象没有关系,它们只需要获得打开后的connection对象来创建用于通信的Channel即可(现在只有Channel跟Client对象是一对一的关系)。

虽然我们已经将Client依赖的几个关键对象放到Pool中来构建,但我们还需要将他们传递给Client对象才行。两种常见的注入依赖对象的方式:构造器注入、setter方法注入在这里都不可行,因为我们不期望外部对象了解Client的细节。因此这里选择了不对外提供公共的注入点,而是将它们定义为client内部的私有实例字段,然后通过反射打开访问权限,注入后再关闭访问权限。

私有实例字段:

//inject by reflectorprivate ExchangerManager exchangeManager;private ConfigManager configManager;private Connectionconnection;反射注入实例的引用:

我想一个人旅行,背上简单的行囊,踏上行程,

再谈消息总线客户端的多线程实现

相关文章:

你感兴趣的文章:

标签云: