Stream 的添加是 Java 8 中的主要新功能之一。这个深入的教程介绍了流支持的许多功能,重点是简单实用的示例。
要理解本材料,您需要具备 Java 8 的基本应用知识(lambda 表达式、方法引用)。
介绍
首先,不应将 Java 8 Streams 与 Java I/O 流混淆(例如: FileInputStream 等);这些彼此之间关系不大。
简单地说,流是数据源的包装器,允许我们操作该数据源并使批量处理方便快捷。
流不存储数据,从这个意义上讲,它不是数据结构。它也从不修改底层数据源。
这个新功能 — java.util.stream — 支持对元素流的函数式操作,例如对集合的 map-reduce 转换。
在进入术语和核心概念之前,让我们现在深入研究流创建和使用的几个简单示例。
流创建
让我们首先从现有数组中获取一个流:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
我们还可以从现有 列表 中获取流:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
请注意, Java 8 向 Collection 接口添加了一个新的 stream() 方法。
我们可以使用 Stream.of() 从单个对象创建流:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
或者简单地使用 Stream.builder() :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
还有其他获取流的方法,我们将在下面的部分中看到其中的一些方法。
流操作
现在让我们看看我们可以在语言中新的流支持的帮助下执行的一些常见用法和操作。
forEach() 是最简单和最常见的操作;它遍历流元素,在每个元素上调用提供的函数。
该方法非常常见,在 Iterable、Map 等中已直接引入:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这将有效地调用 empList 中每个元素的 salaryIncrement() 。
forEach() 是一个终端操作 ,这意味着,执行该操作后,stream pipeline 被认为已消耗,不能再使用。我们将在下一节中详细讨论终端操作。
map() 在将函数应用于原始流的每个元素后生成一个新流。新流可以是不同的类型。
以下示例将 Integer 流转换为 Employee 流:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们从数组中获取员工 ID 的 整数 流。每个 Integer 都被传递给函数 employeeRepository::findById() — 它返回相应的 Employee 对象;这有效地形成了一个 Employee 流。
我们在前面的例子中看到了 collect() 是如何工作的;完成所有处理后,这是从流中获取内容的常用方法之一:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
collect() 对 Stream 实例中保存的数据元素执行可变折叠操作(将元素重新打包为某些数据结构并应用一些额外的逻辑、连接它们等)。
此操作的策略是通过 Collector 接口实现提供的。在上面的示例中,我们使用 toList 收集器将所有 Stream 元素收集到一个 List 实例中。
接下来,让我们看一下 filter() ;这会生成一个新流,其中包含通过给定测试(由谓词指定)的原始流的元素。
让我们来看看它是如何工作的:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在上面的示例中,我们首先过滤掉 无效 员工 ID 的空引用,然后再次应用过滤器以仅保留薪水超过特定阈值的员工。
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里返回第一个薪水大于 100000 的员工。如果不存在这样的员工,则返回 null 。
我们看到了如何使用 collect() 从流中获取数据。如果我们需要从流中获取一个数组,我们可以简单地使用 toArray() :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
语法 Employee[]::new 创建一个空的 Employee 数组——然后用流中的元素填充它。
流可以包含复杂的数据结构,例如 Stream<List<String>> 。在这种情况下, flatMap() 帮助我们扁平化数据结构以简化进一步的操作:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
我们在本节前面看到了 forEach() ,这是一个终端操作。但是,有时我们需要在应用任何终端操作之前对流的每个元素执行多个操作。
peek() 在这种情况下很有用。简单地说,它对流的每个元素执行指定的操作,并返回一个可以进一步使用的新流。 peek() 是一个中间操作 :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
方法类型和管道
正如我们一直在讨论的,流操作分为中间操作和终端操作。
诸如 filter() 之类的中间操作返回一个新流,可以对其进行进一步处理。终端操作,例如 forEach() ,将流标记为已消耗,之后就不能再使用了。
流管道由流源、零个或多个中间操作和终端操作组成。
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
有些操作被认为是 短路操作 。短路操作允许对无限流的计算在有限时间内完成:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
稍后我们将详细讨论无限流。
惰性评估
流最重要的特征之一是它们允许通过惰性评估进行显着优化。
仅在启动终端操作时才对源数据进行计算,并且仅在需要时使用源元素。所有中间操作都是惰性的,因此直到实际需要处理结果时才执行它们。
例如,考虑我们之前看到的 findFirst() 示例。这里执行了多少次 map() 操作? 4 次,因为输入数组包含 4 个元素?
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
Stream 执行 映射 和两个 过滤 操作,一次一个元素。
它首先对 id 1 执行所有操作。由于 id 1 的薪水不大于 100000,因此处理转到下一个元素。
Id 2 满足两个过滤器谓词,因此流评估终端操作 findFirst() 并返回结果。
不对 id 3 和 4 执行任何操作。
延迟处理流可以避免在不必要时检查所有数据。当输入流是无限的而不只是非常大时,这种行为变得更加重要。
基于比较的流操作
让我们从 sorted() 操作开始——它根据我们传递给它的比较器对流元素进行排序。
例如,我们可以根据姓名对 Employee 进行排序:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
请注意,短路不会应用于 sorted() 。
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
我们还可以避免使用 Comparator.comparing() 来定义比较逻辑:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
distinct() 不接受任何参数并返回流中的不同元素,消除重复项。它使用元素的 equals() 方法来判断两个元素是否相等:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
allMatch、anyMatch 和 noneMatch
这些操作都接受一个谓词并返回一个布尔值。一旦确定答案,就会应用短路并停止处理:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
allMatch() 检查流中所有元素的谓词是否为真。这里一遇到不能被2整除的5就返回 false 。
anyMatch() 检查谓词对于流中的任何一个元素是否为真。在这里,再次应用短路,并在第一个元素之后立即返回 true 。
noneMatch() 检查是否没有与谓词匹配的元素。在这里,它只是一遇到能被 3 整除的 6 就返回 false 。
流专业化
从我们目前讨论的内容来看, Stream 是一个对象引用流。但是,还有 IntStream 、 LongStream 和 DoubleStream — 它们分别是 int 、 long 和 double 的原始特化。在处理大量数字基元时,这些非常方便。
创建
创建 IntStream 的最常见方法是在现有流上调用 mapToInt() :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们从 Stream<Employee> 开始,并通过将 Employee::getId 提供给 mapToInt 来获得 IntStream 。最后,我们调用 max() 返回最高整数。
我们还可以使用 IntStream.of() 来创建 IntStream :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
或 IntStream.range() :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这会创建数字 10 到 19 的 IntStream 。
在我们继续下一个主题之前,需要注意一个重要区别:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这将返回 Stream<Integer> 而不是 IntStream 。
同样,使用 map() 而不是 mapToInt() 会返回 Stream<Integer> 而不是 IntStream。 :
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
专业化运营
与标准 流 相比,专用流提供了额外的操作——这在处理数字时非常方便。
例如 sum()、average()、range() 等:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
还原操作
缩减操作(也称为折叠)采用一系列输入元素,并通过重复应用组合操作将它们组合成单个汇总结果。 我们已经看到了一些缩减操作,如 findFirst() 、 min() 和 max ()。
让我们看一下通用的 reduce() 操作。
reduce() 最常见的形式是:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
其中 身份 是起始值, 累加器 是我们重复应用的二元运算。
例如:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们从初始值 0 开始,并在流的元素上重复应用 Double::sum() 。实际上,我们通过在 Stream 上应用 reduce( ) 实现了 DoubleStream.sum() 。
进阶收集
我们已经了解了如何使用 Collectors.toList() 从流中获取列表。现在让我们看看更多从流中收集元素的方法。
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
我们还可以使用 toSet() 从流元素中获取一组:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
收藏
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里,在内部创建了一个空集合,并在流的每个元素上调用它的 add() 方法。
总结双
summarizingDouble() 是另一个有趣的收集器——它对每个输入元素应用一个双重生成映射函数,并返回一个包含结果值统计信息的特殊类:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
请注意我们如何分析每个员工的薪水并获取有关该数据的统计信息——例如最小值、最大值、平均值等。
当我们使用其中一种专用流时, summaryStatistics() 可用于生成类似的结果:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
分区依据
我们可以根据元素是否满足特定条件将流分成两部分。
让我们将数值数据列表拆分为偶数和奇数:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,流被划分到一个 Map 中,偶数和奇数存储为 true 和 false 键。
分组方式
groupingBy() 提供高级分区——我们可以将流分成两个以上的组。
它以分类函数作为参数。该分类函数应用于流的每个元素。
该函数返回的值用作我们从 groupingBy 收集器获得的映射的键:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这个简单的示例中,我们根据员工名字的首字母对员工进行分组。
上一节中讨论的 groupingBy() 使用 Map 对流的元素进行分组。
但是,有时我们可能需要将数据分组为元素类型以外的类型。
这是我们如何做到的;我们可以使用 mapping() ,它实际上可以使收集器适应不同的类型——使用映射函数:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里的 mapping() 使用 getId() 映射函数将流元素 Employee 映射到员工 ID——这是一个 整数 。这些 id 仍然根据员工名字的首字符进行分组。
reducing() 类似于我们之前探讨过的 reduce() 。它只是返回一个收集器,该收集器执行其输入元素的归约:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里 reducing() 获取每个员工的工资增量并返回总和。
reducing() 在 groupingBy() 或 partitioningBy() 下游的多级缩减中使用时最有用。要对流执行简单的归约,请改用 reduce() 。
例如,让我们看看如何将 reducing() 与 groupingBy() 结合使用:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们根据员工名字的首字母对员工进行分组。在每个组中,我们找到名字最长的员工。
并行流
使用对并行流的支持,我们可以并行执行流操作,而无需编写任何样板代码;我们只需要将流指定为并行:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里 salaryIncrement() 将通过简单地添加 parallel() 语法在流的多个元素上并行执行。
当然,如果您需要对操作的性能特征进行更多控制,则可以 进一步调整和配置 此功能。
与编写多线程代码的情况一样,我们在使用并行流时需要注意以下几点:
- 我们需要确保代码是线程安全的。如果并行执行的操作修改共享数据,则需要特别小心。
- 如果执行操作的顺序或输出流中返回的顺序很重要,我们不应该使用并行流。例如,在并行流的情况下,像 findFirst() 这样的操作可能会产生不同的结果。
- 此外,我们应该确保让代码并行执行是值得的。了解操作的性能特征, 尤其是了解整个系统的 性能特征——在这里自然非常重要。
无限流
有时,我们可能希望在元素仍在生成时执行操作。我们可能事先不知道需要多少元素。与使用 list 或 map 不同,其中所有元素都已填充,我们可以使用无限流,也称为无界流。
有两种方法可以生成无限流:
我们为 generate() 提供了一个 Supplier ,只要需要生成新的流元素,它就会被调用:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们将 Math:: random() 作为 Supplier 传递,它返回下一个随机数。
对于无限流,我们需要提供最终终止处理的条件。一种常见的方法是使用 limit() 。在上面的示例中,我们将流限制为 5 个随机数,并在生成它们时打印它们。
请注意,传递给 generate() 的 Supplier 可能是有状态的,这样的流在并行使用时可能不会产生相同的结果。
iterate() 有两个参数:一个初始值,称为种子元素和一个使用前一个值生成下一个元素的函数。 iterate() 在设计上是有状态的,因此在并行流中可能没有用:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
在这里,我们将 2 作为种子值传递,它成为流的第一个元素。此值作为输入传递给返回 4 的 lambda。此值又作为下一次迭代的输入传递。
这一直持续到我们生成了 limit() 指定的元素数量,它作为终止条件。
文件操作
让我们看看如何在文件操作中使用流。
文件写入操作
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
这里我们使用 forEach() 通过调用 PrintWriter.println() 将流的每个元素写入文件。
文件读取操作
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
getPalindrome() 在流上工作,完全不知道流是如何生成的。这也增加了代码的可重用性并简化了单元测试。
结论
在本文中,我们重点介绍了 Java 8 中新的 Stream 功能的细节。我们看到了支持的各种操作以及如何使用 lambda 和管道来编写简洁的代码。
我们还看到了流的一些特性,例如惰性求值、并行和无限流。
您可以通过查看 反应式范式继续探索这些概念,反应式 范式通过与我们在此讨论的概念非常相似的概念而成为可能。
而且,像往常一样,您可以 在 GitHub 上 找到源代码。
如果您喜欢这篇文章并想了解有关 Java Streams 的更多信息,请查看有关 Java Streams 的所有 教程和文章的集合 。