SpringCloud——消息驱动

Spring Cloud Stream使用场景:消息驱动的微服务应用目的:简化编码统一抽象(门面模式思想)基本概念Source: Stream发送源,类似Produer、PublisherSink:Stream接收器,类似Consumer、SubscriberProcessor:处理管道与RabbitMQ整合改造user-service-client 消息发送源(Source)

依赖

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>

定义UserMessage接口,用于输出

/** * 用户消息(输出) */public interface UserMessage { @Output(“user-message-out”) MessageChannel output();}

激活绑定

@EnableBinding(UserMessage.class)public class RibbonClientApplication {

修改配置文件

# Spring Cloud Stream Binding 配置# destination 指定 Rabbit MQ Topic:users# user-message-out 为输出管道名称spring.cloud.stream.bindings.user-message-out.destination = users

添加一个发送消息的Controller:

@RestControllerpublic class UserServiceClientController { @Autowired private UserMessage userMessage; @Autowired private ObjectMapper objectMapper; @PostMapping(“/user/save/message”) public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException { MessageChannel messageChannel = userMessage.output(); // User 序列化成 JSON String payload = objectMapper.writeValueAsString(user); GenericMessage<String> message = new GenericMessage<>(payload); // 发送消息 return messageChannel.send(message); }}User对象实现序列化接口

user-api项目

@Datapublic class User implements Serializable{ private static final long serialVersionUID = -3345217474278625920L; private String id; private String name;}改造user-service-provider消息接收器(Sink)

依赖

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>

定义用户接口

package com.learn.service.provider.stream;import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface UserMessage { @Input SubscribableChannel input();}

激活

@EnableBinding(UserMessage.class) //激活Stream Binding 到 UserMessagepublic class UserServiceProviderApplication {

修改配置文件

spring: application: name: user-service-provider cloud: stream: bindings: input: #@input注解的方法名称 destination: users # 和消息发送方保持一致

Bindings配置格式为:

Source: spring.cloud.stream.bindings.${source}.*Sink: spring.cloud.stream.bindings.${sink}.*

添加一个Service来消费这个消息

@Servicepublic class UserMessageService { @Autowired private UserMessage userMessage; @Autowired @Qualifier(“userServiceInMemory”) private UserService userService; @Autowired private ObjectMapper objectMapper; @PostConstruct public void init() { SubscribableChannel subscribableChannel = userMessage.input(); subscribableChannel.subscribe(message -> { String body = (String) message.getPayload(); System.out.println(“user:” + body); try { User user = objectMapper.readValue(body,User.class); userService.saveUser(user); } catch (IOException e) { e.printStackTrace(); } }); }}测试

通过POSTMAN 发送POST请求 到 ??http://localhost:6060/user-service-client/user/save/message??

传送的内容为:

{ “id”:”3″, “name”:”张家球”}

访问 ??http://localhost:6060/user-service-provider/user??

得到结果为:

[ { “id”: “2”, “name”: “王五” }, { “id”: “3”, “name”: “张家球” }]

访问RabbitMQ控制台可以看到有消息的起伏(多发送几次消息即可)

也有伤心的,既有令人兴奋的,也有令人灰心的,

SpringCloud——消息驱动

相关文章:

你感兴趣的文章:

标签云: