46 通过Fork/Join框架分而治之完成数据处理
Diego38 37 1

1. 前言

Fork-Join框架是JDK7新引入的一种特殊的任务处理框架,内部由ForkJoinPool,RecursiveTask组合完成。

用于将一个大任务分割成若干小任务,上节我们学习的CompletableFuture默认就是使用ForkJoinPool。

实际上,Fork-Join框架在我们日常工作中使用较少,不是面试中考察的重点,了解基本的使用和实现就可以了,内部实现也相对复杂,感兴趣的同学可以研究其中的分而治之的设计思想

2. Fork-Join框架的使用

Fork-Join是把大任务切割成小任务,然后将小任务的结果汇总得到大任务的结果的过程。举个例子,假设我们要累加一个list中所有整形数字的和,{1,4,5,6,7,2,78,89,…, 32,15},list中有10万个元素,我们可以将这10万个数字分成100个任务,每个任务1000个数字,最后将这100个任务的结果进行汇总。

如图所示: image

以数字加和为例,实现代码如下:

public class ForkJoinTest {

  static class ForkTask extends RecursiveTask<Integer> {
    //限制每个小任务最多元素20个
    private static final int THRESHOLD = 20;
    private int arr[];
    private int start;
    private int end;

    // 累加从start到end的数组元素
    public ForkTask(int[] arr, int start, int end) {
      this.arr = arr;
      this.start = start;
      this.end = end;
    }

    @Override
    protected Integer compute() {
      int sum = 0;
      // 当end与start之间的差小于THRESHOLD时,开始进行实际累加
      if (end - start < THRESHOLD) {
        for (int i = start; i < end; i++) {
          sum += arr[i];
        }
        return sum;
      } else {
        //以下很重要,是要切割的逻辑
        int middle = (start + end) / 2;
        ForkTask left = new ForkTask(arr, start, middle);
        ForkTask right = new ForkTask(arr, middle, end);
        // 调用fork来fork任务
        left.fork();
        right.fork();
        // 将任务累加起来
        return left.join() + right.join();
      }
    }
  }

  public static void main(String[] args)
          throws Exception {
    int[] arr = new int[100];
    Random rand = new Random();
    int total = 0;
    // 随机生产100个元素
    for (int i = 0, len = arr.length; i < len; i++) {
      int tmp = rand.nextInt(20);
      // 对数组元素赋值,并将数组元素的值添加到total总和中。
      total += (arr[i] = tmp);
    }
    System.out.println("通过单线程计算得出结果:" + total);
    ForkJoinPool pool = new ForkJoinPool();
    // 提交可分解的CalTask任务
    Future<Integer> future = pool.submit(new ForkTask(arr, 0, arr.length));
    System.out.println("通过ForkJoin框架得出结果:" + future.get());
    // 关闭线程池
    pool.shutdown();
  }
}

使用ForkJoin框架需要关注几点

  • 需要继承RecursiveTask实现一个ForkTask任务,比如上例中的FokrTask。
  • 覆盖compute方法,定义好切割任务的条件、单个任务的计算逻辑、切割逻辑
  • 构造ForkJoinPool实例,提交刚刚定义好的RecursiveTask

上述代码即是按照图中描述的,fork + join来完成任务处理的。但从代码可以看出在任务中需要关注任务切割和显示调用fork,这对业务代码是不友好的,除非用在公共组件当中。

3. Fork-Join的基本原理

Fork-Join有一些设计技巧是值得借鉴的,Fork-Join框架的原理如图所示 image (图示来自于网络)

  • ForkJoinPool是由ForkJoinTask和ForkJoinWorkerThread组成,当调用fork时,会调用ForkJoinWorkerThread将这个任务push到数组队列中
  • 每个数组队列对应一个线程,这个线程要么新创建,要么处于空闲状态被唤醒,线程接到任务会做pop操作,取出任务,进行执行
  • 做join操作就是阻塞等待子任务线程完成任务
  • 每个线程一个队列,自己消费完了可以stealing其他线程的task, 当一个工作线程无法再从其他线程中获取任务和失败处理的时候,它就会退出(通过yield、sleep和/或者优先级调整)并经过一段时间之后再度尝试直到所有的工作线程都被告知他们都处于空闲的状态。
  • 线程从top位置进行pop和push,做的Stealing其他线程从base位置进行stealing

从以上几点可以得出当线程空闲时会做Stealing,帮忙整体的ForkJoinPool合理的资源利用。

4. 总结

ForkJoin框架是JDK7新引入的,默认是CompletableFuture的默认框架。work-stealing工作窃取算法是ForkJoin框架的一大优化亮点。

预览图
评论区

索引目录