GVKun编程网logo

scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool

15

对于scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool感兴趣的读者,本文将会是一篇不错的选择,并为您提供关

对于scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool感兴趣的读者,本文将会是一篇不错的选择,并为您提供关于(十九)java多线程之ForkJoinPool、2019年最新Java线程池之ThreadPool与ForkJoinPool解析、CompletableFuture / ForkJoinPool设置类加载器、Fork/join框架之ForkJoinPool的有用信息。

本文目录一览:

scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool

scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool

为什么ForkJoinPool为Scala分叉?

哪种实现方式和哪种情况是首选?

答案1

小编典典

scala库拥有自己的ForkJoinPool副本的明显原因是scala必须在1.7之前的JVM上运行,并且ForkJoinPool仅在Java
1.7中引入。

此外,还对内部(标量)用途进行了一些更改,例如:

https://github.com/scala/scala/commit/76e9da2ca4c31daec2b04848c3c2dbad6ecd426e

鉴于scala的版本可能不会给您带来任何优势(如果您针对Java
1.7进行编译和运行),我想说,对于您自己的使用,您可能应该使用Java的版本。至少java的版本已被精确记录并完全“公开”,而scala的版本状态尚不清楚(它很可能仅供内部使用)。但是在某些地方,您可能别无选择。举例来说,ForkJoinTasks有一种forkJoinPool方法期望使用Scala的版本ForkJoinPool。如果有人可以获得/找到scala版本的任何正式身份,ForkJoinPool说明它确实是公开且稳定的,那么我将很乐意回复此建议。

(十九)java多线程之ForkJoinPool

(十九)java多线程之ForkJoinPool

本人邮箱: kco1989@qq.com
欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kco
github: https://github.com/kco1989/kco
代码已经全部托管github有需要的同学自行下载

引言

java 7提供了另外一个很有用的线程池框架,Fork/Join框架

理论

Fork/Join框架主要有以下两个类组成.
* ForkJoinPool 这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm).它管理工作者线程,并提供任务的状态信息,以及任务的执行信息
* ForkJoinTask 这个类是一个将在ForkJoinPool执行的任务的基类.

Fork/Join框架提供了在一个任务里执行fork()join()操作的机制和控制任务状态的方法.通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类
* RecursiveAction 用于任务没有返回值的场景
* RecursiveTask 用于任务有返回值的场景.

例子 先定个小目标,1亿就太多,先赚个一百万吧

现在你是一个深圳片区的某公司高级销售主管.现在定了一个目标,就是要赚个一百,让你一个人去赚,肯定有难度的.好在有一般手下,把目标缩小,让小弟们去赚,我们坐等拿钱.ok,开始编程

  • 首先我们要定义个赚钱任务 MakeMoneyTask,如果要赚钱的目标小于最小目标,比如十万,那么就自己去完成,否则,就把任务分给小弟们去做.
public class MakeMoneyTask extends RecursiveTask<Integer>{

    private static final int MIN_GOAL_MONEY = 100000;
    private int goalMoney;
    private String name;
    private static final AtomicLong employeeNo = new AtomicLong();
    public MakeMoneyTask(int goalMoney){
        this.goalMoney = goalMoney;
        this.name = "员工" + employeeNo.getAndIncrement() + "号";
    }
    @Override
    protected Integer compute() {
        if (this.goalMoney < MIN_GOAL_MONEY){
            System.out.println(name + ": 老板交代了,要赚 " + goalMoney + " 元,为了买车买房,加油吧....");
            return makeMoney();
        }else{
            int subThreadCount = ThreadLocalRandom.current().nextInt(10) + 2;
            System.out.println(name + ": 上级要我赚 " + goalMoney + ", 有点小多,没事让我" + subThreadCount + "个手下去完成吧," +
                    "每人赚个 " + Math.ceil(goalMoney * 1.0 / subThreadCount) + "元应该没问题...");
            List<MakeMoneyTask> tasks = new ArrayList<>();
            for (int i = 0; i < subThreadCount; i ++){
                tasks.add(new MakeMoneyTask(goalMoney / subThreadCount));
            }
            Collection<MakeMoneyTask> makeMoneyTasks = invokeAll(tasks);
            int sum = 0;
            for (MakeMoneyTask moneyTask : makeMoneyTasks){
                try {
                    sum += moneyTask.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println(name + ": 嗯,不错,效率还可以,终于赚到 " + sum + "元,赶紧邀功去....");
            return sum;
        }
    }

    private Integer makeMoney(){
        int sum = 0;
        int day = 1;
        try {
            while (true){
                Thread.sleep(ThreadLocalRandom.current().nextInt(500));
                int money = ThreadLocalRandom.current().nextInt(MIN_GOAL_MONEY / 3);
                System.out.println(name + ": 在第 " + (day ++) + " 天赚了" + money);
                sum += money;
                if (sum >= goalMoney){
                    System.out.println(name + ": 终于赚到 " + sum + " 元, 可以交差了...");
                    break;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return sum;
    }
}
  • 最后我们写一个测试类
public class TestMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> task = pool.submit(new MakeMoneyTask(1000000));
        do {
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }while (!task.isDone());
        pool.shutdown();
        System.out.println(task.get());

    }
}

运作之后结果如下:

员工0号: 上级要我赚 1000000, 有点小多,没事让我10个手下去完成吧,每人赚个 100000.0元应该没问题…
员工1号: 上级要我赚 100000, 有点小多,没事让我7个手下去完成吧,每人赚个 14286.0元应该没问题…
员工11号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工10号: 上级要我赚 100000, 有点小多,没事让我5个手下去完成吧,每人赚个 20000.0元应该没问题…
员工18号: 老板交代了,要赚 20000 元,为了买车买房,加油吧….
员工9号: 上级要我赚 100000, 有点小多,没事让我3个手下去完成吧,每人赚个 33334.0元应该没问题…
员工23号: 老板交代了,要赚 33333 元,为了买车买房,加油吧….
员工22号: 老板交代了,要赚 20000 元,为了买车买房,加油吧….
员工22号: 在第 1 天赚了31432
员工22号: 终于赚到 31432 元, 可以交差了…
员工21号: 老板交代了,要赚 20000 元,为了买车买房,加油吧….
员工18号: 在第 1 天赚了32005
员工18号: 终于赚到 32005 元, 可以交差了…
员工19号: 老板交代了,要赚 20000 元,为了买车买房,加油吧….
员工23号: 在第 1 天赚了6166
员工21号: 在第 1 天赚了15433
员工19号: 在第 1 天赚了23419
员工19号: 终于赚到 23419 元, 可以交差了…
员工20号: 老板交代了,要赚 20000 元,为了买车买房,加油吧….
员工20号: 在第 1 天赚了10376
员工11号: 在第 1 天赚了11808
员工21号: 在第 2 天赚了31059
员工21号: 终于赚到 46492 元, 可以交差了…
员工8号: 上级要我赚 100000, 有点小多,没事让我4个手下去完成吧,每人赚个 25000.0元应该没问题…
员工26号: 老板交代了,要赚 25000 元,为了买车买房,加油吧….
员工11号: 在第 2 天赚了11902
员工11号: 终于赚到 23710 元, 可以交差了…
员工12号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工23号: 在第 2 天赚了9077
员工20号: 在第 2 天赚了30386
员工20号: 终于赚到 40762 元, 可以交差了…
员工10号: 嗯,不错,效率还可以,终于赚到 174110元,赶紧邀功去….
员工7号: 上级要我赚 100000, 有点小多,没事让我10个手下去完成吧,每人赚个 10000.0元应该没问题…
员工30号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工12号: 在第 1 天赚了31271
员工12号: 终于赚到 31271 元, 可以交差了…
员工26号: 在第 1 天赚了11631
员工13号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工26号: 在第 2 天赚了10160
员工30号: 在第 1 天赚了10786
员工30号: 终于赚到 10786 元, 可以交差了…
员工31号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工31号: 在第 1 天赚了15201
员工31号: 终于赚到 15201 元, 可以交差了…
员工32号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工26号: 在第 3 天赚了32642
员工26号: 终于赚到 54433 元, 可以交差了…
员工27号: 老板交代了,要赚 25000 元,为了买车买房,加油吧….
员工23号: 在第 3 天赚了33072
员工23号: 终于赚到 48315 元, 可以交差了…
员工24号: 老板交代了,要赚 33333 元,为了买车买房,加油吧….
员工24号: 在第 1 天赚了26309
员工24号: 在第 2 天赚了15420
员工24号: 终于赚到 41729 元, 可以交差了…
员工25号: 老板交代了,要赚 33333 元,为了买车买房,加油吧….
员工13号: 在第 1 天赚了33266
员工13号: 终于赚到 33266 元, 可以交差了…
员工14号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工25号: 在第 1 天赚了19270
员工25号: 在第 2 天赚了15842
员工25号: 终于赚到 35112 元, 可以交差了…
员工9号: 嗯,不错,效率还可以,终于赚到 125156元,赶紧邀功去….
员工6号: 上级要我赚 100000, 有点小多,没事让我9个手下去完成吧,每人赚个 11112.0元应该没问题…
员工40号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工32号: 在第 1 天赚了8133
员工32号: 在第 2 天赚了3518
员工32号: 终于赚到 11651 元, 可以交差了…
员工33号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工27号: 在第 1 天赚了23200
员工14号: 在第 1 天赚了6366
员工27号: 在第 2 天赚了10406
员工27号: 终于赚到 33606 元, 可以交差了…
员工28号: 老板交代了,要赚 25000 元,为了买车买房,加油吧….
员工40号: 在第 1 天赚了28078
员工40号: 终于赚到 28078 元, 可以交差了…
员工41号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工41号: 在第 1 天赚了12996
员工41号: 终于赚到 12996 元, 可以交差了…
员工42号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工33号: 在第 1 天赚了29188
员工33号: 终于赚到 29188 元, 可以交差了…
员工34号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工14号: 在第 2 天赚了17712
员工14号: 终于赚到 24078 元, 可以交差了…
员工15号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工28号: 在第 1 天赚了18623
员工28号: 在第 2 天赚了8205
员工28号: 终于赚到 26828 元, 可以交差了…
员工29号: 老板交代了,要赚 25000 元,为了买车买房,加油吧….
员工34号: 在第 1 天赚了30779
员工34号: 终于赚到 30779 元, 可以交差了…
员工35号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工42号: 在第 1 天赚了26164
员工42号: 终于赚到 26164 元, 可以交差了…
员工43号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工43号: 在第 1 天赚了2995
员工29号: 在第 1 天赚了347
员工15号: 在第 1 天赚了33056
员工15号: 终于赚到 33056 元, 可以交差了…
员工16号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工35号: 在第 1 天赚了3639
员工29号: 在第 2 天赚了22909
员工43号: 在第 2 天赚了2289
员工16号: 在第 1 天赚了27836
员工16号: 终于赚到 27836 元, 可以交差了…
员工17号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工43号: 在第 3 天赚了694
员工17号: 在第 1 天赚了16361
员工17号: 终于赚到 16361 元, 可以交差了…
员工1号: 嗯,不错,效率还可以,终于赚到 189578元,赶紧邀功去….
员工2号: 上级要我赚 100000, 有点小多,没事让我2个手下去完成吧,每人赚个 50000.0元应该没问题…
员工49号: 老板交代了,要赚 50000 元,为了买车买房,加油吧….
员工49号: 在第 1 天赚了8599
员工43号: 在第 4 天赚了10008
员工43号: 终于赚到 15986 元, 可以交差了…
员工44号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工29号: 在第 3 天赚了31298
员工29号: 终于赚到 54554 元, 可以交差了…
员工8号: 嗯,不错,效率还可以,终于赚到 169421元,赶紧邀功去….
员工39号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工49号: 在第 2 天赚了8099
员工35号: 在第 2 天赚了164
员工49号: 在第 3 天赚了5518
员工49号: 在第 4 天赚了22441
员工44号: 在第 1 天赚了6091
员工39号: 在第 1 天赚了18813
员工39号: 终于赚到 18813 元, 可以交差了…
员工48号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工44号: 在第 2 天赚了22324
员工44号: 终于赚到 28415 元, 可以交差了…
员工45号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工49号: 在第 5 天赚了28438
员工49号: 终于赚到 73095 元, 可以交差了…
员工50号: 老板交代了,要赚 50000 元,为了买车买房,加油吧….
员工35号: 在第 3 天赚了31797
员工35号: 终于赚到 35600 元, 可以交差了…
员工36号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工50号: 在第 1 天赚了18071
员工45号: 在第 1 天赚了22528
员工45号: 终于赚到 22528 元, 可以交差了…
员工46号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工36号: 在第 1 天赚了26828
员工36号: 终于赚到 26828 元, 可以交差了…
员工37号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工50号: 在第 2 天赚了32422
员工50号: 终于赚到 50493 元, 可以交差了…
员工2号: 嗯,不错,效率还可以,终于赚到 123588元,赶紧邀功去….
员工3号: 上级要我赚 100000, 有点小多,没事让我9个手下去完成吧,每人赚个 11112.0元应该没问题…
员工51号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工46号: 在第 1 天赚了1537
员工46号: 在第 2 天赚了27529
员工46号: 终于赚到 29066 元, 可以交差了…
员工47号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工48号: 在第 1 天赚了24791
员工48号: 终于赚到 24791 元, 可以交差了…
员工38号: 老板交代了,要赚 10000 元,为了买车买房,加油吧….
员工37号: 在第 1 天赚了17587
员工37号: 终于赚到 17587 元, 可以交差了…
员工47号: 在第 1 天赚了23693
员工47号: 终于赚到 23693 元, 可以交差了…
员工6号: 嗯,不错,效率还可以,终于赚到 211717元,赶紧邀功去….
员工5号: 上级要我赚 100000, 有点小多,没事让我7个手下去完成吧,每人赚个 14286.0元应该没问题…
员工60号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工51号: 在第 1 天赚了27189
员工51号: 终于赚到 27189 元, 可以交差了…
员工52号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工38号: 在第 1 天赚了32285
员工38号: 终于赚到 32285 元, 可以交差了…
员工66号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工7号: 嗯,不错,效率还可以,终于赚到 228718元,赶紧邀功去….
员工65号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工65号: 在第 1 天赚了26122
员工65号: 终于赚到 26122 元, 可以交差了…
员工64号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工52号: 在第 1 天赚了19239
员工52号: 终于赚到 19239 元, 可以交差了…
员工53号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工60号: 在第 1 天赚了10433
员工66号: 在第 1 天赚了25993
员工66号: 终于赚到 25993 元, 可以交差了…
员工63号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工60号: 在第 2 天赚了19529
员工60号: 终于赚到 29962 元, 可以交差了…
员工61号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工64号: 在第 1 天赚了6894
员工53号: 在第 1 天赚了13114
员工53号: 终于赚到 13114 元, 可以交差了…
员工54号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工54号: 在第 1 天赚了8237
员工61号: 在第 1 天赚了15878
员工61号: 终于赚到 15878 元, 可以交差了…
员工62号: 老板交代了,要赚 14285 元,为了买车买房,加油吧….
员工63号: 在第 1 天赚了32108
员工63号: 终于赚到 32108 元, 可以交差了…
员工4号: 上级要我赚 100000, 有点小多,没事让我9个手下去完成吧,每人赚个 11112.0元应该没问题…
员工67号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工64号: 在第 2 天赚了30531
员工64号: 终于赚到 37425 元, 可以交差了…
员工75号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工54号: 在第 2 天赚了13562
员工54号: 终于赚到 21799 元, 可以交差了…
员工55号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工55号: 在第 1 天赚了17774
员工55号: 终于赚到 17774 元, 可以交差了…
员工56号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工67号: 在第 1 天赚了24463
员工67号: 终于赚到 24463 元, 可以交差了…
员工68号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工56号: 在第 1 天赚了1677
员工62号: 在第 1 天赚了14266
员工75号: 在第 1 天赚了26532
员工75号: 终于赚到 26532 元, 可以交差了…
员工74号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工68号: 在第 1 天赚了32639
员工68号: 终于赚到 32639 元, 可以交差了…
员工69号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工69号: 在第 1 天赚了9513
员工56号: 在第 2 天赚了9154
员工56号: 在第 3 天赚了289
员工56号: 终于赚到 11120 元, 可以交差了…
员工57号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工62号: 在第 2 天赚了17321
员工62号: 终于赚到 31587 元, 可以交差了…
员工5号: 嗯,不错,效率还可以,终于赚到 199075元,赶紧邀功去….
员工59号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工69号: 在第 2 天赚了17971
员工69号: 终于赚到 27484 元, 可以交差了…
员工70号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工74号: 在第 1 天赚了26270
员工74号: 终于赚到 26270 元, 可以交差了…
员工73号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工70号: 在第 1 天赚了21237
员工70号: 终于赚到 21237 元, 可以交差了…
员工71号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工59号: 在第 1 天赚了4411
员工57号: 在第 1 天赚了3546
员工57号: 在第 2 天赚了29330
员工57号: 终于赚到 32876 元, 可以交差了…
员工58号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工73号: 在第 1 天赚了10674
员工71号: 在第 1 天赚了8821
员工59号: 在第 2 天赚了11887
员工59号: 终于赚到 16298 元, 可以交差了…
员工72号: 老板交代了,要赚 11111 元,为了买车买房,加油吧….
员工58号: 在第 1 天赚了28241
员工58号: 终于赚到 28241 元, 可以交差了…
员工3号: 嗯,不错,效率还可以,终于赚到 187650元,赶紧邀功去….
员工72号: 在第 1 天赚了14371
员工72号: 终于赚到 14371 元, 可以交差了…
员工73号: 在第 2 天赚了14918
员工73号: 终于赚到 25592 元, 可以交差了…
员工71号: 在第 2 天赚了28814
员工71号: 终于赚到 37635 元, 可以交差了…
员工4号: 嗯,不错,效率还可以,终于赚到 236223元,赶紧邀功去….
员工0号: 嗯,不错,效率还可以,终于赚到 1845236元,赶紧邀功去….
1845236

看到没有,员工0号把任务一百万直接分给了10个手下去做,每个手下有继续往下分,最终在七十几号人的努力下,终于完成了目标–一百万.而且还超出八十多万,老板一开心,直接把八十多万分给这七十多个员工分红了.

后记

通过上面这个例子的学习,相信应该很多人都可以掌握ForkJoinPool这个类,它的核心就是要完成某一个目标任务,如果目标任务太大,那么就创建多个子任务.然后一直等待这些子任务完成.最终完成之前定下的目标任务.


打赏

如果觉得我的文章写的还过得去的话,有钱就捧个钱场,没钱给我捧个人场(帮我点赞或推荐一下)
微信打赏支付宝打赏

2019年最新Java线程池之ThreadPool与ForkJoinPool解析

2019年最新Java线程池之ThreadPool与ForkJoinPool解析

Java线程池

前言


  Java线程池相信很多种解析,但是对于ThreadPool与ForkJoinPool的特性区别很少,下面本文就这两个线程成做了一个详细的讲解,下面一起来看一看吧。


正文

 

一、ThreadPool Executor

  一个线程池包括以下四个基本组成部分:
  1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
  工作方式:
  线程池有一个工作队列,队列中包含了要分配给各线程的工作。当线程空闲时,就会从队列中认领工作。由于线程资源的创建与销毁开销很大,所以ThreadPool允许线程的重用,减少创建与销毁的次数,提高效率。

  流程图细节:
流程图
二、ForkJoinPool Executor
  ForkJoinPool组成类:
  1,ForkJoinPool:充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。
  2,ForkJoinWorkerThread:fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
  3,ForkJoinPool.WorkQueue: 双端队列就是它,它负责存储接收的任务。
  4,ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
  工作方式:
  使用一种分治算法,递归地将任务分割成更小的子任务,其中阈值可配置,然后把子任务分配给不同的线程执行并发执行,最后再把结果组合起来。该用法常见于数组与集合的运算。
  由于提交的任务不一定能够递归地分割成ForkJoinTask,且ForkJoinTask执行时间不等长,所以ForkJoinPool使用一种工作窃取的算法,允许空闲的线程“窃取”分配给另一个线程的工作。由于工作无法平均分配并执行。所以工作窃取算法能更高效地利用硬件资源。
  流程图细节:
流程图

三、应用场景

  ThreadPool:多见于线程并发,阻塞时延比较长的,这种线程池比较常用,一般设置的线程个数根据业务性能要求会比较多。
  ForkJoinPool:特点是少量线程完成大量任务,一般用于非阻塞的,能快速处理的业务,或阻塞时延比较少的。

结尾

  通过本文的详细讲解各位小伙伴们对于线程池ThreadPool与ForkJoinPool是不是有了一个初步的了解了?小伙伴们以后要多多练习达到学以致用的级别,才能说是掌握这两个线程池。

CompletableFuture / ForkJoinPool设置类加载器

CompletableFuture / ForkJoinPool设置类加载器

我解决了一个非常具体的问题,它的解决方案似乎是基本的:

我(Spring)应用程序的类加载器层次结构是这样的: SystemClassLoader -> PlatformClassLoader ->AppClassLoader

如果我使用Java CompleteableFuture运行线程。该ContextClassLoader线程的是:
SystemClassLoader -> PlatformClassLoader -> ThreadClassLoader

因此,AppClassLoader尽管必须访问,但无法访问任何类,因为所有外部库类都驻留在该类中。

源代码库很大,因此我不想/不能将所有与线程相关的部分重写为其他内容(例如,将自定义执行程序传递给每个调用)。

所以我的问题是:
如何使通过创建的线程(例如,CompleteableFuture.supplyAsync()使用)AppClassLoader作为父级?(而不是PlatformClassloader

我发现ForkJoinPool用于创建线程。但是在我看来,所有东西都是
静态的最终的
。因此,我怀疑在这种情况下,即使使用系统属性设置自定义的ForkJoinWorkerThreadFactory也会有所帮助。还是会?

编辑以回答评论中的问题:

  • 您部署到哪里? 是否在码头/ tomcat /任何JEE容器中运行?

    • 我正在使用默认的Spring Boot设置,因此使用了一个内部tomcat容器。
    • 您遇到的确切问题是什么?

    • 确切的问题是: java.lang.IllegalArgumentException:org.keycloak.admin.client.resource.Realms从类加载器看不到方法引用的资源

    • 您提交给supplyAsync()的作业是从AppClassLoader创建的,不是吗?

    • supplyAsync从被称为MainThread它使用AppClassLoader。但是,调试应用程序表明,所有此类线程都具有PlatformClassLoader其父级。据我了解,这是因为ForkJoinPool.commonPool()是在应用程序启动期间构造的(因为它是静态的),因此使用默认的类加载器作为父类PlatformClassLoader。因此,该池中的所有线程均PlatformClassLoader作为ContextClassLoader的父级(而不是AppClassLoader)。

    • 当我在中创建自己的执行程序MainThread并将该执行程序传递给supplyAsync所有可用的程序时,我可以在调试过程中看到确实现在AppClassLoader是我的执行程序的父级ThreadClassLoader。在第一种情况下,这似乎肯定了我的假设,即MainThread至少在使用公共池时不会创建公共池AppClassLoader

完整的堆栈跟踪:

java.lang.IllegalArgumentException: org.keycloak.admin.client.resource.RealmsResource referenced from a method is not visible from class loader    at java.base/java.lang.reflect.Proxy$ProxyBuilder.ensureVisible(Proxy.java:851) ~[na:na]    at java.base/java.lang.reflect.Proxy$ProxyBuilder.validateProxyInterfaces(Proxy.java:682) ~[na:na]    at java.base/java.lang.reflect.Proxy$ProxyBuilder.<init>(Proxy.java:628) ~[na:na]    at java.base/java.lang.reflect.Proxy.lambda$getProxyConstructor$1(Proxy.java:426) ~[na:na]    at java.base/jdk.internal.loader.AbstractClassLoaderValue$Memoizer.get(AbstractClassLoaderValue.java:327) ~[na:na]    at java.base/jdk.internal.loader.AbstractClassLoaderValue.computeIfAbsent(AbstractClassLoaderValue.java:203) ~[na:na]    at java.base/java.lang.reflect.Proxy.getProxyConstructor(Proxy.java:424) ~[na:na]    at java.base/java.lang.reflect.Proxy.newProxyInstance(Proxy.java:999) ~[na:na]    at org.jboss.resteasy.client.jaxrs.ProxyBuilder.proxy(ProxyBuilder.java:79) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]    at org.jboss.resteasy.client.jaxrs.ProxyBuilder.build(ProxyBuilder.java:131) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]    at org.jboss.resteasy.client.jaxrs.internal.ClientWebTarget.proxy(ClientWebTarget.java:93) ~[resteasy-client-3.1.4.Final.jar!/:3.1.4.Final]    at org.keycloak.admin.client.Keycloak.realms(Keycloak.java:114) ~[keycloak-admin-client-3.4.3.Final.jar!/:3.4.3.Final]    at org.keycloak.admin.client.Keycloak.realm(Keycloak.java:118) ~[keycloak-admin-client-3.4.3.Final.jar!/:3.4.3.Final]

答案1

小编典典

因此,这是一个非常肮脏的解决方案,我不为此感到骄傲,如果您继续使用它, 可能会为您 带来麻烦:

问题是该应用程序的类加载器未用于ForkJoinPool.commonPool()。由于commonPool的设置是静态的,因此在应用程序启动期间,不容易(至少据我所知)以后进行更改。因此,我们需要依赖
Java反射API

  1. 在应用程序成功启动后创建一个钩子

    • 就我而言(Spring Boot环境),这将是ApplicationReadyEvent
    • 要收听此事件,您需要以下组件
          @Component

      class ForkJoinCommonPoolFix : ApplicationListener {
      override fun onApplicationEvent(event: ApplicationReadyEvent?) {
      }
      }

  2. 在挂钩中,您需要将ForkJoinWorkerThreadFactorycommonPool 设置为自定义实现(因此此自定义实现将使用应用程序类加载器)

    • 在科特林
          val javaClass = ForkJoinPool.commonPool()::class.java

      val field = javaClass.getDeclaredField(“factory”)
      field.isAccessible = true
      val modifiers = field::class.java.getDeclaredField(“modifiers”)
      modifiers.isAccessible = true
      modifiers.setInt(field, field.modifiers and Modifier.FINAL.inv())
      field.set(ForkJoinPool.commonPool(), CustomForkJoinWorkerThreadFactory())
      field.isAccessible = false

  3. 简单实施 CustomForkJoinWorkerThreadFactory

    • 在科特林
          //Custom class

      class CustomForkJoinWorkerThreadFactory : ForkJoinPool.ForkJoinWorkerThreadFactory {
      override fun newThread(pool: ForkJoinPool?): ForkJoinWorkerThread {
      return CustomForkJoinWorkerThread(pool)
      }
      }
      // helper class (probably only needed in kotlin)
      class CustomForkJoinWorkerThread(pool: ForkJoinPool?) : ForkJoinWorkerThread(pool)

如果您需要 有关反射的更多信息
以及为什么更改最终字段不好,请参阅此处和此处。简短摘要:由于优化,更新的最终字段可能对其他对象不可见,并且可能发生其他未知的副作用。

如前所述:这是一个非常肮脏的解决方案。
如果使用此解决方案,可能会发生有害的副作用。使用这样的反射不是一个好主意。如果您可以使用没有反思的解决方案(并在此处发布答案!)。

编辑:单个呼叫的替代

就像问题本身所指出的那样:如果您仅在少数几个地方遇到此问题(即自行解决此问题本身就没有问题),则可以使用自己的Executor。从此处复制的一个简单示例:

ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =     CompletableFuture.supplyAsync(() -> { /* ... */ }, pool);

Fork/join框架之ForkJoinPool

Fork/join框架之ForkJoinPool

####概述 jdk7新增了并发框架-fork/join框架,在这种框架下,ForkJoinTask代表一个需要执行的任务,真正执行这些任务的线程是放在一个线程池(ForkJoinPool)里面。ForkJoinPool是一个可以执行ForkJoinTask的ExcuteService,与ExcuteService不同的是它采用了work-stealing模式:所有在池中的线程尝试去执行其他线程创建的子任务,这样就很少有线程处于空闲状态,非常高效。
池中维护着ForkJoinWorkerThread对象数组,数组大小由parallelism属性决定,parallelism默认为处理器个数

int n = parallelism << 1;
if (n >= MAX_ID)
    n = MAX_ID;//MAX_ID=0x7fff
else {
    n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
workers = new ForkJoinWorkerThread[n + 1];
  • 可见线程个数不会超过0x7fff。

####添加线程 什么情况下需要添加线程呢?当新的任务到来,线程池会通知其他线程前去处理,如果这时没有处于等待的线程或者处于活动的线程非常少(这是通过ctl属性来判断的),就会往线程池中添加线程。

private void addWorker() {  
    Throwable ex = null;  
    ForkJoinWorkerThread t = null;  
    try {  
        t = factory.newThread(this);  
    } catch (Throwable e) {  
        ex = e;  
    }  
    if (t == null) {  // null or exceptional factory return  
        long c;       // adjust counts  
        do {} while (!UNSAFE.compareAndSwapLong  
                     (this, ctlOffset, c = ctl,  
                      (((c - AC_UNIT) & AC_MASK) |  
                       ((c - TC_UNIT) & TC_MASK) |  
                       (c & ~(AC_MASK|TC_MASK)))));  
        // Propagate exception if originating from an external caller  
        if (!tryTerminate(false) && ex != null &&  
            !(Thread.currentThread() instanceof ForkJoinWorkerThread))  
            UNSAFE.throwException(ex);  
    }  
    else  
        t.start();  
}

添加线程的代码比较简单,通过工厂类创建一个线程,通过调用ForkJoinWorkerThread的run方法启动这个线程。如果失败则恢复ctl以前的值,并终止线程。工厂类直接调用其构造方法,最终添加线程其实是在registerWorker方法完成的。

for (int g;;) {  
    ForkJoinWorkerThread[] ws;  
            if (((g = scanGuard) & SG_UNIT) == 0 &&  
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,  
                                         g, g | SG_UNIT)) {  
                int k = nextWorkerIndex;  
                try {  
                    if ((ws = workers) != null) { // ignore on shutdown  
                        int n = ws.length;  
                        if (k < 0 || k >= n || ws[k] != null) {  
                            for (k = 0; k < n && ws[k] != null; ++k)  
                                ;  
                            if (k == n)  
                                ws = workers = Arrays.copyOf(ws, n << 1);  
                        }  
                        ws[k] = w;  
                        nextWorkerIndex = k + 1;  
                        int m = g & SMASK;  
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);  
                    }  
                } finally {  
                    scanGuard = g;  
                }  
                return k;  
            }  
            else if ((ws = workers) != null) { // help release others  
                for (ForkJoinWorkerThread u : ws) {  
                    if (u != null && u.queueBase != u.queueTop) {  
                        if (tryReleaseWaiter())  
                            break;  
                    }  
                }  
            }  
}

这里有个属性scanGuard有必要提一下,从Guard这个词可以知道它是在保护什么,是在保护works这个数组。当需要更新这个数组时,通过不断地检查scanGuard来达到保护的目的。
整个框架大量采用顺序锁,好处是不用阻塞,不好的地方是会有额外的循环。这里也是通过循环来注册这个线程,在循环的过程中有两种情况发生:1、compareAndSwapInt操作成功;2、操作失败。
第一种情况:扫描workers数组,找到一个为空的项,并把新创建的线程放在这个位置;如果没有找到,表示数组大小不够,则将数组扩大2倍;
第二种情况:需要循环重新尝试直接成功为止,从代码中可以看出,即使是失败了,也不忘做一些额外的事:通知其他线程去执行没有完成的任务。

####执行任务 除了从ExecutorService继承过来的execute和submit方法外,还对这两个方法进行了覆盖和重载。

public void execute(ForkJoinTask<?> task) {  
    if (task == null)  
        throw new NullPointerException();  
    forkOrSubmit(task);  
}

public void execute(Runnable task) {  
    if (task == null)  
        throw new NullPointerException();  
    ForkJoinTask<?> job;  
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap  
        job = (ForkJoinTask<?>) task;  
    else  
        job = ForkJoinTask.adapt(task, null);  
    forkOrSubmit(job);  
}

对参数为Runnable的execute进行了加强,将Runnable这种普通任务适配成ForkJoinTask这种任务,然后做为参数传给forkOrSubmit方法统一处理。

private <T> void forkOrSubmit(ForkJoinTask<T> task) {  
    ForkJoinWorkerThread w;  
    Thread t = Thread.currentThread();  
    if (shutdown)  
        throw new RejectedExecutionException();  
    if ((t instanceof ForkJoinWorkerThread) &&  
        (w = (ForkJoinWorkerThread)t).pool == this)  
        w.pushTask(task);  
    else  
        addSubmission(task);  
}

从以上代码可以看出,这两种任务最终的归属还是不一样,ForkJoinTask这种任务被放到线程内部的队列里面,普通的Runnable任务被放到线程池的队列里面了。
除了通过调用execute方法外,对于ForkJoinTask任务通过调用fork方法也可以向自己所在的线程队列中添加一个任务。

####终止线程池 和ExecutorService一样,可以调用shutdown()shutdownNow()来终止线程,会先设置每个线程的任务状态为CANCELLED,然后调用Thread的interrupt方法来终止每个线程。
总结:ForkJoinPool就是一个ExcuteService,与ExcuteService不同的是:
1、ExcuteService执行Runnable/Callable任务,ForkJoinPool除了可以执行Runnable任务外,还可以执行ForkJoinTask任务;
2、ExcuteService中处于后面的任务需要等待前面任务执行后才有机会执行,而ForkJoinPool会采用work-stealing模式帮助其他线程执行任务,即ExcuteService解决的是并发问题,而ForkJoinPool解决的是并行问题。

关于scala.concurrent.forkjoin.ForkJoinPool与java.util.concurrent.ForkJoinPool的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于(十九)java多线程之ForkJoinPool、2019年最新Java线程池之ThreadPool与ForkJoinPool解析、CompletableFuture / ForkJoinPool设置类加载器、Fork/join框架之ForkJoinPool的相关信息,请在本站寻找。

本文标签: