Spring Integration 对R2DBC 支持

Spring 集成提供了通道适配器,用于通过??R2DBC??驱动程序对数据库进行反应式访问来接收和发送消息。

您需要将此依赖项包含在项目中:

马文

格拉德尔

<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-r2dbc</artifactId> <version>6.0.0</version></dependency>R2DBC 入站通道适配器

这是一个基于 的可轮询实现,并根据选项生成带有 or 作为从数据库获取的数据的有效负载的消息。 要的查询可以静态提供,也可以基于每次调用时计算的 SpEL 表达式。 作为评估上下文的根对象存在,以允许使用流畅的 API。 默认情况下,此通道适配器将选择中的记录映射到实例中。 它可以定制,提供一个选项,该选项由基于. 这是可选的,用于标记数据库中的读取记录,以便从后续轮询中跳过。 该操作可以与 一起提供,以将值绑定到基于结果中的记录的 中。??R2dbcMessageSource????MessageSource????R2dbcEntityOperations????Flux????Mono????expectSingleResult????SELECT????receive()????R2dbcMessageSource.SelectCreator????StatementMapper.SelectSpec????LinkedCaseInsensitiveMap????payloadType????EntityRowMapper????this.r2dbcEntityOperations.getConverter()????updateSql????UPDATE????BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>????UPDATE????SELECT??

此通道适配器的典型配置可能如下所示:

@Bean@InboundChannelAdapter(“fromR2dbcChannel”)public R2dbcMessageSource r2dbcMessageSourceSelectMany() { R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate, “SELECT * FROM person WHERE name=’Name'”); r2dbcMessageSource.setPayloadType(Person.class); r2dbcMessageSource.setUpdateSql(“UPDATE Person SET name=’SomeOtherName’ WHERE id = :id”); r2dbcMessageSource.setBindFunction( (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind(“id”, o.getId()));} return r2dbcMessageSource;}

对于 Java DSL,此通道适配器的配置如下所示:

@BeanIntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) { return IntegrationFlow .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate, (selectCreator) -> selectCreator.createSelect(“person”) .withProjection(“*”) .withCriteria(Criteria.where(“id”).is(1))) .expectSingleResult(true) .payloadType(Person.class) .updateSql(“UPDATE Person SET id=’2′ where id = :id”) .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind(“id”, o.getId())), e -> e.poller(p -> p.fixedDelay(100))) .<Mono<?>>handle((p, h) -> p, e -> e.async(true)) .channel(MessageChannels.flux()) .get();}R2DBC 出站通道适配器

这是使用提供的 在数据库中执行(默认)或查询的实现。 可以静态配置,也可以通过针对请求消息的 SpEL 表达式进行配置。 要执行的查询可以基于 、 和表达式选项,或者(如果未提供)将整个消息有效负载视为要对其执行 SQL 的实体。 该包被注册为 SpEL 评估上下文的导入,以便直接访问用于和查询的流畅 API。 在 和 中使用,并且必须计算为 for 列值对,才能针对请求消息在目标表中执行更改。??R2dbcMessageHandler????ReactiveMessageHandler????INSERT????UPDATE????DELETE????R2dbcEntityOperations????R2dbcMessageHandler.Type????tableName????values????criteria????tableName????org.springframework.data.relational.core.mapping.Table????org.springframework.data.relational.core.query????Criteria????UPDATE????DELETE????valuesExpression????INSERT????UPDATE????Map??

此通道适配器的典型配置可能如下所示:

@Bean@ServiceActivator(inputChannel = “toR2dbcChannel”)public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) { R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate) messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload)); messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE); messageHandler.setCriteriaExpression( EXPRESSION_PARSER.parseExpression(“T(Criteria).where(‘id).is(headers.personId))); return messageHandler;}

对于 Java DSL,此通道适配器的配置如下所示:

.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) .queryType(R2dbcMessageHandler.Type.UPDATE) .tableNameExpression(“payload.class.simpleName”) .criteria((message) -> Criteria.where(“id”).is(message.getHeaders().get(“personId”))) .values(“{age:36}”)) 可是我知道结果是惨淡的,但还是心存希望!

Spring Integration 对R2DBC 支持

相关文章:

你感兴趣的文章:

标签云: