【转载】Java 8 Stream 教程

英文原文

这个以示例驱动的教程为Java 8的Stream提供了深入的概述。当我第一次了解Stream API时,由于其名称听起来与Java I/O中的InputStreamOutputStream相似,我感到困惑。但是Java 8的Stream是完全不同的。Stream是一种Monad(单子),因此在将函数式编程引入Java中起到了重要作用。

在函数式编程中,Monad是一种表示按顺序定义的计算步骤的结构。具有Monad结构的类型定义了如何链接操作,或者嵌套该类型的函数。

本指南教你如何使用Java 8的Stream,以及如何使用不同类型的可用流操作。你将了解处理顺序以及流操作顺序如何影响运行时性能。更强大的流操作reducecollectflatMap也会被详细介绍。教程最后深入探讨了并行Stream。

如果你还不熟悉Java 8的lambda表达式、函数式接口和方法引用,建议在开始本教程之前先阅读Java 8教程。

Stream流是如何工作的

Stream表示一个有序的元素的序列,并支持用不同类型的操作来对这些元素进行计算:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

Stream的操作可以分为中间操作(intermediate)和终端操作(terminal)。中间操作返回一个Stream,因此我们可以在不使用分号的情况下链接多个中间操作。终端操作要么是void类型,要么返回非流类型的结果。在上面的例子中,filtermapsorted是中间操作,而forEach是终端操作。要查看所有可用的流操作列表,请参考Stream Javadoc。在上面的示例中,Stream操作链也被称为操作管道

大多数流操作接受一种称为lambda的表达式作为参数,lambda表达式是指定操作行为的函数接口。这些操作中的大多数必须同时满足无副作用无状态。什么意思呢?

当函数不修改Stream的底层数据源时,称其为无副作用。例如,在上面的示例中,没有任何lambda表达式通过添加或删除集合中的元素来修改myList。

当操作的执行是确定性的时,称其为无状态函数(确定性指的是,当输入相同时,输出总是相同)。例如,在上面的示例中,lambda表达式都不依赖于外部可能在执行过程中发生变化的变量或状态。

不同的Stream类型

Streams可以从各种数据源创建,特别是集合。Lists和Sets支持用stream()parallelStream()来创建顺序或并行的Streams。并行的Streams能够在多个线程上运行,我们会在本教程的后面部分介绍。现在我们先关注顺序的Streams:

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

在一个集合List<Object>上调用stream()方法会返回一个普通的Stream<Object>。但是我们不一定要集合才能使用Streams,看下面的代码示例:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

只要用Stream.of()就可以从一堆对象引用创建一个Stream。

除了普通的对象Streams,Java 8还提供了一些特殊的Streams,用来处理基本数据类型int, long和double。它们是IntStream, LongStreamDoubleStream

IntStreams可以用来替代普通的for循环,利用IntStream.range()方法:

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

所有这些基本类型的Streams都跟普通的对象Streams一样,但有几点不同:基本类型的Streams使用专门的lambda表达式,比如IntFunction代替Function或者IntPredicate代替Predicate。而且基本类型的Streams支持额外的终端聚合操作sum()average()

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

有时我们需要将包装类转换为基本类型转换为基本类型,反之亦然。Stream提供以下几个方法来支持这个需求, mapToInt(), mapToLong() and mapToDouble():

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3

基本类型的Stream可以通过mapToObj()转换为对象流:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

这个例子先将double流转换为int流,然后转换为String对象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

执行顺序

现在我们已经学习了如何创建和操作不同类型的流,让我们更深入地了解流操作在内部是如何处理的。

中间操作的一个重要特点是惰性求值。看下面这个示例,这里缺少了一个终端操作:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

执行这段代码时,控制台不会输出任何内容。这是因为中间操作只有在存在终端操作时才会执行。

让我们给上面的示例添加终端操作forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

这下控制台打印出了我们预期的结果:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

但结果的顺序可能和预期不一致。我们可能会认为Stream可能会依次对所有元素执行操作。但实际上,每个元素沿着调用链单独向下执行。第一个字符串”d2”先经过filter操作,再经过forEach操作,然后才处理第二个字符串”a2”。

这种行为可以减少对每个元素执行的实际操作数,如下面的示例所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

只要anyMatch中的断言对输入的元素判断成功,anyMatch就会立即返回true,第二个元素”A2”会返回true。由于Stream的调用链垂直执行,map在这种情况下只需要执行两次。

为什么顺序很重要

下一个例子由两个中间操作mapfilter以及终端操作forEach组成。让我们看看这些操作是如何执行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

你可能已经猜到,对于底层集合中的每个字符串,mapfilter都被调用了五次,而forEach只被调用了一次。

如果我们改变操作的顺序,将filter移到链的开头,我们可以大大减少实际的执行次数:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1
// filter:  b3
// filter:  c

现在,map只被调用了一次,所以,当输入元素非常多时,这种链式操作的执行速度会更快。在组合复杂的方法链时,请记住这一点。

让我们通过额外添加一个sorted的操作来扩展上面的示例:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

排序是一种特殊的中间操作。它是一种所谓的有状态操作,因为为了对元素集合进行排序,你必须在排序过程中维护状态。

执行上面的Demo,控制台输出:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

首先,所有的输入集合都执行了排序操作。换句话说,排序是横向执行的。因此,在这种情况下,排序被调用了八次,因为集合中的每个元素有多种组合。

再次通过调整链的结构来优化性能:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

调整后,sorted一次都没有被调用,因为filter操作将集合的元素过滤到只有一个。性能的到了巨大的提升。

重用Stream

Java 8 Stream不能被重复调用,当你调用任意一个终端操作时,Stream就会被关闭:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一个流上先调用 anyMatch 再调用 noneMatch 会导致以下异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
	at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
	at com.winterbe.java8.Streams5.test7(Streams5.java:38)
	at com.winterbe.java8.Streams5.main(Streams5.java:28)

为了绕过这种限制,我们必须对每个终端操作创建一个新的流。例如,我们可以创建一个Supplier<Stream>以构造一个新的Stream,并将所有中间操作都设置好:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

每次调用 get() 都会构造一个新的流,我们可以在其上调用所需的终端操作。

进阶操作

Stream 支持许多不同的操作。我们已经学习了最重要的操作,如filter或map。剩下的留给你们自己去探索(请参阅Stream Javadoc)。现在,让我们深入探讨更复杂的操作collect、flatMap和reduce。

本节的大多数代码都使用下面的 People 结构进行演示:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect

collect 是一个非常有用的终端操作,可以将流中的元素转换为不同类型的结果,例如ListSetMapcollect接受一个Collector,它由四个不同的操作组成:supplieraccumulatorcombinerfinisher。这听起来一开始非常复杂,但好消息是Java 8通过Collectors类支持各种内置收集器。对于最常见的操作,不需要自己实现收集器。

让我们看一个最常见的例子:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

如你所见,当你需要一个Set而不是List时 - 只需要使用 Collectors.toSet().

下一个例子,通过People.ageList<People>进行分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors的用途非常广。可以在Stream的元素上执行聚合,例如,计算所有人员的平均年龄:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

如果想要更详细的统计数据,那么Collectors.summarizing将返回一个特殊的包含统计数据对象。因此,我们可以轻松确定人们的最小、最大和平均年龄,以及总和和计数。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

在下一个例子里,我们会把List<People>转换并组合成一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

Collectors.joining需要一个分隔符,以及一个可选的前缀和后缀。

为了把Stream转换为一个Map,我们必须指定对keys和values的映射逻辑。要注意的是,映射出来的keys必须是唯一的,否则会抛出IllegalStateException。也可以传入一个合并函数(merge function)将多个values合并为一个,以避免抛出异常。

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

我们了解了一些强大的内置collectors,现在我们来创建一个自己的collector。

我们希望把流中的所有人名转换成一个字符串,这个字符串由所有大写字母组成,用 | 管道符号分隔。为了实现这一目标,我们通过 Collector.of() 创建了一个新的Collector。我们需要实现Collector的四个组成部分:supplieraccumulatorcombinerfinisher

  • supplier 创建一个新的容器存放结果
  • accumulator 将一个新的数据元素合并到容器中
  • combiner 将两个容器合并成一个
  • finisher 对容器执行一个可选的最终转换
Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

String在java中是不可变的,我们需要一个助手StringJoiner来帮助我们收集我们的字符串。

supplier提供一个使用|分割符的StringJoiner

accumulator将每个People的姓名转换为大写,并添加到StringJoiner中。

combiner提供了将2个StringJoiner合并为一个的方法。

最后一步,finisherStringJoiner转换为我们需要的String类型

FlatMap

我们已经学会如何利用 map 操作将流中的对象转换为另一种对象。然而,map 有一定的局限性,因为每个对象只能映射到一个其他对象。但是,如果我们想将一个对象转换为多个对象或者不转换怎么办?这就是 flatMap 的用武之地。

flatMap 将流的每个元素转换为其他对象的流。因此,每个对象将被转换为由流支持的零个、一个或多个其他对象。这些流的数据将被放置到 flatMap 操作返回的流中。

在我们看到 flatMap 的实际应用之前,我们需要一个合适的类型层次结构:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

接下来,我们利用我们对流的知识来实例化一些对象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

现在我们有一个包含三个 foo 对象的列表,每个 foo 对象都包含三个 bar 对象。

flatMap 方法接受一个函数,该函数必须返回一个Stream对象:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

如你所见,我们成功把包含3个 foo 对象的流转换成了包含9个 bar 对象的流

最后,上面的代码可以简化整合为一个Stream的调用链:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

在 Java 8 中引入的 Optional 类中也提供了 flatMap 方法。OptionalflatMap 操作返回另一种类型的可选对象。因此,它可以用于避免繁琐的 null 检查。

下面是一个嵌套的数据结构,例如:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

为了解析Outer实例的内部字符串 foo,必须添加多次检查 null 以避免出现 NullPointerException:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

可以利用 Optional 类的 flatMap 操作来得到相同的效果:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

每次调用 flatMap 方法都会返回一个 Optional 对象,如果对象存在,则 Optional 包装该对象;如果对象不存在,则返回 Optional.empty()

Reduce

Reduce 将Stream中的所有元素合并为一个结果。Java 8 支持三种不同类型的Reduce方法。第一种方法将Stream中的元素减少为Stream中的一个元素。让我们看看如何使用这种方法来找到年纪最大的人:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

reduce方法接受一个BinaryOperator累加器函数。实际上,这是一个BiFunction,其中两个参数是相同的类型,即PersonBiFunction类似于Function,但接受两个参数。上面的例子比较两个人的年龄,返回年龄大的人。

第二个reduce方法接受一个初始值和BinaryOperator累加器。这个方法可以用于从Stream中的所有其他人构造一个新的Person,包含了聚合的名称和年龄。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三种reduce方法接受三个参数:一个初始值,一个类型为BiFunction的累加器和一个类型为BinaryOperator的组合函数。由于初始值的类型不限于Person类型,因此我们可以利用这种reduce方法来确定所有人的年龄总和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

可以看到结果是76,但是具体为什么是这个结果?让我们通过一些Debug输出来调试代码:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

正如所看到的,accumulator函数完成了所有的操作。首先使用初始值0和第一个人Max调用该函数。在接下来的三个步骤中,总和不断地增加,直到达到76岁的年龄之和。

等等,什么?combiner从未被调用?我们用相同的流再试一次,但这次我们用并行执行的parallelStream

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

在并行流上执行产生了完全不同的行为。现在会调用combiner。由于accumulator是并行调用的,因此需要combiner来合并单独累加的值。

让我们在深入探讨parallelStream

Parallel Streams

流可以以并行方式执行,这可以会提高大量元素输入时的运行性能。并行流使用通过静态ForkJoinPool.commonPool()方法获取到公共ForkJoinPool。底层线程池的大小最多使用X个线程,具体情况取决于可用的物理CPU核心数量:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
//int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);

在我的机器上,默认情况下commonPool的并行线程数为3。可以通过设置以下JVM参数来减少或增加这个值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合类型支持parallelStream()方法来创建parallelStream。或者,可以在给定的流上调用中间方法parallel()来将顺序流转换为parallelStream

为了更好地演示parallelStream的执行行为,下面的示例会将当前线程的信息打印到控制台:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

通过分析调试输出,我们能够更好地理解并行流中线程到底是如何执行的:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

如你所见,parallel stream利用来自ForkJoinPool中所有可用线程来执行流的操作。输出可能在每次运行中有所不同,因为实际使用哪个特定线程的行为是不确定的。

让我们通过添加一个额外的流操作,sort

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

执行结果可能看起来会有些奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

看起来sort只在主线程上按顺序执行。实际上,对并行流进行排序在背后使用了新的Java 8方法Arrays.parallelSort()。根据Javadoc中的说明,此方法根据数组的长度决定是否顺序或并行排序:

如果指定数组的长度小于最小粒度,则使用适当的Arrays.sort方法进行排序。

回到上一节中的reduce示例中,我们已经发现组合函数仅在并行流中而不是在顺序流中调用。让我们看看实际涉及的线程:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

控制台输出显示,accumulatorcombiner函数都在所有可用线程上并行执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

总之,可以说并行流可以为具有大量输入元素的流带来良好的性能提升。但是请记住,一些并行流操作(如reducecollect)需要额外的计算(组合操作),而这些在顺序执行时是不需要的。

此外,我们已经知道了所有并行流操作共享同一个JVM范围内的公共ForkJoinPool。因此,尽可能避免在函数中执行慢操作来阻塞流,因为这可能会潜在地减慢依赖并行流的应用程序的其他部分的速度。

That’s it

我的Java 8 Stream编程指南到此结束。如果你想了解更多关于Java 8 Stream的内容,我的建议是查看Stream Javadoc包文档。希望本教程对你有所帮助,并且你享受阅读它的过程。

Happy coding!


【转载】Java 8 Stream 教程
https://coding.gs/2023/06/01/java8-stream-教程/
作者
K
发布于
2023年6月1日
许可协议