Spring 集成中的事务支持概述

本章介绍 Spring 集成对事务的支持。 它涵盖以下主题:

了解消息流中的事务交易边界事务同步伪交易了解消息流中的事务

Spring 集成公开了几个钩子来解决消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的六种机制,并了解如何在每种机制中满足这些流的事务需求。

以下六种机制启动消息流(本手册中提供了每种机制的详细信息):

网关代理:基本邮件网关。消息通道:与方法的直接交互(例如,)。MessageChannelchannel.send(message)消息发布者:在 Spring bean 上启动消息流作为方法调用的副产品的方法。入站通道适配器和网关:基于将第三方系统与 Spring 集成消息传递系统连接来启动消息流的方法(例如,)。[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel调度程序:基于由预配置的调度程序分发的调度事件来启动消息流的方法。轮询器:与调度程序类似,这是基于调度或预配置轮询器分发的基于间隔的事件启动消息流的方法。

我们可以将这六种机制分为两大类:

由用户进程启动的消息流:此类别中的示例场景是调用网关方法或显式向 . 换句话说,这些消息流依赖于要启动的第三方进程(例如您编写的某些代码)。MessageMessageChannel由守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询消息启动新消息流,或者调度程序通过创建新消息并在预定义时间启动消息流来调度进程。

显然,网关代理和所有代理都属于第一类,入站适配器和网关、调度程序和轮询器属于第二类。??MessageChannel.send(…)????MessagePublisher??

那么,您如何解决每个类别中各种场景中的事务需求,并且 Spring 集成是否需要为特定场景提供有关事务的明确内容? 或者你可以使用Spring的交易支持吗?

Spring 本身为事务管理提供了一流的支持。 因此,我们的目标不是提供新的东西,而是使用Spring从其现有的交易支持中受益。 换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开钩子。 但是,由于 Spring 集成配置基于 Spring 配置,我们不需要总是公开这些钩子,因为 Spring 已经公开了它们。 毕竟,每个弹簧集成组件都是一个弹簧豆。

考虑到这个目标,我们可以再次考虑两种情况:由用户进程启动的消息流和由守护程序启动的消息流。

由用户进程启动并在 Spring 应用程序上下文中配置的消息流受制于此类进程的常规事务配置。 因此,Spring 集成不需要显式配置它们来支持事务。 交易可以而且应该通过Spring的标准交易支持来启动。 Spring 集成消息流自然地遵循组件的事务语义,因为它本身是由 Spring 配置的。 例如,可以使用 批注网关或服务激活器方法,也可以在 XML 配置中使用指向应为事务性的特定方法的切入点表达式定义 。 最重要的是,在这些情况下,您可以完全控制事务配置和边界。??@Transactional????TransactionInterceptor??

但是,当涉及到由守护进程启动的消息流时,情况略有不同。 尽管由开发人员配置,但这些流并不直接涉及要启动的人工或其他一些进程。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度程序在每个星期五晚上启动消息流。 我们还可以配置一个触发器,每秒启动一个消息流,依此类推。 因此,我们需要一种方法来让这些基于触发器的进程知道我们打算使生成的消息流成为事务性的,以便每当启动新消息流时都可以创建事务上下文。 换句话说,我们需要公开一些事务配置,但只够委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。

轮询器事务支持

Spring 集成为轮询器提供事务支持。 轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以调用本身是事务的资源,从而在事务边界中包含调用,从而允许在任务失败时回滚。 如果我们要添加对通道的相同支持,则添加的事务将影响从调用开始的所有下游组件。 这为没有任何充分理由的事务划分提供了相当广泛的范围,特别是当Spring已经提供了几种方法来满足下游任何组件的交易需求时。 但是,包含在事务边界中的方法对于轮询者来说是“强有力的理由”。??receive()????receive()????send()????receive()??

每次配置 Poller 时,都可以使用子元素及其属性提供事务配置,如以下示例所示:??transactional??

<int:poller max-messages-per-poll=”1″ fixed-rate=”1000″> <transactional transaction-manager=”txManager” isolation=”DEFAULT” propagation=”REQUIRED” read-only=”true” timeout=”1000″/></poller>

前面的配置看起来类似于本机 Spring 事务配置。 您仍然必须提供对事务管理器的引用,并指定事务属性或依赖缺省值(例如,如果未指定“事务管理器”属性,则默认为名为“事务管理器”的 Bean)。 在内部,该过程包装在 Spring 的本机事务中,负责处理事务。 有关如何配置事务管理器、事务管理器的类型(如 JTA、数据源等)以及与事务配置相关的其他详细信息的更多信息,请参阅Spring 框架参考指南。TransactionInterceptor

使用上述配置时,此轮询器启动的所有消息流都是事务性的。 有关轮询器的事务配置的详细信息,请参阅轮询和事务。

除了事务之外,在运行轮询器时,您可能需要解决更多横切问题。 为了帮助解决这个问题,轮询器元素接受一个子元素,该子元素允许您定义要应用于轮询器的自定义建议实例链。 (有关更多详细信息,请参阅可轮询消息源。 在Spring Integration 2.0中,Poller经历了重构工作,现在使用代理机制来解决事务问题以及其他跨领域问题。 从这项工作演变而来的重大变化之一是,我们使 和 元素相互排斥。 这背后的理由是,如果您需要多个建议,其中一个是交易建议,您可以像以前一样方便地将其包含在 中,但有更多的控制权,因为您现在可以选择按所需的顺序定位建议。 以下示例演示如何执行此操作:??<advice-chain>????<transactional>????<advice-chain>????<advice-chain>??

<int:poller max-messages-per-poll=”1″ fixed-rate=”10000″> <advice-chain> <ref bean=”txAdvice”/> <ref bean=”someOtherAdviceBean” /> <beans:bean class=”foo.bar.SampleAdvice”/> </advice-chain></poller><tx:advice id=”txAdvice” transaction-manager=”txManager”> <tx:attributes> <tx:method name=”get*” read-only=”true”/> <tx:method name=”*”/> </tx:attributes></tx:advice>

前面的示例显示了 Spring 事务建议 () 的基于 XML 的基本配置,并将其包含在轮询器定义的配置中。 如果只需要解决轮询器的事务问题,则仍然可以使用该元素作为方便。??txAdvice????<advice-chain>????<transactional>??

交易边界

另一个重要因素是消息流中事务的边界。 启动事务时,事务上下文绑定到当前线程。 因此,无论您的消息流中有多少终端节点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦通过引入可轮询通道或执行器通道或在某些服务中手动启动新线程来打破它,事务边界也会被打破。 从本质上讲,事务将在那里结束,如果线程之间发生了成功的交接,则流将被视为成功,并且即使流将继续并且仍可能导致下游某处异常,也会发送 COMMIT 信号。 如果此类流是同步的,则该异常可以抛回消息流的发起方,该发起方也是事务上下文的发起方,并且该事务将导致回滚。 中间立场是在线程边界被打破的任何点使用事务通道。 例如,您可以使用委派给事务性消息存储策略的队列支持的通道,也可以使用 JMS 支持的通道。

事务同步

在某些环境中,它有助于将操作与包含整个流的事务同步。 例如,考虑在执行大量数据库更新的流开始时执行。 如果事务提交,我们可能希望将文件移动到目录,而如果事务回滚,我们可能希望将其移动到目录。??<file:inbound-channel-adapter/>????success????failure??

Spring Integration 2.2引入了将这些操作与事务同步的功能。 此外,如果您没有“真实”事务但仍想在成功或失败时执行不同的操作,则可以配置 。 有关更多信息,请参阅伪事务。??PseudoTransactionManager??

以下清单显示了此功能的关键策略接口:

public interface TransactionSynchronizationFactory { TransactionSynchronization create(Object key);}public interface TransactionSynchronizationProcessor { void processBeforeCommit(IntegrationResourceHolder holder); void processAfterCommit(IntegrationResourceHolder holder); void processAfterRollback(IntegrationResourceHolder holder);}

工厂负责创建事务同步对象。 您可以实现自己的框架或使用框架提供的框架:。 此实现返回委托给默认实现 : 。 此处理器支持三个 SpEL 表达式:、 和 。??DefaultTransactionSynchronizationFactory????TransactionSynchronization????TransactionSynchronizationProcessor????ExpressionEvaluatingTransactionSynchronizationProcessor????beforeCommitExpression????afterCommitExpression????afterRollbackExpression??

对于熟悉交易的人来说,这些操作应该是不言自明的。 在每种情况下,变量都是原始的。 在某些情况下,其他 SpEL 变量可用,具体取决于轮询器轮询的变量。 例如,提供了引用消息源的变量。 同样,提供变量,该变量引用由轮询创建的变量。??#root????Message????MessageSource????MongoDbMessageSource????#mongoTemplate????MongoTemplate????RedisStoreMessageSource????#store????RedisStore??

若要为特定轮询器启用该功能,可以使用该属性提供对轮询器元素的引用。??TransactionSynchronizationFactory????<transactional/>????synchronization-factory??

从版本 5.0 开始,Spring 集成提供了 ,当配置了 no 但建议链中存在类型的建议时,默认情况下会将其应用于轮询端点。 使用任何现成的实现时,轮询终结点将轮询的消息绑定到当前事务上下文,并在事务建议后引发异常时将其作为 in 提供。 使用未实现的自定义事务建议时,可以显式配置 来实现此行为。 无论哪种情况,都会成为发送到 的有效负载,原因是建议引发的原始异常。 以前,有效负载是建议引发的原始异常,并且不提供对信息的引用,因此很难确定事务提交问题的原因。??PassThroughTransactionSynchronizationFactory????TransactionSynchronizationFactory????TransactionInterceptor????TransactionSynchronizationFactory????failedMessage????MessagingException????TransactionInterceptor????PassThroughTransactionSynchronizationFactory????MessagingException????ErrorMessage????errorChannel????ErrorMessage????failedMessage??

为了简化这些组件的配置,Spring 集成为默认工厂提供了命名空间支持。 下面的示例演示如何使用命名空间配置文件入站通道适配器:

<int-file:inbound-channel-adapter id=”inputDirPoller” channel=”someChannel” directory=”/foo/bar” filter=”filter” comparator=”testComparator”> <int:poller fixed-rate=”5000″> <int:transactional transaction-manager=”transactionManager” synchronization-factory=”syncFactory” /> </int:poller></int-file:inbound-channel-adapter><int:transaction-synchronization-factory id=”syncFactory”> <int:after-commit expression=”payload.renameTo(new java.io.File(‘/success/’ + payload.name))” channel=”committedChannel” /> <int:after-rollback expression=”payload.renameTo(new java.io.File(‘/failed/’ + payload.name))” channel=”rolledBackChannel” /></int:transaction-synchronization-factory>

SpEL 评估的结果作为有效负载发送到 or (在本例中,这将是 or — 方法调用的结果)。??committedChannel????rolledBackChannel????Boolean.TRUE????Boolean.FALSE????java.io.File.renameTo()??

如果您希望发送整个有效负载以进行进一步的 Spring 集成处理,请使用“有效负载”表达式。

请务必了解这会将操作与事务同步。 它不会使本质上不是事务性的资源实际上成为事务性资源。 相反,事务(无论是 JDBC 还是其他)在轮询之前启动,并在流完成时提交或回滚,然后是同步操作。

如果提供自定义 ,则它负责创建资源同步,使绑定的资源在事务完成时自动解除绑定。 缺省情况下,返回一个子类 ,缺省返回 。??TransactionSynchronizationFactory????TransactionSynchronizationFactory????ResourceHolderSynchronization????shouldUnbindAtCompletion()????true??

除了 和 表达式之外,还支持。 在这种情况下,如果评估(或下游处理)引发异常,事务将回滚而不是提交。??after-commit????after-rollback????before-commit??

伪交易

阅读事务同步部分后,您可能会认为在流完成时执行这些“成功”或“失败”操作会很有用,即使轮询器下游没有“真正的”事务资源(如 JDBC)。 例如,考虑一个“<file:inbound-channel-adapter/>”,后跟一个“<ftp:outbout-channel-adapter/>”。 这些组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。

为了提供此功能,框架提供了一个 ,即使不涉及实际事务资源,也能启用上述配置。 如果流正常完成,则调用 和 同步。 失败时,将调用同步。 因为它不是真正的事务,所以不会发生实际的提交或回滚。 伪事务是用于启用同步功能的工具。??PseudoTransactionManager????beforeCommit????afterCommit????afterRollback??

要使用 ,可以将其定义为 <bean/>,就像配置实际事务管理器一样。 以下示例演示如何执行此操作:??PseudoTransactionManager??

<bean id=”transactionManager” class=”o.s.i.transaction.PseudoTransactionManager” />反应式事务

从版本 5.3 开始,a 还可以与返回反应式类型的终结点的建议一起使用。 这包括和实现(例如),它们生成带有 或 有效负载的消息。 所有其他回复生成消息处理程序实现都可以依赖于 当它们的回复有效负载也是某种反应式类型时。??ReactiveTransactionManager????TransactionInterceptor????MessageSource????ReactiveMessageHandler????ReactiveMongoDbMessageSource????Flux????Mono????ReactiveTransactionManager??

生活的最大悲剧不是失败,而是一个人已经习惯于失败。

Spring 集成中的事务支持概述

相关文章:

你感兴趣的文章:

标签云: