websocket springboot,实战spring-boot-starter-websocket之断网心跳续期实践
websocket springboot,实战spring-boot-starter-websocket之断网心跳续期实践详细介绍
本文目录一览: SpringBoot整合Websocket实现即时聊天功能
近期,公司需要新增即时聊天的业务,于是用websocket 整合到Springboot完成业务的实现。
一、我们来简单的介绍下websocket的交互原理:
1.客户端先服务端发起websocket请求;
2.服务端接收到请求之后,把请求响应返回给客户端;
3.客户端与服务端只需要一次握手即可完成交互通道;
? 二、webscoket支持的协议:基于TCP协议下,http协议和https协议;
? http协议 springboot不需要做任何的配置?
? https协议则需要配置nignx代理,注意证书有效的问题? ---在这不做详细说明
? 三、开始我们的实现java后端的实现
? 1.添加依赖
?
? ? ? ?
? ? ? ? ? ?
org.springframework.boot
? ? ? ? ? ?
spring-boot-starter-websocket
? ? ? ?
? ? ? ?
? ? ? ? ? ?
org.springframework
? ? ? ? ? ?
spring-websocket
? ? ? ? ? ?
${spring.version}
? ? ? ?
? ? ? ?
? ? ? ? ? ?
org.springframework
? ? ? ? ? ?
spring-messaging
? ? ? ? ? ?
${spring.version}
? ? ? ?
? ? ? ?
? 2.配置config
@ConditionalOnWebApplication
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer {
? ? @Bean
? ? public ServerEndpointExporter serverEndpointExporter(){
? ? ? ? return? new ServerEndpointExporter();
? ? }
? ? @Bean
? ? public CustomSpringConfigurator customSpringConfigurator() {
? ? ? ? return new CustomSpringConfigurator();
? ? }
? ? @Override
? ? protected void configureStompEndpoints(StompEndpointRegistry registry) {
? ? ? ? registry.addEndpoint("/websocket").setAllowedOrigins("*")
? ? ? ? ? ? ? ? .addInterceptors(new HttpSessionHandshakeInterceptor()).withSockJS();
? ? }
public class CustomSpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
? ? private static volatile BeanFactory context;
? ? @Override
? ? public
T getEndpointInstance(Class
clazz) throws InstantiationException {
? ? ? ? return context.getBean(clazz);
? ? }
? ? @Override
? ? public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
? ? ? ? CustomSpringConfigurator.context = applicationContext;
? ? }
? ? @Override
? ? public void modifyHandshake(ServerEndpointConfig sec,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? HandshakeRequest request, HandshakeResponse response) {
? ? ? ? super.modifyHandshake(sec,request,response);
? ? ? ? HttpSession httpSession=(HttpSession) request.getHttpSession();
? ? ? ? if(httpSession!=null){
? ? ? ? ? ? sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
? ? ? ? }
? ? }
}
@SpringBootApplication
@EnableCaching
@ComponentScan("com")
@EnableWebSocket
public class Application extends SpringBootServletInitializer {
static final Logger logger = LoggerFactory.getLogger(Application.class);
? ? @Override
? ? protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
? ? ? ? return application.sources(Application.class);
? ? }
需要注意的是: @EnableWebSocket? 一定要加在启动类上,不然springboot无法对其扫描进行管理;
@SeverEndpoint --将目标类定义成一个websocket服务端,注解对应的值将用于监听用户连接的终端访问地址,客户端可以通过URL来连接到websocket服务端。
四、设计思路:用map
来保存房间对应的用户连接列表,当有用户进入一个房间的时候,就会先检测房间是否存在,如果不存在那就新建一个空的用户set,再加入本身到这个set中,确保不同房间号里的用户session不串通!
/**
* Create by wushuyu
* on 2020/4/30 13:24
*
*/
@ServerEndpoint(value = "/websocket/{roomName}", configurator = CustomSpringConfigurator.class)
@Component
public class WebSocketRoom {
? ? //连接超时--一天
? ? private static final long MAX_TIME_OUT = 24*60*60*1000;
? ? // key为房间号,value为该房间号的用户session
? ? private static final Map
<string, set
> rooms = new ConcurrentHashMap<>();
? ? //将用户的信息存储在一个map集合里
? ? private static final Map
users = new ConcurrentHashMap<>();
/**
*{roomName} 使用通用跳转,实现动态获取房间号和用户信息? 格式:roomId|xx|xx
*/
? ? @OnOpen?
? ? public void connect(@PathParam("roomName") String roomName, Session session) {
? ? ? ? String roomId = roomName.split("[|]")[0];
? ? ? ? String nickname = roomName.split("[|]")[1];
? ? ? ? String loginId = roomName.split("[|]")[2];
? ? ? ? //设置连接超时时间
? ? ? ? ? ? session.setMaxIdleTimeout(MAX_TIME_OUT);
? ? ? ? try {
? ? ? ? ? //可实现业务逻辑
? ? ? ? ? ? }
? ? ? ? ? ? // 将session按照房间名来存储,将各个房间的用户隔离
? ? ? ? ? ? if (!rooms.containsKey(roomId)) {
? ? ? ? ? ? ? ? // 创建房间不存在时,创建房间
? ? ? ? ? ? ? ? Set
room = new HashSet<>();
? ? ? ? ? ? ? ? // 添加用户
? ? ? ? ? ? ? ? room.add(session);
? ? ? ? ? ? ? ? rooms.put(roomId, room);
? ? ? ? ? ? } else { // 房间已存在,直接添加用户到相应的房间? ? ? ? ? ? ?
? ? ? ? ? ? ? ? if (rooms.values().contains(session)) {//如果房间里有此session直接不做操作
? ? ? ? ? ? ? ? } else {//不存在则添加
? ? ? ? ? ? ? ? ? ? rooms.get(roomId).add(session);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? JSONObject jsonObject = new JSONObject();
? ? ? ? ? ? -----
? ? ? ? ? ? //根据自身业务情况实现业务
? ? ? ? ? ? -----
? ? ? ? ? ? users.put(session.getId(), jsonObject);
? ? ? ? ? ? //向在线的人发送当前在线的人的列表? ? -------------可有可无,看业务需求
? ? ? ? ? ? List
userList = new LinkedList<>();
? ? ? ? ? ? rooms.get(roomId)
? ? ? ? ? ? ? ? ? ? .stream()
? ? ? ? ? ? ? ? ? ? .map(Session::getId)
? ? ? ? ? ? ? ? ? ? .forEach(s -> {
? ? ? ? ? ? ? ? ? ? ? ? ChatMessage chatMessage = new ChatMessage();
? ? ? ? ? ? ? ? ? ? ? ? chatMessage.setDate(new Date());
? ? ? ? ? ? ? ? ? ? ? ? chatMessage.setStatus(1);
? ? ? ? ? ? ? ? ? ? ? ? chatMessage.setChatContent(users.get(s));
? ? ? ? ? ? ? ? ? ? ? ? chatMessage.setMessage("");
? ? ? ? ? ? ? ? ? ? ? ? userList.add(chatMessage);
? ? ? ? ? ? ? ? ? ? });
//? ? ? ? session.getBasicRemote().sendText(JSON.toJSONString(userList));
? ? ? ? ? ? //向房间的所有人群发谁上线了
? ? ? ? ? ? ChatMessage chatMessage = new ChatMessage();? ----将聊天信息封装起来。
? ? ? ? ? ? chatMessage.setDate(new Date());
? ? ? ? ? ? chatMessage.setStatus(1);
? ? ? ? ? ? chatMessage.setChatContent(users.get(session.getId()));
? ? ? ? ? ? chatMessage.setMessage("");
? ? ? ? ? ? broadcast(roomId, JSON.toJSONString(chatMessage));
? ? ? ? ? ? broadcast(roomId, JSON.toJSONString(userList));
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? @OnClose
? ? public void disConnect(@PathParam("roomName") String roomName, Session session) {
? ? ? ? String roomId = roomName.split("[|]")[0];
? ? ? ? String loginId = roomName.split("[|]")[2];
? ? ? ? try {
? ? ? ? ? ? rooms.get(roomId).remove(session);
? ? ? ? ? ? ChatMessage chatMessage = new ChatMessage();
? ? ? ? ? ? chatMessage.setDate(new Date());
? ? ? ? ? ? chatMessage.setUserName(user.getRealname());
? ? ? ? ? ? chatMessage.setStatus(0);
? ? ? ? ? ? chatMessage.setChatContent(users.get(session.getId()));
? ? ? ? ? ? chatMessage.setMessage("");
? ? ? ? ? ? users.remove(session.getId());
? ? ? ? ? ? //向在线的人发送当前在线的人的列表? ----可有可无,根据业务要求
? ? ? ? ? ? List
userList = new LinkedList<>();
? ? ? ? ? ? rooms.get(roomId)
? ? ? ? ? ? ? ? ? ? .stream()
? ? ? ? ? ? ? ? ? ? .map(Session::getId)
? ? ? ? ? ? ? ? ? ? .forEach(s -> {
? ? ? ? ? ? ? ? ? ? ? ? ChatMessage chatMessage1 = new ChatMessage();
? ? ? ? ? ? ? ? ? ? ? ? chatMessage1.setDate(new Date());
? ? ? ? ? ? ? ? ? ? ? ? chatMessage1.setUserName(user.getRealname());
? ? ? ? ? ? ? ? ? ? ? ? chatMessage1.setStatus(1);
? ? ? ? ? ? ? ? ? ? ? ? chatMessage1.setChatContent(users.get(s));
? ? ? ? ? ? ? ? ? ? ? ? chatMessage1.setMessage("");
? ? ? ? ? ? ? ? ? ? ? ? userList.add(chatMessage1);
? ? ? ? ? ? ? ? ? ? });
? ? ? ? ? ? broadcast(roomId, JSON.toJSONString(chatMessage));
? ? ? ? ? ? broadcast(roomId, JSON.toJSONString(userList));
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? @OnMessage
? ? public void receiveMsg( String msg, Session session) {
? ? ? ? try {
? ? ? ? ? ? ? ? ChatMessage chatMessage = new ChatMessage();
? ? ? ? ? ? ? ? chatMessage.setUserName(user.getRealname());
? ? ? ? ? ? ? ? chatMessage.setStatus(2);
? ? ? ? ? ? ? ? chatMessage.setChatContent(users.get(session.getId()));
? ? ? ? ? ? ? ? chatMessage.setMessage(msg);
? ? ? ? ? ? ? ? // 按房间群发消息
? ? ? ? ? ? ? ? broadcast(roomId, JSON.toJSONString(chatMessage));
? ? ? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? // 按照房间名进行群发消息
? ? private void broadcast(String roomId, String msg) {
? ? ? ? rooms.get(roomId).forEach(s -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? s.getBasicRemote().sendText(msg);? -----此还有一个getAsyncRemote()?
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? });
? ? }
? ? @OnError
? ? public void onError(Throwable error) {
? ? ? ? error.printStackTrace();
? ? }
}
友情提示:此session是websocket里的session,并非httpsession;
房间id,>
SpringBoot - WebSocket
websocket 是一种网络通信协议,类似 http 协议
Http 协议有一个缺陷:通信只能由客户端发起
在某种场景下,例如,在外卖场景下,骑手位置更新时,服务器端向客户端发送骑手位置。如果使用 http 协议,那么就只能轮询了,由客户端不停地向服务器端发送请求,获取骑手的位置。
轮询的效率低下,有一定的延迟性,并且频繁的发送请求也会造成资源的浪费。
使用 WebSocket 协议可以 实现由服务器端主动向客户端推送消息 ,当然客户端也可以向服务器端发送消息。
这里仅介绍利用 Spring 框架使用 WebSocket 的方式,原因:Spring 使用 WebSocket 简便且易于扩展。
SpringBoot 使用 WebSocket 非常方便,依赖上仅需要添加相应的 Starter 即可。
先给出概要的开发步骤:
其实到这里,基础的 websocket 服务已经搭建好了,剩下的可以自己在 handler 与 interceptor 中写自己的业务逻辑了
前端页面:
先启动服务器端 SpringBoot 应用,再使用前端页面点击测试一下就 ok 了
Spring Boot + WebSocket 实时消息推送
商家的后台管理系统实现新订单提醒推送功能,利用Spring Boot + WebSocket实时消息推送的方式进行实现。
引入依赖,我使用的是SpringBoot版本2.2.6.RELEASE,自动管理依赖版本
配置类WebSocketConfig,扫描并注册带有@ServerEndpoint注解的所有websocket服务端
新建WebSocketServer类,WebSocket服务端是多例的,一次WebSocket连接对应一个实例
辅助类
新建一个测试类,用于向客户端发送推送消息
1、 启动服务器程序,提供WebSocket服务。
2 、打开前端html客户端页面,连接WebSocket服务器。
3、向客户端发送推送消息
4、客户端收到新订单推送消息
当我们在本地开采用WebSocket用IP连接时是OK的,例如
当我们上线后,用Nginx部署,并用域名连接时就会失败。此时只需要在Nginx配置文件里加入一些配置即可。配置如下
参考文章
Websocket实时推送消息
阿里云折扣快速入口
springbootstart-websocket修改序列化
在springboot中使用websocket时,默认情况下使用的是jackson库进行消息的序列化和反序列化。如果需要修改websocket消息的序列化方式,可以按照以下步骤进行操作:1、创建自定义的消息序列化器:可以实现org.springframework.messaging.simp.stomp.StompEncoder接口来创建自定义的消息序列化器。该接口定义了两个方法encode和supports,分别用于将消息编码为字节数组和判断是否支持某种消息类型。2、配置websocket消息转换器:在SpringBoot的配置类中,通过重写configureMessageConverters方法或添加@Bean注解的方式配置WebSocket消息转换器。在配置中,将自定义的消息序列化器添加到WebSocketMessageBrokerConfigurer中的messageConverter中。3、使用自定义的消息序列化器:在编写WebSocket控制器时,可以直接使用自定义的消息序列化器。
WebSocket SpringBoot实现文件上传进度消息通知
文件上传进度消息:
异步耗时任务完成消息:
常见方案:
根据解析器构造,传入必要参数。该解析器将替代默认实现
spring为WebSocket提供了很好的支持,参照官方文档即可完成服务搭建
继承 WebSocketMessageBrokerConfigurer 类,重写 registerStompEndpoints() configureMessageBroker() configureClientInboundChannel() 方法。
此处通过注解切面,在需要执行的方法前后想Message服务发送消息
该切面将以@SendMessage注解为切入点,利用反射获取形参名及参数值,封装MessageDto,调用Feign接口向消息模块发送消息
文件上传监听日志,成功监听上传进度
文件上传进度消息发送日志
耗时任务消息模块发送日志
前端消息渲染效果
大功告成!
尚有诸多缺点,但保证了基础功能够用,诸位大佬可以做个小参考。
Spring Boot 与 nodejs websocket 通信
开发中突然遇到一个需求,后端分别为Spring Boot rest服务,nodejs websocket 服务。
SpringBoot 服务需要往websocket服务发送消息,所以java服务作为socket 客户端,nodejs为socket服务端。
网上好多案例都是以java作为服务端,后来发现socket.io已经提供了java实现,只需要引入jar包,使用方式与js客户端基本类似。将实现贴出来,以供参考。
socket服务端:nodejs socket.io http
socket服务端:Spring Boot socket.io-client
java 引入socket.io-client:
compile group: 'io.socket', name: 'socket.io-client', version: '1.0.0'
创建SocketClientService
其他需要发消息的地方只需要注入SocketClientService 即可。
其他需要接受消息的地方暂时没需求,所以没有实现。
SpringBoot+Vue+Websocket 实现服务器端向客户端主动发送消息
本文通过一个实际的场景来介绍在前后端分离的项目中通过 WebSocket 来实现服务器端主动向客户端发送消息的应用。主要内容如下
Websocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 连接成功后,服务端与客户端可以双向通信。在需要消息推送的场景,Websocket 相对于轮询能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
具体如下特点
在客户端的列表数据中有个 status 字段,服务器端需要花费较长的时间进行处理,处理完成后才会更新对应数据的 status 字段值,通过 Websocket 的处理流程如下:
通过注入 ServerEndpointExporter 类,用于在项目启动的时候自动将使用了 @ServerEndpoint 注解声明的 Websocket endpoint 注册到 WebSocketContainer 中。
为什么增加一个 ServerEndpointExporter Bean,并通过在一个类上增加 @ServerEndpoint 和 @Component 注解就可以实现服务器端 Websocket 功能,这里简单解析一下。
java 定义了一套 javax.servlet-api, 一个 HttpServlet 就是一个 HTTP 服务。java websocket 并非基于 servlet-api 简单扩展, 而是新定义了一套 javax.websocket-api。
一个 websocket 服务对应一个 Endpoint。与 ServletContext 对应, websocket-api 也定义了 WebSocketContainer, 而编程方式注册 websocket 的接口是继承自 WebSocketContainer 的 ServerContainer。
一个 websocket 可以接受并管理多个连接, 因此可被视作一个 server。主流 servlet 容器都支持 websocket, 如 tomcat, jetty 等。看 ServerContainer api 文档, 可从 ServletContext attribute 找到 ServerContainer。
实战spring-boot-starter-websocket之断网心跳续期实践
业务中需要应用到Websocket长连接进行数据传输,由于服务使用的是Zuul1.0版本,对ws协议支持较弱,后续尝试使用了 spring-boot-starter-websocket 来完成的。关于怎么集成的话网上有非常多的文章了,我就不多费口舌了。
我们目前实现的功能是可以通过WebSocket调用接口发送埋点,另外还需要监听用户离开的事件为这个埋点画上一个终止访问时间。目前测试下场景有:
前4点触发了任意操作,服务端都会监听到 DISCONNECT 离开事件。但是第5点直接断网, 服务端竟然是无感知的,这个时候产生的问题就是客户断网了,服务端是认为在线的,如果不重新联网登录的话,那么这个用户将会一直一直在线,埋点会一直计算。完了个蛋~
至于为什么断网之后,ws会认为他是在线的, 可能管道打开了之后由于断网导致断开时间发送不出去吧。
我目前能够想到比较简单的办法就是: 心跳续约
捋清了思路,大概就知道如何做了。
然后特意看了下 spring-boot-starter-websocket 的源码,发现其实他有提供此功能。
先说下如何实现:
我们是在在实现了 DelegatingWebSocketMessageBrokerConfiguration 的配置类中重写 configureMessageBroker 方法。
比较关键就是 setTaskScheduler 和 setHeartbeatValue 一个负责调度、一个负责配置间隔。 这俩要么都填要么都不填。
配置了这俩参数之后,服务启动的时候会触发一个 HeartbeatTask 线程来专门维持心跳。
我们可以看看他的流程是如何运转的。
核心任务调度类: org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler.HeartbeatTask
服务端在启动的时候 SimpleBrokerMessageHandler 在初始化完成之后会回调 start 的方法,然而他会触发一个 startInternal 开始调度任务,判断依据就是有没有配置 TaskScheduler ( 对应的就是配置类中的setTaskScheduler ),一旦启动之后,会根据你给的心跳数组 serverHeartbeat ,来选择调度时间。
org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler
检测心跳、超过间隔则剔除、并且定期回写心跳给客户端。
还有一点需要注意的是读的间隔时间: 假设用户网络不好,心跳漏发了一次,这个时候如果按照本次的逻辑而言,该用户的最后心跳时间肯定会超时。而它的做法是, 将设定的读超时时间 3 ,就相当于有3次机会*。
这个在创建session的时候 SessionInfo 里面就已经做好了处理: org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler.SessionInfo#SessionInfo
所以这个时候,你可能需要根据业务能够接受的时长去配置,也别忘了这个事。
还有很关键的一点就是让客户端的心跳发送间隔和服务端尽可能保持一致,不然有可能出现莫名其妙的下线情况,尽可能还是在这种地方加好日志。
好了,希望在遇到断网问题的时候,能够帮助到你。
如果有疑问请留言,我会尽快答复。
springboot2.1.3整合websocket和websocket-security支持跨域连接
springboot整合websocket和websocket-security支持跨域连接
项目地址: https://gitee.com/xuelingkang/spring-boot-demo
完整配置参考com.example.websocket包
所以继承了StompSubProtocolHandler,WebMvcStompEndpointRegistry,DelegatingWebSocketMessageBrokerConfiguration这三个类,添加websocket自定义拦截器接口,可以在拦截器中自定义websocket授权决策检查
配置类可以重写这个方法,默认该方法返回false,看方法的名字是关闭同源策略,但是只重写这个方法不能解决跨域的问题,还需要在registerStompEndpoints方法中设置允许的域名,"*"代表所有
如果有误导人的地方,欢迎大神批评指正!
springBoot下开发webSocket的sessionId问题
是一个递增的16进制并转为字符串,每次重启服务,这个id的计数又会重新从0开始.
如果建立了多个通道,那他们的id可能为(0,1818,70cc).
因为通道断开,对应的webSocketSession对象被释放,所以不同通道直接的id可能是不连续的.
但在WsSession中id属性是使用final修饰的,无法进行修改,所以只能作罢
因为对webSocket的使用经验比较少,所以就不再考虑使用其他方案对其进行改进.
后续会尝试去研究一下在聊天,游戏领域对于webSocket是如何进行使用的.