Java ThreadPoolExecutor 解析

线程池相对于手动创建线程有以下优势:

  1. 减少创建销毁线程的额外消耗。
  2. CPU资源有限,创建的线程过多,有的任务不能及时响应。线程池可以提高响应速度。
  3. 有效管理线程。

ThreadPoolExecutor(JDK 1.8)

构造函数

线程池的构造函数和主要属性如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 任务阻塞队列(向线程池提交的任务在此排队)
    private final BlockingQueue<Runnable> workQueue;

    // Work缓存,线程池启动执行任务的线程
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 任务拒绝策略,当不能接收线程时调用的处理方法
    private volatile RejectedExecutionHandler handler;

    // 如果的空闲线程(Worker)空闲时间超过 keepAliveTime
    // 将被回收(从 workers 中移除,减少线程技术)。
    // 大于 0 生效
    private volatile long keepAliveTime;

    // 核心线程数量,不会被回收的线程
    private volatile int corePoolSize;

    // 最大线程数量,
    // maximumPoolSize = corePoolSize,为固定大小线程池
    // maximumPoolSize = Integer.MAX_VALUE,可以无限制的加入任务
    private volatile int maximumPoolSize;

    // 构造函数
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

        // 检查参数合法性,并初始化变量
    }
}

RejectedExecutionHandler 提供了几种默认实现:

  • AbortPolicy,抛出 RejectedExecutionException,这是线程池中的默认实现
  • CallerRunsPolicy,在调用者的线程中执行 run() 方法
  • DiscardPolicy,不做任何事
  • DiscardOldestPolicy,丢弃 workQueue 队头任务,提交新任务

提交任务

向线程池提交任务有两种方法 execute()submit(),接收的参数不同

// 使用 RunnableFuture 包装 task,调用 execute() 提交
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

// 提交一个 task
public void execute(Runnable command) {
    /**
     * 如果正在运行的线程数少于 corePoolSize,尝试启动新线程(Worker)
     * 并且 task 作为 Worker 的第一个任务
     *
     * 如果一个任务可以成功地排队(进入 workQueue),再次检查线程池状态
     * 判断是否需要回滚任务或启动新线程执行任务
     *
     * 如果一个任务不能成功排队(workQueue 的限制),再次尝试添加一个新的线程。
     * 如果依然失败(线程池状态停止或执行线程数达到上限),执行拒绝任务的调用
    */

    // 获取线程池中的线程数量
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 线程池中线程数量小于 corePoolSize,调用 addWorker 添加新线程执行任务。
        // core = true,添加成功返回
        if (addWorker(command, true))
            return;
        // 如果添加失败,重启获取正在运行线程数量(可能多线程原因导致失败)。
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // 如果线程池状态是 RUNNING,向 workQueue 添加任务,添加成功
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            // 再次检查线程数量后发现线程池已经关闭或者数量超出,回滚已经添加的任务(remove)
            // 并且执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 可以调用 addWorker 添加新线程执行任务。
            // core = false
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        // 再次添加线程,失败则调用拒绝策略。
        reject(command);
}

addWorker 方法

// 尝试创建 Worker 线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:    
    for (;;) {
        // 线程池状态
        int c = ctl.get();     
        int rs = runStateOf(c);

        for (;;) {
            // 线程运行数量
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 检查是否超过数量限制,达到数量限制不创建
                return false;
            // CAS操作更新 c+1
            if (compareAndIncrementWorkerCount(c))
                // 更新成功,跳出外层循环
                break retry;
            c = ctl.get();  
            // 更新失败,重新判断状态
            if (runStateOf(c) != rs)
                // 如果状态变化,跳出内层循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Worker 封装任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取锁后,检查线程池运行状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 添加 worker
                    workers.add(w);
                    int s = workers.size();
                    // 池中同时存在的最大线程数。
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            // 移除 worker
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker 相关方法

Worker 是线程池中的执行线程,需要关注以下方法:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // 执行完成的任务计数
    volatile long completedTasks;

    // 构造函数
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        // 使用 ThreadFactory 创建线程,指定 thread 的 target 为 this(当前 Worker 线程)
        this.thread = getThreadFactory().newThread(this);
    }

    // 指定了 thread 的 target 为 this
    // thread 执行完成会调用 Worker 线程的 run 方法
    public void run() {
        // 调用线程池的 runWorker
        runWorker(this);
    }

}

runWorker 方法

// Worker 线程方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 如果 firstTask 不为空,或者 workerQueue 中能取出任务
        // 如果 task == null,表示 Worker 空闲,跳出循环
        while (task != null || (task = getTask()) != null) {
            w.lock();

            // 如果线程池状态为 STOP(TIDYING、TERMINATED)
            // 确保 Worker 线程执行 interrupt()
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // 提供钩子函数 beforeExecute、afterExecute 可以重写方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 在 Worker 线程中执行任务的方法体
                    task.run();
                } catch (RuntimeException x) {
                    // ...
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Worker 执行退出(workQueue 中没有任务)
        processWorkerExit(w, completedAbruptly);
    }
}

// 获取 workerQueue 中的任务
private Runnable getTask() {

    for (;;) {
        // 判断线程池的运行状态 ...

        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        try {
            // 如果超过 keepAliveTime 没有获取到 workQueue 中的任务,返回 null
            // 如果不需要判断超时时间,一直阻塞,等待任务
            // 如果返回 null,意味着 Worker 线程空闲可以回收,参考 runWorker 方法
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;

        } catch (InterruptedException retry) {

        }
    }
}

// Worker 退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 判断是否被打断退出
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 统计线程池完成任务数
        completedTaskCount += w.completedTasks;
        // 移除 Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // ...

    if (runStateLessThan(c, STOP)) {
        // 线程池 RUNNING 状态
        // 如果 workerQueue 不为空,并且活动线程数量少于 corePoolSize
        // 启动新的 Worker 线程
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Executors

以上,基本上介绍了 ThreadPoolExecutor 管理线程的方法。Executors 提供了便捷创建多种不同线程池的方法

// 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// 可以无限提交的可缓存线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// 可以周期性执行任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// 单线程线程池,串行执行任务
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}


Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

11 + 4 =