使用 Java 8 CompletableFuture 和 Rx-Java Observable

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / Java 学习路线 / 一对一提问 / 学习打卡/ 赠书活动

目前,正在 星球 内带小伙伴们做第一个项目:全栈前后端分离博客项目,采用技术栈 Spring Boot + Mybatis Plus + Vue 3.x + Vite 4手把手,前端 + 后端全栈开发,从 0 到 1 讲解每个功能点开发步骤,1v1 答疑,陪伴式直到项目上线,目前已更新了 204 小节,累计 32w+ 字,讲解图:1416 张,还在持续爆肝中,后续还会上新更多项目,目标是将 Java 领域典型的项目都整上,如秒杀系统、在线商城、IM 即时通讯、权限管理等等,已有 870+ 小伙伴加入,欢迎点击围观

场景很简单——生成大约 10 个任务,每个任务返回一个字符串,并最终将结果收集到一个列表中。


顺序的

其顺序版本如下:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }




与 CompletableFuture

可以使用名为 supplyAsync 的实用方法使方法返回 CompletableFuture,我使用的是此方法的变体,它接受要使用的显式 Executor ,我还故意为其中一个输入抛出异常:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }



现在分散任务:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }



在分散任务结束时,结果是一个 CompletableFuture 列表。现在,要从中获取 String 列表有点棘手,这里我使用 Stackoverflow 中建议的解决方案之一:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }




CompletableFuture.allOf 方法在这里纯粹用于编写下一个动作,一旦所有分散的任务完成,一旦任务完成,期货将再次流式传输并收集到字符串列表中。


然后可以异步呈现最终结果:

查看源代码


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }





使用 Rx-java Observable

使用 Rx-java 的分散收集比 CompletableFuture 版本相对更干净,因为 Rx-java 提供了更好的方法将结果组合在一起,同样是执行分散任务的方法:


查看源代码


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


并分散任务:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码


我又一次有了一个 Observable 列表,我需要的是一个结果列表,Observable 提供了一个 合并方法来做到这一点:



 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码

可以订阅并在可用时打印结果:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码