【转载】Java 8 Stream 教程
这个以示例驱动的教程为Java 8的Stream提供了深入的概述。当我第一次了解Stream
API时,由于其名称听起来与Java I/O中的InputStream
和OutputStream
相似,我感到困惑。但是Java 8的Stream是完全不同的。Stream是一种Monad(单子),因此在将函数式编程引入Java中起到了重要作用。
在函数式编程中,Monad是一种表示按顺序定义的计算步骤的结构。具有Monad结构的类型定义了如何链接操作,或者嵌套该类型的函数。
本指南教你如何使用Java 8的Stream,以及如何使用不同类型的可用流操作。你将了解处理顺序以及流操作顺序如何影响运行时性能。更强大的流操作reduce
、collect
和flatMap
也会被详细介绍。教程最后深入探讨了并行Stream。
如果你还不熟悉Java 8的lambda表达式、函数式接口和方法引用,建议在开始本教程之前先阅读Java 8教程。
Stream流是如何工作的
Stream表示一个有序的元素的序列,并支持用不同类型的操作来对这些元素进行计算:
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream()
.filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
// C1
// C2
Stream的操作可以分为中间操作(intermediate)和终端操作(terminal)。中间操作返回一个Stream,因此我们可以在不使用分号的情况下链接多个中间操作。终端操作要么是void类型,要么返回非流类型的结果。在上面的例子中,filter
、map
和sorted
是中间操作,而forEach
是终端操作。要查看所有可用的流操作列表,请参考Stream Javadoc。在上面的示例中,Stream操作链也被称为操作管道。
大多数流操作接受一种称为lambda的表达式作为参数,lambda表达式是指定操作行为的函数接口。这些操作中的大多数必须同时满足无副作用和无状态。什么意思呢?
当函数不修改Stream的底层数据源时,称其为无副作用。例如,在上面的示例中,没有任何lambda表达式通过添加或删除集合中的元素来修改myList。
当操作的执行是确定性的时,称其为无状态函数(确定性指的是,当输入相同时,输出总是相同)。例如,在上面的示例中,lambda表达式都不依赖于外部可能在执行过程中发生变化的变量或状态。
不同的Stream类型
Streams可以从各种数据源创建,特别是集合。Lists和Sets支持用stream()
和parallelStream()
来创建顺序或并行的Streams。并行的Streams能够在多个线程上运行,我们会在本教程的后面部分介绍。现在我们先关注顺序的Streams:
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
在一个集合List<Object>
上调用stream()
方法会返回一个普通的Stream<Object>
。但是我们不一定要集合才能使用Streams,看下面的代码示例:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
只要用Stream.of()就可以从一堆对象引用创建一个Stream。
除了普通的对象Streams,Java 8还提供了一些特殊的Streams,用来处理基本数据类型int, long和double。它们是IntStream
, LongStream
和DoubleStream
。
IntStreams可以用来替代普通的for循环,利用IntStream.range()
方法:
IntStream.range(1, 4)
.forEach(System.out::println);
// 1
// 2
// 3
所有这些基本类型的Streams都跟普通的对象Streams一样,但有几点不同:基本类型的Streams使用专门的lambda表达式,比如IntFunction
代替Function
或者IntPredicate
代替Predicate
。而且基本类型的Streams支持额外的终端聚合操作sum()
和average()
:
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 5.0
有时我们需要将包装类转换为基本类型转换为基本类型,反之亦然。Stream提供以下几个方法来支持这个需求, mapToInt()
, mapToLong()
and mapToDouble()
:
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); // 3
基本类型的Stream可以通过mapToObj()
转换为对象流:
IntStream.range(1, 4)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
这个例子先将double流转换为int流,然后转换为String对象流:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(i -> "a" + i)
.forEach(System.out::println);
// a1
// a2
// a3
执行顺序
现在我们已经学习了如何创建和操作不同类型的流,让我们更深入地了解流操作在内部是如何处理的。
中间操作的一个重要特点是惰性求值。看下面这个示例,这里缺少了一个终端操作:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
执行这段代码时,控制台不会输出任何内容。这是因为中间操作只有在存在终端操作时才会执行。
让我们给上面的示例添加终端操作forEach:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
这下控制台打印出了我们预期的结果:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
但结果的顺序可能和预期不一致。我们可能会认为Stream可能会依次对所有元素执行操作。但实际上,每个元素沿着调用链单独向下执行。第一个字符串”d2”先经过filter
操作,再经过forEach
操作,然后才处理第二个字符串”a2”。
这种行为可以减少对每个元素执行的实际操作数,如下面的示例所示:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
只要anyMatch中的断言对输入的元素判断成功,anyMatch
就会立即返回true,第二个元素”A2”会返回true。由于Stream的调用链垂直执行,map
在这种情况下只需要执行两次。
为什么顺序很重要
下一个例子由两个中间操作map
和filter
以及终端操作forEach
组成。让我们看看这些操作是如何执行的:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A");
})
.forEach(s -> System.out.println("forEach: " + s));
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
你可能已经猜到,对于底层集合中的每个字符串,map
和filter
都被调用了五次,而forEach只被调用了一次。
如果我们改变操作的顺序,将filter
移到链的开头,我们可以大大减少实际的执行次数:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
现在,map
只被调用了一次,所以,当输入元素非常多时,这种链式操作的执行速度会更快。在组合复杂的方法链时,请记住这一点。
让我们通过额外添加一个sorted
的操作来扩展上面的示例:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
排序是一种特殊的中间操作。它是一种所谓的有状态操作,因为为了对元素集合进行排序,你必须在排序过程中维护状态。
执行上面的Demo,控制台输出:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
首先,所有的输入集合都执行了排序操作。换句话说,排序是横向执行的。因此,在这种情况下,排序被调用了八次,因为集合中的每个元素有多种组合。
再次通过调整链的结构来优化性能:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
调整后,sorted
一次都没有被调用,因为filter
操作将集合的元素过滤到只有一个。性能的到了巨大的提升。
重用Stream
Java 8 Stream
不能被重复调用,当你调用任意一个终端操作时,Stream
就会被关闭:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
在同一个流上先调用 anyMatch
再调用 noneMatch
会导致以下异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
为了绕过这种限制,我们必须对每个终端操作创建一个新的流。例如,我们可以创建一个Supplier<Stream>
以构造一个新的Stream
,并将所有中间操作都设置好:
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
每次调用 get() 都会构造一个新的流,我们可以在其上调用所需的终端操作。
进阶操作
Stream
支持许多不同的操作。我们已经学习了最重要的操作,如filter或map。剩下的留给你们自己去探索(请参阅Stream Javadoc)。现在,让我们深入探讨更复杂的操作collect、flatMap和reduce。
本节的大多数代码都使用下面的 People
结构进行演示:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
Collect
collect
是一个非常有用的终端操作,可以将流中的元素转换为不同类型的结果,例如List
、Set
或Map
。collect
接受一个Collector
,它由四个不同的操作组成:supplier
、accumulator
、combiner
和finisher
。这听起来一开始非常复杂,但好消息是Java 8通过Collectors
类支持各种内置收集器。对于最常见的操作,不需要自己实现收集器。
让我们看一个最常见的例子:
List<Person> filtered =
persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered); // [Peter, Pamela]
如你所见,当你需要一个Set
而不是List
时 - 只需要使用 Collectors.toSet()
.
下一个例子,通过People.age
对List<People>
进行分组:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
Collectors
的用途非常广。可以在Stream
的元素上执行聚合,例如,计算所有人员的平均年龄:
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
如果想要更详细的统计数据,那么Collectors.summarizing
将返回一个特殊的包含统计数据对象。因此,我们可以轻松确定人们的最小、最大和平均年龄,以及总和和计数。
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
在下一个例子里,我们会把List<People>
转换并组合成一个字符串:
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
Collectors.joining
需要一个分隔符,以及一个可选的前缀和后缀。
为了把Stream
转换为一个Map
,我们必须指定对keys和values的映射逻辑。要注意的是,映射出来的keys必须是唯一的,否则会抛出IllegalStateException
。也可以传入一个合并函数(merge function)将多个values合并为一个,以避免抛出异常。
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2));
System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
我们了解了一些强大的内置collectors,现在我们来创建一个自己的collector。
我们希望把流中的所有人名转换成一个字符串,这个字符串由所有大写字母组成,用 | 管道符号分隔。为了实现这一目标,我们通过 Collector.of()
创建了一个新的Collector。我们需要实现Collector的四个组成部分:supplier
、accumulator
、combiner
和finisher
。
supplier
创建一个新的容器存放结果accumulator
将一个新的数据元素合并到容器中combiner
将两个容器合并成一个finisher
对容器执行一个可选的最终转换
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
String
在java中是不可变的,我们需要一个助手StringJoiner
来帮助我们收集我们的字符串。
supplier
提供一个使用|
分割符的StringJoiner
。
accumulator
将每个People
的姓名转换为大写,并添加到StringJoiner
中。
combiner
提供了将2个StringJoiner
合并为一个的方法。
最后一步,finisher
将StringJoiner
转换为我们需要的String
类型
FlatMap
我们已经学会如何利用 map
操作将流中的对象转换为另一种对象。然而,map
有一定的局限性,因为每个对象只能映射到一个其他对象。但是,如果我们想将一个对象转换为多个对象或者不转换怎么办?这就是 flatMap
的用武之地。
flatMap
将流的每个元素转换为其他对象的流。因此,每个对象将被转换为由流支持的零个、一个或多个其他对象。这些流的数据将被放置到 flatMap
操作返回的流中。
在我们看到 flatMap
的实际应用之前,我们需要一个合适的类型层次结构:
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
接下来,我们利用我们对流的知识来实例化一些对象:
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
现在我们有一个包含三个 foo 对象的列表,每个 foo 对象都包含三个 bar 对象。
flatMap
方法接受一个函数,该函数必须返回一个Stream对象:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
如你所见,我们成功把包含3个 foo 对象的流转换成了包含9个 bar 对象的流
最后,上面的代码可以简化整合为一个Stream的调用链:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
在 Java 8 中引入的 Optional
类中也提供了 flatMap
方法。Optional
的 flatMap
操作返回另一种类型的可选对象。因此,它可以用于避免繁琐的 null 检查。
下面是一个嵌套的数据结构,例如:
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
为了解析Outer
实例的内部字符串 foo,必须添加多次检查 null 以避免出现 NullPointerException:
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}
可以利用 Optional
类的 flatMap
操作来得到相同的效果:
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
每次调用 flatMap
方法都会返回一个 Optional
对象,如果对象存在,则 Optional
包装该对象;如果对象不存在,则返回 Optional.empty()
。
Reduce
Reduce
将Stream中的所有元素合并为一个结果。Java 8 支持三种不同类型的Reduce
方法。第一种方法将Stream中的元素减少为Stream中的一个元素。让我们看看如何使用这种方法来找到年纪最大的人:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
reduce
方法接受一个BinaryOperator
累加器函数。实际上,这是一个BiFunction
,其中两个参数是相同的类型,即Person
。BiFunction
类似于Function
,但接受两个参数。上面的例子比较两个人的年龄,返回年龄大的人。
第二个reduce
方法接受一个初始值和BinaryOperator
累加器。这个方法可以用于从Stream
中的所有其他人构造一个新的Person,包含了聚合的名称和年龄。
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
第三种reduce
方法接受三个参数:一个初始值,一个类型为BiFunction
的累加器和一个类型为BinaryOperator
的组合函数。由于初始值的类型不限于Person
类型,因此我们可以利用这种reduce
方法来确定所有人的年龄总和:
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
可以看到结果是76,但是具体为什么是这个结果?让我们通过一些Debug输出来调试代码:
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
正如所看到的,accumulator
函数完成了所有的操作。首先使用初始值0和第一个人Max调用该函数。在接下来的三个步骤中,总和不断地增加,直到达到76岁的年龄之和。
等等,什么?combiner
从未被调用?我们用相同的流再试一次,但这次我们用并行执行的parallelStream
:
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
在并行流上执行产生了完全不同的行为。现在会调用combiner
。由于accumulator
是并行调用的,因此需要combiner
来合并单独累加的值。
让我们在深入探讨parallelStream
。
Parallel Streams
流可以以并行方式执行,这可以会提高大量元素输入时的运行性能。并行流使用通过静态ForkJoinPool.commonPool()
方法获取到公共ForkJoinPool
。底层线程池的大小最多使用X个线程,具体情况取决于可用的物理CPU核心数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
//int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
在我的机器上,默认情况下commonPool
的并行线程数为3。可以通过设置以下JVM参数来减少或增加这个值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合类型支持parallelStream()
方法来创建parallelStream
。或者,可以在给定的流上调用中间方法parallel()
来将顺序流转换为parallelStream
。
为了更好地演示parallelStream
的执行行为,下面的示例会将当前线程的信息打印到控制台:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
通过分析调试输出,我们能够更好地理解并行流中线程到底是如何执行的:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
如你所见,parallel stream利用来自ForkJoinPool中所有可用线程来执行流的操作。输出可能在每次运行中有所不同,因为实际使用哪个特定线程的行为是不确定的。
让我们通过添加一个额外的流操作,sort
:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
执行结果可能看起来会有些奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
看起来sort
只在主线程上按顺序执行。实际上,对并行流进行排序在背后使用了新的Java 8方法Arrays.parallelSort()
。根据Javadoc中的说明,此方法根据数组的长度决定是否顺序或并行排序:
如果指定数组的长度小于最小粒度,则使用适当的Arrays.sort方法进行排序。
回到上一节中的reduce
示例中,我们已经发现组合函数仅在并行流中而不是在顺序流中调用。让我们看看实际涉及的线程:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台输出显示,accumulator
和combiner
函数都在所有可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,可以说并行流可以为具有大量输入元素的流带来良好的性能提升。但是请记住,一些并行流操作(如reduce
和collect
)需要额外的计算(组合操作),而这些在顺序执行时是不需要的。
此外,我们已经知道了所有并行流操作共享同一个JVM范围内的公共ForkJoinPool。因此,尽可能避免在函数中执行慢操作来阻塞流,因为这可能会潜在地减慢依赖并行流的应用程序的其他部分的速度。
That’s it
我的Java 8 Stream编程指南到此结束。如果你想了解更多关于Java 8 Stream的内容,我的建议是查看Stream Javadoc包文档。希望本教程对你有所帮助,并且你享受阅读它的过程。
Happy coding!