从呼吁式编程到Fork/Join再到Java 8中的并行Streams
当前位置:以往代写 > JAVA 教程 >从呼吁式编程到Fork/Join再到Java 8中的并行Streams
2019-06-14

从呼吁式编程到Fork/Join再到Java 8中的并行Streams

从呼吁式编程到Fork/Join再到Java 8中的并行Streams

副标题#e#

Java 8带来了许多可以使编码更简捷的特性。譬喻,像下面的代码:

Collections.sort(transactions, new Comparator<Transaction>(){
  public int compare(Transaction t1, Transaction t2){
    return t1.getValue().compareTo(t2.getValue());
  }
});

可以用替换为如下更为紧凑的代码,成果沟通,可是读上去与问题语句自己更靠近了:

transactions.sort(comparing(Transaction::getValue));

Java 8引入的主要特性是Lambda表达式、要领引用和新的Streams API。它被认为是自20年前Java降生以来语言方面变革最大的版本。要想通过具体且实际的例子来相识如何从这些特性中获益,可以参考本文作者和Alan Mycroft配合编写的《Java 8 in Action: Lambdas, Streams and Functional-style programming》一书。

这些特性支持措施员编写更简捷的代码,还使他们可以或许受益于多核架构。实际上,编写可以优雅地并行执行的措施照旧Java专家们的特权。然而,借助新的Streams API,Java 8改变了这种状况,让每小我私家都可以或许更容易地编写操作多核架构的代码。

在这篇文章中,我们将利用以下三种气势气魄,以差异要领计较一个大数据集的方差,并加以比拟。

呼吁式气势气魄

Fork/Join框架

Streams API

方差是统计学中的观念,用于怀抱一组数的偏离水平。方差可以通过对每个数据与平均值之差的平方和求平均值来计较。譬喻,给定一组暗示人口年数的数:40、30、50和80,我们可以这样计较方差:

计较平均值:(40 + 30 + 50 + 80) / 4 = 50

计较每个数据与平均值之差的平方和:(40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400

最后平均:1400/4 = 350

呼吁式气势气魄

下面是计较方差的一种典范的呼吁式气势气魄实现:

public static double varianceImperative(double[] population){
   double average = 0.0;
   for(double p: population){
      average += p;
   }
   average /= population.length;

   double variance = 0.0;
   for(double p: population){
     variance += (p - average) * (p - average);
   }
   return variance/population.length;
}

为什么说这是呼吁式的呢?我们的实现用修改状态的语句序列描写了计较进程。这里,我们显式地对人口年数数组中的每个元素举办迭代,并且每次迭代时更新average和variance这两个局部变量。这种代码很适合只有一个CPU的硬件架构。确实,它可以很是直接地映射到CPU的指令集。

查察本栏目


#p#副标题#e#

Fork/Join框架

那么,如何编写适合在多核架构上执行的实现代码呢?应该利用线程吗?这些线程是不是要在某个点上同步?Java 7引入的Fork/Join框架缓解了一些坚苦,所以让我们利用该框架来开拓方差算法的一个并行版本吧。

public class ForkJoinCalculator extends RecursiveTask<Double> {

   public static final long THRESHOLD = 1_000_000;

   private final SequentialCalculator sequentialCalculator;
   private final double[] numbers;
   private final int start;
   private final int end;

   public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
     this(numbers, 0, numbers.length, sequentialCalculator);
   }

   private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator 
sequentialCalculator) {
     this.numbers = numbers;
     this.start = start;
     this.end = end;
     this.sequentialCalculator = sequentialCalculator;
   }

   @Override
   protected Double compute() {
     int length = end - start;
     if (length <= THRESHOLD) {
         return sequentialCalculator.computeSequentially(numbers, start, end);
     }
     ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2, 
sequentialCalculator);
     leftTask.fork();
     ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end, 
sequentialCalculator);
     Double rightResult = rightTask.compute();
     Double leftResult = leftTask.join();
     return leftResult + rightResult;
  }
}

这里我们编写了一个RecursiveTask类的子类,它对一个double数组举办切分,当子数组的长度小于便是给定阈值(THRESHOLD)时遏制切分。切分完成后,对子数组举办顺序处理惩罚,并将下列接口界说的操纵应用于子数组。

public interface SequentialCalculator {
  double computeSequentially(double[] numbers, int start, int end);
}

操作该基本设施,可以按如下方法并行计较方差。

 public static double varianceForkJoin(double[] population){
   final ForkJoinPool forkJoinPool = new ForkJoinPool();
   double total = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
     @Override
     public double computeSequentially(double[] numbers, int start, int end) {
       double total = 0;
       for (int i = start; i < end; i++) {
         total += numbers[i];
       }
       return total;
     }
  }));
  final double average = total / population.length;
  double variance = forkJoinPool.invoke(new ForkJoinCalculator
(population, new SequentialCalculator() {
    @Override
    public double computeSequentially(double[] numbers, int start, int end) {
      double variance = 0;
      for (int i = start; i < end; i++) {
        variance += (numbers[i] - average) * (numbers[i] - average);
      }
      return variance;
    }
 }));
 return variance / population.length;
}

本质上,即便利用Fork/Join框架,相对付顺序版本,并行版本的编写和最后的调试仍然坚苦很多。

查察本栏目

#p#副标题#e#

并行Streams

#p#分页标题#e#

Java 8让我们可以以差异的方法办理这个问题。差异于编写代码指出计较如何实现,我们可以利用Streams API粗线条地描写让它做什么。作为功效,库可以或许知道如作甚我们实现计较,并施以各类百般的优化。这种气势气魄被称为声明式编程。Java 8有一个为操作多核架构而专门设计的并行Stream。我们来看一下如何利用它们来更快地计较方差。

假定读者对本节探讨的Stream有些相识。作为温习,Stream<T>是T范例元素的一个序列,支持聚合操纵。我们可以利用这些操纵来建设暗示计较的一个管道(pipeline)。这里的管道和UNIX的呼吁管道一样。并行Stream就是一个可以并行执行管道的Stream,可以通过在普通的Stream上挪用parallel()要领得到。要温习Stream,可以参考Javadoc文档。

好动静是,Java 8 API内建了一些算术操纵,如max、min和average。我们可以利用Stream的几种根基范例特化形式来会见前面几个要领:IntStream(int范例元素)、LongStream(long范例元素)和DoubleStream(double范例元素)。譬喻,可以利用IntStream.rangeClosed()建设一系列数,然后利用max()和min()要领计较Stream中的最大元素和最小元素。

回到最初的问题,我们想利用这些操纵来计较一个局限较大的人口年数数据的方差。第一步是从人口年数数组建设一个Stream,可以通过Arrays.stream()静态要领实现:

DoubleStream populationStream = Arrays.stream(population).parallel();

我们可以利用DoubleStream所支持的average()要领:

double average = populationStream.average().orElse(0.0);

下一步是利用average计较方差。人口年数中的每个元素首先需要减去平均值,然后计较差的平方。可以将其视作一个Map操纵:利用一个Lambda表达式(double p) -> (p – average) * (p – average)把每个元素转换为另一个数,这里是转换为该元素与平均值差的平方。一旦转换完成,我们就可以挪用sum()要领来计较所有功效元素的和了。.

不外别那么着急。Stream只能耗损一次。假如复用populationStream,我们会遇到下面这个令人惊奇的错误:

java.lang.IllegalStateException: stream has already been operated upon or closed

所以我们需要利用第二个流来计较方差,如下所示:

public static double varianceStreams(double[] population){
   double average = Arrays.stream(population).parallel().average().orElse(0.0);
   double variance = Arrays.stream(population).parallel()
             .map(p -> (p - average) * (p - average))
             .sum() / population.length;
   return variance;
}

通过利用Streams API内建的操纵,我们以声明式、并且很是简捷的方法重写了最初的呼吁式气势气魄代码,并且声明式气势气魄读上去险些就是方差的数学界说。我们再来研究一下三种实现版本的机能。

基准测试

我们以很是差异的气势气魄编写了三个版本的方差算法。Stream版本是最简捷的,并且是以声明式气势气魄编写的,它让类库去确定详细的实现,并操作多核基本设施。不外你大概想知道它们的执行结果如何。为找出谜底,让我们建设一个基准测试,比拟一下三个版本的表示。我们先随机生成1到140之间的3000万小我私家口年数数据,然后计较其方差。我们利用jmh来研究每个版本的机能。Jmh是OpenJDK支持的一个Java套件。读者可以从GitHub克隆该项目,本身运行基准测试。

基准测试运行的呆板是Macbook Pro,配备2.3 GHz的4核Intel Core i7处理惩罚器,16GB 1600MHz DDR3内存。另外,我们利用的JDK 8版本如下:

java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

功效用下面的柱状图说明。呼吁式版本用了60毫秒,Fork/Join版本用了22毫秒,而流版本用了46毫秒。

从号令式编程到Fork/Join再到Java 8中的并行Streams

#p#分页标题#e#

这些数据应该审慎看待。好比,假如在32位JVM上运行测试,功效很大概有较大的不同。然而有趣的是,利用Java 8中的Streams API这种差异的编程气势气魄,为在场景背后执行一些优化打开了一扇门,而这在严格的呼吁式气势气魄中是不行能的;相对付利用Fork/Join框架,这种气势气魄也更为直接。

查察本栏目

    关键字:

在线提交作业