使用 Java 8 CompletableFuture 和 Rx-Java Observable

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 54w+ 字,讲解图 2476+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 1900+ 小伙伴加入学习 ,欢迎点击围观

场景很简单——生成大约 10 个任务,每个任务返回一个字符串,并最终将结果收集到一个列表中。


顺序的

其顺序版本如下:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }




与 CompletableFuture

可以使用名为 supplyAsync 的实用方法使方法返回 CompletableFuture,我使用的是此方法的变体,它接受要使用的显式 Executor ,我还故意为其中一个输入抛出异常:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }



现在分散任务:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }



在分散任务结束时,结果是一个 CompletableFuture 列表。现在,要从中获取 String 列表有点棘手,这里我使用 Stackoverflow 中建议的解决方案之一:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }




CompletableFuture.allOf 方法在这里纯粹用于编写下一个动作,一旦所有分散的任务完成,一旦任务完成,期货将再次流式传输并收集到字符串列表中。


然后可以异步呈现最终结果:

查看源代码


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }





使用 Rx-java Observable

使用 Rx-java 的分散收集比 CompletableFuture 版本相对更干净,因为 Rx-java 提供了更好的方法将结果组合在一起,同样是执行分散任务的方法:


查看源代码


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


并分散任务:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码


我又一次有了一个 Observable 列表,我需要的是一个结果列表,Observable 提供了一个 合并方法来做到这一点:



 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码

可以订阅并在可用时打印结果:


 public void testSequentialScatterGather() throws Exception {
 List<String> list =
   IntStream.range(0, 10)
     .boxed()
     .map(this::generateTask)
     .collect(Collectors.toList());

logger.info(list.toString()); }

private String generateTask(int i) { Util.delay(2000); return i + "-" + "test"; }


查看源代码