
Thread Pool Source Code Analysis

1. Parameters for creating a thread pool

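Create thread pools by calling the ThreadPoolExecutor constructor with explicitly chosen parameters. Do not use the static factory methods in Executors: their defaults (an unbounded queue or an unbounded thread count) can lead to out-of-memory errors and to creating so many threads that resources are exhausted.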

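corePoolSize: the number of core threads. Core threads are not destroyed automatically unless allowCoreThreadTimeOut is set to true; in that case a thread that has been idle longer than keepAliveTime is reclaimed even when the thread count does not exceed corePoolSize.
maximumPoolSize: the maximum number of threads the pool can hold.
keepAliveTime: normally core threads are not reclaimed; when the pool has more than corePoolSize threads, a thread that has been idle longer than keepAliveTime is reclaimed. (ThreadPoolExecutor does not tag individual threads as core or non-core; it only compares the current thread count with corePoolSize.)
unit: the time unit of keepAliveTime.
workQueue: the work queue. Once the number of threads has reached corePoolSize, new tasks are put into this queue first, and threads take tasks from it after finishing their current ones.
threadFactory: the thread factory used to create the pool's threads; it can also set each thread's name and other attributes.
handler: the rejection policy, invoked when the pool cannot accept a task: the queue is full and the thread count has reached maximumPoolSize, or the pool has been shut down.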

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
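A minimal sketch of the recommendation above, assuming a hypothetical order-processing pool: every constructor argument is spelled out, the queue is bounded, and a small factory names the threads. The class PoolConfigDemo, the helper namedFactory and the prefix "order-pool" are illustrative names, and the sizes are placeholders rather than tuned values. It also exercises the argument validation at the top of the constructor.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
    // Hypothetical helper: names threads "<prefix>-1", "<prefix>-2", ... so they are easy to spot in thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.incrementAndGet());
    }

    // All seven parameters are explicit; the bounded queue makes overload reach the handler instead of growing memory.
    static ThreadPoolExecutor newOrderPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime and unit
                new ArrayBlockingQueue<>(100),         // workQueue: bounded
                namedFactory("order-pool"),            // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler: throw RejectedExecutionException
    }

    // The constructor throws IllegalArgumentException when maximumPoolSize < corePoolSize.
    static boolean rejectsMaxBelowCore() {
        try {
            new ThreadPoolExecutor(2, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // Runs one task and reports the name of the thread that executed it.
    static String firstThreadName() {
        ThreadPoolExecutor pool = newOrderPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstThreadName());     // order-pool-1
        System.out.println(rejectsMaxBelowCore()); // true
    }
}
```

Naming threads this way costs one lambda and makes stack traces and thread dumps far easier to attribute to a specific pool.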

2. Source code analysis of the core methods

2.1 execute in detail

// Core method for executing a task in the pool
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // If fewer than corePoolSize threads are running, try to add a thread with command as its first task
    if (workerCountOf(c) < corePoolSize) {
        // If this fails, the worker count or the run state changed in the meantime; fall through to the next step
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the pool is RUNNING, try to add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double-check the run state: the pool may have been shut down right after the task was queued; if so, remove the task and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // Otherwise (still RUNNING, or the task could not be removed), make sure at least one thread exists to run queued tasks
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
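    // The task could not be queued (queue full, or pool not RUNNING): try to add a non-core thread; if that fails (thread count already at maximumPoolSize, pool shut down, etc.), reject the task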
    else if (!addWorker(command, false))
        reject(command);
}
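To watch the three steps of execute() happen, here is a small experiment (the class name ExecuteOrderDemo is hypothetical) with corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1. Each task blocks on a latch, so the threads stay busy while the pool size and queue length are inspected after every call.

```java
import java.util.concurrent.*;

class ExecuteOrderDemo {
    // Returns {threads, queued} after each of the first three execute() calls, then 1 if the fourth was rejected.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // Deliberately tiny limits so every branch of execute() is reached.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] out = new int[7];
        pool.execute(blocker); // step 1: 0 < corePoolSize -> addWorker(command, true)
        out[0] = pool.getPoolSize();
        out[1] = pool.getQueue().size();
        pool.execute(blocker); // step 2: core thread busy -> workQueue.offer succeeds
        out[2] = pool.getPoolSize();
        out[3] = pool.getQueue().size();
        pool.execute(blocker); // step 3: queue full -> addWorker(command, false)
        out[4] = pool.getPoolSize();
        out[5] = pool.getQueue().size();
        try {
            pool.execute(blocker); // queue full and maximumPoolSize reached -> reject(command)
        } catch (RejectedExecutionException e) {
            out[6] = 1;
        }
        release.countDown();
        pool.shutdown();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [1, 0, 1, 1, 2, 1, 1]
    }
}
```

Note that the third task runs before the second one: the non-core thread starts with the third task as its firstTask while the second still waits in the queue.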

2.2 addWorker in detail

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Run state of the pool
        int rs = runStateOf(c);
        
        // Check if queue empty only if necessary.
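        // Expanding the negated condition: rs >= SHUTDOWN && (rs > SHUTDOWN || firstTask != null || workQueue.isEmpty())
        // So there are three cases in which the pool does not add a thread:
        // 1. The pool is STOP / TIDYING / TERMINATED
        // 2. The pool is SHUTDOWN and firstTask != null (a SHUTDOWN pool no longer accepts newly submitted tasks)
        // 3. The pool is SHUTDOWN and the work queue is empty (so in SHUTDOWN, as long as the queue still holds tasks, the pool can still add threads to process them)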
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Check the thread count against corePoolSize or maximumPoolSize, depending on whether a core or non-core thread is being added
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS the worker count up by 1 and leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // Pool is RUNNING, or SHUTDOWN with no new task (the thread is being added to process tasks left in the blocking queue)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The new thread must not have been started already
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread; it runs Worker.run(), which calls runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Startup failed: remove the worker from workers and decrement the worker count
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
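addWorker relies on ctl, CAPACITY, runStateOf and workerCountOf, which are not shown above. In the JDK 8 source, ctl is an AtomicInteger that packs the run state into the high 3 bits and the worker count into the low 29 bits, which is why a single CAS can check both at once. The sketch below copies those definitions into a standalone class (CtlSketch is a hypothetical name, not part of the JDK) so the encoding can be tried directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone copy of the JDK 8 ctl encoding; hypothetical class, not the real ThreadPoolExecutor.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    // Run states live in the high 3 bits; their numeric order is what runStateLessThan/AtLeast compare.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // As in addWorker's inner loop: succeeds only if neither the state nor the count changed since c was read.
    boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    public static void main(String[] args) {
        CtlSketch s = new CtlSketch();
        int c = s.ctl.get();
        System.out.println(s.compareAndIncrementWorkerCount(c)); // true
        System.out.println(s.compareAndIncrementWorkerCount(c)); // false: c is stale now
        int now = s.ctl.get();
        System.out.println(workerCountOf(now) + " " + (runStateOf(now) == RUNNING)); // 1 true
    }
}
```

A failed CAS is exactly the case the inner loop handles: it re-reads ctl and restarts the outer loop only if the run state changed.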

2.3 runWorker in detail

// Once t.start() in the previous method runs, the new thread executes this method (Worker.run() calls runWorker)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set state to 0 (a new Worker starts at -1) so the thread can now be interrupted
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the worker's own first task, then tasks taken from the work queue
        while (task != null || (task = getTask()) != null) {
            // Acquire the worker's exclusive lock before running the task
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the pool is STOP or later, make sure the thread's interrupt flag is set; if it is RUNNING or SHUTDOWN, clear the flag
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Pre-execution hook; can be overridden
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
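                    // Run the task.
                    // As shown above, when the pool is STOP the thread's interrupt flag is set, so a task that responds to interruption may throw InterruptedException right away.
                    // This is how shutdownNow() abandons queued tasks and stops running ones (a task that ignores interruption keeps running).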
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Post-execution hook; can be overridden
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task and increment this worker's completed-task count
                task = null;
                w.completedTasks++;
                // Release the lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
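beforeExecute and afterExecute are empty in ThreadPoolExecutor and exist to be overridden. Below is a sketch of a hypothetical subclass (CountingPool, driven by the equally hypothetical HookDemo) that counts tasks through these hooks; because afterExecute is called from the finally block above, it also sees the task that throws.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass: counts tasks via the hooks that runWorker calls around task.run().
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r); // runs on the worker thread, just before task.run()
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);  // runs in runWorker's finally block, even if the task threw
        finished.incrementAndGet();
        if (t != null) failed.incrementAndGet();
    }
}

class HookDemo {
    // Returns {started, finished, failed} after three execute() calls, one of which throws.
    static int[] run() {
        CountingPool pool = new CountingPool(2);
        pool.execute(() -> { });
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("task failed"); });
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.finished.get(), pool.failed.get()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [3, 3, 1]
    }
}
```

The failing task also terminates its worker thread, so the default uncaught-exception handler prints a stack trace to stderr; processWorkerExit then decides whether to replace that worker.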

2.4 getTask in detail

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    // Whether the last poll() timed out after waiting keepAliveTime
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
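        // If the pool is STOP or later, or SHUTDOWN with an empty queue, stop taking tasks and let this thread be reclaimed
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement the worker count, retrying the CAS until it succeeds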
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // Whether this thread can be reclaimed after idling (i.e. waits with a timeout)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

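        // If the thread count exceeds maximumPoolSize, or this thread is subject to timeout and has been idle longer than keepAliveTime,
        // and either there is more than one thread or the queue is empty, this thread can be reclaimed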
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // CAS the worker count down by 1; a failed CAS means the state changed or another thread was reclaimed first, so loop and re-check
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If the thread can be reclaimed when idle, poll() with a keepAliveTime timeout; otherwise block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // No task arrived: this thread has been idle for keepAliveTime and can be reclaimed
            timedOut = true;
        } catch (InterruptedException retry) {
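            // If the thread is blocked in take() or poll() on an empty queue, shutdown()/shutdownNow() sets its interrupt flag,
            // which throws InterruptedException; the run state may have changed, so start the next iteration and re-check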
            timedOut = false;
        }
    }
}
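The keepAliveTime logic in getTask can be observed from outside the pool. In this sketch (KeepAliveDemo and its waitFor helper are hypothetical names), three blocking tasks force the pool up to maximumPoolSize; once they finish, the threads above corePoolSize time out in poll() and exit, and after allowCoreThreadTimeOut(true) the last thread exits too. The 50 ms keepAliveTime and the 5 s wait limits are arbitrary values chosen for the demo, and the result is timing-based, so it should show {3, 1, 0} unless the machine is extremely overloaded.

```java
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

class KeepAliveDemo {
    // Hypothetical helper: polls cond until it holds or timeoutMs elapses (avoids a fixed sleep).
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!cond.getAsBoolean()) {
            if (System.nanoTime() > deadline) return false;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Returns {peakSize, sizeAfterKeepAlive, sizeAfterCoreTimeout}.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // SynchronousQueue never stores tasks, so while workers are busy each execute() adds a thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 50, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getPoolSize();
        release.countDown();
        // wc > corePoolSize, so timed == true: the two surplus threads time out in poll() and exit.
        waitFor(() -> pool.getPoolSize() == 1, 5000);
        int afterKeepAlive = pool.getPoolSize();
        // Now timed == true for every worker, so the remaining core thread times out as well.
        pool.allowCoreThreadTimeOut(true);
        waitFor(() -> pool.getPoolSize() == 0, 5000);
        int afterCoreTimeout = pool.getPoolSize();
        pool.shutdown();
        return new int[] {peak, afterKeepAlive, afterCoreTimeout};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```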

2.5 processWorkerExit in detail

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // The task threw, so runWorker ended abruptly and getTask never decremented the count: decrement it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Try to terminate the pool and set its final state
    tryTerminate();

    int c = ctl.get();
    // Pool is RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // min: the minimum number of threads the pool keeps alive
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Add a replacement thread
        addWorker(null, false);
    }
}
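A sketch of the replacement path (WorkerReplacementDemo is a hypothetical name): a single-thread pool runs a task that throws, then a normal task. Counting the threads created by the ThreadFactory shows that the dead worker was replaced, and the UncaughtExceptionHandler installed by the factory is where the escaped exception ends up.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerReplacementDemo {
    // Returns {threadsCreated, exceptionsSeenByHandler}.
    static int[] run() {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger uncaught = new AtomicInteger();
        CountDownLatch handlerRan = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + created.incrementAndGet());
            // Receives exceptions that escape runWorker and terminate the thread.
            t.setUncaughtExceptionHandler((thread, ex) -> {
                uncaught.incrementAndGet();
                handlerRan.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
        CountDownLatch secondDone = new CountDownLatch(1);
        pool.execute(() -> { throw new IllegalStateException("boom"); }); // kills worker-1
        pool.execute(secondDone::countDown);                               // runs on worker-2
        try {
            secondDone.await(5, TimeUnit.SECONDS);
            handlerRan.await(5, TimeUnit.SECONDS);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {created.get(), uncaught.get()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [2, 1]
    }
}
```

Whether worker-2 is created by processWorkerExit or by the second execute() call depends on timing, but the CAS on the worker count lets only one of them succeed, so exactly two threads are created.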

2.6 tryTerminate in detail

/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
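        // Return without doing anything in three cases:
        // 1. The pool is still RUNNING
        // 2. The pool is TIDYING or TERMINATED (termination already under way or done)
        // 3. The pool is SHUTDOWN but the blocking queue still holds tasks
        // Execution continues only when the pool is STOP, or SHUTDOWN with an empty queue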
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
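        // If threads remain, interrupt one idle thread so that the shutdown signal propagates.
        // How does this work? A thread interrupted while waiting gets null from getTask() and then runs processWorkerExit,
        // whose call to tryTerminate either terminates the pool or interrupts one more thread, and so on. Elegant.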
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // All worker threads have been reclaimed at this point, so try to terminate the pool
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Move to TIDYING, run the terminated() hook, then set TERMINATED
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
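The terminated() hook that tryTerminate calls between TIDYING and TERMINATED can be overridden like the other hooks. A minimal sketch (TerminatedHookDemo is a hypothetical name) that records when the hook runs and waits for termination:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    // Returns {terminatedHookRan, awaitTerminationResult, isTerminated}.
    static boolean[] run() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Called by tryTerminate() after ctl has been CAS'd to TIDYING.
                hookRan.set(true);
                super.terminated();
            }
        };
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown(); // idle workers are interrupted; busy ones exit once the queue is empty
        boolean done;
        try {
            done = pool.awaitTermination(5, TimeUnit.SECONDS); // waits for TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done = false;
        }
        return new boolean[] {hookRan.get(), done, pool.isTerminated()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [true, true, true]
    }
}
```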

2.7 interruptIdleWorkers in detail

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting. (That is, pass the SHUTDOWN signal on to other waiting threads.)
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
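            // If the thread is not already interrupted and the worker's lock can be acquired, set the thread's interrupt flag
            // w.tryLock() == true means thread t is not running a task (it is idle) and may be blocked in getTask() waiting for one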
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
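interruptIdleWorkers treats a worker whose lock can be acquired as idle. In the JDK 8 source, Worker extends AbstractQueuedSynchronizer with a deliberately non-reentrant lock whose state starts at -1, so a worker that has not started, or one that is running a task, cannot be grabbed. The sketch below imitates that lock in a hypothetical WorkerLock class (the real Worker also holds its thread and first task) and applies the same tryLock test, via a hypothetical IdleInterruptSketch, to a not-yet-started, a busy and an idle worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified stand-in for ThreadPoolExecutor.Worker's lock (hypothetical class).
class WorkerLock extends AbstractQueuedSynchronizer {
    WorkerLock() { setState(-1); } // -1: not started yet, so tryLock fails and it is not interrupted

    @Override
    protected boolean tryAcquire(int unused) {
        // Non-reentrant: only succeeds from state 0, even for the thread that already owns it.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() { return getState() != 0; }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
}

class IdleInterruptSketch {
    // Same test as interruptIdleWorkers: only workers whose lock can be taken count as idle.
    // Returns the indexes of the workers that would be interrupted.
    static List<Integer> idleWorkers(List<WorkerLock> workers) {
        List<Integer> interrupted = new ArrayList<>();
        for (int i = 0; i < workers.size(); i++) {
            WorkerLock w = workers.get(i);
            if (w.tryLock()) {
                try {
                    interrupted.add(i); // the real code calls t.interrupt() here
                } finally {
                    w.unlock();
                }
            }
        }
        return interrupted;
    }

    public static void main(String[] args) {
        WorkerLock notStarted = new WorkerLock(); // state -1
        WorkerLock busy = new WorkerLock();
        busy.unlock();                            // like runWorker's w.unlock(): -1 -> 0
        busy.lock();                              // running a task: 0 -> 1
        WorkerLock idle = new WorkerLock();
        idle.unlock();                            // waiting in getTask(): state 0
        System.out.println(idleWorkers(Arrays.asList(notStarted, busy, idle))); // [2]
        System.out.println(busy.tryLock());       // false: the lock is not reentrant
    }
}
```

The non-reentrancy matters because a task may itself call a pool-control method such as setCorePoolSize, which ends up in interruptIdleWorkers; with a reentrant lock, the worker running that task would look idle and interrupt itself.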

2.8 shutdown in detail

/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 *
 * @throws SecurityException {@inheritDoc} 
 */
// Initiate an orderly shutdown of the pool
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Set the run state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt all idle threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
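A sketch of what shutdown() guarantees (ShutdownDemo is a hypothetical name): tasks already in the queue still run, while an execute() call made after shutdown is rejected, because the task is not queued in a non-RUNNING pool and addWorker refuses a non-null firstTask in SHUTDOWN. The default AbortPolicy turns that rejection into a RejectedExecutionException.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns {completedTasks, rejectedAfterShutdown (1 or 0)}.
    static int[] run() {
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.execute(() -> {                          // occupies the only thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        });
        pool.execute(completed::incrementAndGet);     // queued
        pool.execute(completed::incrementAndGet);     // queued
        pool.shutdown();                              // SHUTDOWN: no new tasks, queued ones are kept
        int rejected = 0;
        try {
            pool.execute(completed::incrementAndGet); // addWorker returns false -> reject(command)
        } catch (RejectedExecutionException e) {
            rejected = 1;
        }
        release.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejected};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [3, 1]
    }
}
```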

2.9 shutdownNow in detail

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @throws SecurityException {@inheritDoc}
 */
// Cancel queued tasks and try to stop the ones currently running
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Set the run state to STOP
        advanceRunState(STOP);
        // Interrupt every started thread in the pool, even those running tasks
        interruptWorkers();
        // Take the tasks out of the queue, emptying it
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool: if no threads remain, termination completes now; otherwise it relies on propagation
    tryTerminate();
    return tasks;
}
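For contrast, roughly the same setup with shutdownNow() (ShutdownNowDemo is a hypothetical name): the queued tasks are returned instead of run, and the running task is interrupted. It only stops because it blocks in an interruptible call; a task that ignored the interrupt would keep running.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownNowDemo {
    // Returns {tasksReturnedByShutdownNow, runningTaskInterrupted (1 or 0), queuedTasksThatRan}.
    static int[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);  // never counted down
        AtomicInteger interrupted = new AtomicInteger();
        AtomicInteger ranLater = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.execute(() -> {
            started.countDown();
            try {
                never.await();                          // interruptible blocking call
            } catch (InterruptedException e) {
                interrupted.incrementAndGet();          // reached via shutdownNow() -> interruptWorkers()
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(ranLater::incrementAndGet);        // queued
        pool.execute(ranLater::incrementAndGet);        // queued
        List<Runnable> drained;
        try {
            started.await();
            drained = pool.shutdownNow();               // STOP: interrupt workers, drain the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {drained.size(), interrupted.get(), ranLater.get()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [2, 1, 0]
    }
}
```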

3. Questions

3.1 Can a thread pool still create new threads after shutdown?

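Yes. After shutdown, if tasks remain in the blocking queue, the pool can still create new threads (with a null firstTask) to process them; see the addWorker analysis.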

3.2 Does a thread always return immediately after handing a task to the pool?

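Not necessarily, because execute() does part of the work in the calling thread. If fewer than corePoolSize threads are running, execute() creates a new thread and starts it with the submitted task as its first task. Once the pool has corePoolSize threads, the task is added to the blocking queue; after a successful enqueue, execute() rechecks the pool: if it is no longer running, the task is removed and the rejection policy is applied, and if no threads are left, one is created. If the queue is full, execute() tries to create a non-core thread to run the task, and only if that also fails is the rejection policy applied. The rejection policy itself runs in the calling thread; with CallerRunsPolicy, for example, the submitting thread runs the task itself and returns only after it has finished. See the execute method.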
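The last point can be shown with CallerRunsPolicy, one of the rejection policies built into ThreadPoolExecutor. In this sketch (CallerRunsDemo is a hypothetical name), the only worker is busy and a SynchronousQueue cannot hold tasks, so the second execute() call is rejected and the submitting thread runs the task itself before execute() returns.

```java
import java.util.concurrent.*;

class CallerRunsDemo {
    // Returns the name of the thread that ran the overflow task.
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        pool.execute(() -> {                      // keeps the only worker busy
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        String[] ranOn = new String[1];
        // Queue cannot hold it and maximumPoolSize is reached -> reject(command) -> CallerRunsPolicy runs it here.
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(run().equals(Thread.currentThread().getName())); // true
    }
}
```

This is also why CallerRunsPolicy is sometimes used as a crude back-pressure mechanism: a saturated pool slows down the producers.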

3.3 If a pool thread throws an exception, will the pool add a new thread? How can the exceptions thrown by these threads be caught?

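If a task run through execute() throws, the exception propagates out of runWorker, the thread terminates and its worker is removed; if the pool is not yet STOP (that is, it is RUNNING or SHUTDOWN), processWorkerExit tries to add a new thread. (A task submitted with submit() is wrapped in a FutureTask, which captures the exception, so the worker thread survives.)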

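1. Catch the exception inside the task's run method.

2. Submit the task with submit() to obtain a Future; calling get() then throws an ExecutionException whose cause is the exception thrown by the task.

3. Override afterExecute(), which receives the Runnable and the Throwable as arguments. For tasks submitted with submit(), the Throwable argument is null because the FutureTask has already captured the exception; it has to be extracted from the Future.

4. Handle it with the thread's UncaughtExceptionHandler (for example, set through the ThreadFactory). This only sees exceptions that actually escape the thread, i.e. those from tasks run with execute().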
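Options 2 and 3 can be combined as in the sketch below. The Future-unwrapping inside afterExecute follows the pattern suggested in the ThreadPoolExecutor Javadoc; LoggingPool and ExceptionCaptureDemo are hypothetical names. It shows that a task submitted with submit() reaches afterExecute with a null Throwable, while its exception is still recoverable from the Future.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical subclass whose afterExecute sees failures from both execute() and submit().
class LoggingPool extends ThreadPoolExecutor {
    final List<String> failures = Collections.synchronizedList(new ArrayList<>());

    LoggingPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // With submit(), r is a FutureTask that already captured the exception, so t is null here.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) failures.add(t.getMessage());
    }
}

class ExceptionCaptureDemo {
    static List<String> run() {
        LoggingPool pool = new LoggingPool();
        // execute(): the exception escapes runWorker, the worker dies (and is replaced),
        // and the default UncaughtExceptionHandler prints the stack trace to stderr.
        pool.execute(() -> { throw new IllegalStateException("from execute"); });
        // submit(): the exception is stored in the Future; the worker survives.
        Runnable failing = () -> { throw new IllegalStateException("from submit"); };
        Future<?> f = pool.submit(failing);
        String viaGet;
        try {
            f.get();
            viaGet = "no exception";
        } catch (ExecutionException e) {
            viaGet = e.getCause().getMessage();   // option 2: unwrap the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> result = new ArrayList<>(pool.failures);
        Collections.sort(result);
        result.add(viaGet);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [from execute, from submit, from submit]
    }
}
```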

3.4 How should the pool size be set, and how can the pool's parameters be adjusted dynamically?

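setMaximumPoolSize: if the current thread count exceeds the new maximumPoolSize, it interrupts idle threads; they then exit in getTask, which reclaims the surplus threads.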

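setCorePoolSize: if the current thread count exceeds the new corePoolSize, it interrupts idle threads. If the new value is larger than the old one, it starts new threads to process queued tasks, at most the smaller of the increase and the current queue size.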
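A sketch of resizing at runtime (ResizeDemo and its waitFor helper are hypothetical names). The order of the calls matters: setMaximumPoolSize rejects a value below corePoolSize, and, as far as I know, in JDK 9 and later setCorePoolSize also rejects a value above maximumPoolSize. So raise the maximum first when growing and lower the core size first when shrinking. The final size depends on idle workers exiting, so the demo polls for it with a generous timeout.

```java
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

class ResizeDemo {
    // Hypothetical helper: polls cond until it holds or timeoutMs elapses.
    static boolean waitFor(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!cond.getAsBoolean()) {
            if (System.nanoTime() > deadline) return false;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Returns {sizeBefore, sizeAfterGrow, sizeAfterShrink}.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int before = pool.getPoolSize();  // 1 thread running, 2 tasks queued
        // Grow: raise the maximum first so corePoolSize never exceeds maximumPoolSize.
        pool.setMaximumPoolSize(3);
        pool.setCorePoolSize(3);          // starts min(delta = 2, queue size = 2) extra workers
        int afterGrow = pool.getPoolSize();
        release.countDown();
        // Shrink: lower the core size first, then the maximum; surplus workers exit in getTask.
        pool.setCorePoolSize(1);
        pool.setMaximumPoolSize(1);
        waitFor(() -> pool.getPoolSize() == 1, 5000);
        int afterShrink = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, afterGrow, afterShrink};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```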

3.5 Why do the Alibaba Java Coding Guidelines forbid creating thread pools with the Executors shortcut methods?

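newFixedThreadPool and newSingleThreadExecutor create their workQueue as a LinkedBlockingQueue without a capacity, which is effectively unbounded (capacity Integer.MAX_VALUE). If too many tasks pile up in the queue, this can cause an OOM.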

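newCachedThreadPool and newScheduledThreadPool set the maximum thread count to Integer.MAX_VALUE, so they may create a very large number of threads, which can also cause an OOM.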
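These defaults can be checked directly. The sketch (ExecutorsDefaultsDemo is a hypothetical name) relies on the fact that, in OpenJDK, newFixedThreadPool and newCachedThreadPool currently return a plain ThreadPoolExecutor; that is an implementation detail rather than a documented guarantee, and newSingleThreadExecutor returns a wrapper that cannot be cast this way.

```java
import java.util.concurrent.*;

class ExecutorsDefaultsDemo {
    // Returns {fixed pool queue remaining capacity, cached pool maximumPoolSize}.
    static int[] run() {
        // The casts depend on the OpenJDK implementation of these factory methods.
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return new int[] {fixed.getQueue().remainingCapacity(), cached.getMaximumPoolSize()};
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] == Integer.MAX_VALUE); // true: LinkedBlockingQueue without a capacity
        System.out.println(r[1] == Integer.MAX_VALUE); // true: no practical cap on the thread count
    }
}
```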

3.6 How to shut down a thread pool gracefully

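Call shutdown() or shutdownNow(), then call awaitTermination() to wait until the pool's threads have exited and its queued tasks are gone, i.e. the pool has reached the TERMINATED state. awaitTermination() takes a timeout and returns false if the pool has not terminated by then.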
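A common way to combine these calls is the two-phase shutdown suggested in the ExecutorService Javadoc, sketched here as a hypothetical helper shutdownAndAwaitTermination in a hypothetical GracefulShutdown class; the timeout is whatever the application can afford to wait.

```java
import java.util.concurrent.*;

class GracefulShutdown {
    // Two-phase shutdown, adapted from the ExecutorService Javadoc example.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                // stop accepting tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow();                     // interrupt running tasks, drop queued ones
                if (!pool.awaitTermination(timeout, unit)) {
                    System.err.println("Pool did not terminate");
                    return false;
                }
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();                         // cancel again if the caller was interrupted
            Thread.currentThread().interrupt();         // preserve the interrupt status
            return pool.isTerminated();
        }
        return true;
    }

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.execute(() -> System.out.println("working"));
        System.out.println(shutdownAndAwaitTermination(pool, 5, TimeUnit.SECONDS)); // true
    }
}
```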

3.7 What problems should be avoided when using a thread pool? What are some best practices?

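1. Tasks executed by a pool should be independent of each other. If a task submits another task to the same pool and waits for its result, the pool can deadlock: once every thread is waiting, the subtasks stay in the queue with no thread left to run them.

2. Isolate core and non-core tasks in separate pools; otherwise a flood of non-core tasks can leave core tasks stuck in the queue and delay them, affecting the business.

3. Pool parameters are hard to get right on the first try. Add alerting (for example, trigger an alert when the queue has stayed full for three minutes) and support adjusting the core and maximum thread counts at runtime.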
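Point 1 can be reproduced with a one-thread pool (StarvationDemo is a hypothetical name): the parent task submits a subtask to the same pool and waits for it, but the only thread is busy running the parent, so the subtask never leaves the queue. A timeout on get() is used here only so that the demo ends instead of hanging.

```java
import java.util.concurrent.*;

class StarvationDemo {
    // Returns true if the parent task timed out waiting for its own subtask.
    static boolean run() {
        ExecutorService pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        Callable<Boolean> parent = () -> {
            // The subtask goes to the same pool, whose only thread is running this parent.
            Future<String> child = pool.submit(() -> "child result");
            try {
                child.get(500, TimeUnit.MILLISECONDS); // without a timeout this would block forever
                return false;
            } catch (TimeoutException e) {
                return true;                           // the child is still sitting in the queue
            }
        };
        try {
            return pool.submit(parent).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();                        // drops the stranded child task
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```

Giving the subtasks their own pool, or restructuring so the parent does not block on them, removes the dependency between tasks in the same pool.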

 

4. Conclusion

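This article briefly analyzed several of the more important methods of ThreadPoolExecutor and gave my understanding of a few common questions. If I have misunderstood anything, corrections and discussion are welcome. Thanks!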


Source: https://www.cnblogs.com/monianxd/p/16282204.html

