Spring Batch -项目读写

所有批处理都可以用最简单的形式描述为大量读取 数据,执行某种类型的计算或转换,并写入结果 外。Spring Batch 提供了三个关键接口来帮助执行批量读取和写入:、 和 。??ItemReader????ItemProcessor????ItemWriter??

??ItemReader??

虽然一个简单的概念,但 是提供来自许多数据的方法 不同类型的输入。最一般的例子包括:??ItemReader??

平面文件:平面文件项读取器从平面文件中读取数据行,通常 描述具有由文件中的固定位置定义或分隔的数据字段的记录 通过一些特殊字符(如逗号)。XML:XML 进程 XML,独立于用于解析的技术, 映射和验证对象。输入数据允许验证 XML 文件 针对 XSD 架构。ItemReaders数据库:访问数据库资源以返回可映射到的结果集 要处理的对象。默认的 SQL 实现调用 a 返回对象,如果需要重新启动,请跟踪当前行,存储基本 统计信息,并提供一些稍后将介绍的事务增强功能。ItemReaderRowMapper

还有更多的可能性,但我们在本章中专注于基本的可能性。一个 所有可用实现的完整列表可在??附录 A??中找到。??ItemReader??

??ItemReader??是泛型的基本接口 输入操作,如以下接口定义所示:

public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;}

该方法定义了 的最重要的协定。调用它 返回一个项目,或者如果没有更多项目。项目可能表示 文件、数据库中的行或 XML 文件中的元素。一般预计 这些对象映射到可用的域对象(例如、或其他对象),但在那里 合同中没有要求这样做。??read????ItemReader????null????Trade????Foo??

预计接口的实现仅是向前的。 但是,如果基础资源是事务性的(例如 JMS 队列),则在回滚方案中,调用可能会在后续调用中返回相同的逻辑项。是的 还值得注意的是,缺少要处理的项目不会导致 要抛出的异常。例如,配置了 返回 0 结果的查询在第一次调用 时返回。??ItemReader????read????ItemReader????ItemReader????null????read??

??ItemWriter??

??ItemWriter??在功能上类似于 ,但具有反向操作。 资源仍然需要定位、打开和关闭,但它们的不同之处在于写出,而不是读入。对于数据库或队列, 这些操作可以是插入、更新或发送。序列化的格式 输出特定于每个批处理作业。??ItemReader????ItemWriter??

与 一样,是一个相当通用的接口,如以下接口定义所示:??ItemReader????ItemWriter??

public interface ItemWriter<T> { void write(Chunk<? extends T> items) throws Exception;}

与 on 一样,提供了 的基本协定。它 尝试写出传入的项目列表,只要它处于打开状态。因为它是 通常期望项目一起“批处理”成一个块,然后输出, 接口接受项目列表,而不是项目本身。写出后 列表,可以在从写入返回之前执行任何可能需要的刷新 方法。例如,如果写入休眠DAO,则可以进行多次调用写入, 每个项目一个。然后,编写器可以在之前调用休眠会话 返回。??read????ItemReader????write????ItemWriter????flush??

??ItemStream??

两者都很好地服务于各自的目的,但有一个 他们俩的共同关注点是需要另一个界面。一般来说,作为 批处理作业的一部分范围,需要打开、关闭读取器和写入器,并且 需要一种持久状态的机制。该接口服务于此目的, 如以下示例所示:??ItemReaders????ItemWriters????ItemStream??

public interface ItemStream { void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() throws ItemStreamException;}

在描述每种方法之前,我们应该提到.同样实现的客户端应在调用 之前调用 ,以便打开任何资源(如文件)或获取连接。一个类似的 限制适用于实现 的 。如中所述 第2章,如果在 中找到预期数据,则可以使用它来启动 或位于其初始状态以外的位置。相反,调用 是为了确保安全释放在打开期间分配的任何资源。 调用主要是为了确保当前被持有的任何状态都加载到 提供的 .在提交之前调用此方法,以确保 当前状态在提交之前保留在数据库中。??ExecutionContext????ItemReader????ItemStream????open????read????ItemWriter????ItemStream????ExecutionContext????ItemReader????ItemWriter????close????update????ExecutionContext??

在特殊情况下,一个的客户端是(从 Spring 批处理核心),为每个步骤执行创建一个,以允许用户 存储特定执行的状态,并期望在以下情况下返回该状态 同样的事情再次开始。对于那些熟悉石英的人来说,语义 与石英非常相似。??ItemStream????Step????ExecutionContext????JobInstance????JobDataMap??

委托模式和注册步骤

请注意,这是委派模式的一个示例,即 常见于春季批次。委托本身可以实现回调接口, 如。如果有,并且是否与Spring结合使用 批处理核心作为 a 的一部分,那么它们几乎肯定需要 使用 手动注册。直接的读取器、写入器或处理器 如果它实现或接口,则自动连接到 Get 注册。但是,由于代表不为, 它们需要作为侦听器或流(或两者,如果适用)注入。??CompositeItemWriter????StepListener????Step????Job????Step????Step????ItemStream????StepListener????Step??

下面的示例演示如何在 XML 中将委托作为流注入:

XML 配置

<job id=”ioSampleJob”> <step name=”step1″> <tasklet> <chunk reader=”fooReader” processor=”fooProcessor” writer=”compositeItemWriter” commit-interval=”2″> <streams> <stream ref=”barWriter” /> </streams> </chunk> </tasklet> </step></job><bean id=”compositeItemWriter” class=”…CustomCompositeItemWriter”> <property name=”delegate” ref=”barWriter” /></bean><bean id=”barWriter” class=”…BarWriter” />

下面的示例演示如何在 XML 中将委托作为流注入:

爪哇配置

@Beanpublic Job ioSampleJob(JobRepository jobRepository) { return new JobBuilder(“ioSampleJob”, jobRepository) .start(step1()) .build();}@Beanpublic Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder(“step1”, jobRepository) .<String, String>chunk(2, transactionManager) .reader(fooReader()) .processor(fooProcessor()) .writer(compositeItemWriter()) .stream(barWriter()) .build();}@Beanpublic CustomCompositeItemWriter compositeItemWriter() { CustomCompositeItemWriter writer = new CustomCompositeItemWriter(); writer.setDelegate(barWriter()); return writer;}@Beanpublic BarWriter barWriter() { return new BarWriter();}平面文件

交换批量数据的最常见机制之一一直是平面 文件。与XML不同,XML有一个商定的标准来定义它的结构 (XSD),任何阅读平面文件的人都必须提前了解文件的确切情况。 结构。通常,所有平面文件都分为两种类型:分隔文件和固定长度。 分隔文件是指字段由分隔符(如逗号)分隔的文件。 固定长度文件具有设定长度的字段。

这??FieldSet??

在 Spring 批处理中处理平面文件时,无论是用于输入还是 输出,最重要的类之一是 .许多体系结构和 库包含用于帮助您从文件中读入的抽象,但它们通常 返回一个 OR 对象数组。这真的只能让你成功一半 那里。A 是 Spring Batch 的抽象,用于启用字段的绑定 文件资源。它允许开发人员以与 它们将使用数据库输入。A 在概念上类似于 JDBC。A 只需要一个参数:一个令牌数组。 或者,您还可以配置字段的名称,以便字段可以是 按索引或名称访问,如 之后的模式,如下所示 例:??FieldSet????String????String????FieldSet????FieldSet????ResultSet????FieldSet????String????ResultSet??

String[] tokens = new String[]{“foo”, “1”, “true”};FieldSet fs = new DefaultFieldSet(tokens);String name = fs.readString(0);int value = fs.readInt(1);boolean booleanValue = fs.readBoolean(2);

界面上还有更多选项,例如、long等。最大的优点是它提供了 平面文件输入的一致解析。而不是每个批处理作业以不同的方式解析 潜在的意外方式,它可以是一致的,无论是在处理由 格式异常,或在执行简单数据转换时。??FieldSet????Date????BigDecimal????FieldSet??

??FlatFileItemReader??

平面文件是最多包含二维(表格)数据的任何类型的文件。 在 Spring Batch 框架中读取平面文件由名为 的类提供读取和解析平面文件的基本功能 文件。的两个最重要的必需依赖项是 和 。该界面将在下一页中进一步探讨 部分。资源属性表示一个 弹簧核心 。文档 解释如何制作这种类型的豆子可以在春天找到 框架,第 5 章。资源。因此,本指南不详细介绍 创建对象除了显示以下简单示例之外:??FlatFileItemReader????FlatFileItemReader????Resource????LineMapper????LineMapper????Resource????Resource??

Resource resource = new FileSystemResource(“resources/trades.csv”);

在复杂的批处理环境中,目录结构通常由企业应用程序集成 (EAI) 管理 基础结构,其中为移动文件建立外部接口的放置区 从 FTP 位置到批处理位置,反之亦然。文件移动实用程序 超出了 Spring Batch 架构的范围,但对于 batch 来说并不罕见 作业流,将文件移动实用程序作为作业流中的步骤包括在内。批次 体系结构只需要知道如何找到要处理的文件。春季批次 从此起点开始将数据馈送到管道的过程。但是,Spring Integration提供了许多 这些类型的服务。

中的其他属性允许您进一步指定数据的方式 解释,如下表所述:??FlatFileItemReader??

表 1. 性能??FlatFileItemReader??

财产

类型

描述

评论

字符串[]

指定指示注释行的行前缀。

编码

字符串

指定要使用的文本编码。默认值为 。??UTF-8??

行映射器

??LineMapper??

将 a 转换为表示项的 。??String????Object??

行到跳过

国际

文件顶部要忽略的行数。

记录分隔符策略

记录分隔符策略

用于确定行尾的位置 并执行诸如在引号字符串内以行结尾继续之类的操作。

资源

??Resource??

要从中读取的资源。

skippedLinesCallback

LineCallbackHandler

传递原始行内容的接口 文件中要跳过的行。如果设置为 2,则此接口为 叫了两次。??linesToSkip??

严格

布尔

在严格模式下,读取器在 if 输入资源不存在。否则,它会记录问题并继续。??ExecutionContext??

??LineMapper??

与 一样,它采用低级构造,例如 和 返回 平面文件处理需要相同的结构来转换一行 到 ,如以下接口定义所示:??RowMapper????ResultSet????Object????String????Object??

public interface LineMapper<T> { T mapLine(String line, int lineNumber) throws Exception;}

基本合同是,给定当前行和行号。 关联时,映射器应返回生成的域对象。这类似于 ,因为每行都与其行号相关联,就像 中的每一行都与其行号相关联一样。这允许将行号绑定到 生成的域对象,用于标识比较或提供更多信息的日志记录。然而 与 不同的是,它被赋予了一条原始行,如上所述,它只 让你走到一半。该行必须标记为 ,然后可以是 映射到对象,如本文档后面所述。??RowMapper????ResultSet????RowMapper????LineMapper????FieldSet??

??LineTokenizer??

将一行输入转换为 a 的抽象是必要的,因为存在 可以是需要转换为 的多种格式的平面文件数据。在 弹簧批,这个接口是:??FieldSet????FieldSet????LineTokenizer??

public interface LineTokenizer { FieldSet tokenize(String line);}

a的协定是这样的,给定一行输入(理论上可以包含不止一行),表示该行的a是 返回。然后可以将其传递给 .春季批次包含 以下实现:??LineTokenizer????String????FieldSet????FieldSet????FieldSetMapper????LineTokenizer??

??DelimitedLineTokenizer??:用于记录中的字段由 定界符。最常见的分隔符是逗号,但经常使用管道或分号 也。??FixedLengthTokenizer??:用于记录中每个字段都是“固定”的文件 宽度”。必须为每种记录类型定义每个字段的宽度。??PatternMatchingCompositeLineTokenizer??:确定在 应通过检查模式在特定行上使用分词器。LineTokenizer??FieldSetMapper??

该接口定义了单个方法 ,该方法采用一个对象并将其内容映射到一个对象。此对象可能是自定义 DTO,一个 域对象或数组,具体取决于作业的需要。这是 与 结合使用 从资源转换一行数据 到所需类型的对象中,如以下接口定义所示:??FieldSetMapper????mapFieldSet????FieldSet????FieldSetMapper????LineTokenizer??

public interface FieldSetMapper<T> { T mapFieldSet(FieldSet fieldSet) throws BindException;}

使用的模式与 使用的模式相同。??RowMapper????JdbcTemplate??

??DefaultLineMapper??

现在,已经定义了用于读取平面文件的基本接口,它变为 明确需要三个基本步骤:

从文件中读取一行。将该行传递到方法中以检索 .StringLineTokenizer#tokenize()FieldSet将返回的 from 标记化传递给 ,返回 方法的结果。FieldSetFieldSetMapperItemReader#read()

上面描述的两个接口表示两个单独的任务:将行转换为 a 和将 a 映射到域对象。因为 a 的输入与 (a 行) 的输入匹配,而 a 的输出与 的输出匹配,所以默认实现 同时使用 A 和 A 提供。这 在以下类定义中显示,表示大多数用户需要的行为:??FieldSet????FieldSet????LineTokenizer????LineMapper????FieldSetMapper????LineMapper????LineTokenizer????FieldSetMapper????DefaultLineMapper??

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean { private LineTokenizer tokenizer; private FieldSetMapper<T> fieldSetMapper; public T mapLine(String line, int lineNumber) throws Exception { return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line)); } public void setLineTokenizer(LineTokenizer tokenizer) { this.tokenizer = tokenizer; } public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) { this.fieldSetMapper = fieldSetMapper; }}

上述功能在默认实现中提供,而不是构建 进入阅读器本身(如在以前版本的框架中所做的那样)以允许用户 在控制解析过程方面具有更大的灵活性,尤其是在访问原始数据时 需要行。

简单分隔文件读取示例

下面的示例演示如何使用实际域方案读取平面文件。 此特定批处理作业从以下文件中读取足球运动员:

ID,lastName,firstName,position,birthYear,debutYear”AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996″,”AbduRa00,Abdullah,Rabih,rb,1975,1999″,”AberWa00,Abercrombie,Walter,rb,1959,1982″,”AbraDa00,Abramowicz,Danny,wr,1945,1967″,”AdamBo00,Adams,Bob,te,1946,1969″,”AdamCh00,Adams,Charlie,wr,1979,2003″

此文件的内容映射到以下域对象:??Player??

public class Player implements Serializable { private String ID; private String lastName; private String firstName; private String position; private int birthYear; private int debutYear; public String toString() { return “PLAYER:ID=” + ID + “,Last Name=” + lastName + “,First Name=” + firstName + “,Position=” + position + “,Birth Year=” + birthYear + “,DebutYear=” + debutYear; } // setters and getters…}

要将 a 映射到对象中,返回玩家需要 要定义,如以下示例所示:??FieldSet????Player????FieldSetMapper??

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> { public Player mapFieldSet(FieldSet fieldSet) { Player player = new Player(); player.setID(fieldSet.readString(0)); player.setLastName(fieldSet.readString(1)); player.setFirstName(fieldSet.readString(2)); player.setPosition(fieldSet.readString(3)); player.setBirthYear(fieldSet.readInt(4)); player.setDebutYear(fieldSet.readInt(5)); return player; }}

然后可以通过正确构造 和 调用来读取该文件,如以下示例所示:??FlatFileItemReader????read??

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();itemReader.setResource(new FileSystemResource(“resources/players.csv”));DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();//DelimitedLineTokenizer defaults to comma as its delimiterlineMapper.setLineTokenizer(new DelimitedLineTokenizer());lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());itemReader.setLineMapper(lineMapper);itemReader.open(new ExecutionContext());Player player = itemReader.read();

每次调用 都会从文件中的每一行返回一个新对象。当文件末尾 到达,返回。??read????Player????null??

按名称映射字段

两者都允许还有一个附加功能,并且在功能上类似于 京东 .字段的名称可以注入到这些实现中的任何一个中,以提高映射函数的可读性。 首先,将平面文件中所有字段的列名注入到分词器中, 如以下示例所示:??DelimitedLineTokenizer????FixedLengthTokenizer????ResultSet????LineTokenizer??

tokenizer.setNames(new String[] {“ID”, “lastName”, “firstName”, “position”, “birthYear”, “debutYear”});

A 可以按如下方式使用此信息:??FieldSetMapper??

public class PlayerMapper implements FieldSetMapper<Player> { public Player mapFieldSet(FieldSet fs) { if (fs == null) { return null; } Player player = new Player(); player.setID(fs.readString(“ID”)); player.setLastName(fs.readString(“lastName”)); player.setFirstName(fs.readString(“firstName”)); player.setPosition(fs.readString(“position”)); player.setDebutYear(fs.readInt(“debutYear”)); player.setBirthYear(fs.readInt(“birthYear”)); return player; }}自动将字段集映射到域对象

对于许多人来说,必须写一个特定的和写一样麻烦 a 特定 a .Spring Batch 通过提供 通过将字段名称与资源库匹配来自动映射字段 在对象上使用 JavaBean 规范。??FieldSetMapper????RowMapper????JdbcTemplate????FieldSetMapper??

再次使用足球示例,配置看起来像 XML 中的以下代码片段:??BeanWrapperFieldSetMapper??

XML 配置

<bean id=”fieldSetMapper” class=”org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper”> <property name=”prototypeBeanName” value=”player” /></bean><bean id=”player” class=”org.springframework.batch.sample.domain.Player” scope=”prototype” />

再次使用足球示例,配置看起来像 Java 中的以下代码片段:??BeanWrapperFieldSetMapper??

@Beanpublic FieldSetMapper fieldSetMapper() { BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper(); fieldSetMapper.setPrototypeBeanName(“player”); return fieldSetMapper;}@Bean@Scope(“prototype”)public Player player() { return new Player();}

对于 中的每个条目,映射器都会在新的 对象的实例(因此,需要原型范围)在 与 Spring 容器查找与属性名称匹配的二传手的方式相同。每个可用 映射中的字段,并返回结果对象,没有 代码必填。??FieldSet????Player????FieldSet????Player??

固定长度文件格式

到目前为止,只详细讨论了分隔文件。但是,它们代表 只有一半的文件读取图片。许多使用平面文件的组织使用固定 长度格式。固定长度文件示例如下:

UK21341EAH4121131.11customer1UK21341EAH4221232.11customer2UK21341EAH4321333.11customer3UK21341EAH4421434.11customer4UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但它实际上表示 4 个不同的字段:

ISIN:所订购商品的唯一标识符 – 长度为 12 个字符。数量:订购商品的编号 – 3 个字符长。价格:商品价格 – 5 个字符长。客户:订购商品的客户的 ID – 9 个字符长。

配置 时,必须提供这些长度中的每一个 以范围的形式。??FixedLengthLineTokenizer??

下面的示例演示如何为 .XML:??FixedLengthLineTokenizer??

XML 配置

<bean id=”fixedLengthLineTokenizer” class=”org.springframework.batch.item.file.transform.FixedLengthTokenizer”> <property name=”names” value=”ISIN,Quantity,Price,Customer” /> <property name=”columns” value=”1-12, 13-15, 16-20, 21-29″ /></bean>

因为使用相同的接口 前面讨论过,它返回的内容与使用分隔符相同。这 允许使用相同的方法来处理其输出,例如使用 .??FixedLengthLineTokenizer????LineTokenizer????FieldSet????BeanWrapperFieldSetMapper??

要支持上述范围语法,需要在 中配置专用属性编辑器 。然而,这个豆子 在使用批处理命名空间的位置中自动声明。??RangeArrayPropertyEditor????ApplicationContext????ApplicationContext??

下面的示例演示如何为 爪哇岛:??FixedLengthLineTokenizer??

爪哇配置

@Beanpublic FixedLengthTokenizer fixedLengthTokenizer() { FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); tokenizer.setNames(“ISIN”, “Quantity”, “Price”, “Customer”); tokenizer.setColumns(new Range(1, 12), new Range(13, 15), new Range(16, 20), new Range(21, 29)); return tokenizer;}

因为使用相同的接口 上面讨论过,它返回的内容与使用分隔符相同。这 允许使用相同的方法来处理其输出,例如使用 .??FixedLengthLineTokenizer????LineTokenizer????FieldSet????BeanWrapperFieldSetMapper??

单个文件中的多种记录类型

到目前为止,所有文件读取示例都对 简单起见:文件中的所有记录都具有相同的格式。但是,这可能会 情况并非总是如此。一个文件可能具有不同 需要以不同的方式标记化并映射到不同对象的格式。这 以下文件摘录说明了这一点:

USER;Smith;Peter;;T;20014539;FLINEA;1044391041ABC037.49G201XX1383.12HLINEB;2134776319DEF422.99M005LI

在这个文件中,我们有三种类型的记录,“USER”、“LINEA”和“LINEB”。“用户”行 对应于一个对象。“LINEA”和“LINEB”都对应对象, 尽管“LINEA”比“LINEB”具有更多信息。??User????Line??

单独读取每一行,但我们必须指定不同的 和 对象,以便接收 正确的项目。通过允许地图使这变得容易 要配置的模式和要配置的模式。??ItemReader????LineTokenizer????FieldSetMapper????ItemWriter????PatternMatchingCompositeLineMapper????LineTokenizers????FieldSetMappers??

下面的示例演示如何为 .XML:??FixedLengthLineTokenizer??

<bean id=”orderFileLineMapper” class=”org.spr…PatternMatchingCompositeLineMapper”> <property name=”tokenizers”> <map> <entry key=”USER*” value-ref=”userTokenizer” /> <entry key=”LINEA*” value-ref=”lineATokenizer” /> <entry key=”LINEB*” value-ref=”lineBTokenizer” /> </map> </property> <property name=”fieldSetMappers”> <map> <entry key=”USER*” value-ref=”userFieldSetMapper” /> <entry key=”LINE*” value-ref=”lineFieldSetMapper” /> </map> </property></bean>@Beanpublic PatternMatchingCompositeLineMapper orderFileLineMapper() { PatternMatchingCompositeLineMapper lineMapper = new PatternMatchingCompositeLineMapper(); Map<String, LineTokenizer> tokenizers = new HashMap<>(3); tokenizers.put(“USER*”, userTokenizer()); tokenizers.put(“LINEA*”, lineATokenizer()); tokenizers.put(“LINEB*”, lineBTokenizer()); lineMapper.setTokenizers(tokenizers); Map<String, FieldSetMapper> mappers = new HashMap<>(2); mappers.put(“USER*”, userFieldSetMapper()); mappers.put(“LINE*”, lineFieldSetMapper()); lineMapper.setFieldSetMappers(mappers); return lineMapper;}

在此示例中,“LINEA”和“LINEB”具有单独的实例,但它们都使用 一样。??LineTokenizer????FieldSetMapper??

使用方法 以便为每行选择正确的代表。允许 两个具有特殊含义的通配符:问号 (“?”) 正好匹配一个 字符,而星号 (“*”) 匹配零个或多个字符。请注意,在 在配置之前,所有模式都以星号结尾,使它们有效地 行的前缀。始终匹配最具体的模式 可能,无论配置中的顺序如何。因此,如果“LINE*”和“LINEA*”是 两者都列为模式,“LINEA”将匹配模式“LINEA*”,而“LINEB”将匹配 模式“线*”。此外,单个星号 (“*”) 可以通过匹配作为默认值 任何其他模式不匹配的任何线条。??PatternMatchingCompositeLineMapper????PatternMatcher#match????PatternMatcher????PatternMatcher??

下面的示例演示如何匹配 XML 中任何其他模式不匹配的行:

XML 配置

<entry key=”*” value-ref=”defaultLineTokenizer” />

以下示例演示如何匹配 Java 中任何其他模式不匹配的行:

…tokenizers.put(“*”, defaultLineTokenizer());…

还有一个可用于标记化 独自。??PatternMatchingCompositeLineTokenizer??

平面文件包含每个记录跨越多行的记录也很常见。自 处理这种情况,需要更复杂的策略。对此的演示 在样本中可以找到常见的模式。??multiLineRecords??

平面文件中的异常处理

在许多情况下,标记行可能会导致引发异常。多 平面文件不完美,包含格式不正确的记录。许多用户选择 在记录问题、原始行和行时跳过这些错误行 数。稍后可以手动检查这些日志,也可以通过其他批处理作业检查这些日志。为此 原因,Spring Batch 提供了用于处理解析异常的异常层次结构:和 。 是 当尝试读取 文件。 由接口的实现引发,指示标记化时遇到的更具体的错误。??FlatFileParseException????FlatFileFormatException????FlatFileParseException????FlatFileItemReader????FlatFileFormatException????LineTokenizer??

??IncorrectTokenCountException??

两者都有能力指定 可用于创建 .但是,如果列数 名称与标记行时找到的列数不匹配,无法创建,并且抛出 ,其中包含 遇到的令牌数和预期数,如以下示例所示:??DelimitedLineTokenizer????FixedLengthLineTokenizer????FieldSet????FieldSet????IncorrectTokenCountException??

tokenizer.setNames(new String[] {“A”, “B”, “C”, “D”});try { tokenizer.tokenize(“a,b,c”);}catch (IncorrectTokenCountException e) { assertEquals(4, e.getExpectedCount()); assertEquals(3, e.getActualCount());}

因为分词器配置了 4 个列名,但在 文件,一个被抛出。??IncorrectTokenCountException??

??IncorrectLineLengthException??

以固定长度格式格式化的文件在解析时有其他要求 因为,与分隔格式不同,每列都必须严格遵守其预定义的 宽度。如果总行长不等于此列的最宽值,则 引发异常,如以下示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10), new Range(11, 15) });try { tokenizer.tokenize(“12345”); fail(“Expected IncorrectLineLengthException”);}catch (IncorrectLineLengthException ex) { assertEquals(15, ex.getExpectedLength()); assertEquals(5, ex.getActualLength());}

上述分词器的配置范围为:1-5、6-10 和 11-15。因此 线路总长度为15。但是,在前面的示例中,长度为 5 的行 被传入,导致被扔出。投掷一个 这里的异常,而不仅仅是映射第一列,允许处理 行将更早失败,并且包含的信息比它所包含的信息更多,如果它失败了,则 尝试在 .但是,在某些情况下 线的长度并不总是恒定的。因此,线长度的验证可以 通过“strict”属性关闭,如以下示例所示:??IncorrectLineLengthException????FieldSetMapper??

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });tokenizer.setStrict(false);FieldSet tokens = tokenizer.tokenize(“12345”);assertEquals(“12345”, tokens.readString(0));assertEquals(“”, tokens.readString(1));

前面的示例与前面的示例几乎相同,只是调用了该示例。此设置告知分词器不强制 标记行时的行长度。A 现已正确创建,并且 返回。但是,它仅包含其余值的空令牌。??tokenizer.setStrict(false)????FieldSet??

??FlatFileItemWriter??

写出到平面文件具有与从文件读入相同的问题和问题 必须克服。步骤必须能够以 交易方式。

??LineAggregator??

正如接口需要将一个项目转换为 ,文件写入必须有一种方法将多个字段聚合成一个字符串 用于写入文件。在春季批次中,这是 ,显示在 以下接口定义:??LineTokenizer????String????LineAggregator??

public interface LineAggregator<T> { public String aggregate(T item);}

与 在逻辑上相反。 取 a 并返回 a ,而取 a 并返回 .??LineAggregator????LineTokenizer????LineTokenizer????String????FieldSet????LineAggregator????item????String??

??PassThroughLineAggregator??

接口最基本的实现是 ,它假定对象已经是一个字符串或 它的字符串表示形式可以写入,如以下代码所示:??LineAggregator????PassThroughLineAggregator??

public class PassThroughLineAggregator<T> implements LineAggregator<T> { public String aggregate(T item) { return item.toString(); }}

如果直接控制创建字符串,则上述实现很有用 必需但优点 ,例如事务和重新启动 支持,是必要的。??FlatFileItemWriter??

简化文件写入示例

现在接口及其最基本的实现 已经定义,基本的编写流程可以 解释:??LineAggregator????PassThroughLineAggregator??

将要写入的对象传递给 以获得 .LineAggregatorString返回的将写入配置的文件。String

以下摘录在代码中表达了这一点:??FlatFileItemWriter??

public void write(T item) throws Exception { write(lineAggregator.aggregate(item) + LINE_SEPARATOR);}

在 XML 中,一个简单的配置示例可能如下所示:

<bean id=”itemWriter” class=”org.spr…FlatFileItemWriter”> <property name=”resource” value=”file:target/test-outputs/output.txt” /> <property name=”lineAggregator”> <bean class=”org.spr…PassThroughLineAggregator”/> </property></bean>

在 Java 中,一个简单的配置示例可能如下所示:

@Beanpublic FlatFileItemWriter itemWriter() { return new FlatFileItemWriterBuilder<Foo>() .name(“itemWriter”) .resource(new FileSystemResource(“target/test-outputs/output.txt”)) .lineAggregator(new PassThroughLineAggregator<>()) .build();}??FieldExtractor??

前面的示例对于写入文件的最基本用法可能很有用。 但是,大多数用户都有一个需要 写出来,因此必须转换为一行。在文件读取中,以下内容是 必填:??FlatFileItemWriter??

从文件中读取一行。将该行传递到方法中,以便检索 .LineTokenizer#tokenize()FieldSet将返回的 from 标记化传递给 ,返回 方法的结果。FieldSetFieldSetMapperItemReader#read()

文件写入具有类似但相反的步骤:

将要写入的项目传递给编写器。将项目上的字段转换为数组。将生成的数组聚合成一行。

因为框架无法知道对象的哪些字段需要 被写出来,必须写才能完成翻转的任务 项到数组中,如以下接口定义所示:??FieldExtractor??

public interface FieldExtractor<T> { Object[] extract(T item);}

接口的实现应从字段创建一个数组 然后可以用分隔符写出提供的对象 元素或作为固定宽度线的一部分。??FieldExtractor??

??PassThroughFieldExtractor??

在许多情况下,集合(如数组)或 需要写出来。从这些集合类型之一中“提取”数组非常 简单。为此,请将集合转换为数组。因此,在这种情况下应使用 。应该指出的是,如果 传入的对象不是一种集合类型,则 返回一个仅包含要提取的项的数组。??Collection????FieldSet????PassThroughFieldExtractor????PassThroughFieldExtractor??

??BeanWrapperFieldExtractor??

与文件读取部分所述一样,它是 通常最好配置如何将域对象转换为对象数组,而不是 比自己写转换。提供这个 功能,如以下示例所示:??BeanWrapperFieldSetMapper????BeanWrapperFieldExtractor??

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();extractor.setNames(new String[] { “first”, “last”, “born” });String first = “Alan”;String last = “Turing”;int born = 1912;Name n = new Name(first, last, born);Object[] values = extractor.extract(n);assertEquals(first, values[0]);assertEquals(last, values[1]);assertEquals(born, values[2]);

此提取器实现只有一个必需属性:要 地图。正如需要字段名称将 上的字段映射到所提供对象的 setter 一样,需要名称 映射到用于创建对象数组的 getter。值得注意的是,该 名称确定数组中字段的顺序。??BeanWrapperFieldSetMapper????FieldSet????BeanWrapperFieldExtractor??

分隔文件写入示例

最基本的平面文件格式是所有字段都由分隔符分隔的格式。 这可以使用 .以下示例写入 输出一个简单的域对象,该对象表示客户帐户的信用额度:??DelimitedLineAggregator??

public class CustomerCredit { private int id; private String name; private BigDecimal credit; //getters and setters removed for clarity}

由于正在使用域对象,因此必须提供接口的实现以及要使用的分隔符。??FieldExtractor??

下面的示例演示如何在 XML 中使用 with 分隔符:??FieldExtractor??

XML 配置

<bean id=”itemWriter” class=”org.springframework.batch.item.file.FlatFileItemWriter”> <property name=”resource” ref=”outputResource” /> <property name=”lineAggregator”> <bean class=”org.spr…DelimitedLineAggregator”> <property name=”delimiter” value=”,”/> <property name=”fieldExtractor”> <bean class=”org.spr…BeanWrapperFieldExtractor”> <property name=”names” value=”name,credit”/> </bean> </property> </bean> </property></bean>

以下示例显示了如何在 Java 中使用 with 分隔符:??FieldExtractor??

@Beanpublic FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception { BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] {“name”, “credit”}); fieldExtractor.afterPropertiesSet(); DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>(); lineAggregator.setDelimiter(“,”); lineAggregator.setFieldExtractor(fieldExtractor); return new FlatFileItemWriterBuilder<CustomerCredit>() .name(“customerCreditWriter”) .resource(outputResource) .lineAggregator(lineAggregator) .build();}

在前面的示例中,前面描述的 章节用于将其中的名称和信用字段转换为对象 数组,然后在每个字段之间用逗号写出。??BeanWrapperFieldExtractor????CustomerCredit??

也可以使用 自动创建 AND,如以下示例所示:??FlatFileItemWriterBuilder.DelimitedBuilder????BeanWrapperFieldExtractor????DelimitedLineAggregator??

@Beanpublic FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception { return new FlatFileItemWriterBuilder<CustomerCredit>() .name(“customerCreditWriter”) .resource(outputResource) .delimited() .delimiter(“|”) .names(new String[] {“name”, “credit”}) .build();}固定宽度文件写入示例

分隔不是平面文件格式的唯一类型。许多人喜欢使用设定的宽度 在字段之间描绘的每一列,这通常称为“固定宽度”。 Spring Batch 在文件写入中支持此功能。??FormatterLineAggregator??

使用上述相同的域对象,它可以配置为 在 XML 中遵循:??CustomerCredit??

<bean id=”itemWriter” class=”org.springframework.batch.item.file.FlatFileItemWriter”> <property name=”resource” ref=”outputResource” /> <property name=”lineAggregator”> <bean class=”org.spr…FormatterLineAggregator”> <property name=”fieldExtractor”> <bean class=”org.spr…BeanWrapperFieldExtractor”> <property name=”names” value=”name,credit” /> </bean> </property> <property name=”format” value=”%-9s%-2.0f” /> </bean> </property></bean>

使用上述相同的域对象,它可以配置为 在爪哇中遵循:??CustomerCredit??

@Beanpublic FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception { BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>(); fieldExtractor.setNames(new String[] {“name”, “credit”}); fieldExtractor.afterPropertiesSet(); FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>(); lineAggregator.setFormat(“%-9s%-2.0f”); lineAggregator.setFieldExtractor(fieldExtractor); return new FlatFileItemWriterBuilder<CustomerCredit>() .name(“customerCreditWriter”) .resource(outputResource) .lineAggregator(lineAggregator) .build();}

前面的大多数示例应该看起来很熟悉。但是,格式的值 物业很新。

下面的示例演示 XML 格式的格式属性:

<property name=”format” value=”%-9s%-2.0f” />

以下示例显示了 Java 中的格式属性:

…FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();lineAggregator.setFormat(“%-9s%-2.0f”);…

底层实现是使用与 Java 5 相同的部分构建的。Java基于C编程的功能 语言。有关如何配置格式化程序的大多数详细信息,请参见格式化程序的Javadoc。??Formatter????Formatter????printf??

也可以使用 自动创建 AND,如以下示例所示:??FlatFileItemWriterBuilder.FormattedBuilder????BeanWrapperFieldExtractor????FormatterLineAggregator??

@Beanpublic FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception { return new FlatFileItemWriterBuilder<CustomerCredit>() .name(“customerCreditWriter”) .resource(outputResource) .formatted() .format(“%-9s%-2.0f”) .names(new String[] {“name”, “credit”}) .build();}处理文件创建

??FlatFileItemReader??与文件资源的关系非常简单。当阅读器 初始化后,它将打开文件(如果存在),如果不存在,则会引发异常。 文件编写并不是那么简单。乍一看,似乎很相似 应存在以下直接合约: 如果文件已经 存在,引发异常,如果不存在,则创建它并开始编写。然而 重新启动 可能会导致问题。在正常重启方案中, 合约反转:如果文件存在,则从上一个已知良好的文件开始写入该文件 位置,如果没有,则引发异常。但是,如果文件名 因为这份工作总是一样吗?在这种情况下,如果文件,您可能需要删除该文件 存在,除非重新启动。由于这种可能性,包含属性 .将此属性设置为 true 会导致 打开编写器时要删除的同名现有文件。??FlatFileItemWriter????Job????FlatFileItemWriter????shouldDeleteIfExists??

XML 项目读取器和写入器

Spring Batch 为读取 XML 记录和 将它们映射到 Java 对象以及将 Java 对象编写为 XML 记录。

对流式处理 XML 的约束

StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理 处理要求(DOM 一次将整个输入加载到内存中,SAX 控件 允许用户仅提供回调的解析过程)。

我们需要考虑 XML 输入和输出在 Spring Batch 中的工作方式。首先,有一个 一些概念与文件读取和写入不同,但在 Spring Batch 中很常见 XML 处理。使用 XML 处理,而不是需要的记录行(实例) 要进行标记化,假定 XML 资源是“片段”的集合 对应于单个记录,如下图所示:??FieldSet??

图1.XML 输入

在上述场景中,“trade”标签被定义为“根元素”。万事 在“<贸易>”和“</贸易>”之间被认为是一个“片段”。春季批次 使用对象/XML 映射 (OXM) 将片段绑定到对象。但是,春季批次不是 绑定到任何特定的 XML 绑定技术。典型的用途是委托给??Spring OXM??,它 为最流行的 OXM 技术提供统一的抽象。对 Spring OXM是可选的,您可以选择实现Spring Batch特定的接口 如果需要。与 OXM 支持的技术的关系显示在 下图:

图2.OXM绑定

通过介绍 OXM 以及如何使用 XML 片段来表示记录,我们 现在可以更仔细地检查读者和作者。

??StaxEventItemReader??

该配置为处理 来自 XML 输入流的记录。首先,考虑以下一组 XML 记录, 罐头工艺:??StaxEventItemReader????StaxEventItemReader??

<?xml versinotallow=”1.0″ encoding=”UTF-8″?><records> <trade xmlns=”https://springframework.org/batch/sample/io/oxm/domain”> <isin>XYZ0001</isin> <quantity>5</quantity> <price>11.39</price> <customer>Customer1</customer> </trade> <trade xmlns=”https://springframework.org/batch/sample/io/oxm/domain”> <isin>XYZ0002</isin> <quantity>2</quantity> <price>72.99</price> <customer>Customer2c</customer> </trade> <trade xmlns=”https://springframework.org/batch/sample/io/oxm/domain”> <isin>XYZ0003</isin> <quantity>9</quantity> <price>99.99</price> <customer>Customer3</customer> </trade></records>

为了能够处理 XML 记录,需要以下各项:

根元素名称:构成 要映射的对象。示例配置通过交易价值证明了这一点。资源:表示要读取的文件的 Spring 资源。??Unmarshaller??:Spring OXM提供的用于映射XML的解组工具 片段到对象。

下面的示例演示如何定义适用于根的 元素命名为 、 的资源和 的解组器 在 XML 中调用:??StaxEventItemReader????trade????data/iosample/input/input.xml????tradeMarshaller??

XML 配置

<bean id=”itemReader” class=”org.springframework.batch.item.xml.StaxEventItemReader”> <property name=”fragmentRootElementName” value=”trade” /> <property name=”resource” value=”org/springframework/batch/item/xml/domain/trades.xml” /> <property name=”unmarshaller” ref=”tradeMarshaller” /></bean>

下面的示例演示如何定义适用于根的 元素命名为 、 的资源和 的解组器 在爪哇中调用:??StaxEventItemReader????trade????data/iosample/input/input.xml????tradeMarshaller??

爪哇配置

@Beanpublic StaxEventItemReader itemReader() { return new StaxEventItemReaderBuilder<Trade>() .name(“itemReader”) .resource(new FileSystemResource(“org/springframework/batch/item/xml/domain/trades.xml”)) .addFragmentRootElements(“trade”) .unmarshaller(tradeMarshaller()) .build();}

请注意,在此示例中,我们选择使用 ,它接受 作为映射传入的别名,其中第一个键和值是片段的名称 (即根元素)和要绑定的对象类型。然后,类似于 , 映射到对象类型中的字段的其他元素的名称描述为 映射中的键/值对。在配置文件中,我们可以使用 Spring 配置 用于描述所需别名的实用程序。??XStreamMarshaller????FieldSet??

下面的示例演示如何在 XML 中描述别名:

XML 配置

<bean id=”tradeMarshaller” class=”org.springframework.oxm.xstream.XStreamMarshaller”> <property name=”aliases”> <util:map id=”aliases”> <entry key=”trade” value=”org.springframework.batch.sample.domain.trade.Trade” /> <entry key=”price” value=”java.math.BigDecimal” /> <entry key=”isin” value=”java.lang.String” /> <entry key=”customer” value=”java.lang.String” /> <entry key=”quantity” value=”java.lang.Long” /> </util:map> </property></bean>

以下示例演示如何在 Java 中描述别名:

爪哇配置

@Beanpublic XStreamMarshaller tradeMarshaller() { Map<String, Class> aliases = new HashMap<>(); aliases.put(“trade”, Trade.class); aliases.put(“price”, BigDecimal.class); aliases.put(“isin”, String.class); aliases.put(“customer”, String.class); aliases.put(“quantity”, Long.class); XStreamMarshaller marshaller = new XStreamMarshaller(); marshaller.setAliases(aliases); return marshaller;}

在输入时,读取器读取 XML 资源,直到它识别出新片段 即将开始。默认情况下,读取器匹配元素名称以识别新的 片段即将开始。阅读器从 片段并将文档传递给反序列化程序(通常是围绕 Spring 的包装器 OXM ) 将 XML 映射到 Java 对象。??Unmarshaller??

总之,此过程类似于以下 Java 代码,它使用 弹簧配置提供的注入:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();Resource resource = new ByteArrayResource(xmlResource.getBytes());Map aliases = new HashMap();aliases.put(“trade”,”org.springframework.batch.sample.domain.trade.Trade”);aliases.put(“price”,”java.math.BigDecimal”);aliases.put(“customer”,”java.lang.String”);aliases.put(“isin”,”java.lang.String”);aliases.put(“quantity”,”java.lang.Long”);XStreamMarshaller unmarshaller = new XStreamMarshaller();unmarshaller.setAliases(aliases);xmlStaxEventItemReader.setUnmarshaller(unmarshaller);xmlStaxEventItemReader.setResource(resource);xmlStaxEventItemReader.setFragmentRootElementName(“trade”);xmlStaxEventItemReader.open(new ExecutionContext());boolean hasNext = true;Trade trade = null;while (hasNext) { trade = xmlStaxEventItemReader.read(); if (trade == null) { hasNext = false; } else { System.out.println(trade); }}??StaxEventItemWriter??

输出与输入对称工作。需要 a , a 编组员,以及.Java 对象被传递给编组器(通常 标准弹簧 OXM Marshaller),它通过使用自定义事件写入 筛选为每个生成的 和 事件的编写器 由 OXM 工具进行片段。??StaxEventItemWriter????Resource????rootTagName????Resource????StartDocument????EndDocument??

以下 XML 示例使用 :??MarshallingEventWriterSerializer??

XML 配置

<bean id=”itemWriter” class=”org.springframework.batch.item.xml.StaxEventItemWriter”> <property name=”resource” ref=”outputResource” /> <property name=”marshaller” ref=”tradeMarshaller” /> <property name=”rootTagName” value=”trade” /> <property name=”overwriteOutput” value=”true” /></bean>

以下 Java 示例使用 :??MarshallingEventWriterSerializer??

爪哇配置

@Beanpublic StaxEventItemWriter itemWriter(Resource outputResource) { return new StaxEventItemWriterBuilder<Trade>() .name(“tradesWriter”) .marshaller(tradeMarshaller()) .resource(outputResource) .rootTagName(“trade”) .overwriteOutput(true) .build();}

上述配置设置了三个必需的属性并设置了可选的属性,本章前面提到用于指定 可以覆盖现有文件。??overwriteOutput=true??

以下 XML 示例使用的封送器与读取示例中使用的封送器相同 在本章前面显示:

XML 配置

<bean id=”customerCreditMarshaller” class=”org.springframework.oxm.xstream.XStreamMarshaller”> <property name=”aliases”> <util:map id=”aliases”> <entry key=”customer” value=”org.springframework.batch.sample.domain.trade.Trade” /> <entry key=”price” value=”java.math.BigDecimal” /> <entry key=”isin” value=”java.lang.String” /> <entry key=”customer” value=”java.lang.String” /> <entry key=”quantity” value=”java.lang.Long” /> </util:map> </property></bean>

以下 Java 示例使用的编组器与阅读示例中使用的编组器相同 在本章前面显示:

爪哇配置

@Beanpublic XStreamMarshaller customerCreditMarshaller() { XStreamMarshaller marshaller = new XStreamMarshaller(); Map<String, Class> aliases = new HashMap<>(); aliases.put(“trade”, Trade.class); aliases.put(“price”, BigDecimal.class); aliases.put(“isin”, String.class); aliases.put(“customer”, String.class); aliases.put(“quantity”, Long.class); marshaller.setAliases(aliases); return marshaller;}

为了总结一个 Java 示例,以下代码说明了所有要点 讨论,演示所需属性的编程设置:

FileSystemResource resource = new FileSystemResource(“data/outputFile.xml”)Map aliases = new HashMap();aliases.put(“trade”,”org.springframework.batch.sample.domain.trade.Trade”);aliases.put(“price”,”java.math.BigDecimal”);aliases.put(“customer”,”java.lang.String”);aliases.put(“isin”,”java.lang.String”);aliases.put(“quantity”,”java.lang.Long”);Marshaller marshaller = new XStreamMarshaller();marshaller.setAliases(aliases);StaxEventItemWriter staxItemWriter = new StaxEventItemWriterBuilder<Trade>() .name(“tradesWriter”) .marshaller(marshaller) .resource(resource) .rootTagName(“trade”) .overwriteOutput(true) .build();staxItemWriter.afterPropertiesSet();ExecutionContext executionContext = new ExecutionContext();staxItemWriter.open(executionContext);Trade trade = new Trade();trade.setPrice(11.39);trade.setIsin(“XYZ0001”);trade.setQuantity(5L);trade.setCustomer(“Customer1”);staxItemWriter.write(trade);JSON 项目读取器和写入器

Spring Batch 支持以以下格式读取和写入 JSON 资源:

[ { “isin”: “123”, “quantity”: 1, “price”: 1.2, “customer”: “foo” }, { “isin”: “456”, “quantity”: 2, “price”: 1.4, “customer”: “bar” }]

假定 JSON 资源是对应于 单个项目。Spring Batch 不绑定到任何特定的 JSON 库。

??JsonItemReader??

委托 JSON 解析和绑定到接口的实现。此接口 旨在通过使用流式处理 API 读取 JSON 对象来实现 以块为单位。目前提供了两种实现:??JsonItemReader????org.springframework.batch.item.json.JsonObjectReader??

??杰克逊??通过org.springframework.batch.item.json.JacksonJsonObjectReader??格森??通过org.springframework.batch.item.json.GsonJsonObjectReader

为了能够处理 JSON 记录,需要以下内容:

??Resource??:表示要读取的 JSON 文件的 Spring 资源。??JsonObjectReader??:用于解析 JSON 对象并将其绑定到项的 JSON 对象读取器

以下示例演示如何定义与 以前的 JSON 资源和基于 Jackson 的 a:??JsonItemReader????org/springframework/batch/item/json/trades.json????JsonObjectReader??

@Beanpublic JsonItemReader<Trade> jsonItemReader() { return new JsonItemReaderBuilder<Trade>() .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class)) .resource(new ClassPathResource(“trades.json”)) .name(“tradeJsonItemReader”) .build();}??JsonFileItemWriter??

将项的封送委托给接口。合同 这个接口是获取一个对象并将其编组到 JSON 中。 目前提供了两种实现:??JsonFileItemWriter????org.springframework.batch.item.json.JsonObjectMarshaller????String??

??杰克逊??通过org.springframework.batch.item.json.JacksonJsonObjectMarshaller??格森??通过org.springframework.batch.item.json.GsonJsonObjectMarshaller

为了能够写入 JSON 记录,需要以下内容:

??Resource??:表示要写入的 JSON 文件的 SpringResource??JsonObjectMarshaller??:JSON 对象编组器,用于将对象编组为 JSON 格式

下面的示例演示如何定义:??JsonFileItemWriter??

@Beanpublic JsonFileItemWriter<Trade> jsonFileItemWriter() { return new JsonFileItemWriterBuilder<Trade>() .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>()) .resource(new ClassPathResource(“trades.json”)) .name(“tradeJsonFileItemWriter”) .build();}多文件输入

在单个 .假设 文件都具有相同的格式,支持这种类型的 XML 和平面文件处理的输入。考虑目录中的以下文件:??Step????MultiResourceItemReader??

file-1.txt file-2.txt ignored.txt

文件-1.txt 和文件-2.txt 的格式相同,出于业务原因,应为 一起处理。可用于读取两个文件: 使用通配符。??MultiResourceItemReader??

下面的示例演示如何在 XML 中读取带有通配符的文件:

XML 配置

<bean id=”multiResourceReader” class=”org.spr…MultiResourceItemReader”> <property name=”resources” value=”classpath:data/input/file-*.txt” /> <property name=”delegate” ref=”flatFileItemReader” /></bean>

以下示例演示如何在 Java 中读取带有通配符的文件:

爪哇配置

@Beanpublic MultiResourceItemReader multiResourceReader() { return new MultiResourceItemReaderBuilder<Foo>() .delegate(flatFileItemReader()) .resources(resources()) .build();}

引用的委托是一个简单的 .上面的配置如下: 来自两个文件的输入,处理回滚和重新启动方案。应该指出的是, 与任何 一样,添加额外的输入(在本例中为文件)可能会导致潜在的 重新启动时出现问题。建议批处理作业与自己的个人一起工作 目录,直到成功完成。??FlatFileItemReader????ItemReader??

输入资源按 using 排序,以确保在重新启动方案中的作业运行之间保留资源顺序。??MultiResourceItemReader#setComparator(Comparator)??

数据库

与大多数企业应用程序样式一样,数据库是 批。然而,批次与其他应用风格不同,因为 系统必须使用的数据集。如果 SQL 语句返回 100 万行,则 结果集可能会将所有返回的结果保存在内存中,直到读取所有行。 Spring Batch 针对此问题提供了两种类型的解决方案:

??基于游标的ItemReader实现????分页项目读取器实现??基于游标的实现??ItemReader??

使用数据库游标通常是大多数批处理开发人员的默认方法, 因为它是数据库对“流式传输”关系数据问题的解决方案。这 Java类本质上是一种面向对象的机制,用于操作 光标。A 维护指向当前数据行的游标。调用 会将此光标移动到下一行。Spring Batch 基于游标的实现在初始化时打开一个游标,并将游标向前移动一行 每次调用 ,返回可用于处理的映射对象。然后调用该方法以确保释放所有资源。Spring 核心通过使用回调模式完全映射来解决这个问题 A 中的所有行,并在将控制权返回给方法调用方之前关闭。 但是,在批处理中,这必须等到步骤完成。下图显示了 基于游标的工作原理的通用图。请注意,虽然示例 使用SQL(因为SQL广为人知),任何技术都可以实现基本的 方法。??ResultSet????ResultSet????next????ResultSet????ItemReader????read????close????JdbcTemplate????ResultSet????ItemReader??

图3.游标示例

此示例说明了基本模式。给定一个包含三列的“FOO”表:、 和 ,选择ID 大于 1 但小于 7 的所有行。这 将光标的开头(第 1 行)放在 ID 2 上。此行的结果应为 完全映射的对象。再次调用会将光标移动到下一行, ID 为 3。这些读取的结果在每次之后写出,允许对对象进行垃圾回收(假设没有实例变量 维护对它们的引用)。??ID????NAME????BAR????Foo????read()????Foo????read??

??JdbcCursorItemReader??

??JdbcCursorItemReader??是基于游标技术的 JDBC 实现。它有效 直接使用 a 并需要 SQL 语句才能针对连接运行 从 .以下数据库架构用作示例:??ResultSet????DataSource??

CREATE TABLE CUSTOMER ( ID BIGINT IDENTITY PRIMARY KEY, NAME VARCHAR(45), CREDIT FLOAT);

许多人喜欢对每一行使用域对象,因此以下示例使用 映射对象的接口的实现:??RowMapper????CustomerCredit??

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> { public static final String ID_COLUMN = “id”; public static final String NAME_COLUMN = “name”; public static final String CREDIT_COLUMN = “credit”; public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException { CustomerCredit customerCredit = new CustomerCredit(); customerCredit.setId(rs.getInt(ID_COLUMN)); customerCredit.setName(rs.getString(NAME_COLUMN)); customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN)); return customerCredit; }}

由于与 共享密钥接口,因此 请参阅如何使用 读取此数据的示例,以便对其进行对比 与 .出于此示例的目的,假设 中有 1,000 行 数据库。第一个示例使用:??JdbcCursorItemReader????JdbcTemplate????JdbcTemplate????ItemReader????CUSTOMER????JdbcTemplate??

//For simplicity sake, assume a dataSource has already been obtainedJdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);List customerCredits = jdbcTemplate.query(“SELECT ID, NAME, CREDIT from CUSTOMER”, new CustomerCreditRowMapper());

运行上述代码片段后,该列表包含 1,000 个对象。在查询方法中,从 获取连接,针对它运行提供的 SQL,并调用该方法 .将此方法与以下示例中所示的方法进行对比:??customerCredits????CustomerCredit????DataSource????mapRow????ResultSet????JdbcCursorItemReader??

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();itemReader.setDataSource(dataSource);itemReader.setSql(“SELECT ID, NAME, CREDIT from CUSTOMER”);itemReader.setRowMapper(new CustomerCreditRowMapper());int counter = 0;ExecutionContext executionContext = new ExecutionContext();itemReader.open(executionContext);Object customerCredit = new Object();while(customerCredit != null){ customerCredit = itemReader.read(); counter++;}itemReader.close();

运行上述代码段后,计数器等于 1,000。如果上面的代码有 将返回的放入列表中,结果将完全是 与示例相同。但是,它的最大优点是它允许“流式传输”项目。该方法可以调用一次,该项 可以通过 写出,然后可以使用 获得下一项。这允许在“块”中完成项目读取和写入并提交 周期性,这是高性能批处理的本质。此外,它 可轻松配置为注入弹簧批次。??customerCredit????JdbcTemplate????ItemReader????read????ItemWriter????read????Step??

下面的示例演示如何在 XML 中注入 :??ItemReader????Step??

XML 配置

<bean id=”itemReader” class=”org.spr…JdbcCursorItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”sql” value=”select ID, NAME, CREDIT from CUSTOMER”/> <property name=”rowMapper”> <bean class=”org.springframework.batch.sample.domain.CustomerCreditRowMapper”/> </property></bean>

以下示例显示了如何在 Java 中将 a 注入到中:??ItemReader????Step??

爪哇配置

@Beanpublic JdbcCursorItemReader<CustomerCredit> itemReader() { return new JdbcCursorItemReaderBuilder<CustomerCredit>() .dataSource(this.dataSource) .name(“creditReader”) .sql(“select ID, NAME, CREDIT from CUSTOMER”) .rowMapper(new CustomerCreditRowMapper()) .build();}其他属性

因为在 Java 中打开游标有很多不同的选项,所以有很多 可以设置的属性,如下所述 桌子:??JdbcCursorItemReader??

表 2.JdbcCursorItemReader Properties

忽略警告

确定是记录 SQLWarnings 还是导致异常。 默认值为(表示记录警告)。??true??

获取大小

为 JDBC 驱动程序提供有关应读取的行数的提示 当 使用的对象需要更多行时,从数据库。默认情况下,不会给出任何提示。??ResultSet????ItemReader??

最大行数

设置基础可以的最大行数的限制 随时保持。??ResultSet??

查询超时

将驱动程序等待对象的秒数设置为 跑。如果超出限制,则抛出 a。(请咨询您的司机 供应商文档了解详细信息)。??Statement????DataAccessException??

验证光标位置

因为由持有的相同被传递给 ,用户可以称呼自己,这 可能会导致读取器的内部计数出现问题。将此值设置为原因 如果调用后的光标位置与之前不同,则会引发异常。??ResultSet????ItemReader????RowMapper????ResultSet.next()????true????RowMapper??

保存状态

指示是否应将读取器的状态保存在 提供的 中。缺省值为 。??ExecutionContext????ItemStream#update(ExecutionContext)????true??

驱动程序支持绝对

指示 JDBC 驱动程序是否支持 在 上设置绝对行。建议将其设置为支持 的 JDBC 驱动程序,因为它可能会提高性能, 特别是当一个步骤在处理大型数据集时失败时。默认值为 。??ResultSet????true????ResultSet.absolute()????false??

设置使用共享扩展连接

指示连接是否 用于游标应由所有其他处理使用,因此共享相同的 交易。如果设置为 ,则使用自己的连接打开光标 并且不参与为其余步骤处理而启动的任何事务。 如果将此标志设置为 则必须将数据源包装在 中,以防止连接被关闭和 每次提交后释放。将此选项设置为 时,语句用于 打开 使用“READ_ONLY”和“HOLD_CURSORS_OVER_COMMIT”选项创建光标。 这允许在事务启动时保持游标打开并在 步骤处理。要使用此功能,您需要一个支持此功能的数据库和一个 JDBC 支持 JDBC 3.0 或更高版本的驱动程序。默认值为 。??false????true????ExtendedConnectionDataSourceProxy????true????false??

??HibernateCursorItemReader??

就像普通的Spring用户对是否使用ORM做出重要决定一样 解决方案,这会影响他们是否使用 A 或 A ,Spring Batch 用户具有相同的选项。 是游标技术的休眠实现。 Hibernate在批处理中的使用一直存在相当大的争议。这主要是因为 Hibernate最初是为了支持在线应用程序样式而开发的。然而,那 并不意味着它不能用于批处理。最简单的解决方法 此问题是使用 A 而不是标准会话。这将删除 Hibernate使用的所有缓存和脏检查都可能导致问题 批处理方案。有关无状态和正常状态之间区别的更多信息 休眠会话,请参阅特定休眠版本的文档。允许您声明一个 HQL 语句并传入一个 ,这将在每次调用中传回一个项目以在相同的基本中读取 时尚作为.以下示例配置使用相同的 作为 JDBC 读取器的“客户信用”示例:??JdbcTemplate????HibernateTemplate????HibernateCursorItemReader????StatelessSession????HibernateCursorItemReader????SessionFactory????JdbcCursorItemReader??

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();itemReader.setQueryString(“from CustomerCredit”);//For simplicity sake, assume sessionFactory already obtained.itemReader.setSessionFactory(sessionFactory);itemReader.setUseStatelessSession(true);int counter = 0;ExecutionContext executionContext = new ExecutionContext();itemReader.open(executionContext);Object customerCredit = new Object();while(customerCredit != null){ customerCredit = itemReader.read(); counter++;}itemReader.close();

此配置以完全相同的方式返回对象 如 中所述,假设休眠映射文件已 为表正确创建。“useStatelessSession”属性默认值 为 true,但此处已添加以提请注意打开或关闭它的能力。 还值得注意的是,可以使用该属性设置基础游标的读取大小。与 一样,配置为 简单。??ItemReader????CustomerCredit????JdbcCursorItemReader????Customer????setFetchSize????JdbcCursorItemReader??

下面的示例演示如何在 XML 中注入休眠:??ItemReader??

XML 配置

<bean id=”itemReader” class=”org.springframework.batch.item.database.HibernateCursorItemReader”> <property name=”sessionFactory” ref=”sessionFactory” /> <property name=”queryString” value=”from CustomerCredit” /></bean>

以下示例显示了如何在 Java 中注入休眠:??ItemReader??

爪哇配置

@Beanpublic HibernateCursorItemReader itemReader(SessionFactory sessionFactory) { return new HibernateCursorItemReaderBuilder<CustomerCredit>() .name(“creditReader”) .sessionFactory(sessionFactory) .queryString(“from CustomerCredit”) .build();}??StoredProcedureItemReader??

有时需要使用存储过程获取游标数据。作品喜欢 ,除了,相反 在运行查询以获取游标时,它运行返回游标的存储过程。 存储过程可以通过三种不同的方式返回游标:??StoredProcedureItemReader????JdbcCursorItemReader??

作为返回(由SQL Server,Sybase,DB2,Derby和MySQL使用)。ResultSet作为 ref-cursor 作为 out 参数返回(由 Oracle 和 PostgreSQL 使用)。作为存储函数调用的返回值。

以下 XML 示例配置使用与前面相同的“客户信用”示例 例子:

XML 配置

<bean id=”reader” class=”o.s.batch.item.database.StoredProcedureItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”procedureName” value=”sp_customer_credit”/> <property name=”rowMapper”> <bean class=”org.springframework.batch.sample.domain.CustomerCreditRowMapper”/> </property></bean>

以下 Java 示例配置使用与 早期的例子:

爪哇配置

@Beanpublic StoredProcedureItemReader reader(DataSource dataSource) { StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName(“sp_customer_credit”); reader.setRowMapper(new CustomerCreditRowMapper()); return reader;}

前面的示例依赖于存储过程提供 作为 返回的结果(前面的选项 1)。??ResultSet??

如果存储过程返回 (选项 2),那么我们需要提供 返回的 out 参数的位置。??ref-cursor????ref-cursor??

下面的示例演示如何使用第一个参数作为 ref 游标 .XML:

XML 配置

<bean id=”reader” class=”o.s.batch.item.database.StoredProcedureItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”procedureName” value=”sp_customer_credit”/> <property name=”refCursorPosition” value=”1″/> <property name=”rowMapper”> <bean class=”org.springframework.batch.sample.domain.CustomerCreditRowMapper”/> </property></bean>

下面的示例演示如何使用第一个参数作为 ref 游标 爪哇岛:

爪哇配置

@Beanpublic StoredProcedureItemReader reader(DataSource dataSource) { StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName(“sp_customer_credit”); reader.setRowMapper(new CustomerCreditRowMapper()); reader.setRefCursorPosition(1); return reader;}

如果游标是从存储函数返回的(选项 3),我们需要设置 属性“函数”到 。默认为 .??true????false??

下面的示例演示 XML 中的属性:??true??

XML 配置

<bean id=”reader” class=”o.s.batch.item.database.StoredProcedureItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”procedureName” value=”sp_customer_credit”/> <property name=”function” value=”true”/> <property name=”rowMapper”> <bean class=”org.springframework.batch.sample.domain.CustomerCreditRowMapper”/> </property></bean>

以下示例显示了 Java 中的属性:??true??

爪哇配置

@Beanpublic StoredProcedureItemReader reader(DataSource dataSource) { StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName(“sp_customer_credit”); reader.setRowMapper(new CustomerCreditRowMapper()); reader.setFunction(true); return reader;}

在所有这些情况下,我们需要定义 a 以及 a 和 实际过程名称。??RowMapper????DataSource??

如果存储过程或函数接受参数,则必须声明它们并 使用该属性设置。以下示例(对于 Oracle)声明了三个 参数。第一个是返回 ref 光标的参数,以及 第二个和第三个位于采用类型值的参数中。??parameters????out????INTEGER??

下面的示例演示如何使用 XML 中的参数:

XML 配置

<bean id=”reader” class=”o.s.batch.item.database.StoredProcedureItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”procedureName” value=”spring.cursor_func”/> <property name=”parameters”> <list> <bean class=”org.springframework.jdbc.core.SqlOutParameter”> <constructor-arg index=”0″ value=”newid”/> <constructor-arg index=”1″> <util:constant static-field=”oracle.jdbc.OracleTypes.CURSOR”/> </constructor-arg> </bean> <bean class=”org.springframework.jdbc.core.SqlParameter”> <constructor-arg index=”0″ value=”amount”/> <constructor-arg index=”1″> <util:constant static-field=”java.sql.Types.INTEGER”/> </constructor-arg> </bean> <bean class=”org.springframework.jdbc.core.SqlParameter”> <constructor-arg index=”0″ value=”custid”/> <constructor-arg index=”1″> <util:constant static-field=”java.sql.Types.INTEGER”/> </constructor-arg> </bean> </list> </property> <property name=”refCursorPosition” value=”1″/> <property name=”rowMapper” ref=”rowMapper”/> <property name=”preparedStatementSetter” ref=”parameterSetter”/></bean>

以下示例显示了如何在 Java 中使用参数:

爪哇配置

@Beanpublic StoredProcedureItemReader reader(DataSource dataSource) { List<SqlParameter> parameters = new ArrayList<>(); parameters.add(new SqlOutParameter(“newId”, OracleTypes.CURSOR)); parameters.add(new SqlParameter(“amount”, Types.INTEGER); parameters.add(new SqlParameter(“custId”, Types.INTEGER); StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName(“spring.cursor_func”); reader.setParameters(parameters); reader.setRefCursorPosition(1); reader.setRowMapper(rowMapper()); reader.setPreparedStatementSetter(parameterSetter()); return reader;}

除了参数声明之外,我们还需要指定一个为调用设置参数值的实现。这与 以上。其他属性中列出的所有其他??属性??也适用于 。??PreparedStatementSetter????JdbcCursorItemReader????StoredProcedureItemReader??

分页实现??ItemReader??

使用数据库游标的替代方法是运行多个查询,其中每个查询 获取部分结果。我们将此部分称为页面。每个查询必须 指定起始行号和我们希望在页面中返回的行数。

??JdbcPagingItemReader??

分页的一种实现是 .需要负责提供 SQL 用于检索构成页面的行的查询。由于每个数据库都有自己的数据库 为了提供分页支持,我们需要对每种受支持的数据库类型使用不同的。还有自动检测正在使用的数据库并确定适当的实现。这简化了配置,并且是 建议的最佳做法。??ItemReader????JdbcPagingItemReader????JdbcPagingItemReader????PagingQueryProvider????PagingQueryProvider????SqlPagingQueryProviderFactoryBean????PagingQueryProvider??

要求您指定子句和子句。您还可以提供可选子句。这些条款和 必需 用于构建 SQL 语句。??SqlPagingQueryProviderFactoryBean????select????from????where????sortKey??

重要的是要有一个唯一的键约束,以保证 执行之间不会丢失任何数据。??sortKey??

打开读取器后,它会在每次调用中传回一个项目 基本时尚和其他任何.寻呼发生在幕后,当 需要额外的行。??read????ItemReader??

以下 XML 示例配置使用与 前面显示的基于游标的:??ItemReaders??

XML 配置

<bean id=”itemReader” class=”org.spr…JdbcPagingItemReader”> <property name=”dataSource” ref=”dataSource”/> <property name=”queryProvider”> <bean class=”org.spr…SqlPagingQueryProviderFactoryBean”> <property name=”selectClause” value=”select id, name, credit”/> <property name=”fromClause” value=”from customer”/> <property name=”whereClause” value=”where status=:status”/> <property name=”sortKey” value=”id”/> </bean> </property> <property name=”parameterValues”> <map> <entry key=”status” value=”NEW”/> </map> </property> <property name=”pageSize” value=”1000″/> <property name=”rowMapper” ref=”customerMapper”/></bean>

以下 Java 示例配置使用与 前面显示的基于游标的:??ItemReaders??

爪哇配置

@Beanpublic JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) { Map<String, Object> parameterValues = new HashMap<>(); parameterValues.put(“status”, “NEW”); return new JdbcPagingItemReaderBuilder<CustomerCredit>() .name(“creditReader”) .dataSource(dataSource) .queryProvider(queryProvider) .parameterValues(parameterValues) .rowMapper(customerCreditMapper()) .pageSize(1000) .build();}@Beanpublic SqlPagingQueryProviderFactoryBean queryProvider() { SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean(); provider.setSelectClause(“select id, name, credit”); provider.setFromClause(“from customer”); provider.setWhereClause(“where status=:status”); provider.setSortKey(“id”); return provider;}

此配置使用 , 返回对象 , 必须指定。“pageSize”属性确定读取的实体数 从数据库进行每次查询运行。??ItemReader????CustomerCredit????RowMapper??

“参数值”属性可用于指定 查询。如果在子句中使用命名参数,则每个条目的键应 匹配命名参数的名称。如果使用传统的“?”占位符,则 每个条目的键应为占位符的编号,从 1 开始。??Map????where??

??JpaPagingItemReader??

分页的另一个实现是 .JPA做 没有类似于冬眠的概念,所以我们不得不使用其他 JPA 规范提供的功能。由于 JPA 支持分页,因此这是很自然的 选择使用 JPA 进行批处理。阅读每页后, 实体变得分离并清除持久性上下文,以允许实体 处理页面后进行垃圾回收。??ItemReader????JpaPagingItemReader????StatelessSession??

允许您声明 JPQL 语句并传入 .然后,它每次调用传回一个项目以读取相同的基本 时尚和其他任何.寻呼发生在幕后,当附加 需要实体。??JpaPagingItemReader????EntityManagerFactory????ItemReader??

以下 XML 示例配置使用与 前面显示的 JDBC 读取器:

XML 配置

<bean id=”itemReader” class=”org.spr…JpaPagingItemReader”> <property name=”entityManagerFactory” ref=”entityManagerFactory”/> <property name=”queryString” value=”select c from CustomerCredit c”/> <property name=”pageSize” value=”1000″/></bean>

以下 Java 示例配置使用与 前面显示的 JDBC 读取器:

爪哇配置

@Beanpublic JpaPagingItemReader itemReader() { return new JpaPagingItemReaderBuilder<CustomerCredit>() .name(“creditReader”) .entityManagerFactory(entityManagerFactory()) .queryString(“select c from CustomerCredit c”) .pageSize(1000) .build();}

此配置以与 上述描述,假设对象具有 更正 JPA 注释或 ORM 映射文件。“页面大小”属性确定 每次查询执行从数据库中读取的实体数。??ItemReader????CustomerCredit????JdbcPagingItemReader????CustomerCredit??

数据库项编写器

虽然平面文件和 XML 文件都有特定的实例,但没有完全相同的等效实例 在数据库世界中。这是因为事务提供了所有需要的功能。 实现对于文件是必需的,因为它们必须像事务性一样运行, 跟踪书面项目并在适当的时间刷新或清除。 数据库不需要此功能,因为写入已包含在 交易。用户可以创建自己的DAO来实现接口或 使用为一般处理问题编写的自定义中的一个。也 方式,他们应该没有任何问题地工作。需要注意的一件事是性能 以及通过批处理输出提供的错误处理功能。这是最 使用休眠作为 时很常见,但在使用 JDBC 批处理模式。批处理数据库输出没有任何固有缺陷,假设我们 小心刷新,数据中没有错误。但是,任何错误 写作会引起混乱,因为无法知道哪个单独的项目导致了 例外情况,甚至任何单个项目负责,如 下图:??ItemWriter????ItemWriter????ItemWriter????ItemWriter????ItemWriter??

图4.刷新时出错

如果在写入之前缓冲了项目,则在缓冲区 在提交之前刷新。例如,假设每个块写入 20 个项目, 第 15 项抛出 .就 而言,所有 20 个项目都写得成功,因为没有办法知道 在实际写入之前会发生错误。一旦被调用, 清空缓冲区并命中异常。在这一点上,无能为力。必须回滚事务。通常,此异常可能会导致 要跳过的项目(取决于跳过/重试策略),然后不写入 再。但是,在批处理方案中,无法知道哪个项目导致了 问题。发生故障时,正在写入整个缓冲区。唯一的出路 解决此问题是在每个项目后刷新,如下图所示:??DataIntegrityViolationException????Step????Session#flush()????Step??

图5.写入时出错

这是一个常见的用例,尤其是在使用 Hibernate 时,以及 的实现 要在每次调用 时刷新。这样做允许 对于要可靠地跳过的项目,Spring Batch 内部负责 错误后调用的粒度。??ItemWriter????write()????ItemWriter??

重用现有服务

批处理系统通常与其他应用程序样式结合使用。最 常见的是在线系统,但它也可能支持集成甚至胖客户端 通过移动每种应用程序样式使用的必要批量数据来应用。为此 原因,许多用户希望重用现有的DAO或其他服务是很常见的。 他们的批处理作业。Spring 容器本身通过允许任何 要注入的必要类。但是,在某些情况下,现有服务 需要充当 or ,以满足 另一个春季批次类,或者因为它确实是一个步骤的主要内容。是的 为每个需要包装的服务编写适配器类相当简单,但是 因为它是一个常见的问题,Spring Batch 提供了实现:和 .两个类都实现了标准的弹簧 方法通过调用委托模式,并且设置起来相当简单。??ItemReader????ItemWriter????ItemReader????ItemReaderAdapter????ItemWriterAdapter??

以下 XML 示例使用 :??ItemReaderAdapter??

XML 配置

<bean id=”itemReader” class=”org.springframework.batch.item.adapter.ItemReaderAdapter”> <property name=”targetObject” ref=”fooService” /> <property name=”targetMethod” value=”generateFoo” /></bean><bean id=”fooService” class=”org.springframework.batch.item.sample.FooService” />

以下 Java 示例使用 :??ItemReaderAdapter??

爪哇配置

@Beanpublic ItemReaderAdapter itemReader() { ItemReaderAdapter reader = new ItemReaderAdapter(); reader.setTargetObject(fooService()); reader.setTargetMethod(“generateFoo”); return reader;}@Beanpublic FooService fooService() { return new FooService();}

需要注意的一点是,合同必须相同 作为合约:用尽时返回。否则,它将返回一个 .其他任何事情都会阻止框架知道处理何时应该结束, 导致无限循环或不正确的故障,具体取决于实现 的 .??targetMethod????read????null????Object????ItemWriter??

以下 XML 示例使用 :??ItemWriterAdapter??

XML 配置

<bean id=”itemWriter” class=”org.springframework.batch.item.adapter.ItemWriterAdapter”> <property name=”targetObject” ref=”fooService” /> <property name=”targetMethod” value=”processFoo” /></bean><bean id=”fooService” class=”org.springframework.batch.item.sample.FooService” />

以下 Java 示例使用 :??ItemWriterAdapter??

爪哇配置

@Beanpublic ItemWriterAdapter itemWriter() { ItemWriterAdapter writer = new ItemWriterAdapter(); writer.setTargetObject(fooService()); writer.setTargetMethod(“processFoo”); return writer;}@Beanpublic FooService fooService() { return new FooService();}防止状态持久性

默认情况下,所有 和 实现都存储其当前 提交之前的状态。但是,这可能并不总是 所需的行为。例如,许多开发人员选择制作他们的数据库读取器 通过使用过程指示器“可重新运行”。将额外列添加到输入数据中 指示是否已处理。当正在读取特定记录时(或 写入)处理的标志从 翻转到 。然后,SQL 语句可以 在子句中包含额外的语句,例如 , 从而确保在重新启动时仅返回未处理的记录。在 在这种情况下,最好不要存储任何状态,例如当前行号, 因为它在重新启动时无关紧要。因此,所有读者和作者都包括 “保存状态”属性。??ItemReader????ItemWriter????ExecutionContext????false????true????where????where PROCESSED_IND = false??

以下 Bean 定义显示了如何防止 XML 中的状态持久性:

XML 配置

<bean id=”playerSummarizationSource” class=”org.spr…JdbcCursorItemReader”> <property name=”dataSource” ref=”dataSource” /> <property name=”rowMapper”> <bean class=”org.springframework.batch.sample.PlayerSummaryMapper” /> </property> <property name=”saveState” value=”false” /> <property name=”sql”> <value> SELECT games.player_id, games.year_no, SUM(COMPLETES), SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD), SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS), SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD) from games, players where players.player_id = games.player_id group by games.player_id, games.year_no </value> </property></bean>

以下 Bean 定义显示了如何在 Java 中防止状态持久性:

爪哇配置

@Beanpublic JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) { return new JdbcCursorItemReaderBuilder<PlayerSummary>() .dataSource(dataSource) .rowMapper(new PlayerSummaryMapper()) .saveState(false) .sql(“SELECT games.player_id, games.year_no, SUM(COMPLETES),” + “SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),” + “SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),” + “SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)” + “from games, players where players.player_id =” + “games.player_id group by games.player_id, games.year_no”) .build();}

上面配置的不会在 for 中进行任何条目 它参与的任何处决。??ItemReader????ExecutionContext??

创建自定义条目读取器和条目编写器

到目前为止,本章已经讨论了春季阅读和写作的基本契约 批处理和一些用于执行此操作的常见实现。然而,这些都是公平的 通用,并且有许多潜在的方案可能无法被开箱即用的涵盖 实现。本节通过使用一个简单的示例来展示如何创建自定义和实现并正确实现其协定。还实现了,以说明如何使读者或 编写器可重新启动。??ItemReader????ItemWriter????ItemReader????ItemStream??

自定义示例??ItemReader??

出于此示例的目的,我们创建了一个简单的实现 从提供的列表读取。我们首先实现最基本的契约 ,方法,如下面的代码所示:??ItemReader????ItemReader????read??

public class CustomItemReader<T> implements ItemReader<T> { List<T> items; public CustomItemReader(List<T> items) { this.items = items; } public T read() throws Exception, UnexpectedInputException, NonTransientResourceException, ParseException { if (!items.isEmpty()) { return items.remove(0); } return null; }}

前面的类获取项目列表,并一次返回一个项目,并删除每个 从列表中。当列表为空时,它返回 ,从而满足最基本的 的要求,如以下测试代码所示:??null????ItemReader??

List<String> items = new ArrayList<>();items.add(“1”);items.add(“2”);items.add(“3”);ItemReader itemReader = new CustomItemReader<>(items);assertEquals(“1”, itemReader.read());assertEquals(“2”, itemReader.read());assertEquals(“3”, itemReader.read());assertNull(itemReader.read());使??ItemReader??

最后一个挑战是使可重新启动。目前,如果处理 中断并重新开始,必须从头开始。这是 实际上在许多情况下都有效,但有时最好是批处理作业 从中断的地方重新启动。关键的判别因素通常是读者是否是有状态的 或无国籍。无状态读取器无需担心可重启性,但 有状态的必须在重新启动时尝试重建其最后一个已知状态。出于这个原因, 我们建议您尽可能保持自定义读取器无状态,因此您不必担心 关于可重启性。??ItemReader????ItemReader??

如果确实需要存储状态,则应使用该接口:??ItemStream??

public class CustomItemReader<T> implements ItemReader<T>, ItemStream { List<T> items; int currentIndex = 0; private static final String CURRENT_INDEX = “current.index”; public CustomItemReader(List<T> items) { this.items = items; } public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if (currentIndex < items.size()) { return items.get(currentIndex++); } return null; } public void open(ExecutionContext executionContext) throws ItemStreamException { if (executionContext.containsKey(CURRENT_INDEX)) { currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue(); } else { currentIndex = 0; } } public void update(ExecutionContext executionContext) throws ItemStreamException { executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue()); } public void close() throws ItemStreamException {}}

在每次调用该方法时,的当前索引都存储在提供的键“current.index”中。调用该方法时,将检查 以查看它是否 包含具有该键的条目。如果找到该键,则当前索引将移动到 那个位置。这是一个相当微不足道的例子,但它仍然符合总合同:??ItemStream????update????ItemReader????ExecutionContext????ItemStream????open????ExecutionContext??

ExecutionContext executionContext = new ExecutionContext();((ItemStream)itemReader).open(executionContext);assertEquals(“1”, itemReader.read());((ItemStream)itemReader).update(executionContext);List<String> items = new ArrayList<>();items.add(“1”);items.add(“2”);items.add(“3”);itemReader = new CustomItemReader<>(items);((ItemStream)itemReader).open(executionContext);assertEquals(“2”, itemReader.read());

大多数都有更复杂的重启逻辑。例如,将上次处理的行的行 ID 存储在 光标。??ItemReaders????JdbcCursorItemReader??

还值得注意的是,其中使用的密钥不应该是 琐碎。那是因为相同的用于所有内部 一个。在大多数情况下,只需在键前面加上类名就足够了 以保证唯一性。但是,在极少数情况下,在同一步骤中使用两个相同类型的 (如果需要两个文件,则可能会发生这种情况) 输出),需要一个更独特的名称。出于这个原因,许多 Spring 批处理和实现都有一个属性,允许 键名称被覆盖。??ExecutionContext????ExecutionContext????ItemStreams????Step????ItemStream????ItemReader????ItemWriter????setName()??

自定义示例??ItemWriter??

实现自定义在许多方面与示例相似 以上,但在足够多的地方有所不同,以保证它自己的例子。但是,添加 可重启性本质上是相同的,因此此示例不涉及它。与示例一样,使用 a 以使示例保持简单 可能:??ItemWriter????ItemReader????ItemReader????List??

public class CustomItemWriter<T> implements ItemWriter<T> { List<T> output = TransactionAwareProxyFactory.createTransactionalList(); public void write(Chunk<? extends T> items) throws Exception { output.addAll(items); } public List<T> getOutput() { return output; }}使??ItemWriter??

为了使可重新启动,我们将遵循与 相同的过程,添加和实现接口以同步 执行上下文。在示例中,我们可能必须计算处理的项目数 并将其添加为页脚记录。如果我们需要这样做,我们可以在我们的中实现,以便从执行中重组计数器 上下文(如果流已重新打开)。??ItemWriter????ItemReader????ItemStream????ItemStream????ItemWriter??

在许多现实情况下,习俗也会委托给另一个作家,即 可重新启动(例如,写入文件时),否则它会写入 事务资源等不需要重新启动,因为它是无状态的。 当你有一个有状态的编写器时,你可能应该确保实现为 以及.还要记住,作家的客户需要知道 ,因此您可能需要在配置中将其注册为流。??ItemWriters????ItemStream????ItemWriter????ItemStream??

项目读取器和编写器实现

在本节中,我们将向您介绍尚未进入的读者和作家 在前面的部分中进行了讨论。

装饰

在某些情况下,用户需要将专用行为附加到预先存在的 .Spring Batch 提供了一些开箱即用的装饰器,可以添加 对你和实现的其他行为。??ItemReader????ItemReader????ItemWriter??

春季批次包括以下装饰器:

SynchronizedItemStreamReaderSingleItemPeekableItemReaderSynchronizedItemStreamWriterMultiResourceItemWriterClassifierCompositeItemWriterClassifierCompositeItemProcessor??SynchronizedItemStreamReader??

当使用非线程安全时,Spring Batch 提供了装饰器,可用于使线程安全。Spring Batch 提供了一个 的实例。??ItemReader????SynchronizedItemStreamReader????ItemReader????SynchronizedItemStreamReaderBuilder????SynchronizedItemStreamReader??

??SingleItemPeekableItemReader??

Spring Batch 包含一个装饰器,用于向 .此一瞥 方法允许用户提前查看一个项目。重复调用速览返回相同的 项,这是从该方法返回的下一项。Spring Batch 提供了一个构造 .??ItemReader????read????SingleItemPeekableItemReaderBuilder????SingleItemPeekableItemReader??

SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为它不会 可以在多个线程中遵循窥视。只有一个线程偷看 将在下一次调用中读取该项目。

??SynchronizedItemStreamWriter??

当使用非线程安全时,Spring Batch 提供了装饰器,可用于使线程安全。Spring Batch 提供了一个 的实例。??ItemWriter????SynchronizedItemStreamWriter????ItemWriter????SynchronizedItemStreamWriterBuilder????SynchronizedItemStreamWriter??

??MultiResourceItemWriter??

换行并创建一个新的 当前资源中写入的项目数超过 .春季批次提供了一个 构造 的实例。??MultiResourceItemWriter????ResourceAwareItemWriterItemStream????itemCountLimitPerResource????MultiResourceItemWriterBuilder????MultiResourceItemWriter??

??ClassifierCompositeItemWriter??

调用每个项目的实现集合之一,基于通过提供的 .如果所有委托都是线程安全的,则实现是线程安全的。春天 批处理提供了一个用于构造 .??ClassifierCompositeItemWriter????ItemWriter????Classifier????ClassifierCompositeItemWriterBuilder????ClassifierCompositeItemWriter??

??ClassifierCompositeItemProcessor??

是一个调用一个 基于实现的路由器模式的实现集合 通过提供的.Spring Batch 提供了一个构造 .??ClassifierCompositeItemProcessor????ItemProcessor????ItemProcessor????Classifier????ClassifierCompositeItemProcessorBuilder????ClassifierCompositeItemProcessor??

消息传递阅读器和写入器

Spring Batch 为常用的消息传递系统提供以下阅读器和写入器:

AmqpItemReaderAmqpItemWriterJmsItemReaderJmsItemWriterKafkaItemReaderKafkaItemWriter??AmqpItemReader??

是使用 接收或转换 来自交易所的消息。Spring Batch 提供了一个 的实例。??AmqpItemReader????ItemReader????AmqpTemplate????AmqpItemReaderBuilder????AmqpItemReader??

??AmqpItemWriter??

是使用 将消息发送到 AMQP交易所。如果名称未在 提供的 .春季批处理提供了一个 构造 的实例。??AmqpItemWriter????ItemWriter????AmqpTemplate????AmqpTemplate????AmqpItemWriterBuilder????AmqpItemWriter??

??JmsItemReader??

对于使用 .模板 应具有默认目标,用于为方法提供项。Spring Batch 提供了一个构造 .??JmsItemReader????ItemReader????JmsTemplate????read()????JmsItemReaderBuilder????JmsItemReader??

??JmsItemWriter??

对于使用 .模板 应具有默认目标,用于在 中发送项目。春天 批处理提供了一个用于构造 .??JmsItemWriter????ItemWriter????JmsTemplate????write(List)????JmsItemWriterBuilder????JmsItemWriter??

??KafkaItemReader??

这是一个Apache Kafka主题。可以配置 从同一主题的多个分区读取消息。它存储消息偏移 在执行上下文中支持重启功能。Spring Batch 提供了一个构造 .??KafkaItemReader????ItemReader????KafkaItemReaderBuilder????KafkaItemReader??

??KafkaItemWriter??

对于 Apache Kafka,它使用 a 到 将事件发送到默认主题。春季批次提供了一个 构造 的实例。??KafkaItemWriter????ItemWriter????KafkaTemplate????KafkaItemWriterBuilder????KafkaItemWriter??

数据库读取器

Spring Batch提供以下数据库阅读器:

Neo4jItemReaderMongoItemReaderHibernateCursorItemReaderHibernatePagingItemReaderRepositoryItemReader??Neo4jItemReader??

这是一个从图形数据库 Neo4j 读取对象的 通过使用分页技术。春季批次提供了一个 构造 的实例。??Neo4jItemReader????ItemReader????Neo4jItemReaderBuilder????Neo4jItemReader??

??MongoItemReader??

这是一个通过使用 MongoDB 从 MongoDB 读取文档 分页技术。Spring Batch 提供了一个来构造一个 的实例。??MongoItemReader????ItemReader????MongoItemReaderBuilder????MongoItemReader??

??HibernateCursorItemReader??

用于读取数据库记录 建立在冬眠之上。它执行 HQL 查询,然后在初始化时迭代 在调用方法时对结果集进行,连续返回一个对象 对应于当前行。Spring Batch 提供了一个构造 .??HibernateCursorItemReader????ItemStreamReader????read()????HibernateCursorItemReaderBuilder????HibernateCursorItemReader??

??HibernatePagingItemReader??

是用于读取数据库记录构建在 Hibernate顶部,一次最多只能读取固定数量的项目。春季批次 提供了一个用于构造 的实例。??HibernatePagingItemReader????ItemReader????HibernatePagingItemReaderBuilder????HibernatePagingItemReader??

??RepositoryItemReader??

是使用 .春季批次提供了一个 构造 的实例。??RepositoryItemReader????ItemReader????PagingAndSortingRepository????RepositoryItemReaderBuilder????RepositoryItemReader??

数据库编写器

Spring Batch提供以下数据库编写器:

Neo4jItemWriterMongoItemWriterRepositoryItemWriterHibernateItemWriterJdbcBatchItemWriterJpaItemWriter??Neo4jItemWriter??

这是一个写入 Neo4j 数据库的实现。 Spring Batch 提供了一个构造 .??Neo4jItemWriter????ItemWriter????Neo4jItemWriterBuilder????Neo4jItemWriter??

??MongoItemWriter??

这是一个写入MongoDB存储的实现 使用 Spring 数据的实现。Spring Batch 提供了一个构造 .??MongoItemWriter????ItemWriter????MongoOperations????MongoItemWriterBuilder????MongoItemWriter??

??RepositoryItemWriter??

这是来自春天的包装器 数据。Spring Batch 提供了一个构造实例 这。??RepositoryItemWriter????ItemWriter????CrudRepository????RepositoryItemWriterBuilder????RepositoryItemWriter??

??HibernateItemWriter??

是使用休眠会话来保存或 更新不属于当前休眠会话的实体。春季批次提供 a 来构造 .??HibernateItemWriter????ItemWriter????HibernateItemWriterBuilder????HibernateItemWriter??

??JdbcBatchItemWriter??

这是一个使用批处理功能,为提供的所有项目执行一批语句。 Spring Batch 提供了一个构造 .??JdbcBatchItemWriter????ItemWriter????NamedParameterJdbcTemplate????JdbcBatchItemWriterBuilder????JdbcBatchItemWriter??

??JpaItemWriter??

是使用 JPA 进行合并 不属于持久性上下文的任何实体。Spring Batch 提供了一个构造 .??JpaItemWriter????ItemWriter????EntityManagerFactory????JpaItemWriterBuilder????JpaItemWriter??

专业读者

Spring Batch提供以下专业阅读器:

LdifReaderMappingLdifReaderAvroItemReader??LdifReader??

从 、 中读取 LDIF(LDAP 数据交换格式)记录。 解析它们,并为每个执行的内容返回一个对象。春季批次 提供了一个用于构造 的实例。??LdifReader????Resource????LdapAttribute????read????LdifReaderBuilder????LdifReader??

??MappingLdifReader??

从 中读取 LDIF(LDAP 数据交换格式)记录,解析它们,然后将每个 LDIF 记录映射到 POJO(普通旧 Java 对象)。 每次读取都会返回一个 POJO。Spring Batch 提供了一个 的实例。??MappingLdifReader????Resource????MappingLdifReaderBuilder????MappingLdifReader??

??AvroItemReader??

从资源读取序列化的 Avro 数据。 每次读取都返回由 Java 类或 Avro 模式指定的类型的实例。 可以选择将读取器配置为嵌入或不嵌入 Avro 架构的输入。 Spring Batch 提供了一个用于构造 .??AvroItemReader????AvroItemReaderBuilder????AvroItemReader??

专业作家

Spring Batch提供以下专业作家:

SimpleMailMessageItemWriterAvroItemWriter??SimpleMailMessageItemWriter??

这是一个可以发送邮件消息。它 将消息的实际发送委托给 的实例。春季批次 提供了一个用于构造 的实例。??SimpleMailMessageItemWriter????ItemWriter????MailSender????SimpleMailMessageItemWriterBuilder????SimpleMailMessageItemWriter??

??AvroItemWriter??

根据给定的类型或模式将 Java 对象序列化为 WriteableResource。 可以选择将编写器配置为在输出中嵌入或不嵌入 Avro 架构。 Spring Batch 提供了一个用于构造 .??AvroItemWrite????AvroItemWriterBuilder????AvroItemWriter??

专用处理器

Spring Batch提供以下专用处理器:

ScriptItemProcessor??ScriptItemProcessor??

是将当前项目传递给处理的 到提供的脚本,脚本的结果由处理器返回。春天 批处理提供了一个用于构造 .??ScriptItemProcessor????ItemProcessor????ScriptItemProcessorBuilder????ScriptItemProcessor??

会让你的心态更平和更坦然,也会让你心无旁骛,更会让你的心灵得到解脱和抚慰。

Spring Batch -项目读写

相关文章:

你感兴趣的文章:

标签云: