前言
肥壕最近在复习线程这一块知识, Executor
、ExecutorService
、ThreadPoolExecutor
这三兄弟总感觉很难辨认,每次看完后没过多久又会忘,所以今天特地来盘一下 Executor 框架。
正文
Executors 是在 JDK1.5 引入的,位于 java.util.concurrent
包下,其主要目的是简化线程调用,管理线程的生命周期(启动、执行、关闭)。
在 JDK1.5 之前我们使用线程的姿势是:
new Thread(new RunnableTask()).start()
JDK1.5之后呢,我们可以使用 Executor 直接执行的 Runnable 实现类:
// 1.创建具体的Executor对象
Executor ex = new MyExecutor
// 2.调用execute方法执行任务
ex.execute(new RunnableTaks())
这两种方式对比,很显然第二种更为优雅。我们的关注点是把任务
交给执行器
,至于任务的怎么执行我们是不需关心的,这也实现了任务
与调用者
之间的解耦。
我们先看一下 Executors 中各个类之间的依赖图:
-
Executor:定义方法
execute(Runnable command)
,该方法接收一个 Runable 实例 -
ExecutorService: 继承 Executor 接口,并提供了生命周期管理的方法,以及可以跟踪
异步任务
执行状况返回 Future 的方法 -
AbstractExecutorService:抽象类,实现 ExecutorService 接口
-
ThreadPoolExecutor:是 ExecutorService 的一个实现类,继承 AbstractExecutorService 。这是Java线程池最核心的一个类。主要功能是创建线程池,给任务分配线程资源,执行任务
-
ScheduledExecutorService:继承 ExecutorService 接口,定义了延迟执行和周期执行的方法
-
ScheduledThreadPoolExecutor:ScheduledExecutorService 的实现类,实现了延迟执行和周期执行的方法
-
Executors:静态工厂类,该类定义了一系列静态工厂方法,通过这些工厂方法可以创建不同类型的线程池
下面我们具体看一下每个类的属性和方法
Executor
void execute(Runnable command);
该接口主要的目的就是解耦任务处理机制中的
任务提交和
任务如何运行`(也包含线程的使用,调度)
ExecutorService
关闭线程池
void shutdown();
List<Runnable> shutdownNow();
提交线程任务
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
同步执行
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService
我们重点关注一下这几个方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到 submit 方法能接收 Runnable 和 Callable 实例的参数。
submit 方法里面将 Runnable 和 Callable 再封装 RunnableFuture 对象,而 RunnableFuture 对实现类是 FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
有关 Future 下面会再详细说明,这里呢就暂时只需要了解一下就好啦。
ThreadPoolExecutor
这是线程池核心类,要讲解的东西就比较多啦,感觉要开一篇有关线程池的文章重点讲解吧。我们这里只需要能应付面试就足够了。
看一下几个重要的属性:
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 超过核心线程数时闲置线程的存活时间
private volatile long keepAliveTime;
// 任务执行前保存任务的队列
private final BlockingQueue<Runnable> workQueue;
// 拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
核心方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这个方法是无返回值的,那对于想跟踪任务的要怎么使用呢?
其实这里使用了模板方法, 上面说过 AbstractExecutorService 中提供了三个返回 Future 对象的 submit 方法,方法里面任务最终的执行是调用了execute()。所以要跟踪任务的话,直接调用 submit 方法即可。
这里简单区别一下 shutdown()
和 shutdownNow()
这两个方法
-
shutdown()
调用 shutdown() 方法后,线程池停止接收新的任务,但是已经 submit 的任务会等待执行完成。如果我们再向线程池中提交任务,将会抛
RejectedExecutionException
异常。如果线程池的 shutdown() 方法已经调用过,重复调用没有额外效应。注意,当我们调用 shutdown() 方法后,会立即从该方法中返回而不会阻塞等待线程池关闭再返回,如果希望阻塞等待可以调用awaitTermination()
方法。 -
shutdownNow()
shutdownNow() 方法和 shutdown() 方法基本一样,不同的是
- 等待队列中的任务不会执行,并直接返回这些等待执行的任务
- 尝试停止现在执行的任务,调用worker线程的interrupt方法去终止运行的任务。
Executors
工厂类,提供了不同类型的静态创建线程池的方法。
下面我们就看看 Executors 中的方法:
-
SingleThreadExecutor
线程池只有一个核心线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,
那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。- corePoolSize:1,只有一个核心线程在工作
- maximumPoolSize:1
- keepAliveTime:0L
- workQueue:new LinkedBlockingQueue
(),其缓冲队列是无界的
-
FixedThreadPool
线程池固定大小的线程池,只有核心线程。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
FixedThreadPool 多数针对一些很稳定很固定的正规并发线程。
-
corePoolSize:nThreads
-
maximumPoolSize:nThreads
-
keepAliveTime:0L
-
workQueue:new LinkedBlockingQueue
(),其缓冲队列是无界的。
-
-
CachedThreadPool
线程池无界线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)线程,
当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。SynchronousQueue 是一个是缓冲区为 1 的阻塞队列。
缓存型池子通常用于执行一些生存期很短的异步型任务,因此在一些面向连接的 daemon 型 SERVER 中用得不多。
但对于生存期短的异步任务,它是 Executor 的首选。
-
corePoolSize:0
-
maximumPoolSize:Integer.MAX_VALUE
-
keepAliveTime:60L
-
workQueue:new SynchronousQueue
(),一个是缓冲区为 1 的阻塞队列。
-
-
ScheduledThreadPool
线程池核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在 DEFAULT_KEEPALIVEMILLIS 时间内回收。
-
corePoolSize:corePoolSize
-
maximumPoolSize:Integer.MAX_VALUE
-
keepAliveTime:DEFAULT_KEEPALIVE_MILLIS
-
workQueue:new DelayedWorkQueue()
-
这个类肥壕在实际项目中没用过,阿里的规约中也是不提倡大家这样使用的。原因嘛我觉得可能是:
- 通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
- Executors 提供的方法可能会存在各种异常问题。比如 newFixedThreadPool 和 newSingleThreadExecutor 使用的是无界队列,堆积的等待任务可能会导致 OOM;newCachedThreadPool 和 newScheduledThreadPool 线程数最大数是 Integer.MAX_VALUE,也可能会导致 OOM。
总结
- Executor 框架的几个核心类有
Executor
、ExecutorService
、AbstractExecutorService
、ThreadPoolExecutor
(关于这几个类的依赖可以看上面的依赖图)Executor
、ExecutorService
接口定义了 execute()、submit() 方法。AbstractExecutorService
抽象类使用模板方法,提供了返回 Future 的 submit() 方法;ThreadPoolExecutor
是核心线程类,实现 execute() 任务提交的具体逻辑。 - Executors 是静态工厂类,可以创建不同类型的线程池。但是不建议在实际项目中使用,因为如果使用不当有可能会造成 OOM
这篇水文没有涉及到太多线程池相关的知识和具体源码,只是非常简单的梳理了一下 Executor 下这几个常见类的关系和实际的作用。有关线程池的具体的工作流程、还有 Future 跟踪异步之类的知识点,肥壕会在后续复习的时候一块梳理总结。噢,还有一个知识点:线程池任务异常的捕获,之前在这里踩过一个坑,后面也会一并分享出来。
完~
普通的改变,将改变普通
我是宅小年,一个在互联网低调前行的小青年
关注公众号「宅小年」,个人博客 📖 edisonz.cn,阅读更多分享文章