51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring Boot 定时推送 Websocket 消息

1、概览 {#1概览}

本文将带你了解如何在 Spring Boot 中实现定时地往浏览器推送 WebSockets 消息。

另一种方法是使用服务器发送事件 (SSE),但本文不涉及这一点。

Spring 提供了多种调度方式。如 @Scheduled 注解,以及 Project Reactor 提供的 Flux::interval 方法,对于 Webflux 应用来说,该方法开箱即用,它还可以作为独立库用于任何 Java 项目。

此外,还有一些更专业的三方调度框架,如 Quartz Scheduler,但这不在本文范畴。

2、简单的聊天应用 {#2简单的聊天应用}

上一篇文章 中,使用 WebSockets 构建了一个聊天应用。现在让我们用一项新功能来扩展它:聊天机器人。聊天机器人是向浏览器推送预定消息的服务器端组件。

2.1、Maven 依赖 {#21maven-依赖}

先在 Maven 中添加必要依赖, pom.xml 如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

2.2、JavaFaker {#22javafaker}

使用 JavaFaker 库生成机器人信息。该库通常用于生成测试数据。在这里,用于为聊天室添加一位名为 "Chuck Norris" 的访客。

代码如下:

Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();

Faker 为各种数据生成器(Data Generator)提供工厂方法。本文使用 ChuckNorris 生成器。调用 chuckNorris.fact() 从预定义信息列表中随机显示一句话。

2.3、Data Model {#23data-model}

使用简单的 POJO 封装聊天应用的消息:

public class OutputMessage {
private String from;
private String text;
private String time;

// 构造函数、get、set 方法省略 }

下面举例说明如何创建聊天消息:

OutputMessage message = new OutputMessage(
  "Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));

2.4、客户端 {#24客户端}

聊天应用的客户端是一个简单的 HTML 页面。它使用 SockJS 客户端STOMP 消息协议。

客户端订阅 Topic:

<html>
<head>
    <script src="./js/sockjs-0.3.4.js"></script>
    <script src="./js/stomp.js"></script>
    <script type="text/javascript">
        // ...
        stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        // ...
        stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
            showMessageOutput(JSON.parse(messageOutput.body));
        });
    });
    // ...
&lt;/script&gt;

</head> <!-- ... --> </html>

首先,通过 SockJS 协议创建一个 Stomp 客户端。然后,Topic 订阅作为服务器和连接的客户端之间的通信渠道。

在代码库中,这段代码位于 webapp/bots.html 文件中。当在本地运行时,可以通过http://localhost:8080/bots.html 访问它。当然,可以需要根据应用的部署方式来调整主机和端口。

2.5、服务端 {#25服务端}

上一篇文章 已经介绍了如何在 Spring 中配置 WebSockets。

现在来稍微修改一下配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker(&quot;/topic&quot;);
    config.setApplicationDestinationPrefixes(&quot;/app&quot;);
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    // ...
    registry.addEndpoint(&quot;/chatwithbots&quot;);
    registry.addEndpoint(&quot;/chatwithbots&quot;).withSockJS();
}

}

使用工具类 SimpMessagingTemplate 来推送消息。默认情况下,它作为一个 @Bean 在 Spring Context 中提供。可以通过查看 classpath 上 AbstractMessageBrokerConfiguration 抽象类的自动配置来查看它的声明。

可以将它注入到任何 Spring 组件中。

然后,用它向 Topic /topic/pushmessages 推送消息。假设已将该 Bean 注入到名为 simpMessagingTemplate 的变量中:

simpMessagingTemplate.convertAndSend("/topic/pushmessages", 
  new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));

如前所述,在客户端示例中,客户端会订阅该 Topic,以便在消息到达时对其进行处理。

  1. 定时推送消息 {#3-定时推送消息}

在 Spring 生态系统中,可以选择多种调度方法。如果使用 Spring MVC,@Scheduled 注解因其简单性而自然成为首选。如果使用 Spring Webflux,则使用 Project Reactor 的 Flux::interval 方法。

3.1、配置 {#31配置}

聊天机器人使用 JavaFaker 的 Chuck Norris 生成器。

把它配置为一个 Bean,这样就可以在需要的地方注入它。

@Configuration
class AppConfig {
@Bean
public ChuckNorris chuckNorris() {
    return (new Faker()).chuckNorris();
}

}

3.2、使用 @Scheduled {#32使用-scheduled}

"机器人" 是一个定时调度方法。运行时,它使用 SimpMessagingTemplate 通过 WebSocket 发送 OutputMessage POJO。

顾名思义,@Scheduled 注解允许重复执行方法。通过它,可以使用简单的基于速率的调度或更复杂的 cron 表达式。

编写第一个聊天机器人的代码:

@Service
public class ScheduledPushMessages {
@Scheduled(fixedRate = 5000)
public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
    String time = new SimpleDateFormat(&quot;HH:mm&quot;).format(new Date());
    simpMessagingTemplate.convertAndSend(&quot;/topic/pushmessages&quot;, 
      new OutputMessage(&quot;Chuck Norris (@Scheduled)&quot;, chuckNorris().fact(), time));
}

}

@Scheduled(fixedRate = 5000)sendMessage 方法进行注解。这使得 sendMessage 每五秒运行一次。然后,使用 simpMessagingTemplate 向 Topic 发送 OutputMessagesimpMessagingTemplatechuckNorris 实例作为方法参数从 Spring Context 注入。

3.3、使用 Flux::interval() {#33使用-fluxinterval}

如果使用 WebFlux,可以使用 Flux::interval Operator。它将发布一个指定时间间隔(Duration)分隔的无限长的 Long 流。

现在,将 Flux 与之前的示例结合起来使用。

目标是每五秒钟发送一次 "Chuck Norris"。首先,需要实现 InitializingBean 接口,以便在应用启动时订阅 Flux

@Service
public class ReactiveScheduledPushMessages implements InitializingBean {
private SimpMessagingTemplate simpMessagingTemplate;

private ChuckNorris chuckNorris;

@Autowired
public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
    this.simpMessagingTemplate = simpMessagingTemplate;
    this.chuckNorris = chuckNorris;
}

@Override
public void afterPropertiesSet() throws Exception {
    Flux.interval(Duration.ofSeconds(5L))
        // 丢弃传入的 Long 消息,代之以 OutputMessage
        .map((n) -&gt; new OutputMessage(&quot;Chuck Norris (Flux::interval)&quot;, 
                          chuckNorris.fact(), 
                          new SimpleDateFormat(&quot;HH:mm&quot;).format(new Date()))) 
        .subscribe(message -&gt; simpMessagingTemplate.convertAndSend(&quot;/topic/pushmessages&quot;, message));
}

}

如上,使用构造函数注入来设置 simpMessagingTemplatechuckNorris 实例。这次,调度逻辑位于 afterPropertiesSet() 中,在实现 InitializingBean 时覆写了该方法。该方法将在服务启动后立即运行。

interval Operator 每 5 秒发出一个 Long 值。然后,map Operator 会丢弃该值,并用我们的消息取而代之。最后,订阅 Flux,为每条消息触发我们的逻辑。

4、总结 {#4总结}

本文介绍了在 Spring Boot 中实现定时地往浏览器推送 WebSocket 消息的两种方式,分别是使用 @Scheduled 注解或者 Flux::interval() 方法。


Ref:https://www.baeldung.com/spring-boot-scheduled-websocket

赞(6)
未经允许不得转载:工具盒子 » Spring Boot 定时推送 Websocket 消息