51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

在 Spring Boot 中整合、使用 WebSocket

WebSocket 是一种基于 TCP 协议的全双工通信协议,它允许客户端和服务器之间建立持久的、双向的通信连接。相比传统的 HTTP 请求 - 响应模式,WebSocket 提供了实时、低延迟的数据传输能力。通过 WebSocket,客户端和服务器可以在任意时间点互相发送消息,实现实时更新和即时通信的功能。WebSocket 协议经过了多个浏览器和服务器的支持,成为了现代 Web 应用中常用的通信协议之一。它广泛应用于聊天应用、实时数据更新、多人游戏等场景,为 Web 应用提供了更好的用户体验和更高效的数据传输方式。

本文将会指导你如何在 Spring Boot 中整合、使用 WebSocket,以及如何在 @ServerEndpoint 类中注入其他 Bean 依赖 。

在 Spring Boot 中使用 WebSocket 有 2 种方式。第 1 种是使用由 Jakarta EE 规范提供的 Api,也就是 jakarta.websocket 包下的接口。第 2 种是使用 spring 提供的支持,也就是 spring-websocket 模块。前者是一种独立于框架的技术规范,而后者是 Spring 生态系统的一部分,可以与其他 Spring 模块(如 Spring MVC、Spring Security)无缝集成,共享其配置和功能。

2 种方式各有优劣,你可以按需选择。本文将使用第 1 种方式,也就是使用 jakarta.websocket 来开发 WebSocket 应用。

软件版本:

  • Spring Boot:3.1.3

在 Spring Boot 中整合 WebSocket {#在-spring-boot-中整合-websocket}

添加依赖 {#添加依赖}

pom.xml 中添加 spring-boot-starter-websocket 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

开发 ServerEndpoint 端点 {#开发-serverendpoint-端点}

服务端 WebSocket 端点的开发也有 2 种方式。第 1 种是实现规范所提供的各种接口,通过接口定义的回调方法来处理新的连接、客户端消息、连接断开等等事件。另一种方式是使用注解,类似于 Spring 中的 Controller,通过在方法上使用不同的注解来监听不同的 WebSocket 事件,灵活性比较高,推荐使用。

我们打算创建一个 echo 端点,该端点会处理客户端的连接、断开、消息事件。在收到消息后,我们会在消息前面加上服务器时间戳和 Hello 前缀,原样写回给客户端。如果客户端发送的消息为 bye,则服务器会主动断开与客户端的连接。

package cn.springdoc.demo.channel;

import java.io.IOException;
import java.time.Instant;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.websocket.CloseReason;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;

// 使用 @ServerEndpoint 注解表示此类是一个 WebSocket 端点
// 通过 value 注解,指定 websocket 的路径
@ServerEndpoint(value = "/channel/echo")
public class EchoChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(EchoChannel.class);

    private Session session;

    // 收到消息
    @OnMessage
    public void onMessage(String message) throws IOException{
        
        LOGGER.info("[websocket] 收到消息:id={},message={}", this.session.getId(), message);
        
        if (message.equalsIgnoreCase("bye")) {
            // 由服务器主动关闭连接。状态码为 NORMAL_CLOSURE(正常关闭)。
            this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "Bye"));;
            return;
        }
        
        
        this.session.getAsyncRemote().sendText("["+ Instant.now().toEpochMilli() +"] Hello " + message);
    }

    // 连接打开
    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig){
        // 保存 session 到对象
        this.session = session;
        LOGGER.info("[websocket] 新的连接:id={}", this.session.getId());
    }

    // 连接关闭
    @OnClose
    public void onClose(CloseReason closeReason){
        LOGGER.info("[websocket] 连接断开:id={},reason={}", this.session.getId(),closeReason);
    }

    // 连接异常
    @OnError
    public void onError(Throwable throwable) throws IOException {
        
        LOGGER.info("[websocket] 连接异常:id={},throwable={}", this.session.getId(), throwable.getMessage());
        
        // 关闭连接。状态码为 UNEXPECTED_CONDITION(意料之外的异常)
        this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
    }
}

首先,使用 @ServerEndpoint 注解表示此类是一个 WebSocket 端点,value 属性是必须的,用于设置路由。它还有其他的一些可选属性可以用于自定义子协议、消息编码器、消息解码器、握手处理器等等,篇幅原因这里不展开。

@OnMessage {#onmessage}

@OnMessage 注解用于监听客户端消息事件,它只有一个属性 long maxMessageSize() default -1; 用于限制客户端消息的大小,如果小于等于 0 则表示不限制。当客户端消息体积超过这个阈值,那么服务器就会主动断开连接,状态码为:1009。方法的参数可以是基本的 String / byte[] 或者是 Reader / InputStream,分别表示 WebSocket 中的文本和二进制消息。也可以是自定义的 Java 对象,但是需要在 @ServerEndpoint 中配置对象的解码器(jakarta.websocket.Decoder)。对于内容较长的消息,支持分批发送,可以在消息参数后面定义一个布尔类型的 boolean last参数,如果该值为 true 则表示此消息是批次消息中的最后一条。

@OnMessage
public void onMessage(String message, boolean last) throws IOException{
    if (last) {
            // 这是批量消息的最后一条
    }
}

@OnOpen {#onopen}

@OnOpen 方法用于监听客户端的连接事件,它没有任何属性。可以作为方法参数的对象有很多,Session 对象是必须的,表示当前连接对象,我们可以通过此对象来执行发送消息、断开连接等操作。WebSocket 的连接 URL,类似于 Http 的 URL,也可以传递查询参数、path 参数。通常用于传递认证、鉴权用的 Token 或其他信息。

要获取查询参数,我们可以通过 SessiongetRequestParameterMap(); 获取。

Map<String, List<String>> query = session.getRequestParameterMap();

要获取 path 参数,首先要在 @ServerEndpoint 中定义 path 参数,类似于 Spring Mvc 的 path 参数定义。例如: @ServerEndpoint(value = "/channel/echo/{id}")。那么我们可以在 @OnOpen 方法中使用 @PathParam 注解接收,如下:

@ServerEndpoint(value = "/channel/echo/{id}")

...

@OnOpen
public void onOpen(Session session, @PathParam("id") Long id, EndpointConfig endpointConfig){
    ....
}

示例中的最后一个参数 EndpointConfig ,它是可选,用于获取全局的一些配置。在本文中未用到。

@OnClose {#onclose}

@OnClose 用于处理连接断开事件,参数中可以指定一个 CloseReason 对象,它封装了断开连接的状态码、原因信息。

@OnError {#onerror}

@OnError 用于处理异常事件,该方法必须要有一个 Throwable 类型的参数,表示发生的异常。否则应用会启用失败:

Caused by: jakarta.websocket.DeploymentException: No Throwable parameter was present on the method [onError] of class [cn.springdoc.demo.channel.EchoChannel] that was annotated with OnError
    at org.apache.tomcat.websocket.pojo.PojoMethodMapping.getPathParams(PojoMethodMapping.java:311) ~[tomcat-embed-websocket-10.1.12.jar:10.1.12]
    at org.apache.tomcat.websocket.pojo.PojoMethodMapping.<init>(PojoMethodMapping.java:194) ~[tomcat-embed-websocket-10.1.12.jar:10.1.12]
    at org.apache.tomcat.websocket.server.WsServerContainer.addEndpoint(WsServerContainer.java:130) ~[tomcat-embed-websocket-10.1.12.jar:10.1.12]
    at org.apache.tomcat.websocket.server.WsServerContainer.addEndpoint(WsServerContainer.java:240) ~[tomcat-embed-websocket-10.1.12.jar:10.1.12]
    at org.apache.tomcat.websocket.server.WsServerContainer.addEndpoint(WsServerContainer.java:198) ~[tomcat-embed-websocket-10.1.12.jar:10.1.12]
    at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:156) ~[spring-websocket-6.0.11.jar:6.0.11]
    ... 12 common frames omitted

所有事件方法,都支持使用 Session 作为参数,表示当前连接参数。但是为了更加方便,我们在 @OnOpen 事件中直接把 Session 存储到了当前对象中,可以在任意方法中使用 this 访问。服务器会为每个连接创建一个端点对象,所以这是线程安全的。

上面还提到了一个 "连接关闭状态码",WebSocket 协议定义了一系列状态码来表示连接断开的原因,这些状态码定义在了 CloseReason.CloseCodes 枚举中。

配置 ServerEndpointExporter {#配置-serverendpointexporter}

定义好端点后,需要在配置类中通过定义 ServerEndpointExporter Bean 进行注册。

package cn.springdoc.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import cn.springdoc.demo.channel.EchoChannel;

@Configuration
public class WebSocketConfiguration {

    @Bean  
    public ServerEndpointExporter serverEndpointExporter (){
        
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        
        // 手动注册 WebSocket 端点
        exporter.setAnnotatedEndpointClasses(EchoChannel.class);
        
        return exporter;
    }  
}

你也可以在 WebSocket 端点上添加 @Component 注解,使用 Spring 自动扫描,这样的话不需要手动调用 setAnnotatedEndpointClasses 方法进行注册。

测试 {#测试}

在项目的 src/main/resources 目录下创建一个 public 文件夹,再在此文件夹中新建一个 index.html 文件,作为 WebSocket 客户端。内容如下: Spring Boot 默认会把 public 目录下的 index.html 作为应用主页。

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket</title>
</head>
<body>
    <script type="text/javascript">
        let websocket = new WebSocket("ws://localhost:8080/channel/echo");
        
        // 连接断开
        websocket.onclose = e => {
            console.log(`连接关闭: code=${e.code}, reason=${e.reason}`)
        }
        // 收到消息
        websocket.onmessage = e => {
            console.log(`收到消息:${e.data}`);
        }
        // 异常
        websocket.onerror = e => {
            console.log("连接异常")
            console.error(e)
        }
        // 连接打开
        websocket.onopen = e => {
            console.log("连接打开");
            
            // 创建连接后,往服务器连续写入3条消息
            websocket.send("springdoc.cn");
            websocket.send("springdoc.cn");
            websocket.send("springdoc.cn");
            
            // 最后发送 bye,由服务器断开连接
            websocket.send("bye");
            
            // 也可以由客户端主动断开
            // websocket.close();
        }
    </script>
</body>
</html>

内容很简单,网页加载后运行 Javascript 代码。立即创建与 ws://localhost:8080/channel/echo 的 WebSocket 连接对象,通过注册对象的各种监听方法来处理事件。

在连接就绪后,也就是在 onopen 方法中往服务器端点发送了 3 条消息。按照逻辑,服务端也会回复 3 条消息,这会触发 onmessage 事件,把消息内容输出到控制台。最后,发送 bye,服务器收到消息后会主动断开连接,这就会触发 onclose 事件,把 "连接关闭状态码" 和原因输出到控制台。

其实你可以直接把这段 Javascript 代码复制到任意支持 WebSocket 的浏览器的控制台执行,WebSocket 没有跨域的说法!

启动应用,打开浏览器(先打开控制台),然后访问 http://localhost:8080/,查看控制台输出的日志:

连接打开
收到消息:[1694505275009] Hello springdoc.cn
收到消息:[1694505275012] Hello springdoc.cn
收到消息:[1694505275014] Hello springdoc.cn
连接关闭: code=1000, reason=Bye

再看看服务端控制台日志:

cn.springdoc.demo.channel.EchoChannel    : [websocket] 新的连接:id=0
cn.springdoc.demo.channel.EchoChannel    : [websocket] 收到消息:id=0,message=springdoc.cn
cn.springdoc.demo.channel.EchoChannel    : [websocket] 收到消息:id=0,message=springdoc.cn
cn.springdoc.demo.channel.EchoChannel    : [websocket] 收到消息:id=0,message=springdoc.cn
cn.springdoc.demo.channel.EchoChannel    : [websocket] 收到消息:id=0,message=bye
cn.springdoc.demo.channel.EchoChannel    : [websocket] 连接断开:id=0,reason=CloseReason: code [1000], reason [Bye]

没有任何问题,一切按照我们预定义的逻辑在运行。客户端发送 3 条消息,服务器响应 3 条消息,最后断开连接。客户端、服务器相应的事件方法都成功执行。

服务端日志中的 sessionId(id=0),是通过 SessionString getId(); 方法获取的。服务器会为每个连接分配一个不同的 id 值,不同服务器生成的 id 类型不一样。 Tomcat 使用从 0 开始的自增值(本例),Undertow 使用的是类似于 UUID 的 32 位长度的字符串。

在端点中注入 Bean {#在端点中注入-bean}

往往我们需要在端点中使用其他 Spring 管理的 Bean 来完成业务,例如认证、鉴权、保存消息。。。等等。

假如我们有一个 UserService 服务类,内容如下:

package cn.springdoc.demo.service;

import org.springframework.stereotype.Service;

@Service
public class UserService {
    public void foo() {}

    // ....
}

我们现在要在端点中注入使用它,很多人会直接在端点类上使用 @Component 注解,然后注入:

@ServerEndpoint(value = "/channel/echo")
@Component  // 注册为 Spring 组件
public class EchoChannel {

    @Autowired // 注入需要的 Bean
    private UserService userService;

    // ...

    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig){

        this.session = session;

        // 在业务中使用
        this.userService.foo();
    }
}

服务可以正常启动,看似一切都没问题!可是当你在事件方法中使用这 Bean 的时候就会导致 NullPointerException 异常。

java.lang.NullPointerException: Cannot invoke "cn.springdoc.demo.service.UserService.foo()" because "this.userService" is null
    at cn.springdoc.demo.channel.EchoChannel.onOpen(EchoChannel.java:54)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.tomcat.websocket.pojo.PojoEndpointBase.doOnOpen(PojoEndpointBase.java:67)
    at org.apache.tomcat.websocket.pojo.PojoEndpointServer.onOpen(PojoEndpointServer.java:46)
    at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.init(WsHttpUpgradeHandler.java:131)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:936)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:833)

原因:运行时的 WebSocket 连接对象,也就是端点实例,是由服务器创建,而不是 Spring,所以不能使用自动装配。上文也提到过 "服务器会为每个连接创建一个端点实例对象"。

知道了原因后,解决办法也很简单,我们可以使用 Spring 的 ApplicationContextAware 接口,在应用启动时获取到 ApplicationContext 并且保存在全局静态变量中。

服务器每次创建连接的时候,我们就在 @OnOpen 事件方法中从 ApplicationContext 获取到需要 Bean 来初始化端点对象。

@ServerEndpoint(value = "/channel/echo")
@Component  // 由 spring 扫描管理
public class EchoChannel implements
                ApplicationContextAware { // 实现 ApplicationContextAware 接口, Spring 会在运行时注入 ApplicationContext

    private static final Logger LOGGER = LoggerFactory.getLogger(EchoChannel.class);

    // 全局静态变量,保存 ApplicationContext
    private static ApplicationContext applicationContext;

    private Session session;

    // 声明需要的 Bean
    private UserService userService;


    // 保存 Spring 注入的 ApplicationContext 到静态变量
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EchoChannel.applicationContext = applicationContext;
    }

    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig){
        
        // 保存 session 到对象
        this.session = session;
        
        // 连接创建的时候,从 ApplicationContext 获取到 Bean 进行初始化
        this.userService = EchoChannel.applicationContext.getBean(UserService.class);
        
        // 在业务中使用
        this.userService.foo();
        
        LOGGER.info("[websocket] 新的连接:id={}", this.session.getId());
    }
    // ....
}

onOpen 方法在整个连接的生命周期中,只会执行一次,所以这种方式不会带来通信时的性能损耗。

进阶阅读:

赞(1)
未经允许不得转载:工具盒子 » 在 Spring Boot 中整合、使用 WebSocket