springboot sse(长文讲解)

更新时间:

💡一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

截止目前, 星球 内专栏累计输出 90w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 3100+ 小伙伴加入学习 ,欢迎点击围观

在互联网应用开发中,实时数据推送是一个常见需求,例如聊天室、股票行情更新或订单状态通知。传统的实现方式如轮询(Polling)或长轮询(Long Polling)存在效率低、资源消耗大等问题。而 Spring Boot SSE(Server-Sent Events)通过单向通信机制,提供了轻量级、高效且易于维护的解决方案。本文将从基础概念到实战案例,分步骤讲解如何在 Spring Boot 中实现 SSE 功能,并通过比喻和代码示例帮助读者理解其核心原理与应用场景。


什么是 Server-Sent Events(SSE)?

SSE 是一种浏览器原生支持的技术,允许服务器主动向客户端推送实时数据。与 WebSocket 不同,SSE 是单向通信(服务器→客户端),基于 HTTP 协议,无需复杂的握手过程,且开销更小。

类比说明
想象快递公司送货:

  • 传统轮询:用户每 5 分钟给快递员打电话询问包裹是否到达,即使包裹未到也要重复询问。
  • SSE:快递员一旦有新包裹,就直接送货上门,用户无需主动询问。

SSE 的优势在于:

  • 低延迟:数据推送几乎实时。
  • 轻量级:基于 HTTP/1.1,无需额外协议支持。
  • 简单易用:客户端通过 JavaScript 的 EventSource 接口即可实现。

Spring Boot 对 SSE 的支持

Spring Boot 通过 SseEmitter 类简化了 SSE 的实现。开发者只需按照以下步骤:

  1. 创建 SseEmitter 实例。
  2. 通过 SseEmitter#send() 方法发送事件。
  3. 处理客户端断开、超时等异常。

核心类与配置

SseEmitter

这是 Spring Boot 提供的核心类,用于管理单个客户端的 SSE 通信。关键方法包括:

  • send():发送事件数据。
  • complete():主动关闭连接。
  • timeout():设置连接超时时间。

MIME 类型配置

SSE 请求的响应头需设置 Content-Typetext/event-stream,Spring Boot 会自动处理这一配置。


第一步:创建基础 SSE 示例

1.1 项目依赖

pom.xml 中添加 Spring Web 依赖:

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-web</artifactId>  
</dependency>  

1.2 控制器实现

创建一个 REST 端点,返回 SseEmitter

import org.springframework.http.MediaType;  
import org.springframework.web.bind.annotation.*;  
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  

@RestController  
public class SseController {  

    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  
    public SseEmitter startSse() {  
        SseEmitter emitter = new SseEmitter();  
        // 发送初始事件  
        sendEvent(emitter, "欢迎连接 SSE!");  
        return emitter;  
    }  

    private void sendEvent(SseEmitter emitter, String data) {  
        try {  
            emitter.send(SseEmitter.event()  
                    .name("message")  
                    .data(data));  
        } catch (Exception e) {  
            emitter.complete();  
        }  
    }  
}  

1.3 客户端测试

在浏览器中打开 /sse 端点,或通过 JavaScript 订阅事件:

const eventSource = new EventSource('/sse');  
eventSource.onmessage = (event) => {  
    console.log('接收到消息:', event.data);  
};  

进阶技巧:动态消息推送与异常处理

2.1 动态发送消息

在实际场景中,可能需要异步推送消息。例如,模拟订单状态更新:

@RestController  
public class OrderController {  

    @GetMapping("/order/{id}")  
    public SseEmitter getOrderStatus(@PathVariable String id) {  
        SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间  
        new Thread(() -> {  
            try {  
                Thread.sleep(2000); // 模拟处理延迟  
                emitter.send("订单 " + id + " 已发货");  
                Thread.sleep(3000);  
                emitter.send("订单 " + id + " 正在派送");  
                emitter.complete(); // 结束连接  
            } catch (Exception e) {  
                emitter.completeWithError(e);  
            }  
        }).start();  
        return emitter;  
    }  
}  

2.2 异常处理与资源管理

SSE 连接可能因网络中断或超时断开,需捕获异常并释放资源:

private void sendEventWithSafety(SseEmitter emitter, String data) {  
    try {  
        emitter.send(data);  
    } catch (IOException e) {  
        if (emitter.hasClients()) {  
            emitter.complete();  
        }  
    } finally {  
        // 可选:记录日志或清理资源  
    }  
}  

高级场景:结合消息队列实现解耦

在高并发场景中,直接在控制器中处理异步任务可能阻塞主线程。可通过消息队列(如 Redis)解耦生产者与消费者:

3.1 发布消息到 Redis

@Autowired  
private RedisTemplate<String, String> redisTemplate;  

@PostMapping("/publish")  
public String publishMessage(@RequestParam String message) {  
    redisTemplate.convertAndSend("sse-channel", message);  
    return "消息已发送";  
}  

3.2 订阅 Redis 通道并推送事件

通过 Spring 的 @MessageMapping 监听 Redis 频道,将消息转发给客户端:

@Service  
public class SseService {  

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();  

    @MessageMapping("sse-channel")  
    public void onMessage(String message) {  
        emitters.values().forEach(emitter -> {  
            try {  
                emitter.send(message);  
            } catch (IOException e) {  
                // 处理异常  
            }  
        });  
    }  

    // 提供方法将新连接的 emitter 添加到集合中  
    public SseEmitter registerEmitter(String clientId) {  
        SseEmitter emitter = new SseEmitter();  
        emitters.put(clientId, emitter);  
        // 设置超时后自动移除  
        emitter.onCompletion(() -> emitters.remove(clientId));  
        return emitter;  
    }  
}  

性能优化与注意事项

4.1 线程池配置

默认情况下,SseEmitter 的发送操作是同步的,高并发时可能阻塞线程。通过配置线程池提升性能:

@Configuration  
@EnableAsync  
public class AsyncConfig implements AsyncConfigurer {  

    @Override  
    public Executor getAsyncExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(10);  
        executor.setMaxPoolSize(20);  
        executor.setQueueCapacity(500);  
        executor.initialize();  
        return executor;  
    }  
}  

4.2 客户端重连机制

浏览器默认会自动重连断开的 SSE 连接,但可自定义重连策略:

eventSource.onerror = (e) => {  
    setTimeout(() => {  
        eventSource.close();  
        this.startSse(); // 重新建立连接  
    }, 5000);  
};  

典型应用场景

5.1 实时聊天室

通过 SSE 实现群组聊天:

  • 服务端监听消息队列,将新消息推送给所有在线用户。
  • 客户端通过 EventSource 接收并更新界面。

5.2 股票行情推送

模拟实时股票价格更新:

// 每秒推送随机价格  
new Thread(() -> {  
    while (true) {  
        try {  
            Thread.sleep(1000);  
            emitter.send("AAPL: " + (float)(Math.random() * 100));  
        } catch (Exception e) {  
            emitter.complete();  
        }  
    }  
}).start();  

结论

本文通过从基础概念到实战案例的讲解,展示了如何在 Spring Boot 中实现 SSE 的核心功能。无论是订单状态通知、聊天室,还是实时数据分析,SSE 都能以高效、简洁的方式满足需求。开发者需注意线程安全、资源管理和异常处理,以构建高可用的实时系统。随着对 Spring Boot SSE 的深入理解,读者可进一步探索与 WebSocket、gRPC 等技术的结合,为复杂场景提供更灵活的解决方案。


通过本文,读者不仅能掌握 SSE 的实现方法,还能理解其在微服务架构中的适用场景,为构建现代化实时应用奠定坚实基础。

最新发布