随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。
J.U.C提供的线程池:ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。
了解并合理使用线程池,是一个开发人员必修的基本功。

带着BAT大厂的面试问题去理解

线程池连环17问

  • 为什么要有线程池?
  • Java是实现和管理线程池有哪些方式? 请简单举例如何使用。
  • 为什么很多公司不允许使用Executors去创建线程池? 那么推荐怎么使用呢?
  • ThreadPoolExecutor有哪些核心的配置参数? 请简要说明
  • ThreadPoolExecutor可以创建哪三种线程池呢?
  • 当队列满了并且worker的数量达到maxSize的时候,会怎么样?拒绝策略
    • 线程池为什么设计为核心线程数满了先入队列,而不是先创建最大线程?
    • 线程池为什么设计为【队列满+核心线程数满了】才创建新线程?而不是队列积压一定阈值的时候创建新的线程?
      线程池的本意只是让核心数量的线程工作,而任务队列起到一个缓冲的作用。
      最大线程数这个参数更像是无奈之举,在最坏的情况下做最后的努力,新建线程来帮助消化任务。
    • Tomcat重写逻辑
      原生版线程池的实现可以认为是偏向CPU密集的,也就是当任务过多的时候不是先去创建更多的线程,而是先缓存任务,让核心线程去消化。我们知道,当处理CPU密集型任务的时,线程太多反而会由于线程频繁切换的开销而得不偿失,所以优先堆积任务而不是创建新的线程。
      而像 Tomcat 这种业务场景,大部分情况下是需要大量 I/O 处理的情况就做了一些定制,修改了原生线程池的实现,使得在队列没满的时候,可以创建线程至最大线程数。
  • 说说ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默认是什么策略?
  • 简要说下线程池的任务执行机制? execute –> addWorker –>runworker (getTask)
  • 线程池中任务是如何提交的?
  • 线程池中任务是如何关闭的?
  • 在配置线程池的时候需要考虑哪些配置因素?
  • 如何监控线程池的状态,如何动态修改核心线程数和最大线程数?
    原生线程池ThreadPoolExecutor已经提供修改配置的方法,也对外暴露出线程池内部执行情况,所以只要我们实时监控情况,调用对应的set方法,即可动态修改线程池对应配置。
  • ThreadPoolExecutor中的锁
    • ReentrantLock mainLock,线程池整体层面的锁,控制线程池的核心状态变更
    • Worker.lock(),Worker类继承的AQS锁,每个Worker内部层面,用于控制每个Worker执行任务的串行性

为什么要有线程池

线程池能够对线程进行统一分配,调优和监控:

  • 降低资源消耗(线程无限制地创建,然后使用完毕后销毁)
  • 提高响应速度(无须创建线程)
  • 提高线程的可管理性

线程池是什么

线程池是一种通过“池化”思想,帮助我们管理线程而获取并发性的工具。

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建/销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。

线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建/销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

使用线程池可以带来一系列的好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 可拓展性:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

线程池解决的问题是什么

线程池解决的核心问题是资源管理问题。

在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了池化(Pooling)思想。池化,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  • 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  • 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  • 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

ThreadPoolExecutor例子

Java是如何实现和管理线程池的?
从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括RunnableCallable,而执行机制由Executor框架提供。

  • WorkerThread
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class WorkerThread implements Runnable {

    private String command;

    public WorkerThread(String s){
    this.command=s;
    }

    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
    processCommand();
    System.out.println(Thread.currentThread().getName()+" End.");
    }

    private void processCommand() {
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    @Override
    public String toString(){
    return this.command;
    }
    }
  • SimpleThreadPool newFixedThreadPool使用示例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SimpleThreadPool {

    public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
    Runnable worker = new WorkerThread("" + i);
    executor.execute(worker);
    }
    executor.shutdown(); // This will make the executor accept no new threads and finish all existing threads in the queue
    while (!executor.isTerminated()) { // Wait until all threads are finish,and also you can use "executor.awaitTermination();" to wait
    }
    System.out.println("Finished all threads");
    }

    }
  • RejectedExecutionHandlerImpl
    自定义的 RejectedExecutionHandler 接口的实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    System.out.println(r.toString() + " is rejected");
    }

    }
  • MyMonitorThread
    用一个监控线程在特定的时间间隔内打印 executor 信息。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    import java.util.concurrent.ThreadPoolExecutor;

    public class MyMonitorThread implements Runnable
    {
    private ThreadPoolExecutor executor;

    private int seconds;

    private boolean run=true;

    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
    this.executor = executor;
    this.seconds=delay;
    }

    public void shutdown(){
    this.run=false;
    }

    @Override
    public void run()
    {
    while(run){
    System.out.println(
    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
    this.executor.getPoolSize(),
    this.executor.getCorePoolSize(),
    this.executor.getActiveCount(),
    this.executor.getCompletedTaskCount(),
    this.executor.getTaskCount(),
    this.executor.isShutdown(),
    this.executor.isTerminated()));
    try {
    Thread.sleep(seconds*1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    }
    }
  • WorkerPool
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class WorkerPool {

    public static void main(String args[]) throws InterruptedException{
    //RejectedExecutionHandler implementation
    RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
    //Get the ThreadFactory implementation to use
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    //creating the ThreadPoolExecutor
    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
    //start the monitoring thread
    MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
    Thread monitorThread = new Thread(monitor);
    monitorThread.start();
    //submit work to the thread pool
    for(int i=0; i<10; i++){
    executorPool.execute(new WorkerThread("cmd"+i));
    }

    Thread.sleep(30000);
    //shut down the pool
    executorPool.shutdown();
    //shut down the monitor thread
    Thread.sleep(5000);
    monitor.shutdown();

    }
    }

ThreadPoolExecutor使用详解

其实java线程池的实现原理很简单,说白了就是一个线程集合HashSet<Worker> workers和一个阻塞队列BlockingQueue<Runnable> workQueue

execute过程

当一个任务提交至线程池之后:

  1. 线程池首先判断当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务;如果都在执行任务,则进入2
  2. 判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入3
  3. 如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。

当ThreadPoolExecutor创建新线程时,通过CAS来更新线程池的状态ctl。

线程池核心参数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
  • corePoolSize - 线程池中的核心线程数
    需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐使线程数目达到corePoolSize。
    若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。
  • maximumPoolSize - 池中允许的最大线程数。
    需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。
    当阻塞队列是无界队列,则maximumPoolSize则不起作用,因为无法提交至核心线程池的线程会一直持续地放入workQueue。
  • keepAliveTime - 当线程数大于核心时,多余的空闲线程最多存活时间
  • unit - keepAliveTime参数的时间单位。
  • workQueue - 当线程数目超过核心线程数时用于保存任务的队列。
    此队列仅保存实现Runnable接口的任务。
    • ArrayBlockingQueue 基于数组结构的有界阻塞队列,按FIFO排序任务;
    • LinkedBlockingQueue 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue;
    • SynchronousQueue 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue;
    • PriorityBlockingQueue 具有优先级的无界阻塞队列;
  • threadFactory - 执行程序创建新线程时使用的工厂。
    通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory
  • handler - 阻塞队列已满且线程数达到最大值时所采取的拒绝策略。
    • AbortPolicy: 直接抛出异常,默认策略;
    • CallerRunsPolicy: 用调用者所在的线程来执行任务;
    • DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy: 直接丢弃任务;
    • 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

fixed、single、cached

  • Executors.newFixedThreadPool

    1
    2
    3
    4
    5
    6
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }

    线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程。FixedThreadPool的工作队列为无界队列LinkedBlockingQueue(队列容量为Integer.MAX_VALUE), 这会导致以下问题:

    • 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSize和keepAliveTime将会是个无用参数
    • 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效
  • Executors.newSingleThreadExecutor

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.
    由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效.

  • Executors.newCachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
    和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源;当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;

    执行过程与前两种稍微不同:

    1. 主线程调用SynchronousQueue的offer()方法放入task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给空闲线程. 否则执行(2);
    2. 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务;
    3. 执行完任务的线程倘若在60s内仍空闲, 则会被终止. 因此长时间空闲的CachedThreadPool不会持有任何线程资源。

关闭线程池

遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程。

  • shutdown
    将线程池里的线程状态设置成SHUTDOWN状态, 然后中断所有没有正在执行任务的线程。
  • shutdownNow
    将线程池里的线程状态设置成STOP状态, 然后停止所有正在执行或暂停任务的线程。

只要调用这两个关闭方法中的任意一个,isShutDown() 返回true。
当所有任务都成功关闭了,isTerminated()返回true。

线程池使用规范

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
  2. CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

Spring配置线程池 - ThreadPoolTaskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />

<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>

//in code
userThreadPool.execute(thread);

配置线程池需要考虑因素

任务的优先级、任务的执行时间长短、任务的性质(CPU密集/ IO密集)、任务的依赖关系这四个角度来分析。并且近可能地使用有界的工作队列。

