Java 8实战:流处理
流
是一系列数据项,一次只生成一项。程序可以从输入流中一个一个读取数据项,然后以同样的方式将数据项写入输出流。一个程序的输出流很可能是另一个程序的输入流。基于流
的思想,Java 8
在 $java.util.stream$ 中添加了一个Stream API
。 $Stream$<$T$> 就是一系列 $T$ 类型的项目。
和Collection API
相比,Stream API
处理数据的方式非常不同。用集合的话,需要使用 $for$ 循环迭代并处理元素,我们称之为外部迭代
。相反,Stream API
的数据处理完全是在库内部进行的,我们称之为内部迭代
。虽然都能访问数据项目的序列,但是相比之下,Collection API
主要是为了存储和访问数据,而Stream API
主要用于描述对数据的计算。这里的关键点在于,Stream API
允许并行处理一个 $Stream$ 中的元素。筛选一个 $Collection$ 最快的方式通常是将其转换为 $Stream$ ,进行并行处理,再转换回 $List$ 。粗略地说,流与集合之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,包含数据结构中目前所有的值,集合中的元素只有经过计算后才能添加;相比之下,流是在概念上固定的数据结构,不能通过流添加或删除元素,流中的元素是按需计算的,即只从流中提取出需要的值。与迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。要想重新遍历,可以从原始数据源中重新获取一个新的流。尝试再次遍历一个已经消费的流会抛出 $IllegalStateException$ 异常。流与集合的另一个关键区别在于遍历数据的方式,正如之前所说,使用 $Collection$ 接口需要用户进行外部迭代,而 $Steams$ 库使用的是内部迭代,即库自动完成迭代,并将流值存放在某个地方。相较于显式的外部迭代,内部迭代下项目可以透明地并行处理,或者用更优化的顺序进行处理,例如同时处理多个数据,或者优先处理某些数据等。
1. 流操作
操作 | 类型 | 返回类型 | 操作参数 | 函数描述符 | 目的 |
---|---|---|---|---|---|
$filter$ | 中间 | $Stream$<$T$> | $Predicate$<$T$> | $T \rightarrow boolean$ | 返回一个包含所有符合谓词的元素的流 |
$distinct$ | 中间 | $Stream$<$T$> | 返回一个元素各异的流 | ||
$map$ | 中间 | $Stream$<$R$> | $Function$<$T, R$> | $T \rightarrow R$ | 将函数应用到每个元素上,并映射成一个新元素 |
$flatMap$ | 中间 | $Stream$<$R$> | $Function$<$T,R$> | $T \rightarrow R$ | 将多个生成流扁平化为单个流 |
$limit$ | 中间 | $Stream$<$T$> | 返回一个不超过给定长度的流 | ||
$skip$ | 中间 | $Stream$<$T$> | 返回一个跳过给定数量元素的流 | ||
$sorted$ | 中间 | $Stream$<$T$> | $Comparator$<$T$> | $(T, T) \rightarrow int$ | 返回一个经过排序的流 |
$forEach$ | 终端 | $void$ | $Consumer$<$T$> | $T \rightarrow void$ | 消费流中的每一个元素并对其应用Lambda ,返回类型为 $void$ |
$count$ | 终端 | $long$ | 返回流中元素的个数,返回类型为 $long$ | ||
$collect$ | 终端 | $R$ | $Collector$<$T, A, R$> | 把流归约成一个集合并返回 | |
$anyMatch$ | 终端 | $boolean$ | $Predicate$<$T$> | $T \rightarrow boolean$ | 流中是否含有一个元素能匹配给定的谓词 |
$allMatch$ | 终端 | $boolean$ | $Predicate$<$T$> | $T \rightarrow boolean$ | 流中的元素是否都能匹配给定的谓词 |
$noneMatch$ | 终端 | $boolean$ | $Predicate$<$T$> | $T \rightarrow boolean$ | 流中是否没有元素与给定的谓词匹配 |
$findAny$ | 终端 | $Optional$<$T$> | 返回当前流中的任意元素 | ||
$findFirst$ | 终端 | $Optional$<$T$> | 返回当前流中第一个元素 | ||
$reduce$ | 终端 | $Optional$<$T$> | $BinaryOperator$<$T$> | $(T, T) \rightarrow T$ | 对流中元素重复应用方法 |
诸如 $filter$ 或 $Sorted$ 等操作会返回另一个流,这让多个操作可以连接起来形成一个查询,这种操作称为中间操作
。除非流水线上触发一个终端操作,否则中间操作不会执行任何处理。终端操作
会从流的流水线生成结果,其结果是任何不是流的值,比如 $List$ 、$Integer$ 等。总而言之,流的使用一般包括三件事:一个数据源、一个中间操作链和一个终端操作。
诸如 $map$ 或 $filter$ 等操作会从输入流中获取每一个元素,并在输出流中得到 $0$ 或 $1$ 个结果。这些操作一般都是无状态操作
。但诸如 $reduce$ 、$sum$ 、$max$ 等操作需要内部状态来累计结果,这些操作就是有状态操作
。但是这些操作的内部状态很小,通常只是一些基本数据类型,不管流中有多少元素需要处理,内部状态都是有界的。而诸如 $sorted$ 或 $distinct$ 等操作,它们的内部状态可能很大,这时就要求无界的存储空间。
$peek$ 是一个特殊的流操作,可以在流的每个元素恢复运行之前,插入执行一个动作。$peek$ 不会恢复流的运行,而是在一个元素上完成操作之后,将操作顺承到流水线中的下一个操作。
Java 8
引入了三个原始类型特化流接口 $IntStream$ 、$DoubleStream$ 和 $LongStream$ ,分别将流中的元素特化为 $int$ 、$long$ 和 $double$ ,从而避免了暗含的装箱成本。每个接口都带来了进行常用数值归约的新方法,比如对数值流求和的 $sum$ ,此外还可以在必要时将它们转换回对象流。将流转换为特化版本的常用方法是 $mapToInt$ 、$mapToDouble$ 和 $mapToLong$ 。这些方法和 $map$ 的工作方式一样,但是会返回特化流。如果想要将特化流转换为对象流,可以使用 $boxed$ 方法。类似的,也可以使用 $mapToObj$ 方法生成对象流。
Java 8
还引入了两个可以用于 $IntStream$ 和 $LongStream$ 的静态方法,帮助生成数值范围,分别是 $range$ 和 $rangeClosed$ ,前者为开区间,后者为闭区间。
除了调用集合中的 $stream$ 方法之外,还有一些其他的构造流的方式:
- $Stream.of$ :通过显式值创建一个流;
- $Arrays.stream$ :从数组创建一个流;
- $Files.lines$ :由指定文件中的各行构成的字符串流;
- $Stream.iterate$ :从函数创建一个无限流,接收一个初始值;
- $Stream.generate$ :接收一个 $Supplier$<$T$> 类型参数,创建一个无限流;
- $Stream.empty$ :创建一个空流。
2. Optional
$Optional$ 是Java 8
中引入的一个新类。当变量存在时,$Optional$ 只是对类的简单封装。变量不存在时,缺失的值会被建模成一个空的 $Optional$ 对象,由方法 $Optional.empty(\ )$ 返回,该方法为一个静态工厂方法,返回 $Optional$ 类的特定单一实例。引入 $Optional$ 的意图并非要消除每一个 $null$ 引用,其目的是帮助设计出一个更加普适的API
。$Optional$ 提供了几种可以迫使你显式地检查值是否存在或处理值不存在的情形的方法:
- $isPresent(\ )$ :包含值时返回 $true$ ;
- $isPresent(Consumer$<$T$> $block)$ :值存在时执行给定代码块;
- $get(\ )$ :值存在时返回值,否则抛出 $NoSuchElement$ 异常;
- $orElse(T\ other)$ :值存在时返回值,否则返回默认值;
- $orElsetGet(Supplier$<$?\ extends\ T$> $other)$ :当值不存在时会调用 $Supplier$ 对象并返回;
- $orElseThrow(Supplier$<$?\ extends\ X$> $exceptionSupplier)$ :当值不存在时会抛出 $Supplier$ 指定的异常类型;
- $ifPresent(Consumer$<$?\ super\ T$>$)$ :能在变量值存在时执行一个作为参数传入的方法,否则不进行任何操作。
创建 $Optional$ 对象的方式很简单,可以通过 $Optional.empty(\ )$ 声明一个空的 $Optional$ 对象,也可以通过 $Optional.of(\ )$ 方法通过一个非空值创建对象。如果不知道传入的对象是否为空值,可以使用 $Optional.ofNullable(\ )$ 方法,使用该方法构造的对象如果参数为 $null$ ,那么会返回一个空的 $Optional$ 对象。
一般情况下,当我们访问一个可以为空的对象值前,需要先判断其是否为空。$Optional.map(\ )$ 方法会将提供的函数应用于当前值,如果当前 $Optional$ 非空,那么将该值作为参数传递给 $map$ ,反之则什么也不做。如果在一个对象上连续多次调用 $map$ 方法,可能会出现嵌套式的 $Optional$ 对象,此时我们可以使用 $Optional.flatMap(\ )$ 方法。类似于流,该方法将嵌套式的 $Optional$ 对象合并为一个。
当需要调用某个对象的方法,查看某些属性时,可以使用 $filter$ 方法。该方法接受一个谓词作为参数,如果对象值存在并且符合谓词条件,$filter$ 就不做任何改变,否则就返回一个空的 $Optional$ 对象。
与 $Stream$ 一样,$Optional$ 也提供了类似的基础类型 $OptionalInt$ 、$OptionalLong$ 以及 $OptionalDouble$ 。对于 $Stream$ 而言,因为其中可能包含大量的元素,出于性能考虑,我们最好使用基础类型。但对于只有单个元素的 $Optional$ ,这个理由就不成立了。基础类型的 $Optional$ 并不支持 $map$ 、$flatMap$ 以及 $filter$ 方法,同时也不能作为方法引用传递给另一个 $flatMap$ 方法,所以并不推荐使用。
3. Collector
$Collector$ 用于定义 $collect$ 方法用来生成结果集合的标准。更具体地说,对流调用 $collect$ 方法将对流中的元素触发一个归约操作。一般来说,$Collector$ 会对元素应用一个转换函数,并将结果累积在一个数据结构中。$Collector$ 接口中方法的实现决定了如何对流执行归约操作, 而 $Collectors$ 实用类中提供了很多静态工厂方法,可以方便地创建常见收集器的实例:
- $toList$ :把流中的所有项目收集到一个 $List$ ;
- $toSet$ :把流中的所有项目收集到一个 $Set$ ,删除重复项;
- $toCollection$ :把流中所有项目收集到给定的供应源创建的集合,需要额外指定一个集合创建方法;
- $counting$ :返回流中元素个数,更直接地,也可以使用 $Stream.count$ 方法;
- $maxBy$ :接收一个 $Comparator$ ,返回流中最大值;
- $minBy$ :接收一个 $Comparator$ ,返回流中最小值;
- $summingInt$ / $summingLong$ / $summingDouble$ :接收一个把对象映射为 $int$ / $long$ / $double$ 的函数,并返回总和;
- $averagingInt$ / $averagingLong$ / $averagingDouble$ :接收一个把对象映射为 $int$ / $long$ / $double$ 的函数,并返回平均值;
- $summarizingInt$ / $summarizingLong$ / $summarizingDouble$ :接收一个把对象映射为 $int$ / $long$ / $double$ 的函数,并返回一个 $IntSummaryStatistics$ / $LongSummaryStatistics$ / $DoubleSummaryStatistics$ 对象,包含流中元素个数、最大值、最小值、总和和平均值信息;
- $joining$ :对流中的每一个对象应用 $toString$ 方法,并将结果串连成一个字符串,允许接收一个分界符;
- $groupingBy$ :接收两个函数,前者为键值分类函数,后者为收集器类型的收集函数,并返回一个 $Map$ ,其中第二个参数可以省略,默认为 $groupingBy(f,\ toList(\ ))$ ;
- $partitionBy$ :接收一个 $Predicate$<$T$> 类型参数和一个转换函数,返回一个键值为 $Boolean$ 的 $Map$ ,其中第二个参数可以省略,默认为 $partitionBy(f,\ toList(\ ))$ ;
$reducing$ 方法允许定义一个更一般化的归约过程,它接收三个参数:初始值、转换函数和一个 $BinaryOperator$ ,用于将两个项目累积成一个同类型的值。此外还有一个单参数类型的重载版本,只接收一个 $BinaryOperator$ ,以第一个项目作为起点,把恒等函数作为转换函数。
在收集的时候,由于调用的方法的原因,可能会返回 $Optional$ 类型的参数,而这往往不是我们想要的,这时候就可以使用 $collectingAndThen$ 方法,可以额外接收一个转换函数,将 $Optional$ 类型进行转换。
如果要自定义收集器,需要使用 $Collector$ 接口:
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();
}
其中 $T$ 是流要收集的项目的泛型,$A$ 是累加器的类型,$R$ 是收集操作得到的对象的类型。$supplier$ 方法返回一个结果为空的 $Supplier$ ,创建一个空的累加器实例;$accumulator$ 方法返回执行归约操作的函数;$finisher$ 方法返回在累积过程的最后调用的函数,以便将累加器对象转换为整个集合操作的最终结果;$combiner$ 方法返回一个供归约操作使用的函数,定义了对流的各个子部分进行并行处理时如何合并累加器结果;$characteristics$ 方法会返回一个不可变的 $Characteristics$ 集合,定义了收集器的行为,是一个包含了三个项目的枚举:$UNORDERED$ ( 结果不受流中项目的遍历和累计顺序的影响 )、$CONCURRENT$ ( $accumulator$ 可以多线程调用 )、$IDENTITY_-FINISH$ ( $finisher$ 返回恒等函数,可以跳过 )。以下为 $ToListCollector$ 类型实例,它将流转换为 $List$ :
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity(); // 恒等函数
}
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(
EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
}
}
除了实现 $Collector$ 之外,自定义收集操作也可以通过调用一个重载版本的 $collect$ 方法完成。该方法接收 $supplier$ 、$accumulator$ 和 $combiner$ 三个函数,并且永远都是一个 $IDENTITY_-FINISH$ 和 $CONCURRENT$ 但非 $UNORDERED$ 的收集器。
4. 并行流
并行流
就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。通过对收集源调用 $parallelStream$ 方法,可以把集合转换为并行流。也可以通过在顺序流上调用 $parallel$ 方法来转换为并行流,调用该方法后流本身并不会有任何实际的变化,在内部实际上只设置了一个 $boolean$ 标志位。类似的,也可以在并行流上调用 $sequential$ 方法来将它转换为顺序流。
并行化的过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能很大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的。
- 在将顺序流转换为并行流时,对于存在疑问的转换,可以先测试;
- 自动装箱和拆箱操作会大大降低性能,对于存在大量这种操作的流,应该尽可能地使用原始类型特化流;
- 有些操作例如 $limit$ 和 $findFirst$ 等依赖于元素顺序的操作,在并行流上的性能就比顺序流差;
- 对于处理成本较高的操作,使用并行流时性能更好的可能性较大;
- 避免使用并行流处理数据量较小的集合;
- 考虑流背后的数据结构是否易于分解,例如 $ArrayList$ 、$range$ 、$HashSet$ 的拆分效率较好,而 $LinkedList$ 、$iterate$ 的拆分效率则很差;
- 考虑流自身的特点和流水线中间操作修改流的方式,例如筛选操作丢弃的元素个数无法预测,因而难以将流划分为大小近似的部分;
- 考虑终端操作中合并代价,避免合并代价大于使用并行流带来的性能提升。
并行流的背后使用的是Java 7
中引入的分支/合并框架
,目的是以递归方式将可以并行的任务拆分成更小的任务,然后把每个子任务的结果合并起来生成整体结果。它是 $ExecutorService$ 接口的一个实现,它把子任务分配给线程池 ( 称为 $ForJoinPool$ ) 中的工作线程。
要把任务提交到该线程池,必须创建 $RecursiveTask$<$R$>的一个子类,其中 $R$ 是并行化任务产生的结果,如果任务不返回结果,则是 $RecursiveTask$ 类型。要定义 $RecursiveTask$ ,只需要实现它唯一的抽象方法 $compute$ 。$compute$ 方法定义了将任务拆分为子任务的逻辑,以及无法再拆分或不方便再拆分时生成单个子任务结果的逻辑。一般来说并没有确切的标准决定一个任务是否应该再拆分。分支/合并框架中使用了工作窃取
( $work\ stealing$ ) 技术,每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列中取出下一个任务,提前完成了队列中的所有任务,则会随机选择一个其他线程的队列,从中获取任务。由于工作窃取,因此将原任务分为大量的小任务一般来说都是一个好的选择。
$Spliterator$ 是 Java 8
中加入的另一个新接口,用于并行遍历数据源中的元素。同时Java 8
为集合框架中包含的所有数据结构提供了一个默认的 $Spliterator$ 实现。
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
$tryAdvance$ 的行为类似于普通的 $Iterator$ ,它会按顺序依次遍历使用 $Spliterator$ 的元素,并且如果还有其他元素需要遍历则返回 $true$ 。$trySplit$ 可以将一些元素划分给另一个 $Spliterator$ ,让它们两并行处理,将 $Stream$ 拆分为多个部分的算法是一个递归过程,即不断地调用 $trySplit$ 方法直到不可划分。$estimateSize$ 方法用于估计剩余的需要遍历的元素个数,对于已知大小的源,这个数字是准确的。