51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring Boot + WebSocket + Vue 实现 Ollama 流式会话功能

Spring Boot + WebSocket + Vue 实现 Ollama 流式会话功能

在当今的 Web 应用开发中,实时通信和流式数据处理变得越来越重要。本文深入探讨了使用 Spring Boot 作为后端,结合 WebSocket 技术和 Vue 前端框架来实现与 Ollama 的流式会话。

技术选型与简介

  1. Spring Boot

    Spring Boot 是建立在强大的 Spring 框架之上的一种创新型开发框架。它通过预设的默认配置和自动化的配置管理,极大地简化了 Spring 应用的创建和部署流程。开发人员不再需要花费大量时间去处理繁琐的配置文件,而是能够迅速启动项目并专注于核心业务逻辑的实现。其丰富的 starter 依赖库,使得引入各种常用技术组件变得轻松快捷,为构建高效、稳定的后端服务提供了坚实基础。

    关键特性包括自动配置、嵌入式服务器支持(如 Tomcat、Jetty 等)、健康检查和监控端点、强大的依赖管理等。这些特性使得开发人员能够更高效地开发、测试和部署应用,同时也降低了出错的概率。

    对于企业级应用开发来说,Spring Boot 提供了出色的可扩展性和集成性,可以轻松与各种数据库、消息队列、缓存系统等进行集成,满足复杂业务场景的需求。

  2. WebSocket

    WebSocket 是一种革命性的通信协议,它打破了传统 HTTP 请求-响应模式的限制。通过建立一个持久的全双工连接,使得服器能够实时主动地向客户端推送数据,而客户端也可以随时向服务器发送数据,实现了真正意义上的实时双向通信。

    相较于传统的轮询或长轮询技术,WebSocket 显著降低了网络开销和延迟,提高了数据传输的效率和实时性。它适用于需要实时更新数据的场景,如在线聊天、实时游戏状态同步、金融行情实时推送等。

    在安全性方面,WebSocket 通常与 SSL/TLS 结合使用,确保数据在传输过程中的保密性和完整性。

  3. Vue

    Vue 作为一款轻量级的前端框架,以其简洁易懂的语法和高效的渲染机制受到广大开发者的喜爱。它采用了组件化的开发模式,将用户界面分解为独立、可复用的组件,大大提高了开发效率和代码的可维护性。

    Vue 的核心库专注于视图层,通过数据驱动的方式实现视图的自动更新,使开发者能够更直观地管理页面的状态和交互逻辑。同时,它还拥有丰富的生态系统,包括路由管理(Vue Router)、状态管理(Vuex)等,为构建复杂的单页应用提供了强大的支持。

    对于渐进式的开发理念,Vue 允许开发者可以根据项目的需求逐步引入和扩展功能,从简单的页面应用到大型的企业级前端应用都能轻松应对。

后端实现(Spring Boot)

pom.xml 依赖:

<dependencies>
    <!-- Spring Boot WebSocket 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- 其他相关依赖 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.18</version>
    </dependency>
</dependencies>

application.yaml 配置:

server:
  port: 8080
  servlet:
    context-path: /yourContextPath

spring:
  websocket:
    stomp:
      register-simple-broker: true
      endpoint: /ws
      allowed-origins: "*"
    external-host: http://localhost:11434

后端核心代码:

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {

    private String chatId;
    private String baseUrl;
  	@Value("${spring.websocket.external-host}")
    private String externalHost;


    private ConcurrentHashMap<String, ExternalWebSocketHandler> externalHandlerMap = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession clientSession) throws InterruptedException {
        System.out.println("Connected to client");
        parseParameters(Objects.requireNonNull(clientSession.getUri()));
        connectToExternalWebSocket(clientSession);
    }

    private void parseParameters(URI uri) {
        // 解析路径变量 chatId
        String path = uri.getPath();
        String[] segments = path.split("/");
        if (segments.length > 1) {
            this.chatId = segments[segments.length - 1];
        }

        System.out.println("Parsed chatId: " + chatId);

        // 解析查询参数 baseUrl
        String query = uri.getQuery();
        Map<String, String> queryParams = new HashMap<>();
        if (query!= null) {
            String[] pairs = query.split("&");
            for (String pair : pairs) {
                int idx = pair.indexOf("=");
                try {
                    queryParams.put(URLDecoder.decode(pair.substring(0, idx), "UTF-8"),
                            URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
        this.baseUrl = MapUtil.getStr(queryParams, "baseUrl", "");
    }

    private void connectToExternalWebSocket(WebSocketSession clientSession) throws InterruptedException {
        String template = "{host}/{chatId}?base_url={baseUrl}";
        Map<String, Object> urlParams = new HashMap<>();
        urlParams.put("host", externalHost);
        urlParams.put("chatId", chatId);
        urlParams.put("baseUrl", baseUrl);
        String url = StrUtil.format(template, urlParams);

        StandardWebSocketClient client = new StandardWebSocketClient();
        WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
        CountDownLatch connectionLatch = new CountDownLatch(1);
        ExternalWebSocketHandler externalHandler = new ExternalWebSocketHandler(connectionLatch, clientSession);
        client.doHandshake(externalHandler, headers, URI.create(url))
          .addCallback(
                        result -> System.out.println("External connection established"),
                        ex -> System.err.println("External connection failed: " + ex.getMessage())
                );
        externalHandlerMap.put(clientSession.getId(), externalHandler);
        connectionLatch.await();
    }

    @Override
    protected void handleTextMessage(WebSocketSession clientSession, TextMessage message) throws Exception {
        System.out.println("Received message from client: " + message.getPayload());
        ExternalWebSocketHandler externalHandler = externalHandlerMap.get(clientSession.getId());
        externalHandler.forwardMessage(message.getPayload());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        System.err.println("Transport error: " + exception.getMessage());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) {
        externalHandlerMap.remove(session.getId());
        System.out.println("Connection closed: " + session.getId());
    }

    public static class ExternalWebSocketHandler extends AbstractWebSocketHandler {

        private final CountDownLatch connectionLatch;
        private WebSocketSession clientSession;
        private WebSocketSession externalSession;

        public ExternalWebSocketHandler(CountDownLatch connectionLatch, WebSocketSession clientSession) {
            this.connectionLatch = connectionLatch;
            this.clientSession = clientSession;
        }

        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            System.out.println("Connected to external service: " + session.getId());
            this.externalSession = session;
            sendInitialMessage(); 
            connectionLatch.countDown();
        }

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            clientSession.sendMessage(message);
        }

        public void forwardMessage(String message) throws Exception {
            if (externalSession!= null && externalSession.isOpen()) {
                externalSession.sendMessage(new TextMessage(message));
            } else {
                System.err.println("External WebSocket session is not open");
            }
        }

        private void sendInitialMessage() throws Exception {
        }
    }
}

配置 WebSocket 端点:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ChatWebSocketHandler(), "/chat");
    }
}

前端实现(Vue)

<template>
  <div>
    <div v-if="messages.length > 0" v-for="message in messages" :key="message">{{ message }}</div>
  </div>
</template>

<script>
export default {
  data() {
    return {
      messages: [],
      socket: null,
      // 新增后端需要的参数
      chatId: '', 
      baseUrl: ''
    };
  },
  mounted() {
    this.initWebSocket();
  },
  methods: {
    initWebSocket() {
      // 传递后端需要的参数
      this.socket = new WebSocket(`ws://your-backend-url/chat?chatId=``{this.chatId}&baseUrl=``{this.baseUrl}`);
      this.socket.onmessage = event => {
        this.messages.push(event.data);
      };
    }
  }
};
</script>

通过以上后端与前端的配合,实现了 Spring Boot + WebSocket + Vue 的 Ollama 流式会话。

总结:

本文详细介绍了从技术选型到具体实现的全过程,涵盖了 Spring Boot 后端的代码编写、配置(包括从 yaml 文件读取外部主机配置),以及 Vue 前端的连接和消息处理(增加了后端需要的参数传递)。需要注意的是,开发者在运行此项目前,应确保启动 Ollama 服务,其 URL 为 http://localhost:11434,以便实现完整的功能。这有助于深入理解和实践相关技术在实时通信和流式会话中的应用。

赞(4)
未经允许不得转载:工具盒子 » Spring Boot + WebSocket + Vue 实现 Ollama 流式会话功能