性质不同的任务可用使用不同规模的线程池分开处理:

  • CPU密集型: 尽可能少的线程,Ncpu+1
  • IO密集型: 尽可能多的线程, Ncpu*2,比如数据库连接池;
  • 混合型: CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。

ThreadPoolExecutor线程池核心设计与实现

在Java中的体现是ThreadPoolExecutor类。

2.1 总体设计

Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。
ThreadPoolExecutor UML类图

Executor接口
ThreadPoolExecutor实现的顶层接口是Executor
顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦
用户无需关注如何创建线程、如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交execute到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口
ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的submit方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService抽象类
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

ThreadPoolExecutor
最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor是如何运行,如何同时维护线程执行任务的呢?
其运行机制如下图所示:
ThreadPoolExecutor运行流程

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务、复用线程。
线程池的运行主要分成两部分:任务管理、线程管理。

  1. 任务管理部分
    充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。
  2. 线程管理部分
    是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

接下来,我们会按照以下三个部分去详细讲解线程池运行机制:

  1. 线程池如何维护自身状态。
  2. 线程池如何管理任务。
  3. 线程池如何管理线程。

几个关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;

2.2 线程池生命周期管理

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。
线程池内部使用一个变量维护两个值:运行状态(runState)线程数量(workerCount)
在具体实现中,线程池将运行状态(runState)和线程数量(workerCount)两个关键参数的维护放在了一起,如下代码所示:

1
2
3
4
5
6
7
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态线程池中有效线程的数量进行控制的一个字段。
它同时包含两部分的信息:

  • 线程池的运行状态(runState)
  • 线程池内有效线程的数量(workerCount)
  • 高3位保存runState低29位保存workerCount,两个变量之间互不干扰。

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。

其实并发包中有很多实现都是一个字段存多个值的,比如读写锁的高 16 位存放读锁,低 16 位存放写锁,这种一个字段存放多个值可以更容易的维护多个值之间的一致性,也算是极简主义。

通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。
线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

1
2
3
4
5
6
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为:
ThreadPoolExecutor运行状态

其生命周期状态转换如下图所示:
线程池生命周期

2.3 任务执行机制

execute –> addWorker –> runworker(getTask)

2.3.1 任务调度/分配

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。
了解这部分就相当于了解了线程池的核心运行机制。

所有任务的调度都是由execute(Runnable command)方法完成的。
这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

execute(Runnable command)方法执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize && 线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize && 线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize && 线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是AbortPolicy直接抛异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

任务调度流程

2.3.2 任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解藕,不让两者直接关联,才可以做后续的分配工作。
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的,阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:1、在队列为空时,获取元素的线程会等待队列变为非空;2、当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
阻塞队列

使用不同的队列可以实现不一样的任务存取策略。
阻塞队列类型

2.3.3 任务申请

由2.3.1的任务调度/分配部分可知,任务的执行有两种可能:1、任务直接由新创建的线程执行;2、线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。
第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信,这部分策略由getTask方法实现,其执行流程如下图所示:

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

2.3.4 任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

拒绝策略是一个接口,其设计如下:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

2.4 Worker线程管理

2.4.1 Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ThreadPoolExecutor extends AbstractExecutorService {

// 继承AQS,实现Runnable
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// Worker持有的线程,即工作线程
final Thread thread;
// 初始化的任务,可以为null
Runnable firstTask;
// 这个worker已经完成的任务数
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建自身持有的线程
this.thread = getThreadFactory().newThread(this);
}

// 实现Runnable接口的run方法
public void run() {
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}

Worker这个工作线程,继承了AbstractQueuedSynchronizer,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask
thread是在调用Worker的构造方法时通过ThreadFactory来创建的线程,可以用来执行任务。
firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker执行任务的模型如下图所示:

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。
线程池使用HashSet<Worker> workers去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态,如果线程是空闲状态则可以安全回收。

在线程回收过程中就使用到了这种特性,回收过程如下图所示:

2.4.2 Worker线程增加

增加线程是通过线程池中的addWorker(Runnable firstTask, boolean core)
该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。

addWorker方法有两个参数:firstTask、core。
firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

其执行流程如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private boolean addWorker(Runnable firstTask, boolean core) {\
// CAS更新线程池数量
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// mainLock 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 线程启动,执行任务(Worker.thread(firstTask).start());
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

2.4.3 Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务。

runWorker方法的执行过程如下:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// Worker加锁
// Worker可以通过获取独占锁的方式在执行任务前进行加锁,执行后释放锁,这可以保证任务串行执行
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
// Worker解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //获取不到任务时,主动回收自己
}
}

2.4.4 Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。
Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

线程回收的工作是在processWorkerExit方法完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。