场景很简单——生成大约 10 个任务,每个任务返回一个字符串,并最终将结果收集到一个列表中。
顺序的
其顺序版本如下:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
与 CompletableFuture
可以使用名为 supplyAsync 的实用方法使方法返回 CompletableFuture,我使用的是此方法的变体,它接受要使用的显式 Executor ,我还故意为其中一个输入抛出异常:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
现在分散任务:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
在分散任务结束时,结果是一个 CompletableFuture 列表。现在,要从中获取 String 列表有点棘手,这里我使用 Stackoverflow 中建议的解决方案之一:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
CompletableFuture.allOf 方法在这里纯粹用于编写下一个动作,一旦所有分散的任务完成,一旦任务完成,期货将再次流式传输并收集到字符串列表中。
然后可以异步呈现最终结果:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
使用 Rx-java Observable
使用 Rx-java 的分散收集比 CompletableFuture 版本相对更干净,因为 Rx-java 提供了更好的方法将结果组合在一起,同样是执行分散任务的方法:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
并分散任务:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
我又一次有了一个 Observable 列表,我需要的是一个结果列表,Observable 提供了一个 合并方法来做到这一点:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}
可以订阅并在可用时打印结果:
public void testSequentialScatterGather() throws Exception {
List<String> list =
IntStream.range(0, 10)
.boxed()
.map(this::generateTask)
.collect(Collectors.toList());
logger.info(list.toString());
}
private String generateTask(int i) {
Util.delay(2000);
return i + "-" + "test";
}