Spring framework 4.2 GA即将发布,让我们来看看它提供的一些新特性。引起我注意的是一个简单的新类
SseEmitter
一种在 Spring MVC 控制器中很容易使用的
服务器发送事件的
抽象。 SSE 是一种技术,允许您在一个 HTTP 连接中以一个方向将数据从服务器流式传输到浏览器。这听起来像是
websockets
可以做的一部分。然而,由于它是一个简单得多的协议,它可以在不需要全双工的情况下使用,例如实时推送股票价格变化或显示长期运行过程的进度。这将是我们的榜样。
假设我们有一个具有以下 API 的虚拟硬币矿工:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
每次我们调用 mine() 时,我们都必须等待几秒钟,然后我们得到大约 1 个硬币的回报(平均)。如果我们想挖多个币,我们必须多次调用这个方法:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
客户端代码必须显式提供
ExecutorService
(只是一个设计选择):
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
首先多次调用
mineAsync
然后(作为第二阶段)等待所有 futures 完成
join
是非常重要的。很想写:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
然而,由于 Java 8 中流的惰性,任务将按顺序执行!如果您还没有理解流的懒惰性,请始终从下到上阅读它们:我们要求加入某个未来,因此流上升并调用 mineAsync() 一次(懒惰!),将其传递给 join()。当这个 join() 完成时,它会再次请求另一个 Future。通过使用 collect() 我们强制所有 mineAsync() 执行,开始所有异步计算。稍后我们等待他们中的每一个。
介绍
SseEmitter
现在是时候变得更加被动了(我说过了)。控制器可以返回 SseEmitter 的实例。一旦我们从处理程序方法返回,容器线程就会被释放并可以为更多即将到来的请求提供服务。但是连接没有关闭,客户端一直在等待!我们应该做的是保留 SseEmitter 实例的引用,并稍后从不同的线程调用其 send() 和 complete 方法。例如,我们可以启动一个长时间运行的进程并保持来自任意线程的 send()-ing 进程。一旦这个过程完成,我们完成()SseEmitter,最后关闭 HTTP 连接(至少在逻辑上,记住 Keep-alive)。在下面的示例中,我们有一堆 CompletableFutures,当每个完成时,我们只需将 1 发送到客户端 (notifyProgress())。当所有 futures 完成后,我们完成流 thenRun(sseEmitter::complete),关闭连接:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
调用此方法会产生以下响应(注意
Content-Type
):
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
稍后我们将学习如何在客户端解释此类响应。现在让我们稍微清理一下设计。
引入具有
Observable
进度的 RxJava
上面的代码有效,但看起来很乱。我们实际上拥有的是一系列事件,每个事件代表计算的进展。计算最终完成,因此流也应该发出结束信号。听起来很像 Observable!我们首先重构 CoinMiner 以返回 Observable<BigDecimal>:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
每次从
mineMany()
返回的
Observable
中出现一个事件,我们就开采了那么多硬币。当所有的期货都完成后,我们也完成了流。这在实现方面看起来并没有好多少,但从控制器的角度来看它是多么干净:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
在调用
coinMiner.mineMany()
之后,我们只需订阅事件。原来
Observable
和
SseEmitter
方法匹配 1:1。这里发生的事情是不言自明的:启动异步计算,每次后台计算发出任何进展信号时,将其转发给客户端。 OK,让我们回到实现上。它看起来很乱,因为我们混合了
CompletableFuture
和
Observable
。我已经描述了如何
将
CompletableFuture
转换为只有一个元素的
Observable
。这是一个回顾,包括自 RxJava 1.0.13 以来发现的
rx.Single
抽象(此处未使用):
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
在某个地方拥有这些实用程序运算符,我们可以改进实现并避免混合使用两个 API:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
RxJava 有一个内置的运算符,用于将多个 Observable 合并为一个,我们的每个底层 Observable 只发出一个事件并不重要。
深入研究 RxJava 运算符
让我们使用 RxJava 的强大功能来改进我们的流式传输。
scan()
目前,每次我们开采一枚硬币时,我们都会向客户端发送 (1) 事件。这意味着每个客户都必须跟踪它已经收到了多少硬币,以便计算总计算量。如果服务器总是发送总量而不是增量,那就太好了。但是,我们不想更改实现。事实证明,使用 Observable.scan() 运算符非常简单:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
scan()
运算符获取前一个事件和当前事件,将它们组合在一起。通过应用
BigDecimal::add
我们只需将所有数字相加即可。例如 1、1 + 1、(1 + 1) + 1 等等。
scan()
类似于
flatMap()
,但保留中间值。
使用
sample()
进行采样
可能是我们的后端服务产生了太多我们无法使用的进度更新。我们不想让不相关的更新淹没客户端并使带宽饱和。每秒最多发送两次更新听起来很合理。幸运的是,RxJava 也有一个内置的运算符:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
sample()
将定期查看底层流并仅发出最新的项目,丢弃中间的项目。幸运的是,我们使用
scan()
即时聚合项目,因此我们不会丢失任何更新。
window()
- 恒定的发射间隔
不过有一个问题。如果在选定的 500 毫秒内没有出现任何新内容,则 sample() 不会两次发出相同的项目。这很好,但请记住我们是通过 TCP/IP 连接推送这些更新。定期向客户端发送更新是个好主意,即使在此期间什么也没发生——只是为了保持连接,有点像 ping。可能有许多方法可以实现此要求,例如涉及 timeout() 运算符。我选择使用 window() 运算符每 500 毫秒对所有事件进行一次分组:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
这个很棘手。首先,我们将所有进度更新分组在 500 毫秒的窗口中。然后我们使用 reduce 计算在此时间段内开采的硬币总数(类似于 scan())。如果在那段时间没有开采任何硬币,我们就简单地返回零。我们最后使用 scan() 来聚合每个窗口的小计。我们不再需要 sample(),因为 window() 确保每 500 毫秒发出一个事件。
客户端
在 JavaScript 中有很多 SSE 用法的例子,所以只是为了给你一个调用我们的控制器的快速解决方案:
public interface CoinMiner {
BigDecimal mine() {
//...
}
}
我相信
SseEmitter
是 Spring MVC 的一项重大改进,它将使我们能够编写更健壮、更快的需要即时单向更新的 Web 应用程序。