英文原文
这个以示例驱动的教程为Java 8的Stream提供了深入的概述。当我第一次了解Stream
API时,由于其名称听起来与Java I/O中的InputStream
和OutputStream
相似,我感到困惑。但是Java 8的Stream是完全不同的。Stream是一种Monad(单子),因此在将函数式编程引入Java中起到了重要作用。
在函数式编程中,Monad是一种表示按顺序定义的计算步骤的结构。具有Monad结构的类型定义了如何链接操作,或者嵌套该类型的函数。
本指南教你如何使用Java 8的Stream,以及如何使用不同类型的可用流操作。你将了解处理顺序以及流操作顺序如何影响运行时性能。更强大的流操作reduce
、collect
和flatMap
也会被详细介绍。教程最后深入探讨了并行Stream。
如果你还不熟悉Java 8的lambda表达式、函数式接口和方法引用,建议在开始本教程之前先阅读Java 8教程。
Stream流是如何工作的
Stream表示一个有序的元素的序列,并支持用不同类型的操作来对这些元素进行计算:
Stream的操作可以分为中间操作(intermediate)和终端操作(terminal)。中间操作返回一个Stream,因此我们可以在不使用分号的情况下链接多个中间操作。终端操作要么是void类型,要么返回非流类型的结果。在上面的例子中,filter
、map
和sorted
是中间操作,而forEach
是终端操作。要查看所有可用的流操作列表,请参考Stream Javadoc。在上面的示例中,Stream操作链也被称为操作管道。
大多数流操作接受一种称为lambda的表达式作为参数,lambda表达式是指定操作行为的函数接口。这些操作中的大多数必须同时满足无副作用和无状态。什么意思呢?
当函数不修改Stream的底层数据源时,称其为无副作用。例如,在上面的示例中,没有任何lambda表达式通过添加或删除集合中的元素来修改myList。
当操作的执行是确定性的时,称其为无状态函数(确定性指的是,当输入相同时,输出总是相同)。例如,在上面的示例中,lambda表达式都不依赖于外部可能在执行过程中发生变化的变量或状态。
不同的Stream类型
Streams可以从各种数据源创建,特别是集合。Lists和Sets支持用stream()
和parallelStream()
来创建顺序或并行的Streams。并行的Streams能够在多个线程上运行,我们会在本教程的后面部分介绍。现在我们先关注顺序的Streams:
在一个集合List<Object>
上调用stream()
方法会返回一个普通的Stream<Object>
。但是我们不一定要集合才能使用Streams,看下面的代码示例:
只要用Stream.of()就可以从一堆对象引用创建一个Stream。
除了普通的对象Streams,Java 8还提供了一些特殊的Streams,用来处理基本数据类型int, long和double。它们是IntStream
, LongStream
和DoubleStream
。
IntStreams可以用来替代普通的for循环,利用IntStream.range()
方法:
所有这些基本类型的Streams都跟普通的对象Streams一样,但有几点不同:基本类型的Streams使用专门的lambda表达式,比如IntFunction
代替Function
或者IntPredicate
代替Predicate
。而且基本类型的Streams支持额外的终端聚合操作sum()
和average()
:
有时我们需要将包装类转换为基本类型转换为基本类型,反之亦然。Stream提供以下几个方法来支持这个需求, mapToInt()
, mapToLong()
and mapToDouble()
:
基本类型的Stream可以通过mapToObj()
转换为对象流:
这个例子先将double流转换为int流,然后转换为String对象流:
执行顺序
现在我们已经学习了如何创建和操作不同类型的流,让我们更深入地了解流操作在内部是如何处理的。
中间操作的一个重要特点是惰性求值。看下面这个示例,这里缺少了一个终端操作:
执行这段代码时,控制台不会输出任何内容。这是因为中间操作只有在存在终端操作时才会执行。
让我们给上面的示例添加终端操作forEach:
这下控制台打印出了我们预期的结果:
但结果的顺序可能和预期不一致。我们可能会认为Stream可能会依次对所有元素执行操作。但实际上,每个元素沿着调用链单独向下执行。第一个字符串”d2”先经过filter
操作,再经过forEach
操作,然后才处理第二个字符串”a2”。
这种行为可以减少对每个元素执行的实际操作数,如下面的示例所示:
只要anyMatch中的断言对输入的元素判断成功,anyMatch
就会立即返回true,第二个元素”A2”会返回true。由于Stream的调用链垂直执行,map
在这种情况下只需要执行两次。
为什么顺序很重要
下一个例子由两个中间操作map
和filter
以及终端操作forEach
组成。让我们看看这些操作是如何执行的:
你可能已经猜到,对于底层集合中的每个字符串,map
和filter
都被调用了五次,而forEach只被调用了一次。
如果我们改变操作的顺序,将filter
移到链的开头,我们可以大大减少实际的执行次数:
现在,map
只被调用了一次,所以,当输入元素非常多时,这种链式操作的执行速度会更快。在组合复杂的方法链时,请记住这一点。
让我们通过额外添加一个sorted
的操作来扩展上面的示例:
排序是一种特殊的中间操作。它是一种所谓的有状态操作,因为为了对元素集合进行排序,你必须在排序过程中维护状态。
执行上面的Demo,控制台输出:
首先,所有的输入集合都执行了排序操作。换句话说,排序是横向执行的。因此,在这种情况下,排序被调用了八次,因为集合中的每个元素有多种组合。
再次通过调整链的结构来优化性能:
调整后,sorted
一次都没有被调用,因为filter
操作将集合的元素过滤到只有一个。性能的到了巨大的提升。
重用Stream
Java 8 Stream
不能被重复调用,当你调用任意一个终端操作时,Stream
就会被关闭:
在同一个流上先调用 anyMatch
再调用 noneMatch
会导致以下异常:
为了绕过这种限制,我们必须对每个终端操作创建一个新的流。例如,我们可以创建一个Supplier<Stream>
以构造一个新的Stream
,并将所有中间操作都设置好:
每次调用 get() 都会构造一个新的流,我们可以在其上调用所需的终端操作。
进阶操作
Stream
支持许多不同的操作。我们已经学习了最重要的操作,如filter或map。剩下的留给你们自己去探索(请参阅Stream Javadoc)。现在,让我们深入探讨更复杂的操作collect、flatMap和reduce。
本节的大多数代码都使用下面的 People
结构进行演示:
Collect
collect
是一个非常有用的终端操作,可以将流中的元素转换为不同类型的结果,例如List
、Set
或Map
。collect
接受一个Collector
,它由四个不同的操作组成:supplier
、accumulator
、combiner
和finisher
。这听起来一开始非常复杂,但好消息是Java 8通过Collectors
类支持各种内置收集器。对于最常见的操作,不需要自己实现收集器。
让我们看一个最常见的例子:
如你所见,当你需要一个Set
而不是List
时 - 只需要使用 Collectors.toSet()
.
下一个例子,通过People.age
对List<People>
进行分组:
Collectors
的用途非常广。可以在Stream
的元素上执行聚合,例如,计算所有人员的平均年龄:
如果想要更详细的统计数据,那么Collectors.summarizing
将返回一个特殊的包含统计数据对象。因此,我们可以轻松确定人们的最小、最大和平均年龄,以及总和和计数。
在下一个例子里,我们会把List<People>
转换并组合成一个字符串:
Collectors.joining
需要一个分隔符,以及一个可选的前缀和后缀。
为了把Stream
转换为一个Map
,我们必须指定对keys和values的映射逻辑。要注意的是,映射出来的keys必须是唯一的,否则会抛出IllegalStateException
。也可以传入一个合并函数(merge function)将多个values合并为一个,以避免抛出异常。
我们了解了一些强大的内置collectors,现在我们来创建一个自己的collector。
我们希望把流中的所有人名转换成一个字符串,这个字符串由所有大写字母组成,用 | 管道符号分隔。为了实现这一目标,我们通过 Collector.of()
创建了一个新的Collector。我们需要实现Collector的四个组成部分:supplier
、accumulator
、combiner
和finisher
。
supplier
创建一个新的容器存放结果
accumulator
将一个新的数据元素合并到容器中
combiner
将两个容器合并成一个
finisher
对容器执行一个可选的最终转换
String
在java中是不可变的,我们需要一个助手StringJoiner
来帮助我们收集我们的字符串。
supplier
提供一个使用|
分割符的StringJoiner
。
accumulator
将每个People
的姓名转换为大写,并添加到StringJoiner
中。
combiner
提供了将2个StringJoiner
合并为一个的方法。
最后一步,finisher
将StringJoiner
转换为我们需要的String
类型
FlatMap
我们已经学会如何利用 map
操作将流中的对象转换为另一种对象。然而,map
有一定的局限性,因为每个对象只能映射到一个其他对象。但是,如果我们想将一个对象转换为多个对象或者不转换怎么办?这就是 flatMap
的用武之地。
flatMap
将流的每个元素转换为其他对象的流。因此,每个对象将被转换为由流支持的零个、一个或多个其他对象。这些流的数据将被放置到 flatMap
操作返回的流中。
在我们看到 flatMap
的实际应用之前,我们需要一个合适的类型层次结构:
接下来,我们利用我们对流的知识来实例化一些对象:
现在我们有一个包含三个 foo 对象的列表,每个 foo 对象都包含三个 bar 对象。
flatMap
方法接受一个函数,该函数必须返回一个Stream对象:
如你所见,我们成功把包含3个 foo 对象的流转换成了包含9个 bar 对象的流
最后,上面的代码可以简化整合为一个Stream的调用链:
在 Java 8 中引入的 Optional
类中也提供了 flatMap
方法。Optional
的 flatMap
操作返回另一种类型的可选对象。因此,它可以用于避免繁琐的 null 检查。
下面是一个嵌套的数据结构,例如:
为了解析Outer
实例的内部字符串 foo,必须添加多次检查 null 以避免出现 NullPointerException:
可以利用 Optional
类的 flatMap
操作来得到相同的效果:
每次调用 flatMap
方法都会返回一个 Optional
对象,如果对象存在,则 Optional
包装该对象;如果对象不存在,则返回 Optional.empty()
。
Reduce
Reduce
将Stream中的所有元素合并为一个结果。Java 8 支持三种不同类型的Reduce
方法。第一种方法将Stream中的元素减少为Stream中的一个元素。让我们看看如何使用这种方法来找到年纪最大的人:
reduce
方法接受一个BinaryOperator
累加器函数。实际上,这是一个BiFunction
,其中两个参数是相同的类型,即Person
。BiFunction
类似于Function
,但接受两个参数。上面的例子比较两个人的年龄,返回年龄大的人。
第二个reduce
方法接受一个初始值和BinaryOperator
累加器。这个方法可以用于从Stream
中的所有其他人构造一个新的Person,包含了聚合的名称和年龄。
第三种reduce
方法接受三个参数:一个初始值,一个类型为BiFunction
的累加器和一个类型为BinaryOperator
的组合函数。由于初始值的类型不限于Person
类型,因此我们可以利用这种reduce
方法来确定所有人的年龄总和:
可以看到结果是76,但是具体为什么是这个结果?让我们通过一些Debug输出来调试代码:
正如所看到的,accumulator
函数完成了所有的操作。首先使用初始值0和第一个人Max调用该函数。在接下来的三个步骤中,总和不断地增加,直到达到76岁的年龄之和。
等等,什么?combiner
从未被调用?我们用相同的流再试一次,但这次我们用并行执行的parallelStream
:
在并行流上执行产生了完全不同的行为。现在会调用combiner
。由于accumulator
是并行调用的,因此需要combiner
来合并单独累加的值。
让我们在深入探讨parallelStream
。
Parallel Streams
流可以以并行方式执行,这可以会提高大量元素输入时的运行性能。并行流使用通过静态ForkJoinPool.commonPool()
方法获取到公共ForkJoinPool
。底层线程池的大小最多使用X个线程,具体情况取决于可用的物理CPU核心数量:
在我的机器上,默认情况下commonPool
的并行线程数为3。可以通过设置以下JVM参数来减少或增加这个值:
集合类型支持parallelStream()
方法来创建parallelStream
。或者,可以在给定的流上调用中间方法parallel()
来将顺序流转换为parallelStream
。
为了更好地演示parallelStream
的执行行为,下面的示例会将当前线程的信息打印到控制台:
通过分析调试输出,我们能够更好地理解并行流中线程到底是如何执行的:
如你所见,parallel stream利用来自ForkJoinPool中所有可用线程来执行流的操作。输出可能在每次运行中有所不同,因为实际使用哪个特定线程的行为是不确定的。
让我们通过添加一个额外的流操作,sort
:
执行结果可能看起来会有些奇怪:
看起来sort
只在主线程上按顺序执行。实际上,对并行流进行排序在背后使用了新的Java 8方法Arrays.parallelSort()
。根据Javadoc中的说明,此方法根据数组的长度决定是否顺序或并行排序:
如果指定数组的长度小于最小粒度,则使用适当的Arrays.sort方法进行排序。
回到上一节中的reduce
示例中,我们已经发现组合函数仅在并行流中而不是在顺序流中调用。让我们看看实际涉及的线程:
控制台输出显示,accumulator
和combiner
函数都在所有可用线程上并行执行:
总之,可以说并行流可以为具有大量输入元素的流带来良好的性能提升。但是请记住,一些并行流操作(如reduce
和collect
)需要额外的计算(组合操作),而这些在顺序执行时是不需要的。
此外,我们已经知道了所有并行流操作共享同一个JVM范围内的公共ForkJoinPool。因此,尽可能避免在函数中执行慢操作来阻塞流,因为这可能会潜在地减慢依赖并行流的应用程序的其他部分的速度。
That’s it
我的Java 8 Stream编程指南到此结束。如果你想了解更多关于Java 8 Stream的内容,我的建议是查看Stream Javadoc包文档。希望本教程对你有所帮助,并且你享受阅读它的过程。
Happy coding!