Fork/Join框架理解

摘要

Fork/Join任务适合用来处理需要递归处理的任务。整个流程是,将大任务递归分解成若干个小任务,直到任务足够小后停止分割,最后多个线程并发执行所有小任务,最后将这些任务结果进行汇总成最终的结果。JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列中窃取任务来执行。那工作窃取算法是如何窃取的呢?假如我们准备处理大的任务,我们可以把这个任务分割成若干小任务,存放在不同的队列中。而每个小任务都有对应的work线程,线程和任务队列会一一对应。那么就会存在有些线程处理完了,有些线程还未处理完任务。那么这些处理完任务的线程与其等着,还不如帮助其他线程处理任务,于是就去窃取其他线程的队列中任务执行。而这时队列就存在窃取任务线程和被窃取任务线程(原线程)之间的竞争,这时候如果是普通队列的话,那么存在相互竞争获取队列中的任务。所以这个队列应该是双向队列更加的合适,这样被窃取任务线程(原线程)永远从双端的队列中的头部获取任务,而窃取任务线程则从队列中的尾部获取任务,减少了相互竞争,从而提高处理的效率。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

实践

要完成整个Fork/Join流程,需要以下两个类。

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* @description: Fork/Join框架的demo
* @author: Kris
* @date: 2019-02-12 14:40
**/
public class ForkJoinTaskTest {

static class SumTask extends RecursiveTask<Integer>{

//阀值
private static final Integer THRESHOLD = 2;

private int[] numbers;
private int start;
private int end;

public SumTask(int[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if( ((end - start) <= THRESHOLD) ){
//如何范围小于了阀值
int sum=0;
for(int i=start ; i<=end ; i++){
sum += numbers[i];
}
return sum;
}else {
//将任务拆分成两个小任务
int middle = (end + start)/2;
SumTask minTask = new SumTask(numbers,start,middle);
SumTask maxTask = new SumTask(numbers,middle+1,end);
//invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,
//但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。
//注意若不加上invokeAll,会发生work线程一直空闲着。
invokeAll(minTask,maxTask);
minTask.fork();
maxTask.fork();
return minTask.join() + maxTask.join();
}
}
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
int[] numbers = new int[]{1,2,3,4,5,6,7};
SumTask sumTask = new SumTask(numbers,0,6);
sou
Future<Integer> futureTask = forkJoinPool.submit(sumTask);
try {
System.out.println("sum:"+futureTask.get());
}catch (InterruptedException e){
e.printStackTrace();
}catch (ExecutionException e){
e.printStackTrace();
}
}
}

适应范围

如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案

参考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×