springboot sse(长文讲解)
💡一则或许对你有用的小广告
欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于
Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...
,点击查看项目介绍 ;演示链接: http://116.62.199.48:7070 ;- 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;
截止目前, 星球 内专栏累计输出 90w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 3100+ 小伙伴加入学习 ,欢迎点击围观
在互联网应用开发中,实时数据推送是一个常见需求,例如聊天室、股票行情更新或订单状态通知。传统的实现方式如轮询(Polling)或长轮询(Long Polling)存在效率低、资源消耗大等问题。而 Spring Boot SSE(Server-Sent Events)通过单向通信机制,提供了轻量级、高效且易于维护的解决方案。本文将从基础概念到实战案例,分步骤讲解如何在 Spring Boot 中实现 SSE 功能,并通过比喻和代码示例帮助读者理解其核心原理与应用场景。
什么是 Server-Sent Events(SSE)?
SSE 是一种浏览器原生支持的技术,允许服务器主动向客户端推送实时数据。与 WebSocket 不同,SSE 是单向通信(服务器→客户端),基于 HTTP 协议,无需复杂的握手过程,且开销更小。
类比说明:
想象快递公司送货:
- 传统轮询:用户每 5 分钟给快递员打电话询问包裹是否到达,即使包裹未到也要重复询问。
- SSE:快递员一旦有新包裹,就直接送货上门,用户无需主动询问。
SSE 的优势在于:
- 低延迟:数据推送几乎实时。
- 轻量级:基于 HTTP/1.1,无需额外协议支持。
- 简单易用:客户端通过 JavaScript 的
EventSource
接口即可实现。
Spring Boot 对 SSE 的支持
Spring Boot 通过 SseEmitter
类简化了 SSE 的实现。开发者只需按照以下步骤:
- 创建
SseEmitter
实例。 - 通过
SseEmitter#send()
方法发送事件。 - 处理客户端断开、超时等异常。
核心类与配置
SseEmitter
类
这是 Spring Boot 提供的核心类,用于管理单个客户端的 SSE 通信。关键方法包括:
send()
:发送事件数据。complete()
:主动关闭连接。timeout()
:设置连接超时时间。
MIME 类型配置
SSE 请求的响应头需设置 Content-Type
为 text/event-stream
,Spring Boot 会自动处理这一配置。
第一步:创建基础 SSE 示例
1.1 项目依赖
在 pom.xml
中添加 Spring Web 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1.2 控制器实现
创建一个 REST 端点,返回 SseEmitter
:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
public class SseController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter startSse() {
SseEmitter emitter = new SseEmitter();
// 发送初始事件
sendEvent(emitter, "欢迎连接 SSE!");
return emitter;
}
private void sendEvent(SseEmitter emitter, String data) {
try {
emitter.send(SseEmitter.event()
.name("message")
.data(data));
} catch (Exception e) {
emitter.complete();
}
}
}
1.3 客户端测试
在浏览器中打开 /sse
端点,或通过 JavaScript 订阅事件:
const eventSource = new EventSource('/sse');
eventSource.onmessage = (event) => {
console.log('接收到消息:', event.data);
};
进阶技巧:动态消息推送与异常处理
2.1 动态发送消息
在实际场景中,可能需要异步推送消息。例如,模拟订单状态更新:
@RestController
public class OrderController {
@GetMapping("/order/{id}")
public SseEmitter getOrderStatus(@PathVariable String id) {
SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间
new Thread(() -> {
try {
Thread.sleep(2000); // 模拟处理延迟
emitter.send("订单 " + id + " 已发货");
Thread.sleep(3000);
emitter.send("订单 " + id + " 正在派送");
emitter.complete(); // 结束连接
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
2.2 异常处理与资源管理
SSE 连接可能因网络中断或超时断开,需捕获异常并释放资源:
private void sendEventWithSafety(SseEmitter emitter, String data) {
try {
emitter.send(data);
} catch (IOException e) {
if (emitter.hasClients()) {
emitter.complete();
}
} finally {
// 可选:记录日志或清理资源
}
}
高级场景:结合消息队列实现解耦
在高并发场景中,直接在控制器中处理异步任务可能阻塞主线程。可通过消息队列(如 Redis)解耦生产者与消费者:
3.1 发布消息到 Redis
@Autowired
private RedisTemplate<String, String> redisTemplate;
@PostMapping("/publish")
public String publishMessage(@RequestParam String message) {
redisTemplate.convertAndSend("sse-channel", message);
return "消息已发送";
}
3.2 订阅 Redis 通道并推送事件
通过 Spring 的 @MessageMapping
监听 Redis 频道,将消息转发给客户端:
@Service
public class SseService {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@MessageMapping("sse-channel")
public void onMessage(String message) {
emitters.values().forEach(emitter -> {
try {
emitter.send(message);
} catch (IOException e) {
// 处理异常
}
});
}
// 提供方法将新连接的 emitter 添加到集合中
public SseEmitter registerEmitter(String clientId) {
SseEmitter emitter = new SseEmitter();
emitters.put(clientId, emitter);
// 设置超时后自动移除
emitter.onCompletion(() -> emitters.remove(clientId));
return emitter;
}
}
性能优化与注意事项
4.1 线程池配置
默认情况下,SseEmitter
的发送操作是同步的,高并发时可能阻塞线程。通过配置线程池提升性能:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.initialize();
return executor;
}
}
4.2 客户端重连机制
浏览器默认会自动重连断开的 SSE 连接,但可自定义重连策略:
eventSource.onerror = (e) => {
setTimeout(() => {
eventSource.close();
this.startSse(); // 重新建立连接
}, 5000);
};
典型应用场景
5.1 实时聊天室
通过 SSE 实现群组聊天:
- 服务端监听消息队列,将新消息推送给所有在线用户。
- 客户端通过
EventSource
接收并更新界面。
5.2 股票行情推送
模拟实时股票价格更新:
// 每秒推送随机价格
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
emitter.send("AAPL: " + (float)(Math.random() * 100));
} catch (Exception e) {
emitter.complete();
}
}
}).start();
结论
本文通过从基础概念到实战案例的讲解,展示了如何在 Spring Boot 中实现 SSE 的核心功能。无论是订单状态通知、聊天室,还是实时数据分析,SSE 都能以高效、简洁的方式满足需求。开发者需注意线程安全、资源管理和异常处理,以构建高可用的实时系统。随着对 Spring Boot SSE 的深入理解,读者可进一步探索与 WebSocket、gRPC 等技术的结合,为复杂场景提供更灵活的解决方案。
通过本文,读者不仅能掌握 SSE 的实现方法,还能理解其在微服务架构中的适用场景,为构建现代化实时应用奠定坚实基础。