GVKun编程网logo

springboot+websocket+redis搭建的实现(springboot websocket redis)

14

想了解springboot+websocket+redis搭建的实现的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于springbootwebsocketredis的相关问题,此外,我们还将

想了解springboot+websocket+redis搭建的实现的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于springboot websocket redis的相关问题,此外,我们还将为您介绍关于spring boot websocket的实现、Spring 系列 (11) - Springboot+WebSocket 实现发送 JSON 消息实例 (二)、SpringBoot + Websocket 实现实时聊天、SpringBoot + websocket 实现模拟设备上下线的新知识。

本文目录一览:

springboot+websocket+redis搭建的实现(springboot websocket redis)

springboot+websocket+redis搭建的实现(springboot websocket redis)

这篇文章主要介绍了springboot+websocket+redis搭建的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

在多负载环境下使用websocket。

一、原因

在某些业务场景,我们需要页面对于后台的操作进行实时的刷新,这时候就需要使用websocket。通常在后台单机的情况下没有任何的问题,如果后台经过Nginx等进行负载的话,则会导致前台不能准备的接收到后台给与的响应。socket属于长连接,其session只会保存在一台服务器上,其他负载及其不会持有这个session,此时,我们需要使用redis的发布订阅来实现,session的共享。

二、环境准备

在https://mvnrepository.com/里,查找websocket的依赖。使用springboot的starter依赖,注意对应自己springboot的版本。

org.springframework.bootspring-boot-starter-websocket2.2.10.RELEASE

除此之外添加redis的依赖,也使用starter版本:

org.springframework.bootspring-boot-starter-data-redis

三、代码

redis监听配置:

/** * @description: redis监听配置类 * @author:weirx * @date:2021/3/22 14:08 * @version:3.0 */ @Configuration public class RedisConfig { /** * description: 手动注册Redis监听到IOC * * @param redisConnectionFactory * @return: org.springframework.data.redis.listener.RedisMessageListenerContainer * @author: weirx * @time: 2021/3/22 14:11 */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); return container; } }

webSocket配置:

/** * @description: websocket配置类 * @author:weirx * @date:2021/3/22 14:11 * @version:3.0 */ @Configuration public class WebSocketConfig { /** * description: 这个配置类的作用是要注入ServerEndpointExporter, * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。 * 如果是使用独立的servlet容器,而不是直接使用springboot的内置容器, * 就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。 * * @return: org.springframework.web.socket.server.standard.ServerEndpointExporter * @author: weirx * @time: 2021/3/22 14:12 */ @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }

redis工具类:

@Component public class RedisUtil { @Autowired private StringRedistemplate stringRedistemplate; /** * 发布 * * @param key */ public void publish(String key, String value) { stringRedistemplate.convertAndSend(key, value); } }

WebSocket服务提供类:

/** * description: @ServerEndpoint 注解是一个类层次的注解, * 它的功能主要是将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址, * 客户端可以通过这个URL来连接到WebSocket服务器端使用springboot的唯一区别是要@Component声明下, * 而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。 * * @author: weirx * @time: 2021/3/22 14:31 */ @Slf4j @Component @ServerEndpoint("/websocket/server/{loginName}") public class WebSocketServer { /** * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例 */ private RedisMessageListenerContainer redisMessageListenerContainer = ApplicationContextProvider.getBean(RedisMessageListenerContainer.class); /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 */ private static AtomicInteger onlineCount = new AtomicInteger(0); /** * concurrent包的线程安全Set,用来存放每个客户端对应的webSocket对象。 * 若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识 */ private static copyOnWriteArraySet webSocketSet = new copyOnWriteArraySet(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * redis监听 */ private SubscribeListener subscribeListener; /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @Onopen public void onopen(@PathParam("loginName") String loginName, Session session) { this.session = session; //加入set中 webSocketSet.add(this); //在线数加1 addOnlineCount(); log.info("有新连接[" + loginName + "]加入!当前在线人数为{}", getonlineCount()); subscribeListener = new SubscribeListener(); subscribeListener.setSession(session); //设置订阅topic redisMessageListenerContainer.addMessageListener( subscribeListener, new ChannelTopic(Constants.TOPIC_PREFIX + loginName)); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() throws IOException { //从set中删除 webSocketSet.remove(this); //在线数减1 subOnlineCount(); redisMessageListenerContainer.removeMessageListener(subscribeListener); log.info("有一连接关闭!当前在线人数为{}", getonlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @OnMessage public void onMessage(String message, Session session) { log.info("来自客户端的消息:{}", message); //群发消息 for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { log.info("发送消息异常:msg = {}", e); continue; } } } /** * 发生错误时调用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.info("发生错误,{}", error); } /** * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 * * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } public int getonlineCount() { return onlineCount.get(); } public void addOnlineCount() { WebSocketServer.onlineCount.getAndIncrement(); } public void subOnlineCount() { WebSocketServer.onlineCount.getAndDecrement(); } }

redis消息发布:

@Autowired private RedisUtil redisUtil; @Override public Result send(String loginName, String msg) { //推送站内信webSocket redisUtil.publish("TOPIC" + loginName, msg); return Result.success(); }

前端vue代码:

消息内容: {{ responseData }}

四、测试

发送前

发送后

到此这篇关于springboot+websocket+redis搭建的实现的文章就介绍到这了,更多相关springboot websocket redis搭建内容请搜索小编以前的文章或继续浏览下面的相关文章希望大家以后多多支持小编!

spring boot websocket的实现

spring boot websocket的实现

简单介绍

    WebSocket是为浏览器和服务端提供双工艺部通信功能一种工具,即浏览器可以先服务端发送消息,服务端也可以先浏览器发送消息。现在支持Websocket的浏览器有  IE10+,Crome13+,FileFox6+。

WebSocket子协议

    WebSocket只是一个消息传递的体系结构,没有指定任何的消息传递协议。与HTTP协议不同的是,WebSocket只是一个应用层的协议,它非常简单,并不能理解传入的消息,也不能对消息进行路由或处理,因此WebSocket协议只是一个应用层的协议,其上需要一个框架来理解和处理消息。

    Spring框架提供了对使用STOMP子协议的支持。STOMP,全称Streaming Text Orientated Message Protol,流文本定向协议。STOMP是一个简单的消息传递协议,是一种为MOM(Message Orientated  Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供了一个可操作的连接格式,允许STOMP客户端与任意代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。

    

Spring Boot的WebSocket实现

     SpringBoot对内嵌的Tomcat(7或者8)、Jetty9和Undertow使用了WebSocket提供了支持。

广播式

广播式即服务端有消息时,会将消息发送到所有连接了当前endpoint的浏览器。

配置WebSocket

需要在配置类上使用@EnableWebSocketMessageBroker开启WebSocket支持,并通过集成AbstractWebSocketMessageBrokerConfigurer类,重写其方法来配置WebSocket。
        

package com.example.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
 * Created by lenovo on 2017/3/15.
 */
@Configuration
@EnableWebSocketMessageBroker //通过@EnableWebSocketMessageBroker 注解凯旗使用STOMP协议来传输基于代理(message broker)的消息
//这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{


    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        stompEndpointRegistry.addEndpoint("/endpoint").withSockJS();//注册STOMP协议的节点,映射指定的URL,并指定使用SockJS协议
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {//配置消息代码(Message Broker)
        registry.enableSimpleBroker("/topic");//广播式应配置一个/topic消息代理
    }
}

消息的接收器、发送器和控制器

package com.example.model;

/**
 * Created by lenovo on 2017/3/15.
 */
public class MessageSender {
    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public MessageSender(String msg) {
        this.msg = msg;
    }
}
package com.example.model;

import java.io.Serializable;

/**
 * Created by lenovo on 2017/3/15.
 */
public class MessageAcceptor implements Serializable{

    private String msg;

    public String getMsg() {
        return msg;
    }

}
package com.example.websocket;

import com.example.model.MessageAcceptor;
import com.example.model.MessageSender;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * Created by lenovo on 2017/3/15.
 */
@Controller
public class TestWeb {

    @MessageMapping(value = "/message/test")//当浏览器向服务端发送请求时,通过@MessageMapping映射的地址,类似于@RequestMapping
    @SendTo(value = "/topic/response")//当服务端有消息时,会对订阅了@SendTo中的路径的浏览器发送消息
    public MessageSender say(MessageAcceptor acceptor){
        return new MessageSender("HELLO!"+acceptor.getMsg());
    }

    @RequestMapping("index")
    public String index(){
        return "index";
    }
}

准备 WebSocket需要的前端文件。

下载stomp.min.js和sockjs.min.js文件,并放在static下,然后在templates下新建index.html页面

stomp的API参考链接:https://segmentfault.com/a/11...

<!DOCTYPE html>
<html xmlns="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>Title</title>
</head>
<body>
<div>
    <button onclick="connect()" id="connect">连接</button>
    <button onclick="disconnect()" id="disconnect">断开</button>
</div>
<div>
    <input type="text" id="name"/>
    <button id="send">发送</button>
</div>
<div id="msg">

</div>
<script th:src="@{stomp.min.js}"></script>
<script th:src="@{sockjs.min.js}"></script>
<script>
    var stompClient = null;

    function connect() {
        var socket = new SockJS("/endpoint");//连接SockJS的endpoint名称为"/endpoint"
        stompClient = Stomp.over(socket);//使用STOMP子协议的WebSocket客户端
        stompClient.connect({}, function (frame) {//连接WebSocket服务端
            stompClient.subscribe("/topic/response", function (msg) {//通过stopmClient.subscribe订阅"/topic/response"目标发送的消息,这个路径是在控制器的@SendTo中定义的
                console.log(msg);
                var msgDom = document.getElementById("msg");
                var html = msgDom.innerHTML;
                msgDom.innerHTML = html + "\n" + msg.body;
            });
        });

    }

    function disconnect() {
        if(stompClient!=null){
            stompClient.disconnect();
        }
    }

    function send() {
        var name = document.getElementById("name").value;
        stompClient.send("/message/test", {}, JSON.stringify({//通过stompClient.send向"/message/test"目标发送消息,这个在控制器的@MessageMapping中定义的。
            ''msg'': name
        }));
    }

    document.getElementById("send").onclick = send;
</script>
</body>
</html>

上述代码都已经准备好了,那么一起来看一看运行效果

如预期一样,在连接了WebSocket的客户端发送消息时,其它同样连接了WebSocket的客户端浏览器也收到了消息,没有连接WebSocket的客户端则没有收到消息

看完了代码和代码的运行效果,我们再来看一看WebSocket运行中STOMP的帧

连接STOMP服务端帧:CONNECT↵accept-version:1.1,1.0↵heart-beat:10000,10000 

连接STOMP服务端成功帧:CONNECTED↵version:1.1↵heart-beat:0,0 

订阅目标/topic/response:SUBSCRIBE↵id:sub-0↵destination:/topic/response 

向目标/message/test发送消息:SEND↵destination:/message/test↵content-length:16↵↵{"msg":"测试"} 

从目标/topic/response接收到消息:MESSAGE↵destination:/topic/response↵content-type:application/json;charset=UTF-8↵subscription:sub-0↵message-id:hstpp2xl-0↵content-length:22↵↵{"msg":"HELLO!测试"} 

点对点式

广播式有自己的应用场景,但是广播式不能解决我们我们一个常见的问题,即消息由谁发送,由谁接受的问题。

1.在进行点对点传递消息的时候,必然发生在两个用户之间的行为,那么就需要添加用户相关的内容,在这里先完成一个简单的登陆。

首先添加Spring Security的starter pom:    

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>

2.然后进行spring security的简单配置

  

package com.example.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

/**
 * Created by lenovo on 2017/3/17.
 */
@Configuration
@EnableWebSecurity
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    /**
     * 权限管理配置构造器
     *
     * @param auth 权限管理
     * @throws Exception
     */
    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        //配置了两个用户和对应的密码,并且申明了他们的角色
        auth.inMemoryAuthentication().withUser("muxiao").password("123456").roles("USER")
                .and().withUser("hahaha").password("123456").roles("USER");//在内存中分别配置两个用户muxiao和hahaha
    }

    /**
     * Web安全配置
     * @param web
     * @throws Exception
     */
    @Override
    public void configure(WebSecurity web) throws Exception {
        //静态资源不做安全校验
        web.ignoring().antMatchers("/resources/static/**");///resources/static/目录下的静态资源,不拦截
    }

    /**
     * 配置http安全
     * @param http
     * @throws Exception
     */
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        //简单的配置运行点对点所需要的登陆权限
        http.authorizeRequests()
                .antMatchers("/","login").permitAll()//设置Spring Security对/和/login路径不拦截
                .anyRequest().authenticated()
                .and().formLogin().loginPage("/login")//设置登录页面访问的路径为/login
                .defaultSuccessUrl("/chat").permitAll()//登陆成功后转向chat页面
                .and().logout().permitAll();
    }
}
        然后在TestWeb中增加一个MessageMapping接口:

  

  @Autowired private SimpMessagingTemplate messagingTemplate;//spring实现的一个发送模板类 

    @MessageMapping("/chat")
    public void handlerChat(Principal principal, String msg) {//springmvc中可以直接在参数中获得pricipal,pricipal中包含当前永不的信息
        if (principal.getName().equalsIgnoreCase("muxiao")) {
            messagingTemplate.convertAndSendToUser("hahaha","/queue/notice",principal.getName()+":"+msg);
        } else {
            messagingTemplate.convertAndSendToUser("muxiao","/queue/notice",principal.getName()+":"+msg);
            //通过messaginTemplate.converAndSendTiUser向用户发送消息,第一次参数是接受信息的用户,第二个是浏览器订阅的地址,第三个是消息本身
        }
    }

3.同时定义需要的页面访问路径:

package com.example.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

/**
 * Created by lenovo on 2017/3/17.
 */
@Configuration
public class WebViewConfig extends WebMvcConfigurerAdapter {
    @Override
    public void addViewControllers(ViewControllerRegistry registry) {
        registry.addViewController("/chat").setViewName("/chat");
        registry.addViewController("/login").setViewName("/login");
    }
}

4.我们已经准备好了所需要的后台,这时候就开始实现我们需要的功能的前端编写了。

首先,实现登陆页面,在浏览器中访问除过"/","/index"之外的其它页面,都会来到login页面以进行登陆,即下面的页面:

<!DOCTYPE html>
<html xmlns="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8" />
    <title>Title</title>
</head>
<body>
<form th:action="@{/login}" method="post">
    <input type="text" name="username"/>
    <input type="password" name="password"/>
    <input type="submit"/>
</form>
</body>
</html>
              输入我们在内存中指定的用户名和密码,登陆进入chat页面

<!DOCTYPE html>
<html xmlns="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8" />
    <title>Title</title>
</head>
<body>
<textarea id="content"></textarea>
<input type="button" value="发送" onclick="send()"/>
<div id="out">

</div>
<script th:src="@{sockjs.min.js}"></script>
<script th:src="@{stomp.min.js}"></script>
<script>
    var sock = new SockJS("/endpointOneToOne");//连接"endpointOneToOne"
    var stomp = Stomp.over(sock);
    stomp.connect({},function(frame){
        //订阅/user/queue/notice发送的消息,这里与在控制器的messagingTemplate.convertAndSendToUser中定义的订阅地址保持一致。
        //这里多一个/user,并且这个/user是必须的,使用了/user才会发送消息到指定的用户
        stomp.subscribe("/user/queue/notice",function(message){//
            var out = document.getElementById("out");
            var html = out.innerHTML;
            out.innerHTML = html +"<br />"+message.body;
        })
    });

    function send(){
        var content = document.getElementById("content").value;
        stomp.send("/chat",{},content);
    }
</script>
</body>
</html>

同时在两个浏览器上面,用在内存中指定的两个用户登陆,这样两个用户就可以互相发送消息了,延时效果如下:

           图片描述 

Spring 系列 (11) - Springboot+WebSocket 实现发送 JSON 消息实例 (二)

Spring 系列 (11) - Springboot+WebSocket 实现发送 JSON 消息实例 (二)

STOMP 即 Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许 STOMP 客户端与任意 STOMP 消息代理(broker)进行交互。

本文使用 STOMP 来实现发送 JSON 消息实例。


1. 开发环境

    Windows版本:Windows 10 Home (20H2)   
    IntelliJ IDEA (https://www.jetbrains.com/idea/download/):Community Edition for Windows 2020.1.4
    Apache Maven (https://maven.apache.org/):3.8.1

    注:Spring 开发环境的搭建,可以参考 “ Spring基础知识(1)- Spring简介、Spring体系结构和开发环境配置 ”。


2. 创建 Spring Boot 基础项目

    项目实例名称:SpringbootExample11
    Spring Boot 版本:2.6.6

    创建步骤:

        (1) 创建 Maven 项目实例 SpringbootExample11;
        (2) Spring Boot Web 配置;
        (3) 导入 Thymeleaf 依赖包;
        (4) 配置 jQuery;
        
    具体操作请参考 “Spring 系列 (2) - 在 Spring Boot 项目里使用 Thymeleaf、JQuery+Bootstrap 和国际化” 里的项目实例 SpringbootExample02,文末包含如何使用 spring-boot-maven-plugin 插件运行打包的内容。

    SpringbootExample11 和 SpringbootExample02 相比,SpringbootExample11 不配置 Bootstrap、模版文件(templates/*.html)和国际化。


3. 配置 Security

    1) 修改 pom.xml,导入 Security 依赖包

 1         <project ... >
 2             ...
 3             <dependencies>
 4                 ...
 5 
 6                 <!-- Spring security -->
 7                 <dependency>
 8                     <groupId>org.springframework.boot</groupId>
 9                     <artifactId>spring-boot-starter-security</artifactId>
10                 </dependency>
11 
12                 ...
13             </dependencies>
14 
15             ...
16         </project>


        在IDE中项目列表 -> SpringbootExample11 -> 点击鼠标右键 -> Maven -> Reload Project

     2) 修改 src/main/resources/application.properties 文件,添加如下配置

        # security
        spring.security.user.name=admin
        spring.security.user.password=123456
        spring.security.user.roles=admin

        运行并访问 http://localhost:9090/test,自动跳转到 http://localhost:9090/login (Spring security 的默认页面),输入上面的用户名和密码登录,登录后跳转到 http://localhost:9090/test。


4. 配置 STOMP

    1) 修改 pom.xml,导入 WebSocket 依赖包

 1        <project ... >
 2             ...
 3             <dependencies>
 4                 ...
 5 
 6                 <dependency>
 7                     <groupId>org.springframework.boot</groupId>
 8                     <artifactId>spring-boot-starter-websocket</artifactId>
 9                 </dependency>
10 
11                 ...
12             </dependencies>
13 
14             ...
15         </project>


        在IDE中项目列表 -> SpringbootExample11 -> 点击鼠标右键 -> Maven -> Reload Project

    2) 创建 src/main/java/com/example/ws/WsstompConfig.java 文件

 1         package com.example.ws;
 2 
 3         import org.springframework.beans.factory.annotation.Autowired;
 4         import org.springframework.context.annotation.Configuration;
 5         import org.springframework.messaging.simp.config.MessagebrokerRegistry;
 6         import org.springframework.web.socket.config.annotation.EnableWebSocketMessagebroker;
 7         import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
 8         import org.springframework.web.socket.config.annotation.WebSocketMessagebrokerConfigurer;
 9 
10         @Configuration
11         @EnableWebSocketMessagebroker
12         public class WsstompConfig implements WebSocketMessagebrokerConfigurer {
13             @Autowired
14             private WsstompInterceptor wsstompInterceptor;
15 
16             @Override
17             public void registerStompEndpoints(StompEndpointRegistry registry) {
18                 // 配置客户端尝试连接地址
19                 registry.addEndpoint("/websocket")    // 设置连接节点,http://hostname:port/websocket
20                         //.setHandshakeHandler()               // 握手处理,主要是连接的时候认证获取其他数据验证等
21                         .addInterceptors(wsstompInterceptor)   // 设置握手拦截器
22                         .setAllowedOriginPatterns("*")         // 配置跨域, 不能用 setAllowedOrigins("*")
23                         .withSockJS();                         // 开启 sockJS 支持,这里可以对不支持 stomp 的浏览器进行兼容
24             }
25 
26             @Override
27             public void configureMessagebroker(MessagebrokerRegistry registry) {
28                 // 这里使用的是内存模式,生产环境可以使用 RabbitMQ 或者其他 MQ。
29                 // 点对点应配置一个 /queue 消息代理,广播式应配置一个 /topic 消息代理
30                 registry.enableSimplebroker("/topic", "/queue");
31 
32                 // 客户端向服务端发送消息需有 /app 前缀
33                 registry.setApplicationDestinationPrefixes("/app");
34 
35                 // 指定用户发送(一对一)的前缀 /user/
36                 registry.setUserDestinationPrefix("/user/");
37             }
38         }


    3) 创建 src/main/java/com/example/ws/WsstompInterceptor.java 文件

 1         package com.example.ws;
 2 
 3         import java.util.Map;
 4 
 5         import org.springframework.stereotype.Component;
 6         import org.springframework.http.server.ServerHttpRequest;
 7         import org.springframework.http.server.ServerHttpResponse;
 8         import org.springframework.web.socket.WebSocketHandler;
 9         import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
10 
11         @Component
12         public class WsstompInterceptor extends HttpSessionHandshakeInterceptor {
13             @Override
14             public boolean beforeHandshake(ServerHttpRequest request,
15                                         ServerHttpResponse response,
16                                         WebSocketHandler wsHandler,
17                                         Map<String, Object> attributes) throws Exception {
18                 return super.beforeHandshake(request, response, wsHandler, attributes);
19             }
20 
21             @Override
22             public void afterHandshake(ServerHttpRequest request,
23                                     ServerHttpResponse response,
24                                     WebSocketHandler wsHandler,
25                                     Exception ex) {
26                 super.afterHandshake(request, response, wsHandler, ex);
27             }
28 
29         }


    4) 创建 src/main/java/com/example/ws/WSMessage.java 文件

 1         package com.example.ws;
 2 
 3         public class WSMessage {
 4             private String content;
 5 
 6             public String getContent() {
 7                 return content;
 8             }
 9 
10             public void setContent(String content) {
11                 this.content = content;
12             }
13         }


    5) 创建 src/main/java/com/example/ws/WsstompHandler.java 文件

 1         package com.example.ws;
 2 
 3         import java.security.Principal;
 4 
 5         import org.springframework.beans.factory.annotation.Autowired;
 6         import org.springframework.web.bind.annotation.RestController;
 7         import org.springframework.messaging.simp.SimpMessagingTemplate;
 8         import org.springframework.messaging.simp.annotation.SubscribeMapping;
 9         import org.springframework.messaging.handler.annotation.DestinationVariable;
10         import org.springframework.messaging.handler.annotation.MessageMapping;
11         import org.springframework.messaging.handler.annotation.SendTo;
12 
13         @RestController
14         public class WsstompHandler {
15             @Autowired
16             private SimpMessagingTemplate simpMessagingTemplate;
17 
18             // broadcast
19             @MessageMapping("/broadcast")
20             @SendTo("/topic/broadcast")
21             public WSMessage broadcast(WSMessage requestMsg) {
22                 // 这里是有 return,如果不写 @SendTo 默认和 /topic/broadcast 一样
23 
24                 WSMessage responseMsg = new WSMessage();
25                 responseMsg.setContent(requestMsg.getContent() + " - from server (broadcast)");
26 
27                 return responseMsg;
28             }
29 
30             // User
31             @MessageMapping("/one")
32             //@SendToUser("/queue/one") 如果存在 return, 可以使用这种方式
33             public void one(WSMessage requestMsg, Principal principal) {
34                 // 注意为什么使用 queue,主要目的是为了区分广播和队列的方式。实际采用 topic,也没有关系。但是为了好理解
35 
36                 WSMessage responseMsg = new WSMessage();
37                 responseMsg.setContent(requestMsg.getContent() + " - from server (User)");
38 
39                 simpMessagingTemplate.convertAndSendToUser(principal.getName(), "/queue/one", responseMsg);
40             }
41 
42             // Subscribe
43             @SubscribeMapping("/subscribe/{id}")
44             public WSMessage subscribe(@DestinationVariable Long id) {
45 
46                 WSMessage responseMsg = new WSMessage();
47                 responseMsg.setContent("Subscribe success - from server (Subscribe)");
48 
49                 return responseMsg;
50             }
51 
52         }

 

  注:WsstompHandler 是 @RestController 注解修饰的类,或许定义成 WsstompController 更符合习惯,这里暂时作为 Handler 来定义。


5. 配置 SockJS、StompJS

    SockJS: https://github.com/sockjs/sockjs-client/tree/main/dist
    StomJS:https://github.com/jmesnil/stomp-websocket/tree/master/lib
    
    本文使用 sockjs.min.js(1.6.0) 和 stomp.min.js(2.3.4),两者放到 src/main/resources/static/lib/ 目录下。
    
    目录结构如下

      static
        |
        |- lib
            |- jquery
            |     |- jquery-3.6.0.min.js
            |
            |- sockjs.min.js
            |- sockjs.min.js.map
            |- stomp.min.js


6. 测试实例 (Web 模式)

    1) 创建 src/main/resources/templates/stomp_client.html 文件

  1         <html lang="en" xmlns:th="http://www.thymeleaf.org">
  2         <head>
  3             <Meta charset="UTF-8">
  4             <title>STOMP Client</title>
  5             <script language="javascript" th:src="@{/lib/jquery/jquery-3.6.0.min.js}"></script>
  6             <script language="javascript" th:src="@{/lib/sockjs.min.js}"></script>
  7             <script language="javascript" th:src="@{/lib/stomp.min.js}"></script>
  8         </head>
  9         <body>
 10             <h4>WebSocket STOMP - Client</h4>
 11             <hr>
 12 
 13             <p>
 14                 <label>WebSocket Url:</label><br>
 15                 <input id="url" type="text" th:value="@{/websocket}" value="http://localhost:9090/websocket"><br><br>
 16                 <button id="connect">Connect</button>&nbsp;<button id="disconnect" disabled="disabled">disconnect</button><br><br>
 17 
 18                 <label>Subscribed Message: </label><br>
 19                 <input id="subscribeMsg" type="text" disabled="disabled"><br><br>
 20             </p>
 21             <hr>
 22             <p>
 23                 <label>broadcast: </label><br>
 24                 <input id="broadcastText" type="text"value="broadcast message"><button id="broadcastButton">Send</button><br><br>
 25 
 26                 <label>Return Message: </label><br>
 27                 <input id="broadcastMsg" type="text" disabled="disabled"><br><br>
 28             </p>
 29             <hr>
 30             <p>
 31                 <label>User: </label><br>
 32                 <input id="userText" type="text"value="user message"><button id="userButton">Send</button><br><br>
 33 
 34                 <label>Return Message: </label><br>
 35                 <input id="userMsg" type="text" disabled="disabled"><br><br>
 36             </p>
 37             <hr>
 38             <p>
 39                 <label>App: </label><br>
 40                 <input id="appText" type="text"value="app message"><button id="appButton">Send</button><br><br>
 41 
 42                 <label>Return Message: </label><br>
 43                 <input id="appMsg" type="text" disabled="disabled"><br><br>
 44             </p>
 45 
 46             <p>&nbsp;</p>
 47 
 48             <script type="text/javascript">
 49                 var stomp = null;
 50 
 51                 $(document).ready(function() {
 52 
 53                     $("#connect").click(function () {
 54 
 55                         var url = $("#url").val();
 56                         if (url == "") {
 57                             alert("Please enter url");
 58                             $("#url").focus();
 59                             return;
 60                         }
 61 
 62                         var socket = new SockJS(url);
 63                         stomp = Stomp.over(socket);
 64 
 65                         // Connect
 66                         stomp.connect({}, function (frame) {
 67                             // Subscribe broadcast
 68                             stomp.subscribe("/topic/broadcast", function (res) {
 69                                 $("#broadcastMsg").val(res.body);
 70                             });
 71 
 72                             // Subscribe
 73                             stomp.subscribe("/app/subscribe/1", function (res) {
 74                                 $("#subscribeMsg").val(res.body);
 75                             });
 76 
 77                             // User
 78                             stomp.subscribe("/user/queue/one", function (res) {
 79                                 $("#userMsg").val(res.body);
 80                             });
 81 
 82                             // App
 83                             stomp.subscribe("/topic/app", function (res) {
 84                                 $("#appMsg").val(res.body);
 85                             });
 86                             setConnect(true);
 87                         });
 88                     });
 89 
 90                     $("#disconnect").click(function () {
 91                         if (stomp != null) {
 92                             stomp.disconnect();
 93                             stomp = null;
 94                         }
 95                         setConnect(false);
 96                     });
 97 
 98                     // Send broadcast message
 99                     $("#broadcastButton").click(function () {
100                         if (stomp == null) {
101                             alert("Please connect to server");
102                             return;
103                         }
104                         var msg = $("#broadcastText").val();
105                         if (msg == '') {
106                             alert("Please enter broadcast text");
107                             $("#broadcastText").focus();
108                             return;
109                         }
110                         stomp.send("/app/broadcast", {}, JSON.stringify({"content": msg}))
111                     });
112 
113                     // Send user message
114                     $("#userButton").click(function () {
115                         if (stomp == null) {
116                             alert("Please connect to server");
117                             return;
118                         }
119                         var msg = $("#userText").val();
120                         if (msg == '') {
121                             alert("Please enter user text");
122                             $("#userText").focus();
123                             return;
124                         }
125                         stomp.send("/app/one", {}, JSON.stringify({"content": msg}))
126                     });
127 
128                     // Send app message
129                     $("#appButton").click(function () {
130                         if (stomp == null) {
131                             alert("Please connect to server");
132                             return;
133                         }
134                         var msg = $("#appText").val();
135                         if (msg == '') {
136                             alert("Please enter app text");
137                             $("#appText").focus();
138                             return;
139                         }
140                         stomp.send("/topic/app", {}, JSON.stringify({"content": msg}))
141                     });
142 
143                 });
144 
145                 // Set buttons
146                 function setConnect(connectStatus) {
147                     $("#connect").attr("disabled", connectStatus);
148                     $("#disconnect").attr("disabled", !connectStatus);
149                 }
150 
151             </script>
152         </body>
153         </html>


    2) 修改 src/main/java/com/example/controller/IndexController.java 文件

 1         package com.example.controller;
 2 
 3         import org.springframework.stereotype.Controller;
 4         import org.springframework.web.bind.annotation.RequestMapping;
 5         import org.springframework.web.bind.annotation.ResponseBody;
 6 
 7         @Controller
 8         public class IndexController {
 9             @ResponseBody
10             @RequestMapping("/test")
11             public String test() {
12                 return "Test Page";
13             }
14 
15             @RequestMapping("/stomp/client")
16             public String stompClient() {
17                 return "stomp_client";
18             }
19 
20         }


    访问 http://localhost:9090/stomp/client


SpringBoot + Websocket 实现实时聊天

SpringBoot + Websocket 实现实时聊天

SpringBoot + WebSocket 实现实时聊天

最近有点小时间,上个项目正好用到了websocket实现广播消息来着,现在来整理一下之前的一些代码,分享给大家。

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

一、环境介绍

开发工具:IntelliJ IDEA

运行环境:SpringBoot2.x、ReconnectingWebSocket、JDK1.8+、Maven 3.6 +

ReconnectingWebSocket 是一个小型的 JavaScript 库,封装了 WebSocket API 提供了在连接断开时自动重连的机制。

只需要简单的将:

ws = new WebSocket(''ws://....'');

替换成:

ws = new ReconnectingWebSocket(''ws://....'');

WebSocket 属性ws.readyState:

​ 0 - 表示连接尚未建立。

​ 1 - 表示连接已建立,可以进行通信。

​ 2 - 表示连接正在进行关闭。

​ 3 - 表示连接已经关闭或者连接不能打开。

WebSocket事件:

事件 事件处理程序 描述
open ws.onopen 连接建立时触发
message ws.onmessage 客户端接收服务端数据时触发
error ws.onerror 通信发生错误时触发
close ws.onclose 连接关闭时触发

WebSocket方法:

方法 描述
Socket.send() 使用连接发送数据
Socket.close() 关闭连接

二、代码实现

(一)、创建SpringBoot项目

在这里插入图片描述

(二)、添加 pom 依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
   <!-- springbooot 集成 websocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.71</version>
    </dependency>
</dependencies>

(三)、编写前端模板index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SpringBoot-ws</title>
    <script src="../js/reconnectingwebsocket.js" type="text/javascript" charset="utf-8"></script>
    <!--    <script src="../js/sockjs.min.js" type="text/javascript" charset="utf-8"></script>-->
    <script src="../js/jquery.min.js" type="text/javascript" charset="utf-8"></script>
    <link rel="stylesheet" type="text/css" href="../css/style.css">
</head>
<body>
<div id="info">
    <div>发送人:<input type="text" id="suer" required="required" placeholder="请输入发送人"></div>
    <div>接收人:<input type="text" id="ruser" required="required" placeholder="请输入接收人"></div>
</div>
<div id="index">
</div>
<div class="msg">
    <textarea id="send_content" placeholder="在此输入消息..."></textarea>
</div>
<div class="ibtn c">
    <button onclick=openWebsocket()>开启连接</button>
    <button onclick=closeWebsocket()>关闭连接</button>
    <button onclick=sendMessage()>发送消息</button>
</div>
<script type="text/javascript">
    document.getElementById(''send_content'').focus();

    var websocket = null;

    //关闭websocket
    function closeWebsocket() {
        //3代表已经关闭
        if (3 != websocket.readyState) {
            websocket.close();
        } else {
            alert("websocket之前已经关闭");
        }
    }

    // 开启websocket
    function openWebsocket() {
        username = $("#suer").val()
        if (username != "") {

            //当前浏览前是否支持websocket
            if ("WebSocket" in window) {
                websocket = new ReconnectingWebSocket("ws://localhost:8080/send/" + username);
                websocket.reconnectInterval = 3000 //每3s进行一次重连,默认是每秒
            } else if (''MozWebSocket'' in window) {
                websocket = new MozWebSocket("ws://localhost:8080/send/" + username);
            } else {
                //低版本
                websocket = new SockJS("http://localhost:8080/sockjs/send/" + username);
            }
        }
        websocket.onopen = function (event) {
            setMessage("打开连接");
        }

        websocket.onclose = function (event) {
            setMessage("关闭连接");
        }

        websocket.onmessage = function (event) {
            // setMessage(event.data);
            setMessageTxt(event.data)

        }

        websocket.onerror = function (event) {
            setMessage("连接异常,正在重连中...");
        }

        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function () {
            closeWebsocket();
        }
    }

    //将消息显示在网页上
    function setMessage(message) {
        alert(message)
    }

    function setMessageTxt(message) {
        mObj = JSON.parse(message)
        var div = document.createElement(''div'')
        div.innerHTML = "<divname l''><h2>" + mObj[''from_topic''] + "</h2></div>" +
            "<divcontent w l''>" + mObj[''content''] + "</div>"
        div.setAttribute("class", "from_info")
        document.getElementById(''index'').appendChild(div)
    }

    // 发送消息
    function sendMessage() {
        //1代表正在连接
        if (1 == websocket.readyState) {
            var message = document.getElementById(''send_content'').value;
            var div = document.createElement(''div'')
            div.innerHTML = "<divname r rcontent''><h2> Me </h2></div>" +
                "<divcontent w r''>" + message + "</div>"
            div.setAttribute("class", "send_info")
            document.getElementById(''index'').appendChild(div)
            ruser = document.getElementById("ruser").value;
            message = "{''content'':''" + message + "'',''to_topic'':''" + ruser + "''}"
            websocket.send(message);
        } else {
            alert("websocket未连接");
        }
        document.getElementById(''send_content'').value = "";
        document.getElementById(''send_content'').focus();
    }
</script>
</body>
</html>

(四)、服务端代码编写

  1. 编写 SWCrontroller.java 类
 package com.jhzhong.swchat.controller;
 
 import com.jhzhong.swchat.websocket.WebSocketServer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.*;
 
 @Controller
 public class SWController {
 
     @Autowired
     private WebSocketServer webSocketServer;
 
     /**
      * author: jhzhong95@gmail.com
      * date: 2020-06-24 12:35 AM
      * desc: 跳转index.html页面
      * @return
      */
     @RequestMapping("/")
     public String index() {
         return "index";
     }
 }
  1. 编写WebSocketConfig.java 类,开启WebSocket支持。
 package com.jhzhong.swchat.websocket;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
 /**
  * author: jhzhong95@gmail.com
  * date: 2020-06-24 12:28 AM
  * desc: 开启WebSocket支持
  */
 @Configuration
 public class WebSocketConfig {
 
     @Bean
     public ServerEndpointExporter serverEndpointExporter(){
         return new ServerEndpointExporter();
     }
 }
  1. 编写核心代码类 WebSocketServer.java
 package com.jhzhong.swchat.websocket;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import freemarker.log.Logger;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * author: jhzhong95@gmail.com
  * date: 2020-06-24 12:40 AM
  * desc: WebSocket服务端
  */
 @ServerEndpoint("/send/{topic}")
 @Component
 public class WebSocketServer {
     static Logger logger = Logger.getLogger("WebSocketServer");
     /**
      * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
      */
     private static int onlineCount = 0;
     /**
      * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
      */
     private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
     private Session session;
     /**
      * 接收频道topic
      */
     private String topic = "";
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
     public void onOpen(Session session, @PathParam("topic") String topic) {
         this.session = session;
         this.topic = topic;
         if (webSocketMap.containsKey(topic)) {
             webSocketMap.remove(topic);
             webSocketMap.put(topic, this);
             //加入set中
         } else {
             webSocketMap.put(topic, this);
             //加入set中
             addOnlineCount();
             //在线数加1
         }
 
         logger.info("用户连接:" + topic + ",当前在线人数为:" + getOnlineCount());
         try {
             sendMessage("连接成功");
         } catch (IOException e) {
             logger.error("用户:" + topic + ",网络异常!!!!!!");
         }
     }
 
 
     /**
      * 连接关闭调用的方法
      */
     @OnClose
     public void onClose() {
         if (webSocketMap.containsKey(topic)) {
             webSocketMap.remove(topic);
             //从set中删除
             subOnlineCount();
         }
         logger.info("用户退出:" + topic + ",当前在线人数为:" + getOnlineCount());
     }
 
     /**
      * 收到客户端消息后调用的方法
      *
      * @param message 客户端发送过来的消息
      */
     @OnMessage
     public void onMessage(String message, Session session) {
         logger.info("用户:" + topic + ",信息:" + message);
         //可以群发消息
         //消息保存到数据库、redis
         if (StringUtils.isNotBlank(message)) {
             try {
                 //解析发送的报文
                 JSONObject jsonObject = JSON.parseObject(message);
                 //追加发送人(防止串改)
                 jsonObject.put("from_topic", this.topic);
                 String to_topic = jsonObject.getString("to_topic");
                 //传送给对应toUserId用户的websocket
                 if (StringUtils.isNotBlank(to_topic) && webSocketMap.containsKey(to_topic)) {
                     webSocketMap.get(to_topic).sendMessage(jsonObject.toJSONString());
                 } else {
                     logger.error("请求的to_topic:" + to_topic + "不在该服务器上");
                     //否则不在这个服务器上,发送到mysql或者redis
                 }
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
 
     /**
      * @param session
      * @param error
      */
     @OnError
     public void onError(Session session, Throwable error) {
         logger.error("用户错误:" + this.topic + ",原因:" + error.getMessage());
         error.printStackTrace();
     }
 
     /**
      * 实现服务器主动推送
      */
     public void sendMessage(String message) throws IOException {
         this.session.getBasicRemote().sendText(message);
     }
 
 
     /**
      * 发送自定义消息
      */
     public static void sendInfo(String message, @PathParam("topic") String topic) throws IOException {
         logger.info("发送消息到:" + topic + ",信息:" + message);
         if (StringUtils.isNotBlank(topic) && webSocketMap.containsKey(topic)) {
             webSocketMap.get(topic).sendMessage(message);
         } else {
             logger.error("用户" + topic + ",不在线!");
         }
     }
 
     public static synchronized int getOnlineCount() {
         return onlineCount;
     }
 
     public static synchronized void addOnlineCount() {
         WebSocketServer.onlineCount++;
     }
 
     public static synchronized void subOnlineCount() {
         WebSocketServer.onlineCount--;
     }
 }

三、运行截图

  1. 首页截图

  2. 视频效果

SpringBoot+WebSocket实现消息广播

如需源码请参考: sw-chat.zip 源码下载

SpringBoot + websocket 实现模拟设备上下线

SpringBoot + websocket 实现模拟设备上下线

本文系本人原创,首先发布在我的个人博客上,SpringBoot + websocket 实现模拟设备上下线,转载请注明出处。

 

之前有做过车联网项目,gps设备都会有上下线检测的功能,但有的时候没有真实设备测试,如何模拟设备上下线呢?可以使用websocket实现,因为它是长连接,性能开销小且通信高效。

下面就直接上代码

pom.xml

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
</dependencies>

DemoApplication.java

 
1
2
3
4
5
6
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

WebSocketServer.java

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.example.demo.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
 
@ServerEndpoint(value = "/{deviceId}")
@Component
public class WebSocketServer {
 
private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
 
/**
* 存放每个客户端对应的MyWebSocket对象
*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
 
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
 
/**
* 连接建立成功调用的方法
* */
@OnOpen
public void onOpen(Session session,@PathParam(value = "deviceId") String deviceId) {
this.session = session;
//判断当前设备是否已有web socket连接,有则删除
deleteWebsocketExisted(deviceId);
//保存当前websocket连接
webSocketMap.put(deviceId, this);
 
// 通知客户端建立连接成功
sendMessage(deviceId + "connected success");
 
//设备状态变化(设备连接成功通知)
statusChange(deviceId, true);
}
 
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
* */
@OnMessage
public void onMessage(String message) {
if("ping".equals(message)){
// 心跳
sendMessage(message);
}else{
// 省略业务消息处理步骤
// 返回消息给客户端
sendMessage("some message");
}
}
 
/**
* 连接关闭调用的方法
* */
@OnClose
public void onClose() {
String deviceId =  getKey(webSocketMap, this);
// 从set中删除
webSocketMap.remove(deviceId);
//设备下线
statusChange(deviceId, false);
}
 
/**
* 发生错误时调用
* */
@OnError
public void onError(Throwable error) {
logger.error("Error:"+error.getMessage());
}
 
private void statusChange(String deviceId, boolean status) {
if (status) {
logger.info("设备{}上线", deviceId);
}else{
logger.info("设备{}下线", deviceId);
}
// 省略操作数据库步骤
}
 
 
private void deleteWebsocketExisted(String deviceId) {
WebSocketServer oldConnection = webSocketMap.get(deviceId);
if(null != oldConnection){
try {
oldConnection.getSession().close();
} catch (IOException e) {
logger.error("IOException:{}"+e.getMessage());
}
}
}
 
/**
* 根据map的value获取map的key
* @param map
* @param myWebSocket
* @return
*/
private static String getKey(Map<String,WebSocketServer> map,WebSocketServer myWebSocket){
String key="";
for (Entry<String, WebSocketServer> entry : map.entrySet()) {
if(myWebSocket.equals(entry.getValue())){
key=entry.getKey();
}
}
return key;
}
 
public void sendMessage(String message) {
this.session.getAsyncRemote().sendText(message);
}
 
public Session getSession() {
return session;
}
 
}

WebSocketConfig.java

 
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

application.properties

 
1
server.port=5656

启动springboot,这时就可以在客户端模拟调用了,就是这个地址

 
1
ws://localhost:5656/123456789

123456789是设备id

问题总结

1.WebSocketServer里是无法通过@autowired或者@resource注入bean的

2.如果是打包成war到外部tomcat运行,则不需要WebSocketConfig.java这个配置类

今天的关于springboot+websocket+redis搭建的实现springboot websocket redis的分享已经结束,谢谢您的关注,如果想了解更多关于spring boot websocket的实现、Spring 系列 (11) - Springboot+WebSocket 实现发送 JSON 消息实例 (二)、SpringBoot + Websocket 实现实时聊天、SpringBoot + websocket 实现模拟设备上下线的相关知识,请在本站进行查询。

本文标签: