回到顶部 暗色模式

ThreadPoolExecutor原理

1. 线程池介绍

        为什么使用线程池?

  1. 创建/销毁线程需要消耗系统资源,线程池可以复用已经创建的线程;
  2. 控制并发数;
  3. 统一管理线程。

2. 构造方法

/*
    @param corePoolSize    核心线程数最大值
    @param maximumPoolSize 线程总数最大值
    @param keepAliveTime   非核心线程超时时长
    @param unit            keepAliveTime的时间单位
    @param workQueue       维护待执行的Runnable对象的阻塞队列
    @param threadFactory   线程工厂
    @param handler         拒绝处理策略
*/
public ThreadPoolExecutor(@Range(from = 0, to = java.lang.Integer.MAX_VALUE) int corePoolSize,
                          @Range(from = 1, to = java.lang.Integer.MAX_VALUE) int maximumPoolSize,
                          @Range(from = 0, to = java.lang.Long.MAX_VALUE) long keepAliveTime,
                          @NotNull TimeUnit unit,
                          @NotNull BlockingQueue<Runnable> workQueue,
                          @NotNull ThreadFactory threadFactory,
                          @NotNull RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

public ThreadPoolExecutor(@Range(from = 0, to = java.lang.Integer.MAX_VALUE) int corePoolSize,
                          @Range(from = 1, to = java.lang.Integer.MAX_VALUE) int maximumPoolSize,
                          @Range(from = 0, to = java.lang.Long.MAX_VALUE) long keepAliveTime,
                          @NotNull TimeUnit unit,
                          @NotNull BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(@Range(from = 0, to = java.lang.Integer.MAX_VALUE) int corePoolSize,
                          @Range(from = 1, to = java.lang.Integer.MAX_VALUE) int maximumPoolSize,
                          @Range(from = 0, to = java.lang.Long.MAX_VALUE) long keepAliveTime,
                          @NotNull TimeUnit unit,
                          @NotNull BlockingQueue<Runnable> workQueue,
                          @NotNull ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(@Range(from = 0, to = java.lang.Integer.MAX_VALUE) int corePoolSize,
                          @Range(from = 1, to = java.lang.Integer.MAX_VALUE) int maximumPoolSize,
                          @Range(from = 0, to = java.lang.Long.MAX_VALUE) long keepAliveTime,
                          @NotNull TimeUnit unit,
                          @NotNull BlockingQueue<Runnable> workQueue,
                          @NotNull RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

3. 策略

        线程池本身有一个调度线程,管理整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等。

// ctl有两层含义,一部分表示运行状态,一部分表示worker数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// ctl的高位存储线程池状态,通过 ctl & (~CAPACITY) 获得
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

3. 任务处理

        分三步处理:

  1. 如果运行线程数少于 $corePoolSize$ ,尝试创建新线程,这一步需要获取全局锁;
  2. 如果一个任务能被插入队列,再次检查是否需要新线程,或者线程池是否被中断;
  3. 如果不能加入新任务,尝试创建新线程,如果失败,代表线程池已经被中断或者到达线程上限,这时会拒绝任务。
public void execute(@NotNull Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 当前线程数少于corePollSize,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 不少于corePoolSize,将任务添加到队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池非运行状态,移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程池处于运行状态,但是没有线程,创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 加入队列失败,创建非核心线程
    else if (!addWorker(command, false))
        // 非核心线程创建失败,执行拒绝策略
        reject(command);
}

// 获取Worker数
private static int workerCountOf(int c)  { return c & CAPACITY; }

4. Worker

        线程池中的线程在执行完之后不会销毁,而是继续执行另外的线程任务,这就是线程复用。线程复用的核心就是 $Worker$ ,$ThreadPoolExecutor$ 在创建线程时,会将线程封装成 $Worker$ ,并放入线程组中,然后 $Worker$ 从队列中获取线程执行。

// 全局锁,当访问Worker Set时需要
private final ReentrantLock mainLock = new ReentrantLock();

// 使用set存储线程池中的Worker,当且仅当获取全局锁后才能访问
private final HashSet<Worker> workers = new HashSet<Worker>();

/*
    @param firstTask Worker创建后运行的第一个任务
    @param core      是否为核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 是否处于运行/是否存在任务/队列是否为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || // Worker大于容量
                wc >= (core ? corePoolSize : maximumPoolSize)) // Worker是否达到上限
                return false;
            // CAS更新Worker数
            if (compareAndIncrementWorkerCount(c))
                // 更新成功则跳出循环
                break retry;
            c = ctl.get();
            // 查看状态是否发生改变
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 线程池全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 在获取锁后再次检查状态,如果线程池关闭则中止
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 检查线程是否可以运行
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 如果Worker创建成功,启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 线程池终止或者线程创建失败会导致Worker创建失败
            // 创建Worker失败,减少Worker计数并尝试中止线程池
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

        $Worker$ 对象的部分源码如下:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 当前Worker运行的线程
    final Thread thread;
    // 初始任务,可能为空
    Runnable firstTask;

    Worker(Runnable firstTask) {
        setState(-1); // 表示中断,运行时会改变
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
            runWorker(this);
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 线程启动后释放锁
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 获取任务
            while (task != null || (task = getTask()) != null) {
                // 加锁保证不被中断(除非线程池被中断)
                w.lock();
                // 如果线程池中断,保证线程中断
                // 如果线程池未中断,保证线程未中断
                // 需要在之后再次检查避免shutdownNow带来的竞态条件
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 空实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 空实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 移除Worker并尝试终止线程池
            processWorkerExit(w, completedAbruptly);
        }
    }

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池中止/队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 如果允许核心线程超时,则核心线程空闲后会被销毁
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 线程数超过最大线程/线程在规定时间内未poll()到队列,并且队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? // 当前线程是否允许超时
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 在时限内尝试获取任务
                    workQueue.take(); // 阻塞获取任务(对于核心线程)
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}

5. 拒绝策略

        当线程数量大于最大线程数时就会采取拒绝处理策略,默认的四种策略为:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

6. 工厂方法

        $Executor$ 类提供几个静态工厂方法。

6.1 CachedThreadPool

@NotNull
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  1. $CachedThreadPool$ 的核心线程数为 $0$ ,线程数上限为 $Integer.MAX_-VALUE$ ,即不存在核心线程;
  2. 任务会添加到 $SynchronousQueue$ 队列中,这是一个没有空间的队列。如果 $SynchronousQueue$ 中已存在任务,新任务会被阻塞。

6.2 FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
  1. 核心线程数和最大线程数相等,只能创建核心线程;
  2. 阻塞队列默认大小为 $Integer.MAX_-VALUE$ ,任务会一直等待直到有线程空闲;

6.3 SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  1. 有且仅有一个核心线程;
  2. 阻塞队列空间很大,任务会一直阻塞;

6.4 ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  1. 指定核心线程数,最大线程数为 $Integer.MAX_-VALUE$ ;
  2. 阻塞队列初始容量为 $16$ ,基于堆实现。

ThreadPoolExecutor原理