GVKun编程网logo

CompletableFuture的完成处理程序在哪个线程中执行?(completablefuture 线程池)

11

对于想了解CompletableFuture的完成处理程序在哪个线程中执行?的读者,本文将是一篇不可错过的文章,我们将详细介绍completablefuture线程池,并且为您提供关于005-多线程-

对于想了解CompletableFuture的完成处理程序在哪个线程中执行?的读者,本文将是一篇不可错过的文章,我们将详细介绍completablefuture 线程池,并且为您提供关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()、CompletableFuture CompletableFuture.supplyAsync 异常处理的有价值信息。

本文目录一览:

CompletableFuture的完成处理程序在哪个线程中执行?(completablefuture 线程池)

CompletableFuture的完成处理程序在哪个线程中执行?(completablefuture 线程池)

我对CompletableFuture方法有疑问:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

事情是JavaDoc这么说的:

返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的结果作为所提供函数的参数来执行该阶段。有关涵盖异常完成的规则​​,请参见CompletionStage文档。

那线程呢?这将在哪个线程中执行?如果将来由线程池完成怎么办?

答案1

小编典典

CompletableFuture文档中指定的策略可以帮助您更好地理解:

  • 对于非异步方法依赖完井供给动作可以由执行 该完成当前CompletableFuture线程或者通过完成方法的任何其他呼叫者

  • 所有没有显式Executor参数的异步方法都使用来执行ForkJoinPool.commonPool()(除非它不支持
    并行度至少为2,在这种情况下,将创建一个新的Thread来运行每个任务
    )。为了简化监视,调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask

更新
:我也建议阅读@Mike的答案,作为对文档详细信息的有趣分析。

正如@nullpointer指出的那样,文档会告诉您您需要了解的内容。但是,相关文本令人惊讶地模糊不清,此处发布的某些评论(和答案)似乎依赖于文档不支持的假设。因此,我认为有必要将其分开。具体来说,我们应该非常仔细地阅读本段内容:

为非异步方法的相关完成提供的动作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。

听起来很简单,但是细节上却很少。似乎故意避免描述何时可以在完成线程上调用依赖完成,而不是在调用诸如的完成方法期间thenApply。如前所述,以上段落实际上是在恳求我们用假设来填补空白。这很危险,特别是当主题涉及并发和异步编程时,当我们随着程序员的发展而产生的许多期望被抛诸脑后。让我们仔细看一下文档中没有说的内容。

该文档没有声称在调用之前注册的相关完成complete()将在完成线程上运行。而且,尽管它声明在调用诸如的完成方法时可能会调用从属补全thenApply,但并未声明将在注册它的线程上调用补全(注意单词“ any other”)。

对于任何CompletableFuture用于计划和撰写任务的人来说,这些都是潜在的重要点。考虑以下事件序列:

  1. 线程A通过注册一个相关的完成f.thenApply(c1)
  2. 一段时间后,线程B调用f.complete()
  3. 大约在同一时间,线程C通过注册另一个依赖完成f.thenApply(c2)

从概念上讲,complete()它做两件事:发布将来的结果,然后尝试调用相关的完成。现在,如果线程C 在发布结果值之后但在线程B开始调用之前运行,该c1怎么办?根据实现的不同,线程C可能会看到f已经完成,然后可以调用c1 和 c2。或者,线程C可以调用,c2而线程B可以调用c1。该文档不排除任何可能性。考虑到这一点,以下是文档不支持的假设:

  1. 在调用之前将调用在完成之前c注册的从属完成。f f.complete()
  2. c到时间f.complete()回来,那将完成;
  3. 依赖完成将以任何特定顺序(例如注册顺序)被调用;
  4. 完成之前 注册的相关完成f将在完成之后 注册的f完成之前调用。

考虑另一个示例:

  1. 线程A调用f.complete();
  2. 一段时间后,线程B通过f.thenApply(c1); 注册完成。
  3. 大约在同一时间,线程C通过分别注册一个完成f.thenApply(c2)

如果知道f已经完成,可以尝试假设c1会在期间调用f.thenApply(c1)和c2会在期间调用f.thenApply(c2)。人们可能会进一步假设,c1到时间f.thenApply(c1)返回时,它已经完成了。但是,文档不支持这些假设。它可能是一个线程,呼吁thenApply各上调调用既 c1和c2,而其他线程调用都不是。

仔细分析JDK代码可以确定上面的假设方案如何发挥作用。但这甚至是有风险的,因为您最终可能会依赖于(1)不可移植或(2)可能会更改的实现细节。最好的选择是不要假设javadocs或原始JSR规范中未阐明的任何内容。

tldr:谨慎假设,编写文档时,请尽可能清晰明了。简洁是一件奇妙的事情,但请注意人类填补空白的趋势。

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

一、概述

  创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

  详述:https://www.cnblogs.com/bjlhx/p/7588971.html

1.1、Runnable 接口

它是一个接口,里面只声明了一个 run () 方法:

public interface Runnable {
    public abstract void run();
}

由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。

1.2、Callable 接口

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。

public interface Callable<V> {   
    V call() throws Exception;
}

是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。

1.3、Future 接口

  Future 的核心思想是:

    一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。

  Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:

    get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

    cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。

    isDone 方法:判断是否计算完

    isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  也就是说 Future 提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

  Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

使用 Callable+Future 获取执行结果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.4、FutureTask 类

FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。

如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:

把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。

1.4.1、FutureTask 结构

  

1.4.2、FutureTask 使用

方式一、使用 thread 方式

  FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动

public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("计算线程正在计算结果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main线程干点别的...");

        Integer result = task.get();
        System.out.println("从计算线程拿到的结果为:" + result);
    }

方式二、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式

public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);
        //同时开启了两个任务
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 关闭线程池和服务
                    return;
                }

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用 Callable+FutureTask 获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.5、CompletionService

原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用内部阻塞队列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("总耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}
View Code

建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。

二、CompletableFuture

2.1、对标 Futrue

  Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

  阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。

    Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。

2.2、类图

  

2.2.1、CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.2.2、Future

2.3、创建 CompletableFuture 对象

  CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.

  以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。

  runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。

  supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。

  注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。

示例:简单同步用法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //长时间的计算任务
            try {
                System.out.println("计算型任务开始");
                Thread.sleep(2000);
                return "计算型任务结束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.4、计算结果完成时的处理

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

  方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.5、转换

CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.6、异常处理 completeExceptionally

  为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

  这样,当执行任务发生异常时,调用 get() 方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的 Exception 参数。

/**
     * 任务没有异常 正常执行,然后结束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

    /**
     * 线程有异常  正常执行,然后无法结束,主线程会一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
View Code
/**
     * 线程有异常  正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
     * 主线程接收到 程序继续处理,至结束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

2.7、多任务组合方法 allOf 和 anyOf

allOf 是等待所有任务完成,构造后 CompletableFuture 完成

anyOf 是只要有一个任务完成,构造后 CompletableFuture 就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
                    + "任务i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.8、常用多线程并发,取结果归集的几种实现方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future 接口 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable 内部通过阻塞队列 + FutureTask 接口 JDK8 实现了 Future, CompletionStage 两个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 原生 API 支持,返回每个任务的异常
建议 CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 API 极端丰富,配合流式编程,推荐使用!

上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

 

   

CompletableFuture allof(..)join()与CompletableFuture.join()

CompletableFuture allof(..)join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

CompletableFuture allof(..)。join()与CompletableFuture.join()

CompletableFuture allof(..)。join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture CompletableFuture.supplyAsync 异常处理

CompletableFuture 异常处理completeExceptionally可以把异常抛到主线程
/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    @Test
    public void testMethod() {

        String[] orders = {"1", "2", "3", "4", "5", "6"};

        List<CompletableFuture<Boolean>> futures = new ArrayList<>();

        Arrays.stream(orders).forEach(id -> {
            try{
                futures.add(submitAsync(id));
            }catch (Exception ex){
                System.out.println(ex);
            }
        });

        futures.stream().forEach(f-> {
            try {
                System.out.println(f.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

    private static CompletableFuture<Boolean> submitAsync(String order) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Boolean result = submit(order);
                future.complete(result);
            } catch (Exception ex) {
                future.completeExceptionally(ex);
            }
        }).start();
        return future;
    }

}

 

使用 CompletableFuture.supplyAsync  简化代码 加入线程池,exceptionally处理异常

/**
 * User: laizhenwei
 * Date: 2018-01-30 Time: 22:26
 * Description:
 */
@RunWith(SpringRunner.class)
//@SpringBootTest
public class CompletableFutureTests {

    ExecutorService executor = Executors.newFixedThreadPool(3);

    @Test
    public void testMethod() {
        String[] orders = {"1", "2", "3", "4", "5", "6"};
        Arrays.stream(orders).forEach(id -> CompletableFuture.supplyAsync(() -> submit(id), executor).exceptionally(e -> {
            System.out.println(e);
            return false;
        }));

        executor.shutdown();
        while (!executor.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static Boolean submit(String order) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        throw new RuntimeException("抛一个异常" + order);
    }

}

 

今天关于CompletableFuture的完成处理程序在哪个线程中执行?completablefuture 线程池的介绍到此结束,谢谢您的阅读,有关005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()、CompletableFuture CompletableFuture.supplyAsync 异常处理等更多相关知识的信息可以在本站进行查询。

本文标签: