使用 RxJava 和 SseEmitter 的服务器发送事件

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

Spring framework 4.2 GA即将发布,让我们来看看它提供的一些新特性。引起我注意的是一个简单的新类 SseEmitter 一种在 Spring MVC 控制器中很容易使用的 服务器发送事件的 抽象。 SSE 是一种技术,允许您在一个 HTTP 连接中以一个方向将数据从服务器流式传输到浏览器。这听起来像是 websockets 可以做的一部分。然而,由于它是一个简单得多的协议,它可以在不需要全双工的情况下使用,例如实时推送股票价格变化或显示长期运行过程的进度。这将是我们的榜样。


假设我们有一个具有以下 API 的虚拟硬币矿工:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}


每次我们调用 mine() 时,我们都必须等待几秒钟,然后我们得到大约 1 个硬币的回报(平均)。如果我们想挖多个币,我们必须多次调用这个方法:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

客户端代码必须显式提供 ExecutorService (只是一个设计选择):


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

首先多次调用 mineAsync 然后(作为第二阶段)等待所有 futures 完成 join 是非常重要的。很想写:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

然而,由于 Java 8 中流的惰性,任务将按顺序执行!如果您还没有理解流的懒惰性,请始终从下到上阅读它们:我们要求加入某个未来,因此流上升并调用 mineAsync() 一次(懒惰!),将其传递给 join()。当这个 join() 完成时,它会再次请求另一个 Future。通过使用 collect() 我们强制所有 mineAsync() 执行,开始所有异步计算。稍后我们等待他们中的每一个。


介绍 SseEmitter

现在是时候变得更加被动了(我说过了)。控制器可以返回 SseEmitter 的实例。一旦我们从处理程序方法返回,容器线程就会被释放并可以为更多即将到来的请求提供服务。但是连接没有关闭,客户端一直在等待!我们应该做的是保留 SseEmitter 实例的引用,并稍后从不同的线程调用其 send() 和 complete 方法。例如,我们可以启动一个长时间运行的进程并保持来自任意线程的 send()-ing 进程。一旦这个过程完成,我们完成()SseEmitter,最后关闭 HTTP 连接(至少在逻辑上,记住 Keep-alive)。在下面的示例中,我们有一堆 CompletableFutures,当每个完成时,我们只需将 1 发送到客户端 (notifyProgress())。当所有 futures 完成后,我们完成流 thenRun(sseEmitter::complete),关闭连接:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

调用此方法会产生以下响应(注意 Content-Type ):


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

稍后我们将学习如何在客户端解释此类响应。现在让我们稍微清理一下设计。


引入具有 Observable 进度的 RxJava

上面的代码有效,但看起来很乱。我们实际上拥有的是一系列事件,每个事件代表计算的进展。计算最终完成,因此流也应该发出结束信号。听起来很像 Observable!我们首先重构 CoinMiner 以返回 Observable<BigDecimal>:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}


每次从 mineMany() 返回的 Observable 中出现一个事件,我们就开采了那么多硬币。当所有的期货都完成后,我们也完成了流。这在实现方面看起来并没有好多少,但从控制器的角度来看它是多么干净:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}


在调用 coinMiner.mineMany() 之后,我们只需订阅事件。原来 Observable SseEmitter 方法匹配 1:1。这里发生的事情是不言自明的:启动异步计算,每次后台计算发出任何进展信号时,将其转发给客户端。 OK,让我们回到实现上。它看起来很乱,因为我们混合了 CompletableFuture Observable 。我已经描述了如何 CompletableFuture 转换为只有一个元素的 Observable 。这是一个回顾,包括自 RxJava 1.0.13 以来发现的 rx.Single 抽象(此处未使用):


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

在某个地方拥有这些实用程序运算符,我们可以改进实现并避免混合使用两个 API:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

RxJava 有一个内置的运算符,用于将多个 Observable 合并为一个,我们的每个底层 Observable 只发出一个事件并不重要。


深入研究 RxJava 运算符

让我们使用 RxJava 的强大功能来改进我们的流式传输。


scan()

目前,每次我们开采一枚硬币时,我们都会向客户端发送 (1) 事件。这意味着每个客户都必须跟踪它已经收到了多少硬币,以便计算总计算量。如果服务器总是发送总量而不是增量,那就太好了。但是,我们不想更改实现。事实证明,使用 Observable.scan() 运算符非常简单:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

scan() 运算符获取前一个事件和当前事件,将它们组合在一起。通过应用 BigDecimal::add 我们只需将所有数字相加即可。例如 1、1 + 1、(1 + 1) + 1 等等。 scan() 类似于 flatMap() ,但保留中间值。

使用 sample() 进行采样

可能是我们的后端服务产生了太多我们无法使用的进度更新。我们不想让不相关的更新淹没客户端并使带宽饱和。每秒最多发送两次更新听起来很合理。幸运的是,RxJava 也有一个内置的运算符:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}


sample() 将定期查看底层流并仅发出最新的项目,丢弃中间的项目。幸运的是,我们使用 scan() 即时聚合项目,因此我们不会丢失任何更新。

window() - 恒定的发射间隔

不过有一个问题。如果在选定的 500 毫秒内没有出现任何新内容,则 sample() 不会两次发出相同的项目。这很好,但请记住我们是通过 TCP/IP 连接推送这些更新。定期向客户端发送更新是个好主意,即使在此期间什么也没发生——只是为了保持连接,有点像 ping。可能有许多方法可以实现此要求,例如涉及 timeout() 运算符。我选择使用 window() 运算符每 500 毫秒对所有事件进行一次分组:



 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}

这个很棘手。首先,我们将所有进度更新分组在 500 毫秒的窗口中。然后我们使用 reduce 计算在此时间段内开采的硬币总数(类似于 scan())。如果在那段时间没有开采任何硬币,我们就简单地返回零。我们最后使用 scan() 来聚合每个窗口的小计。我们不再需要 sample(),因为 window() 确保每 500 毫秒发出一个事件。


客户端

在 JavaScript 中有很多 SSE 用法的例子,所以只是为了给你一个调用我们的控制器的快速解决方案:


 public interface CoinMiner {
BigDecimal mine() {
    //...
}

}


我相信 SseEmitter 是 Spring MVC 的一项重大改进,它将使我们能够编写更健壮、更快的需要即时单向更新的 Web 应用程序。