Executor框架主要由3大部分组成

作者:美狮美高梅官方网站

策略

Java并发编程系列之十五:Executor框架

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供。Executor框架为线程的启动、执行和关闭提供了便利,底层使用线程池实现。使用Executor框架管理线程的好处在于简化管理、提高效率,还能避免this逃逸问题——是指不完整的对象被线程调用。

Executor框架使用了两级调度模型进行线程的调度。在上层,Java多线程程序通常把应用分解为多个任务,然后使用用户调度框架Executor将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor框架包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,C
allable 等。

主线程首先通过Runnable或者Callable接口创建任务对象。工具类Executors可以把一个Runnable对象封装为Callable对象(通过调用Executors.callable(Runnable task)实现),然后可以把Runnable对象直接交给ExecutorService执行,ExecutorService通过调用ExecutorService.execute(Runnable command)完成任务的执行;或者把Runnable对象或Callable对象交给ExecutorService执行,ExecutorService通过调用ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable task)完成任务的提交。在使用ExecutorService的submit方法的时候会返回一个实现Future接口的对象(目前返回的是FutureTask对象)。由于FutureTask实现了Runnable,也可以直接创建FutureTask,然后交给ExecutorService执行。

ExecutorService 接口继承自 Executor 接口,它提供了更丰富的实现多线程的方法。比如可以调用 ExecutorService 的 shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。

通过Executors工具类可以创建不同的线程池ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory factory)

FixedThreadPool适用于为了满足管理资源的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory factory)

SingleThreadExecutor适用于需要保证顺序地执行各个任务,并且在任意时间点不会有多个线程在活动的场景。

CachedThreadPool

public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory factory)

CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载比较轻的服务器。

ScheduledThreadPoolExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory factory)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。ScheduledThreadPoolExecutor适用于需要在多个后台线程执行周期任务,同时为了满足资源管理需求需要限制后台线程数量的应用场景。

Executor框架的最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由四个组件构成。

corePool:核心线程池的大小 maximumPool:最大线程池的大小 BlockingQueue:用来暂时保存任务的工作队列 RejectedExecutionHandler:饱和策略。当ThreadPoolExecutor已经关闭或者ThreadPoolExecutor已经饱和时(是指达到了最大线程池的大小且工作队列已满),execute方法将要调用的Handler

 

使用Executor框架执行Runnable任务

package com.rhwayfun.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by rhwayfun on 16-4-4.
 */
public class ExecutorRunnableTest {

    static class Runner implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is called");
        }
    }

    public static void main(String[] args){
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++){
            cachedThreadPool.execute(new Runner());
        }
        cachedThreadPool.shutdown();
    }
}

结果如下:

图片 1

通过下面对CachedThreadPool的分析就能知道执行任务的时候首先会从线程池选择空闲的线程执行任务,如果没有没有空闲的线程就会创建一个新的线程执行任务。这里出现同一个线程执行两遍的原因在于第一次执行任务的空闲线程执行完任务后不会马上终止,认识等待60秒才会终止。

使用Executor框架执行Callable任务

Runnable 任务没有返回值而 Callable 任务有返回值。并且 Callable 的call()方法只能通过 ExecutorService 的 submit(Callable task) 方法来执行,并且返回一个 Future(目前是FutureTask),是表示任务等待完成的 Future。如果需要得到Callable执行返回的结果,可以通过吊桶FutureTask的get方法得到。

下面的代码演示使用Executor框架执行Callable任务:

package com.rhwayfun.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by rhwayfun on 16-4-4.
 */
public class ExecutorCallableTest {

    /**
     * Callable任务
     */
    static class Runner implements Callable {

        private String runId;

        public Runner(String runId) {
            this.runId = runId;
        }

        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " call method is invoked!");
            return Thread.currentThread().getName() + " call method and id is " + runId;
        }
    }

    public static void main(String[] args) {
        //线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        //接收Callable任务的返回结果
        List> futureTaskList = new ArrayList>();

        for (int i = 0; i < 5; i++) {
            Future future = cachedThreadPool.submit(new Runner(String.valueOf(i)));
            futureTaskList.add(future);
        }

        //遍历线程执行的返回结果
        for (Future f : futureTaskList) {
            try {
                //如果任务没有完成则忙等待
                while (!f.isDone()) {}
                System.out.println(f.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                //关闭线程池,不再接收新的任务
                cachedThreadPool.shutdown();
            }
        }
    }
}

程序的运行结果如下:

图片 2

submit 方法也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。如果 Future 的返回尚未完成则 get()方法会阻塞等待直到 Future 完成返回。  

平时工作中经常碰到个各种多线程,有时候搞不清它们之间到底有什么区别,这次来个总体的总结,主要是以下这些:
Executor,Executors,ExecutorService, CompletionServie,Future,Callable,Runnable,FutureTask

  1. 当线程数量未达到核心线程数量,直接启动一个核心线程来执行任务。
  2. 如果线程数量已达到或者超过核心线程的数量,任务被插入到任务队列中排队执行。
  3. 如果在步骤2中无法将任务插入到任务队列中,这往往是由于任务队列已满,这时候如果线程数量未达到线程池规定的最大值,会立刻启动一个非核心线程来执行任务。
  4. 如果步骤3中线程数量达到最大值,就拒绝执行此任务。

FixedThreadPool详解

创建FixedThreadPool的源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

其corePoolSize和maximumPoolSize都被设为nThreads的值。当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。具体在FixedThreadPool的执行过程如下:

如果当前运行的线程数少于corePoolSize,就创建新的线程执行任务 在线程池如果当前运行的线程数等于corePoolSize时,将任务加入到LinkedBlockingQueue等待执行 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

由于LinkedBlockingQueue使用的无界队列,所以线程池中线程数不会超过corePoolSize,因此不断加入线程池中的任务将被执行,因为不会马上被执行的任务都加入到LinkedBlockingQueue等待了。

一、Runnable(interface)

public interface Runnable {
    public void run();
}

run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

模式并发编程中经常使用消费者和生产者模式,通过一个容器来解决生产者和消费者的强耦合问题。大多数设计模式都会找出一个第三者进行解耦。

CachedThreadPool详解

CachedThreadPool是一个根据需要创建线程的线程池。创建一个CachedThreadPool的源码如下:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

由源码可以看出,CachedThreadPool的corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime为60L,意味着多余的空闲线程等待新任务的执行时间为60秒。

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列(SynchronousQueue是一个没有容量的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作),但是CachedThreadPool的maximumPool是无界的。这就意味着如果线程的提交速度高于线程的处理速度,CachedThreadPool会不断创建线程,极端情况是因为创建线程过多耗尽CPU和内存资源。

CachedThreadPool的执行过程如下:

首先执行SynchronousQueue的offer方法。如果maximumPool有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程的poll操作配对成功,主线程把任务交给空闲线程执行,否则执行2 如果maximumPool为空或者maximumPool没有空闲线程时,CachedThreadPool将会创建一个新线程执行任务 在步骤2新创建的线程将任务执行完后,将会在SynchronousQueue队列中等待60秒,如果60秒内主线程提交了新任务,那么将继续执行主线程提交的新任务,否则会终止该空闲线程。

二、Callable (interface)

public interface Callable<V> {
    V call() throws Exception;
}

与 Runnable 不同的是call()函数返回的类型就是传递进来的V类型,而且能够抛出异常。一般情况下是配合ExecutorService来使用的

优点

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,主要用来在给定的延迟之后执行任务,或者定期运行任务。Timer类也具有类似的功能,Timer对应的单个的后台线程,而ScheduledThreadPoolExecutor可以在构造函数内指定多个对应的后台线程。

ScheduledThreadPoolExecutor为了支持周期性任务的执行,使用了DelayQueue作为任务队列。ScheduledThreadPoolExecutor会把待调度的任务(该任务是ScheduledFutureTask)放到DelayQueue中,线程池中的线程从DelayQueue中获取要执行的定时任务并执行。

ScheduledFutureTask包含了3个变量:

long型变量time,是任务具体的执行时间 long型变量sequenceNumber,是这个任务被添加到ScheduledThreadPoolExecutor中的序号 long型成员period,表示任务执行的间隔周期

下面是ScheduledThreadPoolExecutor具体的执行步骤:

线程从DelayQueue中获取已经到期的ScheduledFutureTask。到期任务是指time大于等于当前时间的任务 线程执行这个过期任务 线程修改这个任务的time变量为下次执行的时间(当前时间加上间隔时间) 线程把修改后的任务放回DelayQueue,过期后会被重新执行

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要...

三、Future( interface)

Future是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说Future提供了三种功能:

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。
  1. 重用线程池中的线程
  2. 有效控制线程池的最大并发数
  3. 对线程进行有效的管理,并提供定时执行及制定间隔循环执行等功能。

四、FutureTask(Runnable, Future<V>的具体实现)

public class FutureTask<V> implements RunnableFuture<V> 。。。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,管理任务。
其中有两个构造方法

   //接受一个 Callable 参数
   public FutureTask(Callable<V> callable) {
       if (callable == null)
           throw new NullPointerException();
       this.callable = callable;
       this.state = NEW;       // ensure visibility of callable
   }

   //接受一个 Runnable ,利用 Executors.callable 将Runnable 转换为Callable
   public FutureTask(Runnable runnable, V result) {
       this.callable = Executors.callable(runnable, result);
       this.state = NEW;       // ensure visibility of callable
   }

具体使用可以参考 AsyncTask 中的使用

备注如果已知晓策略,没有必要记优点。因为优点可直接由策略推导出来。

五、Executor(interface)

在Executor框架中,使用执行器(Exectuor)来管理Thread对象,从而简化了并发编程。并发编程的一种编程方式把任务拆分为一系列的小任务,即Runnable,然后将这些任务提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用其内部的线程池来完成操作。

Executor 接口中之定义了一个方法 execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。

public interface Executor {
    void execute(Runnable command);
}

为了避免调用 new Thread(new RunnableTask()).start()这样的代码我们可以

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

Executor 并不是严格的要求一步执行,我们可以简单的直接在调用者线程执行运行提交的任务

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();    // 在调用者线程执行
}}

一般来说任务在非调用者的线程中执行,比如产生一个新的线程

class ThreadPerTaskExecutor implements Executor {
  public void execute(Runnable r) {
    new Thread(r).start();   //新启一个线程,在非调用者线程中执行
}}

有很多Executor 的实现是为了实现任务的某种调度,比如 AsyncTask 中的串行任务队列

class SerialExecutor implements Executor {
   final Queue tasks = new ArrayDeque<>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;


   public synchronized void execute(final Runnable r) {
     tasks.add(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }}

本文由美狮美高梅官方网站发布,转载请注明来源

关键词: