Spring Batch -集成在春季批处理和Spring集成项目之间

Spring Batch 的许多用户可能会遇到以下要求: 在春季批次的范围之外,但这可能是有效的和 通过使用 Spring 集成简洁地实现。相反,春天 集成用户可能会遇到Spring Batch需求并需要一种方法 有效地整合这两个框架。在这种情况下,几个 模式和用例出现,春季批量集成 满足这些要求。

Spring Batch 和 Spring Integration 之间的界限并不总是 明确,但有两条建议可以 帮助:考虑粒度并应用常见模式。一些 本节将介绍这些常见模式。

将消息传递添加到批处理可实现自动化 运营以及关键问题的分离和战略制定。 例如,一条消息可能会触发要执行的作业,然后 可以通过多种方式公开发送消息。或者,当 作业完成或失败,该事件可能会触发要发送的消息, 这些消息的使用者可能有操作问题 与应用程序本身无关。消息传递可以 也嵌入到作业中(例如,读取或写入 通过渠道处理)。远程分区和远程分块 提供在多个辅助角色之间分配工作负载的方法。

本节介绍以下关键概念:

命名空间支持通过消息启动批处理作业通过信息性消息提供反馈异步处理器外部化批处理执行命名空间支持

专用的XML命名空间支持已添加到1.3版的Spring Batch Integration中, 旨在提供更简单的配置 经验。若要使用命名空间,请添加以下内容 对 Spring XML 应用程序上下文的命名空间声明 文件:

<beans xmlns=”http://www.springframework.org/schema/beans” xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xmlns:batch-int=”http://www.springframework.org/schema/batch-integration” xsi:schemaLocation=” http://www.springframework.org/schema/batch-integration https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd”> …</beans>

以下示例显示了 Spring 完全配置的 Spring XML 应用程序上下文文件 批量集成:

<beans xmlns=”http://www.springframework.org/schema/beans” xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xmlns:int=”http://www.springframework.org/schema/integration” xmlns:batch=”http://www.springframework.org/schema/batch” xmlns:batch-int=”http://www.springframework.org/schema/batch-integration” xsi:schemaLocation=” http://www.springframework.org/schema/batch-integration https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd”> …</beans>

将版本号追加到引用的 XSD 文件也是 允许。但是,由于无版本声明始终使用 最新架构,我们一般不建议追加版本 编号,以 XSD 名称。添加版本号 更新春季批处理时可能会产生问题 集成依赖项,因为它们可能需要更新的版本 的 XML 架构。

通过消息启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,您 基本上有两种选择:

从命令行中,使用CommandLineJobRunner以编程方式,使用 或JobOperator.start()JobLauncher.run()

例如,您可能希望在调用批处理作业时使用 使用 shell 脚本。或者,您可以直接使用 (例如,当使用 作为 Web 应用程序一部分的 Spring Batch)。但是,呢 更复杂的用例?也许您需要轮询远程 (S)FTP 服务器,用于检索批处理作业或应用程序的数据 必须同时支持多个不同的数据源。为 例如,您不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源。也许输入文件的其他转换是 在调用 Spring 批处理之前需要。??CommandLineJobRunner????JobOperator??

因此,执行批处理作业会更强大 通过使用弹簧集成及其众多适配器。例如 您可以使用文件入站通道适配器来 监视文件系统中的目录并启动批处理作业 输入文件到达后立即到达。此外,您可以创建 Spring 使用多个不同适配器的集成流程,可轻松实现 从多个源引入批处理作业的数据 同时仅使用配置。实现所有这些 Spring 集成的场景很容易,因为它允许 解耦、事件驱动的执行 .??JobLauncher??

Spring 批量集成提供了您可以 用于启动批处理作业。的输入由 Spring 集成消息,其有效负载类型为 。此类是要启动的包装器和围绕 启动批处理作业所必需的。??JobLaunchingMessageHandler????JobLaunchingMessageHandler????JobLaunchRequest????Job????JobParameters??

下图显示了典型的弹簧集成 启动批处理作业所需的消息流。EIP(企业集成模式)网站提供了消息传递图标及其说明的完整概述。

图1.启动批处理作业

将文件转换为 JobLaunchRequest

以下示例将文件转换为:??JobLaunchRequest??

package io.spring.sbi;public class FileMessageToJobRequest { private Job job; private String fileParameterName; public void setFileParameterName(String fileParameterName) { this.fileParameterName = fileParameterName; } public void setJob(Job job) { this.job = job; } @Transformer public JobLaunchRequest toRequest(Message<File> message) { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath()); return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters()); }}作业执行响应

执行批处理作业时,将返回一个实例。你可以使用这个 实例以确定执行的状态。如果 a 可以创建 成功,它始终返回,无论是否 或者实际执行不成功。??JobExecution????JobExecution??

如何返回实例的确切行为取决于提供的 。如果使用(单线程)实现,则仅返回作业完成的响应。使用 时,将返回实例 马上。然后,您可以获取实例 (使用 ) 并查询作业的更新状态 使用 .欲了解更多信息 信息,请参阅查询存储库。??JobExecution????TaskExecutor????synchronous????TaskExecutor????JobExecution????after????asynchronous????TaskExecutor????JobExecution????id????JobExecution????JobExecution.getJobId()????JobRepository????JobExplorer??

弹簧批量集成配置

考虑一个案例,有人需要创建一个文件来侦听 对于提供的目录中的 CSV 文件,请将它们交给转换器 (),通过作业启动网关启动作业,以及 记录 的输出。??inbound-channel-adapter????FileMessageToJobRequest????JobExecution????logging-channel-adapter??

下面的示例演示如何在 XML 中配置该常见情况: .XML配置

<int:channel id=”inboundFileChannel”/><int:channel id=”outboundJobRequestChannel”/><int:channel id=”jobLaunchReplyChannel”/><int-file:inbound-channel-adapter id=”filePoller” channel=”inboundFileChannel” directory=”file:/tmp/myfiles/” filename-pattern=”*.csv”> <int:poller fixed-rate=”1000″/></int-file:inbound-channel-adapter><int:transformer input-channel=”inboundFileChannel” output-channel=”outboundJobRequestChannel”> <bean class=”io.spring.sbi.FileMessageToJobRequest”> <property name=”job” ref=”personJob”/> <property name=”fileParameterName” value=”input.file.name”/> </bean></int:transformer><batch-int:job-launching-gateway request-channel=”outboundJobRequestChannel” reply-channel=”jobLaunchReplyChannel”/><int:logging-channel-adapter channel=”jobLaunchReplyChannel”/>

以下示例显示了如何在 Java 中配置该常见情况:

爪哇配置

@Beanpublic FileMessageToJobRequest fileMessageToJobRequest() { FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest(); fileMessageToJobRequest.setFileParameterName(“input.file.name”); fileMessageToJobRequest.setJob(personJob()); return fileMessageToJobRequest;}@Beanpublic JobLaunchingGateway jobLaunchingGateway() { TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(new SyncTaskExecutor()); JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher); return jobLaunchingGateway;}@Beanpublic IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) { return IntegrationFlow.from(Files.inboundAdapter(new File(“/tmp/myfiles”)). filter(new SimplePatternFileListFilter(“*.csv”)), c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))). transform(fileMessageToJobRequest()). handle(jobLaunchingGateway). log(LoggingHandler.Level.WARN, “headers.id + ‘: ‘ + payload”). get();}示例条目读取器配置

现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring 批处理(例如)以使用在作业定义的位置找到的文件 名为“input.file.name”的参数,如以下 Bean 配置所示:??ItemReader??

以下 XML 示例显示了必要的 Bean 配置:

XML 配置

<bean id=”itemReader” class=”org.springframework.batch.item.file.FlatFileItemReader” scope=”step”> <property name=”resource” value=”file://#{jobParameters[‘input.file.name’]}”/> …</bean>

以下 Java 示例显示了必要的 Bean 配置:

爪哇配置

@Bean@StepScopepublic ItemReader sampleReader(@Value(“#{jobParameters[input.file.name]}”) String resource) {… FlatFileItemReader flatFileItemReader = new FlatFileItemReader(); flatFileItemReader.setResource(new FileSystemResource(resource));… return flatFileItemReader;}

前面示例中的主要关注点是注入 的值作为 Resource 属性值并设置 Bean 具有步骤范围。将 Bean 设置为具有步骤范围可利用 后期绑定支持,允许访问变量。??#{jobParameters[‘input.file.name’]}????ItemReader????jobParameters??

作业启动网关的可用属性

作业启动网关具有以下属性,您可以设置这些属性来控制作业:

?id??:标识基础 Spring Bean 定义,该定义是以下任一实例:??EventDrivenConsumer????PollingConsumer??(确切的实现取决于组件的输入通道是 a 还是 。SubscribableChannelPollableChannel??auto-startup??:布尔标志,指示终结点应在 上自动启动 启动。缺省值为 。true??request-channel??:此端点的输入。MessageChannel??reply-channel??:将生成的有效负载发送到该有效负载。MessageChannelJobExecution??reply-timeout??:用于指定此网关等待回复消息的时间(以毫秒为单位) 在抛出之前成功发送到回复通道 异常。仅当通道时,此属性才适用 可能会阻止(例如,使用有界队列通道时 目前已满)。另外,请记住,当发送到 时,会发生调用 在发件人的线程中。因此,发送失败 操作可能由下游的其他组件引起。 该属性映射到基础实例的属性。如果未指定,则属性 默认值为 -1, 这意味着,默认情况下,无限期地等待。DirectChannelreply-timeoutsendTimeoutMessagingTemplateGateway??job-launcher??:自选。接受 自定义 Bean 引用。 如果未指定,则适配器 重用在 下注册的实例。如果没有默认实例 存在,引发异常。JobLauncheridjobLauncher??order??:指定此端点作为订阅者连接时的调用顺序 到 .SubscribableChannel子元素

当它接收来自 的消息时,您必须提供 全局默认值或为 .??Gateway????PollableChannel????Poller????Poller????Job Launching Gateway??

下面的示例演示如何在 XML 中提供轮询器:

XML 配置

<batch-int:job-launching-gateway request-channel=”queueChannel” reply-channel=”replyChannel” job-launcher=”jobLauncher”> <int:poller fixed-rate=”1000″></batch-int:job-launching-gateway>

以下示例演示如何在 Java 中提供轮询器:

爪哇配置

@Bean@ServiceActivator(inputChannel = “queueChannel”, poller = @Poller(fixedRate=”1000″))public JobLaunchingGateway sampleJobLaunchingGateway() { JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher()); jobLaunchingGateway.setOutputChannel(replyChannel()); return jobLaunchingGateway;}Providing Feedback with Informational Messages

As Spring Batch jobs can run for long times, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a batch job have failed. Spring Batch provides support for this information being gathered through:

主动轮询事件驱动的侦听器

异步启动 Spring 批处理作业时(例如,通过使用作业启动 网关),返回一个实例。因此,您可以使用 通过使用 从 检索更新的实例来持续轮询状态更新。然而,这是 被认为是次优的,首选事件驱动的方法。??JobExecution????JobExecution.getJobId()????JobExecution????JobRepository????JobExplorer??

因此,Spring Batch提供了包括三种最常用的监听器 听众:

??StepListener????ChunkListener????JobExecutionListener??

在下图所示的示例中,Spring 批处理作业已配置了 .因此,Spring Integration接收并处理之前的任何步骤 或事件发生后。例如,您可以使用 .根据该检查的结果,可能会发生各种事情(例如 将邮件路由到邮件出站通道适配器),以便电子邮件通知可以 根据某些条件发送出去。??StepExecutionListener????StepExecution????Router??

图2.处理信息性消息

以下由两部分组成的示例演示如何将侦听器配置为发送 向 发送消息以获取事件,并将其输出记录到 .??Gateway????StepExecution????logging-channel-adapter??

首先,创建通知集成 Bean。

以下示例显示了如何在 XML 中创建通知集成 Bean:

XML 配置

<int:channel id=”stepExecutionsChannel”/><int:gateway id=”notificationExecutionsListener” service-interface=”org.springframework.batch.core.StepExecutionListener” default-request-channel=”stepExecutionsChannel”/><int:logging-channel-adapter channel=”stepExecutionsChannel”/>

以下示例显示了如何在 Java 中创建通知集成 Bean:

爪哇配置

@Bean@ServiceActivator(inputChannel = “stepExecutionsChannel”)public LoggingHandler loggingHandler() { LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN); adapter.setLoggerName(“TEST_LOGGER”); adapter.setLogExpressionString(“headers.id + ‘: ‘ + payload”); return adapter;}@MessagingGateway(name = “notificationExecutionsListener”, defaultRequestChannel = “stepExecutionsChannel”)public interface NotificationExecutionListener extends StepExecutionListener {}

您需要将注释添加到配置中。??@IntegrationComponentScan??

其次,修改作业以添加步骤级侦听器。

下面的示例演示如何在 XML 中添加步骤级侦听器:

XML 配置

<job id=”importPayments”> <step id=”step1″> <tasklet ../> <chunk ../> <listeners> <listener ref=”notificationExecutionsListener”/> </listeners> </tasklet> … </step></job>

以下示例演示如何在 Java 中添加步骤级侦听器:

爪哇配置

public Job importPaymentsJob(JobRepository jobRepository) { return new JobBuilder(“importPayments”, jobRepository) .start(stepBuilderFactory.get(“step1”) .chunk(200) .listener(notificationExecutionsListener()) … )}异步处理器

异步处理器可帮助您扩展项目的处理。在异步中 处理器用例,充当调度程序,执行逻辑 对于新线程上的项目。项目完成后,是 传递给要写的。??AsyncItemProcessor????ItemProcessor????Future????AsynchItemWriter??

因此,您可以使用异步项处理来提高性能,基本上 允许您实现分叉联接方案。收集结果和 一旦所有结果都可用,就写回块。??AsyncItemWriter??

下面的示例演示如何在 XML 中配置 :??AsyncItemProcessor??

XML 配置

<bean id=”processor” class=”org.springframework.batch.integration.async.AsyncItemProcessor”> <property name=”delegate”> <bean class=”your.ItemProcessor”/> </property> <property name=”taskExecutor”> <bean class=”org.springframework.core.task.SimpleAsyncTaskExecutor”/> </property></bean>

下面的示例演示如何在 XML 中配置 :??AsyncItemProcessor??

爪哇配置

@Beanpublic AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) { AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor(); asyncItemProcessor.setTaskExecutor(taskExecutor); asyncItemProcessor.setDelegate(itemProcessor); return asyncItemProcessor;}

属性是指您的 bean,属性是指您选择的 bean。??delegate????ItemProcessor????taskExecutor????TaskExecutor??

以下示例演示如何在 XML 中配置 :??AsyncItemWriter??

XML 配置

<bean id=”itemWriter” class=”org.springframework.batch.integration.async.AsyncItemWriter”> <property name=”delegate”> <bean id=”itemWriter” class=”your.ItemWriter”/> </property></bean>

以下示例显示了如何在 Java 中配置 :??AsyncItemWriter??

爪哇配置

@Beanpublic AsyncItemWriter writer(ItemWriter itemWriter) { AsyncItemWriter asyncItemWriter = new AsyncItemWriter(); asyncItemWriter.setDelegate(itemWriter); return asyncItemWriter;}

再次,该物业是 实际上是对你的豆子的引用。??delegate????ItemWriter??

外部化批处理执行

到目前为止讨论的集成方法建议用例 其中,Spring Integration将Spring Batch包裹起来,就像外壳一样。 但是,Spring Batch 也可以在内部使用 Spring Integration。 通过使用这种方法,Spring Batch 用户可以委派 将项目甚至块处理到外部进程。这 让您卸载复杂的处理。春季批量集成 为以下方面提供专门支持:

远程分块远程分区远程分块

下图显示了使用 Spring 批处理时远程分块的一种工作方式 与弹簧集成一起:

图3.远程分块

更进一步,您还可以将 通过使用(由Spring Batch Integration提供)进行块处理,该块将项目发送出去 并收集结果。发送后,春季批次继续 读取和分组项目的过程,无需等待结果。 相反,收集结果并将其重新集成到春季批处理过程中是其责任。??ChunkMessageChannelItemWriter????ChunkMessageChannelItemWriter??

通过弹簧集成,您可以完全 控制流程的并发性(例如,通过 使用 a 而不是 )。此外,依靠 Spring 集成丰富的通道适配器集合(例如 JMS 和 AMQP),您可以将批处理作业的块分发到 用于处理的外部系统。??QueueChannel????DirectChannel??

具有要远程分块的步骤的作业可能具有类似于 在 XML 中执行以下操作:

XML 配置

<job id=”personJob”> <step id=”step1″> <tasklet> <chunk reader=”itemReader” writer=”itemWriter” commit-interval=”200″/> </tasklet> … </step></job>

具有要远程分块的步骤的作业可能具有类似于 在爪哇中以下:

爪哇配置

public Job chunkJob(JobRepository jobRepository) { return new JobBuilder(“personJob”, jobRepository) .start(stepBuilderFactory.get(“step1”) .<Person, Person>chunk(200) .reader(itemReader()) .writer(itemWriter()) .build()) .build(); }

引用指向要用于读取 经理。如前所述,引用指向一个特殊(称为)。处理器(如果有)处于关闭状态 管理器配置,因为它是在辅助角色上配置的。您应该检查任何 实现时的其他组件属性,例如限制等 您的用例。??ItemReader????ItemWriter????ItemWriter????ChunkMessageChannelItemWriter??

以下 XML 配置提供了基本的管理器设置:

XML 配置

<bean id=”connectionFactory” class=”org.apache.activemq.ActiveMQConnectionFactory”> <property name=”brokerURL” value=”tcp://localhost:61616″/></bean><int-jms:outbound-channel-adapter id=”jmsRequests” destination-name=”requests”/><bean id=”messagingTemplate” class=”org.springframework.integration.core.MessagingTemplate”> <property name=”defaultChannel” ref=”requests”/> <property name=”receiveTimeout” value=”2000″/></bean><bean id=”itemWriter” class=”org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter” scope=”step”> <property name=”messagingOperations” ref=”messagingTemplate”/> <property name=”replyChannel” ref=”replies”/></bean><int:channel id=”replies”> <int:queue/></int:channel><int-jms:message-driven-channel-adapter id=”jmsReplies” destination-name=”replies” channel=”replies”/>

以下 Java 配置提供了基本的管理器设置:

爪哇配置

@Beanpublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL(“tcp://localhost:61616”); return factory;}/* * Configure outbound flow (requests going to workers) */@Beanpublic DirectChannel requests() { return new DirectChannel();}@Beanpublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(requests()) .handle(Jms.outboundAdapter(connectionFactory).destination(“requests”)) .get();}/* * Configure inbound flow (replies coming from workers) */@Beanpublic QueueChannel replies() { return new QueueChannel();}@Beanpublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(“replies”)) .channel(replies()) .get();}/* * Configure the ChunkMessageChannelItemWriter */@Beanpublic ItemWriter<Integer> itemWriter() { MessagingTemplate messagingTemplate = new MessagingTemplate(); messagingTemplate.setDefaultChannel(requests()); messagingTemplate.setReceiveTimeout(2000); ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); chunkMessageChannelItemWriter.setReplyChannel(replies()); return chunkMessageChannelItemWriter;}

前面的配置为我们提供了许多 bean。我们 通过使用 ActiveMQ 和 Spring 集成提供的入站和出站 JMS 适配器。如 显示,我们的豆子,它是 由我们的作业步骤引用,使用 在 配置的中间件。??itemWriter????ChunkMessageChannelItemWriter??

现在我们可以继续讨论工作线程配置,如以下示例所示:

以下示例显示了 XML 中的工作器配置:

XML 配置

<bean id=”connectionFactory” class=”org.apache.activemq.ActiveMQConnectionFactory”> <property name=”brokerURL” value=”tcp://localhost:61616″/></bean><int:channel id=”requests”/><int:channel id=”replies”/><int-jms:message-driven-channel-adapter id=”incomingRequests” destination-name=”requests” channel=”requests”/><int-jms:outbound-channel-adapter id=”outgoingReplies” destination-name=”replies” channel=”replies”></int-jms:outbound-channel-adapter><int:service-activator id=”serviceActivator” input-channel=”requests” output-channel=”replies” ref=”chunkProcessorChunkHandler” method=”handleChunk”/><bean id=”chunkProcessorChunkHandler” class=”org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler”> <property name=”chunkProcessor”> <bean class=”org.springframework.batch.core.step.item.SimpleChunkProcessor”> <property name=”itemWriter”> <bean class=”io.spring.sbi.PersonItemWriter”/> </property> <property name=”itemProcessor”> <bean class=”io.spring.sbi.PersonItemProcessor”/> </property> </bean> </property></bean>

以下示例显示了 Java 中的工作线程配置:

爪哇配置

@Beanpublic org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL(“tcp://localhost:61616”); return factory;}/* * Configure inbound flow (requests coming from the manager) */@Beanpublic DirectChannel requests() { return new DirectChannel();}@Beanpublic IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(“requests”)) .channel(requests()) .get();}/* * Configure outbound flow (replies going to the manager) */@Beanpublic DirectChannel replies() { return new DirectChannel();}@Beanpublic IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(replies()) .handle(Jms.outboundAdapter(connectionFactory).destination(“replies”)) .get();}/* * Configure the ChunkProcessorChunkHandler */@Bean@ServiceActivator(inputChannel = “requests”, outputChannel = “replies”)public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() { ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); return chunkProcessorChunkHandler;}

这些配置项中的大多数应该看起来很熟悉 管理器配置。工作人员不需要访问 春季批次或 到实际的作业配置文件。感兴趣的主要豆子 是 .的性质取 配置 ,您可以在其中提供对将在工作线程上运行的(以及可选的)的引用 当它从管理器接收块时。??JobRepository????chunkProcessorChunkHandler????chunkProcessor????ChunkProcessorChunkHandler????SimpleChunkProcessor????ItemWriter????ItemProcessor??

有关详细信息,请参阅有关远程分块的“可伸缩性”一章的部分。

从版本 4.1 开始,Spring 批量集成引入了可用于简化远程分块设置的注释。此注释提供 可以在应用程序上下文中自动连线的两个 Bean:??@EnableBatchIntegration??

??RemoteChunkingManagerStepBuilderFactory??:配置管理器步骤??RemoteChunkingWorkerBuilder??:配置远程工作线程集成流

这些 API 负责配置许多组件,如下图所示:

图4.远程分块配置

在经理方面,让您 通过声明以下内容来配置管理器步骤:??RemoteChunkingManagerStepBuilderFactory??

用于读取项目并将其发送给工作人员的项目读取器向工作人员发送请求的输出通道(“传出请求”)用于接收工作人员回复的输入通道(“传入回复”)

无需显式配置 和 . (如果找到这样做的原因,您仍然可以显式配置它们)。??ChunkMessageChannelItemWriter????MessagingTemplate??

在工作线程端,允许您将工作线程配置为:??RemoteChunkingWorkerBuilder??

侦听管理器在输入通道上发送的请求(“传入请求”)为每个请求调用 的方法 使用配置和handleChunkChunkProcessorChunkHandlerItemProcessorItemWriter在输出通道上向经理发送回复(“传出回复”)

无需显式配置 和 。(如果您找到,您仍然可以显式配置它们 这样做的理由)。??SimpleChunkProcessor????ChunkProcessorChunkHandler??

以下示例演示如何使用这些 API:

@EnableBatchIntegration@EnableBatchProcessingpublic class RemoteChunkingJobConfiguration { @Configuration public static class ManagerConfiguration { @Autowired private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory; @Bean public TaskletStep managerStep() { return this.managerStepBuilderFactory.get(“managerStep”) .chunk(100) .reader(itemReader()) .outputChannel(requests()) // requests sent to workers .inputChannel(replies()) // replies received from workers .build(); } // Middleware beans setup omitted } @Configuration public static class WorkerConfiguration { @Autowired private RemoteChunkingWorkerBuilder workerBuilder; @Bean public IntegrationFlow workerFlow() { return this.workerBuilder .itemProcessor(itemProcessor()) .itemWriter(itemWriter()) .inputChannel(requests()) // requests received from the manager .outputChannel(replies()) // replies sent to the manager .build(); } // Middleware beans setup omitted }}

您可以??在此处??找到远程分块作业的完整示例。

远程分区

下图显示了典型的远程分区情况:

图5.远程分区

另一方面,远程分区在以下情况下很有用 不是项目的处理,而是关联的 I/O 导致瓶颈。通过远程分区,您可以发送工作 给执行完整春季批处理的工人 步骤。因此,每个工作线程都有自己的 、 和 。为此,春季批次 集成提供了 .??ItemReader????ItemProcessor????ItemWriter????MessageChannelPartitionHandler??

此接口实现使用实例来 向远程工作人员发送说明并接收他们的回复。 这从传输(例如 JMS)中提供了一个很好的抽象 和 AMQP),用于与远程工作人员进行通信。??PartitionHandler????MessageChannel??

“可伸缩性”一章中涉及远程分区的部分概述了这些概念和 配置远程分区所需的组件,并显示 使用默认值分区的示例 在单独的本地执行线程中。用于远程分区 对于多个 JVM,需要两个附加组件:??TaskExecutorPartitionHandler??

远程交换矩阵或网格环境支持所需 远程结构或网格环境PartitionHandler

与远程分块类似,您可以使用 JMS 作为“远程结构”。在这种情况下,请使用 一个实例作为实现, 如前所述。??MessageChannelPartitionHandler????PartitionHandler??

以下示例假定现有分区作业,并重点介绍 XML 中的 和 JMS 配置:??MessageChannelPartitionHandler??

XML 配置

<bean id=”partitionHandler” class=”org.springframework.batch.integration.partition.MessageChannelPartitionHandler”> <property name=”stepName” value=”step1″/> <property name=”gridSize” value=”3″/> <property name=”replyChannel” ref=”outbound-replies”/> <property name=”messagingOperations”> <bean class=”org.springframework.integration.core.MessagingTemplate”> <property name=”defaultChannel” ref=”outbound-requests”/> <property name=”receiveTimeout” value=”100000″/> </bean> </property></bean><int:channel id=”outbound-requests”/><int-jms:outbound-channel-adapter destination=”requestsQueue” channel=”outbound-requests”/><int:channel id=”inbound-requests”/><int-jms:message-driven-channel-adapter destination=”requestsQueue” channel=”inbound-requests”/><bean id=”stepExecutionRequestHandler” class=”org.springframework.batch.integration.partition.StepExecutionRequestHandler”> <property name=”jobExplorer” ref=”jobExplorer”/> <property name=”stepLocator” ref=”stepLocator”/></bean><int:service-activator ref=”stepExecutionRequestHandler” input-channel=”inbound-requests” output-channel=”outbound-staging”/><int:channel id=”outbound-staging”/><int-jms:outbound-channel-adapter destination=”stagingQueue” channel=”outbound-staging”/><int:channel id=”inbound-staging”/><int-jms:message-driven-channel-adapter destination=”stagingQueue” channel=”inbound-staging”/><int:aggregator ref=”partitionHandler” input-channel=”inbound-staging” output-channel=”outbound-replies”/><int:channel id=”outbound-replies”> <int:queue/></int:channel><bean id=”stepLocator” class=”org.springframework.batch.integration.partition.BeanFactoryStepLocator” />

以下示例假定现有分区作业,重点介绍 Java 中的 和 JMS 配置:??MessageChannelPartitionHandler??

爪哇配置

/* * Configuration of the manager side */@Beanpublic PartitionHandler partitionHandler() { MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler(); partitionHandler.setStepName(“step1”); partitionHandler.setGridSize(3); partitionHandler.setReplyChannel(outboundReplies()); MessagingTemplate template = new MessagingTemplate(); template.setDefaultChannel(outboundRequests()); template.setReceiveTimeout(100000); partitionHandler.setMessagingOperations(template); return partitionHandler;}@Beanpublic QueueChannel outboundReplies() { return new QueueChannel();}@Beanpublic DirectChannel outboundRequests() { return new DirectChannel();}@Beanpublic IntegrationFlow outboundJmsRequests() { return IntegrationFlow.from(“outboundRequests”) .handle(Jms.outboundGateway(connectionFactory()) .requestDestination(“requestsQueue”)) .get();}@Bean@ServiceActivator(inputChannel = “inboundStaging”)public AggregatorFactoryBean partitioningMessageHandler() throws Exception { AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean(); aggregatorFactoryBean.setProcessorBean(partitionHandler()); aggregatorFactoryBean.setOutputChannel(outboundReplies()); // configure other propeties of the aggregatorFactoryBean return aggregatorFactoryBean;}@Beanpublic DirectChannel inboundStaging() { return new DirectChannel();}@Beanpublic IntegrationFlow inboundJmsStaging() { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory()) .configureListenerContainer(c -> c.subscriptionDurable(false)) .destination(“stagingQueue”)) .channel(inboundStaging()) .get();}/* * Configuration of the worker side */@Beanpublic StepExecutionRequestHandler stepExecutionRequestHandler() { StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler(); stepExecutionRequestHandler.setJobExplorer(jobExplorer); stepExecutionRequestHandler.setStepLocator(stepLocator()); return stepExecutionRequestHandler;}@Bean@ServiceActivator(inputChannel = “inboundRequests”, outputChannel = “outboundStaging”)public StepExecutionRequestHandler serviceActivator() throws Exception { return stepExecutionRequestHandler();}@Beanpublic DirectChannel inboundRequests() { return new DirectChannel();}public IntegrationFlow inboundJmsRequests() { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory()) .configureListenerContainer(c -> c.subscriptionDurable(false)) .destination(“requestsQueue”)) .channel(inboundRequests()) .get();}@Beanpublic DirectChannel outboundStaging() { return new DirectChannel();}@Beanpublic IntegrationFlow outboundJmsStaging() { return IntegrationFlow.from(“outboundStaging”) .handle(Jms.outboundGateway(connectionFactory()) .requestDestination(“stagingQueue”)) .get();}

还必须确保分区属性映射到 Bean。??handler????partitionHandler??

下面的示例将分区属性映射到 .XML:??handler????partitionHandler??

XML 配置

<job id=”personJob”> <step id=”step1.manager”> <partition partitioner=”partitioner” handler=”partitionHandler”/> … </step></job>

下面的示例将分区属性映射到 爪哇岛:??handler????partitionHandler??

爪哇配置

public Job personJob(JobRepository jobRepository) { return new JobBuilder(“personJob”, jobRepository) .start(stepBuilderFactory.get(“step1.manager”) .partitioner(“step1.worker”, partitioner()) .partitionHandler(partitionHandler()) .build()) .build(); }

您可以??在此处??找到远程分区作业的完整示例。

您可以使用注释来简化遥控器 分区设置。此注释提供了两个对远程分区有用的 bean:??@EnableBatchIntegration??

??RemotePartitioningManagerStepBuilderFactory??:配置管理器步骤??RemotePartitioningWorkerStepBuilderFactory??:配置工作步骤

这些 API 负责配置许多组件,如下图所示:

图6.远程分区配置(带作业存储库轮询)

图7.远程分区配置(带回复聚合)

在经理方面,让您 通过声明以下内容来配置管理器步骤:??RemotePartitioningManagerStepBuilderFactory??

用于对数据进行分区Partitioner向其发送请求的输出通道(“传出请求”)接收工作人员回复的输入通道(“传入回复”)(配置回复聚合时)轮询间隔和超时参数(配置作业存储库轮询时)

无需显式配置 和 。 (如果您找到这样做的理由,您仍然可以显式配置它们)。??MessageChannelPartitionHandler????MessagingTemplate??

在工作线程端,允许您将工作线程配置为:??RemotePartitioningWorkerStepBuilderFactory??

侦听管理器在输入通道上发送的请求(“传入请求”)为每个请求调用 的方法handleStepExecutionRequestHandler在输出通道上向经理发送回复(“传出回复”)

无需显式配置 . (如果您找到这样做的理由,可以显式配置它)。??StepExecutionRequestHandler??

以下示例演示如何使用这些 API:

@Configuration@EnableBatchProcessing@EnableBatchIntegrationpublic class RemotePartitioningJobConfiguration { @Configuration public static class ManagerConfiguration { @Autowired private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory; @Bean public Step managerStep() { return this.managerStepBuilderFactory .get(“managerStep”) .partitioner(“workerStep”, partitioner()) .gridSize(10) .outputChannel(outgoingRequestsToWorkers()) .inputChannel(incomingRepliesFromWorkers()) .build(); } // Middleware beans setup omitted } @Configuration public static class WorkerConfiguration { @Autowired private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory; @Bean public Step workerStep() { return this.workerStepBuilderFactory .get(“workerStep”) .inputChannel(incomingRequestsFromManager()) .outputChannel(outgoingRepliesToManager()) .chunk(100) .reader(itemReader()) .processor(itemProcessor()) .writer(itemWriter()) .build(); } // Middleware beans setup omitted }} 见所未见,闻所未闻。

Spring Batch -集成在春季批处理和Spring集成项目之间

相关文章:

你感兴趣的文章:

标签云: