Apache Cassandra 支持

Spring Integration 提供了通道适配器(从版本 6.0 开始),用于对 Apache Cassandra 集群执行数据库操作。 它完全基于Apache Cassandra项目的Spring Data。

您需要将此依赖项包含在项目中:

<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-cassandra</artifactId> <version>6.0.0</version></dependency>卡桑德拉出站组件

这是一个实现,可以在单向(默认)和请求-答复模式(选项)下工作。 默认情况下,它是异步的(重置),并对提供的 执行反应式 、 或操作。 可以通过该选项配置操作类型。 将模式设置为 ;或 、 或 将模式设置为 .??CassandraMessageHandler????AbstractReplyProducingMessageHandler????producesReply????setAsync(false)????INSERT????UPDATE????DELETE????STATEMENT????ReactiveCassandraOperations????CassandraMessageHandler.Type????ingestQuery????INSERT????query????statementExpression????statementProcessor????STATEMENT??

以下代码片段演示了此通道适配器或网关的各种配置:

@BeanIntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) { return flow -> flow .handle(Cassandra.outboundGateway(cassandraOperations) .query(“SELECT * FROM book WHERE author = :author limit :size”) .parameter(“author”, “payload”) .parameter(“size”, m -> m.getHeaders().get(“limit”))) .channel(c -> c.flux(“resultChannel”));}

如果在默认异步模式下将 a 用作网关,则生成 a,根据提供的实现进行处理。 对于真正的反应式处理,建议对输出通道配置使用。 在同步模式下调用以获取回复值。??CassandraMessageHandler????Mono<WriteResult>????MessageChannel????FluxMessageChannel????Mono.block()??

如果执行 或操作,则请求消息有效负载中应有一个实体(标记为 )。 如果有效负载是实体列表,则执行相应的批处理操作。??INSERT????UPDATE????DELETE????org.springframework.data.cassandra.core.mapping.Table??

该模式期望有效负载以要插入的值矩阵的形式存在 – 。 例如,如果实体如下所示:??ingestQuery????List<List<?>>??

@Table(“book”)public record Book(@PrimaryKey String isbn, String title, @Indexed String author, int pages, LocalDate saleDate, boolean isInStock) {}

通道适配器具有以下配置:

@Beanpublic MessageHandler cassandraMessageHandler3() { CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template); String cqlIngest = “insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)”; cassandraMessageHandler.setIngestQuery(cqlIngest); cassandraMessageHandler.setAsync(false); return cassandraMessageHandler;}

请求消息有效负载必须按如下方式转换:

List<List<Object>> ingestBooks = payload.stream() .map(book -> List.<Object>of( book.isbn(), book.title(), book.author(), book.pages(), book.saleDate(), book.isInStock())) .toList();

对于更复杂的用例,有效负载可以作为 的实例。 建议使用 API 构建各种语句来针对 Apache Cassandra 执行。 例如,要从表中删除所有数据,可以将具有此类有效负载的消息发送到 : 。 或者,对于基于请求消息的逻辑,可以为 提供 或 以构建基于该消息的 。 为方便起见,a 被注册为 SpEL 求值上下文中的 ,因此目标表达式可以像这样简单:??com.datastax.oss.driver.api.core.cql.Statement????com.datastax.oss.driver.api.querybuilder.QueryBuilder????Book????CassandraMessageHandler????QueryBuilder.truncate(“book”).build()????statementExpression????statementProcessor????CassandraMessageHandler????Statement????com.datastax.oss.driver.api.querybuilder????import??

statement-expression=”T(QueryBuilder).selectFrom(“book”).all()”

表示可绑定的命名查询参数,并且仅与选项一起使用。 请参阅上面提到的 Java 和 XML 示例。??setParameterExpressions(Map<String, Expression> parameterExpressions)????setQuery(String query)??

【本文由:武汉网站建设 wuhan.html 网络转载请说明出处】蚁穴虽小,溃之千里。

Apache Cassandra 支持

相关文章:

你感兴趣的文章:

标签云: