Executor看不懂?教你如何盘它

Executor看不懂?教你如何盘它

Scroll Down

前言

肥壕最近在复习线程这一块知识, ExecutorExecutorServiceThreadPoolExecutor 这三兄弟总感觉很难辨认,每次看完后没过多久又会忘,所以今天特地来盘一下 Executor 框架。

正文

Executors 是在 JDK1.5 引入的,位于 java.util.concurrent包下,其主要目的是简化线程调用,管理线程的生命周期(启动、执行、关闭)。

在 JDK1.5 之前我们使用线程的姿势是:

new Thread(new RunnableTask()).start()

JDK1.5之后呢,我们可以使用 Executor 直接执行的 Runnable 实现类:

// 1.创建具体的Executor对象
Executor ex = new MyExecutor  
// 2.调用execute方法执行任务  
ex.execute(new RunnableTaks())

这两种方式对比,很显然第二种更为优雅。我们的关注点是把任务交给执行器,至于任务的怎么执行我们是不需关心的,这也实现了任务调用者之间的解耦。

我们先看一下 Executors 中各个类之间的依赖图:

  • Executor:定义方法 execute(Runnable command),该方法接收一个 Runable 实例

  • ExecutorService: 继承 Executor 接口,并提供了生命周期管理的方法,以及可以跟踪异步任务执行状况返回 Future 的方法

  • AbstractExecutorService:抽象类,实现 ExecutorService 接口

  • ThreadPoolExecutor:是 ExecutorService 的一个实现类,继承 AbstractExecutorService 。这是Java线程池最核心的一个类。主要功能是创建线程池,给任务分配线程资源,执行任务

  • ScheduledExecutorService:继承 ExecutorService 接口,定义了延迟执行和周期执行的方法

  • ScheduledThreadPoolExecutor:ScheduledExecutorService 的实现类,实现了延迟执行和周期执行的方法

  • Executors:静态工厂类,该类定义了一系列静态工厂方法,通过这些工厂方法可以创建不同类型的线程池

下面我们具体看一下每个类的属性和方法

Executor

void execute(Runnable command);

该接口主要的目的就是解耦任务处理机制中的任务提交任务如何运行`(也包含线程的使用,调度)

ExecutorService

关闭线程池

void shutdown();
List<Runnable> shutdownNow();

提交线程任务

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

同步执行

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

AbstractExecutorService

我们重点关注一下这几个方法:

public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  execute(ftask);
  return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task, result);
  execute(ftask);
  return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

可以看到 submit 方法能接收 RunnableCallable 实例的参数。

submit 方法里面将 Runnable 和 Callable 再封装 RunnableFuture 对象,而 RunnableFuture 对实现类是 FutureTask

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
	return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

有关 Future 下面会再详细说明,这里呢就暂时只需要了解一下就好啦。

ThreadPoolExecutor

这是线程池核心类,要讲解的东西就比较多啦,感觉要开一篇有关线程池的文章重点讲解吧。我们这里只需要能应付面试就足够了。

看一下几个重要的属性:

// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 超过核心线程数时闲置线程的存活时间
private volatile long keepAliveTime;
// 任务执行前保存任务的队列
private final BlockingQueue<Runnable> workQueue;
// 拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

核心方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

这个方法是无返回值的,那对于想跟踪任务的要怎么使用呢?

其实这里使用了模板方法, 上面说过 AbstractExecutorService 中提供了三个返回 Future 对象的 submit 方法,方法里面任务最终的执行是调用了execute()。所以要跟踪任务的话,直接调用 submit 方法即可。

这里简单区别一下 shutdown()shutdownNow() 这两个方法

  • shutdown()

    调用 shutdown() 方法后,线程池停止接收新的任务,但是已经 submit 的任务会等待执行完成。如果我们再向线程池中提交任务,将会抛 RejectedExecutionException 异常。如果线程池的 shutdown() 方法已经调用过,重复调用没有额外效应。注意,当我们调用 shutdown() 方法后,会立即从该方法中返回而不会阻塞等待线程池关闭再返回,如果希望阻塞等待可以调用 awaitTermination() 方法

  • shutdownNow()

    shutdownNow() 方法和 shutdown() 方法基本一样,不同的是

    1. 等待队列中的任务不会执行,并直接返回这些等待执行的任务
    2. 尝试停止现在执行的任务,调用worker线程的interrupt方法去终止运行的任务。

Executors

工厂类,提供了不同类型的静态创建线程池的方法。

下面我们就看看 Executors 中的方法:

  • SingleThreadExecutor线程池

    只有一个核心线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,
    那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    • corePoolSize:1,只有一个核心线程在工作
    • maximumPoolSize:1
    • keepAliveTime:0L
    • workQueue:new LinkedBlockingQueue(),其缓冲队列是无界的
  • FixedThreadPool线程池

    固定大小的线程池,只有核心线程。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。

    线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    FixedThreadPool 多数针对一些很稳定很固定的正规并发线程

    • corePoolSize:nThreads

    • maximumPoolSize:nThreads

    • keepAliveTime:0L

    • workQueue:new LinkedBlockingQueue(),其缓冲队列是无界的

  • CachedThreadPool线程池

    无界线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)线程,

    当任务数增加时,此线程池又可以智能的添加新线程来处理任务。

    线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。SynchronousQueue 是一个是缓冲区为 1 的阻塞队列。

    缓存型池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的 daemon 型 SERVER 中用得不多。

    但对于生存期短的异步任务,它是 Executor 的首选

    • corePoolSize:0

    • maximumPoolSize:Integer.MAX_VALUE

    • keepAliveTime:60L

    • workQueue:new SynchronousQueue(),一个是缓冲区为 1 的阻塞队列

  • ScheduledThreadPool线程池

    核心线程池固定大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在 DEFAULT_KEEPALIVEMILLIS 时间内回收。

    • corePoolSize:corePoolSize

    • maximumPoolSize:Integer.MAX_VALUE

    • keepAliveTime:DEFAULT_KEEPALIVE_MILLIS

    • workQueue:new DelayedWorkQueue()

这个类肥壕在实际项目中没用过,阿里的规约中也是不提倡大家这样使用的。原因嘛我觉得可能是:

  1. 通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
  2. Executors 提供的方法可能会存在各种异常问题。比如 newFixedThreadPool 和 newSingleThreadExecutor 使用的是无界队列,堆积的等待任务可能会导致 OOM;newCachedThreadPool 和 newScheduledThreadPool 线程数最大数是 Integer.MAX_VALUE,也可能会导致 OOM。

总结

  1. Executor 框架的几个核心类有 ExecutorExecutorServiceAbstractExecutorServiceThreadPoolExecutor(关于这几个类的依赖可以看上面的依赖图)ExecutorExecutorService接口定义了 execute()、submit() 方法。AbstractExecutorService抽象类使用模板方法,提供了返回 Future 的 submit() 方法;ThreadPoolExecutor是核心线程类,实现 execute() 任务提交的具体逻辑。
  2. Executors 是静态工厂类,可以创建不同类型的线程池。但是不建议在实际项目中使用,因为如果使用不当有可能会造成 OOM

这篇水文没有涉及到太多线程池相关的知识和具体源码,只是非常简单的梳理了一下 Executor 下这几个常见类的关系和实际的作用。有关线程池的具体的工作流程、还有 Future 跟踪异步之类的知识点,肥壕会在后续复习的时候一块梳理总结。噢,还有一个知识点:线程池任务异常的捕获,之前在这里踩过一个坑,后面也会一并分享出来。

完~