我来为你详细介绍如何在Spring Boot + Vue中实现SSE(Server-Sent Events)接口。

1. Spring Boot 后端实现

1.1 添加依赖

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

1.2 SSE控制器

// SseController.java
@RestController
@RequestMapping("/api/sse")
@CrossOrigin(origins = "*") // 允许跨域
public class SseController {
    
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    
    /**
     * 建立SSE连接
     */
    @GetMapping("/connect/{clientId}")
    public SseEmitter connect(@PathVariable String clientId) {
        // 设置超时时间(0表示永不超时)
        SseEmitter emitter = new SseEmitter(0L);
        
        // 注册到Map中
        emitters.put(clientId, emitter);
        
        // 连接成功回调
        emitter.onCompletion(() -> {
            System.out.println("SSE连接完成: " + clientId);
            emitters.remove(clientId);
        });
        
        emitter.onTimeout(() -> {
            System.out.println("SSE连接超时: " + clientId);
            emitters.remove(clientId);
        });
        
        emitter.onError((ex) -> {
            System.out.println("SSE连接错误: " + clientId + ", 错误: " + ex.getMessage());
            emitters.remove(clientId);
        });
        
        try {
            // 发送连接成功的消息
            emitter.send(SseEmitter.event()
                    .name("connect")
                    .data("连接成功")
                    .id(String.valueOf(System.currentTimeMillis())));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
        
        return emitter;
    }
    
    /**
     * 广播消息给所有客户端
     */
    @PostMapping("/broadcast")
    public ResponseEntity<String> broadcast(@RequestBody MessageDTO message) {
        sendToAllClients(message);
        return ResponseEntity.ok("广播成功");
    }
    
    /**
     * 发送消息给指定客户端
     */
    @PostMapping("/send/{clientId}")
    public ResponseEntity<String> sendMessage(
            @PathVariable String clientId, 
            @RequestBody MessageDTO message) {
        boolean success = sendToClient(clientId, message);
        if (success) {
            return ResponseEntity.ok("发送成功");
        } else {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body("客户端不存在");
        }
    }
    
    /**
     * 获取在线客户端数量
     */
    @GetMapping("/online")
    public ResponseEntity<Map<String, Object>> getOnlineCount() {
        Map<String, Object> result = new HashMap<>();
        result.put("onlineCount", emitters.size());
        result.put("clientIds", new ArrayList<>(emitters.keySet()));
        return ResponseEntity.ok(result);
    }
    
    /**
     * 关闭指定客户端的连接
     */
    @DeleteMapping("/close/{clientId}")
    public ResponseEntity<String> closeConnection(@PathVariable String clientId) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter != null) {
            emitter.complete();
            emitters.remove(clientId);
            return ResponseEntity.ok("连接已关闭");
        }
        return ResponseEntity.status(HttpStatus.NOT_FOUND).body("客户端不存在");
    }
    
    /**
     * 向所有客户端发送消息
     */
    private void sendToAllClients(MessageDTO message) {
        List<String> deadClients = new ArrayList<>();
        
        emitters.forEach((clientId, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                        .name(message.getType())
                        .data(message.getContent())
                        .id(String.valueOf(System.currentTimeMillis()))
                        .reconnectTime(3000)); // 重连时间
            } catch (IOException e) {
                deadClients.add(clientId);
                emitter.complete();
            }
        });
        
        // 清理无效连接
        deadClients.forEach(emitters::remove);
    }
    
    /**
     * 向指定客户端发送消息
     */
    private boolean sendToClient(String clientId, MessageDTO message) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                        .name(message.getType())
                        .data(message.getContent())
                        .id(String.valueOf(System.currentTimeMillis())));
                return true;
            } catch (IOException e) {
                emitter.complete();
                emitters.remove(clientId);
            }
        }
        return false;
    }
}

1.3 消息DTO类

// MessageDTO.java
/**
 * Mutable transfer object describing one message: a type, a text content and
 * an optional attached payload.
 */
public class MessageDTO {

    /** Message type. */
    private String type;

    /** Message text. */
    private String content;

    /** Optional extra payload attached to the message. */
    private Object data;

    /** Creates a message with every field unset ({@code null}). */
    public MessageDTO() {
    }

    /**
     * Creates a message without an attached payload.
     *
     * @param type    message type
     * @param content message text
     */
    public MessageDTO(String type, String content) {
        this(type, content, null);
    }

    /**
     * Creates a fully populated message.
     *
     * @param type    message type
     * @param content message text
     * @param data    extra payload, may be {@code null}
     */
    public MessageDTO(String type, String content, Object data) {
        this.type = type;
        this.content = content;
        this.data = data;
    }

    /** @return the message type */
    public String getType() {
        return type;
    }

    /** @param type the new message type */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the message text */
    public String getContent() {
        return content;
    }

    /** @param content the new message text */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the attached payload, possibly {@code null} */
    public Object getData() {
        return data;
    }

    /** @param data the new attached payload */
    public void setData(Object data) {
        this.data = data;
    }
}

1.4 配置类(可选)

// WebConfig.java
/**
 * Spring MVC configuration that sets the default timeout for asynchronous
 * request handling.
 */
@Configuration
public class WebConfig implements WebMvcConfigurer {

    /** Default async timeout passed to Spring MVC; 0 means asynchronous requests do not time out. */
    private static final long ASYNC_TIMEOUT_MS = 0L;

    /**
     * Applies {@link #ASYNC_TIMEOUT_MS} as the default timeout for asynchronous requests.
     *
     * @param configurer async support configurer supplied by Spring MVC
     */
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(ASYNC_TIMEOUT_MS);
    }
}

2. Vue 前端实现

2.1 安装依赖(可选)

npm install event-source-polyfill  # 如果需要兼容旧浏览器

2.2 SSE服务类

// src/services/sseService.js
/**
 * Thin wrapper around EventSource that exposes a small on/off/emit listener API.
 *
 * Local events produced by the wrapper itself: 'connected', 'disconnected',
 * 'error', 'message' (unnamed server events) and 'raw-message' (unnamed events
 * whose data is not JSON). Any other name registered through on() is treated
 * as a server-side event name and is bound on the EventSource, because named
 * SSE events are not delivered to onmessage.
 */
class SSEService {
  constructor() {
    this.eventSource = null;
    this.clientId = null;
    this.listeners = new Map();
    // Names emitted by this wrapper; they are never bound on the EventSource.
    this.localEvents = new Set(['connected', 'disconnected', 'error', 'message', 'raw-message']);
    // Server event names already bound on the current EventSource.
    this.boundServerEvents = new Set();
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
  }

  /**
   * Opens the SSE stream, replacing any stream that is already open.
   * @param {string} clientId client identifier (URL-encoded before use)
   * @param {string} baseURL server base URL
   * @returns {SSEService} this instance, for chaining
   */
  connect(clientId, baseURL = 'http://localhost:8080') {
    if (this.eventSource) {
      this.disconnect();
    }
    this.cancelReconnect();

    this.clientId = clientId;
    const url = `${baseURL}/api/sse/connect/${encodeURIComponent(clientId)}`;

    const source = new EventSource(url);
    this.eventSource = source;
    this.boundServerEvents = new Set();

    // Stream opened.
    source.onopen = (event) => {
      console.log('SSE连接已建立');
      this.emit('connected', event);
    };

    // Unnamed events only; named events are handled by bindServerEvent().
    source.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);

        // Dispatch by the name carried inside the JSON payload, if any.
        if (data.name) {
          this.emit(data.name, data.data || data);
        }
      } catch (error) {
        console.error('解析SSE消息失败:', error);
        this.emit('raw-message', event.data);
      }
    };

    // Connection error.
    source.onerror = (event) => {
      console.error('SSE连接错误:', event);
      this.emit('error', event);

      // Reconnect manually only when the stream was closed for good and
      // this source is still the active one.
      if (this.eventSource === source && source.readyState === EventSource.CLOSED) {
        console.log('SSE连接已关闭,尝试重连...');
        this.cancelReconnect();
        this.reconnectTimer = setTimeout(() => {
          this.reconnectTimer = null;
          this.connect(this.clientId, baseURL);
        }, 3000);
      }
    };

    // Bind every server event name that already has listeners.
    this.listeners.forEach((_callbacks, eventName) => this.bindServerEvent(eventName));

    return this;
  }

  /**
   * Binds one server event name on the current EventSource (at most once).
   * @param {string} eventName event name as sent by the server
   */
  bindServerEvent(eventName) {
    if (
      !this.eventSource ||
      this.localEvents.has(eventName) ||
      this.boundServerEvents.has(eventName)
    ) {
      return;
    }
    this.boundServerEvents.add(eventName);
    this.eventSource.addEventListener(eventName, (event) => {
      this.emit(eventName, this.parsePayload(event.data));
    });
  }

  /**
   * Converts event data to a value: JSON objects/arrays are parsed, anything
   * else (including malformed JSON) is returned as the original string.
   * @param {string} raw raw event data
   * @returns {any} parsed object/array or the raw string
   */
  parsePayload(raw) {
    if (typeof raw !== 'string') {
      return raw;
    }
    const trimmed = raw.trim();
    if (trimmed.startsWith('{') || trimmed.startsWith('[')) {
      try {
        return JSON.parse(trimmed);
      } catch (error) {
        return raw;
      }
    }
    return raw;
  }

  /**
   * Cancels a pending reconnect attempt, if there is one.
   */
  cancelReconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Adds an event listener.
   * @param {string} eventName event name (local or server-side)
   * @param {function} callback invoked with the event payload
   * @returns {SSEService} this instance, for chaining
   */
  on(eventName, callback) {
    if (!this.listeners.has(eventName)) {
      this.listeners.set(eventName, []);
    }
    this.listeners.get(eventName).push(callback);
    // If a stream is already open, start receiving this server event now.
    this.bindServerEvent(eventName);
    return this;
  }

  /**
   * Removes an event listener.
   * @param {string} eventName event name
   * @param {function} callback the function previously passed to on()
   * @returns {SSEService} this instance, for chaining
   */
  off(eventName, callback) {
    if (this.listeners.has(eventName)) {
      const callbacks = this.listeners.get(eventName);
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
    }
    return this;
  }

  /**
   * Invokes every listener of an event; an exception in one listener is
   * logged and does not stop the others.
   * @param {string} eventName event name
   * @param {any} data payload passed to each listener
   */
  emit(eventName, data) {
    if (this.listeners.has(eventName)) {
      this.listeners.get(eventName).forEach(callback => {
        try {
          callback(data);
        } catch (error) {
          console.error(`执行事件${eventName}的回调时出错:`, error);
        }
      });
    }
  }

  /**
   * Posts a message to the server.
   * @param {object} message message object, sent as JSON
   * @param {string} endpoint path below /api/sse
   * @param {string} baseURL server base URL
   * @returns {Promise<any>} parsed JSON for JSON responses, otherwise the response text
   * @throws {Error} when the request fails or the server answers with a non-2xx status
   */
  async sendMessage(message, endpoint = 'broadcast', baseURL = 'http://localhost:8080') {
    try {
      const url = `${baseURL}/api/sse/${endpoint}`;
      const options = {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(message)
      };

      const response = await fetch(url, options);

      // The server may answer with plain text, so only parse JSON when it says so.
      const contentType = response.headers.get('content-type') || '';
      const body = contentType.includes('application/json')
        ? await response.json()
        : await response.text();

      if (!response.ok) {
        const detail = typeof body === 'string' ? body : JSON.stringify(body);
        throw new Error(`HTTP ${response.status}: ${detail}`);
      }
      return body;
    } catch (error) {
      console.error('发送消息失败:', error);
      throw error;
    }
  }

  /**
   * Posts a message addressed to one client.
   * @param {string} targetClientId target client id
   * @param {object} message message object
   * @param {string} baseURL server base URL
   * @returns {Promise<any>} see sendMessage()
   */
  async sendToClient(targetClientId, message, baseURL = 'http://localhost:8080') {
    return this.sendMessage(message, `send/${encodeURIComponent(targetClientId)}`, baseURL);
  }

  /**
   * Closes the stream and cancels any pending reconnect attempt.
   */
  disconnect() {
    this.cancelReconnect();
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
      this.boundServerEvents = new Set();
      console.log('SSE连接已断开');
      this.emit('disconnected');
    }
  }

  /**
   * Reports whether the stream is currently open.
   * @returns {boolean} true only when an EventSource exists and is in the OPEN state
   */
  isConnected() {
    return Boolean(this.eventSource && this.eventSource.readyState === EventSource.OPEN);
  }
}

// Shared instance: the module's default export is one SSEService object
// created when the module is first evaluated.
export default new SSEService();

2.3 Vue组件中使用

<!-- SSEDemo.vue -->
<template>
  <div class="sse-demo">
    <h2>SSE实时通信演示</h2>
    
    <!-- Connection controls: client id input, connect/disconnect buttons, status badge -->
    <div class="connection-panel">
      <input v-model="clientId" placeholder="输入客户端ID" />
      <button @click="connect" :disabled="isConnected">连接</button>
      <button @click="disconnect" :disabled="!isConnected">断开</button>
      <span class="status" :class="{ connected: isConnected }">
        {{ isConnected ? '已连接' : '未连接' }}
      </span>
    </div>

    <!-- Message sending: type selector, content input, broadcast / send-to-self buttons -->
    <div class="message-panel">
      <h3>发送消息</h3>
      <select v-model="messageType">
        <option value="ch@t">聊天</option>
        <option value="notification">通知</option>
        <option value="system">系统</option>
      </select>
      <input v-model="messageContent" placeholder="输入消息内容" />
      <button @click="sendBroadcast" :disabled="!isConnected">广播</button>
      <button @click="sendToSelf" :disabled="!isConnected">发送给自己</button>
    </div>

    <!-- Online information: refresh button and the online client count -->
    <div class="online-info">
      <button @click="getOnlineInfo" :disabled="!isConnected">刷新在线信息</button>
      <p>在线人数: {{ onlineInfo.onlineCount || 0 }}</p>
    </div>

    <!-- Received messages; each item also gets its message type as a CSS class -->
    <div class="messages">
      <h3>收到的消息</h3>
      <div class="message-list">
        <div v-for="(msg, index) in messages" :key="index" 
             class="message-item" :class="msg.type">
          <span class="time">{{ formatTime(msg.timestamp) }}</span>
          <span class="type">[{{ msg.type }}]</span>
          <span class="content">{{ msg.content }}</span>
        </div>
      </div>
    </div>
  </div>
</template>

<script>
import sseService from '@/services/sseService';

/**
 * Demo component for the SSE service: connects under a client id, sends
 * broadcast / targeted messages and lists everything it receives.
 */
export default {
  name: 'SSEDemo',
  data() {
    return {
      clientId: '',
      messageType: 'ch@t',
      messageContent: '',
      isConnected: false,
      messages: [],
      onlineInfo: {}
    };
  },
  mounted() {
    // Generate a random client id (same 9-character suffix the deprecated substr(2, 9) produced).
    this.clientId = 'client_' + Math.random().toString(36).slice(2, 11);

    // Register the service listeners.
    this.registerEventListeners();
  },
  beforeUnmount() {
    // Close the stream first so the 'disconnected' listener still runs, then
    // detach every listener: the service is a shared singleton and would
    // otherwise keep calling into this unmounted component.
    sseService.disconnect();
    this.unregisterEventListeners();
  },
  methods: {
    /**
     * Registers this component's listeners on the SSE service and remembers
     * them so that they can be removed again on unmount.
     */
    registerEventListeners() {
      const asText = (data) => (typeof data === 'string' ? data : JSON.stringify(data));

      // [eventName, handler] pairs; kept as a plain (non-reactive) instance property.
      this.sseHandlers = [
        // Connection established.
        ['connected', () => {
          this.isConnected = true;
          this.addMessage('system', '连接成功');
        }],
        // Connection closed.
        ['disconnected', () => {
          this.isConnected = false;
          this.addMessage('system', '连接已断开');
        }],
        // Generic messages.
        ['message', (data) => {
          this.addMessage('message', JSON.stringify(data));
        }],
        // Typed messages.
        ['ch@t', (data) => {
          this.addMessage('ch@t', asText(data));
        }],
        ['notification', (data) => {
          this.addMessage('notification', asText(data));
        }],
        ['system', (data) => {
          this.addMessage('system', asText(data));
        }],
        // Errors: also refresh the connection flag, since the stream may no longer be open.
        ['error', (error) => {
          console.error('SSE错误:', error);
          this.isConnected = Boolean(sseService.isConnected());
          this.addMessage('system', '连接错误');
        }]
      ];

      this.sseHandlers.forEach(([eventName, handler]) => {
        sseService.on(eventName, handler);
      });
    },

    /**
     * Removes every listener that registerEventListeners() added.
     */
    unregisterEventListeners() {
      (this.sseHandlers || []).forEach(([eventName, handler]) => {
        sseService.off(eventName, handler);
      });
      this.sseHandlers = [];
    },

    /** Opens the SSE stream under the entered client id. */
    connect() {
      if (!this.clientId.trim()) {
        alert('请输入客户端ID');
        return;
      }
      sseService.connect(this.clientId);
    },

    /** Closes the SSE stream. */
    disconnect() {
      sseService.disconnect();
    },

    /** Broadcasts the entered message to all clients. */
    async sendBroadcast() {
      if (!this.messageContent.trim()) {
        alert('请输入消息内容');
        return;
      }

      try {
        await sseService.sendMessage({
          type: this.messageType,
          content: this.messageContent
        });
        this.messageContent = '';
      } catch (error) {
        alert('发送失败: ' + error.message);
      }
    },

    /** Sends the entered message to this client only. */
    async sendToSelf() {
      if (!this.messageContent.trim()) {
        alert('请输入消息内容');
        return;
      }

      try {
        await sseService.sendToClient(this.clientId, {
          type: this.messageType,
          content: `[自发送] ${this.messageContent}`
        });
        this.messageContent = '';
      } catch (error) {
        alert('发送失败: ' + error.message);
      }
    },

    /** Fetches the online client information from the server. */
    async getOnlineInfo() {
      try {
        const response = await fetch('http://localhost:8080/api/sse/online');
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}`);
        }
        this.onlineInfo = await response.json();
      } catch (error) {
        console.error('获取在线信息失败:', error);
      }
    },

    /**
     * Prepends a message to the list, keeping at most 50 entries.
     * @param {string} type message type, also used as a CSS class
     * @param {string} content text to display
     */
    addMessage(type, content) {
      this.messages.unshift({
        type,
        content,
        timestamp: new Date()
      });

      // Cap the list length by dropping the oldest entry.
      if (this.messages.length > 50) {
        this.messages.pop();
      }
    },

    /**
     * Formats a timestamp as a locale time string.
     * @param {Date|number|string} timestamp value accepted by the Date constructor
     * @returns {string} locale-formatted time
     */
    formatTime(timestamp) {
      return new Date(timestamp).toLocaleTimeString();
    }
  }
};
</script>

<style scoped>
.sse-demo {
  max-width: 800px;
  margin: 0 auto;
  padding: 20px;
}

.connection-panel, .message-panel, .online-info {
  margin-bottom: 20px;
  padding: 15px;
  border: 1px solid #ddd;
  border-radius: 5px;
}

.connection-panel input, .message-panel input, .message-panel select {
  margin-right: 10px;
  padding: 5px 10px;
}

button {
  padding: 5px 15px;
  margin-right: 10px;
  cursor: pointer;
}

button:disabled {
  cursor: not-allowed;
  opacity: 0.5;
}

.status {
  padding: 2px 8px;
  border-radius: 3px;
}

.status.connected {
  background-color: #4CAF50;
  color: white;
}

.messages {
  border: 1px solid #ddd;
  border-radius: 5px;
  padding: 15px;
}

.message-list {
  height: 300px;
  overflow-y: auto;
  border: 1px solid #eee;
  padding: 10px;
}

.message-item {
  margin-bottom: 10px;
  padding: 5px;
  border-radius: 3px;
}

/* "@" is not allowed unescaped in a CSS class selector; it is escaped here so
   the rule matches elements whose class is "ch@t". */
.message-item.ch\@t {
  background-color: #e3f2fd;
}

.message-item.notification {
  background-color: #fff3e0;
}

.message-item.system {
  background-color: #f3e5f5;
}

.time {
  font-size: 12px;
  color: #666;
  margin-right: 10px;
}

.type {
  font-weight: bold;
  margin-right: 10px;
}
</style>

3. 高级特性示例

3.1 带认证的SSE连接

// Adding authentication in the controller
/**
 * Variant of the connect endpoint that checks a token before opening the stream.
 *
 * <p>The token is read from the optional {@code Authorization} request header, so
 * it is {@code null} when the header is absent.
 *
 * <p>NOTE(review): the client shown in this document opens the stream with
 * {@code new EventSource(url)} and passes no headers, so {@code token} would be
 * {@code null} for it — confirm how the token is meant to be supplied.
 * NOTE(review): {@code validateToken} is not defined in this document.
 *
 * @param clientId client identifier taken from the path
 * @param token    value of the {@code Authorization} header, or {@code null} when absent
 * @return the emitter backing the stream
 * @throws SecurityException when {@code validateToken(token)} returns {@code false}
 */
@GetMapping("/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId, 
                         @RequestHeader(value = "Authorization", required = false) String token) {
    // Token validation
    if (!validateToken(token)) {
        throw new SecurityException("未授权访问");
    }
    
    // ... the rest of the code is the same
}

3.2 心跳检测

// Periodically sends a heartbeat
/**
 * Scheduled component that pushes a "heartbeat" message with the content "ping"
 * to every connected SSE client.
 *
 * <p>NOTE(review): {@code @Scheduled} methods only run when scheduling is enabled
 * in the application (for example with {@code @EnableScheduling}); no such
 * configuration is shown in this document — confirm it exists.
 */
@Component
public class HeartbeatScheduler {

    @Autowired
    private SseController sseController;

    /**
     * Sends one heartbeat to all clients; runs at a fixed rate of 30000 ms.
     */
    @Scheduled(fixedRate = 30000) // One heartbeat every 30 seconds
    public void sendHeartbeat() {
        MessageDTO heartbeat = new MessageDTO("heartbeat", "ping");
        // Goes through the controller's public broadcast method: sendToAllClients
        // is declared private in SseController and is not accessible from this class.
        sseController.broadcast(heartbeat);
    }
}

这个完整的实现提供了:

  1. 后端功能:连接管理、消息广播、定向发送、连接监控
  2. 前端功能:连接管理、消息收发、事件监听、状态显示
  3. 错误处理:自动重连、连接状态监控
  4. 扩展性:易于添加新的消息类型和业务逻辑

你可以根据具体需求调整消息格式、认证方式和业务逻辑。

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com